Reverting to previous no users thread functionality to avoid

keeping no users thread always alive for each live view.
This commit is contained in:
Vitaliy Zakaznikov 2019-08-25 08:27:47 -04:00
parent 2342d64d1b
commit 7fb13b12f9
4 changed files with 47 additions and 47 deletions

View File

@ -18,13 +18,10 @@ using NonBlockingResult = std::pair<Block, bool>;
public:
~LiveViewBlockInputStream() override
{
/// Wake up no users thread
{
std::cerr << "DEBUG: live view block input stream ... send wake up thread\n";
std::lock_guard lock(storage->no_users_thread_mutex);
storage->no_users_thread_wakeup = true;
storage->no_users_thread_condition.notify_one();
}
/// Start storage no users thread
/// if we are the last active user
if (!storage->is_dropped && blocks_ptr.use_count() < 3)
storage->startNoUsersThread(temporary_live_view_timeout_sec);
}
LiveViewBlockInputStream(std::shared_ptr<StorageLiveView> storage_,
@ -32,12 +29,14 @@ public:
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr_,
std::shared_ptr<bool> active_ptr_,
const bool has_limit_, const UInt64 limit_,
const UInt64 heartbeat_interval_sec_)
const UInt64 heartbeat_interval_sec_,
const UInt64 temporary_live_view_timeout_sec_)
: storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)),
blocks_metadata_ptr(std::move(blocks_metadata_ptr_)),
active_ptr(std::move(active_ptr_)),
has_limit(has_limit_), limit(limit_),
heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000)
heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000),
temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_)
{
/// grab active pointer
active = active_ptr.lock();
@ -206,6 +205,7 @@ private:
Int64 num_updates = -1;
bool end_of_blocks = false;
UInt64 heartbeat_interval_usec;
UInt64 temporary_live_view_timeout_sec;
UInt64 last_event_timestamp_usec = 0;
Poco::Timestamp timestamp;
};

View File

@ -37,12 +37,10 @@ using NonBlockingResult = std::pair<Block, bool>;
public:
~LiveViewEventsBlockInputStream() override
{
/// Wake up no users thread
{
std::lock_guard lock(storage->no_users_thread_mutex);
storage->no_users_thread_wakeup = true;
storage->no_users_thread_condition.notify_one();
}
/// Start storage no users thread
/// if we are the last active user
if (!storage->is_dropped && blocks_ptr.use_count() < 3)
storage->startNoUsersThread(temporary_live_view_timeout_sec);
}
/// length default -2 because we want LIMIT to specify number of updates so that LIMIT 1 waits for 1 update
/// and LIMIT 0 just returns data without waiting for any updates
@ -51,12 +49,14 @@ public:
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr_,
std::shared_ptr<bool> active_ptr_,
const bool has_limit_, const UInt64 limit_,
const UInt64 heartbeat_interval_sec_)
const UInt64 heartbeat_interval_sec_,
const UInt64 temporary_live_view_timeout_sec_)
: storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)),
blocks_metadata_ptr(std::move(blocks_metadata_ptr_)),
active_ptr(std::move(active_ptr_)), has_limit(has_limit_),
limit(limit_),
heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000)
heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000),
temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_)
{
/// grab active pointer
active = active_ptr.lock();
@ -241,6 +241,7 @@ private:
Int64 num_updates = -1;
bool end_of_blocks = false;
UInt64 heartbeat_interval_usec;
UInt64 temporary_live_view_timeout_sec;
UInt64 last_event_timestamp_usec = 0;
Poco::Timestamp timestamp;
};

View File

