Nikolai Kochetov 2021-10-06 20:59:27 +03:00
parent 6b619512aa
commit d0c6f11fcb
13 changed files with 197 additions and 158 deletions

View File

@@ -8,40 +8,28 @@ namespace ErrorCodes
extern const int SET_SIZE_LIMIT_EXCEEDED;
}
DistinctSortedBlockInputStream::DistinctSortedBlockInputStream(
const BlockInputStreamPtr & input, SortDescription sort_description, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns)
: description(std::move(sort_description))
DistinctSortedTransform::DistinctSortedTransform(
const Block & header, SortDescription sort_description, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns)
: ISimpleTransform(header, header, true)
, description(std::move(sort_description))
, columns_names(columns)
, limit_hint(limit_hint_)
, set_size_limits(set_size_limits_)
{
children.push_back(input);
}
Block DistinctSortedBlockInputStream::readImpl()
void DistinctSortedTransform::transform(Chunk & chunk)
{
/// Execute until the end of the stream or until
/// a block with some new records is obtained.
for (;;)
{
/// Stop reading if we already reached the limit.
if (limit_hint && data.getTotalRowCount() >= limit_hint)
return Block();
Block block = children.back()->read();
if (!block)
return Block();
const ColumnRawPtrs column_ptrs(getKeyColumns(block));
const ColumnRawPtrs column_ptrs(getKeyColumns(chunk));
if (column_ptrs.empty())
return block;
return;
const ColumnRawPtrs clearing_hint_columns(getClearingColumns(block, column_ptrs));
const ColumnRawPtrs clearing_hint_columns(getClearingColumns(chunk, column_ptrs));
if (data.type == ClearableSetVariants::Type::EMPTY)
data.init(ClearableSetVariants::chooseMethod(column_ptrs, key_sizes));
const size_t rows = block.rows();
const size_t rows = chunk.getNumRows();
IColumn::Filter filter(rows);
bool has_new_data = false;
@@ -59,25 +47,33 @@ Block DistinctSortedBlockInputStream::readImpl()
/// Just go to the next block if there isn't any new record in the current one.
if (!has_new_data)
continue;
return;
if (!set_size_limits.check(data.getTotalRowCount(), data.getTotalByteCount(), "DISTINCT", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED))
return {};
prev_block.block = block;
prev_block.clearing_hint_columns = std::move(clearing_hint_columns);
size_t all_columns = block.columns();
for (size_t i = 0; i < all_columns; ++i)
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(filter, -1);
return block;
{
stopReading();
chunk.clear();
return;
}
/// Stop reading if we already reached the limit.
if (limit_hint && data.getTotalRowCount() >= limit_hint)
stopReading();
prev_chunk.chunk = std::move(chunk);
prev_chunk.clearing_hint_columns = std::move(clearing_hint_columns);
size_t all_columns = prev_chunk.chunk.getNumColumns();
Chunk res_chunk;
for (size_t i = 0; i < all_columns; ++i)
res_chunk.addColumn(prev_chunk.chunk.getColumns().at(i)->filter(filter, -1));
chunk = std::move(res_chunk);
}
template <typename Method>
bool DistinctSortedBlockInputStream::buildFilter(
bool DistinctSortedTransform::buildFilter(
Method & method,
const ColumnRawPtrs & columns,
const ColumnRawPtrs & clearing_hint_columns,
@@ -90,8 +86,8 @@ bool DistinctSortedBlockInputStream::buildFilter(
/// Compare the last row of the previous block and the first row of the current block.
/// If the rows are not equal, we can clear the HashSet;
/// if clearing_hint_columns is empty, we CAN'T clear the HashSet.
if (!clearing_hint_columns.empty() && !prev_block.clearing_hint_columns.empty()
&& !rowsEqual(clearing_hint_columns, 0, prev_block.clearing_hint_columns, prev_block.block.rows() - 1))
if (!clearing_hint_columns.empty() && !prev_chunk.clearing_hint_columns.empty()
&& !rowsEqual(clearing_hint_columns, 0, prev_chunk.clearing_hint_columns, prev_chunk.chunk.getNumRows() - 1))
{
method.data.clear();
}
@@ -117,18 +113,20 @@ bool DistinctSortedBlockInputStream::buildFilter(
return has_new_data;
}
ColumnRawPtrs DistinctSortedBlockInputStream::getKeyColumns(const Block & block) const
ColumnRawPtrs DistinctSortedTransform::getKeyColumns(const Chunk & chunk) const
{
size_t columns = columns_names.empty() ? block.columns() : columns_names.size();
size_t columns = columns_names.empty() ? chunk.getNumColumns() : columns_names.size();
ColumnRawPtrs column_ptrs;
column_ptrs.reserve(columns);
for (size_t i = 0; i < columns; ++i)
{
const auto & column = columns_names.empty()
? block.safeGetByPosition(i).column
: block.getByName(columns_names[i]).column;
auto pos = i;
if (!columns_names.empty())
pos = input.getHeader().getPositionByName(columns_names[i]);
const auto & column = chunk.getColumns()[pos];
/// Ignore all constant columns.
if (!isColumnConst(*column))
@@ -138,13 +136,13 @@ ColumnRawPtrs DistinctSortedBlockInputStream::getKeyColumns(const Block & block)
return column_ptrs;
}
ColumnRawPtrs DistinctSortedBlockInputStream::getClearingColumns(const Block & block, const ColumnRawPtrs & key_columns) const
ColumnRawPtrs DistinctSortedTransform::getClearingColumns(const Chunk & chunk, const ColumnRawPtrs & key_columns) const
{
ColumnRawPtrs clearing_hint_columns;
clearing_hint_columns.reserve(description.size());
for (const auto & sort_column_description : description)
{
const auto * sort_column_ptr = block.safeGetByPosition(sort_column_description.column_number).column.get();
const auto * sort_column_ptr = chunk.getColumns().at(sort_column_description.column_number).get();
const auto it = std::find(key_columns.cbegin(), key_columns.cend(), sort_column_ptr);
if (it != key_columns.cend()) /// if found in key_columns
clearing_hint_columns.emplace_back(sort_column_ptr);
@@ -154,7 +152,7 @@ ColumnRawPtrs DistinctSortedBlockInputStream::getClearingColumns(const Block & b
return clearing_hint_columns;
}
bool DistinctSortedBlockInputStream::rowsEqual(const ColumnRawPtrs & lhs, size_t n, const ColumnRawPtrs & rhs, size_t m)
bool DistinctSortedTransform::rowsEqual(const ColumnRawPtrs & lhs, size_t n, const ColumnRawPtrs & rhs, size_t m)
{
for (size_t column_index = 0, num_columns = lhs.size(); column_index < num_columns; ++column_index)
{

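For reference, the core of this conversion is the move from a pull-based IBlockInputStream::readImpl() loop to a push-based ISimpleTransform: the transform edits the Chunk it is given in place, calls stopReading() where the old code returned an empty Block, and clears the chunk when it has nothing to emit. A minimal sketch of that contract, using only the calls visible in this diff (the class and member names below are illustrative, not part of the commit):

#include <Processors/ISimpleTransform.h>

namespace DB
{

/// Illustrative transform: passes chunks through unchanged and stops pulling
/// more input once limit_hint rows have been emitted.
class LimitHintPassThroughTransform : public ISimpleTransform
{
public:
    LimitHintPassThroughTransform(const Block & header, UInt64 limit_hint_)
        : ISimpleTransform(header, header, true) /// same input/output header, skip empty chunks
        , limit_hint(limit_hint_)
    {
    }

    String getName() const override { return "LimitHintPassThroughTransform"; }

protected:
    void transform(Chunk & chunk) override
    {
        rows_emitted += chunk.getNumRows();

        /// Where readImpl() signalled the end by returning an empty Block,
        /// a transform asks the pipeline to stop feeding it input.
        if (limit_hint && rows_emitted >= limit_hint)
            stopReading();

        /// To emit nothing for this particular input, it would call chunk.clear().
    }

private:
    UInt64 limit_hint = 0;
    UInt64 rows_emitted = 0;
};

}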
View File

@@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Processors/ISimpleTransform.h>
#include <Interpreters/SetVariants.h>
#include <Core/SortDescription.h>
@@ -18,24 +18,22 @@ namespace DB
* set limit_hint to a non-zero value, so that we stop emitting new rows once
* the number of already emitted rows reaches limit_hint.
*/
class DistinctSortedBlockInputStream : public IBlockInputStream
class DistinctSortedTransform : public ISimpleTransform
{
public:
/// Empty columns_ means all columns.
DistinctSortedBlockInputStream(const BlockInputStreamPtr & input, SortDescription sort_description, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns);
DistinctSortedTransform(const Block & header, SortDescription sort_description, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns);
String getName() const override { return "DistinctSorted"; }
Block getHeader() const override { return children.at(0)->getHeader(); }
String getName() const override { return "DistinctSortedTransform"; }
protected:
Block readImpl() override;
void transform(Chunk & chunk) override;
private:
ColumnRawPtrs getKeyColumns(const Block & block) const;
ColumnRawPtrs getKeyColumns(const Chunk & chunk) const;
/// When clearing_columns change, we can clear the HashSet as a memory optimization.
/// clearing_columns is the left prefix of SortDescription that exists in key_columns.
ColumnRawPtrs getClearingColumns(const Block & block, const ColumnRawPtrs & key_columns) const;
ColumnRawPtrs getClearingColumns(const Chunk & chunk, const ColumnRawPtrs & key_columns) const;
static bool rowsEqual(const ColumnRawPtrs & lhs, size_t n, const ColumnRawPtrs & rhs, size_t m);
/// Returns true if there is new data.
@@ -50,12 +48,12 @@ private:
SortDescription description;
struct PreviousBlock
struct PreviousChunk
{
Block block;
Chunk chunk;
ColumnRawPtrs clearing_hint_columns;
};
PreviousBlock prev_block;
PreviousChunk prev_chunk;
Names columns_names;
ClearableSetVariants data;

View File

@@ -16,18 +16,17 @@
namespace DB
{
TTLBlockInputStream::TTLBlockInputStream(
const BlockInputStreamPtr & input_,
TTLTransform::TTLTransform(
const Block & header_,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const MergeTreeData::MutableDataPartPtr & data_part_,
time_t current_time_,
bool force_)
: data_part(data_part_)
, log(&Poco::Logger::get(storage_.getLogName() + " (TTLBlockInputStream)"))
: ISimpleTransform(header_, header_, false)
, data_part(data_part_)
, log(&Poco::Logger::get(storage_.getLogName() + " (TTLTransform)"))
{
children.push_back(input_);
header = children.at(0)->getHeader();
auto old_ttl_infos = data_part->ttl_infos;
if (metadata_snapshot_->hasRowsTTL())
@@ -50,7 +49,7 @@ TTLBlockInputStream::TTLBlockInputStream(
for (const auto & group_by_ttl : metadata_snapshot_->getGroupByTTLs())
algorithms.emplace_back(std::make_unique<TTLAggregationAlgorithm>(
group_by_ttl, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_, header, storage_));
group_by_ttl, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_, getInputPort().getHeader(), storage_));
if (metadata_snapshot_->hasAnyColumnTTL())
{
@@ -98,22 +97,28 @@ Block reorderColumns(Block block, const Block & header)
return res;
}
Block TTLBlockInputStream::readImpl()
void TTLTransform::transform(Chunk & chunk)
{
if (all_data_dropped)
return {};
{
stopReading();
chunk.clear();
return;
}
auto block = children.at(0)->read();
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
for (const auto & algorithm : algorithms)
algorithm->execute(block);
if (!block)
return block;
return;
return reorderColumns(std::move(block), header);
size_t num_rows = block.rows();
chunk = Chunk(reorderColumns(std::move(block), getOutputPort().getHeader()).getColumns(), num_rows);
}
void TTLBlockInputStream::readSuffixImpl()
void TTLTransform::finalize()
{
data_part->ttl_infos = {};
for (const auto & algorithm : algorithms)
@@ -126,4 +131,13 @@ void TTLBlockInputStream::readSuffixImpl()
}
}
IProcessor::Status TTLTransform::prepare()
{
auto status = ISimpleTransform::prepare();
if (status == Status::Finished)
finalize();
return status;
}
}

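Two idioms recur in the TTL changes above: the incoming Chunk is converted to a Block so the existing block-based TTL algorithms can run unchanged, and the cleanup that used to live in readSuffixImpl() is now triggered from an overridden prepare() once the processor reports Status::Finished. A rough sketch of the Block/Chunk round trip, using only the calls shown in the hunk above:

/// Sketch of the conversion idiom inside transform(Chunk & chunk).
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());

for (const auto & algorithm : algorithms)
    algorithm->execute(block);                 /// existing block-based logic, unchanged

if (!block)
    return;                                    /// nothing to emit for this input

size_t num_rows = block.rows();
chunk = Chunk(block.getColumns(), num_rows);   /// hand the columns back to the pipeline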
View File

@@ -1,5 +1,5 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Processors/ISimpleTransform.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Core/Block.h>
@@ -12,11 +12,11 @@
namespace DB
{
class TTLBlockInputStream : public IBlockInputStream
class TTLTransform : public ISimpleTransform
{
public:
TTLBlockInputStream(
const BlockInputStreamPtr & input_,
TTLTransform(
const Block & header_,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const MergeTreeData::MutableDataPartPtr & data_part_,
@@ -25,13 +25,14 @@ public:
);
String getName() const override { return "TTL"; }
Block getHeader() const override { return header; }
Status prepare() override;
protected:
Block readImpl() override;
void transform(Chunk & chunk) override;
/// Finalizes ttl infos and updates data part
void readSuffixImpl() override;
void finalize();
private:
std::vector<TTLAlgorithmPtr> algorithms;
@@ -41,7 +42,6 @@ private:
/// ttl_infos and empty_columns are updated while reading
const MergeTreeData::MutableDataPartPtr & data_part;
Poco::Logger * log;
Block header;
};
}

View File

@@ -4,18 +4,17 @@
namespace DB
{
TTLCalcInputStream::TTLCalcInputStream(
const BlockInputStreamPtr & input_,
TTLCalcTransform::TTLCalcTransform(
const Block & header_,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const MergeTreeData::MutableDataPartPtr & data_part_,
time_t current_time_,
bool force_)
: data_part(data_part_)
: ISimpleTransform(header_, header_, true)
, data_part(data_part_)
, log(&Poco::Logger::get(storage_.getLogName() + " (TTLCalcInputStream)"))
{
children.push_back(input_);
header = children.at(0)->getHeader();
auto old_ttl_infos = data_part->ttl_infos;
if (metadata_snapshot_->hasRowsTTL())
@@ -51,27 +50,36 @@ TTLCalcInputStream::TTLCalcInputStream(
recompression_ttl, TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));
}
Block TTLCalcInputStream::readImpl()
void TTLCalcTransform::transform(Chunk & chunk)
{
auto block = children.at(0)->read();
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
for (const auto & algorithm : algorithms)
algorithm->execute(block);
if (!block)
return block;
return;
Block res;
for (const auto & col : header)
res.insert(block.getByName(col.name));
Chunk res;
for (const auto & col : getOutputPort().getHeader())
res.addColumn(block.getByName(col.name).column);
return res;
chunk = std::move(res);
}
void TTLCalcInputStream::readSuffixImpl()
void TTLCalcTransform::finalize()
{
data_part->ttl_infos = {};
for (const auto & algorithm : algorithms)
algorithm->finalize(data_part);
}
IProcessor::Status TTLCalcTransform::prepare()
{
auto status = ISimpleTransform::prepare();
if (status == Status::Finished)
finalize();
return status;
}
}

View File

@@ -1,5 +1,5 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Processors/ISimpleTransform.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Core/Block.h>
@@ -11,11 +11,11 @@
namespace DB
{
class TTLCalcInputStream : public IBlockInputStream
class TTLCalcTransform : public ISimpleTransform
{
public:
TTLCalcInputStream(
const BlockInputStreamPtr & input_,
TTLCalcTransform(
const Block & header_,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const MergeTreeData::MutableDataPartPtr & data_part_,
@@ -24,13 +24,13 @@ public:
);
String getName() const override { return "TTL_CALC"; }
Block getHeader() const override { return header; }
Status prepare() override;
protected:
Block readImpl() override;
void transform(Chunk & chunk) override;
/// Finalizes ttl infos and updates data part
void readSuffixImpl() override;
void finalize();
private:
std::vector<TTLAlgorithmPtr> algorithms;
@@ -38,7 +38,6 @@ private:
/// ttl_infos and empty_columns are updated while reading
const MergeTreeData::MutableDataPartPtr & data_part;
Poco::Logger * log;
Block header;
};
}

View File

@@ -932,7 +932,7 @@ void MutationsInterpreter::validate()
auto pipeline = addStreamsForLaterStages(stages, plan);
}
BlockInputStreamPtr MutationsInterpreter::execute()
QueryPipeline MutationsInterpreter::execute()
{
if (!can_execute)
throw Exception("Cannot execute mutations interpreter because can_execute flag set to false", ErrorCodes::LOGICAL_ERROR);
@@ -956,12 +956,11 @@ BlockInputStreamPtr MutationsInterpreter::execute()
}
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
BlockInputStreamPtr result_stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));
if (!updated_header)
updated_header = std::make_unique<Block>(result_stream->getHeader());
updated_header = std::make_unique<Block>(pipeline.getHeader());
return result_stream;
return pipeline;
}
Block MutationsInterpreter::getUpdatedHeader() const

View File

@@ -50,7 +50,7 @@ public:
size_t evaluateCommandsSize();
/// The resulting pipeline will return blocks containing only the changed columns and the columns needed to recalculate indices.
BlockInputStreamPtr execute();
QueryPipeline execute();
/// Only changed columns.
Block getUpdatedHeader() const;

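Callers of MutationsInterpreter::execute() that previously did readPrefix()/read()/readSuffix() on the returned stream now drive the returned QueryPipeline through a PullingPipelineExecutor, as the StorageJoin and StorageMemory hunks below do; the calling pattern is roughly:

#include <Processors/Executors/PullingPipelineExecutor.h>

auto pipeline = interpreter->execute();
PullingPipelineExecutor executor(pipeline);

Block block;
while (executor.pull(block))
{
    /// consume `block`: write it out, add it to a join, collect it, ...
}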
View File

@@ -11,6 +11,7 @@
#include "Storages/MergeTree/MergeTreeSequentialSource.h"
#include "Storages/MergeTree/FutureMergedMutatedPart.h"
#include "Processors/Transforms/ExpressionTransform.h"
#include "Processors/Transforms/MaterializingTransform.h"
#include "Processors/Merges/MergingSortedTransform.h"
#include "Processors/Merges/CollapsingSortedTransform.h"
#include "Processors/Merges/SummingSortedTransform.h"
@@ -236,8 +237,6 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
ctx->compression_codec,
ctx->blocks_are_granules_size);
global_ctx->merged_stream->readPrefix();
global_ctx->rows_written = 0;
ctx->initial_reservation = global_ctx->space_reservation ? global_ctx->space_reservation->getSize() : 0;
@@ -298,14 +297,17 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::execute()
bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
{
Block block;
if (!ctx->is_cancelled() && (block = global_ctx->merged_stream->read()))
if (!ctx->is_cancelled() && (global_ctx->merging_executor->pull(block)))
{
global_ctx->rows_written += block.rows();
const_cast<MergedBlockOutputStream &>(*global_ctx->to).write(block);
global_ctx->merge_list_element_ptr->rows_written = global_ctx->merged_stream->getProfileInfo().rows;
global_ctx->merge_list_element_ptr->bytes_written_uncompressed = global_ctx->merged_stream->getProfileInfo().bytes;
UInt64 result_rows = 0;
UInt64 result_bytes = 0;
global_ctx->merged_pipeline.tryGetResultRowsAndBytes(result_rows, result_bytes);
global_ctx->merge_list_element_ptr->rows_written = result_rows;
global_ctx->merge_list_element_ptr->bytes_written_uncompressed = result_bytes;
/// Reservation updates are not performed yet; during the merge this may lead to higher free space requirements
if (global_ctx->space_reservation && ctx->sum_input_rows_upper_bound)
@@ -323,8 +325,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
return true;
}
global_ctx->merged_stream->readSuffix();
global_ctx->merged_stream.reset();
global_ctx->merging_executor.reset();
global_ctx->merged_pipeline.reset();
if (global_ctx->merges_blocker->isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
@@ -799,26 +801,25 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
auto res_pipe = Pipe::unitePipes(std::move(pipes));
res_pipe.addTransform(std::move(merged_transform));
QueryPipeline pipeline(std::move(res_pipe));
pipeline.setNumThreads(1);
global_ctx->merged_stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));
if (global_ctx->deduplicate)
global_ctx->merged_stream = std::make_shared<DistinctSortedBlockInputStream>(
global_ctx->merged_stream, sort_description, SizeLimits(), 0 /*limit_hint*/, global_ctx->deduplicate_by_columns);
res_pipe.addTransform(std::make_shared<DistinctSortedTransform>(
res_pipe.getHeader(), sort_description, SizeLimits(), 0 /*limit_hint*/, global_ctx->deduplicate_by_columns));
if (ctx->need_remove_expired_values)
global_ctx->merged_stream = std::make_shared<TTLBlockInputStream>(
global_ctx->merged_stream, *global_ctx->data, global_ctx->metadata_snapshot, global_ctx->new_data_part, global_ctx->time_of_merge, ctx->force_ttl);
res_pipe.addTransform(std::make_shared<TTLTransform>(
res_pipe.getHeader(), *global_ctx->data, global_ctx->metadata_snapshot, global_ctx->new_data_part, global_ctx->time_of_merge, ctx->force_ttl));
if (global_ctx->metadata_snapshot->hasSecondaryIndices())
{
const auto & indices = global_ctx->metadata_snapshot->getSecondaryIndices();
global_ctx->merged_stream = std::make_shared<ExpressionBlockInputStream>(
global_ctx->merged_stream, indices.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), global_ctx->data->getContext()));
global_ctx->merged_stream = std::make_shared<MaterializingBlockInputStream>(global_ctx->merged_stream);
res_pipe.addTransform(std::make_shared<ExpressionTransform>(
res_pipe.getHeader(), indices.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), global_ctx->data->getContext())));
res_pipe.addTransform(std::make_shared<MaterializingTransform>(res_pipe.getHeader()));
}
global_ctx->merged_pipeline = QueryPipeline(std::move(res_pipe));
global_ctx->merging_executor = std::make_unique<PullingPipelineExecutor>(global_ctx->merged_pipeline);
}

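Instead of wrapping one BlockInputStream in another, the merge path now chains transforms onto a single Pipe and pulls the merged blocks through a PullingPipelineExecutor. Stripped of the ctx/global_ctx bookkeeping (member names abridged for readability), the shape of createMergedStream() and executeImpl() after this change is roughly:

/// Build the pipeline once (conditions and constructor arguments abridged).
auto res_pipe = Pipe::unitePipes(std::move(pipes));
res_pipe.addTransform(std::move(merged_transform));

if (deduplicate)
    res_pipe.addTransform(std::make_shared<DistinctSortedTransform>(
        res_pipe.getHeader(), sort_description, SizeLimits(), 0 /*limit_hint*/, deduplicate_by_columns));

if (need_remove_expired_values)
    res_pipe.addTransform(std::make_shared<TTLTransform>(
        res_pipe.getHeader(), data, metadata_snapshot, new_data_part, time_of_merge, force_ttl));

merged_pipeline = QueryPipeline(std::move(res_pipe));
merging_executor = std::make_unique<PullingPipelineExecutor>(merged_pipeline);

/// Pull blocks one by one and feed the same writer as before.
Block block;
while (merging_executor->pull(block))
    to->write(block);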
View File

@@ -148,7 +148,8 @@ private:
std::unique_ptr<MergeStageProgress> column_progress{nullptr};
std::shared_ptr<MergedBlockOutputStream> to{nullptr};
BlockInputStreamPtr merged_stream{nullptr};
QueryPipeline merged_pipeline;
std::unique_ptr<PullingPipelineExecutor> merging_executor;
SyncGuardPtr sync_guard{nullptr};
MergeTreeData::MutableDataPartPtr new_data_part{nullptr};

View File

@@ -11,6 +11,9 @@
#include <DataStreams/SquashingBlockInputStream.h>
#include <Parsers/queryToString.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MutationCommands.h>
@@ -182,7 +185,7 @@ static std::vector<ProjectionDescriptionRawPtr> getProjectionsForNewDataPart(
/// Returns the set of indices which should be recalculated during mutation; it also
/// wraps the input pipeline into an additional expression transform
static std::set<MergeTreeIndexPtr> getIndicesToRecalculate(
BlockInputStreamPtr & input_stream,
QueryPipeline & pipeline,
const NameSet & updated_columns,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
@ -234,9 +237,9 @@ static std::set<MergeTreeIndexPtr> getIndicesToRecalculate(
}
}
if (!indices_to_recalc.empty() && input_stream)
if (!indices_to_recalc.empty() && pipeline.initialized())
{
auto indices_recalc_syntax = TreeRewriter(context).analyze(indices_recalc_expr_list, input_stream->getHeader().getNamesAndTypesList());
auto indices_recalc_syntax = TreeRewriter(context).analyze(indices_recalc_expr_list, pipeline.getHeader().getNamesAndTypesList());
auto indices_recalc_expr = ExpressionAnalyzer(
indices_recalc_expr_list,
indices_recalc_syntax, context).getActions(false);
@@ -246,8 +249,11 @@ static std::set<MergeTreeIndexPtr> getIndicesToRecalculate(
/// MutationsInterpreter which knows about skip indices and stream 'in' already has
/// all required columns.
/// TODO move this logic to single place.
input_stream = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(input_stream, indices_recalc_expr));
QueryPipelineBuilder builder;
builder.init(std::move(pipeline));
builder.addTransform(std::make_shared<ExpressionTransform>(builder.getHeader(), indices_recalc_expr));
builder.addTransform(std::make_shared<MaterializingTransform>(builder.getHeader()));
pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
}
return indices_to_recalc;
}
@@ -500,7 +506,8 @@ struct MutationContext
std::unique_ptr<CurrentMetrics::Increment> num_mutations;
BlockInputStreamPtr mutating_stream{nullptr}; // in
QueryPipeline mutating_pipeline; // in
std::unique_ptr<PullingPipelineExecutor> mutating_executor;
Block updated_header;
std::unique_ptr<MutationsInterpreter> interpreter;
@@ -795,24 +802,25 @@ void PartMergerWriter::prepare()
bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
{
if (MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry) && (block = ctx->mutating_stream->read()))
Block cur_block;
if (MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry) && ctx->mutating_executor->pull(cur_block))
{
if (ctx->minmax_idx)
ctx->minmax_idx->update(block, ctx->data->getMinMaxColumnsNames(ctx->metadata_snapshot->getPartitionKey()));
ctx->minmax_idx->update(cur_block, ctx->data->getMinMaxColumnsNames(ctx->metadata_snapshot->getPartitionKey()));
ctx->out->write(block);
ctx->out->write(cur_block);
for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
{
const auto & projection = *ctx->projections_to_build[i];
auto projection_block = projection_squashes[i].add(projection.calculate(block, ctx->context));
auto projection_block = projection_squashes[i].add(projection.calculate(cur_block, ctx->context));
if (projection_block)
projection_parts[projection.name].emplace_back(MergeTreeDataWriter::writeTempProjectionPart(
*ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num));
}
(*ctx->mutate_entry)->rows_written += block.rows();
(*ctx->mutate_entry)->bytes_written_uncompressed += block.bytes();
(*ctx->mutate_entry)->rows_written += cur_block.rows();
(*ctx->mutate_entry)->bytes_written_uncompressed += cur_block.bytes();
/// Need to execute again
return true;
@@ -937,18 +945,25 @@ private:
auto skip_part_indices = MutationHelpers::getIndicesForNewDataPart(ctx->metadata_snapshot->getSecondaryIndices(), ctx->for_file_renames);
ctx->projections_to_build = MutationHelpers::getProjectionsForNewDataPart(ctx->metadata_snapshot->getProjections(), ctx->for_file_renames);
if (ctx->mutating_stream == nullptr)
if (!ctx->mutating_pipeline.initialized())
throw Exception("Cannot mutate part columns with uninitialized mutations stream. It's a bug", ErrorCodes::LOGICAL_ERROR);
QueryPipelineBuilder builder;
builder.init(std::move(ctx->mutating_pipeline));
if (ctx->metadata_snapshot->hasPrimaryKey() || ctx->metadata_snapshot->hasSecondaryIndices())
ctx->mutating_stream = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(ctx->mutating_stream, ctx->data->getPrimaryKeyAndSkipIndicesExpression(ctx->metadata_snapshot)));
{
builder.addTransform(
std::make_shared<ExpressionTransform>(builder.getHeader(), ctx->data->getPrimaryKeyAndSkipIndicesExpression(ctx->metadata_snapshot)));
builder.addTransform(std::make_shared<MaterializingTransform>(builder.getHeader()));
}
if (ctx->execute_ttl_type == ExecuteTTLType::NORMAL)
ctx->mutating_stream = std::make_shared<TTLBlockInputStream>(ctx->mutating_stream, *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true);
builder.addTransform(std::make_shared<TTLTransform>(builder.getHeader(), *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true));
if (ctx->execute_ttl_type == ExecuteTTLType::RECALCULATE)
ctx->mutating_stream = std::make_shared<TTLCalcInputStream>(ctx->mutating_stream, *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true);
builder.addTransform(std::make_shared<TTLCalcTransform>(builder.getHeader(), *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true));
ctx->minmax_idx = std::make_shared<IMergeTreeDataPart::MinMaxIndex>();
@@ -959,7 +974,8 @@ private:
skip_part_indices,
ctx->compression_codec);
ctx->mutating_stream->readPrefix();
ctx->mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
ctx->mutating_executor = std::make_unique<PullingPipelineExecutor>(ctx->mutating_pipeline);
part_merger_writer_task = std::make_unique<PartMergerWriter>(ctx);
}
@@ -968,7 +984,8 @@ private:
void finalize()
{
ctx->new_data_part->minmax_idx = std::move(ctx->minmax_idx);
ctx->mutating_stream->readSuffix();
ctx->mutating_executor.reset();
ctx->mutating_pipeline.reset();
static_pointer_cast<MergedBlockOutputStream>(ctx->out)->writeSuffixAndFinalizePart(ctx->new_data_part, ctx->need_sync);
}
@@ -1087,16 +1104,16 @@ private:
ctx->compression_codec = ctx->source_part->default_codec;
if (ctx->mutating_stream)
if (ctx->mutating_pipeline.initialized())
{
if (ctx->mutating_stream == nullptr)
throw Exception("Cannot mutate part columns with uninitialized mutations stream. It's a bug", ErrorCodes::LOGICAL_ERROR);
QueryPipelineBuilder builder;
builder.init(std::move(ctx->mutating_pipeline));
if (ctx->execute_ttl_type == ExecuteTTLType::NORMAL)
ctx->mutating_stream = std::make_shared<TTLBlockInputStream>(ctx->mutating_stream, *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true);
builder.addTransform(std::make_shared<TTLTransform>(builder.getHeader(), *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true));
if (ctx->execute_ttl_type == ExecuteTTLType::RECALCULATE)
ctx->mutating_stream = std::make_shared<TTLCalcInputStream>(ctx->mutating_stream, *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true);
builder.addTransform(std::make_shared<TTLCalcTransform>(builder.getHeader(), *ctx->data, ctx->metadata_snapshot, ctx->new_data_part, ctx->time_of_mutation, true));
ctx->out = std::make_shared<MergedColumnOnlyOutputStream>(
ctx->new_data_part,
@@ -1109,7 +1126,9 @@ private:
&ctx->source_part->index_granularity_info
);
ctx->mutating_stream->readPrefix();
ctx->mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
ctx->mutating_executor = std::make_unique<PullingPipelineExecutor>(ctx->mutating_pipeline);
ctx->projections_to_build = std::vector<ProjectionDescriptionRawPtr>{ctx->projections_to_recalc.begin(), ctx->projections_to_recalc.end()};
part_merger_writer_task = std::make_unique<PartMergerWriter>(ctx);
@@ -1119,9 +1138,10 @@ private:
void finalize()
{
if (ctx->mutating_stream)
if (ctx->mutating_executor)
{
ctx->mutating_stream->readSuffix();
ctx->mutating_executor.reset();
ctx->mutating_pipeline.reset();
auto changed_checksums =
static_pointer_cast<MergedColumnOnlyOutputStream>(ctx->out)->writeSuffixAndGetChecksums(
@@ -1267,9 +1287,9 @@ bool MutateTask::prepare()
ctx->materialized_indices = ctx->interpreter->grabMaterializedIndices();
ctx->materialized_projections = ctx->interpreter->grabMaterializedProjections();
ctx->mutation_kind = ctx->interpreter->getMutationKind();
ctx->mutating_stream = ctx->interpreter->execute();
ctx->mutating_pipeline = ctx->interpreter->execute();
ctx->updated_header = ctx->interpreter->getUpdatedHeader();
ctx->mutating_stream->setProgressCallback(MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress));
ctx->mutating_pipeline.setProgressCallback(MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress));
}
ctx->single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + ctx->future_part->name, ctx->space_reservation->getDisk(), 0);
@@ -1299,7 +1319,7 @@ bool MutateTask::prepare()
ctx->need_sync = needSyncPart(ctx->source_part->rows_count, ctx->source_part->getBytesOnDisk(), *data_settings);
ctx->execute_ttl_type = ExecuteTTLType::NONE;
if (ctx->mutating_stream)
if (ctx->mutating_pipeline.initialized())
ctx->execute_ttl_type = MergeTreeDataMergerMutator::shouldExecuteTTL(ctx->metadata_snapshot, ctx->interpreter->getColumnDependencies());
@@ -1318,7 +1338,7 @@ bool MutateTask::prepare()
ctx->updated_columns.emplace(name_type.name);
ctx->indices_to_recalc = MutationHelpers::getIndicesToRecalculate(
ctx->mutating_stream, ctx->updated_columns, ctx->metadata_snapshot, ctx->context, ctx->materialized_indices, ctx->source_part);
ctx->mutating_pipeline, ctx->updated_columns, ctx->metadata_snapshot, ctx->context, ctx->materialized_indices, ctx->source_part);
ctx->projections_to_recalc = MutationHelpers::getProjectionsToRecalculate(
ctx->updated_columns, ctx->metadata_snapshot, ctx->materialized_projections, ctx->source_part);

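When a transform has to be appended to a QueryPipeline that already exists (primary key expression, TTL, index recalculation), the mutation code above re-opens the pipeline through a QueryPipelineBuilder, adds the transforms, and closes it again before creating the executor. The recurring pattern is roughly as follows; the `expression` argument stands for whichever ExpressionActions a given call site passes:

QueryPipelineBuilder builder;
builder.init(std::move(mutating_pipeline));

builder.addTransform(std::make_shared<ExpressionTransform>(builder.getHeader(), expression));
builder.addTransform(std::make_shared<MaterializingTransform>(builder.getHeader()));

mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
mutating_executor = std::make_unique<PullingPipelineExecutor>(mutating_pipeline);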
View File

@@ -18,6 +18,7 @@
#include <Compression/CompressedWriteBuffer.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Poco/String.h> /// toLower
@@ -114,17 +115,16 @@ void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context)
{
auto storage_ptr = DatabaseCatalog::instance().getTable(getStorageID(), context);
auto interpreter = std::make_unique<MutationsInterpreter>(storage_ptr, metadata_snapshot, commands, context, true);
auto in = interpreter->execute();
in->readPrefix();
auto pipeline = interpreter->execute();
PullingPipelineExecutor executor(pipeline);
while (const Block & block = in->read())
Block block;
while (executor.pull(block))
{
new_data->addJoinedBlock(block, true);
if (persistent)
backup_stream.write(block);
}
in->readSuffix();
}
/// Now acquire exclusive lock and modify storage.

View File

@@ -10,6 +10,7 @@
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
namespace DB
@@ -263,11 +264,12 @@ void StorageMemory::mutate(const MutationCommands & commands, ContextPtr context
new_context->setSetting("max_threads", 1);
auto interpreter = std::make_unique<MutationsInterpreter>(storage_ptr, metadata_snapshot, commands, new_context, true);
auto in = interpreter->execute();
auto pipeline = interpreter->execute();
PullingPipelineExecutor executor(pipeline);
in->readPrefix();
Blocks out;
while (Block block = in->read())
Block block;
while (executor.pull(block))
{
if (compress)
for (auto & elem : block)
@@ -275,7 +277,6 @@ void StorageMemory::mutate(const MutationCommands & commands, ContextPtr context
out.push_back(block);
}
in->readSuffix();
std::unique_ptr<Blocks> new_data;