ClickHouse/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp
2018-08-10 07:02:56 +03:00

119 lines
3.4 KiB
C++

#include <DataStreams/ReplacingSortedBlockInputStream.h>
#include <Columns/ColumnsNumber.h>
#include <common/logger_useful.h>
namespace DB
{
/// Error codes referenced by this file. They are only declared here (`extern`);
/// the definitions must be provided by another translation unit.
namespace ErrorCodes
{
/// Passed to the Exception thrown by readImpl() when collations are requested.
extern const int LOGICAL_ERROR;
}
/// Appends the row referenced by `selected_row` to `merged_columns` and increments `merged_rows`.
///
/// When `out_row_sources_buf` is set, the accumulated `current_row_sources` entries are
/// written to it first (with the skip flag cleared on the entry at `max_pos`), and the
/// accumulator is then emptied.
void ReplacingSortedBlockInputStream::insertRow(MutableColumns & merged_columns, size_t & merged_rows)
{
    if (out_row_sources_buf)
    {
        /// A set skip flag means "skip row"; only the entry at `max_pos` gets it cleared.
        current_row_sources[max_pos].setSkipFlag(false);

        /// Dump the accumulated entries as raw bytes.
        const char * raw_entries = reinterpret_cast<const char *>(current_row_sources.data());
        const size_t raw_size = current_row_sources.size() * sizeof(RowSourcePart);
        out_row_sources_buf->write(raw_entries, raw_size);

        current_row_sources.resize(0);
    }

    ++merged_rows;

    /// Copy the selected row, column by column, into the output.
    for (size_t column_idx = 0; column_idx < num_columns; ++column_idx)
    {
        merged_columns[column_idx]->insertFrom(*(*selected_row.columns)[column_idx], selected_row.row_num);
    }
}
/// Produces the next merged block.
///
/// Returns an empty Block when `finished` is already set or when init() leaves no
/// columns to fill. Throws an Exception with LOGICAL_ERROR if `has_collation` is set.
Block ReplacingSortedBlockInputStream::readImpl()
{
    if (finished)
    {
        return Block();
    }

    MutableColumns result_columns;
    init(result_columns);

    /// Checked only after init() has run.
    if (has_collation)
    {
        throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
    }

    /// init() produced no columns, so there is nothing to merge into.
    if (result_columns.empty())
    {
        return Block();
    }

    merge(result_columns, queue_without_collation);

    return header.cloneWithColumns(std::move(result_columns));
}
/// Consumes cursors from `queue`, emitting one row into `merged_columns` per run of
/// rows that share a primary key.
///
/// Within a run, the kept row is the latest one whose version compares greater than or
/// equal to the previously kept one (or simply the latest row when
/// `version_column_number` is -1). The function returns early, without setting
/// `finished` and without writing the pending row, once `merged_rows` has reached
/// `max_block_size` and the next row starts a new key. When the queue is exhausted,
/// the pending row (if any) is written and `finished` is set.
void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
{
    size_t merged_rows = 0;

    /// Pull rows in queue order and fill `merged_columns` until about `max_block_size` rows are produced.
    while (!queue.empty())
    {
        SortCursor cursor = queue.top();

        /// No key is being tracked yet: start with the key of this row.
        if (current_key.empty())
            setPrimaryKeyRef(current_key, cursor);

        setPrimaryKeyRef(next_key, cursor);
        const bool starts_new_key = (next_key != current_key);

        if (starts_new_key)
        {
            /// Enough rows were produced and the previous key is fully processed:
            /// stop before taking this cursor off the queue.
            if (merged_rows >= max_block_size)
                return;
        }

        queue.pop();

        if (starts_new_key)
        {
            /// Flush the row chosen for the previous primary key and start tracking the new one.
            insertRow(merged_columns, merged_rows);
            selected_row.reset();
            current_key.swap(next_key);
        }

        /// Every row is recorded as skipped at first; insertRow() clears the flag for the one kept.
        /// The index is taken before the entry is appended, so it refers to this row's entry.
        size_t source_index = current_row_sources.size();
        if (out_row_sources_buf)
            current_row_sources.emplace_back(cursor.impl->order, true);

        /// Decide whether this row replaces the currently selected one.
        bool take_this_row = (version_column_number == -1) || selected_row.empty();
        if (!take_this_row)
        {
            /// Non-strict comparison: among equal version values the last row wins.
            take_this_row = cursor->all_columns[version_column_number]->compareAt(
                                cursor->pos, selected_row.row_num,
                                *(*selected_row.columns)[version_column_number],
                                /* nan_direction_hint = */ 1) >= 0;
        }

        if (take_this_row)
        {
            max_pos = source_index;
            setRowRef(selected_row, cursor);
        }

        if (cursor->isLast())
        {
            /// This source's current block is exhausted; fetch its next block, if there is one.
            fetchNextBlock(cursor, queue);
        }
        else
        {
            /// Advance within the current block and put the cursor back into the queue.
            cursor->next();
            queue.push(cursor);
        }
    }

    /// The queue is drained: write the row selected for the last primary key, if any.
    if (!selected_row.empty())
        insertRow(merged_columns, merged_rows);

    finished = true;
}
}