From 3c53dfd227be24a4f3189e15ee94db7d5b047e57 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 13 Sep 2019 18:41:09 +0300 Subject: [PATCH] Add processors to StorageMergeTree [WIP]. --- .../MergeTreeBaseSelectBlockInputStream.cpp | 107 ++++++++++++++---- .../MergeTreeBaseSelectBlockInputStream.h | 22 ++-- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 14 +-- .../MergeTree/MergeTreeDataSelectExecutor.h | 10 +- .../Storages/MergeTree/MergeTreeRangeReader.h | 2 +- .../MergeTreeThreadSelectBlockInputStream.cpp | 14 +-- .../MergeTreeThreadSelectBlockInputStream.h | 4 +- dbms/src/Storages/StorageMergeTree.cpp | 2 +- dbms/src/Storages/StorageMergeTree.h | 2 +- 9 files changed, 112 insertions(+), 65 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeBaseSelectBlockInputStream.cpp b/dbms/src/Storages/MergeTree/MergeTreeBaseSelectBlockInputStream.cpp index 0489182fe55..077e3ea0712 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBaseSelectBlockInputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeBaseSelectBlockInputStream.cpp @@ -19,7 +19,8 @@ namespace ErrorCodes } -MergeTreeBaseSelectBlockInputStream::MergeTreeBaseSelectBlockInputStream( +MergeTreeBaseSelectBlockInputProcessor::MergeTreeBaseSelectBlockInputProcessor( + Block header, const MergeTreeData & storage_, const PrewhereInfoPtr & prewhere_info_, UInt64 max_block_size_rows_, @@ -31,6 +32,7 @@ MergeTreeBaseSelectBlockInputStream::MergeTreeBaseSelectBlockInputStream( bool save_marks_in_cache_, const Names & virt_column_names_) : + ISource(getHeader(std::move(header), prewhere_info_, virt_column_names_)), storage(storage_), prewhere_info(prewhere_info_), max_block_size_rows(max_block_size_rows_), @@ -45,26 +47,27 @@ MergeTreeBaseSelectBlockInputStream::MergeTreeBaseSelectBlockInputStream( } -Block MergeTreeBaseSelectBlockInputStream::readImpl() +Chunk MergeTreeBaseSelectBlockInputProcessor::generate() { - Block res; - - while (!res && !isCancelled()) + while (!isCancelled()) { if ((!task || task->isFinished()) && !getNewTask()) - break; + return {}; - res = readFromPart(); + auto res = readFromPart(); - if (res) - injectVirtualColumns(res); + if (!res.hasNoRows()) + { + injectVirtualColumns(res, task.get(), virt_column_names); + return res; + } } - return res; + return {}; } -void MergeTreeBaseSelectBlockInputStream::initializeRangeReaders(MergeTreeReadTask & current_task) +void MergeTreeBaseSelectBlockInputProcessor::initializeRangeReaders(MergeTreeReadTask & current_task) { if (prewhere_info) { @@ -103,7 +106,7 @@ void MergeTreeBaseSelectBlockInputStream::initializeRangeReaders(MergeTreeReadTa } -Block MergeTreeBaseSelectBlockInputStream::readFromPartImpl() +Chunk MergeTreeBaseSelectBlockInputProcessor::readFromPartImpl() { if (task->size_predictor) task->size_predictor->startBlock(); @@ -160,7 +163,8 @@ Block MergeTreeBaseSelectBlockInputStream::readFromPartImpl() UInt64 num_filtered_rows = read_result.numReadRows() - read_result.block.rows(); - progressImpl({ read_result.numReadRows(), read_result.numBytesRead() }); + /// TODO + /// progressImpl({ read_result.numReadRows(), read_result.numBytesRead() }); if (task->size_predictor) { @@ -177,13 +181,14 @@ Block MergeTreeBaseSelectBlockInputStream::readFromPartImpl() column.column = column.column->convertToFullColumnIfConst(); } - read_result.block.checkNumberOfRows(); + UInt64 num_rows = read_result.columns.empty() ? 0 + : read_result.columns[0]->size(); - return read_result.block; + return Chunk(std::move(read_result.columns), num_rows); } -Block MergeTreeBaseSelectBlockInputStream::readFromPart() +Chunk MergeTreeBaseSelectBlockInputProcessor::readFromPart() { if (!task->range_reader.isInitialized()) initializeRangeReaders(*task); @@ -192,15 +197,18 @@ Block MergeTreeBaseSelectBlockInputStream::readFromPart() } -void MergeTreeBaseSelectBlockInputStream::injectVirtualColumns(Block & block) const +template +static void injectVirtualColumnsImpl(size_t rows, InsertCallback & callback, MergeTreeReadTask * task, const Names & virtual_columns) { /// add virtual columns /// Except _sample_factor, which is added from the outside. - if (!virt_column_names.empty()) + if (!virtual_columns.empty()) { - const auto rows = block.rows(); + if (unlikely(rows && !task)) + throw Exception("Cannot insert virtual columns to non-empty chunk without specified task.", + ErrorCodes::LOGICAL_ERROR); - for (const auto & virt_column_name : virt_column_names) + for (const auto & virt_column_name : virtual_columns) { if (virt_column_name == "_part") { @@ -210,7 +218,7 @@ void MergeTreeBaseSelectBlockInputStream::injectVirtualColumns(Block & block) co else column = DataTypeString().createColumn(); - block.insert({ column, std::make_shared(), virt_column_name}); + callback.template insert(column, virt_column_name); } else if (virt_column_name == "_part_index") { @@ -220,7 +228,7 @@ void MergeTreeBaseSelectBlockInputStream::injectVirtualColumns(Block & block) co else column = DataTypeUInt64().createColumn(); - block.insert({ column, std::make_shared(), virt_column_name}); + callback.template insert(column, virt_column_name); } else if (virt_column_name == "_partition_id") { @@ -230,14 +238,55 @@ void MergeTreeBaseSelectBlockInputStream::injectVirtualColumns(Block & block) co else column = DataTypeString().createColumn(); - block.insert({ column, std::make_shared(), virt_column_name}); + callback.template insert(column, virt_column_name); } } } } +namespace +{ + struct InsertIntoBlockCallback + { + template + void insert(const ColumnPtr & column, const String & name) + { + block.insert({column, std::make_shared(), name}); + } -void MergeTreeBaseSelectBlockInputStream::executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info) + Block & block; + }; + + struct InsertIntoColumnsCallback + { + template + void insert(const ColumnPtr & column, const String &) + { + columns.push_back(column); + } + + Columns & columns; + }; +} + +void MergeTreeBaseSelectBlockInputProcessor::injectVirtualColumns(Block & block, MergeTreeReadTask * task, const Names & virtual_columns) +{ + InsertIntoBlockCallback callback { block }; + injectVirtualColumnsImpl(block.rows(), callback, task, virtual_columns); +} + +void MergeTreeBaseSelectBlockInputProcessor::injectVirtualColumns(Chunk & chunk, MergeTreeReadTask * task, const Names & virtual_columns) +{ + UInt64 num_rows = chunk.getNumRows(); + auto columns = chunk.detachColumns(); + + InsertIntoColumnsCallback callback { columns }; + injectVirtualColumnsImpl(num_rows, callback, task, virtual_columns); + + chunk.setColumns(columns, num_rows); +} + +void MergeTreeBaseSelectBlockInputProcessor::executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info) { if (prewhere_info) { @@ -253,7 +302,15 @@ void MergeTreeBaseSelectBlockInputStream::executePrewhereActions(Block & block, } } +Block MergeTreeBaseSelectBlockInputProcessor::getHeader( + Block block, const PrewhereInfoPtr & prewhere_info, const Names & virtual_columns) +{ + executePrewhereActions(block, prewhere_info); + injectVirtualColumns(block, nullptr, virtual_columns); + return block; +} -MergeTreeBaseSelectBlockInputStream::~MergeTreeBaseSelectBlockInputStream() = default; + +MergeTreeBaseSelectBlockInputProcessor::~MergeTreeBaseSelectBlockInputProcessor() = default; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeBaseSelectBlockInputStream.h b/dbms/src/Storages/MergeTree/MergeTreeBaseSelectBlockInputStream.h index 640f73652e4..0abbb2d001c 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBaseSelectBlockInputStream.h +++ b/dbms/src/Storages/MergeTree/MergeTreeBaseSelectBlockInputStream.h @@ -5,6 +5,8 @@ #include #include +#include + namespace DB { @@ -14,10 +16,11 @@ class MarkCache; /// Base class for MergeTreeThreadSelectBlockInputStream and MergeTreeSelectBlockInputStream -class MergeTreeBaseSelectBlockInputStream : public IBlockInputStream +class MergeTreeBaseSelectBlockInputProcessor : public ISource { public: - MergeTreeBaseSelectBlockInputStream( + MergeTreeBaseSelectBlockInputProcessor( + Block header, const MergeTreeData & storage_, const PrewhereInfoPtr & prewhere_info_, UInt64 max_block_size_rows_, @@ -29,24 +32,23 @@ public: bool save_marks_in_cache_ = true, const Names & virt_column_names_ = {}); - ~MergeTreeBaseSelectBlockInputStream() override; + ~MergeTreeBaseSelectBlockInputProcessor() override; static void executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info); protected: - Block readImpl() final; + Chunk generate() final; /// Creates new this->task, and initilizes readers virtual bool getNewTask() = 0; - /// We will call progressImpl manually. - void progress(const Progress &) override {} + virtual Chunk readFromPart(); - virtual Block readFromPart(); + Chunk readFromPartImpl(); - Block readFromPartImpl(); - - void injectVirtualColumns(Block & block) const; + static void injectVirtualColumns(Block & block, MergeTreeReadTask * task, const Names & virtual_columns); + static void injectVirtualColumns(Chunk & chunk, MergeTreeReadTask * task, const Names & virtual_columns); + static Block getHeader(Block block, const PrewhereInfoPtr & prewhere_info, const Names & virtual_columns); void initializeRangeReaders(MergeTreeReadTask & task); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 40dc0bf6b52..95f76a4c7f7 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -141,7 +141,7 @@ static RelativeSize convertAbsoluteSampleSizeToRelative(const ASTPtr & node, siz } -BlockInputStreams MergeTreeDataSelectExecutor::read( +Pipes MergeTreeDataSelectExecutor::read( const Names & column_names_to_return, const SelectQueryInfo & query_info, const Context & context, @@ -154,7 +154,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( max_block_size, num_streams, max_block_numbers_to_read); } -BlockInputStreams MergeTreeDataSelectExecutor::readFromParts( +Pipes MergeTreeDataSelectExecutor::readFromParts( MergeTreeData::DataPartsVector parts, const Names & column_names_to_return, const SelectQueryInfo & query_info, @@ -565,7 +565,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts( ProfileEvents::increment(ProfileEvents::SelectedRanges, sum_ranges); ProfileEvents::increment(ProfileEvents::SelectedMarks, sum_marks); - BlockInputStreams res; + Pipes res; if (select.final()) { @@ -658,7 +658,7 @@ size_t roundRowsOrBytesToMarks( } -BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams( +Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams( RangesInDataParts && parts, size_t num_streams, const Names & column_names, @@ -707,7 +707,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams( if (sum_marks > max_marks_to_use_cache) use_uncompressed_cache = false; - BlockInputStreams res; + Pipes res; if (sum_marks > 0 && settings.merge_tree_uniform_read_distribution == 1) { @@ -817,7 +817,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams( return res; } -BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( +Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( RangesInDataParts && parts, size_t num_streams, const Names & column_names, @@ -1026,7 +1026,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithO } -BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( +Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( RangesInDataParts && parts, const Names & column_names, UInt64 max_block_size, diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h index 44857799d01..9b46b663ab2 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h @@ -24,7 +24,7 @@ public: */ using PartitionIdToMaxBlock = std::unordered_map; - BlockInputStreams read( + Pipes read( const Names & column_names, const SelectQueryInfo & query_info, const Context & context, @@ -32,7 +32,7 @@ public: unsigned num_streams, const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const; - BlockInputStreams readFromParts( + Pipes readFromParts( MergeTreeData::DataPartsVector parts, const Names & column_names, const SelectQueryInfo & query_info, @@ -46,7 +46,7 @@ private: Logger * log; - BlockInputStreams spreadMarkRangesAmongStreams( + Pipes spreadMarkRangesAmongStreams( RangesInDataParts && parts, size_t num_streams, const Names & column_names, @@ -56,7 +56,7 @@ private: const Names & virt_columns, const Settings & settings) const; - BlockInputStreams spreadMarkRangesAmongStreamsWithOrder( + Pipes spreadMarkRangesAmongStreamsWithOrder( RangesInDataParts && parts, size_t num_streams, const Names & column_names, @@ -67,7 +67,7 @@ private: const Names & virt_columns, const Settings & settings) const; - BlockInputStreams spreadMarkRangesAmongStreamsFinal( + Pipes spreadMarkRangesAmongStreamsFinal( RangesInDataParts && parts, const Names & column_names, UInt64 max_block_size, diff --git a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.h b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.h index 0eae69ee17e..4261509d7fc 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.h +++ b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.h @@ -157,7 +157,7 @@ public: void addNumBytesRead(size_t count) { num_bytes_read += count; } - Block block; + Columns columns; private: RangesInfo started_ranges; diff --git a/dbms/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputStream.cpp b/dbms/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputStream.cpp index 69cf173212d..cd6efa6b7d1 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputStream.cpp @@ -20,7 +20,7 @@ MergeTreeThreadSelectBlockInputStream::MergeTreeThreadSelectBlockInputStream( const Settings & settings, const Names & virt_column_names_) : - MergeTreeBaseSelectBlockInputStream{storage_, prewhere_info_, max_block_size_rows_, + MergeTreeBaseSelectBlockInputProcessor{pool->getHeader(), storage_, prewhere_info_, max_block_size_rows_, preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, use_uncompressed_cache_, true, virt_column_names_}, thread{thread_}, @@ -38,19 +38,9 @@ MergeTreeThreadSelectBlockInputStream::MergeTreeThreadSelectBlockInputStream( else min_marks_to_read = min_marks_to_read_; - ordered_names = getHeader().getNames(); + ordered_names = getPort().getHeader().getNames(); } - -Block MergeTreeThreadSelectBlockInputStream::getHeader() const -{ - auto res = pool->getHeader(); - executePrewhereActions(res, prewhere_info); - injectVirtualColumns(res); - return res; -} - - /// Requests read task from MergeTreeReadPool and signals whether it got one bool MergeTreeThreadSelectBlockInputStream::getNewTask() { diff --git a/dbms/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputStream.h b/dbms/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputStream.h index 3c7dfb7927d..9603d21fb33 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputStream.h +++ b/dbms/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputStream.h @@ -11,7 +11,7 @@ class MergeTreeReadPool; /** Used in conjunction with MergeTreeReadPool, asking it for more work to do and performing whatever reads it is asked * to perform. */ -class MergeTreeThreadSelectBlockInputStream : public MergeTreeBaseSelectBlockInputStream +class MergeTreeThreadSelectBlockInputStream : public MergeTreeBaseSelectBlockInputProcessor { public: MergeTreeThreadSelectBlockInputStream( @@ -31,8 +31,6 @@ public: ~MergeTreeThreadSelectBlockInputStream() override; - Block getHeader() const override; - protected: /// Requests read task from MergeTreeReadPool and signals whether it got one bool getNewTask() override; diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 77c5a909f0c..bc50dec5b72 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -121,7 +121,7 @@ StorageMergeTree::~StorageMergeTree() shutdown(); } -BlockInputStreams StorageMergeTree::read( +Pipes StorageMergeTree::readWithProcessors( const Names & column_names, const SelectQueryInfo & query_info, const Context & context, diff --git a/dbms/src/Storages/StorageMergeTree.h b/dbms/src/Storages/StorageMergeTree.h index 04b20fda5b9..6d55b4655ce 100644 --- a/dbms/src/Storages/StorageMergeTree.h +++ b/dbms/src/Storages/StorageMergeTree.h @@ -35,7 +35,7 @@ public: bool supportsIndexForIn() const override { return true; } - BlockInputStreams read( + Pipes readWithProcessors( const Names & column_names, const SelectQueryInfo & query_info, const Context & context,