mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 08:02:02 +00:00
Better solution
This commit is contained in:
parent
c587160308
commit
70221b272b
@ -23,18 +23,14 @@ static const auto REFRESH_STATE_MAXIMUM_INTERVAL_SECONDS = 30;
|
||||
|
||||
ReplicatedMergeTreeMergeStrategyPicker::ReplicatedMergeTreeMergeStrategyPicker(StorageReplicatedMergeTree & storage_)
|
||||
: storage(storage_)
|
||||
, parts_on_active_replicas(storage_.format_version)
|
||||
{}
|
||||
|
||||
|
||||
bool ReplicatedMergeTreeMergeStrategyPicker::isMergeFinishedByAnyReplica(const ReplicatedMergeTreeLogEntryData & entry)
|
||||
{
|
||||
String dummy;
|
||||
return !storage.findReplicaHavingCoveringPart(entry.new_part_name, true, dummy).empty();
|
||||
}
|
||||
|
||||
bool ReplicatedMergeTreeMergeStrategyPicker::isMergeFinishedByReplica(const String & replica, const ReplicatedMergeTreeLogEntryData & entry)
|
||||
{
|
||||
return storage.checkReplicaHavePart(replica, entry.new_part_name);
|
||||
std::lock_guard lock(mutex);
|
||||
return !parts_on_active_replicas.getContainingPart(entry.new_part_name).empty();
|
||||
}
|
||||
|
||||
|
||||
@ -122,11 +118,17 @@ void ReplicatedMergeTreeMergeStrategyPicker::refreshState()
|
||||
|
||||
std::vector<String> active_replicas_tmp;
|
||||
int current_replica_index_tmp = -1;
|
||||
ActiveDataPartSet active_parts_tmp(storage.format_version);
|
||||
|
||||
for (const String & replica : all_replicas)
|
||||
{
|
||||
if (zookeeper->exists(storage.zookeeper_path + "/replicas/" + replica + "/is_active"))
|
||||
auto replica_path = fs::path{storage.zookeeper_path} / "replicas" / replica;
|
||||
if (zookeeper->exists(replica_path / "is_active"))
|
||||
{
|
||||
Strings parts = zookeeper->getChildren(replica_path / "parts");
|
||||
for (const auto & part : parts)
|
||||
active_parts_tmp.add(part);
|
||||
|
||||
active_replicas_tmp.push_back(replica);
|
||||
if (replica == storage.replica_name)
|
||||
{
|
||||
@ -157,6 +159,7 @@ void ReplicatedMergeTreeMergeStrategyPicker::refreshState()
|
||||
last_refresh_time = now;
|
||||
current_replica_index = current_replica_index_tmp;
|
||||
active_replicas = active_replicas_tmp;
|
||||
parts_on_active_replicas = active_parts_tmp;
|
||||
}
|
||||
|
||||
|
||||
|
@ -5,7 +5,9 @@
|
||||
#include <mutex>
|
||||
#include <vector>
|
||||
#include <atomic>
|
||||
#include <unordered_set>
|
||||
#include <boost/noncopyable.hpp>
|
||||
#include <Storages/MergeTree/ActiveDataPartSet.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -60,9 +62,7 @@ public:
|
||||
/// and it's not current replica should do the merge
|
||||
std::optional<String> pickReplicaToExecuteMerge(const ReplicatedMergeTreeLogEntryData & entry);
|
||||
|
||||
/// checks (in zookeeper) if the picked replica finished the merge
|
||||
bool isMergeFinishedByReplica(const String & replica, const ReplicatedMergeTreeLogEntryData & entry);
|
||||
|
||||
/// Checks (in zookeeper) if some replica finished the merge
|
||||
bool isMergeFinishedByAnyReplica(const ReplicatedMergeTreeLogEntryData & entry);
|
||||
|
||||
private:
|
||||
@ -82,6 +82,7 @@ private:
|
||||
/// execute_merges_on_single_replica_time_threshold enabled
|
||||
int current_replica_index = -1;
|
||||
std::vector<String> active_replicas;
|
||||
ActiveDataPartSet parts_on_active_replicas;
|
||||
|
||||
};
|
||||
|
||||
|
@ -1225,7 +1225,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
|
||||
|
||||
if (replica_to_execute_merge)
|
||||
{
|
||||
if (!merge_strategy_picker.isMergeFinishedByReplica(*replica_to_execute_merge, entry) && !merge_strategy_picker.isMergeFinishedByAnyReplica(entry))
|
||||
if (!merge_strategy_picker.isMergeFinishedByAnyReplica(entry))
|
||||
{
|
||||
out_postpone_reason = fmt::format(
|
||||
"Not executing merge for the part {} because no one have executed it, waiting for {} to execute merge.",
|
||||
|
Loading…
Reference in New Issue
Block a user