#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { static const auto CONNECT_SLEEP = 200; static const auto RETRIES_MAX = 20; static const auto HEARTBEAT_RESCHEDULE_MS = 3000; namespace ErrorCodes { extern const int LOGICAL_ERROR; extern const int BAD_ARGUMENTS; extern const int CANNOT_CONNECT_RABBITMQ; } namespace ExchangeType { /// Note that default here means default by implementation and not by rabbitmq settings static const String DEFAULT = "default"; static const String FANOUT = "fanout"; static const String DIRECT = "direct"; static const String TOPIC = "topic"; static const String HASH = "consistent_hash"; static const String HEADERS = "headers"; } StorageRabbitMQ::StorageRabbitMQ( const StorageID & table_id_, Context & context_, const ColumnsDescription & columns_, const String & host_port_, const Names & routing_keys_, const String & exchange_name_, const String & format_name_, char row_delimiter_, const String & exchange_type_, size_t num_consumers_, size_t num_queues_, const bool use_transactional_channel_, const String & queue_base_, const String & deadletter_exchange_, const bool persistent_) : IStorage(table_id_) , global_context(context_.getGlobalContext()) , rabbitmq_context(Context(global_context)) , routing_keys(global_context.getMacros()->expand(routing_keys_)) , exchange_name(exchange_name_) , format_name(global_context.getMacros()->expand(format_name_)) , row_delimiter(row_delimiter_) , num_consumers(num_consumers_) , num_queues(num_queues_) , use_transactional_channel(use_transactional_channel_) , queue_base(queue_base_) , deadletter_exchange(deadletter_exchange_) , persistent(persistent_) , log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + 
")")) , parsed_address(parseAddress(global_context.getMacros()->expand(host_port_), 5672)) , login_password(std::make_pair( global_context.getConfigRef().getString("rabbitmq.username"), global_context.getConfigRef().getString("rabbitmq.password"))) , semaphore(0, num_consumers_) { loop = std::make_unique(); uv_loop_init(loop.get()); event_handler = std::make_shared(loop.get(), log); connection = std::make_shared(event_handler.get(), AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/")); size_t cnt_retries = 0; while (!connection->ready() && ++cnt_retries != RETRIES_MAX) { event_handler->iterateLoop(); std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP)); } if (!connection->ready()) { uv_loop_close(loop.get()); throw Exception("Cannot set up connection for consumers", ErrorCodes::CANNOT_CONNECT_RABBITMQ); } rabbitmq_context.makeQueryContext(); StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns_); setInMemoryMetadata(storage_metadata); streaming_task = global_context.getSchedulePool().createTask("RabbitMQStreamingTask", [this]{ threadFunc(); }); streaming_task->deactivate(); heartbeat_task = global_context.getSchedulePool().createTask("RabbitMQHeartbeatTask", [this]{ heartbeatFunc(); }); heartbeat_task->deactivate(); hash_exchange = num_consumers > 1 || num_queues > 1; if (exchange_type_ != ExchangeType::DEFAULT) { if (exchange_type_ == ExchangeType::FANOUT) exchange_type = AMQP::ExchangeType::fanout; else if (exchange_type_ == ExchangeType::DIRECT) exchange_type = AMQP::ExchangeType::direct; else if (exchange_type_ == ExchangeType::TOPIC) exchange_type = AMQP::ExchangeType::topic; else if (exchange_type_ == ExchangeType::HASH) exchange_type = AMQP::ExchangeType::consistent_hash; else if (exchange_type_ == ExchangeType::HEADERS) exchange_type = AMQP::ExchangeType::headers; else throw Exception("Invalid exchange type", ErrorCodes::BAD_ARGUMENTS); } else { 
        exchange_type = AMQP::ExchangeType::fanout;
    }

    auto table_id = getStorageID();
    String table_name = table_id.table_name;

    if (queue_base.empty())
    {
        /// Make sure that local exchange name is unique for each table and is not the same as client's exchange name
        sharding_exchange = exchange_name + "_" + table_name;

        /* By default, if no queue name is specified in the queue's declaration, its name is generated by the library, but it is
         * better to give each table a unique queue name so that the queues are reused once the table is recreated. This means that
         * queues remain the same for every table unless the queue_base table setting is specified (which allows registering
         * consumers to specific queues). This is the base for the names of queues declared later (as everything is based on names).
         */
        queue_base = "queue_" + table_name;
    }
    else
    {
        /* In case different tables are used to register multiple consumers to the same queues (so queues are shared between tables)
         * and at the same time a sharding exchange is needed (if there are multiple shared queues), those tables also need
         * to share the sharding exchange.
         */
        sharding_exchange = exchange_name + queue_base;
    }

    bridge_exchange = sharding_exchange + "_bridge";

    /// One looping task for all consumers as they share the same connection == the same handler == the same event loop
    looping_task = global_context.getSchedulePool().createTask("RabbitMQLoopingTask", [this]{ loopingFunc(); });
    looping_task->deactivate();
}


