Merge pull request #37853 from Vxider/fire-when-inserted

Fire and clean windows in WindowView only when data is inserted
This commit is contained in:
Kseniia Sumarokova 2022-06-07 11:24:50 +02:00 committed by GitHub
commit edc6b68801
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 70 additions and 58 deletions

View File

@@ -915,7 +915,7 @@ void StorageWindowView::addFireSignal(std::set<UInt32> & signals)
std::lock_guard lock(fire_signal_mutex);
for (const auto & signal : signals)
fire_signal.push_back(signal);
fire_signal_condition.notify_all();
fire_task->schedule();
}
void StorageWindowView::updateMaxTimestamp(UInt32 timestamp)
@@ -927,6 +927,12 @@ void StorageWindowView::updateMaxTimestamp(UInt32 timestamp)
void StorageWindowView::updateMaxWatermark(UInt32 watermark)
{
if (is_proctime)
{
max_watermark = watermark;
return;
}
std::lock_guard lock(fire_signal_mutex);
bool updated;
@@ -952,10 +958,10 @@ void StorageWindowView::updateMaxWatermark(UInt32 watermark)
}
if (updated)
fire_signal_condition.notify_all();
fire_task->schedule();
}
inline void StorageWindowView::cleanup()
void StorageWindowView::cleanup()
{
std::lock_guard fire_signal_lock(fire_signal_mutex);
std::lock_guard mutex_lock(mutex);
@@ -973,18 +979,21 @@ inline void StorageWindowView::cleanup()
void StorageWindowView::threadFuncCleanup()
{
try
{
if (!shutdown_called)
cleanup();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (shutdown_called)
return;
if (!shutdown_called)
clean_cache_task->scheduleAfter(clean_interval_ms);
if ((Poco::Timestamp().epochMicroseconds() - last_clean_timestamp_usec) > clean_interval_usec)
{
try
{
cleanup();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
last_clean_timestamp_usec = Poco::Timestamp().epochMicroseconds();
}
}
void StorageWindowView::threadFuncFireProc()
@@ -999,7 +1008,8 @@ void StorageWindowView::threadFuncFireProc()
{
try
{
fire(next_fire_signal);
if (max_watermark >= timestamp_now)
fire(next_fire_signal);
}
catch (...)
{
@@ -1013,38 +1023,35 @@ void StorageWindowView::threadFuncFireProc()
next_fire_signal += slide_interval;
}
if (max_watermark >= timestamp_now)
clean_cache_task->schedule();
UInt64 timestamp_ms = static_cast<UInt64>(Poco::Timestamp().epochMicroseconds()) / 1000;
if (!shutdown_called)
fire_task->scheduleAfter(std::max(
UInt64(0),
static_cast<UInt64>(next_fire_signal) * 1000 - timestamp_ms));
fire_task->scheduleAfter(std::max(UInt64(0), static_cast<UInt64>(next_fire_signal) * 1000 - timestamp_ms));
}
void StorageWindowView::threadFuncFireEvent()
{
std::unique_lock lock(fire_signal_mutex);
while (!shutdown_called)
LOG_TRACE(log, "Fire events: {}", fire_signal.size());
while (!shutdown_called && !fire_signal.empty())
{
bool signaled = std::cv_status::no_timeout == fire_signal_condition.wait_for(lock, std::chrono::seconds(fire_signal_timeout_s));
if (!signaled)
continue;
LOG_TRACE(log, "Fire events: {}", fire_signal.size());
while (!fire_signal.empty())
try
{
try
{
fire(fire_signal.front());
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
max_fired_watermark = fire_signal.front();
fire_signal.pop_front();
fire(fire_signal.front());
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
max_fired_watermark = fire_signal.front();
fire_signal.pop_front();
}
clean_cache_task->schedule();
}
// Pipe StorageWindowView::read(
@@ -1147,7 +1154,7 @@ StorageWindowView::StorageWindowView(
, WithContext(context_->getGlobalContext())
, log(&Poco::Logger::get(fmt::format("StorageWindowView({}.{})", table_id_.database_name, table_id_.table_name)))
, fire_signal_timeout_s(context_->getSettingsRef().wait_for_window_view_fire_signal_timeout.totalSeconds())
, clean_interval_ms(context_->getSettingsRef().window_view_clean_interval.totalMilliseconds())
, clean_interval_usec(context_->getSettingsRef().window_view_clean_interval.totalMicroseconds())
{
if (!query.select)
throw Exception(ErrorCodes::INCORRECT_QUERY, "SELECT query is not specified for {}", getName());
@@ -1512,24 +1519,24 @@ void StorageWindowView::writeIntoWindowView(
if (block_max_timestamp)
window_view.updateMaxTimestamp(block_max_timestamp);
UInt32 lateness_upper_bound = 0;
if (window_view.allowed_lateness && t_max_fired_watermark)
lateness_upper_bound = t_max_fired_watermark;
/// On each chunk check window end for each row in a window column, calculating max.
/// Update max watermark (latest seen window end) if needed.
/// If lateness is allowed, add lateness signals.
builder.addSimpleTransform([&](const Block & current_header)
{
return std::make_shared<WatermarkTransform>(
current_header,
window_view,
window_view.window_id_name,
lateness_upper_bound);
});
}
UInt32 lateness_upper_bound = 0;
if (!window_view.is_proctime && window_view.allowed_lateness && t_max_fired_watermark)
lateness_upper_bound = t_max_fired_watermark;
/// On each chunk check window end for each row in a window column, calculating max.
/// Update max watermark (latest seen window end) if needed.
/// If lateness is allowed, add lateness signals.
builder.addSimpleTransform([&](const Block & current_header)
{
return std::make_shared<WatermarkTransform>(
current_header,
window_view,
window_view.window_id_name,
lateness_upper_bound);
});
auto inner_table = window_view.getInnerTable();
auto lock = inner_table->lockForShare(
local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
@@ -1563,10 +1570,12 @@ void StorageWindowView::startup()
{
DatabaseCatalog::instance().addDependency(select_table_id, getStorageID());
// Start the working thread
fire_task->activateAndSchedule();
fire_task->activate();
clean_cache_task->activate();
clean_cache_task->scheduleAfter(clean_interval_ms);
/// Start the working thread
if (is_proctime)
fire_task->schedule();
}
void StorageWindowView::shutdown()
@@ -1574,7 +1583,6 @@ void StorageWindowView::shutdown()
shutdown_called = true;
fire_condition.notify_all();
fire_signal_condition.notify_all();
clean_cache_task->deactivate();
fire_task->deactivate();

View File

@@ -205,7 +205,8 @@ private:
mutable Block input_header;
mutable Block output_header;
UInt64 fire_signal_timeout_s;
UInt64 clean_interval_ms;
UInt64 clean_interval_usec;
UInt64 last_clean_timestamp_usec = 0;
const DateLUTImpl * time_zone = nullptr;
UInt32 max_timestamp = 0;
UInt32 max_watermark = 0; // next watermark to fire

View File

@@ -1,3 +1,4 @@
0
4 631234810
5 631234810
6 631234815

View File

@@ -27,6 +27,8 @@ while true; do
$CLICKHOUSE_CLIENT --query="SELECT count(*) FROM test_01086.\`.inner.wv\`" | grep -q "5" && break || sleep .5 ||:
done
$CLICKHOUSE_CLIENT --query="SELECT sleep(2);"
$CLICKHOUSE_CLIENT --query="INSERT INTO test_01086.mt VALUES (1, 6, toDateTime('1990/01/01 12:00:11', 'US/Samoa'));"
while true; do