Fixed SummingMergeTree. [#CLICKHOUSE-2]

This commit is contained in:
Vitaliy Lyudvichenko 2017-11-16 22:03:32 +03:00 committed by alexey-milovidov
parent a2b8ae3100
commit ab0aff8709
5 changed files with 76 additions and 51 deletions

2
contrib/poco vendored

@ -1 +1 @@
Subproject commit e30352c2c24eebecbd82d41f7054d908ac7fdc37
Subproject commit 1366df1c7e068bb2efd846bc8dc8e286b090904e

View File

@ -36,7 +36,7 @@ String SummingSortedBlockInputStream::getID() const
}
void SummingSortedBlockInputStream::insertCurrentRow(ColumnPlainPtrs & merged_columns)
void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(ColumnPlainPtrs & merged_columns, bool force_insertion)
{
for (auto & desc : columns_to_aggregate)
{
@ -46,6 +46,19 @@ void SummingSortedBlockInputStream::insertCurrentRow(ColumnPlainPtrs & merged_co
try
{
desc.function->insertResultInto(desc.state.data(), *desc.merged_column);
/// Update zero status of current row
if (desc.column_numbers.size() == 1)
{
// Flag row as non-empty if at least one column number if non-zero
current_row_is_zero = current_row_is_zero && desc.merged_column->get64(desc.merged_column->size() - 1) == 0;
}
else
{
/// It is sumMap aggregate function.
/// Assume that the row isn't empty in this case (just because it is compatible with previous version)
current_row_is_zero = false;
}
}
catch (...)
{
@ -60,8 +73,22 @@ void SummingSortedBlockInputStream::insertCurrentRow(ColumnPlainPtrs & merged_co
desc.merged_column->insertDefault();
}
/// If it is "zero" row and it is not the last row of the result block, then
/// rollback the insertion (at this moment we need rollback only cols from columns_to_aggregate)
if (!force_insertion && current_row_is_zero)
{
for (auto & desc : columns_to_aggregate)
desc.merged_column->popBack(1);
return;
}
for (auto i : column_numbers_not_to_aggregate)
merged_columns[i]->insert(current_row[i]);
/// Update per-block and per-group flags
++merged_rows;
output_is_non_empty = true;
}
@ -155,6 +182,7 @@ Block SummingSortedBlockInputStream::readImpl()
desc.column_numbers = {i};
desc.function = factory.get("sumWithOverflow", {column.type});
desc.function->setArguments({column.type});
desc.add_function = desc.function->getAddressOfAddFunction();
desc.state.resize(desc.function->sizeOfData());
columns_to_aggregate.emplace_back(std::move(desc));
}
@ -237,6 +265,7 @@ Block SummingSortedBlockInputStream::readImpl()
// Create summation for all value columns in the map
desc.function = factory.get("sumMap", argument_types);
desc.function->setArguments(argument_types);
desc.add_function = desc.function->getAddressOfAddFunction();
desc.state.resize(desc.function->sizeOfData());
columns_to_aggregate.emplace_back(std::move(desc));
}
@ -279,7 +308,7 @@ Block SummingSortedBlockInputStream::readImpl()
template <typename TSortCursor>
void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
{
size_t merged_rows = 0;
merged_rows = 0;
/// Take the rows in needed order and put them in `merged_block` until rows no more than `max_block_size`
while (!queue.empty())
@ -308,12 +337,7 @@ void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std:
if (key_differs)
{
/// Write the data for the previous group.
if (!current_row_is_zero)
{
++merged_rows;
output_is_non_empty = true;
insertCurrentRow(merged_columns);
}
insertCurrentRowIfNeeded(merged_columns, false);
current_key.swap(next_key);
@ -327,11 +351,12 @@ void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std:
}
// Start aggregations with current row
current_row_is_zero = !addRow(current_row, current);
addRow(current_row, current);
current_row_is_zero = true;
}
else
{
current_row_is_zero = !addRow(current_row, current);
addRow(current_row, current);
// Merge maps only for same rows
for (auto & desc : maps_to_sum)
@ -355,12 +380,7 @@ void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std:
/// We will write the data for the last group, if it is non-zero.
/// If it is zero, and without it the output stream will be empty, we will write it anyway.
if (!current_row_is_zero || !output_is_non_empty)
{
++merged_rows; /// Dead store (result is unused). Left for clarity.
insertCurrentRow(merged_columns);
}
insertCurrentRowIfNeeded(merged_columns, !output_is_non_empty);
finished = true;
}
@ -449,22 +469,18 @@ bool SummingSortedBlockInputStream::mergeMap(const MapDescription & desc, Row &
template <typename TSortCursor>
bool SummingSortedBlockInputStream::addRow(Row & row, TSortCursor & cursor)
void SummingSortedBlockInputStream::addRow(Row & row, TSortCursor & cursor)
{
bool res = false;
for (auto & desc : columns_to_aggregate)
{
if (desc.created)
{
if (!desc.created)
throw Exception("Logical error in SummingSortedBlockInputStream, there are no description", ErrorCodes::LOGICAL_ERROR);
// Specialized case for unary functions
if (desc.column_numbers.size() == 1)
{
auto & col = cursor->all_columns[desc.column_numbers[0]];
desc.function->add(desc.state.data(), &col, cursor->pos, nullptr);
// Flag row as non-empty if at least one column number if non-zero
// Note: This defers compaction of signed type rows that sum to zero by one merge
if (!res)
res = col->get64(cursor->pos) != 0;
desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->pos, nullptr);
}
else
{
@ -473,14 +489,9 @@ bool SummingSortedBlockInputStream::addRow(Row & row, TSortCursor & cursor)
for (size_t i = 0; i < desc.column_numbers.size(); ++i)
columns[i] = cursor->all_columns[desc.column_numbers[i]];
desc.function->add(desc.state.data(), columns.data(), cursor->pos, nullptr);
// Note: we can't detect whether the aggregation result is non-empty here yet
res = true;
desc.add_function(desc.function.get(),desc.state.data(), columns.data(), cursor->pos, nullptr);
}
}
}
return res;
}
}

