From 16f092d87ba9aced84d905bfda8344f6bbdbde39 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 5 Jun 2014 23:52:13 +0400 Subject: [PATCH] dbms: development [#METR-10894]. --- .../DB/Columns/ColumnAggregateFunction.h | 6 +++-- .../DB/Storages/MergeTree/MergeTreeData.h | 1 + dbms/src/Core/Block.cpp | 6 ++--- dbms/src/Interpreters/Aggregator.cpp | 27 ++++++++++--------- 4 files changed, 23 insertions(+), 17 deletions(-) diff --git a/dbms/include/DB/Columns/ColumnAggregateFunction.h b/dbms/include/DB/Columns/ColumnAggregateFunction.h index 835a3e93e9c..fec1cce8be8 100644 --- a/dbms/include/DB/Columns/ColumnAggregateFunction.h +++ b/dbms/include/DB/Columns/ColumnAggregateFunction.h @@ -66,9 +66,11 @@ public: ~ColumnAggregateFunction() { - if (!func->hasTrivialDestructor()) + IAggregateFunction * function = func; + + if (!function->hasTrivialDestructor()) for (size_t i = 0, s = data.size(); i < s; ++i) - func->destroy(data[i]); + function->destroy(data[i]); } std::string getName() const { return "ColumnAggregateFunction"; } diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h index 21992df988d..bb5e08c6e06 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h @@ -50,6 +50,7 @@ namespace DB * пары записей с разными значениями sign_column для одного значения первичного ключа. * (см. CollapsingSortedBlockInputStream.h) * - Summing - при склейке кусков, при совпадении PK суммировать все числовые столбцы, не входящие в PK. + * - Aggregating - при склейке кусков, при совпадении PK, делается слияние состояний столбцов-агрегатных функций. */ /** Этот класс хранит список кусков и параметры структуры данных. diff --git a/dbms/src/Core/Block.cpp b/dbms/src/Core/Block.cpp index 446128783e6..25cb17cd1b1 100644 --- a/dbms/src/Core/Block.cpp +++ b/dbms/src/Core/Block.cpp @@ -23,14 +23,14 @@ void Block::addDefaults(NamesAndTypesListPtr required_columns) { for (NamesAndTypesList::const_iterator it = required_columns->begin(); it != required_columns->end(); ++it) { - if (!this->has(it->first)) + if (!has(it->first)) { ColumnWithNameAndType col; col.name = it->first; col.type = it->second; col.column = dynamic_cast(*it->second->createConstColumn( - this->rows(), it->second->getDefault())).convertToFullColumn(); - this->insert(col); + rows(), it->second->getDefault())).convertToFullColumn(); + insert(col); } } } diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 5fe86634c2b..36533bee9b4 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -341,17 +341,18 @@ void Aggregator::destroyImpl( { for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it) { - for (size_t i = 0; i < aggregates_size; ++i) - { - char * data = Method::getAggregateData(it->second); + char * data = Method::getAggregateData(it->second); - /** Если исключение (обычно нехватка памяти, кидается MemoryTracker-ом) возникло - * после вставки ключа в хэш-таблицу, но до создания всех состояний агрегатных функций, - * то data будет равен nullptr-у. - */ - if (nullptr != data && !aggregate_functions[i]->isState()) + /** Если исключение (обычно нехватка памяти, кидается MemoryTracker-ом) возникло + * после вставки ключа в хэш-таблицу, но до создания всех состояний агрегатных функций, + * то data будет равен nullptr-у. + */ + if (nullptr == data) + continue; + + for (size_t i = 0; i < aggregates_size; ++i) + if (!aggregate_functions[i]->isState()) aggregate_functions[i]->destroy(data + offsets_of_aggregate_states[i]); - } } } @@ -563,6 +564,7 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants, bool fi { /// Столбец ColumnAggregateFunction захватывает разделяемое владение ареной с состояниями агрегатных функций. ColumnAggregateFunction & column_aggregate_func = static_cast(*column.column); + for (size_t j = 0; j < data_variants.aggregates_pools.size(); ++j) column_aggregate_func.addArena(data_variants.aggregates_pools[j]); } @@ -810,9 +812,10 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result) { AggregatedDataWithoutKey & res_data = result.without_key; - for (size_t i = 0; i < aggregates_size; ++i) - if (!aggregate_functions[i]->isState()) - aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]); + if (nullptr != res_data) + for (size_t i = 0; i < aggregates_size; ++i) + if (!aggregate_functions[i]->isState()) + aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]); } if (result.type == AggregatedDataVariants::KEY_64)