Python library for working with the PD Buddy Sink Serial Console Configuration Interface
Du kan inte välja fler än 25 ämnen Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.

__init__.py 11KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. """Python bindings for PD Buddy Sink configuration"""
  2. try:
  3. # Try importing enum from the standard library
  4. import enum
  5. # Make sure Flag is available
  6. enum.Flag
  7. except (ImportError, NameError):
  8. # If something above failed, try aenum instead
  9. import aenum as enum
  10. import serial
  11. import serial.tools.list_ports
  12. class Sink:
  13. """Interface for configuring a PD Buddy Sink"""
  14. vid = 0x1209
  15. pid = 0x9DB5
  16. def __init__(self, sp):
  17. """Open a serial port to communicate with the PD Buddy Sink
  18. :param sp: the serial port of the device
  19. :type sp: str or `serial.tools.list_ports.ListPortInfo`
  20. """
  21. try:
  22. self._port = serial.Serial(sp, baudrate=115200)
  23. except ValueError:
  24. self._port = serial.Serial(sp.device, baudrate=115200)
  25. # Put communications in a known state, cancelling any partially-entered
  26. # command that may be sitting in the buffer.
  27. self.send_command("\x04", newline=False)
  28. def __enter__(self):
  29. return self
  30. def __exit__(self, exc_type, exc_value, traceback):
  31. self._port.close()
  32. def send_command(self, cmd, newline=True):
  33. """Send a command to the PD Buddy Sink, returning the result
  34. :param cmd: the text to send to the Sink
  35. :param newline: whether to append a ``\r\n`` to the command
  36. :type cmd: str
  37. :type newline: bool
  38. :returns: a list of zero or more bytes objects, each being one line
  39. printed as a response to the command.
  40. """
  41. # Build the command
  42. cmd = cmd.encode("utf-8")
  43. if newline:
  44. cmd += b"\r\n"
  45. # Send the command
  46. self._port.write(cmd)
  47. self._port.flush()
  48. # Read the result
  49. answer = b""
  50. while not answer.endswith(b"PDBS) "):
  51. answer += self._port.read(1)
  52. answer = answer.split(b"\r\n")
  53. # Remove the echoed command and prompt
  54. answer = answer[1:-1]
  55. # Raise an exception if the command wasn't recognized
  56. if len(answer) and answer[0] == cmd.strip().split()[0] + b" ?":
  57. raise KeyError("command not found")
  58. return answer
  59. def close(self):
  60. """Close the serial port"""
  61. self._port.close()
  62. def help(self):
  63. """Returns the help text from the PD Buddy Sink"""
  64. return self.send_command("help")
  65. def license(self):
  66. """Returns the license text from the PD Buddy Sink"""
  67. return self.send_command("license")
  68. def erase(self):
  69. """Synchronously erases all stored configuration from flash"""
  70. self.send_command("erase")
  71. def write(self):
  72. """Synchronously writes the contents of the configuration buffer to flash"""
  73. self.send_command("write")
  74. def load(self):
  75. """Loads the current configuration from flash into the buffer
  76. :raises: KeyError
  77. """
  78. text = self.send_command("load")
  79. if len(text) > 0 and text[0].startswith(b"No configuration"):
  80. raise KeyError("no configuration")
  81. def get_cfg(self, index=None):
  82. """Reads configuration from flash
  83. :param index: optional index of configuration object in flash to read
  84. :returns: a `SinkConfig` object
  85. """
  86. if index is None:
  87. cfg = self.send_command("get_cfg")
  88. else:
  89. cfg = self.send_command("get_cfg {}".format(index))
  90. return SinkConfig.from_text(cfg)
  91. def get_tmpcfg(self):
  92. """Reads the contents of the configuration buffer
  93. :returns: a `SinkConfig` object
  94. """
  95. cfg = self.send_command("get_tmpcfg")
  96. return SinkConfig.from_text(cfg)
  97. def clear_flags(self):
  98. """Clears all the flags in the configuration buffer"""
  99. self.send_command("clear_flags")
  100. def toggle_giveback(self):
  101. """Toggles the GiveBack flag in the configuration buffer"""
  102. self.send_command("toggle_giveback")
  103. def set_v(self, mv):
  104. """Sets the voltage of the configuration buffer, in millivolts"""
  105. out = self.send_command("set_v {}".format(mv))
  106. # If that command gave any output, that indicates an error. Raise an
  107. # exception to make that clear.
  108. if len(out):
  109. raise ValueError(out[0])
  110. def set_i(self, ma):
  111. """Sets the current of the configuration buffer, in milliamperes"""
  112. out = self.send_command("set_i {}".format(ma))
  113. # If that command gave any output, that indicates an error. Raise an
  114. # exception to make that clear.
  115. if len(out):
  116. raise ValueError(out[0])
  117. def identify(self):
  118. """Blinks the LED quickly"""
  119. self.send_command("identify")
  120. def output(self, state=None):
  121. """Gets or sets the state of a Sink's output
  122. Raises KeyError if the ``output`` command is not available on the Sink.
  123. Raises ValueError if an invalid output is read.
  124. :param state: optional value of the output to set
  125. :returns: the output state if state is None, None otherwise
  126. """
  127. # With no parameter, return the output state
  128. if state is None:
  129. value = self.send_command("output")
  130. if value[0] == b"enabled":
  131. return True
  132. elif value[0] == b"disabled":
  133. return False
  134. else:
  135. # If unexpected text is returned, raise an exception indicating a
  136. # firmware error
  137. raise ValueError("unknown output state")
  138. # With a parameter, set the output state
  139. if state:
  140. self.send_command("output enable")
  141. else:
  142. self.send_command("output disable")
  143. def set_tmpcfg(self, sc):
  144. """Writes a SinkConfig object to the device's configuration buffer
  145. Note: the value of the status field is ignored; it will always be
  146. `SinkStatus.VALID`.
  147. """
  148. # Set flags
  149. self.clear_flags()
  150. if sc.flags & SinkFlags.GIVEBACK:
  151. self.toggle_giveback()
  152. # Set voltage
  153. self.set_v(sc.v)
  154. # Set current
  155. self.set_i(sc.i)
  156. @classmethod
  157. def get_devices(cls):
  158. """Get an iterable of PD Buddy Sink devices
  159. :returns: an iterable of `serial.tools.list_ports.ListPortInfo` objects
  160. """
  161. return serial.tools.list_ports.grep("{:04X}:{:04X}".format(cls.vid,
  162. cls.pid))
  163. class SinkConfig:
  164. """Python representation of a PD Buddy Sink configuration object"""
  165. def __init__(self, status=None, flags=None, v=None, i=None):
  166. """Create a SinkConfig object
  167. :param status: A `SinkStatus` value
  168. :param flags: Zero or more `SinkFlags` values
  169. :param v: Voltage in millivolts
  170. :param i: Current in milliamperes
  171. """
  172. self.status = status
  173. self.flags = flags
  174. self.v = v
  175. self.i = i
  176. def __repr__(self):
  177. s = self.__class__.__name__ + "("
  178. if self.status is not None:
  179. s += "status={}".format(self.status)
  180. if self.flags is not None:
  181. if not s.endswith("("):
  182. s += ", "
  183. s += "flags={}".format(self.flags)
  184. if self.v is not None:
  185. if not s.endswith("("):
  186. s += ", "
  187. s += "v={}".format(self.v)
  188. if self.i is not None:
  189. if not s.endswith("("):
  190. s += ", "
  191. s += "i={}".format(self.i)
  192. s += ")"
  193. return s
  194. def __str__(self):
  195. """Print the SinkStatus in the manner of the configuration shell"""
  196. s = ""
  197. if self.status is not None:
  198. s += "status: "
  199. if self.status is SinkStatus.EMPTY:
  200. s += "empty"
  201. elif self.status is SinkStatus.VALID:
  202. s += "valid"
  203. elif self.status is SinkStatus.INVALID:
  204. s += "invalid"
  205. s += "\n"
  206. if self.flags is not None:
  207. s += "flags: "
  208. if self.flags is SinkFlags.NONE:
  209. s += "(none)"
  210. else:
  211. if self.flags & SinkFlags.GIVEBACK:
  212. s += "GiveBack"
  213. s += "\n"
  214. if self.v is not None:
  215. s += "v: {:.2f} V\n".format(self.v / 1000)
  216. if self.i is not None:
  217. s += "i: {:.2f} A\n".format(self.i / 1000)
  218. # Return all but the last character of s to remove the trailing newline
  219. if s:
  220. return s[:-1]
  221. else:
  222. return "No configuration"
  223. def __eq__(self, other):
  224. if isinstance(other, self.__class__):
  225. if other.status is not self.status:
  226. return False
  227. if other.flags is not self.flags:
  228. return False
  229. if other.v != self.v:
  230. return False
  231. if other.i != self.i:
  232. return False
  233. return True
  234. return NotImplemented
  235. def __ne__(self, other):
  236. if isinstance(other, self.__class__):
  237. return not self.__eq__(other)
  238. return NotImplemented
  239. def __hash__(self):
  240. return hash(tuple(sorted(self.__dict__.items())))
  241. @classmethod
  242. def from_text(cls, text):
  243. """Creates a SinkConfig from text returned by Sink.send_command
  244. :param text: the text to load
  245. :type text: a list of bytes objects
  246. :returns: a new `SinkConfig` object.
  247. :raises: IndexError
  248. """
  249. # Assume the parameters will all be None
  250. status = None
  251. flags = None
  252. v = None
  253. i = None
  254. # Iterate over all lines of text
  255. for line in text:
  256. # If the configuration said invalid index, raise an IndexError
  257. if line.startswith(b"Invalid index"):
  258. raise IndexError("configuration index out of range")
  259. # If there is no configuration, return an empty SinkConfig
  260. elif line.startswith(b"No configuration"):
  261. return cls()
  262. # If this line is the status field
  263. elif line.startswith(b"status: "):
  264. line = line.split()[1:]
  265. if line[0] == b"empty":
  266. status = SinkStatus.EMPTY
  267. elif line[0] == b"valid":
  268. status = SinkStatus.VALID
  269. elif line[0] == b"invalid":
  270. status = SinkStatus.INVALID
  271. # If this line is the flags field
  272. elif line.startswith(b"flags: "):
  273. line = line.split()[1:]
  274. flags = SinkFlags.NONE
  275. for word in line:
  276. if word == b"(none)":
  277. # If there are no flags set, stop looking
  278. break
  279. elif word == b"GiveBack":
  280. flags |= SinkFlags.GIVEBACK
  281. # If this line is the v field
  282. elif line.startswith(b"v: "):
  283. word = line.split()[1]
  284. v = round(1000*float(word))
  285. # If this line is the i field
  286. elif line.startswith(b"i: "):
  287. word = line.split()[1]
  288. i = round(1000*float(word))
  289. # Create a new SinkConfig object with the values we just read
  290. return cls(status=status, flags=flags, v=v, i=i)
  291. class SinkStatus(enum.Enum):
  292. """Status field of a PD Buddy Sink configuration object"""
  293. EMPTY = 1
  294. VALID = 2
  295. INVALID = 3
  296. class SinkFlags(enum.Flag):
  297. """Flags field of a PD Buddy Sink configuration object"""
  298. NONE = 0
  299. GIVEBACK = enum.auto()