Also zero copy mutations

This commit is contained in:
alesapin 2022-02-10 14:15:08 +03:00
parent 70221b272b
commit f764da35ca
6 changed files with 63 additions and 19 deletions

View File

@ -767,6 +767,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
{ {
throw Exception(ErrorCodes::ZERO_COPY_REPLICATION_ERROR, "Part {} unique id {} doesn't exist on {}.", part_name, part_id, disk->getName()); throw Exception(ErrorCodes::ZERO_COPY_REPLICATION_ERROR, "Part {} unique id {} doesn't exist on {}.", part_name, part_id, disk->getName());
} }
LOG_DEBUG(log, "Downloading Part {} unique id {} metadata onto disk {}.", LOG_DEBUG(log, "Downloading Part {} unique id {} metadata onto disk {}.",
part_name, part_id, disk->getName()); part_name, part_id, disk->getName());
@ -836,6 +837,9 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
new_data_part->storage.lockSharedData(*new_data_part); new_data_part->storage.lockSharedData(*new_data_part);
LOG_DEBUG(log, "Download of part {} unique id {} metadata onto disk {} finished.",
part_name, part_id, disk->getName());
return new_data_part; return new_data_part;
} }

View File

@ -49,9 +49,9 @@ std::pair<bool, ReplicatedMergeMutateTaskBase::PartLogWriter> MergeFromLogEntryT
std::optional<String> replica_to_execute_merge; std::optional<String> replica_to_execute_merge;
bool replica_to_execute_merge_picked = false; bool replica_to_execute_merge_picked = false;
if (storage.merge_strategy_picker.shouldMergeOnSingleReplica(entry)) if (storage.merge_strategy_picker.shouldMergeMutateOnSingleReplica(entry))
{ {
replica_to_execute_merge = storage.merge_strategy_picker.pickReplicaToExecuteMerge(entry); replica_to_execute_merge = storage.merge_strategy_picker.pickReplicaToExecuteMergeMutation(entry);
replica_to_execute_merge_picked = true; replica_to_execute_merge_picked = true;
if (replica_to_execute_merge) if (replica_to_execute_merge)
@ -162,10 +162,10 @@ std::pair<bool, ReplicatedMergeMutateTaskBase::PartLogWriter> MergeFromLogEntryT
{ {
if (auto disk = reserved_space->getDisk(); disk->getType() == DB::DiskType::S3) if (auto disk = reserved_space->getDisk(); disk->getType() == DB::DiskType::S3)
{ {
if (storage.merge_strategy_picker.shouldMergeOnSingleReplicaShared(entry)) if (storage.merge_strategy_picker.shouldMergeMutateOnSingleReplicaShared(entry))
{ {
if (!replica_to_execute_merge_picked) if (!replica_to_execute_merge_picked)
replica_to_execute_merge = storage.merge_strategy_picker.pickReplicaToExecuteMerge(entry); replica_to_execute_merge = storage.merge_strategy_picker.pickReplicaToExecuteMergeMutation(entry);
if (replica_to_execute_merge) if (replica_to_execute_merge)
{ {

View File

@ -52,6 +52,26 @@ std::pair<bool, ReplicatedMergeMutateTaskBase::PartLogWriter> MutateFromLogEntry
} }
} }
/// In some use cases merging can be more expensive than fetching
/// and it may be better to spread merges tasks across the replicas
/// instead of doing exactly the same merge cluster-wise
std::optional<String> replica_to_execute_merge;
bool replica_to_execute_merge_picked = false;
if (storage.merge_strategy_picker.shouldMergeMutateOnSingleReplica(entry))
{
replica_to_execute_merge = storage.merge_strategy_picker.pickReplicaToExecuteMergeMutation(entry);
replica_to_execute_merge_picked = true;
if (replica_to_execute_merge)
{
LOG_DEBUG(log,
"Prefer fetching part {} from replica {} due to execute_merges_on_single_replica_time_threshold",
entry.new_part_name, replica_to_execute_merge.value());
return {false, {}};
}
}
new_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, storage.format_version); new_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, storage.format_version);
commands = MutationCommands::create(storage.queue.getMutationCommands(source_part, new_part_info.mutation)); commands = MutationCommands::create(storage.queue.getMutationCommands(source_part, new_part_info.mutation));
@ -59,6 +79,26 @@ std::pair<bool, ReplicatedMergeMutateTaskBase::PartLogWriter> MutateFromLogEntry
/// Can throw an exception. /// Can throw an exception.
reserved_space = storage.reserveSpace(estimated_space_for_result, source_part->volume); reserved_space = storage.reserveSpace(estimated_space_for_result, source_part->volume);
if (storage_settings_ptr->allow_remote_fs_zero_copy_replication)
{
if (auto disk = reserved_space->getDisk(); disk->getType() == DB::DiskType::S3)
{
if (storage.merge_strategy_picker.shouldMergeMutateOnSingleReplicaShared(entry))
{
if (!replica_to_execute_merge_picked)
replica_to_execute_merge = storage.merge_strategy_picker.pickReplicaToExecuteMergeMutation(entry);
if (replica_to_execute_merge)
{
LOG_DEBUG(log,
"Prefer fetching part {} from replica {} due s3_execute_merges_on_single_replica_time_threshold",
entry.new_part_name, replica_to_execute_merge.value());
return {false, {}};
}
}
}
}
table_lock_holder = storage.lockForShare( table_lock_holder = storage.lockForShare(
RWLockImpl::NO_QUERY, storage_settings_ptr->lock_acquire_timeout_for_background_operations); RWLockImpl::NO_QUERY, storage_settings_ptr->lock_acquire_timeout_for_background_operations);
StorageMetadataPtr metadata_snapshot = storage.getInMemoryMetadataPtr(); StorageMetadataPtr metadata_snapshot = storage.getInMemoryMetadataPtr();

View File

@ -27,30 +27,30 @@ ReplicatedMergeTreeMergeStrategyPicker::ReplicatedMergeTreeMergeStrategyPicker(S
{} {}
bool ReplicatedMergeTreeMergeStrategyPicker::isMergeFinishedByAnyReplica(const ReplicatedMergeTreeLogEntryData & entry) bool ReplicatedMergeTreeMergeStrategyPicker::isMergeMutationFinishedByAnyReplica(const ReplicatedMergeTreeLogEntryData & entry)
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
return !parts_on_active_replicas.getContainingPart(entry.new_part_name).empty(); return !parts_on_active_replicas.getContainingPart(entry.new_part_name).empty();
} }
bool ReplicatedMergeTreeMergeStrategyPicker::shouldMergeOnSingleReplica(const ReplicatedMergeTreeLogEntryData & entry) const bool ReplicatedMergeTreeMergeStrategyPicker::shouldMergeMutateOnSingleReplica(const ReplicatedMergeTreeLogEntryData & entry) const
{ {
time_t threshold = execute_merges_on_single_replica_time_threshold; time_t threshold = execute_merges_on_single_replica_time_threshold;
return ( return (
threshold > 0 /// feature turned on threshold > 0 /// feature turned on
&& entry.type == ReplicatedMergeTreeLogEntry::MERGE_PARTS /// it is a merge log entry && (entry.type == ReplicatedMergeTreeLogEntry::MERGE_PARTS || entry.type == ReplicatedMergeTreeLogEntry::MUTATE_PART)
&& entry.create_time + threshold > time(nullptr) /// not too much time waited && entry.create_time + threshold > time(nullptr) /// not too much time waited
); );
} }
bool ReplicatedMergeTreeMergeStrategyPicker::shouldMergeOnSingleReplicaShared(const ReplicatedMergeTreeLogEntryData & entry) const bool ReplicatedMergeTreeMergeStrategyPicker::shouldMergeMutateOnSingleReplicaShared(const ReplicatedMergeTreeLogEntryData & entry) const
{ {
time_t threshold = remote_fs_execute_merges_on_single_replica_time_threshold; time_t threshold = remote_fs_execute_merges_on_single_replica_time_threshold;
return ( return (
threshold > 0 /// feature turned on threshold > 0 /// feature turned on
&& entry.type == ReplicatedMergeTreeLogEntry::MERGE_PARTS /// it is a merge log entry && (entry.type == ReplicatedMergeTreeLogEntry::MERGE_PARTS || entry.type == ReplicatedMergeTreeLogEntry::MUTATE_PART)
&& entry.create_time + threshold > time(nullptr) /// not too much time waited && entry.create_time + threshold > time(nullptr) /// not too much time waited
); );
} }
@ -63,7 +63,7 @@ bool ReplicatedMergeTreeMergeStrategyPicker::shouldMergeOnSingleReplicaShared(co
/// nodes can pick different replicas to execute merge and wait for it (or to execute the same merge together) /// nodes can pick different replicas to execute merge and wait for it (or to execute the same merge together)
/// but that doesn't have a significant impact (in one case it will wait for the execute_merges_on_single_replica_time_threshold, /// but that doesn't have a significant impact (in one case it will wait for the execute_merges_on_single_replica_time_threshold,
/// in another just 2 replicas will do the merge) /// in another just 2 replicas will do the merge)
std::optional<String> ReplicatedMergeTreeMergeStrategyPicker::pickReplicaToExecuteMerge(const ReplicatedMergeTreeLogEntryData & entry) std::optional<String> ReplicatedMergeTreeMergeStrategyPicker::pickReplicaToExecuteMergeMutation(const ReplicatedMergeTreeLogEntryData & entry)
{ {
/// last state refresh was too long ago, need to sync up the replicas list /// last state refresh was too long ago, need to sync up the replicas list
if (time(nullptr) - last_refresh_time > REFRESH_STATE_MAXIMUM_INTERVAL_SECONDS) if (time(nullptr) - last_refresh_time > REFRESH_STATE_MAXIMUM_INTERVAL_SECONDS)

View File

@ -52,18 +52,18 @@ public:
/// return true if execute_merges_on_single_replica_time_threshold feature is active /// return true if execute_merges_on_single_replica_time_threshold feature is active
/// and we may need to do a fetch (or postpone) instead of merge /// and we may need to do a fetch (or postpone) instead of merge
bool shouldMergeOnSingleReplica(const ReplicatedMergeTreeLogEntryData & entry) const; bool shouldMergeMutateOnSingleReplica(const ReplicatedMergeTreeLogEntryData & entry) const;
/// return true if remote_fs_execute_merges_on_single_replica_time_threshold feature is active /// return true if remote_fs_execute_merges_on_single_replica_time_threshold feature is active
/// and we may need to do a fetch (or postpone) instead of merge /// and we may need to do a fetch (or postpone) instead of merge
bool shouldMergeOnSingleReplicaShared(const ReplicatedMergeTreeLogEntryData & entry) const; bool shouldMergeMutateOnSingleReplicaShared(const ReplicatedMergeTreeLogEntryData & entry) const;
/// returns the replica name /// returns the replica name
/// and it's not current replica should do the merge /// and it's not current replica should do the merge
std::optional<String> pickReplicaToExecuteMerge(const ReplicatedMergeTreeLogEntryData & entry); std::optional<String> pickReplicaToExecuteMergeMutation(const ReplicatedMergeTreeLogEntryData & entry);
/// Checks (in zookeeper) if some replica finished the merge /// Checks (in zookeeper) if some replica finished the merge
bool isMergeFinishedByAnyReplica(const ReplicatedMergeTreeLogEntryData & entry); bool isMergeMutationFinishedByAnyReplica(const ReplicatedMergeTreeLogEntryData & entry);
private: private:
StorageReplicatedMergeTree & storage; StorageReplicatedMergeTree & storage;

View File

@ -1205,7 +1205,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
return false; return false;
} }
bool should_execute_on_single_replica = merge_strategy_picker.shouldMergeOnSingleReplica(entry); bool should_execute_on_single_replica = merge_strategy_picker.shouldMergeMutateOnSingleReplica(entry);
if (!should_execute_on_single_replica) if (!should_execute_on_single_replica)
{ {
/// Separate check. If we use only s3, check remote_fs_execute_merges_on_single_replica_time_threshold as well. /// Separate check. If we use only s3, check remote_fs_execute_merges_on_single_replica_time_threshold as well.
@ -1216,19 +1216,19 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
only_s3_storage = false; only_s3_storage = false;
if (!disks.empty() && only_s3_storage) if (!disks.empty() && only_s3_storage)
should_execute_on_single_replica = merge_strategy_picker.shouldMergeOnSingleReplicaShared(entry); should_execute_on_single_replica = merge_strategy_picker.shouldMergeMutateOnSingleReplicaShared(entry);
} }
if (should_execute_on_single_replica) if (should_execute_on_single_replica)
{ {
auto replica_to_execute_merge = merge_strategy_picker.pickReplicaToExecuteMerge(entry); auto replica_to_execute_merge = merge_strategy_picker.pickReplicaToExecuteMergeMutation(entry);
if (replica_to_execute_merge) if (replica_to_execute_merge)
{ {
if (!merge_strategy_picker.isMergeFinishedByAnyReplica(entry)) if (!merge_strategy_picker.isMergeMutationFinishedByAnyReplica(entry))
{ {
out_postpone_reason = fmt::format( out_postpone_reason = fmt::format(
"Not executing merge for the part {} because no one have executed it, waiting for {} to execute merge.", "Not executing merge/mutation for the part {} because no one have executed it, waiting for {} to execute merge.",
entry.new_part_name, replica_to_execute_merge.value()); entry.new_part_name, replica_to_execute_merge.value());
LOG_DEBUG(log, fmt::runtime(out_postpone_reason)); LOG_DEBUG(log, fmt::runtime(out_postpone_reason));
return false; return false;