Futher extensions to tester

git-svn-id: http://yate.null.ro/svn/yate/trunk@1016 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
maciejka 2006-09-04 16:44:53 +00:00
parent c36621f9e8
commit 061e731c79
2 changed files with 571 additions and 0 deletions

View File

@ -0,0 +1,363 @@
#!/usr/bin/python
# -*- coding: iso-8859-2; -*-
"""
Tester module for YAYPM. Generates calls, and allows to define incall
activity like sending DTMFs. Can be run on two separate yates to imitate
realistic load.
Copyright (C) 2005 Maciek Kaminski
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
"""
from yaypm import TCPDispatcherFactory, AbandonedException
from yaypm.flow import go, logger_flow, getResult
from yaypm.utils import XOR, sleep
from yaypm.utils.tester import dtmfetc
from twisted.internet import reactor, defer
import sys, logging, yaypm, time
logger = logging.getLogger("tester")
beepresource = "wave/play/./sounds/beep.gsm"
digitresource = "wave/play/./sounds/digits/pl/%s.gsm"
extended_calls = []
impossible_prefix = "_aaaa_tester_prefix"
def do_nothing(client_yate, server_yate, testid, targetid, callid, remoteid, duration):
"""
Defines incall activity. Does nothing for specified number of seconds.
"""
logger.debug("[%d] doing nothing on %s for %d s." % (testid, callid, duration))
yield sleep(duration)
getResult()
logger.debug("[%d] dropping: %s" % (testid, callid))
yield client_yate.msg("call.drop", {"id": callid}).dispatch()
getResult()
def detect_dtmf_and_do_nothing(client_yate, server_yate, testid, targetid, callid, remoteid, duration):
"""
Defines incall activity. Enables dtmf detection on server and does nothing
for specified number of seconds.
"""
server_yate.msg("chan.masquerade",
attrs = {"message" : "chan.detectdtmf",
"id": remoteid,
"consumer": "dtmf/"}).enqueue()
logger.debug("[%d] detecting dtmf on %s for %d s." % (testid, callid, duration))
yield sleep(duration)
getResult()
logger.debug("[%d] dropping: %s" % (testid, callid))
yield client_yate.msg("call.drop", {"id": callid}).dispatch()
getResult()
def select_non_called(dfrs):
"""
Select deferreds that have not been fired.
"""
r = []
for d in dfrs:
if not d.called:
r.append(d)
return r
def load(client_yate, server_yate, target, handler, con_max, tests, monitor):
"""
Generate load. Needs:
client and server yate connections;
target number;
handler function that defines incall activity;
starts not more than con_max concurrent calls;
tests is a list of tests to execute
monitor is a extension that will be used to monitor tests.
"""
concurrent = []
monitored = []
successes, failures = {}, {}
def success(_, t, start):
successes[t] = time.time() - start
def failure(f, t, start):
failures[t] = (time.time() - start, f)
yield server_yate.installMsgHandler("call.route", prio = 50)
getResult()
yield client_yate.installWatchHandler("call.answered")
getResult()
yield client_yate.installWatchHandler("chan.hangup")
getResult()
if monitor:
monitorid = None
monitor_room = None
logger.debug("Creating monitoring connection.")
execute = client_yate.msg("call.execute",
{"callto": "dumb/",
"target": monitor})
yield execute.dispatch()
if not getResult():
logger.warn("can't create monitor connection on %s" % monitor)
monitor = None
else:
try:
end = client_yate.onwatch(
"chan.hangup",
lambda m, callid = execute["id"] : m["id"] == callid)
yield client_yate.onwatch(
"call.answered",
lambda m : m["id"] == execute["targetid"],
until = end)
getResult()
logger.debug("Monitor connection answered.")
dumbid = execute["id"]
monitorid = execute["targetid"]
execute = client_yate.msg(
"chan.masquerade",
{"message": "call.execute",
"id": execute["targetid"],
"lonely": "true",
"voice": "false",
"echo": "false",
"smart": "true",
"callto": "conf/"})
yield execute.dispatch()
if getResult():
monitor_room = execute["room"]
logger.debug("Monitor conference created.")
yield client_yate.msg("call.drop", {"id": dumbid}).dispatch()
getResult()
else:
logger.warn("can't create monitor conference on %s" % monitor)
monitor = None
except AbandonedException:
logger.debug("Monitor connection not answered.")
monitor = None
count = 0
for t in tests:
concurrent = select_non_called(concurrent)
if len(concurrent) >= con_max:
logger.debug("waiting on concurrency limit: %d" % con_max)
yield defer.DeferredList(concurrent, fireOnOneCallback=True)
_, fired = getResult()
concurrent.remove(concurrent[fired])
count = count + 1
route = server_yate.onmsg(
"call.route",
lambda m : m["driver"] != "dumb" and m["called"].find(impossible_prefix) >= 0)
def getRemoteId(d):
yield d
route = getResult()
route["called"] = route["called"].replace(impossible_prefix, "")
remoteid = route["id"]
route.ret(False)
yield remoteid
return
remoteid_def = go(getRemoteId(route))
execute = client_yate.msg(
"call.execute",
{"callto": "dumb/",
"target": target % impossible_prefix,
"maxcall": 1000})
yield execute.dispatch()
try:
if not getResult():
route.cancel()
raise AbandonedException("Call to: %s failed." % target)
callid = execute["id"]
end = client_yate.onwatch(
"chan.hangup",
lambda m, callid = callid : m["id"] == callid)
yield defer.DeferredList(
[remoteid_def, end],
fireOnOneCallback=True, fireOnOneErrback=True)
result, end_first = getResult()
if end_first:
raise AbandonedException("Call to: %s hungup." % target)
logger.debug("[%d] outgoing call to %s" % (count, callid))
yield client_yate.onwatch(
"call.answered",
lambda m : m["targetid"] == callid,
until = end)
answered = getResult()
targetid = execute["targetid"]
monitoring = False
if monitor and not monitored :
logger.debug("[%d] monitoring: %s" % (count, callid))
monitored.append(callid)
end.addCallback(
lambda _, targetid = targetid: client_yate.msg("call.drop", {"id": targetid}).enqueue())
yield client_yate.msg(
"chan.masquerade",
{"message": "call.conference",
"id": callid,
"room": monitor_room}).dispatch()
getResult()
monitoring = True
logger.debug("[%d] recording: %s" % (count, str(callid)))
client_yate.msg(
"chan.masquerade",
{"message": "chan.attach",
"id": callid,
# "source": "moh/default",
"source": "tone/silence",
"consumer": "wave/record//tmp/recording%s.slin" % \
callid.replace("/", "-"),
"maxlen": 0}).enqueue()
yield remoteid_def
remoteid = getResult()
logger.debug(
"[%d] running test with local=(%s, %s) remote=%s" %
(count, targetid, callid, remoteid))
start = time.time()
result = go(handler(
client_yate, server_yate,
count,
targetid,
callid, remoteid, t))
result.addCallbacks(success, failure,
callbackArgs=(count, start),
errbackArgs=(count, start))
if monitoring:
result.addCallback(
lambda _, mon_id: monitored.remove(mon_id),
callid)
concurrent.append(result)
except AbandonedException, e:
if not route.called:
route.cancel()
logger.warn("[%d] outgoing call to %s abandoned" % (count, callid))
logger.debug(
"Waiting for %d tests to finish" % len(select_non_called(concurrent)))
yield defer.DeferredList(concurrent)
logger.info("Test finished!")
if monitor and monitorid:
logger.debug("droping monitor connection")
yield client_yate.msg("call.drop", {"id": monitorid}).dispatch()
getResult()
yield sleep(1)
getResult()
logger.debug("stopping reactor!")
reactor.stop()
logger.info("-"*80)
logger.info("Summary")
logger.info("-"*80)
logger.info("Tests: %d" % (len(successes) + len(failures)))
if successes:
logger.info("-"*80)
logger.info("Successes: %d" % len(successes))
logger.info("Avg time: %.2f s" %
(reduce(lambda x, y: x + y, successes.values(), 0) /
len(successes)))
if failures:
logger.info("-"*80)
logger.info("Failures: %d" % len(failures))
sumt = 0
for tid, (t, f) in failures.iteritems():
logger.info("%d, %s %s" % (tid, str(f.type), f.getErrorMessage()))
sumt = sumt + t
logger.info("Avg time: %.2f s" % (sumt/len(failures)))
def sequential_n(n, tests):
"""
Repeat tests sequentially n times
"""
i = 0
l = len(tests)
while i < n:
yield tests[i % l]
i = i + 1
def random_n(n, tests):
"""
Repeat tests in random order n times.
"""
i = 0
while i < n:
yield tests[random.randint(0, len(tests)-1)]
i = i + 1
def do_load_test(
local_addr, local_port,
remote_addr, remote_port,
dest, handler, con_max, tests,
monitor = None):
"""
Executes test.
"""
def start_client(client_yate):
logger.debug("client started");
def start_server(server_yate):
logger.debug("server started");
go(load(client_yate, server_yate,
dest,
handler, con_max,
tests,
monitor))
server_factory = TCPDispatcherFactory(start_server)
reactor.connectTCP(remote_addr, remote_port, server_factory)
client_factory = TCPDispatcherFactory(start_client)
reactor.connectTCP(local_addr, local_port, client_factory)

