#!/usr/bin/python
#
# p25craft.py - utility for crafting APCO P25 packets
#
# version 1.0
#
# This is a command line tool for generating test vectors consisting of one or
# more P25 packets. It produces text output similar to the test vectors
# presented in the P25 standards documents (TIA-102). It can also produce
# binary output suitable for viewing with a hex editor or loading into a vector
# signal generator.
#
# The generated packets are intended to comply with the following standards:
#   TIA-102.BAAA-A Project 25 FDMA - Common Air Interface
#   TIA-102.AABB-A Project 25 Trunking Control Channel Formats
#   TIA-102.AABC-C Project 25 Trunking Control Channel Messages
#   TIA-102.AABF-B Link Control Word Formats and Messages
#
# Testing has been done to verify agreement with test vectors listed in:
#   TIA-102.BAAB-B Project 25 Common Air Interface Conformance Test
#
# In addition to command line use, this can be used as a python module.
# example:
#   $ python
#   >>> from p25craft import *
#   >>> construct_ext_fnct_cmd_check(0x293, 0x1, 0x000001, 0x000002)
#   Special Packet: Extended Function Command - Radio Check
#   Extended Function:
#   Class = 00
#   . . .
#
# Copyright 2011 Michael Ossmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; see the file COPYING. If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
import struct
import sys

quiet = False
outfile = ""
flip = 0x000

#####################
# utility functions #
#####################

def text_out(text):
    """Write text to stdout unless the module-level `quiet` flag is set."""
    if not quiet:
        sys.stdout.write(text)

# Print a list of dibits.
def print_dibits(data):
    """Print a list of dibits as hex digits, two dibits per digit.

    The list must contain an even number of dibits. A newline follows
    the digits.
    """
    assert len(data) % 2 == 0
    for i in range(0, len(data), 2):
        nibble = (data[i] << 2) + data[i + 1]
        text_out("%01x" % nibble)
    text_out("\n")

# Print output similar to examples found in CAI conformance test specification.
# Input should be a list of dibits.
def print_spec(data):
    """Print dibits as rows of microslots and optionally write them as bytes.

    Each microslot is 36 dibits, shown as six 12-bit hex words; two
    microslots are printed per row. Every word is XORed with the
    module-level `flip` value before printing. If the module-level
    `outfile` is set to a writable binary file object, the dibits are also
    packed four to a byte (XORed with the low 8 bits of `flip`) and
    written to it.
    """
    # we only support full microslots
    assert len(data) % 36 == 0
    microslot = 0
    text_out("\t\tMicroslot: ___________0___________ ___________1___________")
    for i in range(0, len(data), 36):
        if microslot % 2 == 0:
            # start a new row, labelled with the even microslot number
            text_out("\n")
            text_out("\t\t%9d: " % microslot)
        text_out(" ")
        for j in range(0, 36, 6):
            dodectet = 0
            for k in range(6):
                dodectet |= data[i + j + k] << (10 - k * 2)
            text_out("%03x " % (dodectet ^ flip))
        microslot += 1
    text_out("\n")
    text_out("\n")
    # also produce binary output if requested
    if outfile:
        for i in range(0, len(data), 4):
            byte = 0
            for j in range(4):
                byte |= data[i + j] << (6 - j * 2)
            # 'B' packs exactly one unsigned byte; a trailing repeat count
            # with no format character is rejected by Python 3.
            outfile.write(struct.pack('B', byte ^ (flip & 0xff)))

# Split an integer into list of bytes.
def split_bytes(data, len):
    """Return the low `len` bytes of integer `data`, most significant first."""
    octets = []
    for shift in range((len - 1) * 8, -8, -8):
        octets.append((data >> shift) & 0xff)
    return octets

# Split an integer into list of dibits.
# count should be set to the number of dibits to extract.
def split_dibits(data, count):
    """Return the low `count` dibits of `data`, most significant first."""
    dibits = []
    for shift in range(count * 2 - 2, -2, -2):
        dibits.append((data >> shift) & 0x03)
    return dibits

# Split an integer into list of tribits.
# count should be set to the number of tribits to extract.
def split_tribits(data, count):
    """Return the low `count` tribits of `data`, most significant first."""
    tribits = []
    for shift in range(count * 3 - 3, -3, -3):
        tribits.append((data >> shift) & 0x07)
    return tribits

# Insert status symbols.
# Arguments are lists of dibits. Returns list of dibits.
def insert_status(data, ssyms):
    """Interleave status symbols into a stream of data dibits.

    A status symbol from `ssyms` is appended after every 35th data dibit.
    If status symbols remain once the data is exhausted, zero dibits are
    appended until the next status symbol position is reached. Problems
    are reported on stderr rather than raised.

    Arguments are lists of dibits. Returns a new list of dibits.
    """
    pending = list(ssyms)
    pending.reverse()  # so that pop() yields symbols in original order
    i = 1
    syms = []
    for dibit in data:
        syms.append(dibit)
        if i % 35 == 0:
            if pending:
                syms.append(pending.pop())
            else:
                sys.stderr.write("error: ran out of status symbols\n")
        i += 1
    # pad to end of status symbols
    while pending:
        syms.append(0)
        if i % 35 == 0:
            syms.append(pending.pop())
            if pending:
                sys.stderr.write("warning: excess status symbols\n")
        i += 1
    return syms

########################
# error control coding #
########################

def _gf2_encode(data, rows):
    """XOR together the generator rows selected by the bits of `data`.

    rows[0] corresponds to the most significant of len(rows) data bits.
    """
    top = len(rows) - 1
    codeword = 0
    for i, row in enumerate(rows):
        if data & (1 << (top - i)):
            codeword ^= row
    return codeword

# (64,16,23) BCH encoder
# spec sometimes refers to this as (63,16,23) plus a parity bit
# argument is an integer
# returns an integer
def bch_64_16_23_encode(data):
    """Encode a 16-bit integer into a 64-bit BCH codeword (integer)."""
    matrix = (
        0x8000cd930bdd3b2a, 0x4000ab5a8e33a6be, 0x2000983e4cc4e874, 0x10004c1f2662743a,
        0x0800eb9c98ec0136, 0x0400b85d47ab3bb0, 0x02005c2ea3d59dd8, 0x01002e1751eaceec,
        0x0080170ba8f56776, 0x0040c616dfa78890, 0x0020630b6fd3c448, 0x00103185b7e9e224,
        0x000818c2dbf4f112, 0x0004c1f2662743a2, 0x0002ad6a38ce9afb, 0x00019b2617ba7657)
    assert data < 2**16
    return _gf2_encode(data, matrix)

# GF(2^6) multiply (for Reed-Solomon encoder)
def gf6mult(a, b):
    """Multiply two 6-bit field elements of GF(2^6) and return the product."""
    assert a < 2**6
    assert b < 2**6
    p = 0
    for _ in range(6):
        if b & 1:
            p ^= a
        a <<= 1
        if a & 0x40:
            a ^= 0x43  # primitive polynomial: x^6 + x + 1
        b >>= 1
    return p

def _parse_octal_rows(text):
    """Parse whitespace-separated octal numbers, one tuple per text line."""
    return tuple(tuple(int(tok, 8) for tok in line.split())
                 for line in text.strip().splitlines())

# Parity columns of the systematic Reed-Solomon generator matrices, in octal.
# The identity columns are implied: each codeword starts with the data symbols.
_RS_36_20_17_PARITY = _parse_octal_rows("""
    74 37 34 06 02 07 44 64 26 14 26 44 54 13 77 05
    04 17 50 24 11 05 30 57 33 03 02 02 15 16 25 26
    07 23 37 46 56 75 43 45 55 21 50 31 45 27 71 62
    26 05 07 63 63 27 63 40 06 04 40 45 47 30 75 07
    23 73 73 41 72 34 21 51 67 16 31 74 11 21 12 21
    24 51 25 23 22 41 74 66 74 65 70 36 67 45 64 01
    52 33 14 02 20 06 14 25 52 23 35 74 75 75 43 27
    55 62 56 25 73 60 15 30 13 17 20 02 70 55 14 47
    54 51 32 65 77 12 54 13 35 32 56 12 75 01 72 63
    74 41 30 41 43 22 51 06 64 33 03 47 27 12 55 47
    54 70 11 03 13 22 16 57 03 45 72 31 30 56 35 22
    51 07 72 30 65 54 06 21 36 63 50 61 64 52 01 60
    01 65 32 70 13 44 73 24 12 52 21 55 12 35 14 72
    11 70 05 10 65 24 15 77 22 24 24 74 07 44 07 46
    06 02 65 11 41 20 45 42 46 54 35 12 40 64 65 33
    34 31 01 15 44 64 16 24 52 16 06 62 20 13 55 57
    63 43 25 44 77 63 17 17 64 14 40 74 31 72 54 06
    71 21 70 44 56 04 30 74 04 23 71 70 63 45 56 43
    02 01 53 74 02 14 52 74 12 57 24 63 15 42 52 33
    34 35 02 23 21 27 22 33 64 42 05 73 51 46 73 60
""")

_RS_24_12_13_PARITY = _parse_octal_rows("""
    62 44 03 25 14 16 27 03 53 04 36 47
    11 12 11 11 16 64 67 55 01 76 26 73
    03 01 05 75 14 06 20 44 66 06 70 66
    21 70 27 45 16 67 23 64 73 33 44 21
    30 22 03 75 15 15 33 15 51 03 53 50
    01 41 27 56 76 64 21 53 04 25 01 12
    61 76 21 55 76 01 63 35 30 13 64 70
    24 22 71 56 21 35 73 42 57 74 43 76
    72 42 05 20 43 47 33 56 01 16 13 76
    72 14 65 54 35 25 41 16 15 40 71 26
    73 65 36 61 42 22 17 04 44 20 25 05
    71 05 55 03 71 34 60 11 74 02 41 50
""")

