Add SummingSortedTransform [part 2]

This commit is contained in:
Nikolai Kochetov 2020-03-31 22:58:27 +03:00
parent 5f5eb58abe
commit f3743552ce
7 changed files with 328 additions and 250 deletions

View File

@ -25,7 +25,7 @@ CollapsingSortedTransform::CollapsingSortedTransform(
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
: IMergingTransform(num_inputs, header, header, true)
, merged_data(header, use_average_block_sizes, max_block_size)
, merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
, description(std::move(description_))
, sign_column_number(header.getPositionByName(sign_column))
, out_row_sources_buf(out_row_sources_buf_)

View File

@ -13,12 +13,9 @@ namespace ErrorCodes
class MergedData
{
public:
explicit MergedData(const Block & header, bool use_average_block_size_, UInt64 max_block_size_)
: max_block_size(max_block_size_), use_average_block_size(use_average_block_size_)
explicit MergedData(MutableColumns columns_, bool use_average_block_size_, UInt64 max_block_size_)
: columns(std::move(columns_)), max_block_size(max_block_size_), use_average_block_size(use_average_block_size_)
{
columns.reserve(header.columns());
for (const auto & column : header)
columns.emplace_back(column.type->createColumn());
}
/// Pull will be called at next prepare call.

View File

@ -24,7 +24,7 @@ MergingSortedTransform::MergingSortedTransform(
bool use_average_block_sizes,
bool have_all_inputs_)
: IMergingTransform(num_inputs, header, header, have_all_inputs_)
, merged_data(header, use_average_block_sizes, max_block_size)
, merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
, description(std::move(description_))
, limit(limit_)
, quiet(quiet_)

View File

@ -16,7 +16,7 @@ ReplacingSortedTransform::ReplacingSortedTransform(
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
: IMergingTransform(num_inputs, header, header, true)
, merged_data(header, use_average_block_sizes, max_block_size)
, merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
, description(std::move(description_))
, out_row_sources_buf(out_row_sources_buf_)
, chunk_allocator(num_inputs + max_row_refs)

View File

@ -4,261 +4,302 @@
#include <Common/StringUtils/StringUtils.h>
#include <Core/Row.h>
#include <Common/FieldVisitors.h>
#include <Columns/ColumnTuple.h>
namespace DB
{
namespace
{
bool isInPrimaryKey(const SortDescription & description, const std::string & name, const size_t number)
{
for (auto & desc : description)
if (desc.column_name == name || (desc.column_name.empty() && desc.column_number == number))
return true;
return false;
}
/// Returns true if merge result is not empty
bool mergeMap(const SummingSortedTransform::MapDescription & desc, Row & row, SortCursor & cursor)
{
/// Strongly non-optimal.
Row & left = row;
Row right(left.size());
for (size_t col_num : desc.key_col_nums)
right[col_num] = (*cursor->all_columns[col_num])[cursor->pos].template get<Array>();
for (size_t col_num : desc.val_col_nums)
right[col_num] = (*cursor->all_columns[col_num])[cursor->pos].template get<Array>();
auto at_ith_column_jth_row = [&](const Row & matrix, size_t i, size_t j) -> const Field &
bool isInPrimaryKey(const SortDescription & description, const std::string & name, const size_t number)
{
return matrix[i].get<Array>()[j];
};
for (auto & desc : description)
if (desc.column_name == name || (desc.column_name.empty() && desc.column_number == number))
return true;
auto tuple_of_nth_columns_at_jth_row = [&](const Row & matrix, const ColumnNumbers & col_nums, size_t j) -> Array
{
size_t size = col_nums.size();
Array res(size);
for (size_t col_num_index = 0; col_num_index < size; ++col_num_index)
res[col_num_index] = at_ith_column_jth_row(matrix, col_nums[col_num_index], j);
return res;
};
std::map<Array, Array> merged;
auto accumulate = [](Array & dst, const Array & src)
{
bool has_non_zero = false;
size_t size = dst.size();
for (size_t i = 0; i < size; ++i)
if (applyVisitor(FieldVisitorSum(src[i]), dst[i]))
has_non_zero = true;
return has_non_zero;
};
auto merge = [&](const Row & matrix)
{
size_t rows = matrix[desc.key_col_nums[0]].get<Array>().size();
for (size_t j = 0; j < rows; ++j)
{
Array key = tuple_of_nth_columns_at_jth_row(matrix, desc.key_col_nums, j);
Array value = tuple_of_nth_columns_at_jth_row(matrix, desc.val_col_nums, j);
auto it = merged.find(key);
if (merged.end() == it)
merged.emplace(std::move(key), std::move(value));
else
{
if (!accumulate(it->second, value))
merged.erase(it);
}
}
};
merge(left);
merge(right);
for (size_t col_num : desc.key_col_nums)
row[col_num] = Array(merged.size());
for (size_t col_num : desc.val_col_nums)
row[col_num] = Array(merged.size());
size_t row_num = 0;
for (const auto & key_value : merged)
{
for (size_t col_num_index = 0, size = desc.key_col_nums.size(); col_num_index < size; ++col_num_index)
row[desc.key_col_nums[col_num_index]].get<Array>()[row_num] = key_value.first[col_num_index];
for (size_t col_num_index = 0, size = desc.val_col_nums.size(); col_num_index < size; ++col_num_index)
row[desc.val_col_nums[col_num_index]].get<Array>()[row_num] = key_value.second[col_num_index];
++row_num;
return false;
}
return row_num != 0;
}
/// Returns true if merge result is not empty
bool mergeMap(const SummingSortedTransform::MapDescription & desc, Row & row, SortCursor & cursor)
{
/// Strongly non-optimal.
Row & left = row;
Row right(left.size());
for (size_t col_num : desc.key_col_nums)
right[col_num] = (*cursor->all_columns[col_num])[cursor->pos].template get<Array>();
for (size_t col_num : desc.val_col_nums)
right[col_num] = (*cursor->all_columns[col_num])[cursor->pos].template get<Array>();
auto at_ith_column_jth_row = [&](const Row & matrix, size_t i, size_t j) -> const Field &
{
return matrix[i].get<Array>()[j];
};
auto tuple_of_nth_columns_at_jth_row = [&](const Row & matrix, const ColumnNumbers & col_nums, size_t j) -> Array
{
size_t size = col_nums.size();
Array res(size);
for (size_t col_num_index = 0; col_num_index < size; ++col_num_index)
res[col_num_index] = at_ith_column_jth_row(matrix, col_nums[col_num_index], j);
return res;
};
std::map<Array, Array> merged;
auto accumulate = [](Array & dst, const Array & src)
{
bool has_non_zero = false;
size_t size = dst.size();
for (size_t i = 0; i < size; ++i)
if (applyVisitor(FieldVisitorSum(src[i]), dst[i]))
has_non_zero = true;
return has_non_zero;
};
auto merge = [&](const Row & matrix)
{
size_t rows = matrix[desc.key_col_nums[0]].get<Array>().size();
for (size_t j = 0; j < rows; ++j)
{
Array key = tuple_of_nth_columns_at_jth_row(matrix, desc.key_col_nums, j);
Array value = tuple_of_nth_columns_at_jth_row(matrix, desc.val_col_nums, j);
auto it = merged.find(key);
if (merged.end() == it)
merged.emplace(std::move(key), std::move(value));
else
{
if (!accumulate(it->second, value))
merged.erase(it);
}
}
};
merge(left);
merge(right);
for (size_t col_num : desc.key_col_nums)
row[col_num] = Array(merged.size());
for (size_t col_num : desc.val_col_nums)
row[col_num] = Array(merged.size());
size_t row_num = 0;
for (const auto & key_value : merged)
{
for (size_t col_num_index = 0, size = desc.key_col_nums.size(); col_num_index < size; ++col_num_index)
row[desc.key_col_nums[col_num_index]].get<Array>()[row_num] = key_value.first[col_num_index];
for (size_t col_num_index = 0, size = desc.val_col_nums.size(); col_num_index < size; ++col_num_index)
row[desc.val_col_nums[col_num_index]].get<Array>()[row_num] = key_value.second[col_num_index];
++row_num;
}
return row_num != 0;
}
SummingSortedTransform::ColumnsDefinition defineColumns(
const Block & header,
const SortDescription & description,
const Names & column_names_to_sum)
{
size_t num_columns = header.columns();
SummingSortedTransform::ColumnsDefinition def;
/// name of nested structure -> the column numbers that refer to it.
std::unordered_map<std::string, std::vector<size_t>> discovered_maps;
/** Fill in the column numbers, which must be summed.
* This can only be numeric columns that are not part of the sort key.
* If a non-empty column_names_to_sum is specified, then we only take these columns.
* Some columns from column_names_to_sum may not be found. This is ignored.
*/
for (size_t i = 0; i < num_columns; ++i)
{
const ColumnWithTypeAndName & column = header.safeGetByPosition(i);
/// Discover nested Maps and find columns for summation
if (typeid_cast<const DataTypeArray *>(column.type.get()))
{
const auto map_name = Nested::extractTableName(column.name);
/// if nested table name ends with `Map` it is a possible candidate for special handling
if (map_name == column.name || !endsWith(map_name, "Map"))
{
def.column_numbers_not_to_aggregate.push_back(i);
continue;
}
discovered_maps[map_name].emplace_back(i);
}
else
{
bool is_agg_func = WhichDataType(column.type).isAggregateFunction();
/// There are special const columns for example after prewhere sections.
if ((!column.type->isSummable() && !is_agg_func) || isColumnConst(*column.column))
{
def.column_numbers_not_to_aggregate.push_back(i);
continue;
}
/// Are they inside the PK?
if (isInPrimaryKey(description, column.name, i))
{
def.column_numbers_not_to_aggregate.push_back(i);
continue;
}
if (column_names_to_sum.empty()
|| column_names_to_sum.end() !=
std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name))
{
// Create aggregator to sum this column
SummingSortedTransform::AggregateDescription desc;
desc.is_agg_func_type = is_agg_func;
desc.column_numbers = {i};
if (!is_agg_func)
{
desc.init("sumWithOverflow", {column.type});
}
def.columns_to_aggregate.emplace_back(std::move(desc));
}
else
{
// Column is not going to be summed, use last value
def.column_numbers_not_to_aggregate.push_back(i);
}
}
}
/// select actual nested Maps from list of candidates
for (const auto & map : discovered_maps)
{
/// map should contain at least two elements (key -> value)
if (map.second.size() < 2)
{
for (auto col : map.second)
def.column_numbers_not_to_aggregate.push_back(col);
continue;
}
/// no elements of map could be in primary key
auto column_num_it = map.second.begin();
for (; column_num_it != map.second.end(); ++column_num_it)
if (isInPrimaryKey(description, header.safeGetByPosition(*column_num_it).name, *column_num_it))
break;
if (column_num_it != map.second.end())
{
for (auto col : map.second)
def.column_numbers_not_to_aggregate.push_back(col);
continue;
}
DataTypes argument_types;
SummingSortedTransform::AggregateDescription desc;
SummingSortedTransform::MapDescription map_desc;
column_num_it = map.second.begin();
for (; column_num_it != map.second.end(); ++column_num_it)
{
const ColumnWithTypeAndName & key_col = header.safeGetByPosition(*column_num_it);
const String & name = key_col.name;
const IDataType & nested_type = *assert_cast<const DataTypeArray &>(*key_col.type).getNestedType();
if (column_num_it == map.second.begin()
|| endsWith(name, "ID")
|| endsWith(name, "Key")
|| endsWith(name, "Type"))
{
if (!nested_type.isValueRepresentedByInteger() && !isStringOrFixedString(nested_type))
break;
map_desc.key_col_nums.push_back(*column_num_it);
}
else
{
if (!nested_type.isSummable())
break;
map_desc.val_col_nums.push_back(*column_num_it);
}
// Add column to function arguments
desc.column_numbers.push_back(*column_num_it);
argument_types.push_back(key_col.type);
}
if (column_num_it != map.second.end())
{
for (auto col : map.second)
def.column_numbers_not_to_aggregate.push_back(col);
continue;
}
if (map_desc.key_col_nums.size() == 1)
{
// Create summation for all value columns in the map
desc.init("sumMapWithOverflow", argument_types);
def.columns_to_aggregate.emplace_back(std::move(desc));
}
else
{
// Fall back to legacy mergeMaps for composite keys
for (auto col : map.second)
def.column_numbers_not_to_aggregate.push_back(col);
def.maps_to_sum.emplace_back(std::move(map_desc));
}
}
}
MutableColumns getMergedDataColumns(
const Block & header,
const SummingSortedTransform::ColumnsDefinition & columns_definition)
{
MutableColumns columns;
columns.reserve(columns_definition.getNumColumns());
for (auto & desc : columns_definition.columns_to_aggregate)
{
// Wrap aggregated columns in a tuple to match function signature
if (!desc.is_agg_func_type && isTuple(desc.function->getReturnType()))
{
size_t tuple_size = desc.column_numbers.size();
MutableColumns tuple_columns(tuple_size);
for (size_t i = 0; i < tuple_size; ++i)
tuple_columns[i] = header.safeGetByPosition(desc.column_numbers[i]).column->cloneEmpty();
columns.emplace_back(ColumnTuple::create(std::move(tuple_columns)));
}
else
columns.emplace_back(header.safeGetByPosition(desc.column_numbers[0]).column->cloneEmpty());
}
for (auto & column_number : columns_definition.column_numbers_not_to_aggregate)
columns.emplace_back(header.safeGetByPosition(column_number).type->createColumn());
return columns;
}
}
SummingSortedTransform::SummingSortedTransform(
size_t num_inputs, const Block & header,
SortDescription description,
SortDescription description_,
/// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
const Names & column_names_to_sum,
size_t max_block_size)
: IMergingTransform(num_inputs, header, header, true)
, columns_definition(defineColumns(header, description_, column_names_to_sum))
, merged_data(getMergedDataColumns(header, columns_definition), false, max_block_size)
{
size_t num_columns = header.columns();
current_row.resize(num_columns);
/// name of nested structure -> the column numbers that refer to it.
std::unordered_map<std::string, std::vector<size_t>> discovered_maps;
/** Fill in the column numbers, which must be summed.
* This can only be numeric columns that are not part of the sort key.
* If a non-empty column_names_to_sum is specified, then we only take these columns.
* Some columns from column_names_to_sum may not be found. This is ignored.
*/
for (size_t i = 0; i < num_columns; ++i)
{
const ColumnWithTypeAndName & column = header.safeGetByPosition(i);
/// Discover nested Maps and find columns for summation
if (typeid_cast<const DataTypeArray *>(column.type.get()))
{
const auto map_name = Nested::extractTableName(column.name);
/// if nested table name ends with `Map` it is a possible candidate for special handling
if (map_name == column.name || !endsWith(map_name, "Map"))
{
column_numbers_not_to_aggregate.push_back(i);
continue;
}
discovered_maps[map_name].emplace_back(i);
}
else
{
bool is_agg_func = WhichDataType(column.type).isAggregateFunction();
/// There are special const columns for example after prewere sections.
if ((!column.type->isSummable() && !is_agg_func) || isColumnConst(*column.column))
{
column_numbers_not_to_aggregate.push_back(i);
continue;
}
/// Are they inside the PK?
if (isInPrimaryKey(description, column.name, i))
{
column_numbers_not_to_aggregate.push_back(i);
continue;
}
if (column_names_to_sum.empty()
|| column_names_to_sum.end() !=
std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name))
{
// Create aggregator to sum this column
AggregateDescription desc;
desc.is_agg_func_type = is_agg_func;
desc.column_numbers = {i};
if (!is_agg_func)
{
desc.init("sumWithOverflow", {column.type});
}
columns_to_aggregate.emplace_back(std::move(desc));
}
else
{
// Column is not going to be summed, use last value
column_numbers_not_to_aggregate.push_back(i);
}
}
}
/// select actual nested Maps from list of candidates
for (const auto & map : discovered_maps)
{
/// map should contain at least two elements (key -> value)
if (map.second.size() < 2)
{
for (auto col : map.second)
column_numbers_not_to_aggregate.push_back(col);
continue;
}
/// no elements of map could be in primary key
auto column_num_it = map.second.begin();
for (; column_num_it != map.second.end(); ++column_num_it)
if (isInPrimaryKey(description, header.safeGetByPosition(*column_num_it).name, *column_num_it))
break;
if (column_num_it != map.second.end())
{
for (auto col : map.second)
column_numbers_not_to_aggregate.push_back(col);
continue;
}
DataTypes argument_types;
AggregateDescription desc;
MapDescription map_desc;
column_num_it = map.second.begin();
for (; column_num_it != map.second.end(); ++column_num_it)
{
const ColumnWithTypeAndName & key_col = header.safeGetByPosition(*column_num_it);
const String & name = key_col.name;
const IDataType & nested_type = *static_cast<const DataTypeArray *>(key_col.type.get())->getNestedType();
if (column_num_it == map.second.begin()
|| endsWith(name, "ID")
|| endsWith(name, "Key")
|| endsWith(name, "Type"))
{
if (!nested_type.isValueRepresentedByInteger() && !isStringOrFixedString(nested_type))
break;
map_desc.key_col_nums.push_back(*column_num_it);
}
else
{
if (!nested_type.isSummable())
break;
map_desc.val_col_nums.push_back(*column_num_it);
}
// Add column to function arguments
desc.column_numbers.push_back(*column_num_it);
argument_types.push_back(key_col.type);
}
if (column_num_it != map.second.end())
{
for (auto col : map.second)
column_numbers_not_to_aggregate.push_back(col);
continue;
}
if (map_desc.key_col_nums.size() == 1)
{
// Create summation for all value columns in the map
desc.init("sumMapWithOverflow", argument_types);
columns_to_aggregate.emplace_back(std::move(desc));
}
else
{
// Fall back to legacy mergeMaps for composite keys
for (auto col : map.second)
column_numbers_not_to_aggregate.push_back(col);
maps_to_sum.emplace_back(std::move(map_desc));
}
}
}
}

View File

@ -8,6 +8,7 @@
#include <Common/AlignedBuffer.h>
#include <Core/SortDescription.h>
#include <Core/SortCursor.h>
#include <Core/Row.h>
namespace DB
{
@ -20,7 +21,7 @@ public:
SummingSortedTransform(
size_t num_inputs, const Block & header,
SortDescription description,
SortDescription description_,
/// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
const Names & column_names_to_sum,
size_t max_block_size);
@ -80,7 +81,7 @@ public:
struct SummingMergedData : public MergedData
{
public:
using MergedData::MergedData;
};
/// Stores numbers of key-columns and value-columns.
@ -90,12 +91,51 @@ public:
std::vector<size_t> val_col_nums;
};
private:
/// Columns with which values should be summed.
ColumnNumbers column_numbers_not_to_aggregate;
struct ColumnsDefinition
{
/// Columns with which values should be summed.
ColumnNumbers column_numbers_not_to_aggregate;
/// Columns which should be aggregated.
std::vector<AggregateDescription> columns_to_aggregate;
/// Mapping for nested columns.
std::vector<MapDescription> maps_to_sum;
std::vector<AggregateDescription> columns_to_aggregate;
std::vector<MapDescription> maps_to_sum;
size_t getNumColumns() const { return column_numbers_not_to_aggregate.size() + columns_to_aggregate.size(); }
};
private:
Row current_row;
bool current_row_is_zero = true; /// Are all summed columns zero (or empty)? It is updated incrementally.
ColumnsDefinition columns_definition;
SummingMergedData merged_data;
SortDescription description;
/// Chunks currently being merged.
std::vector<Chunk> source_chunks;
SortCursorImpls cursors;
/// In merging algorithm, we need to compare current sort key with the last one.
/// So, sorting columns for last row needed to be stored.
/// In order to do it, we extend lifetime of last chunk and it's sort columns (from corresponding sort cursor).
Chunk last_chunk;
ColumnRawPtrs last_chunk_sort_columns; /// Point to last_chunk if valid.
struct RowRef
{
ColumnRawPtrs * sort_columns = nullptr; /// Point to sort_columns from SortCursor or last_chunk_sort_columns.
UInt64 row_number = 0;
};
RowRef last_row;
SortingHeap<SortCursor> queue;
bool is_queue_initialized = false;
void insertRow();
void merge();
void updateCursor(Chunk chunk, size_t source_num);
};
}

View File

@ -14,7 +14,7 @@ VersionedCollapsingTransform::VersionedCollapsingTransform(
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
: IMergingTransform(num_inputs, header, header, true)
, merged_data(header, use_average_block_sizes, max_block_size)
, merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
, description(std::move(description_))
, out_row_sources_buf(out_row_sources_buf_)
, max_rows_in_queue(MAX_ROWS_IN_MULTIVERSION_QUEUE - 1) /// -1 for +1 in FixedSizeDequeWithGaps's internal buffer