Python library for working with the PD Buddy Sink Serial Console Configuration Interface
Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

__init__.py 10KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. """Python bindings for PD Buddy Sink configuration"""
  2. from collections import namedtuple
  3. try:
  4. # Try importing enum from the standard library
  5. import enum
  6. # Make sure Flag is available
  7. enum.Flag
  8. except (ImportError, NameError):
  9. # If something above failed, try aenum instead
  10. import aenum as enum
  11. import serial
  12. import serial.tools.list_ports
  13. class Sink:
  14. """Interface for configuring a PD Buddy Sink"""
  15. vid = 0x1209
  16. pid = 0x9DB5
  17. def __init__(self, sp):
  18. """Open a serial port to communicate with the PD Buddy Sink
  19. :param sp: the serial port of the device
  20. :type sp: str or `serial.tools.list_ports.ListPortInfo`
  21. """
  22. try:
  23. self._port = serial.Serial(sp, baudrate=115200)
  24. except ValueError:
  25. self._port = serial.Serial(sp.device, baudrate=115200)
  26. # Put communications in a known state, cancelling any partially-entered
  27. # command that may be sitting in the buffer.
  28. self.send_command("\x04", newline=False)
  29. def __enter__(self):
  30. return self
  31. def __exit__(self, exc_type, exc_value, traceback):
  32. self._port.close()
  33. def send_command(self, cmd, newline=True):
  34. """Send a command to the PD Buddy Sink, returning the result
  35. :param cmd: the text to send to the Sink
  36. :param newline: whether to append a ``\r\n`` to the command
  37. :type cmd: str
  38. :type newline: bool
  39. :returns: a list of zero or more bytes objects, each being one line
  40. printed as a response to the command.
  41. """
  42. # Build the command
  43. cmd = cmd.encode("utf-8")
  44. if newline:
  45. cmd += b"\r\n"
  46. # Send the command
  47. self._port.write(cmd)
  48. self._port.flush()
  49. # Read the result
  50. answer = b""
  51. while not answer.endswith(b"PDBS) "):
  52. answer += self._port.read(1)
  53. answer = answer.split(b"\r\n")
  54. # Remove the echoed command and prompt
  55. answer = answer[1:-1]
  56. # Raise an exception if the command wasn't recognized
  57. if len(answer) and answer[0] == cmd.strip().split()[0] + b" ?":
  58. raise KeyError("command not found")
  59. return answer
  60. def close(self):
  61. """Close the serial port"""
  62. self._port.close()
  63. def help(self):
  64. """Returns the help text from the PD Buddy Sink"""
  65. return self.send_command("help")
  66. def license(self):
  67. """Returns the license text from the PD Buddy Sink"""
  68. return self.send_command("license")
  69. def erase(self):
  70. """Synchronously erases all stored configuration from flash"""
  71. self.send_command("erase")
  72. def write(self):
  73. """Synchronously writes the contents of the configuration buffer to flash"""
  74. self.send_command("write")
  75. def load(self):
  76. """Loads the current configuration from flash into the buffer
  77. :raises: KeyError
  78. """
  79. text = self.send_command("load")
  80. if len(text) > 0 and text[0].startswith(b"No configuration"):
  81. raise KeyError("no configuration")
  82. def get_cfg(self, index=None):
  83. """Reads configuration from flash
  84. :param index: optional index of configuration object in flash to read
  85. :returns: a `SinkConfig` object
  86. """
  87. if index is None:
  88. cfg = self.send_command("get_cfg")
  89. else:
  90. cfg = self.send_command("get_cfg {}".format(index))
  91. return SinkConfig.from_text(cfg)
  92. def get_tmpcfg(self):
  93. """Reads the contents of the configuration buffer
  94. :returns: a `SinkConfig` object
  95. """
  96. cfg = self.send_command("get_tmpcfg")
  97. return SinkConfig.from_text(cfg)
  98. def clear_flags(self):
  99. """Clears all the flags in the configuration buffer"""
  100. self.send_command("clear_flags")
  101. def toggle_giveback(self):
  102. """Toggles the GiveBack flag in the configuration buffer"""
  103. self.send_command("toggle_giveback")
  104. def set_v(self, mv):
  105. """Sets the voltage of the configuration buffer, in millivolts"""
  106. out = self.send_command("set_v {}".format(mv))
  107. # If that command gave any output, that indicates an error. Raise an
  108. # exception to make that clear.
  109. if len(out):
  110. raise ValueError(out[0])
  111. def set_i(self, ma):
  112. """Sets the current of the configuration buffer, in milliamperes"""
  113. out = self.send_command("set_i {}".format(ma))
  114. # If that command gave any output, that indicates an error. Raise an
  115. # exception to make that clear.
  116. if len(out):
  117. raise ValueError(out[0])
  118. def identify(self):
  119. """Blinks the LED quickly"""
  120. self.send_command("identify")
  121. @property
  122. def output(self):
  123. """The state of the Sink's output
  124. Raises KeyError if the ``output`` command is not available on the Sink.
  125. Raises ValueError if an invalid output is read.
  126. """
  127. value = self.send_command("output")
  128. if value[0] == b"enabled":
  129. return True
  130. elif value[0] == b"disabled":
  131. return False
  132. else:
  133. # If unexpected text is returned, raise an exception indicating a
  134. # firmware error
  135. raise ValueError("unknown output state")
  136. @output.setter
  137. def output(self, state):
  138. if state:
  139. self.send_command("output enable")
  140. else:
  141. self.send_command("output disable")
  142. def set_tmpcfg(self, sc):
  143. """Writes a SinkConfig object to the device's configuration buffer
  144. Note: the value of the status field is ignored; it will always be
  145. `SinkStatus.VALID`.
  146. """
  147. # Set flags
  148. self.clear_flags()
  149. if sc.flags & SinkFlags.GIVEBACK:
  150. self.toggle_giveback()
  151. # Set voltage
  152. self.set_v(sc.v)
  153. # Set current
  154. self.set_i(sc.i)
  155. @classmethod
  156. def get_devices(cls):
  157. """Get an iterable of PD Buddy Sink devices
  158. :returns: an iterable of `serial.tools.list_ports.ListPortInfo` objects
  159. """
  160. return serial.tools.list_ports.grep("{:04X}:{:04X}".format(cls.vid,
  161. cls.pid))
  162. class SinkConfig(namedtuple("SinkConfig", "status flags v i")):
  163. """Python representation of a PD Buddy Sink configuration object
  164. ``status`` should be a `SinkStatus` object. ``flags`` should be zero or
  165. more `SinkFlags` values. ``v`` is the voltage in millivolts, and ``i``
  166. is the current in milliamperes. `None` is also an acceptible value for
  167. any of the fields.
  168. """
  169. __slots__ = ()
  170. def __str__(self):
  171. """Print the SinkStatus in the manner of the configuration shell"""
  172. s = ""
  173. if self.status is not None:
  174. s += "status: "
  175. if self.status is SinkStatus.EMPTY:
  176. s += "empty"
  177. elif self.status is SinkStatus.VALID:
  178. s += "valid"
  179. elif self.status is SinkStatus.INVALID:
  180. s += "invalid"
  181. s += "\n"
  182. if self.flags is not None:
  183. s += "flags: "
  184. if self.flags is SinkFlags.NONE:
  185. s += "(none)"
  186. else:
  187. if self.flags & SinkFlags.GIVEBACK:
  188. s += "GiveBack"
  189. s += "\n"
  190. if self.v is not None:
  191. s += "v: {:.2f} V\n".format(self.v / 1000)
  192. if self.i is not None:
  193. s += "i: {:.2f} A\n".format(self.i / 1000)
  194. # Return all but the last character of s to remove the trailing newline
  195. if s:
  196. return s[:-1]
  197. else:
  198. return "No configuration"
  199. @classmethod
  200. def from_text(cls, text):
  201. """Creates a SinkConfig from text returned by Sink.send_command
  202. :param text: the text to load
  203. :type text: a list of bytes objects
  204. :returns: a new `SinkConfig` object.
  205. :raises: IndexError
  206. """
  207. # Assume the parameters will all be None
  208. status = None
  209. flags = None
  210. v = None
  211. i = None
  212. # Iterate over all lines of text
  213. for line in text:
  214. # If the configuration said invalid index, raise an IndexError
  215. if line.startswith(b"Invalid index"):
  216. raise IndexError("configuration index out of range")
  217. # If there is no configuration, return an empty SinkConfig
  218. elif line.startswith(b"No configuration"):
  219. return cls(None, None, None, None)
  220. # If this line is the status field
  221. elif line.startswith(b"status: "):
  222. line = line.split()[1:]
  223. if line[0] == b"empty":
  224. status = SinkStatus.EMPTY
  225. elif line[0] == b"valid":
  226. status = SinkStatus.VALID
  227. elif line[0] == b"invalid":
  228. status = SinkStatus.INVALID
  229. # If this line is the flags field
  230. elif line.startswith(b"flags: "):
  231. line = line.split()[1:]
  232. flags = SinkFlags.NONE
  233. for word in line:
  234. if word == b"(none)":
  235. # If there are no flags set, stop looking
  236. break
  237. elif word == b"GiveBack":
  238. flags |= SinkFlags.GIVEBACK
  239. # If this line is the v field
  240. elif line.startswith(b"v: "):
  241. word = line.split()[1]
  242. v = round(1000*float(word))
  243. # If this line is the i field
  244. elif line.startswith(b"i: "):
  245. word = line.split()[1]
  246. i = round(1000*float(word))
  247. # Create a new SinkConfig object with the values we just read
  248. return cls(status=status, flags=flags, v=v, i=i)
  249. class SinkStatus(enum.Enum):
  250. """Status field of a PD Buddy Sink configuration object"""
  251. EMPTY = 1
  252. VALID = 2
  253. INVALID = 3
  254. class SinkFlags(enum.Flag):
  255. """Flags field of a PD Buddy Sink configuration object"""
  256. NONE = 0
  257. GIVEBACK = enum.auto()