ClickHouse/tests/ci/tee_popen.py
2024-12-04 13:13:21 +01:00

130 lines
4.0 KiB
Python

#!/usr/bin/env python3
import logging
import os
import signal
import sys
from io import TextIOWrapper
from pathlib import Path
from subprocess import PIPE, STDOUT, Popen
from threading import Thread
from time import sleep
from typing import Optional, Union
# Very simple tee logic implementation. You can specify a shell command, output
# logfile and env variables. After TeePopen is created you can only wait until
# it finishes. stderr and stdout will be redirected both to specified file and
# stdout.
class TeePopen:
def __init__(
self,
command: str,
log_file: Union[str, Path],
env: Optional[dict] = None,
timeout: Optional[int] = None,
):
self.command = command
self._log_file_name = log_file
self._log_file = None # type: Optional[TextIOWrapper]
self.env = env or os.environ.copy()
self._process = None # type: Optional[Popen]
self.timeout = timeout
self.timeout_exceeded = False
self.terminated_by_sigterm = False
self.terminated_by_sigkill = False
def _check_timeout(self) -> None:
if self.timeout is None:
return
sleep(self.timeout)
logging.warning(
"Timeout exceeded. Send SIGTERM to process %s, timeout %s",
self.process.pid,
self.timeout,
)
self.timeout_exceeded = True
self.terminate()
def terminate(self, wait_before_kill: int = 100) -> None:
time_wait = 0
time_sleep = 5
self.terminated_by_sigterm = True
self.send_signal(signal.SIGTERM)
while self.process.poll() is None and time_wait < wait_before_kill:
logging.warning("Wait the process %s to terminate", self.process.pid)
sleep(time_sleep)
time_wait += time_sleep
self.terminated_by_sigkill = True
while self.process.poll() is None:
logging.error("Process is still running. Send SIGKILL")
self.send_signal(signal.SIGKILL)
sleep(time_sleep)
def __enter__(self) -> "TeePopen":
self.process = Popen(
self.command,
shell=True,
universal_newlines=True,
env=self.env,
start_new_session=True, # signall will be sent to all children
stderr=STDOUT,
stdout=PIPE,
bufsize=1,
errors="backslashreplace",
)
sleep(1)
print(f"Subprocess started, pid [{self.process.pid}]")
if self.timeout is not None and self.timeout > 0:
t = Thread(target=self._check_timeout)
t.daemon = True # does not block the program from exit
t.start()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.wait()
if self.timeout_exceeded:
exceeded_log = (
f"Command `{self.command}` has failed, "
f"timeout {self.timeout}s is exceeded"
)
if self.process.stdout is not None:
sys.stdout.write(exceeded_log)
self.log_file.write(exceeded_log)
self.log_file.close()
def wait(self) -> int:
if self.process.stdout is not None:
for line in self.process.stdout:
sys.stdout.write(line)
self.log_file.write(line)
self.log_file.flush()
return self.process.wait()
def poll(self):
return self.process.poll()
def send_signal(self, signal_num):
os.killpg(self.process.pid, signal_num)
@property
def process(self) -> Popen:
if self._process is not None:
return self._process
raise AttributeError("process is not created yet")
@process.setter
def process(self, process: Popen) -> None:
self._process = process
@property
def log_file(self) -> TextIOWrapper:
if self._log_file is None:
# pylint:disable-next=consider-using-with
self._log_file = open(self._log_file_name, "w", encoding="utf-8")
return self._log_file