Fix Windows compatibility issues and AGE and GPG support

This commit is contained in:
SlugFiller
2023-09-10 02:30:33 +03:00
15 changed files with 1012 additions and 102 deletions

View File

@@ -14,11 +14,15 @@ import functools
import logging
import os
import re
import stat
import subprocess
import sys
import time
import daemon
try:
# TODO: Not supported on Windows. Use daemoniker instead?
import daemon
except ImportError:
daemon = None
import pkg_resources
import semver
@@ -39,6 +43,7 @@ def export_public_key(device_type, args):
verifying_key = c.pubkey(identity=identity, ecdh=False)
decryption_key = c.pubkey(identity=identity, ecdh=True)
signer_func = functools.partial(c.sign, identity=identity)
fingerprints = []
if args.subkey: # add as subkey
log.info('adding %s GPG subkey for "%s" to existing key',
@@ -47,10 +52,12 @@ def export_public_key(device_type, args):
signing_key = protocol.PublicKey(
curve_name=args.ecdsa_curve, created=args.time,
verifying_key=verifying_key, ecdh=False)
fingerprints.append(util.hexlify(signing_key.fingerprint()))
# subkey for encryption
encryption_key = protocol.PublicKey(
curve_name=formats.get_ecdh_curve_name(args.ecdsa_curve),
created=args.time, verifying_key=decryption_key, ecdh=True)
fingerprints.append(util.hexlify(encryption_key.fingerprint()))
primary_bytes = keyring.export_public_key(args.user_id)
result = encode.create_subkey(primary_bytes=primary_bytes,
subkey=signing_key,
@@ -65,10 +72,12 @@ def export_public_key(device_type, args):
primary = protocol.PublicKey(
curve_name=args.ecdsa_curve, created=args.time,
verifying_key=verifying_key, ecdh=False)
fingerprints.append(util.hexlify(primary.fingerprint()))
# subkey for encryption
subkey = protocol.PublicKey(
curve_name=formats.get_ecdh_curve_name(args.ecdsa_curve),
created=args.time, verifying_key=decryption_key, ecdh=True)
fingerprints.append(util.hexlify(subkey.fingerprint()))
result = encode.create_primary(user_id=args.user_id,
pubkey=primary,
@@ -77,7 +86,7 @@ def export_public_key(device_type, args):
subkey=subkey,
signer_func=signer_func)
return protocol.armor(result, 'PUBLIC KEY BLOCK')
return (fingerprints, protocol.armor(result, 'PUBLIC KEY BLOCK'))
def verify_gpg_version():
@@ -98,10 +107,10 @@ def check_output(args):
return out
def check_call(args, stdin=None, env=None):
def check_call(args, stdin=None, input_bytes=None, env=None):
"""Runs command and verifies its success."""
log.debug('run: %s%s', args, ' {}'.format(env) if env else '')
subprocess.check_call(args=args, stdin=stdin, env=env)
subprocess.run(args=args, stdin=stdin, input=input_bytes, env=env, check=True)
def write_file(path, data):
@@ -135,32 +144,44 @@ def run_init(device_type, args):
'remove it manually if required', homedir)
sys.exit(1)
check_call(['mkdir', '-p', homedir])
check_call(['chmod', '700', homedir])
# Prepare the key before making any changes
fingerprints, public_key_bytes = export_public_key(device_type, args)
os.makedirs(homedir, mode=0o700)
agent_path = util.which('{}-gpg-agent'.format(device_name))
# Prepare GPG agent invocation script (to pass the PATH from environment).
with open(os.path.join(homedir, 'run-agent.sh'), 'w') as f:
f.write(r"""#!/bin/sh
with open(os.path.join(homedir, ('run-agent.sh'
if sys.platform != 'win32' else
'run-agent.bat')), 'w') as f:
if sys.platform != 'win32':
f.write(r"""#!/bin/sh
export PATH="{0}"
{1} \
-vv \
--pin-entry-binary={pin_entry_binary} \
--passphrase-entry-binary={passphrase_entry_binary} \
--cache-expiry-seconds={cache_expiry_seconds} \
$*
""".format(os.environ['PATH'], agent_path, **vars(args)))
check_call(['chmod', '700', f.name])
""".format(util.escape_cmd_quotes(os.environ['PATH'])))
else:
f.write(r"""@echo off
set PATH={0}
""".format(util.escape_cmd_win(os.environ['PATH'])))
f.write('"{0}" -vv'.format(util.escape_cmd_quotes(agent_path)))
for arg in ['pin_entry_binary', 'passphrase_entry_binary', 'cache_expiry_seconds']:
if hasattr(args, arg):
f.write(' "--{0}={1}"'.format(arg.replace('_', '-'),
util.escape_cmd_quotes(getattr(args, arg))))
if sys.platform != 'win32':
f.write(' $*\n')
else:
f.write(' %*\n')
os.chmod(f.name, 0o700)
run_agent_script = f.name
# Prepare GPG configuration file
with open(os.path.join(homedir, 'gpg.conf'), 'w') as f:
f.write("""# Hardware-based GPG configuration
agent-program {0}
agent-program "{0}"
personal-digest-preferences SHA512
default-key \"{1}\"
""".format(run_agent_script, args.user_id))
default-key {1}
""".format(util.escape_cmd_quotes(run_agent_script), fingerprints[0]))
# Prepare a helper script for setting up the new identity
with open(os.path.join(homedir, 'env'), 'w') as f:
@@ -175,24 +196,18 @@ else
${{COMMAND}}
fi
""".format(homedir))
check_call(['chmod', '700', f.name])
os.chmod(f.name, 0o700)
# Generate new GPG identity and import into GPG keyring
pubkey = write_file(os.path.join(homedir, 'pubkey.asc'),
export_public_key(device_type, args))
verbosity = ('-' + ('v' * args.verbose)) if args.verbose else '--quiet'
check_call(keyring.gpg_command(['--homedir', homedir, verbosity,
'--import', pubkey.name]))
'--import']),
input_bytes=public_key_bytes.encode())
# Make new GPG identity with "ultimate" trust (via its fingerprint)
out = check_output(keyring.gpg_command(['--homedir', homedir,
'--list-public-keys',
'--with-fingerprint',
'--with-colons']))
fpr = re.findall('fpr:::::::::([0-9A-F]+):', out)[0]
f = write_file(os.path.join(homedir, 'ownertrust.txt'), fpr + ':6\n')
check_call(keyring.gpg_command(['--homedir', homedir,
'--import-ownertrust', f.name]))
'--import-ownertrust']),
input_bytes=(fingerprints[0] + ':6\n').encode())
# Load agent and make sure it responds with the new identity
check_call(keyring.gpg_command(['--homedir', homedir,
@@ -226,8 +241,9 @@ def run_agent(device_type):
p.add_argument('-v', '--verbose', default=0, action='count')
p.add_argument('--server', default=False, action='store_true',
help='Use stdin/stdout for communication with GPG.')
p.add_argument('--daemon', default=False, action='store_true',
help='Daemonize the agent.')
if daemon:
p.add_argument('--daemon', default=False, action='store_true',
help='Daemonize the agent.')
p.add_argument('--pin-entry-binary', type=str, default='pinentry',
help='Path to PIN entry UI helper.')
@@ -238,7 +254,7 @@ def run_agent(device_type):
args, _ = p.parse_known_args()
if args.daemon:
if daemon and args.daemon:
with daemon.DaemonContext():
run_agent_internal(args, device_type)
else:
@@ -312,11 +328,11 @@ def main(device_type):
p.add_argument('--homedir', type=str, default=os.environ.get('GNUPGHOME'),
help='Customize GnuPG home directory for the new identity.')
p.add_argument('--pin-entry-binary', type=str, default='pinentry',
p.add_argument('--pin-entry-binary', type=str, default=argparse.SUPPRESS,
help='Path to PIN entry UI helper.')
p.add_argument('--passphrase-entry-binary', type=str, default='pinentry',
p.add_argument('--passphrase-entry-binary', type=str, default=argparse.SUPPRESS,
help='Path to passphrase entry UI helper.')
p.add_argument('--cache-expiry-seconds', type=float, default=float('inf'),
p.add_argument('--cache-expiry-seconds', type=float, default=argparse.SUPPRESS,
help='Expire passphrase from cache after this duration.')
p.set_defaults(func=run_init)

View File

@@ -8,9 +8,14 @@ import os
import re
import socket
import subprocess
import sys
import urllib.parse
from .. import util
if sys.platform == 'win32':
from .. import win_server
log = logging.getLogger(__name__)
@@ -27,12 +32,8 @@ def check_output(args, env=None, sp=subprocess):
def get_agent_sock_path(env=None, sp=subprocess):
"""Parse gpgconf output to find out GPG agent UNIX socket path."""
args = [util.which('gpgconf'), '--list-dirs']
output = check_output(args=args, env=env, sp=sp)
lines = output.strip().split(b'\n')
dirs = dict(line.split(b':', 1) for line in lines)
log.debug('%s: %s', args, dirs)
return dirs[b'agent-socket']
args = [util.which('gpgconf'), '--list-dirs', 'agent-socket']
return check_output(args=args, env=env, sp=sp).strip()
def connect_to_agent(env=None, sp=subprocess):
@@ -40,8 +41,11 @@ def connect_to_agent(env=None, sp=subprocess):
sock_path = get_agent_sock_path(sp=sp, env=env)
# Make sure the original gpg-agent is running.
check_output(args=['gpg-connect-agent', '/bye'], sp=sp)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(sock_path)
if sys.platform == 'win32':
sock = win_server.Client(sock_path)
else:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(sock_path)
return sock
@@ -193,7 +197,8 @@ def get_gnupg_components(sp=subprocess):
"""Parse GnuPG components' paths."""
args = [util.which('gpgconf'), '--list-components']
output = check_output(args=args, sp=sp)
components = dict(re.findall('(.*):.*:(.*)', output.decode('utf-8')))
components = {k: urllib.parse.unquote(v) for k, v in re.findall(
r'(?<!:)([^\n\r:]*):[^\n\r:]*:([^\n\r:]*)(?!:)', output.decode('utf-8'))}
log.debug('gpgconf --list-components: %s', components)
return components
@@ -206,6 +211,15 @@ def get_gnupg_binary(sp=subprocess, neopg_binary=None):
return get_gnupg_components(sp=sp)['gpg']
@util.memoize
def get_pinentry_binary(sp=subprocess):
"""Returns the exact path to `pinentry` if GPG is installed."""
try:
return get_gnupg_components(sp=sp)['pinentry']
except Exception: # pylint: disable=broad-except
return 'pinentry'
def gpg_command(args, env=None):
"""Prepare common GPG command line arguments."""
if env is None:
@@ -225,10 +239,10 @@ def gpg_version(sp=subprocess):
"""Get a keygrip of the primary GPG key of the specified user."""
args = gpg_command(['--version'])
output = check_output(args=args, sp=sp)
line = output.split(b'\n', maxsplit=1)[0] # b'gpg (GnuPG) 2.1.11'
line = line.split(b' ')[-1] # b'2.1.11'
line = line.split(b'-')[0] # remove trailing version parts
return line.split(b'v')[-1] # remove 'v' prefix
line = re.split('[\n\r]+', output.decode('utf-8'))[0] # b'gpg (GnuPG) 2.1.11'
line = line.split(' ')[-1] # b'2.1.11'
line = line.split('-')[0] # remove trailing version parts
return line.split('v')[-1].encode() # remove 'v' prefix
def export_public_key(user_id, env=None, sp=subprocess):

View File

@@ -222,12 +222,13 @@ class PublicKey:
"""Data for digest computation."""
return b'\x99' + util.prefix_len('>H', self.data())
def _fingerprint(self):
def fingerprint(self):
"""GPG key fingerprint as bytes."""
return hashlib.sha1(self.data_to_hash()).digest()
def key_id(self):
"""Short (8 byte) GPG key ID."""
return self._fingerprint()[-8:]
return self.fingerprint()[-8:]
def __repr__(self):
"""Short (8 hexadecimal digits) GPG key ID."""

View File

@@ -1,4 +1,5 @@
import io
import subprocess
import mock
@@ -91,16 +92,11 @@ def test_iterlines():
def test_get_agent_sock_path():
sp = mock_subprocess(b'''sysconfdir:/usr/local/etc/gnupg
bindir:/usr/local/bin
libexecdir:/usr/local/libexec
libdir:/usr/local/lib/gnupg
datadir:/usr/local/share/gnupg
localedir:/usr/local/share/locale
dirmngr-socket:/run/user/1000/gnupg/S.dirmngr
agent-ssh-socket:/run/user/1000/gnupg/S.gpg-agent.ssh
agent-socket:/run/user/1000/gnupg/S.gpg-agent
homedir:/home/roman/.gnupg
''')
expected = b'/run/user/1000/gnupg/S.gpg-agent'
assert keyring.get_agent_sock_path(sp=sp) == expected
expected_prefix = b'/run/user/'
expected_suffix = b'/gnupg/S.gpg-agent'
expected_infix = b'0123456789'
value = keyring.get_agent_sock_path(sp=subprocess)
assert value.startswith(expected_prefix)
assert value.endswith(expected_suffix)
value = value[len(expected_prefix):-len(expected_suffix)]
assert value.strip(expected_infix) == b''