Fix AggregatingSorted for simple aggregate functions.

This commit is contained in:
Nikolai Kochetov 2020-04-06 18:03:38 +03:00
parent fe442d8c9a
commit 71fab516f2
7 changed files with 67 additions and 16 deletions

View File

@ -68,6 +68,24 @@ namespace
return def;
}
/// Builds the set of empty mutable columns that merged data is accumulated into.
/// The result has one entry per column of `header`:
///  * positions listed in `def.columns_to_simple_aggregate` get a column created from
///    the header type passed through recursiveRemoveLowCardinality();
///  * every other position gets a column created from the header type as is.
MutableColumns getMergedColumns(const Block & header, const AggregatingSortedTransform::ColumnsDefinition & def)
{
    MutableColumns merged;
    merged.resize(header.columns());

    /// First pass: fill only the positions that belong to simple aggregate functions.
    for (const auto & simple_desc : def.columns_to_simple_aggregate)
    {
        const size_t position = simple_desc.column_number;
        const auto & header_type = header.getByPosition(position).type;
        merged[position] = recursiveRemoveLowCardinality(header_type)->createColumn();
    }

    /// Second pass: whatever is still unset keeps the type declared in the header.
    for (size_t position = 0; position < merged.size(); ++position)
    {
        if (merged[position])
            continue;

        merged[position] = header.getByPosition(position).type->createColumn();
    }

    return merged;
}
}
AggregatingSortedTransform::AggregatingSortedTransform(
@ -75,7 +93,7 @@ AggregatingSortedTransform::AggregatingSortedTransform(
SortDescription description_, size_t max_block_size)
: IMergingTransform(num_inputs, header, header, true)
, columns_definition(defineColumns(header, description_))
, merged_data(header.cloneEmptyColumns(), false, max_block_size)
, merged_data(getMergedColumns(header, columns_definition), false, max_block_size)
, description(std::move(description_))
, source_chunks(num_inputs)
, cursors(num_inputs)
@ -106,7 +124,7 @@ void AggregatingSortedTransform::updateCursor(Chunk chunk, size_t source_num)
column = column->convertToFullColumnIfConst();
for (auto & desc : columns_definition.columns_to_simple_aggregate)
if (desc.type_to_convert)
if (desc.inner_type)
columns[desc.column_number] = recursiveRemoveLowCardinality(columns[desc.column_number]);
chunk.setColumns(std::move(columns), num_rows);
@ -145,10 +163,10 @@ void AggregatingSortedTransform::work()
for (auto & desc : columns_definition.columns_to_simple_aggregate)
{
if (desc.type_to_convert)
if (desc.inner_type)
{
auto & from_type = header.getByPosition(desc.column_number).type;
auto & to_type = desc.type_to_convert;
auto & from_type = desc.inner_type;
auto & to_type = header.getByPosition(desc.column_number).type;
columns[desc.column_number] = recursiveTypeConversion(columns[desc.column_number], from_type, to_type);
}
}

View File

@ -129,13 +129,13 @@ public:
size_t column_number = 0;
IColumn * column = nullptr;
const DataTypePtr type_to_convert;
const DataTypePtr inner_type;
AlignedBuffer state;
bool created = false;
SimpleAggregateDescription(AggregateFunctionPtr function_, const size_t column_number_, DataTypePtr type)
: function(std::move(function_)), column_number(column_number_), type_to_convert(std::move(type))
: function(std::move(function_)), column_number(column_number_), inner_type(std::move(type))
{
add_function = function->getAddressOfAddFunction();
state.reset(function->sizeOfData(), function->alignOfData());

View File

@ -64,14 +64,15 @@ private:
SortingHeap<SortCursor> queue;
bool is_queue_initialized = false;
/// Allocator must be destroyed after all RowRefs.
detail::SharedChunkAllocator chunk_allocator;
using RowRef = detail::RowRefWithOwnedChunk;
static constexpr size_t max_row_refs = 4; /// first_negative, last_positive, last, current.
RowRef first_negative_row;
RowRef last_positive_row;
RowRef last_row;
detail::SharedChunkAllocator chunk_allocator;
size_t count_positive = 0; /// The number of positive rows for the current primary key.
size_t count_negative = 0; /// The number of negative rows for the current primary key.
bool last_is_positive = false; /// true if the last row for the current primary key is positive.

View File

@ -237,12 +237,13 @@ private:
/// Path name of current bucket
StringRef current_group_path;
/// Allocator must be destroyed after all RowRefs.
detail::SharedChunkAllocator chunk_allocator;
static constexpr size_t max_row_refs = 2; /// current_subgroup_newest_row, current_row.
/// Last row with maximum version for current primary key (time bucket).
RowRef current_subgroup_newest_row;
detail::SharedChunkAllocator chunk_allocator;
/// Time of last read row
time_t current_time = 0;
time_t current_time_rounded = 0;

View File

@ -50,6 +50,9 @@ private:
SortingHeap<SortCursor> queue;
bool is_queue_initialized = false;
/// Allocator must be destroyed after all RowRefs.
detail::SharedChunkAllocator chunk_allocator;
using RowRef = detail::RowRefWithOwnedChunk;
static constexpr size_t max_row_refs = 3; /// last, current, selected.
RowRef last_row;
@ -57,8 +60,6 @@ private:
RowRef selected_row; /// Last row with maximum version for current primary key.
size_t max_pos = 0; /// The position (into current_row_sources) of the row with the highest version.
detail::SharedChunkAllocator chunk_allocator;
/// Sources of rows with the current primary key.
PODArray<RowSourcePart> current_row_sources;

View File

@ -3,6 +3,8 @@
#include <Processors/Chunk.h>
#include <Columns/IColumn.h>
#include <Core/SortCursor.h>
#include <Common/StackTrace.h>
#include <common/logger_useful.h>
#include <boost/smart_ptr/intrusive_ptr.hpp>
@ -51,6 +53,9 @@ class SharedChunkAllocator
public:
explicit SharedChunkAllocator(size_t max_chunks)
{
if (max_chunks == 0)
max_chunks = 1;
chunks.resize(max_chunks);
free_chunks.reserve(max_chunks);
@ -74,12 +79,36 @@ public:
return SharedChunkPtr(&chunks[pos]);
}
/// Diagnostic-only destructor: compares the number of free slots with the total number
/// of chunks and logs an error with a stack trace when they differ.
/// NOTE(review): presumably a mismatch means some chunk is still referenced by a RowRef
/// (that is what the log message states) - confirm against the allocation code.
~SharedChunkAllocator()
{
    const bool sizes_match = (free_chunks.size() == chunks.size());
    if (sizes_match)
        return;

    /// Only log here; nothing is thrown from the destructor.
    LOG_ERROR(&Logger::get("SharedChunkAllocator"),
        "SharedChunkAllocator was destroyed before RowRef was released. StackTrace: "
        << StackTrace().toString());
}
private:
std::vector<SharedChunk> chunks;
std::vector<size_t> free_chunks;
void release(SharedChunk * ptr)
void release(SharedChunk * ptr) noexcept
{
if (chunks.empty())
{
/// This may happen if the allocator was destroyed before its chunks were released.
/// Log a message and return, because we don't want to throw an exception from a destructor.
LOG_ERROR(&Logger::get("SharedChunkAllocator"),
"SharedChunkAllocator was destroyed before RowRef was released. StackTrace: "
<< StackTrace().toString());
return;
}
/// Release memory. It is not obligatory.
ptr->clear();
ptr->all_columns.clear();

View File

@ -53,14 +53,15 @@ private:
SortingHeap<SortCursor> queue;
bool is_queue_initialized = false;
/// Allocator must be destroyed after all RowRefs.
detail::SharedChunkAllocator chunk_allocator;
using RowRef = detail::RowRefWithOwnedChunk;
const size_t max_rows_in_queue;
/// Rows with the same primary key and sign.
FixedSizeDequeWithGaps<RowRef> current_keys;
Int8 sign_in_queue = 0;
detail::SharedChunkAllocator chunk_allocator;
std::queue<RowSourcePart> current_row_sources; /// Sources of rows with the current primary key
void insertGap(size_t gap_size);