#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { enum { Connection_setup_sleep = 200, Connection_setup_retries_max = 1000 }; namespace ErrorCodes { extern const int LOGICAL_ERROR; extern const int BAD_ARGUMENTS; } StorageRabbitMQ::StorageRabbitMQ( const StorageID & table_id_, Context & context_, const ColumnsDescription & columns_, const String & host_port_, const Names & routing_keys_, const String & exchange_name_, const String & format_name_, char row_delimiter_, const String & exchange_type_, size_t num_consumers_, size_t num_queues_, const bool use_transactional_channel_) : IStorage(table_id_) , global_context(context_.getGlobalContext()) , rabbitmq_context(Context(global_context)) , routing_keys(global_context.getMacros()->expand(routing_keys_)) , exchange_name(exchange_name_) , format_name(global_context.getMacros()->expand(format_name_)) , row_delimiter(row_delimiter_) , num_consumers(num_consumers_) , num_queues(num_queues_) , exchange_type(exchange_type_) , use_transactional_channel(use_transactional_channel_) , log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")")) , semaphore(0, num_consumers_) , login_password(std::make_pair( rabbitmq_context.getConfigRef().getString("rabbitmq_username", "root"), rabbitmq_context.getConfigRef().getString("rabbitmq_password", "clickhouse"))) , parsed_address(parseAddress(global_context.getMacros()->expand(host_port_), 5672)) , evbase(event_base_new()) , eventHandler(evbase, log) , connection(&eventHandler, AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/")) { size_t cnt_retries = 0; while (!connection.ready() && ++cnt_retries != 
        Connection_setup_retries_max)
    {
        event_base_loop(evbase, EVLOOP_NONBLOCK | EVLOOP_ONCE);
        std::this_thread::sleep_for(std::chrono::milliseconds(Connection_setup_sleep));
    }

    if (!connection.ready())
    {
        LOG_ERROR(log, "Cannot set up connection for consumer");
    }

    rabbitmq_context.makeQueryContext();
    setColumns(columns_);

    task = global_context.getSchedulePool().createTask(log->name(), [this]{ threadFunc(); });
    task->deactivate();

    bind_by_id = num_consumers > 1 || num_queues > 1;
}


Pipes StorageRabbitMQ::read(
        const Names & column_names,
        const SelectQueryInfo & /* query_info */,
        const Context & context,
        QueryProcessingStage::Enum /* processed_stage */,
        size_t /* max_block_size */,
        unsigned /* num_streams */)
{
    if (num_created_consumers == 0)
        return {};

    Pipes pipes;
    pipes.reserve(num_created_consumers);

    for (size_t i = 0; i < num_created_consumers; ++i)
    {
        pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::make_shared<RabbitMQBlockInputStream>(
                        *this, context, column_names, log)));
    }

    LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
    return pipes;
}


BlockOutputStreamPtr StorageRabbitMQ::write(const ASTPtr &, const Context & context)
{
    return std::make_shared<RabbitMQBlockOutputStream>(*this, context);
}


void StorageRabbitMQ::startup()
{
    for (size_t i = 0; i < num_consumers; ++i)
    {
        try
        {
            pushReadBuffer(createReadBuffer());
            ++num_created_consumers;
        }
        catch (const AMQP::Exception & e)
        {
            std::cerr << e.what();
            throw;
        }
    }

    task->activateAndSchedule();
}


void StorageRabbitMQ::shutdown()
{
    stream_cancelled = true;

    for (size_t i = 0; i < num_created_consumers; ++i)
    {
        popReadBuffer();
    }

    connection.close();
    task->deactivate();
}


void StorageRabbitMQ::pushReadBuffer(ConsumerBufferPtr buffer)
{
    std::lock_guard lock(mutex);
    buffers.push_back(buffer);
    semaphore.set();
}


ConsumerBufferPtr StorageRabbitMQ::popReadBuffer()
{
    return popReadBuffer(std::chrono::milliseconds::zero());
}


