diff --git a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h index 538b330fae5..2349f120a5b 100644 --- a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h +++ b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h @@ -22,7 +22,7 @@ namespace DB namespace ErrorCodes { - const extern int NOT_IMPLEMENTED; + extern const int NOT_IMPLEMENTED; } /** diff --git a/src/Storages/IMessageProducer.h b/src/Storages/IMessageProducer.h index 53744c396de..1ad09570c94 100644 --- a/src/Storages/IMessageProducer.h +++ b/src/Storages/IMessageProducer.h @@ -49,7 +49,7 @@ protected: virtual void finishImpl() {} virtual String getProducingTaskName() const = 0; - /// Method that is called inside producing task, all producing wokr should be done here. + /// Method that is called inside producing task, all producing work should be done here. virtual void producingTask() = 0; private: diff --git a/src/Storages/Kafka/KafkaProducer.cpp b/src/Storages/Kafka/KafkaProducer.cpp index 84066228bef..ef8f11f1386 100644 --- a/src/Storages/Kafka/KafkaProducer.cpp +++ b/src/Storages/Kafka/KafkaProducer.cpp @@ -20,7 +20,7 @@ static const auto BATCH = 1000; namespace ErrorCodes { - const extern int LOGICAL_ERROR; + extern const int LOGICAL_ERROR; } KafkaProducer::KafkaProducer( diff --git a/src/Storages/RabbitMQ/RabbitMQConsumer.cpp b/src/Storages/RabbitMQ/RabbitMQConsumer.cpp index c2a922b6baf..9b66b9b1d7c 100644 --- a/src/Storages/RabbitMQ/RabbitMQConsumer.cpp +++ b/src/Storages/RabbitMQ/RabbitMQConsumer.cpp @@ -37,7 +37,6 @@ RabbitMQConsumer::RabbitMQConsumer( } - void RabbitMQConsumer::closeChannel() { if (consumer_channel) diff --git a/src/Storages/RabbitMQ/RabbitMQProducer.cpp b/src/Storages/RabbitMQ/RabbitMQProducer.cpp index 358c2b23d82..54678434f7c 100644 --- a/src/Storages/RabbitMQ/RabbitMQProducer.cpp +++ b/src/Storages/RabbitMQ/RabbitMQProducer.cpp @@ -72,11 +72,10 @@ void RabbitMQProducer::finishImpl() connection.disconnect(); } - void RabbitMQProducer::produce(const String & message, size_t, const Columns &, size_t) { LOG_DEBUG(&Poco::Logger::get("RabbitMQProducer"), "push {}", message); - + Payload payload; payload.message = message; payload.id = ++payload_counter; @@ -84,7 +83,6 @@ void RabbitMQProducer::produce(const String & message, size_t, const Columns &, throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to payloads queue"); } - void RabbitMQProducer::setupChannel() { producer_channel = connection.createChannel(); @@ -136,7 +134,6 @@ void RabbitMQProducer::setupChannel() }); } - void RabbitMQProducer::removeRecord(UInt64 received_delivery_tag, bool multiple, bool republish) { auto record_iter = delivery_record.find(received_delivery_tag); @@ -165,7 +162,6 @@ void RabbitMQProducer::removeRecord(UInt64 received_delivery_tag, bool multiple, } } - void RabbitMQProducer::publish(Payloads & messages, bool republishing) { Payload payload; @@ -180,8 +176,6 @@ void RabbitMQProducer::publish(Payloads & messages, bool republishing) if (!pop_result) return; - LOG_DEBUG(&Poco::Logger::get("RabbitMQProducer"), "pop {}", payload.message); - AMQP::Envelope envelope(payload.message.data(), payload.message.size()); /// if headers exchange is used, routing keys are added here via headers, if not - it is just empty @@ -230,11 +224,8 @@ void RabbitMQProducer::publish(Payloads & messages, bool republishing) iterateEventLoop(); } - void RabbitMQProducer::producingTask() { - LOG_DEBUG(&Poco::Logger::get("RabbitMQProducer"), "start producingTask"); - while ((!payloads.isFinishedAndEmpty() || !returned.empty() || !delivery_record.empty()) && !shutdown_called.load()) { /// If onReady callback is not received, producer->usable() will anyway return true, @@ -259,8 +250,6 @@ void RabbitMQProducer::producingTask() } } - LOG_DEBUG(&Poco::Logger::get("RabbitMQProducer"), "finish producingTask"); - LOG_DEBUG(log, "Producer on channel {} completed", channel_id); }