Remove some streams.

This commit is contained in:
Nikolai Kochetov 2020-06-01 19:31:06 +03:00
parent ca3fb27b09
commit 3a0acb00c1
50 changed files with 75 additions and 3866 deletions

View File

@ -59,7 +59,7 @@ MutableColumnPtr ColumnAggregateFunction::convertToValues(MutableColumnPtr colum
* Due to the presence of WITH TOTALS, during aggregation the states of this aggregate function will be stored
* in the ColumnAggregateFunction column of type
* AggregateFunction(quantileTimingState(0.5), UInt64).
* Then, in `TotalsHavingBlockInputStream`, the `convertToValues` method will be called,
* Then, in `TotalsHavingTransform`, the `convertToValues` method will be called,
* to get the "ready" values.
* But it just converts a column of type
* `AggregateFunction(quantileTimingState(0.5), UInt64)`

View File

@ -1,51 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Core/ColumnWithTypeAndName.h>
namespace DB
{
/** Adds a materialized const column to the block with a specified value.
*/
template <typename T>
class AddingConstColumnBlockInputStream : public IBlockInputStream
{
public:
AddingConstColumnBlockInputStream(
BlockInputStreamPtr input_,
DataTypePtr data_type_,
T value_,
String column_name_)
: data_type(data_type_), value(value_), column_name(column_name_)
{
children.push_back(input_);
}
String getName() const override { return "AddingConstColumn"; }
Block getHeader() const override
{
Block res = children.back()->getHeader();
res.insert({data_type->createColumn(), data_type, column_name});
return res;
}
protected:
Block readImpl() override
{
Block res = children.back()->read();
if (!res)
return res;
res.insert({data_type->createColumnConst(res.rows(), value)->convertToFullColumnIfConst(), data_type, column_name});
return res;
}
private:
DataTypePtr data_type;
T value;
String column_name;
};
}
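For illustration, a minimal usage sketch (hypothetical, not part of this commit; it assumes an existing BlockInputStreamPtr `source` and that <DataTypes/DataTypesNumber.h> is included for DataTypeUInt64):
/// Append a materialized UInt64 column named "shard_num" with the value 42 to every block read from `source`.
BlockInputStreamPtr with_shard = std::make_shared<AddingConstColumnBlockInputStream<UInt64>>(
    source, std::make_shared<DataTypeUInt64>(), UInt64(42), "shard_num");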

View File

@ -1,75 +0,0 @@
#include <Common/formatReadable.h>
#include <DataStreams/BlocksListBlockInputStream.h>
#include <DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
#include <DataStreams/AggregatingBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
namespace ProfileEvents
{
extern const Event ExternalAggregationMerge;
}
namespace DB
{
Block AggregatingBlockInputStream::getHeader() const
{
return aggregator.getHeader(final);
}
Block AggregatingBlockInputStream::readImpl()
{
if (!executed)
{
executed = true;
AggregatedDataVariantsPtr data_variants = std::make_shared<AggregatedDataVariants>();
Aggregator::CancellationHook hook = [&]() { return this->isCancelled(); };
aggregator.setCancellationHook(hook);
aggregator.execute(children.back(), *data_variants);
if (!aggregator.hasTemporaryFiles())
{
ManyAggregatedDataVariants many_data { data_variants };
impl = aggregator.mergeAndConvertToBlocks(many_data, final, 1);
}
else
{
/** If there are temporary files with partially-aggregated data on the disk,
* then read and merge them, spending the minimum amount of memory.
*/
ProfileEvents::increment(ProfileEvents::ExternalAggregationMerge);
if (!isCancelled())
{
/// Flush data in the RAM to disk also. It's easier than merging on-disk and RAM data.
if (data_variants->size()) // NOLINT
aggregator.writeToTemporaryFile(*data_variants);
}
const auto & files = aggregator.getTemporaryFiles();
BlockInputStreams input_streams;
for (const auto & file : files.files)
{
temporary_inputs.emplace_back(std::make_unique<TemporaryFileStream>(file->path()));
input_streams.emplace_back(temporary_inputs.back()->block_in);
}
LOG_TRACE(log, "Will merge {} temporary files of size {} compressed, {} uncompressed.", files.files.size(), ReadableSize(files.sum_size_compressed), ReadableSize(files.sum_size_uncompressed));
impl = std::make_unique<MergingAggregatedMemoryEfficientBlockInputStream>(input_streams, params, final, 1, 1);
}
}
if (isCancelledOrThrowIfKilled() || !impl)
return {};
return impl->read();
}
}

View File

@ -1,53 +0,0 @@
#pragma once
#include <Interpreters/Aggregator.h>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/TemporaryFileStream.h>
namespace DB
{
/** Aggregates the stream of blocks using the specified key columns and aggregate functions.
* Columns with aggregate functions are added to the end of the block.
* If final = false, the aggregate functions are not finalized, that is, they are not replaced by their value, but contain an intermediate state of calculations.
* This is necessary so that aggregation can continue (for example, by combining streams of partially aggregated data).
*/
class AggregatingBlockInputStream : public IBlockInputStream
{
public:
/** keys are taken from the GROUP BY part of the query
* Aggregate functions are searched everywhere in the expression.
* Columns corresponding to keys and arguments of aggregate functions must already be computed.
*/
AggregatingBlockInputStream(const BlockInputStreamPtr & input, const Aggregator::Params & params_, bool final_)
: params(params_), aggregator(params), final(final_)
{
children.push_back(input);
}
String getName() const override { return "Aggregating"; }
Block getHeader() const override;
protected:
Block readImpl() override;
Aggregator::Params params;
Aggregator aggregator;
bool final;
bool executed = false;
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
/** From here we will get the completed blocks after the aggregation. */
std::unique_ptr<IBlockInputStream> impl;
Poco::Logger * log = &Poco::Logger::get("AggregatingBlockInputStream");
};
}
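A hypothetical usage sketch (not part of this commit; `source` is assumed to be a BlockInputStreamPtr and `params` an already-built Aggregator::Params describing keys and aggregate functions):
BlockInputStreamPtr aggregated = std::make_shared<AggregatingBlockInputStream>(source, params, /* final_ = */ true);
aggregated->readPrefix();
while (Block block = aggregated->read())
{
    /// Each block holds finalized aggregate values, since final_ == true.
}
aggregated->readSuffix();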

View File

@ -1,157 +0,0 @@
#include <DataStreams/CollapsingFinalBlockInputStream.h>
#include <Common/typeid_cast.h>
/// Maximum number of messages about incorrect data in the log.
#define MAX_ERROR_MESSAGES 10
namespace DB
{
CollapsingFinalBlockInputStream::~CollapsingFinalBlockInputStream()
{
queue = {};
for (auto & block : output_blocks)
delete block;
}
void CollapsingFinalBlockInputStream::reportBadCounts()
{
/// With inconsistent data, this is an unavoidable error that cannot be easily fixed by admins. Therefore, log it as a Warning.
LOG_WARNING(log, "Incorrect data: number of rows with sign = 1 ({}) differs with number of rows with sign = -1 ({}) by more than one", count_positive, count_negative);
}
void CollapsingFinalBlockInputStream::reportBadSign(Int8 sign)
{
LOG_ERROR(log, "Invalid sign: {}", static_cast<int>(sign));
}
void CollapsingFinalBlockInputStream::fetchNextBlock(size_t input_index)
{
BlockInputStreamPtr stream = children[input_index];
Block block = stream->read();
if (!block)
return;
MergingBlockPtr merging_block(new MergingBlock(block, input_index, description, sign_column_name, &output_blocks));
++blocks_fetched;
queue.push(Cursor(merging_block));
}
void CollapsingFinalBlockInputStream::commitCurrent()
{
if (count_positive || count_negative)
{
if (count_positive >= count_negative && last_is_positive)
{
last_positive.addToFilter();
}
if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1))
{
if (count_incorrect_data < MAX_ERROR_MESSAGES)
reportBadCounts();
++count_incorrect_data;
}
last_positive = Cursor();
previous = Cursor();
}
count_negative = 0;
count_positive = 0;
}
Block CollapsingFinalBlockInputStream::readImpl()
{
if (first)
{
for (size_t i = 0; i < children.size(); ++i)
fetchNextBlock(i);
first = false;
}
/// We will create blocks for the answer until we get a non-empty block.
while (true)
{
while (!queue.empty() && output_blocks.empty())
{
Cursor current = queue.top();
queue.pop();
bool has_next = !queue.empty();
Cursor next = has_next ? queue.top() : Cursor();
/// We will advance in the current block, not using the queue, as long as possible.
while (true)
{
if (!current.equal(previous))
{
commitCurrent();
previous = current;
}
Int8 sign = current.getSign();
if (sign == 1)
{
last_positive = current;
last_is_positive = true;
++count_positive;
}
else if (sign == -1)
{
last_is_positive = false;
++count_negative;
}
else
reportBadSign(sign);
if (current.isLast())
{
fetchNextBlock(current.block->stream_index);
/// All streams are over. We'll process the last key.
if (!has_next)
commitCurrent();
break;
}
else
{
current.next();
if (has_next && !(next < current))
{
queue.push(current);
break;
}
}
}
}
/// End of the stream.
if (output_blocks.empty())
{
if (blocks_fetched != blocks_output)
LOG_ERROR(log, "Logical error: CollapsingFinalBlockInputStream has output {} blocks instead of {}", blocks_output, blocks_fetched);
return Block();
}
MergingBlock * merging_block = output_blocks.back();
Block block = merging_block->block;
for (size_t i = 0; i < block.columns(); ++i)
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(merging_block->filter, -1);
output_blocks.pop_back();
delete merging_block;
++blocks_output;
if (block)
return block;
}
}
}

View File

@ -1,211 +0,0 @@
#pragma once
#include <common/logger_useful.h>
#include <DataStreams/IBlockInputStream.h>
#include <Core/SortDescription.h>
#include <Columns/ColumnsNumber.h>
#include <Common/typeid_cast.h>
#include <queue>
#include <cassert>
#include <boost/intrusive_ptr.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_TYPE_OF_FIELD;
}
/// Collapses the same rows with the opposite sign roughly like CollapsingSortedBlockInputStream.
/// Outputs the rows in random order (the input streams must still be ordered).
/// Outputs only rows with a positive sign.
class CollapsingFinalBlockInputStream : public IBlockInputStream
{
public:
CollapsingFinalBlockInputStream(
const BlockInputStreams & inputs,
const SortDescription & description_,
const String & sign_column_name_)
: description(description_), sign_column_name(sign_column_name_)
{
children.insert(children.end(), inputs.begin(), inputs.end());
}
~CollapsingFinalBlockInputStream() override;
String getName() const override { return "CollapsingFinal"; }
bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description; }
Block getHeader() const override { return children.at(0)->getHeader(); }
struct MergingBlock;
using BlockPlainPtrs = std::vector<MergingBlock*>;
struct MergingBlock : boost::noncopyable
{
MergingBlock(const Block & block_,
size_t stream_index_,
const SortDescription & desc,
const String & sign_column_name_,
BlockPlainPtrs * output_blocks_)
: block(block_), stream_index(stream_index_), output_blocks(output_blocks_)
{
sort_columns.resize(desc.size());
for (size_t i = 0; i < desc.size(); ++i)
{
size_t column_number = !desc[i].column_name.empty()
? block.getPositionByName(desc[i].column_name)
: desc[i].column_number;
sort_columns[i] = block.safeGetByPosition(column_number).column.get();
}
const IColumn * sign_icolumn = block.getByName(sign_column_name_).column.get();
sign_column = typeid_cast<const ColumnInt8 *>(sign_icolumn);
if (!sign_column)
throw Exception("Sign column must have type Int8", ErrorCodes::BAD_TYPE_OF_FIELD);
rows = sign_column->size();
/// Filled entirely with zeros. Then `1`s are set in the positions of the rows to be kept.
filter.resize_fill(rows);
}
Block block;
/// Rows with the same key will be sorted in ascending order of stream_index.
size_t stream_index;
size_t rows;
/// Which rows should be left. Filled when the threads merge.
IColumn::Filter filter;
/// Point to `block`.
ColumnRawPtrs sort_columns;
const ColumnInt8 * sign_column;
/// When it reaches zero, the block can be outputted in response.
int refcount = 0;
/// Where to put the block when it is ready to be outputted in response.
BlockPlainPtrs * output_blocks;
};
private:
Block readImpl() override;
/// When deleting the last block reference, adds a block to `output_blocks`.
using MergingBlockPtr = boost::intrusive_ptr<MergingBlock>;
struct Cursor
{
MergingBlockPtr block;
size_t pos = 0;
Cursor() {}
explicit Cursor(const MergingBlockPtr & block_, size_t pos_ = 0) : block(block_), pos(pos_) {}
bool operator< (const Cursor & rhs) const
{
for (size_t i = 0; i < block->sort_columns.size(); ++i)
{
int res = block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), 1);
if (res > 0)
return true;
if (res < 0)
return false;
}
return block->stream_index > rhs.block->stream_index;
}
/// Not consistent with operator< : does not consider order.
bool equal(const Cursor & rhs) const
{
if (!block || !rhs.block)
return false;
for (size_t i = 0; i < block->sort_columns.size(); ++i)
{
int res = block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), 1);
if (res != 0)
return false;
}
return true;
}
Int8 getSign()
{
return block->sign_column->getData()[pos];
}
/// Indicates that this row should be outputted in response.
void addToFilter()
{
block->filter[pos] = 1;
}
bool isLast()
{
return pos + 1 == block->rows;
}
void next()
{
++pos;
}
};
using Queue = std::priority_queue<Cursor>;
const SortDescription description;
String sign_column_name;
Poco::Logger * log = &Poco::Logger::get("CollapsingFinalBlockInputStream");
bool first = true;
BlockPlainPtrs output_blocks;
Queue queue;
Cursor previous; /// The current primary key.
Cursor last_positive; /// The last positive row for the current primary key.
size_t count_positive = 0; /// The number of positive rows for the current primary key.
size_t count_negative = 0; /// The number of negative rows for the current primary key.
bool last_is_positive = false; /// true if the last row for the current primary key is positive.
size_t count_incorrect_data = 0; /// To prevent too many error messages from writing to the log.
/// Count the number of blocks fetched and outputted.
size_t blocks_fetched = 0;
size_t blocks_output = 0;
void fetchNextBlock(size_t input_index);
void commitCurrent();
void reportBadCounts();
void reportBadSign(Int8 sign);
};
inline void intrusive_ptr_add_ref(CollapsingFinalBlockInputStream::MergingBlock * ptr)
{
++ptr->refcount;
}
inline void intrusive_ptr_release(CollapsingFinalBlockInputStream::MergingBlock * ptr)
{
if (0 == --ptr->refcount)
ptr->output_blocks->push_back(ptr);
}
}
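A hypothetical construction sketch (not part of this commit; it assumes `part1` and `part2` are streams already sorted by the primary key, and that SortColumnDescription can be built from a column name, direction and nulls direction):
SortDescription primary_key;
primary_key.emplace_back("key", /* direction = */ 1, /* nulls_direction = */ 1);
BlockInputStreamPtr collapsed = std::make_shared<CollapsingFinalBlockInputStream>(
    BlockInputStreams{part1, part2}, primary_key, "Sign");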

View File

@ -1,55 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
/** Combines several sources into one.
* Unlike UnionBlockInputStream, it does this sequentially.
* Blocks of different sources are not interleaved with each other.
*/
class ConcatBlockInputStream : public IBlockInputStream
{
public:
ConcatBlockInputStream(BlockInputStreams inputs_)
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
current_stream = children.begin();
}
String getName() const override { return "Concat"; }
Block getHeader() const override { return children.at(0)->getHeader(); }
/// We call readSuffix prematurely ourselves. Suppress the default behaviour.
void readSuffix() override {}
protected:
Block readImpl() override
{
Block res;
while (current_stream != children.end())
{
res = (*current_stream)->read();
if (res)
break;
else
{
(*current_stream)->readSuffix();
++current_stream;
}
}
return res;
}
private:
BlockInputStreams::iterator current_stream;
};
}
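A hypothetical usage sketch (not part of this commit; `first` and `second` are assumed to be existing streams with identical headers):
/// Blocks of `first` are returned until it is exhausted, then blocks of `second`.
BlockInputStreamPtr concatenated = std::make_shared<ConcatBlockInputStream>(BlockInputStreams{first, second});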

View File

@ -1,49 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Columns/ColumnLowCardinality.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Columns/ColumnConst.h>
namespace DB
{
/** Converts LowCardinality columns of each block to full (ordinary) columns,
* and replaces LowCardinality types in the header with their dictionary types.
*/
class ConvertColumnLowCardinalityToFullBlockInputStream : public IBlockInputStream
{
public:
explicit ConvertColumnLowCardinalityToFullBlockInputStream(const BlockInputStreamPtr & input)
{
children.push_back(input);
}
String getName() const override { return "ConvertColumnLowCardinalityToFull"; }
Block getHeader() const override { return convert(children.at(0)->getHeader()); }
protected:
Block readImpl() override { return convert(children.back()->read()); }
private:
Block convert(Block && block) const
{
for (auto & column : block)
{
if (auto * column_const = typeid_cast<const ColumnConst *>(column.column.get()))
column.column = column_const->removeLowCardinality();
else
column.column = column.column->convertToFullColumnIfLowCardinality();
if (auto * low_cardinality_type = typeid_cast<const DataTypeLowCardinality *>(column.type.get()))
column.type = low_cardinality_type->getDictionaryType();
}
return std::move(block);
}
};
}

View File

@ -1,93 +0,0 @@
#include <DataStreams/CubeBlockInputStream.h>
#include <DataStreams/finalizeBlock.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/FilterDescription.h>
#include <Common/typeid_cast.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_MANY_COLUMNS;
}
CubeBlockInputStream::CubeBlockInputStream(
const BlockInputStreamPtr & input_, const Aggregator::Params & params_) : aggregator(params_),
keys(params_.keys)
{
if (keys.size() > 30)
throw Exception("Too many columns for cube", ErrorCodes::TOO_MANY_COLUMNS);
children.push_back(input_);
Aggregator::CancellationHook hook = [this]() { return this->isCancelled(); };
aggregator.setCancellationHook(hook);
}
Block CubeBlockInputStream::getHeader() const
{
Block res = children.at(0)->getHeader();
finalizeBlock(res);
return res;
}
Block CubeBlockInputStream::readImpl()
{
/** After reading all blocks from input stream,
* we will calculate all subsets of columns on next iterations of readImpl
* by zeroing columns at positions, where bits are zero in current bitmask.
*/
if (!is_data_read)
{
BlocksList source_blocks;
while (auto block = children[0]->read())
source_blocks.push_back(block);
if (source_blocks.empty())
return {};
is_data_read = true;
mask = (1 << keys.size()) - 1;
if (source_blocks.size() > 1)
source_block = aggregator.mergeBlocks(source_blocks, false);
else
source_block = std::move(source_blocks.front());
zero_block = source_block.cloneEmpty();
for (auto key : keys)
{
auto & current = zero_block.getByPosition(key);
current.column = current.column->cloneResized(source_block.rows());
}
auto finalized = source_block;
finalizeBlock(finalized);
return finalized;
}
if (!mask)
return {};
--mask;
auto cube_block = source_block;
for (size_t i = 0; i < keys.size(); ++i)
{
if (!((mask >> i) & 1))
{
size_t pos = keys.size() - i - 1;
auto & current = cube_block.getByPosition(keys[pos]);
current.column = zero_block.getByPosition(keys[pos]).column;
}
}
BlocksList cube_blocks = { cube_block };
Block finalized = aggregator.mergeBlocks(cube_blocks, true);
return finalized;
}
}
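The bitmask enumeration above can be illustrated with a small standalone sketch (illustrative only, not part of this commit): the mask starts at (1 << num_keys) - 1 and counts down, and every zero bit means the corresponding key column is replaced by its zeroed copy.
#include <cstddef>
#include <cstdio>

int main()
{
    const std::size_t num_keys = 3;
    for (unsigned mask = (1u << num_keys) - 1; ; --mask)
    {
        for (std::size_t i = 0; i < num_keys; ++i)
            std::printf("%c", ((mask >> i) & 1) ? 'K' : '0');   /// K = key kept, 0 = key zeroed
        std::printf("\n");
        if (mask == 0)
            break;
    }
    return 0;
}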

View File

@ -1,42 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/Aggregator.h>
#include <Core/ColumnNumbers.h>
namespace DB
{
class ExpressionActions;
/** Takes blocks after grouping, with non-finalized aggregate functions.
* Calculates all subsets of columns and aggregates over them.
*/
class CubeBlockInputStream : public IBlockInputStream
{
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
using AggregateColumns = std::vector<ColumnRawPtrs>;
public:
CubeBlockInputStream(
const BlockInputStreamPtr & input_, const Aggregator::Params & params_);
String getName() const override { return "Cube"; }
Block getHeader() const override;
protected:
Block readImpl() override;
private:
Aggregator aggregator;
ColumnNumbers keys;
UInt32 mask = 0;
Block source_block;
Block zero_block;
bool is_data_read = false;
};
}

View File

@ -1,122 +0,0 @@
#include <DataStreams/DistinctBlockInputStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SET_SIZE_LIMIT_EXCEEDED;
}
DistinctBlockInputStream::DistinctBlockInputStream(const BlockInputStreamPtr & input, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns_)
: columns_names(columns_)
, limit_hint(limit_hint_)
, set_size_limits(set_size_limits_)
{
children.push_back(input);
}
Block DistinctBlockInputStream::readImpl()
{
/// Execute until the end of the stream or until
/// a block with some new records is obtained.
while (true)
{
if (no_more_rows)
return Block();
/// Stop reading if we already reach the limit.
if (limit_hint && data.getTotalRowCount() >= limit_hint)
return Block();
Block block = children[0]->read();
if (!block)
return Block();
const ColumnRawPtrs column_ptrs(getKeyColumns(block));
if (column_ptrs.empty())
{
/// Only constants. We need to return a single row.
no_more_rows = true;
for (auto & elem : block)
elem.column = elem.column->cut(0, 1);
return block;
}
if (data.empty())
data.init(SetVariants::chooseMethod(column_ptrs, key_sizes));
const size_t old_set_size = data.getTotalRowCount();
const size_t rows = block.rows();
IColumn::Filter filter(rows);
switch (data.type)
{
case SetVariants::Type::EMPTY:
break;
#define M(NAME) \
case SetVariants::Type::NAME: \
buildFilter(*data.NAME, column_ptrs, filter, rows, data); \
break;
APPLY_FOR_SET_VARIANTS(M)
#undef M
}
/// Just go to the next block if there isn't any new record in the current one.
if (data.getTotalRowCount() == old_set_size)
continue;
if (!set_size_limits.check(data.getTotalRowCount(), data.getTotalByteCount(), "DISTINCT", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED))
return {};
for (auto & elem : block)
elem.column = elem.column->filter(filter, -1);
return block;
}
}
template <typename Method>
void DistinctBlockInputStream::buildFilter(
Method & method,
const ColumnRawPtrs & columns,
IColumn::Filter & filter,
size_t rows,
SetVariants & variants) const
{
typename Method::State state(columns, key_sizes, nullptr);
for (size_t i = 0; i < rows; ++i)
{
auto emplace_result = state.emplaceKey(method.data, i, variants.string_pool);
/// Emit the record if there is no such key in the current set yet.
/// Skip it otherwise.
filter[i] = emplace_result.isInserted();
}
}
ColumnRawPtrs DistinctBlockInputStream::getKeyColumns(const Block & block) const
{
size_t columns = columns_names.empty() ? block.columns() : columns_names.size();
ColumnRawPtrs column_ptrs;
column_ptrs.reserve(columns);
for (size_t i = 0; i < columns; ++i)
{
const auto & column = columns_names.empty()
? block.safeGetByPosition(i).column
: block.getByName(columns_names[i]).column;
/// Ignore all constant columns.
if (!isColumnConst(*column))
column_ptrs.emplace_back(column.get());
}
return column_ptrs;
}
}
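The buildFilter step above can be illustrated with a simplified, self-contained sketch that uses std::unordered_set in place of SetVariants (names are illustrative, not part of this commit):
#include <cstddef>
#include <string>
#include <unordered_set>
#include <vector>

/// Keep only the first occurrence of each key: filter[i] == 1 means "emit row i".
std::vector<unsigned char> buildDistinctFilter(const std::vector<std::string> & keys,
                                               std::unordered_set<std::string> & seen)
{
    std::vector<unsigned char> filter(keys.size(), 0);
    for (std::size_t i = 0; i < keys.size(); ++i)
        filter[i] = seen.insert(keys[i]).second ? 1 : 0;
    return filter;
}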

View File

@ -1,52 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/SetVariants.h>
namespace DB
{
/** This class is intended for implementation of SELECT DISTINCT clause and
* leaves only unique rows in the stream.
*
* To optimize the SELECT DISTINCT ... LIMIT clause, we can
* set limit_hint to a non-zero value, so that we stop emitting new rows once
* the count of already emitted rows reaches limit_hint.
*/
class DistinctBlockInputStream : public IBlockInputStream
{
public:
/// Empty columns_ means all columns.
DistinctBlockInputStream(const BlockInputStreamPtr & input, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns_);
String getName() const override { return "Distinct"; }
Block getHeader() const override { return children.at(0)->getHeader(); }
protected:
Block readImpl() override;
private:
ColumnRawPtrs getKeyColumns(const Block & block) const;
template <typename Method>
void buildFilter(
Method & method,
const ColumnRawPtrs & key_columns,
IColumn::Filter & filter,
size_t rows,
SetVariants & variants) const;
Names columns_names;
SetVariants data;
Sizes key_sizes;
UInt64 limit_hint;
bool no_more_rows = false;
/// Restrictions on the maximum size of the output data.
SizeLimits set_size_limits;
};
}

View File

@ -21,7 +21,7 @@ namespace DB
class DistinctSortedBlockInputStream : public IBlockInputStream
{
public:
/// Empty columns_ means all collumns.
/// Empty columns_ means all columns.
DistinctSortedBlockInputStream(const BlockInputStreamPtr & input, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns);
String getName() const override { return "DistinctSorted"; }

View File

@ -1,186 +0,0 @@
#include <DataStreams/FillingBlockInputStream.h>
#include <Interpreters/convertFieldToType.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_WITH_FILL_EXPRESSION;
}
FillingBlockInputStream::FillingBlockInputStream(
const BlockInputStreamPtr & input, const SortDescription & sort_description_)
: sort_description(sort_description_), filling_row(sort_description_), next_row(sort_description_)
{
children.push_back(input);
header = children.at(0)->getHeader();
std::vector<bool> is_fill_column(header.columns());
for (const auto & elem : sort_description)
is_fill_column[header.getPositionByName(elem.column_name)] = true;
auto try_convert_fields = [](FillColumnDescription & descr, const DataTypePtr & type)
{
auto max_type = Field::Types::Null;
WhichDataType which(type);
DataTypePtr to_type;
if (isInteger(type) || which.isDateOrDateTime())
{
max_type = Field::Types::Int64;
to_type = std::make_shared<DataTypeInt64>();
}
else if (which.isFloat())
{
max_type = Field::Types::Float64;
to_type = std::make_shared<DataTypeFloat64>();
}
if (descr.fill_from.getType() > max_type || descr.fill_to.getType() > max_type
|| descr.fill_step.getType() > max_type)
return false;
descr.fill_from = convertFieldToType(descr.fill_from, *to_type);
descr.fill_to = convertFieldToType(descr.fill_to, *to_type);
descr.fill_step = convertFieldToType(descr.fill_step, *to_type);
return true;
};
for (size_t i = 0; i < header.columns(); ++i)
{
if (is_fill_column[i])
{
size_t pos = fill_column_positions.size();
auto & descr = filling_row.getFillDescription(pos);
auto type = header.getByPosition(i).type;
if (!try_convert_fields(descr, type))
throw Exception("Incompatible types of WITH FILL expression values with column type "
+ type->getName(), ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
if (type->isValueRepresentedByUnsignedInteger() &&
((!descr.fill_from.isNull() && less(descr.fill_from, Field{0}, 1)) ||
(!descr.fill_to.isNull() && less(descr.fill_to, Field{0}, 1))))
{
throw Exception("WITH FILL bound values cannot be negative for unsigned type "
+ type->getName(), ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
}
fill_column_positions.push_back(i);
}
else
other_column_positions.push_back(i);
}
}
Block FillingBlockInputStream::readImpl()
{
Columns old_fill_columns;
Columns old_other_columns;
MutableColumns res_fill_columns;
MutableColumns res_other_columns;
auto init_columns_by_positions = [](const Block & block, Columns & columns,
MutableColumns & mutable_columns, const Positions & positions)
{
for (size_t pos : positions)
{
auto column = block.getByPosition(pos).column;
columns.push_back(column);
mutable_columns.push_back(column->cloneEmpty()->assumeMutable());
}
};
auto block = children.back()->read();
if (!block)
{
init_columns_by_positions(header, old_fill_columns, res_fill_columns, fill_column_positions);
init_columns_by_positions(header, old_other_columns, res_other_columns, other_column_positions);
bool should_insert_first = next_row < filling_row;
bool generated = false;
for (size_t i = 0; i < filling_row.size(); ++i)
next_row[i] = filling_row.getFillDescription(i).fill_to;
if (should_insert_first && filling_row < next_row)
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
while (filling_row.next(next_row))
{
generated = true;
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
}
if (generated)
return createResultBlock(res_fill_columns, res_other_columns);
return block;
}
size_t rows = block.rows();
init_columns_by_positions(block, old_fill_columns, res_fill_columns, fill_column_positions);
init_columns_by_positions(block, old_other_columns, res_other_columns, other_column_positions);
if (first)
{
for (size_t i = 0; i < filling_row.size(); ++i)
{
auto current_value = (*old_fill_columns[i])[0];
const auto & fill_from = filling_row.getFillDescription(i).fill_from;
if (!fill_from.isNull() && !equals(current_value, fill_from))
{
filling_row.initFromDefaults(i);
if (less(fill_from, current_value, filling_row.getDirection(i)))
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
break;
}
filling_row[i] = current_value;
}
first = false;
}
for (size_t row_ind = 0; row_ind < rows; ++row_ind)
{
bool should_insert_first = next_row < filling_row;
for (size_t i = 0; i < filling_row.size(); ++i)
{
auto current_value = (*old_fill_columns[i])[row_ind];
const auto & fill_to = filling_row.getFillDescription(i).fill_to;
if (fill_to.isNull() || less(current_value, fill_to, filling_row.getDirection(i)))
next_row[i] = current_value;
else
next_row[i] = fill_to;
}
/// A case when, at the previous step, the row was initialized from default 'fill_from' values
/// and we probably need to insert it into the block.
if (should_insert_first && filling_row < next_row)
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
/// Insert generated filling row to block, while it is less than current row in block.
while (filling_row.next(next_row))
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
copyRowFromColumns(res_fill_columns, old_fill_columns, row_ind);
copyRowFromColumns(res_other_columns, old_other_columns, row_ind);
}
return createResultBlock(res_fill_columns, res_other_columns);
}
Block FillingBlockInputStream::createResultBlock(MutableColumns & fill_columns, MutableColumns & other_columns) const
{
MutableColumns result_columns(header.columns());
for (size_t i = 0; i < fill_columns.size(); ++i)
result_columns[fill_column_positions[i]] = std::move(fill_columns[i]);
for (size_t i = 0; i < other_columns.size(); ++i)
result_columns[other_column_positions[i]] = std::move(other_columns[i]);
return header.cloneWithColumns(std::move(result_columns));
}
}

View File

@ -1,39 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/FillingRow.h>
namespace DB
{
/** Implements the WITH FILL modifier of the ORDER BY clause.
* It fills gaps in the data stream with generated rows: values are produced for columns marked WITH FILL, and defaults are used in the other columns.
* Optionally FROM, TO and STEP values can be specified.
*/
class FillingBlockInputStream : public IBlockInputStream
{
public:
FillingBlockInputStream(const BlockInputStreamPtr & input, const SortDescription & sort_description_);
String getName() const override { return "Filling"; }
Block getHeader() const override { return header; }
protected:
Block readImpl() override;
private:
Block createResultBlock(MutableColumns & fill_columns, MutableColumns & other_columns) const;
const SortDescription sort_description; /// Contains only rows with WITH FILL.
FillingRow filling_row; /// Current row, which is used to fill gaps.
FillingRow next_row; /// Row to which we need to generate filling rows.
Block header;
using Positions = std::vector<size_t>;
Positions fill_column_positions;
Positions other_column_positions;
bool first = true;
};
}
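The gap-filling idea can be illustrated for a single ascending integer column with a simplified standalone sketch (illustrative only, not part of this commit; the real class handles several columns, FROM/TO bounds and both directions):
#include <cstdint>
#include <cstdio>
#include <vector>

int main()
{
    const std::vector<int64_t> input = {1, 2, 5, 7};   /// column sorted ASC, WITH FILL STEP 1
    const int64_t step = 1;
    int64_t next_expected = input.front();
    for (int64_t value : input)
    {
        /// Emit generated rows for every missing value before the current one.
        for (; next_expected < value; next_expected += step)
            std::printf("%lld (filled)\n", static_cast<long long>(next_expected));
        std::printf("%lld\n", static_cast<long long>(value));
        next_expected = value + step;
    }
    return 0;
}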

View File

@ -1,34 +0,0 @@
#include <DataStreams/FilterColumnsBlockInputStream.h>
namespace DB
{
Block FilterColumnsBlockInputStream::getHeader() const
{
Block block = children.back()->getHeader();
Block filtered;
for (const auto & it : columns_to_save)
if (throw_if_column_not_found || block.has(it))
filtered.insert(std::move(block.getByName(it)));
return filtered;
}
Block FilterColumnsBlockInputStream::readImpl()
{
Block block = children.back()->read();
if (!block)
return block;
Block filtered;
for (const auto & it : columns_to_save)
if (throw_if_column_not_found || block.has(it))
filtered.insert(std::move(block.getByName(it)));
return filtered;
}
}

View File

@ -1,37 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <iostream>
namespace DB
{
/// Removes columns other than columns_to_save_ from block,
/// and reorders columns as in columns_to_save_.
/// Functionality is similar to ExpressionBlockInputStream with ExpressionActions containing PROJECT action.
class FilterColumnsBlockInputStream : public IBlockInputStream
{
public:
FilterColumnsBlockInputStream(
const BlockInputStreamPtr & input, const Names & columns_to_save_, bool throw_if_column_not_found_)
: columns_to_save(columns_to_save_), throw_if_column_not_found(throw_if_column_not_found_)
{
children.push_back(input);
}
String getName() const override
{
return "FilterColumns";
}
Block getHeader() const override;
protected:
Block readImpl() override;
private:
Names columns_to_save;
bool throw_if_column_not_found;
};
}

View File

@ -1,164 +0,0 @@
#include <DataStreams/FinishSortingBlockInputStream.h>
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/processConstants.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static bool isPrefix(const SortDescription & pref_descr, const SortDescription & descr)
{
if (pref_descr.size() > descr.size())
return false;
for (size_t i = 0; i < pref_descr.size(); ++i)
if (pref_descr[i] != descr[i])
return false;
return true;
}
FinishSortingBlockInputStream::FinishSortingBlockInputStream(
const BlockInputStreamPtr & input, const SortDescription & description_sorted_,
const SortDescription & description_to_sort_,
size_t max_merged_block_size_, UInt64 limit_)
: description_sorted(description_sorted_), description_to_sort(description_to_sort_),
max_merged_block_size(max_merged_block_size_), limit(limit_)
{
if (!isPrefix(description_sorted, description_to_sort))
throw Exception("Can`t finish sorting. SortDescription of already sorted stream is not prefix of "
"SortDescription needed to sort", ErrorCodes::LOGICAL_ERROR);
children.push_back(input);
header = children.at(0)->getHeader();
removeConstantsFromSortDescription(header, description_to_sort);
}
struct Less
{
const ColumnsWithSortDescriptions & left_columns;
const ColumnsWithSortDescriptions & right_columns;
Less(const ColumnsWithSortDescriptions & left_columns_, const ColumnsWithSortDescriptions & right_columns_) :
left_columns(left_columns_), right_columns(right_columns_) {}
bool operator() (size_t a, size_t b) const
{
for (auto it = left_columns.begin(), jt = right_columns.begin(); it != left_columns.end(); ++it, ++jt)
{
int res = it->description.direction * it->column->compareAt(a, b, *jt->column, it->description.nulls_direction);
if (res < 0)
return true;
else if (res > 0)
return false;
}
return false;
}
};
Block FinishSortingBlockInputStream::readImpl()
{
if (limit && total_rows_processed >= limit)
return {};
Block res;
if (impl)
res = impl->read();
/// If res block is empty, we have finished sorting previous chunk of blocks.
if (!res)
{
if (end_of_stream)
return {};
blocks.clear();
if (tail_block)
blocks.push_back(std::move(tail_block));
while (true)
{
Block block = children.back()->read();
/// End of the input stream, but we can't return immediately; we need to merge the already-read blocks.
/// Check it later, when we get the end of stream from impl.
if (!block)
{
end_of_stream = true;
break;
}
// If there were only const columns in sort description, then there is no need to sort.
// Return the blocks as is.
if (description_to_sort.empty())
return block;
if (block.rows() == 0)
continue;
removeConstantsFromBlock(block);
/// Find the position of last already read key in current block.
if (!blocks.empty())
{
const Block & last_block = blocks.back();
auto last_columns = getColumnsWithSortDescription(last_block, description_sorted);
auto current_columns = getColumnsWithSortDescription(block, description_sorted);
Less less(last_columns, current_columns);
size_t size = block.rows();
IColumn::Permutation perm(size);
for (size_t i = 0; i < size; ++i)
perm[i] = i;
auto * it = std::upper_bound(perm.begin(), perm.end(), last_block.rows() - 1, less);
/// We need to save the tail of the block, because the next block may start with the same key as the tail,
/// and we should sort these rows in one chunk.
if (it != perm.end())
{
size_t tail_pos = it - perm.begin();
Block head_block = block.cloneEmpty();
tail_block = block.cloneEmpty();
for (size_t i = 0; i < block.columns(); ++i)
{
head_block.getByPosition(i).column = block.getByPosition(i).column->cut(0, tail_pos);
tail_block.getByPosition(i).column = block.getByPosition(i).column->cut(tail_pos, block.rows() - tail_pos);
}
if (head_block.rows())
blocks.push_back(head_block);
break;
}
}
/// If we reach here, that means that current block is first in chunk
/// or it all consists of rows with the same key as tail of a previous block.
blocks.push_back(block);
}
if (!blocks.empty())
{
impl = std::make_unique<MergeSortingBlocksBlockInputStream>(blocks, description_to_sort, max_merged_block_size, limit);
res = impl->read();
}
}
if (res)
enrichBlockWithConstants(res, header);
total_rows_processed += res.rows();
return res;
}
}

View File

@ -1,51 +0,0 @@
#pragma once
#include <Core/SortDescription.h>
#include <Interpreters/sortBlock.h>
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
/** Takes a stream already sorted by `x` and finishes sorting it by (`x`, `y`).
* During sorting, only blocks with rows that are equal by `x` are kept in RAM.
* */
class FinishSortingBlockInputStream : public IBlockInputStream
{
public:
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
FinishSortingBlockInputStream(const BlockInputStreamPtr & input, const SortDescription & description_sorted_,
const SortDescription & description_to_sort_,
size_t max_merged_block_size_, UInt64 limit_);
String getName() const override { return "FinishSorting"; }
bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description_to_sort; }
Block getHeader() const override { return header; }
protected:
Block readImpl() override;
private:
SortDescription description_sorted;
SortDescription description_to_sort;
size_t max_merged_block_size;
UInt64 limit;
Block tail_block;
Blocks blocks;
std::unique_ptr<IBlockInputStream> impl;
/// Before operation, will remove constant columns from blocks. And after, place constant columns back.
/// (to avoid excessive virtual function calls)
/// Save original block structure here.
Block header;
bool end_of_stream = false;
size_t total_rows_processed = 0;
};
}

View File

@ -1,82 +0,0 @@
#include <DataStreams/LimitByBlockInputStream.h>
#include <Common/PODArray.h>
#include <Common/SipHash.h>
namespace DB
{
LimitByBlockInputStream::LimitByBlockInputStream(const BlockInputStreamPtr & input,
size_t group_length_, size_t group_offset_, const Names & columns)
: columns_names(columns)
, group_length(group_length_)
, group_offset(group_offset_)
{
children.push_back(input);
}
Block LimitByBlockInputStream::readImpl()
{
/// Execute until the end of the stream or until
/// a block with some new records is obtained.
while (true)
{
Block block = children[0]->read();
if (!block)
return Block();
const ColumnRawPtrs column_ptrs(getKeyColumns(block));
const size_t rows = block.rows();
IColumn::Filter filter(rows);
size_t inserted_count = 0;
for (size_t i = 0; i < rows; ++i)
{
UInt128 key;
SipHash hash;
for (const auto & column : column_ptrs)
column->updateHashWithValue(i, hash);
hash.get128(key.low, key.high);
auto count = keys_counts[key]++;
if (count >= group_offset && count < group_length + group_offset)
{
inserted_count++;
filter[i] = 1;
}
else
filter[i] = 0;
}
/// Just go to the next block if there aren't any new records in the current one.
if (!inserted_count)
continue;
size_t all_columns = block.columns();
for (size_t i = 0; i < all_columns; ++i)
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(filter, inserted_count);
return block;
}
}
ColumnRawPtrs LimitByBlockInputStream::getKeyColumns(Block & block) const
{
ColumnRawPtrs column_ptrs;
column_ptrs.reserve(columns_names.size());
for (const auto & name : columns_names)
{
auto & column = block.getByName(name).column;
/// Ignore all constant columns.
if (!isColumnConst(*column))
column_ptrs.emplace_back(column.get());
}
return column_ptrs;
}
}

View File

@ -1,42 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Common/HashTable/HashMap.h>
#include <Common/UInt128.h>
namespace DB
{
/** Implements the LIMIT BY clause, which can be used to obtain a "top N by subgroup".
*
* For example, if you have table T like this (Num: 1 1 3 3 3 4 4 5 7 7 7 7),
* the query SELECT Num FROM T LIMIT 2 BY Num
* will give you the following result: (Num: 1 1 3 3 4 4 5 7 7).
*/
class LimitByBlockInputStream : public IBlockInputStream
{
public:
LimitByBlockInputStream(const BlockInputStreamPtr & input, size_t group_length_, size_t group_offset_, const Names & columns);
String getName() const override { return "LimitBy"; }
Block getHeader() const override { return children.at(0)->getHeader(); }
protected:
Block readImpl() override;
private:
ColumnRawPtrs getKeyColumns(Block & block) const;
private:
using MapHashed = HashMap<UInt128, UInt64, UInt128TrivialHash>;
const Names columns_names;
const size_t group_length;
const size_t group_offset;
MapHashed keys_counts;
};
}
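The per-group counting in readImpl can be illustrated with a simplified sketch that keys on a string and uses std::unordered_map instead of the SipHash-derived UInt128 (names are illustrative, not part of this commit):
#include <cstddef>
#include <string>
#include <unordered_map>
#include <vector>

/// Keep row i only if its ordinal within its group falls in [group_offset, group_offset + group_length).
std::vector<unsigned char> buildLimitByFilter(const std::vector<std::string> & keys,
                                              std::size_t group_length, std::size_t group_offset,
                                              std::unordered_map<std::string, std::size_t> & counts)
{
    std::vector<unsigned char> filter(keys.size(), 0);
    for (std::size_t i = 0; i < keys.size(); ++i)
    {
        const std::size_t seen = counts[keys[i]]++;
        filter[i] = (seen >= group_offset && seen < group_offset + group_length) ? 1 : 0;
    }
    return filter;
}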

View File

@ -1,277 +0,0 @@
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/TemporaryFileStream.h>
#include <DataStreams/processConstants.h>
#include <Common/formatReadable.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Interpreters/sortBlock.h>
#include <Disks/StoragePolicy.h>
namespace ProfileEvents
{
extern const Event ExternalSortWritePart;
extern const Event ExternalSortMerge;
}
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_ENOUGH_SPACE;
}
MergeSortingBlockInputStream::MergeSortingBlockInputStream(
const BlockInputStreamPtr & input, SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_, size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, VolumePtr tmp_volume_, const String & codec_, size_t min_free_disk_space_)
: description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
max_bytes_before_remerge(max_bytes_before_remerge_),
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_volume(tmp_volume_),
codec(codec_),
min_free_disk_space(min_free_disk_space_)
{
children.push_back(input);
header = children.at(0)->getHeader();
header_without_constants = header;
removeConstantsFromBlock(header_without_constants);
removeConstantsFromSortDescription(header, description);
}
Block MergeSortingBlockInputStream::readImpl()
{
/** Algorithm:
* - read blocks from the source stream into memory;
* - if there are too many of them and if external sorting is enabled,
* - merge all in-memory blocks into a sorted stream and write it to a temporary file;
* - at the end, merge all sorted streams from the temporary files and also from the rest of the blocks in memory.
*/
/// If has not read source blocks.
if (!impl)
{
while (Block block = children.back()->read())
{
/// If there were only const columns in sort description, then there is no need to sort.
/// Return the blocks as is.
if (description.empty())
return block;
removeConstantsFromBlock(block);
blocks.push_back(block);
sum_rows_in_blocks += block.rows();
sum_bytes_in_blocks += block.allocatedBytes();
/** If significant amount of data was accumulated, perform preliminary merging step.
*/
if (blocks.size() > 1
&& limit
&& limit * 2 < sum_rows_in_blocks /// 2 is just a guess.
&& remerge_is_useful
&& max_bytes_before_remerge
&& sum_bytes_in_blocks > max_bytes_before_remerge)
{
remerge();
}
/** If there are too many of them and if external sorting is enabled,
* merge the blocks that we have in memory at this moment and write the merged stream to a temporary (compressed) file.
* NOTE. It's possible to check the free space in the filesystem.
*/
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
{
size_t size = sum_bytes_in_blocks + min_free_disk_space;
auto reservation = tmp_volume->reserve(size);
if (!reservation)
throw Exception("Not enough space for external sort in temporary storage", ErrorCodes::NOT_ENOUGH_SPACE);
const std::string tmp_path(reservation->getDisk()->getPath());
temporary_files.emplace_back(createTemporaryFile(tmp_path));
const std::string & path = temporary_files.back()->path();
MergeSortingBlocksBlockInputStream block_in(blocks, description, max_merged_block_size, limit);
LOG_INFO(log, "Sorting and writing part of data into temporary file {}", path);
ProfileEvents::increment(ProfileEvents::ExternalSortWritePart);
TemporaryFileStream::write(path, header_without_constants, block_in, &is_cancelled, codec); /// NOTE. Possibly limit disk usage.
LOG_INFO(log, "Done writing part of data into temporary file {}", path);
blocks.clear();
sum_bytes_in_blocks = 0;
sum_rows_in_blocks = 0;
}
}
if ((blocks.empty() && temporary_files.empty()) || isCancelledOrThrowIfKilled())
return Block();
if (temporary_files.empty())
{
impl = std::make_unique<MergeSortingBlocksBlockInputStream>(blocks, description, max_merged_block_size, limit);
}
else
{
/// If there were temporary files.
ProfileEvents::increment(ProfileEvents::ExternalSortMerge);
LOG_INFO(log, "There are {} temporary sorted parts to merge.", temporary_files.size());
/// Create sorted streams to merge.
for (const auto & file : temporary_files)
{
temporary_inputs.emplace_back(std::make_unique<TemporaryFileStream>(file->path(), header_without_constants));
inputs_to_merge.emplace_back(temporary_inputs.back()->block_in);
}
/// Rest of blocks in memory.
if (!blocks.empty())
inputs_to_merge.emplace_back(std::make_shared<MergeSortingBlocksBlockInputStream>(blocks, description, max_merged_block_size, limit));
/// Will merge those sorted streams.
impl = std::make_unique<MergingSortedBlockInputStream>(inputs_to_merge, description, max_merged_block_size, limit);
}
}
Block res = impl->read();
if (res)
enrichBlockWithConstants(res, header);
return res;
}
MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream(
Blocks & blocks_, const SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_)
: blocks(blocks_), header(blocks.at(0).cloneEmpty()), description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
{
Blocks nonempty_blocks;
for (const auto & block : blocks)
{
if (block.rows() == 0)
continue;
nonempty_blocks.push_back(block);
cursors.emplace_back(block, description);
has_collation |= cursors.back().has_collation;
}
blocks.swap(nonempty_blocks);
if (has_collation)
queue_with_collation = SortingHeap<SortCursorWithCollation>(cursors);
else if (description.size() > 1)
queue_without_collation = SortingHeap<SortCursor>(cursors);
else
queue_simple = SortingHeap<SimpleSortCursor>(cursors);
}
Block MergeSortingBlocksBlockInputStream::readImpl()
{
if (blocks.empty())
return Block();
if (blocks.size() == 1)
{
Block res = blocks[0];
blocks.clear();
return res;
}
if (has_collation)
return mergeImpl(queue_with_collation);
else if (description.size() > 1)
return mergeImpl(queue_without_collation);
else
return mergeImpl(queue_simple);
}
template <typename TSortingHeap>
Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue)
{
size_t num_columns = header.columns();
MutableColumns merged_columns = header.cloneEmptyColumns();
/// Reserve
if (queue.isValid() && !blocks.empty())
{
/// The expected size of output block is the same as input block
size_t size_to_reserve = blocks[0].rows();
for (auto & column : merged_columns)
column->reserve(size_to_reserve);
}
/// TODO: Optimization when a single block left.
/// Take rows from queue in right order and push to 'merged'.
size_t merged_rows = 0;
while (queue.isValid())
{
auto current = queue.current();
/// Append a row from queue.
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
++total_merged_rows;
++merged_rows;
/// We don't need more rows because the limit has been reached.
if (limit && total_merged_rows == limit)
{
blocks.clear();
break;
}
queue.next();
/// It's enough for the current output block, but we will continue.
if (merged_rows == max_merged_block_size)
break;
}
if (!queue.isValid())
blocks.clear();
if (merged_rows == 0)
return {};
return header.cloneWithColumns(std::move(merged_columns));
}
void MergeSortingBlockInputStream::remerge()
{
LOG_DEBUG(log, "Re-merging intermediate ORDER BY data ({} blocks with {} rows) to save memory consumption", blocks.size(), sum_rows_in_blocks);
/// NOTE Maybe concat all blocks and partial sort will be faster than merge?
MergeSortingBlocksBlockInputStream merger(blocks, description, max_merged_block_size, limit);
Blocks new_blocks;
size_t new_sum_rows_in_blocks = 0;
size_t new_sum_bytes_in_blocks = 0;
merger.readPrefix();
while (Block block = merger.read())
{
new_sum_rows_in_blocks += block.rows();
new_sum_bytes_in_blocks += block.allocatedBytes();
new_blocks.emplace_back(std::move(block));
}
merger.readSuffix();
LOG_DEBUG(log, "Memory usage is lowered from {} to {}", ReadableSize(sum_bytes_in_blocks), ReadableSize(new_sum_bytes_in_blocks));
/// If the memory consumption was not lowered enough - we will not perform remerge anymore. 2 is a guess.
if (new_sum_bytes_in_blocks * 2 > sum_bytes_in_blocks)
remerge_is_useful = false;
blocks = std::move(new_blocks);
sum_rows_in_blocks = new_sum_rows_in_blocks;
sum_bytes_in_blocks = new_sum_bytes_in_blocks;
}
}

View File

@ -1,131 +0,0 @@
#pragma once
#include <common/logger_useful.h>
#include <Common/filesystemHelpers.h>
#include <Core/SortDescription.h>
#include <Core/SortCursor.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
namespace DB
{
struct TemporaryFileStream;
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
namespace ErrorCodes
{
}
/** Merges a stream of individually sorted blocks into a stream of blocks that is sorted as a whole.
* If there is too much data to sort, external sorting with temporary files can be used.
*/
/** Part of the implementation. Merges an array of ready (already read from somewhere) blocks.
* Returns the result of the merge as a stream of blocks, with no more than 'max_merged_block_size' rows in each.
*/
class MergeSortingBlocksBlockInputStream : public IBlockInputStream
{
public:
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
MergeSortingBlocksBlockInputStream(Blocks & blocks_, const SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_ = 0);
String getName() const override { return "MergeSortingBlocks"; }
bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description; }
Block getHeader() const override { return header; }
protected:
Block readImpl() override;
private:
Blocks & blocks;
Block header;
SortDescription description;
size_t max_merged_block_size;
UInt64 limit;
size_t total_merged_rows = 0;
SortCursorImpls cursors;
bool has_collation = false;
SortingHeap<SortCursor> queue_without_collation;
SortingHeap<SimpleSortCursor> queue_simple;
SortingHeap<SortCursorWithCollation> queue_with_collation;
/** Two different cursors are supported - with and without Collation.
* Templates are used (instead of virtual functions in SortCursor) for zero-overhead.
*/
template <typename TSortingHeap>
Block mergeImpl(TSortingHeap & queue);
};
class MergeSortingBlockInputStream : public IBlockInputStream
{
public:
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_,
size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, VolumePtr tmp_volume_,
const String & codec_,
size_t min_free_disk_space_);
String getName() const override { return "MergeSorting"; }
bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description; }
Block getHeader() const override { return header; }
protected:
Block readImpl() override;
private:
SortDescription description;
size_t max_merged_block_size;
UInt64 limit;
size_t max_bytes_before_remerge;
size_t max_bytes_before_external_sort;
VolumePtr tmp_volume;
String codec;
size_t min_free_disk_space;
Poco::Logger * log = &Poco::Logger::get("MergeSortingBlockInputStream");
Blocks blocks;
size_t sum_rows_in_blocks = 0;
size_t sum_bytes_in_blocks = 0;
std::unique_ptr<IBlockInputStream> impl;
/// Before operation, will remove constant columns from blocks. And after, place constant columns back.
/// (to avoid excessive virtual function calls and because constants cannot be serialized in Native format for temporary files)
/// Save original block structure here.
Block header;
Block header_without_constants;
/// Everything below is for external sorting.
std::vector<std::unique_ptr<TemporaryFile>> temporary_files;
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
BlockInputStreams inputs_to_merge;
/// Merge all accumulated blocks to keep no more than limit rows.
void remerge();
/// If remerge doesn't save memory at least several times, mark it as useless and don't do it anymore.
bool remerge_is_useful = true;
};
}
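A hypothetical sketch of the in-memory variant (not part of this commit; it assumes `blocks` is a Blocks container in which each block is already sorted by the description, and that SortColumnDescription can be built from a column name, direction and nulls direction):
SortDescription order;
order.emplace_back("value", /* direction = */ 1, /* nulls_direction = */ 1);
MergeSortingBlocksBlockInputStream sorter(blocks, order, /* max_merged_block_size_ = */ 65536);
sorter.readPrefix();
while (Block sorted = sorter.read())
{
    /// Blocks merged in sorted order, at most 65536 rows each.
}
sorter.readSuffix();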

View File

@ -1,41 +0,0 @@
#include <Columns/ColumnsNumber.h>
#include <DataStreams/MergingAggregatedBlockInputStream.h>
namespace DB
{
Block MergingAggregatedBlockInputStream::getHeader() const
{
return aggregator.getHeader(final);
}
Block MergingAggregatedBlockInputStream::readImpl()
{
if (!executed)
{
executed = true;
AggregatedDataVariants data_variants;
Aggregator::CancellationHook hook = [&]() { return this->isCancelled(); };
aggregator.setCancellationHook(hook);
aggregator.mergeStream(children.back(), data_variants, max_threads);
blocks = aggregator.convertToBlocks(data_variants, final, max_threads);
it = blocks.begin();
}
Block res;
if (isCancelledOrThrowIfKilled() || it == blocks.end())
return res;
res = std::move(*it);
++it;
return res;
}
}

View File

@ -1,40 +0,0 @@
#pragma once
#include <Interpreters/Aggregator.h>
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
/** Merges a stream of pre-aggregated blocks, in which each block is already aggregated.
* Aggregate functions in blocks should not be finalized so that their states can be merged.
*/
class MergingAggregatedBlockInputStream : public IBlockInputStream
{
public:
MergingAggregatedBlockInputStream(const BlockInputStreamPtr & input, const Aggregator::Params & params, bool final_, size_t max_threads_)
: aggregator(params), final(final_), max_threads(max_threads_)
{
children.push_back(input);
}
String getName() const override { return "MergingAggregated"; }
Block getHeader() const override;
protected:
Block readImpl() override;
private:
Aggregator aggregator;
bool final;
size_t max_threads;
bool executed = false;
BlocksList blocks;
BlocksList::iterator it;
};
}
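A hypothetical usage sketch (not part of this commit; it assumes `partially_aggregated` is a stream of blocks with non-finalized aggregate states and `params` is an Aggregator::Params):
BlockInputStreamPtr merged = std::make_shared<MergingAggregatedBlockInputStream>(
    partially_aggregated, params, /* final_ = */ true, /* max_threads_ = */ 4);
merged->readPrefix();
while (Block block = merged->read())
{
    /// Blocks with merged and finalized aggregate values.
}
merged->readSuffix();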

View File

@ -1,626 +0,0 @@
#include <future>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
#include <Common/CurrentThread.h>
namespace CurrentMetrics
{
extern const Metric QueryThread;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/** Scheme of operation:
*
* We have to output blocks in specific order: by bucket number:
*
* o o o o ... o
* 0 1 2 3 255
*
* Each block is the result of merge of blocks with same bucket number from several sources:
*
* src1 o o ...
* | |
* src2 o o
*
* | |
* v v
*
* result o o
* 0 1
*
* (we must merge 0th block from src1 with 0th block from src2 to form 0th result block and so on)
*
* We may read (request over network) blocks from different sources in parallel.
* It is done by getNextBlocksToMerge method. Number of threads is 'reading_threads'.
*
* Also, we may do merges for different buckets in parallel.
* For example, we may
* merge the 1st block from src1 with the 1st block from src2 in one thread
* and merge the 2nd block from src1 with the 2nd block from src2 in another thread.
* The number of threads is 'merging_threads'.
* And we must keep only 'merging_threads' buckets of blocks in memory simultaneously,
* because our goal is to limit memory usage: not to keep all result in memory, but return it in streaming form.
*
* So, we return result sequentially, but perform calculations of resulting blocks in parallel.
* (a calculation is the merge of the source blocks for the same bucket)
*
* Example:
*
* src1 . . o o . . .
* | |
* src2 o o
*
* | |
* v v
*
* result . . o o . . .
*
* In this picture, we do only two merges in parallel.
* When a merge is done, method 'getNextBlocksToMerge' is called to get blocks from sources for next bucket.
* Then next merge is performed.
*
* The main ('readImpl') method waits for the merged block of the next bucket and returns it.
*/
MergingAggregatedMemoryEfficientBlockInputStream::MergingAggregatedMemoryEfficientBlockInputStream(
BlockInputStreams inputs_, const Aggregator::Params & params, bool final_, size_t reading_threads_, size_t merging_threads_)
: aggregator(params), final(final_),
reading_threads(std::min(reading_threads_, inputs_.size())), merging_threads(merging_threads_),
inputs(inputs_.begin(), inputs_.end())
{
children = inputs_;
/** Create threads that will request and read data from remote servers.
*/
if (reading_threads > 1)
reading_pool = std::make_unique<ThreadPool>(reading_threads);
    /** Create threads. Each of them will pull the next set of blocks to merge in a loop,
      * then merge them and place the result in a queue (in fact, an ordered map), from where we will read the ready result blocks.
*/
if (merging_threads > 1)
parallel_merge_data = std::make_unique<ParallelMergeData>(merging_threads);
}
Block MergingAggregatedMemoryEfficientBlockInputStream::getHeader() const
{
return aggregator.getHeader(final);
}
void MergingAggregatedMemoryEfficientBlockInputStream::readPrefix()
{
start();
}
void MergingAggregatedMemoryEfficientBlockInputStream::readSuffix()
{
if (!all_read && !isCancelled())
throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR);
finalize();
for (auto & child : children)
child->readSuffix();
}
void MergingAggregatedMemoryEfficientBlockInputStream::cancel(bool kill)
{
if (kill)
is_killed = true;
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true))
return;
if (parallel_merge_data)
{
{
std::unique_lock lock(parallel_merge_data->merged_blocks_mutex);
parallel_merge_data->finish = true;
}
parallel_merge_data->merged_blocks_changed.notify_one(); /// readImpl method must stop waiting and exit.
parallel_merge_data->have_space.notify_all(); /// Merging threads must stop waiting and exit.
}
for (auto & input : inputs)
{
try
{
input.stream->cancel(kill);
}
catch (...)
{
            /** If we failed to ask one or more sources to stop processing
              * (for example, the connection was reset during distributed query execution),
              * then we don't care.
              */
LOG_ERROR(log, "Exception while cancelling {}", input.stream->getName());
}
}
}
void MergingAggregatedMemoryEfficientBlockInputStream::start()
{
if (started)
return;
started = true;
/// If child is RemoteBlockInputStream, then child->readPrefix() will send query to remote server, initiating calculations.
if (reading_threads == 1)
{
for (auto & child : children)
child->readPrefix();
}
else
{
size_t num_children = children.size();
try
{
for (size_t i = 0; i < num_children; ++i)
{
auto & child = children[i];
auto thread_group = CurrentThread::getGroup();
reading_pool->scheduleOrThrowOnError([&child, thread_group]
{
setThreadName("MergeAggReadThr");
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
child->readPrefix();
});
}
}
catch (...)
{
reading_pool->wait();
throw;
}
reading_pool->wait();
}
if (merging_threads > 1)
{
auto & pool = parallel_merge_data->pool;
/** Create threads that will receive and merge blocks.
*/
for (size_t i = 0; i < merging_threads; ++i)
pool.scheduleOrThrowOnError([this, thread_group = CurrentThread::getGroup()]() { mergeThread(thread_group); });
}
}
Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl()
{
start();
if (!parallel_merge_data)
{
if (BlocksToMerge blocks_to_merge = getNextBlocksToMerge())
return aggregator.mergeBlocks(*blocks_to_merge, final);
return {};
}
else
{
Block res;
while (true)
{
std::unique_lock lock(parallel_merge_data->merged_blocks_mutex);
parallel_merge_data->merged_blocks_changed.wait(lock, [this]
{
return parallel_merge_data->finish /// Requested to finish early.
|| parallel_merge_data->exception /// An error in merging thread.
|| parallel_merge_data->exhausted /// No more data in sources.
|| !parallel_merge_data->merged_blocks.empty(); /// Have another merged block.
});
if (parallel_merge_data->exception)
std::rethrow_exception(parallel_merge_data->exception);
if (parallel_merge_data->finish)
break;
bool have_merged_block_or_merging_in_progress = !parallel_merge_data->merged_blocks.empty();
if (parallel_merge_data->exhausted && !have_merged_block_or_merging_in_progress)
break;
if (have_merged_block_or_merging_in_progress)
{
auto it = parallel_merge_data->merged_blocks.begin();
if (it->second)
{
res.swap(it->second);
parallel_merge_data->merged_blocks.erase(it);
lock.unlock();
parallel_merge_data->have_space.notify_one(); /// We consumed block. Merging thread may merge next block for us.
break;
}
}
}
if (!res)
all_read = true;
return res;
}
}
MergingAggregatedMemoryEfficientBlockInputStream::~MergingAggregatedMemoryEfficientBlockInputStream()
{
try
{
if (!all_read)
cancel(false);
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void MergingAggregatedMemoryEfficientBlockInputStream::finalize()
{
if (!started)
return;
LOG_TRACE(log, "Waiting for threads to finish");
if (parallel_merge_data)
parallel_merge_data->pool.wait();
LOG_TRACE(log, "Waited for threads to finish");
}
void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(ThreadGroupStatusPtr thread_group)
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
try
{
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
setThreadName("MergeAggMergThr");
while (!parallel_merge_data->finish)
{
            /** Receiving the next blocks is done in one thread pool, and the merge is done in another.
              * This is a rather complex interaction.
              * Each time:
              * - 'reading_threads' read one next block from each source;
              * - a group of blocks to merge is created from them;
              * - one of 'merging_threads' merges this group of blocks;
              */
BlocksToMerge blocks_to_merge;
int output_order = -1;
            /** Synchronously:
              * - fetch the next blocks from the sources,
              *  wait for space in 'merged_blocks'
              *  and reserve a place in 'merged_blocks' where the merged result will go;
              * - or, if there are no next blocks, set the 'exhausted' flag.
              */
{
std::lock_guard lock_next_blocks(parallel_merge_data->get_next_blocks_mutex);
if (parallel_merge_data->exhausted || parallel_merge_data->finish)
break;
blocks_to_merge = getNextBlocksToMerge();
if (!blocks_to_merge || blocks_to_merge->empty())
{
{
std::unique_lock lock_merged_blocks(parallel_merge_data->merged_blocks_mutex);
parallel_merge_data->exhausted = true;
}
                    /// No new blocks have been read from the sources. (But perhaps some previous block is still being prepared in another mergeThread.)
parallel_merge_data->merged_blocks_changed.notify_one();
break;
}
output_order = blocks_to_merge->front().info.is_overflows
? NUM_BUCKETS /// "Overflow" blocks returned by 'getNextBlocksToMerge' after all other blocks.
: blocks_to_merge->front().info.bucket_num;
{
std::unique_lock lock_merged_blocks(parallel_merge_data->merged_blocks_mutex);
parallel_merge_data->have_space.wait(lock_merged_blocks, [this]
{
return parallel_merge_data->merged_blocks.size() < merging_threads
|| parallel_merge_data->finish;
});
if (parallel_merge_data->finish)
break;
                    /** Place an empty block. It is a promise to do the merge and fill it.
                      * The main thread knows that there will be a result for the 'output_order' place.
                      * The main thread must return results exactly in 'output_order', so this is important.
                      */
parallel_merge_data->merged_blocks[output_order]; //-V607
}
}
/// At this point, several merge threads may work in parallel.
Block res = aggregator.mergeBlocks(*blocks_to_merge, final);
{
std::lock_guard lock(parallel_merge_data->merged_blocks_mutex);
if (parallel_merge_data->finish)
break;
parallel_merge_data->merged_blocks[output_order] = res;
}
/// Notify that we have another merged block.
parallel_merge_data->merged_blocks_changed.notify_one();
}
}
catch (...)
{
{
std::lock_guard lock(parallel_merge_data->merged_blocks_mutex);
parallel_merge_data->exception = std::current_exception();
parallel_merge_data->finish = true;
}
parallel_merge_data->merged_blocks_changed.notify_one();
parallel_merge_data->have_space.notify_all();
}
}
MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregatedMemoryEfficientBlockInputStream::getNextBlocksToMerge()
{
/** There are several input sources.
* From each of them, data may be received in one of following forms:
*
      * 1. A block with a specified 'bucket_num'.
      * It means that on the remote server the data was partitioned by buckets,
      * and data for each 'bucket_num' from different servers may be merged independently,
      * because data in different buckets will contain different aggregation keys.
      * Data for different 'bucket_num's will be received in increasing order of 'bucket_num'.
      *
      * 2. A block without a specified 'bucket_num'.
      * It means that on the remote server the data was not partitioned by buckets.
      * If all servers send non-partitioned data, we may just merge it.
      * But if some servers send partitioned data,
      * then we must first partition the non-partitioned data, and then merge the data in each partition.
      *
      * 3. Blocks with 'is_overflows' = true.
      * This is additional data for rows that did not pass the 'max_rows_to_group_by' threshold.
      * It must be merged together, independently of the ordinary data.
*/
++current_bucket_num;
/// Read from source next block with bucket number not greater than 'current_bucket_num'.
auto need_that_input = [this] (Input & input)
{
return !input.is_exhausted
&& input.block.info.bucket_num < current_bucket_num;
};
auto read_from_input = [this] (Input & input)
{
        /// If a block with 'overflows' (not ordinary data) is received, remember that block and repeat.
while (true)
{
// std::cerr << "reading block\n";
Block block = input.stream->read();
if (!block)
{
// std::cerr << "input is exhausted\n";
input.is_exhausted = true;
break;
}
if (block.info.bucket_num != -1)
{
/// One of partitioned blocks for two-level data.
// std::cerr << "block for bucket " << block.info.bucket_num << "\n";
has_two_level = true;
input.block = block;
}
else if (block.info.is_overflows)
{
// std::cerr << "block for overflows\n";
has_overflows = true;
input.overflow_block = block;
continue;
}
else
{
/// Block for non-partitioned (single-level) data.
// std::cerr << "block without bucket\n";
input.block = block;
}
break;
}
};
if (reading_threads == 1)
{
for (auto & input : inputs)
if (need_that_input(input))
read_from_input(input);
}
else
{
try
{
for (auto & input : inputs)
{
if (need_that_input(input))
{
auto thread_group = CurrentThread::getGroup();
reading_pool->scheduleOrThrowOnError([&input, &read_from_input, thread_group]
{
setThreadName("MergeAggReadThr");
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
read_from_input(input);
});
}
}
}
catch (...)
{
reading_pool->wait();
throw;
}
reading_pool->wait();
}
while (true)
{
if (current_bucket_num >= NUM_BUCKETS)
{
/// All ordinary data was processed. Maybe, there are also 'overflows'-blocks.
// std::cerr << "at end\n";
if (has_overflows)
{
// std::cerr << "merging overflows\n";
has_overflows = false;
BlocksToMerge blocks_to_merge = std::make_unique<BlocksList>();
for (auto & input : inputs)
if (input.overflow_block)
blocks_to_merge->emplace_back(std::move(input.overflow_block));
return blocks_to_merge;
}
else
return {};
}
else if (has_two_level)
{
            /** We have two-level (partitioned) data.
              * We will process it by bucket numbers in increasing order.
              * Find the minimum bucket number for which there is data
              * - this will be the data to merge.
*/
// std::cerr << "has two level\n";
int min_bucket_num = NUM_BUCKETS;
for (auto & input : inputs)
{
/// Blocks for already partitioned (two-level) data.
if (input.block.info.bucket_num != -1 && input.block.info.bucket_num < min_bucket_num)
min_bucket_num = input.block.info.bucket_num;
                /// A block that is not yet partitioned (split into buckets). Will partition it and place the result into 'splitted_blocks'.
if (input.block.info.bucket_num == -1 && input.block && input.splitted_blocks.empty())
{
LOG_TRACE(&Poco::Logger::get("MergingAggregatedMemoryEfficient"), "Having block without bucket: will split.");
input.splitted_blocks = aggregator.convertBlockToTwoLevel(input.block);
input.block = Block();
}
/// Blocks we got by splitting non-partitioned blocks.
if (!input.splitted_blocks.empty())
{
for (const auto & block : input.splitted_blocks)
{
if (block && block.info.bucket_num < min_bucket_num)
{
min_bucket_num = block.info.bucket_num;
break;
}
}
}
}
current_bucket_num = min_bucket_num;
// std::cerr << "current_bucket_num = " << current_bucket_num << "\n";
/// No more blocks with ordinary data.
if (current_bucket_num == NUM_BUCKETS)
continue;
/// Collect all blocks for 'current_bucket_num' to do merge.
BlocksToMerge blocks_to_merge = std::make_unique<BlocksList>();
for (auto & input : inputs)
{
if (input.block.info.bucket_num == current_bucket_num)
{
// std::cerr << "having block for current_bucket_num\n";
blocks_to_merge->emplace_back(std::move(input.block));
input.block = Block();
}
else if (!input.splitted_blocks.empty() && input.splitted_blocks[min_bucket_num])
{
// std::cerr << "having splitted data for bucket\n";
blocks_to_merge->emplace_back(std::move(input.splitted_blocks[min_bucket_num]));
input.splitted_blocks[min_bucket_num] = Block();
}
}
return blocks_to_merge;
}
else
{
            /// There is only non-partitioned (single-level) data. Just merge it.
// std::cerr << "don't have two level\n";
BlocksToMerge blocks_to_merge = std::make_unique<BlocksList>();
for (auto & input : inputs)
if (input.block)
blocks_to_merge->emplace_back(std::move(input.block));
current_bucket_num = NUM_BUCKETS;
return blocks_to_merge;
}
}
}
}

View File

@ -1,158 +0,0 @@
#pragma once
#include <Interpreters/Aggregator.h>
#include <DataStreams/IBlockInputStream.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadPool.h>
#include <condition_variable>
namespace DB
{
/** Pre-aggregates block streams, holding in RAM only one or more (up to merging_threads) blocks from each source.
* This saves RAM in case of using two-level aggregation, where in each source there will be up to 256 blocks with parts of the result.
*
* Aggregate functions in blocks should not be finalized so that their states can be combined.
*
* Used to solve two tasks:
*
* 1. External aggregation with data flush to disk.
* Partially aggregated data (previously divided into 256 buckets) is flushed to some number of files on the disk.
* We need to read them and merge them by buckets - keeping only a few buckets from each file in RAM simultaneously.
*
* 2. Merge aggregation results for distributed query processing.
  * Partially aggregated data arrives from different servers; it may or may not be split into 256 buckets,
  * and these buckets are passed to us over the network from each server in sequence, one by one.
  * It should also be read and merged by buckets.
*
* The essence of the work:
*
* There are a number of sources. They give out blocks with partially aggregated data.
* Each source can return one of the following block sequences:
* 1. "unsplitted" block with bucket_num = -1;
* 2. "splitted" (two_level) blocks with bucket_num from 0 to 255;
* In both cases, there may also be a block of "overflows" with bucket_num = -1 and is_overflows = true;
*
* We start from the convention that splitted blocks are always passed in the order of bucket_num.
* That is, if a < b, then the bucket_num = a block goes before bucket_num = b.
* This is needed for a memory-efficient merge
* - so that you do not need to read the blocks up front, but go all the way up by bucket_num.
*
  * In this case, not every bucket_num from the range 0..255 has to be present.
  * The overflow block may appear in any order relative to other blocks (but there can be only one of them).
*
* It is necessary to combine these sequences of blocks and return the result as a sequence with the same properties.
* That is, at the output, if there are "splitted" blocks in the sequence, then they should go in the order of bucket_num.
*
* The merge can be performed using several (merging_threads) threads.
* For this, receiving of a set of blocks for the next bucket_num should be done sequentially,
* and then, when we have several received sets, they can be merged in parallel.
*
* When you receive next blocks from different sources,
* data from sources can also be read in several threads (reading_threads)
* for optimal performance in the presence of a fast network or disks (from where these blocks are read).
*/
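/* A hedged usage sketch (not part of the original file; the surrounding variable names are assumptions):
 *
 *     BlockInputStreams inputs = ...;    /// sources with partially aggregated, non-finalized blocks
 *     auto merging = std::make_shared<MergingAggregatedMemoryEfficientBlockInputStream>(
 *         inputs, params, final, reading_threads, merging_threads);
 *     merging->readPrefix();             /// initiates reading (and, for remote sources, the calculations)
 *     while (Block block = merging->read())
 *         ;                              /// merged blocks arrive in increasing bucket_num order
 *     merging->readSuffix();
 */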
class MergingAggregatedMemoryEfficientBlockInputStream final : public IBlockInputStream
{
public:
MergingAggregatedMemoryEfficientBlockInputStream(
BlockInputStreams inputs_, const Aggregator::Params & params, bool final_,
size_t reading_threads_, size_t merging_threads_);
~MergingAggregatedMemoryEfficientBlockInputStream() override;
String getName() const override { return "MergingAggregatedMemoryEfficient"; }
/// Sends the request (initiates calculations) earlier than `read`.
void readPrefix() override;
/// Called either after everything is read, or after cancel.
void readSuffix() override;
    /** Differs from the default implementation by trying to stop all sources,
      * skipping those that failed during execution.
*/
void cancel(bool kill) override;
Block getHeader() const override;
protected:
Block readImpl() override;
private:
static constexpr int NUM_BUCKETS = 256;
Aggregator aggregator;
bool final;
size_t reading_threads;
size_t merging_threads;
bool started = false;
bool all_read = false;
std::atomic<bool> has_two_level {false};
std::atomic<bool> has_overflows {false};
int current_bucket_num = -1;
Poco::Logger * log = &Poco::Logger::get("MergingAggregatedMemoryEfficientBlockInputStream");
struct Input
{
BlockInputStreamPtr stream;
Block block;
Block overflow_block;
std::vector<Block> splitted_blocks;
bool is_exhausted = false;
Input(BlockInputStreamPtr & stream_) : stream(stream_) {}
};
std::vector<Input> inputs;
using BlocksToMerge = std::unique_ptr<BlocksList>;
void start();
/// Get blocks that you can merge. This allows you to merge them in parallel in separate threads.
BlocksToMerge getNextBlocksToMerge();
std::unique_ptr<ThreadPool> reading_pool;
/// For a parallel merge.
struct ParallelMergeData
{
ThreadPool pool;
        /// At any moment, only one of the merging threads receives the next blocks for the merge. This operation must be done sequentially.
std::mutex get_next_blocks_mutex;
std::atomic<bool> exhausted {false}; /// No more source data.
std::atomic<bool> finish {false}; /// Need to terminate early.
std::exception_ptr exception;
        /// It is necessary to give out blocks in the order of the key (bucket_num).
        /// If the value is an empty block, we need to wait for its merge.
        /// (An empty block is a promise that there will be data here, which is important because the data must be given out
        /// in the order of the key - bucket_num.)
std::map<int, Block> merged_blocks;
std::mutex merged_blocks_mutex;
/// An event that is used by merging threads to tell the main thread that the new block is ready.
std::condition_variable merged_blocks_changed;
/// An event by which the main thread is telling merging threads that it is possible to process the next group of blocks.
std::condition_variable have_space;
explicit ParallelMergeData(size_t max_threads) : pool(max_threads) {}
};
std::unique_ptr<ParallelMergeData> parallel_merge_data;
void mergeThread(ThreadGroupStatusPtr thread_group);
void finalize();
};
}

View File

@ -1,199 +0,0 @@
#include <DataStreams/BlocksListBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
#include <DataStreams/ParallelAggregatingBlockInputStream.h>
#include <Common/formatReadable.h>
namespace ProfileEvents
{
extern const Event ExternalAggregationMerge;
}
namespace DB
{
ParallelAggregatingBlockInputStream::ParallelAggregatingBlockInputStream(
const BlockInputStreams & inputs, const BlockInputStreamPtr & additional_input_at_end,
const Aggregator::Params & params_, bool final_, size_t max_threads_, size_t temporary_data_merge_threads_)
: params(params_), aggregator(params),
final(final_), max_threads(std::min(inputs.size(), max_threads_)), temporary_data_merge_threads(temporary_data_merge_threads_),
keys_size(params.keys_size), aggregates_size(params.aggregates_size),
handler(*this), processor(inputs, additional_input_at_end, max_threads, handler)
{
children = inputs;
if (additional_input_at_end)
children.push_back(additional_input_at_end);
}
Block ParallelAggregatingBlockInputStream::getHeader() const
{
return aggregator.getHeader(final);
}
void ParallelAggregatingBlockInputStream::cancel(bool kill)
{
if (kill)
is_killed = true;
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
if (!executed)
processor.cancel(kill);
}
Block ParallelAggregatingBlockInputStream::readImpl()
{
if (!executed)
{
Aggregator::CancellationHook hook = [&]() { return this->isCancelled(); };
aggregator.setCancellationHook(hook);
execute();
if (isCancelledOrThrowIfKilled())
return {};
if (!aggregator.hasTemporaryFiles())
{
            /** If all partially-aggregated data is in RAM, then merge it in parallel, also in RAM.
*/
impl = aggregator.mergeAndConvertToBlocks(many_data, final, max_threads);
}
else
{
/** If there are temporary files with partially-aggregated data on the disk,
* then read and merge them, spending the minimum amount of memory.
*/
ProfileEvents::increment(ProfileEvents::ExternalAggregationMerge);
const auto & files = aggregator.getTemporaryFiles();
BlockInputStreams input_streams;
for (const auto & file : files.files)
{
temporary_inputs.emplace_back(std::make_unique<TemporaryFileStream>(file->path()));
input_streams.emplace_back(temporary_inputs.back()->block_in);
}
LOG_TRACE(log, "Will merge {} temporary files of size {} compressed, {} uncompressed.", files.files.size(), ReadableSize(files.sum_size_compressed), ReadableSize(files.sum_size_uncompressed));
impl = std::make_unique<MergingAggregatedMemoryEfficientBlockInputStream>(
input_streams, params, final, temporary_data_merge_threads, temporary_data_merge_threads);
}
executed = true;
}
Block res;
if (isCancelledOrThrowIfKilled() || !impl)
return res;
return impl->read();
}
void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t thread_num)
{
parent.aggregator.executeOnBlock(block, *parent.many_data[thread_num],
parent.threads_data[thread_num].key_columns, parent.threads_data[thread_num].aggregate_columns, parent.no_more_keys);
parent.threads_data[thread_num].src_rows += block.rows();
parent.threads_data[thread_num].src_bytes += block.bytes();
}
void ParallelAggregatingBlockInputStream::Handler::onFinishThread(size_t thread_num)
{
if (!parent.isCancelled() && parent.aggregator.hasTemporaryFiles())
{
        /// Flush data from RAM to disk, so it's easier to merge it later.
auto & data = *parent.many_data[thread_num];
if (data.isConvertibleToTwoLevel())
data.convertToTwoLevel();
if (!data.empty())
parent.aggregator.writeToTemporaryFile(data);
}
}
void ParallelAggregatingBlockInputStream::Handler::onFinish()
{
if (!parent.isCancelled() && parent.aggregator.hasTemporaryFiles())
{
        /// It may happen that some data has not yet been flushed to disk,
        /// because at the time of the `onFinishThread` call no data had been flushed yet, and some appeared afterwards.
for (auto & data : parent.many_data)
{
if (data->isConvertibleToTwoLevel())
data->convertToTwoLevel();
if (!data->empty())
parent.aggregator.writeToTemporaryFile(*data);
}
}
}
void ParallelAggregatingBlockInputStream::Handler::onException(std::exception_ptr & exception, size_t thread_num)
{
parent.exceptions[thread_num] = exception;
parent.cancel(false);
}
void ParallelAggregatingBlockInputStream::execute()
{
many_data.resize(max_threads);
exceptions.resize(max_threads);
for (size_t i = 0; i < max_threads; ++i)
threads_data.emplace_back(keys_size, aggregates_size);
LOG_TRACE(log, "Aggregating");
Stopwatch watch;
for (auto & elem : many_data)
elem = std::make_shared<AggregatedDataVariants>();
processor.process();
processor.wait();
rethrowFirstException(exceptions);
if (isCancelledOrThrowIfKilled())
return;
double elapsed_seconds = watch.elapsedSeconds();
size_t total_src_rows = 0;
size_t total_src_bytes = 0;
for (size_t i = 0; i < max_threads; ++i)
{
size_t rows = many_data[i]->size();
LOG_TRACE(log, "Aggregated. {} to {} rows (from {}) in {} sec. ({} rows/sec., {}/sec.)",
threads_data[i].src_rows, rows, ReadableSize(threads_data[i].src_bytes),
elapsed_seconds, threads_data[i].src_rows / elapsed_seconds,
ReadableSize(threads_data[i].src_bytes / elapsed_seconds));
total_src_rows += threads_data[i].src_rows;
total_src_bytes += threads_data[i].src_bytes;
}
LOG_TRACE(log, "Total aggregated. {} rows (from {}) in {} sec. ({} rows/sec., {}/sec.)",
total_src_rows, ReadableSize(total_src_bytes), elapsed_seconds,
total_src_rows / elapsed_seconds, ReadableSize(total_src_bytes / elapsed_seconds));
/// If there was no data, and we aggregate without keys, we must return single row with the result of empty aggregation.
/// To do this, we pass a block with zero rows to aggregate.
if (total_src_rows == 0 && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
aggregator.executeOnBlock(children.at(0)->getHeader(), *many_data[0],
threads_data[0].key_columns, threads_data[0].aggregate_columns, no_more_keys);
}
}

View File

@ -1,112 +0,0 @@
#pragma once
#include <Interpreters/Aggregator.h>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/ParallelInputsProcessor.h>
#include <DataStreams/TemporaryFileStream.h>
namespace DB
{
/** Aggregates several sources in parallel.
  * Aggregates blocks from different sources independently in different threads, then combines the results.
* If final == false, aggregate functions are not finalized, that is, they are not replaced by their value, but contain an intermediate state of calculations.
* This is necessary so that aggregation can continue (for example, by combining streams of partially aggregated data).
*/
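/* A hedged usage sketch (not part of the original file; the surrounding variable names are assumptions):
 *
 *     BlockInputStreams inputs = ...;          /// sources with the key columns and aggregate arguments already computed
 *     auto aggregating = std::make_shared<ParallelAggregatingBlockInputStream>(
 *         inputs, nullptr, params, final, max_threads, temporary_data_merge_threads);
 *     while (Block block = aggregating->read())
 *         ;                                    /// aggregated blocks; finalized only if final == true
 */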
class ParallelAggregatingBlockInputStream : public IBlockInputStream
{
public:
/** Columns from key_names and arguments of aggregate functions must already be computed.
*/
ParallelAggregatingBlockInputStream(
const BlockInputStreams & inputs, const BlockInputStreamPtr & additional_input_at_end,
const Aggregator::Params & params_, bool final_, size_t max_threads_, size_t temporary_data_merge_threads_);
String getName() const override { return "ParallelAggregating"; }
void cancel(bool kill) override;
Block getHeader() const override;
protected:
    /// Do nothing, so that the preparation for query execution is done in parallel, in ParallelInputsProcessor.
void readPrefix() override
{
}
Block readImpl() override;
private:
Aggregator::Params params;
Aggregator aggregator;
bool final;
size_t max_threads;
size_t temporary_data_merge_threads;
size_t keys_size;
size_t aggregates_size;
/** Used if there is a limit on the maximum number of rows in the aggregation,
* and if group_by_overflow_mode == ANY.
* In this case, new keys are not added to the set, but aggregation is performed only by
* keys that have already been added into the set.
*/
bool no_more_keys = false;
std::atomic<bool> executed {false};
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
Poco::Logger * log = &Poco::Logger::get("ParallelAggregatingBlockInputStream");
ManyAggregatedDataVariants many_data;
Exceptions exceptions;
struct ThreadData
{
size_t src_rows = 0;
size_t src_bytes = 0;
ColumnRawPtrs key_columns;
Aggregator::AggregateColumns aggregate_columns;
ThreadData(size_t keys_size_, size_t aggregates_size_)
{
key_columns.resize(keys_size_);
aggregate_columns.resize(aggregates_size_);
}
};
std::vector<ThreadData> threads_data;
struct Handler
{
Handler(ParallelAggregatingBlockInputStream & parent_)
: parent(parent_) {}
void onBlock(Block & block, size_t thread_num);
void onFinishThread(size_t thread_num);
void onFinish();
void onException(std::exception_ptr & exception, size_t thread_num);
ParallelAggregatingBlockInputStream & parent;
};
Handler handler;
ParallelInputsProcessor<Handler> processor;
void execute();
/** From here we get the finished blocks after the aggregation.
*/
std::unique_ptr<IBlockInputStream> impl;
};
}

View File

@ -1,18 +0,0 @@
#include <Interpreters/sortBlock.h>
#include <DataStreams/PartialSortingBlockInputStream.h>
namespace DB
{
Block PartialSortingBlockInputStream::readImpl()
{
Block res = children.back()->read();
sortBlock(res, description, limit);
return res;
}
}

View File

@ -1,35 +0,0 @@
#pragma once
#include <Core/SortDescription.h>
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
/** Sorts each block individually by the values of the specified columns.
  * At the moment, a not very optimal algorithm is used.
*/
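/* A hedged usage sketch (not part of the original file; 'input' and 'limit' are assumptions):
 *
 *     SortDescription description;
 *     description.emplace_back("col1", 1, 1);           /// ascending by 'col1'
 *     auto sorting = std::make_shared<PartialSortingBlockInputStream>(input, description, limit);
 *     while (Block block = sorting->read())
 *         ;                                             /// each block is sorted independently
 */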
class PartialSortingBlockInputStream : public IBlockInputStream
{
public:
    /// limit - if not 0, then each block may be sorted only partially: only the first `limit` rows in sort order.
PartialSortingBlockInputStream(const BlockInputStreamPtr & input_, SortDescription & description_, UInt64 limit_ = 0)
: description(description_), limit(limit_)
{
children.push_back(input_);
}
String getName() const override { return "PartialSorting"; }
Block getHeader() const override { return children.at(0)->getHeader(); }
protected:
Block readImpl() override;
private:
SortDescription description;
UInt64 limit;
};
}

View File

@ -1,42 +0,0 @@
#include "ReverseBlockInputStream.h"
#include <Common/PODArray.h>
namespace DB
{
ReverseBlockInputStream::ReverseBlockInputStream(const BlockInputStreamPtr & input)
{
children.push_back(input);
}
String ReverseBlockInputStream::getName() const
{
return "Reverse";
}
Block ReverseBlockInputStream::getHeader() const
{
return children.at(0)->getHeader();
}
Block ReverseBlockInputStream::readImpl()
{
auto result_block = children.back()->read();
if (!result_block)
{
return Block();
}
IColumn::Permutation permutation;
size_t rows_size = result_block.rows();
for (size_t i = 0; i < rows_size; ++i)
permutation.emplace_back(rows_size - 1 - i);
for (auto & block : result_block)
block.column = block.column->permute(permutation, 0);
return result_block;
}
}

View File

@ -1,21 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
/// Reverses the order of rows in every block of a data stream.
class ReverseBlockInputStream : public IBlockInputStream
{
public:
ReverseBlockInputStream(const BlockInputStreamPtr & input);
String getName() const override;
Block getHeader() const override;
protected:
Block readImpl() override;
};
}

View File

@ -1,72 +0,0 @@
#include <DataStreams/RollupBlockInputStream.h>
#include <DataStreams/finalizeBlock.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/FilterDescription.h>
#include <Common/typeid_cast.h>
namespace DB
{
RollupBlockInputStream::RollupBlockInputStream(
const BlockInputStreamPtr & input_, const Aggregator::Params & params_) : aggregator(params_),
keys(params_.keys)
{
children.push_back(input_);
Aggregator::CancellationHook hook = [this]() { return this->isCancelled(); };
aggregator.setCancellationHook(hook);
}
Block RollupBlockInputStream::getHeader() const
{
Block res = children.at(0)->getHeader();
finalizeBlock(res);
return res;
}
Block RollupBlockInputStream::readImpl()
{
    /** After reading and merging all blocks from the input stream,
      * we roll the result up on subsequent iterations of 'readImpl'
      * by zeroing out the key columns one by one and re-merging the block.
*/
if (!is_data_read)
{
BlocksList source_blocks;
while (auto block = children[0]->read())
source_blocks.push_back(block);
if (source_blocks.empty())
return {};
is_data_read = true;
if (source_blocks.size() > 1)
rollup_block = aggregator.mergeBlocks(source_blocks, false);
else
rollup_block = std::move(source_blocks.front());
current_key = keys.size() - 1;
auto finalized = rollup_block;
finalizeBlock(finalized);
return finalized;
}
if (current_key < 0)
return {};
auto & current = rollup_block.getByPosition(keys[current_key]);
current.column = current.column->cloneEmpty()->cloneResized(rollup_block.rows());
--current_key;
BlocksList rollup_blocks = { rollup_block };
rollup_block = aggregator.mergeBlocks(rollup_blocks, false);
auto finalized = rollup_block;
finalizeBlock(finalized);
return finalized;
}
}

View File

@ -1,41 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/Aggregator.h>
#include <Core/ColumnNumbers.h>
namespace DB
{
class ExpressionActions;
/** Takes blocks after grouping, with non-finalized aggregate functions.
  * Calculates subtotal and grand total values for a set of columns.
*/
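/* A hedged usage sketch (not part of the original file; 'grouped_stream' and 'aggregator_params' are assumptions):
 *
 *     auto rollup = std::make_shared<RollupBlockInputStream>(grouped_stream, aggregator_params);
 *     while (Block block = rollup->read())
 *         ;   /// first the fully grouped block, then subtotals with the last key zeroed out,
 *             /// then the next key, and so on down to the grand total
 */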
class RollupBlockInputStream : public IBlockInputStream
{
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
using AggregateColumns = std::vector<ColumnRawPtrs>;
public:
RollupBlockInputStream(
const BlockInputStreamPtr & input_, const Aggregator::Params & params_);
String getName() const override { return "Rollup"; }
Block getHeader() const override;
protected:
Block readImpl() override;
private:
Aggregator aggregator;
ColumnNumbers keys;
ssize_t current_key = -1;
Block rollup_block;
bool is_data_read = false;
};
}

View File

@ -1,181 +0,0 @@
#include <DataStreams/TotalsHavingBlockInputStream.h>
#include <DataStreams/finalizeBlock.h>
#include <Interpreters/ExpressionActions.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/FilterDescription.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/Arena.h>
namespace DB
{
TotalsHavingBlockInputStream::TotalsHavingBlockInputStream(
const BlockInputStreamPtr & input_,
bool overflow_row_, const ExpressionActionsPtr & expression_,
const std::string & filter_column_, TotalsMode totals_mode_, double auto_include_threshold_, bool final_)
: overflow_row(overflow_row_),
expression(expression_), filter_column_name(filter_column_), totals_mode(totals_mode_),
auto_include_threshold(auto_include_threshold_), final(final_)
{
children.push_back(input_);
/// Initialize current totals with initial state.
Block source_header = children.at(0)->getHeader();
current_totals.reserve(source_header.columns());
for (const auto & elem : source_header)
{
// Create a column with default value
MutableColumnPtr new_column = elem.type->createColumn();
elem.type->insertDefaultInto(*new_column);
current_totals.emplace_back(std::move(new_column));
}
}
Block TotalsHavingBlockInputStream::getTotals()
{
if (!totals)
{
        /** If totals_mode == AFTER_HAVING_AUTO, we need to decide whether to add to TOTALS the aggregates for rows
          * that did not pass max_rows_to_group_by.
*/
if (overflow_aggregates)
{
if (totals_mode == TotalsMode::BEFORE_HAVING
|| totals_mode == TotalsMode::AFTER_HAVING_INCLUSIVE
|| (totals_mode == TotalsMode::AFTER_HAVING_AUTO
&& static_cast<double>(passed_keys) / total_keys >= auto_include_threshold))
addToTotals(overflow_aggregates, nullptr);
}
totals = children.at(0)->getHeader().cloneWithColumns(std::move(current_totals));
finalizeBlock(totals);
}
if (totals && expression)
expression->execute(totals);
return totals;
}
Block TotalsHavingBlockInputStream::getHeader() const
{
Block res = children.at(0)->getHeader();
if (final)
finalizeBlock(res);
if (expression)
expression->execute(res);
return res;
}
Block TotalsHavingBlockInputStream::readImpl()
{
Block finalized;
Block block;
while (true)
{
block = children[0]->read();
        /// A block with values that did not fit into `max_rows_to_group_by`. We'll set it aside for now.
if (overflow_row && block && block.info.is_overflows)
{
overflow_aggregates = block;
continue;
}
if (!block)
return finalized;
finalized = block;
if (final)
finalizeBlock(finalized);
total_keys += finalized.rows();
if (filter_column_name.empty())
{
addToTotals(block, nullptr);
}
else
{
/// Compute the expression in HAVING.
expression->execute(finalized);
size_t filter_column_pos = finalized.getPositionByName(filter_column_name);
ColumnPtr filter_column_ptr = finalized.safeGetByPosition(filter_column_pos).column->convertToFullColumnIfConst();
FilterDescription filter_description(*filter_column_ptr);
/// Add values to `totals` (if it was not already done).
if (totals_mode == TotalsMode::BEFORE_HAVING)
addToTotals(block, nullptr);
else
addToTotals(block, filter_description.data);
/// Filter the block by expression in HAVING.
size_t columns = finalized.columns();
for (size_t i = 0; i < columns; ++i)
{
ColumnWithTypeAndName & current_column = finalized.safeGetByPosition(i);
current_column.column = current_column.column->filter(*filter_description.data, -1);
if (current_column.column->empty())
{
finalized.clear();
break;
}
}
}
if (!finalized)
continue;
passed_keys += finalized.rows();
return finalized;
}
}
void TotalsHavingBlockInputStream::addToTotals(const Block & source_block, const IColumn::Filter * filter)
{
for (size_t i = 0, num_columns = source_block.columns(); i < num_columns; ++i)
{
const auto * source_column = typeid_cast<const ColumnAggregateFunction *>(
source_block.getByPosition(i).column.get());
if (!source_column)
{
continue;
}
auto & totals_column = assert_cast<ColumnAggregateFunction &>(*current_totals[i]);
assert(totals_column.size() == 1);
/// Accumulate all aggregate states from a column of a source block into
/// the corresponding totals column.
const auto & vec = source_column->getData();
size_t size = vec.size();
if (filter)
{
for (size_t j = 0; j < size; ++j)
if ((*filter)[j])
totals_column.insertMergeFrom(vec[j]);
}
else
{
for (size_t j = 0; j < size; ++j)
totals_column.insertMergeFrom(vec[j]);
}
}
}
}

View File

@ -1,62 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
class Arena;
using ArenaPtr = std::shared_ptr<Arena>;
class ExpressionActions;
enum class TotalsMode;
/** Takes blocks after grouping, with non-finalized aggregate functions.
* Calculates total values according to totals_mode.
* If necessary, evaluates the expression from HAVING and filters rows. Returns the finalized and filtered blocks.
*/
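/* A hedged usage sketch (not part of the original file; the surrounding variable names are assumptions):
 *
 *     auto stream = std::make_shared<TotalsHavingBlockInputStream>(
 *         grouped_stream, overflow_row, having_expression, "having_result",
 *         TotalsMode::BEFORE_HAVING, auto_include_threshold, final);
 *     while (Block block = stream->read())
 *         ;                               /// finalized blocks, filtered by HAVING
 *     Block totals = stream->getTotals(); /// the single-row WITH TOTALS block
 */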
class TotalsHavingBlockInputStream : public IBlockInputStream
{
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
public:
/// expression may be nullptr
TotalsHavingBlockInputStream(
const BlockInputStreamPtr & input_,
bool overflow_row_, const ExpressionActionsPtr & expression_,
const std::string & filter_column_, TotalsMode totals_mode_, double auto_include_threshold_, bool final_);
String getName() const override { return "TotalsHaving"; }
Block getTotals() override;
Block getHeader() const override;
protected:
Block readImpl() override;
private:
bool overflow_row;
ExpressionActionsPtr expression;
String filter_column_name;
TotalsMode totals_mode;
double auto_include_threshold;
bool final;
size_t passed_keys = 0;
size_t total_keys = 0;
/** Here are the values that did not pass max_rows_to_group_by.
* They are added or not added to the current_totals, depending on the totals_mode.
*/
Block overflow_aggregates;
/// Here, total values are accumulated. After the work is finished, they will be placed in IBlockInputStream::totals.
MutableColumns current_totals;
/// If filter == nullptr - add all rows. Otherwise, only the rows that pass the filter (HAVING).
void addToTotals(const Block & block, const IColumn::Filter * filter);
};
}

View File

@ -1,6 +1,5 @@
#include <random>
#include <Common/thread_local_rng.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/Pipe.h>
#include "narrowBlockInputStreams.h"
@ -24,26 +23,6 @@ namespace
}
}
BlockInputStreams narrowBlockInputStreams(BlockInputStreams & inputs, size_t width)
{
size_t size = inputs.size();
if (size <= width)
return inputs;
std::vector<BlockInputStreams> partitions(width);
auto distribution = getDistribution(size, width);
for (size_t i = 0; i < size; ++i)
partitions[distribution[i]].push_back(inputs[i]);
BlockInputStreams res(width);
for (size_t i = 0; i < width; ++i)
res[i] = std::make_shared<ConcatBlockInputStream>(partitions[i]);
return res;
}
Pipes narrowPipes(Pipes pipes, size_t width)
{
size_t size = pipes.size();

View File

@ -16,7 +16,6 @@ using Pipes = std::vector<Pipe>;
  * The sources are glued together uniformly at random
  * (to avoid skew when the distribution of the amount of data across the sources follows some pattern).
*/
BlockInputStreams narrowBlockInputStreams(BlockInputStreams & inputs, size_t width);
Pipes narrowPipes(Pipes pipes, size_t width);
}

View File

@ -1,49 +0,0 @@
#include <DataStreams/processConstants.h>
namespace DB
{
void removeConstantsFromBlock(Block & block)
{
size_t columns = block.columns();
size_t i = 0;
while (i < columns)
{
if (block.getByPosition(i).column && isColumnConst(*block.getByPosition(i).column))
{
block.erase(i);
--columns;
}
else
++i;
}
}
void removeConstantsFromSortDescription(const Block & header, SortDescription & description)
{
    /// Note: this code is not correct if the sort description contains column numbers instead of column names.
    /// Hopefully, everywhere it is used, the description contains column names.
description.erase(std::remove_if(description.begin(), description.end(),
[&](const SortColumnDescription & elem)
{
const auto & column = !elem.column_name.empty() ? header.getByName(elem.column_name)
: header.safeGetByPosition(elem.column_number);
return column.column && isColumnConst(*column.column);
}), description.end());
}
void enrichBlockWithConstants(Block & block, const Block & header)
{
size_t rows = block.rows();
size_t columns = header.columns();
for (size_t i = 0; i < columns; ++i)
{
const auto & col_type_name = header.getByPosition(i);
if (col_type_name.column && isColumnConst(*col_type_name.column))
block.insert(i, {col_type_name.column->cloneResized(rows), col_type_name.type, col_type_name.name});
}
}
}

View File

@ -1,23 +0,0 @@
#pragma once
#include <Core/Block.h>
#include <Core/SortDescription.h>
namespace DB
{
/** Functions for manipulating constants for sorting.
* See MergeSortingBlocksBlockInputStream and FinishSortingBlockInputStream for details.
*/
/** Remove constant columns from block.
*/
void removeConstantsFromBlock(Block & block);
void removeConstantsFromSortDescription(const Block & header, SortDescription & description);
/** Add to a block, whose constant columns were removed by the previous function,
  * the constant columns from the header (which must have the same structure as the block before the constants were removed).
*/
void enrichBlockWithConstants(Block & block, const Block & header);
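/** A hedged sketch (not part of the original file) of the intended round trip around sorting;
  * everything except the three functions above and sortBlock is an assumption:
  *
  *     Block header = block.cloneEmpty();                        /// remember the full structure
  *     removeConstantsFromBlock(block);                          /// constants do not affect the order
  *     removeConstantsFromSortDescription(header, description);
  *     sortBlock(block, description);                            /// sort only the non-constant columns
  *     enrichBlockWithConstants(block, header);                  /// restore the constant columns
  */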
}

View File

@ -9,8 +9,5 @@ target_link_libraries (filter_stream PRIVATE dbms clickhouse_storages_system cli
add_executable (union_stream2 union_stream2.cpp ${SRCS})
target_link_libraries (union_stream2 PRIVATE dbms)
add_executable (collapsing_sorted_stream collapsing_sorted_stream.cpp ${SRCS})
target_link_libraries (collapsing_sorted_stream PRIVATE dbms)
add_executable (finish_sorting_stream finish_sorting_stream.cpp ${SRCS})
target_link_libraries (finish_sorting_stream PRIVATE dbms)

View File

@ -1,84 +0,0 @@
#include <iostream>
#include <iomanip>
#include <Poco/ConsoleChannel.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/CollapsingFinalBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <Interpreters/Context.h>
#include <DataTypes/DataTypesNumber.h>
int main(int, char **)
try
{
using namespace DB;
Poco::AutoPtr<Poco::ConsoleChannel> channel = new Poco::ConsoleChannel(std::cerr);
Poco::Logger::root().setChannel(channel);
Poco::Logger::root().setLevel("trace");
Block block1;
{
ColumnWithTypeAndName column1;
column1.name = "Sign";
column1.type = std::make_shared<DataTypeInt8>();
column1.column = ColumnInt8::create({static_cast<int8_t>(1), static_cast<int8_t>(-1)});
block1.insert(column1);
ColumnWithTypeAndName column2;
column2.name = "CounterID";
column2.type = std::make_shared<DataTypeUInt32>();
column2.column = ColumnUInt32::create({static_cast<uint32_t>(123), static_cast<uint32_t>(123)});
block1.insert(column2);
}
Block block2;
{
ColumnWithTypeAndName column1;
column1.name = "Sign";
column1.type = std::make_shared<DataTypeInt8>();
column1.column = ColumnInt8::create({static_cast<int8_t>(1), static_cast<int8_t>(1)});
block2.insert(column1);
ColumnWithTypeAndName column2;
column2.name = "CounterID";
column2.type = std::make_shared<DataTypeUInt32>();
column2.column = ColumnUInt32::create({static_cast<uint32_t>(123), static_cast<uint32_t>(456)});
block2.insert(column2);
}
BlockInputStreams inputs;
inputs.push_back(std::make_shared<OneBlockInputStream>(block1));
inputs.push_back(std::make_shared<OneBlockInputStream>(block2));
SortDescription descr;
SortColumnDescription col_descr("CounterID", 1, 1);
descr.push_back(col_descr);
//CollapsingSortedBlockInputStream collapsed(inputs, descr, "Sign", 1048576);
CollapsingFinalBlockInputStream collapsed(inputs, descr, "Sign");
SharedContextHolder shared_context = Context::createShared();
Context context = Context::createGlobal(shared_context.get());
context.makeGlobalContext();
WriteBufferFromFileDescriptor out_buf(STDERR_FILENO);
BlockOutputStreamPtr output = context.getOutputFormat("TabSeparated", out_buf, block1);
copyData(collapsed, *output);
return 0;
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl;
throw;
}

View File

@ -7,10 +7,14 @@
#include <Core/SortDescription.h>
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/PartialSortingBlockInputStream.h>
#include <DataStreams/FinishSortingBlockInputStream.h>
#include <Interpreters/sortBlock.h>
#include <Processors/Transforms/FinishSortingTransform.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Processors/Transforms/MergeSortingTransform.h>
using namespace DB;
@ -33,7 +37,11 @@ int main(int argc, char ** argv)
size_t m = argc >= 2 ? std::stol(argv[1]) : 2;
size_t n = argc >= 3 ? std::stol(argv[2]) : 10;
Blocks blocks;
SortDescription sort_descr;
sort_descr.emplace_back("col1", 1, 1);
Block block_header;
Pipes sources;
for (size_t t = 0; t < m; ++t)
{
Block block;
@ -53,28 +61,35 @@ int main(int argc, char ** argv)
column.column = std::move(col);
block.insert(column);
}
blocks.push_back(block);
if (!block_header)
block_header = block.cloneEmpty();
sortBlock(block, sort_descr);
sources.emplace_back(std::make_shared<SourceFromInputStream>(std::make_shared<OneBlockInputStream>(block)));
}
SortDescription sort_descr;
sort_descr.emplace_back("col1", 1, 1);
QueryPipeline pipeline;
pipeline.init(std::move(sources));
for (auto & block : blocks)
sortBlock(block, sort_descr);
BlockInputStreamPtr stream = std::make_shared<MergeSortingBlocksBlockInputStream>(blocks, sort_descr, n);
pipeline.addPipe({std::make_shared<MergeSortingTransform>(pipeline.getHeader(), sort_descr, n, 0, 0, 0, nullptr, 0)});
SortDescription sort_descr_final;
sort_descr_final.emplace_back("col1", 1, 1);
sort_descr_final.emplace_back("col2", 1, 1);
stream = std::make_shared<FinishSortingBlockInputStream>(stream, sort_descr, sort_descr_final, n, 0);
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FinishSortingTransform>(header, sort_descr, sort_descr_final, n, 0);
});
auto stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));
{
Stopwatch stopwatch;
stopwatch.start();
Block res_block = blocks[0].cloneEmpty();
Block res_block = block_header;
while (Block block = stream->read())
{

View File

@ -9,56 +9,39 @@ NO_COMPILER_WARNINGS()
SRCS(
AddingDefaultBlockOutputStream.cpp
AddingDefaultsBlockInputStream.cpp
AggregatingBlockInputStream.cpp
AsynchronousBlockInputStream.cpp
BlockIO.cpp
BlockStreamProfileInfo.cpp
CheckConstraintsBlockOutputStream.cpp
CheckSortedBlockInputStream.cpp
CollapsingFinalBlockInputStream.cpp
ColumnGathererStream.cpp
ConvertingBlockInputStream.cpp
copyData.cpp
CountingBlockOutputStream.cpp
CreatingSetsBlockInputStream.cpp
CubeBlockInputStream.cpp
DistinctBlockInputStream.cpp
DistinctSortedBlockInputStream.cpp
ExecutionSpeedLimits.cpp
ExpressionBlockInputStream.cpp
FillingBlockInputStream.cpp
FilterBlockInputStream.cpp
FilterColumnsBlockInputStream.cpp
finalizeBlock.cpp
FinishSortingBlockInputStream.cpp
IBlockInputStream.cpp
InputStreamFromASTInsertQuery.cpp
InternalTextLogsRowOutputStream.cpp
LimitBlockInputStream.cpp
LimitByBlockInputStream.cpp
materializeBlock.cpp
MaterializingBlockInputStream.cpp
MergeSortingBlockInputStream.cpp
MergingAggregatedBlockInputStream.cpp
MergingAggregatedMemoryEfficientBlockInputStream.cpp
MergingSortedBlockInputStream.cpp
narrowBlockInputStreams.cpp
NativeBlockInputStream.cpp
NativeBlockOutputStream.cpp
ParallelAggregatingBlockInputStream.cpp
ParallelParsingBlockInputStream.cpp
PartialSortingBlockInputStream.cpp
processConstants.cpp
PushingToViewsBlockOutputStream.cpp
RemoteBlockInputStream.cpp
RemoteBlockOutputStream.cpp
ReverseBlockInputStream.cpp
RollupBlockInputStream.cpp
SizeLimits.cpp
SquashingBlockInputStream.cpp
SquashingBlockOutputStream.cpp
SquashingTransform.cpp
TotalsHavingBlockInputStream.cpp
TTLBlockInputStream.cpp
)

View File

@ -829,7 +829,7 @@ using ManyAggregatedDataVariants = std::vector<AggregatedDataVariantsPtr>;
using ManyAggregatedDataVariantsPtr = std::shared_ptr<ManyAggregatedDataVariants>;
/** How are "total" values calculated with WITH TOTALS?
* (For more details, see TotalsHavingBlockInputStream.)
* (For more details, see TotalsHavingTransform.)
*
* In the absence of group_by_overflow_mode = 'any', the data is aggregated as usual, but the states of the aggregate functions are not finalized.
* Later, the aggregate function states for all rows (passed through HAVING) are merged into one - this will be TOTALS.

View File

@ -3,7 +3,6 @@
#include <DataStreams/MergingAggregatedBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataStreams/ConvertColumnLowCardinalityToFullBlockInputStream.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>

View File

@ -8,6 +8,50 @@
namespace DB
{
/** Pre-aggregates data from ports, holding in RAM only one or more (up to merging_threads) blocks from each source.
* This saves RAM in case of using two-level aggregation, where in each source there will be up to 256 blocks with parts of the result.
*
* Aggregate functions in blocks should not be finalized so that their states can be combined.
*
* Used to solve two tasks:
*
* 1. External aggregation with data flush to disk.
* Partially aggregated data (previously divided into 256 buckets) is flushed to some number of files on the disk.
* We need to read them and merge them by buckets - keeping only a few buckets from each file in RAM simultaneously.
*
* 2. Merge aggregation results for distributed query processing.
  * Partially aggregated data arrives from different servers; it may or may not be split into 256 buckets,
  * and these buckets are passed to us over the network from each server in sequence, one by one.
  * It should also be read and merged by buckets.
*
* The essence of the work:
*
* There are a number of sources. They give out blocks with partially aggregated data.
* Each source can return one of the following block sequences:
* 1. "unsplitted" block with bucket_num = -1;
* 2. "splitted" (two_level) blocks with bucket_num from 0 to 255;
* In both cases, there may also be a block of "overflows" with bucket_num = -1 and is_overflows = true;
*
* We start from the convention that splitted blocks are always passed in the order of bucket_num.
* That is, if a < b, then the bucket_num = a block goes before bucket_num = b.
* This is needed for a memory-efficient merge
* - so that you do not need to read the blocks up front, but go all the way up by bucket_num.
*
  * In this case, not every bucket_num from the range 0..255 has to be present.
  * The overflow block may appear in any order relative to other blocks (but there can be only one of them).
*
* It is necessary to combine these sequences of blocks and return the result as a sequence with the same properties.
* That is, at the output, if there are "splitted" blocks in the sequence, then they should go in the order of bucket_num.
*
* The merge can be performed using several (merging_threads) threads.
* For this, receiving of a set of blocks for the next bucket_num should be done sequentially,
* and then, when we have several received sets, they can be merged in parallel.
*
* When you receive next blocks from different sources,
* data from sources can also be read in several threads (reading_threads)
* for optimal performance in the presence of a fast network or disks (from where these blocks are read).
*/
/// Has several inputs and single output.
/// Read from inputs chunks with partially aggregated data, group them by bucket number
/// and write data from single bucket as single chunk.

View File

@ -19,7 +19,7 @@ limitations under the License. */
#include <Interpreters/InterpreterSelectQuery.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/BlocksBlockInputStream.h>
#include <DataStreams/BlocksSource.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/copyData.h>