Added CollapsingSortedTransform.

This commit is contained in:
Nikolai Kochetov 2020-03-27 20:37:35 +03:00
parent 9577ef38d0
commit d5ab2e5331
8 changed files with 429 additions and 22 deletions

View File

@ -0,0 +1,231 @@
#include <Processors/Merges/CollapsingSortedTransform.h>
#include <Columns/ColumnsNumber.h>
#include <Common/FieldVisitors.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
/// Maximum number of messages about incorrect data in the log.
#define MAX_ERROR_MESSAGES 10
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INCORRECT_DATA;
}
/** Creates a collapsing merge over `num_inputs` sorted inputs.
  *
  * @param header                  Block structure shared by all inputs and by the output.
  * @param num_inputs              Number of sorted inputs to merge.
  * @param description_            Sort description (the key columns by which rows are grouped).
  * @param sign_column             Name of the sign column; resolved to its position in `header`.
  * @param max_block_size          Forwarded to IMergingTransform.
  * @param out_row_sources_buf_    Optional buffer that receives one RowSourcePart per input row; may be nullptr.
  * @param use_average_block_sizes Forwarded to IMergingTransform.
  */
CollapsingSortedTransform::CollapsingSortedTransform(
    const Block & header,
    size_t num_inputs,
    SortDescription description_,
    const String & sign_column,
    size_t max_block_size,
    WriteBuffer * out_row_sources_buf_,
    bool use_average_block_sizes)
    : IMergingTransform(num_inputs, header, header, max_block_size, use_average_block_sizes, true)
    , description(std::move(description_))
    , sign_column_number(header.getPositionByName(sign_column))
    , out_row_sources_buf(out_row_sources_buf_)
    /// Both containers are indexed by input number (see updateCursor / setRowRef),
    /// so they must hold one slot per input before the first chunk arrives.
    , source_chunks(num_inputs)
    , cursors(num_inputs)
{
}
void CollapsingSortedTransform::initializeInputs()
{
queue = SortingHeap<SortCursor>(cursors);
is_queue_initialized = true;
}
/// Accepts a new chunk for input `input_number`: refreshes that input's cursor and,
/// once the heap has been built, returns the cursor to the heap.
void CollapsingSortedTransform::consume(Chunk chunk, size_t input_number)
{
    updateCursor(std::move(chunk), input_number);

    /// Until initializeInputs() has run, the heap does not exist yet; it will be built from all cursors at once.
    if (!is_queue_initialized)
        return;

    queue.push(cursors[input_number]);
}
/** Installs `chunk` as the current chunk of input `source_num` and points that input's cursor at it.
  *
  * Constant columns are materialized first. The chunk is then wrapped into a freshly allocated
  * SharedChunk; a previously installed chunk is not reused, because RowRef objects may still own it.
  *
  * @throws Exception(LOGICAL_ERROR) if the sort description requires collation, which is not supported here.
  */
void CollapsingSortedTransform::updateCursor(Chunk chunk, size_t source_num)
{
    auto num_rows = chunk.getNumRows();
    auto columns = chunk.detachColumns();
    for (auto & column : columns)
        column = column->convertToFullColumnIfConst();

    chunk.setColumns(std::move(columns), num_rows);

    auto & source_chunk = source_chunks[source_num];
    auto & cursor = cursors[source_num];

    if (source_chunk)
    {
        /// Not the first chunk of this input: the cursor already exists, just re-point it.
        source_chunk = new detail::SharedChunk(std::move(chunk));
        cursor.reset(source_chunk->getColumns(), {});
    }
    else
    {
        /// First chunk of this input: build the cursor from the sort description.
        source_chunk = new detail::SharedChunk(std::move(chunk));
        cursor = SortCursorImpl(source_chunk->getColumns(), description, source_num);

        /// The collation flag is only meaningful after the cursor has been built from `description`;
        /// checking it on the default-constructed cursor would never fire.
        if (cursor.has_collation)
            throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
    }

    /// Cache raw column pointers in the shared chunk so that RowRef can refer to them.
    source_chunk->all_columns = cursor.all_columns;
    source_chunk->sort_columns = cursor.sort_columns;
}
/// Logs a warning that, for the key of `last_row`, the numbers of positive and negative rows
/// differ by more than one. The message contains both counters and the key values.
void CollapsingSortedTransform::reportIncorrectData()
{
    std::stringstream message;
    message << "Incorrect data: number of rows with sign = 1 (" << count_positive
            << ") differs with number of rows with sign = -1 (" << count_negative
            << ") by more than one (for key: ";

    const auto & key_columns = *last_row.sort_columns;
    const size_t num_key_columns = key_columns.size();

    for (size_t column_idx = 0; column_idx != num_key_columns; ++column_idx)
    {
        if (column_idx > 0)
            message << ", ";

        message << applyVisitor(FieldVisitorToString(), (*key_columns[column_idx])[last_row.row_num]);
    }

    message << ").";

    /** For now we limit ourselves to just logging such situations,
      *  since the data is generated by external programs.
      * With inconsistent data, this is an unavoidable error that cannot be easily corrected by admins.
      * Therefore it is reported as a warning.
      */
    LOG_WARNING(log, message.rdbuf());
}
/// Appends the row referenced by `row` to `merged_data`, passing along the row count of the chunk that owns it.
void CollapsingSortedTransform::insertRow(RowRef & row)
{
    const auto owner_chunk_rows = row.owned_chunk->getNumRows();
    merged_data.insertRow(*row.all_columns, row.row_num, owner_chunk_rows);
}
/** Writes the result for the group of rows accumulated for the current key.
  *
  * Depending on the counters, emits the first negative row, the last positive row, both, or nothing,
  * and un-skips the corresponding entries in `current_row_sources`.
  * If the counters differ by more than one, reports incorrect data (at most MAX_ERROR_MESSAGES times).
  * Finally flushes `current_row_sources` to `out_row_sources_buf`, if that buffer is set.
  */
