Merge pull request #23887 from kssenii/rabbitmq-fixes

RabbitMQ: allow user to define specific queue settings, its own exchange, user-managed queues; add cleanup on drop table
This commit is contained in:
Kruglov Pavel 2021-05-17 12:01:12 +03:00 committed by GitHub
commit 64e7994987
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 361 additions and 127 deletions

View File

@ -53,4 +53,16 @@ void RabbitMQHandler::iterateLoop()
uv_run(loop, UV_RUN_NOWAIT);
}
/// Runs the libuv loop with UV_RUN_DEFAULT, i.e. the caller waits inside uv_run().
/// The callers visible in this change register AMQP callbacks that call stopLoop(), which is what ends the wait.
/// Do not need synchronization as in iterateLoop(), because this method is used only for
/// initial RabbitMQ setup - at this point there is no background loop thread.
/// NOTE(review): cleanupRabbitMQ() also calls this from shutdown(), after looping_task has been deactivated -
/// confirm that the "no background loop thread" assumption holds there as well.
void RabbitMQHandler::startBlockingLoop()
{
uv_run(loop, UV_RUN_DEFAULT);
}
/// Asks the libuv loop to stop by calling uv_stop().
/// Used from AMQP success/error callbacks to end a wait started with startBlockingLoop().
void RabbitMQHandler::stopLoop()
{
uv_stop(loop);
}
}

View File

