mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-16 11:22:12 +00:00
Add SummingSortedTransform [part 4]
This commit is contained in:
parent
2ecbf0b0bb
commit
f4e4aeda7f
@ -29,6 +29,8 @@ CollapsingSortedTransform::CollapsingSortedTransform(
|
|||||||
, description(std::move(description_))
|
, description(std::move(description_))
|
||||||
, sign_column_number(header.getPositionByName(sign_column))
|
, sign_column_number(header.getPositionByName(sign_column))
|
||||||
, out_row_sources_buf(out_row_sources_buf_)
|
, out_row_sources_buf(out_row_sources_buf_)
|
||||||
|
, source_chunks(num_inputs)
|
||||||
|
, cursors(num_inputs)
|
||||||
, chunk_allocator(num_inputs + max_row_refs)
|
, chunk_allocator(num_inputs + max_row_refs)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
@ -19,6 +19,8 @@ ReplacingSortedTransform::ReplacingSortedTransform(
|
|||||||
, merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
|
, merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
|
||||||
, description(std::move(description_))
|
, description(std::move(description_))
|
||||||
, out_row_sources_buf(out_row_sources_buf_)
|
, out_row_sources_buf(out_row_sources_buf_)
|
||||||
|
, source_chunks(num_inputs)
|
||||||
|
, cursors(num_inputs)
|
||||||
, chunk_allocator(num_inputs + max_row_refs)
|
, chunk_allocator(num_inputs + max_row_refs)
|
||||||
{
|
{
|
||||||
if (!version_column.empty())
|
if (!version_column.empty())
|
||||||
|
@ -1,14 +1,23 @@
|
|||||||
#include <Processors/Merges/SummingSortedTransform.h>
|
#include <Processors/Merges/SummingSortedTransform.h>
|
||||||
|
|
||||||
#include <DataTypes/DataTypeArray.h>
|
#include <DataTypes/DataTypeArray.h>
|
||||||
#include <DataTypes/NestedUtils.h>
|
#include <DataTypes/NestedUtils.h>
|
||||||
#include <Common/StringUtils/StringUtils.h>
|
#include <Columns/ColumnAggregateFunction.h>
|
||||||
#include <Core/Row.h>
|
|
||||||
#include <Common/FieldVisitors.h>
|
|
||||||
#include <Columns/ColumnTuple.h>
|
#include <Columns/ColumnTuple.h>
|
||||||
|
#include <Common/StringUtils/StringUtils.h>
|
||||||
|
#include <Common/FieldVisitors.h>
|
||||||
|
#include <Core/Row.h>
|
||||||
|
#include <IO/WriteHelpers.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int LOGICAL_ERROR;
|
||||||
|
extern const int CORRUPTED_DATA;
|
||||||
|
}
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
bool isInPrimaryKey(const SortDescription & description, const std::string & name, const size_t number)
|
bool isInPrimaryKey(const SortDescription & description, const std::string & name, const size_t number)
|
||||||
@ -159,7 +168,7 @@ namespace
|
|||||||
std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name))
|
std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name))
|
||||||
{
|
{
|
||||||
// Create aggregator to sum this column
|
// Create aggregator to sum this column
|
||||||
SummingSortedTransform::AggregateDescription desc;
|
detail::AggregateDescription desc;
|
||||||
desc.is_agg_func_type = is_agg_func;
|
desc.is_agg_func_type = is_agg_func;
|
||||||
desc.column_numbers = {i};
|
desc.column_numbers = {i};
|
||||||
|
|
||||||
@ -202,7 +211,7 @@ namespace
|
|||||||
}
|
}
|
||||||
|
|
||||||
DataTypes argument_types;
|
DataTypes argument_types;
|
||||||
SummingSortedTransform::AggregateDescription desc;
|
detail::AggregateDescription desc;
|
||||||
SummingSortedTransform::MapDescription map_desc;
|
SummingSortedTransform::MapDescription map_desc;
|
||||||
|
|
||||||
column_num_it = map.second.begin();
|
column_num_it = map.second.begin();
|
||||||
@ -323,20 +332,52 @@ namespace
|
|||||||
|
|
||||||
chunk.setColumns(std::move(res_columns), num_rows);
|
chunk.setColumns(std::move(res_columns), num_rows);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Copies the row at the cursor's current position into `row`, one value per column.
///
/// For every index i in [0, row.size()) the value is read with
/// cursor->all_columns[i]->get(cursor->pos, row[i]).
///
/// If reading a column throws, the original exception is logged and an Exception with code
/// CORRUPTED_DATA is thrown instead. Its message contains the row position, the column index
/// and, when `header` has a column at that index, the column name.
void setRow(Row & row, SortCursor & cursor, const Block & header)
{
    size_t num_columns = row.size();
    for (size_t i = 0; i < num_columns; ++i)
    {
        try
        {
            cursor->all_columns[i]->get(cursor->pos, row[i]);
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);

            /// Find out the name of the column and throw more informative exception.
            /// There must be no `break` here: leaving the column loop from inside the handler
            /// would swallow the exception and return with a partially filled row.
            String column_name;
            if (i < header.columns())
                column_name = header.safeGetByPosition(i).name;

            /// NOTE(review): the message still names MergingSortedBlockInputStream; left unchanged on purpose.
            throw Exception("MergingSortedBlockInputStream failed to read row " + toString(cursor->pos)
                + " of column " + toString(i) + (column_name.empty() ? "" : " (" + column_name + ")"),
                ErrorCodes::CORRUPTED_DATA);
        }
    }
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SummingSortedTransform::SummingSortedTransform(
|
SummingSortedTransform::SummingSortedTransform(
|
||||||
size_t num_inputs, const Block & header,
|
size_t num_inputs, const Block & header,
|
||||||
SortDescription description_,
|
SortDescription description_,
|
||||||
/// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
|
/// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
|
||||||
const Names & column_names_to_sum,
|
const Names & column_names_to_sum,
|
||||||
size_t max_block_size)
|
size_t max_block_size)
|
||||||
: IMergingTransform(num_inputs, header, header, true)
|
: IMergingTransform(num_inputs, header, header, true)
|
||||||
, columns_definition(defineColumns(header, description_, column_names_to_sum))
|
, columns_definition(defineColumns(header, description_, column_names_to_sum))
|
||||||
, merged_data(getMergedDataColumns(header, columns_definition), false, max_block_size)
|
, merged_data(getMergedDataColumns(header, columns_definition), false, max_block_size)
|
||||||
|
, description(std::move(description_))
|
||||||
|
, source_chunks(num_inputs)
|
||||||
|
, cursors(num_inputs)
|
||||||
{
|
{
|
||||||
size_t num_columns = header.columns();
|
current_row.resize(header.columns());
|
||||||
current_row.resize(num_columns);
|
merged_data.initAggregateDescription(columns_definition.columns_to_aggregate);
|
||||||
}
|
}
|
||||||
|
|
||||||
void SummingSortedTransform::initializeInputs()
|
void SummingSortedTransform::initializeInputs()
|
||||||
@ -389,7 +430,103 @@ void SummingSortedTransform::work()
|
|||||||
prepareOutputChunk(merged_data);
|
prepareOutputChunk(merged_data);
|
||||||
|
|
||||||
if (has_output_chunk)
|
if (has_output_chunk)
|
||||||
|
{
|
||||||
finalizeChunk(output_chunk, getOutputs().back().getHeader().columns(), columns_definition);
|
finalizeChunk(output_chunk, getOutputs().back().getHeader().columns(), columns_definition);
|
||||||
|
merged_data.initAggregateDescription(columns_definition.columns_to_aggregate);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void SummingSortedTransform::insertCurrentRowIfNeeded()
|
||||||
|
{
|
||||||
|
/// We have nothing to aggregate. It means that it could be non-zero, because we have columns_not_to_aggregate.
|
||||||
|
if (columns_definition.columns_to_aggregate.empty())
|
||||||
|
current_row_is_zero = false;
|
||||||
|
|
||||||
|
for (auto & desc : columns_definition.columns_to_aggregate)
|
||||||
|
{
|
||||||
|
// Do not insert if the aggregation state hasn't been created
|
||||||
|
if (desc.created)
|
||||||
|
{
|
||||||
|
if (desc.is_agg_func_type)
|
||||||
|
{
|
||||||
|
current_row_is_zero = false;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
desc.function->insertResultInto(desc.state.data(), *desc.merged_column);
|
||||||
|
|
||||||
|
/// Update zero status of current row
|
||||||
|
if (desc.column_numbers.size() == 1)
|
||||||
|
{
|
||||||
|
// Flag row as non-empty if at least one column number if non-zero
|
||||||
|
current_row_is_zero = current_row_is_zero && desc.merged_column->isDefaultAt(desc.merged_column->size() - 1);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/// It is sumMapWithOverflow aggregate function.
|
||||||
|
/// Assume that the row isn't empty in this case (just because it is compatible with previous version)
|
||||||
|
current_row_is_zero = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
desc.destroyState();
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
desc.destroyState();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
desc.merged_column->insertDefault();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// If it is "zero" row, then rollback the insertion
|
||||||
|
/// (at this moment we need rollback only cols from columns_to_aggregate)
|
||||||
|
if (current_row_is_zero)
|
||||||
|
{
|
||||||
|
for (auto & desc : columns_definition.columns_to_aggregate)
|
||||||
|
desc.merged_column->popBack(1);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
merged_data.insertRow(current_row, columns_definition.column_numbers_not_to_aggregate);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Adds the row at the cursor's current position to every aggregate of the current group.
/// Throws LOGICAL_ERROR if some aggregate description has not been created.
void SummingSortedTransform::addRow(SortCursor & cursor)
{
    for (auto & aggregate : columns_definition.columns_to_aggregate)
    {
        if (!aggregate.created)
            throw Exception("Logical error in SummingSortedBlockInputStream, there are no description", ErrorCodes::LOGICAL_ERROR);

        const auto & argument_positions = aggregate.column_numbers;

        if (aggregate.is_agg_func_type)
        {
            /// aggregate.state is not used for AggregateFunction types: merge directly into the result column.
            auto & source = cursor->all_columns[argument_positions[0]];
            assert_cast<ColumnAggregateFunction &>(*aggregate.merged_column).insertMergeFrom(*source, cursor->pos);
            continue;
        }

        if (argument_positions.size() == 1)
        {
            /// Specialized case for unary functions: pass the address of the single column pointer.
            auto & source = cursor->all_columns[argument_positions[0]];
            aggregate.add_function(aggregate.function.get(), aggregate.state.data(), &source, cursor->pos, nullptr);
            continue;
        }

        /// Gather all source columns into a vector.
        ColumnRawPtrs arguments(argument_positions.size());
        for (size_t arg = 0; arg < argument_positions.size(); ++arg)
            arguments[arg] = cursor->all_columns[argument_positions[arg]];

        aggregate.add_function(aggregate.function.get(), aggregate.state.data(), arguments.data(), cursor->pos, nullptr);
    }
}
|
||||||
|
|
||||||
void SummingSortedTransform::merge()
|
void SummingSortedTransform::merge()
|
||||||
@ -403,7 +540,7 @@ void SummingSortedTransform::merge()
|
|||||||
SortCursor current = queue.current();
|
SortCursor current = queue.current();
|
||||||
|
|
||||||
{
|
{
|
||||||
RowRef current_key;
|
detail::RowRef current_key;
|
||||||
current_key.set(current);
|
current_key.set(current);
|
||||||
|
|
||||||
if (!has_previous_group) /// The first key encountered.
|
if (!has_previous_group) /// The first key encountered.
|
||||||
@ -431,7 +568,7 @@ void SummingSortedTransform::merge()
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
setRow(current_row, current);
|
setRow(current_row, current, getInputs().front().getHeader());
|
||||||
|
|
||||||
/// Reset aggregation states for next row
|
/// Reset aggregation states for next row
|
||||||
for (auto & desc : columns_definition.columns_to_aggregate)
|
for (auto & desc : columns_definition.columns_to_aggregate)
|
||||||
|
@ -13,20 +13,9 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
|
namespace detail
|
||||||
|
|
||||||
class SummingSortedTransform : public IMergingTransform
|
|
||||||
{
|
{
|
||||||
public:
|
/// Stores aggregation function, state, and columns to be used as function arguments.
|
||||||
|
|
||||||
SummingSortedTransform(
|
|
||||||
size_t num_inputs, const Block & header,
|
|
||||||
SortDescription description_,
|
|
||||||
/// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
|
|
||||||
const Names & column_names_to_sum,
|
|
||||||
size_t max_block_size);
|
|
||||||
|
|
||||||
/// Stores aggregation function, state, and columns to be used as function arguments
|
|
||||||
struct AggregateDescription
|
struct AggregateDescription
|
||||||
{
|
{
|
||||||
/// An aggregate function 'sumWithOverflow' or 'sumMapWithOverflow' for summing.
|
/// An aggregate function 'sumWithOverflow' or 'sumMapWithOverflow' for summing.
|
||||||
@ -78,57 +67,31 @@ public:
|
|||||||
AggregateDescription(const AggregateDescription &) = delete;
|
AggregateDescription(const AggregateDescription &) = delete;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Specialization for SummingSortedTransform. Inserts only data for non-aggregated columns.
|
||||||
struct SummingMergedData : public MergedData
|
struct SummingMergedData : public MergedData
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
using MergedData::MergedData;
|
using MergedData::MergedData;
|
||||||
|
|
||||||
|
void insertRow(const Row & row, const ColumnNumbers & column_numbers)
|
||||||
|
{
|
||||||
|
for (auto column_number :column_numbers)
|
||||||
|
columns[column_number]->insert(row[column_number]);
|
||||||
|
|
||||||
|
++total_merged_rows;
|
||||||
|
++merged_rows;
|
||||||
|
/// TODO: sum_blocks_granularity += block_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Initialize aggregate descriptions with columns.
|
||||||
|
void initAggregateDescription(std::vector<AggregateDescription> & columns_to_aggregate)
|
||||||
|
{
|
||||||
|
size_t num_columns = columns_to_aggregate.size();
|
||||||
|
for (size_t column_number = 0; column_number < num_columns; ++column_number)
|
||||||
|
columns_to_aggregate[column_number].merged_column = columns[column_number].get();
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Stores numbers of key-columns and value-columns.
|
|
||||||
struct MapDescription
|
|
||||||
{
|
|
||||||
std::vector<size_t> key_col_nums;
|
|
||||||
std::vector<size_t> val_col_nums;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct ColumnsDefinition
|
|
||||||
{
|
|
||||||
/// Columns with which values should be summed.
|
|
||||||
ColumnNumbers column_numbers_not_to_aggregate;
|
|
||||||
/// Columns which should be aggregated.
|
|
||||||
std::vector<AggregateDescription> columns_to_aggregate;
|
|
||||||
/// Mapping for nested columns.
|
|
||||||
std::vector<MapDescription> maps_to_sum;
|
|
||||||
|
|
||||||
size_t getNumColumns() const { return column_numbers_not_to_aggregate.size() + columns_to_aggregate.size(); }
|
|
||||||
};
|
|
||||||
|
|
||||||
String getName() const override { return "SummingSortedTransform"; }
|
|
||||||
void work() override;
|
|
||||||
|
|
||||||
protected:
|
|
||||||
void initializeInputs() override;
|
|
||||||
void consume(Chunk chunk, size_t input_number) override;
|
|
||||||
|
|
||||||
private:
|
|
||||||
Row current_row;
|
|
||||||
bool current_row_is_zero = true; /// Are all summed columns zero (or empty)? It is updated incrementally.
|
|
||||||
|
|
||||||
ColumnsDefinition columns_definition;
|
|
||||||
SummingMergedData merged_data;
|
|
||||||
|
|
||||||
SortDescription description;
|
|
||||||
|
|
||||||
/// Chunks currently being merged.
|
|
||||||
std::vector<Chunk> source_chunks;
|
|
||||||
SortCursorImpls cursors;
|
|
||||||
|
|
||||||
/// In merging algorithm, we need to compare current sort key with the last one.
|
|
||||||
/// So, sorting columns for last row needed to be stored.
|
|
||||||
/// In order to do it, we extend lifetime of last chunk and it's sort columns (from corresponding sort cursor).
|
|
||||||
Chunk last_chunk;
|
|
||||||
ColumnRawPtrs last_chunk_sort_columns; /// Point to last_chunk if valid.
|
|
||||||
|
|
||||||
struct RowRef
|
struct RowRef
|
||||||
{
|
{
|
||||||
ColumnRawPtrs * sort_columns = nullptr; /// Point to sort_columns from SortCursor or last_chunk_sort_columns.
|
ColumnRawPtrs * sort_columns = nullptr; /// Point to sort_columns from SortCursor or last_chunk_sort_columns.
|
||||||
@ -158,15 +121,73 @@ private:
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
}
|
||||||
|
|
||||||
RowRef last_key;
|
class SummingSortedTransform : public IMergingTransform
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
|
||||||
|
SummingSortedTransform(
|
||||||
|
size_t num_inputs, const Block & header,
|
||||||
|
SortDescription description_,
|
||||||
|
/// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
|
||||||
|
const Names & column_names_to_sum,
|
||||||
|
size_t max_block_size);
|
||||||
|
|
||||||
|
/// Stores numbers of key-columns and value-columns.
|
||||||
|
struct MapDescription
|
||||||
|
{
|
||||||
|
std::vector<size_t> key_col_nums;
|
||||||
|
std::vector<size_t> val_col_nums;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct ColumnsDefinition
|
||||||
|
{
|
||||||
|
/// Columns which are not aggregated; their values are taken from the current row as is.
|
||||||
|
ColumnNumbers column_numbers_not_to_aggregate;
|
||||||
|
/// Columns which should be aggregated.
|
||||||
|
std::vector<detail::AggregateDescription> columns_to_aggregate;
|
||||||
|
/// Mapping for nested columns.
|
||||||
|
std::vector<MapDescription> maps_to_sum;
|
||||||
|
|
||||||
|
size_t getNumColumns() const { return column_numbers_not_to_aggregate.size() + columns_to_aggregate.size(); }
|
||||||
|
};
|
||||||
|
|
||||||
|
String getName() const override { return "SummingSortedTransform"; }
|
||||||
|
void work() override;
|
||||||
|
|
||||||
|
protected:
|
||||||
|
void initializeInputs() override;
|
||||||
|
void consume(Chunk chunk, size_t input_number) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
Row current_row;
|
||||||
|
bool current_row_is_zero = true; /// Are all summed columns zero (or empty)? It is updated incrementally.
|
||||||
|
|
||||||
|
ColumnsDefinition columns_definition;
|
||||||
|
detail::SummingMergedData merged_data;
|
||||||
|
|
||||||
|
SortDescription description;
|
||||||
|
|
||||||
|
/// Chunks currently being merged.
|
||||||
|
std::vector<Chunk> source_chunks;
|
||||||
|
SortCursorImpls cursors;
|
||||||
|
|
||||||
|
/// In merging algorithm, we need to compare current sort key with the last one.
|
||||||
|
/// So, sorting columns for last row needed to be stored.
|
||||||
|
/// In order to do it, we extend lifetime of last chunk and its sort columns (from corresponding sort cursor).
|
||||||
|
Chunk last_chunk;
|
||||||
|
ColumnRawPtrs last_chunk_sort_columns; /// Point to last_chunk if valid.
|
||||||
|
|
||||||
|
detail::RowRef last_key;
|
||||||
|
|
||||||
SortingHeap<SortCursor> queue;
|
SortingHeap<SortCursor> queue;
|
||||||
bool is_queue_initialized = false;
|
bool is_queue_initialized = false;
|
||||||
|
|
||||||
void insertRow();
|
|
||||||
void merge();
|
void merge();
|
||||||
void updateCursor(Chunk chunk, size_t source_num);
|
void updateCursor(Chunk chunk, size_t source_num);
|
||||||
|
void addRow(SortCursor & cursor);
|
||||||
|
void insertCurrentRowIfNeeded();
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,8 @@ VersionedCollapsingTransform::VersionedCollapsingTransform(
|
|||||||
, merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
|
, merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
|
||||||
, description(std::move(description_))
|
, description(std::move(description_))
|
||||||
, out_row_sources_buf(out_row_sources_buf_)
|
, out_row_sources_buf(out_row_sources_buf_)
|
||||||
|
, source_chunks(num_inputs)
|
||||||
|
, cursors(num_inputs)
|
||||||
, max_rows_in_queue(MAX_ROWS_IN_MULTIVERSION_QUEUE - 1) /// -1 for +1 in FixedSizeDequeWithGaps's internal buffer
|
, max_rows_in_queue(MAX_ROWS_IN_MULTIVERSION_QUEUE - 1) /// -1 for +1 in FixedSizeDequeWithGaps's internal buffer
|
||||||
, current_keys(max_rows_in_queue)
|
, current_keys(max_rows_in_queue)
|
||||||
, chunk_allocator(num_inputs + max_rows_in_queue + 1) /// +1 just in case (for current_row)
|
, chunk_allocator(num_inputs + max_rows_in_queue + 1) /// +1 just in case (for current_row)
|
||||||
|
Loading…
Reference in New Issue
Block a user