diff --git a/.gitmodules b/.gitmodules
index 5fd9e9721f6..412fefea899 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -265,3 +265,6 @@
 [submodule "contrib/wyhash"]
 	path = contrib/wyhash
 	url = https://github.com/wangyi-fudan/wyhash.git
+[submodule "contrib/nats-io"]
+	path = contrib/nats-io
+	url = https://github.com/nats-io/nats.c.git
diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt
index 94003f3b3ee..b6fe9da167d 100644
--- a/contrib/CMakeLists.txt
+++ b/contrib/CMakeLists.txt
@@ -132,6 +132,7 @@ add_contrib (krb5-cmake krb5)
 add_contrib (cyrus-sasl-cmake cyrus-sasl) # for krb5
 add_contrib (libgsasl-cmake libgsasl) # requires krb5
 add_contrib (librdkafka-cmake librdkafka) # requires: libgsasl
+add_contrib (nats-io-cmake nats-io)
 add_contrib (libhdfs3-cmake libhdfs3) # requires: protobuf, krb5
 add_contrib (hive-metastore-cmake hive-metastore) # requires: thrift/avro/arrow/libhdfs3
 add_contrib (cppkafka-cmake cppkafka)
diff --git a/contrib/nats-io b/contrib/nats-io
new file mode 160000
index 00000000000..d1f59f7bcf8
--- /dev/null
+++ b/contrib/nats-io
@@ -0,0 +1 @@
+Subproject commit d1f59f7bcf8465526f7e6d9c99982cbd6b209547
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 20db948abd0..4e364629f24 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -107,6 +107,10 @@ if (TARGET ch_contrib::rdkafka)
     add_headers_and_sources(dbms Storages/Kafka)
 endif()
 
+if (TARGET ch_contrib::nats-io)
+    add_headers_and_sources(dbms Storages/NATS)
+endif()
+
 if (TARGET ch_contrib::amqp_cpp)
     add_headers_and_sources(dbms Storages/RabbitMQ)
 endif()
@@ -380,6 +384,10 @@ if (TARGET ch_contrib::rdkafka)
     dbms_target_link_libraries(PRIVATE ch_contrib::rdkafka ch_contrib::cppkafka)
 endif()
 
+if (TARGET ch_contrib::nats-io)
+    dbms_target_link_libraries(PRIVATE ch_contrib::nats-io)
+endif()
+
 if (TARGET ch_contrib::sasl2)
     dbms_target_link_libraries(PRIVATE ch_contrib::sasl2)
 endif()
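As orientation for reviewers, the nats.c client vendored above exposes a small C API; the storage code in this patch wraps essentially these calls. A standalone sketch against nats.c only (URL and subject are example values, not anything this patch defines):

```cpp
#include <nats/nats.h>
#include <stdio.h>

int main(void)
{
    natsConnection * nc = NULL;
    natsSubscription * sub = NULL;
    natsMsg * msg = NULL;

    /// Connect, subscribe, publish to ourselves, then read the message back.
    if (natsConnection_ConnectTo(&nc, "nats://localhost:4222") != NATS_OK)
        return 1;
    natsConnection_SubscribeSync(&sub, nc, "clickhouse.demo");
    natsConnection_PublishString(nc, "clickhouse.demo", "hello");

    if (natsSubscription_NextMsg(&msg, sub, 1000 /* ms timeout */) == NATS_OK)
    {
        printf("%s\n", natsMsg_GetData(msg));
        natsMsg_Destroy(msg);
    }

    natsSubscription_Destroy(sub);
    natsConnection_Destroy(nc);
    nats_Close();
    return 0;
}
```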
diff --git a/src/Storages/NATS/Buffer_fwd.h b/src/Storages/NATS/Buffer_fwd.h
new file mode 100644
index 00000000000..caf7b8cfdbe
--- /dev/null
+++ b/src/Storages/NATS/Buffer_fwd.h
@@ -0,0 +1,14 @@
+#pragma once
+
+#include <memory>
+
+namespace DB
+{
+
+class ReadBufferFromNATSConsumer;
+using ConsumerBufferPtr = std::shared_ptr<ReadBufferFromNATSConsumer>;
+
+class WriteBufferToNATSProducer;
+using ProducerBufferPtr = std::shared_ptr<WriteBufferToNATSProducer>;
+
+}
diff --git a/src/Storages/NATS/NATSConnection.cpp b/src/Storages/NATS/NATSConnection.cpp
new file mode 100644
index 00000000000..7b0abba7535
--- /dev/null
+++ b/src/Storages/NATS/NATSConnection.cpp
@@ -0,0 +1,121 @@
+#include "NATSConnection.h"
+
+#include <Common/logger_useful.h>
+#include <IO/WriteHelpers.h>
+
+
+namespace DB
+{
+
+static const auto CONNECT_SLEEP = 200;
+static const auto RETRIES_MAX = 20;
+
+
+NATSConnection::NATSConnection(const NATSConfiguration & configuration_, Poco::Logger * log_)
+    : configuration(configuration_)
+    , log(log_)
+    , event_handler(loop.getLoop(), log)
+{
+}
+
+String NATSConnection::connectionInfoForLog() const
+{
+    return configuration.host + ':' + toString(configuration.port);
+}
+
+bool NATSConnection::isConnected()
+{
+    std::lock_guard lock(mutex);
+    return isConnectedImpl();
+}
+
+bool NATSConnection::connect()
+{
+    std::lock_guard lock(mutex);
+    connectImpl();
+    return isConnectedImpl();
+}
+
+bool NATSConnection::reconnect()
+{
+    std::lock_guard lock(mutex);
+    if (isConnectedImpl())
+        return true;
+
+    /// disconnectImpl() returns only after the previous connection is fully closed.
+    disconnectImpl();
+
+    LOG_DEBUG(log, "Trying to restore connection to {}", connectionInfoForLog());
+    connectImpl();
+
+    return isConnectedImpl();
+}
+
+SubscriptionPtr NATSConnection::createSubscription(const std::string & subject)
+{
+    std::lock_guard lock(mutex);
+    natsSubscription * ns = nullptr;
+    status = natsConnection_SubscribeSync(&ns, connection, subject.c_str());
+    return SubscriptionPtr(ns, &natsSubscription_Destroy);
+}
+
+void NATSConnection::disconnect()
+{
+    std::lock_guard lock(mutex);
+    disconnectImpl();
+}
+
+bool NATSConnection::closed()
+{
+    std::lock_guard lock(mutex);
+    return natsConnection_IsClosed(connection);
+}
+
+bool NATSConnection::isConnectedImpl() const
+{
+    return event_handler.connectionRunning() && !natsConnection_IsClosed(connection);
+}
+
+void NATSConnection::connectImpl()
+{
+    natsOptions * options = event_handler.getOptions();
+
+    if (configuration.connection_string.empty())
+    {
+        LOG_DEBUG(log, "Connecting to: {}:{} (user: {})", configuration.host, configuration.port, configuration.username);
+        natsOptions_SetURL(options, connectionInfoForLog().c_str());
+        natsOptions_SetUserInfo(options, configuration.username.c_str(), configuration.password.c_str());
+    }
+    else
+    {
+        natsOptions_SetURL(options, configuration.connection_string.c_str());
+    }
+
+    if (configuration.secure)
+        natsOptions_SetSecure(options, true);
+
+    size_t cnt_retries = 0;
+    while (true)
+    {
+        status = natsConnection_Connect(&connection, options);
+
+        if (status == NATS_OK || ++cnt_retries == RETRIES_MAX)
+            break;
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
+    }
+}
+
+void NATSConnection::disconnectImpl()
+{
+    natsConnection_Close(connection);
+
+    /** The connection is not closed immediately: pending operations are completed first and the
+      * server is notified. A new connection cannot be opened until the previous one is properly closed.
+      * Note: natsConnection_IsClosed is used directly here instead of closed(), which would try to
+      * re-take the (non-recursive) mutex already held by the public callers.
+      */
+    size_t cnt_retries = 0;
+    while (!natsConnection_IsClosed(connection) && cnt_retries++ != RETRIES_MAX)
+        event_handler.iterateLoop();
+}
+
+}
diff --git a/src/Storages/NATS/NATSConnection.h b/src/Storages/NATS/NATSConnection.h
new file mode 100644
index 00000000000..2c999e873aa
--- /dev/null
+++ b/src/Storages/NATS/NATSConnection.h
@@ -0,0 +1,68 @@
+#pragma once
+
+#include <Storages/NATS/NATSHandler.h>
+#include <Storages/UVLoop.h>
+
+
+namespace DB
+{
+
+struct NATSConfiguration
+{
+    String url;
+    String host;
+    UInt16 port;
+    String username;
+    String password;
+    String vhost;
+
+    bool secure;
+    String connection_string;
+};
+
+using SubscriptionPtr = std::unique_ptr<natsSubscription, decltype(&natsSubscription_Destroy)>;
+
+class NATSConnection
+{
+public:
+    NATSConnection(const NATSConfiguration & configuration_, Poco::Logger * log_);
+
+    bool isConnected();
+
+    bool connect();
+
+    bool reconnect();
+
+    void disconnect();
+
+    bool closed();
+
+    SubscriptionPtr createSubscription(const std::string & subject);
+
+    /// NATSHandler is thread safe. Any public methods can be called concurrently.
+    NATSHandler & getHandler() { return event_handler; }
+
+    String connectionInfoForLog() const;
+
+private:
+    bool isConnectedImpl() const;
+
+    void connectImpl();
+
+    void disconnectImpl();
+
+    NATSConfiguration configuration;
+    Poco::Logger * log;
+
+    UVLoop loop;
+    NATSHandler event_handler;
+
+    natsConnection * connection = nullptr;
+    natsStatus status = NATS_OK;
+    std::mutex mutex;
+};
+
+using NATSConnectionPtr = std::unique_ptr<NATSConnection>;
+
+}
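Worth noting how the SubscriptionPtr alias pairs the raw natsSubscription * with its destroy function, which is what keeps createSubscription() leak-free. A minimal standalone sketch of the same RAII pattern, assuming only nats.c:

```cpp
#include <nats/nats.h>
#include <memory>

using SubscriptionPtr = std::unique_ptr<natsSubscription, decltype(&natsSubscription_Destroy)>;

SubscriptionPtr subscribeSync(natsConnection * nc, const char * subject)
{
    natsSubscription * sub = nullptr;
    natsConnection_SubscribeSync(&sub, nc, subject);
    /// The deleter runs natsSubscription_Destroy when the pointer goes out of scope.
    return SubscriptionPtr(sub, &natsSubscription_Destroy);
}
```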
diff --git a/src/Storages/NATS/NATSHandler.cpp b/src/Storages/NATS/NATSHandler.cpp
new file mode 100644
index 00000000000..7fb8ff38c47
--- /dev/null
+++ b/src/Storages/NATS/NATSHandler.cpp
@@ -0,0 +1,67 @@
+#include <Storages/NATS/NATSHandler.h>
+#include <adapters/libuv.h>
+#include <Common/logger_useful.h>
+
+namespace DB
+{
+
+/* The object of this class is shared between concurrent consumers (who share the same connection == share the same
+ * event loop and handler).
+ */
+NATSHandler::NATSHandler(uv_loop_t * loop_, Poco::Logger * log_)
+    : loop(loop_)
+    , log(log_)
+    , connection_running(false)
+    , loop_running(false)
+    , loop_state(Loop::STOP)
+{
+    natsLibuv_Init();
+    natsLibuv_SetThreadLocalLoop(loop);
+    natsOptions_Create(&opts);
+    status = natsOptions_SetEventLoop(
+        opts, static_cast<void *>(loop),
+        natsLibuv_Attach,
+        natsLibuv_Read,
+        natsLibuv_Write,
+        natsLibuv_Detach);
+}
+
+void NATSHandler::startLoop()
+{
+    std::lock_guard lock(startup_mutex);
+
+    LOG_DEBUG(log, "Background loop started");
+    loop_running.store(true);
+
+    while (loop_state.load() == Loop::RUN)
+        uv_run(loop, UV_RUN_NOWAIT);
+
+    LOG_DEBUG(log, "Background loop ended");
+    loop_running.store(false);
+}
+
+void NATSHandler::iterateLoop()
+{
+    std::unique_lock lock(startup_mutex, std::defer_lock);
+    if (lock.try_lock())
+        uv_run(loop, UV_RUN_NOWAIT);
+}
+
+/// No synchronization is needed here, unlike in iterateLoop(), because this method is used only for
+/// initial NATS setup - at that point there is no background loop thread yet.
+void NATSHandler::startBlockingLoop()
+{
+    LOG_DEBUG(log, "Started blocking loop.");
+    uv_run(loop, UV_RUN_DEFAULT);
+}
+
+void NATSHandler::stopLoop()
+{
+    LOG_DEBUG(log, "Implicit loop stop.");
+    uv_stop(loop);
+}
+
+NATSHandler::~NATSHandler()
+{
+    natsOptions_Destroy(opts);
+}
+
+}
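A simplified sketch of how the three loop entry points are meant to interact (illustrative only; in ClickHouse the worker is a BackgroundSchedulePool task rather than a raw std::thread):

```cpp
#include <thread>

void runLoopLifecycle(DB::NATSHandler & handler)
{
    handler.updateLoopState(DB::Loop::RUN);

    /// Background worker: spins uv_run(UV_RUN_NOWAIT) until the state flips to STOP.
    std::thread worker([&] { handler.startLoop(); });

    /// Foreground callers may briefly drive the loop too; iterateLoop() only runs
    /// if it can take the startup mutex, so it never races the worker.
    handler.iterateLoop();

    handler.updateLoopState(DB::Loop::STOP); /// startLoop() observes STOP and returns
    worker.join();
}
```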
diff --git a/src/Storages/NATS/NATSHandler.h b/src/Storages/NATS/NATSHandler.h
new file mode 100644
index 00000000000..12ea3454b9d
--- /dev/null
+++ b/src/Storages/NATS/NATSHandler.h
@@ -0,0 +1,66 @@
+#pragma once
+
+#include <atomic>
+#include <memory>
+#include <mutex>
+#include <uv.h>
+#include <nats/nats.h>
+#include <base/types.h>
+#include <Poco/Logger.h>
+
+namespace DB
+{
+
+namespace Loop
+{
+    static const UInt8 RUN = 1;
+    static const UInt8 STOP = 2;
+}
+
+class NATSHandler
+{
+public:
+    NATSHandler(uv_loop_t * loop_, Poco::Logger * log_);
+
+    ~NATSHandler();
+
+    /// Loop for background thread worker.
+    void startLoop();
+
+    /// Loop to wait for small tasks in a non-blocking mode.
+    /// Adds synchronization with main background loop.
+    void iterateLoop();
+
+    /// Loop to wait for small tasks in a blocking mode.
+    /// No synchronization is done with the main loop thread.
+    void startBlockingLoop();
+
+    void stopLoop();
+
+    bool connectionRunning() const { return connection_running.load(); }
+    bool loopRunning() const { return loop_running.load(); }
+
+    void updateLoopState(UInt8 state) { loop_state.store(state); }
+    UInt8 getLoopState() { return loop_state.load(); }
+
+    natsStatus getStatus() { return status; }
+    natsOptions * getOptions() { return opts; }
+
+private:
+    uv_loop_t * loop;
+    natsOptions * opts = nullptr;
+    natsStatus status = NATS_OK;
+    Poco::Logger * log;
+
+    std::atomic<bool> connection_running, loop_running;
+    std::atomic<UInt8> loop_state;
+    std::mutex startup_mutex;
+};
+
+using NATSHandlerPtr = std::shared_ptr<NATSHandler>;
+
+}
diff --git a/src/Storages/NATS/NATSSettings.cpp b/src/Storages/NATS/NATSSettings.cpp
new file mode 100644
index 00000000000..11659b105b2
--- /dev/null
+++ b/src/Storages/NATS/NATSSettings.cpp
@@ -0,0 +1,39 @@
+#include <Storages/NATS/NATSSettings.h>
+#include <Parsers/ASTCreateQuery.h>
+#include <Parsers/ASTFunction.h>
+#include <Parsers/ASTSetQuery.h>
+#include <Common/Exception.h>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int UNKNOWN_SETTING;
+}
+
+IMPLEMENT_SETTINGS_TRAITS(NATSSettingsTraits, LIST_OF_NATS_SETTINGS)
+
+void NATSSettings::loadFromQuery(ASTStorage & storage_def)
+{
+    if (storage_def.settings)
+    {
+        try
+        {
+            applyChanges(storage_def.settings->changes);
+        }
+        catch (Exception & e)
+        {
+            if (e.code() == ErrorCodes::UNKNOWN_SETTING)
+                e.addMessage("for storage " + storage_def.engine->name);
+            throw;
+        }
+    }
+    else
+    {
+        auto settings_ast = std::make_shared<ASTSetQuery>();
+        settings_ast->is_standalone = false;
+        storage_def.set(storage_def.settings, settings_ast);
+    }
+}
+}
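For readers unfamiliar with the settings-traits machinery: each M(...) entry in the list below expands into a typed field with .value and .changed members. A hypothetical usage sketch (consumersFromQuery is an invented name for illustration):

```cpp
#include <Storages/NATS/NATSSettings.h>

UInt64 consumersFromQuery(DB::ASTStorage & storage_def)
{
    DB::NATSSettings settings;
    settings.loadFromQuery(storage_def);        /// applies SETTINGS ... from the CREATE TABLE query

    /// Every M(...) entry becomes a typed member: .value holds the setting,
    /// .changed says whether the query set it explicitly.
    if (!settings.nats_format.changed)
        return 0;                               /// the engine itself throws in this case

    return settings.nats_num_consumers.value;   /// UInt64, default 1
}
```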
diff --git a/src/Storages/NATS/NATSSettings.h b/src/Storages/NATS/NATSSettings.h
new file mode 100644
index 00000000000..5f4051e06d1
--- /dev/null
+++ b/src/Storages/NATS/NATSSettings.h
@@ -0,0 +1,45 @@
+#pragma once
+
+#include <Core/BaseSettings.h>
+#include <Core/Settings.h>
+
+namespace DB
+{
+    class ASTStorage;
+
+
+#define NATS_RELATED_SETTINGS(M) \
+    M(String, nats_host_port, "", "A host-port to connect to NATS server.", 0) \
+    M(String, nats_exchange_name, "clickhouse-exchange", "The exchange name, to which messages are sent.", 0) \
+    M(String, nats_format, "", "The message format.", 0) \
+    M(String, nats_exchange_type, "default", "The exchange type.", 0) \
+    M(String, nats_routing_key_list, "", "A string of routing keys, separated by commas.", 0) \
+    M(Char, nats_row_delimiter, '\0', "The character to be considered as a delimiter.", 0) \
+    M(String, nats_schema, "", "Schema identifier (used by schema-based formats) for NATS engine", 0) \
+    M(UInt64, nats_num_consumers, 1, "The number of consumer channels per table.", 0) \
+    M(UInt64, nats_num_queues, 1, "The number of queues per consumer.", 0) \
+    M(String, nats_queue_base, "", "Base for queue names to be able to reopen non-empty queues in case of failure.", 0) \
+    M(Bool, nats_persistent, false, "For insert query messages will be made 'persistent', durable.", 0) \
+    M(Bool, nats_secure, false, "Use SSL connection", 0) \
+    M(String, nats_address, "", "Address for connection", 0) \
+    M(UInt64, nats_skip_broken_messages, 0, "Skip at least this number of broken messages from NATS per block", 0) \
+    M(UInt64, nats_max_block_size, 0, "Number of rows collected before flushing data from NATS.", 0) \
+    M(Milliseconds, nats_flush_interval_ms, 0, "Timeout for flushing data from NATS.", 0) \
+    M(String, nats_vhost, "/", "NATS vhost.", 0) \
+    M(String, nats_queue_settings_list, "", "A list of NATS queue settings", 0) \
+    M(Bool, nats_queue_consume, false, "Use user-defined queues and do not make any NATS setup: declaring exchanges, queues, bindings", 0) \
+    M(String, nats_username, "", "NATS username", 0) \
+    M(String, nats_password, "", "NATS password", 0) \
+    M(Bool, nats_commit_on_select, false, "Commit messages when select query is made", 0) \
+
+#define LIST_OF_NATS_SETTINGS(M) \
+    NATS_RELATED_SETTINGS(M) \
+    FORMAT_FACTORY_SETTINGS(M)
+
+DECLARE_SETTINGS_TRAITS(NATSSettingsTraits, LIST_OF_NATS_SETTINGS)
+
+struct NATSSettings : public BaseSettings<NATSSettingsTraits>
+{
+    void loadFromQuery(ASTStorage & storage_def);
+};
+}
diff --git a/src/Storages/NATS/NATSSink.cpp b/src/Storages/NATS/NATSSink.cpp
new file mode 100644
index 00000000000..db2620d9ed2
--- /dev/null
+++ b/src/Storages/NATS/NATSSink.cpp
@@ -0,0 +1,56 @@
+#include <Storages/NATS/NATSSink.h>
+#include <Storages/NATS/StorageNATS.h>
+#include <Storages/NATS/WriteBufferToNATSProducer.h>
+#include <Storages/NATS/Buffer_fwd.h>
+#include <Formats/FormatFactory.h>
+#include <Processors/Formats/IOutputFormat.h>
+
+
+namespace DB
+{
+
+NATSSink::NATSSink(
+    StorageNATS & storage_,
+    const StorageMetadataPtr & metadata_snapshot_,
+    ContextPtr context_)
+    : SinkToStorage(metadata_snapshot_->getSampleBlockNonMaterialized())
+    , storage(storage_)
+    , metadata_snapshot(metadata_snapshot_)
+    , context(context_)
+{
+    storage.unbindExchange();
+}
+
+
+void NATSSink::onStart()
+{
+    buffer = storage.createWriteBuffer();
+    buffer->activateWriting();
+
+    auto format_settings = getFormatSettings(context);
+    format_settings.protobuf.allow_multiple_rows_without_delimiter = true;
+
+    format = FormatFactory::instance().getOutputFormat(storage.getFormatName(), *buffer, getHeader(), context,
+        [this](const Columns & /* columns */, size_t /* rows */)
+        {
+            buffer->countRow();
+        },
+        format_settings);
+}
+
+
+void NATSSink::consume(Chunk chunk)
+{
+    format->write(getHeader().cloneWithColumns(chunk.detachColumns()));
+}
+
+
+void NATSSink::onFinish()
+{
+    format->finalize();
+
+    if (buffer)
+        buffer->updateMaxWait();
+}
+
+}
diff --git a/src/Storages/NATS/NATSSink.h b/src/Storages/NATS/NATSSink.h
new file mode 100644
index 00000000000..2f887dd3a4f
--- /dev/null
+++ b/src/Storages/NATS/NATSSink.h
@@ -0,0 +1,32 @@
+#pragma once
+
+#include <Processors/Sinks/SinkToStorage.h>
+#include <Storages/NATS/StorageNATS.h>
+
+
+namespace DB
+{
+
+class IOutputFormat;
+using IOutputFormatPtr = std::shared_ptr<IOutputFormat>;
+
+class NATSSink : public SinkToStorage
+{
+public:
+    explicit NATSSink(StorageNATS & storage_, const StorageMetadataPtr & metadata_snapshot_, ContextPtr context_);
+
+    void onStart() override;
+    void consume(Chunk chunk) override;
+    void onFinish() override;
+
+    String getName() const override { return "NATSSink"; }
+
+private:
+    StorageNATS & storage;
+    StorageMetadataPtr metadata_snapshot;
+    ContextPtr context;
+    ProducerBufferPtr buffer;
+    IOutputFormatPtr format;
+};
+}
diff --git a/src/Storages/NATS/NATSSource.cpp b/src/Storages/NATS/NATSSource.cpp
new file mode 100644
index 00000000000..046b8792ced
--- /dev/null
+++ b/src/Storages/NATS/NATSSource.cpp
@@ -0,0 +1,189 @@
+#include <Storages/NATS/NATSSource.h>
+
+#include <Formats/FormatFactory.h>
+#include <Processors/Executors/StreamingFormatExecutor.h>
+#include <Storages/NATS/ReadBufferFromNATSConsumer.h>
+#include <Interpreters/Context.h>
+
+namespace DB
+{
+
+static std::pair<Block, Block> getHeaders(const StorageSnapshotPtr & storage_snapshot)
+{
+    auto non_virtual_header = storage_snapshot->metadata->getSampleBlockNonMaterialized();
+    auto virtual_header = storage_snapshot->getSampleBlockForColumns(
+        {"_exchange_name", "_channel_id", "_delivery_tag", "_redelivered", "_message_id", "_timestamp"});
+
+    return {non_virtual_header, virtual_header};
+}
+
+static Block getSampleBlock(const Block & non_virtual_header, const Block & virtual_header)
+{
+    auto header = non_virtual_header;
+    for (const auto & column : virtual_header)
+        header.insert(column);
+
+    return header;
+}
+
+NATSSource::NATSSource(
+    StorageNATS & storage_,
+    const StorageSnapshotPtr & storage_snapshot_,
+    ContextPtr
context_, + const Names & columns, + size_t max_block_size_, + bool ack_in_suffix_) + : NATSSource( + storage_, + storage_snapshot_, + getHeaders(storage_snapshot_), + context_, + columns, + max_block_size_, + ack_in_suffix_) +{ +} + +NATSSource::NATSSource( + StorageNATS & storage_, + const StorageSnapshotPtr & storage_snapshot_, + std::pair headers, + ContextPtr context_, + const Names & columns, + size_t max_block_size_, + bool ack_in_suffix_) + : SourceWithProgress(getSampleBlock(headers.first, headers.second)) + , storage(storage_) + , storage_snapshot(storage_snapshot_) + , context(context_) + , column_names(columns) + , max_block_size(max_block_size_) + , ack_in_suffix(ack_in_suffix_) + , non_virtual_header(std::move(headers.first)) + , virtual_header(std::move(headers.second)) +{ + storage.incrementReader(); +} + + +NATSSource::~NATSSource() +{ + storage.decrementReader(); + + if (!buffer) + return; + + storage.pushReadBuffer(buffer); +} + + +bool NATSSource::needChannelUpdate() +{ + if (!buffer) + return false; + + return buffer->needChannelUpdate(); +} + + +void NATSSource::updateChannel() +{ + if (!buffer) + return; + + buffer->updateAckTracker(); + + if (storage.updateChannel(buffer->getChannel())) + buffer->setupChannel(); +} + +Chunk NATSSource::generate() +{ + auto chunk = generateImpl(); + if (!chunk && ack_in_suffix) + sendAck(); + + return chunk; +} + +Chunk NATSSource::generateImpl() +{ + if (!buffer) + { + auto timeout = std::chrono::milliseconds(context->getSettingsRef().rabbitmq_max_wait_ms.totalMilliseconds()); + buffer = storage.popReadBuffer(timeout); + } + + if (!buffer || is_finished) + return {}; + + is_finished = true; + + MutableColumns virtual_columns = virtual_header.cloneEmptyColumns(); + auto input_format = FormatFactory::instance().getInputFormat( + storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size); + + StreamingFormatExecutor executor(non_virtual_header, input_format); + + size_t total_rows = 0; + + while (true) + { + if (buffer->eof()) + break; + + auto new_rows = executor.execute(); + + if (new_rows) + { + auto exchange_name = storage.getExchange(); + auto channel_id = buffer->getChannelID(); + auto delivery_tag = buffer->getDeliveryTag(); + auto redelivered = buffer->getRedelivered(); + auto message_id = buffer->getMessageID(); + auto timestamp = buffer->getTimestamp(); + + buffer->updateAckTracker({delivery_tag, channel_id}); + + for (size_t i = 0; i < new_rows; ++i) + { + virtual_columns[0]->insert(exchange_name); + virtual_columns[1]->insert(channel_id); + virtual_columns[2]->insert(delivery_tag); + virtual_columns[3]->insert(redelivered); + virtual_columns[4]->insert(message_id); + virtual_columns[5]->insert(timestamp); + } + + total_rows = total_rows + new_rows; + } + + buffer->allowNext(); + + if (total_rows >= max_block_size || buffer->queueEmpty() || buffer->isConsumerStopped() || !checkTimeLimit()) + break; + } + + if (total_rows == 0) + return {}; + + auto result_columns = executor.getResultColumns(); + for (auto & column : virtual_columns) + result_columns.push_back(std::move(column)); + + return Chunk(std::move(result_columns), total_rows); +} + + +bool NATSSource::sendAck() +{ + if (!buffer) + return false; + + if (!buffer->ackMessages()) + return false; + + return true; +} + +} diff --git a/src/Storages/NATS/NATSSource.h b/src/Storages/NATS/NATSSource.h new file mode 100644 index 00000000000..ce48e5cf382 --- /dev/null +++ b/src/Storages/NATS/NATSSource.h @@ -0,0 +1,61 @@ +#pragma once + +#include +#include 
+#include + + +namespace DB +{ + +class NATSSource : public SourceWithProgress +{ + +public: + NATSSource( + StorageNATS & storage_, + const StorageSnapshotPtr & storage_snapshot_, + ContextPtr context_, + const Names & columns, + size_t max_block_size_, + bool ack_in_suffix = false); + + ~NATSSource() override; + + String getName() const override { return storage.getName(); } + ConsumerBufferPtr getBuffer() { return buffer; } + + Chunk generate() override; + + bool queueEmpty() const { return !buffer || buffer->queueEmpty(); } + bool needChannelUpdate(); + void updateChannel(); + bool sendAck(); + +private: + StorageNATS & storage; + StorageSnapshotPtr storage_snapshot; + ContextPtr context; + Names column_names; + const size_t max_block_size; + bool ack_in_suffix; + + bool is_finished = false; + const Block non_virtual_header; + const Block virtual_header; + + ConsumerBufferPtr buffer; + + NATSSource( + StorageNATS & storage_, + const StorageSnapshotPtr & storage_snapshot_, + std::pair headers, + ContextPtr context_, + const Names & columns, + size_t max_block_size_, + bool ack_in_suffix); + + Chunk generateImpl(); +}; + +} diff --git a/src/Storages/NATS/ReadBufferFromNATSConsumer.cpp b/src/Storages/NATS/ReadBufferFromNATSConsumer.cpp new file mode 100644 index 00000000000..ad966401cc3 --- /dev/null +++ b/src/Storages/NATS/ReadBufferFromNATSConsumer.cpp @@ -0,0 +1,188 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "Poco/Timer.h" +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +ReadBufferFromNATSConsumer::ReadBufferFromNATSConsumer( + NATSHandler & event_handler_, + std::vector & queues_, + size_t channel_id_base_, + const String & channel_base_, + Poco::Logger * log_, + char row_delimiter_, + uint32_t queue_size_, + const std::atomic & stopped_) + : ReadBuffer(nullptr, 0) + , event_handler(event_handler_) + , queues(queues_) + , channel_base(channel_base_) + , channel_id_base(channel_id_base_) + , log(log_) + , row_delimiter(row_delimiter_) + , stopped(stopped_) + , received(queue_size_) +{ +} + + +ReadBufferFromNATSConsumer::~ReadBufferFromNATSConsumer() +{ + BufferBase::set(nullptr, 0, 0); +} + + +void ReadBufferFromNATSConsumer::subscribe() +{ + for (const auto & queue_name : queues) + { + consumer_channel->consume(queue_name) + .onSuccess([&](const std::string & /* consumer_tag */) + { + LOG_TRACE(log, "Consumer on channel {} is subscribed to queue {}", channel_id, queue_name); + + if (++subscribed == queues.size()) + wait_subscription.store(false); + }) + .onReceived([&](const AMQP::Message & message, uint64_t delivery_tag, bool redelivered) + { + if (message.bodySize()) + { + String message_received = std::string(message.body(), message.body() + message.bodySize()); + if (row_delimiter != '\0') + message_received += row_delimiter; + + if (!received.push({message_received, message.hasMessageID() ? message.messageID() : "", + message.hasTimestamp() ? message.timestamp() : 0, + redelivered, AckTracker(delivery_tag, channel_id)})) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to received queue"); + } + }) + .onError([&](const char * message) + { + /* End up here either if channel ends up in an error state (then there will be resubscription) or consume call error, which + * arises from queue settings mismatch or queue level error, which should not happen as no one else is supposed to touch them + */ + LOG_ERROR(log, "Consumer failed on channel {}. 
Reason: {}", channel_id, message); + wait_subscription.store(false); + }); + } +} + + +bool ReadBufferFromNATSConsumer::ackMessages() +{ + AckTracker record_info = last_inserted_record_info; + + /* Do not send ack to server if message's channel is not the same as current running channel because delivery tags are scoped per + * channel, so if channel fails, all previous delivery tags become invalid + */ + if (record_info.channel_id == channel_id && record_info.delivery_tag && record_info.delivery_tag > prev_tag) + { + /// Commit all received messages with delivery tags from last committed to last inserted + if (!consumer_channel->ack(record_info.delivery_tag, AMQP::multiple)) + { + LOG_ERROR(log, "Failed to commit messages with delivery tags from last committed to {} on channel {}", + record_info.delivery_tag, channel_id); + return false; + } + + prev_tag = record_info.delivery_tag; + LOG_TRACE(log, "Consumer committed messages with deliveryTags up to {} on channel {}", record_info.delivery_tag, channel_id); + } + + return true; +} + + +void ReadBufferFromNATSConsumer::updateAckTracker(AckTracker record_info) +{ + if (record_info.delivery_tag && channel_error.load()) + return; + + if (!record_info.delivery_tag) + prev_tag = 0; + + last_inserted_record_info = record_info; +} + + +void ReadBufferFromNATSConsumer::setupChannel() +{ + if (!consumer_channel) + return; + + wait_subscription.store(true); + + consumer_channel->onReady([&]() + { + /* First number indicates current consumer buffer; second number indicates serial number of created channel for current buffer, + * i.e. if channel fails - another one is created and its serial number is incremented; channel_base is to guarantee that + * channel_id is unique for each table + */ + channel_id = std::to_string(channel_id_base) + "_" + std::to_string(channel_id_counter++) + "_" + channel_base; + LOG_TRACE(log, "Channel {} is created", channel_id); + + subscribed = 0; + subscribe(); + channel_error.store(false); + }); + + consumer_channel->onError([&](const char * message) + { + LOG_ERROR(log, "Channel {} error: {}", channel_id, message); + + channel_error.store(true); + wait_subscription.store(false); + }); +} + + +bool ReadBufferFromNATSConsumer::needChannelUpdate() +{ + if (wait_subscription) + return false; + + return channel_error || !consumer_channel || !consumer_channel->usable(); +} + + +void ReadBufferFromNATSConsumer::iterateEventLoop() +{ + event_handler.iterateLoop(); +} + + +bool ReadBufferFromNATSConsumer::nextImpl() +{ + if (stopped || !allowed) + return false; + + if (received.tryPop(current)) + { + auto * new_position = const_cast(current.message.data()); + BufferBase::set(new_position, current.message.size(), 0); + allowed = false; + + return true; + } + + return false; +} + +} diff --git a/src/Storages/NATS/ReadBufferFromNATSConsumer.h b/src/Storages/NATS/ReadBufferFromNATSConsumer.h new file mode 100644 index 00000000000..1206ee4e62d --- /dev/null +++ b/src/Storages/NATS/ReadBufferFromNATSConsumer.h @@ -0,0 +1,104 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace Poco +{ + class Logger; +} + +namespace DB +{ + +class ReadBufferFromNATSConsumer : public ReadBuffer +{ + +public: + ReadBufferFromNATSConsumer( + NATSHandler & event_handler_, + std::vector & queues_, + size_t channel_id_base_, + const String & channel_base_, + Poco::Logger * log_, + char row_delimiter_, + uint32_t queue_size_, + const std::atomic & stopped_); + + ~ReadBufferFromNATSConsumer() override; + + 
struct AckTracker + { + UInt64 delivery_tag; + String channel_id; + + AckTracker() = default; + AckTracker(UInt64 tag, String id) : delivery_tag(tag), channel_id(id) {} + }; + + struct MessageData + { + String message; + String message_id; + uint64_t timestamp = 0; + bool redelivered = false; + AckTracker track{}; + }; + + ChannelPtr & getChannel() { return consumer_channel; } + void setupChannel(); + bool needChannelUpdate(); + void closeChannel() + { + if (consumer_channel) + consumer_channel->close(); + } + + void updateQueues(std::vector & queues_) { queues = queues_; } + size_t queuesCount() { return queues.size(); } + + bool isConsumerStopped() { return stopped; } + bool ackMessages(); + void updateAckTracker(AckTracker record = AckTracker()); + + bool queueEmpty() { return received.empty(); } + void allowNext() { allowed = true; } // Allow to read next message. + + auto getChannelID() const { return current.track.channel_id; } + auto getDeliveryTag() const { return current.track.delivery_tag; } + auto getRedelivered() const { return current.redelivered; } + auto getMessageID() const { return current.message_id; } + auto getTimestamp() const { return current.timestamp; } + +private: + bool nextImpl() override; + + void subscribe(); + void iterateEventLoop(); + + ChannelPtr consumer_channel; + NATSHandler & event_handler; /// Used concurrently, but is thread safe. + std::vector queues; + const String channel_base; + const size_t channel_id_base; + Poco::Logger * log; + char row_delimiter; + bool allowed = true; + const std::atomic & stopped; + + String channel_id; + std::atomic channel_error = true, wait_subscription = false; + ConcurrentBoundedQueue received; + MessageData current; + size_t subscribed = 0; + + AckTracker last_inserted_record_info; + UInt64 prev_tag = 0, channel_id_counter = 0; +}; + +} diff --git a/src/Storages/NATS/StorageNATS.cpp b/src/Storages/NATS/StorageNATS.cpp new file mode 100644 index 00000000000..41ab2d1fc9b --- /dev/null +++ b/src/Storages/NATS/StorageNATS.cpp @@ -0,0 +1,1186 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +static const uint32_t QUEUE_SIZE = 100000; +static const auto MAX_FAILED_READ_ATTEMPTS = 10; +static const auto RESCHEDULE_MS = 500; +static const auto BACKOFF_TRESHOLD = 32000; +static const auto MAX_THREAD_WORK_DURATION_MS = 60000; + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; + extern const int BAD_ARGUMENTS; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int CANNOT_CONNECT_RABBITMQ; + extern const int CANNOT_BIND_RABBITMQ_EXCHANGE; + extern const int CANNOT_DECLARE_RABBITMQ_EXCHANGE; + extern const int CANNOT_REMOVE_RABBITMQ_EXCHANGE; + extern const int CANNOT_CREATE_RABBITMQ_QUEUE_BINDING; + extern const int QUERY_NOT_ALLOWED; +} + +namespace ExchangeType +{ + /// Note that default here means default by implementation and not by nats settings + static const String DEFAULT = "default"; + static const String FANOUT = "fanout"; + static const String DIRECT = "direct"; + static const String TOPIC = "topic"; + static const String HASH = "consistent_hash"; + static const String HEADERS = "headers"; +} + + +StorageNATS::StorageNATS( + const StorageID & table_id_, + ContextPtr context_, + const ColumnsDescription & columns_, + std::unique_ptr nats_settings_, + 
bool is_attach_) + : IStorage(table_id_) + , WithContext(context_->getGlobalContext()) + , nats_settings(std::move(nats_settings_)) + , exchange_name(getContext()->getMacros()->expand(nats_settings->nats_exchange_name)) + , format_name(getContext()->getMacros()->expand(nats_settings->nats_format)) + , exchange_type(defineExchangeType(getContext()->getMacros()->expand(nats_settings->nats_exchange_type))) + , routing_keys(parseSettings(getContext()->getMacros()->expand(nats_settings->nats_routing_key_list))) + , row_delimiter(nats_settings->nats_row_delimiter.value) + , schema_name(getContext()->getMacros()->expand(nats_settings->nats_schema)) + , num_consumers(nats_settings->nats_num_consumers.value) + , num_queues(nats_settings->nats_num_queues.value) + , queue_base(getContext()->getMacros()->expand(nats_settings->nats_queue_base)) + , queue_settings_list(parseSettings(getContext()->getMacros()->expand(nats_settings->nats_queue_settings_list))) + , persistent(nats_settings->nats_persistent.value) + , use_user_setup(nats_settings->nats_queue_consume.value) + , hash_exchange(num_consumers > 1 || num_queues > 1) + , log(&Poco::Logger::get("StorageNATS (" + table_id_.table_name + ")")) + , semaphore(0, num_consumers) + , unique_strbase(getRandomName()) + , queue_size(std::max(QUEUE_SIZE, static_cast(getMaxBlockSize()))) + , milliseconds_to_wait(RESCHEDULE_MS) + , is_attach(is_attach_) +{ + auto parsed_address = parseAddress(getContext()->getMacros()->expand(nats_settings->nats_host_port), 5672); + context_->getRemoteHostFilter().checkHostAndPort(parsed_address.first, toString(parsed_address.second)); + + auto nats_username = nats_settings->nats_username.value; + auto nats_password = nats_settings->nats_password.value; + configuration = + { + .host = parsed_address.first, + .port = parsed_address.second, + .username = nats_username.empty() ? getContext()->getConfigRef().getString("nats.username") : nats_username, + .password = nats_password.empty() ? getContext()->getConfigRef().getString("nats.password") : nats_password, + .vhost = getContext()->getConfigRef().getString("nats.vhost", getContext()->getMacros()->expand(nats_settings->nats_vhost)), + .secure = nats_settings->nats_secure.value, + .connection_string = getContext()->getMacros()->expand(nats_settings->nats_address) + }; + + if (configuration.secure) + SSL_library_init(); + + StorageInMemoryMetadata storage_metadata; + storage_metadata.setColumns(columns_); + setInMemoryMetadata(storage_metadata); + + nats_context = addSettings(getContext()); + nats_context->makeQueryContext(); + + if (queue_base.empty()) + { + /* Make sure that local exchange name is unique for each table and is not the same as client's exchange name. It also needs to + * be table-based and not just a random string, because local exchanges should be declared the same for same tables + */ + sharding_exchange = getTableBasedName(exchange_name, table_id_); + + /* By default without a specified queue name in queue's declaration - its name will be generated by the library, but its better + * to specify it unique for each table to reuse them once the table is recreated. So it means that queues remain the same for every + * table unless queue_base table setting is specified (which allows to register consumers to specific queues). 
Now this is a base + * for the names of later declared queues + */ + queue_base = getTableBasedName("", table_id_); + } + else + { + /* In case different tables are used to register multiple consumers to the same queues (so queues are shared between tables) and + * at the same time sharding exchange is needed (if there are multiple shared queues), then those tables also need to share + * sharding exchange and bridge exchange + */ + sharding_exchange = exchange_name + "_" + queue_base; + } + + bridge_exchange = sharding_exchange + "_bridge"; + + try + { + connection = std::make_unique(configuration, log); + if (connection->connect()) + initNATS(); + else if (!is_attach) + throw Exception(ErrorCodes::CANNOT_CONNECT_RABBITMQ, "Cannot connect to {}", connection->connectionInfoForLog()); + } + catch (...) + { + tryLogCurrentException(log); + if (!is_attach) + throw; + } + + /// One looping task for all consumers as they share the same connection == the same handler == the same event loop + looping_task = getContext()->getMessageBrokerSchedulePool().createTask("NATSLoopingTask", [this]{ loopingFunc(); }); + looping_task->deactivate(); + + streaming_task = getContext()->getMessageBrokerSchedulePool().createTask("NATSStreamingTask", [this]{ streamingToViewsFunc(); }); + streaming_task->deactivate(); + + connection_task = getContext()->getMessageBrokerSchedulePool().createTask("NATSConnectionTask", [this]{ connectionFunc(); }); + connection_task->deactivate(); +} + + +Names StorageNATS::parseSettings(String settings_list) +{ + Names result; + if (settings_list.empty()) + return result; + boost::split(result, settings_list, [](char c){ return c == ','; }); + for (String & key : result) + boost::trim(key); + + return result; +} + + +AMQP::ExchangeType StorageNATS::defineExchangeType(String exchange_type_) +{ + AMQP::ExchangeType type; + if (exchange_type_ != ExchangeType::DEFAULT) + { + if (exchange_type_ == ExchangeType::FANOUT) type = AMQP::ExchangeType::fanout; + else if (exchange_type_ == ExchangeType::DIRECT) type = AMQP::ExchangeType::direct; + else if (exchange_type_ == ExchangeType::TOPIC) type = AMQP::ExchangeType::topic; + else if (exchange_type_ == ExchangeType::HASH) type = AMQP::ExchangeType::consistent_hash; + else if (exchange_type_ == ExchangeType::HEADERS) type = AMQP::ExchangeType::headers; + else throw Exception("Invalid exchange type", ErrorCodes::BAD_ARGUMENTS); + } + else + { + type = AMQP::ExchangeType::fanout; + } + + return type; +} + + +String StorageNATS::getTableBasedName(String name, const StorageID & table_id) +{ + if (name.empty()) + return fmt::format("{}_{}", table_id.database_name, table_id.table_name); + else + return fmt::format("{}_{}_{}", name, table_id.database_name, table_id.table_name); +} + + +ContextMutablePtr StorageNATS::addSettings(ContextPtr local_context) const +{ + auto modified_context = Context::createCopy(local_context); + modified_context->setSetting("input_format_skip_unknown_fields", true); + modified_context->setSetting("input_format_allow_errors_ratio", 0.); + modified_context->setSetting("input_format_allow_errors_num", nats_settings->nats_skip_broken_messages.value); + + if (!schema_name.empty()) + modified_context->setSetting("format_schema", schema_name); + + for (const auto & setting : *nats_settings) + { + const auto & setting_name = setting.getName(); + + /// check for non-nats-related settings + if (!setting_name.starts_with("nats_")) + modified_context->setSetting(setting_name, setting.getValue()); + } + + return modified_context; +} + 
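To make the naming scheme above concrete, a worked example (values illustrative):

```cpp
/// For a table mydb.nats_table with nats_exchange_name = 'events' and an empty queue_base:
///   sharding_exchange = getTableBasedName("events", table_id) == "events_mydb_nats_table"
///   queue_base        = getTableBasedName("", table_id)       == "mydb_nats_table"
///   bridge_exchange   = sharding_exchange + "_bridge"         == "events_mydb_nats_table_bridge"
/// parseSettings("key1, key2,key3") splits on commas and trims -> {"key1", "key2", "key3"}
```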
+ +void StorageNATS::loopingFunc() +{ + connection->getHandler().startLoop(); +} + + +void StorageNATS::stopLoop() +{ + connection->getHandler().updateLoopState(Loop::STOP); +} + +void StorageNATS::stopLoopIfNoReaders() +{ + /// Stop the loop if no select was started. + /// There can be a case that selects are finished + /// but not all sources decremented the counter, then + /// it is ok that the loop is not stopped, because + /// there is a background task (streaming_task), which + /// also checks whether there is an idle loop. + std::lock_guard lock(loop_mutex); + if (readers_count) + return; + connection->getHandler().updateLoopState(Loop::STOP); +} + +void StorageNATS::startLoop() +{ + assert(rabbit_is_ready); + connection->getHandler().updateLoopState(Loop::RUN); + looping_task->activateAndSchedule(); +} + + +void StorageNATS::incrementReader() +{ + ++readers_count; +} + + +void StorageNATS::decrementReader() +{ + --readers_count; +} + + +void StorageNATS::connectionFunc() +{ + if (rabbit_is_ready) + return; + + if (connection->reconnect()) + initNATS(); + else + connection_task->scheduleAfter(RESCHEDULE_MS); +} + + +/* Need to deactivate this way because otherwise might get a deadlock when first deactivate streaming task in shutdown and then + * inside streaming task try to deactivate any other task + */ +void StorageNATS::deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop) +{ + if (stop_loop) + stopLoop(); + + std::unique_lock lock(task_mutex, std::defer_lock); + if (lock.try_lock()) + { + task->deactivate(); + lock.unlock(); + } + else if (wait) /// Wait only if deactivating from shutdown + { + lock.lock(); + task->deactivate(); + } +} + + +size_t StorageNATS::getMaxBlockSize() const +{ + return nats_settings->nats_max_block_size.changed + ? nats_settings->nats_max_block_size.value + : (getContext()->getSettingsRef().max_insert_block_size.value / num_consumers); +} + + +void StorageNATS::initNATS() +{ + if (shutdown_called || rabbit_is_ready) + return; + + if (use_user_setup) + { + queues.emplace_back(queue_base); + rabbit_is_ready = true; + return; + } + + try + { + auto rabbit_channel = connection->createChannel(); + + /// Main exchange -> Bridge exchange -> ( Sharding exchange ) -> Queues -> Consumers + + initExchange(*rabbit_channel); + bindExchange(*rabbit_channel); + + for (const auto i : collections::range(0, num_queues)) + bindQueue(i + 1, *rabbit_channel); + + LOG_TRACE(log, "NATS setup completed"); + rabbit_is_ready = true; + rabbit_channel->close(); + } + catch (...) + { + tryLogCurrentException(log); + if (!is_attach) + throw; + } +} + + +void StorageNATS::initExchange(AMQP::TcpChannel & rabbit_channel) +{ + /// Exchange hierarchy: + /// 1. Main exchange (defined with table settings - nats_exchange_name, nats_exchange_type). + /// 2. Bridge exchange (fanout). Used to easily disconnect main exchange and to simplify queue bindings. + /// 3. Sharding (or hash) exchange. Used in case of multiple queues. + /// 4. Consumer exchange. Just an alias for bridge_exchange or sharding exchange to know to what exchange + /// queues will be bound. + + /// All exchanges are declared with options: + /// 1. `durable` (survive NATS server restart) + /// 2. `autodelete` (auto delete in case of queue bindings are dropped). 
+ + rabbit_channel.declareExchange(exchange_name, exchange_type, AMQP::durable) + .onError([&](const char * message) + { + /// This error can be a result of attempt to declare exchange if it was already declared but + /// 1) with different exchange type. + /// 2) with different exchange settings. + throw Exception("Unable to declare exchange. Make sure specified exchange is not already declared. Error: " + + std::string(message), ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE); + }); + + rabbit_channel.declareExchange(bridge_exchange, AMQP::fanout, AMQP::durable | AMQP::autodelete) + .onError([&](const char * message) + { + /// This error is not supposed to happen as this exchange name is always unique to type and its settings. + throw Exception( + ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE, "Unable to declare bridge exchange ({}). Reason: {}", bridge_exchange, std::string(message)); + }); + + if (!hash_exchange) + { + consumer_exchange = bridge_exchange; + return; + } + + AMQP::Table binding_arguments; + + /// Default routing key property in case of hash exchange is a routing key, which is required to be an integer. + /// Support for arbitrary exchange type (i.e. arbitrary pattern of routing keys) requires to eliminate this dependency. + /// This settings changes hash property to message_id. + binding_arguments["hash-property"] = "message_id"; + + /// Declare hash exchange for sharding. + rabbit_channel.declareExchange(sharding_exchange, AMQP::consistent_hash, AMQP::durable | AMQP::autodelete, binding_arguments) + .onError([&](const char * message) + { + /// This error can be a result of same reasons as above for exchange_name, i.e. it will mean that sharding exchange name appeared + /// to be the same as some other exchange (which purpose is not for sharding). So probably actual error reason: queue_base parameter + /// is bad. + throw Exception( + ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE, + "Unable to declare sharding exchange ({}). Reason: {}", sharding_exchange, std::string(message)); + }); + + rabbit_channel.bindExchange(bridge_exchange, sharding_exchange, routing_keys[0]) + .onError([&](const char * message) + { + throw Exception( + ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE, + "Unable to bind bridge exchange ({}) to sharding exchange ({}). Reason: {}", + bridge_exchange, + sharding_exchange, + std::string(message)); + }); + + consumer_exchange = sharding_exchange; +} + + +void StorageNATS::bindExchange(AMQP::TcpChannel & rabbit_channel) +{ + size_t bound_keys = 0; + + if (exchange_type == AMQP::ExchangeType::headers) + { + AMQP::Table bind_headers; + for (const auto & header : routing_keys) + { + std::vector matching; + boost::split(matching, header, [](char c){ return c == '='; }); + bind_headers[matching[0]] = matching[1]; + } + + rabbit_channel.bindExchange(exchange_name, bridge_exchange, routing_keys[0], bind_headers) + .onSuccess([&]() { connection->getHandler().stopLoop(); }) + .onError([&](const char * message) + { + throw Exception( + ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE, + "Unable to bind exchange {} to bridge exchange ({}). 
Reason: {}", + exchange_name, bridge_exchange, std::string(message)); + }); + } + else if (exchange_type == AMQP::ExchangeType::fanout || exchange_type == AMQP::ExchangeType::consistent_hash) + { + rabbit_channel.bindExchange(exchange_name, bridge_exchange, routing_keys[0]) + .onSuccess([&]() { connection->getHandler().stopLoop(); }) + .onError([&](const char * message) + { + throw Exception( + ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE, + "Unable to bind exchange {} to bridge exchange ({}). Reason: {}", + exchange_name, bridge_exchange, std::string(message)); + }); + } + else + { + for (const auto & routing_key : routing_keys) + { + rabbit_channel.bindExchange(exchange_name, bridge_exchange, routing_key) + .onSuccess([&]() + { + ++bound_keys; + if (bound_keys == routing_keys.size()) + connection->getHandler().stopLoop(); + }) + .onError([&](const char * message) + { + throw Exception( + ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE, + "Unable to bind exchange {} to bridge exchange ({}). Reason: {}", + exchange_name, bridge_exchange, std::string(message)); + }); + } + } + + connection->getHandler().startBlockingLoop(); +} + + +void StorageNATS::bindQueue(size_t queue_id, AMQP::TcpChannel & rabbit_channel) +{ + auto success_callback = [&](const std::string & queue_name, int msgcount, int /* consumercount */) + { + queues.emplace_back(queue_name); + LOG_DEBUG(log, "Queue {} is declared", queue_name); + + if (msgcount) + LOG_INFO(log, "Queue {} is non-empty. Non-consumed messaged will also be delivered", queue_name); + + /* Here we bind either to sharding exchange (consistent-hash) or to bridge exchange (fanout). All bindings to routing keys are + * done between client's exchange and local bridge exchange. Binding key must be a string integer in case of hash exchange, for + * fanout exchange it can be arbitrary + */ + rabbit_channel.bindQueue(consumer_exchange, queue_name, std::to_string(queue_id)) + .onSuccess([&] { connection->getHandler().stopLoop(); }) + .onError([&](const char * message) + { + throw Exception( + ErrorCodes::CANNOT_CREATE_RABBITMQ_QUEUE_BINDING, + "Failed to create queue binding for exchange {}. Reason: {}", exchange_name, std::string(message)); + }); + }; + + auto error_callback([&](const char * message) + { + /* This error is most likely a result of an attempt to declare queue with different settings if it was declared before. So for a + * given queue name either deadletter_exchange parameter changed or queue_size changed, i.e. table was declared with different + * max_block_size parameter. Solution: client should specify a different queue_base parameter or manually delete previously + * declared queues via any of the various cli tools. + */ + throw Exception("Failed to declare queue. Probably queue settings are conflicting: max_block_size, deadletter_exchange. Attempt \ + specifying differently those settings or use a different queue_base or manually delete previously declared queues, \ + which were declared with the same names. ERROR reason: " + + std::string(message), ErrorCodes::BAD_ARGUMENTS); + }); + + AMQP::Table queue_settings; + + std::unordered_set integer_settings = {"x-max-length", "x-max-length-bytes", "x-message-ttl", "x-expires", "x-priority", "x-max-priority"}; + std::unordered_set string_settings = {"x-overflow", "x-dead-letter-exchange", "x-queue-type"}; + + /// Check user-defined settings. 
+ if (!queue_settings_list.empty()) + { + for (const auto & setting : queue_settings_list) + { + Strings setting_values; + splitInto<'='>(setting_values, setting); + if (setting_values.size() != 2) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid settings string: {}", setting); + + String key = setting_values[0], value = setting_values[1]; + + if (integer_settings.contains(key)) + queue_settings[key] = parse(value); + else if (string_settings.find(key) != string_settings.end()) + queue_settings[key] = value; + else + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported queue setting: {}", value); + } + } + + /// Impose default settings if there are no user-defined settings. + if (!queue_settings.contains("x-max-length")) + { + queue_settings["x-max-length"] = queue_size; + } + if (!queue_settings.contains("x-overflow")) + { + queue_settings["x-overflow"] = "reject-publish"; + } + + /// If queue_base - a single name, then it can be used as one specific queue, from which to read. + /// Otherwise it is used as a generator (unique for current table) of queue names, because it allows to + /// maximize performance - via setting `nats_num_queues`. + const String queue_name = !hash_exchange ? queue_base : std::to_string(queue_id) + "_" + queue_base; + + /// AMQP::autodelete setting is not allowed, because in case of server restart there will be no consumers + /// and deleting queues should not take place. + rabbit_channel.declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback); + connection->getHandler().startBlockingLoop(); +} + + +bool StorageNATS::updateChannel(ChannelPtr & channel) +{ + try + { + channel = connection->createChannel(); + return true; + } + catch (...) + { + tryLogCurrentException(log); + return false; + } +} + + +void StorageNATS::prepareChannelForBuffer(ConsumerBufferPtr buffer) +{ + if (!buffer) + return; + + if (buffer->queuesCount() != queues.size()) + buffer->updateQueues(queues); + + buffer->updateAckTracker(); + + if (updateChannel(buffer->getChannel())) + buffer->setupChannel(); +} + + +void StorageNATS::unbindExchange() +{ + /* This is needed because with NATS (without special adjustments) can't, for example, properly make mv if there was insert query + * on the same table before, and in another direction it will make redundant copies, but most likely nobody will do that. + * As publishing is done to exchange, publisher never knows to which queues the message will go, every application interested in + * consuming from certain exchange - declares its owns exchange-bound queues, messages go to all such exchange-bound queues, and as + * input streams are always created at startup, then they will also declare its own exchange bound queues, but they will not be visible + * externally - client declares its own exchange-bound queues, from which to consume, so this means that if not disconnecting this local + * queues, then messages will go both ways and in one of them they will remain not consumed. So need to disconnect local exchange + * bindings to remove redunadant message copies, but after that mv cannot work unless those bindings are recreated. Recreating them is + * not difficult but very ugly and as probably nobody will do such thing - bindings will not be recreated. 
+ */ + if (!exchange_removed.exchange(true)) + { + try + { + streaming_task->deactivate(); + + stopLoop(); + looping_task->deactivate(); + + auto rabbit_channel = connection->createChannel(); + rabbit_channel->removeExchange(bridge_exchange) + .onSuccess([&]() + { + connection->getHandler().stopLoop(); + }) + .onError([&](const char * message) + { + throw Exception("Unable to remove exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_REMOVE_RABBITMQ_EXCHANGE); + }); + + connection->getHandler().startBlockingLoop(); + rabbit_channel->close(); + } + catch (...) + { + exchange_removed = false; + throw; + } + } +} + + +Pipe StorageNATS::read( + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + SelectQueryInfo & /* query_info */, + ContextPtr local_context, + QueryProcessingStage::Enum /* processed_stage */, + size_t /* max_block_size */, + unsigned /* num_streams */) +{ + if (!rabbit_is_ready) + throw Exception("NATS setup not finished. Connection might be lost", ErrorCodes::CANNOT_CONNECT_RABBITMQ); + + if (num_created_consumers == 0) + return {}; + + if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select) + throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Direct select is not allowed. To enable use setting `stream_like_engine_allow_direct_select`"); + + if (mv_attached) + throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageNATS with attached materialized views"); + + std::lock_guard lock(loop_mutex); + + auto sample_block = storage_snapshot->getSampleBlockForColumns(column_names); + auto modified_context = addSettings(local_context); + + if (!connection->isConnected()) + { + if (connection->getHandler().loopRunning()) + deactivateTask(looping_task, false, true); + if (!connection->reconnect()) + throw Exception(ErrorCodes::CANNOT_CONNECT_RABBITMQ, "No connection to {}", connection->connectionInfoForLog()); + } + + initializeBuffers(); + + Pipes pipes; + pipes.reserve(num_created_consumers); + + for (size_t i = 0; i < num_created_consumers; ++i) + { + auto rabbit_source = std::make_shared( + *this, storage_snapshot, modified_context, column_names, 1, nats_settings->nats_commit_on_select); + + auto converting_dag = ActionsDAG::makeConvertingActions( + rabbit_source->getPort().getHeader().getColumnsWithTypeAndName(), + sample_block.getColumnsWithTypeAndName(), + ActionsDAG::MatchColumnsMode::Name); + + auto converting = std::make_shared(std::move(converting_dag)); + auto converting_transform = std::make_shared(rabbit_source->getPort().getHeader(), std::move(converting)); + + pipes.emplace_back(std::move(rabbit_source)); + pipes.back().addTransform(std::move(converting_transform)); + } + + if (!connection->getHandler().loopRunning() && connection->isConnected()) + startLoop(); + + LOG_DEBUG(log, "Starting reading {} streams", pipes.size()); + auto united_pipe = Pipe::unitePipes(std::move(pipes)); + united_pipe.addInterpreterContext(modified_context); + return united_pipe; +} + + +SinkToStoragePtr StorageNATS::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context) +{ + return std::make_shared(*this, metadata_snapshot, local_context); +} + + +void StorageNATS::startup() +{ + if (!rabbit_is_ready) + { + if (connection->isConnected()) + { + try + { + initNATS(); + } + catch (...) 
+ { + if (!is_attach) + throw; + tryLogCurrentException(log); + } + } + else + { + connection_task->activateAndSchedule(); + } + } + + for (size_t i = 0; i < num_consumers; ++i) + { + try + { + auto buffer = createReadBuffer(); + pushReadBuffer(std::move(buffer)); + ++num_created_consumers; + } + catch (...) + { + if (!is_attach) + throw; + tryLogCurrentException(log); + } + } + + streaming_task->activateAndSchedule(); +} + + +void StorageNATS::shutdown() +{ + shutdown_called = true; + + /// In case it has not yet been able to setup connection; + deactivateTask(connection_task, true, false); + + /// The order of deactivating tasks is important: wait for streamingToViews() func to finish and + /// then wait for background event loop to finish. + deactivateTask(streaming_task, true, false); + deactivateTask(looping_task, true, true); + + /// Just a paranoid try catch, it is not actually needed. + try + { + if (drop_table) + { + for (auto & buffer : buffers) + buffer->closeChannel(); + + cleanupNATS(); + } + + /// It is important to close connection here - before removing consumer buffers, because + /// it will finish and clean callbacks, which might use those buffers data. + connection->disconnect(); + + for (size_t i = 0; i < num_created_consumers; ++i) + popReadBuffer(); + } + catch (...) + { + tryLogCurrentException(log); + } +} + + +/// The only thing publishers are supposed to be aware of is _exchanges_ and queues are a responsibility of a consumer. +/// Therefore, if a table is dropped, a clean up is needed. +void StorageNATS::cleanupNATS() const +{ + if (use_user_setup) + return; + + connection->heartbeat(); + if (!connection->isConnected()) + { + String queue_names; + for (const auto & queue : queues) + { + if (!queue_names.empty()) + queue_names += ", "; + queue_names += queue; + } + LOG_WARNING(log, + "NATS clean up not done, because there is no connection in table's shutdown." + "There are {} queues ({}), which might need to be deleted manually. Exchanges will be auto-deleted", + queues.size(), queue_names); + return; + } + + auto rabbit_channel = connection->createChannel(); + for (const auto & queue : queues) + { + /// AMQP::ifunused is needed, because it is possible to share queues between multiple tables and dropping + /// on of them should not affect others. + /// AMQP::ifempty is not used on purpose. + + rabbit_channel->removeQueue(queue, AMQP::ifunused) + .onSuccess([&](uint32_t num_messages) + { + LOG_TRACE(log, "Successfully deleted queue {}, messages contained {}", queue, num_messages); + connection->getHandler().stopLoop(); + }) + .onError([&](const char * message) + { + LOG_ERROR(log, "Failed to delete queue {}. Error message: {}", queue, message); + connection->getHandler().stopLoop(); + }); + } + connection->getHandler().startBlockingLoop(); + rabbit_channel->close(); + + /// Also there is no need to cleanup exchanges as they were created with AMQP::autodelete option. Once queues + /// are removed, exchanges will also be cleaned. 
+} + + +void StorageNATS::pushReadBuffer(ConsumerBufferPtr buffer) +{ + std::lock_guard lock(buffers_mutex); + buffers.push_back(buffer); + semaphore.set(); +} + + +ConsumerBufferPtr StorageNATS::popReadBuffer() +{ + return popReadBuffer(std::chrono::milliseconds::zero()); +} + + +ConsumerBufferPtr StorageNATS::popReadBuffer(std::chrono::milliseconds timeout) +{ + // Wait for the first free buffer + if (timeout == std::chrono::milliseconds::zero()) + semaphore.wait(); + else + { + if (!semaphore.tryWait(timeout.count())) + return nullptr; + } + + // Take the first available buffer from the list + std::lock_guard lock(buffers_mutex); + auto buffer = buffers.back(); + buffers.pop_back(); + + return buffer; +} + + +ConsumerBufferPtr StorageNATS::createReadBuffer() +{ + return std::make_shared( + connection->getHandler(), queues, ++consumer_id, + unique_strbase, log, row_delimiter, queue_size, shutdown_called); +} + + +ProducerBufferPtr StorageNATS::createWriteBuffer() +{ + return std::make_shared( + configuration, getContext(), routing_keys, exchange_name, exchange_type, + producer_id.fetch_add(1), persistent, shutdown_called, log, + row_delimiter ? std::optional{row_delimiter} : std::nullopt, 1, 1024); +} + + +bool StorageNATS::checkDependencies(const StorageID & table_id) +{ + // Check if all dependencies are attached + auto dependencies = DatabaseCatalog::instance().getDependencies(table_id); + if (dependencies.empty()) + return true; + + // Check the dependencies are ready? + for (const auto & db_tab : dependencies) + { + auto table = DatabaseCatalog::instance().tryGetTable(db_tab, getContext()); + if (!table) + return false; + + // If it materialized view, check it's target table + auto * materialized_view = dynamic_cast(table.get()); + if (materialized_view && !materialized_view->tryGetTargetTable()) + return false; + + // Check all its dependencies + if (!checkDependencies(db_tab)) + return false; + } + + return true; +} + + +void StorageNATS::initializeBuffers() +{ + assert(rabbit_is_ready); + if (!initialized) + { + for (const auto & buffer : buffers) + prepareChannelForBuffer(buffer); + initialized = true; + } +} + + +void StorageNATS::streamingToViewsFunc() +{ + if (rabbit_is_ready) + { + try + { + auto table_id = getStorageID(); + + // Check if at least one direct dependency is attached + size_t dependencies_count = DatabaseCatalog::instance().getDependencies(table_id).size(); + bool rabbit_connected = connection->isConnected() || connection->reconnect(); + + if (dependencies_count && rabbit_connected) + { + initializeBuffers(); + auto start_time = std::chrono::steady_clock::now(); + + mv_attached.store(true); + + // Keep streaming as long as there are attached views and streaming is not cancelled + while (!shutdown_called && num_created_consumers > 0) + { + if (!checkDependencies(table_id)) + break; + + LOG_DEBUG(log, "Started streaming to {} attached views", dependencies_count); + + if (streamToViews()) + { + /// Reschedule with backoff. + if (milliseconds_to_wait < BACKOFF_TRESHOLD) + milliseconds_to_wait *= 2; + stopLoopIfNoReaders(); + break; + } + else + { + milliseconds_to_wait = RESCHEDULE_MS; + } + + auto end_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast(end_time - start_time); + if (duration.count() > MAX_THREAD_WORK_DURATION_MS) + { + stopLoopIfNoReaders(); + LOG_TRACE(log, "Reschedule streaming. Thread work duration limit exceeded."); + break; + } + } + } + } + catch (...) 
+    mv_attached.store(false);
+
+    /// If there is no running select, stop the loop which was
+    /// activated by a previous select.
+    if (connection->getHandler().loopRunning())
+        stopLoopIfNoReaders();
+
+    if (!shutdown_called)
+        streaming_task->scheduleAfter(milliseconds_to_wait);
+}
+
+
+bool StorageNATS::streamToViews()
+{
+    auto table_id = getStorageID();
+    auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
+    if (!table)
+        throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);
+
+    // Create an INSERT query for streaming data
+    auto insert = std::make_shared<ASTInsertQuery>();
+    insert->table_id = table_id;
+
+    // Only insert into dependent views and expect that input blocks contain virtual columns
+    InterpreterInsertQuery interpreter(insert, nats_context, false, true, true);
+    auto block_io = interpreter.execute();
+
+    auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr(), getContext());
+    auto column_names = block_io.pipeline.getHeader().getNames();
+    auto sample_block = storage_snapshot->getSampleBlockForColumns(column_names);
+
+    auto block_size = getMaxBlockSize();
+
+    // Create a stream for each consumer and join them in a union stream
+    std::vector<std::shared_ptr<NATSSource>> sources;
+    Pipes pipes;
+    sources.reserve(num_created_consumers);
+    pipes.reserve(num_created_consumers);
+
+    for (size_t i = 0; i < num_created_consumers; ++i)
+    {
+        auto source = std::make_shared<NATSSource>(
+            *this, storage_snapshot, nats_context, column_names, block_size, false);
+        sources.emplace_back(source);
+        pipes.emplace_back(source);
+
+        // Limit read batch to maximum block size to allow DDL
+        StreamLocalLimits limits;
+
+        limits.speed_limits.max_execution_time = nats_settings->nats_flush_interval_ms.changed
+            ? nats_settings->nats_flush_interval_ms
+            : getContext()->getSettingsRef().stream_flush_interval_ms;
+
+        limits.timeout_overflow_mode = OverflowMode::BREAK;
+
+        source->setLimits(limits);
+    }
+
+    block_io.pipeline.complete(Pipe::unitePipes(std::move(pipes)));
+
+    if (!connection->getHandler().loopRunning())
+        startLoop();
+
+    {
+        CompletedPipelineExecutor executor(block_io.pipeline);
+        executor.execute();
+    }
+
+    /* Note: sending ack() with the loop running in another thread will lead to a lot of data races inside the library,
+     * but only in case an error occurs or the connection is lost while the ack is being sent.
+     */
+    deactivateTask(looping_task, false, true);
+    size_t queue_empty = 0;
+
+    if (!connection->isConnected())
+    {
+        if (shutdown_called)
+            return true;
+
+        if (connection->reconnect())
+        {
+            LOG_DEBUG(log, "Connection restored, updating channels");
+            for (auto & source : sources)
+                source->updateChannel();
+        }
+        else
+        {
+            LOG_TRACE(log, "Reschedule streaming. Unable to restore connection.");
+            return true;
+        }
+    }
+    else
+    {
+        /// Commit
+        for (auto & source : sources)
+        {
+            if (source->queueEmpty())
+                ++queue_empty;
+
+            if (source->needChannelUpdate())
+            {
+                auto buffer = source->getBuffer();
+                prepareChannelForBuffer(buffer);
+            }
+
+            /* false is returned by the sendAck function in only two cases:
+             * 1) if the connection failed. In this case all channels will be closed and it will be impossible to send an ack. Also,
+             *    acks are made based on delivery tags, which are unique per channel, so if a channel fails, those delivery tags become
+             *    invalid and there is no way to send a specific ack from a different channel.
+             *    Actually, once the server realises that it has messages in a queue waiting for a confirm from a channel which
+             *    suddenly closed, it will immediately make those messages accessible to other consumers. So in this case duplicates
+             *    are inevitable.
+             * 2) if the size of the sent frame (the library's internal request interface) exceeds the max frame size - an internal
+             *    library error. This is more common for message frames, but unlikely for an ack frame. Even then, if the channel was
+             *    not closed, failing to send an ack is ok, because the next attempt to send an ack on the same channel will also
+             *    commit all previously uncommitted messages.
+             */
+            if (!source->sendAck())
+            {
+                /// Iterate the loop to activate error callbacks if they happened
+                connection->getHandler().iterateLoop();
+                if (!connection->isConnected())
+                    break;
+            }
+
+            connection->getHandler().iterateLoop();
+        }
+    }
+
+    if ((queue_empty == num_created_consumers) && (++read_attempts == MAX_FAILED_READ_ATTEMPTS))
+    {
+        connection->heartbeat();
+        read_attempts = 0;
+        LOG_TRACE(log, "Reschedule streaming. Queues are empty.");
+        return true;
+    }
+    else
+    {
+        startLoop();
+    }
+
+    /// Do not reschedule, do not stop the event loop.
+    return false;
+}
+
+
+void registerStorageNATS(StorageFactory & factory)
+{
+    auto creator_fn = [](const StorageFactory::Arguments & args)
+    {
+        auto nats_settings = std::make_unique<NATSSettings>();
+        bool with_named_collection = getExternalDataSourceConfiguration(args.engine_args, *nats_settings, args.getLocalContext());
+        if (!with_named_collection && !args.storage_def->settings)
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "NATS engine must have settings");
+
+        nats_settings->loadFromQuery(*args.storage_def);
+
+        if (!nats_settings->nats_host_port.changed
+            && !nats_settings->nats_address.changed)
+            throw Exception("You must specify either `nats_host_port` or `nats_address` settings",
+                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
+
+        if (!nats_settings->nats_format.changed)
+            throw Exception("You must specify the `nats_format` setting", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
+
+        return StorageNATS::create(args.table_id, args.getContext(), args.columns, std::move(nats_settings), args.attach);
+    };
+
+    factory.registerStorage("NATS", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
+}
+
+
+NamesAndTypesList StorageNATS::getVirtuals() const
+{
+    return NamesAndTypesList{
+        {"_exchange_name", std::make_shared<DataTypeString>()},
+        {"_channel_id", std::make_shared<DataTypeString>()},
+        {"_delivery_tag", std::make_shared<DataTypeUInt64>()},
+        {"_redelivered", std::make_shared<DataTypeUInt8>()},
+        {"_message_id", std::make_shared<DataTypeString>()},
+        {"_timestamp", std::make_shared<DataTypeUInt64>()}
+    };
+}
+
+}
diff --git a/src/Storages/NATS/StorageNATS.h b/src/Storages/NATS/StorageNATS.h
new file mode 100644
index 00000000000..e5b397194b7
--- /dev/null
+++ b/src/Storages/NATS/StorageNATS.h
@@ -0,0 +1,207 @@
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+
+namespace DB
+{
+
+class StorageNATS final : public shared_ptr_helper<StorageNATS>, public IStorage, WithContext
+{
+    friend struct shared_ptr_helper<StorageNATS>;
+
+public:
+    std::string getName() const override { return "NATS"; }
+
+    bool noPushingToViews() const override { return true; }
+
+    void startup() override;
+    void shutdown() override;
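+
+    /// Background-task lifecycle as wired up in this class: connection_task retries the initial connection,
+    /// looping_task drives the libuv event loop, and streaming_task pushes consumed messages to attached
+    /// materialized views, rescheduling itself with exponential backoff (see streamingToViewsFunc()).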
+
+    /// This is a bad way to let the storage know in shutdown() that the table is going to be dropped: there are some actions which
+    /// need to be done only when the table is dropped (not when it is detached), and the connection must be closed only in shutdown(),
+    /// yet those actions require an open connection. So there has to be a way inside shutdown() to know whether it is called because
+    /// of a DROP query. The drop() method is not suitable at all, because it would not only require reopening the connection, but it
+    /// can also be called a considerable time after the table is dropped (for example, in the case of an Atomic database), which is
+    /// not appropriate here.
+    void checkTableCanBeDropped() const override { drop_table = true; }
+
+    /// Always return virtual columns in addition to required columns
+    Pipe read(
+        const Names & column_names,
+        const StorageSnapshotPtr & storage_snapshot,
+        SelectQueryInfo & query_info,
+        ContextPtr context,
+        QueryProcessingStage::Enum processed_stage,
+        size_t max_block_size,
+        unsigned num_streams) override;
+
+    SinkToStoragePtr write(
+        const ASTPtr & query,
+        const StorageMetadataPtr & metadata_snapshot,
+        ContextPtr context) override;
+
+    void pushReadBuffer(ConsumerBufferPtr buf);
+    ConsumerBufferPtr popReadBuffer();
+    ConsumerBufferPtr popReadBuffer(std::chrono::milliseconds timeout);
+
+    ProducerBufferPtr createWriteBuffer();
+
+    const String & getFormatName() const { return format_name; }
+    NamesAndTypesList getVirtuals() const override;
+
+    String getExchange() const { return exchange_name; }
+    void unbindExchange();
+
+    bool updateChannel(ChannelPtr & channel);
+    void updateQueues(std::vector<String> & queues_) { queues_ = queues; }
+    void prepareChannelForBuffer(ConsumerBufferPtr buffer);
+
+    void incrementReader();
+    void decrementReader();
+
+protected:
+    StorageNATS(
+        const StorageID & table_id_,
+        ContextPtr context_,
+        const ColumnsDescription & columns_,
+        std::unique_ptr<NATSSettings> nats_settings_,
+        bool is_attach_);
+
+private:
+    ContextMutablePtr nats_context;
+    std::unique_ptr<NATSSettings> nats_settings;
+
+    const String exchange_name;
+    const String format_name;
+    AMQP::ExchangeType exchange_type;
+    Names routing_keys;
+    char row_delimiter;
+    const String schema_name;
+    size_t num_consumers;
+    size_t num_queues;
+    String queue_base;
+    Names queue_settings_list;
+
+    /// For INSERT queries: mark messages as durable.
+    const bool persistent;
+
+    /// A table setting. It is possible not to perform any NATS setup (declaring exchanges, queues, bindings), which is supposed
+    /// to be consumer-side setup. Instead, all the NATS table then needs is to connect to a specific queue. This disables all
+    /// optimizations and is not really optimal, but it allows the user to fully control the NATS setup.
+    bool use_user_setup;
+
+    bool hash_exchange;
+    Poco::Logger * log;
+
+    NATSConnectionPtr connection; /// Connection for all consumers
+    NATSConfiguration configuration;
+
+    size_t num_created_consumers = 0;
+    Poco::Semaphore semaphore;
+    std::mutex buffers_mutex;
+    std::vector<ConsumerBufferPtr> buffers; /// available buffers for NATS consumers
+
+    String unique_strbase; /// to make a unique consumer channel id
+
+    /// Maximum number of messages in the NATS queue (x-max-length).
+    /// Also used to set up the size of the inner buffer for received messages.
+    uint32_t queue_size;
+
+    String sharding_exchange, bridge_exchange, consumer_exchange;
+    size_t consumer_id = 0; /// counter for consumer buffers, needed for the channel id
+
+    std::vector<String> queues;
+
+    std::once_flag flag; /// remove the exchange only once
+    std::mutex task_mutex;
+    BackgroundSchedulePool::TaskHolder streaming_task;
+    BackgroundSchedulePool::TaskHolder looping_task;
+    BackgroundSchedulePool::TaskHolder connection_task;
+
+    uint64_t milliseconds_to_wait;
+
+    /**
+     * ╰( ͡° ͜ʖ ͡° )つ──☆* Evil atomics:
+     */
+    /// Needed to tell MV or producer background tasks
+    /// that they must finish as soon as possible.
+    std::atomic<bool> shutdown_called{false};
+    /// Counter for producer buffers, needed for the channel id.
+    /// Needed to generate unique producer buffer identifiers.
+    std::atomic<size_t> producer_id = 1;
+    /// Has the connection background task completed successfully?
+    /// It is started only once - in the constructor.
+    std::atomic<bool> rabbit_is_ready = false;
+    /// Allow to remove the exchange only once.
+    std::atomic<bool> exchange_removed = false;
+    /// For a SELECT query we must be aware of the end of streaming
+    /// to be able to turn off the loop.
+    std::atomic<size_t> readers_count = 0;
+    std::atomic<bool> mv_attached = false;
+
+    /// In a SELECT query we start the event loop, but do not stop it
+    /// after the SELECT is finished. Then, in the thread which checks
+    /// for MVs, we also check whether there are SELECT readers.
+    /// If not, we turn off the loop. The checks are done under a
+    /// mutex to avoid having a turned-off loop while a SELECT is
+    /// being started.
+    std::mutex loop_mutex;
+
+    size_t read_attempts = 0;
+    mutable bool drop_table = false;
+    bool is_attach;
+
+    ConsumerBufferPtr createReadBuffer();
+    void initializeBuffers();
+    bool initialized = false;
+
+    /// Functions working in the background
+    void streamingToViewsFunc();
+    void loopingFunc();
+    void connectionFunc();
+
+    void startLoop();
+    void stopLoop();
+    void stopLoopIfNoReaders();
+
+    static Names parseSettings(String settings_list);
+    static AMQP::ExchangeType defineExchangeType(String exchange_type_);
+    static String getTableBasedName(String name, const StorageID & table_id);
+
+    ContextMutablePtr addSettings(ContextPtr context) const;
+    size_t getMaxBlockSize() const;
+    void deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop);
+
+    void initNATS();
+    void cleanupNATS() const;
+
+    void initExchange(AMQP::TcpChannel & rabbit_channel);
+    void bindExchange(AMQP::TcpChannel & rabbit_channel);
+    void bindQueue(size_t queue_id, AMQP::TcpChannel & rabbit_channel);
+
+    bool streamToViews();
+    bool checkDependencies(const StorageID & table_id);
+
+    static String getRandomName()
+    {
+        std::uniform_int_distribution<int> distribution('a', 'z');
+        String random_str(32, ' ');
+        for (auto & c : random_str)
+            c = distribution(thread_local_rng);
+        return random_str;
+    }
+};
+
+}
diff --git a/src/Storages/NATS/UVLoop.h b/src/Storages/NATS/UVLoop.h
new file mode 100644
index 00000000000..4de67cbc206
--- /dev/null
+++ b/src/Storages/NATS/UVLoop.h
@@ -0,0 +1,44 @@
+#pragma once
+
+#include <memory>
+
+#include <uv.h>
+#include <boost/noncopyable.hpp>
+
+#include <Common/Exception.h>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int SYSTEM_ERROR;
+}
+
+/// RAII wrapper around a uv event loop
+class UVLoop : public boost::noncopyable
+{
+public:
+    UVLoop() : loop_ptr(new uv_loop_t())
+    {
+        int res = uv_loop_init(loop_ptr.get());
+
+        if (res != 0)
+            throw Exception("UVLoop could not initialize", ErrorCodes::SYSTEM_ERROR);
+    }
+
+    ~UVLoop()
+    {
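+        /// Note (an assumption about libuv semantics, not enforced here): uv_loop_close() fails with
+        /// UV_EBUSY while the loop still has active handles, so all users of this loop are expected
+        /// to have shut down their handles before this wrapper is destroyed.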
+        if (loop_ptr)
+            uv_loop_close(loop_ptr.get());
+    }
+
+    inline uv_loop_t * getLoop() { return loop_ptr.get(); }
+
+    inline const uv_loop_t * getLoop() const { return loop_ptr.get(); }
+
+private:
+    std::unique_ptr<uv_loop_t> loop_ptr;
+};
+
+}
diff --git a/src/Storages/NATS/WriteBufferToNATSProducer.cpp b/src/Storages/NATS/WriteBufferToNATSProducer.cpp
new file mode 100644
index 00000000000..2f4ea7bee53
--- /dev/null
+++ b/src/Storages/NATS/WriteBufferToNATSProducer.cpp
@@ -0,0 +1,317 @@
+#include <Storages/NATS/WriteBufferToNATSProducer.h>
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+
+namespace DB
+{
+
+static const auto BATCH = 1000;
+static const auto RETURNED_LIMIT = 50000;
+
+namespace ErrorCodes
+{
+    extern const int CANNOT_CONNECT_RABBITMQ;
+    extern const int LOGICAL_ERROR;
+}
+
+WriteBufferToNATSProducer::WriteBufferToNATSProducer(
+    const NATSConfiguration & configuration_,
+    ContextPtr global_context,
+    const Names & routing_keys_,
+    const String & exchange_name_,
+    const AMQP::ExchangeType exchange_type_,
+    const size_t channel_id_base_,
+    const bool persistent_,
+    std::atomic<bool> & shutdown_called_,
+    Poco::Logger * log_,
+    std::optional<char> delimiter,
+    size_t rows_per_message,
+    size_t chunk_size_)
+    : WriteBuffer(nullptr, 0)
+    , connection(configuration_, log_)
+    , routing_keys(routing_keys_)
+    , exchange_name(exchange_name_)
+    , exchange_type(exchange_type_)
+    , channel_id_base(std::to_string(channel_id_base_))
+    , persistent(persistent_)
+    , shutdown_called(shutdown_called_)
+    , payloads(BATCH)
+    , returned(RETURNED_LIMIT)
+    , log(log_)
+    , delim(delimiter)
+    , max_rows(rows_per_message)
+    , chunk_size(chunk_size_)
+{
+    if (connection.connect())
+        setupChannel();
+    else
+        throw Exception(ErrorCodes::CANNOT_CONNECT_RABBITMQ, "Cannot connect to NATS {}", connection.connectionInfoForLog());
+
+    writing_task = global_context->getSchedulePool().createTask("NATSWritingTask", [this]{ writingFunc(); });
+    writing_task->deactivate();
+
+    if (exchange_type == AMQP::ExchangeType::headers)
+    {
+        for (const auto & header : routing_keys)
+        {
+            std::vector<String> matching;
+            boost::split(matching, header, [](char c){ return c == '='; });
+            /// Expect the "key=value" form; a routing key without '=' would otherwise make matching[1] out of bounds.
+            if (matching.size() == 2)
+                key_arguments[matching[0]] = matching[1];
+        }
+    }
+
+    reinitializeChunks();
+}
+
+
+WriteBufferToNATSProducer::~WriteBufferToNATSProducer()
+{
+    writing_task->deactivate();
+    connection.disconnect();
+    assert(rows == 0);
+}
+
+
+void WriteBufferToNATSProducer::countRow()
+{
+    if (++rows % max_rows == 0)
+    {
+        const std::string & last_chunk = chunks.back();
+        size_t last_chunk_size = offset();
+
+        if (last_chunk_size && delim && last_chunk[last_chunk_size - 1] == delim)
+            --last_chunk_size;
+
+        std::string payload;
+        payload.reserve((chunks.size() - 1) * chunk_size + last_chunk_size);
+
+        for (auto i = chunks.begin(), end = --chunks.end(); i != end; ++i)
+            payload.append(*i);
+
+        payload.append(last_chunk, 0, last_chunk_size);
+
+        reinitializeChunks();
+
+        ++payload_counter;
+        if (!payloads.push(std::make_pair(payload_counter, payload)))
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to payloads queue");
+    }
+}
+
+
+void WriteBufferToNATSProducer::setupChannel()
+{
+    producer_channel = connection.createChannel();
+
+    producer_channel->onError([&](const char * message)
+    {
+        LOG_ERROR(log, "Producer's channel {} error: {}", channel_id, message);
+
+        /// The channel is not usable anymore.
+        /// (https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/36#issuecomment-125112236)
+        producer_channel->close();
+
+        /* Save the records that have not received an ack/nack from the server before the channel closure. They are removed here and
+         * pushed back again once they are republished, because after channel recovery they will acquire new delivery tags, so all
+         * previous records become invalid.
+         */
+        for (const auto & record : delivery_record)
+            if (!returned.push(record.second))
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to returned queue");
+
+        LOG_DEBUG(log, "Producer on channel {} hasn't confirmed {} messages, {} waiting to be published",
+                  channel_id, delivery_record.size(), payloads.size());
+
+        /// Delivery tags are scoped per channel.
+        delivery_record.clear();
+        delivery_tag = 0;
+        producer_ready = false;
+    });
+
+    producer_channel->onReady([&]()
+    {
+        channel_id = channel_id_base + "_" + std::to_string(channel_id_counter++);
+        LOG_DEBUG(log, "Producer's channel {} is ready", channel_id);
+
+        /* If persistent == true, onAck is received when the message is persisted to disk or consumed on every queue; if that fails,
+         * onNack() is received. If persistent == false, the message is confirmed the moment it is enqueued. The first option is about
+         * twice as slow as the second, so the second is the default and the first is turned on via a table setting.
+         *
+         * "Publisher confirms" are implemented similarly to strategy #3 here: https://www.rabbitmq.com/tutorials/tutorial-seven-java.html
+         */
+        producer_channel->confirmSelect()
+            .onAck([&](uint64_t acked_delivery_tag, bool multiple)
+            {
+                removeRecord(acked_delivery_tag, multiple, false);
+            })
+            .onNack([&](uint64_t nacked_delivery_tag, bool multiple, bool /* requeue */)
+            {
+                removeRecord(nacked_delivery_tag, multiple, true);
+            });
+        producer_ready = true;
+    });
+}
+
+
+void WriteBufferToNATSProducer::removeRecord(UInt64 received_delivery_tag, bool multiple, bool republish)
+{
+    auto record_iter = delivery_record.find(received_delivery_tag);
+    assert(record_iter != delivery_record.end());
+
+    if (multiple)
+    {
+        /// If multiple is true, then all delivery tags up to and including the current one are confirmed (with ack or nack).
+        ++record_iter;
+
+        if (republish)
+            for (auto record = delivery_record.begin(); record != record_iter; ++record)
+                if (!returned.push(record->second))
+                    throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to returned queue");
+
+        /// Delete the records even when republishing, because new delivery tags will be assigned by the server.
+        delivery_record.erase(delivery_record.begin(), record_iter);
+    }
+    else
+    {
+        if (republish)
+            if (!returned.push(record_iter->second))
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to returned queue");
+
+        delivery_record.erase(record_iter);
+    }
+}
+
+
+void WriteBufferToNATSProducer::publish(ConcurrentBoundedQueue<std::pair<UInt64, String>> & messages, bool republishing)
+{
+    std::pair<UInt64, String> payload;
+
+    /* It is important to make sure that delivery_record.size() is never bigger than returned.size(), i.e.
+     * the number of unacknowledged messages cannot exceed returned.size(), because they all might end up there.
+     */
+    while (!messages.empty() && producer_channel->usable() && delivery_record.size() < RETURNED_LIMIT)
+    {
+        bool pop_result = messages.pop(payload);
+
+        if (!pop_result)
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not pop payload");
+
+        AMQP::Envelope envelope(payload.second.data(), payload.second.size());
+
+        /// If a headers exchange is used, routing keys are added here via headers; if not, it is just empty.
+        AMQP::Table message_settings = key_arguments;
+
+        /* There is a case when the connection is lost in the period after some messages were published and before the ack/nack was
+         * sent by the server. Then the publisher will never know whether those messages were delivered or not, so the records that
+         * received no ack/nack before the connection loss will be republished (see the onError() callback), and there might be
+         * duplicates. To let the consumer know that a received message might be a duplicate, a "republished" field is added to the
+         * message metadata.
+         */
+        message_settings["republished"] = std::to_string(republishing);
+        envelope.setHeaders(message_settings);
+
+        /* A messageID property is also added to the message metadata. Since NATS does not guarantee exactly-once delivery, the
+         * consumer side can check the "republished" field of the message metadata and, if it is set to 1, also check the "messageID"
+         * property. This way detection of duplicates is guaranteed.
+         */
+        envelope.setMessageID(std::to_string(payload.first));
+
+        /// Delivery mode is 1 or 2. 1 is the default. 2 makes a message durable, but makes performance 1.5-2 times worse.
+        if (persistent)
+            envelope.setDeliveryMode(2);
+
+        if (exchange_type == AMQP::ExchangeType::consistent_hash)
+        {
+            producer_channel->publish(exchange_name, std::to_string(delivery_tag), envelope);
+        }
+        else if (exchange_type == AMQP::ExchangeType::headers)
+        {
+            producer_channel->publish(exchange_name, "", envelope);
+        }
+        else
+        {
+            producer_channel->publish(exchange_name, routing_keys[0], envelope);
+        }
+
+        /// This is needed for "publisher confirms", which guarantee at-least-once delivery.
+        ++delivery_tag;
+        delivery_record.insert(delivery_record.end(), {delivery_tag, payload});
+
+        /// Need to break at some point to let the event loop run, because no publishing actually happens before looping.
+        if (delivery_tag % BATCH == 0)
+            break;
+    }
+
+    iterateEventLoop();
+}
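+
+
+/* A worked example of the confirm bookkeeping above (a sketch; the tag numbers are illustrative only):
+ * after publishing five messages, delivery_record holds tags {1, 2, 3, 4, 5}. If the server sends
+ * onAck(4, multiple = true), removeRecord() erases tags 1..4 in one pass; a following onNack(5, false)
+ * pushes the record for tag 5 to the returned queue, and writingFunc() republishes it with the
+ * "republished" header set, under a fresh delivery tag on the (possibly recreated) channel.
+ */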
+
+
+void WriteBufferToNATSProducer::writingFunc()
+{
+    while ((!payloads.empty() || wait_all) && !shutdown_called.load())
+    {
+        /// If the onReady callback has not been received, producer->usable() will still return true,
+        /// but we must publish only after the onReady callback.
+        if (producer_ready)
+        {
+            /* Publish the main payloads only when there are no returned messages. This way it is ensured that returned messages are
+             * republished as fast as possible and no new publishes are made before the returned messages are handled.
+             */
+            if (!returned.empty() && producer_channel->usable())
+                publish(returned, true);
+            else if (!payloads.empty() && producer_channel->usable())
+                publish(payloads, false);
+        }
+
+        iterateEventLoop();
+
+        if (wait_num.load() && delivery_record.empty() && payloads.empty() && returned.empty())
+            wait_all = false;
+        else if (!producer_channel->usable())
+        {
+            if (connection.reconnect())
+                setupChannel();
+        }
+    }
+
+    LOG_DEBUG(log, "Producer on channel {} completed", channel_id);
+}
+
+
+void WriteBufferToNATSProducer::nextImpl()
+{
+    addChunk();
+}
+
+void WriteBufferToNATSProducer::addChunk()
+{
+    chunks.push_back(std::string());
+    chunks.back().resize(chunk_size);
+    set(chunks.back().data(), chunk_size);
+}
+
+void WriteBufferToNATSProducer::reinitializeChunks()
+{
+    rows = 0;
+    chunks.clear();
+    /// We cannot leave the buffer in an undefined state (i.e. without any
+    /// underlying buffer), since in this case WriteBuffer::next() would
+    /// not call our nextImpl() (due to available() == 0).
+    addChunk();
+}
+
+
+void WriteBufferToNATSProducer::iterateEventLoop()
+{
+    connection.getHandler().iterateLoop();
+}
+
+}
diff --git a/src/Storages/NATS/WriteBufferToNATSProducer.h b/src/Storages/NATS/WriteBufferToNATSProducer.h
new file mode 100644
index 00000000000..d0d80a6cf9d
--- /dev/null
+++ b/src/Storages/NATS/WriteBufferToNATSProducer.h
@@ -0,0 +1,121 @@
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+class WriteBufferToNATSProducer : public WriteBuffer
+{
+public:
+    WriteBufferToNATSProducer(
+        const NATSConfiguration & configuration_,
+        ContextPtr global_context,
+        const Names & routing_keys_,
+        const String & exchange_name_,
+        const AMQP::ExchangeType exchange_type_,
+        const size_t channel_id_base_,
+        const bool persistent_,
+        std::atomic<bool> & shutdown_called_,
+        Poco::Logger * log_,
+        std::optional<char> delimiter,
+        size_t rows_per_message,
+        size_t chunk_size_
+    );
+
+    ~WriteBufferToNATSProducer() override;
+
+    void countRow();
+    void activateWriting() { writing_task->activateAndSchedule(); }
+    void updateMaxWait() { wait_num.store(payload_counter); }
+
+private:
+    void nextImpl() override;
+    void addChunk();
+    void reinitializeChunks();
+
+    void iterateEventLoop();
+    void writingFunc();
+    void setupChannel();
+    void removeRecord(UInt64 received_delivery_tag, bool multiple, bool republish);
+    void publish(ConcurrentBoundedQueue<std::pair<UInt64, String>> & messages, bool republishing);
+
+    NATSConnection connection;
+
+    const Names routing_keys;
+    const String exchange_name;
+    AMQP::ExchangeType exchange_type;
+    const String channel_id_base; /// Serial number of the current producer buffer
+    const bool persistent;
+
+    /* Set to true when shutdown() is called; needed because the table might be dropped
+     * before all acks are received.
+     */
+    std::atomic<bool> & shutdown_called;
+
+    AMQP::Table key_arguments;
+    BackgroundSchedulePool::TaskHolder writing_task;
+
+    std::unique_ptr<AMQP::TcpChannel> producer_channel;
+    bool producer_ready = false;
+
+    /// Channel errors lead to channel closure; we count the number of recreated channels to update the channel id.
+    UInt64 channel_id_counter = 0;
+
+    /// Channel id which contains the id of the current producer buffer and the serial number of the recreated channel in this buffer.
+    String channel_id;
+
+    /* payloads.queue:
+     *   - payloads are pushed to the queue in countRow() and
+     *     popped by another thread in writingFunc(); each payload gets into the queue only once.
+     * returned.queue:
+     *   - payloads are pushed to the queue:
+     *     1) inside the channel->onError() callback, if the channel becomes unusable and the record of pending
+     *        acknowledgements from the server is non-empty;
+     *     2) inside removeRecord(), if a nack() is received - a negative acknowledgement from the server meaning that the
+     *        message failed to be written to disk or was unable to reach the queue.
+     *   - payloads are popped from the queue once republished.
+     */
+    ConcurrentBoundedQueue<std::pair<UInt64, String>> payloads, returned;
+
+    /* Counter of the current delivery on the current channel. Delivery tags are scoped per channel. The server attaches a delivery
+     * tag to each published message - a serial number of the delivery on the current channel. The delivery tag is the server's way
+     * of notifying the publisher whether it was able to process a delivery, i.e. it sends back a response with the corresponding
+     * delivery tag.
+     */
+    UInt64 delivery_tag = 0;
+
+    /* false: message delivery successfully ended - the publisher received a confirm from the server that all published
+     *        1) persistent messages were written to disk,
+     *        2) non-persistent messages reached the queue;
+     * true: continue to process deliveries and returned messages.
+     */
+    bool wait_all = true;
+
+    /* 0: until writeSuffix() is called;
+     * > 0: payloads.queue will not grow anymore.
+     */
+    std::atomic<UInt64> wait_num = 0;
+
+    /// Needed to fill the messageID property.
+    UInt64 payload_counter = 0;
+
+    /// Record of pending acknowledgements from the server; its size never exceeds the size of returned.queue.
+    std::map<UInt64, std::pair<UInt64, String>> delivery_record;
+
+    Poco::Logger * log;
+    const std::optional<char> delim;
+    const size_t max_rows;
+    const size_t chunk_size;
+    size_t rows = 0;
+    std::list<std::string> chunks;
+};
+
+}