This commit is contained in:
Evgeniy Gatov 2015-02-13 18:13:09 +03:00
commit 92c5757ff1
11 changed files with 150 additions and 147 deletions

View File

@ -74,6 +74,37 @@ template <> struct CompareHelper<Float32> : public FloatCompareHelper<Float32> {
template <> struct CompareHelper<Float64> : public FloatCompareHelper<Float64> {};
/** Для реализации функции get64.
*/
template <typename T>
inline UInt64 unionCastToUInt64(T x) { return x; }
template <> inline UInt64 unionCastToUInt64(Float64 x)
{
union
{
Float64 src;
UInt64 res;
};
src = x;
return res;
}
template <> inline UInt64 unionCastToUInt64(Float32 x)
{
union
{
Float32 src;
UInt64 res;
};
res = 0;
src = x;
return res;
}
/** Шаблон столбцов, которые используют для хранения простой массив.
*/
template <typename T>
@ -191,7 +222,10 @@ public:
res = typename NearestFieldType<T>::Type(data[n]);
}
UInt64 get64(size_t n) const override;
UInt64 get64(size_t n) const override
{
return unionCastToUInt64(data[n]);
}
void insert(const Field & x) override
{
@ -361,38 +395,4 @@ protected:
};
template <typename T>
UInt64 ColumnVector<T>::get64(size_t n) const
{
return data[n];
}
template <>
inline UInt64 ColumnVector<Float64>::get64(size_t n) const
{
union
{
Float64 src;
UInt64 res;
};
src = data[n];
return res;
}
template <>
inline UInt64 ColumnVector<Float32>::get64(size_t n) const
{
union
{
Float32 src;
UInt64 res;
};
res = 0;
src = data[n];
return res;
}
}

View File

@ -110,3 +110,14 @@ DEFINE_HASH(DB::Float32)
DEFINE_HASH(DB::Float64)
#undef DEFINE_HASH
/// Разумно использовать для UInt8, UInt16 при достаточном размере хэш-таблицы.
struct TrivialHash
{
template <typename T>
size_t operator() (T key) const
{
return key;
}
};

View File

@ -181,6 +181,26 @@ struct HashTableGrower
};
/** При использовании в качестве Grower-а, превращает хэш-таблицу в что-то типа lookup-таблицы.
* Остаётся неоптимальность - в ячейках хранятся ключи.
* Также компилятору не удаётся полностью удалить код хождения по цепочке разрешения коллизий, хотя он не нужен.
* TODO Сделать полноценную lookup-таблицу.
*/
template <size_t key_bits>
struct HashTableFixedGrower
{
size_t bufSize() const { return 1 << key_bits; }
size_t place(size_t x) const { return x; }
/// Тут можно было бы написать __builtin_unreachable(), но компилятор не до конца всё оптимизирует, и получается менее эффективно.
size_t next(size_t pos) const { return pos + 1; }
bool overflow(size_t elems) const { return false; }
void increaseSize() { __builtin_unreachable(); }
void set(size_t num_elems) {}
void setBufSize(size_t buf_size_) {}
};
/** Если нужно хранить нулевой ключ отдельно - место для его хранения. */
template <bool need_zero_value_storage, typename Cell>
struct ZeroValueStorage;

View File

