ClickHouse/src/Storages/IMessageProducer.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

51 lines
1.3 KiB
C++
Raw Normal View History

#include <Storages/IMessageProducer.h>
#include <Common/logger_useful.h>
namespace DB
{
2023-02-13 12:15:58 +00:00
IMessageProducer::IMessageProducer(Poco::Logger * log_) : log(log_)
{
}
2022-12-15 19:47:10 +00:00
void AsynchronousMessageProducer::start(const ContextPtr & context)
{
2023-02-13 12:15:58 +00:00
LOG_TEST(log, "Executing startup");
initialize();
producing_task = context->getSchedulePool().createTask(getProducingTaskName(), [this]
{
2023-02-13 12:15:58 +00:00
LOG_TEST(log, "Starting producing task loop");
scheduled.store(true);
scheduled.notify_one();
2022-12-15 19:47:10 +00:00
startProducingTaskLoop();
});
producing_task->activateAndSchedule();
}
2022-12-15 19:47:10 +00:00
void AsynchronousMessageProducer::finish()
{
/// We should execute finish logic only once.
if (finished.exchange(true))
return;
2023-02-13 12:15:58 +00:00
LOG_TEST(log, "Executing shutdown");
/// It is possible that the task with a producer loop haven't been started yet
/// while we have non empty payloads queue.
/// If we deactivate it here, the messages will never be sent,
/// as the producer loop will never start.
scheduled.wait(false);
/// Tell the task that it should shutdown, but not immediately,
/// it will finish executing current tasks nevertheless.
stopProducingTask();
2023-02-13 12:15:58 +00:00
/// Wait for the producer task to finish.
producing_task->deactivate();
finishImpl();
}
}