ctrl2cgi: fix deferred callbacks

Previously handle_reply() was marked as deferred callback unlike soap.py
where it's synchronous function. This seems to be causing issues where
some of the callbacks are not yield properly. Let's move to the
known-to-work semantics of soap.py where async functions are limited to
Trap() class.

Change-Id: Ib2c28dd7f79cbd28d475de93750703659ddd18f1
Related: SYS#4399
This commit is contained in:
Max 2018-11-27 17:43:45 +01:00
parent 2a1d8930f6
commit ec3944e7ae
1 changed files with 7 additions and 18 deletions

View File

@ -22,7 +22,7 @@
*/ */
""" """
__version__ = "0.0.2" # bump this on every non-trivial change __version__ = "0.0.3" # bump this on every non-trivial change
from twisted.internet import defer, reactor from twisted.internet import defer, reactor
from osmopy.twisted_ipa import CTRL, IPAFactory, __version__ as twisted_ipa_version from osmopy.twisted_ipa import CTRL, IPAFactory, __version__ as twisted_ipa_version
@ -40,25 +40,14 @@ import configparser
assert V(twisted_ipa_version) > V('0.4') assert V(twisted_ipa_version) > V('0.4')
@defer.inlineCallbacks
def handle_reply(f, log, resp): def handle_reply(f, log, resp):
""" """
Reply handler: process raw CGI server response, function f to run for each command Reply handler: process raw CGI server response, function f to run for each command
""" """
#log.debug('HANDLE_REPLY: code=%r' % (resp.code)) decoded = json.loads(resp.decode('utf-8'))
#for key,val in resp.headers.getAllRawHeaders(): bsc_id = decoded.get('commands')[0].split()[0].split('.')[3] # we expect 1st command to have net.0.bsc.666.bts.2.trx.1 location prefix format
# log.debug('HANDLE_REPLY: key=%r val=%r' % (key, val)) log.debug("BSC %s commands: %r" % (bsc_id, decoded.get('commands')))
if resp.code != 200: for t in decoded.get('commands'): # Process commands format
resp_body = yield resp.text()
log.critical('Received HTTP response %d: %s' % (resp.code, resp_body))
return
parsed = yield resp.json()
#log.debug("RESPONSE: %r" % (parsed))
bsc_id = parsed.get('commands')[0].split()[0].split('.')[3] # we expect 1st command to have net.0.bsc.666.bts.2.trx.1 location prefix format
log.info("Received CGI response for BSC %s with %d commands, error status: %s" % (bsc_id, len(parsed.get('commands')), parsed.get('error')))
log.debug("BSC %s commands: %r" % (bsc_id, parsed.get('commands')))
for t in parsed.get('commands'): # Process commands format
(_, m) = Ctrl().cmd(*t.split()) (_, m) = Ctrl().cmd(*t.split())
f(m) f(m)
@ -116,8 +105,8 @@ class Trap(CTRL):
params = make_params(bsc, data) params = make_params(bsc, data)
self.factory.log.debug('location-state@%s.%s.%s.%s (%s) => %s' % (net, bsc, bts, trx, params['time_stamp'], data)) self.factory.log.debug('location-state@%s.%s.%s.%s (%s) => %s' % (net, bsc, bts, trx, params['time_stamp'], data))
params['h'] = gen_hash(params, self.factory.secret_key) params['h'] = gen_hash(params, self.factory.secret_key)
d = post(self.factory.location, None, params=params) d = post(self.factory.location, params)
d.addCallback(partial(handle_reply, self.transport.write, self.factory.log)) # treq's collect helper is handy to get all reply content at once using closure on ctx d.addCallback(collect, partial(handle_reply, self.transport.write, self.factory.log)) # treq's collect helper is handy to get all reply content at once
d.addErrback(lambda e, bsc: self.factory.log.critical("HTTP POST error %s while trying to register BSC %s on %s" % (e, bsc, self.factory.location)), bsc) # handle HTTP errors d.addErrback(lambda e, bsc: self.factory.log.critical("HTTP POST error %s while trying to register BSC %s on %s" % (e, bsc, self.factory.location)), bsc) # handle HTTP errors
# Ensure that we run only limited number of requests in parallel: # Ensure that we run only limited number of requests in parallel:
yield self.factory.semaphore.acquire() yield self.factory.semaphore.acquire()