From a534446315286c68529500bd03faaa94f51d0398 Mon Sep 17 00:00:00 2001 From: Anders Kaseorg Date: Fri, 17 Jun 2022 11:49:25 -0700 Subject: [PATCH] zephyr: Remove python-zephyr in favor of ctypes. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Our custom patched version of python-zephyr only worked on Python 2. Now we don’t need python-zephyr at all. Signed-off-by: Anders Kaseorg --- pyproject.toml | 8 +- zulip/integrations/zephyr/check-mirroring | 65 ++++-- zulip/integrations/zephyr/zephyr_ctypes.py | 206 ++++++++++++++++++ .../zephyr/zephyr_mirror_backend.py | 197 +++++++++++------ 4 files changed, 391 insertions(+), 85 deletions(-) create mode 100644 zulip/integrations/zephyr/zephyr_ctypes.py diff --git a/pyproject.toml b/pyproject.toml index 99089263..577099be 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,6 +3,12 @@ line-length = 100 target-version = ["py36"] [tool.isort] -src_paths = ["tools", "zulip", "zulip_bots", "zulip_botserver"] +src_paths = [ + "tools", + "zulip", + "zulip/integrations/zephyr", + "zulip_bots", + "zulip_botserver", +] profile = "black" line_length = 100 diff --git a/zulip/integrations/zephyr/check-mirroring b/zulip/integrations/zephyr/check-mirroring index 33d63c60..67ccb249 100755 --- a/zulip/integrations/zephyr/check-mirroring +++ b/zulip/integrations/zephyr/check-mirroring @@ -6,10 +6,10 @@ import random import subprocess import sys import time +from ctypes import byref, c_int, c_ushort from typing import Dict, List, Set, Tuple -import zephyr - +import zephyr_ctypes import zulip parser = optparse.OptionParser() @@ -136,9 +136,43 @@ for (stream, test) in test_streams: actually_subscribed = False for tries in range(10): try: - zephyr.init() - zephyr._z.subAll(zephyr_subs_to_add) - zephyr_subs = zephyr._z.getSubscriptions() + zephyr_ctypes.check(zephyr_ctypes.ZInitialize()) + zephyr_port = c_ushort() + zephyr_ctypes.check(zephyr_ctypes.ZOpenPort(byref(zephyr_port))) + zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0)) + + zephyr_ctypes.check( + zephyr_ctypes.ZSubscribeTo( + (zephyr_ctypes.ZSubscription_t * len(zephyr_subs_to_add))( + *( + zephyr_ctypes.ZSubscription_t( + zsub_class=cls.encode(), + zsub_classinst=instance.encode(), + zsub_recipient=recipient.encode(), + ) + for cls, instance, recipient in zephyr_subs_to_add + ) + ), + len(zephyr_subs_to_add), + 0, + ) + ) + + try: + nsubs = c_int() + zephyr_ctypes.check(zephyr_ctypes.ZRetrieveSubscriptions(0, byref(nsubs))) + zsubs = (zephyr_ctypes.ZSubscription_t * nsubs.value)() + zephyr_ctypes.check(zephyr_ctypes.ZGetSubscriptions(zsubs, byref(nsubs))) + zephyr_subs = { + ( + zsub.zsub_class.decode(), + zsub.zsub_classinst.decode(), + zsub.zsub_recipient.decode(), + ) + for zsub in zsubs + } + finally: + zephyr_ctypes.ZFlushSubscriptions() missing = 0 for elt in zephyr_subs_to_add: @@ -148,8 +182,8 @@ for tries in range(10): if missing == 0: actually_subscribed = True break - except OSError as e: - if "SERVNAK received" in e.args: + except zephyr_ctypes.ZephyrError as e: + if e.code == zephyr_ctypes.ZERR_SERVNAK: logger.error("SERVNAK repeatedly received, punting rest of test") else: logger.exception("Exception subscribing to zephyrs") @@ -185,15 +219,15 @@ notices = [] # receive queue with 30+ messages, which might result in messages # being dropped. def receive_zephyrs() -> None: - while True: + while zephyr_ctypes.ZPending() != 0: + notice = zephyr_ctypes.ZNotice_t() + sender = zephyr_ctypes.sockaddr_in() try: - notice = zephyr.receive(block=False) - except Exception: + zephyr_ctypes.check(zephyr_ctypes.ZReceiveNotice(byref(notice), byref(sender))) + except zephyr_ctypes.ZephyrError: logging.exception("Exception receiving zephyrs:") - notice = None - if notice is None: break - if notice.opcode != "": + if notice.z_opcode != b"": continue notices.append(notice) @@ -294,7 +328,10 @@ def process_keys(content_list: List[str]) -> Tuple[Dict[str, int], Set[str], Set # The h_foo variables are about the messages we _received_ in Zulip # The z_foo variables are about the messages we _received_ in Zephyr h_contents = [message["content"] for message in messages] -z_contents = [notice.message.split("\0")[1] for notice in notices] +z_contents = [ + notice.z_message[: notice.z_message_len].split(b"\0")[1].decode(errors="replace") + for notice in notices +] (h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success) = process_keys(h_contents) (z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success) = process_keys(z_contents) diff --git a/zulip/integrations/zephyr/zephyr_ctypes.py b/zulip/integrations/zephyr/zephyr_ctypes.py new file mode 100644 index 00000000..ef35f0a3 --- /dev/null +++ b/zulip/integrations/zephyr/zephyr_ctypes.py @@ -0,0 +1,206 @@ +from ctypes import ( + CDLL, + CFUNCTYPE, + POINTER, + Structure, + Union, + c_char, + c_char_p, + c_int, + c_long, + c_uint, + c_uint8, + c_uint16, + c_uint32, + c_ushort, + c_void_p, +) + +libc = CDLL("libc.so.6") +com_err = CDLL("libcom_err.so.2") +libzephyr = CDLL("libzephyr.so.4") + + +# --- glibc/bits/sockaddr.h --- + +sa_family_t = c_ushort + + +# --- glibc/sysdeps/unix/sysv/linux/bits/socket.h --- + + +class sockaddr(Structure): + _fields_ = [ + ("sa_family", sa_family_t), + ("sa_data", c_char * 14), + ] + + +# --- glibc/inet/netinet/in.h --- + +in_port_t = c_uint16 +in_addr_t = c_uint32 + + +class in_addr(Structure): + _fields_ = [ + ("s_addr", in_addr_t), + ] + + +class sockaddr_in(Structure): + _fields_ = [ + ("sin_family", sa_family_t), + ("sin_port", in_port_t), + ("sin_addr", in_addr), + ("sin_zero", c_uint8 * 8), + ] + + +class in6_addr(Structure): + _fields_ = [ + ("s6_addr", c_uint8 * 16), + ] + + +class sockaddr_in6(Structure): + _fields_ = [ + ("sin6_family", sa_family_t), + ("sin6_port", in_port_t), + ("sin6_flowinfo", c_uint32), + ("sin6_addr", in6_addr), + ("sin6_scope_id", c_uint32), + ] + + +# --- glibc/stdlib/stdlib.h --- + +free = CFUNCTYPE(None, c_void_p)(("free", libc)) + + +# --- e2fsprogs/lib/et/com_err.h --- + +error_message = CFUNCTYPE(c_char_p, c_long)(("error_message", com_err)) + + +# --- zephyr/h/zephyr/zephyr.h --- + +Z_MAXOTHERFIELDS = 10 + +ZNotice_Kind_t = c_int + + +class _ZTimeval(Structure): + _fields_ = [ + ("tv_sec", c_int), + ("tv_usec", c_int), + ] + + +class ZUnique_Id_t(Structure): + _fields_ = [ + ("zuid_addr", in_addr), + ("tv", _ZTimeval), + ] + + +ZChecksum_t = c_uint + + +class _ZSenderSockaddr(Union): + _fields_ = [ + ("sa", sockaddr), + ("ip4", sockaddr_in), + ("ip6", sockaddr_in6), + ] + + +class ZNotice_t(Structure): + _fields_ = [ + ("z_packet", c_char_p), + ("z_version", c_char_p), + ("z_kind", ZNotice_Kind_t), + ("z_uid", ZUnique_Id_t), + ("z_sender_sockaddr", _ZSenderSockaddr), + ("z_time", _ZTimeval), + ("z_port", c_ushort), + ("z_charset", c_ushort), + ("z_auth", c_int), + ("z_checked_auth", c_int), + ("z_authent_len", c_int), + ("z_ascii_authent", c_char_p), + ("z_class", c_char_p), + ("z_class_inst", c_char_p), + ("z_opcode", c_char_p), + ("z_sender", c_char_p), + ("z_recipient", c_char_p), + ("z_default_format", c_char_p), + ("z_multinotice", c_char_p), + ("z_multiuid", ZUnique_Id_t), + ("z_checksum", ZChecksum_t), + ("z_ascii_checksum", c_char_p), + ("z_num_other_fields", c_int), + ("z_other_fields", c_char_p * Z_MAXOTHERFIELDS), + ("z_message", POINTER(c_char)), + ("z_message_len", c_int), + ("z_num_hdr_fields", c_uint), + ("z_hdr_fields", POINTER(c_char_p)), + ] + + +class ZSubscription_t(Structure): + _fields_ = [ + ("zsub_recipient", c_char_p), + ("zsub_class", c_char_p), + ("zsub_classinst", c_char_p), + ] + + +Code_t = c_int + +ZInitialize = CFUNCTYPE(Code_t)(("ZInitialize", libzephyr)) +ZRetrieveSubscriptions = CFUNCTYPE(Code_t, c_ushort, POINTER(c_int))( + ("ZRetrieveSubscriptions", libzephyr) +) +ZGetSubscriptions = CFUNCTYPE(Code_t, POINTER(ZSubscription_t), POINTER(c_int))( + ("ZGetSubscriptions", libzephyr) +) +ZOpenPort = CFUNCTYPE(Code_t, POINTER(c_ushort))(("ZOpenPort", libzephyr)) +ZFlushSubscriptions = CFUNCTYPE(Code_t)(("ZFlushSubscriptions", libzephyr)) +ZSubscribeTo = CFUNCTYPE(Code_t, POINTER(ZSubscription_t), c_int, c_uint)( + ("ZSubscribeTo", libzephyr) +) +ZCancelSubscriptions = CFUNCTYPE(Code_t, c_uint)(("ZCancelSubscriptions", libzephyr)) +ZPending = CFUNCTYPE(c_int)(("ZPending", libzephyr)) +ZReceiveNotice = CFUNCTYPE(Code_t, POINTER(ZNotice_t), POINTER(sockaddr_in))( + ("ZReceiveNotice", libzephyr) +) +ZDumpSession = CFUNCTYPE(Code_t, POINTER(POINTER(c_char)), POINTER(c_int))( + ("ZDumpSession", libzephyr) +) +ZLoadSession = CFUNCTYPE(Code_t, POINTER(c_char), c_int)(("ZLoadSession", libzephyr)) +ZGetFD = CFUNCTYPE(c_int)(("ZGetFD", libzephyr)) + +ZERR_NONE = 0 + + +# --- zephyr/lib/zephyr_err.et --- + +ERROR_TABLE_BASE_zeph = -772103680 +ZERR_SERVNAK = ERROR_TABLE_BASE_zeph + 16 + + +# --- convenience helpers --- + + +class ZephyrError(Exception): + def __init__(self, code: int) -> None: + self.code = code + + def __str__(self) -> str: + return error_message(self.code).decode() + + +def check(code: int) -> None: + if code != ZERR_NONE: + raise ZephyrError(code) diff --git a/zulip/integrations/zephyr/zephyr_mirror_backend.py b/zulip/integrations/zephyr/zephyr_mirror_backend.py index 61112485..2e906090 100755 --- a/zulip/integrations/zephyr/zephyr_mirror_backend.py +++ b/zulip/integrations/zephyr/zephyr_mirror_backend.py @@ -13,6 +13,7 @@ import sys import tempfile import textwrap import time +from ctypes import POINTER, byref, c_char, c_int, c_ushort from queue import Queue from threading import Thread from types import FrameType @@ -20,6 +21,7 @@ from typing import IO, Any, Dict, List, NoReturn, Optional, Set, Tuple, Union from typing_extensions import Literal, TypedDict +import zephyr_ctypes import zulip from zulip import RandomExponentialBackoff @@ -179,8 +181,23 @@ current_zephyr_subs = set() def zephyr_bulk_subscribe(subs: List[Tuple[str, str, str]]) -> None: try: - zephyr._z.subAll(subs) - except OSError: + zephyr_ctypes.check( + zephyr_ctypes.ZSubscribeTo( + (zephyr_ctypes.ZSubscription_t * len(subs))( + *( + zephyr_ctypes.ZSubscription_t( + zsub_class=cls.encode(), + zsub_classinst=instance.encode(), + zsub_recipient=recipient.encode(), + ) + for cls, instance, recipient in subs + ) + ), + len(subs), + 0, + ) + ) + except zephyr_ctypes.ZephyrError: # Since we haven't added the subscription to # current_zephyr_subs yet, we can just return (so that we'll # continue processing normal messages) and we'll end up @@ -189,26 +206,41 @@ def zephyr_bulk_subscribe(subs: List[Tuple[str, str, str]]) -> None: logger.exception("Error subscribing to streams (will retry automatically):") logger.warning(f"Streams were: {[cls for cls, instance, recipient in subs]}") return + try: - actual_zephyr_subs = [cls for (cls, _, _) in zephyr._z.getSubscriptions()] - except OSError: + nsubs = c_int() + zephyr_ctypes.check(zephyr_ctypes.ZRetrieveSubscriptions(0, byref(nsubs))) + zsubs = (zephyr_ctypes.ZSubscription_t * nsubs.value)() + zephyr_ctypes.check(zephyr_ctypes.ZGetSubscriptions(zsubs, byref(nsubs))) + actual_zephyr_subs = {zsub.zsub_class.decode() for zsub in zsubs} + except zephyr_ctypes.ZephyrError: logger.exception("Error getting current Zephyr subscriptions") # Don't add anything to current_zephyr_subs so that we'll # retry the next time we check for streams to subscribe to # (within 15 seconds). return + finally: + zephyr_ctypes.ZFlushSubscriptions() + for (cls, instance, recipient) in subs: if cls not in actual_zephyr_subs: logger.error(f"Zephyr failed to subscribe us to {cls}; will retry") - try: - # We'll retry automatically when we next check for - # streams to subscribe to (within 15 seconds), but - # it's worth doing 1 retry immediately to avoid - # missing 15 seconds of messages on the affected - # classes - zephyr._z.sub(cls, instance, recipient) - except OSError: - pass + # We'll retry automatically when we next check for + # streams to subscribe to (within 15 seconds), but + # it's worth doing 1 retry immediately to avoid + # missing 15 seconds of messages on the affected + # classes + zephyr_ctypes.ZSubscribeTo( + (zephyr_ctypes.ZSubscription_t * 1)( + zephyr_ctypes.ZSubscription_t( + zsub_class=cls.encode(), + zsub_classinst=instance.encode(), + zsub_recipient=recipient.encode(), + ) + ), + 1, + 0, + ) else: current_zephyr_subs.add(cls) @@ -259,8 +291,8 @@ def maybe_restart_mirroring_script() -> None: logger.warning("zephyr mirroring script has been updated; restarting...") maybe_kill_child() try: - zephyr._z.cancelSubs() - except OSError: + zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0)) + except zephyr_ctypes.ZephyrError: # We don't care whether we failed to cancel subs properly, but we should log it logger.exception("") backoff = RandomExponentialBackoff( @@ -281,22 +313,22 @@ def process_loop(zulip_queue: "Queue[ZephyrDict]", log: Optional[IO[str]]) -> No last_check_time = time.time() recieve_backoff = RandomExponentialBackoff() while True: - select.select([zephyr._z.getFD()], [], [], 15) + select.select([zephyr_ctypes.ZGetFD()], [], [], 15) try: process_backoff = RandomExponentialBackoff() # Fetch notices from the queue until its empty - while True: - notice = zephyr.receive(block=False) + while zephyr_ctypes.ZPending() != 0: + notice = zephyr_ctypes.ZNotice_t() + sender = zephyr_ctypes.sockaddr_in() + zephyr_ctypes.check(zephyr_ctypes.ZReceiveNotice(byref(notice), byref(sender))) recieve_backoff.succeed() - if notice is None: - break try: process_notice(notice, zulip_queue, log) process_backoff.succeed() - except Exception: + except zephyr_ctypes.ZephyrError: logger.exception("Error relaying zephyr:") process_backoff.fail() - except Exception: + except zephyr_ctypes.ZephyrError: logger.exception("Error checking for new zephyrs:") recieve_backoff.fail() continue @@ -409,39 +441,46 @@ def decrypt_zephyr(zephyr_class: str, instance: str, body: str) -> str: def process_notice( - notice: "zephyr.ZNotice", zulip_queue: "Queue[ZephyrDict]", log: Optional[IO[str]] + notice: zephyr_ctypes.ZNotice_t, zulip_queue: "Queue[ZephyrDict]", log: Optional[IO[str]] ) -> None: - assert notice.sender is not None - (zsig, body) = parse_zephyr_body(notice.message, notice.format) + assert notice.z_sender is not None + (zsig, body) = parse_zephyr_body( + notice.z_message[: notice.z_message_len].decode(errors="replace"), + notice.z_default_format.decode(errors="replace"), + ) is_personal = False is_huddle = False - if notice.opcode == "PING": + if notice.z_opcode == b"PING": # skip PING messages return - zephyr_class = notice.cls.lower() + zephyr_class = notice.z_class.decode() + zephyr_instance = notice.z_class_inst.decode() + zephyr_sender = notice.z_sender.decode() - if zephyr_class == options.nagios_class: + if zephyr_class.lower() == options.nagios_class: # Mark that we got the message and proceed with open(options.nagios_path, "w") as f: f.write("0\n") return - if notice.recipient != "": + if notice.z_recipient != b"": is_personal = True # Drop messages not to the listed subscriptions if is_personal and not options.forward_personals: return - if (zephyr_class not in current_zephyr_subs) and not is_personal: - logger.debug(f"Skipping ... {zephyr_class}/{notice.instance}/{is_personal}") + if (zephyr_class.lower() not in current_zephyr_subs) and not is_personal: + logger.debug(f"Skipping ... {zephyr_class}/{zephyr_instance}/{is_personal}") return - if notice.format.startswith("Zephyr error: See") or notice.format.endswith("@(@color(blue))"): + if notice.z_default_format.startswith(b"Zephyr error: See") or notice.z_default_format.endswith( + b"@(@color(blue))" + ): logger.debug("Skipping message we got from Zulip!") return if ( - zephyr_class == "mail" - and notice.instance.lower() == "inbox" + zephyr_class.lower() == "mail" + and zephyr_instance.lower() == "inbox" and is_personal and not options.forward_mail_zephyrs ): @@ -455,21 +494,21 @@ def process_notice( huddle_recipients = [ to_zulip_username(x.strip()) for x in body.split("\n")[0][4:].split() ] - if notice.sender not in huddle_recipients: - huddle_recipients.append(to_zulip_username(notice.sender)) + if zephyr_sender not in huddle_recipients: + huddle_recipients.append(to_zulip_username(zephyr_sender)) body = body.split("\n", 1)[1] if ( options.forward_class_messages - and notice.opcode is not None - and notice.opcode.lower() == "crypt" + and notice.z_opcode is not None + and notice.z_opcode.lower() == b"crypt" ): - body = decrypt_zephyr(zephyr_class, notice.instance.lower(), body) + body = decrypt_zephyr(zephyr_class.lower(), zephyr_instance.lower(), body) zeph: ZephyrDict zeph = { - "time": str(notice.time), - "sender": notice.sender, + "time": str(notice.z_time.tv_sec + notice.z_time.tv_usec / 1e6), + "sender": zephyr_sender, "zsig": zsig, # logged here but not used by app "content": body, } @@ -477,30 +516,30 @@ def process_notice( zeph["type"] = "private" zeph["recipient"] = huddle_recipients elif is_personal: - assert notice.recipient is not None + assert notice.z_recipient is not None zeph["type"] = "private" - zeph["recipient"] = to_zulip_username(notice.recipient) + zeph["recipient"] = to_zulip_username(notice.z_recipient.decode()) else: zeph["type"] = "stream" - zeph["stream"] = zephyr_class - if notice.instance.strip() != "": - zeph["subject"] = notice.instance + zeph["stream"] = zephyr_class.lower() + if zephyr_instance.strip() != "": + zeph["subject"] = zephyr_instance else: - zeph["subject"] = f'(instance "{notice.instance}")' + zeph["subject"] = f'(instance "{zephyr_instance}")' # Add instances in for instanced personals if is_personal: - if notice.cls.lower() != "message" and notice.instance.lower() != "personal": - heading = f"[-c {notice.cls} -i {notice.instance}]\n" - elif notice.cls.lower() != "message": - heading = f"[-c {notice.cls}]\n" - elif notice.instance.lower() != "personal": - heading = f"[-i {notice.instance}]\n" + if zephyr_class.lower() != "message" and zephyr_instance.lower() != "personal": + heading = f"[-c {zephyr_class} -i {zephyr_instance}]\n" + elif zephyr_class.lower() != "message": + heading = f"[-c {zephyr_class}]\n" + elif zephyr_instance.lower() != "personal": + heading = f"[-i {zephyr_instance}]\n" else: heading = "" zeph["content"] = heading + zeph["content"] - logger.info(f"Received a message on {zephyr_class}/{notice.instance} from {notice.sender}...") + logger.info(f"Received a message on {zephyr_class}/{zephyr_instance} from {zephyr_sender}...") if log is not None: log.write(json.dumps(zeph) + "\n") log.flush() @@ -530,12 +569,14 @@ def zephyr_init_autoretry() -> None: backoff = zulip.RandomExponentialBackoff() while backoff.keep_going(): try: - # zephyr.init() tries to clear old subscriptions, and thus - # sometimes gets a SERVNAK from the server - zephyr.init() + # ZCancelSubscriptions sometimes gets a SERVNAK from the server + zephyr_ctypes.check(zephyr_ctypes.ZInitialize()) + zephyr_port = c_ushort() + zephyr_ctypes.check(zephyr_ctypes.ZOpenPort(byref(zephyr_port))) + zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0)) backoff.succeed() return - except OSError: + except zephyr_ctypes.ZephyrError: logger.exception("Error initializing Zephyr library (retrying). Traceback:") backoff.fail() @@ -548,11 +589,10 @@ def zephyr_load_session_autoretry(session_path: str) -> None: try: with open(session_path, "rb") as f: session = f.read() - zephyr._z.initialize() - zephyr._z.load_session(session) - zephyr.__inited = True + zephyr_ctypes.check(zephyr_ctypes.ZInitialize()) + zephyr_ctypes.check(zephyr_ctypes.ZLoadSession(session, len(session))) return - except OSError: + except zephyr_ctypes.ZephyrError: logger.exception("Error loading saved Zephyr session (retrying). Traceback:") backoff.fail() @@ -560,13 +600,26 @@ def zephyr_load_session_autoretry(session_path: str) -> None: def zephyr_subscribe_autoretry(sub: Tuple[str, str, str]) -> None: + cls, instance, recipient = sub backoff = zulip.RandomExponentialBackoff() while backoff.keep_going(): try: - zephyr.Subscriptions().add(sub) + zephyr_ctypes.check( + zephyr_ctypes.ZSubscribeTo( + (zephyr_ctypes.ZSubscription_t * 1)( + zephyr_ctypes.ZSubscription_t( + zsub_class=cls.encode(), + zsub_classinst=instance.encode(), + zsub_recipient=recipient.encode(), + ) + ), + 1, + 0, + ) + ) backoff.succeed() return - except OSError: + except zephyr_ctypes.ZephyrError: # Probably a SERVNAK from the zephyr server, but log the # traceback just in case it's something else logger.exception("Error subscribing to personals (retrying). Traceback:") @@ -593,8 +646,14 @@ def zephyr_to_zulip(options: optparse.Values) -> None: if options.nagios_class: zephyr_subscribe_autoretry((options.nagios_class, "*", "*")) if options.use_sessions: - with open(options.session_path, "wb") as f: - f.write(zephyr._z.dump_session()) + buf = POINTER(c_char)() + buf_len = c_int() + zephyr_ctypes.check(zephyr_ctypes.ZDumpSession(byref(buf), byref(buf_len))) + try: + with open(options.session_path, "wb") as f: + f.write(buf[: buf_len.value]) # type: ignore[arg-type] # bytes, but mypy infers List[c_char] + finally: + zephyr_ctypes.free(buf) if options.logs_to_resend is not None: with open(options.logs_to_resend) as log: @@ -1184,8 +1243,8 @@ def die_gracefully(signal: int, frame: FrameType) -> None: if CURRENT_STATE == States.ZephyrToZulip and not options.use_sessions: try: - # zephyr=>zulip processes may have added subs, so run cancelSubs - zephyr._z.cancelSubs() + # zephyr=>zulip processes may have added subs, so run ZCancelSubscriptions + zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0)) except OSError: # We don't care whether we failed to cancel subs properly, but we should log it logger.exception("") @@ -1297,8 +1356,6 @@ or specify the --api-key-file option.""" child_pid = None CURRENT_STATE = States.ZephyrToZulip - import zephyr - logger_name = "zephyr=>zulip" if options.shard is not None: logger_name += f"({options.shard})"