Updated StorageExecutable

This commit is contained in:
Maksim Kita 2021-08-24 22:38:42 +03:00
parent 75a37f6956
commit 5545959581
7 changed files with 163 additions and 342 deletions

View File

@ -1,100 +0,0 @@
#pragma once
#include <memory>
#include <Common/ShellCommand.h>
#include <common/logger_useful.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <IO/ReadHelpers.h>
namespace DB
{
/// Owns ShellCommand and calls wait for it.
class ShellCommandOwningBlockInputStream : public OwningBlockInputStream<ShellCommand>
{
private:
Poco::Logger * log;
public:
ShellCommandOwningBlockInputStream(Poco::Logger * log_, const BlockInputStreamPtr & impl, std::unique_ptr<ShellCommand> command_)
: OwningBlockInputStream(std::move(impl), std::move(command_)), log(log_)
{
}
void readSuffix() override
{
OwningBlockInputStream<ShellCommand>::readSuffix();
std::string err;
readStringUntilEOF(err, own->err);
if (!err.empty())
LOG_ERROR(log, "Having stderr: {}", err);
own->wait();
}
};
/** A stream, that runs child process and sends data to its stdin in background thread,
* and receives data from its stdout.
*/
class BlockInputStreamWithBackgroundThread final : public IBlockInputStream
{
public:
BlockInputStreamWithBackgroundThread(
const Context & context,
const std::string & format,
const Block & sample_block,
const std::string & command_str,
Poco::Logger * log_,
UInt64 max_block_size,
std::function<void(WriteBufferFromFile &)> && send_data_)
: log(log_),
command(ShellCommand::execute(command_str)),
send_data(std::move(send_data_)),
thread([this] { send_data(command->in); })
{
stream = context.getInputFormat(format, command->out, sample_block, max_block_size);
}
~BlockInputStreamWithBackgroundThread() override
{
if (thread.joinable())
thread.join();
}
Block getHeader() const override
{
return stream->getHeader();
}
private:
Block readImpl() override
{
return stream->read();
}
void readPrefix() override
{
stream->readPrefix();
}
void readSuffix() override
{
stream->readSuffix();
std::string err;
readStringUntilEOF(err, command->err);
if (!err.empty())
LOG_ERROR(log, "Having stderr: {}", err);
command->wait();
}
String getName() const override { return "WithBackgroundThread"; }
Poco::Logger * log;
BlockInputStreamPtr stream;
std::unique_ptr<ShellCommand> command;
std::function<void(WriteBufferFromFile &)> send_data;
ThreadFromGlobalPool thread;
};
}

View File

@ -0,0 +1,58 @@
#pragma once
#include <memory>
#include <common/logger_useful.h>
#include <Common/ShellCommand.h>
#include <Common/ThreadPool.h>
#include <IO/ReadHelpers.h>
#include <Formats/FormatFactory.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
namespace DB
{
/// Owns ShellCommand and calls wait for it.
class ShellCommandOwningTransform final : public ISimpleTransform
{
public:
ShellCommandOwningTransform(
const Block & header,
Poco::Logger * log_,
std::unique_ptr<ShellCommand> command_)
: ISimpleTransform(header, header, true)
, command(std::move(command_))
, log(log_)
{
}
String getName() const override { return "ShellCommandOwningTransform"; }
void transform(Chunk &) override {}
Status prepare() override
{
auto status = ISimpleTransform::prepare();
if (status == Status::Finished)
{
std::string err;
readStringUntilEOF(err, command->err);
if (!err.empty())
LOG_ERROR(log, "Having stderr: {}", err);
command->wait();
}
return status;
}
private:
std::unique_ptr<ShellCommand> command;
Poco::Logger * log;
};
}

View File

