Added ExecutablePool storage

This commit is contained in:
Maksim Kita 2021-09-02 14:53:20 +03:00
parent befb82e441
commit abda2a636e
12 changed files with 435 additions and 185 deletions

View File

@ -3,8 +3,11 @@
#include <memory>
#include <common/logger_useful.h>
#include <common/BorrowedObjectPool.h>
#include <Common/ShellCommand.h>
#include <Common/ThreadPool.h>
#include <IO/ReadHelpers.h>
#include <Formats/FormatFactory.h>
#include <Processors/ISimpleTransform.h>
@ -17,8 +20,9 @@
namespace DB
{
/** A stream, that runs child process and sends data to its stdin in background thread,
* and receives data from its stdout.
/** A stream that takes a child process and a list of send data tasks.
* A background thread is created for each send data task; the tasks must send data to the process input pipes.
* ShellCommandSource receives data from the process stdout.
*/
class ShellCommandSource final : public SourceWithProgress
{
@ -29,7 +33,7 @@ public:
ContextPtr context,
const std::string & format,
const Block & sample_block,
std::unique_ptr<ShellCommand> command_,
std::unique_ptr<ShellCommand> && command_,
Poco::Logger * log_,
std::vector<SendDataTask> && send_data_tasks,
size_t max_block_size = DEFAULT_BLOCK_SIZE)
@ -48,7 +52,7 @@ public:
ContextPtr context,
const std::string & format,
const Block & sample_block,
std::unique_ptr<ShellCommand> command_,
std::unique_ptr<ShellCommand> && command_,
Poco::Logger * log_,
size_t max_block_size = DEFAULT_BLOCK_SIZE)
: SourceWithProgress(sample_block)
@ -107,4 +111,173 @@ private:
Poco::Logger * log;
};
/** A stream that takes a child process and a list of send data tasks.
* A background thread is created for each send data task; the tasks must send data to the process input pipes.
* ShellCommandPoolSource receives data from the process stdout.
*
* The main difference from ShellCommandSource is that ShellCommandPoolSource is initialized with process_pool and rows_to_read.
* rows_to_read is necessary because processes in the pool are not destroyed and work in a read-write loop.
* The source needs to stop generating new chunks after rows_to_read rows have been generated from the process.
*
* If rows_to_read is not specified, the script is expected to output rows_to_read before any other data.
*
* After the source is destroyed, the process is returned to the pool.
*/
using ProcessPool = BorrowedObjectPool<std::unique_ptr<ShellCommand>>;
class ShellCommandPoolSource final : public SourceWithProgress
{
public:
using SendDataTask = std::function<void(void)>;
ShellCommandPoolSource(
ContextPtr context,
const std::string & format,
const Block & sample_block,
std::shared_ptr<ProcessPool> process_pool_,
std::unique_ptr<ShellCommand> && command_,
size_t rows_to_read_,
Poco::Logger * log_,
std::vector<SendDataTask> && send_data_tasks)
: SourceWithProgress(sample_block)
, process_pool(process_pool_)
, command(std::move(command_))
, rows_to_read(rows_to_read_)
, log(log_)
{
for (auto && send_data_task : send_data_tasks)
{
send_data_threads.emplace_back([task = std::move(send_data_task), this]()
{
try
{
task();
}
catch (...)
{
std::lock_guard<std::mutex> lock(send_data_lock);
exception_during_send_data = std::current_exception();
}
});
}
pipeline.init(Pipe(FormatFactory::instance().getInput(format, command->out, sample_block, context, rows_to_read)));
executor = std::make_unique<PullingPipelineExecutor>(pipeline);
}
ShellCommandPoolSource(
ContextPtr context,
const std::string & format,
const Block & sample_block,
std::shared_ptr<ProcessPool> process_pool_,
std::unique_ptr<ShellCommand> && command_,
Poco::Logger * log_,
std::vector<SendDataTask> && send_data_tasks)
: SourceWithProgress(sample_block)
, process_pool(process_pool_)
, command(std::move(command_))
, log(log_)
{
for (auto && send_data_task : send_data_tasks)
{
send_data_threads.emplace_back([task = std::move(send_data_task), this]()
{
try
{
task();
}
catch (...)
{
std::lock_guard<std::mutex> lock(send_data_lock);
exception_during_send_data = std::current_exception();
}
});
}
readText(rows_to_read, command->out);
pipeline.init(Pipe(FormatFactory::instance().getInput(format, command->out, sample_block, context, rows_to_read)));
executor = std::make_unique<PullingPipelineExecutor>(pipeline);
}
~ShellCommandPoolSource() override
{
for (auto & thread : send_data_threads)
if (thread.joinable())
thread.join();
if (command)
process_pool->returnObject(std::move(command));
}
protected:
Chunk generate() override
{
rethrowExceptionDuringReadIfNeeded();
if (current_read_rows == rows_to_read)
return {};
Chunk chunk;
try
{
if (!executor->pull(chunk))
return {};
current_read_rows += chunk.getNumRows();
}
catch (...)
{
tryLogCurrentException(log);
command = nullptr;
throw;
}
return chunk;
}
public:
Status prepare() override
{
auto status = SourceWithProgress::prepare();
if (status == Status::Finished)
{
for (auto & thread : send_data_threads)
if (thread.joinable())
thread.join();
rethrowExceptionDuringReadIfNeeded();
}
return status;
}
void rethrowExceptionDuringReadIfNeeded()
{
std::lock_guard<std::mutex> lock(send_data_lock);
if (exception_during_send_data)
{
command = nullptr;
std::rethrow_exception(exception_during_send_data);
}
}
String getName() const override { return "ShellCommandPoolSource"; }
std::shared_ptr<ProcessPool> process_pool;
std::unique_ptr<ShellCommand> command;
QueryPipeline pipeline;
std::unique_ptr<PullingPipelineExecutor> executor;
size_t rows_to_read = 0;
Poco::Logger * log;
std::vector<ThreadFromGlobalPool> send_data_threads;
size_t current_read_rows = 0;
std::mutex send_data_lock;
std::exception_ptr exception_during_send_data;
};
}

