mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-29 11:02:08 +00:00
Added SummingSortedAlgorithm
This commit is contained in:
parent
89aef7aaf9
commit
586c295b94
@ -1,5 +1,7 @@
|
||||
#include <Processors/Merges/AggregatingSortedAlgorithm.h>
|
||||
|
||||
#include <Columns/ColumnAggregateFunction.h>
|
||||
#include <Common/AlignedBuffer.h>
|
||||
#include <DataTypes/DataTypeAggregateFunction.h>
|
||||
#include <DataTypes/DataTypeCustomSimpleAggregateFunction.h>
|
||||
#include <DataTypes/DataTypeLowCardinality.h>
|
||||
@ -136,8 +138,9 @@ static MutableColumns getMergedColumns(const Block & header, const AggregatingSo
|
||||
|
||||
for (auto & desc : def.columns_to_simple_aggregate)
|
||||
{
|
||||
auto & type = header.getByPosition(desc.column_number).type;
|
||||
columns[desc.column_number] = recursiveRemoveLowCardinality(type)->createColumn();
|
||||
auto & type = desc.nested_type ? desc.nested_type
|
||||
: desc.real_type;
|
||||
columns[desc.column_number] = type->createColumn();
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < columns.size(); ++i)
|
||||
@ -147,7 +150,8 @@ static MutableColumns getMergedColumns(const Block & header, const AggregatingSo
|
||||
return columns;
|
||||
}
|
||||
|
||||
static void prepareChunk(Chunk & chunk, const AggregatingSortedAlgorithm::ColumnsDefinition & def)
|
||||
/// Remove constants and LowCardinality for SimpleAggregateFunction
|
||||
static void preprocessChunk(Chunk & chunk, const AggregatingSortedAlgorithm::ColumnsDefinition & def)
|
||||
{
|
||||
auto num_rows = chunk.getNumRows();
|
||||
auto columns = chunk.detachColumns();
|
||||
@ -162,6 +166,25 @@ static void prepareChunk(Chunk & chunk, const AggregatingSortedAlgorithm::Column
|
||||
chunk.setColumns(std::move(columns), num_rows);
|
||||
}
|
||||
|
||||
/// Return back LowCardinality for SimpleAggregateFunction
|
||||
static void postprocessChunk(Chunk & chunk, const AggregatingSortedAlgorithm::ColumnsDefinition & def)
|
||||
{
|
||||
size_t num_rows = chunk.getNumRows();
|
||||
auto columns_ = chunk.detachColumns();
|
||||
|
||||
for (auto & desc : def.columns_to_simple_aggregate)
|
||||
{
|
||||
if (desc.nested_type)
|
||||
{
|
||||
auto & from_type = desc.nested_type;
|
||||
auto & to_type = desc.real_type;
|
||||
columns_[desc.column_number] = recursiveTypeConversion(columns_[desc.column_number], from_type, to_type);
|
||||
}
|
||||
}
|
||||
|
||||
chunk.setColumns(std::move(columns_), num_rows);
|
||||
}
|
||||
|
||||
|
||||
AggregatingSortedAlgorithm::AggregatingMergedData::AggregatingMergedData(
|
||||
MutableColumns columns_, UInt64 max_block_size_, ColumnsDefinition & def_)
|
||||
@ -226,21 +249,8 @@ Chunk AggregatingSortedAlgorithm::AggregatingMergedData::pull()
|
||||
throw Exception("Can't pull chunk because group was not finished.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
auto chunk = MergedData::pull();
|
||||
postprocessChunk(chunk, def);
|
||||
|
||||
size_t num_rows = chunk.getNumRows();
|
||||
auto columns_ = chunk.detachColumns();
|
||||
|
||||
for (auto & desc : def.columns_to_simple_aggregate)
|
||||
{
|
||||
if (desc.nested_type)
|
||||
{
|
||||
auto & from_type = desc.nested_type;
|
||||
auto & to_type = desc.real_type;
|
||||
columns_[desc.column_number] = recursiveTypeConversion(columns_[desc.column_number], from_type, to_type);
|
||||
}
|
||||
}
|
||||
|
||||
chunk.setColumns(std::move(columns_), num_rows);
|
||||
initAggregateDescription();
|
||||
|
||||
return chunk;
|
||||
@ -269,14 +279,14 @@ void AggregatingSortedAlgorithm::initialize(Chunks chunks)
|
||||
{
|
||||
for (auto & chunk : chunks)
|
||||
if (chunk)
|
||||
prepareChunk(chunk, columns_definition);
|
||||
preprocessChunk(chunk, columns_definition);
|
||||
|
||||
initializeQueue(std::move(chunks));
|
||||
}
|
||||
|
||||
void AggregatingSortedAlgorithm::consume(Chunk chunk, size_t source_num)
|
||||
{
|
||||
prepareChunk(chunk, columns_definition);
|
||||
preprocessChunk(chunk, columns_definition);
|
||||
updateCursor(std::move(chunk), source_num);
|
||||
}
|
||||
|
||||
|
@ -2,11 +2,6 @@
|
||||
|
||||
#include <Processors/Merges/IMergingAlgorithmWithDelayedChunk.h>
|
||||
#include <Processors/Merges/MergedData.h>
|
||||
#include <AggregateFunctions/IAggregateFunction.h>
|
||||
#include <Common/AlignedBuffer.h>
|
||||
#include <DataTypes/DataTypeLowCardinality.h>
|
||||
#include <Columns/ColumnAggregateFunction.h>
|
||||
#include <Common/Arena.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
594
src/Processors/Merges/SummingSortedAlgorithm.cpp
Normal file
594
src/Processors/Merges/SummingSortedAlgorithm.cpp
Normal file
@ -0,0 +1,594 @@
|
||||
#include <Processors/Merges/SummingSortedAlgorithm.h>
|
||||
#include <Common/FieldVisitors.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <DataTypes/NestedUtils.h>
|
||||
#include <Common/StringUtils/StringUtils.h>
|
||||
#include <Columns/ColumnTuple.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Columns/ColumnAggregateFunction.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int CORRUPTED_DATA;
|
||||
}
|
||||
|
||||
static bool isInPrimaryKey(const SortDescription & description, const std::string & name, const size_t number)
|
||||
{
|
||||
for (auto & desc : description)
|
||||
if (desc.column_name == name || (desc.column_name.empty() && desc.column_number == number))
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/// Returns true if merge result is not empty
|
||||
static bool mergeMap(const SummingSortedAlgorithm::MapDescription & desc, Row & row, SortCursor & cursor)
|
||||
{
|
||||
/// Strongly non-optimal.
|
||||
|
||||
Row & left = row;
|
||||
Row right(left.size());
|
||||
|
||||
for (size_t col_num : desc.key_col_nums)
|
||||
right[col_num] = (*cursor->all_columns[col_num])[cursor->pos].template get<Array>();
|
||||
|
||||
for (size_t col_num : desc.val_col_nums)
|
||||
right[col_num] = (*cursor->all_columns[col_num])[cursor->pos].template get<Array>();
|
||||
|
||||
auto at_ith_column_jth_row = [&](const Row & matrix, size_t i, size_t j) -> const Field &
|
||||
{
|
||||
return matrix[i].get<Array>()[j];
|
||||
};
|
||||
|
||||
auto tuple_of_nth_columns_at_jth_row = [&](const Row & matrix, const ColumnNumbers & col_nums, size_t j) -> Array
|
||||
{
|
||||
size_t size = col_nums.size();
|
||||
Array res(size);
|
||||
for (size_t col_num_index = 0; col_num_index < size; ++col_num_index)
|
||||
res[col_num_index] = at_ith_column_jth_row(matrix, col_nums[col_num_index], j);
|
||||
return res;
|
||||
};
|
||||
|
||||
std::map<Array, Array> merged;
|
||||
|
||||
auto accumulate = [](Array & dst, const Array & src)
|
||||
{
|
||||
bool has_non_zero = false;
|
||||
size_t size = dst.size();
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
if (applyVisitor(FieldVisitorSum(src[i]), dst[i]))
|
||||
has_non_zero = true;
|
||||
return has_non_zero;
|
||||
};
|
||||
|
||||
auto merge = [&](const Row & matrix)
|
||||
{
|
||||
size_t rows = matrix[desc.key_col_nums[0]].get<Array>().size();
|
||||
|
||||
for (size_t j = 0; j < rows; ++j)
|
||||
{
|
||||
Array key = tuple_of_nth_columns_at_jth_row(matrix, desc.key_col_nums, j);
|
||||
Array value = tuple_of_nth_columns_at_jth_row(matrix, desc.val_col_nums, j);
|
||||
|
||||
auto it = merged.find(key);
|
||||
if (merged.end() == it)
|
||||
merged.emplace(std::move(key), std::move(value));
|
||||
else
|
||||
{
|
||||
if (!accumulate(it->second, value))
|
||||
merged.erase(it);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
merge(left);
|
||||
merge(right);
|
||||
|
||||
for (size_t col_num : desc.key_col_nums)
|
||||
row[col_num] = Array(merged.size());
|
||||
for (size_t col_num : desc.val_col_nums)
|
||||
row[col_num] = Array(merged.size());
|
||||
|
||||
size_t row_num = 0;
|
||||
for (const auto & key_value : merged)
|
||||
{
|
||||
for (size_t col_num_index = 0, size = desc.key_col_nums.size(); col_num_index < size; ++col_num_index)
|
||||
row[desc.key_col_nums[col_num_index]].get<Array>()[row_num] = key_value.first[col_num_index];
|
||||
|
||||
for (size_t col_num_index = 0, size = desc.val_col_nums.size(); col_num_index < size; ++col_num_index)
|
||||
row[desc.val_col_nums[col_num_index]].get<Array>()[row_num] = key_value.second[col_num_index];
|
||||
|
||||
++row_num;
|
||||
}
|
||||
|
||||
return row_num != 0;
|
||||
}
|
||||
|
||||
static SummingSortedAlgorithm::ColumnsDefinition defineColumns(
|
||||
const Block & header,
|
||||
const SortDescription & description,
|
||||
const Names & column_names_to_sum)
|
||||
{
|
||||
size_t num_columns = header.columns();
|
||||
SummingSortedAlgorithm::ColumnsDefinition def;
|
||||
|
||||
/// name of nested structure -> the column numbers that refer to it.
|
||||
std::unordered_map<std::string, std::vector<size_t>> discovered_maps;
|
||||
|
||||
/** Fill in the column numbers, which must be summed.
|
||||
* This can only be numeric columns that are not part of the sort key.
|
||||
* If a non-empty column_names_to_sum is specified, then we only take these columns.
|
||||
* Some columns from column_names_to_sum may not be found. This is ignored.
|
||||
*/
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
{
|
||||
const ColumnWithTypeAndName & column = header.safeGetByPosition(i);
|
||||
|
||||
/// Discover nested Maps and find columns for summation
|
||||
if (typeid_cast<const DataTypeArray *>(column.type.get()))
|
||||
{
|
||||
const auto map_name = Nested::extractTableName(column.name);
|
||||
/// if nested table name ends with `Map` it is a possible candidate for special handling
|
||||
if (map_name == column.name || !endsWith(map_name, "Map"))
|
||||
{
|
||||
def.column_numbers_not_to_aggregate.push_back(i);
|
||||
continue;
|
||||
}
|
||||
|
||||
discovered_maps[map_name].emplace_back(i);
|
||||
}
|
||||
else
|
||||
{
|
||||
bool is_agg_func = WhichDataType(column.type).isAggregateFunction();
|
||||
|
||||
/// There are special const columns for example after prewhere sections.
|
||||
if ((!column.type->isSummable() && !is_agg_func) || isColumnConst(*column.column))
|
||||
{
|
||||
def.column_numbers_not_to_aggregate.push_back(i);
|
||||
continue;
|
||||
}
|
||||
|
||||
/// Are they inside the PK?
|
||||
if (isInPrimaryKey(description, column.name, i))
|
||||
{
|
||||
def.column_numbers_not_to_aggregate.push_back(i);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (column_names_to_sum.empty()
|
||||
|| column_names_to_sum.end() !=
|
||||
std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name))
|
||||
{
|
||||
// Create aggregator to sum this column
|
||||
SummingSortedAlgorithm::AggregateDescription desc;
|
||||
desc.is_agg_func_type = is_agg_func;
|
||||
desc.column_numbers = {i};
|
||||
|
||||
if (!is_agg_func)
|
||||
{
|
||||
desc.init("sumWithOverflow", {column.type});
|
||||
}
|
||||
|
||||
def.columns_to_aggregate.emplace_back(std::move(desc));
|
||||
}
|
||||
else
|
||||
{
|
||||
// Column is not going to be summed, use last value
|
||||
def.column_numbers_not_to_aggregate.push_back(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// select actual nested Maps from list of candidates
|
||||
for (const auto & map : discovered_maps)
|
||||
{
|
||||
/// map should contain at least two elements (key -> value)
|
||||
if (map.second.size() < 2)
|
||||
{
|
||||
for (auto col : map.second)
|
||||
def.column_numbers_not_to_aggregate.push_back(col);
|
||||
continue;
|
||||
}
|
||||
|
||||
/// no elements of map could be in primary key
|
||||
auto column_num_it = map.second.begin();
|
||||
for (; column_num_it != map.second.end(); ++column_num_it)
|
||||
if (isInPrimaryKey(description, header.safeGetByPosition(*column_num_it).name, *column_num_it))
|
||||
break;
|
||||
if (column_num_it != map.second.end())
|
||||
{
|
||||
for (auto col : map.second)
|
||||
def.column_numbers_not_to_aggregate.push_back(col);
|
||||
continue;
|
||||
}
|
||||
|
||||
DataTypes argument_types;
|
||||
SummingSortedAlgorithm::AggregateDescription desc;
|
||||
SummingSortedAlgorithm::MapDescription map_desc;
|
||||
|
||||
column_num_it = map.second.begin();
|
||||
for (; column_num_it != map.second.end(); ++column_num_it)
|
||||
{
|
||||
const ColumnWithTypeAndName & key_col = header.safeGetByPosition(*column_num_it);
|
||||
const String & name = key_col.name;
|
||||
const IDataType & nested_type = *assert_cast<const DataTypeArray &>(*key_col.type).getNestedType();
|
||||
|
||||
if (column_num_it == map.second.begin()
|
||||
|| endsWith(name, "ID")
|
||||
|| endsWith(name, "Key")
|
||||
|| endsWith(name, "Type"))
|
||||
{
|
||||
if (!nested_type.isValueRepresentedByInteger() && !isStringOrFixedString(nested_type))
|
||||
break;
|
||||
|
||||
map_desc.key_col_nums.push_back(*column_num_it);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!nested_type.isSummable())
|
||||
break;
|
||||
|
||||
map_desc.val_col_nums.push_back(*column_num_it);
|
||||
}
|
||||
|
||||
// Add column to function arguments
|
||||
desc.column_numbers.push_back(*column_num_it);
|
||||
argument_types.push_back(key_col.type);
|
||||
}
|
||||
|
||||
if (column_num_it != map.second.end())
|
||||
{
|
||||
for (auto col : map.second)
|
||||
def.column_numbers_not_to_aggregate.push_back(col);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (map_desc.key_col_nums.size() == 1)
|
||||
{
|
||||
// Create summation for all value columns in the map
|
||||
desc.init("sumMapWithOverflow", argument_types);
|
||||
def.columns_to_aggregate.emplace_back(std::move(desc));
|
||||
}
|
||||
else
|
||||
{
|
||||
// Fall back to legacy mergeMaps for composite keys
|
||||
for (auto col : map.second)
|
||||
def.column_numbers_not_to_aggregate.push_back(col);
|
||||
def.maps_to_sum.emplace_back(std::move(map_desc));
|
||||
}
|
||||
}
|
||||
|
||||
return def;
|
||||
}
|
||||
|
||||
static MutableColumns getMergedDataColumns(
|
||||
const Block & header,
|
||||
const SummingSortedAlgorithm::ColumnsDefinition & columns_definition)
|
||||
{
|
||||
MutableColumns columns;
|
||||
columns.reserve(columns_definition.getNumColumns());
|
||||
|
||||
for (auto & desc : columns_definition.columns_to_aggregate)
|
||||
{
|
||||
// Wrap aggregated columns in a tuple to match function signature
|
||||
if (!desc.is_agg_func_type && isTuple(desc.function->getReturnType()))
|
||||
{
|
||||
size_t tuple_size = desc.column_numbers.size();
|
||||
MutableColumns tuple_columns(tuple_size);
|
||||
for (size_t i = 0; i < tuple_size; ++i)
|
||||
tuple_columns[i] = header.safeGetByPosition(desc.column_numbers[i]).column->cloneEmpty();
|
||||
|
||||
columns.emplace_back(ColumnTuple::create(std::move(tuple_columns)));
|
||||
}
|
||||
else
|
||||
columns.emplace_back(header.safeGetByPosition(desc.column_numbers[0]).column->cloneEmpty());
|
||||
}
|
||||
|
||||
for (auto & column_number : columns_definition.column_numbers_not_to_aggregate)
|
||||
columns.emplace_back(header.safeGetByPosition(column_number).type->createColumn());
|
||||
|
||||
return columns;
|
||||
}
|
||||
|
||||
static void preprocessChunk(Chunk & chunk)
|
||||
{
|
||||
auto num_rows = chunk.getNumRows();
|
||||
auto columns = chunk.detachColumns();
|
||||
|
||||
for (auto & column : columns)
|
||||
column = column->convertToFullColumnIfConst();
|
||||
|
||||
chunk.setColumns(std::move(columns), num_rows);
|
||||
}
|
||||
|
||||
static void postprocessChunk(
|
||||
Chunk & chunk, size_t num_result_columns,
|
||||
const SummingSortedAlgorithm::ColumnsDefinition & def)
|
||||
{
|
||||
size_t num_rows = chunk.getNumRows();
|
||||
auto columns = chunk.detachColumns();
|
||||
|
||||
Columns res_columns(num_result_columns);
|
||||
size_t next_column = 0;
|
||||
|
||||
for (auto & desc : def.columns_to_aggregate)
|
||||
{
|
||||
auto column = std::move(columns[next_column]);
|
||||
++next_column;
|
||||
|
||||
if (!desc.is_agg_func_type && isTuple(desc.function->getReturnType()))
|
||||
{
|
||||
/// Unpack tuple into block.
|
||||
size_t tuple_size = desc.column_numbers.size();
|
||||
for (size_t i = 0; i < tuple_size; ++i)
|
||||
res_columns[desc.column_numbers[i]] = assert_cast<const ColumnTuple &>(*column).getColumnPtr(i);
|
||||
}
|
||||
else
|
||||
res_columns[desc.column_numbers[0]] = std::move(column);
|
||||
}
|
||||
|
||||
for (auto column_number : def.column_numbers_not_to_aggregate)
|
||||
{
|
||||
auto column = std::move(columns[next_column]);
|
||||
++next_column;
|
||||
|
||||
res_columns[column_number] = std::move(column);
|
||||
}
|
||||
|
||||
chunk.setColumns(std::move(res_columns), num_rows);
|
||||
}
|
||||
|
||||
static void setRow(Row & row, SortCursor & cursor, const Names & column_names)
|
||||
{
|
||||
size_t num_columns = row.size();
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
{
|
||||
try
|
||||
{
|
||||
cursor->all_columns[i]->get(cursor->pos, row[i]);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
|
||||
/// Find out the name of the column and throw more informative exception.
|
||||
|
||||
String column_name;
|
||||
if (i < column_names.size())
|
||||
column_name = column_names[i];
|
||||
|
||||
throw Exception("MergingSortedBlockInputStream failed to read row " + toString(cursor->pos)
|
||||
+ " of column " + toString(i) + (column_name.empty() ? "" : " (" + column_name + ")"),
|
||||
ErrorCodes::CORRUPTED_DATA);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Chunk SummingSortedAlgorithm::SummingMergedData::pull(size_t num_result_columns, const ColumnsDefinition & def)
|
||||
{
|
||||
auto chunk = MergedData::pull();
|
||||
postprocessChunk(chunk, num_result_columns, def);
|
||||
return chunk;
|
||||
}
|
||||
|
||||
SummingSortedAlgorithm::SummingSortedAlgorithm(
|
||||
const Block & header, size_t num_inputs,
|
||||
SortDescription description_,
|
||||
const Names & column_names_to_sum,
|
||||
size_t max_block_size)
|
||||
: IMergingAlgorithmWithDelayedChunk(num_inputs, std::move(description_))
|
||||
, columns_definition(defineColumns(header, description_, column_names_to_sum))
|
||||
, merged_data(getMergedDataColumns(header, columns_definition), false, max_block_size)
|
||||
, column_names(header.getNames())
|
||||
{
|
||||
current_row.resize(header.columns());
|
||||
merged_data.initAggregateDescription(columns_definition.columns_to_aggregate);
|
||||
}
|
||||
|
||||
void SummingSortedAlgorithm::initialize(Chunks chunks)
|
||||
{
|
||||
for (auto & chunk : chunks)
|
||||
if (chunk)
|
||||
preprocessChunk(chunk);
|
||||
|
||||
initializeQueue(std::move(chunks));
|
||||
}
|
||||
|
||||
void SummingSortedAlgorithm::consume(Chunk chunk, size_t source_num)
|
||||
{
|
||||
preprocessChunk(chunk);
|
||||
updateCursor(std::move(chunk), source_num);
|
||||
}
|
||||
|
||||
|
||||
void SummingSortedAlgorithm::insertCurrentRowIfNeeded()
|
||||
{
|
||||
/// We have nothing to aggregate. It means that it could be non-zero, because we have columns_not_to_aggregate.
|
||||
if (columns_definition.columns_to_aggregate.empty())
|
||||
current_row_is_zero = false;
|
||||
|
||||
for (auto & desc : columns_definition.columns_to_aggregate)
|
||||
{
|
||||
// Do not insert if the aggregation state hasn't been created
|
||||
if (desc.created)
|
||||
{
|
||||
if (desc.is_agg_func_type)
|
||||
{
|
||||
current_row_is_zero = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
try
|
||||
{
|
||||
desc.function->insertResultInto(desc.state.data(), *desc.merged_column);
|
||||
|
||||
/// Update zero status of current row
|
||||
if (desc.column_numbers.size() == 1)
|
||||
{
|
||||
// Flag row as non-empty if at least one column number if non-zero
|
||||
current_row_is_zero = current_row_is_zero && desc.merged_column->isDefaultAt(desc.merged_column->size() - 1);
|
||||
}
|
||||
else
|
||||
{
|
||||
/// It is sumMapWithOverflow aggregate function.
|
||||
/// Assume that the row isn't empty in this case (just because it is compatible with previous version)
|
||||
current_row_is_zero = false;
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
desc.destroyState();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
desc.destroyState();
|
||||
}
|
||||
else
|
||||
desc.merged_column->insertDefault();
|
||||
}
|
||||
|
||||
/// If it is "zero" row, then rollback the insertion
|
||||
/// (at this moment we need rollback only cols from columns_to_aggregate)
|
||||
if (current_row_is_zero)
|
||||
{
|
||||
for (auto & desc : columns_definition.columns_to_aggregate)
|
||||
desc.merged_column->popBack(1);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
merged_data.insertRow(current_row, columns_definition.column_numbers_not_to_aggregate);
|
||||
}
|
||||
|
||||
void SummingSortedAlgorithm::addRow(SortCursor & cursor)
|
||||
{
|
||||
for (auto & desc : columns_definition.columns_to_aggregate)
|
||||
{
|
||||
if (!desc.created)
|
||||
throw Exception("Logical error in SummingSortedBlockInputStream, there are no description", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (desc.is_agg_func_type)
|
||||
{
|
||||
// desc.state is not used for AggregateFunction types
|
||||
auto & col = cursor->all_columns[desc.column_numbers[0]];
|
||||
assert_cast<ColumnAggregateFunction &>(*desc.merged_column).insertMergeFrom(*col, cursor->pos);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Specialized case for unary functions
|
||||
if (desc.column_numbers.size() == 1)
|
||||
{
|
||||
auto & col = cursor->all_columns[desc.column_numbers[0]];
|
||||
desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->pos, nullptr);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Gather all source columns into a vector
|
||||
ColumnRawPtrs columns(desc.column_numbers.size());
|
||||
for (size_t i = 0; i < desc.column_numbers.size(); ++i)
|
||||
columns[i] = cursor->all_columns[desc.column_numbers[i]];
|
||||
|
||||
desc.add_function(desc.function.get(), desc.state.data(), columns.data(), cursor->pos, nullptr);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
IMergingAlgorithm::Status SummingSortedAlgorithm::merge()
|
||||
{
|
||||
/// Take the rows in needed order and put them in `merged_columns` until rows no more than `max_block_size`
|
||||
while (queue.isValid())
|
||||
{
|
||||
bool key_differs;
|
||||
bool has_previous_group = !last_key.empty();
|
||||
|
||||
SortCursor current = queue.current();
|
||||
|
||||
{
|
||||
detail::RowRef current_key;
|
||||
current_key.set(current);
|
||||
|
||||
if (!has_previous_group) /// The first key encountered.
|
||||
{
|
||||
key_differs = true;
|
||||
current_row_is_zero = true;
|
||||
}
|
||||
else
|
||||
key_differs = !last_key.hasEqualSortColumnsWith(current_key);
|
||||
|
||||
last_key = current_key;
|
||||
last_chunk_sort_columns.clear();
|
||||
}
|
||||
|
||||
if (key_differs)
|
||||
{
|
||||
if (has_previous_group)
|
||||
/// Write the data for the previous group.
|
||||
insertCurrentRowIfNeeded();
|
||||
|
||||
if (merged_data.hasEnoughRows())
|
||||
{
|
||||
/// The block is now full and the last row is calculated completely.
|
||||
last_key.reset();
|
||||
return Status(merged_data.pull(column_names.size(), columns_definition));
|
||||
}
|
||||
|
||||
setRow(current_row, current, column_names);
|
||||
|
||||
/// Reset aggregation states for next row
|
||||
for (auto & desc : columns_definition.columns_to_aggregate)
|
||||
desc.createState();
|
||||
|
||||
// Start aggregations with current row
|
||||
addRow(current);
|
||||
|
||||
if (columns_definition.maps_to_sum.empty())
|
||||
{
|
||||
/// We have only columns_to_aggregate. The status of current row will be determined
|
||||
/// in 'insertCurrentRowIfNeeded' method on the values of aggregate functions.
|
||||
current_row_is_zero = true; // NOLINT
|
||||
}
|
||||
else
|
||||
{
|
||||
/// We have complex maps that will be summed with 'mergeMap' method.
|
||||
/// The single row is considered non zero, and the status after merging with other rows
|
||||
/// will be determined in the branch below (when key_differs == false).
|
||||
current_row_is_zero = false; // NOLINT
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
addRow(current);
|
||||
|
||||
// Merge maps only for same rows
|
||||
for (const auto & desc : columns_definition.maps_to_sum)
|
||||
if (mergeMap(desc, current_row, current))
|
||||
current_row_is_zero = false;
|
||||
}
|
||||
|
||||
if (!current->isLast())
|
||||
{
|
||||
queue.next();
|
||||
}
|
||||
else
|
||||
{
|
||||
/// We get the next block from the corresponding source, if there is one.
|
||||
queue.removeTop();
|
||||
return Status(current.impl->order);
|
||||
}
|
||||
}
|
||||
|
||||
/// We will write the data for the last group, if it is non-zero.
|
||||
/// If it is zero, and without it the output stream will be empty, we will write it anyway.
|
||||
insertCurrentRowIfNeeded();
|
||||
last_chunk_sort_columns.clear();
|
||||
return Status(merged_data.pull(column_names.size(), columns_definition), true);
|
||||
}
|
||||
|
||||
|
||||
}
|
152
src/Processors/Merges/SummingSortedAlgorithm.h
Normal file
152
src/Processors/Merges/SummingSortedAlgorithm.h
Normal file
@ -0,0 +1,152 @@
|
||||
#pragma once
|
||||
|
||||
#include <Processors/Merges/IMergingAlgorithmWithDelayedChunk.h>
|
||||
#include <Processors/Merges/MergedData.h>
|
||||
#include <Core/Row.h>
|
||||
#include <AggregateFunctions/IAggregateFunction.h>
|
||||
#include <Common/AlignedBuffer.h>
|
||||
#include <AggregateFunctions/AggregateFunctionFactory.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class SummingSortedAlgorithm : public IMergingAlgorithmWithDelayedChunk
|
||||
{
|
||||
public:
|
||||
SummingSortedAlgorithm(
|
||||
const Block & header, size_t num_inputs,
|
||||
SortDescription description_,
|
||||
/// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
|
||||
const Names & column_names_to_sum,
|
||||
size_t max_block_size);
|
||||
|
||||
void initialize(Chunks chunks) override;
|
||||
void consume(Chunk chunk, size_t source_num) override;
|
||||
Status merge() override;
|
||||
|
||||
struct AggregateDescription;
|
||||
|
||||
/// Stores numbers of key-columns and value-columns.
|
||||
struct MapDescription
|
||||
{
|
||||
std::vector<size_t> key_col_nums;
|
||||
std::vector<size_t> val_col_nums;
|
||||
};
|
||||
|
||||
/// This structure define columns into one of three types:
|
||||
/// * columns which values not needed to be aggregated
|
||||
/// * aggregate functions and columns which needed to be summed
|
||||
/// * mapping for nested columns
|
||||
struct ColumnsDefinition
|
||||
{
|
||||
/// Columns with which values should not be aggregated.
|
||||
ColumnNumbers column_numbers_not_to_aggregate;
|
||||
/// Columns which should be aggregated.
|
||||
std::vector<AggregateDescription> columns_to_aggregate;
|
||||
/// Mapping for nested columns.
|
||||
std::vector<MapDescription> maps_to_sum;
|
||||
|
||||
size_t getNumColumns() const { return column_numbers_not_to_aggregate.size() + columns_to_aggregate.size(); }
|
||||
};
|
||||
|
||||
/// Specialization for SummingSortedTransform. Inserts only data for non-aggregated columns.
|
||||
class SummingMergedData : public MergedData
|
||||
{
|
||||
private:
|
||||
using MergedData::pull;
|
||||
|
||||
public:
|
||||
using MergedData::MergedData;
|
||||
|
||||
void insertRow(const Row & row, const ColumnNumbers & column_numbers)
|
||||
{
|
||||
size_t next_column = columns.size() - column_numbers.size();
|
||||
for (auto column_number : column_numbers)
|
||||
{
|
||||
columns[next_column]->insert(row[column_number]);
|
||||
++next_column;
|
||||
}
|
||||
|
||||
++total_merged_rows;
|
||||
++merged_rows;
|
||||
/// TODO: sum_blocks_granularity += block_size;
|
||||
}
|
||||
|
||||
/// Initialize aggregate descriptions with columns.
|
||||
void initAggregateDescription(std::vector<AggregateDescription> & columns_to_aggregate)
|
||||
{
|
||||
size_t num_columns = columns_to_aggregate.size();
|
||||
for (size_t column_number = 0; column_number < num_columns; ++column_number)
|
||||
columns_to_aggregate[column_number].merged_column = columns[column_number].get();
|
||||
}
|
||||
|
||||
Chunk pull(size_t num_result_columns, const ColumnsDefinition & def);
|
||||
};
|
||||
|
||||
private:
|
||||
Row current_row;
|
||||
bool current_row_is_zero = true; /// Are all summed columns zero (or empty)? It is updated incrementally.
|
||||
|
||||
ColumnsDefinition columns_definition;
|
||||
SummingMergedData merged_data;
|
||||
|
||||
Names column_names;
|
||||
|
||||
void addRow(SortCursor & cursor);
|
||||
void insertCurrentRowIfNeeded();
|
||||
|
||||
public:
|
||||
/// Stores aggregation function, state, and columns to be used as function arguments.
|
||||
struct AggregateDescription
|
||||
{
|
||||
/// An aggregate function 'sumWithOverflow' or 'sumMapWithOverflow' for summing.
|
||||
AggregateFunctionPtr function;
|
||||
IAggregateFunction::AddFunc add_function = nullptr;
|
||||
std::vector<size_t> column_numbers;
|
||||
IColumn * merged_column = nullptr;
|
||||
AlignedBuffer state;
|
||||
bool created = false;
|
||||
|
||||
/// In case when column has type AggregateFunction: use the aggregate function from itself instead of 'function' above.
|
||||
bool is_agg_func_type = false;
|
||||
|
||||
void init(const char * function_name, const DataTypes & argument_types)
|
||||
{
|
||||
function = AggregateFunctionFactory::instance().get(function_name, argument_types);
|
||||
add_function = function->getAddressOfAddFunction();
|
||||
state.reset(function->sizeOfData(), function->alignOfData());
|
||||
}
|
||||
|
||||
void createState()
|
||||
{
|
||||
if (created)
|
||||
return;
|
||||
if (is_agg_func_type)
|
||||
merged_column->insertDefault();
|
||||
else
|
||||
function->create(state.data());
|
||||
created = true;
|
||||
}
|
||||
|
||||
void destroyState()
|
||||
{
|
||||
if (!created)
|
||||
return;
|
||||
if (!is_agg_func_type)
|
||||
function->destroy(state.data());
|
||||
created = false;
|
||||
}
|
||||
|
||||
/// Explicitly destroy aggregation state if the stream is terminated
|
||||
~AggregateDescription()
|
||||
{
|
||||
destroyState();
|
||||
}
|
||||
|
||||
AggregateDescription() = default;
|
||||
AggregateDescription(AggregateDescription &&) = default;
|
||||
AggregateDescription(const AggregateDescription &) = delete;
|
||||
};
|
||||
};
|
||||
|
||||
}
|
@ -1,626 +0,0 @@
|
||||
#include <Processors/Merges/SummingSortedTransform.h>
|
||||
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <DataTypes/NestedUtils.h>
|
||||
#include <Columns/ColumnAggregateFunction.h>
|
||||
#include <Columns/ColumnTuple.h>
|
||||
#include <Common/StringUtils/StringUtils.h>
|
||||
#include <Common/FieldVisitors.h>
|
||||
#include <Core/Row.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int CORRUPTED_DATA;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
bool isInPrimaryKey(const SortDescription & description, const std::string & name, const size_t number)
|
||||
{
|
||||
for (auto & desc : description)
|
||||
if (desc.column_name == name || (desc.column_name.empty() && desc.column_number == number))
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/// Returns true if merge result is not empty
|
||||
bool mergeMap(const SummingSortedTransform::MapDescription & desc, Row & row, SortCursor & cursor)
|
||||
{
|
||||
/// Strongly non-optimal.
|
||||
|
||||
Row & left = row;
|
||||
Row right(left.size());
|
||||
|
||||
for (size_t col_num : desc.key_col_nums)
|
||||
right[col_num] = (*cursor->all_columns[col_num])[cursor->pos].template get<Array>();
|
||||
|
||||
for (size_t col_num : desc.val_col_nums)
|
||||
right[col_num] = (*cursor->all_columns[col_num])[cursor->pos].template get<Array>();
|
||||
|
||||
auto at_ith_column_jth_row = [&](const Row & matrix, size_t i, size_t j) -> const Field &
|
||||
{
|
||||
return matrix[i].get<Array>()[j];
|
||||
};
|
||||
|
||||
auto tuple_of_nth_columns_at_jth_row = [&](const Row & matrix, const ColumnNumbers & col_nums, size_t j) -> Array
|
||||
{
|
||||
size_t size = col_nums.size();
|
||||
Array res(size);
|
||||
for (size_t col_num_index = 0; col_num_index < size; ++col_num_index)
|
||||
res[col_num_index] = at_ith_column_jth_row(matrix, col_nums[col_num_index], j);
|
||||
return res;
|
||||
};
|
||||
|
||||
std::map<Array, Array> merged;
|
||||
|
||||
auto accumulate = [](Array & dst, const Array & src)
|
||||
{
|
||||
bool has_non_zero = false;
|
||||
size_t size = dst.size();
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
if (applyVisitor(FieldVisitorSum(src[i]), dst[i]))
|
||||
has_non_zero = true;
|
||||
return has_non_zero;
|
||||
};
|
||||
|
||||
auto merge = [&](const Row & matrix)
|
||||
{
|
||||
size_t rows = matrix[desc.key_col_nums[0]].get<Array>().size();
|
||||
|
||||
for (size_t j = 0; j < rows; ++j)
|
||||
{
|
||||
Array key = tuple_of_nth_columns_at_jth_row(matrix, desc.key_col_nums, j);
|
||||
Array value = tuple_of_nth_columns_at_jth_row(matrix, desc.val_col_nums, j);
|
||||
|
||||
auto it = merged.find(key);
|
||||
if (merged.end() == it)
|
||||
merged.emplace(std::move(key), std::move(value));
|
||||
else
|
||||
{
|
||||
if (!accumulate(it->second, value))
|
||||
merged.erase(it);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
merge(left);
|
||||
merge(right);
|
||||
|
||||
for (size_t col_num : desc.key_col_nums)
|
||||
row[col_num] = Array(merged.size());
|
||||
for (size_t col_num : desc.val_col_nums)
|
||||
row[col_num] = Array(merged.size());
|
||||
|
||||
size_t row_num = 0;
|
||||
for (const auto & key_value : merged)
|
||||
{
|
||||
for (size_t col_num_index = 0, size = desc.key_col_nums.size(); col_num_index < size; ++col_num_index)
|
||||
row[desc.key_col_nums[col_num_index]].get<Array>()[row_num] = key_value.first[col_num_index];
|
||||
|
||||
for (size_t col_num_index = 0, size = desc.val_col_nums.size(); col_num_index < size; ++col_num_index)
|
||||
row[desc.val_col_nums[col_num_index]].get<Array>()[row_num] = key_value.second[col_num_index];
|
||||
|
||||
++row_num;
|
||||
}
|
||||
|
||||
return row_num != 0;
|
||||
}
|
||||
|
||||
SummingSortedTransform::ColumnsDefinition defineColumns(
|
||||
const Block & header,
|
||||
const SortDescription & description,
|
||||
const Names & column_names_to_sum)
|
||||
{
|
||||
size_t num_columns = header.columns();
|
||||
SummingSortedTransform::ColumnsDefinition def;
|
||||
|
||||
/// name of nested structure -> the column numbers that refer to it.
|
||||
std::unordered_map<std::string, std::vector<size_t>> discovered_maps;
|
||||
|
||||
/** Fill in the column numbers, which must be summed.
|
||||
* This can only be numeric columns that are not part of the sort key.
|
||||
* If a non-empty column_names_to_sum is specified, then we only take these columns.
|
||||
* Some columns from column_names_to_sum may not be found. This is ignored.
|
||||
*/
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
{
|
||||
const ColumnWithTypeAndName & column = header.safeGetByPosition(i);
|
||||
|
||||
/// Discover nested Maps and find columns for summation
|
||||
if (typeid_cast<const DataTypeArray *>(column.type.get()))
|
||||
{
|
||||
const auto map_name = Nested::extractTableName(column.name);
|
||||
/// if nested table name ends with `Map` it is a possible candidate for special handling
|
||||
if (map_name == column.name || !endsWith(map_name, "Map"))
|
||||
{
|
||||
def.column_numbers_not_to_aggregate.push_back(i);
|
||||
continue;
|
||||
}
|
||||
|
||||
discovered_maps[map_name].emplace_back(i);
|
||||
}
|
||||
else
|
||||
{
|
||||
bool is_agg_func = WhichDataType(column.type).isAggregateFunction();
|
||||
|
||||
/// There are special const columns for example after prewhere sections.
|
||||
if ((!column.type->isSummable() && !is_agg_func) || isColumnConst(*column.column))
|
||||
{
|
||||
def.column_numbers_not_to_aggregate.push_back(i);
|
||||
continue;
|
||||
}
|
||||
|
||||
/// Are they inside the PK?
|
||||
if (isInPrimaryKey(description, column.name, i))
|
||||
{
|
||||
def.column_numbers_not_to_aggregate.push_back(i);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (column_names_to_sum.empty()
|
||||
|| column_names_to_sum.end() !=
|
||||
std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name))
|
||||
{
|
||||
// Create aggregator to sum this column
|
||||
SummingSortedTransform::AggregateDescription desc;
|
||||
desc.is_agg_func_type = is_agg_func;
|
||||
desc.column_numbers = {i};
|
||||
|
||||
if (!is_agg_func)
|
||||
{
|
||||
desc.init("sumWithOverflow", {column.type});
|
||||
}
|
||||
|
||||
def.columns_to_aggregate.emplace_back(std::move(desc));
|
||||
}
|
||||
else
|
||||
{
|
||||
// Column is not going to be summed, use last value
|
||||
def.column_numbers_not_to_aggregate.push_back(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// select actual nested Maps from list of candidates
|
||||
for (const auto & map : discovered_maps)
|
||||
{
|
||||
/// map should contain at least two elements (key -> value)
|
||||
if (map.second.size() < 2)
|
||||
{
|
||||
for (auto col : map.second)
|
||||
def.column_numbers_not_to_aggregate.push_back(col);
|
||||
continue;
|
||||
}
|
||||
|
||||
/// no elements of map could be in primary key
|
||||
auto column_num_it = map.second.begin();
|
||||
for (; column_num_it != map.second.end(); ++column_num_it)
|
||||
if (isInPrimaryKey(description, header.safeGetByPosition(*column_num_it).name, *column_num_it))
|
||||
break;
|
||||
if (column_num_it != map.second.end())
|
||||
{
|
||||
for (auto col : map.second)
|
||||
def.column_numbers_not_to_aggregate.push_back(col);
|
||||
continue;
|
||||
}
|
||||
|
||||
DataTypes argument_types;
|
||||
SummingSortedTransform::AggregateDescription desc;
|
||||
SummingSortedTransform::MapDescription map_desc;
|
||||
|
||||
column_num_it = map.second.begin();
|
||||
for (; column_num_it != map.second.end(); ++column_num_it)
|
||||
{
|
||||
const ColumnWithTypeAndName & key_col = header.safeGetByPosition(*column_num_it);
|
||||
const String & name = key_col.name;
|
||||
const IDataType & nested_type = *assert_cast<const DataTypeArray &>(*key_col.type).getNestedType();
|
||||
|
||||
if (column_num_it == map.second.begin()
|
||||
|| endsWith(name, "ID")
|
||||
|| endsWith(name, "Key")
|
||||
|| endsWith(name, "Type"))
|
||||
{
|
||||
if (!nested_type.isValueRepresentedByInteger() && !isStringOrFixedString(nested_type))
|
||||
break;
|
||||
|
||||
map_desc.key_col_nums.push_back(*column_num_it);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!nested_type.isSummable())
|
||||
break;
|
||||
|
||||
map_desc.val_col_nums.push_back(*column_num_it);
|
||||
}
|
||||
|
||||
// Add column to function arguments
|
||||
desc.column_numbers.push_back(*column_num_it);
|
||||
argument_types.push_back(key_col.type);
|
||||
}
|
||||
|
||||
if (column_num_it != map.second.end())
|
||||
{
|
||||
for (auto col : map.second)
|
||||
def.column_numbers_not_to_aggregate.push_back(col);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (map_desc.key_col_nums.size() == 1)
|
||||
{
|
||||
// Create summation for all value columns in the map
|
||||
desc.init("sumMapWithOverflow", argument_types);
|
||||
def.columns_to_aggregate.emplace_back(std::move(desc));
|
||||
}
|
||||
else
|
||||
{
|
||||
// Fall back to legacy mergeMaps for composite keys
|
||||
for (auto col : map.second)
|
||||
def.column_numbers_not_to_aggregate.push_back(col);
|
||||
def.maps_to_sum.emplace_back(std::move(map_desc));
|
||||
}
|
||||
}
|
||||
|
||||
return def;
|
||||
}
|
||||
|
||||
MutableColumns getMergedDataColumns(
|
||||
const Block & header,
|
||||
const SummingSortedTransform::ColumnsDefinition & columns_definition)
|
||||
{
|
||||
MutableColumns columns;
|
||||
columns.reserve(columns_definition.getNumColumns());
|
||||
|
||||
for (auto & desc : columns_definition.columns_to_aggregate)
|
||||
{
|
||||
// Wrap aggregated columns in a tuple to match function signature
|
||||
if (!desc.is_agg_func_type && isTuple(desc.function->getReturnType()))
|
||||
{
|
||||
size_t tuple_size = desc.column_numbers.size();
|
||||
MutableColumns tuple_columns(tuple_size);
|
||||
for (size_t i = 0; i < tuple_size; ++i)
|
||||
tuple_columns[i] = header.safeGetByPosition(desc.column_numbers[i]).column->cloneEmpty();
|
||||
|
||||
columns.emplace_back(ColumnTuple::create(std::move(tuple_columns)));
|
||||
}
|
||||
else
|
||||
columns.emplace_back(header.safeGetByPosition(desc.column_numbers[0]).column->cloneEmpty());
|
||||
}
|
||||
|
||||
for (auto & column_number : columns_definition.column_numbers_not_to_aggregate)
|
||||
columns.emplace_back(header.safeGetByPosition(column_number).type->createColumn());
|
||||
|
||||
return columns;
|
||||
}
|
||||
|
||||
void finalizeChunk(
|
||||
Chunk & chunk, size_t num_result_columns,
|
||||
const SummingSortedTransform::ColumnsDefinition & columns_definition)
|
||||
{
|
||||
size_t num_rows = chunk.getNumRows();
|
||||
auto columns = chunk.detachColumns();
|
||||
|
||||
Columns res_columns(num_result_columns);
|
||||
size_t next_column = 0;
|
||||
|
||||
for (auto & desc : columns_definition.columns_to_aggregate)
|
||||
{
|
||||
auto column = std::move(columns[next_column]);
|
||||
++next_column;
|
||||
|
||||
if (!desc.is_agg_func_type && isTuple(desc.function->getReturnType()))
|
||||
{
|
||||
/// Unpack tuple into block.
|
||||
size_t tuple_size = desc.column_numbers.size();
|
||||
for (size_t i = 0; i < tuple_size; ++i)
|
||||
res_columns[desc.column_numbers[i]] = assert_cast<const ColumnTuple &>(*column).getColumnPtr(i);
|
||||
}
|
||||
else
|
||||
res_columns[desc.column_numbers[0]] = std::move(column);
|
||||
}
|
||||
|
||||
for (auto column_number : columns_definition.column_numbers_not_to_aggregate)
|
||||
{
|
||||
auto column = std::move(columns[next_column]);
|
||||
++next_column;
|
||||
|
||||
res_columns[column_number] = std::move(column);
|
||||
}
|
||||
|
||||
chunk.setColumns(std::move(res_columns), num_rows);
|
||||
}
|
||||
|
||||
void setRow(Row & row, SortCursor & cursor, const Block & header)
|
||||
{
|
||||
size_t num_columns = row.size();
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
{
|
||||
try
|
||||
{
|
||||
cursor->all_columns[i]->get(cursor->pos, row[i]);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
|
||||
/// Find out the name of the column and throw more informative exception.
|
||||
|
||||
String column_name;
|
||||
if (i < header.columns())
|
||||
{
|
||||
column_name = header.safeGetByPosition(i).name;
|
||||
break;
|
||||
}
|
||||
|
||||
throw Exception("MergingSortedBlockInputStream failed to read row " + toString(cursor->pos)
|
||||
+ " of column " + toString(i) + (column_name.empty() ? "" : " (" + column_name + ")"),
|
||||
ErrorCodes::CORRUPTED_DATA);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SummingSortedTransform::SummingSortedTransform(
|
||||
const Block & header, size_t num_inputs,
|
||||
SortDescription description_,
|
||||
/// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
|
||||
const Names & column_names_to_sum,
|
||||
size_t max_block_size)
|
||||
: IMergingTransform(num_inputs, header, header, true)
|
||||
, columns_definition(defineColumns(header, description_, column_names_to_sum))
|
||||
, merged_data(getMergedDataColumns(header, columns_definition), false, max_block_size)
|
||||
, description(std::move(description_))
|
||||
, source_chunks(num_inputs)
|
||||
, cursors(num_inputs)
|
||||
{
|
||||
current_row.resize(header.columns());
|
||||
merged_data.initAggregateDescription(columns_definition.columns_to_aggregate);
|
||||
}
|
||||
|
||||
void SummingSortedTransform::initializeInputs()
|
||||
{
|
||||
queue = SortingHeap<SortCursor>(cursors);
|
||||
is_queue_initialized = true;
|
||||
}
|
||||
|
||||
void SummingSortedTransform::consume(Chunk chunk, size_t input_number)
|
||||
{
|
||||
updateCursor(std::move(chunk), input_number);
|
||||
|
||||
if (is_queue_initialized)
|
||||
queue.push(cursors[input_number]);
|
||||
}
|
||||
|
||||
void SummingSortedTransform::updateCursor(Chunk chunk, size_t source_num)
|
||||
{
|
||||
auto num_rows = chunk.getNumRows();
|
||||
auto columns = chunk.detachColumns();
|
||||
for (auto & column : columns)
|
||||
column = column->convertToFullColumnIfConst();
|
||||
|
||||
chunk.setColumns(std::move(columns), num_rows);
|
||||
|
||||
auto & source_chunk = source_chunks[source_num];
|
||||
|
||||
if (source_chunk)
|
||||
{
|
||||
/// Extend lifetime of last chunk.
|
||||
last_chunk = std::move(source_chunk);
|
||||
last_chunk_sort_columns = std::move(cursors[source_num].sort_columns);
|
||||
|
||||
source_chunk = std::move(chunk);
|
||||
cursors[source_num].reset(source_chunk.getColumns(), {});
|
||||
}
|
||||
else
|
||||
{
|
||||
if (cursors[source_num].has_collation)
|
||||
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
source_chunk = std::move(chunk);
|
||||
cursors[source_num] = SortCursorImpl(source_chunk.getColumns(), description, source_num);
|
||||
}
|
||||
}
|
||||
|
||||
void SummingSortedTransform::work()
|
||||
{
|
||||
merge();
|
||||
prepareOutputChunk(merged_data);
|
||||
|
||||
if (has_output_chunk)
|
||||
{
|
||||
finalizeChunk(output_chunk, getOutputs().back().getHeader().columns(), columns_definition);
|
||||
merged_data.initAggregateDescription(columns_definition.columns_to_aggregate);
|
||||
}
|
||||
}
|
||||
|
||||
void SummingSortedTransform::insertCurrentRowIfNeeded()
|
||||
{
|
||||
/// We have nothing to aggregate. It means that it could be non-zero, because we have columns_not_to_aggregate.
|
||||
if (columns_definition.columns_to_aggregate.empty())
|
||||
current_row_is_zero = false;
|
||||
|
||||
for (auto & desc : columns_definition.columns_to_aggregate)
|
||||
{
|
||||
// Do not insert if the aggregation state hasn't been created
|
||||
if (desc.created)
|
||||
{
|
||||
if (desc.is_agg_func_type)
|
||||
{
|
||||
current_row_is_zero = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
try
|
||||
{
|
||||
desc.function->insertResultInto(desc.state.data(), *desc.merged_column);
|
||||
|
||||
/// Update zero status of current row
|
||||
if (desc.column_numbers.size() == 1)
|
||||
{
|
||||
// Flag row as non-empty if at least one column number if non-zero
|
||||
current_row_is_zero = current_row_is_zero && desc.merged_column->isDefaultAt(desc.merged_column->size() - 1);
|
||||
}
|
||||
else
|
||||
{
|
||||
/// It is sumMapWithOverflow aggregate function.
|
||||
/// Assume that the row isn't empty in this case (just because it is compatible with previous version)
|
||||
current_row_is_zero = false;
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
desc.destroyState();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
desc.destroyState();
|
||||
}
|
||||
else
|
||||
desc.merged_column->insertDefault();
|
||||
}
|
||||
|
||||
/// If it is "zero" row, then rollback the insertion
|
||||
/// (at this moment we need rollback only cols from columns_to_aggregate)
|
||||
if (current_row_is_zero)
|
||||
{
|
||||
for (auto & desc : columns_definition.columns_to_aggregate)
|
||||
desc.merged_column->popBack(1);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
merged_data.insertRow(current_row, columns_definition.column_numbers_not_to_aggregate);
|
||||
}
|
||||
|
||||
void SummingSortedTransform::addRow(SortCursor & cursor)
|
||||
{
|
||||
for (auto & desc : columns_definition.columns_to_aggregate)
|
||||
{
|
||||
if (!desc.created)
|
||||
throw Exception("Logical error in SummingSortedBlockInputStream, there are no description", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (desc.is_agg_func_type)
|
||||
{
|
||||
// desc.state is not used for AggregateFunction types
|
||||
auto & col = cursor->all_columns[desc.column_numbers[0]];
|
||||
assert_cast<ColumnAggregateFunction &>(*desc.merged_column).insertMergeFrom(*col, cursor->pos);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Specialized case for unary functions
|
||||
if (desc.column_numbers.size() == 1)
|
||||
{
|
||||
auto & col = cursor->all_columns[desc.column_numbers[0]];
|
||||
desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->pos, nullptr);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Gather all source columns into a vector
|
||||
ColumnRawPtrs columns(desc.column_numbers.size());
|
||||
for (size_t i = 0; i < desc.column_numbers.size(); ++i)
|
||||
columns[i] = cursor->all_columns[desc.column_numbers[i]];
|
||||
|
||||
desc.add_function(desc.function.get(), desc.state.data(), columns.data(), cursor->pos, nullptr);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void SummingSortedTransform::merge()
|
||||
{
|
||||
/// Take the rows in needed order and put them in `merged_columns` until rows no more than `max_block_size`
|
||||
while (queue.isValid())
|
||||
{
|
||||
bool key_differs;
|
||||
bool has_previous_group = !last_key.empty();
|
||||
|
||||
SortCursor current = queue.current();
|
||||
|
||||
{
|
||||
detail::RowRef current_key;
|
||||
current_key.set(current);
|
||||
|
||||
if (!has_previous_group) /// The first key encountered.
|
||||
{
|
||||
key_differs = true;
|
||||
current_row_is_zero = true;
|
||||
}
|
||||
else
|
||||
key_differs = !last_key.hasEqualSortColumnsWith(current_key);
|
||||
|
||||
last_key = current_key;
|
||||
last_chunk_sort_columns.clear();
|
||||
}
|
||||
|
||||
if (key_differs)
|
||||
{
|
||||
if (has_previous_group)
|
||||
/// Write the data for the previous group.
|
||||
insertCurrentRowIfNeeded();
|
||||
|
||||
if (merged_data.hasEnoughRows())
|
||||
{
|
||||
/// The block is now full and the last row is calculated completely.
|
||||
last_key.reset();
|
||||
return;
|
||||
}
|
||||
|
||||
setRow(current_row, current, getInputs().front().getHeader());
|
||||
|
||||
/// Reset aggregation states for next row
|
||||
for (auto & desc : columns_definition.columns_to_aggregate)
|
||||
desc.createState();
|
||||
|
||||
// Start aggregations with current row
|
||||
addRow(current);
|
||||
|
||||
if (columns_definition.maps_to_sum.empty())
|
||||
{
|
||||
/// We have only columns_to_aggregate. The status of current row will be determined
|
||||
/// in 'insertCurrentRowIfNeeded' method on the values of aggregate functions.
|
||||
current_row_is_zero = true; // NOLINT
|
||||
}
|
||||
else
|
||||
{
|
||||
/// We have complex maps that will be summed with 'mergeMap' method.
|
||||
/// The single row is considered non zero, and the status after merging with other rows
|
||||
/// will be determined in the branch below (when key_differs == false).
|
||||
current_row_is_zero = false; // NOLINT
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
addRow(current);
|
||||
|
||||
// Merge maps only for same rows
|
||||
for (const auto & desc : columns_definition.maps_to_sum)
|
||||
if (mergeMap(desc, current_row, current))
|
||||
current_row_is_zero = false;
|
||||
}
|
||||
|
||||
if (!current->isLast())
|
||||
{
|
||||
queue.next();
|
||||
}
|
||||
else
|
||||
{
|
||||
/// We get the next block from the corresponding source, if there is one.
|
||||
queue.removeTop();
|
||||
requestDataForInput(current.impl->order);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/// We will write the data for the last group, if it is non-zero.
|
||||
/// If it is zero, and without it the output stream will be empty, we will write it anyway.
|
||||
insertCurrentRowIfNeeded();
|
||||
last_chunk_sort_columns.clear();
|
||||
is_finished = true;
|
||||
}
|
||||
|
||||
}
|
@ -1,15 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <Processors/Merges/IMergingTransform.h>
|
||||
#include <Processors/Merges/MergedData.h>
|
||||
#include <Processors/Merges/RowRef.h>
|
||||
|
||||
#include <AggregateFunctions/IAggregateFunction.h>
|
||||
#include <AggregateFunctions/AggregateFunctionFactory.h>
|
||||
#include <Common/AlignedBuffer.h>
|
||||
#include <Core/SortDescription.h>
|
||||
#include <Core/SortCursor.h>
|
||||
#include <Core/Row.h>
|
||||
#include <Processors/Merges/SummingSortedAlgorithm.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -19,7 +11,7 @@ namespace DB
|
||||
* collapses them into one row, summing all the numeric columns except the primary key.
|
||||
* If in all numeric columns, except for the primary key, the result is zero, it deletes the row.
|
||||
*/
|
||||
class SummingSortedTransform final : public IMergingTransform
|
||||
class SummingSortedTransform final : public IMergingTransform2<SummingSortedAlgorithm>
|
||||
{
|
||||
public:
|
||||
|
||||
@ -28,146 +20,18 @@ public:
|
||||
SortDescription description_,
|
||||
/// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
|
||||
const Names & column_names_to_sum,
|
||||
size_t max_block_size);
|
||||
|
||||
struct AggregateDescription;
|
||||
|
||||
/// Stores numbers of key-columns and value-columns.
|
||||
struct MapDescription
|
||||
size_t max_block_size)
|
||||
: IMergingTransform2(
|
||||
num_inputs, header, header, true,
|
||||
header,
|
||||
num_inputs,
|
||||
std::move(description_),
|
||||
column_names_to_sum,
|
||||
max_block_size)
|
||||
{
|
||||
std::vector<size_t> key_col_nums;
|
||||
std::vector<size_t> val_col_nums;
|
||||
};
|
||||
|
||||
struct ColumnsDefinition
|
||||
{
|
||||
/// Columns with which values should be summed.
|
||||
ColumnNumbers column_numbers_not_to_aggregate;
|
||||
/// Columns which should be aggregated.
|
||||
std::vector<AggregateDescription> columns_to_aggregate;
|
||||
/// Mapping for nested columns.
|
||||
std::vector<MapDescription> maps_to_sum;
|
||||
|
||||
size_t getNumColumns() const { return column_numbers_not_to_aggregate.size() + columns_to_aggregate.size(); }
|
||||
};
|
||||
|
||||
/// Specialization for SummingSortedTransform. Inserts only data for non-aggregated columns.
|
||||
struct SummingMergedData : public MergedData
|
||||
{
|
||||
public:
|
||||
using MergedData::MergedData;
|
||||
|
||||
void insertRow(const Row & row, const ColumnNumbers & column_numbers)
|
||||
{
|
||||
size_t next_column = columns.size() - column_numbers.size();
|
||||
for (auto column_number : column_numbers)
|
||||
{
|
||||
columns[next_column]->insert(row[column_number]);
|
||||
++next_column;
|
||||
}
|
||||
|
||||
++total_merged_rows;
|
||||
++merged_rows;
|
||||
/// TODO: sum_blocks_granularity += block_size;
|
||||
}
|
||||
|
||||
/// Initialize aggregate descriptions with columns.
|
||||
void initAggregateDescription(std::vector<AggregateDescription> & columns_to_aggregate)
|
||||
{
|
||||
size_t num_columns = columns_to_aggregate.size();
|
||||
for (size_t column_number = 0; column_number < num_columns; ++column_number)
|
||||
columns_to_aggregate[column_number].merged_column = columns[column_number].get();
|
||||
}
|
||||
};
|
||||
|
||||
String getName() const override { return "SummingSortedTransform"; }
|
||||
void work() override;
|
||||
|
||||
protected:
|
||||
void initializeInputs() override;
|
||||
void consume(Chunk chunk, size_t input_number) override;
|
||||
|
||||
private:
|
||||
Row current_row;
|
||||
bool current_row_is_zero = true; /// Are all summed columns zero (or empty)? It is updated incrementally.
|
||||
|
||||
ColumnsDefinition columns_definition;
|
||||
SummingMergedData merged_data;
|
||||
|
||||
SortDescription description;
|
||||
|
||||
/// Chunks currently being merged.
|
||||
std::vector<Chunk> source_chunks;
|
||||
SortCursorImpls cursors;
|
||||
|
||||
/// In merging algorithm, we need to compare current sort key with the last one.
|
||||
/// So, sorting columns for last row needed to be stored.
|
||||
/// In order to do it, we extend lifetime of last chunk and it's sort columns (from corresponding sort cursor).
|
||||
Chunk last_chunk;
|
||||
ColumnRawPtrs last_chunk_sort_columns; /// Point to last_chunk if valid.
|
||||
|
||||
detail::RowRef last_key;
|
||||
|
||||
SortingHeap<SortCursor> queue;
|
||||
bool is_queue_initialized = false;
|
||||
|
||||
void merge();
|
||||
void updateCursor(Chunk chunk, size_t source_num);
|
||||
void addRow(SortCursor & cursor);
|
||||
void insertCurrentRowIfNeeded();
|
||||
|
||||
public:
|
||||
/// Stores aggregation function, state, and columns to be used as function arguments.
|
||||
struct AggregateDescription
|
||||
{
|
||||
/// An aggregate function 'sumWithOverflow' or 'sumMapWithOverflow' for summing.
|
||||
AggregateFunctionPtr function;
|
||||
IAggregateFunction::AddFunc add_function = nullptr;
|
||||
std::vector<size_t> column_numbers;
|
||||
IColumn * merged_column = nullptr;
|
||||
AlignedBuffer state;
|
||||
bool created = false;
|
||||
|
||||
/// In case when column has type AggregateFunction: use the aggregate function from itself instead of 'function' above.
|
||||
bool is_agg_func_type = false;
|
||||
|
||||
void init(const char * function_name, const DataTypes & argument_types)
|
||||
{
|
||||
function = AggregateFunctionFactory::instance().get(function_name, argument_types);
|
||||
add_function = function->getAddressOfAddFunction();
|
||||
state.reset(function->sizeOfData(), function->alignOfData());
|
||||
}
|
||||
|
||||
void createState()
|
||||
{
|
||||
if (created)
|
||||
return;
|
||||
if (is_agg_func_type)
|
||||
merged_column->insertDefault();
|
||||
else
|
||||
function->create(state.data());
|
||||
created = true;
|
||||
}
|
||||
|
||||
void destroyState()
|
||||
{
|
||||
if (!created)
|
||||
return;
|
||||
if (!is_agg_func_type)
|
||||
function->destroy(state.data());
|
||||
created = false;
|
||||
}
|
||||
|
||||
/// Explicitly destroy aggregation state if the stream is terminated
|
||||
~AggregateDescription()
|
||||
{
|
||||
destroyState();
|
||||
}
|
||||
|
||||
AggregateDescription() = default;
|
||||
AggregateDescription(AggregateDescription &&) = default;
|
||||
AggregateDescription(const AggregateDescription &) = delete;
|
||||
};
|
||||
};
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user