From 8f2055b0a019add57278b48cfd7be3384de39d44 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 13 Jul 2020 04:11:35 +0300 Subject: [PATCH 1/2] Fix trash from RabbitMQ --- contrib/amqpcpp-cmake/CMakeLists.txt | 4 +-- .../Formats/Impl/ArrowBlockInputFormat.cpp | 3 +++ .../Formats/Impl/ArrowBlockOutputFormat.cpp | 3 +++ .../Formats/Impl/ArrowBufferedStreams.cpp | 3 +++ .../RabbitMQ/RabbitMQBlockInputStream.cpp | 4 +-- .../RabbitMQ/RabbitMQBlockInputStream.h | 11 ++++---- .../ReadBufferFromRabbitMQConsumer.cpp | 26 ++++++++++--------- .../RabbitMQ/ReadBufferFromRabbitMQConsumer.h | 1 - src/Storages/RabbitMQ/StorageRabbitMQ.cpp | 8 +++--- .../WriteBufferToRabbitMQProducer.cpp | 14 +++++----- .../RabbitMQ/WriteBufferToRabbitMQProducer.h | 1 - 11 files changed, 43 insertions(+), 35 deletions(-) diff --git a/contrib/amqpcpp-cmake/CMakeLists.txt b/contrib/amqpcpp-cmake/CMakeLists.txt index 452a5f7f6aa..ca79b84c523 100644 --- a/contrib/amqpcpp-cmake/CMakeLists.txt +++ b/contrib/amqpcpp-cmake/CMakeLists.txt @@ -24,7 +24,7 @@ set (SRCS add_library(amqp-cpp ${SRCS}) target_compile_options (amqp-cpp - PUBLIC + PRIVATE -Wno-old-style-cast -Wno-inconsistent-missing-destructor-override -Wno-deprecated @@ -38,7 +38,7 @@ target_compile_options (amqp-cpp -w ) -target_include_directories (amqp-cpp PUBLIC ${LIBRARY_DIR}/include) +target_include_directories (amqp-cpp SYSTEM PUBLIC ${LIBRARY_DIR}/include) target_link_libraries (amqp-cpp PUBLIC ssl) diff --git a/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp b/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp index 2873a5417ea..74aced62064 100644 --- a/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp @@ -11,6 +11,9 @@ #include "ArrowBufferedStreams.h" #include "ArrowColumnToCHColumn.h" +#pragma GCC diagnostic ignored "-Wdeprecated" + + namespace DB { diff --git a/src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp b/src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp index ef6bd315319..78248ebc2dd 100644 --- a/src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp +++ b/src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp @@ -8,6 +8,9 @@ #include "ArrowBufferedStreams.h" #include "CHColumnToArrowColumn.h" +#pragma GCC diagnostic ignored "-Wdeprecated" + + namespace DB { namespace ErrorCodes diff --git a/src/Processors/Formats/Impl/ArrowBufferedStreams.cpp b/src/Processors/Formats/Impl/ArrowBufferedStreams.cpp index 0eaedfa7742..a3635b7d5cf 100644 --- a/src/Processors/Formats/Impl/ArrowBufferedStreams.cpp +++ b/src/Processors/Formats/Impl/ArrowBufferedStreams.cpp @@ -11,6 +11,9 @@ #include +#pragma GCC diagnostic ignored "-Wdeprecated" + + namespace DB { diff --git a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp index e10a4eb0f96..83e3a02b478 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp +++ b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp @@ -15,13 +15,11 @@ RabbitMQBlockInputStream::RabbitMQBlockInputStream( StorageRabbitMQ & storage_, const StorageMetadataPtr & metadata_snapshot_, const Context & context_, - const Names & columns, - Poco::Logger * log_) + const Names & columns) : storage(storage_) , metadata_snapshot(metadata_snapshot_) , context(context_) , column_names(columns) - , log(log_) , non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized()) , virtual_header(metadata_snapshot->getSampleBlockForColumns({"_exchange"}, storage.getVirtuals(), storage.getStorageID())) { diff --git a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h index 7db80065608..f4ab76f72cf 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h +++ b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h @@ -8,6 +8,7 @@ namespace DB { + class RabbitMQBlockInputStream : public IBlockInputStream { @@ -16,8 +17,7 @@ public: StorageRabbitMQ & storage_, const StorageMetadataPtr & metadata_snapshot_, const Context & context_, - const Names & columns, - Poco::Logger * log_); + const Names & columns); ~RabbitMQBlockInputStream() override; @@ -32,9 +32,10 @@ private: StorageMetadataPtr metadata_snapshot; Context context; Names column_names; - Poco::Logger * log; - bool finished = false, claimed = false; - const Block non_virtual_header, virtual_header; + bool finished = false; + bool claimed = false; + const Block non_virtual_header; + const Block virtual_header; ConsumerBufferPtr buffer; }; diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp index be42749300d..6bd5c36e757 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp @@ -51,14 +51,14 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer( , exchange_name(exchange_name_) , routing_keys(routing_keys_) , channel_id(channel_id_) - , log(log_) - , row_delimiter(row_delimiter_) , bind_by_id(bind_by_id_) , num_queues(num_queues_) , exchange_type(exchange_type_) , local_exchange(local_exchange_) , local_default_exchange(local_exchange + "_" + ExchangeType::DIRECT) , local_hash_exchange(local_exchange + "_" + ExchangeType::HASH) + , log(log_) + , row_delimiter(row_delimiter_) , stopped(stopped_) , messages(QUEUE_SIZE * num_queues) { @@ -146,16 +146,18 @@ void ReadBufferFromRabbitMQConsumer::initExchange() * in current case we use hash exchange for binding to another exchange of some other type, which needs its own routing keys * of other types: headers, patterns and string-keys. This means that hash property must be changed. */ - AMQP::Table binding_arguments; - binding_arguments["hash-property"] = "message_id"; - - /// Declare exchange for sharding. - consumer_channel->declareExchange(local_hash_exchange, AMQP::consistent_hash, binding_arguments) - .onError([&](const char * message) { - local_exchange_declared = false; - LOG_ERROR(log, "Failed to declare {} exchange: {}", exchange_type, message); - }); + AMQP::Table binding_arguments; + binding_arguments["hash-property"] = "message_id"; + + /// Declare exchange for sharding. + consumer_channel->declareExchange(local_hash_exchange, AMQP::consistent_hash, binding_arguments) + .onError([&](const char * message) + { + local_exchange_declared = false; + LOG_ERROR(log, "Failed to declare {} exchange: {}", exchange_type, message); + }); + } /// Then bind client's exchange to sharding exchange (by keys, specified by the client): @@ -325,7 +327,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id) * It is important at this moment to make sure that queue bindings are created before any publishing can happen because * otherwise messages will be routed nowhere. */ - while (!default_bindings_created && !default_bindings_error || (exchange_type_set && !bindings_created && !bindings_error)) + while ((!default_bindings_created && !default_bindings_error) || (exchange_type_set && !bindings_created && !bindings_error)) { iterateEventLoop(); } diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h index 9dbb42bd648..51ef8ceba3e 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h @@ -60,7 +60,6 @@ private: Poco::Logger * log; char row_delimiter; - bool stalled = false; bool allowed = true; const std::atomic & stopped; diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index 9d5e7fcd652..a3d16087e34 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -75,11 +75,11 @@ StorageRabbitMQ::StorageRabbitMQ( , exchange_type(exchange_type_) , use_transactional_channel(use_transactional_channel_) , log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")")) - , semaphore(0, num_consumers_) + , parsed_address(parseAddress(global_context.getMacros()->expand(host_port_), 5672)) , login_password(std::make_pair( global_context.getConfigRef().getString("rabbitmq.username"), global_context.getConfigRef().getString("rabbitmq.password"))) - , parsed_address(parseAddress(global_context.getMacros()->expand(host_port_), 5672)) + , semaphore(0, num_consumers_) { loop = std::make_unique(); uv_loop_init(loop.get()); @@ -158,7 +158,7 @@ Pipes StorageRabbitMQ::read( for (size_t i = 0; i < num_created_consumers; ++i) { auto rabbit_stream = std::make_shared( - *this, metadata_snapshot, context, column_names, log); + *this, metadata_snapshot, context, column_names); auto converting_stream = std::make_shared( rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name); pipes.emplace_back(std::make_shared(converting_stream)); @@ -364,7 +364,7 @@ bool StorageRabbitMQ::streamToViews() auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()); for (size_t i = 0; i < num_created_consumers; ++i) { - auto rabbit_stream = std::make_shared(*this, metadata_snapshot, rabbitmq_context, column_names, log); + auto rabbit_stream = std::make_shared(*this, metadata_snapshot, rabbitmq_context, column_names); auto converting_stream = std::make_shared(rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name); streams.emplace_back(converting_stream); diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp index d96a1c02db8..57ef2405255 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp @@ -40,14 +40,14 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( , login_password(login_password_) , routing_key(routing_key_) , exchange_name(exchange_ + "_direct") - , log(log_) - , num_queues(num_queues_) , bind_by_id(bind_by_id_) + , num_queues(num_queues_) , use_transactional_channel(use_transactional_channel_) + , payloads(QUEUE_SIZE * num_queues) + , log(log_) , delim(delimiter) , max_rows(rows_per_message) , chunk_size(chunk_size_) - , payloads(QUEUE_SIZE * num_queues) { loop = std::make_unique(); @@ -187,19 +187,19 @@ void WriteBufferToRabbitMQProducer::finilizeProducer() answer_received = true; LOG_TRACE(log, "All messages were successfully published"); }) - .onError([&](const char * message) + .onError([&](const char * message1) { answer_received = true; wait_rollback = true; - LOG_TRACE(log, "Publishing not successful: {}", message); + LOG_TRACE(log, "Publishing not successful: {}", message1); producer_channel->rollbackTransaction() .onSuccess([&]() { wait_rollback = false; }) - .onError([&](const char * message) + .onError([&](const char * message2) { - LOG_ERROR(log, "Failed to rollback transaction: {}", message); + LOG_ERROR(log, "Failed to rollback transaction: {}", message2); wait_rollback = false; }); }); diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h index 26a52b0b41c..8dc5a32b7d7 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h @@ -68,7 +68,6 @@ private: const std::optional delim; const size_t max_rows; const size_t chunk_size; - size_t count_mes = 0; size_t rows = 0; std::list chunks; }; From 6edf43d1121b98b2a091495a05fae7c51e1460d1 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 13 Jul 2020 19:51:37 +0300 Subject: [PATCH 2/2] Fix build --- src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp | 1 + src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp | 1 + src/Processors/Formats/Impl/ArrowBufferedStreams.cpp | 1 + 3 files changed, 3 insertions(+) diff --git a/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp b/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp index 74aced62064..d18f4f4be28 100644 --- a/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp @@ -12,6 +12,7 @@ #include "ArrowColumnToCHColumn.h" #pragma GCC diagnostic ignored "-Wdeprecated" +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" namespace DB diff --git a/src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp b/src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp index 78248ebc2dd..6cfeca073a3 100644 --- a/src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp +++ b/src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp @@ -9,6 +9,7 @@ #include "CHColumnToArrowColumn.h" #pragma GCC diagnostic ignored "-Wdeprecated" +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" namespace DB diff --git a/src/Processors/Formats/Impl/ArrowBufferedStreams.cpp b/src/Processors/Formats/Impl/ArrowBufferedStreams.cpp index a3635b7d5cf..860e2b2559e 100644 --- a/src/Processors/Formats/Impl/ArrowBufferedStreams.cpp +++ b/src/Processors/Formats/Impl/ArrowBufferedStreams.cpp @@ -12,6 +12,7 @@ #include #pragma GCC diagnostic ignored "-Wdeprecated" +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" namespace DB