View File

@ -1,6 +1,5 @@
#include "ExecutableDictionarySource.h"
#include <functional>
#include <common/logger_useful.h>
#include <common/LocalDateTime.h>
#include <Common/ShellCommand.h>
@ -125,8 +124,7 @@ Pipe ExecutableDictionarySource::getStreamForBlock(const Block & block)
formatBlock(output_stream, block);
out.close();
}};
std::vector<ShellCommandSource::SendDataTask> tasks = {task};
std::vector<ShellCommandSource::SendDataTask> tasks = {std::move(task)};
Pipe pipe(std::make_unique<ShellCommandSource>(context, configuration.format, sample_block, std::move(process), log, std::move(tasks)));

View File

@ -1,11 +1,12 @@
#pragma once
#include "DictionaryStructure.h"
#include "IDictionarySource.h"
#include <common/logger_useful.h>
#include <Core/Block.h>
#include <Interpreters/Context.h>
namespace Poco { class Logger; }
#include <Dictionaries/IDictionarySource.h>
#include <Dictionaries/DictionaryStructure.h>
namespace DB

View File

@ -1,24 +1,21 @@
#include "ExecutablePoolDictionarySource.h"
#include <functional>
#include <common/scope_guard.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPipeline.h>
#include <common/logger_useful.h>
#include <common/LocalDateTime.h>
#include <Common/ShellCommand.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <DataStreams/formatBlock.h>
#include <Interpreters/Context.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IInputFormat.h>
#include <Common/ShellCommand.h>
#include <Common/ThreadPool.h>
#include <common/logger_useful.h>
#include <common/LocalDateTime.h>
#include "DictionarySourceFactory.h"
#include "DictionarySourceHelpers.h"
#include "DictionaryStructure.h"
#include "registerDictionaries.h"
#include <Dictionaries/DictionarySourceFactory.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Dictionaries/DictionaryStructure.h>
namespace DB
@ -37,13 +34,13 @@ ExecutablePoolDictionarySource::ExecutablePoolDictionarySource(
const Configuration & configuration_,
Block & sample_block_,
ContextPtr context_)
: log(&Poco::Logger::get("ExecutablePoolDictionarySource"))
, dict_struct{dict_struct_}
, configuration{configuration_}
, sample_block{sample_block_}
, context{context_}
: dict_struct(dict_struct_)
, configuration(configuration_)
, sample_block(sample_block_)
, context(context_)
/// If pool size == 0 then there is no size restrictions. Poco max size of semaphore is integer type.
, process_pool{std::make_shared<ProcessPool>(configuration.pool_size == 0 ? std::numeric_limits<int>::max() : configuration.pool_size)}
, process_pool(std::make_shared<ProcessPool>(configuration.pool_size == 0 ? std::numeric_limits<int>::max() : configuration.pool_size))
, log(&Poco::Logger::get("ExecutablePoolDictionarySource"))
{
/// Remove keys from sample_block for implicit_key dictionary because
/// these columns will not be returned from source
@ -62,13 +59,12 @@ ExecutablePoolDictionarySource::ExecutablePoolDictionarySource(
}
ExecutablePoolDictionarySource::ExecutablePoolDictionarySource(const ExecutablePoolDictionarySource & other)
: log(&Poco::Logger::get("ExecutablePoolDictionarySource"))
, update_time{other.update_time}
, dict_struct{other.dict_struct}
, configuration{other.configuration}
, sample_block{other.sample_block}
, context{Context::createCopy(other.context)}
, process_pool{std::make_shared<ProcessPool>(configuration.pool_size)}
: dict_struct(other.dict_struct)
, configuration(other.configuration)
, sample_block(other.sample_block)
, context(Context::createCopy(other.context))
/// The copy gets its own pool. As in the main constructor, pool size == 0 means no size restriction;
/// without this a copied source with pool_size == 0 would get a pool of zero capacity.
, process_pool(std::make_shared<ProcessPool>(configuration.pool_size == 0 ? std::numeric_limits<int>::max() : configuration.pool_size))
, log(&Poco::Logger::get("ExecutablePoolDictionarySource"))
{
}
@ -82,123 +78,6 @@ Pipe ExecutablePoolDictionarySource::loadUpdatedAll()
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutablePoolDictionarySource does not support loadUpdatedAll method");
}
namespace
{
/** A stream, that runs child process and sends data to its stdin in background thread,
* and receives data from its stdout.
*/
class PoolSourceWithBackgroundThread final : public SourceWithProgress
{
public:
    /// Starts a background thread that calls send_data_ with the process stdin,
    /// and reads read_rows_ rows from the given pipe.
    PoolSourceWithBackgroundThread(
        std::shared_ptr<ProcessPool> process_pool_,
        std::unique_ptr<ShellCommand> && command_,
        Pipe pipe,
        size_t read_rows_,
        Poco::Logger * log_,
        std::function<void(WriteBufferFromFile &)> && send_data_)
        : SourceWithProgress(pipe.getHeader())
        , process_pool(process_pool_)
        , command(std::move(command_))
        , rows_to_read(read_rows_)
        , log(log_)
        , send_data(std::move(send_data_))
        /// The thread is the last member to be initialized: it starts running immediately
        /// and uses send_data, command, the mutex and the exception slot.
        , thread([this]
        {
            try
            {
                send_data(command->in);
            }
            catch (...)
            {
                std::lock_guard<std::mutex> lck(exception_during_read_lock);
                exception_during_read = std::current_exception();
            }
        })
    {
        pipeline.init(std::move(pipe));
        executor = std::make_unique<PullingPipelineExecutor>(pipeline);
    }

