Updated ShellCommandSource

This commit is contained in:
Maksim Kita 2021-12-28 12:43:30 +03:00
parent 41437b72f1
commit d1db3c9f42
12 changed files with 67 additions and 80 deletions

View File

@ -603,6 +603,7 @@
M(632, UNEXPECTED_DATA_AFTER_PARSED_VALUE) \
M(633, QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW) \
M(634, MONGODB_ERROR) \
M(635, CANNOT_POLL) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -66,7 +66,7 @@ ExecutableDictionarySource::ExecutableDictionarySource(
const DictionaryStructure & dict_struct_,
const Configuration & configuration_,
Block & sample_block_,
std::shared_ptr<ShellCommandCoordinator> coordinator_,
std::shared_ptr<ShellCommandSourceCoordinator> coordinator_,
ContextPtr context_)
: log(&Poco::Logger::get("ExecutableDictionarySource"))
, dict_struct(dict_struct_)
@ -166,29 +166,9 @@ Pipe ExecutableDictionarySource::loadKeys(const Columns & key_columns, const std
Pipe ExecutableDictionarySource::getStreamForBlock(const Block & block)
{
String command = configuration.command;
const auto & coordinator_configuration = coordinator->getConfiguration();
if (coordinator_configuration.execute_direct)
{
auto global_context = context->getGlobalContext();
auto user_scripts_path = global_context->getUserScriptsPath();
auto script_path = user_scripts_path + '/' + command;
if (!fileOrSymlinkPathStartsWith(script_path, user_scripts_path))
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Executable file {} must be inside user scripts folder {}",
command,
user_scripts_path);
if (!std::filesystem::exists(std::filesystem::path(script_path)))
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Executable file {} does not exist inside user scripts folder {}",
command,
user_scripts_path);
command = std::move(script_path);
}
String command = configuration.command;
updateCommandIfNeeded(command, coordinator_configuration.execute_direct, context);
auto source = std::make_shared<SourceFromSingleChunk>(block);
auto shell_input_pipe = Pipe(std::move(source));
@ -273,7 +253,7 @@ void registerDictionarySourceExecutable(DictionarySourceFactory & factory)
.implicit_key = config.getBool(settings_config_prefix + ".implicit_key", false),
};
ShellCommandCoordinator::Configuration shell_command_coordinator_configration
ShellCommandSourceCoordinator::Configuration shell_command_coordinator_configration
{
.format = config.getString(settings_config_prefix + ".format"),
.command_termination_timeout_seconds = config.getUInt64(settings_config_prefix + ".command_termination_timeout", 10),
@ -284,7 +264,7 @@ void registerDictionarySourceExecutable(DictionarySourceFactory & factory)
.execute_direct = config.getBool(settings_config_prefix + ".execute_direct", false)
};
std::shared_ptr<ShellCommandCoordinator> coordinator = std::make_shared<ShellCommandCoordinator>(shell_command_coordinator_configration);
auto coordinator = std::make_shared<ShellCommandSourceCoordinator>(shell_command_coordinator_configration);
return std::make_unique<ExecutableDictionarySource>(dict_struct, configuration, sample_block, std::move(coordinator), context);
};

View File

@ -33,7 +33,7 @@ public:
const DictionaryStructure & dict_struct_,
const Configuration & configuration_,
Block & sample_block_,
std::shared_ptr<ShellCommandCoordinator> coordinator_,
std::shared_ptr<ShellCommandSourceCoordinator> coordinator_,
ContextPtr context_);
ExecutableDictionarySource(const ExecutableDictionarySource & other);
@ -69,7 +69,7 @@ private:
const DictionaryStructure dict_struct;
const Configuration configuration;
Block sample_block;
std::shared_ptr<ShellCommandCoordinator> coordinator;
std::shared_ptr<ShellCommandSourceCoordinator> coordinator;
ContextPtr context;
};

View File

@ -35,7 +35,7 @@ ExecutablePoolDictionarySource::ExecutablePoolDictionarySource(
const DictionaryStructure & dict_struct_,
const Configuration & configuration_,
Block & sample_block_,
std::shared_ptr<ShellCommandCoordinator> coordinator_,
std::shared_ptr<ShellCommandSourceCoordinator> coordinator_,
ContextPtr context_)
: dict_struct(dict_struct_)
, configuration(configuration_)
@ -220,7 +220,7 @@ void registerDictionarySourceExecutablePool(DictionarySourceFactory & factory)
.implicit_key = config.getBool(settings_config_prefix + ".implicit_key", false),
};
ShellCommandCoordinator::Configuration shell_command_coordinator_configration
ShellCommandSourceCoordinator::Configuration shell_command_coordinator_configration
{
.format = config.getString(settings_config_prefix + ".format"),
.command_termination_timeout_seconds = config.getUInt64(settings_config_prefix + ".command_termination_timeout", 10),
@ -233,7 +233,7 @@ void registerDictionarySourceExecutablePool(DictionarySourceFactory & factory)
.execute_direct = execute_direct
};
std::shared_ptr<ShellCommandCoordinator> coordinator = std::make_shared<ShellCommandCoordinator>(shell_command_coordinator_configration);
auto coordinator = std::make_shared<ShellCommandSourceCoordinator>(shell_command_coordinator_configration);
return std::make_unique<ExecutablePoolDictionarySource>(dict_struct, configuration, sample_block, std::move(coordinator), context);
};

