make code better

This commit is contained in:
Alexander Tokmakov 2021-08-23 15:57:50 +03:00
parent 9ef0b00803
commit cc9c2fd63b
6 changed files with 60 additions and 100 deletions

View File

@ -97,7 +97,7 @@ class IColumn;
M(Bool, optimize_move_to_prewhere_if_final, false, "If query has `FINAL`, the optimization `move_to_prewhere` is not always correct and it is enabled only if both settings `optimize_move_to_prewhere` and `optimize_move_to_prewhere_if_final` are turned on", 0) \
\
M(UInt64, replication_alter_partitions_sync, 1, "Wait for actions to manipulate the partitions. 0 - do not wait, 1 - wait for execution only of itself, 2 - wait for everyone.", 0) \
M(Int64, replication_wait_for_inactive_replica_timeout, 120, "Wait for inactive replica to execute ALTER/OPTIMIZE. Time in seconds, 0 - wait unlimited time, negative - do not wait.", 0) \
M(Int64, replication_wait_for_inactive_replica_timeout, 120, "Wait for inactive replica to execute ALTER/OPTIMIZE. Time in seconds, 0 - do not wait, negative - wait for unlimited time.", 0) \
\
M(LoadBalancing, load_balancing, LoadBalancing::RANDOM, "Which replicas (among healthy replicas) to preferably send a query to (on the first attempt) for distributed processing.", 0) \
M(UInt64, load_balancing_first_offset, 0, "Which replica to preferably send a query when FIRST_OR_RANDOM load balancing strategy is used.", 0) \

View File