_RS_24_16_9_PARITY = _parse_octal_rows("""
    51 45 67 15 64 67 52 12
    57 25 63 73 71 22 40 15
    05 01 31 04 16 54 25 76
    73 07 47 14 41 77 47 11
    75 15 51 51 17 67 17 57
    20 32 14 42 75 42 70 54
    02 75 43 05 01 40 12 64
    24 74 15 72 24 26 74 61
    42 64 07 22 61 20 40 65
    32 32 55 41 57 66 21 77
    65 36 25 07 50 16 40 51
    64 06 54 32 76 46 14 36
    62 63 74 70 05 27 37 46
    55 43 34 71 57 76 50 64
    24 23 23 05 50 70 42 23
    67 75 45 60 57 24 06 26
""")

def _rs_systematic_encode(data, parity_rows):
    """Encode `data` with a systematic Reed-Solomon code over GF(2^6).

    `data` is read as len(parity_rows) six-bit symbols, most significant
    first. Returns a list holding those symbols followed by the parity
    symbols, one per column of `parity_rows`.
    """
    k = len(parity_rows)
    hexbits = [(data >> ((k - 1 - j) * 6)) & 0x3f for j in range(k)]
    codeword = list(hexbits)
    for col in range(len(parity_rows[0])):
        symbol = 0
        for j in range(k):
            symbol ^= gf6mult(hexbits[j], parity_rows[j][col])
        codeword.append(symbol)
    return codeword

# (36,20,17) shortened Reed-Solomon encoder
# argument is an integer
# returns a list of 36 six-bit integers
def rs_36_20_17_encode(data):
    """Encode a 120-bit integer into a list of 36 six-bit symbols."""
    assert data < 2**120
    return _rs_systematic_encode(data, _RS_36_20_17_PARITY)

# (24,12,13) shortened Reed-Solomon encoder
# argument is an integer
# returns a list of 24 six-bit integers
def rs_24_12_13_encode(data):
    """Encode a 72-bit integer into a list of 24 six-bit symbols."""
    assert data < 2**72
    return _rs_systematic_encode(data, _RS_24_12_13_PARITY)

# (24,16,9) shortened Reed-Solomon encoder
# argument is an integer
# returns a list of 24 six-bit integers
def rs_24_16_9_encode(data):
    """Encode a 96-bit integer into a list of 24 six-bit symbols."""
    assert data < 2**96
    return _rs_systematic_encode(data, _RS_24_16_9_PARITY)

# (24,12,8) extended Golay encoder
# argument is an integer
# returns an integer
def golay_24_12_8_encode(data):
    """Encode a 12-bit integer into a 24-bit extended Golay codeword."""
    matrix = (
        0o40006165, 0o20003073, 0o10007550, 0o4003664, 0o2001732, 0o1006631,
        0o403315, 0o201547, 0o106706, 0o45227, 0o24476, 0o14353)
    assert data < 2**12
    return _gf2_encode(data, matrix)

# (23,12,7) Golay encoder
# argument is an integer
# returns an integer
def golay_23_12_8_encode(data):
    """Encode 12 bits as the extended Golay codeword with its last bit dropped."""
    return golay_24_12_8_encode(data) >> 1

# (18,6,8) shortened Golay encoder
# argument is an integer
# returns an integer
def golay_18_6_8_encode(data):
    """Encode a 6-bit integer using the extended Golay code.

    The six leading data bits are zero, so the codeword fits in 18 bits.
    """
    assert data < 2**6
    return golay_24_12_8_encode(data)

# (16,8,5) shortened cyclic encoder
# argument is an integer
# returns an integer
def cyclic_16_8_5_encode(data):
    """Encode an 8-bit integer into a 16-bit cyclic codeword."""
    matrix = (0x804e, 0x4027, 0x208f, 0x10db, 0x08f1, 0x04e4, 0x0272, 0x0139)
    assert data < 2**8
    return _gf2_encode(data, matrix)

# (10,6,3) shortened Hamming encoder
# argument is an integer
# returns an integer
def hamming_10_6_3_encode(data):
    """Encode a 6-bit integer into a 10-bit Hamming codeword."""
    matrix = (0x20e, 0x10d, 0x08b, 0x047, 0x023, 0x01c)
    assert data < 2**6
    return _gf2_encode(data, matrix)

# (15,11,3) Hamming encoder
# argument is an integer
# returns an integer
def hamming_15_11_3_encode(data):
    """Encode an 11-bit integer with the (15,11,3) Hamming generator rows."""
    matrix = (0x400f, 0x200e, 0x100d, 0x080c, 0x040b, 0x020a, 0x0109, 0x0087,
              0x0046, 0x0025, 0x0013)
    assert data < 2**11
    return _gf2_encode(data, matrix)

# sequence of cyclic encodings for LDU1
# argument is an integer
# returns an integer
def ldu1_cyclic(lsd):
    """Cyclic-encode the upper two bytes of 32-bit `lsd` into a 32-bit word."""
    word = cyclic_16_8_5_encode((lsd >> 24) & 0xff)
    word <<= 16
    word |= cyclic_16_8_5_encode((lsd >> 16) & 0xff)
    return word

# sequence of cyclic encodings for LDU2
# argument is an integer
# returns an integer
def ldu2_cyclic(lsd):
    """Cyclic-encode the lower two bytes of 32-bit `lsd` into a 32-bit word."""
    word = cyclic_16_8_5_encode((lsd >> 8) & 0xff)
    word <<= 16
    word |= cyclic_16_8_5_encode(lsd & 0xff)
    return word

# sequence of golay encodings for HDU
# argument is a list of 36 six-bit integers
# returns an integer
def header_golay(rs_codeword):
    """Golay-encode 36 six-bit symbols into one 648-bit integer."""
    out = 0
    for i in range(36):
        out <<= 18
        out |= golay_18_6_8_encode(rs_codeword[i])
    return out

# sequence of hamming encodings for LDU1 and LDU2
# argument is a list of 24 six-bit integers
# returns an integer
def ldu_hamming(rs_codeword):
    """Hamming-encode 24 six-bit symbols into one 240-bit integer."""
    out = 0
    for i in range(24):
        out <<= 10
        out |= hamming_10_6_3_encode(rs_codeword[i])
    return out

# sequence of golay encodings for xTDU
# argument is a list of 24 six-bit integers
# returns an integer
def xtdu_golay(rs_codeword):
    """Golay-encode 24 six-bit symbols, taken in pairs, into a 288-bit integer."""
    out = 0
    for i in range(0, 24, 2):
        out <<= 24
        data = (rs_codeword[i] << 6) + rs_codeword[i + 1]
        out |= golay_24_12_8_encode(data)
    return out

# interleave a sequence of 98 symbols (for trellis encoded data)
# argument is list of dibits
# returns list of dibits
def data_interleave(input):
    """Reorder a list of exactly 98 dibits, moving them in pairs.

    Pairs at offsets 0, 8, ..., 96 come first, followed by the pairs at
    offsets 2, 4 and 6 within each group of eight.
    """
    assert len(input) == 98
    output = []
    for j in range(0, 97, 8):
        output.extend(input[j:j + 2])
    for i in range(2, 7, 2):
        for j in range(0, 89, 8):
            output.extend(input[i + j:i + j + 2])
    return output

# 1/2 rate trellis encode a sequence of dibits
# argument is list of dibits
# returns list of dibits
def trellis_1_2_encode(input):
    """Trellis-encode dibits at rate 1/2: two output dibits per input dibit.

    A zero flushing dibit is appended, so the output holds
    2 * (len(input) + 1) dibits. The caller's list is not modified.
    """
    # append flushing dibit
    input = list(input)
    input.append(0)
    output = []
    state = 0
    # state transition table, including constellation to dibit pair mapping
    table = (
        ((0, 2), (3, 0), (0, 1), (3, 3)),
        ((3, 2), (0, 0), (3, 1), (0, 3)),
        ((2, 1), (1, 3), (2, 2), (1, 0)),
        ((1, 1), (2, 3), (1, 2), (2, 0)))
    for symbol in input:
        output.extend(table[state][symbol])
        state = symbol
    # return dibits
    return output

# 3/4 rate trellis encode a sequence of tribits
# argument is list of tribits
# returns list of dibits
def trellis_3_4_encode(input):
    """Trellis-encode tribits at rate 3/4: two output dibits per input tribit.

    A zero flushing tribit is appended, so the output holds
    2 * (len(input) + 1) dibits. The caller's list is not modified.
    """
    # append flushing tribit
    input = list(input)
    input.append(0)
    output = []
    state = 0
    # state transition table, including constellation to dibit pair mapping
    table = (
        ((0, 2), (3, 1), (3, 2), (0, 1), (1, 3), (2, 0), (2, 3), (1, 0)),
        ((3, 2), (0, 1), (1, 3), (2, 0), (2, 3), (1, 0), (0, 2), (3, 1)),
        ((2, 2), (1, 1), (1, 2), (2, 1), (3, 3), (0, 0), (0, 3), (3, 0)),
        ((1, 2), (2, 1), (3, 3), (0, 0), (0, 3), (3, 0), (2, 2), (1, 1)),
        ((3, 3), (0, 0), (0, 3), (3, 0), (2, 2), (1, 1), (1, 2), (2, 1)),
        ((0, 3), (3, 0), (2, 2), (1, 1), (1, 2), (2, 1), (3, 3), (0, 0)),
        ((1, 3), (2, 0), (2, 3), (1, 0), (0, 2), (3, 1), (3, 2), (0, 1)),
        ((2, 3), (1, 0), (0, 2), (3, 1), (3, 2), (0, 1), (1, 3), (2, 0)))
    for symbol in input:
        output.extend(table[state][symbol])
        state = symbol
    # return dibits
    return output

