Merge pull request #26972 from ClickHouse/fix_zookeeper_log_initialization

Fix system.zookeeper_log initialization
This commit is contained in:
alesapin 2021-07-30 12:24:19 +03:00 committed by GitHub
commit 444ad67663
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 56 additions and 9 deletions

View File

@ -1094,6 +1094,7 @@ if (ThreadFuzzer::instance().isEffective())
loadMetadataSystem(global_context);
/// After attaching system databases we can initialize system log.
global_context->initializeSystemLogs();
global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();
auto & database_catalog = DatabaseCatalog::instance();
/// After the system database is created, attach virtual system tables (in addition to query_log and part_log)
attachSystemTablesServer(*database_catalog.getSystemDatabase(), has_zookeeper);

View File

@ -135,8 +135,10 @@ void ZooKeeper::init(const std::string & implementation_, const Strings & hosts_
}
ZooKeeper::ZooKeeper(const std::string & hosts_string, const std::string & identity_, int32_t session_timeout_ms_,
int32_t operation_timeout_ms_, const std::string & chroot_, const std::string & implementation_)
int32_t operation_timeout_ms_, const std::string & chroot_, const std::string & implementation_,
std::shared_ptr<DB::ZooKeeperLog> zk_log_)
{
zk_log = std::move(zk_log_);
Strings hosts_strings;
splitInto<','>(hosts_strings, hosts_string);
@ -144,8 +146,10 @@ ZooKeeper::ZooKeeper(const std::string & hosts_string, const std::string & ident
}
ZooKeeper::ZooKeeper(const Strings & hosts_, const std::string & identity_, int32_t session_timeout_ms_,
int32_t operation_timeout_ms_, const std::string & chroot_, const std::string & implementation_)
int32_t operation_timeout_ms_, const std::string & chroot_, const std::string & implementation_,
std::shared_ptr<DB::ZooKeeperLog> zk_log_)
{
zk_log = std::move(zk_log_);
init(implementation_, hosts_, identity_, session_timeout_ms_, operation_timeout_ms_, chroot_);
}
@ -729,7 +733,7 @@ bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition &
ZooKeeperPtr ZooKeeper::startNewSession() const
{
return std::make_shared<ZooKeeper>(hosts, identity, session_timeout_ms, operation_timeout_ms, chroot, implementation);
return std::make_shared<ZooKeeper>(hosts, identity, session_timeout_ms, operation_timeout_ms, chroot, implementation, zk_log);
}
@ -1020,6 +1024,14 @@ void ZooKeeper::finalize()
impl->finalize();
}
void ZooKeeper::setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_)
{
zk_log = std::move(zk_log_);
if (auto * zk = dynamic_cast<Coordination::ZooKeeper *>(impl.get()))
zk->setZooKeeperLog(zk_log);
}
size_t KeeperMultiException::getFailedOpIndex(Coordination::Error exception_code, const Coordination::Responses & responses)
{
if (responses.empty())

View File

@ -56,13 +56,15 @@ public:
int32_t session_timeout_ms_ = Coordination::DEFAULT_SESSION_TIMEOUT_MS,
int32_t operation_timeout_ms_ = Coordination::DEFAULT_OPERATION_TIMEOUT_MS,
const std::string & chroot_ = "",
const std::string & implementation_ = "zookeeper");
const std::string & implementation_ = "zookeeper",
std::shared_ptr<DB::ZooKeeperLog> zk_log_ = nullptr);
ZooKeeper(const Strings & hosts_, const std::string & identity_ = "",
int32_t session_timeout_ms_ = Coordination::DEFAULT_SESSION_TIMEOUT_MS,
int32_t operation_timeout_ms_ = Coordination::DEFAULT_OPERATION_TIMEOUT_MS,
const std::string & chroot_ = "",
const std::string & implementation_ = "zookeeper");
const std::string & implementation_ = "zookeeper",
std::shared_ptr<DB::ZooKeeperLog> zk_log_ = nullptr);
/** Config of the form:
<zookeeper>
@ -273,6 +275,8 @@ public:
void finalize();
void setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_);
private:
friend class EphemeralNodeHolder;

View File

@ -315,9 +315,10 @@ ZooKeeper::ZooKeeper(
std::shared_ptr<ZooKeeperLog> zk_log_)
: root_path(root_path_),
session_timeout(session_timeout_),
operation_timeout(std::min(operation_timeout_, session_timeout_)),
zk_log(std::move(zk_log_))
operation_timeout(std::min(operation_timeout_, session_timeout_))
{
std::atomic_store(&zk_log, std::move(zk_log_));
if (!root_path.empty())
{
if (root_path.back() == '/')
@ -1212,9 +1213,16 @@ void ZooKeeper::close()
}
void ZooKeeper::setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_)
{
/// logOperationIfNeeded(...) uses zk_log and can be called from different threads, so we have to use atomic shared_ptr
std::atomic_store(&zk_log, std::move(zk_log_));
}
void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response, bool finalize)
{
if (!zk_log)
auto maybe_zk_log = std::atomic_load(&zk_log);
if (!maybe_zk_log)
return;
ZooKeeperLogElement::Type log_type = ZooKeeperLogElement::UNKNOWN;
@ -1249,7 +1257,7 @@ void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const
elem.event_time = event_time;
elem.address = socket.peerAddress();
elem.session_id = session_id;
zk_log->add(elem);
maybe_zk_log->add(elem);
}
}

View File

@ -189,6 +189,8 @@ public:
void finalize() override { finalize(false, false); }
void setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_);
private:
String root_path;
ACLs default_acls;

View File

@ -1752,6 +1752,24 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
return shared->zookeeper;
}
void Context::setSystemZooKeeperLogAfterInitializationIfNeeded()
{
/// It can be nearly impossible to understand in which order global objects are initialized on server startup.
/// If getZooKeeper() is called before initializeSystemLogs(), then zkutil::ZooKeeper gets nullptr
/// instead of pointer to system table and it logs nothing.
/// This method explicitly sets correct pointer to system log after its initialization.
/// TODO get rid of this if possible
std::lock_guard lock(shared->zookeeper_mutex);
if (!shared->system_logs || !shared->system_logs->zookeeper_log)
return;
if (shared->zookeeper)
shared->zookeeper->setZooKeeperLog(shared->system_logs->zookeeper_log);
for (auto & zk : shared->auxiliary_zookeepers)
zk.second->setZooKeeperLog(shared->system_logs->zookeeper_log);
}
void Context::initializeKeeperStorageDispatcher() const
{

View File

@ -649,6 +649,8 @@ public:
// Reload Zookeeper
void reloadZooKeeperIfChanged(const ConfigurationPtr & config) const;
void setSystemZooKeeperLogAfterInitializationIfNeeded();
/// Create a cache of uncompressed blocks of specified size. This can be done only once.
void setUncompressedCache(size_t max_size_in_bytes);
std::shared_ptr<UncompressedCache> getUncompressedCache() const;