crafty-4/app/classes/minecraft/mc_ping.py

183 lines
5.6 KiB
Python

import struct
import socket
import base64
import json
import os
import sys
import logging.config
from app.classes.shared.console import console
from app.classes.minecraft.bedrock_ping import BedrockPing
logger = logging.getLogger(__name__)
class Server:
def __init__(self, data):
self.description = data.get('description')
# print(self.description)
if isinstance(self.description, dict):
# cat server
if "translate" in self.description:
self.description = self.description['translate']
# waterfall / bungee
elif 'extra' in self.description:
lines = []
description = self.description
if 'extra' in description.keys():
for e in description['extra']:
#Conversion format code needed only for Java Version
lines.append(get_code_format("reset"))
if "bold" in e.keys():
lines.append(get_code_format("bold"))
if "italic" in e.keys():
lines.append(get_code_format("italic"))
if "underlined" in e.keys():
lines.append(get_code_format("underlined"))
if "strikethrough" in e.keys():
lines.append(get_code_format("strikethrough"))
if "obfuscated" in e.keys():
lines.append(get_code_format("obfuscated"))
if "color" in e.keys():
lines.append(get_code_format(e['color']))
#Then append the text
if "text" in e.keys():
if e['text'] == '\n':
lines.append("§§")
else:
lines.append(e['text'])
total_text = " ".join(lines)
self.description = total_text
# normal MC
else:
self.description = self.description['text']
self.icon = base64.b64decode(data.get('favicon', '')[22:])
self.players = Players(data['players']).report()
self.version = data['version']['name']
self.protocol = data['version']['protocol']
class Players(list):
def __init__(self, data):
super().__init__(Player(x) for x in data.get('sample', []))
self.max = data['max']
self.online = data['online']
def report(self):
players = []
for x in self:
players.append(str(x))
r_data = {
'online': self.online,
'max': self.max,
'players': players
}
return json.dumps(r_data)
class Player:
def __init__(self, data):
self.id = data['id']
self.name = data['name']
def __str__(self):
return self.name
def get_code_format(format_name):
root_dir = os.path.abspath(os.path.curdir)
format_file = os.path.join(root_dir, 'app', 'config', 'motd_format.json')
try:
with open(format_file, "r", encoding='utf-8') as f:
data = json.load(f)
if format_name in data.keys():
return data.get(format_name)
else:
logger.error(f"Format MOTD Error: format name {format_name} does not exist")
console.error(f"Format MOTD Error: format name {format_name} does not exist")
return ""
except Exception as e:
logger.critical(f"Config File Error: Unable to read {format_file} due to {e}")
console.critical(f"Config File Error: Unable to read {format_file} due to {e}")
return ""
# For the rest of requests see wiki.vg/Protocol
def ping(ip, port):
def read_var_int():
i = 0
j = 0
while True:
k = sock.recv(1)
if not k:
return 0
k = k[0]
i |= (k & 0x7f) << (j * 7)
j += 1
if j > 5:
raise ValueError('var_int too big')
if not k & 0x80:
return i
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((ip, port))
except socket.error:
return False
try:
host = ip.encode('utf-8')
data = b'' # wiki.vg/Server_List_Ping
data += b'\x00' # packet ID
data += b'\x04' # protocol variant
data += struct.pack('>b', len(host)) + host
data += struct.pack('>H', port)
data += b'\x01' # next state
data = struct.pack('>b', len(data)) + data
sock.sendall(data + b'\x01\x00') # handshake + status ping
length = read_var_int() # full packet length
if length < 10:
if length < 0:
return False
else:
return False
sock.recv(1) # packet type, 0 for pings
length = read_var_int() # string length
data = b''
while len(data) != length:
chunk = sock.recv(length - len(data))
if not chunk:
return False
data += chunk
logger.debug(f"Server reports this data on ping: {data}")
return Server(json.loads(data))
finally:
sock.close()
# For the rest of requests see wiki.vg/Protocol
def ping_bedrock(ip, port):
if len(sys.argv) > 3:
client_guid = sys.argv[3]
else:
client_guid = 0
try:
brp = BedrockPing(ip, port, client_guid)
return brp.ping()
except socket.timeout:
logger.debug("Unable to get RakNet stats")