From cae39622512022ba32b7be74989570331aa624b1 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 25 Jul 2012 19:53:43 +0000 Subject: [PATCH] dbms: development [#CONV-2944]. --- dbms/include/DB/Columns/IColumn.h | 2 + dbms/include/DB/Core/SortDescription.h | 39 ++++++ .../MergingSortedBlockInputStream.h | 50 +++++++ .../MergeSortingBlockInputStream.cpp | 45 +----- .../MergingSortedBlockInputStream.cpp | 129 ++++++++++++++++++ 5 files changed, 223 insertions(+), 42 deletions(-) create mode 100644 dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h create mode 100644 dbms/src/DataStreams/MergingSortedBlockInputStream.cpp diff --git a/dbms/include/DB/Columns/IColumn.h b/dbms/include/DB/Columns/IColumn.h index dee2973e533..b10b088ef3a 100644 --- a/dbms/include/DB/Columns/IColumn.h +++ b/dbms/include/DB/Columns/IColumn.h @@ -15,6 +15,8 @@ using Poco::SharedPtr; class IColumn; typedef SharedPtr ColumnPtr; typedef std::vector Columns; +typedef std::vector ColumnPlainPtrs; +typedef std::vector ConstColumnPlainPtrs; /** Интерфейс для хранения столбцов значений в оперативке. diff --git a/dbms/include/DB/Core/SortDescription.h b/dbms/include/DB/Core/SortDescription.h index 9aaf26b3c7c..d2c9163a522 100644 --- a/dbms/include/DB/Core/SortDescription.h +++ b/dbms/include/DB/Core/SortDescription.h @@ -3,6 +3,7 @@ #include #include +#include namespace DB @@ -25,5 +26,43 @@ struct SortColumnDescription /// Описание правила сортировки по нескольким столбцам. typedef std::vector SortDescription; + +/** Курсор, позволяющий сравнивать соответствующие строки в разных блоках. + * Для использования в priority queue. + */ +struct SortCursor +{ + ConstColumnPlainPtrs * all_columns; + ConstColumnPlainPtrs * sort_columns; + size_t sort_columns_size; + size_t pos; + size_t rows; + const SortDescription * desc; + + SortCursor(ConstColumnPlainPtrs * all_columns_, ConstColumnPlainPtrs * sort_columns_, const SortDescription * desc_, size_t pos_ = 0) + : all_columns(all_columns_), sort_columns(sort_columns_), sort_columns_size(sort_columns->size()), + pos(pos_), rows((*all_columns)[0]->size()), desc(desc_) + { + } + + /** Инвертировано, чтобы из priority queue элементы вынимались в нужном порядке. + */ + bool operator< (const SortCursor & rhs) const + { + for (size_t i = 0; i < sort_columns_size; ++i) + { + int res = (*desc)[i].direction * (*sort_columns)[i]->compareAt(pos, rhs.pos, *(*rhs.sort_columns)[i]); + if (res > 0) + return true; + if (res < 0) + return false; + } + return false; + } + + bool isLast() const { return pos + 1 >= rows; } + SortCursor next() const { return SortCursor(all_columns, sort_columns, desc, pos + 1); } +}; + } diff --git a/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h b/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h new file mode 100644 index 00000000000..5179c96a351 --- /dev/null +++ b/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h @@ -0,0 +1,50 @@ +#pragma once + +#include + +#include + +#include + + +namespace DB +{ + +/** Соединяет несколько сортированных потоков в один. + */ +class MergingSortedBlockInputStream : public IProfilingBlockInputStream +{ +public: + MergingSortedBlockInputStream(BlockInputStreams inputs_, SortDescription & description_, size_t max_block_size_) + : inputs(inputs_), description(description_), max_block_size(max_block_size_), first(true), + num_columns(0), source_blocks(inputs.size()) + { + children.insert(children.end(), inputs.begin(), inputs.end()); + } + + Block readImpl(); + + String getName() const { return "MergingSortedBlockInputStream"; } + + BlockInputStreamPtr clone() { return new MergingSortedBlockInputStream(inputs, description, max_block_size); } + +private: + BlockInputStreams inputs; + SortDescription description; + size_t max_block_size; + + bool first; + + /// Текущие сливаемые блоки. + size_t num_columns; + Blocks source_blocks; + + typedef std::vector ConstColumnPlainPtrsForBlocks; + ConstColumnPlainPtrsForBlocks all_columns; + ConstColumnPlainPtrsForBlocks sort_columns; + + typedef std::priority_queue Queue; + Queue queue; +}; + +} diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 48e06f9eb00..02943a321f8 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -29,45 +29,6 @@ Block MergeSortingBlockInputStream::readImpl() } -namespace -{ - typedef std::vector ConstColumnPlainPtrs; - typedef std::vector ColumnPlainPtrs; - - /// Курсор, позволяющий сравнивать соответствующие строки в разных блоках. - struct Cursor - { - ConstColumnPlainPtrs * all_columns; - ConstColumnPlainPtrs * sort_columns; - size_t sort_columns_size; - size_t pos; - size_t rows; - - Cursor(ConstColumnPlainPtrs * all_columns_, ConstColumnPlainPtrs * sort_columns_, size_t pos_ = 0) - : all_columns(all_columns_), sort_columns(sort_columns_), sort_columns_size(sort_columns->size()), - pos(pos_), rows((*all_columns)[0]->size()) - { - } - - bool operator< (const Cursor & rhs) const - { - for (size_t i = 0; i < sort_columns_size; ++i) - { - int res = (*sort_columns)[i]->compareAt(pos, rhs.pos, *(*rhs.sort_columns)[i]); - if (res > 0) - return true; - if (res < 0) - return false; - } - return false; - } - - bool isLast() const { return pos + 1 >= rows; } - Cursor next() const { return Cursor(all_columns, sort_columns, pos + 1); } - }; -} - - Block MergeSortingBlockInputStream::merge(Blocks & blocks) { Stopwatch watch; @@ -83,7 +44,7 @@ Block MergeSortingBlockInputStream::merge(Blocks & blocks) merged = blocks[0].cloneEmpty(); - typedef std::priority_queue Queue; + typedef std::priority_queue Queue; Queue queue; typedef std::vector ConstColumnPlainPtrsForBlocks; @@ -109,7 +70,7 @@ Block MergeSortingBlockInputStream::merge(Blocks & blocks) sort_columns[i].push_back(&*it->getByPosition(column_number).column); } - queue.push(Cursor(&all_columns[i], &sort_columns[i])); + queue.push(SortCursor(&all_columns[i], &sort_columns[i], &description)); } ColumnPlainPtrs merged_columns; @@ -119,7 +80,7 @@ Block MergeSortingBlockInputStream::merge(Blocks & blocks) /// Вынимаем строки в нужном порядке и кладём в merged. while (!queue.empty()) { - Cursor current = queue.top(); + SortCursor current = queue.top(); queue.pop(); for (size_t i = 0; i < num_columns; ++i) diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp new file mode 100644 index 00000000000..f6880b3edc4 --- /dev/null +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -0,0 +1,129 @@ +#include + +#include + + +namespace DB +{ + +Block MergingSortedBlockInputStream::readImpl() +{ + if (!inputs.size()) + return Block(); + + if (inputs.size() == 1) + return inputs[0]->read(); + + /// Читаем первые блоки, инициализируем очередь. + if (first) + { + first = false; + size_t i = 0; + for (Blocks::iterator it = source_blocks.begin(); it != source_blocks.end(); ++it, ++i) + { + if (*it) + continue; + + *it = inputs[i]->read(); + + if (!*it) + continue; + + if (!num_columns) + num_columns = source_blocks[0].columns(); + + for (size_t j = 0; j < num_columns; ++j) + all_columns[i].push_back(&*it->getByPosition(j).column); + + for (size_t j = 0, size = description.size(); j < size; ++j) + { + size_t column_number = !description[j].column_name.empty() + ? it->getPositionByName(description[j].column_name) + : description[j].column_number; + + sort_columns[i].push_back(&*it->getByPosition(column_number).column); + } + + queue.push(SortCursor(&all_columns[i], &sort_columns[i], &description)); + } + } + + /// Инициализируем результат. + size_t merged_rows = 0; + Block merged_block; + ColumnPlainPtrs merged_columns; + + /// Клонируем структуру первого непустого блока источников. + Blocks::const_iterator it = source_blocks.begin(); + for (; it != source_blocks.end(); ++it) + if (*it) + merged_block = it->cloneEmpty(); + + /// Если все входные блоки пустые. + if (it == source_blocks.end()) + return Block(); + + for (size_t i = 0; i < num_columns; ++i) + merged_columns.push_back(&*merged_block.getByPosition(i).column); + + /// Вынимаем строки в нужном порядке и кладём в merged_block, пока строк не больше max_block_size + while (!queue.empty()) + { + SortCursor current = queue.top(); + queue.pop(); + + for (size_t i = 0; i < num_columns; ++i) + merged_columns[i]->insert((*(*current.all_columns)[i])[current.pos]); + + if (!current.isLast()) + queue.push(current.next()); + else + { + /// Достаём из соответствующего источника следующий блок, если есть. + /// Источник, соответствующий этому курсору, ищем с помощью небольшого хака (сравнивая адреса all_columns). + + size_t i = 0; + size_t size = all_columns.size(); + for (; i < size; ++i) + { + if (&all_columns[i] == current.all_columns) + { + source_blocks[i] = inputs[i]->read(); + if (source_blocks[i]) + { + all_columns[i].clear(); + sort_columns[i].clear(); + + for (size_t j = 0; j < num_columns; ++j) + all_columns[i].push_back(&*source_blocks[i].getByPosition(j).column); + + for (size_t j = 0, size = description.size(); j < size; ++j) + { + size_t column_number = !description[j].column_name.empty() + ? source_blocks[i].getPositionByName(description[j].column_name) + : description[j].column_number; + + sort_columns[i].push_back(&*source_blocks[i].getByPosition(column_number).column); + } + + queue.push(SortCursor(&all_columns[i], &sort_columns[i], &description)); + } + + break; + } + } + + if (i == size) + throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR); + } + + ++merged_rows; + if (merged_rows == max_block_size) + return merged_block; + } + + inputs.clear(); + return merged_block; +} + +}