YAYPM update.
git-svn-id: http://yate.null.ro/svn/yate/trunk@1873 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
parent
8b6138eea2
commit
8a9e2db797
|
@ -28,7 +28,6 @@ from twisted.protocols.basic import LineReceiver
|
|||
from twisted.internet.protocol import Protocol, ClientFactory
|
||||
from twisted.python import failure
|
||||
from threading import Thread, Event
|
||||
from logging import Formatter
|
||||
import logging, imp, time, random, traceback, string
|
||||
|
||||
try:
|
||||
|
@ -50,8 +49,9 @@ try:
|
|||
except Exception:
|
||||
pass
|
||||
|
||||
logger = logging.getLogger('yaypm.debug')
|
||||
logger_messages = logging.getLogger('yaypm.debug.messages')
|
||||
#main_logger = logging.getLogger('yaypm')
|
||||
logger = logging.getLogger('yaypm.internals')
|
||||
logger_messages = logging.getLogger('yaypm.messages')
|
||||
|
||||
_HANDLER_TYPE_MSG=0
|
||||
_HANDLER_TYPE_WCH=1
|
||||
|
@ -72,8 +72,8 @@ class AbandonedException(Exception):
|
|||
|
||||
"""
|
||||
def __init__(self, cause):
|
||||
Exception.__init__(self)
|
||||
self.cause = cause
|
||||
Exception.__init__(self, cause)
|
||||
|
||||
class DisconnectedException(Exception):
|
||||
"""
|
||||
|
@ -99,7 +99,7 @@ class CancellableDeferred(defer.Deferred):
|
|||
self.cancelled = False
|
||||
|
||||
def cancel(self, *args, **kwargs):
|
||||
canceller=self.canceller
|
||||
canceller = self.canceller
|
||||
if not self.called:
|
||||
self.cancelled = True
|
||||
if canceller:
|
||||
|
@ -134,10 +134,10 @@ class Dispatcher:
|
|||
lambda d, m = None : self._cancelHandler(m, name, hdlr_type, d))
|
||||
|
||||
key = (name, hdlr_type)
|
||||
handlers = self.handlers.get(key, [])
|
||||
handlers = self.handlers.get(key, {})
|
||||
if not handlers:
|
||||
self.handlers[key] = handlers
|
||||
handlers.append((guard, d))
|
||||
handlers[d] = guard
|
||||
|
||||
if until:
|
||||
def until_callback(m):
|
||||
|
@ -149,6 +149,9 @@ class Dispatcher:
|
|||
|
||||
return d
|
||||
|
||||
def is_handler_installed(name, hdlr_type):
|
||||
return self.handlers.get((name, hdlr_type), None) != None
|
||||
|
||||
def _register_and_install_handler(
|
||||
self, name, hdlr_type, guard, until, autoreturn = False):
|
||||
|
||||
|
@ -190,46 +193,32 @@ class Dispatcher:
|
|||
|
||||
handlers = self.handlers.get(key, None)
|
||||
if not handlers:
|
||||
|
||||
return False
|
||||
|
||||
#sentinel guards agains inifite loop
|
||||
#handlers added in handler won't be handled this time
|
||||
sentinel = None
|
||||
handlers.append(sentinel)
|
||||
|
||||
done = False
|
||||
to_check = handlers.keys()
|
||||
|
||||
for handler in handlers:
|
||||
if handler == sentinel:
|
||||
#Todo: enyone has an idea how to avoid
|
||||
#going through handlers twice?
|
||||
#a dirty hack might be to search it by hand starting
|
||||
#from len(handlers) that should be faster for longer lists
|
||||
break
|
||||
guard, d = handler
|
||||
if guard(m):
|
||||
for d in to_check:
|
||||
guard = handlers.get(d, None)
|
||||
if guard and guard(m):
|
||||
done = True
|
||||
if logger_messages.isEnabledFor(logging.DEBUG):
|
||||
logger.debug(
|
||||
"firing handler for %s: %s",
|
||||
_MSG_TYPE_DSCS[hdlr_type], m.getName())
|
||||
try:
|
||||
d.callback(m)
|
||||
handlers.remove(handler)
|
||||
except defer.AlreadyCalledError, e:
|
||||
logger.exception(
|
||||
"Message handled twice resulted in AlreadyCalledError. Swallowing.")
|
||||
break
|
||||
handlers.remove(sentinel)
|
||||
"firing handler for %s: %s on %s",
|
||||
_MSG_TYPE_DSCS[hdlr_type], m.getName(), str(d))
|
||||
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
del handlers[d]
|
||||
d.callback(m)
|
||||
if hdlr_type == _HANDLER_TYPE_MSG:
|
||||
break
|
||||
|
||||
if logger_messages.isEnabledFor(logging.DEBUG):
|
||||
result = "Active handlers:\n" + "-"*80
|
||||
for (name, type), handlers in self.handlers.iteritems():
|
||||
result = result + "\n%s \"%s\": %d handler(s)" % \
|
||||
(_MSG_TYPE_DSCS[type], name, len(handlers))
|
||||
result = result + "\n" + "-"*80
|
||||
logger.debug(result)
|
||||
logger_messages.debug(result)
|
||||
|
||||
return done
|
||||
|
||||
|
@ -240,16 +229,12 @@ class Dispatcher:
|
|||
|
||||
key = (name, hdlr_type)
|
||||
if self.handlers.has_key(key):
|
||||
for i, (_, d) in enumerate(self.handlers[key]):
|
||||
if d == d2remove:
|
||||
del self.handlers[key][i]
|
||||
break
|
||||
|
||||
del self.handlers[key][d2remove]
|
||||
|
||||
def _cancelHandler(self, m, name, hdlr_type, d):
|
||||
"""
|
||||
Cancel YAYPM deferred.
|
||||
"""
|
||||
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logger.debug("Canceling: %s" % name)
|
||||
self._removeHandler(name, hdlr_type, d)
|
||||
|
@ -444,10 +429,11 @@ class EmbeddedDispatcher(Dispatcher):
|
|||
|
||||
for i, script in enumerate(scripts):
|
||||
try:
|
||||
yateproxy.debug("pymodule(%s)" % "yaypm",
|
||||
name = "__embedded_yaypm_module__%d" % i
|
||||
yateproxy.debug("yaypm",
|
||||
logging.DEBUG,
|
||||
"Loading script: %s" % script)
|
||||
imp.load_source("__embedded_yaypm_module__", script)
|
||||
"Loading script: %s as %s" % (script, name))
|
||||
imp.load_source(name, script)
|
||||
except Exception:
|
||||
yateproxy.debug("pymodule(%s)" % "yaypm",
|
||||
logging.ERROR,
|
||||
|
@ -479,27 +465,32 @@ class EmbeddedDispatcher(Dispatcher):
|
|||
Install Pymodule message handler.
|
||||
"""
|
||||
key = (name, _HANDLER_TYPE_MSG)
|
||||
self.handlers[key] = self.handlers.get(key, [])
|
||||
self.handlers[key] = self.handlers.get(key, {})
|
||||
yateproxy.installMsgHandler(self.interpreter, name, prio)
|
||||
d = CancellableDeferred()
|
||||
d.callback(True)
|
||||
return d
|
||||
return None
|
||||
|
||||
#d = CancellableDeferred()
|
||||
#d.callback(True)
|
||||
#return d
|
||||
|
||||
def installWatchHandler(self, name):
|
||||
"""
|
||||
Install Pymodule watch handler.
|
||||
"""
|
||||
key = (name, _HANDLER_TYPE_WCH)
|
||||
self.handlers[key] = self.handlers.get(key, [])
|
||||
self.handlers[key] = self.handlers.get(key, {})
|
||||
yateproxy.installWatchHandler(self.interpreter, name)
|
||||
d = CancellableDeferred()
|
||||
d.callback(True)
|
||||
return d
|
||||
return None
|
||||
|
||||
#d = CancellableDeferred()
|
||||
#d.callback(True)
|
||||
#return d
|
||||
|
||||
def msg(self, name, attrs = None, retValue = None):
|
||||
"""
|
||||
Create YATE message wrapped with MessageProxy.
|
||||
"""
|
||||
"""
|
||||
|
||||
m = EmbeddedDispatcher._MessageProxy(
|
||||
yateproxy.message_create(name, str(retValue)))
|
||||
|
||||
|
@ -821,12 +812,12 @@ class TCPDispatcher(Dispatcher, LineReceiver):
|
|||
del self.waiting[wid]
|
||||
if unescape(values[1]) in ("ok", "true"):
|
||||
key = (values[0], _HANDLER_TYPE_WCH)
|
||||
self.handlers[key] = self.handlers.get(key, [])
|
||||
self.handlers[key] = self.handlers.get(key, {})
|
||||
d.callback(True)
|
||||
else:
|
||||
logger.warn("Can't install handler for: %s" % str(values[0]))
|
||||
d.errback(failure.Failure(
|
||||
"Can't install handler for: %s" % str(values[0])))
|
||||
Exception("Can't install handler for: %s" % str(values[0]))))
|
||||
else:
|
||||
if logger.isEnabledFor(logging.WARN):
|
||||
logger.warn("Response to unknown message: %s" % str(values))
|
||||
|
@ -844,7 +835,7 @@ class TCPDispatcher(Dispatcher, LineReceiver):
|
|||
del self.waiting[mid]
|
||||
if unescape(values[2]) in ("ok", "true"):
|
||||
key = (values[1], _HANDLER_TYPE_MSG)
|
||||
self.handlers[key] = self.handlers.get(key, [])
|
||||
self.handlers[key] = self.handlers.get(key, {})
|
||||
d.callback(True)
|
||||
else:
|
||||
logger.warn("Can't install handler for: %s" % str(values[1]))
|
||||
|
@ -902,13 +893,17 @@ class TCPDispatcher(Dispatcher, LineReceiver):
|
|||
|
||||
def connectionLost(self, reason):
|
||||
logger.info("Connection lost: %s" % reason.getErrorMessage());
|
||||
for _, d in self.waiting.values():
|
||||
for (m, (_, d)) in self.waiting.items():
|
||||
try:
|
||||
raise DisconnectedException()
|
||||
except:
|
||||
d.errback(failure.Failure())
|
||||
self.waiting.clear()
|
||||
|
||||
## try:
|
||||
## reactor.stop()
|
||||
## except:
|
||||
## pass
|
||||
|
||||
def lineReceived(self, line):
|
||||
if logger_messages.isEnabledFor(logging.DEBUG):
|
||||
logger_messages.debug("received line:\n%s" % line);
|
||||
|
@ -996,12 +991,10 @@ class TCPDispatcherFactory(ClientFactory):
|
|||
self.selfwatch = selfwatch
|
||||
|
||||
def startedConnecting(self, connector):
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logger.debug("Connecting...")
|
||||
logger.info("Connecting...")
|
||||
|
||||
def buildProtocol(self, addr):
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logger.debug("Connected.")
|
||||
logger.info("Connected.")
|
||||
return TCPDispatcher(
|
||||
self.connected,
|
||||
self.args, self.kwargs,
|
||||
|
@ -1009,8 +1002,27 @@ class TCPDispatcherFactory(ClientFactory):
|
|||
self.selfwatch)
|
||||
|
||||
def clientConnectionLost(self, connector, reason):
|
||||
logger.info("clientConnectionLost")
|
||||
pass
|
||||
|
||||
def clientConnectionFailed(self, connector, reason):
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logger.debug("Connection failed for reason: %s" % str(reason))
|
||||
reactor.stop()
|
||||
|
||||
class Formatter(logging.Formatter) :
|
||||
_level_colors = {
|
||||
"DEBUG": "\033[22;32m", "INFO": "\033[01;34m",
|
||||
"WARNING": "\033[22;35m", "ERROR": "\033[22;31m",
|
||||
"CRITICAL": "\033[01;31m"
|
||||
};
|
||||
def __init__(self,
|
||||
fmt = '%(name)s %(levelname)s %(message)s',
|
||||
datefmt=None):
|
||||
logging.Formatter.__init__(self, fmt, datefmt)
|
||||
|
||||
def format(self, record):
|
||||
if(Formatter._level_colors.has_key(record.levelname)):
|
||||
record.levelname = "%s%s\033[0;0m" % \
|
||||
(Formatter._level_colors[record.levelname],
|
||||
record.levelname)
|
||||
record.name = "\033[37m\033[1m%s\033[0;0m" % record.name
|
||||
return logging.Formatter.format(self, record)
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
import logging
|
||||
from yaypm.utils import XOR
|
||||
from yaypm.flow import getResult
|
||||
|
||||
logger = logging.getLogger("yaypm.resources")
|
||||
|
||||
class Resource:
|
||||
def _match(self, *args):
|
||||
raise NotImplementedError("Abstract Method!")
|
||||
def play(self, yate, callid, targetid, stopOnDTMF=False, until = None, override = False, *args):
|
||||
files = self._match(*args)
|
||||
nid = targetid
|
||||
|
||||
if not until:
|
||||
until = yate.onwatch("chan.hangup",
|
||||
lambda m : m["id"] == callid)
|
||||
|
||||
for f in files:
|
||||
m = yate.msg("chan.masquerade",
|
||||
{"message": "chan.attach",
|
||||
"id": targetid,
|
||||
"override" if override else "source": f,
|
||||
"notify": nid})
|
||||
yield m.dispatch()
|
||||
getResult()
|
||||
|
||||
if stopOnDTMF:
|
||||
yield XOR(
|
||||
yate.onmsg("chan.notify",
|
||||
lambda m : m["targetid"] == nid,
|
||||
autoreturn = True,
|
||||
until = until),
|
||||
yate.onwatch("chan.dtmf",
|
||||
lambda m : m["id"] == callid,
|
||||
until = until))
|
||||
dtmf, _ = getResult()
|
||||
if dtmf:
|
||||
yield dtmf
|
||||
break
|
||||
else:
|
||||
yield yate.onmsg("chan.notify",
|
||||
lambda m : m["targetid"] == nid,
|
||||
autoreturn = True,
|
||||
until = until)
|
||||
getResult()
|
||||
|
||||
def override(self, yate, callid, stopOnDTMF=False, until = None, *args):
|
||||
|
||||
return Resource.play(self, yate, callid, callid, stopOnDTMF,
|
||||
until, override = True, *args)
|
||||
|
||||
class StaticResource(Resource):
|
||||
def __init__(self, attach, desc = None):
|
||||
self.attach = attach
|
||||
if not desc:
|
||||
self.desc = attach
|
||||
|
||||
def _match(self, *args):
|
||||
return [self.attach]
|
||||
|
||||
class ConcatenationResource(Resource):
|
||||
def __init__(self, *args):
|
||||
self.resources = []
|
||||
current = None
|
||||
current_args = None
|
||||
for arg in args:
|
||||
if isinstance(arg, Resource):
|
||||
if current:
|
||||
self.resources.append((current, current_args))
|
||||
current = arg
|
||||
current_args = []
|
||||
else:
|
||||
if not current:
|
||||
raise WrongValue("Argument without Resource!")
|
||||
current_args.append(arg)
|
||||
if current:
|
||||
self.resources.append((current, current_args))
|
||||
|
||||
def _match(self, *args):
|
||||
result = []
|
||||
for resource, res_args in self.resources:
|
||||
result.extend(resource._match(*(args[i] for i in res_args)))
|
||||
return result
|
|
@ -0,0 +1,74 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
import sys, os, imp, logging, time, subprocess
|
||||
from twisted.internet import reactor
|
||||
|
||||
logger = logging.getLogger('yaypm.srcmon')
|
||||
|
||||
def monitor():
|
||||
def modules2watch():
|
||||
modules2watch = {}
|
||||
pythonDir = sys.modules["os"].__file__.rsplit("/", 1)[0]
|
||||
|
||||
for m in sys.modules.values():
|
||||
if hasattr(m, "__file__"):
|
||||
f = m.__file__
|
||||
if not f.startswith(pythonDir) and not f == '<stdin>':
|
||||
if f.endswith("pyc"):
|
||||
modules2watch[f[:-1]] = m
|
||||
else:
|
||||
modules2watch[f] = m
|
||||
|
||||
return modules2watch
|
||||
|
||||
def check():
|
||||
modules = modules2watch()
|
||||
watched = {}
|
||||
|
||||
logger.info("Monitoring %d files." % len(modules.keys()))
|
||||
|
||||
while True:
|
||||
for f in modules.keys():
|
||||
try:
|
||||
t = os.stat(f)
|
||||
except os.error:
|
||||
logger.info("File: '%s' possibly removed. Reloading..." % f)
|
||||
yield True
|
||||
|
||||
mtime = watched.get(f, None)
|
||||
if mtime:
|
||||
if t.st_mtime > mtime:
|
||||
logger.info("File: '%s' changed." % f)
|
||||
yield False
|
||||
else:
|
||||
watched[f] = t.st_mtime
|
||||
yield True
|
||||
|
||||
def loop(t, f):
|
||||
if f():
|
||||
reactor.callLater(t, loop, t, f)
|
||||
else:
|
||||
logger.info("Reloading...")
|
||||
reactor.stop()
|
||||
os.execv(sys.argv[0], sys.argv)
|
||||
|
||||
c = check()
|
||||
|
||||
reactor.callLater(1, loop, 1, c.next)
|
||||
|
||||
|
||||
if os.environ.has_key("MONITORED"):
|
||||
sys.stderr.write("Started in monitored mode.\n")
|
||||
reactor.callLater(1, monitor)
|
||||
else:
|
||||
os.environ["MONITORED"] = ""
|
||||
|
||||
while True:
|
||||
if not(subprocess.call(sys.argv)):
|
||||
break
|
||||
else:
|
||||
sys.stderr.write(
|
||||
"Monitored process failed(judging by exit code). " + \
|
||||
"Trying to reload.\n")
|
||||
time.sleep(3)
|
||||
|
|
@ -1,468 +0,0 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: iso-8859-2; -*-
|
||||
"""
|
||||
Tester module for YAYPM. Generates calls, and allows to define incall
|
||||
activity like sending DTMFs. Can be run on to separate yates to imitate
|
||||
realistic load.
|
||||
|
||||
Copyright (C) 2005 Maciek Kaminski
|
||||
|
||||
This program is free software; you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation; either version 2 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
|
||||
"""
|
||||
|
||||
from yaypm import TCPDispatcherFactory, AbandonedException
|
||||
from yaypm.flow import go, logger_flow, getResult
|
||||
from yaypm.utils import XOR, sleep
|
||||
from twisted.internet import reactor, defer
|
||||
|
||||
import sys, logging, yaypm, time
|
||||
|
||||
logger = logging.getLogger("tester")
|
||||
|
||||
beepresource = "wave/play/./sounds/beep.gsm"
|
||||
digitresource = "wave/play/./sounds/digits/pl/%s.gsm"
|
||||
|
||||
extended_calls = []
|
||||
|
||||
impossible_prefix = "_aaaa_tester_prefix"
|
||||
|
||||
def do_nothing(client_yate, server_yate, testid, targetid, callid, remoteid, duration):
|
||||
"""
|
||||
Defines incall activity. Does nothing for specified number of seconds.
|
||||
"""
|
||||
logger.debug("[%d] doing nothing on %s for %d s." % (testid, callid, duration))
|
||||
yield sleep(duration)
|
||||
getResult()
|
||||
logger.debug("[%d] dropping: %s" % (testid, callid))
|
||||
yield client_yate.msg("call.drop", {"id": callid}).dispatch()
|
||||
getResult()
|
||||
|
||||
def detect_dtmf_and_do_nothing(client_yate, server_yate, testid, targetid, callid, remoteid, duration):
|
||||
"""
|
||||
Defines incall activity. Enables dtmf detection on server and does nothing
|
||||
for specified number of seconds.
|
||||
"""
|
||||
server_yate.msg("chan.masquerade",
|
||||
attrs = {"message" : "chan.detectdtmf",
|
||||
"id": remoteid,
|
||||
"consumer": "dtmf/"}).enqueue()
|
||||
|
||||
logger.debug("[%d] detecting dtmf on %s for %d s." % (testid, callid, duration))
|
||||
yield sleep(duration)
|
||||
getResult()
|
||||
logger.debug("[%d] dropping: %s" % (testid, callid))
|
||||
yield client_yate.msg("call.drop", {"id": callid}).dispatch()
|
||||
getResult()
|
||||
|
||||
|
||||
def send_dtmfs(client_yate, server_yate, testid, targetid, callid, remoteid, dtmfs):
|
||||
"""
|
||||
Defines incall activity. Activity is defined in dtmfs string according
|
||||
to the following rules:
|
||||
1-9,*,# - sends x as dtmf
|
||||
connected? - waits for test.checkpoint message with check==connected
|
||||
connected?x - waits for test.checkpoint message with check==connected for x secs
|
||||
s:x,c:y - does chan.attach with source==s and consumer==c
|
||||
_ - 1s break, _*x - x times 1s
|
||||
timeout:t - starts timer that drops connection after t secs
|
||||
... - ends tester thread but does not drop connection
|
||||
Example: _,1,1? - waits 1 second, sends 1, waits for 1 checkpoint.
|
||||
"""
|
||||
|
||||
end = client_yate.onwatch("chan.hangup", lambda m : m["id"] == targetid)
|
||||
|
||||
for text in [t.strip() for t in dtmfs.split(",")]:
|
||||
if "c:" in text or "s:" in text:
|
||||
media = text.split("|")
|
||||
consumer = None
|
||||
source = None
|
||||
for m in media:
|
||||
if m.startswith("c:"):
|
||||
consumer = m[2:]
|
||||
elif m.startswith("s:"):
|
||||
source = m[2:]
|
||||
if source or consumer:
|
||||
attrs = {"message": "chan.attach",
|
||||
"id": callid}
|
||||
if source:
|
||||
attrs["source"] = source
|
||||
if consumer:
|
||||
attrs["consumer"] = consumer
|
||||
client_yate.msg("chan.masquerade", attrs).enqueue()
|
||||
elif text == "...":
|
||||
logger.debug("[%d] call %s extended" % (testid, callid))
|
||||
extended_calls.append(callid)
|
||||
return
|
||||
## elif text.startswith("timeout:"):
|
||||
## timeout = int(text[len("timeout:"):])
|
||||
## drop = client_yate.msg("call.drop",
|
||||
## {"id": targetid, "reason": "timeout"})
|
||||
## logger.debug(
|
||||
## "[%d] Setting absolute timeout to: %d s" % (testid, timeout))
|
||||
## reactor.callLater(timeout, drop.enqueue)
|
||||
elif "?" in text:
|
||||
i = text.find("?")
|
||||
check = text[:i]
|
||||
timeout = None
|
||||
if len(text) > i + 1:
|
||||
timeout = int(text[i+1:])
|
||||
|
||||
check_def = server_yate.onwatch(
|
||||
"test.checkpoint",
|
||||
lambda m : m["id"] == remoteid and m["check"] == check,
|
||||
until = end)
|
||||
|
||||
if timeout:
|
||||
logger.debug(
|
||||
"[%d] waiting for: %ds for checkpoint: %s on: %s" % \
|
||||
(testid, timeout, check, remoteid))
|
||||
yield XOR(check_def, sleep(timeout))
|
||||
what, _ = getResult()
|
||||
if what > 0:
|
||||
logger.debug(
|
||||
"[%d] timeout while waiting %d s for checkpoint: %s on: %s" % \
|
||||
(testid, timeout, check, remoteid))
|
||||
client_yate.msg("call.drop", {"id": callid}).enqueue()
|
||||
logger.debug(
|
||||
"[%d] dropping connection: %s" % (testid, remoteid))
|
||||
raise Exception("Timeout while waiting %d s for checkpoint: %s on: %s" % \
|
||||
(timeout, check, remoteid))
|
||||
else:
|
||||
logger.debug(
|
||||
"[%d] Waiting for checkpoint: '%s' on: %s" % \
|
||||
(testid, check, remoteid))
|
||||
yield check_def
|
||||
getResult()
|
||||
|
||||
elif text in ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "0", "#", "*"]:
|
||||
logger.debug("[%d] Sending dtmf: %s to %s" % (testid, text, targetid))
|
||||
client_yate.msg(
|
||||
"chan.dtmf",
|
||||
{"id": callid, "targetid": targetid, "text": text}).enqueue()
|
||||
else:
|
||||
t = 1
|
||||
if "*" in text:
|
||||
try:
|
||||
t = int(text[2:])
|
||||
except ValueError:
|
||||
pass
|
||||
logger.debug("[%d] Sleeping for: %d s." % (testid, t))
|
||||
yield sleep(t)
|
||||
getResult()
|
||||
yield sleep(1)
|
||||
getResult()
|
||||
logger.debug("[%d] dropping: %s" % (testid, callid))
|
||||
yield client_yate.msg("call.drop", {"id": callid}).dispatch()
|
||||
getResult()
|
||||
## except AbandonedException, e:
|
||||
## logger.debug("[%d] call %s abandoned" % (testid, callid))
|
||||
## except DisconnectedException:
|
||||
## logger.debug("[%d] call %s disconnected" % (testid, callid))
|
||||
|
||||
logger.debug("[%d] call %s finished" % (testid, callid))
|
||||
|
||||
|
||||
def select_non_called(dfrs):
|
||||
"""
|
||||
Select deferreds that have not been fired.
|
||||
"""
|
||||
r = []
|
||||
for d in dfrs:
|
||||
if not d.called:
|
||||
r.append(d)
|
||||
return r
|
||||
|
||||
def load(client_yate, server_yate, target, handler, con_max, tests, monitor):
|
||||
"""
|
||||
Generate load. Needs:
|
||||
client and server yate connections;
|
||||
target number;
|
||||
handler function that defines incall activity;
|
||||
starts not more than con_max concurrent calls;
|
||||
tests is a list of tests to execute
|
||||
monitor is a extension that will be used to monitor tests.
|
||||
"""
|
||||
|
||||
concurrent = []
|
||||
monitored = []
|
||||
|
||||
successes, failures = {}, {}
|
||||
|
||||
def success(_, t, start):
|
||||
successes[t] = time.time() - start
|
||||
|
||||
def failure(f, t, start):
|
||||
failures[t] = (time.time() - start, f)
|
||||
|
||||
yield server_yate.installMsgHandler("call.route", prio = 50)
|
||||
getResult()
|
||||
yield client_yate.installWatchHandler("call.answered")
|
||||
getResult()
|
||||
yield client_yate.installWatchHandler("chan.hangup")
|
||||
getResult()
|
||||
|
||||
if monitor:
|
||||
monitorid = None
|
||||
monitor_room = None
|
||||
|
||||
logger.debug("Creating monitoring connection.")
|
||||
|
||||
execute = client_yate.msg("call.execute",
|
||||
{"callto": "dumb/",
|
||||
"target": monitor})
|
||||
yield execute.dispatch()
|
||||
if not getResult():
|
||||
logger.warn("can't create monitor connection on %s" % monitor)
|
||||
monitor = None
|
||||
else:
|
||||
try:
|
||||
end = client_yate.onwatch(
|
||||
"chan.hangup",
|
||||
lambda m, callid = execute["id"] : m["id"] == callid)
|
||||
|
||||
yield client_yate.onwatch(
|
||||
"call.answered",
|
||||
lambda m : m["id"] == execute["targetid"],
|
||||
until = end)
|
||||
getResult()
|
||||
logger.debug("Monitor connection answered.")
|
||||
|
||||
dumbid = execute["id"]
|
||||
monitorid = execute["targetid"]
|
||||
|
||||
execute = client_yate.msg(
|
||||
"chan.masquerade",
|
||||
{"message": "call.execute",
|
||||
"id": execute["targetid"],
|
||||
"lonely": "true",
|
||||
"voice": "false",
|
||||
"echo": "false",
|
||||
"smart": "true",
|
||||
"callto": "conf/"})
|
||||
yield execute.dispatch()
|
||||
if getResult():
|
||||
monitor_room = execute["room"]
|
||||
logger.debug("Monitor conference created.")
|
||||
yield client_yate.msg("call.drop", {"id": dumbid}).dispatch()
|
||||
getResult()
|
||||
else:
|
||||
logger.warn("can't create monitor conference on %s" % monitor)
|
||||
monitor = None
|
||||
except AbandonedException:
|
||||
logger.debug("Monitor connection not answered.")
|
||||
monitor = None
|
||||
|
||||
count = 0
|
||||
|
||||
for t in tests:
|
||||
concurrent = select_non_called(concurrent)
|
||||
if len(concurrent) >= con_max:
|
||||
logger.debug("waiting on concurrency limit: %d" % con_max)
|
||||
yield defer.DeferredList(concurrent, fireOnOneCallback=True)
|
||||
_, fired = getResult()
|
||||
concurrent.remove(concurrent[fired])
|
||||
|
||||
count = count + 1
|
||||
|
||||
route = server_yate.onmsg(
|
||||
"call.route",
|
||||
lambda m : m["driver"] != "dumb" and m["called"].find(impossible_prefix) >= 0)
|
||||
|
||||
def getRemoteId(d):
|
||||
yield d
|
||||
route = getResult()
|
||||
route["called"] = route["called"].replace(impossible_prefix, "")
|
||||
remoteid = route["id"]
|
||||
route.ret(False)
|
||||
yield remoteid
|
||||
return
|
||||
|
||||
remoteid_def = go(getRemoteId(route))
|
||||
|
||||
execute = client_yate.msg(
|
||||
"call.execute",
|
||||
{"callto": "dumb/",
|
||||
"target": target % impossible_prefix,
|
||||
"maxcall": 1000})
|
||||
|
||||
yield execute.dispatch()
|
||||
|
||||
try:
|
||||
if not getResult():
|
||||
route.cancel()
|
||||
raise AbandonedException("Call to: %s failed." % target)
|
||||
|
||||
callid = execute["id"]
|
||||
|
||||
end = client_yate.onwatch(
|
||||
"chan.hangup",
|
||||
lambda m, callid = callid : m["id"] == callid)
|
||||
|
||||
yield defer.DeferredList(
|
||||
[remoteid_def, end],
|
||||
fireOnOneCallback=True, fireOnOneErrback=True)
|
||||
|
||||
result, end_first = getResult()
|
||||
if end_first:
|
||||
raise AbandonedException("Call to: %s hungup." % target)
|
||||
|
||||
logger.debug("[%d] outgoing call to %s" % (count, callid))
|
||||
|
||||
yield client_yate.onwatch(
|
||||
"call.answered",
|
||||
lambda m : m["targetid"] == callid,
|
||||
until = end)
|
||||
answered = getResult()
|
||||
|
||||
targetid = execute["targetid"]
|
||||
|
||||
monitoring = False
|
||||
|
||||
if monitor and not monitored :
|
||||
logger.debug("[%d] monitoring: %s" % (count, callid))
|
||||
monitored.append(callid)
|
||||
end.addCallback(
|
||||
lambda _, targetid = targetid: client_yate.msg("call.drop", {"id": targetid}).enqueue())
|
||||
|
||||
yield client_yate.msg(
|
||||
"chan.masquerade",
|
||||
{"message": "call.conference",
|
||||
"id": callid,
|
||||
"room": monitor_room}).dispatch()
|
||||
getResult()
|
||||
monitoring = True
|
||||
|
||||
logger.debug("[%d] recording: %s" % (count, str(callid)))
|
||||
client_yate.msg(
|
||||
"chan.masquerade",
|
||||
{"message": "chan.attach",
|
||||
"id": callid,
|
||||
# "source": "moh/default",
|
||||
"source": "tone/silence",
|
||||
"consumer": "wave/record//tmp/recording%s.slin" % \
|
||||
callid.replace("/", "-"),
|
||||
"maxlen": 0}).enqueue()
|
||||
|
||||
yield remoteid_def
|
||||
remoteid = getResult()
|
||||
|
||||
logger.debug(
|
||||
"[%d] running test with local=(%s, %s) remote=%s" %
|
||||
(count, targetid, callid, remoteid))
|
||||
|
||||
start = time.time()
|
||||
|
||||
result = go(handler(
|
||||
client_yate, server_yate,
|
||||
count,
|
||||
targetid,
|
||||
callid, remoteid, t))
|
||||
|
||||
result.addCallbacks(success, failure,
|
||||
callbackArgs=(count, start),
|
||||
errbackArgs=(count, start))
|
||||
|
||||
if monitoring:
|
||||
result.addCallback(
|
||||
lambda _, mon_id: monitored.remove(mon_id),
|
||||
callid)
|
||||
|
||||
concurrent.append(result)
|
||||
except AbandonedException, e:
|
||||
if not route.called:
|
||||
route.cancel()
|
||||
logger.warn("[%d] outgoing call to %s abandoned" % (count, callid))
|
||||
|
||||
logger.debug(
|
||||
"Waiting for %d tests to finish" % len(select_non_called(concurrent)))
|
||||
yield defer.DeferredList(concurrent)
|
||||
logger.info("Test finished!")
|
||||
|
||||
if monitor and monitorid:
|
||||
logger.debug("droping monitor connection")
|
||||
yield client_yate.msg("call.drop", {"id": monitorid}).dispatch()
|
||||
getResult()
|
||||
yield sleep(1)
|
||||
getResult()
|
||||
|
||||
logger.debug("stopping reactor!")
|
||||
reactor.stop()
|
||||
|
||||
logger.info("-"*80)
|
||||
logger.info("Summary")
|
||||
logger.info("-"*80)
|
||||
logger.info("Tests: %d" % (len(successes) + len(failures)))
|
||||
if successes:
|
||||
logger.info("-"*80)
|
||||
logger.info("Successes: %d" % len(successes))
|
||||
logger.info("Avg time: %.2f s" %
|
||||
(reduce(lambda x, y: x + y, successes.values(), 0) /
|
||||
len(successes)))
|
||||
if failures:
|
||||
logger.info("-"*80)
|
||||
logger.info("Failures: %d" % len(failures))
|
||||
sumt = 0
|
||||
for tid, (t, f) in failures.iteritems():
|
||||
logger.info("%d, %s %s" % (tid, str(f.type), f.getErrorMessage()))
|
||||
sumt = sumt + t
|
||||
logger.info("Avg time: %.2f s" % (sumt/len(failures)))
|
||||
|
||||
def sequential_n(n, tests):
|
||||
"""
|
||||
Repeat tests sequentially n times
|
||||
"""
|
||||
i = 0
|
||||
l = len(tests)
|
||||
while i < n:
|
||||
yield tests[i % l]
|
||||
i = i + 1
|
||||
|
||||
def random_n(n, tests):
|
||||
"""
|
||||
Repeat tests in random order n times.
|
||||
"""
|
||||
i = 0
|
||||
while i < n:
|
||||
yield tests[random.randint(0, len(tests)-1)]
|
||||
i = i + 1
|
||||
|
||||
def do_load_test(
|
||||
local_addr, local_port,
|
||||
remote_addr, remote_port,
|
||||
dest, handler, con_max, tests,
|
||||
monitor = None):
|
||||
|
||||
"""
|
||||
Executes test.
|
||||
"""
|
||||
|
||||
def start_client(client_yate):
|
||||
logger.debug("client started");
|
||||
|
||||
def start_server(server_yate):
|
||||
logger.debug("server started");
|
||||
go(load(client_yate, server_yate,
|
||||
dest,
|
||||
handler, con_max,
|
||||
tests,
|
||||
monitor))
|
||||
|
||||
server_factory = TCPDispatcherFactory(start_server)
|
||||
reactor.connectTCP(remote_addr, remote_port, server_factory)
|
||||
|
||||
client_factory = TCPDispatcherFactory(start_client)
|
||||
|
||||
reactor.connectTCP(local_addr, local_port, client_factory)
|
||||
|
Loading…
Reference in New Issue