251 lines
9.4 KiB
Python
251 lines
9.4 KiB
Python
# osmo_gsm_tester: VTY connection
|
|
#
|
|
# Copyright (C) 2020 by sysmocom - s.f.m.c. GmbH
|
|
#
|
|
# Author: Neels Hofmeyr <neels@hofmeyr.de>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import socket
|
|
import struct
|
|
import re
|
|
import time
|
|
import sys
|
|
|
|
from ..core import log
|
|
from ..core.event_loop import MainLoop
|
|
|
|
class VtyInterfaceExn(Exception):
|
|
pass
|
|
|
|
class OsmoVty(log.Origin):
|
|
'''Suggested usage:
|
|
with OsmoVty(...) as vty:
|
|
vty.cmds('enable', 'configure network', 'net')
|
|
response = vty.cmd('foo 1 2 3')
|
|
print('\n'.join(response))
|
|
|
|
Using 'with' ensures that the connection is closed again.
|
|
There should not be nested 'with' statements on this object.
|
|
|
|
Note that test env objects (like tenv.bsc()) may keep a VTY connected until the test exits. A 'with' should not
|
|
be used on those.
|
|
'''
|
|
|
|
##############
|
|
# PROTECTED
|
|
##############
|
|
|
|
def __init__(self, host, port, prompt=None):
|
|
super().__init__(log.C_BUS, 'Vty', host=host, port=port)
|
|
self.host = host
|
|
self.port = port
|
|
self.sck = None
|
|
self.prompt = prompt
|
|
self.re_prompt = None
|
|
self.this_node = None
|
|
self.this_prompt_char = None
|
|
self.last_node = None
|
|
self.last_prompt_char = None
|
|
|
|
def try_connect(self):
|
|
'''Do a connection attempt, return True when successful, False otherwise.
|
|
Does not raise exceptions, but logs them to the debug log.'''
|
|
assert self.sck is None
|
|
try:
|
|
self.dbg('Connecting')
|
|
sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
try:
|
|
sck.connect((self.host, self.port))
|
|
except:
|
|
sck.close()
|
|
raise
|
|
# set self.sck only after the connect was successful
|
|
self.sck = sck
|
|
return True
|
|
except:
|
|
self.dbg('Failed to connect', sys.exc_info()[0])
|
|
return False
|
|
|
|
def _command(self, command_str, timeout=10, strict=True):
|
|
'''Send a command and return the response.'''
|
|
# (copied from https://git.osmocom.org/python/osmo-python-tests/tree/osmopy/osmo_interact/vty.py)
|
|
self.dbg('Sending', command_str=command_str)
|
|
self.sck.send(command_str.encode())
|
|
|
|
waited_since = time.time()
|
|
received_lines = []
|
|
last_line = ''
|
|
|
|
# (not using MainLoop.wait() to accumulate received responses across
|
|
# iterations)
|
|
while True:
|
|
new_data = self.sck.recv(4096).decode('utf-8')
|
|
|
|
last_line = "%s%s" % (last_line, new_data)
|
|
|
|
if last_line:
|
|
# Separate the received response into lines.
|
|
# But note: the VTY logging currently separates with '\n\r', not '\r\n',
|
|
# see _vty_output() in libosmocore logging_vty.c.
|
|
# So we need to jump through hoops to not separate 'abc\n\rdef' as
|
|
# [ 'abc', '', 'def' ]; but also not to convert '\r\n\r\n' to '\r\n\n' ('\r{\r\n}\n')
|
|
# Simplest is to just drop all the '\r' and only care about the '\n'.
|
|
last_line = last_line.replace('\r', '')
|
|
lines = last_line.splitlines()
|
|
if last_line.endswith('\n'):
|
|
received_lines.extend(lines)
|
|
last_line = ""
|
|
else:
|
|
# if pkt buffer ends in the middle of a line, we need to keep
|
|
# last non-finished line:
|
|
received_lines.extend(lines[:-1])
|
|
last_line = lines[-1]
|
|
|
|
match = self.re_prompt.match(last_line)
|
|
if not match:
|
|
if time.time() - waited_since > timeout:
|
|
raise IOError("Failed to read data (did the app crash?)")
|
|
MainLoop.sleep(.1)
|
|
continue
|
|
|
|
self.last_node = self.this_node
|
|
self.last_prompt_char = self.this_prompt_char
|
|
self.this_node = match.group(1) or None
|
|
self.this_prompt_char = match.group(2)
|
|
break
|
|
|
|
# expecting to have received the command we sent as echo, remove it
|
|
clean_command_str = command_str.strip()
|
|
if clean_command_str.endswith('?'):
|
|
clean_command_str = clean_command_str[:-1]
|
|
if received_lines and received_lines[0] == clean_command_str:
|
|
received_lines = received_lines[1:]
|
|
if len(received_lines) > 1:
|
|
self.dbg('Received\n|', '\n| '.join(received_lines), '\n')
|
|
elif len(received_lines) == 1:
|
|
self.dbg('Received', repr(received_lines[0]))
|
|
|
|
if received_lines == ['% Unknown command.']:
|
|
errmsg = 'VTY reports unknown command: %r' % command_str
|
|
if strict:
|
|
raise VtyInterfaceExn(errmsg)
|
|
else:
|
|
self.log('ignoring error:', errmsg)
|
|
|
|
return received_lines
|
|
|
|
########################
|
|
# PUBLIC - INTERNAL API
|
|
########################
|
|
|
|
def connect(self, timeout=30):
|
|
'''Connect to the VTY self.host and self.port, retry for 'timeout' seconds.
|
|
connect() and disconnect() are called implicitly when using the 'with' statement.
|
|
See class OsmoVty's doc.
|
|
'''
|
|
MainLoop.wait(self.try_connect, timestep=3, timeout=timeout)
|
|
self.sck.setblocking(1)
|
|
|
|
# read first prompt
|
|
# (copied from https://git.osmocom.org/python/osmo-python-tests/tree/osmopy/osmo_interact/vty.py)
|
|
self.this_node = None
|
|
self.this_prompt_char = '>' # slight cheat for initial prompt char
|
|
self.last_node = None
|
|
self.last_prompt_char = None
|
|
|
|
data = self.sck.recv(4096)
|
|
if not self.prompt:
|
|
b = data
|
|
b = b[b.rfind(b'\n') + 1:]
|
|
while b and (b[0] < ord('A') or b[0] > ord('z')):
|
|
b = b[1:]
|
|
prompt_str = b.decode('utf-8')
|
|
if '>' in prompt_str:
|
|
self.prompt = prompt_str[:prompt_str.find('>')]
|
|
self.dbg(prompt=self.prompt)
|
|
if not self.prompt:
|
|
raise VtyInterfaceExn('Could not find application name; needed to decode prompts.'
|
|
' Initial data was: %r' % data)
|
|
self.re_prompt = re.compile('^%s(?:\(([\w-]*)\))?([#>]) (.*)$' % self.prompt)
|
|
|
|
def disconnect(self):
|
|
'''Disconnect.
|
|
connect() and disconnect() are called implicitly when using the 'with' statement.
|
|
See class OsmoVty's doc.
|
|
'''
|
|
if self.sck is None:
|
|
return
|
|
self.dbg('Disconnecting')
|
|
self.sck.close()
|
|
self.sck = None
|
|
|
|
###################
|
|
# PUBLIC (test API included)
|
|
###################
|
|
|
|
def cmd(self, command_str, timeout=10, strict=True):
|
|
'''Send one VTY command and return its response.
|
|
Return a list of strings, one string per line, without line break characters:
|
|
[ 'first line', 'second line', 'third line' ]
|
|
When strict==False, do not raise exceptions on '% Unknown command'.
|
|
If the connection is not yet open, briefly connect for only this command and disconnect again. If it is open,
|
|
use the open connection and leave it open.
|
|
'''
|
|
# allow calling for both already connected VTY as well as establishing
|
|
# a connection just for this command.
|
|
if self.sck is None:
|
|
with self:
|
|
return self.cmd(command_str, timeout, strict)
|
|
|
|
# (copied from https://git.osmocom.org/python/osmo-python-tests/tree/osmopy/osmo_interact/vty.py)
|
|
command_str = command_str or '\r'
|
|
if command_str[-1] not in '?\r\t':
|
|
command_str = command_str + '\r'
|
|
|
|
received_lines = self._command(command_str, timeout, strict)
|
|
|
|
# send escape to cancel the '?' command line
|
|
if command_str[-1] == '?':
|
|
self._command('\x03', timeout)
|
|
|
|
return received_lines
|
|
|
|
def cmds(self, *cmds, timeout=10, strict=True):
|
|
'''Send a series of commands and return each command's response:
|
|
cmds('foo', 'bar', 'baz') --> [ ['foo line 1','foo line 2'], ['bar line 1'], ['baz line 1']]
|
|
When strict==False, do not raise exceptions on '% Unknown command'.
|
|
If the connection is not yet open, briefly connect for only these commands and disconnect again. If it is
|
|
open, use the open connection and leave it open.
|
|
'''
|
|
# allow calling for both already connected VTY as well as establishing
|
|
# a connection just for this command.
|
|
if self.sck is None:
|
|
with self:
|
|
return self.cmds(*cmds, timeout=timeout, strict=strict)
|
|
|
|
responses = []
|
|
for cmd in cmds:
|
|
responses.append(self.cmd(cmd, timeout, strict))
|
|
return responses
|
|
|
|
def __enter__(self):
|
|
self.connect()
|
|
return self
|
|
|
|
def __exit__(self, *exc_info):
|
|
self.disconnect()
|
|
|
|
# vim: expandtab tabstop=4 shiftwidth=4
|