formatting cleanup to aide future debugging

This commit is contained in:
Brian Lindner 2022-10-07 00:35:42 -04:00
parent 260df79ca6
commit 4486051441
No known key found for this signature in database

View File

@ -9,21 +9,21 @@ Optional Arguments:
-h, --help show this help message and exit -h, --help show this help message and exit
-v, --version show the version number and exit -v, --version show the version number and exit
-lc LOG_CONFIG_FILE, --logconfig-path LOG_CONFIG_FILE -lc LOG_CONFIG_FILE, --logconfig-path LOG_CONFIG_FILE
Path to logging config file. Path to logging config file.
[Default: ./logging.conf] [Default: ./logging.conf]
-c CONFIG_FILE, --config-path CONFIG_FILE -c CONFIG_FILE, --config-path CONFIG_FILE
Path to Config.ini to use for Plex Server info. Path to Config.ini to use for Plex Server info.
[Default: ./config.ini] [Default: ./config.ini]
-s SCHEDULE_FILE, --schedule-path SCHEDULE_FILE -s SCHEDULE_FILE, --schedule-path SCHEDULE_FILE
Path to pre-roll schedule file (YAML) to be use. Path to pre-roll schedule file (YAML) to be use.
[Default: ./preroll_schedules.yaml] [Default: ./preroll_schedules.yaml]
Requirements: Requirements:
- See Requirements.txt for Python modules - See Requirements.txt for Python modules
Scheduling: Scheduling:
Add to system scheduler such as: Add to system scheduler such as:
> crontab -e > crontab -e
> 0 0 * * * python path/to/schedule_preroll.py >/dev/null 2>&1 > 0 0 * * * python path/to/schedule_preroll.py >/dev/null 2>&1
Raises: Raises:
@ -33,17 +33,16 @@ Raises:
FileNotFoundError: [description] FileNotFoundError: [description]
""" """
import logging
import os import os
import sys import sys
import logging from argparse import ArgumentParser, Namespace
from datetime import date, datetime, timedelta
from typing import Dict, List, NamedTuple, Optional, Tuple, Union
import requests import requests
from datetime import datetime, date, timedelta
import yaml import yaml
from typing import NamedTuple, Union, Optional, Tuple, List, Dict from plexapi.server import PlexServer
from argparse import Namespace, ArgumentParser
from configparser import ConfigParser
from configparser import Error as ConfigError
from plexapi.server import PlexServer, CONFIG
# import local util modules # import local util modules
import plexutil import plexutil
@ -53,6 +52,7 @@ logger = logging.getLogger(__name__)
filename = os.path.basename(sys.argv[0]) filename = os.path.basename(sys.argv[0])
SCRIPT_NAME = os.path.splitext(filename)[0] SCRIPT_NAME = os.path.splitext(filename)[0]
class ScheduleEntry(NamedTuple): class ScheduleEntry(NamedTuple):
type: str type: str
startdate: datetime startdate: datetime
@ -60,8 +60,10 @@ class ScheduleEntry(NamedTuple):
force: bool force: bool
path: str path: str
ScheduleType = Dict[str, List[ScheduleEntry]] ScheduleType = Dict[str, List[ScheduleEntry]]
def arguments() -> Namespace: def arguments() -> Namespace:
"""Setup and Return command line arguments """Setup and Return command line arguments
See https://docs.python.org/3/howto/argparse.html See https://docs.python.org/3/howto/argparse.html
@ -69,57 +71,72 @@ def arguments() -> Namespace:
Returns: Returns:
argparse.Namespace: Namespace object argparse.Namespace: Namespace object
""" """
description = 'Automate scheduling of pre-roll intros for Plex' description = "Automate scheduling of pre-roll intros for Plex"
version = '0.10.1' version = "0.10.1"
config_default = './config.ini' config_default = "./config.ini"
log_config_default = './logging.conf' log_config_default = "./logging.conf"
schedule_default = './preroll_schedules.yaml' schedule_default = "./preroll_schedules.yaml"
parser = ArgumentParser(description='{}'.format(description)) parser = ArgumentParser(description=f"{description}")
parser.add_argument('-v', '--version', action='version', version='%(prog)s {}'.format(version), parser.add_argument(
help='show the version number and exit' "-v",
) "--version",
parser.add_argument('-lc', '--logconfig-file', action="version",
dest='log_config_file', action='store', version=f"%(prog)s {version}",
default=log_config_default, help="show the version number and exit",
help='Path to logging config file. [Default: {}]' \ )
.format(log_config_default) parser.add_argument(
) "-lc",
parser.add_argument('-t', '--test-run', "--logconfig-file",
dest='do_test_run', action='store_true', dest="log_config_file",
default=False, action="store",
help='Perform a test run, display output but dont save' default=log_config_default,
) help=f"Path to logging config file. [Default: {log_config_default}]",
parser.add_argument('-c', '--config-file', )
dest='config_file', action='store', parser.add_argument(
help='Path to Config.ini to use for Plex Server info. [Default: {}]' \ "-t",
.format(config_default) "--test-run",
) dest="do_test_run",
parser.add_argument('-s', '--schedule-file', action="store_true",
dest='schedule_file', action='store', default=False,
help='Path to pre-roll schedule file (YAML) to be use. [Default: {}]' \ help="Perform a test run, display output but dont save",
.format(schedule_default) )
) parser.add_argument(
"-c",
"--config-file",
dest="config_file",
action="store",
help=f"Path to Config.ini to use for Plex Server info. [Default: {config_default}]",
)
parser.add_argument(
"-s",
"--schedule-file",
dest="schedule_file",
action="store",
help=f"Path to pre-roll schedule file (YAML) to be use. [Default: {schedule_default}]",
)
args = parser.parse_args() args = parser.parse_args()
return args return args
def schedule_types() -> ScheduleType: def schedule_types() -> ScheduleType:
"""Return the main types of schedules to be used for storage processing """Return the main types of schedules to be used for storage processing
Returns: Returns:
ScheduleType: Dict of main schema items ScheduleType: Dict of main schema items
""" """
schema : ScheduleType = { schema: ScheduleType = {
'default': [], "default": [],
'monthly': [], "monthly": [],
'weekly': [], "weekly": [],
'date_range': [], "date_range": [],
'misc': [] "misc": [],
} }
return schema return schema
def week_range(year:int, weeknum:int) -> Tuple[datetime, datetime]:
def week_range(year: int, weeknum: int) -> Tuple[datetime, datetime]:
"""Return the starting/ending date range of a given year/week """Return the starting/ending date range of a given year/week
Args: Args:
@ -130,8 +147,7 @@ def week_range(year:int, weeknum:int) -> Tuple[datetime, datetime]:
DateTime: Start date of the Year/Month DateTime: Start date of the Year/Month
DateTime: End date of the Year/Month DateTime: End date of the Year/Month
""" """
start = datetime.strptime('{}-W{}-0'.format(year, int(weeknum)-1), start = datetime.strptime(f"{year}-W{int(weeknum) - 1}-0", "%Y-W%W-%w").date()
"%Y-W%W-%w").date()
end = start + timedelta(days=6) end = start + timedelta(days=6)
start = datetime.combine(start, datetime.min.time()) start = datetime.combine(start, datetime.min.time())
@ -139,7 +155,8 @@ def week_range(year:int, weeknum:int) -> Tuple[datetime, datetime]:
return (start, end) return (start, end)
def month_range(year:int, monthnum:int) -> Tuple[datetime, datetime]:
def month_range(year: int, monthnum: int) -> Tuple[datetime, datetime]:
"""Return the starting/ending date range of a given year/month """Return the starting/ending date range of a given year/month
Args: Args:
@ -159,7 +176,8 @@ def month_range(year:int, monthnum:int) -> Tuple[datetime, datetime]:
return (start, end) return (start, end)
def duration_seconds(start:Union[date,datetime], end:Union[date,datetime]) -> float:
def duration_seconds(start: Union[date, datetime], end: Union[date, datetime]) -> float:
"""Return length of time between two date/datetime in seconds """Return length of time between two date/datetime in seconds
Args: Args:
@ -175,22 +193,20 @@ def duration_seconds(start:Union[date,datetime], end:Union[date,datetime]) -> fl
end = datetime.combine(end, datetime.max.time()) end = datetime.combine(end, datetime.max.time())
delta = end - start delta = end - start
logger.debug('duration_second[] Start: {} End: {} Duration: {}'.format(start, logger.debug(f"duration_second[] Start: {start} End: {end} Duration: {delta.total_seconds()}")
end,
delta.total_seconds()
))
return delta.total_seconds() return delta.total_seconds()
def make_datetime(value: Union[str, date, datetime], lowtime: bool=True) -> datetime:
def make_datetime(value: Union[str, date, datetime], lowtime: bool = True) -> datetime:
"""Returns a DateTime object with a calculated Time component if none provided """Returns a DateTime object with a calculated Time component if none provided
converts: converts:
* Date to DateTime, with a Time of Midnight 00:00 or 11:59 pm * Date to DateTime, with a Time of Midnight 00:00 or 11:59 pm
* String to DateTime, with a Time as defined in the string * String to DateTime, with a Time as defined in the string
Args: Args:
value (Union[str, date, datetime]): Input value to convert to a DateTime object value (Union[str, date, datetime]): Input value to convert to a DateTime object
lowtime (bool, optional): Calculate time to be midnight (True) or 11:59 PM (False). lowtime (bool, optional): Calculate time to be midnight (True) or 11:59 PM (False).
Defaults to True. Defaults to True.
Raises: Raises:
@ -201,7 +217,7 @@ def make_datetime(value: Union[str, date, datetime], lowtime: bool=True) -> date
""" """
today = date.today() today = date.today()
now = datetime.now() now = datetime.now()
dt_val = datetime(today.year, today.month, today.day, 0,0,0) dt_val = datetime(today.year, today.month, today.day, 0, 0, 0)
# append the low or high time of the day # append the low or high time of the day
if lowtime: if lowtime:
@ -210,16 +226,15 @@ def make_datetime(value: Union[str, date, datetime], lowtime: bool=True) -> date
time = datetime.max.time() time = datetime.max.time()
# determine how to translate the input value # determine how to translate the input value
if isinstance(value, datetime): if isinstance(value, datetime): # type: ignore
dt_val = value dt_val = value
elif isinstance(value, date): elif isinstance(value, date): # type: ignore
dt_val = datetime.combine(value, time) dt_val = datetime.combine(value, time)
elif isinstance(value, str): elif isinstance(value, str): # type: ignore
try: try:
# Expect format of DateType string to be (YYYY-MM-DD or YYYY-MM-DD HH:MM:SS) # Expect format of DateType string to be (YYYY-MM-DD or YYYY-MM-DD HH:MM:SS)
# allow 'xx' to denote 'every' similar to Cron "*" # allow 'xx' to denote 'every' similar to Cron "*"
msg = 'Translating string value="{}" to datetime (LowTime={})'.format(value, msg = f'Translating string value="{value}" to datetime (LowTime={lowtime})'
lowtime)
logger.debug(msg) logger.debug(msg)
# default to today and the time period (low/high) # default to today and the time period (low/high)
@ -227,39 +242,38 @@ def make_datetime(value: Union[str, date, datetime], lowtime: bool=True) -> date
hour, minute, second = time.hour, time.minute, time.second hour, minute, second = time.hour, time.minute, time.second
# start parsing the Time out, for later additional processing # start parsing the Time out, for later additional processing
dateparts = value.lower().split('-') dateparts = value.lower().split("-")
year = today.year if dateparts[0] == 'xxxx' else int(dateparts[0]) year = today.year if dateparts[0] == "xxxx" else int(dateparts[0])
month = today.month if dateparts[1] == 'xx' else int(dateparts[1]) month = today.month if dateparts[1] == "xx" else int(dateparts[1])
dateparts_day = dateparts[2].split(' ') dateparts_day = dateparts[2].split(" ")
day = today.day if dateparts_day[0] == 'xx' else int(dateparts_day[0]) day = today.day if dateparts_day[0] == "xx" else int(dateparts_day[0])
# attempt to parse out Time components # attempt to parse out Time components
if len(dateparts_day) > 1: if len(dateparts_day) > 1:
timeparts = dateparts_day[1].split(':') timeparts = dateparts_day[1].split(":")
if len(timeparts) > 1: if len(timeparts) > 1:
hour = now.hour if timeparts[0] == 'xx' else int(timeparts[0]) hour = now.hour if timeparts[0] == "xx" else int(timeparts[0])
minute = now.minute if timeparts[1] == 'xx' else int(timeparts[1]) minute = now.minute if timeparts[1] == "xx" else int(timeparts[1])
second = now.second + 1 if timeparts[2] == 'xx' else int(timeparts[2]) second = now.second + 1 if timeparts[2] == "xx" else int(timeparts[2])
dt_val = datetime(year, month, day, hour, minute, second) dt_val = datetime(year, month, day, hour, minute, second)
logger.debug('Datetime-> "{}"'.format(dt_val)) logger.debug(f'Datetime-> "{dt_val}"')
except Exception as e: except Exception as e:
msg = 'Unable to parse date string "{}"'.format(value) msg = f'Unable to parse date string "{value}"'
logger.error(msg, exc_info=e) logger.error(msg, exc_info=e)
raise raise
else: else:
msg = 'UnknownType: Unable to parse date string "{}" for type "{}"'.format(value, msg = f'UnknownType: Unable to parse date string "{value}" for type "{type(value)}"'
type(value)
)
logger.error(msg) logger.error(msg)
raise TypeError(msg) raise TypeError(msg)
return dt_val return dt_val
def preroll_schedule(schedule_file: Optional[str]=None) -> List[ScheduleEntry]:
def preroll_schedule(schedule_file: Optional[str] = None) -> List[ScheduleEntry]:
"""Return a listing of defined preroll schedules for searching/use """Return a listing of defined preroll schedules for searching/use
Args: Args:
@ -271,187 +285,193 @@ def preroll_schedule(schedule_file: Optional[str]=None) -> List[ScheduleEntry]:
Returns: Returns:
list: list of ScheduleEntries list: list of ScheduleEntries
""" """
default_files = ['preroll_schedules.yaml', 'preroll_schedules.yml'] default_files = ["preroll_schedules.yaml", "preroll_schedules.yml"]
filename = None filename = None
if schedule_file != '' and schedule_file != None: if schedule_file not in ("", None):
if os.path.exists(str(schedule_file)): if os.path.exists(str(schedule_file)):
filename = schedule_file filename = schedule_file
else: else:
msg = 'Pre-roll Schedule file "{}" not found'.format(schedule_file) msg = f'Pre-roll Schedule file "{schedule_file}" not found'
raise FileNotFoundError(msg) raise FileNotFoundError(msg)
else: else:
for f in default_files: for f in default_files:
if os.path.exists(f): if os.path.exists(f):
filename = f filename = f
break break
# if we still cant find a schedule file, we abort # if we still cant find a schedule file, we abort
if not filename: if not filename:
msg = 'Missing schedule file: "{}"'.format('" / "'.join(default_files)) filestr = '" / "'.join(default_files)
msg = f'Missing schedule file: "{filestr}"'
logger.critical(msg) logger.critical(msg)
raise FileNotFoundError(msg) raise FileNotFoundError(msg)
with open(filename, 'r') as file: with open(filename, "r") as file:
contents = yaml.load(file, Loader=yaml.SafeLoader) contents = yaml.load(file, Loader=yaml.SafeLoader) # type: ignore
today = date.today() today = date.today()
schedule : List[ScheduleEntry] = [] schedule: List[ScheduleEntry] = []
for schedule_section in schedule_types(): for schedule_section in schedule_types():
if schedule_section == 'weekly': if schedule_section == "weekly":
try: try:
use = contents[schedule_section]['enabled'] use = contents[schedule_section]["enabled"]
if use: if use:
for i in range(1,53): for i in range(1, 53):
try: try:
path = contents[schedule_section][i] path = contents[schedule_section][i]
if path: if path:
start, end = week_range(today.year, i) start, end = week_range(today.year, i)
entry = ScheduleEntry(type=schedule_section, entry = ScheduleEntry(
force=False, type=schedule_section,
startdate=start, force=False,
enddate=end, startdate=start,
path=path) enddate=end,
path=path,
)
schedule.append(entry) schedule.append(entry)
except KeyError as ke: except KeyError as ke:
# skip KeyError for missing Weeks # skip KeyError for missing Weeks
msg = 'Key Value not found: "{}"->"{}", skipping week'.format(schedule_section, msg = f'Key Value not found: "{schedule_section}"->"{i}", skipping week'
i)
logger.debug(msg) logger.debug(msg)
pass pass
except KeyError as ke: except KeyError as ke:
msg = 'Key Value not found in "{}" section'.format(schedule_section) msg = f'Key Value not found in "{schedule_section}" section'
logger.error(msg, exc_info=ke) logger.error(msg, exc_info=ke)
raise raise
elif schedule_section == 'monthly': elif schedule_section == "monthly":
try: try:
use = contents[schedule_section]['enabled'] use = contents[schedule_section]["enabled"]
if use: if use:
for i in range(1,13): for i in range(1, 13):
month_abrev = date(today.year, i, 1).strftime('%b').lower() month_abrev = date(today.year, i, 1).strftime("%b").lower()
try: try:
path = contents[schedule_section][month_abrev] path = contents[schedule_section][month_abrev]
if path: if path:
start, end = month_range(today.year, i) start, end = month_range(today.year, i)
entry = ScheduleEntry(type=schedule_section, entry = ScheduleEntry(
force=False, type=schedule_section,
startdate=start, force=False,
enddate=end, startdate=start,
path=path) enddate=end,
path=path,
)
schedule.append(entry) schedule.append(entry)
except KeyError as ke: except KeyError as ke:
# skip KeyError for missing Months # skip KeyError for missing Months
msg = 'Key Value not found: "{}"->"{}", skipping month'.format(schedule_section, msg = 'Key Value not found: "{schedule_section}"->"{month_abrev}", skipping month'
month_abrev)
logger.warning(msg) logger.warning(msg)
pass pass
except KeyError as ke: except KeyError as ke:
msg = 'Key Value not found in "{}" section'.format(schedule_section) msg = f'Key Value not found in "{schedule_section}" section'
logger.error(msg, exc_info=ke) logger.error(msg, exc_info=ke)
raise raise
elif schedule_section == 'date_range': elif schedule_section == "date_range":
try: try:
use = contents[schedule_section]['enabled'] use = contents[schedule_section]["enabled"]
if use: if use:
for r in contents[schedule_section]['ranges']: for r in contents[schedule_section]["ranges"]:
try: try:
path = r['path'] path = r["path"]
if path: if path:
try: try:
force = r['force'] force = r["force"]
except KeyError as ke: except KeyError as ke:
# special case Optional, ignore # special case Optional, ignore
force = False force = False
pass pass
start = make_datetime(r['start_date'], lowtime=True)
end = make_datetime(r['end_date'], lowtime=False)
entry = ScheduleEntry(type=schedule_section, start = make_datetime(r["start_date"], lowtime=True)
force=force, end = make_datetime(r["end_date"], lowtime=False)
startdate=start,
enddate=end, entry = ScheduleEntry(
path=path) type=schedule_section,
force=force,
startdate=start,
enddate=end,
path=path,
)
schedule.append(entry) schedule.append(entry)
except KeyError as ke: except KeyError as ke:
msg = 'Key Value not found for entry: "{}"'.format(entry) msg = f'Key Value not found for entry: "{entry}"' # type: ignore
logger.error(msg, exc_info=ke) logger.error(msg, exc_info=ke)
raise raise
except KeyError as ke: except KeyError as ke:
msg = 'Key Value not found in "{}" section'.format(schedule_section) msg = f'Key Value not found in "{schedule_section}" section'
logger.error(msg, exc_info=ke) logger.error(msg, exc_info=ke)
raise raise
elif schedule_section == 'misc': elif schedule_section == "misc":
try: try:
use = contents[schedule_section]['enabled'] use = contents[schedule_section]["enabled"]
if use: if use:
try: try:
path = contents[schedule_section]['always_use'] path = contents[schedule_section]["always_use"]
if path: if path:
entry = ScheduleEntry(type=schedule_section, entry = ScheduleEntry(
force=False, type=schedule_section,
startdate=datetime(today.year, today.month, today.day, force=False,
0, 0, 0), startdate=datetime(today.year, today.month, today.day, 0, 0, 0),
enddate=datetime(today.year, today.month, today.day, enddate=datetime(today.year, today.month, today.day, 23, 59, 59),
23,59,59), path=path,
path=path) )
schedule.append(entry) schedule.append(entry)
except KeyError as ke: except KeyError as ke:
msg = 'Key Value not found for entry: "{}"'.format(entry) msg = f'Key Value not found for entry: "{entry}"' # type: ignore
logger.error(msg, exc_info=ke) logger.error(msg, exc_info=ke)
raise raise
except KeyError as ke: except KeyError as ke:
msg = 'Key Value not found in "{}" section'.format(schedule_section) msg = f'Key Value not found in "{schedule_section}" section'
logger.error(msg, exc_info=ke) logger.error(msg, exc_info=ke)
raise raise
elif schedule_section == 'default': elif schedule_section == "default":
try: try:
use = contents[schedule_section]['enabled'] use = contents[schedule_section]["enabled"]
if use: if use:
try: try:
path = contents[schedule_section]['path'] path = contents[schedule_section]["path"]
if path: if path:
entry = ScheduleEntry(type=schedule_section, entry = ScheduleEntry(
force=False, type=schedule_section,
startdate=datetime(today.year, today.month, today.day, force=False,
0, 0, 0), startdate=datetime(today.year, today.month, today.day, 0, 0, 0),
enddate=datetime(today.year, today.month, today.day, enddate=datetime(today.year, today.month, today.day, 23, 59, 59),
23,59,59), path=path,
path=path) )
schedule.append(entry) schedule.append(entry)
except KeyError as ke: except KeyError as ke:
msg = 'Key Value not found for entry: "{}"'.format(entry) msg = f'Key Value not found for entry: "{entry}"' # type: ignore
logger.error(msg, exc_info=ke) logger.error(msg, exc_info=ke)
raise raise
except KeyError as ke: except KeyError as ke:
msg = 'Key Value not found in "{}" section'.format(schedule_section) msg = f'Key Value not found in "{schedule_section}" section'
logger.error(msg, exc_info=ke) logger.error(msg, exc_info=ke)
raise raise
else: else:
msg = 'Unknown schedule_section "{}" detected'.format(schedule_section) msg = f'Unknown schedule_section "{schedule_section}" detected'
logger.error(msg) logger.error(msg)
raise ValueError(msg) raise ValueError(msg)
# Sort list so most recent Ranges appear first # Sort list so most recent Ranges appear first
schedule.sort(reverse=True, key=lambda x:x.startdate) schedule.sort(reverse=True, key=lambda x: x.startdate)
return schedule return schedule
def build_listing_string(items: List[str], play_all: bool=False) -> str:
def build_listing_string(items: List[str], play_all: bool = False) -> str:
"""Build the Plex formatted string of preroll paths """Build the Plex formatted string of preroll paths
Args: Args:
@ -463,14 +483,15 @@ def build_listing_string(items: List[str], play_all: bool=False) -> str:
""" """
if play_all: if play_all:
# use , to play all entries # use , to play all entries
listing = ','.join(items) listing = ",".join(items)
else: else:
#use ; to play random selection # use ; to play random selection
listing = ';'.join(items) listing = ";".join(items)
return listing return listing
def preroll_listing(schedule: List[ScheduleEntry], for_datetime: Optional[datetime]=None) -> str:
def preroll_listing(schedule: List[ScheduleEntry], for_datetime: Optional[datetime] = None) -> str:
"""Return listing of preroll videos to be used by Plex """Return listing of preroll videos to be used by Plex
Args: Args:
@ -481,31 +502,29 @@ def preroll_listing(schedule: List[ScheduleEntry], for_datetime: Optional[dateti
Returns: Returns:
string: listing of preroll video paths to be used for Extras. CSV style: (;|,) string: listing of preroll video paths to be used for Extras. CSV style: (;|,)
""" """
listing = '' listing = ""
entries = schedule_types() entries = schedule_types()
# determine which date to build the listing for # determine which date to build the listing for
if for_datetime: if for_datetime:
if isinstance(for_datetime, datetime): if isinstance(for_datetime, datetime): # type: ignore
check_datetime = for_datetime check_datetime = for_datetime
else: else:
check_datetime = datetime.combine(for_datetime, datetime.now().time()) check_datetime = datetime.combine(for_datetime, datetime.now().time())
else: else:
check_datetime = datetime.now() check_datetime = datetime.now()
# process the schedule for the given date # process the schedule for the given date
for entry in schedule: for entry in schedule:
try: try:
entry_start = entry.startdate entry_start = entry.startdate
entry_end = entry.enddate entry_end = entry.enddate
if not isinstance(entry_start, datetime): if not isinstance(entry_start, datetime): # type: ignore
entry_start = datetime.combine(entry_start, datetime.min.time()) entry_start = datetime.combine(entry_start, datetime.min.time())
if not isinstance(entry_end, datetime): if not isinstance(entry_end, datetime): # type: ignore
entry_end = datetime.combine(entry_end, datetime.max.time()) entry_end = datetime.combine(entry_end, datetime.max.time())
msg = 'checking "{}" against: "{}" - "{}"'.format(check_datetime, msg = f'checking "{check_datetime}" against: "{entry_start}" - "{entry_end}"'
entry_start,
entry_end)
logger.debug(msg) logger.debug(msg)
if entry_start <= check_datetime <= entry_end: if entry_start <= check_datetime <= entry_end:
@ -516,52 +535,56 @@ def preroll_listing(schedule: List[ScheduleEntry], for_datetime: Optional[dateti
entry_force = entry.force entry_force = entry.force
except KeyError as ke: except KeyError as ke:
# special case Optional, ignore # special case Optional, ignore
pass pass
msg = 'Check PASS: Using "{}" - "{}"'.format(entry_start, entry_end) msg = f'Check PASS: Using "{entry_start}" - "{entry_end}"'
logger.debug(msg) logger.debug(msg)
if entry_path: if entry_path:
found = False found = False
# check new schedule item against exist list # check new schedule item against exist list
for e in entries[entry_type]: for e in entries[entry_type]:
duration_new = duration_seconds(entry_start, entry_end) duration_new = duration_seconds(entry_start, entry_end)
duration_curr = duration_seconds(e.startdate, e.enddate) duration_curr = duration_seconds(e.startdate, e.enddate)
# only the narrowest timeframe should stay # only the narrowest timeframe should stay
# disregard if a force entry is there # disregard if a force entry is there
if duration_new < duration_curr and e.force != True: if duration_new < duration_curr and e.force != True:
entries[entry_type].remove(e) entries[entry_type].remove(e)
found = True found = True
# prep for use if New, or is a force Usage # prep for use if New, or is a force Usage
if not found or entry_force == True: if not found or entry_force == True:
entries[entry_type].append(entry) entries[entry_type].append(entry)
except KeyError as ke: except KeyError as ke:
msg = 'KeyError with entry "{}"'.format(entry) msg = f'KeyError with entry "{entry}"'
logger.error(msg, exc_info=ke) logger.error(msg, exc_info=ke)
raise raise
# Build the merged output based or order of Priority # Build the merged output based or order of Priority
merged_list = [] merged_list = []
if entries['misc']: if entries["misc"]:
merged_list.extend([p.path for p in entries['misc']]) merged_list.extend([p.path for p in entries["misc"]]) # type: ignore
if entries['date_range']: if entries["date_range"]:
merged_list.extend([p.path for p in entries['date_range']]) merged_list.extend([p.path for p in entries["date_range"]]) # type: ignore
if entries['weekly'] and not entries['date_range']: if entries["weekly"] and not entries["date_range"]:
merged_list.extend([p.path for p in entries['weekly']]) merged_list.extend([p.path for p in entries["weekly"]]) # type: ignore
if entries['monthly'] \ if entries["monthly"] and not entries["weekly"] and not entries["date_range"]:
and not entries['weekly'] and not entries['date_range']: merged_list.extend([p.path for p in entries["monthly"]]) # type: ignore
merged_list.extend([p.path for p in entries['monthly']]) if (
if entries['default'] \ entries["default"]
and not entries['monthly'] and not entries['weekly'] and not entries['date_range']: and not entries["monthly"]
merged_list.extend([p.path for p in entries['default']]) and not entries["weekly"]
and not entries["date_range"]
):
merged_list.extend([p.path for p in entries["default"]]) # type: ignore
listing = build_listing_string(merged_list) listing = build_listing_string(merged_list)
return listing return listing
def save_preroll_listing(plex: PlexServer, preroll_listing: Union[str, List[str]]) -> None: def save_preroll_listing(plex: PlexServer, preroll_listing: Union[str, List[str]]) -> None:
"""Save Plex Preroll info to PlexServer settings """Save Plex Preroll info to PlexServer settings
@ -570,20 +593,20 @@ def save_preroll_listing(plex: PlexServer, preroll_listing: Union[str, List[str]
preroll_listing (str, list[str]): csv listing or List of preroll paths to save preroll_listing (str, list[str]): csv listing or List of preroll paths to save
""" """
# if happend to send in an Iterable List, merge to a string # if happend to send in an Iterable List, merge to a string
if type(preroll_listing) is list: if isinstance(preroll_listing, list):
preroll_listing = build_listing_string(list(preroll_listing)) preroll_listing = build_listing_string(list(preroll_listing))
msg = 'Attempting save of pre-rolls: "{}"'.format(preroll_listing) msg = f'Attempting save of pre-rolls: "{preroll_listing}"'
logger.debug(msg) logger.debug(msg)
plex.settings.get('cinemaTrailersPrerollID').set(preroll_listing) plex.settings.get("cinemaTrailersPrerollID").set(preroll_listing) # type: ignore
plex.settings.save() plex.settings.save()
msg = 'Saved Pre-Rolls: Server: "{}" Pre-Rolls: "{}"'.format(plex.friendlyName, msg = f'Saved Pre-Rolls: Server: "{plex.friendlyName}" Pre-Rolls: "{preroll_listing}"' # type: ignore
preroll_listing)
logger.info(msg) logger.info(msg)
if __name__ == '__main__':
if __name__ == "__main__":
args = arguments() args = arguments()
plexutil.init_logger(args.log_config_file) plexutil.init_logger(args.log_config_file)
@ -593,27 +616,28 @@ if __name__ == '__main__':
# Initialize Session information # Initialize Session information
sess = requests.Session() sess = requests.Session()
# Ignore verifying the SSL certificate # Ignore verifying the SSL certificate
sess.verify = False # '/path/to/certfile' sess.verify = False # '/path/to/certfile'
# If verify is set to a path of a directory (not a cert file), # If verify is set to a path of a directory (not a cert file),
# the directory needs to be processed with the c_rehash utility # the directory needs to be processed with the c_rehash utility
# from OpenSSL. # from OpenSSL.
if sess.verify is False: if sess.verify is False:
# Disable the warning that the request is insecure, we know that... # Disable the warning that the request is insecure, we know that...
import urllib3 import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
try: try:
plex = PlexServer(cfg['PLEX_URL'], cfg['PLEX_TOKEN'], session=sess) plex = PlexServer(cfg["PLEX_URL"], cfg["PLEX_TOKEN"], session=sess)
except Exception as e: except Exception as e:
msg = 'Error connecting to Plex' msg = "Error connecting to Plex"
logger.error(msg, exc_info=e) logger.error(msg, exc_info=e)
raise e raise e
schedule = preroll_schedule(args.schedule_file) schedule = preroll_schedule(args.schedule_file)
prerolls = preroll_listing(schedule) prerolls = preroll_listing(schedule)
if args.do_test_run: if args.do_test_run:
msg = 'Test Run of Plex Pre-Rolls: **Nothing being saved**\n{}\n'.format(prerolls) msg = f"Test Run of Plex Pre-Rolls: **Nothing being saved**\n{prerolls}\n"
logger.debug(msg) logger.debug(msg)
print(msg) print(msg)
else: else: