This commit is contained in:
MikhailBurdukov 2024-11-21 00:09:45 +01:00 committed by GitHub
commit 1b52d7e4b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 224 additions and 31 deletions

View File

@ -2,6 +2,7 @@
#include <Core/ServerSettings.h> #include <Core/ServerSettings.h>
#include <Core/Settings.h> #include <Core/Settings.h>
#include <Common/ShellCommandsHolder.h>
#include <IO/ConnectionTimeouts.h> #include <IO/ConnectionTimeouts.h>
namespace DB namespace DB
@ -29,7 +30,7 @@ LibraryBridgeHelper::LibraryBridgeHelper(ContextPtr context_)
void LibraryBridgeHelper::startBridge(std::unique_ptr<ShellCommand> cmd) const void LibraryBridgeHelper::startBridge(std::unique_ptr<ShellCommand> cmd) const
{ {
getContext()->addBridgeCommand(std::move(cmd)); ShellCommandsHolder::instance().addCommand(std::move(cmd));
} }

View File

@ -11,6 +11,7 @@
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
#include <Common/BridgeProtocolVersion.h> #include <Common/BridgeProtocolVersion.h>
#include <Common/ShellCommand.h> #include <Common/ShellCommand.h>
#include <Common/ShellCommandsHolder.h>
#include <IO/ConnectionTimeouts.h> #include <IO/ConnectionTimeouts.h>
#include <base/range.h> #include <base/range.h>
#include <BridgeHelper/IBridgeHelper.h> #include <BridgeHelper/IBridgeHelper.h>
@ -144,7 +145,7 @@ protected:
void startBridge(std::unique_ptr<ShellCommand> cmd) const override void startBridge(std::unique_ptr<ShellCommand> cmd) const override
{ {
getContext()->addBridgeCommand(std::move(cmd)); ShellCommandsHolder::instance().addCommand(std::move(cmd));
} }

View File

@ -61,6 +61,9 @@ LoggerPtr ShellCommand::getLogger()
ShellCommand::~ShellCommand() ShellCommand::~ShellCommand()
{ {
if (do_not_terminate)
return;
if (wait_called) if (wait_called)
return; return;
@ -291,11 +294,48 @@ std::unique_ptr<ShellCommand> ShellCommand::executeDirect(const ShellCommand::Co
return executeImpl(path.data(), argv.data(), config); return executeImpl(path.data(), argv.data(), config);
} }
struct ShellCommand::tryWaitResult
{
bool is_process_terminated = false;
int retcode = -1;
};
int ShellCommand::tryWait() int ShellCommand::tryWait()
{ {
return tryWaitImpl(true).retcode;
}
ShellCommand::tryWaitResult ShellCommand::tryWaitImpl(bool blocking)
{
LOG_TRACE(getLogger(), "Will wait for shell command pid {}", pid);
ShellCommand::tryWaitResult result;
int options = ((!blocking) ? WNOHANG : 0);
int status = 0;
int waitpid_retcode = -1;
while (waitpid_retcode < 0)
{
waitpid_retcode = waitpid(pid, &status, options);
if (waitpid_retcode > 0)
{
break;
}
if (!blocking && !waitpid_retcode)
{
result.is_process_terminated = false;
return result;
}
if (errno != EINTR)
throw ErrnoException(ErrorCodes::CANNOT_WAITPID, "Cannot waitpid");
}
LOG_TRACE(getLogger(), "Wait for shell command pid {} completed with status {}", pid, status);
wait_called = true; wait_called = true;
result.is_process_terminated = true;
in.close(); in.close();
out.close(); out.close();
err.close(); err.close();
@ -306,19 +346,11 @@ int ShellCommand::tryWait()
for (auto & [_, fd] : read_fds) for (auto & [_, fd] : read_fds)
fd.close(); fd.close();
LOG_TRACE(getLogger(), "Will wait for shell command pid {}", pid);
int status = 0;
while (waitpid(pid, &status, 0) < 0)
{
if (errno != EINTR)
throw ErrnoException(ErrorCodes::CANNOT_WAITPID, "Cannot waitpid");
}
LOG_TRACE(getLogger(), "Wait for shell command pid {} completed with status {}", pid, status);
if (WIFEXITED(status)) if (WIFEXITED(status))
return WEXITSTATUS(status); {
result.retcode = WEXITSTATUS(status);
return result;
}
if (WIFSIGNALED(status)) if (WIFSIGNALED(status))
throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was terminated by signal {}", toString(WTERMSIG(status))); throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was terminated by signal {}", toString(WTERMSIG(status)));
@ -330,10 +362,8 @@ int ShellCommand::tryWait()
} }
void ShellCommand::wait() void ShellCommand::handleProcessRetcode(int retcode) const
{ {
int retcode = tryWait();
if (retcode != EXIT_SUCCESS) if (retcode != EXIT_SUCCESS)
{ {
switch (retcode) switch (retcode)
@ -356,5 +386,22 @@ void ShellCommand::wait()
} }
} }
bool ShellCommand::waitIfProccesTerminated()
{
auto proc_status = tryWaitImpl(false);
if (proc_status.is_process_terminated)
{
handleProcessRetcode(proc_status.retcode);
}
return proc_status.is_process_terminated;
}
void ShellCommand::wait()
{
int retcode = tryWaitImpl(true).retcode;
handleProcessRetcode(retcode);
}
} }

View File

@ -67,6 +67,21 @@ public:
DestructorStrategy terminate_in_destructor_strategy = DestructorStrategy(false, 0); DestructorStrategy terminate_in_destructor_strategy = DestructorStrategy(false, 0);
}; };
pid_t getPid() const
{
return pid;
}
bool isWaitCalled() const
{
return wait_called;
}
void setDoNotTerminate()
{
do_not_terminate = true;
}
/// Run the command using /bin/sh -c. /// Run the command using /bin/sh -c.
/// If terminate_in_destructor is true, send terminate signal in destructor and don't wait process. /// If terminate_in_destructor is true, send terminate signal in destructor and don't wait process.
static std::unique_ptr<ShellCommand> execute(const Config & config); static std::unique_ptr<ShellCommand> execute(const Config & config);
@ -81,6 +96,10 @@ public:
/// Wait for the process to finish, see the return code. To throw an exception if the process was not completed independently. /// Wait for the process to finish, see the return code. To throw an exception if the process was not completed independently.
int tryWait(); int tryWait();
/// Returns if process terminated.
/// If process terminated, then handle return code.
bool waitIfProccesTerminated();
WriteBufferFromFile in; /// If the command reads from stdin, do not forget to call in.close() after writing all the data there. WriteBufferFromFile in; /// If the command reads from stdin, do not forget to call in.close() after writing all the data there.
ReadBufferFromFile out; ReadBufferFromFile out;
ReadBufferFromFile err; ReadBufferFromFile err;
@ -92,10 +111,16 @@ private:
pid_t pid; pid_t pid;
Config config; Config config;
bool wait_called = false; bool wait_called = false;
bool do_not_terminate = false;
ShellCommand(pid_t pid_, int & in_fd_, int & out_fd_, int & err_fd_, const Config & config); ShellCommand(pid_t pid_, int & in_fd_, int & out_fd_, int & err_fd_, const Config & config);
bool tryWaitProcessWithTimeout(size_t timeout_in_seconds); bool tryWaitProcessWithTimeout(size_t timeout_in_seconds);
struct tryWaitResult;
tryWaitResult tryWaitImpl(bool blocking);
void handleProcessRetcode(int retcode) const;
static LoggerPtr getLogger(); static LoggerPtr getLogger();

View File

@ -0,0 +1,53 @@
#include <Common/logger_useful.h>
#include <Common/Exception.h>
#include <Common/ShellCommandsHolder.h>
namespace DB
{
ShellCommandsHolder & ShellCommandsHolder::instance()
{
static ShellCommandsHolder instance;
return instance;
}
void ShellCommandsHolder::removeCommand(pid_t pid)
{
std::lock_guard lock(mutex);
bool is_erased = shell_commands.erase(pid);
LOG_TRACE(log, "Try to erase command with the pid {}, is_erased: {}", pid, is_erased);
}
void ShellCommandsHolder::addCommand(std::unique_ptr<ShellCommand> command)
{
std::lock_guard lock(mutex);
pid_t command_pid = command->getPid();
if (command->waitIfProccesTerminated())
{
LOG_TRACE(log, "Pid {} already finished. Do not insert it.", command_pid);
return;
}
auto [iterator, is_inserted] = shell_commands.try_emplace(command_pid, std::move(command));
if (is_inserted)
{
LOG_TRACE(log, "Inserted the command with pid {}", command_pid);
return;
}
if (iterator->second->isWaitCalled())
{
iterator->second = std::move(command);
LOG_TRACE(log, "Replaced the command with pid {}", command_pid);
return;
}
/// We got two active ShellCommand with the same pid.
/// Probably it is a bug, will try to replace the old shell command with a new one.
chassert(false);
LOG_WARNING(log, "The PID already presented in active shell commands, will try to replace with a new one.");
iterator->second->setDoNotTerminate();
iterator->second = std::move(command);
}
}

View File

@ -0,0 +1,32 @@
#pragma once
#include <Common/ShellCommand.h>
#include <boost/noncopyable.hpp>
#include <memory>
#include <mutex>
#include <unordered_map>
namespace DB
{
/** The holder class for running background shell processes.
*/
class ShellCommandsHolder final : public boost::noncopyable
{
public:
static ShellCommandsHolder & instance();
void removeCommand(pid_t pid);
void addCommand(std::unique_ptr<ShellCommand> command);
private:
using ShellCommands = std::unordered_map<pid_t, std::unique_ptr<ShellCommand>>;
std::mutex mutex;
ShellCommands shell_commands TSA_GUARDED_BY(mutex);
LoggerPtr log = getLogger("ShellCommandsHolder");
};
}

View File

