Merge branch 'nats-integration' of https://github.com/tchepavel/ClickHouse into nats-integration

Chebarykov Pavel 2022-06-02 09:13:27 +00:00
commit 912e88c7fc
8 changed files with 41 additions and 52 deletions

src/Storages/NATS/NATSHandler.cpp

@@ -9,6 +9,9 @@ namespace DB
/* The object of this class is shared between concurrent consumers (who share the same connection == share the same
* event loop and handler).
*/
static const auto MAX_THREAD_WORK_DURATION_MS = 60000;
NATSHandler::NATSHandler(uv_loop_t * loop_, Poco::Logger * log_) :
loop(loop_),
log(log_),
@@ -23,6 +26,8 @@ NATSHandler::NATSHandler(uv_loop_t * loop_, Poco::Logger * log_) :
natsLibuv_Read,
natsLibuv_Write,
natsLibuv_Detach);
natsOptions_SetIOBufSize(opts, INT_MAX);
natsOptions_SetSendAsap(opts, true);
}
void NATSHandler::startLoop()
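Both new options target latency: natsOptions_SetSendAsap(opts, true) makes nats.c send each published message as soon as possible instead of buffering it, and natsOptions_SetIOBufSize raises the socket I/O buffer size (here to INT_MAX). Both setters return a natsStatus that the diff ignores; a defensive variant, sketched here outside the commit, would check it:

#include <climits>
#include <nats/nats.h>

/// Apply the same two options as the constructor above, but surface failures.
natsStatus configureOptions(natsOptions * opts)
{
    natsStatus status = natsOptions_SetIOBufSize(opts, INT_MAX);
    if (status == NATS_OK)
        status = natsOptions_SetSendAsap(opts, true);
    return status; /// NATS_OK on success
}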
@@ -32,10 +37,15 @@ void NATSHandler::startLoop()
LOG_DEBUG(log, "Background loop started");
loop_running.store(true);
auto start_time = std::chrono::steady_clock::now();
auto end_time = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
while (loop_state.load() == Loop::RUN)
while (loop_state.load() == Loop::RUN && duration.count() < MAX_THREAD_WORK_DURATION_MS)
{
uv_run(loop, UV_RUN_NOWAIT);
end_time = std::chrono::steady_clock::now();
duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
}
LOG_DEBUG(log, "Background loop ended");
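The loop is now time-sliced: each call to startLoop() runs for at most MAX_THREAD_WORK_DURATION_MS (60 s) and then returns, and StorageNATS::loopingFunc (in StorageNATS.cpp below) immediately reschedules itself, so the event loop no longer pins a BackgroundSchedulePool thread indefinitely. A minimal standalone sketch of the pattern, with illustrative names not taken from the commit:

#include <atomic>
#include <chrono>
#include <uv.h>

static constexpr auto MAX_SLICE = std::chrono::milliseconds(60000);

/// One time slice of the event loop; the caller reschedules this function,
/// mirroring how loopingFunc() re-arms looping_task after startLoop() returns.
void runLoopSlice(uv_loop_t * loop, const std::atomic<bool> & keep_running)
{
    const auto deadline = std::chrono::steady_clock::now() + MAX_SLICE;
    while (keep_running.load() && std::chrono::steady_clock::now() < deadline)
        uv_run(loop, UV_RUN_NOWAIT); /// non-blocking poll; returns immediately when idle
}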

src/Storages/NATS/NATSSource.cpp

@@ -62,6 +62,7 @@ NATSSource::~NATSSource()
if (!buffer)
return;
buffer->allowNext();
storage.pushReadBuffer(buffer);
}
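The destructor now re-arms the buffer (allowNext()) and hands it back to the storage via pushReadBuffer(), so consumer buffers are pooled across sources rather than discarded with one source. A minimal sketch of such a pool, assuming it behaves like a bounded blocking queue (the actual pushReadBuffer/popReadBuffer are members of StorageNATS):

#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>

template <typename Buffer>
class BufferPool
{
public:
    void push(std::shared_ptr<Buffer> buffer)
    {
        std::lock_guard lock(mutex);
        buffers.push(std::move(buffer));
        cv.notify_one();
    }

    /// Returns nullptr on timeout, like popReadBuffer(timeout) can.
    std::shared_ptr<Buffer> pop(std::chrono::milliseconds timeout)
    {
        std::unique_lock lock(mutex);
        if (!cv.wait_for(lock, timeout, [this] { return !buffers.empty(); }))
            return nullptr;
        auto buffer = std::move(buffers.front());
        buffers.pop();
        return buffer;
    }

private:
    std::mutex mutex;
    std::condition_variable cv;
    std::queue<std::shared_ptr<Buffer>> buffers;
};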

src/Storages/NATS/ReadBufferFromNATSConsumer.cpp

@@ -19,6 +19,7 @@ namespace ErrorCodes
ReadBufferFromNATSConsumer::ReadBufferFromNATSConsumer(
std::shared_ptr<NATSConnectionManager> connection_,
StorageNATS & storage_,
std::vector<String> & subjects_,
const String & subscribe_queue_name,
Poco::Logger * log_,
@@ -27,6 +28,7 @@ ReadBufferFromNATSConsumer::ReadBufferFromNATSConsumer(
const std::atomic<bool> & stopped_)
: ReadBuffer(nullptr, 0)
, connection(connection_)
, storage(storage_)
, subjects(subjects_)
, log(log_)
, row_delimiter(row_delimiter_)
@@ -49,6 +51,7 @@ void ReadBufferFromNATSConsumer::subscribe()
if (status == NATS_OK)
{
LOG_DEBUG(log, "Subscribed to subject {}", subject);
natsSubscription_SetPendingLimits(ns, -1, -1);
subscriptions.emplace_back(ns, &natsSubscription_Destroy);
}
else
@@ -100,6 +103,8 @@ void ReadBufferFromNATSConsumer::onMsg(natsConnection *, natsSubscription *, natsMsg * msg, void * consumer)
};
if (!buffer->received.push(std::move(data)))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to received queue");
buffer->storage.startStreaming();
}
natsMsg_Destroy(msg);
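Two consumer-side changes here: natsSubscription_SetPendingLimits(ns, -1, -1) lifts nats.c's default cap on undelivered messages (a negative value means no limit, for both message count and bytes), and onMsg() now calls storage.startStreaming() so streaming is kicked off by message arrival instead of a timer. A self-contained sketch of that wakeup pattern, with hypothetical names standing in for the real queue and schedule-pool task:

#include <atomic>
#include <functional>
#include <mutex>
#include <queue>
#include <string>

struct StreamingWakeup
{
    std::mutex mutex;
    std::queue<std::string> received;
    std::atomic<bool> mv_attached{false};
    std::function<void()> schedule_streaming; /// stands in for streaming_task->activateAndSchedule()

    /// Called from the message callback: enqueue, then wake the streaming
    /// task unless a materialized view is already consuming.
    void onMessage(std::string payload)
    {
        {
            std::lock_guard lock(mutex);
            received.push(std::move(payload));
        }
        if (!mv_attached.load())
            schedule_streaming();
    }
};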

src/Storages/NATS/ReadBufferFromNATSConsumer.h

@@ -6,6 +6,7 @@
#include <Storages/NATS/NATSConnection.h>
#include <base/types.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Storages/NATS/StorageNATS.h>
namespace Poco
{
@@ -20,6 +21,7 @@ class ReadBufferFromNATSConsumer : public ReadBuffer
public:
ReadBufferFromNATSConsumer(
std::shared_ptr<NATSConnectionManager> connection_,
StorageNATS & storage_,
std::vector<String> & subjects_,
const String & subscribe_queue_name,
Poco::Logger * log_,
@@ -41,6 +43,7 @@ public:
bool isConsumerStopped() { return stopped; }
bool queueEmpty() { return received.empty(); }
size_t queueSize() { return received.size(); }
void allowNext() { allowed = true; } // Allow to read next message.
auto getSubject() const { return current.subject; }
@@ -51,6 +54,7 @@ private:
static void onMsg(natsConnection * nc, natsSubscription * sub, natsMsg * msg, void * consumer);
std::shared_ptr<NATSConnectionManager> connection;
StorageNATS & storage;
std::vector<SubscriptionPtr> subscriptions;
std::vector<String> subjects;
Poco::Logger * log;
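A note on the new include: StorageNATS.h is pulled in above only because the class now holds a StorageNATS & member, which risks a header-level cycle if StorageNATS.h in turn needs the consumer type. Since a reference member does not require a complete type, a forward declaration would also compile here, with the include moved to the .cpp, where startStreaming() is actually called. A generic sketch of that idiom (Engine/Widget are placeholder names, not from the commit):

// widget.h -- a forward declaration is enough for a reference member.
class Engine;

class Widget
{
public:
    explicit Widget(Engine & engine_) : engine(engine_) {}
    void notify(); /// defined in widget.cpp, which includes engine.h
private:
    Engine & engine; /// reference members don't require a complete type
};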

src/Storages/NATS/StorageNATS.cpp

@@ -29,7 +29,6 @@ namespace DB
static const uint32_t QUEUE_SIZE = 100000;
static const auto RESCHEDULE_MS = 500;
static const auto BACKOFF_TRESHOLD = 8000;
static const auto MAX_THREAD_WORK_DURATION_MS = 60000;
namespace ErrorCodes
@@ -59,7 +58,6 @@ StorageNATS::StorageNATS(
, log(&Poco::Logger::get("StorageNATS (" + table_id_.table_name + ")"))
, semaphore(0, num_consumers)
, queue_size(std::max(QUEUE_SIZE, static_cast<uint32_t>(getMaxBlockSize())))
, milliseconds_to_wait(RESCHEDULE_MS)
, is_attach(is_attach_)
{
auto nats_username = getContext()->getMacros()->expand(nats_settings->nats_username);
@@ -162,6 +160,7 @@ ContextMutablePtr StorageNATS::addSettings(ContextPtr local_context) const
void StorageNATS::loopingFunc()
{
connection->getHandler().startLoop();
looping_task->activateAndSchedule();
}
@@ -244,22 +243,14 @@ bool StorageNATS::initBuffers()
/* Need to deactivate this way because otherwise might get a deadlock when first deactivate streaming task in shutdown and then
* inside streaming task try to deactivate any other task
*/
void StorageNATS::deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop)
void StorageNATS::deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool stop_loop)
{
if (stop_loop)
stopLoop();
std::unique_lock<std::mutex> lock(task_mutex, std::defer_lock);
if (lock.try_lock())
{
task->deactivate();
lock.unlock();
}
else if (wait) /// Wait only if deactivating from shutdown
{
lock.lock();
task->deactivate();
}
lock.lock();
task->deactivate();
}
@@ -299,8 +290,6 @@ Pipe StorageNATS::read(
if (!connection->isConnected())
{
if (connection->getHandler().loopRunning())
deactivateTask(looping_task, false, true);
if (!connection->reconnect())
throw Exception(ErrorCodes::CANNOT_CONNECT_NATS, "No connection to {}", connection->connectionInfoForLog());
}
@@ -385,8 +374,6 @@ void StorageNATS::startup()
if (!connection->isConnected() || !initBuffers())
connection_task->activateAndSchedule();
streaming_task->activateAndSchedule();
}
@@ -395,12 +382,12 @@ void StorageNATS::shutdown()
shutdown_called = true;
/// In case it has not yet been able to setup connection;
deactivateTask(connection_task, true, false);
deactivateTask(connection_task, false);
/// The order of deactivating tasks is important: wait for streamingToViews() func to finish and
/// then wait for background event loop to finish.
deactivateTask(streaming_task, true, false);
deactivateTask(looping_task, true, true);
deactivateTask(streaming_task, false);
deactivateTask(looping_task, true);
/// Just a paranoid try catch, it is not actually needed.
try
@@ -411,10 +398,6 @@ void StorageNATS::shutdown()
buffer->unsubscribe();
}
/// It is important to close connection here - before removing consumer buffers, because
/// it will finish and clean callbacks, which might use those buffers data.
if (connection->getHandler().loopRunning())
stopLoop();
connection->disconnect();
for (size_t i = 0; i < num_created_consumers; ++i)
@@ -463,7 +446,7 @@ ConsumerBufferPtr StorageNATS::popReadBuffer(std::chrono::milliseconds timeout)
ConsumerBufferPtr StorageNATS::createReadBuffer()
{
return std::make_shared<ReadBufferFromNATSConsumer>(
connection, subjects,
connection, *this, subjects,
nats_settings->nats_queue_group.changed ? nats_settings->nats_queue_group.value : getStorageID().getFullTableName(),
log, row_delimiter, queue_size, shutdown_called);
}
@@ -548,6 +531,7 @@ bool StorageNATS::checkDependencies(const StorageID & table_id)
void StorageNATS::streamingToViewsFunc()
{
bool do_reschedule = true;
try
{
auto table_id = getStorageID();
@@ -573,21 +557,14 @@ void StorageNATS::streamingToViewsFunc()
if (streamToViews())
{
/// Reschedule with backoff.
if (milliseconds_to_wait < BACKOFF_TRESHOLD)
milliseconds_to_wait *= 2;
stopLoopIfNoReaders();
do_reschedule = false;
break;
}
else
{
milliseconds_to_wait = RESCHEDULE_MS;
}
auto end_time = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
{
stopLoopIfNoReaders();
LOG_TRACE(log, "Reschedule streaming. Thread work duration limit exceeded.");
break;
}
@@ -601,13 +578,8 @@ void StorageNATS::streamingToViewsFunc()
mv_attached.store(false);
/// If there is no running select, stop the loop which was
/// activated by previous select.
if (connection->getHandler().loopRunning())
stopLoopIfNoReaders();
if (!shutdown_called)
streaming_task->scheduleAfter(milliseconds_to_wait);
if (!shutdown_called && do_reschedule)
streaming_task->scheduleAfter(RESCHEDULE_MS);
}
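The exponential backoff (milliseconds_to_wait doubling up to BACKOFF_TRESHOLD) is replaced by event-driven scheduling: assuming, as the old backoff logic suggests, that streamToViews() returns true when there was nothing to stream, the task now simply stops rescheduling itself (do_reschedule = false) and waits for onMsg() -> startStreaming() to wake it, re-arming after a fixed RESCHEDULE_MS only while data keeps flowing. A condensed sketch of one scheduling iteration, with stand-in callables:

#include <atomic>
#include <functional>

static constexpr int RESCHEDULE_MS = 500;

/// One iteration of the streaming task's scheduling decision.
/// `schedule_after` stands in for streaming_task->scheduleAfter(ms).
void streamingIteration(
    const std::function<bool()> & stream_to_views, /// true => idle, nothing streamed
    const std::function<void(int)> & schedule_after,
    const std::atomic<bool> & shutdown_called)
{
    bool do_reschedule = true;
    if (stream_to_views())
        do_reschedule = false; /// idle: wait for startStreaming() to re-arm the task
    if (!shutdown_called && do_reschedule)
        schedule_after(RESCHEDULE_MS); /// data pending: poll again at a fixed interval
}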
@@ -640,6 +612,7 @@ bool StorageNATS::streamToViews()
for (size_t i = 0; i < num_created_consumers; ++i)
{
LOG_DEBUG(log, "Current queue size: {}", buffers[0]->queueSize());
auto source = std::make_shared<NATSSource>(*this, storage_snapshot, nats_context, column_names, block_size);
sources.emplace_back(source);
pipes.emplace_back(source);
@@ -666,10 +639,6 @@ bool StorageNATS::streamToViews()
executor.execute();
}
/* Note: sending ack() with loop running in another thread will lead to a lot of data races inside the library, but only in case
* error occurs or connection is lost while ack is being sent
*/
deactivateTask(looping_task, false, true);
size_t queue_empty = 0;
if (!connection->isConnected())

src/Storages/NATS/StorageNATS.h

@@ -60,6 +60,8 @@ public:
void incrementReader();
void decrementReader();
void startStreaming() { if (!mv_attached) { streaming_task->activateAndSchedule(); } }
private:
ContextMutablePtr nats_context;
std::unique_ptr<NATSSettings> nats_settings;
@@ -90,8 +92,6 @@ private:
BackgroundSchedulePool::TaskHolder looping_task;
BackgroundSchedulePool::TaskHolder connection_task;
uint64_t milliseconds_to_wait;
/// True if consumers have subscribed to all subjects
std::atomic<bool> consumers_ready{false};
/// Needed for tell MV or producer background tasks
@@ -135,7 +135,7 @@ private:
ContextMutablePtr addSettings(ContextPtr context) const;
size_t getMaxBlockSize() const;
void deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop);
void deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool stop_loop);
bool streamToViews();
bool checkDependencies(const StorageID & table_id);

src/Storages/NATS/WriteBufferToNATSProducer.cpp

@@ -15,6 +15,7 @@ namespace DB
{
static const auto BATCH = 1000;
static const auto MAX_BUFFERED = 131072;
namespace ErrorCodes
{
@@ -54,7 +55,6 @@ WriteBufferToNATSProducer::WriteBufferToNATSProducer(
WriteBufferToNATSProducer::~WriteBufferToNATSProducer()
{
writing_task->deactivate();
connection.disconnect();
assert(rows == 0);
}
@@ -79,7 +79,6 @@ void WriteBufferToNATSProducer::countRow()
reinitializeChunks();
++payload_counter;
if (!payloads.push(payload))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to payloads queue");
}
@@ -97,12 +96,14 @@ void WriteBufferToNATSProducer::publishThreadFunc(void * arg)
void WriteBufferToNATSProducer::publishThreadFunc(void * arg)
{
String payload;
WriteBufferToNATSProducer * buffer = static_cast<WriteBufferToNATSProducer *>(arg);
String payload;
natsStatus status;
while (!buffer->payloads.empty())
{
if (natsConnection_Buffered(buffer->connection.getConnection()) > MAX_BUFFERED)
break;
bool pop_result = buffer->payloads.pop(payload);
if (!pop_result)

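The publish loop above gained flow control: natsConnection_Buffered() reports how many bytes sit in nats.c's outgoing buffer, and publishing pauses once that exceeds MAX_BUFFERED (131072 bytes, i.e. 128 KiB), so a slow server cannot make the buffer grow without bound. A minimal sketch of the same loop shape, with error handling reduced to the essentials:

#include <queue>
#include <string>
#include <nats/nats.h>

static const int MAX_BUFFERED = 131072;

void publishPending(natsConnection * nc, const char * subject, std::queue<std::string> & payloads)
{
    while (!payloads.empty())
    {
        if (natsConnection_Buffered(nc) > MAX_BUFFERED)
            break; /// backpressure: let the socket drain before queueing more

        const std::string & payload = payloads.front();
        natsStatus status = natsConnection_Publish(nc, subject, payload.data(), static_cast<int>(payload.size()));
        if (status != NATS_OK)
            break; /// the real writing task logs and retries on its next run
        payloads.pop();
    }
}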
src/Storages/NATS/WriteBufferToNATSProducer.h

@@ -69,7 +69,6 @@ private:
* true: means payloads.queue will not grow anymore
*/
std::atomic<bool> wait_payloads = false;
UInt64 payload_counter = 0;
Poco::Logger * log;
const std::optional<char> delim;