From ab0aff87092c1cce8fdda1c476738bab3bae1ff0 Mon Sep 17 00:00:00 2001 From: Vitaliy Lyudvichenko Date: Thu, 16 Nov 2017 22:03:32 +0300 Subject: [PATCH] Fixed SummingMergeTree. [#CLICKHOUSE-2] --- contrib/poco | 2 +- .../SummingSortedBlockInputStream.cpp | 95 +++++++++++-------- .../SummingSortedBlockInputStream.h | 18 ++-- .../00084_summing_merge_tree.reference | 2 + .../0_stateless/00084_summing_merge_tree.sql | 10 ++ 5 files changed, 76 insertions(+), 51 deletions(-) diff --git a/contrib/poco b/contrib/poco index e30352c2c24..1366df1c7e0 160000 --- a/contrib/poco +++ b/contrib/poco @@ -1 +1 @@ -Subproject commit e30352c2c24eebecbd82d41f7054d908ac7fdc37 +Subproject commit 1366df1c7e068bb2efd846bc8dc8e286b090904e diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp index 7d0f94e997e..bc5c5dfc7dc 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp @@ -36,7 +36,7 @@ String SummingSortedBlockInputStream::getID() const } -void SummingSortedBlockInputStream::insertCurrentRow(ColumnPlainPtrs & merged_columns) +void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(ColumnPlainPtrs & merged_columns, bool force_insertion) { for (auto & desc : columns_to_aggregate) { @@ -46,6 +46,19 @@ void SummingSortedBlockInputStream::insertCurrentRow(ColumnPlainPtrs & merged_co try { desc.function->insertResultInto(desc.state.data(), *desc.merged_column); + + /// Update zero status of current row + if (desc.column_numbers.size() == 1) + { + // Flag row as non-empty if at least one column number if non-zero + current_row_is_zero = current_row_is_zero && desc.merged_column->get64(desc.merged_column->size() - 1) == 0; + } + else + { + /// It is sumMap aggregate function. + /// Assume that the row isn't empty in this case (just because it is compatible with previous version) + current_row_is_zero = false; + } } catch (...) 
{ @@ -60,8 +73,22 @@ void SummingSortedBlockInputStream::insertCurrentRow(ColumnPlainPtrs & merged_co desc.merged_column->insertDefault(); } + /// If it is "zero" row and it is not the last row of the result block, then + /// rollback the insertion (at this moment we need rollback only cols from columns_to_aggregate) + if (!force_insertion && current_row_is_zero) + { + for (auto & desc : columns_to_aggregate) + desc.merged_column->popBack(1); + + return; + } + for (auto i : column_numbers_not_to_aggregate) merged_columns[i]->insert(current_row[i]); + + /// Update per-block and per-group flags + ++merged_rows; + output_is_non_empty = true; } @@ -155,6 +182,7 @@ Block SummingSortedBlockInputStream::readImpl() desc.column_numbers = {i}; desc.function = factory.get("sumWithOverflow", {column.type}); desc.function->setArguments({column.type}); + desc.add_function = desc.function->getAddressOfAddFunction(); desc.state.resize(desc.function->sizeOfData()); columns_to_aggregate.emplace_back(std::move(desc)); } @@ -237,6 +265,7 @@ Block SummingSortedBlockInputStream::readImpl() // Create summation for all value columns in the map desc.function = factory.get("sumMap", argument_types); desc.function->setArguments(argument_types); + desc.add_function = desc.function->getAddressOfAddFunction(); desc.state.resize(desc.function->sizeOfData()); columns_to_aggregate.emplace_back(std::move(desc)); } @@ -279,7 +308,7 @@ Block SummingSortedBlockInputStream::readImpl() template void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std::priority_queue & queue) { - size_t merged_rows = 0; + merged_rows = 0; /// Take the rows in needed order and put them in `merged_block` until rows no more than `max_block_size` while (!queue.empty()) @@ -308,12 +337,7 @@ void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std: if (key_differs) { /// Write the data for the previous group. 
- if (!current_row_is_zero) - { - ++merged_rows; - output_is_non_empty = true; - insertCurrentRow(merged_columns); - } + insertCurrentRowIfNeeded(merged_columns, false); current_key.swap(next_key); @@ -327,11 +351,12 @@ void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std: } // Start aggregations with current row - current_row_is_zero = !addRow(current_row, current); + addRow(current_row, current); + current_row_is_zero = true; } else { - current_row_is_zero = !addRow(current_row, current); + addRow(current_row, current); // Merge maps only for same rows for (auto & desc : maps_to_sum) @@ -355,12 +380,7 @@ void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std: /// We will write the data for the last group, if it is non-zero. /// If it is zero, and without it the output stream will be empty, we will write it anyway. - if (!current_row_is_zero || !output_is_non_empty) - { - ++merged_rows; /// Dead store (result is unused). Left for clarity. 
- insertCurrentRow(merged_columns); - } - + insertCurrentRowIfNeeded(merged_columns, !output_is_non_empty); finished = true; } @@ -449,38 +469,29 @@ bool SummingSortedBlockInputStream::mergeMap(const MapDescription & desc, Row & template -bool SummingSortedBlockInputStream::addRow(Row & row, TSortCursor & cursor) +void SummingSortedBlockInputStream::addRow(Row & row, TSortCursor & cursor) { - bool res = false; for (auto & desc : columns_to_aggregate) { - if (desc.created) - { - // Specialized case for unary functions - if (desc.column_numbers.size() == 1) - { - auto & col = cursor->all_columns[desc.column_numbers[0]]; - desc.function->add(desc.state.data(), &col, cursor->pos, nullptr); - // Flag row as non-empty if at least one column number if non-zero - // Note: This defers compaction of signed type rows that sum to zero by one merge - if (!res) - res = col->get64(cursor->pos) != 0; - } - else - { - // Gather all source columns into a vector - ConstColumnPlainPtrs columns(desc.column_numbers.size()); - for (size_t i = 0; i < desc.column_numbers.size(); ++i) - columns[i] = cursor->all_columns[desc.column_numbers[i]]; + if (!desc.created) + throw Exception("Logical error in SummingSortedBlockInputStream, there are no description", ErrorCodes::LOGICAL_ERROR); - desc.function->add(desc.state.data(), columns.data(), cursor->pos, nullptr); - // Note: we can't detect whether the aggregation result is non-empty here yet - res = true; - } + // Specialized case for unary functions + if (desc.column_numbers.size() == 1) + { + auto & col = cursor->all_columns[desc.column_numbers[0]]; + desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->pos, nullptr); + } + else + { + // Gather all source columns into a vector + ConstColumnPlainPtrs columns(desc.column_numbers.size()); + for (size_t i = 0; i < desc.column_numbers.size(); ++i) + columns[i] = cursor->all_columns[desc.column_numbers[i]]; + + desc.add_function(desc.function.get(),desc.state.data(), 
columns.data(), cursor->pos, nullptr); } } - - return res; } } diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.h b/dbms/src/DataStreams/SummingSortedBlockInputStream.h index f21d79d01a6..493e32d5326 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.h @@ -73,6 +73,7 @@ private: struct AggregateDescription { AggregateFunctionPtr function; + IAggregateFunction::AddFunc add_function = nullptr; std::vector column_numbers; ColumnPtr merged_column; std::vector state; @@ -100,9 +101,10 @@ private: RowRef next_key; /// The primary key of the next row. Row current_row; - bool current_row_is_zero = true; /// The current row is summed to zero, and it should be deleted. + bool current_row_is_zero = true; /// Are all summed columns zero (or empty)? It is updated incrementally. - bool output_is_non_empty = false; /// Have we given out at least one row as a result. + bool output_is_non_empty = false; /// Have we given out at least one row as a result. + size_t merged_rows = 0; /// Number of rows merged into current result block /** We support two different cursors - with Collation and without. * Templates are used instead of polymorphic SortCursor and calls to virtual functions. @@ -110,17 +112,17 @@ private: template void merge(ColumnPlainPtrs & merged_columns, std::priority_queue & queue); - /// Insert the summed row for the current group into the result. - void insertCurrentRow(ColumnPlainPtrs & merged_columns); + /// Insert the summed row for the current group into the result and update some of the per-block flags if the row is not "zero". + /// If force_insertion=true, then the row will be inserted even if it is "zero" + void insertCurrentRowIfNeeded(ColumnPlainPtrs & merged_columns, bool force_insertion); + /// Returns true if the merge result is not empty template bool mergeMap(const MapDescription & map, Row & row, TSortCursor & cursor); - /** Add the row under the cursor to the `row`. 
- * Returns false if the result is zero. - */ + // Add the row under the cursor to the `row`. template - bool addRow(Row & row, TSortCursor & cursor); + void addRow(Row & row, TSortCursor & cursor); }; } diff --git a/dbms/tests/queries/0_stateless/00084_summing_merge_tree.reference b/dbms/tests/queries/0_stateless/00084_summing_merge_tree.reference index 7142e1ca063..72f24941378 100644 --- a/dbms/tests/queries/0_stateless/00084_summing_merge_tree.reference +++ b/dbms/tests/queries/0_stateless/00084_summing_merge_tree.reference @@ -2,3 +2,5 @@ 2000-01-01 Hello 5 7 9 2000-01-01 Goodbye 1 2 3 2000-01-01 Hello 1 7 9 +0 2 +666 1 diff --git a/dbms/tests/queries/0_stateless/00084_summing_merge_tree.sql b/dbms/tests/queries/0_stateless/00084_summing_merge_tree.sql index 82b5117625f..c03092d0277 100644 --- a/dbms/tests/queries/0_stateless/00084_summing_merge_tree.sql +++ b/dbms/tests/queries/0_stateless/00084_summing_merge_tree.sql @@ -29,3 +29,13 @@ SELECT * FROM test.summing_merge_tree ORDER BY d, a, x, y, z; DROP TABLE test.summing_merge_tree; + +-- +DROP TABLE IF EXISTS test.summing; +CREATE TABLE test.summing (p Date, k UInt64, s UInt64) ENGINE = SummingMergeTree(p, k, 1); + +INSERT INTO test.summing (k, s) VALUES (0, 1); +INSERT INTO test.summing (k, s) VALUES (0, 1), (666, 1), (666, 0); +OPTIMIZE TABLE test.summing PARTITION 197001; + +SELECT k, s FROM test.summing ORDER BY k;