Merge pull request #12448 from ClickHouse/fix-trash-rabbitmq

Fix trash from RabbitMQ
This commit is contained in:
alexey-milovidov 2020-07-14 01:11:37 +03:00 committed by GitHub
commit 1893d89ce3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 46 additions and 35 deletions

View File

@ -24,7 +24,7 @@ set (SRCS
add_library(amqp-cpp ${SRCS})
target_compile_options (amqp-cpp
PUBLIC
PRIVATE
-Wno-old-style-cast
-Wno-inconsistent-missing-destructor-override
-Wno-deprecated
@ -38,7 +38,7 @@ target_compile_options (amqp-cpp
-w
)
target_include_directories (amqp-cpp PUBLIC ${LIBRARY_DIR}/include)
target_include_directories (amqp-cpp SYSTEM PUBLIC ${LIBRARY_DIR}/include)
target_link_libraries (amqp-cpp PUBLIC ssl)

View File

@ -11,6 +11,10 @@
#include "ArrowBufferedStreams.h"
#include "ArrowColumnToCHColumn.h"
#pragma GCC diagnostic ignored "-Wdeprecated"
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
namespace DB
{

View File

@ -8,6 +8,10 @@
#include "ArrowBufferedStreams.h"
#include "CHColumnToArrowColumn.h"
#pragma GCC diagnostic ignored "-Wdeprecated"
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
namespace DB
{
namespace ErrorCodes

View File

@ -11,6 +11,10 @@
#include <sys/stat.h>
#pragma GCC diagnostic ignored "-Wdeprecated"
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
namespace DB
{

View File

@ -15,13 +15,11 @@ RabbitMQBlockInputStream::RabbitMQBlockInputStream(
StorageRabbitMQ & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const Context & context_,
const Names & columns,
Poco::Logger * log_)
const Names & columns)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, context(context_)
, column_names(columns)
, log(log_)
, non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized())
, virtual_header(metadata_snapshot->getSampleBlockForColumns({"_exchange"}, storage.getVirtuals(), storage.getStorageID()))
{

View File

@ -8,6 +8,7 @@
namespace DB
{
class RabbitMQBlockInputStream : public IBlockInputStream
{
@ -16,8 +17,7 @@ public:
StorageRabbitMQ & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const Context & context_,
const Names & columns,
Poco::Logger * log_);
const Names & columns);
~RabbitMQBlockInputStream() override;
@ -32,9 +32,10 @@ private:
StorageMetadataPtr metadata_snapshot;
Context context;
Names column_names;
Poco::Logger * log;
bool finished = false, claimed = false;
const Block non_virtual_header, virtual_header;
bool finished = false;
bool claimed = false;
const Block non_virtual_header;
const Block virtual_header;
ConsumerBufferPtr buffer;
};

View File

@ -51,14 +51,14 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
, exchange_name(exchange_name_)
, routing_keys(routing_keys_)
, channel_id(channel_id_)
, log(log_)
, row_delimiter(row_delimiter_)
, bind_by_id(bind_by_id_)
, num_queues(num_queues_)
, exchange_type(exchange_type_)
, local_exchange(local_exchange_)
, local_default_exchange(local_exchange + "_" + ExchangeType::DIRECT)
, local_hash_exchange(local_exchange + "_" + ExchangeType::HASH)
, log(log_)
, row_delimiter(row_delimiter_)
, stopped(stopped_)
, messages(QUEUE_SIZE * num_queues)
{
@ -146,16 +146,18 @@ void ReadBufferFromRabbitMQConsumer::initExchange()
* in current case we use hash exchange for binding to another exchange of some other type, which needs its own routing keys
* of other types: headers, patterns and string-keys. This means that hash property must be changed.
*/
AMQP::Table binding_arguments;
binding_arguments["hash-property"] = "message_id";
/// Declare exchange for sharding.
consumer_channel->declareExchange(local_hash_exchange, AMQP::consistent_hash, binding_arguments)
.onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to declare {} exchange: {}", exchange_type, message);
});
AMQP::Table binding_arguments;
binding_arguments["hash-property"] = "message_id";
/// Declare exchange for sharding.
consumer_channel->declareExchange(local_hash_exchange, AMQP::consistent_hash, binding_arguments)
.onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to declare {} exchange: {}", exchange_type, message);
});
}
/// Then bind client's exchange to sharding exchange (by keys, specified by the client):
@ -325,7 +327,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
* It is important at this moment to make sure that queue bindings are created before any publishing can happen because
* otherwise messages will be routed nowhere.
*/
while (!default_bindings_created && !default_bindings_error || (exchange_type_set && !bindings_created && !bindings_error))
while ((!default_bindings_created && !default_bindings_error) || (exchange_type_set && !bindings_created && !bindings_error))
{
iterateEventLoop();
}