void CollapsingSortedTransform::insertRows()
{
    /// No input rows have been read.
    const bool group_is_empty = (count_positive == 0 && count_negative == 0);
    if (group_is_empty)
        return;

    const bool counts_match = (count_positive == count_negative);

    /// An equal number of rows ending with a negative one collapses to nothing.
    if (last_is_positive || !counts_match)
    {
        const bool keep_first_negative = (count_positive <= count_negative);
        const bool keep_last_positive = (count_positive >= count_negative);

        if (keep_first_negative)
        {
            insertRow(first_negative_row);

            if (out_row_sources_buf)
                current_row_sources[first_negative_pos].setSkipFlag(false);
        }

        if (keep_last_positive)
        {
            insertRow(last_positive_row);

            if (out_row_sources_buf)
                current_row_sources[last_positive_pos].setSkipFlag(false);
        }

        const bool differ_by_at_most_one = counts_match
            || count_positive + 1 == count_negative
            || count_positive == count_negative + 1;

        if (!differ_by_at_most_one)
        {
            if (count_incorrect_data < MAX_ERROR_MESSAGES)
                reportIncorrectData();

            ++count_incorrect_data;
        }
    }

    if (out_row_sources_buf)
    {
        const char * raw_sources = reinterpret_cast<const char *>(current_row_sources.data());
        out_row_sources_buf->write(raw_sources, current_row_sources.size() * sizeof(RowSourcePart));
    }
}
/// Processor work step: runs one round of merge(), which returns when it needs more input,
/// when enough rows have been accumulated, or when all inputs are exhausted.
void CollapsingSortedTransform::work()
{
merge();
}
/** Main merge loop.
  *
  * Takes rows from the heap in sorted order and accumulates, per key, the counters and the
  * "corner" rows (first negative, last positive). When the key changes, the result for the
  * previous key is written via insertRows().
  *
  * Returns early when
  *  - the key has changed and `merged_data` already has enough rows, or
  *  - the current chunk of some input is exhausted (more data is requested for that input).
  * When the heap becomes empty, the last group is flushed and the transform is marked finished.
  *
  * @throws Exception(INCORRECT_DATA) if the sign column contains a value other than 1 or -1.
  */
