mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Fix trash from RabbitMQ
This commit is contained in:
parent
5afe441324
commit
8f2055b0a0
@ -24,7 +24,7 @@ set (SRCS
|
||||
add_library(amqp-cpp ${SRCS})
|
||||
|
||||
target_compile_options (amqp-cpp
|
||||
PUBLIC
|
||||
PRIVATE
|
||||
-Wno-old-style-cast
|
||||
-Wno-inconsistent-missing-destructor-override
|
||||
-Wno-deprecated
|
||||
@ -38,7 +38,7 @@ target_compile_options (amqp-cpp
|
||||
-w
|
||||
)
|
||||
|
||||
target_include_directories (amqp-cpp PUBLIC ${LIBRARY_DIR}/include)
|
||||
target_include_directories (amqp-cpp SYSTEM PUBLIC ${LIBRARY_DIR}/include)
|
||||
|
||||
target_link_libraries (amqp-cpp PUBLIC ssl)
|
||||
|
||||
|
@ -11,6 +11,9 @@
|
||||
#include "ArrowBufferedStreams.h"
|
||||
#include "ArrowColumnToCHColumn.h"
|
||||
|
||||
#pragma GCC diagnostic ignored "-Wdeprecated"
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
@ -8,6 +8,9 @@
|
||||
#include "ArrowBufferedStreams.h"
|
||||
#include "CHColumnToArrowColumn.h"
|
||||
|
||||
#pragma GCC diagnostic ignored "-Wdeprecated"
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
|
@ -11,6 +11,9 @@
|
||||
|
||||
#include <sys/stat.h>
|
||||
|
||||
#pragma GCC diagnostic ignored "-Wdeprecated"
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
@ -15,13 +15,11 @@ RabbitMQBlockInputStream::RabbitMQBlockInputStream(
|
||||
StorageRabbitMQ & storage_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const Context & context_,
|
||||
const Names & columns,
|
||||
Poco::Logger * log_)
|
||||
const Names & columns)
|
||||
: storage(storage_)
|
||||
, metadata_snapshot(metadata_snapshot_)
|
||||
, context(context_)
|
||||
, column_names(columns)
|
||||
, log(log_)
|
||||
, non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized())
|
||||
, virtual_header(metadata_snapshot->getSampleBlockForColumns({"_exchange"}, storage.getVirtuals(), storage.getStorageID()))
|
||||
{
|
||||
|
@ -8,6 +8,7 @@
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class RabbitMQBlockInputStream : public IBlockInputStream
|
||||
{
|
||||
|
||||
@ -16,8 +17,7 @@ public:
|
||||
StorageRabbitMQ & storage_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const Context & context_,
|
||||
const Names & columns,
|
||||
Poco::Logger * log_);
|
||||
const Names & columns);
|
||||
|
||||
~RabbitMQBlockInputStream() override;
|
||||
|
||||
@ -32,9 +32,10 @@ private:
|
||||
StorageMetadataPtr metadata_snapshot;
|
||||
Context context;
|
||||
Names column_names;
|
||||
Poco::Logger * log;
|
||||
bool finished = false, claimed = false;
|
||||
const Block non_virtual_header, virtual_header;
|
||||
bool finished = false;
|
||||
bool claimed = false;
|
||||
const Block non_virtual_header;
|
||||
const Block virtual_header;
|
||||
|
||||
ConsumerBufferPtr buffer;
|
||||
};
|
||||
|
@ -51,14 +51,14 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
|
||||
, exchange_name(exchange_name_)
|
||||
, routing_keys(routing_keys_)
|
||||
, channel_id(channel_id_)
|
||||
, log(log_)
|
||||
, row_delimiter(row_delimiter_)
|
||||
, bind_by_id(bind_by_id_)
|
||||
, num_queues(num_queues_)
|
||||
, exchange_type(exchange_type_)
|
||||
, local_exchange(local_exchange_)
|
||||
, local_default_exchange(local_exchange + "_" + ExchangeType::DIRECT)
|
||||
, local_hash_exchange(local_exchange + "_" + ExchangeType::HASH)
|
||||
, log(log_)
|
||||
, row_delimiter(row_delimiter_)
|
||||
, stopped(stopped_)
|
||||
, messages(QUEUE_SIZE * num_queues)
|
||||
{
|
||||
@ -146,16 +146,18 @@ void ReadBufferFromRabbitMQConsumer::initExchange()
|
||||
* in current case we use hash exchange for binding to another exchange of some other type, which needs its own routing keys
|
||||
* of other types: headers, patterns and string-keys. This means that hash property must be changed.
|
||||
*/
|
||||
AMQP::Table binding_arguments;
|
||||
binding_arguments["hash-property"] = "message_id";
|
||||
|
||||
/// Declare exchange for sharding.
|
||||
consumer_channel->declareExchange(local_hash_exchange, AMQP::consistent_hash, binding_arguments)
|
||||
.onError([&](const char * message)
|
||||
{
|
||||
local_exchange_declared = false;
|
||||
LOG_ERROR(log, "Failed to declare {} exchange: {}", exchange_type, message);
|
||||
});
|
||||
AMQP::Table binding_arguments;
|
||||
binding_arguments["hash-property"] = "message_id";
|
||||
|
||||
/// Declare exchange for sharding.
|
||||
consumer_channel->declareExchange(local_hash_exchange, AMQP::consistent_hash, binding_arguments)
|
||||
.onError([&](const char * message)
|
||||
{
|
||||
local_exchange_declared = false;
|
||||
LOG_ERROR(log, "Failed to declare {} exchange: {}", exchange_type, message);
|
||||
});
|
||||
}
|
||||
|
||||
/// Then bind client's exchange to sharding exchange (by keys, specified by the client):
|
||||
|
||||
@ -325,7 +327,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
|
||||
* It is important at this moment to make sure that queue bindings are created before any publishing can happen because
|
||||
* otherwise messages will be routed nowhere.
|
||||
*/
|
||||
while (!default_bindings_created && !default_bindings_error || (exchange_type_set && !bindings_created && !bindings_error))
|
||||
while ((!default_bindings_created && !default_bindings_error) || (exchange_type_set && !bindings_created && !bindings_error))
|
||||
{
|
||||
iterateEventLoop();
|
||||
}
|
||||
|
@ -60,7 +60,6 @@ private:
|
||||
|
||||
Poco::Logger * log;
|
||||
char row_delimiter;
|
||||
bool stalled = false;
|
||||
bool allowed = true;
|
||||
const std::atomic<bool> & stopped;
|
||||
|
||||
|
@ -75,11 +75,11 @@ StorageRabbitMQ::StorageRabbitMQ(
|
||||
, exchange_type(exchange_type_)
|
||||
, use_transactional_channel(use_transactional_channel_)
|
||||
, log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")"))
|
||||
, semaphore(0, num_consumers_)
|
||||
, parsed_address(parseAddress(global_context.getMacros()->expand(host_port_), 5672))
|
||||
, login_password(std::make_pair(
|
||||
global_context.getConfigRef().getString("rabbitmq.username"),
|
||||
global_context.getConfigRef().getString("rabbitmq.password")))
|
||||
, parsed_address(parseAddress(global_context.getMacros()->expand(host_port_), 5672))
|
||||
, semaphore(0, num_consumers_)
|
||||
{
|
||||
loop = std::make_unique<uv_loop_t>();
|
||||
uv_loop_init(loop.get());
|
||||
@ -158,7 +158,7 @@ Pipes StorageRabbitMQ::read(
|
||||
for (size_t i = 0; i < num_created_consumers; ++i)
|
||||
{
|
||||
auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(
|
||||
*this, metadata_snapshot, context, column_names, log);
|
||||
*this, metadata_snapshot, context, column_names);
|
||||
auto converting_stream = std::make_shared<ConvertingBlockInputStream>(
|
||||
rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name);
|
||||
pipes.emplace_back(std::make_shared<SourceFromInputStream>(converting_stream));
|
||||
@ -364,7 +364,7 @@ bool StorageRabbitMQ::streamToViews()
|
||||
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
|
||||
for (size_t i = 0; i < num_created_consumers; ++i)
|
||||
{
|
||||
auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(*this, metadata_snapshot, rabbitmq_context, column_names, log);
|
||||
auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(*this, metadata_snapshot, rabbitmq_context, column_names);
|
||||
auto converting_stream = std::make_shared<ConvertingBlockInputStream>(rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name);
|
||||
|
||||
streams.emplace_back(converting_stream);
|
||||
|
@ -40,14 +40,14 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
|
||||
, login_password(login_password_)
|
||||
, routing_key(routing_key_)
|
||||
, exchange_name(exchange_ + "_direct")
|
||||
, log(log_)
|
||||
, num_queues(num_queues_)
|
||||
, bind_by_id(bind_by_id_)
|
||||
, num_queues(num_queues_)
|
||||
, use_transactional_channel(use_transactional_channel_)
|
||||
, payloads(QUEUE_SIZE * num_queues)
|
||||
, log(log_)
|
||||
, delim(delimiter)
|
||||
, max_rows(rows_per_message)
|
||||
, chunk_size(chunk_size_)
|
||||
, payloads(QUEUE_SIZE * num_queues)
|
||||
{
|
||||
|
||||
loop = std::make_unique<uv_loop_t>();
|
||||
@ -187,19 +187,19 @@ void WriteBufferToRabbitMQProducer::finilizeProducer()
|
||||
answer_received = true;
|
||||
LOG_TRACE(log, "All messages were successfully published");
|
||||
})
|
||||
.onError([&](const char * message)
|
||||
.onError([&](const char * message1)
|
||||
{
|
||||
answer_received = true;
|
||||
wait_rollback = true;
|
||||
LOG_TRACE(log, "Publishing not successful: {}", message);
|
||||
LOG_TRACE(log, "Publishing not successful: {}", message1);
|
||||
producer_channel->rollbackTransaction()
|
||||
.onSuccess([&]()
|
||||
{
|
||||
wait_rollback = false;
|
||||
})
|
||||
.onError([&](const char * message)
|
||||
.onError([&](const char * message2)
|
||||
{
|
||||
LOG_ERROR(log, "Failed to rollback transaction: {}", message);
|
||||
LOG_ERROR(log, "Failed to rollback transaction: {}", message2);
|
||||
wait_rollback = false;
|
||||
});
|
||||
});
|
||||
|
@ -68,7 +68,6 @@ private:
|
||||
const std::optional<char> delim;
|
||||
const size_t max_rows;
|
||||
const size_t chunk_size;
|
||||
size_t count_mes = 0;
|
||||
size_t rows = 0;
|
||||
std::list<std::string> chunks;
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user