Address PR comments

This commit is contained in:
Antonio Andelic 2022-08-24 17:44:14 +00:00
parent da0d6056f4
commit 797e21761a
5 changed files with 29 additions and 52 deletions

View File

@ -739,7 +739,6 @@ public:
ReplicatedFetchList & getReplicatedFetchList();
const ReplicatedFetchList & getReplicatedFetchList() const;
/// If the current session is expired at the time of the call, synchronously creates and returns a new session with the startNewSession() call.
/// If no ZooKeeper configured, throws an exception.
std::shared_ptr<zkutil::ZooKeeper> getZooKeeper() const;

View File

@ -1,6 +1,6 @@
#include <Storages/MergeTree/ReplicatedMergeTreeAttachThread.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include "Common/ZooKeeper/IKeeper.h"
#include <Common/ZooKeeper/IKeeper.h>
namespace DB
{
@ -48,22 +48,16 @@ void ReplicatedMergeTreeAttachThread::run()
catch (const Exception & e)
{
if (const auto * coordination_exception = dynamic_cast<const Coordination::Exception *>(&e))
{
std::array retriable_errors{
Coordination::Error::ZCONNECTIONLOSS, Coordination::Error::ZSESSIONEXPIRED, Coordination::Error::ZOPERATIONTIMEOUT};
needs_retry = std::any_of(
retriable_errors.begin(), retriable_errors.end(), [&](const auto error) { return error == coordination_exception->code; });
}
needs_retry = Coordination::isHardwareError(coordination_exception->code);
if (!needs_retry)
if (needs_retry)
{
LOG_ERROR(log, "Initialization failed, table will remain readonly. Error: {}", e.message());
std::lock_guard lock(storage.initialization_mutex);
storage.initialization_done = true;
LOG_ERROR(log, "Initialization failed. Error: {}", e.message());
}
else
{
LOG_ERROR(log, "Initialization failed. Error: {}", e.message());
LOG_ERROR(log, "Initialization failed, table will remain readonly. Error: {}", e.message());
storage.initialization_done = true;
}
}
@ -160,14 +154,7 @@ void ReplicatedMergeTreeAttachThread::runImpl()
void ReplicatedMergeTreeAttachThread::finalizeInitialization() TSA_NO_THREAD_SAFETY_ANALYSIS
{
std::unique_lock lock(storage.initialization_mutex);
if (storage.startup_called)
{
lock.unlock();
storage.startupImpl();
lock.lock();
}
storage.initialization_done = true;
LOG_INFO(log, "Table is initialized");
}

View File

@ -104,8 +104,7 @@ void ReplicatedMergeTreeRestartingThread::run()
bool ReplicatedMergeTreeRestartingThread::runImpl()
{
auto zookeeper = storage.tryGetZooKeeper();
if (!storage.is_readonly && zookeeper && !zookeeper->expired())
if (!storage.is_readonly && !storage.getZooKeeper()->expired())
return true;
if (first_time)

View File

@ -277,7 +277,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
, cleanup_thread(*this)
, part_check_thread(*this)
, restarting_thread(*this)
, attach_thread(*this)
, part_moves_between_shards_orchestrator(*this)
, renaming_restrictions(renaming_restrictions_)
, replicated_fetches_pool_size(getContext()->getSettingsRef().background_fetches_pool_size)
@ -382,10 +381,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
if (attach)
{
LOG_INFO(log, "Table will be in readonly mode until initialization is finished");
attach_thread.setSkipSanityChecks(skip_sanity_checks);
attach_thread.start();
attach_thread.waitFirstTry();
attach_thread.emplace(*this);
attach_thread->setSkipSanityChecks(skip_sanity_checks);
return;
}
@ -572,6 +569,7 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
void StorageReplicatedMergeTree::createNewZooKeeperNodes()
{
auto zookeeper = getZooKeeper();
std::vector<zkutil::ZooKeeper::FutureCreate> futures;
/// These 4 nodes used to be created in createNewZookeeperNodes() and they were moved to createTable()
@ -819,16 +817,13 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada
void StorageReplicatedMergeTree::drop()
{
assert(shutdown_called);
/// There is also the case when user has configured ClickHouse to wrong ZooKeeper cluster
/// or metadata of staled replica were removed manually,
/// in this case, has_metadata_in_zookeeper = false, and we also permit to drop the table.
bool maybe_has_metadata_in_zookeeper = false;
{
std::lock_guard lock{initialization_mutex};
maybe_has_metadata_in_zookeeper = !initialization_done || !has_metadata_in_zookeeper.has_value() || *has_metadata_in_zookeeper;
}
bool maybe_has_metadata_in_zookeeper = !has_metadata_in_zookeeper.has_value() || *has_metadata_in_zookeeper;
if (maybe_has_metadata_in_zookeeper)
{
/// Table can be shut down, restarting thread is not active
@ -843,7 +838,6 @@ void StorageReplicatedMergeTree::drop()
if (!zookeeper)
throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY);
shutdown();
dropReplica(zookeeper, zookeeper_path, replica_name, log, getSettings());
}
@ -1057,6 +1051,7 @@ bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeper
void StorageReplicatedMergeTree::checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot)
{
auto zookeeper = getZooKeeper();
ReplicatedMergeTreeTableMetadata old_metadata(*this, metadata_snapshot);
Coordination::Stat metadata_stat;
@ -1113,6 +1108,7 @@ static time_t tryGetPartCreateTime(zkutil::ZooKeeperPtr & zookeeper, const Strin
void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
{
auto zookeeper = getZooKeeper();
Strings expected_parts_vec = zookeeper->getChildren(fs::path(replica_path) / "parts");
/// Parts in ZK.
@ -1252,6 +1248,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
void StorageReplicatedMergeTree::syncPinnedPartUUIDs()
{
auto zookeeper = getZooKeeper();
Coordination::Stat stat;
String s = zookeeper->get(zookeeper_path + "/pinned_part_uuids", &stat);
@ -3448,7 +3445,7 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
void StorageReplicatedMergeTree::startBeingLeader()
{
auto zookeeper = getZooKeeper();
assert(zookeeper);
if (!getSettings()->replicated_can_become_leader)
{
LOG_INFO(log, "Will not enter leader election because replicated_can_become_leader=0");
@ -4156,14 +4153,12 @@ DataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
void StorageReplicatedMergeTree::startup()
{
if (attach_thread)
{
std::lock_guard lock(initialization_mutex);
if (!initialization_done)
{
startup_called = true;
attach_thread->start();
attach_thread->waitFirstTry();
return;
}
}
startupImpl();
}
@ -4236,7 +4231,8 @@ void StorageReplicatedMergeTree::shutdown()
mutations_finalizing_task->deactivate();
stopBeingLeader();
attach_thread.shutdown();
if (attach_thread)
attach_thread->shutdown();
restarting_thread.shutdown();
background_operations_assignee.finish();
part_moves_between_shards_orchestrator.shutdown();
@ -4977,11 +4973,8 @@ void StorageReplicatedMergeTree::restoreMetadataInZooKeeper()
{
LOG_INFO(log, "Restoring replica metadata");
{
std::lock_guard lock(initialization_mutex);
if (!initialization_done)
throw Exception(ErrorCodes::NOT_INITIALIZED, "Table is not initialized yet");
}
if (!is_readonly)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica must be readonly");

View File

@ -447,13 +447,12 @@ private:
/// A thread that processes reconnection to ZooKeeper when the session expires.
ReplicatedMergeTreeRestartingThread restarting_thread;
ReplicatedMergeTreeAttachThread attach_thread;
/// A thread that attaches the table using ZooKeeper
std::optional<ReplicatedMergeTreeAttachThread> attach_thread;
PartMovesBetweenShardsOrchestrator part_moves_between_shards_orchestrator;
mutable std::mutex initialization_mutex;
TSA_GUARDED_BY(initialization_mutex) bool initialization_done{false};
TSA_GUARDED_BY(initialization_mutex) bool startup_called{false};
std::atomic<bool> initialization_done{false};
/// True if replica was created for existing table with fixed granularity
bool other_replicas_fixed_granularity = false;