View File

@ -36,7 +36,7 @@ public:
const DictionaryStructure & dict_struct_,
const Configuration & configuration_,
Block & sample_block_,
std::shared_ptr<ShellCommandCoordinator> coordinator_,
std::shared_ptr<ShellCommandSourceCoordinator> coordinator_,
ContextPtr context_);
ExecutablePoolDictionarySource(const ExecutablePoolDictionarySource & other);
@ -71,7 +71,7 @@ private:
const Configuration configuration;
Block sample_block;
std::shared_ptr<ShellCommandCoordinator> coordinator;
std::shared_ptr<ShellCommandSourceCoordinator> coordinator;
ContextPtr context;
Poco::Logger * log;
};

View File

@ -57,7 +57,7 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create
String type = config.getString(key_in_config + ".type");
bool is_executable_pool;
bool is_executable_pool = false;
if (type == "executable")
is_executable_pool = false;
@ -130,7 +130,7 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create
.result_type = std::move(result_type), //-V1030
};
ShellCommandCoordinator::Configuration shell_command_coordinator_configration
ShellCommandSourceCoordinator::Configuration shell_command_coordinator_configration
{
.format = std::move(format), //-V1030
.command_termination_timeout_seconds = command_termination_timeout_seconds,
@ -143,7 +143,7 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create
.execute_direct = execute_direct
};
std::shared_ptr<ShellCommandCoordinator> coordinator = std::make_shared<ShellCommandCoordinator>(shell_command_coordinator_configration);
auto coordinator = std::make_shared<ShellCommandSourceCoordinator>(shell_command_coordinator_configration);
return std::make_shared<UserDefinedExecutableFunction>(function_configuration, std::move(coordinator), lifetime);
}

View File

@ -13,7 +13,7 @@ namespace DB
UserDefinedExecutableFunction::UserDefinedExecutableFunction(
const UserDefinedExecutableFunctionConfiguration & configuration_,
std::shared_ptr<ShellCommandCoordinator> coordinator_,
std::shared_ptr<ShellCommandSourceCoordinator> coordinator_,
const ExternalLoadableLifetime & lifetime_)
: configuration(configuration_)
, coordinator(std::move(coordinator_))

View File

@ -25,7 +25,7 @@ public:
UserDefinedExecutableFunction(
const UserDefinedExecutableFunctionConfiguration & configuration_,
std::shared_ptr<ShellCommandCoordinator> coordinator_,
std::shared_ptr<ShellCommandSourceCoordinator> coordinator_,
const ExternalLoadableLifetime & lifetime_);
const ExternalLoadableLifetime & getLifetime() const override
@ -58,7 +58,7 @@ public:
return configuration;
}
std::shared_ptr<ShellCommandCoordinator> getCoordinator() const
std::shared_ptr<ShellCommandSourceCoordinator> getCoordinator() const
{
return coordinator;
}
@ -75,7 +75,7 @@ public:
private:
UserDefinedExecutableFunctionConfiguration configuration;
std::shared_ptr<ShellCommandCoordinator> coordinator;
std::shared_ptr<ShellCommandSourceCoordinator> coordinator;
ExternalLoadableLifetime lifetime;
};

View File

