Added support for normal packets and reduced code duplication.

This commit is contained in:
lcdr 2015-06-14 12:16:57 +02:00
parent bfa76cf704
commit c4c0bad416

View File

@ -1,3 +1,4 @@
import os
import sqlite3
import tkinter.filedialog as filedialog
import xml.etree.ElementTree as ET
@ -5,8 +6,9 @@ import zipfile
from collections import OrderedDict
from ctypes import c_float, c_int, c_int64, c_ubyte, c_uint, c_ushort
from tkinter import BooleanVar, BOTH, END, HORIZONTAL, Menu, Tk
from tkinter.font import nametofont
from tkinter.scrolledtext import ScrolledText
from tkinter.ttk import Frame, PanedWindow, Treeview
from tkinter.ttk import Frame, PanedWindow, Style, Treeview
import structparser
@ -41,18 +43,51 @@ component_name[64] = None
component_name[73] = None
comp_ids = list(component_name.keys())
parser = {}
comp_parser = {}
for key, value in component_name.items():
if value is not None:
with open("packetdefinitions/replica/components/"+value+".structs") as file:
parser[key] = structparser.StructParser(file.read())
comp_parser[key] = structparser.StructParser(file.read())
norm_parser = {}
for rootdir, _, files in os.walk("packetdefinitions"):
for filename in files:
with open(rootdir+"/"+filename) as file:
norm_parser[filename[:filename.rindex(".")]] = structparser.StructParser(file.read())
break
class ParserOutput:
def __init__(self):
self.text = ""
self.tag = "normal"
def __enter__(self):
pass
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None:
if exc_type == AssertionError:
exc_name = "ASSERTION FAILED"
self.tag = "assertfail"
elif exc_type == IndexError:
exc_name = "READ ERROR"
self.tag = "readerror"
self.text = exc_name+" "+str(exc_value)+"\n"+self.text
return True
def append(self, structs):
for level, description, value, unexpected in structs:
if unexpected:
self.text += "UNEXPECTED: "
self.tag = "unexpected"
self.text += "\t"*level+description+": "+str(value)+"\n"
class CaptureObject:
def __init__(self, network_id=None, object_id=None):
def __init__(self, network_id=None, object_id=None, lot=None):
self.network_id = network_id
self.object_id = object_id
self.lot = lot
self.entry = None
self.comp_parsers = []
class CaptureExplorer(Frame):
def __init__(self, db_path, gamemessages_path, master=None):
@ -69,6 +104,7 @@ class CaptureExplorer(Frame):
self.parse_creations = BooleanVar(value=True)
self.parse_serializations = BooleanVar(value=True)
self.parse_game_messages = BooleanVar(value=True)
self.parse_normal_packets = BooleanVar(value=True)
self.create_widgets()
def create_widgets(self):
@ -78,8 +114,8 @@ class CaptureExplorer(Frame):
parse_menu.add_checkbutton(label="Parse Creations", variable=self.parse_creations)
parse_menu.add_checkbutton(label="Parse Serializations", variable=self.parse_serializations)
parse_menu.add_checkbutton(label="Parse Game Messages", variable=self.parse_game_messages)
parse_menu.add_checkbutton(label="Parse Normal Packets", variable=self.parse_normal_packets)
menubar.add_cascade(label="Parse", menu=parse_menu)
menubar.add_command(label="Reload struct definitions", command=self.reload_parsers)
self.master.config(menu=menubar)
pane = PanedWindow(orient=HORIZONTAL)
@ -89,25 +125,18 @@ class CaptureExplorer(Frame):
self.tree = Treeview(columns=columns)
for col in columns:
self.tree.heading(col, text=col, command=(lambda col: lambda: self.sort_column(col, False))(col))
self.tree.tag_configure("normal", font="0")
self.tree.tag_configure("unexpected", font="0", foreground="medium blue")
self.tree.tag_configure("assertfail", font="0", foreground="orange")
self.tree.tag_configure("readerror", font="0", background="medium purple")
self.tree.tag_configure("error", font="0", foreground="red")
self.tree.tag_configure("normal")
self.tree.tag_configure("unexpected", foreground="medium blue")
self.tree.tag_configure("assertfail", foreground="orange")
self.tree.tag_configure("readerror", background="medium purple")
self.tree.tag_configure("error", foreground="red")
self.tree.bind("<<TreeviewSelect>>", self.on_item_click)
pane.add(self.tree)
self.item_inspector = ScrolledText(font="0", tabs="4m")
self.item_inspector = ScrolledText(font="TkDefaultFont", tabs="4m")
self.item_inspector.insert(END, "Select an item to inspect it.")
pane.add(self.item_inspector)
def reload_parsers(self):
for key, value in component_name.items():
if value is not None:
with open("packetdefinitions/replica/components/"+value+".structs") as file:
parser[key].__init__(file.read())
def askopenfiles(self):
files = filedialog.askopenfilenames(filetypes=[("Zip", "*.zip")])
if files:
@ -126,7 +155,40 @@ class CaptureExplorer(Frame):
print("Parsing creations")
creations = [i for i in files if "[24]" in i]
for packet_name in creations:
lot = int(packet_name[packet_name.index("(")+1:packet_name.index(")")])
packet = BitStream(capture.read(packet_name))
self.parse_creation(packet_name, packet)
if self.parse_serializations.get():
print("Parsing serializations")
serializations = [i for i in files if "[27]" in i]
for packet_name in serializations:
packet = BitStream(capture.read(packet_name)[1:])
self.parse_serialization_packet(packet_name, packet)
if self.parse_game_messages.get():
print("Parsing game messages")
game_messages = [i for i in files if "[53-05-00-0c]" in i or "[53-04-00-05]" in i]
for packet_name in game_messages:
packet = BitStream(capture.read(packet_name)[8:])
self.parse_game_message(packet_name, packet)
if self.parse_normal_packets.get():
print("Parsing normal packets")
packets = [i for i in files if "[24]" not in i and "[27]" not in i and "[53-05-00-0c]" not in i and "[53-04-00-05]" not in i]
for packet_name in packets:
packet = BitStream(capture.read(packet_name))
self.parse_normal_packet(packet_name, packet)
def parse_creation(self, packet_name, packet):
packet.skip_read(1)
has_network_id = packet.read(c_bit)
assert has_network_id
network_id = packet.read(c_ushort)
object_id = packet.read(c_int64)
for obj in self.objects:
if obj.object_id == object_id: # We've already parsed this object (can happen due to ghosting)
return
lot = packet.read(c_int)
if lot not in self.lot_data:
try:
lot_name = self.sqlite.execute("select name from Objects where id == "+str(lot)).fetchone()[0]
@ -139,7 +201,7 @@ class CaptureExplorer(Frame):
component_types.sort(key=comp_ids.index)
for comp_type in component_types:
if component_name[comp_type] is not None:
parsers.append((component_name[comp_type], parser[comp_type]))
parsers.append((component_name[comp_type], comp_parser[comp_type]))
except ValueError as e:
error = "ERROR: Unknown component "+str(e.args[0].split()[0])+" "+str(component_types)
else:
@ -147,75 +209,31 @@ class CaptureExplorer(Frame):
self.lot_data[lot] = lot_name, parsers, error
else:
lot_name, parsers, error = self.lot_data[lot]
packet = BitStream(capture.read(packet_name))
self.parse_creation(packet_name, packet, lot_name, parsers, error)
if self.parse_serializations.get():
print("Parsing serializations")
serializations = [i for i in files if "[27]" in i]
for packet_name in serializations:
packet = BitStream(capture.read(packet_name)[1:])
self.parse_serialization(packet_name, packet)
if self.parse_game_messages.get():
print("Parsing game messages")
game_messages = [i for i in files if "[53-05-00-0c]" in i or "[53-04-00-05" in i]
for packet_name in game_messages:
packet = BitStream(capture.read(packet_name)[8:])
self.parse_game_message(packet_name, packet)
def parse_creation(self, packet_name, packet, lot_name, parsers, error):
packet.skip_read(1)
has_network_id = packet.read(c_bit)
assert has_network_id
network_id = packet.read(c_ushort)
object_id = packet.read(c_int64)
for obj in self.objects:
if obj.object_id == object_id: # We've already parsed this object (can happen due to ghosting)
return
packet.skip_read(4)
id_ = packet.read(str, length_type=c_ubyte) + " " + lot_name
packet._read_offset = 0
parser_output = ""
tag = "normal"
try:
for level, description, value, unexpected in creation_header_parser.parse(packet):
if unexpected:
parser_output += "UNEXPECTED: "
tag = "unexpected"
parser_output += "\t"*level+description+": "+str(value)+"\n"
for level, description, value, unexpected in serialization_header_parser.parse(packet):
if unexpected:
parser_output += "UNEXPECTED: "
tag = "unexpected"
parser_output += "\t"*level+description+": "+str(value)+"\n"
if error:
parser_output = error+"\n"+parser_output
tag = "error"
parser_output = ParserOutput()
with parser_output:
parser_output.append(creation_header_parser.parse(packet))
if error is not None:
parser_output.text = error+"\n"+parser_output.text
parser_output.tag = "error"
else:
self.parse_serialization(packet, parser_output, parsers, is_creation=True)
obj = CaptureObject(network_id=network_id, object_id=object_id, lot=lot)
self.objects.append(obj)
obj.entry = self.tree.insert("", "end", text=packet_name, values=(id_, parser_output.text), tag=parser_output.tag)
@staticmethod
def parse_serialization(packet, parser_output, parsers, is_creation=False):
parser_output.append(serialization_header_parser.parse(packet))
for name, parser in parsers:
parser_output += "\n"+name+"\n\n"
for level, description, value, unexpected in parser.parse(packet, {"creation":True}):
if unexpected:
parser_output += "UNEXPECTED: "
tag = "unexpected"
parser_output += "\t"*level+description+": "+str(value)+"\n"
parser_output.text += "\n"+name+"\n\n"
parser_output.append(parser.parse(packet, {"creation":is_creation}))
if not packet.all_read():
raise IndexError("Not completely read")
except AssertionError as e:
parser_output = "ASSERTION FAILED "+str(e)+"\n"+parser_output
tag = "assertfail"
except IndexError as e:
parser_output = "READ ERROR "+str(e)+"\n"+parser_output
tag = "readerror"
obj = CaptureObject(network_id=network_id, object_id=object_id)
self.objects.append(obj)
obj.comp_parsers = parsers
obj.entry = self.tree.insert("", "end", text=packet_name, values=(id_, parser_output), tag=tag)
def parse_serialization(self, packet_name, packet):
def parse_serialization_packet(self, packet_name, packet):
network_id = packet.read(c_ushort)
obj = None
for j in self.objects:
@ -227,32 +245,20 @@ class CaptureExplorer(Frame):
self.objects.append(obj)
obj.entry = self.tree.insert("", "end", text="Unknown", values=("network_id="+str(network_id), ""), tag="normal")
tag = "normal"
parser_output = ""
try:
for level, description, value, unexpected in serialization_header_parser.parse(packet):
if unexpected:
parser_output += "UNEXPECTED: "
tag = "unexpected"
parser_output += "\t"*level+description+": "+str(value)+"\n"
if obj.lot is None:
parsers = []
error = "Unknown object"
else:
_, parsers, error = self.lot_data[obj.lot]
for name, parser in obj.comp_parsers:
parser_output += "\n"+name+"\n\n"
for level, description, value, unexpected in parser.parse(packet, {"creation":False}):
if unexpected:
parser_output += "UNEXPECTED: "
tag = "unexpected"
parser_output += "\t"*level+description+": "+str(value)+"\n"
if not packet.all_read():
raise IndexError("Not completely read")
except AssertionError as e:
parser_output = "ASSERTION FAILED "+str(e)+"\n"+parser_output
tag = "assertfail"
except IndexError as e:
parser_output = "READ ERROR "+str(e)+"\n"+parser_output
tag = "readerror"
self.tree.insert(obj.entry, "end", text=packet_name, values=("Note: If the creation packet has an error, the serialization packets will have one as well", parser_output), tag=tag)
parser_output = ParserOutput()
with parser_output:
self.parse_serialization(packet, parser_output, parsers)
if error is not None:
parser_output.tag = "error"
else:
error = ""
self.tree.insert(obj.entry, "end", text=packet_name, values=(error, parser_output.text), tag=parser_output.tag)
def parse_game_message(self, packet_name, packet):
object_id = packet.read(c_int64)
@ -306,23 +312,23 @@ class CaptureExplorer(Frame):
attrs = message.findall("attr")
attrs.sort(key=lambda x: x.get("name"))
vars = OrderedDict()
attr_values = OrderedDict()
if message.find("freeze") is not None or message.find("thaw") is not None:
# Custom serializations
if msg_name == "NotifyMissionTask":
vars["missionID"] = packet.read(c_int)
vars["taskMask"] = packet.read(c_int)
attr_values["missionID"] = packet.read(c_int)
attr_values["taskMask"] = packet.read(c_int)
updates = []
for _ in range(packet.read(c_ubyte)):
updates.append(packet.read(c_float))
vars["updates"] = updates
attr_values["updates"] = updates
elif msg_name == "RequestLinkedMission":
vars["playerID"] = packet.read(c_int64)
vars["missionID"] = packet.read(c_int)
vars["bMissionOffered"] = packet.read(c_bit)
attr_values["playerID"] = packet.read(c_int64)
attr_values["missionID"] = packet.read(c_int)
attr_values["bMissionOffered"] = packet.read(c_bit)
else:
raise NotImplementedError("Custom serialization")
values = "\n".join(["%s = %s" % (a,b) for a,b in vars.items()])
values = "\n".join(["%s = %s" % (a, b) for a, b in attr_values.items()])
tag = "normal"
else:
local_enums = {}
@ -335,12 +341,12 @@ class CaptureExplorer(Frame):
type_ = attr.get("type")
default = attr.get("default")
if type_ == "bool": # bools don't have default-flags
vars[attr.get("name")] = packet.read(c_bit)
attr_values[attr.get("name")] = packet.read(c_bit)
continue
if default is not None:
is_not_default = packet.read(c_bit)
if not is_not_default:
vars[attr.get("name")] = default
attr_values[attr.get("name")] = default
continue
if type_ == "unsigned char":
value = packet.read(c_ubyte)
@ -354,8 +360,11 @@ class CaptureExplorer(Frame):
value = packet.read(c_int64)
elif type_ == "LWOOBJID":
value = packet.read(c_int64)
if value == object_id:
value = str(value)+" <self>"
else:
for obj in self.objects:
if obj.object_id == value:
if value == obj.object_id:
value = str(value)+" <"+self.tree.item(obj.entry, "values")[0]+">"
break
elif type_ == "float":
@ -386,26 +395,39 @@ class CaptureExplorer(Frame):
value = self.gamemsg_global_enums[type_][value]+" ("+str(value)+")"
else:
raise NotImplementedError(type_)
vars[attr.get("name")] = value
attr_values[attr.get("name")] = value
if not packet.all_read():
raise ValueError
except NotImplementedError as e:
values = (msg_name, str(e)+"\nlen: "+str(len(packet)-10)+"\n"+"\n".join(["%s = %s" % (a,b) for a,b in vars.items()]))
tag = "error"
except ValueError as e:
values = ("likely not "+msg_name, "Error while parsing, likely not this message!\n"+str(e)+"\nlen: "+str(len(packet)-10))
values = (msg_name, str(e)+"\nlen: "+str(len(packet)-10)+"\n"+"\n".join(["%s = %s" % (a, b) for a, b in attr_values.items()]))
tag = "error"
except (IndexError, UnicodeDecodeError) as e:
print(packet_name, msg_name)
import traceback
traceback.print_exc()
values = ("likely not "+msg_name, "Error while parsing, likely not this message!\n"+str(e)+"\nlen: "+str(len(packet)-10)+"\n"+"\n".join(["%s = %s" % (a,b) for a,b in vars.items()]))
values = ("likely not "+msg_name, "Error while parsing, likely not this message!\n"+str(e)+"\nlen: "+str(len(packet)-10)+"\n"+"\n".join(["%s = %s" % (a, b) for a, b in attr_values.items()]))
tag = "error"
except ValueError as e:
values = ("likely not "+msg_name, "Error while parsing, likely not this message!\n"+str(e)+"\nlen: "+str(len(packet)-10))
tag = "error"
else:
values = (msg_name, "\n".join(["%s = %s" % (a,b) for a,b in vars.items()]))
values = (msg_name, "\n".join(["%s = %s" % (a, b) for a, b in attr_values.items()]))
tag = "normal"
self.tree.insert(entry, "end", text=packet_name, values=values, tag=tag)
def parse_normal_packet(self, packet_name, packet):
id_ = packet_name[packet_name.index("[")+1:packet_name.index("]")]
if id_ not in norm_parser:
self.tree.insert("", "end", text=packet_name, values=(id_, "Add the struct definition file packetdefinitions/"+id_+".structs to enable parsing of this packet."), tag="error")
return
if id_.startswith("53"):
packet.skip_read(8)
else:
packet.skip_read(1)
parser_output = ParserOutput()
parser_output.append(norm_parser[id_].parse(packet))
self.tree.insert("", "end", text=packet_name, values=(id_, parser_output.text), tag=parser_output.tag)
def sort_column(self, col, reverse):
items = [item for item in self.tree.get_children()]
items.sort(key=lambda x: self.tree.set(x, col), reverse=reverse)
@ -422,6 +444,9 @@ class CaptureExplorer(Frame):
def main():
root = Tk()
fontheight = nametofont("TkDefaultFont").metrics("linespace")
style = Style(root)
style.configure("Treeview", rowheight=fontheight)
app = CaptureExplorer("<sqlite cdclient path>", "<game messages path>", master=root)
app.mainloop()