From c74b8e236686d98b194224269f655be425963738 Mon Sep 17 00:00:00 2001 From: artpaul Date: Fri, 30 Dec 2016 18:40:12 +0500 Subject: [PATCH] add comments; count exact number of inserted rows [#METR-23881] --- .../DB/DataStreams/LimitByBlockInputStream.h | 10 ++++++++-- .../DataStreams/LimitByBlockInputStream.cpp | 19 +++++++++++++------ 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/dbms/include/DB/DataStreams/LimitByBlockInputStream.h b/dbms/include/DB/DataStreams/LimitByBlockInputStream.h index 796dd124c36..a9224daecb1 100644 --- a/dbms/include/DB/DataStreams/LimitByBlockInputStream.h +++ b/dbms/include/DB/DataStreams/LimitByBlockInputStream.h @@ -9,6 +9,12 @@ namespace DB { +/** Implements LIMIT BY clause witch can be used to obtain a "top N by subgroup". + * + * For example, if you have table T like this (Num: 1 1 3 3 3 4 4 5 7 7 7 7), + * the query SELECT Num FROM T LIMIT 2 BY Num + * will give you the following result: (Num: 1 1 3 3 4 4 5 7 7). + */ class LimitByBlockInputStream : public IProfilingBlockInputStream { public: @@ -27,8 +33,8 @@ private: private: using MapHashed = HashMap; - Names columns_names; - size_t group_size; + const Names columns_names; + const size_t group_size; MapHashed keys_counts; }; diff --git a/dbms/src/DataStreams/LimitByBlockInputStream.cpp b/dbms/src/DataStreams/LimitByBlockInputStream.cpp index b0df3e9be36..ea39fb7226c 100644 --- a/dbms/src/DataStreams/LimitByBlockInputStream.cpp +++ b/dbms/src/DataStreams/LimitByBlockInputStream.cpp @@ -19,6 +19,8 @@ String LimitByBlockInputStream::getID() const Block LimitByBlockInputStream::readImpl() { + /// Execute until end of stream or until + /// a block with some new records will be gotten. while (true) { Block block = children[0]->read(); @@ -28,7 +30,7 @@ Block LimitByBlockInputStream::readImpl() const ConstColumnPlainPtrs column_ptrs(getKeyColumns(block)); const size_t rows = block.rows(); IColumn::Filter filter(rows); - bool inserted = false; + size_t inserted_count = 0; for (size_t i = 0; i < rows; ++i) { @@ -40,17 +42,22 @@ Block LimitByBlockInputStream::readImpl() hash.get128(key.first, key.second); - const bool valid = (keys_counts[key]++ < group_size); - filter[i] = valid; - inserted |= valid; + if (keys_counts[key]++ < group_size) + { + inserted_count++; + filter[i] = 1; + } + else + filter[i] = 0; } - if (!inserted) + /// Just go to the next block if there isn't any new records in the current one. + if (!inserted_count) continue; size_t all_columns = block.columns(); for (size_t i = 0; i < all_columns; ++i) - block.getByPosition(i).column = block.getByPosition(i).column->filter(filter, -1); + block.getByPosition(i).column = block.getByPosition(i).column->filter(filter, inserted_count); return block; }