Merge pull request #44653 from ClickHouse/custom-reading-for-mutation

Custom reading for mutation
Nikolai Kochetov 2023-01-10 12:16:24 +01:00 committed by GitHub
commit 11418963c0
11 changed files with 539 additions and 142 deletions

View File

@ -377,9 +377,6 @@ private:
inline static ContextPtr global_context_instance;
/// A flag, used to mark if reader needs to apply deleted rows mask.
bool apply_deleted_mask = true;
/// Temporary data for query execution accounting.
TemporaryDataOnDiskScopePtr temp_data_on_disk;
public:
@ -973,9 +970,6 @@ public:
bool isInternalQuery() const { return is_internal_query; }
void setInternalQuery(bool internal) { is_internal_query = internal; }
bool applyDeletedMask() const { return apply_deleted_mask; }
void setApplyDeletedMask(bool apply) { apply_deleted_mask = apply; }
ActionLocksManagerPtr getActionLocksManager() const;
enum class ApplicationType

View File

@ -30,6 +30,9 @@
#include <DataTypes/NestedUtils.h>
#include <Interpreters/PreparedSets.h>
#include <Storages/LightweightDeleteDescription.h>
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <Processors/Sources/ThrowingExceptionSource.h>
namespace DB
@ -190,7 +193,8 @@ ColumnDependencies getAllColumnDependencies(const StorageMetadataPtr & metadata_
bool isStorageTouchedByMutations(
const StoragePtr & storage,
MergeTreeData & storage,
MergeTreeData::DataPartPtr source_part,
const StorageMetadataPtr & metadata_snapshot,
const std::vector<MutationCommand> & commands,
ContextMutablePtr context_copy)
@ -199,19 +203,15 @@ bool isStorageTouchedByMutations(
return false;
bool all_commands_can_be_skipped = true;
auto storage_from_merge_tree_data_part = std::dynamic_pointer_cast<StorageFromMergeTreeDataPart>(storage);
for (const MutationCommand & command : commands)
{
if (!command.predicate) /// The command touches all rows.
return true;
if (command.partition && !storage_from_merge_tree_data_part)
throw Exception("ALTER UPDATE/DELETE ... IN PARTITION is not supported for non-MergeTree tables", ErrorCodes::NOT_IMPLEMENTED);
if (command.partition && storage_from_merge_tree_data_part)
if (command.partition)
{
const String partition_id = storage_from_merge_tree_data_part->getPartitionIDFromQuery(command.partition, context_copy);
if (partition_id == storage_from_merge_tree_data_part->getPartitionId())
const String partition_id = storage.getPartitionIDFromQuery(command.partition, context_copy);
if (partition_id == source_part->info.partition_id)
all_commands_can_be_skipped = false;
}
else
@ -229,13 +229,15 @@ bool isStorageTouchedByMutations(
context_copy->setSetting("allow_asynchronous_read_from_io_pool_for_merge_tree", false);
context_copy->setSetting("max_streams_for_merge_tree_reading", Field(0));
ASTPtr select_query = prepareQueryAffectedAST(commands, storage, context_copy);
ASTPtr select_query = prepareQueryAffectedAST(commands, storage.shared_from_this(), context_copy);
auto storage_from_part = std::make_shared<StorageFromMergeTreeDataPart>(source_part);
/// The interpreter must stay alive while we use the result of its execute() method.
/// For some reason it may copy the context and pass it into ExpressionTransform;
/// after that we would be using a context from a destroyed stack frame in our stream.
InterpreterSelectQuery interpreter(
select_query, context_copy, storage, metadata_snapshot, SelectQueryOptions().ignoreLimits().ignoreProjections());
select_query, context_copy, storage_from_part, metadata_snapshot, SelectQueryOptions().ignoreLimits().ignoreProjections());
auto io = interpreter.execute();
PullingPipelineExecutor executor(io.pipeline);
@ -288,6 +290,57 @@ ASTPtr getPartitionAndPredicateExpressionForMutationCommand(
return command.predicate ? command.predicate->clone() : partition_predicate_as_ast_func;
}
MutationsInterpreter::Source::Source(StoragePtr storage_) : storage(std::move(storage_))
{
}
MutationsInterpreter::Source::Source(MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_)
: data(&storage_), part(std::move(source_part_))
{
}
StorageSnapshotPtr MutationsInterpreter::Source::getStorageSnapshot(const StorageMetadataPtr & snapshot_, const ContextPtr & context_) const
{
if (data)
return data->getStorageSnapshot(snapshot_, context_);
return storage->getStorageSnapshot(snapshot_, context_);
}
StoragePtr MutationsInterpreter::Source::getStorage() const
{
if (data)
return data->shared_from_this();
return storage;
}
const MergeTreeData * MutationsInterpreter::Source::getMergeTreeData() const
{
if (data)
return data;
return dynamic_cast<const MergeTreeData *>(storage.get());
}
bool MutationsInterpreter::Source::supportsLightweightDelete() const
{
if (part)
return part->supportLightweightDeleteMutate();
return storage->supportsLightweightDelete();
}
bool MutationsInterpreter::Source::hasLightweightDeleteMask() const
{
return part && part->hasLightweightDelete();
}
bool MutationsInterpreter::Source::materializeTTLRecalculateOnly() const
{
return data && data->getSettings()->materialize_ttl_recalculate_only;
}
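
The Source wrapper above has exactly two modes: a plain storage pointer, or a MergeTree storage paired with a single data part. A minimal stand-alone sketch of that dispatch, with stand-in types rather than the real IStorage/MergeTreeData/IMergeTreeDataPart classes:

#include <iostream>
#include <memory>
#include <string>

// Stand-ins for IStorage, MergeTreeData and IMergeTreeDataPart.
struct GenericStorage { std::string name = "Memory"; };
struct MergeTreeStorage { bool ttl_recalculate_only = false; };
struct MergeTreePart { bool has_lightweight_delete = false; };

struct Source
{
    explicit Source(std::shared_ptr<GenericStorage> storage_) : storage(std::move(storage_)) {}
    Source(MergeTreeStorage & data_, std::shared_ptr<const MergeTreePart> part_)
        : data(&data_), part(std::move(part_)) {}

    // Properties answered from the part/storage in MergeTree mode,
    // with a safe default otherwise; same shape as the methods above.
    bool hasLightweightDeleteMask() const { return part && part->has_lightweight_delete; }
    bool materializeTTLRecalculateOnly() const { return data && data->ttl_recalculate_only; }

    std::shared_ptr<GenericStorage> storage;   // generic mode
    MergeTreeStorage * data = nullptr;         // MergeTree mode
    std::shared_ptr<const MergeTreePart> part; // MergeTree mode
};

int main()
{
    MergeTreeStorage mt;
    mt.ttl_recalculate_only = true;
    Source from_part(mt, std::make_shared<const MergeTreePart>());
    Source from_storage(std::make_shared<GenericStorage>());
    std::cout << from_part.materializeTTLRecalculateOnly() << ' '
              << from_storage.materializeTTLRecalculateOnly() << '\n'; // prints "1 0"
}
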
MutationsInterpreter::MutationsInterpreter(
StoragePtr storage_,
@ -297,7 +350,45 @@ MutationsInterpreter::MutationsInterpreter(
bool can_execute_,
bool return_all_columns_,
bool return_deleted_rows_)
: storage(std::move(storage_))
: MutationsInterpreter(
Source(std::move(storage_)),
metadata_snapshot_, std::move(commands_), std::move(context_),
can_execute_, return_all_columns_, return_deleted_rows_)
{
if (can_execute_ && dynamic_cast<const MergeTreeData *>(source.getStorage().get()))
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot execute mutation for {}. Mutation should be applied to every part separately.",
source.getStorage()->getName());
}
}
MutationsInterpreter::MutationsInterpreter(
MergeTreeData & storage_,
MergeTreeData::DataPartPtr source_part_,
const StorageMetadataPtr & metadata_snapshot_,
MutationCommands commands_,
ContextPtr context_,
bool can_execute_,
bool return_all_columns_,
bool return_deleted_rows_)
: MutationsInterpreter(
Source(storage_, std::move(source_part_)),
metadata_snapshot_, std::move(commands_), std::move(context_),
can_execute_, return_all_columns_, return_deleted_rows_)
{
}
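
A hedged sketch of how the two public entry points are meant to be used; constructor names and parameter order are from this PR, while the surrounding variables (storage, merge_tree_data, source_part, metadata_snapshot, commands, context) are assumed to exist:

// Validation / size estimation works through the generic storage path:
DB::MutationsInterpreter dry_run_interpreter(
    storage, metadata_snapshot, commands, context, /* can_execute_ = */ false);

// Actual execution of a mutation on MergeTree goes part by part:
DB::MutationsInterpreter part_interpreter(
    *merge_tree_data, source_part, metadata_snapshot, commands, context, /* can_execute_ = */ true);
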
MutationsInterpreter::MutationsInterpreter(
Source source_,
const StorageMetadataPtr & metadata_snapshot_,
MutationCommands commands_,
ContextPtr context_,
bool can_execute_,
bool return_all_columns_,
bool return_deleted_rows_)
: source(std::move(source_))
, metadata_snapshot(metadata_snapshot_)
, commands(std::move(commands_))
, context(Context::createCopy(context_))
@ -306,12 +397,12 @@ MutationsInterpreter::MutationsInterpreter(
, return_all_columns(return_all_columns_)
, return_deleted_rows(return_deleted_rows_)
{
mutation_ast = prepare(!can_execute);
prepare(!can_execute);
}
static NameSet getKeyColumns(const StoragePtr & storage, const StorageMetadataPtr & metadata_snapshot)
static NameSet getKeyColumns(const MutationsInterpreter::Source & source, const StorageMetadataPtr & metadata_snapshot)
{
const MergeTreeData * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get());
const MergeTreeData * merge_tree_data = source.getMergeTreeData();
if (!merge_tree_data)
return {};
@ -333,21 +424,12 @@ static NameSet getKeyColumns(const StoragePtr & storage, const StorageMetadataPt
return key_columns;
}
static bool materializeTTLRecalculateOnly(const StoragePtr & storage)
{
auto storage_from_merge_tree_data_part = std::dynamic_pointer_cast<StorageFromMergeTreeDataPart>(storage);
if (!storage_from_merge_tree_data_part)
return false;
return storage_from_merge_tree_data_part->materializeTTLRecalculateOnly();
}
static void validateUpdateColumns(
const StoragePtr & storage,
const MutationsInterpreter::Source & source,
const StorageMetadataPtr & metadata_snapshot, const NameSet & updated_columns,
const std::unordered_map<String, Names> & column_to_affected_materialized)
{
NameSet key_columns = getKeyColumns(storage, metadata_snapshot);
NameSet key_columns = getKeyColumns(source, metadata_snapshot);
for (const String & column_name : updated_columns)
{
@ -364,7 +446,7 @@ static void validateUpdateColumns(
/// Allow to override value of lightweight delete filter virtual column
if (!found && column_name == LightweightDeleteDescription::FILTER_COLUMN.name)
{
if (!storage->supportsLightweightDelete())
if (!source.supportsLightweightDelete())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Lightweight delete is not supported for table");
found = true;
}
@ -427,7 +509,7 @@ static std::optional<std::vector<ASTPtr>> getExpressionsOfUpdatedNestedSubcolumn
return res;
}
ASTPtr MutationsInterpreter::prepare(bool dry_run)
void MutationsInterpreter::prepare(bool dry_run)
{
if (is_prepared)
throw Exception("MutationsInterpreter is already prepared. It is a bug.", ErrorCodes::LOGICAL_ERROR);
@ -448,7 +530,7 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
}
NameSet updated_columns;
bool materialize_ttl_recalculate_only = materializeTTLRecalculateOnly(storage);
bool materialize_ttl_recalculate_only = source.materializeTTLRecalculateOnly();
for (const MutationCommand & command : commands)
{
@ -481,7 +563,7 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
}
}
validateUpdateColumns(storage, metadata_snapshot, updated_columns, column_to_affected_materialized);
validateUpdateColumns(source, metadata_snapshot, updated_columns, column_to_affected_materialized);
}
dependencies = getAllColumnDependencies(metadata_snapshot, updated_columns);
@ -778,15 +860,10 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
stages_copy.back().filters = stage.filters;
}
const ASTPtr select_query = prepareInterpreterSelectQuery(stages_copy, /* dry_run = */ true);
InterpreterSelectQuery interpreter{
select_query, context, storage, metadata_snapshot,
SelectQueryOptions().analyze(/* dry_run = */ false).ignoreLimits().ignoreProjections()};
prepareMutationStages(stages_copy, true);
auto first_stage_header = interpreter.getSampleBlock();
QueryPlan plan;
auto source = std::make_shared<NullSource>(first_stage_header);
plan.addStep(std::make_unique<ReadFromPreparedSource>(Pipe(std::move(source))));
initQueryPlan(stages_copy.front(), plan);
auto pipeline = addStreamsForLaterStages(stages_copy, plan);
updated_header = std::make_unique<Block>(pipeline.getHeader());
}
@ -801,21 +878,18 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
is_prepared = true;
return prepareInterpreterSelectQuery(stages, dry_run);
prepareMutationStages(stages, dry_run);
}
ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> & prepared_stages, bool dry_run)
void MutationsInterpreter::prepareMutationStages(std::vector<Stage> & prepared_stages, bool dry_run)
{
auto storage_snapshot = storage->getStorageSnapshot(metadata_snapshot, context);
auto storage_snapshot = source.getStorageSnapshot(metadata_snapshot, context);
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects();
auto all_columns = storage_snapshot->getColumns(options);
/// Add _row_exists column if it is present in the part
if (auto part_storage = dynamic_pointer_cast<DB::StorageFromMergeTreeDataPart>(storage))
{
if (part_storage->hasLightweightDeletedMask())
all_columns.push_back({LightweightDeleteDescription::FILTER_COLUMN});
}
if (source.hasLightweightDeleteMask())
all_columns.push_back({LightweightDeleteDescription::FILTER_COLUMN});
/// Next, for each stage calculate columns changed by this and previous stages.
for (size_t i = 0; i < prepared_stages.size(); ++i)
@ -839,7 +913,7 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> &
/// Now, calculate `expressions_chain` for each stage except the first.
/// Do it backwards to propagate information about columns required as input for a stage to the previous stage.
for (size_t i = prepared_stages.size() - 1; i > 0; --i)
for (int64_t i = prepared_stages.size() - 1; i >= 0; --i)
{
auto & stage = prepared_stages[i];
@ -859,7 +933,7 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> &
/// e.g. ALTER referencing the same table in scalar subquery
bool execute_scalar_subqueries = !dry_run;
auto syntax_result = TreeRewriter(context).analyze(
all_asts, all_columns, storage, storage_snapshot,
all_asts, all_columns, source.getStorage(), storage_snapshot,
false, true, execute_scalar_subqueries);
if (execute_scalar_subqueries && context->hasQueryContext())
@ -897,6 +971,9 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> &
}
}
if (i == 0 && actions_chain.steps.empty())
actions_chain.lastStep(syntax_result->required_source_columns);
/// Remove all intermediate columns.
actions_chain.addStep();
actions_chain.getLastStep().required_output.clear();
@ -908,49 +985,198 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> &
actions_chain.finalize();
/// Propagate information about columns needed as input.
for (const auto & column : actions_chain.steps.front()->getRequiredColumns())
prepared_stages[i - 1].output_columns.insert(column.name);
}
/// Execute first stage as a SELECT statement.
auto select = std::make_shared<ASTSelectQuery>();
select->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared<ASTExpressionList>());
for (const auto & column_name : prepared_stages[0].output_columns)
select->select()->children.push_back(std::make_shared<ASTIdentifier>(column_name));
/// Don't let select list be empty.
if (select->select()->children.empty())
select->select()->children.push_back(std::make_shared<ASTLiteral>(Field(0)));
if (!prepared_stages[0].filters.empty())
{
ASTPtr where_expression;
if (prepared_stages[0].filters.size() == 1)
where_expression = prepared_stages[0].filters[0];
else
if (i)
{
auto coalesced_predicates = std::make_shared<ASTFunction>();
coalesced_predicates->name = "and";
coalesced_predicates->arguments = std::make_shared<ASTExpressionList>();
coalesced_predicates->children.push_back(coalesced_predicates->arguments);
coalesced_predicates->arguments->children = prepared_stages[0].filters;
where_expression = std::move(coalesced_predicates);
/// Propagate information about columns needed as input.
for (const auto & column : actions_chain.steps.front()->getRequiredColumns())
prepared_stages[i - 1].output_columns.insert(column.name);
}
}
}
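
The backward pass above propagates each stage's required input columns into the previous stage's outputs. A toy stand-alone illustration that ignores the real expression analysis:

#include <cassert>
#include <set>
#include <string>
#include <vector>

// Toy of the backward propagation over mutation stages: columns required as
// input by stage i must be produced as output by stage i - 1.
int main()
{
    struct Stage { std::set<std::string> required_inputs, output_columns; };
    std::vector<Stage> stages(3);
    stages[2].required_inputs = {"a", "b"};
    stages[1].required_inputs = {"a"};

    for (size_t i = stages.size() - 1; i > 0; --i)
        for (const auto & column : stages[i].required_inputs)
            stages[i - 1].output_columns.insert(column);

    assert((stages[1].output_columns == std::set<std::string>{"a", "b"}));
    assert((stages[0].output_columns == std::set<std::string>{"a"}));
}
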
/// This structure re-implements adding virtual columns while reading from a MergeTree part.
/// It would be good to unify it with IMergeTreeSelectAlgorithm.
struct VirtualColumns
{
struct ColumnAndPosition
{
ColumnWithTypeAndName column;
size_t position;
};
using Columns = std::vector<ColumnAndPosition>;
Columns virtuals;
Names columns_to_read;
VirtualColumns(Names required_columns, const MergeTreeData::DataPartPtr & part) : columns_to_read(std::move(required_columns))
{
for (size_t i = 0; i < columns_to_read.size(); ++i)
{
if (columns_to_read[i] == LightweightDeleteDescription::FILTER_COLUMN.name)
{
LoadedMergeTreeDataPartInfoForReader part_info_reader(part);
if (!part_info_reader.getColumns().contains(LightweightDeleteDescription::FILTER_COLUMN.name))
{
ColumnWithTypeAndName mask_column;
mask_column.type = LightweightDeleteDescription::FILTER_COLUMN.type;
mask_column.column = mask_column.type->createColumnConst(0, 1);
mask_column.name = std::move(columns_to_read[i]);
virtuals.emplace_back(ColumnAndPosition{.column = std::move(mask_column), .position = i});
}
}
else if (columns_to_read[i] == "_partition_id")
{
ColumnWithTypeAndName column;
column.type = std::make_shared<DataTypeString>();
column.column = column.type->createColumnConst(0, part->info.partition_id);
column.name = std::move(columns_to_read[i]);
virtuals.emplace_back(ColumnAndPosition{.column = std::move(column), .position = i});
}
}
if (!virtuals.empty())
{
Names columns_no_virtuals;
columns_no_virtuals.reserve(columns_to_read.size());
size_t next_virtual = 0;
for (size_t i = 0; i < columns_to_read.size(); ++i)
{
if (next_virtual < virtuals.size() && i == virtuals[next_virtual].position)
++next_virtual;
else
columns_no_virtuals.emplace_back(std::move(columns_to_read[i]));
}
columns_to_read.swap(columns_no_virtuals);
}
select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression));
}
return select;
void addVirtuals(QueryPlan & plan)
{
auto dag = std::make_unique<ActionsDAG>(plan.getCurrentDataStream().header.getColumnsWithTypeAndName());
for (auto & column : virtuals)
{
const auto & adding_const = dag->addColumn(std::move(column.column));
auto & outputs = dag->getOutputs();
outputs.insert(outputs.begin() + column.position, &adding_const);
}
auto step = std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), std::move(dag));
plan.addStep(std::move(step));
}
};
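
In effect, VirtualColumns pulls the virtual names out of the read list, remembers their original positions, and later splices constants back in at exactly those positions. A stand-alone toy of that bookkeeping, with strings standing in for ColumnWithTypeAndName ("_row_exists" is the lightweight-delete filter column name):

#include <cassert>
#include <string>
#include <vector>

struct VirtualSplit
{
    struct Virtual { std::string name; size_t position; };
    std::vector<Virtual> virtuals;
    std::vector<std::string> columns_to_read;

    explicit VirtualSplit(std::vector<std::string> required)
    {
        for (size_t i = 0; i < required.size(); ++i)
        {
            if (required[i] == "_row_exists" || required[i] == "_partition_id")
                virtuals.push_back({required[i], i});   // remember original position
            else
                columns_to_read.push_back(required[i]); // physically read from the part
        }
    }

    // Re-insert virtual values at their recorded positions; ascending positions
    // make each insertion land exactly where the column sat in the request.
    std::vector<std::string> restore(std::vector<std::string> physical) const
    {
        for (const auto & v : virtuals)
            physical.insert(physical.begin() + v.position, v.name + "(const)");
        return physical;
    }
};

int main()
{
    VirtualSplit split({"a", "_partition_id", "b"});
    assert((split.columns_to_read == std::vector<std::string>{"a", "b"}));
    auto restored = split.restore(split.columns_to_read);
    assert(restored[1] == "_partition_id(const)");
}
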
void MutationsInterpreter::Source::read(
Stage & first_stage,
QueryPlan & plan,
const StorageMetadataPtr & snapshot_,
const ContextPtr & context_,
bool apply_deleted_mask_,
bool can_execute_) const
{
auto required_columns = first_stage.expressions_chain.steps.front()->getRequiredColumns().getNames();
auto storage_snapshot = getStorageSnapshot(snapshot_, context_);
if (!can_execute_)
{
auto header = storage_snapshot->getSampleBlockForColumns(required_columns);
auto callback = []()
{
return DB::Exception(ErrorCodes::LOGICAL_ERROR, "Cannot execute a mutation because can_execute flag set to false");
};
Pipe pipe(std::make_shared<ThrowingExceptionSource>(header, callback));
auto read_from_pipe = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
plan.addStep(std::move(read_from_pipe));
return;
}
if (data)
{
const auto & steps = first_stage.expressions_chain.steps;
const auto & names = first_stage.filter_column_names;
size_t num_filters = names.size();
ActionsDAGPtr filter;
if (!first_stage.filter_column_names.empty())
{
ActionsDAG::NodeRawConstPtrs nodes(num_filters);
for (size_t i = 0; i < num_filters; ++i)
nodes[i] = &steps[i]->actions()->findInOutputs(names[i]);
filter = ActionsDAG::buildFilterActionsDAG(nodes, {}, context_);
}
VirtualColumns virtual_columns(std::move(required_columns), part);
createMergeTreeSequentialSource(
plan, *data, storage_snapshot, part, std::move(virtual_columns.columns_to_read), apply_deleted_mask_, filter, context_,
&Poco::Logger::get("MutationsInterpreter"));
virtual_columns.addVirtuals(plan);
}
else
{
auto select = std::make_shared<ASTSelectQuery>();
select->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared<ASTExpressionList>());
for (const auto & column_name : first_stage.output_columns)
select->select()->children.push_back(std::make_shared<ASTIdentifier>(column_name));
/// Don't let select list be empty.
if (select->select()->children.empty())
select->select()->children.push_back(std::make_shared<ASTLiteral>(Field(0)));
if (!first_stage.filters.empty())
{
ASTPtr where_expression;
if (first_stage.filters.size() == 1)
where_expression = first_stage.filters[0];
else
{
auto coalesced_predicates = std::make_shared<ASTFunction>();
coalesced_predicates->name = "and";
coalesced_predicates->arguments = std::make_shared<ASTExpressionList>();
coalesced_predicates->children.push_back(coalesced_predicates->arguments);
coalesced_predicates->arguments->children = first_stage.filters;
where_expression = std::move(coalesced_predicates);
}
select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression));
}
SelectQueryInfo query_info;
query_info.query = std::move(select);
size_t max_block_size = context_->getSettingsRef().max_block_size;
size_t max_streams = 1;
storage->read(plan, required_columns, storage_snapshot, query_info, context_, QueryProcessingStage::FetchColumns, max_block_size, max_streams);
if (!plan.isInitialized())
{
/// This may happen when there is nothing to read from the storage.
auto header = storage_snapshot->getSampleBlockForColumns(required_columns);
auto read_from_pipe = std::make_unique<ReadFromPreparedSource>(Pipe(std::make_shared<NullSource>(header)));
plan.addStep(std::move(read_from_pipe));
}
}
}
void MutationsInterpreter::initQueryPlan(Stage & first_stage, QueryPlan & plan)
{
source.read(first_stage, plan, metadata_snapshot, context, apply_deleted_mask, can_execute);
addCreatingSetsStep(plan, first_stage.analyzer->getPreparedSets(), context);
}
QueryPipelineBuilder MutationsInterpreter::addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, QueryPlan & plan) const
{
for (size_t i_stage = 1; i_stage < prepared_stages.size(); ++i_stage)
for (const Stage & stage : prepared_stages)
{
const Stage & stage = prepared_stages[i_stage];
for (size_t i = 0; i < stage.expressions_chain.steps.size(); ++i)
{
const auto & step = stage.expressions_chain.steps[i];
@ -988,14 +1214,11 @@ QueryPipelineBuilder MutationsInterpreter::addStreamsForLaterStages(const std::v
void MutationsInterpreter::validate()
{
if (!select_interpreter)
select_interpreter = std::make_unique<InterpreterSelectQuery>(mutation_ast, context, storage, metadata_snapshot, select_limits);
const Settings & settings = context->getSettingsRef();
/// For Replicated* storages mutations cannot employ non-deterministic functions
/// because that produces inconsistencies between replicas
if (startsWith(storage->getName(), "Replicated") && !settings.allow_nondeterministic_mutations)
if (startsWith(source.getStorage()->getName(), "Replicated") && !settings.allow_nondeterministic_mutations)
{
for (const auto & command : commands)
{
@ -1012,7 +1235,7 @@ void MutationsInterpreter::validate()
}
QueryPlan plan;
select_interpreter->buildQueryPlan(plan);
initQueryPlan(stages.front(), plan);
auto pipeline = addStreamsForLaterStages(stages, plan);
}
@ -1021,23 +1244,8 @@ QueryPipelineBuilder MutationsInterpreter::execute()
if (!can_execute)
throw Exception("Cannot execute mutations interpreter because can_execute flag set to false", ErrorCodes::LOGICAL_ERROR);
if (!select_interpreter)
{
/// Skip to apply deleted mask for MutateSomePartColumn cases when part has lightweight delete.
if (!apply_deleted_mask)
{
auto context_for_reading = Context::createCopy(context);
context_for_reading->setApplyDeletedMask(apply_deleted_mask);
select_interpreter = std::make_unique<InterpreterSelectQuery>(mutation_ast, context_for_reading, storage, metadata_snapshot, select_limits);
}
else
select_interpreter = std::make_unique<InterpreterSelectQuery>(mutation_ast, context, storage, metadata_snapshot, select_limits);
}
QueryPlan plan;
select_interpreter->buildQueryPlan(plan);
initQueryPlan(stages.front(), plan);
auto builder = addStreamsForLaterStages(stages, plan);
/// Sometimes we update only a part of the columns (for example, with an UPDATE mutation)
@ -1069,11 +1277,7 @@ const ColumnDependencies & MutationsInterpreter::getColumnDependencies() const
size_t MutationsInterpreter::evaluateCommandsSize()
{
for (const MutationCommand & command : commands)
if (unlikely(!command.predicate && !command.partition)) /// The command touches all rows.
return mutation_ast->size();
return std::max(prepareQueryAffectedAST(commands, storage, context)->size(), mutation_ast->size());
return prepareQueryAffectedAST(commands, source.getStorage(), context)->size();
}
std::optional<SortDescription> MutationsInterpreter::getStorageSortDescriptionIfPossible(const Block & header) const
@ -1096,7 +1300,7 @@ std::optional<SortDescription> MutationsInterpreter::getStorageSortDescriptionIf
ASTPtr MutationsInterpreter::getPartitionAndPredicateExpressionForMutationCommand(const MutationCommand & command) const
{
return DB::getPartitionAndPredicateExpressionForMutationCommand(command, storage, context);
return DB::getPartitionAndPredicateExpressionForMutationCommand(command, source.getStorage(), context);
}
bool MutationsInterpreter::Stage::isAffectingAllColumns(const Names & storage_columns) const

View File

@ -19,7 +19,8 @@ using QueryPipelineBuilderPtr = std::unique_ptr<QueryPipelineBuilder>;
/// Return false if the data isn't going to be changed by mutations.
bool isStorageTouchedByMutations(
const StoragePtr & storage,
MergeTreeData & storage,
MergeTreeData::DataPartPtr source_part,
const StorageMetadataPtr & metadata_snapshot,
const std::vector<MutationCommand> & commands,
ContextMutablePtr context_copy
@ -35,6 +36,8 @@ ASTPtr getPartitionAndPredicateExpressionForMutationCommand(
/// to this data.
class MutationsInterpreter
{
struct Stage;
public:
/// Storage to mutate, an array of mutation commands and a context. If you really want to execute the mutation,
/// use can_execute = true; in other cases (validation, counting commands) it can be false
@ -47,8 +50,18 @@ public:
bool return_all_columns_ = false,
bool return_deleted_rows_ = false);
void validate();
/// Special case for MergeTree
MutationsInterpreter(
MergeTreeData & storage_,
MergeTreeData::DataPartPtr source_part_,
const StorageMetadataPtr & metadata_snapshot_,
MutationCommands commands_,
ContextPtr context_,
bool can_execute_,
bool return_all_columns_ = false,
bool return_deleted_rows_ = false);
void validate();
size_t evaluateCommandsSize();
/// The resulting stream will return blocks containing only changed columns and columns, that we need to recalculate indices.
@ -82,19 +95,60 @@ public:
void setApplyDeletedMask(bool apply) { apply_deleted_mask = apply; }
/// Internal class which represents a data part for MergeTree,
/// or just a storage for other storages.
/// The main idea is to have a dedicated reading step from a MergeTree part.
/// Additionally, we propagate some storage properties.
struct Source
{
StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & snapshot_, const ContextPtr & context_) const;
StoragePtr getStorage() const;
const MergeTreeData * getMergeTreeData() const;
bool supportsLightweightDelete() const;
bool hasLightweightDeleteMask() const;
bool materializeTTLRecalculateOnly() const;
void read(
Stage & first_stage,
QueryPlan & plan,
const StorageMetadataPtr & snapshot_,
const ContextPtr & context_,
bool apply_deleted_mask_,
bool can_execute_) const;
explicit Source(StoragePtr storage_);
Source(MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_);
private:
StoragePtr storage;
/// Special case for MergeTree.
MergeTreeData * data = nullptr;
MergeTreeData::DataPartPtr part;
};
private:
ASTPtr prepare(bool dry_run);
MutationsInterpreter(
Source source_,
const StorageMetadataPtr & metadata_snapshot_,
MutationCommands commands_,
ContextPtr context_,
bool can_execute_,
bool return_all_columns_,
bool return_deleted_rows_);
struct Stage;
void prepare(bool dry_run);
ASTPtr prepareInterpreterSelectQuery(std::vector<Stage> &prepared_stages, bool dry_run);
void initQueryPlan(Stage & first_stage, QueryPlan & query_plan);
void prepareMutationStages(std::vector<Stage> &prepared_stages, bool dry_run);
QueryPipelineBuilder addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, QueryPlan & plan) const;
std::optional<SortDescription> getStorageSortDescriptionIfPossible(const Block & header) const;
ASTPtr getPartitionAndPredicateExpressionForMutationCommand(const MutationCommand & command) const;
StoragePtr storage;
Source source;
StorageMetadataPtr metadata_snapshot;
MutationCommands commands;
ContextPtr context;
@ -103,12 +157,6 @@ private:
bool apply_deleted_mask = true;
ASTPtr mutation_ast;
/// We have to store interpreter because it use own copy of context
/// and some streams from execute method may use it.
std::unique_ptr<InterpreterSelectQuery> select_interpreter;
/// A sequence of mutation commands is executed as a sequence of stages. Each stage consists of several
/// filters, followed by updating values of some columns. Commands can reuse expressions calculated by the
/// previous commands in the same stage, but at the end of each stage intermediate columns are thrown away

View File

@ -64,7 +64,6 @@ static MergeTreeReaderSettings getMergeTreeReaderSettings(
.save_marks_in_cache = true,
.checksum_on_read = settings.checksum_on_read,
.read_in_order = query_info.input_order_info != nullptr,
.apply_deleted_mask = context->applyDeletedMask(),
.use_asynchronous_read_from_pool = settings.allow_asynchronous_read_from_io_pool_for_merge_tree
&& (settings.max_streams_to_max_threads_ratio > 1 || settings.max_streams_for_merge_tree_reading > 1),
};

View File

@ -0,0 +1,32 @@
#pragma once
#include <Processors/ISource.h>
namespace DB
{
/// This source throws an exception at the first attempt to read from it.
/// Can be used as an additional check that a pipeline (or a part of it) is never executed.
class ThrowingExceptionSource : public ISource
{
public:
using CallBack = std::function<Exception()>;
explicit ThrowingExceptionSource(Block header, CallBack callback_)
: ISource(std::move(header))
, callback(std::move(callback_))
{}
String getName() const override { return "ThrowingExceptionSource"; }
protected:
Chunk generate() override
{
throw callback();
}
CallBack callback;
};
}
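
A hedged usage sketch mirroring MutationsInterpreter::Source::read() above; header is assumed to come from a storage snapshot:

// Guard a pipeline that must never run: the first read throws LOGICAL_ERROR.
auto callback = []()
{
    return DB::Exception(DB::ErrorCodes::LOGICAL_ERROR,
                         "Cannot execute a mutation because can_execute flag set to false");
};
DB::Pipe pipe(std::make_shared<DB::ThrowingExceptionSource>(header, std::move(callback)));
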

View File

@ -110,6 +110,8 @@ public:
/// The name of the table.
StorageID getStorageID() const;
virtual bool isMergeTree() const { return false; }
/// Returns true if the storage receives data from a remote server or servers.
virtual bool isRemote() const { return false; }

View File

@ -424,6 +424,8 @@ public:
StoragePolicyPtr getStoragePolicy() const override;
bool isMergeTree() const override { return true; }
bool supportsPrewhere() const override { return true; }
bool supportsFinal() const override;

View File

@ -66,6 +66,13 @@ public:
size_t num_streams,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read = nullptr) const;
static MarkRanges markRangesFromPKRange(
const MergeTreeData::DataPartPtr & part,
const StorageMetadataPtr & metadata_snapshot,
const KeyCondition & key_condition,
const Settings & settings,
Poco::Logger * log);
private:
const MergeTreeData & data;
Poco::Logger * log;
@ -78,13 +85,6 @@ private:
const Settings & settings,
Poco::Logger * log);
static MarkRanges markRangesFromPKRange(
const MergeTreeData::DataPartPtr & part,
const StorageMetadataPtr & metadata_snapshot,
const KeyCondition & key_condition,
const Settings & settings,
Poco::Logger * log);
static MarkRanges filterMarksUsingIndex(
MergeTreeIndexPtr index_helper,
MergeTreeIndexConditionPtr condition,

View File

@ -1,9 +1,14 @@
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/QueryPlan/ISourceStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <QueryPipeline/Pipe.h>
#include <Interpreters/Context.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/QueryPlan/FilterStep.h>
namespace DB
{
@ -25,6 +30,8 @@ public:
const StorageSnapshotPtr & storage_snapshot_,
MergeTreeData::DataPartPtr data_part_,
Names columns_to_read_,
std::optional<MarkRanges> mark_ranges_,
bool apply_deleted_mask,
bool read_with_direct_io_,
bool take_column_types_from_storage,
bool quiet = false);
@ -56,6 +63,8 @@ private:
Poco::Logger * log = &Poco::Logger::get("MergeTreeSequentialSource");
std::optional<MarkRanges> mark_ranges;
std::shared_ptr<MarkCache> mark_cache;
using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
MergeTreeReaderPtr reader;
@ -76,6 +85,8 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
const StorageSnapshotPtr & storage_snapshot_,
MergeTreeData::DataPartPtr data_part_,
Names columns_to_read_,
std::optional<MarkRanges> mark_ranges_,
bool apply_deleted_mask,
bool read_with_direct_io_,
bool take_column_types_from_storage,
bool quiet)
@ -85,6 +96,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
, data_part(std::move(data_part_))
, columns_to_read(std::move(columns_to_read_))
, read_with_direct_io(read_with_direct_io_)
, mark_ranges(std::move(mark_ranges_))
, mark_cache(storage.getContext()->getMarkCache())
{
if (!quiet)
@ -126,11 +138,15 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
MergeTreeReaderSettings reader_settings =
{
.read_settings = read_settings,
.save_marks_in_cache = false
.save_marks_in_cache = false,
.apply_deleted_mask = apply_deleted_mask,
};
if (!mark_ranges)
mark_ranges.emplace(MarkRanges{MarkRange(0, data_part->getMarksCount())});
reader = data_part->getReader(columns_for_reader, storage_snapshot->metadata,
MarkRanges{MarkRange(0, data_part->getMarksCount())},
*mark_ranges,
/* uncompressed_cache = */ nullptr, mark_cache.get(), reader_settings, {}, {});
}
@ -224,8 +240,10 @@ Pipe createMergeTreeSequentialSource(
if (need_to_filter_deleted_rows)
columns.emplace_back(LightweightDeleteDescription::FILTER_COLUMN.name);
bool apply_deleted_mask = false;
auto column_part_source = std::make_shared<MergeTreeSequentialSource>(
storage, storage_snapshot, data_part, columns, read_with_direct_io, take_column_types_from_storage, quiet);
storage, storage_snapshot, data_part, columns, std::optional<MarkRanges>{}, apply_deleted_mask, read_with_direct_io, take_column_types_from_storage, quiet);
Pipe pipe(std::move(column_part_source));
@ -242,4 +260,92 @@ Pipe createMergeTreeSequentialSource(
return pipe;
}
/// A query plan step to read from a single MergeTree part
/// using MergeTreeSequentialSource (which reads strictly sequentially in a single thread).
/// This step is used for mutations because the usual reading is too tricky.
/// Previously, sequential reading was achieved by changing settings like max_threads;
/// however, that approach led to data corruption after some new settings were introduced.
class ReadFromPart final : public ISourceStep
{
public:
ReadFromPart(
const MergeTreeData & storage_,
const StorageSnapshotPtr & storage_snapshot_,
MergeTreeData::DataPartPtr data_part_,
Names columns_to_read_,
bool apply_deleted_mask_,
ActionsDAGPtr filter_,
ContextPtr context_,
Poco::Logger * log_)
: ISourceStep(DataStream{.header = storage_snapshot_->getSampleBlockForColumns(columns_to_read_)})
, storage(storage_)
, storage_snapshot(storage_snapshot_)
, data_part(std::move(data_part_))
, columns_to_read(std::move(columns_to_read_))
, apply_deleted_mask(apply_deleted_mask_)
, filter(std::move(filter_))
, context(std::move(context_))
, log(log_)
{
}
String getName() const override { return fmt::format("ReadFromPart({})", data_part->name); }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override
{
std::optional<MarkRanges> mark_ranges;
const auto & metadata_snapshot = storage_snapshot->metadata;
if (filter && metadata_snapshot->hasPrimaryKey())
{
const auto & primary_key = storage_snapshot->metadata->getPrimaryKey();
const Names & primary_key_column_names = primary_key.column_names;
KeyCondition key_condition(filter, context, primary_key_column_names, primary_key.expression, NameSet{});
LOG_DEBUG(log, "Key condition: {}", key_condition.toString());
if (!key_condition.alwaysFalse())
mark_ranges = MergeTreeDataSelectExecutor::markRangesFromPKRange(
data_part, metadata_snapshot, key_condition, context->getSettingsRef(), log);
if (mark_ranges && mark_ranges->empty())
{
pipeline.init(Pipe(std::make_unique<NullSource>(output_stream->header)));
return;
}
}
auto source = std::make_unique<MergeTreeSequentialSource>(
storage, storage_snapshot, data_part, columns_to_read, std::move(mark_ranges), apply_deleted_mask, false, true);
pipeline.init(Pipe(std::move(source)));
}
private:
const MergeTreeData & storage;
StorageSnapshotPtr storage_snapshot;
MergeTreeData::DataPartPtr data_part;
Names columns_to_read;
bool apply_deleted_mask;
ActionsDAGPtr filter;
ContextPtr context;
Poco::Logger * log;
};
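
The pruning in initializePipeline() has three outcomes: read all marks (no usable filter), read a pruned subset, or emit a NullSource when the computed ranges turn out empty. A stand-alone toy of just that control flow (the real range computation lives in MergeTreeDataSelectExecutor::markRangesFromPKRange):

#include <iostream>
#include <optional>
#include <string>
#include <vector>

struct MarkRange { size_t begin, end; };
using MarkRanges = std::vector<MarkRange>;

std::string planRead(bool have_filter, bool key_condition_always_false, bool pk_matches_nothing)
{
    std::optional<MarkRanges> mark_ranges;
    // Ranges are computed only when the key condition is not trivially false.
    if (have_filter && !key_condition_always_false)
        mark_ranges = pk_matches_nothing ? MarkRanges{} : MarkRanges{{0, 8}};
    // An empty *computed* set means the part is fully pruned.
    if (mark_ranges && mark_ranges->empty())
        return "NullSource";
    return mark_ranges ? "SequentialSource(pruned)" : "SequentialSource(all marks)";
}

int main()
{
    std::cout << planRead(true, false, true) << '\n';   // NullSource
    std::cout << planRead(true, false, false) << '\n';  // SequentialSource(pruned)
    std::cout << planRead(false, false, false) << '\n'; // SequentialSource(all marks)
}
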
void createMergeTreeSequentialSource(
QueryPlan & plan,
const MergeTreeData & storage,
const StorageSnapshotPtr & storage_snapshot,
MergeTreeData::DataPartPtr data_part,
Names columns_to_read,
bool apply_deleted_mask,
ActionsDAGPtr filter,
ContextPtr context,
Poco::Logger * log)
{
auto reading = std::make_unique<ReadFromPart>(
storage, storage_snapshot, std::move(data_part), std::move(columns_to_read), apply_deleted_mask, filter, std::move(context), log);
plan.addStep(std::move(reading));
}
}

View File

@ -20,4 +20,17 @@ Pipe createMergeTreeSequentialSource(
bool quiet,
std::shared_ptr<std::atomic<size_t>> filtered_rows_count);
class QueryPlan;
void createMergeTreeSequentialSource(
QueryPlan & plan,
const MergeTreeData & storage,
const StorageSnapshotPtr & storage_snapshot,
MergeTreeData::DataPartPtr data_part,
Names columns_to_read,
bool apply_deleted_mask,
ActionsDAGPtr filter,
ContextPtr context,
Poco::Logger * log);
}
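
A hypothetical call site for this new overload, in the spirit of MutationsInterpreter::Source::read(); every variable here is assumed to exist:

// Build a plan that reads one part sequentially, with an optional PK filter.
DB::QueryPlan plan;
DB::createMergeTreeSequentialSource(
    plan, merge_tree_data, storage_snapshot, data_part,
    /* columns_to_read = */ {"a", "b", "_partition_id"},
    /* apply_deleted_mask = */ true,
    /* filter = */ nullptr,
    context,
    &Poco::Logger::get("example"));
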

View File

@ -714,8 +714,6 @@ struct MutationContext
FutureMergedMutatedPartPtr future_part;
MergeTreeData::DataPartPtr source_part;
StoragePtr storage_from_source_part;
StorageMetadataPtr metadata_snapshot;
MutationCommandsConstPtr commands;
@ -1478,10 +1476,9 @@ MutateTask::MutateTask(
ctx->storage_columns = metadata_snapshot_->getColumns().getAllPhysical();
ctx->txn = txn;
ctx->source_part = ctx->future_part->parts[0];
ctx->storage_from_source_part = std::make_shared<StorageFromMergeTreeDataPart>(ctx->source_part);
ctx->need_prefix = need_prefix_;
auto storage_snapshot = ctx->storage_from_source_part->getStorageSnapshot(ctx->metadata_snapshot, context_);
auto storage_snapshot = ctx->data->getStorageSnapshot(ctx->metadata_snapshot, context_);
extendObjectColumns(ctx->storage_columns, storage_snapshot->object_columns, /*with_subcolumns=*/ false);
}
@ -1554,7 +1551,7 @@ bool MutateTask::prepare()
}
if (ctx->source_part->isStoredOnDisk() && !isStorageTouchedByMutations(
ctx->storage_from_source_part, ctx->metadata_snapshot, ctx->commands_for_part, Context::createCopy(context_for_reading)))
*ctx->data, ctx->source_part, ctx->metadata_snapshot, ctx->commands_for_part, Context::createCopy(context_for_reading)))
{
NameSet files_to_copy_instead_of_hardlinks;
auto settings_ptr = ctx->data->getSettings();
@ -1597,7 +1594,7 @@ bool MutateTask::prepare()
if (!ctx->for_interpreter.empty())
{
ctx->interpreter = std::make_unique<MutationsInterpreter>(
ctx->storage_from_source_part, ctx->metadata_snapshot, ctx->for_interpreter, context_for_reading, true);
*ctx->data, ctx->source_part, ctx->metadata_snapshot, ctx->for_interpreter, context_for_reading, true);
ctx->materialized_indices = ctx->interpreter->grabMaterializedIndices();
ctx->materialized_projections = ctx->interpreter->grabMaterializedProjections();
ctx->mutation_kind = ctx->interpreter->getMutationKind();