ctrl2cgi: properly limit number of requests
Manual acquire()/release() of semaphore does not limit number of concurrent requests when combined with explicit yield. Fix this by using semaphore.run() and removing inilineCallbacks decorator. Change-Id: I47b8b9f5b726ca0905bb7c023d63b325c7f7d85f Related: SYS#4399
This commit is contained in:
parent
fa80a22563
commit
8f8d9144ec
|
@ -22,7 +22,7 @@
|
|||
*/
|
||||
"""
|
||||
|
||||
__version__ = "0.0.5" # bump this on every non-trivial change
|
||||
__version__ = "0.0.6" # bump this on every non-trivial change
|
||||
|
||||
import argparse, os, logging, logging.handlers
|
||||
import hashlib
|
||||
|
@ -60,6 +60,12 @@ def gen_hash(params, skey):
|
|||
#print('HASH: \nparams="%r"\ninput="%s" \nres="%s"' %(params, input, res))
|
||||
return res
|
||||
|
||||
def make_async_req(dst, par, f_write, f_log):
|
||||
d = post(dst, par)
|
||||
d.addCallback(collect, partial(handle_reply, f_write, f_log)) # treq's collect helper is handy to get all reply content at once
|
||||
d.addErrback(lambda e: f_log.critical("HTTP POST error %s while trying to register BSC %s on %s" % (e, par['bsc_id'], dst))) # handle HTTP errors
|
||||
return d
|
||||
|
||||
class Trap(CTRL):
|
||||
"""
|
||||
TRAP handler (agnostic to factory's client object)
|
||||
|
@ -93,7 +99,6 @@ class Trap(CTRL):
|
|||
self.factory.log.info("Connected to CTRL@%s:%d" % (self.factory.addr_ctrl, self.factory.port_ctrl))
|
||||
super(CTRL, self).connectionMade()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def handle_locationstate(self, net, bsc, bts, trx, data):
|
||||
"""
|
||||
Handle location-state TRAP: parse trap content, build CGI Request and use treq's routines to post it while setting up async handlers
|
||||
|
@ -101,13 +106,8 @@ class Trap(CTRL):
|
|||
params = make_params(bsc, data)
|
||||
self.factory.log.debug('location-state@%s.%s.%s.%s (%s) => %s' % (net, bsc, bts, trx, params['time_stamp'], data))
|
||||
params['h'] = gen_hash(params, self.factory.secret_key)
|
||||
d = post(self.factory.location, params)
|
||||
d.addCallback(collect, partial(handle_reply, self.transport.write, self.factory.log)) # treq's collect helper is handy to get all reply content at once
|
||||
d.addErrback(lambda e, bsc: self.factory.log.critical("HTTP POST error %s while trying to register BSC %s on %s" % (e, bsc, self.factory.location)), bsc) # handle HTTP errors
|
||||
# Ensure that we run only limited number of requests in parallel:
|
||||
yield self.factory.semaphore.acquire()
|
||||
yield d # we end up here only if semaphore is available which means it's ok to fire the request without exceeding the limit
|
||||
self.factory.semaphore.release()
|
||||
self.factory.semaphore.run(make_async_req, self.factory.location, params, self.transport.write, self.factory.log)
|
||||
|
||||
def handle_notificationrejectionv1(self, net, bsc, bts, trx, data):
|
||||
"""
|
||||
|
|
Loading…
Reference in New Issue