utils: add gsmtap_logread.py a gsmtap log reader

Receive gsmtap logs and feeds it into the python logging
framework. Allows to use generic logging features and
further utilities.

Change-Id: I24478d8e16066c6118e867bdba54c6418c15e170
This commit is contained in:
Alexander Couzens 2019-05-27 03:44:39 +02:00 committed by Harald Welte
parent 2e78f900cf
commit 9b20741910
2 changed files with 167 additions and 0 deletions

75
utils/gsmtap.py Normal file
View File

@ -0,0 +1,75 @@
#!/usr/bin/env python
# License: MIT
# Copyright 2019 by Sysmocom s.f.m.c. GmbH
# Author: Alexander Couzens <lynxis@fe80.eu>
import struct
GSMTAP_VERSION = 0x02
GSMTAP_TYPE_OSMOCORE_LOG = 0x10
class TooSmall(RuntimeError):
pass
# struct gsmtap_hdr {
# uint8_t version; /*!< version, set to 0x01 currently */
# uint8_t hdr_len; /*!< length in number of 32bit words */
# uint8_t type; /*!< see GSMTAP_TYPE_* */
# uint8_t timeslot; /*!< timeslot (0..7 on Um) */
#
# uint16_t arfcn; /*!< ARFCN (frequency) */
# int8_t signal_dbm; /*!< signal level in dBm */
# int8_t snr_db; /*!< signal/noise ratio in dB */
#
# uint32_t frame_number; /*!< GSM Frame Number (FN) */
#
# uint8_t sub_type; /*!< Type of burst/channel, see above */
# uint8_t antenna_nr; /*!< Antenna Number */
# uint8_t sub_slot; /*!< sub-slot within timeslot */
# uint8_t res; /*!< reserved for future use (RFU) */
#
# }
class gsmtap_hdr():
def __init__(self, data):
if len(data) < 2:
raise TooSmall()
self.version, self.hdr_len = struct.unpack('!BB', data[0:2])
self.hdr_len *= 4
if self.hdr_len >= 3:
self.type = struct.unpack('!B', data[2:3])[0]
# /*! Structure of the GSMTAP libosmocore logging header */
# struct gsmtap_osmocore_log_hdr {
# struct {
# uint32_t sec;
# uint32_t usec;
# } ts;
# char proc_name[16]; /*!< name of process */
# uint32_t pid; /*!< process ID */
# uint8_t level; /*!< logging level */
# uint8_t _pad[3];
# /* TODO: color */
# char subsys[16]; /*!< logging sub-system */
# struct {
# char name[32]; /*!< source file name */
# uint32_t line_nr;/*!< line number */
# } src_file;
# } __attribute__((packed));
class gsmtap_log():
def __init__(self, data):
packformat = '!II16sIBxxx16s32sI'
packlen = struct.calcsize(packformat)
if len(data) < packlen:
raise TooSmall()
self.sec, self.usec, \
self.proc_name, self.pid, \
self.level, self.subsys, \
self.filename, self.fileline_nr = struct.unpack(packformat, data[:packlen])
message_len = len(data) - packlen
if message_len > 0:
self.message = data[packlen:].decode('utf-8')

92
utils/gsmtap_logread.py Normal file
View File

@ -0,0 +1,92 @@
#!/usr/bin/env python
#
# License: MIT
# Copyright 2019 by Sysmocom s.f.m.c. GmbH
# Author: Alexander Couzens <lynxis@fe80.eu>
import logging
import socket
from gsmtap import GSMTAP_TYPE_OSMOCORE_LOG, gsmtap_hdr, gsmtap_log, TooSmall
LOG = logging.getLogger("gsmlogreader")
def parse_gsm(packet):
hdr = None
try:
hdr = gsmtap_hdr(packet)
except TooSmall:
return None
if hdr.type != GSMTAP_TYPE_OSMOCORE_LOG:
return None
if len(packet) <= hdr.hdr_len:
return None
try:
return gsmtap_log(packet[hdr.hdr_len:])
except TooSmall:
return None
def gsmtaplevel_to_loglevel(level):
""" convert a gsmtap log level into a python log level """
if level <= 1:
return logging.DEBUG
if level <= 3:
return logging.INFO
if level <= 5:
return logging.WARNING
return logging.ERROR
def convert_gsmtap_log(gsmtap):
level = gsmtaplevel_to_loglevel(gsmtap.level)
attr = {
"name": "gsmtap",
"levelno": level,
"levelname": gsmtap_get_logname(gsmtap.level),
"pathname": gsmtap.filename,
"lineno": gsmtap.fileline_nr,
"processName": gsmtap.proc_name,
"process": gsmtap.pid,
"module": gsmtap.subsys,
"created": float(gsmtap.sec + gsmtap.usec / 1000000.0),
"msec": int(gsmtap.usec / 1000),
"msg": gsmtap.message.replace('\n', ' '),
}
return attr
def gsmtap_get_logname(level):
names = {
1: "DEBUG",
3: "INFO",
5: "NOTICE",
7: "ERROR",
8: "FATAL",
}
if level in names:
return names[level]
return "UNKNOWN"
if __name__ == "__main__":
# Create a UDP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_address = ('0.0.0.0', 4729)
sock.bind(server_address)
logger = logging.getLogger("gsmtap")
logformat = "%(asctime)s %(message)s"
logging.basicConfig(format=logformat, level=logging.DEBUG)
while True:
data, address = sock.recvfrom(4096)
log = parse_gsm(data)
if not log:
continue
record = logging.makeLogRecord(convert_gsmtap_log(log))
logger.handle(record)