Merge pull request #32607 from Vxider/watch-events-supports

Events clause support for Window View watch query
This commit is contained in:
Kseniia Sumarokova 2021-12-14 23:08:24 +03:00 committed by GitHub
commit bf415378be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 132 additions and 34 deletions

View File

@ -298,13 +298,16 @@ Note that elements emitted by a late firing should be treated as updated results
### Monitoring New Windows {#window-view-monitoring}
Window view supports the `WATCH` query to constantly append the processing results to the console or use `TO` syntax to output the results to a table.
Window view supports the [WATCH](../../../sql-reference/statements/watch.md) query to monitoring changes, or use `TO` syntax to output the results to a table.
``` sql
WATCH [db.]name [LIMIT n]
WATCH [db.]window_view
[EVENTS]
[LIMIT n]
[FORMAT format]
```
`WATCH` query acts similar as in `LIVE VIEW`. A `LIMIT` can be specified to set the number of updates to receive before terminating the query.
`WATCH` query acts similar as in `LIVE VIEW`. A `LIMIT` can be specified to set the number of updates to receive before terminating the query. The `EVENTS` clause can be used to obtain a short form of the `WATCH` query where instead of the query result you will just get the latest query watermark.
### Settings {#window-view-settings}

View File

@ -527,7 +527,7 @@ inline void StorageWindowView::fire(UInt32 watermark)
for (auto & watch_stream : watch_streams)
{
if (auto watch_stream_ptr = watch_stream.lock())
watch_stream_ptr->addBlock(block);
watch_stream_ptr->addBlock(block, watermark);
}
}
if (!target_table_id.empty())
@ -910,7 +910,11 @@ Pipe StorageWindowView::watch(
}
auto reader = std::make_shared<WindowViewSource>(
*this, has_limit, limit,
std::static_pointer_cast<StorageWindowView>(shared_from_this()),
query.is_watch_events,
window_view_timezone,
has_limit,
limit,
local_context->getSettingsRef().window_view_heartbeat_interval.totalSeconds());
std::lock_guard lock(fire_signal_mutex);
@ -1077,7 +1081,8 @@ ASTPtr StorageWindowView::innerQueryParser(const ASTSelectQuery & query)
ErrorCodes::ILLEGAL_COLUMN,
"Illegal column #{} of time zone argument of function, must be constant string",
time_zone_arg_num);
time_zone = &DateLUT::instance(time_zone_ast->value.safeGet<String>());
window_view_timezone = time_zone_ast->value.safeGet<String>();
time_zone = &DateLUT::instance(window_view_timezone);
}
else
time_zone = &DateLUT::instance();
@ -1354,9 +1359,12 @@ Block & StorageWindowView::getHeader() const
sample_block = InterpreterSelectQuery(
select_query->clone(), window_view_context, getParentStorage(), nullptr,
SelectQueryOptions(QueryProcessingStage::Complete)).getSampleBlock();
/// convert all columns to full columns
/// in case some of them are constant
for (size_t i = 0; i < sample_block.columns(); ++i)
{
sample_block.safeGetByPosition(i).column = sample_block.safeGetByPosition(i).column->convertToFullColumnIfConst();
}
}
return sample_block;
}

View File

@ -210,6 +210,7 @@ private:
BackgroundSchedulePool::TaskHolder clean_cache_task;
BackgroundSchedulePool::TaskHolder fire_task;
String window_view_timezone;
String function_now_timezone;
ASTPtr innerQueryParser(const ASTSelectQuery & query);

View File

