dbms: preparation [#METR-16212].

This commit is contained in:
Alexey Milovidov 2015-12-13 11:51:28 +03:00
parent 7f74a32c80
commit 42480c31ef
8 changed files with 371 additions and 7 deletions

View File

@ -0,0 +1,55 @@
#pragma once
#include <memory>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/WriteBufferFromFile.h>
namespace DB
{
/** Позволяет запустить команду,
* читать её stdout, stderr, писать в stdin,
* дождаться завершения.
*
* Реализация похожа на функцию popen из POSIX (посмотреть можно в исходниках libc).
*
* Наиболее важное отличие: использует vfork вместо fork.
* Это сделано, потому что fork не работает (с ошибкой о нехватке памяти),
* при некоторых настройках overcommit-а, если размер адресного пространства процесса больше половины количества доступной памяти.
* Также, изменение memory map-ов - довольно ресурсоёмкая операция.
*
* Второе отличие - позволяет работать одновременно и с stdin, и с stdout, и с stderr запущенного процесса,
* а также узнать код и статус завершения.
*/
class ShellCommand
{
private:
pid_t pid;
ShellCommand(pid_t pid, int in_fd, int out_fd, int err_fd)
: pid(pid), in(in_fd), out(out_fd), err(err_fd) {};
static std::unique_ptr<ShellCommand> executeImpl(const char * filename, char * const argv[], char * const envp[]);
public:
WriteBufferFromFile in; /// Если команда читает из stdin, то не забудьте вызвать in.close() после записи туда всех данных.
ReadBufferFromFile out;
ReadBufferFromFile err;
/// Выполнить команду с использованием /bin/sh -c
static std::unique_ptr<ShellCommand> execute(const std::string & command);
/// Выполнить исполняемый файл с указаннами аргументами. arguments - без argv[0].
static std::unique_ptr<ShellCommand> executeDirect(const std::string & path, const std::vector<std::string> & arguments);
/// Подождать завершения процесса, кинуть исключение, если код не 0 или если процесс был завершён не самостоятельно.
void wait();
/// Подождать завершения процесса, узнать код возврата. Кинуть исключение, если процесс был завершён не самостоятельно.
int tryWait();
};
}

View File

@ -299,6 +299,13 @@ namespace ErrorCodes
RECEIVED_EMPTY_DATA = 295,
NO_REMOTE_SHARD_FOUND = 296,
SHARD_HAS_NO_CONNECTIONS = 297,
CANNOT_PIPE = 298,
CANNOT_FORK = 299,
CANNOT_DLSYM = 300,
CANNOT_CREATE_CHILD_PROCESS = 301,
CHILD_WAS_NOT_EXITED_NORMALLY = 302,
CANNOT_SELECT = 303,
CANNOT_WAITPID = 304,
KEEPER_EXCEPTION = 999,
POCO_EXCEPTION = 1000,

View File

@ -14,7 +14,7 @@ class ReadBufferFromFile : public ReadBufferFromFileDescriptor
{
private:
std::string file_name;
public:
ReadBufferFromFile(const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
char * existing_memory = nullptr, size_t alignment = 0)
@ -28,9 +28,28 @@ public:
throwFromErrno("Cannot open file " + file_name, errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE);
}
/// Использовать уже открытый файл.
ReadBufferFromFile(int fd, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
char * existing_memory = nullptr, size_t alignment = 0)
: ReadBufferFromFileDescriptor(fd, buf_size, existing_memory, alignment), file_name("(fd = " + toString(fd) + ")")
{
}
virtual ~ReadBufferFromFile()
{
close(fd);
if (fd < 0)
return;
::close(fd);
}
/// Закрыть файл раньше вызова деструктора.
void close()
{
if (0 != ::close(fd))
throw Exception("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE);
fd = -1;
}
virtual std::string getFileName()

View File

@ -127,6 +127,23 @@ private:
return res;
}
}
/// При условии, что файловый дескриптор позволяет использовать select, проверяет в течение таймаута, есть ли данные для чтения.
bool poll(size_t timeout_microseconds)
{
fd_set fds;
FD_ZERO(&fds);
FD_SET(fd, &fds);
timeval timeout = { time_t(timeout_microseconds / 1000000), time_t(timeout_microseconds % 1000000) };
int res = select(1, &fds, 0, 0, &timeout);
if (-1 == res)
throwFromErrno("Cannot select", ErrorCodes::CANNOT_SELECT);
return res > 0;
}
};
}

View File