@ -1,6 +1,7 @@
#include <Common/SignalHandlers.h> #include <Common/SignalHandlers.h>
#include <Common/config_version.h> #include <Common/config_version.h>
#include <Common/getHashOfLoadedBinary.h> #include <Common/getHashOfLoadedBinary.h>
#include <Common/ShellCommandsHolder.h>
#include <Common/CurrentThread.h> #include <Common/CurrentThread.h>
#include <Daemon/BaseDaemon.h> #include <Daemon/BaseDaemon.h>
#include <Daemon/SentryWriter.h> #include <Daemon/SentryWriter.h>
@ -68,6 +69,20 @@ void terminateRequestedSignalHandler(int sig, siginfo_t *, void *)
writeSignalIDtoSignalPipe(sig); writeSignalIDtoSignalPipe(sig);
} }
void childSignalHandler(int sig, siginfo_t * info, void *)
{
DENY_ALLOCATIONS_IN_SCOPE;
auto saved_errno = errno; /// We must restore previous value of errno in signal handler.
char buf[signal_pipe_buf_size];
auto & signal_pipe = HandledSignals::instance().signal_pipe;
WriteBufferFromFileDescriptor out(signal_pipe.fds_rw[1], signal_pipe_buf_size, buf);
writeBinary(sig, out);
writeBinary(info->si_pid, out);
out.next();
errno = saved_errno;
}
void signalHandler(int sig, siginfo_t * info, void * context) void signalHandler(int sig, siginfo_t * info, void * context)
{ {
@ -294,6 +309,12 @@ void SignalListener::run()
if (daemon) if (daemon)
daemon->handleSignal(sig); daemon->handleSignal(sig);
} }
else if (sig == SIGCHLD)
{
pid_t child_pid = 0;
readBinary(child_pid, in);
ShellCommandsHolder::instance().removeCommand(child_pid);
}
else else
{ {
siginfo_t info{}; siginfo_t info{};

View File

@ -33,6 +33,7 @@ void closeLogsSignalHandler(int sig, siginfo_t *, void *);
void terminateRequestedSignalHandler(int sig, siginfo_t *, void *); void terminateRequestedSignalHandler(int sig, siginfo_t *, void *);
void childSignalHandler(int sig, siginfo_t * info, void *);
/** Handler for "fault" or diagnostic signals. Send data about fault to separate thread to write into log. /** Handler for "fault" or diagnostic signals. Send data about fault to separate thread to write into log.
*/ */

View File

@ -440,6 +440,7 @@ void BaseDaemon::initializeTerminationAndSignalProcessing()
HandledSignals::instance().setupCommonDeadlySignalHandlers(); HandledSignals::instance().setupCommonDeadlySignalHandlers();
HandledSignals::instance().setupCommonTerminateRequestSignalHandlers(); HandledSignals::instance().setupCommonTerminateRequestSignalHandlers();
HandledSignals::instance().addSignalHandler({SIGHUP}, closeLogsSignalHandler, true); HandledSignals::instance().addSignalHandler({SIGHUP}, closeLogsSignalHandler, true);
HandledSignals::instance().addSignalHandler({SIGCHLD}, childSignalHandler, true);
/// Set up Poco ErrorHandler for Poco Threads. /// Set up Poco ErrorHandler for Poco Threads.
static KillingErrorHandler killing_error_handler; static KillingErrorHandler killing_error_handler;

View File

@ -540,9 +540,6 @@ struct ContextSharedPart : boost::noncopyable
/// No lock required for application_type modified only during initialization /// No lock required for application_type modified only during initialization
Context::ApplicationType application_type = Context::ApplicationType::SERVER; Context::ApplicationType application_type = Context::ApplicationType::SERVER;
/// vector of xdbc-bridge commands, they will be killed when Context will be destroyed
std::vector<std::unique_ptr<ShellCommand>> bridge_commands TSA_GUARDED_BY(mutex);
/// No lock required for config_reload_callback, start_servers_callback, stop_servers_callback modified only during initialization /// No lock required for config_reload_callback, start_servers_callback, stop_servers_callback modified only during initialization
Context::ConfigReloadCallback config_reload_callback; Context::ConfigReloadCallback config_reload_callback;
Context::StartStopServersCallback start_servers_callback; Context::StartStopServersCallback start_servers_callback;
@ -5067,12 +5064,6 @@ void Context::addQueryParameters(const NameToNameMap & parameters)
query_parameters.insert_or_assign(name, value); query_parameters.insert_or_assign(name, value);
} }
void Context::addBridgeCommand(std::unique_ptr<ShellCommand> cmd) const
{
std::lock_guard lock(shared->mutex);
shared->bridge_commands.emplace_back(std::move(cmd));
}
IHostContextPtr & Context::getHostContext() IHostContextPtr & Context::getHostContext()
{ {

View File

@ -1288,8 +1288,6 @@ public:
/// Overrides values of existing parameters. /// Overrides values of existing parameters.
void addQueryParameters(const NameToNameMap & parameters); void addQueryParameters(const NameToNameMap & parameters);
/// Add started bridge command. It will be killed after context destruction
void addBridgeCommand(std::unique_ptr<ShellCommand> cmd) const;
IHostContextPtr & getHostContext(); IHostContextPtr & getHostContext();
const IHostContextPtr & getHostContext() const; const IHostContextPtr & getHostContext() const;

View File

@ -1657,6 +1657,7 @@ class ClickHouseCluster:
extra_configs=[], extra_configs=[],
extra_args="", extra_args="",
randomize_settings=True, randomize_settings=True,
use_docker_init_flag=False,
) -> "ClickHouseInstance": ) -> "ClickHouseInstance":
"""Add an instance to the cluster. """Add an instance to the cluster.
@ -1762,6 +1763,7 @@ class ClickHouseCluster:
config_root_name=config_root_name, config_root_name=config_root_name,
extra_configs=extra_configs, extra_configs=extra_configs,
randomize_settings=randomize_settings, randomize_settings=randomize_settings,
use_docker_init_flag=use_docker_init_flag,
) )
docker_compose_yml_dir = get_docker_compose_path() docker_compose_yml_dir = get_docker_compose_path()
@ -3353,6 +3355,7 @@ services:
{ipv6_address} {ipv6_address}
{net_aliases} {net_aliases}
{net_alias1} {net_alias1}
init: {init_flag}
""" """
@ -3419,6 +3422,7 @@ class ClickHouseInstance:
config_root_name="clickhouse", config_root_name="clickhouse",
extra_configs=[], extra_configs=[],
randomize_settings=True, randomize_settings=True,
use_docker_init_flag=False,
): ):
self.name = name self.name = name
self.base_cmd = cluster.base_cmd self.base_cmd = cluster.base_cmd
@ -3545,6 +3549,7 @@ class ClickHouseInstance:
self.with_installed_binary = with_installed_binary self.with_installed_binary = with_installed_binary
self.is_up = False self.is_up = False
self.config_root_name = config_root_name self.config_root_name = config_root_name
self.docker_init_flag = use_docker_init_flag
def is_built_with_sanitizer(self, sanitizer_name=""): def is_built_with_sanitizer(self, sanitizer_name=""):
build_opts = self.query( build_opts = self.query(
@ -4838,6 +4843,7 @@ class ClickHouseInstance:
ipv6_address=ipv6_address, ipv6_address=ipv6_address,
net_aliases=net_aliases, net_aliases=net_aliases,
net_alias1=net_alias1, net_alias1=net_alias1,
init_flag="true" if self.docker_init_flag else "false",
) )
) )

