Add debug logging

This commit is contained in:
kssenii 2024-08-05 16:40:54 +02:00
parent ba3099015f
commit e9506202d6

View File

@ -1055,6 +1055,8 @@ void StorageWindowView::threadFuncFireProc()
/// TODO: consider using time_t instead (for every timestamp in this class) /// TODO: consider using time_t instead (for every timestamp in this class)
UInt32 timestamp_now = now(); UInt32 timestamp_now = now();
LOG_TRACE(log, "Now: {}, next fire signal: {}, max watermark: {}", timestamp_now, next_fire_signal, max_watermark);
while (next_fire_signal <= timestamp_now) while (next_fire_signal <= timestamp_now)
{ {
try try
@ -1072,6 +1074,9 @@ void StorageWindowView::threadFuncFireProc()
if (slide_kind > IntervalKind::Kind::Day) if (slide_kind > IntervalKind::Kind::Day)
slide_interval *= 86400; slide_interval *= 86400;
next_fire_signal += slide_interval; next_fire_signal += slide_interval;
LOG_TRACE(log, "Now: {}, next fire signal: {}, max watermark: {}, max fired watermark: {}, slide interval: {}",
timestamp_now, next_fire_signal, max_watermark, max_fired_watermark, slide_interval);
} }
if (max_watermark >= timestamp_now) if (max_watermark >= timestamp_now)
@ -1433,16 +1438,19 @@ void StorageWindowView::writeIntoWindowView(
while (window_view.modifying_query) while (window_view.modifying_query)
std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (!window_view.is_proctime && window_view.max_watermark == 0 && block.rows() > 0) const size_t block_rows = block.rows();
if (!window_view.is_proctime && window_view.max_watermark == 0 && block_rows > 0)
{ {
std::lock_guard lock(window_view.fire_signal_mutex); std::lock_guard lock(window_view.fire_signal_mutex);
const auto & window_column = block.getByName(window_view.timestamp_column_name); const auto & window_column = block.getByName(window_view.timestamp_column_name);
const ColumnUInt32::Container & window_end_data = static_cast<const ColumnUInt32 &>(*window_column.column).getData(); const ColumnUInt32::Container & window_end_data = static_cast<const ColumnUInt32 &>(*window_column.column).getData();
UInt32 first_record_timestamp = window_end_data[0]; UInt32 first_record_timestamp = window_end_data[0];
window_view.max_watermark = window_view.getWindowUpperBound(first_record_timestamp); window_view.max_watermark = window_view.getWindowUpperBound(first_record_timestamp);
LOG_TRACE(window_view.log, "New max watermark: {}", window_view.max_watermark);
} }
Pipe pipe(std::make_shared<SourceFromSingleChunk>(block)); Pipe pipe(std::make_shared<SourceFromSingleChunk>(std::move(block)));
UInt32 lateness_bound = 0; UInt32 lateness_bound = 0;
UInt32 t_max_watermark = 0; UInt32 t_max_watermark = 0;
@ -1649,6 +1657,8 @@ void StorageWindowView::writeIntoWindowView(
auto executor = builder.execute(); auto executor = builder.execute();
executor->execute(builder.getNumThreads(), local_context->getSettingsRef().use_concurrency_control); executor->execute(builder.getNumThreads(), local_context->getSettingsRef().use_concurrency_control);
LOG_TRACE(window_view.log, "Wrote {} rows into inner table ({})", block_rows, inner_table->getStorageID().getFullTableName());
} }
void StorageWindowView::startup() void StorageWindowView::startup()