mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-23 18:20:50 +00:00
fix livelock with watch queries and database atomic
This commit is contained in:
parent
ff0ae624ef
commit
5ffb7372af
@ -46,7 +46,7 @@ public:
|
|||||||
|
|
||||||
void cancel(bool kill) override
|
void cancel(bool kill) override
|
||||||
{
|
{
|
||||||
if (isCancelled() || storage->is_dropped)
|
if (isCancelled() || storage->shutdown_called)
|
||||||
return;
|
return;
|
||||||
IBlockInputStream::cancel(kill);
|
IBlockInputStream::cancel(kill);
|
||||||
std::lock_guard lock(storage->mutex);
|
std::lock_guard lock(storage->mutex);
|
||||||
@ -115,7 +115,7 @@ protected:
|
|||||||
end = blocks->end();
|
end = blocks->end();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isCancelled() || storage->is_dropped)
|
if (isCancelled() || storage->shutdown_called)
|
||||||
{
|
{
|
||||||
return { Block(), true };
|
return { Block(), true };
|
||||||
}
|
}
|
||||||
@ -155,7 +155,7 @@ protected:
|
|||||||
bool signaled = std::cv_status::no_timeout == storage->condition.wait_for(lock,
|
bool signaled = std::cv_status::no_timeout == storage->condition.wait_for(lock,
|
||||||
std::chrono::microseconds(std::max(UInt64(0), heartbeat_interval_usec - (timestamp_usec - last_event_timestamp_usec))));
|
std::chrono::microseconds(std::max(UInt64(0), heartbeat_interval_usec - (timestamp_usec - last_event_timestamp_usec))));
|
||||||
|
|
||||||
if (isCancelled() || storage->is_dropped)
|
if (isCancelled() || storage->shutdown_called)
|
||||||
{
|
{
|
||||||
return { Block(), true };
|
return { Block(), true };
|
||||||
}
|
}
|
||||||
|
@ -65,7 +65,7 @@ public:
|
|||||||
|
|
||||||
void cancel(bool kill) override
|
void cancel(bool kill) override
|
||||||
{
|
{
|
||||||
if (isCancelled() || storage->is_dropped)
|
if (isCancelled() || storage->shutdown_called)
|
||||||
return;
|
return;
|
||||||
IBlockInputStream::cancel(kill);
|
IBlockInputStream::cancel(kill);
|
||||||
std::lock_guard lock(storage->mutex);
|
std::lock_guard lock(storage->mutex);
|
||||||
@ -149,7 +149,7 @@ protected:
|
|||||||
end = blocks->end();
|
end = blocks->end();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isCancelled() || storage->is_dropped)
|
if (isCancelled() || storage->shutdown_called)
|
||||||
{
|
{
|
||||||
return { Block(), true };
|
return { Block(), true };
|
||||||
}
|
}
|
||||||
@ -190,7 +190,7 @@ protected:
|
|||||||
bool signaled = std::cv_status::no_timeout == storage->condition.wait_for(lock,
|
bool signaled = std::cv_status::no_timeout == storage->condition.wait_for(lock,
|
||||||
std::chrono::microseconds(std::max(UInt64(0), heartbeat_interval_usec - (timestamp_usec - last_event_timestamp_usec))));
|
std::chrono::microseconds(std::max(UInt64(0), heartbeat_interval_usec - (timestamp_usec - last_event_timestamp_usec))));
|
||||||
|
|
||||||
if (isCancelled() || storage->is_dropped)
|
if (isCancelled() || storage->shutdown_called)
|
||||||
{
|
{
|
||||||
return { Block(), true };
|
return { Block(), true };
|
||||||
}
|
}
|
||||||
|
@ -468,6 +468,10 @@ void StorageLiveView::shutdown()
|
|||||||
if (!shutdown_called.compare_exchange_strong(expected, true))
|
if (!shutdown_called.compare_exchange_strong(expected, true))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
/// WATCH queries should be stopped after setting shutdown_called to true.
|
||||||
|
/// Otherwise livelock is possible for LiveView table in Atomic database:
|
||||||
|
/// WATCH query will wait for table to be dropped and DatabaseCatalog will wait for queries to finish
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard no_users_thread_lock(no_users_thread_mutex);
|
std::lock_guard no_users_thread_lock(no_users_thread_mutex);
|
||||||
if (no_users_thread.joinable())
|
if (no_users_thread.joinable())
|
||||||
|
Loading…
Reference in New Issue
Block a user