View File

@ -14,6 +14,11 @@ instance = cluster.add_instance(
dictionaries=["configs/dictionaries/dict1.xml"], dictionaries=["configs/dictionaries/dict1.xml"],
main_configs=["configs/config.d/config.xml"], main_configs=["configs/config.d/config.xml"],
stay_alive=True, stay_alive=True,
# WA for the problem with zombie processes inside the docker container.
# This is important here because we are checking that there are no zombie processes
# after craches inside the library bridge.
# https://forums.docker.com/t/what-the-latest-with-the-zombie-process-reaping-problem/50758/2
use_docker_init_flag=True,
) )
@ -34,6 +39,13 @@ def create_dict_simple(ch_instance):
) )
def check_no_zombie_processes(instance):
res = instance.exec_in_container(
["bash", "-c", "ps ax -ostat,pid | grep -e '[zZ]' | wc -l"], user="root"
)
assert res == "0\n"
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def ch_cluster(): def ch_cluster():
try: try:
@ -263,6 +275,8 @@ def test_recover_after_bridge_crash(ch_cluster):
instance.exec_in_container( instance.exec_in_container(
["bash", "-c", "kill -9 `pidof clickhouse-library-bridge`"], user="root" ["bash", "-c", "kill -9 `pidof clickhouse-library-bridge`"], user="root"
) )
check_no_zombie_processes(instance)
instance.query("DROP DICTIONARY lib_dict_c") instance.query("DROP DICTIONARY lib_dict_c")
@ -288,6 +302,8 @@ def test_server_restart_bridge_might_be_stil_alive(ch_cluster):
result = instance.query("""select dictGet(lib_dict_c, 'value1', toUInt64(1));""") result = instance.query("""select dictGet(lib_dict_c, 'value1', toUInt64(1));""")
assert result.strip() == "101" assert result.strip() == "101"
check_no_zombie_processes(instance)
instance.query("DROP DICTIONARY lib_dict_c") instance.query("DROP DICTIONARY lib_dict_c")

View File

@ -71,13 +71,13 @@ def test_bridge_dies_with_parent(ch_cluster):
except: except:
pass pass
for i in range(30): for i in range(60):
time.sleep(1) time.sleep(1)
clickhouse_pid = instance.get_process_pid("clickhouse server") clickhouse_pid = instance.get_process_pid("clickhouse server")
if clickhouse_pid is None: if clickhouse_pid is None:
break break
for i in range(30): for i in range(60):
time.sleep(1) time.sleep(1)
bridge_pid = instance.get_process_pid("library-bridge") bridge_pid = instance.get_process_pid("library-bridge")
if bridge_pid is None: if bridge_pid is None:
@ -95,5 +95,5 @@ def test_bridge_dies_with_parent(ch_cluster):
assert clickhouse_pid is None assert clickhouse_pid is None
assert bridge_pid is None assert bridge_pid is None
finally: finally:
instance.start_clickhouse(20) instance.start_clickhouse(60)
instance.query("DROP DICTIONARY lib_dict_c") instance.query("DROP DICTIONARY lib_dict_c")