Split the package into a shared library and separate per-device packages

This commit is contained in:
Roman Zeyde
2017-04-27 16:31:17 +03:00
parent eb525e1b62
commit 4af881b3cb
54 changed files with 229 additions and 157 deletions

145
libagent/gpg/__init__.py Normal file
View File

@@ -0,0 +1,145 @@
"""
TREZOR support for ECDSA GPG signatures.
See these links for more details:
- https://www.gnupg.org/faq/whats-new-in-2.1.html
- https://tools.ietf.org/html/rfc4880
- https://tools.ietf.org/html/rfc6637
- https://tools.ietf.org/html/draft-irtf-cfrg-eddsa-05
"""
import argparse
import contextlib
import logging
import os
import sys
import time
import semver
from . import agent, client, encode, keyring, protocol
from .. import device, formats, server, util
log = logging.getLogger(__name__)
def export_public_key(device_type, args):
"""Generate a new pubkey for a new/existing GPG identity."""
log.warning('NOTE: in order to re-generate the exact same GPG key later, '
'run this command with "--time=%d" commandline flag (to set '
'the timestamp of the GPG key manually).', args.time)
c = client.Client(user_id=args.user_id, curve_name=args.ecdsa_curve,
device_type=device_type)
verifying_key = c.pubkey(ecdh=False)
decryption_key = c.pubkey(ecdh=True)
if args.subkey: # add as subkey
log.info('adding %s GPG subkey for "%s" to existing key',
args.ecdsa_curve, args.user_id)
# subkey for signing
signing_key = protocol.PublicKey(
curve_name=args.ecdsa_curve, created=args.time,
verifying_key=verifying_key, ecdh=False)
# subkey for encryption
encryption_key = protocol.PublicKey(
curve_name=formats.get_ecdh_curve_name(args.ecdsa_curve),
created=args.time, verifying_key=decryption_key, ecdh=True)
primary_bytes = keyring.export_public_key(args.user_id)
result = encode.create_subkey(primary_bytes=primary_bytes,
subkey=signing_key,
signer_func=c.sign)
result = encode.create_subkey(primary_bytes=result,
subkey=encryption_key,
signer_func=c.sign)
else: # add as primary
log.info('creating new %s GPG primary key for "%s"',
args.ecdsa_curve, args.user_id)
# primary key for signing
primary = protocol.PublicKey(
curve_name=args.ecdsa_curve, created=args.time,
verifying_key=verifying_key, ecdh=False)
# subkey for encryption
subkey = protocol.PublicKey(
curve_name=formats.get_ecdh_curve_name(args.ecdsa_curve),
created=args.time, verifying_key=decryption_key, ecdh=True)
result = encode.create_primary(user_id=args.user_id,
pubkey=primary,
signer_func=c.sign)
result = encode.create_subkey(primary_bytes=result,
subkey=subkey,
signer_func=c.sign)
sys.stdout.write(protocol.armor(result, 'PUBLIC KEY BLOCK'))
def run_create(device_type, args):
"""Export public GPG key."""
util.setup_logging(verbosity=args.verbose)
log.warning('This GPG tool is still in EXPERIMENTAL mode, '
'so please note that the API and features may '
'change without backwards compatibility!')
existing_gpg = keyring.gpg_version().decode('ascii')
required_gpg = '>=2.1.11'
if semver.match(existing_gpg, required_gpg):
export_public_key(device_type, args)
else:
log.error('Existing gpg2 has version "%s" (%s required)',
existing_gpg, required_gpg)
def run_unlock(device_type, args):
"""Unlock hardware device (for future interaction)."""
util.setup_logging(verbosity=args.verbose)
with device_type() as d:
log.info('unlocked %s device', d)
def run_agent(device_type):
"""Run a simple GPG-agent server."""
parser = argparse.ArgumentParser()
parser.add_argument('--homedir', default=os.environ.get('GNUPGHOME'))
args, _ = parser.parse_known_args()
assert args.homedir
config_file = os.path.join(args.homedir, 'gpg-agent.conf')
lines = (line.strip() for line in open(config_file))
lines = (line for line in lines if line and not line.startswith('#'))
config = dict(line.split(' ', 1) for line in lines)
util.setup_logging(verbosity=int(config['verbosity']),
filename=config['log-file'])
sock_path = keyring.get_agent_sock_path()
with server.unix_domain_socket_server(sock_path) as sock:
for conn in agent.yield_connections(sock):
with contextlib.closing(conn):
try:
agent.handle_connection(conn=conn, device_type=device_type)
except StopIteration:
log.info('stopping gpg-agent')
return
except Exception as e: # pylint: disable=broad-except
log.exception('gpg-agent failed: %s', e)
def main(device_type):
"""Parse command-line arguments."""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers()
p = subparsers.add_parser('create', help='Export public GPG key')
p.add_argument('user_id')
p.add_argument('-e', '--ecdsa-curve', default='nist256p1')
p.add_argument('-t', '--time', type=int, default=int(time.time()))
p.add_argument('-v', '--verbose', default=0, action='count')
p.add_argument('-s', '--subkey', default=False, action='store_true')
p.set_defaults(func=run_create)
p = subparsers.add_parser('unlock', help='Unlock the hardware device')
p.add_argument('-v', '--verbose', default=0, action='count')
p.set_defaults(func=run_unlock)
args = parser.parse_args()
return args.func(device_type=device_type, args=args)

171
libagent/gpg/agent.py Normal file
View File

