Copy Rabbit to become NATS

This commit is contained in:
tchepavel 2022-05-08 15:12:15 +03:00
parent 59bfcd917b
commit 944a729fc5
22 changed files with 2934 additions and 0 deletions

3
.gitmodules vendored
View File

@ -265,3 +265,6 @@
[submodule "contrib/wyhash"]
path = contrib/wyhash
url = https://github.com/wangyi-fudan/wyhash.git
[submodule "contrib/nats-io"]
path = contrib/nats-io
url = https://github.com/nats-io/nats.c.git

View File

@ -132,6 +132,7 @@ add_contrib (krb5-cmake krb5)
add_contrib (cyrus-sasl-cmake cyrus-sasl) # for krb5
add_contrib (libgsasl-cmake libgsasl) # requires krb5
add_contrib (librdkafka-cmake librdkafka) # requires: libgsasl
add_contrib (nats-io-cmake nats-io)
add_contrib (libhdfs3-cmake libhdfs3) # requires: protobuf, krb5
add_contrib (hive-metastore-cmake hive-metastore) # requires: thrift/avro/arrow/libhdfs3
add_contrib (cppkafka-cmake cppkafka)

1
contrib/nats-io vendored Submodule

@ -0,0 +1 @@
Subproject commit d1f59f7bcf8465526f7e6d9c99982cbd6b209547

View File

@ -107,6 +107,8 @@ if (TARGET ch_contrib::rdkafka)
add_headers_and_sources(dbms Storages/Kafka)
endif()
# Compile the NATS storage only when the nats-io contrib target exists,
# mirroring the guards used for the Kafka and RabbitMQ storages.
if (TARGET ch_contrib::nats-io)
    add_headers_and_sources(dbms Storages/NATS)
endif()
if (TARGET ch_contrib::amqp_cpp)
add_headers_and_sources(dbms Storages/RabbitMQ)
endif()
@ -380,6 +382,8 @@ if (TARGET ch_contrib::rdkafka)
dbms_target_link_libraries(PRIVATE ch_contrib::rdkafka ch_contrib::cppkafka)
endif()
# Link against nats-io only when the contrib target was actually configured.
if (TARGET ch_contrib::nats-io)
    dbms_target_link_libraries(PRIVATE ch_contrib::nats-io)
endif()
if (TARGET ch_contrib::sasl2)
dbms_target_link_libraries(PRIVATE ch_contrib::sasl2)
endif()

View File

@ -0,0 +1,14 @@
#pragma once
#include <memory>
namespace DB
{

/// Forward declarations of the NATS buffer classes and the shared-pointer aliases used to pass them around.
/// The aliases must refer to the NATS classes declared here (not to the RabbitMQ ones, which this header does not declare).
class ReadBufferFromNATSConsumer;
using ConsumerBufferPtr = std::shared_ptr<ReadBufferFromNATSConsumer>;

class WriteBufferToNATSProducer;
using ProducerBufferPtr = std::shared_ptr<WriteBufferToNATSProducer>;

}

View File

@ -0,0 +1,121 @@
#include "NATSConnection.h"
#include <base/logger_useful.h>
#include <IO/WriteHelpers.h>
namespace DB
{
static const auto CONNECT_SLEEP = 200;
static const auto RETRIES_MAX = 20;
NATSConnection::NATSConnection(const NATSConfiguration & configuration_, Poco::Logger * log_)
: configuration(configuration_)
, log(log_)
, event_handler(loop.getLoop(), log)
{
}
String NATSConnection::connectionInfoForLog() const
{
return configuration.host + ':' + toString(configuration.port);
}
bool NATSConnection::isConnected()
{
std::lock_guard lock(mutex);
return isConnectedImpl();
}
bool NATSConnection::connect()
{
std::lock_guard lock(mutex);
connectImpl();
return isConnectedImpl();
}
bool NATSConnection::reconnect()
{
std::lock_guard lock(mutex);
if (isConnectedImpl())
return true;
disconnectImpl();
/// This will force immediate closure if not yet closed
if (!connection->closed())
connection->close(true);
LOG_DEBUG(log, "Trying to restore connection to {}", connectionInfoForLog());
connectImpl();
return isConnectedImpl();
}
SubscriptionPtr NATSConnection::createSubscription(const std::string& subject)
{
std::lock_guard lock(mutex);
natsSubscription * ns;
natsConnection_SubscribeSync(&ns, connection, subject.c_str());
return SubscriptionPtr(ns, &natsSubscription_Destroy);
}
void NATSConnection::disconnect()
{
std::lock_guard lock(mutex);
disconnectImpl();
}
bool NATSConnection::closed()
{
std::lock_guard lock(mutex);
return natsConnection_IsClosed(connection);
}
bool NATSConnection::isConnectedImpl() const
{
return event_handler.connectionRunning() && !natsConnection_IsClosed(connection);
}
void NATSConnection::connectImpl()
{
if (configuration.connection_string.empty())
{
LOG_DEBUG(log, "Connecting to: {}:{} (user: {})", configuration.host, configuration.port, configuration.username);
AMQP::Login login(configuration.username, configuration.password);
AMQP::Address address(configuration.host, configuration.port, login, configuration.vhost, configuration.secure);
connection = std::make_unique<AMQP::TcpConnection>(&event_handler, address);
}
else
{
AMQP::Address address(configuration.connection_string);
connection = std::make_unique<AMQP::TcpConnection>(&event_handler, address);
}
auto cnt_retries = 0;
while (true)
{
event_handler.iterateLoop();
if (connection->ready() || cnt_retries++ == RETRIES_MAX)
break;
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
}
}
void NATSConnection::disconnectImpl()
{
natsConnection_Close(connection);
/** Connection is not closed immediately (firstly, all pending operations are completed, and then
* an AMQP closing-handshake is performed). But cannot open a new connection until previous one is properly closed
*/
size_t cnt_retries = 0;
while (!closed() && cnt_retries++ != RETRIES_MAX)
event_handler.iterateLoop();
}
}

View File

@ -0,0 +1,68 @@
#pragma once
#include <Storages/NATS/UVLoop.h>
#include <Storages/NATS/NATSHandler.h>
namespace DB
{
/// Connection parameters for a NATS server.
/// Scalar fields carry default values so that a default-constructed configuration has no indeterminate members.
struct NATSConfiguration
{
    /// NOTE(review): not read by the connection code visible in this commit — confirm intended use.
    String url;
    String host;
    UInt16 port = 0;
    String username;
    String password;
    /// NOTE(review): looks like a RabbitMQ concept carried over — confirm it is needed for NATS.
    String vhost;
    bool secure = false;
    /// When non-empty, the connection code uses this string instead of host/port/credentials.
    String connection_string;
};
using SubscriptionPtr = std::unique_ptr<natsSubscription, decltype(&natsSubscription_Destroy)>;
class NATSConnection
{
public:
NATSConnection(const NATSConfiguration & configuration_, Poco::Logger * log_);
bool isConnected();
bool connect();
bool reconnect();
void disconnect();
bool closed();
SubscriptionPtr createSubscription(const std::string& subject);
/// NATSHandler is thread safe. Any public methods can be called concurrently.
NATSHandler & getHandler() { return event_handler; }
String connectionInfoForLog() const;
private:
bool isConnectedImpl() const;
void connectImpl();
void disconnectImpl();
NATSConfiguration configuration;
Poco::Logger * log;
UVLoop loop;
NATSHandler event_handler;
natsConnection * connection;
natsStatus status;
std::mutex mutex;
};
using NATSConnectionPtr = std::unique_ptr<NATSConnection>;
}

View File

