mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 09:32:01 +00:00
Working on adding AUTO REFRESH to LIVE VIEW tables.
This commit is contained in:
parent
398ed4d9c2
commit
855e06b031
@ -36,6 +36,7 @@
|
|||||||
#define DEFAULT_MERGE_BLOCK_SIZE 8192
|
#define DEFAULT_MERGE_BLOCK_SIZE 8192
|
||||||
|
|
||||||
#define DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC 5
|
#define DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC 5
|
||||||
|
#define DEFAULT_AUTO_REFRESH_LIVE_VIEW_INTERVAL_SEC 300
|
||||||
#define SHOW_CHARS_ON_SYNTAX_ERROR ptrdiff_t(160)
|
#define SHOW_CHARS_ON_SYNTAX_ERROR ptrdiff_t(160)
|
||||||
#define DEFAULT_LIVE_VIEW_HEARTBEAT_INTERVAL_SEC 15
|
#define DEFAULT_LIVE_VIEW_HEARTBEAT_INTERVAL_SEC 15
|
||||||
#define DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE 1024
|
#define DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE 1024
|
||||||
|
@ -406,6 +406,7 @@ struct Settings : public SettingsCollection<Settings>
|
|||||||
M(SettingBool, validate_polygons, true, "Throw exception if polygon is invalid in function pointInPolygon (e.g. self-tangent, self-intersecting). If the setting is false, the function will accept invalid polygons but may silently return wrong result.", 0) \
|
M(SettingBool, validate_polygons, true, "Throw exception if polygon is invalid in function pointInPolygon (e.g. self-tangent, self-intersecting). If the setting is false, the function will accept invalid polygons but may silently return wrong result.", 0) \
|
||||||
M(SettingUInt64, max_parser_depth, 1000, "Maximum parser depth.", 0) \
|
M(SettingUInt64, max_parser_depth, 1000, "Maximum parser depth.", 0) \
|
||||||
M(SettingSeconds, temporary_live_view_timeout, DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC, "Timeout after which temporary live view is deleted.", 0) \
|
M(SettingSeconds, temporary_live_view_timeout, DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC, "Timeout after which temporary live view is deleted.", 0) \
|
||||||
|
M(SettingSeconds, auto_refresh_live_view_interval, DEFAULT_AUTO_REFRESH_LIVE_VIEW_INTERVAL_SEC, "The interval in seconds after which auto refreshed live view is refreshed.", 0) \
|
||||||
M(SettingBool, transform_null_in, false, "If enabled, NULL values will be matched with 'IN' operator as if they are considered equal.", 0) \
|
M(SettingBool, transform_null_in, false, "If enabled, NULL values will be matched with 'IN' operator as if they are considered equal.", 0) \
|
||||||
M(SettingBool, allow_nondeterministic_mutations, false, "Allow non-deterministic functions in ALTER UPDATE/ALTER DELETE statements", 0) \
|
M(SettingBool, allow_nondeterministic_mutations, false, "Allow non-deterministic functions in ALTER UPDATE/ALTER DELETE statements", 0) \
|
||||||
M(SettingSeconds, lock_acquire_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "How long locking request should wait before failing", 0) \
|
M(SettingSeconds, lock_acquire_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "How long locking request should wait before failing", 0) \
|
||||||
|
@ -234,9 +234,16 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
|
|||||||
<< (if_not_exists ? "IF NOT EXISTS " : "")
|
<< (if_not_exists ? "IF NOT EXISTS " : "")
|
||||||
<< (settings.hilite ? hilite_none : "")
|
<< (settings.hilite ? hilite_none : "")
|
||||||
<< (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table);
|
<< (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table);
|
||||||
if (live_view_timeout)
|
if (live_view_timeout || live_view_auto_refresh)
|
||||||
settings.ostr << (settings.hilite ? hilite_keyword : "") << " WITH TIMEOUT " << (settings.hilite ? hilite_none : "")
|
{
|
||||||
<< *live_view_timeout;
|
settings.ostr << (settings.hilite ? hilite_keyword : "") << " WITH " << (settings.hilite ? hilite_none : "");
|
||||||
|
if (live_view_timeout)
|
||||||
|
settings.ostr << (settings.hilite ? hilite_keyword : "") << "TIMEOUT " << (settings.hilite ? hilite_none : "")
|
||||||
|
<< *live_view_timeout;
|
||||||
|
if (live_view_auto_refresh)
|
||||||
|
settings.ostr << (live_view_timeout ? ", " : "") << (settings.hilite ? hilite_keyword : "") << "AUTO REFRESH " << (settings.hilite ? hilite_none : "")
|
||||||
|
<< *live_view_auto_refresh;
|
||||||
|
}
|
||||||
formatOnCluster(settings);
|
formatOnCluster(settings);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -73,6 +73,7 @@ public:
|
|||||||
ASTSelectWithUnionQuery * select = nullptr;
|
ASTSelectWithUnionQuery * select = nullptr;
|
||||||
ASTDictionary * dictionary = nullptr; /// dictionary definition (layout, primary key, etc.)
|
ASTDictionary * dictionary = nullptr; /// dictionary definition (layout, primary key, etc.)
|
||||||
std::optional<UInt64> live_view_timeout; /// For CREATE LIVE VIEW ... WITH TIMEOUT ...
|
std::optional<UInt64> live_view_timeout; /// For CREATE LIVE VIEW ... WITH TIMEOUT ...
|
||||||
|
std::optional<UInt64> live_view_auto_refresh; /// For CREATE LIVE VIEW ... WITH AUTO REFRESH ...
|
||||||
|
|
||||||
/** Get the text that identifies this element. */
|
/** Get the text that identifies this element. */
|
||||||
String getID(char delim) const override { return (attach ? "AttachQuery" : "CreateQuery") + (delim + database) + delim + table; }
|
String getID(char delim) const override { return (attach ? "AttachQuery" : "CreateQuery") + (delim + database) + delim + table; }
|
||||||
|
@ -510,6 +510,7 @@ bool ParserCreateLiveViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & e
|
|||||||
ASTPtr as_table;
|
ASTPtr as_table;
|
||||||
ASTPtr select;
|
ASTPtr select;
|
||||||
ASTPtr live_view_timeout;
|
ASTPtr live_view_timeout;
|
||||||
|
ASTPtr live_view_auto_refresh;
|
||||||
|
|
||||||
String cluster_str;
|
String cluster_str;
|
||||||
bool attach = false;
|
bool attach = false;
|
||||||
@ -542,10 +543,27 @@ bool ParserCreateLiveViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & e
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ParserKeyword{"WITH TIMEOUT"}.ignore(pos, expected))
|
if (ParserKeyword{"WITH"}.ignore(pos, expected))
|
||||||
{
|
{
|
||||||
if (!ParserNumber{}.parse(pos, live_view_timeout, expected))
|
do
|
||||||
live_view_timeout = std::make_shared<ASTLiteral>(static_cast<UInt64>(DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC));
|
{
|
||||||
|
if (!live_view_timeout && ParserKeyword{"TIMEOUT"}.ignore(pos, expected))
|
||||||
|
{
|
||||||
|
if (!ParserNumber{}.parse(pos, live_view_timeout, expected))
|
||||||
|
live_view_timeout = std::make_shared<ASTLiteral>(static_cast<UInt64>(DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC));
|
||||||
|
}
|
||||||
|
else if (!live_view_auto_refresh && ParserKeyword{"AUTO REFRESH"}.ignore(pos, expected))
|
||||||
|
{
|
||||||
|
if (!ParserNumber{}.parse(pos, live_view_auto_refresh, expected))
|
||||||
|
live_view_auto_refresh = std::make_shared<ASTLiteral>(static_cast<UInt64>(DEFAULT_AUTO_REFRESH_LIVE_VIEW_INTERVAL_SEC));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
while ((!live_view_timeout || !live_view_auto_refresh) && ParserToken{TokenType::Comma}.ignore(pos, expected));
|
||||||
|
|
||||||
|
if (!live_view_timeout && !live_view_auto_refresh)
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ParserKeyword{"ON"}.ignore(pos, expected))
|
if (ParserKeyword{"ON"}.ignore(pos, expected))
|
||||||
@ -608,6 +626,8 @@ bool ParserCreateLiveViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & e
|
|||||||
|
|
||||||
if (live_view_timeout)
|
if (live_view_timeout)
|
||||||
query->live_view_timeout.emplace(live_view_timeout->as<ASTLiteral &>().value.safeGet<UInt64>());
|
query->live_view_timeout.emplace(live_view_timeout->as<ASTLiteral &>().value.safeGet<UInt64>());
|
||||||
|
if (live_view_auto_refresh)
|
||||||
|
query->live_view_auto_refresh.emplace(live_view_auto_refresh->as<ASTLiteral &>().value.safeGet<UInt64>());
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -312,7 +312,7 @@ protected:
|
|||||||
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
|
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
|
||||||
};
|
};
|
||||||
|
|
||||||
/// CREATE|ATTACH LIVE VIEW [IF NOT EXISTS] [db.]name [TO [db.]name] AS SELECT ...
|
/// CREATE|ATTACH LIVE VIEW [IF NOT EXISTS] [db.]name [TO [db.]name] [WITH {TIMEOUT | AUTO REFRESH} [num]} [,...]] AS SELECT ...
|
||||||
class ParserCreateLiveViewQuery : public IParserBase
|
class ParserCreateLiveViewQuery : public IParserBase
|
||||||
{
|
{
|
||||||
protected:
|
protected:
|
||||||
|
14
src/Storages/LiveView/Events.h
Normal file
14
src/Storages/LiveView/Events.h
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
/// events processed by the events thread
|
||||||
|
enum LiveViewEvent
|
||||||
|
{
|
||||||
|
NONE = 0,
|
||||||
|
LAST_USER = 1,
|
||||||
|
NEW_USER = 2,
|
||||||
|
NEW_BLOCKS = 4,
|
||||||
|
SHUTDOWN = 8
|
||||||
|
};
|
||||||
|
}
|
@ -1,13 +1,13 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <DataStreams/IBlockInputStream.h>
|
#include <DataStreams/IBlockInputStream.h>
|
||||||
|
#include <Storages/LiveView/Events.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
/** Implements LIVE VIEW table WATCH input stream.
|
/** Implements LIVE VIEW table WATCH input stream.
|
||||||
* Keeps stream alive by outputing blocks with no rows
|
* Keeps stream alive by outputting blocks with no rows
|
||||||
* based on period specified by the heartbeat interval.
|
* based on period specified by the heartbeat interval.
|
||||||
*/
|
*/
|
||||||
class LiveViewBlockInputStream : public IBlockInputStream
|
class LiveViewBlockInputStream : public IBlockInputStream
|
||||||
@ -18,10 +18,9 @@ using NonBlockingResult = std::pair<Block, bool>;
|
|||||||
public:
|
public:
|
||||||
~LiveViewBlockInputStream() override
|
~LiveViewBlockInputStream() override
|
||||||
{
|
{
|
||||||
/// Start storage no users thread
|
/// Wakeup storage events thread if we are the last active user
|
||||||
/// if we are the last active user
|
|
||||||
if (!storage->is_dropped && blocks_ptr.use_count() < 3)
|
if (!storage->is_dropped && blocks_ptr.use_count() < 3)
|
||||||
storage->startNoUsersThread(temporary_live_view_timeout_sec);
|
storage->wakeupEventsThread(LiveViewEvent::LAST_USER);
|
||||||
}
|
}
|
||||||
|
|
||||||
LiveViewBlockInputStream(std::shared_ptr<StorageLiveView> storage_,
|
LiveViewBlockInputStream(std::shared_ptr<StorageLiveView> storage_,
|
||||||
@ -29,14 +28,12 @@ public:
|
|||||||
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr_,
|
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr_,
|
||||||
std::shared_ptr<bool> active_ptr_,
|
std::shared_ptr<bool> active_ptr_,
|
||||||
const bool has_limit_, const UInt64 limit_,
|
const bool has_limit_, const UInt64 limit_,
|
||||||
const UInt64 heartbeat_interval_sec_,
|
const UInt64 heartbeat_interval_sec_)
|
||||||
const UInt64 temporary_live_view_timeout_sec_)
|
|
||||||
: storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)),
|
: storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)),
|
||||||
blocks_metadata_ptr(std::move(blocks_metadata_ptr_)),
|
blocks_metadata_ptr(std::move(blocks_metadata_ptr_)),
|
||||||
active_ptr(std::move(active_ptr_)),
|
active_ptr(std::move(active_ptr_)),
|
||||||
has_limit(has_limit_), limit(limit_),
|
has_limit(has_limit_), limit(limit_),
|
||||||
heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000),
|
heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000)
|
||||||
temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_)
|
|
||||||
{
|
{
|
||||||
/// grab active pointer
|
/// grab active pointer
|
||||||
active = active_ptr.lock();
|
active = active_ptr.lock();
|
||||||
@ -205,7 +202,6 @@ private:
|
|||||||
Int64 num_updates = -1;
|
Int64 num_updates = -1;
|
||||||
bool end_of_blocks = false;
|
bool end_of_blocks = false;
|
||||||
UInt64 heartbeat_interval_usec;
|
UInt64 heartbeat_interval_usec;
|
||||||
UInt64 temporary_live_view_timeout_sec;
|
|
||||||
UInt64 last_event_timestamp_usec = 0;
|
UInt64 last_event_timestamp_usec = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -45,8 +45,8 @@ public:
|
|||||||
|
|
||||||
(*storage.blocks_ptr) = new_blocks;
|
(*storage.blocks_ptr) = new_blocks;
|
||||||
(*storage.blocks_metadata_ptr) = new_blocks_metadata;
|
(*storage.blocks_metadata_ptr) = new_blocks_metadata;
|
||||||
|
|
||||||
storage.condition.notify_all();
|
storage.condition.notify_all();
|
||||||
|
storage.wakeupEventsThread(LiveViewEvent::NEW_BLOCKS);
|
||||||
}
|
}
|
||||||
|
|
||||||
new_blocks.reset();
|
new_blocks.reset();
|
||||||
|
@ -19,13 +19,13 @@ limitations under the License. */
|
|||||||
#include <DataStreams/OneBlockInputStream.h>
|
#include <DataStreams/OneBlockInputStream.h>
|
||||||
#include <DataStreams/IBlockInputStream.h>
|
#include <DataStreams/IBlockInputStream.h>
|
||||||
#include <Storages/LiveView/StorageLiveView.h>
|
#include <Storages/LiveView/StorageLiveView.h>
|
||||||
|
#include <Storages/LiveView/Events.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
/** Implements LIVE VIEW table WATCH EVENTS input stream.
|
/** Implements LIVE VIEW table WATCH EVENTS input stream.
|
||||||
* Keeps stream alive by outputing blocks with no rows
|
* Keeps stream alive by outputting blocks with no rows
|
||||||
* based on period specified by the heartbeat interval.
|
* based on period specified by the heartbeat interval.
|
||||||
*/
|
*/
|
||||||
class LiveViewEventsBlockInputStream : public IBlockInputStream
|
class LiveViewEventsBlockInputStream : public IBlockInputStream
|
||||||
@ -36,10 +36,9 @@ using NonBlockingResult = std::pair<Block, bool>;
|
|||||||
public:
|
public:
|
||||||
~LiveViewEventsBlockInputStream() override
|
~LiveViewEventsBlockInputStream() override
|
||||||
{
|
{
|
||||||
/// Start storage no users thread
|
/// Wakeup storage events thread if we are the last active user
|
||||||
/// if we are the last active user
|
|
||||||
if (!storage->is_dropped && blocks_ptr.use_count() < 3)
|
if (!storage->is_dropped && blocks_ptr.use_count() < 3)
|
||||||
storage->startNoUsersThread(temporary_live_view_timeout_sec);
|
storage->wakeupEventsThread(LiveViewEvent::LAST_USER);
|
||||||
}
|
}
|
||||||
/// length default -2 because we want LIMIT to specify number of updates so that LIMIT 1 waits for 1 update
|
/// length default -2 because we want LIMIT to specify number of updates so that LIMIT 1 waits for 1 update
|
||||||
/// and LIMIT 0 just returns data without waiting for any updates
|
/// and LIMIT 0 just returns data without waiting for any updates
|
||||||
@ -48,14 +47,12 @@ public:
|
|||||||
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr_,
|
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr_,
|
||||||
std::shared_ptr<bool> active_ptr_,
|
std::shared_ptr<bool> active_ptr_,
|
||||||
const bool has_limit_, const UInt64 limit_,
|
const bool has_limit_, const UInt64 limit_,
|
||||||
const UInt64 heartbeat_interval_sec_,
|
const UInt64 heartbeat_interval_sec_)
|
||||||
const UInt64 temporary_live_view_timeout_sec_)
|
|
||||||
: storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)),
|
: storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)),
|
||||||
blocks_metadata_ptr(std::move(blocks_metadata_ptr_)),
|
blocks_metadata_ptr(std::move(blocks_metadata_ptr_)),
|
||||||
active_ptr(std::move(active_ptr_)), has_limit(has_limit_),
|
active_ptr(std::move(active_ptr_)), has_limit(has_limit_),
|
||||||
limit(limit_),
|
limit(limit_),
|
||||||
heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000),
|
heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000)
|
||||||
temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_)
|
|
||||||
{
|
{
|
||||||
/// grab active pointer
|
/// grab active pointer
|
||||||
active = active_ptr.lock();
|
active = active_ptr.lock();
|
||||||
@ -236,7 +233,6 @@ private:
|
|||||||
Int64 num_updates = -1;
|
Int64 num_updates = -1;
|
||||||
bool end_of_blocks = false;
|
bool end_of_blocks = false;
|
||||||
UInt64 heartbeat_interval_usec;
|
UInt64 heartbeat_interval_usec;
|
||||||
UInt64 temporary_live_view_timeout_sec;
|
|
||||||
UInt64 last_event_timestamp_usec = 0;
|
UInt64 last_event_timestamp_usec = 0;
|
||||||
Poco::Timestamp timestamp;
|
Poco::Timestamp timestamp;
|
||||||
};
|
};
|
||||||
|
@ -271,6 +271,11 @@ StorageLiveView::StorageLiveView(
|
|||||||
is_temporary = true;
|
is_temporary = true;
|
||||||
temporary_live_view_timeout = *query.live_view_timeout;
|
temporary_live_view_timeout = *query.live_view_timeout;
|
||||||
}
|
}
|
||||||
|
if (query.live_view_auto_refresh)
|
||||||
|
{
|
||||||
|
is_auto_refreshed = true;
|
||||||
|
auto_refresh_live_view_interval = *query.live_view_auto_refresh;
|
||||||
|
}
|
||||||
|
|
||||||
blocks_ptr = std::make_shared<BlocksPtr>();
|
blocks_ptr = std::make_shared<BlocksPtr>();
|
||||||
blocks_metadata_ptr = std::make_shared<BlocksMetadataPtr>();
|
blocks_metadata_ptr = std::make_shared<BlocksMetadataPtr>();
|
||||||
@ -371,6 +376,7 @@ bool StorageLiveView::getNewBlocks()
|
|||||||
(*blocks_ptr) = new_blocks;
|
(*blocks_ptr) = new_blocks;
|
||||||
(*blocks_metadata_ptr) = new_blocks_metadata;
|
(*blocks_metadata_ptr) = new_blocks_metadata;
|
||||||
updated = true;
|
updated = true;
|
||||||
|
wakeupEventsThread(LiveViewEvent::NEW_BLOCKS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return updated;
|
return updated;
|
||||||
@ -387,34 +393,44 @@ void StorageLiveView::checkTableCanBeDropped() const
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void StorageLiveView::noUsersThread(std::shared_ptr<StorageLiveView> storage, const UInt64 & timeout)
|
void StorageLiveView::eventsThread(std::shared_ptr<StorageLiveView> storage)
|
||||||
{
|
{
|
||||||
bool drop_table = false;
|
|
||||||
|
|
||||||
if (storage->shutdown_called)
|
if (storage->shutdown_called)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
auto table_id = storage->getStorageID();
|
auto table_id = storage->getStorageID();
|
||||||
|
UInt64 next_event_timeout = 15;
|
||||||
|
unsigned int event = 0;
|
||||||
|
UInt64 event_timestamp_usec = 0;
|
||||||
|
|
||||||
|
while (true)
|
||||||
{
|
{
|
||||||
while (true)
|
std::unique_lock lock(storage->events_thread_wakeup_mutex);
|
||||||
|
if (storage->events_thread_condition.wait_for(lock, std::chrono::seconds(next_event_timeout), [&] { return storage->events_thread_wakeup; }))
|
||||||
{
|
{
|
||||||
std::unique_lock lock(storage->no_users_thread_wakeup_mutex);
|
storage->events_thread_wakeup = false;
|
||||||
if (!storage->no_users_thread_condition.wait_for(lock, std::chrono::seconds(timeout), [&] { return storage->no_users_thread_wakeup; }))
|
event = storage->events_thread_event;
|
||||||
{
|
event_timestamp_usec = storage->events_thread_event_timestamp_usec;
|
||||||
storage->no_users_thread_wakeup = false;
|
storage->events_thread_event = 0;
|
||||||
if (storage->shutdown_called)
|
storage->events_thread_event_timestamp_usec = 0;
|
||||||
return;
|
std::cerr << "!!!: eventsThread got EVENT " << event << "\n";
|
||||||
if (storage->hasUsers())
|
|
||||||
return;
|
if (storage->shutdown_called || event == LiveViewEvent::SHUTDOWN)
|
||||||
if (!DatabaseCatalog::instance().getDependencies(table_id).empty())
|
return;
|
||||||
continue;
|
|
||||||
drop_table = true;
|
//if (storage->hasUsers())
|
||||||
}
|
// return;
|
||||||
break;
|
//if (!DatabaseCatalog::instance().getDependencies(table_id).empty())
|
||||||
|
// continue;
|
||||||
|
//if (storage->isTemporary())
|
||||||
|
// drop_table = true;
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
// timeout sleeping
|
||||||
|
std::cerr << "!!!: eventsThread timeout\n";
|
||||||
}
|
}
|
||||||
|
|
||||||
if (drop_table)
|
/*if (drop_table)
|
||||||
{
|
{
|
||||||
if (DatabaseCatalog::instance().tryGetTable(table_id))
|
if (DatabaseCatalog::instance().tryGetTable(table_id))
|
||||||
{
|
{
|
||||||
@ -434,46 +450,49 @@ void StorageLiveView::noUsersThread(std::shared_ptr<StorageLiveView> storage, co
|
|||||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}*/
|
||||||
|
}
|
||||||
|
|
||||||
|
void StorageLiveView::wakeupEventsThread(const LiveViewEvent & event)
|
||||||
|
{
|
||||||
|
std::lock_guard events_thread_lock(events_thread_mutex);
|
||||||
|
if (events_thread.joinable())
|
||||||
|
{
|
||||||
|
std::lock_guard lock(events_thread_wakeup_mutex);
|
||||||
|
events_thread_wakeup = true;
|
||||||
|
events_thread_event |= event;
|
||||||
|
events_thread_event_timestamp_usec = static_cast<UInt64>(Poco::Timestamp().epochMicroseconds());
|
||||||
|
|
||||||
|
if (event == LiveViewEvent::NEW_USER)
|
||||||
|
events_thread_event &= ~static_cast<unsigned int>(LiveViewEvent::LAST_USER);
|
||||||
|
else if (event == LiveViewEvent::LAST_USER)
|
||||||
|
events_thread_event &= ~static_cast<unsigned int>(LiveViewEvent::NEW_USER);
|
||||||
|
|
||||||
|
events_thread_condition.notify_one();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void StorageLiveView::startNoUsersThread(const UInt64 & timeout)
|
void StorageLiveView::startEventsThread()
|
||||||
{
|
{
|
||||||
bool expected = false;
|
if (is_temporary || is_auto_refreshed)
|
||||||
if (!start_no_users_thread_called.compare_exchange_strong(expected, true))
|
|
||||||
return;
|
|
||||||
|
|
||||||
if (is_temporary)
|
|
||||||
{
|
{
|
||||||
std::lock_guard no_users_thread_lock(no_users_thread_mutex);
|
std::lock_guard events_thread_lock(events_thread_mutex);
|
||||||
|
|
||||||
if (shutdown_called)
|
if (events_thread.joinable())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
if (no_users_thread.joinable())
|
|
||||||
{
|
{
|
||||||
{
|
std::lock_guard lock(events_thread_wakeup_mutex);
|
||||||
std::lock_guard lock(no_users_thread_wakeup_mutex);
|
events_thread_wakeup = false;
|
||||||
no_users_thread_wakeup = true;
|
|
||||||
no_users_thread_condition.notify_one();
|
|
||||||
}
|
|
||||||
no_users_thread.join();
|
|
||||||
}
|
}
|
||||||
{
|
events_thread = std::thread(&StorageLiveView::eventsThread,
|
||||||
std::lock_guard lock(no_users_thread_wakeup_mutex);
|
std::static_pointer_cast<StorageLiveView>(shared_from_this()));
|
||||||
no_users_thread_wakeup = false;
|
|
||||||
}
|
|
||||||
if (!is_dropped)
|
|
||||||
no_users_thread = std::thread(&StorageLiveView::noUsersThread,
|
|
||||||
std::static_pointer_cast<StorageLiveView>(shared_from_this()), timeout);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
start_no_users_thread_called = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void StorageLiveView::startup()
|
void StorageLiveView::startup()
|
||||||
{
|
{
|
||||||
startNoUsersThread(temporary_live_view_timeout);
|
startEventsThread();
|
||||||
}
|
}
|
||||||
|
|
||||||
void StorageLiveView::shutdown()
|
void StorageLiveView::shutdown()
|
||||||
@ -484,13 +503,14 @@ void StorageLiveView::shutdown()
|
|||||||
return;
|
return;
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard no_users_thread_lock(no_users_thread_mutex);
|
std::lock_guard events_thread_lock(events_thread_mutex);
|
||||||
if (no_users_thread.joinable())
|
if (events_thread.joinable())
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
std::lock_guard lock(no_users_thread_wakeup_mutex);
|
std::lock_guard lock(events_thread_wakeup_mutex);
|
||||||
no_users_thread_wakeup = true;
|
events_thread_wakeup = true;
|
||||||
no_users_thread_condition.notify_one();
|
events_thread_event = LiveViewEvent::SHUTDOWN;
|
||||||
|
events_thread_condition.notify_one();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -501,9 +521,9 @@ StorageLiveView::~StorageLiveView()
|
|||||||
shutdown();
|
shutdown();
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard lock(no_users_thread_mutex);
|
std::lock_guard lock(events_thread_mutex);
|
||||||
if (no_users_thread.joinable())
|
if (events_thread.joinable())
|
||||||
no_users_thread.detach();
|
events_thread.detach();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -572,18 +592,9 @@ BlockInputStreams StorageLiveView::watch(
|
|||||||
auto reader = std::make_shared<LiveViewEventsBlockInputStream>(
|
auto reader = std::make_shared<LiveViewEventsBlockInputStream>(
|
||||||
std::static_pointer_cast<StorageLiveView>(shared_from_this()),
|
std::static_pointer_cast<StorageLiveView>(shared_from_this()),
|
||||||
blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit,
|
blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit,
|
||||||
context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(),
|
context.getSettingsRef().live_view_heartbeat_interval.totalSeconds());
|
||||||
temporary_live_view_timeout);
|
|
||||||
|
|
||||||
{
|
wakeupEventsThread(LiveViewEvent::NEW_USER);
|
||||||
std::lock_guard no_users_thread_lock(no_users_thread_mutex);
|
|
||||||
if (no_users_thread.joinable())
|
|
||||||
{
|
|
||||||
std::lock_guard lock(no_users_thread_wakeup_mutex);
|
|
||||||
no_users_thread_wakeup = true;
|
|
||||||
no_users_thread_condition.notify_one();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
@ -603,18 +614,9 @@ BlockInputStreams StorageLiveView::watch(
|
|||||||
auto reader = std::make_shared<LiveViewBlockInputStream>(
|
auto reader = std::make_shared<LiveViewBlockInputStream>(
|
||||||
std::static_pointer_cast<StorageLiveView>(shared_from_this()),
|
std::static_pointer_cast<StorageLiveView>(shared_from_this()),
|
||||||
blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit,
|
blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit,
|
||||||
context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(),
|
context.getSettingsRef().live_view_heartbeat_interval.totalSeconds());
|
||||||
temporary_live_view_timeout);
|
|
||||||
|
|
||||||
{
|
wakeupEventsThread(LiveViewEvent::NEW_USER);
|
||||||
std::lock_guard no_users_thread_lock(no_users_thread_mutex);
|
|
||||||
if (no_users_thread.joinable())
|
|
||||||
{
|
|
||||||
std::lock_guard lock(no_users_thread_wakeup_mutex);
|
|
||||||
no_users_thread_wakeup = true;
|
|
||||||
no_users_thread_condition.notify_one();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
|
@ -13,6 +13,7 @@ limitations under the License. */
|
|||||||
|
|
||||||
#include <ext/shared_ptr_helper.h>
|
#include <ext/shared_ptr_helper.h>
|
||||||
#include <Storages/IStorage.h>
|
#include <Storages/IStorage.h>
|
||||||
|
#include <Storages/LiveView/Events.h>
|
||||||
|
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
@ -72,6 +73,7 @@ public:
|
|||||||
bool supportsFinal() const override { return true; }
|
bool supportsFinal() const override { return true; }
|
||||||
|
|
||||||
bool isTemporary() { return is_temporary; }
|
bool isTemporary() { return is_temporary; }
|
||||||
|
bool isAutoRefreshed() { return is_auto_refreshed; }
|
||||||
|
|
||||||
/// Check if we have any readers
|
/// Check if we have any readers
|
||||||
/// must be called with mutex locked
|
/// must be called with mutex locked
|
||||||
@ -86,11 +88,14 @@ public:
|
|||||||
{
|
{
|
||||||
return active_ptr.use_count() > 1;
|
return active_ptr.use_count() > 1;
|
||||||
}
|
}
|
||||||
/// No users thread mutex, predicate and wake up condition
|
|
||||||
void startNoUsersThread(const UInt64 & timeout);
|
/// events thread mutex, predicate and wake up condition
|
||||||
std::mutex no_users_thread_wakeup_mutex;
|
void wakeupEventsThread(const LiveViewEvent & event);
|
||||||
bool no_users_thread_wakeup = false;
|
std::mutex events_thread_wakeup_mutex;
|
||||||
std::condition_variable no_users_thread_condition;
|
bool events_thread_wakeup = false;
|
||||||
|
unsigned int events_thread_event = 0;
|
||||||
|
UInt64 events_thread_event_timestamp_usec = 0;
|
||||||
|
std::condition_variable events_thread_condition;
|
||||||
/// Get blocks hash
|
/// Get blocks hash
|
||||||
/// must be called with mutex locked
|
/// must be called with mutex locked
|
||||||
String getBlocksHashKey()
|
String getBlocksHashKey()
|
||||||
@ -174,6 +179,7 @@ private:
|
|||||||
std::unique_ptr<Context> live_view_context;
|
std::unique_ptr<Context> live_view_context;
|
||||||
|
|
||||||
bool is_temporary = false;
|
bool is_temporary = false;
|
||||||
|
bool is_auto_refreshed = false;
|
||||||
/// Mutex to protect access to sample block and inner_blocks_query
|
/// Mutex to protect access to sample block and inner_blocks_query
|
||||||
mutable std::mutex sample_block_lock;
|
mutable std::mutex sample_block_lock;
|
||||||
mutable Block sample_block;
|
mutable Block sample_block;
|
||||||
@ -192,14 +198,15 @@ private:
|
|||||||
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr;
|
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr;
|
||||||
MergeableBlocksPtr mergeable_blocks;
|
MergeableBlocksPtr mergeable_blocks;
|
||||||
|
|
||||||
/// Background thread for temporary tables
|
/// Background events thread for temporary and auto refresh tables
|
||||||
/// which drops this table if there are no users
|
void startEventsThread();
|
||||||
static void noUsersThread(std::shared_ptr<StorageLiveView> storage, const UInt64 & timeout);
|
static void eventsThread(std::shared_ptr<StorageLiveView> storage);
|
||||||
std::mutex no_users_thread_mutex;
|
std::mutex events_thread_mutex;
|
||||||
std::thread no_users_thread;
|
std::thread events_thread;
|
||||||
std::atomic<bool> shutdown_called = false;
|
std::atomic<bool> shutdown_called = false;
|
||||||
std::atomic<bool> start_no_users_thread_called = false;
|
std::atomic<bool> start_events_thread_called = false;
|
||||||
UInt64 temporary_live_view_timeout;
|
UInt64 temporary_live_view_timeout;
|
||||||
|
UInt64 auto_refresh_live_view_interval;
|
||||||
|
|
||||||
StorageLiveView(
|
StorageLiveView(
|
||||||
const StorageID & table_id_,
|
const StorageID & table_id_,
|
||||||
|
Loading…
Reference in New Issue
Block a user