From 34c03a462ce4b099ca933cfbaf509da4781aa9d4 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Sun, 11 Mar 2018 14:33:54 +0200 Subject: [PATCH] ui: merge into a single module --- libagent/device/{ui/__init__.py => ui.py} | 130 ++++++++++++++++------ libagent/device/ui/pinentry.py | 64 ----------- 2 files changed, 93 insertions(+), 101 deletions(-) rename libagent/device/{ui/__init__.py => ui.py} (50%) delete mode 100644 libagent/device/ui/pinentry.py diff --git a/libagent/device/ui/__init__.py b/libagent/device/ui.py similarity index 50% rename from libagent/device/ui/__init__.py rename to libagent/device/ui.py index c9f213f..20c0434 100644 --- a/libagent/device/ui/__init__.py +++ b/libagent/device/ui.py @@ -4,12 +4,53 @@ import logging import os import subprocess -from . import pinentry +from ..gpg import agent log = logging.getLogger(__name__) -def _create_default_options_getter(): +class UI(object): + """UI for PIN/passphrase entry (for TREZOR devices).""" + + def __init__(self, device_type, config=None): + """C-tor.""" + default_pinentry = 'pinentry' # by default, use GnuPG pinentry tool + if config is None: + config = {} + self.pin_entry_binary = config.get('pin_entry_binary', + default_pinentry) + self.passphrase_entry_binary = config.get('passphrase_entry_binary', + default_pinentry) + self.options_getter = create_default_options_getter() + self.device_name = device_type.__name__ + + def get_pin(self): + """Ask the user for (scrambled) PIN.""" + description = ( + 'Use the numeric keypad to describe number positions.\n' + 'The layout is:\n' + ' 7 8 9\n' + ' 4 5 6\n' + ' 1 2 3') + return interact( + title='{} PIN'.format(self.device_name), + prompt='PIN:', + description=description, + binary=self.pin_entry_binary, + options=self.options_getter()) + + def get_passphrase(self): + """Ask the user for passphrase.""" + return interact( + title='{} passphrase'.format(self.device_name), + prompt='Passphrase:', + description=None, + binary=self.passphrase_entry_binary, + options=self.options_getter()) + + +def create_default_options_getter(): + """Return current TTY and DISPLAY settings for GnuPG pinentry.""" options = [] try: ttyname = subprocess.check_output(args=['tty']).strip() @@ -27,41 +68,56 @@ def _create_default_options_getter(): return lambda: options -class UI(object): - """UI for PIN/passphrase entry (for TREZOR devices).""" +def write(p, line): + """Send and flush a single line to the subprocess' stdin.""" + log.debug('%s <- %r', p.args, line) + p.stdin.write(line) + p.stdin.flush() - def __init__(self, device_type, config=None): - """C-tor.""" - default_pinentry = 'pinentry' # by default, use GnuPG pinentry tool - if config is None: - config = {} - self.pin_entry_binary = config.get('pin_entry_binary', - default_pinentry) - self.passphrase_entry_binary = config.get('passphrase_entry_binary', - default_pinentry) - self.options_getter = _create_default_options_getter() - self.device_name = device_type.__name__ - def get_pin(self): - """Ask the user for (scrambled) PIN.""" - description = ( - 'Use the numeric keypad to describe number positions.\n' - 'The layout is:\n' - ' 7 8 9\n' - ' 4 5 6\n' - ' 1 2 3') - return pinentry.interact( - title='{} PIN'.format(self.device_name), - prompt='PIN:', - description=description, - binary=self.pin_entry_binary, - options=self.options_getter()) +def expect(p, prefixes): + """Read a line and return it without required prefix.""" + resp = p.stdout.readline() + log.debug('%s -> %r', p.args, resp) + for prefix in prefixes: + if resp.startswith(prefix): + return resp[len(prefix):] + raise ValueError('Unexpected response: {}'.format(resp)) - def get_passphrase(self): - """Ask the user for passphrase.""" - return pinentry.interact( - title='{} passphrase'.format(self.device_name), - prompt='Passphrase:', - description=None, - binary=self.passphrase_entry_binary, - options=self.options_getter()) + +def interact(title, description, prompt, binary, options): + """Use GPG pinentry program to interact with the user.""" + p = subprocess.Popen(args=[binary], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + env=os.environ) + expect(p, [b'OK']) + + title = agent.serialize(title.encode('ascii')) + write(p, b'SETTITLE ' + title + b'\n') + expect(p, [b'OK']) + + if description: + description = agent.serialize(description.encode('ascii')) + write(p, b'SETDESC ' + description + b'\n') + expect(p, [b'OK']) + + if prompt: + prompt = agent.serialize(prompt.encode('ascii')) + write(p, b'SETPROMPT ' + prompt + b'\n') + expect(p, [b'OK']) + + log.debug('setting %d options', len(options)) + for opt in options: + write(p, b'OPTION ' + opt + b'\n') + expect(p, [b'OK', b'ERR']) + + write(p, b'GETPIN\n') + pin = expect(p, [b'OK', b'D ']) + + p.communicate() # close stdin and wait for the process to exit + exit_code = p.wait() + if exit_code: + raise subprocess.CalledProcessError(exit_code, binary) + + return pin.decode('ascii').strip() diff --git a/libagent/device/ui/pinentry.py b/libagent/device/ui/pinentry.py deleted file mode 100644 index 0d308d4..0000000 --- a/libagent/device/ui/pinentry.py +++ /dev/null @@ -1,64 +0,0 @@ -"""Python wrapper for GnuPG's pinentry.""" - -import logging -import os -import subprocess - -import libagent.gpg.agent - -log = logging.getLogger(__name__) - - -def write(p, line): - """Send and flush a single line to the subprocess' stdin.""" - log.debug('%s <- %r', p.args, line) - p.stdin.write(line) - p.stdin.flush() - - -def expect(p, prefixes): - """Read a line and return it without required prefix.""" - resp = p.stdout.readline() - log.debug('%s -> %r', p.args, resp) - for prefix in prefixes: - if resp.startswith(prefix): - return resp[len(prefix):] - raise ValueError('Unexpected response: {}'.format(resp)) - - -def interact(title, description, prompt, binary, options): - """Use GPG pinentry program to interact with the user.""" - p = subprocess.Popen(args=[binary], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - env=os.environ) - expect(p, [b'OK']) - - title = libagent.gpg.agent.serialize(title.encode('ascii')) - write(p, b'SETTITLE ' + title + b'\n') - expect(p, [b'OK']) - - if description: - description = libagent.gpg.agent.serialize(description.encode('ascii')) - write(p, b'SETDESC ' + description + b'\n') - expect(p, [b'OK']) - - if prompt: - prompt = libagent.gpg.agent.serialize(prompt.encode('ascii')) - write(p, b'SETPROMPT ' + prompt + b'\n') - expect(p, [b'OK']) - - log.debug('setting %d options', len(options)) - for opt in options: - write(p, b'OPTION ' + opt + b'\n') - expect(p, [b'OK', b'ERR']) - - write(p, b'GETPIN\n') - pin = expect(p, [b'OK', b'D ']) - - p.communicate() # close stdin and wait for the process to exit - exit_code = p.wait() - if exit_code: - raise subprocess.CalledProcessError(exit_code, binary) - - return pin.decode('ascii').strip()