@ -11,83 +11,109 @@ class WindowViewSource : public SourceWithProgress
{
public:
WindowViewSource(
StorageWindowView & storage_,
std::shared_ptr<StorageWindowView> storage_,
const bool is_events_,
String window_view_timezone_,
const bool has_limit_,
const UInt64 limit_,
const UInt64 heartbeat_interval_sec_)
: SourceWithProgress(storage_.getHeader())
: SourceWithProgress(
is_events_ ? Block(
{ColumnWithTypeAndName(ColumnUInt32::create(), std::make_shared<DataTypeDateTime>(window_view_timezone_), "watermark")})
: storage_->getHeader())
, storage(storage_)
, is_events(is_events_)
, window_view_timezone(window_view_timezone_)
, has_limit(has_limit_)
, limit(limit_)
, heartbeat_interval_sec(heartbeat_interval_sec_) {}
, heartbeat_interval_sec(heartbeat_interval_sec_)
{
if (is_events)
header.insert(
ColumnWithTypeAndName(ColumnUInt32::create(), std::make_shared<DataTypeDateTime>(window_view_timezone_), "watermark"));
else
header = storage->getHeader();
}
String getName() const override { return "WindowViewSource"; }
void addBlock(Block block_)
void addBlock(Block block_, UInt32 watermark)
{
std::lock_guard lock(blocks_mutex);
blocks.push_back(std::move(block_));
blocks_with_watermark.push_back({std::move(block_), watermark});
}
protected:
Block getHeader() const { return storage.getHeader(); }
Block getHeader() const { return header; }
Chunk generate() override
{
auto block = generateImpl();
return Chunk(block.getColumns(), block.rows());
Block block;
UInt32 watermark;
std::tie(block, watermark) = generateImpl();
if (is_events)
{
return Chunk(
{DataTypeDateTime(window_view_timezone).createColumnConst(block.rows(), watermark)->convertToFullColumnIfConst()},
block.rows());
}
else
{
return Chunk(block.getColumns(), block.rows());
}
}
Block generateImpl()
std::pair<Block, UInt32> generateImpl()
{
Block res;
if (has_limit && num_updates == static_cast<Int64>(limit))
return Block();
return {Block(), 0};
if (isCancelled() || storage.shutdown_called)
return Block();
if (isCancelled() || storage->shutdown_called)
return {Block(), 0};
std::unique_lock lock(blocks_mutex);
if (blocks.empty())
if (blocks_with_watermark.empty())
{
if (!end_of_blocks)
{
end_of_blocks = true;
num_updates += 1;
return getHeader();
return {getHeader(), 0};
}
storage.fire_condition.wait_for(lock, std::chrono::seconds(heartbeat_interval_sec));
storage->fire_condition.wait_for(lock, std::chrono::seconds(heartbeat_interval_sec));
if (isCancelled() || storage.shutdown_called)
if (isCancelled() || storage->shutdown_called)
{
return Block();
return {Block(), 0};
}
if (blocks.empty())
return getHeader();
if (blocks_with_watermark.empty())
return {getHeader(), 0};
else
{
end_of_blocks = false;
res = blocks.front();
blocks.pop_front();
auto res = blocks_with_watermark.front();
blocks_with_watermark.pop_front();
return res;
}
}
else
{
res = blocks.front();
blocks.pop_front();
auto res = blocks_with_watermark.front();
blocks_with_watermark.pop_front();
return res;
}
}
private:
StorageWindowView & storage;
std::shared_ptr<StorageWindowView> storage;
BlocksList blocks;
std::list<std::pair<Block, UInt32>> blocks_with_watermark;
Block header;
const bool is_events;
String window_view_timezone;
const bool has_limit;
const UInt64 limit;
Int64 num_updates = -1;

View File

@ -0,0 +1,60 @@
#!/usr/bin/env python3
# Tags: no-parallel
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
# log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('SET allow_experimental_window_view = 1')
client1.expect(prompt)
client1.send('SET window_view_heartbeat_interval = 1')
client1.expect(prompt)
client2.send('SET allow_experimental_window_view = 1')
client2.expect(prompt)
client1.send('CREATE DATABASE IF NOT EXISTS 01070_window_view_watch_events')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS 01070_window_view_watch_events.mt NO DELAY')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS 01070_window_view_watch_events.wv NO DELAY')
client1.expect(prompt)
client1.send("CREATE TABLE 01070_window_view_watch_events.mt(a Int32, timestamp DateTime('US/Samoa')) ENGINE=MergeTree ORDER BY tuple()")
client1.expect(prompt)
client1.send("CREATE WINDOW VIEW 01070_window_view_watch_events.wv WATERMARK=ASCENDING AS SELECT count(a) AS count, tumbleEnd(wid) AS w_end FROM 01070_window_view_watch_events.mt GROUP BY tumble(timestamp, INTERVAL '5' SECOND, 'US/Samoa') AS wid")
client1.expect(prompt)
client1.send('WATCH 01070_window_view_watch_events.wv EVENTS')
client1.expect('Query id' + end_of_block)
client2.send("INSERT INTO 01070_window_view_watch_events.mt VALUES (1, '1990/01/01 12:00:00');")
client2.expect("Ok.")
client2.send("INSERT INTO 01070_window_view_watch_events.mt VALUES (1, '1990/01/01 12:00:06');")
client2.expect("Ok.")
client1.expect('1990-01-01 12:00:05' + end_of_block)
client1.expect('Progress: 1.00 rows.*\)')
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE 01070_window_view_watch_events.wv NO DELAY;')
client1.expect(prompt)
client1.send('DROP TABLE 01070_window_view_watch_events.mt;')
client1.expect(prompt)
client1.send('DROP DATABASE IF EXISTS 01070_window_view_watch_events;')
client1.expect(prompt)