Add SummingSortedTransform [part 3]

This commit is contained in:
Nikolai Kochetov 2020-04-01 14:45:02 +03:00
parent 61d6c61757
commit 2ecbf0b0bb
2 changed files with 217 additions and 2 deletions

View File

@ -286,6 +286,43 @@ namespace
return columns;
}
void finalizeChunk(
Chunk & chunk, size_t num_result_columns,
const SummingSortedTransform::ColumnsDefinition & columns_definition)
{
size_t num_rows = chunk.getNumRows();
auto columns = chunk.detachColumns();
Columns res_columns(num_result_columns);
size_t next_column = 0;
for (auto & desc : columns_definition.columns_to_aggregate)
{
auto column = std::move(columns[next_column]);
++next_column;
if (!desc.is_agg_func_type && isTuple(desc.function->getReturnType()))
{
/// Unpack tuple into block.
size_t tuple_size = desc.column_numbers.size();
for (size_t i = 0; i < tuple_size; ++i)
res_columns[desc.column_numbers[i]] = assert_cast<const ColumnTuple &>(*column).getColumnPtr(i);
}
else
res_columns[desc.column_numbers[0]] = std::move(column);
}
for (auto column_number : columns_definition.column_numbers_not_to_aggregate)
{
auto column = std::move(columns[next_column]);
++next_column;
res_columns[column_number] = std::move(column);
}
chunk.setColumns(std::move(res_columns), num_rows);
}
}
SummingSortedTransform::SummingSortedTransform(
@ -302,4 +339,151 @@ SummingSortedTransform::SummingSortedTransform(
current_row.resize(num_columns);
}
void SummingSortedTransform::initializeInputs()
{
queue = SortingHeap<SortCursor>(cursors);
is_queue_initialized = true;
}
void SummingSortedTransform::consume(Chunk chunk, size_t input_number)
{
updateCursor(std::move(chunk), input_number);
if (is_queue_initialized)
queue.push(cursors[input_number]);
}
void SummingSortedTransform::updateCursor(Chunk chunk, size_t source_num)
{
auto num_rows = chunk.getNumRows();
auto columns = chunk.detachColumns();
for (auto & column : columns)
column = column->convertToFullColumnIfConst();
chunk.setColumns(std::move(columns), num_rows);
auto & source_chunk = source_chunks[source_num];
if (source_chunk)
{
/// Extend lifetime of last chunk.
last_chunk = std::move(source_chunk);
last_chunk_sort_columns = std::move(cursors[source_num].all_columns);
source_chunk = std::move(chunk);
cursors[source_num].reset(source_chunk.getColumns(), {});
}
else
{
if (cursors[source_num].has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
source_chunk = std::move(chunk);
cursors[source_num] = SortCursorImpl(source_chunk.getColumns(), description, source_num);
}
}
void SummingSortedTransform::work()
{
merge();
prepareOutputChunk(merged_data);
if (has_output_chunk)
finalizeChunk(output_chunk, getOutputs().back().getHeader().columns(), columns_definition);
}
void SummingSortedTransform::merge()
{
/// Take the rows in needed order and put them in `merged_columns` until rows no more than `max_block_size`
while (queue.isValid())
{
bool key_differs;
bool has_previous_group = !last_key.empty();
SortCursor current = queue.current();
{
RowRef current_key;
current_key.set(current);
if (!has_previous_group) /// The first key encountered.
{
key_differs = true;
current_row_is_zero = true;
}
else
key_differs = !last_key.hasEqualSortColumnsWith(current_key);
last_key = current_key;
last_chunk_sort_columns.clear();
}
if (key_differs)
{
if (has_previous_group)
/// Write the data for the previous group.
insertCurrentRowIfNeeded();
if (merged_data.hasEnoughRows())
{
/// The block is now full and the last row is calculated completely.
last_key.reset();
return;
}
setRow(current_row, current);
/// Reset aggregation states for next row
for (auto & desc : columns_definition.columns_to_aggregate)
desc.createState();
// Start aggregations with current row
addRow(current);
if (columns_definition.maps_to_sum.empty())
{
/// We have only columns_to_aggregate. The status of current row will be determined
/// in 'insertCurrentRowIfNeeded' method on the values of aggregate functions.
current_row_is_zero = true; // NOLINT
}
else
{
/// We have complex maps that will be summed with 'mergeMap' method.
/// The single row is considered non zero, and the status after merging with other rows
/// will be determined in the branch below (when key_differs == false).
current_row_is_zero = false; // NOLINT
}
}
else
{
addRow(current);
// Merge maps only for same rows
for (const auto & desc : columns_definition.maps_to_sum)
if (mergeMap(desc, current_row, current))
current_row_is_zero = false;
}
if (!current->isLast())
{
queue.next();
}
else
{
/// We get the next block from the corresponding source, if there is one.
queue.removeTop();
requestDataForInput(current.impl->order);
return;
}
}
/// We will write the data for the last group, if it is non-zero.
/// If it is zero, and without it the output stream will be empty, we will write it anyway.
insertCurrentRowIfNeeded();
last_chunk_sort_columns.clear();
is_finished = true;
}
}

View File

@ -103,6 +103,13 @@ public:
size_t getNumColumns() const { return column_numbers_not_to_aggregate.size() + columns_to_aggregate.size(); }
};
String getName() const override { return "SummingSortedTransform"; }
void work() override;
protected:
void initializeInputs() override;
void consume(Chunk chunk, size_t input_number) override;
private:
Row current_row;
bool current_row_is_zero = true; /// Are all summed columns zero (or empty)? It is updated incrementally.
@ -125,10 +132,34 @@ private:
struct RowRef
{
ColumnRawPtrs * sort_columns = nullptr; /// Point to sort_columns from SortCursor or last_chunk_sort_columns.
UInt64 row_number = 0;
UInt64 row_num = 0;
bool empty() const { return sort_columns == nullptr; }
void reset() { sort_columns = nullptr; }
void set(SortCursor & cursor)
{
sort_columns = &cursor.impl->sort_columns;
row_num = cursor.impl->pos;
}
bool hasEqualSortColumnsWith(const RowRef & other)
{
auto size = sort_columns->size();
for (size_t col_number = 0; col_number < size; ++col_number)
{
auto & cur_column = (*sort_columns)[col_number];
auto & other_column = (*other.sort_columns)[col_number];
if (0 != cur_column->compareAt(row_num, other.row_num, *other_column, 1))
return false;
}
return true;
}
};
RowRef last_row;
RowRef last_key;
SortingHeap<SortCursor> queue;
bool is_queue_initialized = false;