Merge branch 'subkey'

This commit is contained in:
Roman Zeyde
2016-04-30 15:42:21 +03:00
5 changed files with 257 additions and 101 deletions

View File

@@ -76,15 +76,47 @@ def _parse(s):
return _parse_term(s)
def _parse_ecdsa_sig(args):
(r, sig_r), (s, sig_s) = args
assert r == 'r'
assert s == 's'
return (util.bytes2num(sig_r),
util.bytes2num(sig_s))
def _parse_rsa_sig(args):
(s, sig_s), = args
assert s == 's'
return (util.bytes2num(sig_s),)
def _parse_sig(sig):
label, sig = sig
assert label == 'sig-val'
algo_name = sig[0]
parser = {'rsa': _parse_rsa_sig, 'ecdsa': _parse_ecdsa_sig}[algo_name]
return parser(args=sig[1:])
def sign(sock, keygrip, digest):
"""Sign a digest using specified key using GPG agent."""
hash_algo = 8 # SHA256
assert len(digest) == 32
assert _communicate(sock, 'RESET').startswith('OK')
ttyname = sp.check_output('tty').strip()
options = ['ttyname={}'.format(ttyname)] # set TTY for passphrase entry
for opt in options:
assert _communicate(sock, 'OPTION {}'.format(opt)) == 'OK'
assert _communicate(sock, 'SIGKEY {}'.format(keygrip)) == 'OK'
assert _communicate(sock, 'SETHASH {} {}'.format(hash_algo,
_hex(digest))) == 'OK'
desc = ('Please+enter+the+passphrase+to+unlock+the+OpenPGP%0A'
'secret+key,+to+sign+a+new+TREZOR-based+subkey')
assert _communicate(sock, 'SETKEYDESC {}'.format(desc)) == 'OK'
assert _communicate(sock, 'PKSIGN') == 'OK'
line = _recvline(sock).strip()
@@ -95,14 +127,7 @@ def sign(sock, keygrip, digest):
sig, leftover = _parse(sig)
assert not leftover
data, (algo, (r, sig_r), (s, sig_s)) = sig
assert data == 'sig-val'
assert algo == 'ecdsa'
assert r == 'r'
assert s == 's'
return (util.bytes2num(sig_r),
util.bytes2num(sig_s))
return _parse_sig(sig)
def get_keygrip(user_id):
@@ -113,8 +138,9 @@ def get_keygrip(user_id):
def _main():
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(levelname)-10s %(message)s')
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(levelname)-10s %(message)-100s '
'%(filename)s:%(lineno)d')
p = argparse.ArgumentParser()
p.add_argument('user_id')

View File