# 16 bit CRC_CCITT over 80 data bits
# argument is an integer
# returns an integer
def crc_ccitt(data):
    """Return the inverted 16-bit CRC of an 80-bit integer."""
    assert data >= 0
    assert data < 2**80
    g = (1 << 12) | (1 << 5) | 1
    crc = 0
    for i in range(79, -1, -1):
        crc <<= 1
        if ((crc >> 16) ^ (data >> i)) & 1:
            crc ^= g
    return (crc & 0xffff) ^ 0xffff

# 32 bit CRC over variable number of data bits
# arguments are integers
# returns an integer
def crc_32(data, length):
    """Return the inverted 32-bit CRC of the low `length` bits of `data`."""
    assert length >= 0
    assert length <= 4096
    assert data >= 0
    assert data < 2**length
    g = 0x04c11db7
    crc = 0
    for i in range(length - 1, -1, -1):
        crc <<= 1
        if ((crc >> 32) ^ (data >> i)) & 1:
            crc ^= g
    return (crc & 0xffffffff) ^ 0xffffffff

# 9 bit CRC over 7 bit serial number and 128 data bits
# arguments are integers
# returns an integer
def crc_9(serial, data):
    """Return the inverted 9-bit CRC of a 7-bit serial followed by 128 data bits."""
    assert serial >= 0
    assert serial <= 0x7f
    assert data >= 0
    assert data < 2**128
    data |= (serial << 128)
    g = (1 << 6) | (1 << 4) | (1 << 3) | 1
    crc = 0
    for i in range(134, -1, -1):
        crc <<= 1
        if ((crc >> 9) ^ (data >> i)) & 1:
            crc ^= g
    return (crc & 0x1ff) ^ 0x1ff

##############################
# construct parts of packets #
##############################

# arguments are integers
# returns list of dibits
def start_packet(nac, duid):
    """Return the 56 dibits that open every packet: frame sync then NID."""
    assert nac <= 0xfff
    assert duid <= 0xf
    symbols = []
    # every packet gets a frame sync
    fs = 0x5575f5ff77ff
    symbols.extend(split_dibits(fs, 24))
    # add NID codeword
    nid = bch_64_16_23_encode((nac << 4) | duid)
    symbols.extend(split_dibits(nid, 32))
    return symbols

# arguments are integers
# returns an integer
def construct_lcf(p, sf, lco):
    """Build and print the 8-bit Link Control Format octet (P, SF, LCO)."""
    text_out("\t\tLink Control Format:\n")
    assert p <= 0x1
    assert sf <= 0x1
    assert lco <= 0x3f
    lcf = p << 7
    lcf |= sf << 6
    lcf |= lco
    text_out("\t\t\tP = %01x\n" % p)
    text_out("\t\t\tSF = %01x\n" % sf)
    text_out("\t\t\tLCO = %02x\n" % lco)
    text_out("\t\t\tLCF = %02x\n" % lcf)
    return lcf

# arguments are integers
# returns an integer
def construct_svcopt(e, p, d, m, r, pri):
    """Build and print the 8-bit Service Options octet.

    `r` is range-checked but not encoded: the R bit is always emitted and
    printed as zero.
    """
    text_out("\t\tService Options:\n")
    assert e <= 0x1
    assert p <= 0x1
    assert d <= 0x1
    assert m <= 0x1
    assert r <= 0x1
    assert pri <= 0x7
    so = e << 7
    so |= p << 6
    so |= d << 5
    so |= m << 4
    so |= pri
    text_out("\t\t\tE = %01x\n" % e)
    text_out("\t\t\tP = %01x\n" % p)
    text_out("\t\t\tD = %01x\n" % d)
    text_out("\t\t\tM = %01x\n" % m)
    text_out("\t\t\tR = %01x\n" % 0)
    text_out("\t\t\tPri = %02x\n" % pri)
    text_out("\t\t\tSvcOpt = %02x\n" % so)
    return so

# arguments are integers
# returns an integer
def construct_lc(lco, mfid, svcopt, s, tgid, dst, src):
    """Build and print a 72-bit Link Control Word.

    Only lco 0 (group call, using `s` and `tgid`) and lco 3 (individual
    call, using `dst`) are supported; any other value writes an error to
    stderr and exits the process with status 1.
    """
    text_out("\tLink Control Word:\n")
    assert lco <= 0xff
    assert mfid <= 0xff
    assert svcopt <= 0xff
    assert s <= 0x1
    assert tgid <= 0xffff
    assert dst <= 0xffffff
    assert src <= 0xffffff
    lcf = construct_lcf(0, 0, lco)
    lc = lcf << 64
    text_out("\t\tMFID = %02x\n" % mfid)
    lc |= mfid << 56
    lc |= svcopt << 48
    # We only implement the formats listed in the Common Air Interface
    # specification. For other formats, see TIA-102.AABF-B.
    if lco == 0:
        # group call
        # s = explicit source ID required
        text_out("\t\tS = %01x\n" % s)
        # S is the low bit of the octet between the service options and the
        # TGID (bits 47..40). Bit 32 lies inside the TGID field (bits
        # 39..24), so placing S there corrupted the TGID whenever s was 1.
        lc |= s << 40
        text_out("\t\tTGID = %04x\n" % tgid)
        lc |= tgid << 24
    elif lco == 3:
        # individual call
        text_out("\t\tTUID = %06x\n" % dst)
        lc |= dst << 24
    else:
        sys.stderr.write("error: --lco must be 0 or 3\n")
        sys.exit(1)
    text_out("\t\tSUID = %06x\n" % src)
    lc |= src
    text_out("\t\tLCW = %018x\n" % lc)
    return lc

# arguments are integers
# returns an integer
def construct_es(mi, algid, kid):
    """Build and print the 96-bit Encryption Sync Word (MI, ALGID, KID)."""
    text_out("\tEncryption Sync Word:\n")
    assert mi < 2**72
    assert algid <= 0xff
    assert kid <= 0xffff
    es = mi << 24
    es |= algid << 16
    es |= kid
    text_out("\t\tMI = %018x\n" % mi)
    text_out("\t\tALGID = %02x\n" % algid)
    text_out("\t\tKID = %04x\n" % kid)
    text_out("\t\tESW = %024x\n" % es)
    return es

# arguments are integers
# returns an integer
def construct_tsbk(lb, p, opcode, mfid, arg):
    """Build and print a 96-bit Trunking Signaling Block including its CRC."""
    text_out("\tTrunking Signaling Block:\n")
    assert lb <= 0x1
    assert p <= 0x1
    assert opcode <= 0x3f
    assert mfid <= 0xff
    assert arg < 2**64
    tsbk = lb << 95
    tsbk |= p << 94
    tsbk |= opcode << 88
    tsbk |= mfid << 80
    tsbk |= arg << 16
    crc = crc_ccitt(tsbk >> 16)
    tsbk |= crc
    text_out("\t\tLB = %01x\n" % lb)
    text_out("\t\tP = %01x\n" % p)
    text_out("\t\tOpcode = %02x\n" % opcode)
    text_out("\t\tMFID = %02x\n" % mfid)
    text_out("\t\tArguments = %016x\n" % arg)
    text_out("\t\tCRC = %04x\n" % crc)
    text_out("\t\tTSBK = %024x\n" % tsbk)
    return tsbk

# arguments are integers
# returns an integer
def construct_cpduh(an, io, sapid, mfid, llid, fmf, btf, poc, syn, ns, fsnf, dho):
    """Build and print the 96-bit Confirmed Packet Data Unit header with CRC."""
    text_out("\tConfirmed Packet Data Unit Header:\n")
    assert an <= 0x1
    assert io <= 0x1
    assert sapid <= 0x3f
    assert mfid <= 0xff
    assert llid <= 0xffffff
    assert fmf <= 0x1
    assert btf <= 0x7f
    assert poc <= 0x1f
    assert syn <= 0x1
    assert ns <= 0x7
    assert fsnf <= 0xf
    assert dho <= 0x3f
    duformat = 0x16
    hdr = an << 94  # 1 indicates confirmation desired
    hdr |= io << 93  # 1 for outbound, 0 for inbound
    hdr |= duformat << 88
    hdr |= 0x3 << 86
    hdr |= sapid << 80  # SAP Identifier
    hdr |= mfid << 72
    hdr |= llid << 48  # Logical Link ID
    hdr |= fmf << 47  # Full Message Flag
    hdr |= btf << 40  # blocks to follow
    hdr |= poc << 32  # pad octet count
    hdr |= syn << 31
    hdr |= ns << 28  # N(S) sequence number
    hdr |= fsnf << 24  # Fragment Sequence Number Field
    hdr |= dho << 16  # Data Header Offset
    crc = crc_ccitt(hdr >> 16)
    hdr |= crc
    text_out("\t\tA/N = %01x\n" % an)
    text_out("\t\tI/O = %01x\n" % io)
    text_out("\t\tFormat = %02x\n" % duformat)
    text_out("\t\tSAP = %02x\n" % sapid)
    text_out("\t\tMFID = %02x\n" % mfid)
    text_out("\t\tLLID = %06x\n" % llid)
    text_out("\t\tFMF = %01x\n" % fmf)
    text_out("\t\tBTF = %02x\n" % btf)
    text_out("\t\tPOC = %02x\n" % poc)
    text_out("\t\tSyn = %01x\n" % syn)
    text_out("\t\tN(S) = %01x\n" % ns)
    text_out("\t\tFSNF = %01x\n" % fsnf)
    text_out("\t\tDHO = %02x\n" % dho)
    text_out("\t\tCRC = %04x\n" % crc)
    text_out("\t\tHeader = %024x\n" % hdr)
    return hdr

