mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-03 13:02:00 +00:00
dbms: improved performance of aggregation by one numeric key (up to: 2x for UInt8, 5x for UInt16, 1.1x for UInt32, UInt64) [#METR-2944].
This commit is contained in:
parent
8f687c6042
commit
425263970f
@ -77,7 +77,7 @@ template <> struct CompareHelper<Float64> : public FloatCompareHelper<Float64> {
|
||||
/** Шаблон столбцов, которые используют для хранения простой массив.
|
||||
*/
|
||||
template <typename T>
|
||||
class ColumnVector : public IColumn
|
||||
class ColumnVector final : public IColumn
|
||||
{
|
||||
private:
|
||||
typedef ColumnVector<T> Self;
|
||||
|
@ -69,3 +69,4 @@
|
||||
#define DBMS_DISTRIBUTED_DIRECTORY_MONITOR_SLEEP_TIME_MS 100
|
||||
|
||||
#define ALWAYS_INLINE __attribute__((__always_inline__))
|
||||
#define NO_INLINE __attribute__((__noinline__))
|
||||
|
@ -22,6 +22,7 @@
|
||||
#include <DB/Columns/ColumnString.h>
|
||||
#include <DB/Columns/ColumnFixedString.h>
|
||||
#include <DB/Columns/ColumnAggregateFunction.h>
|
||||
#include <DB/Columns/ColumnVector.h>
|
||||
|
||||
|
||||
|
||||
@ -53,25 +54,59 @@ typedef HashMap<UInt128, AggregateDataPtr, UInt128Hash> AggregatedDataWithKeys12
|
||||
typedef HashMap<UInt128, std::pair<StringRef*, AggregateDataPtr>, UInt128TrivialHash> AggregatedDataHashed;
|
||||
|
||||
|
||||
/// Для случая, когда есть один числовой ключ.
|
||||
struct AggregationMethodKey64
|
||||
/// Специализации для UInt8, UInt16.
|
||||
struct TrivialHash
|
||||
{
|
||||
typedef AggregatedDataWithUInt64Key Data;
|
||||
typedef Data::key_type Key;
|
||||
typedef Data::mapped_type Mapped;
|
||||
typedef Data::iterator iterator;
|
||||
typedef Data::const_iterator const_iterator;
|
||||
template <typename T>
|
||||
size_t operator() (T key) const
|
||||
{
|
||||
return key;
|
||||
}
|
||||
};
|
||||
|
||||
/// Превращает хэш-таблицу в что-то типа lookup-таблицы. Остаётся неоптимальность - в ячейках хранятся ключи.
|
||||
template <size_t key_bits>
|
||||
struct HashTableFixedGrower
|
||||
{
|
||||
size_t bufSize() const { return 1 << key_bits; }
|
||||
size_t mask() const { return bufSize() - 1; }
|
||||
size_t place(size_t x) const { return x; }
|
||||
size_t next(size_t pos) const { __builtin_unreachable(); return pos; }
|
||||
bool overflow(size_t elems) const { return false; }
|
||||
|
||||
void increaseSize() { __builtin_unreachable(); }
|
||||
void set(size_t num_elems) {}
|
||||
void setBufSize(size_t buf_size_) {}
|
||||
};
|
||||
|
||||
typedef HashMap<UInt64, AggregateDataPtr, TrivialHash, HashTableFixedGrower<8>> AggregatedDataWithUInt8Key;
|
||||
typedef HashMap<UInt64, AggregateDataPtr, TrivialHash, HashTableFixedGrower<16>> AggregatedDataWithUInt16Key;
|
||||
|
||||
template <typename FieldType> struct AggregatedDataWithUIntKey { using Type = AggregatedDataWithUInt64Key; };
|
||||
template <> struct AggregatedDataWithUIntKey<UInt8> { using Type = AggregatedDataWithUInt8Key; };
|
||||
template <> struct AggregatedDataWithUIntKey<UInt16> { using Type = AggregatedDataWithUInt16Key; };
|
||||
|
||||
|
||||
/// Для случая, когда есть один числовой ключ.
|
||||
template <typename FieldType> /// UInt8/16/32/64 для любых типов соответствующей битности.
|
||||
struct AggregationMethodOneNumber
|
||||
{
|
||||
typedef typename AggregatedDataWithUIntKey<FieldType>::Type Data;
|
||||
typedef typename Data::key_type Key;
|
||||
typedef typename Data::mapped_type Mapped;
|
||||
typedef typename Data::iterator iterator;
|
||||
typedef typename Data::const_iterator const_iterator;
|
||||
|
||||
Data data;
|
||||
|
||||
const IColumn * column;
|
||||
const ColumnVector<FieldType> * column;
|
||||
|
||||
/** Вызывается в начале обработки каждого блока.
|
||||
* Устанавливает переменные, необходимые для остальных методов, вызываемых во внутренних циклах.
|
||||
*/
|
||||
void init(ConstColumnPlainPtrs & key_columns)
|
||||
{
|
||||
column = key_columns[0];
|
||||
column = static_cast<const ColumnVector<FieldType> *>(key_columns[0]);
|
||||
}
|
||||
|
||||
/// Достать из ключевых столбцов ключ для вставки в хэш-таблицу.
|
||||
@ -99,7 +134,7 @@ struct AggregationMethodKey64
|
||||
*/
|
||||
static void insertKeyIntoColumns(const_iterator & it, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes)
|
||||
{
|
||||
key_columns[0]->insertData(reinterpret_cast<const char *>(&it->first), sizeof(it->first));
|
||||
static_cast<ColumnVector<FieldType> *>(key_columns[0])->insertData(reinterpret_cast<const char *>(&it->first), sizeof(it->first));
|
||||
}
|
||||
};
|
||||
|
||||
@ -317,7 +352,10 @@ struct AggregatedDataVariants : private boost::noncopyable
|
||||
*/
|
||||
AggregatedDataWithoutKey without_key = nullptr;
|
||||
|
||||
std::unique_ptr<AggregationMethodKey64> key64;
|
||||
std::unique_ptr<AggregationMethodOneNumber<UInt8>> key8;
|
||||
std::unique_ptr<AggregationMethodOneNumber<UInt16>> key16;
|
||||
std::unique_ptr<AggregationMethodOneNumber<UInt32>> key32;
|
||||
std::unique_ptr<AggregationMethodOneNumber<UInt64>> key64;
|
||||
std::unique_ptr<AggregationMethodString> key_string;
|
||||
std::unique_ptr<AggregationMethodFixedString> key_fixed_string;
|
||||
std::unique_ptr<AggregationMethodKeys128> keys128;
|
||||
@ -325,13 +363,16 @@ struct AggregatedDataVariants : private boost::noncopyable
|
||||
|
||||
enum Type
|
||||
{
|
||||
EMPTY = 0,
|
||||
WITHOUT_KEY = 1,
|
||||
KEY_64 = 2,
|
||||
KEY_STRING = 3,
|
||||
KEY_FIXED_STRING = 4,
|
||||
KEYS_128 = 5,
|
||||
HASHED = 6,
|
||||
EMPTY = 0,
|
||||
WITHOUT_KEY,
|
||||
KEY_8,
|
||||
KEY_16,
|
||||
KEY_32,
|
||||
KEY_64,
|
||||
KEY_STRING,
|
||||
KEY_FIXED_STRING,
|
||||
KEYS_128,
|
||||
HASHED,
|
||||
};
|
||||
Type type = EMPTY;
|
||||
|
||||
@ -348,11 +389,14 @@ struct AggregatedDataVariants : private boost::noncopyable
|
||||
{
|
||||
case EMPTY: break;
|
||||
case WITHOUT_KEY: break;
|
||||
case KEY_64: key64 .reset(new AggregationMethodKey64); break;
|
||||
case KEY_STRING: key_string .reset(new AggregationMethodString); break;
|
||||
case KEY_FIXED_STRING: key_fixed_string.reset(new AggregationMethodFixedString); break;
|
||||
case KEYS_128: keys128 .reset(new AggregationMethodKeys128); break;
|
||||
case HASHED: hashed .reset(new AggregationMethodHashed); break;
|
||||
case KEY_8: key8 .reset(new decltype(key8)::element_type); break;
|
||||
case KEY_16: key16 .reset(new decltype(key16)::element_type); break;
|
||||
case KEY_32: key32 .reset(new decltype(key32)::element_type); break;
|
||||
case KEY_64: key64 .reset(new decltype(key64)::element_type); break;
|
||||
case KEY_STRING: key_string .reset(new decltype(key_string)::element_type); break;
|
||||
case KEY_FIXED_STRING: key_fixed_string.reset(new decltype(key_fixed_string)::element_type); break;
|
||||
case KEYS_128: keys128 .reset(new decltype(keys128)::element_type); break;
|
||||
case HASHED: hashed .reset(new decltype(hashed)::element_type); break;
|
||||
|
||||
default:
|
||||
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
|
||||
@ -365,6 +409,9 @@ struct AggregatedDataVariants : private boost::noncopyable
|
||||
{
|
||||
case EMPTY: return 0;
|
||||
case WITHOUT_KEY: return 1;
|
||||
case KEY_8: return key8->data.size() + (without_key != nullptr);
|
||||
case KEY_16: return key16->data.size() + (without_key != nullptr);
|
||||
case KEY_32: return key32->data.size() + (without_key != nullptr);
|
||||
case KEY_64: return key64->data.size() + (without_key != nullptr);
|
||||
case KEY_STRING: return key_string->data.size() + (without_key != nullptr);
|
||||
case KEY_FIXED_STRING: return key_fixed_string->data.size() + (without_key != nullptr);
|
||||
@ -382,6 +429,9 @@ struct AggregatedDataVariants : private boost::noncopyable
|
||||
{
|
||||
case EMPTY: return "EMPTY";
|
||||
case WITHOUT_KEY: return "WITHOUT_KEY";
|
||||
case KEY_8: return "KEY_8";
|
||||
case KEY_16: return "KEY_16";
|
||||
case KEY_32: return "KEY_32";
|
||||
case KEY_64: return "KEY_64";
|
||||
case KEY_STRING: return "KEY_STRING";
|
||||
case KEY_FIXED_STRING: return "KEY_FIXED_STRING";
|
||||
|
@ -94,6 +94,9 @@ private:
|
||||
void calculateHashesThread(Block & block, size_t begin, size_t end, ExceptionPtr & exception, MemoryTracker * memory_tracker);
|
||||
void aggregateThread(Block & block, AggregatedDataVariants & result, size_t thread_no, ExceptionPtr & exception, MemoryTracker * memory_tracker);
|
||||
void convertToBlockThread(AggregatedDataVariants & data_variant, Block & block, bool final, ExceptionPtr & exception, MemoryTracker * memory_tracker);
|
||||
|
||||
template <typename FieldType>
|
||||
void aggregateOneNumber(AggregatedDataVariants & result, size_t thread_no, bool no_more_keys);
|
||||
};
|
||||
|
||||
|
||||
|
@ -7,7 +7,8 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class StorageView : public IStorage {
|
||||
class StorageView : public IStorage
|
||||
{
|
||||
|
||||
public:
|
||||
static StoragePtr create(const String & table_name_, const String & database_name_,
|
||||
@ -16,7 +17,7 @@ public:
|
||||
std::string getName() const override { return "View"; }
|
||||
std::string getTableName() const override { return table_name; }
|
||||
const NamesAndTypesList & getColumnsList() const override { return *columns; }
|
||||
DB::ASTPtr getInnerQuery() const { return inner_query.clone(); };
|
||||
ASTPtr getInnerQuery() const { return inner_query.clone(); };
|
||||
|
||||
/// Пробрасывается внутрь запроса и решается на его уровне.
|
||||
bool supportsSampling() const override { return true; }
|
||||
|
@ -125,7 +125,18 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ConstColu
|
||||
|
||||
/// Если есть один числовой ключ, который помещается в 64 бита
|
||||
if (keys_size == 1 && key_columns[0]->isNumeric())
|
||||
return AggregatedDataVariants::KEY_64;
|
||||
{
|
||||
size_t size_of_field = key_columns[0]->sizeOfField();
|
||||
if (size_of_field == 1)
|
||||
return AggregatedDataVariants::KEY_8;
|
||||
if (size_of_field == 2)
|
||||
return AggregatedDataVariants::KEY_16;
|
||||
if (size_of_field == 4)
|
||||
return AggregatedDataVariants::KEY_32;
|
||||
if (size_of_field == 8)
|
||||
return AggregatedDataVariants::KEY_64;
|
||||
throw Exception("Logical error: numeric column has sizeOfField not in 1, 2, 4, 8.", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
/// Если ключи помещаются в 128 бит, будем использовать хэш-таблицу по упакованным в 128-бит ключам
|
||||
if (keys_fit_128_bits)
|
||||
@ -167,8 +178,12 @@ void Aggregator::createAggregateStates(AggregateDataPtr & aggregate_data) const
|
||||
}
|
||||
|
||||
|
||||
/** Интересно - если убрать noinline, то gcc зачем-то инлайнит эту функцию, и производительность уменьшается (~10%).
|
||||
* (Возможно из-за того, что после инлайна этой функции, перестают инлайниться более внутренние функции.)
|
||||
* Инлайнить не имеет смысла, так как внутренний цикл находится целиком внутри этой функции.
|
||||
*/
|
||||
template <typename Method>
|
||||
void Aggregator::executeImpl(
|
||||
void NO_INLINE Aggregator::executeImpl(
|
||||
Method & method,
|
||||
Arena * aggregates_pool,
|
||||
size_t rows,
|
||||
@ -226,7 +241,7 @@ void Aggregator::executeImpl(
|
||||
|
||||
|
||||
template <typename Method>
|
||||
void Aggregator::convertToBlockImpl(
|
||||
void NO_INLINE Aggregator::convertToBlockImpl(
|
||||
Method & method,
|
||||
ColumnPlainPtrs & key_columns,
|
||||
AggregateColumnsData & aggregate_columns,
|
||||
@ -262,7 +277,7 @@ void Aggregator::convertToBlockImpl(
|
||||
|
||||
|
||||
template <typename Method>
|
||||
void Aggregator::mergeDataImpl(
|
||||
void NO_INLINE Aggregator::mergeDataImpl(
|
||||
Method & method_dst,
|
||||
Method & method_src) const
|
||||
{
|
||||
@ -294,7 +309,7 @@ void Aggregator::mergeDataImpl(
|
||||
|
||||
|
||||
template <typename Method>
|
||||
void Aggregator::mergeStreamsImpl(
|
||||
void NO_INLINE Aggregator::mergeStreamsImpl(
|
||||
Method & method,
|
||||
Arena * aggregates_pool,
|
||||
size_t start_row,
|
||||
@ -336,7 +351,7 @@ void Aggregator::mergeStreamsImpl(
|
||||
|
||||
|
||||
template <typename Method>
|
||||
void Aggregator::destroyImpl(
|
||||
void NO_INLINE Aggregator::destroyImpl(
|
||||
Method & method) const
|
||||
{
|
||||
for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it)
|
||||
@ -372,8 +387,14 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
|
||||
|
||||
/// Запоминаем столбцы, с которыми будем работать
|
||||
for (size_t i = 0; i < keys_size; ++i)
|
||||
{
|
||||
key_columns[i] = block.getByPosition(keys[i]).column;
|
||||
|
||||
if (key_columns[i]->isConst())
|
||||
throw Exception("Constants is not allowed as GROUP BY keys"
|
||||
" (but all of them must be eliminated in ExpressionAnalyzer)", ErrorCodes::ILLEGAL_COLUMN);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < aggregates_size; ++i)
|
||||
{
|
||||
for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
|
||||
@ -434,7 +455,16 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
|
||||
|
||||
AggregateDataPtr overflow_row_ptr = overflow_row ? result.without_key : nullptr;
|
||||
|
||||
if (result.type == AggregatedDataVariants::KEY_64)
|
||||
if (result.type == AggregatedDataVariants::KEY_8)
|
||||
executeImpl(*result.key8, result.aggregates_pool, rows, key_columns, aggregate_columns,
|
||||
result.key_sizes, key, no_more_keys, overflow_row_ptr);
|
||||
else if (result.type == AggregatedDataVariants::KEY_16)
|
||||
executeImpl(*result.key16, result.aggregates_pool, rows, key_columns, aggregate_columns,
|
||||
result.key_sizes, key, no_more_keys, overflow_row_ptr);
|
||||
else if (result.type == AggregatedDataVariants::KEY_32)
|
||||
executeImpl(*result.key32, result.aggregates_pool, rows, key_columns, aggregate_columns,
|
||||
result.key_sizes, key, no_more_keys, overflow_row_ptr);
|
||||
else if (result.type == AggregatedDataVariants::KEY_64)
|
||||
executeImpl(*result.key64, result.aggregates_pool, rows, key_columns, aggregate_columns,
|
||||
result.key_sizes, key, no_more_keys, overflow_row_ptr);
|
||||
else if (result.type == AggregatedDataVariants::KEY_STRING)
|
||||
@ -590,7 +620,16 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants, bool fi
|
||||
|
||||
size_t start_row = overflow_row ? 1 : 0;
|
||||
|
||||
if (data_variants.type == AggregatedDataVariants::KEY_64)
|
||||
if (data_variants.type == AggregatedDataVariants::KEY_8)
|
||||
convertToBlockImpl(*data_variants.key8, key_columns, aggregate_columns,
|
||||
final_aggregate_columns, data_variants.key_sizes, start_row, final);
|
||||
else if (data_variants.type == AggregatedDataVariants::KEY_16)
|
||||
convertToBlockImpl(*data_variants.key16, key_columns, aggregate_columns,
|
||||
final_aggregate_columns, data_variants.key_sizes, start_row, final);
|
||||
else if (data_variants.type == AggregatedDataVariants::KEY_32)
|
||||
convertToBlockImpl(*data_variants.key32, key_columns, aggregate_columns,
|
||||
final_aggregate_columns, data_variants.key_sizes, start_row, final);
|
||||
else if (data_variants.type == AggregatedDataVariants::KEY_64)
|
||||
convertToBlockImpl(*data_variants.key64, key_columns, aggregate_columns,
|
||||
final_aggregate_columns, data_variants.key_sizes, start_row, final);
|
||||
else if (data_variants.type == AggregatedDataVariants::KEY_STRING)
|
||||
@ -694,7 +733,13 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
|
||||
current_data = nullptr;
|
||||
}
|
||||
|
||||
if (res->type == AggregatedDataVariants::KEY_64)
|
||||
if (res->type == AggregatedDataVariants::KEY_8)
|
||||
mergeDataImpl(*res->key8, *current.key8);
|
||||
else if (res->type == AggregatedDataVariants::KEY_16)
|
||||
mergeDataImpl(*res->key16, *current.key16);
|
||||
else if (res->type == AggregatedDataVariants::KEY_32)
|
||||
mergeDataImpl(*res->key32, *current.key32);
|
||||
else if (res->type == AggregatedDataVariants::KEY_64)
|
||||
mergeDataImpl(*res->key64, *current.key64);
|
||||
else if (res->type == AggregatedDataVariants::KEY_STRING)
|
||||
mergeDataImpl(*res->key_string, *current.key_string);
|
||||
@ -782,7 +827,13 @@ void Aggregator::merge(BlockInputStreamPtr stream, AggregatedDataVariants & resu
|
||||
|
||||
size_t start_row = overflow_row ? 1 : 0;
|
||||
|
||||
if (result.type == AggregatedDataVariants::KEY_64)
|
||||
if (result.type == AggregatedDataVariants::KEY_8)
|
||||
mergeStreamsImpl(*result.key8, result.aggregates_pool, start_row, rows, key_columns, aggregate_columns, key_sizes, key);
|
||||
else if (result.type == AggregatedDataVariants::KEY_16)
|
||||
mergeStreamsImpl(*result.key16, result.aggregates_pool, start_row, rows, key_columns, aggregate_columns, key_sizes, key);
|
||||
else if (result.type == AggregatedDataVariants::KEY_32)
|
||||
mergeStreamsImpl(*result.key32, result.aggregates_pool, start_row, rows, key_columns, aggregate_columns, key_sizes, key);
|
||||
else if (result.type == AggregatedDataVariants::KEY_64)
|
||||
mergeStreamsImpl(*result.key64, result.aggregates_pool, start_row, rows, key_columns, aggregate_columns, key_sizes, key);
|
||||
else if (result.type == AggregatedDataVariants::KEY_STRING)
|
||||
mergeStreamsImpl(*result.key_string, result.aggregates_pool, start_row, rows, key_columns, aggregate_columns, key_sizes, key);
|
||||
@ -818,7 +869,13 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
|
||||
aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]);
|
||||
}
|
||||
|
||||
if (result.type == AggregatedDataVariants::KEY_64)
|
||||
if (result.type == AggregatedDataVariants::KEY_8)
|
||||
destroyImpl(*result.key8);
|
||||
else if (result.type == AggregatedDataVariants::KEY_16)
|
||||
destroyImpl(*result.key16);
|
||||
else if (result.type == AggregatedDataVariants::KEY_32)
|
||||
destroyImpl(*result.key32);
|
||||
else if (result.type == AggregatedDataVariants::KEY_64)
|
||||
destroyImpl(*result.key64);
|
||||
else if (result.type == AggregatedDataVariants::KEY_STRING)
|
||||
destroyImpl(*result.key_string);
|
||||
|
@ -48,7 +48,10 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData
|
||||
method = chooseAggregationMethod(key_columns, key_sizes);
|
||||
|
||||
/// Подготавливаем массивы, куда будут складываться ключи или хэши от ключей.
|
||||
if (method == AggregatedDataVariants::KEY_64)
|
||||
if (method == AggregatedDataVariants::KEY_8 /// TODO не использовать SplittingAggregator для маленьких ключей.
|
||||
|| method == AggregatedDataVariants::KEY_16
|
||||
|| method == AggregatedDataVariants::KEY_32
|
||||
|| method == AggregatedDataVariants::KEY_64)
|
||||
{
|
||||
keys64.resize(rows);
|
||||
}
|
||||
@ -96,7 +99,7 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData
|
||||
|
||||
pool.wait();
|
||||
|
||||
rethrowFirstException(exceptions);
|
||||
rethrowFirstException(exceptions); /// TODO Заменить на future, packaged_task
|
||||
|
||||
/// Параллельно агрегируем в независимые хэш-таблицы
|
||||
|
||||
@ -150,14 +153,17 @@ void SplittingAggregator::calculateHashesThread(Block & block, size_t begin, siz
|
||||
|
||||
try
|
||||
{
|
||||
if (method == AggregatedDataVariants::KEY_64)
|
||||
if (method == AggregatedDataVariants::KEY_8
|
||||
|| method == AggregatedDataVariants::KEY_16
|
||||
|| method == AggregatedDataVariants::KEY_32
|
||||
|| method == AggregatedDataVariants::KEY_64)
|
||||
{
|
||||
const IColumn & column = *key_columns[0];
|
||||
|
||||
for (size_t i = begin; i < end; ++i)
|
||||
{
|
||||
keys64[i] = column.get64(i);
|
||||
thread_nums[i] = intHash32<0xd1f93e3190506c7cULL>(keys64[i]) % threads;
|
||||
keys64[i] = column.get64(i); /// TODO Убрать виртуальный вызов
|
||||
thread_nums[i] = intHash32<0xd1f93e3190506c7cULL>(keys64[i]) % threads; /// TODO более эффективная хэш-функция
|
||||
}
|
||||
}
|
||||
else if (method == AggregatedDataVariants::KEY_STRING)
|
||||
@ -216,6 +222,45 @@ void SplittingAggregator::calculateHashesThread(Block & block, size_t begin, siz
|
||||
}
|
||||
|
||||
|
||||
template <typename FieldType>
|
||||
void SplittingAggregator::aggregateOneNumber(AggregatedDataVariants & result, size_t thread_no, bool no_more_keys)
|
||||
{
|
||||
AggregatedDataWithUInt64Key & res = result.key64->data;
|
||||
|
||||
for (size_t i = 0; i < rows; ++i)
|
||||
{
|
||||
if (thread_nums[i] != thread_no)
|
||||
continue;
|
||||
|
||||
/// Берём ключ
|
||||
UInt64 key = keys64[i];
|
||||
|
||||
AggregatedDataWithUInt64Key::iterator it;
|
||||
bool inserted;
|
||||
|
||||
if (!no_more_keys)
|
||||
res.emplace(key, it, inserted);
|
||||
else
|
||||
{
|
||||
inserted = false;
|
||||
it = res.find(key);
|
||||
if (res.end() == it)
|
||||
continue;
|
||||
}
|
||||
|
||||
if (inserted)
|
||||
{
|
||||
it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states);
|
||||
createAggregateStates(it->second);
|
||||
}
|
||||
|
||||
/// Добавляем значения
|
||||
for (size_t j = 0; j < aggregates_size; ++j)
|
||||
aggregate_functions[j]->add(it->second + offsets_of_aggregate_states[j], &aggregate_columns[j][0], i);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void SplittingAggregator::aggregateThread(
|
||||
Block & block, AggregatedDataVariants & result, size_t thread_no, ExceptionPtr & exception, MemoryTracker * memory_tracker)
|
||||
{
|
||||
@ -233,42 +278,14 @@ void SplittingAggregator::aggregateThread(
|
||||
bool no_more_keys = max_rows_to_group_by && size_of_all_results > max_rows_to_group_by;
|
||||
size_t old_result_size = result.size();
|
||||
|
||||
if (method == AggregatedDataVariants::KEY_64)
|
||||
{
|
||||
AggregatedDataWithUInt64Key & res = result.key64->data;
|
||||
|
||||
for (size_t i = 0; i < rows; ++i)
|
||||
{
|
||||
if (thread_nums[i] != thread_no)
|
||||
continue;
|
||||
|
||||
/// Берём ключ
|
||||
UInt64 key = keys64[i];
|
||||
|
||||
AggregatedDataWithUInt64Key::iterator it;
|
||||
bool inserted;
|
||||
|
||||
if (!no_more_keys)
|
||||
res.emplace(key, it, inserted);
|
||||
else
|
||||
{
|
||||
inserted = false;
|
||||
it = res.find(key);
|
||||
if (res.end() == it)
|
||||
continue;
|
||||
}
|
||||
|
||||
if (inserted)
|
||||
{
|
||||
it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states);
|
||||
createAggregateStates(it->second);
|
||||
}
|
||||
|
||||
/// Добавляем значения
|
||||
for (size_t j = 0; j < aggregates_size; ++j)
|
||||
aggregate_functions[j]->add(it->second + offsets_of_aggregate_states[j], &aggregate_columns[j][0], i);
|
||||
}
|
||||
}
|
||||
if (method == AggregatedDataVariants::KEY_8)
|
||||
aggregateOneNumber<UInt8>(result, thread_no, no_more_keys);
|
||||
else if (method == AggregatedDataVariants::KEY_16)
|
||||
aggregateOneNumber<UInt16>(result, thread_no, no_more_keys);
|
||||
else if (method == AggregatedDataVariants::KEY_32)
|
||||
aggregateOneNumber<UInt32>(result, thread_no, no_more_keys);
|
||||
else if (method == AggregatedDataVariants::KEY_64)
|
||||
aggregateOneNumber<UInt64>(result, thread_no, no_more_keys);
|
||||
else if (method == AggregatedDataVariants::KEY_STRING)
|
||||
{
|
||||
AggregatedDataWithStringKey & res = result.key_string->data;
|
||||
|
@ -34,9 +34,9 @@ BlockInputStreams StorageSystemOne::read(
|
||||
ColumnWithNameAndType col;
|
||||
col.name = "dummy";
|
||||
col.type = new DataTypeUInt8;
|
||||
col.column = new ColumnConstUInt8(1, 0);
|
||||
col.column = ColumnConstUInt8(1, 0).convertToFullColumn();
|
||||
block.insert(col);
|
||||
|
||||
|
||||
return BlockInputStreams(1, new OneBlockInputStream(block));
|
||||
}
|
||||
|
||||
|
@ -1 +1 @@
|
||||
SELECT quantilesTiming(0.1, 0.5, 0.9)(materialize(dummy)) FROM remote('127.0.0.{1,2}', system, one) GROUP BY 1 WITH TOTALS
|
||||
SELECT quantilesTiming(0.1, 0.5, 0.9)(dummy) FROM remote('127.0.0.{1,2}', system, one) GROUP BY 1 WITH TOTALS
|
||||
|
Loading…
Reference in New Issue
Block a user