dbms: merge sorting: probably better [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-07-27 20:19:15 +00:00
parent 60987d7ecd
commit 75a80c192f
4 changed files with 77 additions and 72 deletions

View File

@ -3,6 +3,7 @@
#include <vector>
#include <DB/Core/Types.h>
#include <DB/Core/Block.h>
#include <DB/Columns/IColumn.h>
@ -28,30 +29,70 @@ typedef std::vector<SortColumnDescription> SortDescription;
/** Курсор, позволяющий сравнивать соответствующие строки в разных блоках.
* Курсор двигается по одному блоку.
* Для использования в priority queue.
*/
struct SortCursor
struct SortCursorImpl
{
ConstColumnPlainPtrs * all_columns;
ConstColumnPlainPtrs * sort_columns;
ConstColumnPlainPtrs all_columns;
ConstColumnPlainPtrs sort_columns;
SortDescription desc;
size_t sort_columns_size;
size_t pos;
size_t rows;
const SortDescription * desc;
SortCursorImpl() {}
SortCursor(ConstColumnPlainPtrs * all_columns_, ConstColumnPlainPtrs * sort_columns_, const SortDescription * desc_, size_t pos_ = 0)
: all_columns(all_columns_), sort_columns(sort_columns_), sort_columns_size(sort_columns->size()),
pos(pos_), rows((*all_columns)[0]->size()), desc(desc_)
SortCursorImpl(const Block & block, const SortDescription & desc_)
: desc(desc_), sort_columns_size(desc.size())
{
reset(block);
}
/** Инвертировано, чтобы из priority queue элементы вынимались в нужном порядке.
*/
/// Установить курсор в начало нового блока.
void reset(const Block & block)
{
all_columns.clear();
sort_columns.clear();
size_t num_columns = block.columns();
for (size_t j = 0; j < num_columns; ++j)
all_columns.push_back(&*block.getByPosition(j).column);
for (size_t j = 0, size = desc.size(); j < size; ++j)
{
size_t column_number = !desc[j].column_name.empty()
? block.getPositionByName(desc[j].column_name)
: desc[j].column_number;
sort_columns.push_back(&*block.getByPosition(column_number).column);
}
pos = 0;
rows = all_columns[0]->size();
}
bool isLast() const { return pos + 1 >= rows; }
void next() { ++pos; }
};
/// Для лёгкости копирования.
struct SortCursor
{
SortCursorImpl * impl;
SortCursor(SortCursorImpl * impl_) : impl(impl_) {}
SortCursorImpl * operator-> () { return impl; }
const SortCursorImpl * operator-> () const { return impl; }
/// Инвертировано, чтобы из priority queue элементы вынимались в нужном порядке.
bool operator< (const SortCursor & rhs) const
{
for (size_t i = 0; i < sort_columns_size; ++i)
for (size_t i = 0; i < impl->sort_columns_size; ++i)
{
int res = (*desc)[i].direction * (*sort_columns)[i]->compareAt(pos, rhs.pos, *(*rhs.sort_columns)[i]);
int res = impl->desc[i].direction * impl->sort_columns[i]->compareAt(impl->pos, rhs.impl->pos, *(rhs.impl->sort_columns[i]));
if (res > 0)
return true;
if (res < 0)
@ -59,9 +100,6 @@ struct SortCursor
}
return false;
}
bool isLast() const { return pos + 1 >= rows; }
SortCursor next() const { return SortCursor(all_columns, sort_columns, desc, pos + 1); }
};
}

View File

@ -17,7 +17,7 @@ class MergingSortedBlockInputStream : public IProfilingBlockInputStream
public:
MergingSortedBlockInputStream(BlockInputStreams inputs_, SortDescription & description_, size_t max_block_size_)
: inputs(inputs_), description(description_), max_block_size(max_block_size_), first(true),
num_columns(0), source_blocks(inputs.size()), all_columns(inputs.size()), sort_columns(inputs.size())
num_columns(0), source_blocks(inputs.size()), cursors(inputs.size())
{
children.insert(children.end(), inputs.begin(), inputs.end());
}
@ -39,9 +39,8 @@ private:
size_t num_columns;
Blocks source_blocks;
typedef std::vector<ConstColumnPlainPtrs> ConstColumnPlainPtrsForBlocks;
ConstColumnPlainPtrsForBlocks all_columns;
ConstColumnPlainPtrsForBlocks sort_columns;
typedef std::vector<SortCursorImpl> CursorImpls;
CursorImpls cursors;
typedef std::priority_queue<SortCursor> Queue;
Queue queue;

