ruff: Fix SIM115 Use context handler for opening files.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
This commit is contained in:
Anders Kaseorg 2023-11-01 16:51:36 -07:00
parent 5199c14077
commit 43a9263114
5 changed files with 30 additions and 26 deletions

View file

@ -13,6 +13,7 @@ import os
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
import pytz
import requests
@ -301,7 +302,7 @@ def run_mirror() -> None:
time.sleep(sleep_interval)
except KeyboardInterrupt:
open(config.RESUME_FILE, "w").write(since.strftime("%s"))
Path(config.RESUME_FILE).write_text(since.strftime("%s"))
logging.info("Shutting down Codebase mirror")
@ -310,13 +311,15 @@ def check_permissions() -> None:
# check that the log file can be written
if config.LOG_FILE:
try:
open(config.LOG_FILE, "w")
with open(config.LOG_FILE, "w"):
pass
except OSError as e:
sys.stderr.write("Could not open up log for writing:")
sys.stderr.write(str(e))
# check that the resume file can be written (this creates if it doesn't exist)
try:
open(config.RESUME_FILE, "a+")
with open(config.RESUME_FILE, "a+"):
pass
except OSError as e:
sys.stderr.write(f"Could not open up the file {config.RESUME_FILE} for reading and writing")
sys.stderr.write(str(e))

View file

@ -11,6 +11,7 @@ import subprocess
import sys
import tempfile
import traceback
from pathlib import Path
from typing import List
# Use the Zulip virtualenv if available
@ -73,8 +74,8 @@ def process_logs() -> None:
data_file_path = os.path.join(temp_dir, "log2zulip.state")
mkdir_p(os.path.dirname(data_file_path))
if not os.path.exists(data_file_path):
open(data_file_path, "w").write("{}")
last_data = json.loads(open(data_file_path).read())
Path(data_file_path).write_text("{}")
last_data = json.loads(Path(data_file_path).read_text())
new_data = {}
for log_file in log_files:
file_data = last_data.get(log_file, {})
@ -97,7 +98,7 @@ def process_logs() -> None:
process_lines(new_lines, log_file)
file_data["last"] += len(new_lines)
new_data[log_file] = file_data
open(data_file_path, "w").write(json.dumps(new_data))
Path(data_file_path).write_text(json.dumps(new_data))
if __name__ == "__main__":
@ -115,10 +116,10 @@ if __name__ == "__main__":
# TODO: Convert this locking code to use a standard context manager.
try:
open(lock_path, "w").write("1")
Path(lock_path).write_text("1")
zulip_client = zulip.init_from_options(args)
try:
log_files = json.loads(open(args.control_path).read())
log_files = json.loads(Path(args.control_path).read_text())
except (json.JSONDecodeError, OSError):
print(f"Could not load control data from {args.control_path}")
traceback.print_exc()

View file

@ -1,8 +1,8 @@
#!/usr/bin/env python3
import base64
import os
import subprocess
import sys
from pathlib import Path
short_user = sys.argv[1]
api_key = sys.argv[2]
@ -14,25 +14,25 @@ with open(f"/home/zulip/ccache/{program_name}", "wb") as f:
f.write(base64.b64decode(ccache_data_encoded))
# Setup API key
api_key_path = f"/home/zulip/api-keys/{program_name}"
open(api_key_path, "w").write(api_key + "\n")
api_key_path = Path(f"/home/zulip/api-keys/{program_name}")
api_key_path.write_text(api_key + "\n")
# Setup supervisord configuration
supervisor_path = f"/etc/supervisor/conf.d/zmirror/{program_name}.conf"
template = os.path.join(os.path.dirname(__file__), "zmirror_private.conf.template")
template_data = open(template).read()
supervisor_path = Path(f"/etc/supervisor/conf.d/zmirror/{program_name}.conf")
template_path = Path(__file__).parent / "zmirror_private.conf.template"
template_data = template_path.read_text()
session_path = f"/home/zulip/zephyr_sessions/{program_name}"
# Preserve mail zephyrs forwarding setting across rewriting the config file
try:
if "--forward-mail-zephyrs" in open(supervisor_path).read():
if "--forward-mail-zephyrs" in supervisor_path.read_text():
template_data = template_data.replace(
"--use-sessions", "--use-sessions --forward-mail-zephyrs"
)
except Exception:
pass
open(supervisor_path, "w").write(template_data.replace("USERNAME", short_user))
supervisor_path.write_text(template_data.replace("USERNAME", short_user))
# Delete your session
subprocess.check_call(["rm", "-f", session_path])

View file

@ -14,6 +14,7 @@ import tempfile
import textwrap
import time
from ctypes import POINTER, byref, c_char, c_int, c_ushort
from pathlib import Path
from queue import Queue
from threading import Thread
from types import FrameType
@ -247,9 +248,7 @@ def zephyr_bulk_subscribe(subs: List[Tuple[str, str, str]]) -> None:
def update_subscriptions() -> None:
try:
f = open(options.stream_file_path)
public_streams: List[str] = json.loads(f.read())
f.close()
public_streams: List[str] = json.loads(Path(options.stream_file_path).read_text())
except Exception:
logger.exception("Error reading public streams:")
return
@ -380,11 +379,11 @@ def parse_zephyr_body(zephyr_data: str, notice_format: str) -> Tuple[str, str]:
def parse_crypt_table(zephyr_class: str, instance: str) -> Optional[str]:
try:
crypt_table = open(os.path.join(os.environ["HOME"], ".crypt-table"))
crypt_table = (Path(os.environ["HOME"]) / ".crypt-table").read_text()
except OSError:
return None
for line in crypt_table.readlines():
for line in crypt_table.splitlines():
if line.strip() == "":
# Ignore blank lines
continue
@ -1090,13 +1089,13 @@ def valid_stream_name(name: str) -> bool:
def parse_zephyr_subs(verbose: bool = False) -> Set[Tuple[str, str, str]]:
zephyr_subscriptions: Set[Tuple[str, str, str]] = set()
subs_file = os.path.join(os.environ["HOME"], ".zephyr.subs")
if not os.path.exists(subs_file):
subs_path = Path(os.environ["HOME"]) / ".zephyr.subs"
if not subs_path.exists():
if verbose:
logger.error("Couldn't find ~/.zephyr.subs!")
return zephyr_subscriptions
for raw_line in open(subs_file).readlines():
for raw_line in subs_path.read_text().splitlines():
line = raw_line.strip()
if len(line) == 0:
continue
@ -1271,7 +1270,7 @@ or specify the --api-key-file option."""
),
)
sys.exit(1)
api_key = open(options.api_key_file).read().strip()
api_key = Path(options.api_key_file).read_text().strip()
# Store the API key in the environment so that our children
# don't need to read it in
os.environ["HUMBUG_API_KEY"] = api_key

View file

@ -7,6 +7,7 @@ import signal
import sys
import time
from contextlib import contextmanager
from pathlib import Path
from typing import IO, Any, Dict, Iterator, List, Optional, Set
from typing_extensions import Protocol
@ -415,7 +416,7 @@ def is_private_message_but_not_group_pm(
def display_config_file_errors(error_msg: str, config_file: str) -> None:
file_contents = open(config_file).read()
file_contents = Path(config_file).read_text()
print(f"\nERROR: {config_file} seems to be broken:\n\n{file_contents}")
print(f"\nMore details here:\n\n{error_msg}\n")