From ca507126d6b51183b0d189d54de0fc2c725b49cd Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Fri, 28 Oct 2016 10:54:24 +0300 Subject: [PATCH] gpg: use new device package (instead of factory) --- trezor_agent/gpg/__main__.py | 16 +++++------ trezor_agent/gpg/agent.py | 29 +++++++++----------- trezor_agent/gpg/device.py | 52 +++++++++++------------------------- 3 files changed, 36 insertions(+), 61 deletions(-) diff --git a/trezor_agent/gpg/__main__.py b/trezor_agent/gpg/__main__.py index d069eb5..e9b8596 100755 --- a/trezor_agent/gpg/__main__.py +++ b/trezor_agent/gpg/__main__.py @@ -29,10 +29,10 @@ def run_create(args): log.warning('NOTE: in order to re-generate the exact same GPG key later, ' 'run this command with "--time=%d" commandline flag (to set ' 'the timestamp of the GPG key manually).', args.time) - conn = device.HardwareSigner(user_id=args.user_id, - curve_name=args.ecdsa_curve) - verifying_key = conn.pubkey(ecdh=False) - decryption_key = conn.pubkey(ecdh=True) + d = device.HardwareSigner(user_id=args.user_id, + curve_name=args.ecdsa_curve) + verifying_key = d.pubkey(ecdh=False) + decryption_key = d.pubkey(ecdh=True) if key_exists(args.user_id): # add as subkey log.info('adding %s GPG subkey for "%s" to existing key', @@ -48,10 +48,10 @@ def run_create(args): primary_bytes = keyring.export_public_key(args.user_id) result = encode.create_subkey(primary_bytes=primary_bytes, subkey=signing_key, - signer_func=conn.sign) + signer_func=d.sign) result = encode.create_subkey(primary_bytes=result, subkey=encryption_key, - signer_func=conn.sign) + signer_func=d.sign) else: # add as primary log.info('creating new %s GPG primary key for "%s"', args.ecdsa_curve, args.user_id) @@ -66,10 +66,10 @@ def run_create(args): result = encode.create_primary(user_id=args.user_id, pubkey=primary, - signer_func=conn.sign) + signer_func=d.sign) result = encode.create_subkey(primary_bytes=result, subkey=subkey, - signer_func=conn.sign) + signer_func=d.sign) sys.stdout.write(protocol.armor(result, 'PUBLIC KEY BLOCK')) diff --git a/trezor_agent/gpg/agent.py b/trezor_agent/gpg/agent.py index e1d3f9c..24ac3ff 100644 --- a/trezor_agent/gpg/agent.py +++ b/trezor_agent/gpg/agent.py @@ -1,6 +1,5 @@ """GPG-agent utilities.""" import binascii -import contextlib import logging from . import decode, device, keyring, protocol @@ -37,7 +36,6 @@ def sig_encode(r, s): return b'(7:sig-val(5:ecdsa(1:r32:' + r + b')(1:s32:' + s + b')))' -@contextlib.contextmanager def open_connection(keygrip_bytes): """ Connect to the device for the specified keygrip. @@ -54,24 +52,23 @@ def open_connection(keygrip_bytes): ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID) conn = device.HardwareSigner(user_id, curve_name=curve_name) - with contextlib.closing(conn): - pubkey = protocol.PublicKey( - curve_name=curve_name, created=pubkey_dict['created'], - verifying_key=conn.pubkey(ecdh=ecdh), ecdh=ecdh) - assert pubkey.key_id() == pubkey_dict['key_id'] - assert pubkey.keygrip == keygrip_bytes - yield conn + pubkey = protocol.PublicKey( + curve_name=curve_name, created=pubkey_dict['created'], + verifying_key=conn.pubkey(ecdh=ecdh), ecdh=ecdh) + assert pubkey.key_id() == pubkey_dict['key_id'] + assert pubkey.keygrip == keygrip_bytes + return conn def pksign(keygrip, digest, algo): """Sign a message digest using a private EC key.""" log.debug('signing %r digest (algo #%s)', digest, algo) keygrip_bytes = binascii.unhexlify(keygrip) - with open_connection(keygrip_bytes) as conn: - r, s = conn.sign(binascii.unhexlify(digest)) - result = sig_encode(r, s) - log.debug('result: %r', result) - return result + conn = open_connection(keygrip_bytes) + r, s = conn.sign(binascii.unhexlify(digest)) + result = sig_encode(r, s) + log.debug('result: %r', result) + return result def _serialize_point(data): @@ -105,8 +102,8 @@ def pkdecrypt(keygrip, conn): remote_pubkey = parse_ecdh(line) keygrip_bytes = binascii.unhexlify(keygrip) - with open_connection(keygrip_bytes) as conn: - return _serialize_point(conn.ecdh(remote_pubkey)) + conn = open_connection(keygrip_bytes) + return _serialize_point(conn.ecdh(remote_pubkey)) def handle_connection(conn): diff --git a/trezor_agent/gpg/device.py b/trezor_agent/gpg/device.py index 123f7f8..04e6dba 100644 --- a/trezor_agent/gpg/device.py +++ b/trezor_agent/gpg/device.py @@ -2,7 +2,7 @@ import logging -from .. import factory, formats, util +from .. import device, formats, util log = logging.getLogger(__name__) @@ -12,55 +12,33 @@ class HardwareSigner(object): def __init__(self, user_id, curve_name): """Connect to the device and retrieve required public key.""" - self.client_wrapper = factory.load() - self.identity = self.client_wrapper.identity_type() - self.identity.proto = 'gpg' - self.identity.host = user_id - self.curve_name = curve_name + self.device = device.detect(identity_str='', + curve_name=curve_name) + self.device.identity_dict['proto'] = 'gpg' + self.device.identity_dict['host'] = user_id self.user_id = user_id def pubkey(self, ecdh=False): """Return public key as VerifyingKey object.""" - addr = util.get_bip32_address(identity=self.identity, ecdh=ecdh) - if ecdh: - curve_name = formats.get_ecdh_curve_name(self.curve_name) - else: - curve_name = self.curve_name - public_node = self.client_wrapper.connection.get_public_node( - n=addr, ecdsa_curve_name=curve_name) - + with self.device: + pubkey = self.device.pubkey(ecdh=ecdh) return formats.decompress_pubkey( - pubkey=public_node.node.public_key, - curve_name=curve_name) + pubkey=pubkey, curve_name=self.device.curve_name) def sign(self, digest): """Sign the digest and return a serialized signature.""" log.info('please confirm GPG signature on %s for "%s"...', - self.client_wrapper.device_name, self.user_id) - if self.curve_name == formats.CURVE_NIST256: + self.device, self.user_id) + if self.device.curve_name == formats.CURVE_NIST256: digest = digest[:32] # sign the first 256 bits log.debug('signing digest: %s', util.hexlify(digest)) - result = self.client_wrapper.connection.sign_identity( - identity=self.identity, - challenge_hidden=digest, - challenge_visual='', - ecdsa_curve_name=self.curve_name) - assert result.signature[:1] == b'\x00' - sig = result.signature[1:] + with self.device: + sig = self.device.sign(blob=digest) return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:])) def ecdh(self, pubkey): """Derive shared secret using ECDH from remote public key.""" log.info('please confirm GPG decryption on %s for "%s"...', - self.client_wrapper.device_name, self.user_id) - result = self.client_wrapper.connection.get_ecdh_session_key( - identity=self.identity, - peer_public_key=pubkey, - ecdsa_curve_name=formats.get_ecdh_curve_name(self.curve_name)) - assert len(result.session_key) in {65, 33} # NIST256 or Curve25519 - assert result.session_key[:1] == b'\x04' - return result.session_key - - def close(self): - """Close the connection to the device.""" - self.client_wrapper.connection.close() + self.device, self.user_id) + with self.device: + return self.device.ecdh(pubkey=pubkey)