Add AggregatingSortedTransform.

Nikolai Kochetov 2020-04-01 21:00:26 +03:00
parent 77e6714031
commit a0f163c761
8 changed files with 556 additions and 145 deletions

@@ -0,0 +1,252 @@
#include <Processors/Merges/AggregatingSortedTransform.h>
#include <Columns/ColumnAggregateFunction.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeCustomSimpleAggregateFunction.h>
#include <DataTypes/DataTypeLowCardinality.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace
{
AggregatingSortedTransform::ColumnsDefinition defineColumns(
const Block & header, const SortDescription & description)
{
AggregatingSortedTransform::ColumnsDefinition def = {};
size_t num_columns = header.columns();
/// Fill in the column numbers that need to be aggregated.
for (size_t i = 0; i < num_columns; ++i)
{
const ColumnWithTypeAndName & column = header.safeGetByPosition(i);
/// We leave only states of aggregate functions.
if (!dynamic_cast<const DataTypeAggregateFunction *>(column.type.get())
&& !dynamic_cast<const DataTypeCustomSimpleAggregateFunction *>(column.type->getCustomName()))
{
def.column_numbers_not_to_aggregate.push_back(i);
continue;
}
/// Included into PK?
auto it = description.begin();
for (; it != description.end(); ++it)
if (it->column_name == column.name || (it->column_name.empty() && it->column_number == i))
break;
if (it != description.end())
{
def.column_numbers_not_to_aggregate.push_back(i);
continue;
}
if (auto simple_aggr = dynamic_cast<const DataTypeCustomSimpleAggregateFunction *>(column.type->getCustomName()))
{
auto type = recursiveRemoveLowCardinality(column.type);
if (type.get() == column.type.get())
type = nullptr;
// simple aggregate function
AggregatingSortedTransform::SimpleAggregateDescription desc(simple_aggr->getFunction(), i, type);
if (desc.function->allocatesMemoryInArena())
def.allocates_memory_in_arena = true;
def.columns_to_simple_aggregate.emplace_back(std::move(desc));
}
else
{
// standard aggregate function
def.columns_to_aggregate.emplace_back(i);
}
}
return def;
}
}
AggregatingSortedTransform::AggregatingSortedTransform(
size_t num_inputs, const Block & header,
SortDescription description_, size_t max_block_size)
: IMergingTransform(num_inputs, header, header, true)
, columns_definition(defineColumns(header, description_))
, merged_data(header.cloneEmptyColumns(), false, max_block_size)
, description(std::move(description_))
, source_chunks(num_inputs)
, cursors(num_inputs)
{
merged_data.initAggregateDescription(columns_definition);
}
void AggregatingSortedTransform::initializeInputs()
{
queue = SortingHeap<SortCursor>(cursors);
is_queue_initialized = true;
}
void AggregatingSortedTransform::consume(Chunk chunk, size_t input_number)
{
updateCursor(std::move(chunk), input_number);
if (is_queue_initialized)
queue.push(cursors[input_number]);
}
void AggregatingSortedTransform::updateCursor(Chunk chunk, size_t source_num)
{
auto num_rows = chunk.getNumRows();
auto columns = chunk.detachColumns();
for (auto & column : columns)
column = column->convertToFullColumnIfConst();
for (auto & desc : columns_definition.columns_to_simple_aggregate)
if (desc.type_to_convert)
columns[desc.column_number] = recursiveRemoveLowCardinality(columns[desc.column_number]);
chunk.setColumns(std::move(columns), num_rows);
auto & source_chunk = source_chunks[source_num];
if (source_chunk)
{
/// Extend lifetime of last chunk.
last_chunk = std::move(source_chunk);
last_chunk_sort_columns = std::move(cursors[source_num].all_columns);
source_chunk = std::move(chunk);
cursors[source_num].reset(source_chunk.getColumns(), {});
}
else
{
if (cursors[source_num].has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
source_chunk = std::move(chunk);
cursors[source_num] = SortCursorImpl(source_chunk.getColumns(), description, source_num);
}
}
void AggregatingSortedTransform::work()
{
merge();
prepareOutputChunk(merged_data);
if (has_output_chunk)
{
size_t num_rows = output_chunk.getNumRows();
auto columns = output_chunk.detachColumns();
auto & header = getOutputs().back().getHeader();
for (auto & desc : columns_definition.columns_to_simple_aggregate)
{
if (desc.type_to_convert)
{
auto & from_type = header.getByPosition(desc.column_number).type;
auto & to_type = desc.type_to_convert;
columns[desc.column_number] = recursiveTypeConversion(columns[desc.column_number], from_type, to_type);
}
}
output_chunk.setColumns(std::move(columns), num_rows);
merged_data.initAggregateDescription(columns_definition);
}
}
void AggregatingSortedTransform::merge()
{
/// Take rows in the required order and put them into `merged_data`, while the number of rows is no more than `max_block_size`.
while (queue.isValid())
{
bool key_differs;
bool has_previous_group = !last_key.empty();
SortCursor current = queue.current();
{
detail::RowRef current_key;
current_key.set(current);
if (!has_previous_group) /// The first key encountered.
key_differs = true;
else
key_differs = !last_key.hasEqualSortColumnsWith(current_key);
last_key = current_key;
last_chunk_sort_columns.clear();
}
if (key_differs)
{
/// if there are enough rows accumulated and the last one is calculated completely
if (merged_data.hasEnoughRows())
{
/// Write the simple aggregation result for the previous group.
insertSimpleAggregationResult();
return;
}
/// We will write the data for the group. We copy the values of ordinary columns.
merged_data.insertRow(current->all_columns, current->pos,
columns_definition.column_numbers_not_to_aggregate);
/// Add the empty aggregation state to the aggregate columns. The state will be updated in the `addRow` function.
for (auto & column_to_aggregate : columns_definition.columns_to_aggregate)
column_to_aggregate.column->insertDefault();
/// Write the simple aggregation result for the previous group.
if (merged_data.mergedRows() > 0)
insertSimpleAggregationResult();
/// Reset simple aggregation states for next row
for (auto & desc : columns_definition.columns_to_simple_aggregate)
desc.createState();
if (columns_definition.allocates_memory_in_arena)
arena = std::make_unique<Arena>();
}
addRow(current);
if (!current->isLast())
{
queue.next();
}
else
{
/// We get the next block from the corresponding source, if there is one.
queue.removeTop();
requestDataForInput(current.impl->order);
return;
}
}
/// Write the simple aggregation result for the previous group.
if (merged_data.mergedRows() > 0)
insertSimpleAggregationResult();
last_chunk_sort_columns.clear();
is_finished = true;
}
void AggregatingSortedTransform::addRow(SortCursor & cursor)
{
for (auto & desc : columns_definition.columns_to_aggregate)
desc.column->insertMergeFrom(*cursor->all_columns[desc.column_number], cursor->pos);
for (auto & desc : columns_definition.columns_to_simple_aggregate)
{
auto & col = cursor->all_columns[desc.column_number];
desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->pos, arena.get());
}
}
void AggregatingSortedTransform::insertSimpleAggregationResult()
{
for (auto & desc : columns_definition.columns_to_simple_aggregate)
{
desc.function->insertResultInto(desc.state.data(), *desc.column);
desc.destroyState();
}
}
}
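
For orientation, merge() above is the classic k-way merge-and-aggregate loop: take the smallest row from a heap of sort cursors, start a new output group whenever the sort key changes, and otherwise fold the row into the open aggregation state. Below is a minimal standalone sketch of that pattern (plain STL; Cursor, Greater and the int64_t sum are illustrative stand-ins, not ClickHouse types or aggregate-function states):

#include <cstdint>
#include <iostream>
#include <queue>
#include <string>
#include <vector>

// One sorted input: (key, value) pairs, analogous to a chunk read through a SortCursor.
using Input = std::vector<std::pair<std::string, int64_t>>;

struct Cursor
{
    const Input * input;
    size_t pos = 0;

    const std::pair<std::string, int64_t> & current() const { return (*input)[pos]; }
    bool isLast() const { return pos + 1 == input->size(); }
};

// Heap comparator: the cursor with the smallest current key must end up on top.
struct Greater
{
    bool operator()(const Cursor & lhs, const Cursor & rhs) const
    {
        return lhs.current().first > rhs.current().first;
    }
};

int main()
{
    std::vector<Input> inputs =
    {
        {{"a", 1}, {"b", 2}, {"d", 7}},
        {{"a", 3}, {"c", 5}},
        {{"b", 4}, {"d", 1}},
    };

    std::priority_queue<Cursor, std::vector<Cursor>, Greater> queue;
    for (const auto & input : inputs)
        if (!input.empty())
            queue.push(Cursor{&input});

    std::string last_key;
    bool has_group = false;
    int64_t state = 0; // stand-in for an aggregate function state

    auto flush = [&]
    {
        if (has_group)
            std::cout << last_key << " -> " << state << '\n';
    };

    while (!queue.empty())
    {
        Cursor current = queue.top();
        queue.pop();

        const auto & [key, value] = current.current();
        if (!has_group || key != last_key)
        {
            flush();        // finalize the previous group
            last_key = key;
            state = 0;      // reset the aggregation state for the new key
            has_group = true;
        }
        state += value;     // addRow(): fold the current row into the state

        if (!current.isLast())
        {
            ++current.pos;  // queue.next(): advance this cursor and re-insert it
            queue.push(current);
        }
        // else: this input is exhausted; the transform would call requestDataForInput() here
    }
    flush();
}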

@@ -0,0 +1,162 @@
#pragma once
#include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/MergedData.h>
#include <Processors/Merges/RowRef.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <Common/AlignedBuffer.h>
#include <Core/SortDescription.h>
#include <Core/SortCursor.h>
#include <Core/Row.h>
namespace DB
{
class ColumnAggregateFunction;
class AggregatingSortedTransform : public IMergingTransform
{
public:
AggregatingSortedTransform(
size_t num_inputs, const Block & header,
SortDescription description_, size_t max_block_size);
struct SimpleAggregateDescription;
struct ColumnsDefinition
{
struct AggregateDescription
{
ColumnAggregateFunction * column = nullptr;
const size_t column_number = 0;
AggregateDescription() = default;
explicit AggregateDescription(size_t col_number) : column_number(col_number) {}
};
/// Numbers of columns that should not be aggregated (their values are copied as is).
ColumnNumbers column_numbers_not_to_aggregate;
std::vector<AggregateDescription> columns_to_aggregate;
std::vector<SimpleAggregateDescription> columns_to_simple_aggregate;
/// Does any SimpleAggregateFunction allocate memory in an arena?
bool allocates_memory_in_arena = false;
};
String getName() const override { return "AggregatingSortedTransform"; }
void work() override;
protected:
void initializeInputs() override;
void consume(Chunk chunk, size_t input_number) override;
private:
/// Specialization of MergedData for AggregatingSortedTransform. Inserts only data for non-aggregated columns.
struct AggregatingMergedData : public MergedData
{
public:
using MergedData::MergedData;
void insertRow(const ColumnRawPtrs & raw_columns, size_t row, const ColumnNumbers & column_numbers)
{
for (auto column_number : column_numbers)
columns[column_number]->insertFrom(*raw_columns[column_number], row);
++total_merged_rows;
++merged_rows;
/// TODO: sum_blocks_granularity += block_size;
}
/// Initialize aggregate descriptions with columns.
void initAggregateDescription(ColumnsDefinition & def)
{
for (auto & desc : def.columns_to_simple_aggregate)
desc.column = columns[desc.column_number].get();
for (auto & desc : def.columns_to_aggregate)
desc.column = typeid_cast<ColumnAggregateFunction *>(columns[desc.column_number].get());
}
};
ColumnsDefinition columns_definition;
AggregatingMergedData merged_data;
SortDescription description;
/// Chunks currently being merged.
std::vector<Chunk> source_chunks;
SortCursorImpls cursors;
/// In the merging algorithm, we need to compare the current sort key with the previous one,
/// so the sort columns of the last row have to be stored.
/// To do that, we extend the lifetime of the last chunk and its sort columns (taken from the corresponding sort cursor).
Chunk last_chunk;
ColumnRawPtrs last_chunk_sort_columns; /// Point to last_chunk if valid.
detail::RowRef last_key;
SortingHeap<SortCursor> queue;
bool is_queue_initialized = false;
/// Memory pool for SimpleAggregateFunction
/// (only when allocates_memory_in_arena == true).
std::unique_ptr<Arena> arena;
void merge();
void updateCursor(Chunk chunk, size_t source_num);
void addRow(SortCursor & cursor);
void insertSimpleAggregationResult();
public:
/// Stores information for aggregation of SimpleAggregateFunction columns
struct SimpleAggregateDescription
{
/// An aggregate function, e.g. 'anyLast' or 'sum'.
AggregateFunctionPtr function;
IAggregateFunction::AddFunc add_function = nullptr;
size_t column_number = 0;
IColumn * column = nullptr;
const DataTypePtr type_to_convert;
AlignedBuffer state;
bool created = false;
SimpleAggregateDescription(AggregateFunctionPtr function_, const size_t column_number_, DataTypePtr type)
: function(std::move(function_)), column_number(column_number_), type_to_convert(std::move(type))
{
add_function = function->getAddressOfAddFunction();
state.reset(function->sizeOfData(), function->alignOfData());
}
void createState()
{
if (created)
return;
function->create(state.data());
created = true;
}
void destroyState()
{
if (!created)
return;
function->destroy(state.data());
created = false;
}
/// Explicitly destroy aggregation state if the stream is terminated
~SimpleAggregateDescription()
{
destroyState();
}
SimpleAggregateDescription() = default;
SimpleAggregateDescription(SimpleAggregateDescription &&) = default;
SimpleAggregateDescription(const SimpleAggregateDescription &) = delete;
};
};
}
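
SimpleAggregateDescription in the header above keeps an opaque aggregate state in raw AlignedBuffer storage and drives it through an explicit create/add/insertResultInto/destroy protocol guarded by the `created` flag. Here is a rough standalone analogue of that lifecycle; MaxFunction and SimpleAggregateSketch are hand-rolled stand-ins, not ClickHouse's IAggregateFunction API, and only illustrate why createState()/destroyState() must be idempotent:

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <memory>
#include <new>

// Toy stand-in for an aggregate function: operates on an opaque, externally owned state buffer.
struct MaxFunction
{
    struct State { int64_t value; bool has_value; };

    size_t sizeOfData() const { return sizeof(State); }

    void create(char * place) const { new (place) State{0, false}; }
    void destroy(char * place) const { reinterpret_cast<State *>(place)->~State(); }

    void add(char * place, int64_t x) const
    {
        auto & state = *reinterpret_cast<State *>(place);
        if (!state.has_value || x > state.value)
            state = {x, true};
    }

    int64_t result(const char * place) const { return reinterpret_cast<const State *>(place)->value; }
};

// Mirrors the shape of SimpleAggregateDescription: a function plus raw state storage
// that is created before a group and destroyed after the result is emitted.
struct SimpleAggregateSketch
{
    MaxFunction function;
    std::unique_ptr<char[]> state;
    bool created = false;

    SimpleAggregateSketch() : state(new char[function.sizeOfData()]) {}

    void createState()
    {
        if (created)
            return;
        function.create(state.get());
        created = true;
    }

    void destroyState()
    {
        if (!created)
            return;
        function.destroy(state.get());
        created = false;
    }

    ~SimpleAggregateSketch() { destroyState(); }
};

int main()
{
    SimpleAggregateSketch desc;

    // One group: create the state, add the rows, emit the result, destroy the state.
    desc.createState();
    for (int64_t x : {3, 9, 4})
        desc.function.add(desc.state.get(), x);
    std::cout << "max = " << desc.function.result(desc.state.get()) << '\n';
    desc.destroyState();
}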

@@ -64,7 +64,7 @@ private:
SortingHeap<SortCursor> queue;
bool is_queue_initialized = false;
- using RowRef = detail::RowRef;
+ using RowRef = detail::RowRefWithOwnedChunk;
static constexpr size_t max_row_refs = 4; /// first_negative, last_positive, last, current.
RowRef first_negative_row;
RowRef last_positive_row;

@@ -13,7 +13,7 @@
namespace DB
{
- class ReplacingSortedTransform : public IMergingTransform
+ class ReplacingSortedTransform final : public IMergingTransform
{
public:
ReplacingSortedTransform(
@@ -50,7 +50,7 @@ private:
SortingHeap<SortCursor> queue;
bool is_queue_initialized = false;
- using RowRef = detail::RowRef;
+ using RowRef = detail::RowRefWithOwnedChunk;
static constexpr size_t max_row_refs = 3; /// last, current, selected.
RowRef last_row;
/// RowRef next_key; /// Primary key of next row.

@@ -103,10 +103,46 @@ inline void intrusive_ptr_release(SharedChunk * ptr)
}
/// This class represents a row in a chunk.
/// RowRef hold shared pointer to this chunk, possibly extending its life time.
struct RowRef
{
ColumnRawPtrs * sort_columns = nullptr; /// Point to sort_columns from SortCursor or last_chunk_sort_columns.
UInt64 row_num = 0;
bool empty() const { return sort_columns == nullptr; }
void reset() { sort_columns = nullptr; }
void set(SortCursor & cursor)
{
sort_columns = &cursor.impl->sort_columns;
row_num = cursor.impl->pos;
}
static bool checkEquals(const ColumnRawPtrs * left, size_t left_row, const ColumnRawPtrs * right, size_t right_row)
{
auto size = left->size();
for (size_t col_number = 0; col_number < size; ++col_number)
{
auto & cur_column = (*left)[col_number];
auto & other_column = (*right)[col_number];
if (0 != cur_column->compareAt(left_row, right_row, *other_column, 1))
return false;
}
return true;
}
bool hasEqualSortColumnsWith(const RowRef & other)
{
return checkEquals(sort_columns, row_num, other.sort_columns, other.row_num);
}
};
/// This class also represents a row in a chunk.
/// RowRefWithOwnedChunk holds a shared pointer to its chunk, possibly extending the chunk's lifetime.
/// It is needed, for example, in CollapsingTransform, where we need to store first negative row for current sort key.
/// We do not copy data itself, because it may be potentially changed for each row. Performance for `set` is important.
- struct RowRef
+ struct RowRefWithOwnedChunk
{
detail::SharedChunkPtr owned_chunk = nullptr;
@@ -114,7 +150,7 @@ struct RowRef
ColumnRawPtrs * sort_columns = nullptr;
UInt64 row_num = 0;
- void swap(RowRef & other)
+ void swap(RowRefWithOwnedChunk & other)
{
owned_chunk.swap(other.owned_chunk);
std::swap(all_columns, other.all_columns);
@@ -140,19 +176,9 @@ struct RowRef
sort_columns = &owned_chunk->sort_columns;
}
- bool hasEqualSortColumnsWith(const RowRef & other)
+ bool hasEqualSortColumnsWith(const RowRefWithOwnedChunk & other)
{
- auto size = sort_columns->size();
- for (size_t col_number = 0; col_number < size; ++col_number)
- {
- auto & cur_column = (*sort_columns)[col_number];
- auto & other_column = (*other.sort_columns)[col_number];
- if (0 != cur_column->compareAt(row_num, other.row_num, *other_column, 1))
- return false;
- }
- return true;
+ return RowRef::checkEquals(sort_columns, row_num, other.sort_columns, other.row_num);
}
};
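
The new detail::RowRef above compares rows purely through a pointer to the cursor's sort columns plus a row number, so it stays valid only while those columns are alive (hence last_chunk/last_chunk_sort_columns in the transforms). A small self-contained sketch of the same column-wise equality check; the toy Column::compareAt here stands in for IColumn::compareAt and is not ClickHouse code:

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

// A "column" of integers; compareAt mimics the shape of IColumn::compareAt for this toy type.
struct Column
{
    std::vector<int64_t> data;

    int compareAt(size_t lhs_row, size_t rhs_row, const Column & rhs) const
    {
        if (data[lhs_row] < rhs.data[rhs_row]) return -1;
        if (data[lhs_row] > rhs.data[rhs_row]) return 1;
        return 0;
    }
};

using ColumnRawPtrs = std::vector<const Column *>;

// Same shape as detail::RowRef: a pointer to the sort columns plus a row number.
struct RowRef
{
    const ColumnRawPtrs * sort_columns = nullptr;
    size_t row_num = 0;

    bool hasEqualSortColumnsWith(const RowRef & other) const
    {
        for (size_t i = 0; i < sort_columns->size(); ++i)
            if ((*sort_columns)[i]->compareAt(row_num, other.row_num, *(*other.sort_columns)[i]) != 0)
                return false;
        return true;
    }
};

int main()
{
    Column key1{{1, 1, 2}};
    Column key2{{10, 10, 10}};
    ColumnRawPtrs sort_columns{&key1, &key2};

    RowRef a{&sort_columns, 0};
    RowRef b{&sort_columns, 1};
    RowRef c{&sort_columns, 2};

    std::cout << a.hasEqualSortColumnsWith(b) << '\n'; // 1: same key (1, 10)
    std::cout << a.hasEqualSortColumnsWith(c) << '\n'; // 0: keys differ in the first column
}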

@ -168,7 +168,7 @@ namespace
std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name))
{
// Create aggregator to sum this column
- detail::AggregateDescription desc;
+ SummingSortedTransform::AggregateDescription desc;
desc.is_agg_func_type = is_agg_func;
desc.column_numbers = {i};
@ -211,7 +211,7 @@ namespace
}
DataTypes argument_types;
- detail::AggregateDescription desc;
+ SummingSortedTransform::AggregateDescription desc;
SummingSortedTransform::MapDescription map_desc;
column_num_it = map.second.begin();

@@ -2,6 +2,7 @@
#include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/MergedData.h>
#include <Processors/Merges/RowRef.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
@@ -13,8 +14,100 @@
namespace DB
{
- namespace detail
+ class SummingSortedTransform final : public IMergingTransform
{
public:
SummingSortedTransform(
size_t num_inputs, const Block & header,
SortDescription description_,
/// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
const Names & column_names_to_sum,
size_t max_block_size);
struct AggregateDescription;
/// Stores numbers of key-columns and value-columns.
struct MapDescription
{
std::vector<size_t> key_col_nums;
std::vector<size_t> val_col_nums;
};
struct ColumnsDefinition
{
/// Numbers of columns that are not summed (their values are copied as is).
ColumnNumbers column_numbers_not_to_aggregate;
/// Columns which should be aggregated.
std::vector<AggregateDescription> columns_to_aggregate;
/// Mapping for nested columns.
std::vector<MapDescription> maps_to_sum;
size_t getNumColumns() const { return column_numbers_not_to_aggregate.size() + columns_to_aggregate.size(); }
};
/// Specialization for SummingSortedTransform. Inserts only data for non-aggregated columns.
struct SummingMergedData : public MergedData
{
public:
using MergedData::MergedData;
void insertRow(const Row & row, const ColumnNumbers & column_numbers)
{
for (auto column_number : column_numbers)
columns[column_number]->insert(row[column_number]);
++total_merged_rows;
++merged_rows;
/// TODO: sum_blocks_granularity += block_size;
}
/// Initialize aggregate descriptions with columns.
void initAggregateDescription(std::vector<AggregateDescription> & columns_to_aggregate)
{
size_t num_columns = columns_to_aggregate.size();
for (size_t column_number = 0; column_number < num_columns; ++column_number)
columns_to_aggregate[column_number].merged_column = columns[column_number].get();
}
};
String getName() const override { return "SummingSortedTransform"; }
void work() override;
protected:
void initializeInputs() override;
void consume(Chunk chunk, size_t input_number) override;
private:
Row current_row;
bool current_row_is_zero = true; /// Are all summed columns zero (or empty)? It is updated incrementally.
ColumnsDefinition columns_definition;
SummingMergedData merged_data;
SortDescription description;
/// Chunks currently being merged.
std::vector<Chunk> source_chunks;
SortCursorImpls cursors;
/// In the merging algorithm, we need to compare the current sort key with the previous one,
/// so the sort columns of the last row have to be stored.
/// To do that, we extend the lifetime of the last chunk and its sort columns (taken from the corresponding sort cursor).
Chunk last_chunk;
ColumnRawPtrs last_chunk_sort_columns; /// Point to last_chunk if valid.
detail::RowRef last_key;
SortingHeap<SortCursor> queue;
bool is_queue_initialized = false;
void merge();
void updateCursor(Chunk chunk, size_t source_num);
void addRow(SortCursor & cursor);
void insertCurrentRowIfNeeded();
public:
/// Stores aggregation function, state, and columns to be used as function arguments.
struct AggregateDescription
{
@@ -66,128 +159,6 @@ namespace detail
AggregateDescription(AggregateDescription &&) = default;
AggregateDescription(const AggregateDescription &) = delete;
};
/// Specialization for SummingSortedTransform. Inserts only data for non-aggregated columns.
struct SummingMergedData : public MergedData
{
public:
using MergedData::MergedData;
void insertRow(const Row & row, const ColumnNumbers & column_numbers)
{
for (auto column_number :column_numbers)
columns[column_number]->insert(row[column_number]);
++total_merged_rows;
++merged_rows;
/// TODO: sum_blocks_granularity += block_size;
}
/// Initialize aggregate descriptions with columns.
void initAggregateDescription(std::vector<AggregateDescription> & columns_to_aggregate)
{
size_t num_columns = columns_to_aggregate.size();
for (size_t column_number = 0; column_number < num_columns; ++column_number)
columns_to_aggregate[column_number].merged_column = columns[column_number].get();
}
};
struct RowRef
{
ColumnRawPtrs * sort_columns = nullptr; /// Point to sort_columns from SortCursor or last_chunk_sort_columns.
UInt64 row_num = 0;
bool empty() const { return sort_columns == nullptr; }
void reset() { sort_columns = nullptr; }
void set(SortCursor & cursor)
{
sort_columns = &cursor.impl->sort_columns;
row_num = cursor.impl->pos;
}
bool hasEqualSortColumnsWith(const RowRef & other)
{
auto size = sort_columns->size();
for (size_t col_number = 0; col_number < size; ++col_number)
{
auto & cur_column = (*sort_columns)[col_number];
auto & other_column = (*other.sort_columns)[col_number];
if (0 != cur_column->compareAt(row_num, other.row_num, *other_column, 1))
return false;
}
return true;
}
};
}
class SummingSortedTransform : public IMergingTransform
{
public:
SummingSortedTransform(
size_t num_inputs, const Block & header,
SortDescription description_,
/// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
const Names & column_names_to_sum,
size_t max_block_size);
/// Stores numbers of key-columns and value-columns.
struct MapDescription
{
std::vector<size_t> key_col_nums;
std::vector<size_t> val_col_nums;
};
struct ColumnsDefinition
{
/// Columns with which values should be summed.
ColumnNumbers column_numbers_not_to_aggregate;
/// Columns which should be aggregated.
std::vector<detail::AggregateDescription> columns_to_aggregate;
/// Mapping for nested columns.
std::vector<MapDescription> maps_to_sum;
size_t getNumColumns() const { return column_numbers_not_to_aggregate.size() + columns_to_aggregate.size(); }
};
String getName() const override { return "SummingSortedTransform"; }
void work() override;
protected:
void initializeInputs() override;
void consume(Chunk chunk, size_t input_number) override;
private:
Row current_row;
bool current_row_is_zero = true; /// Are all summed columns zero (or empty)? It is updated incrementally.
ColumnsDefinition columns_definition;
detail::SummingMergedData merged_data;
SortDescription description;
/// Chunks currently being merged.
std::vector<Chunk> source_chunks;
SortCursorImpls cursors;
/// In merging algorithm, we need to compare current sort key with the last one.
/// So, sorting columns for last row needed to be stored.
/// In order to do it, we extend lifetime of last chunk and it's sort columns (from corresponding sort cursor).
Chunk last_chunk;
ColumnRawPtrs last_chunk_sort_columns; /// Point to last_chunk if valid.
detail::RowRef last_key;
SortingHeap<SortCursor> queue;
bool is_queue_initialized = false;
void merge();
void updateCursor(Chunk chunk, size_t source_num);
void addRow(SortCursor & cursor);
void insertCurrentRowIfNeeded();
};
}
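
MapDescription above only records which column numbers form the keys and values of a nested Map that should be summed; the merge rule itself is to group equal keys and sum their values across all rows with the same sort key. A minimal sketch of that rule on a flat std::map (illustrative only; the real transform operates on parallel key/value array columns per row and also tracks current_row_is_zero for the whole row):

#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Each input row carries a nested Map, represented here as key -> value.
// MapDescription in the transform identifies which columns hold these keys and values.
using NestedMap = std::map<std::string, int64_t>;

// Element-wise summation of the maps of all rows that share the same sort key.
NestedMap sumMaps(const std::vector<NestedMap> & rows_with_same_key)
{
    NestedMap result;
    for (const auto & row : rows_with_same_key)
        for (const auto & [key, value] : row)
            result[key] += value;
    return result;
}

int main()
{
    std::vector<NestedMap> rows =
    {
        {{"clicks", 3}, {"views", 10}},
        {{"clicks", 1}, {"purchases", 2}},
    };

    for (const auto & [key, value] : sumMaps(rows))
        std::cout << key << " = " << value << '\n';
    // clicks = 4, purchases = 2, views = 10 (in key order)
}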

@@ -15,7 +15,7 @@
namespace DB
{
- class VersionedCollapsingTransform : public IMergingTransform
+ class VersionedCollapsingTransform final : public IMergingTransform
{
public:
/// Don't need version column. It's in primary key.
@@ -53,7 +53,7 @@ private:
SortingHeap<SortCursor> queue;
bool is_queue_initialized = false;
- using RowRef = detail::RowRef;
+ using RowRef = detail::RowRefWithOwnedChunk;
const size_t max_rows_in_queue;
/// Rows with the same primary key and sign.
FixedSizeDequeWithGaps<RowRef> current_keys;