vici: Return a Python generator instead of a list for streamed responses

In addition that it may reduce memory usage and improve performance for large
responses, it returns immediate results. This is important for longer lasting
commands, such as initiate/terminate, where immediate log feedback is preferable
when interactively calling such commands.
This commit is contained in:
Martin Willi 2015-03-02 15:25:55 +01:00
parent 90e16837ba
commit a47e431ba9
2 changed files with 25 additions and 47 deletions

View File

@ -900,10 +900,10 @@ An example to print the daemon version information is as simple as:
## A request with response iteration ##
The _Session_ class returns an iterable list for streamed events. Currently a
list is returned with all streamed event messages, but a future release might
provide more scalable object streaming. The following example lists all loaded
connections using the _list-conns_ command and implicitly the _list-conn_ event:
The _Session_ class returns an iterable Python generator for streamed events to
continuously stream objects to the caller. The following example lists all
loaded connections using the _list-conns_ command and implicitly the _list-conn_
event:
for conn in v.list_conns():
for key in conn:

View File

@ -38,8 +38,8 @@ class Session(object):
:param sa: the SA to initiate
:type sa: dict
:return: logs emitted by command
:rtype: list
:return: generator for logs emitted as dict
:rtype: generator
"""
return self.handler.streamed_request("initiate", "control-log", sa)
@ -48,8 +48,8 @@ class Session(object):
:param sa: the SA to terminate
:type sa: dict
:return: logs emitted by command
:rtype: list
:return: generator for logs emitted as dict
:rtype: generator
"""
return self.handler.streamed_request("terminate", "control-log", sa)
@ -74,8 +74,8 @@ class Session(object):
:param filters: retrieve only matching IKE_SAs (optional)
:type filters: dict
:return: list of active IKE_SAs and associated CHILD_SAs
:rtype: list
:return: generator for active IKE_SAs and associated CHILD_SAs as dict
:rtype: generator
"""
return self.handler.streamed_request("list-sas", "list-sa", filters)
@ -84,8 +84,8 @@ class Session(object):
:param filters: retrieve only matching policies (optional)
:type filters: dict
:return: list of installed trap, drop and bypass policies
:rtype: list
:return: generator for installed trap, drop and bypass policies as dict
:rtype: generator
"""
return self.handler.streamed_request("list-policies", "list-policy",
filters)
@ -95,8 +95,8 @@ class Session(object):
:param filters: retrieve only matching configuration names (optional)
:type filters: dict
:return: list of connections
:rtype: list
:return: generator for loaded connections as dict
:rtype: generator
"""
return self.handler.streamed_request("list-conns", "list-conn",
filters)
@ -114,8 +114,8 @@ class Session(object):
:param filters: retrieve only matching certificates (optional)
:type filters: dict
:return: list of installed trap, drop and bypass policies
:rtype: list
:return: generator for loaded certificates as dict
:rtype: generator
"""
return self.handler.streamed_request("list-certs", "list-cert", filters)
@ -203,7 +203,6 @@ class SessionHandler(object):
def __init__(self, transport):
self.transport = transport
self.log_events = collections.deque()
def _communicate(self, packet):
"""Send packet over transport and parse response.
@ -214,7 +213,7 @@ class SessionHandler(object):
:rtype: :py:class:`collections.namedtuple`
"""
self.transport.send(packet)
return self._read()
return Packet.parse(self.transport.receive())
def request(self, command, message=None):
"""Send request with an optional message.
@ -260,11 +259,9 @@ class SessionHandler(object):
:type event_stream_type: str
:param message: message (optional)
:type message: str
:return: a pair of the command result and a list of emitted events
:rtype: tuple
:return: generator for streamed event responses as dict
:rtype: generator
"""
result = []
if message is not None:
message = Message.serialize(message)
@ -284,10 +281,12 @@ class SessionHandler(object):
# issue command, and read any event messages
packet = Packet.request(command, message)
self.transport.send(packet)
response = self._read()
while response.response_type == Packet.EVENT:
result.append(Message.deserialize(response.payload))
response = self._read()
while True:
response = Packet.parse(self.transport.receive())
if response.response_type == Packet.EVENT:
yield Message.deserialize(response.payload)
else:
break
if response.response_type == Packet.CMD_RESPONSE:
Message.deserialize(response.payload)
@ -311,24 +310,3 @@ class SessionHandler(object):
confirm=Packet.EVENT_CONFIRM,
)
)
return result
def _read(self):
"""Get next packet from transport.
:return: parsed packet in a tuple with message type and payload
:rtype: :py:class:`collections.namedtuple`
"""
raw_response = self.transport.receive()
response = Packet.parse(raw_response)
# FIXME
if response.response_type == Packet.EVENT and response.event_type == "log":
# queue up any debug log messages, and get next
self.log_events.append(response)
# do something?
self._read()
else:
return response