View File

@ -60,7 +60,6 @@ private:
Poco::Logger * log;
char row_delimiter;
bool stalled = false;
bool allowed = true;
const std::atomic<bool> & stopped;

View File

@ -75,11 +75,11 @@ StorageRabbitMQ::StorageRabbitMQ(
, exchange_type(exchange_type_)
, use_transactional_channel(use_transactional_channel_)
, log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")"))
, semaphore(0, num_consumers_)
, parsed_address(parseAddress(global_context.getMacros()->expand(host_port_), 5672))
, login_password(std::make_pair(
global_context.getConfigRef().getString("rabbitmq.username"),
global_context.getConfigRef().getString("rabbitmq.password")))
, parsed_address(parseAddress(global_context.getMacros()->expand(host_port_), 5672))
, semaphore(0, num_consumers_)
{
loop = std::make_unique<uv_loop_t>();
uv_loop_init(loop.get());
@ -158,7 +158,7 @@ Pipes StorageRabbitMQ::read(
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(
*this, metadata_snapshot, context, column_names, log);
*this, metadata_snapshot, context, column_names);
auto converting_stream = std::make_shared<ConvertingBlockInputStream>(
rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name);
pipes.emplace_back(std::make_shared<SourceFromInputStream>(converting_stream));
@ -364,7 +364,7 @@ bool StorageRabbitMQ::streamToViews()
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(*this, metadata_snapshot, rabbitmq_context, column_names, log);
auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(*this, metadata_snapshot, rabbitmq_context, column_names);
auto converting_stream = std::make_shared<ConvertingBlockInputStream>(rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name);
streams.emplace_back(converting_stream);

View File

@ -40,14 +40,14 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
, login_password(login_password_)
, routing_key(routing_key_)
, exchange_name(exchange_ + "_direct")
, log(log_)
, num_queues(num_queues_)
, bind_by_id(bind_by_id_)
, num_queues(num_queues_)
, use_transactional_channel(use_transactional_channel_)
, payloads(QUEUE_SIZE * num_queues)
, log(log_)
, delim(delimiter)
, max_rows(rows_per_message)
, chunk_size(chunk_size_)
, payloads(QUEUE_SIZE * num_queues)
{
loop = std::make_unique<uv_loop_t>();
@ -187,19 +187,19 @@ void WriteBufferToRabbitMQProducer::finilizeProducer()
answer_received = true;
LOG_TRACE(log, "All messages were successfully published");
})
.onError([&](const char * message)
.onError([&](const char * message1)
{
answer_received = true;
wait_rollback = true;
LOG_TRACE(log, "Publishing not successful: {}", message);
LOG_TRACE(log, "Publishing not successful: {}", message1);
producer_channel->rollbackTransaction()
.onSuccess([&]()
{
wait_rollback = false;
})
.onError([&](const char * message)
.onError([&](const char * message2)
{
LOG_ERROR(log, "Failed to rollback transaction: {}", message);
LOG_ERROR(log, "Failed to rollback transaction: {}", message2);
wait_rollback = false;
});
});

View File

@ -68,7 +68,6 @@ private:
const std::optional<char> delim;
const size_t max_rows;
const size_t chunk_size;
size_t count_mes = 0;
size_t rows = 0;
std::list<std::string> chunks;
};