Files
trezor-agent/libagent/win_server.py

423 lines
15 KiB
Python

"""Windows named pipe server simulating a UNIX socket."""
import contextlib
import ctypes
import io
import os
import socket
import win32api
import win32event
import win32file
import win32pipe
import winerror
from . import util
# kernel32.dll loaded through ctypes; used below for SetConsoleCtrlHandler and CancelIoEx.
# use_last_error=True asks ctypes to keep a private copy of the Win32 last-error value.
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
# Size in bytes (64 KiB) of the named pipe's in/out buffers; also caps a single recv().
PIPE_BUFFER_SIZE = 64 * 1024
# Console control event code that ctrl_cancel_async_io's handler reacts to.
CTRL_C_EVENT = 0
# NOTE(review): not referenced in the visible part of this file -- presumably the Win32
# THREAD_SET_CONTEXT access right; confirm whether it is still needed.
THREAD_SET_CONTEXT = 0x0010
# Workaround for Ctrl+C not stopping IO on Windows
# See https://github.com/python/cpython/issues/85609
@contextlib.contextmanager
def ctrl_cancel_async_io(file_handle):
    """Cancel pending IO on ``file_handle`` when Ctrl+C is pressed.

    While the ``with`` body runs, a console control handler is installed that
    calls ``CancelIoEx`` on ``file_handle`` for ``CTRL_C_EVENT``. The handler
    is removed again when the body exits, whether normally or by exception.
    """
    def _cancel_on_ctrl_c(event_code):
        if event_code == CTRL_C_EVENT:
            kernel32.CancelIoEx(file_handle, None)
        # Always report the event as "not handled" so it is passed on.
        return False

    # Wrap the Python function in a native callback. The local reference keeps
    # the callback object alive for as long as it is registered.
    handler_prototype = ctypes.WINFUNCTYPE(ctypes.c_uint, ctypes.c_uint)
    native_handler = handler_prototype(_cancel_on_ctrl_c)
    try:
        kernel32.SetConsoleCtrlHandler(native_handler, True)
        yield
    finally:
        kernel32.SetConsoleCtrlHandler(native_handler, False)
# Based loosely on https://docs.microsoft.com/en-us/windows/win32/ipc/multithreaded-pipe-server
class NamedPipe:
    """A Windows named pipe.

    Can act both as a listener waiting for and processing connections,
    or as a client connecting to a listener.
    """

    @staticmethod
    def __close(handle, disconnect):
        """Flush and close a named pipe handle.

        Does nothing when ``handle`` is ``INVALID_HANDLE_VALUE``. When
        ``disconnect`` is true, the pipe is disconnected before it is closed.
        The handle is closed even if flushing or disconnecting fails; the
        failure is still propagated to the caller.
        """
        if handle == win32file.INVALID_HANDLE_VALUE:
            return
        try:
            win32file.FlushFileBuffers(handle)
            if disconnect:
                win32pipe.DisconnectNamedPipe(handle)
        finally:
            # Never skip the close: a failed flush must not keep the handle open.
            win32api.CloseHandle(handle)

    @staticmethod
    def create(name):
        """Create a named pipe server instance and start waiting for a client.

        Returns a ``NamedPipe`` on which ``connect`` must be called to wait for
        the client. Raises ``IOError`` if the pipe cannot be created or the
        connect request fails immediately.
        """
        handle = win32pipe.CreateNamedPipe(
            name,
            win32pipe.PIPE_ACCESS_DUPLEX | win32file.FILE_FLAG_OVERLAPPED,
            win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE | win32pipe.PIPE_WAIT,
            win32pipe.PIPE_UNLIMITED_INSTANCES,
            PIPE_BUFFER_SIZE,
            PIPE_BUFFER_SIZE,
            0,
            None)
        if handle == win32file.INVALID_HANDLE_VALUE:
            raise IOError('CreateNamedPipe failed ({0})'.format(win32api.GetLastError()))
        try:
            pending_io = False
            overlapped = win32file.OVERLAPPED()
            overlapped.hEvent = win32event.CreateEvent(None, True, True, None)
            error_code = win32pipe.ConnectNamedPipe(handle, overlapped)
            if error_code == winerror.ERROR_IO_PENDING:
                pending_io = True
            else:
                # No asynchronous completion will signal the event, so signal it
                # here to keep a later ``connect`` from blocking.
                win32event.SetEvent(overlapped.hEvent)
                if error_code != winerror.ERROR_PIPE_CONNECTED:
                    raise IOError('ConnectNamedPipe failed ({0})'.format(error_code))
            ret = NamedPipe(name, handle, overlapped, pending_io, True)
            # Ownership moved to ``ret``; make the cleanup below a no-op.
            handle = win32file.INVALID_HANDLE_VALUE
            return ret
        finally:
            NamedPipe.__close(handle, True)

    @staticmethod
    def open(name):
        """Connect as a client to an existing named pipe.

        Returns a ``NamedPipe`` wrapping the client end, switched to byte read
        mode. Raises ``IOError`` if the pipe cannot be opened.
        """
        handle = win32file.CreateFile(
            name,
            win32file.GENERIC_READ | win32file.GENERIC_WRITE,
            0,
            None,
            win32file.OPEN_EXISTING,
            win32file.FILE_FLAG_OVERLAPPED,
            None)
        if handle == win32file.INVALID_HANDLE_VALUE:
            raise IOError('CreateFile failed ({0})'.format(win32api.GetLastError()))
        try:
            overlapped = win32file.OVERLAPPED()
            overlapped.hEvent = win32event.CreateEvent(None, True, True, None)
            win32pipe.SetNamedPipeHandleState(handle, win32pipe.PIPE_READMODE_BYTE, None, None)
            ret = NamedPipe(name, handle, overlapped, False, False)
            # Ownership moved to ``ret``; make the cleanup below a no-op.
            handle = win32file.INVALID_HANDLE_VALUE
            return ret
        finally:
            NamedPipe.__close(handle, False)

    def __init__(self, name, handle, overlapped, pending_io, created):
        """Should not be called directly.

        Use ``NamedPipe.create`` or ``NamedPipe.open`` instead.
        """
        # pylint: disable=too-many-arguments
        self.name = name
        self.handle = handle
        self.overlapped = overlapped
        # True while the ConnectNamedPipe request issued by ``create`` is pending.
        self.pending_io = pending_io
        # True for the server end (made by ``create``); it is disconnected on close.
        self.created = created
        self.retain_buf = bytes()
        self.timeout = win32event.INFINITE

    def __del__(self):
        """Close the named pipe."""
        # Guard against instances whose ``__init__`` never ran.
        if hasattr(self, 'handle'):
            self.close()

    def settimeout(self, timeout):
        """Set the timeout for IO operations on the named pipe.

        ``timeout`` is given in seconds; ``None`` means wait indefinitely.
        The value is stored in milliseconds.
        """
        self.timeout = win32event.INFINITE if timeout is None else int(timeout * 1000)

    def close(self):
        """Close the named pipe. Calling it again afterwards does nothing."""
        # Invalidate first, so a failure while closing cannot lead to the same
        # handle being closed a second time.
        handle, self.handle = self.handle, win32file.INVALID_HANDLE_VALUE
        NamedPipe.__close(handle, self.created)

    def connect(self):
        """Wait until a client has connected to the pipe.

        Raises ``TimeoutError`` if no client connects within the configured
        timeout, and ``IOError`` if the pending connect request failed.
        """
        with ctrl_cancel_async_io(self.handle):
            wait_result = win32event.WaitForSingleObject(self.overlapped.hEvent, self.timeout)
        if wait_result == win32event.WAIT_TIMEOUT:
            raise TimeoutError('Timed out waiting for client on pipe {0}'.format(self.name))
        if not self.pending_io:
            return
        win32pipe.GetOverlappedResult(
            self.handle,
            self.overlapped,
            False)
        error_code = win32api.GetLastError()
        if error_code == winerror.NO_ERROR:
            return
        raise IOError('Connection to named pipe {0} failed ({1})'.format(self.name, error_code))

    def recv(self, size):
        """Read up to ``size`` bytes (at most ``PIPE_BUFFER_SIZE``) from the pipe.

        Returns the data read, or ``None`` when nothing was read: ``ReadFile``
        reported ``ERROR_NO_DATA``, the pipe was broken, or zero bytes arrived.
        Raises ``IOError`` for other failures.
        """
        rbuf = win32file.AllocateReadBuffer(min(size, PIPE_BUFFER_SIZE))
        try:
            error_code, _ = win32file.ReadFile(self.handle, rbuf, self.overlapped)
            if error_code not in (winerror.NO_ERROR,
                                  winerror.ERROR_IO_PENDING,
                                  winerror.ERROR_MORE_DATA):
                raise IOError('ReadFile failed ({0})'.format(error_code))
        except win32api.error as e:
            if e.winerror == winerror.ERROR_NO_DATA:
                return None
            raise
        # NOTE(review): the wait result is not checked, so a timeout is not
        # reported as TimeoutError here -- presumably GetOverlappedResult then
        # fails for the still-incomplete request; confirm the intended behaviour.
        with ctrl_cancel_async_io(self.handle):
            win32event.WaitForSingleObject(self.overlapped.hEvent, self.timeout)
        try:
            chunk_size = win32pipe.GetOverlappedResult(self.handle, self.overlapped, False)
            error_code = win32api.GetLastError()
            if error_code != winerror.NO_ERROR:
                raise IOError('ReadFile failed ({0})'.format(error_code))
            return rbuf[:chunk_size] if chunk_size > 0 else None
        except win32api.error as e:
            if e.winerror == winerror.ERROR_BROKEN_PIPE:
                return None
            raise

    def send(self, data):
        """Write ``data`` to the pipe and return the number of bytes written.

        Raises ``IOError`` if the write fails.
        """
        error_code, _ = win32file.WriteFile(self.handle, data, self.overlapped)
        if error_code not in (winerror.NO_ERROR,
                              winerror.ERROR_IO_PENDING,
                              winerror.ERROR_MORE_DATA):
            raise IOError('WriteFile failed ({0})'.format(error_code))
        # NOTE(review): as in ``recv``, the wait result is not checked.
        with ctrl_cancel_async_io(self.handle):
            win32event.WaitForSingleObject(self.overlapped.hEvent, self.timeout)
        written = win32pipe.GetOverlappedResult(self.handle, self.overlapped, False)
        error_code = win32api.GetLastError()
        if error_code != winerror.NO_ERROR:
            raise IOError('WriteFile failed ({0})'.format(error_code))
        return written

    def sendall(self, data):
        """Write all of ``data`` to the pipe, repeating ``send`` as needed."""
        while len(data) > 0:
            written = self.send(data)
            data = data[written:]
