ClickHouse/src/Storages/RabbitMQ/StorageRabbitMQ.cpp

1218 lines
45 KiB
C++
Raw Normal View History

#include <amqpcpp.h>
2020-05-20 05:30:38 +00:00
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
2020-05-20 05:30:38 +00:00
#include <Interpreters/InterpreterInsertQuery.h>
2022-05-20 19:49:31 +00:00
#include <Interpreters/InterpreterSelectQuery.h>
2020-05-20 05:30:38 +00:00
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTInsertQuery.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/Transforms/ExpressionTransform.h>
2022-05-20 19:49:31 +00:00
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <QueryPipeline/Pipe.h>
#include <Storages/MessageQueueSink.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <Storages/RabbitMQ/RabbitMQSource.h>
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
#include <Storages/RabbitMQ/RabbitMQProducer.h>
#include <Storages/ExternalDataSourceConfiguration.h>
2020-05-20 05:30:38 +00:00
#include <Storages/StorageFactory.h>
#include <Storages/StorageMaterializedView.h>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <Common/Exception.h>
#include <Common/Macros.h>
#include <Common/parseAddress.h>
#include <Common/quoteString.h>
2020-05-20 05:30:38 +00:00
#include <Common/setThreadName.h>
2022-04-27 15:05:45 +00:00
#include <Common/logger_useful.h>
2020-05-20 05:30:38 +00:00
2020-05-31 09:34:57 +00:00
namespace DB
{
2020-09-01 14:11:34 +00:00
/// Lower bound for the per-table `queue_size` (the constructor takes the maximum of this and getMaxBlockSize()).
static constexpr uint32_t QUEUE_SIZE = 100000;

/// Used by code outside this part of the file.
static constexpr int MAX_FAILED_READ_ATTEMPTS = 10;

/// Initial value of `milliseconds_to_wait`; also the delay before connectionFunc() retries a failed reconnect.
static constexpr int RESCHEDULE_MS = 500;

/// Used by code outside this part of the file.
static constexpr int BACKOFF_TRESHOLD = 32000;

/// Used by code outside this part of the file.
static constexpr int MAX_THREAD_WORK_DURATION_MS = 60000;
2020-05-20 05:30:38 +00:00
/// Error codes referenced by this translation unit; the definitions live elsewhere.
namespace ErrorCodes
{
    /// Generic codes.
    extern const int LOGICAL_ERROR;
    extern const int BAD_ARGUMENTS;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int QUERY_NOT_ALLOWED;

