Use 'merge on single replica' option instead of zookeeper lock

Anton Ivashkin 2021-03-09 17:34:28 +03:00
parent d80c2cef06
commit 265d293934
4 changed files with 45 additions and 16 deletions

View File

@@ -71,6 +71,7 @@ struct Settings;
M(Seconds, prefer_fetch_merged_part_time_threshold, 3600, "If time passed after replication log entry creation exceeds this threshold and sum size of parts is greater than \"prefer_fetch_merged_part_size_threshold\", prefer fetching merged part from replica instead of doing merge locally. To speed up very long merges.", 0) \
M(UInt64, prefer_fetch_merged_part_size_threshold, 10ULL * 1024 * 1024 * 1024, "If sum size of parts exceeds this threshold and time passed after replication log entry creation is greater than \"prefer_fetch_merged_part_time_threshold\", prefer fetching merged part from replica instead of doing merge locally. To speed up very long merges.", 0) \
M(Seconds, execute_merges_on_single_replica_time_threshold, 0, "When greater than zero only a single replica starts the merge immediately, others wait up to that amount of time to download the result instead of doing merges locally. If the chosen replica doesn't finish the merge during that amount of time, fallback to standard behavior happens.", 0) \
M(Seconds, s3_execute_merges_on_single_replica_time_threshold, 3 * 60 * 60, "When greater than zero only a single replica starts the merge immediately if the merged part is on S3 storage and 'allow_s3_zero_copy_replication' is enabled.", 0) \
M(Seconds, try_fetch_recompressed_part_timeout, 7200, "Recompression works slowly in most cases, so we don't start a merge with recompression until this timeout, and try to fetch the recompressed part from the replica which assigned this merge with recompression.", 0) \
M(Bool, always_fetch_merged_part, 0, "If true, the replica never merges parts and always downloads merged parts from other replicas.", 0) \
M(UInt64, max_suspicious_broken_parts, 10, "Max broken parts, if more - deny automatic deletion.", 0) \
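
For orientation, here is a minimal, self-contained sketch (not part of the commit; FakeSettings and FakeLogEntry are hypothetical stand-ins for the real MergeTreeSettings and ReplicatedMergeTreeLogEntryData classes) of how the new S3 threshold is meant to gate the single-replica merge decision, mirroring the check added in shouldMergeOnSingleReplicaS3Shared():

#include <ctime>

// Hypothetical stand-ins for the real settings and log-entry classes.
struct FakeSettings
{
    bool allow_s3_zero_copy_replication = true;
    time_t s3_execute_merges_on_single_replica_time_threshold = 3 * 60 * 60; // default from the hunk above
};

struct FakeLogEntry
{
    bool is_merge = true;   // corresponds to type == MERGE_PARTS
    time_t create_time = 0;
};

// The feature applies only while the log entry is younger than the threshold;
// once that window expires, the replica falls back to merging locally.
bool shouldWaitForSingleReplicaS3(const FakeSettings & settings, const FakeLogEntry & entry)
{
    if (!settings.allow_s3_zero_copy_replication)
        return false;
    const time_t threshold = settings.s3_execute_merges_on_single_replica_time_threshold;
    return threshold > 0
        && entry.is_merge
        && entry.create_time + threshold > time(nullptr);
}

int main()
{
    FakeSettings settings;
    FakeLogEntry entry{true, time(nullptr)};
    return shouldWaitForSingleReplicaS3(settings, entry) ? 0 : 1;
}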

View File

@@ -56,6 +56,17 @@ bool ReplicatedMergeTreeMergeStrategyPicker::shouldMergeOnSingleReplica(const Re
}
bool ReplicatedMergeTreeMergeStrategyPicker::shouldMergeOnSingleReplicaS3Shared(const ReplicatedMergeTreeLogEntryData & entry) const
{
time_t threshold = s3_execute_merges_on_single_replica_time_threshold;
return (
threshold > 0 /// feature turned on
&& entry.type == ReplicatedMergeTreeLogEntry::MERGE_PARTS /// it is a merge log entry
&& entry.create_time + threshold > time(nullptr) /// not too much time waited
);
}
/// that will return the same replica name for ReplicatedMergeTreeLogEntry on all the replicas (if the replica set is the same).
/// that way each replica knows who is responsible for doing a certain merge.
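
The context comment above describes the core idea behind pickReplicaToExecuteMerge(): every replica hashes the log entry and maps it onto the same cached list of active replicas, so all replicas agree on who is responsible for the merge. A rough, hypothetical sketch of that scheme (simplified names, not the real implementation):

#include <functional>
#include <optional>
#include <string>
#include <vector>

// Simplified: the real picker uses getEntryHash() and the active_replicas
// list cached by refreshState(); the names here are placeholders.
std::optional<std::string> pickReplicaDeterministically(
    const std::string & entry_name,
    const std::vector<std::string> & active_replicas,   // assumed identical on every replica
    const std::string & current_replica)
{
    if (active_replicas.empty())
        return std::nullopt;

    size_t index = std::hash<std::string>{}(entry_name) % active_replicas.size();
    const std::string & chosen = active_replicas[index];

    if (chosen == current_replica)
        return std::nullopt;   // this replica should do the merge itself
    return chosen;             // another replica is responsible; we may fetch instead
}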
@@ -90,18 +101,23 @@ std::optional<String> ReplicatedMergeTreeMergeStrategyPicker::pickReplicaToExecu
void ReplicatedMergeTreeMergeStrategyPicker::refreshState()
{
auto threshold = storage.getSettings()->execute_merges_on_single_replica_time_threshold.totalSeconds();
auto threshold_s3 = 0;
if (storage.getSettings()->allow_s3_zero_copy_replication)
threshold_s3 = storage.getSettings()->s3_execute_merges_on_single_replica_time_threshold.totalSeconds();
if (threshold == 0)
{
/// we can reset the settings w/o lock (it's atomic)
execute_merges_on_single_replica_time_threshold = threshold;
if (threshold_s3 == 0)
s3_execute_merges_on_single_replica_time_threshold = threshold_s3;
if (threshold == 0 && threshold_s3 == 0)
return;
}
auto now = time(nullptr);
/// the setting was already enabled, and last state refresh was done recently
if (execute_merges_on_single_replica_time_threshold != 0
if ((execute_merges_on_single_replica_time_threshold != 0
|| s3_execute_merges_on_single_replica_time_threshold != 0)
&& now - last_refresh_time < REFRESH_STATE_MINIMUM_INTERVAL_SECONDS)
return;
@@ -130,11 +146,15 @@ void ReplicatedMergeTreeMergeStrategyPicker::refreshState()
LOG_WARNING(storage.log, "Can't find current replica in the active replicas list, or too few active replicas to use execute_merges_on_single_replica_time_threshold!");
/// we can reset the settings w/o lock (it's atomic)
execute_merges_on_single_replica_time_threshold = 0;
s3_execute_merges_on_single_replica_time_threshold = 0;
return;
}
std::lock_guard lock(mutex);
execute_merges_on_single_replica_time_threshold = threshold;
if (threshold != 0) /// Zeros already reset
execute_merges_on_single_replica_time_threshold = threshold;
if (threshold_s3 != 0)
s3_execute_merges_on_single_replica_time_threshold = threshold_s3;
last_refresh_time = now;
current_replica_index = current_replica_index_tmp;
active_replicas = active_replicas_tmp;
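
To make the refreshState() changes above easier to follow, here is a condensed, hypothetical model of the update pattern (ThresholdCacheSketch is an illustrative stand-in, not the real picker): zero thresholds may be written lock-free because the members are atomics, while non-zero updates and the rest of the cached state go under the mutex.

#include <atomic>
#include <ctime>
#include <mutex>

struct ThresholdCacheSketch   // hypothetical; mirrors the picker's members
{
    std::atomic<time_t> threshold{0};
    std::atomic<time_t> threshold_s3{0};
    std::mutex mutex;
    time_t last_refresh_time = 0;

    void refresh(time_t new_threshold, time_t new_threshold_s3, time_t now)
    {
        // resetting to zero needs no lock: the members are atomic
        if (new_threshold == 0)
            threshold = 0;
        if (new_threshold_s3 == 0)
            threshold_s3 = 0;
        if (new_threshold == 0 && new_threshold_s3 == 0)
            return;

        std::lock_guard lock(mutex);
        if (new_threshold != 0)       // zeros were already reset above
            threshold = new_threshold;
        if (new_threshold_s3 != 0)
            threshold_s3 = new_threshold_s3;
        last_refresh_time = now;
    }
};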

View File

@@ -52,6 +52,10 @@ public:
/// and we may need to do a fetch (or postpone) instead of merge
bool shouldMergeOnSingleReplica(const ReplicatedMergeTreeLogEntryData & entry) const;
/// return true if s3_execute_merges_on_single_replica_time_threshold feature is active
/// and we may need to do a fetch (or postpone) instead of merge
bool shouldMergeOnSingleReplicaS3Shared(const ReplicatedMergeTreeLogEntryData & entry) const;
/// returns the name of the replica that should do the merge
/// when that replica is not the current one
/// used in shouldExecuteLogEntry and in tryExecuteMerge
@@ -68,6 +72,7 @@ private:
uint64_t getEntryHash(const ReplicatedMergeTreeLogEntryData & entry) const;
std::atomic<time_t> execute_merges_on_single_replica_time_threshold = 0;
std::atomic<time_t> s3_execute_merges_on_single_replica_time_threshold = 0;
std::atomic<time_t> last_refresh_time = 0;
std::mutex mutex;

View File

@@ -528,7 +528,6 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
void StorageReplicatedMergeTree::createNewZooKeeperNodes()
{
auto storage_settings = getSettings();
auto zookeeper = getZooKeeper();
/// Working with quorum.
@@ -546,10 +545,9 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes()
zookeeper->createIfNotExists(replica_path + "/mutation_pointer", String());
/// Nodes for zero-copy S3 replication
if (storage_settings->allow_s3_zero_copy_replication)
if (storage_settings.get()->allow_s3_zero_copy_replication)
{
zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_s3", String());
zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_s3/merged", String());
zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_s3/shared", String());
}
}
@@ -1459,9 +1457,12 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
/// In some use cases merging can be more expensive than fetching
/// and it may be better to spread merges tasks across the replicas
/// instead of doing exactly the same merge cluster-wise
std::optional<String> replica_to_execute_merge;
bool replica_to_execute_merge_picked = false;
if (merge_strategy_picker.shouldMergeOnSingleReplica(entry))
{
auto replica_to_execute_merge = merge_strategy_picker.pickReplicaToExecuteMerge(entry);
replica_to_execute_merge = merge_strategy_picker.pickReplicaToExecuteMerge(entry);
replica_to_execute_merge_picked = true;
if (replica_to_execute_merge)
{
@@ -1547,15 +1548,17 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
auto disk = reserved_space->getDisk();
if (disk->getType() == DB::DiskType::Type::S3)
{
auto zookeeper = getZooKeeper();
String zookeeper_node = zookeeper_path + "/zero_copy_s3/merged/" + entry.new_part_name;
if (merge_strategy_picker.shouldMergeOnSingleReplicaS3Shared(entry))
{
if (!replica_to_execute_merge_picked)
replica_to_execute_merge = merge_strategy_picker.pickReplicaToExecuteMerge(entry);
auto code = zookeeper->tryCreate(zookeeper_node, "lock", zkutil::CreateMode::Ephemeral);
/// Someone else created or started creating this merge,
/// so we will try to fetch it instead.
if (code == Coordination::Error::ZNODEEXISTS)
return false;
if (replica_to_execute_merge)
{
LOG_DEBUG(log, "Prefer fetching part {} from replica {} due to s3_execute_merges_on_single_replica_time_threshold", entry.new_part_name, replica_to_execute_merge.value());
return false;
}
}
}
}
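
Taken together, the last hunk changes tryExecuteMerge() so that for a part reserved on S3 the replica consults the merge strategy picker instead of racing for a ZooKeeper lock node. A hedged summary of the resulting decision, using stand-in types rather than the real ClickHouse classes:

#include <optional>
#include <string>

enum class DiskType { Local, S3 };

struct MergeDecision   // hypothetical condensation of the tryExecuteMerge() logic above
{
    DiskType disk_type = DiskType::S3;
    bool single_replica_window_open = true;            // shouldMergeOnSingleReplicaS3Shared(entry)
    std::optional<std::string> picked_replica;          // pickReplicaToExecuteMerge(entry)
};

// true  -> execute the merge locally
// false -> back off and fetch the merged part from the picked replica
bool shouldExecuteMergeLocally(const MergeDecision & d)
{
    if (d.disk_type != DiskType::S3)
        return true;                    // zero-copy S3 logic does not apply
    if (!d.single_replica_window_open)
        return true;                    // waiting window expired: fall back to merging
    if (d.picked_replica)
        return false;                   // another replica was chosen: prefer fetching
    return true;                        // this replica is the chosen one
}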