@ -1,26 +1,21 @@
#include "ExecutableDictionarySource.h" #include "ExecutableDictionarySource.h"
#include <functional> #include <functional>
#include <common/scope_guard.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <DataStreams/formatBlock.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Formats/IInputFormat.h>
#include <Interpreters/Context.h>
#include <Formats/FormatFactory.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Common/ShellCommand.h>
#include <Common/ThreadPool.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <common/LocalDateTime.h> #include <common/LocalDateTime.h>
#include "DictionarySourceFactory.h" #include <Common/ShellCommand.h>
#include "DictionarySourceHelpers.h"
#include "DictionaryStructure.h" #include <DataStreams/ShellCommandSource.h>
#include "registerDictionaries.h" #include <DataStreams/formatBlock.h>
#include <Interpreters/Context.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Dictionaries/DictionarySourceFactory.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Dictionaries/registerDictionaries.h>
namespace DB namespace DB
@ -35,27 +30,53 @@ namespace ErrorCodes
extern const int UNSUPPORTED_METHOD; extern const int UNSUPPORTED_METHOD;
} }
namespace namespace
{ {
/// Owns ShellCommand and calls wait for it.
class ShellCommandOwningTransform final : public ISimpleTransform /** A stream, that runs child process and sends data to its stdin in background thread,
* and receives data from its stdout.
*
* TODO: implement without background thread.
*/
class SourceWithBackgroundThread final : public SourceWithProgress
{ {
private:
Poco::Logger * log;
std::unique_ptr<ShellCommand> command;
public: public:
ShellCommandOwningTransform(const Block & header, Poco::Logger * log_, std::unique_ptr<ShellCommand> command_) SourceWithBackgroundThread(
: ISimpleTransform(header, header, true), log(log_), command(std::move(command_)) ContextPtr context,
const std::string & format,
const Block & sample_block,
const std::string & command_str,
Poco::Logger * log_,
std::function<void(WriteBufferFromFile &)> && send_data_)
: SourceWithProgress(sample_block)
, log(log_)
, command(ShellCommand::execute(command_str))
, send_data(std::move(send_data_))
, thread([this] { send_data(command->in); })
{ {
pipeline.init(Pipe(FormatFactory::instance().getInput(format, command->out, sample_block, context, DEFAULT_BLOCK_SIZE)));
executor = std::make_unique<PullingPipelineExecutor>(pipeline);
} }
String getName() const override { return "ShellCommandOwningTransform"; } ~SourceWithBackgroundThread() override
void transform(Chunk &) override {} {
if (thread.joinable())
thread.join();
}
protected:
Chunk generate() override
{
Chunk chunk;
executor->pull(chunk);
return chunk;
}
public:
Status prepare() override Status prepare() override
{ {
auto status = ISimpleTransform::prepare(); auto status = SourceWithProgress::prepare();
if (status == Status::Finished) if (status == Status::Finished)
{ {
std::string err; std::string err;
@ -63,13 +84,24 @@ namespace
if (!err.empty()) if (!err.empty())
LOG_ERROR(log, "Having stderr: {}", err); LOG_ERROR(log, "Having stderr: {}", err);
if (thread.joinable())
thread.join();
command->wait(); command->wait();
} }
return status; return status;
} }
};
String getName() const override { return "SourceWithBackgroundThread"; }
Poco::Logger * log;
QueryPipeline pipeline;
std::unique_ptr<PullingPipelineExecutor> executor;
std::unique_ptr<ShellCommand> command;
std::function<void(WriteBufferFromFile &)> send_data;
ThreadFromGlobalPool thread;
};
} }
ExecutableDictionarySource::ExecutableDictionarySource( ExecutableDictionarySource::ExecutableDictionarySource(
@ -133,85 +165,11 @@ Pipe ExecutableDictionarySource::loadUpdatedAll()
LOG_TRACE(log, "loadUpdatedAll {}", command_with_update_field); LOG_TRACE(log, "loadUpdatedAll {}", command_with_update_field);
auto process = ShellCommand::execute(command_with_update_field); auto process = ShellCommand::execute(command_with_update_field);
Pipe pipe(FormatFactory::instance().getInput(configuration.format, process->out, sample_block, context, max_block_size)); Pipe pipe(FormatFactory::instance().getInput(configuration.format, process->out, sample_block, context, max_block_size));
pipe.addTransform(std::make_shared<ShellCommandOwningTransform>(pipe.getHeader(), log, std::move(process))); pipe.addTransform(std::make_shared<ShellCommandOwningTransform>(pipe.getHeader(), log, std::move(process)));
return pipe; return pipe;
} }
namespace
{
/** A stream, that runs child process and sends data to its stdin in background thread,
* and receives data from its stdout.
*
* TODO: implement without background thread.
*/
class SourceWithBackgroundThread final : public SourceWithProgress
{
public:
SourceWithBackgroundThread(
ContextPtr context,
const std::string & format,
const Block & sample_block,
const std::string & command_str,
Poco::Logger * log_,
std::function<void(WriteBufferFromFile &)> && send_data_)
: SourceWithProgress(sample_block)
, log(log_)
, command(ShellCommand::execute(command_str))
, send_data(std::move(send_data_))
, thread([this] { send_data(command->in); })
{
pipeline.init(Pipe(FormatFactory::instance().getInput(format, command->out, sample_block, context, max_block_size)));
executor = std::make_unique<PullingPipelineExecutor>(pipeline);
}
~SourceWithBackgroundThread() override
{
if (thread.joinable())
thread.join();
}
protected:
Chunk generate() override
{
Chunk chunk;
executor->pull(chunk);
return chunk;
}
public:
Status prepare() override
{
auto status = SourceWithProgress::prepare();
if (status == Status::Finished)
{
std::string err;
readStringUntilEOF(err, command->err);
if (!err.empty())
LOG_ERROR(log, "Having stderr: {}", err);
if (thread.joinable())
thread.join();
command->wait();
}
return status;
}
String getName() const override { return "SourceWithBackgroundThread"; }
Poco::Logger * log;
QueryPipeline pipeline;
std::unique_ptr<PullingPipelineExecutor> executor;
std::unique_ptr<ShellCommand> command;
std::function<void(WriteBufferFromFile &)> send_data;
ThreadFromGlobalPool thread;
};
}
Pipe ExecutableDictionarySource::loadIds(const std::vector<UInt64> & ids) Pipe ExecutableDictionarySource::loadIds(const std::vector<UInt64> & ids)
{ {
LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size()); LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size());

