move stopFlushThread to SystemLogBase

This commit is contained in:
Sema Checherinda 2024-08-22 17:18:06 +02:00
parent a9e793532a
commit 859d2bfe27
6 changed files with 27 additions and 30 deletions

View File

@ -273,6 +273,25 @@ void SystemLogBase<LogElement>::startup()
saving_thread = std::make_unique<ThreadFromGlobalPool>([this] { savingThreadFunction(); });
}
template <typename LogElement>
void SystemLogBase<LogElement>::stopFlushThread()
{
{
std::lock_guard lock(thread_mutex);
if (!saving_thread || !saving_thread->joinable())
return;
if (is_shutdown)
return;
is_shutdown = true;
queue->shutdown();
}
saving_thread->join();
}
template <typename LogElement>
void SystemLogBase<LogElement>::add(LogElement element)
{

View File

@ -216,6 +216,8 @@ public:
static consteval bool shouldTurnOffLogger() { return false; }
protected:
void stopFlushThread() final;
std::shared_ptr<SystemLogQueue<LogElement>> queue;
};
}

View File

@ -11,7 +11,7 @@ void PeriodicLog<LogElement>::startCollect(size_t collect_interval_milliseconds_
{
collect_interval_milliseconds = collect_interval_milliseconds_;
is_shutdown_metric_thread = false;
flush_thread = std::make_unique<ThreadFromGlobalPool>([this] { threadFunction(); });
collecting_thread = std::make_unique<ThreadFromGlobalPool>([this] { threadFunction(); });
}
template <typename LogElement>
@ -20,8 +20,8 @@ void PeriodicLog<LogElement>::stopCollect()
bool old_val = false;
if (!is_shutdown_metric_thread.compare_exchange_strong(old_val, true))
return;
if (flush_thread)
flush_thread->join();
if (collecting_thread)
collecting_thread->join();
}
template <typename LogElement>

View File

@ -36,7 +36,7 @@ protected:
private:
void threadFunction();
std::unique_ptr<ThreadFromGlobalPool> flush_thread;
std::unique_ptr<ThreadFromGlobalPool> collecting_thread;
size_t collect_interval_milliseconds;
std::atomic<bool> is_shutdown_metric_thread{false};
};

View File

@ -402,32 +402,13 @@ SystemLog<LogElement>::SystemLog(
template <typename LogElement>
void SystemLog<LogElement>::shutdown()
{
stopFlushThread();
Base::stopFlushThread();
auto table = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
if (table)
table->flushAndShutdown();
}
template <typename LogElement>
void SystemLog<LogElement>::stopFlushThread()
{
{
std::lock_guard lock(thread_mutex);
if (!saving_thread || !saving_thread->joinable())
return;
if (is_shutdown)
return;
is_shutdown = true;
queue->shutdown();
}
saving_thread->join();
}
template <typename LogElement>
void SystemLog<LogElement>::savingThreadFunction()

View File

@ -125,8 +125,6 @@ public:
void shutdown() override;
void stopFlushThread() override;
/** Creates new table if it does not exist.
* Renames old table if its structure is not suitable.
* This cannot be done in constructor to avoid deadlock while renaming a table under locked Context when SystemLog object is created.
@ -136,10 +134,7 @@ public:
protected:
LoggerPtr log;
using ISystemLog::is_shutdown;
using ISystemLog::saving_thread;
using ISystemLog::thread_mutex;
using Base::queue;
using Base::queue;
StoragePtr getStorage() const;