util: move ASSUAN serialization to break circular import
This commit is contained in:
@@ -4,7 +4,7 @@ import logging
|
|||||||
import os
|
import os
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
from ..gpg import agent
|
from .. import util
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -93,17 +93,17 @@ def interact(title, description, prompt, binary, options):
|
|||||||
env=os.environ)
|
env=os.environ)
|
||||||
expect(p, [b'OK'])
|
expect(p, [b'OK'])
|
||||||
|
|
||||||
title = agent.serialize(title.encode('ascii'))
|
title = util.assuan_serialize(title.encode('ascii'))
|
||||||
write(p, b'SETTITLE ' + title + b'\n')
|
write(p, b'SETTITLE ' + title + b'\n')
|
||||||
expect(p, [b'OK'])
|
expect(p, [b'OK'])
|
||||||
|
|
||||||
if description:
|
if description:
|
||||||
description = agent.serialize(description.encode('ascii'))
|
description = util.assuan_serialize(description.encode('ascii'))
|
||||||
write(p, b'SETDESC ' + description + b'\n')
|
write(p, b'SETDESC ' + description + b'\n')
|
||||||
expect(p, [b'OK'])
|
expect(p, [b'OK'])
|
||||||
|
|
||||||
if prompt:
|
if prompt:
|
||||||
prompt = agent.serialize(prompt.encode('ascii'))
|
prompt = util.assuan_serialize(prompt.encode('ascii'))
|
||||||
write(p, b'SETPROMPT ' + prompt + b'\n')
|
write(p, b'SETPROMPT ' + prompt + b'\n')
|
||||||
expect(p, [b'OK'])
|
expect(p, [b'OK'])
|
||||||
|
|
||||||
|
|||||||
@@ -21,25 +21,17 @@ def yield_connections(sock):
|
|||||||
yield conn
|
yield conn
|
||||||
|
|
||||||
|
|
||||||
def serialize(data):
|
|
||||||
"""Serialize data according to ASSUAN protocol."""
|
|
||||||
for c in [b'%', b'\n', b'\r']:
|
|
||||||
escaped = '%{:02X}'.format(ord(c)).encode('ascii')
|
|
||||||
data = data.replace(c, escaped)
|
|
||||||
return data
|
|
||||||
|
|
||||||
|
|
||||||
def sig_encode(r, s):
|
def sig_encode(r, s):
|
||||||
"""Serialize ECDSA signature data into GPG S-expression."""
|
"""Serialize ECDSA signature data into GPG S-expression."""
|
||||||
r = serialize(util.num2bytes(r, 32))
|
r = util.assuan_serialize(util.num2bytes(r, 32))
|
||||||
s = serialize(util.num2bytes(s, 32))
|
s = util.assuan_serialize(util.num2bytes(s, 32))
|
||||||
return b'(7:sig-val(5:ecdsa(1:r32:' + r + b')(1:s32:' + s + b')))'
|
return b'(7:sig-val(5:ecdsa(1:r32:' + r + b')(1:s32:' + s + b')))'
|
||||||
|
|
||||||
|
|
||||||
def _serialize_point(data):
|
def _serialize_point(data):
|
||||||
prefix = '{}:'.format(len(data)).encode('ascii')
|
prefix = '{}:'.format(len(data)).encode('ascii')
|
||||||
# https://www.gnupg.org/documentation/manuals/assuan/Server-responses.html
|
# https://www.gnupg.org/documentation/manuals/assuan/Server-responses.html
|
||||||
return b'(5:value' + serialize(prefix + data) + b')'
|
return b'(5:value' + util.assuan_serialize(prefix + data) + b')'
|
||||||
|
|
||||||
|
|
||||||
def parse_ecdh(line):
|
def parse_ecdh(line):
|
||||||
|
|||||||
11
libagent/gpg/tests/test_agent.py
Normal file
11
libagent/gpg/tests/test_agent.py
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
from .. import agent
|
||||||
|
|
||||||
|
|
||||||
|
def test_sig_encode():
|
||||||
|
SIG = (
|
||||||
|
b'(7:sig-val(5:ecdsa(1:r32:\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||||
|
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||||
|
b'\x00\x00\x00\x00\x0c)(1:s32:\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||||
|
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||||
|
b'\x00\x00\x00\x00")))')
|
||||||
|
assert agent.sig_encode(12, 34) == SIG
|
||||||
@@ -115,3 +115,9 @@ def test_memoize():
|
|||||||
assert g(1) == g(1)
|
assert g(1) == g(1)
|
||||||
assert g(1) != g(2)
|
assert g(1) != g(2)
|
||||||
assert f.mock_calls == [mock.call(1), mock.call(2)]
|
assert f.mock_calls == [mock.call(1), mock.call(2)]
|
||||||
|
|
||||||
|
|
||||||
|
def test_assuan_serialize():
|
||||||
|
assert util.assuan_serialize(b'') == b''
|
||||||
|
assert util.assuan_serialize(b'123\n456') == b'123%0A456'
|
||||||
|
assert util.assuan_serialize(b'\r\n') == b'%0D%0A'
|
||||||
|
|||||||
@@ -229,3 +229,11 @@ def which(cmd):
|
|||||||
raise OSError('Cannot find {!r} in $PATH'.format(cmd))
|
raise OSError('Cannot find {!r} in $PATH'.format(cmd))
|
||||||
log.debug('which %r => %r', cmd, full_path)
|
log.debug('which %r => %r', cmd, full_path)
|
||||||
return full_path
|
return full_path
|
||||||
|
|
||||||
|
|
||||||
|
def assuan_serialize(data):
|
||||||
|
"""Serialize data according to ASSUAN protocol (for GPG daemon communication)."""
|
||||||
|
for c in [b'%', b'\n', b'\r']:
|
||||||
|
escaped = '%{:02X}'.format(ord(c)).encode('ascii')
|
||||||
|
data = data.replace(c, escaped)
|
||||||
|
return data
|
||||||
|
|||||||
Reference in New Issue
Block a user