@ -18,7 +18,7 @@ class WriteBufferFromFile : public WriteBufferFromFileDescriptor
{
private:
std::string file_name;
public:
WriteBufferFromFile(const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1, mode_t mode = 0666,
char * existing_memory = nullptr, size_t alignment = 0)
@ -27,13 +27,23 @@ public:
ProfileEvents::increment(ProfileEvents::FileOpen);
fd = open(file_name.c_str(), flags == -1 ? O_WRONLY | O_TRUNC | O_CREAT : flags, mode);
if (-1 == fd)
throwFromErrno("Cannot open file " + file_name, errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE);
}
/// Использовать уже открытый файл.
WriteBufferFromFile(int fd, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1, mode_t mode = 0666,
char * existing_memory = nullptr, size_t alignment = 0)
: WriteBufferFromFileDescriptor(fd, buf_size, existing_memory, alignment), file_name("(fd = " + toString(fd) + ")")
{
}
~WriteBufferFromFile()
{
if (fd < 0)
return;
try
{
next();
@ -43,9 +53,20 @@ public:
tryLogCurrentException(__PRETTY_FUNCTION__);
}
close(fd);
::close(fd);
}
/// Закрыть файл раньше вызова деструктора.
void close()
{
next();
if (0 != ::close(fd))
throw Exception("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE);
fd = -1;
}
/** fsync() transfers ("flushes") all modified in-core data of (i.e., modified buffer cache pages for) the file
* referred to by the file descriptor fd to the disk device (or other permanent storage device)
* so that all changed information can be retrieved even after the system crashed or was rebooted.

View File

@ -67,7 +67,8 @@ public:
{
try
{
next();
if (fd >= 0)
next();
}
catch (...)
{

View File

@ -0,0 +1,195 @@
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <fcntl.h>
#include <dlfcn.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Common/Exception.h>
#include <DB/Common/ShellCommand.h>
#include <DB/IO/WriteBufferFromVector.h>
namespace
{
struct Pipe
{
union
{
int fds[2];
struct
{
int read_fd;
int write_fd;
};
};
Pipe()
{
if (0 != pipe2(fds, O_CLOEXEC))
DB::throwFromErrno("Cannot create pipe", DB::ErrorCodes::CANNOT_PIPE);
}
~Pipe()
{
if (read_fd >= 0)
close(read_fd);
if (write_fd >= 0)
close(write_fd);
}
};
/// По этим кодам возврата из дочернего процесса мы узнаем (наверняка) об ошибках при его создании.
enum class ReturnCodes : int
{
CANNOT_DUP_STDIN = 42, /// Значение не принципиально, но выбрано так, чтобы редко конфликтовать с кодом возврата программы.
CANNOT_DUP_STDOUT = 43,
CANNOT_DUP_STDERR = 44,
CANNOT_EXEC = 45,
};
}
namespace DB
{
std::unique_ptr<ShellCommand> ShellCommand::executeImpl(const char * filename, char * const argv[], char * const envp[])
{
/** Тут написано, что при обычном вызове vfork, есть шанс deadlock-а в многопоточных программах,
* из-за резолвинга символов в shared-библиотеке:
* http://www.oracle.com/technetwork/server-storage/solaris10/subprocess-136439.html
* Поэтому, отделим резолвинг символа от вызова.
*/
static void * real_vfork = dlsym(RTLD_DEFAULT, "vfork");
if (!real_vfork)
throwFromErrno("Cannot find symbol vfork in myself", ErrorCodes::CANNOT_DLSYM);
Pipe pipe_stdin;
Pipe pipe_stdout;
Pipe pipe_stderr;
pid_t pid = reinterpret_cast<pid_t(*)()>(real_vfork)();
if (-1 == pid)
throwFromErrno("Cannot vfork", ErrorCodes::CANNOT_FORK);
if (0 == pid)
{
/// Находимся в свежесозданном процессе.
/// Почему _exit а не exit? Потому что exit вызывает atexit и деструкторы thread local storage.
/// А там куча мусора (в том числе, например, блокируется mutex). А это нельзя делать после vfork - происходит deadlock.
/// Заменяем файловые дескрипторы на концы наших пайпов.
if (STDIN_FILENO != dup2(pipe_stdin.read_fd, STDIN_FILENO))
_exit(int(ReturnCodes::CANNOT_DUP_STDIN));
if (STDOUT_FILENO != dup2(pipe_stdout.write_fd, STDOUT_FILENO))
_exit(int(ReturnCodes::CANNOT_DUP_STDOUT));
if (STDERR_FILENO != dup2(pipe_stderr.write_fd, STDERR_FILENO))
_exit(int(ReturnCodes::CANNOT_DUP_STDERR));
execve(filename, argv, envp);
/// Если процесс запущен, то execve не возвращает сюда.
_exit(int(ReturnCodes::CANNOT_EXEC));
}
std::unique_ptr<ShellCommand> res(new ShellCommand(pid, pipe_stdin.write_fd, pipe_stdout.read_fd, pipe_stderr.read_fd));
/// Теперь владение файловыми дескрипторами передано в результат.
pipe_stdin.write_fd = -1;
pipe_stdout.read_fd = -1;
pipe_stderr.read_fd = -1;
return res;
}
std::unique_ptr<ShellCommand> ShellCommand::execute(const std::string & command)
{
/// Аргументы в неконстантных кусках памяти (как требуется для execve).
/// Причём, их копирование должно быть совершено раньше вызова vfork, чтобы после vfork делать минимум вещей.
std::vector<char> argv0("sh", "sh" + strlen("sh") + 1);
std::vector<char> argv1("-c", "-c" + strlen("-c") + 1);
std::vector<char> argv2(command.data(), command.data() + command.size() + 1);
char * const argv[] = { argv0.data(), argv1.data(), argv2.data(), nullptr };
char * const envp[] = { nullptr };
return executeImpl("/bin/sh", argv, envp);
}
std::unique_ptr<ShellCommand> ShellCommand::executeDirect(const std::string & path, const std::vector<std::string> & arguments)
{
size_t argv_sum_size = path.size() + 1;
for (const auto & arg : arguments)
argv_sum_size += arg.size() + 1;
std::vector<char *> argv(arguments.size() + 2);
std::vector<char> argv_data(argv_sum_size);
WriteBuffer writer(argv_data.data(), argv_sum_size);
argv[0] = writer.position();
writer.write(path.data(), path.size() + 1);
for (size_t i = 0, size = arguments.size(); i < size; ++i)
{
argv[i + 1] = writer.position();
writer.write(arguments[i].data(), arguments[i].size() + 1);
}
argv[arguments.size() + 1] = nullptr;
char * const envp[] = { nullptr };
return executeImpl(path.data(), argv.data(), envp);
}
int ShellCommand::tryWait()
{
int status = 0;
if (-1 == waitpid(pid, &status, 0))
throwFromErrno("Cannot waitpid", ErrorCodes::CANNOT_WAITPID);
if (WIFEXITED(status))
return WEXITSTATUS(status);
if (WIFSIGNALED(status))
throw Exception("Child process was terminated by signal " + toString(WTERMSIG(status)), ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY);
if (WIFSTOPPED(status))
throw Exception("Child process was stopped by signal " + toString(WSTOPSIG(status)), ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY);
throw Exception("Child process was not exited normally by unknown reason", ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY);
}
void ShellCommand::wait()
{
int retcode = tryWait();
if (retcode != EXIT_SUCCESS)
{
switch (retcode)
{
case int(ReturnCodes::CANNOT_DUP_STDIN):
throw Exception("Cannot dup2 stdin of child process", ErrorCodes::CANNOT_CREATE_CHILD_PROCESS);
case int(ReturnCodes::CANNOT_DUP_STDOUT):
throw Exception("Cannot dup2 stdout of child process", ErrorCodes::CANNOT_CREATE_CHILD_PROCESS);
case int(ReturnCodes::CANNOT_DUP_STDERR):
throw Exception("Cannot dup2 stderr of child process", ErrorCodes::CANNOT_CREATE_CHILD_PROCESS);
case int(ReturnCodes::CANNOT_EXEC):
throw Exception("Cannot execve in child process", ErrorCodes::CANNOT_CREATE_CHILD_PROCESS);
default:
throw Exception("Child process was exited with return code " + toString(retcode), ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY);
}
}
}
}

View File

@ -0,0 +1,49 @@
#include <iostream>
#include <DB/Common/ShellCommand.h>
#include <DB/IO/copyData.h>
#include <DB/IO/WriteBufferFromFileDescriptor.h>
#include <DB/IO/ReadBufferFromString.h>
using namespace DB;
int main(int arg, char ** argv)
try
{
{
auto command = ShellCommand::execute("echo 'Hello, world!'");
WriteBufferFromFileDescriptor out(STDOUT_FILENO);
copyData(command->out, out);
command->wait();
}
{
auto command = ShellCommand::executeDirect("/bin/echo", {"Hello, world!"});
WriteBufferFromFileDescriptor out(STDOUT_FILENO);
copyData(command->out, out);
command->wait();
}
{
auto command = ShellCommand::execute("cat");
String in_str = "Hello, world!\n";
ReadBufferFromString in(in_str);
copyData(in, command->in);
command->in.close();
WriteBufferFromFileDescriptor out(STDOUT_FILENO);
copyData(command->out, out);
command->wait();
}
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(false) << "\n";
return 1;
}