2020-05-20 09:40:49 +00:00
|
|
|
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
|
2020-05-20 05:30:38 +00:00
|
|
|
#include <DataStreams/IBlockInputStream.h>
|
2020-07-02 14:35:10 +00:00
|
|
|
#include <DataStreams/ConvertingBlockInputStream.h>
|
2020-05-20 05:30:38 +00:00
|
|
|
#include <DataStreams/UnionBlockInputStream.h>
|
|
|
|
#include <DataStreams/copyData.h>
|
|
|
|
#include <DataTypes/DataTypeDateTime.h>
|
|
|
|
#include <DataTypes/DataTypeNullable.h>
|
|
|
|
#include <DataTypes/DataTypesNumber.h>
|
|
|
|
#include <DataTypes/DataTypeString.h>
|
|
|
|
#include <Interpreters/InterpreterInsertQuery.h>
|
|
|
|
#include <Interpreters/evaluateConstantExpression.h>
|
2020-12-10 22:05:02 +00:00
|
|
|
#include <Interpreters/Context.h>
|
2020-05-20 05:30:38 +00:00
|
|
|
#include <Parsers/ASTCreateQuery.h>
|
|
|
|
#include <Parsers/ASTExpressionList.h>
|
|
|
|
#include <Parsers/ASTInsertQuery.h>
|
|
|
|
#include <Parsers/ASTLiteral.h>
|
2020-05-20 09:40:49 +00:00
|
|
|
#include <Storages/RabbitMQ/RabbitMQBlockInputStream.h>
|
2020-06-01 15:37:23 +00:00
|
|
|
#include <Storages/RabbitMQ/RabbitMQBlockOutputStream.h>
|
|
|
|
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
|
2020-05-20 09:40:49 +00:00
|
|
|
#include <Storages/RabbitMQ/RabbitMQHandler.h>
|
2020-05-20 05:30:38 +00:00
|
|
|
#include <Storages/StorageFactory.h>
|
|
|
|
#include <Storages/StorageMaterializedView.h>
|
|
|
|
#include <boost/algorithm/string/replace.hpp>
|
|
|
|
#include <boost/algorithm/string/split.hpp>
|
|
|
|
#include <boost/algorithm/string/trim.hpp>
|
|
|
|
#include <Common/Exception.h>
|
|
|
|
#include <Common/Macros.h>
|
|
|
|
#include <Common/config_version.h>
|
|
|
|
#include <Common/setThreadName.h>
|
|
|
|
#include <Common/typeid_cast.h>
|
|
|
|
#include <common/logger_useful.h>
|
|
|
|
#include <Common/quoteString.h>
|
|
|
|
#include <Common/parseAddress.h>
|
|
|
|
#include <Processors/Sources/SourceFromInputStream.h>
|
|
|
|
#include <amqpcpp.h>
|
|
|
|
|
2020-05-31 09:34:57 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
2020-05-20 09:40:49 +00:00
|
|
|
|
2020-06-27 17:26:00 +00:00
|
|
|
static const auto CONNECT_SLEEP = 200;
|
2020-08-01 12:52:00 +00:00
|
|
|
static const auto RETRIES_MAX = 20;
|
2020-09-01 14:11:34 +00:00
|
|
|
static const uint32_t QUEUE_SIZE = 100000;
|
2020-10-25 07:44:04 +00:00
|
|
|
static const auto MAX_FAILED_READ_ATTEMPTS = 10;
|
2020-10-27 07:20:19 +00:00
|
|
|
static const auto RESCHEDULE_MS = 500;
|
2020-10-25 07:44:04 +00:00
|
|
|
static const auto MAX_THREAD_WORK_DURATION_MS = 60000;
|
2020-06-24 21:14:49 +00:00
|
|
|
|
2020-05-20 05:30:38 +00:00
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
|
|
|
extern const int LOGICAL_ERROR;
|
|
|
|
extern const int BAD_ARGUMENTS;
|
2020-08-31 09:12:36 +00:00
|
|
|
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
2020-12-04 14:50:42 +00:00
|
|
|
extern const int CANNOT_CONNECT_RABBITMQ;
|
2020-09-07 11:08:53 +00:00
|
|
|
extern const int CANNOT_BIND_RABBITMQ_EXCHANGE;
|
|
|
|
extern const int CANNOT_DECLARE_RABBITMQ_EXCHANGE;
|
2020-09-07 10:21:29 +00:00
|
|
|
extern const int CANNOT_REMOVE_RABBITMQ_EXCHANGE;
|
2020-10-27 07:14:38 +00:00
|
|
|
extern const int CANNOT_CREATE_RABBITMQ_QUEUE_BINDING;
|
2020-05-20 05:30:38 +00:00
|
|
|
}
|
|
|
|
|
2020-07-20 06:21:18 +00:00
|
|
|
namespace ExchangeType
|
|
|
|
{
|
|
|
|
/// Note that default here means default by implementation and not by rabbitmq settings
|
|
|
|
static const String DEFAULT = "default";
|
|
|
|
static const String FANOUT = "fanout";
|
|
|
|
static const String DIRECT = "direct";
|
|
|
|
static const String TOPIC = "topic";
|
|
|
|
static const String HASH = "consistent_hash";
|
|
|
|
static const String HEADERS = "headers";
|
|
|
|
}
|
2020-05-20 09:40:49 +00:00
|
|
|
|
2020-08-31 09:12:36 +00:00
|
|
|
|
2020-05-20 05:30:38 +00:00
|
|
|
StorageRabbitMQ::StorageRabbitMQ(
|
|
|
|
const StorageID & table_id_,
|
2021-04-10 23:33:54 +00:00
|
|
|
ContextPtr context_,
|
2020-05-20 05:30:38 +00:00
|
|
|
const ColumnsDescription & columns_,
|
2020-08-31 09:12:36 +00:00
|
|
|
std::unique_ptr<RabbitMQSettings> rabbitmq_settings_)
|
2020-05-20 05:30:38 +00:00
|
|
|
: IStorage(table_id_)
|
2021-04-10 23:33:54 +00:00
|
|
|
, WithContext(context_->getGlobalContext())
|
2020-08-31 09:12:36 +00:00
|
|
|
, rabbitmq_settings(std::move(rabbitmq_settings_))
|
2021-04-10 23:33:54 +00:00
|
|
|
, exchange_name(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_exchange_name.value))
|
|
|
|
, format_name(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_format.value))
|
|
|
|
, exchange_type(defineExchangeType(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_exchange_type.value)))
|
|
|
|
, routing_keys(parseRoutingKeys(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_routing_key_list.value)))
|
2020-08-31 09:12:36 +00:00
|
|
|
, row_delimiter(rabbitmq_settings->rabbitmq_row_delimiter.value)
|
2021-04-10 23:33:54 +00:00
|
|
|
, schema_name(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_schema.value))
|
2020-08-31 09:12:36 +00:00
|
|
|
, num_consumers(rabbitmq_settings->rabbitmq_num_consumers.value)
|
|
|
|
, num_queues(rabbitmq_settings->rabbitmq_num_queues.value)
|
2021-04-10 23:33:54 +00:00
|
|
|
, queue_base(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_queue_base.value))
|
|
|
|
, deadletter_exchange(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_deadletter_exchange.value))
|
2020-08-31 09:12:36 +00:00
|
|
|
, persistent(rabbitmq_settings->rabbitmq_persistent.value)
|
|
|
|
, hash_exchange(num_consumers > 1 || num_queues > 1)
|
2020-06-01 20:48:24 +00:00
|
|
|
, log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")"))
|
2021-04-10 23:33:54 +00:00
|
|
|
, address(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_host_port.value))
|
2020-09-03 06:14:13 +00:00
|
|
|
, parsed_address(parseAddress(address, 5672))
|
2020-06-05 13:42:11 +00:00
|
|
|
, login_password(std::make_pair(
|
2021-04-10 23:33:54 +00:00
|
|
|
getContext()->getConfigRef().getString("rabbitmq.username"),
|
|
|
|
getContext()->getConfigRef().getString("rabbitmq.password")))
|
2021-04-21 15:51:05 +00:00
|
|
|
, vhost(getContext()->getConfigRef().getString("rabbitmq.vhost", rabbitmq_settings->rabbitmq_vhost.value))
|
2020-08-31 09:12:36 +00:00
|
|
|
, semaphore(0, num_consumers)
|
|
|
|
, unique_strbase(getRandomName())
|
2020-09-01 14:11:34 +00:00
|
|
|
, queue_size(std::max(QUEUE_SIZE, static_cast<uint32_t>(getMaxBlockSize())))
|
2020-05-20 05:30:38 +00:00
|
|
|
{
|
2021-02-25 14:02:34 +00:00
|
|
|
event_handler = std::make_shared<RabbitMQHandler>(loop.getLoop(), log);
|
2020-12-02 01:17:50 +00:00
|
|
|
restoreConnection(false);
|
2020-05-20 09:40:49 +00:00
|
|
|
|
2020-06-24 17:32:57 +00:00
|
|
|
StorageInMemoryMetadata storage_metadata;
|
|
|
|
storage_metadata.setColumns(columns_);
|
|
|
|
setInMemoryMetadata(storage_metadata);
|
2020-05-29 16:04:44 +00:00
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
rabbitmq_context = addSettings(getContext());
|
2020-11-24 16:24:36 +00:00
|
|
|
rabbitmq_context->makeQueryContext();
|
2020-08-31 09:12:36 +00:00
|
|
|
|
2020-08-26 08:54:29 +00:00
|
|
|
/// One looping task for all consumers as they share the same connection == the same handler == the same event loop
|
|
|
|
event_handler->updateLoopState(Loop::STOP);
|
2021-04-10 23:33:54 +00:00
|
|
|
looping_task = getContext()->getMessageBrokerSchedulePool().createTask("RabbitMQLoopingTask", [this]{ loopingFunc(); });
|
2020-08-26 08:54:29 +00:00
|
|
|
looping_task->deactivate();
|
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
streaming_task = getContext()->getMessageBrokerSchedulePool().createTask("RabbitMQStreamingTask", [this]{ streamingToViewsFunc(); });
|
2020-06-24 21:14:49 +00:00
|
|
|
streaming_task->deactivate();
|
2020-08-26 08:54:29 +00:00
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
connection_task = getContext()->getMessageBrokerSchedulePool().createTask("RabbitMQConnectionTask", [this]{ connectionFunc(); });
|
2020-12-02 01:17:50 +00:00
|
|
|
connection_task->deactivate();
|
|
|
|
|
2020-08-06 13:33:46 +00:00
|
|
|
if (queue_base.empty())
|
|
|
|
{
|
2020-08-15 06:50:53 +00:00
|
|
|
/* Make sure that local exchange name is unique for each table and is not the same as client's exchange name. It also needs to
|
2020-08-31 09:12:36 +00:00
|
|
|
* be table-based and not just a random string, because local exchanges should be declared the same for same tables
|
2020-08-15 06:50:53 +00:00
|
|
|
*/
|
2020-08-31 09:12:36 +00:00
|
|
|
sharding_exchange = getTableBasedName(exchange_name, table_id_);
|
2020-08-06 13:33:46 +00:00
|
|
|
|
|
|
|
/* By default without a specified queue name in queue's declaration - its name will be generated by the library, but its better
|
|
|
|
* to specify it unique for each table to reuse them once the table is recreated. So it means that queues remain the same for every
|
|
|
|
* table unless queue_base table setting is specified (which allows to register consumers to specific queues). Now this is a base
|
2020-08-28 08:52:02 +00:00
|
|
|
* for the names of later declared queues
|
2020-08-06 13:33:46 +00:00
|
|
|
*/
|
2020-08-31 09:12:36 +00:00
|
|
|
queue_base = getTableBasedName("", table_id_);
|
2020-08-06 13:33:46 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
/* In case different tables are used to register multiple consumers to the same queues (so queues are shared between tables) and
|
2020-08-26 08:54:29 +00:00
|
|
|
* at the same time sharding exchange is needed (if there are multiple shared queues), then those tables also need to share
|
2020-08-28 08:52:02 +00:00
|
|
|
* sharding exchange and bridge exchange
|
2020-08-06 13:33:46 +00:00
|
|
|
*/
|
2020-08-08 16:45:52 +00:00
|
|
|
sharding_exchange = exchange_name + "_" + queue_base;
|
2020-08-06 13:33:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
bridge_exchange = sharding_exchange + "_bridge";
|
2020-08-31 09:12:36 +00:00
|
|
|
}
|
2020-06-27 17:26:00 +00:00
|
|
|
|
2020-08-31 09:12:36 +00:00
|
|
|
|
|
|
|
Names StorageRabbitMQ::parseRoutingKeys(String routing_key_list)
|
|
|
|
{
|
|
|
|
Names result;
|
|
|
|
boost::split(result, routing_key_list, [](char c){ return c == ','; });
|
|
|
|
for (String & key : result)
|
|
|
|
boost::trim(key);
|
|
|
|
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
AMQP::ExchangeType StorageRabbitMQ::defineExchangeType(String exchange_type_)
|
|
|
|
{
|
|
|
|
AMQP::ExchangeType type;
|
|
|
|
if (exchange_type_ != ExchangeType::DEFAULT)
|
|
|
|
{
|
|
|
|
if (exchange_type_ == ExchangeType::FANOUT) type = AMQP::ExchangeType::fanout;
|
|
|
|
else if (exchange_type_ == ExchangeType::DIRECT) type = AMQP::ExchangeType::direct;
|
|
|
|
else if (exchange_type_ == ExchangeType::TOPIC) type = AMQP::ExchangeType::topic;
|
|
|
|
else if (exchange_type_ == ExchangeType::HASH) type = AMQP::ExchangeType::consistent_hash;
|
|
|
|
else if (exchange_type_ == ExchangeType::HEADERS) type = AMQP::ExchangeType::headers;
|
|
|
|
else throw Exception("Invalid exchange type", ErrorCodes::BAD_ARGUMENTS);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
type = AMQP::ExchangeType::fanout;
|
|
|
|
}
|
|
|
|
|
|
|
|
return type;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
String StorageRabbitMQ::getTableBasedName(String name, const StorageID & table_id)
|
|
|
|
{
|
|
|
|
if (name.empty())
|
2020-11-10 18:22:26 +00:00
|
|
|
return fmt::format("{}_{}", table_id.database_name, table_id.table_name);
|
2020-08-31 09:12:36 +00:00
|
|
|
else
|
2020-11-10 18:22:26 +00:00
|
|
|
return fmt::format("{}_{}_{}", name, table_id.database_name, table_id.table_name);
|
2020-05-20 09:40:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
std::shared_ptr<Context> StorageRabbitMQ::addSettings(ContextPtr local_context) const
|
2020-08-31 16:34:16 +00:00
|
|
|
{
|
2021-04-10 23:33:54 +00:00
|
|
|
auto modified_context = Context::createCopy(local_context);
|
2020-11-24 16:24:36 +00:00
|
|
|
modified_context->setSetting("input_format_skip_unknown_fields", true);
|
|
|
|
modified_context->setSetting("input_format_allow_errors_ratio", 0.);
|
|
|
|
modified_context->setSetting("input_format_allow_errors_num", rabbitmq_settings->rabbitmq_skip_broken_messages.value);
|
2020-08-31 16:34:16 +00:00
|
|
|
|
|
|
|
if (!schema_name.empty())
|
2020-11-24 16:24:36 +00:00
|
|
|
modified_context->setSetting("format_schema", schema_name);
|
2020-08-31 16:34:16 +00:00
|
|
|
|
2021-02-15 21:56:51 +00:00
|
|
|
for (const auto & setting : *rabbitmq_settings)
|
|
|
|
{
|
|
|
|
const auto & setting_name = setting.getName();
|
|
|
|
|
|
|
|
/// check for non-rabbitmq-related settings
|
|
|
|
if (!setting_name.starts_with("rabbitmq_"))
|
|
|
|
modified_context->setSetting(setting_name, setting.getValue());
|
|
|
|
}
|
|
|
|
|
2020-11-24 16:24:36 +00:00
|
|
|
return modified_context;
|
2020-08-31 16:34:16 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-06-27 17:26:00 +00:00
|
|
|
void StorageRabbitMQ::loopingFunc()
|
|
|
|
{
|
2020-08-01 12:52:00 +00:00
|
|
|
if (event_handler->connectionRunning())
|
|
|
|
event_handler->startLoop();
|
2020-06-27 17:26:00 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-12-02 01:17:50 +00:00
|
|
|
void StorageRabbitMQ::connectionFunc()
|
|
|
|
{
|
|
|
|
if (restoreConnection(true))
|
|
|
|
initRabbitMQ();
|
|
|
|
else
|
|
|
|
connection_task->scheduleAfter(RESCHEDULE_MS);
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-08-31 10:00:28 +00:00
|
|
|
/* Need to deactivate this way because otherwise might get a deadlock when first deactivate streaming task in shutdown and then
|
|
|
|
* inside streaming task try to deactivate any other task
|
|
|
|
*/
|
|
|
|
void StorageRabbitMQ::deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop)
|
|
|
|
{
|
2020-08-31 16:34:16 +00:00
|
|
|
if (stop_loop)
|
|
|
|
event_handler->updateLoopState(Loop::STOP);
|
|
|
|
|
2020-09-07 11:08:53 +00:00
|
|
|
std::unique_lock<std::mutex> lock(task_mutex, std::defer_lock);
|
|
|
|
if (lock.try_lock())
|
2020-08-31 10:00:28 +00:00
|
|
|
{
|
|
|
|
task->deactivate();
|
2020-09-07 11:08:53 +00:00
|
|
|
lock.unlock();
|
2020-08-31 10:00:28 +00:00
|
|
|
}
|
2020-09-03 06:14:13 +00:00
|
|
|
else if (wait) /// Wait only if deactivating from shutdown
|
2020-08-31 10:00:28 +00:00
|
|
|
{
|
2020-09-07 11:08:53 +00:00
|
|
|
lock.lock();
|
2020-08-31 10:00:28 +00:00
|
|
|
task->deactivate();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-09-07 11:08:53 +00:00
|
|
|
size_t StorageRabbitMQ::getMaxBlockSize() const
|
2020-08-31 16:34:16 +00:00
|
|
|
{
|
|
|
|
return rabbitmq_settings->rabbitmq_max_block_size.changed
|
|
|
|
? rabbitmq_settings->rabbitmq_max_block_size.value
|
2021-04-10 23:33:54 +00:00
|
|
|
: (getContext()->getSettingsRef().max_insert_block_size.value / num_consumers);
|
2020-08-31 16:34:16 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-12-02 01:17:50 +00:00
|
|
|
void StorageRabbitMQ::initRabbitMQ()
|
|
|
|
{
|
2021-05-01 18:00:43 +00:00
|
|
|
RabbitMQChannel rabbit_channel(connection.get());
|
2020-12-02 01:17:50 +00:00
|
|
|
|
2021-05-01 18:00:43 +00:00
|
|
|
/// Main exchange -> Bridge exchange -> ( Sharding exchange ) -> Queues -> Consumers
|
|
|
|
|
|
|
|
initExchange(rabbit_channel);
|
|
|
|
bindExchange(rabbit_channel);
|
2020-12-02 01:17:50 +00:00
|
|
|
|
|
|
|
for (const auto i : ext::range(0, num_queues))
|
2021-05-01 18:00:43 +00:00
|
|
|
bindQueue(i + 1, rabbit_channel);
|
2020-12-02 01:17:50 +00:00
|
|
|
|
|
|
|
LOG_TRACE(log, "RabbitMQ setup completed");
|
|
|
|
rabbit_is_ready = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2021-05-01 18:00:43 +00:00
|
|
|
void StorageRabbitMQ::initExchange(RabbitMQChannel & rabbit_channel)
|
2020-07-21 15:47:39 +00:00
|
|
|
{
|
2021-05-01 18:00:43 +00:00
|
|
|
/// Exchange hierarchy:
|
|
|
|
/// 1. Main exchange (defined with table settings - rabbitmq_exchange_name, rabbitmq_exchange_type).
|
|
|
|
/// 2. Bridge exchange (fanout). Used to easily disconnect main exchange and to simplify queue bindings.
|
|
|
|
/// 3. Sharding (or hash) exchange. Used in case of multiple queues.
|
|
|
|
/// 4. Consumer exchange. Just an alias for bridge_exchange or sharding exchange to know to what exchange
|
|
|
|
/// queues will be bound.
|
|
|
|
|
|
|
|
/// All exchanges are declared with options:
|
|
|
|
/// 1. `durable` (survive RabbitMQ server restart)
|
|
|
|
/// 2. `autodelete` (auto delete in case of queue bindings are dropped).
|
|
|
|
|
|
|
|
rabbit_channel.declareExchange(exchange_name, exchange_type, AMQP::durable)
|
2020-07-21 15:47:39 +00:00
|
|
|
.onError([&](const char * message)
|
|
|
|
{
|
2021-05-01 18:00:43 +00:00
|
|
|
/// This error can be a result of attempt to declare exchange if it was already declared but
|
|
|
|
/// 1) with different exchange type.
|
|
|
|
/// 2) with different exchange settings.
|
2020-09-07 11:08:53 +00:00
|
|
|
throw Exception("Unable to declare exchange. Make sure specified exchange is not already declared. Error: "
|
|
|
|
+ std::string(message), ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE);
|
2020-07-21 15:47:39 +00:00
|
|
|
});
|
|
|
|
|
2021-05-01 18:00:43 +00:00
|
|
|
rabbit_channel.declareExchange(bridge_exchange, AMQP::fanout, AMQP::durable | AMQP::autodelete)
|
2020-07-21 15:47:39 +00:00
|
|
|
.onError([&](const char * message)
|
|
|
|
{
|
2021-05-01 18:00:43 +00:00
|
|
|
/// This error is not supposed to happen as this exchange name is always unique to type and its settings.
|
2020-09-07 11:08:53 +00:00
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE, "Unable to declare bridge exchange ({}). Reason: {}", bridge_exchange, std::string(message));
|
2020-07-21 15:47:39 +00:00
|
|
|
});
|
|
|
|
|
|
|
|
if (!hash_exchange)
|
|
|
|
{
|
|
|
|
consumer_exchange = bridge_exchange;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
AMQP::Table binding_arguments;
|
2021-05-01 18:00:43 +00:00
|
|
|
|
|
|
|
/// Default routing key property in case of hash exchange is a routing key, which is required to be an integer.
|
|
|
|
/// Support for arbitrary exchange type (i.e. arbitary pattern of routing keys) requires to eliminate this dependency.
|
|
|
|
/// This settings changes hash propery to message_id.
|
2020-07-21 15:47:39 +00:00
|
|
|
binding_arguments["hash-property"] = "message_id";
|
|
|
|
|
2021-05-01 18:00:43 +00:00
|
|
|
/// Declare hash exchange for sharding.
|
|
|
|
rabbit_channel.declareExchange(sharding_exchange, AMQP::consistent_hash, AMQP::durable | AMQP::autodelete, binding_arguments)
|
2020-07-21 15:47:39 +00:00
|
|
|
.onError([&](const char * message)
|
|
|
|
{
|
2021-05-01 18:00:43 +00:00
|
|
|
/// This error can be a result of same reasons as above for exchange_name, i.e. it will mean that sharding exchange name appeared
|
|
|
|
/// to be the same as some other exchange (which purpose is not for sharding). So probably actual error reason: queue_base parameter
|
|
|
|
/// is bad.
|
2020-09-07 11:08:53 +00:00
|
|
|
throw Exception(
|
2020-12-02 01:17:50 +00:00
|
|
|
ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE,
|
|
|
|
"Unable to declare sharding exchange ({}). Reason: {}", sharding_exchange, std::string(message));
|
2020-07-21 15:47:39 +00:00
|
|
|
});
|
|
|
|
|
2021-05-01 18:00:43 +00:00
|
|
|
rabbit_channel.bindExchange(bridge_exchange, sharding_exchange, routing_keys[0])
|
2020-07-21 15:47:39 +00:00
|
|
|
.onError([&](const char * message)
|
|
|
|
{
|
2020-09-07 11:08:53 +00:00
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
|
|
|
|
"Unable to bind bridge exchange ({}) to sharding exchange ({}). Reason: {}",
|
|
|
|
bridge_exchange,
|
|
|
|
sharding_exchange,
|
|
|
|
std::string(message));
|
2020-07-21 15:47:39 +00:00
|
|
|
});
|
|
|
|
|
2020-08-06 13:33:46 +00:00
|
|
|
consumer_exchange = sharding_exchange;
|
2020-07-21 15:47:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2021-05-01 18:00:43 +00:00
|
|
|
void StorageRabbitMQ::bindExchange(RabbitMQChannel & rabbit_channel)
|
2020-07-21 15:47:39 +00:00
|
|
|
{
|
|
|
|
std::atomic<bool> binding_created = false;
|
2020-07-24 12:33:07 +00:00
|
|
|
size_t bound_keys = 0;
|
2020-07-21 15:47:39 +00:00
|
|
|
|
|
|
|
if (exchange_type == AMQP::ExchangeType::headers)
|
|
|
|
{
|
2020-08-01 12:52:00 +00:00
|
|
|
AMQP::Table bind_headers;
|
|
|
|
for (const auto & header : routing_keys)
|
|
|
|
{
|
|
|
|
std::vector<String> matching;
|
|
|
|
boost::split(matching, header, [](char c){ return c == '='; });
|
|
|
|
bind_headers[matching[0]] = matching[1];
|
|
|
|
}
|
|
|
|
|
2021-05-01 18:00:43 +00:00
|
|
|
rabbit_channel.bindExchange(exchange_name, bridge_exchange, routing_keys[0], bind_headers)
|
2020-09-07 11:08:53 +00:00
|
|
|
.onSuccess([&]() { binding_created = true; })
|
2020-07-21 15:47:39 +00:00
|
|
|
.onError([&](const char * message)
|
|
|
|
{
|
2020-09-07 11:08:53 +00:00
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
|
|
|
|
"Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
|
2020-12-02 01:17:50 +00:00
|
|
|
exchange_name, bridge_exchange, std::string(message));
|
2020-07-21 15:47:39 +00:00
|
|
|
});
|
|
|
|
}
|
|
|
|
else if (exchange_type == AMQP::ExchangeType::fanout || exchange_type == AMQP::ExchangeType::consistent_hash)
|
|
|
|
{
|
2021-05-01 18:00:43 +00:00
|
|
|
rabbit_channel.bindExchange(exchange_name, bridge_exchange, routing_keys[0])
|
2020-09-07 11:08:53 +00:00
|
|
|
.onSuccess([&]() { binding_created = true; })
|
2020-07-21 15:47:39 +00:00
|
|
|
.onError([&](const char * message)
|
|
|
|
{
|
2020-09-07 11:08:53 +00:00
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
|
|
|
|
"Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
|
2020-12-02 01:17:50 +00:00
|
|
|
exchange_name, bridge_exchange, std::string(message));
|
2020-07-21 15:47:39 +00:00
|
|
|
});
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
for (const auto & routing_key : routing_keys)
|
|
|
|
{
|
2021-05-01 18:00:43 +00:00
|
|
|
rabbit_channel.bindExchange(exchange_name, bridge_exchange, routing_key)
|
2020-07-21 15:47:39 +00:00
|
|
|
.onSuccess([&]()
|
|
|
|
{
|
2020-07-24 12:33:07 +00:00
|
|
|
++bound_keys;
|
|
|
|
if (bound_keys == routing_keys.size())
|
|
|
|
binding_created = true;
|
2020-07-21 15:47:39 +00:00
|
|
|
})
|
|
|
|
.onError([&](const char * message)
|
|
|
|
{
|
2020-09-07 11:08:53 +00:00
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
|
|
|
|
"Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
|
2020-12-02 01:17:50 +00:00
|
|
|
exchange_name, bridge_exchange, std::string(message));
|
2020-07-21 15:47:39 +00:00
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
while (!binding_created)
|
|
|
|
{
|
|
|
|
event_handler->iterateLoop();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2021-05-01 18:00:43 +00:00
|
|
|
void StorageRabbitMQ::bindQueue(size_t queue_id, RabbitMQChannel & rabbit_channel)
|
2020-10-27 07:14:38 +00:00
|
|
|
{
|
|
|
|
std::atomic<bool> binding_created = false;
|
|
|
|
|
|
|
|
auto success_callback = [&](const std::string & queue_name, int msgcount, int /* consumercount */)
|
|
|
|
{
|
|
|
|
queues.emplace_back(queue_name);
|
|
|
|
LOG_DEBUG(log, "Queue {} is declared", queue_name);
|
|
|
|
|
|
|
|
if (msgcount)
|
|
|
|
LOG_INFO(log, "Queue {} is non-empty. Non-consumed messaged will also be delivered", queue_name);
|
|
|
|
|
|
|
|
/* Here we bind either to sharding exchange (consistent-hash) or to bridge exchange (fanout). All bindings to routing keys are
|
|
|
|
* done between client's exchange and local bridge exchange. Binding key must be a string integer in case of hash exchange, for
|
|
|
|
* fanout exchange it can be arbitrary
|
|
|
|
*/
|
2021-05-01 18:00:43 +00:00
|
|
|
rabbit_channel.bindQueue(consumer_exchange, queue_name, std::to_string(queue_id))
|
2020-10-27 07:14:38 +00:00
|
|
|
.onSuccess([&] { binding_created = true; })
|
|
|
|
.onError([&](const char * message)
|
|
|
|
{
|
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::CANNOT_CREATE_RABBITMQ_QUEUE_BINDING,
|
|
|
|
"Failed to create queue binding for exchange {}. Reason: {}", exchange_name, std::string(message));
|
|
|
|
});
|
|
|
|
};
|
|
|
|
|
|
|
|
auto error_callback([&](const char * message)
|
|
|
|
{
|
|
|
|
/* This error is most likely a result of an attempt to declare queue with different settings if it was declared before. So for a
|
|
|
|
* given queue name either deadletter_exchange parameter changed or queue_size changed, i.e. table was declared with different
|
|
|
|
* max_block_size parameter. Solution: client should specify a different queue_base parameter or manually delete previously
|
|
|
|
* declared queues via any of the various cli tools.
|
|
|
|
*/
|
|
|
|
throw Exception("Failed to declare queue. Probably queue settings are conflicting: max_block_size, deadletter_exchange. Attempt \
|
|
|
|
specifying differently those settings or use a different queue_base or manually delete previously declared queues, \
|
|
|
|
which were declared with the same names. ERROR reason: "
|
|
|
|
+ std::string(message), ErrorCodes::BAD_ARGUMENTS);
|
|
|
|
});
|
|
|
|
|
|
|
|
AMQP::Table queue_settings;
|
|
|
|
|
|
|
|
queue_settings["x-max-length"] = queue_size;
|
|
|
|
|
|
|
|
if (!deadletter_exchange.empty())
|
|
|
|
queue_settings["x-dead-letter-exchange"] = deadletter_exchange;
|
|
|
|
else
|
|
|
|
queue_settings["x-overflow"] = "reject-publish";
|
|
|
|
|
|
|
|
/* The first option not just simplifies queue_name, but also implements the possibility to be able to resume reading from one
|
|
|
|
* specific queue when its name is specified in queue_base setting
|
|
|
|
*/
|
|
|
|
const String queue_name = !hash_exchange ? queue_base : std::to_string(queue_id) + "_" + queue_base;
|
2021-05-04 16:26:47 +00:00
|
|
|
|
|
|
|
/// AMQP::autodelete setting is not allowd, because in case of server restart there will be no consumers and deleting queues should not take place.
|
|
|
|
rabbit_channel.declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
|
2020-10-27 07:14:38 +00:00
|
|
|
|
|
|
|
while (!binding_created)
|
|
|
|
{
|
|
|
|
event_handler->iterateLoop();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-08-28 08:52:02 +00:00
|
|
|
bool StorageRabbitMQ::restoreConnection(bool reconnecting)
|
|
|
|
{
|
|
|
|
size_t cnt_retries = 0;
|
|
|
|
|
|
|
|
if (reconnecting)
|
|
|
|
{
|
|
|
|
connection->close(); /// Connection might be unusable, but not closed
|
|
|
|
|
|
|
|
/* Connection is not closed immediately (firstly, all pending operations are completed, and then
|
2020-10-27 11:04:03 +00:00
|
|
|
* an AMQP closing-handshake is performed). But cannot open a new connection until previous one is properly closed
|
2020-08-28 08:52:02 +00:00
|
|
|
*/
|
2021-02-25 14:02:34 +00:00
|
|
|
while (!connection->closed() && cnt_retries++ != RETRIES_MAX)
|
2020-08-28 08:52:02 +00:00
|
|
|
event_handler->iterateLoop();
|
|
|
|
|
|
|
|
/// This will force immediate closure if not yet closed
|
|
|
|
if (!connection->closed())
|
|
|
|
connection->close(true);
|
|
|
|
|
2020-09-03 06:14:13 +00:00
|
|
|
LOG_TRACE(log, "Trying to restore connection to " + address);
|
2020-08-28 08:52:02 +00:00
|
|
|
}
|
|
|
|
|
2020-09-07 11:08:53 +00:00
|
|
|
connection = std::make_unique<AMQP::TcpConnection>(event_handler.get(),
|
2021-02-16 21:48:26 +00:00
|
|
|
AMQP::Address(
|
|
|
|
parsed_address.first, parsed_address.second,
|
|
|
|
AMQP::Login(login_password.first, login_password.second), vhost));
|
2020-08-28 08:52:02 +00:00
|
|
|
|
|
|
|
cnt_retries = 0;
|
2021-02-25 14:02:34 +00:00
|
|
|
while (!connection->ready() && !stream_cancelled && cnt_retries++ != RETRIES_MAX)
|
2020-08-28 08:52:02 +00:00
|
|
|
{
|
|
|
|
event_handler->iterateLoop();
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
|
|
|
|
}
|
|
|
|
|
|
|
|
return event_handler->connectionRunning();
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-12-03 14:11:35 +00:00
|
|
|
bool StorageRabbitMQ::updateChannel(ChannelPtr & channel)
|
2020-08-28 08:52:02 +00:00
|
|
|
{
|
2020-12-02 01:17:50 +00:00
|
|
|
if (event_handler->connectionRunning())
|
2020-12-03 14:11:35 +00:00
|
|
|
{
|
2020-12-02 01:17:50 +00:00
|
|
|
channel = std::make_shared<AMQP::TcpChannel>(connection.get());
|
2020-12-03 14:11:35 +00:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
channel = nullptr;
|
|
|
|
return false;
|
2020-08-28 08:52:02 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-07-21 15:47:39 +00:00
|
|
|
void StorageRabbitMQ::unbindExchange()
|
|
|
|
{
|
2020-08-28 08:52:02 +00:00
|
|
|
/* This is needed because with RabbitMQ (without special adjustments) can't, for example, properly make mv if there was insert query
|
|
|
|
* on the same table before, and in another direction it will make redundant copies, but most likely nobody will do that.
|
|
|
|
* As publishing is done to exchange, publisher never knows to which queues the message will go, every application interested in
|
2020-08-15 06:50:53 +00:00
|
|
|
* consuming from certain exchange - declares its owns exchange-bound queues, messages go to all such exchange-bound queues, and as
|
|
|
|
* input streams are always created at startup, then they will also declare its own exchange bound queues, but they will not be visible
|
|
|
|
* externally - client declares its own exchange-bound queues, from which to consume, so this means that if not disconnecting this local
|
2020-08-28 08:52:02 +00:00
|
|
|
* queues, then messages will go both ways and in one of them they will remain not consumed. So need to disconnect local exchange
|
2020-08-31 16:34:16 +00:00
|
|
|
* bindings to remove redunadant message copies, but after that mv cannot work unless those bindings are recreated. Recreating them is
|
|
|
|
* not difficult but very ugly and as probably nobody will do such thing - bindings will not be recreated.
|
2020-08-15 06:50:53 +00:00
|
|
|
*/
|
2020-07-25 11:14:46 +00:00
|
|
|
std::call_once(flag, [&]()
|
2020-07-21 15:47:39 +00:00
|
|
|
{
|
2020-08-28 08:52:02 +00:00
|
|
|
streaming_task->deactivate();
|
2020-08-26 08:54:29 +00:00
|
|
|
event_handler->updateLoopState(Loop::STOP);
|
|
|
|
looping_task->deactivate();
|
|
|
|
|
2021-05-01 18:00:43 +00:00
|
|
|
RabbitMQChannel rabbit_channel(connection.get());
|
|
|
|
rabbit_channel.removeExchange(bridge_exchange)
|
2020-07-21 15:47:39 +00:00
|
|
|
.onSuccess([&]()
|
|
|
|
{
|
|
|
|
exchange_removed.store(true);
|
|
|
|
})
|
|
|
|
.onError([&](const char * message)
|
|
|
|
{
|
2020-09-07 10:21:29 +00:00
|
|
|
throw Exception("Unable to remove exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_REMOVE_RABBITMQ_EXCHANGE);
|
2020-07-21 15:47:39 +00:00
|
|
|
});
|
|
|
|
|
2020-07-23 11:45:01 +00:00
|
|
|
while (!exchange_removed.load())
|
2020-07-21 15:47:39 +00:00
|
|
|
{
|
|
|
|
event_handler->iterateLoop();
|
|
|
|
}
|
2020-07-25 11:14:46 +00:00
|
|
|
});
|
2020-07-21 15:47:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-08-06 12:24:05 +00:00
|
|
|
Pipe StorageRabbitMQ::read(
|
2020-05-20 09:40:49 +00:00
|
|
|
const Names & column_names,
|
2020-06-24 17:32:57 +00:00
|
|
|
const StorageMetadataPtr & metadata_snapshot,
|
2020-09-20 17:52:17 +00:00
|
|
|
SelectQueryInfo & /* query_info */,
|
2021-04-10 23:33:54 +00:00
|
|
|
ContextPtr local_context,
|
2020-05-20 09:40:49 +00:00
|
|
|
QueryProcessingStage::Enum /* processed_stage */,
|
|
|
|
size_t /* max_block_size */,
|
|
|
|
unsigned /* num_streams */)
|
|
|
|
{
|
2020-12-04 14:50:42 +00:00
|
|
|
if (!rabbit_is_ready)
|
|
|
|
throw Exception("RabbitMQ setup not finished. Connection might be lost", ErrorCodes::CANNOT_CONNECT_RABBITMQ);
|
|
|
|
|
2020-05-20 09:40:49 +00:00
|
|
|
if (num_created_consumers == 0)
|
|
|
|
return {};
|
|
|
|
|
2020-08-28 08:52:02 +00:00
|
|
|
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
|
2021-04-10 23:33:54 +00:00
|
|
|
auto modified_context = addSettings(local_context);
|
2020-08-31 16:34:16 +00:00
|
|
|
auto block_size = getMaxBlockSize();
|
2020-08-28 08:52:02 +00:00
|
|
|
|
2020-09-03 06:14:13 +00:00
|
|
|
if (!event_handler->connectionRunning())
|
2020-08-28 08:52:02 +00:00
|
|
|
{
|
|
|
|
if (event_handler->loopRunning())
|
2020-08-31 16:34:16 +00:00
|
|
|
deactivateTask(looping_task, false, true);
|
2020-12-03 14:11:35 +00:00
|
|
|
restoreConnection(true);
|
2020-08-28 08:52:02 +00:00
|
|
|
}
|
|
|
|
|
2020-05-20 09:40:49 +00:00
|
|
|
Pipes pipes;
|
|
|
|
pipes.reserve(num_created_consumers);
|
|
|
|
|
|
|
|
for (size_t i = 0; i < num_created_consumers; ++i)
|
|
|
|
{
|
2020-08-31 16:34:16 +00:00
|
|
|
auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(
|
|
|
|
*this, metadata_snapshot, modified_context, column_names, block_size);
|
2020-08-28 08:52:02 +00:00
|
|
|
|
2020-07-02 14:35:10 +00:00
|
|
|
auto converting_stream = std::make_shared<ConvertingBlockInputStream>(
|
|
|
|
rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name);
|
|
|
|
pipes.emplace_back(std::make_shared<SourceFromInputStream>(converting_stream));
|
2020-05-20 09:40:49 +00:00
|
|
|
}
|
|
|
|
|
2020-08-28 08:52:02 +00:00
|
|
|
if (!event_handler->loopRunning() && event_handler->connectionRunning())
|
2020-06-27 17:26:00 +00:00
|
|
|
looping_task->activateAndSchedule();
|
2020-06-14 16:26:37 +00:00
|
|
|
|
2020-06-27 17:26:00 +00:00
|
|
|
LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
|
2020-11-24 21:07:56 +00:00
|
|
|
auto united_pipe = Pipe::unitePipes(std::move(pipes));
|
|
|
|
united_pipe.addInterpreterContext(modified_context);
|
|
|
|
return united_pipe;
|
2020-05-20 09:40:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
BlockOutputStreamPtr StorageRabbitMQ::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
|
2020-06-01 15:37:23 +00:00
|
|
|
{
|
2021-04-10 23:33:54 +00:00
|
|
|
return std::make_shared<RabbitMQBlockOutputStream>(*this, metadata_snapshot, local_context);
|
2020-06-01 15:37:23 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-05-20 09:40:49 +00:00
|
|
|
void StorageRabbitMQ::startup()
|
|
|
|
{
|
2020-12-02 01:17:50 +00:00
|
|
|
if (event_handler->connectionRunning())
|
|
|
|
initRabbitMQ();
|
|
|
|
else
|
|
|
|
connection_task->activateAndSchedule();
|
2020-10-27 07:14:38 +00:00
|
|
|
|
2020-05-20 09:40:49 +00:00
|
|
|
for (size_t i = 0; i < num_consumers; ++i)
|
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
|
|
|
pushReadBuffer(createReadBuffer());
|
|
|
|
++num_created_consumers;
|
|
|
|
}
|
2020-05-31 09:34:57 +00:00
|
|
|
catch (const AMQP::Exception & e)
|
2020-05-20 09:40:49 +00:00
|
|
|
{
|
2020-09-07 11:08:53 +00:00
|
|
|
LOG_ERROR(log, "Got AMQ exception {}", e.what());
|
2020-05-31 09:34:57 +00:00
|
|
|
throw;
|
2020-05-20 09:40:49 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-08-26 08:54:29 +00:00
|
|
|
event_handler->updateLoopState(Loop::RUN);
|
2020-06-24 21:14:49 +00:00
|
|
|
streaming_task->activateAndSchedule();
|
2020-05-20 09:40:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void StorageRabbitMQ::shutdown()
|
|
|
|
{
|
|
|
|
stream_cancelled = true;
|
2020-09-07 11:08:53 +00:00
|
|
|
wait_confirm = false;
|
2020-05-20 09:40:49 +00:00
|
|
|
|
2021-05-04 16:26:47 +00:00
|
|
|
/// In case it has not yet been able to setup connection;
|
|
|
|
deactivateTask(connection_task, true, false);
|
|
|
|
|
|
|
|
/// The order of deactivating tasks is important: wait for streamingToViews() func to finish and
|
|
|
|
/// then wait for background event loop to finish.
|
2020-08-31 16:34:16 +00:00
|
|
|
deactivateTask(streaming_task, true, false);
|
|
|
|
deactivateTask(looping_task, true, true);
|
2020-08-28 08:52:02 +00:00
|
|
|
|
2021-05-04 16:26:47 +00:00
|
|
|
if (drop_table)
|
|
|
|
{
|
|
|
|
for (auto & buffer : buffers)
|
|
|
|
buffer->closeChannel();
|
|
|
|
cleanupRabbitMQ();
|
|
|
|
}
|
|
|
|
|
|
|
|
/// It is important to close connection here - before removing consumer buffers, because
|
|
|
|
/// it will finish and clean callbacks, which might use those buffers data.
|
2020-08-15 06:50:53 +00:00
|
|
|
connection->close();
|
|
|
|
|
2021-05-04 16:26:47 +00:00
|
|
|
/// Connection is not closed immediately - it requires the loop to shutdown it properly and to
|
|
|
|
/// finish all callbacks.
|
2020-08-15 06:50:53 +00:00
|
|
|
size_t cnt_retries = 0;
|
2021-02-25 14:02:34 +00:00
|
|
|
while (!connection->closed() && cnt_retries++ != RETRIES_MAX)
|
2020-08-15 06:50:53 +00:00
|
|
|
event_handler->iterateLoop();
|
2020-06-27 17:26:00 +00:00
|
|
|
|
2020-08-28 08:52:02 +00:00
|
|
|
/// Should actually force closure, if not yet closed, but it generates distracting error logs
|
2020-08-15 06:50:53 +00:00
|
|
|
//if (!connection->closed())
|
|
|
|
// connection->close(true);
|
|
|
|
|
|
|
|
for (size_t i = 0; i < num_created_consumers; ++i)
|
|
|
|
popReadBuffer();
|
2020-05-20 09:40:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2021-05-04 16:26:47 +00:00
|
|
|
/// The only thing publishers are supposed to be aware of is _exchanges_ and queues are a responsibility of a consumer.
|
|
|
|
/// Therefore, if a table is droppped, a clean up is needed.
|
|
|
|
void StorageRabbitMQ::cleanupRabbitMQ() const
|
|
|
|
{
|
|
|
|
RabbitMQChannel rabbit_channel(connection.get());
|
|
|
|
for (const auto & queue : queues)
|
|
|
|
{
|
|
|
|
/// AMQP::ifunused is needed, because it is possible to share queues between multiple tables and dropping
|
|
|
|
/// on of them should not affect others.
|
|
|
|
/// AMQP::ifempty is not used on purpose.
|
|
|
|
|
|
|
|
rabbit_channel.removeQueue(queue, AMQP::ifunused)
|
|
|
|
.onSuccess([&](uint32_t num_messages)
|
|
|
|
{
|
|
|
|
LOG_TRACE(log, "Successfully deleted queue {}, messages contained {}", queue, num_messages);
|
|
|
|
event_handler->stopLoop();
|
|
|
|
})
|
|
|
|
.onError([&](const char * message)
|
|
|
|
{
|
|
|
|
LOG_ERROR(log, "Failed to delete queue {}. Error message: {}", queue, message);
|
|
|
|
event_handler->stopLoop();
|
|
|
|
});
|
|
|
|
}
|
|
|
|
event_handler->startBlockingLoop();
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-05-20 09:40:49 +00:00
|
|
|
void StorageRabbitMQ::pushReadBuffer(ConsumerBufferPtr buffer)
|
|
|
|
{
|
2020-09-07 11:08:53 +00:00
|
|
|
std::lock_guard lock(buffers_mutex);
|
2020-05-20 09:40:49 +00:00
|
|
|
buffers.push_back(buffer);
|
|
|
|
semaphore.set();
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
ConsumerBufferPtr StorageRabbitMQ::popReadBuffer()
|
|
|
|
{
|
|
|
|
return popReadBuffer(std::chrono::milliseconds::zero());
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
ConsumerBufferPtr StorageRabbitMQ::popReadBuffer(std::chrono::milliseconds timeout)
|
|
|
|
{
|
|
|
|
// Wait for the first free buffer
|
|
|
|
if (timeout == std::chrono::milliseconds::zero())
|
|
|
|
semaphore.wait();
|
|
|
|
else
|
|
|
|
{
|
|
|
|
if (!semaphore.tryWait(timeout.count()))
|
|
|
|
return nullptr;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Take the first available buffer from the list
|
2020-09-07 11:08:53 +00:00
|
|
|
std::lock_guard lock(buffers_mutex);
|
2020-05-20 09:40:49 +00:00
|
|
|
auto buffer = buffers.back();
|
|
|
|
buffers.pop_back();
|
|
|
|
|
|
|
|
return buffer;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
|
|
|
|
{
|
2020-12-02 01:17:50 +00:00
|
|
|
ChannelPtr consumer_channel;
|
|
|
|
if (event_handler->connectionRunning())
|
|
|
|
consumer_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
|
2020-05-31 08:39:22 +00:00
|
|
|
|
2020-06-24 21:14:49 +00:00
|
|
|
return std::make_shared<ReadBufferFromRabbitMQConsumer>(
|
2020-10-27 20:28:52 +00:00
|
|
|
consumer_channel, event_handler, queues, ++consumer_id,
|
|
|
|
unique_strbase, log, row_delimiter, queue_size, stream_cancelled);
|
2020-05-20 09:40:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-06-01 15:37:23 +00:00
|
|
|
ProducerBufferPtr StorageRabbitMQ::createWriteBuffer()
|
|
|
|
{
|
2020-06-24 21:14:49 +00:00
|
|
|
return std::make_shared<WriteBufferToRabbitMQProducer>(
|
2021-04-10 23:33:54 +00:00
|
|
|
parsed_address, getContext(), login_password, vhost, routing_keys, exchange_name, exchange_type,
|
2020-08-31 16:34:16 +00:00
|
|
|
producer_id.fetch_add(1), persistent, wait_confirm, log,
|
2020-06-24 21:14:49 +00:00
|
|
|
row_delimiter ? std::optional<char>{row_delimiter} : std::nullopt, 1, 1024);
|
2020-06-01 15:37:23 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-05-20 09:40:49 +00:00
|
|
|
bool StorageRabbitMQ::checkDependencies(const StorageID & table_id)
|
|
|
|
{
|
|
|
|
// Check if all dependencies are attached
|
|
|
|
auto dependencies = DatabaseCatalog::instance().getDependencies(table_id);
|
|
|
|
if (dependencies.empty())
|
|
|
|
return true;
|
|
|
|
|
|
|
|
// Check the dependencies are ready?
|
|
|
|
for (const auto & db_tab : dependencies)
|
|
|
|
{
|
2021-04-10 23:33:54 +00:00
|
|
|
auto table = DatabaseCatalog::instance().tryGetTable(db_tab, getContext());
|
2020-05-20 09:40:49 +00:00
|
|
|
if (!table)
|
|
|
|
return false;
|
|
|
|
|
|
|
|
// If it materialized view, check it's target table
|
|
|
|
auto * materialized_view = dynamic_cast<StorageMaterializedView *>(table.get());
|
|
|
|
if (materialized_view && !materialized_view->tryGetTargetTable())
|
|
|
|
return false;
|
|
|
|
|
|
|
|
// Check all its dependencies
|
|
|
|
if (!checkDependencies(db_tab))
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-09-07 11:08:53 +00:00
|
|
|
void StorageRabbitMQ::streamingToViewsFunc()
|
2020-05-20 09:40:49 +00:00
|
|
|
{
|
2020-12-04 14:50:42 +00:00
|
|
|
if (rabbit_is_ready && (event_handler->connectionRunning() || restoreConnection(true)))
|
2020-05-20 09:40:49 +00:00
|
|
|
{
|
2020-12-02 01:17:50 +00:00
|
|
|
try
|
2020-05-20 09:40:49 +00:00
|
|
|
{
|
2020-12-02 01:17:50 +00:00
|
|
|
auto table_id = getStorageID();
|
2020-05-20 09:40:49 +00:00
|
|
|
|
2020-12-02 01:17:50 +00:00
|
|
|
// Check if at least one direct dependency is attached
|
|
|
|
size_t dependencies_count = DatabaseCatalog::instance().getDependencies(table_id).size();
|
2020-05-20 09:40:49 +00:00
|
|
|
|
2020-12-02 01:17:50 +00:00
|
|
|
if (dependencies_count)
|
|
|
|
{
|
|
|
|
auto start_time = std::chrono::steady_clock::now();
|
2020-10-25 07:44:04 +00:00
|
|
|
|
2020-12-02 01:17:50 +00:00
|
|
|
// Keep streaming as long as there are attached views and streaming is not cancelled
|
|
|
|
while (!stream_cancelled && num_created_consumers > 0)
|
2020-10-25 07:44:04 +00:00
|
|
|
{
|
2020-12-02 01:17:50 +00:00
|
|
|
if (!checkDependencies(table_id))
|
|
|
|
break;
|
|
|
|
|
|
|
|
LOG_DEBUG(log, "Started streaming to {} attached views", dependencies_count);
|
|
|
|
|
|
|
|
if (streamToViews())
|
|
|
|
break;
|
|
|
|
|
|
|
|
auto end_time = std::chrono::steady_clock::now();
|
|
|
|
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
|
|
|
|
if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
|
|
|
|
{
|
|
|
|
event_handler->updateLoopState(Loop::STOP);
|
|
|
|
LOG_TRACE(log, "Reschedule streaming. Thread work duration limit exceeded.");
|
|
|
|
break;
|
|
|
|
}
|
2020-10-25 07:44:04 +00:00
|
|
|
}
|
2020-05-20 09:40:49 +00:00
|
|
|
}
|
|
|
|
}
|
2020-12-02 01:17:50 +00:00
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
}
|
2020-05-20 09:40:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Wait for attached views
|
|
|
|
if (!stream_cancelled)
|
2020-10-27 07:20:19 +00:00
|
|
|
streaming_task->scheduleAfter(RESCHEDULE_MS);
|
2020-05-20 09:40:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
bool StorageRabbitMQ::streamToViews()
|
|
|
|
{
|
|
|
|
auto table_id = getStorageID();
|
2021-04-10 23:33:54 +00:00
|
|
|
auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
|
2020-05-20 09:40:49 +00:00
|
|
|
if (!table)
|
|
|
|
throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
// Create an INSERT query for streaming data
|
|
|
|
auto insert = std::make_shared<ASTInsertQuery>();
|
|
|
|
insert->table_id = table_id;
|
|
|
|
|
2020-08-28 08:52:02 +00:00
|
|
|
// Only insert into dependent views and expect that input blocks contain virtual columns
|
2021-04-10 23:33:54 +00:00
|
|
|
InterpreterInsertQuery interpreter(insert, rabbitmq_context, false, true, true);
|
2020-05-20 09:40:49 +00:00
|
|
|
auto block_io = interpreter.execute();
|
|
|
|
|
2020-08-28 08:52:02 +00:00
|
|
|
auto metadata_snapshot = getInMemoryMetadataPtr();
|
|
|
|
auto column_names = block_io.out->getHeader().getNames();
|
|
|
|
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
|
|
|
|
|
2020-08-31 16:34:16 +00:00
|
|
|
auto block_size = getMaxBlockSize();
|
|
|
|
|
2020-05-20 09:40:49 +00:00
|
|
|
// Create a stream for each consumer and join them in a union stream
|
|
|
|
BlockInputStreams streams;
|
|
|
|
streams.reserve(num_created_consumers);
|
|
|
|
|
|
|
|
for (size_t i = 0; i < num_created_consumers; ++i)
|
|
|
|
{
|
2020-08-31 16:34:16 +00:00
|
|
|
auto stream = std::make_shared<RabbitMQBlockInputStream>(
|
|
|
|
*this, metadata_snapshot, rabbitmq_context, column_names, block_size, false);
|
2020-08-28 08:52:02 +00:00
|
|
|
streams.emplace_back(stream);
|
2020-05-20 09:40:49 +00:00
|
|
|
|
|
|
|
// Limit read batch to maximum block size to allow DDL
|
2020-09-15 10:40:39 +00:00
|
|
|
StreamLocalLimits limits;
|
2020-08-28 08:52:02 +00:00
|
|
|
|
2020-08-31 16:34:16 +00:00
|
|
|
limits.speed_limits.max_execution_time = rabbitmq_settings->rabbitmq_flush_interval_ms.changed
|
|
|
|
? rabbitmq_settings->rabbitmq_flush_interval_ms
|
2021-04-10 23:33:54 +00:00
|
|
|
: getContext()->getSettingsRef().stream_flush_interval_ms;
|
2020-08-31 16:34:16 +00:00
|
|
|
|
2020-05-20 09:40:49 +00:00
|
|
|
limits.timeout_overflow_mode = OverflowMode::BREAK;
|
2020-08-28 08:52:02 +00:00
|
|
|
|
|
|
|
stream->setLimits(limits);
|
2020-05-20 09:40:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Join multiple streams if necessary
|
|
|
|
BlockInputStreamPtr in;
|
|
|
|
if (streams.size() > 1)
|
|
|
|
in = std::make_shared<UnionBlockInputStream>(streams, nullptr, streams.size());
|
|
|
|
else
|
|
|
|
in = streams[0];
|
|
|
|
|
|
|
|
std::atomic<bool> stub = {false};
|
2020-10-25 07:44:04 +00:00
|
|
|
|
|
|
|
if (!event_handler->loopRunning())
|
|
|
|
{
|
|
|
|
event_handler->updateLoopState(Loop::RUN);
|
|
|
|
looping_task->activateAndSchedule();
|
|
|
|
}
|
|
|
|
|
2020-05-20 09:40:49 +00:00
|
|
|
copyData(*in, *block_io.out, &stub);
|
|
|
|
|
2020-10-25 07:44:04 +00:00
|
|
|
/* Note: sending ack() with loop running in another thread will lead to a lot of data races inside the library, but only in case
|
|
|
|
* error occurs or connection is lost while ack is being sent
|
2020-08-28 08:52:02 +00:00
|
|
|
*/
|
2020-10-25 07:44:04 +00:00
|
|
|
deactivateTask(looping_task, false, true);
|
|
|
|
size_t queue_empty = 0;
|
2020-08-28 08:52:02 +00:00
|
|
|
|
|
|
|
if (!event_handler->connectionRunning())
|
|
|
|
{
|
2020-10-25 07:44:04 +00:00
|
|
|
if (stream_cancelled)
|
|
|
|
return true;
|
|
|
|
|
|
|
|
if (restoreConnection(true))
|
2020-08-28 08:52:02 +00:00
|
|
|
{
|
|
|
|
for (auto & stream : streams)
|
|
|
|
stream->as<RabbitMQBlockInputStream>()->updateChannel();
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2020-10-25 07:44:04 +00:00
|
|
|
LOG_TRACE(log, "Reschedule streaming. Unable to restore connection.");
|
|
|
|
return true;
|
2020-08-28 08:52:02 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
/// Commit
|
|
|
|
for (auto & stream : streams)
|
|
|
|
{
|
2020-10-25 07:44:04 +00:00
|
|
|
if (stream->as<RabbitMQBlockInputStream>()->queueEmpty())
|
|
|
|
++queue_empty;
|
|
|
|
|
2020-12-05 21:55:00 +00:00
|
|
|
if (stream->as<RabbitMQBlockInputStream>()->needChannelUpdate())
|
|
|
|
{
|
|
|
|
auto buffer = stream->as<RabbitMQBlockInputStream>()->getBuffer();
|
|
|
|
if (buffer)
|
|
|
|
{
|
|
|
|
if (buffer->queuesCount() != queues.size())
|
|
|
|
buffer->updateQueues(queues);
|
|
|
|
|
|
|
|
buffer->updateAckTracker();
|
|
|
|
|
|
|
|
if (updateChannel(buffer->getChannel()))
|
|
|
|
buffer->setupChannel();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-08-31 16:34:16 +00:00
|
|
|
/* false is returned by the sendAck function in only two cases:
|
|
|
|
* 1) if connection failed. In this case all channels will be closed and will be unable to send ack. Also ack is made based on
|
|
|
|
* delivery tags, which are unique to channels, so if channels fail, those delivery tags will become invalid and there is
|
|
|
|
* no way to send specific ack from a different channel. Actually once the server realises that it has messages in a queue
|
|
|
|
* waiting for confirm from a channel which suddenly closed, it will immediately make those messages accessible to other
|
|
|
|
* consumers. So in this case duplicates are inevitable.
|
|
|
|
* 2) size of the sent frame (libraries's internal request interface) exceeds max frame - internal library error. This is more
|
|
|
|
* common for message frames, but not likely to happen to ack frame I suppose. So I do not believe it is likely to happen.
|
|
|
|
* Also in this case if channel didn't get closed - it is ok if failed to send ack, because the next attempt to send ack on
|
|
|
|
* the same channel will also commit all previously not-committed messages. Anyway I do not think that for ack frame this
|
|
|
|
* will ever happen.
|
|
|
|
*/
|
2020-08-28 08:52:02 +00:00
|
|
|
if (!stream->as<RabbitMQBlockInputStream>()->sendAck())
|
|
|
|
{
|
2020-09-03 06:14:13 +00:00
|
|
|
/// Iterate loop to activate error callbacks if they happened
|
|
|
|
event_handler->iterateLoop();
|
2020-12-03 14:11:35 +00:00
|
|
|
if (!event_handler->connectionRunning())
|
2020-08-28 08:52:02 +00:00
|
|
|
break;
|
|
|
|
}
|
2020-10-25 07:44:04 +00:00
|
|
|
|
|
|
|
event_handler->iterateLoop();
|
2020-08-28 08:52:02 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-10-27 07:14:38 +00:00
|
|
|
if ((queue_empty == num_created_consumers) && (++read_attempts == MAX_FAILED_READ_ATTEMPTS))
|
2020-10-25 07:44:04 +00:00
|
|
|
{
|
|
|
|
connection->heartbeat();
|
|
|
|
read_attempts = 0;
|
|
|
|
LOG_TRACE(log, "Reschedule streaming. Queues are empty.");
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
event_handler->updateLoopState(Loop::RUN);
|
|
|
|
looping_task->activateAndSchedule();
|
|
|
|
}
|
2020-05-20 09:40:49 +00:00
|
|
|
|
2020-10-25 07:44:04 +00:00
|
|
|
return false;
|
2020-05-20 05:30:38 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void registerStorageRabbitMQ(StorageFactory & factory)
|
|
|
|
{
|
|
|
|
auto creator_fn = [](const StorageFactory::Arguments & args)
|
|
|
|
{
|
|
|
|
ASTs & engine_args = args.engine_args;
|
|
|
|
size_t args_count = engine_args.size();
|
|
|
|
bool has_settings = args.storage_def->settings;
|
|
|
|
|
2020-08-31 09:12:36 +00:00
|
|
|
auto rabbitmq_settings = std::make_unique<RabbitMQSettings>();
|
2020-05-20 05:30:38 +00:00
|
|
|
if (has_settings)
|
2020-08-31 09:12:36 +00:00
|
|
|
rabbitmq_settings->loadFromQuery(*args.storage_def);
|
2020-05-20 05:30:38 +00:00
|
|
|
|
2020-08-31 09:12:36 +00:00
|
|
|
// Check arguments and settings
|
|
|
|
#define CHECK_RABBITMQ_STORAGE_ARGUMENT(ARG_NUM, ARG_NAME) \
|
|
|
|
/* One of the three required arguments is not specified */ \
|
|
|
|
if (args_count < (ARG_NUM) && (ARG_NUM) <= 3 && !rabbitmq_settings->ARG_NAME.changed) \
|
|
|
|
{ \
|
|
|
|
throw Exception("Required parameter '" #ARG_NAME "' for storage RabbitMQ not specified", \
|
|
|
|
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); \
|
|
|
|
} \
|
|
|
|
if (args_count >= (ARG_NUM)) \
|
|
|
|
{ \
|
|
|
|
if (rabbitmq_settings->ARG_NAME.changed) /* The same argument is given in two places */ \
|
|
|
|
{ \
|
|
|
|
throw Exception("The argument №" #ARG_NUM " of storage RabbitMQ " \
|
|
|
|
"and the parameter '" #ARG_NAME "' is duplicated", ErrorCodes::BAD_ARGUMENTS); \
|
|
|
|
} \
|
2020-05-20 05:30:38 +00:00
|
|
|
}
|
|
|
|
|
2020-08-31 09:12:36 +00:00
|
|
|
CHECK_RABBITMQ_STORAGE_ARGUMENT(1, rabbitmq_host_port)
|
|
|
|
CHECK_RABBITMQ_STORAGE_ARGUMENT(2, rabbitmq_exchange_name)
|
|
|
|
CHECK_RABBITMQ_STORAGE_ARGUMENT(3, rabbitmq_format)
|
2020-06-11 09:23:23 +00:00
|
|
|
|
2020-08-31 09:12:36 +00:00
|
|
|
CHECK_RABBITMQ_STORAGE_ARGUMENT(4, rabbitmq_exchange_type)
|
|
|
|
CHECK_RABBITMQ_STORAGE_ARGUMENT(5, rabbitmq_routing_key_list)
|
|
|
|
CHECK_RABBITMQ_STORAGE_ARGUMENT(6, rabbitmq_row_delimiter)
|
|
|
|
CHECK_RABBITMQ_STORAGE_ARGUMENT(7, rabbitmq_schema)
|
|
|
|
CHECK_RABBITMQ_STORAGE_ARGUMENT(8, rabbitmq_num_consumers)
|
|
|
|
CHECK_RABBITMQ_STORAGE_ARGUMENT(9, rabbitmq_num_queues)
|
|
|
|
CHECK_RABBITMQ_STORAGE_ARGUMENT(10, rabbitmq_queue_base)
|
|
|
|
CHECK_RABBITMQ_STORAGE_ARGUMENT(11, rabbitmq_deadletter_exchange)
|
|
|
|
CHECK_RABBITMQ_STORAGE_ARGUMENT(12, rabbitmq_persistent)
|
2020-05-20 05:30:38 +00:00
|
|
|
|
2020-08-31 16:34:16 +00:00
|
|
|
CHECK_RABBITMQ_STORAGE_ARGUMENT(13, rabbitmq_skip_broken_messages)
|
|
|
|
CHECK_RABBITMQ_STORAGE_ARGUMENT(14, rabbitmq_max_block_size)
|
|
|
|
CHECK_RABBITMQ_STORAGE_ARGUMENT(15, rabbitmq_flush_interval_ms)
|
2020-05-20 05:30:38 +00:00
|
|
|
|
2021-04-21 15:51:05 +00:00
|
|
|
CHECK_RABBITMQ_STORAGE_ARGUMENT(16, rabbitmq_vhost)
|
|
|
|
|
2020-08-31 09:12:36 +00:00
|
|
|
#undef CHECK_RABBITMQ_STORAGE_ARGUMENT
|
2020-05-20 05:30:38 +00:00
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
return StorageRabbitMQ::create(args.table_id, args.getContext(), args.columns, std::move(rabbitmq_settings));
|
2020-05-20 05:30:38 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
factory.registerStorage("RabbitMQ", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
NamesAndTypesList StorageRabbitMQ::getVirtuals() const
|
|
|
|
{
|
|
|
|
return NamesAndTypesList{
|
2020-07-20 10:05:00 +00:00
|
|
|
{"_exchange_name", std::make_shared<DataTypeString>()},
|
2020-08-15 06:50:53 +00:00
|
|
|
{"_channel_id", std::make_shared<DataTypeString>()},
|
2020-07-20 10:05:00 +00:00
|
|
|
{"_delivery_tag", std::make_shared<DataTypeUInt64>()},
|
2020-08-15 14:38:29 +00:00
|
|
|
{"_redelivered", std::make_shared<DataTypeUInt8>()},
|
2020-10-27 20:28:52 +00:00
|
|
|
{"_message_id", std::make_shared<DataTypeString>()},
|
|
|
|
{"_timestamp", std::make_shared<DataTypeUInt64>()}
|
2020-05-20 05:30:38 +00:00
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|