@ -59,68 +59,10 @@ typedef TwoLevelHashMapWithSavedHash<StringRef, AggregateDataPtr> AggregatedData
typedef TwoLevelHashMap<UInt128, AggregateDataPtr, UInt128HashCRC32> AggregatedDataWithKeys128TwoLevel;
typedef TwoLevelHashMap<UInt128, std::pair<StringRef*, AggregateDataPtr>, UInt128TrivialHash> AggregatedDataHashedTwoLevel;
/// Специализации для UInt8, UInt16.
struct TrivialHash
{
template <typename T>
size_t operator() (T key) const
{
return key;
}
};
/** Превращает хэш-таблицу в что-то типа lookup-таблицы. Остаётся неоптимальность - в ячейках хранятся ключи.
* Также компилятору не удаётся полностью удалить код хождения по цепочке разрешения коллизий, хотя он не нужен.
* TODO Переделать в полноценную lookup-таблицу.
*/
template <size_t key_bits>
struct HashTableFixedGrower
{
size_t bufSize() const { return 1 << key_bits; }
size_t place(size_t x) const { return x; }
/// Тут можно было бы написать __builtin_unreachable(), но компилятор не до конца всё оптимизирует, и получается менее эффективно.
size_t next(size_t pos) const { return pos + 1; }
bool overflow(size_t elems) const { return false; }
void increaseSize() { __builtin_unreachable(); }
void set(size_t num_elems) {}
void setBufSize(size_t buf_size_) {}
};
typedef HashMap<UInt64, AggregateDataPtr, TrivialHash, HashTableFixedGrower<8>> AggregatedDataWithUInt8Key;
typedef HashMap<UInt64, AggregateDataPtr, TrivialHash, HashTableFixedGrower<16>> AggregatedDataWithUInt16Key;
template <typename T>
inline UInt64 unionCastToUInt64(T x) { return x; }
template <> inline UInt64 unionCastToUInt64(Float64 x)
{
union
{
Float64 src;
UInt64 res;
};
src = x;
return res;
}
template <> inline UInt64 unionCastToUInt64(Float32 x)
{
union
{
Float32 src;
UInt64 res;
};
res = 0;
src = x;
return res;
}
/// Для случая, когда есть один числовой ключ.
template <typename FieldType, typename TData> /// UInt8/16/32/64 для любых типов соответствующей битности.
struct AggregationMethodOneNumber

View File

@ -94,9 +94,6 @@ private:
// Переименовать столбцы каждого запроса цепочки UNION ALL в такие же имена, как в первом запросе.
void renameColumns();
/// Является ли это первым запросом цепочки UNION ALL имеющей длниу >= 2.
bool isFirstSelectInsideUnionAll() const;
/** Из какой таблицы читать. JOIN-ы не поддерживаются.
*/
void getDatabaseAndTableNames(String & database_name, String & table_name);
@ -135,10 +132,8 @@ private:
std::unique_ptr<ExpressionAnalyzer> query_analyzer;
BlockInputStreams streams;
/** Цепочка UNION ALL может иметь длину 1 (в таком случае имеется просто один запрос SELECT)
* или больше. Этот флаг установлен, если это первый запрос, возможно единственный, этой цепочки.
*/
bool is_union_all_head;
/// Являемся ли мы первым запросом SELECT цепочки UNION ALL?
bool is_first_select_inside_union_all;
/// Следующий запрос SELECT в цепочке UNION ALL.
std::unique_ptr<InterpreterSelectQuery> next_select_in_union_all;

View File

