mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Reverting all the changes in LIVE VIEW table.
This commit is contained in:
parent
8621e93460
commit
684c7f943a
@ -1,13 +1,13 @@
|
||||
#pragma once
|
||||
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include <Storages/LiveView/Events.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Implements LIVE VIEW table WATCH input stream.
|
||||
* Keeps stream alive by outputting blocks with no rows
|
||||
* Keeps stream alive by outputing blocks with no rows
|
||||
* based on period specified by the heartbeat interval.
|
||||
*/
|
||||
class LiveViewBlockInputStream : public IBlockInputStream
|
||||
@ -18,9 +18,10 @@ using NonBlockingResult = std::pair<Block, bool>;
|
||||
public:
|
||||
~LiveViewBlockInputStream() override
|
||||
{
|
||||
/// Wakeup storage events thread if we are the last active user
|
||||
/// Start storage no users thread
|
||||
/// if we are the last active user
|
||||
if (!storage->is_dropped && blocks_ptr.use_count() < 3)
|
||||
storage->wakeupEventsThread(LiveViewEvent::LAST_USER);
|
||||
storage->startNoUsersThread(temporary_live_view_timeout_sec);
|
||||
}
|
||||
|
||||
LiveViewBlockInputStream(std::shared_ptr<StorageLiveView> storage_,
|
||||
@ -28,12 +29,14 @@ public:
|
||||
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr_,
|
||||
std::shared_ptr<bool> active_ptr_,
|
||||
const bool has_limit_, const UInt64 limit_,
|
||||
const UInt64 heartbeat_interval_sec_)
|
||||
const UInt64 heartbeat_interval_sec_,
|
||||
const UInt64 temporary_live_view_timeout_sec_)
|
||||
: storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)),
|
||||
blocks_metadata_ptr(std::move(blocks_metadata_ptr_)),
|
||||
active_ptr(std::move(active_ptr_)),
|
||||
has_limit(has_limit_), limit(limit_),
|
||||
heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000)
|
||||
heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000),
|
||||
temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_)
|
||||
{
|
||||
/// grab active pointer
|
||||
active = active_ptr.lock();
|
||||
@ -202,6 +205,7 @@ private:
|
||||
Int64 num_updates = -1;
|
||||
bool end_of_blocks = false;
|
||||
UInt64 heartbeat_interval_usec;
|
||||
UInt64 temporary_live_view_timeout_sec;
|
||||
UInt64 last_event_timestamp_usec = 0;
|
||||
};
|
||||
|
||||
|
@ -45,8 +45,8 @@ public:
|
||||
|
||||
(*storage.blocks_ptr) = new_blocks;
|
||||
(*storage.blocks_metadata_ptr) = new_blocks_metadata;
|
||||
|
||||
storage.condition.notify_all();
|
||||
storage.wakeupEventsThread(LiveViewEvent::NEW_BLOCKS);
|
||||
}
|
||||
|
||||
new_blocks.reset();
|
||||
|
@ -19,13 +19,13 @@ limitations under the License. */
|
||||
#include <DataStreams/OneBlockInputStream.h>
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include <Storages/LiveView/StorageLiveView.h>
|
||||
#include <Storages/LiveView/Events.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Implements LIVE VIEW table WATCH EVENTS input stream.
|
||||
* Keeps stream alive by outputting blocks with no rows
|
||||
* Keeps stream alive by outputing blocks with no rows
|
||||
* based on period specified by the heartbeat interval.
|
||||
*/
|
||||
class LiveViewEventsBlockInputStream : public IBlockInputStream
|
||||
@ -36,9 +36,10 @@ using NonBlockingResult = std::pair<Block, bool>;
|
||||
public:
|
||||
~LiveViewEventsBlockInputStream() override
|
||||
{
|
||||
/// Wakeup storage events thread if we are the last active user
|
||||
/// Start storage no users thread
|
||||
/// if we are the last active user
|
||||
if (!storage->is_dropped && blocks_ptr.use_count() < 3)
|
||||
storage->wakeupEventsThread(LiveViewEvent::LAST_USER);
|
||||
storage->startNoUsersThread(temporary_live_view_timeout_sec);
|
||||
}
|
||||
/// length default -2 because we want LIMIT to specify number of updates so that LIMIT 1 waits for 1 update
|
||||
/// and LIMIT 0 just returns data without waiting for any updates
|
||||
@ -47,12 +48,14 @@ public:
|
||||
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr_,
|
||||
std::shared_ptr<bool> active_ptr_,
|
||||
const bool has_limit_, const UInt64 limit_,
|
||||
const UInt64 heartbeat_interval_sec_)
|
||||
const UInt64 heartbeat_interval_sec_,
|
||||
const UInt64 temporary_live_view_timeout_sec_)
|
||||
: storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)),
|
||||
blocks_metadata_ptr(std::move(blocks_metadata_ptr_)),
|
||||
active_ptr(std::move(active_ptr_)), has_limit(has_limit_),
|
||||
limit(limit_),
|
||||
heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000)
|
||||
heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000),
|
||||
temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_)
|
||||
{
|
||||
/// grab active pointer
|
||||
active = active_ptr.lock();
|
||||
@ -233,6 +236,7 @@ private:
|
||||
Int64 num_updates = -1;
|
||||
bool end_of_blocks = false;
|
||||
UInt64 heartbeat_interval_usec;
|
||||
UInt64 temporary_live_view_timeout_sec;
|
||||
UInt64 last_event_timestamp_usec = 0;
|
||||
Poco::Timestamp timestamp;
|
||||
};
|
||||
|
@ -271,11 +271,6 @@ StorageLiveView::StorageLiveView(
|
||||
is_temporary = true;
|
||||
temporary_live_view_timeout = *query.live_view_timeout;
|
||||
}
|
||||
if (query.live_view_auto_refresh)
|
||||
{
|
||||
is_auto_refreshed = true;
|
||||
auto_refresh_live_view_interval = *query.live_view_auto_refresh;
|
||||
}
|
||||
|
||||
blocks_ptr = std::make_shared<BlocksPtr>();
|
||||
blocks_metadata_ptr = std::make_shared<BlocksMetadataPtr>();
|
||||
@ -376,7 +371,6 @@ bool StorageLiveView::getNewBlocks()
|
||||
(*blocks_ptr) = new_blocks;
|
||||
(*blocks_metadata_ptr) = new_blocks_metadata;
|
||||
updated = true;
|
||||
wakeupEventsThread(LiveViewEvent::NEW_BLOCKS);
|
||||
}
|
||||
}
|
||||
return updated;
|
||||
@ -393,44 +387,34 @@ void StorageLiveView::checkTableCanBeDropped() const
|
||||
}
|
||||
}
|
||||
|
||||
void StorageLiveView::eventsThread(std::shared_ptr<StorageLiveView> storage)
|
||||
void StorageLiveView::noUsersThread(std::shared_ptr<StorageLiveView> storage, const UInt64 & timeout)
|
||||
{
|
||||
bool drop_table = false;
|
||||
|
||||
if (storage->shutdown_called)
|
||||
return;
|
||||
|
||||
auto table_id = storage->getStorageID();
|
||||
UInt64 next_event_timeout = 15;
|
||||
unsigned int event = 0;
|
||||
UInt64 event_timestamp_usec = 0;
|
||||
|
||||
while (true)
|
||||
{
|
||||
std::unique_lock lock(storage->events_thread_wakeup_mutex);
|
||||
if (storage->events_thread_condition.wait_for(lock, std::chrono::seconds(next_event_timeout), [&] { return storage->events_thread_wakeup; }))
|
||||
while (true)
|
||||
{
|
||||
storage->events_thread_wakeup = false;
|
||||
event = storage->events_thread_event;
|
||||
event_timestamp_usec = storage->events_thread_event_timestamp_usec;
|
||||
storage->events_thread_event = 0;
|
||||
storage->events_thread_event_timestamp_usec = 0;
|
||||
std::cerr << "!!!: eventsThread got EVENT " << event << "\n";
|
||||
|
||||
if (storage->shutdown_called || event == LiveViewEvent::SHUTDOWN)
|
||||
return;
|
||||
|
||||
//if (storage->hasUsers())
|
||||
// return;
|
||||
//if (!DatabaseCatalog::instance().getDependencies(table_id).empty())
|
||||
// continue;
|
||||
//if (storage->isTemporary())
|
||||
// drop_table = true;
|
||||
continue;
|
||||
std::unique_lock lock(storage->no_users_thread_wakeup_mutex);
|
||||
if (!storage->no_users_thread_condition.wait_for(lock, std::chrono::seconds(timeout), [&] { return storage->no_users_thread_wakeup; }))
|
||||
{
|
||||
storage->no_users_thread_wakeup = false;
|
||||
if (storage->shutdown_called)
|
||||
return;
|
||||
if (storage->hasUsers())
|
||||
return;
|
||||
if (!DatabaseCatalog::instance().getDependencies(table_id).empty())
|
||||
continue;
|
||||
drop_table = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
// timeout sleeping
|
||||
std::cerr << "!!!: eventsThread timeout\n";
|
||||
}
|
||||
|
||||
/*if (drop_table)
|
||||
if (drop_table)
|
||||
{
|
||||
if (DatabaseCatalog::instance().tryGetTable(table_id))
|
||||
{
|
||||
@ -450,49 +434,46 @@ void StorageLiveView::eventsThread(std::shared_ptr<StorageLiveView> storage)
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
}*/
|
||||
}
|
||||
|
||||
void StorageLiveView::wakeupEventsThread(const LiveViewEvent & event)
|
||||
{
|
||||
std::lock_guard events_thread_lock(events_thread_mutex);
|
||||
if (events_thread.joinable())
|
||||
{
|
||||
std::lock_guard lock(events_thread_wakeup_mutex);
|
||||
events_thread_wakeup = true;
|
||||
events_thread_event |= event;
|
||||
events_thread_event_timestamp_usec = static_cast<UInt64>(Poco::Timestamp().epochMicroseconds());
|
||||
|
||||
if (event == LiveViewEvent::NEW_USER)
|
||||
events_thread_event &= ~static_cast<unsigned int>(LiveViewEvent::LAST_USER);
|
||||
else if (event == LiveViewEvent::LAST_USER)
|
||||
events_thread_event &= ~static_cast<unsigned int>(LiveViewEvent::NEW_USER);
|
||||
|
||||
events_thread_condition.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
void StorageLiveView::startEventsThread()
|
||||
void StorageLiveView::startNoUsersThread(const UInt64 & timeout)
|
||||
{
|
||||
if (is_temporary || is_auto_refreshed)
|
||||
{
|
||||
std::lock_guard events_thread_lock(events_thread_mutex);
|
||||
bool expected = false;
|
||||
if (!start_no_users_thread_called.compare_exchange_strong(expected, true))
|
||||
return;
|
||||
|
||||
if (events_thread.joinable())
|
||||
if (is_temporary)
|
||||
{
|
||||
std::lock_guard no_users_thread_lock(no_users_thread_mutex);
|
||||
|
||||
if (shutdown_called)
|
||||
return;
|
||||
|
||||
if (no_users_thread.joinable())
|
||||
{
|
||||
std::lock_guard lock(events_thread_wakeup_mutex);
|
||||
events_thread_wakeup = false;
|
||||
{
|
||||
std::lock_guard lock(no_users_thread_wakeup_mutex);
|
||||
no_users_thread_wakeup = true;
|
||||
no_users_thread_condition.notify_one();
|
||||
}
|
||||
no_users_thread.join();
|
||||
}
|
||||
events_thread = std::thread(&StorageLiveView::eventsThread,
|
||||
std::static_pointer_cast<StorageLiveView>(shared_from_this()));
|
||||
{
|
||||
std::lock_guard lock(no_users_thread_wakeup_mutex);
|
||||
no_users_thread_wakeup = false;
|
||||
}
|
||||
if (!is_dropped)
|
||||
no_users_thread = std::thread(&StorageLiveView::noUsersThread,
|
||||
std::static_pointer_cast<StorageLiveView>(shared_from_this()), timeout);
|
||||
}
|
||||
|
||||
start_no_users_thread_called = false;
|
||||
}
|
||||
|
||||
void StorageLiveView::startup()
|
||||
{
|
||||
startEventsThread();
|
||||
startNoUsersThread(temporary_live_view_timeout);
|
||||
}
|
||||
|
||||
void StorageLiveView::shutdown()
|
||||
@ -503,14 +484,13 @@ void StorageLiveView::shutdown()
|
||||
return;
|
||||
|
||||
{
|
||||
std::lock_guard events_thread_lock(events_thread_mutex);
|
||||
if (events_thread.joinable())
|
||||
std::lock_guard no_users_thread_lock(no_users_thread_mutex);
|
||||
if (no_users_thread.joinable())
|
||||
{
|
||||
{
|
||||
std::lock_guard lock(events_thread_wakeup_mutex);
|
||||
events_thread_wakeup = true;
|
||||
events_thread_event = LiveViewEvent::SHUTDOWN;
|
||||
events_thread_condition.notify_one();
|
||||
std::lock_guard lock(no_users_thread_wakeup_mutex);
|
||||
no_users_thread_wakeup = true;
|
||||
no_users_thread_condition.notify_one();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -521,9 +501,9 @@ StorageLiveView::~StorageLiveView()
|
||||
shutdown();
|
||||
|
||||
{
|
||||
std::lock_guard lock(events_thread_mutex);
|
||||
if (events_thread.joinable())
|
||||
events_thread.detach();
|
||||
std::lock_guard lock(no_users_thread_mutex);
|
||||
if (no_users_thread.joinable())
|
||||
no_users_thread.detach();
|
||||
}
|
||||
}
|
||||
|
||||
@ -592,9 +572,18 @@ BlockInputStreams StorageLiveView::watch(
|
||||
auto reader = std::make_shared<LiveViewEventsBlockInputStream>(
|
||||
std::static_pointer_cast<StorageLiveView>(shared_from_this()),
|
||||
blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit,
|
||||
context.getSettingsRef().live_view_heartbeat_interval.totalSeconds());
|
||||
context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(),
|
||||
temporary_live_view_timeout);
|
||||
|
||||
wakeupEventsThread(LiveViewEvent::NEW_USER);
|
||||
{
|
||||
std::lock_guard no_users_thread_lock(no_users_thread_mutex);
|
||||
if (no_users_thread.joinable())
|
||||
{
|
||||
std::lock_guard lock(no_users_thread_wakeup_mutex);
|
||||
no_users_thread_wakeup = true;
|
||||
no_users_thread_condition.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
@ -614,9 +603,18 @@ BlockInputStreams StorageLiveView::watch(
|
||||
auto reader = std::make_shared<LiveViewBlockInputStream>(
|
||||
std::static_pointer_cast<StorageLiveView>(shared_from_this()),
|
||||
blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit,
|
||||
context.getSettingsRef().live_view_heartbeat_interval.totalSeconds());
|
||||
context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(),
|
||||
temporary_live_view_timeout);
|
||||
|
||||
wakeupEventsThread(LiveViewEvent::NEW_USER);
|
||||
{
|
||||
std::lock_guard no_users_thread_lock(no_users_thread_mutex);
|
||||
if (no_users_thread.joinable())
|
||||
{
|
||||
std::lock_guard lock(no_users_thread_wakeup_mutex);
|
||||
no_users_thread_wakeup = true;
|
||||
no_users_thread_condition.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
@ -13,7 +13,6 @@ limitations under the License. */
|
||||
|
||||
#include <ext/shared_ptr_helper.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Storages/LiveView/Events.h>
|
||||
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
@ -73,7 +72,6 @@ public:
|
||||
bool supportsFinal() const override { return true; }
|
||||
|
||||
bool isTemporary() { return is_temporary; }
|
||||
bool isAutoRefreshed() { return is_auto_refreshed; }
|
||||
|
||||
/// Check if we have any readers
|
||||
/// must be called with mutex locked
|
||||
@ -88,14 +86,11 @@ public:
|
||||
{
|
||||
return active_ptr.use_count() > 1;
|
||||
}
|
||||
|
||||
/// events thread mutex, predicate and wake up condition
|
||||
void wakeupEventsThread(const LiveViewEvent & event);
|
||||
std::mutex events_thread_wakeup_mutex;
|
||||
bool events_thread_wakeup = false;
|
||||
unsigned int events_thread_event = 0;
|
||||
UInt64 events_thread_event_timestamp_usec = 0;
|
||||
std::condition_variable events_thread_condition;
|
||||
/// No users thread mutex, predicate and wake up condition
|
||||
void startNoUsersThread(const UInt64 & timeout);
|
||||
std::mutex no_users_thread_wakeup_mutex;
|
||||
bool no_users_thread_wakeup = false;
|
||||
std::condition_variable no_users_thread_condition;
|
||||
/// Get blocks hash
|
||||
/// must be called with mutex locked
|
||||
String getBlocksHashKey()
|
||||
@ -179,7 +174,6 @@ private:
|
||||
std::unique_ptr<Context> live_view_context;
|
||||
|
||||
bool is_temporary = false;
|
||||
bool is_auto_refreshed = false;
|
||||
/// Mutex to protect access to sample block and inner_blocks_query
|
||||
mutable std::mutex sample_block_lock;
|
||||
mutable Block sample_block;
|
||||
@ -198,15 +192,14 @@ private:
|
||||
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr;
|
||||
MergeableBlocksPtr mergeable_blocks;
|
||||
|
||||
/// Background events thread for temporary and auto refresh tables
|
||||
void startEventsThread();
|
||||
static void eventsThread(std::shared_ptr<StorageLiveView> storage);
|
||||
std::mutex events_thread_mutex;
|
||||
std::thread events_thread;
|
||||
/// Background thread for temporary tables
|
||||
/// which drops this table if there are no users
|
||||
static void noUsersThread(std::shared_ptr<StorageLiveView> storage, const UInt64 & timeout);
|
||||
std::mutex no_users_thread_mutex;
|
||||
std::thread no_users_thread;
|
||||
std::atomic<bool> shutdown_called = false;
|
||||
std::atomic<bool> start_events_thread_called = false;
|
||||
std::atomic<bool> start_no_users_thread_called = false;
|
||||
UInt64 temporary_live_view_timeout;
|
||||
UInt64 auto_refresh_live_view_interval;
|
||||
|
||||
StorageLiveView(
|
||||
const StorageID & table_id_,
|
||||
|
Loading…
Reference in New Issue
Block a user