@ -0,0 +1,67 @@
#include <base/logger_useful.h>
#include <Common/Exception.h>
#include <Storages/NATS/NATSHandler.h>
namespace DB
{
/* The object of this class is shared between concurrent consumers (who share the same connection == share the same
 * event loop and handler).
 *
 * The constructor initializes the NATS libuv adapter, creates the options object and attaches the given
 * loop to it. The outcome is kept in `status` (see getStatus()); on failure it is also logged.
 */
NATSHandler::NATSHandler(uv_loop_t * loop_, Poco::Logger * log_) :
    loop(loop_),
    log(log_),
    connection_running(false),
    loop_running(false),
    loop_state(Loop::STOP)
{
    natsLibuv_Init();
    natsLibuv_SetThreadLocalLoop(loop);

    /// Only configure the options object if it was actually created.
    status = natsOptions_Create(&opts);
    if (status == NATS_OK)
        status = natsOptions_SetEventLoop(opts, static_cast<void*>(loop),
                                          natsLibuv_Attach,
                                          natsLibuv_Read,
                                          natsLibuv_Write,
                                          natsLibuv_Detach);

    if (status != NATS_OK)
        LOG_ERROR(log, "Cannot initialize NATS options: {}", natsStatus_GetText(status));
}
/// Runs non-blocking loop iterations for as long as loop_state equals Loop::RUN.
/// startup_mutex is held for the whole duration, and loop_running reflects whether this function is active.
void NATSHandler::startLoop()
{
    std::lock_guard guard(startup_mutex);

    LOG_DEBUG(log, "Background loop started");
    loop_running.store(true);

    for (;;)
    {
        if (loop_state.load() != Loop::RUN)
            break;
        uv_run(loop, UV_RUN_NOWAIT);
    }

    LOG_DEBUG(log, "Background loop ended");
    loop_running.store(false);
}
/// Performs one non-blocking loop iteration, unless startup_mutex is currently held
/// (startLoop() holds it while running), in which case nothing is done.
void NATSHandler::iterateLoop()
{
    std::unique_lock guard(startup_mutex, std::try_to_lock);
    if (!guard.owns_lock())
        return;

    uv_run(loop, UV_RUN_NOWAIT);
}
/// Runs the loop in UV_RUN_DEFAULT mode on the calling thread.
/// Unlike iterateLoop() no lock is taken: this method is meant only for the initial NATS setup,
/// when there is no background loop thread yet.
void NATSHandler::startBlockingLoop()
{
    LOG_DEBUG(log, "Started blocking loop.");

    uv_run(loop, UV_RUN_DEFAULT);
}
/// Asks libuv to stop the loop (uv_stop) after logging the request.
void NATSHandler::stopLoop()
{
    LOG_DEBUG(log, "Implicit loop stop.");

    uv_stop(loop);
}
/// Releases the options object created in the constructor.
NATSHandler::~NATSHandler()
{
    natsOptions_Destroy(opts);
}
}

View File

@ -0,0 +1,66 @@
#pragma once
#include <thread>
#include <memory>
#include <mutex>
#include <nats.h>
#include <adapters/libuv.h>
#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>
#include <base/types.h>
#include <amqpcpp/libuv.h>
#include <Poco/Logger.h>
namespace DB
{
/// Values stored in NATSHandler::loop_state; startLoop() keeps iterating while the state is RUN.
namespace Loop
{
    static constexpr UInt8 RUN = 1;
    static constexpr UInt8 STOP = 2;
}
/// Drives a libuv loop on behalf of a NATS connection and owns the natsOptions object bound to that loop.
class NATSHandler
{
public:
    NATSHandler(uv_loop_t * loop_, Poco::Logger * log_);

    ~NATSHandler();

    /// Loop for background thread worker.
    void startLoop();

    /// Loop to wait for small tasks in a non-blocking mode.
    /// Adds synchronization with main background loop.
    void iterateLoop();

    /// Loop to wait for small tasks in a blocking mode.
    /// No synchronization is done with the main loop thread.
    void startBlockingLoop();

    void stopLoop();

    /// Records whether the connection served by this handler is up.
    /// Without this mutator connectionRunning() could never become true.
    void changeConnectionStatus(bool is_running) { connection_running.store(is_running); }

    bool connectionRunning() const { return connection_running.load(); }
    bool loopRunning() const { return loop_running.load(); }

    void updateLoopState(UInt8 state) { loop_state.store(state); }
    UInt8 getLoopState() const { return loop_state.load(); }

    /// Result of the options setup done in the constructor.
    natsStatus getStatus() const { return status; }
    /// Options object owned by this handler; it is destroyed in the destructor.
    natsOptions * getOptions() const { return opts; }

private:
    uv_loop_t * loop;
    natsOptions * opts = nullptr;
    natsStatus status = NATS_OK;
    Poco::Logger * log;

    std::atomic<bool> connection_running, loop_running;
    std::atomic<UInt8> loop_state;
    /// Held by startLoop() while it runs; iterateLoop() only try-locks it.
    std::mutex startup_mutex;
};

using NATSHandlerPtr = std::shared_ptr<NATSHandler>;
}

View File

@ -0,0 +1,39 @@
#include <Storages/NATS/NATSSettings.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTFunction.h>
#include <Common/Exception.h>
namespace DB
{
/// Error codes referenced in this translation unit (defined elsewhere).
namespace ErrorCodes
{
extern const int UNKNOWN_SETTING;
}
/// Instantiates the settings traits for NATSSettings from the settings list macro declared in NATSSettings.h.
/// NOTE(review): the list macro still carries a RABBITMQ name — presumably a leftover of the copy; confirm.
IMPLEMENT_SETTINGS_TRAITS(NATSSettingsTraits, LIST_OF_RABBITMQ_SETTINGS)
/// Applies the SETTINGS clause of a storage definition to this object.
/// If the definition has no SETTINGS clause, an empty non-standalone one is attached to it instead.
/// An UNKNOWN_SETTING exception is rethrown with the storage engine name appended to its message.
void NATSSettings::loadFromQuery(ASTStorage & storage_def)
{
    if (!storage_def.settings)
    {
        auto empty_settings = std::make_shared<ASTSetQuery>();
        empty_settings->is_standalone = false;
        storage_def.set(storage_def.settings, empty_settings);
        return;
    }

    try
    {
        applyChanges(storage_def.settings->changes);
    }
    catch (Exception & e)
    {
        if (e.code() == ErrorCodes::UNKNOWN_SETTING)
            e.addMessage("for storage " + storage_def.engine->name);
        throw;
    }
}
}

View File

@ -0,0 +1,45 @@
#pragma once
#include <Core/BaseSettings.h>
#include <Core/Settings.h>
namespace DB
{
class ASTStorage;
/// List of settings of the NATS table engine: M(type, name, default, description, flags).
/// The last entry deliberately has no trailing line continuation, so the macro ends there
/// regardless of what follows it.
/// NOTE(review): the macro names still say RABBITMQ — presumably leftovers of the copy; renaming
/// requires changing every user of these macros at once.
#define RABBITMQ_RELATED_SETTINGS(M) \
    M(String, nats_host_port, "", "A host-port to connect to NATS server.", 0) \
    M(String, nats_exchange_name, "clickhouse-exchange", "The exchange name, to which messages are sent.", 0) \
    M(String, nats_format, "", "The message format.", 0) \
    M(String, nats_exchange_type, "default", "The exchange type.", 0) \
    M(String, nats_routing_key_list, "5672", "A string of routing keys, separated by dots.", 0) \
    M(Char, nats_row_delimiter, '\0', "The character to be considered as a delimiter.", 0) \
    M(String, nats_schema, "", "Schema identifier (used by schema-based formats) for NATS engine", 0) \
    M(UInt64, nats_num_consumers, 1, "The number of consumer channels per table.", 0) \
    M(UInt64, nats_num_queues, 1, "The number of queues per consumer.", 0) \
    M(String, nats_queue_base, "", "Base for queue names to be able to reopen non-empty queues in case of failure.", 0) \
    M(Bool, nats_persistent, false, "For insert query messages will be made 'persistent', durable.", 0) \
    M(Bool, nats_secure, false, "Use SSL connection", 0) \
    M(String, nats_address, "", "Address for connection", 0) \
    M(UInt64, nats_skip_broken_messages, 0, "Skip at least this number of broken messages from NATS per block", 0) \
    M(UInt64, nats_max_block_size, 0, "Number of row collected before flushing data from NATS.", 0) \
    M(Milliseconds, nats_flush_interval_ms, 0, "Timeout for flushing data from NATS.", 0) \
    M(String, nats_vhost, "/", "NATS vhost.", 0) \
    M(String, nats_queue_settings_list, "", "A list of nats queue settings", 0) \
    M(Bool, nats_queue_consume, false, "Use user-defined queues and do not make any NATS setup: declaring exchanges, queues, bindings", 0) \
    M(String, nats_username, "", "NATS username", 0) \
    M(String, nats_password, "", "NATS password", 0) \
    M(Bool, nats_commit_on_select, false, "Commit messages when select query is made", 0)

/// Engine settings followed by the format factory settings.
#define LIST_OF_RABBITMQ_SETTINGS(M) \
    RABBITMQ_RELATED_SETTINGS(M) \
    FORMAT_FACTORY_SETTINGS(M)

DECLARE_SETTINGS_TRAITS(NATSSettingsTraits, LIST_OF_RABBITMQ_SETTINGS)
/// Settings of the NATS table engine, built on the traits declared from the settings list macro.
struct NATSSettings : public BaseSettings<NATSSettingsTraits>
{
/// Loads settings from the SETTINGS clause of a storage definition (see the definition in NATSSettings.cpp).
void loadFromQuery(ASTStorage & storage_def);
};
}