@ -194,7 +194,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil::
/// This wait in the background schedule pool is useless. It'd be
/// better to have some notification that calls the `step`
/// function once all replicas have finished. TODO.
storage.waitForAllReplicasToProcessLogEntry(log_entry, 0);
storage.waitForAllReplicasToProcessLogEntry(zookeeper_path, log_entry, 0);
}
{
@ -231,7 +231,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil::
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created;
log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, 0);
storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, 0);
}
{
@ -269,7 +269,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil::
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created;
log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, 0);
storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, 0);
}
{
@ -318,7 +318,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil::
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created;
log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, 0);
storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, 0);
}
{
@ -348,7 +348,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil::
{
ReplicatedMergeTreeLogEntry log_entry;
if (storage.dropPartImpl(zk, entry.part_name, log_entry, false, false))
storage.waitForAllReplicasToProcessLogEntry(log_entry, 0);
storage.waitForAllReplicasToProcessLogEntry(zookeeper_path, log_entry, 0);
}
{

View File

@ -4721,14 +4721,8 @@ bool StorageReplicatedMergeTree::optimize(
table_lock.reset();
Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout;
for (auto & merge_entry : merge_entries)
{
if (query_context->getSettingsRef().replication_alter_partitions_sync == 1)
waitForReplicaToProcessLogEntry(replica_name, merge_entry, wait_for_inactive_timeout);
else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2)
waitForAllReplicasToProcessLogEntry(merge_entry, wait_for_inactive_timeout);
}
waitForLogEntryToBeProcessedIfNecessary(merge_entry, query_context);
return true;
}
@ -5039,21 +5033,8 @@ void StorageReplicatedMergeTree::alter(
table_lock_holder.reset();
std::vector<String> unwaited;
Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout;
if (query_context->getSettingsRef().replication_alter_partitions_sync == 2)
{
LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes.");
unwaited = tryWaitForAllReplicasToProcessLogEntry(*alter_entry, wait_for_inactive_timeout);
}
else if (query_context->getSettingsRef().replication_alter_partitions_sync == 1)
{
LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes.");
waitForReplicaToProcessLogEntry(replica_name, *alter_entry, wait_for_inactive_timeout);
}
if (!unwaited.empty())
throw Exception("Some replicas doesn't finish metadata alter", ErrorCodes::UNFINISHED);
LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes.");
waitForLogEntryToBeProcessedIfNecessary(*alter_entry, query_context, "Some replicas doesn't finish metadata alter: ");
if (mutation_znode)
{
@ -5204,12 +5185,7 @@ void StorageReplicatedMergeTree::dropPart(const String & part_name, bool detach,
dropPartImpl(zookeeper, part_name, entry, detach, /*throw_if_noop=*/ true);
/// If necessary, wait until the operation is performed on itself or on all replicas.
Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout;
if (query_context->getSettingsRef().replication_alter_partitions_sync == 1)
waitForReplicaToProcessLogEntry(replica_name, entry, wait_for_inactive_timeout);
else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2)
waitForAllReplicasToProcessLogEntry(entry, wait_for_inactive_timeout);
waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
}
void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool detach, ContextPtr query_context)
@ -5226,13 +5202,7 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool de
if (did_drop)
{
/// If necessary, wait until the operation is performed on itself or on all replicas.
Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout;
if (query_context->getSettingsRef().replication_alter_partitions_sync == 1)
waitForReplicaToProcessLogEntry(replica_name, entry, wait_for_inactive_timeout);
else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2)
waitForAllReplicasToProcessLogEntry(entry, wait_for_inactive_timeout);
waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
cleanLastPartNode(partition_id);
}
}
@ -5260,14 +5230,8 @@ void StorageReplicatedMergeTree::truncate(
entries_to_wait.push_back(std::move(entry));
}
Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout;
for (const auto & entry : entries_to_wait)
{
if (query_context->getSettingsRef().replication_alter_partitions_sync == 1)
waitForReplicaToProcessLogEntry(replica_name, *entry, wait_for_inactive_timeout);
else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2)
waitForAllReplicasToProcessLogEntry(*entry, wait_for_inactive_timeout);
}
waitForLogEntryToBeProcessedIfNecessary(*entry, query_context);
}
@ -5425,7 +5389,7 @@ StorageReplicatedMergeTree::allocateBlockNumber(
}
Strings StorageReplicatedMergeTree::tryWaitForAllTableReplicasToProcessLogEntry(
Strings StorageReplicatedMergeTree::tryWaitForAllReplicasToProcessLogEntry(
const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout)
{
LOG_DEBUG(log, "Waiting for all replicas to process {}", entry.znode_name);
@ -5433,12 +5397,12 @@ Strings StorageReplicatedMergeTree::tryWaitForAllTableReplicasToProcessLogEntry(
auto zookeeper = getZooKeeper();
Strings replicas = zookeeper->getChildren(fs::path(table_zookeeper_path) / "replicas");
Strings unwaited;
bool wait_for_inactive = 0 <= wait_for_inactive_timeout;
bool wait_for_inactive = wait_for_inactive_timeout != 0;
for (const String & replica : replicas)
{
if (wait_for_inactive || zookeeper->exists(fs::path(table_zookeeper_path) / "replicas" / replica / "is_active"))
{
if (!waitForTableReplicaToProcessLogEntry(table_zookeeper_path, replica, entry, wait_for_inactive_timeout))
if (!tryWaitForReplicaToProcessLogEntry(table_zookeeper_path, replica, entry, wait_for_inactive_timeout))
unwaited.push_back(replica);
}
else
@ -5451,30 +5415,37 @@ Strings StorageReplicatedMergeTree::tryWaitForAllTableReplicasToProcessLogEntry(
return unwaited;
}
void StorageReplicatedMergeTree::waitForAllTableReplicasToProcessLogEntry(
const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout)
void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(
const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout, const String & error_context)
{
Strings unfinished_replicas = tryWaitForAllTableReplicasToProcessLogEntry(table_zookeeper_path, entry, wait_for_inactive_timeout);
Strings unfinished_replicas = tryWaitForAllReplicasToProcessLogEntry(table_zookeeper_path, entry, wait_for_inactive_timeout);
if (unfinished_replicas.empty())
return;
throw Exception(ErrorCodes::UNFINISHED, "Timeout exceeded while waiting for replicas {} to process entry {}. "
"Probably some replicas are inactive", fmt::join(unfinished_replicas, ", "), entry.znode_name);
throw Exception(ErrorCodes::UNFINISHED, "{}Timeout exceeded while waiting for replicas {} to process entry {}. "
"Probably some replicas are inactive", error_context, fmt::join(unfinished_replicas, ", "), entry.znode_name);
}
Strings StorageReplicatedMergeTree::tryWaitForAllReplicasToProcessLogEntry(
const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout)
void StorageReplicatedMergeTree::waitForLogEntryToBeProcessedIfNecessary(const ReplicatedMergeTreeLogEntryData & entry, ContextPtr query_context, const String & error_context)
{
return tryWaitForAllTableReplicasToProcessLogEntry(zookeeper_path, entry, wait_for_inactive_timeout);
/// If necessary, wait until the operation is performed on itself or on all replicas.
Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout;
if (query_context->getSettingsRef().replication_alter_partitions_sync == 1)
{
bool finished = tryWaitForReplicaToProcessLogEntry(zookeeper_path, replica_name, entry, wait_for_inactive_timeout);
if (!finished)
{
throw Exception(ErrorCodes::UNFINISHED, "{}Log entry {} is not precessed on local replica, "
"most likely because the replica was shut down.", error_context, entry.znode_name);
}
}
else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2)
{
waitForAllReplicasToProcessLogEntry(zookeeper_path, entry, wait_for_inactive_timeout, error_context);
}
}
void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(
const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout)
{
waitForAllTableReplicasToProcessLogEntry(zookeeper_path, entry, wait_for_inactive_timeout);
}
bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry(
bool StorageReplicatedMergeTree::tryWaitForReplicaToProcessLogEntry(
const String & table_zookeeper_path, const String & replica, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout)
{
String entry_str = entry.toString();
@ -5493,13 +5464,16 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry(
*/
bool waiting_itself = replica == replica_name;
bool wait_for_inactive = 0 <= wait_for_inactive_timeout;
/// Do not wait if timeout is zero
bool wait_for_inactive = wait_for_inactive_timeout != 0;
/// Wait for unlimited time if timeout is negative
bool check_timeout = wait_for_inactive_timeout > 0;
Stopwatch time_waiting;
const auto & stop_waiting = [&]()
{
bool stop_waiting_itself = waiting_itself && partial_shutdown_called;
bool timeout_exceeded = 0 < wait_for_inactive_timeout && wait_for_inactive_timeout < time_waiting.elapsedSeconds();
bool timeout_exceeded = check_timeout && wait_for_inactive_timeout < time_waiting.elapsedSeconds();
bool stop_waiting_inactive = (!wait_for_inactive || timeout_exceeded)
&& !getZooKeeper()->exists(fs::path(table_zookeeper_path) / "replicas" / replica / "is_active");
return is_dropped || stop_waiting_itself || stop_waiting_inactive;
@ -5578,13 +5552,6 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry(
}
bool StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(
const String & replica, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout)
{
return waitForTableReplicaToProcessLogEntry(zookeeper_path, replica, entry, wait_for_inactive_timeout);
}
void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
{
auto zookeeper = tryGetZooKeeper();
@ -6595,12 +6562,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
lock2.reset();
lock1.reset();
/// If necessary, wait until the operation is performed on itself or on all replicas.
Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout;
if (query_context->getSettingsRef().replication_alter_partitions_sync == 1)
waitForReplicaToProcessLogEntry(replica_name, entry, wait_for_inactive_timeout);
else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2)
waitForAllReplicasToProcessLogEntry(entry, wait_for_inactive_timeout);
waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
}
void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context)
@ -6780,11 +6742,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
cleanup_thread.wakeup();
lock2.reset();
Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout;
if (query_context->getSettingsRef().replication_alter_partitions_sync == 1)
waitForReplicaToProcessLogEntry(replica_name, entry, wait_for_inactive_timeout);
else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2)
waitForAllReplicasToProcessLogEntry(entry, wait_for_inactive_timeout);
dest_table_storage->waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
/// Create DROP_RANGE for the source table
alter_partition_version_path = zookeeper_path + "/alter_partition_version";
@ -6812,10 +6770,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
entry_delete.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
lock1.reset();
if (query_context->getSettingsRef().replication_alter_partitions_sync == 1)
waitForReplicaToProcessLogEntry(replica_name, entry_delete, wait_for_inactive_timeout);
else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2)
waitForAllReplicasToProcessLogEntry(entry_delete, wait_for_inactive_timeout);
waitForLogEntryToBeProcessedIfNecessary(entry_delete, query_context);
/// Cleaning possibly stored information about parts from /quorum/last_part node in ZooKeeper.
cleanLastPartNode(partition_id);

View File

@ -630,24 +630,27 @@ private:
const String & zookeeper_block_id_path = "", const String & zookeeper_path_prefix = "") const;
/** Wait until all replicas, including this, execute the specified action from the log.
* If replicas are added at the same time, it can not wait the added replica .
* If replicas are added at the same time, the newly added replicas may not be waited for.
*
* Waits for inactive replicas no more than wait_for_inactive_timeout.
* Returns the list of inactive replicas that have not executed the entry, or throws an exception.
*
* NOTE: This method must be called without table lock held.
* Because it effectively waits for other thread that usually has to also acquire a lock to proceed and this yields deadlock.
* TODO: There are wrong usages of this method that are not fixed yet.
*
* One method for convenient use on current table, another for waiting on foreign shards.
*/
void waitForAllTableReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout);
void waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout);
Strings tryWaitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout);
Strings tryWaitForAllTableReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout);
void waitForAllReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry,
Int64 wait_for_inactive_timeout, const String & error_context = {});
Strings tryWaitForAllReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry,
Int64 wait_for_inactive_timeout);
/** Wait until the specified replica executes the specified action from the log.
* NOTE: See comment about locks above.
*/
bool waitForTableReplicaToProcessLogEntry(const String & table_zookeeper_path, const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout = 0);
bool waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout = 0);
bool tryWaitForReplicaToProcessLogEntry(const String & table_zookeeper_path, const String & replica_name,
const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout = 0);
/// Depending on settings, do nothing, wait for this replica, or wait for all replicas to process the log entry.
void waitForLogEntryToBeProcessedIfNecessary(const ReplicatedMergeTreeLogEntryData & entry, ContextPtr query_context, const String & error_context = {});
/// Throw an exception if the table is readonly.
void assertNotReadonly() const;

View File

@ -12,6 +12,7 @@ SYSTEM SYNC REPLICA byte_identical_r2;
ALTER TABLE byte_identical_r1 ADD COLUMN y UInt64 DEFAULT rand();
SYSTEM SYNC REPLICA byte_identical_r1;
SYSTEM SYNC REPLICA byte_identical_r2;
SET replication_alter_partitions_sync=2;
OPTIMIZE TABLE byte_identical_r1 PARTITION tuple() FINAL;
SELECT x, t1.y - t2.y FROM byte_identical_r1 t1 SEMI LEFT JOIN byte_identical_r2 t2 USING x ORDER BY x;

View File

@ -98,6 +98,7 @@ SELECT '*** disable the feature';
ALTER TABLE execute_on_single_replica_r1 MODIFY SETTING execute_merges_on_single_replica_time_threshold=0;
ALTER TABLE execute_on_single_replica_r2 MODIFY SETTING execute_merges_on_single_replica_time_threshold=0;
SET replication_alter_partitions_sync=2;
/* all_0_0_6 - we disabled the feature, both replicas will merge */
OPTIMIZE TABLE execute_on_single_replica_r2 FINAL;
/* all_0_0_7 - same */