ClickHouse/dbms/src/DataStreams/CollapsingFinalBlockInputStream.h

257 lines
7.4 KiB
C++

#pragma once
#include <common/logger_useful.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Core/SortDescription.h>
#include <Columns/ColumnsNumber.h>
#include <Common/typeid_cast.h>
#include <queue>
namespace DB
{
/// Collapses rows that have the same primary key but opposite signs, roughly like CollapsingSortedBlockInputStream.
/// Outputs the rows in arbitrary order (the input streams must still be sorted).
/// Outputs only rows with a positive sign.
class CollapsingFinalBlockInputStream : public IProfilingBlockInputStream
{
public:
CollapsingFinalBlockInputStream(
const BlockInputStreams & inputs,
const SortDescription & description_,
const String & sign_column_name_)
: description(description_), sign_column_name(sign_column_name_)
{
children.insert(children.end(), inputs.begin(), inputs.end());
}
/// Declared here; the definition is not part of this header.
~CollapsingFinalBlockInputStream() override;
/// Name of this stream type.
String getName() const override { return "CollapsingFinal"; }
/// NOTE(review): this reports sorted output, while the class-level comment says the output rows
/// are not ordered — confirm which of the two is intended before relying on it.
bool isSortedOutput() const override { return true; }
/// Returns the sort description that was passed to the constructor.
const SortDescription & getSortDescription() const override { return description; }
/// Returns the header of the first child stream.
/// Uses `at(0)`, so (assuming `children` is a standard container) having no children is a
/// reported error rather than undefined behavior.
Block getHeader() const override { return children.at(0)->getHeader(); }
protected:
/// Declared here; the definition is not part of this header.
Block readImpl() override;
private:
struct MergingBlock;
using BlockPlainPtrs = std::vector<MergingBlock*>;

/// An input block plus the bookkeeping it needs while it takes part in the merge.
/// `refcount` is maintained from outside by MergingBlockPtr.
struct MergingBlock : boost::noncopyable
{
    /// Copies `block_` and resolves, inside that copy, the sort-key columns described by `desc`
    /// and the sign column named `sign_column_name`.
    /// Throws Exception with ErrorCodes::BAD_TYPE_OF_FIELD if the sign column is not a ColumnInt8.
    MergingBlock(const Block & block_,
                 size_t stream_index_,
                 const SortDescription & desc,
                 const String & sign_column_name,
                 BlockPlainPtrs * output_blocks)
        : block(block_)
        , stream_index(stream_index_)
        , output_blocks(output_blocks)
    {
        const size_t key_size = desc.size();
        sort_columns.resize(key_size);

        for (size_t key_idx = 0; key_idx != key_size; ++key_idx)
        {
            const auto & key_part = desc[key_idx];

            /// A key part is addressed by name when a name is given, otherwise by its position.
            size_t position = key_part.column_number;
            if (!key_part.column_name.empty())
                position = block.getPositionByName(key_part.column_name);

            sort_columns[key_idx] = block.safeGetByPosition(position).column.get();
        }

        sign_column = typeid_cast<const ColumnInt8 *>(block.getByName(sign_column_name).column.get());
        if (sign_column == nullptr)
            throw Exception("Sign column must have type Int8", ErrorCodes::BAD_TYPE_OF_FIELD);

        rows = sign_column->size();

        /// The filter starts out as all zeros; a `1` is written later for each row that must be kept.
        filter.resize_fill(rows);
    }

    /// The block itself; the raw column pointers below point into it.
    Block block;

    /// Tie-breaker for rows with equal keys: they are ordered by ascending stream_index.
    size_t stream_index;

    /// Number of rows, taken from the size of the sign column.
    size_t rows;

    /// Marks the rows to keep. Filled in while the streams are being merged.
    IColumn::Filter filter;

    /// Sort-key columns, in the order given by the sort description. Point into `block`.
    ColumnRawPtrs sort_columns;

    /// The sign column, already checked to be a ColumnInt8. Points into `block`.
    const ColumnInt8 * sign_column;

    /// Number of live references; once it drops to zero the block is ready to be output.
    int refcount = 0;

