diff --git a/agents/trezor/setup.py b/agents/trezor/setup.py index f6ff367..63f169b 100644 --- a/agents/trezor/setup.py +++ b/agents/trezor/setup.py @@ -37,5 +37,6 @@ setup( 'trezor-gpg = trezor_agent:gpg_tool', 'trezor-gpg-agent = trezor_agent:gpg_agent', 'trezor-signify = trezor_agent:signify_tool', + 'age-plugin-trezor = trezor_agent:age_tool', # see https://github.com/str4d/rage/blob/main/age-plugin/README.md ]}, ) diff --git a/agents/trezor/trezor_agent.py b/agents/trezor/trezor_agent.py index c6fa935..78758cf 100644 --- a/agents/trezor/trezor_agent.py +++ b/agents/trezor/trezor_agent.py @@ -1,6 +1,7 @@ -from libagent import signify, gpg, ssh +from libagent import age, signify, gpg, ssh from libagent.device.trezor import Trezor as DeviceType +age_tool = lambda: age.main(DeviceType) ssh_agent = lambda: ssh.main(DeviceType) gpg_tool = lambda: gpg.main(DeviceType) gpg_agent = lambda: gpg.run_agent(DeviceType) diff --git a/libagent/age/__init__.py b/libagent/age/__init__.py new file mode 100644 index 0000000..15cc3ee --- /dev/null +++ b/libagent/age/__init__.py @@ -0,0 +1,175 @@ +""" +TREZOR support for AGE format. + +See these links for more details: + - https://age-encryption.org/v1 + - https://github.com/FiloSottile/age + - https://github.com/str4d/rage/ +""" + +import argparse +import base64 +import contextlib +import datetime +import logging +import os +import sys +import traceback + +import bech32 +import pkg_resources +import semver +from cryptography.exceptions import InvalidTag +from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 + +from .. import device, server, util +from . import client + +log = logging.getLogger(__name__) + + +def bech32_decode(prefix, encoded): + """Decode Bech32-encoded data.""" + hrp, data = bech32.bech32_decode(encoded) + assert prefix == hrp + return bytes(bech32.convertbits(data, 5, 8, pad=False)) + + +def bech32_encode(prefix, data): + """Encode data using Bech32.""" + return bech32.bech32_encode(prefix, bech32.convertbits(bytes(data), 8, 5)) + + +def run_pubkey(device_type, args): + """Initialize hardware-based GnuPG identity.""" + log.warning('This AGE tool is still in EXPERIMENTAL mode, ' + 'so please note that the API and features may ' + 'change without backwards compatibility!') + + c = client.Client(device=device_type()) + pubkey = c.pubkey(identity=client.create_identity(args.identity), ecdh=True) + recipient = bech32_encode(prefix="age", data=pubkey) + print(f"# recipient: {recipient}") + print(f"# SLIP-0017: {args.identity}") + data = args.identity.encode() + encoded = bech32_encode(prefix="age-plugin-trezor-", data=data).upper() + decoded = bech32_decode(prefix="age-plugin-trezor-", encoded=encoded) + assert decoded.startswith(data) + print(encoded) + + +def base64_decode(encoded: str) -> bytes: + """Decode Base64-encoded data (after padding correctly with '=').""" + k = len(encoded) % 4 + pad = (4 - k) if k else 0 + return base64.b64decode(encoded + ("=" * pad)) + + +def base64_encode(data: bytes) -> str: + """Encode data using Base64 (and remove '=').""" + return base64.b64encode(data).replace(b"=", b"").decode() + + +def decrypt(key, encrypted): + """Decrypt age-encrypted data.""" + cipher = ChaCha20Poly1305(key) + try: + return cipher.decrypt( + nonce=(b"\x00" * 12), + data=encrypted, + associated_data=None) + except InvalidTag: + return None + + +def run_decrypt(device_type, args): + """Unlock hardware device (for future interaction).""" + # pylint: disable=too-many-locals + c = client.Client(device=device_type()) + + lines = (line.strip() for line in sys.stdin) # strip whitespace + lines = (line for line in lines if line) # skip empty lines + + identities = [] + stanza_map = {} + + for line in lines: + log.debug("got %r", line) + if line == "-> done": + break + + if line.startswith("-> add-identity "): + encoded = line.split(" ")[-1].lower() + data = bech32_decode("age-plugin-trezor-", encoded) + identity = client.create_identity(data.decode()) + identities.append(identity) + + elif line.startswith("-> recipient-stanza "): + file_index, tag, *args = line.split(" ")[2:] + body = next(lines) + if tag != "X25519": + continue + + peer_pubkey = base64_decode(args[0]) + encrypted = base64_decode(body) + stanza_map.setdefault(file_index, []).append((peer_pubkey, encrypted)) + + for file_index, stanzas in stanza_map.items(): + _handle_single_file(file_index, stanzas, identities, c) + + sys.stdout.write('-> done\n\n') + sys.stdout.flush() + sys.stdout.close() + + +def _handle_single_file(file_index, stanzas, identities, c): + d = c.device.__class__.__name__ + msg = base64_encode(f'Please confirm decryption on {d} device...'.encode()) + for peer_pubkey, encrypted in stanzas: + for identity in identities: + sys.stdout.write(f'-> msg\n{msg}\n') + sys.stdout.flush() + + key = c.ecdh(identity=identity, peer_pubkey=peer_pubkey) + result = decrypt(key=key, encrypted=encrypted) + if not result: + continue + + sys.stdout.write(f'-> file-key {file_index}\n{base64_encode(result)}\n') + sys.stdout.flush() + return + + +def main(device_type): + """Parse command-line arguments.""" + p = argparse.ArgumentParser() + + agent_package = device_type.package_name() + resources_map = {r.key: r for r in pkg_resources.require(agent_package)} + resources = [resources_map[agent_package], resources_map['libagent']] + versions = '\n'.join('{}={}'.format(r.key, r.version) for r in resources) + p.add_argument('--version', help='print the version info', + action='version', version=versions) + + p.add_argument('-i', '--identity') + p.add_argument('-v', '--verbose', default=0, action='count') + p.add_argument('--age-plugin') + + args = p.parse_args() + + log_path = os.environ.get("TREZOR_AGE_PLUGIN_LOG") + util.setup_logging(verbosity=2, filename=log_path) + + log.debug("starting age plugin: %s", args) + + device_type.ui = device.ui.UI(device_type=device_type, config=vars(args)) + + try: + if args.identity: + run_pubkey(device_type=device_type, args=args) + elif args.age_plugin: + run_decrypt(device_type=device_type, args=args) + except Exception as e: # pylint: disable=broad-except + log.exception("age plugin failed: %s", e) + + log.debug("closing age plugin") diff --git a/libagent/age/client.py b/libagent/age/client.py new file mode 100644 index 0000000..a369591 --- /dev/null +++ b/libagent/age/client.py @@ -0,0 +1,48 @@ +"""Device abstraction layer for AGE operations.""" + +import logging + +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.kdf.hkdf import HKDF + +from ..device import interface + +log = logging.getLogger(__name__) + + +def create_identity(user_id): + """Create AGE identity for hardware device.""" + result = interface.Identity(identity_str='age://', curve_name="ed25519") + result.identity_dict['host'] = user_id + return result + + +class Client: + """Sign messages and get public keys from a hardware device.""" + + def __init__(self, device): + """C-tor.""" + self.device = device + + def pubkey(self, identity, ecdh=False): + """Return public key as VerifyingKey object.""" + with self.device: + pubkey = bytes(self.device.pubkey(ecdh=ecdh, identity=identity)) + assert len(pubkey) == 32 + return pubkey + + def ecdh(self, identity, peer_pubkey): + """Derive shared secret using ECDH from peer public key.""" + log.info('please confirm AGE decryption on %s for "%s"...', + self.device, identity.to_string()) + with self.device: + assert len(peer_pubkey) == 32 + result, self_pubkey = self.device.ecdh_with_pubkey( + pubkey=(b"\x40" + peer_pubkey), identity=identity) + assert result[:1] == b"\x04" + hkdf = HKDF( + algorithm=hashes.SHA256(), + length=32, + salt=((peer_pubkey + self_pubkey)), + info=b"age-encryption.org/v1/X25519") + return hkdf.derive(result[1:]) diff --git a/libagent/device/trezor.py b/libagent/device/trezor.py index 8fdbd11..65978b3 100644 --- a/libagent/device/trezor.py +++ b/libagent/device/trezor.py @@ -126,6 +126,11 @@ class Trezor(interface.Device): def ecdh(self, identity, pubkey): """Get shared session key using Elliptic Curve Diffie-Hellman.""" + session_key, _ = self.ecdh_with_pubkey(identity, pubkey) + return session_key + + def ecdh_with_pubkey(self, identity, pubkey): + """Get shared session key using Elliptic Curve Diffie-Hellman & self public key.""" curve_name = identity.get_curve_name(ecdh=True) log.debug('"%s" shared session key (%s) for %r from %s', identity.to_string(), curve_name, pubkey, self) @@ -138,7 +143,11 @@ class Trezor(interface.Device): log.debug('result: %s', result) assert len(result.session_key) in {65, 33} # NIST256 or Curve25519 assert result.session_key[:1] == b'\x04' - return bytes(result.session_key) + self_pubkey = result.public_key + if self_pubkey: + self_pubkey = bytes(self_pubkey[1:]) + + return bytes(result.session_key), self_pubkey except self._defs.TrezorFailure as e: msg = '{} error: {}'.format(self, e) log.debug(msg, exc_info=True) diff --git a/libagent/device/trezor_defs.py b/libagent/device/trezor_defs.py index e0569d5..b6a36da 100644 --- a/libagent/device/trezor_defs.py +++ b/libagent/device/trezor_defs.py @@ -1,12 +1,11 @@ """TREZOR-related definitions.""" -import logging # pylint: disable=unused-import,import-error,no-name-in-module,no-member +import logging import os import mnemonic import semver - import trezorlib from trezorlib.btc import get_address, get_public_node from trezorlib.client import PASSPHRASE_TEST_PATH diff --git a/libagent/formats.py b/libagent/formats.py index 9d64424..36f69d6 100644 --- a/libagent/formats.py +++ b/libagent/formats.py @@ -101,7 +101,7 @@ def parse_pubkey(blob): def _decompress_ed25519(pubkey): """Load public key from the serialized blob (stripping the prefix byte).""" - if pubkey[:1] == b'\x00': + if pubkey[:1] in {b'\x00', b'\x01'}: # set by Trezor fsm_msgSignIdentity() and fsm_msgGetPublicKey() return nacl.signing.VerifyKey(pubkey[1:], encoder=nacl.encoding.RawEncoder) else: diff --git a/setup.py b/setup.py index ee214e1..0cc83f6 100755 --- a/setup.py +++ b/setup.py @@ -16,6 +16,8 @@ setup( 'libagent.ssh', ], install_requires=[ + 'bech32>=1.2.0', + 'cryptography>=3.4.6', 'docutils>=0.14', 'python-daemon>=2.3.0', 'wheel>=0.32.3', diff --git a/tox.ini b/tox.ini index 3d6cc1a..a8af270 100644 --- a/tox.ini +++ b/tox.ini @@ -13,10 +13,10 @@ deps= pylint semver pydocstyle - isort<5 + isort commands= pycodestyle libagent - isort --skip-glob .tox -c -rc libagent + isort --skip-glob .tox -c libagent pylint --reports=no --rcfile .pylintrc libagent pydocstyle libagent coverage run --source libagent -m py.test -v libagent