# arguments are integers
# returns an integer
def construct_rpduh(io, rclass, rtype, rstatus, mfid, llid, x, btf, sllid):
    """Build and print the 96-bit Response Packet Data Unit header with CRC.

    When `x` is nonzero, `sllid` must be zero.
    """
    text_out("\tResponse Packet Data Unit Header:\n")
    assert io <= 0x1
    assert rclass <= 0x3
    assert rtype <= 0x7
    assert rstatus <= 0x7
    assert mfid <= 0xff
    assert llid <= 0xffffff
    assert btf <= 0x7f
    assert sllid <= 0xffffff
    if x:
        assert sllid == 0
    duformat = 0x3
    hdr = io << 93  # 1 for outbound, 0 for inbound
    hdr |= duformat << 88
    hdr |= rclass << 86  # response class
    hdr |= rtype << 83  # response types
    hdr |= rstatus << 80  # response status
    hdr |= mfid << 72
    hdr |= llid << 48  # Logical Link ID
    hdr |= x << 47  # X
    hdr |= btf << 40  # blocks to follow
    hdr |= sllid << 16  # Source Logical Link ID
    crc = crc_ccitt(hdr >> 16)
    hdr |= crc
    text_out("\t\tI/O = %01x\n" % io)
    text_out("\t\tFormat = %02x\n" % duformat)
    text_out("\t\tClass = %01x\n" % rclass)
    text_out("\t\tType = %01x\n" % rtype)
    text_out("\t\tStatus = %01x\n" % rstatus)
    text_out("\t\tMFID = %02x\n" % mfid)
    text_out("\t\tLLID = %06x\n" % llid)
    text_out("\t\tX = %01x\n" % x)
    text_out("\t\tBTF = %02x\n" % btf)
    text_out("\t\tSLLID = %06x\n" % sllid)
    text_out("\t\tCRC = %04x\n" % crc)
    text_out("\t\tHeader = %024x\n" % hdr)
    return hdr

# arguments are integers
# returns an integer
def construct_upduh(io, sapid, mfid, llid, btf, poc, dho):
    """Build and print the 96-bit Unconfirmed Packet Data Unit header with CRC."""
    text_out("\tUnconfirmed Packet Data Unit Header:\n")
    assert io <= 0x1
    assert sapid <= 0x3f
    assert mfid <= 0xff
    assert llid <= 0xffffff
    assert btf <= 0x7f
    assert poc <= 0x1f
    assert dho <= 0x3f
    duformat = 0x15
    hdr = io << 93  # 1 for outbound, 0 for inbound
    hdr |= duformat << 88
    hdr |= 0x3 << 86
    hdr |= sapid << 80  # SAP Identifier
    hdr |= mfid << 72
    hdr |= llid << 48  # Logical Link ID
    hdr |= 0x1 << 47
    hdr |= btf << 40  # blocks to follow
    hdr |= poc << 32  # pad octet count
    hdr |= dho << 16  # Data Header Offset
    crc = crc_ccitt(hdr >> 16)
    hdr |= crc
    text_out("\t\tI/O = %01x\n" % io)
    text_out("\t\tFormat = %02x\n" % duformat)
    text_out("\t\tSAP = %02x\n" % sapid)
    text_out("\t\tMFID = %02x\n" % mfid)
    text_out("\t\tLLID = %06x\n" % llid)
    text_out("\t\tBTF = %02x\n" % btf)
    text_out("\t\tPOC = %02x\n" % poc)
    text_out("\t\tDHO = %02x\n" % dho)
    text_out("\t\tCRC = %04x\n" % crc)
    text_out("\t\tHeader = %024x\n" % hdr)
    return hdr

# arguments are integers
# returns an integer
def construct_ambth(io, sapid, mfid, llid, btf, opcode, dbtm):
    """Build and print the 96-bit Alternate Multiple Block Trunking header."""
    text_out("\tAlternate Multiple Block Trunking Header:\n")
    assert io <= 0x1
    assert sapid <= 0x3f
    assert mfid <= 0xff
    assert llid <= 0xffffff
    assert btf <= 0x7f
    assert opcode <= 0x3f
    assert dbtm <= 0xffff
    duformat = 0x17
    hdr = io << 93  # 1 for outbound, 0 for inbound
    hdr |= duformat << 88
    hdr |= 0x3 << 86
    hdr |= sapid << 80  # SAP Identifier
    hdr |= mfid << 72
    hdr |= llid << 48  # Logical Link ID
    hdr |= 0x1 << 47
    hdr |= btf << 40  # blocks to follow
    hdr |= opcode << 32
    hdr |= dbtm << 16  # defined by trunking messages
    crc = crc_ccitt(hdr >> 16)
    hdr |= crc
    text_out("\t\tI/O = %01x\n" % io)
    text_out("\t\tFormat = %02x\n" % duformat)
    text_out("\t\tSAP = %02x\n" % sapid)
    text_out("\t\tMFID = %02x\n" % mfid)
    text_out("\t\tLLID = %06x\n" % llid)
    text_out("\t\tBTF = %02x\n" % btf)
    text_out("\t\tOpcode = %02x\n" % opcode)
    text_out("\t\tDBTM = %04x\n" % dbtm)
    text_out("\t\tCRC = %04x\n" % crc)
    text_out("\t\tHeader = %024x\n" % hdr)
    return hdr

# arguments are integers
# returns an integer
def construct_ef(efclass, operand, arguments):
    """Build and print a 40-bit Extended Function field."""
    text_out("\tExtended Function:\n")
    assert efclass <= 0xff
    assert operand <= 0xff
    assert arguments <= 0xffffff
    ef = efclass << 32
    ef |= operand << 24
    ef |= arguments
    text_out("\t\tClass = %02x\n" % efclass)
    text_out("\t\tOperand = %02x\n" % operand)
    text_out("\t\tArguments = %06x\n" % arguments)
    return ef

##############################
# construct complete packets #
##############################

def _ssym_line(ssyms):
    """Format the status symbol summary line printed for each packet."""
    return "\tSSym = " + " ".join("%d" % sym for sym in ssyms) + "\n"

def _ldu_voice_symbols(imbe, flip_first, word_syms, lsd_syms):
    """Assemble the dibits that follow the NID in an LDU1 or LDU2.

    Nine copies of the 144-bit `imbe` frame are emitted, alternately with
    and without bit 1 toggled; `flip_first` selects whether the first copy
    is the toggled one. Twenty dibits of `word_syms` follow each of frames
    2 through 7, and `lsd_syms` follows frame 8.
    """
    symbols = []
    for frame in range(9):
        flipped = (frame % 2 == 0) == flip_first
        symbols.extend(split_dibits(imbe ^ 2 if flipped else imbe, 72))
        if 1 <= frame <= 6:
            symbols.extend(word_syms[(frame - 1) * 20:frame * 20])
        elif frame == 7:
            symbols.extend(lsd_syms)
    return symbols

# Header Data Unit
def construct_hdu(nac, ss, mi, mfid, algid, kid, tgid):
    """Print a Header Data Unit built from the given integer fields."""
    text_out("Header Data Unit:\n")
    assert nac <= 0xfff
    assert ss <= 0x3
    assert mi < 2**72
    assert algid <= 0xff
    assert kid <= 0xffff
    assert tgid <= 0xffff
    duid = 0x0
    ssyms = (ss,) * 11
    symbols = start_packet(nac, duid)
    # HDU codeword
    hdr = 0
    hdr |= mi << 48
    hdr |= mfid << 40
    hdr |= algid << 32
    hdr |= kid << 16
    hdr |= tgid
    rs_codeword = rs_36_20_17_encode(hdr)
    symbols.extend(split_dibits(header_golay(rs_codeword), 324))
    text_out("\tDUID = %01x\n" % duid)
    text_out("\tNAC = %03x\n" % nac)
    text_out(_ssym_line(ssyms))
    text_out("\tMI = %018x\n" % mi)
    text_out("\tMFID = %02x\n" % mfid)
    text_out("\tALGID = %02x\n" % algid)
    text_out("\tKID = %04x\n" % kid)
    text_out("\tTGID = %04x\n" % tgid)
    text_out("\tSymbol data:\n")
    print_spec(insert_status(symbols, ssyms))

# Logical Link Data Unit 1
def construct_ldu1(nac, ss, imbe, lsd, lco, mfid, svcopt, s, tgid, dst, src):
    """Print a Logical Link Data Unit 1 carrying a Link Control Word.

    The same 144-bit `imbe` frame is used for all nine voice frames, with
    bit 1 toggled in every second frame.
    """
    text_out("Logical Link Data Unit 1:\n")
    assert nac <= 0xfff
    assert ss <= 0x3
    assert imbe < 2**144
    assert lsd <= 0xffffffff
    assert lco <= 0xff
    assert mfid <= 0xff
    assert svcopt <= 0xff
    assert s <= 0x1
    assert tgid <= 0xffff
    assert dst <= 0xffffff
    assert src <= 0xffffff
    duid = 0x5
    ssyms = (ss,) * 24
    text_out("\tDUID = %01x\n" % duid)
    text_out("\tNAC = %03x\n" % nac)
    text_out(_ssym_line(ssyms))
    text_out("\tIMBE = %036x\n" % imbe)
    text_out("\tLSD = %08x\n" % lsd)
    symbols = start_packet(nac, duid)
    # Link Control Word
    lc = construct_lc(lco, mfid, svcopt, s, tgid, dst, src)
    rs_codeword = rs_24_12_13_encode(lc)
    lcw_syms = split_dibits(ldu_hamming(rs_codeword), 120)
    # Low Speed Data
    lsd_syms = split_dibits(ldu1_cyclic(lsd), 16)
    # first voice frame is unmodified; the sync bit flips on alternate frames
    symbols.extend(_ldu_voice_symbols(imbe, False, lcw_syms, lsd_syms))
    text_out("\tSymbol data:\n")
    print_spec(insert_status(symbols, ssyms))