@ -23,7 +23,7 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED;
extern const int CANNOT_FCNTL;
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
extern const int CANNOT_SELECT;
extern const int CANNOT_POLL;
extern const int CANNOT_WRITE_TO_FILE_DESCRIPTOR;
}
@ -91,7 +91,7 @@ static bool pollFd(int fd, size_t timeout_milliseconds, int events)
}
else
{
throwFromErrno("Cannot select", ErrorCodes::CANNOT_SELECT);
throwFromErrno("Cannot poll", ErrorCodes::CANNOT_POLL);
}
}
else
@ -451,14 +451,14 @@ namespace
}
ShellCommandCoordinator::ShellCommandCoordinator(const Configuration & configuration_)
ShellCommandSourceCoordinator::ShellCommandSourceCoordinator(const Configuration & configuration_)
: configuration(configuration_)
{
if (configuration.is_executable_pool)
process_pool = std::make_shared<ProcessPool>(configuration.pool_size ? configuration.pool_size : std::numeric_limits<size_t>::max());
}
Pipe ShellCommandCoordinator::createPipe(
Pipe ShellCommandSourceCoordinator::createPipe(
const std::string & command,
const std::vector<std::string> & arguments,
std::vector<Pipe> && input_pipes,

View File

@ -36,7 +36,7 @@ struct ShellCommandSourceConfiguration
size_t max_block_size = DEFAULT_BLOCK_SIZE;
};
class ShellCommandCoordinator
class ShellCommandSourceCoordinator
{
public:
@ -72,7 +72,7 @@ public:
};
explicit ShellCommandCoordinator(const Configuration & configuration_);
explicit ShellCommandSourceCoordinator(const Configuration & configuration_);
const Configuration & getConfiguration() const
{

View File

@ -34,6 +34,40 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
namespace
{
void transformToSingleBlockSources(Pipes & inputs)
{
size_t inputs_size = inputs.size();
for (size_t i = 0; i < inputs_size; ++i)
{
auto && input = inputs[i];
QueryPipeline input_pipeline(std::move(input));
PullingPipelineExecutor input_pipeline_executor(input_pipeline);
auto header = input_pipeline_executor.getHeader();
auto result_block = header.cloneEmpty();
size_t result_block_columns = result_block.columns();
Block result;
while (input_pipeline_executor.pull(result))
{
for (size_t result_block_index = 0; result_block_index < result_block_columns; ++result_block_index)
{
auto & block_column = result.safeGetByPosition(result_block_index);
auto & result_block_column = result_block.safeGetByPosition(result_block_index);
result_block_column.column->assumeMutable()->insertRangeFrom(*block_column.column, 0, block_column.column->size());
}
}
auto source = std::make_shared<SourceFromSingleChunk>(std::move(result_block));
inputs[i] = Pipe(std::move(source));
}
}
}
StorageExecutable::StorageExecutable(
const StorageID & table_id_,
const String & format,
@ -51,7 +85,7 @@ StorageExecutable::StorageExecutable(
storage_metadata.setConstraints(constraints);
setInMemoryMetadata(storage_metadata);
ShellCommandCoordinator::Configuration configuration
ShellCommandSourceCoordinator::Configuration configuration
{
.format = format,
.command_termination_timeout_seconds = settings.command_termination_timeout,
@ -66,7 +100,7 @@ StorageExecutable::StorageExecutable(
.execute_direct = true
};
coordinator = std::make_unique<ShellCommandCoordinator>(std::move(configuration));
coordinator = std::make_unique<ShellCommandSourceCoordinator>(std::move(configuration));
}
Pipe StorageExecutable::read(
@ -104,44 +138,16 @@ Pipe StorageExecutable::read(
inputs.emplace_back(QueryPipelineBuilder::getPipe(interpreter.buildQueryPipeline()));
}
/// For executable pool we read data from input streams and convert it to single blocks streams.
if (settings.is_executable_pool)
{
/// For executable pool we read data from input streams and convert it to single blocks streams.
size_t inputs_size = inputs.size();
for (size_t i = 0; i < inputs_size; ++i)
{
auto && input = inputs[i];
QueryPipeline input_pipeline(std::move(input));
PullingPipelineExecutor input_pipeline_executor(input_pipeline);
auto header = input_pipeline_executor.getHeader();
auto result_block = header.cloneEmpty();
size_t result_block_columns = result_block.columns();
Block result;
while (input_pipeline_executor.pull(result))
{
for (size_t result_block_index = 0; result_block_index < result_block_columns; ++result_block_index)
{
auto & block_column = result.safeGetByPosition(result_block_index);
auto & result_block_column = result_block.safeGetByPosition(result_block_index);
result_block_column.column->assumeMutable()->insertRangeFrom(*block_column.column, 0, block_column.column->size());
}
}
auto source = std::make_shared<SourceFromSingleChunk>(std::move(result_block));
inputs[i] = Pipe(std::move(source));
}
}
transformToSingleBlockSources(inputs);
auto sample_block = metadata_snapshot->getSampleBlock();
ShellCommandSourceConfiguration configuration;
configuration.max_block_size = max_block_size;
if (coordinator->getConfiguration().is_executable_pool)
if (settings.is_executable_pool)
{
configuration.read_fixed_number_of_rows = true;
configuration.read_number_of_rows_from_process_output = true;

View File

@ -23,7 +23,7 @@ public:
String getName() const override
{
if (coordinator->getConfiguration().is_executable_pool)
if (settings.is_executable_pool)
return "ExecutablePool";
else
return "Executable";
@ -52,7 +52,7 @@ private:
ExecutableSettings settings;
std::vector<ASTPtr> input_queries;
Poco::Logger * log;
std::unique_ptr<ShellCommandCoordinator> coordinator;
std::unique_ptr<ShellCommandSourceCoordinator> coordinator;
};
}