dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2011-09-04 01:42:14 +00:00
parent 971c98f03a
commit ad3ecadcf9
6 changed files with 189 additions and 15 deletions

View File

@ -0,0 +1,23 @@
#pragma once
#include <vector>
namespace DB
{
/// Sorting rule for a single column.
struct SortColumnDescription
{
    /// Number (position) of the column to sort by.
    size_t column_number;

    /// Sort order: 1 - ascending, -1 - descending.
    int direction;

    SortColumnDescription(size_t column_index, int sort_direction)
        : column_number(column_index)
        , direction(sort_direction)
    {
    }
};
/// Sorting rule for several columns: a sequence of per-column rules.
typedef std::vector<SortColumnDescription> SortDescription;
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <DB/Core/SortDescription.h>
#include <DB/DataStreams/IBlockInputStream.h>
namespace DB
{
/** Combines a stream of individually sorted blocks into a single, fully sorted stream.
  */
class MergeSortingBlockInputStream : public IBlockInputStream
{
public:
    /** input_       - source stream of blocks.
      * description_ - sorting rule; it is copied into this object, so the caller's
      *                instance is never modified and may be const or a temporary.
      */
    MergeSortingBlockInputStream(BlockInputStreamPtr input_, const SortDescription & description_)
        : input(input_), description(description_) {}

    /// Reads blocks from the source stream and returns their merged result.
    Block read();

private:
    /// Source of blocks.
    BlockInputStreamPtr input;

    /// Own copy of the sorting rule.
    SortDescription description;