# Logical Link Data Unit 2
def construct_ldu2(nac, ss, imbe, lsd, mi, algid, kid):
    """Print a Logical Link Data Unit 2 carrying an Encryption Sync Word.

    The same 144-bit `imbe` frame is used for all nine voice frames, with
    bit 1 toggled in the first and every second frame after it.
    """
    text_out("Logical Link Data Unit 2:\n")
    assert nac <= 0xfff
    assert ss <= 0x3
    assert imbe < 2**144
    assert lsd <= 0xffffffff
    assert mi < 2**72
    assert algid <= 0xff
    assert kid <= 0xffff
    duid = 0xa
    ssyms = (ss,) * 24
    text_out("\tDUID = %01x\n" % duid)
    text_out("\tNAC = %03x\n" % nac)
    text_out(_ssym_line(ssyms))
    text_out("\tIMBE = %036x\n" % imbe)
    text_out("\tLSD = %08x\n" % lsd)
    symbols = start_packet(nac, duid)
    # Encryption Sync Word
    es = construct_es(mi, algid, kid)
    rs_codeword = rs_24_16_9_encode(es)
    es_syms = split_dibits(ldu_hamming(rs_codeword), 120)
    # Low Speed Data
    lsd_syms = split_dibits(ldu2_cyclic(lsd), 16)
    # first voice frame has the sync bit flipped, then frames alternate
    symbols.extend(_ldu_voice_symbols(imbe, True, es_syms, lsd_syms))
    text_out("\tSymbol data:\n")
    print_spec(insert_status(symbols, ssyms))

# Simple Terminator Data Unit
def construct_stdu(nac, ss):
    """Print a Simple Terminator Data Unit (frame sync and NID only)."""
    text_out("Simple Terminator Data Unit:\n")
    assert nac <= 0xfff
    assert ss <= 0x3
    duid = 0x3
    ssyms = (ss,) * 2
    symbols = start_packet(nac, duid)
    text_out("\tDUID = %01x\n" % duid)
    text_out("\tNAC = %03x\n" % nac)
    text_out(_ssym_line(ssyms))
    text_out("\tSymbol data:\n")
    print_spec(insert_status(symbols, ssyms))

# Terminator Data Unit with Link Control
#
# aka Extended Terminator Data Unit
def construct_xtdu(nac, ss, lco, mfid, svcopt, s, tgid, dst, src):
    """Print a Terminator Data Unit carrying a Link Control Word."""
    text_out("Terminator Data Unit with Link Control:\n")
    assert nac <= 0xfff
    assert ss <= 0x3
    assert lco <= 0xff
    assert mfid <= 0xff
    assert svcopt <= 0xff
    assert s <= 0x1
    assert tgid <= 0xffff
    assert dst <= 0xffffff
    assert src <= 0xffffff
    duid = 0xf
    ssyms = (ss,) * 6
    text_out("\tDUID = %01x\n" % duid)
    text_out("\tNAC = %03x\n" % nac)
    text_out(_ssym_line(ssyms))
    symbols = start_packet(nac, duid)
    lc = construct_lc(lco, mfid, svcopt, s, tgid, dst, src)
    rs_codeword = rs_24_12_13_encode(lc)
    symbols.extend(split_dibits(xtdu_golay(rs_codeword), 144))
    text_out("\tSymbol data:\n")
    print_spec(insert_status(symbols, ssyms))

# Trunking Signaling Data Unit
#
# The
# standards variously refer to this packet type as "Trunking Signaling Data
# Unit" or "TSBK" or "abbreviated format" or, perhaps most often, "single block
# format" even though it may consist of as many as three actual Trunking
# Signaling Blocks (TSBKs).
def construct_tsdu(nac, ss, blocks, mfid, opcode, arg):
    """Print a Trunking Signaling Data Unit of `blocks` identical TSBKs.

    Every block carries the same opcode, MFID and arguments; only the
    last one has its LB flag set.
    """
    text_out("Trunking Signaling Data Unit:\n")
    assert nac <= 0xfff
    assert ss <= 0x3
    assert blocks <= 0x3
    assert mfid <= 0xff
    assert opcode <= 0x3f
    assert arg < 2**64
    duid = 0x7
    # one status symbol per 35 dibits, rounded up; floor division keeps the
    # count an integer on Python 3 as well as Python 2
    numss = (56 + (blocks * 98) + 34) // 35
    ssyms = (ss,) * numss
    text_out("\tDUID = %01x\n" % duid)
    text_out("\tNAC = %03x\n" % nac)
    text_out("\tSSym =" + "".join(" %d" % sym for sym in ssyms) + "\n")
    symbols = start_packet(nac, duid)
    # append TSBKs
    for i in range(blocks):
        last_block = (i == (blocks - 1))
        tsbk = construct_tsbk(last_block, 0, opcode, mfid, arg)
        symbols.extend(data_interleave(trellis_1_2_encode(split_dibits(tsbk, 48))))
    text_out("\tSymbol data:\n")
    print_spec(insert_status(symbols, ssyms))

# Confirmed Packet Data Unit
def construct_cpdu(nac, ss, data, length, an, io, sapid, mfid, llid, ns, fsnf, dho):
    """Print a Confirmed Packet Data Unit.

    `data` is an integer holding `length` octets of payload. It is padded
    with zero octets and a 32-bit packet CRC so that it fills whole
    16-octet blocks; each block is sent with its own serial number and
    9-bit CRC.
    """
    text_out("Confirmed Packet Data Unit:\n")
    assert nac <= 0xfff
    assert ss <= 0x3
    assert an <= 0x1
    assert io <= 0x1
    assert sapid <= 0x3f
    assert mfid <= 0xff
    assert llid <= 0xffffff
    assert ns <= 0x7
    assert fsnf <= 0xf
    assert dho <= 0x3f
    assert data.bit_length() <= (length * 8)
    duid = 0xc
    symbols = start_packet(nac, duid)
    # account for length of packet CRC appended to data
    length += 4
    blocks = (length + 15) // 16
    assert blocks <= 127
    # account for padding to end of next full block
    pad_octets = (blocks * 16) - length
    length += pad_octets
    # add padding
    data <<= (pad_octets * 8)
    # I'm assuming the packet CRC should be part of the data used to compute
    # the block CRC below, but the specification is unclear on this point.
    packet_crc = crc_32(data, (length - 4) * 8)
    data <<= (4 * 8)
    data |= packet_crc
    # the header block counts as one extra 98-dibit block
    numss = (56 + ((blocks + 1) * 98) + 34) // 35
    ssyms = (ss,) * numss
    text_out("\tDUID = %01x\n" % duid)
    text_out("\tNAC = %03x\n" % nac)
    text_out("\tSSym =" + "".join(" %d" % sym for sym in ssyms) + "\n")
    text_out("\tCRC = %08x\n" % packet_crc)
    text_out("\tData = %x\n" % data)
    # Certain cases may require more sophisticated handling of FMF and Syn
    # flags than we support.
    fmf = 1
    syn = 0
    header = construct_cpduh(an, io, sapid, mfid, llid, fmf, blocks,
                             pad_octets, syn, ns, fsnf, dho)
    symbols.extend(data_interleave(trellis_1_2_encode(
        split_dibits(header, 48))))
    block_mask = (1 << 128) - 1
    for i in range(blocks):
        text_out("\tConfirmed Packet Data Unit Block:\n")
        cpdu_block = (data >> (16 * 8 * (blocks - i - 1))) & block_mask
        # More sophisticated serial number generation required
        # for retransmissions.
        serial = i
        block_crc = crc_9(serial, cpdu_block)
        text_out("\t\tSerial = %02x\n" % serial)
        text_out("\t\tCRC = %03x\n" % block_crc)
        text_out("\t\tData = %032x\n" % cpdu_block)
        cpdu_block |= serial << 137
        cpdu_block |= block_crc << 128
        symbols.extend(data_interleave(trellis_3_4_encode(
            split_tribits(cpdu_block, 48))))
    text_out("\tSymbol data:\n")
    print_spec(insert_status(symbols, ssyms))

# Response Packet Data Unit
def construct_rpdu(nac, ss, data, length, io, rclass, rtype, rstatus, mfid, llid, x, sllid):
    """Print a Response Packet Data Unit with zero or one data block.

    `length` must be 0 (header only) or 8 (one 12-octet block made of
    8 data octets plus the 32-bit packet CRC).
    """
    text_out("Response Packet Data Unit:\n")
    assert nac <= 0xfff
    assert ss <= 0x3
    assert io <= 0x1
    assert rclass <= 0x3
    assert rtype <= 0x7
    assert rstatus <= 0x7
    assert mfid <= 0xff
    assert llid <= 0xffffff
    if x:
        assert sllid == 0
    assert sllid <= 0xffffff
    assert data.bit_length() <= (length * 8)
    # We only support zero or one data blocks. The specification
    # says there can be two, but the construction of a two block
    # response packet is unclear.
    assert (length == 0) or (length == 8)
    duid = 0xc
    symbols = start_packet(nac, duid)
    if (length > 0):
        # account for length of packet CRC appended to data
        length += 4
        blocks = (length + 11) // 12
        packet_crc = crc_32(data, (length - 4) * 8)
        data <<= (4 * 8)
        data |= packet_crc
    else:
        blocks = 0
    # the header block counts as one extra 98-dibit block
    numss = (56 + ((blocks + 1) * 98) + 34) // 35
    ssyms = (ss,) * numss
    text_out("\tDUID = %01x\n" % duid)
    text_out("\tNAC = %03x\n" % nac)
    text_out("\tSSym =" + "".join(" %d" % sym for sym in ssyms) + "\n")
    if (blocks > 0):
        text_out("\tCRC = %08x\n" % packet_crc)
        text_out("\tData = %x\n" % data)
    header = construct_rpduh(io, rclass, rtype, rstatus, mfid, llid, x,
                             blocks, sllid)
    symbols.extend(data_interleave(trellis_1_2_encode(
        split_dibits(header, 48))))
    block_mask = (1 << 96) - 1
    for i in range(blocks):
        text_out("\tResponse Packet Data Unit Block:\n")
        rpdu_block = (data >> (12 * 8 * (blocks - i - 1))) & block_mask
        text_out("\t\tData = %024x\n" % rpdu_block)
        # I'm assuming 1/2 rate trellis encoding based on the length
        # of the block. The specification doesn't say.
        symbols.extend(data_interleave(trellis_1_2_encode(
            split_dibits(rpdu_block, 48))))
    text_out("\tSymbol data:\n")
    print_spec(insert_status(symbols, ssyms))

