From 713f6fbe56330c67006e26ea0857f7358e2cb4fc Mon Sep 17 00:00:00 2001 From: samwiseg00 Date: Sat, 8 Sep 2018 20:31:06 -0400 Subject: [PATCH] Rewrite Kill_stream.py to add rich message functionality --- killstream/kill_stream.py | 655 ++++++++++++++++++++++++++++---------- 1 file changed, 491 insertions(+), 164 deletions(-) diff --git a/killstream/kill_stream.py b/killstream/kill_stream.py index 4cbf54e..f67b622 100644 --- a/killstream/kill_stream.py +++ b/killstream/kill_stream.py @@ -1,7 +1,6 @@ """ Description: Use conditions to kill a stream -Author: Blacktwin, Arcanemagus, samwiseg00 -Requires: requests +Author: Blacktwin, Arcanemagus, Samwiseg00, JonnyWong16 Adding the script to Tautulli: Taultulli > Settings > Notification Agents > Add a new notification agent > @@ -35,215 +34,509 @@ Taultulli > Settings > Notification Agents > New Script > Script Arguments: Arguments: --jbop SELECTOR --userId {user_id} --username {username} --sessionId {session_id} --notify notifierID --interval 30 --limit 1200 + --richMessage RICH_TYPE --serverName {server_name} + --plexUrl {plex_url} --posterUrl {poster_url} + --richColor '#E5A00D' --killMessage 'Your message here.' Save Close """ -import requests -import argparse -import sys + import os -from time import sleep +import sys +import argparse +import json +import time from datetime import datetime +import requests + TAUTULLI_URL = '' TAUTULLI_APIKEY = '' +TAUTULLI_PUBLIC_URL = '' TAUTULLI_URL = os.getenv('TAUTULLI_URL', TAUTULLI_URL) +TAUTULLI_PUBLIC_URL = os.getenv('TAUTULLI_PUBLIC_URL', TAUTULLI_PUBLIC_URL) TAUTULLI_APIKEY = os.getenv('TAUTULLI_APIKEY', TAUTULLI_APIKEY) TAUTULLI_ENCODING = os.getenv('TAUTULLI_ENCODING', 'UTF-8') +VERIFY_SSL = False + +if TAUTULLI_PUBLIC_URL != '/': + # Check to see if there is a public URL set in Tautulli + TAUTULLI_LINK = TAUTULLI_PUBLIC_URL +else: + TAUTULLI_LINK = TAUTULLI_URL SUBJECT_TEXT = "Tautulli has killed a stream." BODY_TEXT = "Killed session ID '{id}'. Reason: {message}" BODY_TEXT_USER = "Killed {user}'s stream. Reason: {message}." -sess = requests.Session() -# Ignore verifying the SSL certificate -sess.verify = False # '/path/to/certfile' -# If verify is set to a path to a directory, -# the directory must have been processed using the c_rehash utility supplied -# with OpenSSL. -if sess.verify is False: - # Disable the warning that the request is insecure, we know that... - import urllib3 - urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) SELECTOR = ['stream', 'allStreams', 'paused'] +RICH_TYPE = ['discord', 'slack'] -def send_notification(subject_text, body_text, notifier_id): - """Send a notification through Tautulli +TAUTULLI_ICON = 'https://github.com/Tautulli/Tautulli/raw/master/data/interfaces/default/images/logo-circle.png' - Parameters - ---------- - subject_text : str - The text to use for the subject line of the message. - body_text : str - The text to use for the body of the notification. - notifier_id : int - Tautulli Notification Agent ID to send the notification to. - """ - payload = {'apikey': TAUTULLI_APIKEY, - 'cmd': 'notify', - 'notifier_id': notifier_id, - 'subject': subject_text, - 'body': body_text} +def utc_now_iso(): + """Get current time in ISO format""" + utcnow = datetime.utcnow() + + return utcnow.isoformat() + + +def hex_to_int(value): + """Convert hex value to integer""" try: - r = sess.post(TAUTULLI_URL.rstrip('/') + '/api/v2', params=payload) - response = r.json() - - if response['response']['result'] == 'success': - sys.stdout.write("Successfully sent Tautulli notification.\n") - else: - raise Exception(response['response']['message']) - except Exception as e: - sys.stderr.write( - "Tautulli API 'notify' request failed: {0}.\n".format(e)) - return None + return int(value, 16) + except (ValueError, TypeError): + return 0 -def get_activity(): - """Get the current activity on the PMS. - - Returns - ------- - list - The current active sessions on the Plex server. - """ - payload = {'apikey': TAUTULLI_APIKEY, - 'cmd': 'get_activity'} - - try: - req = sess.get(TAUTULLI_URL.rstrip('/') + '/api/v2', params=payload) - response = req.json() - - res_data = response['response']['data']['sessions'] - return res_data - - except Exception as e: - sys.stderr.write( - "Tautulli API 'get_activity' request failed: {0}.\n".format(e)) - return [] +def arg_decoding(arg): + """Decode args, encode UTF-8""" + return arg.decode(TAUTULLI_ENCODING).encode('UTF-8') -def get_user_session_ids(user_id): - """Get current session IDs for a specific user. +def debug_dump_vars(): + """Dump parameters for debug""" + print('Tautulli URL - ' + TAUTULLI_URL) + print('Tautulli Public URL - ' + TAUTULLI_PUBLIC_URL) + print('Verify SSL - ' + str(VERIFY_SSL)) + print('Tautulli API key - ' + TAUTULLI_APIKEY[-4:] + .rjust(len(TAUTULLI_APIKEY), "x")) + + +def get_all_streams(user_id=None): + """Get a list of all current streams. Parameters ---------- user_id : int The ID of the user to grab sessions for. - Returns ------- - list - The active session IDs for the specific user ID. - + objects + The of stream objects. """ - sessions = get_activity() - user_streams = [s['session_id'] - for s in sessions if s['user_id'] == user_id] - return user_streams + sessions = tautulli.get_activity()['sessions'] + + if user_id: + streams = [Stream(session=s) for s in sessions if s['user_id'] == user_id] + else: + streams = [Stream(session=s) for s in sessions] + + return streams -def terminate_session(session_id, message, notifier=None, username=None): - """Stop a streaming session. +def notify(opts, message, kill_type=None, stream=None): + """Decides which notifier type to use""" + if opts.notify and opts.richMessage: + rich_notify(opts.notify, opts.richMessage, opts.richColor, kill_type, + opts.serverName, opts.plexUrl, opts.posterUrl, message, stream) + elif opts.notify: + basic_notify(opts.notify, opts.sessionId, opts.username, message) + + +def rich_notify(notifier_id, rich_type, color=None, kill_type=None, server_name=None, + plex_url=None, poster_url=None, message=None, stream=None): + """Decides which rich notifier type to use. Set default values for empty variables Parameters ---------- - session_id : str - The session ID of the stream to terminate. + notifier_id : int + The ID of the user to grab sessions for. + rich_type : str + Contains 'discord' or 'slack'. + kill_type : str + The kill type used. + server_name : str + The name of the plex server. + plex_url : str + Plex media URL. + poster_url : str + The media poster URL. message : str - The message to display to the user when terminating a stream. - notifier : int - Notification agent ID to send a message to (the default is None). - username : str - The username for the terminated session (the default is None). + Message sent to the client. + stream : obj + Stream object. """ - payload = {'apikey': TAUTULLI_APIKEY, - 'cmd': 'terminate_session', - 'session_id': session_id, - 'message': message} + notification = Notification(notifier_id, SUBJECT_TEXT, BODY_TEXT, tautulli, stream) - try: - req = sess.post(TAUTULLI_URL.rstrip('/') + '/api/v2', params=payload) - response = req.json() + # Set a default server_name if none is provided + if server_name is None: + server_name = 'Plex Server' - if response['response']['result'] == 'success': - sys.stdout.write( - "Successfully killed Plex session: {0}.\n".format(session_id)) - if notifier: - if username: - body = BODY_TEXT_USER.format(user=username, - message=message) - else: - body = BODY_TEXT.format(id=session_id, message=message) - send_notification(SUBJECT_TEXT, body, notifier) - else: - raise Exception(response['response']['message']) - except Exception as e: - sys.stderr.write( - "Tautulli API 'terminate_session' request failed: {0}.".format(e)) - return None + # Set a defult color if none is provided + if color is None: + color = '#E5A00D' + + # Set a defult plexUrl if none is provided + if plex_url is None: + plex_url = 'https://app.plex.tv' + + # Set a defult posterUrl if none is provided + if poster_url is None: + poster_url = TAUTULLI_ICON + + # Set a default message if none is provided + if message is None: + message = 'The server owner has ended the stream.' + + if kill_type == 'Stream': + title = "Killed {}'s stream.".format(stream.friendly_name) + footer = '{} | Kill {}'.format(server_name, kill_type) + + elif kill_type == 'Paused': + title = "Killed {}'s paused stream.".format(stream.friendly_name) + footer = '{} | Kill {}'.format(server_name, kill_type) + + elif kill_type == 'All Streams': + title = "Killed {}'s stream.".format(stream.friendly_name) + footer = '{} | Kill {}'.format(server_name, kill_type) + poster_url = TAUTULLI_ICON + plex_url = 'https://app.plex.tv' + + if rich_type == 'discord': + color = hex_to_int(color.lstrip('#')) + notification.send_discord(title, color, poster_url, plex_url, message, footer) + + elif rich_type == 'slack': + notification.send_slack(title, color, poster_url, plex_url, message, footer) -def terminate_long_pause(session_id, message, limit, interval, notify=None, username=None): - """Kills the session if it is paused for longer than seconds. +def basic_notify(notifier_id, session_id, username=None, message=None): + """Basic notifier""" + notification = Notification(notifier_id, SUBJECT_TEXT, BODY_TEXT, tautulli, stream) - Parameters - ---------- - session_id : str - The session id of the session to monitor. - message : str - The message to use if the stream is terminated. - limit : int - The number of seconds the session is allowed to remain paused before it - is terminated. - interval : int - The amount of time to wait between checks of the session state. - notify : int - Tautulli Notification Agent ID to send a notification to on killing a - stream. - """ - start = datetime.now() - checked_time = 0 - # Continue checking 2 intervals past the allowed limit in order to - # account for system variances. - check_limit = limit + (interval * 2) + if username: + body = BODY_TEXT_USER.format(user=username, + message=message) + else: + body = BODY_TEXT.format(id=session_id, message=message) + notification.send(SUBJECT_TEXT, body) - while checked_time < check_limit: - sessions = get_activity() - found_session = False - for session in sessions: - if session['session_id'] == session_id: - found_session = True - state = session['state'] - now = datetime.now() - checked_time = (now - start).total_seconds() +class Tautulli: + def __init__(self, url, apikey, verify_ssl=False, debug=None): + self.url = url + self.apikey = apikey + self.debug = debug - if state == 'paused': - if checked_time >= limit: - terminate_session(session_id, message, notify, username) - return - else: - sleep(interval) - elif state == 'playing' or state == 'buffering': - sys.stdout.write( - "Session '{}' has resumed, ".format(session_id) + - "stopping monitoring.\n") - return - if not found_session: - sys.stdout.write( - "Session '{}' is no longer active ".format(session_id) + - "on the server, stopping monitoring.\n") + self.session = requests.Session() + self.adapters = requests.adapters.HTTPAdapter(max_retries=3, + pool_connections=1, + pool_maxsize=1, + pool_block=True) + self.session.mount('http://', self.adapters) + self.session.mount('https://', self.adapters) + + # Ignore verifying the SSL certificate + if verify_ssl is False: + self.session.verify = False + # Disable the warning that the request is insecure, we know that... + import urllib3 + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + def _call_api(self, cmd, payload, method='GET', debug=None): + payload['cmd'] = cmd + payload['apikey'] = self.apikey + + try: + response = self.session.request(method, self.url + '/api/v2', params=payload) + except: + print("Tautulli request failed for cmd '{}'. Invalid Tautulli URL?".format(cmd)) + if self.debug: + traceback.print_exc() return + try: + response_json = response.json() + except ValueError: + print("Failed to parse json response for Tautulli API cmd '{}'".format(cmd)) + return -def arg_decoding(arg): - return arg.decode(TAUTULLI_ENCODING).encode('UTF-8') + if response_json['response']['result'] == 'success': + if self.debug: + print("Successfully called Tautulli API cmd '{}'".format(cmd)) + return response_json['response']['data'] + else: + error_msg = response_json['response']['message'] + print("Tautulli API cmd '{}' failed: {}".format(cmd, error_msg)) + return + + def get_activity(self, session_key=None, session_id=None): + """Call Tautulli's get_activity api endpoint""" + payload = {} + + if session_key: + payload['session_key'] = session_key + elif session_id: + payload['session_id'] = session_id + + return self._call_api('get_activity', payload) + + def notify(self, notifier_id, subject, body): + """Call Tautulli's notify api endpoint""" + payload = {'notifier_id': notifier_id, + 'subject': subject, + 'body': body} + + return self._call_api('notify', payload) + + def terminate_session(self, session_key=None, session_id=None, message=''): + """Call Tautulli's terminate_session api endpoint""" + payload = {} + + if session_key: + payload['session_key'] = session_key + elif session_id: + payload['session_id'] = session_id + + if message: + payload['message'] = message + + return self._call_api('terminate_session', payload) + + +class Stream: + def __init__(self, session_id=None, user_id=None, username=None, tautulli=None, session=None): + self.session_id = session_id + self.user_id = user_id + self.username = username + self.session_exists = False + + self.tautulli = tautulli + + if session is not None: + self._set_stream_attributes(session) + + def _set_stream_attributes(self, session): + for k, v in session.items(): + setattr(self, k, v) + + def get_all_stream_info(self): + """Get all stream info from Tautulli.""" + session = self.tautulli.get_activity(session_id=self.session_id) + if session: + self._set_stream_attributes(session) + self.session_exists = True + else: + self.session_exists = False + + def terminate(self, message=''): + """Calls Tautulli to terminate the session. + + Parameters + ---------- + message : str + The message to use if the stream is terminated. + """ + self.tautulli.terminate_session(session_id=self.session_id, message=message) + + def terminate_long_pause(self, message, limit, interval): + """Kills the session if it is paused for longer than seconds. + + Parameters + ---------- + message : str + The message to use if the stream is terminated. + limit : int + The number of seconds the session is allowed to remain paused before it + is terminated. + interval : int + The amount of time to wait between checks of the session state. + """ + start = datetime.now() + checked_time = 0 + # Continue checking 2 intervals past the allowed limit in order to + # account for system variances. + check_limit = limit + (interval * 2) + + while checked_time < check_limit: + self.get_all_stream_info() + + if self.session_exists is False: + sys.stdout.write( + "Session '{}' from user '{}' is no longer active " + .format(self.session_id, self.username) + + "on the server, stopping monitoring.\n") + return False + + now = datetime.now() + checked_time = (now - start).total_seconds() + + if self.state == 'paused': + if checked_time >= limit: + self.terminate(message) + sys.stdout.write( + "Session '{}' from user '{}' has been killed.\n" + .format(self.session_id, self.username)) + return True + else: + time.sleep(interval) + + elif self.state == 'playing' or self.state == 'buffering': + sys.stdout.write( + "Session '{}' from user '{}' has been resumed, " + .format(self.session_id, self.username) + + "stopping monitoring.\n") + return False + + +class Notification: + def __init__(self, notifier_id, subject, body, tautulli, stream): + self.notifier_id = notifier_id + self.subject = subject + self.body = body + + self.tautulli = tautulli + self.stream = stream + + def send(self, subject='', body=''): + """Send to Tautulli notifier. + + Parameters + ---------- + subject : str + Subject of the message. + body : str + Body of the message. + """ + subject = subject or self.subject + body = body or self.body + self.tautulli.notify(notifier_id=self.notifier_id, subject=subject, body=body) + + def send_discord(self, title, color, poster_url, plex_url, message, footer): + """Build the Discord message. + + Parameters + ---------- + title : str + The title of the message. + color : int + The color of the message + poster_url : str + The media poster URL. + plex_url : str + Plex media URL. + message : str + Message sent to the player. + footer : str + Footer of the message. + """ + discord_message = { + "embeds": [ + { + "author": { + "icon_url": TAUTULLI_ICON, + "name": "Tautulli", + "url": TAUTULLI_LINK.rstrip('/') + }, + "color": color, + "fields": [ + { + "inline": True, + "name": "User", + "value": self.stream.friendly_name + }, + { + "inline": True, + "name": "Session Key", + "value": self.stream.session_key + }, + { + "inline": True, + "name": "Watching", + "value": self.stream.full_title + }, + { + "inline": False, + "name": "Message Sent", + "value": message + } + ], + "thumbnail": { + "url": poster_url + }, + "title": title, + "timestamp": utc_now_iso(), + "url": plex_url, + "footer": { + "text": footer + } + + } + + ], + } + + discord_message = json.dumps(discord_message, sort_keys=True, + separators=(',', ': ')) + self.send(body=discord_message) + + def send_slack(self, title, color, poster_url, plex_url, message, footer): + """Build the Slack message. + + Parameters + ---------- + title : str + The title of the message. + color : int + The color of the message + poster_url : str + The media poster URL. + plex_url : str + Plex media URL. + message : str + Message sent to the player. + footer : str + Footer of the message. + """ + slack_message = { + "attachments": [ + { + "title": title, + "title_link": plex_url, + "author_icon": TAUTULLI_ICON, + "author_name": "Tautulli", + "author_link": TAUTULLI_LINK.rstrip('/'), + "color": color, + "fields": [ + { + "title": "User", + "value": self.stream.friendly_name, + "short": True + }, + { + "title": "Session Key", + "value": self.stream.session_key, + "short": True + }, + { + "title": "Watching", + "value": self.stream.full_title, + "short": True + }, + { + "title": "Message Sent", + "value": message, + "short": False + } + ], + "thumb_url": poster_url, + "footer": footer, + "ts": time.time() + } + + ], + } + + slack_message = json.dumps(slack_message, sort_keys=True, + separators=(',', ': ')) + self.send(body=slack_message) if __name__ == "__main__": @@ -258,14 +551,26 @@ if __name__ == "__main__": parser.add_argument('--sessionId', help='The unique identifier for the stream.') parser.add_argument('--notify', type=int, - help='Notification Agent ID number to Agent to send ' + - 'notification.') + help='Notification Agent ID number to Agent to send ' + + 'notification.') parser.add_argument('--limit', type=int, default=(20 * 60), # 20 minutes help='The time session is allowed to remain paused.') parser.add_argument('--interval', type=int, default=30, help='The seconds between paused session checks.') parser.add_argument('--killMessage', nargs='+', type=arg_decoding, help='Message to send to user whose stream is killed.') + parser.add_argument('--richMessage', type=arg_decoding, choices=RICH_TYPE, + help='Rich message type selector.\nChoices: (%(choices)s)') + parser.add_argument('--serverName', type=arg_decoding, + help='Plex Server Name') + parser.add_argument('--plexUrl', type=arg_decoding, + help='URL to plex media') + parser.add_argument('--posterUrl', type=arg_decoding, + help='Poster URL of the media') + parser.add_argument('--richColor', type=arg_decoding, + help='Color of the rich message') + parser.add_argument("--debug", action='store_true', + help='Enable debug messages.') opts = parser.parse_args() @@ -273,17 +578,39 @@ if __name__ == "__main__": sys.stderr.write("No sessionId provided! Is this synced content?\n") sys.exit(1) + if opts.debug: + # Import traceback to get more deatiled information + import traceback + # Dump the ENVs passed from tatutulli + debug_dump_vars() + + # Create a Tautulli instance + tautulli = Tautulli(TAUTULLI_URL.rstrip('/'), TAUTULLI_APIKEY, VERIFY_SSL, opts.debug) + + # Create initial Stream object with basic info + stream = Stream(opts.sessionId, opts.userId, opts.username, tautulli) + + # Only pull all stream info if using richMessage + if opts.notify and opts.richMessage: + stream.get_all_stream_info() + + # Set a defult message if none is provided if opts.killMessage: message = ' '.join(opts.killMessage) else: - message = '' + message = 'The server owner has ended the stream.' if opts.jbop == 'stream': - terminate_session(opts.sessionId, message, opts.notify, opts.username) + stream.terminate(message) + notify(opts, message, 'Stream', stream) + elif opts.jbop == 'allStreams': - streams = get_user_session_ids(opts.userId) - for session_id in streams: - terminate_session(session_id, message, opts.notify, opts.username) + streams = get_all_streams(opts.userId) + for stream in streams: + tautulli.terminate_session(session_id=stream.session_id, message=message) + notify(opts, message, 'All Streams', stream) + elif opts.jbop == 'paused': - terminate_long_pause(opts.sessionId, message, opts.limit, - opts.interval, opts.notify, opts.username) + killed_stream = stream.terminate_long_pause(message, opts.limit, opts.interval) + if killed_stream: + notify(opts, message, 'Paused', stream)