class InterruptibleSocket:
    """Socket wrapper whose blocking IO can be interrupted by SIGINT."""

    def __init__(self, sock):
        """Wrap the socket object ``sock``."""
        self.sock = sock

    def __del__(self):
        """Close the wrapped socket; it should not outlive the wrapper."""
        self.close()

    def settimeout(self, timeout):
        """Forward the timeout to the wrapped socket."""
        self.sock.settimeout(timeout)

    def recv(self, size):
        """Receive from the wrapped socket while monitoring for SIGINT.

        Returns ``None`` instead of raising when the socket error carries
        ``winerror`` 10054, so that case reads as end of file.
        """
        try:
            with ctrl_cancel_async_io(self.sock.fileno()):
                received = self.sock.recv(size)
        except OSError as e:
            if e.winerror != 10054:
                raise
            # Convert socket close to end of file
            return None
        return received

    def sendall(self, reply):
        """Send ``reply`` on the wrapped socket while monitoring for SIGINT."""
        with ctrl_cancel_async_io(self.sock.fileno()):
            result = self.sock.sendall(reply)
        return result

    def close(self):
        """Close the wrapped socket."""
        return self.sock.close()

    def getsockname(self):
        """Return the wrapped socket's own address."""
        return self.sock.getsockname()
class Server:
    """Listens on an emulated AF_UNIX socket on Windows.

    Supports both Gpg4win-style AF_UNIX emulation and OpenSSH-style AF_UNIX emulation
    """

    def __init__(self, pipe_name):
        """Open a socket or prepare a named pipe.

        A ``pipe_name`` that is not a ``str`` (a byte string) selects the
        Gpg4win style: a TCP socket is bound to a free port on 127.0.0.1 and
        the file named by ``pipe_name`` is written with the port number, a
        newline and a random 16-byte key that clients must present.

        A ``str`` selects the OpenSSH style: ``pipe_name`` is the name of a
        Windows named pipe, which is created on the first ``accept`` call.
        """
        self.timeout = None
        self.pipe_name = pipe_name
        self.sock = None
        self.pipe = None
        if isinstance(self.pipe_name, str):
            return
        # GPG simulated socket via localhost socket
        self.key = os.urandom(16)
        self.sock = socket.socket()
        self.sock.bind(('127.0.0.1', 0))
        port = self.sock.getsockname()[1]
        self.sock.listen(1)
        # Write port and key to the socket file
        contents = str(port).encode() + b'\n' + self.key
        with open(self.pipe_name, 'wb') as f:
            with ctrl_cancel_async_io(f.fileno()):
                f.write(contents)

    def __del__(self):
        """Close the underlying socket or pipe."""
        pipe, sock = self.pipe, self.sock
        if pipe is not None:
            pipe.close()
            self.pipe = None
        if sock is not None:
            sock.close()
            self.sock = None

    def settimeout(self, timeout):
        """Set the timeout in seconds."""
        if self.sock:
            self.sock.settimeout(timeout)
        self.timeout = timeout

    def getsockname(self):
        """Return the file path or pipe name this server was created with."""
        return self.pipe_name

    def accept(self):
        """Wait for an incoming connection.

        Returns a pair ``(pipe, address)`` where ``pipe`` is a connected socket-like object
        representing a client, and ``address`` identifies the client. For a named
        pipe the address is the pipe name itself.

        Raises ``TimeoutError`` when a socket client does not present the expected key.
        """
        if self.sock:
            return self._accept_socket()
        return self._accept_pipe()

    def _accept_socket(self):
        """Accept one TCP client and check the 16-byte key it sends."""
        with ctrl_cancel_async_io(self.sock.fileno()):
            raw_sock, addr = self.sock.accept()
        client = InterruptibleSocket(raw_sock)
        client.settimeout(self.timeout)
        if self.key != util.recv(client, 16):
            client.close()
            # Simulate timeout on failed connection to allow the caller to retry
            raise TimeoutError('Illegitimate client tried to connect to pipe {0}'
                               .format(self.pipe_name))
        client.settimeout(None)
        return (client, addr)

    def _accept_pipe(self):
        """Wait for a client on the named pipe and hand the pipe over."""
        if self.pipe is None:
            self.pipe = NamedPipe.create(self.pipe_name)
        self.pipe.settimeout(self.timeout)
        self.pipe.connect()
        self.pipe.settimeout(None)
        # A named pipe can only accept a single connection
        # It must be recreated if a new connection is to be made
        connected, self.pipe = self.pipe, None
        return (connected, self.pipe_name)