# Unconfirmed Packet Data Unit
def construct_updu(nac, ss, data, length, io, sapid, mfid, llid, dho):
    """Print an Unconfirmed Packet Data Unit.

    `data` is an integer holding `length` octets of payload. It is padded
    with zero octets and a 32-bit packet CRC so that it fills whole
    12-octet blocks.
    """
    text_out("Unconfirmed Packet Data Unit:\n")
    assert nac <= 0xfff
    assert ss <= 0x3
    assert io <= 0x1
    assert sapid <= 0x3f
    assert mfid <= 0xff
    assert llid <= 0xffffff
    assert dho <= 0x3f
    assert data.bit_length() <= (length * 8)
    duid = 0xc
    symbols = start_packet(nac, duid)
    # account for length of packet CRC appended to data
    length += 4
    blocks = (length + 11) // 12
    assert blocks <= 127
    # account for padding to end of next full block
    pad_octets = (blocks * 12) - length
    length += pad_octets
    # add padding
    data <<= (pad_octets * 8)
    packet_crc = crc_32(data, (length - 4) * 8)
    data <<= (4 * 8)
    data |= packet_crc
    # the header block counts as one extra 98-dibit block
    numss = (56 + ((blocks + 1) * 98) + 34) // 35
    ssyms = (ss,) * numss
    text_out("\tDUID = %01x\n" % duid)
    text_out("\tNAC = %03x\n" % nac)
    text_out("\tSSym =" + "".join(" %d" % sym for sym in ssyms) + "\n")
    text_out("\tCRC = %08x\n" % packet_crc)
    text_out("\tData = %x\n" % data)
    header = construct_upduh(io, sapid, mfid, llid, blocks, pad_octets, dho)
    symbols.extend(data_interleave(trellis_1_2_encode(
        split_dibits(header, 48))))
    block_mask = (1 << 96) - 1
    for i in range(blocks):
        text_out("\tUnconfirmed Packet Data Unit Block:\n")
        updu_block = (data >> (12 * 8 * (blocks - i - 1))) & block_mask
        text_out("\t\tData = %024x\n" % updu_block)
        symbols.extend(data_interleave(trellis_1_2_encode(
            split_dibits(updu_block, 48))))
    text_out("\tSymbol data:\n")
    print_spec(insert_status(symbols, ssyms))

# Alternate Multiple Block Trunking (MBT) Control Packet
#
# This is sometimes called things like "multiblock trunking control" and is a
# variation of the Unconfirmed Packet Data Unit.
def construct_ambt(nac, ss, data, length, io, sapid, mfid, llid, opcode, dbtm):
    """Craft and print an Alternate Multiple Block Trunking control packet.

    data is an integer holding the payload and length is its size in octets.
    A 32-bit packet CRC is appended; no padding is added, so length + 4 must
    be an exact multiple of 12 octets and at most three blocks long.
    sapid must be 0x3d or 0x3f.

    The remaining arguments are header fields passed to construct_ambth();
    their upper bounds are enforced by the assertions below.

    Nothing is returned; output goes through text_out() and print_spec().
    Out-of-range arguments raise AssertionError.
    """
    text_out("Alternate Multiple Block Trunking (MBT) Control Packet:\n")
    assert nac <= 0xfff
    assert ss <= 0x3
    assert io <= 0x1
    assert sapid <= 0x3f
    assert mfid <= 0xff
    assert llid <= 0xffffff
    assert opcode <= 0x3f
    assert dbtm <= 0xffff
    assert data.bit_length() <= (length * 8)
    assert (sapid == 0x3d) or (sapid == 0x3f)
    duid = 0xc
    symbols = start_packet(nac, duid)
    # account for length of packet CRC appended to data
    length += 4
    # floor division: the block count must stay an integer on Python 3 too
    blocks = (length + 11) // 12
    assert blocks <= 3
    # make sure the data fill the blocks
    extra = (blocks * 12) - length
    assert extra == 0
    packet_crc = crc_32(data, (length - 4) * 8)
    data <<= (4 * 8)
    data |= packet_crc
    # the extra block accounts for the header block
    numss = (56 + ((blocks + 1) * 98) + 34) // 35
    ssyms = (ss,) * numss
    text_out("\tDUID = %01x\n" % duid)
    text_out("\tNAC = %03x\n" % nac)
    text_out("\tSSym =")
    for ssym in ssyms:
        text_out(" %d" % ssym)
    text_out("\n")
    text_out("\tCRC = %08x\n" % packet_crc)
    text_out("\tData = %x\n" % data)
    header = construct_ambth(io, sapid, mfid, llid, blocks, opcode, dbtm)
    symbols.extend(data_interleave(trellis_1_2_encode(
        split_dibits(header, 48))))
    block_mask = (1 << 96) - 1  # 12 octets per block
    for i in range(blocks):
        text_out("\tMultiple Block Trunking Block:\n")
        # blocks are taken from the most significant end of data first
        ambt_block = (data >> (12 * 8 * (blocks - i - 1))) & block_mask
        text_out("\t\tData = %024x\n" % ambt_block)
        symbols.extend(data_interleave(trellis_1_2_encode(
            split_dibits(ambt_block, 48))))
    text_out("\tSymbol data:\n")
    print_spec(insert_status(symbols, ssyms))


#############################
# construct special packets #
#############################

# These functions for specific control channel packets are provided to serve as
# an example of how this software may be extended to support the crafting of
# various data packets. Limited testing has been done for most of these
# because the specifications do not include test vectors for them.
# Those that have been given command line options and include example usage in
# the comments below have been tested successfully as conventional control
# messages (see TIA-102.AABG) in live operation.

# Radio Unit Monitor example usage:
# p25craft.py --rum --src 0xfffffd --dst 0x000001 --ss 1
def construct_rad_mon_cmd(nac, ss, src, dst):
    """Craft a one-block TSDU carrying a Radio Unit Monitor Command.

    The transmit time, silent mode and multiplier fields are hard coded;
    src occupies bits 47..24 of the argument and dst bits 23..0.
    """
    text_out("Special Packet: Radio Unit Monitor Command\n")
    opcode = 0x1d  # Radio Unit Monitor Command
    txtime = 0x00  # transmit time in seconds
    sm = 0  # silent mode
    txmult = 0x3  # multiply radio's configured tx duration
    args = txtime << 56
    args |= sm << 55
    args |= txmult << 48
    args |= src << 24
    args |= dst
    construct_tsdu(nac, ss, 1, 0x00, opcode, args)


def construct_ack_rsp_fne(nac, ss, svctype, src, dst):
    """Craft a one-block TSDU carrying an Acknowledge Response - FNE.

    svctype is placed at bit 56; the additional information field (src,
    because the hard coded extended-address flag is 0) sits at bits 47..24
    and dst at bits 23..0.
    """
    text_out("Special Packet: Acknowledge Response - FNE\n")
    # hard coded example parameters
    aiv = 0
    ex = 0
    wacnid = 0x00000
    sysid = 0x000
    opcode = 0x20  # Acknowledge Response - FNE
    if ex == 1:
        addtlinfo = wacnid << 12
        addtlinfo |= sysid
    else:
        addtlinfo = src
    args = aiv << 63  # Additional Information Valid Flag
    args |= ex << 62  # extended address
    args |= svctype << 56
    args |= addtlinfo << 24
    args |= dst
    construct_tsdu(nac, ss, 1, 0x00, opcode, args)


def construct_ack_rsp_u(nac, ss, src, dst, svctype=0):
    """Craft a one-block TSDU carrying an Acknowledge Response - Unit.

    svctype is placed at bit 56, dst at bits 47..24 and src at bits 23..0.

    svctype was previously read from an undefined name, so every call raised
    NameError; it is now an optional trailing parameter.
    NOTE(review): the default of 0 is only a placeholder -- confirm the
    intended service type with callers.
    """
    text_out("Special Packet: Acknowledge Response - Unit\n")
    opcode = 0x20  # Acknowledge Response - Unit
    args = svctype << 56
    args |= dst << 24
    args |= src
    construct_tsdu(nac, ss, 1, 0x00, opcode, args)


def construct_rad_mon_req(nac, ss, src, dst):
    """Craft a one-block TSDU carrying a Radio Unit Monitor Request.

    Same layout as the command form except that dst occupies bits 47..24 and
    src bits 23..0.
    """
    text_out("Special Packet: Radio Unit Monitor Request\n")
    opcode = 0x1d  # Radio Unit Monitor Request
    txtime = 0x00  # transmit time in seconds
    sm = 0  # silent mode
    txmult = 0x3  # multiply radio's configured tx duration
    args = txtime << 56
    args |= sm << 55
    args |= txmult << 48
    args |= dst << 24
    args |= src
    construct_tsdu(nac, ss, 1, 0x00, opcode, args)


def construct_call_alrt_req(nac, ss, src, dst):
    """Craft a one-block TSDU carrying a Call Alert Request."""
    text_out("Special Packet: Call Alert Request\n")
    opcode = 0x1f  # Call Alert Request
    args = dst << 24
    args |= src
    construct_tsdu(nac, ss, 1, 0x00, opcode, args)


def construct_can_srv_req(nac, ss, svctype, src, dst):
    """Craft a one-block TSDU carrying a Cancel Service Request.

    The additional information field is hard coded to be valid and to carry
    dst; the reason code is hard coded to 0x00.
    """
    text_out("Special Packet: Cancel Service Request\n")
    # hard coded example parameters
    aiv = 1
    reason = 0x00  # no reason code
    addtlinfo = dst
    opcode = 0x23  # Cancel Service Request
    args = aiv << 63  # Additional Information Valid Flag
    args |= svctype << 56
    args |= reason << 48
    args |= addtlinfo << 24
    args |= src
    construct_tsdu(nac, ss, 1, 0x00, opcode, args)


