Convert type comments to Python ≥ 3.6 variable annotations.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
This commit is contained in:
Anders Kaseorg 2023-10-17 17:12:58 -07:00
parent 8abca34a05
commit a9607dfdf9
44 changed files with 128 additions and 142 deletions

View file

@ -19,10 +19,10 @@ class HelloWorldHandler:
"""
def handle_message(self, message: Dict[str, Any], bot_handler: BotHandler) -> None:
content = "beep boop" # type: str
content = "beep boop"
bot_handler.send_reply(message, content)
emoji_name = "wave" # type: str
emoji_name = "wave"
bot_handler.react(message, emoji_name)
return

View file

@ -1,16 +1,12 @@
from typing import List
from zulint.custom_rules import RuleList
from zulint.custom_rules import Rule, RuleList
MYPY = False
if MYPY:
from zulint.custom_rules import Rule
whitespace_rules = [
whitespace_rules: List[Rule] = [
# This linter should be first since bash_rules depends on it.
{"pattern": r"\s+$", "strip": "\n", "description": "Fix trailing whitespace"},
{"pattern": "\t", "strip": "\n", "description": "Fix tab-based whitespace"},
] # type: List[Rule]
]
markdown_whitespace_rules = list(
rule for rule in whitespace_rules if rule["pattern"] != r"\s+$"
@ -129,7 +125,7 @@ json_rules = RuleList(
rules=whitespace_rules[0:1],
)
prose_style_rules = [
prose_style_rules: List[Rule] = [
{
"pattern": r'[^\/\#\-"]([jJ]avascript)', # exclude usage in hrefs/divs
"description": "javascript should be spelled JavaScript",
@ -147,7 +143,7 @@ prose_style_rules = [
"pattern": r"[^-_]botserver(?!rc)|bot server",
"description": "Use Botserver instead of botserver or Botserver.",
},
] # type: List[Rule]
]
markdown_docs_length_exclude = {
"zulip_bots/zulip_bots/bots/converter/doc.md",

View file

@ -11,12 +11,12 @@ from typing import Any, Callable, Dict, List
import requests
from requests import Response
red = "\033[91m" # type: str
green = "\033[92m" # type: str
end_format = "\033[0m" # type: str
bold = "\033[1m" # type: str
red = "\033[91m"
green = "\033[92m"
end_format = "\033[0m"
bold = "\033[1m"
bots_dir = ".bots" # type: str
bots_dir = ".bots"
def pack(options: argparse.Namespace) -> None:

View file

@ -175,7 +175,7 @@ for inpath in force_include:
try:
ext = os.path.splitext(inpath)[1].split(".")[1]
except IndexError:
ext = "py" # type: str
ext = "py"
files_dict[ext].append(inpath)
pyi_files = set(files_dict["pyi"])

View file

@ -21,7 +21,7 @@ class IRCBot(irc.bot.SingleServerIRCBot):
nickserv_password: str = "",
port: int = 6667,
) -> None:
self.channel = channel # type: irc.bot.Channel
self.channel: irc.bot.Channel = channel
self.zulip_client = zulip_client
self.stream = stream
self.topic = topic

View file

@ -8,9 +8,9 @@ from oauth2client.file import Storage
try:
import argparse
flags = argparse.ArgumentParser(
flags: Optional[argparse.Namespace] = argparse.ArgumentParser(
parents=[tools.argparser]
).parse_args() # type: Optional[argparse.Namespace]
).parse_args()
except ImportError:
flags = None

View file

@ -31,10 +31,10 @@ APPLICATION_NAME = "Zulip"
HOME_DIR = os.path.expanduser("~")
# Our cached view of the calendar, updated periodically.
events = [] # type: List[Tuple[int, datetime.datetime, str]]
events: List[Tuple[int, datetime.datetime, str]] = []
# Unique keys for events we've already sent, so we don't remind twice.
sent = set() # type: Set[Tuple[int, datetime.datetime]]
sent: Set[Tuple[int, datetime.datetime]] = set()
sys.path.append(os.path.dirname(__file__))

View file

@ -87,7 +87,7 @@ class JabberToZulipBot(ClientXMPP):
self.nick = jid.username
jid.resource = "zulip"
ClientXMPP.__init__(self, jid, password)
self.rooms = set() # type: Set[str]
self.rooms: Set[str] = set()
self.rooms_to_join = rooms
self.add_event_handler("session_start", self.session_start)
self.add_event_handler("message", self.message)
@ -205,7 +205,7 @@ class JabberToZulipBot(ClientXMPP):
class ZulipToJabberBot:
def __init__(self, zulip_client: Client) -> None:
self.client = zulip_client
self.jabber = None # type: Optional[JabberToZulipBot]
self.jabber: Optional[JabberToZulipBot] = None
def set_jabber_client(self, client: JabberToZulipBot) -> None:
self.jabber = client
@ -282,7 +282,7 @@ def get_rooms(zulipToJabber: ZulipToJabberBot) -> List[str]:
else:
stream_infos = get_stream_infos("subscriptions", zulipToJabber.client.get_subscriptions)
rooms = [] # type: List[str]
rooms: List[str] = []
for stream_info in stream_infos:
stream = stream_info["name"]
if stream.endswith("/xmpp"):

View file

@ -105,7 +105,7 @@ def process_logs() -> None:
if __name__ == "__main__":
parser = zulip.add_default_arguments(argparse.ArgumentParser()) # type: argparse.ArgumentParser
parser = zulip.add_default_arguments(argparse.ArgumentParser())
parser.add_argument("--control-path", default="/etc/log2zulip.conf")
args = parser.parse_args()

View file

@ -8,7 +8,7 @@ VERSION = "0.9"
# Nagios passes the notification details as command line options.
# In Nagios, "output" means "first line of output", and "long
# output" means "other lines of output".
parser = zulip.add_default_arguments(argparse.ArgumentParser()) # type: argparse.ArgumentParser
parser = zulip.add_default_arguments(argparse.ArgumentParser())
parser.add_argument("--output", default="")
parser.add_argument("--long-output", default="")
parser.add_argument("--stream", default="nagios")
@ -17,11 +17,9 @@ for opt in ("type", "host", "service", "state"):
parser.add_argument("--" + opt)
opts = parser.parse_args()
client = zulip.Client(
config_file=opts.config, client="ZulipNagios/" + VERSION
) # type: zulip.Client
client = zulip.Client(config_file=opts.config, client="ZulipNagios/" + VERSION)
msg = dict(type="stream", to=opts.stream) # type: Dict[str, Any]
msg: Dict[str, Any] = dict(type="stream", to=opts.stream)
# Set a subject based on the host or service in question. This enables
# threaded discussion of multiple concurrent issues, and provides useful

View file

@ -52,7 +52,7 @@ def format_deployment_message(
## If properly installed, the Zulip API should be in your import
## path, but if not, set a custom path below
ZULIP_API_PATH = None # type: Optional[str]
ZULIP_API_PATH: Optional[str] = None
# Set this to your Zulip server's API URI
ZULIP_SITE = "https://zulip.example.com"

View file

@ -37,11 +37,11 @@ client = zulip.Client(
site=config.ZULIP_SITE,
api_key=config.ZULIP_API_KEY,
client="ZulipPerforce/" + __version__,
) # type: zulip.Client
)
try:
changelist = int(sys.argv[1]) # type: int
changeroot = sys.argv[2] # type: str
changelist = int(sys.argv[1])
changeroot = sys.argv[2]
except IndexError:
print("Wrong number of arguments.\n\n", end=" ", file=sys.stderr)
print(__doc__, file=sys.stderr)
@ -51,11 +51,9 @@ except ValueError:
print(__doc__, file=sys.stderr)
sys.exit(-1)
metadata = git_p4.p4_describe(changelist) # type: Dict[str, str]
metadata: Dict[str, str] = git_p4.p4_describe(changelist)
destination = config.commit_notice_destination(
changeroot, changelist
) # type: Optional[Dict[str, str]]
destination: Optional[Dict[str, str]] = config.commit_notice_destination(changeroot, changelist)
if destination is None:
# Don't forward the notice anywhere
@ -88,12 +86,12 @@ message = """**{user}** committed revision @{change} to `{path}`.
```
""".format(
user=metadata["user"], change=change, path=changeroot, desc=metadata["desc"]
) # type: str
)
message_data = {
message_data: Dict[str, Any] = {
"type": "stream",
"to": destination["stream"],
"subject": destination["subject"],
"content": message,
} # type: Dict[str, Any]
}
client.send_message(message_data)

View file

@ -20,9 +20,9 @@ import feedparser
import zulip
VERSION = "0.9" # type: str
RSS_DATA_DIR = os.path.expanduser(os.path.join("~", ".cache", "zulip-rss")) # type: str
OLDNESS_THRESHOLD = 30 # type: int
VERSION = "0.9"
RSS_DATA_DIR = os.path.expanduser(os.path.join("~", ".cache", "zulip-rss"))
OLDNESS_THRESHOLD = 30
usage = """Usage: Send summaries of RSS entries for your favorite feeds to Zulip.
@ -48,9 +48,7 @@ stream every 5 minutes is:
*/5 * * * * /usr/local/share/zulip/integrations/rss/rss-bot"""
parser = zulip.add_default_arguments(
argparse.ArgumentParser(usage)
) # type: argparse.ArgumentParser
parser = zulip.add_default_arguments(argparse.ArgumentParser(usage))
parser.add_argument(
"--stream",
dest="stream",
@ -94,7 +92,7 @@ parser.add_argument(
default=False,
)
opts = parser.parse_args() # type: Any
opts = parser.parse_args()
def mkdir_p(path: str) -> None:
@ -115,15 +113,15 @@ except OSError:
print(f"Unable to store RSS data at {opts.data_dir}.", file=sys.stderr)
exit(1)
log_file = os.path.join(opts.data_dir, "rss-bot.log") # type: str
log_format = "%(asctime)s: %(message)s" # type: str
log_file = os.path.join(opts.data_dir, "rss-bot.log")
log_format = "%(asctime)s: %(message)s"
logging.basicConfig(format=log_format)
formatter = logging.Formatter(log_format) # type: logging.Formatter
file_handler = logging.FileHandler(log_file) # type: logging.FileHandler
formatter = logging.Formatter(log_format)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
logger = logging.getLogger(__name__) # type: logging.Logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(file_handler)
@ -138,7 +136,7 @@ class MLStripper(HTMLParser):
def __init__(self) -> None:
super().__init__()
self.reset()
self.fed = [] # type: List[str]
self.fed: List[str] = []
def handle_data(self, data: str) -> None:
self.fed.append(data)
@ -173,7 +171,7 @@ def elide_subject(subject: str) -> str:
def send_zulip(entry: Any, feed_name: str) -> Dict[str, Any]:
body = entry.summary # type: str
body: str = entry.summary
if opts.unwrap:
body = unwrap_text(body)
@ -182,56 +180,52 @@ def send_zulip(entry: Any, feed_name: str) -> Dict[str, Any]:
entry.link,
strip_tags(body),
entry.link,
) # type: str
)
if opts.math:
content = content.replace("$", "$$")
message = {
message: Dict[str, str] = {
"type": "stream",
"to": opts.stream,
"subject": opts.topic or elide_subject(feed_name),
"content": content,
} # type: Dict[str, str]
}
return client.send_message(message)
try:
with open(opts.feed_file) as f:
feed_urls = [feed.strip() for feed in f.readlines()] # type: List[str]
feed_urls: List[str] = [feed.strip() for feed in f.readlines()]
except OSError:
log_error_and_exit(f"Unable to read feed file at {opts.feed_file}.")
client = zulip.Client(
client: zulip.Client = zulip.Client(
email=opts.zulip_email,
api_key=opts.zulip_api_key,
config_file=opts.zulip_config_file,
site=opts.zulip_site,
client="ZulipRSS/" + VERSION,
) # type: zulip.Client
)
first_message = True # type: bool
first_message = True
for feed_url in feed_urls:
feed_file = os.path.join(opts.data_dir, urllib.parse.urlparse(feed_url).netloc) # Type: str
try:
with open(feed_file) as f:
old_feed_hashes = {
line.strip(): True for line in f.readlines()
} # type: Dict[str, bool]
old_feed_hashes = {line.strip(): True for line in f.readlines()}
except OSError:
old_feed_hashes = {}
new_hashes = [] # type: List[str]
data = feedparser.parse(feed_url) # type: feedparser.parse
new_hashes: List[str] = []
data = feedparser.parse(feed_url)
for entry in data.entries:
entry_hash = compute_entry_hash(entry) # type: str
entry_hash = compute_entry_hash(entry)
# An entry has either been published or updated.
entry_time = entry.get(
"published_parsed", entry.get("updated_parsed")
) # type: Tuple[int, int]
entry_time: Tuple[int, int] = entry.get("published_parsed", entry.get("updated_parsed"))
if (
entry_time is not None
and (time.time() - calendar.timegm(entry_time)) > OLDNESS_THRESHOLD * 60 * 60 * 24
@ -247,9 +241,9 @@ for feed_url in feed_urls:
# entries in reverse chronological order.
break
feed_name = data.feed.title or feed_url # type: str
feed_name: str = data.feed.title or feed_url
response = send_zulip(entry, feed_name) # type: Dict[str, Any]
response: Dict[str, Any] = send_zulip(entry, feed_name)
if response["result"] != "success":
logger.error(f"Error processing {feed_url}")
logger.error(str(response))

View file

@ -31,17 +31,17 @@ client = zulip.Client(
site=config.ZULIP_SITE,
api_key=config.ZULIP_API_KEY,
client="ZulipSVN/" + VERSION,
) # type: zulip.Client
svn = pysvn.Client() # type: pysvn.Client
)
svn = pysvn.Client()
path, rev = sys.argv[1:]
# since its a local path, prepend "file://"
path = "file://" + path
entry = svn.log(path, revision_end=pysvn.Revision(pysvn.opt_revision_kind.number, rev))[
0
] # type: Dict[str, Any]
entry: Dict[str, Any] = svn.log(
path, revision_end=pysvn.Revision(pysvn.opt_revision_kind.number, rev)
)[0]
message = "**{}** committed revision r{} to `{}`.\n\n> {}".format(
entry["author"], rev, path.split("/")[-1], entry["revprops"]["svn:log"]
)
@ -49,10 +49,10 @@ message = "**{}** committed revision r{} to `{}`.\n\n> {}".format(
destination = config.commit_notice_destination(path, rev)
if destination is not None:
message_data = {
message_data: Dict[str, Any] = {
"type": "stream",
"to": destination["stream"],
"subject": destination["subject"],
"content": message,
} # type: Dict[str, Any]
}
client.send_message(message_data)

View file

@ -192,8 +192,8 @@ if not actually_subscribed:
print_status_and_exit(1)
# Prepare keys
zhkeys = {} # type: Dict[str, Tuple[str, str]]
hzkeys = {} # type: Dict[str, Tuple[str, str]]
zhkeys: Dict[str, Tuple[str, str]] = {}
hzkeys: Dict[str, Tuple[str, str]] = {}
def gen_key(key_dict: Dict[str, Tuple[str, str]]) -> str:
@ -313,7 +313,7 @@ def process_keys(content_list: List[str]) -> Tuple[Dict[str, int], Set[str], Set
# Start by filtering out any keys that might have come from
# concurrent check-mirroring processes
content_keys = [key for key in content_list if key in all_keys]
key_counts = {} # type: Dict[str, int]
key_counts: Dict[str, int] = {}
for key in all_keys:
key_counts[key] = 0
for key in content_keys:

View file

@ -1107,7 +1107,7 @@ def valid_stream_name(name: str) -> bool:
def parse_zephyr_subs(verbose: bool = False) -> Set[Tuple[str, str, str]]:
zephyr_subscriptions = set() # type: Set[Tuple[str, str, str]]
zephyr_subscriptions: Set[Tuple[str, str, str]] = set()
subs_file = os.path.join(os.environ["HOME"], ".zephyr.subs")
if not os.path.exists(subs_file):
if verbose:
@ -1348,7 +1348,7 @@ or specify the --api-key-file option."""
options.session_path = f"/var/tmp/{options.user}"
if options.forward_from_zulip:
child_pid = os.fork() # type: Optional[int]
child_pid: Optional[int] = os.fork()
if child_pid == 0:
CURRENT_STATE = States.ZulipToZephyr
# Run the zulip => zephyr mirror in the child

View file

@ -1,4 +1,4 @@
import pkgutil
from typing import List
__path__ = pkgutil.extend_path(__path__, __name__) # type: List[str]
__path__: List[str] = pkgutil.extend_path(__path__, __name__)

View file

@ -485,7 +485,7 @@ class Client:
"certificate will not be validated, making the "
"HTTPS connection potentially insecure"
)
self.tls_verification = False # type: Union[bool, str]
self.tls_verification: Union[bool, str] = False
elif cert_bundle is not None:
if not os.path.isfile(cert_bundle):
raise ConfigNotFoundError(f"tls bundle '{cert_bundle}' does not exist")
@ -509,7 +509,7 @@ class Client:
self.client_cert = client_cert
self.client_cert_key = client_cert_key
self.session = None # type: Optional[requests.Session]
self.session: Optional[requests.Session] = None
self.has_connected = False
@ -527,10 +527,10 @@ class Client:
# Build a client cert object for requests
if self.client_cert_key is not None:
assert self.client_cert is not None # Otherwise ZulipError near end of __init__
client_cert = (
client_cert: Union[None, str, Tuple[str, str]] = (
self.client_cert,
self.client_cert_key,
) # type: Union[None, str, Tuple[str, str]]
)
else:
client_cert = self.client_cert
@ -602,11 +602,11 @@ class Client:
self.ensure_session()
assert self.session is not None
query_state = {
query_state: Dict[str, Any] = {
"had_error_retry": False,
"request": request,
"failures": 0,
} # type: Dict[str, Any]
}
def error_retry(error_string: str) -> bool:
if not self.retry_on_errors or query_state["failures"] >= 10:

View file

@ -40,7 +40,7 @@ request = {
"apply_markdown": False,
}
all_messages = [] # type: List[Dict[str, Any]]
all_messages: List[Dict[str, Any]] = []
found_newest = False
while not found_newest:

View file

@ -28,7 +28,7 @@ options = parser.parse_args()
client = zulip.init_from_options(options)
if options.file_path:
file = open(options.file_path, "rb") # type: IO[Any]
file: IO[Any] = open(options.file_path, "rb")
else:
file = StringIO("This is a test file.")
file.name = "test.txt"

View file

@ -241,14 +241,14 @@ class BaremetricsHandler:
return "\n".join(response)
def create_plan(self, parameters: List[str]) -> str:
data_header = {
data_header: Any = {
"oid": parameters[1],
"name": parameters[2],
"currency": parameters[3],
"amount": int(parameters[4]),
"interval": parameters[5],
"interval_count": int(parameters[6]),
} # type: Any
}
url = f"https://api.baremetrics.com/v1/{parameters[0]}/plans"
create_plan_response = requests.post(url, data=data_header, headers=self.auth_header)

View file

@ -87,8 +87,8 @@ def get_bot_converter_response(message: Dict[str, str], bot_handler: BotHandler)
exponent -= exp
unit_to = unit_to[len(key) :]
uf_to_std = utils.UNITS.get(unit_from, []) # type: List[Any]
ut_to_std = utils.UNITS.get(unit_to, []) # type: List[Any]
uf_to_std: List[Any] = utils.UNITS.get(unit_from, [])
ut_to_std: List[Any] = utils.UNITS.get(unit_to, [])
if not uf_to_std:
results.append("`" + unit_from + "` is not a valid unit. " + utils.QUICK_HELP)

View file

@ -148,7 +148,7 @@ def dbx_ls(client: Any, fn: str) -> str:
try:
result = client.files_list_folder(fn)
files_list = [] # type: List[str]
files_list: List[str] = []
for meta in result.entries:
files_list += [" - " + URL.format(name=meta.name, path=meta.path_lower)]

View file

@ -16,10 +16,10 @@ class HelloWorldHandler:
"""
def handle_message(self, message: Dict[str, Any], bot_handler: BotHandler) -> None:
content = "beep boop" # type: str
content = "beep boop"
bot_handler.send_reply(message, content)
emoji_name = "wave" # type: str
emoji_name = "wave"
bot_handler.react(message, emoji_name)
return

View file

@ -2,7 +2,7 @@ from zulip_bots.test_lib import BotTestCase, DefaultTests
class TestHelpBot(BotTestCase, DefaultTests):
bot_name = "helloworld" # type: str
bot_name: str = "helloworld"
def test_bot(self) -> None:
dialog = [

View file

@ -4,7 +4,7 @@ from zulip_bots.test_lib import BotTestCase, DefaultTests
class TestIDoneThisBot(BotTestCase, DefaultTests):
bot_name = "idonethis" # type: str
bot_name: str = "idonethis"
def test_create_entry_default_team(self) -> None:
with self.mock_config_info(

View file

@ -23,7 +23,7 @@ class LinkShortenerHandler:
self.check_api_key(bot_handler)
def check_api_key(self, bot_handler: BotHandler) -> None:
test_request_data = self.call_link_shorten_service("www.youtube.com/watch") # type: Any
test_request_data: Any = self.call_link_shorten_service("www.youtube.com/watch")
try:
if self.is_invalid_token_error(test_request_data):
bot_handler.quit(

View file

@ -75,12 +75,12 @@ class MentionHandler:
"Accept-Version": "1.15",
}
create_alert_data = {
create_alert_data: Any = {
"name": keyword,
"query": {"type": "basic", "included_keywords": [keyword]},
"languages": ["en"],
"sources": ["web"],
} # type: Any
}
response = requests.post(
"https://api.mention.net/api/accounts/" + self.account_id + "/alerts",

View file

@ -78,7 +78,7 @@ def query_salesforce(
) -> str:
arg = arg.strip()
qarg = arg.split(" -", 1)[0]
split_args = [] # type: List[str]
split_args: List[str] = []
raw_arg = ""
if len(arg.split(" -", 1)) > 1:
raw_arg = " -" + arg.split(" -", 1)[1]
@ -96,10 +96,10 @@ def query_salesforce(
res = salesforce.query(
query.format(object_type["fields"], object_type["table"], qarg, limit_num)
)
exclude_keys = [] # type: List[str]
exclude_keys: List[str] = []
if "exclude_keys" in command.keys():
exclude_keys = command["exclude_keys"]
force_keys = [] # type: List[str]
force_keys: List[str] = []
if "force_keys" in command.keys():
force_keys = command["force_keys"]
rank_output = False

View file

@ -96,7 +96,7 @@ mock_object_types = {
class TestSalesforceBot(BotTestCase, DefaultTests):
bot_name = "salesforce" # type: str
bot_name: str = "salesforce"
def _test(self, test_name: str, message: str, response: str, auth_success: bool = True) -> None:
with self.mock_config_info(mock_config), mock_salesforce_auth(

View file

@ -3,7 +3,7 @@ from typing import Any, Dict, List
link_query = "SELECT {} FROM {} WHERE Id='{}'"
default_query = "SELECT {} FROM {} WHERE Name LIKE '%{}%' LIMIT {}"
commands = [
commands: List[Dict[str, Any]] = [
{
"commands": ["search account", "find account", "search accounts", "find accounts"],
"object": "account",
@ -41,9 +41,9 @@ commands = [
"rank_output": True,
"force_keys": ["Amount"],
},
] # type: List[Dict[str, Any]]
]
object_types = {
object_types: Dict[str, Dict[str, str]] = {
"account": {
"fields": "Id, Name, Phone, BillingStreet, BillingCity, BillingState",
"table": "Account",
@ -53,4 +53,4 @@ object_types = {
"fields": "Id, Name, Amount, Probability, StageName, CloseDate",
"table": "Opportunity",
},
} # type: Dict[str, Dict[str, str]]
}

View file

@ -160,7 +160,7 @@ class TicTacToeModel:
# there are two blanks and an 2 in each row, column, and diagonal (done in two_blanks).
# If smarter is False, all blank locations can be chosen.
if self.smarter:
blanks = [] # type: Any
blanks: Any = []
for triplet in self.triplets:
result = self.two_blanks(triplet, board)
if result:

View file

@ -7,7 +7,7 @@ mock_config = {"api_key": "TEST", "access_token": "TEST", "user_name": "TEST"}
class TestTrelloBot(BotTestCase, DefaultTests):
bot_name = "trello" # type: str
bot_name: str = "trello"
def test_bot_responds_to_empty_message(self) -> None:
with self.mock_config_info(mock_config), patch("requests.get"):

View file

@ -93,7 +93,7 @@ class TrelloHandler:
return bot_response
def get_board_descs(self, boards: List[str]) -> str:
bot_response = [] # type: List[str]
bot_response: List[str] = []
get_board_desc_url = "https://api.trello.com/1/boards/{}/"
for index, board in enumerate(boards):
board_desc_response = requests.get(

View file

@ -15,7 +15,7 @@ from zulip_bots.test_lib import BotTestCase, DefaultTests, StubBotHandler, read_
class TestTriviaQuizBot(BotTestCase, DefaultTests):
bot_name = "trivia_quiz" # type: str
bot_name: str = "trivia_quiz"
new_question_response = (
"\nQ: Which class of animals are newts members of?\n\n"

View file

@ -130,13 +130,13 @@ def get_quiz_from_payload(payload: Dict[str, Any]) -> Dict[str, Any]:
for i in range(3):
answers[letters[i + 1]] = result["incorrect_answers"][i]
answers = {letter: fix_quotes(answer) for letter, answer in answers.items()}
quiz = dict(
quiz: Dict[str, Any] = dict(
question=fix_quotes(question),
answers=answers,
answered_options=[],
pending=True,
correct_letter=correct_letter,
) # type: Dict[str, Any]
)
return quiz

View file

@ -8,11 +8,11 @@ from zulip_bots.test_lib import BotTestCase, DefaultTests, StubBotHandler, get_b
class TestYoutubeBot(BotTestCase, DefaultTests):
bot_name = "youtube"
normal_config = {
normal_config: Dict[str, str] = {
"key": "12345678",
"number_of_results": "5",
"video_region": "US",
} # type: Dict[str,str]
}
help_content = (
"*Help for YouTube bot* :robot_face: : \n\n"

View file

@ -56,7 +56,7 @@ class YoutubeHandler:
def search_youtube(query: str, key: str, region: str, max_results: int = 1) -> List[List[str]]:
videos = []
params = {
params: Dict[str, Union[str, int]] = {
"part": "id,snippet",
"maxResults": max_results,
"key": key,
@ -64,7 +64,7 @@ def search_youtube(query: str, key: str, region: str, max_results: int = 1) -> L
"alt": "json",
"type": "video",
"regionCode": region,
} # type: Dict[str, Union[str, int]]
}
url = "https://www.googleapis.com/youtube/v3/search"
try:
@ -98,7 +98,7 @@ def get_bot_response(
key = config_info["key"]
max_results = int(config_info["number_of_results"])
region = config_info["video_region"]
video_list = [] # type: List[List[str]]
video_list: List[List[str]] = []
try:
if query == "" or query is None:
return YoutubeHandler.help_content

View file

@ -55,17 +55,17 @@ class GameAdapter:
self.is_single_player = self.min_players == self.max_players == 1
self.supports_computer = supports_computer
self.gameMessageHandler = gameMessageHandler()
self.invites = {} # type: Dict[str, Dict[str, str]]
self.instances = {} # type: Dict[str, Any]
self.user_cache = {} # type: Dict[str, Dict[str, Any]]
self.pending_subject_changes = [] # type: List[str]
self.invites: Dict[str, Dict[str, str]] = {}
self.instances: Dict[str, Any] = {}
self.user_cache: Dict[str, Dict[str, Any]] = {}
self.pending_subject_changes: List[str] = []
self.stream = "games"
self.rules = rules
# Values are [won, lost, drawn, total] new values can be added, but MUST be added to the end of the list.
def add_user_statistics(self, user: str, values: Dict[str, int]) -> None:
self.get_user_cache()
current_values = {} # type: Dict[str, int]
current_values: Dict[str, int] = {}
if "stats" in self.get_user_by_email(user).keys():
current_values = self.user_cache[user]["stats"]
for key, value in values.items():
@ -570,7 +570,7 @@ class GameAdapter:
def get_players(self, game_id: str, parameter: str = "a") -> List[str]:
if game_id in self.invites.keys():
players = [] # type: List[str]
players: List[str] = []
if (
self.invites[game_id]["subject"] == "###private###" and "p" in parameter
) or "p" not in parameter:
@ -587,7 +587,7 @@ class GameAdapter:
return []
def get_game_info(self, game_id: str) -> Dict[str, Any]:
game_info = {} # type: Dict[str, Any]
game_info: Dict[str, Any] = {}
if game_id in self.instances.keys():
instance = self.instances[game_id]
game_info = {
@ -855,8 +855,8 @@ class GameInstance:
self.model = deepcopy(self.gameAdapter.model())
self.board = self.model.current_board
self.turn = random.randrange(0, len(players)) - 1
self.current_draw = {} # type: Dict[str, bool]
self.current_messages = [] # type: List[str]
self.current_draw: Dict[str, bool] = {}
self.current_messages: List[str] = []
self.is_changing_subject = False
def start(self) -> None:

View file

@ -53,7 +53,7 @@ class RateLimit:
def __init__(self, message_limit: int, interval_limit: int) -> None:
self.message_limit = message_limit
self.interval_limit = interval_limit
self.message_list = [] # type: List[float]
self.message_list: List[float] = []
self.error_message = "-----> !*!*!*MESSAGE RATE LIMIT REACHED, EXITING*!*!*! <-----\n"
"Is your bot trapped in an infinite loop by reacting to its own messages?"

View file

@ -19,7 +19,7 @@ def get_bot_message_handler(bot_name: str) -> Any:
# handler class. Eventually, we want bot's handler classes to
# inherit from a common prototype specifying the handle_message
# function.
lib_module = import_module("zulip_bots.bots.{bot}.{bot}".format(bot=bot_name)) # type: Any
lib_module: Any = import_module("zulip_bots.bots.{bot}.{bot}".format(bot=bot_name))
return lib_module.handler_class()

View file

@ -18,7 +18,7 @@ class StubBotHandler:
self.reset_transcript()
def reset_transcript(self) -> None:
self.transcript = [] # type: List[Tuple[str, Dict[str, Any]]]
self.transcript: List[Tuple[str, Dict[str, Any]]] = []
def identity(self) -> BotIdentity:
return BotIdentity(self.full_name, self.email)

View file

@ -1,4 +1,4 @@
import pkgutil
from typing import List
__path__ = pkgutil.extend_path(__path__, __name__) # type: List[str]
__path__: List[str] = pkgutil.extend_path(__path__, __name__)

View file

@ -31,7 +31,7 @@ def read_config_section(parser: configparser.ConfigParser, section: str) -> Dict
def read_config_from_env_vars(bot_name: Optional[str] = None) -> Dict[str, Dict[str, str]]:
bots_config = {} # type: Dict[str, Dict[str, str]]
bots_config: Dict[str, Dict[str, str]] = {}
json_config = os.environ.get("ZULIP_BOTSERVER_CONFIG")
if json_config is None:
@ -67,7 +67,7 @@ def read_config_file(
) -> Dict[str, Dict[str, str]]:
parser = parse_config_file(config_file_path)
bots_config = {} # type: Dict[str, Dict[str, str]]
bots_config: Dict[str, Dict[str, str]] = {}
if bot_name is None:
bots_config = {
section: read_config_section(parser, section) for section in parser.sections()
@ -173,7 +173,7 @@ def init_message_handlers(
app = Flask(__name__)
bots_config = {} # type: Dict[str, Dict[str, str]]
bots_config: Dict[str, Dict[str, str]] = {}
@app.route("/", methods=["POST"])