123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359 |
- #!/usr/bin/env python3
-
- import sys
-
- import serial
- import serial.tools.list_ports
- import gi
- gi.require_version('Gtk', '3.0')
- from gi.repository import Gtk, Gio, GObject, GLib
-
-
- def pdb_send_message(sp, message, window=None):
- """Send a message over the serial port and return the response"""
- try:
- # Open the serial port
- sp = serial.Serial(sp.device, baudrate=115200, timeout=0.01)
-
- sp.write(bytes(message, 'utf-8') + b'\r\n')
- sp.flush()
- answer = sp.readlines()
-
- sp.close()
-
- # Remove the echoed command and prompt
- answer = answer[1:-1]
- return answer
- except OSError as e:
- if window is not None:
- dialog = Gtk.MessageDialog(window, 0, Gtk.MessageType.ERROR,
- Gtk.ButtonsType.CLOSE, "Error communicating with device")
- dialog.format_secondary_text(e.strerror)
- dialog.run()
-
- dialog.destroy()
- raise
-
-
- class ListRowModel(GObject.GObject):
-
- def __init__(self, serport):
- GObject.GObject.__init__(self)
- self.serport = serport
-
-
- class SelectListStore(Gio.ListStore):
-
- def update_items(self):
- # Get a list of serial ports
- serports = list(serial.tools.list_ports.grep("1209:0001"))
-
- # Mark ports to remove or add
- remove_list = []
- list_len = self.get_n_items()
- for i in range(list_len):
- remove = True
- for j in range(len(serports)):
- if serports[j] is not None and self.get_item(i).serport == serports[j]:
- serports[j] = None
- remove = False
- if remove:
- remove_list.append(i)
-
- # Remove the missing ones
- for i in remove_list:
- self.remove(i)
-
- # Add any new ports
- for port in serports:
- if port is not None:
- self.append(ListRowModel(port))
-
-
- class SelectList(Gtk.Box):
- __gsignals__ = {
- 'row-activated': (GObject.SIGNAL_RUN_FIRST, None,
- (object,))
- }
-
- def __init__(self):
- Gtk.Box.__init__(self)
-
- self._model = None
-
- self._builder = Gtk.Builder()
- self._builder.add_from_file("data/select-stack.ui")
- self._builder.connect_signals(self)
-
- sl = self._builder.get_object("select-list")
-
- # Add separators to the list
- sl.set_header_func(self._update_header_func, None)
-
- self.pack_start(self._builder.get_object("select-stack"), True, True, 0)
- self.show_all()
-
- def _update_header_func(self, row, before, data):
- """Add a separator header to all rows but the first one"""
- if before is None:
- row.set_header(None)
- return
-
- current = row.get_header()
- if current is None:
- current = Gtk.Separator.new(Gtk.Orientation.HORIZONTAL)
- row.set_header(current)
-
- def bind_model(self, model, func):
- self._builder.get_object("select-list").bind_model(model, func)
- self._model = model
-
- self.reload()
- GLib.timeout_add(1000, self.reload)
-
- def reload(self):
- self._model.update_items()
-
- # Set the visible child
- stack = self._builder.get_object("select-stack")
- if self._model.get_n_items():
- stack.set_visible_child(self._builder.get_object("select-frame"))
- else:
- stack.set_visible_child(self._builder.get_object("select-none"))
-
- return True
-
- def on_select_list_row_activated(self, box, row):
- self.emit("row-activated", row.model.serport)
-
-
- class SelectListRow(Gtk.ListBoxRow):
-
- def __init__(self, model):
- Gtk.EventBox.__init__(self)
-
- self.model = model
-
- self._builder = Gtk.Builder()
- self._builder.add_from_file("data/select-list-row.ui")
- self._builder.connect_signals(self)
-
- name = self._builder.get_object("name")
- name.set_text('{} {}'.format(self.model.serport.manufacturer,
- self.model.serport.product))
-
- device = self._builder.get_object("device")
- device.set_text(self.model.serport.device)
-
- self.add(self._builder.get_object("grid"))
- self.show_all()
-
- def on_identify_clicked(self, button):
- window = self.get_toplevel()
- try:
- pdb_send_message(self.model.serport, 'identify', window)
- except:
- return
-
-
- class Handler:
-
- def __init__(self, builder):
- self.builder = builder
- self.serial_port = None
- self.voltage = None
- self.current = None
- self.giveback = None
- self.selectlist = None
-
- def on_pdb_window_realize(self, *args):
- # Get the list
- sb = self.builder.get_object("select-box")
- self.selectlist = SelectList()
- sb.pack_start(self.selectlist, True, True, 0)
-
- liststore = SelectListStore()
-
- self.selectlist.bind_model(liststore, SelectListRow)
-
- self.selectlist.connect("row-activated", self.on_select_list_row_activated)
-
- def on_pdb_window_delete_event(self, *args):
- Gtk.main_quit(*args)
-
- def on_select_list_row_activated(self, selectlist, serport):
- # Get voltage and current widgets
- voltage = self.builder.get_object("voltage-combobox")
- current = self.builder.get_object("current-spinbutton")
- giveback = self.builder.get_object("giveback-toggle")
-
- self.serial_port = serport
-
- window = self.builder.get_object("pdb-window")
- try:
- pdb_send_message(self.serial_port, 'load', window)
- tmpcfg = pdb_send_message(self.serial_port, 'get_tmpcfg', window)
- except:
- return
-
- # Get voltage and current from device and load them into the GUI
- for line in tmpcfg:
- if line.startswith(b'flags:'):
- line = line.split()[1:]
- try:
- line.index(b'GiveBack')
- giveback.set_active(True)
- except:
- giveback.set_active(False)
- elif line.startswith(b'v:'):
- v = line.split()[1]
- if v == b'5.00':
- voltage.set_active_id('voltage-five')
- elif v == b'9.00':
- voltage.set_active_id('voltage-nine')
- elif v == b'15.00':
- voltage.set_active_id('voltage-fifteen')
- if v == b'20.00':
- voltage.set_active_id('voltage-twenty')
- elif line.startswith(b'i:'):
- i = float(line.split()[1])
- current.set_value(i)
-
- self._store_device_settings()
- self._set_save_button_visibility()
-
- # Show the Sink page
- hst = self.builder.get_object("header-stack")
- hsink = self.builder.get_object("header-sink")
- hsink.set_title('{} {}'.format(serport.manufacturer, serport.product))
- hsink.set_subtitle(serport.device)
- hst.set_visible_child(hsink)
-
- st = self.builder.get_object("stack")
- sink = self.builder.get_object("sink")
- st.set_visible_child(sink)
-
- # Ping the Sink repeatedly
- GLib.timeout_add(1000, self._ping)
-
- def _ping(self):
- """Ping the device we're configuring, showing to the list on failure"""
- if self.serial_port is None:
- self.selectlist.reload()
- self.on_header_sink_back_clicked(None)
- return False
- try:
- pdb_send_message(self.serial_port, '')
- return True
- except:
- self.selectlist.reload()
- self.on_header_sink_back_clicked(None)
- return False
-
- def on_header_sink_back_clicked(self, data):
- self.serial_port = None
-
- # Show the Select page
- hst = self.builder.get_object("header-stack")
- hselect = self.builder.get_object("header-select")
- hst.set_visible_child(hselect)
-
- st = self.builder.get_object("stack")
- select = self.builder.get_object("select")
- st.set_visible_child(select)
-
- def on_header_sink_save_clicked(self, button):
- window = self.builder.get_object("pdb-window")
- try:
- pdb_send_message(self.serial_port, 'write', window)
-
- self._store_device_settings()
- self._set_save_button_visibility()
- except:
- self.on_header_sink_back_clicked(None)
-
- def _store_device_settings(self):
- """Store the settings that were loaded from the device"""
- # Get voltage and current widgets
- voltage = self.builder.get_object("voltage-combobox")
- current = self.builder.get_object("current-spinbutton")
- giveback = self.builder.get_object("giveback-toggle")
-
- # Remember the loaded settings
- self.voltage = voltage.get_active_id()
- self.current = current.get_value()
- self.giveback = giveback.get_active()
-
- def _set_save_button_visibility(self):
- """Show the save button if there are new settings to save"""
- # Get relevant widgets
- voltage = self.builder.get_object("voltage-combobox")
- current = self.builder.get_object("current-spinbutton")
- giveback = self.builder.get_object("giveback-toggle")
- rev = self.builder.get_object("header-sink-save-revealer")
-
- # Set visibility
- rev.set_reveal_child(voltage.get_active_id() != self.voltage
- or current.get_value() != self.current
- or giveback.get_active() != self.giveback)
-
- def on_voltage_combobox_changed(self, combo):
- window = self.builder.get_object("pdb-window")
- try:
- pdb_send_message(self.serial_port,
- 'set_v {}'.format(int(combo.get_active_text())*1000),
- window)
-
- self._set_save_button_visibility()
- except:
- self.on_header_sink_back_clicked(None)
-
- def on_current_spinbutton_changed(self, spin):
- window = self.builder.get_object("pdb-window")
- try:
- pdb_send_message(self.serial_port,
- 'set_i {}'.format(int(spin.get_value()*1000)),
- window)
-
- self._set_save_button_visibility()
- except:
- self.on_header_sink_back_clicked(None)
-
- def on_giveback_toggle_toggled(self, toggle):
- window = self.builder.get_object("pdb-window")
- try:
- pdb_send_message(self.serial_port, 'toggle_giveback', window)
-
- self._set_save_button_visibility()
- except:
- self.on_header_sink_back_clicked(None)
-
- class Application(Gtk.Application):
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, application_id="com.clayhobbs.pd-buddy-gtk",
- **kwargs)
- self.window = None
-
- def do_startup(self):
- Gtk.Application.do_startup(self)
-
- self.builder = Gtk.Builder()
- self.builder.add_from_file("data/pd-buddy-gtk.ui")
- self.builder.connect_signals(Handler(self.builder))
-
- def do_activate(self):
- # We only allow a single window and raise any existing ones
- if not self.window:
- # Windows are associated with the application
- # when the last one is closed the application shuts down
- self.window = self.builder.get_object("pdb-window")
- self.add_window(self.window)
- self.window.set_wmclass("PD Buddy Configuration",
- "PD Buddy Configuration")
-
- self.window.present()
-
- if __name__ == "__main__":
- app = Application()
- app.run(sys.argv)
|