mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #16426 from kssenii/rabbit-optimize
Optimize rabbitmq engine
This commit is contained in:
commit
57c3935a26
@ -51,7 +51,7 @@ Optional parameters:
|
||||
- `rabbitmq_row_delimiter` – Delimiter character, which ends the message.
|
||||
- `rabbitmq_schema` – Parameter that must be used if the format requires a schema definition. For example, [Cap’n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
|
||||
- `rabbitmq_num_consumers` – The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient.
|
||||
- `rabbitmq_num_queues` – The number of queues per consumer. Default: `1`. Specify more queues if the capacity of one queue per consumer is insufficient.
|
||||
- `rabbitmq_num_queues` – Total number of queues. Default: `1`. Increasing this number can significantly improve performance.
|
||||
- `rabbitmq_queue_base` - Specify a hint for queue names. Use cases of this setting are described below.
|
||||
- `rabbitmq_deadletter_exchange` - Specify the name of a [dead letter exchange](https://www.rabbitmq.com/dlx.html). You can create another table with this exchange name and collect the messages that are republished to the dead letter exchange. By default, no dead letter exchange is specified.
|
||||
- `rabbitmq_persistent` - If set to 1 (true), the delivery mode of messages published by `INSERT` queries is set to 2 (marks messages as 'persistent'). Default: `0`.
|
||||
@ -148,4 +148,5 @@ Example:
|
||||
- `_channel_id` - ChannelID of the channel on which the consumer that received the message was declared.
|
||||
- `_delivery_tag` - DeliveryTag of the received message. Scoped per channel.
|
||||
- `_redelivered` - `redelivered` flag of the message.
|
||||
- `_message_id` - MessageID of the received message; non-empty if was set, when message was published.
|
||||
- `_message_id` - messageID of the received message; non-empty if it was set when the message was published.
|
||||
- `_timestamp` - timestamp of the received message; non-empty if it was set when the message was published.
|
||||
|
@ -45,7 +45,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
|
||||
- `rabbitmq_row_delimiter` – символ-разделитель, который завершает сообщение.
|
||||
- `rabbitmq_schema` – опциональный параметр, необходимый, если используется формат, требующий определения схемы. Например, [Cap’n Proto](https://capnproto.org/) требует путь к файлу со схемой и название корневого объекта `schema.capnp:Message`.
|
||||
- `rabbitmq_num_consumers` – количество потребителей на таблицу. По умолчанию: `1`. Укажите больше потребителей, если пропускная способность одного потребителя недостаточна.
|
||||
- `rabbitmq_num_queues` – количество очередей на потребителя. По умолчанию: `1`. Укажите больше потребителей, если пропускная способность одной очереди на потребителя недостаточна.
|
||||
- `rabbitmq_num_queues` – количество очередей. По умолчанию: `1`. Большее число очередей может сильно увеличить пропускную способность.
|
||||
- `rabbitmq_queue_base` - настройка для имен очередей. Сценарии использования описаны ниже.
|
||||
- `rabbitmq_persistent` - флаг, от которого зависит настройка 'durable' для сообщений при запросах `INSERT`. По умолчанию: `0`.
|
||||
- `rabbitmq_skip_broken_messages` – максимальное количество некорректных сообщений в блоке. Если `rabbitmq_skip_broken_messages = N`, то движок отбрасывает `N` сообщений, которые не получилось обработать. Одно сообщение в точности соответствует одной записи (строке). Значение по умолчанию – 0.
|
||||
@ -140,4 +140,5 @@ Example:
|
||||
- `_channel_id` - идентификатор канала `ChannelID`, на котором было получено сообщение.
|
||||
- `_delivery_tag` - значение `DeliveryTag` полученного сообщения. Уникально в рамках одного канала.
|
||||
- `_redelivered` - флаг `redelivered`. (Не равно нулю, если есть возможность, что сообщение было получено более, чем одним каналом.)
|
||||
- `_message_id` - значение `MessageID` полученного сообщения. Данное поле непусто, если указано в параметрах при отправке сообщения.
|
||||
- `_message_id` - значение поля `messageID` полученного сообщения. Данное поле непусто, если указано в параметрах при отправке сообщения.
|
||||
- `_timestamp` - значение поля `timestamp` полученного сообщения. Данное поле непусто, если указано в параметрах при отправке сообщения.
|
||||
|
@ -27,7 +27,7 @@ RabbitMQBlockInputStream::RabbitMQBlockInputStream(
|
||||
, non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized())
|
||||
, sample_block(non_virtual_header)
|
||||
, virtual_header(metadata_snapshot->getSampleBlockForColumns(
|
||||
{"_exchange_name", "_channel_id", "_delivery_tag", "_redelivered", "_message_id"},
|
||||
{"_exchange_name", "_channel_id", "_delivery_tag", "_redelivered", "_message_id", "_timestamp"},
|
||||
storage.getVirtuals(), storage.getStorageID()))
|
||||
{
|
||||
for (const auto & column : virtual_header)
|
||||
@ -158,6 +158,7 @@ Block RabbitMQBlockInputStream::readImpl()
|
||||
auto delivery_tag = buffer->getDeliveryTag();
|
||||
auto redelivered = buffer->getRedelivered();
|
||||
auto message_id = buffer->getMessageID();
|
||||
auto timestamp = buffer->getTimestamp();
|
||||
|
||||
buffer->updateAckTracker({delivery_tag, channel_id});
|
||||
|
||||
@ -168,6 +169,7 @@ Block RabbitMQBlockInputStream::readImpl()
|
||||
virtual_columns[2]->insert(delivery_tag);
|
||||
virtual_columns[3]->insert(redelivered);
|
||||
virtual_columns[4]->insert(message_id);
|
||||
virtual_columns[5]->insert(timestamp);
|
||||
}
|
||||
|
||||
total_rows = total_rows + new_rows;
|
||||
|
@ -30,6 +30,7 @@ public:
|
||||
Block readImpl() override;
|
||||
void readSuffixImpl() override;
|
||||
|
||||
bool queueEmpty() const { return !buffer || buffer->queueEmpty(); }
|
||||
bool needChannelUpdate();
|
||||
void updateChannel();
|
||||
bool sendAck();
|
||||
|
@ -14,47 +14,27 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int CANNOT_CREATE_RABBITMQ_QUEUE_BINDING;
|
||||
}
|
||||
|
||||
ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
|
||||
ChannelPtr consumer_channel_,
|
||||
ChannelPtr setup_channel_,
|
||||
HandlerPtr event_handler_,
|
||||
const String & exchange_name_,
|
||||
std::vector<String> & queues_,
|
||||
size_t channel_id_base_,
|
||||
const String & channel_base_,
|
||||
const String & queue_base_,
|
||||
Poco::Logger * log_,
|
||||
char row_delimiter_,
|
||||
bool hash_exchange_,
|
||||
size_t num_queues_,
|
||||
const String & deadletter_exchange_,
|
||||
uint32_t queue_size_,
|
||||
const std::atomic<bool> & stopped_)
|
||||
: ReadBuffer(nullptr, 0)
|
||||
, consumer_channel(std::move(consumer_channel_))
|
||||
, setup_channel(setup_channel_)
|
||||
, event_handler(event_handler_)
|
||||
, exchange_name(exchange_name_)
|
||||
, queues(queues_)
|
||||
, channel_base(channel_base_)
|
||||
, channel_id_base(channel_id_base_)
|
||||
, queue_base(queue_base_)
|
||||
, hash_exchange(hash_exchange_)
|
||||
, num_queues(num_queues_)
|
||||
, deadletter_exchange(deadletter_exchange_)
|
||||
, log(log_)
|
||||
, row_delimiter(row_delimiter_)
|
||||
, queue_size(queue_size_)
|
||||
, stopped(stopped_)
|
||||
, received(queue_size * num_queues)
|
||||
, received(queue_size_)
|
||||
{
|
||||
for (size_t queue_id = 0; queue_id < num_queues; ++queue_id)
|
||||
bindQueue(queue_id);
|
||||
|
||||
setupChannel();
|
||||
}
|
||||
|
||||
@ -65,67 +45,6 @@ ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer()
|
||||
}
|
||||
|
||||
|
||||
void ReadBufferFromRabbitMQConsumer::bindQueue(size_t queue_id)
|
||||
{
|
||||
std::atomic<bool> binding_created = false;
|
||||
|
||||
auto success_callback = [&](const std::string & queue_name, int msgcount, int /* consumercount */)
|
||||
{
|
||||
queues.emplace_back(queue_name);
|
||||
LOG_DEBUG(log, "Queue {} is declared", queue_name);
|
||||
|
||||
if (msgcount)
|
||||
LOG_INFO(log, "Queue {} is non-empty. Non-consumed messaged will also be delivered", queue_name);
|
||||
|
||||
/* Here we bind either to sharding exchange (consistent-hash) or to bridge exchange (fanout). All bindings to routing keys are
|
||||
* done between client's exchange and local bridge exchange. Binding key must be a string integer in case of hash exchange, for
|
||||
* fanout exchange it can be arbitrary
|
||||
*/
|
||||
setup_channel->bindQueue(exchange_name, queue_name, std::to_string(channel_id_base))
|
||||
.onSuccess([&] { binding_created = true; })
|
||||
.onError([&](const char * message)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::CANNOT_CREATE_RABBITMQ_QUEUE_BINDING,
|
||||
"Failed to create queue binding with queue {} for exchange {}. Reason: {}", std::string(message),
|
||||
queue_name, exchange_name);
|
||||
});
|
||||
};
|
||||
|
||||
auto error_callback([&](const char * message)
|
||||
{
|
||||
/* This error is most likely a result of an attempt to declare queue with different settings if it was declared before. So for a
|
||||
* given queue name either deadletter_exchange parameter changed or queue_size changed, i.e. table was declared with different
|
||||
* max_block_size parameter. Solution: client should specify a different queue_base parameter or manually delete previously
|
||||
* declared queues via any of the various cli tools.
|
||||
*/
|
||||
throw Exception("Failed to declare queue. Probably queue settings are conflicting: max_block_size, deadletter_exchange. Attempt \
|
||||
specifying differently those settings or use a different queue_base or manually delete previously declared queues, \
|
||||
which were declared with the same names. ERROR reason: "
|
||||
+ std::string(message), ErrorCodes::BAD_ARGUMENTS);
|
||||
});
|
||||
|
||||
AMQP::Table queue_settings;
|
||||
|
||||
queue_settings["x-max-length"] = queue_size;
|
||||
queue_settings["x-overflow"] = "reject-publish";
|
||||
|
||||
if (!deadletter_exchange.empty())
|
||||
queue_settings["x-dead-letter-exchange"] = deadletter_exchange;
|
||||
|
||||
/* The first option not just simplifies queue_name, but also implements the possibility to be able to resume reading from one
|
||||
* specific queue when its name is specified in queue_base setting
|
||||
*/
|
||||
const String queue_name = !hash_exchange ? queue_base : std::to_string(channel_id_base) + "_" + std::to_string(queue_id) + "_" + queue_base;
|
||||
setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
|
||||
|
||||
while (!binding_created)
|
||||
{
|
||||
iterateEventLoop();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void ReadBufferFromRabbitMQConsumer::subscribe()
|
||||
{
|
||||
for (const auto & queue_name : queues)
|
||||
@ -146,10 +65,9 @@ void ReadBufferFromRabbitMQConsumer::subscribe()
|
||||
if (row_delimiter != '\0')
|
||||
message_received += row_delimiter;
|
||||
|
||||
if (message.hasMessageID())
|
||||
received.push({message_received, message.messageID(), redelivered, AckTracker(delivery_tag, channel_id)});
|
||||
else
|
||||
received.push({message_received, "", redelivered, AckTracker(delivery_tag, channel_id)});
|
||||
received.push({message_received, message.hasMessageID() ? message.messageID() : "",
|
||||
message.hasTimestamp() ? message.timestamp() : 0,
|
||||
redelivered, AckTracker(delivery_tag, channel_id)});
|
||||
}
|
||||
})
|
||||
.onError([&](const char * message)
|
||||
|
@ -24,17 +24,12 @@ class ReadBufferFromRabbitMQConsumer : public ReadBuffer
|
||||
public:
|
||||
ReadBufferFromRabbitMQConsumer(
|
||||
ChannelPtr consumer_channel_,
|
||||
ChannelPtr setup_channel_,
|
||||
HandlerPtr event_handler_,
|
||||
const String & exchange_name_,
|
||||
std::vector<String> & queues_,
|
||||
size_t channel_id_base_,
|
||||
const String & channel_base_,
|
||||
const String & queue_base_,
|
||||
Poco::Logger * log_,
|
||||
char row_delimiter_,
|
||||
bool hash_exchange_,
|
||||
size_t num_queues_,
|
||||
const String & deadletter_exchange_,
|
||||
uint32_t queue_size_,
|
||||
const std::atomic<bool> & stopped_);
|
||||
|
||||
@ -53,6 +48,7 @@ public:
|
||||
{
|
||||
String message;
|
||||
String message_id;
|
||||
uint64_t timestamp;
|
||||
bool redelivered;
|
||||
AckTracker track;
|
||||
};
|
||||
@ -75,34 +71,26 @@ public:
|
||||
auto getDeliveryTag() const { return current.track.delivery_tag; }
|
||||
auto getRedelivered() const { return current.redelivered; }
|
||||
auto getMessageID() const { return current.message_id; }
|
||||
auto getTimestamp() const { return current.timestamp; }
|
||||
|
||||
private:
|
||||
bool nextImpl() override;
|
||||
|
||||
void bindQueue(size_t queue_id);
|
||||
void subscribe();
|
||||
void iterateEventLoop();
|
||||
|
||||
ChannelPtr consumer_channel;
|
||||
ChannelPtr setup_channel;
|
||||
HandlerPtr event_handler;
|
||||
|
||||
const String exchange_name;
|
||||
std::vector<String> queues;
|
||||
const String channel_base;
|
||||
const size_t channel_id_base;
|
||||
const String queue_base;
|
||||
const bool hash_exchange;
|
||||
const size_t num_queues;
|
||||
const String deadletter_exchange;
|
||||
Poco::Logger * log;
|
||||
char row_delimiter;
|
||||
bool allowed = true;
|
||||
uint32_t queue_size;
|
||||
const std::atomic<bool> & stopped;
|
||||
|
||||
String channel_id;
|
||||
std::atomic<bool> channel_error = true, wait_subscription = false;
|
||||
std::vector<String> queues;
|
||||
ConcurrentBoundedQueue<MessageData> received;
|
||||
MessageData current;
|
||||
size_t subscribed = 0;
|
||||
|
@ -38,8 +38,10 @@ namespace DB
|
||||
|
||||
static const auto CONNECT_SLEEP = 200;
|
||||
static const auto RETRIES_MAX = 20;
|
||||
static const auto HEARTBEAT_RESCHEDULE_MS = 3000;
|
||||
static const uint32_t QUEUE_SIZE = 100000;
|
||||
static const auto MAX_FAILED_READ_ATTEMPTS = 10;
|
||||
static const auto RESCHEDULE_MS = 500;
|
||||
static const auto MAX_THREAD_WORK_DURATION_MS = 60000;
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
@ -50,6 +52,7 @@ namespace ErrorCodes
|
||||
extern const int CANNOT_BIND_RABBITMQ_EXCHANGE;
|
||||
extern const int CANNOT_DECLARE_RABBITMQ_EXCHANGE;
|
||||
extern const int CANNOT_REMOVE_RABBITMQ_EXCHANGE;
|
||||
extern const int CANNOT_CREATE_RABBITMQ_QUEUE_BINDING;
|
||||
}
|
||||
|
||||
namespace ExchangeType
|
||||
@ -122,9 +125,6 @@ StorageRabbitMQ::StorageRabbitMQ(
|
||||
streaming_task = global_context.getSchedulePool().createTask("RabbitMQStreamingTask", [this]{ streamingToViewsFunc(); });
|
||||
streaming_task->deactivate();
|
||||
|
||||
heartbeat_task = global_context.getSchedulePool().createTask("RabbitMQHeartbeatTask", [this]{ heartbeatFunc(); });
|
||||
heartbeat_task->deactivate();
|
||||
|
||||
if (queue_base.empty())
|
||||
{
|
||||
/* Make sure that local exchange name is unique for each table and is not the same as client's exchange name. It also needs to
|
||||
@ -210,16 +210,6 @@ Context StorageRabbitMQ::addSettings(Context context) const
|
||||
}
|
||||
|
||||
|
||||
void StorageRabbitMQ::heartbeatFunc()
|
||||
{
|
||||
if (!stream_cancelled && event_handler->connectionRunning())
|
||||
{
|
||||
connection->heartbeat();
|
||||
heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void StorageRabbitMQ::loopingFunc()
|
||||
{
|
||||
if (event_handler->connectionRunning())
|
||||
@ -396,13 +386,73 @@ void StorageRabbitMQ::bindExchange()
|
||||
}
|
||||
|
||||
|
||||
void StorageRabbitMQ::bindQueue(size_t queue_id)
|
||||
{
|
||||
std::atomic<bool> binding_created = false;
|
||||
|
||||
auto success_callback = [&](const std::string & queue_name, int msgcount, int /* consumercount */)
|
||||
{
|
||||
queues.emplace_back(queue_name);
|
||||
LOG_DEBUG(log, "Queue {} is declared", queue_name);
|
||||
|
||||
if (msgcount)
|
||||
LOG_INFO(log, "Queue {} is non-empty. Non-consumed messaged will also be delivered", queue_name);
|
||||
|
||||
/* Here we bind either to sharding exchange (consistent-hash) or to bridge exchange (fanout). All bindings to routing keys are
|
||||
* done between client's exchange and local bridge exchange. Binding key must be a string integer in case of hash exchange, for
|
||||
* fanout exchange it can be arbitrary
|
||||
*/
|
||||
setup_channel->bindQueue(consumer_exchange, queue_name, std::to_string(queue_id))
|
||||
.onSuccess([&] { binding_created = true; })
|
||||
.onError([&](const char * message)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::CANNOT_CREATE_RABBITMQ_QUEUE_BINDING,
|
||||
"Failed to create queue binding for exchange {}. Reason: {}", exchange_name, std::string(message));
|
||||
});
|
||||
};
|
||||
|
||||
auto error_callback([&](const char * message)
|
||||
{
|
||||
/* This error is most likely a result of an attempt to declare queue with different settings if it was declared before. So for a
|
||||
* given queue name either deadletter_exchange parameter changed or queue_size changed, i.e. table was declared with different
|
||||
* max_block_size parameter. Solution: client should specify a different queue_base parameter or manually delete previously
|
||||
* declared queues via any of the various cli tools.
|
||||
*/
|
||||
throw Exception("Failed to declare queue. Probably queue settings are conflicting: max_block_size, deadletter_exchange. Attempt \
|
||||
specifying differently those settings or use a different queue_base or manually delete previously declared queues, \
|
||||
which were declared with the same names. ERROR reason: "
|
||||
+ std::string(message), ErrorCodes::BAD_ARGUMENTS);
|
||||
});
|
||||
|
||||
AMQP::Table queue_settings;
|
||||
|
||||
queue_settings["x-max-length"] = queue_size;
|
||||
|
||||
if (!deadletter_exchange.empty())
|
||||
queue_settings["x-dead-letter-exchange"] = deadletter_exchange;
|
||||
else
|
||||
queue_settings["x-overflow"] = "reject-publish";
|
||||
|
||||
/* The first option not just simplifies queue_name, but also implements the possibility to be able to resume reading from one
|
||||
* specific queue when its name is specified in queue_base setting
|
||||
*/
|
||||
const String queue_name = !hash_exchange ? queue_base : std::to_string(queue_id) + "_" + queue_base;
|
||||
setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
|
||||
|
||||
while (!binding_created)
|
||||
{
|
||||
event_handler->iterateLoop();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
bool StorageRabbitMQ::restoreConnection(bool reconnecting)
|
||||
{
|
||||
size_t cnt_retries = 0;
|
||||
|
||||
if (reconnecting)
|
||||
{
|
||||
deactivateTask(heartbeat_task, false, false);
|
||||
connection->close(); /// Connection might be unusable, but not closed
|
||||
|
||||
/* Connection is not closed immediately (firstly, all pending operations are completed, and then
|
||||
@ -452,11 +502,11 @@ void StorageRabbitMQ::unbindExchange()
|
||||
*/
|
||||
std::call_once(flag, [&]()
|
||||
{
|
||||
heartbeat_task->deactivate();
|
||||
streaming_task->deactivate();
|
||||
event_handler->updateLoopState(Loop::STOP);
|
||||
looping_task->deactivate();
|
||||
|
||||
setup_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
|
||||
setup_channel->removeExchange(bridge_exchange)
|
||||
.onSuccess([&]()
|
||||
{
|
||||
@ -471,6 +521,8 @@ void StorageRabbitMQ::unbindExchange()
|
||||
{
|
||||
event_handler->iterateLoop();
|
||||
}
|
||||
|
||||
setup_channel->close();
|
||||
});
|
||||
}
|
||||
|
||||
@ -499,8 +551,6 @@ Pipe StorageRabbitMQ::read(
|
||||
deactivateTask(looping_task, false, true);
|
||||
|
||||
update_channels = restoreConnection(true);
|
||||
if (update_channels)
|
||||
heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS);
|
||||
}
|
||||
|
||||
Pipes pipes;
|
||||
@ -521,7 +571,6 @@ Pipe StorageRabbitMQ::read(
|
||||
if (event_handler->loopRunning())
|
||||
{
|
||||
deactivateTask(looping_task, false, true);
|
||||
deactivateTask(heartbeat_task, false, false);
|
||||
}
|
||||
|
||||
rabbit_stream->updateChannel();
|
||||
@ -552,6 +601,13 @@ void StorageRabbitMQ::startup()
|
||||
initExchange();
|
||||
bindExchange();
|
||||
|
||||
for (size_t i = 1; i <= num_queues; ++i)
|
||||
{
|
||||
bindQueue(i);
|
||||
}
|
||||
|
||||
setup_channel->close();
|
||||
|
||||
for (size_t i = 0; i < num_consumers; ++i)
|
||||
{
|
||||
try
|
||||
@ -568,7 +624,6 @@ void StorageRabbitMQ::startup()
|
||||
|
||||
event_handler->updateLoopState(Loop::RUN);
|
||||
streaming_task->activateAndSchedule();
|
||||
heartbeat_task->activateAndSchedule();
|
||||
}
|
||||
|
||||
|
||||
@ -579,7 +634,6 @@ void StorageRabbitMQ::shutdown()
|
||||
|
||||
deactivateTask(streaming_task, true, false);
|
||||
deactivateTask(looping_task, true, true);
|
||||
deactivateTask(heartbeat_task, true, false);
|
||||
|
||||
connection->close();
|
||||
|
||||
@ -635,9 +689,8 @@ ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
|
||||
ChannelPtr consumer_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
|
||||
|
||||
return std::make_shared<ReadBufferFromRabbitMQConsumer>(
|
||||
consumer_channel, setup_channel, event_handler, consumer_exchange, ++consumer_id,
|
||||
unique_strbase, queue_base, log, row_delimiter, hash_exchange, num_queues,
|
||||
deadletter_exchange, queue_size, stream_cancelled);
|
||||
consumer_channel, event_handler, queues, ++consumer_id,
|
||||
unique_strbase, log, row_delimiter, queue_size, stream_cancelled);
|
||||
}
|
||||
|
||||
|
||||
@ -683,11 +736,14 @@ void StorageRabbitMQ::streamingToViewsFunc()
|
||||
try
|
||||
{
|
||||
auto table_id = getStorageID();
|
||||
|
||||
// Check if at least one direct dependency is attached
|
||||
size_t dependencies_count = DatabaseCatalog::instance().getDependencies(table_id).size();
|
||||
|
||||
if (dependencies_count)
|
||||
{
|
||||
auto start_time = std::chrono::steady_clock::now();
|
||||
|
||||
// Keep streaming as long as there are attached views and streaming is not cancelled
|
||||
while (!stream_cancelled && num_created_consumers > 0)
|
||||
{
|
||||
@ -696,8 +752,17 @@ void StorageRabbitMQ::streamingToViewsFunc()
|
||||
|
||||
LOG_DEBUG(log, "Started streaming to {} attached views", dependencies_count);
|
||||
|
||||
if (!streamToViews())
|
||||
if (streamToViews())
|
||||
break;
|
||||
|
||||
auto end_time = std::chrono::steady_clock::now();
|
||||
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
|
||||
if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
|
||||
{
|
||||
event_handler->updateLoopState(Loop::STOP);
|
||||
LOG_TRACE(log, "Reschedule streaming. Thread work duration limit exceeded.");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -708,7 +773,7 @@ void StorageRabbitMQ::streamingToViewsFunc()
|
||||
|
||||
/// Wait for attached views
|
||||
if (!stream_cancelled)
|
||||
streaming_task->schedule();
|
||||
streaming_task->scheduleAfter(RESCHEDULE_MS);
|
||||
}
|
||||
|
||||
|
||||
@ -731,13 +796,6 @@ bool StorageRabbitMQ::streamToViews()
|
||||
auto column_names = block_io.out->getHeader().getNames();
|
||||
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
|
||||
|
||||
/* event_handler->connectionRunning() does not guarantee that connection is not closed in case loop was not running before, but
|
||||
* need to anyway start the loop to activate error callbacks and update connection state, because even checking with
|
||||
* connection->usable() will not give correct answer before callbacks are activated.
|
||||
*/
|
||||
if (!event_handler->loopRunning() && event_handler->connectionRunning())
|
||||
looping_task->activateAndSchedule();
|
||||
|
||||
auto block_size = getMaxBlockSize();
|
||||
|
||||
// Create a stream for each consumer and join them in a union stream
|
||||
@ -770,34 +828,45 @@ bool StorageRabbitMQ::streamToViews()
|
||||
in = streams[0];
|
||||
|
||||
std::atomic<bool> stub = {false};
|
||||
|
||||
if (!event_handler->loopRunning())
|
||||
{
|
||||
event_handler->updateLoopState(Loop::RUN);
|
||||
looping_task->activateAndSchedule();
|
||||
}
|
||||
|
||||
copyData(*in, *block_io.out, &stub);
|
||||
|
||||
/* Need to stop loop even if connection is ok, because sending ack() with loop running in another thread will lead to a lot of data
|
||||
* races inside the library, but only in case any error occurs or connection is lost while ack is being sent
|
||||
/* Note: sending ack() with loop running in another thread will lead to a lot of data races inside the library, but only in case
|
||||
* error occurs or connection is lost while ack is being sent
|
||||
*/
|
||||
if (event_handler->loopRunning())
|
||||
deactivateTask(looping_task, false, true);
|
||||
deactivateTask(looping_task, false, true);
|
||||
size_t queue_empty = 0;
|
||||
|
||||
if (!event_handler->connectionRunning())
|
||||
{
|
||||
if (!stream_cancelled && restoreConnection(true))
|
||||
if (stream_cancelled)
|
||||
return true;
|
||||
|
||||
if (restoreConnection(true))
|
||||
{
|
||||
for (auto & stream : streams)
|
||||
stream->as<RabbitMQBlockInputStream>()->updateChannel();
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Reschedule if unable to connect to rabbitmq or quit if cancelled
|
||||
return false;
|
||||
LOG_TRACE(log, "Reschedule streaming. Unable to restore connection.");
|
||||
return true;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
deactivateTask(heartbeat_task, false, false);
|
||||
|
||||
/// Commit
|
||||
for (auto & stream : streams)
|
||||
{
|
||||
if (stream->as<RabbitMQBlockInputStream>()->queueEmpty())
|
||||
++queue_empty;
|
||||
|
||||
/* false is returned by the sendAck function in only two cases:
|
||||
* 1) if connection failed. In this case all channels will be closed and will be unable to send ack. Also ack is made based on
|
||||
* delivery tags, which are unique to channels, so if channels fail, those delivery tags will become invalid and there is
|
||||
@ -828,19 +897,25 @@ bool StorageRabbitMQ::streamToViews()
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
event_handler->iterateLoop();
|
||||
}
|
||||
}
|
||||
|
||||
event_handler->updateLoopState(Loop::RUN);
|
||||
looping_task->activateAndSchedule();
|
||||
heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS); /// It is also deactivated in restoreConnection(), so reschedule anyway
|
||||
if ((queue_empty == num_created_consumers) && (++read_attempts == MAX_FAILED_READ_ATTEMPTS))
|
||||
{
|
||||
connection->heartbeat();
|
||||
read_attempts = 0;
|
||||
LOG_TRACE(log, "Reschedule streaming. Queues are empty.");
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
event_handler->updateLoopState(Loop::RUN);
|
||||
looping_task->activateAndSchedule();
|
||||
}
|
||||
|
||||
// Check whether the limits were applied during query execution
|
||||
bool limits_applied = false;
|
||||
const BlockStreamProfileInfo & info = in->getProfileInfo();
|
||||
limits_applied = info.hasAppliedLimit();
|
||||
|
||||
return limits_applied;
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@ -907,7 +982,8 @@ NamesAndTypesList StorageRabbitMQ::getVirtuals() const
|
||||
{"_channel_id", std::make_shared<DataTypeString>()},
|
||||
{"_delivery_tag", std::make_shared<DataTypeUInt64>()},
|
||||
{"_redelivered", std::make_shared<DataTypeUInt8>()},
|
||||
{"_message_id", std::make_shared<DataTypeString>()}
|
||||
{"_message_id", std::make_shared<DataTypeString>()},
|
||||
{"_timestamp", std::make_shared<DataTypeUInt64>()}
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -114,14 +114,15 @@ private:
|
||||
std::atomic<bool> wait_confirm = true; /// needed to break waiting for confirmations for producer
|
||||
std::atomic<bool> exchange_removed = false;
|
||||
ChannelPtr setup_channel;
|
||||
std::vector<String> queues;
|
||||
|
||||
std::once_flag flag; /// remove exchange only once
|
||||
std::mutex task_mutex;
|
||||
BackgroundSchedulePool::TaskHolder streaming_task;
|
||||
BackgroundSchedulePool::TaskHolder heartbeat_task;
|
||||
BackgroundSchedulePool::TaskHolder looping_task;
|
||||
|
||||
std::atomic<bool> stream_cancelled{false};
|
||||
size_t read_attempts = 0;
|
||||
|
||||
ConsumerBufferPtr createReadBuffer();
|
||||
|
||||
@ -140,6 +141,7 @@ private:
|
||||
|
||||
void initExchange();
|
||||
void bindExchange();
|
||||
void bindQueue(size_t queue_id);
|
||||
|
||||
bool restoreConnection(bool reconnecting);
|
||||
bool streamToViews();
|
||||
|
@ -537,14 +537,14 @@ def test_rabbitmq_big_message(rabbitmq_cluster):
|
||||
@pytest.mark.timeout(420)
|
||||
def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
|
||||
NUM_CONSUMERS = 10
|
||||
NUM_QUEUES = 2
|
||||
NUM_QUEUES = 10
|
||||
|
||||
instance.query('''
|
||||
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
|
||||
ENGINE = RabbitMQ
|
||||
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
||||
rabbitmq_exchange_name = 'test_sharding',
|
||||
rabbitmq_num_queues = 2,
|
||||
rabbitmq_num_queues = 10,
|
||||
rabbitmq_num_consumers = 10,
|
||||
rabbitmq_format = 'JSONEachRow',
|
||||
rabbitmq_row_delimiter = '\\n';
|
||||
@ -617,7 +617,7 @@ def test_rabbitmq_mv_combo(rabbitmq_cluster):
|
||||
rabbitmq_exchange_name = 'combo',
|
||||
rabbitmq_queue_base = 'combo',
|
||||
rabbitmq_num_consumers = 2,
|
||||
rabbitmq_num_queues = 2,
|
||||
rabbitmq_num_queues = 5,
|
||||
rabbitmq_format = 'JSONEachRow',
|
||||
rabbitmq_row_delimiter = '\\n';
|
||||
''')
|
||||
@ -879,7 +879,7 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster):
|
||||
rabbitmq_queue_base = 'over',
|
||||
rabbitmq_exchange_type = 'direct',
|
||||
rabbitmq_num_consumers = 5,
|
||||
rabbitmq_num_queues = 2,
|
||||
rabbitmq_num_queues = 10,
|
||||
rabbitmq_max_block_size = 10000,
|
||||
rabbitmq_routing_key_list = 'over',
|
||||
rabbitmq_format = 'TSV',
|
||||
@ -1722,7 +1722,7 @@ def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster):
|
||||
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
||||
rabbitmq_exchange_name = 'consumer_reconnect',
|
||||
rabbitmq_num_consumers = 10,
|
||||
rabbitmq_num_queues = 2,
|
||||
rabbitmq_num_queues = 10,
|
||||
rabbitmq_format = 'JSONEachRow',
|
||||
rabbitmq_row_delimiter = '\\n';
|
||||
''')
|
||||
|
Loading…
Reference in New Issue
Block a user