Repository: https://github.com/ClickHouse/ClickHouse.git
Merge branch 'master' into persistent_nukeeper_snapshot_storage
Commit: f555e65cc8
@ -7,9 +7,9 @@ toc_title: Distinctive Features

## True Column-Oriented Database Management System {#true-column-oriented-dbms}

-In a true column-oriented DBMS, no extra data is stored with the values. Among other things, this means that constant-length values must be supported, to avoid storing their length “number” next to the values. As an example, a billion UInt8-type values should consume around 1 GB uncompressed, or this strongly affects the CPU use. It is essential to store data compactly (without any “garbage”) even when uncompressed, since the speed of decompression (CPU usage) depends mainly on the volume of uncompressed data.
+In a real column-oriented DBMS, no extra data is stored with the values. Among other things, this means that constant-length values must be supported, to avoid storing their length “number” next to the values. For example, a billion UInt8-type values should consume around 1 GB uncompressed, or this strongly affects the CPU use. It is essential to store data compactly (without any “garbage”) even when uncompressed since the speed of decompression (CPU usage) depends mainly on the volume of uncompressed data.

-It is worth noting because there are systems that can store values of different columns separately, but that can’t effectively process analytical queries due to their optimization for other scenarios. Examples are HBase, BigTable, Cassandra, and HyperTable. In these systems, you would get throughput around a hundred thousand rows per second, but not hundreds of millions of rows per second.
+It is worth noting because there are systems that can store values of different columns separately, but that can’t effectively process analytical queries due to their optimization for other scenarios. Examples are HBase, BigTable, Cassandra, and HyperTable. You would get throughput around a hundred thousand rows per second in these systems, but not hundreds of millions of rows per second.

It’s also worth noting that ClickHouse is a database management system, not a single database. ClickHouse allows creating tables and databases in runtime, loading data, and running queries without reconfiguring and restarting the server.
src/Common/BorrowedObjectPool.h (new file, 157 lines)
@ -0,0 +1,157 @@
#pragma once

#include <cstdint>
#include <vector>
#include <chrono>
#include <mutex>
#include <condition_variable>

#include <common/defines.h>

#include <Common/MoveOrCopyIfThrow.h>

/** Pool for limited size objects that cannot be used from different threads simultaneously.
  * The main use case is to have a fixed number of objects that can be reused in different threads during their lifetime
  * and have to be initialized on demand.
  * Two main properties of the pool are allocated objects size and borrowed objects size.
  * Allocated objects size is the number of objects that are currently allocated by the pool.
  * Borrowed objects size is the number of objects that are currently borrowed by clients.
  * If max_size == 0 then the pool has unlimited size and objects will be allocated without limit.
  *
  * The pool provides the following strategy for borrowing an object:
  * 1. If the pool has objects that can be borrowed, increase borrowed objects size and return one of them.
  * 2. If the pool allocatedObjectsSize is lower than the max objects size, or the pool has unlimited size,
  *    allocate a new object, increase borrowed objects size and return it.
  * 3. If the pool is full, wait on a condition variable with or without timeout until some object
  *    is returned to the pool.
  */
template <typename T>
class BorrowedObjectPool final
{
public:
    explicit BorrowedObjectPool(size_t max_size_) : max_size(max_size_) {}

    /// Borrow object from pool. If the pool is full and all objects were borrowed
    /// then the calling thread will wait until some object is returned into the pool.
    template <typename FactoryFunc>
    void borrowObject(T & dest, FactoryFunc && func)
    {
        std::unique_lock<std::mutex> lock(objects_mutex);

        if (!objects.empty())
        {
            dest = borrowFromObjects(lock);
            return;
        }

        bool has_unlimited_size = (max_size == 0);

        if (unlikely(has_unlimited_size) || allocated_objects_size < max_size)
        {
            dest = allocateObjectForBorrowing(lock, std::forward<FactoryFunc>(func));
            return;
        }

        condition_variable.wait(lock, [this] { return !objects.empty(); });
        dest = borrowFromObjects(lock);
    }

    /// Same as borrowObject function, but wait with timeout.
    /// Returns true if object was borrowed during timeout.
    template <typename FactoryFunc>
    bool tryBorrowObject(T & dest, FactoryFunc && func, size_t timeout_in_milliseconds = 0)
    {
        std::unique_lock<std::mutex> lock(objects_mutex);

        if (!objects.empty())
        {
            dest = borrowFromObjects(lock);
            return true;
        }

        bool has_unlimited_size = (max_size == 0);

        if (unlikely(has_unlimited_size) || allocated_objects_size < max_size)
        {
            dest = allocateObjectForBorrowing(lock, std::forward<FactoryFunc>(func));
            return true;
        }

        bool wait_result = condition_variable.wait_for(lock, std::chrono::milliseconds(timeout_in_milliseconds), [this] { return !objects.empty(); });

        if (wait_result)
            dest = borrowFromObjects(lock);

        return wait_result;
    }

    /// Return object into pool. Client must return the same object that was borrowed.
    inline void returnObject(T && object_to_return)
    {
        std::unique_lock<std::mutex> lck(objects_mutex);

        objects.emplace_back(std::move(object_to_return));
        --borrowed_objects_size;

        condition_variable.notify_one();
    }

    /// Max pool size
    inline size_t maxSize() const
    {
        return max_size;
    }

    /// Number of objects currently allocated by the pool. If allocatedObjectsSize == maxSize then the pool is full.
    inline size_t allocatedObjectsSize() const
    {
        std::unique_lock<std::mutex> lock(objects_mutex);
        return allocated_objects_size;
    }

    /// Returns allocatedObjectsSize == maxSize
    inline bool isFull() const
    {
        std::unique_lock<std::mutex> lock(objects_mutex);
        return allocated_objects_size == max_size;
    }

    /// Number of currently borrowed objects. If borrowedObjectsSize == allocatedObjectsSize and the pool is full,
    /// then a client will wait during a borrowObject call.
    inline size_t borrowedObjectsSize() const
    {
        std::unique_lock<std::mutex> lock(objects_mutex);
        return borrowed_objects_size;
    }

private:

    template <typename FactoryFunc>
    inline T allocateObjectForBorrowing(const std::unique_lock<std::mutex> &, FactoryFunc && func)
    {
        ++allocated_objects_size;
        ++borrowed_objects_size;

        return std::forward<FactoryFunc>(func)();
    }

    inline T borrowFromObjects(const std::unique_lock<std::mutex> &)
    {
        T dst;
        detail::moveOrCopyIfThrow(std::move(objects.back()), dst);
        objects.pop_back();

        ++borrowed_objects_size;

        return dst;
    }

    size_t max_size;

    mutable std::mutex objects_mutex;
    std::condition_variable condition_variable;
    size_t allocated_objects_size = 0;
    size_t borrowed_objects_size = 0;
    std::vector<T> objects;
};
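For orientation, a minimal usage sketch of this pool (illustrative only, not part of the commit). It assumes the header above is available on the include path and uses a plain std::string as the pooled type; the factory lambda stands in for whatever expensive initialization a real caller would do.

```cpp
#include <Common/BorrowedObjectPool.h>

#include <string>
#include <utility>
#include <iostream>

int main()
{
    /// A pool that will allocate at most two objects.
    BorrowedObjectPool<std::string> pool(2);

    std::string first;
    std::string second;

    /// The factory is only invoked when the pool has to allocate a new object.
    pool.borrowObject(first, [] { return std::string("connection-1"); });
    pool.borrowObject(second, [] { return std::string("connection-2"); });

    /// The pool is now full (allocated == max), so a plain borrowObject would block;
    /// tryBorrowObject gives up after the timeout instead.
    std::string third;
    bool borrowed = pool.tryBorrowObject(third, [] { return std::string("never-created"); }, /* timeout ms */ 100);
    std::cout << "borrowed within timeout: " << borrowed << '\n';   /// prints 0

    /// Returning an object wakes up one waiter (or makes the next borrow succeed immediately).
    pool.returnObject(std::move(first));
    borrowed = pool.tryBorrowObject(third, [] { return std::string("never-created"); }, 100);
    std::cout << "borrowed after return: " << borrowed << '\n';     /// prints 1
}
```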
@ -6,38 +6,7 @@
|
||||
#include <Poco/Mutex.h>
|
||||
#include <Poco/Semaphore.h>
|
||||
|
||||
#include <common/types.h>
|
||||
|
||||
|
||||
namespace detail
|
||||
{
|
||||
template <typename T, bool is_nothrow_move_assignable = std::is_nothrow_move_assignable_v<T>>
|
||||
struct MoveOrCopyIfThrow;
|
||||
|
||||
template <typename T>
|
||||
struct MoveOrCopyIfThrow<T, true>
|
||||
{
|
||||
void operator()(T && src, T & dst) const
|
||||
{
|
||||
dst = std::forward<T>(src);
|
||||
}
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
struct MoveOrCopyIfThrow<T, false>
|
||||
{
|
||||
void operator()(T && src, T & dst) const
|
||||
{
|
||||
dst = src;
|
||||
}
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
void moveOrCopyIfThrow(T && src, T & dst)
|
||||
{
|
||||
MoveOrCopyIfThrow<T>()(std::forward<T>(src), dst);
|
||||
}
|
||||
}
|
||||
#include <Common/MoveOrCopyIfThrow.h>
|
||||
|
||||
/** A very simple thread-safe queue of limited size.
|
||||
* If you try to pop an item from an empty queue, the thread is blocked until the queue becomes nonempty.
|
||||
@ -53,8 +22,10 @@ private:
|
||||
Poco::Semaphore empty_count;
|
||||
|
||||
public:
|
||||
ConcurrentBoundedQueue(size_t max_fill)
|
||||
: fill_count(0, max_fill), empty_count(max_fill, max_fill) {}
|
||||
explicit ConcurrentBoundedQueue(size_t max_fill)
|
||||
: fill_count(0, max_fill)
|
||||
, empty_count(max_fill, max_fill)
|
||||
{}
|
||||
|
||||
void push(const T & x)
|
||||
{
|
||||
|
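The two Poco::Semaphore members shown in this hunk implement the classic bounded-buffer handshake: empty_count starts at max_fill and counts free slots, fill_count starts at zero and counts occupied slots. A self-contained sketch of that handshake follows (an illustration of the pattern under those assumptions, not the ClickHouse class itself).

```cpp
#include <Poco/Semaphore.h>
#include <mutex>
#include <queue>

template <typename T>
class TinyBoundedQueue
{
    std::queue<T> queue;
    std::mutex mutex;
    Poco::Semaphore fill_count;   /// occupied slots, starts at 0
    Poco::Semaphore empty_count;  /// free slots, starts at max_fill

public:
    explicit TinyBoundedQueue(size_t max_fill)
        : fill_count(0, max_fill), empty_count(max_fill, max_fill) {}

    void push(const T & x)
    {
        empty_count.wait();               /// blocks while the queue is full
        {
            std::lock_guard<std::mutex> lock(mutex);
            queue.push(x);
        }
        fill_count.set();                 /// one more element available to pop
    }

    void pop(T & x)
    {
        fill_count.wait();                /// blocks while the queue is empty
        {
            std::lock_guard<std::mutex> lock(mutex);
            x = queue.front();
            queue.pop();
        }
        empty_count.set();                /// one more free slot for push
    }
};
```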
src/Common/MoveOrCopyIfThrow.h (new file, 33 lines)
@ -0,0 +1,33 @@
#pragma once

#include <common/types.h>

namespace detail
{
    template <typename T, bool is_nothrow_move_assignable = std::is_nothrow_move_assignable_v<T>>
    struct MoveOrCopyIfThrow;

    template <typename T>
    struct MoveOrCopyIfThrow<T, true>
    {
        void operator()(T && src, T & dst) const
        {
            dst = std::forward<T>(src);
        }
    };

    template <typename T>
    struct MoveOrCopyIfThrow<T, false>
    {
        void operator()(T && src, T & dst) const
        {
            dst = src;
        }
    };

    template <typename T>
    void moveOrCopyIfThrow(T && src, T & dst)
    {
        MoveOrCopyIfThrow<T>()(std::forward<T>(src), dst);
    }
}
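A small illustration of what this helper buys (a hypothetical example, assuming the header above is on the include path): types with a noexcept move assignment are moved, while anything whose move assignment may throw is copied instead, so the source object stays intact if the assignment fails.

```cpp
#include <Common/MoveOrCopyIfThrow.h>

#include <cassert>
#include <string>
#include <utility>

/// A type whose move assignment is not noexcept, so the helper falls back to copying.
struct ThrowingMove
{
    std::string payload;

    ThrowingMove & operator=(ThrowingMove && other) noexcept(false)
    {
        payload = std::move(other.payload);
        return *this;
    }
    ThrowingMove & operator=(const ThrowingMove & other) = default;
};

int main()
{
    std::string src = "value";
    std::string dst;
    /// std::string has a noexcept move assignment, so the source is moved from.
    detail::moveOrCopyIfThrow(std::move(src), dst);
    assert(dst == "value");

    ThrowingMove a{"kept"};
    ThrowingMove b;
    /// The move assignment may throw, so the helper copies and the source keeps its value.
    detail::moveOrCopyIfThrow(std::move(a), b);
    assert(a.payload == "kept" && b.payload == "kept");
}
```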
@ -2,15 +2,18 @@
|
||||
#include <sys/wait.h>
|
||||
#include <fcntl.h>
|
||||
#include <dlfcn.h>
|
||||
#include <unistd.h>
|
||||
#include <time.h>
|
||||
#include <csignal>
|
||||
|
||||
#include <common/logger_useful.h>
|
||||
#include <common/errnoToString.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/ShellCommand.h>
|
||||
#include <Common/PipeFDs.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <common/errnoToString.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <unistd.h>
|
||||
#include <csignal>
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
@ -36,9 +39,9 @@ namespace ErrorCodes
|
||||
extern const int CANNOT_CREATE_CHILD_PROCESS;
|
||||
}
|
||||
|
||||
ShellCommand::ShellCommand(pid_t pid_, int & in_fd_, int & out_fd_, int & err_fd_, bool terminate_in_destructor_)
|
||||
ShellCommand::ShellCommand(pid_t pid_, int & in_fd_, int & out_fd_, int & err_fd_, ShellCommandDestructorStrategy destructor_strategy_)
|
||||
: pid(pid_)
|
||||
, terminate_in_destructor(terminate_in_destructor_)
|
||||
, destructor_strategy(destructor_strategy_)
|
||||
, in(in_fd_)
|
||||
, out(out_fd_)
|
||||
, err(err_fd_)
|
||||
@ -52,14 +55,24 @@ Poco::Logger * ShellCommand::getLogger()
|
||||
|
||||
ShellCommand::~ShellCommand()
|
||||
{
|
||||
if (terminate_in_destructor)
|
||||
if (wait_called)
|
||||
return;
|
||||
|
||||
if (destructor_strategy.terminate_in_destructor)
|
||||
{
|
||||
LOG_TRACE(getLogger(), "Will kill shell command pid {} with SIGTERM", pid);
|
||||
int retcode = kill(pid, SIGTERM);
|
||||
if (retcode != 0)
|
||||
LOG_WARNING(getLogger(), "Cannot kill shell command pid {} errno '{}'", pid, errnoToString(retcode));
|
||||
size_t try_wait_timeout = destructor_strategy.wait_for_normal_exit_before_termination_seconds;
|
||||
bool process_terminated_normally = tryWaitProcessWithTimeout(try_wait_timeout);
|
||||
|
||||
if (!process_terminated_normally)
|
||||
{
|
||||
LOG_TRACE(getLogger(), "Will kill shell command pid {} with SIGTERM", pid);
|
||||
|
||||
int retcode = kill(pid, SIGTERM);
|
||||
if (retcode != 0)
|
||||
LOG_WARNING(getLogger(), "Cannot kill shell command pid {} errno '{}'", pid, errnoToString(retcode));
|
||||
}
|
||||
}
|
||||
else if (!wait_called)
|
||||
else
|
||||
{
|
||||
try
|
||||
{
|
||||
@ -72,6 +85,54 @@ ShellCommand::~ShellCommand()
|
||||
}
|
||||
}
|
||||
|
||||
bool ShellCommand::tryWaitProcessWithTimeout(size_t timeout_in_seconds)
|
||||
{
|
||||
int status = 0;
|
||||
|
||||
LOG_TRACE(getLogger(), "Try wait for shell command pid ({}) with timeout ({})", pid, timeout_in_seconds);
|
||||
|
||||
wait_called = true;
|
||||
struct timespec interval {.tv_sec = 1, .tv_nsec = 0};
|
||||
|
||||
in.close();
|
||||
out.close();
|
||||
err.close();
|
||||
|
||||
if (timeout_in_seconds == 0)
|
||||
{
|
||||
/// If there is no timeout, try waitpid once without blocking so we can avoid sending
/// a signal if the process has already terminated normally.
|
||||
|
||||
int waitpid_res = waitpid(pid, &status, WNOHANG);
|
||||
bool process_terminated_normally = (waitpid_res == pid);
|
||||
return process_terminated_normally;
|
||||
}
|
||||
|
||||
/// If the timeout is positive, call waitpid without blocking in a loop until
/// the process terminates normally or waitpid returns an error.
|
||||
|
||||
while (timeout_in_seconds != 0)
|
||||
{
|
||||
int waitpid_res = waitpid(pid, &status, WNOHANG);
|
||||
|
||||
bool process_terminated_normally = (waitpid_res == pid);
|
||||
|
||||
if (process_terminated_normally)
|
||||
return true;
|
||||
else if (waitpid_res == 0)
|
||||
{
|
||||
--timeout_in_seconds;
|
||||
nanosleep(&interval, nullptr);
|
||||
|
||||
continue;
|
||||
}
|
||||
else if (waitpid_res == -1 && errno != EINTR)
|
||||
return false;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
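The function above avoids a blocking waitpid by polling once per second with WNOHANG and letting the caller escalate to SIGTERM only after the timeout. A standalone POSIX sketch of the same pattern (hypothetical names, not part of the commit):

```cpp
#include <sys/wait.h>
#include <unistd.h>
#include <csignal>
#include <cerrno>
#include <ctime>
#include <cstdio>

/// Returns true if the child exited on its own within timeout_in_seconds.
static bool waitWithTimeout(pid_t pid, size_t timeout_in_seconds)
{
    int status = 0;
    struct timespec interval {1, 0};   /// poll once per second

    while (timeout_in_seconds != 0)
    {
        pid_t res = waitpid(pid, &status, WNOHANG);
        if (res == pid)
            return true;               /// child already terminated and was reaped
        if (res == -1 && errno != EINTR)
            return false;              /// waitpid failed for a real reason
        --timeout_in_seconds;
        nanosleep(&interval, nullptr);
    }
    return false;
}

int main()
{
    pid_t pid = fork();
    if (pid == 0)
    {
        execlp("sleep", "sleep", "2", (char *) nullptr);
        _exit(127);                    /// exec failed
    }

    if (!waitWithTimeout(pid, 5))
    {
        kill(pid, SIGTERM);            /// escalate only if the timeout expired
        waitpid(pid, nullptr, 0);
    }
    std::printf("child %d finished\n", static_cast<int>(pid));
}
```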
void ShellCommand::logCommand(const char * filename, char * const argv[])
|
||||
{
|
||||
WriteBufferFromOwnString args;
|
||||
@ -87,7 +148,10 @@ void ShellCommand::logCommand(const char * filename, char * const argv[])
|
||||
}
|
||||
|
||||
std::unique_ptr<ShellCommand> ShellCommand::executeImpl(
|
||||
const char * filename, char * const argv[], bool pipe_stdin_only, bool terminate_in_destructor)
|
||||
const char * filename,
|
||||
char * const argv[],
|
||||
bool pipe_stdin_only,
|
||||
ShellCommandDestructorStrategy terminate_in_destructor_strategy)
|
||||
{
|
||||
logCommand(filename, argv);
|
||||
|
||||
@ -144,7 +208,7 @@ std::unique_ptr<ShellCommand> ShellCommand::executeImpl(
|
||||
}
|
||||
|
||||
std::unique_ptr<ShellCommand> res(new ShellCommand(
|
||||
pid, pipe_stdin.fds_rw[1], pipe_stdout.fds_rw[0], pipe_stderr.fds_rw[0], terminate_in_destructor));
|
||||
pid, pipe_stdin.fds_rw[1], pipe_stdout.fds_rw[0], pipe_stderr.fds_rw[0], terminate_in_destructor_strategy));
|
||||
|
||||
LOG_TRACE(getLogger(), "Started shell command '{}' with pid {}", filename, pid);
|
||||
return res;
|
||||
@ -152,7 +216,9 @@ std::unique_ptr<ShellCommand> ShellCommand::executeImpl(
|
||||
|
||||
|
||||
std::unique_ptr<ShellCommand> ShellCommand::execute(
|
||||
const std::string & command, bool pipe_stdin_only, bool terminate_in_destructor)
|
||||
const std::string & command,
|
||||
bool pipe_stdin_only,
|
||||
ShellCommandDestructorStrategy terminate_in_destructor_strategy)
|
||||
{
|
||||
/// Arguments in non-constant chunks of memory (as required for `execv`).
|
||||
/// Moreover, their copying must be done before calling `vfork`, so after `vfork` do a minimum of things.
|
||||
@ -162,12 +228,14 @@ std::unique_ptr<ShellCommand> ShellCommand::execute(
|
||||
|
||||
char * const argv[] = { argv0.data(), argv1.data(), argv2.data(), nullptr };
|
||||
|
||||
return executeImpl("/bin/sh", argv, pipe_stdin_only, terminate_in_destructor);
|
||||
return executeImpl("/bin/sh", argv, pipe_stdin_only, terminate_in_destructor_strategy);
|
||||
}
|
||||
|
||||
|
||||
std::unique_ptr<ShellCommand> ShellCommand::executeDirect(
|
||||
const std::string & path, const std::vector<std::string> & arguments, bool terminate_in_destructor)
|
||||
const std::string & path,
|
||||
const std::vector<std::string> & arguments,
|
||||
ShellCommandDestructorStrategy terminate_in_destructor_strategy)
|
||||
{
|
||||
size_t argv_sum_size = path.size() + 1;
|
||||
for (const auto & arg : arguments)
|
||||
@ -188,7 +256,7 @@ std::unique_ptr<ShellCommand> ShellCommand::executeDirect(
|
||||
|
||||
argv[arguments.size() + 1] = nullptr;
|
||||
|
||||
return executeImpl(path.data(), argv.data(), false, terminate_in_destructor);
|
||||
return executeImpl(path.data(), argv.data(), false, terminate_in_destructor_strategy);
|
||||
}
|
||||
|
||||
|
||||
|
@ -23,21 +23,38 @@ namespace DB
|
||||
* The second difference - allows to work simultaneously with stdin, and with stdout, and with stderr of running process,
|
||||
* and also to obtain the return code and completion status.
|
||||
*/
|
||||
class ShellCommand
|
||||
|
||||
struct ShellCommandDestructorStrategy final
|
||||
{
|
||||
explicit ShellCommandDestructorStrategy(bool terminate_in_destructor_, size_t wait_for_normal_exit_before_termination_seconds_ = 0)
|
||||
: terminate_in_destructor(terminate_in_destructor_)
|
||||
, wait_for_normal_exit_before_termination_seconds(wait_for_normal_exit_before_termination_seconds_)
|
||||
{
|
||||
}
|
||||
|
||||
bool terminate_in_destructor;
|
||||
|
||||
/// If terminate_in_destructor is true, the destructor waits up to this many seconds for a normal exit before sending SIGTERM to the created process.
|
||||
size_t wait_for_normal_exit_before_termination_seconds = 0;
|
||||
};
|
||||
|
||||
class ShellCommand final
|
||||
{
|
||||
private:
|
||||
pid_t pid;
|
||||
bool wait_called = false;
|
||||
bool terminate_in_destructor;
|
||||
ShellCommandDestructorStrategy destructor_strategy;
|
||||
|
||||
ShellCommand(pid_t pid_, int & in_fd_, int & out_fd_, int & err_fd_, bool terminate_in_destructor_);
|
||||
ShellCommand(pid_t pid_, int & in_fd_, int & out_fd_, int & err_fd_, ShellCommandDestructorStrategy destructor_strategy_);
|
||||
|
||||
bool tryWaitProcessWithTimeout(size_t timeout_in_seconds);
|
||||
|
||||
static Poco::Logger * getLogger();
|
||||
|
||||
/// Print command name and the list of arguments to log. NOTE: No escaping of arguments is performed.
|
||||
static void logCommand(const char * filename, char * const argv[]);
|
||||
|
||||
static std::unique_ptr<ShellCommand> executeImpl(const char * filename, char * const argv[], bool pipe_stdin_only, bool terminate_in_destructor);
|
||||
static std::unique_ptr<ShellCommand> executeImpl(const char * filename, char * const argv[], bool pipe_stdin_only, ShellCommandDestructorStrategy terminate_in_destructor_strategy);
|
||||
|
||||
public:
|
||||
WriteBufferFromFile in; /// If the command reads from stdin, do not forget to call in.close() after writing all the data there.
|
||||
@ -48,11 +65,11 @@ public:
|
||||
|
||||
/// Run the command using /bin/sh -c.
|
||||
/// If terminate_in_destructor is true, send terminate signal in destructor and don't wait process.
|
||||
static std::unique_ptr<ShellCommand> execute(const std::string & command, bool pipe_stdin_only = false, bool terminate_in_destructor = false);
|
||||
static std::unique_ptr<ShellCommand> execute(const std::string & command, bool pipe_stdin_only = false, ShellCommandDestructorStrategy terminate_in_destructor_strategy = ShellCommandDestructorStrategy(false));
|
||||
|
||||
/// Run the executable with the specified arguments. `arguments` - without argv[0].
|
||||
/// If terminate_in_destructor is true, send terminate signal in destructor and don't wait process.
|
||||
static std::unique_ptr<ShellCommand> executeDirect(const std::string & path, const std::vector<std::string> & arguments, bool terminate_in_destructor = false);
|
||||
static std::unique_ptr<ShellCommand> executeDirect(const std::string & path, const std::vector<std::string> & arguments, ShellCommandDestructorStrategy terminate_in_destructor_strategy = ShellCommandDestructorStrategy(false));
|
||||
|
||||
/// Wait for the process to end, throw an exception if the code is not 0 or if the process was not completed by itself.
|
||||
void wait();
|
||||
|
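To make the new parameter concrete, here is a hedged usage sketch (it assumes the ClickHouse headers and is similar in spirit to the ODBCBridgeMixin call in the next hunk): the strategy asks the destructor to give the child ten seconds to exit normally before sending SIGTERM.

```cpp
#include <Common/ShellCommand.h>
#include <IO/copyData.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <unistd.h>

using namespace DB;

void runWithGracefulShutdown()
{
    /// Terminate the child in the destructor, but first give it up to 10 seconds
    /// to exit on its own; only then send SIGTERM.
    ShellCommandDestructorStrategy strategy(true, /* wait_for_normal_exit_before_termination_seconds */ 10);

    auto command = ShellCommand::execute("echo 'hello from child'", /* pipe_stdin_only */ false, strategy);

    WriteBufferFromFileDescriptor stdout_buffer(STDOUT_FILENO);
    copyData(command->out, stdout_buffer);   /// read everything the child wrote to stdout
    stdout_buffer.next();                    /// flush

    /// Throws if the child returned a non-zero code; after wait() the destructor strategy
    /// is only a safety net for the case where the command was never waited on.
    command->wait();
}
```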
@ -345,7 +345,7 @@ struct ODBCBridgeMixin
|
||||
|
||||
LOG_TRACE(log, "Starting {}", serviceAlias());
|
||||
|
||||
return ShellCommand::executeDirect(path.toString(), cmd_args, true);
|
||||
return ShellCommand::executeDirect(path.toString(), cmd_args, ShellCommandDestructorStrategy(true));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -13,6 +13,11 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
|
||||
}
|
||||
|
||||
void formatBlock(BlockOutputStreamPtr & out, const Block & block)
|
||||
{
|
||||
out->writePrefix();
|
||||
@ -88,4 +93,63 @@ Context copyContextAndApplySettings(
|
||||
return local_context;
|
||||
}
|
||||
|
||||
|
||||
BlockInputStreamWithAdditionalColumns::BlockInputStreamWithAdditionalColumns(
|
||||
Block block_to_add_, std::unique_ptr<IBlockInputStream> && stream_)
|
||||
: block_to_add(std::move(block_to_add_))
|
||||
, stream(std::move(stream_))
|
||||
{
|
||||
}
|
||||
|
||||
Block BlockInputStreamWithAdditionalColumns::getHeader() const
|
||||
{
|
||||
auto header = stream->getHeader();
|
||||
|
||||
if (header)
|
||||
{
|
||||
for (Int64 i = static_cast<Int64>(block_to_add.columns() - 1); i >= 0; --i)
|
||||
header.insert(0, block_to_add.getByPosition(i).cloneEmpty());
|
||||
}
|
||||
|
||||
return header;
|
||||
}
|
||||
|
||||
Block BlockInputStreamWithAdditionalColumns::readImpl()
|
||||
{
|
||||
auto block = stream->read();
|
||||
|
||||
if (block)
|
||||
{
|
||||
auto block_rows = block.rows();
|
||||
|
||||
auto cut_block = block_to_add.cloneWithCutColumns(current_range_index, block_rows);
|
||||
|
||||
if (cut_block.rows() != block_rows)
|
||||
throw Exception(
|
||||
"Number of rows in block to add after cut must equal to number of rows in block from inner stream",
|
||||
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
|
||||
|
||||
for (Int64 i = static_cast<Int64>(cut_block.columns() - 1); i >= 0; --i)
|
||||
block.insert(0, cut_block.getByPosition(i));
|
||||
|
||||
current_range_index += block_rows;
|
||||
}
|
||||
|
||||
return block;
|
||||
}
|
||||
|
||||
void BlockInputStreamWithAdditionalColumns::readPrefix()
|
||||
{
|
||||
stream->readPrefix();
|
||||
}
|
||||
|
||||
void BlockInputStreamWithAdditionalColumns::readSuffix()
|
||||
{
|
||||
stream->readSuffix();
|
||||
}
|
||||
|
||||
String BlockInputStreamWithAdditionalColumns::getName() const
|
||||
{
|
||||
return "BlockInputStreamWithAdditionalColumns";
|
||||
}
|
||||
}
|
||||
|
@ -6,12 +6,13 @@
|
||||
|
||||
#include <Poco/File.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include <Columns/IColumn.h>
|
||||
#include <Core/Block.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class IBlockOutputStream;
|
||||
using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
|
||||
|
||||
@ -46,4 +47,29 @@ void applySettingsToContext(
|
||||
Context & context,
|
||||
const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
/** A stream, adds additional columns to each block that it will read from inner stream.
|
||||
*
|
||||
* block_to_add rows size must be equal to final sum rows size of all inner stream blocks.
|
||||
*/
|
||||
class BlockInputStreamWithAdditionalColumns final : public IBlockInputStream
|
||||
{
|
||||
public:
|
||||
BlockInputStreamWithAdditionalColumns(Block block_to_add_, std::unique_ptr<IBlockInputStream> && stream_);
|
||||
|
||||
Block getHeader() const override;
|
||||
|
||||
Block readImpl() override;
|
||||
|
||||
void readPrefix() override;
|
||||
|
||||
void readSuffix() override;
|
||||
|
||||
String getName() const override;
|
||||
|
||||
private:
|
||||
Block block_to_add;
|
||||
std::unique_ptr<IBlockInputStream> stream;
|
||||
size_t current_range_index = 0;
|
||||
};
|
||||
|
||||
}
|
||||
|
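The contract of BlockInputStreamWithAdditionalColumns is easy to miss: block_to_add holds one row per requested key, and every block read from the inner stream consumes the next slice of it (tracked by current_range_index), so the total row counts must match. A toy standalone illustration of that bookkeeping (not ClickHouse code, plain STL containers standing in for blocks):

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

int main()
{
    /// One key per requested row, in request order (this plays the role of block_to_add).
    std::vector<std::string> keys_to_add = {"k1", "k2", "k3", "k4", "k5"};
    /// Values arrive from the inner stream in chunks of arbitrary size.
    std::vector<std::vector<std::string>> inner_chunks = {{"v1", "v2"}, {"v3", "v4", "v5"}};

    std::vector<std::pair<std::string, std::string>> joined;
    size_t current_range_index = 0;    /// same bookkeeping as in the stream above

    for (const auto & chunk : inner_chunks)
    {
        /// "Cut" exactly chunk.size() keys starting at the current offset and prepend them.
        assert(current_range_index + chunk.size() <= keys_to_add.size());
        for (size_t i = 0; i < chunk.size(); ++i)
            joined.emplace_back(keys_to_add[current_range_index + i], chunk[i]);
        current_range_index += chunk.size();
    }

    /// The totals must match; the real stream throws SIZES_OF_COLUMNS_DOESNT_MATCH otherwise.
    assert(current_range_index == keys_to_add.size());
    assert(joined.size() == 5);
}
```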
@ -27,7 +27,6 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int DICTIONARY_ACCESS_DENIED;
|
||||
extern const int UNSUPPORTED_METHOD;
|
||||
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -58,7 +57,6 @@ namespace
|
||||
|
||||
}
|
||||
|
||||
|
||||
ExecutableDictionarySource::ExecutableDictionarySource(
|
||||
const DictionaryStructure & dict_struct_,
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
@ -200,101 +198,14 @@ namespace
|
||||
std::function<void(WriteBufferFromFile &)> send_data;
|
||||
ThreadFromGlobalPool thread;
|
||||
};
|
||||
|
||||
/** A stream, adds additional columns to each block that it will read from inner stream.
|
||||
*
|
||||
* block_to_add rows size must be equal to final sum rows size of all inner stream blocks.
|
||||
*/
|
||||
class BlockInputStreamWithAdditionalColumns final: public IBlockInputStream
|
||||
{
|
||||
public:
|
||||
BlockInputStreamWithAdditionalColumns(
|
||||
Block block_to_add_,
|
||||
std::unique_ptr<IBlockInputStream>&& stream_)
|
||||
: block_to_add(std::move(block_to_add_))
|
||||
, stream(std::move(stream_))
|
||||
{
|
||||
}
|
||||
|
||||
Block getHeader() const override
|
||||
{
|
||||
auto header = stream->getHeader();
|
||||
|
||||
if (header)
|
||||
{
|
||||
for (Int64 i = static_cast<Int64>(block_to_add.columns() - 1); i >= 0; --i)
|
||||
header.insert(0, block_to_add.getByPosition(i).cloneEmpty());
|
||||
}
|
||||
|
||||
return header;
|
||||
}
|
||||
|
||||
Block readImpl() override
|
||||
{
|
||||
auto block = stream->read();
|
||||
|
||||
if (block)
|
||||
{
|
||||
auto block_rows = block.rows();
|
||||
|
||||
auto cut_block = block_to_add.cloneWithCutColumns(current_range_index, block_rows);
|
||||
|
||||
if (cut_block.rows() != block_rows)
|
||||
throw Exception(
|
||||
"Number of rows in block to add after cut must equal to number of rows in block from inner stream",
|
||||
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
|
||||
|
||||
for (Int64 i = static_cast<Int64>(cut_block.columns() - 1); i >= 0; --i)
|
||||
block.insert(0, cut_block.getByPosition(i));
|
||||
|
||||
current_range_index += block_rows;
|
||||
}
|
||||
|
||||
return block;
|
||||
}
|
||||
|
||||
void readPrefix() override
|
||||
{
|
||||
stream->readPrefix();
|
||||
}
|
||||
|
||||
void readSuffix() override
|
||||
{
|
||||
stream->readSuffix();
|
||||
}
|
||||
|
||||
String getName() const override { return "BlockInputStreamWithAdditionalColumns"; }
|
||||
|
||||
private:
|
||||
Block block_to_add;
|
||||
std::unique_ptr<IBlockInputStream> stream;
|
||||
size_t current_range_index = 0;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
|
||||
BlockInputStreamPtr ExecutableDictionarySource::loadIds(const std::vector<UInt64> & ids)
|
||||
{
|
||||
LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size());
|
||||
|
||||
auto block = blockForIds(dict_struct, ids);
|
||||
|
||||
auto stream = std::make_unique<BlockInputStreamWithBackgroundThread>(
|
||||
context, format, sample_block, command, log,
|
||||
[block, this](WriteBufferFromFile & out) mutable
|
||||
{
|
||||
auto output_stream = context.getOutputStream(format, out, block.cloneEmpty());
|
||||
formatBlock(output_stream, block);
|
||||
out.close();
|
||||
});
|
||||
|
||||
if (implicit_key)
|
||||
{
|
||||
return std::make_shared<BlockInputStreamWithAdditionalColumns>(block, std::move(stream));
|
||||
}
|
||||
else
|
||||
return std::shared_ptr<BlockInputStreamWithBackgroundThread>(stream.release());
|
||||
return getStreamForBlock(block);
|
||||
}
|
||||
|
||||
BlockInputStreamPtr ExecutableDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
|
||||
@ -302,7 +213,11 @@ BlockInputStreamPtr ExecutableDictionarySource::loadKeys(const Columns & key_col
|
||||
LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size());
|
||||
|
||||
auto block = blockForKeys(dict_struct, key_columns, requested_rows);
|
||||
return getStreamForBlock(block);
|
||||
}
|
||||
|
||||
BlockInputStreamPtr ExecutableDictionarySource::getStreamForBlock(const Block & block)
|
||||
{
|
||||
auto stream = std::make_unique<BlockInputStreamWithBackgroundThread>(
|
||||
context, format, sample_block, command, log,
|
||||
[block, this](WriteBufferFromFile & out) mutable
|
||||
@ -354,13 +269,13 @@ void registerDictionarySourceExecutable(DictionarySourceFactory & factory)
|
||||
bool check_config) -> DictionarySourcePtr
|
||||
{
|
||||
if (dict_struct.has_expressions)
|
||||
throw Exception{"Dictionary source of type `executable` does not support attribute expressions", ErrorCodes::LOGICAL_ERROR};
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Dictionary source of type `executable` does not support attribute expressions");
|
||||
|
||||
/// Executable dictionaries may execute arbitrary commands.
|
||||
/// It's OK for dictionaries created by administrator from xml-file, but
|
||||
/// maybe dangerous for dictionaries created from DDL-queries.
|
||||
if (check_config)
|
||||
throw Exception("Dictionaries with Executable dictionary source is not allowed", ErrorCodes::DICTIONARY_ACCESS_DENIED);
|
||||
throw Exception(ErrorCodes::DICTIONARY_ACCESS_DENIED, "Dictionaries with executable dictionary source are not allowed to be created from DDL query");
|
||||
|
||||
Context context_local_copy = copyContextAndApplySettings(config_prefix, context, config);
|
||||
|
||||
|
@ -47,6 +47,8 @@ public:
|
||||
|
||||
std::string toString() const override;
|
||||
|
||||
BlockInputStreamPtr getStreamForBlock(const Block & block);
|
||||
|
||||
private:
|
||||
Poco::Logger * log;
|
||||
time_t update_time = 0;
|
||||
|
src/Dictionaries/ExecutablePoolDictionarySource.cpp (new file, 321 lines)
@ -0,0 +1,321 @@
|
||||
#include "ExecutablePoolDictionarySource.h"
|
||||
|
||||
#include <functional>
|
||||
#include <ext/scope_guard.h>
|
||||
#include <DataStreams/IBlockOutputStream.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/copyData.h>
|
||||
#include <Common/ShellCommand.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <common/LocalDateTime.h>
|
||||
#include "DictionarySourceFactory.h"
|
||||
#include "DictionarySourceHelpers.h"
|
||||
#include "DictionaryStructure.h"
|
||||
#include "registerDictionaries.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int DICTIONARY_ACCESS_DENIED;
|
||||
extern const int UNSUPPORTED_METHOD;
|
||||
extern const int TIMEOUT_EXCEEDED;
|
||||
}
|
||||
|
||||
ExecutablePoolDictionarySource::ExecutablePoolDictionarySource(
|
||||
const DictionaryStructure & dict_struct_,
|
||||
const Configuration & configuration_,
|
||||
Block & sample_block_,
|
||||
const Context & context_)
|
||||
: log(&Poco::Logger::get("ExecutablePoolDictionarySource"))
|
||||
, dict_struct{dict_struct_}
|
||||
, configuration{configuration_}
|
||||
, sample_block{sample_block_}
|
||||
, context{context_}
|
||||
/// If pool size == 0 then there is no size restrictions. Poco max size of semaphore is integer type.
|
||||
, process_pool{std::make_shared<ProcessPool>(configuration.pool_size == 0 ? std::numeric_limits<int>::max() : configuration.pool_size)}
|
||||
{
|
||||
/// Remove keys from sample_block for implicit_key dictionary because
|
||||
/// these columns will not be returned from source
|
||||
/// Implicit key means that the source script will return only values,
|
||||
/// and the correspondence to the requested keys is determined implicitly - by the order of rows in the result.
|
||||
if (configuration.implicit_key)
|
||||
{
|
||||
auto keys_names = dict_struct.getKeysNames();
|
||||
|
||||
for (auto & key_name : keys_names)
|
||||
{
|
||||
size_t key_column_position_in_block = sample_block.getPositionByName(key_name);
|
||||
sample_block.erase(key_column_position_in_block);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ExecutablePoolDictionarySource::ExecutablePoolDictionarySource(const ExecutablePoolDictionarySource & other)
|
||||
: log(&Poco::Logger::get("ExecutablePoolDictionarySource"))
|
||||
, update_time{other.update_time}
|
||||
, dict_struct{other.dict_struct}
|
||||
, configuration{other.configuration}
|
||||
, sample_block{other.sample_block}
|
||||
, context{other.context}
|
||||
, process_pool{std::make_shared<ProcessPool>(configuration.pool_size)}
|
||||
{
|
||||
}
|
||||
|
||||
BlockInputStreamPtr ExecutablePoolDictionarySource::loadAll()
|
||||
{
|
||||
throw Exception("ExecutablePoolDictionarySource with implicit_key does not support loadAll method", ErrorCodes::UNSUPPORTED_METHOD);
|
||||
}
|
||||
|
||||
BlockInputStreamPtr ExecutablePoolDictionarySource::loadUpdatedAll()
|
||||
{
|
||||
throw Exception("ExecutablePoolDictionarySource with implicit_key does not support loadAll method", ErrorCodes::UNSUPPORTED_METHOD);
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
/** A stream, that runs child process and sends data to its stdin in background thread,
|
||||
* and receives data from its stdout.
|
||||
*/
|
||||
class PoolBlockInputStreamWithBackgroundThread final : public IBlockInputStream
|
||||
{
|
||||
public:
|
||||
PoolBlockInputStreamWithBackgroundThread(
|
||||
std::shared_ptr<ProcessPool> process_pool_,
|
||||
std::unique_ptr<ShellCommand> && command_,
|
||||
BlockInputStreamPtr && stream_,
|
||||
size_t read_rows_,
|
||||
Poco::Logger * log_,
|
||||
std::function<void(WriteBufferFromFile &)> && send_data_)
|
||||
: process_pool(process_pool_)
|
||||
, command(std::move(command_))
|
||||
, stream(std::move(stream_))
|
||||
, rows_to_read(read_rows_)
|
||||
, log(log_)
|
||||
, send_data(std::move(send_data_))
|
||||
, thread([this]
|
||||
{
|
||||
try
|
||||
{
|
||||
send_data(command->in);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
std::lock_guard<std::mutex> lck(exception_during_read_lock);
|
||||
exception_during_read = std::current_exception();
|
||||
}
|
||||
})
|
||||
{}
|
||||
|
||||
~PoolBlockInputStreamWithBackgroundThread() override
|
||||
{
|
||||
if (thread.joinable())
|
||||
thread.join();
|
||||
|
||||
if (command)
|
||||
process_pool->returnObject(std::move(command));
|
||||
}
|
||||
|
||||
Block getHeader() const override
|
||||
{
|
||||
return stream->getHeader();
|
||||
}
|
||||
|
||||
private:
|
||||
Block readImpl() override
|
||||
{
|
||||
rethrowExceptionDuringReadIfNeeded();
|
||||
|
||||
if (current_read_rows == rows_to_read)
|
||||
return Block();
|
||||
|
||||
Block block;
|
||||
|
||||
try
|
||||
{
|
||||
block = stream->read();
|
||||
current_read_rows += block.rows();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(log);
|
||||
command = nullptr;
|
||||
throw;
|
||||
}
|
||||
|
||||
return block;
|
||||
}
|
||||
|
||||
void readPrefix() override
|
||||
{
|
||||
rethrowExceptionDuringReadIfNeeded();
|
||||
stream->readPrefix();
|
||||
}
|
||||
|
||||
void readSuffix() override
|
||||
{
|
||||
if (thread.joinable())
|
||||
thread.join();
|
||||
|
||||
rethrowExceptionDuringReadIfNeeded();
|
||||
stream->readSuffix();
|
||||
}
|
||||
|
||||
void rethrowExceptionDuringReadIfNeeded()
|
||||
{
|
||||
std::lock_guard<std::mutex> lck(exception_during_read_lock);
|
||||
if (exception_during_read)
|
||||
{
|
||||
command = nullptr;
|
||||
std::rethrow_exception(exception_during_read);
|
||||
}
|
||||
}
|
||||
|
||||
String getName() const override { return "PoolWithBackgroundThread"; }
|
||||
|
||||
std::shared_ptr<ProcessPool> process_pool;
|
||||
std::unique_ptr<ShellCommand> command;
|
||||
BlockInputStreamPtr stream;
|
||||
size_t rows_to_read;
|
||||
Poco::Logger * log;
|
||||
std::function<void(WriteBufferFromFile &)> send_data;
|
||||
ThreadFromGlobalPool thread;
|
||||
size_t current_read_rows = 0;
|
||||
std::mutex exception_during_read_lock;
|
||||
std::exception_ptr exception_during_read;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
BlockInputStreamPtr ExecutablePoolDictionarySource::loadIds(const std::vector<UInt64> & ids)
|
||||
{
|
||||
LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size());
|
||||
|
||||
auto block = blockForIds(dict_struct, ids);
|
||||
return getStreamForBlock(block);
|
||||
}
|
||||
|
||||
BlockInputStreamPtr ExecutablePoolDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
|
||||
{
|
||||
LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size());
|
||||
|
||||
auto block = blockForKeys(dict_struct, key_columns, requested_rows);
|
||||
return getStreamForBlock(block);
|
||||
}
|
||||
|
||||
BlockInputStreamPtr ExecutablePoolDictionarySource::getStreamForBlock(const Block & block)
|
||||
{
|
||||
std::unique_ptr<ShellCommand> process;
|
||||
bool result = process_pool->tryBorrowObject(process, [this]()
|
||||
{
|
||||
bool terminate_in_destructor = true;
|
||||
ShellCommandDestructorStrategy strategy { terminate_in_destructor, configuration.command_termination_timeout };
|
||||
auto shell_command = ShellCommand::execute(configuration.command, false, strategy);
|
||||
return shell_command;
|
||||
}, configuration.max_command_execution_time * 10000);
|
||||
|
||||
if (!result)
|
||||
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Could not get process from pool, max command execution timeout exceeded");
|
||||
|
||||
size_t rows_to_read = block.rows();
|
||||
auto read_stream = context.getInputFormat(configuration.format, process->out, sample_block, rows_to_read);
|
||||
|
||||
auto stream = std::make_unique<PoolBlockInputStreamWithBackgroundThread>(
|
||||
process_pool, std::move(process), std::move(read_stream), rows_to_read, log,
|
||||
[block, this](WriteBufferFromFile & out) mutable
|
||||
{
|
||||
auto output_stream = context.getOutputStream(configuration.format, out, block.cloneEmpty());
|
||||
formatBlock(output_stream, block);
|
||||
});
|
||||
|
||||
if (configuration.implicit_key)
|
||||
return std::make_shared<BlockInputStreamWithAdditionalColumns>(block, std::move(stream));
|
||||
else
|
||||
return std::shared_ptr<PoolBlockInputStreamWithBackgroundThread>(stream.release());
|
||||
}
|
||||
|
||||
bool ExecutablePoolDictionarySource::isModified() const
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ExecutablePoolDictionarySource::supportsSelectiveLoad() const
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ExecutablePoolDictionarySource::hasUpdateField() const
|
||||
{
|
||||
return !configuration.update_field.empty();
|
||||
}
|
||||
|
||||
DictionarySourcePtr ExecutablePoolDictionarySource::clone() const
|
||||
{
|
||||
return std::make_unique<ExecutablePoolDictionarySource>(*this);
|
||||
}
|
||||
|
||||
std::string ExecutablePoolDictionarySource::toString() const
|
||||
{
|
||||
return "ExecutablePool size: " + std::to_string(configuration.pool_size) + " command: " + configuration.command;
|
||||
}
|
||||
|
||||
void registerDictionarySourceExecutablePool(DictionarySourceFactory & factory)
|
||||
{
|
||||
auto create_table_source = [=](const DictionaryStructure & dict_struct,
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix,
|
||||
Block & sample_block,
|
||||
const Context & context,
|
||||
const std::string & /* default_database */,
|
||||
bool check_config) -> DictionarySourcePtr
|
||||
{
|
||||
if (dict_struct.has_expressions)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Dictionary source of type `executable_pool` does not support attribute expressions");
|
||||
|
||||
/// Executable dictionaries may execute arbitrary commands.
|
||||
/// It's OK for dictionaries created by administrator from xml-file, but
|
||||
/// maybe dangerous for dictionaries created from DDL-queries.
|
||||
if (check_config)
|
||||
throw Exception(ErrorCodes::DICTIONARY_ACCESS_DENIED, "Dictionaries with executable pool dictionary source are not allowed to be created from DDL query");
|
||||
|
||||
Context context_local_copy = copyContextAndApplySettings(config_prefix, context, config);
|
||||
|
||||
/** Currently parallel parsing input format cannot read exactly max_block_size rows from input,
  * so it will be blocked on ReadBufferFromFileDescriptor, because this file descriptor represents a pipe that does not have an EOF.
  */
|
||||
auto settings_no_parallel_parsing = context_local_copy.getSettings();
|
||||
settings_no_parallel_parsing.input_format_parallel_parsing = false;
|
||||
context_local_copy.setSettings(settings_no_parallel_parsing);
|
||||
|
||||
String configuration_config_prefix = config_prefix + ".executable_pool";
|
||||
|
||||
size_t max_command_execution_time = config.getUInt64(configuration_config_prefix + ".max_command_execution_time", 10);
|
||||
|
||||
size_t max_execution_time_seconds = static_cast<size_t>(context.getSettings().max_execution_time.totalSeconds());
|
||||
if (max_command_execution_time > max_execution_time_seconds)
|
||||
max_command_execution_time = max_execution_time_seconds;
|
||||
|
||||
ExecutablePoolDictionarySource::Configuration configuration
|
||||
{
|
||||
.command = config.getString(configuration_config_prefix + ".command"),
|
||||
.format = config.getString(configuration_config_prefix + ".format"),
|
||||
.pool_size = config.getUInt64(configuration_config_prefix + ".size"),
|
||||
.update_field = config.getString(configuration_config_prefix + ".update_field", ""),
|
||||
.implicit_key = config.getBool(configuration_config_prefix + ".implicit_key", false),
|
||||
.command_termination_timeout = config.getUInt64(configuration_config_prefix + ".command_termination_timeout", 10),
|
||||
.max_command_execution_time = max_command_execution_time
|
||||
};
|
||||
|
||||
return std::make_unique<ExecutablePoolDictionarySource>(dict_struct, configuration, sample_block, context_local_copy);
|
||||
};
|
||||
|
||||
factory.registerSource("executable_pool", create_table_source);
|
||||
}
|
||||
|
||||
}
|
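One subtle piece of PoolBlockInputStreamWithBackgroundThread above is how a failure in the writer thread (send_data) reaches the reader: the exception is parked in an std::exception_ptr under a mutex and rethrown on the reading thread before any rows are used, and the borrowed process is dropped instead of being returned to the pool. A minimal standalone sketch of that hand-off (illustrative only, not part of the commit):

```cpp
#include <exception>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <thread>

int main()
{
    std::mutex exception_lock;
    std::exception_ptr exception_during_write;

    /// The writer thread must not let exceptions escape; it stores them instead.
    std::thread writer([&]
    {
        try
        {
            throw std::runtime_error("child process rejected the input");   /// simulated send_data failure
        }
        catch (...)
        {
            std::lock_guard<std::mutex> lock(exception_lock);
            exception_during_write = std::current_exception();
        }
    });
    writer.join();

    /// The reader rethrows on its own thread, mirroring rethrowExceptionDuringReadIfNeeded().
    try
    {
        std::lock_guard<std::mutex> lock(exception_lock);
        if (exception_during_write)
            std::rethrow_exception(exception_during_write);
    }
    catch (const std::exception & e)
    {
        std::cout << "reader saw: " << e.what() << '\n';
    }
}
```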
src/Dictionaries/ExecutablePoolDictionarySource.h (new file, 84 lines)
@ -0,0 +1,84 @@
|
||||
#pragma once
|
||||
|
||||
#include <Core/Block.h>
|
||||
#include <Common/BorrowedObjectPool.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
#include "IDictionarySource.h"
|
||||
#include "DictionaryStructure.h"
|
||||
|
||||
namespace Poco { class Logger; }
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
using ProcessPool = BorrowedObjectPool<std::unique_ptr<ShellCommand>>;
|
||||
|
||||
/** ExecutablePoolDictionarySource allows loading data from a pool of processes.
* When a client requests ids or keys, the source gets a process from the ProcessPool
* and creates a stream based on the source format from the process stdout.
* It is important that the stream format expects only the rows that were requested.
* When the stream is finished, the process is returned back to the ProcessPool.
* If there are no processes in the pool during a request, the client will be blocked
* until some process is returned to the pool.
*/
|
||||
class ExecutablePoolDictionarySource final : public IDictionarySource
|
||||
{
|
||||
public:
|
||||
struct Configuration
|
||||
{
|
||||
const String command;
|
||||
const String format;
|
||||
const size_t pool_size;
|
||||
const String update_field;
|
||||
const bool implicit_key;
|
||||
const size_t command_termination_timeout;
|
||||
const size_t max_command_execution_time;
|
||||
};
|
||||
|
||||
ExecutablePoolDictionarySource(
|
||||
const DictionaryStructure & dict_struct_,
|
||||
const Configuration & configuration_,
|
||||
Block & sample_block_,
|
||||
const Context & context_);
|
||||
|
||||
ExecutablePoolDictionarySource(const ExecutablePoolDictionarySource & other);
|
||||
ExecutablePoolDictionarySource & operator=(const ExecutablePoolDictionarySource &) = delete;
|
||||
|
||||
BlockInputStreamPtr loadAll() override;
|
||||
|
||||
/** The logic of this method is flawed, absolutely incorrect and ignorant.
|
||||
* It may lead to skipping some values due to clock sync or timezone changes.
|
||||
* The intended usage of "update_field" is totally different.
|
||||
*/
|
||||
BlockInputStreamPtr loadUpdatedAll() override;
|
||||
|
||||
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
|
||||
|
||||
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
|
||||
|
||||
bool isModified() const override;
|
||||
|
||||
bool supportsSelectiveLoad() const override;
|
||||
|
||||
bool hasUpdateField() const override;
|
||||
|
||||
DictionarySourcePtr clone() const override;
|
||||
|
||||
std::string toString() const override;
|
||||
|
||||
BlockInputStreamPtr getStreamForBlock(const Block & block);
|
||||
|
||||
private:
|
||||
Poco::Logger * log;
|
||||
time_t update_time = 0;
|
||||
const DictionaryStructure dict_struct;
|
||||
const Configuration configuration;
|
||||
|
||||
Block sample_block;
|
||||
Context context;
|
||||
std::shared_ptr<ProcessPool> process_pool;
|
||||
};
|
||||
|
||||
}
|
@ -18,6 +18,7 @@ void registerDictionarySourceJDBC(DictionarySourceFactory & source_factory);
|
||||
void registerDictionarySourcePostgreSQL(DictionarySourceFactory & source_factory);
|
||||
#endif
|
||||
void registerDictionarySourceExecutable(DictionarySourceFactory & source_factory);
|
||||
void registerDictionarySourceExecutablePool(DictionarySourceFactory & source_factory);
|
||||
void registerDictionarySourceHTTP(DictionarySourceFactory & source_factory);
|
||||
void registerDictionarySourceLibrary(DictionarySourceFactory & source_factory);
|
||||
|
||||
@ -54,6 +55,7 @@ void registerDictionaries()
|
||||
registerDictionarySourcePostgreSQL(source_factory);
|
||||
#endif
|
||||
registerDictionarySourceExecutable(source_factory);
|
||||
registerDictionarySourceExecutablePool(source_factory);
|
||||
registerDictionarySourceHTTP(source_factory);
|
||||
registerDictionarySourceLibrary(source_factory);
|
||||
}
|
||||
|
@ -42,6 +42,7 @@ SRCS(
|
||||
Embedded/RegionsHierarchy.cpp
|
||||
Embedded/RegionsNames.cpp
|
||||
ExecutableDictionarySource.cpp
|
||||
ExecutablePoolDictionarySource.cpp
|
||||
ExternalQueryBuilder.cpp
|
||||
FileDictionarySource.cpp
|
||||
FlatDictionary.cpp
|
||||
|
@ -368,6 +368,13 @@ public:
|
||||
/// Functions in external dictionaries_loader only support full-value (not constant) columns with keys.
|
||||
ColumnPtr key_column_full = key_col_with_type.column->convertToFullColumnIfConst();
|
||||
|
||||
if (!isTuple(key_col_with_type.type))
|
||||
throw Exception(
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
|
||||
"Third argument of function ({}) must be tuple when dictionary is complex. Actual type ({}).",
|
||||
getName(),
|
||||
key_col_with_type.type->getName());
|
||||
|
||||
const auto & key_columns = typeid_cast<const ColumnTuple &>(*key_column_full).getColumnsCopy();
|
||||
const auto & key_types = static_cast<const DataTypeTuple &>(*key_col_with_type.type).getElements();
|
||||
|
||||
|
@ -60,6 +60,7 @@ if [ "$DATA_DIR_PATTERN" != "$DATA_DIR" ]; then
|
||||
cat ${CONFIG_SERVER_DIR}/ints_dictionary.xml | sed -e s!9000!$CLICKHOUSE_PORT_TCP! > $DATA_DIR/etc/ints_dictionary.xml
|
||||
cat ${CONFIG_SERVER_DIR}/strings_dictionary.xml | sed -e s!9000!$CLICKHOUSE_PORT_TCP! > $DATA_DIR/etc/strings_dictionary.xml
|
||||
cat ${CONFIG_SERVER_DIR}/decimals_dictionary.xml | sed -e s!9000!$CLICKHOUSE_PORT_TCP! > $DATA_DIR/etc/decimals_dictionary.xml
|
||||
cat ${CONFIG_SERVER_DIR}/executable_pool_dictionary.xml | sed -e s!9000!$CLICKHOUSE_PORT_TCP! > $DATA_DIR/etc/executable_pool_dictionary.xml
|
||||
fi
|
||||
|
||||
CLICKHOUSE_EXTRACT_CONFIG=${CLICKHOUSE_EXTRACT_CONFIG:="${CLICKHOUSE_EXTRACT} --config=$CLICKHOUSE_CONFIG"}
|
||||
|
tests/config/executable_pool_dictionary.xml (new file, 154 lines)
@ -0,0 +1,154 @@
|
||||
<dictionaries>
|
||||
|
||||
<dictionary>
|
||||
<name>executable_pool_simple</name>
|
||||
|
||||
<structure>
|
||||
<id>
|
||||
<name>x</name>
|
||||
</id>
|
||||
<attribute>
|
||||
<name>a</name>
|
||||
<type>String</type>
|
||||
<null_value></null_value>
|
||||
</attribute>
|
||||
<attribute>
|
||||
<name>b</name>
|
||||
<type>String</type>
|
||||
<null_value></null_value>
|
||||
</attribute>
|
||||
</structure>
|
||||
|
||||
<source>
|
||||
<executable_pool>
|
||||
<format>TabSeparated</format>
|
||||
<command>while read read_data; do printf "$read_data\t$read_data a\t$read_data b\n"; done</command>
|
||||
<size>5</size>
|
||||
</executable_pool>
|
||||
</source>
|
||||
|
||||
<layout>
|
||||
<direct />
|
||||
</layout>
|
||||
|
||||
<lifetime>300</lifetime>
|
||||
</dictionary>
|
||||
|
||||
<dictionary>
|
||||
<name>executable_pool_complex</name>
|
||||
|
||||
<structure>
|
||||
<key>
|
||||
<attribute>
|
||||
<name>x</name>
|
||||
<type>String</type>
|
||||
</attribute>
|
||||
<attribute>
|
||||
<name>y</name>
|
||||
<type>String</type>
|
||||
</attribute>
|
||||
</key>
|
||||
<attribute>
|
||||
<name>a</name>
|
||||
<type>String</type>
|
||||
<null_value></null_value>
|
||||
</attribute>
|
||||
<attribute>
|
||||
<name>b</name>
|
||||
<type>String</type>
|
||||
<null_value></null_value>
|
||||
</attribute>
|
||||
</structure>
|
||||
|
||||
<source>
|
||||
<executable>
|
||||
<format>TabSeparated</format>
|
||||
<command>while read read_data; do printf "$read_data\tvalue a\tvalue b\n"; done</command>
|
||||
</executable>
|
||||
</source>
|
||||
|
||||
<layout>
|
||||
<complex_key_direct />
|
||||
</layout>
|
||||
|
||||
<lifetime>300</lifetime>
|
||||
</dictionary>
|
||||
|
||||
<dictionary>
|
||||
<name>executable_pool_simple_implicit_key</name>
|
||||
|
||||
<structure>
|
||||
<id>
|
||||
<name>x</name>
|
||||
</id>
|
||||
<attribute>
|
||||
<name>a</name>
|
||||
<type>String</type>
|
||||
<null_value></null_value>
|
||||
</attribute>
|
||||
<attribute>
|
||||
<name>b</name>
|
||||
<type>String</type>
|
||||
<null_value></null_value>
|
||||
</attribute>
|
||||
</structure>
|
||||
|
||||
<source>
|
||||
<executable_pool>
|
||||
<format>TabSeparated</format>
|
||||
<command>while read read_data; do printf "$read_data a\t$read_data b\n"; done</command>
|
||||
<size>5</size>
|
||||
<implicit_key>true</implicit_key>
|
||||
</executable_pool>
|
||||
</source>
|
||||
|
||||
<layout>
|
||||
<direct />
|
||||
</layout>
|
||||
|
||||
<lifetime>300</lifetime>
|
||||
</dictionary>
|
||||
|
||||
<dictionary>
|
||||
<name>executable_pool_complex_implicit_key</name>
|
||||
|
||||
<structure>
|
||||
<key>
|
||||
<attribute>
|
||||
<name>x</name>
|
||||
<type>String</type>
|
||||
</attribute>
|
||||
<attribute>
|
||||
<name>y</name>
|
||||
<type>String</type>
|
||||
</attribute>
|
||||
</key>
|
||||
<attribute>
|
||||
<name>a</name>
|
||||
<type>String</type>
|
||||
<null_value></null_value>
|
||||
</attribute>
|
||||
<attribute>
|
||||
<name>b</name>
|
||||
<type>String</type>
|
||||
<null_value></null_value>
|
||||
</attribute>
|
||||
</structure>
|
||||
|
||||
<source>
|
||||
<executable_pool>
|
||||
<format>TabSeparated</format>
|
||||
<command>while read read_data; do printf "data a\tdata b\n"; done</command>
|
||||
<size>5</size>
|
||||
<implicit_key>true</implicit_key>
|
||||
</executable_pool>
|
||||
</source>
|
||||
|
||||
<layout>
|
||||
<complex_key_direct />
|
||||
</layout>
|
||||
|
||||
<lifetime>300</lifetime>
|
||||
</dictionary>
|
||||
|
||||
</dictionaries>
|
@ -45,6 +45,7 @@ ln -sf $SRC_PATH/ints_dictionary.xml $DEST_SERVER_PATH/
|
||||
ln -sf $SRC_PATH/strings_dictionary.xml $DEST_SERVER_PATH/
|
||||
ln -sf $SRC_PATH/decimals_dictionary.xml $DEST_SERVER_PATH/
|
||||
ln -sf $SRC_PATH/executable_dictionary.xml $DEST_SERVER_PATH/
|
||||
ln -sf $SRC_PATH/executable_pool_dictionary.xml $DEST_SERVER_PATH/
|
||||
|
||||
ln -sf $SRC_PATH/top_level_domains $DEST_SERVER_PATH/
|
||||
|
||||
|
@ -0,0 +1,10 @@
|
||||
executable_pool_simple
|
||||
1 a
|
||||
1 b
|
||||
2 a
|
||||
2 b
|
||||
executable_pool_complex
|
||||
value a
|
||||
value b
|
||||
value a
|
||||
value b
|
@ -0,0 +1,15 @@
|
||||
SELECT 'executable_pool_simple';
|
||||
|
||||
SELECT dictGet('executable_pool_simple', 'a', toUInt64(1));
|
||||
SELECT dictGet('executable_pool_simple', 'b', toUInt64(1));
|
||||
|
||||
SELECT dictGet('executable_pool_simple', 'a', toUInt64(2));
|
||||
SELECT dictGet('executable_pool_simple', 'b', toUInt64(2));
|
||||
|
||||
SELECT 'executable_pool_complex';
|
||||
|
||||
SELECT dictGet('executable_pool_complex', 'a', ('First_1', 'Second_1'));
|
||||
SELECT dictGet('executable_pool_complex', 'b', ('First_1', 'Second_1'));
|
||||
|
||||
SELECT dictGet('executable_pool_complex', 'a', ('First_2', 'Second_2'));
|
||||
SELECT dictGet('executable_pool_complex', 'b', ('First_2', 'Second_2'));
|
@ -0,0 +1,10 @@
|
||||
executable_pool_simple_implicit_key
|
||||
1 a
|
||||
1 b
|
||||
2 a
|
||||
2 b
|
||||
executable_pool_complex_implicit_key
|
||||
data a
|
||||
data b
|
||||
data a
|
||||
data b
|
@ -0,0 +1,15 @@
|
||||
SELECT 'executable_pool_simple_implicit_key';
|
||||
|
||||
SELECT dictGet('executable_pool_simple_implicit_key', 'a', toUInt64(1));
|
||||
SELECT dictGet('executable_pool_simple_implicit_key', 'b', toUInt64(1));
|
||||
|
||||
SELECT dictGet('executable_pool_simple_implicit_key', 'a', toUInt64(2));
|
||||
SELECT dictGet('executable_pool_simple_implicit_key', 'b', toUInt64(2));
|
||||
|
||||
SELECT 'executable_pool_complex_implicit_key';
|
||||
|
||||
SELECT dictGet('executable_pool_complex_implicit_key', 'a', ('First_1', 'Second_1'));
|
||||
SELECT dictGet('executable_pool_complex_implicit_key', 'b', ('First_1', 'Second_1'));
|
||||
|
||||
SELECT dictGet('executable_pool_complex_implicit_key', 'a', ('First_2', 'Second_2'));
|
||||
SELECT dictGet('executable_pool_complex_implicit_key', 'b', ('First_2', 'Second_2'));
|
@ -747,10 +747,7 @@ def verification_cooldown_performance(self, server, rbac=False, iterations=5000)
|
||||
no_vcd_time = repeat_requests(server=server, iterations=iterations, vcd_value="0", rbac=rbac)
|
||||
metric("login_with_vcd_value_0", units="seconds", value=no_vcd_time)
|
||||
|
||||
with Then("The performance with verification cooldown parameter set is better than the performance with no verification cooldown parameter."):
|
||||
assert no_vcd_time > vcd_time, error()
|
||||
|
||||
with And("Log the performance improvement as a percentage."):
|
||||
with Then("Log the performance improvement as a percentage"):
|
||||
metric("percentage_improvement", units="%", value=100*(no_vcd_time - vcd_time)/vcd_time)
|
||||
|
||||
@TestOutline
|
||||
|
@ -50,5 +50,57 @@
|
||||
[0.024, 0.015, 0.021],
|
||||
[0.007, 0.013, 0.006]
|
||||
]
|
||||
},
|
||||
{
|
||||
"system": "AMD Ryzen 9",
|
||||
"system_full": "AMD Ryzen 9 3950X 16-Core Processor, 64 GiB RAM, Samsung evo 970 plus 1TB",
|
||||
"time": "2021-03-08 00:00:00",
|
||||
"kind": "desktop",
|
||||
"result":
|
||||
[
|
||||
[0.002, 0.002, 0.002],
|
||||
[0.013, 0.011, 0.010],
|
||||
[0.030, 0.023, 0.023],
|
||||
[0.071, 0.033, 0.031],
|
||||
[0.090, 0.068, 0.066],
|
||||
[0.165, 0.137, 0.137],
|
||||
[0.015, 0.014, 0.015],
|
||||
[0.013, 0.012, 0.012],
|
||||
[0.317, 0.268, 0.261],
|
||||
[0.337, 0.303, 0.301],
|
||||
[0.108, 0.089, 0.091],
|
||||
[0.127, 0.110, 0.114],
|
||||
[0.714, 0.690, 0.643],
|
||||
[0.888, 0.835, 0.809],
|
||||
[0.748, 0.727, 0.704],
|
||||
[0.666, 0.653, 0.652],
|
||||
[1.868, 1.821, 1.826],
|
||||
[1.007, 0.958, 0.957],
|
||||
[4.466, 4.371, 4.377],
|
||||
[0.074, 0.027, 0.027],
|
||||
[0.748, 0.326, 0.307],
|
||||
[0.844, 0.427, 0.408],
|
||||
[2.040, 1.545, 1.552],
|
||||
[1.392, 0.609, 0.560],
|
||||
[0.237, 0.155, 0.142],
|
||||
[0.140, 0.112, 0.114],
|
||||
[0.233, 0.151, 0.146],
|
||||
[0.790, 0.567, 0.545],
|
||||
[0.981, 0.751, 0.752],
|
||||
[3.532, 3.522, 3.516],
|
||||
[0.505, 0.478, 0.456],
|
||||
[1.078, 0.959, 0.959],
|
||||
[5.653, 5.600, 5.570],
|
||||
[3.572, 3.399, 3.405],
|
||||
[3.619, 3.445, 3.429],
|
||||
[1.176, 1.174, 1.165],
|
||||
[0.140, 0.127, 0.124],
|
||||
[0.054, 0.052, 0.052],
|
||||
[0.052, 0.049, 0.048],
|
||||
[0.275, 0.265, 0.265],
|
||||
[0.025, 0.024, 0.025],
|
||||
[0.020, 0.021, 0.019],
|
||||
[0.006, 0.005, 0.005]
|
||||
]
|
||||
}
|
||||
]
|
||||
|