From cb4741f7704bc862882e044d0d3e9bd0850bccae Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Thu, 31 Oct 2024 12:34:53 +0000 Subject: [PATCH 01/13] Fix zombie processes after library brigde crash. --- src/Common/ShellCommand.cpp | 169 +++++++++++------- src/Common/ShellCommand.h | 3 + tests/integration/helpers/cluster.py | 6 + tests/integration/test_library_bridge/test.py | 11 ++ 4 files changed, 126 insertions(+), 63 deletions(-) diff --git a/src/Common/ShellCommand.cpp b/src/Common/ShellCommand.cpp index 0d41669816c..fe3bd8f8b9c 100644 --- a/src/Common/ShellCommand.cpp +++ b/src/Common/ShellCommand.cpp @@ -25,6 +25,7 @@ namespace CANNOT_EXEC = 0x55555558, CANNOT_DUP_READ_DESCRIPTOR = 0x55555559, CANNOT_DUP_WRITE_DESCRIPTOR = 0x55555560, + CANNOT_VFORK_IN_CHILD = 0x55555561, }; } @@ -138,7 +139,7 @@ std::unique_ptr ShellCommand::executeImpl( * http://www.oracle.com/technetwork/server-storage/solaris10/subprocess-136439.html * Therefore, separate the resolving of the symbol from the call. */ - static void * real_vfork = dlsym(RTLD_DEFAULT, "vfork"); + static void * real_vfork = dlsym(RTLD_DEFAULT, "fork"); #else /// If we use Musl with static linking, there is no dlsym and no issue with vfork. static void * real_vfork = reinterpret_cast(&vfork); @@ -151,6 +152,10 @@ std::unique_ptr ShellCommand::executeImpl( PipeFDs pipe_stdout; PipeFDs pipe_stderr; + PipeFDs pipe_with_child; + WriteBufferFromFile write_buffer_for_child(pipe_with_child.fds_rw[1]); + ReadBufferFromFile read_buffer_for_parent(pipe_with_child.fds_rw[0]); + std::vector> read_pipe_fds; std::vector> write_pipe_fds; @@ -160,64 +165,93 @@ std::unique_ptr ShellCommand::executeImpl( for (size_t i = 0; i < config.write_fds.size(); ++i) write_pipe_fds.emplace_back(std::make_unique()); - pid_t pid = reinterpret_cast(real_vfork)(); + pid_t child_pid = reinterpret_cast(real_vfork)(); - if (pid == -1) + if (child_pid == -1) throw ErrnoException(ErrorCodes::CANNOT_FORK, "Cannot vfork"); - if (0 == pid) + /// Here we are using double vfork technique to prevent zombie process. + if (0 == child_pid) { - /// We are in the freshly created process. + pid_t pid = reinterpret_cast(real_vfork)(); - /// Why `_exit` and not `exit`? Because `exit` calls `atexit` and destructors of thread local storage. - /// And there is a lot of garbage (including, for example, mutex is blocked). And this can not be done after `vfork` - deadlock happens. + if (pid == -1) + _exit(static_cast(ReturnCodes::CANNOT_VFORK_IN_CHILD)); - /// Replace the file descriptors with the ends of our pipes. - if (STDIN_FILENO != dup2(pipe_stdin.fds_rw[0], STDIN_FILENO)) - _exit(static_cast(ReturnCodes::CANNOT_DUP_STDIN)); - - if (!config.pipe_stdin_only) + if (pid == 0) { - if (STDOUT_FILENO != dup2(pipe_stdout.fds_rw[1], STDOUT_FILENO)) - _exit(static_cast(ReturnCodes::CANNOT_DUP_STDOUT)); + /// We are in the freshly created process. - if (STDERR_FILENO != dup2(pipe_stderr.fds_rw[1], STDERR_FILENO)) - _exit(static_cast(ReturnCodes::CANNOT_DUP_STDERR)); + /// Why `_exit` and not `exit`? Because `exit` calls `atexit` and destructors of thread local storage. + /// And there is a lot of garbage (including, for example, mutex is blocked). And this can not be done after `vfork` - deadlock happens. + + /// Replace the file descriptors with the ends of our pipes. + if (STDIN_FILENO != dup2(pipe_stdin.fds_rw[0], STDIN_FILENO)) + _exit(static_cast(ReturnCodes::CANNOT_DUP_STDIN)); + + if (!config.pipe_stdin_only) + { + if (STDOUT_FILENO != dup2(pipe_stdout.fds_rw[1], STDOUT_FILENO)) + _exit(static_cast(ReturnCodes::CANNOT_DUP_STDOUT)); + + if (STDERR_FILENO != dup2(pipe_stderr.fds_rw[1], STDERR_FILENO)) + _exit(static_cast(ReturnCodes::CANNOT_DUP_STDERR)); + } + + for (size_t i = 0; i < config.read_fds.size(); ++i) + { + auto & fds = *read_pipe_fds[i]; + auto fd = config.read_fds[i]; + + if (fd != dup2(fds.fds_rw[1], fd)) + _exit(static_cast(ReturnCodes::CANNOT_DUP_READ_DESCRIPTOR)); + } + + for (size_t i = 0; i < config.write_fds.size(); ++i) + { + auto & fds = *write_pipe_fds[i]; + auto fd = config.write_fds[i]; + + if (fd != dup2(fds.fds_rw[0], fd)) + _exit(static_cast(ReturnCodes::CANNOT_DUP_WRITE_DESCRIPTOR)); + } + + // Reset the signal mask: it may be non-empty and will be inherited + // by the child process, which might not expect this. + sigset_t mask; + sigemptyset(&mask); + sigprocmask(0, nullptr, &mask); // NOLINT(concurrency-mt-unsafe) + sigprocmask(SIG_UNBLOCK, &mask, nullptr); // NOLINT(concurrency-mt-unsafe) + + execv(filename, argv); + /// If the process is running, then `execv` does not return here. + + _exit(static_cast(ReturnCodes::CANNOT_EXEC)); } - - for (size_t i = 0; i < config.read_fds.size(); ++i) + else { - auto & fds = *read_pipe_fds[i]; - auto fd = config.read_fds[i]; - - if (fd != dup2(fds.fds_rw[1], fd)) - _exit(static_cast(ReturnCodes::CANNOT_DUP_READ_DESCRIPTOR)); + DB::writeIntBinary(pid, write_buffer_for_child); + write_buffer_for_child.next(); + _exit(0); } - - for (size_t i = 0; i < config.write_fds.size(); ++i) + } + else + { + int status = 0; + while (waitpid(child_pid, &status, 0) < 0) { - auto & fds = *write_pipe_fds[i]; - auto fd = config.write_fds[i]; - - if (fd != dup2(fds.fds_rw[0], fd)) - _exit(static_cast(ReturnCodes::CANNOT_DUP_WRITE_DESCRIPTOR)); + if (errno != EINTR) + throw ErrnoException(ErrorCodes::CANNOT_WAITPID, "Cannot waitpid"); } - - // Reset the signal mask: it may be non-empty and will be inherited - // by the child process, which might not expect this. - sigset_t mask; - sigemptyset(&mask); - sigprocmask(0, nullptr, &mask); // NOLINT(concurrency-mt-unsafe) - sigprocmask(SIG_UNBLOCK, &mask, nullptr); // NOLINT(concurrency-mt-unsafe) - - execv(filename, argv); - /// If the process is running, then `execv` does not return here. - - _exit(static_cast(ReturnCodes::CANNOT_EXEC)); + int return_code = handleWaitStatus(child_pid, status); + handleProceesReturnCode(child_pid, return_code); } + pid_t grandchild_pid = 0; + DB::readIntBinary(grandchild_pid, read_buffer_for_parent); + std::unique_ptr res(new ShellCommand( - pid, + grandchild_pid, pipe_stdin.fds_rw[1], pipe_stdout.fds_rw[0], pipe_stderr.fds_rw[0], @@ -241,7 +275,7 @@ std::unique_ptr ShellCommand::executeImpl( getLogger(), "Started shell command '{}' with pid {} and file descriptors: out {}, err {}", filename, - pid, + grandchild_pid, res->out.getFD(), res->err.getFD()); @@ -317,44 +351,53 @@ int ShellCommand::tryWait() LOG_TRACE(getLogger(), "Wait for shell command pid {} completed with status {}", pid, status); - if (WIFEXITED(status)) - return WEXITSTATUS(status); - - if (WIFSIGNALED(status)) - throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was terminated by signal {}", toString(WTERMSIG(status))); - - if (WIFSTOPPED(status)) - throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was stopped by signal {}", toString(WSTOPSIG(status))); - - throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was not exited normally by unknown reason"); + return handleWaitStatus(pid, status); } - void ShellCommand::wait() { int retcode = tryWait(); + handleProceesReturnCode(pid, retcode); +} +int ShellCommand::handleWaitStatus(pid_t pid, int status) +{ + if (WIFEXITED(status)) + return WEXITSTATUS(status); + + if (WIFSIGNALED(status)) + throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process {} was terminated by signal {}", pid, toString(WTERMSIG(status))); + + if (WIFSTOPPED(status)) + throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process {} was stopped by signal {}", pid, toString(WSTOPSIG(status))); + + throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process {} was not exited normally by unknown reason", pid); +} + +void ShellCommand::handleProceesReturnCode(pid_t pid, int retcode) +{ if (retcode != EXIT_SUCCESS) { switch (retcode) { case static_cast(ReturnCodes::CANNOT_DUP_STDIN): - throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stdin of child process"); + throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stdin of child process: {}", pid); case static_cast(ReturnCodes::CANNOT_DUP_STDOUT): - throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stdout of child process"); + throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stdout of child process {}", pid); case static_cast(ReturnCodes::CANNOT_DUP_STDERR): - throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stderr of child process"); + throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stderr of child process {}", pid); case static_cast(ReturnCodes::CANNOT_EXEC): - throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot execv in child process"); + throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot execv in child process {}", pid); case static_cast(ReturnCodes::CANNOT_DUP_READ_DESCRIPTOR): - throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 read descriptor of child process"); + throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 read descriptor of child process {}", pid); case static_cast(ReturnCodes::CANNOT_DUP_WRITE_DESCRIPTOR): - throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 write descriptor of child process"); + throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 write descriptor of child process {}", pid); + case static_cast(ReturnCodes::CANNOT_VFORK_IN_CHILD): + throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot vfork in child procces {}", pid); default: - throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was exited with return code {}", toString(retcode)); + throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process {} was exited with return code {}", pid, toString(retcode)); } } } - } diff --git a/src/Common/ShellCommand.h b/src/Common/ShellCommand.h index 5ebc1daefa1..17106db9698 100644 --- a/src/Common/ShellCommand.h +++ b/src/Common/ShellCommand.h @@ -99,6 +99,9 @@ private: static LoggerPtr getLogger(); + static int handleWaitStatus(pid_t pid, int status); + static void handleProceesReturnCode(pid_t pid, int exit_code); + /// Print command name and the list of arguments to log. NOTE: No escaping of arguments is performed. static void logCommand(const char * filename, char * const argv[]); diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py index eae0f9fec2d..949343e9036 100644 --- a/tests/integration/helpers/cluster.py +++ b/tests/integration/helpers/cluster.py @@ -1657,6 +1657,7 @@ class ClickHouseCluster: extra_configs=[], extra_args="", randomize_settings=True, + use_docker_init_flag=False, ) -> "ClickHouseInstance": """Add an instance to the cluster. @@ -1762,6 +1763,7 @@ class ClickHouseCluster: config_root_name=config_root_name, extra_configs=extra_configs, randomize_settings=randomize_settings, + use_docker_init_flag=use_docker_init_flag, ) docker_compose_yml_dir = get_docker_compose_path() @@ -3353,6 +3355,7 @@ services: {ipv6_address} {net_aliases} {net_alias1} + init: {init_flag} """ @@ -3419,6 +3422,7 @@ class ClickHouseInstance: config_root_name="clickhouse", extra_configs=[], randomize_settings=True, + use_docker_init_flag=False, ): self.name = name self.base_cmd = cluster.base_cmd @@ -3545,6 +3549,7 @@ class ClickHouseInstance: self.with_installed_binary = with_installed_binary self.is_up = False self.config_root_name = config_root_name + self.docker_init_flag = use_docker_init_flag def is_built_with_sanitizer(self, sanitizer_name=""): build_opts = self.query( @@ -4838,6 +4843,7 @@ class ClickHouseInstance: ipv6_address=ipv6_address, net_aliases=net_aliases, net_alias1=net_alias1, + init_flag="true" if self.docker_init_flag else "false", ) ) diff --git a/tests/integration/test_library_bridge/test.py b/tests/integration/test_library_bridge/test.py index 6254735a18f..79ec17271be 100644 --- a/tests/integration/test_library_bridge/test.py +++ b/tests/integration/test_library_bridge/test.py @@ -14,6 +14,11 @@ instance = cluster.add_instance( dictionaries=["configs/dictionaries/dict1.xml"], main_configs=["configs/config.d/config.xml"], stay_alive=True, + # WA for the problem with zombie processes inside the docker container. + # This is important here because we are checking that there are no zombie processes + # after craches inside the library bridge. + # https://forums.docker.com/t/what-the-latest-with-the-zombie-process-reaping-problem/50758/2 + use_docker_init_flag=True, ) @@ -263,6 +268,12 @@ def test_recover_after_bridge_crash(ch_cluster): instance.exec_in_container( ["bash", "-c", "kill -9 `pidof clickhouse-library-bridge`"], user="root" ) + ## There are no zombie processes. + res = instance.exec_in_container( + ["bash", "-c", "ps ax -ostat,pid | grep -e '[zZ]' | wc -l"], user="root" + ) + assert res == "0\n" + instance.query("DROP DICTIONARY lib_dict_c") From f12a582bd09545e831a19ed635d47457b14a9f9e Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Thu, 31 Oct 2024 14:11:18 +0000 Subject: [PATCH 02/13] vfork --- src/Common/ShellCommand.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Common/ShellCommand.cpp b/src/Common/ShellCommand.cpp index fe3bd8f8b9c..c2d8cc44ab3 100644 --- a/src/Common/ShellCommand.cpp +++ b/src/Common/ShellCommand.cpp @@ -139,7 +139,7 @@ std::unique_ptr ShellCommand::executeImpl( * http://www.oracle.com/technetwork/server-storage/solaris10/subprocess-136439.html * Therefore, separate the resolving of the symbol from the call. */ - static void * real_vfork = dlsym(RTLD_DEFAULT, "fork"); + static void * real_vfork = dlsym(RTLD_DEFAULT, "vfork"); #else /// If we use Musl with static linking, there is no dlsym and no issue with vfork. static void * real_vfork = reinterpret_cast(&vfork); From 4cc8d36419067a551cf3e8736af11da89f137dc5 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Tue, 5 Nov 2024 15:49:01 +0000 Subject: [PATCH 03/13] Redesigned --- src/BridgeHelper/LibraryBridgeHelper.cpp | 2 +- src/BridgeHelper/XDBCBridgeHelper.h | 2 +- src/Common/BackgroundShellCommandHolder.cpp | 37 ++++ src/Common/BackgroundShellCommandHolder.h | 30 ++++ src/Common/ShellCommand.cpp | 163 +++++++----------- src/Common/ShellCommand.h | 8 +- src/Common/SignalHandlers.cpp | 20 +++ src/Common/SignalHandlers.h | 1 + src/Daemon/BaseDaemon.cpp | 1 + src/Interpreters/Context.cpp | 16 +- src/Interpreters/Context.h | 7 +- tests/integration/test_library_bridge/test.py | 15 +- 12 files changed, 182 insertions(+), 120 deletions(-) create mode 100644 src/Common/BackgroundShellCommandHolder.cpp create mode 100644 src/Common/BackgroundShellCommandHolder.h diff --git a/src/BridgeHelper/LibraryBridgeHelper.cpp b/src/BridgeHelper/LibraryBridgeHelper.cpp index 58b84aa1ffd..e9fd1078658 100644 --- a/src/BridgeHelper/LibraryBridgeHelper.cpp +++ b/src/BridgeHelper/LibraryBridgeHelper.cpp @@ -29,7 +29,7 @@ LibraryBridgeHelper::LibraryBridgeHelper(ContextPtr context_) void LibraryBridgeHelper::startBridge(std::unique_ptr cmd) const { - getContext()->addBridgeCommand(std::move(cmd)); + getContext()->addBackgroundShellCommand(std::move(cmd)); } diff --git a/src/BridgeHelper/XDBCBridgeHelper.h b/src/BridgeHelper/XDBCBridgeHelper.h index 2e1328b4057..0851d1289db 100644 --- a/src/BridgeHelper/XDBCBridgeHelper.h +++ b/src/BridgeHelper/XDBCBridgeHelper.h @@ -144,7 +144,7 @@ protected: void startBridge(std::unique_ptr cmd) const override { - getContext()->addBridgeCommand(std::move(cmd)); + getContext()->addBackgroundShellCommand(std::move(cmd)); } diff --git a/src/Common/BackgroundShellCommandHolder.cpp b/src/Common/BackgroundShellCommandHolder.cpp new file mode 100644 index 00000000000..8d239356218 --- /dev/null +++ b/src/Common/BackgroundShellCommandHolder.cpp @@ -0,0 +1,37 @@ +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +LoggerPtr BackgroundShellCommandHolder::getLogger() +{ + return ::getLogger("BackgroundShellCommandHolder"); +} + + +void BackgroundShellCommandHolder::removeCommand(pid_t pid) +{ + std::lock_guard lock(mutex); + bool is_erased = active_shell_commands.erase(pid); + LOG_TRACE(getLogger(), "Try to erase command with the pid {}, is_erased: {}", pid, is_erased); +} + +void BackgroundShellCommandHolder::addCommand(std::unique_ptr command) +{ + std::lock_guard lock(mutex); + pid_t command_pid = command->getPid(); + + auto [iterator, is_inserted] = active_shell_commands.emplace(std::make_pair(command_pid, std::move(command))); + if (!is_inserted) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't insert proccess PID {} into active shell commands, because there are running proccess with same PID", command_pid); + + LOG_TRACE(getLogger(), "Inserted the command with pid {}", command_pid); +} +} diff --git a/src/Common/BackgroundShellCommandHolder.h b/src/Common/BackgroundShellCommandHolder.h new file mode 100644 index 00000000000..58bbbefcec1 --- /dev/null +++ b/src/Common/BackgroundShellCommandHolder.h @@ -0,0 +1,30 @@ +#pragma once + +#include +#include + +#include +#include + + +namespace DB +{ + +/** The holder class for running background shell processes. +*/ +class BackgroundShellCommandHolder final +{ +public: + void removeCommand(pid_t pid); + void addCommand(std::unique_ptr command); + +private: + using ActiveShellCommandsCollection = std::unordered_map>; + + std::mutex mutex; + ActiveShellCommandsCollection active_shell_commands TSA_GUARDED_BY(mutex); + + static LoggerPtr getLogger(); +}; + +} diff --git a/src/Common/ShellCommand.cpp b/src/Common/ShellCommand.cpp index c2d8cc44ab3..0d41669816c 100644 --- a/src/Common/ShellCommand.cpp +++ b/src/Common/ShellCommand.cpp @@ -25,7 +25,6 @@ namespace CANNOT_EXEC = 0x55555558, CANNOT_DUP_READ_DESCRIPTOR = 0x55555559, CANNOT_DUP_WRITE_DESCRIPTOR = 0x55555560, - CANNOT_VFORK_IN_CHILD = 0x55555561, }; } @@ -152,10 +151,6 @@ std::unique_ptr ShellCommand::executeImpl( PipeFDs pipe_stdout; PipeFDs pipe_stderr; - PipeFDs pipe_with_child; - WriteBufferFromFile write_buffer_for_child(pipe_with_child.fds_rw[1]); - ReadBufferFromFile read_buffer_for_parent(pipe_with_child.fds_rw[0]); - std::vector> read_pipe_fds; std::vector> write_pipe_fds; @@ -165,93 +160,64 @@ std::unique_ptr ShellCommand::executeImpl( for (size_t i = 0; i < config.write_fds.size(); ++i) write_pipe_fds.emplace_back(std::make_unique()); - pid_t child_pid = reinterpret_cast(real_vfork)(); + pid_t pid = reinterpret_cast(real_vfork)(); - if (child_pid == -1) + if (pid == -1) throw ErrnoException(ErrorCodes::CANNOT_FORK, "Cannot vfork"); - /// Here we are using double vfork technique to prevent zombie process. - if (0 == child_pid) + if (0 == pid) { - pid_t pid = reinterpret_cast(real_vfork)(); + /// We are in the freshly created process. - if (pid == -1) - _exit(static_cast(ReturnCodes::CANNOT_VFORK_IN_CHILD)); + /// Why `_exit` and not `exit`? Because `exit` calls `atexit` and destructors of thread local storage. + /// And there is a lot of garbage (including, for example, mutex is blocked). And this can not be done after `vfork` - deadlock happens. - if (pid == 0) + /// Replace the file descriptors with the ends of our pipes. + if (STDIN_FILENO != dup2(pipe_stdin.fds_rw[0], STDIN_FILENO)) + _exit(static_cast(ReturnCodes::CANNOT_DUP_STDIN)); + + if (!config.pipe_stdin_only) { - /// We are in the freshly created process. + if (STDOUT_FILENO != dup2(pipe_stdout.fds_rw[1], STDOUT_FILENO)) + _exit(static_cast(ReturnCodes::CANNOT_DUP_STDOUT)); - /// Why `_exit` and not `exit`? Because `exit` calls `atexit` and destructors of thread local storage. - /// And there is a lot of garbage (including, for example, mutex is blocked). And this can not be done after `vfork` - deadlock happens. - - /// Replace the file descriptors with the ends of our pipes. - if (STDIN_FILENO != dup2(pipe_stdin.fds_rw[0], STDIN_FILENO)) - _exit(static_cast(ReturnCodes::CANNOT_DUP_STDIN)); - - if (!config.pipe_stdin_only) - { - if (STDOUT_FILENO != dup2(pipe_stdout.fds_rw[1], STDOUT_FILENO)) - _exit(static_cast(ReturnCodes::CANNOT_DUP_STDOUT)); - - if (STDERR_FILENO != dup2(pipe_stderr.fds_rw[1], STDERR_FILENO)) - _exit(static_cast(ReturnCodes::CANNOT_DUP_STDERR)); - } - - for (size_t i = 0; i < config.read_fds.size(); ++i) - { - auto & fds = *read_pipe_fds[i]; - auto fd = config.read_fds[i]; - - if (fd != dup2(fds.fds_rw[1], fd)) - _exit(static_cast(ReturnCodes::CANNOT_DUP_READ_DESCRIPTOR)); - } - - for (size_t i = 0; i < config.write_fds.size(); ++i) - { - auto & fds = *write_pipe_fds[i]; - auto fd = config.write_fds[i]; - - if (fd != dup2(fds.fds_rw[0], fd)) - _exit(static_cast(ReturnCodes::CANNOT_DUP_WRITE_DESCRIPTOR)); - } - - // Reset the signal mask: it may be non-empty and will be inherited - // by the child process, which might not expect this. - sigset_t mask; - sigemptyset(&mask); - sigprocmask(0, nullptr, &mask); // NOLINT(concurrency-mt-unsafe) - sigprocmask(SIG_UNBLOCK, &mask, nullptr); // NOLINT(concurrency-mt-unsafe) - - execv(filename, argv); - /// If the process is running, then `execv` does not return here. - - _exit(static_cast(ReturnCodes::CANNOT_EXEC)); + if (STDERR_FILENO != dup2(pipe_stderr.fds_rw[1], STDERR_FILENO)) + _exit(static_cast(ReturnCodes::CANNOT_DUP_STDERR)); } - else + + for (size_t i = 0; i < config.read_fds.size(); ++i) { - DB::writeIntBinary(pid, write_buffer_for_child); - write_buffer_for_child.next(); - _exit(0); + auto & fds = *read_pipe_fds[i]; + auto fd = config.read_fds[i]; + + if (fd != dup2(fds.fds_rw[1], fd)) + _exit(static_cast(ReturnCodes::CANNOT_DUP_READ_DESCRIPTOR)); } + + for (size_t i = 0; i < config.write_fds.size(); ++i) + { + auto & fds = *write_pipe_fds[i]; + auto fd = config.write_fds[i]; + + if (fd != dup2(fds.fds_rw[0], fd)) + _exit(static_cast(ReturnCodes::CANNOT_DUP_WRITE_DESCRIPTOR)); + } + + // Reset the signal mask: it may be non-empty and will be inherited + // by the child process, which might not expect this. + sigset_t mask; + sigemptyset(&mask); + sigprocmask(0, nullptr, &mask); // NOLINT(concurrency-mt-unsafe) + sigprocmask(SIG_UNBLOCK, &mask, nullptr); // NOLINT(concurrency-mt-unsafe) + + execv(filename, argv); + /// If the process is running, then `execv` does not return here. + + _exit(static_cast(ReturnCodes::CANNOT_EXEC)); } - else - { - int status = 0; - while (waitpid(child_pid, &status, 0) < 0) - { - if (errno != EINTR) - throw ErrnoException(ErrorCodes::CANNOT_WAITPID, "Cannot waitpid"); - } - int return_code = handleWaitStatus(child_pid, status); - handleProceesReturnCode(child_pid, return_code); - } - - pid_t grandchild_pid = 0; - DB::readIntBinary(grandchild_pid, read_buffer_for_parent); std::unique_ptr res(new ShellCommand( - grandchild_pid, + pid, pipe_stdin.fds_rw[1], pipe_stdout.fds_rw[0], pipe_stderr.fds_rw[0], @@ -275,7 +241,7 @@ std::unique_ptr ShellCommand::executeImpl( getLogger(), "Started shell command '{}' with pid {} and file descriptors: out {}, err {}", filename, - grandchild_pid, + pid, res->out.getFD(), res->err.getFD()); @@ -351,53 +317,44 @@ int ShellCommand::tryWait() LOG_TRACE(getLogger(), "Wait for shell command pid {} completed with status {}", pid, status); - return handleWaitStatus(pid, status); -} - -void ShellCommand::wait() -{ - int retcode = tryWait(); - handleProceesReturnCode(pid, retcode); -} - -int ShellCommand::handleWaitStatus(pid_t pid, int status) -{ if (WIFEXITED(status)) return WEXITSTATUS(status); if (WIFSIGNALED(status)) - throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process {} was terminated by signal {}", pid, toString(WTERMSIG(status))); + throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was terminated by signal {}", toString(WTERMSIG(status))); if (WIFSTOPPED(status)) - throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process {} was stopped by signal {}", pid, toString(WSTOPSIG(status))); + throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was stopped by signal {}", toString(WSTOPSIG(status))); - throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process {} was not exited normally by unknown reason", pid); + throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was not exited normally by unknown reason"); } -void ShellCommand::handleProceesReturnCode(pid_t pid, int retcode) + +void ShellCommand::wait() { + int retcode = tryWait(); + if (retcode != EXIT_SUCCESS) { switch (retcode) { case static_cast(ReturnCodes::CANNOT_DUP_STDIN): - throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stdin of child process: {}", pid); + throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stdin of child process"); case static_cast(ReturnCodes::CANNOT_DUP_STDOUT): - throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stdout of child process {}", pid); + throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stdout of child process"); case static_cast(ReturnCodes::CANNOT_DUP_STDERR): - throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stderr of child process {}", pid); + throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stderr of child process"); case static_cast(ReturnCodes::CANNOT_EXEC): - throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot execv in child process {}", pid); + throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot execv in child process"); case static_cast(ReturnCodes::CANNOT_DUP_READ_DESCRIPTOR): - throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 read descriptor of child process {}", pid); + throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 read descriptor of child process"); case static_cast(ReturnCodes::CANNOT_DUP_WRITE_DESCRIPTOR): - throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 write descriptor of child process {}", pid); - case static_cast(ReturnCodes::CANNOT_VFORK_IN_CHILD): - throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot vfork in child procces {}", pid); + throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 write descriptor of child process"); default: - throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process {} was exited with return code {}", pid, toString(retcode)); + throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was exited with return code {}", toString(retcode)); } } } + } diff --git a/src/Common/ShellCommand.h b/src/Common/ShellCommand.h index 17106db9698..7e068baab4c 100644 --- a/src/Common/ShellCommand.h +++ b/src/Common/ShellCommand.h @@ -67,6 +67,11 @@ public: DestructorStrategy terminate_in_destructor_strategy = DestructorStrategy(false, 0); }; + pid_t getPid() + { + return pid; + } + /// Run the command using /bin/sh -c. /// If terminate_in_destructor is true, send terminate signal in destructor and don't wait process. static std::unique_ptr execute(const Config & config); @@ -99,9 +104,6 @@ private: static LoggerPtr getLogger(); - static int handleWaitStatus(pid_t pid, int status); - static void handleProceesReturnCode(pid_t pid, int exit_code); - /// Print command name and the list of arguments to log. NOTE: No escaping of arguments is performed. static void logCommand(const char * filename, char * const argv[]); diff --git a/src/Common/SignalHandlers.cpp b/src/Common/SignalHandlers.cpp index 08261fb1cc1..7ee2ee031ae 100644 --- a/src/Common/SignalHandlers.cpp +++ b/src/Common/SignalHandlers.cpp @@ -68,6 +68,20 @@ void terminateRequestedSignalHandler(int sig, siginfo_t *, void *) writeSignalIDtoSignalPipe(sig); } +void childSignalHandler(int sig, siginfo_t * info, void *) +{ + DENY_ALLOCATIONS_IN_SCOPE; + auto saved_errno = errno; /// We must restore previous value of errno in signal handler. + + char buf[signal_pipe_buf_size]; + auto & signal_pipe = HandledSignals::instance().signal_pipe; + WriteBufferFromFileDescriptor out(signal_pipe.fds_rw[1], signal_pipe_buf_size, buf); + writeBinary(sig, out); + writeBinary(info->si_pid, out); + + out.next(); + errno = saved_errno; +} void signalHandler(int sig, siginfo_t * info, void * context) { @@ -294,6 +308,12 @@ void SignalListener::run() if (daemon) daemon->handleSignal(sig); } + else if (sig == SIGCHLD) + { + pid_t child_pid = 0; + readBinary(child_pid, in); + Context::getGlobalContextInstance()->terminateBackgroundShellCommand(child_pid); + } else { siginfo_t info{}; diff --git a/src/Common/SignalHandlers.h b/src/Common/SignalHandlers.h index e7519f7aee2..0ac6d1d2428 100644 --- a/src/Common/SignalHandlers.h +++ b/src/Common/SignalHandlers.h @@ -33,6 +33,7 @@ void closeLogsSignalHandler(int sig, siginfo_t *, void *); void terminateRequestedSignalHandler(int sig, siginfo_t *, void *); +void childSignalHandler(int sig, siginfo_t * info, void *); /** Handler for "fault" or diagnostic signals. Send data about fault to separate thread to write into log. */ diff --git a/src/Daemon/BaseDaemon.cpp b/src/Daemon/BaseDaemon.cpp index 8a8dd3c759c..9cbf9fecdb1 100644 --- a/src/Daemon/BaseDaemon.cpp +++ b/src/Daemon/BaseDaemon.cpp @@ -440,6 +440,7 @@ void BaseDaemon::initializeTerminationAndSignalProcessing() HandledSignals::instance().setupCommonDeadlySignalHandlers(); HandledSignals::instance().setupCommonTerminateRequestSignalHandlers(); HandledSignals::instance().addSignalHandler({SIGHUP}, closeLogsSignalHandler, true); + HandledSignals::instance().addSignalHandler({SIGCHLD}, childSignalHandler, true); /// Set up Poco ErrorHandler for Poco Threads. static KillingErrorHandler killing_error_handler; diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 7f0ad013c1d..85fd07360a4 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -100,6 +100,7 @@ #include #include #include +#include #include #include #include @@ -540,8 +541,9 @@ struct ContextSharedPart : boost::noncopyable /// No lock required for application_type modified only during initialization Context::ApplicationType application_type = Context::ApplicationType::SERVER; - /// vector of xdbc-bridge commands, they will be killed when Context will be destroyed - std::vector> bridge_commands TSA_GUARDED_BY(mutex); + /// Manager of running background shell commands. + /// They will be killed when Context will be destroyed or with SIGCHLD signal. + BackgroundShellCommandHolder background_active_shell_commands; /// No lock required for config_reload_callback, start_servers_callback, stop_servers_callback modified only during initialization Context::ConfigReloadCallback config_reload_callback; @@ -5067,10 +5069,14 @@ void Context::addQueryParameters(const NameToNameMap & parameters) query_parameters.insert_or_assign(name, value); } -void Context::addBridgeCommand(std::unique_ptr cmd) const +void Context::addBackgroundShellCommand(std::unique_ptr cmd) const { - std::lock_guard lock(shared->mutex); - shared->bridge_commands.emplace_back(std::move(cmd)); + shared->background_active_shell_commands.addCommand(std::move(cmd)); +} + +void Context::terminateBackgroundShellCommand(pid_t pid) const +{ + shared->background_active_shell_commands.removeCommand(pid); } diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 327ac0af5fd..f18676c1472 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1288,8 +1288,11 @@ public: /// Overrides values of existing parameters. void addQueryParameters(const NameToNameMap & parameters); - /// Add started bridge command. It will be killed after context destruction - void addBridgeCommand(std::unique_ptr cmd) const; + /// Add background shell command. It will be killed after context destruction or with SIGCHLD. + void addBackgroundShellCommand(std::unique_ptr cmd) const; + + /// Terminate background shell command. + void terminateBackgroundShellCommand(pid_t pid) const; IHostContextPtr & getHostContext(); const IHostContextPtr & getHostContext() const; diff --git a/tests/integration/test_library_bridge/test.py b/tests/integration/test_library_bridge/test.py index 79ec17271be..7a31dfbda5c 100644 --- a/tests/integration/test_library_bridge/test.py +++ b/tests/integration/test_library_bridge/test.py @@ -39,6 +39,13 @@ def create_dict_simple(ch_instance): ) +def check_no_zombie_processes(instance): + res = instance.exec_in_container( + ["bash", "-c", "ps ax -ostat,pid | grep -e '[zZ]' | wc -l"], user="root" + ) + assert res == "0\n" + + @pytest.fixture(scope="module") def ch_cluster(): try: @@ -268,12 +275,8 @@ def test_recover_after_bridge_crash(ch_cluster): instance.exec_in_container( ["bash", "-c", "kill -9 `pidof clickhouse-library-bridge`"], user="root" ) - ## There are no zombie processes. - res = instance.exec_in_container( - ["bash", "-c", "ps ax -ostat,pid | grep -e '[zZ]' | wc -l"], user="root" - ) - assert res == "0\n" + check_no_zombie_processes(instance) instance.query("DROP DICTIONARY lib_dict_c") @@ -299,6 +302,8 @@ def test_server_restart_bridge_might_be_stil_alive(ch_cluster): result = instance.query("""select dictGet(lib_dict_c, 'value1', toUInt64(1));""") assert result.strip() == "101" + check_no_zombie_processes(instance) + instance.query("DROP DICTIONARY lib_dict_c") From b57938df4e8f31304aaebc50821d5338e12db803 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Tue, 5 Nov 2024 18:41:47 +0000 Subject: [PATCH 04/13] Style --- src/Common/BackgroundShellCommandHolder.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Common/BackgroundShellCommandHolder.cpp b/src/Common/BackgroundShellCommandHolder.cpp index 8d239356218..035ab01abbf 100644 --- a/src/Common/BackgroundShellCommandHolder.cpp +++ b/src/Common/BackgroundShellCommandHolder.cpp @@ -30,7 +30,7 @@ void BackgroundShellCommandHolder::addCommand(std::unique_ptr comm auto [iterator, is_inserted] = active_shell_commands.emplace(std::make_pair(command_pid, std::move(command))); if (!is_inserted) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't insert proccess PID {} into active shell commands, because there are running proccess with same PID", command_pid); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't insert process PID {} into active shell commands, because there are running process with same PID", command_pid); LOG_TRACE(getLogger(), "Inserted the command with pid {}", command_pid); } From ca8a93e1ddb68ba15e837b066be1538e7b2a381b Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Wed, 6 Nov 2024 08:19:41 +0000 Subject: [PATCH 05/13] Fix tidy --- src/Common/ShellCommand.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Common/ShellCommand.h b/src/Common/ShellCommand.h index 7e068baab4c..e07b769610f 100644 --- a/src/Common/ShellCommand.h +++ b/src/Common/ShellCommand.h @@ -67,7 +67,7 @@ public: DestructorStrategy terminate_in_destructor_strategy = DestructorStrategy(false, 0); }; - pid_t getPid() + pid_t getPid() const { return pid; } From 89096b554a4ec5fc75c924628e0b605938987369 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Wed, 6 Nov 2024 10:10:41 +0000 Subject: [PATCH 06/13] Restart Ci From e16c231c580030647e0ec09431cb6f42a04778b0 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Thu, 7 Nov 2024 10:52:52 +0000 Subject: [PATCH 07/13] Review --- ...llCommandHolder.cpp => ShellCommandsHolder.cpp} | 14 +++++++------- ...dShellCommandHolder.h => ShellCommandsHolder.h} | 10 +++++----- src/Common/SignalHandlers.cpp | 2 +- src/Interpreters/Context.cpp | 6 +++--- src/Interpreters/Context.h | 2 +- 5 files changed, 17 insertions(+), 17 deletions(-) rename src/Common/{BackgroundShellCommandHolder.cpp => ShellCommandsHolder.cpp} (57%) rename src/Common/{BackgroundShellCommandHolder.h => ShellCommandsHolder.h} (61%) diff --git a/src/Common/BackgroundShellCommandHolder.cpp b/src/Common/ShellCommandsHolder.cpp similarity index 57% rename from src/Common/BackgroundShellCommandHolder.cpp rename to src/Common/ShellCommandsHolder.cpp index 035ab01abbf..3b66c6ba5fc 100644 --- a/src/Common/BackgroundShellCommandHolder.cpp +++ b/src/Common/ShellCommandsHolder.cpp @@ -1,6 +1,6 @@ #include #include -#include +#include namespace DB { @@ -10,25 +10,25 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -LoggerPtr BackgroundShellCommandHolder::getLogger() +LoggerPtr ShellCommandsHolder::getLogger() { - return ::getLogger("BackgroundShellCommandHolder"); + return ::getLogger("ShellCommandsHolder"); } -void BackgroundShellCommandHolder::removeCommand(pid_t pid) +void ShellCommandsHolder::removeCommand(pid_t pid) { std::lock_guard lock(mutex); - bool is_erased = active_shell_commands.erase(pid); + bool is_erased = shell_commands.erase(pid); LOG_TRACE(getLogger(), "Try to erase command with the pid {}, is_erased: {}", pid, is_erased); } -void BackgroundShellCommandHolder::addCommand(std::unique_ptr command) +void ShellCommandsHolder::addCommand(std::unique_ptr command) { std::lock_guard lock(mutex); pid_t command_pid = command->getPid(); - auto [iterator, is_inserted] = active_shell_commands.emplace(std::make_pair(command_pid, std::move(command))); + auto [iterator, is_inserted] = shell_commands.emplace(std::make_pair(command_pid, std::move(command))); if (!is_inserted) throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't insert process PID {} into active shell commands, because there are running process with same PID", command_pid); diff --git a/src/Common/BackgroundShellCommandHolder.h b/src/Common/ShellCommandsHolder.h similarity index 61% rename from src/Common/BackgroundShellCommandHolder.h rename to src/Common/ShellCommandsHolder.h index 58bbbefcec1..95db7622ebd 100644 --- a/src/Common/BackgroundShellCommandHolder.h +++ b/src/Common/ShellCommandsHolder.h @@ -1,9 +1,9 @@ #pragma once +#include +#include #include #include - -#include #include @@ -12,17 +12,17 @@ namespace DB /** The holder class for running background shell processes. */ -class BackgroundShellCommandHolder final +class ShellCommandsHolder final : public boost::noncopyable { public: void removeCommand(pid_t pid); void addCommand(std::unique_ptr command); private: - using ActiveShellCommandsCollection = std::unordered_map>; + using ShellCommands = std::unordered_map>; std::mutex mutex; - ActiveShellCommandsCollection active_shell_commands TSA_GUARDED_BY(mutex); + ShellCommands shell_commands TSA_GUARDED_BY(mutex); static LoggerPtr getLogger(); }; diff --git a/src/Common/SignalHandlers.cpp b/src/Common/SignalHandlers.cpp index 7ee2ee031ae..440cfe0b57e 100644 --- a/src/Common/SignalHandlers.cpp +++ b/src/Common/SignalHandlers.cpp @@ -312,7 +312,7 @@ void SignalListener::run() { pid_t child_pid = 0; readBinary(child_pid, in); - Context::getGlobalContextInstance()->terminateBackgroundShellCommand(child_pid); + Context::getGlobalContextInstance()->removeBackgroundShellCommand(child_pid); } else { diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 85fd07360a4..951571dd1db 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -100,7 +100,7 @@ #include #include #include -#include +#include #include #include #include @@ -543,7 +543,7 @@ struct ContextSharedPart : boost::noncopyable /// Manager of running background shell commands. /// They will be killed when Context will be destroyed or with SIGCHLD signal. - BackgroundShellCommandHolder background_active_shell_commands; + ShellCommandsHolder background_active_shell_commands; /// No lock required for config_reload_callback, start_servers_callback, stop_servers_callback modified only during initialization Context::ConfigReloadCallback config_reload_callback; @@ -5074,7 +5074,7 @@ void Context::addBackgroundShellCommand(std::unique_ptr cmd) const shared->background_active_shell_commands.addCommand(std::move(cmd)); } -void Context::terminateBackgroundShellCommand(pid_t pid) const +void Context::removeBackgroundShellCommand(pid_t pid) const { shared->background_active_shell_commands.removeCommand(pid); } diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index f18676c1472..51f6e299c95 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1292,7 +1292,7 @@ public: void addBackgroundShellCommand(std::unique_ptr cmd) const; /// Terminate background shell command. - void terminateBackgroundShellCommand(pid_t pid) const; + void removeBackgroundShellCommand(pid_t pid) const; IHostContextPtr & getHostContext(); const IHostContextPtr & getHostContext() const; From ea375aecb3f83da1b2f3245dc33e0f91b2adb784 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Thu, 7 Nov 2024 10:53:38 +0000 Subject: [PATCH 08/13] Increase timeout --- tests/integration/test_library_bridge/test_exiled.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_library_bridge/test_exiled.py b/tests/integration/test_library_bridge/test_exiled.py index 46a6ac1eaa4..b6d31a218fd 100644 --- a/tests/integration/test_library_bridge/test_exiled.py +++ b/tests/integration/test_library_bridge/test_exiled.py @@ -71,13 +71,13 @@ def test_bridge_dies_with_parent(ch_cluster): except: pass - for i in range(30): + for i in range(60): time.sleep(1) clickhouse_pid = instance.get_process_pid("clickhouse server") if clickhouse_pid is None: break - for i in range(30): + for i in range(60): time.sleep(1) bridge_pid = instance.get_process_pid("library-bridge") if bridge_pid is None: @@ -95,5 +95,5 @@ def test_bridge_dies_with_parent(ch_cluster): assert clickhouse_pid is None assert bridge_pid is None finally: - instance.start_clickhouse(20) + instance.start_clickhouse(60) instance.query("DROP DICTIONARY lib_dict_c") From d69427c35f20ba739bb514170cb5f916b8aa1852 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Thu, 7 Nov 2024 16:24:02 +0000 Subject: [PATCH 09/13] More review --- src/Common/ShellCommand.cpp | 74 ++++++++++++++++++++++++------ src/Common/ShellCommand.h | 20 ++++++++ src/Common/ShellCommandsHolder.cpp | 29 ++++++++++-- 3 files changed, 104 insertions(+), 19 deletions(-) diff --git a/src/Common/ShellCommand.cpp b/src/Common/ShellCommand.cpp index 0d41669816c..b540cee9e82 100644 --- a/src/Common/ShellCommand.cpp +++ b/src/Common/ShellCommand.cpp @@ -61,6 +61,9 @@ LoggerPtr ShellCommand::getLogger() ShellCommand::~ShellCommand() { + if (is_manualy_terminated) + return; + if (wait_called) return; @@ -291,11 +294,45 @@ std::unique_ptr ShellCommand::executeDirect(const ShellCommand::Co return executeImpl(path.data(), argv.data(), config); } +struct ShellCommand::tryWaitResult +{ + bool is_process_terminated = false; + int retcode = -1; +}; int ShellCommand::tryWait() { + return tryWaitImpl(false).retcode; +} + +ShellCommand::tryWaitResult ShellCommand::tryWaitImpl(bool blocking) +{ + LOG_TRACE(getLogger(), "Will wait for shell command pid {}", pid); + + ShellCommand::tryWaitResult result; + + int options = ((blocking) ? WNOHANG : 0); + int status = 0; + int waitpid_retcode = -1; + + while (waitpid_retcode < 0) + { + waitpid_retcode = waitpid(pid, &status, options); + + if (blocking && !waitpid_retcode) + { + result.is_process_terminated = false; + return result; + } + if (errno != EINTR) + throw ErrnoException(ErrorCodes::CANNOT_WAITPID, "Cannot waitpid"); + } + + LOG_TRACE(getLogger(), "Wait for shell command pid {} completed with status {}", pid, status); + wait_called = true; + result.is_process_terminated = true; in.close(); out.close(); err.close(); @@ -306,19 +343,11 @@ int ShellCommand::tryWait() for (auto & [_, fd] : read_fds) fd.close(); - LOG_TRACE(getLogger(), "Will wait for shell command pid {}", pid); - - int status = 0; - while (waitpid(pid, &status, 0) < 0) - { - if (errno != EINTR) - throw ErrnoException(ErrorCodes::CANNOT_WAITPID, "Cannot waitpid"); - } - - LOG_TRACE(getLogger(), "Wait for shell command pid {} completed with status {}", pid, status); - if (WIFEXITED(status)) - return WEXITSTATUS(status); + { + result.retcode = WEXITSTATUS(status); + return result; + } if (WIFSIGNALED(status)) throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was terminated by signal {}", toString(WTERMSIG(status))); @@ -330,10 +359,8 @@ int ShellCommand::tryWait() } -void ShellCommand::wait() +void ShellCommand::handleProcessRetcode(int retcode) const { - int retcode = tryWait(); - if (retcode != EXIT_SUCCESS) { switch (retcode) @@ -356,5 +383,22 @@ void ShellCommand::wait() } } +bool ShellCommand::waitIfProccesTerminated() +{ + auto proc_status = tryWaitImpl(true); + if (proc_status.is_process_terminated) + { + handleProcessRetcode(proc_status.retcode); + } + return proc_status.is_process_terminated; +} + + +void ShellCommand::wait() +{ + int retcode = tryWaitImpl(false).retcode; + handleProcessRetcode(retcode); +} + } diff --git a/src/Common/ShellCommand.h b/src/Common/ShellCommand.h index e07b769610f..6a4a8328b82 100644 --- a/src/Common/ShellCommand.h +++ b/src/Common/ShellCommand.h @@ -72,6 +72,16 @@ public: return pid; } + bool isWaitCalled() const + { + return wait_called; + } + + void setManuallyTerminated() + { + is_manualy_terminated = true; + } + /// Run the command using /bin/sh -c. /// If terminate_in_destructor is true, send terminate signal in destructor and don't wait process. static std::unique_ptr execute(const Config & config); @@ -86,6 +96,10 @@ public: /// Wait for the process to finish, see the return code. To throw an exception if the process was not completed independently. int tryWait(); + /// Returns if process terminated. + /// If process terminated, then handle return code. + bool waitIfProccesTerminated(); + WriteBufferFromFile in; /// If the command reads from stdin, do not forget to call in.close() after writing all the data there. ReadBufferFromFile out; ReadBufferFromFile err; @@ -97,10 +111,16 @@ private: pid_t pid; Config config; bool wait_called = false; + bool is_manualy_terminated = false; ShellCommand(pid_t pid_, int & in_fd_, int & out_fd_, int & err_fd_, const Config & config); bool tryWaitProcessWithTimeout(size_t timeout_in_seconds); + struct tryWaitResult; + + tryWaitResult tryWaitImpl(bool blocking); + + void handleProcessRetcode(int retcode) const; static LoggerPtr getLogger(); diff --git a/src/Common/ShellCommandsHolder.cpp b/src/Common/ShellCommandsHolder.cpp index 3b66c6ba5fc..242c30f1286 100644 --- a/src/Common/ShellCommandsHolder.cpp +++ b/src/Common/ShellCommandsHolder.cpp @@ -27,11 +27,32 @@ void ShellCommandsHolder::addCommand(std::unique_ptr command) { std::lock_guard lock(mutex); pid_t command_pid = command->getPid(); + if (command->waitIfProccesTerminated()) + { + LOG_TRACE(getLogger(), "Pid {} already finished. Do not insert it.", command_pid); + return; + } - auto [iterator, is_inserted] = shell_commands.emplace(std::make_pair(command_pid, std::move(command))); - if (!is_inserted) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't insert process PID {} into active shell commands, because there are running process with same PID", command_pid); + auto [iterator, is_inserted] = shell_commands.try_emplace(command_pid, std::move(command)); + if (is_inserted) + { + LOG_TRACE(getLogger(), "Inserted the command with pid {}", command_pid); + return; + } - LOG_TRACE(getLogger(), "Inserted the command with pid {}", command_pid); + if (iterator->second->isWaitCalled()) + { + iterator->second = std::move(command); + LOG_TRACE(getLogger(), "Replaced the command with pid {}", command_pid); + return; + } + + /// We got two active ShellCommand with the same pid. + /// Probably it is a bug, will try to replace the old shell command with a new one. + + LOG_WARNING(getLogger(), "The PID already presented in active shell commands, will try to replace with a new one."); + + iterator->second->setManuallyTerminated(); + iterator->second = std::move(command); } } From cca7da6664650f2794b63d2b77a4aa6977b75b26 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Thu, 7 Nov 2024 18:54:24 +0000 Subject: [PATCH 10/13] Style --- src/Common/ShellCommandsHolder.cpp | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/Common/ShellCommandsHolder.cpp b/src/Common/ShellCommandsHolder.cpp index 242c30f1286..eb3f981a6e4 100644 --- a/src/Common/ShellCommandsHolder.cpp +++ b/src/Common/ShellCommandsHolder.cpp @@ -5,11 +5,6 @@ namespace DB { -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; -} - LoggerPtr ShellCommandsHolder::getLogger() { return ::getLogger("ShellCommandsHolder"); From 2e83d0f61fdc6baa6198efaa5ac167ae825cad97 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Thu, 14 Nov 2024 09:55:06 +0000 Subject: [PATCH 11/13] Remove holder from config --- src/BridgeHelper/LibraryBridgeHelper.cpp | 3 ++- src/BridgeHelper/XDBCBridgeHelper.h | 3 ++- src/Common/ShellCommand.cpp | 15 +++++++++------ src/Common/ShellCommandsHolder.cpp | 6 ++++++ src/Common/ShellCommandsHolder.h | 2 ++ src/Common/SignalHandlers.cpp | 3 ++- src/Interpreters/Context.cpp | 15 --------------- src/Interpreters/Context.h | 5 ----- 8 files changed, 23 insertions(+), 29 deletions(-) diff --git a/src/BridgeHelper/LibraryBridgeHelper.cpp b/src/BridgeHelper/LibraryBridgeHelper.cpp index e9fd1078658..cb675da7fa5 100644 --- a/src/BridgeHelper/LibraryBridgeHelper.cpp +++ b/src/BridgeHelper/LibraryBridgeHelper.cpp @@ -2,6 +2,7 @@ #include #include +#include #include namespace DB @@ -29,7 +30,7 @@ LibraryBridgeHelper::LibraryBridgeHelper(ContextPtr context_) void LibraryBridgeHelper::startBridge(std::unique_ptr cmd) const { - getContext()->addBackgroundShellCommand(std::move(cmd)); + ShellCommandsHolder::instance().addCommand(std::move(cmd)); } diff --git a/src/BridgeHelper/XDBCBridgeHelper.h b/src/BridgeHelper/XDBCBridgeHelper.h index 0851d1289db..cf1720a29ec 100644 --- a/src/BridgeHelper/XDBCBridgeHelper.h +++ b/src/BridgeHelper/XDBCBridgeHelper.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -144,7 +145,7 @@ protected: void startBridge(std::unique_ptr cmd) const override { - getContext()->addBackgroundShellCommand(std::move(cmd)); + ShellCommandsHolder::instance().addCommand(std::move(cmd)); } diff --git a/src/Common/ShellCommand.cpp b/src/Common/ShellCommand.cpp index b540cee9e82..bef7b7a0e9d 100644 --- a/src/Common/ShellCommand.cpp +++ b/src/Common/ShellCommand.cpp @@ -302,7 +302,7 @@ struct ShellCommand::tryWaitResult int ShellCommand::tryWait() { - return tryWaitImpl(false).retcode; + return tryWaitImpl(true).retcode; } ShellCommand::tryWaitResult ShellCommand::tryWaitImpl(bool blocking) @@ -311,15 +311,18 @@ ShellCommand::tryWaitResult ShellCommand::tryWaitImpl(bool blocking) ShellCommand::tryWaitResult result; - int options = ((blocking) ? WNOHANG : 0); + int options = ((!blocking) ? WNOHANG : 0); int status = 0; int waitpid_retcode = -1; while (waitpid_retcode < 0) { waitpid_retcode = waitpid(pid, &status, options); - - if (blocking && !waitpid_retcode) + if (waitpid_retcode > 0) + { + break; + } + if (!blocking && !waitpid_retcode) { result.is_process_terminated = false; return result; @@ -385,7 +388,7 @@ void ShellCommand::handleProcessRetcode(int retcode) const bool ShellCommand::waitIfProccesTerminated() { - auto proc_status = tryWaitImpl(true); + auto proc_status = tryWaitImpl(false); if (proc_status.is_process_terminated) { handleProcessRetcode(proc_status.retcode); @@ -396,7 +399,7 @@ bool ShellCommand::waitIfProccesTerminated() void ShellCommand::wait() { - int retcode = tryWaitImpl(false).retcode; + int retcode = tryWaitImpl(true).retcode; handleProcessRetcode(retcode); } diff --git a/src/Common/ShellCommandsHolder.cpp b/src/Common/ShellCommandsHolder.cpp index eb3f981a6e4..1354cd2e4b1 100644 --- a/src/Common/ShellCommandsHolder.cpp +++ b/src/Common/ShellCommandsHolder.cpp @@ -5,6 +5,12 @@ namespace DB { +ShellCommandsHolder & ShellCommandsHolder::instance() +{ + static ShellCommandsHolder instance; + return instance; +} + LoggerPtr ShellCommandsHolder::getLogger() { return ::getLogger("ShellCommandsHolder"); diff --git a/src/Common/ShellCommandsHolder.h b/src/Common/ShellCommandsHolder.h index 95db7622ebd..70934365768 100644 --- a/src/Common/ShellCommandsHolder.h +++ b/src/Common/ShellCommandsHolder.h @@ -15,6 +15,8 @@ namespace DB class ShellCommandsHolder final : public boost::noncopyable { public: + static ShellCommandsHolder & instance(); + void removeCommand(pid_t pid); void addCommand(std::unique_ptr command); diff --git a/src/Common/SignalHandlers.cpp b/src/Common/SignalHandlers.cpp index 440cfe0b57e..63ac2df272b 100644 --- a/src/Common/SignalHandlers.cpp +++ b/src/Common/SignalHandlers.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -312,7 +313,7 @@ void SignalListener::run() { pid_t child_pid = 0; readBinary(child_pid, in); - Context::getGlobalContextInstance()->removeBackgroundShellCommand(child_pid); + ShellCommandsHolder::instance().removeCommand(child_pid); } else { diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 951571dd1db..78d41a336b6 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -100,7 +100,6 @@ #include #include #include -#include #include #include #include @@ -541,10 +540,6 @@ struct ContextSharedPart : boost::noncopyable /// No lock required for application_type modified only during initialization Context::ApplicationType application_type = Context::ApplicationType::SERVER; - /// Manager of running background shell commands. - /// They will be killed when Context will be destroyed or with SIGCHLD signal. - ShellCommandsHolder background_active_shell_commands; - /// No lock required for config_reload_callback, start_servers_callback, stop_servers_callback modified only during initialization Context::ConfigReloadCallback config_reload_callback; Context::StartStopServersCallback start_servers_callback; @@ -5069,16 +5064,6 @@ void Context::addQueryParameters(const NameToNameMap & parameters) query_parameters.insert_or_assign(name, value); } -void Context::addBackgroundShellCommand(std::unique_ptr cmd) const -{ - shared->background_active_shell_commands.addCommand(std::move(cmd)); -} - -void Context::removeBackgroundShellCommand(pid_t pid) const -{ - shared->background_active_shell_commands.removeCommand(pid); -} - IHostContextPtr & Context::getHostContext() { diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 51f6e299c95..cb8bf9634e2 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1288,11 +1288,6 @@ public: /// Overrides values of existing parameters. void addQueryParameters(const NameToNameMap & parameters); - /// Add background shell command. It will be killed after context destruction or with SIGCHLD. - void addBackgroundShellCommand(std::unique_ptr cmd) const; - - /// Terminate background shell command. - void removeBackgroundShellCommand(pid_t pid) const; IHostContextPtr & getHostContext(); const IHostContextPtr & getHostContext() const; From 296b88efdfce55db122648e1d658bfe76873be53 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Wed, 20 Nov 2024 15:37:30 +0000 Subject: [PATCH 12/13] Review --- src/Common/ShellCommand.cpp | 2 +- src/Common/ShellCommand.h | 6 +++--- src/Common/ShellCommandsHolder.cpp | 20 +++++++------------- src/Common/ShellCommandsHolder.h | 2 +- 4 files changed, 12 insertions(+), 18 deletions(-) diff --git a/src/Common/ShellCommand.cpp b/src/Common/ShellCommand.cpp index bef7b7a0e9d..bd2c95629fe 100644 --- a/src/Common/ShellCommand.cpp +++ b/src/Common/ShellCommand.cpp @@ -61,7 +61,7 @@ LoggerPtr ShellCommand::getLogger() ShellCommand::~ShellCommand() { - if (is_manualy_terminated) + if (do_not_terminate) return; if (wait_called) diff --git a/src/Common/ShellCommand.h b/src/Common/ShellCommand.h index 6a4a8328b82..7aff975efa6 100644 --- a/src/Common/ShellCommand.h +++ b/src/Common/ShellCommand.h @@ -77,9 +77,9 @@ public: return wait_called; } - void setManuallyTerminated() + void setDoNotTerminate() { - is_manualy_terminated = true; + do_not_terminate = true; } /// Run the command using /bin/sh -c. @@ -111,7 +111,7 @@ private: pid_t pid; Config config; bool wait_called = false; - bool is_manualy_terminated = false; + bool do_not_terminate = false; ShellCommand(pid_t pid_, int & in_fd_, int & out_fd_, int & err_fd_, const Config & config); diff --git a/src/Common/ShellCommandsHolder.cpp b/src/Common/ShellCommandsHolder.cpp index 1354cd2e4b1..6cc44f27fa7 100644 --- a/src/Common/ShellCommandsHolder.cpp +++ b/src/Common/ShellCommandsHolder.cpp @@ -11,17 +11,11 @@ ShellCommandsHolder & ShellCommandsHolder::instance() return instance; } -LoggerPtr ShellCommandsHolder::getLogger() -{ - return ::getLogger("ShellCommandsHolder"); -} - - void ShellCommandsHolder::removeCommand(pid_t pid) { std::lock_guard lock(mutex); bool is_erased = shell_commands.erase(pid); - LOG_TRACE(getLogger(), "Try to erase command with the pid {}, is_erased: {}", pid, is_erased); + LOG_TRACE(log, "Try to erase command with the pid {}, is_erased: {}", pid, is_erased); } void ShellCommandsHolder::addCommand(std::unique_ptr command) @@ -30,30 +24,30 @@ void ShellCommandsHolder::addCommand(std::unique_ptr command) pid_t command_pid = command->getPid(); if (command->waitIfProccesTerminated()) { - LOG_TRACE(getLogger(), "Pid {} already finished. Do not insert it.", command_pid); + LOG_TRACE(log, "Pid {} already finished. Do not insert it.", command_pid); return; } auto [iterator, is_inserted] = shell_commands.try_emplace(command_pid, std::move(command)); if (is_inserted) { - LOG_TRACE(getLogger(), "Inserted the command with pid {}", command_pid); + LOG_TRACE(log, "Inserted the command with pid {}", command_pid); return; } if (iterator->second->isWaitCalled()) { iterator->second = std::move(command); - LOG_TRACE(getLogger(), "Replaced the command with pid {}", command_pid); + LOG_TRACE(log, "Replaced the command with pid {}", command_pid); return; } - /// We got two active ShellCommand with the same pid. /// Probably it is a bug, will try to replace the old shell command with a new one. + chassert(false); - LOG_WARNING(getLogger(), "The PID already presented in active shell commands, will try to replace with a new one."); + LOG_WARNING(log, "The PID already presented in active shell commands, will try to replace with a new one."); - iterator->second->setManuallyTerminated(); + iterator->second->setDoNotTerminate(); iterator->second = std::move(command); } } diff --git a/src/Common/ShellCommandsHolder.h b/src/Common/ShellCommandsHolder.h index 70934365768..2326d4042bc 100644 --- a/src/Common/ShellCommandsHolder.h +++ b/src/Common/ShellCommandsHolder.h @@ -26,7 +26,7 @@ private: std::mutex mutex; ShellCommands shell_commands TSA_GUARDED_BY(mutex); - static LoggerPtr getLogger(); + LoggerPtr log = getLogger("ShellCommandsHolder"); }; } From 2875cf5230629697b6dd643fdbf2c5f3c8430a74 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Thu, 21 Nov 2024 10:04:10 +0000 Subject: [PATCH 13/13] Fix with new master --- src/Common/SignalHandlers.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Common/SignalHandlers.cpp b/src/Common/SignalHandlers.cpp index 63ac2df272b..47c10147ade 100644 --- a/src/Common/SignalHandlers.cpp +++ b/src/Common/SignalHandlers.cpp @@ -80,7 +80,7 @@ void childSignalHandler(int sig, siginfo_t * info, void *) writeBinary(sig, out); writeBinary(info->si_pid, out); - out.next(); + out.finalize(); errno = saved_errno; }