From ac2d12b3541244eb463d7a0e1fa5abd3049bb63b Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Fri, 29 Apr 2016 17:45:16 +0300 Subject: [PATCH] It works again! --- trezor_agent/gpg/agent.py | 19 ++-- trezor_agent/gpg/decode.py | 3 + trezor_agent/gpg/encode.py | 175 +++++++++++++++++++++---------------- trezor_agent/gpg/test.sh | 1 + 4 files changed, 115 insertions(+), 83 deletions(-) diff --git a/trezor_agent/gpg/agent.py b/trezor_agent/gpg/agent.py index bc196f2..2e1ad93 100644 --- a/trezor_agent/gpg/agent.py +++ b/trezor_agent/gpg/agent.py @@ -76,6 +76,16 @@ def _parse(s): return _parse_term(s) +def _parse_ecdsa_sig(sig): + data, (algo, (r, sig_r), (s, sig_s)) = sig + assert data == 'sig-val' + assert algo == 'ecdsa' + assert r == 'r' + assert s == 's' + return (util.bytes2num(sig_r), + util.bytes2num(sig_s)) + + def sign(sock, keygrip, digest): """Sign a digest using specified key using GPG agent.""" hash_algo = 8 # SHA256 @@ -105,14 +115,7 @@ def sign(sock, keygrip, digest): sig, leftover = _parse(sig) assert not leftover - - data, (algo, (r, sig_r), (s, sig_s)) = sig - assert data == 'sig-val' - assert algo == 'ecdsa' - assert r == 'r' - assert s == 's' - return (util.bytes2num(sig_r), - util.bytes2num(sig_s)) + return _parse_ecdsa_sig(sig) def get_keygrip(user_id): diff --git a/trezor_agent/gpg/decode.py b/trezor_agent/gpg/decode.py index ebe96b3..fda6e45 100644 --- a/trezor_agent/gpg/decode.py +++ b/trezor_agent/gpg/decode.py @@ -252,6 +252,9 @@ def load_public_key(stream): subkey = subsig = None if len(packets) == 5: pubkey, userid, signature, subkey, subsig = packets + # TODO: refactor this out! + log.debug('subkey: %s', subkey) + log.debug('subsig: %s', subsig) else: pubkey, userid, signature = packets diff --git a/trezor_agent/gpg/encode.py b/trezor_agent/gpg/encode.py index 0ab3df8..b0c2a04 100644 --- a/trezor_agent/gpg/encode.py +++ b/trezor_agent/gpg/encode.py @@ -135,6 +135,7 @@ class AgentSigner(object): return self.public_key['verifying_key'] def sign(self, digest, visual): + log.info('signing %r using gpg-agent', visual) r, s = agent.sign(sock=self.sock, keygrip=self.keygrip, digest=digest) return mpi(r) + mpi(s) @@ -142,37 +143,14 @@ class AgentSigner(object): self.sock.close() -class Signer(object): - """Performs GPG signing operations.""" - - def __init__(self, user_id, created, curve_name): - """Construct and loads a public key from the device.""" - self.user_id = user_id - assert curve_name in formats.SUPPORTED_CURVES - - self.conn = HardwareSigner(user_id, curve_name=curve_name) - self.verifying_key = self.conn.pubkey() - +class PublicKey(object): + def __init__(self, curve_name, created, verifying_key): + self.curve_name = str(curve_name) self.created = int(created) - log.info('%s GPG public key %s created at %s', curve_name, - self.hex_short_key_id(), util.time_format(self.created)) - - @classmethod - def from_public_key(cls, pubkey, user_id): - """ - Create from an existing GPG public key. - - `pubkey` should be loaded via `decode.load_from_gpg(user_id)` - from the local GPG keyring. - """ - s = Signer(user_id=user_id, - created=pubkey['created'], - curve_name=_find_curve_by_algo_id(pubkey['algo'])) - assert s.key_id() == pubkey['key_id'] - return s + self.verifying_key = verifying_key def _pubkey_data(self): - curve_info = SUPPORTED_CURVES[self.conn.curve_name] + curve_info = SUPPORTED_CURVES[self.curve_name] header = struct.pack('>BLB', 4, # version self.created, # creation @@ -195,63 +173,108 @@ class Signer(object): """Short (8 hexadecimal digits) GPG key ID.""" return util.hexlify(self.key_id()[-4:]) + +class Signer(object): + """Performs GPG signing operations.""" + + def __init__(self, user_id, created, curve_name): + """Construct and loads a public key from the device.""" + self.user_id = user_id + assert curve_name in formats.SUPPORTED_CURVES + + self.conn = HardwareSigner(user_id, curve_name=curve_name) + self.pubkey = PublicKey( + curve_name=curve_name, created=created, + verifying_key=self.conn.pubkey()) + + log.info('%s GPG public key %s created at %s', curve_name, + self.pubkey.hex_short_key_id(), util.time_format(self.pubkey.created)) + + @classmethod + def from_public_key(cls, pubkey, user_id): + """ + Create from an existing GPG public key. + + `pubkey` should be loaded via `decode.load_from_gpg(user_id)` + from the local GPG keyring. + """ + s = Signer(user_id=user_id, + created=pubkey['created'], + curve_name=_find_curve_by_algo_id(pubkey['algo'])) + assert s.pubkey.key_id() == pubkey['key_id'] + return s + def close(self): """Close connection and turn off the screen of the device.""" self.conn.close() def export(self): """Export GPG public key, ready for "gpg2 --import".""" - pubkey_packet = packet(tag=6, blob=self._pubkey_data()) + pubkey_packet = packet(tag=6, blob=self.pubkey._pubkey_data()) user_id_packet = packet(tag=13, blob=self.user_id) - data_to_sign = (self._pubkey_data_to_hash() + + data_to_sign = (self.pubkey._pubkey_data_to_hash() + user_id_packet[:1] + util.prefix_len('>L', self.user_id)) log.info('signing public key "%s"', self.user_id) hashed_subpackets = [ - subpacket_time(self.created), # signature creaion time + subpacket_time(self.pubkey.created), # signature creaion time subpacket_byte(0x1B, 1 | 2), # key flags (certify & sign) subpacket_byte(0x15, 8), # preferred hash (SHA256) subpacket_byte(0x16, 0), # preferred compression (none) subpacket_byte(0x17, 0x80)] # key server prefs (no-modify) - signature = self._make_signature(visual=self.hex_short_key_id(), - data_to_sign=data_to_sign, - sig_type=0x13, # user id & public key - hashed_subpackets=hashed_subpackets) + unhashed_subpackets = [ + subpacket(16, self.pubkey.key_id())] # issuer key id + + signature = _make_signature( + pubkey=self.pubkey, conn=self.conn, + data_to_sign=data_to_sign, + sig_type=0x13, # user id & public key + hashed_subpackets=hashed_subpackets, + unhashed_subpackets=unhashed_subpackets) sign_packet = packet(tag=2, blob=signature) return pubkey_packet + user_id_packet + sign_packet def subkey(self, user_id): - subkey_packet = packet(tag=14, blob=self._pubkey_data()) + subkey_packet = packet(tag=14, blob=self.pubkey._pubkey_data()) primary = decode.load_from_gpg(user_id) keygrip = agent.get_keygrip(user_id) log.info('adding as subkey to %s (%s)', user_id, keygrip) - data_to_sign = primary['_to_hash'] + self._pubkey_data_to_hash() + data_to_sign = primary['_to_hash'] + self.pubkey._pubkey_data_to_hash() hashed_subpackets = [ - subpacket_time(self.created)] # signature creaion time - back_sign = self._make_signature(visual='Add subkey', - data_to_sign=data_to_sign, - sig_type=0x19, # Primary Key Binding Signature - hashed_subpackets=hashed_subpackets) + subpacket_time(self.pubkey.created)] # signature creaion time + unhashed_subpackets = [ + subpacket(16, self.pubkey.key_id())] # issuer key id + + # Primary Key Binding Signature + back_sign = _make_signature(pubkey=self.pubkey, conn=self.conn, + data_to_sign=data_to_sign, + sig_type=0x19, + hashed_subpackets=hashed_subpackets, + unhashed_subpackets=unhashed_subpackets) log.info('back_sign: %r', back_sign) hashed_subpackets = [ - subpacket_time(self.created), # signature creaion time + subpacket_time(self.pubkey.created), # signature creaion time subpacket_byte(0x1B, 2)] # key flags (certify & sign) + unhashed_subpackets = [ + subpacket(16, primary['key_id']), # issuer key id + subpacket(32, back_sign)] + _conn = self.conn self.conn = AgentSigner(user_id, curve_name=formats.CURVE_NIST256) - self.key_id = lambda: primary['key_id'] - signature = self._make_signature(visual='Add subkey', - data_to_sign=data_to_sign, - sig_type=0x18, # Subkey Binding Signature - hashed_subpackets=hashed_subpackets, - unhashed=[subpacket(32, bytes(back_sign))]) + + # Subkey Binding Signature + signature = _make_signature(pubkey=self.pubkey, conn=self.conn, + data_to_sign=data_to_sign, + sig_type=0x18, + hashed_subpackets=hashed_subpackets, + unhashed_subpackets=unhashed_subpackets) self.conn = _conn sign_packet = packet(tag=2, blob=signature) return subkey_packet + sign_packet - def sign(self, msg, sign_time=None): """Sign GPG message at specified time.""" if sign_time is None: @@ -260,36 +283,38 @@ class Signer(object): log.info('signing %d byte message at %s', len(msg), util.time_format(sign_time)) hashed_subpackets = [subpacket_time(sign_time)] - blob = self._make_signature( - visual=self.hex_short_key_id(), - data_to_sign=msg, hashed_subpackets=hashed_subpackets) + unhashed_subpackets = [ + subpacket(16, self.pubkey.key_id())] # issuer key id + + blob = _make_signature( + pubkey=self.pubkey, conn=self.conn, data_to_sign=msg, + hashed_subpackets=hashed_subpackets, + unhashed_subpackets=unhashed_subpackets) return packet(tag=2, blob=blob) - def _make_signature(self, visual, data_to_sign, - hashed_subpackets, sig_type=0, unhashed=()): - curve_info = SUPPORTED_CURVES[self.conn.curve_name] - header = struct.pack('>BBBB', - 4, # version - sig_type, # rfc4880 (section-5.2.1) - curve_info['algo_id'], - 8) # hash_alg (SHA256) - hashed = subpackets(*hashed_subpackets) - log.info('key_id: %s', util.hexlify(self.key_id())) - unhashed = subpackets( - subpacket(16, self.key_id()), # issuer key id - *unhashed - ) - tail = b'\x04\xff' + struct.pack('>L', len(header) + len(hashed)) - data_to_hash = data_to_sign + header + hashed + tail - log.debug('hashing %d bytes', len(data_to_hash)) - digest = hashlib.sha256(data_to_hash).digest() +def _make_signature(pubkey, conn, data_to_sign, + hashed_subpackets, unhashed_subpackets, sig_type=0): + curve_info = SUPPORTED_CURVES[pubkey.curve_name] + header = struct.pack('>BBBB', + 4, # version + sig_type, # rfc4880 (section-5.2.1) + curve_info['algo_id'], + 8) # hash_alg (SHA256) + hashed = subpackets(*hashed_subpackets) + unhashed = subpackets(*unhashed_subpackets) + tail = b'\x04\xff' + struct.pack('>L', len(header) + len(hashed)) + data_to_hash = data_to_sign + header + hashed + tail - sig = self.conn.sign(digest=digest, visual=visual) + log.debug('hashing %d bytes', len(data_to_hash)) + digest = hashlib.sha256(data_to_hash).digest() - return (header + hashed + unhashed + - digest[:2] + # used for decoder's sanity check - sig) # actual ECDSA signature + visual = pubkey.hex_short_key_id() + sig = conn.sign(digest=digest, visual=visual) + + return bytes(header + hashed + unhashed + + digest[:2] + # used for decoder's sanity check + sig) # actual ECDSA signature def _split_lines(body, size): diff --git a/trezor_agent/gpg/test.sh b/trezor_agent/gpg/test.sh index 9a5618f..6a31a35 100644 --- a/trezor_agent/gpg/test.sh +++ b/trezor_agent/gpg/test.sh @@ -5,6 +5,7 @@ gpg2 --full-gen-key --expert gpg2 --export > romanz.pub NOW=`date +%s` trezor-gpg -t $NOW "romanz" -o subkey.pub +gpg2 -K gpg2 -vv --import <(cat romanz.pub subkey.pub) gpg2 -K