decoupled os_calls from fishy to use os_services instead

This commit is contained in:
Adam Saudagar 2023-02-21 23:05:44 +05:30
parent 0de6b54777
commit c5d1cb67cf
8 changed files with 55 additions and 135 deletions

View File

@ -1,11 +1,6 @@
import ctypes
import logging
import os
import sys
import win32con
import win32gui
import fishy
from fishy.gui import GUI, splash, update_dialog, check_eula
from fishy import helper, web
@ -19,19 +14,13 @@ from fishy.helper.migration import Migration
from fishy.osservices.os_services import os_services
def check_window_name(title):
titles = ["Command Prompt", "PowerShell", "Fishy"]
for t in titles:
if t in title:
return True
return False
# noinspection PyBroadException
def initialize(window_to_hide):
def initialize():
Migration.migrate()
helper.create_shortcut_first()
if not config.get("shortcut_created", False):
os_services.create_shortcut(False)
config.set("shortcut_created", True)
new_session = web.get_session()
@ -39,15 +28,11 @@ def initialize(window_to_hide):
logging.error("Couldn't create a session, some features might not work")
logging.debug(f"created session {new_session}")
try:
is_admin = os.getuid() == 0
except AttributeError:
is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0
if is_admin:
if os_services.is_admin():
logging.info("Running with admin privileges")
if not config.get("debug", False) and check_window_name(win32gui.GetWindowText(window_to_hide)):
win32gui.ShowWindow(window_to_hide, win32con.SW_HIDE)
if not config.get("debug", False):
os_services.hide_terminal()
helper.install_thread_excepthook()
sys.excepthook = helper.unhandled_exception_logging
@ -73,20 +58,18 @@ def main():
update_dialog.check_update(gui)
logger.connect(gui)
window_to_hide = win32gui.GetForegroundWindow()
bot = EngineEventHandler(lambda: gui)
gui = GUI(lambda: bot, on_gui_load)
hotkey.start()
logging.info(f"Fishybot v{fishy.__version__}")
initialize(window_to_hide)
initialize()
gui.start()
active.start()
bot.start_event_handler() # main thread loop
bot.start_event_handler() # main thread loop
hotkey.stop()
active.stop()

View File

@ -1,18 +1,14 @@
import logging
import math
from enum import Enum
from threading import Thread
import numpy as np
import pywintypes
import win32api
import win32gui
from ctypes import windll
from mss import mss
from mss.base import MSSBase
from fishy.helper.helper import print_exc
from fishy.osservices.os_services import os_services
class Status(Enum):
@ -27,10 +23,11 @@ class WindowServer:
"""
Screen: np.ndarray = None
windowOffset = None
hwnd = None
status = Status.STOPPED
sct: MSSBase = None
monitor_top_left = None
crop = None
monitor_id = -1
def init():
@ -38,47 +35,35 @@ def init():
Executed once before the main loop,
Finds the game window, and calculates the offset to remove the title bar
"""
# noinspection PyUnresolvedReferences
try:
WindowServer.hwnd = win32gui.FindWindow(None, "Elder Scrolls Online")
WindowServer.status = Status.RUNNING
WindowServer.sct = mss()
monitor_id = windll.user32.MonitorFromWindow(WindowServer.hwnd, 2)
WindowServer.monitor_top_left = win32api.GetMonitorInfo(monitor_id)["Monitor"][:2]
WindowServer.crop = os_services.get_game_window_rect()
rect = win32gui.GetWindowRect(WindowServer.hwnd)
client_rect = win32gui.GetClientRect(WindowServer.hwnd)
WindowServer.windowOffset = math.floor(((rect[2] - rect[0]) - client_rect[2]) / 2)
WindowServer.status = Status.RUNNING
WindowServer.sct = mss()
except pywintypes.error:
monitor_rect = os_services.get_monitor_rect()
if monitor_rect is None:
logging.error("Game window not found")
WindowServer.status = Status.CRASHED
for i, m in enumerate(WindowServer.sct.monitors):
if m["top"] == monitor_rect[0] and m["left"] == monitor_rect[1]:
WindowServer.monitor_id = i
def get_screenshot():
sct_img = WindowServer.sct.grab(WindowServer.sct.monitors[WindowServer.monitor_id])
# noinspection PyTypeChecker
return np.array(sct_img)
def loop():
"""
Executed in the start of the main loop
finds the game window location and captures it
"""
sct_img = WindowServer.sct.grab(WindowServer.sct.monitors[1])
# noinspection PyTypeChecker
temp_screen = np.array(sct_img)
rect = win32gui.GetWindowRect(WindowServer.hwnd)
client_rect = win32gui.GetClientRect(WindowServer.hwnd)
fullscreen = sct_img.size.height == (rect[3] - rect[1])
title_offset = ((rect[3] - rect[1]) - client_rect[3]) - WindowServer.windowOffset if not fullscreen else 0
crop = (
rect[0] + WindowServer.windowOffset - WindowServer.monitor_top_left[0],
rect[1] + title_offset - WindowServer.monitor_top_left[1],
rect[2] - WindowServer.windowOffset - WindowServer.monitor_top_left[0],
rect[3] - WindowServer.windowOffset - WindowServer.monitor_top_left[1]
)
WindowServer.Screen = temp_screen[crop[1]:crop[3], crop[0]:crop[2]] if not fullscreen else temp_screen
ss = get_screenshot()
crop = WindowServer.crop
WindowServer.Screen = ss[crop[1]:crop[3], crop[0]:crop[2]]
if WindowServer.Screen.size == 0:
logging.error("Don't minimize or drag game window outside the screen")