ConsumerBufferPtr StorageRabbitMQ::popReadBuffer(std::chrono::milliseconds timeout)
{
    // Wait for the first free buffer
    if (timeout ==
        std::chrono::milliseconds::zero())
        semaphore.wait();
    else
    {
        if (!semaphore.tryWait(timeout.count()))
            return nullptr;
    }

    // Take the first available buffer from the list
    std::lock_guard lock(mutex);
    auto buffer = buffers.back();
    buffers.pop_back();

    return buffer;
}


ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
{
    if (update_channel_id)
        next_channel_id += num_queues;
    update_channel_id = true;

    ChannelPtr consumer_channel = std::make_shared<AMQP::TcpChannel>(&connection);

    auto table_id = getStorageID();
    String table_name = table_id.getNameForLogs();

    return std::make_shared<ReadBufferFromRabbitMQConsumer>(
            consumer_channel, eventHandler, exchange_name, routing_keys,
            next_channel_id, log, row_delimiter, bind_by_id, num_queues,
            exchange_type, table_name, stream_cancelled);
}


ProducerBufferPtr StorageRabbitMQ::createWriteBuffer()
{
    String producer_exchange = exchange_type == "default" ? exchange_name : exchange_name + "_default";

    return std::make_shared<WriteBufferToRabbitMQProducer>(
            parsed_address, login_password, routing_keys[0], producer_exchange,
            log, num_consumers * num_queues, bind_by_id, use_transactional_channel,
            row_delimiter ? std::optional{row_delimiter} : std::nullopt, 1, 1024);
}


bool StorageRabbitMQ::checkDependencies(const StorageID & table_id)
{
    // Check if all dependencies are attached
    auto dependencies = DatabaseCatalog::instance().getDependencies(table_id);
    if (dependencies.empty())
        return true;

    // Check the dependencies are ready?
    for (const auto & db_tab : dependencies)
    {
        auto table = DatabaseCatalog::instance().tryGetTable(db_tab, global_context);
        if (!table)
            return false;

        // If it is a materialized view, check its target table
        auto * materialized_view = dynamic_cast<StorageMaterializedView *>(table.get());
        if (materialized_view && !materialized_view->tryGetTargetTable())
            return false;

        // Check all its dependencies
        if (!checkDependencies(db_tab))
            return false;
    }

    return true;
}


void StorageRabbitMQ::threadFunc()
{
    try
    {
        auto table_id = getStorageID();

        // Check if at least one direct dependency is attached
        size_t dependencies_count = DatabaseCatalog::instance().getDependencies(table_id).size();

        if (dependencies_count)
        {
            // Keep streaming as long as there are attached views and streaming is not cancelled
            while (!stream_cancelled && num_created_consumers > 0)
            {
                if (!checkDependencies(table_id))
                    break;

                LOG_DEBUG(log, "Started streaming to {} attached views", dependencies_count);

                if (!streamToViews())
                    break;
            }
        }
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }

    /// Wait for attached views
    if (!stream_cancelled)
        task->activateAndSchedule();
}


bool StorageRabbitMQ::streamToViews()
{
    auto table_id = getStorageID();
    auto table = DatabaseCatalog::instance().getTable(table_id, global_context);
    if (!table)
        throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);

    // Create an INSERT query for streaming data
    auto insert = std::make_shared<ASTInsertQuery>();
    insert->table_id = table_id;

    InterpreterInsertQuery interpreter(insert, rabbitmq_context, false, true, true);
    auto block_io = interpreter.execute();

    // Create a stream for each consumer and join them in a union stream
    BlockInputStreams streams;
    streams.reserve(num_created_consumers);

    for (size_t i = 0; i < num_created_consumers; ++i)
    {
        auto stream = std::make_shared<RabbitMQBlockInputStream>(
                *this, rabbitmq_context, block_io.out->getHeader().getNames(), log);
        streams.emplace_back(stream);

        // Limit read batch to maximum block size to allow DDL
        IBlockInputStream::LocalLimits limits;
        const Settings & settings = global_context.getSettingsRef();
        limits.speed_limits.max_execution_time = settings.stream_flush_interval_ms;
        limits.timeout_overflow_mode = OverflowMode::BREAK;
        stream->setLimits(limits);
    }

    // Join multiple streams if necessary
    BlockInputStreamPtr in;
    if (streams.size() > 1)
        in = std::make_shared<UnionBlockInputStream>(streams, nullptr, streams.size());
    else
        in = streams[0];

    std::atomic<bool> stub = {false};
    copyData(*in, *block_io.out, &stub);

    // Check whether the limits were applied during query execution
    bool limits_applied = false;
    const BlockStreamProfileInfo & info = in->getProfileInfo();
    limits_applied = info.hasAppliedLimit();

    return limits_applied;
}


