add comments; count exact number of inserted rows [#METR-23881]

This commit is contained in:
artpaul 2016-12-30 18:40:12 +05:00
parent 273e58f8d7
commit c74b8e2366
2 changed files with 21 additions and 8 deletions

View File

@ -9,6 +9,12 @@
namespace DB namespace DB
{ {
/** Implements the LIMIT BY clause, which can be used to obtain a "top N by subgroup".
*
* For example, if you have table T like this (Num: 1 1 3 3 3 4 4 5 7 7 7 7),
* the query SELECT Num FROM T LIMIT 2 BY Num
* will give you the following result: (Num: 1 1 3 3 4 4 5 7 7).
*/
class LimitByBlockInputStream : public IProfilingBlockInputStream class LimitByBlockInputStream : public IProfilingBlockInputStream
{ {
public: public:
@ -27,8 +33,8 @@ private:
private: private:
using MapHashed = HashMap<UInt128, UInt64, UInt128TrivialHash>; using MapHashed = HashMap<UInt128, UInt64, UInt128TrivialHash>;
Names columns_names; const Names columns_names;
size_t group_size; const size_t group_size;
MapHashed keys_counts; MapHashed keys_counts;
}; };

View File

@ -19,6 +19,8 @@ String LimitByBlockInputStream::getID() const
Block LimitByBlockInputStream::readImpl() Block LimitByBlockInputStream::readImpl()
{ {
/// Keep reading until the end of the stream or until
/// a block containing some new records is obtained.
while (true) while (true)
{ {
Block block = children[0]->read(); Block block = children[0]->read();
@ -28,7 +30,7 @@ Block LimitByBlockInputStream::readImpl()
const ConstColumnPlainPtrs column_ptrs(getKeyColumns(block)); const ConstColumnPlainPtrs column_ptrs(getKeyColumns(block));
const size_t rows = block.rows(); const size_t rows = block.rows();
IColumn::Filter filter(rows); IColumn::Filter filter(rows);
bool inserted = false; size_t inserted_count = 0;
for (size_t i = 0; i < rows; ++i) for (size_t i = 0; i < rows; ++i)
{ {
@ -40,17 +42,22 @@ Block LimitByBlockInputStream::readImpl()
hash.get128(key.first, key.second); hash.get128(key.first, key.second);
const bool valid = (keys_counts[key]++ < group_size); if (keys_counts[key]++ < group_size)
filter[i] = valid; {
inserted |= valid; inserted_count++;
filter[i] = 1;
}
else
filter[i] = 0;
} }
if (!inserted) /// Just go to the next block if there aren't any new records in the current one.
if (!inserted_count)
continue; continue;
size_t all_columns = block.columns(); size_t all_columns = block.columns();
for (size_t i = 0; i < all_columns; ++i) for (size_t i = 0; i < all_columns; ++i)
block.getByPosition(i).column = block.getByPosition(i).column->filter(filter, -1); block.getByPosition(i).column = block.getByPosition(i).column->filter(filter, inserted_count);
return block; return block;
} }