introduce mutations snapshot

This commit is contained in:
Anton Popov 2024-06-28 02:28:02 +00:00
parent 4acfad66c2
commit 096616bd1f
42 changed files with 413 additions and 230 deletions

View File

@ -145,6 +145,7 @@ ColumnDependencies getAllColumnDependencies(
bool isStorageTouchedByMutations(
MergeTreeData & storage,
MergeTreeData::DataPartPtr source_part,
MergeTreeData::MutationsSnapshotPtr mutations_snapshot,
const StorageMetadataPtr & metadata_snapshot,
const std::vector<MutationCommand> & commands,
ContextPtr context)
@ -181,7 +182,7 @@ bool isStorageTouchedByMutations(
if (all_commands_can_be_skipped)
return false;
auto storage_from_part = std::make_shared<StorageFromMergeTreeDataPart>(source_part);
auto storage_from_part = std::make_shared<StorageFromMergeTreeDataPart>(source_part, mutations_snapshot);
std::optional<InterpreterSelectQuery> interpreter_select_query;
BlockIO io;
@ -283,8 +284,13 @@ MutationsInterpreter::Source::Source(StoragePtr storage_) : storage(std::move(st
{
}
MutationsInterpreter::Source::Source(MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_)
: data(&storage_), part(std::move(source_part_))
MutationsInterpreter::Source::Source(
MergeTreeData & storage_,
MergeTreeData::DataPartPtr source_part_,
AlterConversionsPtr alter_conversions_)
: data(&storage_)
, part(std::move(source_part_))
, alter_conversions(std::move(alter_conversions_))
{
}
@ -384,13 +390,14 @@ MutationsInterpreter::MutationsInterpreter(
MutationsInterpreter::MutationsInterpreter(
MergeTreeData & storage_,
MergeTreeData::DataPartPtr source_part_,
AlterConversionsPtr alter_conversions_,
StorageMetadataPtr metadata_snapshot_,
MutationCommands commands_,
Names available_columns_,
ContextPtr context_,
Settings settings_)
: MutationsInterpreter(
Source(storage_, std::move(source_part_)),
Source(storage_, std::move(source_part_), std::move(alter_conversions_)),
std::move(metadata_snapshot_), std::move(commands_),
std::move(available_columns_), std::move(context_), std::move(settings_))
{
@ -1210,7 +1217,7 @@ void MutationsInterpreter::Source::read(
createReadFromPartStep(
MergeTreeSequentialSourceType::Mutation,
plan, *data, storage_snapshot,
part, required_columns,
part, alter_conversions, required_columns,
apply_deleted_mask_, filter, context_,
getLogger("MutationsInterpreter"));
}

View File

@ -6,6 +6,7 @@
#include <Interpreters/Context.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/MutationCommands.h>
#include "Storages/MergeTree/AlterConversions.h"
namespace DB
@ -21,6 +22,7 @@ using QueryPipelineBuilderPtr = std::unique_ptr<QueryPipelineBuilder>;
bool isStorageTouchedByMutations(
MergeTreeData & storage,
MergeTreeData::DataPartPtr source_part,
MergeTreeData::MutationsSnapshotPtr mutations_snapshot,
const StorageMetadataPtr & metadata_snapshot,
const std::vector<MutationCommand> & commands,
ContextPtr context
@ -71,6 +73,7 @@ public:
MutationsInterpreter(
MergeTreeData & storage_,
MergeTreeData::DataPartPtr source_part_,
AlterConversionsPtr alter_conversions_,
StorageMetadataPtr metadata_snapshot_,
MutationCommands commands_,
Names available_columns_,
@ -138,7 +141,7 @@ public:
bool can_execute_) const;
explicit Source(StoragePtr storage_);
Source(MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_);
Source(MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_, AlterConversionsPtr alter_conversions_);
private:
StoragePtr storage;
@ -146,6 +149,7 @@ public:
/// Special case for *MergeTree.
MergeTreeData * data = nullptr;
MergeTreeData::DataPartPtr part;
AlterConversionsPtr alter_conversions;
};
private:

View File

@ -764,7 +764,7 @@ std::optional<String> optimizeUseAggregateProjections(QueryPlan::Node & node, Qu
projection_reading = reader.readFromParts(
/* parts = */ {},
/* alter_conversions = */ {},
reading->getMutationsSnapshot()->cloneEmpty(),
best_candidate->dag->getRequiredColumnsNames(),
proj_snapshot,
projection_query_info,

View File

@ -199,7 +199,7 @@ std::optional<String> optimizeUseNormalProjections(Stack & stack, QueryPlan::Nod
auto projection_reading = reader.readFromParts(
/*parts=*/ {},
/*alter_conversions=*/ {},
reading->getMutationsSnapshot()->cloneEmpty(),
required_columns,
proj_snapshot,
query_info_copy,

View File

@ -217,20 +217,15 @@ bool analyzeProjectionCandidate(
{
MergeTreeData::DataPartsVector projection_parts;
MergeTreeData::DataPartsVector normal_parts;
std::vector<AlterConversionsPtr> alter_conversions;
for (const auto & part_with_ranges : parts_with_ranges)
{
const auto & created_projections = part_with_ranges.data_part->getProjectionParts();
auto it = created_projections.find(candidate.projection->name);
if (it != created_projections.end() && !it->second->is_broken)
{
projection_parts.push_back(it->second);
}
else
{
normal_parts.push_back(part_with_ranges.data_part);
alter_conversions.push_back(part_with_ranges.alter_conversions);
}
}
if (projection_parts.empty())
@ -255,7 +250,7 @@ bool analyzeProjectionCandidate(
if (!normal_parts.empty())
{
/// TODO: We can reuse existing analysis_result by filtering out projection parts
auto normal_result_ptr = reading.selectRangesToRead(std::move(normal_parts), std::move(alter_conversions));
auto normal_result_ptr = reading.selectRangesToRead(std::move(normal_parts));
if (normal_result_ptr->selected_marks != 0)
{

View File

@ -229,7 +229,6 @@ public:
{
ranges_in_data_parts.emplace_back(
initial_ranges_in_data_parts[part_index].data_part,
initial_ranges_in_data_parts[part_index].alter_conversions,
initial_ranges_in_data_parts[part_index].part_index_in_query,
MarkRanges{mark_range});
part_index_to_initial_ranges_in_data_parts_index[it->second] = part_index;

View File

@ -262,7 +262,7 @@ void ReadFromMergeTree::AnalysisResult::checkLimits(const Settings & settings, c
ReadFromMergeTree::ReadFromMergeTree(
MergeTreeData::DataPartsVector parts_,
std::vector<AlterConversionsPtr> alter_conversions_,
MergeTreeData::MutationsSnapshotPtr mutations_,
Names all_column_names_,
const MergeTreeData & data_,
const SelectQueryInfo & query_info_,
@ -279,7 +279,7 @@ ReadFromMergeTree::ReadFromMergeTree(
query_info_.prewhere_info)}, all_column_names_, query_info_, storage_snapshot_, context_)
, reader_settings(getMergeTreeReaderSettings(context_, query_info_))
, prepared_parts(std::move(parts_))
, alter_conversions_for_parts(std::move(alter_conversions_))
, mutations_snapshot(std::move(mutations_))
, all_column_names(std::move(all_column_names_))
, data(data_)
, actions_settings(ExpressionActionsSettings::fromContext(context_))
@ -361,6 +361,7 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(
auto pool = std::make_shared<MergeTreeReadPoolParallelReplicas>(
std::move(extension),
std::move(parts_with_range),
mutations_snapshot,
shared_virtual_fields,
storage_snapshot,
prewhere_info,
@ -442,6 +443,7 @@ Pipe ReadFromMergeTree::readFromPool(
{
pool = std::make_shared<MergeTreePrefetchedReadPool>(
std::move(parts_with_range),
mutations_snapshot,
shared_virtual_fields,
storage_snapshot,
prewhere_info,
@ -455,6 +457,7 @@ Pipe ReadFromMergeTree::readFromPool(
{
pool = std::make_shared<MergeTreeReadPool>(
std::move(parts_with_range),
mutations_snapshot,
shared_virtual_fields,
storage_snapshot,
prewhere_info,
@ -535,6 +538,7 @@ Pipe ReadFromMergeTree::readInOrder(
std::move(extension),
mode,
parts_with_ranges,
mutations_snapshot,
shared_virtual_fields,
storage_snapshot,
prewhere_info,
@ -550,6 +554,7 @@ Pipe ReadFromMergeTree::readInOrder(
has_limit_below_one_block,
read_type,
parts_with_ranges,
mutations_snapshot,
shared_virtual_fields,
storage_snapshot,
prewhere_info,
@ -1016,7 +1021,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
}
ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_order_info->direction);
new_parts.emplace_back(part.data_part, part.alter_conversions, part.part_index_in_query, std::move(ranges_to_get_from_part));
new_parts.emplace_back(part.data_part, part.part_index_in_query, std::move(ranges_to_get_from_part));
}
splitted_parts_and_ranges.emplace_back(std::move(new_parts));
@ -1243,7 +1248,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
RangesInDataParts new_parts;
for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it)
new_parts.emplace_back(part_it->data_part, part_it->alter_conversions, part_it->part_index_in_query, part_it->ranges);
new_parts.emplace_back(part_it->data_part, part_it->part_index_in_query, part_it->ranges);
if (new_parts.empty())
continue;
@ -1356,15 +1361,13 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(bool find_exact_ranges) const
{
return selectRangesToRead(prepared_parts, alter_conversions_for_parts, find_exact_ranges);
return selectRangesToRead(prepared_parts, find_exact_ranges);
}
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
MergeTreeData::DataPartsVector parts, std::vector<AlterConversionsPtr> alter_conversions, bool find_exact_ranges) const
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(MergeTreeData::DataPartsVector parts, bool find_exact_ranges) const
{
return selectRangesToRead(
std::move(parts),
std::move(alter_conversions),
metadata_for_reading,
query_info,
context,
@ -1534,7 +1537,6 @@ void ReadFromMergeTree::applyFilters(ActionDAGNodes added_filter_nodes)
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
MergeTreeData::DataPartsVector parts,
std::vector<AlterConversionsPtr> alter_conversions,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info_,
ContextPtr context_,
@ -1596,10 +1598,9 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
{
MergeTreeDataSelectExecutor::filterPartsByPartition(
parts,
indexes->partition_pruner,
indexes->minmax_idx_condition,
parts,
alter_conversions,
indexes->part_values,
metadata_snapshot,
data,
@ -1628,7 +1629,6 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
auto reader_settings = getMergeTreeReaderSettings(context_, query_info_);
result.parts_with_ranges = MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes(
std::move(parts),
std::move(alter_conversions),
metadata_snapshot,
context_,
indexes->key_condition,

View File

@ -110,7 +110,7 @@ public:
ReadFromMergeTree(
MergeTreeData::DataPartsVector parts_,
std::vector<AlterConversionsPtr> alter_conversions_,
MergeTreeData::MutationsSnapshotPtr mutations_snapshot_,
Names all_column_names_,
const MergeTreeData & data_,
const SelectQueryInfo & query_info_,
@ -154,7 +154,6 @@ public:
static AnalysisResultPtr selectRangesToRead(
MergeTreeData::DataPartsVector parts,
std::vector<AlterConversionsPtr> alter_conversions,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
ContextPtr context,
@ -166,8 +165,7 @@ public:
std::optional<Indexes> & indexes,
bool find_exact_ranges);
AnalysisResultPtr selectRangesToRead(
MergeTreeData::DataPartsVector parts, std::vector<AlterConversionsPtr> alter_conversions, bool find_exact_ranges = false) const;
AnalysisResultPtr selectRangesToRead(MergeTreeData::DataPartsVector parts, bool find_exact_ranges = false) const;
AnalysisResultPtr selectRangesToRead(bool find_exact_ranges = false) const;
@ -188,7 +186,7 @@ public:
void setAnalyzedResult(AnalysisResultPtr analyzed_result_ptr_) { analyzed_result_ptr = std::move(analyzed_result_ptr_); }
const MergeTreeData::DataPartsVector & getParts() const { return prepared_parts; }
const std::vector<AlterConversionsPtr> & getAlterConvertionsForParts() const { return alter_conversions_for_parts; }
MergeTreeData::MutationsSnapshotPtr getMutationsSnapshot() const { return mutations_snapshot; }
const MergeTreeData & getMergeTreeData() const { return data; }
size_t getMaxBlockSize() const { return block_size.max_block_size_rows; }
@ -209,7 +207,7 @@ private:
MergeTreeReaderSettings reader_settings;
MergeTreeData::DataPartsVector prepared_parts;
std::vector<AlterConversionsPtr> alter_conversions_for_parts;
MergeTreeData::MutationsSnapshotPtr mutations_snapshot;
Names all_column_names;

View File

@ -9,9 +9,14 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
bool AlterConversions::supportsMutationCommandType(MutationCommand::Type t)
bool AlterConversions::isSupportedDataMutation(MutationCommand::Type)
{
return t == MutationCommand::Type::RENAME_COLUMN;
return false;
}
bool AlterConversions::isSupportedMetadataMutation(MutationCommand::Type type)
{
return type == MutationCommand::Type::RENAME_COLUMN;
}
void AlterConversions::addMutationCommand(const MutationCommand & command)

View File

@ -35,7 +35,8 @@ public:
/// Get column old name before rename (lookup by key in rename_map)
std::string getColumnOldName(const std::string & new_name) const;
static bool supportsMutationCommandType(MutationCommand::Type);
static bool isSupportedDataMutation(MutationCommand::Type type);
static bool isSupportedMetadataMutation(MutationCommand::Type type);
private:
/// Rename map new_name -> old_name.

View File

@ -257,6 +257,10 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
if (enabledBlockOffsetColumn(global_ctx))
addGatheringColumn(global_ctx, BlockOffsetColumn::name, BlockOffsetColumn::type);
auto mutations_snapshot = global_ctx->data->getMutationsSnapshot(
global_ctx->metadata_snapshot->getMetadataVersion(),
/*need_data_mutations=*/ false);
SerializationInfo::Settings info_settings =
{
.ratio_of_defaults_for_sparse = global_ctx->data->getSettings()->ratio_of_defaults_for_sparse_serialization,
@ -264,10 +268,12 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
};
SerializationInfoByName infos(global_ctx->storage_columns, info_settings);
global_ctx->alter_conversions.reserve(global_ctx->future_part->parts.size());
for (const auto & part : global_ctx->future_part->parts)
{
global_ctx->new_data_part->ttl_infos.update(part->ttl_infos);
if (global_ctx->metadata_snapshot->hasAnyTTL() && !part->checkAllTTLCalculated(global_ctx->metadata_snapshot))
{
LOG_INFO(ctx->log, "Some TTL values were not calculated for part {}. Will calculate them forcefully during merge.", part->name);
@ -288,6 +294,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
infos.add(part_infos);
}
global_ctx->alter_conversions.push_back(MergeTreeData::getAlterConversionsForPart(part, mutations_snapshot));
}
const auto & local_part_min_ttl = global_ctx->new_data_part->ttl_infos.part_min_ttl;
@ -604,6 +612,7 @@ Pipe MergeTask::VerticalMergeStage::createPipeForReadingOneColumn(const String &
*global_ctx->data,
global_ctx->storage_snapshot,
global_ctx->future_part->parts[part_num],
global_ctx->alter_conversions[part_num],
Names{column_name},
/*mark_ranges=*/ {},
global_ctx->input_rows_filtered,
@ -996,13 +1005,14 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
global_ctx->horizontal_stage_progress = std::make_unique<MergeStageProgress>(
ctx->column_sizes ? ctx->column_sizes->keyColumnsWeight() : 1.0);
for (const auto & part : global_ctx->future_part->parts)
for (size_t i = 0; i < global_ctx->future_part->parts.size(); ++i)
{
Pipe pipe = createMergeTreeSequentialSource(
MergeTreeSequentialSourceType::Merge,
*global_ctx->data,
global_ctx->storage_snapshot,
part,
global_ctx->future_part->parts[i],
global_ctx->alter_conversions[i],
global_ctx->merging_columns.getNames(),
/*mark_ranges=*/ {},
global_ctx->input_rows_filtered,

View File

@ -4,6 +4,7 @@
#include <memory>
#include <Common/filesystemHelpers.h>
#include "Storages/MergeTree/AlterConversions.h"
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedReadBufferFromFile.h>
@ -154,6 +155,7 @@ private:
StorageSnapshotPtr storage_snapshot{nullptr};
StorageMetadataPtr metadata_snapshot{nullptr};
FutureMergedMutatedPartPtr future_part{nullptr};
std::vector<AlterConversionsPtr> alter_conversions;
/// This will be either nullptr or new_data_part, so raw pointer is ok.
IMergeTreeDataPart * parent_part{nullptr};
ContextPtr context{nullptr};

View File

@ -8115,11 +8115,13 @@ bool MergeTreeData::canUsePolymorphicParts(const MergeTreeSettings & settings, S
return true;
}
AlterConversionsPtr MergeTreeData::getAlterConversionsForPart(MergeTreeDataPartPtr part) const
AlterConversionsPtr MergeTreeData::getAlterConversionsForPart(
const MergeTreeDataPartPtr & part,
const MutationsSnapshotPtr & snapshot)
{
auto commands = getAlterMutationCommandsForPart(part);
auto commands = snapshot->getAlterMutationCommandsForPart(part);
auto result = std::make_shared<AlterConversions>();
for (const auto & command : commands | std::views::reverse)
result->addMutationCommand(command);
@ -8427,9 +8429,9 @@ StorageSnapshotPtr MergeTreeData::getStorageSnapshot(const StorageMetadataPtr &
object_columns_copy = object_columns;
}
snapshot_data->alter_conversions.reserve(snapshot_data->parts.size());
for (const auto & part : snapshot_data->parts)
snapshot_data->alter_conversions.push_back(getAlterConversionsForPart(part));
snapshot_data->mutations_snapshot = getMutationsSnapshot(
metadata_snapshot->getMetadataVersion(),
query_context->getSettingsRef().apply_mutations_on_fly);
return std::make_shared<StorageSnapshot>(*this, metadata_snapshot, std::move(object_columns_copy), std::move(snapshot_data));
}
@ -8616,28 +8618,59 @@ void MergeTreeData::verifySortingKey(const KeyDescription & sorting_key)
}
}
bool updateAlterConversionsMutations(const MutationCommands & commands, std::atomic<ssize_t> & alter_conversions_mutations, bool remove)
static void updateMutationsCounters(
Int64 & data_mutations_to_apply,
Int64 & metadata_mutations_to_apply,
const MutationCommands & commands,
Int64 increment)
{
if (data_mutations_to_apply < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly data mutations counter is negative ({})", data_mutations_to_apply);
if (metadata_mutations_to_apply < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly metadata mutations counter is negative ({})", metadata_mutations_to_apply);
bool has_data_mutation = false;
bool has_metadata_mutation = false;
for (const auto & command : commands)
{
if (AlterConversions::supportsMutationCommandType(command.type))
if (!has_data_mutation && AlterConversions::isSupportedDataMutation(command.type))
{
if (remove)
{
--alter_conversions_mutations;
if (alter_conversions_mutations < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly mutations counter is negative ({})", alter_conversions_mutations);
}
else
{
if (alter_conversions_mutations < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly mutations counter is negative ({})", alter_conversions_mutations);
++alter_conversions_mutations;
}
return true;
data_mutations_to_apply += increment;
has_data_mutation = true;
if (data_mutations_to_apply < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly data mutations counter is negative ({})", data_mutations_to_apply);
}
if (!has_metadata_mutation && AlterConversions::isSupportedMetadataMutation(command.type))
{
metadata_mutations_to_apply += increment;
has_metadata_mutation = true;
if (metadata_mutations_to_apply < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly metadata mutations counter is negative ({})", metadata_mutations_to_apply);
}
}
return false;
}
void incrementMutationsCounters(
Int64 & data_mutations_to_apply,
Int64 & metadata_mutations_to_apply,
const MutationCommands & commands,
std::lock_guard<std::mutex> & /*lock*/)
{
return updateMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, commands, 1);
}
void decrementMutationsCounters(
Int64 & data_mutations_to_apply,
Int64 & metadata_mutations_to_apply,
const MutationCommands & commands,
std::lock_guard<std::mutex> & /*lock*/)
{
return updateMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, commands, -1);
}
}

View File

@ -35,6 +35,7 @@
#include <Interpreters/PartLog.h>
#include <Poco/Timestamp.h>
#include <Common/threadPoolCallbackRunner.h>
#include "Storages/ProjectionsDescription.h"
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
@ -445,12 +446,27 @@ public:
bool supportsTrivialCountOptimization(const StorageSnapshotPtr &, ContextPtr) const override;
struct IMutationsSnapshot
{
/// Return pending mutations that weren't applied to `part` yet and should be applied on the fly
/// (i.e. when reading from the part). Mutations not supported by AlterConversions
/// (supportsMutationCommandType()) can be omitted.
///
/// @return list of mutations, in *reverse* order (newest to oldest)
virtual MutationCommands getAlterMutationCommandsForPart(const DataPartPtr & part) const = 0;
virtual std::shared_ptr<IMutationsSnapshot> cloneEmpty() const = 0;
virtual ~IMutationsSnapshot() = default;
};
using MutationsSnapshotPtr = std::shared_ptr<const IMutationsSnapshot>;
/// Snapshot for MergeTree contains the current set of data parts
/// at the moment of the start of query.
/// and mutations required to be applied at the moment of the start of query.
struct SnapshotData : public StorageSnapshot::Data
{
DataPartsVector parts;
std::vector<AlterConversionsPtr> alter_conversions;
MutationsSnapshotPtr mutations_snapshot;
};
StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const override;
@ -934,8 +950,13 @@ public:
Disks getDisks() const { return getStoragePolicy()->getDisks(); }
/// TODO: comment
virtual MutationsSnapshotPtr getMutationsSnapshot(Int64 metadata_version, bool need_data_mutations) const = 0;
/// Return alter conversions for part which must be applied on fly.
AlterConversionsPtr getAlterConversionsForPart(MergeTreeDataPartPtr part) const;
static AlterConversionsPtr getAlterConversionsForPart(
const MergeTreeDataPartPtr & part,
const MutationsSnapshotPtr & snapshot);
/// Returns destination disk or volume for the TTL rule according to current storage policy.
SpacePtr getDestinationForMoveTTL(const TTLDescription & move_ttl) const;
@ -1448,13 +1469,6 @@ protected:
/// mechanisms for parts locking
virtual bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const = 0;
/// Return pending mutations that weren't applied to `part` yet and should be applied on the fly
/// (i.e. when reading from the part). Mutations not supported by AlterConversions
/// (supportsMutationCommandType()) can be omitted.
///
/// @return list of mutations, in *reverse* order (newest to oldest)
virtual MutationCommands getAlterMutationCommandsForPart(const DataPartPtr & part) const = 0;
struct PartBackupEntries
{
String part_name;
@ -1738,6 +1752,16 @@ struct CurrentlySubmergingEmergingTagger
/// Look at MutationCommands if it contains mutations for AlterConversions, update the counter.
/// Return true if the counter had been updated
bool updateAlterConversionsMutations(const MutationCommands & commands, std::atomic<ssize_t> & alter_conversions_mutations, bool remove);
void incrementMutationsCounters(
Int64 & data_mutations_to_apply,
Int64 & metadata_mutations_to_apply,
const MutationCommands & commands,
std::lock_guard<std::mutex> & lock);
void decrementMutationsCounters(
Int64 & data_mutations_to_apply,
Int64 & metadata_mutations_to_apply,
const MutationCommands & commands,
std::lock_guard<std::mutex> & lock);
}

View File

@ -130,12 +130,10 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
bool enable_parallel_reading) const
{
const auto & snapshot_data = assert_cast<const MergeTreeData::SnapshotData &>(*storage_snapshot->data);
const auto & parts = snapshot_data.parts;
const auto & alter_conversions = snapshot_data.alter_conversions;
auto step = readFromParts(
parts,
alter_conversions,
snapshot_data.parts,
snapshot_data.mutations_snapshot,
column_names_to_return,
storage_snapshot,
query_info,
@ -491,10 +489,9 @@ std::optional<std::unordered_set<String>> MergeTreeDataSelectExecutor::filterPar
}
void MergeTreeDataSelectExecutor::filterPartsByPartition(
MergeTreeData::DataPartsVector & parts,
const std::optional<PartitionPruner> & partition_pruner,
const std::optional<KeyCondition> & minmax_idx_condition,
MergeTreeData::DataPartsVector & parts,
std::vector<AlterConversionsPtr> & alter_conversions,
const std::optional<std::unordered_set<String>> & part_values,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData & data,
@ -503,8 +500,6 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition(
LoggerPtr log,
ReadFromMergeTree::IndexStats & index_stats)
{
chassert(alter_conversions.empty() || parts.size() == alter_conversions.size());
const Settings & settings = context->getSettingsRef();
DataTypes minmax_columns_types;
@ -528,7 +523,6 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition(
if (query_context->getSettingsRef().allow_experimental_query_deduplication)
selectPartsToReadWithUUIDFilter(
parts,
alter_conversions,
part_values,
data.getPinnedPartUUIDs(),
minmax_idx_condition,
@ -541,7 +535,6 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition(
else
selectPartsToRead(
parts,
alter_conversions,
part_values,
minmax_idx_condition,
minmax_columns_types,
@ -580,7 +573,6 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition(
RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes(
MergeTreeData::DataPartsVector && parts,
std::vector<AlterConversionsPtr> && alter_conversions,
StorageMetadataPtr metadata_snapshot,
const ContextPtr & context,
const KeyCondition & key_condition,
@ -593,8 +585,6 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
bool use_skip_indexes,
bool find_exact_ranges)
{
chassert(alter_conversions.empty() || parts.size() == alter_conversions.size());
RangesInDataParts parts_with_ranges;
parts_with_ranges.resize(parts.size());
const Settings & settings = context->getSettingsRef();
@ -653,11 +643,8 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
auto process_part = [&](size_t part_index)
{
auto & part = parts[part_index];
auto alter_conversions_for_part = !alter_conversions.empty()
? alter_conversions[part_index]
: std::make_shared<AlterConversions>();
RangesInDataPart ranges(part, alter_conversions_for_part, part_index);
RangesInDataPart ranges(part, part_index);
size_t total_marks_count = part->index_granularity.getMarksCountWithoutFinal();
if (metadata_snapshot->hasPrimaryKey() || part_offset_condition)
@ -907,11 +894,11 @@ ReadFromMergeTree::AnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMar
return std::make_shared<ReadFromMergeTree::AnalysisResult>();
std::optional<ReadFromMergeTree::Indexes> indexes;
/// NOTE: We don't need alter_conversions because the returned analysis_result is only used for:
/// 1. estimate the number of rows to read; 2. projection reading, which doesn't have alter_conversions.
/// NOTE: We don't need mutations snapshot because the returned analysis_result is only used for:
/// 1. estimate the number of rows to read;
/// 2. projection reading, which doesn't have alter conversions.
return ReadFromMergeTree::selectRangesToRead(
std::move(parts),
/*alter_conversions=*/{},
metadata_snapshot,
query_info,
context,
@ -926,7 +913,7 @@ ReadFromMergeTree::AnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMar
QueryPlanStepPtr MergeTreeDataSelectExecutor::readFromParts(
MergeTreeData::DataPartsVector parts,
std::vector<AlterConversionsPtr> alter_conversions,
MergeTreeData::MutationsSnapshotPtr mutations_snapshot,
const Names & column_names_to_return,
const StorageSnapshotPtr & storage_snapshot,
const SelectQueryInfo & query_info,
@ -948,7 +935,7 @@ QueryPlanStepPtr MergeTreeDataSelectExecutor::readFromParts(
return std::make_unique<ReadFromMergeTree>(
std::move(parts),
std::move(alter_conversions),
std::move(mutations_snapshot),
column_names_to_return,
data,
query_info,
@ -1546,7 +1533,6 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingMergedIndex(
void MergeTreeDataSelectExecutor::selectPartsToRead(
MergeTreeData::DataPartsVector & parts,
std::vector<AlterConversionsPtr> & alter_conversions,
const std::optional<std::unordered_set<String>> & part_values,
const std::optional<KeyCondition> & minmax_idx_condition,
const DataTypes & minmax_columns_types,
@ -1555,10 +1541,7 @@ void MergeTreeDataSelectExecutor::selectPartsToRead(
PartFilterCounters & counters)
{
MergeTreeData::DataPartsVector prev_parts;
std::vector<AlterConversionsPtr> prev_conversions;
std::swap(prev_parts, parts);
std::swap(prev_conversions, alter_conversions);
for (size_t i = 0; i < prev_parts.size(); ++i)
{
@ -1600,14 +1583,11 @@ void MergeTreeDataSelectExecutor::selectPartsToRead(
counters.num_granules_after_partition_pruner += num_granules;
parts.push_back(prev_parts[i]);
if (!prev_conversions.empty())
alter_conversions.push_back(prev_conversions[i]);
}
}
void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
MergeTreeData::DataPartsVector & parts,
std::vector<AlterConversionsPtr> & alter_conversions,
const std::optional<std::unordered_set<String>> & part_values,
MergeTreeData::PinnedPartUUIDsPtr pinned_part_uuids,
const std::optional<KeyCondition> & minmax_idx_condition,
@ -1620,18 +1600,13 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
{
/// process_parts prepare parts that have to be read for the query,
/// returns false if duplicated parts' UUID have been met
auto select_parts = [&] (
MergeTreeData::DataPartsVector & selected_parts,
std::vector<AlterConversionsPtr> & selected_conversions) -> bool
auto select_parts = [&](MergeTreeData::DataPartsVector & selected_parts) -> bool
{
auto ignored_part_uuids = query_context->getIgnoredPartUUIDs();
std::unordered_set<UUID> temp_part_uuids;
MergeTreeData::DataPartsVector prev_parts;
std::vector<AlterConversionsPtr> prev_conversions;
std::swap(prev_parts, selected_parts);
std::swap(prev_conversions, selected_conversions);
for (size_t i = 0; i < prev_parts.size(); ++i)
{
@ -1686,8 +1661,6 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
}
selected_parts.push_back(prev_parts[i]);
if (!prev_conversions.empty())
selected_conversions.push_back(prev_conversions[i]);
}
if (!temp_part_uuids.empty())
@ -1706,7 +1679,7 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
};
/// Process parts that have to be read for a query.
auto needs_retry = !select_parts(parts, alter_conversions);
auto needs_retry = !select_parts(parts);
/// If any duplicated part UUIDs met during the first step, try to ignore them in second pass.
/// This may happen when `prefer_localhost_replica` is set and "distributed" stage runs in the same process with "remote" stage.
@ -1717,7 +1690,7 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
counters = PartFilterCounters();
/// Second attempt didn't help, throw an exception
if (!select_parts(parts, alter_conversions))
if (!select_parts(parts))
throw Exception(ErrorCodes::DUPLICATED_PART_UUIDS, "Found duplicate UUIDs while processing query.");
}
}

View File

@ -40,7 +40,7 @@ public:
/// The same as read, but with specified set of parts.
QueryPlanStepPtr readFromParts(
MergeTreeData::DataPartsVector parts,
std::vector<AlterConversionsPtr> alter_conversions,
MergeTreeData::MutationsSnapshotPtr mutations_snapshot,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
const SelectQueryInfo & query_info,
@ -120,7 +120,6 @@ private:
/// as well as `max_block_number_to_read`.
static void selectPartsToRead(
MergeTreeData::DataPartsVector & parts,
std::vector<AlterConversionsPtr> & alter_conversions,
const std::optional<std::unordered_set<String>> & part_values,
const std::optional<KeyCondition> & minmax_idx_condition,
const DataTypes & minmax_columns_types,
@ -131,7 +130,6 @@ private:
/// Same as previous but also skip parts uuids if any to the query context, or skip parts which uuids marked as excluded.
static void selectPartsToReadWithUUIDFilter(
MergeTreeData::DataPartsVector & parts,
std::vector<AlterConversionsPtr> & alter_conversions,
const std::optional<std::unordered_set<String>> & part_values,
MergeTreeData::PinnedPartUUIDsPtr pinned_part_uuids,
const std::optional<KeyCondition> & minmax_idx_condition,
@ -175,10 +173,9 @@ public:
/// Filter parts using minmax index and partition key.
static void filterPartsByPartition(
MergeTreeData::DataPartsVector & parts,
const std::optional<PartitionPruner> & partition_pruner,
const std::optional<KeyCondition> & minmax_idx_condition,
MergeTreeData::DataPartsVector & parts,
std::vector<AlterConversionsPtr> & alter_conversions,
const std::optional<std::unordered_set<String>> & part_values,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData & data,
@ -192,7 +189,6 @@ public:
/// If 'check_limits = true' it will throw exception if the amount of data exceed the limits from settings.
static RangesInDataParts filterPartsByPrimaryKeyAndSkipIndexes(
MergeTreeData::DataPartsVector && parts,
std::vector<AlterConversionsPtr> && alter_conversions,
StorageMetadataPtr metadata_snapshot,
const ContextPtr & context,
const KeyCondition & key_condition,

View File

@ -6,6 +6,7 @@
#include <IO/ReadBufferFromString.h>
#include <Interpreters/TransactionLog.h>
#include <Backups/BackupEntryFromMemory.h>
#include "Storages/MutationCommands.h"
#include <utility>
@ -50,7 +51,7 @@ UInt64 MergeTreeMutationEntry::parseFileName(const String & file_name_)
MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk_, const String & path_prefix_, UInt64 tmp_number,
const TransactionID & tid_, const WriteSettings & settings)
: create_time(time(nullptr))
, commands(std::move(commands_))
, commands(std::make_shared<MutationCommands>(std::move(commands_)))
, disk(std::move(disk_))
, path_prefix(path_prefix_)
, file_name("tmp_mutation_" + toString(tmp_number) + ".txt")
@ -63,7 +64,7 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskP
*out << "format version: 1\n"
<< "create time: " << LocalDateTime(create_time, DateLUT::serverTimezoneInstance()) << "\n";
*out << "commands: ";
commands.writeText(*out, /* with_pure_metadata_commands = */ false);
commands->writeText(*out, /* with_pure_metadata_commands = */ false);
*out << "\n";
if (tid.isPrehistoric())
{
@ -116,7 +117,8 @@ void MergeTreeMutationEntry::writeCSN(CSN csn_)
}
MergeTreeMutationEntry::MergeTreeMutationEntry(DiskPtr disk_, const String & path_prefix_, const String & file_name_)
: disk(std::move(disk_))
: commands(std::make_shared<MutationCommands>())
, disk(std::move(disk_))
, path_prefix(path_prefix_)
, file_name(file_name_)
, is_temp(false)
@ -133,7 +135,7 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(DiskPtr disk_, const String & pat
create_time_dt.hour(), create_time_dt.minute(), create_time_dt.second());
*buf >> "commands: ";
commands.readText(*buf);
commands->readText(*buf);
*buf >> "\n";
if (buf->eof())
@ -177,7 +179,7 @@ std::shared_ptr<const IBackupEntry> MergeTreeMutationEntry::backup() const
out << "block number: " << block_number << "\n";
out << "commands: ";
commands.writeText(out, /* with_pure_metadata_commands = */ false);
commands->writeText(out, /* with_pure_metadata_commands = */ false);
out << "\n";
return std::make_shared<BackupEntryFromMemory>(out.str());

View File

@ -16,7 +16,7 @@ class IBackupEntry;
struct MergeTreeMutationEntry
{
time_t create_time = 0;
MutationCommands commands;
std::shared_ptr<MutationCommands> commands;
DiskPtr disk;
String path_prefix;

View File

@ -84,6 +84,7 @@ MergeTreeReadTask::Readers MergeTreePrefetchedReadPool::PrefetchedReaders::get()
MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool(
RangesInDataParts && parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
@ -94,6 +95,7 @@ MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool(
const ContextPtr & context_)
: MergeTreeReadPoolBase(
std::move(parts_),
std::move(mutations_snapshot_),
std::move(shared_virtual_fields_),
storage_snapshot_,
prewhere_info_,

View File

@ -19,6 +19,7 @@ class MergeTreePrefetchedReadPool : public MergeTreeReadPoolBase, private WithCo
public:
MergeTreePrefetchedReadPool(
RangesInDataParts && parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,

View File

@ -35,6 +35,7 @@ size_t getApproxSizeOfPart(const IMergeTreeDataPart & part, const Names & column
MergeTreeReadPool::MergeTreeReadPool(
RangesInDataParts && parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
@ -45,6 +46,7 @@ MergeTreeReadPool::MergeTreeReadPool(
const ContextPtr & context_)
: MergeTreeReadPoolBase(
std::move(parts_),
std::move(mutations_snapshot_),
std::move(shared_virtual_fields_),
storage_snapshot_,
prewhere_info_,

View File

@ -26,6 +26,7 @@ public:
MergeTreeReadPool(
RangesInDataParts && parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,

View File

@ -13,6 +13,7 @@ namespace ErrorCodes
MergeTreeReadPoolBase::MergeTreeReadPoolBase(
RangesInDataParts && parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
@ -22,6 +23,7 @@ MergeTreeReadPoolBase::MergeTreeReadPoolBase(
const PoolSettings & pool_settings_,
const ContextPtr & context_)
: parts_ranges(std::move(parts_))
, mutations_snapshot(std::move(mutations_snapshot_))
, shared_virtual_fields(std::move(shared_virtual_fields_))
, storage_snapshot(storage_snapshot_)
, prewhere_info(prewhere_info_)
@ -67,9 +69,9 @@ void MergeTreeReadPoolBase::fillPerPartInfos()
}
read_task_info.part_index_in_query = part_with_ranges.part_index_in_query;
read_task_info.alter_conversions = part_with_ranges.alter_conversions;
read_task_info.alter_conversions = MergeTreeData::getAlterConversionsForPart(part_with_ranges.data_part, mutations_snapshot);
LoadedMergeTreeDataPartInfoForReader part_info(part_with_ranges.data_part, part_with_ranges.alter_conversions);
LoadedMergeTreeDataPartInfoForReader part_info(part_with_ranges.data_part, read_task_info.alter_conversions);
read_task_info.task_columns = getReadTaskColumns(
part_info,

View File

@ -9,6 +9,8 @@ namespace DB
class MergeTreeReadPoolBase : public IMergeTreeReadPool
{
public:
using MutationsSnapshotPtr = MergeTreeData::MutationsSnapshotPtr;
struct PoolSettings
{
size_t threads = 0;
@ -23,6 +25,7 @@ public:
MergeTreeReadPoolBase(
RangesInDataParts && parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
@ -37,6 +40,7 @@ public:
protected:
/// Initialized in constructor
const RangesInDataParts parts_ranges;
const MutationsSnapshotPtr mutations_snapshot;
const VirtualFields shared_virtual_fields;
const StorageSnapshotPtr storage_snapshot;
const PrewhereInfoPtr prewhere_info;

View File

@ -12,6 +12,7 @@ MergeTreeReadPoolInOrder::MergeTreeReadPoolInOrder(
bool has_limit_below_one_block_,
MergeTreeReadType read_type_,
RangesInDataParts parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
@ -22,6 +23,7 @@ MergeTreeReadPoolInOrder::MergeTreeReadPoolInOrder(
const ContextPtr & context_)
: MergeTreeReadPoolBase(
std::move(parts_),
std::move(mutations_snapshot_),
std::move(shared_virtual_fields_),
storage_snapshot_,
prewhere_info_,

View File

@ -11,6 +11,7 @@ public:
bool has_limit_below_one_block_,
MergeTreeReadType read_type_,
RangesInDataParts parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,

View File

@ -13,6 +13,7 @@ namespace ErrorCodes
MergeTreeReadPoolParallelReplicas::MergeTreeReadPoolParallelReplicas(
ParallelReadingExtension extension_,
RangesInDataParts && parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
@ -23,6 +24,7 @@ MergeTreeReadPoolParallelReplicas::MergeTreeReadPoolParallelReplicas(
const ContextPtr & context_)
: MergeTreeReadPoolBase(
std::move(parts_),
std::move(mutations_snapshot_),
std::move(shared_virtual_fields_),
storage_snapshot_,
prewhere_info_,

View File

@ -11,6 +11,7 @@ public:
MergeTreeReadPoolParallelReplicas(
ParallelReadingExtension extension_,
RangesInDataParts && parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,

View File

@ -12,6 +12,7 @@ MergeTreeReadPoolParallelReplicasInOrder::MergeTreeReadPoolParallelReplicasInOrd
ParallelReadingExtension extension_,
CoordinationMode mode_,
RangesInDataParts parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
@ -22,6 +23,7 @@ MergeTreeReadPoolParallelReplicasInOrder::MergeTreeReadPoolParallelReplicasInOrd
const ContextPtr & context_)
: MergeTreeReadPoolBase(
std::move(parts_),
std::move(mutations_snapshot_),
std::move(shared_virtual_fields_),
storage_snapshot_,
prewhere_info_,

View File

@ -12,6 +12,7 @@ public:
ParallelReadingExtension extension_,
CoordinationMode mode_,
RangesInDataParts parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,

View File

@ -13,6 +13,7 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Common/logger_useful.h>
#include "Storages/MergeTree/AlterConversions.h"
#include <Processors/Merges/Algorithms/MergeTreePartLevelInfo.h>
namespace DB
@ -38,6 +39,7 @@ public:
const MergeTreeData & storage_,
const StorageSnapshotPtr & storage_snapshot_,
MergeTreeData::DataPartPtr data_part_,
AlterConversionsPtr alter_conversions_,
Names columns_to_read_,
std::optional<MarkRanges> mark_ranges_,
bool apply_deleted_mask,
@ -62,6 +64,9 @@ private:
/// Data part will not be removed if the pointer owns it
MergeTreeData::DataPartPtr data_part;
/// TODO: comment.
AlterConversionsPtr alter_conversions;
/// Columns we have to read (each Block from read will contain them)
Names columns_to_read;
@ -91,6 +96,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
const MergeTreeData & storage_,
const StorageSnapshotPtr & storage_snapshot_,
MergeTreeData::DataPartPtr data_part_,
AlterConversionsPtr alter_conversions_,
Names columns_to_read_,
std::optional<MarkRanges> mark_ranges_,
bool apply_deleted_mask,
@ -100,6 +106,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
, storage(storage_)
, storage_snapshot(storage_snapshot_)
, data_part(std::move(data_part_))
, alter_conversions(std::move(alter_conversions_))
, columns_to_read(std::move(columns_to_read_))
, read_with_direct_io(read_with_direct_io_)
, mark_ranges(std::move(mark_ranges_))
@ -113,8 +120,6 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
LOG_DEBUG(log, "Reading {} marks from part {}, total {} rows starting from the beginning of the part",
data_part->getMarksCount(), data_part->name, data_part->rows_count);
auto alter_conversions = storage.getAlterConversionsForPart(data_part);
/// Note, that we don't check setting collaborate_with_coordinator presence, because this source
/// is only used in background merges.
addTotalRowsApprox(data_part->rows_count);
@ -300,6 +305,7 @@ Pipe createMergeTreeSequentialSource(
const MergeTreeData & storage,
const StorageSnapshotPtr & storage_snapshot,
MergeTreeData::DataPartPtr data_part,
AlterConversionsPtr alter_conversions,
Names columns_to_read,
std::optional<MarkRanges> mark_ranges,
std::shared_ptr<std::atomic<size_t>> filtered_rows_count,
@ -316,7 +322,8 @@ Pipe createMergeTreeSequentialSource(
columns_to_read.emplace_back(RowExistsColumn::name);
auto column_part_source = std::make_shared<MergeTreeSequentialSource>(type,
storage, storage_snapshot, data_part, columns_to_read, std::move(mark_ranges),
storage, storage_snapshot, data_part, alter_conversions,
columns_to_read, std::move(mark_ranges),
/*apply_deleted_mask=*/ false, read_with_direct_io, prefetch);
Pipe pipe(std::move(column_part_source));
@ -347,6 +354,7 @@ public:
const MergeTreeData & storage_,
const StorageSnapshotPtr & storage_snapshot_,
MergeTreeData::DataPartPtr data_part_,
AlterConversionsPtr alter_conversions_,
Names columns_to_read_,
bool apply_deleted_mask_,
ActionsDAGPtr filter_,
@ -357,6 +365,7 @@ public:
, storage(storage_)
, storage_snapshot(storage_snapshot_)
, data_part(std::move(data_part_))
, alter_conversions(std::move(alter_conversions_))
, columns_to_read(std::move(columns_to_read_))
, apply_deleted_mask(apply_deleted_mask_)
, filter(std::move(filter_))
@ -400,6 +409,7 @@ public:
storage,
storage_snapshot,
data_part,
alter_conversions,
columns_to_read,
std::move(mark_ranges),
/*filtered_rows_count=*/ nullptr,
@ -415,6 +425,7 @@ private:
const MergeTreeData & storage;
StorageSnapshotPtr storage_snapshot;
MergeTreeData::DataPartPtr data_part;
AlterConversionsPtr alter_conversions;
Names columns_to_read;
bool apply_deleted_mask;
ActionsDAGPtr filter;
@ -428,6 +439,7 @@ void createReadFromPartStep(
const MergeTreeData & storage,
const StorageSnapshotPtr & storage_snapshot,
MergeTreeData::DataPartPtr data_part,
AlterConversionsPtr alter_conversions,
Names columns_to_read,
bool apply_deleted_mask,
ActionsDAGPtr filter,
@ -435,7 +447,8 @@ void createReadFromPartStep(
LoggerPtr log)
{
auto reading = std::make_unique<ReadFromPart>(type,
storage, storage_snapshot, std::move(data_part),
storage, storage_snapshot,
std::move(data_part), std::move(alter_conversions),
std::move(columns_to_read), apply_deleted_mask,
filter, std::move(context), log);

View File

@ -21,6 +21,7 @@ Pipe createMergeTreeSequentialSource(
const MergeTreeData & storage,
const StorageSnapshotPtr & storage_snapshot,
MergeTreeData::DataPartPtr data_part,
AlterConversionsPtr alter_conversions,
Names columns_to_read,
std::optional<MarkRanges> mark_ranges,
std::shared_ptr<std::atomic<size_t>> filtered_rows_count,
@ -36,6 +37,7 @@ void createReadFromPartStep(
const MergeTreeData & storage,
const StorageSnapshotPtr & storage_snapshot,
MergeTreeData::DataPartPtr data_part,
AlterConversionsPtr alter_conversions,
Names columns_to_read,
bool apply_deleted_mask,
ActionsDAGPtr filter,

View File

@ -29,6 +29,7 @@
#include <DataTypes/DataTypeVariant.h>
#include <boost/algorithm/string/replace.hpp>
#include <Common/ProfileEventsScope.h>
#include "Storages/MergeTree/AlterConversions.h"
#include <Core/ColumnsWithTypeAndName.h>
@ -104,6 +105,7 @@ static UInt64 getExistingRowsCount(const Block & block)
static void splitAndModifyMutationCommands(
MergeTreeData::DataPartPtr part,
StorageMetadataPtr metadata_snapshot,
AlterConversionsPtr alter_conversions,
const MutationCommands & commands,
MutationCommands & for_interpreter,
MutationCommands & for_file_renames,
@ -169,8 +171,6 @@ static void splitAndModifyMutationCommands(
}
auto alter_conversions = part->storage.getAlterConversionsForPart(part);
/// We don't add renames from commands, instead we take them from rename_map.
/// It's important because required renames depend not only on part's data version (i.e. mutation version)
/// but also on part's metadata version. Why we have such logic only for renames? Because all other types of alter
@ -286,7 +286,6 @@ static void splitAndModifyMutationCommands(
}
}
auto alter_conversions = part->storage.getAlterConversionsForPart(part);
/// We don't add renames from commands, instead we take them from rename_map.
/// It's important because required renames depend not only on part's data version (i.e. mutation version)
/// but also on part's metadata version. Why we have such logic only for renames? Because all other types of alter
@ -2119,6 +2118,14 @@ bool MutateTask::prepare()
ctx->num_mutations = std::make_unique<CurrentMetrics::Increment>(CurrentMetrics::PartMutation);
auto mutations_snapshot = ctx->data->getMutationsSnapshot(
ctx->metadata_snapshot->getMetadataVersion(),
/*need_data_mutations=*/ false);
auto alter_conversions = MergeTreeData::getAlterConversionsForPart(
ctx->source_part,
mutations_snapshot);
auto context_for_reading = Context::createCopy(ctx->context);
/// Allow mutations to work when force_index_by_date or force_primary_key is on.
@ -2133,7 +2140,7 @@ bool MutateTask::prepare()
ctx->commands_for_part.emplace_back(command);
if (ctx->source_part->isStoredOnDisk() && !isStorageTouchedByMutations(
*ctx->data, ctx->source_part, ctx->metadata_snapshot, ctx->commands_for_part, context_for_reading))
*ctx->data, ctx->source_part, mutations_snapshot, ctx->metadata_snapshot, ctx->commands_for_part, context_for_reading))
{
NameSet files_to_copy_instead_of_hardlinks;
auto settings_ptr = ctx->data->getSettings();
@ -2192,8 +2199,13 @@ bool MutateTask::prepare()
context_for_reading->setSetting("read_from_filesystem_cache_if_exists_otherwise_bypass_cache", 1);
MutationHelpers::splitAndModifyMutationCommands(
ctx->source_part, ctx->metadata_snapshot,
ctx->commands_for_part, ctx->for_interpreter, ctx->for_file_renames, ctx->log);
ctx->source_part,
ctx->metadata_snapshot,
alter_conversions,
ctx->commands_for_part,
ctx->for_interpreter,
ctx->for_file_renames,
ctx->log);
ctx->stage_progress = std::make_unique<MergeStageProgress>(1.0);
@ -2205,7 +2217,8 @@ bool MutateTask::prepare()
settings.apply_deleted_mask = false;
ctx->interpreter = std::make_unique<MutationsInterpreter>(
*ctx->data, ctx->source_part, ctx->metadata_snapshot, ctx->for_interpreter,
*ctx->data, ctx->source_part, alter_conversions,
ctx->metadata_snapshot, ctx->for_interpreter,
ctx->metadata_snapshot->getColumns().getNamesOfPhysical(), context_for_reading, settings);
ctx->materialized_indices = ctx->interpreter->grabMaterializedIndices();

View File

@ -42,7 +42,6 @@ struct RangesInDataPartsDescription: public std::deque<RangesInDataPartDescripti
struct RangesInDataPart
{
DataPartPtr data_part;
AlterConversionsPtr alter_conversions;
size_t part_index_in_query;
MarkRanges ranges;
MarkRanges exact_ranges;
@ -51,14 +50,13 @@ struct RangesInDataPart
RangesInDataPart(
const DataPartPtr & data_part_,
const AlterConversionsPtr & alter_conversions_,
const size_t part_index_in_query_,
const MarkRanges & ranges_ = MarkRanges{})
: data_part{data_part_}
, alter_conversions{alter_conversions_}
, part_index_in_query{part_index_in_query_}
, ranges{ranges_}
{}
{
}
RangesInDataPartDescription getDescription() const;

View File

@ -135,7 +135,6 @@ struct ReplicatedMergeTreeLogEntryData
int alter_version = -1; /// May be equal to -1, if it's normal mutation, not metadata update.
/// only ALTER METADATA command
/// NOTE It's never used
bool have_mutation = false; /// If this alter requires additional mutation step, for data update
String columns_str; /// New columns data corresponding to alter_version

View File

@ -13,6 +13,8 @@
#include <base/defines.h>
#include <Parsers/formatAST.h>
#include <base/sort.h>
#include <algorithm>
#include <mutex>
#include <ranges>
#include <Poco/Timestamp.h>
@ -949,7 +951,7 @@ int32_t ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper
{
const auto commands = entry.commands;
it = mutations_by_znode.erase(it);
updateAlterConversionsMutations(commands, alter_conversions_mutations, /* remove= */ true);
decrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, commands, lock);
}
else
it = mutations_by_znode.erase(it);
@ -1001,7 +1003,7 @@ int32_t ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper
auto & mutation = mutations_by_znode.emplace(entry->znode_name, MutationStatus(entry, format_version))
.first->second;
updateAlterConversionsMutations(entry->commands, alter_conversions_mutations, /* remove= */ false);
incrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, entry->commands, lock);
NOEXCEPT_SCOPE({
for (const auto & pair : entry->block_numbers)
{
@ -1075,7 +1077,7 @@ ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation(
}
mutations_by_znode.erase(it);
/// updateAlterConversionsMutations() will be called in updateMutations()
/// decrementMutationsCounters() will be called in updateMutations()
LOG_DEBUG(log, "Removed mutation {} from local state.", entry->znode_name);
}
@ -1899,25 +1901,15 @@ ReplicatedMergeTreeMergePredicate ReplicatedMergeTreeQueue::getMergePredicate(zk
return ReplicatedMergeTreeMergePredicate(*this, zookeeper, std::move(partition_ids_hint));
}
MutationCommands ReplicatedMergeTreeQueue::getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const
MutationCommands ReplicatedMergeTreeQueue::MutationsSnapshot::getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const
{
int32_t part_metadata_version = part->getMetadataVersion();
int32_t metadata_version = storage.getInMemoryMetadataPtr()->getMetadataVersion();
chassert(alter_conversions_mutations >= 0);
/// NOTE: that just checking part_metadata_version is not enough, since we
/// need to check for non-metadata mutations as well.
if (alter_conversions_mutations == 0 && metadata_version == part_metadata_version)
return {};
std::unique_lock lock(state_mutex);
auto in_partition = mutations_by_partition.find(part->info.partition_id);
if (in_partition == mutations_by_partition.end())
return {};
Int64 part_data_version = part->info.getDataVersion();
int32_t part_metadata_version = part->getMetadataVersion();
MutationCommands result;
bool seen_all_data_mutations = false;
@ -1926,20 +1918,22 @@ MutationCommands ReplicatedMergeTreeQueue::getAlterMutationCommandsForPart(const
auto add_to_result = [&](const ReplicatedMergeTreeMutationEntryPtr & entry)
{
for (const auto & command : entry->commands | std::views::reverse)
if (AlterConversions::supportsMutationCommandType(command.type))
result.emplace_back(command);
{
if (need_data_mutations && AlterConversions::isSupportedDataMutation(command.type))
result.push_back(command);
else if (AlterConversions::isSupportedMetadataMutation(command.type))
result.push_back(command);
}
};
/// Here we return mutation commands for part which has bigger alter version than part metadata version.
/// Please note, we don't use getDataVersion(). It's because these alter commands are used for in-fly conversions
/// of part's metadata.
for (const auto & [mutation_version, mutation_status] : in_partition->second | std::views::reverse)
for (const auto & [mutation_version, entry] : in_partition->second | std::views::reverse)
{
if (seen_all_data_mutations && seen_all_metadata_mutations)
break;
auto & entry = mutation_status->entry;
auto alter_version = entry->alter_version;
if (alter_version != -1)
{
@ -1964,6 +1958,48 @@ MutationCommands ReplicatedMergeTreeQueue::getAlterMutationCommandsForPart(const
return result;
}
MergeTreeData::MutationsSnapshotPtr ReplicatedMergeTreeQueue::getMutationsSnapshot(Int64 metadata_version, bool need_data_mutations) const
{
auto res = std::make_shared<MutationsSnapshot>();
res->metadata_version = metadata_version;
res->need_data_mutations = need_data_mutations;
std::lock_guard lock(state_mutex);
bool have_data_mutations = res->need_data_mutations && data_mutations_to_apply > 0;
bool have_metadata_mutations = metadata_mutations_to_apply > 0;
if (!have_data_mutations && !have_metadata_mutations)
return res;
for (const auto & [partition_id, mutations] : mutations_by_partition)
{
auto & in_partition = res->mutations_by_partition[partition_id];
for (const auto & [version, status] : mutations | std::views::reverse)
{
if (status->is_done)
break;
bool has_required_command = std::ranges::any_of(status->entry->commands, [&](const auto & command)
{
if (have_data_mutations && AlterConversions::isSupportedDataMutation(command.type))
return true;
if (have_metadata_mutations && AlterConversions::isSupportedMetadataMutation(command.type))
return true;
return false;
});
if (has_required_command)
in_partition.emplace(version, status->entry);
}
}
return res;
}
MutationCommands ReplicatedMergeTreeQueue::getMutationCommands(
const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version, Strings & mutation_ids) const
{
@ -2044,7 +2080,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
mutation.parts_to_do.clear();
}
updateAlterConversionsMutations(mutation.entry->commands, alter_conversions_mutations, /* remove= */ true);
decrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, mutation.entry->commands, lock);
}
else if (mutation.parts_to_do.size() == 0)
{
@ -2101,7 +2137,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
LOG_TRACE(log, "Finishing data alter with version {} for entry {}", entry->alter_version, entry->znode_name);
alter_sequence.finishDataAlter(entry->alter_version, lock);
}
updateAlterConversionsMutations(entry->commands, alter_conversions_mutations, /* remove= */ true);
decrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, entry->commands, lock);
}
}
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <cstdint>
#include <optional>
#include <Common/ActionBlocker.h>
@ -151,8 +152,11 @@ private:
/// Mapping from znode path to Mutations Status
std::map<String, MutationStatus> mutations_by_znode;
/// Unfinished mutations that is required AlterConversions (see getAlterMutationCommandsForPart())
std::atomic<ssize_t> alter_conversions_mutations = 0;
/// Unfinished mutations that are required for AlterConversions.
Int64 data_mutations_to_apply = 0;
Int64 metadata_mutations_to_apply = 0;
/// Partition -> (block_number -> MutationStatus)
std::unordered_map<String, std::map<Int64, MutationStatus *>> mutations_by_partition;
/// Znode ID of the latest mutation that is done.
@ -409,10 +413,24 @@ public:
MutationCommands getMutationCommands(const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version,
Strings & mutation_ids) const;
struct MutationsSnapshot : public MergeTreeData::IMutationsSnapshot
{
MutationsSnapshot() = default;
Int64 metadata_version = -1;
bool need_data_mutations = false;
using MutationsByPartititon = std::unordered_map<String, std::map<Int64, ReplicatedMergeTreeMutationEntryPtr>>;
MutationsByPartititon mutations_by_partition;
MutationCommands getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const override;
std::shared_ptr<MergeTreeData::IMutationsSnapshot> cloneEmpty() const override { return std::make_shared<MutationsSnapshot>(); }
};
/// Return mutation commands for part which could be not applied to
/// it according to part mutation version. Used when we apply alter commands on fly,
/// without actual data modification on disk.
MutationCommands getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const;
MergeTreeData::MutationsSnapshotPtr getMutationsSnapshot(Int64 metadata_version, bool need_data_mutations) const;
/// Mark finished mutations as done. If the function needs to be called again at some later time
/// (because some mutations are probably done but we are not sure yet), returns true.

View File

@ -26,10 +26,12 @@ class StorageFromMergeTreeDataPart final : public IStorage
{
public:
/// Used in part mutation.
explicit StorageFromMergeTreeDataPart(const MergeTreeData::DataPartPtr & part_)
explicit StorageFromMergeTreeDataPart(
const MergeTreeData::DataPartPtr & part_,
const MergeTreeData::MutationsSnapshotPtr & mutations_snapshot_)
: IStorage(getIDFromPart(part_))
, parts({part_})
, alter_conversions({part_->storage.getAlterConversionsForPart(part_)})
, mutations_snapshot(mutations_snapshot_)
, storage(part_->storage)
, partition_id(part_->info.partition_id)
{
@ -71,10 +73,11 @@ public:
size_t max_block_size,
size_t num_streams) override
{
/// TODO: fix
query_plan.addStep(MergeTreeDataSelectExecutor(storage)
.readFromParts(
parts,
alter_conversions,
mutations_snapshot,
column_names,
storage_snapshot,
query_info,
@ -121,7 +124,7 @@ public:
private:
const MergeTreeData::DataPartsVector parts;
const std::vector<AlterConversionsPtr> alter_conversions;
const MergeTreeData::MutationsSnapshotPtr mutations_snapshot;
const MergeTreeData & storage;
const String partition_id;
const ReadFromMergeTree::AnalysisResultPtr analysis_result_ptr;

View File

@ -498,18 +498,11 @@ Int64 StorageMergeTree::startMutation(const MutationCommands & commands, Context
if (txn)
txn->addMutation(shared_from_this(), mutation_id);
bool alter_conversions_mutations_updated = updateAlterConversionsMutations(entry.commands, alter_conversions_mutations, /* remove= */ false);
bool inserted = current_mutations_by_version.try_emplace(version, std::move(entry)).second;
auto [it, inserted] = current_mutations_by_version.try_emplace(version, std::move(entry));
if (!inserted)
{
if (alter_conversions_mutations_updated)
{
--alter_conversions_mutations;
chassert(alter_conversions_mutations >= 0);
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation {} already exists, it's a bug", version);
}
incrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, *it->second.commands, lock);
LOG_INFO(log, "Added mutation: {}{}", mutation_id, additional_info);
}
background_operations_assignee.trigger();
@ -545,7 +538,7 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
if (static_cast<UInt64>(result_part->part_info.mutation) == it->first)
mutation_backoff_policy.removePartFromFailed(failed_part->name);
updateAlterConversionsMutations(it->second.commands, alter_conversions_mutations, /* remove= */ true);
decrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, *entry.commands, lock);
}
}
else
@ -744,17 +737,15 @@ std::map<std::string, MutationCommands> StorageMergeTree::getUnfinishedMutationC
std::map<std::string, MutationCommands> result;
for (const auto & kv : current_mutations_by_version)
for (const auto & [mutation_version, entry] : current_mutations_by_version)
{
Int64 mutation_version = kv.first;
const MergeTreeMutationEntry & entry = kv.second;
const PartVersionWithName needle{mutation_version, ""};
const PartVersionWithName needle{static_cast<Int64>(mutation_version), ""};
auto versions_it = std::lower_bound(
part_versions_with_names.begin(), part_versions_with_names.end(), needle, comparator);
size_t parts_to_do = versions_it - part_versions_with_names.begin();
if (parts_to_do > 0)
result.emplace(entry.file_name, entry.commands);
result.emplace(entry.file_name, *entry.commands);
}
return result;
}
@ -787,7 +778,7 @@ std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() cons
std::map<String, Int64> block_numbers_map({{"", entry.block_number}});
for (const MutationCommand & command : entry.commands)
for (const MutationCommand & command : *entry.commands)
{
WriteBufferFromOwnString buf;
formatAST(*command.ast, buf, false, true);
@ -824,20 +815,15 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
auto it = current_mutations_by_version.find(mutation_version);
if (it != current_mutations_by_version.end())
{
bool mutation_finished = true;
if (std::optional<Int64> min_version = getMinPartDataVersion())
mutation_finished = *min_version > static_cast<Int64>(mutation_version);
{
bool mutation_finished = *min_version > static_cast<Int64>(mutation_version);
if (!mutation_finished)
decrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, *it->second.commands, lock);
}
to_kill.emplace(std::move(it->second));
if (!mutation_finished)
{
const auto commands = it->second.commands;
current_mutations_by_version.erase(it);
updateAlterConversionsMutations(commands, alter_conversions_mutations, /* remove= */ true);
}
else
current_mutations_by_version.erase(it);
current_mutations_by_version.erase(it);
}
}
@ -885,6 +871,8 @@ void StorageMergeTree::loadDeduplicationLog()
void StorageMergeTree::loadMutations()
{
std::lock_guard lock(currently_processing_in_background_mutex);
for (const auto & disk : getDisks())
{
for (auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next())
@ -893,7 +881,7 @@ void StorageMergeTree::loadMutations()
{
MergeTreeMutationEntry entry(disk, relative_data_path, it->name());
UInt64 block_number = entry.block_number;
LOG_DEBUG(log, "Loading mutation: {} entry, commands size: {}", it->name(), entry.commands.size());
LOG_DEBUG(log, "Loading mutation: {} entry, commands size: {}", it->name(), entry.commands->size());
if (!entry.tid.isPrehistoric() && !entry.csn)
{
@ -912,10 +900,11 @@ void StorageMergeTree::loadMutations()
}
}
auto inserted = current_mutations_by_version.try_emplace(block_number, std::move(entry)).second;
auto [entry_it, inserted] = current_mutations_by_version.try_emplace(block_number, std::move(entry));
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation {} already exists, it's a bug", block_number);
updateAlterConversionsMutations(entry.commands, alter_conversions_mutations, /* remove= */ false);
incrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, *entry_it->second.commands, lock);
}
else if (startsWith(it->name(), "tmp_mutation_"))
{
@ -1264,7 +1253,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
size_t commands_size = 0;
MutationCommands commands_for_size_validation;
for (const auto & command : it->second.commands)
for (const auto & command : *it->second.commands)
{
if (command.type != MutationCommand::Type::DROP_COLUMN
&& command.type != MutationCommand::Type::DROP_INDEX
@ -1308,11 +1297,11 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
const auto & single_mutation_commands = it->second.commands;
if (single_mutation_commands.containBarrierCommand())
if (single_mutation_commands->containBarrierCommand())
{
if (commands->empty())
{
commands->insert(commands->end(), single_mutation_commands.begin(), single_mutation_commands.end());
commands->insert(commands->end(), single_mutation_commands->begin(), single_mutation_commands->end());
last_mutation_to_apply = it;
}
break;
@ -1320,7 +1309,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
else
{
current_ast_elements += commands_size;
commands->insert(commands->end(), single_mutation_commands.begin(), single_mutation_commands.end());
commands->insert(commands->end(), single_mutation_commands->begin(), single_mutation_commands->end());
last_mutation_to_apply = it;
}
@ -2431,34 +2420,62 @@ void StorageMergeTree::attachRestoredParts(MutableDataPartsVector && parts)
}
}
MutationCommands StorageMergeTree::getAlterMutationCommandsForPart(const DataPartPtr & part) const
MutationCommands StorageMergeTree::MutationsSnapshot::getAlterMutationCommandsForPart(const DataPartPtr & part) const
{
/// NOTE: there is no need to check part metadata_version, since
/// ALTER_METADATA cannot be done asynchronously, like in
/// ReplicatedMergeTree.
chassert(alter_conversions_mutations >= 0);
if (alter_conversions_mutations == 0)
return {};
std::lock_guard lock(currently_processing_in_background_mutex);
UInt64 part_data_version = part->info.getDataVersion();
MutationCommands result;
UInt64 part_data_version = part->info.getDataVersion();
for (const auto & [mutation_version, entry] : current_mutations_by_version | std::views::reverse)
for (const auto & [mutation_version, commands] : mutations_by_version | std::views::reverse)
{
if (mutation_version <= part_data_version)
break;
for (const auto & command : entry.commands | std::views::reverse)
if (AlterConversions::supportsMutationCommandType(command.type))
result.emplace_back(command);
for (const auto & command : *commands | std::views::reverse)
{
if (need_data_mutations && AlterConversions::isSupportedDataMutation(command.type))
result.push_back(command);
else if (AlterConversions::isSupportedMetadataMutation(command.type))
result.push_back(command);
}
}
return result;
}
MergeTreeData::MutationsSnapshotPtr StorageMergeTree::getMutationsSnapshot(Int64 metadata_version, bool need_data_mutations) const
{
auto res = std::make_shared<MutationsSnapshot>();
res->metadata_version = metadata_version;
res->need_data_mutations = need_data_mutations;
std::lock_guard lock(currently_processing_in_background_mutex);
bool have_data_mutations = res->need_data_mutations && data_mutations_to_apply > 0;
bool have_metadata_mutations = metadata_mutations_to_apply > 0;
if (!have_data_mutations && !have_metadata_mutations)
return res;
for (const auto & [version, entry] : current_mutations_by_version)
{
bool has_required_command = std::ranges::any_of(*entry.commands, [&](const auto & command)
{
if (have_data_mutations && AlterConversions::isSupportedDataMutation(command.type))
return true;
if (have_metadata_mutations && AlterConversions::isSupportedMetadataMutation(command.type))
return true;
return false;
});
if (has_required_command)
res->mutations_by_version.emplace(version, entry.commands);
}
return res;
}
void StorageMergeTree::startBackgroundMovesIfNeeded()
{
if (areBackgroundMovesNeeded())

View File

@ -17,6 +17,7 @@
#include <Disks/StoragePolicy.h>
#include <Common/SimpleIncrement.h>
#include "Storages/MutationCommands.h"
namespace DB
@ -147,8 +148,10 @@ private:
DataParts currently_merging_mutating_parts;
std::map<UInt64, MergeTreeMutationEntry> current_mutations_by_version;
/// Unfinished mutations that is required AlterConversions (see getAlterMutationCommandsForPart())
std::atomic<ssize_t> alter_conversions_mutations = 0;
/// Unfinished mutations that are required for AlterConversions.
Int64 data_mutations_to_apply = 0;
Int64 metadata_mutations_to_apply = 0;
std::atomic<bool> shutdown_called {false};
std::atomic<bool> flush_called {false};
@ -309,8 +312,21 @@ private:
ContextPtr context;
};
protected:
MutationCommands getAlterMutationCommandsForPart(const DataPartPtr & part) const override;
struct MutationsSnapshot : public IMutationsSnapshot
{
MutationsSnapshot() = default;
Int64 metadata_version = -1;
bool need_data_mutations = false;
using MutationsByVersion = std::map<UInt64, std::shared_ptr<const MutationCommands>>;
MutationsByVersion mutations_by_version;
MutationCommands getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const override;
std::shared_ptr<MergeTreeData::IMutationsSnapshot> cloneEmpty() const override { return std::make_shared<MutationsSnapshot>(); }
};
MutationsSnapshotPtr getMutationsSnapshot(Int64 metadata_version, bool need_data_mutations) const override;
};
}

View File

@ -9149,13 +9149,11 @@ bool StorageReplicatedMergeTree::canUseAdaptiveGranularity() const
(!has_non_adaptive_index_granularity_parts && !other_replicas_fixed_granularity));
}
MutationCommands StorageReplicatedMergeTree::getAlterMutationCommandsForPart(const DataPartPtr & part) const
MergeTreeData::MutationsSnapshotPtr StorageReplicatedMergeTree::getMutationsSnapshot(Int64 metadata_version, bool need_data_mutations) const
{
return queue.getAlterMutationCommandsForPart(part);
return queue.getMutationsSnapshot(metadata_version, need_data_mutations);
}
void StorageReplicatedMergeTree::startBackgroundMovesIfNeeded()
{
if (areBackgroundMovesNeeded())

View File

@ -932,7 +932,7 @@ private:
void waitMutationToFinishOnReplicas(
const Strings & replicas, const String & mutation_id) const;
MutationCommands getAlterMutationCommandsForPart(const DataPartPtr & part) const override;
MutationsSnapshotPtr getMutationsSnapshot(Int64 metadata_version, bool need_data_mutations) const override;
void startBackgroundMovesIfNeeded() override;