123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688 |
- """Python bindings for PD Buddy Sink configuration"""
-
- from collections import namedtuple
-
- try:
- # Try importing enum from the standard library
- import enum
- # Make sure Flag is available
- enum.Flag
- except (ImportError, NameError, AttributeError):
- # If something above failed, try aenum instead
- import aenum as enum
-
- import serial
- import serial.tools.list_ports
-
-
- class Sink:
- """Interface for configuring a PD Buddy Sink"""
- vid = 0x1209
- pid = 0x9DB5
-
- def __init__(self, sp):
- """Open a serial port to communicate with the PD Buddy Sink
-
- :param sp: the serial port of the device
- :type sp: str or `serial.tools.list_ports.ListPortInfo`
- """
- try:
- self._port = serial.Serial(sp, baudrate=115200)
- except ValueError:
- self._port = serial.Serial(sp.device, baudrate=115200)
-
- # Put communications in a known state, cancelling any partially-entered
- # command that may be sitting in the buffer.
- self.send_command("\x04", newline=False)
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_value, traceback):
- self._port.close()
-
- def send_command(self, cmd, newline=True):
- """Send a command to the PD Buddy Sink, returning the result
-
- :param cmd: the text to send to the Sink
- :param newline: whether to append a ``\r\n`` to the command
- :type cmd: str
- :type newline: bool
-
- :returns: a list of zero or more bytes objects, each being one line
- printed as a response to the command.
- """
- # Build the command
- cmd = cmd.encode("utf-8")
- if newline:
- cmd += b"\r\n"
-
- # Send the command
- self._port.write(cmd)
- self._port.flush()
-
- # Read the result
- answer = b""
- while not answer.endswith(b"PDBS) "):
- answer += self._port.read(1)
- answer = answer.split(b"\r\n")
-
- # Remove the echoed command and prompt
- answer = answer[1:-1]
-
- # Raise an exception if the command wasn't recognized
- if len(answer) and answer[0] == cmd.strip().split()[0] + b" ?":
- raise KeyError("command not found")
-
- return answer
-
- def close(self):
- """Close the serial port"""
- self._port.close()
-
- def help(self):
- """Returns the help text from the PD Buddy Sink"""
- return self.send_command("help")
-
- def license(self):
- """Returns the license text from the PD Buddy Sink"""
- return self.send_command("license")
-
- def boot(self):
- """Runs the PD Buddy Sink's DFU bootloader and closes the serial port"""
- self._port.write(b"boot\r\n")
- self._port.flush()
- self.close()
-
- def erase(self):
- """Synchronously erases all stored configuration from flash"""
- self.send_command("erase")
-
- def write(self):
- """Synchronously writes the contents of the configuration buffer to flash"""
- self.send_command("write")
-
- def load(self):
- """Loads the current configuration from flash into the buffer
-
- :raises: KeyError
- """
- text = self.send_command("load")
- if len(text) > 0 and text[0].startswith(b"No configuration"):
- raise KeyError("no configuration")
-
- def get_cfg(self, index=None):
- """Reads configuration from flash
-
- :param index: optional index of configuration object in flash to read
-
- :returns: a `SinkConfig` object
- """
- if index is None:
- cfg = self.send_command("get_cfg")
- else:
- cfg = self.send_command("get_cfg {}".format(index))
-
- return SinkConfig.from_text(cfg)
-
- def get_tmpcfg(self):
- """Reads the contents of the configuration buffer
-
- :returns: a `SinkConfig` object
- """
- cfg = self.send_command("get_tmpcfg")
-
- return SinkConfig.from_text(cfg)
-
- def clear_flags(self):
- """Clears all the flags in the configuration buffer"""
- self.send_command("clear_flags")
-
- def toggle_giveback(self):
- """Toggles the GiveBack flag in the configuration buffer"""
- self.send_command("toggle_giveback")
-
- def toggle_hv_preferred(self):
- """Toggles the HV_Preferred flag in the configuration buffer"""
- self.send_command("toggle_hv_preferred")
-
- def set_v(self, mv):
- """Sets the voltage of the configuration buffer, in millivolts"""
- out = self.send_command("set_v {}".format(mv))
- # If that command gave any output, that indicates an error. Raise an
- # exception to make that clear.
- if len(out):
- raise ValueError(out[0])
-
- def set_vrange(self, vmin, vmax):
- """Sets the min and max voltage of the configuration buffer, in millivolts"""
- # First, make sure we're sending numbers to the Sink in all valid cases
- if vmin is None and vmax is None:
- vmin = 0
- vmax = 0
- out = self.send_command("set_vrange {} {}".format(vmin, vmax))
- # If that command gave any output, that indicates an error. Raise an
- # exception to make that clear.
- if len(out):
- raise ValueError(out[0])
-
- def set_i(self, ma):
- """Sets the current of the configuration buffer, in milliamperes"""
- out = self.send_command("set_i {}".format(ma))
- # If that command gave any output, that indicates an error. Raise an
- # exception to make that clear.
- if len(out):
- raise ValueError(out[0])
-
- def identify(self):
- """Blinks the LED quickly"""
- self.send_command("identify")
-
- @property
- def output(self):
- """The state of the Sink's output
-
- Raises KeyError if the ``output`` command is not available on the Sink.
- Raises ValueError if an invalid output is read.
- """
- value = self.send_command("output")
- if value[0] == b"enabled":
- return True
- elif value[0] == b"disabled":
- return False
- else:
- # If unexpected text is returned, raise an exception indicating a
- # firmware error
- raise ValueError("unknown output state")
-
- @output.setter
- def output(self, state):
- if state:
- self.send_command("output enable")
- else:
- self.send_command("output disable")
-
- def get_source_cap(self):
- """Gets the most recent Source_Capabilities read by the Sink"""
- return read_pdo_list(self.send_command("get_source_cap"))
-
- def set_tmpcfg(self, sc):
- """Writes a SinkConfig object to the device's configuration buffer
-
- Note: the value of the status field is ignored; it will always be
- `SinkStatus.VALID`.
- """
- # Set flags
- self.clear_flags()
- if sc.flags & SinkFlags.GIVEBACK:
- self.toggle_giveback()
- if sc.flags & SinkFlags.HV_PREFERRED:
- self.toggle_hv_preferred()
-
- # Set voltage
- self.set_v(sc.v)
-
- # Set voltage range
- self.set_vrange(sc.vmin, sc.vmax)
-
- # Set current
- self.set_i(sc.i)
-
- @classmethod
- def get_devices(cls):
- """Get an iterable of PD Buddy Sink devices
-
- :returns: an iterable of `serial.tools.list_ports.ListPortInfo` objects
- """
- return serial.tools.list_ports.grep("{:04X}:{:04X}".format(cls.vid,
- cls.pid))
-
-
- class SinkConfig(namedtuple("SinkConfig", "status flags v vmin vmax i idim")):
- """Python representation of a PD Buddy Sink configuration object
-
- ``status`` should be a `SinkStatus` object. ``flags`` should be zero or
- more `SinkFlags` values. ``v``, ``vmin``, and ``vmax`` are voltages in
- millivolts, ``i`` is the value used to set current in the appropriate
- milli- SI unit, and ``idim`` is the dimension of ``i``. `None` is also
- an acceptible value for any of the fields.
- """
- __slots__ = ()
-
- def __str__(self):
- """Print the SinkStatus in the manner of the configuration shell"""
- s = ""
-
- if self.status is not None:
- s += "status: "
- if self.status is SinkStatus.EMPTY:
- s += "empty"
- elif self.status is SinkStatus.VALID:
- s += "valid"
- elif self.status is SinkStatus.INVALID:
- s += "invalid"
- s += "\n"
-
- if self.flags is not None:
- s += "flags: "
- if self.flags is SinkFlags.NONE:
- s += "(none)"
- else:
- if self.flags & SinkFlags.GIVEBACK:
- s += "GiveBack"
- if self.flags & SinkFlags.HV_PREFERRED:
- s += "HV_Preferred"
- s += "\n"
-
- if self.v is not None:
- s += "v: {:.3f} V\n".format(self.v / 1000.0)
-
- if self.vmin is not None:
- s += "vmin: {:.3f} V\n".format(self.vmin / 1000.0)
-
- if self.vmax is not None:
- s += "vmax: {:.3f} V\n".format(self.vmax / 1000.0)
-
- if self.i is not None:
- if self.idim is SinkDimension.CURRENT:
- s += "i: {:.2f} A\n".format(self.i / 1000.0)
- if self.idim is SinkDimension.POWER:
- s += "p: {:.2f} W\n".format(self.i / 1000.0)
- if self.idim is SinkDimension.RESISTANCE:
- s += "r: {:.2f} \u03A9\n".format(self.i / 1000.0)
-
- # Return all but the last character of s to remove the trailing newline
- if s:
- return s[:-1]
- else:
- return "No configuration"
-
- @classmethod
- def from_text(cls, text):
- """Creates a SinkConfig from text returned by Sink.send_command
-
- :param text: the text to load
- :type text: a list of bytes objects
- :returns: a new `SinkConfig` object.
-
- :raises: IndexError
- """
- # Assume the parameters will all be None
- status = None
- flags = None
- v = None
- vmin = None
- vmax = None
- i = None
- idim = None
-
- # Iterate over all lines of text
- for line in text:
- # If the configuration said invalid index, raise an IndexError
- if line.startswith(b"Invalid index"):
- raise IndexError("configuration index out of range")
- # If there is no configuration, return an empty SinkConfig
- elif line.startswith(b"No configuration"):
- return cls(None, None, None, None, None, None, None)
- # If this line is the status field
- elif line.startswith(b"status: "):
- line = line.split()[1:]
- if line[0] == b"empty":
- status = SinkStatus.EMPTY
- elif line[0] == b"valid":
- status = SinkStatus.VALID
- elif line[0] == b"invalid":
- status = SinkStatus.INVALID
- # If this line is the flags field
- elif line.startswith(b"flags: "):
- line = line.split()[1:]
- flags = SinkFlags.NONE
- for word in line:
- if word == b"(none)":
- # If there are no flags set, stop looking
- break
- elif word == b"GiveBack":
- flags |= SinkFlags.GIVEBACK
- elif word == b"HV_Preferred":
- flags |= SinkFlags.HV_PREFERRED
- # If this line is the v field
- elif line.startswith(b"v: "):
- word = line.split()[1]
- v = round(1000*float(word))
- # If this line is the vmin field
- elif line.startswith(b"vmin: "):
- word = line.split()[1]
- vmin = round(1000*float(word))
- # If this line is the vmax field
- elif line.startswith(b"vmax: "):
- word = line.split()[1]
- vmax = round(1000*float(word))
- # If this line is the i field
- elif line.startswith(b"i: "):
- word = line.split()[1]
- i = round(1000*float(word))
- idim = SinkDimension.CURRENT
- # If this line is the p field
- elif line.startswith(b"p: "):
- word = line.split()[1]
- i = round(1000*float(word))
- idim = SinkDimension.POWER
- # If this line is the r field
- elif line.startswith(b"r: "):
- word = line.split()[1]
- i = round(1000*float(word))
- idim = SinkDimension.RESISTANCE
-
- # Create a new SinkConfig object with the values we just read
- return cls(status=status, flags=flags, v=v, vmin=vmin, vmax=vmax, i=i,
- idim=idim)
-
-
- class SinkStatus(enum.Enum):
- """Status field of a PD Buddy Sink configuration object"""
- EMPTY = 1
- VALID = 2
- INVALID = 3
-
-
- class SinkDimension(enum.Enum):
- """Dimension of the value used to set the current requested"""
- CURRENT = 1
- POWER = 2
- RESISTANCE = 3
-
-
- class SinkFlags(enum.Flag):
- """Flags field of a PD Buddy Sink configuration object"""
- NONE = 0
- GIVEBACK = enum.auto()
- HV_PREFERRED = enum.auto()
-
-
- class UnknownPDO(namedtuple("UnknownPDO", "value")):
- """A PDO of an unknown type
-
- ``value`` should be a 32-bit integer representing the PDO exactly as
- transmitted.
- """
- __slots__ = ()
-
- pdo_type = "unknown"
-
- def __str__(self):
- """Print the UnknownPDO in the manner of the configuration shell"""
- return "{:08X}".format(self.value)
-
-
- class SrcFixedPDO(namedtuple("SrcFixedPDO", "dual_role_pwr usb_suspend "
- "unconstrained_pwr usb_comms dual_role_data unchunked_ext_msg peak_i "
- "v i")):
- """A Source Fixed PDO
-
- ``dual_role_pwr``, ``usb_suspend``, ``unconstrained_pwr``,
- ``usb_comms``, ``dual_role_data``, and ``unchunked_ext_msg`` should be
- booleans. ``peak_i`` should be an integer in the range [0, 3]. ``v``
- is the voltage in millivolts, and ``i`` is the maximum current in
- milliamperes.
- """
- __slots__ = ()
-
- pdo_type = "fixed"
-
- def __str__(self):
- """Print the SrcFixedPDO in the manner of the configuration shell"""
- s = self.pdo_type + "\n"
-
- if self.dual_role_pwr:
- s += "\tdual_role_pwr: 1\n"
-
- if self.usb_suspend:
- s += "\tusb_suspend: 1\n"
-
- if self.unconstrained_pwr:
- s += "\tunconstrained_pwr: 1\n"
-
- if self.usb_comms:
- s += "\tusb_comms: 1\n"
-
- if self.dual_role_data:
- s += "\tdual_role_data: 1\n"
-
- if self.unchunked_ext_msg:
- s += "\tunchunked_ext_msg: 1\n"
-
- if self.peak_i:
- s += "\tpeak_i: {}\n".format(self.peak_i)
-
- s += "\tv: {:.2f} V\n".format(self.v / 1000.0)
- s += "\ti: {:.2f} A".format(self.i / 1000.0)
-
- return s
-
-
- class TypeCVirtualPDO(namedtuple("TypeCVirtualPDO", "i")):
- """A Type-C Current Virtual PDO
-
- ``i`` is the advertised current in milliamperes.
- """
- __slots__ = ()
-
- pdo_type = "typec_virtual"
-
- def __str__(self):
- """Print the TypeCVirtualPDO in the manner of the configuration shell"""
- s = self.pdo_type + "\n"
-
- s += "\ti: {:.2f} A".format(self.i / 1000.0)
-
- return s
-
-
- def read_pdo(text):
- """Create a PDO object from partial text returned by Sink.send_command"""
- # First, determine the PDO type
- pdo_type = text[0].split(b":")[-1].strip().decode("utf-8")
-
- if pdo_type == SrcFixedPDO.pdo_type:
- # Set default values (n.b. there are none for v and i)
- dual_role_pwr = False
- usb_suspend = False
- unconstrained_pwr = False
- usb_comms = False
- dual_role_data = False
- unchunked_ext_msg = False
- peak_i = 0
-
- # Load a SrcFixedPDO
- for line in text[1:]:
- fields = line.split(b":")
- fields[0] = fields[0].strip()
- fields[1] = fields[1].strip()
- if fields[0] == b"dual_role_pwr":
- dual_role_pwr = (fields[1] == b"1")
- elif fields[0] == b"usb_suspend":
- usb_suspend = (fields[1] == b"1")
- elif fields[0] == b"unconstrained_pwr":
- unconstrained_pwr = (fields[1] == b"1")
- elif fields[0] == b"usb_comms":
- usb_comms = (fields[1] == b"1")
- elif fields[0] == b"dual_role_data":
- dual_role_data = (fields[1] == b"1")
- elif fields[0] == b"unchunked_ext_msg":
- unchunked_ext_msg = (fields[1] == b"1")
- elif fields[0] == b"peak_i":
- peak_i = int(fields[1])
- elif fields[0] == b"v":
- v = round(1000*float(fields[1].split()[0]))
- elif fields[0] == b"i":
- i = round(1000*float(fields[1].split()[0]))
-
- # Make the SrcFixedPDO
- return SrcFixedPDO(
- dual_role_pwr=dual_role_pwr,
- usb_suspend=usb_suspend,
- unconstrained_pwr=unconstrained_pwr,
- usb_comms=usb_comms,
- dual_role_data=dual_role_data,
- unchunked_ext_msg=unchunked_ext_msg,
- peak_i=peak_i,
- v=v,
- i=i)
- elif pdo_type == TypeCVirtualPDO.pdo_type:
- # Load a TypeCVirtualPDO
- for line in text[1:]:
- fields = line.split(b":")
- fields[0] = fields[0].strip()
- fields[1] = fields[1].strip()
- if fields[0] == b"i":
- i = round(1000*float(fields[1].split()[0]))
-
- # Make the TypeCVirtualPDO
- return TypeCVirtualPDO(i=i)
- elif pdo_type == "No Source_Capabilities":
- return None
- else:
- # Make an UnknownPDO
- return UnknownPDO(value=int(pdo_type, 16))
-
-
- def read_pdo_list(text):
- """Create a list of PDOs from text returned by Sink.send_command"""
- # Get the lines where PDOs start
- pdo_start_list = []
- for index, line in enumerate(text):
- if not line.startswith(b"\t"):
- pdo_start_list.append(index)
- # Append the number of lines so the last slice will work right
- pdo_start_list.append(len(text))
-
- # Read the PDOs
- pdo_list = []
- for start, end in zip(pdo_start_list[:-1], pdo_start_list[1:]):
- pdo = read_pdo(text[start:end])
- if pdo is not None:
- pdo_list.append(pdo)
-
- return pdo_list
-
-
- def calculate_pdp(pdo_list):
- """Calculate the PDP in watts of a list of PDOs
-
- The result is only guaranteed to be correct if the power supply follows
- the USB Power Delivery standard. Since quite a few power supplies
- unfortunately do not, this can really only be considered an estimate.
- """
- max_power = 0
-
- # The Source Power Rules make it so the PDP can be determined by the
- # highest power available from any fixed supply PDO.
- for pdo in pdo_list:
- if pdo.pdo_type == "fixed":
- max_power = max(max_power, pdo.v / 1000.0 * pdo.i / 1000.0)
- elif pdo.pdo_type == "typec_virtual":
- max_power = max(max_power, 5.0 * pdo.i / 1000.0)
-
- return max_power
-
-
- def follows_power_rules(pdo_list):
- """Test whether a list of PDOs follows the Power Rules for PD 2.0
-
- This function is a false-biased approximation; that is, when it returns
- False it is definitely correct, but when it returns True it might be
- incorrect.
- """
- # First, estimate the PDP assuming the rules are being followed
- pdp = calculate_pdp(pdo_list)
-
- # If there's a typec_virtual PDO, there's no Power Delivery so the Power
- # Rules cannot be violated. In truth they're not really being followed
- # either since they only apply to Power Delivery, but returning True here
- # seems like the safer option.
- if pdo_list and pdo_list[0].pdo_type == "typec_virtual":
- return True
-
- # Make sure nothing exceeds the PDP
- for pdo in pdo_list:
- if pdo.pdo_type == "fixed":
- if pdp < pdo.v / 1000.0 * pdo.i / 1000.0:
- return False
- # TODO: in the future, there will be more types of PDO checked here
-
- # Check that the fixed supply PDOs look right
- seen_5v = False
- seen_9v = False
- seen_15v = False
- seen_20v = False
- seen_normative_voltages = False
- if pdp == 0:
- # No power is fine
- seen_normative_voltages = True
- elif pdp <= 15:
- # Below 15 W, make sure the PDP is available at 5 V.
- for pdo in pdo_list:
- if pdo.pdo_type == "fixed" and pdo.v == 5000:
- seen_5v = True
- if pdo.v / 1000.0 * pdo.i / 1000.0 != pdp:
- return False
- seen_normative_voltages = seen_5v
- elif pdp <= 27:
- # Between 15 and 27 W, make sure at least 3 A is available at 5 V and
- # the PDP is available at 9 V.
- for pdo in pdo_list:
- if pdo.pdo_type == "fixed" and pdo.v == 5000:
- seen_5v = True
- if pdo.i < 3000.0:
- return False
- elif pdo.pdo_type == "fixed" and pdo.v == 9000:
- seen_9v = True
- if pdo.v / 1000.0 * pdo.i / 1000.0 != pdp:
- return False
- seen_normative_voltages = seen_5v and seen_9v
- elif pdp <= 45:
- # Between 27 and 45 W, make sure at least 3 A is available at 5 and
- # 9 V, and the PDP is available at 15 V.
- for pdo in pdo_list:
- if pdo.pdo_type == "fixed" and pdo.v == 5000:
- seen_5v = True
- if pdo.i < 3000.0:
- return False
- elif pdo.pdo_type == "fixed" and pdo.v == 9000:
- seen_9v = True
- if pdo.i < 3000.0:
- return False
- elif pdo.pdo_type == "fixed" and pdo.v == 15000:
- seen_15v = True
- if pdo.v / 1000.0 * pdo.i / 1000.0 != pdp:
- return False
- seen_normative_voltages = seen_5v and seen_9v and seen_15v
- else:
- # Above 45 W, make sure at least 3 A is available at 5, 9, and 15 V,
- # and the PDP is available at 20 V.
- for pdo in pdo_list:
- if pdo.pdo_type == "fixed" and pdo.v == 5000:
- seen_5v = True
- if pdo.i < 3000.0:
- return False
- elif pdo.pdo_type == "fixed" and pdo.v == 9000:
- seen_9v = True
- if pdo.i < 3000.0:
- return False
- elif pdo.pdo_type == "fixed" and pdo.v == 15000:
- seen_15v = True
- if pdo.i < 3000.0:
- return False
- elif pdo.pdo_type == "fixed" and pdo.v == 20000:
- seen_20v = True
- if pdo.v / 1000.0 * pdo.i / 1000.0 != pdp:
- return False
- seen_normative_voltages = seen_5v and seen_9v and seen_15v and seen_20v
-
- if not seen_normative_voltages:
- return False
-
- # TODO: there are several things this currently doesn't test, such as
- # variable and battery PDOs.
-
- return True
|