void CollapsingSortedTransform::merge()
{
    /// Take rows in required order and put them into `merged_data`, while the rows are no more than `max_block_size`
    while (queue.isValid())
    {
        auto current = queue.current();
        Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];

        RowRef current_row;
        setRowRef(current_row, current);

        if (last_row.empty())
            setRowRef(last_row, current);

        bool key_differs = !last_row.hasEqualSortColumnsWith(current_row);

        /// if there are enough rows and the last one is calculated completely
        if (key_differs && merged_data.hasEnoughRows())
            return;

        if (key_differs)
        {
            /// We write data for the previous primary key.
            insertRows();

            current_row.swap(last_row);

            count_negative = 0;
            count_positive = 0;

            current_pos = 0;
            first_negative_pos = 0;
            last_positive_pos = 0;
            current_row_sources.resize(0);
        }

        /// Initially, skip all rows. On insert, unskip "corner" rows.
        if (out_row_sources_buf)
            current_row_sources.emplace_back(current.impl->order, true);

        if (sign == 1)
        {
            ++count_positive;
            last_is_positive = true;

            setRowRef(last_positive_row, current);
            last_positive_pos = current_pos;
        }
        else if (sign == -1)
        {
            if (!count_negative)
            {
                setRowRef(first_negative_row, current);
                first_negative_pos = current_pos;
            }

            ++count_negative;
            last_is_positive = false;
        }
        else
            throw Exception("Incorrect data: Sign = " + toString(sign) + " (must be 1 or -1).",
                            ErrorCodes::INCORRECT_DATA);

        /// Advance the position within the current key. It must be done before any exit from this
        /// iteration (including the early return below): `first_negative_pos` / `last_positive_pos`
        /// index into `current_row_sources`, which received one entry for this row above.
        ++current_pos;

        if (!current->isLast())
        {
            queue.next();
        }
        else
        {
            /// We take next block from the corresponding source, if there is one.
            queue.removeTop();
            requestDataForInput(current.impl->order);
            return;
        }
    }

    insertRows();
    finish();
}
}

View File

@ -0,0 +1,93 @@
#pragma once
#include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/SharedChunk.h>
#include <Core/SortDescription.h>
#include <Core/SortCursor.h>
#include <DataStreams/ColumnGathererStream.h>
#include <common/logger_useful.h>
namespace DB
{
/** Merges several sorted inputs to one.
 * For each group of consecutive identical values of the primary key (the columns by which the data is sorted),
 * keeps no more than one row with the value of the column `sign_column = -1` ("negative row")
 * and no more than one row with the value of the column `sign_column = 1` ("positive row").
 * That is, it collapses the records from the change log.
 *
 * If the number of positive and negative rows is the same, and the last row is positive, then the first negative and last positive rows are written.
 * If the number of positive and negative rows is the same, and the last row is negative, it writes nothing.
 * If there are more positive rows than negative rows, then only the last positive row is written.
 * If there are more negative rows than positive rows, then only the first negative row is written.
 * If the two counts differ by more than one, the data is considered incorrect: a warning is logged
 * (a limited number of times), and the row chosen by the rules above is still written.
 */
class CollapsingSortedTransform final : public IMergingTransform
{
public:
/// `sign_column` is the name of the sign column in `header`.
/// `out_row_sources_buf_` may be nullptr; if set, it receives one RowSourcePart per processed input row.
CollapsingSortedTransform(
const Block & header,
size_t num_inputs,
SortDescription description_,
const String & sign_column,
size_t max_block_size,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);
String getName() const override { return "CollapsingSortedTransform"; }
/// Runs one round of merge().
void work() override;
protected:
/// Builds the heap from the cursors and marks it as initialized.
void initializeInputs() override;
/// Installs a new chunk for the given input and, once the heap is initialized, pushes its cursor to the heap.
void consume(Chunk chunk, size_t input_number) override;
private:
Logger * log = &Logger::get("CollapsingSortedTransform");
/// Settings
SortDescription description;
/// NOTE(review): not referenced by the implementation visible in this change; confirm whether it is needed.
bool has_collation = false;
/// Position of the sign column in the header.
const size_t sign_column_number;
/// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step)
/// If it is not nullptr then it should be populated during execution
WriteBuffer * out_row_sources_buf = nullptr;
/// Chunks currently being merged.
/// Both `source_chunks` and `cursors` are indexed by input number.
using SourceChunks = std::vector<detail::SharedChunkPtr>;
SourceChunks source_chunks;
SortCursorImpls cursors;
/// Heap of cursors; its top is the next row in sort order.
SortingHeap<SortCursor> queue;
/// Set by initializeInputs(); before that, consume() does not push into `queue`.
bool is_queue_initialized = false;
using RowRef = detail::RowRef;
/// First row with sign = -1 for the current primary key.
RowRef first_negative_row;
/// Last row with sign = 1 for the current primary key.
RowRef last_positive_row;
/// A row with the current primary key; incoming rows are compared against it to detect a key change.
RowRef last_row;
size_t count_positive = 0; /// The number of positive rows for the current primary key.
size_t count_negative = 0; /// The number of negative rows for the current primary key.
bool last_is_positive = false; /// true if the last row for the current primary key is positive.
/// Fields specific for VERTICAL merge algorithm.
/// Row numbers are relative to the start of current primary key.
size_t current_pos = 0; /// Current row number
size_t first_negative_pos = 0; /// Row number of first_negative
size_t last_positive_pos = 0; /// Row number of last_positive
PODArray<RowSourcePart> current_row_sources; /// Sources of rows with the current primary key
size_t count_incorrect_data = 0; /// To prevent too many error messages from writing to the log.
/// Logs a warning about a key whose positive/negative row counts differ by more than one.
void reportIncorrectData();
/// Appends the referenced row to `merged_data`.
void insertRow(RowRef & row);
/// Writes the collapsed result for the current primary key and flushes `current_row_sources`.
void insertRows();
/// Main merge loop; see work().
void merge();
/// Wraps `chunk` into a SharedChunk for input `source_num` and (re)initializes that input's cursor.
void updateCursor(Chunk chunk, size_t source_num);
/// Points `row` at the cursor's current position and makes it share ownership of that input's chunk.
void setRowRef(RowRef & row, SortCursor & cursor) { row.set(cursor, source_chunks[cursor.impl->order]); }
};
}

