136 lines
4.9 KiB
Python
136 lines
4.9 KiB
Python
import logging
|
|
from typing import Dict, List, Optional, Tuple, Union
|
|
|
|
import requests
|
|
from requests.exceptions import ConnectionError, HTTPError
|
|
|
|
from zulip_bots.lib import BotHandler
|
|
|
|
# Leading words that get_command_query() recognizes as bot commands; any other
# first word is treated as part of the search query.
commands_list = ("list", "top", "help")
|
|
|
|
|
|
class YoutubeHandler:
    """Bot handler that replies to messages with YouTube search results."""

    def usage(self) -> str:
        """Return a short, human-readable description of the plugin."""
        return """
            This plugin will allow users to search
            for a given search term on Youtube.
            Use '@mention-bot help' to get more information on the bot usage.
        """

    # Help text sent for empty/"help" messages; also returned by
    # get_bot_response() for the "help" command and for empty queries.
    help_content = (
        "*Help for YouTube bot* :robot_face: : \n\n"
        "The bot responds to messages starting with @mention-bot.\n\n"
        "`@mention-bot <search terms>` will return top Youtube video for the given `<search term>`.\n"
        "`@mention-bot top <search terms>` also returns the top Youtube result.\n"
        "`@mention-bot list <search terms>` will return a list Youtube videos for the given <search term>.\n \n"
        "Example:\n"
        " * @mention-bot funny cats\n"
        " * @mention-bot list funny dogs"
    )

    def initialize(self, bot_handler: BotHandler) -> None:
        """Load the "youtube" config section and probe the API key.

        A single test search is issued with the configured key and region.
        If the API answers with an HTTP error whose first error reason is
        "keyInvalid", the bot is stopped through ``bot_handler.quit``; any
        other HTTP error is re-raised.  A connection failure is only logged
        as a warning.
        """
        self.config_info = bot_handler.get_config_info("youtube")
        # Check if API key is valid. If it is not valid, don't run the bot.
        try:
            search_youtube("test", self.config_info["key"], self.config_info["video_region"])
        except HTTPError as e:
            # Narrows the Optional response for type checkers before it is used.
            assert e.response is not None
            if e.response.json()["error"]["errors"][0]["reason"] == "keyInvalid":
                bot_handler.quit(
                    "Invalid key.Follow the instructions in doc.md for setting API key."
                )
            else:
                raise
        except ConnectionError:
            logging.warning("Bad connection")

    def handle_message(self, message: Dict[str, str], bot_handler: BotHandler) -> None:
        """Reply to *message* with the help text or with search results.

        Blank (empty or whitespace-only) content and the bare word "help"
        get the help text.  Anything else is split into a command and a
        query, and the response built from them is logged and sent back.
        """
        # strip() so that whitespace-only content is answered with the help
        # text instead of being handed on as a query with no words in it.
        if message["content"].strip() in ("", "help"):
            bot_handler.send_reply(message, self.help_content)
            return

        cmd, query = get_command_query(message)
        bot_response = get_bot_response(query, cmd, self.config_info)
        # Log the reply verbatim.  It embeds the user's query and video
        # titles, so running it through str.format() with no arguments would
        # raise as soon as it contained a literal "{" or "}" and the reply
        # would never be sent.
        logging.info("%s", bot_response)
        bot_handler.send_reply(message, bot_response)
|
|
|
|
|
|
def search_youtube(query: str, key: str, region: str, max_results: int = 1) -> List[List[str]]:
    """Query the YouTube Data API v3 search endpoint for videos.

    Args:
        query: Search terms, sent as the ``q`` parameter.
        key: API key, sent as the ``key`` parameter.
        region: Sent as the ``regionCode`` parameter.
        max_results: Sent as the ``maxResults`` parameter.

    Returns:
        One ``[title, video_id]`` pair for every item of the response whose
        id kind is ``"youtube#video"``, in response order; an empty list
        when the response carries no ``"items"``.

    Raises:
        ConnectionError: Logged with its traceback, then re-raised.
        HTTPError: Raised by ``raise_for_status()`` for an error status.
    """
    endpoint = "https://www.googleapis.com/youtube/v3/search"
    request_params: Dict[str, Union[str, int]] = {
        "part": "id,snippet",
        "maxResults": max_results,
        "key": key,
        "q": query,
        "alt": "json",
        "type": "video",
        "regionCode": region,
    }

    try:
        response = requests.get(endpoint, params=request_params)
    except ConnectionError:  # Usually triggered by bad connection.
        logging.exception("Bad connection")
        raise
    response.raise_for_status()

    # Keep only video items and reduce each one to its title and video id.
    videos: List[List[str]] = []
    for item in response.json().get("items", []):
        item_id = item["id"]
        if item_id["kind"] != "youtube#video":
            continue
        videos.append([item["snippet"]["title"], item_id["videoId"]])
    return videos
