diff --git a/dbms/src/Storages/LiveView/StorageLiveView.cpp b/dbms/src/Storages/LiveView/StorageLiveView.cpp index db410eeb5e4..84ebb497243 100644 --- a/dbms/src/Storages/LiveView/StorageLiveView.cpp +++ b/dbms/src/Storages/LiveView/StorageLiveView.cpp @@ -95,6 +95,64 @@ static void extractDependentTable(ASTPtr & query, String & select_database_name, DB::ErrorCodes::LOGICAL_ERROR); } +BlocksPtrs StorageLiveView::collectMergeableBlocks(const Context & context) +{ + BlocksPtrs new_mergeable_blocks = std::make_shared>(); + BlocksPtr base_mergeable_blocks = std::make_shared(); + + InterpreterSelectQuery interpreter(mergeable_query, context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names()); + + auto view_mergeable_stream = std::make_shared(interpreter.execute().in); + + while (Block this_block = view_mergeable_stream->read()) + base_mergeable_blocks->push_back(this_block); + + new_mergeable_blocks->push_back(base_mergeable_blocks); + + mergeable_blocks = new_mergeable_blocks; +} + +BlockInputStreams blocksToInputStreams(BlocksPtrs blocks) +{ + BlockInputStreams streams; + + for (auto & blocks_ : *blocks) + { + if (blocks_->empty()) + continue; + + auto sample_block = blocks_->front().cloneEmpty(); + + BlockInputStreamPtr stream = std::make_shared(std::make_shared(blocks_), sample_block); + + streams.push_back(std::move(stream)); + } + return streams; +} + +/// Complete query using input streams from mergeable blocks +BlockInputStreamPtr StorageLiveView::completeQuery(BlockInputStreams from) +{ + auto block_context = std::make_unique(global_context); + block_context->makeQueryContext(); + + auto blocks_storage = StorageBlocks::createStorage(database_name, table_name, parent_storage->getColumns(), + std::move(from), QueryProcessingStage::WithMergeableState); + + block_context->addExternalTable(table_name + "_blocks", blocks_storage); + + InterpreterSelectQuery select(inner_blocks_query->clone(), *block_context, StoragePtr(), SelectQueryOptions(QueryProcessingStage::Complete)); + BlockInputStreamPtr data = std::make_shared(select.execute().in); + + /// Squashing is needed here because the view query can generate a lot of blocks + /// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY + /// and two-level aggregation is triggered). + data = std::make_shared( + data, global_context.getSettingsRef().min_insert_block_size_rows, + global_context.getSettingsRef().min_insert_block_size_bytes); + + return data; +} void StorageLiveView::writeIntoLiveView( StorageLiveView & live_view, @@ -132,41 +190,25 @@ void StorageLiveView::writeIntoLiveView( mergeable_blocks = live_view.getMergeableBlocks(); if (!mergeable_blocks || mergeable_blocks->size() >= context.getGlobalContext().getSettingsRef().max_live_view_insert_blocks_before_refresh) { - mergeable_blocks = std::make_shared>(); - BlocksPtr base_mergeable_blocks = std::make_shared(); - InterpreterSelectQuery interpreter(mergeable_query, context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names()); - auto view_mergeable_stream = std::make_shared( - interpreter.execute().in); - while (Block this_block = view_mergeable_stream->read()) - base_mergeable_blocks->push_back(this_block); - mergeable_blocks->push_back(base_mergeable_blocks); - live_view.setMergeableBlocks(mergeable_blocks); - - /// Create from streams - for (auto & blocks_ : *mergeable_blocks) - { - if (blocks_->empty()) - continue; - auto sample_block = blocks_->front().cloneEmpty(); - BlockInputStreamPtr stream = std::make_shared(std::make_shared(blocks_), sample_block); - from.push_back(std::move(stream)); - } - + mergeable_blocks = live_view.collectMergeableBlocks(context); + from = live_view.blocksToInputStreams(mergeable_blocks); is_block_processed = true; } } - auto parent_storage = context.getTable(live_view.getSelectDatabaseName(), live_view.getSelectTableName()); - if (!is_block_processed) { BlockInputStreams streams = {std::make_shared(block)}; + auto blocks_storage = StorageBlocks::createStorage(live_view.database_name, live_view.table_name, - parent_storage->getColumns(), std::move(streams), QueryProcessingStage::FetchColumns); + live_view.getParentStorage()->getColumns(), std::move(streams), QueryProcessingStage::FetchColumns); + InterpreterSelectQuery select_block(mergeable_query, context, blocks_storage, QueryProcessingStage::WithMergeableState); + auto data_mergeable_stream = std::make_shared( select_block.execute().in); + while (Block this_block = data_mergeable_stream->read()) new_mergeable_blocks->push_back(this_block); @@ -178,31 +220,11 @@ void StorageLiveView::writeIntoLiveView( mergeable_blocks = live_view.getMergeableBlocks(); mergeable_blocks->push_back(new_mergeable_blocks); - - /// Create from streams - for (auto & blocks_ : *mergeable_blocks) - { - if (blocks_->empty()) - continue; - auto sample_block = blocks_->front().cloneEmpty(); - BlockInputStreamPtr stream = std::make_shared(std::make_shared(blocks_), sample_block); - from.push_back(std::move(stream)); - } + from = live_view.blocksToInputStreams(mergeable_blocks); } } - auto blocks_storage = StorageBlocks::createStorage(live_view.database_name, live_view.table_name, parent_storage->getColumns(), std::move(from), QueryProcessingStage::WithMergeableState); - block_context->addExternalTable(live_view.table_name + "_blocks", blocks_storage); - - InterpreterSelectQuery select(live_view.getInnerBlocksQuery(), *block_context, StoragePtr(), SelectQueryOptions(QueryProcessingStage::Complete)); - BlockInputStreamPtr data = std::make_shared(select.execute().in); - - /// Squashing is needed here because the view query can generate a lot of blocks - /// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY - /// and two-level aggregation is triggered). - data = std::make_shared( - data, context.getGlobalContext().getSettingsRef().min_insert_block_size_rows, context.getGlobalContext().getSettingsRef().min_insert_block_size_bytes); - + BlockInputStreamPtr data = completeQuery(std::move(from)); copyData(*data, *output); } @@ -247,6 +269,8 @@ StorageLiveView::StorageLiveView( DatabaseAndTableName(select_database_name, select_table_name), DatabaseAndTableName(database_name, table_name)); + parent_storage = local_context.getTable(select_database_name, select_table_name); + is_temporary = query.temporary; temporary_live_view_timeout = local_context.getSettingsRef().temporary_live_view_timeout.totalSeconds(); diff --git a/dbms/src/Storages/LiveView/StorageLiveView.h b/dbms/src/Storages/LiveView/StorageLiveView.h index a5b0f15e879..46e5008ece3 100644 --- a/dbms/src/Storages/LiveView/StorageLiveView.h +++ b/dbms/src/Storages/LiveView/StorageLiveView.h @@ -45,6 +45,7 @@ public: String getDatabaseName() const override { return database_name; } String getSelectDatabaseName() const { return select_database_name; } String getSelectTableName() const { return select_table_name; } + StoragePtr getParentStorage() const { return parent_storage; } NameAndTypePair getColumn(const String & column_name) const override; bool hasColumn(const String & column_name) const override; @@ -139,6 +140,8 @@ public: std::shared_ptr getBlocksPtr() { return blocks_ptr; } BlocksPtrs getMergeableBlocks() { return mergeable_blocks; } + /// collect and set mergeable blocks. Must be called holding mutex + BlocksPtrs collectMergeableBlocks(const Context & context); void setMergeableBlocks(BlocksPtrs blocks) { mergeable_blocks = blocks; } std::shared_ptr getActivePtr() { return active_ptr; } @@ -147,6 +150,9 @@ public: Block getHeader() const; + /// convert blocks to input streams + static BlockInputStreams blocksToInputStreams(BlocksPtrs blocks); + static void writeIntoLiveView( StorageLiveView & live_view, const Block & block, @@ -162,6 +168,7 @@ private: ASTPtr inner_blocks_query; /// query over the mergeable blocks to produce final result Context & global_context; std::unique_ptr live_view_context; + StoragePtr parent_storage; bool is_temporary = false; /// Mutex to protect access to sample block