This commit is contained in:
kssenii 2023-02-13 13:15:58 +01:00
parent 8265db80ff
commit 614d57d245
7 changed files with 41 additions and 10 deletions

View File

@ -4,11 +4,22 @@
namespace DB
{
IMessageProducer::IMessageProducer(Poco::Logger * log_) : log(log_)
{
}
void AsynchronousMessageProducer::start(const ContextPtr & context)
{
LOG_TEST(log, "Executing startup");
initialize();
producing_task = context->getSchedulePool().createTask(getProducingTaskName(), [this]
{
LOG_TEST(log, "Starting producing task loop");
scheduled.store(true);
scheduled.notify_one();
startProducingTaskLoop();
});
producing_task->activateAndSchedule();
@ -20,8 +31,17 @@ void AsynchronousMessageProducer::finish()
if (finished.exchange(true))
return;
LOG_TEST(log, "Executing shutdown");
/// It is possible that the task with a producer loop haven't been started yet
/// while we have non empty payloads queue.
/// If we deactivate it here, the messages will never be sent,
/// as the producer loop will never start.
scheduled.wait(false);
/// Tell the task that it should shutdown, but not immediately,
/// it will finish executing current tasks nevertheless.
stopProducingTask();
/// Deactivate producing task and wait until it's finished.
/// Wait for the producer task to finish.
producing_task->deactivate();
finishImpl();
}

View File

@ -6,6 +6,8 @@
#include <Interpreters/Context.h>
#include <Core/BackgroundSchedulePool.h>
namespace Poco { class Logger; }
namespace DB
{
@ -14,6 +16,8 @@ namespace DB
class IMessageProducer
{
public:
explicit IMessageProducer(Poco::Logger * log_);
/// Do some preparations.
virtual void start(const ContextPtr & context) = 0;
@ -24,12 +28,17 @@ public:
virtual void finish() = 0;
virtual ~IMessageProducer() = default;
protected:
Poco::Logger * log;
};
/// Implements interface for concurrent message producing.
class AsynchronousMessageProducer : public IMessageProducer
{
public:
explicit AsynchronousMessageProducer(Poco::Logger * log_) : IMessageProducer(log_) {}
/// Create and schedule task in BackgroundSchedulePool that will produce messages.
void start(const ContextPtr & context) override;
@ -58,6 +67,8 @@ private:
std::atomic<bool> finished = false;
BackgroundSchedulePool::TaskHolder producing_task;
std::atomic<bool> scheduled;
};

View File

@ -18,7 +18,11 @@ namespace DB
KafkaProducer::KafkaProducer(
ProducerPtr producer_, const std::string & topic_, std::chrono::milliseconds poll_timeout, std::atomic<bool> & shutdown_called_, const Block & header)
: producer(producer_), topic(topic_), timeout(poll_timeout), shutdown_called(shutdown_called_)
: IMessageProducer(&Poco::Logger::get("KafkaProducer"))
, producer(producer_)
, topic(topic_)
, timeout(poll_timeout)
, shutdown_called(shutdown_called_)
{
if (header.has("_key"))
{

View File

@ -24,11 +24,11 @@ NATSProducer::NATSProducer(
const String & subject_,
std::atomic<bool> & shutdown_called_,
Poco::Logger * log_)
: connection(configuration_, log_)
: AsynchronousMessageProducer(log_)
, connection(configuration_, log_)
, subject(subject_)
, shutdown_called(shutdown_called_)
, payloads(BATCH)
, log(log_)
{
}

View File

@ -50,8 +50,6 @@ private:
* - payloads are pushed to queue in countRow and popped by another thread in writingFunc, each payload gets into queue only once
*/
ConcurrentBoundedQueue<String> payloads;
Poco::Logger * log;
};
}

View File

@ -31,7 +31,8 @@ RabbitMQProducer::RabbitMQProducer(
const bool persistent_,
std::atomic<bool> & shutdown_called_,
Poco::Logger * log_)
: connection(configuration_, log_)
: AsynchronousMessageProducer(log_)
, connection(configuration_, log_)
, routing_keys(routing_keys_)
, exchange_name(exchange_name_)
, exchange_type(exchange_type_)
@ -40,7 +41,6 @@ RabbitMQProducer::RabbitMQProducer(
, shutdown_called(shutdown_called_)
, payloads(BATCH)
, returned(RETURNED_LIMIT)
, log(log_)
{
}

View File

@ -103,8 +103,6 @@ private:
/// Record of pending acknowledgements from the server; its size never exceeds size of returned.queue
std::map<UInt64, Payload> delivery_record;
Poco::Logger * log;
};
}