Table CREATE DROP Poco::Logger memory leak fix

Maksim Kita 2024-01-15 18:54:32 +03:00
parent 5ba7a78d23
commit 42029f42e7
17 changed files with 85 additions and 39 deletions
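Background for the changes below: Poco keeps every logger in a process-global registry keyed by name, and an entry stays there until Poco::Logger::destroy(name) is called. Table storages obtained their loggers with Poco::Logger::get() on CREATE TABLE and never destroyed them, so every CREATE/DROP cycle leaked one registry entry. A minimal sketch of the pre-fix pattern (the table name and logger-name suffix are illustrative):

    #include <Poco/Logger.h>
    #include <string>

    void createTable(const std::string & table_name)
    {
        // Registers the logger in Poco's global map and returns a reference;
        // the map keeps it alive even after the table is dropped.
        Poco::Logger * log = &Poco::Logger::get(table_name + " (MergeTree)");
        log->information("table created");
    }   // no matching Poco::Logger::destroy(), so the entry is never freed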

src/Common/Exception.cpp

@@ -251,6 +251,11 @@ void tryLogCurrentException(Poco::Logger * logger, const std::string & start_of_
tryLogCurrentExceptionImpl(logger, start_of_message);
}
+void tryLogCurrentException(const LoggerPtr & logger, const std::string & start_of_message)
+{
+    tryLogCurrentException(logger.get(), start_of_message);
+}
static void getNoSpaceLeftInfoMessage(std::filesystem::path path, String & msg)
{
path = std::filesystem::absolute(path);
@@ -523,6 +528,11 @@ void tryLogException(std::exception_ptr e, Poco::Logger * logger, const std::str
}
}
+void tryLogException(std::exception_ptr e, const LoggerPtr & logger, const std::string & start_of_message)
+{
+    tryLogException(e, logger.get(), start_of_message);
+}
std::string getExceptionMessage(const Exception & e, bool with_stacktrace, bool check_embedded_stacktrace)
{
return getExceptionMessageAndPattern(e, with_stacktrace, check_embedded_stacktrace).text;
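Both new overloads simply unwrap the shared pointer and forward to the existing raw-pointer versions. A hedged usage sketch (riskyOperation is hypothetical, DB namespace qualification elided):

    LoggerPtr log = getLogger("Example");
    try
    {
        riskyOperation();
    }
    catch (...)
    {
        // Forwards to the Poco::Logger * overload via logger.get().
        tryLogCurrentException(log, "Error in risky operation");
    }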

src/Common/Exception.h

@@ -10,6 +10,7 @@
#include <base/errnoToString.h>
#include <base/scope_guard.h>
#include <Common/LoggingFormatStringHelpers.h>
+#include <Common/LoggerPtr.h>
#include <Common/StackTrace.h>
#include <fmt/format.h>
@@ -242,6 +243,7 @@ using Exceptions = std::vector<std::exception_ptr>;
*/
void tryLogCurrentException(const char * log_name, const std::string & start_of_message = "");
void tryLogCurrentException(Poco::Logger * logger, const std::string & start_of_message = "");
+void tryLogCurrentException(const LoggerPtr & logger, const std::string & start_of_message = "");
/** Prints current exception in canonical format.
@@ -287,6 +289,7 @@ struct ExecutionStatus
void tryLogException(std::exception_ptr e, const char * log_name, const std::string & start_of_message = "");
void tryLogException(std::exception_ptr e, Poco::Logger * logger, const std::string & start_of_message = "");
+void tryLogException(std::exception_ptr e, const LoggerPtr & logger, const std::string & start_of_message = "");
std::string getExceptionMessage(const Exception & e, bool with_stacktrace, bool check_embedded_stacktrace = false);
PreformattedMessage getExceptionMessageAndPattern(const Exception & e, bool with_stacktrace, bool check_embedded_stacktrace = false);

src/Common/LoggerPtr.cpp Normal file

@@ -0,0 +1,15 @@
+#include <Common/LoggerPtr.h>
+
+struct LoggerDeleter
+{
+    void operator()(const Poco::Logger * logger)
+    {
+        Poco::Logger::destroy(logger->name());
+    }
+};
+
+LoggerPtr getLogger(const std::string & name)
+{
+    Poco::Logger * logger_raw_ptr = &Poco::Logger::get(name);
+    return std::shared_ptr<Poco::Logger>(logger_raw_ptr, LoggerDeleter());
+}
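The custom deleter ties registry cleanup to shared_ptr lifetime: the underlying Poco::Logger is unregistered exactly when the last LoggerPtr referring to it is destroyed. A sketch of the intended semantics (the logger name is illustrative):

    LoggerPtr log = getLogger("db.table (Example)"); // registered, use_count == 1
    {
        LoggerPtr copy = log; // same Poco::Logger underneath, use_count == 2
    }                         // back to 1, logger still registered

    // When `log` goes out of scope, LoggerDeleter runs and
    // Poco::Logger::destroy(name) removes the registry entry.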

src/Common/LoggerPtr.h Normal file

@@ -0,0 +1,14 @@
+#pragma once
+
+#include <memory>
+
+#include <Poco/Logger.h>
+
+using LoggerPtr = std::shared_ptr<Poco::Logger>;
+
+LoggerPtr getLogger(const std::string & name);

src/Common/logger_useful.h

@@ -8,6 +8,7 @@
#include <Common/CurrentThread.h>
#include <Common/ProfileEvents.h>
#include <Common/LoggingFormatStringHelpers.h>
+#include <Common/LoggerPtr.h>
namespace Poco { class Logger; }
@@ -19,11 +20,12 @@ using LogSeriesLimiterPtr = std::shared_ptr<LogSeriesLimiter>;
namespace
{
-[[maybe_unused]] const ::Poco::Logger * getLogger(const ::Poco::Logger * logger) { return logger; }
-[[maybe_unused]] const ::Poco::Logger * getLogger(const std::atomic<::Poco::Logger *> & logger) { return logger.load(); }
-[[maybe_unused]] std::unique_ptr<LogToStrImpl> getLogger(std::unique_ptr<LogToStrImpl> && logger) { return logger; }
-[[maybe_unused]] std::unique_ptr<LogFrequencyLimiterIml> getLogger(std::unique_ptr<LogFrequencyLimiterIml> && logger) { return logger; }
-[[maybe_unused]] LogSeriesLimiterPtr getLogger(LogSeriesLimiterPtr & logger) { return logger; }
+[[maybe_unused]] const ::Poco::Logger * getLoggerHelper(const LoggerPtr & logger) { return logger.get(); }
+[[maybe_unused]] const ::Poco::Logger * getLoggerHelper(const ::Poco::Logger * logger) { return logger; }
+[[maybe_unused]] const ::Poco::Logger * getLoggerHelper(const std::atomic<::Poco::Logger *> & logger) { return logger.load(); }
+[[maybe_unused]] std::unique_ptr<LogToStrImpl> getLoggerHelper(std::unique_ptr<LogToStrImpl> && logger) { return logger; }
+[[maybe_unused]] std::unique_ptr<LogFrequencyLimiterIml> getLoggerHelper(std::unique_ptr<LogFrequencyLimiterIml> && logger) { return logger; }
+[[maybe_unused]] LogSeriesLimiterPtr getLoggerHelper(LogSeriesLimiterPtr & logger) { return logger; }
}
#define LOG_IMPL_FIRST_ARG(X, ...) X
@@ -62,7 +64,7 @@ namespace
#define LOG_IMPL(logger, priority, PRIORITY, ...) do \
{ \
-auto _logger = ::getLogger(logger); \
+auto _logger = ::getLoggerHelper(logger); \
const bool _is_clients_log = (DB::CurrentThread::getGroup() != nullptr) && \
(DB::CurrentThread::get().getClientLogsLevel() >= (priority)); \
if (!_is_clients_log && !_logger->is((PRIORITY))) \
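The helper overloads are presumably renamed so the macro's internal overload set no longer shares a name with the new free function getLogger(const std::string &); the added LoggerPtr overload then lets every LOG_* macro accept either logger kind. A hedged sketch (the logger names are illustrative):

    LoggerPtr shared_log = getLogger("ExampleA");
    Poco::Logger * raw_log = &Poco::Logger::get("ExampleB");

    LOG_INFO(shared_log, "dispatches via getLoggerHelper(const LoggerPtr &)");
    LOG_INFO(raw_log, "dispatches via getLoggerHelper(const ::Poco::Logger *)");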

src/Processors/QueryPlan/ReadFromMergeTree.cpp

@@ -247,7 +247,7 @@ ReadFromMergeTree::ReadFromMergeTree(
size_t num_streams_,
bool sample_factor_column_queried_,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
-Poco::Logger * log_,
+LoggerPtr log_,
AnalysisResultPtr analyzed_result_ptr_,
bool enable_parallel_reading)
: SourceStepWithFilter(DataStream{.header = MergeTreeSelectProcessor::transformHeader(
@@ -274,7 +274,7 @@ ReadFromMergeTree::ReadFromMergeTree(
, requested_num_streams(num_streams_)
, sample_factor_column_queried(sample_factor_column_queried_)
, max_block_numbers_to_read(std::move(max_block_numbers_to_read_))
-, log(log_)
+, log(std::move(log_))
, analyzed_result_ptr(analyzed_result_ptr_)
, is_parallel_reading_from_replicas(enable_parallel_reading)
{
@@ -1281,7 +1281,7 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
data,
real_column_names,
sample_factor_column_queried,
-log,
+log.get(),
indexes);
}
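A pattern that recurs through the rest of the diff: helpers that still take a raw Poco::Logger * now receive log.get(). A hedged sketch of the ownership rule this relies on (legacyHelper and SomeStorage are hypothetical):

    void legacyHelper(Poco::Logger * log); // API not yet migrated to LoggerPtr

    struct SomeStorage
    {
        LoggerPtr log = getLogger("SomeStorage");

        void doWork()
        {
            // Non-owning borrow: safe because `log` (the owner) outlives the call.
            legacyHelper(log.get());
        }
    };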

src/Processors/QueryPlan/ReadFromMergeTree.h

@@ -120,7 +120,7 @@ public:
size_t num_streams_,
bool sample_factor_column_queried_,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
-Poco::Logger * log_,
+LoggerPtr log_,
AnalysisResultPtr analyzed_result_ptr_,
bool enable_parallel_reading);
@@ -259,7 +259,7 @@ private:
/// Pre-computed value, needed to trigger sets creating for PK
mutable std::optional<Indexes> indexes;
-Poco::Logger * log;
+LoggerPtr log;
UInt64 selected_parts = 0;
UInt64 selected_rows = 0;
UInt64 selected_marks = 0;

src/Storages/MergeTree/IMergeTreeDataPart.cpp

@@ -1663,7 +1663,7 @@ try
metadata_manager->deleteAll(true);
metadata_manager->assertAllDeleted(true);
-getDataPartStorage().rename(to.parent_path(), to.filename(), storage.log, remove_new_dir_if_exists, fsync_dir);
+getDataPartStorage().rename(to.parent_path(), to.filename(), storage.log.get(), remove_new_dir_if_exists, fsync_dir);
metadata_manager->updateAll(true);
auto new_projection_root_path = to.string();
@@ -1758,7 +1758,7 @@ void IMergeTreeDataPart::remove()
}
bool is_temporary_part = is_temp || state == MergeTreeDataPartState::Temporary;
-getDataPartStorage().remove(std::move(can_remove_callback), checksums, projection_checksums, is_temporary_part, storage.log);
+getDataPartStorage().remove(std::move(can_remove_callback), checksums, projection_checksums, is_temporary_part, storage.log.get());
}
std::optional<String> IMergeTreeDataPart::getRelativePathForPrefix(const String & prefix, bool detached, bool broken) const
@@ -1775,7 +1775,7 @@ std::optional<String> IMergeTreeDataPart::getRelativePathForPrefix(const String
if (detached && parent_part)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot detach projection");
-return getDataPartStorage().getRelativePathForPrefix(storage.log, prefix, detached, broken);
+return getDataPartStorage().getRelativePathForPrefix(storage.log.get(), prefix, detached, broken);
}
std::optional<String> IMergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix, bool broken) const
@@ -1841,7 +1841,7 @@ MutableDataPartStoragePtr IMergeTreeDataPart::makeCloneOnDisk(
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not clone data part {} to empty directory.", name);
String path_to_clone = fs::path(storage.relative_data_path) / directory_name / "";
-return getDataPartStorage().clonePart(path_to_clone, getDataPartStorage().getPartDirectory(), disk, read_settings, write_settings, storage.log, cancellation_hook);
+return getDataPartStorage().clonePart(path_to_clone, getDataPartStorage().getPartDirectory(), disk, read_settings, write_settings, storage.log.get(), cancellation_hook);
}
UInt64 IMergeTreeDataPart::getIndexSizeFromFile() const

src/Storages/MergeTree/MergeTreeData.cpp

@@ -355,7 +355,7 @@ MergeTreeData::MergeTreeData(
, require_part_metadata(require_part_metadata_)
, broken_part_callback(broken_part_callback_)
, log_name(std::make_shared<String>(table_id_.getNameForLogs()))
-, log(&Poco::Logger::get(*log_name))
+, log(getLogger(*log_name))
, storage_settings(std::move(storage_settings_))
, pinned_part_uuids(std::make_shared<PinnedPartUUIDs>())
, data_parts_by_info(data_parts_indexes.get<TagByInfo>())
@@ -1222,7 +1222,7 @@ MergeTreeData::PartLoadingTree::build(PartLoadingInfos nodes)
}
static std::optional<size_t> calculatePartSizeSafe(
-const MergeTreeData::DataPartPtr & part, Poco::Logger * log)
+const MergeTreeData::DataPartPtr & part, const LoggerPtr & log)
{
try
{
@@ -2114,7 +2114,7 @@ size_t MergeTreeData::clearOldTemporaryDirectories(const String & root_path, siz
{
/// Actually we don't rely on temporary_directories_lifetime when removing old temporaries directories,
/// it's just an extra level of protection just in case we have a bug.
-LOG_INFO(LogFrequencyLimiter(log, 10), "{} is in use (by merge/mutation/INSERT) (consider increasing temporary_directories_lifetime setting)", full_path);
+LOG_INFO(LogFrequencyLimiter(log.get(), 10), "{} is in use (by merge/mutation/INSERT) (consider increasing temporary_directories_lifetime setting)", full_path);
continue;
}
else if (!disk->exists(it->path()))
@@ -2735,7 +2735,7 @@ void MergeTreeData::renameInMemory(const StorageID & new_table_id)
{
IStorage::renameInMemory(new_table_id);
std::atomic_store(&log_name, std::make_shared<String>(new_table_id.getNameForLogs()));
-log = &Poco::Logger::get(*log_name);
+log = getLogger(*log_name);
}
void MergeTreeData::dropAllData()
@@ -8181,7 +8181,7 @@ ReservationPtr MergeTreeData::balancedReservation(
}
// Record submerging big parts in the tagger to clean them up.
-tagger_ptr->emplace(*this, part_name, std::move(covered_parts), log);
+tagger_ptr->emplace(*this, part_name, std::move(covered_parts), log.get());
}
}
}

src/Storages/MergeTree/MergeTreeData.h

@@ -5,6 +5,7 @@
#include <Common/SimpleIncrement.h>
#include <Common/SharedMutex.h>
#include <Common/MultiVersion.h>
+#include <Common/LoggerPtr.h>
#include <Storages/IStorage.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromFile.h>
@@ -1117,7 +1118,7 @@ protected:
/// log_name will change during table RENAME. Use atomic_shared_ptr to allow concurrent RW.
/// NOTE clang-14 doesn't have atomic_shared_ptr yet. Use std::atomic* operations for now.
std::shared_ptr<String> log_name;
-std::atomic<Poco::Logger *> log;
+LoggerPtr log;
/// Storage settings.
/// Use get and set to receive readonly versions.

src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp

@@ -66,7 +66,7 @@ static const double DISK_USAGE_COEFFICIENT_TO_SELECT = 2;
static const double DISK_USAGE_COEFFICIENT_TO_RESERVE = 1.1;
MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_)
-: data(data_), log(&Poco::Logger::get(data.getLogName() + " (MergerMutator)"))
+: data(data_), log(getLogger(data.getLogName() + " (MergerMutator)"))
{
}

src/Storages/MergeTree/MergeTreeDataMergerMutator.h

@@ -213,7 +213,7 @@ public:
private:
MergeTreeData & data;
-Poco::Logger * log;
+LoggerPtr log;
/// When the last time you wrote to the log that the disk space was running out (not to write about this too often).
time_t disk_space_warning_time = 0;

src/Storages/MergeTree/MergeTreeDataSelectExecutor.h

@@ -75,7 +75,7 @@ public:
private:
const MergeTreeData & data;
-Poco::Logger * log;
+LoggerPtr log;
/// Get the approximate value (bottom estimate - only by full marks) of the number of rows falling under the index.
static size_t getApproximateTotalRowsToRead(

src/Storages/MergeTree/MergeTreeDataWriter.cpp

@@ -591,7 +591,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
auto projection_block = projection.calculate(block, context);
if (projection_block.rows())
{
-auto proj_temp_part = writeProjectionPart(data, log, projection_block, projection, new_data_part.get());
+auto proj_temp_part = writeProjectionPart(data, log.get(), projection_block, projection, new_data_part.get());
new_data_part->addProjectionPart(projection.name, std::move(proj_temp_part.part));
for (auto & stream : proj_temp_part.streams)
temp_part.streams.emplace_back(std::move(stream));

src/Storages/MergeTree/MergeTreeDataWriter.h

@@ -45,8 +45,9 @@ class MergeTreeDataWriter
public:
explicit MergeTreeDataWriter(MergeTreeData & data_)
: data(data_)
-, log(&Poco::Logger::get(data.getLogName() + " (Writer)"))
-{}
+, log(getLogger(data.getLogName() + " (Writer)"))
+{
+}
/** Split the block to blocks, each of them must be written as separate part.
* (split rows by partition)
@@ -131,7 +132,7 @@ private:
const ProjectionDescription & projection);
MergeTreeData & data;
-Poco::Logger * log;
+LoggerPtr log;
};
}

src/Storages/StorageMergeTree.cpp

@@ -683,7 +683,7 @@ std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsS
const auto & mutation_entry = current_mutation_it->second;
-auto txn = tryGetTransactionForMutation(mutation_entry, log);
+auto txn = tryGetTransactionForMutation(mutation_entry, log.get());
/// There's no way a transaction may finish before a mutation that was started by the transaction.
/// But sometimes we need to check status of an unrelated mutation, in this case we don't care about transactions.
assert(txn || mutation_entry.tid.isPrehistoric() || from_another_mutation);
@@ -829,7 +829,7 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
if (!to_kill)
return CancellationCode::NotFound;
-if (auto txn = tryGetTransactionForMutation(*to_kill, log))
+if (auto txn = tryGetTransactionForMutation(*to_kill, log.get()))
{
LOG_TRACE(log, "Cancelling transaction {} which had started mutation {}", to_kill->tid, mutation_id);
TransactionLog::instance().rollbackTransaction(txn);
@@ -1222,7 +1222,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
if (!part->version.isVisible(first_mutation_tid.start_csn, first_mutation_tid))
continue;
-txn = tryGetTransactionForMutation(mutations_begin_it->second, log);
+txn = tryGetTransactionForMutation(mutations_begin_it->second, log.get());
if (!txn)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find transaction {} that has started mutation {} "
"that is going to be applied to part {}",

src/Storages/StorageReplicatedMergeTree.cpp

@@ -320,7 +320,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
attach,
[this] (const std::string & name) { enqueuePartForCheck(name); })
, zookeeper_name(zkutil::extractZooKeeperName(zookeeper_path_))
-, zookeeper_path(zkutil::extractZooKeeperPath(zookeeper_path_, /* check_starts_with_slash */ !attach, log))
+, zookeeper_path(zkutil::extractZooKeeperPath(zookeeper_path_, /* check_starts_with_slash */ !attach, log.get()))
, replica_name(replica_name_)
, replica_path(fs::path(zookeeper_path) / "replicas" / replica_name_)
, reader(*this)
@@ -812,7 +812,7 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr
else
{
auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(drop_lock_path, *zookeeper);
-if (!removeTableNodesFromZooKeeper(zookeeper, zookeeper_path, metadata_drop_lock, log))
+if (!removeTableNodesFromZooKeeper(zookeeper, zookeeper_path, metadata_drop_lock, log.get()))
{
/// Someone is recursively removing table right now, we cannot create new table until old one is removed
continue;
@@ -1128,7 +1128,7 @@ void StorageReplicatedMergeTree::drop()
if (lost_part_count > 0)
LOG_INFO(log, "Dropping table with non-zero lost_part_count equal to {}", lost_part_count);
}
-dropReplica(zookeeper, zookeeper_path, replica_name, log, getSettings(), &has_metadata_in_zookeeper);
+dropReplica(zookeeper, zookeeper_path, replica_name, log.get(), getSettings(), &has_metadata_in_zookeeper);
}
}
@@ -4181,7 +4181,7 @@ void StorageReplicatedMergeTree::startBeingLeader()
return;
}
-zkutil::checkNoOldLeaders(log, *zookeeper, fs::path(zookeeper_path) / "leader_election");
+zkutil::checkNoOldLeaders(log.get(), *zookeeper, fs::path(zookeeper_path) / "leader_election");
LOG_INFO(log, "Became leader");
is_leader = true;
@@ -4275,7 +4275,7 @@ void StorageReplicatedMergeTree::waitForUniquePartsToBeFetchedByOtherReplicas(St
auto zookeeper = getZooKeeperIfTableShutDown();
-auto unique_parts_set = findReplicaUniqueParts(replica_name, zookeeper_path, format_version, zookeeper, log);
+auto unique_parts_set = findReplicaUniqueParts(replica_name, zookeeper_path, format_version, zookeeper, log.get());
if (unique_parts_set.empty())
{
LOG_INFO(log, "Will not wait for unique parts to be fetched because we don't have any unique parts");
@@ -9348,7 +9348,7 @@ StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part, co
return unlockSharedDataByID(
part.getUniqueId(), shared_id, part.info, replica_name,
-part.getDataPartStorage().getDiskType(), zookeeper, *getSettings(), log, zookeeper_path, format_version);
+part.getDataPartStorage().getDiskType(), zookeeper, *getSettings(), log.get(), zookeeper_path, format_version);
}
namespace
@@ -10301,7 +10301,7 @@ void StorageReplicatedMergeTree::backupData(
bool exists = false;
Strings mutation_ids;
{
-ZooKeeperRetriesControl retries_ctl("getMutations", log, zookeeper_retries_info, nullptr);
+ZooKeeperRetriesControl retries_ctl("getMutations", log.get(), zookeeper_retries_info, nullptr);
retries_ctl.retryLoop([&]()
{
if (!zookeeper || zookeeper->expired())
@@ -10320,7 +10320,7 @@ void StorageReplicatedMergeTree::backupData(
bool mutation_id_exists = false;
String mutation;
-ZooKeeperRetriesControl retries_ctl("getMutation", log, zookeeper_retries_info, nullptr);
+ZooKeeperRetriesControl retries_ctl("getMutation", log.get(), zookeeper_retries_info, nullptr);
retries_ctl.retryLoop([&]()
{
if (!zookeeper || zookeeper->expired())