@@ -0,0 +1,171 @@
"""GPG-agent utilities."""
import binascii
import logging
from . import client, decode, keyring, protocol
from .. import util
log = logging.getLogger(__name__)
def yield_connections(sock):
"""Run a server on the specified socket."""
while True:
log.debug('waiting for connection on %s', sock.getsockname())
try:
conn, _ = sock.accept()
except KeyboardInterrupt:
return
conn.settimeout(None)
log.debug('accepted connection on %s', sock.getsockname())
yield conn
def serialize(data):
"""Serialize data according to ASSUAN protocol."""
for c in [b'%', b'\n', b'\r']:
escaped = '%{:02X}'.format(ord(c)).encode('ascii')
data = data.replace(c, escaped)
return data
def sig_encode(r, s):
"""Serialize ECDSA signature data into GPG S-expression."""
r = serialize(util.num2bytes(r, 32))
s = serialize(util.num2bytes(s, 32))
return b'(7:sig-val(5:ecdsa(1:r32:' + r + b')(1:s32:' + s + b')))'
def open_connection(keygrip_bytes, device_type):
"""
Connect to the device for the specified keygrip.
Parse GPG public key to find the first user ID, which is used to
specify the correct signature/decryption key on the device.
"""
pubkey_dict, user_ids = decode.load_by_keygrip(
pubkey_bytes=keyring.export_public_keys(),
keygrip=keygrip_bytes)
# We assume the first user ID is used to generate TREZOR-based GPG keys.
user_id = user_ids[0]['value'].decode('ascii')
curve_name = protocol.get_curve_name_by_oid(pubkey_dict['curve_oid'])
ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID)
conn = client.Client(user_id, curve_name=curve_name, device_type=device_type)
pubkey = protocol.PublicKey(
curve_name=curve_name, created=pubkey_dict['created'],
verifying_key=conn.pubkey(ecdh=ecdh), ecdh=ecdh)
assert pubkey.key_id() == pubkey_dict['key_id']
assert pubkey.keygrip() == keygrip_bytes
return conn
def pksign(keygrip, digest, algo, device_type):
"""Sign a message digest using a private EC key."""
log.debug('signing %r digest (algo #%s)', digest, algo)
keygrip_bytes = binascii.unhexlify(keygrip)
conn = open_connection(keygrip_bytes, device_type=device_type)
r, s = conn.sign(binascii.unhexlify(digest))
result = sig_encode(r, s)
log.debug('result: %r', result)
return result
def _serialize_point(data):
prefix = '{}:'.format(len(data)).encode('ascii')
# https://www.gnupg.org/documentation/manuals/assuan/Server-responses.html
return b'(5:value' + serialize(prefix + data) + b')'
def parse_ecdh(line):
"""Parse ECDH request and return remote public key."""
prefix, line = line.split(b' ', 1)
assert prefix == b'D'
exp, leftover = keyring.parse(keyring.unescape(line))
log.debug('ECDH s-exp: %r', exp)
assert not leftover
label, exp = exp
assert label == b'enc-val'
assert exp[0] == b'ecdh'
items = exp[1:]
log.debug('ECDH parameters: %r', items)
return dict(items)[b'e']
def pkdecrypt(keygrip, conn, device_type):
"""Handle decryption using ECDH."""
for msg in [b'S INQUIRE_MAXLEN 4096', b'INQUIRE CIPHERTEXT']:
keyring.sendline(conn, msg)
line = keyring.recvline(conn)
assert keyring.recvline(conn) == b'END'
remote_pubkey = parse_ecdh(line)
keygrip_bytes = binascii.unhexlify(keygrip)
conn = open_connection(keygrip_bytes, device_type=device_type)
return _serialize_point(conn.ecdh(remote_pubkey))
@util.memoize
def have_key(keygrip, device_type):
"""Check if current keygrip correspond to a TREZOR-based key."""
try:
open_connection(keygrip_bytes=binascii.unhexlify(keygrip),
device_type=device_type)
return True
except KeyError as e:
log.warning('HAVEKEY(%s) failed: %s', keygrip, e)
return False
# pylint: disable=too-many-branches
def handle_connection(conn, device_type):
"""Handle connection from GPG binary using the ASSUAN protocol."""
keygrip = None
digest = None
algo = None
version = keyring.gpg_version() # "Clone" existing GPG version
keyring.sendline(conn, b'OK')
for line in keyring.iterlines(conn):
parts = line.split(b' ')
command = parts[0]
args = parts[1:]
if command in {b'RESET', b'OPTION', b'SETKEYDESC'}:
pass # reply with OK
elif command == b'GETINFO':
keyring.sendline(conn, b'D ' + version)
elif command == b'AGENT_ID':
keyring.sendline(conn, b'D TREZOR') # "Fake" agent ID
elif command in {b'SIGKEY', b'SETKEY'}:
keygrip, = args
elif command == b'SETHASH':
algo, digest = args
elif command == b'PKSIGN':
sig = pksign(keygrip, digest, algo, device_type=device_type)
keyring.sendline(conn, b'D ' + sig)
elif command == b'PKDECRYPT':
sec = pkdecrypt(keygrip, conn, device_type=device_type)
keyring.sendline(conn, b'D ' + sec)
elif command == b'HAVEKEY':
if not have_key(keygrip=args[0], device_type=device_type):
keyring.sendline(conn,
b'ERR 67108881 No secret key <GPG Agent>')
return
elif command == b'KEYINFO':
keygrip, = args
# Dummy reply (mainly for 'gpg --edit' to succeed).
# For details, see GnuPG agent KEYINFO command help.
# https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=agent/command.c;h=c8b34e9882076b1b724346787781f657cac75499;hb=refs/heads/master#l1082
fmt = 'S KEYINFO {0} X - - - - - - -'
keyring.sendline(conn, fmt.format(keygrip).encode('ascii'))
elif command == b'BYE':
return
elif command == b'KILLAGENT':
keyring.sendline(conn, b'OK')
raise StopIteration
else:
log.error('unknown request: %r', line)
return
keyring.sendline(conn, b'OK')

44
libagent/gpg/client.py Normal file
View File

@@ -0,0 +1,44 @@
"""Device abstraction layer for GPG operations."""
import logging
from .. import device, formats, util
log = logging.getLogger(__name__)
class Client(object):
"""Sign messages and get public keys from a hardware device."""
def __init__(self, user_id, curve_name, device_type):
"""Connect to the device and retrieve required public key."""
self.device = device_type()
self.user_id = user_id
self.identity = device.interface.Identity(
identity_str='gpg://', curve_name=curve_name)
self.identity.identity_dict['host'] = user_id
def pubkey(self, ecdh=False):
"""Return public key as VerifyingKey object."""
with self.device:
pubkey = self.device.pubkey(ecdh=ecdh, identity=self.identity)
return formats.decompress_pubkey(
pubkey=pubkey, curve_name=self.identity.curve_name)
def sign(self, digest):
"""Sign the digest and return a serialized signature."""
log.info('please confirm GPG signature on %s for "%s"...',
self.device, self.user_id)
if self.identity.curve_name == formats.CURVE_NIST256:
digest = digest[:32] # sign the first 256 bits
log.debug('signing digest: %s', util.hexlify(digest))
with self.device:
sig = self.device.sign(blob=digest, identity=self.identity)
return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:]))
def ecdh(self, pubkey):
"""Derive shared secret using ECDH from remote public key."""
log.info('please confirm GPG decryption on %s for "%s"...',
self.device, self.user_id)
with self.device:
return self.device.ecdh(pubkey=pubkey, identity=self.identity)

318
libagent/gpg/decode.py Normal file
View File

