diff --git a/trezor_agent/gpg/agent.py b/trezor_agent/gpg/agent.py index c3e3156..9011fae 100644 --- a/trezor_agent/gpg/agent.py +++ b/trezor_agent/gpg/agent.py @@ -2,6 +2,7 @@ import binascii import contextlib import logging +import os from . import decode, encode, keyring from .. import util @@ -39,8 +40,9 @@ def sig_encode(r, s): def pksign(keygrip, digest, algo): """Sign a message digest using a private EC key.""" assert algo == '8' + user_id = os.environ['TREZOR_GPG_USER_ID'] pubkey_dict = decode.load_public_key( - pubkey_bytes=keyring.export_public_key(user_id=None), + pubkey_bytes=keyring.export_public_key(user_id=user_id), use_custom=True, ecdh=False) pubkey, conn = encode.load_from_public_key(pubkey_dict=pubkey_dict) with contextlib.closing(conn): @@ -83,8 +85,9 @@ def pkdecrypt(keygrip, conn): assert keyring.recvline(conn) == b'END' remote_pubkey = parse_ecdh(line) + user_id = os.environ['TREZOR_GPG_USER_ID'] local_pubkey = decode.load_public_key( - pubkey_bytes=keyring.export_public_key(user_id=None), + pubkey_bytes=keyring.export_public_key(user_id=user_id), use_custom=True, ecdh=True) pubkey, conn = encode.load_from_public_key(pubkey_dict=local_pubkey) with contextlib.closing(conn): diff --git a/trezor_agent/gpg/decode.py b/trezor_agent/gpg/decode.py index 13f2b36..42c56ef 100644 --- a/trezor_agent/gpg/decode.py +++ b/trezor_agent/gpg/decode.py @@ -241,31 +241,62 @@ def parse_packets(stream): log.debug('prefix byte: %s', bin(value)) assert util.bit(value, 7) == 1 - assert util.bit(value, 6) == 0 # new format not supported yet tag = util.low_bits(value, 6) - length_type = util.low_bits(tag, 2) - tag = tag >> 2 - fmt = {0: '>B', 1: '>H', 2: '>L'}[length_type] - packet_size = stream.readfmt(fmt) + if util.bit(value, 6) == 0: + length_type = util.low_bits(tag, 2) + tag = tag >> 2 + fmt = {0: '>B', 1: '>H', 2: '>L'}[length_type] + packet_size = stream.readfmt(fmt) + else: + first = stream.readfmt('B') + if first < 192: + packet_size = first + elif first < 224: + packet_size = ((first - 192) << 8) + stream.readfmt('B') + 192 + elif first == 255: + packet_size = stream.readfmt('>L') + else: + log.error('Partial Body Lengths unsupported') log.debug('packet length: %d', packet_size) packet_data = stream.read(packet_size) packet_type = PACKET_TYPES.get(tag) - assert packet_type is not None, tag - p = packet_type(util.Reader(io.BytesIO(packet_data))) - p['tag'] = tag + if packet_type is not None: + p = packet_type(util.Reader(io.BytesIO(packet_data))) + p['tag'] = tag + else: + p = {'type': 'unknown', 'tag': tag, 'raw': packet_data} + log.debug('packet "%s": %s', p['type'], p) yield p -def digest_packets(packets): +def digest_packets(packets, hashalg): """Compute digest on specified packets, according to '_to_hash' field.""" data_to_hash = io.BytesIO() for p in packets: data_to_hash.write(p['_to_hash']) - return hashlib.sha256(data_to_hash.getvalue()).digest() + return hashalg(data_to_hash.getvalue()).digest() + + +def _hash_ripemd160(msg): + """Wrapper for ripemd160 hash.""" + h = hashlib.new("ripemd160") + h.update(msg) + return h + + +HASH_ALGORITHMS = { + 1: hashlib.md5, + 2: hashlib.sha1, + 3: _hash_ripemd160, + 8: hashlib.sha256, + 9: hashlib.sha384, + 10: hashlib.sha512, + 11: hashlib.sha224, +} def load_public_key(pubkey_bytes, use_custom=False, ecdh=False): @@ -275,10 +306,13 @@ def load_public_key(pubkey_bytes, use_custom=False, ecdh=False): pubkey, userid, signature = packets[:3] packets = packets[3:] - digest = digest_packets([pubkey, userid, signature]) - assert signature['hash_prefix'] == digest[:2] + hashalg = HASH_ALGORITHMS.get(signature['hash_alg']) + if hashalg is not None: + digest = digest_packets([pubkey, userid, signature], hashalg) + assert signature['hash_prefix'] == digest[:2] + log.debug('loaded public key "%s"', userid['value']) - if pubkey.get('verifier'): + if hashalg is not None and pubkey.get('verifier'): verify_digest(pubkey=pubkey, digest=digest, signature=signature['sig'], label='GPG public key') else: @@ -292,6 +326,8 @@ def load_public_key(pubkey_bytes, use_custom=False, ecdh=False): log.debug('found custom %s', packet['type']) break + while packets[1]['type'] != 'signature': + packets = packets[1:] packet, signature = packets[:2] packets = packets[2:] @@ -303,7 +339,9 @@ def load_public_key(pubkey_bytes, use_custom=False, ecdh=False): def load_signature(stream, original_data): """Load signature from stream, and compute GPG digest for verification.""" signature, = list(parse_packets(util.Reader(stream))) - digest = digest_packets([{'_to_hash': original_data}, signature]) + hashalg = HASH_ALGORITHMS.get(signature['hash_alg']) + assert hashalg is not None + digest = digest_packets([{'_to_hash': original_data}, signature], hashalg) assert signature['hash_prefix'] == digest[:2] return signature, digest diff --git a/trezor_agent/gpg/tests/test_decode.py b/trezor_agent/gpg/tests/test_decode.py index bbfca64..09355a1 100644 --- a/trezor_agent/gpg/tests/test_decode.py +++ b/trezor_agent/gpg/tests/test_decode.py @@ -1,3 +1,4 @@ +import hashlib import io import pytest @@ -64,7 +65,7 @@ zpR9luXTKwMEl+mlZmwEFKZXBmir '_to_hash': b'\x04\x13\x13\x08\x00\x12\x05\x02WHH\xd6\x02\x1b\x03\x02\x15\x08\x02\x16\x00\x02\x17\x80\x04\xff\x00\x00\x00\x18' # nopep8 }) - digest = decode.digest_packets(packets=[pubkey, user_id, signature]) + digest = decode.digest_packets(packets=[pubkey, user_id, signature], hashalg=hashlib.sha256) decode.verify_digest(pubkey=pubkey, digest=digest, signature=signature['sig'], label='GPG primary public key')