diff --git a/trezor_agent/gpg/agent.py b/trezor_agent/gpg/agent.py index 2583d73..6140cf1 100644 --- a/trezor_agent/gpg/agent.py +++ b/trezor_agent/gpg/agent.py @@ -39,12 +39,13 @@ def sig_encode(r, s): def pksign(keygrip, digest, algo): """Sign a message digest using a private EC key.""" assert algo == '8' - pubkey = decode.load_public_key(keyring.export_public_key(user_id=None), - use_custom=True) - f = encode.Factory.from_public_key(pubkey=pubkey) - with contextlib.closing(f): - assert f.pubkey.keygrip == binascii.unhexlify(keygrip) - r, s = f.conn.sign(binascii.unhexlify(digest)) + pubkey_dict = decode.load_public_key( + pubkey_bytes=keyring.export_public_key(user_id=None), + use_custom=True) + pubkey, conn = encode.load_from_public_key(pubkey_dict=pubkey_dict) + with contextlib.closing(conn): + assert pubkey.keygrip == binascii.unhexlify(keygrip) + r, s = conn.sign(binascii.unhexlify(digest)) result = sig_encode(r, s) log.debug('result: %r', result) return result @@ -85,10 +86,10 @@ def pkdecrypt(keygrip, conn): local_pubkey = decode.load_public_key( pubkey_bytes=keyring.export_public_key(user_id=None), use_custom=True) - f = encode.Factory.from_public_key(pubkey=local_pubkey) - with contextlib.closing(f): - assert f.pubkey.keygrip == binascii.unhexlify(keygrip) - shared_secret = f.get_shared_secret(remote_pubkey) + pubkey, conn = encode.load_from_public_key(pubkey_dict=local_pubkey) + with contextlib.closing(conn): + assert pubkey.keygrip == binascii.unhexlify(keygrip) + shared_secret = conn.ecdh(remote_pubkey) assert len(shared_secret) == 65 assert shared_secret[:1] == b'\x04' diff --git a/trezor_agent/gpg/encode.py b/trezor_agent/gpg/encode.py index 91b6f5a..e529dc6 100644 --- a/trezor_agent/gpg/encode.py +++ b/trezor_agent/gpg/encode.py @@ -192,44 +192,20 @@ def sign_message(signer_func, msg, pubkey, sign_time): return proto.packet(tag=2, blob=blob) -class Factory(object): - """Performs GPG signing operations.""" +def load_from_public_key(pubkey_dict): + """Load correct public key from the device.""" + user_id = pubkey_dict['user_id'] + created = pubkey_dict['created'] + curve_name = proto.find_curve_by_algo_id(pubkey_dict['algo']) + assert curve_name in formats.SUPPORTED_CURVES + ecdh = (pubkey_dict['algo'] == proto.ECDH_ALGO_ID) - def __init__(self, user_id, created, curve_name, ecdh=False): - """Construct and loads a public key from the device.""" - self.user_id = user_id - assert curve_name in formats.SUPPORTED_CURVES + conn = HardwareSigner(user_id, curve_name=curve_name) + pubkey = proto.PublicKey( + curve_name=curve_name, created=created, + verifying_key=conn.pubkey(), ecdh=ecdh) + assert pubkey.key_id() == pubkey_dict['key_id'] + log.info('%s created at %s for "%s"', + pubkey, _time_format(pubkey.created), user_id) - self.conn = HardwareSigner(user_id, curve_name=curve_name) - self.pubkey = proto.PublicKey( - curve_name=curve_name, created=created, - verifying_key=self.conn.pubkey(), ecdh=ecdh) - self.ecdh = ecdh - - log.info('%s created at %s for "%s"', - self.pubkey, _time_format(self.pubkey.created), user_id) - - @classmethod - def from_public_key(cls, pubkey): - """Create from an existing GPG public key.""" - f = cls(user_id=pubkey['user_id'], - created=pubkey['created'], - curve_name=proto.find_curve_by_algo_id(pubkey['algo']), - ecdh=(pubkey['algo'] == proto.ECDH_ALGO_ID)) - assert f.pubkey.key_id() == pubkey['key_id'] - return f - - def close(self): - """Close connection and turn off the screen of the device.""" - self.conn.close() - - def sign_message(self, msg, sign_time=None): - """Sign GPG message at specified time.""" - if sign_time is None: - sign_time = int(time.time()) - return sign_message(signer_func=self.conn.sign, pubkey=self.pubkey, - msg=msg, sign_time=sign_time) - - def get_shared_secret(self, pubkey): - """Derive shared secret using ECDH from remote public key.""" - return self.conn.ecdh(pubkey) + return pubkey, conn