From 554595958197bc7b6f325c165c3ce97623148db8 Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Tue, 24 Aug 2021 22:38:42 +0300 Subject: [PATCH] Updated StorageExecutable --- .../ShellCommandBlockInputStream.h | 100 --------- src/DataStreams/ShellCommandSource.h | 58 ++++++ .../ExecutableDictionarySource.cpp | 196 +++++++----------- src/Storages/StorageExecutable.cpp | 113 +--------- src/Storages/StorageExecutable.h | 21 +- .../TableFunctionExecutable.cpp | 11 +- src/TableFunctions/TableFunctionExecutable.h | 6 +- 7 files changed, 163 insertions(+), 342 deletions(-) delete mode 100644 src/DataStreams/ShellCommandBlockInputStream.h create mode 100644 src/DataStreams/ShellCommandSource.h diff --git a/src/DataStreams/ShellCommandBlockInputStream.h b/src/DataStreams/ShellCommandBlockInputStream.h deleted file mode 100644 index 1eb1532e9fa..00000000000 --- a/src/DataStreams/ShellCommandBlockInputStream.h +++ /dev/null @@ -1,100 +0,0 @@ -#pragma once - -#include - -#include -#include -#include -#include - -namespace DB -{ -/// Owns ShellCommand and calls wait for it. -class ShellCommandOwningBlockInputStream : public OwningBlockInputStream -{ -private: - Poco::Logger * log; -public: - ShellCommandOwningBlockInputStream(Poco::Logger * log_, const BlockInputStreamPtr & impl, std::unique_ptr command_) - : OwningBlockInputStream(std::move(impl), std::move(command_)), log(log_) - { - } - - void readSuffix() override - { - OwningBlockInputStream::readSuffix(); - - std::string err; - readStringUntilEOF(err, own->err); - if (!err.empty()) - LOG_ERROR(log, "Having stderr: {}", err); - - own->wait(); - } -}; - -/** A stream, that runs child process and sends data to its stdin in background thread, - * and receives data from its stdout. - */ -class BlockInputStreamWithBackgroundThread final : public IBlockInputStream -{ -public: - BlockInputStreamWithBackgroundThread( - const Context & context, - const std::string & format, - const Block & sample_block, - const std::string & command_str, - Poco::Logger * log_, - UInt64 max_block_size, - std::function && send_data_) - : log(log_), - command(ShellCommand::execute(command_str)), - send_data(std::move(send_data_)), - thread([this] { send_data(command->in); }) - { - stream = context.getInputFormat(format, command->out, sample_block, max_block_size); - } - - ~BlockInputStreamWithBackgroundThread() override - { - if (thread.joinable()) - thread.join(); - } - - Block getHeader() const override - { - return stream->getHeader(); - } - -private: - Block readImpl() override - { - return stream->read(); - } - - void readPrefix() override - { - stream->readPrefix(); - } - - void readSuffix() override - { - stream->readSuffix(); - - std::string err; - readStringUntilEOF(err, command->err); - if (!err.empty()) - LOG_ERROR(log, "Having stderr: {}", err); - - command->wait(); - } - - String getName() const override { return "WithBackgroundThread"; } - - Poco::Logger * log; - BlockInputStreamPtr stream; - std::unique_ptr command; - std::function send_data; - ThreadFromGlobalPool thread; -}; -} diff --git a/src/DataStreams/ShellCommandSource.h b/src/DataStreams/ShellCommandSource.h new file mode 100644 index 00000000000..c8f0a4edaad --- /dev/null +++ b/src/DataStreams/ShellCommandSource.h @@ -0,0 +1,58 @@ +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +/// Owns ShellCommand and calls wait for it. +class ShellCommandOwningTransform final : public ISimpleTransform +{ +public: + ShellCommandOwningTransform( + const Block & header, + Poco::Logger * log_, + std::unique_ptr command_) + : ISimpleTransform(header, header, true) + , command(std::move(command_)) + , log(log_) + { + } + + String getName() const override { return "ShellCommandOwningTransform"; } + void transform(Chunk &) override {} + + Status prepare() override + { + auto status = ISimpleTransform::prepare(); + if (status == Status::Finished) + { + std::string err; + readStringUntilEOF(err, command->err); + if (!err.empty()) + LOG_ERROR(log, "Having stderr: {}", err); + + command->wait(); + } + + return status; + } + +private: + std::unique_ptr command; + Poco::Logger * log; +}; + +} diff --git a/src/Dictionaries/ExecutableDictionarySource.cpp b/src/Dictionaries/ExecutableDictionarySource.cpp index 40794673de2..27d6c5ff1d5 100644 --- a/src/Dictionaries/ExecutableDictionarySource.cpp +++ b/src/Dictionaries/ExecutableDictionarySource.cpp @@ -1,26 +1,21 @@ #include "ExecutableDictionarySource.h" #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include #include -#include "DictionarySourceFactory.h" -#include "DictionarySourceHelpers.h" -#include "DictionaryStructure.h" -#include "registerDictionaries.h" +#include + +#include +#include + +#include +#include +#include + +#include +#include +#include +#include namespace DB @@ -35,41 +30,78 @@ namespace ErrorCodes extern const int UNSUPPORTED_METHOD; } - namespace { - /// Owns ShellCommand and calls wait for it. - class ShellCommandOwningTransform final : public ISimpleTransform + +/** A stream, that runs child process and sends data to its stdin in background thread, + * and receives data from its stdout. + * + * TODO: implement without background thread. + */ +class SourceWithBackgroundThread final : public SourceWithProgress +{ +public: + SourceWithBackgroundThread( + ContextPtr context, + const std::string & format, + const Block & sample_block, + const std::string & command_str, + Poco::Logger * log_, + std::function && send_data_) + : SourceWithProgress(sample_block) + , log(log_) + , command(ShellCommand::execute(command_str)) + , send_data(std::move(send_data_)) + , thread([this] { send_data(command->in); }) { - private: - Poco::Logger * log; - std::unique_ptr command; - public: - ShellCommandOwningTransform(const Block & header, Poco::Logger * log_, std::unique_ptr command_) - : ISimpleTransform(header, header, true), log(log_), command(std::move(command_)) + pipeline.init(Pipe(FormatFactory::instance().getInput(format, command->out, sample_block, context, DEFAULT_BLOCK_SIZE))); + executor = std::make_unique(pipeline); + } + + ~SourceWithBackgroundThread() override + { + if (thread.joinable()) + thread.join(); + } + +protected: + Chunk generate() override + { + Chunk chunk; + executor->pull(chunk); + return chunk; + } + +public: + Status prepare() override + { + auto status = SourceWithProgress::prepare(); + + if (status == Status::Finished) { + std::string err; + readStringUntilEOF(err, command->err); + if (!err.empty()) + LOG_ERROR(log, "Having stderr: {}", err); + + if (thread.joinable()) + thread.join(); + + command->wait(); } - String getName() const override { return "ShellCommandOwningTransform"; } - void transform(Chunk &) override {} + return status; + } - Status prepare() override - { - auto status = ISimpleTransform::prepare(); - if (status == Status::Finished) - { - std::string err; - readStringUntilEOF(err, command->err); - if (!err.empty()) - LOG_ERROR(log, "Having stderr: {}", err); - - command->wait(); - } - - return status; - } - }; + String getName() const override { return "SourceWithBackgroundThread"; } + Poco::Logger * log; + QueryPipeline pipeline; + std::unique_ptr executor; + std::unique_ptr command; + std::function send_data; + ThreadFromGlobalPool thread; +}; } ExecutableDictionarySource::ExecutableDictionarySource( @@ -133,85 +165,11 @@ Pipe ExecutableDictionarySource::loadUpdatedAll() LOG_TRACE(log, "loadUpdatedAll {}", command_with_update_field); auto process = ShellCommand::execute(command_with_update_field); - Pipe pipe(FormatFactory::instance().getInput(configuration.format, process->out, sample_block, context, max_block_size)); pipe.addTransform(std::make_shared(pipe.getHeader(), log, std::move(process))); return pipe; } -namespace -{ - /** A stream, that runs child process and sends data to its stdin in background thread, - * and receives data from its stdout. - * - * TODO: implement without background thread. - */ - class SourceWithBackgroundThread final : public SourceWithProgress - { - public: - SourceWithBackgroundThread( - ContextPtr context, - const std::string & format, - const Block & sample_block, - const std::string & command_str, - Poco::Logger * log_, - std::function && send_data_) - : SourceWithProgress(sample_block) - , log(log_) - , command(ShellCommand::execute(command_str)) - , send_data(std::move(send_data_)) - , thread([this] { send_data(command->in); }) - { - pipeline.init(Pipe(FormatFactory::instance().getInput(format, command->out, sample_block, context, max_block_size))); - executor = std::make_unique(pipeline); - } - - ~SourceWithBackgroundThread() override - { - if (thread.joinable()) - thread.join(); - } - - protected: - Chunk generate() override - { - Chunk chunk; - executor->pull(chunk); - return chunk; - } - - public: - Status prepare() override - { - auto status = SourceWithProgress::prepare(); - - if (status == Status::Finished) - { - std::string err; - readStringUntilEOF(err, command->err); - if (!err.empty()) - LOG_ERROR(log, "Having stderr: {}", err); - - if (thread.joinable()) - thread.join(); - - command->wait(); - } - - return status; - } - - String getName() const override { return "SourceWithBackgroundThread"; } - - Poco::Logger * log; - QueryPipeline pipeline; - std::unique_ptr executor; - std::unique_ptr command; - std::function send_data; - ThreadFromGlobalPool thread; - }; -} - Pipe ExecutableDictionarySource::loadIds(const std::vector & ids) { LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size()); diff --git a/src/Storages/StorageExecutable.cpp b/src/Storages/StorageExecutable.cpp index 25bcd4df6b4..7b9fbb5ccb8 100644 --- a/src/Storages/StorageExecutable.cpp +++ b/src/Storages/StorageExecutable.cpp @@ -1,123 +1,32 @@ #include #include -#include -#include -#include -#include #include #include #include #include -#include -#include -#include -#include +#include #include #include namespace DB { + namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int UNKNOWN_IDENTIFIER; } -namespace -{ -class StorageExecutableSource : public SourceWithProgress -{ -public: - - String getName() const override { return "Executable"; } - - StorageExecutableSource( - const String & file_path_, - const String & format_, - BlockInputStreamPtr input_, - const StorageMetadataPtr & metadata_snapshot_, - const Context & context_, - UInt64 max_block_size_) - : SourceWithProgress(metadata_snapshot_->getSampleBlock()) - , file_path(std::move(file_path_)) - , format(format_) - , input(input_) - , metadata_snapshot(metadata_snapshot_) - , context(context_) - , max_block_size(max_block_size_) - { - } - - Chunk generate() override { - LOG_TRACE(log, "generating {}", toString(file_path)); - - if (!reader) - { - auto sample_block = metadata_snapshot->getSampleBlock(); - reader = std::make_shared(context, format, sample_block, file_path, log, max_block_size, [this](WriteBufferFromFile & out) - { - if (!input) - { - out.close(); - return; - } - - auto output_stream = context.getOutputFormat(format, out, input->getHeader()); - output_stream->writePrefix(); - - input->readPrefix(); - while (auto block = input->read()) { - output_stream->write(block); - } - input->readSuffix(); - input.reset(); - - output_stream->writeSuffix(); - output_stream->flush(); - out.close(); - }); - - reader->readPrefix(); - } - - if (auto res = reader->read()) - { - Columns columns = res.getColumns(); - UInt64 num_rows = res.rows(); - return Chunk(std::move(columns), num_rows); - } - - reader->readSuffix(); - reader.reset(); - return {}; - } - - private: - String file_path; - const String & format; - BlockInputStreamPtr input; - BlockInputStreamPtr reader; - const StorageMetadataPtr & metadata_snapshot; - const Context & context; - UInt64 max_block_size; - Poco::Logger * log = &Poco::Logger::get("StorageExecutableSource"); -}; -} - - StorageExecutable::StorageExecutable( const StorageID & table_id_, const String & file_path_, const String & format_, - BlockInputStreamPtr input_, const ColumnsDescription & columns, - const ConstraintsDescription & constraints, - const Context & context_) + const ConstraintsDescription & constraints) : IStorage(table_id_) , file_path(file_path_) , format(format_) - , input(input_) - , context(context_) + , log(&Poco::Logger::get("StorageExecutable")) { StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns); @@ -129,18 +38,16 @@ Pipe StorageExecutable::read( const Names & /*column_names*/, const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & /*query_info*/, - const Context & context_, + ContextPtr context_, QueryProcessingStage::Enum /*processed_stage*/, size_t max_block_size, unsigned /*num_streams*/) { - return Pipe(std::make_shared( - file_path, - format, - input, - metadata_snapshot, - context_, - max_block_size)); + auto process = ShellCommand::execute(file_path); + auto sample_block = metadata_snapshot->getSampleBlock(); + Pipe pipe(FormatFactory::instance().getInput(format, process->out, std::move(sample_block), std::move(context_), max_block_size)); + pipe.addTransform(std::make_shared(pipe.getHeader(), log, std::move(process))); + return pipe; } }; diff --git a/src/Storages/StorageExecutable.h b/src/Storages/StorageExecutable.h index 2079615f8c9..4cff7b9ee32 100644 --- a/src/Storages/StorageExecutable.h +++ b/src/Storages/StorageExecutable.h @@ -1,19 +1,19 @@ #pragma once -#include +#include +#include #include #include -#include -#include + namespace DB { /** * This class represents table engine for external executable files. */ -class StorageExecutable final : public ext::shared_ptr_helper, public IStorage +class StorageExecutable final : public shared_ptr_helper, public IStorage { - friend struct ext::shared_ptr_helper; + friend struct shared_ptr_helper; public: String getName() const override { return "Executable"; } @@ -21,26 +21,23 @@ public: const Names & column_names, const StorageMetadataPtr & /*metadata_snapshot*/, SelectQueryInfo & query_info, - const Context & context, + ContextPtr context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, - unsigned num_streams) override; + unsigned threads) override; protected: StorageExecutable( const StorageID & table_id, const String & file_path_, const String & format_, - BlockInputStreamPtr input_, const ColumnsDescription & columns, - const ConstraintsDescription & constraints, - const Context & context_); + const ConstraintsDescription & constraints); private: String file_path; String format; - BlockInputStreamPtr input; - const Context & context; + Poco::Logger * log; }; } diff --git a/src/TableFunctions/TableFunctionExecutable.cpp b/src/TableFunctions/TableFunctionExecutable.cpp index 977847f6d91..01a87c9262e 100644 --- a/src/TableFunctions/TableFunctionExecutable.cpp +++ b/src/TableFunctions/TableFunctionExecutable.cpp @@ -24,7 +24,7 @@ namespace ErrorCodes extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; } -void TableFunctionExecutable::parseArguments(const ASTPtr & ast_function, const Context & context) +void TableFunctionExecutable::parseArguments(const ASTPtr & ast_function, ContextPtr context) { const auto * function = ast_function->as(); @@ -46,19 +46,20 @@ void TableFunctionExecutable::parseArguments(const ASTPtr & ast_function, const if (args.size() == 4) { if (!(args[3]->as())) throw Exception("Table function '" + getName() + "' 4th argument is invalid input query", ErrorCodes::LOGICAL_ERROR); - + input = interpretSubquery(args[3], context, {}, {})->execute().getInputStream(); } } -ColumnsDescription TableFunctionExecutable::getActualTableStructure(const Context & context) const +ColumnsDescription TableFunctionExecutable::getActualTableStructure(ContextPtr context) const { return parseColumnsListFromString(structure, context); } -StoragePtr TableFunctionExecutable::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const +StoragePtr TableFunctionExecutable::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const { - auto storage = StorageExecutable::create(StorageID(getDatabaseName(), table_name), file_path, format, input, getActualTableStructure(context), ConstraintsDescription{}, context); + auto storage_id = StorageID(getDatabaseName(), table_name); + auto storage = StorageExecutable::create(storage_id, file_path, format, getActualTableStructure(context), ConstraintsDescription{}); storage->startup(); return storage; } diff --git a/src/TableFunctions/TableFunctionExecutable.h b/src/TableFunctions/TableFunctionExecutable.h index 5ee468e5930..025df5e0cf2 100644 --- a/src/TableFunctions/TableFunctionExecutable.h +++ b/src/TableFunctions/TableFunctionExecutable.h @@ -22,11 +22,11 @@ public: bool hasStaticStructure() const override { return true; } private: - StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override; + StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override; const char * getStorageTypeName() const override { return "Executable"; } - ColumnsDescription getActualTableStructure(const Context & context) const override; - void parseArguments(const ASTPtr & ast_function, const Context & context) override; + ColumnsDescription getActualTableStructure(ContextPtr context) const override; + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; String file_path; String format;