Merge pull request #14822 from vzakaznikov/live_view_periodic_refresh

Adding support for periodically refreshed LIVE VIEW tables
This commit is contained in:
alexey-milovidov 2021-02-01 06:11:30 +03:00 committed by GitHub
commit 08f713f177
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 352 additions and 43 deletions

View File

@ -36,6 +36,7 @@
#define DEFAULT_MERGE_BLOCK_SIZE 8192
#define DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC 5
#define DEFAULT_PERIODIC_LIVE_VIEW_REFRESH_SEC 60
#define SHOW_CHARS_ON_SYNTAX_ERROR ptrdiff_t(160)
#define DEFAULT_LIVE_VIEW_HEARTBEAT_INTERVAL_SEC 15
#define DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE 1024

View File

@ -391,6 +391,7 @@ class IColumn;
M(Bool, validate_polygons, true, "Throw exception if polygon is invalid in function pointInPolygon (e.g. self-tangent, self-intersecting). If the setting is false, the function will accept invalid polygons but may silently return wrong result.", 0) \
M(UInt64, max_parser_depth, DBMS_DEFAULT_MAX_PARSER_DEPTH, "Maximum parser depth (recursion depth of recursive descend parser).", 0) \
M(Seconds, temporary_live_view_timeout, DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC, "Timeout after which temporary live view is deleted.", 0) \
M(Seconds, periodic_live_view_refresh, DEFAULT_PERIODIC_LIVE_VIEW_REFRESH_SEC, "Interval after which periodically refreshed live view is forced to refresh.", 0) \
M(Bool, transform_null_in, false, "If enabled, NULL values will be matched with 'IN' operator as if they are considered equal.", 0) \
M(Bool, allow_nondeterministic_mutations, false, "Allow non-deterministic functions in ALTER UPDATE/ALTER DELETE statements", 0) \
M(Seconds, lock_acquire_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "How long locking request should wait before failing", 0) \

View File

@ -269,6 +269,18 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
if (live_view_timeout)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " WITH TIMEOUT " << (settings.hilite ? hilite_none : "")
<< *live_view_timeout;
if (live_view_periodic_refresh)
{
if (live_view_timeout)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " AND" << (settings.hilite ? hilite_none : "");
else
settings.ostr << (settings.hilite ? hilite_keyword : "") << " WITH" << (settings.hilite ? hilite_none : "");
settings.ostr << (settings.hilite ? hilite_keyword : "") << " PERIODIC REFRESH " << (settings.hilite ? hilite_none : "")
<< *live_view_periodic_refresh;
}
formatOnCluster(settings);
}
else

View File

@ -77,6 +77,8 @@ public:
ASTDictionary * dictionary = nullptr; /// dictionary definition (layout, primary key, etc.)
std::optional<UInt64> live_view_timeout; /// For CREATE LIVE VIEW ... WITH TIMEOUT ...
std::optional<UInt64> live_view_periodic_refresh; /// For CREATE LIVE VIEW ... WITH [PERIODIC] REFRESH ...
bool attach_short_syntax{false};
std::optional<String> attach_from_path = std::nullopt;

View File

@ -569,10 +569,14 @@ bool ParserCreateLiveViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & e
ASTPtr as_table;
ASTPtr select;
ASTPtr live_view_timeout;
ASTPtr live_view_periodic_refresh;
String cluster_str;
bool attach = false;
bool if_not_exists = false;
bool with_and = false;
bool with_timeout = false;
bool with_periodic_refresh = false;
if (!s_create.ignore(pos, expected))
{
@ -594,10 +598,35 @@ bool ParserCreateLiveViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & e
if (!table_name_p.parse(pos, table, expected))
return false;
if (ParserKeyword{"WITH TIMEOUT"}.ignore(pos, expected))
if (ParserKeyword{"WITH"}.ignore(pos, expected))
{
if (!ParserNumber{}.parse(pos, live_view_timeout, expected))
live_view_timeout = std::make_shared<ASTLiteral>(static_cast<UInt64>(DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC));
if (ParserKeyword{"TIMEOUT"}.ignore(pos, expected))
{
if (!ParserNumber{}.parse(pos, live_view_timeout, expected))
{
live_view_timeout = std::make_shared<ASTLiteral>(static_cast<UInt64>(DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC));
}
/// Optional - AND
if (ParserKeyword{"AND"}.ignore(pos, expected))
with_and = true;
with_timeout = true;
}
if (ParserKeyword{"REFRESH"}.ignore(pos, expected) || ParserKeyword{"PERIODIC REFRESH"}.ignore(pos, expected))
{
if (!ParserNumber{}.parse(pos, live_view_periodic_refresh, expected))
live_view_periodic_refresh = std::make_shared<ASTLiteral>(static_cast<UInt64>(DEFAULT_PERIODIC_LIVE_VIEW_REFRESH_SEC));
with_periodic_refresh = true;
}
else if (with_and)
return false;
if (!with_timeout && !with_periodic_refresh)
return false;
}
if (ParserKeyword{"ON"}.ignore(pos, expected))
@ -656,6 +685,9 @@ bool ParserCreateLiveViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & e
if (live_view_timeout)
query->live_view_timeout.emplace(live_view_timeout->as<ASTLiteral &>().value.safeGet<UInt64>());
if (live_view_periodic_refresh)
query->live_view_periodic_refresh.emplace(live_view_periodic_refresh->as<ASTLiteral &>().value.safeGet<UInt64>());
return true;
}