View File

@ -1,123 +1,32 @@
#include <Storages/StorageExecutable.h> #include <Storages/StorageExecutable.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/StorageFile.h>
#include <Parsers/ASTIdentifier.h>
#include <IO/CompressionMethod.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Storages/StorageFactory.h> #include <Storages/StorageFactory.h>
#include <Processors/Sources/SourceWithProgress.h> #include <Processors/Sources/SourceWithProgress.h>
#include <DataStreams/ShellCommandBlockInputStream.h> #include <DataStreams/ShellCommandSource.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Dictionaries/ExecutableDictionarySource.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Core/Block.h> #include <Core/Block.h>
#include <Common/ShellCommand.h> #include <Common/ShellCommand.h>
namespace DB namespace DB
{ {
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int UNKNOWN_IDENTIFIER; extern const int UNKNOWN_IDENTIFIER;
} }
namespace
{
class StorageExecutableSource : public SourceWithProgress
{
public:
String getName() const override { return "Executable"; }
StorageExecutableSource(
const String & file_path_,
const String & format_,
BlockInputStreamPtr input_,
const StorageMetadataPtr & metadata_snapshot_,
const Context & context_,
UInt64 max_block_size_)
: SourceWithProgress(metadata_snapshot_->getSampleBlock())
, file_path(std::move(file_path_))
, format(format_)
, input(input_)
, metadata_snapshot(metadata_snapshot_)
, context(context_)
, max_block_size(max_block_size_)
{
}
Chunk generate() override {
LOG_TRACE(log, "generating {}", toString(file_path));
if (!reader)
{
auto sample_block = metadata_snapshot->getSampleBlock();
reader = std::make_shared<BlockInputStreamWithBackgroundThread>(context, format, sample_block, file_path, log, max_block_size, [this](WriteBufferFromFile & out)
{
if (!input)
{
out.close();
return;
}
auto output_stream = context.getOutputFormat(format, out, input->getHeader());
output_stream->writePrefix();
input->readPrefix();
while (auto block = input->read()) {
output_stream->write(block);
}
input->readSuffix();
input.reset();
output_stream->writeSuffix();
output_stream->flush();
out.close();
});
reader->readPrefix();
}
if (auto res = reader->read())
{
Columns columns = res.getColumns();
UInt64 num_rows = res.rows();
return Chunk(std::move(columns), num_rows);
}
reader->readSuffix();
reader.reset();
return {};
}
private:
String file_path;
const String & format;
BlockInputStreamPtr input;
BlockInputStreamPtr reader;
const StorageMetadataPtr & metadata_snapshot;
const Context & context;
UInt64 max_block_size;
Poco::Logger * log = &Poco::Logger::get("StorageExecutableSource");
};
}
StorageExecutable::StorageExecutable( StorageExecutable::StorageExecutable(
const StorageID & table_id_, const StorageID & table_id_,
const String & file_path_, const String & file_path_,
const String & format_, const String & format_,
BlockInputStreamPtr input_,
const ColumnsDescription & columns, const ColumnsDescription & columns,
const ConstraintsDescription & constraints, const ConstraintsDescription & constraints)
const Context & context_)
: IStorage(table_id_) : IStorage(table_id_)
, file_path(file_path_) , file_path(file_path_)
, format(format_) , format(format_)
, input(input_) , log(&Poco::Logger::get("StorageExecutable"))
, context(context_)
{ {
StorageInMemoryMetadata storage_metadata; StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns); storage_metadata.setColumns(columns);
@ -129,18 +38,16 @@ Pipe StorageExecutable::read(
const Names & /*column_names*/, const Names & /*column_names*/,
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & /*query_info*/, SelectQueryInfo & /*query_info*/,
const Context & context_, ContextPtr context_,
QueryProcessingStage::Enum /*processed_stage*/, QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size, size_t max_block_size,
unsigned /*num_streams*/) unsigned /*num_streams*/)
{ {
return Pipe(std::make_shared<StorageExecutableSource>( auto process = ShellCommand::execute(file_path);
file_path, auto sample_block = metadata_snapshot->getSampleBlock();
format, Pipe pipe(FormatFactory::instance().getInput(format, process->out, std::move(sample_block), std::move(context_), max_block_size));
input, pipe.addTransform(std::make_shared<ShellCommandOwningTransform>(pipe.getHeader(), log, std::move(process)));
metadata_snapshot, return pipe;
context_,
max_block_size));
} }
}; };