    ~PoolSourceWithBackgroundThread() override
    {
        if (thread.joinable())
            thread.join();

        /// command is reset to nullptr on errors, so a failed process is not returned to the pool.
        if (command)
            process_pool->returnObject(std::move(command));
    }

protected:
    /// Pulls the next chunk from the process output.
    /// Returns an empty chunk once rows_to_read rows were produced or the input is exhausted.
    Chunk generate() override
    {
        rethrowExceptionDuringReadIfNeeded();

        if (current_read_rows == rows_to_read)
            return {};

        Chunk chunk;

        try
        {
            if (!executor->pull(chunk))
                return {};

            current_read_rows += chunk.getNumRows();
        }
        catch (...)
        {
            tryLogCurrentException(log);
            command = nullptr;
            throw;
        }

        return chunk;
    }

public:
    /// When the source is finished, waits for the background thread
    /// and rethrows its exception, if any.
    Status prepare() override
    {
        auto status = SourceWithProgress::prepare();

        if (status == Status::Finished)
        {
            if (thread.joinable())
                thread.join();

            rethrowExceptionDuringReadIfNeeded();
        }

        return status;
    }

    /// Rethrows the exception captured in the background thread, if there is one.
    void rethrowExceptionDuringReadIfNeeded()
    {
        std::lock_guard<std::mutex> lck(exception_during_read_lock);
        if (exception_during_read)
        {
            command = nullptr;
            std::rethrow_exception(exception_during_read);
        }
    }

