Merge pull request #34297 from ClickHouse/fix_restarting_thread

Try to fix race between pullLogsToQueue and RestartingThread
This commit is contained in:
tavplubix 2022-02-04 17:30:17 +03:00 committed by GitHub
commit 22de534fdc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 166 additions and 141 deletions

View File

@ -20,7 +20,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int UNEXPECTED_NODE_IN_ZOOKEEPER;
extern const int ABORTED;
extern const int READONLY;
}
@ -555,10 +554,17 @@ bool ReplicatedMergeTreeQueue::removeFailedQuorumPart(const MergeTreePartInfo &
int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback, PullLogsReason reason)
{
std::lock_guard lock(pull_logs_to_queue_mutex);
if (storage.is_readonly && reason == SYNC)
if (reason != LOAD)
{
throw Exception(ErrorCodes::READONLY, "Cannot SYNC REPLICA, because replica is readonly");
/// TODO throw logical error for other reasons (except LOAD)
/// It's totally ok to load queue on readonly replica (that's what RestartingThread does on initialization).
/// It's ok if replica became readonly due to connection loss after we got current zookeeper (in this case zookeeper must be expired).
/// And it's ok if replica became readonly after shutdown.
/// In other cases it's likely that someone called pullLogsToQueue(...) when queue is not initialized yet by RestartingThread.
bool not_completely_initialized = storage.is_readonly && !zookeeper->expired() && !storage.shutdown_called;
if (not_completely_initialized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Tried to pull logs to queue (reason: {}) on readonly replica {}, it's a bug",
reason, storage.getStorageID().getNameForLogs());
}
if (pull_log_blocker.isCancelled())

View File

@ -58,106 +58,108 @@ void ReplicatedMergeTreeRestartingThread::run()
if (need_stop)
return;
bool reschedule_now = false;
size_t reschedule_period_ms = check_period_ms;
try
{
if (first_time || readonly_mode_was_set || storage.getZooKeeper()->expired())
{
startup_completed = false;
if (first_time)
{
LOG_DEBUG(log, "Activating replica.");
}
else
{
if (storage.getZooKeeper()->expired())
{
LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");
setReadonly();
}
else if (readonly_mode_was_set)
{
LOG_WARNING(log, "Table was in readonly mode. Will try to activate it.");
}
partialShutdown();
}
if (!startup_completed)
{
try
{
storage.setZooKeeper();
}
catch (const Coordination::Exception &)
{
/// The exception when you try to zookeeper_init usually happens if DNS does not work. We will try to do it again.
tryLogCurrentException(log, __PRETTY_FUNCTION__);
/// Here we're almost sure the table is already readonly, but it doesn't hurt to enforce it.
setReadonly();
if (first_time)
storage.startup_event.set();
task->scheduleAfter(retry_period_ms);
return;
}
if (!need_stop && !tryStartup())
{
/// We couldn't startup replication. Table must be readonly.
/// Otherwise it can have partially initialized queue and other
/// strange parts of state.
setReadonly();
if (first_time)
storage.startup_event.set();
task->scheduleAfter(retry_period_ms);
return;
}
if (first_time)
storage.startup_event.set();
startup_completed = true;
}
if (need_stop)
return;
bool old_val = true;
if (storage.is_readonly.compare_exchange_strong(old_val, false))
{
readonly_mode_was_set = false;
CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);
}
first_time = false;
}
bool replica_is_active = runImpl();
if (!replica_is_active)
reschedule_period_ms = retry_period_ms;
}
catch (const Exception & e)
{
/// We couldn't activate table let's set it into readonly mode
setReadonly();
partialShutdown();
storage.startup_event.set();
tryLogCurrentException(log, __PRETTY_FUNCTION__);
if (e.code() == ErrorCodes::REPLICA_STATUS_CHANGED)
reschedule_now = true;
reschedule_period_ms = 0;
}
catch (...)
{
setReadonly();
partialShutdown();
storage.startup_event.set();
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
if (reschedule_now)
task->schedule();
if (first_time)
{
if (storage.is_readonly)
{
/// We failed to start replication, table is still readonly, so we should increment the metric. See also setNotReadonly().
CurrentMetrics::add(CurrentMetrics::ReadonlyReplica);
}
/// It does not matter if replication is actually started or not, just notify after the first attempt.
storage.startup_event.set();
first_time = false;
}
if (need_stop)
return;
if (reschedule_period_ms)
task->scheduleAfter(reschedule_period_ms);
else
task->scheduleAfter(check_period_ms);
task->schedule();
}
bool ReplicatedMergeTreeRestartingThread::runImpl()
{
if (!storage.is_readonly && !storage.getZooKeeper()->expired())
return true;
if (first_time)
{
LOG_DEBUG(log, "Activating replica.");
assert(storage.is_readonly);
}
else if (storage.is_readonly)
{
LOG_WARNING(log, "Table was in readonly mode. Will try to activate it.");
}
else if (storage.getZooKeeper()->expired())
{
LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");
partialShutdown();
}
else
{
__builtin_unreachable();
}
try
{
storage.setZooKeeper();
}
catch (const Coordination::Exception &)
{
/// The exception when you try to zookeeper_init usually happens if DNS does not work. We will try to do it again.
tryLogCurrentException(log, __PRETTY_FUNCTION__);
assert(storage.is_readonly);
return false;
}
if (need_stop)
return false;
if (!tryStartup())
{
assert(storage.is_readonly);
return false;
}
setNotReadonly();
/// Start queue processing
storage.background_operations_assignee.start();
storage.queue_updating_task->activateAndSchedule();
storage.mutations_updating_task->activateAndSchedule();
storage.mutations_finalizing_task->activateAndSchedule();
storage.merge_selecting_task->activateAndSchedule();
storage.cleanup_thread.start();
storage.part_check_thread.start();
return true;
}
@ -202,17 +204,6 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
storage.partial_shutdown_called = false;
storage.partial_shutdown_event.reset();
/// Start queue processing
storage.background_operations_assignee.start();
storage.queue_updating_task->activateAndSchedule();
storage.mutations_updating_task->activateAndSchedule();
storage.mutations_finalizing_task->activateAndSchedule();
storage.merge_selecting_task->activateAndSchedule();
storage.cleanup_thread.start();
storage.part_check_thread.start();
return true;
}
catch (...)
@ -366,8 +357,9 @@ void ReplicatedMergeTreeRestartingThread::activateReplica()
}
void ReplicatedMergeTreeRestartingThread::partialShutdown()
void ReplicatedMergeTreeRestartingThread::partialShutdown(bool part_of_full_shutdown)
{
setReadonly(part_of_full_shutdown);
ProfileEvents::increment(ProfileEvents::ReplicaPartialShutdown);
storage.partial_shutdown_called = true;
@ -403,25 +395,35 @@ void ReplicatedMergeTreeRestartingThread::shutdown()
task->deactivate();
LOG_TRACE(log, "Restarting thread finished");
/// For detach table query, we should reset the ReadonlyReplica metric.
if (readonly_mode_was_set)
{
CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);
readonly_mode_was_set = false;
}
/// Stop other tasks.
partialShutdown();
partialShutdown(/* part_of_full_shutdown */ true);
}
void ReplicatedMergeTreeRestartingThread::setReadonly()
void ReplicatedMergeTreeRestartingThread::setReadonly(bool on_shutdown)
{
bool old_val = false;
if (storage.is_readonly.compare_exchange_strong(old_val, true))
{
readonly_mode_was_set = true;
bool became_readonly = storage.is_readonly.compare_exchange_strong(old_val, true);
/// Do not increment the metric if replica became readonly due to shutdown.
if (became_readonly && on_shutdown)
return;
if (became_readonly)
CurrentMetrics::add(CurrentMetrics::ReadonlyReplica);
}
/// Replica was already readonly, but we should decrement the metric, because we are detaching/dropping table.
if (on_shutdown)
CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);
}
void ReplicatedMergeTreeRestartingThread::setNotReadonly()
{
bool old_val = true;
/// is_readonly is true on startup, but ReadonlyReplica metric is not incremented,
/// because we don't want to change this metric if replication is started successfully.
/// So we should not decrement it when replica stopped being readonly on startup.
if (storage.is_readonly.compare_exchange_strong(old_val, false) && !first_time)
CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);
}
}

