dbms: fixed collations; preparation to external sorting [#METR-2944].

This commit is contained in:
Alexey Milovidov 2015-01-07 18:30:11 +03:00
parent 4eff949e22
commit 8620b80d99
10 changed files with 397 additions and 107 deletions

View File

@ -269,6 +269,7 @@ namespace ErrorCodes
UNION_ALL_COLUMN_ALIAS_MISMATCH,
CLIENT_OUTPUT_FORMAT_SPECIFIED,
UNKNOWN_BLOCK_INFO_FIELD,
BAD_COLLATION,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -50,9 +50,9 @@ struct SortCursorImpl
ConstColumnPlainPtrs all_columns;
ConstColumnPlainPtrs sort_columns;
SortDescription desc;
size_t sort_columns_size;
size_t pos;
size_t rows;
size_t sort_columns_size = 0;
size_t pos = 0;
size_t rows = 0;
/** Порядок (что сравнивается), если сравниваемые столбцы равны.
* Даёт возможность предпочитать строки из нужного курсора.
@ -65,12 +65,12 @@ struct SortCursorImpl
NeedCollationFlags need_collation;
/** Есть ли хотя бы один столбец с Collator. */
bool has_collation;
bool has_collation = false;
SortCursorImpl() : sort_columns(0), pos(0), rows(0) {}
SortCursorImpl() {}
SortCursorImpl(const Block & block, const SortDescription & desc_, size_t order_ = 0)
: desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size()), has_collation(false)
: desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size())
{
reset(block);
}

View File

@ -1,5 +1,7 @@
#pragma once
#include <queue>
#include <Yandex/logger_useful.h>
#include <DB/Core/SortDescription.h>
@ -12,12 +14,52 @@ namespace DB
/** Соединяет поток сортированных по отдельности блоков в сортированный целиком поток.
*/
/** Часть реализации. Сливает набор готовых (уже прочитанных откуда-то) блоков.
* Возвращает результат слияния в виде потока блоков не более max_merged_block_size строк.
*/
class MergeSortingBlocksBlockInputStream : public IProfilingBlockInputStream
{
public:
/// limit - если не 0, то можно выдать только первые limit строк в сортированном порядке.
MergeSortingBlocksBlockInputStream(Blocks & blocks_, SortDescription & description_,
size_t max_merged_block_size_, size_t limit_ = 0);
String getName() const override { return "MergeSortingBlocksBlockInputStream"; }
String getID() const override { return getName(); }
protected:
Block readImpl() override;
private:
Blocks & blocks;
SortDescription description;
size_t max_merged_block_size;
size_t limit;
size_t total_merged_rows = 0;
using CursorImpls = std::vector<SortCursorImpl>;
CursorImpls cursors;
bool has_collation = false;
std::priority_queue<SortCursor> queue;
std::priority_queue<SortCursorWithCollation> queue_with_collation;
/** Делаем поддержку двух разных курсоров - с Collation и без.
* Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций.
*/
template <typename TSortCursor>
Block mergeImpl(std::priority_queue<TSortCursor> & queue);
};
class MergeSortingBlockInputStream : public IProfilingBlockInputStream
{
public:
/// limit - если не 0, то можно выдать только первые limit строк в сортированном порядке.
MergeSortingBlockInputStream(BlockInputStreamPtr input_, SortDescription & description_, size_t limit_ = 0)
: description(description_), limit(limit_), has_been_read(false), log(&Logger::get("MergeSortingBlockInputStream"))
: description(description_), limit(limit_)
{
children.push_back(input_);
}
@ -43,22 +85,10 @@ private:
SortDescription description;
size_t limit;
/// Всё было прочитано.
bool has_been_read;
Logger * log = &Logger::get("MergeSortingBlockInputStream");
Logger * log;
/** Слить сразу много блоков с помощью priority queue.
*/
Block merge(Blocks & blocks);
typedef std::vector<SortCursorImpl> CursorImpls;
/** Делаем поддержку двух разных курсоров - с Collation и без.
* Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций.
*/
template <typename TSortCursor>
Block mergeImpl(Blocks & block, CursorImpls & cursors);
Blocks blocks;
std::unique_ptr<MergeSortingBlocksBlockInputStream> impl;
};
}