|
|
|
|
|
|
def get_command_query(message: Dict[str, str]) -> Tuple[Optional[str], str]:
    """Split a message into an optional leading command and a search query.

    The first whitespace-separated word of ``message["content"]`` is compared
    case-insensitively against ``commands_list``.

    Args:
        message: Mapping whose ``"content"`` entry holds the message text.

    Returns:
        ``(command, query)`` when the first word is a known command: the
        lower-cased command and the rest of the text with the whitespace
        after the command removed (``""`` when nothing follows).
        ``(None, content)`` with the content untouched when the first word is
        not a command.
        ``(None, "")`` when the content is empty or whitespace-only.
    """
    content = message["content"]
    # maxsplit=1 separates the first word from the remainder without touching
    # the remainder's case or inner spacing, and it ignores leading
    # whitespace, so the query no longer depends on a fixed-offset slice.
    words = content.split(maxsplit=1)
    if not words:
        # Blank content has no first word; report an empty query rather than
        # failing on words[0].
        return None, ""

    command = words[0].lower()
    if command in commands_list:
        query = words[1] if len(words) > 1 else ""
        return command, query
    return None, content
|
|
|
|
|
|
def get_bot_response(
    query: Optional[str], command: Optional[str], config_info: Dict[str, str]
) -> str:
    """Build the reply text for a search request.

    Args:
        query: Search terms; ``None`` or ``""`` yields the help text.
        command: ``None`` or ``"top"`` searches for a single video,
            ``"list"`` searches for up to ``number_of_results`` videos,
            ``"help"`` yields the help text.  Any other value performs no
            search and therefore produces the "couldn't find" reply.
        config_info: Must provide ``"key"``, ``"number_of_results"`` and
            ``"video_region"``.

    Returns:
        The help text, an error notice when the search raised a connection
        or HTTP error, a "couldn't find" notice for no results, a one-line
        answer for a single result, or a bulleted list for several.
    """
    api_key = config_info["key"]
    result_limit = int(config_info["number_of_results"])
    region_code = config_info["video_region"]

    if query is None or query == "":
        return YoutubeHandler.help_content

    videos: List[List[str]] = []
    try:
        if command in (None, "top"):
            videos = search_youtube(query, api_key, region_code)
        elif command == "list":
            videos = search_youtube(query, api_key, region_code, result_limit)
        elif command == "help":
            return YoutubeHandler.help_content
    except (ConnectionError, HTTPError):
        return "Uh-Oh, couldn't process the request right now.\nPlease again later"

    if not videos:
        return (
            "Oops ! Sorry I couldn't find any video for `" + query + "` :slightly_frowning_face:"
        )

    header = "Here is what I found for `" + query + "` : "

    if len(videos) == 1:
        only = videos[0]
        single_line = f"\n{only[0]} - [Watch now](https://www.youtube.com/watch?v={only[1]})"
        return (header + single_line).strip()

    # Using link https://www.youtube.com/watch/<id> to
    # prevent showing multiple previews
    bullet_lines = [
        f"\n * {title} - [Watch now](https://www.youtube.com/watch/{video_id})"
        for title, video_id in videos
    ]
    return header + "".join(bullet_lines)
|
|
|
|
|
|
# Module-level alias for the handler class defined in this file
# (presumably the name the zulip_bots loader looks up -- TODO confirm).
handler_class = YoutubeHandler
|