diff --git a/captureexplorer.py b/captureexplorer.py index 8c07ad2..e3bf676 100644 --- a/captureexplorer.py +++ b/captureexplorer.py @@ -1,3 +1,4 @@ +import os import sqlite3 import tkinter.filedialog as filedialog import xml.etree.ElementTree as ET @@ -5,8 +6,9 @@ import zipfile from collections import OrderedDict from ctypes import c_float, c_int, c_int64, c_ubyte, c_uint, c_ushort from tkinter import BooleanVar, BOTH, END, HORIZONTAL, Menu, Tk +from tkinter.font import nametofont from tkinter.scrolledtext import ScrolledText -from tkinter.ttk import Frame, PanedWindow, Treeview +from tkinter.ttk import Frame, PanedWindow, Style, Treeview import structparser @@ -41,18 +43,51 @@ component_name[64] = None component_name[73] = None comp_ids = list(component_name.keys()) -parser = {} +comp_parser = {} for key, value in component_name.items(): if value is not None: with open("packetdefinitions/replica/components/"+value+".structs") as file: - parser[key] = structparser.StructParser(file.read()) + comp_parser[key] = structparser.StructParser(file.read()) + +norm_parser = {} +for rootdir, _, files in os.walk("packetdefinitions"): + for filename in files: + with open(rootdir+"/"+filename) as file: + norm_parser[filename[:filename.rindex(".")]] = structparser.StructParser(file.read()) + break + +class ParserOutput: + def __init__(self): + self.text = "" + self.tag = "normal" + + def __enter__(self): + pass + + def __exit__(self, exc_type, exc_value, traceback): + if exc_type is not None: + if exc_type == AssertionError: + exc_name = "ASSERTION FAILED" + self.tag = "assertfail" + elif exc_type == IndexError: + exc_name = "READ ERROR" + self.tag = "readerror" + self.text = exc_name+" "+str(exc_value)+"\n"+self.text + return True + + def append(self, structs): + for level, description, value, unexpected in structs: + if unexpected: + self.text += "UNEXPECTED: " + self.tag = "unexpected" + self.text += "\t"*level+description+": "+str(value)+"\n" class CaptureObject: - def __init__(self, network_id=None, object_id=None): + def __init__(self, network_id=None, object_id=None, lot=None): self.network_id = network_id self.object_id = object_id + self.lot = lot self.entry = None - self.comp_parsers = [] class CaptureExplorer(Frame): def __init__(self, db_path, gamemessages_path, master=None): @@ -69,6 +104,7 @@ class CaptureExplorer(Frame): self.parse_creations = BooleanVar(value=True) self.parse_serializations = BooleanVar(value=True) self.parse_game_messages = BooleanVar(value=True) + self.parse_normal_packets = BooleanVar(value=True) self.create_widgets() def create_widgets(self): @@ -78,8 +114,8 @@ class CaptureExplorer(Frame): parse_menu.add_checkbutton(label="Parse Creations", variable=self.parse_creations) parse_menu.add_checkbutton(label="Parse Serializations", variable=self.parse_serializations) parse_menu.add_checkbutton(label="Parse Game Messages", variable=self.parse_game_messages) + parse_menu.add_checkbutton(label="Parse Normal Packets", variable=self.parse_normal_packets) menubar.add_cascade(label="Parse", menu=parse_menu) - menubar.add_command(label="Reload struct definitions", command=self.reload_parsers) self.master.config(menu=menubar) pane = PanedWindow(orient=HORIZONTAL) @@ -89,25 +125,18 @@ class CaptureExplorer(Frame): self.tree = Treeview(columns=columns) for col in columns: self.tree.heading(col, text=col, command=(lambda col: lambda: self.sort_column(col, False))(col)) - self.tree.tag_configure("normal", font="0") - self.tree.tag_configure("unexpected", font="0", foreground="medium blue") - self.tree.tag_configure("assertfail", font="0", foreground="orange") - self.tree.tag_configure("readerror", font="0", background="medium purple") - self.tree.tag_configure("error", font="0", foreground="red") + self.tree.tag_configure("normal") + self.tree.tag_configure("unexpected", foreground="medium blue") + self.tree.tag_configure("assertfail", foreground="orange") + self.tree.tag_configure("readerror", background="medium purple") + self.tree.tag_configure("error", foreground="red") self.tree.bind("<>", self.on_item_click) pane.add(self.tree) - self.item_inspector = ScrolledText(font="0", tabs="4m") + self.item_inspector = ScrolledText(font="TkDefaultFont", tabs="4m") self.item_inspector.insert(END, "Select an item to inspect it.") pane.add(self.item_inspector) - def reload_parsers(self): - for key, value in component_name.items(): - if value is not None: - with open("packetdefinitions/replica/components/"+value+".structs") as file: - parser[key].__init__(file.read()) - - def askopenfiles(self): files = filedialog.askopenfilenames(filetypes=[("Zip", "*.zip")]) if files: @@ -126,45 +155,31 @@ class CaptureExplorer(Frame): print("Parsing creations") creations = [i for i in files if "[24]" in i] for packet_name in creations: - lot = int(packet_name[packet_name.index("(")+1:packet_name.index(")")]) - if lot not in self.lot_data: - try: - lot_name = self.sqlite.execute("select name from Objects where id == "+str(lot)).fetchone()[0] - except TypeError: - print("Name for lot", lot, "not found") - lot_name = str(lot) - component_types = [i[0] for i in self.sqlite.execute("select component_type from ComponentsRegistry where id == "+str(lot)).fetchall()] - parsers = [] - try: - component_types.sort(key=comp_ids.index) - for comp_type in component_types: - if component_name[comp_type] is not None: - parsers.append((component_name[comp_type], parser[comp_type])) - except ValueError as e: - error = "ERROR: Unknown component "+str(e.args[0].split()[0])+" "+str(component_types) - else: - error = None - self.lot_data[lot] = lot_name, parsers, error - else: - lot_name, parsers, error = self.lot_data[lot] packet = BitStream(capture.read(packet_name)) - self.parse_creation(packet_name, packet, lot_name, parsers, error) + self.parse_creation(packet_name, packet) if self.parse_serializations.get(): print("Parsing serializations") serializations = [i for i in files if "[27]" in i] for packet_name in serializations: packet = BitStream(capture.read(packet_name)[1:]) - self.parse_serialization(packet_name, packet) + self.parse_serialization_packet(packet_name, packet) if self.parse_game_messages.get(): print("Parsing game messages") - game_messages = [i for i in files if "[53-05-00-0c]" in i or "[53-04-00-05" in i] + game_messages = [i for i in files if "[53-05-00-0c]" in i or "[53-04-00-05]" in i] for packet_name in game_messages: packet = BitStream(capture.read(packet_name)[8:]) self.parse_game_message(packet_name, packet) - def parse_creation(self, packet_name, packet, lot_name, parsers, error): + if self.parse_normal_packets.get(): + print("Parsing normal packets") + packets = [i for i in files if "[24]" not in i and "[27]" not in i and "[53-05-00-0c]" not in i and "[53-04-00-05]" not in i] + for packet_name in packets: + packet = BitStream(capture.read(packet_name)) + self.parse_normal_packet(packet_name, packet) + + def parse_creation(self, packet_name, packet): packet.skip_read(1) has_network_id = packet.read(c_bit) assert has_network_id @@ -173,49 +188,52 @@ class CaptureExplorer(Frame): for obj in self.objects: if obj.object_id == object_id: # We've already parsed this object (can happen due to ghosting) return - packet.skip_read(4) + lot = packet.read(c_int) + if lot not in self.lot_data: + try: + lot_name = self.sqlite.execute("select name from Objects where id == "+str(lot)).fetchone()[0] + except TypeError: + print("Name for lot", lot, "not found") + lot_name = str(lot) + component_types = [i[0] for i in self.sqlite.execute("select component_type from ComponentsRegistry where id == "+str(lot)).fetchall()] + parsers = [] + try: + component_types.sort(key=comp_ids.index) + for comp_type in component_types: + if component_name[comp_type] is not None: + parsers.append((component_name[comp_type], comp_parser[comp_type])) + except ValueError as e: + error = "ERROR: Unknown component "+str(e.args[0].split()[0])+" "+str(component_types) + else: + error = None + self.lot_data[lot] = lot_name, parsers, error + else: + lot_name, parsers, error = self.lot_data[lot] id_ = packet.read(str, length_type=c_ubyte) + " " + lot_name packet._read_offset = 0 - parser_output = "" - tag = "normal" - try: - for level, description, value, unexpected in creation_header_parser.parse(packet): - if unexpected: - parser_output += "UNEXPECTED: " - tag = "unexpected" - parser_output += "\t"*level+description+": "+str(value)+"\n" - for level, description, value, unexpected in serialization_header_parser.parse(packet): - if unexpected: - parser_output += "UNEXPECTED: " - tag = "unexpected" - parser_output += "\t"*level+description+": "+str(value)+"\n" - - if error: - parser_output = error+"\n"+parser_output - tag = "error" + parser_output = ParserOutput() + with parser_output: + parser_output.append(creation_header_parser.parse(packet)) + if error is not None: + parser_output.text = error+"\n"+parser_output.text + parser_output.tag = "error" else: - for name, parser in parsers: - parser_output += "\n"+name+"\n\n" - for level, description, value, unexpected in parser.parse(packet, {"creation":True}): - if unexpected: - parser_output += "UNEXPECTED: " - tag = "unexpected" - parser_output += "\t"*level+description+": "+str(value)+"\n" - if not packet.all_read(): - raise IndexError("Not completely read") - except AssertionError as e: - parser_output = "ASSERTION FAILED "+str(e)+"\n"+parser_output - tag = "assertfail" - except IndexError as e: - parser_output = "READ ERROR "+str(e)+"\n"+parser_output - tag = "readerror" + self.parse_serialization(packet, parser_output, parsers, is_creation=True) - obj = CaptureObject(network_id=network_id, object_id=object_id) + obj = CaptureObject(network_id=network_id, object_id=object_id, lot=lot) self.objects.append(obj) - obj.comp_parsers = parsers - obj.entry = self.tree.insert("", "end", text=packet_name, values=(id_, parser_output), tag=tag) + obj.entry = self.tree.insert("", "end", text=packet_name, values=(id_, parser_output.text), tag=parser_output.tag) - def parse_serialization(self, packet_name, packet): + @staticmethod + def parse_serialization(packet, parser_output, parsers, is_creation=False): + parser_output.append(serialization_header_parser.parse(packet)) + for name, parser in parsers: + parser_output.text += "\n"+name+"\n\n" + parser_output.append(parser.parse(packet, {"creation":is_creation})) + if not packet.all_read(): + raise IndexError("Not completely read") + + def parse_serialization_packet(self, packet_name, packet): network_id = packet.read(c_ushort) obj = None for j in self.objects: @@ -227,32 +245,20 @@ class CaptureExplorer(Frame): self.objects.append(obj) obj.entry = self.tree.insert("", "end", text="Unknown", values=("network_id="+str(network_id), ""), tag="normal") - tag = "normal" - parser_output = "" - try: - for level, description, value, unexpected in serialization_header_parser.parse(packet): - if unexpected: - parser_output += "UNEXPECTED: " - tag = "unexpected" - parser_output += "\t"*level+description+": "+str(value)+"\n" + if obj.lot is None: + parsers = [] + error = "Unknown object" + else: + _, parsers, error = self.lot_data[obj.lot] - for name, parser in obj.comp_parsers: - parser_output += "\n"+name+"\n\n" - for level, description, value, unexpected in parser.parse(packet, {"creation":False}): - if unexpected: - parser_output += "UNEXPECTED: " - tag = "unexpected" - parser_output += "\t"*level+description+": "+str(value)+"\n" - if not packet.all_read(): - raise IndexError("Not completely read") - except AssertionError as e: - parser_output = "ASSERTION FAILED "+str(e)+"\n"+parser_output - tag = "assertfail" - except IndexError as e: - parser_output = "READ ERROR "+str(e)+"\n"+parser_output - tag = "readerror" - - self.tree.insert(obj.entry, "end", text=packet_name, values=("Note: If the creation packet has an error, the serialization packets will have one as well", parser_output), tag=tag) + parser_output = ParserOutput() + with parser_output: + self.parse_serialization(packet, parser_output, parsers) + if error is not None: + parser_output.tag = "error" + else: + error = "" + self.tree.insert(obj.entry, "end", text=packet_name, values=(error, parser_output.text), tag=parser_output.tag) def parse_game_message(self, packet_name, packet): object_id = packet.read(c_int64) @@ -306,23 +312,23 @@ class CaptureExplorer(Frame): attrs = message.findall("attr") attrs.sort(key=lambda x: x.get("name")) - vars = OrderedDict() + attr_values = OrderedDict() if message.find("freeze") is not None or message.find("thaw") is not None: # Custom serializations if msg_name == "NotifyMissionTask": - vars["missionID"] = packet.read(c_int) - vars["taskMask"] = packet.read(c_int) + attr_values["missionID"] = packet.read(c_int) + attr_values["taskMask"] = packet.read(c_int) updates = [] for _ in range(packet.read(c_ubyte)): updates.append(packet.read(c_float)) - vars["updates"] = updates + attr_values["updates"] = updates elif msg_name == "RequestLinkedMission": - vars["playerID"] = packet.read(c_int64) - vars["missionID"] = packet.read(c_int) - vars["bMissionOffered"] = packet.read(c_bit) + attr_values["playerID"] = packet.read(c_int64) + attr_values["missionID"] = packet.read(c_int) + attr_values["bMissionOffered"] = packet.read(c_bit) else: raise NotImplementedError("Custom serialization") - values = "\n".join(["%s = %s" % (a,b) for a,b in vars.items()]) + values = "\n".join(["%s = %s" % (a, b) for a, b in attr_values.items()]) tag = "normal" else: local_enums = {} @@ -335,12 +341,12 @@ class CaptureExplorer(Frame): type_ = attr.get("type") default = attr.get("default") if type_ == "bool": # bools don't have default-flags - vars[attr.get("name")] = packet.read(c_bit) + attr_values[attr.get("name")] = packet.read(c_bit) continue if default is not None: is_not_default = packet.read(c_bit) if not is_not_default: - vars[attr.get("name")] = default + attr_values[attr.get("name")] = default continue if type_ == "unsigned char": value = packet.read(c_ubyte) @@ -354,10 +360,13 @@ class CaptureExplorer(Frame): value = packet.read(c_int64) elif type_ == "LWOOBJID": value = packet.read(c_int64) - for obj in self.objects: - if obj.object_id == value: - value = str(value)+" <"+self.tree.item(obj.entry, "values")[0]+">" - break + if value == object_id: + value = str(value)+" " + else: + for obj in self.objects: + if value == obj.object_id: + value = str(value)+" <"+self.tree.item(obj.entry, "values")[0]+">" + break elif type_ == "float": value = packet.read(c_float) elif type_ == "std::string": @@ -386,26 +395,39 @@ class CaptureExplorer(Frame): value = self.gamemsg_global_enums[type_][value]+" ("+str(value)+")" else: raise NotImplementedError(type_) - vars[attr.get("name")] = value + attr_values[attr.get("name")] = value if not packet.all_read(): raise ValueError except NotImplementedError as e: - values = (msg_name, str(e)+"\nlen: "+str(len(packet)-10)+"\n"+"\n".join(["%s = %s" % (a,b) for a,b in vars.items()])) - tag = "error" - except ValueError as e: - values = ("likely not "+msg_name, "Error while parsing, likely not this message!\n"+str(e)+"\nlen: "+str(len(packet)-10)) + values = (msg_name, str(e)+"\nlen: "+str(len(packet)-10)+"\n"+"\n".join(["%s = %s" % (a, b) for a, b in attr_values.items()])) tag = "error" except (IndexError, UnicodeDecodeError) as e: print(packet_name, msg_name) import traceback traceback.print_exc() - values = ("likely not "+msg_name, "Error while parsing, likely not this message!\n"+str(e)+"\nlen: "+str(len(packet)-10)+"\n"+"\n".join(["%s = %s" % (a,b) for a,b in vars.items()])) + values = ("likely not "+msg_name, "Error while parsing, likely not this message!\n"+str(e)+"\nlen: "+str(len(packet)-10)+"\n"+"\n".join(["%s = %s" % (a, b) for a, b in attr_values.items()])) + tag = "error" + except ValueError as e: + values = ("likely not "+msg_name, "Error while parsing, likely not this message!\n"+str(e)+"\nlen: "+str(len(packet)-10)) tag = "error" else: - values = (msg_name, "\n".join(["%s = %s" % (a,b) for a,b in vars.items()])) + values = (msg_name, "\n".join(["%s = %s" % (a, b) for a, b in attr_values.items()])) tag = "normal" self.tree.insert(entry, "end", text=packet_name, values=values, tag=tag) + def parse_normal_packet(self, packet_name, packet): + id_ = packet_name[packet_name.index("[")+1:packet_name.index("]")] + if id_ not in norm_parser: + self.tree.insert("", "end", text=packet_name, values=(id_, "Add the struct definition file packetdefinitions/"+id_+".structs to enable parsing of this packet."), tag="error") + return + if id_.startswith("53"): + packet.skip_read(8) + else: + packet.skip_read(1) + parser_output = ParserOutput() + parser_output.append(norm_parser[id_].parse(packet)) + self.tree.insert("", "end", text=packet_name, values=(id_, parser_output.text), tag=parser_output.tag) + def sort_column(self, col, reverse): items = [item for item in self.tree.get_children()] items.sort(key=lambda x: self.tree.set(x, col), reverse=reverse) @@ -422,8 +444,11 @@ class CaptureExplorer(Frame): def main(): root = Tk() + fontheight = nametofont("TkDefaultFont").metrics("linespace") + style = Style(root) + style.configure("Treeview", rowheight=fontheight) app = CaptureExplorer("", "", master=root) app.mainloop() -if __name__=="__main__": +if __name__ == "__main__": main()