#!/usr/bin/env python
# Copyright 2021 Max H. Parke KA1RBI
#
# This file is part of OP25
#
# OP25 is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
#
# OP25 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public
# License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OP25; see the file COPYING.  If not, write to the Free
# Software Foundation, Inc., 51 Franklin Street, Boston, MA
# 02110-1301, USA.

import sys
import os
import time
import json
import threading
import traceback
import sqlite3

from gnuradio import gr
from emap import events_map, cc_events

_def_db_file = 'op25-data.db'
_def_msgq_size = 100
_def_uncommitted = 25

class du_queue_runner(threading.Thread):
    """Worker thread: drains the message queue and writes each row to sqlite."""
    def __init__(self, msgq, **kwds):
        threading.Thread.__init__(self, **kwds)
        self.setDaemon(1)
        self.msgq = msgq
        self.db_filename = _def_db_file
        self.conn = None
        self.cursor = None
        self.failed = False
        self.keep_running = True
        self.uncommitted = 0
        self.max_q = 0
        self.next_t = time.time()   # set before start() so run() never sees it undefined
        self.start()

    def run(self):
        self.connect()
        while self.keep_running and not self.failed:
            self.max_q = max(self.max_q, self.msgq.count())
            if time.time() > self.next_t:
                self.next_t = time.time() + 2
            msg = self.msgq.delete_head()   # blocks until a message is available
            if self.failed or not self.keep_running:
                break
            self.insert_row(msg)

    def disconnect(self):
        self.conn.close()
        self.cursor = None
        self.conn = None

    def connect(self):
        self.conn = sqlite3.connect(self.db_filename)
        self.cursor = self.conn.cursor()

    def insert_row(self, msg):
        if self.cursor is None or self.conn is None:
            return
        d = json.loads(msg.to_string())
        try:
            self.cursor.execute(d['command'], d['row'])
            # optimization: only commit when no more msgs queued (or limit reached)
            if self.uncommitted < _def_uncommitted and self.msgq.count():
                self.uncommitted += 1
            else:
                self.conn.commit()
                self.uncommitted = 0
        except:
            self.failed = True
            traceback.print_exc(limit=None, file=sys.stdout)
            traceback.print_exc(limit=None, file=sys.stderr)
            sys.stderr.write('sql_dbi: db logging stopped due to error (or db not initialized)\n')
            # TODO - add error recovery?
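# Each message handed to du_queue_runner is a JSON string of the form
# {"command": "<parameterized INSERT statement>", "row": [values, ...]},
# produced by sql_dbi.event() below; the worker thread simply replays it
# via cursor.execute(command, row).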
class sql_dbi:
    def __init__(self, db_filename=_def_db_file):
        self.conn = None
        self.cursor = None
        self.db_filename = db_filename
        self.db_msgq = gr.msg_queue(_def_msgq_size)
        self.q_runner = du_queue_runner(self.db_msgq)
        self.db_msgq_overflow = 0
        self.sql_commands = {
            'calls': 'INSERT INTO calls(time, sysid, options, tgid, srcid)',
            'joins': 'INSERT INTO joins(time, sysid, rv, tgid, srcid)',
            'create_data_store': '''CREATE TABLE data_store (
                id INTEGER PRIMARY KEY,
                time REAL NOT NULL,
                cc_event INTEGER NOT NULL,
                opcode INTEGER NOT NULL,
                sysid INTEGER NOT NULL,
                mfrid INTEGER NULL,
                p INTEGER NULL,
                p2 INTEGER NULL,
                p3 INTEGER NULL,
                wacn INTEGER NULL,
                frequency INTEGER NULL,
                tgid INTEGER NULL,
                tgid2 INTEGER NULL,
                suid INTEGER NULL,
                suid2 INTEGER NULL,
                tsbk_sysid INTEGER NULL,
                FOREIGN KEY(cc_event) REFERENCES event_keys (id))''',
            'create_event_keys': '''CREATE TABLE event_keys (
                id INTEGER PRIMARY KEY,
                tag TEXT NOT NULL)''',
            'create_sysid': '''CREATE TABLE sysid_tags (
                id INTEGER PRIMARY KEY,
                sysid INTEGER NOT NULL,
                tag TEXT)''',
            'create_tgid': '''CREATE TABLE tgid_tags (
                id INTEGER PRIMARY KEY,
                rid INTEGER NOT NULL,
                sysid INTEGER NOT NULL,
                tag TEXT,
                priority INTEGER)''',
            'create_unit_id': '''CREATE TABLE unit_id_tags (
                id INTEGER PRIMARY KEY,
                rid INTEGER NOT NULL,
                sysid INTEGER NOT NULL,
                tag TEXT,
                priority INTEGER)''',
            'create_2b_rv': '''CREATE TABLE loc_reg_resp_rv (
                rv INTEGER NOT NULL,
                tag TEXT NOT NULL)''',
            'populate_2b_rv': '''INSERT INTO loc_reg_resp_rv(rv, tag) VALUES(0, "join")
                INSERT INTO loc_reg_resp_rv(rv, tag) VALUES(1, "fail")
                INSERT INTO loc_reg_resp_rv(rv, tag) VALUES(2, "deny")
                INSERT INTO loc_reg_resp_rv(rv, tag) VALUES(3, "refuse")''',
            'create_index': '''CREATE INDEX tgid_idx ON data_store(tgid)
                CREATE INDEX tgid2_idx ON data_store(tgid2)
                CREATE INDEX suid_idx ON data_store(suid)
                CREATE INDEX suid2_idx ON data_store(suid2)
                CREATE INDEX t_tgid_idx ON tgid_tags(rid)
                CREATE INDEX t_unit_id_idx ON unit_id_tags(rid)'''
        }

    def disconnect(self):
        self.conn.close()
        self.cursor = None
        self.conn = None

    def connect(self):
        self.conn = sqlite3.connect(self.db_filename)
        self.cursor = self.conn.cursor()

    def reset_db(self):     # any data in db will be destroyed!
        if os.access(self.db_filename, os.W_OK):
            os.remove(self.db_filename)
        self.conn = sqlite3.connect(self.db_filename)
        self.cursor = self.conn.cursor()
        self.execute('create_sysid')
        self.execute('create_2b_rv')
        self.execute_lines('populate_2b_rv')
        self.execute('create_tgid')
        self.execute('create_unit_id')
        self.execute('create_event_keys')
        self.execute('create_data_store')
        self.execute_lines('create_index')
        self.conn.commit()
        self.populate_event_keys()
        self.conn.close()

    def execute(self, q):
        self.cursor.execute(self.sql_commands[q])
        self.conn.commit()

    def execute_lines(self, q):
        # each line of the stored command string is a separate SQL statement
        for line in self.sql_commands[q].split('\n'):
            self.cursor.execute(line)
        self.conn.commit()

    def q(self, query):
        if query != '-':
            return self.cursor.execute(query)
        # query '-' means: read SQL statements from stdin, one per line
        lines = sys.stdin.read().strip().split('\n')
        for query in lines:
            self.cursor.execute(query)
        self.conn.commit()
        return None

    def write(self, table_name, row):
        # the number of elements in tuple 'row' must be two less than the
        # number of table columns (the current time is prepended here)
        curr_time = time.time()
        row = (curr_time,) + row
        qs = ['?'] * len(row)
        command = self.sql_commands[table_name] + ' VALUES (' + ','.join(qs) + ')'
        self.cursor.execute(command, row)
        self.conn.commit()

    def event(self, d):
        # queue one decoded control-channel event (dict 'd') for insertion into data_store
        if d['cc_event'] not in events_map:
            return
        if not os.access(_def_db_file, os.W_OK):    # if DB not (yet) set up or not writable
            return
        mapl = events_map[d['cc_event']]
        row = []
        column_names = []
        for col in mapl:
            colname = col[0]
            k = col[1]
            # special mappings: unwrap tgid and srcid objects
            if colname.startswith('tgid') and type(d[k]) is dict:
                val = d[k]['tg_id']
            elif colname.startswith('suid') and type(d[k]) is dict:
                val = d[k]['unit_id']
            elif type(d[k]) is not dict:
                val = d[k]
            else:
                sys.stderr.write('value retrieval error %s %s %s\n' % (d['cc_event'], type(d[k]) is dict, k))
                val = -1
            # special mappings: map cc_event tag to an int
            if colname == 'cc_event':
                val = cc_events[d[k]]
            # special mappings: map affiliation to int
            if k == 'affiliation':
                if d[k] == 'global':
                    val = 1
                elif d[k] == 'local':
                    val = 0
                else:
                    val = -1
            # special mappings: map duration to int(msec)
            if k == 'duration':
                val = int(d[k] * 1000)
            row.append(val)
            column_names.append(colname)
        command = "INSERT INTO data_store(%s) VALUES(%s)" % (','.join(column_names), ','.join(['?'] * len(row)))
        js = json.dumps({'command': command, 'row': row})
        if not self.db_msgq.full_p():
            msg = gr.message().make_from_string(js, 0, 0, 0)
            self.db_msgq.insert_tail(msg)
        else:
            self.db_msgq_overflow += 1

    def import_tsv(self, argv):
        cmd = argv[1]
        filename = argv[2]
        sysid = int(argv[3])
        if cmd == 'import_tgid':
            table = 'tgid_tags'
        elif cmd == 'import_unit':
            table = 'unit_id_tags'
        elif cmd == 'import_sysid':
            table = 'sysid_tags'
        else:
            print('%s unsupported' % (cmd))
            return
        q = 'INSERT INTO ' + table + '(rid, sysid, tag, priority) VALUES(?,?,?,?)'
        if table == 'sysid_tags':
            q = 'INSERT INTO ' + table + '(sysid, tag) VALUES(?,?)'
        rows = []
        with open(filename, 'r') as f:
            lines = f.read().rstrip().split('\n')
        for i in range(len(lines)):
            a = lines[i].split('\t')
            if i == 0:  # check hdr
                if not a[0].strip().isdigit():
                    continue
            rid = int(a[0])
            tag = a[1]
            priority = 0 if len(a) < 3 else int(a[2])
            s = (rid, sysid, tag, priority)
            if table == 'sysid_tags':
                s = (rid, tag)
            rows.append(s)
        if len(rows):
            self.cursor.executemany(q, rows)
            self.conn.commit()

    def populate_event_keys(self):
        d = {cc_events[k]: k for k in cc_events}
        query = 'INSERT INTO event_keys(id, tag) VALUES(?, ?)'
        for k in sorted(d.keys()):
            self.cursor.execute(query, [k, d[k]])
        self.conn.commit()

def main():
    if len(sys.argv) > 1 and sys.argv[1] == 'reset_db':
        sql_dbi().reset_db()
        return
    db1 = sql_dbi()
    db1.connect()
    if len(sys.argv) > 1 and sys.argv[1] == 'setup':
        db1.cursor.execute(db1.sql_commands['create_tgid'])
        db1.cursor.execute(db1.sql_commands['create_unit_id'])
        db1.conn.commit()
        db1.conn.close()
        return
    if len(sys.argv) > 1 and sys.argv[1] == 'execute_lines':
        db1.execute_lines(sys.argv[2])
        return
    if len(sys.argv) > 1 and sys.argv[1] == 'execute':
        db1.execute(sys.argv[2])
        return
    if len(sys.argv) > 1 and sys.argv[1] == 'insert':
        db1.write('joins', (555, 5555, 5555555))
        return
    if len(sys.argv) > 3 and sys.argv[1].startswith('import_'):
        db1.import_tsv(sys.argv)
        return
    if len(sys.argv) < 3 or sys.argv[1] != 'query':
        print('nothing done')
        return
    result = db1.q(sys.argv[2])
    if result:
        for row in result:
            print('%s' % ('\t'.join([str(x) for x in row])))

if __name__ == '__main__':
    main()
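# Example invocations, derived from the argument handling in main() above
# (the module file name and the .tsv file names are illustrative assumptions):
#
#   python sql_dbi.py reset_db                        # destroy and re-create op25-data.db
#   python sql_dbi.py import_tgid tgid_tags.tsv 1     # load talkgroup tags for sysid 1
#   python sql_dbi.py import_unit unit_tags.tsv 1     # load unit (radio) ID tags for sysid 1
#   python sql_dbi.py query 'SELECT * FROM event_keys'
#   python sql_dbi.py query -                         # read SQL statements from stdin, one per line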