ui: merge into a single module
This commit is contained in:
@@ -4,12 +4,53 @@ import logging
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
from . import pinentry
|
||||
from ..gpg import agent
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _create_default_options_getter():
|
||||
class UI(object):
|
||||
"""UI for PIN/passphrase entry (for TREZOR devices)."""
|
||||
|
||||
def __init__(self, device_type, config=None):
|
||||
"""C-tor."""
|
||||
default_pinentry = 'pinentry' # by default, use GnuPG pinentry tool
|
||||
if config is None:
|
||||
config = {}
|
||||
self.pin_entry_binary = config.get('pin_entry_binary',
|
||||
default_pinentry)
|
||||
self.passphrase_entry_binary = config.get('passphrase_entry_binary',
|
||||
default_pinentry)
|
||||
self.options_getter = create_default_options_getter()
|
||||
self.device_name = device_type.__name__
|
||||
|
||||
def get_pin(self):
|
||||
"""Ask the user for (scrambled) PIN."""
|
||||
description = (
|
||||
'Use the numeric keypad to describe number positions.\n'
|
||||
'The layout is:\n'
|
||||
' 7 8 9\n'
|
||||
' 4 5 6\n'
|
||||
' 1 2 3')
|
||||
return interact(
|
||||
title='{} PIN'.format(self.device_name),
|
||||
prompt='PIN:',
|
||||
description=description,
|
||||
binary=self.pin_entry_binary,
|
||||
options=self.options_getter())
|
||||
|
||||
def get_passphrase(self):
|
||||
"""Ask the user for passphrase."""
|
||||
return interact(
|
||||
title='{} passphrase'.format(self.device_name),
|
||||
prompt='Passphrase:',
|
||||
description=None,
|
||||
binary=self.passphrase_entry_binary,
|
||||
options=self.options_getter())
|
||||
|
||||
|
||||
def create_default_options_getter():
|
||||
"""Return current TTY and DISPLAY settings for GnuPG pinentry."""
|
||||
options = []
|
||||
try:
|
||||
ttyname = subprocess.check_output(args=['tty']).strip()
|
||||
@@ -27,41 +68,56 @@ def _create_default_options_getter():
|
||||
return lambda: options
|
||||
|
||||
|
||||
class UI(object):
|
||||
"""UI for PIN/passphrase entry (for TREZOR devices)."""
|
||||
def write(p, line):
|
||||
"""Send and flush a single line to the subprocess' stdin."""
|
||||
log.debug('%s <- %r', p.args, line)
|
||||
p.stdin.write(line)
|
||||
p.stdin.flush()
|
||||
|
||||
def __init__(self, device_type, config=None):
|
||||
"""C-tor."""
|
||||
default_pinentry = 'pinentry' # by default, use GnuPG pinentry tool
|
||||
if config is None:
|
||||
config = {}
|
||||
self.pin_entry_binary = config.get('pin_entry_binary',
|
||||
default_pinentry)
|
||||
self.passphrase_entry_binary = config.get('passphrase_entry_binary',
|
||||
default_pinentry)
|
||||
self.options_getter = _create_default_options_getter()
|
||||
self.device_name = device_type.__name__
|
||||
|
||||
def get_pin(self):
|
||||
"""Ask the user for (scrambled) PIN."""
|
||||
description = (
|
||||
'Use the numeric keypad to describe number positions.\n'
|
||||
'The layout is:\n'
|
||||
' 7 8 9\n'
|
||||
' 4 5 6\n'
|
||||
' 1 2 3')
|
||||
return pinentry.interact(
|
||||
title='{} PIN'.format(self.device_name),
|
||||
prompt='PIN:',
|
||||
description=description,
|
||||
binary=self.pin_entry_binary,
|
||||
options=self.options_getter())
|
||||
def expect(p, prefixes):
|
||||
"""Read a line and return it without required prefix."""
|
||||
resp = p.stdout.readline()
|
||||
log.debug('%s -> %r', p.args, resp)
|
||||
for prefix in prefixes:
|
||||
if resp.startswith(prefix):
|
||||
return resp[len(prefix):]
|
||||
raise ValueError('Unexpected response: {}'.format(resp))
|
||||
|
||||
def get_passphrase(self):
|
||||
"""Ask the user for passphrase."""
|
||||
return pinentry.interact(
|
||||
title='{} passphrase'.format(self.device_name),
|
||||
prompt='Passphrase:',
|
||||
description=None,
|
||||
binary=self.passphrase_entry_binary,
|
||||
options=self.options_getter())
|
||||
|
||||
def interact(title, description, prompt, binary, options):
|
||||
"""Use GPG pinentry program to interact with the user."""
|
||||
p = subprocess.Popen(args=[binary],
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
env=os.environ)
|
||||
expect(p, [b'OK'])
|
||||
|
||||
title = agent.serialize(title.encode('ascii'))
|
||||
write(p, b'SETTITLE ' + title + b'\n')
|
||||
expect(p, [b'OK'])
|
||||
|
||||
if description:
|
||||
description = agent.serialize(description.encode('ascii'))
|
||||
write(p, b'SETDESC ' + description + b'\n')
|
||||
expect(p, [b'OK'])
|
||||
|
||||
if prompt:
|
||||
prompt = agent.serialize(prompt.encode('ascii'))
|
||||
write(p, b'SETPROMPT ' + prompt + b'\n')
|
||||
expect(p, [b'OK'])
|
||||
|
||||
log.debug('setting %d options', len(options))
|
||||
for opt in options:
|
||||
write(p, b'OPTION ' + opt + b'\n')
|
||||
expect(p, [b'OK', b'ERR'])
|
||||
|
||||
write(p, b'GETPIN\n')
|
||||
pin = expect(p, [b'OK', b'D '])
|
||||
|
||||
p.communicate() # close stdin and wait for the process to exit
|
||||
exit_code = p.wait()
|
||||
if exit_code:
|
||||
raise subprocess.CalledProcessError(exit_code, binary)
|
||||
|
||||
return pin.decode('ascii').strip()
|
||||
@@ -1,64 +0,0 @@
|
||||
"""Python wrapper for GnuPG's pinentry."""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
import libagent.gpg.agent
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def write(p, line):
|
||||
"""Send and flush a single line to the subprocess' stdin."""
|
||||
log.debug('%s <- %r', p.args, line)
|
||||
p.stdin.write(line)
|
||||
p.stdin.flush()
|
||||
|
||||
|
||||
def expect(p, prefixes):
|
||||
"""Read a line and return it without required prefix."""
|
||||
resp = p.stdout.readline()
|
||||
log.debug('%s -> %r', p.args, resp)
|
||||
for prefix in prefixes:
|
||||
if resp.startswith(prefix):
|
||||
return resp[len(prefix):]
|
||||
raise ValueError('Unexpected response: {}'.format(resp))
|
||||
|
||||
|
||||
def interact(title, description, prompt, binary, options):
|
||||
"""Use GPG pinentry program to interact with the user."""
|
||||
p = subprocess.Popen(args=[binary],
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
env=os.environ)
|
||||
expect(p, [b'OK'])
|
||||
|
||||
title = libagent.gpg.agent.serialize(title.encode('ascii'))
|
||||
write(p, b'SETTITLE ' + title + b'\n')
|
||||
expect(p, [b'OK'])
|
||||
|
||||
if description:
|
||||
description = libagent.gpg.agent.serialize(description.encode('ascii'))
|
||||
write(p, b'SETDESC ' + description + b'\n')
|
||||
expect(p, [b'OK'])
|
||||
|
||||
if prompt:
|
||||
prompt = libagent.gpg.agent.serialize(prompt.encode('ascii'))
|
||||
write(p, b'SETPROMPT ' + prompt + b'\n')
|
||||
expect(p, [b'OK'])
|
||||
|
||||
log.debug('setting %d options', len(options))
|
||||
for opt in options:
|
||||
write(p, b'OPTION ' + opt + b'\n')
|
||||
expect(p, [b'OK', b'ERR'])
|
||||
|
||||
write(p, b'GETPIN\n')
|
||||
pin = expect(p, [b'OK', b'D '])
|
||||
|
||||
p.communicate() # close stdin and wait for the process to exit
|
||||
exit_code = p.wait()
|
||||
if exit_code:
|
||||
raise subprocess.CalledProcessError(exit_code, binary)
|
||||
|
||||
return pin.decode('ascii').strip()
|
||||
Reference in New Issue
Block a user