    String getName() const override { return "PoolWithBackgroundThread"; }

    std::shared_ptr<ProcessPool> process_pool;
    std::unique_ptr<ShellCommand> command;
    QueryPipeline pipeline;
    std::unique_ptr<PullingPipelineExecutor> executor;
    size_t rows_to_read;
    Poco::Logger * log;
    std::function<void(WriteBufferFromFile &)> send_data;
    size_t current_read_rows = 0;
    std::mutex exception_during_read_lock;
    std::exception_ptr exception_during_read;
    /// Must stay the last member: members are initialized in declaration order, and the thread
    /// body may lock exception_during_read_lock as soon as the thread is constructed.
    ThreadFromGlobalPool thread;
};
}
Pipe ExecutablePoolDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size());
@ -228,19 +107,20 @@ Pipe ExecutablePoolDictionarySource::getStreamForBlock(const Block & block)
if (!result)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED,
"Could not get process from pool, max command execution timeout exceeded ({}) seconds",
"Could not get process from pool, max command execution timeout exceeded {} seconds",
configuration.max_command_execution_time);
size_t rows_to_read = block.rows();
auto format = FormatFactory::instance().getInput(configuration.format, process->out, sample_block, context, rows_to_read);
auto * process_in = &process->in;
ShellCommandPoolSource::SendDataTask task = [process_in, block, this]() mutable
{
auto & out = *process_in;
auto output_stream = context->getOutputStream(configuration.format, out, block.cloneEmpty());
formatBlock(output_stream, block);
};
std::vector<ShellCommandPoolSource::SendDataTask> tasks = {std::move(task)};
Pipe pipe(std::make_unique<PoolSourceWithBackgroundThread>(
process_pool, std::move(process), Pipe(std::move(format)), rows_to_read, log,
[block, this](WriteBufferFromFile & out) mutable
{
auto output_stream = context->getOutputStream(configuration.format, out, block.cloneEmpty());
formatBlock(output_stream, block);
}));
Pipe pipe(std::make_unique<ShellCommandPoolSource>(context, configuration.format, sample_block, process_pool, std::move(process), rows_to_read, log, std::move(tasks)));
if (configuration.implicit_key)
pipe.addTransform(std::make_shared<TransformWithAdditionalColumns>(block, pipe.getHeader()));

View File

