diff --git a/libagent/device/jade.py b/libagent/device/jade.py index 8318a98..abb62dd 100644 --- a/libagent/device/jade.py +++ b/libagent/device/jade.py @@ -1,15 +1,12 @@ """Jade-related code (see https://www.keepkey.com/).""" -import ecdsa import logging + +import ecdsa import semver -from serial.tools import list_ports - -from jadepy import JadeAPI +from .. import formats, util from . import interface -from .. import formats -from .. import util log = logging.getLogger(__name__) @@ -23,6 +20,7 @@ def _verify_support(identity, ecdh): class BlockstreamJade(interface.Device): """Connection to Blockstream Jade device.""" + MIN_SUPPORTED_FW_VERSION = semver.VersionInfo(0, 1, 33) DEVICE_IDS = [(0x10c4, 0xea60), (0x1a86, 0x55d4)] connection = None @@ -33,6 +31,11 @@ class BlockstreamJade(interface.Device): return 'jade-agent' def connect(self): + """Connect to the first matching device.""" + # pylint: disable=import-error + from jadepy import JadeAPI + from serial.tools import list_ports + # Return the existing connection if we have one if BlockstreamJade.connection is not None: return BlockstreamJade.connection @@ -41,7 +44,6 @@ class BlockstreamJade(interface.Device): # Scan com ports looking for the relevant vid and pid, and connect to the # first matching device. Then call 'auth_user' - this usually requires network # access in order to unlock the device with a PIN and the remote blind pinserver. - devices = [] for devinfo in list_ports.comports(): device_product_key = (devinfo.vid, devinfo.pid) if device_product_key in self.DEVICE_IDS: @@ -59,7 +61,7 @@ class BlockstreamJade(interface.Device): fwversion = semver.VersionInfo.parse(verinfo['JADE_VERSION']) if self.MIN_SUPPORTED_FW_VERSION > fwversion.finalize_version(): msg = ('Outdated {} firmware for device. Please update using' - ' a Blockstream Green companion app') + ' a Blockstream Green companion app') raise ValueError(msg.format(fwversion)) # Authenticate the user (unlock with pin) @@ -75,13 +77,14 @@ class BlockstreamJade(interface.Device): except Exception as e: raise interface.NotFoundError( '{} not connected: "{}"'.format(self, e)) + return None @staticmethod - def get_identity_string(identity): + def _get_identity_string(identity): return interface.identity_to_string(identity.identity_dict) @staticmethod - def load_uncompressed_pubkey(pubkey, curve_name): + def _load_uncompressed_pubkey(pubkey, curve_name): assert curve_name == formats.CURVE_NIST256 assert len(pubkey) == 65 and pubkey[0] == 0x04 curve = ecdsa.NIST256p @@ -94,7 +97,7 @@ class BlockstreamJade(interface.Device): def pubkey(self, identity, ecdh=False): """Get PublicKey object for specified BIP32 address and elliptic curve.""" _verify_support(identity, ecdh) - identity_string = self.get_identity_string(identity) + identity_string = self._get_identity_string(identity) curve_name = identity.get_curve_name(ecdh=ecdh) key_type = 'slip-0017' if ecdh else 'slip-0013' @@ -104,13 +107,15 @@ class BlockstreamJade(interface.Device): log.debug('result: %s', result) assert len(result) == 33 or len(result) == 65 - convert_pubkey = formats.decompress_pubkey if len(result) == 33 else self.load_uncompressed_pubkey + convert_pubkey = (formats.decompress_pubkey + if len(result) == 33 else + self._load_uncompressed_pubkey) return convert_pubkey(pubkey=result, curve_name=curve_name) def sign(self, identity, blob): """Sign given blob and return the signature (as bytes).""" _verify_support(identity, ecdh=False) - identity_string = self.get_identity_string(identity) + identity_string = self._get_identity_string(identity) curve_name = identity.get_curve_name(ecdh=False) log.debug('"%s" signing %r (%s) on %s', @@ -127,7 +132,7 @@ class BlockstreamJade(interface.Device): def ecdh(self, identity, pubkey): """Get shared session key using Elliptic Curve Diffie-Hellman.""" _verify_support(identity, ecdh=True) - identity_string = self.get_identity_string(identity) + identity_string = self._get_identity_string(identity) curve_name = identity.get_curve_name(ecdh=True) log.debug('"%s" shared session key (%s) for %r from %s',