mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Use join() instead of detach() for the no_users_thread in StorageLiveView.
This commit is contained in:
parent
0f3351d983
commit
2a9ab48279
@ -23,6 +23,7 @@
|
||||
#include <Storages/MergeTree/MergeTreeSettings.h>
|
||||
#include <Storages/CompressionCodecSelector.h>
|
||||
#include <Storages/StorageS3Settings.h>
|
||||
#include <Storages/LiveView/TemporaryLiveViewCleaner.h>
|
||||
#include <Disks/DiskLocal.h>
|
||||
#include <TableFunctions/TableFunctionFactory.h>
|
||||
#include <Interpreters/ActionLocksManager.h>
|
||||
@ -423,6 +424,7 @@ struct ContextShared
|
||||
if (system_logs)
|
||||
system_logs->shutdown();
|
||||
|
||||
TemporaryLiveViewCleaner::shutdown();
|
||||
DatabaseCatalog::shutdown();
|
||||
|
||||
/// Preemptive destruction is important, because these objects may have a refcount to ContextShared (cyclic reference).
|
||||
@ -479,6 +481,12 @@ Context Context::createGlobal(ContextShared * shared)
|
||||
return res;
|
||||
}
|
||||
|
||||
void Context::initGlobal()
|
||||
{
|
||||
DatabaseCatalog::init(this);
|
||||
TemporaryLiveViewCleaner::init(*this);
|
||||
}
|
||||
|
||||
SharedContextHolder Context::createShared()
|
||||
{
|
||||
return SharedContextHolder(std::make_unique<ContextShared>());
|
||||
|
@ -445,11 +445,7 @@ public:
|
||||
|
||||
void makeQueryContext() { query_context = this; }
|
||||
void makeSessionContext() { session_context = this; }
|
||||
void makeGlobalContext()
|
||||
{
|
||||
global_context = this;
|
||||
DatabaseCatalog::init(this);
|
||||
}
|
||||
void makeGlobalContext() { initGlobal(); global_context = this; }
|
||||
|
||||
const Settings & getSettingsRef() const { return settings; }
|
||||
|
||||
@ -622,6 +618,8 @@ public:
|
||||
private:
|
||||
std::unique_lock<std::recursive_mutex> getLock() const;
|
||||
|
||||
void initGlobal();
|
||||
|
||||
/// Compute and set actual user settings, client_info.current_user should be set
|
||||
void calculateAccessRights();
|
||||
|
||||
|
@ -10,6 +10,7 @@ namespace DB
|
||||
{
|
||||
class Context;
|
||||
using DatabaseAndTable = std::pair<DatabasePtr, StoragePtr>;
|
||||
class AccessRightsElements;
|
||||
|
||||
/** Allow to either drop table with all its data (DROP),
|
||||
* or remove information about table (just forget) from server (DETACH),
|
||||
|
@ -16,27 +16,17 @@ class LiveViewBlockInputStream : public IBlockInputStream
|
||||
using NonBlockingResult = std::pair<Block, bool>;
|
||||
|
||||
public:
|
||||
~LiveViewBlockInputStream() override
|
||||
{
|
||||
/// Start storage no users thread
|
||||
/// if we are the last active user
|
||||
if (!storage->is_dropped && blocks_ptr.use_count() < 3)
|
||||
storage->startNoUsersThread(temporary_live_view_timeout_sec);
|
||||
}
|
||||
|
||||
LiveViewBlockInputStream(std::shared_ptr<StorageLiveView> storage_,
|
||||
std::shared_ptr<BlocksPtr> blocks_ptr_,
|
||||
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr_,
|
||||
std::shared_ptr<bool> active_ptr_,
|
||||
const bool has_limit_, const UInt64 limit_,
|
||||
const UInt64 heartbeat_interval_sec_,
|
||||
const UInt64 temporary_live_view_timeout_sec_)
|
||||
const UInt64 heartbeat_interval_sec_)
|
||||
: storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)),
|
||||
blocks_metadata_ptr(std::move(blocks_metadata_ptr_)),
|
||||
active_ptr(std::move(active_ptr_)),
|
||||
has_limit(has_limit_), limit(limit_),
|
||||
heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000),
|
||||
temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_)
|
||||
heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000)
|
||||
{
|
||||
/// grab active pointer
|
||||
active = active_ptr.lock();
|
||||
@ -205,7 +195,6 @@ private:
|
||||
Int64 num_updates = -1;
|
||||
bool end_of_blocks = false;
|
||||
UInt64 heartbeat_interval_usec;
|
||||
UInt64 temporary_live_view_timeout_sec;
|
||||
UInt64 last_event_timestamp_usec = 0;
|
||||
};
|
||||
|
||||
|
@ -34,13 +34,6 @@ class LiveViewEventsBlockInputStream : public IBlockInputStream
|
||||
using NonBlockingResult = std::pair<Block, bool>;
|
||||
|
||||
public:
|
||||
~LiveViewEventsBlockInputStream() override
|
||||
{
|
||||
/// Start storage no users thread
|
||||
/// if we are the last active user
|
||||
if (!storage->is_dropped && blocks_ptr.use_count() < 3)
|
||||
storage->startNoUsersThread(temporary_live_view_timeout_sec);
|
||||
}
|
||||
/// length default -2 because we want LIMIT to specify number of updates so that LIMIT 1 waits for 1 update
|
||||
/// and LIMIT 0 just returns data without waiting for any updates
|
||||
LiveViewEventsBlockInputStream(std::shared_ptr<StorageLiveView> storage_,
|
||||
@ -48,14 +41,12 @@ public:
|
||||
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr_,
|
||||
std::shared_ptr<bool> active_ptr_,
|
||||
const bool has_limit_, const UInt64 limit_,
|
||||
const UInt64 heartbeat_interval_sec_,
|
||||
const UInt64 temporary_live_view_timeout_sec_)
|
||||
const UInt64 heartbeat_interval_sec_)
|
||||
: storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)),
|
||||
blocks_metadata_ptr(std::move(blocks_metadata_ptr_)),
|
||||
active_ptr(std::move(active_ptr_)), has_limit(has_limit_),
|
||||
limit(limit_),
|
||||
heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000),
|
||||
temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_)
|
||||
heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000)
|
||||
{
|
||||
/// grab active pointer
|
||||
active = active_ptr.lock();
|
||||
@ -236,7 +227,6 @@ private:
|
||||
Int64 num_updates = -1;
|
||||
bool end_of_blocks = false;
|
||||
UInt64 heartbeat_interval_usec;
|
||||
UInt64 temporary_live_view_timeout_sec;
|
||||
UInt64 last_event_timestamp_usec = 0;
|
||||
Poco::Timestamp timestamp;
|
||||
};
|
||||
|
@ -12,10 +12,8 @@ limitations under the License. */
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Parsers/ASTWatchQuery.h>
|
||||
#include <Parsers/ASTDropQuery.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/InterpreterDropQuery.h>
|
||||
#include <Interpreters/InterpreterSelectQuery.h>
|
||||
#include <DataStreams/IBlockOutputStream.h>
|
||||
#include <DataStreams/OneBlockInputStream.h>
|
||||
@ -31,6 +29,7 @@ limitations under the License. */
|
||||
#include <Storages/LiveView/LiveViewBlockOutputStream.h>
|
||||
#include <Storages/LiveView/LiveViewEventsBlockInputStream.h>
|
||||
#include <Storages/LiveView/StorageBlocks.h>
|
||||
#include <Storages/LiveView/TemporaryLiveViewCleaner.h>
|
||||
|
||||
#include <Storages/StorageFactory.h>
|
||||
#include <Parsers/ASTTablesInSelectQuery.h>
|
||||
@ -276,7 +275,7 @@ StorageLiveView::StorageLiveView(
|
||||
if (query.live_view_timeout)
|
||||
{
|
||||
is_temporary = true;
|
||||
temporary_live_view_timeout = *query.live_view_timeout;
|
||||
temporary_live_view_timeout = std::chrono::seconds{*query.live_view_timeout};
|
||||
}
|
||||
|
||||
blocks_ptr = std::make_shared<BlocksPtr>();
|
||||
@ -384,128 +383,21 @@ void StorageLiveView::checkTableCanBeDropped() const
|
||||
}
|
||||
}
|
||||
|
||||
void StorageLiveView::noUsersThread(std::shared_ptr<StorageLiveView> storage, const UInt64 & timeout)
|
||||
{
|
||||
bool drop_table = false;
|
||||
|
||||
if (storage->shutdown_called)
|
||||
return;
|
||||
|
||||
auto table_id = storage->getStorageID();
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
std::unique_lock lock(storage->no_users_thread_wakeup_mutex);
|
||||
if (!storage->no_users_thread_condition.wait_for(lock, std::chrono::seconds(timeout), [&] { return storage->no_users_thread_wakeup; }))
|
||||
{
|
||||
storage->no_users_thread_wakeup = false;
|
||||
if (storage->shutdown_called)
|
||||
return;
|
||||
if (storage->hasUsers())
|
||||
return;
|
||||
if (!DatabaseCatalog::instance().getDependencies(table_id).empty())
|
||||
continue;
|
||||
drop_table = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (drop_table)
|
||||
{
|
||||
if (DatabaseCatalog::instance().tryGetTable(table_id, storage->global_context))
|
||||
{
|
||||
try
|
||||
{
|
||||
/// We create and execute `drop` query for this table
|
||||
auto drop_query = std::make_shared<ASTDropQuery>();
|
||||
drop_query->database = table_id.database_name;
|
||||
drop_query->table = table_id.table_name;
|
||||
drop_query->kind = ASTDropQuery::Kind::Drop;
|
||||
ASTPtr ast_drop_query = drop_query;
|
||||
InterpreterDropQuery drop_interpreter(ast_drop_query, storage->global_context);
|
||||
drop_interpreter.execute();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void StorageLiveView::startNoUsersThread(const UInt64 & timeout)
|
||||
{
|
||||
bool expected = false;
|
||||
if (!start_no_users_thread_called.compare_exchange_strong(expected, true))
|
||||
return;
|
||||
|
||||
if (is_temporary)
|
||||
{
|
||||
std::lock_guard no_users_thread_lock(no_users_thread_mutex);
|
||||
|
||||
if (shutdown_called)
|
||||
return;
|
||||
|
||||
if (no_users_thread.joinable())
|
||||
{
|
||||
{
|
||||
std::lock_guard lock(no_users_thread_wakeup_mutex);
|
||||
no_users_thread_wakeup = true;
|
||||
no_users_thread_condition.notify_one();
|
||||
}
|
||||
no_users_thread.join();
|
||||
}
|
||||
{
|
||||
std::lock_guard lock(no_users_thread_wakeup_mutex);
|
||||
no_users_thread_wakeup = false;
|
||||
}
|
||||
if (!is_dropped)
|
||||
no_users_thread = std::thread(&StorageLiveView::noUsersThread,
|
||||
std::static_pointer_cast<StorageLiveView>(shared_from_this()), timeout);
|
||||
}
|
||||
|
||||
start_no_users_thread_called = false;
|
||||
}
|
||||
|
||||
void StorageLiveView::startup()
|
||||
{
|
||||
startNoUsersThread(temporary_live_view_timeout);
|
||||
if (is_temporary)
|
||||
TemporaryLiveViewCleaner::instance().addView(std::static_pointer_cast<StorageLiveView>(shared_from_this()));
|
||||
}
|
||||
|
||||
void StorageLiveView::shutdown()
|
||||
{
|
||||
shutdown_called = true;
|
||||
DatabaseCatalog::instance().removeDependency(select_table_id, getStorageID());
|
||||
bool expected = false;
|
||||
if (!shutdown_called.compare_exchange_strong(expected, true))
|
||||
return;
|
||||
|
||||
/// WATCH queries should be stopped after setting shutdown_called to true.
|
||||
/// Otherwise livelock is possible for LiveView table in Atomic database:
|
||||
/// WATCH query will wait for table to be dropped and DatabaseCatalog will wait for queries to finish
|
||||
|
||||
{
|
||||
std::lock_guard no_users_thread_lock(no_users_thread_mutex);
|
||||
if (no_users_thread.joinable())
|
||||
{
|
||||
{
|
||||
std::lock_guard lock(no_users_thread_wakeup_mutex);
|
||||
no_users_thread_wakeup = true;
|
||||
no_users_thread_condition.notify_one();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
StorageLiveView::~StorageLiveView()
|
||||
{
|
||||
shutdown();
|
||||
|
||||
{
|
||||
std::lock_guard lock(no_users_thread_mutex);
|
||||
if (no_users_thread.joinable())
|
||||
no_users_thread.detach();
|
||||
}
|
||||
}
|
||||
|
||||
void StorageLiveView::drop()
|
||||
@ -572,18 +464,7 @@ BlockInputStreams StorageLiveView::watch(
|
||||
auto reader = std::make_shared<LiveViewEventsBlockInputStream>(
|
||||
std::static_pointer_cast<StorageLiveView>(shared_from_this()),
|
||||
blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit,
|
||||
context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(),
|
||||
temporary_live_view_timeout);
|
||||
|
||||
{
|
||||
std::lock_guard no_users_thread_lock(no_users_thread_mutex);
|
||||
if (no_users_thread.joinable())
|
||||
{
|
||||
std::lock_guard lock(no_users_thread_wakeup_mutex);
|
||||
no_users_thread_wakeup = true;
|
||||
no_users_thread_condition.notify_one();
|
||||
}
|
||||
}
|
||||
context.getSettingsRef().live_view_heartbeat_interval.totalSeconds());
|
||||
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
@ -603,18 +484,7 @@ BlockInputStreams StorageLiveView::watch(
|
||||
auto reader = std::make_shared<LiveViewBlockInputStream>(
|
||||
std::static_pointer_cast<StorageLiveView>(shared_from_this()),
|
||||
blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit,
|
||||
context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(),
|
||||
temporary_live_view_timeout);
|
||||
|
||||
{
|
||||
std::lock_guard no_users_thread_lock(no_users_thread_mutex);
|
||||
if (no_users_thread.joinable())
|
||||
{
|
||||
std::lock_guard lock(no_users_thread_wakeup_mutex);
|
||||
no_users_thread_wakeup = true;
|
||||
no_users_thread_condition.notify_one();
|
||||
}
|
||||
}
|
||||
context.getSettingsRef().live_view_heartbeat_interval.totalSeconds());
|
||||
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
@ -38,6 +38,10 @@ using ASTPtr = std::shared_ptr<IAST>;
|
||||
using BlocksMetadataPtr = std::shared_ptr<BlocksMetadata>;
|
||||
using MergeableBlocksPtr = std::shared_ptr<MergeableBlocks>;
|
||||
|
||||
class Pipe;
|
||||
using Pipes = std::vector<Pipe>;
|
||||
|
||||
|
||||
class StorageLiveView final : public ext::shared_ptr_helper<StorageLiveView>, public IStorage
|
||||
{
|
||||
friend struct ext::shared_ptr_helper<StorageLiveView>;
|
||||
@ -70,7 +74,9 @@ public:
|
||||
|
||||
NamesAndTypesList getVirtuals() const override;
|
||||
|
||||
bool isTemporary() { return is_temporary; }
|
||||
bool isTemporary() const { return is_temporary; }
|
||||
std::chrono::seconds getTimeout() const { return temporary_live_view_timeout; }
|
||||
|
||||
|
||||
/// Check if we have any readers
|
||||
/// must be called with mutex locked
|
||||
@ -85,11 +91,7 @@ public:
|
||||
{
|
||||
return active_ptr.use_count() > 1;
|
||||
}
|
||||
/// No users thread mutex, predicate and wake up condition
|
||||
void startNoUsersThread(const UInt64 & timeout);
|
||||
std::mutex no_users_thread_wakeup_mutex;
|
||||
bool no_users_thread_wakeup = false;
|
||||
std::condition_variable no_users_thread_condition;
|
||||
|
||||
/// Get blocks hash
|
||||
/// must be called with mutex locked
|
||||
String getBlocksHashKey()
|
||||
@ -175,6 +177,8 @@ private:
|
||||
std::unique_ptr<Context> live_view_context;
|
||||
|
||||
bool is_temporary = false;
|
||||
std::chrono::seconds temporary_live_view_timeout;
|
||||
|
||||
/// Mutex to protect access to sample block and inner_blocks_query
|
||||
mutable std::mutex sample_block_lock;
|
||||
mutable Block sample_block;
|
||||
@ -193,14 +197,7 @@ private:
|
||||
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr;
|
||||
MergeableBlocksPtr mergeable_blocks;
|
||||
|
||||
/// Background thread for temporary tables
|
||||
/// which drops this table if there are no users
|
||||
static void noUsersThread(std::shared_ptr<StorageLiveView> storage, const UInt64 & timeout);
|
||||
std::mutex no_users_thread_mutex;
|
||||
std::thread no_users_thread;
|
||||
std::atomic<bool> shutdown_called = false;
|
||||
std::atomic<bool> start_no_users_thread_called = false;
|
||||
UInt64 temporary_live_view_timeout;
|
||||
|
||||
StorageLiveView(
|
||||
const StorageID & table_id_,
|
||||
|
148
src/Storages/LiveView/TemporaryLiveViewCleaner.cpp
Normal file
148
src/Storages/LiveView/TemporaryLiveViewCleaner.cpp
Normal file
@ -0,0 +1,148 @@
|
||||
#include <Storages/LiveView/TemporaryLiveViewCleaner.h>
|
||||
#include <Storages/LiveView/StorageLiveView.h>
|
||||
#include <Interpreters/DatabaseCatalog.h>
|
||||
#include <Interpreters/InterpreterDropQuery.h>
|
||||
#include <Parsers/ASTDropQuery.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
void executeDropQuery(const StorageID & storage_id, Context & context)
|
||||
{
|
||||
if (!DatabaseCatalog::instance().isTableExist(storage_id, context))
|
||||
return;
|
||||
try
|
||||
{
|
||||
/// We create and execute `drop` query for this table
|
||||
auto drop_query = std::make_shared<ASTDropQuery>();
|
||||
drop_query->database = storage_id.database_name;
|
||||
drop_query->table = storage_id.table_name;
|
||||
drop_query->kind = ASTDropQuery::Kind::Drop;
|
||||
ASTPtr ast_drop_query = drop_query;
|
||||
InterpreterDropQuery drop_interpreter(ast_drop_query, context);
|
||||
drop_interpreter.execute();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
std::unique_ptr<TemporaryLiveViewCleaner> TemporaryLiveViewCleaner::the_instance;
|
||||
|
||||
|
||||
void TemporaryLiveViewCleaner::init(Context & global_context_)
|
||||
{
|
||||
if (the_instance)
|
||||
throw Exception("TemporaryLiveViewCleaner already initialized", ErrorCodes::LOGICAL_ERROR);
|
||||
the_instance.reset(new TemporaryLiveViewCleaner(global_context_));
|
||||
}
|
||||
|
||||
|
||||
void TemporaryLiveViewCleaner::shutdown()
|
||||
{
|
||||
the_instance.reset();
|
||||
}
|
||||
|
||||
|
||||
TemporaryLiveViewCleaner::TemporaryLiveViewCleaner(Context & global_context_)
|
||||
: global_context(global_context_)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
TemporaryLiveViewCleaner::~TemporaryLiveViewCleaner()
|
||||
{
|
||||
stopBackgroundThread();
|
||||
}
|
||||
|
||||
|
||||
void TemporaryLiveViewCleaner::addView(const std::shared_ptr<StorageLiveView> & view)
|
||||
{
|
||||
if (!view->isTemporary())
|
||||
return;
|
||||
|
||||
auto current_time = std::chrono::system_clock::now();
|
||||
auto time_of_next_check = current_time + view->getTimeout();
|
||||
|
||||
std::lock_guard lock{mutex};
|
||||
|
||||
/// Keep the vector `views` sorted by time of next check.
|
||||
StorageAndTimeOfCheck storage_and_time_of_check{view, time_of_next_check};
|
||||
views.insert(std::upper_bound(views.begin(), views.end(), storage_and_time_of_check), storage_and_time_of_check);
|
||||
|
||||
if (!background_thread.joinable())
|
||||
background_thread = ThreadFromGlobalPool{&TemporaryLiveViewCleaner::backgroundThreadFunc, this};
|
||||
|
||||
background_thread_wake_up.notify_one();
|
||||
}
|
||||
|
||||
|
||||
void TemporaryLiveViewCleaner::backgroundThreadFunc()
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
while (!background_thread_should_exit && !views.empty())
|
||||
{
|
||||
background_thread_wake_up.wait_until(lock, views.front().time_of_check);
|
||||
if (background_thread_should_exit)
|
||||
return;
|
||||
|
||||
auto current_time = std::chrono::system_clock::now();
|
||||
std::vector<StorageID> storages_to_drop;
|
||||
|
||||
auto it = views.begin();
|
||||
while (it != views.end())
|
||||
{
|
||||
std::shared_ptr<StorageLiveView> storage = it->storage.lock();
|
||||
auto & time_of_check = it->time_of_check;
|
||||
if (!storage)
|
||||
{
|
||||
/// Storage has been already removed.
|
||||
it = views.erase(it);
|
||||
continue;
|
||||
}
|
||||
|
||||
++it;
|
||||
|
||||
if (current_time < time_of_check)
|
||||
break; /// It's not the time to check it yet.
|
||||
|
||||
time_of_check = current_time + storage->getTimeout();
|
||||
|
||||
auto storage_id = storage->getStorageID();
|
||||
if (storage->hasUsers() || !DatabaseCatalog::instance().getDependencies(storage_id).empty())
|
||||
continue;
|
||||
|
||||
storages_to_drop.emplace_back(storage_id);
|
||||
}
|
||||
|
||||
lock.unlock();
|
||||
for (const auto & storage_id : storages_to_drop)
|
||||
executeDropQuery(storage_id, global_context);
|
||||
lock.lock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void TemporaryLiveViewCleaner::stopBackgroundThread()
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
if (background_thread.joinable())
|
||||
{
|
||||
background_thread_should_exit = true;
|
||||
background_thread_wake_up.notify_one();
|
||||
background_thread.join();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
51
src/Storages/LiveView/TemporaryLiveViewCleaner.h
Normal file
51
src/Storages/LiveView/TemporaryLiveViewCleaner.h
Normal file
@ -0,0 +1,51 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <chrono>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class StorageLiveView;
|
||||
struct StorageID;
|
||||
|
||||
/// This class removes temporary live views in the background thread when it's possible.
|
||||
/// There should only a single instance of this class.
|
||||
class TemporaryLiveViewCleaner
|
||||
{
|
||||
public:
|
||||
static TemporaryLiveViewCleaner & instance() { return *the_instance; }
|
||||
|
||||
/// Drops a specified live view after a while if it's temporary.
|
||||
void addView(const std::shared_ptr<StorageLiveView> & view);
|
||||
|
||||
/// Should be called once.
|
||||
static void init(Context & global_context_);
|
||||
static void shutdown();
|
||||
|
||||
private:
|
||||
friend std::unique_ptr<TemporaryLiveViewCleaner>::deleter_type;
|
||||
|
||||
TemporaryLiveViewCleaner(Context & global_context_);
|
||||
~TemporaryLiveViewCleaner();
|
||||
|
||||
void backgroundThreadFunc();
|
||||
void stopBackgroundThread();
|
||||
|
||||
struct StorageAndTimeOfCheck
|
||||
{
|
||||
std::weak_ptr<StorageLiveView> storage;
|
||||
std::chrono::system_clock::time_point time_of_check;
|
||||
bool operator <(const StorageAndTimeOfCheck & other) const { return time_of_check < other.time_of_check; }
|
||||
};
|
||||
|
||||
static std::unique_ptr<TemporaryLiveViewCleaner> the_instance;
|
||||
Context & global_context;
|
||||
std::mutex mutex;
|
||||
std::vector<StorageAndTimeOfCheck> views;
|
||||
ThreadFromGlobalPool background_thread;
|
||||
std::atomic<bool> background_thread_should_exit = false;
|
||||
std::condition_variable background_thread_wake_up;
|
||||
};
|
||||
|
||||
}
|
@ -20,6 +20,7 @@ SRCS(
|
||||
IStorage.cpp
|
||||
KeyDescription.cpp
|
||||
LiveView/StorageLiveView.cpp
|
||||
LiveView/TemporaryLiveViewCleaner.cpp
|
||||
MergeTree/ActiveDataPartSet.cpp
|
||||
MergeTree/AllMergeSelector.cpp
|
||||
MergeTree/BackgroundProcessingPool.cpp
|
||||
|
Loading…
Reference in New Issue
Block a user