gpg: refactor pubkeys' parsing code
This commit is contained in:
@@ -271,8 +271,9 @@ def digest_packets(packets):
|
||||
return hashlib.sha256(data_to_hash.getvalue()).digest()
|
||||
|
||||
|
||||
def load_public_key(stream, use_custom=False):
|
||||
def load_public_key(pubkey_bytes, use_custom=False):
|
||||
"""Parse and validate GPG public key from an input stream."""
|
||||
stream = io.BytesIO(pubkey_bytes)
|
||||
packets = list(parse_packets(util.Reader(stream)))
|
||||
pubkey, userid, signature = packets[:3]
|
||||
packets = packets[3:]
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
import logging
|
||||
import time
|
||||
|
||||
from . import keyring, proto
|
||||
from . import decode, keyring, proto
|
||||
from .. import client, factory, formats, util
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -136,10 +136,10 @@ class Factory(object):
|
||||
sign_packet = proto.packet(tag=2, blob=signature)
|
||||
return pubkey_packet + user_id_packet + sign_packet
|
||||
|
||||
def create_subkey(self):
|
||||
def create_subkey(self, primary_bytes):
|
||||
"""Export new subkey to `self.user_id` GPG primary key."""
|
||||
subkey_packet = proto.packet(tag=14, blob=self.pubkey.data())
|
||||
primary = keyring.get_public_key(self.user_id)
|
||||
primary = decode.load_public_key(primary_bytes)
|
||||
log.info('adding subkey to primary GPG key "%s" (%s)',
|
||||
self.user_id, util.hexlify(primary['key_id']))
|
||||
data_to_sign = primary['_to_hash'] + self.pubkey.data_to_hash()
|
||||
@@ -176,7 +176,7 @@ class Factory(object):
|
||||
hashed_subpackets=hashed_subpackets,
|
||||
unhashed_subpackets=unhashed_subpackets)
|
||||
sign_packet = proto.packet(tag=2, blob=signature)
|
||||
return subkey_packet + sign_packet
|
||||
return primary_bytes + subkey_packet + sign_packet
|
||||
|
||||
def sign_message(self, msg, sign_time=None):
|
||||
"""Sign GPG message at specified time."""
|
||||
|
||||
@@ -8,7 +8,6 @@ import re
|
||||
import socket
|
||||
import subprocess
|
||||
|
||||
from . import decode
|
||||
from .. import util
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -45,7 +44,7 @@ def _recvline(sock):
|
||||
|
||||
|
||||
def unescape(s):
|
||||
"""Unescape ASSUAN message."""
|
||||
"""Unescape ASSUAN message (0xAB <-> '%AB')."""
|
||||
s = bytearray(s)
|
||||
i = 0
|
||||
while i < len(s):
|
||||
@@ -151,13 +150,11 @@ def get_keygrip(user_id):
|
||||
return re.findall(r'Keygrip = (\w+)', output)[0]
|
||||
|
||||
|
||||
def get_public_key(user_id, use_custom=False):
|
||||
"""Load existing GPG public key for `user_id` from local keyring."""
|
||||
def export_public_key(user_id):
|
||||
"""Export GPG public key for specified `user_id`."""
|
||||
args = ['gpg2', '--export'] + ([user_id] if user_id else [])
|
||||
pubkey_bytes = subprocess.check_output(args=args)
|
||||
if pubkey_bytes:
|
||||
return decode.load_public_key(io.BytesIO(pubkey_bytes),
|
||||
use_custom=use_custom)
|
||||
else:
|
||||
result = subprocess.check_output(args=args)
|
||||
if not result:
|
||||
log.error('could not find public key %r in local GPG keyring', user_id)
|
||||
raise KeyError(user_id)
|
||||
return result
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
import argparse
|
||||
import contextlib
|
||||
import logging
|
||||
import subprocess as sp
|
||||
import sys
|
||||
import time
|
||||
import os
|
||||
@@ -21,9 +20,8 @@ def run_create(args):
|
||||
|
||||
with contextlib.closing(f):
|
||||
if args.subkey:
|
||||
subkey = f.create_subkey()
|
||||
primary = sp.check_output(['gpg2', '--export', user_id])
|
||||
result = primary + subkey
|
||||
primary_key = keyring.export_public_key(user_id=user_id)
|
||||
result = f.create_subkey(primary_bytes=primary_key)
|
||||
else:
|
||||
result = f.create_primary()
|
||||
|
||||
@@ -32,7 +30,8 @@ def run_create(args):
|
||||
|
||||
def run_sign(args):
|
||||
"""Generate a GPG signature using hardware-based device."""
|
||||
pubkey = keyring.get_public_key(user_id=None, use_custom=True)
|
||||
pubkey = decode.load_public_key(keyring.export_public_key(user_id=None),
|
||||
use_custom=True)
|
||||
f = encode.Factory.from_public_key(pubkey=pubkey,
|
||||
user_id=pubkey['user_id'])
|
||||
with contextlib.closing(f):
|
||||
|
||||
Reference in New Issue
Block a user