mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #68728 from ClickHouse/chesema-around-logs
fix shutdown for PeriodicLog
This commit is contained in:
commit
573d83ff97
@ -273,6 +273,25 @@ void SystemLogBase<LogElement>::startup()
|
||||
saving_thread = std::make_unique<ThreadFromGlobalPool>([this] { savingThreadFunction(); });
|
||||
}
|
||||
|
||||
template <typename LogElement>
|
||||
void SystemLogBase<LogElement>::stopFlushThread()
|
||||
{
|
||||
{
|
||||
std::lock_guard lock(thread_mutex);
|
||||
|
||||
if (!saving_thread || !saving_thread->joinable())
|
||||
return;
|
||||
|
||||
if (is_shutdown)
|
||||
return;
|
||||
|
||||
is_shutdown = true;
|
||||
queue->shutdown();
|
||||
}
|
||||
|
||||
saving_thread->join();
|
||||
}
|
||||
|
||||
template <typename LogElement>
|
||||
void SystemLogBase<LogElement>::add(LogElement element)
|
||||
{
|
||||
|
@ -216,6 +216,8 @@ public:
|
||||
static consteval bool shouldTurnOffLogger() { return false; }
|
||||
|
||||
protected:
|
||||
void stopFlushThread() final;
|
||||
|
||||
std::shared_ptr<SystemLogQueue<LogElement>> queue;
|
||||
};
|
||||
}
|
||||
|
@ -10,7 +10,7 @@ void PeriodicLog<LogElement>::startCollect(size_t collect_interval_milliseconds_
|
||||
{
|
||||
collect_interval_milliseconds = collect_interval_milliseconds_;
|
||||
is_shutdown_metric_thread = false;
|
||||
flush_thread = std::make_unique<ThreadFromGlobalPool>([this] { threadFunction(); });
|
||||
collecting_thread = std::make_unique<ThreadFromGlobalPool>([this] { threadFunction(); });
|
||||
}
|
||||
|
||||
template <typename LogElement>
|
||||
@ -19,15 +19,15 @@ void PeriodicLog<LogElement>::stopCollect()
|
||||
bool old_val = false;
|
||||
if (!is_shutdown_metric_thread.compare_exchange_strong(old_val, true))
|
||||
return;
|
||||
if (flush_thread)
|
||||
flush_thread->join();
|
||||
if (collecting_thread)
|
||||
collecting_thread->join();
|
||||
}
|
||||
|
||||
template <typename LogElement>
|
||||
void PeriodicLog<LogElement>::shutdown()
|
||||
{
|
||||
stopCollect();
|
||||
this->stopFlushThread();
|
||||
Base::shutdown();
|
||||
}
|
||||
|
||||
template <typename LogElement>
|
||||
|
@ -17,6 +17,7 @@ template <typename LogElement>
|
||||
class PeriodicLog : public SystemLog<LogElement>
|
||||
{
|
||||
using SystemLog<LogElement>::SystemLog;
|
||||
using Base = SystemLog<LogElement>;
|
||||
|
||||
public:
|
||||
using TimePoint = std::chrono::system_clock::time_point;
|
||||
@ -24,18 +25,18 @@ public:
|
||||
/// Launches a background thread to collect metrics with interval
|
||||
void startCollect(size_t collect_interval_milliseconds_);
|
||||
|
||||
/// Stop background thread
|
||||
void stopCollect();
|
||||
|
||||
void shutdown() final;
|
||||
|
||||
protected:
|
||||
/// Stop background thread
|
||||
void stopCollect();
|
||||
|
||||
virtual void stepFunction(TimePoint current_time) = 0;
|
||||
|
||||
private:
|
||||
void threadFunction();
|
||||
|
||||
std::unique_ptr<ThreadFromGlobalPool> flush_thread;
|
||||
std::unique_ptr<ThreadFromGlobalPool> collecting_thread;
|
||||
size_t collect_interval_milliseconds;
|
||||
std::atomic<bool> is_shutdown_metric_thread{false};
|
||||
};
|
||||
|
@ -402,32 +402,13 @@ SystemLog<LogElement>::SystemLog(
|
||||
template <typename LogElement>
|
||||
void SystemLog<LogElement>::shutdown()
|
||||
{
|
||||
stopFlushThread();
|
||||
Base::stopFlushThread();
|
||||
|
||||
auto table = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
|
||||
if (table)
|
||||
table->flushAndShutdown();
|
||||
}
|
||||
|
||||
template <typename LogElement>
|
||||
void SystemLog<LogElement>::stopFlushThread()
|
||||
{
|
||||
{
|
||||
std::lock_guard lock(thread_mutex);
|
||||
|
||||
if (!saving_thread || !saving_thread->joinable())
|
||||
return;
|
||||
|
||||
if (is_shutdown)
|
||||
return;
|
||||
|
||||
is_shutdown = true;
|
||||
queue->shutdown();
|
||||
}
|
||||
|
||||
saving_thread->join();
|
||||
}
|
||||
|
||||
|
||||
template <typename LogElement>
|
||||
void SystemLog<LogElement>::savingThreadFunction()
|
||||
|
@ -125,8 +125,6 @@ public:
|
||||
|
||||
void shutdown() override;
|
||||
|
||||
void stopFlushThread() override;
|
||||
|
||||
/** Creates new table if it does not exist.
|
||||
* Renames old table if its structure is not suitable.
|
||||
* This cannot be done in constructor to avoid deadlock while renaming a table under locked Context when SystemLog object is created.
|
||||
@ -136,9 +134,6 @@ public:
|
||||
protected:
|
||||
LoggerPtr log;
|
||||
|
||||
using ISystemLog::is_shutdown;
|
||||
using ISystemLog::saving_thread;
|
||||
using ISystemLog::thread_mutex;
|
||||
using Base::queue;
|
||||
|
||||
StoragePtr getStorage() const;
|
||||
|
Loading…
Reference in New Issue
Block a user