ClickHouse/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp

142 lines
3.5 KiB
C++
Raw Normal View History

2012-07-23 06:23:29 +00:00
#include <queue>
2012-07-24 18:17:44 +00:00
#include <iomanip>
#include <statdaemons/Stopwatch.h>
2012-07-23 06:23:29 +00:00
2011-09-04 01:42:14 +00:00
#include <DB/DataStreams/MergeSortingBlockInputStream.h>
namespace DB
{
2011-09-04 21:23:19 +00:00
Block MergeSortingBlockInputStream::readImpl()
{
	/** Produces the whole result in a single call, using a simple algorithm:
	  * - buffer every block of the input stream in memory;
	  * - merge all of the buffered blocks into one block (see merge()).
	  * Every later call returns an empty Block. That is also how the input stream
	  *  signals that it has no more data, judging by the read loop below.
	  */
	if (!has_been_read)
	{
		/// Set before reading, so a second call never re-reads the input even if this one throws.
		has_been_read = true;

		Blocks accumulated;
		for (Block block = input->read(); block; block = input->read())
			accumulated.push_back(block);

		return merge(accumulated);
	}

	return Block();
}
2012-07-23 06:23:29 +00:00
namespace
{
typedef std::vector<const IColumn *> ConstColumnPlainPtrs;
typedef std::vector<IColumn *> ColumnPlainPtrs;
2012-07-23 20:01:29 +00:00
/// Курсор, позволяющий сравнивать соответствующие строки в разных блоках.
2012-07-23 06:23:29 +00:00
struct Cursor
{
2012-07-23 20:01:29 +00:00
ConstColumnPlainPtrs * all_columns;
ConstColumnPlainPtrs * sort_columns;
2012-07-23 06:23:29 +00:00
size_t sort_columns_size;
size_t pos;
size_t rows;
2012-07-23 20:01:29 +00:00
Cursor(ConstColumnPlainPtrs * all_columns_, ConstColumnPlainPtrs * sort_columns_, size_t pos_ = 0)
2012-07-23 06:23:29 +00:00
: all_columns(all_columns_), sort_columns(sort_columns_), sort_columns_size(sort_columns->size()),
2012-07-24 18:12:32 +00:00
pos(pos_), rows((*all_columns)[0]->size())
{
}
2012-07-23 06:23:29 +00:00
2012-07-23 20:01:29 +00:00
bool operator< (const Cursor & rhs) const
2012-07-23 06:23:29 +00:00
{
for (size_t i = 0; i < sort_columns_size; ++i)
{
2012-07-23 20:01:29 +00:00
int res = (*sort_columns)[i]->compareAt(pos, rhs.pos, *(*rhs.sort_columns)[i]);
2012-07-23 06:23:29 +00:00
if (res > 0)
return true;
if (res < 0)
return false;
}
return false;
}
2012-07-24 18:12:32 +00:00
bool isLast() const { return pos + 1 >= rows; }
2012-07-23 20:01:29 +00:00
Cursor next() const { return Cursor(all_columns, sort_columns, pos + 1); }
};
2012-07-23 06:23:29 +00:00
}
2012-07-23 20:01:29 +00:00
/** Merges `blocks` into a single block whose rows are ordered by `description`.
  *
  * This is a k-way merge. One Cursor per block is kept in a priority queue. The row with the
  *  smallest sort key is repeatedly taken out and appended to the result, and that block's
  *  cursor is moved forward by one row.
  * Each cursor only ever moves forward, so the result is fully ordered only if every input
  *  block is already sorted by `description`. NOTE(review): presumably this is guaranteed
  *  upstream — confirm with callers.
  *
  * Edge cases:
  * - no blocks: an empty Block is returned;
  * - exactly one block: it is returned as is;
  * - blocks with no columns or no rows contribute nothing and are skipped.
  *
  * The result has the structure of blocks[0] (via cloneEmpty()). Columns of the other blocks
  *  are copied by position, so they are presumably expected to have the same structure.
  */
Block MergeSortingBlockInputStream::merge(Blocks & blocks)
{
	Stopwatch watch;
	LOG_DEBUG(log, "Merge sorting");

	Block merged;

	if (blocks.empty())
		return merged;

	if (blocks.size() == 1)
		return blocks[0];

	merged = blocks[0].cloneEmpty();

	typedef std::priority_queue<Cursor> Queue;
	Queue queue;

	/// Column pointers of every block. These vectors are sized once and never resized,
	///  because the cursors keep pointers to their elements.
	typedef std::vector<ConstColumnPlainPtrs> ConstColumnPlainPtrsForBlocks;
	ConstColumnPlainPtrsForBlocks all_columns(blocks.size());
	ConstColumnPlainPtrsForBlocks sort_columns(blocks.size());

	size_t num_columns = blocks[0].columns();

	size_t block_num = 0;
	for (Blocks::const_iterator it = blocks.begin(); it != blocks.end(); ++it, ++block_num)
	{
		/// A cursor always starts at row 0. A block without rows must not get one,
		///  because reading its row 0 would be out of bounds.
		if (!*it || it->rows() == 0)
			continue;

		for (size_t col_num = 0; col_num < num_columns; ++col_num)
			all_columns[block_num].push_back(&*it->getByPosition(col_num).column);

		for (size_t j = 0, size = description.size(); j < size; ++j)
		{
			/// A sort column is referenced by name or, if the name is empty, by position.
			size_t column_number = !description[j].column_name.empty()
				? it->getPositionByName(description[j].column_name)
				: description[j].column_number;

			sort_columns[block_num].push_back(&*it->getByPosition(column_number).column);
		}

		queue.push(Cursor(&all_columns[block_num], &sort_columns[block_num]));
	}

	ColumnPlainPtrs merged_columns;
	for (size_t col_num = 0; col_num < num_columns; ++col_num)
		merged_columns.push_back(&*merged.getByPosition(col_num).column);

	/// Take rows out in sort order and append them to `merged`.
	while (!queue.empty())
	{
		Cursor current = queue.top();
		queue.pop();

		for (size_t col_num = 0; col_num < num_columns; ++col_num)
			merged_columns[col_num]->insert((*(*current.all_columns)[col_num])[current.pos]);

		if (!current.isLast())
			queue.push(current.next());
	}

	/// Read the timer once, so that every figure in the message refers to the same duration.
	double elapsed_seconds = watch.elapsedSeconds();

	/// 1048576 = 2^20 bytes per MiB, matching the unit in the message.
	LOG_DEBUG(log, std::fixed << std::setprecision(2)
		<< "Merge sorted " << blocks.size() << " blocks, " << merged.rows() << " rows"
		<< " in " << elapsed_seconds << " sec., "
		<< merged.rows() / elapsed_seconds << " rows/sec., "
		<< merged.bytes() / 1048576.0 / elapsed_seconds << " MiB/sec.");

	return merged;
}
2012-07-23 06:23:29 +00:00
2011-09-04 01:42:14 +00:00
}