Redesigned

This commit is contained in:
MikhailBurdukov 2024-11-05 15:49:01 +00:00
parent f12a582bd0
commit 4cc8d36419
12 changed files with 182 additions and 120 deletions

View File

@ -29,7 +29,7 @@ LibraryBridgeHelper::LibraryBridgeHelper(ContextPtr context_)
void LibraryBridgeHelper::startBridge(std::unique_ptr<ShellCommand> cmd) const
{
getContext()->addBridgeCommand(std::move(cmd));
getContext()->addBackgroundShellCommand(std::move(cmd));
}

View File

@ -144,7 +144,7 @@ protected:
void startBridge(std::unique_ptr<ShellCommand> cmd) const override
{
getContext()->addBridgeCommand(std::move(cmd));
getContext()->addBackgroundShellCommand(std::move(cmd));
}

View File

@ -0,0 +1,37 @@
#include <Common/logger_useful.h>
#include <Common/Exception.h>
#include <Common/BackgroundShellCommandHolder.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
LoggerPtr BackgroundShellCommandHolder::getLogger()
{
return ::getLogger("BackgroundShellCommandHolder");
}
void BackgroundShellCommandHolder::removeCommand(pid_t pid)
{
std::lock_guard lock(mutex);
bool is_erased = active_shell_commands.erase(pid);
LOG_TRACE(getLogger(), "Try to erase command with the pid {}, is_erased: {}", pid, is_erased);
}
void BackgroundShellCommandHolder::addCommand(std::unique_ptr<ShellCommand> command)
{
std::lock_guard lock(mutex);
pid_t command_pid = command->getPid();
auto [iterator, is_inserted] = active_shell_commands.emplace(std::make_pair(command_pid, std::move(command)));
if (!is_inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't insert proccess PID {} into active shell commands, because there are running proccess with same PID", command_pid);
LOG_TRACE(getLogger(), "Inserted the command with pid {}", command_pid);
}
}

View File

@ -0,0 +1,30 @@
#pragma once
#include <memory>
#include <mutex>
#include <Common/ShellCommand.h>
#include <unordered_map>
namespace DB
{
/** The holder class for running background shell processes.
*/
class BackgroundShellCommandHolder final
{
public:
void removeCommand(pid_t pid);
void addCommand(std::unique_ptr<ShellCommand> command);
private:
using ActiveShellCommandsCollection = std::unordered_map<pid_t, std::unique_ptr<ShellCommand>>;
std::mutex mutex;
ActiveShellCommandsCollection active_shell_commands TSA_GUARDED_BY(mutex);
static LoggerPtr getLogger();
};
}

View File