def construct_emrg_alrm_req(nac, ss, src, tgid):
    """Craft a one-block TSDU carrying an Emergency Alarm Request."""
    text_out("Special Packet: Emergency Alarm Request\n")
    opcode = 0x27  # Emergency Alarm Request
    args = tgid << 24
    args |= src
    construct_tsdu(nac, ss, 1, 0x00, opcode, args)


# It seems to make more sense to use EXT_FNCT_RSP for the following command
# ACKs, but they are included here in EXT_FNCT_CMD form as well because the
# spec appears to allow it.

def construct_ext_fnct_cmd_class0(nac, ss, operand, src, dst):
    """Craft a one-block TSDU carrying a class 0 Extended Function Command.

    The extended function is built from src by construct_ef() and placed
    above the 24-bit dst address.
    """
    ef = construct_ef(0x00, operand, src)
    opcode = 0x24  # Extended Function Command
    arg = ef << 24
    arg |= dst
    construct_tsdu(nac, ss, 1, 0x00, opcode, arg)


# Inhibit example usage:
# p25craft.py --inhibit --src 0xfffffd --dst 0x000001 --ss 1
def construct_ext_fnct_cmd_inhibit(nac, ss, src, dst):
    """Craft an Extended Function Command with the Radio Inhibit operand."""
    text_out("Special Packet: Extended Function Command - Inhibit\n")
    operand = 0x7f  # Radio Inhibit
    construct_ext_fnct_cmd_class0(nac, ss, operand, src, dst)


def construct_ext_fnct_cmd_inhibit_ack(nac, ss, src, dst):
    """Craft an Extended Function Command with the Inhibit ACK operand."""
    text_out("Special Packet: Extended Function Command - Inhibit ACK\n")
    operand = 0xff  # Radio Inhibit ACK
    construct_ext_fnct_cmd_class0(nac, ss, operand, src, dst)


# Uninhibit example usage:
# p25craft.py --uninhibit --src 0xfffffd --dst 0x000001 --ss 1
def construct_ext_fnct_cmd_uninhibit(nac, ss, src, dst):
    """Craft an Extended Function Command with the Radio Uninhibit operand."""
    text_out("Special Packet: Extended Function Command - Uninhibit\n")
    operand = 0x7e  # Radio Uninhibit
    construct_ext_fnct_cmd_class0(nac, ss, operand, src, dst)


def construct_ext_fnct_cmd_uninhibit_ack(nac, ss, src, dst):
    """Craft an Extended Function Command with the Uninhibit ACK operand."""
    text_out("Special Packet: Extended Function Command - Uninhibit ACK\n")
    operand = 0xfe  # Radio Uninhibit ACK
    construct_ext_fnct_cmd_class0(nac, ss, operand, src, dst)


def construct_ext_fnct_cmd_check(nac, ss, src, dst):
    """Craft an Extended Function Command with the Radio Check operand."""
    text_out("Special Packet: Extended Function Command - Radio Check\n")
    operand = 0x00  # Radio Check
    construct_ext_fnct_cmd_class0(nac, ss, operand, src, dst)


def construct_ext_fnct_cmd_check_ack(nac, ss, src, dst):
    """Craft an Extended Function Command with the Radio Check ACK operand."""
    text_out("Special Packet: Extended Function Command - Radio Check ACK\n")
    operand = 0x80  # Radio Check ACK
    construct_ext_fnct_cmd_class0(nac, ss, operand, src, dst)


# It seems to make more sense to use EXT_FNCT_CMD for the following commands,
# but they are included here in EXT_FNCT_RSP form as well because the spec
# appears to allow it.

def construct_ext_fnct_rsp_class0(nac, ss, operand, src, dst):
    """Craft a one-block TSDU carrying a class 0 Extended Function Response.

    Mirror image of the command form: the extended function is built from
    dst by construct_ef() and placed above the 24-bit src address.
    """
    ef = construct_ef(0x00, operand, dst)
    opcode = 0x24  # Extended Function Response
    arg = ef << 24
    arg |= src
    construct_tsdu(nac, ss, 1, 0x00, opcode, arg)


# The labels printed by the response variants below used to say "Command"
# (copied from the command variants); they now name the form actually built.

def construct_ext_fnct_rsp_inhibit(nac, ss, src, dst):
    """Craft an Extended Function Response with the Radio Inhibit operand."""
    text_out("Special Packet: Extended Function Response - Inhibit\n")
    operand = 0x7f  # Radio Inhibit
    construct_ext_fnct_rsp_class0(nac, ss, operand, src, dst)


def construct_ext_fnct_rsp_inhibit_ack(nac, ss, src, dst):
    """Craft an Extended Function Response with the Inhibit ACK operand."""
    text_out("Special Packet: Extended Function Response - Inhibit ACK\n")
    operand = 0xff  # Radio Inhibit ACK
    construct_ext_fnct_rsp_class0(nac, ss, operand, src, dst)


def construct_ext_fnct_rsp_uninhibit(nac, ss, src, dst):
    """Craft an Extended Function Response with the Radio Uninhibit operand."""
    text_out("Special Packet: Extended Function Response - Uninhibit\n")
    operand = 0x7e  # Radio Uninhibit
    construct_ext_fnct_rsp_class0(nac, ss, operand, src, dst)


def construct_ext_fnct_rsp_uninhibit_ack(nac, ss, src, dst):
    """Craft an Extended Function Response with the Uninhibit ACK operand."""
    text_out("Special Packet: Extended Function Response - Uninhibit ACK\n")
    operand = 0xfe  # Radio Uninhibit ACK
    construct_ext_fnct_rsp_class0(nac, ss, operand, src, dst)


def construct_ext_fnct_rsp_check(nac, ss, src, dst):
    """Craft an Extended Function Response with the Radio Check operand."""
    text_out("Special Packet: Extended Function Response - Radio Check\n")
    operand = 0x00  # Radio Check
    construct_ext_fnct_rsp_class0(nac, ss, operand, src, dst)


def construct_ext_fnct_rsp_check_ack(nac, ss, src, dst):
    """Craft an Extended Function Response with the Radio Check ACK operand."""
    text_out("Special Packet: Extended Function Response - Radio Check ACK\n")
    operand = 0x80  # Radio Check ACK
    construct_ext_fnct_rsp_class0(nac, ss, operand, src, dst)


########
# main #
########