View File

@ -16,7 +16,8 @@ from fishy.engine.fullautofisher.mode.recorder import Recorder
from fishy.engine.semifisher import fishing_mode
from fishy.engine.semifisher.fishing_mode import FishingMode
from fishy.helper.config import config
from fishy.helper.helper import wait_until, is_eso_active, sign, print_exc
from fishy.helper.helper import wait_until, sign, print_exc
from fishy.osservices.os_services import os_services
mse = mouse.Controller()
kb = keyboard.Controller()
@ -52,9 +53,9 @@ class FullAuto(IEngine):
return
# block thread until game window becomes active
if not is_eso_active():
if not os_services.is_eso_active():
logging.info("Waiting for eso window to be active...")
wait_until(lambda: is_eso_active() or not self.start)
wait_until(lambda: os_services.is_eso_active() or not self.start)
if self.start:
logging.info("starting in 2 secs...")
time.sleep(2)
@ -87,8 +88,8 @@ class FullAuto(IEngine):
def stop_on_inactive(self):
def func():
logging.debug("stop on inactive started")
wait_until(lambda: not is_eso_active() or not self.start)
if self.start and not is_eso_active():
wait_until(lambda: not os_services.is_eso_active() or not self.start)
if self.start and not os_services.is_eso_active():
self.turn_off()
logging.debug("stop on inactive stopped")
Thread(target=func).start()

View File

@ -12,10 +12,10 @@ from playsound import playsound
from fishy import web
from fishy.engine.semifisher import fishing_mode
from fishy.engine.semifisher.fishing_mode import FishingMode, State
from fishy.engine.semifisher.fishing_mode import State
from fishy.helper import helper
from fishy.helper.config import config
from fishy.helper.helper import is_eso_active
from fishy.osservices.os_services import os_services
class FishEvent:
@ -44,7 +44,7 @@ def _fishing_sleep(waittime, lower_limit_ms=16, upper_limit_ms=1375):
def if_eso_is_focused(func):
def wrapper():
if not is_eso_active():
if not os_services.is_eso_active():
logging.warning("ESO window is not focused")
return
func()

View File

@ -15,6 +15,7 @@ from ..helper.config import config
from .discord_login import discord_login
from ..helper.hotkey.hotkey_process import hotkey
from ..helper.hotkey.process import Key
from ..osservices.os_services import os_services
if typing.TYPE_CHECKING:
from . import GUI
@ -46,7 +47,7 @@ def _create(gui: 'GUI'):
gui.login.set(1 if login > 0 else 0)
state = tk.DISABLED if login == -1 else tk.ACTIVE
filemenu.add_checkbutton(label="Login", command=lambda: discord_login(gui), variable=gui.login, state=state)
filemenu.add_command(label="Create Shortcut", command=lambda: helper.create_shortcut(False))
filemenu.add_command(label="Create Shortcut", command=lambda: os_services.create_shortcut(False))
# filemenu.add_command(label="Create Anti-Ghost Shortcut", command=lambda: helper.create_shortcut(True))
def _toggle_mode():