View File

@ -57,9 +57,8 @@ Block MergeSortingBlockInputStream::merge(Blocks & blocks)
typedef std::priority_queue<SortCursor> Queue;
Queue queue;
typedef std::vector<ConstColumnPlainPtrs> ConstColumnPlainPtrsForBlocks;
ConstColumnPlainPtrsForBlocks all_columns(blocks.size());
ConstColumnPlainPtrsForBlocks sort_columns(blocks.size());
typedef std::vector<SortCursorImpl> CursorImpls;
CursorImpls cursors(blocks.size());
size_t i = 0;
size_t num_columns = blocks[0].columns();
@ -68,19 +67,8 @@ Block MergeSortingBlockInputStream::merge(Blocks & blocks)
if (!*it)
continue;
for (size_t j = 0; j < num_columns; ++j)
all_columns[i].push_back(&*it->getByPosition(j).column);
for (size_t j = 0, size = description.size(); j < size; ++j)
{
size_t column_number = !description[j].column_name.empty()
? it->getPositionByName(description[j].column_name)
: description[j].column_number;
sort_columns[i].push_back(&*it->getByPosition(column_number).column);
}
queue.push(SortCursor(&all_columns[i], &sort_columns[i], &description));
cursors[i] = SortCursorImpl(*it, description);
queue.push(SortCursor(&cursors[i]));
}
ColumnPlainPtrs merged_columns;
@ -94,10 +82,13 @@ Block MergeSortingBlockInputStream::merge(Blocks & blocks)
queue.pop();
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insert((*(*current.all_columns)[i])[current.pos]);
merged_columns[i]->insert((*current->all_columns[i])[current->pos]);
if (!current.isLast())
queue.push(current.next());
if (!current->isLast())
{
current->next();
queue.push(current);
}
}
LOG_DEBUG(log, std::fixed << std::setprecision(2)

View File

@ -32,19 +32,8 @@ Block MergingSortedBlockInputStream::readImpl()
if (!num_columns)
num_columns = source_blocks[0].columns();
for (size_t j = 0; j < num_columns; ++j)
all_columns[i].push_back(&*it->getByPosition(j).column);
for (size_t j = 0, size = description.size(); j < size; ++j)
{
size_t column_number = !description[j].column_name.empty()
? it->getPositionByName(description[j].column_name)
: description[j].column_number;
sort_columns[i].push_back(&*it->getByPosition(column_number).column);
}
queue.push(SortCursor(&all_columns[i], &sort_columns[i], &description));
cursors[i] = SortCursorImpl(*it, description);
queue.push(SortCursor(&cursors[i]));
}
}
@ -78,40 +67,28 @@ Block MergingSortedBlockInputStream::readImpl()
queue.pop();
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insert((*(*current.all_columns)[i])[current.pos]);
merged_columns[i]->insert((*current->all_columns[i])[current->pos]);
if (!current.isLast())
queue.push(current.next());
if (!current->isLast())
{
current->next();
queue.push(current);
}
else
{
/// Достаём из соответствующего источника следующий блок, если есть.
/// Источник, соответствующий этому курсору, ищем с помощью небольшого хака (сравнивая адреса all_columns).
size_t i = 0;
size_t size = all_columns.size();
size_t size = cursors.size();
for (; i < size; ++i)
{
if (&all_columns[i] == current.all_columns)
if (&cursors[i] == current.impl)
{
source_blocks[i] = inputs[i]->read();
if (source_blocks[i])
{
all_columns[i].clear();
sort_columns[i].clear();
for (size_t j = 0; j < num_columns; ++j)
all_columns[i].push_back(&*source_blocks[i].getByPosition(j).column);
for (size_t j = 0, size = description.size(); j < size; ++j)
{
size_t column_number = !description[j].column_name.empty()
? source_blocks[i].getPositionByName(description[j].column_name)
: description[j].column_number;
sort_columns[i].push_back(&*source_blocks[i].getByPosition(column_number).column);
}
queue.push(SortCursor(&all_columns[i], &sort_columns[i], &description));
cursors[i].reset(*it);
queue.push(SortCursor(&cursors[i]));
}
break;