View File

@ -0,0 +1,56 @@
#include <Storages/NATS/NATSSink.h>
#include <Storages/NATS/WriteBufferToNATSProducer.h>
#include <Storages/NATS/StorageNATS.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IOutputFormat.h>
#include <base/logger_useful.h>
namespace DB
{
/// Creates a sink whose header is the table's non-materialized sample block and keeps
/// references to the storage, the metadata snapshot and the query context.
NATSSink::NATSSink(
StorageNATS & storage_,
const StorageMetadataPtr & metadata_snapshot_,
ContextPtr context_)
: SinkToStorage(metadata_snapshot_->getSampleBlockNonMaterialized())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, context(context_)
{
/// NOTE(review): unbindExchange() is an exchange-level operation carried over from the RabbitMQ engine —
/// confirm it is still meaningful for NATS.
storage.unbindExchange();
}
void NATSSink::onStart()
{
buffer = storage.createWriteBuffer();
buffer->activateWriting();
auto format_settings = getFormatSettings(context);
format_settings.protobuf.allow_multiple_rows_without_delimiter = true;
format = FormatFactory::instance().getOutputFormat(storage.getFormatName(), *buffer, getHeader(), context,
[this](const Columns & /* columns */, size_t /* rows */)
{
buffer->countRow();
},
format_settings);
}
/// Turns the chunk into a block with the sink's header and writes it through the output format.
void NATSSink::consume(Chunk chunk)
{
    auto columns = chunk.detachColumns();
    format->write(getHeader().cloneWithColumns(std::move(columns)));
}
/// Finalizes the output format, then (if a buffer exists) calls its updateMaxWait().
void NATSSink::onFinish()
{
    format->finalize();

    if (!buffer)
        return;

    buffer->updateMaxWait();
}
}

View File

@ -0,0 +1,32 @@
#pragma once
#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/NATS/StorageNATS.h>
namespace DB
{
class IOutputFormat;
using IOutputFormatPtr = std::shared_ptr<IOutputFormat>;
/// Sink used for INSERT into a NATS table: formats incoming chunks and writes them into a producer buffer.
class NATSSink : public SinkToStorage
{
public:
explicit NATSSink(StorageNATS & storage_, const StorageMetadataPtr & metadata_snapshot_, ContextPtr context_);
/// Creates the write buffer and the output format.
void onStart() override;
/// Writes one chunk through the output format.
void consume(Chunk chunk) override;
/// Finalizes the output format.
void onFinish() override;
String getName() const override { return "NATSSink"; }
private:
StorageNATS & storage;
StorageMetadataPtr metadata_snapshot;
ContextPtr context;
/// Both are created in onStart().
ProducerBufferPtr buffer;
IOutputFormatPtr format;
};
}

View File

@ -0,0 +1,189 @@
#include <Storages/NATS/NATSSource.h>
#include <Formats/FormatFactory.h>
#include <Interpreters/Context.h>
#include <Processors/Executors/StreamingFormatExecutor.h>
#include <Storages/NATS/ReadBufferFromNATSConsumer.h>
namespace DB
{
static std::pair<Block, Block> getHeaders(const StorageSnapshotPtr & storage_snapshot)
{
auto non_virtual_header = storage_snapshot->metadata->getSampleBlockNonMaterialized();
auto virtual_header = storage_snapshot->getSampleBlockForColumns(
{"_exchange_name", "_channel_id", "_delivery_tag", "_redelivered", "_message_id", "_timestamp"});
return {non_virtual_header, virtual_header};
}
/// Returns a copy of the non-virtual header with every virtual column appended after it.
static Block getSampleBlock(const Block & non_virtual_header, const Block & virtual_header)
{
    Block combined = non_virtual_header;

    for (const auto & virtual_column : virtual_header)
        combined.insert(virtual_column);

    return combined;
}
/// Public constructor: computes the data/virtual headers from the snapshot and delegates to the private constructor.
NATSSource::NATSSource(
StorageNATS & storage_,
const StorageSnapshotPtr & storage_snapshot_,
ContextPtr context_,
const Names & columns,
size_t max_block_size_,
bool ack_in_suffix_)
: NATSSource(
storage_,
storage_snapshot_,
getHeaders(storage_snapshot_),
context_,
columns,
max_block_size_,
ack_in_suffix_)
{
}
/// Private constructor taking the precomputed {non-virtual, virtual} headers.
/// The source header is the concatenation of both; the two parts are then moved into the members.
NATSSource::NATSSource(
StorageNATS & storage_,
const StorageSnapshotPtr & storage_snapshot_,
std::pair<Block, Block> headers,
ContextPtr context_,
const Names & columns,
size_t max_block_size_,
bool ack_in_suffix_)
: SourceWithProgress(getSampleBlock(headers.first, headers.second))
, storage(storage_)
, storage_snapshot(storage_snapshot_)
, context(context_)
, column_names(columns)
, max_block_size(max_block_size_)
, ack_in_suffix(ack_in_suffix_)
, non_virtual_header(std::move(headers.first))
, virtual_header(std::move(headers.second))
{
/// Paired with decrementReader() in the destructor.
storage.incrementReader();
}
/// Unregisters this reader and, if a consumer buffer was taken from the storage, gives it back.
NATSSource::~NATSSource()
{
    storage.decrementReader();

    if (buffer)
        storage.pushReadBuffer(buffer);
}
/// Returns false when there is no buffer; otherwise forwards the question to the buffer.
bool NATSSource::needChannelUpdate()
{
    return buffer && buffer->needChannelUpdate();
}
/// Resets the buffer's ack tracker and asks the storage to update the buffer's channel;
/// if the storage reports success, the channel is set up again. Does nothing without a buffer.
void NATSSource::updateChannel()
{
    if (buffer)
    {
        buffer->updateAckTracker();

        const bool channel_updated = storage.updateChannel(buffer->getChannel());
        if (channel_updated)
            buffer->setupChannel();
    }
}
/// Produces the next chunk. When the result is empty and ack_in_suffix is set, acknowledgements are sent.
Chunk NATSSource::generate()
{
    auto chunk = generateImpl();

    const bool is_empty = !chunk;
    if (is_empty && ack_in_suffix)
        sendAck();

    return chunk;
}
/// Reads messages from the consumer buffer through the input format until the block is full, the
/// received queue is empty, the consumer is stopped or the time limit is hit, and returns them as one
/// chunk with the virtual columns appended. Produces data at most once per source (see is_finished).
Chunk NATSSource::generateImpl()
{
    if (!buffer)
    {
        /// NOTE(review): the timeout comes from the rabbitmq_max_wait_ms setting — confirm this is intended for NATS.
        const auto wait_ms = context->getSettingsRef().rabbitmq_max_wait_ms.totalMilliseconds();
        auto timeout = std::chrono::milliseconds(wait_ms);
        buffer = storage.popReadBuffer(timeout);
    }

    if (!buffer || is_finished)
        return {};

    is_finished = true;

    MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
    auto input_format = FormatFactory::instance().getInputFormat(
        storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size);

    StreamingFormatExecutor executor(non_virtual_header, input_format);

    size_t total_rows = 0;

    while (!buffer->eof())
    {
        auto new_rows = executor.execute();

        if (new_rows)
        {
            /// Metadata of the message that produced these rows; it is repeated for every row.
            auto exchange_name = storage.getExchange();
            auto channel_id = buffer->getChannelID();
            auto delivery_tag = buffer->getDeliveryTag();
            auto redelivered = buffer->getRedelivered();
            auto message_id = buffer->getMessageID();
            auto timestamp = buffer->getTimestamp();

            buffer->updateAckTracker({delivery_tag, channel_id});

            for (size_t row = 0; row < new_rows; ++row)
            {
                virtual_columns[0]->insert(exchange_name);
                virtual_columns[1]->insert(channel_id);
                virtual_columns[2]->insert(delivery_tag);
                virtual_columns[3]->insert(redelivered);
                virtual_columns[4]->insert(message_id);
                virtual_columns[5]->insert(timestamp);
            }

            total_rows += new_rows;
        }

        /// Let the buffer hand out the next message.
        buffer->allowNext();

        const bool block_is_full = total_rows >= max_block_size;
        if (block_is_full || buffer->queueEmpty() || buffer->isConsumerStopped() || !checkTimeLimit())
            break;
    }

    if (total_rows == 0)
        return {};

    auto result_columns = executor.getResultColumns();
    for (auto & virtual_column : virtual_columns)
        result_columns.push_back(std::move(virtual_column));

    return Chunk(std::move(result_columns), total_rows);
}
/// Acknowledges the messages read so far. Returns false if there is no buffer or the buffer reports failure.
bool NATSSource::sendAck()
{
    return buffer && buffer->ackMessages();
}
}