@@ -68,11 +68,27 @@ def _parse_ed25519_verifier(mpi):
return _ed25519_verify, vk
def _create_rsa_verifier(n, e):
def verifier(signature, digest):
s, = signature
size = n.bit_length()
result = pow(s, e, n) % (2 ** 256)
digest = util.bytes2num(digest)
if result == digest:
log.debug('RSA-%d signature is OK', size)
return True
else:
raise ValueError('invalid RSA signature')
return verifier
SUPPORTED_CURVES = {
b'\x2A\x86\x48\xCE\x3D\x03\x01\x07': _parse_nist256p1_verifier,
b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01': _parse_ed25519_verifier,
}
ECDSA_ALGO_IDS = (19, 22) # (nist256, ed25519)
def _parse_literal(stream):
"""See https://tools.ietf.org/html/rfc4880#section-5.9 for details."""
@@ -115,10 +131,15 @@ def _parse_signature(stream):
p['unhashed_subpackets'] = parse_subpackets(stream)
embedded = list(_parse_embedded_signatures(p['unhashed_subpackets']))
if embedded:
log.info('embedded sigs: %s', embedded)
p['embedded'] = embedded
p['hash_prefix'] = stream.readfmt('2s')
p['sig'] = (parse_mpi(stream), parse_mpi(stream))
if p['pubkey_alg'] in ECDSA_ALGO_IDS:
p['sig'] = (parse_mpi(stream), parse_mpi(stream))
else: # RSA
p['sig'] = (parse_mpi(stream),)
assert not stream.read()
return p
@@ -131,16 +152,23 @@ def _parse_pubkey(stream):
p['version'] = stream.readfmt('B')
p['created'] = stream.readfmt('>L')
p['algo'] = stream.readfmt('B')
if p['algo'] in ECDSA_ALGO_IDS:
# https://tools.ietf.org/html/rfc6637#section-11
oid_size = stream.readfmt('B')
oid = stream.read(oid_size)
assert oid in SUPPORTED_CURVES, util.hexlify(oid)
parser = SUPPORTED_CURVES[oid]
# https://tools.ietf.org/html/rfc6637#section-11
oid_size = stream.readfmt('B')
oid = stream.read(oid_size)
assert oid in SUPPORTED_CURVES
parser = SUPPORTED_CURVES[oid]
mpi = parse_mpi(stream)
log.debug('mpi: %x (%d bits)', mpi, mpi.bit_length())
p['verifier'], p['verifying_key'] = parser(mpi)
else: # RSA
n = parse_mpi(stream)
e = parse_mpi(stream)
log.debug('n: %x (%d bits)', n, n.bit_length())
log.debug('e: %x (%d bits)', e, e.bit_length())
p['verifier'] = _create_rsa_verifier(n, e)
mpi = parse_mpi(stream)
log.debug('mpi: %x (%d bits)', mpi, mpi.bit_length())
p['verifier'], p['verifying_key'] = parser(mpi)
assert not stream.read()
# https://tools.ietf.org/html/rfc4880#section-12.2
@@ -248,13 +276,20 @@ def digest_packets(packets):
def load_public_key(stream):
"""Parse and validate GPG public key from an input stream."""
packets = list(parse_packets(util.Reader(stream)))
pubkey, userid, signature = packets[:3]
subkey = subsig = None
if len(packets) == 5:
pubkey, userid, signature, subkey, subsig = packets
log.debug('subkey: %s', subkey)
log.debug('subsig: %s', subsig)
else:
pubkey, userid, signature = packets
digest = digest_packets([pubkey, userid, signature])
assert signature['hash_prefix'] == digest[:2]
log.debug('loaded public key "%s"', userid['value'])
verify_digest(pubkey=pubkey, digest=digest,
signature=signature['sig'], label='GPG public key')
return pubkey
return subkey or pubkey
def load_signature(stream, original_data):

View File

