gpg: hack agent prototype

This commit is contained in:
Roman Zeyde
2016-05-28 23:02:45 +03:00
parent 6975671cc1
commit d9b07e2ac6
3 changed files with 108 additions and 11 deletions

View File

@@ -3,9 +3,9 @@
import argparse
import contextlib
import logging
import os
import sys
import time
import os
from . import decode, encode, keyring, proto

97
trezor_agent/gpg/agent.py Normal file
View File

@@ -0,0 +1,97 @@
import binascii
import contextlib
import hashlib
import logging
import os
import select
import subprocess
import threading
import ecdsa
from . import keyring
from .. import server, util
log = logging.getLogger(__name__)
def yield_connections(sock):
"""Run a server on the specified socket."""
while True:
log.debug('waiting for connection on %s', sock.getsockname())
conn, _ = sock.accept()
conn.settimeout(None)
log.debug('accepted connection on %s', sock.getsockname())
yield conn
def serialize(data):
for c in ['%', '\n', '\r']:
data = data.replace(c, '%{:02X}'.format(ord(c)))
return data
def sig_encode(r, s, o):
r = serialize(util.num2bytes(r, 32))
s = serialize(util.num2bytes(s, 32))
return '(7:sig-val(5:ecdsa(1:r32:{})(1:s32:{})))\n'.format(r, s)
def pksign(keygrip, digest):
pk = 0x99ddae8aee45de830e08889f76ce1f7f993d80a1a05e843ae950b5ba62c16efb
sk = ecdsa.SigningKey.from_secret_exponent(pk, curve=ecdsa.NIST256p,
hashfunc=hashlib.sha256)
digest = binascii.unhexlify(digest)
result = sk.sign_digest_deterministic(digest, hashfunc=hashlib.sha256,
sigencode=sig_encode)
log.debug('result: %r', result)
return result
def handle_connection(conn):
keygrip = None
digest = None
algo = None
conn.sendall('OK\n')
while True:
line = keyring.recvline(conn)
parts = line.split(' ')
command = parts[0]
args = parts[1:]
if command in {'RESET', 'OPTION', 'HAVEKEY', 'SETKEYDESC'}:
pass # reply with OK
elif command == 'GETINFO':
conn.sendall('D 2.1.11\n')
elif command == 'AGENT_ID':
conn.sendall('D TREZOR\n')
elif command == 'SIGKEY':
keygrip, = args
elif command == 'SETHASH':
algo, digest = args
elif command == 'PKSIGN':
sig = pksign(keygrip, digest)
conn.sendall('D ' + sig)
else:
log.error('unknown request: %r', line)
return
conn.sendall('OK\n')
def main():
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(levelname)-10s %(message)s')
sock_path = os.path.expanduser('~/.gnupg/S.gpg-agent')
with server.unix_domain_socket_server(sock_path) as sock:
for conn in yield_connections(sock):
with contextlib.closing(conn):
try:
handle_connection(conn)
except EOFError:
break
if __name__ == '__main__':
main()

View File

@@ -22,14 +22,14 @@ def connect_to_agent(sock_path='~/.gnupg/S.gpg-agent', sp=subprocess):
return sock
def _communicate(sock, msg):
def communicate(sock, msg):
msg += '\n'
sock.sendall(msg.encode('ascii'))
log.debug('-> %r', msg)
return _recvline(sock)
return recvline(sock)
def _recvline(sock):
def recvline(sock):
reply = io.BytesIO()
while True:
@@ -112,7 +112,7 @@ def sign_digest(sock, keygrip, digest, sp=subprocess, environ=None):
hash_algo = 8 # SHA256
assert len(digest) == 32
assert _communicate(sock, 'RESET').startswith(b'OK')
assert communicate(sock, 'RESET').startswith(b'OK')
ttyname = sp.check_output(['tty']).strip()
options = ['ttyname={}'.format(ttyname)] # set TTY for passphrase entry
@@ -122,17 +122,17 @@ def sign_digest(sock, keygrip, digest, sp=subprocess, environ=None):
options.append('display={}'.format(display))
for opt in options:
assert _communicate(sock, 'OPTION {}'.format(opt)) == b'OK'
assert communicate(sock, 'OPTION {}'.format(opt)) == b'OK'
assert _communicate(sock, 'SIGKEY {}'.format(keygrip)) == b'OK'
assert communicate(sock, 'SIGKEY {}'.format(keygrip)) == b'OK'
hex_digest = binascii.hexlify(digest).upper().decode('ascii')
assert _communicate(sock, 'SETHASH {} {}'.format(hash_algo,
assert communicate(sock, 'SETHASH {} {}'.format(hash_algo,
hex_digest)) == b'OK'
assert _communicate(sock, 'SETKEYDESC '
assert communicate(sock, 'SETKEYDESC '
'Sign+a+new+TREZOR-based+subkey') == b'OK'
assert _communicate(sock, 'PKSIGN') == b'OK'
line = _recvline(sock).strip()
assert communicate(sock, 'PKSIGN') == b'OK'
line = recvline(sock).strip()
line = unescape(line)
log.debug('unescaped: %r', line)
prefix, sig = line.split(b' ', 1)