void StorageRabbitMQ::heartbeatFunc()
{
    if (!stream_cancelled && event_handler->connectionRunning())
    {
        LOG_TRACE(log, "Sending RabbitMQ heartbeat");
        connection->heartbeat();
        heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS);
    }
}


void StorageRabbitMQ::loopingFunc()
{
    if (event_handler->connectionRunning())
    {
        LOG_DEBUG(log, "Starting event looping iterations");
        event_handler->startLoop();
    }
}


void StorageRabbitMQ::initExchange()
{
    /* Declare client's exchange of the specified type and bind it to hash-exchange (if it is not already hash-exchange), which
     * will evenly distribute messages between all consumers.
     */
    setup_channel->declareExchange(exchange_name, exchange_type, AMQP::durable)
    .onError([&](const char * message)
    {
        throw Exception("Unable to declare exchange. Make sure specified exchange is not already declared. Error: "
                + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ);
    });

    /// Bridge exchange is needed to easily disconnect consumer queues. Also simplifies queue bindings a lot.
    setup_channel->declareExchange(bridge_exchange, AMQP::fanout, AMQP::durable + AMQP::autodelete)
    .onError([&](const char * message)
    {
        throw Exception("Unable to declare exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ);
    });

    if (!hash_exchange)
    {
        consumer_exchange = bridge_exchange;
        return;
    }

    /// Declare exchange for sharding.
    AMQP::Table binding_arguments;
    binding_arguments["hash-property"] = "message_id";

    setup_channel->declareExchange(sharding_exchange, AMQP::consistent_hash, AMQP::durable + AMQP::autodelete, binding_arguments)
    .onError([&](const char * message)
    {
        throw Exception("Unable to declare exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ);
    });

    setup_channel->bindExchange(bridge_exchange, sharding_exchange, routing_keys[0])
    .onError([&](const char * message)
    {
        throw Exception("Unable to bind exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ);
    });

    consumer_exchange = sharding_exchange;
}


void StorageRabbitMQ::bindExchange()
{
    std::atomic<bool> binding_created = false;
    size_t bound_keys = 0;

    if (exchange_type == AMQP::ExchangeType::headers)
    {
        /// Each routing key is expected in the form "header=value"
        AMQP::Table bind_headers;
        for (const auto & header : routing_keys)
        {
            std::vector<String> matching;
            boost::split(matching, header, [](char c){ return c == '='; });
            bind_headers[matching[0]] = matching[1];
        }

        setup_channel->bindExchange(exchange_name, bridge_exchange, routing_keys[0], bind_headers)
        .onSuccess([&]()
        {
            binding_created = true;
        })
        .onError([&](const char * message)
        {
            throw Exception("Unable to bind exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ);
        });
    }
    else if (exchange_type == AMQP::ExchangeType::fanout || exchange_type == AMQP::ExchangeType::consistent_hash)
    {
        setup_channel->bindExchange(exchange_name, bridge_exchange, routing_keys[0])
        .onSuccess([&]()
        {
            binding_created = true;
        })
        .onError([&](const char * message)
        {
            throw Exception("Unable to bind exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ);
        });
    }
    else
    {
        for (const auto & routing_key : routing_keys)
        {
            setup_channel->bindExchange(exchange_name, bridge_exchange, routing_key)
            .onSuccess([&]()
            {
                ++bound_keys;
                if (bound_keys == routing_keys.size())
                    binding_created = true;
            })
            .onError([&](const char * message)
            {
                throw Exception("Unable to bind exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ);
            });
        }
    }

    /// Drive the event loop until the broker confirms all bindings
    while (!binding_created)
    {
        event_handler->iterateLoop();
    }
}


void StorageRabbitMQ::unbindExchange()
{
    std::call_once(flag, [&]()
    {
        setup_channel->removeExchange(bridge_exchange)
        .onSuccess([&]()
        {
            exchange_removed.store(true);
        })
        .onError([&](const char * message)
        {
            throw Exception("Unable to remove exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ);
        });

        while (!exchange_removed.load())
        {
            event_handler->iterateLoop();
        }

        event_handler->stop();
        looping_task->deactivate();
        heartbeat_task->deactivate();
    });
}


bool StorageRabbitMQ::restoreConnection()
{
    if (restore_connection.try_lock())
    {
        /// This lock is to synchronize with getChannel().
        std::lock_guard lk(connection_mutex);

        if (!connection->usable() || !connection->ready())
        {
            LOG_TRACE(log, "Trying to restore consumer connection");

            if (!connection->closed())
                connection->close();

            connection = std::make_shared<AMQP::TcpConnection>(
                event_handler.get(),
                AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/"));

            size_t cnt_retries = 0;
            while (!connection->ready() && ++cnt_retries != RETRIES_MAX)
            {
                event_handler->iterateLoop();
                std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
            }
        }

        if (event_handler->connectionRunning())
        {
            LOG_TRACE(log, "Connection restored");

            heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS);
            looping_task->activateAndSchedule();
        }
        else
        {
            LOG_TRACE(log, "Connection refused");
        }

        restore_connection.unlock();
    }
    else
    {
        /// Another thread is already restoring the connection, give it some time
        std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
    }

    return event_handler->connectionRunning();
}


ChannelPtr StorageRabbitMQ::getChannel()
{
    std::lock_guard lk(connection_mutex);
    ChannelPtr new_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
    return new_channel;
}


Pipes StorageRabbitMQ::read(
        const Names & column_names,
        const StorageMetadataPtr & metadata_snapshot,
        const SelectQueryInfo & /* query_info */,
        const Context & context,
        QueryProcessingStage::Enum /* processed_stage */,
        size_t /* max_block_size */,
        unsigned /* num_streams */)
{
    if (num_created_consumers == 0)
        return {};

    Pipes pipes;
    pipes.reserve(num_created_consumers);

    auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
    for (size_t i = 0; i < num_created_consumers; ++i)
    {
        auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(
                *this, metadata_snapshot, context, column_names);
        auto converting_stream = std::make_shared<ConvertingBlockInputStream>(
                rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name);
        pipes.emplace_back(std::make_shared<SourceFromInputStream>(converting_stream));
    }

    if (!loop_started)
    {
        loop_started = true;
        looping_task->activateAndSchedule();
    }

    LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
    return pipes;
}


BlockOutputStreamPtr StorageRabbitMQ::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context & context)
{
    return std::make_shared<RabbitMQBlockOutputStream>(*this, metadata_snapshot, context);
}


void StorageRabbitMQ::startup()
{
    setup_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
    initExchange();
    bindExchange();

    for (size_t i = 0; i < num_consumers; ++i)
    {
        try
        {
            pushReadBuffer(createReadBuffer());
            ++num_created_consumers;
        }
        catch (const AMQP::Exception & e)
        {
            std::cerr << e.what();
            throw;
        }
    }

    streaming_task->activateAndSchedule();
    heartbeat_task->activateAndSchedule();
}


void StorageRabbitMQ::shutdown()
{
    stream_cancelled = true;
    event_handler->stop();

    looping_task->deactivate();
    streaming_task->deactivate();
    heartbeat_task->deactivate();

    for (size_t i = 0; i < num_created_consumers; ++i)
    {
        popReadBuffer();
    }

    connection->close();
}


void StorageRabbitMQ::pushReadBuffer(ConsumerBufferPtr buffer)
{
    std::lock_guard lock(mutex);
    buffers.push_back(buffer);
    semaphore.set();
}


ConsumerBufferPtr StorageRabbitMQ::popReadBuffer()
{
    return popReadBuffer(std::chrono::milliseconds::zero());
}


ConsumerBufferPtr StorageRabbitMQ::popReadBuffer(std::chrono::milliseconds timeout)
{
    // Wait for the first free buffer; a zero timeout means wait indefinitely
    if (timeout == std::chrono::milliseconds::zero())
        semaphore.wait();
    else
    {
        if (!semaphore.tryWait(timeout.count()))
            return nullptr;
    }

    // Take the most recently returned buffer from the list
    std::lock_guard lock(mutex);
    auto buffer = buffers.back();
    buffers.pop_back();

    return buffer;
}


ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
{
    ChannelPtr consumer_channel = std::make_shared<AMQP::TcpChannel>(connection.get());

    return std::make_shared<ReadBufferFromRabbitMQConsumer>(
        consumer_channel, setup_channel, event_handler, consumer_exchange, ++consumer_id,
        queue_base, log, row_delimiter, hash_exchange, num_queues,
        deadletter_exchange, stream_cancelled);
}


ProducerBufferPtr StorageRabbitMQ::createWriteBuffer()
{
    return std::make_shared<WriteBufferToRabbitMQProducer>(
        parsed_address, global_context, login_password, routing_keys, exchange_name, exchange_type,
        ++producer_id, use_transactional_channel, persistent, log,
        row_delimiter ? std::optional<char>{row_delimiter} : std::nullopt, 1, 1024);
}


bool StorageRabbitMQ::checkDependencies(const StorageID & table_id)
{
    // Check if all dependencies are attached
    auto dependencies = DatabaseCatalog::instance().getDependencies(table_id);
    if (dependencies.empty())
        return true;

    // Check the dependencies are ready?
    for (const auto & db_tab : dependencies)
    {
        auto table = DatabaseCatalog::instance().tryGetTable(db_tab, global_context);
        if (!table)
            return false;

        // If it is a materialized view, check its target table
        auto * materialized_view = dynamic_cast<StorageMaterializedView *>(table.get());
        if (materialized_view && !materialized_view->tryGetTargetTable())
            return false;

        // Check all its dependencies
        if (!checkDependencies(db_tab))
            return false;
    }

    return true;
}


void StorageRabbitMQ::threadFunc()
{
    try
    {
        auto table_id = getStorageID();
        // Check if at least one direct dependency is attached
        size_t dependencies_count = DatabaseCatalog::instance().getDependencies(table_id).size();

        if (dependencies_count)
        {
            // Keep streaming as long as there are attached views and streaming is not cancelled
            while (!stream_cancelled && num_created_consumers > 0)
            {
                if (!checkDependencies(table_id))
                    break;

                LOG_DEBUG(log, "Started streaming to {} attached views", dependencies_count);

                if (!streamToViews())
                    break;
            }
        }
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }

    /// Wait for attached views
    if (!stream_cancelled)
        streaming_task->schedule();
}


bool StorageRabbitMQ::streamToViews()
{
    auto table_id = getStorageID();
    auto table = DatabaseCatalog::instance().getTable(table_id, global_context);
    if (!table)
        throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);

    // Create an INSERT query for streaming data
    auto insert = std::make_shared<ASTInsertQuery>();
    insert->table_id = table_id;

    InterpreterInsertQuery interpreter(insert, rabbitmq_context, false, true, true);
    auto block_io = interpreter.execute();

    // Create a stream for each consumer and join them in a union stream
    BlockInputStreams streams;
    streams.reserve(num_created_consumers);

    auto metadata_snapshot = getInMemoryMetadataPtr();
    auto column_names = block_io.out->getHeader().getNames();
    auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
    for (size_t i = 0; i < num_created_consumers; ++i)
    {
        auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(*this, metadata_snapshot, rabbitmq_context, column_names);
        auto converting_stream = std::make_shared<ConvertingBlockInputStream>(rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name);
        streams.emplace_back(converting_stream);

        // Limit read batch to maximum block size to allow DDL
        IBlockInputStream::LocalLimits limits;
        const Settings & settings = global_context.getSettingsRef();
        limits.speed_limits.max_execution_time = settings.stream_flush_interval_ms;
        limits.timeout_overflow_mode = OverflowMode::BREAK;
        rabbit_stream->setLimits(limits);
    }

    if (!loop_started)
    {
        loop_started = true;
        looping_task->activateAndSchedule();
    }

    // Join multiple streams if necessary
    BlockInputStreamPtr in;
    if (streams.size() > 1)
        in = std::make_shared<UnionBlockInputStream>(streams, nullptr, streams.size());
    else
        in = streams[0];

    std::atomic<bool> stub = {false};
    copyData(*in, *block_io.out, &stub);

    // Check whether the limits were applied during query execution
    bool limits_applied = false;
    const BlockStreamProfileInfo & info = in->getProfileInfo();
    limits_applied = info.hasAppliedLimit();

    return limits_applied;
}


void registerStorageRabbitMQ(StorageFactory & factory)
{
    auto creator_fn = [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;
        size_t args_count = engine_args.size();
        bool has_settings = args.storage_def->settings;

        RabbitMQSettings rabbitmq_settings;
        if (has_settings)
        {
            rabbitmq_settings.loadFromQuery(*args.storage_def);
        }

        // Positional engine arguments, when present, override the corresponding table settings
        String host_port = rabbitmq_settings.rabbitmq_host_port;
        if (args_count >= 1)
        {
            const auto * ast = engine_args[0]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::String)
            {
                host_port = safeGet<String>(ast->value);
            }
            else
            {
                throw Exception(String("RabbitMQ host:port must be a string"), ErrorCodes::BAD_ARGUMENTS);
            }
        }

        String routing_key_list = rabbitmq_settings.rabbitmq_routing_key_list.value;
        if (args_count >= 2)
        {
            engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.local_context);
            routing_key_list = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
        }

        Names routing_keys;
        boost::split(routing_keys, routing_key_list, [](char c){ return c == ','; });
        for (String & key : routing_keys)
        {
            boost::trim(key);
        }

        String exchange = rabbitmq_settings.rabbitmq_exchange_name.value;
        if (args_count >= 3)
        {
            engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);

            const auto * ast = engine_args[2]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::String)
            {
                exchange = safeGet<String>(ast->value);
            }
        }

        String format = rabbitmq_settings.rabbitmq_format.value;
        if (args_count >= 4)
        {
            engine_args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[3], args.local_context);

            const auto * ast = engine_args[3]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::String)
            {
                format = safeGet<String>(ast->value);
            }
            else
            {
                throw Exception("Format must be a string", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        char row_delimiter = rabbitmq_settings.rabbitmq_row_delimiter;
        if (args_count >= 5)
        {
            engine_args[4] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[4], args.local_context);

            const auto * ast = engine_args[4]->as<ASTLiteral>();
            String arg;
            if (ast && ast->value.getType() == Field::Types::String)
            {
                arg = safeGet<String>(ast->value);
            }
            else
            {
                throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
            }
            if (arg.size() > 1)
            {
                throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
            }
            else if (arg.empty())
            {
                row_delimiter = '\0';
            }
            else
            {
                row_delimiter = arg[0];
            }
        }

        String exchange_type = rabbitmq_settings.rabbitmq_exchange_type.value;
        if (args_count >= 6)
        {
            engine_args[5] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[5], args.local_context);

            const auto * ast = engine_args[5]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::String)
            {
                exchange_type = safeGet<String>(ast->value);
            }

            if (exchange_type != "fanout" && exchange_type != "direct" && exchange_type != "topic"
                    && exchange_type != "headers" && exchange_type != "consistent_hash")
                throw Exception("Invalid exchange type", ErrorCodes::BAD_ARGUMENTS);
        }

        UInt64 num_consumers = rabbitmq_settings.rabbitmq_num_consumers;
        if (args_count >= 7)
        {
            const auto * ast = engine_args[6]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::UInt64)
            {
                num_consumers = safeGet<UInt64>(ast->value);
            }
            else
            {
                throw Exception("Number of consumers must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        UInt64 num_queues = rabbitmq_settings.rabbitmq_num_queues;
        if (args_count >= 8)
        {
            const auto * ast = engine_args[7]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::UInt64)
            {
                // The eighth argument sets the number of queues, not the number of consumers
                num_queues = safeGet<UInt64>(ast->value);
            }
            else
            {
                throw Exception("Number of queues must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        bool use_transactional_channel = static_cast<bool>(rabbitmq_settings.rabbitmq_transactional_channel);
        if (args_count >= 9)
        {
            const auto * ast = engine_args[8]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::UInt64)
            {
                use_transactional_channel = static_cast<bool>(safeGet<UInt64>(ast->value));
            }
            else
            {
                throw Exception("Transactional channel parameter is a bool", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        String queue_base = rabbitmq_settings.rabbitmq_queue_base.value;
        if (args_count >= 10)
        {
            engine_args[9] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[9], args.local_context);

            const auto * ast = engine_args[9]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::String)
            {
                queue_base = safeGet<String>(ast->value);
            }
        }

        String deadletter_exchange = rabbitmq_settings.rabbitmq_deadletter_exchange.value;
        if (args_count >= 11)
        {
            engine_args[10] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[10], args.local_context);

            const auto * ast = engine_args[10]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::String)
            {
                deadletter_exchange = safeGet<String>(ast->value);
            }
        }

        bool persistent = static_cast<bool>(rabbitmq_settings.rabbitmq_persistent_mode);
        if (args_count >= 12)
        {
            const auto * ast = engine_args[11]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::UInt64)
            {
                persistent = static_cast<bool>(safeGet<UInt64>(ast->value));
            }
            else
            {
                throw Exception("Persistent mode parameter is a bool", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        return StorageRabbitMQ::create(
                args.table_id, args.context, args.columns,
                host_port, routing_keys, exchange, format, row_delimiter, exchange_type, num_consumers,
                num_queues, use_transactional_channel, queue_base, deadletter_exchange, persistent);
    };

    factory.registerStorage("RabbitMQ", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
}


NamesAndTypesList StorageRabbitMQ::getVirtuals() const
{
    return NamesAndTypesList{
            {"_exchange_name", std::make_shared<DataTypeString>()},
            {"_consumer_tag", std::make_shared<DataTypeString>()},
            {"_delivery_tag", std::make_shared<DataTypeUInt64>()},
            {"_redelivered", std::make_shared<DataTypeUInt8>()}
    };
}

}