diff --git a/src/Common/Exception.cpp b/src/Common/Exception.cpp
index e5e8cf2c818..fcf44d96b43 100644
--- a/src/Common/Exception.cpp
+++ b/src/Common/Exception.cpp
@@ -251,6 +251,11 @@ void tryLogCurrentException(Poco::Logger * logger, const std::string & start_of_
tryLogCurrentExceptionImpl(logger, start_of_message);
}
+void tryLogCurrentException(const LoggerPtr & logger, const std::string & start_of_message)
+{
+ tryLogCurrentException(logger.get(), start_of_message);
+}
+
static void getNoSpaceLeftInfoMessage(std::filesystem::path path, String & msg)
{
path = std::filesystem::absolute(path);
@@ -523,6 +528,11 @@ void tryLogException(std::exception_ptr e, Poco::Logger * logger, const std::str
}
}
+void tryLogException(std::exception_ptr e, const LoggerPtr & logger, const std::string & start_of_message)
+{
+ tryLogException(e, logger.get(), start_of_message);
+}
+
std::string getExceptionMessage(const Exception & e, bool with_stacktrace, bool check_embedded_stacktrace)
{
return getExceptionMessageAndPattern(e, with_stacktrace, check_embedded_stacktrace).text;
diff --git a/src/Common/Exception.h b/src/Common/Exception.h
index 6f30fde3876..77ba2c6db5b 100644
--- a/src/Common/Exception.h
+++ b/src/Common/Exception.h
@@ -10,6 +10,7 @@
#include
#include
#include
+#include
#include
#include
@@ -242,6 +243,7 @@ using Exceptions = std::vector;
*/
void tryLogCurrentException(const char * log_name, const std::string & start_of_message = "");
void tryLogCurrentException(Poco::Logger * logger, const std::string & start_of_message = "");
+void tryLogCurrentException(const LoggerPtr & logger, const std::string & start_of_message = "");
/** Prints current exception in canonical format.
@@ -287,6 +289,7 @@ struct ExecutionStatus
void tryLogException(std::exception_ptr e, const char * log_name, const std::string & start_of_message = "");
void tryLogException(std::exception_ptr e, Poco::Logger * logger, const std::string & start_of_message = "");
+void tryLogException(std::exception_ptr e, const LoggerPtr & logger, const std::string & start_of_message = "");
std::string getExceptionMessage(const Exception & e, bool with_stacktrace, bool check_embedded_stacktrace = false);
PreformattedMessage getExceptionMessageAndPattern(const Exception & e, bool with_stacktrace, bool check_embedded_stacktrace = false);
diff --git a/src/Common/LoggerPtr.cpp b/src/Common/LoggerPtr.cpp
new file mode 100644
index 00000000000..33caaf81a81
--- /dev/null
+++ b/src/Common/LoggerPtr.cpp
@@ -0,0 +1,15 @@
+#include
+
+struct LoggerDeleter
+{
+ void operator()(const Poco::Logger * logger)
+ {
+ Poco::Logger::destroy(logger->name());
+ }
+};
+
+LoggerPtr getLogger(const std::string & name)
+{
+ Poco::Logger * logger_raw_ptr = &Poco::Logger::get(name);
+ return std::shared_ptr(logger_raw_ptr, LoggerDeleter());
+}
diff --git a/src/Common/LoggerPtr.h b/src/Common/LoggerPtr.h
new file mode 100644
index 00000000000..cd0529ea258
--- /dev/null
+++ b/src/Common/LoggerPtr.h
@@ -0,0 +1,14 @@
+#pragma once
+
+#include
+
+namespace Poco
+{
+
+class Logger;
+
+}
+
+using LoggerPtr = std::shared_ptr;
+
+LoggerPtr getLogger(const std::string & name);
diff --git a/src/Common/logger_useful.h b/src/Common/logger_useful.h
index d9fe5ac9190..fda374befc9 100644
--- a/src/Common/logger_useful.h
+++ b/src/Common/logger_useful.h
@@ -8,6 +8,7 @@
#include
#include
#include
+#include
namespace Poco { class Logger; }
@@ -19,11 +20,12 @@ using LogSeriesLimiterPtr = std::shared_ptr;
namespace
{
- [[maybe_unused]] const ::Poco::Logger * getLogger(const ::Poco::Logger * logger) { return logger; }
- [[maybe_unused]] const ::Poco::Logger * getLogger(const std::atomic<::Poco::Logger *> & logger) { return logger.load(); }
- [[maybe_unused]] std::unique_ptr getLogger(std::unique_ptr && logger) { return logger; }
- [[maybe_unused]] std::unique_ptr getLogger(std::unique_ptr && logger) { return logger; }
- [[maybe_unused]] LogSeriesLimiterPtr getLogger(LogSeriesLimiterPtr & logger) { return logger; }
+ [[maybe_unused]] const ::Poco::Logger * getLoggerHelper(const LoggerPtr & logger) { return logger.get(); }
+ [[maybe_unused]] const ::Poco::Logger * getLoggerHelper(const ::Poco::Logger * logger) { return logger; }
+ [[maybe_unused]] const ::Poco::Logger * getLoggerHelper(const std::atomic<::Poco::Logger *> & logger) { return logger.load(); }
+ [[maybe_unused]] std::unique_ptr getLoggerHelper(std::unique_ptr && logger) { return logger; }
+ [[maybe_unused]] std::unique_ptr getLoggerHelper(std::unique_ptr && logger) { return logger; }
+ [[maybe_unused]] LogSeriesLimiterPtr getLoggerHelper(LogSeriesLimiterPtr & logger) { return logger; }
}
#define LOG_IMPL_FIRST_ARG(X, ...) X
@@ -62,7 +64,7 @@ namespace
#define LOG_IMPL(logger, priority, PRIORITY, ...) do \
{ \
- auto _logger = ::getLogger(logger); \
+ auto _logger = ::getLoggerHelper(logger); \
const bool _is_clients_log = (DB::CurrentThread::getGroup() != nullptr) && \
(DB::CurrentThread::get().getClientLogsLevel() >= (priority)); \
if (!_is_clients_log && !_logger->is((PRIORITY))) \
diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp
index d02e387afc3..62d4538cb84 100644
--- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp
+++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp
@@ -247,7 +247,7 @@ ReadFromMergeTree::ReadFromMergeTree(
size_t num_streams_,
bool sample_factor_column_queried_,
std::shared_ptr max_block_numbers_to_read_,
- Poco::Logger * log_,
+ LoggerPtr log_,
AnalysisResultPtr analyzed_result_ptr_,
bool enable_parallel_reading)
: SourceStepWithFilter(DataStream{.header = MergeTreeSelectProcessor::transformHeader(
@@ -274,7 +274,7 @@ ReadFromMergeTree::ReadFromMergeTree(
, requested_num_streams(num_streams_)
, sample_factor_column_queried(sample_factor_column_queried_)
, max_block_numbers_to_read(std::move(max_block_numbers_to_read_))
- , log(log_)
+ , log(std::move(log_))
, analyzed_result_ptr(analyzed_result_ptr_)
, is_parallel_reading_from_replicas(enable_parallel_reading)
{
@@ -1281,7 +1281,7 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
data,
real_column_names,
sample_factor_column_queried,
- log,
+ log.get(),
indexes);
}
diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h
index aed2a270ca1..3f2ac7ea931 100644
--- a/src/Processors/QueryPlan/ReadFromMergeTree.h
+++ b/src/Processors/QueryPlan/ReadFromMergeTree.h
@@ -120,7 +120,7 @@ public:
size_t num_streams_,
bool sample_factor_column_queried_,
std::shared_ptr max_block_numbers_to_read_,
- Poco::Logger * log_,
+ LoggerPtr log_,
AnalysisResultPtr analyzed_result_ptr_,
bool enable_parallel_reading);
@@ -259,7 +259,7 @@ private:
/// Pre-computed value, needed to trigger sets creating for PK
mutable std::optional indexes;
- Poco::Logger * log;
+ LoggerPtr log;
UInt64 selected_parts = 0;
UInt64 selected_rows = 0;
UInt64 selected_marks = 0;
diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
index 87f23b0da2a..0c1bcff2c50 100644
--- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp
+++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
@@ -1663,7 +1663,7 @@ try
metadata_manager->deleteAll(true);
metadata_manager->assertAllDeleted(true);
- getDataPartStorage().rename(to.parent_path(), to.filename(), storage.log, remove_new_dir_if_exists, fsync_dir);
+ getDataPartStorage().rename(to.parent_path(), to.filename(), storage.log.get(), remove_new_dir_if_exists, fsync_dir);
metadata_manager->updateAll(true);
auto new_projection_root_path = to.string();
@@ -1758,7 +1758,7 @@ void IMergeTreeDataPart::remove()
}
bool is_temporary_part = is_temp || state == MergeTreeDataPartState::Temporary;
- getDataPartStorage().remove(std::move(can_remove_callback), checksums, projection_checksums, is_temporary_part, storage.log);
+ getDataPartStorage().remove(std::move(can_remove_callback), checksums, projection_checksums, is_temporary_part, storage.log.get());
}
std::optional IMergeTreeDataPart::getRelativePathForPrefix(const String & prefix, bool detached, bool broken) const
@@ -1775,7 +1775,7 @@ std::optional IMergeTreeDataPart::getRelativePathForPrefix(const String
if (detached && parent_part)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot detach projection");
- return getDataPartStorage().getRelativePathForPrefix(storage.log, prefix, detached, broken);
+ return getDataPartStorage().getRelativePathForPrefix(storage.log.get(), prefix, detached, broken);
}
std::optional IMergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix, bool broken) const
@@ -1841,7 +1841,7 @@ MutableDataPartStoragePtr IMergeTreeDataPart::makeCloneOnDisk(
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not clone data part {} to empty directory.", name);
String path_to_clone = fs::path(storage.relative_data_path) / directory_name / "";
- return getDataPartStorage().clonePart(path_to_clone, getDataPartStorage().getPartDirectory(), disk, read_settings, write_settings, storage.log, cancellation_hook);
+ return getDataPartStorage().clonePart(path_to_clone, getDataPartStorage().getPartDirectory(), disk, read_settings, write_settings, storage.log.get(), cancellation_hook);
}
UInt64 IMergeTreeDataPart::getIndexSizeFromFile() const
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index 61332a4ff38..ef18a1d46a8 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -355,7 +355,7 @@ MergeTreeData::MergeTreeData(
, require_part_metadata(require_part_metadata_)
, broken_part_callback(broken_part_callback_)
, log_name(std::make_shared(table_id_.getNameForLogs()))
- , log(&Poco::Logger::get(*log_name))
+ , log(getLogger(*log_name))
, storage_settings(std::move(storage_settings_))
, pinned_part_uuids(std::make_shared())
, data_parts_by_info(data_parts_indexes.get())
@@ -1222,7 +1222,7 @@ MergeTreeData::PartLoadingTree::build(PartLoadingInfos nodes)
}
static std::optional calculatePartSizeSafe(
- const MergeTreeData::DataPartPtr & part, Poco::Logger * log)
+ const MergeTreeData::DataPartPtr & part, const LoggerPtr & log)
{
try
{
@@ -2114,7 +2114,7 @@ size_t MergeTreeData::clearOldTemporaryDirectories(const String & root_path, siz
{
/// Actually we don't rely on temporary_directories_lifetime when removing old temporaries directories,
/// it's just an extra level of protection just in case we have a bug.
- LOG_INFO(LogFrequencyLimiter(log, 10), "{} is in use (by merge/mutation/INSERT) (consider increasing temporary_directories_lifetime setting)", full_path);
+ LOG_INFO(LogFrequencyLimiter(log.get(), 10), "{} is in use (by merge/mutation/INSERT) (consider increasing temporary_directories_lifetime setting)", full_path);
continue;
}
else if (!disk->exists(it->path()))
@@ -2735,7 +2735,7 @@ void MergeTreeData::renameInMemory(const StorageID & new_table_id)
{
IStorage::renameInMemory(new_table_id);
std::atomic_store(&log_name, std::make_shared(new_table_id.getNameForLogs()));
- log = &Poco::Logger::get(*log_name);
+ log = getLogger(*log_name);
}
void MergeTreeData::dropAllData()
@@ -8181,7 +8181,7 @@ ReservationPtr MergeTreeData::balancedReservation(
}
// Record submerging big parts in the tagger to clean them up.
- tagger_ptr->emplace(*this, part_name, std::move(covered_parts), log);
+ tagger_ptr->emplace(*this, part_name, std::move(covered_parts), log.get());
}
}
}
diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h
index f0dbaf0e307..7ac46e12e1a 100644
--- a/src/Storages/MergeTree/MergeTreeData.h
+++ b/src/Storages/MergeTree/MergeTreeData.h
@@ -5,6 +5,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -1117,7 +1118,7 @@ protected:
/// log_name will change during table RENAME. Use atomic_shared_ptr to allow concurrent RW.
/// NOTE clang-14 doesn't have atomic_shared_ptr yet. Use std::atomic* operations for now.
std::shared_ptr log_name;
- std::atomic log;
+ LoggerPtr log;
/// Storage settings.
/// Use get and set to receive readonly versions.
diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
index 8c03aef6f99..58fddde7b54 100644
--- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
@@ -66,7 +66,7 @@ static const double DISK_USAGE_COEFFICIENT_TO_SELECT = 2;
static const double DISK_USAGE_COEFFICIENT_TO_RESERVE = 1.1;
MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_)
- : data(data_), log(&Poco::Logger::get(data.getLogName() + " (MergerMutator)"))
+ : data(data_), log(getLogger(data.getLogName() + " (MergerMutator)"))
{
}
diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h
index 6eab0ee0c37..f3a3f51b6c3 100644
--- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h
+++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h
@@ -213,7 +213,7 @@ public :
private:
MergeTreeData & data;
- Poco::Logger * log;
+ LoggerPtr log;
/// When the last time you wrote to the log that the disk space was running out (not to write about this too often).
time_t disk_space_warning_time = 0;
diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h
index ba1f20054f0..93eac427f77 100644
--- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h
+++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h
@@ -75,7 +75,7 @@ public:
private:
const MergeTreeData & data;
- Poco::Logger * log;
+ LoggerPtr log;
/// Get the approximate value (bottom estimate - only by full marks) of the number of rows falling under the index.
static size_t getApproximateTotalRowsToRead(
diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp
index 3c0b2d2b42e..25785b8aea0 100644
--- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp
@@ -591,7 +591,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
auto projection_block = projection.calculate(block, context);
if (projection_block.rows())
{
- auto proj_temp_part = writeProjectionPart(data, log, projection_block, projection, new_data_part.get());
+ auto proj_temp_part = writeProjectionPart(data, log.get(), projection_block, projection, new_data_part.get());
new_data_part->addProjectionPart(projection.name, std::move(proj_temp_part.part));
for (auto & stream : proj_temp_part.streams)
temp_part.streams.emplace_back(std::move(stream));
diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.h b/src/Storages/MergeTree/MergeTreeDataWriter.h
index 2fb6b1f22d4..aaa0f71eccf 100644
--- a/src/Storages/MergeTree/MergeTreeDataWriter.h
+++ b/src/Storages/MergeTree/MergeTreeDataWriter.h
@@ -45,8 +45,9 @@ class MergeTreeDataWriter
public:
explicit MergeTreeDataWriter(MergeTreeData & data_)
: data(data_)
- , log(&Poco::Logger::get(data.getLogName() + " (Writer)"))
- {}
+ , log(getLogger(data.getLogName() + " (Writer)"))
+ {
+ }
/** Split the block to blocks, each of them must be written as separate part.
* (split rows by partition)
@@ -131,7 +132,7 @@ private:
const ProjectionDescription & projection);
MergeTreeData & data;
- Poco::Logger * log;
+ LoggerPtr log;
};
}
diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp
index fbdde15c2af..ab2f7ea2989 100644
--- a/src/Storages/StorageMergeTree.cpp
+++ b/src/Storages/StorageMergeTree.cpp
@@ -683,7 +683,7 @@ std::optional StorageMergeTree::getIncompleteMutationsS
const auto & mutation_entry = current_mutation_it->second;
- auto txn = tryGetTransactionForMutation(mutation_entry, log);
+ auto txn = tryGetTransactionForMutation(mutation_entry, log.get());
/// There's no way a transaction may finish before a mutation that was started by the transaction.
/// But sometimes we need to check status of an unrelated mutation, in this case we don't care about transactions.
assert(txn || mutation_entry.tid.isPrehistoric() || from_another_mutation);
@@ -829,7 +829,7 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
if (!to_kill)
return CancellationCode::NotFound;
- if (auto txn = tryGetTransactionForMutation(*to_kill, log))
+ if (auto txn = tryGetTransactionForMutation(*to_kill, log.get()))
{
LOG_TRACE(log, "Cancelling transaction {} which had started mutation {}", to_kill->tid, mutation_id);
TransactionLog::instance().rollbackTransaction(txn);
@@ -1222,7 +1222,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
if (!part->version.isVisible(first_mutation_tid.start_csn, first_mutation_tid))
continue;
- txn = tryGetTransactionForMutation(mutations_begin_it->second, log);
+ txn = tryGetTransactionForMutation(mutations_begin_it->second, log.get());
if (!txn)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find transaction {} that has started mutation {} "
"that is going to be applied to part {}",
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 715cbab9eea..1e86f3f70f5 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -320,7 +320,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
attach,
[this] (const std::string & name) { enqueuePartForCheck(name); })
, zookeeper_name(zkutil::extractZooKeeperName(zookeeper_path_))
- , zookeeper_path(zkutil::extractZooKeeperPath(zookeeper_path_, /* check_starts_with_slash */ !attach, log))
+ , zookeeper_path(zkutil::extractZooKeeperPath(zookeeper_path_, /* check_starts_with_slash */ !attach, log.get()))
, replica_name(replica_name_)
, replica_path(fs::path(zookeeper_path) / "replicas" / replica_name_)
, reader(*this)
@@ -812,7 +812,7 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr
else
{
auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(drop_lock_path, *zookeeper);
- if (!removeTableNodesFromZooKeeper(zookeeper, zookeeper_path, metadata_drop_lock, log))
+ if (!removeTableNodesFromZooKeeper(zookeeper, zookeeper_path, metadata_drop_lock, log.get()))
{
/// Someone is recursively removing table right now, we cannot create new table until old one is removed
continue;
@@ -1128,7 +1128,7 @@ void StorageReplicatedMergeTree::drop()
if (lost_part_count > 0)
LOG_INFO(log, "Dropping table with non-zero lost_part_count equal to {}", lost_part_count);
}
- dropReplica(zookeeper, zookeeper_path, replica_name, log, getSettings(), &has_metadata_in_zookeeper);
+ dropReplica(zookeeper, zookeeper_path, replica_name, log.get(), getSettings(), &has_metadata_in_zookeeper);
}
}
@@ -4181,7 +4181,7 @@ void StorageReplicatedMergeTree::startBeingLeader()
return;
}
- zkutil::checkNoOldLeaders(log, *zookeeper, fs::path(zookeeper_path) / "leader_election");
+ zkutil::checkNoOldLeaders(log.get(), *zookeeper, fs::path(zookeeper_path) / "leader_election");
LOG_INFO(log, "Became leader");
is_leader = true;
@@ -4275,7 +4275,7 @@ void StorageReplicatedMergeTree::waitForUniquePartsToBeFetchedByOtherReplicas(St
auto zookeeper = getZooKeeperIfTableShutDown();
- auto unique_parts_set = findReplicaUniqueParts(replica_name, zookeeper_path, format_version, zookeeper, log);
+ auto unique_parts_set = findReplicaUniqueParts(replica_name, zookeeper_path, format_version, zookeeper, log.get());
if (unique_parts_set.empty())
{
LOG_INFO(log, "Will not wait for unique parts to be fetched because we don't have any unique parts");
@@ -9348,7 +9348,7 @@ StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part, co
return unlockSharedDataByID(
part.getUniqueId(), shared_id, part.info, replica_name,
- part.getDataPartStorage().getDiskType(), zookeeper, *getSettings(), log, zookeeper_path, format_version);
+ part.getDataPartStorage().getDiskType(), zookeeper, *getSettings(), log.get(), zookeeper_path, format_version);
}
namespace
@@ -10301,7 +10301,7 @@ void StorageReplicatedMergeTree::backupData(
bool exists = false;
Strings mutation_ids;
{
- ZooKeeperRetriesControl retries_ctl("getMutations", log, zookeeper_retries_info, nullptr);
+ ZooKeeperRetriesControl retries_ctl("getMutations", log.get(), zookeeper_retries_info, nullptr);
retries_ctl.retryLoop([&]()
{
if (!zookeeper || zookeeper->expired())
@@ -10320,7 +10320,7 @@ void StorageReplicatedMergeTree::backupData(
bool mutation_id_exists = false;
String mutation;
- ZooKeeperRetriesControl retries_ctl("getMutation", log, zookeeper_retries_info, nullptr);
+ ZooKeeperRetriesControl retries_ctl("getMutation", log.get(), zookeeper_retries_info, nullptr);
retries_ctl.retryLoop([&]()
{
if (!zookeeper || zookeeper->expired())