mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
better on-fly mutations
This commit is contained in:
parent
21f7e0788c
commit
0a54ba6575
@ -243,6 +243,7 @@ bool analyzeProjectionCandidate(
|
||||
|
||||
auto projection_result_ptr = reader.estimateNumMarksToRead(
|
||||
std::move(projection_parts),
|
||||
reading.getMutationsSnapshot()->cloneEmpty(),
|
||||
required_column_names,
|
||||
candidate.projection->metadata,
|
||||
projection_query_info,
|
||||
|
@ -1373,6 +1373,7 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(Merge
|
||||
{
|
||||
return selectRangesToRead(
|
||||
std::move(parts),
|
||||
mutations_snapshot,
|
||||
metadata_for_reading,
|
||||
query_info,
|
||||
context,
|
||||
@ -1390,9 +1391,11 @@ static void buildIndexes(
|
||||
const ActionsDAG * filter_actions_dag,
|
||||
const MergeTreeData & data,
|
||||
const MergeTreeData::DataPartsVector & parts,
|
||||
const MergeTreeData::MutationsSnapshotPtr & mutations_snapshot,
|
||||
const ContextPtr & context,
|
||||
const SelectQueryInfo & query_info,
|
||||
const StorageMetadataPtr & metadata_snapshot)
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const LoggerPtr & log)
|
||||
{
|
||||
indexes.reset();
|
||||
|
||||
@ -1418,19 +1421,21 @@ static void buildIndexes(
|
||||
indexes->partition_pruner.emplace(metadata_snapshot, filter_actions_dag, context, false /* strict */);
|
||||
}
|
||||
|
||||
indexes->part_values
|
||||
= MergeTreeDataSelectExecutor::filterPartsByVirtualColumns(metadata_snapshot, data, parts, filter_actions_dag, context);
|
||||
indexes->part_values = MergeTreeDataSelectExecutor::filterPartsByVirtualColumns(metadata_snapshot, data, parts, filter_actions_dag, context);
|
||||
MergeTreeDataSelectExecutor::buildKeyConditionFromPartOffset(indexes->part_offset_condition, filter_actions_dag, context);
|
||||
|
||||
indexes->use_skip_indexes = settings.use_skip_indexes;
|
||||
bool final = query_info.isFinal();
|
||||
|
||||
if (final && !settings.use_skip_indexes_if_final)
|
||||
if (query_info.isFinal() && !settings.use_skip_indexes_if_final)
|
||||
indexes->use_skip_indexes = false;
|
||||
|
||||
if (!indexes->use_skip_indexes)
|
||||
return;
|
||||
|
||||
const auto & all_indexes = metadata_snapshot->getSecondaryIndices();
|
||||
|
||||
if (all_indexes.empty())
|
||||
return;
|
||||
|
||||
std::unordered_set<std::string> ignored_index_names;
|
||||
|
||||
if (settings.ignore_data_skipping_indices.changed)
|
||||
@ -1455,49 +1460,68 @@ static void buildIndexes(
|
||||
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Cannot parse ignore_data_skipping_indices ('{}')", indices);
|
||||
}
|
||||
|
||||
auto all_updated_columns = mutations_snapshot->getAllUpdatedColumns();
|
||||
|
||||
UsefulSkipIndexes skip_indexes;
|
||||
using Key = std::pair<String, size_t>;
|
||||
std::map<Key, size_t> merged;
|
||||
|
||||
for (const auto & index : metadata_snapshot->getSecondaryIndices())
|
||||
for (const auto & index : all_indexes)
|
||||
{
|
||||
if (!ignored_index_names.contains(index.name))
|
||||
if (ignored_index_names.contains(index.name))
|
||||
continue;
|
||||
|
||||
auto index_helper = MergeTreeIndexFactory::instance().get(index);
|
||||
|
||||
if (!all_updated_columns.empty())
|
||||
{
|
||||
auto index_helper = MergeTreeIndexFactory::instance().get(index);
|
||||
if (index_helper->isMergeable())
|
||||
auto required_columns = index_helper->getColumnsRequiredForIndexCalc();
|
||||
auto it = std::ranges::find_if(required_columns, [&](const auto & column_name)
|
||||
{
|
||||
auto [it, inserted] = merged.emplace(Key{index_helper->index.type, index_helper->getGranularity()}, skip_indexes.merged_indices.size());
|
||||
if (inserted)
|
||||
{
|
||||
skip_indexes.merged_indices.emplace_back();
|
||||
skip_indexes.merged_indices.back().condition = index_helper->createIndexMergedCondition(query_info, metadata_snapshot);
|
||||
}
|
||||
return all_updated_columns.contains(column_name);
|
||||
});
|
||||
|
||||
skip_indexes.merged_indices[it->second].addIndex(index_helper);
|
||||
}
|
||||
else
|
||||
if (it != required_columns.end())
|
||||
{
|
||||
MergeTreeIndexConditionPtr condition;
|
||||
if (index_helper->isVectorSearch())
|
||||
{
|
||||
#ifdef ENABLE_ANNOY
|
||||
if (const auto * annoy = typeid_cast<const MergeTreeIndexAnnoy *>(index_helper.get()))
|
||||
condition = annoy->createIndexCondition(query_info, context);
|
||||
#endif
|
||||
#ifdef ENABLE_USEARCH
|
||||
if (const auto * usearch = typeid_cast<const MergeTreeIndexUSearch *>(index_helper.get()))
|
||||
condition = usearch->createIndexCondition(query_info, context);
|
||||
#endif
|
||||
if (!condition)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown vector search index {}", index_helper->index.name);
|
||||
}
|
||||
else
|
||||
condition = index_helper->createIndexCondition(filter_actions_dag, context);
|
||||
|
||||
if (!condition->alwaysUnknownOrTrue())
|
||||
skip_indexes.useful_indices.emplace_back(index_helper, condition);
|
||||
LOG_TRACE(log, "Index {} is not used because it depends on column {} which will be updated on fly", index.name, *it);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (index_helper->isMergeable())
|
||||
{
|
||||
auto [it, inserted] = merged.emplace(Key{index_helper->index.type, index_helper->getGranularity()}, skip_indexes.merged_indices.size());
|
||||
if (inserted)
|
||||
{
|
||||
skip_indexes.merged_indices.emplace_back();
|
||||
skip_indexes.merged_indices.back().condition = index_helper->createIndexMergedCondition(query_info, metadata_snapshot);
|
||||
}
|
||||
|
||||
skip_indexes.merged_indices[it->second].addIndex(index_helper);
|
||||
continue;
|
||||
}
|
||||
|
||||
MergeTreeIndexConditionPtr condition;
|
||||
if (index_helper->isVectorSearch())
|
||||
{
|
||||
#ifdef ENABLE_ANNOY
|
||||
if (const auto * annoy = typeid_cast<const MergeTreeIndexAnnoy *>(index_helper.get()))
|
||||
condition = annoy->createIndexCondition(query_info, context);
|
||||
#endif
|
||||
#ifdef ENABLE_USEARCH
|
||||
if (const auto * usearch = typeid_cast<const MergeTreeIndexUSearch *>(index_helper.get()))
|
||||
condition = usearch->createIndexCondition(query_info, context);
|
||||
#endif
|
||||
if (!condition)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown vector search index {}", index_helper->index.name);
|
||||
}
|
||||
else
|
||||
{
|
||||
condition = index_helper->createIndexCondition(filter_actions_dag, context);
|
||||
}
|
||||
|
||||
if (!condition->alwaysUnknownOrTrue())
|
||||
skip_indexes.useful_indices.emplace_back(index_helper, condition);
|
||||
}
|
||||
|
||||
// move minmax indices to first positions, so they will be applied first as cheapest ones
|
||||
@ -1535,14 +1559,17 @@ void ReadFromMergeTree::applyFilters(ActionDAGNodes added_filter_nodes)
|
||||
query_info.filter_actions_dag.get(),
|
||||
data,
|
||||
prepared_parts,
|
||||
mutations_snapshot,
|
||||
context,
|
||||
query_info,
|
||||
metadata_for_reading);
|
||||
metadata_for_reading,
|
||||
log);
|
||||
}
|
||||
}
|
||||
|
||||
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
|
||||
MergeTreeData::DataPartsVector parts,
|
||||
MergeTreeData::MutationsSnapshotPtr mutations_snapshot,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const SelectQueryInfo & query_info_,
|
||||
ContextPtr context_,
|
||||
@ -1573,7 +1600,7 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
|
||||
const Names & primary_key_column_names = primary_key.column_names;
|
||||
|
||||
if (!indexes)
|
||||
buildIndexes(indexes, query_info_.filter_actions_dag.get(), data, parts, context_, query_info_, metadata_snapshot);
|
||||
buildIndexes(indexes, query_info_.filter_actions_dag.get(), data, parts, mutations_snapshot, context_, query_info_, metadata_snapshot, log);
|
||||
|
||||
if (indexes->part_values && indexes->part_values->empty())
|
||||
return std::make_shared<AnalysisResult>(std::move(result));
|
||||
|
@ -154,6 +154,7 @@ public:
|
||||
|
||||
static AnalysisResultPtr selectRangesToRead(
|
||||
MergeTreeData::DataPartsVector parts,
|
||||
MergeTreeData::MutationsSnapshotPtr mutations_snapshot,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
|
@ -1,8 +1,8 @@
|
||||
#pragma once
|
||||
|
||||
#include <Storages/MutationCommands.h>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <Storages/StorageInMemoryMetadata.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -11,11 +11,17 @@ namespace DB
|
||||
/// Alter conversions which should be applied on-fly for part.
|
||||
/// Built from of the most recent mutation commands for part.
|
||||
/// Now only ALTER RENAME COLUMN is applied.
|
||||
class AlterConversions : private boost::noncopyable
|
||||
class AlterConversions : private WithContext, boost::noncopyable
|
||||
{
|
||||
public:
|
||||
AlterConversions() = default;
|
||||
|
||||
AlterConversions(StorageMetadataPtr metadata_snapshot_, ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, metadata_snapshot(std::move(metadata_snapshot_))
|
||||
{
|
||||
}
|
||||
|
||||
struct RenamePair
|
||||
{
|
||||
std::string rename_to;
|
||||
@ -40,6 +46,7 @@ public:
|
||||
private:
|
||||
/// Rename map new_name -> old_name.
|
||||
std::vector<RenamePair> rename_map;
|
||||
StorageMetadataPtr metadata_snapshot;
|
||||
};
|
||||
|
||||
using AlterConversionsPtr = std::shared_ptr<const AlterConversions>;
|
||||
|
@ -301,7 +301,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
|
||||
infos.add(part_infos);
|
||||
}
|
||||
|
||||
global_ctx->alter_conversions.push_back(MergeTreeData::getAlterConversionsForPart(part, mutations_snapshot));
|
||||
global_ctx->alter_conversions.push_back(MergeTreeData::getAlterConversionsForPart(part, mutations_snapshot, global_ctx->metadata_snapshot, global_ctx->context));
|
||||
}
|
||||
|
||||
const auto & local_part_min_ttl = global_ctx->new_data_part->ttl_infos.part_min_ttl;
|
||||
|
@ -7137,11 +7137,11 @@ UInt64 MergeTreeData::estimateNumberOfRowsToRead(
|
||||
ContextPtr query_context, const StorageSnapshotPtr & storage_snapshot, const SelectQueryInfo & query_info) const
|
||||
{
|
||||
const auto & snapshot_data = assert_cast<const MergeTreeData::SnapshotData &>(*storage_snapshot->data);
|
||||
const auto & parts = snapshot_data.parts;
|
||||
|
||||
MergeTreeDataSelectExecutor reader(*this);
|
||||
auto result_ptr = reader.estimateNumMarksToRead(
|
||||
parts,
|
||||
snapshot_data.parts,
|
||||
snapshot_data.mutations_snapshot,
|
||||
storage_snapshot->getMetadataForQuery()->getColumns().getAll().getNames(),
|
||||
storage_snapshot->metadata,
|
||||
query_info,
|
||||
@ -8162,10 +8162,12 @@ bool MergeTreeData::canUsePolymorphicParts(const MergeTreeSettings & settings, S
|
||||
|
||||
AlterConversionsPtr MergeTreeData::getAlterConversionsForPart(
|
||||
const MergeTreeDataPartPtr & part,
|
||||
const MutationsSnapshotPtr & snapshot)
|
||||
const MutationsSnapshotPtr & mutations,
|
||||
const StorageMetadataPtr & metadata,
|
||||
const ContextPtr & query_context)
|
||||
{
|
||||
auto commands = snapshot->getAlterMutationCommandsForPart(part);
|
||||
auto result = std::make_shared<AlterConversions>();
|
||||
auto commands = mutations->getAlterMutationCommandsForPart(part);
|
||||
auto result = std::make_shared<AlterConversions>(metadata, query_context);
|
||||
|
||||
for (const auto & command : commands | std::views::reverse)
|
||||
result->addMutationCommand(command);
|
||||
@ -8758,8 +8760,7 @@ static void updateMutationsCounters(
|
||||
void incrementMutationsCounters(
|
||||
Int64 & num_data_mutations_to_apply,
|
||||
Int64 & num_metadata_mutations_to_apply,
|
||||
const MutationCommands & commands,
|
||||
std::lock_guard<std::mutex> & /*lock*/)
|
||||
const MutationCommands & commands)
|
||||
{
|
||||
return updateMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, commands, 1);
|
||||
}
|
||||
@ -8767,8 +8768,7 @@ void incrementMutationsCounters(
|
||||
void decrementMutationsCounters(
|
||||
Int64 & num_data_mutations_to_apply,
|
||||
Int64 & num_metadata_mutations_to_apply,
|
||||
const MutationCommands & commands,
|
||||
std::lock_guard<std::mutex> & /*lock*/)
|
||||
const MutationCommands & commands)
|
||||
{
|
||||
return updateMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, commands, -1);
|
||||
}
|
||||
|
@ -475,6 +475,8 @@ public:
|
||||
/// @return list of mutation commands, in *reverse* order (newest to oldest)
|
||||
virtual MutationCommands getAlterMutationCommandsForPart(const DataPartPtr & part) const = 0;
|
||||
virtual std::shared_ptr<IMutationsSnapshot> cloneEmpty() const = 0;
|
||||
virtual NameSet getAllUpdatedColumns() const = 0;
|
||||
|
||||
bool hasDataMutations() const { return params.need_data_mutations && info.num_data_mutations > 0; }
|
||||
|
||||
virtual ~IMutationsSnapshot() = default;
|
||||
@ -975,7 +977,9 @@ public:
|
||||
/// Return alter conversions for part which must be applied on fly.
|
||||
static AlterConversionsPtr getAlterConversionsForPart(
|
||||
const MergeTreeDataPartPtr & part,
|
||||
const MutationsSnapshotPtr & snapshot);
|
||||
const MutationsSnapshotPtr & mutations,
|
||||
const StorageMetadataPtr & metadata,
|
||||
const ContextPtr & query_context);
|
||||
|
||||
/// Returns destination disk or volume for the TTL rule according to current storage policy.
|
||||
SpacePtr getDestinationForMoveTTL(const TTLDescription & move_ttl) const;
|
||||
@ -1769,17 +1773,14 @@ struct CurrentlySubmergingEmergingTagger
|
||||
};
|
||||
|
||||
/// Look at MutationCommands if it contains mutations for AlterConversions, update the counter.
|
||||
/// Return true if the counter had been updated
|
||||
void incrementMutationsCounters(
|
||||
Int64 & num_data_mutations_to_apply,
|
||||
Int64 & num_metadata_mutations_to_apply,
|
||||
const MutationCommands & commands,
|
||||
std::lock_guard<std::mutex> & lock);
|
||||
const MutationCommands & commands);
|
||||
|
||||
void decrementMutationsCounters(
|
||||
Int64 & num_data_mutations_to_apply,
|
||||
Int64 & num_metadata_mutations_to_apply,
|
||||
const MutationCommands & commands,
|
||||
std::lock_guard<std::mutex> & lock);
|
||||
const MutationCommands & commands);
|
||||
|
||||
}
|
||||
|
@ -884,6 +884,7 @@ std::shared_ptr<QueryIdHolder> MergeTreeDataSelectExecutor::checkLimits(
|
||||
|
||||
ReadFromMergeTree::AnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMarksToRead(
|
||||
MergeTreeData::DataPartsVector parts,
|
||||
MergeTreeData::MutationsSnapshotPtr mutations_snapshot,
|
||||
const Names & column_names_to_return,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const SelectQueryInfo & query_info,
|
||||
@ -898,6 +899,7 @@ ReadFromMergeTree::AnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMar
|
||||
std::optional<ReadFromMergeTree::Indexes> indexes;
|
||||
return ReadFromMergeTree::selectRangesToRead(
|
||||
std::move(parts),
|
||||
mutations_snapshot,
|
||||
metadata_snapshot,
|
||||
query_info,
|
||||
context,
|
||||
|
@ -56,6 +56,7 @@ public:
|
||||
/// This method is used to select best projection for table.
|
||||
ReadFromMergeTree::AnalysisResultPtr estimateNumMarksToRead(
|
||||
MergeTreeData::DataPartsVector parts,
|
||||
MergeTreeData::MutationsSnapshotPtr mutations_snapshot,
|
||||
const Names & column_names,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const SelectQueryInfo & query_info,
|
||||
|
@ -105,7 +105,6 @@ MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool(
|
||||
column_names_,
|
||||
settings_,
|
||||
context_)
|
||||
, WithContext(context_)
|
||||
, prefetch_threadpool(getContext()->getPrefetchThreadpool())
|
||||
, log(getLogger("MergeTreePrefetchedReadPool(" + (parts_ranges.empty() ? "" : parts_ranges.front().data_part->storage.getStorageID().getNameForLogs()) + ")"))
|
||||
{
|
||||
|
@ -14,7 +14,7 @@ using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
|
||||
/// A class which is responsible for creating read tasks
|
||||
/// which are later taken by readers via getTask method.
|
||||
/// Does prefetching for the read tasks it creates.
|
||||
class MergeTreePrefetchedReadPool : public MergeTreeReadPoolBase, private WithContext
|
||||
class MergeTreePrefetchedReadPool : public MergeTreeReadPoolBase
|
||||
{
|
||||
public:
|
||||
MergeTreePrefetchedReadPool(
|
||||
|
@ -4,9 +4,6 @@
|
||||
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
|
||||
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
|
||||
|
||||
#include <cmath>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -26,7 +23,8 @@ MergeTreeReadPoolBase::MergeTreeReadPoolBase(
|
||||
const Names & column_names_,
|
||||
const PoolSettings & pool_settings_,
|
||||
const ContextPtr & context_)
|
||||
: parts_ranges(std::move(parts_))
|
||||
: WithContext(context_)
|
||||
, parts_ranges(std::move(parts_))
|
||||
, mutations_snapshot(std::move(mutations_snapshot_))
|
||||
, shared_virtual_fields(std::move(shared_virtual_fields_))
|
||||
, storage_snapshot(storage_snapshot_)
|
||||
@ -121,7 +119,7 @@ void MergeTreeReadPoolBase::fillPerPartInfos(const Settings & settings)
|
||||
}
|
||||
|
||||
read_task_info.part_index_in_query = part_with_ranges.part_index_in_query;
|
||||
read_task_info.alter_conversions = MergeTreeData::getAlterConversionsForPart(part_with_ranges.data_part, mutations_snapshot);
|
||||
read_task_info.alter_conversions = MergeTreeData::getAlterConversionsForPart(part_with_ranges.data_part, mutations_snapshot, storage_snapshot->metadata, getContext());
|
||||
|
||||
LoadedMergeTreeDataPartInfoForReader part_info(part_with_ranges.data_part, read_task_info.alter_conversions);
|
||||
|
||||
|
@ -6,7 +6,7 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class MergeTreeReadPoolBase : public IMergeTreeReadPool
|
||||
class MergeTreeReadPoolBase : public IMergeTreeReadPool, protected WithContext
|
||||
{
|
||||
public:
|
||||
using MutationsSnapshotPtr = MergeTreeData::MutationsSnapshotPtr;
|
||||
|
@ -2125,7 +2125,7 @@ bool MutateTask::prepare()
|
||||
};
|
||||
|
||||
auto mutations_snapshot = ctx->data->getMutationsSnapshot(params);
|
||||
auto alter_conversions = MergeTreeData::getAlterConversionsForPart(ctx->source_part, mutations_snapshot);
|
||||
auto alter_conversions = MergeTreeData::getAlterConversionsForPart(ctx->source_part, mutations_snapshot, ctx->metadata_snapshot, ctx->context);
|
||||
|
||||
auto context_for_reading = Context::createCopy(ctx->context);
|
||||
|
||||
|
@ -950,7 +950,7 @@ int32_t ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper
|
||||
{
|
||||
const auto commands = entry.commands;
|
||||
it = mutations_by_znode.erase(it);
|
||||
decrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, commands, state_lock);
|
||||
decrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, commands);
|
||||
}
|
||||
else
|
||||
it = mutations_by_znode.erase(it);
|
||||
@ -1000,7 +1000,7 @@ int32_t ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper
|
||||
for (const ReplicatedMergeTreeMutationEntryPtr & entry : new_mutations)
|
||||
{
|
||||
auto & mutation = mutations_by_znode.emplace(entry->znode_name, MutationStatus(entry, format_version)).first->second;
|
||||
incrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, entry->commands, lock);
|
||||
incrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, entry->commands);
|
||||
|
||||
NOEXCEPT_SCOPE({
|
||||
for (const auto & pair : entry->block_numbers)
|
||||
@ -1961,6 +1961,23 @@ MutationCommands ReplicatedMergeTreeQueue::MutationsSnapshot::getAlterMutationCo
|
||||
return result;
|
||||
}
|
||||
|
||||
NameSet ReplicatedMergeTreeQueue::MutationsSnapshot::getAllUpdatedColumns() const
|
||||
{
|
||||
if (!params.need_data_mutations)
|
||||
return {};
|
||||
|
||||
NameSet res;
|
||||
for (const auto & [partition_id, mutations] : mutations_by_partition)
|
||||
{
|
||||
for (const auto & [version, entry] : mutations)
|
||||
{
|
||||
auto names = entry->commands.getAllUpdatedColumns();
|
||||
std::move(names.begin(), names.end(), std::inserter(res, res.end()));
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
MergeTreeData::MutationsSnapshotPtr ReplicatedMergeTreeQueue::getMutationsSnapshot(const MutationsSnapshot::Params & params) const
|
||||
{
|
||||
std::lock_guard lock(state_mutex);
|
||||
@ -2122,7 +2139,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
|
||||
mutation.parts_to_do.clear();
|
||||
}
|
||||
|
||||
decrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, mutation.entry->commands, lock);
|
||||
decrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, mutation.entry->commands);
|
||||
}
|
||||
else if (mutation.parts_to_do.size() == 0)
|
||||
{
|
||||
@ -2179,7 +2196,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
|
||||
LOG_TRACE(log, "Finishing data alter with version {} for entry {}", entry->alter_version, entry->znode_name);
|
||||
alter_sequence.finishDataAlter(entry->alter_version, lock);
|
||||
}
|
||||
decrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, entry->commands, lock);
|
||||
decrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, entry->commands);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -426,6 +426,7 @@ public:
|
||||
|
||||
MutationCommands getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const override;
|
||||
std::shared_ptr<MergeTreeData::IMutationsSnapshot> cloneEmpty() const override { return std::make_shared<MutationsSnapshot>(); }
|
||||
NameSet getAllUpdatedColumns() const override;
|
||||
};
|
||||
|
||||
/// Return mutation commands for part which could be not applied to
|
||||
|
@ -268,4 +268,13 @@ bool MutationCommands::containBarrierCommand() const
|
||||
return false;
|
||||
}
|
||||
|
||||
NameSet MutationCommands::getAllUpdatedColumns() const
|
||||
{
|
||||
NameSet res;
|
||||
for (const auto & command : *this)
|
||||
for (const auto & [column_name, _] : command.column_to_update_expression)
|
||||
res.insert(column_name);
|
||||
return res;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -92,6 +92,7 @@ public:
|
||||
/// stick with other commands. Commands from one set have already been validated
|
||||
/// to be executed without issues on the creation state.
|
||||
bool containBarrierCommand() const;
|
||||
NameSet getAllUpdatedColumns() const;
|
||||
};
|
||||
|
||||
using MutationCommandsConstPtr = std::shared_ptr<MutationCommands>;
|
||||
|
@ -519,7 +519,7 @@ Int64 StorageMergeTree::startMutation(const MutationCommands & commands, Context
|
||||
if (!inserted)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation {} already exists, it's a bug", version);
|
||||
|
||||
incrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, *it->second.commands, lock);
|
||||
incrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, *it->second.commands);
|
||||
}
|
||||
|
||||
LOG_INFO(log, "Added mutation: {}{}", mutation_id, additional_info);
|
||||
@ -556,7 +556,7 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
|
||||
if (static_cast<UInt64>(result_part->part_info.mutation) == it->first)
|
||||
mutation_backoff_policy.removePartFromFailed(failed_part->name);
|
||||
|
||||
decrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, *entry.commands, lock);
|
||||
decrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, *entry.commands);
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -838,7 +838,7 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
|
||||
{
|
||||
bool mutation_finished = *min_version > static_cast<Int64>(mutation_version);
|
||||
if (!mutation_finished)
|
||||
decrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, *it->second.commands, lock);
|
||||
decrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, *it->second.commands);
|
||||
}
|
||||
|
||||
to_kill.emplace(std::move(it->second));
|
||||
@ -923,7 +923,7 @@ void StorageMergeTree::loadMutations()
|
||||
if (!inserted)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation {} already exists, it's a bug", block_number);
|
||||
|
||||
incrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, *entry_it->second.commands, lock);
|
||||
incrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, *entry_it->second.commands);
|
||||
}
|
||||
else if (startsWith(it->name(), "tmp_mutation_"))
|
||||
{
|
||||
@ -2464,6 +2464,20 @@ MutationCommands StorageMergeTree::MutationsSnapshot::getAlterMutationCommandsFo
|
||||
return result;
|
||||
}
|
||||
|
||||
NameSet StorageMergeTree::MutationsSnapshot::getAllUpdatedColumns() const
|
||||
{
|
||||
if (!params.need_data_mutations)
|
||||
return {};
|
||||
|
||||
NameSet res;
|
||||
for (const auto & [version, commands] : mutations_by_version)
|
||||
{
|
||||
auto names = commands->getAllUpdatedColumns();
|
||||
std::move(names.begin(), names.end(), std::inserter(res, res.end()));
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
MergeTreeData::MutationsSnapshotPtr StorageMergeTree::getMutationsSnapshot(const IMutationsSnapshot::Params & params) const
|
||||
{
|
||||
std::lock_guard lock(currently_processing_in_background_mutex);
|
||||
|
@ -320,6 +320,7 @@ private:
|
||||
|
||||
MutationCommands getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const override;
|
||||
std::shared_ptr<MergeTreeData::IMutationsSnapshot> cloneEmpty() const override { return std::make_shared<MutationsSnapshot>(); }
|
||||
NameSet getAllUpdatedColumns() const override;
|
||||
};
|
||||
|
||||
MutationsSnapshotPtr getMutationsSnapshot(const IMutationsSnapshot::Params & params) const override;
|
||||
|
Loading…
Reference in New Issue
Block a user