@ -366,7 +366,6 @@ void StorageLiveView::checkTableCanBeDropped() const
void StorageLiveView::noUsersThread(std::shared_ptr<StorageLiveView> storage, const UInt64 & timeout)
{
bool drop_table = false;
UInt64 next_timeout = timeout;
if (storage->shutdown_called)
return;
@ -375,33 +374,17 @@ void StorageLiveView::noUsersThread(std::shared_ptr<StorageLiveView> storage, co
while (1)
{
std::unique_lock lock(storage->no_users_thread_mutex);
if (!storage->no_users_thread_condition.wait_for(lock, std::chrono::seconds(next_timeout), [&] { return storage->no_users_thread_wakeup; }))
if (!storage->no_users_thread_condition.wait_for(lock, std::chrono::seconds(timeout), [&] { return storage->no_users_thread_wakeup; }))
{
storage->no_users_thread_wakeup = false;
if (storage->shutdown_called)
return;
if (storage->hasUsers())
{
/// Thread woke up but there are still users so sleep for 3 times longer than
/// the original timeout to reduce the number of times thread wakes up.
/// Wait until we are explicitely woken up when a user goes away to
/// reset wait time to the original timeout.
next_timeout = timeout * 3;
continue;
}
return;
if (!storage->global_context.getDependencies(storage->database_name, storage->table_name).empty())
continue;
drop_table = true;
}
else
{
/// Thread was explicitly awaken so reset timeout to the original
next_timeout = timeout;
storage->no_users_thread_wakeup = false;
if (storage->shutdown_called)
return;
continue;
}
break;
}
}
@ -430,24 +413,31 @@ void StorageLiveView::noUsersThread(std::shared_ptr<StorageLiveView> storage, co
void StorageLiveView::startNoUsersThread(const UInt64 & timeout)
{
bool expected = false;
if (!start_no_users_thread_called.compare_exchange_strong(expected, true))
return;
if (is_temporary)
{
if (no_users_thread.joinable())
{
/// If the thread is already running then
/// wake it up and just return
{
std::lock_guard lock(no_users_thread_mutex);
no_users_thread_wakeup = true;
no_users_thread_condition.notify_one();
return;
}
no_users_thread.join();
}
{
std::lock_guard lock(no_users_thread_mutex);
no_users_thread_wakeup = false;
}
if (!is_dropped)
no_users_thread = std::thread(&StorageLiveView::noUsersThread,
std::static_pointer_cast<StorageLiveView>(shared_from_this()), timeout);
}
start_no_users_thread_called = false;
}
void StorageLiveView::startup()
@ -541,7 +531,11 @@ BlockInputStreams StorageLiveView::watch(
if (query.is_watch_events)
{
auto reader = std::make_shared<LiveViewEventsBlockInputStream>(std::static_pointer_cast<StorageLiveView>(shared_from_this()), blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, context.getSettingsRef().live_view_heartbeat_interval.totalSeconds());
auto reader = std::make_shared<LiveViewEventsBlockInputStream>(
std::static_pointer_cast<StorageLiveView>(shared_from_this()),
blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit,
context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(),
context.getSettingsRef().temporary_live_view_timeout.totalSeconds());
if (no_users_thread.joinable())
{
@ -565,7 +559,11 @@ BlockInputStreams StorageLiveView::watch(
}
else
{
auto reader = std::make_shared<LiveViewBlockInputStream>(std::static_pointer_cast<StorageLiveView>(shared_from_this()), blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, context.getSettingsRef().live_view_heartbeat_interval.totalSeconds());
auto reader = std::make_shared<LiveViewBlockInputStream>(
std::static_pointer_cast<StorageLiveView>(shared_from_this()),
blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit,
context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(),
context.getSettingsRef().temporary_live_view_timeout.totalSeconds());
if (no_users_thread.joinable())
{

View File

@ -72,6 +72,7 @@ public:
return active_ptr.use_count() > 1;
}
/// No users thread mutex, predicate and wake up condition
void startNoUsersThread(const UInt64 & timeout);
std::mutex no_users_thread_mutex;
bool no_users_thread_wakeup = false;
std::condition_variable no_users_thread_condition;
@ -166,10 +167,10 @@ private:
/// Background thread for temporary tables
/// which drops this table if there are no users
void startNoUsersThread(const UInt64 & timeout);
static void noUsersThread(std::shared_ptr<StorageLiveView> storage, const UInt64 & timeout);
std::thread no_users_thread;
std::atomic<bool> shutdown_called = false;
std::atomic<bool> start_no_users_thread_called = false;
UInt64 temporary_live_view_timeout;
StorageLiveView(