View File

@ -173,15 +173,7 @@ IProcessor::Status IMergingTransform::prepare()
/// Push if has data.
bool has_data_to_push = (is_finished && merged_data.mergedRows()) || merged_data.hasEnoughRows();
if (has_data_to_push && !is_port_full)
{
auto chunk = merged_data.pull();
++total_chunks;
total_rows += chunk.getNumRows();
total_bytes += chunk.allocatedBytes();
output.push(std::move(chunk));
}
output.push(merged_data.pull());
if (!is_initialized)
return prepareInitializeInputs();

View File

@ -41,7 +41,7 @@ protected:
void finish() { is_finished = true; } /// Call it when all data was inserted to merged_data.
/// Struct which represents current merging chunk of data.
/// Also it calculates the number of merged rows.
/// Also it calculates the number of merged rows and other profile info.
class MergedData
{
public:
@ -98,6 +98,8 @@ protected:
merged_rows = 0;
sum_blocks_granularity = 0;
++total_chunks;
total_allocated_bytes += chunk.allocatedBytes();
return chunk;
}
@ -118,28 +120,28 @@ protected:
return merged_rows * merged_rows >= sum_blocks_granularity;
}
UInt64 totalMergedRows() const { return total_merged_rows; }
UInt64 mergedRows() const { return merged_rows; }
UInt64 totalMergedRows() const { return total_merged_rows; }
UInt64 totalChunks() const { return total_chunks; }
UInt64 totalAllocatedBytes() const { return total_allocated_bytes; }
private:
MutableColumns columns;
UInt64 sum_blocks_granularity = 0;
UInt64 total_merged_rows = 0;
UInt64 merged_rows = 0;
UInt64 total_merged_rows = 0;
UInt64 total_chunks = 0;
UInt64 total_allocated_bytes = 0;
const UInt64 max_block_size;
const bool use_average_block_size;
MutableColumns columns;
};
MergedData merged_data;
protected:
/// Profile info.
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE};
UInt64 total_rows = 0;
UInt64 total_bytes = 0;
UInt64 total_chunks = 0;
private:
/// Processor state.

View File

