diff --git a/trezor_agent/gpg/__main__.py b/trezor_agent/gpg/__main__.py index 6c64120..3a170d5 100755 --- a/trezor_agent/gpg/__main__.py +++ b/trezor_agent/gpg/__main__.py @@ -7,7 +7,7 @@ import os import sys import time -from . import agent, encode, keyring, proto +from . import agent, encode, keyring, protocol from .. import server log = logging.getLogger(__name__) @@ -24,11 +24,11 @@ def run_create(args): if args.subkey: primary_bytes = keyring.export_public_key(user_id=user_id) # subkey for signing - signing_key = proto.PublicKey( + signing_key = protocol.PublicKey( curve_name=args.ecdsa_curve, created=args.time, verifying_key=verifying_key, ecdh=False) # subkey for encryption - encryption_key = proto.PublicKey( + encryption_key = protocol.PublicKey( curve_name=args.ecdsa_curve, created=args.time, verifying_key=decryption_key, ecdh=True) result = encode.create_subkey(primary_bytes=primary_bytes, @@ -39,11 +39,11 @@ def run_create(args): signer_func=conn.sign) else: # primary key for signing - primary = proto.PublicKey( + primary = protocol.PublicKey( curve_name=args.ecdsa_curve, created=args.time, verifying_key=verifying_key, ecdh=False) # subkey for encryption - subkey = proto.PublicKey( + subkey = protocol.PublicKey( curve_name=args.ecdsa_curve, created=args.time, verifying_key=decryption_key, ecdh=True) @@ -54,7 +54,7 @@ def run_create(args): pubkey=subkey, signer_func=conn.sign) - sys.stdout.write(proto.armor(result, 'PUBLIC KEY BLOCK')) + sys.stdout.write(protocol.armor(result, 'PUBLIC KEY BLOCK')) def run_agent(args): diff --git a/trezor_agent/gpg/decode.py b/trezor_agent/gpg/decode.py index 3a7969a..d69050c 100644 --- a/trezor_agent/gpg/decode.py +++ b/trezor_agent/gpg/decode.py @@ -10,7 +10,7 @@ import struct import ecdsa import ed25519 -from . import proto +from . import protocol from .. import util log = logging.getLogger(__name__) @@ -131,7 +131,7 @@ def _parse_signature(stream): log.debug('embedded sigs: %s', embedded) p['embedded'] = embedded - p['_is_custom'] = (proto.CUSTOM_SUBPACKET in p['unhashed_subpackets']) + p['_is_custom'] = (protocol.CUSTOM_SUBPACKET in p['unhashed_subpackets']) p['hash_prefix'] = stream.readfmt('2s') if p['pubkey_alg'] in ECDSA_ALGO_IDS: @@ -330,7 +330,7 @@ def load_public_key(pubkey_bytes, use_custom=False, ecdh=False): packet = pubkey while use_custom: if packet['type'] in ('pubkey', 'subkey') and signature['_is_custom']: - if ecdh == (packet['algo'] == proto.ECDH_ALGO_ID): + if ecdh == (packet['algo'] == protocol.ECDH_ALGO_ID): log.debug('found custom %s', packet['type']) break diff --git a/trezor_agent/gpg/encode.py b/trezor_agent/gpg/encode.py index a3ba11f..2287cc8 100644 --- a/trezor_agent/gpg/encode.py +++ b/trezor_agent/gpg/encode.py @@ -2,7 +2,7 @@ import logging import time -from . import decode, keyring, proto +from . import decode, keyring, protocol from .. import client, factory, formats, util log = logging.getLogger(__name__) @@ -80,34 +80,34 @@ def _time_format(t): def create_primary(user_id, pubkey, signer_func): """Export new primary GPG public key, ready for "gpg2 --import".""" - pubkey_packet = proto.packet(tag=6, blob=pubkey.data()) - user_id_packet = proto.packet(tag=13, - blob=user_id.encode('ascii')) + pubkey_packet = protocol.packet(tag=6, blob=pubkey.data()) + user_id_packet = protocol.packet(tag=13, + blob=user_id.encode('ascii')) data_to_sign = (pubkey.data_to_hash() + user_id_packet[:1] + util.prefix_len('>L', user_id.encode('ascii'))) log.info('creating primary GPG key "%s"', user_id) hashed_subpackets = [ - proto.subpacket_time(pubkey.created), # signature time + protocol.subpacket_time(pubkey.created), # signature time # https://tools.ietf.org/html/rfc4880#section-5.2.3.7 - proto.subpacket_byte(0x0B, 9), # preferred symmetric algo (AES-256) + protocol.subpacket_byte(0x0B, 9), # preferred symmetric algo (AES-256) # https://tools.ietf.org/html/rfc4880#section-5.2.3.4 - proto.subpacket_byte(0x1B, 1 | 2), # key flags (certify & sign) + protocol.subpacket_byte(0x1B, 1 | 2), # key flags (certify & sign) # https://tools.ietf.org/html/rfc4880#section-5.2.3.21 - proto.subpacket_byte(0x15, 8), # preferred hash (SHA256) + protocol.subpacket_byte(0x15, 8), # preferred hash (SHA256) # https://tools.ietf.org/html/rfc4880#section-5.2.3.8 - proto.subpacket_byte(0x16, 0), # preferred compression (none) + protocol.subpacket_byte(0x16, 0), # preferred compression (none) # https://tools.ietf.org/html/rfc4880#section-5.2.3.9 - proto.subpacket_byte(0x17, 0x80) # key server prefs (no-modify) + protocol.subpacket_byte(0x17, 0x80) # key server prefs (no-modify) # https://tools.ietf.org/html/rfc4880#section-5.2.3.17 ] unhashed_subpackets = [ - proto.subpacket(16, pubkey.key_id()), # issuer key id - proto.CUSTOM_SUBPACKET] + protocol.subpacket(16, pubkey.key_id()), # issuer key id + protocol.CUSTOM_SUBPACKET] log.info('confirm signing with primary key') - signature = proto.make_signature( + signature = protocol.make_signature( signer_func=signer_func, public_algo=pubkey.algo_id, data_to_sign=data_to_sign, @@ -115,13 +115,13 @@ def create_primary(user_id, pubkey, signer_func): hashed_subpackets=hashed_subpackets, unhashed_subpackets=unhashed_subpackets) - sign_packet = proto.packet(tag=2, blob=signature) + sign_packet = protocol.packet(tag=2, blob=signature) return pubkey_packet + user_id_packet + sign_packet def create_subkey(primary_bytes, pubkey, signer_func): """Export new subkey to GPG primary key.""" - subkey_packet = proto.packet(tag=14, blob=pubkey.data()) + subkey_packet = protocol.packet(tag=14, blob=pubkey.data()) primary = decode.load_public_key(primary_bytes) log.info('adding subkey to primary GPG key "%s"', primary['user_id']) data_to_sign = primary['_to_hash'] + pubkey.data_to_hash() @@ -131,11 +131,11 @@ def create_subkey(primary_bytes, pubkey, signer_func): else: # Primary Key Binding Signature hashed_subpackets = [ - proto.subpacket_time(pubkey.created)] # signature time + protocol.subpacket_time(pubkey.created)] # signature time unhashed_subpackets = [ - proto.subpacket(16, pubkey.key_id())] # issuer key id + protocol.subpacket(16, pubkey.key_id())] # issuer key id log.info('confirm signing with new subkey') - embedded_sig = proto.make_signature( + embedded_sig = protocol.make_signature( signer_func=signer_func, data_to_sign=data_to_sign, public_algo=pubkey.algo_id, @@ -150,27 +150,27 @@ def create_subkey(primary_bytes, pubkey, signer_func): flags = (2) if (not pubkey.ecdh) else (4 | 8) hashed_subpackets = [ - proto.subpacket_time(pubkey.created), # signature time - proto.subpacket_byte(0x1B, flags)] + protocol.subpacket_time(pubkey.created), # signature time + protocol.subpacket_byte(0x1B, flags)] unhashed_subpackets = [] - unhashed_subpackets.append(proto.subpacket(16, primary['key_id'])) + unhashed_subpackets.append(protocol.subpacket(16, primary['key_id'])) if embedded_sig is not None: - unhashed_subpackets.append(proto.subpacket(32, embedded_sig)) - unhashed_subpackets.append(proto.CUSTOM_SUBPACKET) + unhashed_subpackets.append(protocol.subpacket(32, embedded_sig)) + unhashed_subpackets.append(protocol.CUSTOM_SUBPACKET) log.info('confirm signing with primary key') if not primary['_is_custom']: signer_func = AgentSigner(primary['user_id']).sign - signature = proto.make_signature( + signature = protocol.make_signature( signer_func=signer_func, data_to_sign=data_to_sign, public_algo=primary['algo'], sig_type=0x18, hashed_subpackets=hashed_subpackets, unhashed_subpackets=unhashed_subpackets) - sign_packet = proto.packet(tag=2, blob=signature) + sign_packet = protocol.packet(tag=2, blob=signature) return primary_bytes + subkey_packet + sign_packet @@ -178,12 +178,12 @@ def load_from_public_key(pubkey_dict): """Load correct public key from the device.""" user_id = pubkey_dict['user_id'] created = pubkey_dict['created'] - curve_name = proto.find_curve_by_algo_id(pubkey_dict['algo']) + curve_name = protocol.find_curve_by_algo_id(pubkey_dict['algo']) assert curve_name in formats.SUPPORTED_CURVES - ecdh = (pubkey_dict['algo'] == proto.ECDH_ALGO_ID) + ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID) conn = HardwareSigner(user_id, curve_name=curve_name) - pubkey = proto.PublicKey( + pubkey = protocol.PublicKey( curve_name=curve_name, created=created, verifying_key=conn.pubkey(ecdh=ecdh), ecdh=ecdh) assert pubkey.key_id() == pubkey_dict['key_id'] diff --git a/trezor_agent/gpg/proto.py b/trezor_agent/gpg/protocol.py similarity index 100% rename from trezor_agent/gpg/proto.py rename to trezor_agent/gpg/protocol.py diff --git a/trezor_agent/gpg/tests/test_proto.py b/trezor_agent/gpg/tests/test_protocol.py similarity index 57% rename from trezor_agent/gpg/tests/test_proto.py rename to trezor_agent/gpg/tests/test_protocol.py index fec34d9..b649dd9 100644 --- a/trezor_agent/gpg/tests/test_proto.py +++ b/trezor_agent/gpg/tests/test_protocol.py @@ -1,43 +1,43 @@ import ecdsa import ed25519 -from .. import proto +from .. import protocol from ... import formats def test_packet(): - assert proto.packet(1, b'') == b'\x84\x00' - assert proto.packet(2, b'A') == b'\x88\x01A' + assert protocol.packet(1, b'') == b'\x84\x00' + assert protocol.packet(2, b'A') == b'\x88\x01A' blob = b'B' * 0xAB - assert proto.packet(3, blob) == b'\x8c\xAB' + blob + assert protocol.packet(3, blob) == b'\x8c\xAB' + blob blob = b'C' * 0x1234 - assert proto.packet(3, blob) == b'\x8d\x12\x34' + blob + assert protocol.packet(3, blob) == b'\x8d\x12\x34' + blob blob = b'D' * 0x12345678 - assert proto.packet(4, blob) == b'\x92\x12\x34\x56\x78' + blob + assert protocol.packet(4, blob) == b'\x92\x12\x34\x56\x78' + blob def test_subpackets(): - assert proto.subpacket(1, b'') == b'\x01' - assert proto.subpacket(2, '>H', 0x0304) == b'\x02\x03\x04' - assert proto.subpacket_long(9, 0x12345678) == b'\x09\x12\x34\x56\x78' - assert proto.subpacket_time(0x12345678) == b'\x02\x12\x34\x56\x78' - assert proto.subpacket_byte(0xAB, 0xCD) == b'\xAB\xCD' - assert proto.subpackets() == b'\x00\x00' - assert proto.subpackets(b'ABC', b'12345') == b'\x00\x0A\x03ABC\x0512345' + assert protocol.subpacket(1, b'') == b'\x01' + assert protocol.subpacket(2, '>H', 0x0304) == b'\x02\x03\x04' + assert protocol.subpacket_long(9, 0x12345678) == b'\x09\x12\x34\x56\x78' + assert protocol.subpacket_time(0x12345678) == b'\x02\x12\x34\x56\x78' + assert protocol.subpacket_byte(0xAB, 0xCD) == b'\xAB\xCD' + assert protocol.subpackets() == b'\x00\x00' + assert protocol.subpackets(b'ABC', b'12345') == b'\x00\x0A\x03ABC\x0512345' def test_mpi(): - assert proto.mpi(0x123) == b'\x00\x09\x01\x23' + assert protocol.mpi(0x123) == b'\x00\x09\x01\x23' def test_find(): - assert proto.find_curve_by_algo_id(19) == formats.CURVE_NIST256 - assert proto.find_curve_by_algo_id(22) == formats.CURVE_ED25519 + assert protocol.find_curve_by_algo_id(19) == formats.CURVE_NIST256 + assert protocol.find_curve_by_algo_id(22) == formats.CURVE_ED25519 def test_armor(): data = bytearray(range(256)) - assert proto.armor(data, 'TEST') == '''-----BEGIN PGP TEST----- + assert protocol.armor(data, 'TEST') == '''-----BEGIN PGP TEST----- Version: GnuPG v2 AAECAwQFBgcICQoLDA0ODxAREhMUFRYXGBkaGxwdHh8gISIjJCUmJygpKissLS4v @@ -57,11 +57,11 @@ def test_make_signature(): b'\xc7\xf4;\x86\x97s\xdb\x9a\xda\xee< \xcb\x9e\x00') return (7, 8) - sig = proto.make_signature( + sig = protocol.make_signature( signer_func=signer_func, data_to_sign=b'Hello World!', public_algo=22, - hashed_subpackets=[proto.subpacket_time(1)], + hashed_subpackets=[protocol.subpacket_time(1)], unhashed_subpackets=[], sig_type=25) assert sig == (b'\x04\x19\x16\x08\x00\x06\x05\x02' @@ -71,8 +71,8 @@ def test_make_signature(): def test_nist256p1(): sk = ecdsa.SigningKey.from_secret_exponent(secexp=1, curve=ecdsa.NIST256p) vk = sk.get_verifying_key() - pk = proto.PublicKey(curve_name=formats.CURVE_NIST256, - created=42, verifying_key=vk) + pk = protocol.PublicKey(curve_name=formats.CURVE_NIST256, + created=42, verifying_key=vk) assert repr(pk) == 'GPG public key nist256p1/F82361D9' assert pk.keygrip == b'\x95\x85.\x91\x7f\xe2\xc3\x91R\xba\x99\x81\x92\xb5y\x1d\xb1\\\xdc\xf0' @@ -80,8 +80,8 @@ def test_nist256p1(): def test_nist256p1_ecdh(): sk = ecdsa.SigningKey.from_secret_exponent(secexp=1, curve=ecdsa.NIST256p) vk = sk.get_verifying_key() - pk = proto.PublicKey(curve_name=formats.CURVE_NIST256, - created=42, verifying_key=vk, ecdh=True) + pk = protocol.PublicKey(curve_name=formats.CURVE_NIST256, + created=42, verifying_key=vk, ecdh=True) assert repr(pk) == 'GPG public key nist256p1/5811DF46' assert pk.keygrip == b'\x95\x85.\x91\x7f\xe2\xc3\x91R\xba\x99\x81\x92\xb5y\x1d\xb1\\\xdc\xf0' @@ -89,7 +89,7 @@ def test_nist256p1_ecdh(): def test_ed25519(): sk = ed25519.SigningKey(b'\x00' * 32) vk = sk.get_verifying_key() - pk = proto.PublicKey(curve_name=formats.CURVE_ED25519, - created=42, verifying_key=vk) + pk = protocol.PublicKey(curve_name=formats.CURVE_ED25519, + created=42, verifying_key=vk) assert repr(pk) == 'GPG public key ed25519/36B40FE6' assert pk.keygrip == b'\xbf\x01\x90l\x17\xb64\xa3-\xf4\xc0gr\x99\x18<\xddBQ?'