    /// RabbitMQ-specific codes.
    extern const int CANNOT_CONNECT_RABBITMQ;
    extern const int CANNOT_BIND_RABBITMQ_EXCHANGE;
    extern const int CANNOT_DECLARE_RABBITMQ_EXCHANGE;
    extern const int CANNOT_REMOVE_RABBITMQ_EXCHANGE;
    extern const int CANNOT_CREATE_RABBITMQ_QUEUE_BINDING;
}
/// Textual exchange type names accepted by defineExchangeType().
namespace ExchangeType
{
    /// "default" is the default chosen by this implementation, not by RabbitMQ itself.
    static const String DEFAULT{"default"};
    static const String FANOUT{"fanout"};
    static const String DIRECT{"direct"};
    static const String TOPIC{"topic"};
    static const String HASH{"consistent_hash"};
    static const String HEADERS{"headers"};
}
2020-08-31 09:12:36 +00:00
2020-05-20 05:30:38 +00:00
/// Constructs the storage:
///  1. expands macros in the table settings and stores them in members (initializer list);
///  2. validates and assembles the connection configuration;
///  3. derives the names of the sharding / bridge exchanges and the queue base;
///  4. tries to connect and to set RabbitMQ up (failures are only logged when `is_attach_` is true);
///  5. creates the looping, streaming and connection background tasks in a deactivated state.
StorageRabbitMQ::StorageRabbitMQ(
    const StorageID & table_id_,
    ContextPtr context_,
    const ColumnsDescription & columns_,
    std::unique_ptr<RabbitMQSettings> rabbitmq_settings_,
    bool is_attach_)
    : IStorage(table_id_)
    , WithContext(context_->getGlobalContext())
    , rabbitmq_settings(std::move(rabbitmq_settings_))
    , exchange_name(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_exchange_name))
    , format_name(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_format))
    , exchange_type(defineExchangeType(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_exchange_type)))
    , routing_keys(parseSettings(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_routing_key_list)))
    , schema_name(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_schema))
    , num_consumers(rabbitmq_settings->rabbitmq_num_consumers.value)
    , num_queues(rabbitmq_settings->rabbitmq_num_queues.value)
    , queue_base(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_queue_base))
    , queue_settings_list(parseSettings(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_queue_settings_list)))
    , max_rows_per_message(rabbitmq_settings->rabbitmq_max_rows_per_message)
    , persistent(rabbitmq_settings->rabbitmq_persistent.value)
    , use_user_setup(rabbitmq_settings->rabbitmq_queue_consume.value)
    , hash_exchange(num_consumers > 1 || num_queues > 1)
    , log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")"))
    , semaphore(0, static_cast<int>(num_consumers))
    , unique_strbase(getRandomName())
    , queue_size(std::max(QUEUE_SIZE, static_cast<uint32_t>(getMaxBlockSize())))
    , milliseconds_to_wait(RESCHEDULE_MS)
    , is_attach(is_attach_)
{
    const auto & server_config = getContext()->getConfigRef();

    String username;
    String password;
    std::pair<String, UInt16> parsed_address;

    if (!rabbitmq_settings->rabbitmq_host_port.changed)
    {
        /// Without host:port the full address string is the only remaining way to locate the broker.
        if (!rabbitmq_settings->rabbitmq_address.changed)
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "RabbitMQ requires either `rabbitmq_host_port` or `rabbitmq_address` setting");
    }
    else
    {
        /// Credentials from the table settings win; the server config is the fallback.
        username = rabbitmq_settings->rabbitmq_username.value;
        if (username.empty())
            username = server_config.getString("rabbitmq.username", "");

        password = rabbitmq_settings->rabbitmq_password.value;
        if (password.empty())
            password = server_config.getString("rabbitmq.password", "");

        if (username.empty() || password.empty())
            throw Exception(
                ErrorCodes::BAD_ARGUMENTS,
                "No username or password. They can be specified either in config or in storage settings");

        parsed_address = parseAddress(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_host_port), 5672);
        if (parsed_address.first.empty())
            throw Exception(
                ErrorCodes::BAD_ARGUMENTS,
                "Host or port is incorrect (host: {}, port: {})", parsed_address.first, parsed_address.second);

        context_->getRemoteHostFilter().checkHostAndPort(parsed_address.first, toString(parsed_address.second));
    }

    /// The server config value of `rabbitmq.vhost` takes precedence over the table setting.
    const String vhost = server_config.getString("rabbitmq.vhost", getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_vhost));
    const String connection_string = getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_address);

    configuration =
    {
        .host = parsed_address.first,
        .port = parsed_address.second,
        .username = username,
        .password = password,
        .vhost = vhost,
        .secure = rabbitmq_settings->rabbitmq_secure.value,
        .connection_string = connection_string
    };

    if (configuration.secure)
        SSL_library_init();

    StorageInMemoryMetadata storage_metadata;
    storage_metadata.setColumns(columns_);
    setInMemoryMetadata(storage_metadata);

    rabbitmq_context = addSettings(getContext());
    rabbitmq_context->makeQueryContext();

    if (!queue_base.empty())
    {
        /* Several tables may register consumers on the same queues (i.e. the queues are shared between tables). If a sharding
         * exchange is needed as well (multiple shared queues), those tables also have to share the sharding exchange and the
         * bridge exchange, hence both names are derived from the user-provided queue_base.
         */
        sharding_exchange = exchange_name + "_" + queue_base;
    }
    else
    {
        /* The local exchange name must be unique per table and differ from the client's exchange name. It is table-based rather
         * than random, because local exchanges have to be declared identically for identical tables.
         */
        sharding_exchange = getTableBasedName(exchange_name, table_id_);

        /* Without an explicit queue name the library would generate one, but a table-based name allows the queues to be reused
         * once the table is recreated. So queues stay the same for a table unless the queue_base setting is specified (which
         * allows registering consumers on specific queues). This is the base for the names of the queues declared later.
         */
        queue_base = getTableBasedName("", table_id_);
    }

    bridge_exchange = sharding_exchange + "_bridge";

    try
    {
        connection = std::make_unique<RabbitMQConnection>(configuration, log);

        const bool connected = connection->connect();
        if (connected)
            initRabbitMQ();
        else if (!is_attach)
            throw Exception(ErrorCodes::CANNOT_CONNECT_RABBITMQ, "Cannot connect to {}", connection->connectionInfoForLog());
    }
    catch (...)
    {
        /// On ATTACH a failed setup is only logged; on CREATE it is rethrown.
        tryLogCurrentException(log);
        if (!is_attach)
            throw;
    }

    auto & schedule_pool = getContext()->getMessageBrokerSchedulePool();

    /// One looping task for all consumers as they share the same connection == the same handler == the same event loop
    looping_task = schedule_pool.createTask("RabbitMQLoopingTask", [this]{ loopingFunc(); });
    looping_task->deactivate();

    streaming_task = schedule_pool.createTask("RabbitMQStreamingTask", [this]{ streamingToViewsFunc(); });
    streaming_task->deactivate();

    connection_task = schedule_pool.createTask("RabbitMQConnectionTask", [this]{ connectionFunc(); });
    connection_task->deactivate();
}
2020-08-31 09:12:36 +00:00
/// Splits a comma-separated list into its items and trims whitespace around each item.
/// An empty input yields an empty list.
Names StorageRabbitMQ::parseSettings(String settings_list)
{
    Names parsed;

    if (!settings_list.empty())
    {
        boost::split(parsed, settings_list, [](char symbol){ return symbol == ','; });

        for (auto & item : parsed)
            boost::trim(item);
    }

    return parsed;
}
/// Maps the textual exchange type from the table settings to the AMQP enum.
/// Throws BAD_ARGUMENTS for an unknown name.
AMQP::ExchangeType StorageRabbitMQ::defineExchangeType(String exchange_type_)
{
    /// The implementation default is a fanout exchange.
    if (exchange_type_ == ExchangeType::DEFAULT || exchange_type_ == ExchangeType::FANOUT)
        return AMQP::ExchangeType::fanout;

    if (exchange_type_ == ExchangeType::DIRECT)
        return AMQP::ExchangeType::direct;

    if (exchange_type_ == ExchangeType::TOPIC)
        return AMQP::ExchangeType::topic;

    if (exchange_type_ == ExchangeType::HASH)
        return AMQP::ExchangeType::consistent_hash;

    if (exchange_type_ == ExchangeType::HEADERS)
        return AMQP::ExchangeType::headers;

    throw Exception("Invalid exchange type", ErrorCodes::BAD_ARGUMENTS);
}
/// Builds "<database>_<table>", prefixed with "<name>_" when `name` is not empty.
String StorageRabbitMQ::getTableBasedName(String name, const StorageID & table_id)
{
    return name.empty()
        ? fmt::format("{}_{}", table_id.database_name, table_id.table_name)
        : fmt::format("{}_{}_{}", name, table_id.database_name, table_id.table_name);
}
/// Returns a copy of `local_context` with the input-format settings this engine relies on,
/// plus every table-level setting whose name does not start with "rabbitmq_".
ContextMutablePtr StorageRabbitMQ::addSettings(ContextPtr local_context) const
{
    auto context_copy = Context::createCopy(local_context);

    context_copy->setSetting("input_format_skip_unknown_fields", true);
    context_copy->setSetting("input_format_allow_errors_ratio", 0.);
    context_copy->setSetting("input_format_allow_errors_num", rabbitmq_settings->rabbitmq_skip_broken_messages.value);

    /// The same context is reused for all queries executed simultaneously, so a shared `analyze_count` is not wanted.
    context_copy->setSetting("max_analyze_depth", Field{0});

    const bool has_schema = !schema_name.empty();
    if (has_schema)
        context_copy->setSetting("format_schema", schema_name);

    for (const auto & table_setting : *rabbitmq_settings)
    {
        const auto & name = table_setting.getName();

        /// Engine-specific settings are not forwarded to the context.
        if (name.starts_with("rabbitmq_"))
            continue;

        context_copy->setSetting(name, table_setting.getValue());
    }

    return context_copy;
}
void StorageRabbitMQ::loopingFunc()
{
2021-11-06 19:02:01 +00:00
connection->getHandler().startLoop();
}
/// Switches the handler's loop state to STOP.
void StorageRabbitMQ::stopLoop()
{
    auto && handler = connection->getHandler();
    handler.updateLoopState(Loop::STOP);
}
void StorageRabbitMQ::stopLoopIfNoReaders()
{
/// Stop the loop if no select was started.
/// There can be a case that selects are finished
/// but not all sources decremented the counter, then
/// it is ok that the loop is not stopped, because
/// there is a background task (streaming_task), which
/// also checks whether there is an idle loop.
std::lock_guard lock(loop_mutex);
if (readers_count)
2021-09-12 13:55:37 +00:00
return;
2021-11-06 19:02:01 +00:00
connection->getHandler().updateLoopState(Loop::STOP);
}
2021-11-02 19:47:27 +00:00
2021-11-06 19:02:01 +00:00
/// Switches the handler's loop state to RUN and schedules `looping_task`.
/// Must only be called once the RabbitMQ setup has completed (asserted).
void StorageRabbitMQ::startLoop()
{
    assert(rabbit_is_ready);

    auto && handler = connection->getHandler();
    handler.updateLoopState(Loop::RUN);

    looping_task->activateAndSchedule();
}
/// Registers one more reader in `readers_count` (consulted by stopLoopIfNoReaders()).
void StorageRabbitMQ::incrementReader()
{
    readers_count += 1;
}
/// Unregisters one reader from `readers_count` (consulted by stopLoopIfNoReaders()).
void StorageRabbitMQ::decrementReader()
{
    readers_count -= 1;
}
2020-12-02 01:17:50 +00:00
void StorageRabbitMQ::connectionFunc()
{
2021-09-16 10:46:43 +00:00
if (rabbit_is_ready)
return;
2021-09-10 10:28:09 +00:00
if (connection->reconnect())
2020-12-02 01:17:50 +00:00
initRabbitMQ();
else
connection_task->scheduleAfter(RESCHEDULE_MS);
}
2020-08-31 10:00:28 +00:00
/* Tasks have to be deactivated this way, because otherwise a deadlock is possible: shutdown deactivates the streaming task first,
 * and the streaming task itself may try to deactivate another task.
 *
 * `stop_loop` - stop the event loop before deactivating.
 * `wait`      - if `task_mutex` is currently held by someone else, block until it is free (used only from shutdown);
 *               otherwise the deactivation is skipped in that situation.
 */
