Add age plugin support
See https://github.com/str4d/rage/tree/main/age-plugin. Example usage: RAGE_DIR=$PWD/../Rust/rage (cd $RAGE_DIR; cargo build --all) export PATH=$PATH:$RAGE_DIR/target/debug age-plugin-trezor -i "John Doe" | tee trezor.id R=$(grep recipient trezor.id | cut -f 3 -d ' ') date | tee msg.txt rage -er $R < msg.txt > enc.txt rage -di trezor.id < enc.txt
This commit is contained in:
175
libagent/age/__init__.py
Normal file
175
libagent/age/__init__.py
Normal file
@@ -0,0 +1,175 @@
|
||||
"""
|
||||
TREZOR support for AGE format.
|
||||
|
||||
See these links for more details:
|
||||
- https://age-encryption.org/v1
|
||||
- https://github.com/FiloSottile/age
|
||||
- https://github.com/str4d/rage/
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import base64
|
||||
import contextlib
|
||||
import datetime
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
import bech32
|
||||
import pkg_resources
|
||||
import semver
|
||||
from cryptography.exceptions import InvalidTag
|
||||
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
|
||||
|
||||
from .. import device, server, util
|
||||
from . import client
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def bech32_decode(prefix, encoded):
|
||||
"""Decode Bech32-encoded data."""
|
||||
hrp, data = bech32.bech32_decode(encoded)
|
||||
assert prefix == hrp
|
||||
return bytes(bech32.convertbits(data, 5, 8, pad=False))
|
||||
|
||||
|
||||
def bech32_encode(prefix, data):
|
||||
"""Encode data using Bech32."""
|
||||
return bech32.bech32_encode(prefix, bech32.convertbits(bytes(data), 8, 5))
|
||||
|
||||
|
||||
def run_pubkey(device_type, args):
|
||||
"""Initialize hardware-based GnuPG identity."""
|
||||
log.warning('This AGE tool is still in EXPERIMENTAL mode, '
|
||||
'so please note that the API and features may '
|
||||
'change without backwards compatibility!')
|
||||
|
||||
c = client.Client(device=device_type())
|
||||
pubkey = c.pubkey(identity=client.create_identity(args.identity), ecdh=True)
|
||||
recipient = bech32_encode(prefix="age", data=pubkey)
|
||||
print(f"# recipient: {recipient}")
|
||||
print(f"# SLIP-0017: {args.identity}")
|
||||
data = args.identity.encode()
|
||||
encoded = bech32_encode(prefix="age-plugin-trezor-", data=data).upper()
|
||||
decoded = bech32_decode(prefix="age-plugin-trezor-", encoded=encoded)
|
||||
assert decoded.startswith(data)
|
||||
print(encoded)
|
||||
|
||||
|
||||
def base64_decode(encoded: str) -> bytes:
|
||||
"""Decode Base64-encoded data (after padding correctly with '=')."""
|
||||
k = len(encoded) % 4
|
||||
pad = (4 - k) if k else 0
|
||||
return base64.b64decode(encoded + ("=" * pad))
|
||||
|
||||
|
||||
def base64_encode(data: bytes) -> str:
|
||||
"""Encode data using Base64 (and remove '=')."""
|
||||
return base64.b64encode(data).replace(b"=", b"").decode()
|
||||
|
||||
|
||||
def decrypt(key, encrypted):
|
||||
"""Decrypt age-encrypted data."""
|
||||
cipher = ChaCha20Poly1305(key)
|
||||
try:
|
||||
return cipher.decrypt(
|
||||
nonce=(b"\x00" * 12),
|
||||
data=encrypted,
|
||||
associated_data=None)
|
||||
except InvalidTag:
|
||||
return None
|
||||
|
||||
|
||||
def run_decrypt(device_type, args):
|
||||
"""Unlock hardware device (for future interaction)."""
|
||||
# pylint: disable=too-many-locals
|
||||
c = client.Client(device=device_type())
|
||||
|
||||
lines = (line.strip() for line in sys.stdin) # strip whitespace
|
||||
lines = (line for line in lines if line) # skip empty lines
|
||||
|
||||
identities = []
|
||||
stanza_map = {}
|
||||
|
||||
for line in lines:
|
||||
log.debug("got %r", line)
|
||||
if line == "-> done":
|
||||
break
|
||||
|
||||
if line.startswith("-> add-identity "):
|
||||
encoded = line.split(" ")[-1].lower()
|
||||
data = bech32_decode("age-plugin-trezor-", encoded)
|
||||
identity = client.create_identity(data.decode())
|
||||
identities.append(identity)
|
||||
|
||||
elif line.startswith("-> recipient-stanza "):
|
||||
file_index, tag, *args = line.split(" ")[2:]
|
||||
body = next(lines)
|
||||
if tag != "X25519":
|
||||
continue
|
||||
|
||||
peer_pubkey = base64_decode(args[0])
|
||||
encrypted = base64_decode(body)
|
||||
stanza_map.setdefault(file_index, []).append((peer_pubkey, encrypted))
|
||||
|
||||
for file_index, stanzas in stanza_map.items():
|
||||
_handle_single_file(file_index, stanzas, identities, c)
|
||||
|
||||
sys.stdout.write('-> done\n\n')
|
||||
sys.stdout.flush()
|
||||
sys.stdout.close()
|
||||
|
||||
|
||||
def _handle_single_file(file_index, stanzas, identities, c):
|
||||
d = c.device.__class__.__name__
|
||||
msg = base64_encode(f'Please confirm decryption on {d} device...'.encode())
|
||||
for peer_pubkey, encrypted in stanzas:
|
||||
for identity in identities:
|
||||
sys.stdout.write(f'-> msg\n{msg}\n')
|
||||
sys.stdout.flush()
|
||||
|
||||
key = c.ecdh(identity=identity, peer_pubkey=peer_pubkey)
|
||||
result = decrypt(key=key, encrypted=encrypted)
|
||||
if not result:
|
||||
continue
|
||||
|
||||
sys.stdout.write(f'-> file-key {file_index}\n{base64_encode(result)}\n')
|
||||
sys.stdout.flush()
|
||||
return
|
||||
|
||||
|
||||
def main(device_type):
|
||||
"""Parse command-line arguments."""
|
||||
p = argparse.ArgumentParser()
|
||||
|
||||
agent_package = device_type.package_name()
|
||||
resources_map = {r.key: r for r in pkg_resources.require(agent_package)}
|
||||
resources = [resources_map[agent_package], resources_map['libagent']]
|
||||
versions = '\n'.join('{}={}'.format(r.key, r.version) for r in resources)
|
||||
p.add_argument('--version', help='print the version info',
|
||||
action='version', version=versions)
|
||||
|
||||
p.add_argument('-i', '--identity')
|
||||
p.add_argument('-v', '--verbose', default=0, action='count')
|
||||
p.add_argument('--age-plugin')
|
||||
|
||||
args = p.parse_args()
|
||||
|
||||
log_path = os.environ.get("TREZOR_AGE_PLUGIN_LOG")
|
||||
util.setup_logging(verbosity=2, filename=log_path)
|
||||
|
||||
log.debug("starting age plugin: %s", args)
|
||||
|
||||
device_type.ui = device.ui.UI(device_type=device_type, config=vars(args))
|
||||
|
||||
try:
|
||||
if args.identity:
|
||||
run_pubkey(device_type=device_type, args=args)
|
||||
elif args.age_plugin:
|
||||
run_decrypt(device_type=device_type, args=args)
|
||||
except Exception as e: # pylint: disable=broad-except
|
||||
log.exception("age plugin failed: %s", e)
|
||||
|
||||
log.debug("closing age plugin")
|
||||
48
libagent/age/client.py
Normal file
48
libagent/age/client.py
Normal file
@@ -0,0 +1,48 @@
|
||||
"""Device abstraction layer for AGE operations."""
|
||||
|
||||
import logging
|
||||
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
|
||||
|
||||
from ..device import interface
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_identity(user_id):
|
||||
"""Create AGE identity for hardware device."""
|
||||
result = interface.Identity(identity_str='age://', curve_name="ed25519")
|
||||
result.identity_dict['host'] = user_id
|
||||
return result
|
||||
|
||||
|
||||
class Client:
|
||||
"""Sign messages and get public keys from a hardware device."""
|
||||
|
||||
def __init__(self, device):
|
||||
"""C-tor."""
|
||||
self.device = device
|
||||
|
||||
def pubkey(self, identity, ecdh=False):
|
||||
"""Return public key as VerifyingKey object."""
|
||||
with self.device:
|
||||
pubkey = bytes(self.device.pubkey(ecdh=ecdh, identity=identity))
|
||||
assert len(pubkey) == 32
|
||||
return pubkey
|
||||
|
||||
def ecdh(self, identity, peer_pubkey):
|
||||
"""Derive shared secret using ECDH from peer public key."""
|
||||
log.info('please confirm AGE decryption on %s for "%s"...',
|
||||
self.device, identity.to_string())
|
||||
with self.device:
|
||||
assert len(peer_pubkey) == 32
|
||||
result, self_pubkey = self.device.ecdh_with_pubkey(
|
||||
pubkey=(b"\x40" + peer_pubkey), identity=identity)
|
||||
assert result[:1] == b"\x04"
|
||||
hkdf = HKDF(
|
||||
algorithm=hashes.SHA256(),
|
||||
length=32,
|
||||
salt=((peer_pubkey + self_pubkey)),
|
||||
info=b"age-encryption.org/v1/X25519")
|
||||
return hkdf.derive(result[1:])
|
||||
@@ -126,6 +126,11 @@ class Trezor(interface.Device):
|
||||
|
||||
def ecdh(self, identity, pubkey):
|
||||
"""Get shared session key using Elliptic Curve Diffie-Hellman."""
|
||||
session_key, _ = self.ecdh_with_pubkey(identity, pubkey)
|
||||
return session_key
|
||||
|
||||
def ecdh_with_pubkey(self, identity, pubkey):
|
||||
"""Get shared session key using Elliptic Curve Diffie-Hellman & self public key."""
|
||||
curve_name = identity.get_curve_name(ecdh=True)
|
||||
log.debug('"%s" shared session key (%s) for %r from %s',
|
||||
identity.to_string(), curve_name, pubkey, self)
|
||||
@@ -138,7 +143,11 @@ class Trezor(interface.Device):
|
||||
log.debug('result: %s', result)
|
||||
assert len(result.session_key) in {65, 33} # NIST256 or Curve25519
|
||||
assert result.session_key[:1] == b'\x04'
|
||||
return bytes(result.session_key)
|
||||
self_pubkey = result.public_key
|
||||
if self_pubkey:
|
||||
self_pubkey = bytes(self_pubkey[1:])
|
||||
|
||||
return bytes(result.session_key), self_pubkey
|
||||
except self._defs.TrezorFailure as e:
|
||||
msg = '{} error: {}'.format(self, e)
|
||||
log.debug(msg, exc_info=True)
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
"""TREZOR-related definitions."""
|
||||
|
||||
import logging
|
||||
# pylint: disable=unused-import,import-error,no-name-in-module,no-member
|
||||
import logging
|
||||
import os
|
||||
|
||||
import mnemonic
|
||||
import semver
|
||||
|
||||
import trezorlib
|
||||
from trezorlib.btc import get_address, get_public_node
|
||||
from trezorlib.client import PASSPHRASE_TEST_PATH
|
||||
|
||||
@@ -101,7 +101,7 @@ def parse_pubkey(blob):
|
||||
|
||||
def _decompress_ed25519(pubkey):
|
||||
"""Load public key from the serialized blob (stripping the prefix byte)."""
|
||||
if pubkey[:1] == b'\x00':
|
||||
if pubkey[:1] in {b'\x00', b'\x01'}:
|
||||
# set by Trezor fsm_msgSignIdentity() and fsm_msgGetPublicKey()
|
||||
return nacl.signing.VerifyKey(pubkey[1:], encoder=nacl.encoding.RawEncoder)
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user