This commit is contained in:
Evgeniy Gatov 2014-10-29 04:50:12 +03:00
commit 7b949ef7ec
9 changed files with 210 additions and 81 deletions

View File

@ -77,7 +77,7 @@ template <> struct CompareHelper<Float64> : public FloatCompareHelper<Float64> {
/** Шаблон столбцов, которые используют для хранения простой массив.
*/
template <typename T>
class ColumnVector : public IColumn
class ColumnVector final : public IColumn
{
private:
typedef ColumnVector<T> Self;

View File

@ -69,3 +69,4 @@
#define DBMS_DISTRIBUTED_DIRECTORY_MONITOR_SLEEP_TIME_MS 100
#define ALWAYS_INLINE __attribute__((__always_inline__))
#define NO_INLINE __attribute__((__noinline__))

View File

@ -22,6 +22,7 @@
#include <DB/Columns/ColumnString.h>
#include <DB/Columns/ColumnFixedString.h>
#include <DB/Columns/ColumnAggregateFunction.h>
#include <DB/Columns/ColumnVector.h>
@ -53,25 +54,59 @@ typedef HashMap<UInt128, AggregateDataPtr, UInt128Hash> AggregatedDataWithKeys12
typedef HashMap<UInt128, std::pair<StringRef*, AggregateDataPtr>, UInt128TrivialHash> AggregatedDataHashed;
/// Для случая, когда есть один числовой ключ.
struct AggregationMethodKey64
/// Специализации для UInt8, UInt16.
struct TrivialHash
{
typedef AggregatedDataWithUInt64Key Data;
typedef Data::key_type Key;
typedef Data::mapped_type Mapped;
typedef Data::iterator iterator;
typedef Data::const_iterator const_iterator;
template <typename T>
size_t operator() (T key) const
{
return key;
}
};
/// Превращает хэш-таблицу в что-то типа lookup-таблицы. Остаётся неоптимальность - в ячейках хранятся ключи.
template <size_t key_bits>
struct HashTableFixedGrower
{
size_t bufSize() const { return 1 << key_bits; }
size_t mask() const { return bufSize() - 1; }
size_t place(size_t x) const { return x; }
size_t next(size_t pos) const { __builtin_unreachable(); return pos; }
bool overflow(size_t elems) const { return false; }
void increaseSize() { __builtin_unreachable(); }
void set(size_t num_elems) {}
void setBufSize(size_t buf_size_) {}
};
typedef HashMap<UInt64, AggregateDataPtr, TrivialHash, HashTableFixedGrower<8>> AggregatedDataWithUInt8Key;
typedef HashMap<UInt64, AggregateDataPtr, TrivialHash, HashTableFixedGrower<16>> AggregatedDataWithUInt16Key;
template <typename FieldType> struct AggregatedDataWithUIntKey { using Type = AggregatedDataWithUInt64Key; };
template <> struct AggregatedDataWithUIntKey<UInt8> { using Type = AggregatedDataWithUInt8Key; };
template <> struct AggregatedDataWithUIntKey<UInt16> { using Type = AggregatedDataWithUInt16Key; };
/// Для случая, когда есть один числовой ключ.
template <typename FieldType> /// UInt8/16/32/64 для любых типов соответствующей битности.
struct AggregationMethodOneNumber
{
typedef typename AggregatedDataWithUIntKey<FieldType>::Type Data;
typedef typename Data::key_type Key;
typedef typename Data::mapped_type Mapped;
typedef typename Data::iterator iterator;
typedef typename Data::const_iterator const_iterator;
Data data;
const IColumn * column;
const ColumnVector<FieldType> * column;
/** Вызывается в начале обработки каждого блока.
* Устанавливает переменные, необходимые для остальных методов, вызываемых во внутренних циклах.
*/
void init(ConstColumnPlainPtrs & key_columns)
{
column = key_columns[0];
column = static_cast<const ColumnVector<FieldType> *>(key_columns[0]);
}
/// Достать из ключевых столбцов ключ для вставки в хэш-таблицу.
@ -99,7 +134,7 @@ struct AggregationMethodKey64
*/
static void insertKeyIntoColumns(const_iterator & it, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes)
{
key_columns[0]->insertData(reinterpret_cast<const char *>(&it->first), sizeof(it->first));
static_cast<ColumnVector<FieldType> *>(key_columns[0])->insertData(reinterpret_cast<const char *>(&it->first), sizeof(it->first));
}
};
@ -317,7 +352,10 @@ struct AggregatedDataVariants : private boost::noncopyable
*/
AggregatedDataWithoutKey without_key = nullptr;
std::unique_ptr<AggregationMethodKey64> key64;
std::unique_ptr<AggregationMethodOneNumber<UInt8>> key8;
std::unique_ptr<AggregationMethodOneNumber<UInt16>> key16;
std::unique_ptr<AggregationMethodOneNumber<UInt32>> key32;
std::unique_ptr<AggregationMethodOneNumber<UInt64>> key64;
std::unique_ptr<AggregationMethodString> key_string;
std::unique_ptr<AggregationMethodFixedString> key_fixed_string;
std::unique_ptr<AggregationMethodKeys128> keys128;
@ -325,13 +363,16 @@ struct AggregatedDataVariants : private boost::noncopyable
enum Type
{
EMPTY = 0,
WITHOUT_KEY = 1,
KEY_64 = 2,
KEY_STRING = 3,
KEY_FIXED_STRING = 4,
KEYS_128 = 5,
HASHED = 6,
EMPTY = 0,
WITHOUT_KEY,
KEY_8,
KEY_16,
KEY_32,
KEY_64,
KEY_STRING,
KEY_FIXED_STRING,
KEYS_128,
HASHED,
};
Type type = EMPTY;
@ -348,11 +389,14 @@ struct AggregatedDataVariants : private boost::noncopyable
{
case EMPTY: break;
case WITHOUT_KEY: break;
case KEY_64: key64 .reset(new AggregationMethodKey64); break;
case KEY_STRING: key_string .reset(new AggregationMethodString); break;
case KEY_FIXED_STRING: key_fixed_string.reset(new AggregationMethodFixedString); break;
case KEYS_128: keys128 .reset(new AggregationMethodKeys128); break;
case HASHED: hashed .reset(new AggregationMethodHashed); break;
case KEY_8: key8 .reset(new decltype(key8)::element_type); break;
case KEY_16: key16 .reset(new decltype(key16)::element_type); break;
case KEY_32: key32 .reset(new decltype(key32)::element_type); break;
case KEY_64: key64 .reset(new decltype(key64)::element_type); break;
case KEY_STRING: key_string .reset(new decltype(key_string)::element_type); break;
case KEY_FIXED_STRING: key_fixed_string.reset(new decltype(key_fixed_string)::element_type); break;
case KEYS_128: keys128 .reset(new decltype(keys128)::element_type); break;
case HASHED: hashed .reset(new decltype(hashed)::element_type); break;
default:
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@ -365,6 +409,9 @@ struct AggregatedDataVariants : private boost::noncopyable
{
case EMPTY: return 0;
case WITHOUT_KEY: return 1;
case KEY_8: return key8->data.size() + (without_key != nullptr);
case KEY_16: return key16->data.size() + (without_key != nullptr);
case KEY_32: return key32->data.size() + (without_key != nullptr);
case KEY_64: return key64->data.size() + (without_key != nullptr);
case KEY_STRING: return key_string->data.size() + (without_key != nullptr);
case KEY_FIXED_STRING: return key_fixed_string->data.size() + (without_key != nullptr);
@ -382,6 +429,9 @@ struct AggregatedDataVariants : private boost::noncopyable
{
case EMPTY: return "EMPTY";
case WITHOUT_KEY: return "WITHOUT_KEY";
case KEY_8: return "KEY_8";
case KEY_16: return "KEY_16";
case KEY_32: return "KEY_32";
case KEY_64: return "KEY_64";
case KEY_STRING: return "KEY_STRING";
case KEY_FIXED_STRING: return "KEY_FIXED_STRING";

View File

@ -94,6 +94,9 @@ private:
void calculateHashesThread(Block & block, size_t begin, size_t end, ExceptionPtr & exception, MemoryTracker * memory_tracker);
void aggregateThread(Block & block, AggregatedDataVariants & result, size_t thread_no, ExceptionPtr & exception, MemoryTracker * memory_tracker);
void convertToBlockThread(AggregatedDataVariants & data_variant, Block & block, bool final, ExceptionPtr & exception, MemoryTracker * memory_tracker);
template <typename FieldType>
void aggregateOneNumber(AggregatedDataVariants & result, size_t thread_no, bool no_more_keys);
};

View File

@ -7,7 +7,8 @@
namespace DB
{
class StorageView : public IStorage {
class StorageView : public IStorage
{
public:
static StoragePtr create(const String & table_name_, const String & database_name_,
@ -16,7 +17,7 @@ public:
std::string getName() const override { return "View"; }
std::string getTableName() const override { return table_name; }
const NamesAndTypesList & getColumnsList() const override { return *columns; }
DB::ASTPtr getInnerQuery() const { return inner_query.clone(); };
ASTPtr getInnerQuery() const { return inner_query.clone(); };
/// Пробрасывается внутрь запроса и решается на его уровне.
bool supportsSampling() const override { return true; }

View File

@ -125,7 +125,18 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ConstColu
/// Если есть один числовой ключ, который помещается в 64 бита
if (keys_size == 1 && key_columns[0]->isNumeric())
return AggregatedDataVariants::KEY_64;
{
size_t size_of_field = key_columns[0]->sizeOfField();
if (size_of_field == 1)
return AggregatedDataVariants::KEY_8;
if (size_of_field == 2)
return AggregatedDataVariants::KEY_16;
if (size_of_field == 4)
return AggregatedDataVariants::KEY_32;
if (size_of_field == 8)
return AggregatedDataVariants::KEY_64;
throw Exception("Logical error: numeric column has sizeOfField not in 1, 2, 4, 8.", ErrorCodes::LOGICAL_ERROR);
}
/// Если ключи помещаются в 128 бит, будем использовать хэш-таблицу по упакованным в 128-бит ключам
if (keys_fit_128_bits)
@ -167,8 +178,12 @@ void Aggregator::createAggregateStates(AggregateDataPtr & aggregate_data) const
}
/** Интересно - если убрать noinline, то gcc зачем-то инлайнит эту функцию, и производительность уменьшается (~10%).
* (Возможно из-за того, что после инлайна этой функции, перестают инлайниться более внутренние функции.)
* Инлайнить не имеет смысла, так как внутренний цикл находится целиком внутри этой функции.
*/
template <typename Method>
void Aggregator::executeImpl(
void NO_INLINE Aggregator::executeImpl(
Method & method,
Arena * aggregates_pool,
size_t rows,
@ -226,7 +241,7 @@ void Aggregator::executeImpl(
template <typename Method>
void Aggregator::convertToBlockImpl(
void NO_INLINE Aggregator::convertToBlockImpl(
Method & method,
ColumnPlainPtrs & key_columns,
AggregateColumnsData & aggregate_columns,
@ -262,7 +277,7 @@ void Aggregator::convertToBlockImpl(
template <typename Method>
void Aggregator::mergeDataImpl(
void NO_INLINE Aggregator::mergeDataImpl(
Method & method_dst,
Method & method_src) const
{
@ -294,7 +309,7 @@ void Aggregator::mergeDataImpl(
template <typename Method>
void Aggregator::mergeStreamsImpl(
void NO_INLINE Aggregator::mergeStreamsImpl(
Method & method,
Arena * aggregates_pool,
size_t start_row,
@ -336,7 +351,7 @@ void Aggregator::mergeStreamsImpl(
template <typename Method>
void Aggregator::destroyImpl(
void NO_INLINE Aggregator::destroyImpl(
Method & method) const
{
for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it)
@ -372,8 +387,14 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
/// Запоминаем столбцы, с которыми будем работать
for (size_t i = 0; i < keys_size; ++i)
{
key_columns[i] = block.getByPosition(keys[i]).column;
if (key_columns[i]->isConst())
throw Exception("Constants is not allowed as GROUP BY keys"
" (but all of them must be eliminated in ExpressionAnalyzer)", ErrorCodes::ILLEGAL_COLUMN);
}
for (size_t i = 0; i < aggregates_size; ++i)
{
for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
@ -434,7 +455,16 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
AggregateDataPtr overflow_row_ptr = overflow_row ? result.without_key : nullptr;
if (result.type == AggregatedDataVariants::KEY_64)
if (result.type == AggregatedDataVariants::KEY_8)
executeImpl(*result.key8, result.aggregates_pool, rows, key_columns, aggregate_columns,
result.key_sizes, key, no_more_keys, overflow_row_ptr);
else if (result.type == AggregatedDataVariants::KEY_16)
executeImpl(*result.key16, result.aggregates_pool, rows, key_columns, aggregate_columns,
result.key_sizes, key, no_more_keys, overflow_row_ptr);
else if (result.type == AggregatedDataVariants::KEY_32)
executeImpl(*result.key32, result.aggregates_pool, rows, key_columns, aggregate_columns,
result.key_sizes, key, no_more_keys, overflow_row_ptr);
else if (result.type == AggregatedDataVariants::KEY_64)
executeImpl(*result.key64, result.aggregates_pool, rows, key_columns, aggregate_columns,
result.key_sizes, key, no_more_keys, overflow_row_ptr);
else if (result.type == AggregatedDataVariants::KEY_STRING)
@ -590,7 +620,16 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants, bool fi
size_t start_row = overflow_row ? 1 : 0;
if (data_variants.type == AggregatedDataVariants::KEY_64)
if (data_variants.type == AggregatedDataVariants::KEY_8)
convertToBlockImpl(*data_variants.key8, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row, final);
else if (data_variants.type == AggregatedDataVariants::KEY_16)
convertToBlockImpl(*data_variants.key16, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row, final);
else if (data_variants.type == AggregatedDataVariants::KEY_32)
convertToBlockImpl(*data_variants.key32, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row, final);
else if (data_variants.type == AggregatedDataVariants::KEY_64)
convertToBlockImpl(*data_variants.key64, key_columns, aggregate_columns,
final_aggregate_columns, data_variants.key_sizes, start_row, final);
else if (data_variants.type == AggregatedDataVariants::KEY_STRING)
@ -694,7 +733,13 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
current_data = nullptr;
}
if (res->type == AggregatedDataVariants::KEY_64)
if (res->type == AggregatedDataVariants::KEY_8)
mergeDataImpl(*res->key8, *current.key8);
else if (res->type == AggregatedDataVariants::KEY_16)
mergeDataImpl(*res->key16, *current.key16);
else if (res->type == AggregatedDataVariants::KEY_32)
mergeDataImpl(*res->key32, *current.key32);
else if (res->type == AggregatedDataVariants::KEY_64)
mergeDataImpl(*res->key64, *current.key64);
else if (res->type == AggregatedDataVariants::KEY_STRING)
mergeDataImpl(*res->key_string, *current.key_string);
@ -782,7 +827,13 @@ void Aggregator::merge(BlockInputStreamPtr stream, AggregatedDataVariants & resu
size_t start_row = overflow_row ? 1 : 0;
if (result.type == AggregatedDataVariants::KEY_64)
if (result.type == AggregatedDataVariants::KEY_8)
mergeStreamsImpl(*result.key8, result.aggregates_pool, start_row, rows, key_columns, aggregate_columns, key_sizes, key);
else if (result.type == AggregatedDataVariants::KEY_16)
mergeStreamsImpl(*result.key16, result.aggregates_pool, start_row, rows, key_columns, aggregate_columns, key_sizes, key);
else if (result.type == AggregatedDataVariants::KEY_32)
mergeStreamsImpl(*result.key32, result.aggregates_pool, start_row, rows, key_columns, aggregate_columns, key_sizes, key);
else if (result.type == AggregatedDataVariants::KEY_64)
mergeStreamsImpl(*result.key64, result.aggregates_pool, start_row, rows, key_columns, aggregate_columns, key_sizes, key);
else if (result.type == AggregatedDataVariants::KEY_STRING)
mergeStreamsImpl(*result.key_string, result.aggregates_pool, start_row, rows, key_columns, aggregate_columns, key_sizes, key);
@ -818,7 +869,13 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]);
}
if (result.type == AggregatedDataVariants::KEY_64)
if (result.type == AggregatedDataVariants::KEY_8)
destroyImpl(*result.key8);
else if (result.type == AggregatedDataVariants::KEY_16)
destroyImpl(*result.key16);
else if (result.type == AggregatedDataVariants::KEY_32)
destroyImpl(*result.key32);
else if (result.type == AggregatedDataVariants::KEY_64)
destroyImpl(*result.key64);
else if (result.type == AggregatedDataVariants::KEY_STRING)
destroyImpl(*result.key_string);

View File

@ -48,7 +48,10 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData
method = chooseAggregationMethod(key_columns, key_sizes);
/// Подготавливаем массивы, куда будут складываться ключи или хэши от ключей.
if (method == AggregatedDataVariants::KEY_64)
if (method == AggregatedDataVariants::KEY_8 /// TODO не использовать SplittingAggregator для маленьких ключей.
|| method == AggregatedDataVariants::KEY_16
|| method == AggregatedDataVariants::KEY_32
|| method == AggregatedDataVariants::KEY_64)
{
keys64.resize(rows);
}
@ -96,7 +99,7 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData
pool.wait();
rethrowFirstException(exceptions);
rethrowFirstException(exceptions); /// TODO Заменить на future, packaged_task
/// Параллельно агрегируем в независимые хэш-таблицы
@ -150,14 +153,17 @@ void SplittingAggregator::calculateHashesThread(Block & block, size_t begin, siz
try
{
if (method == AggregatedDataVariants::KEY_64)
if (method == AggregatedDataVariants::KEY_8
|| method == AggregatedDataVariants::KEY_16
|| method == AggregatedDataVariants::KEY_32
|| method == AggregatedDataVariants::KEY_64)
{
const IColumn & column = *key_columns[0];
for (size_t i = begin; i < end; ++i)
{
keys64[i] = column.get64(i);
thread_nums[i] = intHash32<0xd1f93e3190506c7cULL>(keys64[i]) % threads;
keys64[i] = column.get64(i); /// TODO Убрать виртуальный вызов
thread_nums[i] = intHash32<0xd1f93e3190506c7cULL>(keys64[i]) % threads; /// TODO более эффективная хэш-функция
}
}
else if (method == AggregatedDataVariants::KEY_STRING)
@ -216,6 +222,45 @@ void SplittingAggregator::calculateHashesThread(Block & block, size_t begin, siz
}
template <typename FieldType>
void SplittingAggregator::aggregateOneNumber(AggregatedDataVariants & result, size_t thread_no, bool no_more_keys)
{
AggregatedDataWithUInt64Key & res = result.key64->data;
for (size_t i = 0; i < rows; ++i)
{
if (thread_nums[i] != thread_no)
continue;
/// Берём ключ
UInt64 key = keys64[i];
AggregatedDataWithUInt64Key::iterator it;
bool inserted;
if (!no_more_keys)
res.emplace(key, it, inserted);
else
{
inserted = false;
it = res.find(key);
if (res.end() == it)
continue;
}
if (inserted)
{
it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states);
createAggregateStates(it->second);
}
/// Добавляем значения
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->add(it->second + offsets_of_aggregate_states[j], &aggregate_columns[j][0], i);
}
}
void SplittingAggregator::aggregateThread(
Block & block, AggregatedDataVariants & result, size_t thread_no, ExceptionPtr & exception, MemoryTracker * memory_tracker)
{
@ -233,42 +278,14 @@ void SplittingAggregator::aggregateThread(
bool no_more_keys = max_rows_to_group_by && size_of_all_results > max_rows_to_group_by;
size_t old_result_size = result.size();
if (method == AggregatedDataVariants::KEY_64)
{
AggregatedDataWithUInt64Key & res = result.key64->data;
for (size_t i = 0; i < rows; ++i)
{
if (thread_nums[i] != thread_no)
continue;
/// Берём ключ
UInt64 key = keys64[i];
AggregatedDataWithUInt64Key::iterator it;
bool inserted;
if (!no_more_keys)
res.emplace(key, it, inserted);
else
{
inserted = false;
it = res.find(key);
if (res.end() == it)
continue;
}
if (inserted)
{
it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states);
createAggregateStates(it->second);
}
/// Добавляем значения
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->add(it->second + offsets_of_aggregate_states[j], &aggregate_columns[j][0], i);
}
}
if (method == AggregatedDataVariants::KEY_8)
aggregateOneNumber<UInt8>(result, thread_no, no_more_keys);
else if (method == AggregatedDataVariants::KEY_16)
aggregateOneNumber<UInt16>(result, thread_no, no_more_keys);
else if (method == AggregatedDataVariants::KEY_32)
aggregateOneNumber<UInt32>(result, thread_no, no_more_keys);
else if (method == AggregatedDataVariants::KEY_64)
aggregateOneNumber<UInt64>(result, thread_no, no_more_keys);
else if (method == AggregatedDataVariants::KEY_STRING)
{
AggregatedDataWithStringKey & res = result.key_string->data;

View File

@ -34,9 +34,9 @@ BlockInputStreams StorageSystemOne::read(
ColumnWithNameAndType col;
col.name = "dummy";
col.type = new DataTypeUInt8;
col.column = new ColumnConstUInt8(1, 0);
col.column = ColumnConstUInt8(1, 0).convertToFullColumn();
block.insert(col);
return BlockInputStreams(1, new OneBlockInputStream(block));
}

View File

@ -1 +1 @@
SELECT quantilesTiming(0.1, 0.5, 0.9)(materialize(dummy)) FROM remote('127.0.0.{1,2}', system, one) GROUP BY 1 WITH TOTALS
SELECT quantilesTiming(0.1, 0.5, 0.9)(dummy) FROM remote('127.0.0.{1,2}', system, one) GROUP BY 1 WITH TOTALS