Python library for working with the PD Buddy Sink Serial Console Configuration Interface
Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

__init__.py 9.3KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. """Python bindings for PD Buddy Sink configuration"""
  2. from enum import *
  3. import serial
  4. import serial.tools.list_ports
  5. class Sink:
  6. """Interface for configuring a PD Buddy Sink"""
  7. vid = 0x1209
  8. pid = 0x0001
  9. def __init__(self, sp):
  10. """Open a serial port to communicate with the PD Buddy Sink
  11. :param sp: the serial port of the device
  12. :type sp: str or `serial.tools.list_ports.ListPortInfo`
  13. """
  14. try:
  15. self._port = serial.Serial(sp, baudrate=115200)
  16. except ValueError:
  17. self._port = serial.Serial(sp.device, baudrate=115200)
  18. def __enter__(self):
  19. return self
  20. def __exit__(self, exc_type, exc_value, traceback):
  21. self._port.close()
  22. def send_command(self, cmd):
  23. """Send a command to the PD Buddy Sink, returning the result
  24. :param cmd: the text to send to the Sink
  25. :type sp: str
  26. :returns: a list of zero or more bytes objects, each being one line
  27. printed as a response to the command.
  28. """
  29. # Send the command
  30. self._port.write(bytes(cmd, "utf-8") + b"\r\n")
  31. self._port.flush()
  32. # Read the result
  33. answer = b""
  34. while not answer.endswith(b"PDBS) "):
  35. answer += self._port.read(1)
  36. answer = answer.split(b"\r\n")
  37. # Remove the echoed command and prompt
  38. answer = answer[1:-1]
  39. return answer
  40. def close(self):
  41. """Close the serial port"""
  42. self._port.close()
  43. def help(self):
  44. """Returns the help text from the PD Buddy Sink"""
  45. return self.send_command("help")
  46. def license(self):
  47. """Returns the license text from the PD Buddy Sink"""
  48. return self.send_command("license")
  49. def erase(self):
  50. """Synchronously erases all stored configuration from flash"""
  51. self.send_command("erase")
  52. def write(self):
  53. """Synchronously writes the contents of the configuration buffer to flash"""
  54. self.send_command("write")
  55. def load(self):
  56. """Loads the current configuration from flash into the buffer
  57. :raises: KeyError
  58. """
  59. text = self.send_command("load")
  60. if len(text) > 0 and text[0].startswith(b"No configuration"):
  61. raise KeyError("no configuration")
  62. def get_cfg(self, index=None):
  63. """Reads configuration from flash
  64. :param index: optional index of configuration object in flash to read
  65. :returns: a `SinkConfig` object
  66. """
  67. if index is None:
  68. cfg = self.send_command("get_cfg")
  69. else:
  70. cfg = self.send_command("get_cfg {}".format(index))
  71. return SinkConfig.from_text(cfg)
  72. def get_tmpcfg(self):
  73. """Reads the contents of the configuration buffer
  74. :returns: a `SinkConfig` object
  75. """
  76. cfg = self.send_command("get_tmpcfg")
  77. return SinkConfig.from_text(cfg)
  78. def clear_flags(self):
  79. """Clears all the flags in the configuration buffer"""
  80. self.send_command("clear_flags")
  81. def toggle_giveback(self):
  82. """Toggles the GiveBack flag in the configuration buffer"""
  83. self.send_command("toggle_giveback")
  84. def set_v(self, mv):
  85. """Sets the voltage of the configuration buffer, in millivolts"""
  86. self.send_command("set_v {}".format(mv))
  87. def set_i(self, ma):
  88. """Sets the current of the configuration buffer, in milliamperes"""
  89. self.send_command("set_i {}".format(ma))
  90. def identify(self):
  91. """Blinks the LED quickly"""
  92. self.send_command("identify")
  93. def set_tmpcfg(self, sc):
  94. """Writes a SinkConfig object to the device's configuration buffer
  95. Note: the value of the status field is ignored; it will always be
  96. `SinkStatus.VALID`.
  97. """
  98. # Set flags
  99. self.clear_flags()
  100. if sc.flags & SinkFlags.GIVEBACK:
  101. self.toggle_giveback()
  102. # Set voltage
  103. self.set_v(sc.v)
  104. # Set current
  105. self.set_i(sc.i)
  106. @classmethod
  107. def get_devices(cls):
  108. """Get an iterable of PD Buddy Sink devices
  109. :returns: an iterable of `serial.tools.list_ports.ListPortInfo` objects
  110. """
  111. return serial.tools.list_ports.grep("{:04X}:{:04X}".format(cls.vid,
  112. cls.pid))
  113. class SinkConfig:
  114. """Python representation of a PD Buddy Sink configuration object"""
  115. def __init__(self, status=None, flags=None, v=None, i=None):
  116. """Create a SinkConfig object
  117. :param status: A `SinkStatus` value
  118. :param flags: Zero or more `SinkFlags` values
  119. :param v: Voltage in millivolts
  120. :param i: Current in milliamperes
  121. """
  122. self.status = status
  123. self.flags = flags
  124. self.v = v
  125. self.i = i
  126. def __repr__(self):
  127. s = type(self).__name__ + "("
  128. if self.status is not None:
  129. s += "status={}".format(self.status)
  130. if self.flags is not None:
  131. if not s.endswith("("):
  132. s += ", "
  133. s += "flags={}".format(self.flags)
  134. if self.v is not None:
  135. if not s.endswith("("):
  136. s += ", "
  137. s += "v={}".format(self.v)
  138. if self.i is not None:
  139. if not s.endswith("("):
  140. s += ", "
  141. s += "i={}".format(self.i)
  142. s += ")"
  143. return s
  144. def __str__(self):
  145. """Print the SinkStatus in the manner of the configuration shell"""
  146. s = ""
  147. if self.status is not None:
  148. s += "status: "
  149. if self.status is SinkStatus.EMPTY:
  150. s += "empty"
  151. elif self.status is SinkStatus.VALID:
  152. s += "valid"
  153. elif self.status is SinkStatus.INVALID:
  154. s += "invalid"
  155. s += "\n"
  156. if self.flags is not None:
  157. s += "flags: "
  158. if self.flags is SinkFlags.NONE:
  159. s += "(none)"
  160. else:
  161. if self.flags & SinkFlags.GIVEBACK:
  162. s += "GiveBack"
  163. s += "\n"
  164. if self.v is not None:
  165. s += "v: {:.2f} V\n".format(self.v / 1000)
  166. if self.i is not None:
  167. s += "i: {:.2f} A\n".format(self.i / 1000)
  168. # Return all but the last character of s to remove the trailing newline
  169. if s:
  170. return s[:-1]
  171. else:
  172. return "No configuration"
  173. def __eq__(self, other):
  174. if isinstance(other, self.__class__):
  175. if other.status is not self.status:
  176. return False
  177. if other.flags is not self.flags:
  178. return False
  179. if other.v != self.v:
  180. return False
  181. if other.i != self.i:
  182. return False
  183. return True
  184. return NotImplemented
  185. def __ne__(self, other):
  186. if isinstance(other, self.__class__):
  187. return not self.__eq__(other)
  188. return NotImplemented
  189. def __hash__(self):
  190. return hash(tuple(sorted(self.__dict__.items())))
  191. @classmethod
  192. def from_text(cls, text):
  193. """Creates a SinkConfig from text returned by Sink.send_command
  194. :returns: a new `SinkConfig` object.
  195. :raises: IndexError
  196. """
  197. # Assume the parameters will all be None
  198. status = None
  199. flags = None
  200. v = None
  201. i = None
  202. # Iterate over all lines of text
  203. for line in text:
  204. # If the configuration said invalid index, raise an IndexError
  205. if line.startswith(b"Invalid index"):
  206. raise IndexError("configuration index out of range")
  207. # If there is no configuration, return an empty SinkConfig
  208. elif line.startswith(b"No configuration"):
  209. return cls()
  210. # If this line is the status field
  211. elif line.startswith(b"status: "):
  212. line = line.split()[1:]
  213. if line[0] == b"empty":
  214. status = SinkStatus.EMPTY
  215. elif line[0] == b"valid":
  216. status = SinkStatus.VALID
  217. elif line[0] == b"invalid":
  218. status = SinkStatus.INVALID
  219. # If this line is the flags field
  220. elif line.startswith(b"flags: "):
  221. line = line.split()[1:]
  222. flags = SinkFlags.NONE
  223. for word in line:
  224. if word == b"(none)":
  225. # If there are no flags set, stop looking
  226. break
  227. elif word == b"GiveBack":
  228. flags |= SinkFlags.GIVEBACK
  229. # If this line is the v field
  230. elif line.startswith(b"v: "):
  231. word = line.split()[1]
  232. v = round(1000*float(word))
  233. # If this line is the i field
  234. elif line.startswith(b"i: "):
  235. word = line.split()[1]
  236. i = round(1000*float(word))
  237. # Create a new SinkConfig object with the values we just read
  238. return cls(status=status, flags=flags, v=v, i=i)
  239. class SinkStatus(Enum):
  240. """Status field of a PD Buddy Sink configuration object"""
  241. EMPTY = 1
  242. VALID = 2
  243. INVALID = 3
  244. class SinkFlags(Flag):
  245. """Flags field of a PD Buddy Sink configuration object"""
  246. NONE = 0
  247. GIVEBACK = auto()