From a9bcabc664a6e2c95e796278b8934dbee2034717 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Sat, 4 Feb 2023 18:41:54 +0200 Subject: [PATCH] Fix a few pycodestyle & pystyle issues --- libagent/device/ledger.py | 17 ++++++----- libagent/device/onlykey.py | 2 +- libagent/gpg/agent.py | 2 +- libagent/gpg/encode.py | 4 +-- libagent/server.py | 2 +- libagent/ssh/__init__.py | 6 ++-- libagent/ssh/protocol.py | 54 +++++++++++++++++------------------ libagent/tests/test_server.py | 2 +- 8 files changed, 46 insertions(+), 43 deletions(-) diff --git a/libagent/device/ledger.py b/libagent/device/ledger.py index cc50e96..b3916a6 100644 --- a/libagent/device/ledger.py +++ b/libagent/device/ledger.py @@ -36,11 +36,13 @@ def _convert_public_key(ecdsa_curve_name, result): class LedgerNanoS(interface.Device): """Connection to Ledger Nano S device.""" + LEDGER_APP_NAME = "SSH/PGP Agent" ledger_app_version = None ledger_app_supports_end_of_frame_byte = True def get_app_name_and_version(self, dongle): + """Retrieve currently running Ledger application name and its version string.""" device_version_answer = dongle.exchange(binascii.unhexlify('B001000000')) offset = 1 app_name_length = struct.unpack_from("B", device_version_answer, offset)[0] @@ -50,7 +52,7 @@ class LedgerNanoS(interface.Device): app_version_length = struct.unpack_from("B", device_version_answer, offset)[0] offset += 1 app_version = device_version_answer[offset: offset + app_version_length] - log.debug("running app {}, version {}".format(app_name, app_version)) + log.debug("running app %s, version %s", app_name, app_version) return (app_name.decode(), app_version.decode()) @classmethod @@ -64,13 +66,13 @@ class LedgerNanoS(interface.Device): dongle = comm.getDongle(debug=True) (app_name, self.ledger_app_version) = self.get_app_name_and_version(dongle) - self.ledger_app_version = self.ledger_app_version.split(".") - if self.ledger_app_version[0] == "0" and self.ledger_app_version[1] == "0" and int(self.ledger_app_version[2]) <= 7: + version_parts = self.ledger_app_version.split(".") + if (version_parts[0] == "0" and version_parts[1] == "0" and int(version_parts[2]) <= 7): self.ledger_app_supports_end_of_frame_byte = False if app_name != LedgerNanoS.LEDGER_APP_NAME: # we could launch the app here if we are in the dashboard - raise interface.DeviceError('{} is not running {}'.format(self, LedgerNanoS.LEDGER_APP_NAME)) + raise interface.DeviceError(f'{self} is not running {LedgerNanoS.LEDGER_APP_NAME}') return dongle except comm.CommException as e: @@ -98,6 +100,7 @@ class LedgerNanoS(interface.Device): def sign(self, identity, blob): """Sign given blob and return the signature (as bytes).""" + # pylint: disable=too-many-locals,too-many-branches path = _expand_path(identity.get_bip32_address(ecdh=False)) offset = 0 result = None @@ -106,7 +109,7 @@ class LedgerNanoS(interface.Device): if offset == 0: data += bytearray([len(path) // 4]) + path chunk_size = min(len(blob) - offset, 255 - len(data)) - data += blob[offset : offset + chunk_size] + data += blob[offset:offset + chunk_size] if identity.identity_dict['proto'] == 'ssh': ins = '04' @@ -120,8 +123,8 @@ class LedgerNanoS(interface.Device): if offset == 0: p1 = "00" - elif ((offset + chunk_size) == len(blob)) and self.ledger_app_supports_end_of_frame_byte: - p1 = "81" # end of frame byte only handled in 0.0.8+ + elif offset + chunk_size == len(blob) and self.ledger_app_supports_end_of_frame_byte: + p1 = "81" # end of frame byte only handled in 0.0.8+ else: p1 = "01" diff --git a/libagent/device/onlykey.py b/libagent/device/onlykey.py index 5594963..710f0fe 100644 --- a/libagent/device/onlykey.py +++ b/libagent/device/onlykey.py @@ -289,7 +289,7 @@ class OnlyKey(interface.Device): log.info('received= %s', repr(result)) return bytes(result) - raise Exception('failed to sign challenge') + raise interface.Error('failed to sign challenge') def ecdh(self, identity, pubkey): """Get shared session key using Elliptic Curve Diffie-Hellman.""" diff --git a/libagent/gpg/agent.py b/libagent/gpg/agent.py index 63bc157..15c9364 100644 --- a/libagent/gpg/agent.py +++ b/libagent/gpg/agent.py @@ -164,7 +164,7 @@ class Handler: # We assume the first user ID is used to generate TREZOR-based GPG keys. user_id = user_ids[0]['value'].decode('utf-8') curve_name = protocol.get_curve_name_by_oid(pubkey_dict['curve_oid']) - ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID) + ecdh = pubkey_dict['algo'] == protocol.ECDH_ALGO_ID identity = client.create_identity(user_id=user_id, curve_name=curve_name) verifying_key = self.client.pubkey(identity=identity, ecdh=ecdh) diff --git a/libagent/gpg/encode.py b/libagent/gpg/encode.py index 0eac71f..44c3d2e 100644 --- a/libagent/gpg/encode.py +++ b/libagent/gpg/encode.py @@ -11,7 +11,7 @@ log = logging.getLogger(__name__) def create_primary(user_id, pubkey, signer_func, secret_bytes=b''): """Export new primary GPG public key, ready for "gpg2 --import".""" pubkey_packet = protocol.packet(tag=(5 if secret_bytes else 6), - blob=(pubkey.data() + secret_bytes)) + blob=pubkey.data() + secret_bytes) user_id_bytes = user_id.encode('utf-8') user_id_packet = protocol.packet(tag=13, blob=user_id_bytes) data_to_sign = (pubkey.data_to_hash() + user_id_packet[:1] + @@ -51,7 +51,7 @@ def create_primary(user_id, pubkey, signer_func, secret_bytes=b''): def create_subkey(primary_bytes, subkey, signer_func, secret_bytes=b''): """Export new subkey to GPG primary key.""" subkey_packet = protocol.packet(tag=(7 if secret_bytes else 14), - blob=(subkey.data() + secret_bytes)) + blob=subkey.data() + secret_bytes) packets = list(decode.parse_packets(io.BytesIO(primary_bytes))) primary, user_id, signature = packets[:3] diff --git a/libagent/server.py b/libagent/server.py index f02792e..8e16c8d 100644 --- a/libagent/server.py +++ b/libagent/server.py @@ -134,7 +134,7 @@ def server_thread(sock, handle_conn, quit_event): break # Handle connections from SSH concurrently. threading.Thread(target=handle_conn, - kwargs=dict(conn=conn)).start() + kwargs={'conn': conn}).start() log.debug('server thread stopped') diff --git a/libagent/ssh/__init__.py b/libagent/ssh/__init__.py index af31ab5..5d6734e 100644 --- a/libagent/ssh/__init__.py +++ b/libagent/ssh/__init__.py @@ -138,9 +138,9 @@ def serve(handler, sock_path, timeout=UNIX_SOCKET_TIMEOUT): handle_conn = functools.partial(server.handle_connection, handler=handler, mutex=device_mutex) - kwargs = dict(sock=sock, - handle_conn=handle_conn, - quit_event=quit_event) + kwargs = {'sock': sock, + 'handle_conn': handle_conn, + 'quit_event': quit_event} with server.spawn(server.server_thread, kwargs): try: yield environ diff --git a/libagent/ssh/protocol.py b/libagent/ssh/protocol.py index 34c0452..7c3e675 100644 --- a/libagent/ssh/protocol.py +++ b/libagent/ssh/protocol.py @@ -16,33 +16,33 @@ log = logging.getLogger(__name__) # Taken from https://github.com/openssh/openssh-portable/blob/master/authfd.h -COMMANDS = dict( - SSH_AGENTC_REQUEST_RSA_IDENTITIES=1, - SSH_AGENT_RSA_IDENTITIES_ANSWER=2, - SSH_AGENTC_RSA_CHALLENGE=3, - SSH_AGENT_RSA_RESPONSE=4, - SSH_AGENT_FAILURE=5, - SSH_AGENT_SUCCESS=6, - SSH_AGENTC_ADD_RSA_IDENTITY=7, - SSH_AGENTC_REMOVE_RSA_IDENTITY=8, - SSH_AGENTC_REMOVE_ALL_RSA_IDENTITIES=9, - SSH2_AGENTC_REQUEST_IDENTITIES=11, - SSH2_AGENT_IDENTITIES_ANSWER=12, - SSH2_AGENTC_SIGN_REQUEST=13, - SSH2_AGENT_SIGN_RESPONSE=14, - SSH2_AGENTC_ADD_IDENTITY=17, - SSH2_AGENTC_REMOVE_IDENTITY=18, - SSH2_AGENTC_REMOVE_ALL_IDENTITIES=19, - SSH_AGENTC_ADD_SMARTCARD_KEY=20, - SSH_AGENTC_REMOVE_SMARTCARD_KEY=21, - SSH_AGENTC_LOCK=22, - SSH_AGENTC_UNLOCK=23, - SSH_AGENTC_ADD_RSA_ID_CONSTRAINED=24, - SSH2_AGENTC_ADD_ID_CONSTRAINED=25, - SSH_AGENTC_ADD_SMARTCARD_KEY_CONSTRAINED=26, - SSH_AGENTC_EXTENSION=27, - SSH_AGENT_EXTENSION_FAILURE=28, -) +COMMANDS = { + "SSH_AGENTC_REQUEST_RSA_IDENTITIES": 1, + "SSH_AGENT_RSA_IDENTITIES_ANSWER": 2, + "SSH_AGENTC_RSA_CHALLENGE": 3, + "SSH_AGENT_RSA_RESPONSE": 4, + "SSH_AGENT_FAILURE": 5, + "SSH_AGENT_SUCCESS": 6, + "SSH_AGENTC_ADD_RSA_IDENTITY": 7, + "SSH_AGENTC_REMOVE_RSA_IDENTITY": 8, + "SSH_AGENTC_REMOVE_ALL_RSA_IDENTITIES": 9, + "SSH2_AGENTC_REQUEST_IDENTITIES": 11, + "SSH2_AGENT_IDENTITIES_ANSWER": 12, + "SSH2_AGENTC_SIGN_REQUEST": 13, + "SSH2_AGENT_SIGN_RESPONSE": 14, + "SSH2_AGENTC_ADD_IDENTITY": 17, + "SSH2_AGENTC_REMOVE_IDENTITY": 18, + "SSH2_AGENTC_REMOVE_ALL_IDENTITIES": 19, + "SSH_AGENTC_ADD_SMARTCARD_KEY": 20, + "SSH_AGENTC_REMOVE_SMARTCARD_KEY": 21, + "SSH_AGENTC_LOCK": 22, + "SSH_AGENTC_UNLOCK": 23, + "SSH_AGENTC_ADD_RSA_ID_CONSTRAINED": 24, + "SSH2_AGENTC_ADD_ID_CONSTRAINED": 25, + "SSH_AGENTC_ADD_SMARTCARD_KEY_CONSTRAINED": 26, + "SSH_AGENTC_EXTENSION": 27, + "SSH_AGENT_EXTENSION_FAILURE": 28, +} def msg_code(name): diff --git a/libagent/tests/test_server.py b/libagent/tests/test_server.py index 918bf8c..947160a 100644 --- a/libagent/tests/test_server.py +++ b/libagent/tests/test_server.py @@ -102,7 +102,7 @@ def test_spawn(): def thread(x): obj.append(x) - with server.spawn(thread, dict(x=1)): + with server.spawn(thread, {'x': 1}): pass assert obj == [1]