View File

@ -20,8 +20,8 @@ class MergingSortedBlockInputStream : public IProfilingBlockInputStream
public:
/// limit - если не 0, то можно выдать только первые limit строк в сортированном порядке.
MergingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, size_t max_block_size_, size_t limit_ = 0)
: description(description_), max_block_size(max_block_size_), limit(limit_), total_merged_rows(0), first(true), has_collation(false),
num_columns(0), source_blocks(inputs_.size()), cursors(inputs_.size()), log(&Logger::get("MergingSortedBlockInputStream"))
: description(description_), max_block_size(max_block_size_), limit(limit_),
source_blocks(inputs_.size()), cursors(inputs_.size())
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
}
@ -65,14 +65,13 @@ protected:
SortDescription description;
size_t max_block_size;
size_t limit;
size_t total_merged_rows;
size_t total_merged_rows = 0;
bool first;
bool has_collation;
bool first = true;
bool has_collation = false;
/// Текущие сливаемые блоки.
size_t num_columns;
size_t num_columns = 0;
Blocks source_blocks;
typedef std::vector<SortCursorImpl> CursorImpls;
@ -139,7 +138,7 @@ private:
template <typename TSortCursor>
void merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
Logger * log;
Logger * log = &Logger::get("MergingSortedBlockInputStream");
};
}

View File

@ -16,7 +16,7 @@ using Poco::SharedPtr;
class OneBlockInputStream : public IProfilingBlockInputStream
{
public:
OneBlockInputStream(const Block & block_) : block(block_), has_been_read(false) {}
OneBlockInputStream(const Block & block_) : block(block_) {}
String getName() const override { return "OneBlockInputStream"; }
@ -39,7 +39,7 @@ protected:
private:
Block block;
bool has_been_read;
bool has_been_read = false;
};
}

View File

