Merge pull request #12761 from kssenii/rabbitmq-improvements

RabbitMQ improvements
This commit is contained in:
alesapin 2020-09-08 09:38:25 +03:00 committed by GitHub
commit 4364bff3bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 2143 additions and 1209 deletions

View File

@ -27,9 +27,15 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
[rabbitmq_exchange_type = 'exchange_type',]
[rabbitmq_routing_key_list = 'key1,key2,...',]
[rabbitmq_row_delimiter = 'delimiter_symbol',]
[rabbitmq_schema = '',]
[rabbitmq_num_consumers = N,]
[rabbitmq_num_queues = N,]
[rabbitmq_transactional_channel = 0]
[rabbitmq_queue_base = 'queue',]
[rabbitmq_deadletter_exchange = 'dl-exchange',]
[rabbitmq_persistent = 0,]
[rabbitmq_skip_broken_messages = N,]
[rabbitmq_max_block_size = N,]
[rabbitmq_flush_interval_ms = N]
```
Required parameters:
@ -40,12 +46,18 @@ Required parameters:
Optional parameters:
- `rabbitmq_exchange_type` The type of RabbitMQ exchange: `direct`, `fanout`, `topic`, `headers`, `consistent-hash`. Default: `fanout`.
- `rabbitmq_exchange_type` The type of RabbitMQ exchange: `direct`, `fanout`, `topic`, `headers`, `consistent_hash`. Default: `fanout`.
- `rabbitmq_routing_key_list` A comma-separated list of routing keys.
- `rabbitmq_row_delimiter` Delimiter character, which ends the message.
- `rabbitmq_schema` Parameter that must be used if the format requires a schema definition. For example, [Capn Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `rabbitmq_num_consumers` The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient.
- `rabbitmq_num_queues` The number of queues per consumer. Default: `1`. Specify more queues if the capacity of one queue per consumer is insufficient. A single queue can contain up to 50K messages at the same time.
- `rabbitmq_transactional_channel` Wrap `INSERT` queries in transactions. Default: `0`.
- `rabbitmq_num_queues` The number of queues per consumer. Default: `1`. Specify more queues if the capacity of one queue per consumer is insufficient.
- `rabbitmq_queue_base` - Specify a base name for queues that will be declared. By default, queues are declared unique to tables based on db and table names.
- `rabbitmq_deadletter_exchange` - Specify name for a [dead letter exchange](https://www.rabbitmq.com/dlx.html). You can create another table with this exchange name and collect messages in cases when they are republished to dead letter exchange. By default dead letter exchange is not specified.
- `persistent` - If set to 1 (true), in insert query delivery mode will be set to 2 (marks messages as 'persistent'). Default: `0`.
- `rabbitmq_skip_broken_messages` RabbitMQ message parser tolerance to schema-incompatible messages per block. Default: `0`. If `rabbitmq_skip_broken_messages = N` then the engine skips *N* RabbitMQ messages that cannot be parsed (a message equals a row of data).
- `rabbitmq_max_block_size`
- `rabbitmq_flush_interval_ms`
Required configuration:
@ -92,13 +104,22 @@ Exchange type options:
- `headers` - Routing is based on `key=value` matches with a setting `x-match=all` or `x-match=any`. Example table key list: `x-match=all,format=logs,type=report,year=2020`.
- `consistent-hash` - Data is evenly distributed between all bound tables (where the exchange name is the same). Note that this exchange type must be enabled with RabbitMQ plugin: `rabbitmq-plugins enable rabbitmq_consistent_hash_exchange`.
If exchange type is not specified, then default is `fanout` and routing keys for data publishing must be randomized in range `[1, num_consumers]` for every message/batch (or in range `[1, num_consumers * num_queues]` if `rabbitmq_num_queues` is set). This table configuration works quicker than any other, especially when `rabbitmq_num_consumers` and/or `rabbitmq_num_queues` parameters are set.
Setting `rabbitmq_queue_base` may be used for the following cases:
- to let different tables share queues, so that multiple consumers could be registered for the same queues, which makes a better performance. If using `rabbitmq_num_consumers` and/or `rabbitmq_num_queues` settings, the exact match of queues is achieved in case these parameters are the same.
- to be able to restore reading from certain durable queues when not all messages were successfully consumed. To be able to resume consumption from one specific queue - set its name in `rabbitmq_queue_base` setting and do not specify `rabbitmq_num_consumers` and `rabbitmq_num_queues` (defaults to 1). To be able to resume consumption from all queues, which were declared for a specific table - just specify the same settings: `rabbitmq_queue_base`, `rabbitmq_num_consumers`, `rabbitmq_num_queues`. By default, queue names will be unique to tables. Note: it makes sence only if messages are sent with delivery mode 2 - marked 'persistent', durable.
- to reuse queues as they are declared durable and not auto-deleted.
If `rabbitmq_num_consumers` and/or `rabbitmq_num_queues` parameters are specified along with `rabbitmq_exchange_type`, then:
To improve performance, received messages are grouped into blocks the size of [max\_insert\_block\_size](../../../operations/server-configuration-parameters/settings.md#settings-max_insert_block_size). If the block wasnt formed within [stream\_flush\_interval\_ms](../../../operations/server-configuration-parameters/settings.md) milliseconds, the data will be flushed to the table regardless of the completeness of the block.
If `rabbitmq_num_consumers` and/or `rabbitmq_num_queues` settings are specified along with `rabbitmq_exchange_type`, then:
- `rabbitmq-consistent-hash-exchange` plugin must be enabled.
- `message_id` property of the published messages must be specified (unique for each message/batch).
For insert query there is message metadata, which is added for each published message: `messageID` and `republished` flag (true, if published more than once) - can be accessed via message headers.
Do not use the same table for inserts and materialized views.
Example:
``` sql
@ -113,10 +134,18 @@ Example:
rabbitmq_num_consumers = 5;
CREATE TABLE daily (key UInt64, value UInt64)
ENGINE = MergeTree();
ENGINE = MergeTree() ORDER BY key;
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT key, value FROM queue;
SELECT key, value FROM daily ORDER BY key;
```
## Virtual Columns {#virtual-columns}
- `_exchange_name` - RabbitMQ exchange name.
- `_channel_id` - ChannelID, on which consumer, who received the message, was declared.
- `_delivery_tag` - DeliveryTag of the received message. Scoped per channel.
- `_redelivered` - `redelivered` flag of the message.
- `_message_id` - MessageID of the received message; non-empty if was set, when message was published.

View File

@ -503,6 +503,10 @@ namespace ErrorCodes
extern const int CANNOT_RESTORE_FROM_FIELD_DUMP = 536;
extern const int ILLEGAL_MYSQL_VARIABLE = 537;
extern const int MYSQL_SYNTAX_ERROR = 538;
extern const int CANNOT_BIND_RABBITMQ_EXCHANGE = 539;
extern const int CANNOT_DECLARE_RABBITMQ_EXCHANGE = 540;
extern const int CANNOT_CREATE_RABBITMQ_QUEUE_BINDING = 541;
extern const int CANNOT_REMOVE_RABBITMQ_EXCHANGE = 542;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -15,20 +15,29 @@ RabbitMQBlockInputStream::RabbitMQBlockInputStream(
StorageRabbitMQ & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const Context & context_,
const Names & columns)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, context(context_)
, column_names(columns)
, non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized())
, virtual_header(metadata_snapshot->getSampleBlockForColumns({"_exchange"}, storage.getVirtuals(), storage.getStorageID()))
const Names & columns,
size_t max_block_size_,
bool ack_in_suffix_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, context(context_)
, column_names(columns)
, max_block_size(max_block_size_)
, ack_in_suffix(ack_in_suffix_)
, non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized())
, sample_block(non_virtual_header)
, virtual_header(metadata_snapshot->getSampleBlockForColumns(
{"_exchange_name", "_channel_id", "_delivery_tag", "_redelivered", "_message_id"},
storage.getVirtuals(), storage.getStorageID()))
{
for (const auto & column : virtual_header)
sample_block.insert(column);
}
RabbitMQBlockInputStream::~RabbitMQBlockInputStream()
{
if (!claimed)
if (!buffer)
return;
storage.pushReadBuffer(buffer);
@ -37,21 +46,40 @@ RabbitMQBlockInputStream::~RabbitMQBlockInputStream()
Block RabbitMQBlockInputStream::getHeader() const
{
return metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals(), storage.getStorageID());
return sample_block;
}
void RabbitMQBlockInputStream::readPrefixImpl()
{
auto timeout = std::chrono::milliseconds(context.getSettingsRef().rabbitmq_max_wait_ms.totalMilliseconds());
buffer = storage.popReadBuffer(timeout);
claimed = !!buffer;
}
if (!buffer || finished)
bool RabbitMQBlockInputStream::needChannelUpdate()
{
if (!buffer || !buffer->isChannelUpdateAllowed())
return false;
if (buffer->isChannelError())
return true;
ChannelPtr channel = buffer->getChannel();
return !channel || !channel->usable();
}
void RabbitMQBlockInputStream::updateChannel()
{
if (!buffer)
return;
buffer->checkSubscription();
buffer->updateAckTracker();
storage.updateChannel(buffer->getChannel());
buffer->setupChannel();
}
@ -66,7 +94,7 @@ Block RabbitMQBlockInputStream::readImpl()
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
auto input_format = FormatFactory::instance().getInputFormat(
storage.getFormatName(), *buffer, non_virtual_header, context, 1);
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size);
InputPort port(input_format->getPort().getHeader(), input_format.get());
connect(input_format->getPort(), port);
@ -123,17 +151,31 @@ Block RabbitMQBlockInputStream::readImpl()
auto new_rows = read_rabbitmq_message();
auto exchange_name = buffer->getExchange();
for (size_t i = 0; i < new_rows; ++i)
if (new_rows)
{
virtual_columns[0]->insert(exchange_name);
auto exchange_name = storage.getExchange();
auto channel_id = buffer->getChannelID();
auto delivery_tag = buffer->getDeliveryTag();
auto redelivered = buffer->getRedelivered();
auto message_id = buffer->getMessageID();
buffer->updateAckTracker({delivery_tag, channel_id});
for (size_t i = 0; i < new_rows; ++i)
{
virtual_columns[0]->insert(exchange_name);
virtual_columns[1]->insert(channel_id);
virtual_columns[2]->insert(delivery_tag);
virtual_columns[3]->insert(redelivered);
virtual_columns[4]->insert(message_id);
}
total_rows = total_rows + new_rows;
}
total_rows = total_rows + new_rows;
buffer->allowNext();
if (!new_rows || !checkTimeLimit())
if (total_rows >= max_block_size || buffer->queueEmpty() || buffer->isConsumerStopped() || !checkTimeLimit())
break;
}
@ -144,11 +186,27 @@ Block RabbitMQBlockInputStream::readImpl()
auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));
for (const auto & column : virtual_block.getColumnsWithTypeAndName())
{
result_block.insert(column);
}
return result_block;
}
void RabbitMQBlockInputStream::readSuffixImpl()
{
if (ack_in_suffix)
sendAck();
}
bool RabbitMQBlockInputStream::sendAck()
{
if (!buffer)
return false;
if (!buffer->ackMessages())
return false;
return true;
}
}

View File

@ -17,7 +17,9 @@ public:
StorageRabbitMQ & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const Context & context_,
const Names & columns);
const Names & columns,
size_t max_block_size_,
bool ack_in_suffix = true);
~RabbitMQBlockInputStream() override;
@ -26,15 +28,23 @@ public:
void readPrefixImpl() override;
Block readImpl() override;
void readSuffixImpl() override;
bool needChannelUpdate();
void updateChannel();
bool sendAck();
private:
StorageRabbitMQ & storage;
StorageMetadataPtr metadata_snapshot;
Context context;
Names column_names;
const size_t max_block_size;
bool ack_in_suffix;
bool finished = false;
bool claimed = false;
const Block non_virtual_header;
Block sample_block;
const Block virtual_header;
ConsumerBufferPtr buffer;

View File

@ -33,6 +33,9 @@ Block RabbitMQBlockOutputStream::getHeader() const
void RabbitMQBlockOutputStream::writePrefix()
{
if (!storage.exchangeRemoved())
storage.unbindExchange();
buffer = storage.createWriteBuffer();
if (!buffer)
throw Exception("Failed to create RabbitMQ producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
@ -56,6 +59,9 @@ void RabbitMQBlockOutputStream::write(const Block & block)
void RabbitMQBlockOutputStream::writeSuffix()
{
child->writeSuffix();
if (buffer)
buffer->updateMaxWait();
}
}

View File

@ -5,35 +5,45 @@
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_CONNECT_RABBITMQ;
}
/* The object of this class is shared between concurrent consumers (who share the same connection == share the same
* event loop and handler).
*/
RabbitMQHandler::RabbitMQHandler(uv_loop_t * loop_, Poco::Logger * log_) :
AMQP::LibUvHandler(loop_),
loop(loop_),
log(log_)
log(log_),
connection_running(false),
loop_running(false),
loop_state(Loop::STOP)
{
}
void RabbitMQHandler::onError(AMQP::TcpConnection * connection, const char * message)
///Method that is called when the connection ends up in an error state.
void RabbitMQHandler::onError(AMQP::TcpConnection * /* connection */, const char * message)
{
LOG_ERROR(log, "Library error report: {}", message);
connection_running.store(false);
}
if (!connection->usable() || !connection->ready())
throw Exception("Connection error", ErrorCodes::CANNOT_CONNECT_RABBITMQ);
void RabbitMQHandler::onReady(AMQP::TcpConnection * /* connection */)
{
LOG_TRACE(log, "Connection is ready");
connection_running.store(true);
loop_state.store(Loop::RUN);
}
void RabbitMQHandler::startLoop()
{
std::lock_guard lock(startup_mutex);
/// stop_loop variable is updated in a separate thread
while (!stop_loop.load())
LOG_DEBUG(log, "Background loop started");
loop_running.store(true);
while (loop_state.load() == Loop::RUN)
uv_run(loop, UV_RUN_NOWAIT);
LOG_DEBUG(log, "Background loop ended");
loop_running.store(false);
}
void RabbitMQHandler::iterateLoop()

View File

@ -11,22 +11,36 @@
namespace DB
{
namespace Loop
{
static const UInt8 RUN = 1;
static const UInt8 STOP = 2;
}
class RabbitMQHandler : public AMQP::LibUvHandler
{
public:
RabbitMQHandler(uv_loop_t * loop_, Poco::Logger * log_);
void onError(AMQP::TcpConnection * connection, const char * message) override;
void stop() { stop_loop.store(true); }
void onError(AMQP::TcpConnection * connection, const char * message) override;
void onReady(AMQP::TcpConnection * connection) override;
void startLoop();
void iterateLoop();
bool connectionRunning() { return connection_running.load(); }
bool loopRunning() { return loop_running.load(); }
void updateLoopState(UInt8 state) { loop_state.store(state); }
UInt8 getLoopState() { return loop_state.load(); }
private:
uv_loop_t * loop;
Poco::Logger * log;
std::atomic<bool> stop_loop = false;
std::atomic<bool> connection_running, loop_running;
std::atomic<UInt8> loop_state;
std::mutex startup_mutex;
};

View File

@ -2,7 +2,6 @@
#include <Core/BaseSettings.h>
namespace DB
{
class ASTStorage;
@ -10,17 +9,22 @@ namespace DB
#define LIST_OF_RABBITMQ_SETTINGS(M) \
M(String, rabbitmq_host_port, "", "A host-port to connect to RabbitMQ server.", 0) \
M(String, rabbitmq_routing_key_list, "5672", "A string of routing keys, separated by dots.", 0) \
M(String, rabbitmq_exchange_name, "clickhouse-exchange", "The exchange name, to which messages are sent.", 0) \
M(String, rabbitmq_format, "", "The message format.", 0) \
M(Char, rabbitmq_row_delimiter, '\0', "The character to be considered as a delimiter.", 0) \
M(String, rabbitmq_exchange_type, "default", "The exchange type.", 0) \
M(String, rabbitmq_routing_key_list, "5672", "A string of routing keys, separated by dots.", 0) \
M(Char, rabbitmq_row_delimiter, '\0', "The character to be considered as a delimiter.", 0) \
M(String, rabbitmq_schema, "", "Schema identifier (used by schema-based formats) for RabbitMQ engine", 0) \
M(UInt64, rabbitmq_num_consumers, 1, "The number of consumer channels per table.", 0) \
M(UInt64, rabbitmq_num_queues, 1, "The number of queues per consumer.", 0) \
M(Bool, rabbitmq_transactional_channel, false, "Use transactional channel for publishing.", 0) \
DECLARE_SETTINGS_TRAITS(RabbitMQSettingsTraits, LIST_OF_RABBITMQ_SETTINGS)
M(String, rabbitmq_queue_base, "", "Base for queue names to be able to reopen non-empty queues in case of failure.", 0) \
M(String, rabbitmq_deadletter_exchange, "", "Exchange name to be passed as a dead-letter-exchange name.", 0) \
M(Bool, rabbitmq_persistent, false, "If set, delivery mode will be set to 2 (makes messages 'persistent', durable).", 0) \
M(UInt64, rabbitmq_skip_broken_messages, 0, "Skip at least this number of broken messages from RabbitMQ per block", 0) \
M(UInt64, rabbitmq_max_block_size, 0, "Number of row collected before flushing data from RabbitMQ.", 0) \
M(Milliseconds, rabbitmq_flush_interval_ms, 0, "Timeout for flushing data from RabbitMQ.", 0) \
DECLARE_SETTINGS_TRAITS(RabbitMQSettingsTraits, LIST_OF_RABBITMQ_SETTINGS)
struct RabbitMQSettings : public BaseSettings<RabbitMQSettingsTraits>
{

View File

@ -17,383 +17,215 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int CANNOT_CREATE_RABBITMQ_QUEUE_BINDING;
}
namespace ExchangeType
{
/// Note that default here means default by implementation and not by rabbitmq settings
static const String DEFAULT = "default";
static const String FANOUT = "fanout";
static const String DIRECT = "direct";
static const String TOPIC = "topic";
static const String HASH = "consistent_hash";
static const String HEADERS = "headers";
}
static const auto QUEUE_SIZE = 50000; /// Equals capacity of a single rabbitmq queue
ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
ChannelPtr consumer_channel_,
ChannelPtr setup_channel_,
HandlerPtr event_handler_,
const String & exchange_name_,
const Names & routing_keys_,
size_t channel_id_,
size_t channel_id_base_,
const String & channel_base_,
const String & queue_base_,
Poco::Logger * log_,
char row_delimiter_,
bool bind_by_id_,
bool hash_exchange_,
size_t num_queues_,
const String & exchange_type_,
const String & local_exchange_,
const String & deadletter_exchange_,
uint32_t queue_size_,
const std::atomic<bool> & stopped_)
: ReadBuffer(nullptr, 0)
, consumer_channel(std::move(consumer_channel_))
, setup_channel(setup_channel_)
, event_handler(event_handler_)
, exchange_name(exchange_name_)
, routing_keys(routing_keys_)
, channel_id(channel_id_)
, bind_by_id(bind_by_id_)
, channel_base(channel_base_)
, channel_id_base(channel_id_base_)
, queue_base(queue_base_)
, hash_exchange(hash_exchange_)
, num_queues(num_queues_)
, exchange_type(exchange_type_)
, local_exchange(local_exchange_)
, local_default_exchange(local_exchange + "_" + ExchangeType::DIRECT)
, local_hash_exchange(local_exchange + "_" + ExchangeType::HASH)
, deadletter_exchange(deadletter_exchange_)
, log(log_)
, row_delimiter(row_delimiter_)
, queue_size(queue_size_)
, stopped(stopped_)
, messages(QUEUE_SIZE * num_queues)
, received(queue_size * num_queues)
{
exchange_type_set = exchange_type != ExchangeType::DEFAULT;
/* One queue per consumer can handle up to 50000 messages. More queues per consumer can be added.
* By default there is one queue per consumer.
*/
for (size_t queue_id = 0; queue_id < num_queues; ++queue_id)
{
/// Queue bingings must be declared before any publishing => it must be done here and not in readPrefix()
initQueueBindings(queue_id);
}
bindQueue(queue_id);
setupChannel();
}
ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer()
{
consumer_channel->close();
messages.clear();
BufferBase::set(nullptr, 0, 0);
}
void ReadBufferFromRabbitMQConsumer::initExchange()
void ReadBufferFromRabbitMQConsumer::bindQueue(size_t queue_id)
{
/* This direct-exchange is used for default implementation and for INSERT query (so it is always declared). If exchange_type
* is not set, then there are only two exchanges - external, defined by the client, and local, unique for each table (default).
* This strict division to external and local exchanges is needed to avoid too much complexity with defining exchange_name
* for INSERT query producer and, in general, it is better to distinguish them into separate ones.
*/
consumer_channel->declareExchange(local_default_exchange, AMQP::direct).onError([&](const char * message)
std::atomic<bool> binding_created = false;
auto success_callback = [&](const std::string & queue_name, int msgcount, int /* consumercount */)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to declare local direct-exchange. Reason: {}", message);
});
queues.emplace_back(queue_name);
LOG_DEBUG(log, "Queue {} is declared", queue_name);
if (!exchange_type_set)
{
consumer_channel->declareExchange(exchange_name, AMQP::fanout).onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to declare default fanout-exchange. Reason: {}", message);
});
if (msgcount)
LOG_INFO(log, "Queue {} is non-empty. Non-consumed messaged will also be delivered", queue_name);
/// With fanout exchange the binding key is ignored - a parameter might be arbitrary. All distribution lies on local_exchange.
consumer_channel->bindExchange(exchange_name, local_default_exchange, routing_keys[0]).onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to bind local direct-exchange to fanout-exchange. Reason: {}", message);
});
return;
}
AMQP::ExchangeType type;
if (exchange_type == ExchangeType::FANOUT) type = AMQP::ExchangeType::fanout;
else if (exchange_type == ExchangeType::DIRECT) type = AMQP::ExchangeType::direct;
else if (exchange_type == ExchangeType::TOPIC) type = AMQP::ExchangeType::topic;
else if (exchange_type == ExchangeType::HASH) type = AMQP::ExchangeType::consistent_hash;
else if (exchange_type == ExchangeType::HEADERS) type = AMQP::ExchangeType::headers;
else throw Exception("Invalid exchange type", ErrorCodes::BAD_ARGUMENTS);
/* Declare client's exchange of the specified type and bind it to hash-exchange (if it is not already hash-exchange), which
* will evenly distribute messages between all consumers. (This enables better scaling as without hash-exchange - the only
* option to avoid getting the same messages more than once - is having only one consumer with one queue)
*/
consumer_channel->declareExchange(exchange_name, type).onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to declare client's {} exchange. Reason: {}", exchange_type, message);
});
/// No need for declaring hash-exchange if there is only one consumer with one queue or exchange type is already hash
if (!bind_by_id)
return;
hash_exchange = true;
if (exchange_type == ExchangeType::HASH)
return;
/* By default hash exchange distributes messages based on a hash value of a routing key, which must be a string integer. But
* in current case we use hash exchange for binding to another exchange of some other type, which needs its own routing keys
* of other types: headers, patterns and string-keys. This means that hash property must be changed.
*/
{
AMQP::Table binding_arguments;
binding_arguments["hash-property"] = "message_id";
/// Declare exchange for sharding.
consumer_channel->declareExchange(local_hash_exchange, AMQP::consistent_hash, binding_arguments)
/* Here we bind either to sharding exchange (consistent-hash) or to bridge exchange (fanout). All bindings to routing keys are
* done between client's exchange and local bridge exchange. Binding key must be a string integer in case of hash exchange, for
* fanout exchange it can be arbitrary
*/
setup_channel->bindQueue(exchange_name, queue_name, std::to_string(channel_id_base))
.onSuccess([&] { binding_created = true; })
.onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to declare {} exchange: {}", exchange_type, message);
throw Exception(
ErrorCodes::CANNOT_CREATE_RABBITMQ_QUEUE_BINDING,
"Failed to create queue binding with queue {} for exchange {}. Reason: {}", std::string(message),
queue_name, exchange_name);
});
}
};
/// Then bind client's exchange to sharding exchange (by keys, specified by the client):
if (exchange_type == ExchangeType::HEADERS)
auto error_callback([&](const char * message)
{
AMQP::Table binding_arguments;
std::vector<String> matching;
/* This error is most likely a result of an attempt to declare queue with different settings if it was declared before. So for a
* given queue name either deadletter_exchange parameter changed or queue_size changed, i.e. table was declared with different
* max_block_size parameter. Solution: client should specify a different queue_base parameter or manually delete previously
* declared queues via any of the various cli tools.
*/
throw Exception("Failed to declare queue. Probably queue settings are conflicting: max_block_size, deadletter_exchange. Attempt \
specifying differently those settings or use a different queue_base or manually delete previously declared queues, \
which were declared with the same names. ERROR reason: "
+ std::string(message), ErrorCodes::BAD_ARGUMENTS);
});
for (const auto & header : routing_keys)
{
boost::split(matching, header, [](char c){ return c == '='; });
binding_arguments[matching[0]] = matching[1];
matching.clear();
}
AMQP::Table queue_settings;
/// Routing key can be arbitrary here.
consumer_channel->bindExchange(exchange_name, local_hash_exchange, routing_keys[0], binding_arguments)
.onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to bind local hash exchange to client's exchange. Reason: {}", message);
});
}
else
queue_settings["x-max-length"] = queue_size;
queue_settings["x-overflow"] = "reject-publish";
if (!deadletter_exchange.empty())
queue_settings["x-dead-letter-exchange"] = deadletter_exchange;
/* The first option not just simplifies queue_name, but also implements the possibility to be able to resume reading from one
* specific queue when its name is specified in queue_base setting
*/
const String queue_name = !hash_exchange ? queue_base : std::to_string(channel_id_base) + "_" + std::to_string(queue_id) + "_" + queue_base;
setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
while (!binding_created)
{
for (const auto & routing_key : routing_keys)
{
consumer_channel->bindExchange(exchange_name, local_hash_exchange, routing_key).onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to bind local hash exchange to client's exchange. Reason: {}", message);
});
}
iterateEventLoop();
}
}
void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
void ReadBufferFromRabbitMQConsumer::subscribe()
{
/// These variables might be updated later from a separate thread in onError callbacks.
if (!local_exchange_declared || (exchange_type_set && !local_hash_exchange_declared))
for (const auto & queue_name : queues)
{
initExchange();
local_exchange_declared = true;
local_hash_exchange_declared = true;
}
bool default_bindings_created = false, default_bindings_error = false;
bool bindings_created = false, bindings_error = false;
consumer_channel->declareQueue(AMQP::exclusive)
.onSuccess([&](const std::string & queue_name_, int /* msgcount */, int /* consumercount */)
{
queues.emplace_back(queue_name_);
subscribed_queue[queue_name_] = false;
String binding_key = routing_keys[0];
/* Every consumer has at least one unique queue. Bind the queues to exchange based on the consumer_channel_id
* in case there is one queue per consumer and bind by queue_id in case there is more than 1 queue per consumer.
* (queue_id is based on channel_id)
*/
if (bind_by_id || hash_exchange)
consumer_channel->consume(queue_name)
.onSuccess([&](const std::string & /* consumer_tag */)
{
if (queues.size() == 1)
{
binding_key = std::to_string(channel_id);
}
else
{
binding_key = std::to_string(channel_id + queue_id);
}
}
LOG_TRACE(log, "Consumer on channel {} is subscribed to queue {}", channel_id, queue_name);
/// Bind queue to exchange that is used for INSERT query and also for default implementation.
consumer_channel->bindQueue(local_default_exchange, queue_name_, binding_key)
.onSuccess([&]
if (++subscribed == queues.size())
wait_subscription.store(false);
})
.onReceived([&](const AMQP::Message & message, uint64_t delivery_tag, bool redelivered)
{
default_bindings_created = true;
if (message.bodySize())
{
String message_received = std::string(message.body(), message.body() + message.bodySize());
if (row_delimiter != '\0')
message_received += row_delimiter;
if (message.hasMessageID())
received.push({message_received, message.messageID(), redelivered, AckTracker(delivery_tag, channel_id)});
else
received.push({message_received, "", redelivered, AckTracker(delivery_tag, channel_id)});
}
})
.onError([&](const char * message)
{
default_bindings_error = true;
LOG_ERROR(log, "Failed to bind to key {}. Reason: {}", binding_key, message);
/* End up here either if channel ends up in an error state (then there will be resubscription) or consume call error, which
* arises from queue settings mismatch or queue level error, which should not happen as noone else is supposed to touch them
*/
LOG_ERROR(log, "Consumer failed on channel {}. Reason: {}", channel_id, message);
wait_subscription.store(false);
});
}
}
/* Subscription can probably be moved back to readPrefix(), but not sure whether it is better in regard to speed, because
* if moved there, it must(!) be wrapped inside a channel->onSuccess callback or any other, otherwise
* consumer might fail to subscribe and no resubscription will help.
*/
subscribe(queues.back());
LOG_DEBUG(log, "Queue " + queue_name_ + " is declared");
bool ReadBufferFromRabbitMQConsumer::ackMessages()
{
AckTracker record_info = last_inserted_record_info;
if (exchange_type_set)
{
if (hash_exchange)
{
/* If exchange_type == hash, then bind directly to this client's exchange (because there is no need for a distributor
* exchange as it is already hash-exchange), otherwise hash-exchange is a local distributor exchange.
*/
String current_hash_exchange = exchange_type == ExchangeType::HASH ? exchange_name : local_hash_exchange;
/// If hash-exchange is used for messages distribution, then the binding key is ignored - can be arbitrary.
consumer_channel->bindQueue(current_hash_exchange, queue_name_, binding_key)
.onSuccess([&]
{
bindings_created = true;
})
.onError([&](const char * message)
{
bindings_error = true;
LOG_ERROR(log, "Failed to create queue binding to key {}. Reason: {}", binding_key, message);
});
}
else if (exchange_type == ExchangeType::HEADERS)
{
AMQP::Table binding_arguments;
std::vector<String> matching;
/// It is not parsed for the second time - if it was parsed above, then it would never end up here.
for (const auto & header : routing_keys)
{
boost::split(matching, header, [](char c){ return c == '='; });
binding_arguments[matching[0]] = matching[1];
matching.clear();
}
consumer_channel->bindQueue(exchange_name, queue_name_, routing_keys[0], binding_arguments)
.onSuccess([&]
{
bindings_created = true;
})
.onError([&](const char * message)
{
bindings_error = true;
LOG_ERROR(log, "Failed to bind queue to key. Reason: {}", message);
});
}
else
{
/// Means there is only one queue with one consumer - no even distribution needed - no hash-exchange.
for (const auto & routing_key : routing_keys)
{
/// Binding directly to exchange, specified by the client.
consumer_channel->bindQueue(exchange_name, queue_name_, routing_key)
.onSuccess([&]
{
bindings_created = true;
})
.onError([&](const char * message)
{
bindings_error = true;
LOG_ERROR(log, "Failed to bind queue to key. Reason: {}", message);
});
}
}
}
})
.onError([&](const char * message)
{
default_bindings_error = true;
LOG_ERROR(log, "Failed to declare queue on the channel. Reason: {}", message);
});
/* Run event loop (which updates local variables in a separate thread) until bindings are created or failed to be created.
* It is important at this moment to make sure that queue bindings are created before any publishing can happen because
* otherwise messages will be routed nowhere.
/* Do not send ack to server if message's channel is not the same as current running channel because delivery tags are scoped per
* channel, so if channel fails, all previous delivery tags become invalid
*/
while ((!default_bindings_created && !default_bindings_error) || (exchange_type_set && !bindings_created && !bindings_error))
if (record_info.channel_id == channel_id && record_info.delivery_tag && record_info.delivery_tag > prev_tag)
{
iterateEventLoop();
}
}
void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
{
if (subscribed_queue[queue_name])
return;
consumer_channel->consume(queue_name, AMQP::noack)
.onSuccess([&](const std::string & /* consumer */)
{
subscribed_queue[queue_name] = true;
consumer_error = false;
++count_subscribed;
LOG_TRACE(log, "Consumer {} is subscribed to queue {}", channel_id, queue_name);
})
.onReceived([&](const AMQP::Message & message, uint64_t /* deliveryTag */, bool /* redelivered */)
{
size_t message_size = message.bodySize();
if (message_size && message.body() != nullptr)
/// Commit all received messages with delivery tags from last commited to last inserted
if (!consumer_channel->ack(record_info.delivery_tag, AMQP::multiple))
{
String message_received = std::string(message.body(), message.body() + message_size);
if (row_delimiter != '\0')
{
message_received += row_delimiter;
}
messages.push(message_received);
LOG_ERROR(log, "Failed to commit messages with delivery tags from last commited to {} on channel {}",
record_info.delivery_tag, channel_id);
return false;
}
})
.onError([&](const char * message)
{
consumer_error = true;
LOG_ERROR(log, "Consumer {} failed. Reason: {}", channel_id, message);
});
prev_tag = record_info.delivery_tag;
LOG_TRACE(log, "Consumer commited messages with deliveryTags up to {} on channel {}", record_info.delivery_tag, channel_id);
}
return true;
}
void ReadBufferFromRabbitMQConsumer::checkSubscription()
void ReadBufferFromRabbitMQConsumer::updateAckTracker(AckTracker record_info)
{
if (count_subscribed == num_queues)
if (record_info.delivery_tag && channel_error.load())
return;
wait_subscribed = num_queues;
if (!record_info.delivery_tag)
prev_tag = 0;
/// These variables are updated in a separate thread.
while (count_subscribed != wait_subscribed && !consumer_error)
last_inserted_record_info = record_info;
}
void ReadBufferFromRabbitMQConsumer::setupChannel()
{
wait_subscription.store(true);
consumer_channel->onReady([&]()
{
iterateEventLoop();
}
/* First number indicates current consumer buffer; second number indicates serial number of created channel for current buffer,
* i.e. if channel fails - another one is created and its serial number is incremented; channel_base is to guarantee that
* channel_id is unique for each table
*/
channel_id = std::to_string(channel_id_base) + "_" + std::to_string(channel_id_counter++) + "_" + channel_base;
LOG_TRACE(log, "Channel {} is created", channel_id);
LOG_TRACE(log, "Consumer {} is subscribed to {} queues", channel_id, count_subscribed);
subscribed = 0;
subscribe();
channel_error.store(false);
});
/// Updated in callbacks which are run by the loop.
if (count_subscribed == num_queues)
return;
/// A case that should never normally happen.
for (auto & queue : queues)
consumer_channel->onError([&](const char * message)
{
subscribe(queue);
}
LOG_ERROR(log, "Channel {} error: {}", channel_id, message);
channel_error.store(true);
wait_subscription.store(false);
});
}
@ -408,10 +240,10 @@ bool ReadBufferFromRabbitMQConsumer::nextImpl()
if (stopped || !allowed)
return false;
if (messages.tryPop(current))
if (received.tryPop(current))
{
auto * new_position = const_cast<char *>(current.data());
BufferBase::set(new_position, current.size(), 0);
auto * new_position = const_cast<char *>(current.message.data());
BufferBase::set(new_position, current.message.size(), 0);
allowed = false;
return true;

View File

@ -24,63 +24,91 @@ class ReadBufferFromRabbitMQConsumer : public ReadBuffer
public:
ReadBufferFromRabbitMQConsumer(
ChannelPtr consumer_channel_,
ChannelPtr setup_channel_,
HandlerPtr event_handler_,
const String & exchange_name_,
const Names & routing_keys_,
size_t channel_id_,
size_t channel_id_base_,
const String & channel_base_,
const String & queue_base_,
Poco::Logger * log_,
char row_delimiter_,
bool bind_by_id_,
bool hash_exchange_,
size_t num_queues_,
const String & exchange_type_,
const String & local_exchange_,
const String & deadletter_exchange_,
uint32_t queue_size_,
const std::atomic<bool> & stopped_);
~ReadBufferFromRabbitMQConsumer() override;
void allowNext() { allowed = true; } // Allow to read next message.
void checkSubscription();
struct AckTracker
{
UInt64 delivery_tag;
String channel_id;
auto getExchange() const { return exchange_name; }
AckTracker() : delivery_tag(0), channel_id("") {}
AckTracker(UInt64 tag, String id) : delivery_tag(tag), channel_id(id) {}
};
struct MessageData
{
String message;
String message_id;
bool redelivered;
AckTracker track;
};
bool isConsumerStopped() { return stopped; }
bool isChannelError() { return channel_error; }
/// Do not allow to update channel if current channel is not properly set up and subscribed
bool isChannelUpdateAllowed() { return !wait_subscription; }
ChannelPtr & getChannel() { return consumer_channel; }
void setupChannel();
bool ackMessages();
void updateAckTracker(AckTracker record = AckTracker());
bool queueEmpty() { return received.empty(); }
void allowNext() { allowed = true; } // Allow to read next message.
auto getChannelID() const { return current.track.channel_id; }
auto getDeliveryTag() const { return current.track.delivery_tag; }
auto getRedelivered() const { return current.redelivered; }
auto getMessageID() const { return current.message_id; }
private:
bool nextImpl() override;
void bindQueue(size_t queue_id);
void subscribe();
void iterateEventLoop();
ChannelPtr consumer_channel;
ChannelPtr setup_channel;
HandlerPtr event_handler;
const String exchange_name;
const Names routing_keys;
const size_t channel_id;
const bool bind_by_id;
const String channel_base;
const size_t channel_id_base;
const String queue_base;
const bool hash_exchange;
const size_t num_queues;
const String exchange_type;
const String local_exchange;
const String local_default_exchange;
const String local_hash_exchange;
const String deadletter_exchange;
Poco::Logger * log;
char row_delimiter;
bool allowed = true;
uint32_t queue_size;
const std::atomic<bool> & stopped;
String default_local_exchange;
bool local_exchange_declared = false, local_hash_exchange_declared = false;
bool exchange_type_set = false, hash_exchange = false;
std::atomic<bool> consumer_error = false;
std::atomic<size_t> count_subscribed = 0, wait_subscribed;
ConcurrentBoundedQueue<String> messages;
String current;
String channel_id;
std::atomic<bool> channel_error = true, wait_subscription = false;
std::vector<String> queues;
std::unordered_map<String, bool> subscribed_queue;
bool nextImpl() override;
void initExchange();
void initQueueBindings(const size_t queue_id);
void subscribe(const String & queue_name);
void iterateEventLoop();
ConcurrentBoundedQueue<MessageData> received;
MessageData current;
size_t subscribed = 0;
AckTracker last_inserted_record_info;
UInt64 prev_tag = 0, channel_id_counter = 0;
};
}

View File

@ -14,7 +14,6 @@
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/RabbitMQ/RabbitMQSettings.h>
#include <Storages/RabbitMQ/RabbitMQBlockInputStream.h>
#include <Storages/RabbitMQ/RabbitMQBlockOutputStream.h>
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
@ -39,14 +38,30 @@ namespace DB
{
static const auto CONNECT_SLEEP = 200;
static const auto RETRIES_MAX = 1000;
static const auto RETRIES_MAX = 20;
static const auto HEARTBEAT_RESCHEDULE_MS = 3000;
static const uint32_t QUEUE_SIZE = 100000;
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int CANNOT_CONNECT_RABBITMQ;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int CANNOT_BIND_RABBITMQ_EXCHANGE;
extern const int CANNOT_DECLARE_RABBITMQ_EXCHANGE;
extern const int CANNOT_REMOVE_RABBITMQ_EXCHANGE;
}
namespace ExchangeType
{
/// Note that default here means default by implementation and not by rabbitmq settings
static const String DEFAULT = "default";
static const String FANOUT = "fanout";
static const String DIRECT = "direct";
static const String TOPIC = "topic";
static const String HASH = "consistent_hash";
static const String HEADERS = "headers";
}
@ -54,78 +69,152 @@ StorageRabbitMQ::StorageRabbitMQ(
const StorageID & table_id_,
Context & context_,
const ColumnsDescription & columns_,
const String & host_port_,
const Names & routing_keys_,
const String & exchange_name_,
const String & format_name_,
char row_delimiter_,
const String & exchange_type_,
size_t num_consumers_,
size_t num_queues_,
const bool use_transactional_channel_)
std::unique_ptr<RabbitMQSettings> rabbitmq_settings_)
: IStorage(table_id_)
, global_context(context_.getGlobalContext())
, rabbitmq_context(Context(global_context))
, routing_keys(global_context.getMacros()->expand(routing_keys_))
, exchange_name(exchange_name_)
, format_name(global_context.getMacros()->expand(format_name_))
, row_delimiter(row_delimiter_)
, num_consumers(num_consumers_)
, num_queues(num_queues_)
, exchange_type(exchange_type_)
, use_transactional_channel(use_transactional_channel_)
, rabbitmq_settings(std::move(rabbitmq_settings_))
, exchange_name(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_exchange_name.value))
, format_name(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_format.value))
, exchange_type(defineExchangeType(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_exchange_type.value)))
, routing_keys(parseRoutingKeys(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_routing_key_list.value)))
, row_delimiter(rabbitmq_settings->rabbitmq_row_delimiter.value)
, schema_name(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_schema.value))
, num_consumers(rabbitmq_settings->rabbitmq_num_consumers.value)
, num_queues(rabbitmq_settings->rabbitmq_num_queues.value)
, queue_base(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_queue_base.value))
, deadletter_exchange(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_deadletter_exchange.value))
, persistent(rabbitmq_settings->rabbitmq_persistent.value)
, hash_exchange(num_consumers > 1 || num_queues > 1)
, log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")"))
, parsed_address(parseAddress(global_context.getMacros()->expand(host_port_), 5672))
, address(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_host_port.value))
, parsed_address(parseAddress(address, 5672))
, login_password(std::make_pair(
global_context.getConfigRef().getString("rabbitmq.username"),
global_context.getConfigRef().getString("rabbitmq.password")))
, semaphore(0, num_consumers_)
, semaphore(0, num_consumers)
, unique_strbase(getRandomName())
, queue_size(std::max(QUEUE_SIZE, static_cast<uint32_t>(getMaxBlockSize())))
{
loop = std::make_unique<uv_loop_t>();
uv_loop_init(loop.get());
event_handler = std::make_shared<RabbitMQHandler>(loop.get(), log);
connection = std::make_shared<AMQP::TcpConnection>(event_handler.get(), AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/"));
size_t cnt_retries = 0;
while (!connection->ready() && ++cnt_retries != RETRIES_MAX)
if (!restoreConnection(false))
{
event_handler->iterateLoop();
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
if (!connection->closed())
connection->close(true);
throw Exception("Cannot connect to RabbitMQ " + address, ErrorCodes::CANNOT_CONNECT_RABBITMQ);
}
if (!connection->ready())
throw Exception("Cannot set up connection for consumers", ErrorCodes::CANNOT_CONNECT_RABBITMQ);
rabbitmq_context.makeQueryContext();
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
setInMemoryMetadata(storage_metadata);
streaming_task = global_context.getSchedulePool().createTask("RabbitMQStreamingTask", [this]{ threadFunc(); });
rabbitmq_context.makeQueryContext();
rabbitmq_context = addSettings(rabbitmq_context);
/// One looping task for all consumers as they share the same connection == the same handler == the same event loop
event_handler->updateLoopState(Loop::STOP);
looping_task = global_context.getSchedulePool().createTask("RabbitMQLoopingTask", [this]{ loopingFunc(); });
looping_task->deactivate();
streaming_task = global_context.getSchedulePool().createTask("RabbitMQStreamingTask", [this]{ streamingToViewsFunc(); });
streaming_task->deactivate();
heartbeat_task = global_context.getSchedulePool().createTask("RabbitMQHeartbeatTask", [this]{ heartbeatFunc(); });
heartbeat_task->deactivate();
bind_by_id = num_consumers > 1 || num_queues > 1;
if (queue_base.empty())
{
/* Make sure that local exchange name is unique for each table and is not the same as client's exchange name. It also needs to
* be table-based and not just a random string, because local exchanges should be declared the same for same tables
*/
sharding_exchange = getTableBasedName(exchange_name, table_id_);
auto table_id = getStorageID();
String table_name = table_id.table_name;
/* By default without a specified queue name in queue's declaration - its name will be generated by the library, but its better
* to specify it unique for each table to reuse them once the table is recreated. So it means that queues remain the same for every
* table unless queue_base table setting is specified (which allows to register consumers to specific queues). Now this is a base
* for the names of later declared queues
*/
queue_base = getTableBasedName("", table_id_);
}
else
{
/* In case different tables are used to register multiple consumers to the same queues (so queues are shared between tables) and
* at the same time sharding exchange is needed (if there are multiple shared queues), then those tables also need to share
* sharding exchange and bridge exchange
*/
sharding_exchange = exchange_name + "_" + queue_base;
}
/// Make sure that local exchange name is unique for each table and is not the same as client's exchange name
local_exchange_name = exchange_name + "_" + table_name;
bridge_exchange = sharding_exchange + "_bridge";
}
/// One looping task for all consumers as they share the same connection == the same handler == the same event loop
looping_task = global_context.getSchedulePool().createTask("RabbitMQLoopingTask", [this]{ loopingFunc(); });
looping_task->deactivate();
Names StorageRabbitMQ::parseRoutingKeys(String routing_key_list)
{
Names result;
boost::split(result, routing_key_list, [](char c){ return c == ','; });
for (String & key : result)
boost::trim(key);
return result;
}
AMQP::ExchangeType StorageRabbitMQ::defineExchangeType(String exchange_type_)
{
AMQP::ExchangeType type;
if (exchange_type_ != ExchangeType::DEFAULT)
{
if (exchange_type_ == ExchangeType::FANOUT) type = AMQP::ExchangeType::fanout;
else if (exchange_type_ == ExchangeType::DIRECT) type = AMQP::ExchangeType::direct;
else if (exchange_type_ == ExchangeType::TOPIC) type = AMQP::ExchangeType::topic;
else if (exchange_type_ == ExchangeType::HASH) type = AMQP::ExchangeType::consistent_hash;
else if (exchange_type_ == ExchangeType::HEADERS) type = AMQP::ExchangeType::headers;
else throw Exception("Invalid exchange type", ErrorCodes::BAD_ARGUMENTS);
}
else
{
type = AMQP::ExchangeType::fanout;
}
return type;
}
String StorageRabbitMQ::getTableBasedName(String name, const StorageID & table_id)
{
std::stringstream ss;
if (name.empty())
ss << table_id.database_name << "_" << table_id.table_name;
else
ss << name << "_" << table_id.database_name << "_" << table_id.table_name;
return ss.str();
}
Context StorageRabbitMQ::addSettings(Context context) const
{
context.setSetting("input_format_skip_unknown_fields", true);
context.setSetting("input_format_allow_errors_ratio", 0.);
context.setSetting("input_format_allow_errors_num", rabbitmq_settings->rabbitmq_skip_broken_messages.value);
if (!schema_name.empty())
context.setSetting("format_schema", schema_name);
return context;
}
void StorageRabbitMQ::heartbeatFunc()
{
if (!stream_cancelled)
if (!stream_cancelled && event_handler->connectionRunning())
{
LOG_TRACE(log, "Sending RabbitMQ heartbeat");
connection->heartbeat();
heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS);
}
@ -134,8 +223,256 @@ void StorageRabbitMQ::heartbeatFunc()
void StorageRabbitMQ::loopingFunc()
{
LOG_DEBUG(log, "Starting event looping iterations");
event_handler->startLoop();
if (event_handler->connectionRunning())
event_handler->startLoop();
}
/* Need to deactivate this way because otherwise might get a deadlock when first deactivate streaming task in shutdown and then
* inside streaming task try to deactivate any other task
*/
void StorageRabbitMQ::deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop)
{
if (stop_loop)
event_handler->updateLoopState(Loop::STOP);
std::unique_lock<std::mutex> lock(task_mutex, std::defer_lock);
if (lock.try_lock())
{
task->deactivate();
lock.unlock();
}
else if (wait) /// Wait only if deactivating from shutdown
{
lock.lock();
task->deactivate();
}
}
size_t StorageRabbitMQ::getMaxBlockSize() const
{
return rabbitmq_settings->rabbitmq_max_block_size.changed
? rabbitmq_settings->rabbitmq_max_block_size.value
: (global_context.getSettingsRef().max_insert_block_size.value / num_consumers);
}
void StorageRabbitMQ::initExchange()
{
/* Binding scheme is the following: client's exchange -> key bindings by routing key list -> bridge exchange (fanout) ->
* -> sharding exchange (only if needed) -> queues
*/
setup_channel->declareExchange(exchange_name, exchange_type, AMQP::durable)
.onError([&](const char * message)
{
/* This error can be a result of attempt to declare exchange if it was already declared but
* 1) with different exchange type. In this case can
* - manually delete previously declared exchange and create a new one.
* - throw an error that the exchange with this name but another type is already declared and ask client to delete it himself
* if it is not needed anymore or use another exchange name.
* 2) with different exchange settings. This can only happen if client himself declared exchange with the same name and
* specified its own settings, which differ from this implementation.
*/
throw Exception("Unable to declare exchange. Make sure specified exchange is not already declared. Error: "
+ std::string(message), ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE);
});
/// Bridge exchange is needed to easily disconnect consumer queues and also simplifies queue bindings
setup_channel->declareExchange(bridge_exchange, AMQP::fanout, AMQP::durable + AMQP::autodelete)
.onError([&](const char * message)
{
/// This error is not supposed to happen as this exchange name is always unique to type and its settings
throw Exception(
ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE, "Unable to declare bridge exchange ({}). Reason: {}", bridge_exchange, std::string(message));
});
if (!hash_exchange)
{
consumer_exchange = bridge_exchange;
return;
}
/* Change hash property because by default it will be routing key, which has to be an integer, but with support for any exchange
* type - routing keys might be of any type
*/
AMQP::Table binding_arguments;
binding_arguments["hash-property"] = "message_id";
/// Declare exchange for sharding.
setup_channel->declareExchange(sharding_exchange, AMQP::consistent_hash, AMQP::durable + AMQP::autodelete, binding_arguments)
.onError([&](const char * message)
{
/* This error can be a result of same reasons as above for exchange_name, i.e. it will mean that sharding exchange name appeared
* to be the same as some other exchange (which purpose is not for sharding). So probably actual error reason: queue_base parameter
* is bad.
*/
throw Exception(
ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE, "Unable to declare sharding exchange ({}). Reason: {}", sharding_exchange, std::string(message));
});
setup_channel->bindExchange(bridge_exchange, sharding_exchange, routing_keys[0])
.onError([&](const char * message)
{
throw Exception(
ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
"Unable to bind bridge exchange ({}) to sharding exchange ({}). Reason: {}",
bridge_exchange,
sharding_exchange,
std::string(message));
});
consumer_exchange = sharding_exchange;
}
void StorageRabbitMQ::bindExchange()
{
std::atomic<bool> binding_created = false;
size_t bound_keys = 0;
if (exchange_type == AMQP::ExchangeType::headers)
{
AMQP::Table bind_headers;
for (const auto & header : routing_keys)
{
std::vector<String> matching;
boost::split(matching, header, [](char c){ return c == '='; });
bind_headers[matching[0]] = matching[1];
}
setup_channel->bindExchange(exchange_name, bridge_exchange, routing_keys[0], bind_headers)
.onSuccess([&]() { binding_created = true; })
.onError([&](const char * message)
{
throw Exception(
ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
"Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
exchange_name,
bridge_exchange,
std::string(message));
});
}
else if (exchange_type == AMQP::ExchangeType::fanout || exchange_type == AMQP::ExchangeType::consistent_hash)
{
setup_channel->bindExchange(exchange_name, bridge_exchange, routing_keys[0])
.onSuccess([&]() { binding_created = true; })
.onError([&](const char * message)
{
throw Exception(
ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
"Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
exchange_name,
bridge_exchange,
std::string(message));
});
}
else
{
for (const auto & routing_key : routing_keys)
{
setup_channel->bindExchange(exchange_name, bridge_exchange, routing_key)
.onSuccess([&]()
{
++bound_keys;
if (bound_keys == routing_keys.size())
binding_created = true;
})
.onError([&](const char * message)
{
throw Exception(
ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
"Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
exchange_name,
bridge_exchange,
std::string(message));
});
}
}
while (!binding_created)
{
event_handler->iterateLoop();
}
}
bool StorageRabbitMQ::restoreConnection(bool reconnecting)
{
size_t cnt_retries = 0;
if (reconnecting)
{
deactivateTask(heartbeat_task, false, false);
connection->close(); /// Connection might be unusable, but not closed
/* Connection is not closed immediately (firstly, all pending operations are completed, and then
* an AMQP closing-handshake is performed). But cannot open a new connection untill previous one is properly closed
*/
while (!connection->closed() && ++cnt_retries != RETRIES_MAX)
event_handler->iterateLoop();
/// This will force immediate closure if not yet closed
if (!connection->closed())
connection->close(true);
LOG_TRACE(log, "Trying to restore connection to " + address);
}
connection = std::make_unique<AMQP::TcpConnection>(event_handler.get(),
AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/"));
cnt_retries = 0;
while (!connection->ready() && !stream_cancelled && ++cnt_retries != RETRIES_MAX)
{
event_handler->iterateLoop();
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
}
return event_handler->connectionRunning();
}
void StorageRabbitMQ::updateChannel(ChannelPtr & channel)
{
channel = std::make_shared<AMQP::TcpChannel>(connection.get());
}
void StorageRabbitMQ::unbindExchange()
{
/* This is needed because with RabbitMQ (without special adjustments) can't, for example, properly make mv if there was insert query
* on the same table before, and in another direction it will make redundant copies, but most likely nobody will do that.
* As publishing is done to exchange, publisher never knows to which queues the message will go, every application interested in
* consuming from certain exchange - declares its owns exchange-bound queues, messages go to all such exchange-bound queues, and as
* input streams are always created at startup, then they will also declare its own exchange bound queues, but they will not be visible
* externally - client declares its own exchange-bound queues, from which to consume, so this means that if not disconnecting this local
* queues, then messages will go both ways and in one of them they will remain not consumed. So need to disconnect local exchange
* bindings to remove redunadant message copies, but after that mv cannot work unless those bindings are recreated. Recreating them is
* not difficult but very ugly and as probably nobody will do such thing - bindings will not be recreated.
*/
std::call_once(flag, [&]()
{
heartbeat_task->deactivate();
streaming_task->deactivate();
event_handler->updateLoopState(Loop::STOP);
looping_task->deactivate();
setup_channel->removeExchange(bridge_exchange)
.onSuccess([&]()
{
exchange_removed.store(true);
})
.onError([&](const char * message)
{
throw Exception("Unable to remove exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_REMOVE_RABBITMQ_EXCHANGE);
});
while (!exchange_removed.load())
{
event_handler->iterateLoop();
}
});
}
@ -151,24 +488,53 @@ Pipe StorageRabbitMQ::read(
if (num_created_consumers == 0)
return {};
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
auto modified_context = addSettings(context);
auto block_size = getMaxBlockSize();
bool update_channels = false;
if (!event_handler->connectionRunning())
{
if (event_handler->loopRunning())
deactivateTask(looping_task, false, true);
update_channels = restoreConnection(true);
if (update_channels)
heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS);
}
Pipes pipes;
pipes.reserve(num_created_consumers);
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(
*this, metadata_snapshot, context, column_names);
*this, metadata_snapshot, modified_context, column_names, block_size);
/* It is a possible but rare case when channel gets into error state and does not also close connection, so need manual update.
* But I believe that in current context and with local rabbitmq settings this will never happen and any channel error will also
* close connection, but checking anyway (in second condition of if statement). This must be done here (and also in streamToViews())
* and not in readPrefix as it requires to stop heartbeats and looping tasks to avoid race conditions inside the library
*/
if (event_handler->connectionRunning() && (update_channels || rabbit_stream->needChannelUpdate()))
{
if (event_handler->loopRunning())
{
deactivateTask(looping_task, false, true);
deactivateTask(heartbeat_task, false, false);
}
rabbit_stream->updateChannel();
}
auto converting_stream = std::make_shared<ConvertingBlockInputStream>(
rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name);
pipes.emplace_back(std::make_shared<SourceFromInputStream>(converting_stream));
}
if (!loop_started)
{
loop_started = true;
if (!event_handler->loopRunning() && event_handler->connectionRunning())
looping_task->activateAndSchedule();
}
LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
return Pipe::unitePipes(std::move(pipes));
@ -183,6 +549,10 @@ BlockOutputStreamPtr StorageRabbitMQ::write(const ASTPtr &, const StorageMetadat
void StorageRabbitMQ::startup()
{
setup_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
initExchange();
bindExchange();
for (size_t i = 0; i < num_consumers; ++i)
{
try
@ -192,11 +562,12 @@ void StorageRabbitMQ::startup()
}
catch (const AMQP::Exception & e)
{
std::cerr << e.what();
LOG_ERROR(log, "Got AMQ exception {}", e.what());
throw;
}
}
event_handler->updateLoopState(Loop::RUN);
streaming_task->activateAndSchedule();
heartbeat_task->activateAndSchedule();
}
@ -205,25 +576,30 @@ void StorageRabbitMQ::startup()
void StorageRabbitMQ::shutdown()
{
stream_cancelled = true;
wait_confirm = false;
event_handler->stop();
looping_task->deactivate();
streaming_task->deactivate();
heartbeat_task->deactivate();
for (size_t i = 0; i < num_created_consumers; ++i)
{
popReadBuffer();
}
deactivateTask(streaming_task, true, false);
deactivateTask(looping_task, true, true);
deactivateTask(heartbeat_task, true, false);
connection->close();
size_t cnt_retries = 0;
while (!connection->closed() && ++cnt_retries != RETRIES_MAX)
event_handler->iterateLoop();
/// Should actually force closure, if not yet closed, but it generates distracting error logs
//if (!connection->closed())
// connection->close(true);
for (size_t i = 0; i < num_created_consumers; ++i)
popReadBuffer();
}
void StorageRabbitMQ::pushReadBuffer(ConsumerBufferPtr buffer)
{
std::lock_guard lock(mutex);
std::lock_guard lock(buffers_mutex);
buffers.push_back(buffer);
semaphore.set();
}
@ -247,7 +623,7 @@ ConsumerBufferPtr StorageRabbitMQ::popReadBuffer(std::chrono::milliseconds timeo
}
// Take the first available buffer from the list
std::lock_guard lock(mutex);
std::lock_guard lock(buffers_mutex);
auto buffer = buffers.back();
buffers.pop_back();
@ -257,24 +633,20 @@ ConsumerBufferPtr StorageRabbitMQ::popReadBuffer(std::chrono::milliseconds timeo
ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
{
if (update_channel_id)
next_channel_id += num_queues;
update_channel_id = true;
ChannelPtr consumer_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
return std::make_shared<ReadBufferFromRabbitMQConsumer>(
consumer_channel, event_handler, exchange_name, routing_keys,
next_channel_id, log, row_delimiter, bind_by_id, num_queues,
exchange_type, local_exchange_name, stream_cancelled);
consumer_channel, setup_channel, event_handler, consumer_exchange, ++consumer_id,
unique_strbase, queue_base, log, row_delimiter, hash_exchange, num_queues,
deadletter_exchange, queue_size, stream_cancelled);
}
ProducerBufferPtr StorageRabbitMQ::createWriteBuffer()
{
return std::make_shared<WriteBufferToRabbitMQProducer>(
parsed_address, global_context, login_password, routing_keys[0], local_exchange_name,
log, num_consumers * num_queues, bind_by_id, use_transactional_channel,
parsed_address, global_context, login_password, routing_keys, exchange_name, exchange_type,
producer_id.fetch_add(1), persistent, wait_confirm, log,
row_delimiter ? std::optional<char>{row_delimiter} : std::nullopt, 1, 1024);
}
@ -307,7 +679,7 @@ bool StorageRabbitMQ::checkDependencies(const StorageID & table_id)
}
void StorageRabbitMQ::threadFunc()
void StorageRabbitMQ::streamingToViewsFunc()
{
try
{
@ -352,35 +724,43 @@ bool StorageRabbitMQ::streamToViews()
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = table_id;
// Only insert into dependent views and expect that input blocks contain virtual columns
InterpreterInsertQuery interpreter(insert, rabbitmq_context, false, true, true);
auto block_io = interpreter.execute();
auto metadata_snapshot = getInMemoryMetadataPtr();
auto column_names = block_io.out->getHeader().getNames();
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
/* event_handler->connectionRunning() does not guarantee that connnection is not closed in case loop was not running before, but
* need to anyway start the loop to activate error callbacks and update connection state, because even checking with
* connection->usable() will not give correct answer before callbacks are activated.
*/
if (!event_handler->loopRunning() && event_handler->connectionRunning())
looping_task->activateAndSchedule();
auto block_size = getMaxBlockSize();
// Create a stream for each consumer and join them in a union stream
BlockInputStreams streams;
streams.reserve(num_created_consumers);
auto metadata_snapshot = getInMemoryMetadataPtr();
auto column_names = block_io.out->getHeader().getNames();
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(*this, metadata_snapshot, rabbitmq_context, column_names);
auto converting_stream = std::make_shared<ConvertingBlockInputStream>(rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name);
streams.emplace_back(converting_stream);
auto stream = std::make_shared<RabbitMQBlockInputStream>(
*this, metadata_snapshot, rabbitmq_context, column_names, block_size, false);
streams.emplace_back(stream);
// Limit read batch to maximum block size to allow DDL
IBlockInputStream::LocalLimits limits;
const Settings & settings = global_context.getSettingsRef();
limits.speed_limits.max_execution_time = settings.stream_flush_interval_ms;
limits.timeout_overflow_mode = OverflowMode::BREAK;
rabbit_stream->setLimits(limits);
}
if (!loop_started)
{
loop_started = true;
looping_task->activateAndSchedule();
limits.speed_limits.max_execution_time = rabbitmq_settings->rabbitmq_flush_interval_ms.changed
? rabbitmq_settings->rabbitmq_flush_interval_ms
: global_context.getSettingsRef().stream_flush_interval_ms;
limits.timeout_overflow_mode = OverflowMode::BREAK;
stream->setLimits(limits);
}
// Join multiple streams if necessary
@ -393,6 +773,69 @@ bool StorageRabbitMQ::streamToViews()
std::atomic<bool> stub = {false};
copyData(*in, *block_io.out, &stub);
/* Need to stop loop even if connection is ok, because sending ack() with loop running in another thread will lead to a lot of data
* races inside the library, but only in case any error occurs or connection is lost while ack is being sent
*/
if (event_handler->loopRunning())
deactivateTask(looping_task, false, true);
if (!event_handler->connectionRunning())
{
if (!stream_cancelled && restoreConnection(true))
{
for (auto & stream : streams)
stream->as<RabbitMQBlockInputStream>()->updateChannel();
}
else
{
/// Reschedule if unable to connect to rabbitmq or quit if cancelled
return false;
}
}
else
{
deactivateTask(heartbeat_task, false, false);
/// Commit
for (auto & stream : streams)
{
/* false is returned by the sendAck function in only two cases:
* 1) if connection failed. In this case all channels will be closed and will be unable to send ack. Also ack is made based on
* delivery tags, which are unique to channels, so if channels fail, those delivery tags will become invalid and there is
* no way to send specific ack from a different channel. Actually once the server realises that it has messages in a queue
* waiting for confirm from a channel which suddenly closed, it will immediately make those messages accessible to other
* consumers. So in this case duplicates are inevitable.
* 2) size of the sent frame (libraries's internal request interface) exceeds max frame - internal library error. This is more
* common for message frames, but not likely to happen to ack frame I suppose. So I do not believe it is likely to happen.
* Also in this case if channel didn't get closed - it is ok if failed to send ack, because the next attempt to send ack on
* the same channel will also commit all previously not-committed messages. Anyway I do not think that for ack frame this
* will ever happen.
*/
if (!stream->as<RabbitMQBlockInputStream>()->sendAck())
{
/// Iterate loop to activate error callbacks if they happened
event_handler->iterateLoop();
if (event_handler->connectionRunning())
{
/* Almost any error with channel will lead to connection closure, but if so happens that channel errored and
* connection is not closed - also need to restore channels
*/
if (!stream->as<RabbitMQBlockInputStream>()->needChannelUpdate())
stream->as<RabbitMQBlockInputStream>()->updateChannel();
}
else
{
break;
}
}
}
}
event_handler->updateLoopState(Loop::RUN);
looping_task->activateAndSchedule();
heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS); /// It is also deactivated in restoreConnection(), so reschedule anyway
// Check whether the limits were applied during query execution
bool limits_applied = false;
const BlockStreamProfileInfo & info = in->getProfileInfo();
@ -410,170 +853,62 @@ void registerStorageRabbitMQ(StorageFactory & factory)
size_t args_count = engine_args.size();
bool has_settings = args.storage_def->settings;
RabbitMQSettings rabbitmq_settings;
auto rabbitmq_settings = std::make_unique<RabbitMQSettings>();
if (has_settings)
{
rabbitmq_settings.loadFromQuery(*args.storage_def);
}
rabbitmq_settings->loadFromQuery(*args.storage_def);
String host_port = rabbitmq_settings.rabbitmq_host_port;
if (args_count >= 1)
{
const auto * ast = engine_args[0]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::String)
{
host_port = safeGet<String>(ast->value);
}
else
{
throw Exception(String("RabbitMQ host:port must be a string"), ErrorCodes::BAD_ARGUMENTS);
}
}
String routing_key_list = rabbitmq_settings.rabbitmq_routing_key_list.value;
if (args_count >= 2)
{
engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.local_context);
routing_key_list = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
}
Names routing_keys;
boost::split(routing_keys, routing_key_list, [](char c){ return c == ','; });
for (String & key : routing_keys)
{
boost::trim(key);
}
String exchange = rabbitmq_settings.rabbitmq_exchange_name.value;
if (args_count >= 3)
{
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
const auto * ast = engine_args[2]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::String)
{
exchange = safeGet<String>(ast->value);
}
}
String format = rabbitmq_settings.rabbitmq_format.value;
if (args_count >= 4)
{
engine_args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[3], args.local_context);
const auto * ast = engine_args[3]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::String)
{
format = safeGet<String>(ast->value);
}
else
{
throw Exception("Format must be a string", ErrorCodes::BAD_ARGUMENTS);
}
}
char row_delimiter = rabbitmq_settings.rabbitmq_row_delimiter;
if (args_count >= 5)
{
engine_args[4] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[4], args.local_context);
const auto * ast = engine_args[4]->as<ASTLiteral>();
String arg;
if (ast && ast->value.getType() == Field::Types::String)
{
arg = safeGet<String>(ast->value);
}
else
{
throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
}
if (arg.size() > 1)
{
throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
}
else if (arg.empty())
{
row_delimiter = '\0';
}
else
{
row_delimiter = arg[0];
}
}
String exchange_type = rabbitmq_settings.rabbitmq_exchange_type.value;
if (args_count >= 6)
{
engine_args[5] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[5], args.local_context);
const auto * ast = engine_args[5]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::String)
{
exchange_type = safeGet<String>(ast->value);
// Check arguments and settings
#define CHECK_RABBITMQ_STORAGE_ARGUMENT(ARG_NUM, ARG_NAME) \
/* One of the three required arguments is not specified */ \
if (args_count < (ARG_NUM) && (ARG_NUM) <= 3 && !rabbitmq_settings->ARG_NAME.changed) \
{ \
throw Exception("Required parameter '" #ARG_NAME "' for storage RabbitMQ not specified", \
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); \
} \
if (args_count >= (ARG_NUM)) \
{ \
if (rabbitmq_settings->ARG_NAME.changed) /* The same argument is given in two places */ \
{ \
throw Exception("The argument №" #ARG_NUM " of storage RabbitMQ " \
"and the parameter '" #ARG_NAME "' is duplicated", ErrorCodes::BAD_ARGUMENTS); \
} \
}
if (exchange_type != "fanout" && exchange_type != "direct" && exchange_type != "topic"
&& exchange_type != "headers" && exchange_type != "consistent_hash")
throw Exception("Invalid exchange type", ErrorCodes::BAD_ARGUMENTS);
}
CHECK_RABBITMQ_STORAGE_ARGUMENT(1, rabbitmq_host_port)
CHECK_RABBITMQ_STORAGE_ARGUMENT(2, rabbitmq_exchange_name)
CHECK_RABBITMQ_STORAGE_ARGUMENT(3, rabbitmq_format)
UInt64 num_consumers = rabbitmq_settings.rabbitmq_num_consumers;
if (args_count >= 7)
{
const auto * ast = engine_args[6]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::UInt64)
{
num_consumers = safeGet<UInt64>(ast->value);
}
else
{
throw Exception("Number of consumers must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
}
}
CHECK_RABBITMQ_STORAGE_ARGUMENT(4, rabbitmq_exchange_type)
CHECK_RABBITMQ_STORAGE_ARGUMENT(5, rabbitmq_routing_key_list)
CHECK_RABBITMQ_STORAGE_ARGUMENT(6, rabbitmq_row_delimiter)
CHECK_RABBITMQ_STORAGE_ARGUMENT(7, rabbitmq_schema)
CHECK_RABBITMQ_STORAGE_ARGUMENT(8, rabbitmq_num_consumers)
CHECK_RABBITMQ_STORAGE_ARGUMENT(9, rabbitmq_num_queues)
CHECK_RABBITMQ_STORAGE_ARGUMENT(10, rabbitmq_queue_base)
CHECK_RABBITMQ_STORAGE_ARGUMENT(11, rabbitmq_deadletter_exchange)
CHECK_RABBITMQ_STORAGE_ARGUMENT(12, rabbitmq_persistent)
UInt64 num_queues = rabbitmq_settings.rabbitmq_num_queues;
if (args_count >= 8)
{
const auto * ast = engine_args[7]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::UInt64)
{
num_consumers = safeGet<UInt64>(ast->value);
}
else
{
throw Exception("Number of queues must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
}
}
CHECK_RABBITMQ_STORAGE_ARGUMENT(13, rabbitmq_skip_broken_messages)
CHECK_RABBITMQ_STORAGE_ARGUMENT(14, rabbitmq_max_block_size)
CHECK_RABBITMQ_STORAGE_ARGUMENT(15, rabbitmq_flush_interval_ms)
bool use_transactional_channel = static_cast<bool>(rabbitmq_settings.rabbitmq_transactional_channel);
if (args_count >= 9)
{
const auto * ast = engine_args[8]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::UInt64)
{
use_transactional_channel = static_cast<bool>(safeGet<UInt64>(ast->value));
}
else
{
throw Exception("Transactional channel parameter is a bool", ErrorCodes::BAD_ARGUMENTS);
}
}
#undef CHECK_RABBITMQ_STORAGE_ARGUMENT
return StorageRabbitMQ::create(
args.table_id, args.context, args.columns,
host_port, routing_keys, exchange, format, row_delimiter, exchange_type, num_consumers,
num_queues, use_transactional_channel);
return StorageRabbitMQ::create(args.table_id, args.context, args.columns, std::move(rabbitmq_settings));
};
factory.registerStorage("RabbitMQ", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
}
NamesAndTypesList StorageRabbitMQ::getVirtuals() const
{
return NamesAndTypesList{
{"_exchange", std::make_shared<DataTypeString>()}
{"_exchange_name", std::make_shared<DataTypeString>()},
{"_channel_id", std::make_shared<DataTypeString>()},
{"_delivery_tag", std::make_shared<DataTypeUInt64>()},
{"_redelivered", std::make_shared<DataTypeUInt8>()},
{"_message_id", std::make_shared<DataTypeString>()}
};
}

View File

@ -9,8 +9,11 @@
#include <atomic>
#include <Storages/RabbitMQ/Buffer_fwd.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <Storages/RabbitMQ/RabbitMQSettings.h>
#include <Common/thread_local_rng.h>
#include <amqpcpp/libuv.h>
#include <uv.h>
#include <random>
namespace DB
@ -31,6 +34,7 @@ public:
void startup() override;
void shutdown() override;
/// Always return virtual columns in addition to required columns
Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -54,55 +58,65 @@ public:
const String & getFormatName() const { return format_name; }
NamesAndTypesList getVirtuals() const override;
const String getExchange() const { return exchange_name; }
void unbindExchange();
bool exchangeRemoved() { return exchange_removed.load(); }
void updateChannel(ChannelPtr & channel);
protected:
StorageRabbitMQ(
const StorageID & table_id_,
Context & context_,
const ColumnsDescription & columns_,
const String & host_port_,
const Names & routing_keys_,
const String & exchange_name_,
const String & format_name_,
char row_delimiter_,
const String & exchange_type_,
size_t num_consumers_,
size_t num_queues_,
const bool use_transactional_channel_);
std::unique_ptr<RabbitMQSettings> rabbitmq_settings_);
private:
Context global_context;
Context rabbitmq_context;
std::unique_ptr<RabbitMQSettings> rabbitmq_settings;
Names routing_keys;
const String exchange_name;
String local_exchange_name;
const String format_name;
AMQP::ExchangeType exchange_type;
Names routing_keys;
char row_delimiter;
const String schema_name;
size_t num_consumers;
size_t num_created_consumers = 0;
bool bind_by_id;
size_t num_queues;
const String exchange_type;
const bool use_transactional_channel;
String queue_base;
const String deadletter_exchange;
const bool persistent;
bool hash_exchange;
Poco::Logger * log;
String address;
std::pair<String, UInt16> parsed_address;
std::pair<String, String> login_password;
std::shared_ptr<uv_loop_t> loop;
std::unique_ptr<uv_loop_t> loop;
std::shared_ptr<RabbitMQHandler> event_handler;
std::shared_ptr<AMQP::TcpConnection> connection; /// Connection for all consumers
std::unique_ptr<AMQP::TcpConnection> connection; /// Connection for all consumers
size_t num_created_consumers = 0;
Poco::Semaphore semaphore;
std::mutex mutex;
std::mutex buffers_mutex;
std::vector<ConsumerBufferPtr> buffers; /// available buffers for RabbitMQ consumers
size_t next_channel_id = 1; /// Must >= 1 because it is used as a binding key, which has to be > 0
bool update_channel_id = false;
std::atomic<bool> loop_started = false;
String unique_strbase; /// to make unique consumer channel id
/// maximum number of messages in RabbitMQ queue (x-max-length). Also used
/// to setup size of inner buffer for received messages
uint32_t queue_size;
String sharding_exchange, bridge_exchange, consumer_exchange;
size_t consumer_id = 0; /// counter for consumer buffer, needed for channel id
std::atomic<size_t> producer_id = 1; /// counter for producer buffer, needed for channel id
std::atomic<bool> wait_confirm = true; /// needed to break waiting for confirmations for producer
std::atomic<bool> exchange_removed = false;
ChannelPtr setup_channel;
std::once_flag flag; /// remove exchange only once
std::mutex task_mutex;
BackgroundSchedulePool::TaskHolder streaming_task;
BackgroundSchedulePool::TaskHolder heartbeat_task;
BackgroundSchedulePool::TaskHolder looping_task;
@ -111,13 +125,34 @@ private:
ConsumerBufferPtr createReadBuffer();
void threadFunc();
/// Functions working in the background
void streamingToViewsFunc();
void heartbeatFunc();
void loopingFunc();
void pingConnection() { connection->heartbeat(); }
static Names parseRoutingKeys(String routing_key_list);
static AMQP::ExchangeType defineExchangeType(String exchange_type_);
static String getTableBasedName(String name, const StorageID & table_id);
Context addSettings(Context context) const;
size_t getMaxBlockSize() const;
void deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop);
void initExchange();
void bindExchange();
bool restoreConnection(bool reconnecting);
bool streamToViews();
bool checkDependencies(const StorageID & table_id);
String getRandomName() const
{
std::uniform_int_distribution<int> distribution('a', 'z');
String random_str(32, ' ');
for (auto & c : random_str)
c = distribution(thread_local_rng);
return random_str;
}
};
}

View File

@ -13,37 +13,41 @@
namespace DB
{
static const auto CONNECT_SLEEP = 200;
static const auto RETRIES_MAX = 20;
static const auto BATCH = 1000;
static const auto RETURNED_LIMIT = 50000;
namespace ErrorCodes
{
extern const int CANNOT_CONNECT_RABBITMQ;
}
static const auto QUEUE_SIZE = 50000;
static const auto CONNECT_SLEEP = 200;
static const auto RETRIES_MAX = 1000;
static const auto LOOP_WAIT = 10;
WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
std::pair<String, UInt16> & parsed_address,
std::pair<String, UInt16> & parsed_address_,
Context & global_context,
const std::pair<String, String> & login_password_,
const String & routing_key_,
const String & exchange_,
const Names & routing_keys_,
const String & exchange_name_,
const AMQP::ExchangeType exchange_type_,
const size_t channel_id_base_,
const bool persistent_,
std::atomic<bool> & wait_confirm_,
Poco::Logger * log_,
size_t num_queues_,
bool bind_by_id_,
bool use_transactional_channel_,
std::optional<char> delimiter,
size_t rows_per_message,
size_t chunk_size_)
: WriteBuffer(nullptr, 0)
, parsed_address(parsed_address_)
, login_password(login_password_)
, routing_key(routing_key_)
, exchange_name(exchange_ + "_direct")
, bind_by_id(bind_by_id_)
, num_queues(num_queues_)
, use_transactional_channel(use_transactional_channel_)
, payloads(QUEUE_SIZE * num_queues)
, routing_keys(routing_keys_)
, exchange_name(exchange_name_)
, exchange_type(exchange_type_)
, channel_id_base(std::to_string(channel_id_base_))
, persistent(persistent_)
, wait_confirm(wait_confirm_)
, payloads(BATCH)
, returned(RETURNED_LIMIT)
, log(log_)
, delim(delimiter)
, max_rows(rows_per_message)
@ -52,47 +56,48 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
loop = std::make_unique<uv_loop_t>();
uv_loop_init(loop.get());
event_handler = std::make_unique<RabbitMQHandler>(loop.get(), log);
connection = std::make_unique<AMQP::TcpConnection>(event_handler.get(), AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/"));
/* The reason behind making a separate connection for each concurrent producer is explained here:
* https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/128#issuecomment-300780086 - publishing from
* different threads (as outputStreams are asynchronous) with the same connection leads to internal library errors.
*/
size_t cnt_retries = 0;
while (!connection->ready() && ++cnt_retries != RETRIES_MAX)
if (setupConnection(false))
{
event_handler->iterateLoop();
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
setupChannel();
}
if (!connection->ready())
else
{
throw Exception("Cannot set up connection for producer", ErrorCodes::CANNOT_CONNECT_RABBITMQ);
}
if (!connection->closed())
connection->close(true);
producer_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
checkExchange();
/// If publishing should be wrapped in transactions
if (use_transactional_channel)
{
producer_channel->startTransaction();
throw Exception("Cannot connect to RabbitMQ host: " + parsed_address.first + ", port: " + std::to_string(parsed_address.second),
ErrorCodes::CANNOT_CONNECT_RABBITMQ);
}
writing_task = global_context.getSchedulePool().createTask("RabbitMQWritingTask", [this]{ writingFunc(); });
writing_task->deactivate();
if (exchange_type == AMQP::ExchangeType::headers)
{
for (const auto & header : routing_keys)
{
std::vector<String> matching;
boost::split(matching, header, [](char c){ return c == '='; });
key_arguments[matching[0]] = matching[1];
}
}
}
WriteBufferToRabbitMQProducer::~WriteBufferToRabbitMQProducer()
{
stop_loop.store(true);
writing_task->deactivate();
checkExchange();
connection->close();
size_t cnt_retries = 0;
while (!connection->closed() && ++cnt_retries != RETRIES_MAX)
{
event_handler->iterateLoop();
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
}
assert(rows == 0 && chunks.empty());
}
@ -110,7 +115,7 @@ void WriteBufferToRabbitMQProducer::countRow()
std::string payload;
payload.reserve((chunks.size() - 1) * chunk_size + last_chunk_size);
for (auto i = chunks.begin(), e = --chunks.end(); i != e; ++i)
for (auto i = chunks.begin(), end = --chunks.end(); i != end; ++i)
payload.append(*i);
payload.append(last_chunk, 0, last_chunk_size);
@ -119,98 +124,202 @@ void WriteBufferToRabbitMQProducer::countRow()
chunks.clear();
set(nullptr, 0);
payloads.push(payload);
++payload_counter;
payloads.push(std::make_pair(payload_counter, payload));
}
}
bool WriteBufferToRabbitMQProducer::setupConnection(bool reconnecting)
{
size_t cnt_retries = 0;
if (reconnecting)
{
connection->close();
while (!connection->closed() && ++cnt_retries != RETRIES_MAX)
event_handler->iterateLoop();
if (!connection->closed())
connection->close(true);
LOG_TRACE(log, "Trying to set up connection");
}
connection = std::make_unique<AMQP::TcpConnection>(event_handler.get(),
AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/"));
cnt_retries = 0;
while (!connection->ready() && ++cnt_retries != RETRIES_MAX)
{
event_handler->iterateLoop();
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
}
return event_handler->connectionRunning();
}
void WriteBufferToRabbitMQProducer::setupChannel()
{
producer_channel = std::make_unique<AMQP::TcpChannel>(connection.get());
producer_channel->onError([&](const char * message)
{
LOG_ERROR(log, "Producer's channel {} error: {}", channel_id, message);
/// Channel is not usable anymore. (https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/36#issuecomment-125112236)
producer_channel->close();
/* Save records that have not received ack/nack from server before channel closure. They are removed and pushed back again once
* they are republished because after channel recovery they will acquire new delivery tags, so all previous records become invalid
*/
for (const auto & record : delivery_record)
returned.tryPush(record.second);
LOG_DEBUG(log, "Producer on channel {} hasn't confirmed {} messages, {} waiting to be published",
channel_id, delivery_record.size(), payloads.size());
/// Delivery tags are scoped per channel.
delivery_record.clear();
delivery_tag = 0;
});
producer_channel->onReady([&]()
{
channel_id = channel_id_base + std::to_string(channel_id_counter++);
LOG_DEBUG(log, "Producer's channel {} is ready", channel_id);
/* if persistent == true, onAck is received when message is persisted to disk or when it is consumed on every queue. If fails,
* onNack() is received. If persistent == false, message is confirmed the moment it is enqueued. First option is two times
* slower than the second, so default is second and the first is turned on in table setting.
*
* "Publisher confirms" are implemented similar to strategy#3 here https://www.rabbitmq.com/tutorials/tutorial-seven-java.html
*/
producer_channel->confirmSelect()
.onAck([&](uint64_t acked_delivery_tag, bool multiple)
{
removeRecord(acked_delivery_tag, multiple, false);
})
.onNack([&](uint64_t nacked_delivery_tag, bool multiple, bool /* requeue */)
{
removeRecord(nacked_delivery_tag, multiple, true);
});
});
}
void WriteBufferToRabbitMQProducer::removeRecord(UInt64 received_delivery_tag, bool multiple, bool republish)
{
auto record_iter = delivery_record.find(received_delivery_tag);
if (record_iter != delivery_record.end())
{
if (multiple)
{
/// If multiple is true, then all delivery tags up to and including current are confirmed (with ack or nack).
++record_iter;
if (republish)
for (auto record = delivery_record.begin(); record != record_iter; ++record)
returned.tryPush(record->second);
/// Delete the records even in case when republished because new delivery tags will be assigned by the server.
delivery_record.erase(delivery_record.begin(), record_iter);
}
else
{
if (republish)
returned.tryPush(record_iter->second);
delivery_record.erase(record_iter);
}
}
/// else is theoretically not possible
}
void WriteBufferToRabbitMQProducer::publish(ConcurrentBoundedQueue<std::pair<UInt64, String>> & messages, bool republishing)
{
std::pair<UInt64, String> payload;
/* It is important to make sure that delivery_record.size() is never bigger than returned.size(), i.e. number if unacknowledged
* messages cannot exceed returned.size(), because they all might end up there
*/
while (!messages.empty() && producer_channel->usable() && delivery_record.size() < RETURNED_LIMIT)
{
messages.pop(payload);
AMQP::Envelope envelope(payload.second.data(), payload.second.size());
/// if headers exchange is used, routing keys are added here via headers, if not - it is just empty
AMQP::Table message_settings = key_arguments;
/* There is the case when connection is lost in the period after some messages were published and before ack/nack was sent by the
* server, then it means that publisher will never know whether those messages were delivered or not, and therefore those records
* that received no ack/nack before connection loss will be republished (see onError() callback), so there might be duplicates. To
* let consumer know that received message might be a possible duplicate - a "republished" field is added to message metadata
*/
message_settings["republished"] = std::to_string(republishing);
envelope.setHeaders(message_settings);
/* Adding here a messageID property to message metadata. Since RabbitMQ does not guarantee exactly-once delivery, then on the
* consumer side "republished" field of message metadata can be checked and, if it set to 1, consumer might also check "messageID"
* property. This way detection of duplicates is guaranteed
*/
envelope.setMessageID(std::to_string(payload.first));
/// Delivery mode is 1 or 2. 1 is default. 2 makes a message durable, but makes performance 1.5-2 times worse
if (persistent)
envelope.setDeliveryMode(2);
if (exchange_type == AMQP::ExchangeType::consistent_hash)
{
producer_channel->publish(exchange_name, std::to_string(delivery_tag), envelope);
}
else if (exchange_type == AMQP::ExchangeType::headers)
{
producer_channel->publish(exchange_name, "", envelope);
}
else
{
producer_channel->publish(exchange_name, routing_keys[0], envelope);
}
/// This is needed for "publisher confirms", which guarantees at-least-once delivery
++delivery_tag;
delivery_record.insert(delivery_record.end(), {delivery_tag, payload});
/// Need to break at some point to let event loop run, because no publishing actually happens before looping
if (delivery_tag % BATCH == 0)
break;
}
iterateEventLoop();
}
void WriteBufferToRabbitMQProducer::writingFunc()
{
String payload;
while (!stop_loop || !payloads.empty())
while ((!payloads.empty() || wait_all) && wait_confirm.load())
{
while (!payloads.empty())
{
payloads.pop(payload);
next_queue = next_queue % num_queues + 1;
/* Publish main paylods only when there are no returned messages. This way it is ensured that returned messages are republished
* as fast as possible and no new publishes are made before returned messages are handled
*/
if (!returned.empty() && producer_channel->usable())
publish(returned, true);
else if (!payloads.empty() && producer_channel->usable())
publish(payloads, false);
if (bind_by_id)
{
producer_channel->publish(exchange_name, std::to_string(next_queue), payload);
}
else
{
producer_channel->publish(exchange_name, routing_key, payload);
}
}
iterateEventLoop();
if (wait_num.load() && delivery_record.empty() && payloads.empty() && returned.empty())
wait_all = false;
else if ((!producer_channel->usable() && event_handler->connectionRunning()) || (!event_handler->connectionRunning() && setupConnection(true)))
setupChannel();
}
}
void WriteBufferToRabbitMQProducer::checkExchange()
{
std::atomic<bool> exchange_declared = false, exchange_error = false;
/// The AMQP::passive flag indicates that it should only be checked if there is a valid exchange with the given name.
producer_channel->declareExchange(exchange_name, AMQP::direct, AMQP::passive)
.onSuccess([&]()
{
exchange_declared = true;
})
.onError([&](const char * message)
{
exchange_error = true;
LOG_ERROR(log, "Exchange for INSERT query was not declared. Reason: {}", message);
});
/// These variables are updated in a separate thread and starting the loop blocks current thread
while (!exchange_declared && !exchange_error)
{
iterateEventLoop();
}
}
void WriteBufferToRabbitMQProducer::finilizeProducer()
{
/// This will make sure everything is published
checkExchange();
if (use_transactional_channel)
{
std::atomic<bool> answer_received = false, wait_rollback = false;
producer_channel->commitTransaction()
.onSuccess([&]()
{
answer_received = true;
LOG_TRACE(log, "All messages were successfully published");
})
.onError([&](const char * message1)
{
answer_received = true;
wait_rollback = true;
LOG_TRACE(log, "Publishing not successful: {}", message1);
producer_channel->rollbackTransaction()
.onSuccess([&]()
{
wait_rollback = false;
})
.onError([&](const char * message2)
{
LOG_ERROR(log, "Failed to rollback transaction: {}", message2);
wait_rollback = false;
});
});
size_t count_retries = 0;
while ((!answer_received || wait_rollback) && ++count_retries != RETRIES_MAX)
{
iterateEventLoop();
std::this_thread::sleep_for(std::chrono::milliseconds(LOOP_WAIT));
}
}
LOG_DEBUG(log, "Prodcuer on channel {} completed", channel_id);
}

View File

@ -14,21 +14,20 @@
namespace DB
{
using ChannelPtr = std::shared_ptr<AMQP::TcpChannel>;
class WriteBufferToRabbitMQProducer : public WriteBuffer
{
public:
WriteBufferToRabbitMQProducer(
std::pair<String, UInt16> & parsed_address,
std::pair<String, UInt16> & parsed_address_,
Context & global_context,
const std::pair<String, String> & login_password_,
const String & routing_key_,
const String & exchange_,
const Names & routing_keys_,
const String & exchange_name_,
const AMQP::ExchangeType exchange_type_,
const size_t channel_id_,
const bool persistent_,
std::atomic<bool> & wait_confirm_,
Poco::Logger * log_,
size_t num_queues_,
bool bind_by_id_,
bool use_transactional_channel_,
std::optional<char> delimiter,
size_t rows_per_message,
size_t chunk_size_
@ -38,31 +37,79 @@ public:
void countRow();
void activateWriting() { writing_task->activateAndSchedule(); }
void updateMaxWait() { wait_num.store(payload_counter); }
private:
void nextImpl() override;
void checkExchange();
void iterateEventLoop();
void writingFunc();
void finilizeProducer();
bool setupConnection(bool reconnecting);
void setupChannel();
void removeRecord(UInt64 received_delivery_tag, bool multiple, bool republish);
void publish(ConcurrentBoundedQueue<std::pair<UInt64, String>> & message, bool republishing);
std::pair<String, UInt16> parsed_address;
const std::pair<String, String> login_password;
const String routing_key;
const Names routing_keys;
const String exchange_name;
const bool bind_by_id;
const size_t num_queues;
const bool use_transactional_channel;
AMQP::ExchangeType exchange_type;
const String channel_id_base; /// Serial number of current producer buffer
const bool persistent;
/* false: when shutdown is called; needed because table might be dropped before all acks are received
* true: in all other cases
*/
std::atomic<bool> & wait_confirm;
AMQP::Table key_arguments;
BackgroundSchedulePool::TaskHolder writing_task;
std::atomic<bool> stop_loop = false;
std::unique_ptr<uv_loop_t> loop;
std::unique_ptr<RabbitMQHandler> event_handler;
std::unique_ptr<AMQP::TcpConnection> connection;
ChannelPtr producer_channel;
std::unique_ptr<AMQP::TcpChannel> producer_channel;
ConcurrentBoundedQueue<String> payloads;
size_t next_queue = 0;
/// Channel errors lead to channel closure, need to count number of recreated channels to update channel id
UInt64 channel_id_counter = 0;
/// channel id which contains id of current producer buffer and serial number of recreated channel in this buffer
String channel_id;
/* payloads.queue:
* - payloads are pushed to queue in countRow and poped by another thread in writingFunc, each payload gets into queue only once
* returned.queue:
* - payloads are pushed to queue:
* 1) inside channel->onError() callback if channel becomes unusable and the record of pending acknowledgements from server
* is non-empty.
* 2) inside removeRecord() if received nack() - negative acknowledgement from the server that message failed to be written
* to disk or it was unable to reach the queue.
* - payloads are poped from the queue once republished
*/
ConcurrentBoundedQueue<std::pair<UInt64, String>> payloads, returned;
/* Counter of current delivery on a current channel. Delivery tags are scoped per channel. The server attaches a delivery tag for each
* published message - a serial number of delivery on current channel. Delivery tag is a way of server to notify publisher if it was
* able or unable to process delivery, i.e. it sends back a responce with a corresponding delivery tag.
*/
UInt64 delivery_tag = 0;
/* false: message delivery successfully ended: publisher received confirm from server that all published
* 1) persistent messages were written to disk
* 2) non-persistent messages reached the queue
* true: continue to process deliveries and returned messages
*/
bool wait_all = true;
/* false: untill writeSuffix is called
* true: means payloads.queue will not grow anymore
*/
std::atomic<UInt64> wait_num = 0;
/// Needed to fill messageID property
UInt64 payload_counter = 0;
/// Record of pending acknowledgements from the server; its size never exceeds size of returned.queue
std::map<UInt64, std::pair<UInt64, String>> delivery_record;
Poco::Logger * log;
const std::optional<char> delim;

View File

@ -0,0 +1,6 @@
syntax = "proto3";
message KeyValueProto {
uint64 key = 1;
string value = 2;
}

View File

@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: clickhouse_path/format_schemas/rabbitmq.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='clickhouse_path/format_schemas/rabbitmq.proto',
package='',
syntax='proto3',
serialized_options=None,
create_key=_descriptor._internal_create_key,
serialized_pb=b'\n-clickhouse_path/format_schemas/rabbitmq.proto\"+\n\rKeyValueProto\x12\x0b\n\x03key\x18\x01 \x01(\x04\x12\r\n\x05value\x18\x02 \x01(\tb\x06proto3'
)
_KEYVALUEPROTO = _descriptor.Descriptor(
name='KeyValueProto',
full_name='KeyValueProto',
filename=None,
file=DESCRIPTOR,
containing_type=None,
create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='key', full_name='KeyValueProto.key', index=0,
number=1, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='value', full_name='KeyValueProto.value', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=49,
serialized_end=92,
)
DESCRIPTOR.message_types_by_name['KeyValueProto'] = _KEYVALUEPROTO
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
KeyValueProto = _reflection.GeneratedProtocolMessageType('KeyValueProto', (_message.Message,), {
'DESCRIPTOR' : _KEYVALUEPROTO,
'__module__' : 'clickhouse_path.format_schemas.rabbitmq_pb2'
# @@protoc_insertion_point(class_scope:KeyValueProto)
})
_sym_db.RegisterMessage(KeyValueProto)
# @@protoc_insertion_point(module_scope)

File diff suppressed because it is too large Load Diff