View File

@ -0,0 +1,61 @@
#pragma once
#include <Processors/Sources/SourceWithProgress.h>
#include <Storages/NATS/StorageNATS.h>
#include <Storages/NATS/ReadBufferFromNATSConsumer.h>
namespace DB
{
/// Source that reads messages of a NATS table through a consumer buffer taken from the storage.
class NATSSource : public SourceWithProgress
{
public:
NATSSource(
StorageNATS & storage_,
const StorageSnapshotPtr & storage_snapshot_,
ContextPtr context_,
const Names & columns,
size_t max_block_size_,
bool ack_in_suffix = false);
~NATSSource() override;
String getName() const override { return storage.getName(); }
ConsumerBufferPtr getBuffer() { return buffer; }
Chunk generate() override;
/// True when there is no buffer or the buffer's received queue is empty.
bool queueEmpty() const { return !buffer || buffer->queueEmpty(); }
bool needChannelUpdate();
void updateChannel();
bool sendAck();
private:
StorageNATS & storage;
StorageSnapshotPtr storage_snapshot;
ContextPtr context;
Names column_names;
const size_t max_block_size;
/// When set, generate() sends acknowledgements once it produces an empty chunk.
bool ack_in_suffix;
/// Set by the first generateImpl() call that gets a buffer; later calls return an empty chunk.
bool is_finished = false;
const Block non_virtual_header;
const Block virtual_header;
/// Taken from the storage in generateImpl() and returned to it in the destructor.
ConsumerBufferPtr buffer;
/// Delegation target of the public constructor; `headers` is {non-virtual header, virtual header}.
NATSSource(
StorageNATS & storage_,
const StorageSnapshotPtr & storage_snapshot_,
std::pair<Block, Block> headers,
ContextPtr context_,
const Names & columns,
size_t max_block_size_,
bool ack_in_suffix);
Chunk generateImpl();
};
}

View File

@ -0,0 +1,188 @@
#include <utility>
#include <chrono>
#include <thread>
#include <mutex>
#include <atomic>
#include <memory>
#include <Storages/NATS/ReadBufferFromNATSConsumer.h>
#include <Storages/NATS/NATSHandler.h>
#include <boost/algorithm/string/split.hpp>
#include <base/logger_useful.h>
#include "Poco/Timer.h"
#include <amqpcpp.h>
namespace DB
{
/// Error codes referenced in this translation unit (defined elsewhere).
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Creates a consumer buffer with no underlying memory (ReadBuffer(nullptr, 0)); data is attached in nextImpl().
/// `queue_size_` is the capacity of the queue of received messages; `stopped_` is an external stop flag
/// that is only observed, never modified, by this class.
ReadBufferFromNATSConsumer::ReadBufferFromNATSConsumer(
NATSHandler & event_handler_,
std::vector<String> & queues_,
size_t channel_id_base_,
const String & channel_base_,
Poco::Logger * log_,
char row_delimiter_,
uint32_t queue_size_,
const std::atomic<bool> & stopped_)
: ReadBuffer(nullptr, 0)
, event_handler(event_handler_)
, queues(queues_)
, channel_base(channel_base_)
, channel_id_base(channel_id_base_)
, log(log_)
, row_delimiter(row_delimiter_)
, stopped(stopped_)
, received(queue_size_)
{
}
/// Detaches the buffer from the memory of the current message before the members are destroyed.
ReadBufferFromNATSConsumer::~ReadBufferFromNATSConsumer()
{
BufferBase::set(nullptr, 0, 0);
}
void ReadBufferFromNATSConsumer::subscribe()
{
for (const auto & queue_name : queues)
{
consumer_channel->consume(queue_name)
.onSuccess([&](const std::string & /* consumer_tag */)
{
LOG_TRACE(log, "Consumer on channel {} is subscribed to queue {}", channel_id, queue_name);
if (++subscribed == queues.size())
wait_subscription.store(false);
})
.onReceived([&](const AMQP::Message & message, uint64_t delivery_tag, bool redelivered)
{
if (message.bodySize())
{
String message_received = std::string(message.body(), message.body() + message.bodySize());
if (row_delimiter != '\0')
message_received += row_delimiter;
if (!received.push({message_received, message.hasMessageID() ? message.messageID() : "",
message.hasTimestamp() ? message.timestamp() : 0,
redelivered, AckTracker(delivery_tag, channel_id)}))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to received queue");
}
})
.onError([&](const char * message)
{
/* End up here either if channel ends up in an error state (then there will be resubscription) or consume call error, which
* arises from queue settings mismatch or queue level error, which should not happen as no one else is supposed to touch them
*/
LOG_ERROR(log, "Consumer failed on channel {}. Reason: {}", channel_id, message);
wait_subscription.store(false);
});
}
}
bool ReadBufferFromNATSConsumer::ackMessages()
{
AckTracker record_info = last_inserted_record_info;
/* Do not send ack to server if message's channel is not the same as current running channel because delivery tags are scoped per
* channel, so if channel fails, all previous delivery tags become invalid
*/
if (record_info.channel_id == channel_id && record_info.delivery_tag && record_info.delivery_tag > prev_tag)
{
/// Commit all received messages with delivery tags from last committed to last inserted
if (!consumer_channel->ack(record_info.delivery_tag, AMQP::multiple))
{
LOG_ERROR(log, "Failed to commit messages with delivery tags from last committed to {} on channel {}",
record_info.delivery_tag, channel_id);
return false;
}
prev_tag = record_info.delivery_tag;
LOG_TRACE(log, "Consumer committed messages with deliveryTags up to {} on channel {}", record_info.delivery_tag, channel_id);
}
return true;
}
/// Remembers `record_info` as the last inserted record.
/// A record with a non-zero tag is ignored while the channel is in error state; a record with a zero
/// tag additionally resets prev_tag.
void ReadBufferFromNATSConsumer::updateAckTracker(AckTracker record_info)
{
    const bool has_tag = record_info.delivery_tag != 0;

    if (has_tag && channel_error.load())
        return;

    if (!has_tag)
        prev_tag = 0;

    last_inserted_record_info = record_info;
}
/// Installs the onReady/onError callbacks on the current channel and marks the buffer as waiting for subscription.
/// Does nothing when there is no channel.
void ReadBufferFromNATSConsumer::setupChannel()
{
    if (!consumer_channel)
        return;

    wait_subscription.store(true);

    auto on_ready = [&]()
    {
        /* First number indicates current consumer buffer; second number indicates serial number of created channel for current buffer,
         * i.e. if channel fails - another one is created and its serial number is incremented; channel_base is to guarantee that
         * channel_id is unique for each table
         */
        channel_id = std::to_string(channel_id_base) + "_" + std::to_string(channel_id_counter++) + "_" + channel_base;
        LOG_TRACE(log, "Channel {} is created", channel_id);

        subscribed = 0;
        subscribe();
        channel_error.store(false);
    };
    consumer_channel->onReady(on_ready);

    auto on_error = [&](const char * message)
    {
        LOG_ERROR(log, "Channel {} error: {}", channel_id, message);

        channel_error.store(true);
        wait_subscription.store(false);
    };
    consumer_channel->onError(on_error);
}
/// Tells whether the channel has to be recreated: never while a subscription is still pending,
/// otherwise when the channel is in error state, missing or not usable.
bool ReadBufferFromNATSConsumer::needChannelUpdate()
{
    if (wait_subscription)
        return false;

    if (channel_error)
        return true;

    return !consumer_channel || !consumer_channel->usable();
}
/// Forwards to the handler's iterateLoop().
void ReadBufferFromNATSConsumer::iterateEventLoop()
{
event_handler.iterateLoop();
}
/// Makes the next received message the content of this buffer.
/// Returns false if the consumer is stopped, reading is not currently allowed (see allowNext()),
/// or no message is available. After a successful read, further reads are blocked until allowNext().
bool ReadBufferFromNATSConsumer::nextImpl()
{
    if (stopped || !allowed)
        return false;

    if (!received.tryPop(current))
        return false;

    /// The buffer points directly into the message stored in `current`.
    auto * message_begin = const_cast<char *>(current.message.data());
    BufferBase::set(message_begin, current.message.size(), 0);
    allowed = false;

    return true;
}

View File

@ -0,0 +1,104 @@
#pragma once
#include <Core/Names.h>
#include <base/types.h>
#include <IO/ReadBuffer.h>
#include <amqpcpp.h>
#include <nats.h>
#include <Storages/NATS/NATSHandler.h>
#include <Common/ConcurrentBoundedQueue.h>
namespace Poco
{
class Logger;
}
namespace DB
{
/// Read buffer that exposes messages received by a consumer, one message per nextImpl() call.
/// NOTE(review): the channel-related members still use the AMQP API copied from the RabbitMQ engine — confirm the NATS port.
class ReadBufferFromNATSConsumer : public ReadBuffer
{

public:
    ReadBufferFromNATSConsumer(
            NATSHandler & event_handler_,
            std::vector<String> & queues_,
            size_t channel_id_base_,
            const String & channel_base_,
            Poco::Logger * log_,
            char row_delimiter_,
            uint32_t queue_size_,
            const std::atomic<bool> & stopped_);