View File

@ -0,0 +1,208 @@
#!/usr/bin/python
# -*- coding: iso-8859-2; -*-
"""
Tester module for YAYPM. Generates calls, and allows to define incall
activity like sending DTMFs. Can be run on two separate yates to imitate
realistic load.
Copyright (C) 2005 Maciek Kaminski
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
"""
from yaypm import TCPDispatcherFactory, AbandonedException
from yaypm.flow import go, logger_flow, getResult
from yaypm.utils import XOR, sleep #, tester
from twisted.internet import reactor, defer
import sys, logging, yaypm, time
logger = logging.getLogger("tester")
class peep:
def __init__(self, g):
self.g = g
self.peep = None
def __iter__(self):
return self
def next(self):
if self.peep:
print "peep", self.peep
tmp = self.peep
self.peep = None
return tmp
else:
return self.g.next()
def back(self, v):
self.peep = v
def tokenize(text):
a = ""
for c in text:
if c in ",()|":
if a:
yield a
yield c
a = ""
else:
a = a + c
if a:
yield a
def dtmf_etc(client_yate, server_yate, testid, targetid, callid, remoteid, test):
"""
Defines incall activity. Activity is defined in a string, according
to the following rules:
1-9,*,# - sends x as dtmf
connected? - waits for test.checkpoint message with check==connected
connected?x - waits for test.checkpoint message with check==connected for x secs
error! - raise exception on test.checkpoint message with check==error
s:x,c:y - does chan.attach with source==s and consumer==c
_ - 1s break, _*x - x times 1s
timeout:t - starts timer that drops connection after t secs
... - ends tester thread but does not drop connection
Example: _,1,1? - waits 1 second, sends 1, waits for 1 checkpoint.
"""
end = client_yate.onwatch("chan.hangup", lambda m : m["id"] == targetid)
def one_step(step):
if "c:" in step or "s:" in step:
media = step.split("|")
consumer = None
source = None
for m in media:
if m.startswith("c:"):
consumer = m[2:]
elif m.startswith("s:"):
source = m[2:]
if source or consumer:
attrs = {"message": "chan.attach",
"id": callid}
if source:
attrs["source"] = source
if consumer:
attrs["consumer"] = consumer
client_yate.msg("chan.masquerade", attrs).enqueue()
elif step == "...":
logger.debug("[%d] call %s extended" % (testid, callid))
#tester.extended_calls.append(callid)
return
elif "?" in step:
i = step.find("?")
check = step[:i]
timeout = None
if len(step) > i + 1:
timeout = int(step[i+1:])
check_def = server_yate.onwatch(
"test.checkpoint",
lambda m : m["id"] == remoteid and m["check"] == check,
until = end)
if timeout:
logger.debug(
"[%d] waiting for: %ds for checkpoint: %s on: %s" % \
(testid, timeout, check, remoteid))
yield XOR(check_def, sleep(timeout))
what, _ = getResult()
if what > 0:
logger.debug(
"[%d] timeout while waiting %d s for checkpoint: %s on: %s" % \
(testid, timeout, check, remoteid))
client_yate.msg("call.drop", {"id": callid}).enqueue()
logger.debug(
"[%d] dropping connection: %s" % (testid, remoteid))
raise Exception("Timeout while waiting %d s for checkpoint: %s on: %s" % \
(timeout, check, remoteid))
else:
logger.debug(
"[%d] Waiting for checkpoint: '%s' on: %s" % \
(testid, check, remoteid))
yield check_def
getResult()
elif step in ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "0", "#", "*"]:
logger.debug("[%d] Sending dtmf: %s to %s" % (testid, step, targetid))
client_yate.msg(
"chan.dtmf",
{"id": callid, "targetid": targetid, "text": step}).enqueue()
else:
t = 1
if "*" in step:
try:
t = int(step[2:])
except ValueError:
pass
logger.debug("[%d] Sleeping for: %d s." % (testid, t))
yield sleep(t)
getResult()
def parse(toparse):
print "parse"
stack = []
for t in toparse:
print "t:", t
if t == ",":
def x(a, b):
yield go()
getResult()
yield go(b)
getResult()
stack.append(x(stack.pop(), parse(toparse)))
elif t == "(":
stack.append(parse(toparse))
if toparse.next() != ")":
raise Excepion("Unmatched bracket!")
elif t == ")":
toparse.back(t)
break
elif t == "|":
def x(a, b):
yield defer.DeferredList([a, b],
fireOnOneCallback=True)
getResult()
stack.append(x(stack.pop(), parse(toparse)))
else:
stack.append(one_step(t))
if stack:
return stack[0]
else:
return None
yield go(parse(parse(peep(tokenize(x)))))
getResult()
yield sleep(0.1)
getResult()
logger.debug("[%d] dropping: %s" % (testid, callid))
yield client_yate.msg("call.drop", {"id": callid}).dispatch()
getResult()
logger.debug("[%d] call %s finished" % (testid, callid))