This commit is contained in:
kssenii 2020-05-26 17:34:57 +00:00
parent c3569882bb
commit 14c67c6ae6
10 changed files with 94 additions and 77 deletions

View File

@ -5,6 +5,11 @@
#include <Storages/RabbitMQ/RabbitMQBlockInputStream.h>
#include <Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h>
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace DB
{
@ -117,13 +122,13 @@ Block RabbitMQBlockInputStream::readImpl()
auto new_rows = read_rabbitmq_message();
auto _exchange = storage.getExchangeName();
auto _routingKey = storage.getRoutingKey();
auto exchange_name = storage.getExchangeName();
auto routing_key = storage.getRoutingKey();
for (size_t i = 0; i < new_rows; ++i)
{
virtual_columns[0]->insert(_exchange);
virtual_columns[1]->insert(_routingKey);
virtual_columns[0]->insert(exchange_name);
virtual_columns[1]->insert(routing_key);
}
total_rows = total_rows + new_rows;

View File

@ -25,7 +25,7 @@ public:
void readPrefixImpl() override;
Block readImpl() override;
//void readSuffixImpl() override;
///void readSuffixImpl() override;
private:
StorageRabbitMQ & storage;

View File

@ -12,14 +12,14 @@ RabbitMQHandler::RabbitMQHandler(event_base * evbase_, Poco::Logger * log_) :
}
void RabbitMQHandler::onError(AMQP::TcpConnection * /*connection*/, const char * message)
void RabbitMQHandler::onError(AMQP::TcpConnection * , const char * message)
{
LOG_ERROR(log, "Library error report: " << message);
stop();
}
void RabbitMQHandler::startNonBlock()
void RabbitMQHandler::start()
{
event_base_loop(evbase, EVLOOP_NONBLOCK);
}

View File

@ -17,7 +17,7 @@ public:
RabbitMQHandler(event_base * evbase_, Poco::Logger * log_);
void onError(AMQP::TcpConnection * connection, const char * message) override;
void startNonBlock();
void start();
void stop();
private:

View File

@ -15,7 +15,6 @@ namespace DB
M(SettingString, rabbitmq_exchange_name, "clickhouse-exchange", "The exhange name, to which messages are sent. Needed to bind queues to it.", 0) \
M(SettingString, rabbitmq_format, "", "The message format.", 0) \
M(SettingChar, rabbitmq_row_delimiter, '\0', "The character to be considered as a delimiter.", 0) \
M(SettingUInt64, rabbitmq_bind_by_id, 0, "A flag which indicates that binding should be done in range [0, num_consumers * num_queues).", 0) \
M(SettingUInt64, rabbitmq_num_consumers, 1, "The number of consumer channels per table.", 0) \
M(SettingUInt64, rabbitmq_num_queues, 1, "The number of queues per consumer.", 0) \
M(SettingUInt64, rabbitmq_hash_exchange, 0, "A flag which indicates whether consistent-hash-exchange should be used.", 0) \

View File