    ~ReadBufferFromNATSConsumer() override;

    /// Identifies a message for acknowledgement purposes.
    struct AckTracker
    {
        /// Zero means "no message": updateAckTracker() and ackMessages() both test the tag for being non-zero,
        /// so it must not be left indeterminate in a default-initialized tracker.
        UInt64 delivery_tag = 0;
        String channel_id;

        AckTracker() = default;
        AckTracker(UInt64 tag, String id) : delivery_tag(tag), channel_id(id) {}
    };

    /// One received message together with its metadata.
    struct MessageData
    {
        String message;
        String message_id;
        uint64_t timestamp = 0;
        bool redelivered = false;
        AckTracker track{};
    };

    ChannelPtr & getChannel() { return consumer_channel; }
    void setupChannel();
    bool needChannelUpdate();
    void closeChannel()
    {
        if (consumer_channel)
            consumer_channel->close();
    }

    void updateQueues(std::vector<String> & queues_) { queues = queues_; }
    size_t queuesCount() { return queues.size(); }

    bool isConsumerStopped() { return stopped; }
    bool ackMessages();
    /// Called without arguments it resets the tracker (zero tag).
    void updateAckTracker(AckTracker record = AckTracker());

    bool queueEmpty() { return received.empty(); }
    void allowNext() { allowed = true; } // Allow to read next message.

    /// Metadata of the message most recently returned by nextImpl().
    auto getChannelID() const { return current.track.channel_id; }
    auto getDeliveryTag() const { return current.track.delivery_tag; }
    auto getRedelivered() const { return current.redelivered; }
    auto getMessageID() const { return current.message_id; }
    auto getTimestamp() const { return current.timestamp; }

private:
    bool nextImpl() override;

    void subscribe();
    void iterateEventLoop();

    ChannelPtr consumer_channel;
    NATSHandler & event_handler; /// Used concurrently, but is thread safe.
    std::vector<String> queues;
    const String channel_base;
    const size_t channel_id_base;
    Poco::Logger * log;
    char row_delimiter;
    bool allowed = true;
    const std::atomic<bool> & stopped;

    String channel_id;
    std::atomic<bool> channel_error = true, wait_subscription = false;
    ConcurrentBoundedQueue<MessageData> received;
    MessageData current;
    size_t subscribed = 0;