class Client:
    """Connects to an emulated AF_UNIX socket on Windows.

    Supports both Gpg4win-style AF_UNIX emulation and OpenSSH-style AF_UNIX emulation
    """

    def __init__(self, pipe_name):
        """Connect to a socket or named pipe.

        A ``pipe_name`` that is not a ``str`` (a byte string) selects the
        Gpg4win style: it names a file holding the port and the 16-byte key of
        a TCP socket on 127.0.0.1. The client connects to that port and sends
        the key.

        A ``str`` selects the OpenSSH style: ``pipe_name`` is the name of a
        Windows named pipe to open.

        Raises ``OSError`` when the socket file is malformed or the connection fails.
        """
        self.pipe_name = pipe_name
        self.sock = None
        self.pipe = None
        if isinstance(self.pipe_name, str):
            self.pipe = NamedPipe.open(pipe_name)
            return
        # Read port and key from file
        with open(self.pipe_name, 'rb') as f:
            with ctrl_cancel_async_io(f.fileno()):
                port, key = Client._read_socket_file(f, pipe_name)
        # GPG simulated socket via localhost socket
        sock = socket.socket()
        try:
            sock.connect(('127.0.0.1', port))
        except OSError:
            # Do not leave a half-open socket behind when the server is gone.
            sock.close()
            raise
        self.sock = InterruptibleSocket(sock)
        self.sock.sendall(key)

    @staticmethod
    def _read_socket_file(f, pipe_name):
        """Parse a socket file and return ``(port, key)``.

        ``f`` is a binary file object positioned at the start of the file. The
        expected layout is the decimal port number, a newline, then exactly
        16 key bytes and nothing else. ``pipe_name`` is only used in error
        messages. Raises ``OSError`` for any deviation from that layout.
        """
        line = f.readline()
        digits = line[:-1]
        # The line must end with a newline and hold at least one ASCII digit.
        if not line.endswith(b'\n') or not digits.isdigit():
            raise OSError('Could not read port for socket {0}'.format(pipe_name))
        port = int(digits)
        key = b''
        # Stop once all 16 bytes are in; a read may return fewer than requested.
        while len(key) < 16:
            chunk = f.read(16 - len(key))
            if not chunk:
                raise OSError('Could not read nonce for socket {0}'.format(pipe_name))
            key += chunk
        # Verify end of file
        if f.read(1):
            raise OSError('Corrupt socket {0}'.format(pipe_name))
        return port, key

    def __del__(self):
        """Close the underlying socket or named pipe."""
        if self.pipe is not None:
            self.pipe.close()
            self.pipe = None
        if self.sock is not None:
            self.sock.close()
            self.sock = None

    def settimeout(self, timeout):
        """Forward to underlying socket or named pipe."""
        if self.sock:
            self.sock.settimeout(timeout)
        if self.pipe:
            self.pipe.settimeout(timeout)

    def getsockname(self):
        """Return the file path or pipe name used for connecting."""
        return self.pipe_name

    def recv(self, size):
        """Forward to underlying socket or named pipe."""
        if self.sock is not None:
            return self.sock.recv(size)
        return self.pipe.recv(size)

    def sendall(self, reply):
        """Forward to underlying socket or named pipe."""
        if self.sock is not None:
            return self.sock.sendall(reply)
        return self.pipe.sendall(reply)