ClickHouse/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp

142 lines
3.2 KiB
C++
Raw Normal View History

2012-07-25 19:53:43 +00:00
#include <queue>
2012-08-01 19:55:05 +00:00
#include <iomanip>
2012-07-25 19:53:43 +00:00
#include <DB/DataStreams/MergingSortedBlockInputStream.h>
namespace DB
{
2012-08-14 20:33:37 +00:00
void MergingSortedBlockInputStream::init(Block & merged_block, ColumnPlainPtrs & merged_columns)
{
2012-07-25 19:53:43 +00:00
/// Читаем первые блоки, инициализируем очередь.
if (first)
{
first = false;
size_t i = 0;
for (Blocks::iterator it = source_blocks.begin(); it != source_blocks.end(); ++it, ++i)
{
if (*it)
continue;
*it = inputs[i]->read();
if (!*it)
continue;
if (!num_columns)
num_columns = source_blocks[0].columns();
cursors[i] = SortCursorImpl(*it, description);
queue.push(SortCursor(&cursors[i]));
2012-07-25 19:53:43 +00:00
}
}
/// Инициализируем результат.
/// Клонируем структуру первого непустого блока источников.
2012-07-25 20:22:39 +00:00
{
2012-08-01 19:04:58 +00:00
Blocks::const_iterator it = source_blocks.begin();
for (; it != source_blocks.end(); ++it)
2012-07-25 20:22:39 +00:00
{
2012-08-01 19:04:58 +00:00
if (*it)
{
merged_block = it->cloneEmpty();
break;
}
2012-07-25 20:22:39 +00:00
}
2012-07-25 19:53:43 +00:00
2012-08-01 19:04:58 +00:00
/// Если все входные блоки пустые.
if (it == source_blocks.end())
2012-08-14 20:33:37 +00:00
return;
2012-08-01 19:04:58 +00:00
}
2012-08-14 20:33:37 +00:00
2012-07-25 19:53:43 +00:00
for (size_t i = 0; i < num_columns; ++i)
merged_columns.push_back(&*merged_block.getByPosition(i).column);
2012-08-14 20:33:37 +00:00
}
Block MergingSortedBlockInputStream::readImpl()
{
if (!inputs.size())
return Block();
if (inputs.size() == 1)
return inputs[0]->read();
size_t merged_rows = 0;
Block merged_block;
ColumnPlainPtrs merged_columns;
init(merged_block, merged_columns);
if (merged_columns.empty())
return Block();
2012-07-25 19:53:43 +00:00
/// Вынимаем строки в нужном порядке и кладём в merged_block, пока строк не больше max_block_size
while (!queue.empty())
{
SortCursor current = queue.top();
queue.pop();
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insert((*current->all_columns[i])[current->pos]);
2012-07-25 19:53:43 +00:00
if (!current->isLast())
{
current->next();
queue.push(current);
}
2012-07-25 19:53:43 +00:00
else
{
/// Достаём из соответствующего источника следующий блок, если есть.
2012-08-14 20:33:37 +00:00
fetchNextBlock(current);
2012-07-25 19:53:43 +00:00
}
++merged_rows;
if (merged_rows == max_block_size)
return merged_block;
}
2012-08-08 20:12:29 +00:00
inputs.clear();
return merged_block;
}
2012-08-14 20:33:37 +00:00
void MergingSortedBlockInputStream::fetchNextBlock(const SortCursor & current)
{
size_t i = 0;
size_t size = cursors.size();
for (; i < size; ++i)
{
if (&cursors[i] == current.impl)
{
source_blocks[i] = inputs[i]->read();
if (source_blocks[i])
{
cursors[i].reset(source_blocks[i]);
queue.push(SortCursor(&cursors[i]));
}
break;
}
}
if (i == size)
throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
}
2012-08-08 20:12:29 +00:00
void MergingSortedBlockInputStream::readSuffix()
{
2012-08-01 19:55:05 +00:00
const BlockStreamProfileInfo & profile_info = getInfo();
double seconds = profile_info.work_stopwatch.elapsedSeconds();
LOG_DEBUG(log, std::fixed << std::setprecision(2)
<< "Merge sorted " << profile_info.blocks << " blocks, " << profile_info.rows << " rows"
<< " in " << seconds << " sec., "
<< profile_info.rows / seconds << " rows/sec., "
<< profile_info.bytes / 1000000.0 / seconds << " MiB/sec.");
2012-07-25 19:53:43 +00:00
}
}