@ -19,6 +19,7 @@ MergingSortedTransform::MergingSortedTransform(
SortDescription description_,
size_t max_block_size,
UInt64 limit_,
WriteBuffer * out_row_sources_buf_,
bool quiet_,
bool use_average_block_sizes,
bool have_all_inputs_)
@ -26,6 +27,7 @@ MergingSortedTransform::MergingSortedTransform(
, description(std::move(description_))
, limit(limit_)
, quiet(quiet_)
, out_row_sources_buf(out_row_sources_buf_)
, source_chunks(num_inputs)
, cursors(num_inputs)
{
@ -205,6 +207,8 @@ void MergingSortedTransform::insertFromChunk(size_t source_num)
source_chunks[source_num] = Chunk();
/// Write order of rows for other columns
/// this data will be used in gather stream
if (out_row_sources_buf)
{
RowSourcePart row_source(source_num);
@ -224,13 +228,13 @@ void MergingSortedTransform::onFinish()
std::stringstream message;
message << std::fixed << std::setprecision(2)
<< "Merge sorted " << total_chunks << " blocks, " << total_rows << " rows"
<< "Merge sorted " << merged_data.totalChunks() << " blocks, " << merged_data.totalMergedRows() << " rows"
<< " in " << seconds << " sec.";
if (seconds != 0)
message << ", "
<< total_rows / seconds << " rows/sec., "
<< total_bytes / 1000000.0 / seconds << " MB/sec.";
<< merged_data.totalMergedRows() / seconds << " rows/sec., "
<< merged_data.totalAllocatedBytes() / 1000000.0 / seconds << " MB/sec.";
LOG_DEBUG(log, message.str());
}

View File

@ -9,7 +9,7 @@ namespace DB
{
/// Merges several sorted inputs into one sorted output.
class MergingSortedTransform : public IMergingTransform
class MergingSortedTransform final : public IMergingTransform
{
public:
MergingSortedTransform(
@ -18,6 +18,7 @@ public:
SortDescription description,
size_t max_block_size,
UInt64 limit_ = 0,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool quiet_ = false,
bool use_average_block_sizes = false,
bool have_all_inputs_ = true);

View File

@ -0,0 +1,83 @@
#pragma once
#include <Processors/Chunk.h>
#include <Columns/IColumn.h>
#include <boost/smart_ptr/intrusive_ptr.hpp>
#include <Core/SortCursor.h>
namespace DB::detail
{
/// A Chunk with an intrusive reference counter and cached raw column pointers.
/// It allows referring to a row in a chunk while holding ownership of the chunk,
/// and thus avoids creating a temporary row object.
/// std::shared_ptr is not used, since there is no need for a place for `weak_count` and `deleter`;
/// Poco::SharedPtr is not used, since the chunk and `refcount` need to be allocated in one piece;
/// Poco::AutoPtr is not used, since it does not have a `move` constructor and there are extra checks for nullptr.
/// The reference counter is not atomic, since it is used from one thread.
struct SharedChunk : Chunk
{
/// Number of intrusive_ptr instances referring to this object; see intrusive_ptr_add_ref / intrusive_ptr_release.
int refcount = 0;
/// Raw column pointers; RowRef stores the addresses of these two vectors.
/// They are not filled by the constructor and must be populated by the code that creates the SharedChunk.
ColumnRawPtrs all_columns;
ColumnRawPtrs sort_columns;
/// Takes over the contents of `chunk`.
explicit SharedChunk(Chunk && chunk) : Chunk(std::move(chunk)) {}
};
/// Hook used by boost::intrusive_ptr: registers one more owner of `ptr`.
inline void intrusive_ptr_add_ref(detail::SharedChunk * ptr)
{
    ptr->refcount += 1;
}
/// Hook used by boost::intrusive_ptr: drops one owner of `ptr` and destroys the chunk when none remain.
inline void intrusive_ptr_release(detail::SharedChunk * ptr)
{
    ptr->refcount -= 1;

    if (ptr->refcount != 0)
        return;

    delete ptr;
}

/// Owning pointer to a SharedChunk (non-atomic intrusive reference counting).
using SharedChunkPtr = boost::intrusive_ptr<detail::SharedChunk>;
/// Reference to a single row of a SharedChunk.
/// Shares ownership of the chunk, so the referenced row stays valid
/// even after the source has replaced its current chunk with a new one.
struct RowRef
{
    detail::SharedChunkPtr owned_chunk;

    /// Point at the corresponding members of `*owned_chunk`; nullptr while the reference is empty.
    ColumnRawPtrs * all_columns = nullptr;
    ColumnRawPtrs * sort_columns = nullptr;
    UInt64 row_num = 0;

    /// Exchanges the chunk ownership and the row position with `other`.
    void swap(RowRef & other) noexcept
    {
        owned_chunk.swap(other.owned_chunk);
        std::swap(all_columns, other.all_columns);
        std::swap(sort_columns, other.sort_columns);
        std::swap(row_num, other.row_num);
    }

    /// True until set() has been called (or after swapping with an empty reference).
    bool empty() const { return all_columns == nullptr; }

    /// Makes this reference point at the cursor's current row inside `chunk` and share ownership of `chunk`.
    void set(SortCursor & cursor, SharedChunkPtr chunk)
    {
        owned_chunk = std::move(chunk);
        row_num = cursor.impl->pos;
        all_columns = &owned_chunk->all_columns;
        sort_columns = &owned_chunk->sort_columns;
    }

    /// Compares the sort columns of this row with those of `other`, column by column.
    /// Both references must be non-empty.
    /// Returns true if every column compares equal.
    bool hasEqualSortColumnsWith(const RowRef & other) const
    {
        const auto size = sort_columns->size();
        for (size_t col_number = 0; col_number < size; ++col_number)
        {
            const auto & cur_column = (*sort_columns)[col_number];
            const auto & other_column = (*other.sort_columns)[col_number];

            if (0 != cur_column->compareAt(row_num, other.row_num, *other_column, 1))
                return false;
        }

        return true;
    }
};
}

View File

@ -199,6 +199,7 @@ void MergeSortingTransform::consume(Chunk chunk)
description,
max_merged_block_size,
limit,
nullptr,
quiet,
use_average_block_sizes,
have_all_inputs);