    /// Merges two blocks according to `description`; the result is written to `left`.
    void merge(Block & left, Block & right);
};
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <DB/Core/ColumnNumbers.h>
#include <DB/Core/SortDescription.h>
#include <DB/DataStreams/IBlockInputStream.h>
@ -9,19 +9,19 @@ namespace DB
{
/** Sorts each block separately by the values of the specified columns.
* At the moment, a highly suboptimal algorithm is used.
* At the moment, a not very optimal algorithm is used.
*/
class PartialSortingBlockInputStream : public IBlockInputStream
{
public:
/// NOTE(review): this span looks like a diff hunk showing both the old
/// (ColumnNumbers-based) and the new (SortDescription-based) lines side by side;
/// presumably only the SortDescription variants remain in the real header - confirm.
/// Stores the source stream and a copy of the list of sort column numbers.
PartialSortingBlockInputStream(BlockInputStreamPtr input_, ColumnNumbers & column_numbers_)
: input(input_), column_numbers(column_numbers_) {}
/// Stores the source stream and a copy of the sorting rule.
/// NOTE(review): the rule is only copied, so the parameter could be a const reference.
PartialSortingBlockInputStream(BlockInputStreamPtr input_, SortDescription & description_)
: input(input_), description(description_) {}
/// Returns a block; the definition is not visible in this header.
Block read();
private:
/// Source of blocks.
BlockInputStreamPtr input;
/// Numbers of the columns to sort by.
ColumnNumbers column_numbers;
/// Own copy of the sorting rule.
SortDescription description;
};
}

View File

@ -0,0 +1,115 @@
#include <DB/DataStreams/MergeSortingBlockInputStream.h>
namespace DB
{
Block MergeSortingBlockInputStream::read()
{
    /** For now, a very simple algorithm:
      * - read all blocks into memory;
      * - merge adjacent blocks in pairs until a single block remains.
      */
    typedef std::list<Block> BlockList;
    BlockList pending;

    while (Block block = input->read())
        pending.push_back(block);

    /// Each pass walks the list and folds every second block into its predecessor.
    while (pending.size() > 1)
    {
        BlockList::iterator current = pending.begin();
        while (current != pending.end())
        {
            BlockList::iterator neighbour = current;
            ++neighbour;
            if (neighbour == pending.end())
                break;  /// Odd block at the tail: left for the next pass.

            merge(*current, *neighbour);

            /// The merged data now lives in *current; drop the neighbour and
            /// continue from the block that followed it.
            current = pending.erase(neighbour);
        }
    }

    /// Nothing was read from the source: return an empty block.
    if (pending.empty())
        return Block();

    return pending.front();
}
/** Merges the rows of `left` and `right` into one block ordered by `description`
  * and stores the result in `left`. Both blocks are addressed by the same column
  * positions. Rows that compare equal are taken from `left` first.
  */
void MergeSortingBlockInputStream::merge(Block & left, Block & right)
{
    typedef std::vector<const IColumn *> SourceColumns;
    typedef std::vector<IColumn *> TargetColumns;

    Block result;

    const size_t rows_in_left = left.rows();
    const size_t rows_in_right = right.rows();
    const size_t column_count = left.columns();
    const size_t key_count = description.size();

    /// All columns of the source blocks and of the result.
    SourceColumns all_left;
    SourceColumns all_right;
    TargetColumns all_result;

    /// Only the columns that take part in the comparison.
    SourceColumns key_left;
    SourceColumns key_right;

    /// The result gets the same names and types as `left`, with empty columns.
    for (size_t c = 0; c < column_count; ++c)
    {
        ColumnWithNameAndType empty_copy;
        empty_copy.name = left.getByPosition(c).name;
        empty_copy.type = left.getByPosition(c).type;
        empty_copy.column = left.getByPosition(c).column->cloneEmpty();
        result.insert(empty_copy);

        all_result.push_back(&*empty_copy.column);
        all_left.push_back(&*left.getByPosition(c).column);
        all_right.push_back(&*right.getByPosition(c).column);
    }

    for (size_t k = 0; k < key_count; ++k)
    {
        const size_t position = description[k].column_number;
        key_left.push_back(&*left.getByPosition(position).column);
        key_right.push_back(&*right.getByPosition(position).column);
    }

    /// Positions of the next row not yet consumed in each source block.
    size_t taken_left = 0;
    size_t taken_right = 0;

    while (taken_left < rows_in_left || taken_right < rows_in_right)
    {
        /// Negative or zero - take the row from the left block, positive - from the right one.
        int order = 0;

        if (taken_right == rows_in_right)
            order = -1;
        else if (taken_left == rows_in_left)
            order = 1;
        else
        {
            /// The first sort column whose values differ decides the order.
            for (size_t k = 0; k < key_count && order == 0; ++k)
                order = description[k].direction
                    * key_left[k]->compareAt(taken_left, taken_right, *key_right[k]);
        }

        const bool from_left = order <= 0;
        const SourceColumns & source = from_left ? all_left : all_right;
        size_t & cursor = from_left ? taken_left : taken_right;

        /// Append the chosen row to the result block.
        for (size_t c = 0; c < column_count; ++c)
            all_result[c]->insert((*source[c])[cursor]);
        ++cursor;
    }

    left = result;
}
}

View File

@ -6,20 +6,20 @@ namespace DB
struct PartialSortingLess
{
typedef std::vector<const IColumn *> Columns;
typedef std::vector<std::pair<const IColumn *, int> > Columns;
Columns columns;
PartialSortingLess(const Block & block, const ColumnNumbers & column_numbers)
PartialSortingLess(const Block & block, const SortDescription & description)
{
for (size_t i = 0, size = column_numbers.size(); i < size; ++i)
columns.push_back(&*block.getByPosition(column_numbers[i]).column);
for (size_t i = 0, size = description.size(); i < size; ++i)
columns.push_back(std::make_pair(&*block.getByPosition(description[i].column_number).column, description[i].direction));
}
bool operator() (size_t a, size_t b) const
{
for (Columns::const_iterator it = columns.begin(); it != columns.end(); ++it)
{
int res = (*it)->compareAt(a, b, **it);
int res = it->second * it->first->compareAt(a, b, *it->first);
if (res < 0)
return true;
else if (res > 0)
@ -41,7 +41,7 @@ Block PartialSortingBlockInputStream::read()
for (size_t i = 0; i < size; ++i)
perm[i] = i;
PartialSortingLess less(res, column_numbers);
PartialSortingLess less(res, description);
std::sort(perm.begin(), perm.end(), less);
size_t columns = res.columns();

View File

@ -18,6 +18,7 @@
#include <DB/DataStreams/LimitBlockInputStream.h>
#include <DB/DataStreams/PartialSortingBlockInputStream.h>
#include <DB/DataStreams/MergeSortingBlockInputStream.h>
#include <DB/DataStreams/ProfilingBlockInputStream.h>
#include <DB/DataStreams/TabSeparatedRowOutputStream.h>
#include <DB/DataStreams/copyData.h>
@ -148,16 +149,21 @@ int main(int argc, char ** argv)
((*names_and_types_map)["IsLink"])
;
DB::ColumnNumbers sort_columns;
sort_columns.push_back(1);
sort_columns.push_back(2);
DB::SortDescription sort_columns;
// sort_columns.push_back(DB::SortColumnDescription(1, -1));
// sort_columns.push_back(DB::SortColumnDescription(2, 1));
sort_columns.push_back(DB::SortColumnDescription(0, 1));
// sort_columns.push_back(DB::SortColumnDescription(3, 1));
Poco::SharedPtr<DB::IBlockInputStream> in = table.read(column_names, 0);
Poco::SharedPtr<DB::IBlockInputStream> in = table.read(column_names, 0, argc == 2 ? atoi(argv[1]) : 1048576);
Poco::SharedPtr<DB::ProfilingBlockInputStream> profiling1 = new DB::ProfilingBlockInputStream(in);
in = profiling1;
in = new DB::PartialSortingBlockInputStream(in, sort_columns);
Poco::SharedPtr<DB::ProfilingBlockInputStream> profiling2 = new DB::ProfilingBlockInputStream(in);
in = profiling2;
in = new DB::MergeSortingBlockInputStream(in, sort_columns);
Poco::SharedPtr<DB::ProfilingBlockInputStream> profiling3 = new DB::ProfilingBlockInputStream(in);
in = profiling3;
//in = new DB::LimitBlockInputStream(in, 10);
DB::WriteBufferFromOStream ob(std::cout);
@ -169,6 +175,8 @@ int main(int argc, char ** argv)
profiling1->getInfo().print(std::cerr);
std::cerr << std::endl << "Sorting: " << std::endl;
profiling2->getInfo().print(std::cerr);
std::cerr << std::endl << "Merging: " << std::endl;
profiling3->getInfo().print(std::cerr);
}
catch (const DB::Exception & e)
{