Improve recovery of ReplicatedAccessStorage after errors.

This commit is contained in:
Vitaly Baranov 2022-08-09 11:43:23 +02:00
parent 8f9f5c69da
commit 646cd55690
4 changed files with 132 additions and 49 deletions

View File

@ -45,10 +45,10 @@ public:
/// Reloads and updates entities in this storage. This function is used to implement SYSTEM RELOAD CONFIG.
virtual void reload() {}
/// Starts periodic reloading and update of entities in this storage.
/// Starts periodic reloading and updating of entities in this storage.
virtual void startPeriodicReloading() {}
/// Stops periodic reloading and update of entities in this storage.
/// Stops periodic reloading and updating of entities in this storage.
virtual void stopPeriodicReloading() {}
/// Returns the identifiers of all the entities of a specified type contained in the storage.

View File

@ -119,7 +119,7 @@ bool MemoryAccessStorage::insertNoLock(const UUID & id, const AccessEntityPtr &
assert(replace_if_exists);
removeNoLock(id_by_name, /* throw_if_not_exists = */ false);
}
if (id_collision)
{
assert(replace_if_exists);

View File

@ -24,8 +24,8 @@ namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NO_ZOOKEEPER;
extern const int BAD_ARGUMENTS;
extern const int NO_ZOOKEEPER;
}
static UUID parseUUID(const String & text)
@ -60,7 +60,7 @@ ReplicatedAccessStorage::ReplicatedAccessStorage(
if (zookeeper_path.front() != '/')
zookeeper_path = "/" + zookeeper_path;
initializeZookeeper();
initZooKeeperIfNeeded();
}
ReplicatedAccessStorage::~ReplicatedAccessStorage()
@ -122,15 +122,14 @@ bool ReplicatedAccessStorage::insertWithID(const UUID & id, const AccessEntityPt
const String & name = new_entity->getName();
LOG_DEBUG(getLogger(), "Inserting entity of type {} named {} with id {}", type_info.name, name, toString(id));
auto zookeeper = get_zookeeper();
auto zookeeper = getZooKeeper();
bool ok = false;
retryOnZooKeeperUserError(10, [&]{ ok = insertZooKeeper(zookeeper, id, new_entity, replace_if_exists, throw_if_exists); });
if (!ok)
return false;
std::lock_guard lock{mutex};
refreshEntityNoLock(zookeeper, id);
refreshEntity(zookeeper, id);
return true;
}
@ -222,7 +221,7 @@ bool ReplicatedAccessStorage::removeImpl(const UUID & id, bool throw_if_not_exis
{
LOG_DEBUG(getLogger(), "Removing entity {}", toString(id));
auto zookeeper = get_zookeeper();
auto zookeeper = getZooKeeper();
bool ok = false;
retryOnZooKeeperUserError(10, [&] { ok = removeZooKeeper(zookeeper, id, throw_if_not_exists); });
@ -274,15 +273,14 @@ bool ReplicatedAccessStorage::updateImpl(const UUID & id, const UpdateFunc & upd
{
LOG_DEBUG(getLogger(), "Updating entity {}", toString(id));
auto zookeeper = get_zookeeper();
auto zookeeper = getZooKeeper();
bool ok = false;
retryOnZooKeeperUserError(10, [&] { ok = updateZooKeeper(zookeeper, id, update_func, throw_if_not_exists); });
if (!ok)
return false;
std::lock_guard lock{mutex};
refreshEntityNoLock(zookeeper, id);
refreshEntity(zookeeper, id);
return true;
}
@ -350,45 +348,82 @@ void ReplicatedAccessStorage::runWatchingThread()
{
LOG_DEBUG(getLogger(), "Started watching thread");
setThreadName("ReplACLWatch");
while (watching)
{
bool refreshed = false;
try
{
if (!initialized)
initializeZookeeper();
if (refresh())
changes_notifier.sendNotifications();
initZooKeeperIfNeeded();
refreshed = refresh();
}
catch (...)
{
tryLogCurrentException(getLogger(), "Unexpected error, will try to restart worker thread:");
tryLogCurrentException(getLogger(), "Will try to restart watching thread after error");
resetAfterError();
sleepForSeconds(5);
continue;
}
if (refreshed)
{
try
{
changes_notifier.sendNotifications();
}
catch (...)
{
tryLogCurrentException(getLogger(), "Error while sending notifications");
}
}
}
}
/// Resets the storage state after a failure.
/// runWatchingThread() calls this from its catch-all handler before sleeping and retrying.
void ReplicatedAccessStorage::resetAfterError()
{
/// Mark the storage as not initialized.
initialized = false;
/// Drain every id that is still pending in the watch queue.
UUID id;
while (watched_queue->tryPop(id)) {}
/// Make watching thread reinitialize ZooKeeper and reread everything.
std::lock_guard lock{cached_zookeeper_mutex};
cached_zookeeper = nullptr;
}
void ReplicatedAccessStorage::initializeZookeeper()
void ReplicatedAccessStorage::initZooKeeperIfNeeded()
{
assert(!initialized);
auto zookeeper = get_zookeeper();
getZooKeeper();
}
if (!zookeeper)
throw Exception("Can't have Replicated access without ZooKeeper", ErrorCodes::NO_ZOOKEEPER);
/// Locking wrapper around getZooKeeperNoLock(): takes `cached_zookeeper_mutex`
/// and returns whatever session the no-lock variant hands back.
zkutil::ZooKeeperPtr ReplicatedAccessStorage::getZooKeeper()
{
    /// The guard is held until the session pointer has been obtained.
    const std::lock_guard<std::mutex> guard(cached_zookeeper_mutex);
    zkutil::ZooKeeperPtr session = getZooKeeperNoLock();
    return session;
}
createRootNodes(zookeeper);
/// Returns the cached ZooKeeper session, creating a new one first when nothing is cached
/// or the cached session reports expired().
/// Must be called with `cached_zookeeper_mutex` held.
/// Throws an Exception with code NO_ZOOKEEPER if get_zookeeper() yields a null pointer.
zkutil::ZooKeeperPtr ReplicatedAccessStorage::getZooKeeperNoLock()
{
if (!cached_zookeeper || cached_zookeeper->expired())
{
auto zookeeper = get_zookeeper();
if (!zookeeper)
throw Exception("Can't have Replicated access without ZooKeeper", ErrorCodes::NO_ZOOKEEPER);
/// NOTE(review): this one-argument call does not match the two-argument
/// refreshEntities(zookeeper, all) form used a few lines below - confirm it is not a leftover.
refreshEntities(zookeeper);
/// It's possible that we connected to a different [Zoo]Keeper instance,
/// so we may read slightly stale state.
zookeeper->sync(zookeeper_path);
/// NOTE(review): `initialized` is also cleared in resetAfterError(); confirm the flag is still needed here.
initialized = true;
createRootNodes(zookeeper);
refreshEntities(zookeeper, /* all= */ true);
/// The session is cached only after the steps above have completed without throwing.
cached_zookeeper = zookeeper;
}
return cached_zookeeper;
}
void ReplicatedAccessStorage::reload()
{
#if 0
/// Reinitialize ZooKeeper and reread everything.
std::lock_guard lock{cached_zookeeper_mutex};
cached_zookeeper = nullptr;
getZooKeeperNoLock();
#endif
}
void ReplicatedAccessStorage::createRootNodes(const zkutil::ZooKeeperPtr & zookeeper)
@ -410,10 +445,10 @@ bool ReplicatedAccessStorage::refresh()
if (!watched_queue->tryPop(id, /* timeout_ms: */ 10000))
return false;
auto zookeeper = get_zookeeper();
auto zookeeper = getZooKeeper();
if (id == UUIDHelpers::Nil)
refreshEntities(zookeeper);
refreshEntities(zookeeper, /* all= */ false);
else
refreshEntity(zookeeper, id);
@ -421,10 +456,16 @@ bool ReplicatedAccessStorage::refresh()
}
void ReplicatedAccessStorage::refreshEntities(const zkutil::ZooKeeperPtr & zookeeper)
void ReplicatedAccessStorage::refreshEntities(const zkutil::ZooKeeperPtr & zookeeper, bool all)
{
LOG_DEBUG(getLogger(), "Refreshing entities list");
if (all)
{
/// It doesn't make sense to keep the queue because we will reread everything in this function.
watched_queue->clear();
}
const String zookeeper_uuids_path = zookeeper_path + "/uuid";
auto watch_entities_list = [watched_queue = watched_queue](const Coordination::WatchResponse &)
{
@ -439,47 +480,84 @@ void ReplicatedAccessStorage::refreshEntities(const zkutil::ZooKeeperPtr & zooke
entity_uuids.emplace_back(parseUUID(entity_uuid_str));
std::lock_guard lock{mutex};
memory_storage.removeAllExcept(entity_uuids);
for (const auto & uuid : entity_uuids)
if (all)
{
if (!initialized || !memory_storage.exists(uuid))
refreshEntityNoLock(zookeeper, uuid);
/// all=true means we read & parse all access entities from ZooKeeper.
std::vector<std::pair<UUID, AccessEntityPtr>> entities;
for (const auto & uuid : entity_uuids)
{
if (auto entity = tryReadEntityFromZooKeeper(zookeeper, uuid))
entities.emplace_back(uuid, entity);
}
memory_storage.setAll(entities);
}
else
{
/// all=false means we read & parse only new access entities from ZooKeeper.
memory_storage.removeAllExcept(entity_uuids);
for (const auto & uuid : entity_uuids)
{
if (!memory_storage.exists(uuid))
refreshEntityNoLock(zookeeper, uuid);
}
}
LOG_DEBUG(getLogger(), "Refreshing entities list finished");
}
/// Refreshes the in-memory copy of one entity from ZooKeeper.
/// Reads the entity via tryReadEntityFromZooKeeper() before taking `mutex`,
/// then applies the result while holding it.
void ReplicatedAccessStorage::refreshEntity(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id)
{
LOG_DEBUG(getLogger(), "Refreshing entity {}", toString(id));
/// The ZooKeeper read happens here, before `mutex` is taken.
auto entity = tryReadEntityFromZooKeeper(zookeeper, id);
std::lock_guard lock{mutex};
/// NOTE(review): refreshEntityNoLock() reads the same entity again and sets/removes it itself,
/// after which the branch below applies `entity` once more - this looks redundant;
/// confirm whether this call is a leftover.
refreshEntityNoLock(zookeeper, id);
/// A null `entity` leads to removal of the entity with this id.
if (entity)
setEntityNoLock(id, entity);
else
removeEntityNoLock(id);
}
/// Re-reads a single entity from ZooKeeper and mirrors the outcome into the in-memory state:
/// a non-null result is stored with setEntityNoLock(), a null result removes the entity.
/// The caller must already hold `mutex`.
void ReplicatedAccessStorage::refreshEntityNoLock(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id)
{
    LOG_DEBUG(getLogger(), "Refreshing entity {}", toString(id));

    const AccessEntityPtr fetched = tryReadEntityFromZooKeeper(zookeeper, id);
    if (!fetched)
    {
        /// Null result: drop whatever is stored for this id.
        removeEntityNoLock(id);
        return;
    }

    setEntityNoLock(id, fetched);
}
AccessEntityPtr ReplicatedAccessStorage::tryReadEntityFromZooKeeper(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id) const
{
const auto watch_entity = [watched_queue = watched_queue, id](const Coordination::WatchResponse & response)
{
if (response.type == Coordination::Event::CHANGED)
[[maybe_unused]] bool push_result = watched_queue->push(id);
};
Coordination::Stat entity_stat;
const String entity_path = zookeeper_path + "/uuid/" + toString(id);
String entity_definition;
const bool exists = zookeeper->tryGetWatch(entity_path, entity_definition, &entity_stat, watch_entity);
if (exists)
bool exists = zookeeper->tryGetWatch(entity_path, entity_definition, &entity_stat, watch_entity);
if (!exists)
return nullptr;
try
{
const AccessEntityPtr entity = deserializeAccessEntity(entity_definition, entity_path);
setEntityNoLock(id, entity);
return deserializeAccessEntity(entity_definition, entity_path);
}
else
catch (...)
{
removeEntityNoLock(id);
tryLogCurrentException(getLogger(), "Error while reading the definition of " + toString(id));
return nullptr;
}
}
void ReplicatedAccessStorage::setEntityNoLock(const UUID & id, const AccessEntityPtr & entity)
{
LOG_DEBUG(getLogger(), "Setting id {} to entity named {}", toString(id), entity->getName());

View File

@ -25,6 +25,7 @@ public:
const char * getStorageType() const override { return STORAGE_TYPE; }
void reload() override;
void startPeriodicReloading() override { startWatchingThread(); }
void stopPeriodicReloading() override { stopWatchingThread(); }
@ -36,9 +37,10 @@ public:
private:
String zookeeper_path;
zkutil::GetZooKeeper get_zookeeper;
const zkutil::GetZooKeeper get_zookeeper;
std::atomic<bool> initialized = false;
zkutil::ZooKeeperPtr cached_zookeeper TSA_GUARDED_BY(cached_zookeeper_mutex);
std::mutex cached_zookeeper_mutex;
std::atomic<bool> watching = false;
ThreadFromGlobalPool watching_thread;
@ -53,7 +55,9 @@ private:
bool removeZooKeeper(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id, bool throw_if_not_exists);
bool updateZooKeeper(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id, const UpdateFunc & update_func, bool throw_if_not_exists);
void initializeZookeeper();
void initZooKeeperIfNeeded();
zkutil::ZooKeeperPtr getZooKeeper();
zkutil::ZooKeeperPtr getZooKeeperNoLock() TSA_REQUIRES(cached_zookeeper_mutex);
void createRootNodes(const zkutil::ZooKeeperPtr & zookeeper);
void startWatchingThread();
@ -63,10 +67,11 @@ private:
void resetAfterError();
bool refresh();
void refreshEntities(const zkutil::ZooKeeperPtr & zookeeper);
void refreshEntities(const zkutil::ZooKeeperPtr & zookeeper, bool all);
void refreshEntity(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id);
void refreshEntityNoLock(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id) TSA_REQUIRES(mutex);
AccessEntityPtr tryReadEntityFromZooKeeper(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id) const;
void setEntityNoLock(const UUID & id, const AccessEntityPtr & entity) TSA_REQUIRES(mutex);
void removeEntityNoLock(const UUID & id) TSA_REQUIRES(mutex);