diff --git a/trezor_agent/gpg/debug_subkeys.py b/trezor_agent/gpg/debug_subkeys.py index 28a0570..a9aff80 100644 --- a/trezor_agent/gpg/debug_subkeys.py +++ b/trezor_agent/gpg/debug_subkeys.py @@ -1,10 +1,7 @@ #!/usr/bin/env python """Check GPG v2 signature for a given public key.""" import argparse -import base64 -import io import logging -import pprint from . import decode from .. import util @@ -21,7 +18,7 @@ def main(): logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO, format='%(asctime)s %(levelname)-10s %(message)s') stream = open(args.pubkey, 'rb') - parser = decode.Parser(util.Reader(stream)) + parser = decode.parse_packets(util.Reader(stream)) pubkey, userid, sig1, subkey, sig2 = parser digest = decode.digest_packets([pubkey, userid, sig1]) diff --git a/trezor_agent/gpg/decode.py b/trezor_agent/gpg/decode.py index e1f6592..021b4b7 100644 --- a/trezor_agent/gpg/decode.py +++ b/trezor_agent/gpg/decode.py @@ -74,146 +74,146 @@ SUPPORTED_CURVES = { } -class Parser(object): - """Parse GPG packets from a given stream.""" - - def __init__(self, stream): - """Create an empty parser.""" - self.stream = stream - self.packet_types = { - 2: self.signature, - 6: self.pubkey, - 11: self.literal, - 13: self.user_id, - 14: self.subkey, - } - - def __iter__(self): - """Support iterative parsing of available GPG packets.""" - return self - - def literal(self, stream): - """See https://tools.ietf.org/html/rfc4880#section-5.9 for details.""" - p = {'type': 'literal'} - p['format'] = stream.readfmt('c') - filename_len = stream.readfmt('B') - p['filename'] = stream.read(filename_len) - p['date'] = stream.readfmt('>L') - p['content'] = stream.read() - p['_to_hash'] = p['content'] - return p - - def _embedded_signatures(self, subpackets): - for packet in subpackets: - data = bytearray(packet) - if data[0] == 32: - # https://tools.ietf.org/html/rfc4880#section-5.2.3.26 - stream = io.BytesIO(data[1:]) - yield self.signature(util.Reader(stream)) +def _parse_literal(stream): + """See https://tools.ietf.org/html/rfc4880#section-5.9 for details.""" + p = {'type': 'literal'} + p['format'] = stream.readfmt('c') + filename_len = stream.readfmt('B') + p['filename'] = stream.read(filename_len) + p['date'] = stream.readfmt('>L') + p['content'] = stream.read() + p['_to_hash'] = p['content'] + return p - def signature(self, stream): - """See https://tools.ietf.org/html/rfc4880#section-5.2 for details.""" - p = {'type': 'signature'} +def _parse_embedded_signatures(subpackets): + for packet in subpackets: + data = bytearray(packet) + if data[0] == 32: + # https://tools.ietf.org/html/rfc4880#section-5.2.3.26 + stream = io.BytesIO(data[1:]) + yield _parse_signature(util.Reader(stream)) - to_hash = io.BytesIO() - with stream.capture(to_hash): - p['version'] = stream.readfmt('B') - p['sig_type'] = stream.readfmt('B') - p['pubkey_alg'] = stream.readfmt('B') - p['hash_alg'] = stream.readfmt('B') - p['hashed_subpackets'] = parse_subpackets(stream) - # https://tools.ietf.org/html/rfc4880#section-5.2.4 - tail_to_hash = b'\x04\xff' + struct.pack('>L', to_hash.tell()) +def _parse_signature(stream): + """See https://tools.ietf.org/html/rfc4880#section-5.2 for details.""" + p = {'type': 'signature'} - p['_to_hash'] = to_hash.getvalue() + tail_to_hash + to_hash = io.BytesIO() + with stream.capture(to_hash): + p['version'] = stream.readfmt('B') + p['sig_type'] = stream.readfmt('B') + p['pubkey_alg'] = stream.readfmt('B') + p['hash_alg'] = stream.readfmt('B') + p['hashed_subpackets'] = parse_subpackets(stream) - p['unhashed_subpackets'] = parse_subpackets(stream) - embedded = list(self._embedded_signatures(p['unhashed_subpackets'])) - if embedded: - p['embedded'] = embedded + # https://tools.ietf.org/html/rfc4880#section-5.2.4 + tail_to_hash = b'\x04\xff' + struct.pack('>L', to_hash.tell()) - p['hash_prefix'] = stream.readfmt('2s') - p['sig'] = (parse_mpi(stream), parse_mpi(stream)) + p['_to_hash'] = to_hash.getvalue() + tail_to_hash + + p['unhashed_subpackets'] = parse_subpackets(stream) + embedded = list(_parse_embedded_signatures(p['unhashed_subpackets'])) + if embedded: + p['embedded'] = embedded + + p['hash_prefix'] = stream.readfmt('2s') + p['sig'] = (parse_mpi(stream), parse_mpi(stream)) + assert not stream.read() + return p + + +def _parse_pubkey(stream): + """See https://tools.ietf.org/html/rfc4880#section-5.5 for details.""" + p = {'type': 'pubkey'} + packet = io.BytesIO() + with stream.capture(packet): + p['version'] = stream.readfmt('B') + p['created'] = stream.readfmt('>L') + p['algo'] = stream.readfmt('B') + + # https://tools.ietf.org/html/rfc6637#section-11 + oid_size = stream.readfmt('B') + oid = stream.read(oid_size) + assert oid in SUPPORTED_CURVES + parser = SUPPORTED_CURVES[oid] + + mpi = parse_mpi(stream) + log.debug('mpi: %x (%d bits)', mpi, mpi.bit_length()) + p['verifier'] = parser(mpi) assert not stream.read() - return p + # https://tools.ietf.org/html/rfc4880#section-12.2 + packet_data = packet.getvalue() + data_to_hash = (b'\x99' + struct.pack('>H', len(packet_data)) + + packet_data) + p['key_id'] = hashlib.sha1(data_to_hash).digest()[-8:] + p['_to_hash'] = data_to_hash + log.debug('key ID: %s', util.hexlify(p['key_id'])) + return p - def pubkey(self, stream): - """See https://tools.ietf.org/html/rfc4880#section-5.5 for details.""" - p = {'type': 'pubkey'} - packet = io.BytesIO() - with stream.capture(packet): - p['version'] = stream.readfmt('B') - p['created'] = stream.readfmt('>L') - p['algo'] = stream.readfmt('B') - # https://tools.ietf.org/html/rfc6637#section-11 - oid_size = stream.readfmt('B') - oid = stream.read(oid_size) - assert oid in SUPPORTED_CURVES - parser = SUPPORTED_CURVES[oid] +def _parse_subkey(stream): + """See https://tools.ietf.org/html/rfc4880#section-5.5 for details.""" + p = {'type': 'subkey'} + packet = io.BytesIO() + with stream.capture(packet): + p['version'] = stream.readfmt('B') + p['created'] = stream.readfmt('>L') + p['algo'] = stream.readfmt('B') - mpi = parse_mpi(stream) - log.debug('mpi: %x (%d bits)', mpi, mpi.bit_length()) - p['verifier'] = parser(mpi) - assert not stream.read() + # https://tools.ietf.org/html/rfc6637#section-11 + oid_size = stream.readfmt('B') + oid = stream.read(oid_size) + assert oid in SUPPORTED_CURVES + parser = SUPPORTED_CURVES[oid] - # https://tools.ietf.org/html/rfc4880#section-12.2 - packet_data = packet.getvalue() - data_to_hash = (b'\x99' + struct.pack('>H', len(packet_data)) + - packet_data) - p['key_id'] = hashlib.sha1(data_to_hash).digest()[-8:] - p['_to_hash'] = data_to_hash - log.debug('key ID: %s', util.hexlify(p['key_id'])) - return p + mpi = parse_mpi(stream) + log.debug('mpi: %x (%d bits)', mpi, mpi.bit_length()) + p['verifier'] = parser(mpi) + leftover = stream.read() # TBD: what is this? + if leftover: + log.warning('unexpected subkey leftover: %r', leftover) - def subkey(self, stream): - """See https://tools.ietf.org/html/rfc4880#section-5.5 for details.""" - p = {'type': 'subkey'} - packet = io.BytesIO() - with stream.capture(packet): - p['version'] = stream.readfmt('B') - p['created'] = stream.readfmt('>L') - p['algo'] = stream.readfmt('B') + # https://tools.ietf.org/html/rfc4880#section-12.2 + packet_data = packet.getvalue() + data_to_hash = (b'\x99' + struct.pack('>H', len(packet_data)) + + packet_data) + p['key_id'] = hashlib.sha1(data_to_hash).digest()[-8:] + p['_to_hash'] = data_to_hash + log.debug('key ID: %s', util.hexlify(p['key_id'])) + return p - # https://tools.ietf.org/html/rfc6637#section-11 - oid_size = stream.readfmt('B') - oid = stream.read(oid_size) - assert oid in SUPPORTED_CURVES - parser = SUPPORTED_CURVES[oid] - mpi = parse_mpi(stream) - log.debug('mpi: %x (%d bits)', mpi, mpi.bit_length()) - p['verifier'] = parser(mpi) - leftover = stream.read() # TBD: what is this? - if leftover: - log.warning('unexpected subkey leftover: %r', leftover) +def _parse_user_id(stream): + """See https://tools.ietf.org/html/rfc4880#section-5.11 for details.""" + value = stream.read() + to_hash = b'\xb4' + util.prefix_len('>L', value) + return {'type': 'user_id', 'value': value, '_to_hash': to_hash} - # https://tools.ietf.org/html/rfc4880#section-12.2 - packet_data = packet.getvalue() - data_to_hash = (b'\x99' + struct.pack('>H', len(packet_data)) + - packet_data) - p['key_id'] = hashlib.sha1(data_to_hash).digest()[-8:] - p['_to_hash'] = data_to_hash - log.debug('key ID: %s', util.hexlify(p['key_id'])) - return p - def user_id(self, stream): - """See https://tools.ietf.org/html/rfc4880#section-5.11 for details.""" - value = stream.read() - to_hash = b'\xb4' + util.prefix_len('>L', value) - return {'type': 'user_id', 'value': value, '_to_hash': to_hash} +PACKET_TYPES = { + 2: _parse_signature, + 6: _parse_pubkey, + 11: _parse_literal, + 13: _parse_user_id, + 14: _parse_subkey, +} - def __next__(self): - """See https://tools.ietf.org/html/rfc4880#section-4.2 for details.""" + +def parse_packets(stream): + """ + Support iterative parsing of available GPG packets. + + See https://tools.ietf.org/html/rfc4880#section-4.2 for details. + """ + while True: try: - value = self.stream.readfmt('B') + value = stream.readfmt('B') except EOFError: - raise StopIteration + return - log.debug('prefix byte: %02x', value) + log.debug('prefix byte: %s', bin(value)) assert util.bit(value, 7) == 1 assert util.bit(value, 6) == 0 # new format not supported yet @@ -222,22 +222,24 @@ class Parser(object): tag = tag >> 2 fmt = {0: '>B', 1: '>H', 2: '>L'}[length_type] log.debug('length_type: %s', fmt) - packet_size = self.stream.readfmt(fmt) + packet_size = stream.readfmt(fmt) + log.debug('packet length: %d', packet_size) - packet_data = self.stream.read(packet_size) - packet_type = self.packet_types.get(tag) + packet_data = stream.read(packet_size) + packet_type = PACKET_TYPES.get(tag) + if packet_type: p = packet_type(util.Reader(io.BytesIO(packet_data))) else: raise ValueError('Unknown packet type: {}'.format(tag)) + p['tag'] = tag log.debug('packet "%s": %s', p['type'], p) - return p - - next = __next__ + yield p def digest_packets(packets): + """Compute digest on specified packets, according to '_to_hash' field.""" data_to_hash = io.BytesIO() for p in packets: data_to_hash.write(p['_to_hash']) @@ -246,8 +248,8 @@ def digest_packets(packets): def load_public_key(stream): """Parse and validate GPG public key from an input stream.""" - parser = Parser(util.Reader(stream)) - pubkey, userid, signature = list(parser) + packets = list(parse_packets(util.Reader(stream))) + pubkey, userid, signature = packets digest = digest_packets([pubkey, userid, signature]) assert signature['hash_prefix'] == digest[:2] log.debug('loaded public key "%s"', userid['value']) @@ -257,8 +259,8 @@ def load_public_key(stream): def load_signature(stream, original_data): - parser = Parser(util.Reader(stream)) - signature, = parser + """Load signature from stream, and compute GPG digest for verification.""" + signature, = list(parse_packets(util.Reader(stream))) digest = digest_packets([{'_to_hash': original_data}, signature]) assert signature['hash_prefix'] == digest[:2] return signature, digest