Some fixes

This commit is contained in:
kssenii 2020-06-11 20:05:35 +00:00
parent 626eb53baa
commit 3b0a3e00c0
7 changed files with 51 additions and 33 deletions

View File

@ -37,11 +37,12 @@ void RabbitMQHandler::onError(AMQP::TcpConnection * connection, const char * mes
void RabbitMQHandler::startConsumerLoop(std::atomic<bool> & loop_started)
{
/* The object of this class is shared between concurrent consumers (who share the same connection == share the same
* event loop). But the loop should not be attempted to start if it is already running.
* event loop and handler). But the loop should not be attempted to start if it is already running.
*/
if (mutex_before_event_loop.try_lock_for(std::chrono::milliseconds(Lock_timeout)))
{
loop_started = true;
stop_scheduled.store(false);
event_base_loop(evbase, EVLOOP_NONBLOCK);
mutex_before_event_loop.unlock();
}
@ -56,7 +57,7 @@ void RabbitMQHandler::startProducerLoop()
void RabbitMQHandler::stop()
{
if (mutex_before_loop_stop.try_lock_for(std::chrono::milliseconds(0)))
if (mutex_before_loop_stop.try_lock())
{
event_base_loopbreak(evbase);
mutex_before_loop_stop.unlock();
@ -66,8 +67,9 @@ void RabbitMQHandler::stop()
void RabbitMQHandler::stopWithTimeout()
{
if (mutex_before_loop_stop.try_lock_for(std::chrono::milliseconds(0)))
if (mutex_before_loop_stop.try_lock())
{
stop_scheduled.store(true);
event_base_loopexit(evbase, &tv);
mutex_before_loop_stop.unlock();
}

View File

@ -23,15 +23,16 @@ public:
void startProducerLoop();
void stopWithTimeout();
void stop();
std::atomic<bool> & checkStopIsScheduled() { return stop_scheduled; };
private:
event_base * evbase;
Poco::Logger * log;
timeval tv;
size_t count_passed = 0;
std::atomic<bool> stop_scheduled = false;
std::timed_mutex mutex_before_event_loop;
std::timed_mutex mutex_before_loop_stop;
std::mutex mutex_before_loop_stop;
};
}

View File

@ -15,9 +15,9 @@ namespace DB
M(SettingString, rabbitmq_exchange_name, "clickhouse-exchange", "The exchange name, to which messages are sent.", 0) \
M(SettingString, rabbitmq_format, "", "The message format.", 0) \
M(SettingChar, rabbitmq_row_delimiter, '\0', "The character to be considered as a delimiter.", 0) \
M(SettingString, rabbitmq_exchange_type, "default", "The exchange type.", 0) \
M(SettingUInt64, rabbitmq_num_consumers, 1, "The number of consumer channels per table.", 0) \
M(SettingUInt64, rabbitmq_num_queues, 1, "The number of queues per consumer.", 0) \
M(SettingString, rabbitmq_exchange_type, "default", "The exchange type.", 0) \
DECLARE_SETTINGS_COLLECTION(LIST_OF_RABBITMQ_SETTINGS)

View File

@ -13,6 +13,10 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace Exchange
{
@ -22,6 +26,7 @@ namespace Exchange
static const String DIRECT = "direct";
static const String TOPIC = "topic";
static const String HASH = "consistent_hash";
static const String HEADERS = "headers";
}
@ -55,7 +60,7 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
messages.clear();
current = messages.begin();
exchange_type_set = exchange_type != Exchange::DEFAULT ? true : false;
exchange_type_set = exchange_type != Exchange::DEFAULT;
/* One queue per consumer can handle up to 50000 messages. More queues per consumer can be added.
* By default there is one queue per consumer.
@ -81,7 +86,7 @@ ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer()
void ReadBufferFromRabbitMQConsumer::initExchange()
{
/* If exchange_type is not set - then direct-exchange is used - this type of exchange is the fastest (also due to different
* binding algorithm this default behaviuor is much faster). It is also used in INSERT query.
* binding algorithm this default behaviour is much faster). It is also used in INSERT query (so it is always declared).
*/
String producer_exchange = exchange_type_set ? exchange_name + "_" + Exchange::DEFAULT : exchange_name;
consumer_channel->declareExchange(producer_exchange, AMQP::fanout).onError([&](const char * message)
@ -114,10 +119,12 @@ void ReadBufferFromRabbitMQConsumer::initExchange()
else if (exchange_type == Exchange::DIRECT) type = AMQP::ExchangeType::direct;
else if (exchange_type == Exchange::TOPIC) type = AMQP::ExchangeType::topic;
else if (exchange_type == Exchange::HASH) type = AMQP::ExchangeType::consistent_hash;
else return;
else if (exchange_type == Exchange::HEADERS)
throw Exception("Headers exchange is not supported", ErrorCodes::BAD_ARGUMENTS);
else throw Exception("Invalid exchange type", ErrorCodes::BAD_ARGUMENTS);
/* Declare exchange of the specified type and bind it to hash-exchange, which will evenly distribute messages
* between all consumers. (This enables better scaling as without hash-echange - the only oprion to avoid getting the same
* between all consumers. (This enables better scaling as without hash-exchange - the only option to avoid getting the same
* messages more than once - is having only one consumer with one queue, which is not good.)
*/
consumer_channel->declareExchange(exchange_name, type).onError([&](const char * message)
@ -156,7 +163,7 @@ void ReadBufferFromRabbitMQConsumer::initExchange()
void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
{
/// These variables might be updated later from a separate thread in onError callbacks
/// These variables might be updated later from a separate thread in onError callbacks.
if (!internal_exchange_declared || (exchange_type_set && !local_exchange_declared))
{
initExchange();
@ -206,7 +213,10 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
LOG_ERROR(log, "Failed to bind to key {}. Reason: {}", binding_key, message);
});
/// Must be done here and not in readPrefix() because library might fail to handle async subscription on the same connection
/* Subscription can probably be moved back to readPrefix(), but not sure whether it is better in regard to speed. Also note
* that if moved there, it must(!) be wrapped inside a channel->onReady callback or any other, otherwise consumer might fail
* to subscribe and no resubscription will help.
*/
subscribe(queues.back());
LOG_TRACE(log, "Queue " + queue_name_ + " is bound by key " + binding_key);
@ -229,7 +239,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
}
else
{
/// Means there is only one queue with one consumer - no even distribution needed - no hash-exchange
/// Means there is only one queue with one consumer - no even distribution needed - no hash-exchange.
for (auto & routing_key : routing_keys)
{
/// Binding directly to exchange, specified by the client
@ -274,6 +284,7 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
.onSuccess([&](const std::string & /* consumer */)
{
subscribed_queue[queue_name] = true;
consumer_error = false;
++count_subscribed;
LOG_TRACE(log, "Consumer {} is subscribed to queue {}", channel_id, queue_name);
@ -290,24 +301,17 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
message_received += row_delimiter;
}
bool stop_loop = false;
/// Needed to avoid data race because this vector can be used at the same time by another thread in nextImpl().
{
std::lock_guard lock(mutex);
received.push_back(message_received);
/* As event loop is blocking to the thread that started it and a single thread should not be blocked while
* executing all callbacks on the connection (not only its own), then there should be some point to unblock.
* loop_started == 1 if current consumer is started the loop and not another.
*/
if (!loop_started)
{
stop_loop = true;
}
}
if (stop_loop)
/* As event loop is blocking to the thread that started it and a single thread should not be blocked while
* executing all callbacks on the connection (not only its own), then there should be some point to unblock.
* loop_started == 1 if the current consumer (and not another one) started the loop.
*/
if (!loop_started.load() && !eventHandler.checkStopIsScheduled().load())
{
stopEventLoopWithTimeout();
}
@ -323,7 +327,6 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
void ReadBufferFromRabbitMQConsumer::checkSubscription()
{
/// In general this condition will always be true and looping/resubscribing would not happen
if (count_subscribed == num_queues)
return;
@ -337,7 +340,11 @@ void ReadBufferFromRabbitMQConsumer::checkSubscription()
LOG_TRACE(log, "Consumer {} is subscribed to {} queues", channel_id, count_subscribed);
/// A case that would not normally happen
/// Updated in callbacks which are run by the loop
if (count_subscribed == num_queues)
return;
/// A case that should never normally happen
for (auto & queue : queues)
{
subscribe(queue);
@ -372,9 +379,9 @@ bool ReadBufferFromRabbitMQConsumer::nextImpl()
{
if (received.empty())
{
/// Run the onReceived callbacks to save the messages that have been received by now, blocks current thread
/// Run the onReceived callbacks to save the messages that have been received by now, blocks current thread.
startEventLoop(loop_started);
loop_started = false;
loop_started.store(false);
}
if (received.empty())

View File

@ -450,8 +450,15 @@ void registerStorageRabbitMQ(StorageFactory & factory)
{
exchange_type = safeGet<String>(ast->value);
}
}
if (exchange_type != "fanout" && exchange_type != "direct" && exchange_type != "topic" && exchange_type != "consistent_hash")
{
if (exchange_type == "headers")
throw Exception("Headers exchange is not supported", ErrorCodes::BAD_ARGUMENTS);
else
throw Exception("Invalid exchange type", ErrorCodes::BAD_ARGUMENTS);
}
}
UInt64 num_consumers = rabbitmq_settings.rabbitmq_num_consumers;
if (args_count >= 7)

View File

@ -33,7 +33,7 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
: WriteBuffer(nullptr, 0)
, login_password(login_password_)
, routing_key(routing_key_)
, exchange_name(exchange_)
, exchange_name(exchange_ + "_direct")
, log(log_)
, num_queues(num_queues_)
, bind_by_id(bind_by_id_)
@ -126,7 +126,7 @@ void WriteBufferToRabbitMQProducer::checkExchange()
/* The AMQP::passive flag indicates that it should only be checked if there is a valid exchange with the given name
* and makes it visible from current producer_channel.
*/
producer_channel->declareExchange(exchange_name + "_direct", AMQP::direct, AMQP::passive)
producer_channel->declareExchange(exchange_name, AMQP::direct, AMQP::passive)
.onSuccess([&]()
{
exchange_declared = true;

View File

@ -882,7 +882,7 @@ def test_rabbitmq_many_inserts(rabbitmq_cluster):
@pytest.mark.timeout(240)
def test_rabbitmq_sharding_between_channels_insert(rabbitmq_cluster):
def test_rabbitmq_sharding_between_channels_and_queues_insert(rabbitmq_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view_sharding;
DROP TABLE IF EXISTS test.consumer_sharding;
@ -890,6 +890,7 @@ def test_rabbitmq_sharding_between_channels_insert(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_num_consumers = 5,
rabbitmq_num_queues = 2,
rabbitmq_format = 'TSV',
rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.view_sharding (key UInt64, value UInt64)