This commit is contained in:
Evgeniy Gatov 2014-05-26 08:48:33 +04:00
commit d6ad0254da
18 changed files with 314 additions and 90 deletions

View File

@ -59,7 +59,7 @@ public:
nested_func->create(place);
}
void destroy(AggregateDataPtr place) const
void destroy(AggregateDataPtr place) const noexcept
{
nested_func->destroy(place);
}

View File

@ -57,7 +57,7 @@ public:
nested_func->create(place);
}
void destroy(AggregateDataPtr place) const
void destroy(AggregateDataPtr place) const noexcept
{
nested_func->destroy(place);
}

View File

@ -59,7 +59,7 @@ public:
nested_func->create(place);
}
void destroy(AggregateDataPtr place) const
void destroy(AggregateDataPtr place) const noexcept
{
nested_func->destroy(place);
}

View File

@ -55,7 +55,7 @@ public:
nested_func->create(place);
}
void destroy(AggregateDataPtr place) const
void destroy(AggregateDataPtr place) const noexcept
{
nested_func->destroy(place);
}

View File

@ -59,7 +59,7 @@ public:
virtual void create(AggregateDataPtr place) const = 0;
/// Уничтожить данные для агрегации.
virtual void destroy(AggregateDataPtr place) const = 0;
virtual void destroy(AggregateDataPtr place) const noexcept = 0;
/// Уничтожать данные не обязательно.
virtual bool hasTrivialDestructor() const = 0;
@ -118,7 +118,7 @@ public:
new (place) Data;
}
void destroy(AggregateDataPtr place) const
void destroy(AggregateDataPtr place) const noexcept
{
data(place).~Data();
}

View File

@ -280,7 +280,7 @@ public:
void getPermutation(bool reverse, size_t limit, Permutation & res) const
{
size_t s = size();
if (limit > s)
if (limit >= s)
limit = 0;
res.resize(s);

View File

@ -122,7 +122,8 @@ public:
bool operator()(size_t lhs, size_t rhs) const
{
/// TODO: memcmp тормозит.
return positive == (0 > memcmp(&parent.chars[lhs * parent.n], &parent.chars[rhs * parent.n], parent.n));
int res = memcmp(&parent.chars[lhs * parent.n], &parent.chars[rhs * parent.n], parent.n);
return positive ? (res < 0) : (res > 0);
}
};
@ -133,7 +134,7 @@ public:
for (size_t i = 0; i < s; ++i)
res[i] = i;
if (limit > s)
if (limit >= s)
limit = 0;
if (limit)

View File

@ -266,9 +266,11 @@ public:
less(const ColumnString & parent_) : parent(parent_) {}
bool operator()(size_t lhs, size_t rhs) const
{
return positive == (0 > strcmp(
int res = strcmp(
reinterpret_cast<const char *>(&parent.chars[parent.offsetAt(lhs)]),
reinterpret_cast<const char *>(&parent.chars[parent.offsetAt(rhs)])));
reinterpret_cast<const char *>(&parent.chars[parent.offsetAt(rhs)]));
return positive ? (res < 0) : (res > 0);
}
};
@ -279,7 +281,7 @@ public:
for (size_t i = 0; i < s; ++i)
res[i] = i;
if (limit > s)
if (limit >= s)
limit = 0;
if (limit)
@ -308,9 +310,11 @@ public:
bool operator()(size_t lhs, size_t rhs) const
{
return positive == (0 > collator.compare(
int res = collator.compare(
reinterpret_cast<const char *>(&parent.chars[parent.offsetAt(lhs)]), parent.sizeAt(lhs),
reinterpret_cast<const char *>(&parent.chars[parent.offsetAt(rhs)]), parent.sizeAt(rhs)));
reinterpret_cast<const char *>(&parent.chars[parent.offsetAt(rhs)]), parent.sizeAt(rhs));
return positive ? (res < 0) : (res > 0);
}
};
@ -322,7 +326,7 @@ public:
for (size_t i = 0; i < s; ++i)
res[i] = i;
if (limit > s)
if (limit >= s)
limit = 0;
if (limit)

View File

@ -182,7 +182,7 @@ public:
for (size_t i = 0; i < rows; ++i)
res[i] = i;
if (limit > rows)
if (limit >= rows)
limit = 0;
if (limit)

View File

@ -149,7 +149,7 @@ public:
for (size_t i = 0; i < s; ++i)
res[i] = i;
if (limit > s)
if (limit >= s)
limit = 0;
if (limit)

View File

@ -470,7 +470,7 @@ protected:
Names key_names;
AggregateDescriptions aggregates;
std::vector<IAggregateFunction *> aggregate_functions;
std::vector<bool> is_final;
std::vector<char> is_final;
size_t keys_size;
size_t aggregates_size;
/// Нужно ли класть в AggregatedDataVariants::without_key агрегаты для ключей, не попавших в max_rows_to_group_by.

