mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 17:41:59 +00:00
Rewriting implementation of LIVE VIEW no users thread.
This commit is contained in:
parent
f959c29be6
commit
62988800e6
@ -18,10 +18,13 @@ using NonBlockingResult = std::pair<Block, bool>;
|
||||
public:
|
||||
~LiveViewBlockInputStream() override
|
||||
{
|
||||
/// Start storage no users thread
|
||||
/// if we are the last active user
|
||||
if (!storage->is_dropped && blocks_ptr.use_count() < 3)
|
||||
storage->startNoUsersThread(temporary_live_view_timeout_sec);
|
||||
/// Wake up no users thread
|
||||
{
|
||||
std::cerr << "DEBUG: live view block input stream ... send wake up thread\n";
|
||||
std::lock_guard lock(storage->no_users_thread_mutex);
|
||||
storage->no_users_thread_wakeup = true;
|
||||
storage->no_users_thread_condition.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
LiveViewBlockInputStream(std::shared_ptr<StorageLiveView> storage_,
|
||||
@ -29,9 +32,12 @@ public:
|
||||
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr_,
|
||||
std::shared_ptr<bool> active_ptr_,
|
||||
const bool has_limit_, const UInt64 limit_,
|
||||
const UInt64 heartbeat_interval_sec_,
|
||||
const UInt64 temporary_live_view_timeout_sec_)
|
||||
: storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)), blocks_metadata_ptr(std::move(blocks_metadata_ptr_)), active_ptr(std::move(active_ptr_)), has_limit(has_limit_), limit(limit_), heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000), temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_)
|
||||
const UInt64 heartbeat_interval_sec_)
|
||||
: storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)),
|
||||
blocks_metadata_ptr(std::move(blocks_metadata_ptr_)),
|
||||
active_ptr(std::move(active_ptr_)),
|
||||
has_limit(has_limit_), limit(limit_),
|
||||
heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000)
|
||||
{
|
||||
/// grab active pointer
|
||||
active = active_ptr.lock();
|
||||
@ -200,7 +206,6 @@ private:
|
||||
Int64 num_updates = -1;
|
||||
bool end_of_blocks = false;
|
||||
UInt64 heartbeat_interval_usec;
|
||||
UInt64 temporary_live_view_timeout_sec;
|
||||
UInt64 last_event_timestamp_usec = 0;
|
||||
Poco::Timestamp timestamp;
|
||||
};
|
||||
|
@ -37,10 +37,12 @@ using NonBlockingResult = std::pair<Block, bool>;
|
||||
public:
|
||||
~LiveViewEventsBlockInputStream() override
|
||||
{
|
||||
/// Start storage no users thread
|
||||
/// if we are the last active user
|
||||
if (!storage->is_dropped && blocks_ptr.use_count() < 3)
|
||||
storage->startNoUsersThread(temporary_live_view_timeout_sec);
|
||||
/// Wake up no users thread
|
||||
{
|
||||
std::lock_guard lock(storage->no_users_thread_mutex);
|
||||
storage->no_users_thread_wakeup = true;
|
||||
storage->no_users_thread_condition.notify_one();
|
||||
}
|
||||
}
|
||||
/// length default -2 because we want LIMIT to specify number of updates so that LIMIT 1 waits for 1 update
|
||||
/// and LIMIT 0 just returns data without waiting for any updates
|
||||
@ -49,9 +51,12 @@ public:
|
||||
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr_,
|
||||
std::shared_ptr<bool> active_ptr_,
|
||||
const bool has_limit_, const UInt64 limit_,
|
||||
const UInt64 heartbeat_interval_sec_,
|
||||
const UInt64 temporary_live_view_timeout_sec_)
|
||||
: storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)), blocks_metadata_ptr(std::move(blocks_metadata_ptr_)), active_ptr(std::move(active_ptr_)), has_limit(has_limit_), limit(limit_), heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000), temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_)
|
||||
const UInt64 heartbeat_interval_sec_)
|
||||
: storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)),
|
||||
blocks_metadata_ptr(std::move(blocks_metadata_ptr_)),
|
||||
active_ptr(std::move(active_ptr_)), has_limit(has_limit_),
|
||||
limit(limit_),
|
||||
heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000)
|
||||
{
|
||||
/// grab active pointer
|
||||
active = active_ptr.lock();
|
||||
@ -236,7 +241,6 @@ private:
|
||||
Int64 num_updates = -1;
|
||||
bool end_of_blocks = false;
|
||||
UInt64 heartbeat_interval_usec;
|
||||
UInt64 temporary_live_view_timeout_sec;
|
||||
UInt64 last_event_timestamp_usec = 0;
|
||||
Poco::Timestamp timestamp;
|
||||
};
|
||||
|
@ -363,45 +363,49 @@ void StorageLiveView::checkTableCanBeDropped() const
|
||||
}
|
||||
}
|
||||
|
||||
void StorageLiveView::noUsersThread(const UInt64 & timeout)
|
||||
void StorageLiveView::noUsersThread(std::shared_ptr<StorageLiveView> storage, const UInt64 & timeout)
|
||||
{
|
||||
if (shutdown_called)
|
||||
return;
|
||||
|
||||
bool drop_table = false;
|
||||
|
||||
if (storage->shutdown_called || storage->is_dropped)
|
||||
return;
|
||||
|
||||
{
|
||||
while (1)
|
||||
{
|
||||
std::unique_lock lock(no_users_thread_mutex);
|
||||
if (!no_users_thread_condition.wait_for(lock, std::chrono::seconds(timeout), [&] { return no_users_thread_wakeup; }))
|
||||
std::unique_lock lock(storage->no_users_thread_mutex);
|
||||
if(!storage->no_users_thread_condition.wait_for(lock, std::chrono::seconds(timeout), [&] { return storage->no_users_thread_wakeup; }))
|
||||
{
|
||||
no_users_thread_wakeup = false;
|
||||
if (shutdown_called)
|
||||
storage->no_users_thread_wakeup = false;
|
||||
if (storage->shutdown_called || storage->is_dropped)
|
||||
return;
|
||||
if (hasUsers())
|
||||
return;
|
||||
if (!global_context.getDependencies(database_name, table_name).empty())
|
||||
if (storage->hasUsers())
|
||||
continue;
|
||||
if (!storage->global_context.getDependencies(storage->database_name, storage->table_name).empty())
|
||||
continue;
|
||||
drop_table = true;
|
||||
}
|
||||
else {
|
||||
storage->no_users_thread_wakeup = false;
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (drop_table)
|
||||
{
|
||||
if (global_context.tryGetTable(database_name, table_name))
|
||||
if (storage->global_context.tryGetTable(storage->database_name, storage->table_name))
|
||||
{
|
||||
try
|
||||
{
|
||||
/// We create and execute `drop` query for this table
|
||||
auto drop_query = std::make_shared<ASTDropQuery>();
|
||||
drop_query->database = database_name;
|
||||
drop_query->table = table_name;
|
||||
drop_query->database = storage->database_name;
|
||||
drop_query->table = storage->table_name;
|
||||
drop_query->kind = ASTDropQuery::Kind::Drop;
|
||||
ASTPtr ast_drop_query = drop_query;
|
||||
InterpreterDropQuery drop_interpreter(ast_drop_query, global_context);
|
||||
InterpreterDropQuery drop_interpreter(ast_drop_query, storage->global_context);
|
||||
drop_interpreter.execute();
|
||||
}
|
||||
catch (...)
|
||||
@ -413,13 +417,6 @@ void StorageLiveView::noUsersThread(const UInt64 & timeout)
|
||||
|
||||
void StorageLiveView::startNoUsersThread(const UInt64 & timeout)
|
||||
{
|
||||
bool expected = false;
|
||||
if (!start_no_users_thread_called.compare_exchange_strong(expected, true))
|
||||
return;
|
||||
|
||||
if (is_dropped)
|
||||
return;
|
||||
|
||||
if (is_temporary)
|
||||
{
|
||||
if (no_users_thread.joinable())
|
||||
@ -435,10 +432,9 @@ void StorageLiveView::startNoUsersThread(const UInt64 & timeout)
|
||||
std::lock_guard lock(no_users_thread_mutex);
|
||||
no_users_thread_wakeup = false;
|
||||
}
|
||||
if (!is_dropped)
|
||||
no_users_thread = std::thread(&StorageLiveView::noUsersThread, this, timeout);
|
||||
no_users_thread = std::thread(&StorageLiveView::noUsersThread,
|
||||
std::static_pointer_cast<StorageLiveView>(shared_from_this()), timeout);
|
||||
}
|
||||
start_no_users_thread_called = false;
|
||||
}
|
||||
|
||||
void StorageLiveView::startup()
|
||||
@ -454,19 +450,19 @@ void StorageLiveView::shutdown()
|
||||
|
||||
if (no_users_thread.joinable())
|
||||
{
|
||||
std::lock_guard lock(no_users_thread_mutex);
|
||||
no_users_thread_wakeup = true;
|
||||
no_users_thread_condition.notify_one();
|
||||
/// Must detach the no users thread
|
||||
/// as we can't join it as it will result
|
||||
/// in a deadlock
|
||||
no_users_thread.detach(); /// TODO Not viable at all.
|
||||
{
|
||||
std::lock_guard lock(no_users_thread_mutex);
|
||||
no_users_thread_wakeup = true;
|
||||
no_users_thread_condition.notify_one();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
StorageLiveView::~StorageLiveView()
|
||||
{
|
||||
shutdown();
|
||||
if (no_users_thread.joinable())
|
||||
no_users_thread.detach();
|
||||
}
|
||||
|
||||
void StorageLiveView::drop()
|
||||
@ -532,8 +528,7 @@ BlockInputStreams StorageLiveView::watch(
|
||||
|
||||
if (query.is_watch_events)
|
||||
{
|
||||
auto reader = std::make_shared<LiveViewEventsBlockInputStream>(std::static_pointer_cast<StorageLiveView>(shared_from_this()), blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(),
|
||||
context.getSettingsRef().temporary_live_view_timeout.totalSeconds());
|
||||
auto reader = std::make_shared<LiveViewEventsBlockInputStream>(std::static_pointer_cast<StorageLiveView>(shared_from_this()), blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, context.getSettingsRef().live_view_heartbeat_interval.totalSeconds());
|
||||
|
||||
if (no_users_thread.joinable())
|
||||
{
|
||||
@ -557,8 +552,7 @@ BlockInputStreams StorageLiveView::watch(
|
||||
}
|
||||
else
|
||||
{
|
||||
auto reader = std::make_shared<LiveViewBlockInputStream>(std::static_pointer_cast<StorageLiveView>(shared_from_this()), blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(),
|
||||
context.getSettingsRef().temporary_live_view_timeout.totalSeconds());
|
||||
auto reader = std::make_shared<LiveViewBlockInputStream>(std::static_pointer_cast<StorageLiveView>(shared_from_this()), blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, context.getSettingsRef().live_view_heartbeat_interval.totalSeconds());
|
||||
|
||||
if (no_users_thread.joinable())
|
||||
{
|
||||
|
@ -71,11 +71,9 @@ public:
|
||||
{
|
||||
return active_ptr.use_count() > 1;
|
||||
}
|
||||
/// Background thread for temporary tables
|
||||
/// which drops this table if there are no users
|
||||
void startNoUsersThread(const UInt64 & timeout);
|
||||
/// No users thread mutex, predicate and wake up condition
|
||||
std::mutex no_users_thread_mutex;
|
||||
bool no_users_thread_wakeup{false};
|
||||
bool no_users_thread_wakeup = false;
|
||||
std::condition_variable no_users_thread_condition;
|
||||
/// Get blocks hash
|
||||
/// must be called with mutex locked
|
||||
@ -149,7 +147,7 @@ private:
|
||||
String database_name;
|
||||
ASTPtr inner_query;
|
||||
Context & global_context;
|
||||
bool is_temporary {false};
|
||||
bool is_temporary = false;
|
||||
mutable Block sample_block;
|
||||
|
||||
/// Mutex for the blocks and ready condition
|
||||
@ -166,10 +164,12 @@ private:
|
||||
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr;
|
||||
BlocksPtrs mergeable_blocks;
|
||||
|
||||
void noUsersThread(const UInt64 & timeout);
|
||||
/// Background thread for temporary tables
|
||||
/// which drops this table if there are no users
|
||||
void startNoUsersThread(const UInt64 & timeout);
|
||||
static void noUsersThread(std::shared_ptr<StorageLiveView> storage, const UInt64 & timeout);
|
||||
std::thread no_users_thread;
|
||||
std::atomic<bool> shutdown_called{false};
|
||||
std::atomic<bool> start_no_users_thread_called{false};
|
||||
std::atomic<bool> shutdown_called = false;
|
||||
UInt64 temporary_live_view_timeout;
|
||||
|
||||
StorageLiveView(
|
||||
|
Loading…
Reference in New Issue
Block a user