@ -1,14 +1,10 @@
#include <queue>
#include <iomanip>
#include <statdaemons/Stopwatch.h>
#include <DB/DataStreams/MergeSortingBlockInputStream.h>
namespace DB
{
Block MergeSortingBlockInputStream::readImpl()
{
/** Достаточно простой алгоритм:
@ -16,98 +12,83 @@ Block MergeSortingBlockInputStream::readImpl()
* - объединить их всех;
*/
if (has_been_read)
return Block();
/// Ещё не прочитали блоки.
if (!impl)
{
while (Block block = children.back()->read())
blocks.push_back(block);
has_been_read = true;
if (blocks.empty() || isCancelled())
return Block();
Blocks blocks;
while (Block block = children.back()->read())
blocks.push_back(block);
impl.reset(new MergeSortingBlocksBlockInputStream(blocks, description, DEFAULT_BLOCK_SIZE, limit));
}
if (isCancelled())
return Block();
return merge(blocks);
return impl->read();
}
Block MergeSortingBlockInputStream::merge(Blocks & blocks)
MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream(
Blocks & blocks_, SortDescription & description_, size_t max_merged_block_size_, size_t limit_)
: blocks(blocks_), description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
{
Blocks nonempty_blocks;
for (const auto & block : blocks)
{
if (block.rowsInFirstColumn() == 0)
continue;
nonempty_blocks.push_back(block);
cursors.emplace_back(block, description);
has_collation |= cursors.back().has_collation;
}
blocks.swap(nonempty_blocks);
if (!has_collation)
{
for (size_t i = 0; i < cursors.size(); ++i)
queue.push(SortCursor(&cursors[i]));
}
else
{
for (size_t i = 0; i < cursors.size(); ++i)
queue_with_collation.push(SortCursorWithCollation(&cursors[i]));
}
}
Block MergeSortingBlocksBlockInputStream::readImpl()
{
if (blocks.empty())
return Block();
if (blocks.size() == 1)
return blocks[0];
Stopwatch watch;
LOG_DEBUG(log, "Merge sorting");
CursorImpls cursors(blocks.size());
bool has_collation = false;
size_t nonempty_blocks = 0;
for (Blocks::const_iterator it = blocks.begin(); it != blocks.end(); ++it)
{
if (it->rowsInFirstColumn() == 0)
continue;
cursors[nonempty_blocks] = SortCursorImpl(*it, description);
has_collation |= cursors[nonempty_blocks].has_collation;
++nonempty_blocks;
Block res = blocks[0];
blocks.clear();
return res;
}
if (nonempty_blocks == 0)
return Block();
cursors.resize(nonempty_blocks);
Block merged;
if (has_collation)
merged = mergeImpl<SortCursorWithCollation>(blocks, cursors);
else
merged = mergeImpl<SortCursor>(blocks, cursors);
watch.stop();
size_t rows_before_merge = 0;
size_t bytes_before_merge = 0;
for (const auto & block : blocks)
{
rows_before_merge += block.rowsInFirstColumn();
bytes_before_merge += block.bytes();
}
LOG_DEBUG(log, std::fixed << std::setprecision(2)
<< "Merge sorted " << blocks.size() << " blocks, from " << rows_before_merge << " to " << merged.rows() << " rows"
<< " in " << watch.elapsedSeconds() << " sec., "
<< rows_before_merge / watch.elapsedSeconds() << " rows/sec., "
<< bytes_before_merge / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.");
return merged;
return !has_collation
? mergeImpl<SortCursor>(queue)
: mergeImpl<SortCursorWithCollation>(queue_with_collation);
}
template <typename TSortCursor>
Block MergeSortingBlockInputStream::mergeImpl(Blocks & blocks, CursorImpls & cursors)
Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue<TSortCursor> & queue)
{
Block merged = blocks[0].cloneEmpty();
size_t num_columns = blocks[0].columns();
typedef std::priority_queue<TSortCursor> Queue;
Queue queue;
for (size_t i = 0; i < cursors.size(); ++i)
queue.push(TSortCursor(&cursors[i]));
ColumnPlainPtrs merged_columns;
for (size_t i = 0; i < num_columns; ++i) /// TODO: reserve
merged_columns.push_back(&*merged.getByPosition(i).column);
merged_columns.push_back(merged.getByPosition(i).column.get());
/// Вынимаем строки в нужном порядке и кладём в merged.
for (size_t row = 0; (!limit || row < limit) && !queue.empty(); ++row)
size_t merged_rows = 0;
while (!queue.empty())
{
TSortCursor current = queue.top();
queue.pop();
@ -120,9 +101,24 @@ Block MergeSortingBlockInputStream::mergeImpl(Blocks & blocks, CursorImpls & cur
current->next();
queue.push(current);
}
++total_merged_rows;
if (limit && total_merged_rows == limit)
{
blocks.clear();
return merged;
}
++merged_rows;
if (merged_rows == max_merged_block_size)
return merged;
}
if (merged_rows == 0)
merged.clear();
return merged;
}
}

View File

@ -852,7 +852,9 @@ void InterpreterSelectQuery::executeOrder(BlockInputStreams & streams)
++it)
{
String name = (*it)->children.front()->getColumnName();
order_descr.push_back(SortColumnDescription(name, typeid_cast<ASTOrderByElement &>(**it).direction));
const ASTOrderByElement & order_by_elem = typeid_cast<const ASTOrderByElement &>(**it);
order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.collator);
}
/// Если есть LIMIT и нет DISTINCT - можно делать частичную сортировку.

View File

