filter outdated data before aggregating

This commit is contained in:
Vxider 2020-07-15 14:18:57 +08:00
parent 563c4fb382
commit db8f64a573

View File

@ -994,6 +994,51 @@ void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, con
Pipe pipe(std::make_shared<SourceFromSingleChunk>(block.cloneEmpty(), Chunk(block.getColumns(), block.rows())));
BlockInputStreamPtr source_stream;
UInt32 lateness_bound = 0;
UInt32 t_max_watermark = 0;
UInt32 t_max_timestamp = 0;
UInt32 t_max_fired_watermark = 0;
{
std::lock_guard lock(window_view.fire_signal_mutex);
t_max_fired_watermark = window_view.max_fired_watermark;
t_max_watermark = window_view.max_watermark;
t_max_timestamp = window_view.max_timestamp;
}
// Filter outdated data
if (window_view.allowed_lateness && t_max_timestamp != 0)
{
lateness_bound
= window_view.addTime(t_max_timestamp, window_view.lateness_kind, -1 * window_view.lateness_num_units);
if (window_view.is_watermark_bounded)
{
UInt32 watermark_lower_bound = window_view.is_tumble
? window_view.addTime(t_max_watermark, window_view.window_kind, -1 * window_view.window_num_units)
: window_view.addTime(t_max_watermark, window_view.hop_kind, -1 * window_view.hop_num_units);
if (watermark_lower_bound < lateness_bound)
lateness_bound = watermark_lower_bound;
}
}
else
{
lateness_bound = t_max_fired_watermark;
}
if (lateness_bound > 0)
{
ColumnsWithTypeAndName columns;
columns.emplace_back(nullptr, std::make_shared<DataTypeDateTime>(), window_view.timestamp_column_name);
ExpressionActionsPtr filter_expressions = std::make_shared<ExpressionActions>(columns, context);
filter_expressions->add(ExpressionAction::addColumn(
{std::make_shared<DataTypeDateTime>()->createColumnConst(1, toField(lateness_bound)),
std::make_shared<DataTypeDateTime>(),
"____lateness_bound"}));
const auto & function_greater = FunctionFactory::instance().get("greaterOrEquals", context);
filter_expressions->add(ExpressionAction::applyFunction(
function_greater, Names{window_view.timestamp_column_name, "____lateness_bound"}, "____filter"));
pipe.addSimpleTransform(std::make_shared<FilterTransform>(pipe.getHeader(), filter_expressions, "____filter", true));
}
std::shared_lock<std::shared_mutex> fire_signal_lock;
if (window_view.is_proctime)
{
@ -1009,45 +1054,6 @@ void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, con
}
else
{
UInt32 t_max_fired_watermark = 0;
if (window_view.allowed_lateness)
{
UInt32 t_max_timestamp = 0;
UInt32 t_max_watermark = 0;
{
std::lock_guard lock(window_view.fire_signal_mutex);
t_max_fired_watermark = window_view.max_fired_watermark;
t_max_watermark = window_view.max_watermark;
t_max_timestamp = window_view.max_timestamp;
}
if (t_max_timestamp!= 0)
{
UInt32 lateness_bound
= window_view.addTime(t_max_timestamp, window_view.lateness_kind, -1 * window_view.lateness_num_units);
if (window_view.is_watermark_bounded)
{
UInt32 watermark_lower_bound = window_view.is_tumble
? window_view.addTime(t_max_watermark, window_view.window_kind, -1 * window_view.window_num_units)
: window_view.addTime(t_max_watermark, window_view.hop_kind, -1 * window_view.hop_num_units);
if (watermark_lower_bound < lateness_bound)
lateness_bound = watermark_lower_bound;
}
ColumnsWithTypeAndName columns;
columns.emplace_back(nullptr, std::make_shared<DataTypeDateTime>(), window_view.timestamp_column_name);
ExpressionActionsPtr filter_expressions = std::make_shared<ExpressionActions>(columns, context);
filter_expressions->add(
ExpressionAction::addColumn({std::make_shared<DataTypeDateTime>()->createColumnConst(1, toField(lateness_bound)),
std::make_shared<DataTypeDateTime>(),
"____lateness_bound"}));
const auto & function_greater = FunctionFactory::instance().get("greaterOrEquals", context);
filter_expressions->add(ExpressionAction::applyFunction(
function_greater, Names{window_view.timestamp_column_name, "____lateness_bound"}, "____filter"));
pipe.addSimpleTransform(std::make_shared<FilterTransform>(pipe.getHeader(), filter_expressions, "____filter", true));
}
}
InterpreterSelectQuery select_block(window_view.getMergeableQuery(), context, {std::move(pipe)}, QueryProcessingStage::WithMergeableState);
source_stream = select_block.execute().getInputStream();
@ -1057,18 +1063,18 @@ void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, con
if (window_view.is_watermark_bounded || window_view.allowed_lateness)
{
UInt32 t_max_timestamp = 0;
UInt32 block_max_timestamp = 0;
if (window_view.is_watermark_bounded || window_view.allowed_lateness)
{
const auto & column_timestamp = block.getByName(window_view.timestamp_column_name).column;
const ColumnUInt32::Container & timestamp_data = static_cast<const ColumnUInt32 &>(*column_timestamp).getData();
for (const auto & timestamp : timestamp_data)
{
if (timestamp > t_max_timestamp)
t_max_timestamp = timestamp;
if (timestamp > block_max_timestamp)
block_max_timestamp = timestamp;
}
}
std::static_pointer_cast<WatermarkBlockInputStream>(source_stream)->setMaxTimestamp(t_max_timestamp);
std::static_pointer_cast<WatermarkBlockInputStream>(source_stream)->setMaxTimestamp(block_max_timestamp);
}
if (window_view.allowed_lateness && t_max_fired_watermark != 0)