void registerStorageRabbitMQ(StorageFactory & factory)
{
    auto creator_fn = [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;
        size_t args_count = engine_args.size();
        bool has_settings = args.storage_def->settings;

        RabbitMQSettings rabbitmq_settings;
        if (has_settings)
        {
            rabbitmq_settings.loadFromQuery(*args.storage_def);
        }

        String host_port = rabbitmq_settings.rabbitmq_host_port;
        if (args_count >= 1)
        {
            const auto * ast = engine_args[0]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::String)
            {
                host_port = safeGet<String>(ast->value);
            }
            else
            {
                throw Exception(String("RabbitMQ host:port must be a string"), ErrorCodes::BAD_ARGUMENTS);
            }
        }

        String routing_key_list = rabbitmq_settings.rabbitmq_routing_key_list.value;
        if (args_count >= 2)
        {
            engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.local_context);
            routing_key_list = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
        }

        Names routing_keys;
        boost::split(routing_keys, routing_key_list, [](char c){ return c == ','; });
        for (String & key : routing_keys)
        {
            boost::trim(key);
        }

        String exchange = rabbitmq_settings.rabbitmq_exchange_name.value;
        if (args_count >= 3)
        {
            engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);

            const auto * ast =
                engine_args[2]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::String)
            {
                exchange = safeGet<String>(ast->value);
            }
        }

        String format = rabbitmq_settings.rabbitmq_format.value;
        if (args_count >= 4)
        {
            engine_args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[3], args.local_context);

            const auto * ast = engine_args[3]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::String)
            {
                format = safeGet<String>(ast->value);
            }
            else
            {
                throw Exception("Format must be a string", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        char row_delimiter = rabbitmq_settings.rabbitmq_row_delimiter;
        if (args_count >= 5)
        {
            engine_args[4] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[4], args.local_context);

            const auto * ast = engine_args[4]->as<ASTLiteral>();
            String arg;
            if (ast && ast->value.getType() == Field::Types::String)
            {
                arg = safeGet<String>(ast->value);
            }
            else
            {
                throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
            }

            if (arg.size() > 1)
            {
                throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
            }
            else if (arg.empty())
            {
                row_delimiter = '\0';
            }
            else
            {
                row_delimiter = arg[0];
            }
        }

        String exchange_type = rabbitmq_settings.rabbitmq_exchange_type.value;
        if (args_count >= 6)
        {
            engine_args[5] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[5], args.local_context);

            const auto * ast = engine_args[5]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::String)
            {
                exchange_type = safeGet<String>(ast->value);
            }

            if (exchange_type != "fanout" && exchange_type != "direct" && exchange_type != "topic"
                    && exchange_type != "consistent_hash")
            {
                if (exchange_type == "headers")
                    throw Exception("Headers exchange is not supported", ErrorCodes::BAD_ARGUMENTS);
                else
                    throw Exception("Invalid exchange type", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        UInt64 num_consumers = rabbitmq_settings.rabbitmq_num_consumers;
        if (args_count >= 7)
        {
            const auto * ast = engine_args[6]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::UInt64)
            {
                num_consumers = safeGet<UInt64>(ast->value);
            }
            else
            {
                throw Exception("Number of consumers must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        UInt64 num_queues = rabbitmq_settings.rabbitmq_num_queues;
        if (args_count >= 8)
        {
            const auto * ast = engine_args[7]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::UInt64)
            {
                num_queues = safeGet<UInt64>(ast->value);
            }
            else
            {
                throw Exception("Number of queues must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        bool use_transactional_channel = static_cast<bool>(rabbitmq_settings.rabbitmq_transactional_channel);
        if (args_count >= 9)
        {
            const auto * ast = engine_args[8]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::UInt64)
            {
                use_transactional_channel = static_cast<bool>(safeGet<UInt64>(ast->value));
            }
            else
            {
                throw Exception("Transactional channel parameter is a bool", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        return StorageRabbitMQ::create(
                args.table_id, args.context, args.columns,
                host_port, routing_keys, exchange, format, row_delimiter, exchange_type,
                num_consumers, num_queues, use_transactional_channel);
    };

    factory.registerStorage("RabbitMQ", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
}


NamesAndTypesList StorageRabbitMQ::getVirtuals() const
{
    return NamesAndTypesList{
        {"_exchange", std::make_shared<DataTypeString>()}
    };
}

}