c7cee4331f
Eventually, -Tps will not force -V, and will print summaries when -V is not selected. However, work still has to be done there. svn path=/trunk/; revision=9218
325 lines
8.9 KiB
Python
Executable file
325 lines
8.9 KiB
Python
Executable file
#!/usr/bin/env python
|
|
"""
|
|
Process packet capture files and produce a nice HTML
|
|
report of MSN Chat sessions.
|
|
|
|
Copyright (c) 2003 by Gilbert Ramirez <gram@alumni.rice.edu>
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; either version 2
|
|
of the License, or (at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import array
|
|
import string
|
|
import EtherealXML
|
|
import getopt
|
|
|
|
# Stream that receives the generated HTML.  It starts out as standard
# output; main() rebinds it when an output file is requested with -o.
out_fh = sys.stdout
|
|
|
|
class MSNMessage:
    """Common base class for the message objects built from MSN packets.

    It carries no state or behaviour of its own; subclasses such as
    MSN_MSG add the actual fields.
    """
|
|
|
|
class MSN_MSG(MSNMessage):
    """A single chat line: when it was seen, who sent it and what it said."""

    def __init__(self, timestamp, user, message):
        """Store the three fields unchanged.

        timestamp -- display string for the time of the message
        user      -- sender's name, or Conversation.DEFAULT_USER
        message   -- message text as extracted from the packet
        """
        self.timestamp, self.user, self.message = timestamp, user, message
|
|
|
|
|
|
class Conversation:
    """Keeps track of a single MSN chat session.

    Packets are collected with AddPacket(), converted into MSN_MSG objects
    by Summarize() and finally rendered as HTML by CreateHTML().
    """

    # First line of a message that carries a transaction ID instead of a
    # sender; such messages are attributed to DEFAULT_USER.
    re_MSG_out = re.compile(
        r"MSG (?P<TrID>\d+) (?P<ACKTYPE>[UNA]) (?P<len>\d+)")
    # First line of a message that names its sender; the alias becomes the
    # display name of the user.
    re_MSG_in = re.compile(
        r"MSG (?P<user>\S+)@(?P<domain>\S+) (?P<alias>\S+) (?P<len>\d+)")

    # Sentinel meaning "neither pattern matched the first line".
    USER_NOT_FOUND = -1
    # Marker stored in MSN_MSG.user for messages without a named sender.
    DEFAULT_USER = None

    DEFAULT_USER_COLOR = "#0000ff"
    # Name colours handed out, in order, to each new named user.
    USER_COLORS = ["#ff0000", "#00ff00",
                   "#800000", "#008000", "#000080"]

    DEFAULT_USER_TEXT_COLOR = "#000000"
    USER_TEXT_COLOR = "#000080"

    # Characters that must never reach the HTML output unescaped.  "&" has
    # to come first so that the entities themselves are not re-escaped.
    _HTML_ENTITIES = (("&", "&amp;"), ("<", "&lt;"),
                      (">", "&gt;"), ('"', "&quot;"))

    def __init__(self):
        """Start with no packets and no summarized messages."""
        self.packets = []
        self.messages = []

    def AddPacket(self, packet):
        """Remember *packet* for a later call to Summarize()."""
        self.packets.append(packet)

    def Summarize(self):
        """Convert every collected packet into an entry of self.messages."""
        for packet in self.packets:
            msg = self.CreateMSNMessage(packet)
            if msg:
                self.messages.append(msg)

    def CreateMSNMessage(self, packet):
        """Build an MSN_MSG from one dissected packet.

        packet -- object offering get_items(name); its first "msnms" item
                  must expose `children`, each child having a `show` string.

        Returns the MSN_MSG.  If the first line of the transmission matches
        neither re_MSG_out nor re_MSG_in, a diagnostic is written to stderr
        and the process exits with status 1.
        """
        msnms = packet.get_items("msnms")[0]
        children = msnms.children

        # The first line of the msnms transmission identifies the user.
        first_line = children[0].show
        if self.re_MSG_out.search(first_line):
            user = self.DEFAULT_USER
        else:
            match = self.re_MSG_in.search(first_line)
            if not match:
                sys.stderr.write("No match for %s\n" % (first_line,))
                sys.exit(1)
            user = match.group("alias")

        # NOTE(review): child 5 is presumably the first payload line after
        # the MSG line and its headers -- confirm against tethereal output.
        # When the accumulated text contains "MSG " it is cut there and the
        # next chunk is looked for 5 children further on instead of 6.
        msg = ""
        index = 5
        while index < len(children):
            msg += children[index].show
            cut = msg.find("MSG ")
            if cut >= 0:
                msg = msg[:cut]
                index += 5
            else:
                index += 6

        # Drop the fractional seconds, if the timestamp has any.  Without
        # the guard a timestamp lacking "." would lose its last character.
        timestamp = packet.get_items("frame.time")[0].get_show()
        dot = timestamp.rfind(".")
        if dot >= 0:
            timestamp = timestamp[:dot]

        return MSN_MSG(timestamp, user, msg)

    def _ToByteArray(self, text):
        """Return *text* as an array of unsigned bytes (UTF-8 if needed)."""
        if not isinstance(text, bytes):
            text = text.encode("utf-8")
        return array.array("B", text)

    def _ParseOctal(self, digits):
        """Return the byte value of an octal escape, or None if unusable."""
        try:
            value = int(digits, 8)
        except ValueError:
            return None
        if value > 255:
            return None
        return value

    def _EscapeText(self, text):
        """Return *text* with HTML-significant characters escaped."""
        for char, entity in self._HTML_ENTITIES:
            text = text.replace(char, entity)
        return text

    def _WriteBytes(self, stream, data):
        """Write the byte array *data* to *stream*.

        Byte strings are written directly.  On a text stream the bytes go
        to the underlying binary buffer when there is one, otherwise they
        are decoded as UTF-8 first.
        """
        if hasattr(data, "tobytes"):
            raw = data.tobytes()
        else:
            raw = data.tostring()

        if isinstance(raw, str):
            # Python 2: str already is a byte string.
            stream.write(raw)
            return

        binary = getattr(stream, "buffer", None)
        if binary is None:
            stream.write(raw.decode("utf-8", "replace"))
        else:
            # Keep text already written ahead of the raw bytes.
            stream.flush()
            binary.write(raw)

    def MsgToHTML(self, text):
        """Convert the escaped message *text* into HTML-safe bytes.

        Backslash escapes in *text* are handled as follows:
          * backslash + up to three octal digits becomes that single byte;
          * backslash + "r" or "n" is dropped;
          * any other pair, a trailing lone backslash, and an octal escape
            that cannot be decoded are copied through unchanged.
        Afterwards &, <, > and " are replaced by HTML entities so message
        content cannot inject markup into the report.

        Returns an array.array("B") holding the resulting bytes.
        """
        raw = array.array("B")
        pos = 0
        last = len(text) - 1

        while 1:
            slash = text.find("\\", pos)
            # No further escape, or a lone backslash at the very end: the
            # remainder is copied verbatim after the loop.
            if slash < 0 or slash == last:
                break

            follower = text[slash + 1]
            if follower in string.digits:
                value = self._ParseOctal(text[slash + 1:slash + 4])
                if value is None:
                    # Not a usable octal escape; keep both characters.
                    raw.extend(self._ToByteArray(text[pos:slash + 2]))
                    pos = slash + 2
                else:
                    raw.extend(self._ToByteArray(text[pos:slash]))
                    raw.append(value)
                    pos = slash + 4
            elif follower in "rn":
                # Ignore \r and \n.
                raw.extend(self._ToByteArray(text[pos:slash]))
                pos = slash + 2
            else:
                raw.extend(self._ToByteArray(text[pos:slash + 2]))
                pos = slash + 2

        raw.extend(self._ToByteArray(text[pos:]))

        entities = {}
        for char, entity in self._HTML_ENTITIES:
            entities[ord(char)] = self._ToByteArray(entity)

        html = array.array("B")
        for byte in raw:
            if byte in entities:
                html.extend(entities[byte])
            else:
                html.append(byte)
        return html

    def CreateHTML(self, default_user, out=None):
        """Write this conversation as HTML.

        default_user -- name shown for messages whose user is DEFAULT_USER
        out          -- stream to write to; defaults to the module-level
                        out_fh

        Nothing is written when the conversation has no messages.  Named
        users get colours from USER_COLORS in order of first appearance;
        the palette is reused from the start once it is exhausted.
        """
        if not self.messages:
            return
        if out is None:
            out = out_fh

        out.write(
            "\n"
            "<HR><BR><H3 Align=Center> ---- New Conversation @ %s ----"
            "</H3><BR>\n" % (self.messages[0].timestamp,))

        color_by_user = {}

        for msg in self.messages:
            if msg.user == self.DEFAULT_USER:
                user = default_user
                user_color = self.DEFAULT_USER_COLOR
                user_text_color = self.DEFAULT_USER_TEXT_COLOR
            else:
                user = msg.user
                user_text_color = self.USER_TEXT_COLOR
                if user not in color_by_user:
                    # Wrap around instead of running off the palette.
                    slot = len(color_by_user) % len(self.USER_COLORS)
                    color_by_user[user] = self.USER_COLORS[slot]
                user_color = color_by_user[user]

            # "Oct 6, 2003 21:45:25" --> "21:45:25"
            timestamp = (msg.timestamp.split() or [""])[-1]

            out.write(
                "\n"
                '<FONT COLOR="%s"><FONT SIZE="2">(%s) </FONT>'
                '<B>%s:</B></FONT> <FONT COLOR="%s">\n'
                % (user_color, timestamp, self._EscapeText(user),
                   user_text_color))
            self._WriteBytes(out, self.MsgToHTML(msg.message))
            out.write("</FONT><BR>\n")
|
|
|
|
|
|
class CaptureFile:
    """Parses a single capture file and keeps track of
    all chat sessions in the file."""

    def __init__(self, capture_filename, tethereal):
        """Run tethereal on the capture file and parse the data.

        capture_filename -- path of the capture file to read
        tethereal        -- command used to invoke tethereal; it is passed
                            to the shell as given

        Only packets matching 'msnms contains "X-MMS-IM-Format"' are
        requested.  Once the output has been parsed, every conversation
        found is summarized.
        """
        self.conversations = []
        self.conversations_map = {}

        # The command runs through a POSIX shell (the filter is already
        # single-quoted), so the filename is quoted to survive spaces and
        # shell metacharacters.
        command = (tethereal + " -Tpdml -n -R "
                   "'msnms contains \"X-MMS-IM-Format\"' "
                   "-r " + self._ShellQuote(capture_filename))

        pipe = os.popen(command, "r")
        try:
            EtherealXML.parse_fh(pipe, self.collect_packets)
        finally:
            # Always release the pipe, even if parsing fails.
            pipe.close()

        for conv in self.conversations:
            conv.Summarize()

    def _ShellQuote(self, text):
        """Return *text* as one single-quoted POSIX shell word."""
        return "'" + text.replace("'", "'\\''") + "'"

    def collect_packets(self, packet):
        """Collect the packets passed back from EtherealXML.

        Sort them by TCP/IP conversation, as there could be multiple
        clients per machine.  The key is built from the sorted address
        and port strings, so both directions of a connection map to the
        same Conversation.
        """
        # The last item of each field is used.
        # NOTE(review): presumably the innermost header when a packet
        # carries several IP/TCP layers -- confirm.
        key_params = [packet.get_items(field)[-1].get_show()
                      for field in ("ip.src", "ip.dst",
                                    "tcp.srcport", "tcp.dstport")]
        key_params.sort()
        key = "|".join(key_params)

        conv = self.conversations_map.get(key)
        if conv is None:
            conv = self.conversations_map[key] = Conversation()
            self.conversations.append(conv)

        conv.AddPacket(packet)

    def CreateHTML(self, default_user):
        """Emit the HTML of every conversation, in order of first sight.

        default_user -- name forwarded to Conversation.CreateHTML()
        """
        for conv in self.conversations:
            conv.CreateHTML(default_user)
|
|
|
|
|
|
def run_filename(filename, default_user, tethereal):
    """Parse the capture file *filename* and emit its conversations as HTML.

    default_user and tethereal are handed on unchanged to CaptureFile.
    """
    CaptureFile(filename, tethereal).CreateHTML(default_user)
|
|
|
|
|
|
def run(filenames, default_user, tethereal):
    """Write one complete HTML document covering all capture files.

    filenames    -- capture files to process, in order
    default_user -- name shown for messages without a named sender
    tethereal    -- command used to invoke tethereal

    The document goes to the module-level out_fh.  write() is used
    instead of the `print >>` statement so the function behaves the same
    under Python 2 and Python 3.
    """
    # HTML Header
    out_fh.write(
        "\n"
        "<HTML><TITLE>MSN Conversation</TITLE>\n"
        '<meta http-equiv="Content-Type" content="text/html; charset=utf-8">\n'
        "<BODY>\n"
        "\n")

    for filename in filenames:
        run_filename(filename, default_user, tethereal)

    # HTML Footer
    out_fh.write(
        "\n"
        "<HR>\n"
        "</BODY>\n"
        "</HTML>\n"
        "\n")
|
|
|
|
|
|
def usage():
    """Print the command-line synopsis to stderr and exit with status 1.

    write() is used instead of the `print >>` statement so the help text
    is also produced under Python 3.
    """
    sys.stderr.write(
        "msnchat [OPTIONS] CAPTURE_FILE [...]\n"
        " -o FILE name of output file\n"
        " -t TETHEREAL location of tethereal binary\n"
        " -u USER name for unknown user\n")
    sys.exit(1)
|
|
|
|
def main():
    """Parse the command line and generate the HTML report.

    Options:
      -h, --help    show usage and exit
      -o FILE       write the HTML to FILE instead of standard output
      -t TETHEREAL  command used to invoke tethereal
      -u USER       name shown for messages without a named sender

    The remaining arguments are the capture files to process.  A file
    opened for -o is closed again when the run finishes, including when
    it fails.
    """
    global out_fh

    default_user = "Unknown"
    tethereal = "tethereal"

    optstring = "ho:t:u:"
    longopts = ["help"]

    try:
        opts, args = getopt.getopt(sys.argv[1:], optstring, longopts)
    except getopt.GetoptError:
        usage()

    # File opened for -o, if any; tracked so that it can be closed.
    opened_fh = None

    try:
        for opt, arg in opts:
            if opt in ("-h", "--help"):
                usage()

            elif opt == "-o":
                # A repeated -o replaces the earlier file; release it.
                if opened_fh is not None:
                    opened_fh.close()
                    opened_fh = None
                try:
                    opened_fh = open(arg, "w")
                except IOError:
                    sys.exit("Could not open %s for writing." % (arg,))
                out_fh = opened_fh

            elif opt == "-u":
                default_user = arg

            elif opt == "-t":
                tethereal = arg

            else:
                sys.exit("Unhandled command-line option: " + opt)

        run(args, default_user, tethereal)
    finally:
        if opened_fh is not None:
            opened_fh.close()
|
|
|
|
# Run the command-line interface only when executed as a script,
# not when the module is imported.
if __name__ == "__main__":
    main()
|