    /// The list that receives this block when it becomes ready to be output.
    BlockPlainPtrs * output_blocks;
};
/// When deleting the last block reference, adds a block to `output_blocks`.
class MergingBlockPtr
{
public:
MergingBlockPtr() : ptr() {}
explicit MergingBlockPtr(MergingBlock * ptr_) : ptr(ptr_)
{
if (ptr)
++ptr->refcount;
}
MergingBlockPtr(const MergingBlockPtr & rhs) : ptr(rhs.ptr)
{
if (ptr)
++ptr->refcount;
}
MergingBlockPtr & operator=(const MergingBlockPtr & rhs)
{
destroy();
ptr = rhs.ptr;
if (ptr)
++ptr->refcount;
return *this;
}
~MergingBlockPtr()
{
destroy();
}
/// Zero the pointer and do not add a block to output_blocks.
void cancel()
{
if (ptr)
{
--ptr->refcount;
if (!ptr->refcount)
delete ptr;
ptr = nullptr;
}
}
MergingBlock & operator*() const { return *ptr; }
MergingBlock * operator->() const { return ptr; }
operator bool() const { return !!ptr; }
bool operator!() const { return !ptr; }
private:
MergingBlock * ptr;
void destroy()
{
if (ptr)
{
--ptr->refcount;
if (!ptr->refcount)
{
if (std::uncaught_exceptions())
delete ptr;
else
ptr->output_blocks->push_back(ptr);
}
ptr = nullptr;
}
}
};
/// A position (row number) inside a MergingBlock.
/// A default-constructed Cursor has a null `block`; of the member functions, only equal()
/// checks for that, all the others require a non-null `block`.
struct Cursor
{
    MergingBlockPtr block;

    /// Row number inside `block`. Initialized so that copying a default-constructed Cursor
    /// does not read an indeterminate value.
    size_t pos = 0;

    Cursor() = default;
    explicit Cursor(const MergingBlockPtr & block_, size_t pos_ = 0) : block(block_), pos(pos_) {}

    /// Ordering used by std::priority_queue<Cursor>.
    /// The comparison is deliberately reversed: a cursor whose row compares greater is reported
    /// as "less" (assuming compareAt() follows the usual negative/zero/positive convention), and
    /// for equal keys the cursor with the larger stream_index is "less". The queue, which yields
    /// its greatest element, therefore yields the smallest row with the smallest stream_index.
    /// Both cursors must reference a block.
    bool operator< (const Cursor & rhs) const
    {
        for (size_t i = 0; i < block->sort_columns.size(); ++i)
        {
            int res = block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), 1);
            if (res > 0)
                return true;
            if (res < 0)
                return false;
        }

        return block->stream_index > rhs.block->stream_index;
    }

    /// True when both cursors reference a block and their rows agree on every sort column.
    /// Not consistent with operator< : stream_index is not taken into account.
    bool equal(const Cursor & rhs) const
    {
        if (!block || !rhs.block)
            return false;

        for (size_t i = 0; i < block->sort_columns.size(); ++i)
        {
            int res = block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), 1);
            if (res != 0)
                return false;
        }

        return true;
    }

    /// Value of the sign column in the current row.
    Int8 getSign() const
    {
        return block->sign_column->getData()[pos];
    }

    /// Indicates that this row should be outputted in response.
    void addToFilter()
    {
        block->filter[pos] = 1;
    }

    /// True when the cursor stands on the last row of its block.
    bool isLast() const
    {
        return pos + 1 == block->rows;
    }

    /// Moves to the next row. No bounds check is done here; see isLast().
    void next()
    {
        ++pos;
    }
};
/// Priority queue of cursors, ordered by Cursor::operator<.
using Queue = std::priority_queue<Cursor>;
/// Sort description given to the constructor; returned by getSortDescription().
const SortDescription description;
/// Name of the sign column given to the constructor.
String sign_column_name;
Logger * log = &Logger::get("CollapsingFinalBlockInputStream");
/// NOTE(review): presumably true until the first read has happened — confirm in readImpl().
bool first = true;
/// Blocks that are ready to be output.
/// NOTE(review): presumably the list that MergingBlock::output_blocks points to — confirm in fetchNextBlock().
BlockPlainPtrs output_blocks;
Queue queue;
Cursor previous; /// The current primary key.
Cursor last_positive; /// The last positive row for the current primary key.
size_t count_positive = 0; /// The number of positive rows for the current primary key.
size_t count_negative = 0; /// The number of negative rows for the current primary key.
bool last_is_positive = false; /// true if the last row for the current primary key is positive.
size_t count_incorrect_data = 0; /// To prevent too many error messages from writing to the log.
/// Count the number of blocks fetched and outputted.
size_t blocks_fetched = 0;
size_t blocks_output = 0;
/// The functions below are only declared here; their definitions are not part of this header.
void fetchNextBlock(size_t input_index);
void commitCurrent();
void reportBadCounts();
void reportBadSign(Int8 sign);
};
}