@ -1,10 +1,18 @@
#include <utility>
#include <chrono>
#include <thread>
#include <Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <common/logger_useful.h>
#include <amqpcpp.h>
enum
{
Connection_setup_sleep = 200,
Connection_setup_retries_max = 1000
};
namespace DB
{
@ -38,11 +46,21 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
* because in case when num_consumers > 1 - inputStreams run asynchronously and if they share the same connection,
* then they also will share the same event loop. But it will mean that if one stream's consumer starts event loop,
* then it will run all callbacks on the connection - including other stream's consumer's callbacks -
* it result in asynchronous run of the same code and lead to occasional seg faults.
* it result in asynchronous run of the same code (because local variables can be updated both by the current thread
* and in callbacks by another thread during event loop, which is blocking only to the thread that has started the loop).
* So sharing the connection (== sharing event loop) results in occasional seg faults in case of asynchronous run of objects that share the connection.
*/
while (!connection.ready())
size_t cnt_retries = 0;
while (!connection.ready() && ++cnt_retries != Connection_setup_retries_max)
{
event_base_loop(evbase, EVLOOP_NONBLOCK | EVLOOP_ONCE);
std::this_thread::sleep_for(std::chrono::milliseconds(Connection_setup_sleep));
}
if (!connection.ready())
{
LOG_ERROR(log, "Cannot set up connection for consumer");
}
consumer_channel = std::make_shared<AMQP::TcpChannel>(&connection);
@ -85,12 +103,12 @@ void ReadBufferFromRabbitMQConsumer::initExchange()
if (hash_exchange)
{
current_exchange_name = exchange_name + "_hash";
consumer_channel->declareExchange(current_exchange_name, AMQP::consistent_hash).onError([&](const char * message)
consumer_channel->declareExchange(current_exchange_name, AMQP::consistent_hash).onError([&](const char * /* message */)
{
exchange_declared = false;
});
consumer_channel->bindExchange(exchange_name, current_exchange_name, routing_key).onError([&](const char * message)
consumer_channel->bindExchange(exchange_name, current_exchange_name, routing_key).onError([&](const char * /* message */)
{
exchange_declared = false;
});
@ -98,12 +116,12 @@ void ReadBufferFromRabbitMQConsumer::initExchange()
else
{
current_exchange_name = exchange_name + "_direct";
consumer_channel->declareExchange(current_exchange_name, AMQP::direct).onError([&](const char * message)
consumer_channel->declareExchange(current_exchange_name, AMQP::direct).onError([&](const char * /* message */)
{
exchange_declared = false;
});
consumer_channel->bindExchange(exchange_name, current_exchange_name, routing_key).onError([&](const char * message)
consumer_channel->bindExchange(exchange_name, current_exchange_name, routing_key).onError([&](const char * /* message */)
{
exchange_declared = false;
});
@ -113,30 +131,36 @@ void ReadBufferFromRabbitMQConsumer::initExchange()
void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
{
/* This varibale can be updated from a different thread in case of some error so its better to always check
* whether exchange is in a working state and if not - declare it once again.
*/
if (!exchange_declared)
{
initExchange();
exchange_declared = true;
}
bool bindings_ok = false, bindings_error = false;
bool bindings_created = false, bindings_error = false;
consumer_channel->declareQueue(AMQP::exclusive)
.onSuccess([&](const std::string & queue_name_, int /* msgcount */, int /* consumercount */)
{
queues.emplace_back(queue_name_);
String binding_key = routing_key;
if (bind_by_id && !hash_exchange)
/* Every consumer has at least one unique queue. Bind the queues to exchange based on the consumer_channel_id
* in case there is one queue per consumer and bind by queue_id in case there is more than 1 queue per consumer.
* (queue_id is based on channel_id)
*/
if (bind_by_id || hash_exchange)
{
if (queues.size() == 1)
{
binding_key = routing_key + "_" + std::to_string(channel_id);
binding_key = std::to_string(channel_id);
}
else
{
binding_key = routing_key + "_" + std::to_string(channel_id + queue_id);
binding_key = std::to_string(channel_id + queue_id);
}
}
@ -145,7 +169,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
consumer_channel->bindQueue(current_exchange_name, queue_name_, binding_key)
.onSuccess([&]
{
bindings_ok = true;
bindings_created = true;
})
.onError([&](const char * message)
{
@ -159,9 +183,14 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
LOG_ERROR(log, "Failed to declare queue on the channel: " << message);
});
while (!bindings_ok && !bindings_error)
/* Run event loop (which updates local variables in a separate thread) until bindings are created or failed to be created.
* It is important at this moment to make sure that queue bindings are created before any publishing can happen because
* otherwise messages will be routed nowhere.
*/
while (!bindings_created && !bindings_error)
{
startNonBlockEventLoop();
/// No need for timeouts as this event loop is blocking for the current thread and quits in case there are no active events
startEventLoop();
}
}
@ -184,17 +213,14 @@ void ReadBufferFromRabbitMQConsumer::subscribeConsumer()
void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
{
bool consumer_ok = false, consumer_error = false;
bool consumer_created = false, consumer_error = false;
consumer_channel->consume(queue_name, AMQP::noack)
.onSuccess([&](const std::string & consumer)
.onSuccess([&](const std::string & /* consumer */)
{
if (consumerTag == "")
consumerTag = consumer;
consumer_created = true;
consumer_ok = true;
LOG_TRACE(log, "Consumer " + consumerTag + " is subscribed to queue " + queue_name);
LOG_TRACE(log, "Consumer " + std::to_string(channel_id) + " is subscribed to queue " + queue_name);
})
.onReceived([&](const AMQP::Message & message, uint64_t /* deliveryTag */, bool /* redelivered */)
{
@ -218,16 +244,16 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
LOG_ERROR(log, "Consumer failed: " << message);
});
while (!consumer_ok && !consumer_error)
while (!consumer_created && !consumer_error)
{
startNonBlockEventLoop();
startEventLoop();
}
}
void ReadBufferFromRabbitMQConsumer::startNonBlockEventLoop()
void ReadBufferFromRabbitMQConsumer::startEventLoop()
{
eventHandler.startNonBlock();
eventHandler.start();
}
@ -242,12 +268,12 @@ bool ReadBufferFromRabbitMQConsumer::nextImpl()
{
/* Run the onReceived callbacks to save the messages that have been received by now
*/
startNonBlockEventLoop();
startEventLoop();
}
if (received.empty())
{
LOG_TRACE(log, "Stalled");
LOG_TRACE(log, "No more messages to be fetched");
return false;
}
@ -256,7 +282,7 @@ bool ReadBufferFromRabbitMQConsumer::nextImpl()
current = messages.begin();
}
auto new_position = const_cast<char *>(current->data());
auto * new_position = const_cast<char *>(current->data());
BufferBase::set(new_position, current->size(), 0);
++current;

View File

@ -59,9 +59,8 @@ private:
bool allowed = true;
const std::atomic<bool> & stopped;
std::atomic<bool> exchange_declared = false;
bool exchange_declared = false;
const size_t num_queues;
String consumerTag; // ID for the consumer
Queues queues;
bool subscribed = false;
String current_exchange_name;
@ -75,7 +74,7 @@ private:
void initExchange();
void initQueueBindings(const size_t queue_id);
void subscribe(const String & queue_name);
void startNonBlockEventLoop();
void startEventLoop();
};
}

View File

@ -33,6 +33,11 @@
#include <amqpcpp.h>
enum
{
RESCHEDULE_WAIT = 500
};
namespace DB
{
@ -55,7 +60,6 @@ StorageRabbitMQ::StorageRabbitMQ(
const String & format_name_,
char row_delimiter_,
size_t num_consumers_,
bool bind_by_id_,
size_t num_queues_,
bool hash_exchange_)
: IStorage(table_id_)
@ -66,7 +70,6 @@ StorageRabbitMQ::StorageRabbitMQ(
, format_name(global_context.getMacros()->expand(format_name_))
, row_delimiter(row_delimiter_)
, num_consumers(num_consumers_)
, bind_by_id(bind_by_id_)
, num_queues(num_queues_)
, hash_exchange(hash_exchange_)
, log(&Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")"))
@ -79,8 +82,7 @@ StorageRabbitMQ::StorageRabbitMQ(
task = global_context.getSchedulePool().createTask(log->name(), [this]{ threadFunc(); });
task->deactivate();
/// Enable a different routing algorithm.
bind_by_id = num_consumers > 1 || num_queues > 1 || bind_by_id;
bind_by_id = num_consumers > 1 || num_queues > 1;
}
@ -181,7 +183,8 @@ ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
next_channel_id += num_queues;
update_channel_id = true;
return std::make_shared<ReadBufferFromRabbitMQConsumer>(parsed_address, exchange_name, routing_key, next_channel_id,
return std::make_shared<ReadBufferFromRabbitMQConsumer>(
parsed_address, exchange_name, routing_key, next_channel_id,
log, row_delimiter, bind_by_id, hash_exchange, num_queues, stream_cancelled);
}
@ -244,7 +247,7 @@ void StorageRabbitMQ::threadFunc()
/// Wait for attached views
if (!stream_cancelled)
task->scheduleAfter(500);
task->scheduleAfter(RESCHEDULE_WAIT);
}
@ -397,13 +400,13 @@ void registerStorageRabbitMQ(StorageFactory & factory)
}
}
size_t bind_by_id = static_cast<bool>(rabbitmq_settings.rabbitmq_bind_by_id);
bool hash_exchange = static_cast<bool>(rabbitmq_settings.rabbitmq_hash_exchange);
if (args_count >= 6)
{
const auto * ast = engine_args[5]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::UInt64)
{
bind_by_id = static_cast<bool>(safeGet<UInt64>(ast->value));
hash_exchange = static_cast<bool>(safeGet<UInt64>(ast->value));
}
else
{
@ -439,22 +442,8 @@ void registerStorageRabbitMQ(StorageFactory & factory)
}
}
size_t hash_exchange = static_cast<bool>(rabbitmq_settings.rabbitmq_hash_exchange);
if (args_count >= 9)
{
const auto * ast = engine_args[8]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::UInt64)
{
hash_exchange = static_cast<bool>(safeGet<UInt64>(ast->value));
}
else
{
throw Exception("Hash exchange flag must be a boolean", ErrorCodes::BAD_ARGUMENTS);
}
}
return StorageRabbitMQ::create(args.table_id, args.context, args.columns, host_port, routing_key, exchange,
format, row_delimiter, num_consumers, bind_by_id, num_queues, hash_exchange);
format, row_delimiter, num_consumers, num_queues, hash_exchange);
};
factory.registerStorage("RabbitMQ", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });

View File

@ -53,10 +53,13 @@ protected:
Context & context_,
const ColumnsDescription & columns_,
const String & host_port_,
const String & routing_key_, const String & exchange_name_,
const String & format_name_, char row_delimiter_,
size_t num_consumers_, bool bind_by_id_, size_t num_queues_, bool hash_exchange);
const String & routing_key_,
const String & exchange_name_,
const String & format_name_,
char row_delimiter_,
size_t num_consumers_,
size_t num_queues_,
bool hash_exchange);
private:
Context global_context;
@ -80,7 +83,7 @@ private:
std::mutex mutex;
std::vector<ConsumerBufferPtr> buffers; /// available buffers for RabbitMQ consumers
size_t next_channel_id = 0;
size_t next_channel_id = 1; /// Must >= 1 because it is used as a binding key, which has to be > 0
bool update_channel_id = false;
BackgroundSchedulePool::TaskHolder task;

View File

@ -528,7 +528,7 @@ def test_rabbitmq_sharding_between_tables(rabbitmq_cluster):
for _ in range(messages_num):
messages.append(json.dumps({'key': i[0], 'value': i[0]}))
i[0] += 1
key = 'topic_' + str(randrange(0, NUMBER_OF_CONCURRENT_CONSUMERS))
key = str(randrange(1, NUMBER_OF_CONCURRENT_CONSUMERS))
for message in messages:
channel.basic_publish(exchange='clickhouse-exchange', routing_key=key, body=message)
connection.close()
@ -576,7 +576,6 @@ def test_rabbitmq_sharding_between_channels_publish(rabbitmq_cluster):
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_routing_key = 'clickhouse',
rabbitmq_num_consumers = 5,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
@ -605,7 +604,7 @@ def test_rabbitmq_sharding_between_channels_publish(rabbitmq_cluster):
for _ in range(messages_num):
messages.append(json.dumps({'key': i[0], 'value': i[0]}))
i[0] += 1
key = 'clickhouse_' + str(randrange(0, NUM_CHANNELS))
key = str(randrange(1, NUM_CHANNELS))
for message in messages:
channel.basic_publish(exchange='clickhouse-exchange', routing_key=key, body=message)
connection.close()
@ -641,7 +640,6 @@ def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_num_queues = 4,
rabbitmq_routing_key = 'clickhouse',
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
DROP TABLE IF EXISTS test.view;
@ -669,7 +667,7 @@ def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
for _ in range(messages_num):
messages.append(json.dumps({'key': i[0], 'value': i[0]}))
i[0] += 1
key = 'clickhouse_' + str(randrange(0, NUM_QUEUES))
key = str(randrange(1, NUM_QUEUES))
for message in messages:
channel.basic_publish(exchange='clickhouse-exchange', routing_key=key, body=message)
connection.close()
@ -707,7 +705,6 @@ def test_rabbitmq_sharding_between_channels_and_queues_publish(rabbitmq_cluster)
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_num_queues = 2,
rabbitmq_num_consumers = 10,
rabbitmq_routing_key = 'clickhouse',
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
DROP TABLE IF EXISTS test.view;
@ -735,7 +732,7 @@ def test_rabbitmq_sharding_between_channels_and_queues_publish(rabbitmq_cluster)
for _ in range(messages_num):
messages.append(json.dumps({'key': i[0], 'value': i[0]}))
i[0] += 1
key = 'clickhouse_' + str(randrange(0, NUM_QUEUES * NUM_CONSUMERS))
key = str(randrange(1, NUM_QUEUES * NUM_CONSUMERS))
for message in messages:
channel.basic_publish(exchange='clickhouse-exchange', routing_key=key, body=message)
connection.close()
@ -772,7 +769,6 @@ def test_rabbitmq_read_only_combo(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_num_consumers = 4,
rabbitmq_routing_key = 'clickhouse',
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
''')
@ -807,7 +803,7 @@ def test_rabbitmq_read_only_combo(rabbitmq_cluster):
for _ in range(messages_num):
messages.append(json.dumps({'key': i[0], 'value': i[0]}))
i[0] += 1
key = 'clickhouse_' + str(randrange(0, NUM_CONSUMERS))
key = str(randrange(1, NUM_CONSUMERS))
for message in messages:
channel.basic_publish(exchange='clickhouse-exchange', routing_key=key, body=message)
connection.close()