Merge pull request #29768 from ClickHouse/remove-merging-streams

Remove some merging streams
Nikolai Kochetov, 2021-10-08 13:15:10 +03:00, committed by GitHub
commit 9eddee5517
26 changed files with 423 additions and 308 deletions


@@ -11,104 +11,157 @@ namespace DB

 namespace ErrorCodes
 {
-    extern const int INCOMPATIBLE_COLUMNS;
     extern const int INCORRECT_NUMBER_OF_COLUMNS;
     extern const int EMPTY_DATA_PASSED;
     extern const int RECEIVED_EMPTY_DATA;
 }

 ColumnGathererStream::ColumnGathererStream(
-        const String & column_name_, const BlockInputStreams & source_streams, ReadBuffer & row_sources_buf_,
-        size_t block_preferred_size_)
-    : column_name(column_name_), sources(source_streams.size()), row_sources_buf(row_sources_buf_)
-    , block_preferred_size(block_preferred_size_), log(&Poco::Logger::get("ColumnGathererStream"))
+    size_t num_inputs, ReadBuffer & row_sources_buf_, size_t block_preferred_size_)
+    : sources(num_inputs), row_sources_buf(row_sources_buf_)
+    , block_preferred_size(block_preferred_size_)
 {
-    if (source_streams.empty())
+    if (num_inputs == 0)
         throw Exception("There are no streams to gather", ErrorCodes::EMPTY_DATA_PASSED);
+}

-    children.assign(source_streams.begin(), source_streams.end());
-
-    for (size_t i = 0; i < children.size(); ++i)
+void ColumnGathererStream::initialize(Inputs inputs)
+{
+    for (size_t i = 0; i < inputs.size(); ++i)
     {
-        const Block & header = children[i]->getHeader();
-
-        /// Sometimes MergeTreeReader injects additional column with partitioning key
-        if (header.columns() > 2)
-            throw Exception(
-                "Block should have 1 or 2 columns, but contains " + toString(header.columns()),
-                ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
-
-        if (i == 0)
+        if (inputs[i].chunk)
         {
-            column.name = column_name;
-            column.type = header.getByName(column_name).type;
-            column.column = column.type->createColumn();
+            sources[i].update(inputs[i].chunk.detachColumns().at(0));
+            if (!result_column)
+                result_column = sources[i].column->cloneEmpty();
         }
-        else if (header.getByName(column_name).column->getName() != column.column->getName())
-            throw Exception("Column types don't match", ErrorCodes::INCOMPATIBLE_COLUMNS);
     }
 }

-Block ColumnGathererStream::readImpl()
+IMergingAlgorithm::Status ColumnGathererStream::merge()
 {
+    /// Nothing to read after initialize.
+    if (!result_column)
+        return Status(Chunk(), true);
+
+    if (source_to_fully_copy) /// Was set on a previous iteration
+    {
+        Chunk res;
+        res.addColumn(source_to_fully_copy->column);
+        merged_rows += source_to_fully_copy->size;
+        source_to_fully_copy->pos = source_to_fully_copy->size;
+        source_to_fully_copy = nullptr;
+        return Status(std::move(res));
+    }
+
     /// Special case: single source and there are no skipped rows
-    if (children.size() == 1 && row_sources_buf.eof() && !source_to_fully_copy)
-        return children[0]->read();
+    /// Note: looks like this should never happen because row_sources_buf cannot just skip row info.
+    if (sources.size() == 1 && row_sources_buf.eof())
+    {
+        if (sources.front().pos < sources.front().size)
+        {
+            next_required_source = 0;
+            Chunk res;
+            merged_rows += sources.front().column->size();
+            merged_bytes += sources.front().column->allocatedBytes();
+            res.addColumn(std::move(sources.front().column));
+            sources.front().pos = sources.front().size = 0;
+            return Status(std::move(res));
+        }

-    if (!source_to_fully_copy && row_sources_buf.eof())
-        return Block();
+        if (next_required_source == -1)
+            return Status(Chunk(), true);
+
+        next_required_source = 0;
+        return Status(next_required_source);
+    }

-    MutableColumnPtr output_column = column.column->cloneEmpty();
-    output_block = Block{column.cloneEmpty()};
-    /// Surprisingly this call may directly change output_block, bypassing
+    if (next_required_source != -1 && sources[next_required_source].size == 0)
+        throw Exception("Cannot fetch required block. Source " + toString(next_required_source), ErrorCodes::RECEIVED_EMPTY_DATA);
+
+    /// Surprisingly this call may directly change some internal state of ColumnGathererStream.
     /// output_column. See ColumnGathererStream::gather.
-    output_column->gather(*this);
-    if (!output_column->empty())
-        output_block.getByPosition(0).column = std::move(output_column);
+    result_column->gather(*this);

-    return output_block;
+    if (next_required_source != -1)
+        return Status(next_required_source);
+
+    if (source_to_fully_copy && result_column->empty())
+    {
+        Chunk res;
+        merged_rows += source_to_fully_copy->column->size();
+        merged_bytes += source_to_fully_copy->column->allocatedBytes();
+        res.addColumn(source_to_fully_copy->column);
+        source_to_fully_copy->pos = source_to_fully_copy->size;
+        source_to_fully_copy = nullptr;
+        return Status(std::move(res));
+    }
+
+    auto col = result_column->cloneEmpty();
+    result_column.swap(col);
+
+    Chunk res;
+    merged_rows += col->size();
+    merged_bytes += col->allocatedBytes();
+    res.addColumn(std::move(col));
+    return Status(std::move(res), row_sources_buf.eof() && !source_to_fully_copy);
 }

-void ColumnGathererStream::fetchNewBlock(Source & source, size_t source_num)
+void ColumnGathererStream::consume(Input & input, size_t source_num)
 {
-    try
-    {
-        source.block = children[source_num]->read();
-        source.update(column_name);
-    }
-    catch (Exception & e)
-    {
-        e.addMessage("Cannot fetch required block. Stream " + children[source_num]->getName() + ", part " + toString(source_num));
-        throw;
-    }
+    auto & source = sources[source_num];
+    if (input.chunk)
+        source.update(input.chunk.getColumns().at(0));

     if (0 == source.size)
     {
-        throw Exception("Fetched block is empty. Stream " + children[source_num]->getName() + ", part " + toString(source_num),
+        throw Exception("Fetched block is empty. Source " + toString(source_num),
                         ErrorCodes::RECEIVED_EMPTY_DATA);
     }
 }

-void ColumnGathererStream::readSuffixImpl()
+ColumnGathererTransform::ColumnGathererTransform(
+    const Block & header,
+    size_t num_inputs,
+    ReadBuffer & row_sources_buf_,
+    size_t block_preferred_size_)
+    : IMergingTransform<ColumnGathererStream>(
+        num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false,
+        num_inputs, row_sources_buf_, block_preferred_size_)
+    , log(&Poco::Logger::get("ColumnGathererStream"))
 {
-    const BlockStreamProfileInfo & profile_info = getProfileInfo();
+    if (header.columns() != 1)
+        throw Exception(
+            "Header should have 1 column, but contains " + toString(header.columns()),
+            ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
+}
+
+void ColumnGathererTransform::work()
+{
+    Stopwatch stopwatch;
+    IMergingTransform<ColumnGathererStream>::work();
+    elapsed_ns += stopwatch.elapsedNanoseconds();
+}
+
+void ColumnGathererTransform::onFinish()
+{
+    auto merged_rows = algorithm.getMergedRows();
+    auto merged_bytes = algorithm.getMergedRows();
     /// Don't print info for small parts (< 10M rows)
-    if (profile_info.rows < 10000000)
+    if (merged_rows < 10000000)
         return;

-    double seconds = profile_info.total_stopwatch.elapsedSeconds();
+    double seconds = static_cast<double>(elapsed_ns) / 1000000000ULL;
+    const auto & column_name = getOutputPort().getHeader().getByPosition(0).name;

     if (!seconds)
         LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in 0 sec.",
-            column_name, static_cast<double>(profile_info.bytes) / profile_info.rows);
+            column_name, static_cast<double>(merged_bytes) / merged_rows);
     else
         LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in {} sec., {} rows/sec., {}/sec.",
-            column_name, static_cast<double>(profile_info.bytes) / profile_info.rows, seconds,
-            profile_info.rows / seconds, ReadableSize(profile_info.bytes / seconds));
+            column_name, static_cast<double>(merged_bytes) / merged_rows, seconds,
+            merged_rows / seconds, ReadableSize(merged_bytes / seconds));
 }

 }
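For orientation, here is a minimal sketch of the control flow a driver is expected to run around this IMergingAlgorithm. It assumes Status carries a chunk, a required_source index and an is_finished flag, as the Status(...) constructors above suggest; in the real code the driver is IMergingTransform, and readNextChunkFrom/output below are hypothetical stand-ins, not part of the diff:

    ColumnGathererStream algorithm(num_inputs, row_sources_buf);
    algorithm.initialize(std::move(initial_inputs));      /// one Input (chunk) per source part

    while (true)
    {
        auto status = algorithm.merge();

        if (status.chunk)
            output.push(std::move(status.chunk));         /// gathered rows for the current column

        if (status.is_finished)
            break;

        if (status.required_source >= 0)
        {
            IMergingAlgorithm::Input input;
            input.chunk = readNextChunkFrom(status.required_source);  /// hypothetical helper
            algorithm.consume(input, status.required_source);
        }
    }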


@@ -1,8 +1,9 @@
 #pragma once

-#include <DataStreams/IBlockInputStream.h>
 #include <IO/ReadBuffer.h>
 #include <Common/PODArray.h>
+#include <Processors/Merges/Algorithms/IMergingAlgorithm.h>
+#include <Processors/Merges/IMergingTransform.h>

 namespace Poco { class Logger; }

@@ -53,77 +54,91 @@ using MergedRowSources = PODArray<RowSourcePart>;
  * Stream mask maps row number to index of source stream.
  * Streams should contain exactly one column.
  */
-class ColumnGathererStream : public IBlockInputStream
+class ColumnGathererStream final : public IMergingAlgorithm
 {
 public:
-    ColumnGathererStream(
-        const String & column_name_, const BlockInputStreams & source_streams, ReadBuffer & row_sources_buf_,
-        size_t block_preferred_size_ = DEFAULT_BLOCK_SIZE);
+    ColumnGathererStream(size_t num_inputs, ReadBuffer & row_sources_buf_, size_t block_preferred_size_ = DEFAULT_BLOCK_SIZE);

-    String getName() const override { return "ColumnGatherer"; }
-
-    Block readImpl() override;
-
-    void readSuffixImpl() override;
-
-    Block getHeader() const override { return children.at(0)->getHeader(); }
+    void initialize(Inputs inputs) override;
+    void consume(Input & input, size_t source_num) override;
+    Status merge() override;

     /// for use in implementations of IColumn::gather()
     template <typename Column>
     void gather(Column & column_res);

+    UInt64 getMergedRows() const { return merged_rows; }
+    UInt64 getMergedBytes() const { return merged_bytes; }
+
 private:
     /// Cache required fields
     struct Source
     {
-        const IColumn * column = nullptr;
+        ColumnPtr column;
         size_t pos = 0;
         size_t size = 0;
-        Block block;

-        void update(const String & name)
+        void update(ColumnPtr column_)
         {
-            column = block.getByName(name).column.get();
-            size = block.rows();
+            column = std::move(column_);
+            size = column->size();
             pos = 0;
         }
     };

-    void fetchNewBlock(Source & source, size_t source_num);
-
-    String column_name;
-    ColumnWithTypeAndName column;
+    MutableColumnPtr result_column;

     std::vector<Source> sources;
     ReadBuffer & row_sources_buf;

-    size_t block_preferred_size;
+    const size_t block_preferred_size;

     Source * source_to_fully_copy = nullptr;
-    Block output_block;

+    ssize_t next_required_source = -1;
+    size_t cur_block_preferred_size = 0;
+
+    UInt64 merged_rows = 0;
+    UInt64 merged_bytes = 0;
+};
+
+class ColumnGathererTransform final : public IMergingTransform<ColumnGathererStream>
+{
+public:
+    ColumnGathererTransform(
+        const Block & header,
+        size_t num_inputs,
+        ReadBuffer & row_sources_buf_,
+        size_t block_preferred_size_ = DEFAULT_BLOCK_SIZE);
+
+    String getName() const override { return "ColumnGathererTransform"; }
+
+    void work() override;
+
+protected:
+    void onFinish() override;
+
+    UInt64 elapsed_ns = 0;
+
     Poco::Logger * log;
 };

 template <typename Column>
 void ColumnGathererStream::gather(Column & column_res)
 {
-    if (source_to_fully_copy) /// Was set on a previous iteration
-    {
-        output_block.getByPosition(0).column = source_to_fully_copy->block.getByName(column_name).column;
-        source_to_fully_copy->pos = source_to_fully_copy->size;
-        source_to_fully_copy = nullptr;
-        return;
-    }
-
     row_sources_buf.nextIfAtEnd();
     RowSourcePart * row_source_pos = reinterpret_cast<RowSourcePart *>(row_sources_buf.position());
     RowSourcePart * row_sources_end = reinterpret_cast<RowSourcePart *>(row_sources_buf.buffer().end());

-    size_t cur_block_preferred_size = std::min(static_cast<size_t>(row_sources_end - row_source_pos), block_preferred_size);
-    column_res.reserve(cur_block_preferred_size);
+    if (next_required_source == -1)
+    {
+        /// Start new column.
+        cur_block_preferred_size = std::min(static_cast<size_t>(row_sources_end - row_source_pos), block_preferred_size);
+        column_res.reserve(cur_block_preferred_size);
+    }

-    size_t cur_size = 0;
+    size_t cur_size = column_res.size();
+    next_required_source = -1;

     while (row_source_pos < row_sources_end && cur_size < cur_block_preferred_size)
     {
@@ -131,13 +146,15 @@ void ColumnGathererStream::gather(Column & column_res)
         size_t source_num = row_source.getSourceNum();
         Source & source = sources[source_num];
         bool source_skip = row_source.getSkipFlag();
-        ++row_source_pos;

         if (source.pos >= source.size) /// Fetch new block from source_num part
         {
-            fetchNewBlock(source, source_num);
+            next_required_source = source_num;
+            return;
         }

+        ++row_source_pos;
+
         /// Consecutive optimization. TODO: precompute lengths
         size_t len = 1;
         size_t max_len = std::min(static_cast<size_t>(row_sources_end - row_source_pos), source.size - source.pos); // interval should be in the same block
@@ -156,16 +173,9 @@ void ColumnGathererStream::gather(Column & column_res)
         {
             /// If current block already contains data, return it.
             /// Whole column from current source will be returned on next read() iteration.
-            if (cur_size > 0)
-            {
-                source_to_fully_copy = &source;
-                return;
-            }
-
-            output_block.getByPosition(0).column = source.block.getByName(column_name).column;
-            source.pos += len;
-            return;
-        }
+            source_to_fully_copy = &source;
+            return;
+        }
         else if (len == 1)
             column_res.insertFrom(*source.column, source.pos);
         else


@@ -8,40 +8,28 @@ namespace ErrorCodes
     extern const int SET_SIZE_LIMIT_EXCEEDED;
 }

-DistinctSortedBlockInputStream::DistinctSortedBlockInputStream(
-    const BlockInputStreamPtr & input, SortDescription sort_description, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns)
-    : description(std::move(sort_description))
+DistinctSortedTransform::DistinctSortedTransform(
+    const Block & header, SortDescription sort_description, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns)
+    : ISimpleTransform(header, header, true)
+    , description(std::move(sort_description))
     , columns_names(columns)
     , limit_hint(limit_hint_)
     , set_size_limits(set_size_limits_)
 {
-    children.push_back(input);
 }

-Block DistinctSortedBlockInputStream::readImpl()
+void DistinctSortedTransform::transform(Chunk & chunk)
 {
-    /// Execute until end of stream or until
-    /// a block with some new records will be gotten.
-    for (;;)
-    {
-        /// Stop reading if we already reached the limit.
-        if (limit_hint && data.getTotalRowCount() >= limit_hint)
-            return Block();
-
-        Block block = children.back()->read();
-        if (!block)
-            return Block();
-
-        const ColumnRawPtrs column_ptrs(getKeyColumns(block));
+    const ColumnRawPtrs column_ptrs(getKeyColumns(chunk));
     if (column_ptrs.empty())
-        return block;
+        return;

-    const ColumnRawPtrs clearing_hint_columns(getClearingColumns(block, column_ptrs));
+    const ColumnRawPtrs clearing_hint_columns(getClearingColumns(chunk, column_ptrs));

     if (data.type == ClearableSetVariants::Type::EMPTY)
         data.init(ClearableSetVariants::chooseMethod(column_ptrs, key_sizes));

-    const size_t rows = block.rows();
+    const size_t rows = chunk.getNumRows();
     IColumn::Filter filter(rows);

     bool has_new_data = false;
@@ -59,25 +47,36 @@ Block DistinctSortedBlockInputStream::readImpl()
     /// Just go to the next block if there isn't any new record in the current one.
     if (!has_new_data)
-        continue;
+    {
+        chunk.clear();
+        return;
+    }

     if (!set_size_limits.check(data.getTotalRowCount(), data.getTotalByteCount(), "DISTINCT", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED))
-        return {};
+    {
+        stopReading();
+        chunk.clear();
+        return;
+    }

-    prev_block.block = block;
-    prev_block.clearing_hint_columns = std::move(clearing_hint_columns);
-
-    size_t all_columns = block.columns();
-    for (size_t i = 0; i < all_columns; ++i)
-        block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(filter, -1);
-
-    return block;
-    }
+    /// Stop reading if we already reached the limit.
+    if (limit_hint && data.getTotalRowCount() >= limit_hint)
+        stopReading();
+
+    prev_chunk.chunk = std::move(chunk);
+    prev_chunk.clearing_hint_columns = std::move(clearing_hint_columns);
+
+    size_t all_columns = prev_chunk.chunk.getNumColumns();
+    Chunk res_chunk;
+    for (size_t i = 0; i < all_columns; ++i)
+        res_chunk.addColumn(prev_chunk.chunk.getColumns().at(i)->filter(filter, -1));
+
+    chunk = std::move(res_chunk);
 }

 template <typename Method>
-bool DistinctSortedBlockInputStream::buildFilter(
+bool DistinctSortedTransform::buildFilter(
     Method & method,
     const ColumnRawPtrs & columns,
     const ColumnRawPtrs & clearing_hint_columns,
@@ -90,8 +89,8 @@ bool DistinctSortedBlockInputStream::buildFilter(
     /// Compare last row of previous block and first row of current block,
     /// If rows not equal, we can clear HashSet,
     /// If clearing_hint_columns is empty, we CAN'T clear HashSet.
-    if (!clearing_hint_columns.empty() && !prev_block.clearing_hint_columns.empty()
-        && !rowsEqual(clearing_hint_columns, 0, prev_block.clearing_hint_columns, prev_block.block.rows() - 1))
+    if (!clearing_hint_columns.empty() && !prev_chunk.clearing_hint_columns.empty()
+        && !rowsEqual(clearing_hint_columns, 0, prev_chunk.clearing_hint_columns, prev_chunk.chunk.getNumRows() - 1))
     {
         method.data.clear();
     }
@@ -117,18 +116,20 @@ bool DistinctSortedBlockInputStream::buildFilter(
     return has_new_data;
 }

-ColumnRawPtrs DistinctSortedBlockInputStream::getKeyColumns(const Block & block) const
+ColumnRawPtrs DistinctSortedTransform::getKeyColumns(const Chunk & chunk) const
 {
-    size_t columns = columns_names.empty() ? block.columns() : columns_names.size();
+    size_t columns = columns_names.empty() ? chunk.getNumColumns() : columns_names.size();

     ColumnRawPtrs column_ptrs;
     column_ptrs.reserve(columns);

     for (size_t i = 0; i < columns; ++i)
     {
-        const auto & column = columns_names.empty()
-            ? block.safeGetByPosition(i).column
-            : block.getByName(columns_names[i]).column;
+        auto pos = i;
+        if (!columns_names.empty())
+            pos = input.getHeader().getPositionByName(columns_names[i]);
+
+        const auto & column = chunk.getColumns()[pos];

         /// Ignore all constant columns.
         if (!isColumnConst(*column))
@@ -138,13 +139,13 @@ ColumnRawPtrs DistinctSortedBlockInputStream::getKeyColumns(const Block & block)
     return column_ptrs;
 }

-ColumnRawPtrs DistinctSortedBlockInputStream::getClearingColumns(const Block & block, const ColumnRawPtrs & key_columns) const
+ColumnRawPtrs DistinctSortedTransform::getClearingColumns(const Chunk & chunk, const ColumnRawPtrs & key_columns) const
 {
     ColumnRawPtrs clearing_hint_columns;
     clearing_hint_columns.reserve(description.size());
     for (const auto & sort_column_description : description)
     {
-        const auto * sort_column_ptr = block.safeGetByPosition(sort_column_description.column_number).column.get();
+        const auto * sort_column_ptr = chunk.getColumns().at(sort_column_description.column_number).get();
         const auto it = std::find(key_columns.cbegin(), key_columns.cend(), sort_column_ptr);
         if (it != key_columns.cend()) /// if found in key_columns
             clearing_hint_columns.emplace_back(sort_column_ptr);
@@ -154,7 +155,7 @@ ColumnRawPtrs DistinctSortedBlockInputStream::getClearingColumns(const Block & b
     return clearing_hint_columns;
 }

-bool DistinctSortedBlockInputStream::rowsEqual(const ColumnRawPtrs & lhs, size_t n, const ColumnRawPtrs & rhs, size_t m)
+bool DistinctSortedTransform::rowsEqual(const ColumnRawPtrs & lhs, size_t n, const ColumnRawPtrs & rhs, size_t m)
 {
     for (size_t column_index = 0, num_columns = lhs.size(); column_index < num_columns; ++column_index)
     {


@@ -1,6 +1,6 @@
 #pragma once

-#include <DataStreams/IBlockInputStream.h>
+#include <Processors/ISimpleTransform.h>
 #include <Interpreters/SetVariants.h>
 #include <Core/SortDescription.h>

@@ -18,24 +18,22 @@ namespace DB
  * set limit_hint to non zero value. So we stop emitting new rows after
  * count of already emitted rows will reach the limit_hint.
  */
-class DistinctSortedBlockInputStream : public IBlockInputStream
+class DistinctSortedTransform : public ISimpleTransform
 {
 public:
     /// Empty columns_ means all columns.
-    DistinctSortedBlockInputStream(const BlockInputStreamPtr & input, SortDescription sort_description, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns);
+    DistinctSortedTransform(const Block & header, SortDescription sort_description, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns);

-    String getName() const override { return "DistinctSorted"; }
-
-    Block getHeader() const override { return children.at(0)->getHeader(); }
+    String getName() const override { return "DistinctSortedTransform"; }

 protected:
-    Block readImpl() override;
+    void transform(Chunk & chunk) override;

 private:
-    ColumnRawPtrs getKeyColumns(const Block & block) const;
+    ColumnRawPtrs getKeyColumns(const Chunk & chunk) const;
     /// When clearing_columns changed, we can clean HashSet to memory optimization
     /// clearing_columns is a left-prefix of SortDescription exists in key_columns
-    ColumnRawPtrs getClearingColumns(const Block & block, const ColumnRawPtrs & key_columns) const;
+    ColumnRawPtrs getClearingColumns(const Chunk & chunk, const ColumnRawPtrs & key_columns) const;
     static bool rowsEqual(const ColumnRawPtrs & lhs, size_t n, const ColumnRawPtrs & rhs, size_t m);

     /// return true if has new data
@@ -50,12 +48,12 @@ private:
     SortDescription description;

-    struct PreviousBlock
+    struct PreviousChunk
     {
-        Block block;
+        Chunk chunk;
         ColumnRawPtrs clearing_hint_columns;
     };
-    PreviousBlock prev_block;
+    PreviousChunk prev_chunk;

     Names columns_names;
     ClearableSetVariants data;
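Usage stays a one-liner at the call site; a sketch mirroring the MergeTask.cpp change later in this diff (pipe, sort_description and deduplicate_by_columns are illustrative names):

    /// Deduplicate a sorted pipe; an empty column list means "use all columns".
    pipe.addTransform(std::make_shared<DistinctSortedTransform>(
        pipe.getHeader(), sort_description, SizeLimits(), 0 /*limit_hint*/, deduplicate_by_columns));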


@@ -16,18 +16,17 @@
 namespace DB
 {

-TTLBlockInputStream::TTLBlockInputStream(
-    const BlockInputStreamPtr & input_,
+TTLTransform::TTLTransform(
+    const Block & header_,
     const MergeTreeData & storage_,
     const StorageMetadataPtr & metadata_snapshot_,
     const MergeTreeData::MutableDataPartPtr & data_part_,
     time_t current_time_,
     bool force_)
-    : data_part(data_part_)
-    , log(&Poco::Logger::get(storage_.getLogName() + " (TTLBlockInputStream)"))
+    : IAccumulatingTransform(header_, header_)
+    , data_part(data_part_)
+    , log(&Poco::Logger::get(storage_.getLogName() + " (TTLTransform)"))
 {
-    children.push_back(input_);
-    header = children.at(0)->getHeader();
     auto old_ttl_infos = data_part->ttl_infos;

     if (metadata_snapshot_->hasRowsTTL())
@@ -50,7 +49,7 @@ TTLBlockInputStream::TTLBlockInputStream(
     for (const auto & group_by_ttl : metadata_snapshot_->getGroupByTTLs())
         algorithms.emplace_back(std::make_unique<TTLAggregationAlgorithm>(
-            group_by_ttl, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_, header, storage_));
+            group_by_ttl, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_, getInputPort().getHeader(), storage_));

     if (metadata_snapshot_->hasAnyColumnTTL())
     {
@@ -98,22 +97,40 @@ Block reorderColumns(Block block, const Block & header)
     return res;
 }

-Block TTLBlockInputStream::readImpl()
+void TTLTransform::consume(Chunk chunk)
 {
     if (all_data_dropped)
-        return {};
+    {
+        finishConsume();
+        return;
+    }

-    auto block = children.at(0)->read();
+    auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
     for (const auto & algorithm : algorithms)
         algorithm->execute(block);

     if (!block)
-        return block;
+        return;

-    return reorderColumns(std::move(block), header);
+    size_t num_rows = block.rows();
+    setReadyChunk(Chunk(reorderColumns(std::move(block), getOutputPort().getHeader()).getColumns(), num_rows));
 }

-void TTLBlockInputStream::readSuffixImpl()
+Chunk TTLTransform::generate()
+{
+    Block block;
+    for (const auto & algorithm : algorithms)
+        algorithm->execute(block);
+
+    if (!block)
+        return {};
+
+    size_t num_rows = block.rows();
+    return Chunk(reorderColumns(std::move(block), getOutputPort().getHeader()).getColumns(), num_rows);
+}
+
+void TTLTransform::finalize()
 {
     data_part->ttl_infos = {};
     for (const auto & algorithm : algorithms)
@@ -126,4 +143,13 @@ void TTLBlockInputStream::readSuffixImpl()
     }
 }

+IProcessor::Status TTLTransform::prepare()
+{
+    auto status = IAccumulatingTransform::prepare();
+    if (status == Status::Finished)
+        finalize();
+
+    return status;
+}
+
 }


@@ -1,5 +1,5 @@
 #pragma once
-#include <DataStreams/IBlockInputStream.h>
+#include <Processors/IAccumulatingTransform.h>
 #include <Storages/MergeTree/MergeTreeData.h>
 #include <Storages/MergeTree/IMergeTreeDataPart.h>
 #include <Core/Block.h>
@@ -12,11 +12,11 @@
 namespace DB
 {

-class TTLBlockInputStream : public IBlockInputStream
+class TTLTransform : public IAccumulatingTransform
 {
 public:
-    TTLBlockInputStream(
-        const BlockInputStreamPtr & input_,
+    TTLTransform(
+        const Block & header_,
         const MergeTreeData & storage_,
         const StorageMetadataPtr & metadata_snapshot_,
         const MergeTreeData::MutableDataPartPtr & data_part_,
@@ -25,13 +25,15 @@ public:
     );

     String getName() const override { return "TTL"; }
-    Block getHeader() const override { return header; }
+
+    Status prepare() override;

 protected:
-    Block readImpl() override;
+    void consume(Chunk chunk) override;
+    Chunk generate() override;

     /// Finalizes ttl infos and updates data part
-    void readSuffixImpl() override;
+    void finalize();

 private:
     std::vector<TTLAlgorithmPtr> algorithms;
@@ -41,7 +43,6 @@ private:
     /// ttl_infos and empty_columns are updating while reading
     const MergeTreeData::MutableDataPartPtr & data_part;
     Poco::Logger * log;
-    Block header;
 };

 }
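As with DistinctSortedTransform, the TTL step is now attached to a pipe instead of wrapping an input stream; a sketch mirroring the call site in MergeTask.cpp later in this diff (variable names are illustrative):

    if (need_remove_expired_values)
        pipe.addTransform(std::make_shared<TTLTransform>(
            pipe.getHeader(), storage, metadata_snapshot, new_data_part, time_of_merge, force_ttl));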


@@ -4,18 +4,17 @@
 namespace DB
 {

-TTLCalcInputStream::TTLCalcInputStream(
-    const BlockInputStreamPtr & input_,
+TTLCalcTransform::TTLCalcTransform(
+    const Block & header_,
     const MergeTreeData & storage_,
     const StorageMetadataPtr & metadata_snapshot_,
     const MergeTreeData::MutableDataPartPtr & data_part_,
     time_t current_time_,
     bool force_)
-    : data_part(data_part_)
-    , log(&Poco::Logger::get(storage_.getLogName() + " (TTLCalcInputStream)"))
+    : IAccumulatingTransform(header_, header_)
+    , data_part(data_part_)
+    , log(&Poco::Logger::get(storage_.getLogName() + " (TTLCalcTransform)"))
 {
-    children.push_back(input_);
-    header = children.at(0)->getHeader();
     auto old_ttl_infos = data_part->ttl_infos;

     if (metadata_snapshot_->hasRowsTTL())
@@ -51,27 +50,52 @@ TTLCalcInputStream::TTLCalcInputStream(
             recompression_ttl, TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));
 }

-Block TTLCalcInputStream::readImpl()
+void TTLCalcTransform::consume(Chunk chunk)
 {
-    auto block = children.at(0)->read();
+    auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
     for (const auto & algorithm : algorithms)
         algorithm->execute(block);

     if (!block)
-        return block;
+        return;

-    Block res;
-    for (const auto & col : header)
-        res.insert(block.getByName(col.name));
+    Chunk res;
+    for (const auto & col : getOutputPort().getHeader())
+        res.addColumn(block.getByName(col.name).column);
+
+    setReadyChunk(std::move(res));
+}
+
+Chunk TTLCalcTransform::generate()
+{
+    Block block;
+    for (const auto & algorithm : algorithms)
+        algorithm->execute(block);
+
+    if (!block)
+        return {};
+
+    Chunk res;
+    for (const auto & col : getOutputPort().getHeader())
+        res.addColumn(block.getByName(col.name).column);

     return res;
 }

-void TTLCalcInputStream::readSuffixImpl()
+void TTLCalcTransform::finalize()
 {
     data_part->ttl_infos = {};
     for (const auto & algorithm : algorithms)
         algorithm->finalize(data_part);
 }

+IProcessor::Status TTLCalcTransform::prepare()
+{
+    auto status = IAccumulatingTransform::prepare();
+    if (status == Status::Finished)
+        finalize();
+
+    return status;
+}
+
 }


@@ -1,5 +1,5 @@
 #pragma once
-#include <DataStreams/IBlockInputStream.h>
+#include <Processors/IAccumulatingTransform.h>
 #include <Storages/MergeTree/MergeTreeData.h>
 #include <Storages/MergeTree/IMergeTreeDataPart.h>
 #include <Core/Block.h>
@@ -11,11 +11,11 @@
 namespace DB
 {

-class TTLCalcInputStream : public IBlockInputStream
+class TTLCalcTransform : public IAccumulatingTransform
 {
 public:
-    TTLCalcInputStream(
-        const BlockInputStreamPtr & input_,
+    TTLCalcTransform(
+        const Block & header_,
         const MergeTreeData & storage_,
         const StorageMetadataPtr & metadata_snapshot_,
         const MergeTreeData::MutableDataPartPtr & data_part_,
@@ -24,13 +24,14 @@ public:
     );

     String getName() const override { return "TTL_CALC"; }
-    Block getHeader() const override { return header; }
+    Status prepare() override;

 protected:
-    Block readImpl() override;
+    void consume(Chunk chunk) override;
+    Chunk generate() override;

     /// Finalizes ttl infos and updates data part
-    void readSuffixImpl() override;
+    void finalize();

 private:
     std::vector<TTLAlgorithmPtr> algorithms;
@@ -38,7 +39,6 @@ private:
     /// ttl_infos and empty_columns are updating while reading
     const MergeTreeData::MutableDataPartPtr & data_part;
     Poco::Logger * log;
-    Block header;
 };

 }


@@ -932,7 +932,7 @@ void MutationsInterpreter::validate()
     auto pipeline = addStreamsForLaterStages(stages, plan);
 }

-BlockInputStreamPtr MutationsInterpreter::execute()
+QueryPipeline MutationsInterpreter::execute()
 {
     if (!can_execute)
         throw Exception("Cannot execute mutations interpreter because can_execute flag set to false", ErrorCodes::LOGICAL_ERROR);
@@ -956,12 +956,11 @@ BlockInputStreamPtr MutationsInterpreter::execute()
     }

     auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
-    BlockInputStreamPtr result_stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));

     if (!updated_header)
-        updated_header = std::make_unique<Block>(result_stream->getHeader());
+        updated_header = std::make_unique<Block>(pipeline.getHeader());

-    return result_stream;
+    return pipeline;
 }

 Block MutationsInterpreter::getUpdatedHeader() const
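Callers that used to read() from the returned BlockInputStreamPtr now pull from the QueryPipeline; a sketch of the consuming side, following the PullingPipelineExecutor pattern used in MergeTask.cpp later in this diff (process() is a hypothetical consumer):

    auto pipeline = interpreter.execute();
    PullingPipelineExecutor executor(pipeline);

    Block block;
    while (executor.pull(block))
        process(block);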


@@ -50,7 +50,7 @@ public:
     size_t evaluateCommandsSize();

     /// The resulting stream will return blocks containing only changed columns and columns, that we need to recalculate indices.
-    BlockInputStreamPtr execute();
+    QueryPipeline execute();

     /// Only changed columns.
     Block getUpdatedHeader() const;


@@ -104,7 +104,9 @@ Columns Chunk::detachColumns()
 void Chunk::addColumn(ColumnPtr column)
 {
-    if (column->size() != num_rows)
+    if (empty())
+        num_rows = column->size();
+    else if (column->size() != num_rows)
         throw Exception("Invalid number of rows in Chunk column " + column->getName()+ ": expected " +
                         toString(num_rows) + ", got " + toString(column->size()), ErrorCodes::LOGICAL_ERROR);
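With this change the first column added to an empty Chunk defines its row count instead of tripping the size check; a small illustration (assuming the usual Chunk and ColumnVector headers):

    Chunk chunk;                         /// 0 columns, 0 rows
    auto col = ColumnUInt64::create();
    col->getData().push_back(1);
    col->getData().push_back(2);
    chunk.addColumn(std::move(col));     /// chunk now reports 2 rows
    /// any further column must still have exactly 2 rows, or addColumn throws LOGICAL_ERROR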


@@ -589,7 +589,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
                 block.getNamesAndTypesList(),
                 {},
                 CompressionCodecFactory::instance().get("NONE", {}));
-            part_out.writePrefix();
             part_out.write(block);
             part_out.writeSuffixAndFinalizePart(new_projection_part);
             new_projection_part->checksums.checkEqual(checksums, /* have_uncompressed = */ true);
@@ -612,7 +611,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
     MergedBlockOutputStream part_out(
         new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {}));

-    part_out.writePrefix();
     part_out.write(block);
     part_out.writeSuffixAndFinalizePart(new_data_part);
     new_data_part->checksums.checkEqual(checksums, /* have_uncompressed = */ true);


@@ -2,22 +2,25 @@

 #include <Storages/MergeTree/MergeTreeIndexGranularity.h>
 #include <Storages/MergeTree/MergeTreeData.h>
-#include <DataStreams/IBlockOutputStream.h>
 #include <Storages/MergeTree/IMergeTreeDataPart.h>
 #include <Storages/MergeTree/IMergeTreeDataPartWriter.h>

 namespace DB
 {

-class IMergedBlockOutputStream : public IBlockOutputStream
+class IMergedBlockOutputStream
 {
 public:
     IMergedBlockOutputStream(
         const MergeTreeDataPartPtr & data_part,
         const StorageMetadataPtr & metadata_snapshot_);

+    virtual ~IMergedBlockOutputStream() = default;
+
     using WrittenOffsetColumns = std::set<std::string>;

+    virtual void write(const Block & block) = 0;
+
     const MergeTreeIndexGranularity & getIndexGranularity() const
     {
         return writer->getIndexGranularity();
@@ -35,7 +38,6 @@ protected:
         NamesAndTypesList & columns,
         MergeTreeData::DataPart::Checksums & checksums);

-protected:
     const MergeTreeData & storage;
     StorageMetadataPtr metadata_snapshot;


@@ -11,6 +11,7 @@
 #include "Storages/MergeTree/MergeTreeSequentialSource.h"
 #include "Storages/MergeTree/FutureMergedMutatedPart.h"
 #include "Processors/Transforms/ExpressionTransform.h"
+#include "Processors/Transforms/MaterializingTransform.h"
 #include "Processors/Merges/MergingSortedTransform.h"
 #include "Processors/Merges/CollapsingSortedTransform.h"
 #include "Processors/Merges/SummingSortedTransform.h"
@@ -236,11 +237,6 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
         ctx->compression_codec,
         ctx->blocks_are_granules_size);

-    global_ctx->merged_stream->readPrefix();
-
-    /// TODO: const
-    const_cast<MergedBlockOutputStream&>(*global_ctx->to).writePrefix();
-
     global_ctx->rows_written = 0;
     ctx->initial_reservation = global_ctx->space_reservation ? global_ctx->space_reservation->getSize() : 0;
@@ -301,14 +297,17 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::execute()
 bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
 {
     Block block;
-    if (!ctx->is_cancelled() && (block = global_ctx->merged_stream->read()))
+    if (!ctx->is_cancelled() && (global_ctx->merging_executor->pull(block)))
     {
         global_ctx->rows_written += block.rows();

         const_cast<MergedBlockOutputStream &>(*global_ctx->to).write(block);

-        global_ctx->merge_list_element_ptr->rows_written = global_ctx->merged_stream->getProfileInfo().rows;
-        global_ctx->merge_list_element_ptr->bytes_written_uncompressed = global_ctx->merged_stream->getProfileInfo().bytes;
+        UInt64 result_rows = 0;
+        UInt64 result_bytes = 0;
+        global_ctx->merged_pipeline.tryGetResultRowsAndBytes(result_rows, result_bytes);
+        global_ctx->merge_list_element_ptr->rows_written = result_rows;
+        global_ctx->merge_list_element_ptr->bytes_written_uncompressed = result_bytes;

         /// Reservation updates is not performed yet, during the merge it may lead to higher free space requirements
         if (global_ctx->space_reservation && ctx->sum_input_rows_upper_bound)
@@ -326,8 +325,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
         return true;
     }

-    global_ctx->merged_stream->readSuffix();
-    global_ctx->merged_stream.reset();
+    global_ctx->merging_executor.reset();
+    global_ctx->merged_pipeline.reset();

     if (global_ctx->merges_blocker->isCancelled())
         throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
@@ -353,8 +352,6 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
     global_ctx->merge_list_element_ptr->columns_written = global_ctx->merging_column_names.size();
     global_ctx->merge_list_element_ptr->progress.store(ctx->column_sizes->keyColumnsWeight(), std::memory_order_relaxed);

-    ctx->column_part_streams = BlockInputStreams(global_ctx->future_part->parts.size());
-
     ctx->rows_sources_write_buf->next();
     ctx->rows_sources_uncompressed_write_buf->next();

     /// Ensure data has written to disk.
@@ -389,6 +386,7 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
     global_ctx->column_progress = std::make_unique<MergeStageProgress>(ctx->progress_before, ctx->column_sizes->columnWeight(column_name));

+    Pipes pipes;
     for (size_t part_num = 0; part_num < global_ctx->future_part->parts.size(); ++part_num)
     {
         auto column_part_source = std::make_shared<MergeTreeSequentialSource>(
@@ -398,20 +396,22 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
         column_part_source->setProgressCallback(
             MergeProgressCallback(global_ctx->merge_list_element_ptr, global_ctx->watch_prev_elapsed, *global_ctx->column_progress));

-        QueryPipeline column_part_pipeline(Pipe(std::move(column_part_source)));
-        column_part_pipeline.setNumThreads(1);
-
-        ctx->column_part_streams[part_num] =
-            std::make_shared<PipelineExecutingBlockInputStream>(std::move(column_part_pipeline));
+        pipes.emplace_back(std::move(column_part_source));
     }

+    auto pipe = Pipe::unitePipes(std::move(pipes));
+
     ctx->rows_sources_read_buf->seek(0, 0);
-    ctx->column_gathered_stream = std::make_unique<ColumnGathererStream>(column_name, ctx->column_part_streams, *ctx->rows_sources_read_buf);
+
+    auto transform = std::make_unique<ColumnGathererTransform>(pipe.getHeader(), pipe.numOutputPorts(), *ctx->rows_sources_read_buf);
+    pipe.addTransform(std::move(transform));
+
+    ctx->column_parts_pipeline = QueryPipeline(std::move(pipe));
+    ctx->executor = std::make_unique<PullingPipelineExecutor>(ctx->column_parts_pipeline);

     ctx->column_to = std::make_unique<MergedColumnOnlyOutputStream>(
         global_ctx->new_data_part,
         global_ctx->metadata_snapshot,
-        ctx->column_gathered_stream->getHeader(),
+        ctx->executor->getHeader(),
         ctx->compression_codec,
         /// we don't need to recalc indices here
         /// because all of them were already recalculated and written
@@ -421,15 +421,13 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
         global_ctx->to->getIndexGranularity());

     ctx->column_elems_written = 0;
-
-    ctx->column_to->writePrefix();
 }

 bool MergeTask::VerticalMergeStage::executeVerticalMergeForOneColumn() const
 {
     Block block;
-    if (!global_ctx->merges_blocker->isCancelled() && (block = ctx->column_gathered_stream->read()))
+    if (!global_ctx->merges_blocker->isCancelled() && ctx->executor->pull(block))
     {
         ctx->column_elems_written += block.rows();
         ctx->column_to->write(block);
@@ -447,7 +445,7 @@ void MergeTask::VerticalMergeStage::finalizeVerticalMergeForOneColumn() const
     if (global_ctx->merges_blocker->isCancelled())
         throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);

-    ctx->column_gathered_stream->readSuffix();
+    ctx->executor.reset();
     auto changed_checksums = ctx->column_to->writeSuffixAndGetChecksums(global_ctx->new_data_part, global_ctx->checksums_gathered_columns, ctx->need_sync);
     global_ctx->checksums_gathered_columns.add(std::move(changed_checksums));
@@ -457,10 +455,14 @@ void MergeTask::VerticalMergeStage::finalizeVerticalMergeForOneColumn() const
             ", but " + toString(global_ctx->rows_written) + " rows of PK columns", ErrorCodes::LOGICAL_ERROR);
     }

+    UInt64 rows = 0;
+    UInt64 bytes = 0;
+    ctx->column_parts_pipeline.tryGetResultRowsAndBytes(rows, bytes);
+
     /// NOTE: 'progress' is modified by single thread, but it may be concurrently read from MergeListElement::getInfo() (StorageSystemMerges).
     global_ctx->merge_list_element_ptr->columns_written += 1;
-    global_ctx->merge_list_element_ptr->bytes_written_uncompressed += ctx->column_gathered_stream->getProfileInfo().bytes;
+    global_ctx->merge_list_element_ptr->bytes_written_uncompressed += bytes;
     global_ctx->merge_list_element_ptr->progress.store(ctx->progress_before + ctx->column_sizes->columnWeight(column_name), std::memory_order_relaxed);

     /// This is the external cycle increment.
@@ -799,26 +801,25 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
     auto res_pipe = Pipe::unitePipes(std::move(pipes));
     res_pipe.addTransform(std::move(merged_transform));

-    QueryPipeline pipeline(std::move(res_pipe));
-    pipeline.setNumThreads(1);
-    global_ctx->merged_stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));
-
     if (global_ctx->deduplicate)
-        global_ctx->merged_stream = std::make_shared<DistinctSortedBlockInputStream>(
-            global_ctx->merged_stream, sort_description, SizeLimits(), 0 /*limit_hint*/, global_ctx->deduplicate_by_columns);
+        res_pipe.addTransform(std::make_shared<DistinctSortedTransform>(
+            res_pipe.getHeader(), sort_description, SizeLimits(), 0 /*limit_hint*/, global_ctx->deduplicate_by_columns));

     if (ctx->need_remove_expired_values)
-        global_ctx->merged_stream = std::make_shared<TTLBlockInputStream>(
-            global_ctx->merged_stream, *global_ctx->data, global_ctx->metadata_snapshot, global_ctx->new_data_part, global_ctx->time_of_merge, ctx->force_ttl);
+        res_pipe.addTransform(std::make_shared<TTLTransform>(
+            res_pipe.getHeader(), *global_ctx->data, global_ctx->metadata_snapshot, global_ctx->new_data_part, global_ctx->time_of_merge, ctx->force_ttl));

     if (global_ctx->metadata_snapshot->hasSecondaryIndices())
     {
         const auto & indices = global_ctx->metadata_snapshot->getSecondaryIndices();
-        global_ctx->merged_stream = std::make_shared<ExpressionBlockInputStream>(
-            global_ctx->merged_stream, indices.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), global_ctx->data->getContext()));
-        global_ctx->merged_stream = std::make_shared<MaterializingBlockInputStream>(global_ctx->merged_stream);
+        res_pipe.addTransform(std::make_shared<ExpressionTransform>(
+            res_pipe.getHeader(), indices.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), global_ctx->data->getContext())));
+        res_pipe.addTransform(std::make_shared<MaterializingTransform>(res_pipe.getHeader()));
     }
+
+    global_ctx->merged_pipeline = QueryPipeline(std::move(res_pipe));
+    global_ctx->merging_executor = std::make_unique<PullingPipelineExecutor>(global_ctx->merged_pipeline);
 }


@@ -9,6 +9,7 @@
 #include <Storages/MergeTree/ColumnSizeEstimator.h>
 #include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
 #include <DataStreams/ColumnGathererStream.h>
+#include <Processors/Executors/PullingPipelineExecutor.h>
 #include <Compression/CompressedReadBufferFromFile.h>

 #include <memory>
@@ -147,7 +148,8 @@ private:
     std::unique_ptr<MergeStageProgress> column_progress{nullptr};

     std::shared_ptr<MergedBlockOutputStream> to{nullptr};
-    BlockInputStreamPtr merged_stream{nullptr};
+    QueryPipeline merged_pipeline;
+    std::unique_ptr<PullingPipelineExecutor> merging_executor;

     SyncGuardPtr sync_guard{nullptr};
     MergeTreeData::MutableDataPartPtr new_data_part{nullptr};
@@ -263,8 +265,8 @@ private:
     Float64 progress_before = 0;
     std::unique_ptr<MergedColumnOnlyOutputStream> column_to{nullptr};
     size_t column_elems_written{0};
-    BlockInputStreams column_part_streams;
-    std::unique_ptr<ColumnGathererStream> column_gathered_stream;
+    QueryPipeline column_parts_pipeline;
+    std::unique_ptr<PullingPipelineExecutor> executor;
     std::unique_ptr<CompressedReadBufferFromFile> rows_sources_read_buf{nullptr};
 };


@@ -92,7 +92,6 @@ void MergeTreeDataPartInMemory::flushToDisk(const String & base_path, const Stri
     auto compression_codec = storage.getContext()->chooseCompressionCodec(0, 0);
     auto indices = MergeTreeIndexFactory::instance().getMany(metadata_snapshot->getSecondaryIndices());
     MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, indices, compression_codec);
-    out.writePrefix();
     out.write(block);
     const auto & projections = metadata_snapshot->getProjections();
     for (const auto & [projection_name, projection] : projection_parts)
@@ -123,7 +122,6 @@ void MergeTreeDataPartInMemory::flushToDisk(const String & base_path, const Stri
             auto projection_indices = MergeTreeIndexFactory::instance().getMany(desc.metadata->getSecondaryIndices());
             MergedBlockOutputStream projection_out(
                 projection_data_part, desc.metadata, projection_part->columns, projection_indices, projection_compression_codec);
-            projection_out.writePrefix();
             projection_out.write(projection_part->block);
             projection_out.writeSuffixAndFinalizePart(projection_data_part);
             new_data_part->addProjectionPart(projection_name, std::move(projection_data_part));
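Across the call sites touched in this diff the output protocol shrinks to construct, write, writeSuffixAndFinalizePart; writePrefix() and the throwing writeSuffix() stubs are gone, e.g.:

    MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, indices, compression_codec);
    out.write(block);
    out.writeSuffixAndFinalizePart(new_data_part);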


@@ -412,7 +412,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(
     MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, index_factory.getMany(metadata_snapshot->getSecondaryIndices()), compression_codec);

     bool sync_on_insert = data.getSettings()->fsync_after_insert;

-    out.writePrefix();
     out.writeWithPermutation(block, perm_ptr);

     for (const auto & projection : metadata_snapshot->getProjections())
@@ -508,7 +507,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPartImpl(
         {},
         compression_codec);

-    out.writePrefix();
     out.writeWithPermutation(block, perm_ptr);
     out.writeSuffixAndFinalizePart(new_data_part);


@@ -202,7 +202,6 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
         if (metadata_snapshot->hasSortingKey())
             metadata_snapshot->getSortingKey().expression->execute(block);

-        part_out.writePrefix();
         part_out.write(block);

         for (const auto & projection : metadata_snapshot->getProjections())

View File

@ -8,7 +8,6 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
@ -51,11 +50,6 @@ void MergedBlockOutputStream::writeWithPermutation(const Block & block, const IC
writeImpl(block, permutation); writeImpl(block, permutation);
} }
void MergedBlockOutputStream::writeSuffix()
{
throw Exception("Method writeSuffix is not supported by MergedBlockOutputStream", ErrorCodes::NOT_IMPLEMENTED);
}
void MergedBlockOutputStream::writeSuffixAndFinalizePart( void MergedBlockOutputStream::writeSuffixAndFinalizePart(
MergeTreeData::MutableDataPartPtr & new_part, MergeTreeData::MutableDataPartPtr & new_part,
bool sync, bool sync,

View File

@ -21,7 +21,7 @@ public:
CompressionCodecPtr default_codec_, CompressionCodecPtr default_codec_,
bool blocks_are_granules_size = false); bool blocks_are_granules_size = false);
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); } Block getHeader() const { return metadata_snapshot->getSampleBlock(); }
/// If the data is pre-sorted. /// If the data is pre-sorted.
void write(const Block & block) override; void write(const Block & block) override;
@ -31,8 +31,6 @@ public:
*/ */
void writeWithPermutation(const Block & block, const IColumn::Permutation * permutation); void writeWithPermutation(const Block & block, const IColumn::Permutation * permutation);
void writeSuffix() override;
/// Finalize writing part and fill inner structures /// Finalize writing part and fill inner structures
/// If part is new and contains projections, they should be added before invoking this method. /// If part is new and contains projections, they should be added before invoking this method.
void writeSuffixAndFinalizePart( void writeSuffixAndFinalizePart(
@ -53,7 +51,6 @@ private:
MergeTreeData::DataPart::Checksums & checksums, MergeTreeData::DataPart::Checksums & checksums,
bool sync); bool sync);
private:
NamesAndTypesList columns_list; NamesAndTypesList columns_list;
IMergeTreeDataPart::MinMaxIndex minmax_idx; IMergeTreeDataPart::MinMaxIndex minmax_idx;
size_t rows_count = 0; size_t rows_count = 0;

View File

@ -53,11 +53,6 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
writer->write(block, nullptr); writer->write(block, nullptr);
} }
void MergedColumnOnlyOutputStream::writeSuffix()
{
throw Exception("Method writeSuffix is not supported by MergedColumnOnlyOutputStream", ErrorCodes::NOT_IMPLEMENTED);
}
MergeTreeData::DataPart::Checksums MergeTreeData::DataPart::Checksums
MergedColumnOnlyOutputStream::writeSuffixAndGetChecksums( MergedColumnOnlyOutputStream::writeSuffixAndGetChecksums(
MergeTreeData::MutableDataPartPtr & new_part, MergeTreeData::MutableDataPartPtr & new_part,

View File

@ -23,9 +23,8 @@ public:
const MergeTreeIndexGranularity & index_granularity = {}, const MergeTreeIndexGranularity & index_granularity = {},
const MergeTreeIndexGranularityInfo * index_granularity_info_ = nullptr); const MergeTreeIndexGranularityInfo * index_granularity_info_ = nullptr);
Block getHeader() const override { return header; } Block getHeader() const { return header; }
void write(const Block & block) override; void write(const Block & block) override;
void writeSuffix() override;
MergeTreeData::DataPart::Checksums MergeTreeData::DataPart::Checksums
writeSuffixAndGetChecksums(MergeTreeData::MutableDataPartPtr & new_part, MergeTreeData::DataPart::Checksums & all_checksums, bool sync = false); writeSuffixAndGetChecksums(MergeTreeData::MutableDataPartPtr & new_part, MergeTreeData::DataPart::Checksums & all_checksums, bool sync = false);

View File

@ -11,6 +11,9 @@
#include <DataStreams/SquashingBlockInputStream.h> #include <DataStreams/SquashingBlockInputStream.h>
#include <Parsers/queryToString.h> #include <Parsers/queryToString.h>
#include <Processors/Sources/SourceFromSingleChunk.h> #include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Storages/MergeTree/StorageFromMergeTreeDataPart.h> #include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h> #include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MutationCommands.h> #include <Storages/MutationCommands.h>
@ -182,7 +185,7 @@ static std::vector<ProjectionDescriptionRawPtr> getProjectionsForNewDataPart(
/// Return set of indices which should be recalculated during mutation also /// Return set of indices which should be recalculated during mutation also
/// wraps input stream into additional expression stream /// wraps input stream into additional expression stream
static std::set<MergeTreeIndexPtr> getIndicesToRecalculate( static std::set<MergeTreeIndexPtr> getIndicesToRecalculate(
BlockInputStreamPtr & input_stream, QueryPipeline & pipeline,
const NameSet & updated_columns, const NameSet & updated_columns,
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,
ContextPtr context, ContextPtr context,
@ -234,9 +237,9 @@ static std::set<MergeTreeIndexPtr> getIndicesToRecalculate(
} }
} }
if (!indices_to_recalc.empty() && input_stream) if (!indices_to_recalc.empty() && pipeline.initialized())
{ {
auto indices_recalc_syntax = TreeRewriter(context).analyze(indices_recalc_expr_list, input_stream->getHeader().getNamesAndTypesList()); auto indices_recalc_syntax = TreeRewriter(context).analyze(indices_recalc_expr_list, pipeline.getHeader().getNamesAndTypesList());
auto indices_recalc_expr = ExpressionAnalyzer( auto indices_recalc_expr = ExpressionAnalyzer(
indices_recalc_expr_list, indices_recalc_expr_list,
indices_recalc_syntax, context).getActions(false); indices_recalc_syntax, context).getActions(false);
@ -246,8 +249,11 @@ static std::set<MergeTreeIndexPtr> getIndicesToRecalculate(
/// MutationsInterpreter which knows about skip indices and stream 'in' already has /// MutationsInterpreter which knows about skip indices and stream 'in' already has
/// all required columns. /// all required columns.
/// TODO move this logic to single place. /// TODO move this logic to single place.
input_stream = std::make_shared<MaterializingBlockInputStream>( QueryPipelineBuilder builder;
std::make_shared<ExpressionBlockInputStream>(input_stream, indices_recalc_expr)); builder.init(std::move(pipeline));
builder.addTransform(std::make_shared<ExpressionTransform>(builder.getHeader(), indices_recalc_expr));
builder.addTransform(std::make_shared<MaterializingTransform>(builder.getHeader()));
pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
} }
return indices_to_recalc; return indices_to_recalc;
} }
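The old ExpressionBlockInputStream/MaterializingBlockInputStream wrappers become transforms appended through a QueryPipelineBuilder. A condensed sketch of that wrapping step, assuming indices_recalc_expr was built as above:

    // Move the pipeline into a builder, append transforms, then take the pipeline back.
    QueryPipelineBuilder builder;
    builder.init(std::move(pipeline));
    builder.addTransform(std::make_shared<ExpressionTransform>(builder.getHeader(), indices_recalc_expr));
    builder.addTransform(std::make_shared<MaterializingTransform>(builder.getHeader()));
    pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));

Each addTransform() call sees the header produced by the previous step, so the expression transform has to be added before the materializing one, matching the old nesting order.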
@ -500,7 +506,8 @@ struct MutationContext
std::unique_ptr<CurrentMetrics::Increment> num_mutations; std::unique_ptr<CurrentMetrics::Increment> num_mutations;
BlockInputStreamPtr mutating_stream{nullptr}; // in QueryPipeline mutating_pipeline; // in
std::unique_ptr<PullingPipelineExecutor> mutating_executor;
Block updated_header; Block updated_header;
std::unique_ptr<MutationsInterpreter> interpreter; std::unique_ptr<MutationsInterpreter> interpreter;
@ -795,24 +802,25 @@ void PartMergerWriter::prepare()
bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
{ {
if (MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry) && (block = ctx->mutating_stream->read())) Block cur_block;
if (MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry) && ctx->mutating_executor->pull(cur_block))
{ {
if (ctx->minmax_idx) if (ctx->minmax_idx)
ctx->minmax_idx->update(block, ctx->data->getMinMaxColumnsNames(ctx->metadata_snapshot->getPartitionKey())); ctx->minmax_idx->update(cur_block, ctx->data->getMinMaxColumnsNames(ctx->metadata_snapshot->getPartitionKey()));
ctx->out->write(block); ctx->out->write(cur_block);
for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i) for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
{ {
const auto & projection = *ctx->projections_to_build[i]; const auto & projection = *ctx->projections_to_build[i];
auto projection_block = projection_squashes[i].add(projection.calculate(block, ctx->context)); auto projection_block = projection_squashes[i].add(projection.calculate(cur_block, ctx->context));
if (projection_block) if (projection_block)
projection_parts[projection.name].emplace_back(MergeTreeDataWriter::writeTempProjectionPart( projection_parts[projection.name].emplace_back(MergeTreeDataWriter::writeTempProjectionPart(
*ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num)); *ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num));
} }
(*ctx->mutate_entry)->rows_written += block.rows(); (*ctx->mutate_entry)->rows_written += cur_block.rows();
(*ctx->mutate_entry)->bytes_written_uncompressed += block.bytes(); (*ctx->mutate_entry)->bytes_written_uncompressed += cur_block.bytes();
/// Need execute again /// Need execute again
return true; return true;
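With a PullingPipelineExecutor the per-block work is driven by pull() instead of read(). The real task pulls a single block per call and returns true to be scheduled again, but the data flow is equivalent to this loop (a sketch only; projection handling and cancellation checks omitted):

    Block cur_block;
    while (ctx->mutating_executor->pull(cur_block))   // false once the pipeline is exhausted
    {
        if (ctx->minmax_idx)
            ctx->minmax_idx->update(cur_block, ctx->data->getMinMaxColumnsNames(ctx->metadata_snapshot->getPartitionKey()));
        ctx->out->write(cur_block);
        (*ctx->mutate_entry)->rows_written += cur_block.rows();
        (*ctx->mutate_entry)->bytes_written_uncompressed += cur_block.bytes();
    }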
@ -937,18 +945,25 @@ private:
auto skip_part_indices = MutationHelpers::getIndicesForNewDataPart(ctx->metadata_snapshot->getSecondaryIndices(), ctx->for_file_renames); auto skip_part_indices = MutationHelpers::getIndicesForNewDataPart(ctx->metadata_snapshot->getSecondaryIndices(), ctx->for_file_renames);
ctx->projections_to_build = MutationHelpers::getProjectionsForNewDataPart(ctx->metadata_snapshot->getProjections(), ctx->for_file_renames); ctx->projections_to_build = MutationHelpers::getProjectionsForNewDataPart(ctx->metadata_snapshot->getProjections(), ctx->for_file_renames);
if (ctx->mutating_stream == nullptr) if (!ctx->mutating_pipeline.initialized())
throw Exception("Cannot mutate part columns with uninitialized mutations stream. It's a bug", ErrorCodes::LOGICAL_ERROR); throw Exception("Cannot mutate part columns with uninitialized mutations stream. It's a bug", ErrorCodes::LOGICAL_ERROR);
QueryPipelineBuilder builder;
builder.init(std::move(ctx->mutating_pipeline));
if (ctx->metadata_snapshot->hasPrimaryKey() || ctx->metadata_snapshot->hasSecondaryIndices()) if (ctx->metadata_snapshot->hasPrimaryKey() || ctx->metadata_snapshot->hasSecondaryIndices())
ctx->mutating_stream = std::make_shared<MaterializingBlockInputStream>( {
std::make_shared<ExpressionBlockInputStream>(ctx->mutating_stream, ctx->data->getPrimaryKeyAndSkipIndicesExpression(ctx->metadata_snapshot))); builder.addTransform(
std::make_shared<ExpressionTransform>(builder.getHeader(), ctx->data->getPrimaryKeyAndSkipIndicesExpression(ctx->metadata_snapshot)));
builder.addTransform(std::make_shared<MaterializingTransform>(builder.getHeader()));
}
if (ctx->execute_ttl_type == ExecuteTTLType::NORMAL) if (ctx->execute_ttl_type == ExecuteTTLType::NORMAL)
ctx->mutating_stream = std::make_shared<TTLBlockInputStream>(ctx->mutating_stream, *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true); builder.addTransform(std::make_shared<TTLTransform>(builder.getHeader(), *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true));
if (ctx->execute_ttl_type == ExecuteTTLType::RECALCULATE) if (ctx->execute_ttl_type == ExecuteTTLType::RECALCULATE)
ctx->mutating_stream = std::make_shared<TTLCalcInputStream>(ctx->mutating_stream, *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true); builder.addTransform(std::make_shared<TTLCalcTransform>(builder.getHeader(), *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true));
ctx->minmax_idx = std::make_shared<IMergeTreeDataPart::MinMaxIndex>(); ctx->minmax_idx = std::make_shared<IMergeTreeDataPart::MinMaxIndex>();
@ -959,8 +974,8 @@ private:
skip_part_indices, skip_part_indices,
ctx->compression_codec); ctx->compression_codec);
ctx->mutating_stream->readPrefix(); ctx->mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
ctx->out->writePrefix(); ctx->mutating_executor = std::make_unique<PullingPipelineExecutor>(ctx->mutating_pipeline);
part_merger_writer_task = std::make_unique<PartMergerWriter>(ctx); part_merger_writer_task = std::make_unique<PartMergerWriter>(ctx);
} }
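Putting the pieces of prepare() together: the builder takes over the interpreter's pipeline, optional primary-key/skip-index and TTL transforms are appended, and a PullingPipelineExecutor replaces the old readPrefix()/writePrefix() calls. A consolidated sketch of the new code in this hunk (ctx is the MutationContext shown above):

    QueryPipelineBuilder builder;
    builder.init(std::move(ctx->mutating_pipeline));

    if (ctx->metadata_snapshot->hasPrimaryKey() || ctx->metadata_snapshot->hasSecondaryIndices())
    {
        builder.addTransform(std::make_shared<ExpressionTransform>(
            builder.getHeader(), ctx->data->getPrimaryKeyAndSkipIndicesExpression(ctx->metadata_snapshot)));
        builder.addTransform(std::make_shared<MaterializingTransform>(builder.getHeader()));
    }

    if (ctx->execute_ttl_type == ExecuteTTLType::NORMAL)
        builder.addTransform(std::make_shared<TTLTransform>(
            builder.getHeader(), *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true));

    if (ctx->execute_ttl_type == ExecuteTTLType::RECALCULATE)
        builder.addTransform(std::make_shared<TTLCalcTransform>(
            builder.getHeader(), *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true));

    ctx->mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
    ctx->mutating_executor = std::make_unique<PullingPipelineExecutor>(ctx->mutating_pipeline);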
@ -969,7 +984,8 @@ private:
void finalize() void finalize()
{ {
ctx->new_data_part->minmax_idx = std::move(ctx->minmax_idx); ctx->new_data_part->minmax_idx = std::move(ctx->minmax_idx);
ctx->mutating_stream->readSuffix(); ctx->mutating_executor.reset();
ctx->mutating_pipeline.reset();
static_pointer_cast<MergedBlockOutputStream>(ctx->out)->writeSuffixAndFinalizePart(ctx->new_data_part, ctx->need_sync); static_pointer_cast<MergedBlockOutputStream>(ctx->out)->writeSuffixAndFinalizePart(ctx->new_data_part, ctx->need_sync);
} }
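Teardown order matters here: the PullingPipelineExecutor holds a reference to the pipeline it pulls from, so the executor is reset first, the pipeline second, and only then is the part finalized. In sketch form:

    ctx->mutating_executor.reset();   // stop pulling before the pipeline goes away
    ctx->mutating_pipeline.reset();
    static_pointer_cast<MergedBlockOutputStream>(ctx->out)->writeSuffixAndFinalizePart(ctx->new_data_part, ctx->need_sync);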
@ -1088,16 +1104,16 @@ private:
ctx->compression_codec = ctx->source_part->default_codec; ctx->compression_codec = ctx->source_part->default_codec;
if (ctx->mutating_stream) if (ctx->mutating_pipeline.initialized())
{ {
if (ctx->mutating_stream == nullptr) QueryPipelineBuilder builder;
throw Exception("Cannot mutate part columns with uninitialized mutations stream. It's a bug", ErrorCodes::LOGICAL_ERROR); builder.init(std::move(ctx->mutating_pipeline));
if (ctx->execute_ttl_type == ExecuteTTLType::NORMAL) if (ctx->execute_ttl_type == ExecuteTTLType::NORMAL)
ctx->mutating_stream = std::make_shared<TTLBlockInputStream>(ctx->mutating_stream, *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true); builder.addTransform(std::make_shared<TTLTransform>(builder.getHeader(), *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true));
if (ctx->execute_ttl_type == ExecuteTTLType::RECALCULATE) if (ctx->execute_ttl_type == ExecuteTTLType::RECALCULATE)
ctx->mutating_stream = std::make_shared<TTLCalcInputStream>(ctx->mutating_stream, *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true); builder.addTransform(std::make_shared<TTLCalcTransform>(builder.getHeader(), *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true));
ctx->out = std::make_shared<MergedColumnOnlyOutputStream>( ctx->out = std::make_shared<MergedColumnOnlyOutputStream>(
ctx->new_data_part, ctx->new_data_part,
@ -1110,8 +1126,9 @@ private:
&ctx->source_part->index_granularity_info &ctx->source_part->index_granularity_info
); );
ctx->mutating_stream->readPrefix(); ctx->mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
ctx->out->writePrefix(); ctx->mutating_executor = std::make_unique<PullingPipelineExecutor>(ctx->mutating_pipeline);
ctx->projections_to_build = std::vector<ProjectionDescriptionRawPtr>{ctx->projections_to_recalc.begin(), ctx->projections_to_recalc.end()}; ctx->projections_to_build = std::vector<ProjectionDescriptionRawPtr>{ctx->projections_to_recalc.begin(), ctx->projections_to_recalc.end()};
part_merger_writer_task = std::make_unique<PartMergerWriter>(ctx); part_merger_writer_task = std::make_unique<PartMergerWriter>(ctx);
@ -1121,9 +1138,10 @@ private:
void finalize() void finalize()
{ {
if (ctx->mutating_stream) if (ctx->mutating_executor)
{ {
ctx->mutating_stream->readSuffix(); ctx->mutating_executor.reset();
ctx->mutating_pipeline.reset();
auto changed_checksums = auto changed_checksums =
static_pointer_cast<MergedColumnOnlyOutputStream>(ctx->out)->writeSuffixAndGetChecksums( static_pointer_cast<MergedColumnOnlyOutputStream>(ctx->out)->writeSuffixAndGetChecksums(
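The column-only rewrite finalizes differently: writeSuffixAndGetChecksums() returns a checksums delta covering just the rewritten columns, which is then merged into the part's existing checksums after the visible hunk. A sketch of this finalize(), using the signature declared in MergedColumnOnlyOutputStream.h above:

    if (ctx->mutating_executor)
    {
        ctx->mutating_executor.reset();   // the executor references the pipeline, so drop it first
        ctx->mutating_pipeline.reset();

        auto changed_checksums =
            static_pointer_cast<MergedColumnOnlyOutputStream>(ctx->out)->writeSuffixAndGetChecksums(
                ctx->new_data_part, ctx->new_data_part->checksums, ctx->need_sync);
        // changed_checksums is merged into ctx->new_data_part->checksums (continuation not shown in this hunk).
    }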
@ -1269,9 +1287,9 @@ bool MutateTask::prepare()
ctx->materialized_indices = ctx->interpreter->grabMaterializedIndices(); ctx->materialized_indices = ctx->interpreter->grabMaterializedIndices();
ctx->materialized_projections = ctx->interpreter->grabMaterializedProjections(); ctx->materialized_projections = ctx->interpreter->grabMaterializedProjections();
ctx->mutation_kind = ctx->interpreter->getMutationKind(); ctx->mutation_kind = ctx->interpreter->getMutationKind();
ctx->mutating_stream = ctx->interpreter->execute(); ctx->mutating_pipeline = ctx->interpreter->execute();
ctx->updated_header = ctx->interpreter->getUpdatedHeader(); ctx->updated_header = ctx->interpreter->getUpdatedHeader();
ctx->mutating_stream->setProgressCallback(MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress)); ctx->mutating_pipeline.setProgressCallback(MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress));
} }
ctx->single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + ctx->future_part->name, ctx->space_reservation->getDisk(), 0); ctx->single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + ctx->future_part->name, ctx->space_reservation->getDisk(), 0);
@ -1301,7 +1319,7 @@ bool MutateTask::prepare()
ctx->need_sync = needSyncPart(ctx->source_part->rows_count, ctx->source_part->getBytesOnDisk(), *data_settings); ctx->need_sync = needSyncPart(ctx->source_part->rows_count, ctx->source_part->getBytesOnDisk(), *data_settings);
ctx->execute_ttl_type = ExecuteTTLType::NONE; ctx->execute_ttl_type = ExecuteTTLType::NONE;
if (ctx->mutating_stream) if (ctx->mutating_pipeline.initialized())
ctx->execute_ttl_type = MergeTreeDataMergerMutator::shouldExecuteTTL(ctx->metadata_snapshot, ctx->interpreter->getColumnDependencies()); ctx->execute_ttl_type = MergeTreeDataMergerMutator::shouldExecuteTTL(ctx->metadata_snapshot, ctx->interpreter->getColumnDependencies());
@ -1320,7 +1338,7 @@ bool MutateTask::prepare()
ctx->updated_columns.emplace(name_type.name); ctx->updated_columns.emplace(name_type.name);
ctx->indices_to_recalc = MutationHelpers::getIndicesToRecalculate( ctx->indices_to_recalc = MutationHelpers::getIndicesToRecalculate(
ctx->mutating_stream, ctx->updated_columns, ctx->metadata_snapshot, ctx->context, ctx->materialized_indices, ctx->source_part); ctx->mutating_pipeline, ctx->updated_columns, ctx->metadata_snapshot, ctx->context, ctx->materialized_indices, ctx->source_part);
ctx->projections_to_recalc = MutationHelpers::getProjectionsToRecalculate( ctx->projections_to_recalc = MutationHelpers::getProjectionsToRecalculate(
ctx->updated_columns, ctx->metadata_snapshot, ctx->materialized_projections, ctx->source_part); ctx->updated_columns, ctx->metadata_snapshot, ctx->materialized_projections, ctx->source_part);
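At the task level, MutationsInterpreter::execute() now hands back a QueryPipeline directly, the progress callback moves onto the pipeline, and initialized() replaces the old null-pointer checks. A trimmed sketch of this part of prepare():

    ctx->mutating_pipeline = ctx->interpreter->execute();        // previously a BlockInputStreamPtr
    ctx->updated_header = ctx->interpreter->getUpdatedHeader();
    ctx->mutating_pipeline.setProgressCallback(
        MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress));

    ctx->execute_ttl_type = ExecuteTTLType::NONE;
    if (ctx->mutating_pipeline.initialized())                     // was: if (ctx->mutating_stream)
        ctx->execute_ttl_type = MergeTreeDataMergerMutator::shouldExecuteTTL(
            ctx->metadata_snapshot, ctx->interpreter->getColumnDependencies());

    ctx->indices_to_recalc = MutationHelpers::getIndicesToRecalculate(
        ctx->mutating_pipeline, ctx->updated_columns, ctx->metadata_snapshot,
        ctx->context, ctx->materialized_indices, ctx->source_part);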

View File

@ -18,6 +18,7 @@
#include <Compression/CompressedWriteBuffer.h> #include <Compression/CompressedWriteBuffer.h>
#include <Processors/Sources/SourceWithProgress.h> #include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Poco/String.h> /// toLower #include <Poco/String.h> /// toLower
@ -114,17 +115,16 @@ void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context)
{ {
auto storage_ptr = DatabaseCatalog::instance().getTable(getStorageID(), context); auto storage_ptr = DatabaseCatalog::instance().getTable(getStorageID(), context);
auto interpreter = std::make_unique<MutationsInterpreter>(storage_ptr, metadata_snapshot, commands, context, true); auto interpreter = std::make_unique<MutationsInterpreter>(storage_ptr, metadata_snapshot, commands, context, true);
auto in = interpreter->execute(); auto pipeline = interpreter->execute();
in->readPrefix(); PullingPipelineExecutor executor(pipeline);
while (const Block & block = in->read()) Block block;
while (executor.pull(block))
{ {
new_data->addJoinedBlock(block, true); new_data->addJoinedBlock(block, true);
if (persistent) if (persistent)
backup_stream.write(block); backup_stream.write(block);
} }
in->readSuffix();
} }
/// Now acquire exclusive lock and modify storage. /// Now acquire exclusive lock and modify storage.
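The readPrefix()/read()/readSuffix() triple collapses into a PullingPipelineExecutor loop, with prefix and suffix bookkeeping handled by the executor itself. A sketch of the pattern used in this mutate(), assuming interpreter, new_data, persistent and backup_stream from the surrounding function:

    auto pipeline = interpreter->execute();      // MutationsInterpreter now returns a QueryPipeline
    PullingPipelineExecutor executor(pipeline);  // keeps a reference to `pipeline`

    Block block;
    while (executor.pull(block))                 // returns false when the pipeline is exhausted
    {
        new_data->addJoinedBlock(block, true);
        if (persistent)
            backup_stream.write(block);
    }
    // No explicit readSuffix(): completion and cleanup are handled by the executor.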

View File

@ -10,6 +10,7 @@
#include <Processors/Sources/SourceWithProgress.h> #include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>
#include <Processors/Sinks/SinkToStorage.h> #include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
namespace DB namespace DB
@ -263,11 +264,12 @@ void StorageMemory::mutate(const MutationCommands & commands, ContextPtr context
new_context->setSetting("max_threads", 1); new_context->setSetting("max_threads", 1);
auto interpreter = std::make_unique<MutationsInterpreter>(storage_ptr, metadata_snapshot, commands, new_context, true); auto interpreter = std::make_unique<MutationsInterpreter>(storage_ptr, metadata_snapshot, commands, new_context, true);
auto in = interpreter->execute(); auto pipeline = interpreter->execute();
PullingPipelineExecutor executor(pipeline);
in->readPrefix();
Blocks out; Blocks out;
while (Block block = in->read()) Block block;
while (executor.pull(block))
{ {
if (compress) if (compress)
for (auto & elem : block) for (auto & elem : block)
@ -275,7 +277,6 @@ void StorageMemory::mutate(const MutationCommands & commands, ContextPtr context
out.push_back(block); out.push_back(block);
} }
in->readSuffix();
std::unique_ptr<Blocks> new_data; std::unique_ptr<Blocks> new_data;
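StorageMemory::mutate() uses the same pull loop to materialize the mutated blocks before swapping them in. As a purely hypothetical refactoring (not part of this commit), draining a pipeline into a Blocks vector could be factored into a helper like this:

    /// Hypothetical helper, not in the ClickHouse code base: pull every block out of a pipeline.
    static Blocks pullAllBlocks(QueryPipeline & pipeline)
    {
        PullingPipelineExecutor executor(pipeline);
        Blocks out;
        Block block;
        while (executor.pull(block))
            out.push_back(block);
        return out;
    }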

View File

@ -7139,7 +7139,6 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, index_factory.getMany(metadata_snapshot->getSecondaryIndices()), compression_codec); MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, index_factory.getMany(metadata_snapshot->getSecondaryIndices()), compression_codec);
bool sync_on_insert = settings->fsync_after_insert; bool sync_on_insert = settings->fsync_after_insert;
out.writePrefix();
out.write(block); out.write(block);
/// TODO(ab): What projections should we add to the empty part? How can we make sure that it /// TODO(ab): What projections should we add to the empty part? How can we make sure that it
/// won't block future merges? Perhaps we should also check part emptiness when selecting parts /// won't block future merges? Perhaps we should also check part emptiness when selecting parts