    AckTracker last_inserted_record_info;
    UInt64 prev_tag = 0, channel_id_counter = 0;
};
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,207 @@
#pragma once
#include <Core/BackgroundSchedulePool.h>
#include <Storages/IStorage.h>
#include <Poco/Semaphore.h>
#include <base/shared_ptr_helper.h>
#include <mutex>
#include <atomic>
#include <Storages/NATS/Buffer_fwd.h>
#include <Storages/NATS/NATSSettings.h>
#include <Storages/NATS/NATSConnection.h>
#include <Common/thread_local_rng.h>
#include <amqpcpp/libuv.h>
#include <uv.h>
#include <random>
namespace DB
{
class StorageNATS final: public shared_ptr_helper<StorageNATS>, public IStorage, WithContext
{
friend struct shared_ptr_helper<StorageNATS>;
public:
std::string getName() const override { return "NATS"; }
bool noPushingToViews() const override { return true; }
void startup() override;
void shutdown() override;
/// This is a bad way to let storage know in shutdown() that table is going to be dropped. There are some actions which need
/// to be done only when table is dropped (not when detached). Also connection must be closed only in shutdown, but those
/// actions require an open connection. Therefore there needs to be a way inside shutdown() method to know whether it is called
/// because of drop query. And drop() method is not suitable at all, because it will not only require to reopen connection, but also
/// it can be called considerable time after table is dropped (for example, in case of Atomic database), which is not appropriate for the case.
void checkTableCanBeDropped() const override { drop_table = true; }
/// Always return virtual columns in addition to required columns
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
SinkToStoragePtr write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context) override;
void pushReadBuffer(ConsumerBufferPtr buf);
ConsumerBufferPtr popReadBuffer();
ConsumerBufferPtr popReadBuffer(std::chrono::milliseconds timeout);
ProducerBufferPtr createWriteBuffer();
const String & getFormatName() const { return format_name; }
NamesAndTypesList getVirtuals() const override;
String getExchange() const { return exchange_name; }
void unbindExchange();
bool updateChannel(ChannelPtr & channel);
void updateQueues(std::vector<String> & queues_) { queues_ = queues; }
void prepareChannelForBuffer(ConsumerBufferPtr buffer);
void incrementReader();
void decrementReader();
protected:
StorageNATS(
const StorageID & table_id_,
ContextPtr context_,
const ColumnsDescription & columns_,
std::unique_ptr<NATSSettings> nats_settings_,
bool is_attach_);
private:
ContextMutablePtr nats_context;
std::unique_ptr<NATSSettings> nats_settings;
const String exchange_name;
const String format_name;
AMQP::ExchangeType exchange_type;
Names routing_keys;
char row_delimiter;
const String schema_name;
size_t num_consumers;
size_t num_queues;
String queue_base;
Names queue_settings_list;
/// For insert query. Mark messages as durable.
const bool persistent;
/// A table setting. It is possible not to perform any NATS setup, which is supposed to be consumer-side setup:
/// declaring exchanges, queues, bindings. Instead everything needed from NATS table is to connect to a specific queue.
/// This solution disables all optimizations and is not really optimal, but allows user to fully control all NATS setup.
bool use_user_setup;
bool hash_exchange;
Poco::Logger * log;
NATSConnectionPtr connection; /// Connection for all consumers
NATSConfiguration configuration;
size_t num_created_consumers = 0;
Poco::Semaphore semaphore;
std::mutex buffers_mutex;
std::vector<ConsumerBufferPtr> buffers; /// available buffers for NATS consumers
String unique_strbase; /// to make unique consumer channel id
/// maximum number of messages in NATS queue (x-max-length). Also used
/// to setup size of inner buffer for received messages
uint32_t queue_size;
String sharding_exchange, bridge_exchange, consumer_exchange;
size_t consumer_id = 0; /// counter for consumer buffer, needed for channel id
std::vector<String> queues;
std::once_flag flag; /// remove exchange only once
std::mutex task_mutex;
BackgroundSchedulePool::TaskHolder streaming_task;
BackgroundSchedulePool::TaskHolder looping_task;
BackgroundSchedulePool::TaskHolder connection_task;
uint64_t milliseconds_to_wait;
/**
* ( ͡° ͜ʖ ͡° )* Evil atomics:
*/
/// Needed for tell MV or producer background tasks
/// that they must finish as soon as possible.
std::atomic<bool> shutdown_called{false};
/// Counter for producer buffers, needed for channel id.
/// Needed to generate unique producer buffer identifiers.
std::atomic<size_t> producer_id = 1;
/// Has connection background task completed successfully?
/// It is started only once -- in constructor.
std::atomic<bool> rabbit_is_ready = false;
/// Allow to remove exchange only once.
std::atomic<bool> exchange_removed = false;
/// For select query we must be aware of the end of streaming
/// to be able to turn off the loop.
std::atomic<size_t> readers_count = 0;
std::atomic<bool> mv_attached = false;
/// In select query we start event loop, but do not stop it
/// after that select is finished. Then in a thread, which
/// checks for MV we also check if we have select readers.
/// If not - we turn off the loop. The checks are done under
/// mutex to avoid having a turned off loop when select was
/// started.
std::mutex loop_mutex;
size_t read_attempts = 0;
mutable bool drop_table = false;
bool is_attach;
ConsumerBufferPtr createReadBuffer();
void initializeBuffers();
bool initialized = false;
/// Functions working in the background
void streamingToViewsFunc();
void loopingFunc();
void connectionFunc();
void startLoop();
void stopLoop();
void stopLoopIfNoReaders();
static Names parseSettings(String settings_list);
static AMQP::ExchangeType defineExchangeType(String exchange_type_);
static String getTableBasedName(String name, const StorageID & table_id);
ContextMutablePtr addSettings(ContextPtr context) const;
size_t getMaxBlockSize() const;
void deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop);
void initNATS();
void cleanupNATS() const;
void initExchange(AMQP::TcpChannel & rabbit_channel);
void bindExchange(AMQP::TcpChannel & rabbit_channel);
void bindQueue(size_t queue_id, AMQP::TcpChannel & rabbit_channel);
bool streamToViews();
bool checkDependencies(const StorageID & table_id);
static String getRandomName()
{
    /// Produces a 32-character name consisting of random lowercase Latin letters.
    constexpr size_t name_length = 32;
    std::uniform_int_distribution<int> letter('a', 'z');

    String name;
    name.reserve(name_length);
    while (name.size() < name_length)
        name.push_back(letter(thread_local_rng));

    return name;
}
};
}

View File

@ -0,0 +1,44 @@
#pragma once
#include <memory>
#include <boost/noncopyable.hpp>
#include <uv.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SYSTEM_ERROR;
}
/// RAII wrapper around a libuv event loop:
/// uv_loop_init() is called in the constructor and uv_loop_close() in the destructor.
class UVLoop : public boost::noncopyable
{
public:
    /// Allocates and initializes the loop.
    /// Throws Exception with code SYSTEM_ERROR if uv_loop_init() reports a failure.
    UVLoop() : loop_ptr(std::make_unique<uv_loop_t>())
    {
        const int res = uv_loop_init(loop_ptr.get());
        if (res != 0)
            throw Exception(ErrorCodes::SYSTEM_ERROR, "UVLoop could not initialize: {}", uv_strerror(res));
    }

    ~UVLoop()
    {
        /// The return code of uv_loop_close() is ignored here: a destructor cannot report it by throwing.
        if (loop_ptr)
            uv_loop_close(loop_ptr.get());
    }

    /// Non-owning access to the underlying loop; the pointer stays valid for the lifetime of this object.
    uv_loop_t * getLoop() { return loop_ptr.get(); }
    const uv_loop_t * getLoop() const { return loop_ptr.get(); }

private:
    std::unique_ptr<uv_loop_t> loop_ptr;
};
}

View File

@ -0,0 +1,317 @@
#include <Storages/NATS/WriteBufferToNATSProducer.h>
#include <Core/Block.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Interpreters/Context.h>
#include <base/logger_useful.h>
#include <amqpcpp.h>
#include <uv.h>
#include <boost/algorithm/string/split.hpp>
#include <chrono>
#include <thread>
#include <atomic>
namespace DB
{
/// Size passed to the `payloads` queue; publish() also stops its loop to run the event loop
/// each time delivery_tag becomes a multiple of BATCH.
static const auto BATCH = 1000;
/// Size passed to the `returned` queue; publish() does not let delivery_record grow beyond it,
/// because every unconfirmed record may have to be pushed to `returned`.
static const auto RETURNED_LIMIT = 50000;
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int CANNOT_CONNECT_RABBITMQ;
    extern const int LOGICAL_ERROR;
}

/** Validates the routing keys, connects to the server, sets up the producer channel,
  * creates the (initially deactivated) background writing task and prepares the first chunk.
  *
  * For a headers exchange every routing key must have the form "key=value";
  * the pairs are stored in key_arguments and later attached to each published message.
  *
  * Throws BAD_ARGUMENTS if a headers routing key has no '=' separator,
  * and CANNOT_CONNECT_RABBITMQ if the connection cannot be established.
  */
