utils/captureexplorer.py
lcdr 33696fd9e8 captureexplorer now reads the options from a file named captureexplorer.ini
format:
[paths]
db_path=
gamemessages_path=
[parse]
2015-06-22 21:23:38 +02:00

458 lines
17 KiB
Python

import configparser
import os
import sqlite3
import tkinter.filedialog as filedialog
import xml.etree.ElementTree as ET
import zipfile
from collections import OrderedDict
from ctypes import c_float, c_int, c_int64, c_ubyte, c_uint, c_ushort
from tkinter import BooleanVar, BOTH, END, HORIZONTAL, Menu, Tk
from tkinter.font import nametofont
from tkinter.scrolledtext import ScrolledText
from tkinter.ttk import Frame, PanedWindow, Style, Treeview
import structparser
from pyraknet.bitstream import BitStream, c_bit
with open("packetdefinitions/replica/creation_header.structs", encoding="utf-8") as file:
creation_header_parser = structparser.StructParser(file.read())
with open("packetdefinitions/replica/serialization_header.structs", encoding="utf-8") as file:
serialization_header_parser = structparser.StructParser(file.read())
component_name = OrderedDict()
component_name[1] = "ControllablePhysics"
component_name[3] = "SimplePhysics"
component_name[40] = "PhantomPhysics"
component_name[7] = "Destructible"
component_name[49] = "Switch"
component_name[26] = "Pet"
component_name[4] = "Character"
component_name[17] = "Inventory"
component_name[5] = "Script"
component_name[9] = "Skill"
component_name[60] = "BaseCombatAI"
component_name[16] = "Vendor"
component_name[6] = "Bouncer"
component_name[39] = "ScriptedActivity"
component_name[2] = "Render"
component_name[107] = "Index36"
component_name[31] = None
component_name[35] = None
component_name[56] = None
component_name[64] = None
component_name[73] = None
comp_ids = list(component_name.keys())
comp_parser = {}
for key, value in component_name.items():
if value is not None:
with open("packetdefinitions/replica/components/"+value+".structs") as file:
comp_parser[key] = structparser.StructParser(file.read())
norm_parser = {}
for rootdir, _, files in os.walk("packetdefinitions"):
for filename in files:
with open(rootdir+"/"+filename) as file:
norm_parser[filename[:filename.rindex(".")]] = structparser.StructParser(file.read())
break
class ParserOutput:
def __init__(self):
self.text = ""
self.tag = "normal"
def __enter__(self):
pass
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None:
if exc_type == AssertionError:
exc_name = "ASSERTION FAILED"
self.tag = "assertfail"
elif exc_type == IndexError:
exc_name = "READ ERROR"
self.tag = "readerror"
self.text = exc_name+" "+str(exc_value)+"\n"+self.text
return True
def append(self, structs):
for level, description, value, unexpected in structs:
if unexpected:
self.text += "UNEXPECTED: "
self.tag = "unexpected"
self.text += "\t"*level+description+": "+str(value)+"\n"
class CaptureObject:
def __init__(self, network_id=None, object_id=None, lot=None):
self.network_id = network_id
self.object_id = object_id
self.lot = lot
self.entry = None
class CaptureExplorer(Frame):
def __init__(self):
super().__init__()
config = configparser.ConfigParser()
config.read("captureexplorer.ini")
self.sqlite = sqlite3.connect(config["paths"]["db_path"])
gamemsg_xml = ET.parse(config["paths"]["gamemessages_path"])
self.gamemsgs = gamemsg_xml.findall("message")
self.gamemsg_global_enums = {}
for enum in gamemsg_xml.findall("enum"):
self.gamemsg_global_enums[enum.get("name")] = tuple(value.get("name") for value in enum.findall("value"))
self.objects = []
self.lot_data = {}
self.parse_creations = BooleanVar(value=config["parse"].get("creations", True))
self.parse_serializations = BooleanVar(value=config["parse"].get("serializations", True))
self.parse_game_messages = BooleanVar(value=config["parse"].get("game_messages", True))
self.parse_normal_packets = BooleanVar(value=config["parse"].get("normal_packets", True))
self.create_widgets()
def create_widgets(self):
menubar = Menu()
menubar.add_command(label="Open", command=self.askopenfiles)
parse_menu = Menu(menubar)
parse_menu.add_checkbutton(label="Parse Creations", variable=self.parse_creations)
parse_menu.add_checkbutton(label="Parse Serializations", variable=self.parse_serializations)
parse_menu.add_checkbutton(label="Parse Game Messages", variable=self.parse_game_messages)
parse_menu.add_checkbutton(label="Parse Normal Packets", variable=self.parse_normal_packets)
menubar.add_cascade(label="Parse", menu=parse_menu)
self.master.config(menu=menubar)
pane = PanedWindow(orient=HORIZONTAL)
pane.pack(fill=BOTH, expand=True)
columns = ("id",)
self.tree = Treeview(columns=columns)
for col in columns:
self.tree.heading(col, text=col, command=(lambda col: lambda: self.sort_column(col, False))(col))
self.tree.tag_configure("normal")
self.tree.tag_configure("unexpected", foreground="medium blue")
self.tree.tag_configure("assertfail", foreground="orange")
self.tree.tag_configure("readerror", background="medium purple")
self.tree.tag_configure("error", foreground="red")
self.tree.bind("<<TreeviewSelect>>", self.on_item_click)
pane.add(self.tree)
self.item_inspector = ScrolledText(font="TkDefaultFont", tabs="4m")
self.item_inspector.insert(END, "Select an item to inspect it.")
pane.add(self.item_inspector)
def askopenfiles(self):
files = filedialog.askopenfilenames(filetypes=[("Zip", "*.zip")])
if files:
self.load_captures(files)
def load_captures(self, captures):
self.tree.set_children("")
self.objects = []
print("Loading captures, this might take a while")
for capture in captures:
print("Loading", capture)
with zipfile.ZipFile(capture) as capture:
files = [i for i in capture.namelist() if "of" not in i]
if self.parse_creations.get():
print("Parsing creations")
creations = [i for i in files if "[24]" in i]
for packet_name in creations:
packet = BitStream(capture.read(packet_name))
self.parse_creation(packet_name, packet)
if self.parse_serializations.get():
print("Parsing serializations")
serializations = [i for i in files if "[27]" in i]
for packet_name in serializations:
packet = BitStream(capture.read(packet_name)[1:])
self.parse_serialization_packet(packet_name, packet)
if self.parse_game_messages.get():
print("Parsing game messages")
game_messages = [i for i in files if "[53-05-00-0c]" in i or "[53-04-00-05]" in i]
for packet_name in game_messages:
packet = BitStream(capture.read(packet_name)[8:])
self.parse_game_message(packet_name, packet)
if self.parse_normal_packets.get():
print("Parsing normal packets")
packets = [i for i in files if "[24]" not in i and "[27]" not in i and "[53-05-00-0c]" not in i and "[53-04-00-05]" not in i]
for packet_name in packets:
packet = BitStream(capture.read(packet_name))
self.parse_normal_packet(packet_name, packet)
def parse_creation(self, packet_name, packet):
packet.skip_read(1)
has_network_id = packet.read(c_bit)
assert has_network_id
network_id = packet.read(c_ushort)
object_id = packet.read(c_int64)
for obj in self.objects:
if obj.object_id == object_id: # We've already parsed this object (can happen due to ghosting)
return
lot = packet.read(c_int)
if lot not in self.lot_data:
try:
lot_name = self.sqlite.execute("select name from Objects where id == "+str(lot)).fetchone()[0]
except TypeError:
print("Name for lot", lot, "not found")
lot_name = str(lot)
component_types = [i[0] for i in self.sqlite.execute("select component_type from ComponentsRegistry where id == "+str(lot)).fetchall()]
parsers = []
try:
component_types.sort(key=comp_ids.index)
for comp_type in component_types:
if component_name[comp_type] is not None:
parsers.append((component_name[comp_type], comp_parser[comp_type]))
except ValueError as e:
error = "ERROR: Unknown component "+str(e.args[0].split()[0])+" "+str(component_types)
else:
error = None
self.lot_data[lot] = lot_name, parsers, error
else:
lot_name, parsers, error = self.lot_data[lot]
id_ = packet.read(str, length_type=c_ubyte) + " " + lot_name
packet._read_offset = 0
parser_output = ParserOutput()
with parser_output:
parser_output.append(creation_header_parser.parse(packet))
if error is not None:
parser_output.text = error+"\n"+parser_output.text
parser_output.tag = "error"
else:
self.parse_serialization(packet, parser_output, parsers, is_creation=True)
obj = CaptureObject(network_id=network_id, object_id=object_id, lot=lot)
self.objects.append(obj)
obj.entry = self.tree.insert("", "end", text=packet_name, values=(id_, parser_output.text), tag=parser_output.tag)
@staticmethod
def parse_serialization(packet, parser_output, parsers, is_creation=False):
parser_output.append(serialization_header_parser.parse(packet))
for name, parser in parsers:
parser_output.text += "\n"+name+"\n\n"
parser_output.append(parser.parse(packet, {"creation":is_creation}))
if not packet.all_read():
raise IndexError("Not completely read")
def parse_serialization_packet(self, packet_name, packet):
network_id = packet.read(c_ushort)
obj = None
for j in self.objects:
if j.network_id == network_id:
obj = j
break
if obj is None:
obj = CaptureObject(network_id=network_id)
self.objects.append(obj)
obj.entry = self.tree.insert("", "end", text="Unknown", values=("network_id="+str(network_id), ""), tag="normal")
if obj.lot is None:
parsers = []
error = "Unknown object"
else:
_, parsers, error = self.lot_data[obj.lot]
parser_output = ParserOutput()
with parser_output:
self.parse_serialization(packet, parser_output, parsers)
if error is not None:
parser_output.tag = "error"
else:
error = ""
self.tree.insert(obj.entry, "end", text=packet_name, values=(error, parser_output.text), tag=parser_output.tag)
def parse_game_message(self, packet_name, packet):
object_id = packet.read(c_int64)
for i in self.objects:
if i.object_id == object_id:
entry = i.entry
break
else:
obj = CaptureObject(object_id=object_id)
self.objects.append(obj)
obj.entry = entry = self.tree.insert("", "end", text="Unknown", values=("object_id="+str(object_id), ""), tag="normal")
msg_id = packet.read(c_ushort)
if msg_id <= 0x80:
msg_id -= 1
elif msg_id <= 0xf9:
msg_id -= 2
elif msg_id <= 0x175:
msg_id += 1
elif msg_id <= 0x1fd:
msg_id -= 1
elif msg_id <= 0x208:
msg_id -= 5
elif msg_id <= 0x231:
msg_id -= 8
elif msg_id <= 0x30d:
msg_id -= 10
elif msg_id <= 0x353:
msg_id -= 9
elif msg_id <= 0x37a:
msg_id -= 10
elif msg_id <= 0x3a6:
msg_id -= 9
elif msg_id <= 0x430:
msg_id -= 33
elif msg_id <= 0x4c7:
msg_id -= 34
elif msg_id <= 0x510:
msg_id -= 31
elif msg_id <= 0x58b:
msg_id -= 30
elif msg_id <= 0x5e7:
msg_id -= 29
try:
message = self.gamemsgs[msg_id]
msg_name = message.get("name")
network = message.get("network")
if network is None or ((("[53-05-00-0c]" in packet_name and "client" not in network) or ("[53-04-00-05]" in packet_name and "server" not in network)) and network != "duplicated"):
raise ValueError
attrs = message.findall("attr")
attrs.sort(key=lambda x: x.get("name"))
attr_values = OrderedDict()
if message.find("freeze") is not None or message.find("thaw") is not None:
# Custom serializations
if msg_name == "NotifyMissionTask":
attr_values["missionID"] = packet.read(c_int)
attr_values["taskMask"] = packet.read(c_int)
updates = []
for _ in range(packet.read(c_ubyte)):
updates.append(packet.read(c_float))
attr_values["updates"] = updates
elif msg_name == "RequestLinkedMission":
attr_values["playerID"] = packet.read(c_int64)
attr_values["missionID"] = packet.read(c_int)
attr_values["bMissionOffered"] = packet.read(c_bit)
else:
raise NotImplementedError("Custom serialization")
values = "\n".join(["%s = %s" % (a, b) for a, b in attr_values.items()])
tag = "normal"
else:
local_enums = {}
for enum in message.findall("enum"):
local_enums[enum.get("name")] = tuple(value.get("name") for value in enum.findall("value"))
for attr in attrs:
if attr.get("returnValue") is not None:
raise NotImplementedError(attr.get("name"), "returnValue")
type_ = attr.get("type")
default = attr.get("default")
if type_ == "bool": # bools don't have default-flags
attr_values[attr.get("name")] = packet.read(c_bit)
continue
if default is not None:
is_not_default = packet.read(c_bit)
if not is_not_default:
attr_values[attr.get("name")] = default
continue
if type_ == "unsigned char":
value = packet.read(c_ubyte)
elif type_ == "LWOMAPID":
value = packet.read(c_ushort)
elif type_ in ("int", "LOT"):
value = packet.read(c_int)
elif type_ in ("unsigned int", "TSkillID"):
value = packet.read(c_uint)
elif type_ == "__int64":
value = packet.read(c_int64)
elif type_ == "LWOOBJID":
value = packet.read(c_int64)
if value == object_id:
value = str(value)+" <self>"
else:
for obj in self.objects:
if value == obj.object_id:
value = str(value)+" <"+self.tree.item(obj.entry, "values")[0]+">"
break
elif type_ == "float":
value = packet.read(c_float)
elif type_ == "std::string":
length = packet.read(c_uint)
if length > 255: # in case this isn't the right message after all and we read a way too high value
raise ValueError
value = packet.read(str, char_size=1, allocated_length=length)
elif type_ == "std::wstring":
length = packet.read(c_uint)
if length > 255: # in case this isn't the right message after all and we read a way too high value
raise ValueError
value = packet.read(str, char_size=2, allocated_length=length*2)
elif type_ == "NiPoint3":
value = packet.read(c_float), packet.read(c_float), packet.read(c_float)
elif type_ == "NiQuaternion":
value = packet.read(c_float), packet.read(c_float), packet.read(c_float), packet.read(c_float)
elif type_ == "LwoNameValue":
value = packet.read(str, length_type=c_uint)
if len(value) > 0:
assert packet.read(c_ushort) == 0 # for some reason has a null terminator
elif type_ in local_enums:
value = packet.read(c_uint)
value = local_enums[type_][value]+" ("+str(value)+")"
elif type_ in self.gamemsg_global_enums:
value = packet.read(c_uint)
value = self.gamemsg_global_enums[type_][value]+" ("+str(value)+")"
else:
raise NotImplementedError(type_)
attr_values[attr.get("name")] = value
if not packet.all_read():
raise ValueError
except NotImplementedError as e:
values = (msg_name, str(e)+"\nlen: "+str(len(packet)-10)+"\n"+"\n".join(["%s = %s" % (a, b) for a, b in attr_values.items()]))
tag = "error"
except (IndexError, UnicodeDecodeError) as e:
print(packet_name, msg_name)
import traceback
traceback.print_exc()
values = ("likely not "+msg_name, "Error while parsing, likely not this message!\n"+str(e)+"\nlen: "+str(len(packet)-10)+"\n"+"\n".join(["%s = %s" % (a, b) for a, b in attr_values.items()]))
tag = "error"
except ValueError as e:
values = ("likely not "+msg_name, "Error while parsing, likely not this message!\n"+str(e)+"\nlen: "+str(len(packet)-10))
tag = "error"
else:
values = (msg_name, "\n".join(["%s = %s" % (a, b) for a, b in attr_values.items()]))
tag = "normal"
self.tree.insert(entry, "end", text=packet_name, values=values, tag=tag)
def parse_normal_packet(self, packet_name, packet):
id_ = packet_name[packet_name.index("[")+1:packet_name.index("]")]
if id_ not in norm_parser:
self.tree.insert("", "end", text=packet_name, values=(id_, "Add the struct definition file packetdefinitions/"+id_+".structs to enable parsing of this packet."), tag="error")
return
if id_.startswith("53"):
packet.skip_read(8)
else:
packet.skip_read(1)
parser_output = ParserOutput()
parser_output.append(norm_parser[id_].parse(packet))
self.tree.insert("", "end", text=packet_name, values=(id_, parser_output.text), tag=parser_output.tag)
def sort_column(self, col, reverse):
items = [item for item in self.tree.get_children()]
items.sort(key=lambda x: self.tree.set(x, col), reverse=reverse)
# rearrange items in sorted positions
for index, item in enumerate(items):
self.tree.move(item, "", index)
# reverse sort next time
self.tree.heading(col, command=lambda: self.sort_column(col, not reverse))
def on_item_click(self, event):
item = self.tree.selection()[0]
self.item_inspector.delete(1.0, END)
self.item_inspector.insert(END, self.tree.item(item, "values")[1])
def main():
root = Tk()
fontheight = nametofont("TkDefaultFont").metrics("linespace")
style = Style()
style.configure("Treeview", rowheight=fontheight)
app = CaptureExplorer()
app.mainloop()
if __name__ == "__main__":
main()