mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Fix shutdown
This commit is contained in:
parent
fe6918a9d5
commit
f153d6aa3c
@ -62,6 +62,7 @@ void SystemLogBase<LogElement>::stopFlushThread()
|
||||
return;
|
||||
|
||||
is_shutdown = true;
|
||||
queue->shutdown();
|
||||
|
||||
/// Tell thread to shutdown.
|
||||
queue->flush_event.notify_all();
|
||||
@ -105,8 +106,8 @@ void SystemLogQueue<LogElement>::add(const LogElement & element)
|
||||
{
|
||||
std::unique_lock lock(mutex);
|
||||
|
||||
// if (queue.is_shutdown)
|
||||
// return; // TODO
|
||||
if (is_shutdown)
|
||||
return;
|
||||
|
||||
if (queue.size() == DBMS_SYSTEM_LOG_QUEUE_SIZE / 2)
|
||||
{
|
||||
@ -191,18 +192,18 @@ uint64_t SystemLogBase<LogElement>::notifyFlushImpl(bool force)
|
||||
uint64_t this_thread_requested_offset;
|
||||
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
std::lock_guard lock(queue->mutex);
|
||||
if (is_shutdown)
|
||||
return uint64_t(-1);
|
||||
|
||||
this_thread_requested_offset = queue_front_index + queue.size();
|
||||
this_thread_requested_offset = queue->queue_front_index + queue->queue.size();
|
||||
|
||||
// Publish our flush request, taking care not to overwrite the requests
|
||||
// made by other threads.
|
||||
is_force_prepare_tables |= force;
|
||||
requested_flush_up_to = std::max(requested_flush_up_to, this_thread_requested_offset);
|
||||
queue->requested_flush_up_to = std::max(queue->requested_flush_up_to, this_thread_requested_offset);
|
||||
|
||||
flush_event.notify_all();
|
||||
queue->flush_event.notify_all();
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, "Requested flush up to offset {}", this_thread_requested_offset);
|
||||
|
@ -78,10 +78,8 @@ public:
|
||||
|
||||
void add(const LogElement & element);
|
||||
size_t size() const { return queue.size(); }
|
||||
void push_back(const LogElement & element)
|
||||
{
|
||||
queue.push_back(element);
|
||||
}
|
||||
//void push_back(const LogElement & element) { queue.push_back(element); }
|
||||
void shutdown() { is_shutdown = true; }
|
||||
|
||||
// Queue is bounded. But its size is quite large to not block in all normal cases.
|
||||
std::vector<LogElement> queue;
|
||||
@ -101,7 +99,9 @@ public:
|
||||
// Logged overflow message at this queue front index
|
||||
uint64_t logged_queue_full_at_index = -1;
|
||||
|
||||
private:
|
||||
Poco::Logger * log;
|
||||
bool is_shutdown = false;
|
||||
};
|
||||
|
||||
template <typename LogElement>
|
||||
|
@ -12,7 +12,7 @@ namespace DB
|
||||
{
|
||||
template <typename> class SystemLogQueue;
|
||||
struct TextLogElement;
|
||||
using FooBar = SystemLogQueue<TextLogElement>;
|
||||
using TextLogQueue = SystemLogQueue<TextLogElement>;
|
||||
}
|
||||
#endif
|
||||
|
||||
@ -33,7 +33,7 @@ public:
|
||||
void addChannel(Poco::AutoPtr<Poco::Channel> channel, const std::string & name);
|
||||
|
||||
#ifndef WITHOUT_TEXT_LOG
|
||||
void addTextLog(std::shared_ptr<DB::FooBar> log_queue, int max_priority);
|
||||
void addTextLog(std::shared_ptr<DB::TextLogQueue> log_queue, int max_priority);
|
||||
#endif
|
||||
|
||||
void setLevel(const std::string & name, int level);
|
||||
@ -48,7 +48,7 @@ private:
|
||||
std::map<std::string, ExtendedChannelPtrPair> channels;
|
||||
|
||||
#ifndef WITHOUT_TEXT_LOG
|
||||
std::weak_ptr<DB::FooBar> text_log;
|
||||
std::weak_ptr<DB::TextLogQueue> text_log;
|
||||
std::atomic<int> text_log_max_priority = -1;
|
||||
#endif
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user