View File

@ -34,6 +34,7 @@ public:
{
new_blocks_metadata->hash = key_str;
new_blocks_metadata->version = storage.getBlocksVersion() + 1;
new_blocks_metadata->time = std::chrono::system_clock::now();
for (auto & block : *new_blocks)
{
@ -48,6 +49,15 @@ public:
storage.condition.notify_all();
}
else
{
// only update blocks time
new_blocks_metadata->hash = storage.getBlocksHashKey();
new_blocks_metadata->version = storage.getBlocksVersion();
new_blocks_metadata->time = std::chrono::system_clock::now();
(*storage.blocks_metadata_ptr) = new_blocks_metadata;
}
new_blocks.reset();
new_blocks_metadata.reset();

View File

@ -20,6 +20,7 @@ limitations under the License. */
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <common/logger_useful.h>
#include <Common/typeid_cast.h>
#include <Common/SipHash.h>
@ -254,6 +255,8 @@ StorageLiveView::StorageLiveView(
live_view_context = std::make_unique<Context>(global_context);
live_view_context->makeQueryContext();
log = &Poco::Logger::get("StorageLiveView (" + table_id_.database_name + "." + table_id_.table_name + ")");
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
setInMemoryMetadata(storage_metadata);
@ -275,12 +278,21 @@ StorageLiveView::StorageLiveView(
if (query.live_view_timeout)
{
is_temporary = true;
temporary_live_view_timeout = std::chrono::seconds{*query.live_view_timeout};
temporary_live_view_timeout = Seconds {*query.live_view_timeout};
}
if (query.live_view_periodic_refresh)
{
is_periodically_refreshed = true;
periodic_live_view_refresh = Seconds {*query.live_view_periodic_refresh};
}
blocks_ptr = std::make_shared<BlocksPtr>();
blocks_metadata_ptr = std::make_shared<BlocksMetadataPtr>();
active_ptr = std::make_shared<bool>(true);
periodic_refresh_task = global_context.getSchedulePool().createTask("LieViewPeriodicRefreshTask", [this]{ periodicRefreshTaskFunc(); });
periodic_refresh_task->deactivate();
}
Block StorageLiveView::getHeader() const
@ -369,10 +381,21 @@ bool StorageLiveView::getNewBlocks()
}
new_blocks_metadata->hash = key.toHexString();
new_blocks_metadata->version = getBlocksVersion() + 1;
new_blocks_metadata->time = std::chrono::system_clock::now();
(*blocks_ptr) = new_blocks;
(*blocks_metadata_ptr) = new_blocks_metadata;
updated = true;
}
else
{
new_blocks_metadata->hash = getBlocksHashKey();
new_blocks_metadata->version = getBlocksVersion();
new_blocks_metadata->time = std::chrono::system_clock::now();
(*blocks_metadata_ptr) = new_blocks_metadata;
}
}
return updated;
}
@ -392,11 +415,18 @@ void StorageLiveView::startup()
{
if (is_temporary)
TemporaryLiveViewCleaner::instance().addView(std::static_pointer_cast<StorageLiveView>(shared_from_this()));
if (is_periodically_refreshed)
periodic_refresh_task->activate();
}
void StorageLiveView::shutdown()
{
shutdown_called = true;
if (is_periodically_refreshed)
periodic_refresh_task->deactivate();
DatabaseCatalog::instance().removeDependency(select_table_id, getStorageID());
}
@ -415,15 +445,55 @@ void StorageLiveView::drop()
condition.notify_all();
}
void StorageLiveView::refresh()
void StorageLiveView::scheduleNextPeriodicRefresh()
{
Seconds current_time = std::chrono::duration_cast<Seconds> (std::chrono::system_clock::now().time_since_epoch());
Seconds blocks_time = std::chrono::duration_cast<Seconds> (getBlocksTime().time_since_epoch());
if ((current_time - periodic_live_view_refresh) >= blocks_time)
{
refresh(false);
blocks_time = std::chrono::duration_cast<Seconds> (getBlocksTime().time_since_epoch());
}
current_time = std::chrono::duration_cast<Seconds> (std::chrono::system_clock::now().time_since_epoch());
auto next_refresh_time = blocks_time + periodic_live_view_refresh;
if (current_time >= next_refresh_time)
periodic_refresh_task->scheduleAfter(0);
else
{
auto schedule_time = std::chrono::duration_cast<MilliSeconds> (next_refresh_time - current_time);
periodic_refresh_task->scheduleAfter(static_cast<size_t>(schedule_time.count()));
}
}
void StorageLiveView::periodicRefreshTaskFunc()
{
LOG_TRACE(log, "periodic refresh task");
std::lock_guard lock(mutex);
if (hasActiveUsers())
scheduleNextPeriodicRefresh();
}
void StorageLiveView::refresh(bool grab_lock)
{
// Lock is already acquired exclusively from InterperterAlterQuery.cpp InterpreterAlterQuery::execute() method.
// So, reacquiring lock is not needed and will result in an exception.
if (grab_lock)
{
std::lock_guard lock(mutex);
if (getNewBlocks())
condition.notify_all();
}
else
{
if (getNewBlocks())
condition.notify_all();
}
}
Pipe StorageLiveView::read(
@ -435,15 +505,21 @@ Pipe StorageLiveView::read(
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
std::lock_guard lock(mutex);
if (!(*blocks_ptr))
refresh(false);
else if (is_periodically_refreshed)
{
std::lock_guard lock(mutex);
if (!(*blocks_ptr))
{
if (getNewBlocks())
condition.notify_all();
}
return Pipe(std::make_shared<BlocksSource>(blocks_ptr, getHeader()));
Seconds current_time = std::chrono::duration_cast<Seconds> (std::chrono::system_clock::now().time_since_epoch());
Seconds blocks_time = std::chrono::duration_cast<Seconds> (getBlocksTime().time_since_epoch());
if ((current_time - periodic_live_view_refresh) >= blocks_time)
refresh(false);
}
return Pipe(std::make_shared<BlocksSource>(blocks_ptr, getHeader()));
}
BlockInputStreams StorageLiveView::watch(
@ -458,6 +534,7 @@ BlockInputStreams StorageLiveView::watch(
bool has_limit = false;
UInt64 limit = 0;
BlockInputStreamPtr reader;
if (query.limit_length)
{
@ -466,45 +543,28 @@ BlockInputStreams StorageLiveView::watch(
}
if (query.is_watch_events)
{
auto reader = std::make_shared<LiveViewEventsBlockInputStream>(
reader = std::make_shared<LiveViewEventsBlockInputStream>(
std::static_pointer_cast<StorageLiveView>(shared_from_this()),
blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit,
context.getSettingsRef().live_view_heartbeat_interval.totalSeconds());
{
std::lock_guard lock(mutex);
if (!(*blocks_ptr))
{
if (getNewBlocks())
condition.notify_all();
}
}
processed_stage = QueryProcessingStage::Complete;
return { reader };
}
else
{
auto reader = std::make_shared<LiveViewBlockInputStream>(
reader = std::make_shared<LiveViewBlockInputStream>(
std::static_pointer_cast<StorageLiveView>(shared_from_this()),
blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit,
context.getSettingsRef().live_view_heartbeat_interval.totalSeconds());
{
std::lock_guard lock(mutex);
if (!(*blocks_ptr))
{
if (getNewBlocks())
condition.notify_all();
}
}
{
std::lock_guard lock(mutex);
processed_stage = QueryProcessingStage::Complete;
if (!(*blocks_ptr))
refresh(false);
return { reader };
if (is_periodically_refreshed)
scheduleNextPeriodicRefresh();
}
processed_stage = QueryProcessingStage::Complete;
return { reader };
}
NamesAndTypesList StorageLiveView::getVirtuals() const

View File

@ -13,6 +13,7 @@ limitations under the License. */
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Core/BackgroundSchedulePool.h>
#include <mutex>
#include <condition_variable>
@ -21,10 +22,16 @@ limitations under the License. */
namespace DB
{
using Time = std::chrono::time_point<std::chrono::system_clock>;
using Seconds = std::chrono::seconds;
using MilliSeconds = std::chrono::milliseconds;
struct BlocksMetadata
{
String hash;
UInt64 version;
Time time;
};
struct MergeableBlocks
@ -75,8 +82,10 @@ public:
NamesAndTypesList getVirtuals() const override;
bool isTemporary() const { return is_temporary; }
std::chrono::seconds getTimeout() const { return temporary_live_view_timeout; }
bool isPeriodicallyRefreshed() const { return is_periodically_refreshed; }
Seconds getTimeout() const { return temporary_live_view_timeout; }
Seconds getPeriodicRefresh() const { return periodic_live_view_refresh; }
/// Check if we have any readers
/// must be called with mutex locked
@ -109,6 +118,15 @@ public:
return 0;
}
/// Get blocks time
/// must be called with mutex locked
Time getBlocksTime()
{
if (*blocks_metadata_ptr)
return (*blocks_metadata_ptr)->time;
return {};
}
/// Reset blocks
/// must be called with mutex locked
void reset()
@ -124,7 +142,7 @@ public:
void startup() override;
void shutdown() override;
void refresh();
void refresh(const bool grab_lock = true);
Pipe read(
const Names & column_names,
@ -176,8 +194,13 @@ private:
Context & global_context;
std::unique_ptr<Context> live_view_context;
Poco::Logger * log;
bool is_temporary = false;
std::chrono::seconds temporary_live_view_timeout;
bool is_periodically_refreshed = false;
Seconds temporary_live_view_timeout;
Seconds periodic_live_view_refresh;
/// Mutex to protect access to sample block and inner_blocks_query
mutable std::mutex sample_block_lock;
@ -199,6 +222,13 @@ private:
std::atomic<bool> shutdown_called = false;
/// Periodic refresh task used when [PERIODIC] REFRESH is specified in create statement
BackgroundSchedulePool::TaskHolder periodic_refresh_task;
void periodicRefreshTaskFunc();
/// Must be called with mutex locked
void scheduleNextPeriodicRefresh();
StorageLiveView(
const StorageID & table_id_,
Context & local_context,

View File

@ -0,0 +1,41 @@
#!/usr/bin/env python3
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client2.send('SET allow_experimental_live_view = 1')
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send("CREATE LIVE VIEW test.lv WITH REFRESH 1"
" AS SELECT value FROM system.events WHERE event = 'OSCPUVirtualTimeMicroseconds'")
client1.expect(prompt)
client1.send('WATCH test.lv FORMAT JSONEachRow')
client1.expect(r'"_version":' + end_of_block)
client1.expect(r'"_version":' + end_of_block)
client1.expect(r'"_version":' + end_of_block)
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)

View File

@ -0,0 +1,52 @@
#!/usr/bin/env python3
import os
import sys
import time
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client2.send('SET allow_experimental_live_view = 1')
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send("CREATE LIVE VIEW test.lv WITH TIMEOUT 60 AND REFRESH 1"
" AS SELECT value FROM system.events WHERE event = 'OSCPUVirtualTimeMicroseconds'")
client1.expect(prompt)
client1.send('WATCH test.lv FORMAT JSONEachRow')
client1.expect(r'"_version":' + end_of_block)
client1.expect(r'"_version":' + end_of_block)
client1.expect(r'"_version":' + end_of_block)
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
# poll until live view table is dropped
start_time = time.time()
while True:
client1.send('SELECT * FROM test.lv FORMAT JSONEachRow')
client1.expect(prompt)
if 'Table test.lv doesn\'t exist' in client1.before:
break
if time.time() - start_time > 90:
break
# check table is dropped
client1.send('DROP TABLE test.lv')
client1.expect('Table test.lv doesn\'t exist')
client1.expect(prompt)

View File

@ -0,0 +1,68 @@
#!/usr/bin/env python3
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client2.send('SET allow_experimental_live_view = 1')
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('DROP DICTIONARY IF EXITS test.dict')
client1.expect(prompt)
client1.send("CREATE TABLE test.mt (a Int32, b Int32) Engine=MergeTree order by tuple()")
client1.expect(prompt)
client1.send("CREATE DICTIONARY test.dict(a Int32, b Int32) PRIMARY KEY a LAYOUT(FLAT()) " + \
"SOURCE(CLICKHOUSE(db 'test' table 'mt')) LIFETIME(1)")
client1.expect(prompt)
client1.send("CREATE LIVE VIEW test.lv WITH REFRESH 1 AS SELECT * FROM test.dict")
client1.expect(prompt)
client2.send("INSERT INTO test.mt VALUES (1,2)")
client2.expect(prompt)
client1.send('WATCH test.lv FORMAT JSONEachRow')
client1.expect(r'"_version":"1"')
client2.send("INSERT INTO test.mt VALUES (2,2)")
client2.expect(prompt)
client1.expect(r'"_version":"2"')
client2.send("INSERT INTO test.mt VALUES (3,2)")
client2.expect(prompt)
client1.expect(r'"_version":"3"')
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send('DROP DICTIONARY IF EXISTS test.dict')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)