mncc: Make it possible to build a MNCC server for testing
For manual testing the osmo-sip-connector it is nice to run a custom MNCC server to mock certain behavior. Refactor the socket class to share code between client/server. Change-Id: I8387fe1687557c6a1a26ff1e0cc9dbff3087aa82changes/51/2451/1
parent
08207d9e6f
commit
03980b4e31
45
mncc_sock.py
45
mncc_sock.py
|
@ -52,22 +52,14 @@ def mncc_number(number, num_type = 0, num_plan = 0, num_present = 1, num_screen
|
|||
plan = num_plan, present = num_present,
|
||||
screen = num_screen)
|
||||
|
||||
class MnccSocket(object):
|
||||
def __init__(self, address = '/tmp/bsc_mncc'):
|
||||
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
|
||||
print 'connecting to %s' % address
|
||||
try:
|
||||
self.sock.connect(address)
|
||||
except socket.error, errmsg:
|
||||
print >>sys.stderr, errmsg
|
||||
sys.exit(1)
|
||||
|
||||
# FIXME: parse the HELLO message
|
||||
msg = self.recv()
|
||||
|
||||
class MnccSocketBase(object):
|
||||
def send(self, msg):
|
||||
return self.sock.sendall(msg.send())
|
||||
|
||||
def send_msg(self, msg):
|
||||
data = buffer(msg)[:]
|
||||
return self.sock.sendall(data)
|
||||
|
||||
def recv(self):
|
||||
data = self.sock.recv(1500)
|
||||
ms = mncc_msg()
|
||||
|
@ -76,3 +68,30 @@ class MnccSocket(object):
|
|||
ms = mncc_rtp_msg()
|
||||
ms.receive(data)
|
||||
return ms
|
||||
|
||||
class MnccSocket(MnccSocketBase):
|
||||
def __init__(self, address = '/tmp/bsc_mncc'):
|
||||
super(MnccSocketBase, self).__init__()
|
||||
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
|
||||
print('connecting to %s' % address)
|
||||
try:
|
||||
self.sock.connect(address)
|
||||
except socket.error as errmsg:
|
||||
sys.stderr.write("%s\n" % errmsg)
|
||||
sys.exit(1)
|
||||
|
||||
# FIXME: parse the HELLO message
|
||||
msg = self.recv()
|
||||
|
||||
class MnccSocketServer(object):
|
||||
def __init__(self, address = '/tmp/bsc_mncc'):
|
||||
os.unlink(address)
|
||||
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
|
||||
self.sock.bind(address)
|
||||
self.sock.listen(5)
|
||||
|
||||
def accept(self):
|
||||
(fd,_) = self.sock.accept()
|
||||
sock = MnccSocketBase()
|
||||
sock.sock = fd
|
||||
return sock
|
||||
|
|
Loading…
Reference in New Issue