View File

@ -1,19 +1,19 @@
#pragma once #pragma once
#include <Common/config.h>
#include <common/logger_useful.h>
#include <common/shared_ptr_helper.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <IO/CompressionMethod.h> #include <IO/CompressionMethod.h>
#include <common/logger_useful.h>
#include <ext/shared_ptr_helper.h>
namespace DB namespace DB
{ {
/** /**
* This class represents table engine for external executable files. * This class represents table engine for external executable files.
*/ */
class StorageExecutable final : public ext::shared_ptr_helper<StorageExecutable>, public IStorage class StorageExecutable final : public shared_ptr_helper<StorageExecutable>, public IStorage
{ {
friend struct ext::shared_ptr_helper<StorageExecutable>; friend struct shared_ptr_helper<StorageExecutable>;
public: public:
String getName() const override { return "Executable"; } String getName() const override { return "Executable"; }
@ -21,26 +21,23 @@ public:
const Names & column_names, const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/, const StorageMetadataPtr & /*metadata_snapshot*/,
SelectQueryInfo & query_info, SelectQueryInfo & query_info,
const Context & context, ContextPtr context,
QueryProcessingStage::Enum processed_stage, QueryProcessingStage::Enum processed_stage,
size_t max_block_size, size_t max_block_size,
unsigned num_streams) override; unsigned threads) override;
protected: protected:
StorageExecutable( StorageExecutable(
const StorageID & table_id, const StorageID & table_id,
const String & file_path_, const String & file_path_,
const String & format_, const String & format_,
BlockInputStreamPtr input_,
const ColumnsDescription & columns, const ColumnsDescription & columns,
const ConstraintsDescription & constraints, const ConstraintsDescription & constraints);
const Context & context_);
private: private:
String file_path; String file_path;
String format; String format;
BlockInputStreamPtr input; Poco::Logger * log;
const Context & context;
}; };
} }

View File

@ -24,7 +24,7 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
} }
void TableFunctionExecutable::parseArguments(const ASTPtr & ast_function, const Context & context) void TableFunctionExecutable::parseArguments(const ASTPtr & ast_function, ContextPtr context)
{ {
const auto * function = ast_function->as<ASTFunction>(); const auto * function = ast_function->as<ASTFunction>();
@ -51,14 +51,15 @@ void TableFunctionExecutable::parseArguments(const ASTPtr & ast_function, const
} }
} }
ColumnsDescription TableFunctionExecutable::getActualTableStructure(const Context & context) const ColumnsDescription TableFunctionExecutable::getActualTableStructure(ContextPtr context) const
{ {
return parseColumnsListFromString(structure, context); return parseColumnsListFromString(structure, context);
} }
StoragePtr TableFunctionExecutable::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const StoragePtr TableFunctionExecutable::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{ {
auto storage = StorageExecutable::create(StorageID(getDatabaseName(), table_name), file_path, format, input, getActualTableStructure(context), ConstraintsDescription{}, context); auto storage_id = StorageID(getDatabaseName(), table_name);
auto storage = StorageExecutable::create(storage_id, file_path, format, getActualTableStructure(context), ConstraintsDescription{});
storage->startup(); storage->startup();
return storage; return storage;
} }

View File

@ -22,11 +22,11 @@ public:
bool hasStaticStructure() const override { return true; } bool hasStaticStructure() const override { return true; }
private: private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override; StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "Executable"; } const char * getStorageTypeName() const override { return "Executable"; }
ColumnsDescription getActualTableStructure(const Context & context) const override; ColumnsDescription getActualTableStructure(ContextPtr context) const override;
void parseArguments(const ASTPtr & ast_function, const Context & context) override; void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
String file_path; String file_path;
String format; String format;