From 303813c1d8a703e09ccabb3ab174f840d4d4b682 Mon Sep 17 00:00:00 2001 From: ruct Date: Fri, 16 Apr 2021 00:15:54 +0300 Subject: [PATCH] Maybe --- .../ShellCommandBlockInputStream.h | 100 ++++++++++++++++ .../ShellCommandOwningBlockInputStream.h | 37 ------ .../ExecutableDictionarySource.cpp | 74 +----------- src/Storages/StorageExecutable.cpp | 107 +++++++----------- src/Storages/StorageExecutable.h | 13 +-- src/Storages/ya.make | 1 + .../TableFunctionExecutable.cpp | 67 +++++++++-- src/TableFunctions/TableFunctionExecutable.h | 28 +++-- src/TableFunctions/ya.make | 1 + 9 files changed, 228 insertions(+), 200 deletions(-) create mode 100644 src/DataStreams/ShellCommandBlockInputStream.h delete mode 100644 src/DataStreams/ShellCommandOwningBlockInputStream.h diff --git a/src/DataStreams/ShellCommandBlockInputStream.h b/src/DataStreams/ShellCommandBlockInputStream.h new file mode 100644 index 00000000000..1eb1532e9fa --- /dev/null +++ b/src/DataStreams/ShellCommandBlockInputStream.h @@ -0,0 +1,100 @@ +#pragma once + +#include + +#include +#include +#include +#include + +namespace DB +{ +/// Owns ShellCommand and calls wait for it. +class ShellCommandOwningBlockInputStream : public OwningBlockInputStream +{ +private: + Poco::Logger * log; +public: + ShellCommandOwningBlockInputStream(Poco::Logger * log_, const BlockInputStreamPtr & impl, std::unique_ptr command_) + : OwningBlockInputStream(std::move(impl), std::move(command_)), log(log_) + { + } + + void readSuffix() override + { + OwningBlockInputStream::readSuffix(); + + std::string err; + readStringUntilEOF(err, own->err); + if (!err.empty()) + LOG_ERROR(log, "Having stderr: {}", err); + + own->wait(); + } +}; + +/** A stream, that runs child process and sends data to its stdin in background thread, + * and receives data from its stdout. + */ +class BlockInputStreamWithBackgroundThread final : public IBlockInputStream +{ +public: + BlockInputStreamWithBackgroundThread( + const Context & context, + const std::string & format, + const Block & sample_block, + const std::string & command_str, + Poco::Logger * log_, + UInt64 max_block_size, + std::function && send_data_) + : log(log_), + command(ShellCommand::execute(command_str)), + send_data(std::move(send_data_)), + thread([this] { send_data(command->in); }) + { + stream = context.getInputFormat(format, command->out, sample_block, max_block_size); + } + + ~BlockInputStreamWithBackgroundThread() override + { + if (thread.joinable()) + thread.join(); + } + + Block getHeader() const override + { + return stream->getHeader(); + } + +private: + Block readImpl() override + { + return stream->read(); + } + + void readPrefix() override + { + stream->readPrefix(); + } + + void readSuffix() override + { + stream->readSuffix(); + + std::string err; + readStringUntilEOF(err, command->err); + if (!err.empty()) + LOG_ERROR(log, "Having stderr: {}", err); + + command->wait(); + } + + String getName() const override { return "WithBackgroundThread"; } + + Poco::Logger * log; + BlockInputStreamPtr stream; + std::unique_ptr command; + std::function send_data; + ThreadFromGlobalPool thread; +}; +} diff --git a/src/DataStreams/ShellCommandOwningBlockInputStream.h b/src/DataStreams/ShellCommandOwningBlockInputStream.h deleted file mode 100644 index 656ad5f5c2f..00000000000 --- a/src/DataStreams/ShellCommandOwningBlockInputStream.h +++ /dev/null @@ -1,37 +0,0 @@ -#pragma once - -#include - -#include -#include -#include -#include - -namespace DB -{ - -/// Owns ShellCommand and calls wait for it. -class ShellCommandOwningBlockInputStream : public OwningBlockInputStream -{ -private: - Poco::Logger * log; -public: - ShellCommandOwningBlockInputStream(Poco::Logger * log_, const BlockInputStreamPtr & impl, std::unique_ptr command_) - : OwningBlockInputStream(std::move(impl), std::move(command_)), log(log_) - { - } - - void readSuffix() override - { - OwningBlockInputStream::readSuffix(); - - std::string err; - readStringUntilEOF(err, own->err); - if (!err.empty()) - LOG_ERROR(log, "Having stderr: {}", err); - - own->wait(); - } -}; - -} diff --git a/src/Dictionaries/ExecutableDictionarySource.cpp b/src/Dictionaries/ExecutableDictionarySource.cpp index 4b0c613c01a..9c2833a7adf 100644 --- a/src/Dictionaries/ExecutableDictionarySource.cpp +++ b/src/Dictionaries/ExecutableDictionarySource.cpp @@ -3,7 +3,7 @@ #include #include #include -#include +#include #include #include #include @@ -79,80 +79,12 @@ BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll() return std::make_shared(log, input_stream, std::move(process)); } -namespace -{ - /** A stream, that runs child process and sends data to its stdin in background thread, - * and receives data from its stdout. - */ - class BlockInputStreamWithBackgroundThread final : public IBlockInputStream - { - public: - BlockInputStreamWithBackgroundThread( - const Context & context, - const std::string & format, - const Block & sample_block, - const std::string & command_str, - Poco::Logger * log_, - std::function && send_data_) - : log(log_), - command(ShellCommand::execute(command_str)), - send_data(std::move(send_data_)), - thread([this] { send_data(command->in); }) - { - stream = context.getInputFormat(format, command->out, sample_block, max_block_size); - } - - ~BlockInputStreamWithBackgroundThread() override - { - if (thread.joinable()) - thread.join(); - } - - Block getHeader() const override - { - return stream->getHeader(); - } - - private: - Block readImpl() override - { - return stream->read(); - } - - void readPrefix() override - { - stream->readPrefix(); - } - - void readSuffix() override - { - stream->readSuffix(); - - std::string err; - readStringUntilEOF(err, command->err); - if (!err.empty()) - LOG_ERROR(log, "Having stderr: {}", err); - - command->wait(); - } - - String getName() const override { return "WithBackgroundThread"; } - - Poco::Logger * log; - BlockInputStreamPtr stream; - std::unique_ptr command; - std::function send_data; - ThreadFromGlobalPool thread; - }; -} - - BlockInputStreamPtr ExecutableDictionarySource::loadIds(const std::vector & ids) { LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size()); return std::make_shared( - context, format, sample_block, command, log, + context, format, sample_block, command, log, max_block_size, [&ids, this](WriteBufferFromFile & out) mutable { auto output_stream = context.getOutputFormat(format, out, sample_block); @@ -166,7 +98,7 @@ BlockInputStreamPtr ExecutableDictionarySource::loadKeys(const Columns & key_col LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size()); return std::make_shared( - context, format, sample_block, command, log, + context, format, sample_block, command, log, max_block_size, [key_columns, &requested_rows, this](WriteBufferFromFile & out) mutable { auto output_stream = context.getOutputFormat(format, out, sample_block); diff --git a/src/Storages/StorageExecutable.cpp b/src/Storages/StorageExecutable.cpp index 4270936cb06..25bcd4df6b4 100644 --- a/src/Storages/StorageExecutable.cpp +++ b/src/Storages/StorageExecutable.cpp @@ -8,7 +8,8 @@ #include #include #include -#include +#include +#include #include #include #include @@ -31,18 +32,18 @@ public: String getName() const override { return "Executable"; } StorageExecutableSource( + const String & file_path_, const String & format_, + BlockInputStreamPtr input_, const StorageMetadataPtr & metadata_snapshot_, const Context & context_, - const ColumnsDescription & /*columns*/, - UInt64 max_block_size_, - const CompressionMethod /*compression_method*/, - const String & file_path_) + UInt64 max_block_size_) : SourceWithProgress(metadata_snapshot_->getSampleBlock()) , file_path(std::move(file_path_)) + , format(format_) + , input(input_) , metadata_snapshot(metadata_snapshot_) , context(context_) - , format(format_) , max_block_size(max_block_size_) { } @@ -52,9 +53,29 @@ public: if (!reader) { - auto process = ShellCommand::execute(file_path); - auto input_stream = context.getInputFormat(format, process->out, metadata_snapshot->getSampleBlock(), max_block_size); - reader = std::make_shared(log, input_stream, std::move(process)); + auto sample_block = metadata_snapshot->getSampleBlock(); + reader = std::make_shared(context, format, sample_block, file_path, log, max_block_size, [this](WriteBufferFromFile & out) + { + if (!input) + { + out.close(); + return; + } + + auto output_stream = context.getOutputFormat(format, out, input->getHeader()); + output_stream->writePrefix(); + + input->readPrefix(); + while (auto block = input->read()) { + output_stream->write(block); + } + input->readSuffix(); + input.reset(); + + output_stream->writeSuffix(); + output_stream->flush(); + out.close(); + }); reader->readPrefix(); } @@ -73,10 +94,11 @@ public: private: String file_path; + const String & format; + BlockInputStreamPtr input; BlockInputStreamPtr reader; const StorageMetadataPtr & metadata_snapshot; const Context & context; - const String & format; UInt64 max_block_size; Poco::Logger * log = &Poco::Logger::get("StorageExecutableSource"); }; @@ -85,18 +107,17 @@ public: StorageExecutable::StorageExecutable( const StorageID & table_id_, - const String & format_name_, const String & file_path_, + const String & format_, + BlockInputStreamPtr input_, const ColumnsDescription & columns, const ConstraintsDescription & constraints, - Context & context_, - CompressionMethod compression_method_) + const Context & context_) : IStorage(table_id_) - , format_name(format_name_) , file_path(file_path_) + , format(format_) + , input(input_) , context(context_) - , compression_method(compression_method_) - , log(&Poco::Logger::get("StorageExecutable (" + table_id_.getFullTableName() + ")")) { StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns); @@ -104,51 +125,6 @@ StorageExecutable::StorageExecutable( setInMemoryMetadata(storage_metadata); } -void registerStorageExecutable(StorageFactory & factory) -{ - factory.registerStorage( - "Executable", - [](const StorageFactory::Arguments & args) - { - ASTs & engine_args = args.engine_args; - - if (!(engine_args.size() >= 1 && engine_args.size() <= 4)) // NOLINT - throw Exception( - "Storage Executable requires from 1 to 4 arguments: name of used format, source and compression_method.", - ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - - engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context); - String format_name = engine_args[0]->as().value.safeGet(); - - String compression_method_str; - String source_path; - - if (const auto * literal = engine_args[1]->as()) - { - auto type = literal->value.getType(); - if (type == Field::Types::String) - source_path = literal->value.get(); - else - throw Exception("Second argument must be path or file descriptor", ErrorCodes::BAD_ARGUMENTS); - } - - if (engine_args.size() == 3) - { - engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context); - compression_method_str = engine_args[2]->as().value.safeGet(); - } - - CompressionMethod compression_method = chooseCompressionMethod("", compression_method_str); - - return StorageExecutable::create(args.table_id, format_name, source_path, args.columns, args.constraints, args.context, compression_method); - }, - { - .source_access_type = AccessType::FILE, - }); -} - - - Pipe StorageExecutable::read( const Names & /*column_names*/, const StorageMetadataPtr & metadata_snapshot, @@ -159,13 +135,12 @@ Pipe StorageExecutable::read( unsigned /*num_streams*/) { return Pipe(std::make_shared( - format_name, + file_path, + format, + input, metadata_snapshot, context_, - metadata_snapshot->getColumns(), - max_block_size, - compression_method, - file_path)); + max_block_size)); } }; diff --git a/src/Storages/StorageExecutable.h b/src/Storages/StorageExecutable.h index 16e4e981c19..2079615f8c9 100644 --- a/src/Storages/StorageExecutable.h +++ b/src/Storages/StorageExecutable.h @@ -29,19 +29,18 @@ public: protected: StorageExecutable( const StorageID & table_id, - const String & format_name_, const String & file_path_, + const String & format_, + BlockInputStreamPtr input_, const ColumnsDescription & columns, const ConstraintsDescription & constraints, - Context & context_, - CompressionMethod compression_method_); + const Context & context_); private: - String format_name; String file_path; - Context & context; - CompressionMethod compression_method; - Poco::Logger * log = &Poco::Logger::get("StorageExecutable"); + String format; + BlockInputStreamPtr input; + const Context & context; }; } diff --git a/src/Storages/ya.make b/src/Storages/ya.make index e0c6cab602f..baaa1af25d4 100644 --- a/src/Storages/ya.make +++ b/src/Storages/ya.make @@ -111,6 +111,7 @@ SRCS( StorageBuffer.cpp StorageDictionary.cpp StorageDistributed.cpp + StorageExecutable.cpp StorageFactory.cpp StorageFile.cpp StorageGenerateRandom.cpp diff --git a/src/TableFunctions/TableFunctionExecutable.cpp b/src/TableFunctions/TableFunctionExecutable.cpp index 8295fc4a508..977847f6d91 100644 --- a/src/TableFunctions/TableFunctionExecutable.cpp +++ b/src/TableFunctions/TableFunctionExecutable.cpp @@ -1,22 +1,71 @@ -#include -#include -#include -#include +#include #include -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include "registerTableFunctions.h" + namespace DB { -StoragePtr TableFunctionExecutable::getStorage( - const String & source_path, const String & format_, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const std::string & compression_method_) const + +namespace ErrorCodes { - return StorageExecutable::create(StorageID(getDatabaseName(), table_name), format_, source_path, columns, ConstraintsDescription{}, global_context, chooseCompressionMethod("", compression_method_)); + extern const int LOGICAL_ERROR; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +} + +void TableFunctionExecutable::parseArguments(const ASTPtr & ast_function, const Context & context) +{ + const auto * function = ast_function->as(); + + if (!function->arguments) + throw Exception("Table function '" + getName() + "' must have arguments", ErrorCodes::LOGICAL_ERROR); + + auto args = function->arguments->children; + + if (!(args.size() == 3 || args.size() == 4)) + throw Exception("Table function '" + getName() + "' requires exactly 3 or 4 arguments: path, format, structure, [input_query]", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + for (size_t i = 0; i <= 2; ++i) + args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(args[i], context); + + file_path = args[0]->as().value.safeGet(); + format = args[1]->as().value.safeGet(); + structure = evaluateConstantExpressionOrIdentifierAsLiteral(args[2], context)->as().value.safeGet(); + if (args.size() == 4) { + if (!(args[3]->as())) + throw Exception("Table function '" + getName() + "' 4th argument is invalid input query", ErrorCodes::LOGICAL_ERROR); + + input = interpretSubquery(args[3], context, {}, {})->execute().getInputStream(); + } +} + +ColumnsDescription TableFunctionExecutable::getActualTableStructure(const Context & context) const +{ + return parseColumnsListFromString(structure, context); +} + +StoragePtr TableFunctionExecutable::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const +{ + auto storage = StorageExecutable::create(StorageID(getDatabaseName(), table_name), file_path, format, input, getActualTableStructure(context), ConstraintsDescription{}, context); + storage->startup(); + return storage; } void registerTableFunctionExecutable(TableFunctionFactory & factory) { factory.registerFunction(); } + } diff --git a/src/TableFunctions/TableFunctionExecutable.h b/src/TableFunctions/TableFunctionExecutable.h index 7e179327c72..5ee468e5930 100644 --- a/src/TableFunctions/TableFunctionExecutable.h +++ b/src/TableFunctions/TableFunctionExecutable.h @@ -1,28 +1,36 @@ #pragma once -#include - +#include +#include namespace DB { + class Context; + /* executable(path, format, structure, input_query) - creates a temporary storage from executable file * * * The file must be in the clickhouse data directory. * The relative path begins with the clickhouse data directory. */ -class TableFunctionExecutable : public ITableFunctionFileLike +class TableFunctionExecutable : public ITableFunction { public: static constexpr auto name = "executable"; - std::string getName() const override - { - return name; - } + std::string getName() const override { return name; } + bool hasStaticStructure() const override { return true; } private: - StoragePtr getStorage( - const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const std::string & compression_method) const override; + StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override; const char * getStorageTypeName() const override { return "Executable"; } -};} + + ColumnsDescription getActualTableStructure(const Context & context) const override; + void parseArguments(const ASTPtr & ast_function, const Context & context) override; + + String file_path; + String format; + String structure; + BlockInputStreamPtr input; +}; +} diff --git a/src/TableFunctions/ya.make b/src/TableFunctions/ya.make index 2bafb588fbb..267ff4f4c57 100644 --- a/src/TableFunctions/ya.make +++ b/src/TableFunctions/ya.make @@ -11,6 +11,7 @@ SRCS( ITableFunctionFileLike.cpp ITableFunctionXDBC.cpp TableFunctionFactory.cpp + TableFunctionExecutable.cpp TableFunctionFile.cpp TableFunctionGenerateRandom.cpp TableFunctionInput.cpp