@ -1,20 +1,18 @@
#pragma once
#include <common/BorrowedObjectPool.h>
#include <common/logger_useful.h>
#include <Core/Block.h>
#include <Interpreters/Context.h>
#include "IDictionarySource.h"
#include "DictionaryStructure.h"
namespace Poco { class Logger; }
#include <Dictionaries/IDictionarySource.h>
#include <Dictionaries/DictionaryStructure.h>
#include <DataStreams/ShellCommandSource.h>
namespace DB
{
using ProcessPool = BorrowedObjectPool<std::unique_ptr<ShellCommand>>;
/** ExecutablePoolDictionarySource allows loading data from pool of processes.
* When client requests ids or keys source get process from ProcessPool
@ -73,14 +71,13 @@ public:
Pipe getStreamForBlock(const Block & block);
private:
Poco::Logger * log;
time_t update_time = 0;
const DictionaryStructure dict_struct;
const Configuration configuration;
Block sample_block;
ContextPtr context;
std::shared_ptr<ProcessPool> process_pool;
Poco::Logger * log;
};
}

View File

@ -0,0 +1,42 @@
#include "ExecutablePoolSettings.h"
#include <Common/Exception.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTFunction.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_SETTING;
}
IMPLEMENT_SETTINGS_TRAITS(ExecutablePoolSettingsTraits, LIST_OF_EXECUTABLE_POOL_SETTINGS);
/// Loads settings from the SETTINGS clause of the storage definition.
/// If the definition has no SETTINGS clause, an empty non-standalone one is attached to it.
void ExecutablePoolSettings::loadFromQuery(ASTStorage & storage_def)
{
    if (!storage_def.settings)
    {
        auto empty_settings = std::make_shared<ASTSetQuery>();
        empty_settings->is_standalone = false;
        storage_def.set(storage_def.settings, empty_settings);
        return;
    }

    try
    {
        applyChanges(storage_def.settings->changes);
    }
    catch (Exception & e)
    {
        /// Tell the user which storage the unknown setting was given for.
        if (e.code() == ErrorCodes::UNKNOWN_SETTING)
            e.addMessage("for storage " + storage_def.engine->name);

        throw;
    }
}
}

View File

@ -0,0 +1,24 @@
#pragma once
#include <Core/Defines.h>
#include <Core/BaseSettings.h>
namespace DB
{
class ASTStorage;
#define LIST_OF_EXECUTABLE_POOL_SETTINGS(M) \
M(UInt64, pool_size, 16, "Processes pool size. If size == 0, then there is no size restrictions", 0) \
M(UInt64, max_command_execution_time, 10, "Max command execution time in seconds.", 0) \
M(UInt64, command_termination_timeout, 10, "Command termination timeout in seconds.", 0) \
DECLARE_SETTINGS_TRAITS(ExecutablePoolSettingsTraits, LIST_OF_EXECUTABLE_POOL_SETTINGS)
/// Settings for the ExecutablePool engine.
/// The individual settings are listed in LIST_OF_EXECUTABLE_POOL_SETTINGS.
struct ExecutablePoolSettings : public BaseSettings<ExecutablePoolSettingsTraits>
{
/// Loads settings from the storage definition of a CREATE query.
void loadFromQuery(ASTStorage & storage_def);
};
}

View File

@ -4,16 +4,18 @@
#include <Common/ShellCommand.h>
#include <Core/Block.h>
#include <IO/ReadHelpers.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <DataStreams/IBlockInputStream.h>
#include <Processors/Pipe.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Storages/StorageFactory.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/ShellCommandSource.h>
namespace DB
@ -24,6 +26,7 @@ namespace ErrorCodes
extern const int UNSUPPORTED_METHOD;
extern const int LOGICAL_ERROR;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int TIMEOUT_EXCEEDED;
}
StorageExecutable::StorageExecutable(
@ -47,6 +50,31 @@ StorageExecutable::StorageExecutable(
setInMemoryMetadata(storage_metadata);
}
/// Constructor for the ExecutablePool variant of the storage.
/// In addition to the script name, arguments, format and input queries,
/// it stores the pool settings and creates the process pool.
StorageExecutable::StorageExecutable(
const StorageID & table_id_,
const String & script_name_,
const std::vector<String> & arguments_,
const String & format_,
const std::vector<ASTPtr> & input_queries_,
const ExecutablePoolSettings & pool_settings_,
const ColumnsDescription & columns,
const ConstraintsDescription & constraints)
: IStorage(table_id_)
, script_name(script_name_)
, arguments(arguments_)
, format(format_)
, input_queries(input_queries_)
, pool_settings(pool_settings_)
/// If pool size == 0 then there is no size restrictions. Poco max size of semaphore is integer type.
, process_pool(std::make_shared<ProcessPool>(pool_settings.pool_size == 0 ? std::numeric_limits<int>::max() : pool_settings.pool_size))
, log(&Poco::Logger::get("StorageExecutablePool"))
{
/// Set the table columns and constraints as the in-memory metadata of the storage.
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns);
storage_metadata.setConstraints(constraints);
setInMemoryMetadata(storage_metadata);
}
Pipe StorageExecutable::read(
const Names & /*column_names*/,
const StorageMetadataPtr & metadata_snapshot,
@ -56,6 +84,8 @@ Pipe StorageExecutable::read(
size_t max_block_size,
unsigned /*threads*/)
{
std::cerr << getName() << "::read" << std::endl;
auto user_scripts_path = context->getUserScriptsPath();
auto script_path = user_scripts_path + '/' + script_name;
if (!std::filesystem::exists(std::filesystem::path(script_path)))
@ -79,7 +109,27 @@ Pipe StorageExecutable::read(
for (size_t i = 1; i < inputs.size(); ++i)
config.write_fds.emplace_back(i + 2);
auto process = ShellCommand::executeDirect(config);
std::unique_ptr<ShellCommand> process;

if (process_pool)
{
    /// ExecutablePool: take a process from the pool. The factory below runs only when the pool
    /// has to start a new process.
    /// max_command_execution_time is in seconds, while the pool timeout is expected in milliseconds
    /// (NOTE(review): confirm against BorrowedObjectPool::tryBorrowObject).
    bool result = process_pool->tryBorrowObject(process, [&config, this]()
    {
        LOG_TRACE(log, "{}::read create process", getName());

        config.terminate_in_destructor_strategy = ShellCommand::DestructorStrategy{ true /*terminate_in_destructor*/, pool_settings.command_termination_timeout };
        auto shell_command = ShellCommand::execute(config);
        return shell_command;
    }, pool_settings.max_command_execution_time * 1000);

    if (!result)
        throw Exception(ErrorCodes::TIMEOUT_EXCEEDED,
            "Could not get process from pool, max command execution timeout exceeded {} seconds",
            pool_settings.max_command_execution_time);
}
else
{
    /// Executable: start a new process for this read.
    process = ShellCommand::executeDirect(config);
}
std::vector<ShellCommandSource::SendDataTask> tasks;
tasks.reserve(inputs.size());
@ -123,13 +173,22 @@ Pipe StorageExecutable::read(
}
auto sample_block = metadata_snapshot->getSampleBlock();
Pipe pipe(std::make_unique<ShellCommandSource>(context, format, sample_block, std::move(process), log, std::move(tasks), max_block_size));
return pipe;
if (process_pool)
{
Pipe pipe(std::make_unique<ShellCommandPoolSource>(context, format, std::move(sample_block), process_pool, std::move(process), log, std::move(tasks)));
return pipe;
}
else
{
Pipe pipe(std::make_unique<ShellCommandSource>(context, format, std::move(sample_block), std::move(process), log, std::move(tasks), max_block_size));
return pipe;
}
}
void registerStorageExecutable(StorageFactory & factory)
{
factory.registerStorage("Executable", [](const StorageFactory::Arguments & args)
auto register_storage = [](const StorageFactory::Arguments & args, bool is_executable_pool) -> StoragePtr
{
auto local_context = args.getLocalContext();
@ -143,7 +202,7 @@ void registerStorageExecutable(StorageFactory & factory)
auto scipt_name_with_arguments_value = args.engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
std::vector<String> script_name_with_arguments;
boost::split(script_name_with_arguments, scipt_name_with_arguments_value, [](char c){ return c == ' '; });
boost::split(script_name_with_arguments, scipt_name_with_arguments_value, [](char c) { return c == ' '; });
auto script_name = script_name_with_arguments[0];
script_name_with_arguments.erase(script_name_with_arguments.begin());
@ -154,8 +213,8 @@ void registerStorageExecutable(StorageFactory & factory)
{
ASTPtr query = args.engine_args[i]->children.at(0);
if (!query->as<ASTSelectWithUnionQuery>())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"StorageExecutable argument is invalid input query {}",
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD, "StorageExecutable argument is invalid input query {}",
query->formatForErrorMessage());
input_queries.emplace_back(std::move(query));
@ -164,7 +223,35 @@ void registerStorageExecutable(StorageFactory & factory)
const auto & columns = args.columns;
const auto & constraints = args.constraints;
return StorageExecutable::create(args.table_id, script_name, script_name_with_arguments, format, input_queries, columns, constraints);
if (is_executable_pool)
{
size_t max_command_execution_time = 10;
size_t max_execution_time_seconds = static_cast<size_t>(args.getContext()->getSettings().max_execution_time.totalSeconds());
if (max_execution_time_seconds != 0 && max_command_execution_time > max_execution_time_seconds)
max_command_execution_time = max_execution_time_seconds;
ExecutablePoolSettings pool_settings;
pool_settings.max_command_execution_time = max_command_execution_time;
if (args.storage_def->settings)
pool_settings.loadFromQuery(*args.storage_def);
return StorageExecutable::create(args.table_id, script_name, script_name_with_arguments, format, input_queries, pool_settings, columns, constraints);
}
else
{
return StorageExecutable::create(args.table_id, script_name, script_name_with_arguments, format, input_queries, columns, constraints);
}
};
/// The creator lambdas are stored in the factory and outlive this function,
/// so register_storage is captured by value: a reference to the local would dangle.
factory.registerStorage("Executable", [register_storage](const StorageFactory::Arguments & args)
{
    return register_storage(args, false /*is_executable_pool*/);
});

factory.registerStorage("ExecutablePool", [register_storage](const StorageFactory::Arguments & args)
{
    return register_storage(args, true /*is_executable_pool*/);
});
}

View File

@ -3,19 +3,30 @@
#include <common/logger_useful.h>
#include <common/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <IO/CompressionMethod.h>
#include <DataStreams/ShellCommandSource.h>
#include <Storages/ExecutablePoolSettings.h>
namespace DB
{
/**
* This class represents table engine for external executable files.
* The Executable storage starts a new process for each read.
* The ExecutablePool storage maintains a pool of processes and takes a process from the pool for each read.
*/
class StorageExecutable final : public shared_ptr_helper<StorageExecutable>, public IStorage
{
friend struct shared_ptr_helper<StorageExecutable>;
public:
String getName() const override { return "Executable"; }
/// The engine name depends on whether the storage was created with a process pool.
String getName() const override
{
    return process_pool ? "ExecutablePool" : "Executable";
}
Pipe read(
const Names & column_names,
@ -36,12 +47,24 @@ protected:
const ColumnsDescription & columns,
const ConstraintsDescription & constraints);
StorageExecutable(
const StorageID & table_id,
const String & script_name_,
const std::vector<String> & arguments_,
const String & format_,
const std::vector<ASTPtr> & input_queries_,
const ExecutablePoolSettings & pool_settings_,
const ColumnsDescription & columns,
const ConstraintsDescription & constraints);
private:
String script_name;
std::vector<String> arguments;
String format;
std::vector<ASTPtr> input_queries;
ExecutablePoolSettings pool_settings;
std::shared_ptr<ProcessPool> process_pool;
Poco::Logger * log;
};
}
}

View File

@ -55,3 +55,15 @@ def test_executable_storage_argument(started_cluster):
node.query("CREATE TABLE test_table (value String) ENGINE=Executable('test_argument.sh 1', 'TabSeparated')")
assert node.query("SELECT * FROM test_table") == 'Key 1\n'
node.query("DROP TABLE test_table")
def test_executable_pool_storage(started_cluster):
    """ExecutablePool table with a single input query returns the row produced by the script."""
    create_query = "CREATE TABLE test_table (value String) ENGINE=ExecutablePool('test_input_process_pool.sh', 'TabSeparated', (SELECT 1))"

    node.query("DROP TABLE IF EXISTS test_table")
    node.query(create_query)

    result = node.query("SELECT * FROM test_table")
    assert result == 'Key 1\n'

    node.query("DROP TABLE test_table")
def test_executable_pool_storage_multiple_pipes(started_cluster):
    """ExecutablePool table with three input queries: each query result arrives on its own descriptor."""
    create_query = "CREATE TABLE test_table (value String) ENGINE=ExecutablePool('test_input_process_pool_multiple_pipes.sh', 'TabSeparated', (SELECT 1), (SELECT 2), (SELECT 3))"
    expected = 'Key from 4 fd 3\nKey from 3 fd 2\nKey from 0 fd 1\n'

    node.query("DROP TABLE IF EXISTS test_table")
    node.query(create_query)

    result = node.query("SELECT * FROM test_table")
    assert result == expected

    node.query("DROP TABLE test_table")

View File

@ -0,0 +1,3 @@
#!/bin/bash

# Pooled process for the ExecutablePool test: it works in a read-write loop.
# For every input line it prints the number of rows that follow (1), then the row itself.
# read -r keeps backslashes as is; the input is passed to printf as an argument,
# not as the format string, so a '%' in the data cannot break the output.
while read -r read_data; do
    printf '1'
    printf 'Key %s\n' "$read_data"
done

View File

@ -0,0 +1,10 @@
#!/bin/bash

# Reads one line from each of the descriptors 4, 3 and 0 (stdin), waiting at most 250 seconds for each.
# read -r keeps backslashes in the input as is.
read -r -t 250 -u 4 read_data_from_4_fd
read -r -t 250 -u 3 read_data_from_3_fd
read -r -t 250 read_data_from_0_fd

# First the number of rows that follow, then the rows.
# The input is passed to printf as an argument, not as the format string.
printf '3'
printf 'Key from 4 fd %s\n' "$read_data_from_4_fd"
printf 'Key from 3 fd %s\n' "$read_data_from_3_fd"
printf 'Key from 0 fd %s\n' "$read_data_from_0_fd"