@ -25,7 +25,6 @@ namespace
CANNOT_EXEC = 0x55555558,
CANNOT_DUP_READ_DESCRIPTOR = 0x55555559,
CANNOT_DUP_WRITE_DESCRIPTOR = 0x55555560,
CANNOT_VFORK_IN_CHILD = 0x55555561,
};
}
@ -152,10 +151,6 @@ std::unique_ptr<ShellCommand> ShellCommand::executeImpl(
PipeFDs pipe_stdout;
PipeFDs pipe_stderr;
PipeFDs pipe_with_child;
WriteBufferFromFile write_buffer_for_child(pipe_with_child.fds_rw[1]);
ReadBufferFromFile read_buffer_for_parent(pipe_with_child.fds_rw[0]);
std::vector<std::unique_ptr<PipeFDs>> read_pipe_fds;
std::vector<std::unique_ptr<PipeFDs>> write_pipe_fds;
@ -165,93 +160,64 @@ std::unique_ptr<ShellCommand> ShellCommand::executeImpl(
for (size_t i = 0; i < config.write_fds.size(); ++i)
write_pipe_fds.emplace_back(std::make_unique<PipeFDs>());
pid_t child_pid = reinterpret_cast<pid_t(*)()>(real_vfork)();
pid_t pid = reinterpret_cast<pid_t(*)()>(real_vfork)();
if (child_pid == -1)
if (pid == -1)
throw ErrnoException(ErrorCodes::CANNOT_FORK, "Cannot vfork");
/// Here we are using double vfork technique to prevent zombie process.
if (0 == child_pid)
if (0 == pid)
{
pid_t pid = reinterpret_cast<pid_t(*)()>(real_vfork)();
/// We are in the freshly created process.
if (pid == -1)
_exit(static_cast<int>(ReturnCodes::CANNOT_VFORK_IN_CHILD));
/// Why `_exit` and not `exit`? Because `exit` calls `atexit` and destructors of thread local storage.
/// And there is a lot of garbage (including, for example, mutex is blocked). And this can not be done after `vfork` - deadlock happens.
if (pid == 0)
/// Replace the file descriptors with the ends of our pipes.
if (STDIN_FILENO != dup2(pipe_stdin.fds_rw[0], STDIN_FILENO))
_exit(static_cast<int>(ReturnCodes::CANNOT_DUP_STDIN));
if (!config.pipe_stdin_only)
{
/// We are in the freshly created process.
if (STDOUT_FILENO != dup2(pipe_stdout.fds_rw[1], STDOUT_FILENO))
_exit(static_cast<int>(ReturnCodes::CANNOT_DUP_STDOUT));
/// Why `_exit` and not `exit`? Because `exit` calls `atexit` and destructors of thread local storage.
/// And there is a lot of garbage (including, for example, mutex is blocked). And this can not be done after `vfork` - deadlock happens.
/// Replace the file descriptors with the ends of our pipes.
if (STDIN_FILENO != dup2(pipe_stdin.fds_rw[0], STDIN_FILENO))
_exit(static_cast<int>(ReturnCodes::CANNOT_DUP_STDIN));
if (!config.pipe_stdin_only)
{
if (STDOUT_FILENO != dup2(pipe_stdout.fds_rw[1], STDOUT_FILENO))
_exit(static_cast<int>(ReturnCodes::CANNOT_DUP_STDOUT));
if (STDERR_FILENO != dup2(pipe_stderr.fds_rw[1], STDERR_FILENO))
_exit(static_cast<int>(ReturnCodes::CANNOT_DUP_STDERR));
}
for (size_t i = 0; i < config.read_fds.size(); ++i)
{
auto & fds = *read_pipe_fds[i];
auto fd = config.read_fds[i];
if (fd != dup2(fds.fds_rw[1], fd))
_exit(static_cast<int>(ReturnCodes::CANNOT_DUP_READ_DESCRIPTOR));
}
for (size_t i = 0; i < config.write_fds.size(); ++i)
{
auto & fds = *write_pipe_fds[i];
auto fd = config.write_fds[i];
if (fd != dup2(fds.fds_rw[0], fd))
_exit(static_cast<int>(ReturnCodes::CANNOT_DUP_WRITE_DESCRIPTOR));
}
// Reset the signal mask: it may be non-empty and will be inherited
// by the child process, which might not expect this.
sigset_t mask;
sigemptyset(&mask);
sigprocmask(0, nullptr, &mask); // NOLINT(concurrency-mt-unsafe)
sigprocmask(SIG_UNBLOCK, &mask, nullptr); // NOLINT(concurrency-mt-unsafe)
execv(filename, argv);
/// If the process is running, then `execv` does not return here.
_exit(static_cast<int>(ReturnCodes::CANNOT_EXEC));
if (STDERR_FILENO != dup2(pipe_stderr.fds_rw[1], STDERR_FILENO))
_exit(static_cast<int>(ReturnCodes::CANNOT_DUP_STDERR));
}
else
for (size_t i = 0; i < config.read_fds.size(); ++i)
{
DB::writeIntBinary(pid, write_buffer_for_child);
write_buffer_for_child.next();
_exit(0);
auto & fds = *read_pipe_fds[i];
auto fd = config.read_fds[i];
if (fd != dup2(fds.fds_rw[1], fd))
_exit(static_cast<int>(ReturnCodes::CANNOT_DUP_READ_DESCRIPTOR));
}
for (size_t i = 0; i < config.write_fds.size(); ++i)
{
auto & fds = *write_pipe_fds[i];
auto fd = config.write_fds[i];
if (fd != dup2(fds.fds_rw[0], fd))
_exit(static_cast<int>(ReturnCodes::CANNOT_DUP_WRITE_DESCRIPTOR));
}
// Reset the signal mask: it may be non-empty and will be inherited
// by the child process, which might not expect this.
sigset_t mask;
sigemptyset(&mask);
sigprocmask(0, nullptr, &mask); // NOLINT(concurrency-mt-unsafe)
sigprocmask(SIG_UNBLOCK, &mask, nullptr); // NOLINT(concurrency-mt-unsafe)
execv(filename, argv);
/// If the process is running, then `execv` does not return here.
_exit(static_cast<int>(ReturnCodes::CANNOT_EXEC));
}
else
{
int status = 0;
while (waitpid(child_pid, &status, 0) < 0)
{
if (errno != EINTR)
throw ErrnoException(ErrorCodes::CANNOT_WAITPID, "Cannot waitpid");
}
int return_code = handleWaitStatus(child_pid, status);
handleProceesReturnCode(child_pid, return_code);
}
pid_t grandchild_pid = 0;
DB::readIntBinary(grandchild_pid, read_buffer_for_parent);
std::unique_ptr<ShellCommand> res(new ShellCommand(
grandchild_pid,
pid,
pipe_stdin.fds_rw[1],
pipe_stdout.fds_rw[0],
pipe_stderr.fds_rw[0],
@ -275,7 +241,7 @@ std::unique_ptr<ShellCommand> ShellCommand::executeImpl(
getLogger(),
"Started shell command '{}' with pid {} and file descriptors: out {}, err {}",
filename,
grandchild_pid,
pid,
res->out.getFD(),
res->err.getFD());
@ -351,53 +317,44 @@ int ShellCommand::tryWait()
LOG_TRACE(getLogger(), "Wait for shell command pid {} completed with status {}", pid, status);
return handleWaitStatus(pid, status);
}
void ShellCommand::wait()
{
int retcode = tryWait();
handleProceesReturnCode(pid, retcode);
}
int ShellCommand::handleWaitStatus(pid_t pid, int status)
{
if (WIFEXITED(status))
return WEXITSTATUS(status);
if (WIFSIGNALED(status))
throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process {} was terminated by signal {}", pid, toString(WTERMSIG(status)));
throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was terminated by signal {}", toString(WTERMSIG(status)));
if (WIFSTOPPED(status))
throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process {} was stopped by signal {}", pid, toString(WSTOPSIG(status)));
throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was stopped by signal {}", toString(WSTOPSIG(status)));
throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process {} was not exited normally by unknown reason", pid);
throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was not exited normally by unknown reason");
}
void ShellCommand::handleProceesReturnCode(pid_t pid, int retcode)
void ShellCommand::wait()
{
int retcode = tryWait();
if (retcode != EXIT_SUCCESS)
{
switch (retcode)
{
case static_cast<int>(ReturnCodes::CANNOT_DUP_STDIN):
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stdin of child process: {}", pid);
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stdin of child process");
case static_cast<int>(ReturnCodes::CANNOT_DUP_STDOUT):
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stdout of child process {}", pid);
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stdout of child process");
case static_cast<int>(ReturnCodes::CANNOT_DUP_STDERR):
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stderr of child process {}", pid);
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stderr of child process");
case static_cast<int>(ReturnCodes::CANNOT_EXEC):
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot execv in child process {}", pid);
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot execv in child process");
case static_cast<int>(ReturnCodes::CANNOT_DUP_READ_DESCRIPTOR):
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 read descriptor of child process {}", pid);
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 read descriptor of child process");
case static_cast<int>(ReturnCodes::CANNOT_DUP_WRITE_DESCRIPTOR):
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 write descriptor of child process {}", pid);
case static_cast<int>(ReturnCodes::CANNOT_VFORK_IN_CHILD):
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot vfork in child procces {}", pid);
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 write descriptor of child process");
default:
throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process {} was exited with return code {}", pid, toString(retcode));
throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was exited with return code {}", toString(retcode));
}
}
}
}

View File

@ -67,6 +67,11 @@ public:
DestructorStrategy terminate_in_destructor_strategy = DestructorStrategy(false, 0);
};
pid_t getPid()
{
return pid;
}
/// Run the command using /bin/sh -c.
/// If terminate_in_destructor is true, send terminate signal in destructor and don't wait process.
static std::unique_ptr<ShellCommand> execute(const Config & config);
@ -99,9 +104,6 @@ private:
static LoggerPtr getLogger();
static int handleWaitStatus(pid_t pid, int status);
static void handleProceesReturnCode(pid_t pid, int exit_code);
/// Print command name and the list of arguments to log. NOTE: No escaping of arguments is performed.
static void logCommand(const char * filename, char * const argv[]);

View File

@ -68,6 +68,20 @@ void terminateRequestedSignalHandler(int sig, siginfo_t *, void *)
writeSignalIDtoSignalPipe(sig);
}
void childSignalHandler(int sig, siginfo_t * info, void *)
{
DENY_ALLOCATIONS_IN_SCOPE;
auto saved_errno = errno; /// We must restore previous value of errno in signal handler.
char buf[signal_pipe_buf_size];
auto & signal_pipe = HandledSignals::instance().signal_pipe;
WriteBufferFromFileDescriptor out(signal_pipe.fds_rw[1], signal_pipe_buf_size, buf);
writeBinary(sig, out);
writeBinary(info->si_pid, out);
out.next();
errno = saved_errno;
}
void signalHandler(int sig, siginfo_t * info, void * context)
{
@ -294,6 +308,12 @@ void SignalListener::run()
if (daemon)
daemon->handleSignal(sig);
}
else if (sig == SIGCHLD)
{
pid_t child_pid = 0;
readBinary(child_pid, in);
Context::getGlobalContextInstance()->terminateBackgroundShellCommand(child_pid);
}
else
{
siginfo_t info{};

View File

@ -33,6 +33,7 @@ void closeLogsSignalHandler(int sig, siginfo_t *, void *);
void terminateRequestedSignalHandler(int sig, siginfo_t *, void *);
void childSignalHandler(int sig, siginfo_t * info, void *);
/** Handler for "fault" or diagnostic signals. Send data about fault to separate thread to write into log.
*/

View File

@ -440,6 +440,7 @@ void BaseDaemon::initializeTerminationAndSignalProcessing()
HandledSignals::instance().setupCommonDeadlySignalHandlers();
HandledSignals::instance().setupCommonTerminateRequestSignalHandlers();
HandledSignals::instance().addSignalHandler({SIGHUP}, closeLogsSignalHandler, true);
HandledSignals::instance().addSignalHandler({SIGCHLD}, childSignalHandler, true);
/// Set up Poco ErrorHandler for Poco Threads.
static KillingErrorHandler killing_error_handler;

View File

@ -100,6 +100,7 @@
#include <Common/Config/AbstractConfigurationComparison.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ShellCommand.h>
#include <Common/BackgroundShellCommandHolder.h>
#include <Common/logger_useful.h>
#include <Common/RemoteHostFilter.h>
#include <Common/HTTPHeaderFilter.h>
@ -540,8 +541,9 @@ struct ContextSharedPart : boost::noncopyable
/// No lock required for application_type modified only during initialization
Context::ApplicationType application_type = Context::ApplicationType::SERVER;
/// vector of xdbc-bridge commands, they will be killed when Context will be destroyed
std::vector<std::unique_ptr<ShellCommand>> bridge_commands TSA_GUARDED_BY(mutex);
/// Manager of running background shell commands.
/// They will be killed when Context will be destroyed or with SIGCHLD signal.
BackgroundShellCommandHolder background_active_shell_commands;
/// No lock required for config_reload_callback, start_servers_callback, stop_servers_callback modified only during initialization
Context::ConfigReloadCallback config_reload_callback;
@ -5067,10 +5069,14 @@ void Context::addQueryParameters(const NameToNameMap & parameters)
query_parameters.insert_or_assign(name, value);
}
void Context::addBridgeCommand(std::unique_ptr<ShellCommand> cmd) const
void Context::addBackgroundShellCommand(std::unique_ptr<ShellCommand> cmd) const
{
std::lock_guard lock(shared->mutex);
shared->bridge_commands.emplace_back(std::move(cmd));
shared->background_active_shell_commands.addCommand(std::move(cmd));
}
void Context::terminateBackgroundShellCommand(pid_t pid) const
{
shared->background_active_shell_commands.removeCommand(pid);
}

