watch events support for window view

This commit is contained in:
Vxider 2021-12-12 07:45:55 +00:00
parent 900e443900
commit 7eb18ea21e
4 changed files with 65 additions and 29 deletions

View File

@ -298,13 +298,16 @@ Note that elements emitted by a late firing should be treated as updated results
### Monitoring New Windows {#window-view-monitoring}
Window view supports the `WATCH` query to constantly append the processing results to the console or use `TO` syntax to output the results to a table.
Window view supports the [WATCH](../../../sql-reference/statements/watch.md) query to monitoring changes, or use `TO` syntax to output the results to a table.
``` sql
WATCH [db.]name [LIMIT n]
WATCH [db.]window_view
[EVENTS]
[LIMIT n]
[FORMAT format]
```
`WATCH` query acts similar as in `LIVE VIEW`. A `LIMIT` can be specified to set the number of updates to receive before terminating the query.
`WATCH` query acts similar as in `LIVE VIEW`. A `LIMIT` can be specified to set the number of updates to receive before terminating the query. The `EVENTS` clause can be used to obtain a short form of the `WATCH` query where instead of the query result you will just get the latest query watermark.
### Settings {#window-view-settings}

View File

@ -527,7 +527,7 @@ inline void StorageWindowView::fire(UInt32 watermark)
for (auto & watch_stream : watch_streams)
{
if (auto watch_stream_ptr = watch_stream.lock())
watch_stream_ptr->addBlock(block);
watch_stream_ptr->addBlock(block, watermark);
}
}
if (!target_table_id.empty())
@ -910,7 +910,11 @@ Pipe StorageWindowView::watch(
}
auto reader = std::make_shared<WindowViewSource>(
*this, has_limit, limit,
*this,
query.is_watch_events,
window_view_timezone,
has_limit,
limit,
local_context->getSettingsRef().window_view_heartbeat_interval.totalSeconds());
std::lock_guard lock(fire_signal_mutex);
@ -1077,7 +1081,8 @@ ASTPtr StorageWindowView::innerQueryParser(const ASTSelectQuery & query)
ErrorCodes::ILLEGAL_COLUMN,
"Illegal column #{} of time zone argument of function, must be constant string",
time_zone_arg_num);
time_zone = &DateLUT::instance(time_zone_ast->value.safeGet<String>());
window_view_timezone = time_zone_ast->value.safeGet<String>();
time_zone = &DateLUT::instance(window_view_timezone);
}
else
time_zone = &DateLUT::instance();
@ -1354,9 +1359,12 @@ Block & StorageWindowView::getHeader() const
sample_block = InterpreterSelectQuery(
select_query->clone(), window_view_context, getParentStorage(), nullptr,
SelectQueryOptions(QueryProcessingStage::Complete)).getSampleBlock();
/// convert all columns to full columns
/// in case some of them are constant
for (size_t i = 0; i < sample_block.columns(); ++i)
{
sample_block.safeGetByPosition(i).column = sample_block.safeGetByPosition(i).column->convertToFullColumnIfConst();
}
}
return sample_block;
}

View File

@ -210,6 +210,7 @@ private:
BackgroundSchedulePool::TaskHolder clean_cache_task;
BackgroundSchedulePool::TaskHolder fire_task;
String window_view_timezone;
String function_now_timezone;
ASTPtr innerQueryParser(const ASTSelectQuery & query);

View File

@ -12,73 +12,94 @@ class WindowViewSource : public SourceWithProgress
public:
WindowViewSource(
StorageWindowView & storage_,
const bool is_events_,
String window_view_timezone_,
const bool has_limit_,
const UInt64 limit_,
const UInt64 heartbeat_interval_sec_)
: SourceWithProgress(storage_.getHeader())
: SourceWithProgress(
is_events_ ? Block(
{ColumnWithTypeAndName(ColumnUInt32::create(), std::make_shared<DataTypeDateTime>(window_view_timezone_), "watermark")})
: storage_.getHeader())
, storage(storage_)
, is_events(is_events_)
, window_view_timezone(window_view_timezone_)
, has_limit(has_limit_)
, limit(limit_)
, heartbeat_interval_sec(heartbeat_interval_sec_) {}
, heartbeat_interval_sec(heartbeat_interval_sec_)
{
if (is_events)
header.insert(
ColumnWithTypeAndName(ColumnUInt32::create(), std::make_shared<DataTypeDateTime>(window_view_timezone_), "watermark"));
else
header = storage.getHeader();
}
String getName() const override { return "WindowViewSource"; }
void addBlock(Block block_)
void addBlock(Block block_, UInt32 watermark)
{
std::lock_guard lock(blocks_mutex);
blocks.push_back(std::move(block_));
blocks_with_watermark.push_back(std::make_pair(std::move(block_), watermark));
}
protected:
Block getHeader() const { return storage.getHeader(); }
Block getHeader() const { return header; }
Chunk generate() override
{
auto block = generateImpl();
return Chunk(block.getColumns(), block.rows());
Block block;
UInt32 watermark;
std::tie(block, watermark) = generateImpl();
if (is_events)
{
return Chunk(
{DataTypeDateTime(window_view_timezone).createColumnConst(block.rows(), watermark)->convertToFullColumnIfConst()},
block.rows());
}
else
return Chunk(block.getColumns(), block.rows());
}
Block generateImpl()
std::pair<Block, UInt32> generateImpl()
{
Block res;
if (has_limit && num_updates == static_cast<Int64>(limit))
return Block();
return std::make_pair(Block(), 0);
if (isCancelled() || storage.shutdown_called)
return Block();
return std::make_pair(Block(), 0);
std::unique_lock lock(blocks_mutex);
if (blocks.empty())
if (blocks_with_watermark.empty())
{
if (!end_of_blocks)
{
end_of_blocks = true;
num_updates += 1;
return getHeader();
return std::make_pair(getHeader(), 0);
}
storage.fire_condition.wait_for(lock, std::chrono::seconds(heartbeat_interval_sec));
if (isCancelled() || storage.shutdown_called)
{
return Block();
return std::make_pair(Block(), 0);
}
if (blocks.empty())
return getHeader();
if (blocks_with_watermark.empty())
return std::make_pair(getHeader(), 0);
else
{
end_of_blocks = false;
res = blocks.front();
blocks.pop_front();
auto res = blocks_with_watermark.front();
blocks_with_watermark.pop_front();
return res;
}
}
else
{
res = blocks.front();
blocks.pop_front();
auto res = blocks_with_watermark.front();
blocks_with_watermark.pop_front();
return res;
}
}
@ -86,8 +107,11 @@ protected:
private:
StorageWindowView & storage;
BlocksList blocks;
std::list<std::pair<Block, UInt32>> blocks_with_watermark;
Block header;
const bool is_events;
String window_view_timezone;
const bool has_limit;
const UInt64 limit;
Int64 num_updates = -1;