WriteBufferToNATSProducer::WriteBufferToNATSProducer(
    const NATSConfiguration & configuration_,
    ContextPtr global_context,
    const Names & routing_keys_,
    const String & exchange_name_,
    const AMQP::ExchangeType exchange_type_,
    const size_t channel_id_base_,
    const bool persistent_,
    std::atomic<bool> & shutdown_called_,
    Poco::Logger * log_,
    std::optional<char> delimiter,
    size_t rows_per_message,
    size_t chunk_size_)
    : WriteBuffer(nullptr, 0)
    , connection(configuration_, log_)
    , routing_keys(routing_keys_)
    , exchange_name(exchange_name_)
    , exchange_type(exchange_type_)
    , channel_id_base(std::to_string(channel_id_base_))
    , persistent(persistent_)
    , shutdown_called(shutdown_called_)
    , payloads(BATCH)
    , returned(RETURNED_LIMIT)
    , log(log_)
    , delim(delimiter)
    , max_rows(rows_per_message)
    , chunk_size(chunk_size_)
{
    /// Validate the arguments before touching the network, so that a malformed key does not open a connection.
    if (exchange_type == AMQP::ExchangeType::headers)
    {
        for (const auto & header : routing_keys)
        {
            std::vector<String> matching;
            boost::split(matching, header, [](char c){ return c == '='; });

            /// Without this check matching[1] would be read out of bounds for a key that has no '='.
            if (matching.size() < 2)
                throw Exception(ErrorCodes::BAD_ARGUMENTS,
                    "Routing key '{}' for headers exchange must have the form key=value", header);

            key_arguments[matching[0]] = matching[1];
        }
    }

    if (!connection.connect())
        throw Exception(ErrorCodes::CANNOT_CONNECT_RABBITMQ, "Cannot connect to NATS {}", connection.connectionInfoForLog());

    setupChannel();

    /// The task stays deactivated until activateWriting() is called.
    writing_task = global_context->getSchedulePool().createTask("NATSWritingTask", [this]{ writingFunc(); });
    writing_task->deactivate();

    reinitializeChunks();
}
/// Deactivates the background writing task, then disconnects.
WriteBufferToNATSProducer::~WriteBufferToNATSProducer()
{
writing_task->deactivate();
connection.disconnect();
/// `rows` is reset by reinitializeChunks() each time countRow() cuts a payload,
/// so a non-zero value here means rows were counted but never turned into a payload.
assert(rows == 0);
}
/** Registers one more written row. Once max_rows rows have accumulated, concatenates the chunks
  * into a single payload (without the trailing delimiter, if there is one), resets the chunks
  * and pushes the payload together with its sequence number to the `payloads` queue.
  *
  * Throws LOGICAL_ERROR if the payload cannot be pushed to the queue.
  */
void WriteBufferToNATSProducer::countRow()
{
    if (++rows % max_rows != 0)
        return;

    const std::string & last_chunk = chunks.back();
    size_t last_chunk_size = offset();

    /// The delimiter separates rows inside a message; it is not needed after the last row.
    if (last_chunk_size && delim && last_chunk[last_chunk_size - 1] == delim)
        --last_chunk_size;

    std::string payload;
    payload.reserve((chunks.size() - 1) * chunk_size + last_chunk_size);

    /// All chunks except the last one are appended whole; the last one only up to last_chunk_size.
    for (auto i = chunks.begin(), end = --chunks.end(); i != end; ++i)
        payload.append(*i);
    payload.append(last_chunk, 0, last_chunk_size);

    reinitializeChunks();

    ++payload_counter;

    /// The payload is not used afterwards, so move it instead of copying the whole message.
    if (!payloads.push(std::make_pair(payload_counter, std::move(payload))))
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to payloads queue");
}
/** Creates a new producer channel on the current connection and installs its callbacks:
  * - onError: closes the channel, pushes every unconfirmed record to `returned`,
  *   then clears delivery_record, resets delivery_tag and producer_ready;
  * - onReady: assigns a new channel_id, enables publisher confirms (ack/nack are handled by removeRecord)
  *   and sets producer_ready.
  * The callbacks capture this object by reference.
  */
void WriteBufferToNATSProducer::setupChannel()
{
producer_channel = connection.createChannel();
producer_channel->onError([&](const char * message)
{
LOG_ERROR(log, "Producer's channel {} error: {}", channel_id, message);
/// Channel is not usable anymore. (https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/36#issuecomment-125112236)
producer_channel->close();
/* Save records that have not received ack/nack from server before channel closure. They are removed and pushed back again once
* they are republished because after channel recovery they will acquire new delivery tags, so all previous records become invalid
*/
for (const auto & record : delivery_record)
if (!returned.push(record.second))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to returned queue");
LOG_DEBUG(log, "Producer on channel {} hasn't confirmed {} messages, {} waiting to be published",
channel_id, delivery_record.size(), payloads.size());
/// Delivery tags are scoped per channel.
delivery_record.clear();
delivery_tag = 0;
producer_ready = false;
});
producer_channel->onReady([&]()
{
/// Every recreated channel gets a new serial number appended to the id of this producer buffer.
channel_id = channel_id_base + "_" + std::to_string(channel_id_counter++);
LOG_DEBUG(log, "Producer's channel {} is ready", channel_id);
/* if persistent == true, onAck is received when message is persisted to disk or when it is consumed on every queue. If fails,
* onNack() is received. If persistent == false, message is confirmed the moment it is enqueued. First option is two times
* slower than the second, so default is second and the first is turned on in table setting.
*
* "Publisher confirms" are implemented similar to strategy#3 here https://www.rabbitmq.com/tutorials/tutorial-seven-java.html
*/
producer_channel->confirmSelect()
.onAck([&](uint64_t acked_delivery_tag, bool multiple)
{
removeRecord(acked_delivery_tag, multiple, false);
})
.onNack([&](uint64_t nacked_delivery_tag, bool multiple, bool /* requeue */)
{
removeRecord(nacked_delivery_tag, multiple, true);
});
/// writingFunc() publishes only after this flag is set.
producer_ready = true;
});
}
/** Handles a confirmation (ack or nack) for `received_delivery_tag`.
  *
  * multiple  - the confirmation covers all delivery tags up to and including the received one;
  * republish - the confirmation is a nack: the covered records are pushed to `returned` to be published again.
  *
  * In both cases the covered records are erased from delivery_record, because republished messages
  * get new delivery tags. A tag that is not present in delivery_record is an invariant violation
  * (checked by assert in debug builds); in release builds it no longer leads to iterating past the end.
  *
  * Throws LOGICAL_ERROR if a record cannot be pushed to the `returned` queue.
  */
void WriteBufferToNATSProducer::removeRecord(UInt64 received_delivery_tag, bool multiple, bool republish)
{
    auto record_iter = delivery_record.find(received_delivery_tag);
    assert(record_iter != delivery_record.end());

    if (multiple)
    {
        /// First record after the confirmed range. Equals ++record_iter when the tag is present
        /// and is still a valid bound when it is not.
        const auto confirmed_end = delivery_record.upper_bound(received_delivery_tag);

        if (republish)
            for (auto record = delivery_record.begin(); record != confirmed_end; ++record)
                if (!returned.push(record->second))
                    throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to returned queue");

        /// Delete the records even in case when republished because new delivery tags will be assigned by the server.
        delivery_record.erase(delivery_record.begin(), confirmed_end);
    }
    else
    {
        /// Nothing to confirm for an unknown tag; dereferencing or erasing end() would be undefined behavior.
        if (record_iter == delivery_record.end())
            return;

        if (republish)
            if (!returned.push(record_iter->second))
                throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to returned queue");

        delivery_record.erase(record_iter);
    }
}
/** Publishes messages from the given queue (`payloads` or `returned`) until the queue is empty,
  * the channel becomes unusable, the limit of unconfirmed records is reached or a batch is complete,
  * then runs one iteration of the event loop.
  *
  * republishing - value of the "republished" header attached to every message.
  * Throws LOGICAL_ERROR if a message cannot be popped from the queue.
  */
