diff --git a/src/BridgeHelper/LibraryBridgeHelper.cpp b/src/BridgeHelper/LibraryBridgeHelper.cpp index 58b84aa1ffd..cb675da7fa5 100644 --- a/src/BridgeHelper/LibraryBridgeHelper.cpp +++ b/src/BridgeHelper/LibraryBridgeHelper.cpp @@ -2,6 +2,7 @@ #include #include +#include #include namespace DB @@ -29,7 +30,7 @@ LibraryBridgeHelper::LibraryBridgeHelper(ContextPtr context_) void LibraryBridgeHelper::startBridge(std::unique_ptr cmd) const { - getContext()->addBridgeCommand(std::move(cmd)); + ShellCommandsHolder::instance().addCommand(std::move(cmd)); } diff --git a/src/BridgeHelper/XDBCBridgeHelper.h b/src/BridgeHelper/XDBCBridgeHelper.h index 2e1328b4057..cf1720a29ec 100644 --- a/src/BridgeHelper/XDBCBridgeHelper.h +++ b/src/BridgeHelper/XDBCBridgeHelper.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -144,7 +145,7 @@ protected: void startBridge(std::unique_ptr cmd) const override { - getContext()->addBridgeCommand(std::move(cmd)); + ShellCommandsHolder::instance().addCommand(std::move(cmd)); } diff --git a/src/Common/ShellCommand.cpp b/src/Common/ShellCommand.cpp index 0d41669816c..bd2c95629fe 100644 --- a/src/Common/ShellCommand.cpp +++ b/src/Common/ShellCommand.cpp @@ -61,6 +61,9 @@ LoggerPtr ShellCommand::getLogger() ShellCommand::~ShellCommand() { + if (do_not_terminate) + return; + if (wait_called) return; @@ -291,11 +294,48 @@ std::unique_ptr ShellCommand::executeDirect(const ShellCommand::Co return executeImpl(path.data(), argv.data(), config); } +struct ShellCommand::tryWaitResult +{ + bool is_process_terminated = false; + int retcode = -1; +}; int ShellCommand::tryWait() { + return tryWaitImpl(true).retcode; +} + +ShellCommand::tryWaitResult ShellCommand::tryWaitImpl(bool blocking) +{ + LOG_TRACE(getLogger(), "Will wait for shell command pid {}", pid); + + ShellCommand::tryWaitResult result; + + int options = ((!blocking) ? WNOHANG : 0); + int status = 0; + int waitpid_retcode = -1; + + while (waitpid_retcode < 0) + { + waitpid_retcode = waitpid(pid, &status, options); + if (waitpid_retcode > 0) + { + break; + } + if (!blocking && !waitpid_retcode) + { + result.is_process_terminated = false; + return result; + } + if (errno != EINTR) + throw ErrnoException(ErrorCodes::CANNOT_WAITPID, "Cannot waitpid"); + } + + LOG_TRACE(getLogger(), "Wait for shell command pid {} completed with status {}", pid, status); + wait_called = true; + result.is_process_terminated = true; in.close(); out.close(); err.close(); @@ -306,19 +346,11 @@ int ShellCommand::tryWait() for (auto & [_, fd] : read_fds) fd.close(); - LOG_TRACE(getLogger(), "Will wait for shell command pid {}", pid); - - int status = 0; - while (waitpid(pid, &status, 0) < 0) - { - if (errno != EINTR) - throw ErrnoException(ErrorCodes::CANNOT_WAITPID, "Cannot waitpid"); - } - - LOG_TRACE(getLogger(), "Wait for shell command pid {} completed with status {}", pid, status); - if (WIFEXITED(status)) - return WEXITSTATUS(status); + { + result.retcode = WEXITSTATUS(status); + return result; + } if (WIFSIGNALED(status)) throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was terminated by signal {}", toString(WTERMSIG(status))); @@ -330,10 +362,8 @@ int ShellCommand::tryWait() } -void ShellCommand::wait() +void ShellCommand::handleProcessRetcode(int retcode) const { - int retcode = tryWait(); - if (retcode != EXIT_SUCCESS) { switch (retcode) @@ -356,5 +386,22 @@ void ShellCommand::wait() } } +bool ShellCommand::waitIfProccesTerminated() +{ + auto proc_status = tryWaitImpl(false); + if (proc_status.is_process_terminated) + { + handleProcessRetcode(proc_status.retcode); + } + return proc_status.is_process_terminated; +} + + +void ShellCommand::wait() +{ + int retcode = tryWaitImpl(true).retcode; + handleProcessRetcode(retcode); +} + } diff --git a/src/Common/ShellCommand.h b/src/Common/ShellCommand.h index 5ebc1daefa1..7aff975efa6 100644 --- a/src/Common/ShellCommand.h +++ b/src/Common/ShellCommand.h @@ -67,6 +67,21 @@ public: DestructorStrategy terminate_in_destructor_strategy = DestructorStrategy(false, 0); }; + pid_t getPid() const + { + return pid; + } + + bool isWaitCalled() const + { + return wait_called; + } + + void setDoNotTerminate() + { + do_not_terminate = true; + } + /// Run the command using /bin/sh -c. /// If terminate_in_destructor is true, send terminate signal in destructor and don't wait process. static std::unique_ptr execute(const Config & config); @@ -81,6 +96,10 @@ public: /// Wait for the process to finish, see the return code. To throw an exception if the process was not completed independently. int tryWait(); + /// Returns if process terminated. + /// If process terminated, then handle return code. + bool waitIfProccesTerminated(); + WriteBufferFromFile in; /// If the command reads from stdin, do not forget to call in.close() after writing all the data there. ReadBufferFromFile out; ReadBufferFromFile err; @@ -92,10 +111,16 @@ private: pid_t pid; Config config; bool wait_called = false; + bool do_not_terminate = false; ShellCommand(pid_t pid_, int & in_fd_, int & out_fd_, int & err_fd_, const Config & config); bool tryWaitProcessWithTimeout(size_t timeout_in_seconds); + struct tryWaitResult; + + tryWaitResult tryWaitImpl(bool blocking); + + void handleProcessRetcode(int retcode) const; static LoggerPtr getLogger(); diff --git a/src/Common/ShellCommandsHolder.cpp b/src/Common/ShellCommandsHolder.cpp new file mode 100644 index 00000000000..6cc44f27fa7 --- /dev/null +++ b/src/Common/ShellCommandsHolder.cpp @@ -0,0 +1,53 @@ +#include +#include +#include + +namespace DB +{ + +ShellCommandsHolder & ShellCommandsHolder::instance() +{ + static ShellCommandsHolder instance; + return instance; +} + +void ShellCommandsHolder::removeCommand(pid_t pid) +{ + std::lock_guard lock(mutex); + bool is_erased = shell_commands.erase(pid); + LOG_TRACE(log, "Try to erase command with the pid {}, is_erased: {}", pid, is_erased); +} + +void ShellCommandsHolder::addCommand(std::unique_ptr command) +{ + std::lock_guard lock(mutex); + pid_t command_pid = command->getPid(); + if (command->waitIfProccesTerminated()) + { + LOG_TRACE(log, "Pid {} already finished. Do not insert it.", command_pid); + return; + } + + auto [iterator, is_inserted] = shell_commands.try_emplace(command_pid, std::move(command)); + if (is_inserted) + { + LOG_TRACE(log, "Inserted the command with pid {}", command_pid); + return; + } + + if (iterator->second->isWaitCalled()) + { + iterator->second = std::move(command); + LOG_TRACE(log, "Replaced the command with pid {}", command_pid); + return; + } + /// We got two active ShellCommand with the same pid. + /// Probably it is a bug, will try to replace the old shell command with a new one. + chassert(false); + + LOG_WARNING(log, "The PID already presented in active shell commands, will try to replace with a new one."); + + iterator->second->setDoNotTerminate(); + iterator->second = std::move(command); +} +} diff --git a/src/Common/ShellCommandsHolder.h b/src/Common/ShellCommandsHolder.h new file mode 100644 index 00000000000..2326d4042bc --- /dev/null +++ b/src/Common/ShellCommandsHolder.h @@ -0,0 +1,32 @@ +#pragma once + +#include +#include +#include +#include +#include + + +namespace DB +{ + +/** The holder class for running background shell processes. +*/ +class ShellCommandsHolder final : public boost::noncopyable +{ +public: + static ShellCommandsHolder & instance(); + + void removeCommand(pid_t pid); + void addCommand(std::unique_ptr command); + +private: + using ShellCommands = std::unordered_map>; + + std::mutex mutex; + ShellCommands shell_commands TSA_GUARDED_BY(mutex); + + LoggerPtr log = getLogger("ShellCommandsHolder"); +}; + +} diff --git a/src/Common/SignalHandlers.cpp b/src/Common/SignalHandlers.cpp index 7ab001d6d34..166bb20d978 100644 --- a/src/Common/SignalHandlers.cpp +++ b/src/Common/SignalHandlers.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -68,6 +69,20 @@ void terminateRequestedSignalHandler(int sig, siginfo_t *, void *) writeSignalIDtoSignalPipe(sig); } +void childSignalHandler(int sig, siginfo_t * info, void *) +{ + DENY_ALLOCATIONS_IN_SCOPE; + auto saved_errno = errno; /// We must restore previous value of errno in signal handler. + + char buf[signal_pipe_buf_size]; + auto & signal_pipe = HandledSignals::instance().signal_pipe; + WriteBufferFromFileDescriptor out(signal_pipe.fds_rw[1], signal_pipe_buf_size, buf); + writeBinary(sig, out); + writeBinary(info->si_pid, out); + + out.finalize(); + errno = saved_errno; +} void signalHandler(int sig, siginfo_t * info, void * context) { @@ -296,6 +311,12 @@ void SignalListener::run() if (daemon) daemon->handleSignal(sig); } + else if (sig == SIGCHLD) + { + pid_t child_pid = 0; + readBinary(child_pid, in); + ShellCommandsHolder::instance().removeCommand(child_pid); + } else { siginfo_t info{}; diff --git a/src/Common/SignalHandlers.h b/src/Common/SignalHandlers.h index e7519f7aee2..0ac6d1d2428 100644 --- a/src/Common/SignalHandlers.h +++ b/src/Common/SignalHandlers.h @@ -33,6 +33,7 @@ void closeLogsSignalHandler(int sig, siginfo_t *, void *); void terminateRequestedSignalHandler(int sig, siginfo_t *, void *); +void childSignalHandler(int sig, siginfo_t * info, void *); /** Handler for "fault" or diagnostic signals. Send data about fault to separate thread to write into log. */ diff --git a/src/Daemon/BaseDaemon.cpp b/src/Daemon/BaseDaemon.cpp index 8a8dd3c759c..9cbf9fecdb1 100644 --- a/src/Daemon/BaseDaemon.cpp +++ b/src/Daemon/BaseDaemon.cpp @@ -440,6 +440,7 @@ void BaseDaemon::initializeTerminationAndSignalProcessing() HandledSignals::instance().setupCommonDeadlySignalHandlers(); HandledSignals::instance().setupCommonTerminateRequestSignalHandlers(); HandledSignals::instance().addSignalHandler({SIGHUP}, closeLogsSignalHandler, true); + HandledSignals::instance().addSignalHandler({SIGCHLD}, childSignalHandler, true); /// Set up Poco ErrorHandler for Poco Threads. static KillingErrorHandler killing_error_handler; diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index b8e178e402b..4268d026351 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -524,9 +524,6 @@ struct ContextSharedPart : boost::noncopyable /// No lock required for application_type modified only during initialization Context::ApplicationType application_type = Context::ApplicationType::SERVER; - /// vector of xdbc-bridge commands, they will be killed when Context will be destroyed - std::vector> bridge_commands TSA_GUARDED_BY(mutex); - /// No lock required for config_reload_callback, start_servers_callback, stop_servers_callback modified only during initialization Context::ConfigReloadCallback config_reload_callback; Context::StartStopServersCallback start_servers_callback; @@ -5016,12 +5013,6 @@ void Context::addQueryParameters(const NameToNameMap & parameters) query_parameters.insert_or_assign(name, value); } -void Context::addBridgeCommand(std::unique_ptr cmd) const -{ - std::lock_guard lock(shared->mutex); - shared->bridge_commands.emplace_back(std::move(cmd)); -} - IHostContextPtr & Context::getHostContext() { diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index c62c16098e5..ab954f81873 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1284,8 +1284,6 @@ public: /// Overrides values of existing parameters. void addQueryParameters(const NameToNameMap & parameters); - /// Add started bridge command. It will be killed after context destruction - void addBridgeCommand(std::unique_ptr cmd) const; IHostContextPtr & getHostContext(); const IHostContextPtr & getHostContext() const; diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py index 11d61e0fd17..df1c4266468 100644 --- a/tests/integration/helpers/cluster.py +++ b/tests/integration/helpers/cluster.py @@ -1654,6 +1654,7 @@ class ClickHouseCluster: config_root_name="clickhouse", extra_configs=[], randomize_settings=True, + use_docker_init_flag=False, ) -> "ClickHouseInstance": """Add an instance to the cluster. @@ -1758,6 +1759,7 @@ class ClickHouseCluster: config_root_name=config_root_name, extra_configs=extra_configs, randomize_settings=randomize_settings, + use_docker_init_flag=use_docker_init_flag, ) docker_compose_yml_dir = get_docker_compose_path() @@ -3318,6 +3320,7 @@ services: {ipv6_address} {net_aliases} {net_alias1} + init: {init_flag} """ @@ -3383,6 +3386,7 @@ class ClickHouseInstance: config_root_name="clickhouse", extra_configs=[], randomize_settings=True, + use_docker_init_flag=False, ): self.name = name self.base_cmd = cluster.base_cmd @@ -3502,6 +3506,7 @@ class ClickHouseInstance: self.with_installed_binary = with_installed_binary self.is_up = False self.config_root_name = config_root_name + self.docker_init_flag = use_docker_init_flag def is_built_with_sanitizer(self, sanitizer_name=""): build_opts = self.query( @@ -4794,6 +4799,7 @@ class ClickHouseInstance: ipv6_address=ipv6_address, net_aliases=net_aliases, net_alias1=net_alias1, + init_flag="true" if self.docker_init_flag else "false", ) ) diff --git a/tests/integration/test_library_bridge/test.py b/tests/integration/test_library_bridge/test.py index 6254735a18f..7a31dfbda5c 100644 --- a/tests/integration/test_library_bridge/test.py +++ b/tests/integration/test_library_bridge/test.py @@ -14,6 +14,11 @@ instance = cluster.add_instance( dictionaries=["configs/dictionaries/dict1.xml"], main_configs=["configs/config.d/config.xml"], stay_alive=True, + # WA for the problem with zombie processes inside the docker container. + # This is important here because we are checking that there are no zombie processes + # after craches inside the library bridge. + # https://forums.docker.com/t/what-the-latest-with-the-zombie-process-reaping-problem/50758/2 + use_docker_init_flag=True, ) @@ -34,6 +39,13 @@ def create_dict_simple(ch_instance): ) +def check_no_zombie_processes(instance): + res = instance.exec_in_container( + ["bash", "-c", "ps ax -ostat,pid | grep -e '[zZ]' | wc -l"], user="root" + ) + assert res == "0\n" + + @pytest.fixture(scope="module") def ch_cluster(): try: @@ -263,6 +275,8 @@ def test_recover_after_bridge_crash(ch_cluster): instance.exec_in_container( ["bash", "-c", "kill -9 `pidof clickhouse-library-bridge`"], user="root" ) + + check_no_zombie_processes(instance) instance.query("DROP DICTIONARY lib_dict_c") @@ -288,6 +302,8 @@ def test_server_restart_bridge_might_be_stil_alive(ch_cluster): result = instance.query("""select dictGet(lib_dict_c, 'value1', toUInt64(1));""") assert result.strip() == "101" + check_no_zombie_processes(instance) + instance.query("DROP DICTIONARY lib_dict_c") diff --git a/tests/integration/test_library_bridge/test_exiled.py b/tests/integration/test_library_bridge/test_exiled.py index 46a6ac1eaa4..b6d31a218fd 100644 --- a/tests/integration/test_library_bridge/test_exiled.py +++ b/tests/integration/test_library_bridge/test_exiled.py @@ -71,13 +71,13 @@ def test_bridge_dies_with_parent(ch_cluster): except: pass - for i in range(30): + for i in range(60): time.sleep(1) clickhouse_pid = instance.get_process_pid("clickhouse server") if clickhouse_pid is None: break - for i in range(30): + for i in range(60): time.sleep(1) bridge_pid = instance.get_process_pid("library-bridge") if bridge_pid is None: @@ -95,5 +95,5 @@ def test_bridge_dies_with_parent(ch_cluster): assert clickhouse_pid is None assert bridge_pid is None finally: - instance.start_clickhouse(20) + instance.start_clickhouse(60) instance.query("DROP DICTIONARY lib_dict_c")