Add typings to minecraft/stats.py

This commit is contained in:
luukas 2022-05-20 22:05:37 +03:00
parent 02f5318f76
commit c1ebb16dff
No known key found for this signature in database
GPG Key ID: CC4915E8D71FC044

View File

@ -3,43 +3,82 @@ import logging
import datetime
import base64
import psutil
import typing as t
from app.classes.minecraft.mc_ping import ping
from app.classes.models.management import HostStats
from app.classes.models.servers import HelperServers
from app.classes.shared.helpers import Helpers
from app.classes.shared.main_controller import Controller
logger = logging.getLogger(__name__)
class DiskDataDict(t.TypedDict):
device: str
total_raw: int
total: str
used_raw: int
used: str
free_raw: int
free: str
percent_used: float
fs: str
mount: str
class NodeStatsDict(t.TypedDict):
boot_time: str
cpu_usage: float
cpu_count: int
cpu_cur_freq: float
cpu_max_freq: float
mem_percent: float
mem_usage_raw: int
mem_usage: str
mem_total_raw: int
mem_total: str
disk_data: t.List[DiskDataDict]
class NodeStatsReturnDict(t.TypedDict):
node_stats: NodeStatsDict
class Stats:
helper: Helpers
controller: Controller
def __init__(self, helper, controller):
self.helper = helper
self.controller = controller
def get_node_stats(self):
def get_node_stats(self) -> NodeStatsReturnDict:
boot_time = datetime.datetime.fromtimestamp(psutil.boot_time())
data = {}
try:
cpu_freq = psutil.cpu_freq()
except NotImplementedError:
cpu_freq = psutil._common.scpufreq(current=0, min=0, max=0)
node_stats = {
memory = psutil.virtual_memory()
node_stats: NodeStatsDict = {
"boot_time": str(boot_time),
"cpu_usage": psutil.cpu_percent(interval=0.5) / psutil.cpu_count(),
"cpu_count": psutil.cpu_count(),
"cpu_cur_freq": round(cpu_freq[0], 2),
"cpu_max_freq": cpu_freq[2],
"mem_percent": psutil.virtual_memory()[2],
"mem_usage": Helpers.human_readable_file_size(psutil.virtual_memory()[3]),
"mem_total": Helpers.human_readable_file_size(psutil.virtual_memory()[0]),
"mem_percent": memory.percent,
"mem_usage_raw": memory.used,
"mem_usage": Helpers.human_readable_file_size(memory.used),
"mem_total_raw": memory.total,
"mem_total": Helpers.human_readable_file_size(memory.total),
"disk_data": self._all_disk_usage(),
}
# server_stats = self.get_servers_stats()
# data['servers'] = server_stats
data["node_stats"] = node_stats
return data
return {
"node_stats": node_stats,
}
@staticmethod
def _get_process_stats(process):
@ -82,7 +121,7 @@ class Stats:
# Source: https://github.com/giampaolo/psutil/blob/master/scripts/disk_usage.py
@staticmethod
def _all_disk_usage():
def _all_disk_usage() -> t.List[DiskDataDict]:
disk_data = []
# print(templ % ("Device", "Total", "Used", "Free", "Use ", "Type","Mount"))
@ -97,10 +136,13 @@ class Stats:
disk_data.append(
{
"device": part.device,
"total_raw": usage.total,
"total": Helpers.human_readable_file_size(usage.total),
"used_raw": usage.used,
"used": Helpers.human_readable_file_size(usage.used),
"free_raw": usage.free,
"free": Helpers.human_readable_file_size(usage.free),
"percent_used": int(usage.percent),
"percent_used": usage.percent,
"fs": part.fstype,
"mount": part.mountpoint,
}
@ -194,7 +236,7 @@ class Stats:
def record_stats(self):
stats_to_send = self.get_node_stats()
node_stats = stats_to_send.get("node_stats")
node_stats = stats_to_send["node_stats"]
HostStats.insert(
{