diff --git a/src/BridgeHelper/LibraryBridgeHelper.cpp b/src/BridgeHelper/LibraryBridgeHelper.cpp index 58b84aa1ffd..e9fd1078658 100644 --- a/src/BridgeHelper/LibraryBridgeHelper.cpp +++ b/src/BridgeHelper/LibraryBridgeHelper.cpp @@ -29,7 +29,7 @@ LibraryBridgeHelper::LibraryBridgeHelper(ContextPtr context_) void LibraryBridgeHelper::startBridge(std::unique_ptr cmd) const { - getContext()->addBridgeCommand(std::move(cmd)); + getContext()->addBackgroundShellCommand(std::move(cmd)); } diff --git a/src/BridgeHelper/XDBCBridgeHelper.h b/src/BridgeHelper/XDBCBridgeHelper.h index 2e1328b4057..0851d1289db 100644 --- a/src/BridgeHelper/XDBCBridgeHelper.h +++ b/src/BridgeHelper/XDBCBridgeHelper.h @@ -144,7 +144,7 @@ protected: void startBridge(std::unique_ptr cmd) const override { - getContext()->addBridgeCommand(std::move(cmd)); + getContext()->addBackgroundShellCommand(std::move(cmd)); } diff --git a/src/Common/BackgroundShellCommandHolder.cpp b/src/Common/BackgroundShellCommandHolder.cpp new file mode 100644 index 00000000000..8d239356218 --- /dev/null +++ b/src/Common/BackgroundShellCommandHolder.cpp @@ -0,0 +1,37 @@ +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +LoggerPtr BackgroundShellCommandHolder::getLogger() +{ + return ::getLogger("BackgroundShellCommandHolder"); +} + + +void BackgroundShellCommandHolder::removeCommand(pid_t pid) +{ + std::lock_guard lock(mutex); + bool is_erased = active_shell_commands.erase(pid); + LOG_TRACE(getLogger(), "Try to erase command with the pid {}, is_erased: {}", pid, is_erased); +} + +void BackgroundShellCommandHolder::addCommand(std::unique_ptr command) +{ + std::lock_guard lock(mutex); + pid_t command_pid = command->getPid(); + + auto [iterator, is_inserted] = active_shell_commands.emplace(std::make_pair(command_pid, std::move(command))); + if (!is_inserted) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't insert proccess PID {} into active shell commands, because there are running proccess with same PID", command_pid); + + LOG_TRACE(getLogger(), "Inserted the command with pid {}", command_pid); +} +} diff --git a/src/Common/BackgroundShellCommandHolder.h b/src/Common/BackgroundShellCommandHolder.h new file mode 100644 index 00000000000..58bbbefcec1 --- /dev/null +++ b/src/Common/BackgroundShellCommandHolder.h @@ -0,0 +1,30 @@ +#pragma once + +#include +#include + +#include +#include + + +namespace DB +{ + +/** The holder class for running background shell processes. +*/ +class BackgroundShellCommandHolder final +{ +public: + void removeCommand(pid_t pid); + void addCommand(std::unique_ptr command); + +private: + using ActiveShellCommandsCollection = std::unordered_map>; + + std::mutex mutex; + ActiveShellCommandsCollection active_shell_commands TSA_GUARDED_BY(mutex); + + static LoggerPtr getLogger(); +}; + +} diff --git a/src/Common/ShellCommand.cpp b/src/Common/ShellCommand.cpp index c2d8cc44ab3..0d41669816c 100644 --- a/src/Common/ShellCommand.cpp +++ b/src/Common/ShellCommand.cpp @@ -25,7 +25,6 @@ namespace CANNOT_EXEC = 0x55555558, CANNOT_DUP_READ_DESCRIPTOR = 0x55555559, CANNOT_DUP_WRITE_DESCRIPTOR = 0x55555560, - CANNOT_VFORK_IN_CHILD = 0x55555561, }; } @@ -152,10 +151,6 @@ std::unique_ptr ShellCommand::executeImpl( PipeFDs pipe_stdout; PipeFDs pipe_stderr; - PipeFDs pipe_with_child; - WriteBufferFromFile write_buffer_for_child(pipe_with_child.fds_rw[1]); - ReadBufferFromFile read_buffer_for_parent(pipe_with_child.fds_rw[0]); - std::vector> read_pipe_fds; std::vector> write_pipe_fds; @@ -165,93 +160,64 @@ std::unique_ptr ShellCommand::executeImpl( for (size_t i = 0; i < config.write_fds.size(); ++i) write_pipe_fds.emplace_back(std::make_unique()); - pid_t child_pid = reinterpret_cast(real_vfork)(); + pid_t pid = reinterpret_cast(real_vfork)(); - if (child_pid == -1) + if (pid == -1) throw ErrnoException(ErrorCodes::CANNOT_FORK, "Cannot vfork"); - /// Here we are using double vfork technique to prevent zombie process. - if (0 == child_pid) + if (0 == pid) { - pid_t pid = reinterpret_cast(real_vfork)(); + /// We are in the freshly created process. - if (pid == -1) - _exit(static_cast(ReturnCodes::CANNOT_VFORK_IN_CHILD)); + /// Why `_exit` and not `exit`? Because `exit` calls `atexit` and destructors of thread local storage. + /// And there is a lot of garbage (including, for example, mutex is blocked). And this can not be done after `vfork` - deadlock happens. - if (pid == 0) + /// Replace the file descriptors with the ends of our pipes. + if (STDIN_FILENO != dup2(pipe_stdin.fds_rw[0], STDIN_FILENO)) + _exit(static_cast(ReturnCodes::CANNOT_DUP_STDIN)); + + if (!config.pipe_stdin_only) { - /// We are in the freshly created process. + if (STDOUT_FILENO != dup2(pipe_stdout.fds_rw[1], STDOUT_FILENO)) + _exit(static_cast(ReturnCodes::CANNOT_DUP_STDOUT)); - /// Why `_exit` and not `exit`? Because `exit` calls `atexit` and destructors of thread local storage. - /// And there is a lot of garbage (including, for example, mutex is blocked). And this can not be done after `vfork` - deadlock happens. - - /// Replace the file descriptors with the ends of our pipes. - if (STDIN_FILENO != dup2(pipe_stdin.fds_rw[0], STDIN_FILENO)) - _exit(static_cast(ReturnCodes::CANNOT_DUP_STDIN)); - - if (!config.pipe_stdin_only) - { - if (STDOUT_FILENO != dup2(pipe_stdout.fds_rw[1], STDOUT_FILENO)) - _exit(static_cast(ReturnCodes::CANNOT_DUP_STDOUT)); - - if (STDERR_FILENO != dup2(pipe_stderr.fds_rw[1], STDERR_FILENO)) - _exit(static_cast(ReturnCodes::CANNOT_DUP_STDERR)); - } - - for (size_t i = 0; i < config.read_fds.size(); ++i) - { - auto & fds = *read_pipe_fds[i]; - auto fd = config.read_fds[i]; - - if (fd != dup2(fds.fds_rw[1], fd)) - _exit(static_cast(ReturnCodes::CANNOT_DUP_READ_DESCRIPTOR)); - } - - for (size_t i = 0; i < config.write_fds.size(); ++i) - { - auto & fds = *write_pipe_fds[i]; - auto fd = config.write_fds[i]; - - if (fd != dup2(fds.fds_rw[0], fd)) - _exit(static_cast(ReturnCodes::CANNOT_DUP_WRITE_DESCRIPTOR)); - } - - // Reset the signal mask: it may be non-empty and will be inherited - // by the child process, which might not expect this. - sigset_t mask; - sigemptyset(&mask); - sigprocmask(0, nullptr, &mask); // NOLINT(concurrency-mt-unsafe) - sigprocmask(SIG_UNBLOCK, &mask, nullptr); // NOLINT(concurrency-mt-unsafe) - - execv(filename, argv); - /// If the process is running, then `execv` does not return here. - - _exit(static_cast(ReturnCodes::CANNOT_EXEC)); + if (STDERR_FILENO != dup2(pipe_stderr.fds_rw[1], STDERR_FILENO)) + _exit(static_cast(ReturnCodes::CANNOT_DUP_STDERR)); } - else + + for (size_t i = 0; i < config.read_fds.size(); ++i) { - DB::writeIntBinary(pid, write_buffer_for_child); - write_buffer_for_child.next(); - _exit(0); + auto & fds = *read_pipe_fds[i]; + auto fd = config.read_fds[i]; + + if (fd != dup2(fds.fds_rw[1], fd)) + _exit(static_cast(ReturnCodes::CANNOT_DUP_READ_DESCRIPTOR)); } + + for (size_t i = 0; i < config.write_fds.size(); ++i) + { + auto & fds = *write_pipe_fds[i]; + auto fd = config.write_fds[i]; + + if (fd != dup2(fds.fds_rw[0], fd)) + _exit(static_cast(ReturnCodes::CANNOT_DUP_WRITE_DESCRIPTOR)); + } + + // Reset the signal mask: it may be non-empty and will be inherited + // by the child process, which might not expect this. + sigset_t mask; + sigemptyset(&mask); + sigprocmask(0, nullptr, &mask); // NOLINT(concurrency-mt-unsafe) + sigprocmask(SIG_UNBLOCK, &mask, nullptr); // NOLINT(concurrency-mt-unsafe) + + execv(filename, argv); + /// If the process is running, then `execv` does not return here. + + _exit(static_cast(ReturnCodes::CANNOT_EXEC)); } - else - { - int status = 0; - while (waitpid(child_pid, &status, 0) < 0) - { - if (errno != EINTR) - throw ErrnoException(ErrorCodes::CANNOT_WAITPID, "Cannot waitpid"); - } - int return_code = handleWaitStatus(child_pid, status); - handleProceesReturnCode(child_pid, return_code); - } - - pid_t grandchild_pid = 0; - DB::readIntBinary(grandchild_pid, read_buffer_for_parent); std::unique_ptr res(new ShellCommand( - grandchild_pid, + pid, pipe_stdin.fds_rw[1], pipe_stdout.fds_rw[0], pipe_stderr.fds_rw[0], @@ -275,7 +241,7 @@ std::unique_ptr ShellCommand::executeImpl( getLogger(), "Started shell command '{}' with pid {} and file descriptors: out {}, err {}", filename, - grandchild_pid, + pid, res->out.getFD(), res->err.getFD()); @@ -351,53 +317,44 @@ int ShellCommand::tryWait() LOG_TRACE(getLogger(), "Wait for shell command pid {} completed with status {}", pid, status); - return handleWaitStatus(pid, status); -} - -void ShellCommand::wait() -{ - int retcode = tryWait(); - handleProceesReturnCode(pid, retcode); -} - -int ShellCommand::handleWaitStatus(pid_t pid, int status) -{ if (WIFEXITED(status)) return WEXITSTATUS(status); if (WIFSIGNALED(status)) - throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process {} was terminated by signal {}", pid, toString(WTERMSIG(status))); + throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was terminated by signal {}", toString(WTERMSIG(status))); if (WIFSTOPPED(status)) - throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process {} was stopped by signal {}", pid, toString(WSTOPSIG(status))); + throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was stopped by signal {}", toString(WSTOPSIG(status))); - throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process {} was not exited normally by unknown reason", pid); + throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was not exited normally by unknown reason"); } -void ShellCommand::handleProceesReturnCode(pid_t pid, int retcode) + +void ShellCommand::wait() { + int retcode = tryWait(); + if (retcode != EXIT_SUCCESS) { switch (retcode) { case static_cast(ReturnCodes::CANNOT_DUP_STDIN): - throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stdin of child process: {}", pid); + throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stdin of child process"); case static_cast(ReturnCodes::CANNOT_DUP_STDOUT): - throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stdout of child process {}", pid); + throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stdout of child process"); case static_cast(ReturnCodes::CANNOT_DUP_STDERR): - throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stderr of child process {}", pid); + throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stderr of child process"); case static_cast(ReturnCodes::CANNOT_EXEC): - throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot execv in child process {}", pid); + throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot execv in child process"); case static_cast(ReturnCodes::CANNOT_DUP_READ_DESCRIPTOR): - throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 read descriptor of child process {}", pid); + throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 read descriptor of child process"); case static_cast(ReturnCodes::CANNOT_DUP_WRITE_DESCRIPTOR): - throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 write descriptor of child process {}", pid); - case static_cast(ReturnCodes::CANNOT_VFORK_IN_CHILD): - throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot vfork in child procces {}", pid); + throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 write descriptor of child process"); default: - throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process {} was exited with return code {}", pid, toString(retcode)); + throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was exited with return code {}", toString(retcode)); } } } + } diff --git a/src/Common/ShellCommand.h b/src/Common/ShellCommand.h index 17106db9698..7e068baab4c 100644 --- a/src/Common/ShellCommand.h +++ b/src/Common/ShellCommand.h @@ -67,6 +67,11 @@ public: DestructorStrategy terminate_in_destructor_strategy = DestructorStrategy(false, 0); }; + pid_t getPid() + { + return pid; + } + /// Run the command using /bin/sh -c. /// If terminate_in_destructor is true, send terminate signal in destructor and don't wait process. static std::unique_ptr execute(const Config & config); @@ -99,9 +104,6 @@ private: static LoggerPtr getLogger(); - static int handleWaitStatus(pid_t pid, int status); - static void handleProceesReturnCode(pid_t pid, int exit_code); - /// Print command name and the list of arguments to log. NOTE: No escaping of arguments is performed. static void logCommand(const char * filename, char * const argv[]); diff --git a/src/Common/SignalHandlers.cpp b/src/Common/SignalHandlers.cpp index 08261fb1cc1..7ee2ee031ae 100644 --- a/src/Common/SignalHandlers.cpp +++ b/src/Common/SignalHandlers.cpp @@ -68,6 +68,20 @@ void terminateRequestedSignalHandler(int sig, siginfo_t *, void *) writeSignalIDtoSignalPipe(sig); } +void childSignalHandler(int sig, siginfo_t * info, void *) +{ + DENY_ALLOCATIONS_IN_SCOPE; + auto saved_errno = errno; /// We must restore previous value of errno in signal handler. + + char buf[signal_pipe_buf_size]; + auto & signal_pipe = HandledSignals::instance().signal_pipe; + WriteBufferFromFileDescriptor out(signal_pipe.fds_rw[1], signal_pipe_buf_size, buf); + writeBinary(sig, out); + writeBinary(info->si_pid, out); + + out.next(); + errno = saved_errno; +} void signalHandler(int sig, siginfo_t * info, void * context) { @@ -294,6 +308,12 @@ void SignalListener::run() if (daemon) daemon->handleSignal(sig); } + else if (sig == SIGCHLD) + { + pid_t child_pid = 0; + readBinary(child_pid, in); + Context::getGlobalContextInstance()->terminateBackgroundShellCommand(child_pid); + } else { siginfo_t info{}; diff --git a/src/Common/SignalHandlers.h b/src/Common/SignalHandlers.h index e7519f7aee2..0ac6d1d2428 100644 --- a/src/Common/SignalHandlers.h +++ b/src/Common/SignalHandlers.h @@ -33,6 +33,7 @@ void closeLogsSignalHandler(int sig, siginfo_t *, void *); void terminateRequestedSignalHandler(int sig, siginfo_t *, void *); +void childSignalHandler(int sig, siginfo_t * info, void *); /** Handler for "fault" or diagnostic signals. Send data about fault to separate thread to write into log. */ diff --git a/src/Daemon/BaseDaemon.cpp b/src/Daemon/BaseDaemon.cpp index 8a8dd3c759c..9cbf9fecdb1 100644 --- a/src/Daemon/BaseDaemon.cpp +++ b/src/Daemon/BaseDaemon.cpp @@ -440,6 +440,7 @@ void BaseDaemon::initializeTerminationAndSignalProcessing() HandledSignals::instance().setupCommonDeadlySignalHandlers(); HandledSignals::instance().setupCommonTerminateRequestSignalHandlers(); HandledSignals::instance().addSignalHandler({SIGHUP}, closeLogsSignalHandler, true); + HandledSignals::instance().addSignalHandler({SIGCHLD}, childSignalHandler, true); /// Set up Poco ErrorHandler for Poco Threads. static KillingErrorHandler killing_error_handler; diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 7f0ad013c1d..85fd07360a4 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -100,6 +100,7 @@ #include #include #include +#include #include #include #include @@ -540,8 +541,9 @@ struct ContextSharedPart : boost::noncopyable /// No lock required for application_type modified only during initialization Context::ApplicationType application_type = Context::ApplicationType::SERVER; - /// vector of xdbc-bridge commands, they will be killed when Context will be destroyed - std::vector> bridge_commands TSA_GUARDED_BY(mutex); + /// Manager of running background shell commands. + /// They will be killed when Context will be destroyed or with SIGCHLD signal. + BackgroundShellCommandHolder background_active_shell_commands; /// No lock required for config_reload_callback, start_servers_callback, stop_servers_callback modified only during initialization Context::ConfigReloadCallback config_reload_callback; @@ -5067,10 +5069,14 @@ void Context::addQueryParameters(const NameToNameMap & parameters) query_parameters.insert_or_assign(name, value); } -void Context::addBridgeCommand(std::unique_ptr cmd) const +void Context::addBackgroundShellCommand(std::unique_ptr cmd) const { - std::lock_guard lock(shared->mutex); - shared->bridge_commands.emplace_back(std::move(cmd)); + shared->background_active_shell_commands.addCommand(std::move(cmd)); +} + +void Context::terminateBackgroundShellCommand(pid_t pid) const +{ + shared->background_active_shell_commands.removeCommand(pid); } diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 327ac0af5fd..f18676c1472 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1288,8 +1288,11 @@ public: /// Overrides values of existing parameters. void addQueryParameters(const NameToNameMap & parameters); - /// Add started bridge command. It will be killed after context destruction - void addBridgeCommand(std::unique_ptr cmd) const; + /// Add background shell command. It will be killed after context destruction or with SIGCHLD. + void addBackgroundShellCommand(std::unique_ptr cmd) const; + + /// Terminate background shell command. + void terminateBackgroundShellCommand(pid_t pid) const; IHostContextPtr & getHostContext(); const IHostContextPtr & getHostContext() const; diff --git a/tests/integration/test_library_bridge/test.py b/tests/integration/test_library_bridge/test.py index 79ec17271be..7a31dfbda5c 100644 --- a/tests/integration/test_library_bridge/test.py +++ b/tests/integration/test_library_bridge/test.py @@ -39,6 +39,13 @@ def create_dict_simple(ch_instance): ) +def check_no_zombie_processes(instance): + res = instance.exec_in_container( + ["bash", "-c", "ps ax -ostat,pid | grep -e '[zZ]' | wc -l"], user="root" + ) + assert res == "0\n" + + @pytest.fixture(scope="module") def ch_cluster(): try: @@ -268,12 +275,8 @@ def test_recover_after_bridge_crash(ch_cluster): instance.exec_in_container( ["bash", "-c", "kill -9 `pidof clickhouse-library-bridge`"], user="root" ) - ## There are no zombie processes. - res = instance.exec_in_container( - ["bash", "-c", "ps ax -ostat,pid | grep -e '[zZ]' | wc -l"], user="root" - ) - assert res == "0\n" + check_no_zombie_processes(instance) instance.query("DROP DICTIONARY lib_dict_c") @@ -299,6 +302,8 @@ def test_server_restart_bridge_might_be_stil_alive(ch_cluster): result = instance.query("""select dictGet(lib_dict_c, 'value1', toUInt64(1));""") assert result.strip() == "101" + check_no_zombie_processes(instance) + instance.query("DROP DICTIONARY lib_dict_c")