mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 16:42:05 +00:00
fix system.zookeeper_log initialization
This commit is contained in:
parent
9a20b9f0c6
commit
47995a053e
@ -1047,6 +1047,7 @@ if (ThreadFuzzer::instance().isEffective())
|
||||
loadMetadataSystem(global_context);
|
||||
/// After attaching system databases we can initialize system log.
|
||||
global_context->initializeSystemLogs();
|
||||
global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();
|
||||
auto & database_catalog = DatabaseCatalog::instance();
|
||||
/// After the system database is created, attach virtual system tables (in addition to query_log and part_log)
|
||||
attachSystemTablesServer(*database_catalog.getSystemDatabase(), has_zookeeper);
|
||||
|
@ -135,8 +135,10 @@ void ZooKeeper::init(const std::string & implementation_, const Strings & hosts_
|
||||
}
|
||||
|
||||
ZooKeeper::ZooKeeper(const std::string & hosts_string, const std::string & identity_, int32_t session_timeout_ms_,
|
||||
int32_t operation_timeout_ms_, const std::string & chroot_, const std::string & implementation_)
|
||||
int32_t operation_timeout_ms_, const std::string & chroot_, const std::string & implementation_,
|
||||
std::shared_ptr<DB::ZooKeeperLog> zk_log_)
|
||||
{
|
||||
zk_log = std::move(zk_log_);
|
||||
Strings hosts_strings;
|
||||
splitInto<','>(hosts_strings, hosts_string);
|
||||
|
||||
@ -144,8 +146,10 @@ ZooKeeper::ZooKeeper(const std::string & hosts_string, const std::string & ident
|
||||
}
|
||||
|
||||
ZooKeeper::ZooKeeper(const Strings & hosts_, const std::string & identity_, int32_t session_timeout_ms_,
|
||||
int32_t operation_timeout_ms_, const std::string & chroot_, const std::string & implementation_)
|
||||
int32_t operation_timeout_ms_, const std::string & chroot_, const std::string & implementation_,
|
||||
std::shared_ptr<DB::ZooKeeperLog> zk_log_)
|
||||
{
|
||||
zk_log = std::move(zk_log_);
|
||||
init(implementation_, hosts_, identity_, session_timeout_ms_, operation_timeout_ms_, chroot_);
|
||||
}
|
||||
|
||||
@ -729,7 +733,7 @@ bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition &
|
||||
|
||||
ZooKeeperPtr ZooKeeper::startNewSession() const
|
||||
{
|
||||
return std::make_shared<ZooKeeper>(hosts, identity, session_timeout_ms, operation_timeout_ms, chroot, implementation);
|
||||
return std::make_shared<ZooKeeper>(hosts, identity, session_timeout_ms, operation_timeout_ms, chroot, implementation, zk_log);
|
||||
}
|
||||
|
||||
|
||||
@ -1020,6 +1024,14 @@ void ZooKeeper::finalize()
|
||||
impl->finalize();
|
||||
}
|
||||
|
||||
void ZooKeeper::setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_)
|
||||
{
|
||||
zk_log = std::move(zk_log_);
|
||||
if (auto * zk = dynamic_cast<Coordination::ZooKeeper *>(impl.get()))
|
||||
zk->setZooKeeperLog(zk_log);
|
||||
}
|
||||
|
||||
|
||||
size_t KeeperMultiException::getFailedOpIndex(Coordination::Error exception_code, const Coordination::Responses & responses)
|
||||
{
|
||||
if (responses.empty())
|
||||
|
@ -56,13 +56,15 @@ public:
|
||||
int32_t session_timeout_ms_ = Coordination::DEFAULT_SESSION_TIMEOUT_MS,
|
||||
int32_t operation_timeout_ms_ = Coordination::DEFAULT_OPERATION_TIMEOUT_MS,
|
||||
const std::string & chroot_ = "",
|
||||
const std::string & implementation_ = "zookeeper");
|
||||
const std::string & implementation_ = "zookeeper",
|
||||
std::shared_ptr<DB::ZooKeeperLog> zk_log_ = nullptr);
|
||||
|
||||
ZooKeeper(const Strings & hosts_, const std::string & identity_ = "",
|
||||
int32_t session_timeout_ms_ = Coordination::DEFAULT_SESSION_TIMEOUT_MS,
|
||||
int32_t operation_timeout_ms_ = Coordination::DEFAULT_OPERATION_TIMEOUT_MS,
|
||||
const std::string & chroot_ = "",
|
||||
const std::string & implementation_ = "zookeeper");
|
||||
const std::string & implementation_ = "zookeeper",
|
||||
std::shared_ptr<DB::ZooKeeperLog> zk_log_ = nullptr);
|
||||
|
||||
/** Config of the form:
|
||||
<zookeeper>
|
||||
@ -273,6 +275,8 @@ public:
|
||||
|
||||
void finalize();
|
||||
|
||||
void setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_);
|
||||
|
||||
private:
|
||||
friend class EphemeralNodeHolder;
|
||||
|
||||
|
@ -315,9 +315,10 @@ ZooKeeper::ZooKeeper(
|
||||
std::shared_ptr<ZooKeeperLog> zk_log_)
|
||||
: root_path(root_path_),
|
||||
session_timeout(session_timeout_),
|
||||
operation_timeout(std::min(operation_timeout_, session_timeout_)),
|
||||
zk_log(std::move(zk_log_))
|
||||
operation_timeout(std::min(operation_timeout_, session_timeout_))
|
||||
{
|
||||
std::atomic_store(&zk_log, std::move(zk_log_));
|
||||
|
||||
if (!root_path.empty())
|
||||
{
|
||||
if (root_path.back() == '/')
|
||||
@ -1212,9 +1213,16 @@ void ZooKeeper::close()
|
||||
}
|
||||
|
||||
|
||||
void ZooKeeper::setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_)
|
||||
{
|
||||
/// logOperationIfNeeded(...) uses zk_log and can be called from different threads, so we have to use atomic shared_ptr
|
||||
std::atomic_store(&zk_log, std::move(zk_log_));
|
||||
}
|
||||
|
||||
void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response, bool finalize)
|
||||
{
|
||||
if (!zk_log)
|
||||
auto maybe_zk_log = std::atomic_load(&zk_log);
|
||||
if (!maybe_zk_log)
|
||||
return;
|
||||
|
||||
ZooKeeperLogElement::Type log_type = ZooKeeperLogElement::UNKNOWN;
|
||||
@ -1249,7 +1257,7 @@ void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const
|
||||
elem.event_time = event_time;
|
||||
elem.address = socket.peerAddress();
|
||||
elem.session_id = session_id;
|
||||
zk_log->add(elem);
|
||||
maybe_zk_log->add(elem);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -189,6 +189,8 @@ public:
|
||||
|
||||
void finalize() override { finalize(false, false); }
|
||||
|
||||
void setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_);
|
||||
|
||||
private:
|
||||
String root_path;
|
||||
ACLs default_acls;
|
||||
|
@ -1752,6 +1752,24 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
|
||||
return shared->zookeeper;
|
||||
}
|
||||
|
||||
void Context::setSystemZooKeeperLogAfterInitializationIfNeeded()
|
||||
{
|
||||
/// It can be nearly impossible to understand in which order global objects are initialized on server startup.
|
||||
/// If getZooKeeper() is called before initializeSystemLogs(), then zkutil::ZooKeeper gets nullptr
|
||||
/// instead of pointer to system table and it logs nothing.
|
||||
/// This method explicitly sets correct pointer to system log after its initialization.
|
||||
/// TODO get rid of this if possible
|
||||
|
||||
std::lock_guard lock(shared->zookeeper_mutex);
|
||||
if (!shared->system_logs || !shared->system_logs->zookeeper_log)
|
||||
return;
|
||||
|
||||
if (shared->zookeeper)
|
||||
shared->zookeeper->setZooKeeperLog(shared->system_logs->zookeeper_log);
|
||||
|
||||
for (auto & zk : shared->auxiliary_zookeepers)
|
||||
zk.second->setZooKeeperLog(shared->system_logs->zookeeper_log);
|
||||
}
|
||||
|
||||
void Context::initializeKeeperStorageDispatcher() const
|
||||
{
|
||||
|
@ -649,6 +649,8 @@ public:
|
||||
// Reload Zookeeper
|
||||
void reloadZooKeeperIfChanged(const ConfigurationPtr & config) const;
|
||||
|
||||
void setSystemZooKeeperLogAfterInitializationIfNeeded();
|
||||
|
||||
/// Create a cache of uncompressed blocks of specified size. This can be done only once.
|
||||
void setUncompressedCache(size_t max_size_in_bytes);
|
||||
std::shared_ptr<UncompressedCache> getUncompressedCache() const;
|
||||
|
Loading…
Reference in New Issue
Block a user