diff --git a/dbms/src/Storages/LiveView/StorageLiveView.cpp b/dbms/src/Storages/LiveView/StorageLiveView.cpp index 84ebb497243..e847c9b8b93 100644 --- a/dbms/src/Storages/LiveView/StorageLiveView.cpp +++ b/dbms/src/Storages/LiveView/StorageLiveView.cpp @@ -97,6 +97,11 @@ static void extractDependentTable(ASTPtr & query, String & select_database_name, BlocksPtrs StorageLiveView::collectMergeableBlocks(const Context & context) { + ASTPtr mergeable_query = inner_query; + + if (inner_subquery) + mergeable_query = inner_subquery; + BlocksPtrs new_mergeable_blocks = std::make_shared>(); BlocksPtr base_mergeable_blocks = std::make_shared(); @@ -110,9 +115,11 @@ BlocksPtrs StorageLiveView::collectMergeableBlocks(const Context & context) new_mergeable_blocks->push_back(base_mergeable_blocks); mergeable_blocks = new_mergeable_blocks; + + return mergeable_blocks; } -BlockInputStreams blocksToInputStreams(BlocksPtrs blocks) +BlockInputStreams StorageLiveView::blocksToInputStreams(BlocksPtrs blocks) { BlockInputStreams streams; @@ -131,7 +138,7 @@ BlockInputStreams blocksToInputStreams(BlocksPtrs blocks) } /// Complete query using input streams from mergeable blocks -BlockInputStreamPtr StorageLiveView::completeQuery(BlockInputStreams from) +BlockInputStreamPtr StorageLiveView::completeQuery(BlockInputStreams & from) { auto block_context = std::make_unique(global_context); block_context->makeQueryContext(); @@ -224,7 +231,7 @@ void StorageLiveView::writeIntoLiveView( } } - BlockInputStreamPtr data = completeQuery(std::move(from)); + BlockInputStreamPtr data = live_view.completeQuery(from); copyData(*data, *output); } diff --git a/dbms/src/Storages/LiveView/StorageLiveView.h b/dbms/src/Storages/LiveView/StorageLiveView.h index 46e5008ece3..49bff4bf2ca 100644 --- a/dbms/src/Storages/LiveView/StorageLiveView.h +++ b/dbms/src/Storages/LiveView/StorageLiveView.h @@ -140,8 +140,12 @@ public: std::shared_ptr getBlocksPtr() { return blocks_ptr; } BlocksPtrs getMergeableBlocks() { return mergeable_blocks; } + /// collect and set mergeable blocks. Must be called holding mutex BlocksPtrs collectMergeableBlocks(const Context & context); + /// Complete query using input streams from mergeable blocks + BlockInputStreamPtr completeQuery(BlockInputStreams & from); + void setMergeableBlocks(BlocksPtrs blocks) { mergeable_blocks = blocks; } std::shared_ptr getActivePtr() { return active_ptr; }