Fix style

This commit is contained in:
avogar 2022-10-28 17:09:08 +00:00
parent 8e13d1f1ec
commit d5f68e013d
5 changed files with 4 additions and 16 deletions

View File

@ -22,7 +22,7 @@ namespace DB
namespace ErrorCodes
{
const extern int NOT_IMPLEMENTED;
extern const int NOT_IMPLEMENTED;
}
/**

View File

@ -49,7 +49,7 @@ protected:
virtual void finishImpl() {}
virtual String getProducingTaskName() const = 0;
/// Method that is called inside producing task, all producing wokr should be done here.
/// Method that is called inside producing task, all producing work should be done here.
virtual void producingTask() = 0;
private:

View File

@ -20,7 +20,7 @@ static const auto BATCH = 1000;
namespace ErrorCodes
{
const extern int LOGICAL_ERROR;
extern const int LOGICAL_ERROR;
}
KafkaProducer::KafkaProducer(

View File

@ -37,7 +37,6 @@ RabbitMQConsumer::RabbitMQConsumer(
}
void RabbitMQConsumer::closeChannel()
{
if (consumer_channel)

View File

@ -72,11 +72,10 @@ void RabbitMQProducer::finishImpl()
connection.disconnect();
}
void RabbitMQProducer::produce(const String & message, size_t, const Columns &, size_t)
{
LOG_DEBUG(&Poco::Logger::get("RabbitMQProducer"), "push {}", message);
Payload payload;
payload.message = message;
payload.id = ++payload_counter;
@ -84,7 +83,6 @@ void RabbitMQProducer::produce(const String & message, size_t, const Columns &,
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to payloads queue");
}
void RabbitMQProducer::setupChannel()
{
producer_channel = connection.createChannel();
@ -136,7 +134,6 @@ void RabbitMQProducer::setupChannel()
});
}
void RabbitMQProducer::removeRecord(UInt64 received_delivery_tag, bool multiple, bool republish)
{
auto record_iter = delivery_record.find(received_delivery_tag);
@ -165,7 +162,6 @@ void RabbitMQProducer::removeRecord(UInt64 received_delivery_tag, bool multiple,
}
}
void RabbitMQProducer::publish(Payloads & messages, bool republishing)
{
Payload payload;
@ -180,8 +176,6 @@ void RabbitMQProducer::publish(Payloads & messages, bool republishing)
if (!pop_result)
return;
LOG_DEBUG(&Poco::Logger::get("RabbitMQProducer"), "pop {}", payload.message);
AMQP::Envelope envelope(payload.message.data(), payload.message.size());
/// if headers exchange is used, routing keys are added here via headers, if not - it is just empty
@ -230,11 +224,8 @@ void RabbitMQProducer::publish(Payloads & messages, bool republishing)
iterateEventLoop();
}
void RabbitMQProducer::producingTask()
{
LOG_DEBUG(&Poco::Logger::get("RabbitMQProducer"), "start producingTask");
while ((!payloads.isFinishedAndEmpty() || !returned.empty() || !delivery_record.empty()) && !shutdown_called.load())
{
/// If onReady callback is not received, producer->usable() will anyway return true,
@ -259,8 +250,6 @@ void RabbitMQProducer::producingTask()
}
}
LOG_DEBUG(&Poco::Logger::get("RabbitMQProducer"), "finish producingTask");
LOG_DEBUG(log, "Producer on channel {} completed", channel_id);
}