Try to make MergeTreeRestartingThread more reactive in case of expired sessions (#41092)

This commit is contained in:
Nikita Mikhaylov 2022-09-23 14:40:04 +02:00 committed by GitHub
parent b1d8593d18
commit dbcba1490c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 214 additions and 3 deletions

View File

@ -0,0 +1,48 @@
#include <Common/EventNotifier.h>
#include <Common/Exception.h>
#include <boost/functional/hash.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
std::unique_ptr<EventNotifier> EventNotifier::event_notifier;
EventNotifier & EventNotifier::init()
{
if (event_notifier)
throw Exception(ErrorCodes::LOGICAL_ERROR, "EventNotifier is initialized twice. This is a bug.");
event_notifier = std::make_unique<EventNotifier>();
return *event_notifier;
}
EventNotifier & EventNotifier::instance()
{
if (!event_notifier)
throw Exception(ErrorCodes::LOGICAL_ERROR, "EventNotifier is not initialized. This is a bug.");
return *event_notifier;
}
void EventNotifier::shutdown()
{
if (event_notifier)
event_notifier.reset();
}
size_t EventNotifier::calculateIdentifier(size_t a, size_t b)
{
size_t result = 0;
boost::hash_combine(result, a);
boost::hash_combine(result, b);
return result;
}
}

View File

@ -0,0 +1,92 @@
#pragma once
#include <vector>
#include <mutex>
#include <functional>
#include <set>
#include <map>
#include <memory>
#include <utility>
#include <iostream>
#include <base/types.h>
#include <Common/HashTable/Hash.h>
namespace DB
{
class EventNotifier
{
public:
struct Handler
{
Handler(
EventNotifier & parent_,
size_t event_id_,
size_t callback_id_)
: parent(parent_)
, event_id(event_id_)
, callback_id(callback_id_)
{}
~Handler()
{
std::lock_guard lock(parent.mutex);
parent.callback_table[event_id].erase(callback_id);
parent.storage.erase(callback_id);
}
private:
EventNotifier & parent;
size_t event_id;
size_t callback_id;
};
using HandlerPtr = std::shared_ptr<Handler>;
static EventNotifier & init();
static EventNotifier & instance();
static void shutdown();
template <typename EventType, typename Callback>
[[ nodiscard ]] HandlerPtr subscribe(EventType event, Callback && callback)
{
std::lock_guard lock(mutex);
auto event_id = DefaultHash64(event);
auto callback_id = calculateIdentifier(event_id, ++counter);
callback_table[event_id].insert(callback_id);
storage[callback_id] = std::forward<Callback>(callback);
return std::make_shared<Handler>(*this, event_id, callback_id);
}
template <typename EventType>
void notify(EventType event)
{
std::lock_guard lock(mutex);
for (const auto & identifier : callback_table[DefaultHash64(event)])
storage[identifier]();
}
private:
// To move boost include for .h file
static size_t calculateIdentifier(size_t a, size_t b);
using CallbackType = std::function<void()>;
using CallbackStorage = std::map<size_t, CallbackType>;
using EventToCallbacks = std::map<size_t, std::set<size_t>>;
std::mutex mutex;
EventToCallbacks callback_table;
CallbackStorage storage;
size_t counter{0};
static std::unique_ptr<EventNotifier> event_notifier;
};
}

View File

@ -139,12 +139,14 @@ void ZooKeeper::init(ZooKeeperArgs args_)
}
}
ZooKeeper::ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
: zk_log(std::move(zk_log_))
{
zk_log = std::move(zk_log_);
init(args_);
}
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
: zk_log(std::move(zk_log_))
{

View File

@ -1,15 +1,16 @@
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperImpl.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Exception.h>
#include <Common/EventNotifier.h>
#include <Common/logger_useful.h>
#include <Common/ProfileEvents.h>
#include <Common/setThreadName.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Common/logger_useful.h>
#include <base/getThreadId.h>
#include <Common/config.h>
@ -874,7 +875,11 @@ void ZooKeeper::finalize(bool error_send, bool error_receive, const String & rea
/// No new requests will appear in queue after finish()
bool was_already_finished = requests_queue.finish();
if (!was_already_finished)
{
active_session_metric_increment.destroy();
/// Notify all subscribers (ReplicatedMergeTree tables) about expired session
EventNotifier::instance().notify(Error::ZSESSIONEXPIRED);
}
};
try

View File

@ -0,0 +1,53 @@
#include <gtest/gtest.h>
#include <Common/EventNotifier.h>
#include <Common/ZooKeeper/IKeeper.h>
TEST(EventNotifier, SimpleTest)
{
using namespace DB;
size_t result = 1;
EventNotifier::init();
auto handler3 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 3; });
{
auto handler5 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 5; });
}
auto handler7 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 7; });
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
ASSERT_EQ(result, 21);
result = 1;
handler3.reset();
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
ASSERT_EQ(result, 7);
auto handler11 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 11; });
result = 1;
handler7.reset();
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
ASSERT_EQ(result, 11);
EventNotifier::HandlerPtr handler13;
{
handler13 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 13; });
}
result = 1;
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
ASSERT_EQ(result, 143);
result = 1;
handler11.reset();
handler13.reset();
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
ASSERT_EQ(result, 1);
EventNotifier::shutdown();
}

View File

@ -8,6 +8,7 @@
#include <Poco/Util/Application.h>
#include <Common/Macros.h>
#include <Common/escapeForFileName.h>
#include <Common/EventNotifier.h>
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
@ -510,6 +511,7 @@ void Context::initGlobal()
assert(!global_context_instance);
global_context_instance = shared_from_this();
DatabaseCatalog::init(shared_from_this());
EventNotifier::init();
}
SharedContextHolder Context::createShared()

View File

@ -4185,6 +4185,11 @@ void StorageReplicatedMergeTree::startupImpl()
/// In this thread replica will be activated.
restarting_thread.start();
/// And this is just a callback
session_expired_callback_handler = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [this]()
{
restarting_thread.start();
});
/// Wait while restarting_thread finishing initialization.
/// NOTE It does not mean that replication is actually started after receiving this event.
@ -4228,6 +4233,8 @@ void StorageReplicatedMergeTree::shutdown()
if (shutdown_called.exchange(true))
return;
session_expired_callback_handler.reset();
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
fetcher.blocker.cancelForever();
merger_mutator.merges_blocker.cancelForever();

View File

@ -29,6 +29,7 @@
#include <Common/randomSeed.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/Throttler.h>
#include <Common/EventNotifier.h>
#include <base/defines.h>
#include <Core/BackgroundSchedulePool.h>
#include <QueryPipeline/Pipe.h>
@ -453,6 +454,7 @@ private:
/// A thread that processes reconnection to ZooKeeper when the session expires.
ReplicatedMergeTreeRestartingThread restarting_thread;
EventNotifier::HandlerPtr session_expired_callback_handler;
/// A thread that attaches the table using ZooKeeper
std::optional<ReplicatedMergeTreeAttachThread> attach_thread;