183 lines
5.8 KiB
Python
183 lines
5.8 KiB
Python
"""
|
|
TREZOR support for AGE format.
|
|
|
|
See these links for more details:
|
|
- https://age-encryption.org/v1
|
|
- https://github.com/FiloSottile/age
|
|
- https://github.com/str4d/rage/
|
|
"""
|
|
|
|
import argparse
|
|
import base64
|
|
import io
|
|
import logging
|
|
import os
|
|
import sys
|
|
|
|
import bech32
|
|
import pkg_resources
|
|
from cryptography.exceptions import InvalidTag
|
|
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
|
|
|
|
from .. import device, util
|
|
from . import client
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
def bech32_decode(prefix, encoded):
|
|
"""Decode Bech32-encoded data."""
|
|
hrp, data = bech32.bech32_decode(encoded)
|
|
assert prefix == hrp
|
|
return bytes(bech32.convertbits(data, 5, 8, pad=False))
|
|
|
|
|
|
def bech32_encode(prefix, data):
|
|
"""Encode data using Bech32."""
|
|
return bech32.bech32_encode(prefix, bech32.convertbits(bytes(data), 8, 5))
|
|
|
|
|
|
def run_pubkey(device_type, args):
|
|
"""Initialize hardware-based GnuPG identity."""
|
|
log.warning('This AGE tool is still in EXPERIMENTAL mode, '
|
|
'so please note that the API and features may '
|
|
'change without backwards compatibility!')
|
|
|
|
c = client.Client(device=device_type())
|
|
pubkey = c.pubkey(identity=client.create_identity(args.identity), ecdh=True)
|
|
recipient = bech32_encode(prefix="age", data=pubkey)
|
|
print(f"# recipient: {recipient}")
|
|
print(f"# SLIP-0017: {args.identity}")
|
|
data = args.identity.encode()
|
|
encoded = bech32_encode(prefix="age-plugin-trezor-", data=data).upper()
|
|
decoded = bech32_decode(prefix="age-plugin-trezor-", encoded=encoded)
|
|
assert decoded.startswith(data)
|
|
print(encoded)
|
|
|
|
|
|
def base64_decode(encoded: str) -> bytes:
|
|
"""Decode Base64-encoded data (after padding correctly with '=')."""
|
|
k = len(encoded) % 4
|
|
pad = (4 - k) if k else 0
|
|
return base64.b64decode(encoded + ("=" * pad))
|
|
|
|
|
|
# https://github.com/FiloSottile/age/blob/v1.1.0-rc.1/internal/format/format.go#L45
|
|
BYTES_PER_LINE = 48
|
|
|
|
|
|
def base64_encode(data: bytes) -> str:
|
|
"""Encode data using Base64 (and remove '=')."""
|
|
reader = io.BytesIO(data)
|
|
chunks = map(base64.b64encode, iter(lambda: reader.read(BYTES_PER_LINE), b""))
|
|
chunks = (chunk.replace(b"=", b"") for chunk in chunks)
|
|
return b"\n".join(chunks).decode()
|
|
|
|
|
|
def decrypt(key, encrypted):
|
|
"""Decrypt age-encrypted data."""
|
|
cipher = ChaCha20Poly1305(key)
|
|
try:
|
|
return cipher.decrypt(
|
|
nonce=(b"\x00" * 12),
|
|
data=encrypted,
|
|
associated_data=None)
|
|
except InvalidTag:
|
|
return None
|
|
|
|
|
|
def run_decrypt(device_type, args):
|
|
"""Unlock hardware device (for future interaction)."""
|
|
# pylint: disable=too-many-locals
|
|
c = client.Client(device=device_type())
|
|
|
|
lines = (line.strip() for line in sys.stdin) # strip whitespace
|
|
lines = (line for line in lines if line) # skip empty lines
|
|
|
|
identities = []
|
|
stanza_map = {}
|
|
|
|
for line in lines:
|
|
log.debug("got %r", line)
|
|
if line == "-> done":
|
|
break
|
|
|
|
if line.startswith("-> add-identity "):
|
|
encoded = line.split(" ")[-1].lower()
|
|
data = bech32_decode("age-plugin-trezor-", encoded)
|
|
identity = client.create_identity(data.decode())
|
|
identities.append(identity)
|
|
|
|
elif line.startswith("-> recipient-stanza "):
|
|
file_index, tag, *args = line.split(" ")[2:]
|
|
body = next(lines)
|
|
if tag != "X25519":
|
|
continue
|
|
|
|
peer_pubkey = base64_decode(args[0])
|
|
encrypted = base64_decode(body)
|
|
stanza_map.setdefault(file_index, []).append((peer_pubkey, encrypted))
|
|
|
|
for file_index, stanzas in stanza_map.items():
|
|
_handle_single_file(file_index, stanzas, identities, c)
|
|
|
|
sys.stdout.write('-> done\n\n')
|
|
sys.stdout.flush()
|
|
sys.stdout.close()
|
|
|
|
|
|
def _handle_single_file(file_index, stanzas, identities, c):
|
|
d = c.device.__class__.__name__
|
|
for peer_pubkey, encrypted in stanzas:
|
|
for identity in identities:
|
|
id_str = identity.to_string()
|
|
msg = base64_encode(f'Please confirm {id_str} decryption on {d} device...'.encode())
|
|
sys.stdout.write(f'-> msg\n{msg}\n')
|
|
sys.stdout.flush()
|
|
|
|
key = c.ecdh(identity=identity, peer_pubkey=peer_pubkey)
|
|
result = decrypt(key=key, encrypted=encrypted)
|
|
if not result:
|
|
continue
|
|
|
|
sys.stdout.write(f'-> file-key {file_index}\n{base64_encode(result)}\n')
|
|
sys.stdout.flush()
|
|
return
|
|
|
|
|
|
def main(device_type):
|
|
"""Parse command-line arguments."""
|
|
p = argparse.ArgumentParser()
|
|
|
|
agent_package = device_type.package_name()
|
|
resources_map = {r.key: r for r in pkg_resources.require(agent_package)}
|
|
resources = [resources_map[agent_package], resources_map['libagent']]
|
|
versions = '\n'.join('{}={}'.format(r.key, r.version) for r in resources)
|
|
p.add_argument('--version', help='print the version info',
|
|
action='version', version=versions)
|
|
|
|
p.add_argument('-i', '--identity')
|
|
p.add_argument('-v', '--verbose', default=0, action='count')
|
|
p.add_argument('--age-plugin')
|
|
|
|
args = p.parse_args()
|
|
|
|
log_path = os.environ.get("TREZOR_AGE_PLUGIN_LOG")
|
|
util.setup_logging(verbosity=args.verbose, filename=log_path)
|
|
|
|
log.debug("starting age plugin: %s", args)
|
|
|
|
device_type.ui = device.ui.UI(device_type=device_type, config=vars(args))
|
|
|
|
try:
|
|
if args.identity:
|
|
run_pubkey(device_type=device_type, args=args)
|
|
elif args.age_plugin == 'identity-v1':
|
|
run_decrypt(device_type=device_type, args=args)
|
|
else:
|
|
log.error("Unsupported state machine: %r", args.age_plugin)
|
|
except Exception as e: # pylint: disable=broad-except
|
|
log.exception("age plugin failed: %s", e)
|
|
|
|
log.debug("closing age plugin")
|