Rewrite Kill_stream.py to add rich message functionality

This commit is contained in:
samwiseg00 2018-09-08 20:31:06 -04:00
parent 286c65f9a2
commit 713f6fbe56

View File

@ -1,7 +1,6 @@
""" """
Description: Use conditions to kill a stream Description: Use conditions to kill a stream
Author: Blacktwin, Arcanemagus, samwiseg00 Author: Blacktwin, Arcanemagus, Samwiseg00, JonnyWong16
Requires: requests
Adding the script to Tautulli: Adding the script to Tautulli:
Taultulli > Settings > Notification Agents > Add a new notification agent > Taultulli > Settings > Notification Agents > Add a new notification agent >
@ -35,215 +34,509 @@ Taultulli > Settings > Notification Agents > New Script > Script Arguments:
Arguments: --jbop SELECTOR --userId {user_id} --username {username} Arguments: --jbop SELECTOR --userId {user_id} --username {username}
--sessionId {session_id} --notify notifierID --sessionId {session_id} --notify notifierID
--interval 30 --limit 1200 --interval 30 --limit 1200
--richMessage RICH_TYPE --serverName {server_name}
--plexUrl {plex_url} --posterUrl {poster_url}
--richColor '#E5A00D'
--killMessage 'Your message here.' --killMessage 'Your message here.'
Save Save
Close Close
""" """
import requests
import argparse
import sys
import os import os
from time import sleep import sys
import argparse
import json
import time
from datetime import datetime from datetime import datetime
import requests
TAUTULLI_URL = '' TAUTULLI_URL = ''
TAUTULLI_APIKEY = '' TAUTULLI_APIKEY = ''
TAUTULLI_PUBLIC_URL = ''
TAUTULLI_URL = os.getenv('TAUTULLI_URL', TAUTULLI_URL) TAUTULLI_URL = os.getenv('TAUTULLI_URL', TAUTULLI_URL)
TAUTULLI_PUBLIC_URL = os.getenv('TAUTULLI_PUBLIC_URL', TAUTULLI_PUBLIC_URL)
TAUTULLI_APIKEY = os.getenv('TAUTULLI_APIKEY', TAUTULLI_APIKEY) TAUTULLI_APIKEY = os.getenv('TAUTULLI_APIKEY', TAUTULLI_APIKEY)
TAUTULLI_ENCODING = os.getenv('TAUTULLI_ENCODING', 'UTF-8') TAUTULLI_ENCODING = os.getenv('TAUTULLI_ENCODING', 'UTF-8')
VERIFY_SSL = False
if TAUTULLI_PUBLIC_URL != '/':
# Check to see if there is a public URL set in Tautulli
TAUTULLI_LINK = TAUTULLI_PUBLIC_URL
else:
TAUTULLI_LINK = TAUTULLI_URL
SUBJECT_TEXT = "Tautulli has killed a stream." SUBJECT_TEXT = "Tautulli has killed a stream."
BODY_TEXT = "Killed session ID '{id}'. Reason: {message}" BODY_TEXT = "Killed session ID '{id}'. Reason: {message}"
BODY_TEXT_USER = "Killed {user}'s stream. Reason: {message}." BODY_TEXT_USER = "Killed {user}'s stream. Reason: {message}."
sess = requests.Session()
# Ignore verifying the SSL certificate
sess.verify = False # '/path/to/certfile'
# If verify is set to a path to a directory,
# the directory must have been processed using the c_rehash utility supplied
# with OpenSSL.
if sess.verify is False:
# Disable the warning that the request is insecure, we know that...
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
SELECTOR = ['stream', 'allStreams', 'paused'] SELECTOR = ['stream', 'allStreams', 'paused']
RICH_TYPE = ['discord', 'slack']
def send_notification(subject_text, body_text, notifier_id): TAUTULLI_ICON = 'https://github.com/Tautulli/Tautulli/raw/master/data/interfaces/default/images/logo-circle.png'
"""Send a notification through Tautulli
Parameters
----------
subject_text : str
The text to use for the subject line of the message.
body_text : str
The text to use for the body of the notification.
notifier_id : int
Tautulli Notification Agent ID to send the notification to.
"""
payload = {'apikey': TAUTULLI_APIKEY,
'cmd': 'notify',
'notifier_id': notifier_id,
'subject': subject_text,
'body': body_text}
def utc_now_iso():
"""Get current time in ISO format"""
utcnow = datetime.utcnow()
return utcnow.isoformat()
def hex_to_int(value):
"""Convert hex value to integer"""
try: try:
r = sess.post(TAUTULLI_URL.rstrip('/') + '/api/v2', params=payload) return int(value, 16)
response = r.json() except (ValueError, TypeError):
return 0
if response['response']['result'] == 'success':
sys.stdout.write("Successfully sent Tautulli notification.\n")
else:
raise Exception(response['response']['message'])
except Exception as e:
sys.stderr.write(
"Tautulli API 'notify' request failed: {0}.\n".format(e))
return None
def get_activity(): def arg_decoding(arg):
"""Get the current activity on the PMS. """Decode args, encode UTF-8"""
return arg.decode(TAUTULLI_ENCODING).encode('UTF-8')
Returns
-------
list
The current active sessions on the Plex server.
"""
payload = {'apikey': TAUTULLI_APIKEY,
'cmd': 'get_activity'}
try:
req = sess.get(TAUTULLI_URL.rstrip('/') + '/api/v2', params=payload)
response = req.json()
res_data = response['response']['data']['sessions']
return res_data
except Exception as e:
sys.stderr.write(
"Tautulli API 'get_activity' request failed: {0}.\n".format(e))
return []
def get_user_session_ids(user_id): def debug_dump_vars():
"""Get current session IDs for a specific user. """Dump parameters for debug"""
print('Tautulli URL - ' + TAUTULLI_URL)
print('Tautulli Public URL - ' + TAUTULLI_PUBLIC_URL)
print('Verify SSL - ' + str(VERIFY_SSL))
print('Tautulli API key - ' + TAUTULLI_APIKEY[-4:]
.rjust(len(TAUTULLI_APIKEY), "x"))
def get_all_streams(user_id=None):
"""Get a list of all current streams.
Parameters Parameters
---------- ----------
user_id : int user_id : int
The ID of the user to grab sessions for. The ID of the user to grab sessions for.
Returns Returns
------- -------
list objects
The active session IDs for the specific user ID. The of stream objects.
""" """
sessions = get_activity() sessions = tautulli.get_activity()['sessions']
user_streams = [s['session_id']
for s in sessions if s['user_id'] == user_id] if user_id:
return user_streams streams = [Stream(session=s) for s in sessions if s['user_id'] == user_id]
else:
streams = [Stream(session=s) for s in sessions]
return streams
def terminate_session(session_id, message, notifier=None, username=None): def notify(opts, message, kill_type=None, stream=None):
"""Stop a streaming session. """Decides which notifier type to use"""
if opts.notify and opts.richMessage:
rich_notify(opts.notify, opts.richMessage, opts.richColor, kill_type,
opts.serverName, opts.plexUrl, opts.posterUrl, message, stream)
elif opts.notify:
basic_notify(opts.notify, opts.sessionId, opts.username, message)
def rich_notify(notifier_id, rich_type, color=None, kill_type=None, server_name=None,
plex_url=None, poster_url=None, message=None, stream=None):
"""Decides which rich notifier type to use. Set default values for empty variables
Parameters Parameters
---------- ----------
session_id : str notifier_id : int
The session ID of the stream to terminate. The ID of the user to grab sessions for.
rich_type : str
Contains 'discord' or 'slack'.
kill_type : str
The kill type used.
server_name : str
The name of the plex server.
plex_url : str
Plex media URL.
poster_url : str
The media poster URL.
message : str message : str
The message to display to the user when terminating a stream. Message sent to the client.
notifier : int stream : obj
Notification agent ID to send a message to (the default is None). Stream object.
username : str
The username for the terminated session (the default is None).
""" """
payload = {'apikey': TAUTULLI_APIKEY, notification = Notification(notifier_id, SUBJECT_TEXT, BODY_TEXT, tautulli, stream)
'cmd': 'terminate_session',
'session_id': session_id,
'message': message}
try: # Set a default server_name if none is provided
req = sess.post(TAUTULLI_URL.rstrip('/') + '/api/v2', params=payload) if server_name is None:
response = req.json() server_name = 'Plex Server'
if response['response']['result'] == 'success': # Set a defult color if none is provided
sys.stdout.write( if color is None:
"Successfully killed Plex session: {0}.\n".format(session_id)) color = '#E5A00D'
if notifier:
if username: # Set a defult plexUrl if none is provided
body = BODY_TEXT_USER.format(user=username, if plex_url is None:
message=message) plex_url = 'https://app.plex.tv'
else:
body = BODY_TEXT.format(id=session_id, message=message) # Set a defult posterUrl if none is provided
send_notification(SUBJECT_TEXT, body, notifier) if poster_url is None:
else: poster_url = TAUTULLI_ICON
raise Exception(response['response']['message'])
except Exception as e: # Set a default message if none is provided
sys.stderr.write( if message is None:
"Tautulli API 'terminate_session' request failed: {0}.".format(e)) message = 'The server owner has ended the stream.'
return None
if kill_type == 'Stream':
title = "Killed {}'s stream.".format(stream.friendly_name)
footer = '{} | Kill {}'.format(server_name, kill_type)
elif kill_type == 'Paused':
title = "Killed {}'s paused stream.".format(stream.friendly_name)
footer = '{} | Kill {}'.format(server_name, kill_type)
elif kill_type == 'All Streams':
title = "Killed {}'s stream.".format(stream.friendly_name)
footer = '{} | Kill {}'.format(server_name, kill_type)
poster_url = TAUTULLI_ICON
plex_url = 'https://app.plex.tv'
if rich_type == 'discord':
color = hex_to_int(color.lstrip('#'))
notification.send_discord(title, color, poster_url, plex_url, message, footer)
elif rich_type == 'slack':
notification.send_slack(title, color, poster_url, plex_url, message, footer)
def terminate_long_pause(session_id, message, limit, interval, notify=None, username=None): def basic_notify(notifier_id, session_id, username=None, message=None):
"""Kills the session if it is paused for longer than <limit> seconds. """Basic notifier"""
notification = Notification(notifier_id, SUBJECT_TEXT, BODY_TEXT, tautulli, stream)
Parameters if username:
---------- body = BODY_TEXT_USER.format(user=username,
session_id : str message=message)
The session id of the session to monitor. else:
message : str body = BODY_TEXT.format(id=session_id, message=message)
The message to use if the stream is terminated. notification.send(SUBJECT_TEXT, body)
limit : int
The number of seconds the session is allowed to remain paused before it
is terminated.
interval : int
The amount of time to wait between checks of the session state.
notify : int
Tautulli Notification Agent ID to send a notification to on killing a
stream.
"""
start = datetime.now()
checked_time = 0
# Continue checking 2 intervals past the allowed limit in order to
# account for system variances.
check_limit = limit + (interval * 2)
while checked_time < check_limit:
sessions = get_activity()
found_session = False
for session in sessions: class Tautulli:
if session['session_id'] == session_id: def __init__(self, url, apikey, verify_ssl=False, debug=None):
found_session = True self.url = url
state = session['state'] self.apikey = apikey
now = datetime.now() self.debug = debug
checked_time = (now - start).total_seconds()
if state == 'paused': self.session = requests.Session()
if checked_time >= limit: self.adapters = requests.adapters.HTTPAdapter(max_retries=3,
terminate_session(session_id, message, notify, username) pool_connections=1,
return pool_maxsize=1,
else: pool_block=True)
sleep(interval) self.session.mount('http://', self.adapters)
elif state == 'playing' or state == 'buffering': self.session.mount('https://', self.adapters)
sys.stdout.write(
"Session '{}' has resumed, ".format(session_id) + # Ignore verifying the SSL certificate
"stopping monitoring.\n") if verify_ssl is False:
return self.session.verify = False
if not found_session: # Disable the warning that the request is insecure, we know that...
sys.stdout.write( import urllib3
"Session '{}' is no longer active ".format(session_id) + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
"on the server, stopping monitoring.\n")
def _call_api(self, cmd, payload, method='GET', debug=None):
payload['cmd'] = cmd
payload['apikey'] = self.apikey
try:
response = self.session.request(method, self.url + '/api/v2', params=payload)
except:
print("Tautulli request failed for cmd '{}'. Invalid Tautulli URL?".format(cmd))
if self.debug:
traceback.print_exc()
return return
try:
response_json = response.json()
except ValueError:
print("Failed to parse json response for Tautulli API cmd '{}'".format(cmd))
return
def arg_decoding(arg): if response_json['response']['result'] == 'success':
return arg.decode(TAUTULLI_ENCODING).encode('UTF-8') if self.debug:
print("Successfully called Tautulli API cmd '{}'".format(cmd))
return response_json['response']['data']
else:
error_msg = response_json['response']['message']
print("Tautulli API cmd '{}' failed: {}".format(cmd, error_msg))
return
def get_activity(self, session_key=None, session_id=None):
"""Call Tautulli's get_activity api endpoint"""
payload = {}
if session_key:
payload['session_key'] = session_key
elif session_id:
payload['session_id'] = session_id
return self._call_api('get_activity', payload)
def notify(self, notifier_id, subject, body):
"""Call Tautulli's notify api endpoint"""
payload = {'notifier_id': notifier_id,
'subject': subject,
'body': body}
return self._call_api('notify', payload)
def terminate_session(self, session_key=None, session_id=None, message=''):
"""Call Tautulli's terminate_session api endpoint"""
payload = {}
if session_key:
payload['session_key'] = session_key
elif session_id:
payload['session_id'] = session_id
if message:
payload['message'] = message
return self._call_api('terminate_session', payload)
class Stream:
def __init__(self, session_id=None, user_id=None, username=None, tautulli=None, session=None):
self.session_id = session_id
self.user_id = user_id
self.username = username
self.session_exists = False
self.tautulli = tautulli
if session is not None:
self._set_stream_attributes(session)
def _set_stream_attributes(self, session):
for k, v in session.items():
setattr(self, k, v)
def get_all_stream_info(self):
"""Get all stream info from Tautulli."""
session = self.tautulli.get_activity(session_id=self.session_id)
if session:
self._set_stream_attributes(session)
self.session_exists = True
else:
self.session_exists = False
def terminate(self, message=''):
"""Calls Tautulli to terminate the session.
Parameters
----------
message : str
The message to use if the stream is terminated.
"""
self.tautulli.terminate_session(session_id=self.session_id, message=message)
def terminate_long_pause(self, message, limit, interval):
"""Kills the session if it is paused for longer than <limit> seconds.
Parameters
----------
message : str
The message to use if the stream is terminated.
limit : int
The number of seconds the session is allowed to remain paused before it
is terminated.
interval : int
The amount of time to wait between checks of the session state.
"""
start = datetime.now()
checked_time = 0
# Continue checking 2 intervals past the allowed limit in order to
# account for system variances.
check_limit = limit + (interval * 2)
while checked_time < check_limit:
self.get_all_stream_info()
if self.session_exists is False:
sys.stdout.write(
"Session '{}' from user '{}' is no longer active "
.format(self.session_id, self.username)
+ "on the server, stopping monitoring.\n")
return False
now = datetime.now()
checked_time = (now - start).total_seconds()
if self.state == 'paused':
if checked_time >= limit:
self.terminate(message)
sys.stdout.write(
"Session '{}' from user '{}' has been killed.\n"
.format(self.session_id, self.username))
return True
else:
time.sleep(interval)
elif self.state == 'playing' or self.state == 'buffering':
sys.stdout.write(
"Session '{}' from user '{}' has been resumed, "
.format(self.session_id, self.username)
+ "stopping monitoring.\n")
return False
class Notification:
def __init__(self, notifier_id, subject, body, tautulli, stream):
self.notifier_id = notifier_id
self.subject = subject
self.body = body
self.tautulli = tautulli
self.stream = stream
def send(self, subject='', body=''):
"""Send to Tautulli notifier.
Parameters
----------
subject : str
Subject of the message.
body : str
Body of the message.
"""
subject = subject or self.subject
body = body or self.body
self.tautulli.notify(notifier_id=self.notifier_id, subject=subject, body=body)
def send_discord(self, title, color, poster_url, plex_url, message, footer):
"""Build the Discord message.
Parameters
----------
title : str
The title of the message.
color : int
The color of the message
poster_url : str
The media poster URL.
plex_url : str
Plex media URL.
message : str
Message sent to the player.
footer : str
Footer of the message.
"""
discord_message = {
"embeds": [
{
"author": {
"icon_url": TAUTULLI_ICON,
"name": "Tautulli",
"url": TAUTULLI_LINK.rstrip('/')
},
"color": color,
"fields": [
{
"inline": True,
"name": "User",
"value": self.stream.friendly_name
},
{
"inline": True,
"name": "Session Key",
"value": self.stream.session_key
},
{
"inline": True,
"name": "Watching",
"value": self.stream.full_title
},
{
"inline": False,
"name": "Message Sent",
"value": message
}
],
"thumbnail": {
"url": poster_url
},
"title": title,
"timestamp": utc_now_iso(),
"url": plex_url,
"footer": {
"text": footer
}
}
],
}
discord_message = json.dumps(discord_message, sort_keys=True,
separators=(',', ': '))
self.send(body=discord_message)
def send_slack(self, title, color, poster_url, plex_url, message, footer):
"""Build the Slack message.
Parameters
----------
title : str
The title of the message.
color : int
The color of the message
poster_url : str
The media poster URL.
plex_url : str
Plex media URL.
message : str
Message sent to the player.
footer : str
Footer of the message.
"""
slack_message = {
"attachments": [
{
"title": title,
"title_link": plex_url,
"author_icon": TAUTULLI_ICON,
"author_name": "Tautulli",
"author_link": TAUTULLI_LINK.rstrip('/'),
"color": color,
"fields": [
{
"title": "User",
"value": self.stream.friendly_name,
"short": True
},
{
"title": "Session Key",
"value": self.stream.session_key,
"short": True
},
{
"title": "Watching",
"value": self.stream.full_title,
"short": True
},
{
"title": "Message Sent",
"value": message,
"short": False
}
],
"thumb_url": poster_url,
"footer": footer,
"ts": time.time()
}
],
}
slack_message = json.dumps(slack_message, sort_keys=True,
separators=(',', ': '))
self.send(body=slack_message)
if __name__ == "__main__": if __name__ == "__main__":
@ -258,14 +551,26 @@ if __name__ == "__main__":
parser.add_argument('--sessionId', parser.add_argument('--sessionId',
help='The unique identifier for the stream.') help='The unique identifier for the stream.')
parser.add_argument('--notify', type=int, parser.add_argument('--notify', type=int,
help='Notification Agent ID number to Agent to send ' + help='Notification Agent ID number to Agent to send '
'notification.') + 'notification.')
parser.add_argument('--limit', type=int, default=(20 * 60), # 20 minutes parser.add_argument('--limit', type=int, default=(20 * 60), # 20 minutes
help='The time session is allowed to remain paused.') help='The time session is allowed to remain paused.')
parser.add_argument('--interval', type=int, default=30, parser.add_argument('--interval', type=int, default=30,
help='The seconds between paused session checks.') help='The seconds between paused session checks.')
parser.add_argument('--killMessage', nargs='+', type=arg_decoding, parser.add_argument('--killMessage', nargs='+', type=arg_decoding,
help='Message to send to user whose stream is killed.') help='Message to send to user whose stream is killed.')
parser.add_argument('--richMessage', type=arg_decoding, choices=RICH_TYPE,
help='Rich message type selector.\nChoices: (%(choices)s)')
parser.add_argument('--serverName', type=arg_decoding,
help='Plex Server Name')
parser.add_argument('--plexUrl', type=arg_decoding,
help='URL to plex media')
parser.add_argument('--posterUrl', type=arg_decoding,
help='Poster URL of the media')
parser.add_argument('--richColor', type=arg_decoding,
help='Color of the rich message')
parser.add_argument("--debug", action='store_true',
help='Enable debug messages.')
opts = parser.parse_args() opts = parser.parse_args()
@ -273,17 +578,39 @@ if __name__ == "__main__":
sys.stderr.write("No sessionId provided! Is this synced content?\n") sys.stderr.write("No sessionId provided! Is this synced content?\n")
sys.exit(1) sys.exit(1)
if opts.debug:
# Import traceback to get more deatiled information
import traceback
# Dump the ENVs passed from tatutulli
debug_dump_vars()
# Create a Tautulli instance
tautulli = Tautulli(TAUTULLI_URL.rstrip('/'), TAUTULLI_APIKEY, VERIFY_SSL, opts.debug)
# Create initial Stream object with basic info
stream = Stream(opts.sessionId, opts.userId, opts.username, tautulli)
# Only pull all stream info if using richMessage
if opts.notify and opts.richMessage:
stream.get_all_stream_info()
# Set a defult message if none is provided
if opts.killMessage: if opts.killMessage:
message = ' '.join(opts.killMessage) message = ' '.join(opts.killMessage)
else: else:
message = '' message = 'The server owner has ended the stream.'
if opts.jbop == 'stream': if opts.jbop == 'stream':
terminate_session(opts.sessionId, message, opts.notify, opts.username) stream.terminate(message)
notify(opts, message, 'Stream', stream)
elif opts.jbop == 'allStreams': elif opts.jbop == 'allStreams':
streams = get_user_session_ids(opts.userId) streams = get_all_streams(opts.userId)
for session_id in streams: for stream in streams:
terminate_session(session_id, message, opts.notify, opts.username) tautulli.terminate_session(session_id=stream.session_id, message=message)
notify(opts, message, 'All Streams', stream)
elif opts.jbop == 'paused': elif opts.jbop == 'paused':
terminate_long_pause(opts.sessionId, message, opts.limit, killed_stream = stream.terminate_long_pause(message, opts.limit, opts.interval)
opts.interval, opts.notify, opts.username) if killed_stream:
notify(opts, message, 'Paused', stream)