diff --git a/src/Storages/RabbitMQ/Buffer_fwd.h b/src/Storages/RabbitMQ/Buffer_fwd.h
index f0ef010c518..5be2c6fdf6a 100644
--- a/src/Storages/RabbitMQ/Buffer_fwd.h
+++ b/src/Storages/RabbitMQ/Buffer_fwd.h
@@ -8,4 +8,7 @@ namespace DB
 class ReadBufferFromRabbitMQConsumer;
 using ConsumerBufferPtr = std::shared_ptr<ReadBufferFromRabbitMQConsumer>;
 
+class WriteBufferToRabbitMQProducer;
+using ProducerBufferPtr = std::shared_ptr<WriteBufferToRabbitMQProducer>;
+
 }
diff --git a/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp b/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp
new file mode 100644
index 00000000000..3f940891c23
--- /dev/null
+++ b/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp
@@ -0,0 +1,57 @@
+#include <Storages/RabbitMQ/RabbitMQBlockOutputStream.h>
+#include <Storages/RabbitMQ/StorageRabbitMQ.h>
+#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
+#include <Formats/FormatFactory.h>
+#include <Common/Exception.h>
+
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int CANNOT_CREATE_IO_BUFFER;
+}
+
+
+RabbitMQBlockOutputStream::RabbitMQBlockOutputStream(
+    StorageRabbitMQ & storage_, const Context & context_) : storage(storage_), context(context_)
+{
+}
+
+
+Block RabbitMQBlockOutputStream::getHeader() const
+{
+    return storage.getSampleBlockNonMaterialized();
+}
+
+
+void RabbitMQBlockOutputStream::writePrefix()
+{
+    buffer = storage.createWriteBuffer();
+    if (!buffer)
+        throw Exception("Failed to create RabbitMQ producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
+
+    child = FormatFactory::instance().getOutput(
+        storage.getFormatName(), *buffer, getHeader(), context, [this](const Columns & /* columns */, size_t /* rows */)
+    {
+        buffer->count_row();
+    });
+}
+
+
+void RabbitMQBlockOutputStream::write(const Block & block)
+{
+    child->write(block);
+
+    if (buffer)
+        buffer->flush();
+}
+
+
+void RabbitMQBlockOutputStream::writeSuffix()
+{
+    child->writeSuffix();
+}
+
+}
diff --git a/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.h b/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.h
new file mode 100644
index 00000000000..2f7b89a2a30
--- /dev/null
+++ b/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.h
@@ -0,0 +1,29 @@
+#pragma once
+
+#include <DataStreams/IBlockOutputStream.h>
+#include <Interpreters/Context.h>
+#include <Storages/RabbitMQ/StorageRabbitMQ.h>
+
+
+namespace DB
+{
+
+class RabbitMQBlockOutputStream : public IBlockOutputStream
+{
+
+public:
+    explicit RabbitMQBlockOutputStream(StorageRabbitMQ & storage_, const Context & context_);
+
+    Block getHeader() const override;
+
+    void writePrefix() override;
+    void write(const Block & block) override;
+    void writeSuffix() override;
+
+private:
+    StorageRabbitMQ & storage;
+    Context context;
+    ProducerBufferPtr buffer;
+    BlockOutputStreamPtr child;
+};
+}
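For reviewers unfamiliar with the IBlockOutputStream contract: INSERT execution calls writePrefix() once before the first block, then write() for every block, then writeSuffix() at the end, which is why the producer buffer is created lazily in writePrefix(). The sketch below is illustrative only, not part of the patch; `storage`, `context` and `block` are assumed to come from the caller, with `block` matching storage.getSampleBlockNonMaterialized().

    #include <Storages/RabbitMQ/RabbitMQBlockOutputStream.h>

    // Illustrative sketch: the call sequence INSERT execution uses against the new stream.
    void exampleInsertSequence(DB::StorageRabbitMQ & storage, const DB::Context & context, const DB::Block & block)
    {
        DB::RabbitMQBlockOutputStream stream(storage, context);

        stream.writePrefix();   // creates the producer write buffer and the format output stream
        stream.write(block);    // serializes rows; the row callback calls buffer->count_row(),
                                // then the accumulated messages are flushed to RabbitMQ
        stream.writeSuffix();   // finalizes the format output
    }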
diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp
index fb705e4d1bc..ee5dede5261 100644
--- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp
+++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp
@@ -15,6 +15,8 @@
 #include
 #include
 #include
+#include <Storages/RabbitMQ/RabbitMQBlockOutputStream.h>
+#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
 #include
 #include
 #include
@@ -124,6 +126,12 @@ Pipes StorageRabbitMQ::read(
 }
 
 
+BlockOutputStreamPtr StorageRabbitMQ::write(const ASTPtr &, const Context & context)
+{
+    return std::make_shared<RabbitMQBlockOutputStream>(*this, context);
+}
+
+
 void StorageRabbitMQ::startup()
 {
     for (size_t i = 0; i < num_consumers; ++i)
@@ -205,6 +213,14 @@ ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
 }
 
 
+ProducerBufferPtr StorageRabbitMQ::createWriteBuffer()
+{
+    return std::make_shared<WriteBufferToRabbitMQProducer>(parsed_address, routing_key, exchange_name,
+            log, num_consumers, bind_by_id, hash_exchange,
+            row_delimiter ? std::optional<char>{row_delimiter} : std::nullopt, 1, 1024);
+}
+
+
 bool StorageRabbitMQ::checkDependencies(const StorageID & table_id)
 {
     // Check if all dependencies are attached
diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.h b/src/Storages/RabbitMQ/StorageRabbitMQ.h
index fc098b168f1..5aa77a9a732 100644
--- a/src/Storages/RabbitMQ/StorageRabbitMQ.h
+++ b/src/Storages/RabbitMQ/StorageRabbitMQ.h
@@ -25,6 +25,7 @@ public:
     std::string getName() const override { return "RabbitMQ"; }
     bool supportsSettings() const override { return true; }
+    bool noPushingToViews() const override { return true; }
 
     void startup() override;
     void shutdown() override;
@@ -37,10 +38,16 @@ public:
         size_t max_block_size,
         unsigned num_streams) override;
 
+    BlockOutputStreamPtr write(
+        const ASTPtr & query,
+        const Context & context) override;
+
     void pushReadBuffer(ConsumerBufferPtr buf);
     ConsumerBufferPtr popReadBuffer();
     ConsumerBufferPtr popReadBuffer(std::chrono::milliseconds timeout);
 
+    ProducerBufferPtr createWriteBuffer();
+
     const String & getExchangeName() const { return exchange_name; }
     const String & getRoutingKey() const { return routing_key; }
diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp
new file mode 100644
index 00000000000..529cc5bd93b
--- /dev/null
+++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp
@@ -0,0 +1,169 @@
+#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
+#include "Core/Block.h"
+#include "Columns/ColumnString.h"
+#include "Columns/ColumnsNumber.h"
+#include <common/logger_useful.h>
+#include <amqpcpp.h>
+#include <chrono>
+#include <thread>
+
+
+namespace DB
+{
+
+enum
+{
+    Connection_setup_sleep = 200,
+    Connection_setup_retries_max = 1000,
+    Buffer_limit_to_flush = 50000
+};
+
+WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
+        std::pair<String, UInt16> & parsed_address,
+        const String & routing_key_,
+        const String & exchange_,
+        Poco::Logger * log_,
+        const size_t num_queues_,
+        const bool bind_by_id_,
+        const bool hash_exchange_,
+        std::optional<char> delimiter,
+        size_t rows_per_message,
+        size_t chunk_size_)
+        : WriteBuffer(nullptr, 0)
+        , routing_key(routing_key_)
+        , exchange_name(exchange_)
+        , bind_by_id(bind_by_id_)
+        , hash_exchange(hash_exchange_)
+        , num_queues(num_queues_)
+        , producerEvbase(event_base_new())
+        , eventHandler(producerEvbase, log_)
+        , connection(&eventHandler, AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login("root", "clickhouse"), "/"))
+        , log(log_)
+        , delim(delimiter)
+        , max_rows(rows_per_message)
+        , chunk_size(chunk_size_)
+{
+    /* The reason behind making a separate connection for each concurrent producer is explained here:
+     * https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/128#issuecomment-300780086
+     * - publishing from different threads (as output streams are asynchronous) leads to internal library errors.
+     */
+    size_t cnt_retries = 0;
+    while (!connection.ready() && ++cnt_retries != Connection_setup_retries_max)
+    {
+        event_base_loop(producerEvbase, EVLOOP_NONBLOCK | EVLOOP_ONCE);
+        std::this_thread::sleep_for(std::chrono::milliseconds(Connection_setup_sleep));
+    }
+
+    if (!connection.ready())
+    {
+        LOG_ERROR(log, "Cannot set up connection for producer!");
+    }
+
+    producer_channel = std::make_shared<AMQP::TcpChannel>(&connection);
+}
+
+
+WriteBufferToRabbitMQProducer::~WriteBufferToRabbitMQProducer()
+{
+    flush();
+    connection.close();
+
+    assert(rows == 0 && chunks.empty());
+}
+
+
+void WriteBufferToRabbitMQProducer::count_row()
+{
+    if (++rows % max_rows == 0)
+    {
+        const std::string & last_chunk = chunks.back();
+        size_t last_chunk_size = offset();
+
+        if (delim && last_chunk[last_chunk_size - 1] == delim)
+            --last_chunk_size;
+
+        std::string payload;
+        payload.reserve((chunks.size() - 1) * chunk_size + last_chunk_size);
+
+        for (auto i = chunks.begin(), e = --chunks.end(); i != e; ++i)
+            payload.append(*i);
+
+        payload.append(last_chunk, 0, last_chunk_size);
+
+        rows = 0;
+        chunks.clear();
+        set(nullptr, 0);
+
+        messages.emplace_back(payload);
+        ++message_counter;
+
+        if (messages.size() >= Buffer_limit_to_flush)
+        {
+            flush();
+        }
+    }
+}
+
+
+void WriteBufferToRabbitMQProducer::flush()
+{
+    /* Why accumulate payloads here instead of publishing each of them right away in count_row()? Because publishing needs
+     * to be wrapped inside the declareExchange() callback, and declaring the exchange each time we publish is too
+     * expensive. Declaring it once and then publishing without wrapping inside the onSuccess callback leads to the
+     * exchange becoming inactive at some point, and part of the messages is lost as a result.
+     */
+    std::atomic<bool> exchange_declared = false, exchange_error = false;
+
+    producer_channel->declareExchange(exchange_name + "_direct", AMQP::direct, AMQP::passive)
+    .onSuccess([&]()
+    {
+        for (auto & payload : messages)
+        {
+            if (!message_counter)
+                return;
+
+            next_queue = next_queue % num_queues + 1;
+
+            if (bind_by_id || hash_exchange)
+            {
+                producer_channel->publish(exchange_name, std::to_string(next_queue), payload);
+            }
+            else
+            {
+                producer_channel->publish(exchange_name, routing_key, payload);
+            }
+
+            --message_counter;
+        }
+
+        exchange_declared = true;
+        messages.clear();
+    })
+    .onError([&](const char * message)
+    {
+        exchange_error = true;
+        exchange_declared = false;
+        LOG_ERROR(log, "Exchange was not declared: {}", message);
+    });
+
+    while (!exchange_declared && !exchange_error)
+    {
+        startEventLoop(exchange_declared);
+    }
+}
+
+
+void WriteBufferToRabbitMQProducer::nextImpl()
+{
+    chunks.push_back(std::string());
+    chunks.back().resize(chunk_size);
+    set(chunks.back().data(), chunk_size);
+}
+
+
+void WriteBufferToRabbitMQProducer::startEventLoop(std::atomic<bool> & check_param)
+{
+    eventHandler.start(check_param);
+}
+
+}
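A note on the WriteBuffer mechanics behind count_row(): nextImpl() appends a fresh chunk of chunk_size bytes whenever the format has filled the current one, and offset() reports how many bytes of the last chunk are actually in use, so a message payload is all full chunks plus the used prefix of the last chunk, minus a trailing row delimiter. Below is a minimal standalone sketch of that arithmetic, with hypothetical names and plain std::string/std::list standing in for the WriteBuffer machinery; it is not part of the patch.

    #include <cstddef>
    #include <iterator>
    #include <list>
    #include <optional>
    #include <string>

    // Hypothetical helper mirroring the payload assembly in count_row():
    // `chunks` holds the fixed-size buffers filled by the format, `last_chunk_used`
    // plays the role of offset(), and a trailing row delimiter is stripped if present.
    std::string assemblePayload(const std::list<std::string> & chunks,
                                std::size_t last_chunk_used, std::size_t chunk_size,
                                std::optional<char> delim)
    {
        const std::string & last_chunk = chunks.back();

        if (delim && last_chunk_used > 0 && last_chunk[last_chunk_used - 1] == *delim)
            --last_chunk_used;                          // do not ship the delimiter itself

        std::string payload;
        payload.reserve((chunks.size() - 1) * chunk_size + last_chunk_used);

        for (auto it = chunks.begin(), end = std::prev(chunks.end()); it != end; ++it)
            payload.append(*it);                        // full chunks are copied whole

        payload.append(last_chunk, 0, last_chunk_used); // only the used part of the last chunk
        return payload;
    }

For example, with chunk_size = 8 and a single serialized row "abcdef\n", only one chunk exists, last_chunk_used is 7, and the payload becomes "abcdef" once the delimiter is dropped.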
diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h
new file mode 100644
index 00000000000..d7a1715d491
--- /dev/null
+++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h
@@ -0,0 +1,67 @@
+#pragma once
+
+#include <IO/WriteBuffer.h>
+#include <Storages/RabbitMQ/RabbitMQHandler.h>
+#include <amqpcpp.h>
+#include <atomic>
+#include <list>
+
+namespace DB
+{
+
+using ProducerPtr = std::shared_ptr<AMQP::TcpChannel>;
+using Messages = std::vector<String>;
+
+class WriteBufferToRabbitMQProducer : public WriteBuffer
+{
+public:
+    WriteBufferToRabbitMQProducer(
+            std::pair<String, UInt16> & parsed_address,
+            const String & routing_key_,
+            const String & exchange_,
+            Poco::Logger * log_,
+            const size_t num_queues_,
+            const bool bind_by_id_,
+            const bool hash_exchange_,
+            std::optional<char> delimiter,
+            size_t rows_per_message,
+            size_t chunk_size_
+    );
+
+    ~WriteBufferToRabbitMQProducer() override;
+
+    void count_row();
+    void flush();
+
+private:
+    void nextImpl() override;
+    void checkExchange();
+    void startEventLoop(std::atomic<bool> & check_param);
+
+    const String routing_key;
+    const String exchange_name;
+    const bool bind_by_id;
+    const bool hash_exchange;
+    const size_t num_queues;
+
+    event_base * producerEvbase;
+    RabbitMQHandler eventHandler;
+    AMQP::TcpConnection connection;
+    ProducerPtr producer_channel;
+
+    size_t next_queue = 0;
+    UInt64 message_counter = 0;
+    String channel_id;
+
+    Messages messages;
+
+    Poco::Logger * log;
+    const std::optional<char> delim;
+    const size_t max_rows;
+    const size_t chunk_size;
+    size_t count_mes = 0;
+    size_t rows = 0;
+    std::list<std::string> chunks;
+};
+
+}
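Finally, to make the producer's public contract concrete (count_row() packs the bytes written so far into one message, flush() declares the exchange and publishes everything accumulated), here is a hedged, illustrative sketch of driving the buffer directly. The connection details, routing key, exchange and logger name are placeholders invented for the example; in the patch itself the buffer is only created through StorageRabbitMQ::createWriteBuffer() and fed by a format output stream.

    #include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
    #include <Poco/Logger.h>
    #include <optional>
    #include <string>
    #include <utility>

    using namespace DB;

    // Illustrative only -- not part of the patch; all literals below are placeholders.
    void exampleDirectUse()
    {
        std::pair<String, UInt16> parsed_address{"localhost", 5672};
        Poco::Logger * log = &Poco::Logger::get("RabbitMQProducerExample");

        WriteBufferToRabbitMQProducer producer(parsed_address, "example_key", "example_exchange",
            log, /* num_queues */ 1, /* bind_by_id */ false, /* hash_exchange */ false,
            std::optional<char>{'\n'}, /* rows_per_message */ 1, /* chunk_size */ 1024);

        const std::string row = "{\"key\": 1}\n";
        producer.write(row.data(), row.size());  // bytes land in the current chunk
        producer.count_row();                    // one row written -> pack it into a message
        producer.flush();                        // declare the exchange and publish the message
    }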