gpg: use shutil.which() for Python 3
This commit is contained in:
@@ -129,8 +129,7 @@ def run_init(device_type, args):
|
|||||||
check_call(['mkdir', '-p', homedir])
|
check_call(['mkdir', '-p', homedir])
|
||||||
check_call(['chmod', '700', homedir])
|
check_call(['chmod', '700', homedir])
|
||||||
|
|
||||||
agent_path = check_output(['which', '{}-gpg-agent'.format(device_name)])
|
agent_path = util.which('{}-gpg-agent'.format(device_name))
|
||||||
agent_path = agent_path.strip()
|
|
||||||
|
|
||||||
# Prepare GPG configuration file
|
# Prepare GPG configuration file
|
||||||
with open(os.path.join(homedir, 'gpg.conf'), 'w') as f:
|
with open(os.path.join(homedir, 'gpg.conf'), 'w') as f:
|
||||||
|
|||||||
@@ -16,10 +16,11 @@ log = logging.getLogger(__name__)
|
|||||||
|
|
||||||
def get_agent_sock_path(env=None, sp=subprocess):
|
def get_agent_sock_path(env=None, sp=subprocess):
|
||||||
"""Parse gpgconf output to find out GPG agent UNIX socket path."""
|
"""Parse gpgconf output to find out GPG agent UNIX socket path."""
|
||||||
output = sp.check_output(['gpgconf', '--list-dirs'], env=env)
|
args = [util.which('gpgconf'), '--list-dirs']
|
||||||
|
output = sp.check_output(args=args, env=env)
|
||||||
lines = output.strip().split(b'\n')
|
lines = output.strip().split(b'\n')
|
||||||
dirs = dict(line.split(b':', 1) for line in lines)
|
dirs = dict(line.split(b':', 1) for line in lines)
|
||||||
log.debug('gpgconf --list-dirs: %s', dirs)
|
log.debug('%s: %s', args, dirs)
|
||||||
return dirs[b'agent-socket']
|
return dirs[b'agent-socket']
|
||||||
|
|
||||||
|
|
||||||
@@ -179,22 +180,23 @@ def sign_digest(sock, keygrip, digest, sp=subprocess, environ=None):
|
|||||||
|
|
||||||
def get_gnupg_components(sp=subprocess):
|
def get_gnupg_components(sp=subprocess):
|
||||||
"""Parse GnuPG components' paths."""
|
"""Parse GnuPG components' paths."""
|
||||||
output = sp.check_output(['gpgconf', '--list-components'])
|
output = sp.check_output([util.which('gpgconf'), '--list-components'])
|
||||||
components = dict(re.findall('(.*):.*:(.*)', output.decode('ascii')))
|
components = dict(re.findall('(.*):.*:(.*)', output.decode('ascii')))
|
||||||
log.debug('gpgconf --list-components: %s', components)
|
log.debug('gpgconf --list-components: %s', components)
|
||||||
return components
|
return components
|
||||||
|
|
||||||
|
|
||||||
|
@util.memoize
|
||||||
def get_gnupg_binary(sp=subprocess):
|
def get_gnupg_binary(sp=subprocess):
|
||||||
"""Starting GnuPG 2.2.x, the default installation uses `gpg`."""
|
"""Starting GnuPG 2.2.x, the default installation uses `gpg`."""
|
||||||
return get_gnupg_components(sp=sp)['gpg']
|
return get_gnupg_components(sp=sp)['gpg']
|
||||||
|
|
||||||
|
|
||||||
def gpg_command(args, env=None, sp=subprocess):
|
def gpg_command(args, env=None):
|
||||||
"""Prepare common GPG command line arguments."""
|
"""Prepare common GPG command line arguments."""
|
||||||
if env is None:
|
if env is None:
|
||||||
env = os.environ
|
env = os.environ
|
||||||
cmd = [get_gnupg_binary(sp=sp)]
|
cmd = [get_gnupg_binary()]
|
||||||
homedir = env.get('GNUPGHOME')
|
homedir = env.get('GNUPGHOME')
|
||||||
if homedir:
|
if homedir:
|
||||||
cmd.extend(['--homedir', homedir])
|
cmd.extend(['--homedir', homedir])
|
||||||
@@ -203,14 +205,14 @@ def gpg_command(args, env=None, sp=subprocess):
|
|||||||
|
|
||||||
def get_keygrip(user_id, sp=subprocess):
|
def get_keygrip(user_id, sp=subprocess):
|
||||||
"""Get a keygrip of the primary GPG key of the specified user."""
|
"""Get a keygrip of the primary GPG key of the specified user."""
|
||||||
args = gpg_command(['--list-keys', '--with-keygrip', user_id], sp=sp)
|
args = gpg_command(['--list-keys', '--with-keygrip', user_id])
|
||||||
output = sp.check_output(args).decode('ascii')
|
output = sp.check_output(args).decode('ascii')
|
||||||
return re.findall(r'Keygrip = (\w+)', output)[0]
|
return re.findall(r'Keygrip = (\w+)', output)[0]
|
||||||
|
|
||||||
|
|
||||||
def gpg_version(sp=subprocess):
|
def gpg_version(sp=subprocess):
|
||||||
"""Get a keygrip of the primary GPG key of the specified user."""
|
"""Get a keygrip of the primary GPG key of the specified user."""
|
||||||
args = gpg_command(['--version'], sp=sp)
|
args = gpg_command(['--version'])
|
||||||
output = sp.check_output(args)
|
output = sp.check_output(args)
|
||||||
line = output.split(b'\n')[0] # b'gpg (GnuPG) 2.1.11'
|
line = output.split(b'\n')[0] # b'gpg (GnuPG) 2.1.11'
|
||||||
return line.split(b' ')[-1] # b'2.1.11'
|
return line.split(b' ')[-1] # b'2.1.11'
|
||||||
@@ -218,7 +220,7 @@ def gpg_version(sp=subprocess):
|
|||||||
|
|
||||||
def export_public_key(user_id, env=None, sp=subprocess):
|
def export_public_key(user_id, env=None, sp=subprocess):
|
||||||
"""Export GPG public key for specified `user_id`."""
|
"""Export GPG public key for specified `user_id`."""
|
||||||
args = gpg_command(['--export', user_id], sp=sp)
|
args = gpg_command(['--export', user_id])
|
||||||
result = sp.check_output(args=args, env=env)
|
result = sp.check_output(args=args, env=env)
|
||||||
if not result:
|
if not result:
|
||||||
log.error('could not find public key %r in local GPG keyring', user_id)
|
log.error('could not find public key %r in local GPG keyring', user_id)
|
||||||
@@ -228,7 +230,7 @@ def export_public_key(user_id, env=None, sp=subprocess):
|
|||||||
|
|
||||||
def export_public_keys(env=None, sp=subprocess):
|
def export_public_keys(env=None, sp=subprocess):
|
||||||
"""Export all GPG public keys."""
|
"""Export all GPG public keys."""
|
||||||
args = gpg_command(['--export'], sp=sp)
|
args = gpg_command(['--export'])
|
||||||
result = sp.check_output(args=args, env=env)
|
result = sp.check_output(args=args, env=env)
|
||||||
if not result:
|
if not result:
|
||||||
raise KeyError('No GPG public keys found at env: {!r}'.format(env))
|
raise KeyError('No GPG public keys found at env: {!r}'.format(env))
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import contextlib
|
|||||||
import functools
|
import functools
|
||||||
import io
|
import io
|
||||||
import logging
|
import logging
|
||||||
|
import shutil
|
||||||
import struct
|
import struct
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
@@ -213,3 +214,13 @@ def memoize(func):
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
@memoize
|
||||||
|
def which(cmd):
|
||||||
|
"""Return full path to specified command, or raise OSError if missing."""
|
||||||
|
full_path = shutil.which(cmd)
|
||||||
|
if full_path is None:
|
||||||
|
raise OSError('Cannot find {!r} in $PATH'.format(cmd))
|
||||||
|
log.debug('which %r => %r', cmd, full_path)
|
||||||
|
return full_path
|
||||||
|
|||||||
Reference in New Issue
Block a user