2012-07-23 06:23:29 +00:00
|
|
|
|
#include <queue>
|
2012-07-24 18:17:44 +00:00
|
|
|
|
#include <iomanip>
|
|
|
|
|
|
|
|
|
|
#include <statdaemons/Stopwatch.h>
|
2012-07-23 06:23:29 +00:00
|
|
|
|
|
2011-09-04 01:42:14 +00:00
|
|
|
|
#include <DB/DataStreams/MergeSortingBlockInputStream.h>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
2011-09-04 21:23:19 +00:00
|
|
|
|
/** A fairly simple algorithm:
  * - read all blocks from the input into memory;
  * - merge them all together using a priority_queue (done in merge(Blocks &)).
  *
  * The first call returns the whole merged result as a single block;
  * every subsequent call returns an empty block.
  */
Block MergeSortingBlockInputStream::readImpl()
{
	if (has_been_read)
		return Block();

	has_been_read = true;

	/// Pull the input until it yields an empty block (which converts to false).
	Blocks blocks;
	while (Block block = input->read())
		blocks.push_back(block);

	return merge(blocks);
}
|
|
|
|
|
|
|
|
|
|
|
2012-07-23 06:23:29 +00:00
|
|
|
|
namespace
|
|
|
|
|
{
|
|
|
|
|
typedef std::vector<const IColumn *> ConstColumnPlainPtrs;
|
|
|
|
|
typedef std::vector<IColumn *> ColumnPlainPtrs;
|
|
|
|
|
|
2012-07-23 20:01:29 +00:00
|
|
|
|
/// Курсор, позволяющий сравнивать соответствующие строки в разных блоках.
|
2012-07-23 06:23:29 +00:00
|
|
|
|
struct Cursor
|
|
|
|
|
{
|
2012-07-23 20:01:29 +00:00
|
|
|
|
ConstColumnPlainPtrs * all_columns;
|
|
|
|
|
ConstColumnPlainPtrs * sort_columns;
|
2012-07-23 06:23:29 +00:00
|
|
|
|
size_t sort_columns_size;
|
|
|
|
|
size_t pos;
|
|
|
|
|
size_t rows;
|
|
|
|
|
|
2012-07-23 20:01:29 +00:00
|
|
|
|
Cursor(ConstColumnPlainPtrs * all_columns_, ConstColumnPlainPtrs * sort_columns_, size_t pos_ = 0)
|
2012-07-23 06:23:29 +00:00
|
|
|
|
: all_columns(all_columns_), sort_columns(sort_columns_), sort_columns_size(sort_columns->size()),
|
2012-07-24 18:12:32 +00:00
|
|
|
|
pos(pos_), rows((*all_columns)[0]->size())
|
|
|
|
|
{
|
|
|
|
|
}
|
2012-07-23 06:23:29 +00:00
|
|
|
|
|
2012-07-23 20:01:29 +00:00
|
|
|
|
bool operator< (const Cursor & rhs) const
|
2012-07-23 06:23:29 +00:00
|
|
|
|
{
|
|
|
|
|
for (size_t i = 0; i < sort_columns_size; ++i)
|
|
|
|
|
{
|
2012-07-23 20:01:29 +00:00
|
|
|
|
int res = (*sort_columns)[i]->compareAt(pos, rhs.pos, *(*rhs.sort_columns)[i]);
|
2012-07-23 06:23:29 +00:00
|
|
|
|
if (res > 0)
|
|
|
|
|
return true;
|
|
|
|
|
if (res < 0)
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
2012-07-24 18:12:32 +00:00
|
|
|
|
bool isLast() const { return pos + 1 >= rows; }
|
2012-07-23 20:01:29 +00:00
|
|
|
|
Cursor next() const { return Cursor(all_columns, sort_columns, pos + 1); }
|
|
|
|
|
};
|
2012-07-23 06:23:29 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2012-07-23 20:01:29 +00:00
|
|
|
|
Block MergeSortingBlockInputStream::merge(Blocks & blocks)
|
2011-09-04 01:42:14 +00:00
|
|
|
|
{
|
2012-07-24 18:29:45 +00:00
|
|
|
|
Stopwatch watch;
|
|
|
|
|
LOG_DEBUG(log, "Merge sorting");
|
|
|
|
|
|
2011-09-04 01:42:14 +00:00
|
|
|
|
Block merged;
|
2012-07-23 06:23:29 +00:00
|
|
|
|
|
|
|
|
|
if (!blocks.size())
|
|
|
|
|
return merged;
|
|
|
|
|
|
2012-07-23 20:01:29 +00:00
|
|
|
|
if (blocks.size() == 1)
|
|
|
|
|
return blocks[0];
|
|
|
|
|
|
2012-07-23 06:23:29 +00:00
|
|
|
|
merged = blocks[0].cloneEmpty();
|
|
|
|
|
|
|
|
|
|
typedef std::priority_queue<Cursor> Queue;
|
|
|
|
|
Queue queue;
|
|
|
|
|
|
2012-07-24 17:46:55 +00:00
|
|
|
|
typedef std::vector<ConstColumnPlainPtrs> ConstColumnPlainPtrsForBlocks;
|
|
|
|
|
ConstColumnPlainPtrsForBlocks all_columns(blocks.size());
|
|
|
|
|
ConstColumnPlainPtrsForBlocks sort_columns(blocks.size());
|
|
|
|
|
|
|
|
|
|
size_t i = 0;
|
2012-07-23 06:23:29 +00:00
|
|
|
|
size_t num_columns = blocks[0].columns();
|
2012-07-24 17:46:55 +00:00
|
|
|
|
for (Blocks::const_iterator it = blocks.begin(); it != blocks.end(); ++it, ++i)
|
2012-07-23 06:23:29 +00:00
|
|
|
|
{
|
|
|
|
|
if (!*it)
|
|
|
|
|
continue;
|
|
|
|
|
|
2012-07-24 17:46:55 +00:00
|
|
|
|
for (size_t j = 0; j < num_columns; ++j)
|
|
|
|
|
all_columns[i].push_back(&*it->getByPosition(j).column);
|
2012-07-23 20:01:29 +00:00
|
|
|
|
|
2012-07-24 17:46:55 +00:00
|
|
|
|
for (size_t j = 0, size = description.size(); j < size; ++j)
|
2012-07-23 06:23:29 +00:00
|
|
|
|
{
|
2012-07-24 17:46:55 +00:00
|
|
|
|
size_t column_number = !description[j].column_name.empty()
|
|
|
|
|
? it->getPositionByName(description[j].column_name)
|
|
|
|
|
: description[j].column_number;
|
2012-07-23 06:23:29 +00:00
|
|
|
|
|
2012-07-24 17:46:55 +00:00
|
|
|
|
sort_columns[i].push_back(&*it->getByPosition(column_number).column);
|
2012-07-23 06:23:29 +00:00
|
|
|
|
}
|
|
|
|
|
|
2012-07-24 17:46:55 +00:00
|
|
|
|
queue.push(Cursor(&all_columns[i], &sort_columns[i]));
|
2012-07-23 06:23:29 +00:00
|
|
|
|
}
|
|
|
|
|
|
2012-07-23 20:01:29 +00:00
|
|
|
|
ColumnPlainPtrs merged_columns;
|
|
|
|
|
for (size_t i = 0; i < num_columns; ++i)
|
|
|
|
|
merged_columns.push_back(&*merged.getByPosition(i).column);
|
|
|
|
|
|
|
|
|
|
/// Вынимаем строки в нужном порядке и кладём в merged.
|
|
|
|
|
while (!queue.empty())
|
2012-07-23 06:23:29 +00:00
|
|
|
|
{
|
2012-07-23 20:01:29 +00:00
|
|
|
|
Cursor current = queue.top();
|
|
|
|
|
queue.pop();
|
2012-07-23 06:23:29 +00:00
|
|
|
|
|
2012-07-23 20:01:29 +00:00
|
|
|
|
for (size_t i = 0; i < num_columns; ++i)
|
|
|
|
|
merged_columns[i]->insert((*(*current.all_columns)[i])[current.pos]);
|
|
|
|
|
|
|
|
|
|
if (!current.isLast())
|
|
|
|
|
queue.push(current.next());
|
2012-07-23 06:23:29 +00:00
|
|
|
|
}
|
|
|
|
|
|
2012-07-24 18:29:45 +00:00
|
|
|
|
LOG_DEBUG(log, std::fixed << std::setprecision(2)
|
|
|
|
|
<< "Merge sorted " << blocks.size() << " blocks, " << merged.rows() << " rows"
|
|
|
|
|
<< " in " << watch.elapsedSeconds() << " sec., "
|
|
|
|
|
<< merged.rows() / watch.elapsedSeconds() << " rows/sec., "
|
|
|
|
|
<< merged.bytes() / 1000000.0 / watch.elapsedSeconds() << " MiB/sec.");
|
|
|
|
|
|
2012-07-23 06:23:29 +00:00
|
|
|
|
return merged;
|
2012-07-23 20:01:29 +00:00
|
|
|
|
}
|
2012-07-23 06:23:29 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** Two-way merge of `left` and `right`; the result replaces the contents of `left`.
  *
  * Rows are compared key by key in the order of `description`, each comparison being
  * multiplied by the direction of the key. On a tie the row of `left` is taken first.
  * `right` is only read. Key columns specified by name are looked up in `left`,
  * and the same position is then used for `right`.
  */
void MergeSortingBlockInputStream::merge(Block & left, Block & right)
{
	Block result = left.cloneEmpty();

	const size_t rows_in_left = left.rows();
	const size_t rows_in_right = right.rows();

	/// All columns of the blocks.
	ConstColumnPlainPtrs left_all;
	ConstColumnPlainPtrs right_all;
	ColumnPlainPtrs result_all;

	/// Columns the data is sorted by.
	ConstColumnPlainPtrs left_keys;
	ConstColumnPlainPtrs right_keys;

	const size_t columns_count = left.columns();
	for (size_t col = 0; col < columns_count; ++col)
	{
		left_all.push_back(&*left.getByPosition(col).column);
		right_all.push_back(&*right.getByPosition(col).column);
		result_all.push_back(&*result.getByPosition(col).column);
	}

	const size_t keys_count = description.size();
	for (size_t key = 0; key < keys_count; ++key)
	{
		size_t key_position;
		if (!description[key].column_name.empty())
			key_position = left.getPositionByName(description[key].column_name);
		else
			key_position = description[key].column_number;

		left_keys.push_back(&*left.getByPosition(key_position).column);
		right_keys.push_back(&*right.getByPosition(key_position).column);
	}

	/// Merge.
	size_t left_row = 0;
	size_t right_row = 0;
	while (left_row < rows_in_left || right_row < rows_in_right)
	{
		/// Which block does the next row come from - the left one or the right one?
		bool take_left;
		if (right_row == rows_in_right)
			take_left = true;
		else if (left_row == rows_in_left)
			take_left = false;
		else
		{
			/// The first key that differs decides; if none differs, the order stays 0.
			int order = 0;
			for (size_t key = 0; key < keys_count && order == 0; ++key)
				order = description[key].direction * left_keys[key]->compareAt(left_row, right_row, *right_keys[key]);

			take_left = order <= 0;
		}

		/// Append the chosen row to the merged block and step over it.
		const ConstColumnPlainPtrs & source = take_left ? left_all : right_all;
		size_t & source_row = take_left ? left_row : right_row;

		for (size_t col = 0; col < columns_count; ++col)
			result_all[col]->insert((*source[col])[source_row]);

		++source_row;
	}

	left = result;
}
|
|
|
|
|
|
|
|
|
|
}
|