void WriteBufferToNATSProducer::publish(ConcurrentBoundedQueue<std::pair<UInt64, String>> & messages, bool republishing)
{
    std::pair<UInt64, String> next_message;

    for (;;)
    {
        /// The number of unacknowledged messages must never exceed the capacity of the `returned` queue,
        /// because all of them might end up there.
        if (messages.empty() || !producer_channel->usable() || delivery_record.size() >= RETURNED_LIMIT)
            break;

        if (!messages.pop(next_message))
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not pop payload");

        const String & body = next_message.second;
        AMQP::Envelope envelope(body.data(), body.size());

        /// For a headers exchange the routing keys travel as headers; otherwise key_arguments is just empty.
        AMQP::Table message_settings = key_arguments;

        /* If the connection is lost after some messages were published but before the server sent ack/nack,
         * the publisher cannot know whether they were delivered, so they are republished (see onError() callback)
         * and duplicates are possible. The "republished" header lets the consumer know that a message
         * might be a duplicate.
         */
        message_settings["republished"] = std::to_string(republishing);
        envelope.setHeaders(message_settings);

        /* Exactly-once delivery is not guaranteed, so a consumer that sees "republished" set to 1
         * can additionally compare the messageID property to detect duplicates.
         */
        envelope.setMessageID(std::to_string(next_message.first));

        /// Delivery mode is 1 or 2. 1 is default. 2 makes a message durable, but makes performance 1.5-2 times worse
        if (persistent)
            envelope.setDeliveryMode(2);

        /// Routing key: the current delivery tag for consistent hash, empty for headers, the first routing key otherwise.
        String routing_key;
        if (exchange_type == AMQP::ExchangeType::consistent_hash)
            routing_key = std::to_string(delivery_tag);
        else if (exchange_type != AMQP::ExchangeType::headers)
            routing_key = routing_keys[0];

        producer_channel->publish(exchange_name, routing_key, envelope);

        /// The record is kept for "publisher confirms", which guarantees at-least-once delivery.
        ++delivery_tag;
        delivery_record.emplace_hint(delivery_record.end(), delivery_tag, next_message);

        /// Nothing is actually sent before the event loop runs, so leave the loop after every full batch.
        if (delivery_tag % BATCH == 0)
            break;
    }

    iterateEventLoop();
}
void WriteBufferToNATSProducer::writingFunc()
{
while ((!payloads.empty() || wait_all) && !shutdown_called.load())
{
/// If onReady callback is not received, producer->usable() will anyway return true,
/// but must publish only after onReady callback.
if (producer_ready)
{
/* Publish main paylods only when there are no returned messages. This way it is ensured that returned messages are republished
* as fast as possible and no new publishes are made before returned messages are handled
*/
if (!returned.empty() && producer_channel->usable())
publish(returned, true);
else if (!payloads.empty() && producer_channel->usable())
publish(payloads, false);
}
iterateEventLoop();
if (wait_num.load() && delivery_record.empty() && payloads.empty() && returned.empty())
wait_all = false;
else if (!producer_channel->usable())
{
if (connection.reconnect())
setupChannel();
}
}
LOG_DEBUG(log, "Producer on channel {} completed", channel_id);
}
/// WriteBuffer hook: switches writing to a freshly added chunk; already written chunks stay in `chunks`.
void WriteBufferToNATSProducer::nextImpl()
{
addChunk();
}
/// Appends a new chunk of chunk_size bytes to `chunks` and makes it the working buffer.
void WriteBufferToNATSProducer::addChunk()
{
    std::string & fresh_chunk = chunks.emplace_back(chunk_size, '\0');
    set(fresh_chunk.data(), chunk_size);
}
/// Resets the row counter, drops all accumulated chunks and starts again with a single empty chunk.
void WriteBufferToNATSProducer::reinitializeChunks()
{
rows = 0;
chunks.clear();
/// We cannot leave the buffer in an undefined state (i.e. without any
/// underlying buffer), since in this case WriteBuffer::next() will
/// not call our nextImpl() (due to available() == 0)
addChunk();
}
/// Delegates to iterateLoop() of the connection's handler.
void WriteBufferToNATSProducer::iterateEventLoop()
{
connection.getHandler().iterateLoop();
}
}

View File

@ -0,0 +1,121 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <Columns/IColumn.h>
#include <list>
#include <mutex>
#include <atomic>
#include <amqpcpp.h>
#include <Storages/NATS/NATSConnection.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Core/BackgroundSchedulePool.h>
#include <Core/Names.h>
namespace DB
{
/** WriteBuffer that collects written data in memory chunks, cuts it into payloads every `max_rows` rows
  * (see countRow) and publishes the payloads from a background task (see writingFunc),
  * keeping track of server confirmations so that unconfirmed messages can be republished.
  */
class WriteBufferToNATSProducer : public WriteBuffer
{
public:
WriteBufferToNATSProducer(
const NATSConfiguration & configuration_,
ContextPtr global_context,
const Names & routing_keys_,
const String & exchange_name_,
const AMQP::ExchangeType exchange_type_,
const size_t channel_id_base_,
const bool persistent_,
std::atomic<bool> & shutdown_called_,
Poco::Logger * log_,
std::optional<char> delimiter,
size_t rows_per_message,
size_t chunk_size_
);
~WriteBufferToNATSProducer() override;
/// Must be called after each written row; forms a payload once `max_rows` rows are accumulated.
void countRow();
/// Activates and schedules the background writing task.
void activateWriting() { writing_task->activateAndSchedule(); }
/// Stores the current payload counter into wait_num (see the comment on wait_num).
void updateMaxWait() { wait_num.store(payload_counter); }
private:
void nextImpl() override;
void addChunk();
void reinitializeChunks();
void iterateEventLoop();
void writingFunc();
void setupChannel();
void removeRecord(UInt64 received_delivery_tag, bool multiple, bool republish);
void publish(ConcurrentBoundedQueue<std::pair<UInt64, String>> & message, bool republishing);
NATSConnection connection;
const Names routing_keys;
const String exchange_name;
AMQP::ExchangeType exchange_type;
const String channel_id_base; /// Serial number of current producer buffer
const bool persistent;
/* Reference to a flag passed to the constructor.
* true: shutdown was requested, writingFunc() leaves its loop; needed because table might be dropped before all acks are received
* false: in all other cases
*/
std::atomic<bool> & shutdown_called;
/// Filled in the constructor from "key=value" routing keys when a headers exchange is used; copied into the headers of each message.
AMQP::Table key_arguments;
BackgroundSchedulePool::TaskHolder writing_task;
std::unique_ptr<AMQP::TcpChannel> producer_channel;
/// Set in the channel's onReady callback, reset in onError; writingFunc() publishes only while it is true.
bool producer_ready = false;
/// Channel errors lead to channel closure, need to count number of recreated channels to update channel id
UInt64 channel_id_counter = 0;
/// channel id which contains id of current producer buffer and serial number of recreated channel in this buffer
String channel_id;
/* payloads.queue:
* - payloads are pushed to queue in countRow and popped by another thread in writingFunc, each payload gets into queue only once
* returned.queue:
* - payloads are pushed to queue:
* 1) inside channel->onError() callback if channel becomes unusable and the record of pending acknowledgements from server
* is non-empty.
* 2) inside removeRecord() if received nack() - negative acknowledgement from the server that message failed to be written
* to disk or it was unable to reach the queue.
* - payloads are popped from the queue once republished
*/
ConcurrentBoundedQueue<std::pair<UInt64, String>> payloads, returned;
/* Counter of current delivery on a current channel. Delivery tags are scoped per channel. The server attaches a delivery tag for each
* published message - a serial number of delivery on current channel. Delivery tag is a way of server to notify publisher if it was
* able or unable to process delivery, i.e. it sends back a response with a corresponding delivery tag.
*/
UInt64 delivery_tag = 0;
/* false: message delivery successfully ended: publisher received confirm from server that all published
* 1) persistent messages were written to disk
* 2) non-persistent messages reached the queue
* true: continue to process deliveries and returned messages
*/
bool wait_all = true;
/* zero: until updateMaxWait() is called
* non-zero: value of payload_counter at the moment of that call; lets writingFunc() reset wait_all
* once delivery_record, payloads and returned are all empty
*/
std::atomic<UInt64> wait_num = 0;
/// Needed to fill messageID property
UInt64 payload_counter = 0;
/// Record of pending acknowledgements from the server; its size never exceeds size of returned.queue
std::map<UInt64, std::pair<UInt64, String>> delivery_record;
Poco::Logger * log;
/// Optional row delimiter; a trailing one is cut from the payload in countRow().
const std::optional<char> delim;
/// Number of rows per payload.
const size_t max_rows;
/// Size in bytes of every element of `chunks`.
const size_t chunk_size;
/// Rows written since the last payload was formed.
size_t rows = 0;
/// Data written since the last payload was formed; the last element is the current working buffer.
std::list<std::string> chunks;
};
}