View File

@ -73,6 +73,7 @@ private:
struct AggregateDescription
{
AggregateFunctionPtr function;
IAggregateFunction::AddFunc add_function = nullptr;
std::vector<size_t> column_numbers;
ColumnPtr merged_column;
std::vector<char> state;
@ -100,9 +101,10 @@ private:
RowRef next_key; /// The primary key of the next row.
Row current_row;
bool current_row_is_zero = true; /// The current row is summed to zero, and it should be deleted.
bool current_row_is_zero = true; /// Are all summed columns zero (or empty)? It is updated incrementally.
bool output_is_non_empty = false; /// Have we given out at least one row as a result.
size_t merged_rows = 0; /// Number of rows merged into current result block
/** We support two different cursors - with Collation and without.
* Templates are used instead of polymorphic SortCursor and calls to virtual functions.
@ -110,17 +112,17 @@ private:
template <typename TSortCursor>
void merge(ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
/// Insert the summed row for the current group into the result.
void insertCurrentRow(ColumnPlainPtrs & merged_columns);
/// Insert the summed row for the current group into the result and updates some of per-block flags if the row is not "zero".
/// If force_insertion=true, then the row will be inserted even if it is "zero"
void insertCurrentRowIfNeeded(ColumnPlainPtrs & merged_columns, bool force_insertion);
/// Returns true is merge result is not empty
template <typename TSortCursor>
bool mergeMap(const MapDescription & map, Row & row, TSortCursor & cursor);
/** Add the row under the cursor to the `row`.
* Returns false if the result is zero.
*/
// Add the row under the cursor to the `row`.
template <typename TSortCursor>
bool addRow(Row & row, TSortCursor & cursor);
void addRow(Row & row, TSortCursor & cursor);
};
}

View File

@ -2,3 +2,5 @@
2000-01-01 Hello 5 7 9
2000-01-01 Goodbye 1 2 3
2000-01-01 Hello 1 7 9
0 2
666 1

View File

@ -29,3 +29,13 @@ SELECT * FROM test.summing_merge_tree ORDER BY d, a, x, y, z;
DROP TABLE test.summing_merge_tree;
--
DROP TABLE IF EXISTS test.summing;
CREATE TABLE test.summing (p Date, k UInt64, s UInt64) ENGINE = SummingMergeTree(p, k, 1);
INSERT INTO test.summing (k, s) VALUES (0, 1);
INSERT INTO test.summing (k, s) VALUES (0, 1), (666, 1), (666, 0);
OPTIMIZE TABLE test.summing PARTITION 197001;
SELECT k, s FROM test.summing ORDER BY k;