mirror of
https://github.com/lcdr/utils.git
synced 2024-08-30 17:32:16 +00:00
Added luzviewer for viewing .luz (and .lvl) files, and reorganized the viewers so they use a common base class.
This commit is contained in:
parent
7f694262a5
commit
742c32c5cd
@ -6,13 +6,10 @@ import xml.etree.ElementTree as ET
|
||||
import zipfile
|
||||
from collections import OrderedDict
|
||||
from ctypes import c_float, c_int, c_int64, c_ubyte, c_uint, c_ushort
|
||||
from tkinter import BooleanVar, BOTH, END, HORIZONTAL, Menu, Tk
|
||||
from tkinter.font import nametofont
|
||||
from tkinter.scrolledtext import ScrolledText
|
||||
from tkinter.ttk import Frame, PanedWindow, Style, Treeview
|
||||
from tkinter import BooleanVar, END, Menu
|
||||
|
||||
import viewer
|
||||
import structparser
|
||||
|
||||
from pyraknet.bitstream import BitStream, c_bit
|
||||
|
||||
with open("packetdefinitions/replica/creation_header.structs", encoding="utf-8") as file:
|
||||
@ -62,7 +59,7 @@ for rootdir, _, files in os.walk("packetdefinitions"):
|
||||
class ParserOutput:
|
||||
def __init__(self):
|
||||
self.text = ""
|
||||
self.tag = "normal"
|
||||
self.tag = ""
|
||||
|
||||
def __enter__(self):
|
||||
pass
|
||||
@ -92,12 +89,12 @@ class CaptureObject:
|
||||
self.lot = lot
|
||||
self.entry = None
|
||||
|
||||
class CaptureExplorer(Frame):
|
||||
class CaptureViewer(viewer.Viewer):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
config = configparser.ConfigParser()
|
||||
config.read("captureexplorer.ini")
|
||||
self.sqlite = sqlite3.connect(config["paths"]["db_path"])
|
||||
config.read("captureviewer.ini")
|
||||
self.db = sqlite3.connect(config["paths"]["db_path"])
|
||||
gamemsg_xml = ET.parse(config["paths"]["gamemessages_path"])
|
||||
self.gamemsgs = gamemsg_xml.findall("message")
|
||||
self.gamemsg_global_enums = {}
|
||||
@ -113,6 +110,7 @@ class CaptureExplorer(Frame):
|
||||
self.create_widgets()
|
||||
|
||||
def create_widgets(self):
|
||||
super().create_widgets()
|
||||
menubar = Menu()
|
||||
menubar.add_command(label="Open", command=self.askopenfiles)
|
||||
parse_menu = Menu(menubar)
|
||||
@ -123,29 +121,19 @@ class CaptureExplorer(Frame):
|
||||
menubar.add_cascade(label="Parse", menu=parse_menu)
|
||||
self.master.config(menu=menubar)
|
||||
|
||||
pane = PanedWindow(orient=HORIZONTAL)
|
||||
pane.pack(fill=BOTH, expand=True)
|
||||
|
||||
columns = ("id",)
|
||||
self.tree = Treeview(columns=columns)
|
||||
self.tree.configure(columns=columns)
|
||||
for col in columns:
|
||||
self.tree.heading(col, text=col, command=(lambda col: lambda: self.sort_column(col, False))(col))
|
||||
self.tree.tag_configure("normal")
|
||||
self.tree.tag_configure("unexpected", foreground="medium blue")
|
||||
self.tree.tag_configure("assertfail", foreground="orange")
|
||||
self.tree.tag_configure("readerror", background="medium purple")
|
||||
self.tree.tag_configure("error", foreground="red")
|
||||
self.tree.bind("<<TreeviewSelect>>", self.on_item_click)
|
||||
pane.add(self.tree)
|
||||
|
||||
self.item_inspector = ScrolledText(font="TkDefaultFont", tabs="4m")
|
||||
self.item_inspector.insert(END, "Select an item to inspect it.")
|
||||
pane.add(self.item_inspector)
|
||||
|
||||
def askopenfiles(self):
|
||||
files = filedialog.askopenfilenames(filetypes=[("Zip", "*.zip")])
|
||||
if files:
|
||||
self.load_captures(files)
|
||||
paths = filedialog.askopenfilenames(filetypes=[("Zip", "*.zip")])
|
||||
if paths:
|
||||
self.load_captures(paths)
|
||||
|
||||
def load_captures(self, captures):
|
||||
self.tree.set_children("")
|
||||
@ -196,11 +184,11 @@ class CaptureExplorer(Frame):
|
||||
lot = packet.read(c_int)
|
||||
if lot not in self.lot_data:
|
||||
try:
|
||||
lot_name = self.sqlite.execute("select name from Objects where id == "+str(lot)).fetchone()[0]
|
||||
lot_name = self.db.execute("select name from Objects where id == "+str(lot)).fetchone()[0]
|
||||
except TypeError:
|
||||
print("Name for lot", lot, "not found")
|
||||
lot_name = str(lot)
|
||||
component_types = [i[0] for i in self.sqlite.execute("select component_type from ComponentsRegistry where id == "+str(lot)).fetchall()]
|
||||
component_types = [i[0] for i in self.db.execute("select component_type from ComponentsRegistry where id == "+str(lot)).fetchall()]
|
||||
parsers = []
|
||||
try:
|
||||
component_types.sort(key=comp_ids.index)
|
||||
@ -227,7 +215,7 @@ class CaptureExplorer(Frame):
|
||||
|
||||
obj = CaptureObject(network_id=network_id, object_id=object_id, lot=lot)
|
||||
self.objects.append(obj)
|
||||
obj.entry = self.tree.insert("", "end", text=packet_name, values=(id_, parser_output.text), tag=parser_output.tag)
|
||||
obj.entry = self.tree.insert("", END, text=packet_name, values=(id_, parser_output.text), tag=parser_output.tag)
|
||||
|
||||
@staticmethod
|
||||
def parse_serialization(packet, parser_output, parsers, is_creation=False):
|
||||
@ -248,7 +236,7 @@ class CaptureExplorer(Frame):
|
||||
if obj is None:
|
||||
obj = CaptureObject(network_id=network_id)
|
||||
self.objects.append(obj)
|
||||
obj.entry = self.tree.insert("", "end", text="Unknown", values=("network_id="+str(network_id), ""), tag="normal")
|
||||
obj.entry = self.tree.insert("", END, text="Unknown", values=("network_id="+str(network_id), ""))
|
||||
|
||||
if obj.lot is None:
|
||||
parsers = []
|
||||
@ -263,7 +251,7 @@ class CaptureExplorer(Frame):
|
||||
parser_output.tag = "error"
|
||||
else:
|
||||
error = ""
|
||||
self.tree.insert(obj.entry, "end", text=packet_name, values=(error, parser_output.text), tag=parser_output.tag)
|
||||
self.tree.insert(obj.entry, END, text=packet_name, values=(error, parser_output.text), tag=parser_output.tag)
|
||||
|
||||
def parse_game_message(self, packet_name, packet):
|
||||
object_id = packet.read(c_int64)
|
||||
@ -274,14 +262,14 @@ class CaptureExplorer(Frame):
|
||||
else:
|
||||
obj = CaptureObject(object_id=object_id)
|
||||
self.objects.append(obj)
|
||||
obj.entry = entry = self.tree.insert("", "end", text="Unknown", values=("object_id="+str(object_id), ""), tag="normal")
|
||||
obj.entry = entry = self.tree.insert("", END, text="Unknown", values=("object_id="+str(object_id), ""))
|
||||
|
||||
msg_id = packet.read(c_ushort)
|
||||
if msg_id <= 0x80:
|
||||
msg_id -= 1
|
||||
elif msg_id <= 0xf9:
|
||||
msg_id -= 2
|
||||
elif msg_id <= 0x175:
|
||||
elif msg_id <= 0x1c0:
|
||||
msg_id += 1
|
||||
elif msg_id <= 0x1fd:
|
||||
msg_id -= 1
|
||||
@ -327,6 +315,12 @@ class CaptureExplorer(Frame):
|
||||
for _ in range(packet.read(c_ubyte)):
|
||||
updates.append(packet.read(c_float))
|
||||
attr_values["updates"] = updates
|
||||
elif msg_name == "VendorStatusUpdate":
|
||||
attr_values["bUpdateOnly"] = packet.read(c_bit)
|
||||
inv = {}
|
||||
for _ in range(packet.read(c_uint)):
|
||||
inv[packet.read(c_int)] = packet.read(c_int)
|
||||
attr_values["inventoryList"] = inv
|
||||
elif msg_name == "RequestLinkedMission":
|
||||
attr_values["playerID"] = packet.read(c_int64)
|
||||
attr_values["missionID"] = packet.read(c_int)
|
||||
@ -339,7 +333,7 @@ class CaptureExplorer(Frame):
|
||||
else:
|
||||
raise NotImplementedError("Custom serialization")
|
||||
values = "\n".join(["%s = %s" % (a, b) for a, b in attr_values.items()])
|
||||
tag = "normal"
|
||||
tag = ""
|
||||
else:
|
||||
local_enums = {}
|
||||
for enum in message.findall("enum"):
|
||||
@ -395,7 +389,7 @@ class CaptureExplorer(Frame):
|
||||
value = packet.read(c_float), packet.read(c_float), packet.read(c_float), packet.read(c_float)
|
||||
elif type_ == "LwoNameValue":
|
||||
value = packet.read(str, length_type=c_uint)
|
||||
if len(value) > 0:
|
||||
if value:
|
||||
assert packet.read(c_ushort) == 0 # for some reason has a null terminator
|
||||
elif type_ in local_enums:
|
||||
value = packet.read(c_uint)
|
||||
@ -422,13 +416,13 @@ class CaptureExplorer(Frame):
|
||||
tag = "error"
|
||||
else:
|
||||
values = (msg_name, "\n".join(["%s = %s" % (a, b) for a, b in attr_values.items()]))
|
||||
tag = "normal"
|
||||
self.tree.insert(entry, "end", text=packet_name, values=values, tag=tag)
|
||||
tag = ""
|
||||
self.tree.insert(entry, END, text=packet_name, values=values, tag=tag)
|
||||
|
||||
def parse_normal_packet(self, packet_name, packet):
|
||||
id_ = packet_name[packet_name.index("[")+1:packet_name.index("]")]
|
||||
if id_ not in norm_parser:
|
||||
self.tree.insert("", "end", text=packet_name, values=(id_, "Add the struct definition file packetdefinitions/"+id_+".structs to enable parsing of this packet."), tag="error")
|
||||
self.tree.insert("", END, text=packet_name, values=(id_, "Add the struct definition file packetdefinitions/"+id_+".structs to enable parsing of this packet."), tag="error")
|
||||
return
|
||||
if id_.startswith("53"):
|
||||
packet.skip_read(8)
|
||||
@ -436,29 +430,13 @@ class CaptureExplorer(Frame):
|
||||
packet.skip_read(1)
|
||||
parser_output = ParserOutput()
|
||||
parser_output.append(norm_parser[id_].parse(packet))
|
||||
self.tree.insert("", "end", text=packet_name, values=(id_, parser_output.text), tag=parser_output.tag)
|
||||
self.tree.insert("", END, text=packet_name, values=(id_, parser_output.text), tag=parser_output.tag)
|
||||
|
||||
def sort_column(self, col, reverse):
|
||||
items = [item for item in self.tree.get_children()]
|
||||
items.sort(key=lambda x: self.tree.set(x, col), reverse=reverse)
|
||||
# rearrange items in sorted positions
|
||||
for index, item in enumerate(items):
|
||||
self.tree.move(item, "", index)
|
||||
# reverse sort next time
|
||||
self.tree.heading(col, command=lambda: self.sort_column(col, not reverse))
|
||||
|
||||
def on_item_click(self, event):
|
||||
def on_item_select(self, event):
|
||||
item = self.tree.selection()[0]
|
||||
self.item_inspector.delete(1.0, END)
|
||||
self.item_inspector.insert(END, self.tree.item(item, "values")[1])
|
||||
|
||||
def main():
|
||||
root = Tk()
|
||||
fontheight = nametofont("TkDefaultFont").metrics("linespace")
|
||||
style = Style()
|
||||
style.configure("Treeview", rowheight=fontheight)
|
||||
app = CaptureExplorer()
|
||||
app.mainloop()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
app = CaptureViewer()
|
||||
app.mainloop()
|
143
luzviewer.py
Normal file
143
luzviewer.py
Normal file
@ -0,0 +1,143 @@
|
||||
import configparser
|
||||
import os.path
|
||||
import sqlite3
|
||||
from ctypes import c_float, c_int64, c_ubyte, c_uint, c_uint64, c_ushort
|
||||
|
||||
import tkinter.filedialog as filedialog
|
||||
from tkinter import END, Menu
|
||||
|
||||
import viewer
|
||||
from pyraknet.bitstream import BitStream
|
||||
|
||||
class LUZViewer(viewer.Viewer):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
config = configparser.ConfigParser()
|
||||
config.read("luzviewer.ini")
|
||||
self.db = sqlite3.connect(config["paths"]["db_path"])
|
||||
self.create_widgets()
|
||||
|
||||
def create_widgets(self):
|
||||
super().create_widgets()
|
||||
menubar = Menu()
|
||||
menubar.add_command(label="Open", command=self.askopenfile)
|
||||
self.master.config(menu=menubar)
|
||||
|
||||
def askopenfile(self):
|
||||
path = filedialog.askopenfilename(filetypes=[("LEGO Universe Zone", "*.luz")])
|
||||
if path:
|
||||
self.load_luz(path)
|
||||
|
||||
def load_luz(self, luz_path):
|
||||
self.tree.set_children("")
|
||||
print("Loading", luz_path)
|
||||
with open(luz_path, "rb") as file:
|
||||
stream = BitStream(file.read())
|
||||
|
||||
version = stream.read(c_uint)
|
||||
assert version in (40, 41), version
|
||||
unknown1 = stream.read(c_uint)
|
||||
world_id = stream.read(c_uint)
|
||||
spawnpoint_pos = stream.read(c_float), stream.read(c_float), stream.read(c_float)
|
||||
spawnpoint_rot = stream.read(c_float), stream.read(c_float), stream.read(c_float), stream.read(c_float)
|
||||
|
||||
zone = self.tree.insert("", END, text="Zone", values=(version, unknown1, world_id, spawnpoint_pos, spawnpoint_rot))
|
||||
|
||||
### scenes
|
||||
scenes = self.tree.insert(zone, END, text="Scenes")
|
||||
for _ in range(stream.read(c_uint)):
|
||||
filename = stream.read(str, char_size=1, length_type=c_ubyte)
|
||||
scene_id = stream.read(c_uint)
|
||||
is_audio = stream.read(c_uint)
|
||||
scene_name = stream.read(str, char_size=1, length_type=c_ubyte)
|
||||
if is_audio:
|
||||
assert scene_name == "Audio"
|
||||
scene = self.tree.insert(scenes, END, text="Scene", values=(filename, scene_id, is_audio, scene_name))
|
||||
assert stream.read(bytes, length=3) == b"\xff\xff\xff"
|
||||
with open(os.path.join(os.path.dirname(luz_path), filename), "rb") as lvl:
|
||||
print("Loading lvl", filename)
|
||||
self.parse_lvl(BitStream(lvl.read()), scene)
|
||||
assert stream.read(c_ubyte) == 0
|
||||
### terrain
|
||||
filename = stream.read(str, char_size=1, length_type=c_ubyte)
|
||||
name = stream.read(str, char_size=1, length_type=c_ubyte)
|
||||
description = stream.read(str, char_size=1, length_type=c_ubyte)
|
||||
self.tree.insert(zone, END, text="Terrain", values=(filename, name, description))
|
||||
### unknown
|
||||
unknowns = self.tree.insert(zone, END, text="Unknowns")
|
||||
for _ in range(stream.read(c_uint)):
|
||||
for _ in range(2):
|
||||
unknown1 = stream.read(c_uint64)
|
||||
unknown2 = stream.read(c_float), stream.read(c_float), stream.read(c_float)
|
||||
self.tree.insert(unknowns, END, text="Unknown", values=(unknown1, unknown2))
|
||||
remaining_length = stream.read(c_uint)
|
||||
assert len(stream) - stream._read_offset//8 == remaining_length
|
||||
assert stream.read(c_uint) == 1
|
||||
|
||||
def parse_lvl(self, stream, scene):
|
||||
while not stream.all_read():
|
||||
assert stream._read_offset//8 % 16 == 0 # seems everything is aligned like this?
|
||||
start_pos = stream._read_offset//8
|
||||
assert stream.read(bytes, length=4) == b"CHNK"
|
||||
chunktype = stream.read(c_uint)
|
||||
assert stream.read(c_ushort) == 1
|
||||
assert stream.read(c_ushort) in (1, 2)
|
||||
chunk_length = stream.read(c_uint)
|
||||
data_pos = stream.read(c_uint)
|
||||
stream._read_offset = data_pos * 8
|
||||
assert stream._read_offset//8 % 16 == 0
|
||||
if chunktype == 1000:
|
||||
pass
|
||||
elif chunktype == 2000:
|
||||
pass
|
||||
elif chunktype == 2001:
|
||||
for _ in range(stream.read(c_uint)):
|
||||
object_id = stream.read(c_int64) # seems like the object id, but without some bits
|
||||
lot = stream.read(c_uint)
|
||||
unknown1 = stream.read(c_uint)
|
||||
unknown2 = stream.read(c_uint)
|
||||
position = stream.read(c_float), stream.read(c_float), stream.read(c_float)
|
||||
rotation = stream.read(c_float), stream.read(c_float), stream.read(c_float), stream.read(c_float)
|
||||
scale = stream.read(c_float)
|
||||
config_data = stream.read(str, length_type=c_uint)
|
||||
config_data = config_data.replace("{", "<crlbrktstart>").replace("}", "<crlbrktend>").replace("\\", "<backslash>") # for some reason these characters aren't properly escaped when sent to Tk
|
||||
assert stream.read(c_uint) == 0
|
||||
lot_name = ""
|
||||
if lot == 176:
|
||||
lot_name = "Spawner - "
|
||||
lot = config_data[config_data.index("spawntemplate")+16:config_data.index("\n", config_data.index("spawntemplate")+16)]
|
||||
try:
|
||||
lot_name += self.db.execute("select name from Objects where id == "+str(lot)).fetchone()[0]
|
||||
except TypeError:
|
||||
print("Name for lot", lot, "not found")
|
||||
lot_name += " - "+str(lot)
|
||||
self.tree.insert(scene, END, text="Object", values=(object_id, lot_name, unknown1, unknown2, position, rotation, scale, config_data))
|
||||
elif chunktype == 2002:
|
||||
pass
|
||||
stream._read_offset = (start_pos + chunk_length) * 8 # go to the next CHNK
|
||||
|
||||
def on_item_select(self, event):
|
||||
item = self.tree.selection()[0]
|
||||
item_type = self.tree.item(item, "text")
|
||||
if item_type == "Zone":
|
||||
cols = "version", "unknown1", "world_id", "spawnpoint_pos", "spawnpoint_rot"
|
||||
elif item_type == "Scene":
|
||||
cols = "filename", "scene_id", "is_audio", "scene_name"
|
||||
elif item_type == "Terrain":
|
||||
cols = "filename", "name", "description"
|
||||
elif item_type == "Unknown":
|
||||
cols = "unknown1", "unknown2"
|
||||
elif item_type == "Object":
|
||||
cols = "object_id", "lot", "unknown1", "unknown2", "position", "rotation", "scale"
|
||||
else:
|
||||
cols = ()
|
||||
if cols:
|
||||
self.tree.configure(columns=cols)
|
||||
for col in cols:
|
||||
self.tree.heading(col, text=col, command=(lambda col: lambda: self.sort_column(col, False))(col))
|
||||
self.item_inspector.delete(1.0, END)
|
||||
self.item_inspector.insert(END, "\n".join(self.tree.item(item, "values")))
|
||||
|
||||
if __name__ == "__main__":
|
||||
app = LUZViewer()
|
||||
app.mainloop()
|
80
viewer.py
Normal file
80
viewer.py
Normal file
@ -0,0 +1,80 @@
|
||||
from tkinter import BOTH, END, HORIZONTAL, RIGHT, StringVar, Text, X, Y
|
||||
from tkinter.font import nametofont
|
||||
from tkinter.ttk import Entry, Frame, PanedWindow, Scrollbar, Style, Treeview
|
||||
|
||||
class Viewer(Frame):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.master.title(type(self).__name__)
|
||||
fontheight = nametofont("TkDefaultFont").metrics("linespace")
|
||||
style = Style()
|
||||
style.configure("Treeview", rowheight=fontheight)
|
||||
self.detached_items = {}
|
||||
self.find_input = StringVar(value="Enter search here")
|
||||
self.tree = None
|
||||
self.item_inspector = None
|
||||
|
||||
def create_widgets(self):
|
||||
find_entry = Entry(textvariable=self.find_input)
|
||||
find_entry.pack(fill=X)
|
||||
find_entry.bind("<Return>", lambda event: self.find())
|
||||
|
||||
pane = PanedWindow(orient=HORIZONTAL)
|
||||
pane.pack(fill=BOTH, expand=True)
|
||||
|
||||
frame = Frame()
|
||||
scrollbar = Scrollbar(frame)
|
||||
scrollbar.pack(side=RIGHT, fill=Y)
|
||||
|
||||
self.tree = Treeview(frame, columns=(None,), yscrollcommand=scrollbar.set)
|
||||
self.tree.tag_configure("match", background="light yellow")
|
||||
self.tree.bind("<<TreeviewSelect>>", self.on_item_select)
|
||||
self.tree.pack(fill=BOTH, expand=True)
|
||||
|
||||
scrollbar.configure(command=self.tree.yview)
|
||||
|
||||
frame = Frame()
|
||||
scrollbar = Scrollbar(frame)
|
||||
scrollbar.pack(side=RIGHT, fill=Y)
|
||||
|
||||
self.item_inspector = Text(frame, font="TkDefaultFont", tabs="4m", yscrollcommand=scrollbar.set)
|
||||
self.item_inspector.insert(END, "Select an item to inspect it.")
|
||||
self.item_inspector.pack()
|
||||
|
||||
scrollbar.configure(command=self.item_inspector.yview)
|
||||
|
||||
def find(self):
|
||||
query = self.find_input.get().lower()
|
||||
for item in self.tree.tag_has("match"):
|
||||
self.tree.item(item, tags=())
|
||||
for parent, detached_children in self.detached_items.items():
|
||||
for i in detached_children:
|
||||
self.tree.reattach(i, parent, END)
|
||||
if query:
|
||||
self.filter_items(query)
|
||||
|
||||
def filter_items(self, query, parent=""):
|
||||
all_children = self.tree.get_children(parent)
|
||||
detached_children = [item for item in all_children if not any(query in i.lower() for i in self.tree.item(item, "values"))] # first, find all children that don't match
|
||||
for item in all_children:
|
||||
if item not in detached_children:
|
||||
self.tree.item(item, tags=("match",))
|
||||
self.tree.see(item)
|
||||
if self.filter_items(query, item) and item in detached_children:
|
||||
detached_children.remove(item) # don't detach if a child matches
|
||||
self.detached_items[parent] = detached_children
|
||||
for i in detached_children:
|
||||
self.tree.detach(i)
|
||||
return len(detached_children) != len(all_children) # return true if any children match
|
||||
|
||||
def sort_column(self, col, reverse, parent=""):
|
||||
children = list(self.tree.get_children(parent))
|
||||
children.sort(key=lambda x: self.tree.set(x, col), reverse=reverse)
|
||||
# rearrange items in sorted positions
|
||||
for index, child in enumerate(children):
|
||||
self.tree.move(child, parent, index)
|
||||
for child in children:
|
||||
self.sort_column(col, reverse, child)
|
||||
if parent == "":
|
||||
# reverse sort next time
|
||||
self.tree.heading(col, command=lambda: self.sort_column(col, not reverse))
|
Loading…
Reference in New Issue
Block a user