2017-12-15 10:56:05 +00:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
2017-12-21 13:38:39 +00:00
|
|
|
# just a smoke test for osmopy
|
2017-12-15 10:56:05 +00:00
|
|
|
|
2018-01-14 17:27:31 +00:00
|
|
|
import asyncio, random, sys, os
|
|
|
|
|
|
|
|
# we have to use this ugly hack to workaroundbrokenrelative imports in py3:
|
|
|
|
# from ..osmopy.osmo_ipa import Ctrl
|
|
|
|
# does not work as expected
|
|
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
|
2017-12-21 13:38:39 +00:00
|
|
|
from osmopy.osmo_ipa import Ctrl
|
|
|
|
from osmopy import __version__
|
2017-12-15 10:56:05 +00:00
|
|
|
|
2017-12-21 13:38:39 +00:00
|
|
|
class CtrlProtocol(asyncio.Protocol):
|
|
|
|
def connection_made(self, transport):
|
|
|
|
peername = transport.get_extra_info('peername')
|
|
|
|
print('Connection from {}'.format(peername))
|
|
|
|
self.transport = transport
|
|
|
|
|
|
|
|
def data_received(self, data):
|
|
|
|
(i, v, k) = Ctrl().parse(data)
|
|
|
|
if not k:
|
|
|
|
print('Ctrl GET received: %s' % v)
|
|
|
|
else:
|
|
|
|
print('Ctrl SET received: %s :: %s' % (v, k))
|
|
|
|
|
|
|
|
message = Ctrl().reply(i, v, k)
|
|
|
|
self.transport.write(message)
|
|
|
|
|
|
|
|
self.transport.close()
|
|
|
|
# quit the loop gracefully
|
|
|
|
print('Closing the loop...')
|
|
|
|
loop.stop()
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
loop = asyncio.get_event_loop()
|
|
|
|
test_host = '127.0.0.5'
|
|
|
|
test_port = str(random.randint(1025, 60000))
|
|
|
|
|
2018-01-15 13:08:54 +00:00
|
|
|
print('Testing v%s on %s:%s' % (__version__, test_host, test_port))
|
|
|
|
|
2017-12-21 13:38:39 +00:00
|
|
|
# Each client connection will create a new protocol instance
|
|
|
|
server = loop.run_until_complete(loop.create_server(CtrlProtocol, test_host, test_port))
|
|
|
|
|
|
|
|
print('Serving on {}...'.format(server.sockets[0].getsockname()))
|
|
|
|
|
|
|
|
# Async client running in the subprocess plugged to the same event loop
|
|
|
|
loop.run_until_complete(asyncio.gather(asyncio.create_subprocess_exec('./scripts/osmo_ctrl.py', '-g', 'mnc', '-d', test_host, '-p', test_port), loop = loop))
|
|
|
|
|
|
|
|
loop.run_forever()
|
|
|
|
|
|
|
|
# Cleanup after loop is finished
|
|
|
|
server.close()
|
|
|
|
loop.run_until_complete(server.wait_closed())
|
|
|
|
loop.close()
|
|
|
|
|
|
|
|
print('[Python3] Smoke test PASSED for v%s' % __version__)
|