small update

This commit is contained in:
Vxider 2021-12-13 02:38:16 +00:00
parent caad9f6c79
commit f6dceb53a5
2 changed files with 8 additions and 8 deletions

View File

@ -910,7 +910,7 @@ Pipe StorageWindowView::watch(
}
auto reader = std::make_shared<WindowViewSource>(
*this,
std::static_pointer_cast<StorageWindowView>(shared_from_this()),
query.is_watch_events,
window_view_timezone,
has_limit,

View File

@ -11,7 +11,7 @@ class WindowViewSource : public SourceWithProgress
{
public:
WindowViewSource(
StorageWindowView & storage_,
std::shared_ptr<StorageWindowView> storage_,
const bool is_events_,
String window_view_timezone_,
const bool has_limit_,
@ -20,7 +20,7 @@ public:
: SourceWithProgress(
is_events_ ? Block(
{ColumnWithTypeAndName(ColumnUInt32::create(), std::make_shared<DataTypeDateTime>(window_view_timezone_), "watermark")})
: storage_.getHeader())
: storage_->getHeader())
, storage(storage_)
, is_events(is_events_)
, window_view_timezone(window_view_timezone_)
@ -32,7 +32,7 @@ public:
header.insert(
ColumnWithTypeAndName(ColumnUInt32::create(), std::make_shared<DataTypeDateTime>(window_view_timezone_), "watermark"));
else
header = storage.getHeader();
header = storage->getHeader();
}
String getName() const override { return "WindowViewSource"; }
@ -68,7 +68,7 @@ protected:
if (has_limit && num_updates == static_cast<Int64>(limit))
return {Block(), 0};
if (isCancelled() || storage.shutdown_called)
if (isCancelled() || storage->shutdown_called)
return {Block(), 0};
std::unique_lock lock(blocks_mutex);
@ -81,9 +81,9 @@ protected:
return {getHeader(), 0};
}
storage.fire_condition.wait_for(lock, std::chrono::seconds(heartbeat_interval_sec));
storage->fire_condition.wait_for(lock, std::chrono::seconds(heartbeat_interval_sec));
if (isCancelled() || storage.shutdown_called)
if (isCancelled() || storage->shutdown_called)
{
return {Block(), 0};
}
@ -107,7 +107,7 @@ protected:
}
private:
StorageWindowView & storage;
std::shared_ptr<StorageWindowView> storage;
std::list<std::pair<Block, UInt32>> blocks_with_watermark;