#!/usr/bin/python import matplotlib matplotlib.use('GTKAgg') from pylab import * import gtk import gobject import pdb import os import datetime from mpl_toolkits.axes_grid.parasite_axes import SubplotHost from driver.contec import * port = '/dev/ttyUSB0' verbose = True try: driver = CMS50(port=port, verbose=verbose) except Exception, e: print e print "Not plugged in?" driver = None # apt-get install python-matplotlib def fmt_time_tick(tick): tick = tick / 60 return "%02d:%02d"%((tick/60)%24, tick % 60) class App: SEC_PER_SLOW = 5 def __init__(self): self.is_pause = False self.last_packet = { 'hr': 0, 'spo2': 0 } self.background = None self.slow_size = 360 self.fast_size = 100 self.setup_ui() def user_dir(self, user): target_dir = os.path.join(os.environ['HOME'], ".quantself", "om", user) if not os.access(target_dir, os.F_OK): os.makedirs(target_dir) return target_dir def setup_ui(self): self.fig = figure(1) self.canvas = self.fig.canvas self.hr_host = SubplotHost(self.fig, 111) self.fig.add_subplot(self.hr_host) self.fast_x = arange(self.fast_size) self.slow_x = arange(self.slow_size) self.pulse1_y = [ None for x in zeros(self.fast_size)] self.pulse_ax = self.fig.add_axes((0.3,0.7,0.1,0.1), title="Pulse") self.pulse1_line, = self.pulse_ax.plot(self.fast_x, self.pulse1_y, '-') self.pulse_ax.axis([0, self.fast_size, 0, 100]) self.pulse_ax.set_xticks([]) self.pulse_ax.set_yticks([]) spo2_host = self.hr_host.twinx() self.hr_y = [ None for x in zeros(self.slow_size)] self.hr_line, = self.hr_host.plot(self.slow_x, self.hr_y, 'r-', linewidth=2, label="Rate") self.spo2_y = [ None for x in zeros(self.slow_size)] self.spo2_line, = spo2_host.plot(self.slow_x, self.spo2_y, 'g-', linewidth=2, label="SpO2") self.hr_host.axis([0, self.slow_size, 40, 150]) spo2_host.axis([0, self.slow_size, 70, 100]) self.hr_host.set_ylabel("Rate") spo2_host.set_ylabel("SpO2") self.hr_host.axis["left"].label.set_color(self.hr_line.get_color()) spo2_host.axis["right"].label.set_color(self.spo2_line.get_color()) #self.fig.legend((self.hr_line, spo2_line), ('HR', 'SpO2'), 'upper left') self.hr_host.legend(loc="lower center") self.hr_host.set_xticks(range(0, self.slow_size, 60)) self.hr_host.set_xticks(range(0, self.slow_size, 12), minor=True) self.hr_host.set_xticklabels([fmt_time_tick((self.slow_size - tick)*App.SEC_PER_SLOW) for tick in range(0, self.slow_size, 60)]) self.hr_host.axis["bottom"].major_ticklabels.set_rotation(45) self.manager = get_current_fig_manager() toolbar = self.manager.toolbar self.label = gtk.Label('No Reading') self.label.show() self.user_field = gtk.Entry(max=30) self.user_field.set_text('self') self.user_field.show() vbox = self.manager.vbox hbox = gtk.HBox() l = gtk.Label('Status: ') l.show() hbox.pack_start(l, False, False) hbox.pack_start(self.label, False, False) l = gtk.Label(' User: ') l.show() hbox.pack_start(l, False, False) hbox.pack_start(self.user_field, False, False) hbox.show() vbox.pack_start(hbox, False, False) vbox.reorder_child(toolbar, -1) button_h = 0.05 button_y = 0.94 self.b_pause = Button(axes([0.5, button_y, 0.1, button_h]), "Pause") self.b_pause.on_clicked(self.pause) self.b_test = Button(axes([0.6, button_y, 0.1, button_h]), "Test") self.b_test.on_clicked(self.test) self.b_review = Button(axes([0.8, button_y, 0.1, button_h]), "Review") self.b_review.on_clicked(self.review) self.b_quit = Button(axes([0.9, button_y, 0.09, button_h]), "Quit") self.b_quit.on_clicked(self.quit) self.fig.show() self.count = 0 def quit(self, event): exit(0) def pause(self, event): self.is_pause = not self.is_pause self.make_label() def make_label(self): self.label.set_markup("HR %d / SpO2 %d %s"%(self.last_packet['hr'], self.last_packet['spo2'], "PAUSED" if self.is_pause else "")) def update_background(self): if self.background is None: #pdb.set_trace() #self.background = self.canvas.copy_from_bbox(pulse_ax.bbox) self.background = self.canvas.copy_from_bbox(self.hr_host.bbox) def test(self, event): head = { 'date': datetime.datetime.today(), 'hours': 10, 'minutes': 55, 'seconds': 0, } packets = [ { 'hr': 55, 'spo2': 99, 'valid': True }, { 'hr': 57, 'spo2': 98, 'valid': True }, { 'hr': 58, 'spo2': 97, 'valid': True }, ] self.update_upload(head, packets) def handle_upload(self, head_packet): self.label.set_markup("Uploading, please wait...") packets = [] while True: packet = driver.read_packet() if packet is None: return if packet['type'] == 'upload': continue if packet['type'] != 'upload_data': break packets.append(packet) length = len(packets) print "got %d packets"%(length) start_time = datetime.time(head_packet['hours'], head_packet['minutes']) today = datetime.date.today() start = datetime.datetime.combine(today, start_time) if start + datetime.timedelta(0, length) > datetime.datetime.today(): start = datetime.datetime.combine(today - datetime.timedelta(1), start_time) head_packet['date'] = start self.save_upload(head_packet, packets) self.update_upload(head_packet, packets) def review(self, event): user = self.user_field.get_text() files = os.listdir(self.user_dir(user)) files.sort() if len(files) == 0: print "no files in user dir" return head_packet, packets = self.load_upload(files[-1]) self.update_upload(head_packet, packets) def load_upload(self, file): user = self.user_field.get_text() source_file = os.path.join(self.user_dir(user), file) file = open(source_file, "r") # header line = file.readline() packets = [] while True: line = file.readline() if line == '': break line = line.strip("\n") packet = {} packet['date'], packet['hr'], packet['spo2'], packet['valid'], packet['byte0'] = line.split("\t") packet['date'] = datetime.datetime.strptime(packet['date'], "%Y-%m-%d %H:%M:%S") packet['valid'] = packet['valid'] != "0" packet['hr'] = int(packet['hr']) packet['spo2'] = int(packet['spo2']) packets.append(packet) file.close() head_packet = { 'hours': packets[0]['date'].hour, 'minutes': packets[0]['date'].minute, 'date': packets[0]['date'] } return head_packet, packets def save_upload(self, head_packet, packets): length = len(packets) start = head_packet['date'] user = self.user_field.get_text() target_dir = self.user_dir(user) target_file = os.path.join(target_dir, start.strftime(user + "-%Y-%m-%d-%H-%M.csv")) if os.access(target_file, os.F_OK): os.remove(target_file) file = open(target_file, "w") print "saving upload to %s"%(target_file) print >> file, "time\theartrate\tspo2\tvalid\tbyte0" count = 0 for packet in packets: dt = start + datetime.timedelta(0, count) print >> file, "%s\t%d\t%d\t%d\t%d"%(dt.isoformat(" "), packet['hr'], packet['spo2'], 1 if packet['valid'] else 0, packet['byte0']) count = count + 1 file.close() def update_upload(self, head_packet, packets): self.upload_fig = figure(2) self.upload_host = SubplotHost(self.upload_fig, 111, title=head_packet['date'].date().isoformat()) self.upload_fig.add_subplot(self.upload_host) self.upload_host.plot([1,1], [2,4]) self.upload_spo2_host = self.upload_host.twinx() self.upload_host.axis["bottom"].major_ticklabels.set_rotation(45) self.upload_host.set_ylabel("Rate") self.upload_spo2_host.set_ylabel("SpO2") start = (head_packet['hours'] * 60 + head_packet['minutes']) * 60 upload_x = range(start, start + len(packets)) hr_y = [ packet['hr'] if packet['valid'] else None for packet in packets ] spo2_y = [ packet['spo2'] if packet['valid'] else None for packet in packets ] quiet_count = 0 for ind in range(1, len(packets)): if not packets[ind]['valid']: quiet_count = 60 if quiet_count > 0: quiet_count = quiet_count - 1 hr_y[ind] = None spo2_y[ind] = None hr_line, = self.upload_host.plot(upload_x, hr_y, 'r-', linewidth=2, label="Rate") spo2_line, = self.upload_spo2_host.plot(upload_x, spo2_y, 'g-', linewidth=2, label="SpO2") self.upload_host.axis["left"].label.set_color(self.hr_line.get_color()) self.upload_spo2_host.axis["right"].label.set_color(self.spo2_line.get_color()) self.upload_host.axis([start, start + len(packets), 40, 150]) self.upload_spo2_host.axis([start, start +len(packets), 70, 100]) minutes = len(packets) / 60 if minutes < 10: spacing = 1 elif minutes < 60: spacing = 5 elif minutes < 120: spacing = 10 else: spacing = 30 ticks = range(start, start+len(packets), spacing * 60) self.upload_host.set_xticks(ticks) minor_ticks = range(start, start+len(packets), spacing * 12) self.upload_host.set_xticks(minor_ticks, minor=True) self.upload_host.set_xticklabels([fmt_time_tick(tick) for tick in ticks], rotation=90, fontsize=8) self.upload_fig.show() self.upload_fig.canvas.draw() def update_ui(self, *args): self.update_background() if driver is None: self.label.set_markup("Not plugged in") return if self.is_pause: for ind in range(5): packet = driver.read_packet() if packet is None: self.label.set_markup("No Connection") continue if packet['type'] == 'upload': self.handle_upload(packet) return True return True for ind in range(5): self.count = self.count + 1 packet = driver.read_packet() if packet is None: self.label.set_markup("No Connection") continue if packet['type'] == 'upload': self.handle_upload(packet) return True if packet['valid']: self.pulse1_y = append(self.pulse1_y[1:], packet['pulse1']) else: self.pulse1_y = append(self.pulse1_y[1:], None) self.pulse1_line.set_data(self.fast_x, self.pulse1_y) if not packet['valid']: self.label.set_markup("No Reading") if packet['beat'] and packet['valid']: self.make_label() self.last_packet = packet if self.count % (60 * App.SEC_PER_SLOW) == 0: if packet['valid']: self.hr_y = append(self.hr_y[1:], self.last_packet['hr']) self.spo2_y = append(self.spo2_y[1:], self.last_packet['spo2']) else: self.hr_y = append(self.hr_y[1:], None) self.spo2_y = append(self.spo2_y[1:], None) self.hr_line.set_data(self.slow_x, self.hr_y) self.spo2_line.set_data(self.slow_x, self.spo2_y) self.canvas.draw() # Optimize drawing the pulse # Instead of calling self.canvas.draw(), just draw the pulse itself and # blit it to the window. # This doesn't clear properly for some reason #self.canvas.restore_region(self.background, bbox=self.pulse_ax.bbox) self.canvas.restore_region(self.background) self.pulse_ax.draw_artist(self.pulse1_line) self.canvas.blit(self.pulse_ax.bbox) return True app = App() gobject.idle_add(app.update_ui) show()