@ -43,7 +43,27 @@ void InterpreterSelectQuery::init(BlockInputStreamPtr input, const Names & requi
throw Exception("Too deep subqueries. Maximum: " + toString(settings.limits.max_subquery_depth),
ErrorCodes::TOO_DEEP_SUBQUERIES);
if (isFirstSelectInsideUnionAll() && hasAsterisk())
if (is_first_select_inside_union_all)
{
/// Создать цепочку запросов SELECT.
InterpreterSelectQuery * interpreter = this;
ASTPtr tail = query.next_union_all;
query.next_union_all = nullptr;
while (!tail.isNull())
{
ASTPtr head = tail;
ASTSelectQuery & head_query = static_cast<ASTSelectQuery &>(*head);
tail = head_query.next_union_all;
head_query.next_union_all = nullptr;
interpreter->next_select_in_union_all.reset(new InterpreterSelectQuery(head, context, to_stage, subquery_depth, nullptr, false));
interpreter = interpreter->next_select_in_union_all.get();
}
}
if (is_first_select_inside_union_all && hasAsterisk())
{
basicInit(input, table_column_names);
@ -125,18 +145,13 @@ void InterpreterSelectQuery::basicInit(BlockInputStreamPtr input_, const NamesAn
if (input_)
streams.push_back(input_);
if (isFirstSelectInsideUnionAll())
if (is_first_select_inside_union_all)
{
/// Создаем цепочку запросов SELECT и проверяем, что результаты всех запросов SELECT cовместимые.
/// NOTE Мы можем безопасно применить static_cast вместо typeid_cast,
/// потому что знаем, что в цепочке UNION ALL имеются только деревья типа SELECT.
InterpreterSelectQuery * interpreter = this;
Block first = interpreter->getSampleBlock();
for (ASTPtr tree = query.next_union_all; !tree.isNull(); tree = (static_cast<ASTSelectQuery &>(*tree)).next_union_all)
/// Проверяем, что результаты всех запросов SELECT cовместимые.
Block first = getSampleBlock();
for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->next_select_in_union_all.get())
{
interpreter->next_select_in_union_all.reset(new InterpreterSelectQuery(tree, context, to_stage, subquery_depth, nullptr, false));
interpreter = interpreter->next_select_in_union_all.get();
Block current = interpreter->getSampleBlock();
Block current = p->getSampleBlock();
if (!blocksHaveEqualStructure(first, current))
throw Exception("Result structures mismatch in the SELECT queries of the UNION ALL chain. Found result structure:\n\n" + current.dumpStructure()
+ "\n\nwhile expecting:\n\n" + first.dumpStructure() + "\n\ninstead",
@ -156,7 +171,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context
size_t subquery_depth_, BlockInputStreamPtr input_, bool is_union_all_head_)
: query_ptr(query_ptr_), query(typeid_cast<ASTSelectQuery &>(*query_ptr)),
context(context_), settings(context.getSettings()), to_stage(to_stage_), subquery_depth(subquery_depth_),
is_union_all_head(is_union_all_head_),
is_first_select_inside_union_all(is_union_all_head_ && !query.next_union_all.isNull()),
log(&Logger::get("InterpreterSelectQuery"))
{
init(input_);
@ -167,7 +182,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context
QueryProcessingStage::Enum to_stage_, size_t subquery_depth_, BlockInputStreamPtr input_)
: query_ptr(query_ptr_), query(typeid_cast<ASTSelectQuery &>(*query_ptr)),
context(context_), settings(context.getSettings()), to_stage(to_stage_), subquery_depth(subquery_depth_),
is_union_all_head(true),
is_first_select_inside_union_all(!query.next_union_all.isNull()),
log(&Logger::get("InterpreterSelectQuery"))
{
init(input_, required_column_names_);
@ -178,7 +193,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context
const NamesAndTypesList & table_column_names, QueryProcessingStage::Enum to_stage_, size_t subquery_depth_, BlockInputStreamPtr input_)
: query_ptr(query_ptr_), query(typeid_cast<ASTSelectQuery &>(*query_ptr)),
context(context_), settings(context.getSettings()), to_stage(to_stage_), subquery_depth(subquery_depth_),
is_union_all_head(true),
is_first_select_inside_union_all(!query.next_union_all.isNull()),
log(&Logger::get("InterpreterSelectQuery"))
{
init(input_, required_column_names_, table_column_names);
@ -189,11 +204,10 @@ bool InterpreterSelectQuery::hasAsterisk() const
if (query.hasAsterisk())
return true;
if (isFirstSelectInsideUnionAll())
for (IAST * tree = query.next_union_all.get(); tree != nullptr; tree = static_cast<ASTSelectQuery *>(tree)->next_union_all.get())
if (is_first_select_inside_union_all)
for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->next_select_in_union_all.get())
{
const auto & next_query = static_cast<ASTSelectQuery &>(*tree);
if (next_query.hasAsterisk())
if (p->query.hasAsterisk())
return true;
}
@ -202,12 +216,9 @@ bool InterpreterSelectQuery::hasAsterisk() const
void InterpreterSelectQuery::renameColumns()
{
if (isFirstSelectInsideUnionAll())
for (IAST * tree = query.next_union_all.get(); tree != nullptr; tree = static_cast<ASTSelectQuery *>(tree)->next_union_all.get())
{
auto & ast = static_cast<ASTSelectQuery &>(*tree);
ast.renameColumns(query);
}
if (is_first_select_inside_union_all)
for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->next_select_in_union_all.get())
p->query.renameColumns(query);
}
void InterpreterSelectQuery::rewriteExpressionList(const Names & required_column_names)
@ -215,27 +226,18 @@ void InterpreterSelectQuery::rewriteExpressionList(const Names & required_column
if (query.distinct)
return;
if (isFirstSelectInsideUnionAll())
for (IAST * tree = query.next_union_all.get(); tree != nullptr; tree = static_cast<ASTSelectQuery *>(tree)->next_union_all.get())
if (is_first_select_inside_union_all)
for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->next_select_in_union_all.get())
{
auto & next_query = static_cast<ASTSelectQuery &>(*tree);
if (next_query.distinct)
if (p->query.distinct)
return;
}
query.rewriteSelectExpressionList(required_column_names);
if (isFirstSelectInsideUnionAll())
for (IAST * tree = query.next_union_all.get(); tree != nullptr; tree = static_cast<ASTSelectQuery *>(tree)->next_union_all.get())
{
auto & next_query = static_cast<ASTSelectQuery &>(*tree);
next_query.rewriteSelectExpressionList(required_column_names);
}
}
bool InterpreterSelectQuery::isFirstSelectInsideUnionAll() const
{
return is_union_all_head && !query.next_union_all.isNull();
if (is_first_select_inside_union_all)
for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->next_select_in_union_all.get())
p->query.rewriteSelectExpressionList(required_column_names);
}
void InterpreterSelectQuery::getDatabaseAndTableNames(String & database_name, String & table_name)
@ -323,7 +325,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
const BlockInputStreams & InterpreterSelectQuery::executeWithoutUnion()
{
if (isFirstSelectInsideUnionAll())
if (is_first_select_inside_union_all)
{
executeSingleQuery();
for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->next_select_in_union_all.get())

View File

@ -237,7 +237,7 @@ void Join::insertFromBlockImpl(Maps & maps, size_t rows, const ConstColumnPlainP
}
}
else
throw Exception("Illegal type of column when creating set with string key: " + column.getName(), ErrorCodes::ILLEGAL_COLUMN);
throw Exception("Illegal type of column when creating join with string key: " + column.getName(), ErrorCodes::ILLEGAL_COLUMN);
}
else if (type == Set::HASHED)
{

View File

@ -46,7 +46,7 @@ public:
};
struct TrivialHash
struct SimpleHash
{
size_t operator() (UInt64 x) const { return x; }
size_t operator() (StringRef x) const { return DB::parse<UInt64>(x.data); }
@ -65,7 +65,7 @@ int main(int argc, char ** argv)
typedef HashMapWithDump<
StringRef,
UInt64,
TrivialHash,
SimpleHash,
Grower,
HashTableAllocatorWithStackMemory<4 * 24> > Map;

View File

@ -0,0 +1,20 @@
1 2015-01-01 1 foo
1 2015-01-01 1 foo
1 2016-01-01 6 bar
1 2016-01-01 6 bar
2 2015-02-01 2 bar
2 2015-02-01 2 bar
2 2016-02-01 7 foo
2 2016-02-01 7 foo
3 2015-03-01 3 foo
3 2015-03-01 3 foo
3 2016-03-01 8 bar
3 2016-03-01 8 bar
4 2015-04-01 4 bar
4 2015-04-01 4 bar
4 2016-04-01 9 foo
4 2016-04-01 9 foo
5 2015-05-01 5 foo
5 2015-05-01 5 foo
5 2016-05-01 10 bar
5 2016-05-01 10 bar

View File

@ -0,0 +1,13 @@
DROP TABLE IF EXISTS test.report1;
DROP TABLE IF EXISTS test.report2;
CREATE TABLE test.report1(id UInt32, event_date Date, priority UInt32, description String) ENGINE = MergeTree(event_date, intHash32(id), (id, event_date, intHash32(id)), 8192);
CREATE TABLE test.report2(id UInt32, event_date Date, priority UInt32, description String) ENGINE = MergeTree(event_date, intHash32(id), (id, event_date, intHash32(id)), 8192);
INSERT INTO test.report1(id,event_date,priority,description) VALUES (1, '2015-01-01', 1, 'foo')(2, '2015-02-01', 2, 'bar')(3, '2015-03-01', 3, 'foo')(4, '2015-04-01', 4, 'bar')(5, '2015-05-01', 5, 'foo');
INSERT INTO test.report2(id,event_date,priority,description) VALUES (1, '2016-01-01', 6, 'bar')(2, '2016-02-01', 7, 'foo')(3, '2016-03-01', 8, 'bar')(4, '2016-04-01', 9, 'foo')(5, '2016-05-01', 10, 'bar');
SELECT * FROM (SELECT id, event_date, priority, description FROM remote('127.0.0.{1,2}', test, report1) UNION ALL SELECT id, event_date, priority, description FROM remote('127.0.0.{1,2}', test, report2)) ORDER BY id, event_date ASC;
DROP TABLE test.report1;
DROP TABLE test.report2;