View File

@ -1288,8 +1288,11 @@ public:
/// Overrides values of existing parameters.
void addQueryParameters(const NameToNameMap & parameters);
/// Add started bridge command. It will be killed after context destruction
void addBridgeCommand(std::unique_ptr<ShellCommand> cmd) const;
/// Add background shell command. It will be killed after context destruction or with SIGCHLD.
void addBackgroundShellCommand(std::unique_ptr<ShellCommand> cmd) const;
/// Terminate background shell command.
void terminateBackgroundShellCommand(pid_t pid) const;
IHostContextPtr & getHostContext();
const IHostContextPtr & getHostContext() const;

View File

@ -39,6 +39,13 @@ def create_dict_simple(ch_instance):
)
def check_no_zombie_processes(instance):
res = instance.exec_in_container(
["bash", "-c", "ps ax -ostat,pid | grep -e '[zZ]' | wc -l"], user="root"
)
assert res == "0\n"
@pytest.fixture(scope="module")
def ch_cluster():
try:
@ -268,12 +275,8 @@ def test_recover_after_bridge_crash(ch_cluster):
instance.exec_in_container(
["bash", "-c", "kill -9 `pidof clickhouse-library-bridge`"], user="root"
)
## There are no zombie processes.
res = instance.exec_in_container(
["bash", "-c", "ps ax -ostat,pid | grep -e '[zZ]' | wc -l"], user="root"
)
assert res == "0\n"
check_no_zombie_processes(instance)
instance.query("DROP DICTIONARY lib_dict_c")
@ -299,6 +302,8 @@ def test_server_restart_bridge_might_be_stil_alive(ch_cluster):
result = instance.query("""select dictGet(lib_dict_c, 'value1', toUInt64(1));""")
assert result.strip() == "101"
check_no_zombie_processes(instance)
instance.query("DROP DICTIONARY lib_dict_c")