View File

@ -265,14 +265,15 @@ void Aggregator::mergeDataImpl(
if (!inserted)
{
for (size_t i = 0; i < aggregates_size; ++i)
{
aggregate_functions[i]->merge(
Method::getAggregateData(res_it->second) + offsets_of_aggregate_states[i],
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i]->destroy(
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
}
Method::getAggregateData(it->second) = nullptr;
}
else
{
@ -331,18 +332,16 @@ void Aggregator::destroyImpl(
for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it)
{
for (size_t i = 0; i < aggregates_size; ++i)
/// Если аггрегатная функция не может быть финализирована, то за ее удаление отвечает ColumnAggregateFunction
if (is_final[i])
{
char * data = Method::getAggregateData(it->second);
{
char * data = Method::getAggregateData(it->second);
/** Если исключение (обычно нехватка памяти, кидается MemoryTracker-ом) возникло
* после вставки ключа в хэш-таблицу, но до создания всех состояний агрегатных функций,
* то data будет равен nullptr-у.
*/
if (nullptr != data)
aggregate_functions[i]->destroy(data + offsets_of_aggregate_states[i]);
}
/** Если исключение (обычно нехватка памяти, кидается MemoryTracker-ом) возникло
* после вставки ключа в хэш-таблицу, но до создания всех состояний агрегатных функций,
* то data будет равен nullptr-у.
*/
if (nullptr != data)
aggregate_functions[i]->destroy(data + offsets_of_aggregate_states[i]);
}
}
}
@ -528,65 +527,83 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants, bool fi
key_columns[i]->reserve(rows);
}
for (size_t i = 0; i < aggregates_size; ++i)
try
{
is_final[i] = final && aggregate_functions[i]->canBeFinal();
if (!is_final[i])
{
/// Столбец ColumnAggregateFunction захватывает разделяемое владение ареной с состояниями агрегатных функций.
ColumnAggregateFunction & column_aggregate_func = static_cast<ColumnAggregateFunction &>(*res.getByPosition(i + keys_size).column);
for (size_t j = 0; j < data_variants.aggregates_pools.size(); ++j)
column_aggregate_func.addArena(data_variants.aggregates_pools[j]);
aggregate_columns[i] = &column_aggregate_func.getData();
aggregate_columns[i]->resize(rows);
}
else
{
ColumnWithNameAndType & column = res.getByPosition(i + keys_size);
column.type = aggregate_functions[i]->getReturnType();
column.column = column.type->createColumn();
column.column->reserve(rows);
final_aggregate_columns[i] = column.column;
}
}
if (data_variants.type == AggregatedDataVariants::WITHOUT_KEY || overflow_row)
{
AggregatedDataWithoutKey & data = data_variants.without_key;
for (size_t i = 0; i < aggregates_size; ++i)
{
is_final[i] = final && aggregate_functions[i]->canBeFinal();
if (!is_final[i])
(*aggregate_columns[i])[0] = data + offsets_of_aggregate_states[i];
{
/// Столбец ColumnAggregateFunction захватывает разделяемое владение ареной с состояниями агрегатных функций.
ColumnAggregateFunction & column_aggregate_func = static_cast<ColumnAggregateFunction &>(*res.getByPosition(i + keys_size).column);
for (size_t j = 0; j < data_variants.aggregates_pools.size(); ++j)
column_aggregate_func.addArena(data_variants.aggregates_pools[j]);
aggregate_columns[i] = &column_aggregate_func.getData();
aggregate_columns[i]->resize(rows);
}
else
aggregate_functions[i]->insertResultInto(data + offsets_of_aggregate_states[i], *final_aggregate_columns[i]);
{
ColumnWithNameAndType & column = res.getByPosition(i + keys_size);
column.type = aggregate_functions[i]->getReturnType();
column.column = column.type->createColumn();
column.column->reserve(rows);
if (overflow_row)
for (size_t i = 0; i < keys_size; ++i)
key_columns[i]->insertDefault();
final_aggregate_columns[i] = column.column;
}
}
if (data_variants.type == AggregatedDataVariants::WITHOUT_KEY || overflow_row)
{
AggregatedDataWithoutKey & data = data_variants.without_key;
for (size_t i = 0; i < aggregates_size; ++i)
if (!is_final[i])
(*aggregate_columns[i])[0] = data + offsets_of_aggregate_states[i];
else
aggregate_functions[i]->insertResultInto(data + offsets_of_aggregate_states[i], *final_aggregate_columns[i]);
if (overflow_row)
for (size_t i = 0; i < keys_size; ++i)
key_columns[i]->insertDefault();
}
size_t start_row = overflow_row ? 1 : 0;
if (data_variants.type == AggregatedDataVariants::KEY_64)
convertToBlockImpl(*data_variants.key64, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row);
else if (data_variants.type == AggregatedDataVariants::KEY_STRING)
convertToBlockImpl(*data_variants.key_string, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row);
else if (data_variants.type == AggregatedDataVariants::KEY_FIXED_STRING)
convertToBlockImpl(*data_variants.key_fixed_string, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row);
else if (data_variants.type == AggregatedDataVariants::KEYS_128)
convertToBlockImpl(*data_variants.keys128, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row);
else if (data_variants.type == AggregatedDataVariants::HASHED)
convertToBlockImpl(*data_variants.hashed, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row);
else if (data_variants.type != AggregatedDataVariants::WITHOUT_KEY)
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
}
catch (...)
{
/** Работа с состояниями агрегатных функций недостаточно exception-safe.
* Если часть столбцов aggregate_columns была resize-на, но значения не были вставлены,
* то эти столбцы будут в некорректном состоянии
* (ColumnAggregateFunction попытаются в деструкторе вызвать деструкторы у элементов, которых нет),
* а также деструкторы будут вызываться у AggregatedDataVariants.
* Поэтому, вручную "откатываем" их.
*/
for (size_t i = 0; i < aggregates_size; ++i)
if (aggregate_columns[i])
aggregate_columns[i]->clear();
size_t start_row = overflow_row ? 1 : 0;
if (data_variants.type == AggregatedDataVariants::KEY_64)
convertToBlockImpl(*data_variants.key64, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row);
else if (data_variants.type == AggregatedDataVariants::KEY_STRING)
convertToBlockImpl(*data_variants.key_string, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row);
else if (data_variants.type == AggregatedDataVariants::KEY_FIXED_STRING)
convertToBlockImpl(*data_variants.key_fixed_string, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row);
else if (data_variants.type == AggregatedDataVariants::KEYS_128)
convertToBlockImpl(*data_variants.keys128, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row);
else if (data_variants.type == AggregatedDataVariants::HASHED)
convertToBlockImpl(*data_variants.hashed, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row);
else if (data_variants.type != AggregatedDataVariants::WITHOUT_KEY)
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
throw;
}
if (!final)
{
@ -650,10 +667,12 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
AggregatedDataWithoutKey & current_data = current.without_key;
for (size_t i = 0; i < aggregates_size; ++i)
{
aggregate_functions[i]->merge(res_data + offsets_of_aggregate_states[i], current_data + offsets_of_aggregate_states[i]);
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i]->destroy(current_data + offsets_of_aggregate_states[i]);
}
current_data = nullptr;
}
if (res->type == AggregatedDataVariants::KEY_64)
@ -775,9 +794,7 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
AggregatedDataWithoutKey & res_data = result.without_key;
for (size_t i = 0; i < aggregates_size; ++i)
/// Если аггрегатная функция не может быть финализирована, то за ее удаление отвечает ColumnAggregateFunction
if (is_final[i])
aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]);
aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]);
}
if (result.type == AggregatedDataVariants::KEY_64)

