Fusion360-Addons/python/Lib/site-packages/ptvsd/messaging.py
2021-10-31 19:12:13 +01:00

420 lines
14 KiB
Python

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
from __future__ import print_function, with_statement, absolute_import
import contextlib
import itertools
import json
import sys
import threading
import traceback
from ._util import new_hidden_thread
class JsonIOStream(object):
"""Implements a JSON value stream over two byte streams (input and output).
Each value is encoded as a packet consisting of a header and a body, as defined by the
Debug Adapter Protocol (https://microsoft.github.io/debug-adapter-protocol/overview).
"""
MAX_BODY_SIZE = 0xFFFFFF
@classmethod
def from_stdio(cls):
if sys.version_info >= (3,):
stdin = sys.stdin.buffer
stdout = sys.stdout.buffer
else:
stdin = sys.stdin
stdout = sys.stdout
if sys.platform == 'win32':
import os, msvcrt
msvcrt.setmode(stdin.fileno(), os.O_BINARY)
msvcrt.setmode(stdout.fileno(), os.O_BINARY)
return cls(stdin, stdout)
@classmethod
def from_socket(cls, socket):
if socket.gettimeout() is not None:
raise ValueError('Socket must be in blocking mode')
socket_io = socket.makefile('rwb', 0)
return cls(socket_io, socket_io)
def __init__(self, reader, writer):
"""Creates a new JsonIOStream.
reader is a BytesIO-like object from which incoming messages are read;
reader.readline() must treat '\n' as the line terminator, and must leave
'\r' as is (i.e. it must not translate '\r\n' to just plain '\n'!).
writer is a BytesIO-like object to which outgoing messages are written.
"""
self._reader = reader
self._writer = writer
self._is_closing = False
def close(self):
self._is_closing = True
self._reader.close()
self._writer.close()
def _read_line(self):
line = b''
while True:
try:
line += self._reader.readline()
except Exception:
raise EOFError
if not line:
raise EOFError
if line.endswith(b'\r\n'):
line = line[0:-2]
return line
def read_json(self):
"""Read a single JSON value from reader.
Returns JSON value as parsed by json.loads(), or raises EOFError
if there are no more objects to be read.
"""
headers = {}
while True:
line = self._read_line()
if line == b'':
break
key, _, value = line.partition(b':')
headers[key] = value
try:
length = int(headers[b'Content-Length'])
if not (0 <= length <= self.MAX_BODY_SIZE):
raise ValueError
except (KeyError, ValueError):
raise IOError('Content-Length is missing or invalid')
try:
body = self._reader.read(length)
except Exception:
if self._is_closing:
raise EOFError
else:
raise
if isinstance(body, bytes):
body = body.decode('utf-8')
return json.loads(body)
def write_json(self, value):
"""Write a single JSON object to writer.
object must be in the format suitable for json.dump().
"""
body = json.dumps(value, sort_keys=True)
if not isinstance(body, bytes):
body = body.encode('utf-8')
header = 'Content-Length: %d\r\n\r\n' % len(body)
if not isinstance(header, bytes):
header = header.encode('ascii')
self._writer.write(header)
self._writer.write(body)
class Request(object):
"""Represents an incoming or an outgoing request.
Incoming requests are represented by instances of this class.
Outgoing requests are represented by instances of OutgoingRequest, which
provides additional functionality to handle responses.
"""
def __init__(self, channel, seq, command, arguments):
self.channel = channel
self.seq = seq
self.command = command
self.arguments = arguments
class OutgoingRequest(Request):
"""Represents an outgoing request, for which it is possible to wait for a
response to be received, and register a response callback.
"""
def __init__(self, *args):
super(OutgoingRequest, self).__init__(*args)
self.response = None
self._lock = threading.Lock()
self._got_response = threading.Event()
self._callback = lambda _: None
def _handle_response(self, seq, command, body):
assert self.response is None
with self._lock:
response = Response(self.channel, seq, self, body)
self.response = response
callback = self._callback
callback(response)
self._got_response.set()
def wait_for_response(self, raise_if_failed=True):
"""Waits until a response is received for this request, records that
response as a new Response object accessible via self.response, and
returns self.response.body.
If raise_if_failed is True, and the received response does not indicate
success, raises RequestFailure. Otherwise, self.response.body has to be
inspected to determine whether the request failed or succeeded.
"""
self._got_response.wait()
if raise_if_failed and not self.response.success:
raise self.response.body
return self.response.body
def on_response(self, callback):
"""Registers a callback to invoke when a response is received for this
request. If response was already received, invokes callback immediately.
Callback is invoked with Response as the sole arugment.
The callback is invoked on an unspecified background thread that performs
processing of incoming messages; therefore, no further message processing
occurs until the callback returns.
"""
with self._lock:
response = self.response
if response is None:
self._callback = callback
return
callback(response)
class Response(object):
"""Represents a response to a Request.
"""
def __init__(self, channel, seq, request, body):
self.channel = channel
self.seq = seq
self.request = request
"""Request object that this is a response to.
"""
self.body = body
"""Body of the response if the request was successful, or an instance
of some class derived from Exception it it was not.
If a response was received from the other side, but it was marked as
failure, it is an instance of RequestFailure, capturing the received
error message.
If a response was never received from the other side (e.g. because it
disconnected before sending a response), it is EOFError.
"""
@property
def success(self):
return not isinstance(self.body, Exception)
class Event(object):
"""Represents a received event.
"""
def __init__(self, channel, seq, event, body):
self.channel = channel
self.seq = seq
self.event = event
self.body = body
class RequestFailure(Exception):
def __init__(self, message):
self.message = message
def __eq__(self, other):
if not isinstance(other, RequestFailure):
return NotImplemented
return self.message == other.message
def __ne__(self, other):
return not self == other
def __repr__(self):
return 'RequestFailure(%r)' % self.message
def __str__(self):
return self.message
class JsonMessageChannel(object):
"""Implements a JSON message channel on top of a JSON stream, with
support for generic Request, Response and Event messages as defined by the
Debug Adapter Protocol (https://microsoft.github.io/debug-adapter-protocol/overview).
"""
def __init__(self, stream, handlers=None, name='vsc_messaging'):
self.stream = stream
self.send_callback = lambda channel, message: None
self.receive_callback = lambda channel, message: None
self._lock = threading.Lock()
self._stop = threading.Event()
self._seq_iter = itertools.count(1)
self._requests = {}
self._handlers = handlers
self._worker = new_hidden_thread(name, self._process_incoming_messages)
self._worker.daemon = True
def close(self):
self.stream.close()
def start(self):
self._worker.start()
def wait(self):
self._worker.join()
@contextlib.contextmanager
def _send_message(self, type, rest={}):
with self._lock:
seq = next(self._seq_iter)
message = {
'seq': seq,
'type': type,
}
message.update(rest)
with self._lock:
yield seq
self.stream.write_json(message)
self.send_callback(self, message)
def send_request(self, command, arguments=None):
d = {'command': command}
if arguments is not None:
d['arguments'] = arguments
with self._send_message('request', d) as seq:
request = OutgoingRequest(self, seq, command, arguments)
self._requests[seq] = request
return request
def send_event(self, event, body=None):
d = {'event': event}
if body is not None:
d['body'] = body
with self._send_message('event', d):
pass
def _send_response(self, request_seq, success, command, error_message, body):
d = {
'request_seq': request_seq,
'success': success,
'command': command,
}
if success:
if body is not None:
d['body'] = body
else:
if error_message is not None:
d['message'] = error_message
with self._send_message('response', d):
pass
def on_message(self, message):
self.receive_callback(self, message)
seq = message['seq']
typ = message['type']
if typ == 'request':
command = message['command']
arguments = message.get('arguments', None)
self.on_request(seq, command, arguments)
elif typ == 'event':
event = message['event']
body = message.get('body', None)
self.on_event(seq, event, body)
elif typ == 'response':
request_seq = message['request_seq']
success = message['success']
command = message['command']
error_message = message.get('message', None)
body = message.get('body', None)
self.on_response(seq, request_seq, success, command, error_message, body)
else:
raise IOError('Incoming message has invalid "type":\n%r' % message)
def on_request(self, seq, command, arguments):
handler_name = '%s_request' % command
handler = getattr(self._handlers, handler_name, None)
if handler is None:
try:
handler = getattr(self._handlers, 'request')
except AttributeError:
raise AttributeError('%r has no handler for request %r' % (self._handlers, command))
request = Request(self, seq, command, arguments)
try:
response_body = handler(request)
except Exception as ex:
self._send_response(seq, False, command, str(ex), None)
else:
if isinstance(response_body, Exception):
self._send_response(seq, False, command, str(response_body), None)
else:
self._send_response(seq, True, command, None, response_body)
def on_event(self, seq, event, body):
handler_name = '%s_event' % event
handler = getattr(self._handlers, handler_name, None)
if handler is None:
try:
handler = getattr(self._handlers, 'event')
except AttributeError:
raise AttributeError('%r has no handler for event %r' % (self._handlers, event))
handler(Event(self, seq, event, body))
def on_response(self, seq, request_seq, success, command, error_message, body):
try:
with self._lock:
request = self._requests.pop(request_seq)
except KeyError:
raise KeyError('Received response to unknown request %d', request_seq)
if not success:
body = RequestFailure(error_message)
return request._handle_response(seq, command, body)
def _process_incoming_messages(self):
try:
while True:
try:
message = self.stream.read_json()
except EOFError:
break
try:
self.on_message(message)
except Exception:
print('Error while processing message:\n%r\n\n' % message, file=sys.__stderr__)
traceback.print_exc(file=sys.__stderr__)
raise
finally:
# There's no more incoming messages, so any requests that are still pending
# must be marked as failed to unblock anyone waiting on them.
with self._lock:
for request in self._requests.values():
request._handle_response(None, request.command, EOFError('No response'))
class MessageHandlers(object):
"""A simple delegating message handlers object for use with JsonMessageChannel.
For every argument provided, the object has an attribute with the corresponding
name and value. Example:
"""
def __init__(self, **kwargs):
for name, func in kwargs.items():
setattr(self, name, func)