diff --git a/dbms/src/Core/iostream_debug_helpers.cpp b/dbms/src/Core/iostream_debug_helpers.cpp index 4277126e7e2..3e29ff14e85 100644 --- a/dbms/src/Core/iostream_debug_helpers.cpp +++ b/dbms/src/Core/iostream_debug_helpers.cpp @@ -115,7 +115,6 @@ std::ostream & operator<<(std::ostream & stream, const Connection::Packet & what std::ostream & operator<<(std::ostream & stream, const SubqueryForSet & what) { stream << "SubqueryForSet(source = " << what.source - << ", source_sample = " << what.source_sample // TODO: << ", set = " << what.set << ", join = " << what.join << ", table = " << what.table << ")"; diff --git a/dbms/src/DataStreams/LazyBlockInputStream.h b/dbms/src/DataStreams/LazyBlockInputStream.h index 55f9c7053e1..783995a2496 100644 --- a/dbms/src/DataStreams/LazyBlockInputStream.h +++ b/dbms/src/DataStreams/LazyBlockInputStream.h @@ -15,14 +15,13 @@ class LazyBlockInputStream : public IProfilingBlockInputStream public: using Generator = std::function; - LazyBlockInputStream(Generator generator_) - : generator(std::move(generator_)) + LazyBlockInputStream(const Block & header_, Generator generator_) + : header(header_), generator(std::move(generator_)) { } - LazyBlockInputStream(const char * name_, Generator generator_) - : name(name_) - , generator(std::move(generator_)) + LazyBlockInputStream(const char * name_, const Block & header_, Generator generator_) + : name(name_), header(header_), generator(std::move(generator_)) { } @@ -36,88 +35,70 @@ public: Block getHeader() override { - std::cerr << "LazyBlockInputStream::getHeader()\n"; - - init(); - if (!input) - return {}; - - return input->getHeader(); + return header; } protected: Block readImpl() override { - init(); if (!input) - return {}; + { + input = generator(); + + if (!input) + return Block(); + + auto * p_input = dynamic_cast(input.get()); + + if (p_input) + { + /// They could have been set before, but were not passed into the `input`. + if (progress_callback) + p_input->setProgressCallback(progress_callback); + if (process_list_elem) + p_input->setProcessListElement(process_list_elem); + } + + input->readPrefix(); + + { + std::lock_guard lock(cancel_mutex); + + /** TODO Data race here. See IProfilingBlockInputStream::collectAndSendTotalRowsApprox. + Assume following pipeline: + + RemoteBlockInputStream + AsynchronousBlockInputStream + LazyBlockInputStream + + RemoteBlockInputStream calls AsynchronousBlockInputStream::readPrefix + and AsynchronousBlockInputStream spawns a thread and returns. + + The separate thread will call LazyBlockInputStream::read + LazyBlockInputStream::read will add more children to itself + + In the same moment, in main thread, RemoteBlockInputStream::read is called, + then IProfilingBlockInputStream::collectAndSendTotalRowsApprox is called + and iterates over set of children. + */ + children.push_back(input); + + if (isCancelled() && p_input) + p_input->cancel(); + } + } return input->read(); } private: const char * name = "Lazy"; + Block header; Generator generator; - bool initialized = false; BlockInputStreamPtr input; std::mutex cancel_mutex; - - void init() - { - if (initialized) - return; - - std::cerr << "LazyBlockInputStream::init()\n"; - - input = generator(); - initialized = true; - - if (!input) - return; - - std::cerr << "!\n"; - - auto * p_input = dynamic_cast(input.get()); - - if (p_input) - { - /// They could have been set before, but were not passed into the `input`. - if (progress_callback) - p_input->setProgressCallback(progress_callback); - if (process_list_elem) - p_input->setProcessListElement(process_list_elem); - } - - input->readPrefix(); - - { - std::lock_guard lock(cancel_mutex); - - /** TODO Data race here. See IProfilingBlockInputStream::collectAndSendTotalRowsApprox. - Assume following pipeline: - - RemoteBlockInputStream - AsynchronousBlockInputStream - LazyBlockInputStream - - RemoteBlockInputStream calls AsynchronousBlockInputStream::readPrefix - and AsynchronousBlockInputStream spawns a thread and returns. - - The separate thread will call LazyBlockInputStream::read - LazyBlockInputStream::read will add more children to itself - - In the same moment, in main thread, RemoteBlockInputStream::read is called, - then IProfilingBlockInputStream::collectAndSendTotalRowsApprox is called - and iterates over set of children. - */ - children.push_back(input); - - if (isCancelled() && p_input) - p_input->cancel(); - } - } }; } diff --git a/dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp b/dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp index 7ca008a0a2d..43ef98dfb26 100644 --- a/dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp +++ b/dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp @@ -198,7 +198,7 @@ void SelectStreamFactory::createForShard( } }; - res.emplace_back(std::make_shared("LazyShardWithLocalReplica", lazily_create_stream)); + res.emplace_back(std::make_shared("LazyShardWithLocalReplica", header, lazily_create_stream)); } else emplace_remote_stream(); diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index a519ebb6dbd..259cd8f3503 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -883,7 +883,6 @@ void ExpressionAnalyzer::addExternalStorage(ASTPtr & subquery_or_table_name_or_t external_tables[external_table_name] = external_storage; subqueries_for_sets[external_table_name].source = interpreter->execute().in; - subqueries_for_sets[external_table_name].source_sample = interpreter->getSampleBlock(); subqueries_for_sets[external_table_name].table = external_storage; /** NOTE If it was written IN tmp_table - the existing temporary (but not external) table, @@ -1661,8 +1660,7 @@ void ExpressionAnalyzer::makeSet(const ASTFunction * node, const Block & sample_ { auto interpreter = interpretSubquery(arg, context, subquery_depth, {}); subquery_for_set.source = std::make_shared( - [interpreter]() mutable { return interpreter->execute().in; }); - subquery_for_set.source_sample = interpreter->getSampleBlock(); + interpreter->getSampleBlock(), [interpreter]() mutable { return interpreter->execute().in; }); /** Why is LazyBlockInputStream used? * @@ -2486,13 +2484,14 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty table = table_to_join.subquery; auto interpreter = interpretSubquery(table, context, subquery_depth, required_joined_columns); - subquery_for_set.source = std::make_shared([interpreter]() mutable { return interpreter->execute().in; }); - subquery_for_set.source_sample = interpreter->getSampleBlock(); + subquery_for_set.source = std::make_shared( + interpreter->getSampleBlock(), + [interpreter]() mutable { return interpreter->execute().in; }); } /// TODO You do not need to set this up when JOIN is only needed on remote servers. subquery_for_set.join = join; - subquery_for_set.join->setSampleBlock(subquery_for_set.source_sample); + subquery_for_set.join->setSampleBlock(subquery_for_set.source->getHeader()); } addJoinAction(step.actions, false); diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.h b/dbms/src/Interpreters/ExpressionAnalyzer.h index b11bd025a2e..ccd60b296cb 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.h +++ b/dbms/src/Interpreters/ExpressionAnalyzer.h @@ -41,7 +41,6 @@ struct SubqueryForSet { /// The source is obtained using the InterpreterSelectQuery subquery. BlockInputStreamPtr source; - Block source_sample; /// If set, build it from result. SetPtr set; diff --git a/dbms/src/Storages/StorageMerge.cpp b/dbms/src/Storages/StorageMerge.cpp index 319b398f372..39be47eab9b 100644 --- a/dbms/src/Storages/StorageMerge.cpp +++ b/dbms/src/Storages/StorageMerge.cpp @@ -137,6 +137,7 @@ BlockInputStreams StorageMerge::read( const unsigned num_streams) { BlockInputStreams res; + Block header = getSampleBlockForColumns(column_names); Names virt_column_names, real_column_names; for (const auto & it : column_names) @@ -223,13 +224,13 @@ BlockInputStreams StorageMerge::read( for (auto & stream : source_streams) { /// will throw if some columns not convertible - stream = std::make_shared(context, stream, getSampleBlock()); + stream = std::make_shared(context, stream, header); } } else { /// If many streams, initialize it lazily, to avoid long delay before start of query processing. - source_streams.emplace_back(std::make_shared([=] + source_streams.emplace_back(std::make_shared(header, [=] { QueryProcessingStage::Enum processed_stage_in_source_table = processed_stage; BlockInputStreams streams = table->read( @@ -247,11 +248,11 @@ BlockInputStreams StorageMerge::read( throw Exception("Source tables for Merge table are processing data up to different stages", ErrorCodes::INCOMPATIBLE_SOURCE_TABLES); - auto stream = streams.empty() ? std::make_shared(getSampleBlock()) : streams.front(); + auto stream = streams.empty() ? std::make_shared(header) : streams.front(); if (!streams.empty()) { /// will throw if some columns not convertible - stream = std::make_shared(context, stream, getSampleBlock()); + stream = std::make_shared(context, stream, header); } return stream; }));