From 4b69684cd2da6d3ca5bb5ced5e4fba5250b7b7f6 Mon Sep 17 00:00:00 2001
From: Azat Khuzhin <a3at.mail@gmail.com>
Date: Wed, 14 Apr 2021 23:15:31 +0300
Subject: [PATCH] Add logging about pushing to underlying in StorageKafka

---
 src/Storages/Kafka/StorageKafka.cpp | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp
index e2b5ce7c325..15dd5b553b0 100644
--- a/src/Storages/Kafka/StorageKafka.cpp
+++ b/src/Storages/Kafka/StorageKafka.cpp
@@ -28,6 +28,7 @@
 #include <Common/Exception.h>
 #include <Common/Macros.h>
 #include <Common/config_version.h>
+#include <Common/formatReadable.h>
 #include <Common/setThreadName.h>
 #include <Common/typeid_cast.h>
 #include <common/logger_useful.h>
@@ -585,6 +586,8 @@ void StorageKafka::threadFunc(size_t idx)
 
 bool StorageKafka::streamToViews()
 {
+    Stopwatch watch;
+
     auto table_id = getStorageID();
     auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
     if (!table)
@@ -637,7 +640,11 @@ bool StorageKafka::streamToViews()
     // We can't cancel during copyData, as it's not aware of commits and other kafka-related stuff.
     // It will be cancelled on underlying layer (kafka buffer)
     std::atomic<bool> stub = {false};
-    copyData(*in, *block_io.out, &stub);
+    size_t rows = 0;
+    copyData(*in, *block_io.out, [&rows](const Block & block)
+    {
+        rows += block.rows();
+    }, &stub);
 
     bool some_stream_is_stalled = false;
     for (auto & stream : streams)
@@ -646,6 +653,10 @@ bool StorageKafka::streamToViews()
         stream->as<KafkaBlockInputStream>()->commit();
     }
 
+    UInt64 milliseconds = watch.elapsedMilliseconds();
+    LOG_DEBUG(log, "Pushing {} rows to {} took {} ms.",
+        formatReadableQuantity(rows), table_id.getNameForLogs(), milliseconds);
+
     return some_stream_is_stalled;
 }