@ -17,6 +17,7 @@ namespace Loop
static const UInt8 STOP = 2;
}
class RabbitMQHandler : public AMQP::LibUvHandler
{
@ -26,9 +27,19 @@ public:
void onError(AMQP::TcpConnection * connection, const char * message) override;
void onReady(AMQP::TcpConnection * connection) override;
/// Loop for background thread worker.
void startLoop();
/// Loop to wait for small tasks in a non-blocking mode.
/// Adds synchronization with main background loop.
void iterateLoop();
/// Loop to wait for small tasks in a blocking mode.
/// No synchronization is done with the main loop thread.
void startBlockingLoop();
void stopLoop();
bool connectionRunning() { return connection_running.load(); }
bool loopRunning() { return loop_running.load(); }

View File

@ -19,12 +19,13 @@ namespace DB
M(UInt64, rabbitmq_num_consumers, 1, "The number of consumer channels per table.", 0) \
M(UInt64, rabbitmq_num_queues, 1, "The number of queues per consumer.", 0) \
M(String, rabbitmq_queue_base, "", "Base for queue names to be able to reopen non-empty queues in case of failure.", 0) \
M(String, rabbitmq_deadletter_exchange, "", "Exchange name to be passed as a dead-letter-exchange name.", 0) \
M(Bool, rabbitmq_persistent, false, "If set, delivery mode will be set to 2 (makes messages 'persistent', durable).", 0) \
M(Bool, rabbitmq_persistent, false, "For insert query messages will be made 'persistent', durable.", 0) \
M(UInt64, rabbitmq_skip_broken_messages, 0, "Skip at least this number of broken messages from RabbitMQ per block", 0) \
M(UInt64, rabbitmq_max_block_size, 0, "Number of row collected before flushing data from RabbitMQ.", 0) \
M(Milliseconds, rabbitmq_flush_interval_ms, 0, "Timeout for flushing data from RabbitMQ.", 0) \
M(String, rabbitmq_vhost, "/", "RabbitMQ vhost.", 0) \
M(String, rabbitmq_queue_settings_list, "", "A list of rabbitmq queue settings", 0) \
M(Bool, rabbitmq_queue_consume, false, "Use user-defined queues and do not make any RabbitMQ setup: declaring exchanges, queues, bindings", 0) \
#define LIST_OF_RABBITMQ_SETTINGS(M) \
RABBITMQ_RELATED_SETTINGS(M) \

View File

@ -56,6 +56,7 @@ public:
ChannelPtr & getChannel() { return consumer_channel; }
void setupChannel();
bool needChannelUpdate();
void closeChannel() { consumer_channel->close(); }
void updateQueues(std::vector<String> & queues_) { queues = queues_; }
size_t queuesCount() { return queues.size(); }

View File

@ -77,20 +77,21 @@ StorageRabbitMQ::StorageRabbitMQ(
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
, rabbitmq_settings(std::move(rabbitmq_settings_))
, exchange_name(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_exchange_name.value))
, format_name(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_format.value))
, exchange_type(defineExchangeType(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_exchange_type.value)))
, routing_keys(parseRoutingKeys(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_routing_key_list.value)))
, exchange_name(rabbitmq_settings->rabbitmq_exchange_name.value)
, format_name(rabbitmq_settings->rabbitmq_format.value)
, exchange_type(defineExchangeType(rabbitmq_settings->rabbitmq_exchange_type.value))
, routing_keys(parseSettings(rabbitmq_settings->rabbitmq_routing_key_list.value))
, row_delimiter(rabbitmq_settings->rabbitmq_row_delimiter.value)
, schema_name(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_schema.value))
, schema_name(rabbitmq_settings->rabbitmq_schema.value)
, num_consumers(rabbitmq_settings->rabbitmq_num_consumers.value)
, num_queues(rabbitmq_settings->rabbitmq_num_queues.value)
, queue_base(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_queue_base.value))
, deadletter_exchange(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_deadletter_exchange.value))
, queue_base(rabbitmq_settings->rabbitmq_queue_base.value)
, queue_settings_list(parseSettings(rabbitmq_settings->rabbitmq_queue_settings_list.value))
, persistent(rabbitmq_settings->rabbitmq_persistent.value)
, use_user_setup(rabbitmq_settings->rabbitmq_queue_consume.value)
, hash_exchange(num_consumers > 1 || num_queues > 1)
, log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")"))
, address(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_host_port.value))
, address(rabbitmq_settings->rabbitmq_host_port.value)
, parsed_address(parseAddress(address, 5672))
, login_password(std::make_pair(
getContext()->getConfigRef().getString("rabbitmq.username"),
@ -148,10 +149,12 @@ StorageRabbitMQ::StorageRabbitMQ(
}
Names StorageRabbitMQ::parseRoutingKeys(String routing_key_list)
Names StorageRabbitMQ::parseSettings(String settings_list)
{
Names result;
boost::split(result, routing_key_list, [](char c){ return c == ','; });
if (settings_list.empty())
return result;
boost::split(result, settings_list, [](char c){ return c == ','; });
for (String & key : result)
boost::trim(key);
@ -251,55 +254,65 @@ void StorageRabbitMQ::deactivateTask(BackgroundSchedulePool::TaskHolder & task,
size_t StorageRabbitMQ::getMaxBlockSize() const
{
{
return rabbitmq_settings->rabbitmq_max_block_size.changed
? rabbitmq_settings->rabbitmq_max_block_size.value
: (getContext()->getSettingsRef().max_insert_block_size.value / num_consumers);
}
}
void StorageRabbitMQ::initRabbitMQ()
{
setup_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
if (use_user_setup)
{
queues.emplace_back(queue_base);
rabbit_is_ready = true;
return;
}
initExchange();
bindExchange();
AMQP::TcpChannel rabbit_channel(connection.get());
/// Main exchange -> Bridge exchange -> ( Sharding exchange ) -> Queues -> Consumers
initExchange(rabbit_channel);
bindExchange(rabbit_channel);
for (const auto i : ext::range(0, num_queues))
bindQueue(i + 1);
bindQueue(i + 1, rabbit_channel);
LOG_TRACE(log, "RabbitMQ setup completed");
rabbit_is_ready = true;
setup_channel->close();
rabbit_channel.close();
}
void StorageRabbitMQ::initExchange()
void StorageRabbitMQ::initExchange(AMQP::TcpChannel & rabbit_channel)
{
/* Binding scheme is the following: client's exchange -> key bindings by routing key list -> bridge exchange (fanout) ->
* -> sharding exchange (only if needed) -> queues
*/
setup_channel->declareExchange(exchange_name, exchange_type, AMQP::durable)
/// Exchange hierarchy:
/// 1. Main exchange (defined with table settings - rabbitmq_exchange_name, rabbitmq_exchange_type).
/// 2. Bridge exchange (fanout). Used to easily disconnect main exchange and to simplify queue bindings.
/// 3. Sharding (or hash) exchange. Used in case of multiple queues.
/// 4. Consumer exchange. Just an alias for bridge_exchange or sharding exchange to know to what exchange
/// queues will be bound.
/// All exchanges are declared with options:
/// 1. `durable` (survive RabbitMQ server restart)
/// 2. `autodelete` (auto delete in case of queue bindings are dropped).
rabbit_channel.declareExchange(exchange_name, exchange_type, AMQP::durable)
.onError([&](const char * message)
{
/* This error can be a result of attempt to declare exchange if it was already declared but
* 1) with different exchange type. In this case can
* - manually delete previously declared exchange and create a new one.
* - throw an error that the exchange with this name but another type is already declared and ask client to delete it himself
* if it is not needed anymore or use another exchange name.
* 2) with different exchange settings. This can only happen if client himself declared exchange with the same name and
* specified its own settings, which differ from this implementation.
*/
/// This error can be a result of attempt to declare exchange if it was already declared but
/// 1) with different exchange type.
/// 2) with different exchange settings.
throw Exception("Unable to declare exchange. Make sure specified exchange is not already declared. Error: "
+ std::string(message), ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE);
});
/// Bridge exchange is needed to easily disconnect consumer queues and also simplifies queue bindings
setup_channel->declareExchange(bridge_exchange, AMQP::fanout, AMQP::durable + AMQP::autodelete)
rabbit_channel.declareExchange(bridge_exchange, AMQP::fanout, AMQP::durable | AMQP::autodelete)
.onError([&](const char * message)
{
/// This error is not supposed to happen as this exchange name is always unique to type and its settings
/// This error is not supposed to happen as this exchange name is always unique to type and its settings.
throw Exception(
ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE, "Unable to declare bridge exchange ({}). Reason: {}", bridge_exchange, std::string(message));
});
@ -310,26 +323,26 @@ void StorageRabbitMQ::initExchange()
return;
}
/* Change hash property because by default it will be routing key, which has to be an integer, but with support for any exchange
* type - routing keys might be of any type
*/
AMQP::Table binding_arguments;
/// Default routing key property in case of hash exchange is a routing key, which is required to be an integer.
/// Support for arbitrary exchange type (i.e. arbitrary pattern of routing keys) requires to eliminate this dependency.
/// This setting changes the hash property to message_id.
binding_arguments["hash-property"] = "message_id";
/// Declare exchange for sharding.
setup_channel->declareExchange(sharding_exchange, AMQP::consistent_hash, AMQP::durable + AMQP::autodelete, binding_arguments)
/// Declare hash exchange for sharding.
rabbit_channel.declareExchange(sharding_exchange, AMQP::consistent_hash, AMQP::durable | AMQP::autodelete, binding_arguments)
.onError([&](const char * message)
{
/* This error can be a result of same reasons as above for exchange_name, i.e. it will mean that sharding exchange name appeared
* to be the same as some other exchange (which purpose is not for sharding). So probably actual error reason: queue_base parameter
* is bad.
*/
/// This error can be a result of same reasons as above for exchange_name, i.e. it will mean that sharding exchange name appeared
/// to be the same as some other exchange (which purpose is not for sharding). So probably actual error reason: queue_base parameter
/// is bad.
throw Exception(
ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE,
"Unable to declare sharding exchange ({}). Reason: {}", sharding_exchange, std::string(message));
});
setup_channel->bindExchange(bridge_exchange, sharding_exchange, routing_keys[0])
rabbit_channel.bindExchange(bridge_exchange, sharding_exchange, routing_keys[0])
.onError([&](const char * message)
{
throw Exception(
@ -344,9 +357,8 @@ void StorageRabbitMQ::initExchange()
}
void StorageRabbitMQ::bindExchange()
void StorageRabbitMQ::bindExchange(AMQP::TcpChannel & rabbit_channel)
{
std::atomic<bool> binding_created = false;
size_t bound_keys = 0;
if (exchange_type == AMQP::ExchangeType::headers)
@ -359,8 +371,8 @@ void StorageRabbitMQ::bindExchange()
bind_headers[matching[0]] = matching[1];
}
setup_channel->bindExchange(exchange_name, bridge_exchange, routing_keys[0], bind_headers)
.onSuccess([&]() { binding_created = true; })
rabbit_channel.bindExchange(exchange_name, bridge_exchange, routing_keys[0], bind_headers)
.onSuccess([&]() { event_handler->stopLoop(); })
.onError([&](const char * message)
{
throw Exception(
@ -371,8 +383,8 @@ void StorageRabbitMQ::bindExchange()
}
else if (exchange_type == AMQP::ExchangeType::fanout || exchange_type == AMQP::ExchangeType::consistent_hash)
{
setup_channel->bindExchange(exchange_name, bridge_exchange, routing_keys[0])
.onSuccess([&]() { binding_created = true; })
rabbit_channel.bindExchange(exchange_name, bridge_exchange, routing_keys[0])
.onSuccess([&]() { event_handler->stopLoop(); })
.onError([&](const char * message)
{
throw Exception(
@ -385,12 +397,12 @@ void StorageRabbitMQ::bindExchange()
{
for (const auto & routing_key : routing_keys)
{
setup_channel->bindExchange(exchange_name, bridge_exchange, routing_key)
rabbit_channel.bindExchange(exchange_name, bridge_exchange, routing_key)
.onSuccess([&]()
{
++bound_keys;
if (bound_keys == routing_keys.size())
binding_created = true;
event_handler->stopLoop();
})
.onError([&](const char * message)
{
@ -402,17 +414,12 @@ void StorageRabbitMQ::bindExchange()
}
}
while (!binding_created) //-V776
{
event_handler->iterateLoop();
}
event_handler->startBlockingLoop();
}
void StorageRabbitMQ::bindQueue(size_t queue_id)
void StorageRabbitMQ::bindQueue(size_t queue_id, AMQP::TcpChannel & rabbit_channel)
{
std::atomic<bool> binding_created = false;
auto success_callback = [&](const std::string & queue_name, int msgcount, int /* consumercount */)
{
queues.emplace_back(queue_name);
@ -425,8 +432,8 @@ void StorageRabbitMQ::bindQueue(size_t queue_id)
* done between client's exchange and local bridge exchange. Binding key must be a string integer in case of hash exchange, for
* fanout exchange it can be arbitrary
*/
setup_channel->bindQueue(consumer_exchange, queue_name, std::to_string(queue_id))
.onSuccess([&] { binding_created = true; })
rabbit_channel.bindQueue(consumer_exchange, queue_name, std::to_string(queue_id))
.onSuccess([&] { event_handler->stopLoop(); })
.onError([&](const char * message)
{
throw Exception(
@ -450,23 +457,49 @@ void StorageRabbitMQ::bindQueue(size_t queue_id)
AMQP::Table queue_settings;
queue_settings["x-max-length"] = queue_size;
std::unordered_set<String> integer_settings = {"x-max-length", "x-max-length-bytes", "x-message-ttl", "x-expires", "x-priority", "x-max-priority"};
std::unordered_set<String> string_settings = {"x-overflow", "x-dead-letter-exchange", "x-queue-type"};
if (!deadletter_exchange.empty())
queue_settings["x-dead-letter-exchange"] = deadletter_exchange;
else
queue_settings["x-overflow"] = "reject-publish";
/* The first option not just simplifies queue_name, but also implements the possibility to be able to resume reading from one
* specific queue when its name is specified in queue_base setting
*/
const String queue_name = !hash_exchange ? queue_base : std::to_string(queue_id) + "_" + queue_base;
setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
while (!binding_created) //-V776
/// Check user-defined settings.
if (!queue_settings_list.empty())
{
event_handler->iterateLoop();
for (const auto & setting : queue_settings_list)
{
    /// Every entry of rabbitmq_queue_settings_list is expected to have the form `key=value`.
    Strings setting_values;
    splitInto<'='>(setting_values, setting);
    if (setting_values.size() != 2)
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid settings string: {}", setting);

    const String & key = setting_values[0];
    const String & value = setting_values[1];

    /// Only settings from the two allow-lists are passed to the queue declaration:
    /// integer-valued ones are parsed as numbers, string-valued ones are passed as is.
    if (integer_settings.contains(key))
        queue_settings[key] = parse<uint64_t>(value);
    else if (string_settings.contains(key))
        queue_settings[key] = value;
    else
        /// Report the name of the rejected setting (the key), not its value.
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported queue setting: {}", key);
}
}
/// Impose default settings if there are no user-defined settings.
if (!queue_settings.contains("x-max-length"))
{
queue_settings["x-max-length"] = queue_size;
}
if (!queue_settings.contains("x-overflow"))
{
queue_settings["x-overflow"] = "reject-publish";
}
/// If queue_base - a single name, then it can be used as one specific queue, from which to read.
/// Otherwise it is used as a generator (unique for current table) of queue names, because it allows to
/// maximize performance - via setting `rabbitmq_num_queues`.
const String queue_name = !hash_exchange ? queue_base : std::to_string(queue_id) + "_" + queue_base;
/// AMQP::autodelete setting is not allowed, because in case of server restart there will be no consumers
/// and deleting queues should not take place.
rabbit_channel.declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
event_handler->startBlockingLoop();
}
@ -538,8 +571,8 @@ void StorageRabbitMQ::unbindExchange()
event_handler->updateLoopState(Loop::STOP);
looping_task->deactivate();
setup_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
setup_channel->removeExchange(bridge_exchange)
AMQP::TcpChannel rabbit_channel(connection.get());
rabbit_channel.removeExchange(bridge_exchange)
.onSuccess([&]()
{
exchange_removed.store(true);
@ -553,8 +586,7 @@ void StorageRabbitMQ::unbindExchange()
{
event_handler->iterateLoop();
}
setup_channel->close();
rabbit_channel.close();
});
}
@ -645,12 +677,27 @@ void StorageRabbitMQ::shutdown()
stream_cancelled = true;
wait_confirm = false;
deactivateTask(streaming_task, true, false);
deactivateTask(looping_task, true, true);
/// In case it has not yet been able to setup connection;
deactivateTask(connection_task, true, false);
/// The order of deactivating tasks is important: wait for streamingToViews() func to finish and
/// then wait for background event loop to finish.
deactivateTask(streaming_task, true, false);
deactivateTask(looping_task, true, true);
if (drop_table)
{
for (auto & buffer : buffers)
buffer->closeChannel();
cleanupRabbitMQ();
}
/// It is important to close connection here - before removing consumer buffers, because
/// it will finish and clean callbacks, which might use those buffers data.
connection->close();
/// Connection is not closed immediately - it requires the loop to shutdown it properly and to
/// finish all callbacks.
size_t cnt_retries = 0;
while (!connection->closed() && cnt_retries++ != RETRIES_MAX)
event_handler->iterateLoop();
@ -664,6 +711,40 @@ void StorageRabbitMQ::shutdown()
}
/// The only thing publishers are supposed to be aware of is _exchanges_ and queues are a responsibility of a consumer.
/// Therefore, if a table is dropped, a clean up is needed.
void StorageRabbitMQ::cleanupRabbitMQ() const
{
if (use_user_setup)
return;
AMQP::TcpChannel rabbit_channel(connection.get());
for (const auto & queue : queues)
{
/// AMQP::ifunused is needed, because it is possible to share queues between multiple tables and dropping
/// on of them should not affect others.
/// AMQP::ifempty is not used on purpose.
rabbit_channel.removeQueue(queue, AMQP::ifunused)
.onSuccess([&](uint32_t num_messages)
{
LOG_TRACE(log, "Successfully deleted queue {}, messages contained {}", queue, num_messages);
event_handler->stopLoop();
})
.onError([&](const char * message)
{
LOG_ERROR(log, "Failed to delete queue {}. Error message: {}", queue, message);
event_handler->stopLoop();
});
}
event_handler->startBlockingLoop();
rabbit_channel.close();
/// Also there is no need to cleanup exchanges as they were created with AMQP::autodelete option. Once queues
/// are removed, exchanges will also be cleaned.
}
void StorageRabbitMQ::pushReadBuffer(ConsumerBufferPtr buffer)
{
std::lock_guard lock(buffers_mutex);
@ -957,7 +1038,7 @@ void registerStorageRabbitMQ(StorageFactory & factory)
// Check arguments and settings
#define CHECK_RABBITMQ_STORAGE_ARGUMENT(ARG_NUM, ARG_NAME) \
/* One of the three required arguments is not specified */ \
if (args_count < (ARG_NUM) && (ARG_NUM) <= 3 && !rabbitmq_settings->ARG_NAME.changed) \
if (args_count < (ARG_NUM) && (ARG_NUM) <= 2 && !rabbitmq_settings->ARG_NAME.changed) \
{ \
throw Exception("Required parameter '" #ARG_NAME "' for storage RabbitMQ not specified", \
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); \
@ -972,9 +1053,8 @@ void registerStorageRabbitMQ(StorageFactory & factory)
}
CHECK_RABBITMQ_STORAGE_ARGUMENT(1, rabbitmq_host_port)
CHECK_RABBITMQ_STORAGE_ARGUMENT(2, rabbitmq_exchange_name)
CHECK_RABBITMQ_STORAGE_ARGUMENT(3, rabbitmq_format)
CHECK_RABBITMQ_STORAGE_ARGUMENT(2, rabbitmq_format)
CHECK_RABBITMQ_STORAGE_ARGUMENT(3, rabbitmq_exchange_name)
CHECK_RABBITMQ_STORAGE_ARGUMENT(4, rabbitmq_exchange_type)
CHECK_RABBITMQ_STORAGE_ARGUMENT(5, rabbitmq_routing_key_list)
CHECK_RABBITMQ_STORAGE_ARGUMENT(6, rabbitmq_row_delimiter)
@ -982,14 +1062,13 @@ void registerStorageRabbitMQ(StorageFactory & factory)
CHECK_RABBITMQ_STORAGE_ARGUMENT(8, rabbitmq_num_consumers)
CHECK_RABBITMQ_STORAGE_ARGUMENT(9, rabbitmq_num_queues)
CHECK_RABBITMQ_STORAGE_ARGUMENT(10, rabbitmq_queue_base)
CHECK_RABBITMQ_STORAGE_ARGUMENT(11, rabbitmq_deadletter_exchange)
CHECK_RABBITMQ_STORAGE_ARGUMENT(12, rabbitmq_persistent)
CHECK_RABBITMQ_STORAGE_ARGUMENT(13, rabbitmq_skip_broken_messages)
CHECK_RABBITMQ_STORAGE_ARGUMENT(14, rabbitmq_max_block_size)
CHECK_RABBITMQ_STORAGE_ARGUMENT(15, rabbitmq_flush_interval_ms)
CHECK_RABBITMQ_STORAGE_ARGUMENT(16, rabbitmq_vhost)
CHECK_RABBITMQ_STORAGE_ARGUMENT(11, rabbitmq_persistent)
CHECK_RABBITMQ_STORAGE_ARGUMENT(12, rabbitmq_skip_broken_messages)
CHECK_RABBITMQ_STORAGE_ARGUMENT(13, rabbitmq_max_block_size)
CHECK_RABBITMQ_STORAGE_ARGUMENT(14, rabbitmq_flush_interval_ms)
CHECK_RABBITMQ_STORAGE_ARGUMENT(15, rabbitmq_vhost)
CHECK_RABBITMQ_STORAGE_ARGUMENT(16, rabbitmq_queue_settings_list)
CHECK_RABBITMQ_STORAGE_ARGUMENT(17, rabbitmq_queue_consume)
#undef CHECK_RABBITMQ_STORAGE_ARGUMENT

View File

@ -33,6 +33,13 @@ public:
void startup() override;
void shutdown() override;
/// This is a bad way to let storage know in shutdown() that table is going to be dropped. There are some actions which need
/// to be done only when table is dropped (not when detached). Also connection must be closed only in shutdown, but those
/// actions require an open connection. Therefore there needs to be a way inside shutdown() method to know whether it is called
/// because of drop query. And drop() method is not suitable at all, because it will not only require to reopen connection, but also
/// it can be called considerable time after table is dropped (for example, in case of Atomic database), which is not appropriate for the case.
void checkTableCanBeDropped() const override { drop_table = true; }
/// Always return virtual columns in addition to required columns
Pipe read(
const Names & column_names,
@ -84,9 +91,16 @@ private:
size_t num_consumers;
size_t num_queues;
String queue_base;
const String deadletter_exchange;
Names queue_settings_list;
/// For insert query. Mark messages as durable.
const bool persistent;
/// A table setting. It is possible not to perform any RabbitMQ setup, which is supposed to be consumer-side setup:
/// declaring exchanges, queues, bindings. Instead everything needed from RabbitMQ table is to connect to a specific queue.
/// This solution disables all optimizations and is not really optimal, but allows user to fully control all RabbitMQ setup.
bool use_user_setup;
bool hash_exchange;
Poco::Logger * log;
String address;
@ -108,12 +122,12 @@ private:
/// maximum number of messages in RabbitMQ queue (x-max-length). Also used
/// to setup size of inner buffer for received messages
uint32_t queue_size;
String sharding_exchange, bridge_exchange, consumer_exchange;
size_t consumer_id = 0; /// counter for consumer buffer, needed for channel id
std::atomic<size_t> producer_id = 1; /// counter for producer buffer, needed for channel id
std::atomic<bool> wait_confirm = true; /// needed to break waiting for confirmations for producer
std::atomic<bool> exchange_removed = false, rabbit_is_ready = false;
ChannelPtr setup_channel;
std::vector<String> queues;
std::once_flag flag; /// remove exchange only once
@ -124,6 +138,7 @@ private:
std::atomic<bool> stream_cancelled{false};
size_t read_attempts = 0;
mutable bool drop_table = false;
ConsumerBufferPtr createReadBuffer();
@ -132,7 +147,7 @@ private:
void loopingFunc();
void connectionFunc();
static Names parseRoutingKeys(String routing_key_list);
static Names parseSettings(String settings_list);
static AMQP::ExchangeType defineExchangeType(String exchange_type_);
static String getTableBasedName(String name, const StorageID & table_id);
@ -141,9 +156,11 @@ private:
void deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop);
void initRabbitMQ();
void initExchange();
void bindExchange();
void bindQueue(size_t queue_id);
void cleanupRabbitMQ() const;
void initExchange(AMQP::TcpChannel & rabbit_channel);
void bindExchange(AMQP::TcpChannel & rabbit_channel);
void bindQueue(size_t queue_id, AMQP::TcpChannel & rabbit_channel);
bool restoreConnection(bool reconnecting);
bool streamToViews();

View File

@ -26,24 +26,13 @@ rabbitmq_id = ''
# Helpers
def check_rabbitmq_is_available():
p = subprocess.Popen(('docker',
'exec',
'-i',
rabbitmq_id,
'rabbitmqctl',
'await_startup'),
stdout=subprocess.PIPE)
p = subprocess.Popen(('docker', 'exec', '-i', rabbitmq_id, 'rabbitmqctl', 'await_startup'), stdout=subprocess.PIPE)
p.communicate()
return p.returncode == 0
def enable_consistent_hash_plugin():
p = subprocess.Popen(('docker',
'exec',
'-i',
rabbitmq_id,
"rabbitmq-plugins", "enable", "rabbitmq_consistent_hash_exchange"),
stdout=subprocess.PIPE)
p = subprocess.Popen(('docker', 'exec', '-i', rabbitmq_id, "rabbitmq-plugins", "enable", "rabbitmq_consistent_hash_exchange"), stdout=subprocess.PIPE)
p.communicate()
return p.returncode == 0
@ -1834,23 +1823,12 @@ def test_rabbitmq_commit_on_block_write(rabbitmq_cluster):
cancel.set()
instance.query('''
DROP TABLE test.rabbitmq;
''')
instance.query('DETACH TABLE test.rabbitmq;')
while int(instance.query("SELECT count() FROM system.tables WHERE database='test' AND name='rabbitmq'")) == 1:
time.sleep(1)
instance.query('''
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'block',
rabbitmq_format = 'JSONEachRow',
rabbitmq_max_block_size = 100,
rabbitmq_queue_base = 'block',
rabbitmq_row_delimiter = '\\n';
''')
instance.query('ATTACH TABLE test.rabbitmq;')
while int(instance.query('SELECT uniqExact(key) FROM test.view')) < i[0]:
time.sleep(1)
@ -1994,6 +1972,141 @@ def test_rabbitmq_vhost(rabbitmq_cluster):
break
@pytest.mark.timeout(120)
def test_rabbitmq_drop_table_properly(rabbitmq_cluster):
    """Check that DROP TABLE removes the queue which the RabbitMQ table declared.

    The queue must exist while the table is alive and must be gone some time after the drop.
    """
    instance.query('''
        CREATE TABLE test.rabbitmq_drop (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_exchange_name = 'drop',
                     rabbitmq_format = 'JSONEachRow',
                     rabbitmq_queue_base = 'rabbit_queue_drop'
        ''')

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()

    channel.basic_publish(exchange='drop', routing_key='', body=json.dumps({'key': 1, 'value': 2}))
    while True:
        result = instance.query('SELECT * FROM test.rabbitmq_drop ORDER BY key', ignore_error=True)
        if result == "1\t2\n":
            break

    # A passive declare does not create anything: it only succeeds if the queue already exists.
    exists = channel.queue_declare(queue='rabbit_queue_drop', passive=True)
    assert(exists)
    connection.close()

    instance.query("DROP TABLE test.rabbitmq_drop")
    time.sleep(30)

    # Use a fresh connection for the second check instead of one that stayed idle during the sleep.
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    try:
        exists = channel.queue_declare(queue='rabbit_queue_drop', passive=True)
    except pika.exceptions.ChannelClosed:
        # The broker closes the channel when the passively declared queue does not exist.
        # Only this error means "queue is gone"; anything else must fail the test.
        exists = False
    finally:
        connection.close()

    assert(not exists)
@pytest.mark.timeout(120)
def test_rabbitmq_queue_settings(rabbitmq_cluster):
    """Check that user-defined queue settings (rabbitmq_queue_settings_list) are applied.

    The queue is limited to 10 messages with x-overflow=reject-publish, 50 messages are published
    before a consumer is attached, so exactly 10 rows are expected in the target table.
    """
    instance.query('''
        CREATE TABLE test.rabbitmq_settings (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_exchange_name = 'rabbit_exchange',
                     rabbitmq_format = 'JSONEachRow',
                     rabbitmq_queue_base = 'rabbit_queue_settings',
                     rabbitmq_queue_settings_list = 'x-max-length=10,x-overflow=reject-publish'
        ''')

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()

    for i in range(50):
        channel.basic_publish(exchange='rabbit_exchange', routing_key='', body=json.dumps({'key': 1, 'value': 2}))
    connection.close()

    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.rabbitmq_settings;
        ''')

    time.sleep(5)

    # Always poll the target table of the materialized view. Selecting from the RabbitMQ table itself
    # would read messages directly from the queue, so they would never reach test.view.
    result = instance.query('SELECT count() FROM test.view', ignore_error=True)
    while int(result) != 10:
        time.sleep(0.5)
        result = instance.query('SELECT count() FROM test.view', ignore_error=True)

    instance.query('DROP TABLE test.rabbitmq_settings')

    # queue size is 10, but 50 messages were sent, they will be dropped (setting x-overflow = reject-publish) and only 10 will remain.
    assert(int(result) == 10)
@pytest.mark.timeout(120)
def test_rabbitmq_queue_consume(rabbitmq_cluster):
    """Check rabbitmq_queue_consume = 1: the table reads from a queue declared by the user.

    The queue is declared here, producers publish to it through the default exchange, and the table
    is expected to deliver every published message to the materialized view target.
    """
    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)

    # This connection is needed only to declare the queue; producers open their own connections.
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(queue='rabbit_queue', durable=True)
    connection.close()

    i = [0]
    messages_num = 1000

    def produce():
        # Each producer thread publishes messages_num messages over its own connection.
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        for _ in range(messages_num):
            message = json.dumps({'key': i[0], 'value': i[0]})
            channel.basic_publish(exchange='', routing_key='rabbit_queue', body=message)
            i[0] += 1
        connection.close()

    threads = []
    threads_num = 10
    for _ in range(threads_num):
        threads.append(threading.Thread(target=produce))
    for thread in threads:
        time.sleep(random.uniform(0, 1))
        thread.start()

    instance.query('''
        CREATE TABLE test.rabbitmq_queue (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_format = 'JSONEachRow',
                     rabbitmq_queue_base = 'rabbit_queue',
                     rabbitmq_queue_consume = 1;

        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.rabbitmq_queue;
        ''')

    # Wait until every message published by every producer has reached the view target.
    result = ''
    while True:
        result = instance.query('SELECT count() FROM test.view')
        if int(result) == messages_num * threads_num:
            break
        time.sleep(1)

    for thread in threads:
        thread.join()

    instance.query('DROP TABLE test.rabbitmq_queue')
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")