Added VersionedCollapsingTransform.

Nikolai Kochetov 2020-03-30 19:16:00 +03:00
parent d8a303dda8
commit c05331656f
4 changed files with 437 additions and 1 deletion

Processors/Merges/ReplacingSortedTransform.cpp (View File)

@@ -4,6 +4,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
ReplacingSortedTransform::ReplacingSortedTransform(
size_t num_inputs, const Block & header,
SortDescription description_, const String & version_column,
@@ -78,6 +83,11 @@ void ReplacingSortedTransform::insertRow()
selected_row.clear();
}
void ReplacingSortedTransform::work()
{
merge();
}
void ReplacingSortedTransform::merge()
{
/// Take the rows in the needed order and put them into `merged_columns` until there are no more than `max_block_size` rows

Processors/Merges/ReplacingSortedTransform.h (View File)

@@ -23,13 +23,14 @@ public:
bool use_average_block_sizes = false);
String getName() const override { return "ReplacingSorted"; }
void work() override;
protected:
void initializeInputs() override;
void consume(Chunk chunk, size_t input_number) override;
private:
Logger * log = &Logger::get("ReplacingSortedBlockInputStream");
Logger * log = &Logger::get("ReplacingSortedTransform");
SortDescription description;
ssize_t version_column_number = -1;

Processors/Merges/VersionedCollapsingTransform.cpp (View File)

@@ -0,0 +1,200 @@
#include <Processors/Merges/VersionedCollapsingTransform.h>
#include <IO/WriteBuffer.h>
#include <Columns/ColumnsNumber.h>
namespace DB
{
static const size_t MAX_ROWS_IN_MULTIVERSION_QUEUE = 8192;
VersionedCollapsingTransform::VersionedCollapsingTransform(
size_t num_inputs, const Block & header,
SortDescription description_, const String & sign_column_,
size_t max_block_size,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
: IMergingTransform(num_inputs, header, header, max_block_size, use_average_block_sizes, true)
, description(std::move(description_))
, out_row_sources_buf(out_row_sources_buf_)
, max_rows_in_queue(MAX_ROWS_IN_MULTIVERSION_QUEUE - 2)
, current_keys(max_rows_in_queue + 1)
, chunk_allocator(num_inputs + max_rows_in_queue + 1)
{
sign_column_number = header.getPositionByName(sign_column_);
}
void VersionedCollapsingTransform::initializeInputs()
{
queue = SortingHeap<SortCursor>(cursors);
is_queue_initialized = true;
}
void VersionedCollapsingTransform::consume(Chunk chunk, size_t input_number)
{
updateCursor(std::move(chunk), input_number);
if (is_queue_initialized)
queue.push(cursors[input_number]);
}
void VersionedCollapsingTransform::updateCursor(Chunk chunk, size_t source_num)
{
auto num_rows = chunk.getNumRows();
auto columns = chunk.detachColumns();
for (auto & column : columns)
column = column->convertToFullColumnIfConst();
chunk.setColumns(std::move(columns), num_rows);
auto & source_chunk = source_chunks[source_num];
if (source_chunk)
{
source_chunk = chunk_allocator.alloc(std::move(chunk));
cursors[source_num].reset(source_chunk->getColumns(), {});
}
else
{
if (cursors[source_num].has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
source_chunk = chunk_allocator.alloc(std::move(chunk));
cursors[source_num] = SortCursorImpl(source_chunk->getColumns(), description, source_num);
}
source_chunk->all_columns = cursors[source_num].all_columns;
source_chunk->sort_columns = cursors[source_num].sort_columns;
}
void VersionedCollapsingTransform::work()
{
merge();
}
inline ALWAYS_INLINE static void writeRowSourcePart(WriteBuffer & buffer, RowSourcePart row_source)
{
if constexpr (sizeof(RowSourcePart) == 1)
buffer.write(*reinterpret_cast<const char *>(&row_source));
else
buffer.write(reinterpret_cast<const char *>(&row_source), sizeof(RowSourcePart));
}
void VersionedCollapsingTransform::insertGap(size_t gap_size)
{
if (out_row_sources_buf)
{
for (size_t i = 0; i < gap_size; ++i)
{
writeRowSourcePart(*out_row_sources_buf, current_row_sources.front());
current_row_sources.pop();
}
}
}
void VersionedCollapsingTransform::insertRow(size_t skip_rows, const RowRef & row)
{
merged_data.insertRow(*row.all_columns, row.row_num, row.owned_chunk->getNumRows());
insertGap(skip_rows);
if (out_row_sources_buf)
{
current_row_sources.front().setSkipFlag(false);
writeRowSourcePart(*out_row_sources_buf, current_row_sources.front());
current_row_sources.pop();
}
}
void VersionedCollapsingTransform::merge()
{
/// Take rows in the correct order and put them into `merged_columns` until there are no more than `max_block_size` rows
while (queue.isValid())
{
SortCursor current = queue.current();
RowRef current_row;
Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];
setRowRef(current_row, current);
/// First, decide how many rows need to be inserted right now.
size_t num_rows_to_insert = 0;
if (!current_keys.empty())
{
auto key_differs = !current_row.hasEqualSortColumnsWith(current_keys.back());
if (key_differs) /// Flush whole queue
num_rows_to_insert = current_keys.size();
else if (current_keys.size() >= max_rows_in_queue) /// Flush single row if queue is big
num_rows_to_insert = 1;
}
/// Insert ready rows if any.
while (num_rows_to_insert)
{
const auto & row = current_keys.front();
auto gap = current_keys.frontGap();
insertRow(gap, row);
current_keys.popFront();
--num_rows_to_insert;
/// It's OK to return here, because we haven't modified the queue.
if (merged_data.hasEnoughRows())
return;
}
if (current_keys.empty())
{
sign_in_queue = sign;
current_keys.pushBack(current_row);
}
else /// If queue is not empty, then current_row has the same key as in current_keys queue
{
if (sign == sign_in_queue)
current_keys.pushBack(current_row);
else
{
current_keys.popBack();
current_keys.pushGap(2);
}
}
if (out_row_sources_buf)
current_row_sources.emplace(current->order, true);
if (!current->isLast())
{
queue.next();
}
else
{
/// We take next block from the corresponding source, if there is one.
queue.removeTop();
requestDataForInput(current.impl->order);
return;
}
}
while (!current_keys.empty())
{
const auto & row = current_keys.front();
auto gap = current_keys.frontGap();
insertRow(gap, row);
current_keys.popFront();
if (merged_data.hasEnoughRows())
return;
}
/// Write information about last collapsed rows.
insertGap(current_keys.frontGap());
finish();
}
}
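
For intuition, the collapsing rule that merge() applies to rows sharing one sorting key can be modelled in a few lines: rows with the same sign accumulate in current_keys, and a row with the opposite sign cancels the most recently queued row, which merge() records as a gap of two via pushGap(2). The sketch below is not part of the commit; the input signs are hypothetical, and it simulates only this decision, ignoring chunks, cursors, and row sources.

#include <cstdint>
#include <deque>
#include <iostream>
#include <vector>

int main()
{
    // Hypothetical signs of rows that all share one sorting key, in merge order.
    std::vector<std::int8_t> signs = {1, -1, 1};

    std::deque<std::int8_t> kept;   // plays the role of current_keys
    std::int8_t sign_in_queue = 0;
    size_t collapsed = 0;           // what merge() records via current_keys.pushGap(2)

    for (std::int8_t sign : signs)
    {
        if (kept.empty())
        {
            sign_in_queue = sign;   // the first row of a run defines the sign in the queue
            kept.push_back(sign);
        }
        else if (sign == sign_in_queue)
            kept.push_back(sign);   // same sign: keep accumulating
        else
        {
            kept.pop_back();        // opposite sign: the pair cancels out
            collapsed += 2;
        }
    }

    std::cout << "rows kept: " << kept.size() << ", rows collapsed: " << collapsed << "\n";
    // Prints "rows kept: 1, rows collapsed: 2": the (+1, -1) pair collapses and the
    // trailing +1 survives; merge() would emit it once the key changes or input ends.
}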

Processors/Merges/VersionedCollapsingTransform.h (View File)

@@ -0,0 +1,225 @@
#pragma once
#include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/RowRef.h>
#include <Core/SortDescription.h>
#include <Core/SortCursor.h>
#include <DataStreams/ColumnGathererStream.h>
#include <common/logger_useful.h>
#include <queue>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/* Deque with a fixed memory size. Allows pushing gaps.
 * frontGap() returns the number of gaps that were inserted before the front element.
 *
 * This structure could be implemented with std::deque, but:
 * - this deque uses a fixed amount of memory allocated in the constructor; no further allocations are performed;
 * - gaps are not stored as separate values in the queue, which is more memory efficient;
 * - the deque maintains the gap invariant: after an element is removed, its gap is moved into the neighbouring cell.
 *
 * Note: an empty deque may have a non-zero front gap.
 */
template <typename T>
class FixedSizeDequeWithGaps
{
public:
struct ValueWithGap
{
/// The number of gaps before the current element. The number of gaps after the last element is stored in the end cell.
size_t gap;
/// Store char[] instead of T in order to make ValueWithGap POD.
/// Call placement constructors on push and destructors on pop.
char value[sizeof(T)];
};
explicit FixedSizeDequeWithGaps(size_t size)
{
container.resize_fill(size + 1);
}
~FixedSizeDequeWithGaps()
{
auto destruct_range = [this](size_t from, size_t to)
{
for (size_t i = from; i < to; ++i)
destructValue(i);
};
if (begin <= end)
destruct_range(begin, end);
else
{
destruct_range(0, end);
destruct_range(begin, container.size());
}
}
void pushBack(const T & value)
{
checkEnoughSpaceToInsert();
constructValue(end, value);
moveRight(end);
container[end].gap = 0;
}
void pushGap(size_t count) { container[end].gap += count; }
void popBack()
{
checkHasValuesToRemove();
size_t curr_gap = container[end].gap;
moveLeft(end);
destructValue(end);
container[end].gap += curr_gap;
}
void popFront()
{
checkHasValuesToRemove();
destructValue(begin);
moveRight(begin);
}
T & front()
{
checkHasValuesToGet();
return getValue(begin);
}
const T & front() const
{
checkHasValuesToGet();
return getValue(begin);
}
const T & back() const
{
size_t ps = end;
moveLeft(ps);
return getValue(ps);
}
size_t & frontGap() { return container[begin].gap; }
const size_t & frontGap() const { return container[begin].gap; }
size_t size() const
{
if (begin <= end)
return end - begin;
return end + (container.size() - begin);
}
bool empty() const { return begin == end; }
private:
PODArray<ValueWithGap> container;
size_t gap_before_first = 0;
size_t begin = 0;
size_t end = 0;
void constructValue(size_t index, const T & value) { new (container[index].value) T(value); }
void destructValue(size_t index) { reinterpret_cast<T *>(container[index].value)->~T(); }
T & getValue(size_t index) { return *reinterpret_cast<T *>(container[index].value); }
const T & getValue(size_t index) const { return *reinterpret_cast<const T *>(container[index].value); }
void moveRight(size_t & index) const
{
++index;
if (index == container.size())
index = 0;
}
void moveLeft(size_t & index) const
{
if (index == 0)
index = container.size();
--index;
}
void checkEnoughSpaceToInsert() const
{
if (size() + 1 == container.size())
throw Exception("Not enough space to insert into FixedSizeDequeWithGaps with capacity "
+ toString(container.size() - 1), ErrorCodes::LOGICAL_ERROR);
}
void checkHasValuesToRemove() const
{
if (empty())
throw Exception("Cannot remove from empty FixedSizeDequeWithGaps", ErrorCodes::LOGICAL_ERROR);
}
void checkHasValuesToGet() const
{
if (empty())
throw Exception("Cannot get value from empty FixedSizeDequeWithGaps", ErrorCodes::LOGICAL_ERROR);
}
};
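
As a rough usage sketch (not part of the commit, and assuming the header added here is on the include path), the following shows how values and gaps travel through FixedSizeDequeWithGaps; the concrete values are made up for illustration.

#include <Processors/Merges/VersionedCollapsingTransform.h>

void fixedSizeDequeExample()
{
    DB::FixedSizeDequeWithGaps<int> deque(4);   // room for up to 4 values

    deque.pushBack(10);    // values: [10]
    deque.pushBack(20);    // values: [10, 20]
    deque.pushGap(2);      // record 2 collapsed rows after the back element
    deque.popBack();       // remove 20; its trailing gap moves onto the new back cell
    // deque.front() == 10 and deque.frontGap() == 0: nothing was collapsed before 10
    deque.popFront();      // remove 10; the deque is now empty, but the gap of 2 is still
                           // reported by frontGap(), as the class comment notes
}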
class VersionedCollapsingTransform : public IMergingTransform
{
public:
/// Don't need version column. It's in primary key.
VersionedCollapsingTransform(
size_t num_inputs, const Block & header,
SortDescription description_, const String & sign_column_,
size_t max_block_size,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);
String getName() const override { return "VersionedCollapsingTransform"; }
void work() override;
protected:
void initializeInputs() override;
void consume(Chunk chunk, size_t input_number) override;
private:
Logger * log = &Logger::get("VersionedCollapsingTransform");
SortDescription description;
size_t sign_column_number = 0;
/// Used in the Vertical merge algorithm to gather non-PK/non-index columns (on the next step).
/// If it is not nullptr, it should be populated during execution.
WriteBuffer * out_row_sources_buf = nullptr;
/// Chunks currently being merged.
using SourceChunks = std::vector<detail::SharedChunkPtr>;
SourceChunks source_chunks;
SortCursorImpls cursors;
SortingHeap<SortCursor> queue;
bool is_queue_initialized = false;
using RowRef = detail::RowRef;
const size_t max_rows_in_queue;
/// Rows with the same primary key and sign.
FixedSizeDequeWithGaps<RowRef> current_keys;
Int8 sign_in_queue = 0;
detail::SharedChunkAllocator chunk_allocator;
std::queue<RowSourcePart> current_row_sources; /// Sources of rows with the current primary key
void insertGap(size_t gap_size);
void insertRow(size_t skip_rows, const RowRef & row);
void merge();
void updateCursor(Chunk chunk, size_t source_num);
void setRowRef(RowRef & row, SortCursor & cursor) { row.set(cursor, source_chunks[cursor.impl->order]); }
};
}