execute_merges_on_single_replica

Mikhail Filimonov 2020-09-18 12:57:33 +02:00
parent 0bc60e2d53
commit 97fef77ed1
No known key found for this signature in database
GPG Key ID: 6E49C2E9AF1220BE
10 changed files with 478 additions and 6 deletions

View File: src/Storages/MergeTree/MergeTreeSettings.h

@@ -62,6 +62,7 @@ struct Settings;
M(UInt64, min_replicated_logs_to_keep, 10, "Keep about this number of last records in ZooKeeper log, even if they are obsolete. It doesn't affect the work of tables: it is used only to diagnose the ZooKeeper log before cleaning.", 0) \
M(Seconds, prefer_fetch_merged_part_time_threshold, 3600, "If the time passed since the creation of a replication log entry exceeds this threshold and the sum of part sizes is greater than \"prefer_fetch_merged_part_size_threshold\", prefer fetching the merged part from a replica instead of doing the merge locally. Speeds up very long merges.", 0) \
M(UInt64, prefer_fetch_merged_part_size_threshold, 10ULL * 1024 * 1024 * 1024, "If the sum of part sizes exceeds this threshold and the time passed since the creation of a replication log entry is greater than \"prefer_fetch_merged_part_time_threshold\", prefer fetching the merged part from a replica instead of doing the merge locally. Speeds up very long merges.", 0) \
M(Seconds, execute_merges_on_single_replica_time_threshold, 0, "When greater than zero, only a single replica starts the merge immediately; the others wait up to that amount of time to download the result instead of merging locally. If the chosen replica doesn't finish the merge within that time, the standard behavior kicks in.", 0) \
M(Seconds, try_fetch_recompressed_part_timeout, 7200, "Recompression works slowly in most cases, so we don't start a merge with recompression until this timeout has passed, and instead try to fetch the recompressed part from the replica that was assigned this merge with recompression.", 0) \
M(Bool, always_fetch_merged_part, 0, "If true, this replica never merges parts and always downloads merged parts from other replicas.", 0) \
M(UInt64, max_suspicious_broken_parts, 10, "Max broken parts, if more - deny automatic deletion.", 0) \
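For context, execute_merges_on_single_replica_time_threshold is a per-table MergeTree setting. A minimal sketch of turning it on, with a hypothetical table name and ZooKeeper path (the {shard} and {replica} macros are assumed to be configured):

-- Hypothetical example: let one replica merge while the others wait
-- up to 30 seconds to fetch the merged part instead.
CREATE TABLE t (x UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/t', '{replica}')
ORDER BY tuple()
SETTINGS execute_merges_on_single_replica_time_threshold = 30;

-- Or on an existing replicated table:
ALTER TABLE t MODIFY SETTING execute_merges_on_single_replica_time_threshold = 30;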

View File: src/Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.cpp

@@ -0,0 +1,152 @@
#include <Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <common/types.h>
#include <optional>
#include <mutex>
#include <city.h>
#include <algorithm>
#include <atomic>

namespace DB
{

/// Minimum interval (in seconds) between checks whether the chosen replica has finished the merge.
static const auto RECHECK_MERGE_READYNESS_INTERVAL_SECONDS = 1;

/// Don't refresh the state too often (to limit the number of ZooKeeper operations).
static const auto REFRESH_STATE_MINIMUM_INTERVAL_SECONDS = 5;

/// Refresh the state automatically if it was not refreshed for a longer time.
static const auto REFRESH_STATE_MAXIMUM_INTERVAL_SECONDS = 30;

ReplicatedMergeTreeMergeStrategyPicker::ReplicatedMergeTreeMergeStrategyPicker(StorageReplicatedMergeTree & storage_)
    : storage(storage_)
{}

bool ReplicatedMergeTreeMergeStrategyPicker::isMergeFinishedByReplica(const String & replica, const ReplicatedMergeTreeLogEntryData & entry)
{
    /// Those timestamps have only one-second resolution, so the recheck period is quite rough.
    auto reference_timestamp = entry.last_postpone_time;
    if (reference_timestamp == 0)
        reference_timestamp = entry.create_time;

    /// We don't want to check ZooKeeper too frequently.
    if (time(nullptr) - reference_timestamp >= RECHECK_MERGE_READYNESS_INTERVAL_SECONDS)
        return storage.checkReplicaHavePart(replica, entry.new_part_name);

    return false;
}
/// Returns the same replica name for the same ReplicatedMergeTreeLogEntry on all replicas (as long as the replica set is the same).
/// That way each replica knows which one is responsible for doing a certain merge.
/// In some corner cases (replica added / removed / deactivated)
/// nodes can pick different replicas to execute the merge and wait for it (or even execute the same merge together),
/// but that doesn't have a significant impact (in one case a replica will just wait out
/// execute_merges_on_single_replica_time_threshold; in the other, two replicas will do the same merge).
std::optional<std::string> ReplicatedMergeTreeMergeStrategyPicker::pickReplicaToExecuteMerge(const ReplicatedMergeTreeLogEntryData & entry)
{
    time_t threshold = execute_merges_on_single_replica_time_threshold;
    if (threshold == 0 /// feature is turned off
        || entry.type != ReplicatedMergeTreeLogEntry::MERGE_PARTS) /// not a merge log entry
        return std::nullopt;

    auto now = time(nullptr);
    if (entry.create_time + threshold <= now) /// the threshold has already passed, fall back to the standard behavior
        return std::nullopt;

    if (now - last_refresh_time > REFRESH_STATE_MAXIMUM_INTERVAL_SECONDS) /// the last refresh was too long ago, sync up the replica list
        refreshState();

    auto hash = getEntryHash(entry);

    std::lock_guard lock(mutex);

    auto num_replicas = active_replicas.size();
    if (num_replicas == 0)
        return std::nullopt;

    auto replica_index = static_cast<int>(hash % num_replicas);
    if (replica_index == current_replica_index)
        return std::nullopt;

    return active_replicas.at(replica_index);
}
void ReplicatedMergeTreeMergeStrategyPicker::refreshState()
{
    auto threshold = storage.getSettings()->execute_merges_on_single_replica_time_threshold.totalSeconds();
    auto now = time(nullptr);

    /// The setting has not changed, and the last state refresh was done recently.
    if (threshold == execute_merges_on_single_replica_time_threshold
        && now - last_refresh_time < REFRESH_STATE_MINIMUM_INTERVAL_SECONDS)
        return;

    if (threshold == 0)
    {
        /// We can reset the setting without a lock (it's atomic).
        execute_merges_on_single_replica_time_threshold = threshold;
        return;
    }

    auto zookeeper = storage.getZooKeeper();
    auto all_replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas");

    /// TODO: do we need this sort, or can we rely on ZooKeeper returning the children in a deterministic order?
    std::sort(all_replicas.begin(), all_replicas.end());

    std::vector<String> active_replicas_tmp;
    int current_replica_index_tmp = -1;

    for (const String & replica : all_replicas)
    {
        if (zookeeper->exists(storage.zookeeper_path + "/replicas/" + replica + "/is_active"))
        {
            active_replicas_tmp.push_back(replica);
            if (replica == storage.replica_name)
                current_replica_index_tmp = static_cast<int>(active_replicas_tmp.size()) - 1;
        }
    }

    if (current_replica_index_tmp < 0 || active_replicas_tmp.size() < 2)
    {
        LOG_ERROR(storage.log, "Can't find the current replica in the active replicas list, or too few active replicas to use execute_merges_on_single_replica_time_threshold!");
        /// We can reset the setting without a lock (it's atomic).
        execute_merges_on_single_replica_time_threshold = 0;
        return;
    }

    std::lock_guard lock(mutex);
    execute_merges_on_single_replica_time_threshold = threshold;
    last_refresh_time = now;
    current_replica_index = current_replica_index_tmp;
    active_replicas = active_replicas_tmp;
}
uint64_t ReplicatedMergeTreeMergeStrategyPicker::getEntryHash(const ReplicatedMergeTreeLogEntryData & entry) const
{
    auto hash_data = storage.zookeeper_path + entry.new_part_name;
    return CityHash_v1_0_2::CityHash64(hash_data.c_str(), hash_data.length());
}

}
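As an aside, the deterministic pick can be sketched in SQL: ClickHouse's cityHash64 function uses the same CityHash v1.0.2 algorithm for string arguments, so every replica should arrive at the same index from the shared ZooKeeper path and part name. The path, part name, and replica count below are hypothetical:

-- With 3 active replicas sorted as ['r1', 'r2', 'r3'], every replica
-- evaluates the same expression and agrees on who does the merge,
-- with no extra coordination.
SELECT cityHash64(concat(
    '/clickhouse/tables/test_01532/execute_on_single_replica',  -- storage.zookeeper_path
    'all_0_0_5'                                                 -- entry.new_part_name
)) % 3 AS replica_index;  -- index into the sorted active replica list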

View File: src/Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h

@@ -0,0 +1,66 @@
#pragma once

#include <common/types.h>
#include <optional>
#include <mutex>
#include <vector>
#include <atomic>
#include <boost/noncopyable.hpp>

namespace DB
{

class StorageReplicatedMergeTree;
struct ReplicatedMergeTreeLogEntryData;

/// In some use cases merging can be more expensive than fetching
/// (so instead of doing exactly the same merge cluster-wide, you can do the merge once and fetch the ready part).
/// Fetches may also be desirable for other operational reasons (e.g. a backup replica without a lot of CPU resources).
///
/// This class allows making a decision about the preferred strategy for a concrete merge.
///
/// Since this code is used in shouldExecuteLogEntry, we need to:
/// 1) make the decision fast
/// 2) avoid excessive ZooKeeper operations
///
/// Because of that we need to cache some important things,
/// like the list of active replicas (to limit the number of ZooKeeper operations).
///
/// That means we need to refresh the state of this object regularly.
class ReplicatedMergeTreeMergeStrategyPicker : public boost::noncopyable
{
public:
    ReplicatedMergeTreeMergeStrategyPicker(StorageReplicatedMergeTree & storage_);

    /// Triggers refreshing the cached state (list of replicas etc.).
    /// Used when we get a new merge event from the ZooKeeper queue (see queueUpdatingTask() etc.).
    void refreshState();

    /// Returns the replica name when the feature is active
    /// and it is not the current replica that should do the merge.
    /// Used in shouldExecuteLogEntry and in tryExecuteMerge.
    std::optional<std::string> pickReplicaToExecuteMerge(const ReplicatedMergeTreeLogEntryData & entry);

    /// Checks (in ZooKeeper) whether the picked replica finished the merge.
    bool isMergeFinishedByReplica(const String & replica, const ReplicatedMergeTreeLogEntryData & entry);

private:
    StorageReplicatedMergeTree & storage;

    /// Calculates the entry hash based on the ZooKeeper path and the new part name.
    uint64_t getEntryHash(const ReplicatedMergeTreeLogEntryData & entry) const;

    std::atomic<time_t> execute_merges_on_single_replica_time_threshold = 0;
    std::atomic<time_t> last_refresh_time = 0;

    std::mutex mutex;

    /// These two members are accessed under the mutex, and only when
    /// execute_merges_on_single_replica_time_threshold is enabled.
    int current_replica_index = -1;
    std::vector<String> active_replicas;
};

}

View File: src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp

@@ -5,6 +5,7 @@
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h>
#include <Common/StringUtils/StringUtils.h>
@@ -20,8 +21,9 @@ namespace ErrorCodes
}
-ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_)
+ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_, ReplicatedMergeTreeMergeStrategyPicker & merge_strategy_picker_)
: storage(storage_)
, merge_strategy_picker(merge_strategy_picker_)
, format_version(storage.format_version)
, current_parts(format_version)
, virtual_parts(format_version)
@@ -120,6 +122,8 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, {});
merge_strategy_picker.refreshState();
LOG_TRACE(log, "Loaded queue");
return updated;
}
@@ -587,7 +591,15 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper
}
if (!copied_entries.empty())
{
LOG_DEBUG(log, "Pulled {} entries to queue.", copied_entries.size());
/// To limit the number of ZooKeeper operations, the MergeStrategyPicker state is updated only
/// when new merges appear.
auto operations_in_queue = countMergesAndPartMutations();
if (operations_in_queue.merges > 0)
merge_strategy_picker.refreshState();
}
}
storage.background_executor.triggerTask();
@@ -1088,6 +1100,17 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
return false;
}
auto replica_to_execute_merge = merge_strategy_picker.pickReplicaToExecuteMerge(entry);
if (replica_to_execute_merge && !merge_strategy_picker.isMergeFinishedByReplica(replica_to_execute_merge.value(), entry))
{
String reason = "Not executing merge for the part " + entry.new_part_name
+ ", waiting for " + replica_to_execute_merge.value() + " to execute merge.";
LOG_DEBUG(log, reason);
out_postpone_reason = reason;
return false;
}
UInt64 max_source_parts_size = entry.type == LogEntry::MERGE_PARTS ? merger_mutator.getMaxSourcePartsSizeForMerge()
: merger_mutator.getMaxSourcePartSizeForMutation();
/** If there are enough free threads in background pool to do large merges (maximal size of merge is allowed),
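When a replica postpones a merge this way, the decision is visible from SQL. A minimal sketch of the check (the stateless test at the end of this commit runs essentially the same query):

SELECT
    table,
    type,
    new_part_name,
    num_postponed > 0 AS has_postpones,
    postpone_reason
FROM system.replication_queue
WHERE database = currentDatabase()
FORMAT Vertical;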

View File: src/Storages/MergeTree/ReplicatedMergeTreeQueue.h

@@ -22,6 +22,7 @@ class StorageReplicatedMergeTree;
class MergeTreeDataMergerMutator;
class ReplicatedMergeTreeMergePredicate;
class ReplicatedMergeTreeMergeStrategyPicker;
class ReplicatedMergeTreeQueue
@@ -57,6 +58,7 @@ private:
using InsertsByTime = std::set<LogEntryPtr, ByTime>;
StorageReplicatedMergeTree & storage;
ReplicatedMergeTreeMergeStrategyPicker & merge_strategy_picker;
MergeTreeDataFormatVersion format_version;
String zookeeper_path;
@@ -275,7 +277,7 @@ private:
size_t current_multi_batch_size = 1;
public:
-ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_);
+ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_, ReplicatedMergeTreeMergeStrategyPicker & merge_strategy_picker_);
~ReplicatedMergeTreeQueue();

View File: src/Storages/StorageReplicatedMergeTree.cpp

@@ -139,7 +139,6 @@ static const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000;
static const auto MUTATIONS_FINALIZING_SLEEP_MS = 1 * 1000;
static const auto MUTATIONS_FINALIZING_IDLE_SLEEP_MS = 5 * 1000;
void StorageReplicatedMergeTree::setZooKeeper(zkutil::ZooKeeperPtr zookeeper)
{
std::lock_guard lock(current_zookeeper_mutex);
@@ -202,7 +201,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
, reader(*this)
, writer(*this)
, merger_mutator(*this, global_context.getSettingsRef().background_pool_size)
-, queue(*this)
+, merge_strategy_picker(*this)
+, queue(*this, merge_strategy_picker)
, fetcher(*this)
, background_executor(*this, global_context)
, background_moves_executor(*this, global_context)
@@ -1361,6 +1361,16 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
return false;
}
/// In some use cases merging can be more expensive than fetching,
/// and it may be better to spread merge tasks across the replicas
/// instead of doing exactly the same merge cluster-wide.
auto replica_to_execute_merge = merge_strategy_picker.pickReplicaToExecuteMerge(entry);
if (replica_to_execute_merge)
{
LOG_DEBUG(log, "Prefer fetching part {} from replica {} due execute_merges_on_single_replica_time_threshold", entry.new_part_name, replica_to_execute_merge.value());
return false;
}
DataPartsVector parts;
bool have_all_parts = true;
for (const String & name : entry.source_parts)
@@ -3011,6 +3021,11 @@ void StorageReplicatedMergeTree::exitLeaderElection()
leader_election = nullptr;
}
bool StorageReplicatedMergeTree::checkReplicaHavePart(const String & replica, const String & part_name)
{
auto zookeeper = getZooKeeper();
return zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name);
}
String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_name, bool active)
{
@@ -3026,7 +3041,7 @@ String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_name
if (replica == replica_name)
continue;
-if (zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name) &&
+if (checkReplicaHavePart(replica, part_name) &&
(!active || zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active")))
return replica;
@@ -4016,6 +4031,8 @@ void StorageReplicatedMergeTree::alter(
StorageInMemoryMetadata future_metadata = getInMemoryMetadata();
commands.apply(future_metadata, query_context);
merge_strategy_picker.refreshState();
changeSettings(future_metadata.settings_changes, table_lock_holder);
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(query_context, table_id, future_metadata);
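Both refreshState() and the new checkReplicaHavePart() reduce to ZooKeeper lookups, which can be mirrored for debugging through the system.zookeeper table. A sketch using the test table's path (any fixed replicated table path works):

-- Replica list read by refreshState(); it then checks each child's
-- is_active node to keep only active replicas.
SELECT name
FROM system.zookeeper
WHERE path = '/clickhouse/tables/test_01532/execute_on_single_replica/replicas';

-- Parts committed by replica r2; checkReplicaHavePart('r2', part)
-- tests for the part's node under this path.
SELECT name
FROM system.zookeeper
WHERE path = '/clickhouse/tables/test_01532/execute_on_single_replica/replicas/r2/parts';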

View File: src/Storages/StorageReplicatedMergeTree.h

@@ -13,6 +13,7 @@
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h>
#include <Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
@@ -210,7 +211,6 @@ public:
bool canExecuteFetch(const ReplicatedMergeTreeLogEntry & entry, String & disable_reason) const;
private:
/// Get a sequential consistent view of current parts.
ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const;
@@ -222,11 +222,13 @@ private:
friend class ReplicatedMergeTreeCleanupThread;
friend class ReplicatedMergeTreeAlterThread;
friend class ReplicatedMergeTreeRestartingThread;
friend class ReplicatedMergeTreeMergeStrategyPicker;
friend struct ReplicatedMergeTreeLogEntry;
friend class ScopedPartitionMergeLock;
friend class ReplicatedMergeTreeQueue;
friend class MergeTreeData;
using MergeStrategyPicker = ReplicatedMergeTreeMergeStrategyPicker;
using LogEntry = ReplicatedMergeTreeLogEntry;
using LogEntryPtr = LogEntry::Ptr;
@@ -262,6 +264,8 @@ private:
MergeTreeDataWriter writer;
MergeTreeDataMergerMutator merger_mutator;
MergeStrategyPicker merge_strategy_picker;
/** The queue of what needs to be done on this replica to catch up with everyone. It is taken from ZooKeeper (/replicas/me/queue/).
* In ZK entries in chronological order. Here it is not necessary.
*/
@@ -478,6 +482,8 @@ private:
*/
String findReplicaHavingPart(const String & part_name, bool active);
bool checkReplicaHavePart(const String & replica, const String & part_name);
/** Find replica having specified part or any part that covers it.
* If active = true, consider only active replicas.
* If found, returns replica name and set 'entry->actual_new_part_name' to name of found largest covering part.

View File: src/Storages/ya.make

@@ -94,6 +94,7 @@ SRCS(
MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp
MergeTree/ReplicatedMergeTreeCleanupThread.cpp
MergeTree/ReplicatedMergeTreeLogEntry.cpp
MergeTree/ReplicatedMergeTreeMergeStrategyPicker.cpp
MergeTree/ReplicatedMergeTreeMutationEntry.cpp
MergeTree/ReplicatedMergeTreePartCheckThread.cpp
MergeTree/ReplicatedMergeTreePartHeader.cpp

View File: tests/queries/0_stateless/01532_execute_merges_on_single_replica.reference

@@ -0,0 +1,77 @@
############################
### emulate normal feature operation - merges are distributed between replicas
############################
### emulate execute_merges_on_single_replica_time_threshold timeout
############################
### timeout not exceeded, r1 waits for r2
Row 1:
──────
table: execute_on_single_replica_r1
type: MERGE_PARTS
new_part_name: all_0_0_5
has_postpones: 1
postpone_reason: Not executing merge for the part all_0_0_5, waiting for r2 to execute merge.
Row 2:
──────
table: execute_on_single_replica_r2
type: MERGE_PARTS
new_part_name: all_0_0_5
has_postpones: 0
postpone_reason:
############################
### timeout exceeded, r1 failed to get the merged part from r2 and did the merge on its own
Row 1:
──────
table: execute_on_single_replica_r2
type: MERGE_PARTS
new_part_name: all_0_0_5
has_postpones: 0
postpone_reason:
############################
### queue unfreeze
############################
### disable the feature
############################
### part_log
Row 1:
──────
part_name: all_0_0_1
mergers: ['execute_on_single_replica_r1']
fetchers: ['execute_on_single_replica_r2']
Row 2:
──────
part_name: all_0_0_2
mergers: ['execute_on_single_replica_r1']
fetchers: ['execute_on_single_replica_r2']
Row 3:
──────
part_name: all_0_0_3
mergers: ['execute_on_single_replica_r2']
fetchers: ['execute_on_single_replica_r1']
Row 4:
──────
part_name: all_0_0_4
mergers: ['execute_on_single_replica_r2']
fetchers: ['execute_on_single_replica_r1']
Row 5:
──────
part_name: all_0_0_5
mergers: ['execute_on_single_replica_r1','execute_on_single_replica_r2']
fetchers: []
Row 6:
──────
part_name: all_0_0_6
mergers: ['execute_on_single_replica_r1','execute_on_single_replica_r2']
fetchers: []
Row 7:
──────
part_name: all_0_0_7
mergers: ['execute_on_single_replica_r1','execute_on_single_replica_r2']
fetchers: []

View File: tests/queries/0_stateless/01532_execute_merges_on_single_replica.sql

@@ -0,0 +1,127 @@
DROP TABLE IF EXISTS execute_on_single_replica_r1 NO DELAY;
DROP TABLE IF EXISTS execute_on_single_replica_r2 NO DELAY;
/* this test requires a fixed zookeeper path */
CREATE TABLE execute_on_single_replica_r1 (x UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/test_01532/execute_on_single_replica', 'r1') ORDER BY tuple() SETTINGS execute_merges_on_single_replica_time_threshold=10;
CREATE TABLE execute_on_single_replica_r2 (x UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/test_01532/execute_on_single_replica', 'r2') ORDER BY tuple() SETTINGS execute_merges_on_single_replica_time_threshold=10;
INSERT INTO execute_on_single_replica_r1 VALUES (1);
SYSTEM SYNC REPLICA execute_on_single_replica_r2;
SET optimize_throw_if_noop=1;
SELECT '############################';
SELECT '### emulate normal feature operation - merges are distributed between replicas';
/* all_0_0_1 - will be merged by r1, and downloaded by r2 */
OPTIMIZE TABLE execute_on_single_replica_r1 FINAL;
SYSTEM SYNC REPLICA execute_on_single_replica_r2;
/* all_0_0_2 - will be merged by r1, and downloaded by r2 */
OPTIMIZE TABLE execute_on_single_replica_r2 FINAL;
SYSTEM SYNC REPLICA execute_on_single_replica_r1;
/* all_0_0_3 - will be merged by r2, and downloaded by r1 */
OPTIMIZE TABLE execute_on_single_replica_r1 FINAL;
SYSTEM SYNC REPLICA execute_on_single_replica_r2;
/* all_0_0_4 - will be merged by r2, and downloaded by r1 */
OPTIMIZE TABLE execute_on_single_replica_r2 FINAL;
SYSTEM SYNC REPLICA execute_on_single_replica_r1;
SELECT '############################';
SELECT '### emulate execute_merges_on_single_replica_time_threshold timeout';
SYSTEM STOP REPLICATION QUEUES execute_on_single_replica_r2;
/* all_0_0_5 - should be merged by r2, but its replication queue is stopped, so r1 does the merge */
OPTIMIZE TABLE execute_on_single_replica_r1 FINAL SETTINGS replication_alter_partitions_sync=0;
/* if we check immediately, we may find the log entry not yet processed */
SELECT * FROM numbers(4) WHERE sleepEachRow(1);
SELECT '############################';
SELECT '### timeout not exceeded, r1 waits for r2';
/* we can now check that r1 waits for r2 */
SELECT
table,
type,
new_part_name,
num_postponed > 0 AS has_postpones,
postpone_reason
FROM system.replication_queue
WHERE table LIKE 'execute\\_on\\_single\\_replica\\_r%'
AND database = currentDatabase()
ORDER BY table
FORMAT Vertical;
/* by now execute_merges_on_single_replica_time_threshold has been exceeded */
SELECT * FROM numbers(10) WHERE sleepEachRow(1);
SELECT '############################';
SELECT '### timeout exceeded, r1 failed to get the merged part from r2 and did the merge on its own';
SELECT
table,
type,
new_part_name,
num_postponed > 0 AS has_postpones,
postpone_reason
FROM system.replication_queue
WHERE table LIKE 'execute\\_on\\_single\\_replica\\_r%'
AND database = currentDatabase()
ORDER BY table
FORMAT Vertical;
SYSTEM START REPLICATION QUEUES execute_on_single_replica_r2;
SYSTEM SYNC REPLICA execute_on_single_replica_r2;
SELECT '############################';
SELECT '### queue unfreeze';
SELECT
table,
type,
new_part_name,
num_postponed > 0 AS has_postpones,
postpone_reason
FROM system.replication_queue
WHERE table LIKE 'execute\\_on\\_single\\_replica\\_r%'
AND database = currentDatabase()
ORDER BY table
FORMAT Vertical;
SELECT '############################';
SELECT '### disable the feature';
ALTER TABLE execute_on_single_replica_r1 MODIFY SETTING execute_merges_on_single_replica_time_threshold=0;
ALTER TABLE execute_on_single_replica_r2 MODIFY SETTING execute_merges_on_single_replica_time_threshold=0;
/* all_0_0_6 - we disabled the feature, both replicas will merge */
OPTIMIZE TABLE execute_on_single_replica_r2 FINAL;
/* all_0_0_7 - same */
OPTIMIZE TABLE execute_on_single_replica_r1 FINAL;
SYSTEM SYNC REPLICA execute_on_single_replica_r1;
SYSTEM SYNC REPLICA execute_on_single_replica_r2;
SYSTEM FLUSH LOGS;
SELECT '############################';
SELECT '### part_log';
SELECT
part_name,
arraySort(groupArrayIf(table, event_type = 'MergeParts')) AS mergers,
arraySort(groupArrayIf(table, event_type = 'DownloadPart')) AS fetchers
FROM system.part_log
WHERE (event_time > (now() - 40))
AND (table LIKE 'execute\\_on\\_single\\_replica\\_r%')
AND (part_name NOT LIKE '%\\_0')
AND (database = currentDatabase())
GROUP BY part_name
ORDER BY part_name
FORMAT Vertical;
DROP TABLE execute_on_single_replica_r1 NO DELAY;
DROP TABLE execute_on_single_replica_r2 NO DELAY;