gpg: refactor hash algorithm handling
This commit is contained in:
@@ -258,29 +258,23 @@ def parse_packets(stream):
|
||||
yield p
|
||||
|
||||
|
||||
def digest_packets(packets, hashalg):
|
||||
def digest_packets(packets, hasher):
|
||||
"""Compute digest on specified packets, according to '_to_hash' field."""
|
||||
data_to_hash = io.BytesIO()
|
||||
for p in packets:
|
||||
data_to_hash.write(p['_to_hash'])
|
||||
return hashalg(data_to_hash.getvalue()).digest()
|
||||
|
||||
|
||||
def _hash_ripemd160(msg):
|
||||
"""Wrapper for ripemd160 hash."""
|
||||
h = hashlib.new("ripemd160")
|
||||
h.update(msg)
|
||||
return h
|
||||
hasher.update(data_to_hash.getvalue())
|
||||
return hasher.digest()
|
||||
|
||||
|
||||
HASH_ALGORITHMS = {
|
||||
1: hashlib.md5,
|
||||
2: hashlib.sha1,
|
||||
3: _hash_ripemd160,
|
||||
8: hashlib.sha256,
|
||||
9: hashlib.sha384,
|
||||
10: hashlib.sha512,
|
||||
11: hashlib.sha224,
|
||||
1: 'md5',
|
||||
2: 'sha1',
|
||||
3: 'ripemd160',
|
||||
8: 'sha256',
|
||||
9: 'sha384',
|
||||
10: 'sha512',
|
||||
11: 'sha224',
|
||||
}
|
||||
|
||||
|
||||
@@ -291,13 +285,14 @@ def load_public_key(pubkey_bytes, use_custom=False, ecdh=False):
|
||||
pubkey, userid, signature = packets[:3]
|
||||
packets = packets[3:]
|
||||
|
||||
hashalg = HASH_ALGORITHMS.get(signature['hash_alg'])
|
||||
if hashalg is not None:
|
||||
digest = digest_packets([pubkey, userid, signature], hashalg)
|
||||
hash_alg = HASH_ALGORITHMS.get(signature['hash_alg'])
|
||||
if hash_alg is not None:
|
||||
digest = digest_packets(packets=[pubkey, userid, signature],
|
||||
hasher=hashlib.new(hash_alg))
|
||||
assert signature['hash_prefix'] == digest[:2]
|
||||
|
||||
log.debug('loaded public key "%s"', userid['value'])
|
||||
if hashalg is not None and pubkey.get('verifier'):
|
||||
if hash_alg is not None and pubkey.get('verifier'):
|
||||
verify_digest(pubkey=pubkey, digest=digest,
|
||||
signature=signature['sig'], label='GPG public key')
|
||||
else:
|
||||
@@ -324,9 +319,9 @@ def load_public_key(pubkey_bytes, use_custom=False, ecdh=False):
|
||||
def load_signature(stream, original_data):
|
||||
"""Load signature from stream, and compute GPG digest for verification."""
|
||||
signature, = list(parse_packets(util.Reader(stream)))
|
||||
hashalg = HASH_ALGORITHMS.get(signature['hash_alg'])
|
||||
assert hashalg is not None
|
||||
digest = digest_packets([{'_to_hash': original_data}, signature], hashalg)
|
||||
hash_alg = HASH_ALGORITHMS[signature['hash_alg']]
|
||||
digest = digest_packets([{'_to_hash': original_data}, signature],
|
||||
hasher=hashlib.new(hash_alg))
|
||||
assert signature['hash_prefix'] == digest[:2]
|
||||
return signature, digest
|
||||
|
||||
|
||||
Reference in New Issue
Block a user