slightly more robust ReplicatedMergeTreePartCheckThread::TemporarilyStop [#CLICKHOUSE-3074]

This commit is contained in:
Alexey Zatelepin 2017-07-12 19:15:16 +03:00 committed by alexey-milovidov
parent bdc529645e
commit 58d541b740
2 changed files with 19 additions and 10 deletions

View File

@ -26,24 +26,32 @@ ReplicatedMergeTreePartCheckThread::ReplicatedMergeTreePartCheckThread(StorageRe
void ReplicatedMergeTreePartCheckThread::start()
{
std::lock_guard<std::mutex> lock(start_stop_mutex);
if (need_stop)
need_stop = false;
else
thread = std::thread([this] { run(); });
}
void ReplicatedMergeTreePartCheckThread::stop()
{
need_stop = true;
wakeup_event.set();
std::lock_guard<std::mutex> lock(start_stop_mutex);
need_stop = true;
if (thread.joinable())
{
wakeup_event.set();
thread.join();
need_stop = false;
}
}
void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t delay_to_check_seconds)
{
std::lock_guard<std::mutex> lock(mutex);
std::lock_guard<std::mutex> lock(parts_mutex);
if (parts_set.count(name))
return;
@ -56,7 +64,7 @@ void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t
size_t ReplicatedMergeTreePartCheckThread::size() const
{
std::lock_guard<std::mutex> lock(mutex);
std::lock_guard<std::mutex> lock(parts_mutex);
return parts_set.size();
}
@ -304,7 +312,7 @@ void ReplicatedMergeTreePartCheckThread::run()
time_t min_check_time = std::numeric_limits<time_t>::max();
{
std::lock_guard<std::mutex> lock(mutex);
std::lock_guard<std::mutex> lock(parts_mutex);
if (parts_queue.empty())
{
@ -350,7 +358,7 @@ void ReplicatedMergeTreePartCheckThread::run()
/// Remove the part from check queue.
{
std::lock_guard<std::mutex> lock(mutex);
std::lock_guard<std::mutex> lock(parts_mutex);
if (parts_queue.empty())
{

View File

@ -88,12 +88,13 @@ private:
* - If we do not have a part, check to see if it (or the part covering it) exists anywhere on another replicas.
*/
mutable std::mutex parts_mutex;
StringSet parts_set;
PartsToCheckQueue parts_queue;
mutable std::mutex mutex;
Poco::Event wakeup_event;
std::atomic<bool> need_stop { false };
std::mutex start_stop_mutex;
std::atomic<bool> need_stop { false };
std::thread thread;
};