@ -9,7 +9,13 @@ typedef std::vector<std::pair<const IColumn *, SortColumnDescription> > ColumnsW
static inline bool needCollation(const IColumn * column, const SortColumnDescription & description)
{
return !description.collator.isNull() && column->getName() == "ColumnString";
if (description.collator.isNull())
return false;
if (column->getName() != "ColumnString")
throw Exception("Collations could be specified only for String columns.", ErrorCodes::BAD_COLLATION);
return true;
}

View File

@ -0,0 +1,250 @@
Ё
А
Я
а
я
ё
а
А
ё
Ё
я
Я
а
а
А
А
ё
ё
Ё
Ё
я
я
Я
Я
A
A
B
B
C
C
D
D
E
E
F
F
G
G
H
H
I
I
J
J
K
K
L
L
M
M
N
N
O
O
P
P
Q
R
R
S
S
T
T
U
U
V
V
W
X
Y
Y
Z
Z
a
a
b
b
c
c
d
d
e
e
f
f
g
g
h
h
i
i
j
j
k
k
l
l
m
m
n
n
o
o
p
p
q
r
r
s
s
t
t
u
u
v
v
w
x
y
y
z
z
Ç
Ö
Ü
ç
ö
ü
Ğ
ğ
İ
ı
Ş
ş
a
a
A
A
b
b
B
B
c
c
C
C
ç
Ç
d
d
D
D
e
e
E
E
f
f
F
F
g
g
G
G
ğ
Ğ
h
h
H
H
ı
I
I
i
i
İ
j
j
J
J
k
k
K
K
l
l
L
L
m
m
M
M
n
n
N
N
o
o
O
O
ö
Ö
p
p
P
P
q
Q
r
r
R
R
s
s
S
S
ş
Ş
t
t
T
T
u
u
U
U
ü
Ü
v
v
V
V
w
W
x
X
y
y
Y
Y
z
z
Z
Z
а 1
А 4
ё 3
Ё 6
я 2
Я 5

View File

@ -0,0 +1,6 @@
SELECT arrayJoin(['а', 'я', 'ё', 'А', 'Я', 'Ё']) AS x ORDER BY x;
SELECT arrayJoin(['а', 'я', 'ё', 'А', 'Я', 'Ё']) AS x ORDER BY x COLLATE 'ru';
SELECT arrayJoin(['а', 'я', 'ё', 'А', 'Я', 'Ё']) AS x FROM remote('127.0.0.{1,2}', system, one) ORDER BY x COLLATE 'ru';
SELECT arrayJoin(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'ç', 'd', 'e', 'f', 'g', 'ğ', 'h', 'ı', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'ö', 'p', 'r', 's', 'ş', 't', 'u', 'ü', 'v', 'y', 'z', 'A', 'B', 'C', 'Ç', 'D', 'E', 'F', 'G', 'Ğ', 'H', 'I', 'İ', 'J', 'K', 'L', 'M', 'N', 'O', 'Ö', 'P', 'R', 'S', 'Ş', 'T', 'U', 'Ü', 'V', 'Y', 'Z']) AS x ORDER BY x;
SELECT arrayJoin(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'ç', 'd', 'e', 'f', 'g', 'ğ', 'h', 'ı', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'ö', 'p', 'r', 's', 'ş', 't', 'u', 'ü', 'v', 'y', 'z', 'A', 'B', 'C', 'Ç', 'D', 'E', 'F', 'G', 'Ğ', 'H', 'I', 'İ', 'J', 'K', 'L', 'M', 'N', 'O', 'Ö', 'P', 'R', 'S', 'Ş', 'T', 'U', 'Ü', 'V', 'Y', 'Z']) AS x ORDER BY x COLLATE 'tr';
SELECT x, n FROM (SELECT ['а', 'я', 'ё', 'А', 'Я', 'Ё'] AS arr) ARRAY JOIN arr AS x, arrayEnumerate(arr) AS n ORDER BY x COLLATE 'ru', n;