@@ -0,0 +1,318 @@
"""Decoders for GPG v2 data structures."""
import base64
import functools
import hashlib
import io
import logging
import struct
import ecdsa
import ed25519
from . import protocol
from .. import util
log = logging.getLogger(__name__)
def parse_subpackets(s):
"""See https://tools.ietf.org/html/rfc4880#section-5.2.3.1 for details."""
subpackets = []
total_size = s.readfmt('>H')
data = s.read(total_size)
s = util.Reader(io.BytesIO(data))
while True:
try:
first = s.readfmt('B')
except EOFError:
break
if first < 192:
subpacket_len = first
elif first < 255:
subpacket_len = ((first - 192) << 8) + s.readfmt('B') + 192
else: # first == 255
subpacket_len = s.readfmt('>L')
subpackets.append(s.read(subpacket_len))
return subpackets
def parse_mpi(s):
"""See https://tools.ietf.org/html/rfc4880#section-3.2 for details."""
bits = s.readfmt('>H')
blob = bytearray(s.read(int((bits + 7) // 8)))
return sum(v << (8 * i) for i, v in enumerate(reversed(blob)))
def parse_mpis(s, n):
"""Parse multiple MPIs from stream."""
return [parse_mpi(s) for _ in range(n)]
def _parse_nist256p1_pubkey(mpi):
prefix, x, y = util.split_bits(mpi, 4, 256, 256)
if prefix != 4:
raise ValueError('Invalid MPI prefix: {}'.format(prefix))
point = ecdsa.ellipticcurve.Point(curve=ecdsa.NIST256p.curve,
x=x, y=y)
return ecdsa.VerifyingKey.from_public_point(
point=point, curve=ecdsa.curves.NIST256p,
hashfunc=hashlib.sha256)
def _parse_ed25519_pubkey(mpi):
prefix, value = util.split_bits(mpi, 8, 256)
if prefix != 0x40:
raise ValueError('Invalid MPI prefix: {}'.format(prefix))
return ed25519.VerifyingKey(util.num2bytes(value, size=32))
SUPPORTED_CURVES = {
b'\x2A\x86\x48\xCE\x3D\x03\x01\x07':
(_parse_nist256p1_pubkey, protocol.keygrip_nist256),
b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01':
(_parse_ed25519_pubkey, protocol.keygrip_ed25519),
b'\x2B\x06\x01\x04\x01\x97\x55\x01\x05\x01':
(_parse_ed25519_pubkey, protocol.keygrip_curve25519),
}
RSA_ALGO_IDS = {1, 2, 3}
ELGAMAL_ALGO_ID = 16
DSA_ALGO_ID = 17
ECDSA_ALGO_IDS = {18, 19, 22} # {ecdsa, nist256, ed25519}
def _parse_embedded_signatures(subpackets):
for packet in subpackets:
data = bytearray(packet)
if data[0] == 32:
# https://tools.ietf.org/html/rfc4880#section-5.2.3.26
stream = io.BytesIO(data[1:])
yield _parse_signature(util.Reader(stream))
def has_custom_subpacket(signature_packet):
"""Detect our custom public keys by matching subpacket data."""
return any(protocol.CUSTOM_KEY_LABEL == subpacket[1:]
for subpacket in signature_packet['unhashed_subpackets'])
def _parse_signature(stream):
"""See https://tools.ietf.org/html/rfc4880#section-5.2 for details."""
p = {'type': 'signature'}
to_hash = io.BytesIO()
with stream.capture(to_hash):
p['version'] = stream.readfmt('B')
p['sig_type'] = stream.readfmt('B')
p['pubkey_alg'] = stream.readfmt('B')
p['hash_alg'] = stream.readfmt('B')
p['hashed_subpackets'] = parse_subpackets(stream)
# https://tools.ietf.org/html/rfc4880#section-5.2.4
tail_to_hash = b'\x04\xff' + struct.pack('>L', to_hash.tell())
p['_to_hash'] = to_hash.getvalue() + tail_to_hash
p['unhashed_subpackets'] = parse_subpackets(stream)
embedded = list(_parse_embedded_signatures(p['unhashed_subpackets']))
if embedded:
log.debug('embedded sigs: %s', embedded)
p['embedded'] = embedded
p['hash_prefix'] = stream.readfmt('2s')
if p['pubkey_alg'] in ECDSA_ALGO_IDS:
p['sig'] = (parse_mpi(stream), parse_mpi(stream))
elif p['pubkey_alg'] in RSA_ALGO_IDS: # RSA
p['sig'] = (parse_mpi(stream),)
elif p['pubkey_alg'] == DSA_ALGO_ID:
p['sig'] = (parse_mpi(stream), parse_mpi(stream))
else:
log.error('unsupported public key algo: %d', p['pubkey_alg'])
assert not stream.read()
return p
def _parse_pubkey(stream, packet_type='pubkey'):
"""See https://tools.ietf.org/html/rfc4880#section-5.5 for details."""
p = {'type': packet_type}
packet = io.BytesIO()
with stream.capture(packet):
p['version'] = stream.readfmt('B')
p['created'] = stream.readfmt('>L')
p['algo'] = stream.readfmt('B')
if p['algo'] in ECDSA_ALGO_IDS:
log.debug('parsing elliptic curve key')
# https://tools.ietf.org/html/rfc6637#section-11
oid_size = stream.readfmt('B')
oid = stream.read(oid_size)
assert oid in SUPPORTED_CURVES, util.hexlify(oid)
p['curve_oid'] = oid
mpi = parse_mpi(stream)
log.debug('mpi: %x (%d bits)', mpi, mpi.bit_length())
leftover = stream.read()
if leftover:
leftover = io.BytesIO(leftover)
# https://tools.ietf.org/html/rfc6637#section-8
# should be b'\x03\x01\x08\x07': SHA256 + AES128
size, = util.readfmt(leftover, 'B')
p['kdf'] = leftover.read(size)
p['secret'] = leftover.read()
parse_func, keygrip_func = SUPPORTED_CURVES[oid]
keygrip = keygrip_func(parse_func(mpi))
log.debug('keygrip: %s', util.hexlify(keygrip))
p['keygrip'] = keygrip
elif p['algo'] == DSA_ALGO_ID:
parse_mpis(stream, n=4) # DSA keys are not supported
elif p['algo'] == ELGAMAL_ALGO_ID:
parse_mpis(stream, n=3) # ElGamal keys are not supported
else: # assume RSA
parse_mpis(stream, n=2) # RSA keys are not supported
assert not stream.read()
# https://tools.ietf.org/html/rfc4880#section-12.2
packet_data = packet.getvalue()
data_to_hash = (b'\x99' + struct.pack('>H', len(packet_data)) +
packet_data)
p['key_id'] = hashlib.sha1(data_to_hash).digest()[-8:]
p['_to_hash'] = data_to_hash
log.debug('key ID: %s', util.hexlify(p['key_id']))
return p
_parse_subkey = functools.partial(_parse_pubkey, packet_type='subkey')
def _parse_user_id(stream, packet_type='user_id'):
"""See https://tools.ietf.org/html/rfc4880#section-5.11 for details."""
value = stream.read()
to_hash = b'\xb4' + util.prefix_len('>L', value)
return {'type': packet_type, 'value': value, '_to_hash': to_hash}
# User attribute is handled as an opaque user ID
_parse_attribute = functools.partial(_parse_user_id,
packet_type='user_attribute')
PACKET_TYPES = {
2: _parse_signature,
5: _parse_pubkey,
6: _parse_pubkey,
7: _parse_subkey,
13: _parse_user_id,
14: _parse_subkey,
17: _parse_attribute,
}
def parse_packets(stream):
"""
Support iterative parsing of available GPG packets.
See https://tools.ietf.org/html/rfc4880#section-4.2 for details.
"""
reader = util.Reader(stream)
while True:
try:
value = reader.readfmt('B')
except EOFError:
return
log.debug('prefix byte: %s', bin(value))
assert util.bit(value, 7) == 1
tag = util.low_bits(value, 6)
if util.bit(value, 6) == 0:
length_type = util.low_bits(tag, 2)
tag = tag >> 2
fmt = {0: '>B', 1: '>H', 2: '>L'}[length_type]
packet_size = reader.readfmt(fmt)
else:
first = reader.readfmt('B')
if first < 192:
packet_size = first
elif first < 224:
packet_size = ((first - 192) << 8) + reader.readfmt('B') + 192
elif first == 255:
packet_size = reader.readfmt('>L')
else:
log.error('Partial Body Lengths unsupported')
log.debug('packet length: %d', packet_size)
packet_data = reader.read(packet_size)
packet_type = PACKET_TYPES.get(tag)
p = {'type': 'unknown', 'tag': tag, 'raw': packet_data}
if packet_type is not None:
try:
p = packet_type(util.Reader(io.BytesIO(packet_data)))
p['tag'] = tag
except ValueError:
log.exception('Skipping packet: %s', util.hexlify(packet_data))
log.debug('packet "%s": %s', p['type'], p)
yield p
def digest_packets(packets, hasher):
"""Compute digest on specified packets, according to '_to_hash' field."""
data_to_hash = io.BytesIO()
for p in packets:
data_to_hash.write(p['_to_hash'])
hasher.update(data_to_hash.getvalue())
return hasher.digest()
HASH_ALGORITHMS = {
1: 'md5',
2: 'sha1',
3: 'ripemd160',
8: 'sha256',
9: 'sha384',
10: 'sha512',
11: 'sha224',
}
def load_by_keygrip(pubkey_bytes, keygrip):
"""Return public key and first user ID for specified keygrip."""
stream = io.BytesIO(pubkey_bytes)
packets = list(parse_packets(stream))
packets_per_pubkey = []
for p in packets:
if p['type'] == 'pubkey':
# Add a new packet list for each pubkey.
packets_per_pubkey.append([])
packets_per_pubkey[-1].append(p)
for packets in packets_per_pubkey:
user_ids = [p for p in packets if p['type'] == 'user_id']
for p in packets:
if p.get('keygrip') == keygrip:
return p, user_ids
raise KeyError('{} keygrip not found'.format(util.hexlify(keygrip)))
def load_signature(stream, original_data):
"""Load signature from stream, and compute GPG digest for verification."""
signature, = list(parse_packets((stream)))
hash_alg = HASH_ALGORITHMS[signature['hash_alg']]
digest = digest_packets([{'_to_hash': original_data}, signature],
hasher=hashlib.new(hash_alg))
assert signature['hash_prefix'] == digest[:2]
return signature, digest
def remove_armor(armored_data):
"""Decode armored data into its binary form."""
stream = io.BytesIO(armored_data)
lines = stream.readlines()[3:-1]
data = base64.b64decode(b''.join(lines))
payload, checksum = data[:-3], data[-3:]
assert util.crc24(payload) == checksum
return payload

103
libagent/gpg/encode.py Normal file
View File

@@ -0,0 +1,103 @@
"""Create GPG ECDSA signatures and public keys using TREZOR device."""
import io
import logging
from . import decode, keyring, protocol
from .. import util
log = logging.getLogger(__name__)
def create_primary(user_id, pubkey, signer_func, secret_bytes=b''):
"""Export new primary GPG public key, ready for "gpg2 --import"."""
pubkey_packet = protocol.packet(tag=(5 if secret_bytes else 6),
blob=(pubkey.data() + secret_bytes))
user_id_packet = protocol.packet(tag=13,
blob=user_id.encode('ascii'))
data_to_sign = (pubkey.data_to_hash() +
user_id_packet[:1] +
util.prefix_len('>L', user_id.encode('ascii')))
hashed_subpackets = [
protocol.subpacket_time(pubkey.created), # signature time
# https://tools.ietf.org/html/rfc4880#section-5.2.3.7
protocol.subpacket_byte(0x0B, 9), # preferred symmetric algo (AES-256)
# https://tools.ietf.org/html/rfc4880#section-5.2.3.4
protocol.subpacket_byte(0x1B, 1 | 2), # key flags (certify & sign)
# https://tools.ietf.org/html/rfc4880#section-5.2.3.21
protocol.subpacket_byte(0x15, 8), # preferred hash (SHA256)
# https://tools.ietf.org/html/rfc4880#section-5.2.3.8
protocol.subpacket_byte(0x16, 0), # preferred compression (none)
# https://tools.ietf.org/html/rfc4880#section-5.2.3.9
protocol.subpacket_byte(0x17, 0x80) # key server prefs (no-modify)
# https://tools.ietf.org/html/rfc4880#section-5.2.3.17
]
unhashed_subpackets = [
protocol.subpacket(16, pubkey.key_id()), # issuer key id
protocol.CUSTOM_SUBPACKET]
signature = protocol.make_signature(
signer_func=signer_func,
public_algo=pubkey.algo_id,
data_to_sign=data_to_sign,
sig_type=0x13, # user id & public key
hashed_subpackets=hashed_subpackets,
unhashed_subpackets=unhashed_subpackets)
sign_packet = protocol.packet(tag=2, blob=signature)
return pubkey_packet + user_id_packet + sign_packet
def create_subkey(primary_bytes, subkey, signer_func, secret_bytes=b''):
"""Export new subkey to GPG primary key."""
subkey_packet = protocol.packet(tag=(7 if secret_bytes else 14),
blob=(subkey.data() + secret_bytes))
packets = list(decode.parse_packets(io.BytesIO(primary_bytes)))
primary, user_id, signature = packets[:3]
data_to_sign = primary['_to_hash'] + subkey.data_to_hash()
if subkey.ecdh:
embedded_sig = None
else:
# Primary Key Binding Signature
hashed_subpackets = [
protocol.subpacket_time(subkey.created)] # signature time
unhashed_subpackets = [
protocol.subpacket(16, subkey.key_id())] # issuer key id
embedded_sig = protocol.make_signature(
signer_func=signer_func,
data_to_sign=data_to_sign,
public_algo=subkey.algo_id,
sig_type=0x19,
hashed_subpackets=hashed_subpackets,
unhashed_subpackets=unhashed_subpackets)
# Subkey Binding Signature
# Key flags: https://tools.ietf.org/html/rfc4880#section-5.2.3.21
# (certify & sign) (encrypt)
flags = (2) if (not subkey.ecdh) else (4 | 8)
hashed_subpackets = [
protocol.subpacket_time(subkey.created), # signature time
protocol.subpacket_byte(0x1B, flags)]
unhashed_subpackets = []
unhashed_subpackets.append(protocol.subpacket(16, primary['key_id']))
if embedded_sig is not None:
unhashed_subpackets.append(protocol.subpacket(32, embedded_sig))
unhashed_subpackets.append(protocol.CUSTOM_SUBPACKET)
if not decode.has_custom_subpacket(signature):
signer_func = keyring.create_agent_signer(user_id['value'])
signature = protocol.make_signature(
signer_func=signer_func,
data_to_sign=data_to_sign,
public_algo=primary['algo'],
sig_type=0x18,
hashed_subpackets=hashed_subpackets,
unhashed_subpackets=unhashed_subpackets)
sign_packet = protocol.packet(tag=2, blob=signature)
return primary_bytes + subkey_packet + sign_packet

229
libagent/gpg/keyring.py Normal file
View File

@@ -0,0 +1,229 @@
"""Tools for doing signature using gpg-agent."""
from __future__ import absolute_import, print_function, unicode_literals
import binascii
import io
import logging
import os
import re
import socket
import subprocess
from .. import util
log = logging.getLogger(__name__)
def get_agent_sock_path(sp=subprocess):
"""Parse gpgconf output to find out GPG agent UNIX socket path."""
lines = sp.check_output(['gpgconf', '--list-dirs']).strip().split(b'\n')
dirs = dict(line.split(b':', 1) for line in lines)
return dirs[b'agent-socket']
def connect_to_agent(sp=subprocess):
"""Connect to GPG agent's UNIX socket."""
sock_path = get_agent_sock_path(sp=sp)
sp.check_call(['gpg-connect-agent', '/bye']) # Make sure it's running
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(sock_path)
return sock
def communicate(sock, msg):
"""Send a message and receive a single line."""
sendline(sock, msg.encode('ascii'))
return recvline(sock)
def sendline(sock, msg):
"""Send a binary message, followed by EOL."""
log.debug('<- %r', msg)
sock.sendall(msg + b'\n')
def recvline(sock):
"""Receive a single line from the socket."""
reply = io.BytesIO()
while True:
c = sock.recv(1)
if not c:
return None # socket is closed
if c == b'\n':
break
reply.write(c)
result = reply.getvalue()
log.debug('-> %r', result)
return result
def iterlines(conn):
"""Iterate over input, split by lines."""
while True:
line = recvline(conn)
if line is None:
break
yield line
def unescape(s):
"""Unescape ASSUAN message (0xAB <-> '%AB')."""
s = bytearray(s)
i = 0
while i < len(s):
if s[i] == ord('%'):
hex_bytes = bytes(s[i+1:i+3])
value = int(hex_bytes.decode('ascii'), 16)
s[i:i+3] = [value]
i += 1
return bytes(s)
def parse_term(s):
"""Parse single s-expr term from bytes."""
size, s = s.split(b':', 1)
size = int(size)
return s[:size], s[size:]
def parse(s):
"""Parse full s-expr from bytes."""
if s.startswith(b'('):
s = s[1:]
name, s = parse_term(s)
values = [name]
while not s.startswith(b')'):
value, s = parse(s)
values.append(value)
return values, s[1:]
else:
return parse_term(s)
def _parse_ecdsa_sig(args):
(r, sig_r), (s, sig_s) = args
assert r == b'r'
assert s == b's'
return (util.bytes2num(sig_r),
util.bytes2num(sig_s))
# DSA and EDDSA happen to have the same structure as ECDSA signatures
_parse_dsa_sig = _parse_ecdsa_sig
_parse_eddsa_sig = _parse_ecdsa_sig
def _parse_rsa_sig(args):
(s, sig_s), = args
assert s == b's'
return (util.bytes2num(sig_s),)
def parse_sig(sig):
"""Parse signature integer values from s-expr."""
label, sig = sig
assert label == b'sig-val'
algo_name = sig[0]
parser = {b'rsa': _parse_rsa_sig,
b'ecdsa': _parse_ecdsa_sig,
b'eddsa': _parse_eddsa_sig,
b'dsa': _parse_dsa_sig}[algo_name]
return parser(args=sig[1:])
def sign_digest(sock, keygrip, digest, sp=subprocess, environ=None):
"""Sign a digest using specified key using GPG agent."""
hash_algo = 8 # SHA256
assert len(digest) == 32
assert communicate(sock, 'RESET').startswith(b'OK')
ttyname = sp.check_output(['tty']).strip()
options = ['ttyname={}'.format(ttyname)] # set TTY for passphrase entry
display = (environ or os.environ).get('DISPLAY')
if display is not None:
options.append('display={}'.format(display))
for opt in options:
assert communicate(sock, 'OPTION {}'.format(opt)) == b'OK'
assert communicate(sock, 'SIGKEY {}'.format(keygrip)) == b'OK'
hex_digest = binascii.hexlify(digest).upper().decode('ascii')
assert communicate(sock, 'SETHASH {} {}'.format(hash_algo,
hex_digest)) == b'OK'
assert communicate(sock, 'SETKEYDESC '
'Sign+a+new+TREZOR-based+subkey') == b'OK'
assert communicate(sock, 'PKSIGN') == b'OK'
while True:
line = recvline(sock).strip()
if line.startswith(b'S PROGRESS'):
continue
else:
break
line = unescape(line)
log.debug('unescaped: %r', line)
prefix, sig = line.split(b' ', 1)
if prefix != b'D':
raise ValueError(prefix)
sig, leftover = parse(sig)
assert not leftover, leftover
return parse_sig(sig)
def gpg_command(args, env=None):
"""Prepare common GPG command line arguments."""
if env is None:
env = os.environ
cmd = ['gpg2']
homedir = env.get('GNUPGHOME')
if homedir:
cmd.extend(['--homedir', homedir])
return cmd + args
def get_keygrip(user_id, sp=subprocess):
"""Get a keygrip of the primary GPG key of the specified user."""
args = gpg_command(['--list-keys', '--with-keygrip', user_id])
output = sp.check_output(args).decode('ascii')
return re.findall(r'Keygrip = (\w+)', output)[0]
def gpg_version(sp=subprocess):
"""Get a keygrip of the primary GPG key of the specified user."""
args = gpg_command(['--version'])
output = sp.check_output(args)
line = output.split(b'\n')[0] # b'gpg (GnuPG) 2.1.11'
return line.split(b' ')[-1] # b'2.1.11'
def export_public_key(user_id, sp=subprocess):
"""Export GPG public key for specified `user_id`."""
args = gpg_command(['--export', user_id])
result = sp.check_output(args=args)
if not result:
log.error('could not find public key %r in local GPG keyring', user_id)
raise KeyError(user_id)
return result
def export_public_keys(sp=subprocess):
"""Export all GPG public keys."""
args = gpg_command(['--export'])
return sp.check_output(args=args)
def create_agent_signer(user_id):
"""Sign digest with existing GPG keys using gpg-agent tool."""
sock = connect_to_agent()
keygrip = get_keygrip(user_id)
def sign(digest):
"""Sign the digest and return an ECDSA/RSA/DSA signature."""
return sign_digest(sock=sock, keygrip=keygrip, digest=digest)
return sign

271
libagent/gpg/protocol.py Normal file
View File

@@ -0,0 +1,271 @@
"""GPG protocol utilities."""
import base64
import hashlib
import logging
import struct
from .. import formats, util
log = logging.getLogger(__name__)
def packet(tag, blob):
"""Create small GPG packet."""
assert len(blob) < 2**32
if len(blob) < 2**8:
length_type = 0
elif len(blob) < 2**16:
length_type = 1
else:
length_type = 2
fmt = ['>B', '>H', '>L'][length_type]
leading_byte = 0x80 | (tag << 2) | (length_type)
return struct.pack('>B', leading_byte) + util.prefix_len(fmt, blob)
def subpacket(subpacket_type, fmt, *values):
"""Create GPG subpacket."""
blob = struct.pack(fmt, *values) if values else fmt
return struct.pack('>B', subpacket_type) + blob
def subpacket_long(subpacket_type, value):
"""Create GPG subpacket with 32-bit unsigned integer."""
return subpacket(subpacket_type, '>L', value)
def subpacket_time(value):
"""Create GPG subpacket with time in seconds (since Epoch)."""
return subpacket_long(2, value)
def subpacket_byte(subpacket_type, value):
"""Create GPG subpacket with 8-bit unsigned integer."""
return subpacket(subpacket_type, '>B', value)
def subpacket_prefix_len(item):
"""Prefix subpacket length according to RFC 4880 section-5.2.3.1."""
n = len(item)
if n >= 8384:
prefix = b'\xFF' + struct.pack('>L', n)
elif n >= 192:
n = n - 192
prefix = struct.pack('BB', (n // 256) + 192, n % 256)
else:
prefix = struct.pack('B', n)
return prefix + item
def subpackets(*items):
"""Serialize several GPG subpackets."""
prefixed = [subpacket_prefix_len(item) for item in items]
return util.prefix_len('>H', b''.join(prefixed))
def mpi(value):
"""Serialize multipresicion integer using GPG format."""
bits = value.bit_length()
data_size = (bits + 7) // 8
data_bytes = bytearray(data_size)
for i in range(data_size):
data_bytes[i] = value & 0xFF
value = value >> 8
data_bytes.reverse()
return struct.pack('>H', bits) + bytes(data_bytes)
def _serialize_nist256(vk):
return mpi((4 << 512) |
(vk.pubkey.point.x() << 256) |
(vk.pubkey.point.y()))
def _serialize_ed25519(vk):
return mpi((0x40 << 256) |
util.bytes2num(vk.to_bytes()))
def _compute_keygrip(params):
parts = []
for name, value in params:
exp = '{}:{}{}:'.format(len(name), name, len(value))
parts.append(b'(' + exp.encode('ascii') + value + b')')
return hashlib.sha1(b''.join(parts)).digest()
def keygrip_nist256(vk):
"""Compute keygrip for NIST256 curve public keys."""
curve = vk.curve.curve
gen = vk.curve.generator
g = (4 << 512) | (gen.x() << 256) | gen.y()
point = vk.pubkey.point
q = (4 << 512) | (point.x() << 256) | point.y()
return _compute_keygrip([
['p', util.num2bytes(curve.p(), size=32)],
['a', util.num2bytes(curve.a() % curve.p(), size=32)],
['b', util.num2bytes(curve.b() % curve.p(), size=32)],
['g', util.num2bytes(g, size=65)],
['n', util.num2bytes(vk.curve.order, size=32)],
['q', util.num2bytes(q, size=65)],
])
def keygrip_ed25519(vk):
"""Compute keygrip for Ed25519 public keys."""
# pylint: disable=line-too-long
return _compute_keygrip([
['p', util.num2bytes(0x7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFED, size=32)], # nopep8
['a', b'\x01'],
['b', util.num2bytes(0x2DFC9311D490018C7338BF8688861767FF8FF5B2BEBE27548A14B235ECA6874A, size=32)], # nopep8
['g', util.num2bytes(0x04216936D3CD6E53FEC0A4E231FDD6DC5C692CC7609525A7B2C9562D608F25D51A6666666666666666666666666666666666666666666666666666666666666658, size=65)], # nopep8
['n', util.num2bytes(0x1000000000000000000000000000000014DEF9DEA2F79CD65812631A5CF5D3ED, size=32)], # nopep8
['q', vk.to_bytes()],
])
def keygrip_curve25519(vk):
"""Compute keygrip for Curve25519 public keys."""
# pylint: disable=line-too-long
return _compute_keygrip([
['p', util.num2bytes(0x7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFED, size=32)], # nopep8
['a', b'\x01\xDB\x41'],
['b', b'\x01'],
['g', util.num2bytes(0x04000000000000000000000000000000000000000000000000000000000000000920ae19a1b8a086b4e01edd2c7748d14c923d4d7e6d7c61b229e9c5a27eced3d9, size=65)], # nopep8
['n', util.num2bytes(0x1000000000000000000000000000000014DEF9DEA2F79CD65812631A5CF5D3ED, size=32)], # nopep8
['q', vk.to_bytes()],
])
SUPPORTED_CURVES = {
formats.CURVE_NIST256: {
# https://tools.ietf.org/html/rfc6637#section-11
'oid': b'\x2A\x86\x48\xCE\x3D\x03\x01\x07',
'algo_id': 19,
'serialize': _serialize_nist256,
'keygrip': keygrip_nist256,
},
formats.CURVE_ED25519: {
'oid': b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01',
'algo_id': 22,
'serialize': _serialize_ed25519,
'keygrip': keygrip_ed25519,
},
formats.ECDH_CURVE25519: {
'oid': b'\x2B\x06\x01\x04\x01\x97\x55\x01\x05\x01',
'algo_id': 18,
'serialize': _serialize_ed25519,
'keygrip': keygrip_curve25519,
},
}
ECDH_ALGO_ID = 18
CUSTOM_KEY_LABEL = b'TREZOR-GPG' # marks "our" pubkey
CUSTOM_SUBPACKET_ID = 26 # use "policy URL" subpacket
CUSTOM_SUBPACKET = subpacket(CUSTOM_SUBPACKET_ID, CUSTOM_KEY_LABEL)
def get_curve_name_by_oid(oid):
"""Return curve name matching specified OID, or raise KeyError."""
for curve_name, info in SUPPORTED_CURVES.items():
if info['oid'] == oid:
return curve_name
raise KeyError('Unknown OID: {!r}'.format(oid))
class PublicKey(object):
"""GPG representation for public key packets."""
def __init__(self, curve_name, created, verifying_key, ecdh=False):
"""Contruct using a ECDSA VerifyingKey object."""
self.curve_name = curve_name
self.curve_info = SUPPORTED_CURVES[curve_name]
self.created = int(created) # time since Epoch
self.verifying_key = verifying_key
self.ecdh = bool(ecdh)
if ecdh:
self.algo_id = ECDH_ALGO_ID
self.ecdh_packet = b'\x03\x01\x08\x07'
else:
self.algo_id = self.curve_info['algo_id']
self.ecdh_packet = b''
def keygrip(self):
"""Compute GPG keygrip of the verifying key."""
return self.curve_info['keygrip'](self.verifying_key)
def data(self):
"""Data for packet creation."""
header = struct.pack('>BLB',
4, # version
self.created, # creation
self.algo_id) # public key algorithm ID
oid = util.prefix_len('>B', self.curve_info['oid'])
blob = self.curve_info['serialize'](self.verifying_key)
return header + oid + blob + self.ecdh_packet
def data_to_hash(self):
"""Data for digest computation."""
return b'\x99' + util.prefix_len('>H', self.data())
def _fingerprint(self):
return hashlib.sha1(self.data_to_hash()).digest()
def key_id(self):
"""Short (8 byte) GPG key ID."""
return self._fingerprint()[-8:]
def __repr__(self):
"""Short (8 hexadecimal digits) GPG key ID."""
hex_key_id = util.hexlify(self.key_id())[-8:]
return 'GPG public key {}/{}'.format(self.curve_name, hex_key_id)
__str__ = __repr__
def _split_lines(body, size):
lines = []
for i in range(0, len(body), size):
lines.append(body[i:i+size] + '\n')
return ''.join(lines)
def armor(blob, type_str):
"""See https://tools.ietf.org/html/rfc4880#section-6 for details."""
head = '-----BEGIN PGP {}-----\nVersion: GnuPG v2\n\n'.format(type_str)
body = base64.b64encode(blob).decode('ascii')
checksum = base64.b64encode(util.crc24(blob)).decode('ascii')
tail = '-----END PGP {}-----\n'.format(type_str)
return head + _split_lines(body, 64) + '=' + checksum + '\n' + tail
def make_signature(signer_func, data_to_sign, public_algo,
hashed_subpackets, unhashed_subpackets, sig_type=0):
"""Create new GPG signature."""
# pylint: disable=too-many-arguments
header = struct.pack('>BBBB',
4, # version
sig_type, # rfc4880 (section-5.2.1)
public_algo,
8) # hash_alg (SHA256)
hashed = subpackets(*hashed_subpackets)
unhashed = subpackets(*unhashed_subpackets)
tail = b'\x04\xff' + struct.pack('>L', len(header) + len(hashed))
data_to_hash = data_to_sign + header + hashed + tail
log.debug('hashing %d bytes', len(data_to_hash))
digest = hashlib.sha256(data_to_hash).digest()
log.debug('signing digest: %s', util.hexlify(digest))
params = signer_func(digest=digest)
sig = b''.join(mpi(p) for p in params)
return bytes(header + hashed + unhashed +
digest[:2] + # used for decoder's sanity check
sig) # actual ECDSA signature

View File

@@ -0,0 +1 @@
"""Tests for GPG module."""

View File

@@ -0,0 +1,62 @@
import glob
import io
import os
import pytest
from .. import decode, protocol
from ... import util
def test_subpackets():
s = io.BytesIO(b'\x00\x05\x02\xAB\xCD\x01\xEF')
assert decode.parse_subpackets(util.Reader(s)) == [b'\xAB\xCD', b'\xEF']
def test_subpackets_prefix():
for n in [0, 1, 2, 4, 5, 10, 191, 192, 193,
255, 256, 257, 8383, 8384, 65530]:
item = b'?' * n # create dummy subpacket
prefixed = protocol.subpackets(item)
result = decode.parse_subpackets(util.Reader(io.BytesIO(prefixed)))
assert [item] == result
def test_mpi():
s = io.BytesIO(b'\x00\x09\x01\x23')
assert decode.parse_mpi(util.Reader(s)) == 0x123
s = io.BytesIO(b'\x00\x09\x01\x23\x00\x03\x05')
assert decode.parse_mpis(util.Reader(s), n=2) == [0x123, 5]
cwd = os.path.join(os.path.dirname(__file__))
input_files = glob.glob(os.path.join(cwd, '*.gpg'))
@pytest.fixture(params=input_files)
def public_key_path(request):
return request.param
def test_gpg_files(public_key_path): # pylint: disable=redefined-outer-name
with open(public_key_path, 'rb') as f:
assert list(decode.parse_packets(f))
def test_has_custom_subpacket():
sig = {'unhashed_subpackets': []}
assert not decode.has_custom_subpacket(sig)
custom_markers = [
protocol.CUSTOM_SUBPACKET,
protocol.subpacket(10, protocol.CUSTOM_KEY_LABEL),
]
for marker in custom_markers:
sig = {'unhashed_subpackets': [marker]}
assert decode.has_custom_subpacket(sig)
def test_load_by_keygrip_missing():
with pytest.raises(KeyError):
decode.load_by_keygrip(pubkey_bytes=b'', keygrip=b'')

View File

@@ -0,0 +1,101 @@
import io
import mock
from .. import keyring
def test_unescape_short():
assert keyring.unescape(b'abc%0AX%0D %25;.-+()') == b'abc\nX\r %;.-+()'
def test_unescape_long():
escaped = (b'D (7:sig-val(3:dsa(1:r32:\x1d\x15.\x12\xe8h\x19\xd9O\xeb\x06'
b'yD?a:/\xae\xdb\xac\x93\xa6\x86\xcbs\xb8\x03\xf1\xcb\x89\xc7'
b'\x1f)(1:s32:%25\xb5\x04\x94\xc7\xc4X\xc7\xe0%0D\x08\xbb%0DuN'
b'\x9c6}[\xc2=t\x8c\xfdD\x81\xe8\xdd\x86=\xe2\xa9)))')
unescaped = (b'D (7:sig-val(3:dsa(1:r32:\x1d\x15.\x12\xe8h\x19\xd9O\xeb'
b'\x06yD?a:/\xae\xdb\xac\x93\xa6\x86\xcbs\xb8\x03\xf1\xcb\x89'
b'\xc7\x1f)(1:s32:%\xb5\x04\x94\xc7\xc4X\xc7\xe0\r\x08\xbb\ru'
b'N\x9c6}[\xc2=t\x8c\xfdD\x81\xe8\xdd\x86=\xe2\xa9)))')
assert keyring.unescape(escaped) == unescaped
def test_parse_term():
assert keyring.parse(b'4:abcdXXX') == (b'abcd', b'XXX')
def test_parse_ecdsa():
sig, rest = keyring.parse(b'(7:sig-val(5:ecdsa'
b'(1:r2:\x01\x02)(1:s2:\x03\x04)))')
values = [[b'r', b'\x01\x02'], [b's', b'\x03\x04']]
assert sig == [b'sig-val', [b'ecdsa'] + values]
assert rest == b''
assert keyring.parse_sig(sig) == (0x102, 0x304)
def test_parse_rsa():
sig, rest = keyring.parse(b'(7:sig-val(3:rsa(1:s4:\x01\x02\x03\x04)))')
assert sig == [b'sig-val', [b'rsa', [b's', b'\x01\x02\x03\x04']]]
assert rest == b''
assert keyring.parse_sig(sig) == (0x1020304,)
class FakeSocket(object):
def __init__(self):
self.rx = io.BytesIO()
self.tx = io.BytesIO()
def recv(self, n):
return self.rx.read(n)
def sendall(self, data):
self.tx.write(data)
def test_sign_digest():
sock = FakeSocket()
sock.rx.write(b'OK Pleased to meet you, process XYZ\n')
sock.rx.write(b'OK\n' * 6)
sock.rx.write(b'D (7:sig-val(3:rsa(1:s16:0123456789ABCDEF)))\n')
sock.rx.seek(0)
keygrip = '1234'
digest = b'A' * 32
sp = mock.Mock(spec=['check_output'])
sp.check_output.return_value = '/dev/pts/0'
sig = keyring.sign_digest(sock=sock, keygrip=keygrip,
digest=digest, sp=sp,
environ={'DISPLAY': ':0'})
assert sig == (0x30313233343536373839414243444546,)
assert sock.tx.getvalue() == b'''RESET
OPTION ttyname=/dev/pts/0
OPTION display=:0
SIGKEY 1234
SETHASH 8 4141414141414141414141414141414141414141414141414141414141414141
SETKEYDESC Sign+a+new+TREZOR-based+subkey
PKSIGN
'''
def test_iterlines():
sock = FakeSocket()
sock.rx.write(b'foo\nbar\nxyz')
sock.rx.seek(0)
assert list(keyring.iterlines(sock)) == [b'foo', b'bar']
def test_get_agent_sock_path():
sp = mock.Mock(spec=['check_output'])
sp.check_output.return_value = b'''sysconfdir:/usr/local/etc/gnupg
bindir:/usr/local/bin
libexecdir:/usr/local/libexec
libdir:/usr/local/lib/gnupg
datadir:/usr/local/share/gnupg
localedir:/usr/local/share/locale
dirmngr-socket:/run/user/1000/gnupg/S.dirmngr
agent-ssh-socket:/run/user/1000/gnupg/S.gpg-agent.ssh
agent-socket:/run/user/1000/gnupg/S.gpg-agent
homedir:/home/roman/.gnupg
'''
expected = b'/run/user/1000/gnupg/S.gpg-agent'
assert keyring.get_agent_sock_path(sp=sp) == expected

View File

@@ -0,0 +1,107 @@
import ecdsa
import ed25519
import pytest
from .. import protocol
from ... import formats
def test_packet():
assert protocol.packet(1, b'') == b'\x84\x00'
assert protocol.packet(2, b'A') == b'\x88\x01A'
blob = b'B' * 0xAB
assert protocol.packet(3, blob) == b'\x8c\xAB' + blob
blob = b'C' * 0x1234
assert protocol.packet(3, blob) == b'\x8d\x12\x34' + blob
blob = b'D' * 0x12345678
assert protocol.packet(4, blob) == b'\x92\x12\x34\x56\x78' + blob
def test_subpackets():
assert protocol.subpacket(1, b'') == b'\x01'
assert protocol.subpacket(2, '>H', 0x0304) == b'\x02\x03\x04'
assert protocol.subpacket_long(9, 0x12345678) == b'\x09\x12\x34\x56\x78'
assert protocol.subpacket_time(0x12345678) == b'\x02\x12\x34\x56\x78'
assert protocol.subpacket_byte(0xAB, 0xCD) == b'\xAB\xCD'
assert protocol.subpackets() == b'\x00\x00'
assert protocol.subpackets(b'ABC', b'12345') == b'\x00\x0A\x03ABC\x0512345'
def test_mpi():
assert protocol.mpi(0x123) == b'\x00\x09\x01\x23'
def test_armor():
data = bytearray(range(256))
assert protocol.armor(data, 'TEST') == '''-----BEGIN PGP TEST-----
Version: GnuPG v2
AAECAwQFBgcICQoLDA0ODxAREhMUFRYXGBkaGxwdHh8gISIjJCUmJygpKissLS4v
MDEyMzQ1Njc4OTo7PD0+P0BBQkNERUZHSElKS0xNTk9QUVJTVFVWV1hZWltcXV5f
YGFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6e3x9fn+AgYKDhIWGh4iJiouMjY6P
kJGSk5SVlpeYmZqbnJ2en6ChoqOkpaanqKmqq6ytrq+wsbKztLW2t7i5uru8vb6/
wMHCw8TFxsfIycrLzM3Oz9DR0tPU1dbX2Nna29zd3t/g4eLj5OXm5+jp6uvs7e7v
8PHy8/T19vf4+fr7/P3+/w==
=W700
-----END PGP TEST-----
'''
def test_make_signature():
def signer_func(digest):
assert digest == (b'\xd0\xe5]|\x8bP\xe6\x91\xb3\xe8+\xf4A\xf0`(\xb1'
b'\xc7\xf4;\x86\x97s\xdb\x9a\xda\xee< \xcb\x9e\x00')
return (7, 8)
sig = protocol.make_signature(
signer_func=signer_func,
data_to_sign=b'Hello World!',
public_algo=22,
hashed_subpackets=[protocol.subpacket_time(1)],
unhashed_subpackets=[],
sig_type=25)
assert sig == (b'\x04\x19\x16\x08\x00\x06\x05\x02'
b'\x00\x00\x00\x01\x00\x00\xd0\xe5\x00\x03\x07\x00\x04\x08')
def test_nist256p1():
sk = ecdsa.SigningKey.from_secret_exponent(secexp=1, curve=ecdsa.NIST256p)
vk = sk.get_verifying_key()
pk = protocol.PublicKey(curve_name=formats.CURVE_NIST256,
created=42, verifying_key=vk)
assert repr(pk) == 'GPG public key nist256p1/F82361D9'
assert pk.keygrip() == b'\x95\x85.\x91\x7f\xe2\xc3\x91R\xba\x99\x81\x92\xb5y\x1d\xb1\\\xdc\xf0'
def test_nist256p1_ecdh():
sk = ecdsa.SigningKey.from_secret_exponent(secexp=1, curve=ecdsa.NIST256p)
vk = sk.get_verifying_key()
pk = protocol.PublicKey(curve_name=formats.CURVE_NIST256,
created=42, verifying_key=vk, ecdh=True)
assert repr(pk) == 'GPG public key nist256p1/5811DF46'
assert pk.keygrip() == b'\x95\x85.\x91\x7f\xe2\xc3\x91R\xba\x99\x81\x92\xb5y\x1d\xb1\\\xdc\xf0'
def test_ed25519():
sk = ed25519.SigningKey(b'\x00' * 32)
vk = sk.get_verifying_key()
pk = protocol.PublicKey(curve_name=formats.CURVE_ED25519,
created=42, verifying_key=vk)
assert repr(pk) == 'GPG public key ed25519/36B40FE6'
assert pk.keygrip() == b'\xbf\x01\x90l\x17\xb64\xa3-\xf4\xc0gr\x99\x18<\xddBQ?'
def test_curve25519():
sk = ed25519.SigningKey(b'\x00' * 32)
vk = sk.get_verifying_key()
pk = protocol.PublicKey(curve_name=formats.ECDH_CURVE25519,
created=42, verifying_key=vk)
assert repr(pk) == 'GPG public key curve25519/69460384'
assert pk.keygrip() == b'x\xd6\x86\xe4\xa6\xfc;\x0fY\xe1}Lw\xc4\x9ed\xf1Q\x8a\x00'
def test_get_curve_name_by_oid():
for name, info in protocol.SUPPORTED_CURVES.items():
assert protocol.get_curve_name_by_oid(info['oid']) == name
with pytest.raises(KeyError):
protocol.get_curve_name_by_oid('BAD_OID')