Merge branch 'fixes'
This commit is contained in:
@@ -39,13 +39,17 @@ class FakeDevice(interface.Device):
|
||||
self.vk = self.sk.get_verifying_key()
|
||||
return self
|
||||
|
||||
def close(self):
|
||||
"""Close the device."""
|
||||
|
||||
def pubkey(self, identity, ecdh=False):
|
||||
"""Return public key."""
|
||||
_verify_support(identity)
|
||||
data = self.vk.to_string()
|
||||
x, y = data[:32], data[32:]
|
||||
prefix = bytearray([2 + (bytearray(y)[0] & 1)])
|
||||
return bytes(prefix) + x
|
||||
pubkey = bytes(prefix) + x
|
||||
return formats.decompress_pubkey(pubkey=pubkey, curve_name=identity.curve_name)
|
||||
|
||||
def sign(self, identity, blob):
|
||||
"""Sign given blob and return the signature (as bytes)."""
|
||||
|
||||
@@ -226,6 +226,8 @@ def run_agent(device_type):
|
||||
p.add_argument('-v', '--verbose', default=0, action='count')
|
||||
p.add_argument('--server', default=False, action='store_true',
|
||||
help='Use stdin/stdout for communication with GPG.')
|
||||
p.add_argument('--daemon', default=False, action='store_true',
|
||||
help='Daemonize the agent.')
|
||||
|
||||
p.add_argument('--pin-entry-binary', type=str, default='pinentry',
|
||||
help='Path to PIN entry UI helper.')
|
||||
@@ -236,6 +238,15 @@ def run_agent(device_type):
|
||||
|
||||
args, _ = p.parse_known_args()
|
||||
|
||||
if args.daemon:
|
||||
with daemon.DaemonContext():
|
||||
run_agent_internal(args, device_type)
|
||||
else:
|
||||
run_agent_internal(args, device_type)
|
||||
|
||||
|
||||
def run_agent_internal(args, device_type):
|
||||
"""Actually run the server."""
|
||||
assert args.homedir
|
||||
|
||||
log_file = os.path.join(args.homedir, 'gpg-agent.log')
|
||||
|
||||
@@ -99,7 +99,7 @@ class Handler:
|
||||
b'SETHASH': lambda _, args: self.set_hash(*args),
|
||||
b'PKSIGN': lambda conn, _: self.pksign(conn),
|
||||
b'PKDECRYPT': lambda conn, _: self.pkdecrypt(conn),
|
||||
b'HAVEKEY': lambda _, args: self.have_key(*args),
|
||||
b'HAVEKEY': lambda conn, args: self.have_key(conn, *args),
|
||||
b'KEYINFO': _key_info,
|
||||
b'SCD': self.handle_scd,
|
||||
b'GET_PASSPHRASE': self.handle_get_passphrase,
|
||||
@@ -198,8 +198,16 @@ class Handler:
|
||||
ec_point = self.client.ecdh(identity=identity, pubkey=remote_pubkey)
|
||||
keyring.sendline(conn, b'D ' + _serialize_point(ec_point))
|
||||
|
||||
def have_key(self, *keygrips):
|
||||
def have_key(self, conn, *keygrips):
|
||||
"""Check if any keygrip corresponds to a TREZOR-based key."""
|
||||
if len(keygrips) == 1 and keygrips[0].startswith(b"--list="):
|
||||
# Support "fast-path" key listing:
|
||||
# https://dev.gnupg.org/rG40da61b89b62dcb77847dc79eb159e885f52f817
|
||||
keygrips = list(decode.iter_keygrips(pubkey_bytes=self.pubkey_bytes))
|
||||
log.debug('keygrips: %r', keygrips)
|
||||
keyring.sendline(conn, b'D ' + util.assuan_serialize(b''.join(keygrips)))
|
||||
return
|
||||
|
||||
for keygrip in keygrips:
|
||||
try:
|
||||
self.get_identity(keygrip=keygrip)
|
||||
|
||||
@@ -282,18 +282,20 @@ HASH_ALGORITHMS = {
|
||||
}
|
||||
|
||||
|
||||
def load_by_keygrip(pubkey_bytes, keygrip):
|
||||
"""Return public key and first user ID for specified keygrip."""
|
||||
def _parse_pubkey_packets(pubkey_bytes):
|
||||
stream = io.BytesIO(pubkey_bytes)
|
||||
packets = list(parse_packets(stream))
|
||||
packets_per_pubkey = []
|
||||
for p in packets:
|
||||
for p in parse_packets(stream):
|
||||
if p['type'] == 'pubkey':
|
||||
# Add a new packet list for each pubkey.
|
||||
packets_per_pubkey.append([])
|
||||
packets_per_pubkey[-1].append(p)
|
||||
return packets_per_pubkey
|
||||
|
||||
for packets in packets_per_pubkey:
|
||||
|
||||
def load_by_keygrip(pubkey_bytes, keygrip):
|
||||
"""Return public key and first user ID for specified keygrip."""
|
||||
for packets in _parse_pubkey_packets(pubkey_bytes):
|
||||
user_ids = [p for p in packets if p['type'] == 'user_id']
|
||||
for p in packets:
|
||||
if p.get('keygrip') == keygrip:
|
||||
@@ -301,6 +303,15 @@ def load_by_keygrip(pubkey_bytes, keygrip):
|
||||
raise KeyError('{} keygrip not found'.format(util.hexlify(keygrip)))
|
||||
|
||||
|
||||
def iter_keygrips(pubkey_bytes):
|
||||
"""Iterate over all keygrips in this pubkey."""
|
||||
for packets in _parse_pubkey_packets(pubkey_bytes):
|
||||
for p in packets:
|
||||
keygrip = p.get('keygrip')
|
||||
if keygrip:
|
||||
yield keygrip
|
||||
|
||||
|
||||
def load_signature(stream, original_data):
|
||||
"""Load signature from stream, and compute GPG digest for verification."""
|
||||
signature, = list(parse_packets((stream)))
|
||||
|
||||
@@ -17,8 +17,11 @@ log = logging.getLogger(__name__)
|
||||
def check_output(args, env=None, sp=subprocess):
|
||||
"""Call an external binary and return its stdout."""
|
||||
log.debug('calling %s with env %s', args, env)
|
||||
output = sp.check_output(args=args, env=env)
|
||||
p = sp.Popen(args=args, env=env, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE)
|
||||
(output, error) = p.communicate()
|
||||
log.debug('output: %r', output)
|
||||
if error:
|
||||
log.debug('error: %r', error)
|
||||
return output
|
||||
|
||||
|
||||
|
||||
BIN
libagent/gpg/tests/romanz-pubkey.gpg
Normal file
BIN
libagent/gpg/tests/romanz-pubkey.gpg
Normal file
Binary file not shown.
@@ -1,6 +1,5 @@
|
||||
import glob
|
||||
import io
|
||||
import os
|
||||
import pathlib
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -30,8 +29,8 @@ def test_mpi():
|
||||
assert decode.parse_mpis(util.Reader(s), n=2) == [0x123, 5]
|
||||
|
||||
|
||||
cwd = os.path.join(os.path.dirname(__file__))
|
||||
input_files = glob.glob(os.path.join(cwd, '*.gpg'))
|
||||
cwd = pathlib.Path(__file__).parent
|
||||
input_files = cwd.glob('*.gpg')
|
||||
|
||||
|
||||
@pytest.fixture(params=input_files)
|
||||
@@ -60,3 +59,20 @@ def test_has_custom_subpacket():
|
||||
def test_load_by_keygrip_missing():
|
||||
with pytest.raises(KeyError):
|
||||
decode.load_by_keygrip(pubkey_bytes=b'', keygrip=b'')
|
||||
|
||||
|
||||
def test_keygrips():
|
||||
pubkey_bytes = (cwd / "romanz-pubkey.gpg").open("rb").read()
|
||||
keygrips = list(decode.iter_keygrips(pubkey_bytes))
|
||||
assert [k.hex() for k in keygrips] == [
|
||||
'7b2497258d76bc6539ed88d018cd1c739e2dbb6c',
|
||||
'30ae97f3d8e0e34c5ed80e1715fd442ca24c0a8e',
|
||||
]
|
||||
|
||||
for keygrip in keygrips:
|
||||
pubkey_dict, user_ids = decode.load_by_keygrip(pubkey_bytes, keygrip)
|
||||
assert pubkey_dict['keygrip'] == keygrip
|
||||
assert [u['value'] for u in user_ids] == [
|
||||
b'Roman Zeyde <roman.zeyde@gmail.com>',
|
||||
b'Roman Zeyde <me@romanzey.de>',
|
||||
]
|
||||
|
||||
@@ -53,6 +53,14 @@ class FakeSocket:
|
||||
self.tx.write(data)
|
||||
|
||||
|
||||
def mock_subprocess(output, error=b''):
|
||||
sp = mock.Mock(spec=['Popen', 'PIPE'])
|
||||
p = mock.Mock(spec=['communicate'])
|
||||
sp.Popen.return_value = p
|
||||
p.communicate.return_value = (output, error)
|
||||
return sp
|
||||
|
||||
|
||||
def test_sign_digest():
|
||||
sock = FakeSocket()
|
||||
sock.rx.write(b'OK Pleased to meet you, process XYZ\n')
|
||||
@@ -61,10 +69,8 @@ def test_sign_digest():
|
||||
sock.rx.seek(0)
|
||||
keygrip = '1234'
|
||||
digest = b'A' * 32
|
||||
sp = mock.Mock(spec=['check_output'])
|
||||
sp.check_output.return_value = '/dev/pts/0'
|
||||
sig = keyring.sign_digest(sock=sock, keygrip=keygrip,
|
||||
digest=digest, sp=sp,
|
||||
digest=digest, sp=mock_subprocess('/dev/pts/0'),
|
||||
environ={'DISPLAY': ':0'})
|
||||
assert sig == (0x30313233343536373839414243444546,)
|
||||
assert sock.tx.getvalue() == b'''RESET
|
||||
@@ -85,8 +91,7 @@ def test_iterlines():
|
||||
|
||||
|
||||
def test_get_agent_sock_path():
|
||||
sp = mock.Mock(spec=['check_output'])
|
||||
sp.check_output.return_value = b'''sysconfdir:/usr/local/etc/gnupg
|
||||
sp = mock_subprocess(b'''sysconfdir:/usr/local/etc/gnupg
|
||||
bindir:/usr/local/bin
|
||||
libexecdir:/usr/local/libexec
|
||||
libdir:/usr/local/lib/gnupg
|
||||
@@ -96,6 +101,6 @@ dirmngr-socket:/run/user/1000/gnupg/S.dirmngr
|
||||
agent-ssh-socket:/run/user/1000/gnupg/S.gpg-agent.ssh
|
||||
agent-socket:/run/user/1000/gnupg/S.gpg-agent
|
||||
homedir:/home/roman/.gnupg
|
||||
'''
|
||||
''')
|
||||
expected = b'/run/user/1000/gnupg/S.gpg-agent'
|
||||
assert keyring.get_agent_sock_path(sp=sp) == expected
|
||||
|
||||
Reference in New Issue
Block a user