View File

@ -97,7 +97,7 @@ void sortBlock(Block & block, const SortDescription & description, size_t limit)
for (size_t i = 0; i < size; ++i)
perm[i] = i;
if (limit > size)
if (limit >= size)
limit = 0;
bool need_collation = false;

View File

@ -193,7 +193,7 @@ StoragePtr StorageFactory::get(
* - (для Replicated) Путь к таблице в ZooKeeper
* - (для Replicated) Имя реплики в ZooKeeper
* - имя столбца с датой;
* - (не обязательно) имя столбца для семплирования (запрос с SAMPLE x будет выбирать строки, у которых в этом столбце значение меньше, чем x*UINT32_MAX);
* - (не обязательно) выражение для семплирования (запрос с SAMPLE x будет выбирать строки, у которых в этом столбце значение меньше, чем x*UINT32_MAX);
* - выражение для сортировки (либо скалярное выражение, либо tuple из нескольких);
* - index_granularity;
* - (для Collapsing) имя столбца, содержащего тип строчки с изменением "визита" (принимающего значения 1 и -1).

View File

@ -0,0 +1,100 @@
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc

View File

@ -0,0 +1 @@
SELECT s FROM (SELECT materialize('abc') AS s FROM system.numbers LIMIT 100) ORDER BY s DESC

View File

@ -0,0 +1,100 @@
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc
abc

View File

@ -0,0 +1 @@
SELECT s FROM (SELECT toFixedString(materialize('abc'), 3) AS s FROM system.numbers LIMIT 100) ORDER BY s DESC