void StorageRabbitMQ::deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop)
{
    if (stop_loop)
        stopLoop();

    std::unique_lock<std::mutex> guard(task_mutex, std::try_to_lock);

    if (!guard.owns_lock())
    {
        /// Somebody else holds the mutex: either give up or wait for it.
        if (!wait)
            return;

        guard.lock();
    }

    task->deactivate();
}
2020-09-07 11:08:53 +00:00
size_t StorageRabbitMQ::getMaxBlockSize() const
{
2020-08-31 16:34:16 +00:00
return rabbitmq_settings->rabbitmq_max_block_size.changed
? rabbitmq_settings->rabbitmq_max_block_size.value
: (getContext()->getSettingsRef().max_insert_block_size.value / num_consumers);
}
2020-08-31 16:34:16 +00:00
2020-12-02 01:17:50 +00:00
void StorageRabbitMQ::initRabbitMQ()
{
2021-11-02 19:47:27 +00:00
if (shutdown_called || rabbit_is_ready)
return;
if (use_user_setup)
{
queues.emplace_back(queue_base);
rabbit_is_ready = true;
return;
}
2021-09-16 10:46:43 +00:00
try
{
auto rabbit_channel = connection->createChannel();
2020-12-02 01:17:50 +00:00
2021-09-16 10:46:43 +00:00
/// Main exchange -> Bridge exchange -> ( Sharding exchange ) -> Queues -> Consumers
2021-05-01 18:00:43 +00:00
2021-09-16 10:46:43 +00:00
initExchange(*rabbit_channel);
bindExchange(*rabbit_channel);
2020-12-02 01:17:50 +00:00
2021-09-16 10:46:43 +00:00
for (const auto i : collections::range(0, num_queues))
bindQueue(i + 1, *rabbit_channel);
2020-12-02 01:17:50 +00:00
2021-09-16 10:46:43 +00:00
LOG_TRACE(log, "RabbitMQ setup completed");
rabbit_is_ready = true;
rabbit_channel->close();
}
catch (...)
{
tryLogCurrentException(log);
if (!is_attach)
throw;
}
2020-12-02 01:17:50 +00:00
}
2021-05-13 09:39:57 +00:00
/// Declares the exchange hierarchy on the given channel:
///   1. Main exchange (table settings rabbitmq_exchange_name, rabbitmq_exchange_type), declared `durable`.
///   2. Bridge exchange (fanout), declared `durable | autodelete`. Used to easily disconnect the main exchange
///      and to simplify queue bindings.
///   3. Sharding (consistent hash) exchange, declared `durable | autodelete`. Only when `hash_exchange` is set,
///      i.e. in case of multiple queues or consumers.
///   4. Consumer exchange. Just an alias for the bridge or the sharding exchange: the one queues will be bound to.
/// `durable` means surviving a RabbitMQ server restart, `autodelete` means being deleted once the bindings are dropped.
void StorageRabbitMQ::initExchange(AMQP::TcpChannel & rabbit_channel)
{
    rabbit_channel.declareExchange(exchange_name, exchange_type, AMQP::durable)
    .onError([](const char * reason)
    {
        /// Most likely the exchange was already declared, but
        /// 1) with a different exchange type, or
        /// 2) with different exchange settings.
        throw Exception("Unable to declare exchange. Make sure specified exchange is not already declared. Error: "
            + std::string(reason), ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE);
    });

    rabbit_channel.declareExchange(bridge_exchange, AMQP::fanout, AMQP::durable | AMQP::autodelete)
    .onError([this](const char * reason)
    {
        /// Not supposed to happen, as this exchange name is always unique to its type and settings.
        throw Exception(
            ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE, "Unable to declare bridge exchange ({}). Reason: {}", bridge_exchange, std::string(reason));
    });

    if (hash_exchange)
    {
        /// By default a hash exchange hashes the routing key, which then has to be an integer.
        /// To support arbitrary exchange types (i.e. arbitrary routing key patterns) this dependency is removed
        /// by hashing on message_id instead.
        AMQP::Table sharding_arguments;
        sharding_arguments["hash-property"] = "message_id";

        /// Declare hash exchange for sharding.
        rabbit_channel.declareExchange(sharding_exchange, AMQP::consistent_hash, AMQP::durable | AMQP::autodelete, sharding_arguments)
        .onError([this](const char * reason)
        {
            /// Same possible reasons as for exchange_name above: the sharding exchange name clashes with some other
            /// exchange (which is not meant for sharding). So the probable actual reason is a bad queue_base parameter.
            throw Exception(
                ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE,
                "Unable to declare sharding exchange ({}). Reason: {}", sharding_exchange, std::string(reason));
        });

        rabbit_channel.bindExchange(bridge_exchange, sharding_exchange, routing_keys[0])
        .onError([this](const char * reason)
        {
            throw Exception(
                ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
                "Unable to bind bridge exchange ({}) to sharding exchange ({}). Reason: {}",
                bridge_exchange,
                sharding_exchange,
                std::string(reason));
        });

        consumer_exchange = sharding_exchange;
    }
    else
    {
        /// A single queue: consumers read from queues bound directly to the bridge exchange.
        consumer_exchange = bridge_exchange;
    }
}
2021-05-13 09:39:57 +00:00
/// Binds the main exchange to the bridge exchange and blocks until the binding(s) are confirmed.
///  - headers exchange: every routing key is interpreted as a `key=value` header to match;
///  - fanout / consistent_hash exchange: a single binding with the first routing key;
///  - other exchange types: one binding per routing key.
/// Throws BAD_ARGUMENTS for a header without `=`, and CANNOT_BIND_RABBITMQ_EXCHANGE when the broker reports an error.
void StorageRabbitMQ::bindExchange(AMQP::TcpChannel & rabbit_channel)
{
    /// Shared by every binding below.
    auto on_bind_error = [this](const char * message)
    {
        throw Exception(
            ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
            "Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
            exchange_name, bridge_exchange, std::string(message));
    };

    /// Ends the blocking loop started at the bottom of this function.
    auto stop_blocking_loop = [this] { connection->getHandler().stopLoop(); };

    /// Captured by reference below; this is safe because the callbacks run inside startBlockingLoop(),
    /// i.e. before this function returns.
    size_t bound_keys = 0;

    if (exchange_type == AMQP::ExchangeType::headers)
    {
        AMQP::Table bind_headers;
        for (const auto & header : routing_keys)
        {
            std::vector<String> matching;
            boost::split(matching, header, [](char c){ return c == '='; });

            /// Without a '=' there is no value part, and reading matching[1] would be out of bounds.
            if (matching.size() < 2)
                throw Exception(
                    ErrorCodes::BAD_ARGUMENTS,
                    "Invalid header `{}` for headers exchange: expected format `key=value`", header);

            bind_headers[matching[0]] = matching[1];
        }

        rabbit_channel.bindExchange(exchange_name, bridge_exchange, routing_keys[0], bind_headers)
        .onSuccess(stop_blocking_loop)
        .onError(on_bind_error);
    }
    else if (exchange_type == AMQP::ExchangeType::fanout || exchange_type == AMQP::ExchangeType::consistent_hash)
    {
        rabbit_channel.bindExchange(exchange_name, bridge_exchange, routing_keys[0])
        .onSuccess(stop_blocking_loop)
        .onError(on_bind_error);
    }
    else
    {
        for (const auto & routing_key : routing_keys)
        {
            rabbit_channel.bindExchange(exchange_name, bridge_exchange, routing_key)
            .onSuccess([this, &bound_keys]()
            {
                /// Stop waiting only after the last routing key has been bound.
                ++bound_keys;
                if (bound_keys == routing_keys.size())
                    connection->getHandler().stopLoop();
            })
            .onError(on_bind_error);
        }
    }

    connection->getHandler().startBlockingLoop();
}
2021-05-13 09:39:57 +00:00
void StorageRabbitMQ::bindQueue(size_t queue_id, AMQP::TcpChannel & rabbit_channel)
2020-10-27 07:14:38 +00:00
{
auto success_callback = [&](const std::string & queue_name, int msgcount, int /* consumercount */)
{
queues.emplace_back(queue_name);
LOG_DEBUG(log, "Queue {} is declared", queue_name);
if (msgcount)
LOG_INFO(log, "Queue {} is non-empty. Non-consumed messaged will also be delivered", queue_name);
/* Here we bind either to sharding exchange (consistent-hash) or to bridge exchange (fanout). All bindings to routing keys are
* done between client's exchange and local bridge exchange. Binding key must be a string integer in case of hash exchange, for
* fanout exchange it can be arbitrary
*/
2021-05-01 18:00:43 +00:00
rabbit_channel.bindQueue(consumer_exchange, queue_name, std::to_string(queue_id))
2021-09-10 10:28:09 +00:00
.onSuccess([&] { connection->getHandler().stopLoop(); })
2020-10-27 07:14:38 +00:00
.onError([&](const char * message)
{
throw Exception(
ErrorCodes::CANNOT_CREATE_RABBITMQ_QUEUE_BINDING,
"Failed to create queue binding for exchange {}. Reason: {}", exchange_name, std::string(message));
});
};
auto error_callback([&](const char * message)
{
/* This error is most likely a result of an attempt to declare queue with different settings if it was declared before. So for a
* given queue name either deadletter_exchange parameter changed or queue_size changed, i.e. table was declared with different
* max_block_size parameter. Solution: client should specify a different queue_base parameter or manually delete previously
* declared queues via any of the various cli tools.
*/
throw Exception("Failed to declare queue. Probably queue settings are conflicting: max_block_size, deadletter_exchange. Attempt \
specifying differently those settings or use a different queue_base or manually delete previously declared queues, \
which were declared with the same names. ERROR reason: "
+ std::string(message), ErrorCodes::BAD_ARGUMENTS);
});
AMQP::Table queue_settings;
2021-05-14 07:11:32 +00:00
std::unordered_set<String> integer_settings = {"x-max-length", "x-max-length-bytes", "x-message-ttl", "x-expires", "x-priority", "x-max-priority"};
std::unordered_set<String> string_settings = {"x-overflow", "x-dead-letter-exchange", "x-queue-type"};
2020-10-27 07:14:38 +00:00
/// Check user-defined settings.
if (!queue_settings_list.empty())
{
for (const auto & setting : queue_settings_list)
{
Strings setting_values;
splitInto<'='>(setting_values, setting);
2021-05-14 07:11:32 +00:00
if (setting_values.size() != 2)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid settings string: {}", setting);
2021-05-14 07:11:32 +00:00
String key = setting_values[0], value = setting_values[1];
2021-05-14 07:11:32 +00:00
if (integer_settings.contains(key))
2021-05-05 07:52:21 +00:00
queue_settings[key] = parse<uint64_t>(value);
else if (string_settings.find(key) != string_settings.end())
queue_settings[key] = value;
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported queue setting: {}", value);
}
}
2020-10-27 07:14:38 +00:00
/// If queue_base - a single name, then it can be used as one specific queue, from which to read.
/// Otherwise it is used as a generator (unique for current table) of queue names, because it allows to
/// maximize performance - via setting `rabbitmq_num_queues`.
2020-10-27 07:14:38 +00:00
const String queue_name = !hash_exchange ? queue_base : std::to_string(queue_id) + "_" + queue_base;
2021-05-05 07:52:21 +00:00
/// AMQP::autodelete setting is not allowed, because in case of server restart there will be no consumers
/// and deleting queues should not take place.
2021-05-04 16:26:47 +00:00
rabbit_channel.declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
2021-09-10 10:28:09 +00:00
connection->getHandler().startBlockingLoop();
2020-10-27 07:14:38 +00:00
}
2020-12-03 14:11:35 +00:00
/// Replaces `channel` with a freshly created one.
/// Returns false (after logging the exception) if the channel could not be created.
bool StorageRabbitMQ::updateChannel(ChannelPtr & channel)
{
    bool channel_created = false;

    try
    {
        channel = connection->createChannel();
        channel_created = true;
    }
    catch (...)
    {
        tryLogCurrentException(log);
    }

    return channel_created;
}
/// Refreshes the consumer's queue list (if the number of queues differs), updates its ack tracker
/// and gives it a new channel. The channel is set up only if it was created successfully.
/// A null consumer is ignored.
void StorageRabbitMQ::prepareChannelForConsumer(RabbitMQConsumerPtr consumer)
{
    if (!consumer)
        return;

    const bool queues_differ = consumer->queuesCount() != queues.size();
    if (queues_differ)
        consumer->updateQueues(queues);

    consumer->updateAckTracker();

    const bool channel_updated = updateChannel(consumer->getChannel());
    if (channel_updated)
        consumer->setupChannel();
}
void StorageRabbitMQ::unbindExchange()
{
2020-08-28 08:52:02 +00:00
/* This is needed because with RabbitMQ (without special adjustments) can't, for example, properly make mv if there was insert query
* on the same table before, and in another direction it will make redundant copies, but most likely nobody will do that.
* As publishing is done to exchange, publisher never knows to which queues the message will go, every application interested in
2020-08-15 06:50:53 +00:00
* consuming from certain exchange - declares its owns exchange-bound queues, messages go to all such exchange-bound queues, and as
* input streams are always created at startup, then they will also declare its own exchange bound queues, but they will not be visible
* externally - client declares its own exchange-bound queues, from which to consume, so this means that if not disconnecting this local
2020-08-28 08:52:02 +00:00
* queues, then messages will go both ways and in one of them they will remain not consumed. So need to disconnect local exchange
2020-08-31 16:34:16 +00:00
* bindings to remove redunadant message copies, but after that mv cannot work unless those bindings are recreated. Recreating them is
* not difficult but very ugly and as probably nobody will do such thing - bindings will not be recreated.
2020-08-15 06:50:53 +00:00
*/
2021-11-02 19:47:27 +00:00
if (!exchange_removed.exchange(true))
{
2021-11-02 19:47:27 +00:00
try
{
2021-11-02 19:47:27 +00:00
streaming_task->deactivate();
2021-11-06 19:02:01 +00:00
stopLoop();
2021-11-02 19:47:27 +00:00
looping_task->deactivate();
auto rabbit_channel = connection->createChannel();
rabbit_channel->removeExchange(bridge_exchange)
.onSuccess([&]()
{
connection->getHandler().stopLoop();
})
.onError([&](const char * message)
{
throw Exception("Unable to remove exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_REMOVE_RABBITMQ_EXCHANGE);
});
2021-11-02 19:47:27 +00:00
connection->getHandler().startBlockingLoop();
rabbit_channel->close();
}
catch (...)
{
2021-11-02 19:47:27 +00:00
exchange_removed = false;
throw;
}
2021-11-02 19:47:27 +00:00
}
}
2022-05-20 19:49:31 +00:00
void StorageRabbitMQ::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
2022-05-20 19:49:31 +00:00
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum /* processed_stage */,
size_t /* max_block_size */,
size_t /* num_streams */)
{
2020-12-04 14:50:42 +00:00
if (!rabbit_is_ready)
throw Exception("RabbitMQ setup not finished. Connection might be lost", ErrorCodes::CANNOT_CONNECT_RABBITMQ);
if (num_created_consumers == 0)
2022-05-20 19:49:31 +00:00
{
auto header = storage_snapshot->getSampleBlockForColumns(column_names);
InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info, local_context);
return;
}
2021-11-29 13:13:20 +00:00
if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Direct select is not allowed. To enable use setting `stream_like_engine_allow_direct_select`");
if (mv_attached)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageRabbitMQ with attached materialized views");
2021-11-06 19:02:01 +00:00
std::lock_guard lock(loop_mutex);
auto sample_block = storage_snapshot->getSampleBlockForColumns(column_names);
auto modified_context = addSettings(local_context);
2020-08-28 08:52:02 +00:00
2021-09-10 10:28:09 +00:00
if (!connection->isConnected())
2020-08-28 08:52:02 +00:00
{
2021-09-10 10:28:09 +00:00
if (connection->getHandler().loopRunning())
2020-08-31 16:34:16 +00:00
deactivateTask(looping_task, false, true);
2021-09-10 10:28:09 +00:00
if (!connection->reconnect())
throw Exception(ErrorCodes::CANNOT_CONNECT_RABBITMQ, "No connection to {}", connection->connectionInfoForLog());
2020-08-28 08:52:02 +00:00
}
2021-09-16 10:46:43 +00:00
initializeBuffers();
Pipes pipes;
pipes.reserve(num_created_consumers);
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto rabbit_source = std::make_shared<RabbitMQSource>(
*this, storage_snapshot, modified_context, column_names, 1, rabbitmq_settings->rabbitmq_commit_on_select);
2020-08-28 08:52:02 +00:00
auto converting_dag = ActionsDAG::makeConvertingActions(
rabbit_source->getPort().getHeader().getColumnsWithTypeAndName(),
sample_block.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto converting = std::make_shared<ExpressionActions>(std::move(converting_dag));
auto converting_transform = std::make_shared<ExpressionTransform>(rabbit_source->getPort().getHeader(), std::move(converting));
pipes.emplace_back(std::move(rabbit_source));
pipes.back().addTransform(std::move(converting_transform));
}
2021-09-10 10:28:09 +00:00
if (!connection->getHandler().loopRunning() && connection->isConnected())
2021-11-06 19:02:01 +00:00
startLoop();
LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
2022-05-20 19:49:31 +00:00
auto pipe = Pipe::unitePipes(std::move(pipes));
if (pipe.empty())
{
auto header = storage_snapshot->getSampleBlockForColumns(column_names);
InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info, local_context);
}
else
{
2022-05-31 14:43:38 +00:00
auto read_step = std::make_unique<ReadFromStorageStep>(std::move(pipe), getName(), query_info.storage_limits);
2022-05-20 19:49:31 +00:00
query_plan.addStep(std::move(read_step));
query_plan.addInterpreterContext(modified_context);
2022-05-20 19:49:31 +00:00
}
}
2021-07-23 14:25:35 +00:00
/// INSERT into the table: creates a producer with a fresh id and wraps it into a MessageQueueSink.
SinkToStoragePtr StorageRabbitMQ::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
    auto producer = std::make_unique<RabbitMQProducer>(
        configuration, routing_keys, exchange_name, exchange_type, producer_id.fetch_add(1), persistent, shutdown_called, log);

    /// Needed for backward compatibility: for Avro an explicitly set `output_format_avro_rows_in_file`
    /// overrides `rabbitmq_max_rows_per_message`.
    const auto & query_settings = local_context->getSettingsRef();
    const bool avro_rows_overridden = format_name == "Avro" && query_settings.output_format_avro_rows_in_file.changed;

    size_t max_rows = max_rows_per_message;
    if (avro_rows_overridden)
        max_rows = query_settings.output_format_avro_rows_in_file.value;

    return std::make_shared<MessageQueueSink>(
        metadata_snapshot->getSampleBlockNonMaterialized(),
        getFormatName(),
        max_rows,
        std::move(producer),
        getName(),
        local_context);
}
void StorageRabbitMQ::startup()
{
2021-09-16 10:46:43 +00:00
if (!rabbit_is_ready)
{
if (connection->isConnected())
{
try
{
initRabbitMQ();
}
catch (...)
{
if (!is_attach)
throw;
2021-12-11 21:19:06 +00:00
tryLogCurrentException(log);
2021-09-16 10:46:43 +00:00
}
}
else
{
connection_task->activateAndSchedule();
}
}
2020-10-27 07:14:38 +00:00
for (size_t i = 0; i < num_consumers; ++i)
{
try
{
auto consumer = createConsumer();
pushConsumer(std::move(consumer));
++num_created_consumers;
}
2021-12-11 21:19:06 +00:00
catch (...)
{
2021-12-11 21:19:06 +00:00
if (!is_attach)
throw;
tryLogCurrentException(log);
}
}
streaming_task->activateAndSchedule();
}
void StorageRabbitMQ::shutdown()
{
2021-11-02 19:47:27 +00:00
shutdown_called = true;
2021-05-04 16:26:47 +00:00
/// In case it has not yet been able to setup connection;
deactivateTask(connection_task, true, false);
/// The order of deactivating tasks is important: wait for streamingToViews() func to finish and
/// then wait for background event loop to finish.
2020-08-31 16:34:16 +00:00
deactivateTask(streaming_task, true, false);
deactivateTask(looping_task, true, true);
2020-08-28 08:52:02 +00:00
2021-09-09 12:08:18 +00:00
/// Just a paranoid try catch, it is not actually needed.
try
2021-05-04 16:26:47 +00:00
{
2021-09-09 12:08:18 +00:00
if (drop_table)
{
for (auto & consumer : consumers)
consumer->closeChannel();
2021-05-04 16:26:47 +00:00
2021-09-09 12:08:18 +00:00
cleanupRabbitMQ();
}
2020-08-15 06:50:53 +00:00
/// It is important to close connection here - before removing consumers, because
/// it will finish and clean callbacks, which might use those consumers data.
2021-09-10 10:28:09 +00:00
connection->disconnect();
2021-09-09 12:08:18 +00:00
for (size_t i = 0; i < num_created_consumers; ++i)
popConsumer();
2021-09-09 12:08:18 +00:00
}
catch (...)
{
tryLogCurrentException(log);
}
}
2021-05-04 16:26:47 +00:00
/// The only thing publishers are supposed to be aware of is _exchanges_ and queues are a responsibility of a consumer.
2021-05-05 07:52:21 +00:00
/// Therefore, if a table is dropped, a clean up is needed.
2021-05-04 16:26:47 +00:00
void StorageRabbitMQ::cleanupRabbitMQ() const
{
2021-05-04 19:57:45 +00:00
if (use_user_setup)
return;
2021-09-09 12:08:18 +00:00
connection->heartbeat();
2021-09-10 10:28:09 +00:00
if (!connection->isConnected())
{
String queue_names;
for (const auto & queue : queues)
{
if (!queue_names.empty())
queue_names += ", ";
queue_names += queue;
}
LOG_WARNING(log,
"RabbitMQ clean up not done, because there is no connection in table's shutdown."
"There are {} queues ({}), which might need to be deleted manually. Exchanges will be auto-deleted",
queues.size(), queue_names);
return;
}
2021-09-10 10:28:09 +00:00
auto rabbit_channel = connection->createChannel();
2021-05-04 16:26:47 +00:00
for (const auto & queue : queues)
{
/// AMQP::ifunused is needed, because it is possible to share queues between multiple tables and dropping
/// on of them should not affect others.
/// AMQP::ifempty is not used on purpose.
2021-09-10 10:28:09 +00:00
rabbit_channel->removeQueue(queue, AMQP::ifunused)
2021-05-04 16:26:47 +00:00
.onSuccess([&](uint32_t num_messages)
{
LOG_TRACE(log, "Successfully deleted queue {}, messages contained {}", queue, num_messages);
2021-09-10 10:28:09 +00:00
connection->getHandler().stopLoop();
2021-05-04 16:26:47 +00:00
})
.onError([&](const char * message)
{
LOG_ERROR(log, "Failed to delete queue {}. Error message: {}", queue, message);
2021-09-10 10:28:09 +00:00
connection->getHandler().stopLoop();
2021-05-04 16:26:47 +00:00
});
}
2021-09-10 10:28:09 +00:00
connection->getHandler().startBlockingLoop();
rabbit_channel->close();
2021-05-04 19:57:45 +00:00
/// Also there is no need to cleanup exchanges as they were created with AMQP::autodelete option. Once queues
/// are removed, exchanges will also be cleaned.
2021-05-04 16:26:47 +00:00
}
/// Returns a consumer to the pool of free consumers and signals the semaphore once,
/// so that one pending or future popConsumer() call can proceed.
void StorageRabbitMQ::pushConsumer(RabbitMQConsumerPtr consumer)
{
    std::lock_guard lock(consumers_mutex);
    /// `consumer` is a by-value sink parameter: move it instead of copying the pointer once more.
    consumers.push_back(std::move(consumer));
    semaphore.set();
}
/// Takes a free consumer from the pool, blocking until one is available.
RabbitMQConsumerPtr StorageRabbitMQ::popConsumer()
{
    /// A zero timeout is the marker the timed overload treats as "wait without a time limit".
    constexpr auto no_time_limit = std::chrono::milliseconds::zero();
    return popConsumer(no_time_limit);
}
/// Takes a free consumer from the pool.
/// A zero `timeout` means waiting without a time limit; otherwise nullptr is returned
/// if no consumer became available within `timeout`.
RabbitMQConsumerPtr StorageRabbitMQ::popConsumer(std::chrono::milliseconds timeout)
{
    const bool wait_without_limit = (timeout == std::chrono::milliseconds::zero());

    /// The semaphore is signalled once per pushed consumer, so passing it means a consumer is available.
    if (wait_without_limit)
        semaphore.wait();
    else if (!semaphore.tryWait(timeout.count()))
        return nullptr;

    /// Hand out the most recently returned consumer.
    std::lock_guard lock(consumers_mutex);
    auto free_consumer = consumers.back();
    consumers.pop_back();
    return free_consumer;
}
/// Creates a new consumer bound to the table's connection handler and queues.
/// Every call advances the consumer id counter, so each consumer is created with a distinct id.
RabbitMQConsumerPtr StorageRabbitMQ::createConsumer()
{
    const auto next_consumer_id = ++consumer_id;

    return std::make_shared<RabbitMQConsumer>(
        connection->getHandler(),
        queues,
        next_consumer_id,
        unique_strbase,
        log,
        queue_size,
        shutdown_called);
}
/// Recursively verifies that every view depending on `table_id` is attached and,
/// for materialized views, that the target table can be obtained.
/// Returns true when there are no dependent views or all of them (transitively) are ready.
bool StorageRabbitMQ::checkDependencies(const StorageID & table_id)
{
    auto & catalog = DatabaseCatalog::instance();
    const auto dependent_views = catalog.getDependentViews(table_id);

    /// With no dependent views the loop body never runs and the result is true.
    for (const auto & dependent_id : dependent_views)
    {
        auto dependent_table = catalog.tryGetTable(dependent_id, getContext());
        if (!dependent_table)
            return false;

        /// A materialized view is ready only if its target table is available.
        if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(dependent_table.get());
            materialized_view && !materialized_view->tryGetTargetTable())
            return false;

        /// The view may have dependencies of its own.
        if (!checkDependencies(dependent_id))
            return false;
    }

    return true;
}
2021-09-16 10:46:43 +00:00
void StorageRabbitMQ::initializeBuffers()
{
assert(rabbit_is_ready);
if (!initialized)
{
for (const auto & consumer : consumers)
prepareChannelForConsumer(consumer);
2021-09-16 10:46:43 +00:00
initialized = true;
}
}
2020-09-07 11:08:53 +00:00
void StorageRabbitMQ::streamingToViewsFunc()
{
2021-11-06 19:02:01 +00:00
if (rabbit_is_ready)
{
2020-12-02 01:17:50 +00:00
try
{
2020-12-02 01:17:50 +00:00
auto table_id = getStorageID();
2020-12-02 01:17:50 +00:00
// Check if at least one direct dependency is attached
size_t num_views = DatabaseCatalog::instance().getDependentViews(table_id).size();
2021-11-06 19:02:01 +00:00
bool rabbit_connected = connection->isConnected() || connection->reconnect();
if (num_views && rabbit_connected)
2020-12-02 01:17:50 +00:00
{
2021-11-06 19:02:01 +00:00
initializeBuffers();
2020-12-02 01:17:50 +00:00
auto start_time = std::chrono::steady_clock::now();
2020-10-25 07:44:04 +00:00
2021-11-29 13:13:20 +00:00
mv_attached.store(true);
2020-12-02 01:17:50 +00:00
// Keep streaming as long as there are attached views and streaming is not cancelled
2021-11-02 19:47:27 +00:00
while (!shutdown_called && num_created_consumers > 0)
2020-10-25 07:44:04 +00:00
{
2020-12-02 01:17:50 +00:00
if (!checkDependencies(table_id))
break;
LOG_DEBUG(log, "Started streaming to {} attached views", num_views);
2020-12-02 01:17:50 +00:00
if (streamToViews())
{
/// Reschedule with backoff.
if (milliseconds_to_wait < BACKOFF_TRESHOLD)
milliseconds_to_wait *= 2;
2021-11-06 19:02:01 +00:00
stopLoopIfNoReaders();
2020-12-02 01:17:50 +00:00
break;
}
else
{
milliseconds_to_wait = RESCHEDULE_MS;
}
2020-12-02 01:17:50 +00:00
auto end_time = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
{
2021-11-06 19:02:01 +00:00
stopLoopIfNoReaders();
2020-12-02 01:17:50 +00:00
LOG_TRACE(log, "Reschedule streaming. Thread work duration limit exceeded.");
break;
}
2020-10-25 07:44:04 +00:00
}
}
}
2020-12-02 01:17:50 +00:00
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
2021-11-29 13:13:20 +00:00
mv_attached.store(false);
2021-11-06 19:02:01 +00:00
/// If there is no running select, stop the loop which was
/// activated by previous select.
if (connection->getHandler().loopRunning())
stopLoopIfNoReaders();
2021-11-02 19:47:27 +00:00
if (!shutdown_called)
streaming_task->scheduleAfter(milliseconds_to_wait);
}
bool StorageRabbitMQ::streamToViews()
{
auto table_id = getStorageID();
auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (!table)
throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);
// Create an INSERT query for streaming data
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = table_id;
2020-08-28 08:52:02 +00:00
// Only insert into dependent views and expect that input blocks contain virtual columns
InterpreterInsertQuery interpreter(insert, rabbitmq_context, false, true, true);
auto block_io = interpreter.execute();
auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr(), getContext());
2021-09-24 10:23:28 +00:00
auto column_names = block_io.pipeline.getHeader().getNames();
auto sample_block = storage_snapshot->getSampleBlockForColumns(column_names);
2020-08-28 08:52:02 +00:00
2020-08-31 16:34:16 +00:00
auto block_size = getMaxBlockSize();
// Create a stream for each consumer and join them in a union stream
std::vector<std::shared_ptr<RabbitMQSource>> sources;
Pipes pipes;
sources.reserve(num_created_consumers);
pipes.reserve(num_created_consumers);
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto source = std::make_shared<RabbitMQSource>(
*this, storage_snapshot, rabbitmq_context, column_names, block_size, false);
sources.emplace_back(source);
pipes.emplace_back(source);
2022-05-20 19:49:31 +00:00
Poco::Timespan max_execution_time = rabbitmq_settings->rabbitmq_flush_interval_ms.changed
? rabbitmq_settings->rabbitmq_flush_interval_ms
: getContext()->getSettingsRef().stream_flush_interval_ms;
2020-08-28 08:52:02 +00:00
2022-05-20 19:49:31 +00:00
source->setTimeLimit(max_execution_time);
}
block_io.pipeline.complete(Pipe::unitePipes(std::move(pipes)));
2020-10-25 07:44:04 +00:00
2021-09-10 10:28:09 +00:00
if (!connection->getHandler().loopRunning())
2021-11-06 19:02:01 +00:00
startLoop();
2020-10-25 07:44:04 +00:00
2021-09-24 10:41:52 +00:00
{
CompletedPipelineExecutor executor(block_io.pipeline);
executor.execute();
2021-09-24 10:41:52 +00:00
}
2020-10-25 07:44:04 +00:00
/* Note: sending ack() with loop running in another thread will lead to a lot of data races inside the library, but only in case
* error occurs or connection is lost while ack is being sent
2020-08-28 08:52:02 +00:00
*/
2020-10-25 07:44:04 +00:00
deactivateTask(looping_task, false, true);
size_t queue_empty = 0;
2020-08-28 08:52:02 +00:00
2021-09-10 10:28:09 +00:00
if (!connection->isConnected())
2020-08-28 08:52:02 +00:00
{
2021-11-02 19:47:27 +00:00
if (shutdown_called)
2020-10-25 07:44:04 +00:00
return true;
2021-09-10 10:28:09 +00:00
if (connection->reconnect())
2020-08-28 08:52:02 +00:00
{
2021-09-16 10:46:43 +00:00
LOG_DEBUG(log, "Connection restored, updating channels");
for (auto & source : sources)
source->updateChannel();
2020-08-28 08:52:02 +00:00
}
else
{
2020-10-25 07:44:04 +00:00
LOG_TRACE(log, "Reschedule streaming. Unable to restore connection.");
return true;
2020-08-28 08:52:02 +00:00
}
}
else
{
/// Commit
for (auto & source : sources)
2020-08-28 08:52:02 +00:00
{
if (source->queueEmpty())
2020-10-25 07:44:04 +00:00
++queue_empty;
if (source->needChannelUpdate())
2020-12-05 21:55:00 +00:00
{
auto consumer = source->getBuffer();
prepareChannelForConsumer(consumer);
2020-12-05 21:55:00 +00:00
}
2020-08-31 16:34:16 +00:00
/* false is returned by the sendAck function in only two cases:
* 1) if connection failed. In this case all channels will be closed and will be unable to send ack. Also ack is made based on
* delivery tags, which are unique to channels, so if channels fail, those delivery tags will become invalid and there is
* no way to send specific ack from a different channel. Actually once the server realises that it has messages in a queue
* waiting for confirm from a channel which suddenly closed, it will immediately make those messages accessible to other
* consumers. So in this case duplicates are inevitable.
* 2) size of the sent frame (libraries's internal request interface) exceeds max frame - internal library error. This is more
* common for message frames, but not likely to happen to ack frame I suppose. So I do not believe it is likely to happen.
* Also in this case if channel didn't get closed - it is ok if failed to send ack, because the next attempt to send ack on
* the same channel will also commit all previously not-committed messages. Anyway I do not think that for ack frame this
* will ever happen.
*/
if (!source->sendAck())
2020-08-28 08:52:02 +00:00
{
2020-09-03 06:14:13 +00:00
/// Iterate loop to activate error callbacks if they happened
2021-09-10 10:28:09 +00:00
connection->getHandler().iterateLoop();
if (!connection->isConnected())
2020-08-28 08:52:02 +00:00
break;
}
2020-10-25 07:44:04 +00:00
2021-09-10 10:28:09 +00:00
connection->getHandler().iterateLoop();
2020-08-28 08:52:02 +00:00
}
}
2020-10-27 07:14:38 +00:00
if ((queue_empty == num_created_consumers) && (++read_attempts == MAX_FAILED_READ_ATTEMPTS))
2020-10-25 07:44:04 +00:00
{
connection->heartbeat();
read_attempts = 0;
LOG_TRACE(log, "Reschedule streaming. Queues are empty.");
return true;
}
else
{
2021-11-06 19:02:01 +00:00
startLoop();
2020-10-25 07:44:04 +00:00
}
/// Do not reschedule, do not stop event loop.
2020-10-25 07:44:04 +00:00
return false;
2020-05-20 05:30:38 +00:00
}
/// Registers the "RabbitMQ" table engine in the storage factory.
/// The creator collects engine settings from a named collection and/or the SETTINGS clause and requires
/// a format plus either `rabbitmq_host_port` or `rabbitmq_address` to be specified.
void registerStorageRabbitMQ(StorageFactory & factory)
{
    auto create_storage = [](const StorageFactory::Arguments & args)
    {
        auto engine_settings = std::make_unique<RabbitMQSettings>();

        const bool from_named_collection
            = getExternalDataSourceConfiguration(args.engine_args, *engine_settings, args.getLocalContext());
        const bool has_query_settings = args.storage_def->settings != nullptr;

        if (!(from_named_collection || has_query_settings))
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "RabbitMQ engine must have settings");

        /// Settings from the query are loaded after the named collection has been applied.
        if (has_query_settings)
            engine_settings->loadFromQuery(*args.storage_def);

        const bool address_specified
            = engine_settings->rabbitmq_host_port.changed || engine_settings->rabbitmq_address.changed;
        if (!address_specified)
            throw Exception("You must specify either `rabbitmq_host_port` or `rabbitmq_address` settings",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        if (!engine_settings->rabbitmq_format.changed)
            throw Exception("You must specify `rabbitmq_format` setting", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        return std::make_shared<StorageRabbitMQ>(
            args.table_id, args.getContext(), args.columns, std::move(engine_settings), args.attach);
    };

    factory.registerStorage("RabbitMQ", create_storage, StorageFactory::StorageFeatures{ .supports_settings = true, });
}
/// Lists the virtual columns exposed by the engine together with their data types.
NamesAndTypesList StorageRabbitMQ::getVirtuals() const
{
    NamesAndTypesList virtual_columns{
        /// String columns
        {"_exchange_name", std::make_shared<DataTypeString>()},
        {"_channel_id", std::make_shared<DataTypeString>()},
        /// Numeric columns
        {"_delivery_tag", std::make_shared<DataTypeUInt64>()},
        {"_redelivered", std::make_shared<DataTypeUInt8>()},
        /// Message properties
        {"_message_id", std::make_shared<DataTypeString>()},
        {"_timestamp", std::make_shared<DataTypeUInt64>()}
    };

    return virtual_columns;
}
}