mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
More review
This commit is contained in:
parent
39130c5ded
commit
6edbd61e05
@ -61,6 +61,9 @@ LoggerPtr ShellCommand::getLogger()
|
||||
|
||||
ShellCommand::~ShellCommand()
|
||||
{
|
||||
if (is_manualy_terminated)
|
||||
return;
|
||||
|
||||
if (wait_called)
|
||||
return;
|
||||
|
||||
@ -291,11 +294,45 @@ std::unique_ptr<ShellCommand> ShellCommand::executeDirect(const ShellCommand::Co
|
||||
return executeImpl(path.data(), argv.data(), config);
|
||||
}
|
||||
|
||||
struct ShellCommand::tryWaitResult
|
||||
{
|
||||
bool is_process_terminated = false;
|
||||
int retcode = -1;
|
||||
};
|
||||
|
||||
int ShellCommand::tryWait()
|
||||
{
|
||||
return tryWaitImpl(false).retcode;
|
||||
}
|
||||
|
||||
ShellCommand::tryWaitResult ShellCommand::tryWaitImpl(bool blocking)
|
||||
{
|
||||
LOG_TRACE(getLogger(), "Will wait for shell command pid {}", pid);
|
||||
|
||||
ShellCommand::tryWaitResult result;
|
||||
|
||||
int options = ((blocking) ? WNOHANG : 0);
|
||||
int status = 0;
|
||||
int waitpid_retcode = -1;
|
||||
|
||||
while (waitpid_retcode < 0)
|
||||
{
|
||||
waitpid_retcode = waitpid(pid, &status, options);
|
||||
|
||||
if (blocking && !waitpid_retcode)
|
||||
{
|
||||
result.is_process_terminated = false;
|
||||
return result;
|
||||
}
|
||||
if (errno != EINTR)
|
||||
throw ErrnoException(ErrorCodes::CANNOT_WAITPID, "Cannot waitpid");
|
||||
}
|
||||
|
||||
LOG_TRACE(getLogger(), "Wait for shell command pid {} completed with status {}", pid, status);
|
||||
|
||||
wait_called = true;
|
||||
|
||||
result.is_process_terminated = true;
|
||||
in.close();
|
||||
out.close();
|
||||
err.close();
|
||||
@ -306,19 +343,11 @@ int ShellCommand::tryWait()
|
||||
for (auto & [_, fd] : read_fds)
|
||||
fd.close();
|
||||
|
||||
LOG_TRACE(getLogger(), "Will wait for shell command pid {}", pid);
|
||||
|
||||
int status = 0;
|
||||
while (waitpid(pid, &status, 0) < 0)
|
||||
{
|
||||
if (errno != EINTR)
|
||||
throw ErrnoException(ErrorCodes::CANNOT_WAITPID, "Cannot waitpid");
|
||||
}
|
||||
|
||||
LOG_TRACE(getLogger(), "Wait for shell command pid {} completed with status {}", pid, status);
|
||||
|
||||
if (WIFEXITED(status))
|
||||
return WEXITSTATUS(status);
|
||||
{
|
||||
result.retcode = WEXITSTATUS(status);
|
||||
return result;
|
||||
}
|
||||
|
||||
if (WIFSIGNALED(status))
|
||||
throw Exception(ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY, "Child process was terminated by signal {}", toString(WTERMSIG(status)));
|
||||
@ -330,10 +359,8 @@ int ShellCommand::tryWait()
|
||||
}
|
||||
|
||||
|
||||
void ShellCommand::wait()
|
||||
void ShellCommand::handleProcessRetcode(int retcode) const
|
||||
{
|
||||
int retcode = tryWait();
|
||||
|
||||
if (retcode != EXIT_SUCCESS)
|
||||
{
|
||||
switch (retcode)
|
||||
@ -356,5 +383,22 @@ void ShellCommand::wait()
|
||||
}
|
||||
}
|
||||
|
||||
bool ShellCommand::waitIfProccesTerminated()
|
||||
{
|
||||
auto proc_status = tryWaitImpl(true);
|
||||
if (proc_status.is_process_terminated)
|
||||
{
|
||||
handleProcessRetcode(proc_status.retcode);
|
||||
}
|
||||
return proc_status.is_process_terminated;
|
||||
}
|
||||
|
||||
|
||||
void ShellCommand::wait()
|
||||
{
|
||||
int retcode = tryWaitImpl(false).retcode;
|
||||
handleProcessRetcode(retcode);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -72,6 +72,16 @@ public:
|
||||
return pid;
|
||||
}
|
||||
|
||||
bool isWaitCalled() const
|
||||
{
|
||||
return wait_called;
|
||||
}
|
||||
|
||||
void setManuallyTerminated()
|
||||
{
|
||||
is_manualy_terminated = true;
|
||||
}
|
||||
|
||||
/// Run the command using /bin/sh -c.
|
||||
/// If terminate_in_destructor is true, send terminate signal in destructor and don't wait process.
|
||||
static std::unique_ptr<ShellCommand> execute(const Config & config);
|
||||
@ -86,6 +96,10 @@ public:
|
||||
/// Wait for the process to finish, see the return code. To throw an exception if the process was not completed independently.
|
||||
int tryWait();
|
||||
|
||||
/// Returns if process terminated.
|
||||
/// If process terminated, then handle return code.
|
||||
bool waitIfProccesTerminated();
|
||||
|
||||
WriteBufferFromFile in; /// If the command reads from stdin, do not forget to call in.close() after writing all the data there.
|
||||
ReadBufferFromFile out;
|
||||
ReadBufferFromFile err;
|
||||
@ -97,10 +111,16 @@ private:
|
||||
pid_t pid;
|
||||
Config config;
|
||||
bool wait_called = false;
|
||||
bool is_manualy_terminated = false;
|
||||
|
||||
ShellCommand(pid_t pid_, int & in_fd_, int & out_fd_, int & err_fd_, const Config & config);
|
||||
|
||||
bool tryWaitProcessWithTimeout(size_t timeout_in_seconds);
|
||||
struct tryWaitResult;
|
||||
|
||||
tryWaitResult tryWaitImpl(bool blocking);
|
||||
|
||||
void handleProcessRetcode(int retcode) const;
|
||||
|
||||
static LoggerPtr getLogger();
|
||||
|
||||
|
@ -27,11 +27,32 @@ void ShellCommandsHolder::addCommand(std::unique_ptr<ShellCommand> command)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
pid_t command_pid = command->getPid();
|
||||
if (command->waitIfProccesTerminated())
|
||||
{
|
||||
LOG_TRACE(getLogger(), "Pid {} already finished. Do not insert it.", command_pid);
|
||||
return;
|
||||
}
|
||||
|
||||
auto [iterator, is_inserted] = shell_commands.emplace(std::make_pair(command_pid, std::move(command)));
|
||||
if (!is_inserted)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't insert process PID {} into active shell commands, because there are running process with same PID", command_pid);
|
||||
|
||||
auto [iterator, is_inserted] = shell_commands.try_emplace(command_pid, std::move(command));
|
||||
if (is_inserted)
|
||||
{
|
||||
LOG_TRACE(getLogger(), "Inserted the command with pid {}", command_pid);
|
||||
return;
|
||||
}
|
||||
|
||||
if (iterator->second->isWaitCalled())
|
||||
{
|
||||
iterator->second = std::move(command);
|
||||
LOG_TRACE(getLogger(), "Replaced the command with pid {}", command_pid);
|
||||
return;
|
||||
}
|
||||
|
||||
/// We got two active ShellCommand with the same pid.
|
||||
/// Probably it is a bug, will try to replace the old shell command with a new one.
|
||||
|
||||
LOG_WARNING(getLogger(), "The PID already presented in active shell commands, will try to replace with a new one.");
|
||||
|
||||
iterator->second->setManuallyTerminated();
|
||||
iterator->second = std::move(command);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user