diff --git a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp index 4742ea2a33a..be634a4fa3d 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp +++ b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp @@ -16,11 +16,13 @@ RabbitMQBlockInputStream::RabbitMQBlockInputStream( const StorageMetadataPtr & metadata_snapshot_, Context & context_, const Names & columns, + size_t max_block_size_, bool ack_in_suffix_) : storage(storage_) , metadata_snapshot(metadata_snapshot_) , context(context_) , column_names(columns) + , max_block_size(max_block_size_) , ack_in_suffix(ack_in_suffix_) , non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized()) , virtual_header(metadata_snapshot->getSampleBlockForColumns( @@ -51,12 +53,14 @@ void RabbitMQBlockInputStream::readPrefixImpl() } -bool RabbitMQBlockInputStream::needManualChannelUpdate() +bool RabbitMQBlockInputStream::needChannelUpdate() { if (!buffer) return false; - return !buffer->channelUsable() && buffer->channelAllowed() && storage.connectionRunning(); + ChannelPtr channel = buffer->getChannel(); + + return !channel || !channel->usable(); } @@ -83,7 +87,7 @@ Block RabbitMQBlockInputStream::readImpl() MutableColumns virtual_columns = virtual_header.cloneEmptyColumns(); auto input_format = FormatFactory::instance().getInputFormat( - storage.getFormatName(), *buffer, non_virtual_header, context, 1); + storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size); InputPort port(input_format->getPort().getHeader(), input_format.get()); connect(input_format->getPort(), port); @@ -164,7 +168,7 @@ Block RabbitMQBlockInputStream::readImpl() buffer->allowNext(); - if (buffer->queueEmpty() || !checkTimeLimit()) + if (total_rows >= max_block_size || buffer->queueEmpty() || buffer->consumerStopped() || !checkTimeLimit()) break; } @@ -189,7 +193,7 @@ void RabbitMQBlockInputStream::readSuffixImpl() bool RabbitMQBlockInputStream::sendAck() { - 
if (!buffer || !buffer->channelUsable()) + if (!buffer) return false; if (!buffer->ackMessages()) diff --git a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h index 4f52d64189e..0cfd9c2e350 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h +++ b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h @@ -18,6 +18,7 @@ public: const StorageMetadataPtr & metadata_snapshot_, Context & context_, const Names & columns, + size_t max_block_size_, bool ack_in_suffix = true); ~RabbitMQBlockInputStream() override; @@ -29,8 +30,8 @@ public: Block readImpl() override; void readSuffixImpl() override; + bool needChannelUpdate(); void updateChannel(); - bool needManualChannelUpdate(); bool sendAck(); private: @@ -38,6 +39,7 @@ private: StorageMetadataPtr metadata_snapshot; Context context; Names column_names; + const size_t max_block_size; bool ack_in_suffix; bool finished = false; diff --git a/src/Storages/RabbitMQ/RabbitMQSettings.h b/src/Storages/RabbitMQ/RabbitMQSettings.h index 110093ef2f3..2f8d6adfa16 100644 --- a/src/Storages/RabbitMQ/RabbitMQSettings.h +++ b/src/Storages/RabbitMQ/RabbitMQSettings.h @@ -2,7 +2,6 @@ #include - namespace DB { class ASTStorage; @@ -21,11 +20,11 @@ namespace DB M(String, rabbitmq_queue_base, "", "Base for queue names to be able to reopen non-empty queues in case of failure.", 0) \ M(String, rabbitmq_deadletter_exchange, "", "Exchange name to be passed as a dead-letter-exchange name.", 0) \ M(Bool, rabbitmq_persistent, false, "If set, delivery mode will be set to 2 (makes messages 'persistent', durable).", 0) \ + M(UInt64, rabbitmq_skip_broken_messages, 0, "Skip at least this number of broken messages from RabbitMQ per block", 0) \ M(UInt64, rabbitmq_max_block_size, 0, "Number of row collected before flushing data from RabbitMQ.", 0) \ M(Milliseconds, rabbitmq_flush_interval_ms, 0, "Timeout for flushing data from RabbitMQ.", 0) \ - DECLARE_SETTINGS_TRAITS(RabbitMQSettingsTraits, 
LIST_OF_RABBITMQ_SETTINGS) - +DECLARE_SETTINGS_TRAITS(RabbitMQSettingsTraits, LIST_OF_RABBITMQ_SETTINGS) struct RabbitMQSettings : public BaseSettings { diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp index 80a630117d8..074f74c91aa 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp @@ -50,7 +50,6 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer( , row_delimiter(row_delimiter_) , stopped(stopped_) , received(QUEUE_SIZE * num_queues) - , last_inserted_record(AckTracker()) { for (size_t queue_id = 0; queue_id < num_queues; ++queue_id) bindQueue(queue_id); @@ -165,10 +164,14 @@ bool ReadBufferFromRabbitMQConsumer::ackMessages() { /// Commit all received messages with delivery tags from last commited to last inserted if (!consumer_channel->ack(record.delivery_tag, AMQP::multiple)) + { + LOG_ERROR(log, "Failed to commit messages with delivery tags from last commited to {} on channel {}", + record.delivery_tag, channel_id); return false; + } prev_tag = record.delivery_tag; - LOG_TRACE(log, "Consumer acknowledged messages with deliveryTags up to {} on channel {}", record.delivery_tag, channel_id); + LOG_TRACE(log, "Consumer commited messages with deliveryTags up to {} on channel {}", record.delivery_tag, channel_id); } return true; @@ -207,6 +210,8 @@ void ReadBufferFromRabbitMQConsumer::setupChannel() consumer_channel->onError([&](const char * message) { + /// If here, then fatal error occurred on the channel and it is not usable anymore, need to close it + consumer_channel->close(); LOG_ERROR(log, "Channel {} error: {}", channel_id, message); channel_error.store(true); diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h index 5524a5b52cc..28c67e0314e 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h +++ 
b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h @@ -59,6 +59,7 @@ public: bool channelUsable() { return !channel_error.load(); } /// Do not allow to update channel untill current channel is properly set up and subscribed bool channelAllowed() { return !wait_subscription.load(); } + bool consumerStopped() { return stopped; } ChannelPtr & getChannel() { return consumer_channel; } void setupChannel(); diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index 4b013d11574..44c57a0db3f 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -107,8 +107,7 @@ StorageRabbitMQ::StorageRabbitMQ( setInMemoryMetadata(storage_metadata); rabbitmq_context.makeQueryContext(); - if (!schema_name.empty()) - rabbitmq_context.setSetting("format_schema", schema_name); + rabbitmq_context = addSettings(rabbitmq_context); /// One looping task for all consumers as they share the same connection == the same handler == the same event loop event_handler->updateLoopState(Loop::STOP); @@ -193,6 +192,19 @@ String StorageRabbitMQ::getTableBasedName(String name, const StorageID & table_i } +Context StorageRabbitMQ::addSettings(Context context) +{ + context.setSetting("input_format_skip_unknown_fields", true); + context.setSetting("input_format_allow_errors_ratio", 0.); + context.setSetting("input_format_allow_errors_num", rabbitmq_settings->rabbitmq_skip_broken_messages.value); + + if (!schema_name.empty()) + context.setSetting("format_schema", schema_name); + + return context; +} + + void StorageRabbitMQ::heartbeatFunc() { if (!stream_cancelled && event_handler->connectionRunning()) @@ -215,10 +227,11 @@ void StorageRabbitMQ::loopingFunc() */ void StorageRabbitMQ::deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop) { + if (stop_loop) + event_handler->updateLoopState(Loop::STOP); + if (task_mutex.try_lock()) { - if (stop_loop) - 
event_handler->updateLoopState(Loop::STOP); task->deactivate(); task_mutex.unlock(); @@ -232,6 +245,14 @@ void StorageRabbitMQ::deactivateTask(BackgroundSchedulePool::TaskHolder & task, } +size_t StorageRabbitMQ::getMaxBlockSize() + { + return rabbitmq_settings->rabbitmq_max_block_size.changed + ? rabbitmq_settings->rabbitmq_max_block_size.value + : (global_context.getSettingsRef().max_insert_block_size.value / num_consumers); + } + + void StorageRabbitMQ::initExchange() { /* Binding scheme is the following: client's exchange -> key bindings by routing key list -> bridge exchange (fanout) -> @@ -240,7 +261,15 @@ void StorageRabbitMQ::initExchange() setup_channel->declareExchange(exchange_name, exchange_type, AMQP::durable) .onError([&](const char * message) { - throw Exception("Unable to declare exchange. Make sure specified exchange is not already declared. Error: " + /* This error can be a result of attempt to declare exchange if it was already declared but + * 1) with different exchange type. In this case can + * - manually delete previously declared exchange and create a new one. + * - throw an error that the exchange with this name but another type is already declared and ask client to delete it himself + * if it is not needed anymore or use another exchange name. + * 2) with different exchange settings. This can only happen if client himself declared exchange with the same name and + * specified its own settings, which differ from this implementation. + */ + throw Exception("Unable to declare exchange (1). Make sure specified exchange is not already declared. Error: " + std::string(message), ErrorCodes::LOGICAL_ERROR); }); @@ -248,7 +277,8 @@ void StorageRabbitMQ::initExchange() setup_channel->declareExchange(bridge_exchange, AMQP::fanout, AMQP::durable + AMQP::autodelete) .onError([&](const char * message) { - throw Exception("Unable to declare exchange. 
Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR); + /// This error is not supposed to happen as this exchange name is always unique to type and its settings + throw Exception("Unable to declare exchange (2). Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR); }); if (!hash_exchange) @@ -267,13 +297,17 @@ void StorageRabbitMQ::initExchange() setup_channel->declareExchange(sharding_exchange, AMQP::consistent_hash, AMQP::durable + AMQP::autodelete, binding_arguments) .onError([&](const char * message) { - throw Exception("Unable to declare exchange. Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR); + /* This error can be a result of same reasons as above for exchange_name, i.e. it will mean that sharding exchange name appeared + * to be the same as some other exchange (which purpose is not for sharding). So probably actual error reason: queue_base parameter + * is bad. + */ + throw Exception("Unable to declare exchange (3). Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR); }); setup_channel->bindExchange(bridge_exchange, sharding_exchange, routing_keys[0]) .onError([&](const char * message) { - throw Exception("Unable to bind exchange. Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR); + throw Exception("Unable to bind exchange (2). Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR); }); consumer_exchange = sharding_exchange; @@ -302,7 +336,7 @@ void StorageRabbitMQ::bindExchange() }) .onError([&](const char * message) { - throw Exception("Unable to bind exchange. Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR); + throw Exception("Unable to bind exchange (1). Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR); }); } else if (exchange_type == AMQP::ExchangeType::fanout || exchange_type == AMQP::ExchangeType::consistent_hash) @@ -314,7 +348,7 @@ void StorageRabbitMQ::bindExchange() }) .onError([&](const char * message) { - throw Exception("Unable to bind exchange. 
Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR); + throw Exception("Unable to bind exchange (1). Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR); }); } else @@ -330,7 +364,7 @@ void StorageRabbitMQ::bindExchange() }) .onError([&](const char * message) { - throw Exception("Unable to bind exchange. Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR); + throw Exception("Unable to bind exchange (1). Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR); }); } } @@ -348,7 +382,7 @@ bool StorageRabbitMQ::restoreConnection(bool reconnecting) if (reconnecting) { - deactivateTask(heartbeat_task, 0, 0); + deactivateTask(heartbeat_task, false, false); connection->close(); /// Connection might be unusable, but not closed /* Connection is not closed immediately (firstly, all pending operations are completed, and then @@ -393,8 +427,8 @@ void StorageRabbitMQ::unbindExchange() * input streams are always created at startup, then they will also declare its own exchange bound queues, but they will not be visible * externally - client declares its own exchange-bound queues, from which to consume, so this means that if not disconnecting this local * queues, then messages will go both ways and in one of them they will remain not consumed. So need to disconnect local exchange - * bindings to remove redunadant message copies, but after that mv cannot work unless thoso bindings recreated. Recreating them is not - * difficult but very ugly and as probably nobody will do such thing - bindings will not be recreated. + * bindings to remove redunadant message copies, but after that mv cannot work unless those bindings are recreated. Recreating them is + * not difficult but very ugly and as probably nobody will do such thing - bindings will not be recreated. 
*/ std::call_once(flag, [&]() { @@ -435,20 +469,17 @@ Pipe StorageRabbitMQ::read( auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()); - auto modified_context = context; - if (!schema_name.empty()) - modified_context.setSetting("format_schema", schema_name); + auto modified_context = addSettings(context); + auto block_size = getMaxBlockSize(); bool update_channels = false; if (!event_handler->connectionRunning()) { if (event_handler->loopRunning()) - { - event_handler->updateLoopState(Loop::STOP); - looping_task->deactivate(); - } + deactivateTask(looping_task, false, true); - if ((update_channels = restoreConnection(true))) + update_channels = restoreConnection(true); + if (update_channels) heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS); } @@ -457,20 +488,20 @@ Pipe StorageRabbitMQ::read( for (size_t i = 0; i < num_created_consumers; ++i) { - auto rabbit_stream = std::make_shared(*this, metadata_snapshot, modified_context, column_names); + auto rabbit_stream = std::make_shared( + *this, metadata_snapshot, modified_context, column_names, block_size); /* It is a possible but rare case when channel gets into error state and does not also close connection, so need manual update. * But I believe that in current context and with local rabbitmq settings this will never happen and any channel error will also * close connection, but checking anyway (in second condition of if statement). 
This must be done here (and also in streamToViews()) * and not in readPrefix as it requires to stop heartbeats and looping tasks to avoid race conditions inside the library */ - if (update_channels || rabbit_stream->needManualChannelUpdate()) + if ((update_channels || rabbit_stream->needChannelUpdate()) && connection->usable()) { if (event_handler->loopRunning()) { - event_handler->updateLoopState(Loop::STOP); - looping_task->deactivate(); - heartbeat_task->deactivate(); + deactivateTask(looping_task, false, true); + deactivateTask(heartbeat_task, false, false); } rabbit_stream->updateChannel(); @@ -526,9 +557,9 @@ void StorageRabbitMQ::shutdown() stream_cancelled = true; wait_confirm.store(false); - deactivateTask(streaming_task, 1, 1); - deactivateTask(heartbeat_task, 1, 0); - deactivateTask(looping_task, 1, 1); + deactivateTask(streaming_task, true, false); + deactivateTask(heartbeat_task, true, false); + deactivateTask(looping_task, true, true); connection->close(); @@ -594,7 +625,7 @@ ProducerBufferPtr StorageRabbitMQ::createWriteBuffer() { return std::make_shared( parsed_address, global_context, login_password, routing_keys, exchange_name, exchange_type, - producer_id.fetch_add(1), unique_strbase, persistent, wait_confirm, log, + producer_id.fetch_add(1), persistent, wait_confirm, log, row_delimiter ? 
std::optional{row_delimiter} : std::nullopt, 1, 1024); } @@ -683,19 +714,25 @@ bool StorageRabbitMQ::streamToViews() if (!event_handler->loopRunning() && event_handler->connectionRunning()) looping_task->activateAndSchedule(); + auto block_size = getMaxBlockSize(); + // Create a stream for each consumer and join them in a union stream BlockInputStreams streams; streams.reserve(num_created_consumers); for (size_t i = 0; i < num_created_consumers; ++i) { - auto stream = std::make_shared(*this, metadata_snapshot, rabbitmq_context, column_names, false); + auto stream = std::make_shared( + *this, metadata_snapshot, rabbitmq_context, column_names, block_size, false); streams.emplace_back(stream); // Limit read batch to maximum block size to allow DDL IBlockInputStream::LocalLimits limits; - limits.speed_limits.max_execution_time = global_context.getSettingsRef().stream_flush_interval_ms; + limits.speed_limits.max_execution_time = rabbitmq_settings->rabbitmq_flush_interval_ms.changed + ? rabbitmq_settings->rabbitmq_flush_interval_ms + : global_context.getSettingsRef().stream_flush_interval_ms; + limits.timeout_overflow_mode = OverflowMode::BREAK; stream->setLimits(limits); @@ -715,7 +752,7 @@ bool StorageRabbitMQ::streamToViews() * races inside the library, but only in case any error occurs or connection is lost while ack is being sent */ if (event_handler->loopRunning()) - deactivateTask(looping_task, 0, 1); + deactivateTask(looping_task, false, true); if (!event_handler->connectionRunning()) { @@ -733,20 +770,37 @@ bool StorageRabbitMQ::streamToViews() } else { - deactivateTask(heartbeat_task, 0, 0); + deactivateTask(heartbeat_task, false, false); /// Commit for (auto & stream : streams) { + /* false is returned by the sendAck function in only two cases: + * 1) if connection failed. In this case all channels will be closed and will be unable to send ack. 
Also ack is made based on + * delivery tags, which are unique to channels, so if channels fail, those delivery tags will become invalid and there is + * no way to send specific ack from a different channel. Actually once the server realises that it has messages in a queue + * waiting for confirm from a channel which suddenly closed, it will immediately make those messages accessible to other + * consumers. So in this case duplicates are inevitable. + * 2) size of the sent frame (library's internal request interface) exceeds max frame - internal library error. This is more + * common for message frames, but not likely to happen to ack frame I suppose. So I do not believe it is likely to happen. + * Also in this case if channel didn't get closed - it is ok if failed to send ack, because the next attempt to send ack on + * the same channel will also commit all previously not-committed messages. Anyway I do not think that for ack frame this + * will ever happen. + */ if (!stream->as()->sendAck()) { - /* Almost any error with channel will lead to connection closure, but if so happens that channel errored and connection - * is not closed - also need to restore channels - */ - if (!stream->as()->needManualChannelUpdate()) - stream->as()->updateChannel(); + if (connection->usable()) + { + /* Almost any error with channel will lead to connection closure, but if so happens that channel errored and + * connection is not closed - also need to restore channels + */ + if (!stream->as()->needChannelUpdate()) + stream->as()->updateChannel(); + } else + { break; + } } } } @@ -809,8 +863,9 @@ void registerStorageRabbitMQ(StorageFactory & factory) CHECK_RABBITMQ_STORAGE_ARGUMENT(11, rabbitmq_deadletter_exchange) CHECK_RABBITMQ_STORAGE_ARGUMENT(12, rabbitmq_persistent) - CHECK_RABBITMQ_STORAGE_ARGUMENT(13, rabbitmq_max_block_size) - CHECK_RABBITMQ_STORAGE_ARGUMENT(14, rabbitmq_flush_interval_ms) + CHECK_RABBITMQ_STORAGE_ARGUMENT(13, rabbitmq_skip_broken_messages) + 
CHECK_RABBITMQ_STORAGE_ARGUMENT(14, rabbitmq_max_block_size) + CHECK_RABBITMQ_STORAGE_ARGUMENT(15, rabbitmq_flush_interval_ms) #undef CHECK_RABBITMQ_STORAGE_ARGUMENT diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.h b/src/Storages/RabbitMQ/StorageRabbitMQ.h index 522dfff9a23..e4e90abd98b 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.h +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.h @@ -104,12 +104,13 @@ private: std::mutex mutex, task_mutex; std::vector buffers; /// available buffers for RabbitMQ consumers - String unique_strbase; + String unique_strbase; /// to make unique consumer channel id String sharding_exchange, bridge_exchange, consumer_exchange; - std::once_flag flag; - size_t consumer_id = 0; - std::atomic producer_id = 1; - std::atomic wait_confirm = true, exchange_removed = false; + std::once_flag flag; /// remove exchange only once + size_t consumer_id = 0; /// counter for consumer buffer, needed for channel id + std::atomic producer_id = 1; /// counter for producer buffer, needed for channel id + std::atomic wait_confirm = true; /// needed to break waiting for confirmations for producer + std::atomic exchange_removed = false; ChannelPtr setup_channel; BackgroundSchedulePool::TaskHolder streaming_task; @@ -126,6 +127,7 @@ private: Names parseRoutingKeys(String routing_key_list); AMQP::ExchangeType defineExchangeType(String exchange_type_); + Context addSettings(Context context); size_t getMaxBlockSize(); String getTableBasedName(String name, const StorageID & table_id); void deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop); diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp index 945ebd5ac9a..38f62ff39b2 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp @@ -18,6 +18,11 @@ static const auto RETRIES_MAX = 20; static const auto BATCH = 1000; static const auto 
RETURNED_LIMIT = 50000; +namespace ErrorCodes +{ + extern const int CANNOT_CONNECT_RABBITMQ; +} + WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( std::pair & parsed_address_, Context & global_context, @@ -26,7 +31,6 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( const String & exchange_name_, const AMQP::ExchangeType exchange_type_, const size_t channel_id_base_, - const String channel_base_, const bool persistent_, std::atomic & wait_confirm_, Poco::Logger * log_, @@ -40,7 +44,6 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( , exchange_name(exchange_name_) , exchange_type(exchange_type_) , channel_id_base(std::to_string(channel_id_base_)) - , channel_base(channel_base_) , persistent(persistent_) , wait_confirm(wait_confirm_) , payloads(BATCH) @@ -56,7 +59,16 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( event_handler = std::make_unique(loop.get(), log); if (setupConnection(false)) + { setupChannel(); + } + else + { + if (!connection->closed()) + connection->close(true); + + throw Exception("Cannot connect to RabbitMQ", ErrorCodes::CANNOT_CONNECT_RABBITMQ); + } writing_task = global_context.getSchedulePool().createTask("RabbitMQWritingTask", [this]{ writingFunc(); }); writing_task->deactivate(); @@ -175,7 +187,7 @@ void WriteBufferToRabbitMQProducer::setupChannel() producer_channel->onReady([&]() { - channel_id = channel_id_base + std::to_string(channel_id_counter++) + "_" + channel_base; + channel_id = channel_id_base + std::to_string(channel_id_counter++); LOG_DEBUG(log, "Producer's channel {} is ready", channel_id); /* if persistent == true, onAck is received when message is persisted to disk or when it is consumed on every queue. 
If fails, @@ -187,17 +199,17 @@ void WriteBufferToRabbitMQProducer::setupChannel() producer_channel->confirmSelect() .onAck([&](uint64_t acked_delivery_tag, bool multiple) { - removeConfirmed(acked_delivery_tag, multiple, false); + removeRecord(acked_delivery_tag, multiple, false); }) .onNack([&](uint64_t nacked_delivery_tag, bool multiple, bool /* requeue */) { - removeConfirmed(nacked_delivery_tag, multiple, true); + removeRecord(nacked_delivery_tag, multiple, true); }); }); } -void WriteBufferToRabbitMQProducer::removeConfirmed(UInt64 received_delivery_tag, bool multiple, bool republish) +void WriteBufferToRabbitMQProducer::removeRecord(UInt64 received_delivery_tag, bool multiple, bool republish) { auto record_iter = delivery_record.find(received_delivery_tag); @@ -292,7 +304,6 @@ void WriteBufferToRabbitMQProducer::publish(ConcurrentBoundedQueueusable() && event_handler->connectionRunning()) || (!event_handler->connectionRunning() && setupConnection(true))) diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h index 1ab90cb0b1d..a8e94070dbd 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h @@ -25,7 +25,6 @@ public: const String & exchange_name_, const AMQP::ExchangeType exchange_type_, const size_t channel_id_, - const String channel_base_, const bool persistent_, std::atomic & wait_confirm_, Poco::Logger * log_, @@ -46,7 +45,7 @@ private: void writingFunc(); bool setupConnection(bool reconnecting); void setupChannel(); - void removeConfirmed(UInt64 received_delivery_tag, bool multiple, bool republish); + void removeRecord(UInt64 received_delivery_tag, bool multiple, bool republish); void publish(ConcurrentBoundedQueue> & message, bool republishing); std::pair parsed_address; @@ -54,9 +53,12 @@ private: const Names routing_keys; const String exchange_name; AMQP::ExchangeType exchange_type; - const String 
channel_id_base; - const String channel_base; + const String channel_id_base; /// Serial number of current producer buffer const bool persistent; + + /* false: when shutdown is called; needed because table might be dropped before all acks are received + * true: in all other cases + */ std::atomic & wait_confirm; AMQP::Table key_arguments; @@ -67,15 +69,48 @@ private: std::unique_ptr connection; std::unique_ptr producer_channel; - String channel_id; - ConcurrentBoundedQueue> payloads, returned; - UInt64 delivery_tag = 0; - std::atomic wait_all = true; - std::atomic wait_num = 0; - UInt64 payload_counter = 0; - std::map> delivery_record; + /// Channel errors lead to channel closure, need to count number of recreated channels to update channel id UInt64 channel_id_counter = 0; + /// channel id which contains id of current producer buffer and serial number of recreated channel in this buffer + String channel_id; + + /* payloads.queue: + * - payloads are pushed to queue in countRow and popped by another thread in writingFunc, each payload gets into queue only once + * returned.queue: + * - payloads are pushed to queue: + * 1) inside channel->onError() callback if channel becomes unusable and the record of pending acknowledgements from server + * is non-empty. + * 2) inside removeRecord() if received nack() - negative acknowledgement from the server that message failed to be written + * to disk or it was unable to reach the queue. + * - payloads are popped from the queue once republished + */ + ConcurrentBoundedQueue> payloads, returned; + + /* Counter of current delivery on a current channel. Delivery tags are scoped per channel. The server attaches a delivery tag for each + * published message - a serial number of delivery on current channel. Delivery tag is a way of server to notify publisher if it was + * able or unable to process delivery, i.e. it sends back a response with a corresponding delivery tag. 
+ */ + UInt64 delivery_tag = 0; + + /* false: message delivery successfully ended: publisher received confirm from server that all published + * 1) persistent messages were written to disk + * 2) non-persistent messages reached the queue + * true: continue to process deliveries and returned messages + */ + bool wait_all = true; + + /* false: until writeSuffix is called + * true: means payloads.queue will not grow anymore + */ + std::atomic wait_num = 0; + + /// Needed to fill messageID property + UInt64 payload_counter = 0; + + /// Record of pending acknowledgements from the server; its size never exceeds size of returned.queue + std::map> delivery_record; + Poco::Logger * log; const std::optional delim; const size_t max_rows; diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index 68f7bb506e6..9dbaddf33f4 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -1547,91 +1547,6 @@ def test_rabbitmq_virtual_columns_with_materialized_view(rabbitmq_cluster): assert TSV(result) == TSV(expected) -@pytest.mark.timeout(420) -def test_rabbitmq_no_loss_on_table_drop(rabbitmq_cluster): - instance.query(''' - CREATE TABLE test.rabbitmq_queue_resume (key UInt64, value UInt64) - ENGINE = RabbitMQ - SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', - rabbitmq_exchange_name = 'queue_resume', - rabbitmq_exchange_type = 'direct', - rabbitmq_routing_key_list = 'queue_resume', - rabbitmq_queue_base = 'queue_resume', - rabbitmq_format = 'JSONEachRow', - rabbitmq_row_delimiter = '\\n'; - DROP TABLE IF EXISTS test.view; - DROP TABLE IF EXISTS test.consumer; - CREATE TABLE test.view (key UInt64, value UInt64) - ENGINE = MergeTree - ORDER BY key; - CREATE MATERIALIZED VIEW test.consumer TO test.view AS - SELECT * FROM test.rabbitmq_queue_resume; - ''') - - i = [0] - messages_num = 10000 - - credentials = pika.PlainCredentials('root', 'clickhouse') - parameters = 
pika.ConnectionParameters('localhost', 5672, '/', credentials) - def produce(): - connection = pika.BlockingConnection(parameters) - channel = connection.channel() - messages = [] - for _ in range(messages_num): - messages.append(json.dumps({'key': i[0], 'value': i[0]})) - i[0] += 1 - for message in messages: - channel.basic_publish(exchange='queue_resume', routing_key='queue_resume', body=message, - properties=pika.BasicProperties(delivery_mode = 2)) - connection.close() - - threads = [] - threads_num = 20 - for _ in range(threads_num): - threads.append(threading.Thread(target=produce)) - for thread in threads: - time.sleep(random.uniform(0, 1)) - thread.start() - - while int(instance.query('SELECT count() FROM test.view')) == 0: - time.sleep(1) - - instance.query(''' - DROP TABLE IF EXISTS test.rabbitmq_queue_resume; - ''') - - for thread in threads: - thread.join() - - collected = int(instance.query('SELECT count() FROM test.view')) - - instance.query(''' - CREATE TABLE test.rabbitmq_queue_resume (key UInt64, value UInt64) - ENGINE = RabbitMQ - SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', - rabbitmq_exchange_name = 'queue_resume', - rabbitmq_exchange_type = 'direct', - rabbitmq_routing_key_list = 'queue_resume', - rabbitmq_queue_base = 'queue_resume', - rabbitmq_format = 'JSONEachRow', - rabbitmq_row_delimiter = '\\n'; - ''') - - while True: - result1 = instance.query('SELECT count() FROM test.view') - time.sleep(1) - if int(result1) == messages_num * threads_num: - break - - instance.query(''' - DROP TABLE test.rabbitmq_queue_resume; - DROP TABLE test.consumer; - DROP TABLE test.view; - ''') - - assert int(result1) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result) - - @pytest.mark.timeout(420) def test_rabbitmq_many_consumers_to_each_queue(rabbitmq_cluster): instance.query(''' @@ -1856,6 +1771,85 @@ def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster): assert int(result) == messages_num, 'ClickHouse 
lost some messages: {}'.format(result) +@pytest.mark.timeout(300) +def test_rabbitmq_commit_on_block_write(rabbitmq_cluster): + instance.query(''' + DROP TABLE IF EXISTS test.view; + DROP TABLE IF EXISTS test.consumer; + CREATE TABLE test.rabbitmq (key UInt64, value UInt64) + ENGINE = RabbitMQ + SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', + rabbitmq_exchange_name = 'block', + rabbitmq_format = 'JSONEachRow', + rabbitmq_queue_base = 'block', + rabbitmq_max_block_size = 100, + rabbitmq_row_delimiter = '\\n'; + CREATE TABLE test.view (key UInt64, value UInt64) + ENGINE = MergeTree() + ORDER BY key; + CREATE MATERIALIZED VIEW test.consumer TO test.view AS + SELECT * FROM test.rabbitmq; + ''') + + credentials = pika.PlainCredentials('root', 'clickhouse') + parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + + cancel = threading.Event() + + i = [0] + def produce(): + while not cancel.is_set(): + messages = [] + for _ in range(101): + messages.append(json.dumps({'key': i[0], 'value': i[0]})) + i[0] += 1 + for message in messages: + channel.basic_publish(exchange='block', routing_key='', body=message) + + rabbitmq_thread = threading.Thread(target=produce) + rabbitmq_thread.start() + + while int(instance.query('SELECT count() FROM test.view')) == 0: + time.sleep(1) + + cancel.set() + + instance.query(''' + DROP TABLE test.rabbitmq; + ''') + + while int(instance.query("SELECT count() FROM system.tables WHERE database='test' AND name='rabbitmq'")) == 1: + time.sleep(1) + + instance.query(''' + CREATE TABLE test.rabbitmq (key UInt64, value UInt64) + ENGINE = RabbitMQ + SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', + rabbitmq_exchange_name = 'block', + rabbitmq_format = 'JSONEachRow', + rabbitmq_max_block_size = 100, + rabbitmq_queue_base = 'block', + rabbitmq_row_delimiter = '\\n'; + ''') + + while int(instance.query('SELECT uniqExact(key) FROM test.view')) < 
i[0]: + time.sleep(1) + + result = int(instance.query('SELECT count() == uniqExact(key) FROM test.view')) + + instance.query(''' + DROP TABLE test.consumer; + DROP TABLE test.view; + ''') + + rabbitmq_thread.join() + connection.close() + + assert result == 1, 'Messages from RabbitMQ get duplicated!' + + if __name__ == '__main__': cluster.start() raw_input("Cluster created, press any key to destroy...")