if __name__ == "__main__":
    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option("--hdu", action="store_true", dest="hdu", default=False,
        help="Construct Header Data Unit")
    parser.add_option("--ldu1", action="store_true", dest="ldu1", default=False,
        help="Construct Logical Link Data Unit 1")
    parser.add_option("--ldu2", action="store_true", dest="ldu2", default=False,
        help="Construct Logical Link Data Unit 2")
    parser.add_option("--stdu", action="store_true", dest="stdu", default=False,
        help="Construct Simple Terminator Data Unit")
    parser.add_option("--xtdu", action="store_true", dest="xtdu", default=False,
        help="Construct Terminator Data Unit with Link Control")
    parser.add_option("--tsdu", action="store_true", dest="tsdu", default=False,
        help="Construct Trunking Signaling Data Unit")
    parser.add_option("--cpdu", action="store_true", dest="cpdu", default=False,
        help="Construct Confirmed Packet Data Unit")
    parser.add_option("--rpdu", action="store_true", dest="rpdu", default=False,
        help="Construct Response Packet Data Unit")
    parser.add_option("--updu", action="store_true", dest="updu", default=False,
        help="Construct Unconfirmed Packet Data Unit")
    parser.add_option("--ambt", action="store_true", dest="ambt", default=False,
        help="Construct Alternative Multiple Block Trunking Conrtol Packet")
    parser.add_option("--inhibit", action="store_true", dest="inhibit",
        default=False, help="Special packet: Inhibit")
    parser.add_option("--uninhibit", action="store_true", dest="uninhibit",
        default=False, help="Special packet: Uninhibit")
    parser.add_option("--rum", action="store_true", dest="rum", default=False,
        help="Special packet: Radio Unit Monitor")
    parser.add_option("--superframes", type="int", default=0,
        help="Number of superframes to construct (Default: 0)")
    parser.add_option("--sqtail", type="string", default="0:0:0",
        help="Squelch tail (Default: 0:0:0)")
    parser.add_option("--nac", type="int", default=0x293,
        help="Network Access Code (Default: 0x293)")
    parser.add_option("--ss", type="int", default=0,
        help="Status Symbol (one value to be repeated) (Default: 0)")
    parser.add_option("--mi", type="int", default=0,
        help="Message Indicator (Default: 0x000000000000000000)")
    parser.add_option("--mfid", type="int", default=0,
        help="Manufacturer ID (Default: 0x00)")
    parser.add_option("--algid", type="int", default=0x80,
        help="Algorithm ID (Default: 0x80)")
    parser.add_option("--kid", type="int", default=0,
        help="Key ID (Default: 0x0000)")
    parser.add_option("--tgid", type="int", default=1,
        help="Talk Group ID (Default: 0x0001)")
    parser.add_option("--lco", type="int", default=0,
        help="Link Control Opcode (Default: 0x00)")
    parser.add_option("--src", type="int", default=1,
        help="Source ID (Default: 0x000001)")
    parser.add_option("--dst", type="int", default=1,
        help="Destination ID (Default: 0x000001)")
    parser.add_option("--lsd", type="int", default=0,
        help="Entire Low Speed Data Word (Default: 0x00000000)")
    parser.add_option("--lsd1", type="int", default=None,
        help="Low Speed Data high word for LDU1 (Default: use --lsd)")
    parser.add_option("--lsd2", type="int", default=None,
        help="Low Speed Data low word for LDU2 (Default: use --lsd)")
    parser.add_option("--ntsbk", type="int", default=1,
        help="Number of TSBKs per TSDU (Default: 1)")
    parser.add_option("--data", type="int", default=0,
        help="Packet data (Default: 0x0000000000000000)")
    parser.add_option("--length", type="int", default=0,
        help="Packet data length in bytes (Default: auto)")
    parser.add_option("--sapid", type="int", default=0,
        help="SAP ID (Default: 0x00)")
    parser.add_option("--opcode", type="int", default=0,
        help="Opcode (Default: 0x00)")
    parser.add_option("--dbtm", type="int", default=0,
        help="Defined By Trunking Message AMBT field (Default: 0x0000)")
    parser.add_option("--dho", type="int", default=0,
        help="Data Header Offset (Default: 0x00)")
    parser.add_option("--rclass", type="int", default=0,
        help="Response Class (Default: 0x0)")
    parser.add_option("--rtype", type="int", default=0,
        help="Response Type (Default: 0x0)")
    parser.add_option("--rstatus", type="int", default=0,
        help="Response Status (Default: 0x0)")
    parser.add_option("--ns", type="int", default=0,
        help="N(S) sequence number (Default: 0x0)")
    parser.add_option("--arg", type="int", default=0,
        help="Trunking Control Opcode Argument (Default: 0x0000000000000000)")
    parser.add_option("--svcopt", type="int", default=None,
        help="Service Options byte (Default: use --pri, --emerg, etc.)")
    parser.add_option("-e", "--emerg", action="store_true", dest="emerg",
        default=False, help="Emergency Bit (Default: false)")
    parser.add_option("-p", "--protected", action="store_true",
        dest="protected", default=False, help="Protected Bit (Default: false)")
    parser.add_option("-d", "--duplex", action="store_true", dest="duplex",
        default=False, help="Duplex Bit (Default: false)")
    parser.add_option("-m", "--mode", action="store_true", dest="mode",
        default=False, help="Mode Bit (Default: false)")
    parser.add_option("-r", "--reserved", action="store_true", dest="reserved",
        default=False, help="Reserved Bit (Default: false)")
    parser.add_option("--pri", type="int", default=0,
        help="Priority Level (Default: 0")
    parser.add_option("-t", "--1011", action="store_true", dest="hz1011",
        default=False, help="1011 Hz test tone (Default: false)")
    parser.add_option("-s", "--silence", action="store_true", dest="silence",
        default=False, help="audio silence (Default: false)")
    parser.add_option("-o", "--output-file", type="string", default=None,
        help="Binary output file (Default: None)")
    parser.add_option("-q", "--quiet", action="store_true", dest="quiet",
        default=False, help="Supress text output (Default: false)")
    parser.add_option("-f", "--flip", action="store_true", dest="flip",
        default=False, help="invert frequency deviations (Default: false)")
    parser.add_option("-l", "--late-entry", action="store_true", dest="late",
        default=False, help="Simulate late receiver entry (Default: false)")
    parser.add_option("-i", "--inbound", action="store_true", dest="inbound",
        default=False, help="Inbound packet (Default: false (outbound))")

    (options, args) = parser.parse_args()

    assert options.nac <= 0xfff
    assert options.ss <= 0b11
    assert options.mi <= (1 << 72) - 1
    assert options.mfid <= 0xff
    assert options.algid <= 0xff
    assert options.kid <= 0xffff
    assert options.tgid <= 0xffff
    assert options.lco <= 0x3f
    assert options.src <= 0xffffff
    assert options.dst <= 0xffffff
    assert options.lsd <= 0xffffffff
    # lsd1, lsd2 and svcopt default to None ("not given"); None cannot be
    # ordered against an int on Python 3, so only check values that were set.
    assert options.lsd1 is None or options.lsd1 <= 0xffff
    assert options.lsd2 is None or options.lsd2 <= 0xffff
    assert options.pri <= 0x7
    assert options.svcopt is None or options.svcopt <= 0xff
    assert options.ntsbk >= 1
    assert options.ntsbk <= 3
    assert options.sapid <= 0x3f
    assert options.opcode <= 0x3f
    assert options.dbtm <= 0xffff
    assert options.dho <= 0x3f
    assert options.rclass <= 0x3
    assert options.rtype <= 0x7
    assert options.rstatus <= 0x7
    assert options.ns <= 0x7
    assert options.arg <= (1 << 64) - 1

    if options.quiet:
        quiet = True

    if options.output_file:
        if options.output_file == '-':
            # Binary output shares stdout, so text output is suppressed.
            # Python 3 exposes the byte stream as sys.stdout.buffer.
            outfile = getattr(sys.stdout, 'buffer', sys.stdout)
            quiet = True
        else:
            # the output is raw bytes, so the file must be opened in binary
            outfile = open(options.output_file, 'wb')

    # set up inversion of frequency deviations
    if options.flip:
        flip = 0xaaa

    # squelch tail argument is in the form M:N:K
    # M = number of sTDUs to prepend
    # N = number of xTDUs to append
    # K = number of sTDUs to append
    try:
        # ValueError covers both a wrong field count and a non-integer field
        sq_m, sq_n, sq_k = [int(field) for field in options.sqtail.split(':')]
    except ValueError:
        sys.stderr.write("--sqtail must be in the form M:N:K\n")
        sys.exit(1)

    # The Logical Link ID field should be set to the source address for inbound
    # packets and the destination address for outbound packets.
    if options.inbound:
        llid = options.src
    else:
        llid = options.dst

    # Auto-detect the length in bytes of --data. The --length option overrides
    # this, which is useful when you want leading zeros.
    if (options.data > 0) and (options.length == 0):
        options.length = (options.data.bit_length() + 7) // 8

    # make sure we have IMBE data if we need it
    if options.ldu1 or options.ldu2 or options.superframes:
        if options.hz1011:
            options.imbe = 0x38928490d433c0be1b91844ff058a589d839
        elif options.silence:
            options.imbe = 0x6c42e85de2e8269363d981f9be23b18ae004
        else:
            sys.stderr.write("error: must specify --1011 or --silence\n")
            sys.exit(1)

    # handle alternative way Low Speed Data may be specified
    # (compare against None so that an explicit 0 is honoured)
    if options.lsd1 is not None:
        options.lsd &= 0x0000ffff
        options.lsd |= options.lsd1 << 16
    if options.lsd2 is not None:
        options.lsd &= 0xffff0000
        options.lsd |= options.lsd2

    # Build the Service Options field first so that it can be used in
    # various places.
    if options.superframes or options.ldu1 or options.xtdu or sq_n > 0:
        # only derive it from the individual flags when --svcopt was not given
        if options.svcopt is None:
            options.svcopt = construct_svcopt(options.emerg,
                options.protected, options.duplex, options.mode,
                options.reserved, options.pri)

    # prepend sTDUs (see sqtail)
    for i in range(sq_m):
        construct_stdu(options.nac, 1)

    if options.superframes:
        if not options.late:
            construct_hdu(options.nac, options.ss, options.mi, options.mfid,
                options.algid, options.kid, options.tgid)
        for i in range(options.superframes):
            construct_ldu1(options.nac, options.ss, options.imbe, options.lsd,
                options.lco, options.mfid, options.svcopt, 0, options.tgid,
                options.dst, options.src)
            construct_ldu2(options.nac, options.ss, options.imbe, options.lsd,
                options.mi, options.algid, options.kid)
        construct_stdu(options.nac, options.ss)
    else:
        if options.hdu:
            construct_hdu(options.nac, options.ss, options.mi, options.mfid,
                options.algid, options.kid, options.tgid)
        elif options.ldu1:
            construct_ldu1(options.nac, options.ss, options.imbe, options.lsd,
                options.lco, options.mfid, options.svcopt, 0, options.tgid,
                options.dst, options.src)
        elif options.ldu2:
            construct_ldu2(options.nac, options.ss, options.imbe, options.lsd,
                options.mi, options.algid, options.kid)
        elif options.stdu:
            construct_stdu(options.nac, options.ss)
        elif options.xtdu:
            construct_xtdu(options.nac, options.ss, options.lco, options.mfid,
                options.svcopt, 0, options.tgid, options.dst, options.src)
        elif options.tsdu:
            construct_tsdu(options.nac, options.ss, options.ntsbk,
                options.mfid, options.opcode, options.arg)
        elif options.cpdu:
            construct_cpdu(options.nac, options.ss, options.data,
                options.length, 1, options.inbound, options.sapid,
                options.mfid, llid, options.ns, 8, options.dho)
        elif options.rpdu:
            construct_rpdu(options.nac, options.ss, options.data,
                options.length, options.inbound, options.rclass,
                options.rtype, options.rstatus, options.mfid, llid, 1, 0)
        elif options.updu:
            construct_updu(options.nac, options.ss, options.data,
                options.length, options.inbound, options.sapid, options.mfid,
                llid, options.dho)
        elif options.ambt:
            construct_ambt(options.nac, options.ss, options.data,
                options.length, options.inbound, options.sapid, options.mfid,
                llid, options.opcode, options.dbtm)
        elif options.inhibit:
            construct_ext_fnct_cmd_inhibit(options.nac, options.ss,
                options.src, options.dst)
        elif options.uninhibit:
            construct_ext_fnct_cmd_uninhibit(options.nac, options.ss,
                options.src, options.dst)
        elif options.rum:
            construct_rad_mon_cmd(options.nac, options.ss, options.src,
                options.dst)
        else:
            sys.stderr.write("error: must specify packet type or number of superframes (try -h or --help)\n")

    # append xTDUs (see sqtail)
    for i in range(sq_n):
        construct_xtdu(options.nac, 3, options.lco, options.mfid,
            options.svcopt, 0, options.tgid, options.dst, options.src)

    # append sTDUs (see sqtail)
    for i in range(sq_k):
        construct_stdu(options.nac, 3)

    if outfile:
        outfile.close()