From e411d7f50fdf8920d25e95daaab65400451165e6 Mon Sep 17 00:00:00 2001 From: Ilya Golshtein Date: Mon, 26 Aug 2024 13:55:18 +0000 Subject: [PATCH] kafka_dead_letter_queue: debug logging removed --- src/Storages/Kafka/KafkaSource.cpp | 8 -------- src/Storages/Kafka/StorageKafka2.cpp | 4 ---- src/Storages/Kafka/StorageKafkaUtils.cpp | 11 ----------- 3 files changed, 23 deletions(-) diff --git a/src/Storages/Kafka/KafkaSource.cpp b/src/Storages/Kafka/KafkaSource.cpp index 69696a04d35..071f901bb5e 100644 --- a/src/Storages/Kafka/KafkaSource.cpp +++ b/src/Storages/Kafka/KafkaSource.cpp @@ -49,7 +49,6 @@ KafkaSource::KafkaSource( , virtual_header(storage.getVirtualsHeader()) , handle_error_mode(storage.getHandleErrorMode()) { - LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "ctor"); } KafkaSource::~KafkaSource() @@ -78,7 +77,6 @@ bool KafkaSource::checkTimeLimit() const Chunk KafkaSource::generateImpl() { - LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "actual top"); if (!consumer) { auto timeout = std::chrono::milliseconds(context->getSettingsRef().kafka_max_wait_ms.totalMilliseconds()); @@ -95,7 +93,6 @@ Chunk KafkaSource::generateImpl() if (is_finished) return {}; - LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: top"); is_finished = true; // now it's one-time usage InputStream // one block of the needed size (or with desired flush timeout) is formed in one internal iteration @@ -149,7 +146,6 @@ Chunk KafkaSource::generateImpl() while (true) { - LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: top of while"); size_t new_rows = 0; exception_message.reset(); if (auto buf = consumer->consume()) @@ -169,7 +165,6 @@ Chunk KafkaSource::generateImpl() ProfileEvents::increment(ProfileEvents::KafkaRowsRead, new_rows); - LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: new_rows"); consumer->storeLastReadMessageOffset(); auto topic = consumer->currentTopic(); @@ -195,7 +190,6 @@ Chunk KafkaSource::generateImpl() for (size_t i = 0; i < new_rows; ++i) { - LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: top of for"); virtual_columns[0]->insert(topic); virtual_columns[1]->insert(key); virtual_columns[2]->insert(offset); @@ -229,7 +223,6 @@ Chunk KafkaSource::generateImpl() } else if (handle_error_mode == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE) { - LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: DEAD_LETTER_QUEUE"); if (exception_message) { @@ -237,7 +230,6 @@ Chunk KafkaSource::generateImpl() auto storage_id = storage.getStorageID(); auto dead_letter_queue = context->getDeadLetterQueue(); - LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: calling dead_letter_queue->add"); dead_letter_queue->add( DeadLetterQueueElement{ .stream_type = DeadLetterQueueElement::StreamType::Kafka, diff --git a/src/Storages/Kafka/StorageKafka2.cpp b/src/Storages/Kafka/StorageKafka2.cpp index e918158d7c4..cf3e1ac65d7 100644 --- a/src/Storages/Kafka/StorageKafka2.cpp +++ b/src/Storages/Kafka/StorageKafka2.cpp @@ -927,8 +927,6 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer( virtual_columns[6]->insert(headers_names); virtual_columns[7]->insert(headers_values); - LOG_DEBUG(&Poco::Logger::get("StorageKafka2"), "pollConsumer"); - if (getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::STREAM) { if (exception_message) @@ -944,14 +942,12 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer( } else if (getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE) { - LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: DEAD_LETTER_QUEUE"); if (exception_message) { const auto time_now = std::chrono::system_clock::now(); auto dead_letter_queue = getContext()->getDeadLetterQueue(); - LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: calling dead_letter_queue->add"); dead_letter_queue->add( DeadLetterQueueElement{ .stream_type = DeadLetterQueueElement::StreamType::Kafka, diff --git a/src/Storages/Kafka/StorageKafkaUtils.cpp b/src/Storages/Kafka/StorageKafkaUtils.cpp index 27b59841c29..327d812e843 100644 --- a/src/Storages/Kafka/StorageKafkaUtils.cpp +++ b/src/Storages/Kafka/StorageKafkaUtils.cpp @@ -59,8 +59,6 @@ namespace ErrorCodes void registerStorageKafka(StorageFactory & factory) { - LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "Top of registerStorageKafka"); - auto creator_fn = [](const StorageFactory::Arguments & args) -> std::shared_ptr { ASTs & engine_args = args.engine_args; @@ -74,9 +72,6 @@ void registerStorageKafka(StorageFactory & factory) for (const auto & setting : kafka_settings->all()) { const auto & setting_name = setting.getName(); - LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka (named collection): processing {}", setting_name); - - if (named_collection->has(setting_name)) kafka_settings->set(setting_name, named_collection->get(setting_name)); } @@ -85,9 +80,7 @@ void registerStorageKafka(StorageFactory & factory) if (has_settings) { - LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka: before loadFromQuery"); kafka_settings->loadFromQuery(*args.storage_def); - LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka: after loadFromQuery"); } // Check arguments and settings @@ -161,9 +154,7 @@ void registerStorageKafka(StorageFactory & factory) CHECK_KAFKA_STORAGE_ARGUMENT(12, kafka_poll_timeout_ms, 0) CHECK_KAFKA_STORAGE_ARGUMENT(13, kafka_flush_interval_ms, 0) CHECK_KAFKA_STORAGE_ARGUMENT(14, kafka_thread_per_consumer, 0) - LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka: before kafka_handle_error_mode CHECK_KAFKA_STORAGE_ARGUMENT"); CHECK_KAFKA_STORAGE_ARGUMENT(15, kafka_handle_error_mode, 0) - LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka: after kafka_handle_error_mode CHECK_KAFKA_STORAGE_ARGUMENT"); CHECK_KAFKA_STORAGE_ARGUMENT(16, kafka_commit_on_select, 0) CHECK_KAFKA_STORAGE_ARGUMENT(17, kafka_max_rows_per_message, 0) } @@ -291,8 +282,6 @@ void registerStorageKafka(StorageFactory & factory) args.table_id, args.getContext(), args.columns, args.comment, std::move(kafka_settings), collection_name); }; - - factory.registerStorage( "Kafka", creator_fn,