mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Redesigned
This commit is contained in:
parent
9c77f61f8e
commit
5d7d331091
@ -29,7 +29,7 @@ LibraryBridgeHelper::LibraryBridgeHelper(ContextPtr context_)
|
||||
|
||||
void LibraryBridgeHelper::startBridge(std::unique_ptr<ShellCommand> cmd) const
|
||||
{
|
||||
getContext()->addBridgeCommand(std::move(cmd));
|
||||
getContext()->addBackgroundShellCommand(std::move(cmd));
|
||||
}
|
||||
|
||||
|
||||
|
@ -144,7 +144,7 @@ protected:
|
||||
|
||||
void startBridge(std::unique_ptr<ShellCommand> cmd) const override
|
||||
{
|
||||
getContext()->addBridgeCommand(std::move(cmd));
|
||||
getContext()->addBackgroundShellCommand(std::move(cmd));
|
||||
}
|
||||
|
||||
|
||||
|
37
src/Common/BackgroundShellCommandHolder.cpp
Normal file
37
src/Common/BackgroundShellCommandHolder.cpp
Normal file
@ -0,0 +1,37 @@
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/BackgroundShellCommandHolder.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
LoggerPtr BackgroundShellCommandHolder::getLogger()
|
||||
{
|
||||
return ::getLogger("BackgroundShellCommandHolder");
|
||||
}
|
||||
|
||||
|
||||
void BackgroundShellCommandHolder::removeCommand(pid_t pid)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
bool is_erased = active_shell_commands.erase(pid);
|
||||
LOG_TRACE(getLogger(), "Try to erase command with the pid {}, is_erased: {}", pid, is_erased);
|
||||
}
|
||||
|
||||
void BackgroundShellCommandHolder::addCommand(std::unique_ptr<ShellCommand> command)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
pid_t command_pid = command->getPid();
|
||||
|
||||
auto [iterator, is_inserted] = active_shell_commands.emplace(std::make_pair(command_pid, std::move(command)));
|
||||
if (!is_inserted)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't insert proccess PID {} into active shell commands, because there are running proccess with same PID", command_pid);
|
||||
|
||||
LOG_TRACE(getLogger(), "Inserted the command with pid {}", command_pid);
|
||||
}
|
||||
}
|
30
src/Common/BackgroundShellCommandHolder.h
Normal file
30
src/Common/BackgroundShellCommandHolder.h
Normal file
@ -0,0 +1,30 @@
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
|
||||
#include <Common/ShellCommand.h>
|
||||
#include <unordered_map>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** The holder class for running background shell processes.
|
||||
*/
|
||||
class BackgroundShellCommandHolder final
|
||||
{
|
||||
public:
|
||||
void removeCommand(pid_t pid);
|
||||
void addCommand(std::unique_ptr<ShellCommand> command);
|
||||
|
||||
private:
|
||||
using ActiveShellCommandsCollection = std::unordered_map<pid_t, std::unique_ptr<ShellCommand>>;
|
||||
|
||||
std::mutex mutex;
|
||||
ActiveShellCommandsCollection active_shell_commands TSA_GUARDED_BY(mutex);
|
||||
|
||||
static LoggerPtr getLogger();
|
||||
};
|
||||
|
||||
}
|
@ -25,7 +25,6 @@ namespace
|
||||
CANNOT_EXEC = 0x55555558,
|
||||
CANNOT_DUP_READ_DESCRIPTOR = 0x55555559,
|
||||
CANNOT_DUP_WRITE_DESCRIPTOR = 0x55555560,
|
||||
CANNOT_VFORK_IN_CHILD = 0x55555561,
|
||||
};
|
||||
}
|
||||
|
||||
@ -152,10 +151,6 @@ std::unique_ptr<ShellCommand> ShellCommand::executeImpl(
|
||||
PipeFDs pipe_stdout;
|
||||
PipeFDs pipe_stderr;
|
||||
|
||||
PipeFDs pipe_with_child;
|
||||
WriteBufferFromFile write_buffer_for_child(pipe_with_child.fds_rw[1]);
|
||||
ReadBufferFromFile read_buffer_for_parent(pipe_with_child.fds_rw[0]);
|
||||
|
||||
std::vector<std::unique_ptr<PipeFDs>> read_pipe_fds;
|
||||
std::vector<std::unique_ptr<PipeFDs>> write_pipe_fds;
|
||||
|
||||
@ -165,93 +160,64 @@ std::unique_ptr<ShellCommand> ShellCommand::executeImpl(
|
||||
for (size_t i = 0; i < config.write_fds.size(); ++i)
|
||||
write_pipe_fds.emplace_back(std::make_unique<PipeFDs>());
|
||||
|
||||
pid_t child_pid = reinterpret_cast<pid_t(*)()>(real_vfork)();
|
||||
pid_t pid = reinterpret_cast<pid_t(*)()>(real_vfork)();
|
||||
|
||||
if (child_pid == -1)
|
||||
if (pid == -1)
|
||||
throw ErrnoException(ErrorCodes::CANNOT_FORK, "Cannot vfork");
|
||||
|
||||
/// Here we are using double vfork technique to prevent zombie process.
|
||||
if (0 == child_pid)
|
||||
if (0 == pid)
|
||||
{
|
||||
pid_t pid = reinterpret_cast<pid_t(*)()>(real_vfork)();
|
||||
/// We are in the freshly created process.
|
||||
|
||||
if (pid == -1)
|
||||
_exit(static_cast<int>(ReturnCodes::CANNOT_VFORK_IN_CHILD));
|
||||
/// Why `_exit` and not `exit`? Because `exit` calls `atexit` and destructors of thread local storage.
|
||||
/// And there is a lot of garbage (including, for example, mutex is blocked). And this can not be done after `vfork` - deadlock happens.
|
||||
|
||||
if (pid == 0)
|
||||
/// Replace the file descriptors with the ends of our pipes.
|
||||
if (STDIN_FILENO != dup2(pipe_stdin.fds_rw[0], STDIN_FILENO))
|
||||
_exit(static_cast<int>(ReturnCodes::CANNOT_DUP_STDIN));
|
||||
|
||||
if (!config.pipe_stdin_only)
|
||||
{
|
||||
/// We are in the freshly created process.
|
||||
if (STDOUT_FILENO != dup2(pipe_stdout.fds_rw[1], STDOUT_FILENO))
|
||||
_exit(static_cast<int>(ReturnCodes::CANNOT_DUP_STDOUT));
|
||||
|
||||
/// Why `_exit` and not `exit`? Because `exit` calls `atexit` and destructors of thread local storage.
|
||||
/// And there is a lot of garbage (including, for example, mutex is blocked). And this can not be done after `vfork` - deadlock happens.
|
||||
|
||||
/// Replace the file descriptors with the ends of our pipes.
|
||||
if (STDIN_FILENO != dup2(pipe_stdin.fds_rw[0], STDIN_FILENO))
|
||||
_exit(static_cast<int>(ReturnCodes::CANNOT_DUP_STDIN));
|
||||
|
||||
if (!config.pipe_stdin_only)
|
||||
{
|
||||
if (STDOUT_FILENO != dup2(pipe_stdout.fds_rw[1], STDOUT_FILENO))
|
||||
_exit(static_cast<int>(ReturnCodes::CANNOT_DUP_STDOUT));
|
||||
|
||||
if (STDERR_FILENO != dup2(pipe_stderr.fds_rw[1], STDERR_FILENO))
|
||||
_exit(static_cast<int>(ReturnCodes::CANNOT_DUP_STDERR));
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < config.read_fds.size(); ++i)
|
||||
{
|
||||
auto & fds = *read_pipe_fds[i];
|
||||
auto fd = config.read_fds[i];
|
||||
|
||||
if (fd != dup2(fds.fds_rw[1], fd))
|
||||
_exit(static_cast<int>(ReturnCodes::CANNOT_DUP_READ_DESCRIPTOR));
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < config.write_fds.size(); ++i)
|
||||
{
|
||||
auto & fds = *write_pipe_fds[i];
|
||||
auto fd = config.write_fds[i];
|
||||
|
||||
if (fd != dup2(fds.fds_rw[0], fd))
|
||||
_exit(static_cast<int>(ReturnCodes::CANNOT_DUP_WRITE_DESCRIPTOR));
|
||||
}
|
||||
|
||||
// Reset the signal mask: it may be non-empty and will be inherited
|
||||
// by the child process, which might not expect this.
|
||||
sigset_t mask;
|
||||
sigemptyset(&mask);
|
||||
sigprocmask(0, nullptr, &mask); // NOLINT(concurrency-mt-unsafe)
|
||||
sigprocmask(SIG_UNBLOCK, &mask, nullptr); // NOLINT(concurrency-mt-unsafe)
|
||||
|
||||
execv(filename, argv);
|
||||
/// If the process is running, then `execv` does not return here.
|
||||
|
||||
_exit(static_cast<int>(ReturnCodes::CANNOT_EXEC));
|
||||
if (STDERR_FILENO != dup2(pipe_stderr.fds_rw[1], STDERR_FILENO))
|
||||
_exit(static_cast<int>(ReturnCodes::CANNOT_DUP_STDERR));
|
||||
}
|
||||
else
|
||||
|
||||
for (size_t i = 0; i < config.read_fds.size(); ++i)
|
||||
{
|
||||
DB::writeIntBinary(pid, write_buffer_for_child);
|
||||
write_buffer_for_child.next();
|
||||
_exit(0);
|
||||
auto & fds = *read_pipe_fds[i];
|
||||
auto fd = config.read_fds[i];
|
||||
|
||||
if (fd != dup2(fds.fds_rw[1], fd))
|
||||
_exit(static_cast<int>(ReturnCodes::CANNOT_DUP_READ_DESCRIPTOR));
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < config.write_fds.size(); ++i)
|
||||
{
|
||||
auto & fds = *write_pipe_fds[i];
|
||||
auto fd = config.write_fds[i];
|
||||
|
||||
if (fd != dup2(fds.fds_rw[0], fd))
|
||||
_exit(static_cast<int>(ReturnCodes::CANNOT_DUP_WRITE_DESCRIPTOR));
|
||||
}
|
||||
|
||||
// Reset the signal mask: it may be non-empty and will be inherited
|
||||
// by the child process, which might not expect this.
|
||||
sigset_t mask;
|
||||
sigemptyset(&mask);
|
||||
sigprocmask(0, nullptr, &mask); // NOLINT(concurrency-mt-unsafe)
|
||||
sigprocmask(SIG_UNBLOCK, &mask, nullptr); // NOLINT(concurrency-mt-unsafe)
|
||||
|
||||
execv(filename, argv);
|
||||
/// If the process is running, then `execv` does not return here.
|
||||
|
||||
_exit(static_cast<int>(ReturnCodes::CANNOT_EXEC));
|
||||
}
|
||||
else
|
||||
{
|
||||
int status = 0;
|
||||
while (waitpid(child_pid, &status, 0) < 0)
|
||||
{
|
||||
if (errno != EINTR)
|
||||
throw ErrnoException(ErrorCodes::CANNOT_WAITPID, "Cannot waitpid");
|
||||
}
|
||||
int return_code = handleWaitStatus(child_pid, status);
|
||||
handleProceesReturnCode(child_pid, return_code);
|
||||
}
|
||||
|
||||
pid_t grandchild_pid = 0;
|
||||
DB::readIntBinary(grandchild_pid, read_buffer_for_parent);
|
||||
|
||||
std::unique_ptr<ShellCommand> res(new ShellCommand(
|
||||
grandchild_pid,
|
||||
pid,
|
||||
pipe_stdin.fds_rw[1],
|
||||
pipe_stdout.fds_rw[0],
|
||||
pipe_stderr.fds_rw[0],
|
||||
@ -275,7 +241,7 @@ std::unique_ptr<ShellCommand> ShellCommand::executeImpl(
|
||||
getLogger(),
|
||||
"Started shell command '{}' with pid {} and file descriptors: out {}, err {}",
|
||||
filename,
|
||||
grandchild_pid,
|
||||
pid,
|
||||
res->out.getFD(),
|
||||
res->err.getFD());
|
||||
|
||||
@ -351,53 +317,44 @@ int ShellCommand::tryWait()
|
||||
|
||||
LOG_TRACE(getLogger(), "Wait for shell command pid {} completed with status {}", pid, status);
|
||||
|
||||
return handleWaitStatus(pid, status);
|
||||
}
|
||||
|
||||
void ShellCommand::wait()
|
||||
{
|
||||
int retcode = tryWait();
|
||||
handleProceesReturnCode(pid, retcode);
|
||||
}
|
||||
|
||||
int ShellCommand::handleWaitStatus(pid_t pid, int status)
|
||||
{
|
||||
if (WIFEXITED(status))
|
||||
return WEXITSTATUS(status);
|
||||
|
||||
if (WIFSIGNALED(status))
|
||||
throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process {} was terminated by signal {}", pid, toString(WTERMSIG(status)));
|
||||
throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was terminated by signal {}", toString(WTERMSIG(status)));
|
||||
|
||||
if (WIFSTOPPED(status))
|
||||
throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process {} was stopped by signal {}", pid, toString(WSTOPSIG(status)));
|
||||
throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was stopped by signal {}", toString(WSTOPSIG(status)));
|
||||
|
||||
throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process {} was not exited normally by unknown reason", pid);
|
||||
throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was not exited normally by unknown reason");
|
||||
}
|
||||
|
||||
void ShellCommand::handleProceesReturnCode(pid_t pid, int retcode)
|
||||
|
||||
void ShellCommand::wait()
|
||||
{
|
||||
int retcode = tryWait();
|
||||
|
||||
if (retcode != EXIT_SUCCESS)
|
||||
{
|
||||
switch (retcode)
|
||||
{
|
||||
case static_cast<int>(ReturnCodes::CANNOT_DUP_STDIN):
|
||||
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stdin of child process: {}", pid);
|
||||
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stdin of child process");
|
||||
case static_cast<int>(ReturnCodes::CANNOT_DUP_STDOUT):
|
||||
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stdout of child process {}", pid);
|
||||
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stdout of child process");
|
||||
case static_cast<int>(ReturnCodes::CANNOT_DUP_STDERR):
|
||||
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stderr of child process {}", pid);
|
||||
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 stderr of child process");
|
||||
case static_cast<int>(ReturnCodes::CANNOT_EXEC):
|
||||
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot execv in child process {}", pid);
|
||||
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot execv in child process");
|
||||
case static_cast<int>(ReturnCodes::CANNOT_DUP_READ_DESCRIPTOR):
|
||||
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 read descriptor of child process {}", pid);
|
||||
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 read descriptor of child process");
|
||||
case static_cast<int>(ReturnCodes::CANNOT_DUP_WRITE_DESCRIPTOR):
|
||||
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 write descriptor of child process {}", pid);
|
||||
case static_cast<int>(ReturnCodes::CANNOT_VFORK_IN_CHILD):
|
||||
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot vfork in child procces {}", pid);
|
||||
throw Exception(ErrorCodes::CANNOT_CREATE_CHILD_PROCESS, "Cannot dup2 write descriptor of child process");
|
||||
default:
|
||||
throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process {} was exited with return code {}", pid, toString(retcode));
|
||||
throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was exited with return code {}", toString(retcode));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -67,6 +67,11 @@ public:
|
||||
DestructorStrategy terminate_in_destructor_strategy = DestructorStrategy(false, 0);
|
||||
};
|
||||
|
||||
pid_t getPid()
|
||||
{
|
||||
return pid;
|
||||
}
|
||||
|
||||
/// Run the command using /bin/sh -c.
|
||||
/// If terminate_in_destructor is true, send terminate signal in destructor and don't wait process.
|
||||
static std::unique_ptr<ShellCommand> execute(const Config & config);
|
||||
@ -99,9 +104,6 @@ private:
|
||||
|
||||
static LoggerPtr getLogger();
|
||||
|
||||
static int handleWaitStatus(pid_t pid, int status);
|
||||
static void handleProceesReturnCode(pid_t pid, int exit_code);
|
||||
|
||||
/// Print command name and the list of arguments to log. NOTE: No escaping of arguments is performed.
|
||||
static void logCommand(const char * filename, char * const argv[]);
|
||||
|
||||
|
@ -68,6 +68,20 @@ void terminateRequestedSignalHandler(int sig, siginfo_t *, void *)
|
||||
writeSignalIDtoSignalPipe(sig);
|
||||
}
|
||||
|
||||
void childSignalHandler(int sig, siginfo_t * info, void *)
|
||||
{
|
||||
DENY_ALLOCATIONS_IN_SCOPE;
|
||||
auto saved_errno = errno; /// We must restore previous value of errno in signal handler.
|
||||
|
||||
char buf[signal_pipe_buf_size];
|
||||
auto & signal_pipe = HandledSignals::instance().signal_pipe;
|
||||
WriteBufferFromFileDescriptor out(signal_pipe.fds_rw[1], signal_pipe_buf_size, buf);
|
||||
writeBinary(sig, out);
|
||||
writeBinary(info->si_pid, out);
|
||||
|
||||
out.next();
|
||||
errno = saved_errno;
|
||||
}
|
||||
|
||||
void signalHandler(int sig, siginfo_t * info, void * context)
|
||||
{
|
||||
@ -296,6 +310,12 @@ void SignalListener::run()
|
||||
if (daemon)
|
||||
daemon->handleSignal(sig);
|
||||
}
|
||||
else if (sig == SIGCHLD)
|
||||
{
|
||||
pid_t child_pid = 0;
|
||||
readBinary(child_pid, in);
|
||||
Context::getGlobalContextInstance()->terminateBackgroundShellCommand(child_pid);
|
||||
}
|
||||
else
|
||||
{
|
||||
siginfo_t info{};
|
||||
|
@ -33,6 +33,7 @@ void closeLogsSignalHandler(int sig, siginfo_t *, void *);
|
||||
|
||||
void terminateRequestedSignalHandler(int sig, siginfo_t *, void *);
|
||||
|
||||
void childSignalHandler(int sig, siginfo_t * info, void *);
|
||||
|
||||
/** Handler for "fault" or diagnostic signals. Send data about fault to separate thread to write into log.
|
||||
*/
|
||||
|
@ -440,6 +440,7 @@ void BaseDaemon::initializeTerminationAndSignalProcessing()
|
||||
HandledSignals::instance().setupCommonDeadlySignalHandlers();
|
||||
HandledSignals::instance().setupCommonTerminateRequestSignalHandlers();
|
||||
HandledSignals::instance().addSignalHandler({SIGHUP}, closeLogsSignalHandler, true);
|
||||
HandledSignals::instance().addSignalHandler({SIGCHLD}, childSignalHandler, true);
|
||||
|
||||
/// Set up Poco ErrorHandler for Poco Threads.
|
||||
static KillingErrorHandler killing_error_handler;
|
||||
|
@ -99,6 +99,7 @@
|
||||
#include <Common/Config/AbstractConfigurationComparison.h>
|
||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||
#include <Common/ShellCommand.h>
|
||||
#include <Common/BackgroundShellCommandHolder.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/RemoteHostFilter.h>
|
||||
#include <Common/HTTPHeaderFilter.h>
|
||||
@ -524,8 +525,9 @@ struct ContextSharedPart : boost::noncopyable
|
||||
/// No lock required for application_type modified only during initialization
|
||||
Context::ApplicationType application_type = Context::ApplicationType::SERVER;
|
||||
|
||||
/// vector of xdbc-bridge commands, they will be killed when Context will be destroyed
|
||||
std::vector<std::unique_ptr<ShellCommand>> bridge_commands TSA_GUARDED_BY(mutex);
|
||||
/// Manager of running background shell commands.
|
||||
/// They will be killed when Context will be destroyed or with SIGCHLD signal.
|
||||
BackgroundShellCommandHolder background_active_shell_commands;
|
||||
|
||||
/// No lock required for config_reload_callback, start_servers_callback, stop_servers_callback modified only during initialization
|
||||
Context::ConfigReloadCallback config_reload_callback;
|
||||
@ -5016,10 +5018,14 @@ void Context::addQueryParameters(const NameToNameMap & parameters)
|
||||
query_parameters.insert_or_assign(name, value);
|
||||
}
|
||||
|
||||
void Context::addBridgeCommand(std::unique_ptr<ShellCommand> cmd) const
|
||||
void Context::addBackgroundShellCommand(std::unique_ptr<ShellCommand> cmd) const
|
||||
{
|
||||
std::lock_guard lock(shared->mutex);
|
||||
shared->bridge_commands.emplace_back(std::move(cmd));
|
||||
shared->background_active_shell_commands.addCommand(std::move(cmd));
|
||||
}
|
||||
|
||||
void Context::terminateBackgroundShellCommand(pid_t pid) const
|
||||
{
|
||||
shared->background_active_shell_commands.removeCommand(pid);
|
||||
}
|
||||
|
||||
|
||||
|
@ -1284,8 +1284,11 @@ public:
|
||||
/// Overrides values of existing parameters.
|
||||
void addQueryParameters(const NameToNameMap & parameters);
|
||||
|
||||
/// Add started bridge command. It will be killed after context destruction
|
||||
void addBridgeCommand(std::unique_ptr<ShellCommand> cmd) const;
|
||||
/// Add background shell command. It will be killed after context destruction or with SIGCHLD.
|
||||
void addBackgroundShellCommand(std::unique_ptr<ShellCommand> cmd) const;
|
||||
|
||||
/// Terminate background shell command.
|
||||
void terminateBackgroundShellCommand(pid_t pid) const;
|
||||
|
||||
IHostContextPtr & getHostContext();
|
||||
const IHostContextPtr & getHostContext() const;
|
||||
|
@ -39,6 +39,13 @@ def create_dict_simple(ch_instance):
|
||||
)
|
||||
|
||||
|
||||
def check_no_zombie_processes(instance):
|
||||
res = instance.exec_in_container(
|
||||
["bash", "-c", "ps ax -ostat,pid | grep -e '[zZ]' | wc -l"], user="root"
|
||||
)
|
||||
assert res == "0\n"
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def ch_cluster():
|
||||
try:
|
||||
@ -268,12 +275,8 @@ def test_recover_after_bridge_crash(ch_cluster):
|
||||
instance.exec_in_container(
|
||||
["bash", "-c", "kill -9 `pidof clickhouse-library-bridge`"], user="root"
|
||||
)
|
||||
## There are no zombie processes.
|
||||
res = instance.exec_in_container(
|
||||
["bash", "-c", "ps ax -ostat,pid | grep -e '[zZ]' | wc -l"], user="root"
|
||||
)
|
||||
assert res == "0\n"
|
||||
|
||||
check_no_zombie_processes(instance)
|
||||
instance.query("DROP DICTIONARY lib_dict_c")
|
||||
|
||||
|
||||
@ -299,6 +302,8 @@ def test_server_restart_bridge_might_be_stil_alive(ch_cluster):
|
||||
result = instance.query("""select dictGet(lib_dict_c, 'value1', toUInt64(1));""")
|
||||
assert result.strip() == "101"
|
||||
|
||||
check_no_zombie_processes(instance)
|
||||
|
||||
instance.query("DROP DICTIONARY lib_dict_c")
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user