@@ -13,10 +13,18 @@ log = logging.getLogger(__name__)
def packet(tag, blob):
"""Create small GPG packet."""
assert len(blob) < 256
length_type = 0 # : 1 byte for length
assert len(blob) < 2**32
if len(blob) < 2**8:
length_type = 0
elif len(blob) < 2**16:
length_type = 1
else:
length_type = 2
fmt = ['>B', '>H', '>L'][length_type]
leading_byte = 0x80 | (tag << 2) | (length_type)
return struct.pack('>B', leading_byte) + util.prefix_len('>B', blob)
return struct.pack('>B', leading_byte) + util.prefix_len(fmt, blob)
def subpacket(subpacket_type, fmt, *values):
@@ -50,13 +58,13 @@ def mpi(value):
"""Serialize multipresicion integer using GPG format."""
bits = value.bit_length()
data_size = (bits + 7) // 8
data_bytes = [0] * data_size
data_bytes = bytearray(data_size)
for i in range(data_size):
data_bytes[i] = value & 0xFF
value = value >> 8
data_bytes.reverse()
return struct.pack('>H', bits) + bytearray(data_bytes)
return struct.pack('>H', bits) + bytes(data_bytes)
def _serialize_nist256(vk):
@@ -92,7 +100,10 @@ def _find_curve_by_algo_id(algo_id):
class HardwareSigner(object):
"""Sign messages and get public keys from a hardware device."""
def __init__(self, user_id, curve_name):
"""Connect to the device and retrieve required public key."""
self.client_wrapper = factory.load()
self.identity = self.client_wrapper.identity_type()
self.identity.proto = 'gpg'
@@ -100,6 +111,7 @@ class HardwareSigner(object):
self.curve_name = curve_name
def pubkey(self):
"""Return public key as VerifyingKey object."""
addr = client.get_address(self.identity)
public_node = self.client_wrapper.connection.get_public_node(
n=addr, ecdsa_curve_name=self.curve_name)
@@ -108,56 +120,100 @@ class HardwareSigner(object):
pubkey=public_node.node.public_key,
curve_name=self.curve_name)
def sign(self, digest, visual):
def sign(self, digest):
"""Sign the digest and return a serialized signature."""
result = self.client_wrapper.connection.sign_identity(
identity=self.identity,
challenge_hidden=digest,
challenge_visual=visual,
challenge_visual=util.hexlify(digest),
ecdsa_curve_name=self.curve_name)
assert result.signature[:1] == b'\x00'
sig = result.signature[1:]
return mpi(util.bytes2num(sig[:32])) + mpi(util.bytes2num(sig[32:]))
def close(self):
"""Close the connection to the device."""
self.client_wrapper.connection.clear_session()
self.client_wrapper.connection.close()
class AgentSigner(object):
def __init__(self, user_id, curve_name):
"""Sign messages and get public keys using gpg-agent tool."""
def __init__(self, user_id):
"""Connect to the agent and retrieve required public key."""
self.sock = agent.connect()
assert curve_name == formats.CURVE_NIST256
self.curve_name = curve_name
self.keygrip = agent.get_keygrip(user_id)
self.public_key = decode.load_from_gpg(user_id)
def pubkey(self):
"""Return public key as VerifyingKey object."""
return self.public_key['verifying_key']
def sign(self, digest, visual):
r, s = agent.sign(sock=self.sock, keygrip=self.keygrip, digest=digest)
return mpi(r) + mpi(s)
def sign(self, digest):
"""Sign the digest and return an ECDSA signature."""
params = agent.sign(sock=self.sock,
keygrip=self.keygrip, digest=digest)
return b''.join(mpi(p) for p in params)
def close(self):
"""Close the connection to gpg-agent."""
self.sock.close()
class PublicKey(object):
"""GPG representation for public key packets."""
def __init__(self, curve_name, created, verifying_key):
"""Contruct using a ECDSA VerifyingKey object."""
self.curve_info = SUPPORTED_CURVES[curve_name]
self.created = int(created) # time since Epoch
self.verifying_key = verifying_key
self.algo_id = self.curve_info['algo_id']
def data(self):
"""Data for packet creation."""
header = struct.pack('>BLB',
4, # version
self.created, # creation
self.algo_id) # public key algorithm ID
oid = util.prefix_len('>B', self.curve_info['oid'])
blob = self.curve_info['serialize'](self.verifying_key)
return header + oid + blob
def data_to_hash(self):
"""Data for digest computation."""
return b'\x99' + util.prefix_len('>H', self.data())
def _fingerprint(self):
return hashlib.sha1(self.data_to_hash()).digest()
def key_id(self):
"""Short (8 byte) GPG key ID."""
return self._fingerprint()[-8:]
def __repr__(self):
"""Short (8 hexadecimal digits) GPG key ID."""
return '<{}>'.format(util.hexlify(self.key_id()))
__str__ = __repr__
class Signer(object):
"""Performs GPG signing operations."""
SignerType = AgentSigner
def __init__(self, user_id, created, curve_name):
"""Construct and loads a public key from the device."""
self.user_id = user_id
assert curve_name in formats.SUPPORTED_CURVES
self.conn = self.SignerType(user_id, curve_name=curve_name)
self.verifying_key = self.conn.pubkey()
self.conn = HardwareSigner(user_id, curve_name=curve_name)
self.pubkey = PublicKey(
curve_name=curve_name, created=created,
verifying_key=self.conn.pubkey())
self.created = int(created)
log.info('%s GPG public key %s created at %s', curve_name,
self.hex_short_key_id(), util.time_format(self.created))
self.pubkey, util.time_format(self.pubkey.created))
@classmethod
def from_public_key(cls, pubkey, user_id):
@@ -170,64 +226,79 @@ class Signer(object):
s = Signer(user_id=user_id,
created=pubkey['created'],
curve_name=_find_curve_by_algo_id(pubkey['algo']))
assert s.key_id() == pubkey['key_id']
assert s.pubkey.key_id() == pubkey['key_id']
return s
def _pubkey_data(self):
curve_info = SUPPORTED_CURVES[self.conn.curve_name]
header = struct.pack('>BLB',
4, # version
self.created, # creation
curve_info['algo_id'])
oid = util.prefix_len('>B', curve_info['oid'])
blob = curve_info['serialize'](self.verifying_key)
return header + oid + blob
def _pubkey_data_to_hash(self):
return b'\x99' + util.prefix_len('>H', self._pubkey_data())
def _fingerprint(self):
return hashlib.sha1(self._pubkey_data_to_hash()).digest()
def key_id(self):
"""Short (8 byte) GPG key ID."""
return self._fingerprint()[-8:]
def hex_short_key_id(self):
"""Short (8 hexadecimal digits) GPG key ID."""
return util.hexlify(self.key_id()[-4:])
def close(self):
"""Close connection and turn off the screen of the device."""
self.conn.close()
def export(self):
"""Export GPG public key, ready for "gpg2 --import"."""
pubkey_packet = packet(tag=6, blob=self._pubkey_data())
pubkey_packet = packet(tag=6, blob=self.pubkey.data())
user_id_packet = packet(tag=13, blob=self.user_id)
data_to_sign = (self._pubkey_data_to_hash() +
data_to_sign = (self.pubkey.data_to_hash() +
user_id_packet[:1] +
util.prefix_len('>L', self.user_id))
log.info('signing public key "%s"', self.user_id)
hashed_subpackets = [
subpacket_time(self.created), # signature creaion time
subpacket_time(self.pubkey.created), # signature creaion time
subpacket_byte(0x1B, 1 | 2), # key flags (certify & sign)
subpacket_byte(0x15, 8), # preferred hash (SHA256)
subpacket_byte(0x16, 0), # preferred compression (none)
subpacket_byte(0x17, 0x80)] # key server prefs (no-modify)
signature = self._make_signature(visual=self.hex_short_key_id(),
data_to_sign=data_to_sign,
sig_type=0x13, # user id & public key
hashed_subpackets=hashed_subpackets)
unhashed_subpackets = [
subpacket(16, self.pubkey.key_id())] # issuer key id
signature = _make_signature(
signer_func=self.conn.sign,
public_algo=self.pubkey.algo_id,
data_to_sign=data_to_sign,
sig_type=0x13, # user id & public key
hashed_subpackets=hashed_subpackets,
unhashed_subpackets=unhashed_subpackets)
sign_packet = packet(tag=2, blob=signature)
return pubkey_packet + user_id_packet + sign_packet
def subkey(self, user_id):
primary = decode.load_from_gpg(user_id)
keygrip = agent.get_keygrip(user_id)
log.info('adding as subkey to %s (%s)', user_id, keygrip)
def subkey(self):
"""Export a subkey to `self.user_id` GPG primary key."""
subkey_packet = packet(tag=14, blob=self.pubkey.data())
primary = decode.load_from_gpg(self.user_id)
keygrip = agent.get_keygrip(self.user_id)
log.info('adding as subkey to %s (%s)', self.user_id, keygrip)
data_to_sign = primary['_to_hash'] + self.pubkey.data_to_hash()
# Primary Key Binding Signature
hashed_subpackets = [
subpacket_time(self.pubkey.created)] # signature creaion time
unhashed_subpackets = [
subpacket(16, self.pubkey.key_id())] # issuer key id
embedded_sig = _make_signature(signer_func=self.conn.sign,
data_to_sign=data_to_sign,
public_algo=self.pubkey.algo_id,
sig_type=0x19,
hashed_subpackets=hashed_subpackets,
unhashed_subpackets=unhashed_subpackets)
log.info('embedded signature: %r', embedded_sig)
# Subkey Binding Signature
hashed_subpackets = [
subpacket_time(self.pubkey.created), # signature creaion time
subpacket_byte(0x1B, 2)] # key flags (certify & sign)
unhashed_subpackets = [
subpacket(16, primary['key_id']), # issuer key id
subpacket(32, embedded_sig)]
gpg_agent = AgentSigner(self.user_id)
signature = _make_signature(signer_func=gpg_agent.sign,
data_to_sign=data_to_sign,
public_algo=primary['algo'],
sig_type=0x18,
hashed_subpackets=hashed_subpackets,
unhashed_subpackets=unhashed_subpackets)
sign_packet = packet(tag=2, blob=signature)
return subkey_packet + sign_packet
def sign(self, msg, sign_time=None):
"""Sign GPG message at specified time."""
@@ -237,34 +308,38 @@ class Signer(object):
log.info('signing %d byte message at %s',
len(msg), util.time_format(sign_time))
hashed_subpackets = [subpacket_time(sign_time)]
blob = self._make_signature(
visual=self.hex_short_key_id(),
data_to_sign=msg, hashed_subpackets=hashed_subpackets)
unhashed_subpackets = [
subpacket(16, self.pubkey.key_id())] # issuer key id
blob = _make_signature(signer_func=self.conn.sign,
data_to_sign=msg,
public_algo=self.pubkey.algo_id,
hashed_subpackets=hashed_subpackets,
unhashed_subpackets=unhashed_subpackets)
return packet(tag=2, blob=blob)
def _make_signature(self, visual, data_to_sign,
hashed_subpackets, sig_type=0):
curve_info = SUPPORTED_CURVES[self.conn.curve_name]
header = struct.pack('>BBBB',
4, # version
sig_type, # rfc4880 (section-5.2.1)
curve_info['algo_id'],
8) # hash_alg (SHA256)
hashed = subpackets(*hashed_subpackets)
unhashed = subpackets(
subpacket(16, self.key_id()) # issuer key id
)
tail = b'\x04\xff' + struct.pack('>L', len(header) + len(hashed))
data_to_hash = data_to_sign + header + hashed + tail
log.debug('hashing %d bytes', len(data_to_hash))
digest = hashlib.sha256(data_to_hash).digest()
def _make_signature(signer_func, data_to_sign, public_algo,
hashed_subpackets, unhashed_subpackets, sig_type=0):
# pylint: disable=too-many-arguments
header = struct.pack('>BBBB',
4, # version
sig_type, # rfc4880 (section-5.2.1)
public_algo,
8) # hash_alg (SHA256)
hashed = subpackets(*hashed_subpackets)
unhashed = subpackets(*unhashed_subpackets)
tail = b'\x04\xff' + struct.pack('>L', len(header) + len(hashed))
data_to_hash = data_to_sign + header + hashed + tail
sig = self.conn.sign(digest=digest, visual=visual)
log.debug('hashing %d bytes', len(data_to_hash))
digest = hashlib.sha256(data_to_hash).digest()
return (header + hashed + unhashed +
digest[:2] + # used for decoder's sanity check
sig) # actual ECDSA signature
sig = signer_func(digest=digest)
return bytes(header + hashed + unhashed +
digest[:2] + # used for decoder's sanity check
sig) # actual ECDSA signature
def _split_lines(body, size):

View File

@@ -31,6 +31,7 @@ def main():
p.add_argument('-t', '--time', type=int, default=int(time.time()))
p.add_argument('-a', '--armor', action='store_true', default=False)
p.add_argument('-v', '--verbose', action='store_true', default=False)
p.add_argument('-s', '--subkey', action='store_true', default=False)
p.add_argument('-e', '--ecdsa-curve', default='nist256p1')
p.add_argument('-o', '--output',
help='Output file name for the results. '
@@ -44,12 +45,16 @@ def main():
if not args.filename:
s = encode.Signer(user_id=user_id, created=args.time,
curve_name=args.ecdsa_curve)
pubkey = s.export()
if args.subkey:
pubkey = s.subkey()
else:
pubkey = s.export()
ext = '.pub'
if args.armor:
pubkey = encode.armor(pubkey, 'PUBLIC KEY BLOCK')
ext = '.asc'
filename = args.output or (s.hex_short_key_id() + ext)
filename = args.output or '-' # use stdout if no file specified
if filename == 'GPG':
log.info('importing public key to local keyring')
_call_with_input(['gpg2', '--import'], pubkey)

15
trezor_agent/gpg/test.sh Normal file
View File

@@ -0,0 +1,15 @@
# NEVER RUN ON YOUR OWN REAL GPG KEYS!!!!! THEY WILL BE DELETED!!!!!
set -x -e -u
CURVE=ed25519
#CURVE=nist256p1
(cd ~/.gnupg && rm -rf openpgp-revocs.d/ private-keys-v1.d/ pubring.kbx* trustdb.gpg /tmp/log *.gpg; killall gpg-agent || true)
gpg2 --full-gen-key --expert
gpg2 --export > romanz.pub
NOW=`date +%s`
trezor-gpg -t $NOW -v -e $CURVE --subkey "romanz" -o subkey.pub
gpg2 -K
gpg2 -vv --import <(cat romanz.pub subkey.pub)
gpg2 -K
trezor-gpg -t $NOW -v -e $CURVE "romanz" EXAMPLE
gpg2 --verify EXAMPLE.sig