Merge branch 'master' into python3
This commit is contained in:
@@ -1,2 +1,2 @@
|
||||
[MESSAGES CONTROL]
|
||||
disable=invalid-name, missing-docstring, locally-disabled
|
||||
disable=invalid-name, missing-docstring, locally-disabled, unbalanced-tuple-unpacking
|
||||
|
||||
@@ -6,7 +6,7 @@ python:
|
||||
|
||||
install:
|
||||
- pip install ecdsa ed25519 semver # test without trezorlib for now
|
||||
- pip install pylint coverage pep8 pydocstyle
|
||||
- pip install -U pylint coverage pep8 pydocstyle # use latest tools
|
||||
|
||||
script:
|
||||
- pep8 trezor_agent
|
||||
|
||||
22
trezor_agent/gpg/README.md
Normal file
22
trezor_agent/gpg/README.md
Normal file
@@ -0,0 +1,22 @@
|
||||
# Generate new stand-alone GPG identity
|
||||
|
||||
```
|
||||
$ USER_ID="Satoshi Nakamoto <satoshi@nakamoto.bit>"
|
||||
$ trezor-gpg create "${USER_ID}" > identity.pub # create new TREZOR-based GPG identity
|
||||
$ gpg2 --import identity.pub # import into local GPG public keyring
|
||||
$ gpg2 --edit "${USER_ID}" trust # OPTIONAL: mark the key as trusted
|
||||
```
|
||||
|
||||
# Generate new subkey for existing GPG identity
|
||||
```
|
||||
$ USER_ID="Satoshi Nakamoto <satoshi@nakamoto.bit>"
|
||||
$ gpg2 --list-keys "${USER_ID}" # make sure this identity already exists
|
||||
$ trezor-gpg create --subkey "${USER_ID}" > identity.pub # create new TREZOR-based GPG public key
|
||||
$ gpg2 --import identity.pub # append it to existing identity
|
||||
```
|
||||
|
||||
# Generate signatures using the TREZOR device
|
||||
```
|
||||
$ trezor-gpg sign EXAMPLE > EXAMPLE.sig # confirm signature using the device
|
||||
$ gpg2 --verify EXAMPLE.sig # verify using standard GPG binary
|
||||
```
|
||||
@@ -1,51 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
"""Check GPG v2 signature for a given public key."""
|
||||
import argparse
|
||||
import base64
|
||||
import io
|
||||
import logging
|
||||
|
||||
from . import decode
|
||||
from .. import util
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def original_data(filename):
|
||||
"""Locate and load original file data, whose signature is provided."""
|
||||
parts = filename.rsplit('.', 1)
|
||||
if len(parts) == 2 and parts[1] in ('sig', 'asc'):
|
||||
log.debug('loading file %s', parts[0])
|
||||
return open(parts[0], 'rb').read()
|
||||
|
||||
|
||||
def verify(pubkey, sig_file):
|
||||
"""Verify correctness of public key and signature."""
|
||||
stream = open(sig_file, 'rb')
|
||||
if stream.name.endswith('.asc'):
|
||||
lines = stream.readlines()[3:-1]
|
||||
data = base64.b64decode(''.join(lines))
|
||||
payload, checksum = data[:-3], data[-3:]
|
||||
assert util.crc24(payload) == checksum
|
||||
stream = io.BytesIO(payload)
|
||||
|
||||
signature, digest = decode.load_signature(stream, original_data(sig_file))
|
||||
decode.verify_digest(pubkey=pubkey, digest=digest,
|
||||
signature=signature['sig'], label='GPG signature')
|
||||
log.info('%s OK', sig_file)
|
||||
|
||||
|
||||
def main():
|
||||
"""Main function."""
|
||||
p = argparse.ArgumentParser()
|
||||
p.add_argument('pubkey')
|
||||
p.add_argument('signature')
|
||||
p.add_argument('-v', '--verbose', action='store_true', default=False)
|
||||
args = p.parse_args()
|
||||
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO,
|
||||
format='%(asctime)s %(levelname)-10s %(message)s')
|
||||
verify(pubkey=decode.load_public_key(open(args.pubkey, 'rb')),
|
||||
sig_file=args.signature)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -1,4 +1,5 @@
|
||||
"""Decoders for GPG v2 data structures."""
|
||||
import base64
|
||||
import functools
|
||||
import hashlib
|
||||
import io
|
||||
@@ -268,6 +269,7 @@ def load_public_key(stream, use_custom=False):
|
||||
packet, signature = packets[:2]
|
||||
packets = packets[2:]
|
||||
|
||||
packet['user_id'] = userid['value']
|
||||
return packet
|
||||
|
||||
|
||||
@@ -281,7 +283,8 @@ def load_signature(stream, original_data):
|
||||
|
||||
def load_from_gpg(user_id, use_custom=False):
|
||||
"""Load existing GPG public key for `user_id` from local keyring."""
|
||||
pubkey_bytes = subprocess.check_output(['gpg2', '--export', user_id])
|
||||
args = ['gpg2', '--export'] + ([user_id] if user_id else [])
|
||||
pubkey_bytes = subprocess.check_output(args=args)
|
||||
if pubkey_bytes:
|
||||
return load_public_key(io.BytesIO(pubkey_bytes), use_custom=use_custom)
|
||||
else:
|
||||
@@ -298,3 +301,19 @@ def verify_digest(pubkey, digest, signature, label):
|
||||
except ecdsa.keys.BadSignatureError:
|
||||
log.error('Bad %s!', label)
|
||||
raise
|
||||
|
||||
|
||||
def verify(pubkey, signature, original_data):
|
||||
"""Verify correctness of public key and signature."""
|
||||
stream = io.BytesIO(signature)
|
||||
|
||||
# remove GPG armor
|
||||
lines = stream.readlines()[3:-1]
|
||||
data = base64.b64decode(''.join(lines))
|
||||
payload, checksum = data[:-3], data[-3:]
|
||||
assert util.crc24(payload) == checksum
|
||||
stream = io.BytesIO(payload)
|
||||
|
||||
signature, digest = load_signature(stream, original_data)
|
||||
verify_digest(pubkey=pubkey, digest=digest,
|
||||
signature=signature['sig'], label='GPG signature')
|
||||
|
||||
@@ -125,8 +125,9 @@ class Signer(object):
|
||||
curve_name=curve_name, created=created,
|
||||
verifying_key=self.conn.pubkey())
|
||||
|
||||
log.info('%s GPG public key %s created at %s', curve_name,
|
||||
self.pubkey, util.time_format(self.pubkey.created))
|
||||
log.info('%s GPG public key %s created at %s for "%s"',
|
||||
curve_name, self.pubkey,
|
||||
util.time_format(self.pubkey.created), user_id)
|
||||
|
||||
@classmethod
|
||||
def from_public_key(cls, pubkey, user_id):
|
||||
|
||||
@@ -6,73 +6,65 @@ import subprocess as sp
|
||||
import sys
|
||||
import time
|
||||
|
||||
from . import check, decode, encode
|
||||
from . import decode, encode
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _open_output(filename):
|
||||
return sys.stdout if filename == '-' else open(filename, 'wb')
|
||||
def run_create(args):
|
||||
"""Generate a new pubkey for a new/existing GPG identity."""
|
||||
s = encode.Signer(user_id=args.user_id, created=args.time,
|
||||
curve_name=args.ecdsa_curve)
|
||||
if args.subkey:
|
||||
subkey = s.subkey()
|
||||
primary = sp.check_output(['gpg2', '--export', args.user_id])
|
||||
result = primary + subkey
|
||||
else:
|
||||
result = s.export()
|
||||
s.close()
|
||||
|
||||
return encode.armor(result, 'PUBLIC KEY BLOCK')
|
||||
|
||||
|
||||
def _call_with_input(args, blob):
|
||||
p = sp.Popen(args=args, stdin=sp.PIPE)
|
||||
p.stdin.write(blob)
|
||||
p.stdin.close()
|
||||
exit_code = p.wait()
|
||||
assert exit_code == 0, exit_code
|
||||
def run_sign(args):
|
||||
"""Generate a GPG signature using hardware-based device."""
|
||||
pubkey = decode.load_from_gpg(user_id=None, use_custom=True)
|
||||
s = encode.Signer.from_public_key(pubkey=pubkey, user_id=pubkey['user_id'])
|
||||
if args.filename:
|
||||
data = open(args.filename, 'rb').read()
|
||||
else:
|
||||
data = sys.stdin.read()
|
||||
sig = s.sign(data)
|
||||
s.close()
|
||||
|
||||
sig = encode.armor(sig, 'SIGNATURE')
|
||||
decode.verify(pubkey=pubkey, signature=sig, original_data=data)
|
||||
return sig
|
||||
|
||||
|
||||
def main():
|
||||
"""Main function."""
|
||||
p = argparse.ArgumentParser()
|
||||
p.add_argument('user_id')
|
||||
p.add_argument('filename', nargs='?')
|
||||
p.add_argument('-t', '--time', type=int, default=int(time.time()))
|
||||
p.add_argument('-a', '--armor', action='store_true', default=False)
|
||||
p.add_argument('-v', '--verbose', action='store_true', default=False)
|
||||
p.add_argument('-s', '--subkey', action='store_true', default=False)
|
||||
p.add_argument('-e', '--ecdsa-curve', default='nist256p1')
|
||||
p.add_argument('-o', '--output',
|
||||
help='Output file name for the results. '
|
||||
'Use "-" to write the results to stdout or "GPG" '
|
||||
'to import a public key into the local keyring.')
|
||||
subparsers = p.add_subparsers()
|
||||
|
||||
create = subparsers.add_parser('create')
|
||||
create.add_argument('user_id', help='e.g. '
|
||||
'"Satoshi Nakamoto <satoshi@nakamoto.bit>"')
|
||||
create.add_argument('-s', '--subkey', action='store_true', default=False)
|
||||
create.add_argument('-e', '--ecdsa-curve', default='nist256p1')
|
||||
create.add_argument('-t', '--time', type=int, default=int(time.time()))
|
||||
create.set_defaults(run=run_create)
|
||||
|
||||
sign = subparsers.add_parser('sign')
|
||||
sign.add_argument('filename', nargs='?')
|
||||
sign.set_defaults(run=run_sign)
|
||||
|
||||
args = p.parse_args()
|
||||
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO,
|
||||
format='%(asctime)s %(levelname)-10s %(message)s')
|
||||
user_id = args.user_id.encode('ascii')
|
||||
if not args.filename:
|
||||
s = encode.Signer(user_id=user_id, created=args.time,
|
||||
curve_name=args.ecdsa_curve)
|
||||
if args.subkey:
|
||||
pubkey = s.subkey()
|
||||
else:
|
||||
pubkey = s.export()
|
||||
|
||||
ext = '.pub'
|
||||
if args.armor:
|
||||
pubkey = encode.armor(pubkey, 'PUBLIC KEY BLOCK')
|
||||
ext = '.asc'
|
||||
filename = args.output or '-' # use stdout if no file specified
|
||||
if filename == 'GPG':
|
||||
log.info('importing public key to local keyring')
|
||||
_call_with_input(['gpg2', '--import'], pubkey)
|
||||
else:
|
||||
_open_output(filename).write(pubkey)
|
||||
else:
|
||||
pubkey = decode.load_from_gpg(user_id, use_custom=True)
|
||||
s = encode.Signer.from_public_key(pubkey=pubkey, user_id=user_id)
|
||||
data = open(args.filename, 'rb').read()
|
||||
sig, ext = s.sign(data), '.sig'
|
||||
if args.armor:
|
||||
sig = encode.armor(sig, 'SIGNATURE')
|
||||
ext = '.asc'
|
||||
filename = args.output or (args.filename + ext)
|
||||
_open_output(filename).write(sig)
|
||||
check.verify(pubkey=pubkey, sig_file=filename)
|
||||
|
||||
s.close()
|
||||
result = args.run(args)
|
||||
sys.stdout.write(result)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
@@ -46,3 +46,54 @@ def test_send_recv():
|
||||
assert util.recv(s, 2) == b'3*'
|
||||
|
||||
pytest.raises(EOFError, util.recv, s, 1)
|
||||
|
||||
|
||||
def test_crc24():
|
||||
assert util.crc24(b'') == b'\xb7\x04\xce'
|
||||
assert util.crc24(b'1234567890') == b'\x8c\x00\x72'
|
||||
|
||||
|
||||
def test_bit():
|
||||
assert util.bit(6, 3) == 0
|
||||
assert util.bit(6, 2) == 1
|
||||
assert util.bit(6, 1) == 1
|
||||
assert util.bit(6, 0) == 0
|
||||
|
||||
|
||||
def test_split_bits():
|
||||
assert util.split_bits(0x1234, 4, 8, 4) == [0x1, 0x23, 0x4]
|
||||
|
||||
|
||||
def test_hexlify():
|
||||
assert util.hexlify(b'\x12\x34\xab\xcd') == '1234ABCD'
|
||||
|
||||
|
||||
def test_low_bits():
|
||||
assert util.low_bits(0x1234, 12) == 0x234
|
||||
assert util.low_bits(0x1234, 32) == 0x1234
|
||||
assert util.low_bits(0x1234, 0) == 0
|
||||
|
||||
|
||||
def test_readfmt():
|
||||
stream = io.BytesIO(b'ABC\x12\x34')
|
||||
assert util.readfmt(stream, 'B') == (65,)
|
||||
assert util.readfmt(stream, '>2sH') == (b'BC', 0x1234)
|
||||
|
||||
|
||||
def test_prefix_len():
|
||||
assert util.prefix_len('>H', b'ABCD') == b'\x00\x04ABCD'
|
||||
|
||||
|
||||
def test_reader():
|
||||
stream = io.BytesIO(b'ABC\x12\x34')
|
||||
r = util.Reader(stream)
|
||||
assert r.read(1) == b'A'
|
||||
assert r.readfmt('2s') == b'BC'
|
||||
|
||||
dst = io.BytesIO()
|
||||
with r.capture(dst):
|
||||
assert r.readfmt('>H') == 0x1234
|
||||
assert dst.getvalue() == b'\x12\x34'
|
||||
|
||||
with pytest.raises(EOFError):
|
||||
r.read(1)
|
||||
|
||||
@@ -94,7 +94,7 @@ def crc24(blob):
|
||||
crc ^= CRC24_POLY
|
||||
assert 0 <= crc < 0x1000000
|
||||
crc_bytes = struct.pack('>L', crc)
|
||||
assert crc_bytes[0] == b'\x00'
|
||||
assert crc_bytes[:1] == b'\x00'
|
||||
return crc_bytes[1:]
|
||||
|
||||
|
||||
@@ -120,7 +120,9 @@ def split_bits(value, *bits):
|
||||
result.append(value & mask)
|
||||
value = value >> b
|
||||
assert value == 0
|
||||
return reversed(result)
|
||||
|
||||
result.reverse()
|
||||
return result
|
||||
|
||||
|
||||
def readfmt(stream, fmt):
|
||||
|
||||
Reference in New Issue
Block a user