update code style

This commit is contained in:
Vxider 2020-03-24 10:46:56 +08:00
parent 2430521fba
commit 8802f81bc9
2 changed files with 109 additions and 114 deletions

View File

@ -97,13 +97,13 @@ namespace
is_hop = true; is_hop = true;
window_function = node.clone(); window_function = node.clone();
timestamp_column_name = node.arguments->children[0]->getColumnName(); timestamp_column_name = node.arguments->children[0]->getColumnName();
auto ptr_ = node.clone(); auto ptr = node.clone();
std::static_pointer_cast<ASTFunction>(ptr_)->setAlias(""); std::static_pointer_cast<ASTFunction>(ptr)->setAlias("");
auto arrayJoin = makeASTFunction("arrayJoin", ptr_); auto array_join = makeASTFunction("arrayJoin", ptr);
arrayJoin->alias = node.alias; array_join->alias = node.alias;
node_ptr = arrayJoin; node_ptr = array_join;
window_column_name = arrayJoin->getColumnName(); window_column_name = array_join->getColumnName();
window_column_alias = arrayJoin->alias; window_column_alias = array_join->alias;
} }
else if (serializeAST(node) != serializeAST(*window_function)) else if (serializeAST(node) != serializeAST(*window_function))
throw Exception("WINDOW VIEW only support ONE WINDOW FUNCTION", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW); throw Exception("WINDOW VIEW only support ONE WINDOW FUNCTION", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW);
@ -225,19 +225,19 @@ namespace
auto function_equal = makeASTFunction( auto function_equal = makeASTFunction(
"less", std::make_shared<ASTIdentifier>("____w_end"), std::make_shared<ASTLiteral>(timestamp)); "less", std::make_shared<ASTIdentifier>("____w_end"), std::make_shared<ASTLiteral>(timestamp));
auto alterCommand = std::make_shared<ASTAlterCommand>(); auto alter_command = std::make_shared<ASTAlterCommand>();
alterCommand->type = ASTAlterCommand::DELETE; alter_command->type = ASTAlterCommand::DELETE;
alterCommand->predicate = function_equal; alter_command->predicate = function_equal;
alterCommand->children.push_back(alterCommand->predicate); alter_command->children.push_back(alter_command->predicate);
auto alterCommandList = std::make_shared<ASTAlterCommandList>(); auto alter_command_list = std::make_shared<ASTAlterCommandList>();
alterCommandList->add(alterCommand); alter_command_list->add(alter_command);
auto alterQuery = std::make_shared<ASTAlterQuery>(); auto alter_query = std::make_shared<ASTAlterQuery>();
alterQuery->database = inner_table_id.database_name; alter_query->database = inner_table_id.database_name;
alterQuery->table = inner_table_id.table_name; alter_query->table = inner_table_id.table_name;
alterQuery->set(alterQuery->command_list, alterCommandList); alter_query->set(alter_query->command_list, alter_command_list);
return alterQuery; return alter_query;
} }
std::shared_ptr<ASTSelectQuery> generateFetchColumnsQuery(const StorageID & inner_storage) std::shared_ptr<ASTSelectQuery> generateFetchColumnsQuery(const StorageID & inner_storage)
@ -246,9 +246,6 @@ namespace
auto select = std::make_shared<ASTExpressionList>(); auto select = std::make_shared<ASTExpressionList>();
select->children.push_back(std::make_shared<ASTAsterisk>()); select->children.push_back(std::make_shared<ASTAsterisk>());
res_query->setExpression(ASTSelectQuery::Expression::SELECT, select); res_query->setExpression(ASTSelectQuery::Expression::SELECT, select);
auto tableInSelectQuery = std::make_shared<ASTTablesInSelectQuery>();
auto tableInSelectQueryElement = std::make_shared<ASTTablesInSelectQueryElement>();
res_query->setExpression(ASTSelectQuery::Expression::TABLES, std::make_shared<ASTTablesInSelectQuery>()); res_query->setExpression(ASTSelectQuery::Expression::TABLES, std::make_shared<ASTTablesInSelectQuery>());
auto tables = res_query->tables(); auto tables = res_query->tables();
auto tables_elem = std::make_shared<ASTTablesInSelectQueryElement>(); auto tables_elem = std::make_shared<ASTTablesInSelectQueryElement>();
@ -360,8 +357,8 @@ bool StorageWindowView::optimize(const ASTPtr & query, const ASTPtr & partition,
Pipes StorageWindowView::blocksToPipes(BlocksList & blocks, Block & sample_block) Pipes StorageWindowView::blocksToPipes(BlocksList & blocks, Block & sample_block)
{ {
Pipes pipes; Pipes pipes;
for (auto & block_ : blocks) for (auto & block : blocks)
pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(sample_block, Chunk(block_.getColumns(), block_.rows()))); pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(sample_block, Chunk(block.getColumns(), block.rows())));
return pipes; return pipes;
} }
@ -402,11 +399,11 @@ inline void StorageWindowView::cleanCache()
else else
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
mergeable_blocks.remove_if([w_bound](Block & block_) mergeable_blocks.remove_if([w_bound](Block & block)
{ {
auto & column_ = block_.getByName("____w_end").column; auto & column = block.getByName("____w_end").column;
const auto & data = static_cast<const ColumnUInt32 &>(*column_).getData(); const auto & data = static_cast<const ColumnUInt32 &>(*column).getData();
for (size_t i = 0; i < column_->size(); ++i) for (size_t i = 0; i < column->size(); ++i)
{ {
if (data[i] >= w_bound) if (data[i] >= w_bound)
return false; return false;
@ -433,12 +430,12 @@ inline void StorageWindowView::fire(UInt32 watermark)
if (target_table_id.empty()) if (target_table_id.empty())
{ {
in_stream->readPrefix(); in_stream->readPrefix();
while (auto block_ = in_stream->read()) while (auto block = in_stream->read())
{ {
for (auto & watch_stream : watch_streams) for (auto & watch_stream : watch_streams)
{ {
if (auto watch_stream_ = watch_stream.lock()) if (auto watch_stream_ptr = watch_stream.lock())
watch_stream_->addBlock(block_); watch_stream_ptr->addBlock(block);
} }
} }
in_stream->readSuffix(); in_stream->readSuffix();
@ -448,18 +445,18 @@ inline void StorageWindowView::fire(UInt32 watermark)
try try
{ {
StoragePtr target_table = getTargetStorage(); StoragePtr target_table = getTargetStorage();
auto _lock = target_table->lockStructureForShare(true, global_context.getCurrentQueryId()); auto lock = target_table->lockStructureForShare(true, global_context.getCurrentQueryId());
auto out_stream = target_table->write(getInnerQuery(), global_context); auto out_stream = target_table->write(getInnerQuery(), global_context);
in_stream->readPrefix(); in_stream->readPrefix();
out_stream->writePrefix(); out_stream->writePrefix();
while (auto block_ = in_stream->read()) while (auto block = in_stream->read())
{ {
for (auto & watch_stream : watch_streams) for (auto & watch_stream : watch_streams)
{ {
if (auto watch_stream_ = watch_stream.lock()) if (const auto & watch_stream_ptr = watch_stream.lock())
watch_stream_->addBlock(block_); watch_stream_ptr->addBlock(block);
} }
out_stream->write(std::move(block_)); out_stream->write(std::move(block));
} }
in_stream->readSuffix(); in_stream->readSuffix();
out_stream->writeSuffix(); out_stream->writeSuffix();
@ -481,7 +478,7 @@ std::shared_ptr<ASTCreateQuery> StorageWindowView::generateInnerTableCreateQuery
auto new_columns_list = std::make_shared<ASTColumns>(); auto new_columns_list = std::make_shared<ASTColumns>();
auto sample_block_ auto t_sample_block
= InterpreterSelectQuery(getInnerQuery(), global_context, getParentStorage(), SelectQueryOptions(QueryProcessingStage::WithMergeableState)) = InterpreterSelectQuery(getInnerQuery(), global_context, getParentStorage(), SelectQueryOptions(QueryProcessingStage::WithMergeableState))
.getSampleBlock(); .getSampleBlock();
@ -496,13 +493,13 @@ std::shared_ptr<ASTCreateQuery> StorageWindowView::generateInnerTableCreateQuery
columns_list->children.push_back(column_window); columns_list->children.push_back(column_window);
} }
for (auto & column_ : sample_block_.getColumnsWithTypeAndName()) for (auto & column : t_sample_block.getColumnsWithTypeAndName())
{ {
ParserIdentifierWithOptionalParameters parser; ParserIdentifierWithOptionalParameters parser;
String sql = column_.type->getName(); String sql = column.type->getName();
ASTPtr ast = parseQuery(parser, sql.data(), sql.data() + sql.size(), "data type", 0); ASTPtr ast = parseQuery(parser, sql.data(), sql.data() + sql.size(), "data type", 0);
auto column_dec = std::make_shared<ASTColumnDeclaration>(); auto column_dec = std::make_shared<ASTColumnDeclaration>();
column_dec->name = column_.name; column_dec->name = column.name;
column_dec->type = ast; column_dec->type = ast;
columns_list->children.push_back(column_dec); columns_list->children.push_back(column_dec);
} }
@ -568,13 +565,13 @@ std::shared_ptr<ASTCreateQuery> StorageWindowView::generateInnerTableCreateQuery
inline UInt32 StorageWindowView::getWindowLowerBound(UInt32 time_sec) inline UInt32 StorageWindowView::getWindowLowerBound(UInt32 time_sec)
{ {
IntervalKind window_kind_; IntervalKind window_interval_kind;
if (is_tumble) if (is_tumble)
window_kind_ = window_kind; window_interval_kind = window_kind;
else else
window_kind_ = hop_kind; window_interval_kind = hop_kind;
switch (window_kind_) switch (window_interval_kind)
{ {
#define CASE_WINDOW_KIND(KIND) \ #define CASE_WINDOW_KIND(KIND) \
case IntervalKind::KIND: \ case IntervalKind::KIND: \
@ -687,7 +684,7 @@ void StorageWindowView::threadFuncCleanCache()
} }
} }
if (!shutdown_called) if (!shutdown_called)
cleanCacheTask->scheduleAfter(RESCHEDULE_MS); clean_cache_task->scheduleAfter(RESCHEDULE_MS);
} }
void StorageWindowView::threadFuncFireProc() void StorageWindowView::threadFuncFireProc()
@ -707,7 +704,7 @@ void StorageWindowView::threadFuncFireProc()
fire_signal_condition.wait_for(lock, std::chrono::microseconds(static_cast<UInt64>(next_fire_signal) * 1000000 - timestamp_usec)); fire_signal_condition.wait_for(lock, std::chrono::microseconds(static_cast<UInt64>(next_fire_signal) * 1000000 - timestamp_usec));
} }
if (!shutdown_called) if (!shutdown_called)
fireTask->scheduleAfter(RESCHEDULE_MS); fire_task->scheduleAfter(RESCHEDULE_MS);
} }
void StorageWindowView::threadFuncFireEvent() void StorageWindowView::threadFuncFireEvent()
@ -726,7 +723,7 @@ void StorageWindowView::threadFuncFireEvent()
} }
} }
if (!shutdown_called) if (!shutdown_called)
fireTask->scheduleAfter(RESCHEDULE_MS); fire_task->scheduleAfter(RESCHEDULE_MS);
} }
BlockInputStreams StorageWindowView::watch( BlockInputStreams StorageWindowView::watch(
@ -778,9 +775,7 @@ StorageWindowView::StorageWindowView(
if (query.select->list_of_selects->children.size() != 1) if (query.select->list_of_selects->children.size() != 1)
throw Exception("UNION is not supported for Window View", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW); throw Exception("UNION is not supported for Window View", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW);
auto inner_query_ = query.select->list_of_selects->children.at(0); ASTSelectQuery & select_query = typeid_cast<ASTSelectQuery &>(*query.select->list_of_selects->children.at(0));
ASTSelectQuery & select_query = typeid_cast<ASTSelectQuery &>(*inner_query_);
String select_database_name = local_context.getCurrentDatabase(); String select_database_name = local_context.getCurrentDatabase();
String select_table_name; String select_table_name;
extractDependentTable(select_query, select_database_name, select_table_name); extractDependentTable(select_query, select_database_name, select_table_name);
@ -895,27 +890,27 @@ StorageWindowView::StorageWindowView(
{ {
// write expressions // write expressions
ColumnsWithTypeAndName columns__; ColumnsWithTypeAndName t_columns;
columns__.emplace_back( t_columns.emplace_back(
nullptr, nullptr,
std::make_shared<DataTypeTuple>(DataTypes{std::make_shared<DataTypeDateTime>(), std::make_shared<DataTypeDateTime>()}), std::make_shared<DataTypeTuple>(DataTypes{std::make_shared<DataTypeDateTime>(), std::make_shared<DataTypeDateTime>()}),
window_column_name); window_column_name);
columns__.emplace_back(nullptr, std::make_shared<DataTypeDateTime>(), "____timestamp"); t_columns.emplace_back(nullptr, std::make_shared<DataTypeDateTime>(), "____timestamp");
const auto & function_tuple = FunctionFactory::instance().get("tupleElement", global_context); const auto & function_tuple = FunctionFactory::instance().get("tupleElement", global_context);
writeExpressions = std::make_shared<ExpressionActions>(columns__, global_context); write_expressions = std::make_shared<ExpressionActions>(t_columns, global_context);
writeExpressions->add(ExpressionAction::addColumn( write_expressions->add(ExpressionAction::addColumn(
{std::make_shared<DataTypeUInt8>()->createColumnConst(1, toField(2)), std::make_shared<DataTypeUInt8>(), "____tuple_arg"})); {std::make_shared<DataTypeUInt8>()->createColumnConst(1, toField(2)), std::make_shared<DataTypeUInt8>(), "____tuple_arg"}));
writeExpressions->add(ExpressionAction::applyFunction(function_tuple, Names{window_column_name, "____tuple_arg"}, "____w_end")); write_expressions->add(ExpressionAction::applyFunction(function_tuple, Names{window_column_name, "____tuple_arg"}, "____w_end"));
writeExpressions->add(ExpressionAction::removeColumn("____tuple_arg")); write_expressions->add(ExpressionAction::removeColumn("____tuple_arg"));
} }
cleanCacheTask = global_context.getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanCache(); }); clean_cache_task = global_context.getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanCache(); });
if (is_proctime) if (is_proctime)
fireTask = global_context.getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncFireProc(); }); fire_task = global_context.getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncFireProc(); });
else else
fireTask = global_context.getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncFireEvent(); }); fire_task = global_context.getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncFireEvent(); });
cleanCacheTask->deactivate(); clean_cache_task->deactivate();
fireTask->deactivate(); fire_task->deactivate();
} }
@ -927,17 +922,17 @@ ASTPtr StorageWindowView::innerQueryParser(ASTSelectQuery & query)
// parse stage mergeable // parse stage mergeable
ASTPtr result = query.clone(); ASTPtr result = query.clone();
ASTPtr expr_list = result; ASTPtr expr_list = result;
StageMergeableVisitorData stageMergeableData; StageMergeableVisitorData stage_mergeable_data;
InDepthNodeVisitor<OneTypeMatcher<StageMergeableVisitorData, false>, true>(stageMergeableData).visit(expr_list); InDepthNodeVisitor<OneTypeMatcher<StageMergeableVisitorData, false>, true>(stage_mergeable_data).visit(expr_list);
if (!stageMergeableData.is_tumble && !stageMergeableData.is_hop) if (!stage_mergeable_data.is_tumble && !stage_mergeable_data.is_hop)
throw Exception("WINDOW FUNCTION is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY); throw Exception("WINDOW FUNCTION is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);
window_column_name = stageMergeableData.window_column_name; window_column_name = stage_mergeable_data.window_column_name;
window_column_alias = stageMergeableData.window_column_alias; window_column_alias = stage_mergeable_data.window_column_alias;
timestamp_column_name = stageMergeableData.timestamp_column_name; timestamp_column_name = stage_mergeable_data.timestamp_column_name;
is_tumble = stageMergeableData.is_tumble; is_tumble = stage_mergeable_data.is_tumble;
// parser window function // parser window function
ASTFunction & window_function = typeid_cast<ASTFunction &>(*stageMergeableData.window_function); ASTFunction & window_function = typeid_cast<ASTFunction &>(*stage_mergeable_data.window_function);
const auto & arguments = window_function.arguments->children; const auto & arguments = window_function.arguments->children;
const auto & arg1 = std::static_pointer_cast<ASTFunction>(arguments.at(1)); const auto & arg1 = std::static_pointer_cast<ASTFunction>(arguments.at(1));
if (!arg1 || !startsWith(arg1->name, "toInterval")) if (!arg1 || !startsWith(arg1->name, "toInterval"))
@ -980,87 +975,87 @@ void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, con
InterpreterSelectQuery select_block(window_view.getFinalQuery(), context, {std::move(pipe)}, QueryProcessingStage::WithMergeableState); InterpreterSelectQuery select_block(window_view.getFinalQuery(), context, {std::move(pipe)}, QueryProcessingStage::WithMergeableState);
source_stream = select_block.execute().in; source_stream = select_block.execute().in;
source_stream = std::make_shared<ExpressionBlockInputStream>(source_stream, window_view.writeExpressions); source_stream = std::make_shared<ExpressionBlockInputStream>(source_stream, window_view.write_expressions);
source_stream = std::make_shared<SquashingBlockInputStream>( source_stream = std::make_shared<SquashingBlockInputStream>(
source_stream, context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes); source_stream, context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes);
} }
else else
{ {
UInt32 max_fired_watermark_ = 0; UInt32 t_max_fired_watermark = 0;
if (window_view.allowed_lateness) if (window_view.allowed_lateness)
{ {
UInt32 max_timestamp_ = 0; UInt32 t_max_timstamp = 0;
UInt32 max_watermark_ = 0; UInt32 t_max_watermark = 0;
{ {
std::lock_guard lock(window_view.fire_signal_mutex); std::lock_guard lock(window_view.fire_signal_mutex);
max_fired_watermark_ = window_view.max_fired_watermark; t_max_fired_watermark = window_view.max_fired_watermark;
max_watermark_ = window_view.max_watermark; t_max_watermark = window_view.max_watermark;
max_timestamp_ = window_view.max_timestamp; t_max_timstamp = window_view.max_timestamp;
} }
if (max_timestamp_!= 0) if (t_max_timstamp!= 0)
{ {
UInt32 lateness_bound UInt32 lateness_bound
= addTime(max_timestamp_, window_view.lateness_kind, -1 * window_view.lateness_num_units, window_view.time_zone); = addTime(t_max_timstamp, window_view.lateness_kind, -1 * window_view.lateness_num_units, window_view.time_zone);
if (window_view.is_watermark_bounded) if (window_view.is_watermark_bounded)
{ {
UInt32 watermark_lower_bound = window_view.is_tumble UInt32 watermark_lower_bound = window_view.is_tumble
? addTime(max_watermark_, window_view.window_kind, -1 * window_view.window_num_units, window_view.time_zone) ? addTime(t_max_watermark, window_view.window_kind, -1 * window_view.window_num_units, window_view.time_zone)
: addTime(max_watermark_, window_view.hop_kind, -1 * window_view.hop_num_units, window_view.time_zone); : addTime(t_max_watermark, window_view.hop_kind, -1 * window_view.hop_num_units, window_view.time_zone);
if (watermark_lower_bound < lateness_bound) if (watermark_lower_bound < lateness_bound)
lateness_bound = watermark_lower_bound; lateness_bound = watermark_lower_bound;
} }
ColumnsWithTypeAndName columns__; ColumnsWithTypeAndName columns;
columns__.emplace_back(nullptr, std::make_shared<DataTypeDateTime>(), window_view.timestamp_column_name); columns.emplace_back(nullptr, std::make_shared<DataTypeDateTime>(), window_view.timestamp_column_name);
ExpressionActionsPtr filterExpressions = std::make_shared<ExpressionActions>(columns__, context); ExpressionActionsPtr filter_expressions = std::make_shared<ExpressionActions>(columns, context);
filterExpressions->add( filter_expressions->add(
ExpressionAction::addColumn({std::make_shared<DataTypeDateTime>()->createColumnConst(1, toField(lateness_bound)), ExpressionAction::addColumn({std::make_shared<DataTypeDateTime>()->createColumnConst(1, toField(lateness_bound)),
std::make_shared<DataTypeDateTime>(), std::make_shared<DataTypeDateTime>(),
"____lateness_bound"})); "____lateness_bound"}));
const auto & function_greater = FunctionFactory::instance().get("greaterOrEquals", context); const auto & function_greater = FunctionFactory::instance().get("greaterOrEquals", context);
filterExpressions->add(ExpressionAction::applyFunction( filter_expressions->add(ExpressionAction::applyFunction(
function_greater, Names{window_view.timestamp_column_name, "____lateness_bound"}, "____filter")); function_greater, Names{window_view.timestamp_column_name, "____lateness_bound"}, "____filter"));
pipe.addSimpleTransform(std::make_shared<FilterTransform>(pipe.getHeader(), filterExpressions, "____filter", true)); pipe.addSimpleTransform(std::make_shared<FilterTransform>(pipe.getHeader(), filter_expressions, "____filter", true));
} }
} }
UInt32 max_timestamp__ = 0; UInt32 t_max_timstamp = 0;
if (!window_view.is_tumble || window_view.is_watermark_bounded || window_view.allowed_lateness) if (!window_view.is_tumble || window_view.is_watermark_bounded || window_view.allowed_lateness)
{ {
auto & column_timestamp = block.getByName(window_view.timestamp_column_name).column; auto & column_timestamp = block.getByName(window_view.timestamp_column_name).column;
const ColumnUInt32::Container & timestamp_data = static_cast<const ColumnUInt32 &>(*column_timestamp).getData(); const ColumnUInt32::Container & timestamp_data = static_cast<const ColumnUInt32 &>(*column_timestamp).getData();
for (auto& timestamp_ : timestamp_data) for (auto& timestamp : timestamp_data)
{ {
if (timestamp_ > max_timestamp__) if (timestamp > t_max_timstamp)
max_timestamp__ = timestamp_; t_max_timstamp = timestamp;
} }
} }
InterpreterSelectQuery select_block(window_view.getFinalQuery(), context, {std::move(pipe)}, QueryProcessingStage::WithMergeableState); InterpreterSelectQuery select_block(window_view.getFinalQuery(), context, {std::move(pipe)}, QueryProcessingStage::WithMergeableState);
source_stream = select_block.execute().in; source_stream = select_block.execute().in;
source_stream = std::make_shared<ExpressionBlockInputStream>(source_stream, window_view.writeExpressions); source_stream = std::make_shared<ExpressionBlockInputStream>(source_stream, window_view.write_expressions);
source_stream = std::make_shared<SquashingBlockInputStream>( source_stream = std::make_shared<SquashingBlockInputStream>(
source_stream, context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes); source_stream, context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes);
if (!window_view.is_tumble) if (!window_view.is_tumble)
source_stream source_stream
= std::make_shared<WatermarkBlockInputStream>(source_stream, window_view, window_view.getWindowUpperBound(max_timestamp__)); = std::make_shared<WatermarkBlockInputStream>(source_stream, window_view, window_view.getWindowUpperBound(t_max_timstamp));
else else
source_stream = std::make_shared<WatermarkBlockInputStream>(source_stream, window_view); source_stream = std::make_shared<WatermarkBlockInputStream>(source_stream, window_view);
if (window_view.is_watermark_bounded || window_view.allowed_lateness) if (window_view.is_watermark_bounded || window_view.allowed_lateness)
std::static_pointer_cast<WatermarkBlockInputStream>(source_stream)->setMaxTimestamp(max_timestamp__); std::static_pointer_cast<WatermarkBlockInputStream>(source_stream)->setMaxTimestamp(t_max_timstamp);
if (window_view.allowed_lateness && max_fired_watermark_ != 0) if (window_view.allowed_lateness && t_max_fired_watermark != 0)
std::static_pointer_cast<WatermarkBlockInputStream>(source_stream)->setAllowedLateness(max_fired_watermark_); std::static_pointer_cast<WatermarkBlockInputStream>(source_stream)->setAllowedLateness(t_max_fired_watermark);
} }
if (!window_view.inner_table_id.empty()) if (!window_view.inner_table_id.empty())
{ {
auto & inner_storage = window_view.getInnerStorage(); auto & inner_storage = window_view.getInnerStorage();
auto lock_ = inner_storage->lockStructureForShare(true, context.getCurrentQueryId()); auto lock = inner_storage->lockStructureForShare(true, context.getCurrentQueryId());
auto stream = inner_storage->write(window_view.getInnerQuery(), context); auto stream = inner_storage->write(window_view.getInnerQuery(), context);
copyData(*source_stream, *stream); copyData(*source_stream, *stream);
} }
@ -1069,8 +1064,8 @@ void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, con
source_stream->readPrefix(); source_stream->readPrefix();
{ {
std::lock_guard lock(window_view.mutex); std::lock_guard lock(window_view.mutex);
while (Block block_ = source_stream->read()) while (Block t_block = source_stream->read())
window_view.mergeable_blocks.push_back(std::move(block_)); window_view.mergeable_blocks.push_back(std::move(t_block));
} }
source_stream->readSuffix(); source_stream->readSuffix();
} }
@ -1079,8 +1074,8 @@ void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, con
void StorageWindowView::startup() void StorageWindowView::startup()
{ {
// Start the working thread // Start the working thread
cleanCacheTask->activateAndSchedule(); clean_cache_task->activateAndSchedule();
fireTask->activateAndSchedule(); fire_task->activateAndSchedule();
} }
void StorageWindowView::shutdown() void StorageWindowView::shutdown()
@ -1088,8 +1083,8 @@ void StorageWindowView::shutdown()
bool expected = false; bool expected = false;
if (!shutdown_called.compare_exchange_strong(expected, true)) if (!shutdown_called.compare_exchange_strong(expected, true))
return; return;
cleanCacheTask->deactivate(); clean_cache_task->deactivate();
fireTask->deactivate(); fire_task->deactivate();
} }
StorageWindowView::~StorageWindowView() StorageWindowView::~StorageWindowView()
@ -1159,21 +1154,21 @@ BlockInputStreamPtr StorageWindowView::getNewBlocksInputStreamPtr(UInt32 waterma
pipes = blocksToPipes(mergeable_blocks, getMergeableHeader()); pipes = blocksToPipes(mergeable_blocks, getMergeableHeader());
} }
ColumnsWithTypeAndName columns_; ColumnsWithTypeAndName t_columns;
columns_.emplace_back(nullptr, std::make_shared<DataTypeDateTime>(), "____w_end"); t_columns.emplace_back(nullptr, std::make_shared<DataTypeDateTime>(), "____w_end");
ExpressionActionsPtr actions_ = std::make_shared<ExpressionActions>(columns_, global_context); ExpressionActionsPtr actions = std::make_shared<ExpressionActions>(t_columns, global_context);
actions_->add(ExpressionAction::addColumn({std::make_shared<DataTypeDateTime>()->createColumnConst(1, toField(watermark)), actions->add(ExpressionAction::addColumn({std::make_shared<DataTypeDateTime>()->createColumnConst(1, toField(watermark)),
std::make_shared<DataTypeDateTime>(), std::make_shared<DataTypeDateTime>(),
"____watermark"})); "____watermark"}));
const auto & function_equals = FunctionFactory::instance().get("equals", global_context); const auto & function_equals = FunctionFactory::instance().get("equals", global_context);
ExpressionActionsPtr apply_function_actions = std::make_shared<ExpressionActions>(columns_, global_context); ExpressionActionsPtr apply_function_actions = std::make_shared<ExpressionActions>(t_columns, global_context);
actions_->add(ExpressionAction::applyFunction(function_equals, Names{"____w_end", "____watermark"}, "____filter")); actions->add(ExpressionAction::applyFunction(function_equals, Names{"____w_end", "____watermark"}, "____filter"));
actions_->add(ExpressionAction::removeColumn("____w_end")); actions->add(ExpressionAction::removeColumn("____w_end"));
actions_->add(ExpressionAction::removeColumn("____watermark")); actions->add(ExpressionAction::removeColumn("____watermark"));
for (auto & pipe : pipes) for (auto & pipe : pipes)
pipe.addSimpleTransform(std::make_shared<FilterTransform>(pipe.getHeader(), actions_, pipe.addSimpleTransform(std::make_shared<FilterTransform>(pipe.getHeader(), actions,
"____filter", true)); "____filter", true));
auto proxy_storage = std::make_shared<WindowViewProxyStorage>( auto proxy_storage = std::make_shared<WindowViewProxyStorage>(

View File

@ -105,10 +105,10 @@ private:
mutable StoragePtr inner_storage; mutable StoragePtr inner_storage;
mutable StoragePtr target_storage; mutable StoragePtr target_storage;
BackgroundSchedulePool::TaskHolder cleanCacheTask; BackgroundSchedulePool::TaskHolder clean_cache_task;
BackgroundSchedulePool::TaskHolder fireTask; BackgroundSchedulePool::TaskHolder fire_task;
ExpressionActionsPtr writeExpressions; ExpressionActionsPtr write_expressions;
ASTPtr innerQueryParser(ASTSelectQuery & inner_query); ASTPtr innerQueryParser(ASTSelectQuery & inner_query);