Small style fixes following #382

Tested with `tox`.
This commit is contained in:
Roman Zeyde
2022-03-11 09:19:56 +02:00
parent 0662ced2f4
commit df84c4c15f

View File

@@ -1,15 +1,12 @@
"""Jade-related code (see https://www.keepkey.com/).""" """Jade-related code (see https://www.keepkey.com/)."""
import ecdsa
import logging import logging
import ecdsa
import semver import semver
from serial.tools import list_ports
from jadepy import JadeAPI
from .. import formats, util
from . import interface from . import interface
from .. import formats
from .. import util
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -23,6 +20,7 @@ def _verify_support(identity, ecdh):
class BlockstreamJade(interface.Device): class BlockstreamJade(interface.Device):
"""Connection to Blockstream Jade device.""" """Connection to Blockstream Jade device."""
MIN_SUPPORTED_FW_VERSION = semver.VersionInfo(0, 1, 33) MIN_SUPPORTED_FW_VERSION = semver.VersionInfo(0, 1, 33)
DEVICE_IDS = [(0x10c4, 0xea60), (0x1a86, 0x55d4)] DEVICE_IDS = [(0x10c4, 0xea60), (0x1a86, 0x55d4)]
connection = None connection = None
@@ -33,6 +31,11 @@ class BlockstreamJade(interface.Device):
return 'jade-agent' return 'jade-agent'
def connect(self): def connect(self):
"""Connect to the first matching device."""
# pylint: disable=import-error
from jadepy import JadeAPI
from serial.tools import list_ports
# Return the existing connection if we have one # Return the existing connection if we have one
if BlockstreamJade.connection is not None: if BlockstreamJade.connection is not None:
return BlockstreamJade.connection return BlockstreamJade.connection
@@ -41,7 +44,6 @@ class BlockstreamJade(interface.Device):
# Scan com ports looking for the relevant vid and pid, and connect to the # Scan com ports looking for the relevant vid and pid, and connect to the
# first matching device. Then call 'auth_user' - this usually requires network # first matching device. Then call 'auth_user' - this usually requires network
# access in order to unlock the device with a PIN and the remote blind pinserver. # access in order to unlock the device with a PIN and the remote blind pinserver.
devices = []
for devinfo in list_ports.comports(): for devinfo in list_ports.comports():
device_product_key = (devinfo.vid, devinfo.pid) device_product_key = (devinfo.vid, devinfo.pid)
if device_product_key in self.DEVICE_IDS: if device_product_key in self.DEVICE_IDS:
@@ -59,7 +61,7 @@ class BlockstreamJade(interface.Device):
fwversion = semver.VersionInfo.parse(verinfo['JADE_VERSION']) fwversion = semver.VersionInfo.parse(verinfo['JADE_VERSION'])
if self.MIN_SUPPORTED_FW_VERSION > fwversion.finalize_version(): if self.MIN_SUPPORTED_FW_VERSION > fwversion.finalize_version():
msg = ('Outdated {} firmware for device. Please update using' msg = ('Outdated {} firmware for device. Please update using'
' a Blockstream Green companion app') ' a Blockstream Green companion app')
raise ValueError(msg.format(fwversion)) raise ValueError(msg.format(fwversion))
# Authenticate the user (unlock with pin) # Authenticate the user (unlock with pin)
@@ -75,13 +77,14 @@ class BlockstreamJade(interface.Device):
except Exception as e: except Exception as e:
raise interface.NotFoundError( raise interface.NotFoundError(
'{} not connected: "{}"'.format(self, e)) '{} not connected: "{}"'.format(self, e))
return None
@staticmethod @staticmethod
def get_identity_string(identity): def _get_identity_string(identity):
return interface.identity_to_string(identity.identity_dict) return interface.identity_to_string(identity.identity_dict)
@staticmethod @staticmethod
def load_uncompressed_pubkey(pubkey, curve_name): def _load_uncompressed_pubkey(pubkey, curve_name):
assert curve_name == formats.CURVE_NIST256 assert curve_name == formats.CURVE_NIST256
assert len(pubkey) == 65 and pubkey[0] == 0x04 assert len(pubkey) == 65 and pubkey[0] == 0x04
curve = ecdsa.NIST256p curve = ecdsa.NIST256p
@@ -94,7 +97,7 @@ class BlockstreamJade(interface.Device):
def pubkey(self, identity, ecdh=False): def pubkey(self, identity, ecdh=False):
"""Get PublicKey object for specified BIP32 address and elliptic curve.""" """Get PublicKey object for specified BIP32 address and elliptic curve."""
_verify_support(identity, ecdh) _verify_support(identity, ecdh)
identity_string = self.get_identity_string(identity) identity_string = self._get_identity_string(identity)
curve_name = identity.get_curve_name(ecdh=ecdh) curve_name = identity.get_curve_name(ecdh=ecdh)
key_type = 'slip-0017' if ecdh else 'slip-0013' key_type = 'slip-0017' if ecdh else 'slip-0013'
@@ -104,13 +107,15 @@ class BlockstreamJade(interface.Device):
log.debug('result: %s', result) log.debug('result: %s', result)
assert len(result) == 33 or len(result) == 65 assert len(result) == 33 or len(result) == 65
convert_pubkey = formats.decompress_pubkey if len(result) == 33 else self.load_uncompressed_pubkey convert_pubkey = (formats.decompress_pubkey
if len(result) == 33 else
self._load_uncompressed_pubkey)
return convert_pubkey(pubkey=result, curve_name=curve_name) return convert_pubkey(pubkey=result, curve_name=curve_name)
def sign(self, identity, blob): def sign(self, identity, blob):
"""Sign given blob and return the signature (as bytes).""" """Sign given blob and return the signature (as bytes)."""
_verify_support(identity, ecdh=False) _verify_support(identity, ecdh=False)
identity_string = self.get_identity_string(identity) identity_string = self._get_identity_string(identity)
curve_name = identity.get_curve_name(ecdh=False) curve_name = identity.get_curve_name(ecdh=False)
log.debug('"%s" signing %r (%s) on %s', log.debug('"%s" signing %r (%s) on %s',
@@ -127,7 +132,7 @@ class BlockstreamJade(interface.Device):
def ecdh(self, identity, pubkey): def ecdh(self, identity, pubkey):
"""Get shared session key using Elliptic Curve Diffie-Hellman.""" """Get shared session key using Elliptic Curve Diffie-Hellman."""
_verify_support(identity, ecdh=True) _verify_support(identity, ecdh=True)
identity_string = self.get_identity_string(identity) identity_string = self._get_identity_string(identity)
curve_name = identity.get_curve_name(ecdh=True) curve_name = identity.get_curve_name(ecdh=True)
log.debug('"%s" shared session key (%s) for %r from %s', log.debug('"%s" shared session key (%s) for %r from %s',