diff --git a/src/Interpreters/PredicateExpressionsOptimizer.cpp b/src/Interpreters/PredicateExpressionsOptimizer.cpp index 885c99aeb90..8dc8c1c92cc 100644 --- a/src/Interpreters/PredicateExpressionsOptimizer.cpp +++ b/src/Interpreters/PredicateExpressionsOptimizer.cpp @@ -84,8 +84,10 @@ std::vector PredicateExpressionsOptimizer::extractTablesPredicates(const A return {}; /// Not optimized when predicate contains stateful function or indeterministic function or window functions } + /// Skip predicate like `... IN (SELECT ... FROM input())` because + /// it can be duplicated but we can't execute `input()` twice. if (hasInputTableFunction(predicate_expression)) - return {}; /// Not optimized when predicate contains input table function + return {}; if (!expression_info.is_array_join) { diff --git a/src/Storages/StorageInput.cpp b/src/Storages/StorageInput.cpp index 2314d3fb581..7f1eeaedfb1 100644 --- a/src/Storages/StorageInput.cpp +++ b/src/Storages/StorageInput.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include #include @@ -60,16 +61,16 @@ public: ReadFromInput( Block sample_block, - //StorageSnapshotPtr storage_snapshot_, + Pipe pipe_, StorageInput & storage_) : ISourceStep(DataStream{.header = std::move(sample_block)}) - //, storage_snapshot(std::move(storage_snapshot_)) + , pipe(std::move(pipe_)) , storage(storage_) { } private: - //StorageSnapshotPtr storage_snapshot; + Pipe pipe; StorageInput & storage; }; @@ -85,21 +86,20 @@ void StorageInput::read( { storage_snapshot->check(column_names); Block sample_block = storage_snapshot->metadata->getSampleBlock(); + Pipe input_source_pipe; auto query_context = context->getQueryContext(); /// It is TCP request if we have callbacks for input(). - if (!was_pipe_initialized && query_context->getInputBlocksReaderCallback()) + if (query_context->getInputBlocksReaderCallback()) { /// Send structure to the client. query_context->initializeInput(shared_from_this()); + input_source_pipe = Pipe(std::make_shared(query_context, sample_block)); } - if (!was_pipe_initialized) - throw Exception(ErrorCodes::INVALID_USAGE_OF_INPUT, "Input stream is not initialized, input() must be used only in INSERT SELECT query"); - auto reading = std::make_unique( std::move(sample_block), - //storage_snapshot, + std::move(input_source_pipe), *this); query_plan.addStep(std::move(reading)); @@ -107,6 +107,15 @@ void StorageInput::read( void ReadFromInput::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) { + if (!pipe.empty()) + { + pipeline.init(std::move(pipe)); + return; + } + + if (!storage.was_pipe_initialized) + throw Exception(ErrorCodes::INVALID_USAGE_OF_INPUT, "Input stream is not initialized, input() must be used only in INSERT SELECT query"); + if (storage.was_pipe_used) throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to read from input() twice.");