processing time support

This commit is contained in:
Vxider 2020-02-17 13:06:03 +08:00
parent c296b76624
commit 00c3bfb72a
28 changed files with 471 additions and 291 deletions

View File

@ -0,0 +1,39 @@
#pragma once
#include <Columns/ColumnsNumber.h>
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
/** Add timestamp column for processing time process in
* WINDOW VIEW
*/
class AddingTimestampBlockInputStream : public IBlockInputStream
{
public:
AddingTimestampBlockInputStream(const BlockInputStreamPtr & input_, UInt32 timestamp_) : input(input_), timestamp(timestamp_)
{
cached_header = input->getHeader();
cached_header.insert({ColumnUInt32::create(1, 1), std::make_shared<DataTypeDateTime>(), "____timestamp"});
}
String getName() const override { return "AddingTimestamp"; }
Block getHeader() const override { return cached_header.cloneEmpty(); }
protected:
Block readImpl() override
{
Block res = input->read();
if (res)
res.insert({ColumnUInt32::create(res.rows(), timestamp), std::make_shared<DataTypeDateTime>(), "____timestamp"});
return res;
}
private:
BlockInputStreamPtr input;
Block cached_header;
UInt32 timestamp;
};
}

View File

@ -38,9 +38,10 @@
#include <boost/lexical_cast.hpp>
#include <Common/typeid_cast.h>
#include <Storages/WindowView/AddingTimestampBlockInputStream.h>
#include <Storages/WindowView/BlocksListInputStream.h>
#include <Storages/WindowView/StorageWindowView.h>
#include <Storages/WindowView/WindowViewBlockInputStream.h>
#include <Storages/WindowView/BlocksListInputStream.h>
#include <Storages/WindowView/WindowViewProxyStorage.h>
@ -111,8 +112,8 @@ namespace
std::static_pointer_cast<ASTFunction>(ptr_)->setAlias("");
auto arrayJoin = makeASTFunction("arrayJoin", ptr_);
arrayJoin->alias = node.alias;
data.window_column_name = arrayJoin->getColumnName();
node_ptr = arrayJoin;
data.window_column_name = arrayJoin->getColumnName();
}
else if (serializeAST(node) != serializeAST(*data.window_function))
throw Exception("WINDOW VIEW only support ONE WINDOW FUNCTION", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW);
@ -120,6 +121,43 @@ namespace
}
};
class ParserProcTimeFinalMatcher
{
public:
using Visitor = InDepthNodeVisitor<ParserProcTimeFinalMatcher, true>;
struct Data
{
bool is_proctime_tumble = false;
String window_column_name;
};
static bool needChildVisit(ASTPtr &, const ASTPtr &)
{
return true;
}
static void visit(ASTPtr & ast, Data & data)
{
if (const auto * t = ast->as<ASTFunction>())
visit(*t, ast, data);
}
private:
static void visit(const ASTFunction & node, ASTPtr & node_ptr, Data & data)
{
if (node.name == "TUMBLE")
{
if (const auto * t = node.arguments->children[0]->as<ASTFunction>(); t && t->name == "now")
{
data.is_proctime_tumble = true;
node_ptr->children[0]->children[0] = std::make_shared<ASTIdentifier>("____timestamp");
data.window_column_name = node.getColumnName();
}
}
}
};
static inline IntervalKind strToIntervalKind(const String& interval_str)
{
if (interval_str == "Second")
@ -299,7 +337,7 @@ inline void StorageWindowView::clearInnerTable()
inline void StorageWindowView::flushToTable(UInt32 timestamp_)
{
//write into dependent table
StoragePtr target_table = getTargetTable();
StoragePtr target_table = getTargetStorage();
auto _blockInputStreamPtr = getNewBlocksInputStreamPtr(timestamp_);
auto _lock = target_table->lockStructureForShare(true, global_context.getCurrentQueryId());
auto stream = target_table->write(getInnerQuery(), global_context);
@ -315,12 +353,21 @@ std::shared_ptr<ASTCreateQuery> StorageWindowView::generateInnerTableCreateQuery
auto new_columns_list = std::make_shared<ASTColumns>();
auto storage = getParentStorage();
auto sample_block_
= InterpreterSelectQuery(getInnerQuery(), global_context, storage, SelectQueryOptions(QueryProcessingStage::WithMergeableState))
= InterpreterSelectQuery(getInnerQuery(), global_context, getParentStorage(), SelectQueryOptions(QueryProcessingStage::WithMergeableState))
.getSampleBlock();
auto columns_list = std::make_shared<ASTExpressionList>();
if (is_proctime_tumble)
{
auto column_window = std::make_shared<ASTColumnDeclaration>();
column_window->name = window_column_name;
column_window->type
= makeASTFunction("Tuple", std::make_shared<ASTIdentifier>("DateTime"), std::make_shared<ASTIdentifier>("DateTime"));
columns_list->children.push_back(column_window);
}
for (auto & column_ : sample_block_.getColumnsWithTypeAndName())
{
ParserIdentifierWithOptionalParameters parser;
@ -331,10 +378,10 @@ std::shared_ptr<ASTCreateQuery> StorageWindowView::generateInnerTableCreateQuery
column_dec->type = ast;
columns_list->children.push_back(column_dec);
}
auto column_fire_status = std::make_shared<ASTColumnDeclaration>();
column_fire_status->name = "____w_end";
column_fire_status->type = std::make_shared<ASTIdentifier>("DateTime");
columns_list->children.push_back(column_fire_status);
auto column_wend = std::make_shared<ASTColumnDeclaration>();
column_wend->name = "____w_end";
column_wend->type = std::make_shared<ASTIdentifier>("DateTime");
columns_list->children.push_back(column_wend);
new_columns_list->set(new_columns_list->columns, columns_list);
manual_create_query->set(manual_create_query->columns_list, new_columns_list);
@ -569,6 +616,13 @@ StorageWindowView::StorageWindowView(
select_table_id = StorageID(select_database_name, select_table_name);
inner_query = innerQueryParser(select_query);
final_query = inner_query->clone();
ParserProcTimeFinalMatcher::Data final_query_data;
ParserProcTimeFinalMatcher::Visitor(final_query_data).visit(final_query);
is_proctime_tumble = final_query_data.is_proctime_tumble;
if (is_proctime_tumble)
window_column_name = final_query_data.window_column_name;
/// If the table is not specified - use the table `system.one`
if (select_table_name.empty())
{
@ -581,7 +635,6 @@ StorageWindowView::StorageWindowView(
if (!query.to_table.empty())
target_table_id = StorageID(query.to_database, query.to_table);
is_temporary = query.temporary;
inner_table_clear_interval = local_context.getSettingsRef().window_view_inner_table_clean_interval.totalSeconds();
mergeable_blocks = std::make_shared<std::list<BlocksListPtr>>();
@ -590,6 +643,9 @@ StorageWindowView::StorageWindowView(
if (query.watermark_function)
{
if (is_proctime_tumble)
throw Exception("WATERMARK is not support for Processing time processing.", ErrorCodes::INCORRECT_QUERY);
// parser watermark function
const auto & watermark_function = std::static_pointer_cast<ASTFunction>(query.watermark_function);
if (!startsWith(watermark_function->name, "toInterval"))
@ -603,7 +659,7 @@ StorageWindowView::StorageWindowView(
{
watermark_num_units = boost::lexical_cast<int>(interval_units_p1->value.get<String>());
}
catch (const boost::bad_lexical_cast & exec)
catch (const boost::bad_lexical_cast &)
{
throw Exception("Cannot parse string '" + interval_units_p1->value.get<String>() + "' as Integer.", ErrorCodes::CANNOT_PARSE_TEXT);
}
@ -661,8 +717,6 @@ ASTPtr StorageWindowView::innerQueryParser(ASTSelectQuery & query)
// parser window function
ASTFunction & window_function = typeid_cast<ASTFunction &>(*stageMergeableOneData.window_function);
const auto & arguments = window_function.arguments->children;
const auto & arg0 = std::static_pointer_cast<ASTIdentifier>(arguments.at(0));
timestamp_column_name = arg0->IAST::getAliasOrColumnName();
const auto & arg1 = std::static_pointer_cast<ASTFunction>(arguments.at(1));
if (!arg1 || !startsWith(arg1->name, "toInterval"))
throw Exception("Illegal type of last argument of function " + arg1->name + " should be Interval", ErrorCodes::ILLEGAL_COLUMN);
@ -678,12 +732,17 @@ ASTPtr StorageWindowView::innerQueryParser(ASTSelectQuery & query)
void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, const Block & block, const Context & context)
{
UInt32 timestamp_now = std::time(nullptr);
auto block_stream = std::make_shared<OneBlockInputStream>(block);
BlockInputStreams streams;
if (window_view.is_proctime_tumble)
streams = {std::make_shared<AddingTimestampBlockInputStream>(block_stream, timestamp_now)};
else
streams = {block_stream};
BlockInputStreams streams = {std::make_shared<OneBlockInputStream>(block)};
auto window_proxy_storage = std::make_shared<WindowViewProxyStorage>(
StorageID("", "WindowViewProxyStorage"), window_view.getParentStorage(), std::move(streams), QueryProcessingStage::FetchColumns);
InterpreterSelectQuery select_block(
window_view.getInnerQuery(), context, window_proxy_storage, QueryProcessingStage::WithMergeableState);
window_view.getFinalQuery(), context, window_proxy_storage, QueryProcessingStage::WithMergeableState);
auto data_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(select_block.execute().in);
@ -701,51 +760,58 @@ void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, con
actions_->add(ExpressionAction::removeColumn("____tuple_arg"));
BlockInputStreamPtr in_stream;
if (window_view.watermark_num_units != 0)
{
UInt32 watermark = window_view.getWatermark(timestamp_now);
actions_->add(
ExpressionAction::addColumn({std::make_shared<DataTypeUInt32>()->createColumnConst(1, toField(watermark)), std::make_shared<DataTypeDateTime>(), "____watermark"}));
actions_->add(ExpressionAction::addColumn({std::make_shared<DataTypeUInt32>()->createColumnConst(1, toField(watermark)),
std::make_shared<DataTypeDateTime>(),
"____watermark"}));
const auto & function_greater = FunctionFactory::instance().get("greaterOrEquals", context);
actions_->add(ExpressionAction::applyFunction(function_greater, Names{"____w_end", "____watermark"}, "____filter"));
actions_->add(ExpressionAction::removeColumn("____watermark"));
in_stream = std::make_shared<FilterBlockInputStream>(data_mergeable_stream, actions_, "____filter", true);
}
else
in_stream = std::make_shared<ExpressionBlockInputStream>(data_mergeable_stream, actions_);
if (!window_view.inner_table_id.empty())
{
auto stream = window_view.getInnerStorage()->write(window_view.getInnerQuery(), context);
if (window_view.watermark_num_units != 0)
if (window_view.is_proctime_tumble)
{
std::unique_lock lock(window_view.fire_signal_mutex);
copyData(*in_stream, *stream);
}
else if (window_view.watermark_num_units != 0)
{
while (Block block_ = in_stream->read())
{
auto column_wend = block_.getByName("____w_end").column;
stream->write(std::move(block_));
const ColumnUInt32::Container & wend_data
= static_cast<const ColumnUInt32 &>(*block_.getByName("____w_end").column.get()).getData();
= static_cast<const ColumnUInt32 &>(*column_wend).getData();
for (size_t i = 0; i < wend_data.size(); ++i)
{
if (wend_data[i] < timestamp_now)
{
window_view.addFireSignal(wend_data[i]);
}
}
}
}
else
copyData(*in_stream, *stream);
}
else
{
BlocksListPtr new_mergeable_blocks = std::make_shared<BlocksList>();
if (window_view.is_proctime_tumble)
{
std::unique_lock lock(window_view.fire_signal_mutex);
while (Block block_ = in_stream->read())
new_mergeable_blocks->push_back(std::move(block_));
}
if (window_view.watermark_num_units != 0)
{
while (Block block_ = in_stream->read())
{
auto column_wend = block_.getByName("____w_end").column;
new_mergeable_blocks->push_back(std::move(block_));
const ColumnUInt32::Container & wend_data
= static_cast<const ColumnUInt32 &>(*block_.getByName("____w_end").column.get()).getData();
= static_cast<const ColumnUInt32 &>(*column_wend).getData();
for (size_t i = 0; i < wend_data.size(); ++i)
{
if (wend_data[i] < timestamp_now)
@ -766,16 +832,6 @@ void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, con
}
}
StoragePtr StorageWindowView::getTargetTable() const
{
return global_context.getTable(target_table_id);
}
StoragePtr StorageWindowView::tryGetTargetTable() const
{
return global_context.tryGetTable(target_table_id);
}
void StorageWindowView::startup()
{
// Start the working thread
@ -802,26 +858,10 @@ StorageWindowView::~StorageWindowView()
BlockInputStreamPtr StorageWindowView::getNewBlocksInputStreamPtr(UInt32 timestamp_)
{
BlockInputStreamPtr stream;
if (!inner_table_id.empty())
return getNewBlocksInputStreamPtrInnerTable(timestamp_);
if (mergeable_blocks->empty())
return {std::make_shared<NullBlockInputStream>(getHeader())};
BlockInputStreams from;
auto sample_block_ = mergeable_blocks->front()->front().cloneEmpty();
BlockInputStreamPtr stream = std::make_shared<BlocksListInputStream>(mergeable_blocks, sample_block_, timestamp_);
from.push_back(std::move(stream));
auto proxy_storage = std::make_shared<WindowViewProxyStorage>(
StorageID("", "WindowViewProxyStorage"), getParentStorage(), std::move(from), QueryProcessingStage::WithMergeableState);
InterpreterSelectQuery select(getInnerQuery(), global_context, proxy_storage, QueryProcessingStage::Complete);
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
return data;
}
BlockInputStreamPtr StorageWindowView::getNewBlocksInputStreamPtrInnerTable(UInt32 timestamp_)
{
{
auto & storage = getInnerStorage();
InterpreterSelectQuery fetch(fetch_column_query, global_context, storage, SelectQueryOptions(QueryProcessingStage::FetchColumns));
@ -829,18 +869,30 @@ BlockInputStreamPtr StorageWindowView::getNewBlocksInputStreamPtrInnerTable(UInt
columns_.emplace_back(nullptr, std::make_shared<DataTypeDateTime>(), "____w_end");
ExpressionActionsPtr actions_ = std::make_shared<ExpressionActions>(columns_, global_context);
actions_->add(ExpressionAction::addColumn({std::make_shared<DataTypeDateTime>()->createColumnConst(1, toField(timestamp_)), std::make_shared<DataTypeDateTime>(), "____w_end_now"}));
actions_->add(ExpressionAction::addColumn({std::make_shared<DataTypeDateTime>()->createColumnConst(1, toField(timestamp_)),
std::make_shared<DataTypeDateTime>(),
"____timestamp_now"}));
const auto & function_equals = FunctionFactory::instance().get("equals", global_context);
ExpressionActionsPtr apply_function_actions = std::make_shared<ExpressionActions>(columns_, global_context);
actions_->add(ExpressionAction::applyFunction(function_equals, Names{"____w_end", "____w_end_now"}, "____filter"));
auto stream = std::make_shared<FilterBlockInputStream>(fetch.execute().in, actions_, "____filter", true);
actions_->add(ExpressionAction::applyFunction(function_equals, Names{"____w_end", "____timestamp_now"}, "____filter"));
actions_->add(ExpressionAction::removeColumn("____w_end"));
actions_->add(ExpressionAction::removeColumn("____timestamp_now"));
stream = std::make_shared<FilterBlockInputStream>(fetch.execute().in, actions_, "____filter", true);
}
else
{
if (mergeable_blocks->empty())
return std::make_shared<NullBlockInputStream>(getHeader());
auto sample_block_ = mergeable_blocks->front()->front().cloneEmpty();
stream = std::make_shared<BlocksListInputStream>(mergeable_blocks, sample_block_, timestamp_);
}
BlockInputStreams from;
from.push_back(std::move(stream));
auto proxy_storage = std::make_shared<WindowViewProxyStorage>(
StorageID("", "WindowViewProxyStorage"), getParentStorage(), std::move(from), QueryProcessingStage::WithMergeableState);
InterpreterSelectQuery select(getInnerQuery(), global_context, proxy_storage, QueryProcessingStage::Complete);
InterpreterSelectQuery select(getFinalQuery(), global_context, proxy_storage, QueryProcessingStage::Complete);
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
return data;
}

View File

@ -25,21 +25,11 @@ public:
~StorageWindowView() override;
String getName() const override { return "WindowView"; }
ASTPtr getInnerQuery() const { return inner_query->clone(); }
/// It is passed inside the query and solved at its level.
bool supportsSampling() const override { return true; }
bool supportsFinal() const override { return true; }
bool isTemporary() { return is_temporary; }
bool hasActiveUsers() { return active_ptr.use_count() > 1; }
void checkTableCanBeDropped() const override;
StoragePtr getTargetTable() const;
StoragePtr tryGetTargetTable() const;
void drop(TableStructureWriteLockHolder &) override;
void startup() override;
@ -56,14 +46,70 @@ public:
BlocksListPtrs getMergeableBlocksList() { return mergeable_blocks; }
std::shared_ptr<bool> getActivePtr() { return active_ptr; }
/// Read new data blocks that store query result
BlockInputStreamPtr getNewBlocksInputStreamPtr(UInt32 timestamp_);
BlockInputStreamPtr getNewBlocksInputStreamPtrInnerTable(UInt32 timestamp_);
static void writeIntoWindowView(StorageWindowView & window_view, const Block & block, const Context & context);
BlocksPtr getNewBlocks();
private:
ASTPtr inner_query;
ASTPtr final_query;
ASTPtr fetch_column_query;
Context & global_context;
bool is_proctime_tumble{false};
std::atomic<bool> shutdown_called{false};
mutable Block sample_block;
UInt64 inner_table_clear_interval;
const DateLUTImpl & time_zone;
std::list<UInt32> fire_signal;
std::list<WindowViewBlockInputStream *> watch_streams;
std::condition_variable condition;
BlocksListPtrs mergeable_blocks;
/// Mutex for the blocks and ready condition
std::mutex mutex;
std::mutex flush_table_mutex;
std::mutex fire_signal_mutex;
std::mutex proc_time_signal_mutex;
/// Active users
std::shared_ptr<bool> active_ptr;
IntervalKind::Kind window_kind;
IntervalKind::Kind watermark_kind;
Int64 window_num_units;
Int64 watermark_num_units = 0;
String window_column_name;
StorageID select_table_id = StorageID::createEmpty();
StorageID target_table_id = StorageID::createEmpty();
StorageID inner_table_id = StorageID::createEmpty();
StoragePtr parent_storage;
StoragePtr inner_storage;
StoragePtr target_storage;
BackgroundSchedulePool::TaskHolder toTableTask;
BackgroundSchedulePool::TaskHolder innerTableClearTask;
BackgroundSchedulePool::TaskHolder fireTask;
ASTPtr innerQueryParser(ASTSelectQuery & inner_query);
std::shared_ptr<ASTCreateQuery> generateInnerTableCreateQuery(const ASTCreateQuery & inner_create_query, const String & database_name, const String & table_name);
UInt32 getWindowLowerBound(UInt32 time_sec, int window_id_skew = 0);
UInt32 getWindowUpperBound(UInt32 time_sec, int window_id_skew = 0);
UInt32 getWatermark(UInt32 time_sec);
Block getHeader() const;
void flushToTable(UInt32 timestamp_);
void clearInnerTable();
void threadFuncToTable();
void threadFuncClearInnerTable();
void threadFuncFire();
void addFireSignal(UInt32 timestamp_);
ASTPtr getInnerQuery() const { return inner_query->clone(); }
ASTPtr getFinalQuery() const { return final_query->clone(); }
StoragePtr& getParentStorage()
{
@ -79,68 +125,12 @@ public:
return inner_storage;
}
static void writeIntoWindowView(StorageWindowView & window_view, const Block & block, const Context & context);
static void writeIntoWindowViewInnerTable(StorageWindowView & window_view, const Block & block, const Context & context);
ASTPtr innerQueryParser(ASTSelectQuery & inner_query);
std::shared_ptr<ASTCreateQuery> generateInnerTableCreateQuery(const ASTCreateQuery & inner_create_query, const String & database_name, const String & table_name);
inline UInt32 getWindowLowerBound(UInt32 time_sec, int window_id_skew = 0);
inline UInt32 getWindowUpperBound(UInt32 time_sec, int window_id_skew = 0);
inline UInt32 getWatermark(UInt32 time_sec);
private:
StorageID select_table_id = StorageID::createEmpty();
ASTPtr inner_query;
ASTPtr fetch_column_query;
String window_column_name;
String timestamp_column_name;
Context & global_context;
StoragePtr parent_storage;
StoragePtr inner_storage;
bool is_temporary{false};
mutable Block sample_block;
/// Mutex for the blocks and ready condition
std::mutex mutex;
std::mutex flush_table_mutex;
std::mutex fire_signal_mutex;
std::list<UInt32> fire_signal;
std::list<WindowViewBlockInputStream *> watch_streams;
/// New blocks ready condition to broadcast to readers
/// that new blocks are available
std::condition_variable condition;
/// Active users
std::shared_ptr<bool> active_ptr;
BlocksListPtrs mergeable_blocks;
IntervalKind::Kind window_kind;
Int64 window_num_units;
IntervalKind::Kind watermark_kind;
Int64 watermark_num_units = 0;
const DateLUTImpl & time_zone;
StorageID target_table_id = StorageID::createEmpty();
StorageID inner_table_id = StorageID::createEmpty();
void flushToTable(UInt32 timestamp_);
void clearInnerTable();
void threadFuncToTable();
void threadFuncClearInnerTable();
void threadFuncFire();
void addFireSignal(UInt32 timestamp_);
std::atomic<bool> shutdown_called{false};
UInt64 inner_table_clear_interval;
Poco::Timestamp timestamp;
BackgroundSchedulePool::TaskHolder toTableTask;
BackgroundSchedulePool::TaskHolder innerTableClearTask;
BackgroundSchedulePool::TaskHolder fireTask;
StoragePtr& getTargetStorage()
{
if (target_storage == nullptr && !target_table_id.empty())
target_storage = global_context.getTable(target_table_id);
return target_storage;
}
StorageWindowView(
const StorageID & table_id_,

View File

@ -4,6 +4,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataStreams/NullBlockInputStream.h>
#include <Storages/IStorage.h>
namespace DB
@ -12,11 +13,25 @@ namespace DB
class WindowViewProxyStorage : public IStorage
{
public:
WindowViewProxyStorage(const StorageID & table_id_, StoragePtr parent_storage_, QueryProcessingStage::Enum to_stage_)
: IStorage(table_id_)
, parent_storage(std::move(parent_storage_))
, streams({std::make_shared<NullBlockInputStream>(Block())})
, to_stage(to_stage_)
{
column_des = parent_storage->getColumns();
column_des.add({"____timestamp", std::make_shared<DataTypeDateTime>(), false});
}
WindowViewProxyStorage(const StorageID & table_id_, StoragePtr parent_storage_, BlockInputStreams streams_, QueryProcessingStage::Enum to_stage_)
: IStorage(table_id_)
, parent_storage(std::move(parent_storage_))
, streams(std::move(streams_))
, to_stage(to_stage_) {}
, to_stage(to_stage_)
{
column_des = parent_storage->getColumns();
column_des.add({"____timestamp", std::make_shared<DataTypeDateTime>(), false});
}
public:
std::string getName() const override { return "WindowViewProxyStorage(" + parent_storage->getName() + ")"; }
@ -53,17 +68,29 @@ public:
Names getColumnsRequiredForSampling() const override { return parent_storage->getColumnsRequiredForSampling(); }
Names getColumnsRequiredForFinal() const override { return parent_storage->getColumnsRequiredForFinal(); }
const ColumnsDescription & getColumns() const override { return parent_storage->getColumns(); }
const ColumnsDescription & getColumns() const override { return column_des; }
void setColumns(ColumnsDescription columns_) override { return parent_storage->setColumns(columns_); }
NameAndTypePair getColumn(const String & column_name) const override { return parent_storage->getColumn(column_name); }
NameAndTypePair getColumn(const String & column_name) const override
{
if (column_name == "____timestamp")
return {"____timestamp", std::shared_ptr<DataTypeDateTime>()};
return parent_storage->getColumn(column_name);
}
bool hasColumn(const String & column_name) const override { return parent_storage->hasColumn(column_name); }
bool hasColumn(const String & column_name) const override
{
if (column_name == "____timestamp")
return true;
return parent_storage->hasColumn(column_name);
}
private:
StoragePtr parent_storage;
BlockInputStreams streams;
String window_column_name;
ColumnsDescription column_des;
QueryProcessingStage::Enum to_stage;
};
}

View File

@ -0,0 +1,10 @@
--TUMBLE--
1
--HOP--
1
1
--INNER_TUMBLE--
1
--INNER_HOP--
1
1

View File

@ -0,0 +1,35 @@
SET allow_experimental_window_view = 1;
DROP TABLE IF EXISTS test.mt;
SELECT '--TUMBLE--';
DROP TABLE IF EXISTS test.wv;
CREATE TABLE test.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW test.wv WATERMARK INTERVAL '1' SECOND AS SELECT count(a) FROM test.mt GROUP BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS wid;
INSERT INTO test.mt VALUES (1, now() + INTERVAL '1' SECOND);
WATCH test.wv LIMIT 1;
SELECT '--HOP--';
DROP TABLE IF EXISTS test.wv;
CREATE WINDOW VIEW test.wv WATERMARK INTERVAL '1' SECOND AS SELECT count(a) FROM test.mt GROUP BY HOP(timestamp, INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid;
INSERT INTO test.mt VALUES (1, now() + INTERVAL '1' SECOND);
WATCH test.wv LIMIT 2;
SELECT '--INNER_TUMBLE--';
DROP TABLE IF EXISTS test.wv;
CREATE WINDOW VIEW test.wv ENGINE=MergeTree ORDER BY tuple() WATERMARK INTERVAL '1' SECOND AS SELECT count(a) FROM test.mt GROUP BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS wid;
INSERT INTO test.mt VALUES (1, now() + INTERVAL '1' SECOND);
WATCH test.wv LIMIT 1;
SELECT '--INNER_HOP--';
DROP TABLE IF EXISTS test.wv;
CREATE WINDOW VIEW test.wv ENGINE=MergeTree ORDER BY tuple() WATERMARK INTERVAL '1' SECOND AS SELECT count(a) FROM test.mt GROUP BY HOP(timestamp, INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid;
INSERT INTO test.mt VALUES (1, now() + INTERVAL '1' SECOND);
WATCH test.wv LIMIT 2;
DROP TABLE test.wv;
DROP TABLE test.mt;

View File

@ -1,13 +0,0 @@
SET allow_experimental_window_view = 1;
DROP TABLE IF EXISTS test.wv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW test.wv AS SELECT count(a) FROM test.mt GROUP BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS wid;
INSERT INTO test.mt VALUES (1, now());
WATCH test.wv LIMIT 1;
DROP TABLE test.wv;
DROP TABLE test.mt;

View File

@ -0,0 +1,14 @@
--TUMBLE--
0
1
--HOP--
0
1
1
--INNER_TUMBLE--
0
1
--INNER_HOP--
0
1
1

View File

@ -0,0 +1,47 @@
SET allow_experimental_window_view = 1;
DROP TABLE IF EXISTS test.mt;
DROP TABLE IF EXISTS test.dst;
DROP TABLE IF EXISTS test.`.inner.wv`;
CREATE TABLE test.dst(count UInt64) Engine=MergeTree ORDER BY tuple();
SELECT '--TUMBLE--';
DROP TABLE IF EXISTS test.wv;
TRUNCATE TABLE test.dst;
CREATE TABLE test.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW test.wv TO test.dst WATERMARK INTERVAL '1' SECOND AS SELECT count(a) AS count FROM test.mt GROUP BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS wid;
INSERT INTO test.mt VALUES (1, now());
SELECT sleep(2);
SELECT count FROM test.dst;
SELECT '--HOP--';
DROP TABLE IF EXISTS test.wv;
TRUNCATE TABLE test.dst;
CREATE WINDOW VIEW test.wv TO test.dst WATERMARK INTERVAL '1' SECOND AS SELECT count(a) AS count FROM test.mt GROUP BY HOP(timestamp, INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid;
INSERT INTO test.mt VALUES (1, now());
SELECT sleep(2);
SELECT count FROM test.dst;
SELECT '--INNER_TUMBLE--';
DROP TABLE IF EXISTS test.wv;
TRUNCATE TABLE test.dst;
CREATE WINDOW VIEW test.wv TO test.dst ENGINE=MergeTree ORDER BY tuple() WATERMARK INTERVAL '1' SECOND AS SELECT count(a) AS count FROM test.mt GROUP BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS wid;
INSERT INTO test.mt VALUES (1, now());
SELECT sleep(2);
SELECT count FROM test.dst;
SELECT '--INNER_HOP--';
DROP TABLE IF EXISTS test.wv;
TRUNCATE TABLE test.dst;
CREATE WINDOW VIEW test.wv TO test.dst ENGINE=MergeTree ORDER BY tuple() WATERMARK INTERVAL '1' SECOND AS SELECT count(a) AS count FROM test.mt GROUP BY HOP(timestamp, INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid;
INSERT INTO test.mt VALUES (1, now());
SELECT sleep(2);
SELECT count FROM test.dst;
DROP TABLE test.wv;
DROP TABLE test.mt;

View File

@ -1,13 +0,0 @@
SET allow_experimental_window_view = 1;
DROP TABLE IF EXISTS test.wv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW test.wv AS SELECT count(a) FROM test.mt GROUP BY HOP(timestamp, INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid;
INSERT INTO test.mt VALUES (1, now());
WATCH test.wv LIMIT 2;
DROP TABLE test.wv;
DROP TABLE test.mt;

View File

@ -1,13 +0,0 @@
SET allow_experimental_window_view = 1;
DROP TABLE IF EXISTS test.wv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW test.wv ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) FROM test.mt GROUP BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS wid;
INSERT INTO test.mt VALUES (1, now());
WATCH test.wv LIMIT 1;
DROP TABLE test.wv;
DROP TABLE test.mt;

View File

@ -0,0 +1,10 @@
--TUMBLE--
1
--HOP--
1
1
--INNER_TUMBLE--
1
--INNER_HOP--
1
1

View File

@ -0,0 +1,35 @@
SET allow_experimental_window_view = 1;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt(a Int32) ENGINE=MergeTree ORDER BY tuple();
SELECT '--TUMBLE--';
DROP TABLE IF EXISTS test.wv;
CREATE WINDOW VIEW test.wv AS SELECT count(a) FROM test.mt GROUP BY TUMBLE(now(), INTERVAL '1' SECOND) AS wid;
INSERT INTO test.mt VALUES (1);
WATCH test.wv LIMIT 1;
SELECT '--HOP--';
DROP TABLE IF EXISTS test.wv;
CREATE WINDOW VIEW test.wv AS SELECT count(a) FROM test.mt GROUP BY HOP(now(), INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid;
INSERT INTO test.mt VALUES (1);
WATCH test.wv LIMIT 2;
SELECT '--INNER_TUMBLE--';
DROP TABLE IF EXISTS test.wv;
CREATE WINDOW VIEW test.wv ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) FROM test.mt GROUP BY TUMBLE(now(), INTERVAL '1' SECOND) AS wid;
INSERT INTO test.mt VALUES (1);
WATCH test.wv LIMIT 1;
SELECT '--INNER_HOP--';
DROP TABLE IF EXISTS test.wv;
CREATE WINDOW VIEW test.wv ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) FROM test.mt GROUP BY HOP(now(), INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid;
INSERT INTO test.mt VALUES (1);
WATCH test.wv LIMIT 2;
DROP TABLE test.wv;
DROP TABLE test.mt;

View File

@ -1,13 +0,0 @@
SET allow_experimental_window_view = 1;
DROP TABLE IF EXISTS test.wv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW test.wv ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) FROM test.mt GROUP BY HOP(timestamp, INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid;
INSERT INTO test.mt VALUES (1, now());
WATCH test.wv LIMIT 2;
DROP TABLE test.wv;
DROP TABLE test.mt;

View File

@ -0,0 +1,14 @@
--TUMBLE--
0
1
--HOP--
0
1
1
--INNER_TUMBLE--
0
1
--INNER_HOP--
0
1
1

View File

@ -0,0 +1,47 @@
SET allow_experimental_window_view = 1;
DROP TABLE IF EXISTS test.mt;
DROP TABLE IF EXISTS test.dst;
DROP TABLE IF EXISTS test.`.inner.wv`;
CREATE TABLE test.dst(count UInt64) Engine=MergeTree ORDER BY tuple();
CREATE TABLE test.mt(a Int32) ENGINE=MergeTree ORDER BY tuple();
SELECT '--TUMBLE--';
DROP TABLE IF EXISTS test.wv;
TRUNCATE TABLE test.dst;
CREATE WINDOW VIEW test.wv TO test.dst AS SELECT count(a) AS count FROM test.mt GROUP BY TUMBLE(now(), INTERVAL '1' SECOND) AS wid;
INSERT INTO test.mt VALUES (1);
SELECT sleep(1);
SELECT count FROM test.dst;
SELECT '--HOP--';
DROP TABLE IF EXISTS test.wv;
TRUNCATE TABLE test.dst;
CREATE WINDOW VIEW test.wv TO test.dst AS SELECT count(a) AS count FROM test.mt GROUP BY HOP(now(), INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid;
INSERT INTO test.mt VALUES (1);
SELECT sleep(2);
SELECT count FROM test.dst;
SELECT '--INNER_TUMBLE--';
DROP TABLE IF EXISTS test.wv;
TRUNCATE TABLE test.dst;
CREATE WINDOW VIEW test.wv TO test.dst ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) AS count FROM test.mt GROUP BY TUMBLE(now(), INTERVAL '1' SECOND) AS wid;
INSERT INTO test.mt VALUES (1);
SELECT sleep(1);
SELECT count FROM test.dst;
SELECT '--INNER_HOP--';
DROP TABLE IF EXISTS test.wv;
TRUNCATE TABLE test.dst;
CREATE WINDOW VIEW test.wv TO test.dst ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) AS count FROM test.mt GROUP BY HOP(now(), INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid;
INSERT INTO test.mt VALUES (1);
SELECT sleep(2);
SELECT count FROM test.dst;
DROP TABLE test.wv;
DROP TABLE test.mt;

View File

@ -1,18 +0,0 @@
SET allow_experimental_window_view = 1;
DROP TABLE IF EXISTS test.wv;
DROP TABLE IF EXISTS test.mt;
DROP TABLE IF EXISTS test.dst;
DROP TABLE IF EXISTS test.`.inner.wv`;
CREATE TABLE test.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
CREATE TABLE test.dst(count UInt64) Engine=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW test.wv TO test.dst AS SELECT count(a) AS count FROM test.mt GROUP BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS wid;
INSERT INTO test.mt VALUES (1, now());
SELECT sleep(1);
SELECT count FROM test.dst;
DROP TABLE test.wv;
DROP TABLE test.mt;
DROP TABLE test.dst;

View File

@ -1,18 +0,0 @@
SET allow_experimental_window_view = 1;
DROP TABLE IF EXISTS test.wv;
DROP TABLE IF EXISTS test.mt;
DROP TABLE IF EXISTS test.dst;
DROP TABLE IF EXISTS test.`.inner.wv`;
CREATE TABLE test.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
CREATE TABLE test.dst(count UInt64) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW test.wv TO test.dst AS SELECT count(a) AS count FROM test.mt GROUP BY HOP(timestamp, INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid;
INSERT INTO test.mt VALUES (1, now());
SELECT sleep(3);
SELECT count FROM test.dst;
DROP TABLE test.wv;
DROP TABLE test.mt;
DROP TABLE test.dst;

View File

@ -1,18 +0,0 @@
SET allow_experimental_window_view = 1;
DROP TABLE IF EXISTS test.wv;
DROP TABLE IF EXISTS test.mt;
DROP TABLE IF EXISTS test.dst;
DROP TABLE IF EXISTS test.`.inner.wv`;
CREATE TABLE test.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
CREATE TABLE test.dst(count UInt64) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW test.wv TO test.dst ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) AS count FROM test.mt GROUP BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS wid;
INSERT INTO test.mt VALUES (1, now());
SELECT sleep(1);
SELECT count FROM test.dst;
DROP TABLE test.wv;
DROP TABLE test.mt;
DROP TABLE test.dst;

View File

@ -1,18 +0,0 @@
SET allow_experimental_window_view = 1;
DROP TABLE IF EXISTS test.wv;
DROP TABLE IF EXISTS test.mt;
DROP TABLE IF EXISTS test.dst;
DROP TABLE IF EXISTS test.`.inner.wv`;
CREATE TABLE test.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
CREATE TABLE test.dst(count UInt64) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW test.wv TO test.dst ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) AS count FROM test.mt GROUP BY HOP(timestamp, INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid;
INSERT INTO test.mt VALUES (1, now());
SELECT sleep(3);
SELECT count FROM test.dst;
DROP TABLE test.wv;
DROP TABLE test.mt;
DROP TABLE test.dst;