mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 23:52:03 +00:00
dbms: development [#CONV-2944].
This commit is contained in:
parent
8b372a6227
commit
cae3962251
@ -15,6 +15,8 @@ using Poco::SharedPtr;
|
||||
class IColumn;
|
||||
typedef SharedPtr<IColumn> ColumnPtr;
|
||||
typedef std::vector<ColumnPtr> Columns;
|
||||
typedef std::vector<IColumn *> ColumnPlainPtrs;
|
||||
typedef std::vector<const IColumn *> ConstColumnPlainPtrs;
|
||||
|
||||
|
||||
/** Интерфейс для хранения столбцов значений в оперативке.
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <vector>
|
||||
|
||||
#include <DB/Core/Types.h>
|
||||
#include <DB/Columns/IColumn.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -25,5 +26,43 @@ struct SortColumnDescription
|
||||
/// Описание правила сортировки по нескольким столбцам.
|
||||
typedef std::vector<SortColumnDescription> SortDescription;
|
||||
|
||||
|
||||
/** Курсор, позволяющий сравнивать соответствующие строки в разных блоках.
|
||||
* Для использования в priority queue.
|
||||
*/
|
||||
struct SortCursor
|
||||
{
|
||||
ConstColumnPlainPtrs * all_columns;
|
||||
ConstColumnPlainPtrs * sort_columns;
|
||||
size_t sort_columns_size;
|
||||
size_t pos;
|
||||
size_t rows;
|
||||
const SortDescription * desc;
|
||||
|
||||
SortCursor(ConstColumnPlainPtrs * all_columns_, ConstColumnPlainPtrs * sort_columns_, const SortDescription * desc_, size_t pos_ = 0)
|
||||
: all_columns(all_columns_), sort_columns(sort_columns_), sort_columns_size(sort_columns->size()),
|
||||
pos(pos_), rows((*all_columns)[0]->size()), desc(desc_)
|
||||
{
|
||||
}
|
||||
|
||||
/** Инвертировано, чтобы из priority queue элементы вынимались в нужном порядке.
|
||||
*/
|
||||
bool operator< (const SortCursor & rhs) const
|
||||
{
|
||||
for (size_t i = 0; i < sort_columns_size; ++i)
|
||||
{
|
||||
int res = (*desc)[i].direction * (*sort_columns)[i]->compareAt(pos, rhs.pos, *(*rhs.sort_columns)[i]);
|
||||
if (res > 0)
|
||||
return true;
|
||||
if (res < 0)
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool isLast() const { return pos + 1 >= rows; }
|
||||
SortCursor next() const { return SortCursor(all_columns, sort_columns, desc, pos + 1); }
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
|
50
dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h
Normal file
50
dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h
Normal file
@ -0,0 +1,50 @@
|
||||
#pragma once
|
||||
|
||||
#include <queue>
|
||||
|
||||
#include <DB/Core/SortDescription.h>
|
||||
|
||||
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Соединяет несколько сортированных потоков в один.
|
||||
*/
|
||||
class MergingSortedBlockInputStream : public IProfilingBlockInputStream
|
||||
{
|
||||
public:
|
||||
MergingSortedBlockInputStream(BlockInputStreams inputs_, SortDescription & description_, size_t max_block_size_)
|
||||
: inputs(inputs_), description(description_), max_block_size(max_block_size_), first(true),
|
||||
num_columns(0), source_blocks(inputs.size())
|
||||
{
|
||||
children.insert(children.end(), inputs.begin(), inputs.end());
|
||||
}
|
||||
|
||||
Block readImpl();
|
||||
|
||||
String getName() const { return "MergingSortedBlockInputStream"; }
|
||||
|
||||
BlockInputStreamPtr clone() { return new MergingSortedBlockInputStream(inputs, description, max_block_size); }
|
||||
|
||||
private:
|
||||
BlockInputStreams inputs;
|
||||
SortDescription description;
|
||||
size_t max_block_size;
|
||||
|
||||
bool first;
|
||||
|
||||
/// Текущие сливаемые блоки.
|
||||
size_t num_columns;
|
||||
Blocks source_blocks;
|
||||
|
||||
typedef std::vector<ConstColumnPlainPtrs> ConstColumnPlainPtrsForBlocks;
|
||||
ConstColumnPlainPtrsForBlocks all_columns;
|
||||
ConstColumnPlainPtrsForBlocks sort_columns;
|
||||
|
||||
typedef std::priority_queue<SortCursor> Queue;
|
||||
Queue queue;
|
||||
};
|
||||
|
||||
}
|
@ -29,45 +29,6 @@ Block MergeSortingBlockInputStream::readImpl()
|
||||
}
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
typedef std::vector<const IColumn *> ConstColumnPlainPtrs;
|
||||
typedef std::vector<IColumn *> ColumnPlainPtrs;
|
||||
|
||||
/// Курсор, позволяющий сравнивать соответствующие строки в разных блоках.
|
||||
struct Cursor
|
||||
{
|
||||
ConstColumnPlainPtrs * all_columns;
|
||||
ConstColumnPlainPtrs * sort_columns;
|
||||
size_t sort_columns_size;
|
||||
size_t pos;
|
||||
size_t rows;
|
||||
|
||||
Cursor(ConstColumnPlainPtrs * all_columns_, ConstColumnPlainPtrs * sort_columns_, size_t pos_ = 0)
|
||||
: all_columns(all_columns_), sort_columns(sort_columns_), sort_columns_size(sort_columns->size()),
|
||||
pos(pos_), rows((*all_columns)[0]->size())
|
||||
{
|
||||
}
|
||||
|
||||
bool operator< (const Cursor & rhs) const
|
||||
{
|
||||
for (size_t i = 0; i < sort_columns_size; ++i)
|
||||
{
|
||||
int res = (*sort_columns)[i]->compareAt(pos, rhs.pos, *(*rhs.sort_columns)[i]);
|
||||
if (res > 0)
|
||||
return true;
|
||||
if (res < 0)
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool isLast() const { return pos + 1 >= rows; }
|
||||
Cursor next() const { return Cursor(all_columns, sort_columns, pos + 1); }
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
Block MergeSortingBlockInputStream::merge(Blocks & blocks)
|
||||
{
|
||||
Stopwatch watch;
|
||||
@ -83,7 +44,7 @@ Block MergeSortingBlockInputStream::merge(Blocks & blocks)
|
||||
|
||||
merged = blocks[0].cloneEmpty();
|
||||
|
||||
typedef std::priority_queue<Cursor> Queue;
|
||||
typedef std::priority_queue<SortCursor> Queue;
|
||||
Queue queue;
|
||||
|
||||
typedef std::vector<ConstColumnPlainPtrs> ConstColumnPlainPtrsForBlocks;
|
||||
@ -109,7 +70,7 @@ Block MergeSortingBlockInputStream::merge(Blocks & blocks)
|
||||
sort_columns[i].push_back(&*it->getByPosition(column_number).column);
|
||||
}
|
||||
|
||||
queue.push(Cursor(&all_columns[i], &sort_columns[i]));
|
||||
queue.push(SortCursor(&all_columns[i], &sort_columns[i], &description));
|
||||
}
|
||||
|
||||
ColumnPlainPtrs merged_columns;
|
||||
@ -119,7 +80,7 @@ Block MergeSortingBlockInputStream::merge(Blocks & blocks)
|
||||
/// Вынимаем строки в нужном порядке и кладём в merged.
|
||||
while (!queue.empty())
|
||||
{
|
||||
Cursor current = queue.top();
|
||||
SortCursor current = queue.top();
|
||||
queue.pop();
|
||||
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
|
129
dbms/src/DataStreams/MergingSortedBlockInputStream.cpp
Normal file
129
dbms/src/DataStreams/MergingSortedBlockInputStream.cpp
Normal file
@ -0,0 +1,129 @@
|
||||
#include <queue>
|
||||
|
||||
#include <DB/DataStreams/MergingSortedBlockInputStream.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
Block MergingSortedBlockInputStream::readImpl()
|
||||
{
|
||||
if (!inputs.size())
|
||||
return Block();
|
||||
|
||||
if (inputs.size() == 1)
|
||||
return inputs[0]->read();
|
||||
|
||||
/// Читаем первые блоки, инициализируем очередь.
|
||||
if (first)
|
||||
{
|
||||
first = false;
|
||||
size_t i = 0;
|
||||
for (Blocks::iterator it = source_blocks.begin(); it != source_blocks.end(); ++it, ++i)
|
||||
{
|
||||
if (*it)
|
||||
continue;
|
||||
|
||||
*it = inputs[i]->read();
|
||||
|
||||
if (!*it)
|
||||
continue;
|
||||
|
||||
if (!num_columns)
|
||||
num_columns = source_blocks[0].columns();
|
||||
|
||||
for (size_t j = 0; j < num_columns; ++j)
|
||||
all_columns[i].push_back(&*it->getByPosition(j).column);
|
||||
|
||||
for (size_t j = 0, size = description.size(); j < size; ++j)
|
||||
{
|
||||
size_t column_number = !description[j].column_name.empty()
|
||||
? it->getPositionByName(description[j].column_name)
|
||||
: description[j].column_number;
|
||||
|
||||
sort_columns[i].push_back(&*it->getByPosition(column_number).column);
|
||||
}
|
||||
|
||||
queue.push(SortCursor(&all_columns[i], &sort_columns[i], &description));
|
||||
}
|
||||
}
|
||||
|
||||
/// Инициализируем результат.
|
||||
size_t merged_rows = 0;
|
||||
Block merged_block;
|
||||
ColumnPlainPtrs merged_columns;
|
||||
|
||||
/// Клонируем структуру первого непустого блока источников.
|
||||
Blocks::const_iterator it = source_blocks.begin();
|
||||
for (; it != source_blocks.end(); ++it)
|
||||
if (*it)
|
||||
merged_block = it->cloneEmpty();
|
||||
|
||||
/// Если все входные блоки пустые.
|
||||
if (it == source_blocks.end())
|
||||
return Block();
|
||||
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
merged_columns.push_back(&*merged_block.getByPosition(i).column);
|
||||
|
||||
/// Вынимаем строки в нужном порядке и кладём в merged_block, пока строк не больше max_block_size
|
||||
while (!queue.empty())
|
||||
{
|
||||
SortCursor current = queue.top();
|
||||
queue.pop();
|
||||
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
merged_columns[i]->insert((*(*current.all_columns)[i])[current.pos]);
|
||||
|
||||
if (!current.isLast())
|
||||
queue.push(current.next());
|
||||
else
|
||||
{
|
||||
/// Достаём из соответствующего источника следующий блок, если есть.
|
||||
/// Источник, соответствующий этому курсору, ищем с помощью небольшого хака (сравнивая адреса all_columns).
|
||||
|
||||
size_t i = 0;
|
||||
size_t size = all_columns.size();
|
||||
for (; i < size; ++i)
|
||||
{
|
||||
if (&all_columns[i] == current.all_columns)
|
||||
{
|
||||
source_blocks[i] = inputs[i]->read();
|
||||
if (source_blocks[i])
|
||||
{
|
||||
all_columns[i].clear();
|
||||
sort_columns[i].clear();
|
||||
|
||||
for (size_t j = 0; j < num_columns; ++j)
|
||||
all_columns[i].push_back(&*source_blocks[i].getByPosition(j).column);
|
||||
|
||||
for (size_t j = 0, size = description.size(); j < size; ++j)
|
||||
{
|
||||
size_t column_number = !description[j].column_name.empty()
|
||||
? source_blocks[i].getPositionByName(description[j].column_name)
|
||||
: description[j].column_number;
|
||||
|
||||
sort_columns[i].push_back(&*source_blocks[i].getByPosition(column_number).column);
|
||||
}
|
||||
|
||||
queue.push(SortCursor(&all_columns[i], &sort_columns[i], &description));
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (i == size)
|
||||
throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
++merged_rows;
|
||||
if (merged_rows == max_block_size)
|
||||
return merged_block;
|
||||
}
|
||||
|
||||
inputs.clear();
|
||||
return merged_block;
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user