mirror of
https://github.com/lcdr/utils.git
synced 2024-08-30 17:32:16 +00:00
Added LIFExtractor for viewing and extracting LIF contents, and refactored viewers.
This commit is contained in:
27
README.md
27
README.md
@ -1,7 +1,22 @@
|
||||
Various utilities.
|
||||
|
||||
Requires Python 3.6.
|
||||
|
||||
Some programs use the bitstream module from my python RakNet implementation for reading binary files. It's available at https://bitbucket.org/lcdr/pyraknet. See the Readme there for how to install it.
|
||||
|
||||
# utils
|
||||
## Utilities for working with LEGO Universe file formats and network packets.
|
||||
### Created by lcdr
|
||||
### Source repository at https://bitbucket.org/lcdr/utils/
|
||||
### License: GPL v3
|
||||
|
||||
### Included utilities:
|
||||
|
||||
* captureviewer - Graphical viewer for parsing and displaying LU network captures. Opens .zip files containing .bin packets in our capture naming format.
|
||||
* luzviewer - Graphical viewer for parsing and displaying LU maps saved as .luz and .lvl files. Can open the .luz files in your LU client.
|
||||
* pkextractor - Graphical viewer and extractor for parsing .pk files (used by LU to pack assets) and displaying their contents. Can extract single files by double-clicking, and can also extract the entire archive to a specified folder.
|
||||
* lifextractor - Graphical viewer and extractor for parsing .lif files (used by LDD to pack assets) and displaying their contents. Can extract single files by double-clicking, and can also extract the entire archive to a specified folder.
|
||||
* fdb_to_sqlite - Command line script to convert the information from the FDB database format used by LU to SQLite.
|
||||
* decompress_sd0 - Command line script to decompress LU's sd0 file format / compression scheme.
|
||||
|
||||
### Requirements:
|
||||
* Python 3.6
|
||||
* https://bitbucket.org/lcdr/pyraknet for some scripts
|
||||
|
||||
### Installation
|
||||
|
||||
`pip install hg+https://bitbucket.org/lcdr/utils` should handle the installation automatically, including installing dependencies. If you run into problems you might have to execute pip as admin, or if you have multiple Python versions installed explicitly use the pip of the compatible Python version.
|
||||
|
@ -1,5 +1,4 @@
|
||||
import configparser
|
||||
import math
|
||||
import glob
|
||||
import os.path
|
||||
import pickle
|
||||
@ -109,8 +108,7 @@ class CaptureObject:
|
||||
self.entry = None
|
||||
|
||||
class CaptureViewer(viewer.Viewer):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
def init(self):
|
||||
config = configparser.ConfigParser()
|
||||
config.read("captureviewer.ini")
|
||||
try:
|
||||
@ -119,7 +117,7 @@ class CaptureViewer(viewer.Viewer):
|
||||
messagebox.showerror("Can not open database", "Make sure db_path in the INI is set correctly.")
|
||||
sys.exit()
|
||||
|
||||
self.create_parsers()
|
||||
self._create_parsers()
|
||||
|
||||
with open("packetdefinitions/gm", "rb") as file:
|
||||
self.gamemsgs = pickle.loads(zlib.decompress(file.read()))
|
||||
@ -133,13 +131,12 @@ class CaptureViewer(viewer.Viewer):
|
||||
self.retry_with_script_component = BooleanVar(value=config["parse"]["retry_with_script_component"])
|
||||
self.retry_with_trigger_component = BooleanVar(value=config["parse"]["retry_with_trigger_component"])
|
||||
self.retry_with_phantom_component = BooleanVar(value=config["parse"]["retry_with_phantom_component"])
|
||||
self.create_widgets()
|
||||
|
||||
def create_parsers(self):
|
||||
def _create_parsers(self):
|
||||
type_handlers = {}
|
||||
type_handlers["object_id"] = self.object_id_handler
|
||||
type_handlers["lot"] = self.lot_handler
|
||||
type_handlers["compressed_ldf"] = self.compressed_ldf_handler
|
||||
type_handlers["object_id"] = self._object_id_handler
|
||||
type_handlers["lot"] = self._lot_handler
|
||||
type_handlers["compressed_ldf"] = self._compressed_ldf_handler
|
||||
|
||||
with open(__file__+"/../packetdefinitions/replica/creation_header.structs", encoding="utf-8") as file:
|
||||
self.creation_header_parser = StructParser(file.read(), type_handlers)
|
||||
@ -162,9 +159,7 @@ class CaptureViewer(viewer.Viewer):
|
||||
|
||||
def create_widgets(self):
|
||||
super().create_widgets()
|
||||
menubar = Menu()
|
||||
menubar.add_command(label="Open", command=self.askopenfiles)
|
||||
parse_menu = Menu(menubar)
|
||||
parse_menu = Menu(self.menubar)
|
||||
parse_menu.add_checkbutton(label="Parse Creations", variable=self.parse_creations)
|
||||
parse_menu.add_checkbutton(label="Parse Serializations", variable=self.parse_serializations)
|
||||
parse_menu.add_checkbutton(label="Parse Game Messages", variable=self.parse_game_messages)
|
||||
@ -172,69 +167,62 @@ class CaptureViewer(viewer.Viewer):
|
||||
parse_menu.add_checkbutton(label="Retry parsing with script component if failed", variable=self.retry_with_script_component)
|
||||
parse_menu.add_checkbutton(label="Retry parsing with trigger component if failed", variable=self.retry_with_trigger_component)
|
||||
parse_menu.add_checkbutton(label="Retry parsing with phantom component if failed", variable=self.retry_with_phantom_component)
|
||||
menubar.add_cascade(label="Parse", menu=parse_menu)
|
||||
self.master.config(menu=menubar)
|
||||
self.menubar.add_cascade(label="Parse", menu=parse_menu)
|
||||
|
||||
columns = "id",
|
||||
self.tree.configure(columns=columns)
|
||||
for col in columns:
|
||||
self.tree.heading(col, text=col, command=(lambda col: lambda: self.sort_column(col, False))(col))
|
||||
self.set_headings("Name", treeheading="Packet", treewidth=1200)
|
||||
self.tree.tag_configure("unexpected", foreground="medium blue")
|
||||
self.tree.tag_configure("assertfail", foreground="orange")
|
||||
self.tree.tag_configure("readerror", background="medium purple")
|
||||
self.tree.tag_configure("error", foreground="red")
|
||||
|
||||
def askopenfiles(self):
|
||||
paths = filedialog.askopenfilenames(filetypes=[("Zip", "*.zip")])
|
||||
if paths:
|
||||
self.load_captures(paths)
|
||||
def askopener(self):
|
||||
return filedialog.askopenfilenames(filetypes=[("Zip", "*.zip")])
|
||||
|
||||
def load_captures(self, captures):
|
||||
self.tree.set_children("")
|
||||
self.detached_items.clear()
|
||||
def load(self, captures) -> None:
|
||||
self.objects = []
|
||||
print("Loading captures, this might take a while")
|
||||
for i, capture in enumerate(captures):
|
||||
print("Loading", capture, "[%i/%i]" % (i+1, len(captures)))
|
||||
with zipfile.ZipFile(capture) as capture:
|
||||
self.set_superbar(self.parse_creations.get()+self.parse_serializations.get()+self.parse_game_messages.get()+self.parse_normal_packets.get())
|
||||
files = [i for i in capture.namelist() if "of" not in i]
|
||||
|
||||
if self.parse_creations.get():
|
||||
for _ in self.step_superbar(self.parse_creations.get(), "Parsing creations"):
|
||||
print("Parsing creations")
|
||||
creations = [i for i in files if "[24]" in i]
|
||||
for packet_name in creations:
|
||||
packet = ReadStream(capture.read(packet_name), unlocked=True)
|
||||
self.parse_creation(packet_name, packet)
|
||||
self._parse_creation(packet_name, packet)
|
||||
|
||||
if self.parse_serializations.get():
|
||||
for _ in self.step_superbar(self.parse_serializations.get(), "Parsing serializations"):
|
||||
print("Parsing serializations")
|
||||
serializations = [i for i in files if "[27]" in i]
|
||||
for packet_name in serializations:
|
||||
packet = ReadStream(capture.read(packet_name)[1:])
|
||||
self.parse_serialization_packet(packet_name, packet)
|
||||
self._parse_serialization_packet(packet_name, packet)
|
||||
|
||||
if self.parse_game_messages.get():
|
||||
for _ in self.step_superbar(self.parse_game_messages.get(), "Parsing game messages"):
|
||||
print("Parsing game messages")
|
||||
game_messages = [i for i in files if "[53-05-00-0c]" in i or "[53-04-00-05]" in i]
|
||||
for packet_name in game_messages:
|
||||
packet = ReadStream(capture.read(packet_name)[8:])
|
||||
self.parse_game_message(packet_name, packet)
|
||||
self._parse_game_message(packet_name, packet)
|
||||
|
||||
if self.parse_normal_packets.get():
|
||||
for _ in self.step_superbar(self.parse_normal_packets.get(), "Parsing normal packets"):
|
||||
print("Parsing normal packets")
|
||||
packets = [i for i in files if "[24]" not in i and "[27]" not in i and "[53-05-00-0c]" not in i and "[53-04-00-05]" not in i]
|
||||
for packet_name in packets:
|
||||
packet = ReadStream(capture.read(packet_name))
|
||||
self.parse_normal_packet(packet_name, packet)
|
||||
self._parse_normal_packet(packet_name, packet)
|
||||
|
||||
def object_id_handler(self, stream):
|
||||
def _object_id_handler(self, stream):
|
||||
object_id = stream.read(c_int64)
|
||||
for obj in self.objects:
|
||||
if object_id == obj.object_id:
|
||||
return str(object_id)+" <"+self.tree.item(obj.entry, "values")[0]+">"
|
||||
return str(object_id)
|
||||
|
||||
def lot_handler(self, stream):
|
||||
def _lot_handler(self, stream):
|
||||
lot = stream.read(c_int)
|
||||
if lot not in self.lot_data:
|
||||
try:
|
||||
@ -246,7 +234,7 @@ class CaptureViewer(viewer.Viewer):
|
||||
lot_name = self.lot_data[lot][0]
|
||||
return "%s - %s" % (lot, lot_name)
|
||||
|
||||
def compressed_ldf_handler(self, stream):
|
||||
def _compressed_ldf_handler(self, stream):
|
||||
size = stream.read(c_uint)
|
||||
is_compressed = stream.read(c_bool)
|
||||
if is_compressed:
|
||||
@ -257,7 +245,7 @@ class CaptureViewer(viewer.Viewer):
|
||||
uncompressed = stream.read(bytes, length=size)
|
||||
return ldf.from_ldf(ReadStream(uncompressed))
|
||||
|
||||
def parse_creation(self, packet_name, packet, retry_with_components=[]):
|
||||
def _parse_creation(self, packet_name, packet, retry_with_components=[]):
|
||||
packet.skip_read(1)
|
||||
has_network_id = packet.read(c_bit)
|
||||
assert has_network_id
|
||||
@ -304,7 +292,7 @@ class CaptureViewer(viewer.Viewer):
|
||||
parser_output.tags.append("error")
|
||||
else:
|
||||
try:
|
||||
self.parse_serialization(packet, parser_output, parsers, is_creation=True)
|
||||
self._parse_serialization(packet, parser_output, parsers, is_creation=True)
|
||||
except (AssertionError, IndexError, struct.error):
|
||||
if retry_with_components:
|
||||
print("retry was not able to resolve parsing error")
|
||||
@ -321,14 +309,14 @@ class CaptureViewer(viewer.Viewer):
|
||||
print("retrying with", retry_with_components, packet_name)
|
||||
del self.lot_data[lot]
|
||||
packet.read_offset = 0
|
||||
self.parse_creation(packet_name, packet, retry_with_components)
|
||||
self._parse_creation(packet_name, packet, retry_with_components)
|
||||
return
|
||||
|
||||
obj = CaptureObject(network_id=network_id, object_id=object_id, lot=lot)
|
||||
self.objects.append(obj)
|
||||
obj.entry = self.tree.insert("", END, text=packet_name, values=(id_, parser_output.text.replace("{", "<crlbrktopen>").replace("}", "<crlbrktclose>").replace("\\", "<backslash>")), tags=parser_output.tags)
|
||||
|
||||
def parse_serialization(self, packet, parser_output, parsers, is_creation=False):
|
||||
def _parse_serialization(self, packet, parser_output, parsers, is_creation=False):
|
||||
parser_output.append(self.serialization_header_parser.parse(packet))
|
||||
for name, parser in parsers.items():
|
||||
parser_output.text += "\n"+name+"\n\n"
|
||||
@ -336,7 +324,7 @@ class CaptureViewer(viewer.Viewer):
|
||||
if not packet.all_read():
|
||||
raise IndexError("Not completely read, %i bytes unread" % len(packet.read_remaining()))
|
||||
|
||||
def parse_serialization_packet(self, packet_name, packet):
|
||||
def _parse_serialization_packet(self, packet_name, packet):
|
||||
network_id = packet.read(c_ushort)
|
||||
obj = None
|
||||
for i in self.objects:
|
||||
@ -356,14 +344,14 @@ class CaptureViewer(viewer.Viewer):
|
||||
|
||||
parser_output = ParserOutput()
|
||||
with parser_output:
|
||||
self.parse_serialization(packet, parser_output, parsers)
|
||||
self._parse_serialization(packet, parser_output, parsers)
|
||||
if error is not None:
|
||||
parser_output.tags.append("error")
|
||||
else:
|
||||
error = ""
|
||||
self.tree.insert(obj.entry, END, text=packet_name, values=(error, parser_output.text.replace("{", "<crlbrktopen>").replace("}", "<crlbrktclose>").replace("\\", "<backslash>")), tags=parser_output.tags)
|
||||
|
||||
def parse_game_message(self, packet_name, packet):
|
||||
def _parse_game_message(self, packet_name, packet):
|
||||
object_id = packet.read(c_int64)
|
||||
for i in self.objects:
|
||||
if i.object_id == object_id:
|
||||
@ -530,7 +518,7 @@ class CaptureViewer(viewer.Viewer):
|
||||
if packet.read(c_bit):
|
||||
item["unknown3"] = packet.read(c_uint)
|
||||
if packet.read(c_bit):
|
||||
item["extra_info"] = self.compressed_ldf_handler(packet)
|
||||
item["extra_info"] = self._compressed_ldf_handler(packet)
|
||||
item["unknown4"] = packet.read(c_bit)
|
||||
items.append(item)
|
||||
param_values["items"] = items
|
||||
@ -555,7 +543,7 @@ class CaptureViewer(viewer.Viewer):
|
||||
if packet.read(c_bit):
|
||||
item["unknown2"] = packet.read(c_uint)
|
||||
if packet.read(c_bit):
|
||||
item["extra_info"] = self.compressed_ldf_handler(packet)
|
||||
item["extra_info"] = self._compressed_ldf_handler(packet)
|
||||
item["unknown3"] = packet.read(c_bit)
|
||||
items.append(item)
|
||||
param_values["items"] = items
|
||||
@ -673,7 +661,7 @@ class CaptureViewer(viewer.Viewer):
|
||||
values = (msg_name, "\n".join(["%s = %s" % (a, pprint.pformat(b)) for a, b in param_values.items()]))
|
||||
self.tree.insert(entry, END, text=packet_name, values=values, tags=tags)
|
||||
|
||||
def parse_normal_packet(self, packet_name, packet):
|
||||
def _parse_normal_packet(self, packet_name, packet):
|
||||
id_ = packet_name[packet_name.index("[")+1:packet_name.index("]")]
|
||||
if id_ not in self.norm_parser:
|
||||
self.tree.insert("", END, text=packet_name, values=(id_, "Add the struct definition file packetdefinitions/"+id_+".structs to enable parsing of this packet."), tags=["error"])
|
||||
|
84
extractor.py
Normal file
84
extractor.py
Normal file
@ -0,0 +1,84 @@
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import tkinter.filedialog as filedialog
|
||||
from tkinter import END, X
|
||||
from typing import Set
|
||||
|
||||
import viewer
|
||||
|
||||
class Extractor(viewer.Viewer):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.records = {}
|
||||
|
||||
def create_widgets(self) -> None:
|
||||
super().create_widgets(create_inspector=False)
|
||||
self.menubar.add_command(label="Extract Selected", command=self._extract_selected)
|
||||
|
||||
self.tree.bind("<Double-Button-1>", self._show_selected)
|
||||
self.tree.bind("<Return>", self._show_selected)
|
||||
|
||||
self.set_headings("Size (Bytes)", treeheading="Filename", treewidth=1600)
|
||||
|
||||
def tree_insert_path(self, path: str, values=()) -> None:
|
||||
dir, filename = os.path.split(path)
|
||||
if not self.tree.exists(dir):
|
||||
self.tree_insert_path(dir)
|
||||
self.tree.insert(dir, END, iid=path, text=filename, values=values)
|
||||
|
||||
def askopener(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def load(self, path: str) -> None:
|
||||
self.records.clear()
|
||||
|
||||
def extract_data(self, path: str) -> bytes:
|
||||
raise NotImplementedError
|
||||
|
||||
def _show_selected(self, _) -> None:
|
||||
if len(self.tree.selection()) > 10:
|
||||
return
|
||||
for path in self.tree.selection():
|
||||
if self.tree.get_children(path):
|
||||
continue # is directory
|
||||
|
||||
data = self.extract_data(path)
|
||||
tempfile_path = os.path.join(tempfile.gettempdir(), os.path.basename(path))
|
||||
with open(tempfile_path, "wb") as file:
|
||||
file.write(data)
|
||||
|
||||
if sys.platform == "win32":
|
||||
os.startfile(tempfile_path)
|
||||
else:
|
||||
opener = "open" if sys.platform == "darwin" else "xdg-open"
|
||||
subprocess.call([opener, tempfile_path])
|
||||
|
||||
def _extract_selected(self) -> None:
|
||||
outdir = filedialog.askdirectory(title="Select output directory")
|
||||
if not outdir:
|
||||
return
|
||||
paths = set()
|
||||
for path in self.tree.selection():
|
||||
paths.update(self._get_leaves(path))
|
||||
|
||||
for path in self.step_superbar(paths, "Extracting files"):
|
||||
self._save_path(outdir, path)
|
||||
|
||||
def _save_path(self, outdir: str, path: str) -> None:
|
||||
data = self.extract_data(path)
|
||||
dir, filename = os.path.split(path)
|
||||
out = os.path.join(outdir, dir)
|
||||
os.makedirs(out, exist_ok=True)
|
||||
with open(os.path.join(out, filename), "wb") as file:
|
||||
file.write(data)
|
||||
|
||||
def _get_leaves(self, path: str) -> Set[str]:
|
||||
output = set()
|
||||
if self.tree.get_children(path):
|
||||
for child in self.tree.get_children(path):
|
||||
output.update(self._get_leaves(child))
|
||||
elif path in self.records:
|
||||
output.add(path)
|
||||
return output
|
132
lifextractor.pyw
Normal file
132
lifextractor.pyw
Normal file
@ -0,0 +1,132 @@
|
||||
import datetime
|
||||
import enum
|
||||
import io
|
||||
import os
|
||||
import struct
|
||||
|
||||
import tkinter.filedialog as filedialog
|
||||
|
||||
import extractor
|
||||
from pyraknet.bitstream import ReadStream, UnsignedIntStruct
|
||||
|
||||
class be_ushort(UnsignedIntStruct):
|
||||
_struct = struct.Struct(">H")
|
||||
|
||||
class be_uint(UnsignedIntStruct):
|
||||
_struct = struct.Struct(">I")
|
||||
|
||||
class be_uint64(UnsignedIntStruct):
|
||||
_struct = struct.Struct(">Q")
|
||||
|
||||
class Enum1(enum.Enum):
|
||||
Root = 1
|
||||
Unknown = 2
|
||||
Directory = 3
|
||||
File = 4
|
||||
Metadata = 5
|
||||
|
||||
class LIFExtractor(extractor.Extractor):
|
||||
def askopener(self):
|
||||
return filedialog.askopenfilename(filetypes=[("LIF", "*.lif")])
|
||||
|
||||
def load(self, path: str) -> None:
|
||||
super().load(path)
|
||||
self.lif_path = path
|
||||
self.current_file_data_offset = 84
|
||||
with open(path, "rb") as file:
|
||||
header = file.read(4)
|
||||
if header != b"LIFF":
|
||||
raise ValueError("Not a LIF file")
|
||||
header = ReadStream(file.read(14))
|
||||
lifsize = header.read(be_uint64)
|
||||
assert header.read(be_ushort) == 1
|
||||
assert header.read(be_uint) == 0
|
||||
self._read_part(file, 0)
|
||||
|
||||
assert file.tell() == lifsize
|
||||
|
||||
self.set_headings("Size (Bytes)", "Creation time?", "Last modification time?", "Last access time?", treeheading="Filename")
|
||||
|
||||
for filename in sorted(self.records.keys()):
|
||||
self.tree_insert_path(filename, self.records[filename][1:])
|
||||
|
||||
def _read_part(self, file, level):
|
||||
start = file.tell()
|
||||
stream = ReadStream(file.read(20))
|
||||
assert stream.read(be_ushort) == 1
|
||||
entry_type = Enum1(stream.read(be_ushort))
|
||||
size = stream.read(be_uint64)
|
||||
uint1 = stream.read(be_uint)
|
||||
if entry_type in (Enum1.Unknown, Enum1.File, Enum1.Metadata):
|
||||
assert uint1 == 1
|
||||
else:
|
||||
assert uint1 == 0
|
||||
if entry_type != Enum1.File:
|
||||
print(" "*level+entry_type.name)
|
||||
assert stream.read(be_uint) == 0
|
||||
if entry_type == Enum1.Unknown:
|
||||
t2stream = ReadStream(file.read(6))
|
||||
assert t2stream.read(be_ushort) == 1
|
||||
assert t2stream.read(be_uint) == 0
|
||||
elif entry_type == Enum1.File:
|
||||
file.seek(size - 20, io.SEEK_CUR) # skip file content
|
||||
elif entry_type == Enum1.Metadata:
|
||||
self.lif = ReadStream(file.read(size - 20))
|
||||
assert self.lif.read(be_ushort) == 1
|
||||
self._read_dir()
|
||||
assert self.lif.all_read()
|
||||
if uint1 == 0:
|
||||
while file.tell() - start < size:
|
||||
self._read_part(file, level+1)
|
||||
|
||||
def _read_direntry(self):
|
||||
something = self.lif.read(be_uint)
|
||||
string = b""
|
||||
while True:
|
||||
char = self.lif.read(bytes, length=2)
|
||||
if char == b"\0\0":
|
||||
break
|
||||
string += char
|
||||
name = string.decode("utf-16-be")
|
||||
size = self.lif.read(be_uint64)
|
||||
return something, name, size
|
||||
|
||||
def _convert_time(self, wintime):
|
||||
microseconds = wintime / 10
|
||||
return str(datetime.datetime(1601, 1, 1) + datetime.timedelta(microseconds=microseconds))
|
||||
|
||||
def _read_dir(self, dirname=""):
|
||||
something, name, size = self._read_direntry()
|
||||
dirname = os.path.join(dirname, name)
|
||||
if dirname == "":
|
||||
assert something == 0 # root
|
||||
else:
|
||||
assert something == 7 # directory
|
||||
assert size == 20
|
||||
for _ in range(self.lif.read(be_uint)):
|
||||
entry_type = self.lif.read(be_ushort) # 1 = directory, 2 = file
|
||||
self.current_file_data_offset += 20
|
||||
if entry_type == 1:
|
||||
self._read_dir(dirname)
|
||||
elif entry_type == 2:
|
||||
something, name, size = self._read_direntry()
|
||||
assert something in (5 , 7) # 7 if .lif or directory, 5 if otherwise?
|
||||
t1 = self._convert_time(self.lif.read(be_uint64))
|
||||
t2 = self._convert_time(self.lif.read(be_uint64))
|
||||
t3 = self._convert_time(self.lif.read(be_uint64))
|
||||
self.records[os.path.join(dirname, name)] = self.current_file_data_offset, size - 20, t1, t2, t3
|
||||
self.current_file_data_offset += size - 20
|
||||
|
||||
else:
|
||||
raise ValueError(entry_type)
|
||||
|
||||
def extract_data(self, path: str) -> bytes:
|
||||
file_offset, file_size = self.records[path][:2]
|
||||
|
||||
with open(self.lif_path, "rb") as file:
|
||||
file.seek(file_offset)
|
||||
return file.read(file_size)
|
||||
|
||||
if __name__ == "__main__":
|
||||
app = LIFExtractor()
|
||||
app.mainloop()
|
@ -6,7 +6,7 @@ import sys
|
||||
|
||||
import tkinter.filedialog as filedialog
|
||||
import tkinter.messagebox as messagebox
|
||||
from tkinter import END, Menu
|
||||
from tkinter import END
|
||||
|
||||
import viewer
|
||||
from pyraknet.bitstream import c_bool, c_float, c_int, c_int64, c_ubyte, c_uint, c_uint64, c_ushort, ReadStream
|
||||
@ -27,8 +27,7 @@ class PathBehavior(enum.IntEnum):
|
||||
Once = 2
|
||||
|
||||
class LUZViewer(viewer.Viewer):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
def init(self):
|
||||
config = configparser.ConfigParser()
|
||||
config.read("luzviewer.ini")
|
||||
try:
|
||||
@ -36,22 +35,17 @@ class LUZViewer(viewer.Viewer):
|
||||
except:
|
||||
messagebox.showerror("Can not open database", "Make sure db_path in the INI is set correctly.")
|
||||
sys.exit()
|
||||
self.create_widgets()
|
||||
|
||||
def create_widgets(self):
|
||||
super().create_widgets()
|
||||
menubar = Menu()
|
||||
menubar.add_command(label="Open", command=self.askopenfile)
|
||||
self.master.config(menu=menubar)
|
||||
self.set_headings(treeheading="Type", treewidth=1200)
|
||||
|
||||
def askopenfile(self):
|
||||
path = filedialog.askopenfilename(filetypes=[("LEGO Universe Zone", "*.luz")])
|
||||
if path:
|
||||
self.load_luz(path)
|
||||
def askopener(self):
|
||||
return filedialog.askopenfilename(filetypes=[("LEGO Universe Zone", "*.luz")])
|
||||
|
||||
def load_luz(self, luz_path):
|
||||
self.tree.set_children("")
|
||||
def load(self, luz_path: str) -> None:
|
||||
print("Loading", luz_path)
|
||||
self.set_superbar(2)
|
||||
with open(luz_path, "rb") as file:
|
||||
data = file.read()
|
||||
luz_len = len(data)
|
||||
@ -76,7 +70,7 @@ class LUZViewer(viewer.Viewer):
|
||||
else:
|
||||
number_of_scenes = stream.read(c_ubyte)
|
||||
|
||||
for _ in range(number_of_scenes):
|
||||
for _ in self.step_superbar(number_of_scenes, "Loading Scenes"):
|
||||
filename = stream.read(bytes, length_type=c_ubyte).decode("latin1")
|
||||
scene_id = stream.read(c_uint64)
|
||||
scene_name = stream.read(bytes, length_type=c_ubyte).decode("latin1")
|
||||
@ -87,7 +81,7 @@ class LUZViewer(viewer.Viewer):
|
||||
with open(lvl_path, "rb") as lvl:
|
||||
print("Loading lvl", filename)
|
||||
try:
|
||||
self.parse_lvl(ReadStream(lvl.read(), unlocked=True), scene)
|
||||
self._parse_lvl(ReadStream(lvl.read(), unlocked=True), scene)
|
||||
except Exception:
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
@ -122,7 +116,8 @@ class LUZViewer(viewer.Viewer):
|
||||
|
||||
### paths
|
||||
paths = self.tree.insert(zone, END, text="Paths")
|
||||
for _ in range(stream.read(c_uint)):
|
||||
paths_count = stream.read(c_uint)
|
||||
for _ in self.step_superbar(paths_count, "Loading Paths"):
|
||||
path_version = stream.read(c_uint)
|
||||
name = stream.read(str, length_type=c_ubyte)
|
||||
path_type = stream.read(c_uint)
|
||||
@ -228,7 +223,7 @@ class LUZViewer(viewer.Viewer):
|
||||
config_type_and_value = stream.read(str, length_type=c_ubyte)
|
||||
self.tree.insert(waypoint, END, text="Config", values=(config_name, config_type_and_value))
|
||||
|
||||
def parse_lvl(self, stream, scene):
|
||||
def _parse_lvl(self, stream, scene):
|
||||
header = stream.read(bytes, length=4)
|
||||
stream.read_offset = 0
|
||||
if header == b"CHNK":
|
||||
@ -250,15 +245,15 @@ class LUZViewer(viewer.Viewer):
|
||||
elif chunk_type == 2000:
|
||||
pass
|
||||
elif chunk_type == 2001:
|
||||
self.lvl_parse_chunk_type_2001(stream, scene)
|
||||
self._lvl_parse_chunk_type_2001(stream, scene)
|
||||
elif chunk_type == 2002:
|
||||
pass
|
||||
stream.read_offset = (start_pos + chunk_length) * 8 # go to the next CHNK
|
||||
else:
|
||||
self.parse_old_lvl_header(stream)
|
||||
self.lvl_parse_chunk_type_2001(stream, scene)
|
||||
self._parse_old_lvl_header(stream)
|
||||
self._lvl_parse_chunk_type_2001(stream, scene)
|
||||
|
||||
def parse_old_lvl_header(self, stream):
|
||||
def _parse_old_lvl_header(self, stream):
|
||||
version = stream.read(c_ushort)
|
||||
assert stream.read(c_ushort) == version
|
||||
stream.read(c_ubyte)
|
||||
@ -301,7 +296,7 @@ class LUZViewer(viewer.Viewer):
|
||||
for _ in range(stream.read(c_uint)):
|
||||
stream.read(c_float), stream.read(c_float), stream.read(c_float)
|
||||
|
||||
def lvl_parse_chunk_type_2001(self, stream, scene):
|
||||
def _lvl_parse_chunk_type_2001(self, stream, scene):
|
||||
for _ in range(stream.read(c_uint)):
|
||||
object_id = stream.read(c_int64) # seems like the object id, but without some bits
|
||||
lot = stream.read(c_uint)
|
||||
@ -339,16 +334,9 @@ class LUZViewer(viewer.Viewer):
|
||||
cols = "Object ID", "LOT", "unknown1", "unknown2", "Position", "Rotation", "Scale"
|
||||
elif item_type == "Spawner":
|
||||
cols = "Path Version", "Name", "unknown1", "Behavior", "Spawned LOT", "Respawn Time", "Max to Spawn", "Num to maintain", "Object ID", "Activate on load"
|
||||
|
||||
else:
|
||||
cols = ()
|
||||
if cols:
|
||||
self.tree.configure(columns=cols)
|
||||
colwidth = self.tree.winfo_width() // (len(cols)+1)
|
||||
self.tree.column("#0", width=colwidth)
|
||||
for i, col in enumerate(cols):
|
||||
self.tree.heading(col, text=col, command=(lambda col: lambda: self.sort_column(col, False))(col))
|
||||
self.tree.column(i, width=colwidth)
|
||||
self.set_headings(*cols)
|
||||
self.item_inspector.delete(1.0, END)
|
||||
self.item_inspector.insert(END, "\n".join(self.tree.item(item, "values")))
|
||||
|
||||
|
@ -11,7 +11,7 @@ Index 2 ($+863BD0):
|
||||
|
||||
Index 3 ($+7DC480):
|
||||
[bit] - flag
|
||||
[bit] - ???
|
||||
[bit] - ???, expect == False
|
||||
[bit] - ???
|
||||
|
||||
Index 4 ($+8A3A40):
|
||||
@ -70,15 +70,15 @@ if creation:
|
||||
[bit] - is player landing by rocket
|
||||
[u16-wstring] - LDF info of rocket modules
|
||||
[bit] - flag
|
||||
[bit] - flag?, assert == False
|
||||
[bit] - ???, assert == False
|
||||
[u8] - ???, assert == 0
|
||||
[bit] - PVP flag, assert == False
|
||||
[bit] - is gm, expect == False
|
||||
[u8] - gmlevel, expect == 0
|
||||
[bit] - ???, assert == False
|
||||
[u8] - ???, assert == 0
|
||||
[bit] - flag
|
||||
[u32] - if this is 1 the character's head glows, expect in (0, 1)
|
||||
[bit] - flag (this and below was in a separate function in the code)
|
||||
[s64] - ???
|
||||
[u8] - ??? (count for next struct?)
|
||||
[bit] - flag
|
||||
[s64] - guild (leader?) id, assert == 0
|
||||
[u8-wstring] - guild name, assert == ""
|
||||
[bit] - ???
|
||||
[s32] - ???
|
||||
|
@ -2,7 +2,7 @@ Component 1 - ControllablePhysics (tested using LOT 1)
|
||||
Index 1 ($+845770):
|
||||
if creation:
|
||||
[bit] - flag
|
||||
[u32] - ???
|
||||
[u32] - jetpack effect id
|
||||
[bit] - ???
|
||||
[bit] - ???
|
||||
[bit] - flag
|
||||
|
@ -31,8 +31,8 @@ if creation:
|
||||
[bit] - flag
|
||||
[bit] - flag, assert == False
|
||||
if trigger:
|
||||
[bit] - ???, assert == False
|
||||
[bit] - ???, expect == False
|
||||
[bit] - flag
|
||||
[u32] - ???
|
||||
[float] - ???
|
||||
[bit] - flag
|
||||
[bit] - ???
|
||||
|
@ -27,6 +27,13 @@ Index 1 ($+7FD4D0)
|
||||
[float] - ???
|
||||
[float] - ???
|
||||
[float] - ???
|
||||
|
||||
[bit] - flag
|
||||
[float] - ???
|
||||
[float] - ???
|
||||
[bit] - ???
|
||||
[bit] - ???
|
||||
[float] - ???
|
||||
if not creation:
|
||||
[bit] - flag?
|
||||
if creation:
|
||||
|
109
pkextractor.pyw
Normal file
109
pkextractor.pyw
Normal file
@ -0,0 +1,109 @@
|
||||
import hashlib
|
||||
import os
|
||||
import struct
|
||||
|
||||
import tkinter.filedialog as filedialog
|
||||
|
||||
import decompress_sd0
|
||||
import extractor
|
||||
from pyraknet.bitstream import c_bool, c_int, c_ubyte, c_uint, ReadStream
|
||||
|
||||
class PKExtractor(extractor.Extractor):
|
||||
def askopener(self):
|
||||
return filedialog.askdirectory(title="Select LU root folder (containing /client/, /versions/)")
|
||||
|
||||
def load(self, path: str) -> None:
|
||||
super().load(path)
|
||||
filenames = {}
|
||||
|
||||
for filename in ("trunk.txt", "hotfix.txt"):
|
||||
filenames.update(self._load_filehashes(os.path.join(path, "versions", filename)))
|
||||
print("Loaded hashes")
|
||||
pks = []
|
||||
for dir, _, files in os.walk(os.path.join(path, "client/res/pack")):
|
||||
for file in files:
|
||||
if file.endswith(".pk"):
|
||||
pks.append(os.path.join(dir, file))
|
||||
|
||||
for pk in self.step_superbar(pks, "Loading pack files"):
|
||||
self._load_pk(pk, filenames)
|
||||
|
||||
for filename in sorted(self.records.keys()):
|
||||
self.tree_insert_path(filename, self.records[filename][3])
|
||||
|
||||
def _load_filehashes(self, path: str):
|
||||
filenames = {}
|
||||
with open(path) as file:
|
||||
for line in file.read().splitlines()[3:]:
|
||||
values = line.split(",")
|
||||
filenames[values[2]] = values[0]
|
||||
return filenames
|
||||
|
||||
def _load_pki(self, path: str):
|
||||
# unused, alternate way to get the list of pks
|
||||
with open(path, "rb") as file:
|
||||
stream = ReadStream(file.read())
|
||||
|
||||
assert stream.read(c_uint) == 3
|
||||
pack_files = []
|
||||
for _ in range(stream.read(c_uint)):
|
||||
pack_files.append(stream.read(bytes, length_type=c_uint).decode("latin1"))
|
||||
|
||||
for _ in range(stream.read(c_uint)):
|
||||
stream.skip_read(20)
|
||||
|
||||
assert stream.all_read()
|
||||
return pack_files
|
||||
|
||||
def _load_pk(self, path: str, filenames) -> None:
|
||||
with open(path, "rb") as file:
|
||||
assert file.read(7) == b"ndpk\x01\xff\x00"
|
||||
file.seek(-8, 2)
|
||||
number_of_records_address = struct.unpack("I", file.read(4))[0]
|
||||
unknown = struct.unpack("I", file.read(4))[0]
|
||||
if unknown != 0:
|
||||
print(unknown, path)
|
||||
file.seek(number_of_records_address)
|
||||
data = ReadStream(file.read()[:-8])
|
||||
|
||||
number_of_records = data.read(c_uint)
|
||||
for _ in range(number_of_records):
|
||||
pk_index = data.read(c_uint)
|
||||
unknown1 = data.read(c_int)
|
||||
unknown2 = data.read(c_int)
|
||||
original_size = data.read(c_uint)
|
||||
original_md5 = data.read(bytes, length=32).decode()
|
||||
unknown3 = data.read(c_uint)
|
||||
compressed_size = data.read(c_uint)
|
||||
compressed_md5 = data.read(bytes, length=32).decode()
|
||||
unknown4 = data.read(c_uint)
|
||||
data_position = data.read(c_uint)
|
||||
is_compressed = data.read(c_bool)
|
||||
unknown5 = data.read(c_ubyte)
|
||||
unknown6 = data.read(c_ubyte)
|
||||
unknown7 = data.read(c_ubyte)
|
||||
if original_md5 not in filenames:
|
||||
filenames[original_md5] = "unlisted/"+original_md5
|
||||
self.records[filenames[original_md5]] = path, data_position, is_compressed, original_size, original_md5, compressed_size, compressed_md5
|
||||
|
||||
def extract_data(self, path: str) -> bytes:
|
||||
pk_path, data_position, is_compressed, original_size, original_md5, compressed_size, compressed_md5 = self.records[path]
|
||||
|
||||
with open(pk_path, "rb") as file:
|
||||
file.seek(data_position)
|
||||
if is_compressed:
|
||||
data = file.read(compressed_size)
|
||||
else:
|
||||
data = file.read(original_size)
|
||||
assert file.read(5) == b"\xff\x00\x00\xdd\x00"
|
||||
|
||||
if is_compressed:
|
||||
assert hashlib.md5(data).hexdigest() == compressed_md5
|
||||
data = decompress_sd0.decompress(data)
|
||||
|
||||
assert hashlib.md5(data).hexdigest() == original_md5
|
||||
return data
|
||||
|
||||
if __name__ == "__main__":
|
||||
app = PKExtractor()
|
||||
app.mainloop()
|
210
pkviewer.pyw
210
pkviewer.pyw
@ -1,210 +0,0 @@
|
||||
import hashlib
|
||||
import os
|
||||
import struct
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
|
||||
import tkinter.filedialog as filedialog
|
||||
from tkinter import BOTH, END, Menu, RIGHT, X, Y
|
||||
from tkinter.ttk import Entry, Progressbar, Scrollbar, Treeview
|
||||
|
||||
import decompress_sd0
|
||||
import viewer
|
||||
from pyraknet.bitstream import c_bool, c_int, c_ubyte, c_uint, ReadStream
|
||||
|
||||
class PKViewer(viewer.Viewer):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.create_widgets()
|
||||
|
||||
def create_widgets(self):
|
||||
find_entry = Entry(textvariable=self.find_input)
|
||||
find_entry.pack(fill=X)
|
||||
find_entry.bind("<Return>", self.find)
|
||||
|
||||
scrollbar = Scrollbar()
|
||||
scrollbar.pack(side=RIGHT, fill=Y)
|
||||
|
||||
self.tree = Treeview(columns=(None,), yscrollcommand=scrollbar.set)
|
||||
self.tree.tag_configure("match", background="light yellow")
|
||||
self.tree.pack(fill=BOTH, expand=True)
|
||||
|
||||
scrollbar.configure(command=self.tree.yview)
|
||||
|
||||
menubar = Menu()
|
||||
menubar.add_command(label="Open", command=self.askopen)
|
||||
menubar.add_command(label="Extract Selected", command=self.extract_selected)
|
||||
self.master.config(menu=menubar)
|
||||
|
||||
columns = "Size",
|
||||
self.tree.configure(columns=columns)
|
||||
for col in columns:
|
||||
self.tree.heading(col, text=col, command=(lambda col: lambda: self.sort_column(col, False))(col))
|
||||
|
||||
self.tree.bind("<Double-Button-1>", self.extract_and_show_selected)
|
||||
self.tree.bind("<Return>", self.extract_and_show_selected)
|
||||
|
||||
def askopen(self):
|
||||
dir = filedialog.askdirectory(title="Select LU root folder (containing /client/, /versions/)")
|
||||
if dir:
|
||||
self.load(dir)
|
||||
|
||||
def load(self, dir):
|
||||
self.filenames = {}
|
||||
self.records = {}
|
||||
self.reattach_all()
|
||||
self.tree.delete(*self.tree.get_children())
|
||||
self.progress = Progressbar()
|
||||
self.progress.pack(fill=X)
|
||||
|
||||
for filename in ("trunk.txt", "hotfix.txt"):
|
||||
self.load_filehashes(os.path.join(dir, "versions", filename))
|
||||
print("Loaded hashes")
|
||||
pks = []
|
||||
for dir, _, filenames in os.walk(os.path.join(dir, "client/res/pack")):
|
||||
for filename in filenames:
|
||||
if filename.endswith(".pk"):
|
||||
pks.append(os.path.join(dir, filename))
|
||||
|
||||
self.progress.configure(maximum=len(pks)+1)
|
||||
for pk in pks:
|
||||
self.load_pk(pk)
|
||||
self.progress.step()
|
||||
self.update()
|
||||
print("Loaded records")
|
||||
|
||||
for filename in sorted(self.records.keys()):
|
||||
self.create_tree(filename, self.records[filename][3])
|
||||
self.progress.pack_forget()
|
||||
|
||||
def create_tree(self, path, values=()):
|
||||
dir, filename = os.path.split(path)
|
||||
if not self.tree.exists(dir):
|
||||
self.create_tree(dir)
|
||||
self.tree.insert(dir, END, iid=path, text=filename, values=values)
|
||||
|
||||
def load_filehashes(self, path):
|
||||
with open(path) as file:
|
||||
for line in file.read().splitlines()[3:]:
|
||||
values = line.split(",")
|
||||
self.filenames[values[2]] = values[0]
|
||||
|
||||
def load_pki(self, path):
|
||||
# unused, alternate way to get the list of pks
|
||||
with open(path, "rb") as file:
|
||||
stream = ReadStream(file.read())
|
||||
|
||||
assert stream.read(c_uint) == 3
|
||||
pack_files = []
|
||||
for _ in range(stream.read(c_uint)):
|
||||
pack_files.append(stream.read(bytes, length_type=c_uint).decode("latin1"))
|
||||
|
||||
for _ in range(stream.read(c_uint)):
|
||||
stream.skip_read(20)
|
||||
|
||||
assert stream.all_read()
|
||||
return pack_files
|
||||
|
||||
def load_pk(self, path):
|
||||
with open(path, "rb") as file:
|
||||
assert file.read(7) == b"ndpk\x01\xff\x00"
|
||||
file.seek(-8, 2)
|
||||
number_of_records_address = struct.unpack("I", file.read(4))[0]
|
||||
unknown = struct.unpack("I", file.read(4))[0]
|
||||
if unknown != 0:
|
||||
print(unknown, path)
|
||||
file.seek(number_of_records_address)
|
||||
data = ReadStream(file.read()[:-8])
|
||||
|
||||
number_of_records = data.read(c_uint)
|
||||
for _ in range(number_of_records):
|
||||
pk_index = data.read(c_uint)
|
||||
unknown1 = data.read(c_int)
|
||||
unknown2 = data.read(c_int)
|
||||
original_size = data.read(c_uint)
|
||||
original_md5 = data.read(bytes, length=32).decode()
|
||||
unknown3 = data.read(c_uint)
|
||||
compressed_size = data.read(c_uint)
|
||||
compressed_md5 = data.read(bytes, length=32).decode()
|
||||
unknown4 = data.read(c_uint)
|
||||
data_position = data.read(c_uint)
|
||||
is_compressed = data.read(c_bool)
|
||||
unknown5 = data.read(c_ubyte)
|
||||
unknown6 = data.read(c_ubyte)
|
||||
unknown7 = data.read(c_ubyte)
|
||||
if original_md5 not in self.filenames:
|
||||
self.filenames[original_md5] = "unlisted/"+original_md5
|
||||
self.records[self.filenames[original_md5]] = path, data_position, is_compressed, original_size, original_md5, compressed_size, compressed_md5
|
||||
|
||||
def extract_path(self, path):
|
||||
pk_path, data_position, is_compressed, original_size, original_md5, compressed_size, compressed_md5 = self.records[path]
|
||||
|
||||
with open(pk_path, "rb") as file:
|
||||
file.seek(data_position)
|
||||
if is_compressed:
|
||||
data = file.read(compressed_size)
|
||||
else:
|
||||
data = file.read(original_size)
|
||||
assert file.read(5) == b"\xff\x00\x00\xdd\x00"
|
||||
|
||||
if is_compressed:
|
||||
assert hashlib.md5(data).hexdigest() == compressed_md5
|
||||
data = decompress_sd0.decompress(data)
|
||||
|
||||
assert hashlib.md5(data).hexdigest() == original_md5
|
||||
return data
|
||||
|
||||
def extract_and_show_selected(self, _):
|
||||
for path in self.tree.selection():
|
||||
if self.tree.get_children(path):
|
||||
continue # is directory
|
||||
|
||||
data = self.extract_path(path)
|
||||
tempfile_path = os.path.join(tempfile.gettempdir(), os.path.basename(path))
|
||||
with open(tempfile_path, "wb") as file:
|
||||
file.write(data)
|
||||
|
||||
if sys.platform == "win32":
|
||||
os.startfile(tempfile_path)
|
||||
else:
|
||||
opener = "open" if sys.platform == "darwin" else "xdg-open"
|
||||
subprocess.call([opener, tempfile_path])
|
||||
|
||||
def extract_selected(self):
|
||||
outdir = filedialog.askdirectory(title="Select output directory")
|
||||
if not outdir:
|
||||
return
|
||||
paths = set()
|
||||
for path in self.tree.selection():
|
||||
paths.update(self.get_leaves(path))
|
||||
|
||||
self.progress = Progressbar(maximum=len(paths)+1)
|
||||
self.progress.pack(fill=X)
|
||||
for path in paths:
|
||||
self.save_path(outdir, path)
|
||||
self.progress.pack_forget()
|
||||
|
||||
def save_path(self, outdir, path):
|
||||
data = self.extract_path(path)
|
||||
dir, filename = os.path.split(path)
|
||||
out = os.path.join(outdir, dir)
|
||||
os.makedirs(out, exist_ok=True)
|
||||
with open(os.path.join(out, filename), "wb") as file:
|
||||
file.write(data)
|
||||
|
||||
self.progress.step()
|
||||
self.update()
|
||||
|
||||
def get_leaves(self, path):
|
||||
output = set()
|
||||
if self.tree.get_children(path):
|
||||
for child in self.tree.get_children(path):
|
||||
output.update(self.get_leaves(child))
|
||||
elif path in self.records:
|
||||
output.add(path)
|
||||
return output
|
||||
|
||||
if __name__ == "__main__":
|
||||
app = PKViewer()
|
||||
app.mainloop()
|
127
viewer.py
127
viewer.py
@ -1,6 +1,6 @@
|
||||
from tkinter import BOTH, END, HORIZONTAL, RIGHT, StringVar, Text, X, Y
|
||||
from tkinter import BOTH, BOTTOM, END, HORIZONTAL, LEFT, Menu, NSEW, RIGHT, StringVar, Text, TOP, X, Y
|
||||
from tkinter.font import nametofont
|
||||
from tkinter.ttk import Entry, Frame, PanedWindow, Scrollbar, Style, Treeview
|
||||
from tkinter.ttk import Entry, Frame, Label, PanedWindow, Progressbar, Scrollbar, Style, Treeview
|
||||
|
||||
class Viewer(Frame):
|
||||
def __init__(self):
|
||||
@ -9,15 +9,25 @@ class Viewer(Frame):
|
||||
fontheight = nametofont("TkDefaultFont").metrics("linespace")
|
||||
style = Style()
|
||||
style.configure("Treeview", rowheight=fontheight)
|
||||
style.configure("Superbar.Horizontal.TProgressbar", foreground="red", background="red")
|
||||
self.detached_items = {}
|
||||
self.find_input = StringVar(value="Enter search here")
|
||||
self.tree = None
|
||||
self.item_inspector = None
|
||||
self.init()
|
||||
self.create_widgets()
|
||||
|
||||
def init(self) -> None:
|
||||
pass
|
||||
|
||||
def create_widgets(self, create_inspector: bool=True) -> None:
|
||||
self.menubar = Menu()
|
||||
self.menubar.add_command(label="Open", command=self._askopen)
|
||||
self.master.config(menu=self.menubar)
|
||||
|
||||
def create_widgets(self):
|
||||
find_entry = Entry(textvariable=self.find_input)
|
||||
find_entry.pack(fill=X)
|
||||
find_entry.bind("<Return>", self.find)
|
||||
find_entry.bind("<Return>", self._find)
|
||||
|
||||
pane = PanedWindow(orient=HORIZONTAL)
|
||||
pane.pack(fill=BOTH, expand=True)
|
||||
@ -33,35 +43,103 @@ class Viewer(Frame):
|
||||
|
||||
scrollbar.configure(command=self.tree.yview)
|
||||
pane.add(frame)
|
||||
if create_inspector:
|
||||
frame = Frame()
|
||||
scrollbar = Scrollbar(frame)
|
||||
scrollbar.pack(side=RIGHT, fill=Y)
|
||||
|
||||
frame = Frame()
|
||||
scrollbar = Scrollbar(frame)
|
||||
scrollbar.pack(side=RIGHT, fill=Y)
|
||||
self.item_inspector = Text(frame, font="TkDefaultFont", tabs="4m", yscrollcommand=scrollbar.set)
|
||||
self.item_inspector.insert(END, "Select an item to inspect it.")
|
||||
self.item_inspector.pack(fill=BOTH, expand=True)
|
||||
|
||||
self.item_inspector = Text(frame, font="TkDefaultFont", tabs="4m", yscrollcommand=scrollbar.set)
|
||||
self.item_inspector.insert(END, "Select an item to inspect it.")
|
||||
self.item_inspector.pack(fill=BOTH, expand=True)
|
||||
scrollbar.configure(command=self.item_inspector.yview)
|
||||
pane.add(frame)
|
||||
|
||||
scrollbar.configure(command=self.item_inspector.yview)
|
||||
pane.add(frame)
|
||||
self.status_frame = Frame()
|
||||
self.status = Label(self.status_frame)
|
||||
self.status.grid(row=0, column=0)
|
||||
self.superbar = Progressbar(self.status_frame, maximum=0, style="Superbar.Horizontal.TProgressbar")
|
||||
self.superbar.grid(row=0, column=1, sticky=NSEW)
|
||||
self.superbar.grid_remove()
|
||||
self.progressbar = Progressbar(self.status_frame)
|
||||
self.progressbar.grid(row=1, columnspan=2, sticky=NSEW)
|
||||
self.progressbar.grid_remove()
|
||||
self.status_frame.columnconfigure(0, weight=1)
|
||||
|
||||
def find(self, _):
|
||||
def set_superbar(self, maximum: int) -> None:
|
||||
self.superbar.config(maximum=maximum, value=0)
|
||||
if maximum == 1:
|
||||
self.superbar.grid_remove()
|
||||
else:
|
||||
self.superbar.grid()
|
||||
|
||||
def step_superbar(self, arg, desc: str="") -> None:
|
||||
if self.superbar.cget("maximum") == 0:
|
||||
self.set_superbar(1)
|
||||
if self.superbar.cget("value") == 0:
|
||||
self.status_frame.pack(side=BOTTOM, fill=X)
|
||||
|
||||
self.status.config(text=desc)
|
||||
self.update()
|
||||
if isinstance(arg, int):
|
||||
iterable = range(arg)
|
||||
max = arg
|
||||
else:
|
||||
iterable = arg
|
||||
max = len(arg)
|
||||
if max > 1:
|
||||
self.progressbar.config(maximum=max+1, value=0)
|
||||
self.progressbar.grid()
|
||||
else:
|
||||
self.progressbar.grid_remove()
|
||||
|
||||
for x in iterable:
|
||||
yield x
|
||||
self.progressbar.step()
|
||||
self.update()
|
||||
self.superbar.step()
|
||||
if self.superbar.cget("value") == 0:
|
||||
self.status_frame.pack_forget()
|
||||
self.superbar.config(maximum=0)
|
||||
|
||||
def set_headings(self, *cols, treeheading: str=None, treewidth: int=None) -> None:
|
||||
if treeheading is not None:
|
||||
self.tree.heading("#0", text=treeheading)
|
||||
self.tree.configure(columns=cols)
|
||||
if treewidth is None:
|
||||
treewidth = self.tree.winfo_width()
|
||||
colwidth = treewidth // (len(cols)+1)
|
||||
self.tree.column("#0", width=colwidth)
|
||||
for i, col in enumerate(cols):
|
||||
self.tree.heading(col, text=col, command=(lambda col: lambda: self._sort_column(col, False))(col))
|
||||
self.tree.column(i, width=colwidth)
|
||||
|
||||
def askopener(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def load(self, path) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
def on_item_select(self, _) -> None:
|
||||
pass
|
||||
|
||||
def _find(self, _):
|
||||
query = self.find_input.get().lower()
|
||||
for item in self.tree.tag_has("match"):
|
||||
tags = list(self.tree.item(item, "tags"))
|
||||
tags.remove("match")
|
||||
self.tree.item(item, tags=tags)
|
||||
self.reattach_all()
|
||||
self._reattach_all()
|
||||
if query:
|
||||
self.filter_items(query)
|
||||
self._filter_items(query)
|
||||
|
||||
def reattach_all(self):
|
||||
def _reattach_all(self) -> None:
|
||||
for parent, detached_children in self.detached_items.items():
|
||||
for index, item in detached_children:
|
||||
self.tree.reattach(item, parent, index)
|
||||
self.detached_items.clear()
|
||||
|
||||
def filter_items(self, query, parent=""):
|
||||
def _filter_items(self, query, parent=""):
|
||||
all_children = self.tree.get_children(parent)
|
||||
detached_children = [item for item in all_children if not any(query in i.lower() for i in self.tree.item(item, "values")) and not query in self.tree.item(item, "text").lower()] # first, find all children that don't match
|
||||
for item in all_children:
|
||||
@ -70,21 +148,28 @@ class Viewer(Frame):
|
||||
tags.append("match")
|
||||
self.tree.item(item, tags=tags)
|
||||
self.tree.see(item)
|
||||
if self.filter_items(query, item) and item in detached_children:
|
||||
if self._filter_items(query, item) and item in detached_children:
|
||||
detached_children.remove(item) # don't detach if a child matches
|
||||
self.detached_items[parent] = [(self.tree.index(item), item) for item in detached_children]
|
||||
for item in detached_children:
|
||||
self.tree.detach(item)
|
||||
return len(detached_children) != len(all_children) # return true if any children match
|
||||
|
||||
def sort_column(self, col, reverse, parent=""):
|
||||
def _sort_column(self, col, reverse, parent="") -> None:
|
||||
children = list(self.tree.get_children(parent))
|
||||
children.sort(key=lambda x: self.tree.set(x, col), reverse=reverse)
|
||||
# rearrange items in sorted positions
|
||||
for index, child in enumerate(children):
|
||||
self.tree.move(child, parent, index)
|
||||
for child in children:
|
||||
self.sort_column(col, reverse, child)
|
||||
self._sort_column(col, reverse, child)
|
||||
if parent == "":
|
||||
# reverse sort next time
|
||||
self.tree.heading(col, command=lambda: self.sort_column(col, not reverse))
|
||||
self.tree.heading(col, command=lambda: self._sort_column(col, not reverse))
|
||||
|
||||
def _askopen(self) -> None:
|
||||
path = self.askopener()
|
||||
if path:
|
||||
self._reattach_all()
|
||||
self.tree.delete(*self.tree.get_children())
|
||||
self.load(path)
|
||||
|
Reference in New Issue
Block a user