Fixing tests.

This commit is contained in:
Nikolai Kochetov 2023-11-14 12:51:25 +00:00
parent 4004248c13
commit d5907e10de
2 changed files with 20 additions and 9 deletions

View File

@ -84,8 +84,10 @@ std::vector<ASTs> PredicateExpressionsOptimizer::extractTablesPredicates(const A
return {}; /// Not optimized when predicate contains stateful function or indeterministic function or window functions return {}; /// Not optimized when predicate contains stateful function or indeterministic function or window functions
} }
/// Skip predicate like `... IN (SELECT ... FROM input())` because
/// it can be duplicated but we can't execute `input()` twice.
if (hasInputTableFunction(predicate_expression)) if (hasInputTableFunction(predicate_expression))
return {}; /// Not optimized when predicate contains input table function return {};
if (!expression_info.is_array_join) if (!expression_info.is_array_join)
{ {

View File

@ -5,6 +5,7 @@
#include <memory> #include <memory>
#include <Processors/ISource.h> #include <Processors/ISource.h>
#include <Processors/Sources/ThrowingExceptionSource.h>
#include <Processors/QueryPlan/ISourceStep.h> #include <Processors/QueryPlan/ISourceStep.h>
#include <Processors/QueryPlan/QueryPlan.h> #include <Processors/QueryPlan/QueryPlan.h>
#include <QueryPipeline/Pipe.h> #include <QueryPipeline/Pipe.h>
@ -60,16 +61,16 @@ public:
ReadFromInput( ReadFromInput(
Block sample_block, Block sample_block,
//StorageSnapshotPtr storage_snapshot_, Pipe pipe_,
StorageInput & storage_) StorageInput & storage_)
: ISourceStep(DataStream{.header = std::move(sample_block)}) : ISourceStep(DataStream{.header = std::move(sample_block)})
//, storage_snapshot(std::move(storage_snapshot_)) , pipe(std::move(pipe_))
, storage(storage_) , storage(storage_)
{ {
} }
private: private:
//StorageSnapshotPtr storage_snapshot; Pipe pipe;
StorageInput & storage; StorageInput & storage;
}; };
@ -85,21 +86,20 @@ void StorageInput::read(
{ {
storage_snapshot->check(column_names); storage_snapshot->check(column_names);
Block sample_block = storage_snapshot->metadata->getSampleBlock(); Block sample_block = storage_snapshot->metadata->getSampleBlock();
Pipe input_source_pipe;
auto query_context = context->getQueryContext(); auto query_context = context->getQueryContext();
/// It is TCP request if we have callbacks for input(). /// It is TCP request if we have callbacks for input().
if (!was_pipe_initialized && query_context->getInputBlocksReaderCallback()) if (query_context->getInputBlocksReaderCallback())
{ {
/// Send structure to the client. /// Send structure to the client.
query_context->initializeInput(shared_from_this()); query_context->initializeInput(shared_from_this());
input_source_pipe = Pipe(std::make_shared<StorageInputSource>(query_context, sample_block));
} }
if (!was_pipe_initialized)
throw Exception(ErrorCodes::INVALID_USAGE_OF_INPUT, "Input stream is not initialized, input() must be used only in INSERT SELECT query");
auto reading = std::make_unique<ReadFromInput>( auto reading = std::make_unique<ReadFromInput>(
std::move(sample_block), std::move(sample_block),
//storage_snapshot, std::move(input_source_pipe),
*this); *this);
query_plan.addStep(std::move(reading)); query_plan.addStep(std::move(reading));
@ -107,6 +107,15 @@ void StorageInput::read(
void ReadFromInput::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) void ReadFromInput::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{ {
if (!pipe.empty())
{
pipeline.init(std::move(pipe));
return;
}
if (!storage.was_pipe_initialized)
throw Exception(ErrorCodes::INVALID_USAGE_OF_INPUT, "Input stream is not initialized, input() must be used only in INSERT SELECT query");
if (storage.was_pipe_used) if (storage.was_pipe_used)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to read from input() twice."); throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to read from input() twice.");