View File

@ -1,5 +1,5 @@
from .config import Config
from .helper import (addon_exists, create_shortcut, create_shortcut_first,
from .helper import (addon_exists,
get_addonversion, get_savedvarsdir,
install_addon, install_thread_excepthook, manifest_file,
not_implemented, open_web, playsound_multiple,

View File

@ -10,15 +10,17 @@ from typing import Optional
from event_scheduler import EventScheduler
from fishy.osservices.os_services import os_services
def filename():
from fishy.helper.helper import get_documents
name = "fishy_config.json"
_filename = os.path.join(os.environ["HOMEDRIVE"], os.environ["HOMEPATH"], "Documents", name)
if os.path.exists(_filename):
return _filename
return os.path.join(get_documents(), name)
# fallback for onedrive documents
return os.path.join(os_services.get_documents(), name)
temp_file = os.path.join(os.environ["TEMP"], "fishy_config.BAK")

View File

@ -14,15 +14,13 @@ from uuid import uuid1
from zipfile import ZipFile
import requests
import winshell
from playsound import playsound
from win32com.client import Dispatch
from win32comext.shell import shell, shellcon
from win32gui import GetForegroundWindow, GetWindowText
import fishy
from fishy.constants import libgps, lam2, fishyqr, libmapping, libdl, libchatmsg
from fishy.helper.config import config
from fishy.osservices.os_services import os_services
def playsound_multiple(path, count=2):
@ -110,55 +108,14 @@ def manifest_file(rel_path):
return os.path.join(os.path.dirname(fishy.__file__), rel_path)
def create_shortcut_first():
from .config import config
if not config.get("shortcut_created", False):
create_shortcut(False)
config.set("shortcut_created", True)
# noinspection PyBroadException
def create_shortcut(anti_ghosting: bool):
"""
creates a new shortcut on desktop
"""
try:
desktop = winshell.desktop()
path = os.path.join(desktop, "Fishybot ESO.lnk")
_shell = Dispatch('WScript.Shell')
shortcut = _shell.CreateShortCut(path)
if anti_ghosting:
shortcut.TargetPath = r"C:\Windows\System32\cmd.exe"
python_dir = os.path.join(os.path.dirname(sys.executable), "pythonw.exe")
shortcut.Arguments = f"/C start /affinity 1 /low {python_dir} -m fishy"
else:
shortcut.TargetPath = os.path.join(os.path.dirname(sys.executable), "python.exe")
shortcut.Arguments = "-m fishy"
shortcut.IconLocation = manifest_file("icon.ico")
shortcut.save()
logging.info("Shortcut created")
except Exception:
print_exc()
logging.error("Couldn't create shortcut")
def get_savedvarsdir():
# noinspection PyUnresolvedReferences
from win32com.shell import shell, shellcon
documents = shell.SHGetFolderPath(0, shellcon.CSIDL_PERSONAL, None, 0)
return os.path.join(documents, "Elder Scrolls Online", "live", "SavedVariables")
eso_path = os_services.get_eso_config_path()
return os.path.join(eso_path, "live", "SavedVariables")
def get_addondir():
# noinspection PyUnresolvedReferences
from win32com.shell import shell, shellcon
documents = shell.SHGetFolderPath(0, shellcon.CSIDL_PERSONAL, None, 0)
return os.path.join(documents, "Elder Scrolls Online", "live", "Addons")
eso_path = os_services.get_eso_config_path()
return os.path.join(eso_path, "live", "Addons")
def addon_exists(name, url=None, v=None):
@ -222,19 +179,10 @@ def remove_addon(name, url=None, v=None):
return 0
def get_documents():
return shell.SHGetFolderPath(0, shellcon.CSIDL_PERSONAL, None, 0)
def log_raise(msg):
logging.error(msg)
raise Exception(msg)
def is_eso_active():
return GetWindowText(GetForegroundWindow()) == "Elder Scrolls Online"
# noinspection PyProtectedMember,PyUnresolvedReferences
def _get_id(thread):
# returns id of the respective thread
@ -252,8 +200,8 @@ def kill_thread(thread):
if res > 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0)
print('Exception raise failure')
def print_exc():
logging.error(traceback.format_exc())
traceback.print_exc()