View File

@ -36,19 +36,18 @@ private:
Poco::Logger * log;
std::atomic<bool> need_stop {false};
// We need it besides `storage.is_readonly`, because `shutdown()` may be called many times, that way `storage.is_readonly` will not change.
bool readonly_mode_was_set = false;
/// The random data we wrote into `/replicas/me/is_active`.
String active_node_identifier;
BackgroundSchedulePool::TaskHolder task;
Int64 check_period_ms; /// The frequency of checking expiration of session in ZK.
bool first_time = true; /// Activate replica for the first time.
bool startup_completed = false;
void run();
/// Restarts table if needed, returns false if it failed to restart replica.
bool runImpl();
/// Start or stop background threads. Used for partial reinitialization when re-creating a session in ZooKeeper.
bool tryStartup(); /// Returns false if ZooKeeper is not available.
@ -61,10 +60,13 @@ private:
/// If there is an unreachable quorum, and we have a part, then add this replica to the quorum.
void updateQuorumIfWeHavePart();
void partialShutdown();
void partialShutdown(bool part_of_full_shutdown = false);
/// Set readonly mode for table
void setReadonly();
void setReadonly(bool on_shutdown = false);
/// Disable readonly mode for table
void setNotReadonly();
};

View File

@ -194,6 +194,16 @@ zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeper() const
return res;
}
zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeperAndAssertNotReadonly() const
{
/// There's a short period of time after connection loss when new session is created,
/// but replication queue is not reinitialized. We must ensure that table is not readonly anymore
/// before using new ZooKeeper session to write something (except maybe GET_PART) into replication log.
auto res = getZooKeeper();
assertNotReadonly();
return res;
}
static MergeTreePartInfo makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(const String & partition_id)
{
/// NOTE We don't have special log entry type for MOVE PARTITION/ATTACH PARTITION FROM,
@ -335,14 +345,13 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
/// Do not activate the replica. It will be readonly.
LOG_ERROR(log, "No ZooKeeper: table will be in readonly mode.");
is_readonly = true;
has_metadata_in_zookeeper = std::nullopt;
return;
}
if (attach && !current_zookeeper->exists(zookeeper_path + "/metadata"))
{
LOG_WARNING(log, "No metadata in ZooKeeper for {}: table will be in readonly mode.", zookeeper_path);
is_readonly = true;
has_metadata_in_zookeeper = false;
return;
}
@ -354,11 +363,12 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
if (attach && !current_zookeeper->exists(replica_path))
{
LOG_WARNING(log, "No metadata in ZooKeeper for {}: table will be in readonly mode", replica_path);
is_readonly = true;
has_metadata_in_zookeeper = false;
return;
}
has_metadata_in_zookeeper = true;
if (!attach)
{
if (!getDataParts().empty())
@ -2776,7 +2786,7 @@ void StorageReplicatedMergeTree::queueUpdatingTask()
}
try
{
queue.pullLogsToQueue(getZooKeeper(), queue_updating_task->getWatchCallback(), ReplicatedMergeTreeQueue::UPDATE);
queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), queue_updating_task->getWatchCallback(), ReplicatedMergeTreeQueue::UPDATE);
last_queue_update_finish_time.store(time(nullptr));
queue_update_in_progress = false;
}
@ -2985,7 +2995,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
/// (OPTIMIZE queries) can assign new merges.
std::lock_guard merge_selecting_lock(merge_selecting_mutex);
auto zookeeper = getZooKeeper();
auto zookeeper = getZooKeeperAndAssertNotReadonly();
ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper);
@ -3086,7 +3096,7 @@ void StorageReplicatedMergeTree::mutationsFinalizingTask()
try
{
needs_reschedule = queue.tryFinalizeMutations(getZooKeeper());
needs_reschedule = queue.tryFinalizeMutations(getZooKeeperAndAssertNotReadonly());
}
catch (...)
{
@ -4029,7 +4039,8 @@ bool StorageReplicatedMergeTree::fetchExistsPart(const String & part_name, const
void StorageReplicatedMergeTree::startup()
{
if (is_readonly)
/// Do not start replication if ZooKeeper is not configured or there is no metadata in zookeeper
if (!has_metadata_in_zookeeper.has_value() || !*has_metadata_in_zookeeper)
return;
try
@ -4044,7 +4055,10 @@ void StorageReplicatedMergeTree::startup()
/// In this thread replica will be activated.
restarting_thread.start();
/// Wait while restarting_thread finishing initialization
/// Wait while restarting_thread finishing initialization.
/// NOTE It does not mean that replication is actually started after receiving this event.
/// It only means that an attempt to startup replication was made.
/// Table may be still in readonly mode if this attempt failed for any reason.
startup_event.wait();
startBackgroundMovesIfNeeded();
@ -4334,7 +4348,7 @@ bool StorageReplicatedMergeTree::optimize(
return false;
};
auto zookeeper = getZooKeeper();
auto zookeeper = getZooKeeperAndAssertNotReadonly();
UInt64 disk_space = getStoragePolicy()->getMaxUnreservedFreeSpace();
const auto storage_settings_ptr = getSettings();
auto metadata_snapshot = getInMemoryMetadataPtr();
@ -4553,7 +4567,7 @@ void StorageReplicatedMergeTree::alter(
return queryToString(query);
};
const auto zookeeper = getZooKeeper();
const auto zookeeper = getZooKeeperAndAssertNotReadonly();
std::optional<ReplicatedMergeTreeLogEntryData> alter_entry;
std::optional<String> mutation_znode;
@ -4857,7 +4871,6 @@ void StorageReplicatedMergeTree::restoreMetadataInZooKeeper()
LOG_INFO(log, "Created ZK nodes for table");
is_readonly = false;
has_metadata_in_zookeeper = true;
if (is_first_replica)
@ -4875,7 +4888,7 @@ void StorageReplicatedMergeTree::dropPartNoWaitNoThrow(const String & part_name)
if (!is_leader)
throw Exception("DROP PART cannot be done on this replica because it is not a leader", ErrorCodes::NOT_A_LEADER);
zkutil::ZooKeeperPtr zookeeper = getZooKeeper();
zkutil::ZooKeeperPtr zookeeper = getZooKeeperAndAssertNotReadonly();
LogEntry entry;
dropPartImpl(zookeeper, part_name, entry, /*detach=*/ false, /*throw_if_noop=*/ false);
@ -4887,7 +4900,7 @@ void StorageReplicatedMergeTree::dropPart(const String & part_name, bool detach,
if (!is_leader)
throw Exception("DROP PART cannot be done on this replica because it is not a leader", ErrorCodes::NOT_A_LEADER);
zkutil::ZooKeeperPtr zookeeper = getZooKeeper();
zkutil::ZooKeeperPtr zookeeper = getZooKeeperAndAssertNotReadonly();
LogEntry entry;
dropPartImpl(zookeeper, part_name, entry, detach, /*throw_if_noop=*/ true);
@ -4901,7 +4914,7 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool de
if (!is_leader)
throw Exception("DROP PARTITION cannot be done on this replica because it is not a leader", ErrorCodes::NOT_A_LEADER);
zkutil::ZooKeeperPtr zookeeper = getZooKeeper();
zkutil::ZooKeeperPtr zookeeper = getZooKeeperAndAssertNotReadonly();
LogEntry entry;
String partition_id = getPartitionIDFromQuery(partition, query_context);
@ -4924,7 +4937,7 @@ void StorageReplicatedMergeTree::truncate(
if (!is_leader)
throw Exception("TRUNCATE cannot be done on this replica because it is not a leader", ErrorCodes::NOT_A_LEADER);
zkutil::ZooKeeperPtr zookeeper = getZooKeeper();
zkutil::ZooKeeperPtr zookeeper = getZooKeeperAndAssertNotReadonly();
Strings partitions = zookeeper->getChildren(fs::path(zookeeper_path) / "block_numbers");
@ -4948,7 +4961,9 @@ PartitionCommandsResultInfo StorageReplicatedMergeTree::attachPartition(
bool attach_part,
ContextPtr query_context)
{
assertNotReadonly();
/// Allow ATTACH PARTITION on readonly replica when restoring it.
if (!are_restoring_replica)
assertNotReadonly();
PartitionCommandsResultInfo results;
PartsTemporaryRename renamed_parts(*this, "detached/");
@ -5870,7 +5885,7 @@ CancellationCode StorageReplicatedMergeTree::killMutation(const String & mutatio
{
assertNotReadonly();
zkutil::ZooKeeperPtr zookeeper = getZooKeeper();
zkutil::ZooKeeperPtr zookeeper = getZooKeeperAndAssertNotReadonly();
LOG_INFO(log, "Killing mutation {}", mutation_id);
@ -6609,7 +6624,7 @@ void StorageReplicatedMergeTree::movePartitionToShard(
if (zkutil::normalizeZooKeeperPath(zookeeper_path, /* check_starts_with_slash */ true) == zkutil::normalizeZooKeeperPath(to, /* check_starts_with_slash */ true))
throw Exception("Source and destination are the same", ErrorCodes::BAD_ARGUMENTS);
auto zookeeper = getZooKeeper();
auto zookeeper = getZooKeeperAndAssertNotReadonly();
String part_name = partition->as<ASTLiteral &>().value.safeGet<String>();
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
@ -6795,7 +6810,7 @@ bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UI
Stopwatch watch;
/// Let's fetch new log entries firstly
queue.pullLogsToQueue(getZooKeeper(), {}, ReplicatedMergeTreeQueue::SYNC);
queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC);
/// This is significant, because the execution of this task could be delayed at BackgroundPool.
/// And we force it to be executed.

View File

@ -321,10 +321,12 @@ private:
zkutil::ZooKeeperPtr tryGetZooKeeper() const;
zkutil::ZooKeeperPtr getZooKeeper() const;
zkutil::ZooKeeperPtr getZooKeeperAndAssertNotReadonly() const;
void setZooKeeper();
/// If true, the table is offline and can not be written to it.
std::atomic_bool is_readonly {false};
/// This flag is managed by RestartingThread.
std::atomic_bool is_readonly {true};
/// If nullopt - ZooKeeper is not available, so we don't know if there is table metadata.
/// If false - ZooKeeper is available, but there is no table metadata. It's safe to drop table in this case.
std::optional<bool> has_metadata_in_zookeeper;

View File

@ -81,5 +81,4 @@ check_replication_consistency "alter_table" "count(), sum(a), sum(b), round(sum(
$CLICKHOUSE_CLIENT -n -q "DROP TABLE alter_table0;" 2> >(grep -F -v 'is already started to be removing by another replica right now') &
$CLICKHOUSE_CLIENT -n -q "DROP TABLE alter_table1;" 2> >(grep -F -v 'is already started to be removing by another replica right now') &
wait

View File

@ -107,4 +107,5 @@ check_replication_consistency "alter_table_" "count(), sum(a), sum(b), round(sum
for i in {0..9}; do
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS alter_table_$i" 2>&1 | grep "was not completely removed from ZooKeeper" &
done
wait

View File

@ -80,7 +80,6 @@ timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
wait
echo "Finishing alters"

View File

@ -126,4 +126,5 @@ for ((i=0; i<16; i++)) do
$CLICKHOUSE_CLIENT -q "DROP TABLE dst_$i" 2>&1| grep -Fv "is already started to be removing" &
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS src_$i" 2>&1| grep -Fv "is already started to be removing" &
done
wait

View File

@ -67,5 +67,4 @@ done
$CLICKHOUSE_CLIENT --query "SHOW CREATE TABLE concurrent_mutate_kill"
$CLICKHOUSE_CLIENT --query "OPTIMIZE TABLE concurrent_mutate_kill FINAL"
$CLICKHOUSE_CLIENT --query "SELECT sum(value) FROM concurrent_mutate_kill"
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_mutate_kill"

View File

@ -74,5 +74,4 @@ $CLICKHOUSE_CLIENT --query "SELECT COUNT() > 0 FROM system.part_log where table
for i in $(seq 1 $NUM_REPLICAS); do
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS ttl_table$i" &
done
wait