Python library for working with the PD Buddy Sink Serial Console Configuration Interface
Du kan inte välja fler än 25 ämnen Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.

__init__.py 20KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620
  1. """Python bindings for PD Buddy Sink configuration"""
  2. from collections import namedtuple
  3. try:
  4. # Try importing enum from the standard library
  5. import enum
  6. # Make sure Flag is available
  7. enum.Flag
  8. except (ImportError, NameError, AttributeError):
  9. # If something above failed, try aenum instead
  10. import aenum as enum
  11. import serial
  12. import serial.tools.list_ports
  13. class Sink:
  14. """Interface for configuring a PD Buddy Sink"""
  15. vid = 0x1209
  16. pid = 0x9DB5
  17. def __init__(self, sp):
  18. """Open a serial port to communicate with the PD Buddy Sink
  19. :param sp: the serial port of the device
  20. :type sp: str or `serial.tools.list_ports.ListPortInfo`
  21. """
  22. try:
  23. self._port = serial.Serial(sp, baudrate=115200)
  24. except ValueError:
  25. self._port = serial.Serial(sp.device, baudrate=115200)
  26. # Put communications in a known state, cancelling any partially-entered
  27. # command that may be sitting in the buffer.
  28. self.send_command("\x04", newline=False)
  29. def __enter__(self):
  30. return self
  31. def __exit__(self, exc_type, exc_value, traceback):
  32. self._port.close()
  33. def send_command(self, cmd, newline=True):
  34. """Send a command to the PD Buddy Sink, returning the result
  35. :param cmd: the text to send to the Sink
  36. :param newline: whether to append a ``\r\n`` to the command
  37. :type cmd: str
  38. :type newline: bool
  39. :returns: a list of zero or more bytes objects, each being one line
  40. printed as a response to the command.
  41. """
  42. # Build the command
  43. cmd = cmd.encode("utf-8")
  44. if newline:
  45. cmd += b"\r\n"
  46. # Send the command
  47. self._port.write(cmd)
  48. self._port.flush()
  49. # Read the result
  50. answer = b""
  51. while not answer.endswith(b"PDBS) "):
  52. answer += self._port.read(1)
  53. answer = answer.split(b"\r\n")
  54. # Remove the echoed command and prompt
  55. answer = answer[1:-1]
  56. # Raise an exception if the command wasn't recognized
  57. if len(answer) and answer[0] == cmd.strip().split()[0] + b" ?":
  58. raise KeyError("command not found")
  59. return answer
  60. def close(self):
  61. """Close the serial port"""
  62. self._port.close()
  63. def help(self):
  64. """Returns the help text from the PD Buddy Sink"""
  65. return self.send_command("help")
  66. def license(self):
  67. """Returns the license text from the PD Buddy Sink"""
  68. return self.send_command("license")
  69. def boot(self):
  70. """Runs the PD Buddy Sink's DFU bootloader and closes the serial port"""
  71. self._port.write(b"boot\r\n")
  72. self._port.flush()
  73. self.close()
  74. def erase(self):
  75. """Synchronously erases all stored configuration from flash"""
  76. self.send_command("erase")
  77. def write(self):
  78. """Synchronously writes the contents of the configuration buffer to flash"""
  79. self.send_command("write")
  80. def load(self):
  81. """Loads the current configuration from flash into the buffer
  82. :raises: KeyError
  83. """
  84. text = self.send_command("load")
  85. if len(text) > 0 and text[0].startswith(b"No configuration"):
  86. raise KeyError("no configuration")
  87. def get_cfg(self, index=None):
  88. """Reads configuration from flash
  89. :param index: optional index of configuration object in flash to read
  90. :returns: a `SinkConfig` object
  91. """
  92. if index is None:
  93. cfg = self.send_command("get_cfg")
  94. else:
  95. cfg = self.send_command("get_cfg {}".format(index))
  96. return SinkConfig.from_text(cfg)
  97. def get_tmpcfg(self):
  98. """Reads the contents of the configuration buffer
  99. :returns: a `SinkConfig` object
  100. """
  101. cfg = self.send_command("get_tmpcfg")
  102. return SinkConfig.from_text(cfg)
  103. def clear_flags(self):
  104. """Clears all the flags in the configuration buffer"""
  105. self.send_command("clear_flags")
  106. def toggle_giveback(self):
  107. """Toggles the GiveBack flag in the configuration buffer"""
  108. self.send_command("toggle_giveback")
  109. def set_v(self, mv):
  110. """Sets the voltage of the configuration buffer, in millivolts"""
  111. out = self.send_command("set_v {}".format(mv))
  112. # If that command gave any output, that indicates an error. Raise an
  113. # exception to make that clear.
  114. if len(out):
  115. raise ValueError(out[0])
  116. def set_i(self, ma):
  117. """Sets the current of the configuration buffer, in milliamperes"""
  118. out = self.send_command("set_i {}".format(ma))
  119. # If that command gave any output, that indicates an error. Raise an
  120. # exception to make that clear.
  121. if len(out):
  122. raise ValueError(out[0])
  123. def identify(self):
  124. """Blinks the LED quickly"""
  125. self.send_command("identify")
  126. @property
  127. def output(self):
  128. """The state of the Sink's output
  129. Raises KeyError if the ``output`` command is not available on the Sink.
  130. Raises ValueError if an invalid output is read.
  131. """
  132. value = self.send_command("output")
  133. if value[0] == b"enabled":
  134. return True
  135. elif value[0] == b"disabled":
  136. return False
  137. else:
  138. # If unexpected text is returned, raise an exception indicating a
  139. # firmware error
  140. raise ValueError("unknown output state")
  141. @output.setter
  142. def output(self, state):
  143. if state:
  144. self.send_command("output enable")
  145. else:
  146. self.send_command("output disable")
  147. def get_source_cap(self):
  148. """Gets the most recent Source_Capabilities read by the Sink"""
  149. return read_pdo_list(self.send_command("get_source_cap"))
  150. def set_tmpcfg(self, sc):
  151. """Writes a SinkConfig object to the device's configuration buffer
  152. Note: the value of the status field is ignored; it will always be
  153. `SinkStatus.VALID`.
  154. """
  155. # Set flags
  156. self.clear_flags()
  157. if sc.flags & SinkFlags.GIVEBACK:
  158. self.toggle_giveback()
  159. # Set voltage
  160. self.set_v(sc.v)
  161. # Set current
  162. self.set_i(sc.i)
  163. @classmethod
  164. def get_devices(cls):
  165. """Get an iterable of PD Buddy Sink devices
  166. :returns: an iterable of `serial.tools.list_ports.ListPortInfo` objects
  167. """
  168. return serial.tools.list_ports.grep("{:04X}:{:04X}".format(cls.vid,
  169. cls.pid))
  170. class SinkConfig(namedtuple("SinkConfig", "status flags v i")):
  171. """Python representation of a PD Buddy Sink configuration object
  172. ``status`` should be a `SinkStatus` object. ``flags`` should be zero or
  173. more `SinkFlags` values. ``v`` is the voltage in millivolts, and ``i``
  174. is the current in milliamperes. `None` is also an acceptible value for
  175. any of the fields.
  176. """
  177. __slots__ = ()
  178. def __str__(self):
  179. """Print the SinkStatus in the manner of the configuration shell"""
  180. s = ""
  181. if self.status is not None:
  182. s += "status: "
  183. if self.status is SinkStatus.EMPTY:
  184. s += "empty"
  185. elif self.status is SinkStatus.VALID:
  186. s += "valid"
  187. elif self.status is SinkStatus.INVALID:
  188. s += "invalid"
  189. s += "\n"
  190. if self.flags is not None:
  191. s += "flags: "
  192. if self.flags is SinkFlags.NONE:
  193. s += "(none)"
  194. else:
  195. if self.flags & SinkFlags.GIVEBACK:
  196. s += "GiveBack"
  197. s += "\n"
  198. if self.v is not None:
  199. s += "v: {:.2f} V\n".format(self.v / 1000.0)
  200. if self.i is not None:
  201. s += "i: {:.2f} A\n".format(self.i / 1000.0)
  202. # Return all but the last character of s to remove the trailing newline
  203. if s:
  204. return s[:-1]
  205. else:
  206. return "No configuration"
  207. @classmethod
  208. def from_text(cls, text):
  209. """Creates a SinkConfig from text returned by Sink.send_command
  210. :param text: the text to load
  211. :type text: a list of bytes objects
  212. :returns: a new `SinkConfig` object.
  213. :raises: IndexError
  214. """
  215. # Assume the parameters will all be None
  216. status = None
  217. flags = None
  218. v = None
  219. i = None
  220. # Iterate over all lines of text
  221. for line in text:
  222. # If the configuration said invalid index, raise an IndexError
  223. if line.startswith(b"Invalid index"):
  224. raise IndexError("configuration index out of range")
  225. # If there is no configuration, return an empty SinkConfig
  226. elif line.startswith(b"No configuration"):
  227. return cls(None, None, None, None)
  228. # If this line is the status field
  229. elif line.startswith(b"status: "):
  230. line = line.split()[1:]
  231. if line[0] == b"empty":
  232. status = SinkStatus.EMPTY
  233. elif line[0] == b"valid":
  234. status = SinkStatus.VALID
  235. elif line[0] == b"invalid":
  236. status = SinkStatus.INVALID
  237. # If this line is the flags field
  238. elif line.startswith(b"flags: "):
  239. line = line.split()[1:]
  240. flags = SinkFlags.NONE
  241. for word in line:
  242. if word == b"(none)":
  243. # If there are no flags set, stop looking
  244. break
  245. elif word == b"GiveBack":
  246. flags |= SinkFlags.GIVEBACK
  247. # If this line is the v field
  248. elif line.startswith(b"v: "):
  249. word = line.split()[1]
  250. v = round(1000*float(word))
  251. # If this line is the i field
  252. elif line.startswith(b"i: "):
  253. word = line.split()[1]
  254. i = round(1000*float(word))
  255. # Create a new SinkConfig object with the values we just read
  256. return cls(status=status, flags=flags, v=v, i=i)
  257. class SinkStatus(enum.Enum):
  258. """Status field of a PD Buddy Sink configuration object"""
  259. EMPTY = 1
  260. VALID = 2
  261. INVALID = 3
  262. class SinkFlags(enum.Flag):
  263. """Flags field of a PD Buddy Sink configuration object"""
  264. NONE = 0
  265. GIVEBACK = enum.auto()
  266. class UnknownPDO(namedtuple("UnknownPDO", "value")):
  267. """A PDO of an unknown type
  268. ``value`` should be a 32-bit integer representing the PDO exactly as
  269. transmitted.
  270. """
  271. __slots__ = ()
  272. pdo_type = "unknown"
  273. def __str__(self):
  274. """Print the UnknownPDO in the manner of the configuration shell"""
  275. return "{:08X}".format(self.value)
  276. class SrcFixedPDO(namedtuple("SrcFixedPDO", "dual_role_pwr usb_suspend "
  277. "unconstrained_pwr usb_comms dual_role_data unchunked_ext_msg peak_i "
  278. "v i")):
  279. """A Source Fixed PDO
  280. ``dual_role_pwr``, ``usb_suspend``, ``unconstrained_pwr``,
  281. ``usb_comms``, ``dual_role_data``, and ``unchunked_ext_msg`` should be
  282. booleans. ``peak_i`` should be an integer in the range [0, 3]. ``v``
  283. is the voltage in millivolts, and ``i`` is the maximum current in
  284. milliamperes.
  285. """
  286. __slots__ = ()
  287. pdo_type = "fixed"
  288. def __str__(self):
  289. """Print the SrcFixedPDO in the manner of the configuration shell"""
  290. s = self.pdo_type + "\n"
  291. if self.dual_role_pwr:
  292. s += "\tdual_role_pwr: 1\n"
  293. if self.usb_suspend:
  294. s += "\tusb_suspend: 1\n"
  295. if self.unconstrained_pwr:
  296. s += "\tunconstrained_pwr: 1\n"
  297. if self.usb_comms:
  298. s += "\tusb_comms: 1\n"
  299. if self.dual_role_data:
  300. s += "\tdual_role_data: 1\n"
  301. if self.unchunked_ext_msg:
  302. s += "\tunchunked_ext_msg: 1\n"
  303. if self.peak_i:
  304. s += "\tpeak_i: {}\n".format(self.peak_i)
  305. s += "\tv: {:.2f} V\n".format(self.v / 1000.0)
  306. s += "\ti: {:.2f} A".format(self.i / 1000.0)
  307. return s
  308. class TypeCVirtualPDO(namedtuple("TypeCVirtualPDO", "i")):
  309. """A Type-C Current Virtual PDO
  310. ``i`` is the advertised current in milliamperes.
  311. """
  312. __slots__ = ()
  313. pdo_type = "typec_virtual"
  314. def __str__(self):
  315. """Print the TypeCVirtualPDO in the manner of the configuration shell"""
  316. s = self.pdo_type + "\n"
  317. s += "\ti: {:.2f} A".format(self.i / 1000.0)
  318. return s
  319. def read_pdo(text):
  320. """Create a PDO object from partial text returned by Sink.send_command"""
  321. # First, determine the PDO type
  322. pdo_type = text[0].split(b":")[-1].strip().decode("utf-8")
  323. if pdo_type == SrcFixedPDO.pdo_type:
  324. # Set default values (n.b. there are none for v and i)
  325. dual_role_pwr = False
  326. usb_suspend = False
  327. unconstrained_pwr = False
  328. usb_comms = False
  329. dual_role_data = False
  330. unchunked_ext_msg = False
  331. peak_i = 0
  332. # Load a SrcFixedPDO
  333. for line in text[1:]:
  334. fields = line.split(b":")
  335. fields[0] = fields[0].strip()
  336. fields[1] = fields[1].strip()
  337. if fields[0] == b"dual_role_pwr":
  338. dual_role_pwr = (fields[1] == b"1")
  339. elif fields[0] == b"usb_suspend":
  340. usb_suspend = (fields[1] == b"1")
  341. elif fields[0] == b"unconstrained_pwr":
  342. unconstrained_pwr = (fields[1] == b"1")
  343. elif fields[0] == b"usb_comms":
  344. usb_comms = (fields[1] == b"1")
  345. elif fields[0] == b"dual_role_data":
  346. dual_role_data = (fields[1] == b"1")
  347. elif fields[0] == b"unchunked_ext_msg":
  348. unchunked_ext_msg = (fields[1] == b"1")
  349. elif fields[0] == b"peak_i":
  350. peak_i = int(fields[1])
  351. elif fields[0] == b"v":
  352. v = round(1000*float(fields[1].split()[0]))
  353. elif fields[0] == b"i":
  354. i = round(1000*float(fields[1].split()[0]))
  355. # Make the SrcFixedPDO
  356. return SrcFixedPDO(
  357. dual_role_pwr=dual_role_pwr,
  358. usb_suspend=usb_suspend,
  359. unconstrained_pwr=unconstrained_pwr,
  360. usb_comms=usb_comms,
  361. dual_role_data=dual_role_data,
  362. unchunked_ext_msg=unchunked_ext_msg,
  363. peak_i=peak_i,
  364. v=v,
  365. i=i)
  366. elif pdo_type == TypeCVirtualPDO.pdo_type:
  367. # Load a TypeCVirtualPDO
  368. for line in text[1:]:
  369. fields = line.split(b":")
  370. fields[0] = fields[0].strip()
  371. fields[1] = fields[1].strip()
  372. if fields[0] == b"i":
  373. i = round(1000*float(fields[1].split()[0]))
  374. # Make the TypeCVirtualPDO
  375. return TypeCVirtualPDO(i=i)
  376. elif pdo_type == "No Source_Capabilities":
  377. return None
  378. else:
  379. # Make an UnknownPDO
  380. return UnknownPDO(value=int(pdo_type, 16))
  381. def read_pdo_list(text):
  382. """Create a list of PDOs from text returned by Sink.send_command"""
  383. # Get the lines where PDOs start
  384. pdo_start_list = []
  385. for index, line in enumerate(text):
  386. if not line.startswith(b"\t"):
  387. pdo_start_list.append(index)
  388. # Append the number of lines so the last slice will work right
  389. pdo_start_list.append(len(text))
  390. # Read the PDOs
  391. pdo_list = []
  392. for start, end in zip(pdo_start_list[:-1], pdo_start_list[1:]):
  393. pdo = read_pdo(text[start:end])
  394. if pdo is not None:
  395. pdo_list.append(pdo)
  396. return pdo_list
  397. def calculate_pdp(pdo_list):
  398. """Calculate the PDP in watts of a list of PDOs
  399. The result is only guaranteed to be correct if the power supply follows
  400. the USB Power Delivery standard. Since quite a few power supplies
  401. unfortunately do not, this can really only be considered an estimate.
  402. """
  403. max_power = 0
  404. # The Source Power Rules make it so the PDP can be determined by the
  405. # highest power available from any fixed supply PDO.
  406. for pdo in pdo_list:
  407. if pdo.pdo_type == "fixed":
  408. max_power = max(max_power, pdo.v / 1000.0 * pdo.i / 1000.0)
  409. elif pdo.pdo_type == "typec_virtual":
  410. max_power = max(max_power, 5.0 * pdo.i / 1000.0)
  411. return max_power
  412. def follows_power_rules(pdo_list):
  413. """Test whether a list of PDOs follows the Power Rules for PD 2.0
  414. This function is a false-biased approximation; that is, when it returns
  415. False it is definitely correct, but when it returns True it might be
  416. incorrect.
  417. """
  418. # First, estimate the PDP assuming the rules are being followed
  419. pdp = calculate_pdp(pdo_list)
  420. # If there's a typec_virtual PDO, there's no Power Delivery so the Power
  421. # Rules cannot be violated. In truth they're not really being followed
  422. # either since they only apply to Power Delivery, but returning True here
  423. # seems like the safer option.
  424. if pdo_list and pdo_list[0].pdo_type == "typec_virtual":
  425. return True
  426. # Make sure nothing exceeds the PDP
  427. for pdo in pdo_list:
  428. if pdo.pdo_type == "fixed":
  429. if pdp < pdo.v / 1000.0 * pdo.i / 1000.0:
  430. return False
  431. # TODO: in the future, there will be more types of PDO checked here
  432. # Check that the fixed supply PDOs look right
  433. seen_5v = False
  434. seen_9v = False
  435. seen_15v = False
  436. seen_20v = False
  437. seen_normative_voltages = False
  438. if pdp == 0:
  439. # No power is fine
  440. seen_normative_voltages = True
  441. elif pdp <= 15:
  442. # Below 15 W, make sure the PDP is available at 5 V.
  443. for pdo in pdo_list:
  444. if pdo.pdo_type == "fixed" and pdo.v == 5000:
  445. seen_5v = True
  446. if pdo.v / 1000.0 * pdo.i / 1000.0 != pdp:
  447. return False
  448. seen_normative_voltages = seen_5v
  449. elif pdp <= 27:
  450. # Between 15 and 27 W, make sure at least 3 A is available at 5 V and
  451. # the PDP is available at 9 V.
  452. for pdo in pdo_list:
  453. if pdo.pdo_type == "fixed" and pdo.v == 5000:
  454. seen_5v = True
  455. if pdo.i < 3000.0:
  456. return False
  457. elif pdo.pdo_type == "fixed" and pdo.v == 9000:
  458. seen_9v = True
  459. if pdo.v / 1000.0 * pdo.i / 1000.0 != pdp:
  460. return False
  461. seen_normative_voltages = seen_5v and seen_9v
  462. elif pdp <= 45:
  463. # Between 27 and 45 W, make sure at least 3 A is available at 5 and
  464. # 9 V, and the PDP is available at 15 V.
  465. for pdo in pdo_list:
  466. if pdo.pdo_type == "fixed" and pdo.v == 5000:
  467. seen_5v = True
  468. if pdo.i < 3000.0:
  469. return False
  470. elif pdo.pdo_type == "fixed" and pdo.v == 9000:
  471. seen_9v = True
  472. if pdo.i < 3000.0:
  473. return False
  474. elif pdo.pdo_type == "fixed" and pdo.v == 15000:
  475. seen_15v = True
  476. if pdo.v / 1000.0 * pdo.i / 1000.0 != pdp:
  477. return False
  478. seen_normative_voltages = seen_5v and seen_9v and seen_15v
  479. else:
  480. # Above 45 W, make sure at least 3 A is available at 5, 9, and 15 V,
  481. # and the PDP is available at 20 V.
  482. for pdo in pdo_list:
  483. if pdo.pdo_type == "fixed" and pdo.v == 5000:
  484. seen_5v = True
  485. if pdo.i < 3000.0:
  486. return False
  487. elif pdo.pdo_type == "fixed" and pdo.v == 9000:
  488. seen_9v = True
  489. if pdo.i < 3000.0:
  490. return False
  491. elif pdo.pdo_type == "fixed" and pdo.v == 15000:
  492. seen_15v = True
  493. if pdo.i < 3000.0:
  494. return False
  495. elif pdo.pdo_type == "fixed" and pdo.v == 20000:
  496. seen_20v = True
  497. if pdo.v / 1000.0 * pdo.i / 1000.0 != pdp:
  498. return False
  499. seen_normative_voltages = seen_5v and seen_9v and seen_15v and seen_20v
  500. if not seen_normative_voltages:
  501. return False
  502. # TODO: there are several things this currently doesn't test, such as
  503. # variable and battery PDOs.
  504. return True