mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-28 02:21:59 +00:00
Fix data race, refactor
This commit is contained in:
parent
eafa3e8f64
commit
0b62354261
@ -5,6 +5,7 @@
|
||||
#include <memory>
|
||||
#include <Storages/RabbitMQ/RabbitMQConsumer.h>
|
||||
#include <Storages/RabbitMQ/RabbitMQHandler.h>
|
||||
#include <Storages/RabbitMQ/RabbitMQConnection.h>
|
||||
#include <IO/ReadBufferFromMemory.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include "Poco/Timer.h"
|
||||
@ -34,7 +35,7 @@ RabbitMQConsumer::RabbitMQConsumer(
|
||||
{
|
||||
}
|
||||
|
||||
void RabbitMQConsumer::shutdown()
|
||||
void RabbitMQConsumer::stop()
|
||||
{
|
||||
stopped = true;
|
||||
cv.notify_one();
|
||||
@ -53,10 +54,9 @@ void RabbitMQConsumer::subscribe()
|
||||
consumer_channel->consume(queue_name)
|
||||
.onSuccess([&](const std::string & /* consumer_tag */)
|
||||
{
|
||||
LOG_TRACE(log, "Consumer on channel {} is subscribed to queue {}", channel_id, queue_name);
|
||||
|
||||
if (++subscribed == queues.size())
|
||||
wait_subscription.store(false);
|
||||
LOG_TRACE(
|
||||
log, "Consumer on channel {} ({}/{}) is subscribed to queue {}",
|
||||
channel_id, subscriptions_num, queues.size(), queue_name);
|
||||
})
|
||||
.onReceived([&](const AMQP::Message & message, uint64_t delivery_tag, bool redelivered)
|
||||
{
|
||||
@ -64,9 +64,15 @@ void RabbitMQConsumer::subscribe()
|
||||
{
|
||||
String message_received = std::string(message.body(), message.body() + message.bodySize());
|
||||
|
||||
if (!received.push({message_received, message.hasMessageID() ? message.messageID() : "",
|
||||
message.hasTimestamp() ? message.timestamp() : 0,
|
||||
redelivered, AckTracker(delivery_tag, channel_id)}))
|
||||
MessageData result{
|
||||
.message = message_received,
|
||||
.message_id = message.hasMessageID() ? message.messageID() : "",
|
||||
.timestamp = message.hasTimestamp() ? message.timestamp() : 0,
|
||||
.redelivered = redelivered,
|
||||
.delivery_tag = delivery_tag,
|
||||
.channel_id = channel_id};
|
||||
|
||||
if (!received.push(std::move(result)))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to received queue");
|
||||
|
||||
cv.notify_one();
|
||||
@ -74,98 +80,100 @@ void RabbitMQConsumer::subscribe()
|
||||
})
|
||||
.onError([&](const char * message)
|
||||
{
|
||||
/* End up here either if channel ends up in an error state (then there will be resubscription) or consume call error, which
|
||||
* arises from queue settings mismatch or queue level error, which should not happen as no one else is supposed to touch them
|
||||
/* End up here either if channel ends up in an error state (then there will be resubscription)
|
||||
* or consume call error, which arises from queue settings mismatch or queue level error,
|
||||
* which should not happen as no one else is supposed to touch them
|
||||
*/
|
||||
LOG_ERROR(log, "Consumer failed on channel {}. Reason: {}", channel_id, message);
|
||||
wait_subscription.store(false);
|
||||
state = State::ERROR;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
bool RabbitMQConsumer::ackMessages()
|
||||
bool RabbitMQConsumer::ackMessages(const CommitInfo & commit_info)
|
||||
{
|
||||
AckTracker record_info = last_inserted_record_info;
|
||||
if (state != State::OK)
|
||||
return false;
|
||||
|
||||
/* Do not send ack to server if message's channel is not the same as current running channel because delivery tags are scoped per
|
||||
* channel, so if channel fails, all previous delivery tags become invalid
|
||||
*/
|
||||
if (record_info.channel_id == channel_id && record_info.delivery_tag && record_info.delivery_tag > prev_tag)
|
||||
/// Nothing to ack.
|
||||
if (!commit_info.delivery_tag)
|
||||
return false;
|
||||
|
||||
/// Do not send ack to server if message's channel is not the same as
|
||||
/// current running channel because delivery tags are scoped per channel,
|
||||
/// so if channel fails, all previous delivery tags become invalid.
|
||||
if (commit_info.channel_id != channel_id)
|
||||
return false;
|
||||
|
||||
/// Duplicate ack?
|
||||
if (commit_info.delivery_tag > last_commited_delivery_tag
|
||||
&& consumer_channel->ack(commit_info.delivery_tag, AMQP::multiple))
|
||||
{
|
||||
/// Commit all received messages with delivery tags from last committed to last inserted
|
||||
if (!consumer_channel->ack(record_info.delivery_tag, AMQP::multiple))
|
||||
{
|
||||
LOG_ERROR(log, "Failed to commit messages with delivery tags from last committed to {} on channel {}",
|
||||
record_info.delivery_tag, channel_id);
|
||||
return false;
|
||||
}
|
||||
last_commited_delivery_tag = commit_info.delivery_tag;
|
||||
|
||||
prev_tag = record_info.delivery_tag;
|
||||
LOG_TRACE(log, "Consumer committed messages with deliveryTags up to {} on channel {}", record_info.delivery_tag, channel_id);
|
||||
LOG_TRACE(
|
||||
log, "Consumer committed messages with deliveryTags up to {} on channel {}",
|
||||
last_commited_delivery_tag, channel_id);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
return true;
|
||||
LOG_ERROR(
|
||||
log,
|
||||
"Did not commit messages for {}:{}, (current commit point {}:{})",
|
||||
commit_info.channel_id, commit_info.delivery_tag,
|
||||
channel_id, last_commited_delivery_tag);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
void RabbitMQConsumer::updateAckTracker(AckTracker record_info)
|
||||
void RabbitMQConsumer::updateChannel(RabbitMQConnection & connection)
|
||||
{
|
||||
if (record_info.delivery_tag && channel_error.load())
|
||||
return;
|
||||
|
||||
if (!record_info.delivery_tag)
|
||||
prev_tag = 0;
|
||||
|
||||
last_inserted_record_info = record_info;
|
||||
}
|
||||
|
||||
|
||||
void RabbitMQConsumer::setupChannel()
|
||||
{
|
||||
if (!consumer_channel)
|
||||
return;
|
||||
|
||||
wait_subscription.store(true);
|
||||
state = State::INITIALIZING;
|
||||
last_commited_delivery_tag = 0;
|
||||
|
||||
consumer_channel = connection.createChannel();
|
||||
consumer_channel->onReady([&]()
|
||||
{
|
||||
/* First number indicates current consumer buffer; second number indicates serial number of created channel for current buffer,
|
||||
* i.e. if channel fails - another one is created and its serial number is incremented; channel_base is to guarantee that
|
||||
* channel_id is unique for each table
|
||||
*/
|
||||
channel_id = std::to_string(channel_id_base) + "_" + std::to_string(channel_id_counter++) + "_" + channel_base;
|
||||
LOG_TRACE(log, "Channel {} is created", channel_id);
|
||||
try
|
||||
{
|
||||
/// 1. channel_id_base - indicates current consumer buffer.
|
||||
/// 2. channel_id_counter - indicates serial number of created channel for current buffer
|
||||
/// (incremented on each channel update).
|
||||
/// 3. channel_base is to guarantee that channel_id is unique for each table.
|
||||
channel_id = fmt::format("{}_{}_{}", channel_id_base, channel_id_counter++, channel_base);
|
||||
|
||||
subscribed = 0;
|
||||
subscribe();
|
||||
channel_error.store(false);
|
||||
LOG_TRACE(log, "Channel {} is successfully created", channel_id);
|
||||
|
||||
subscriptions_num = 0;
|
||||
subscribe();
|
||||
|
||||
state = State::OK;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
state = State::ERROR;
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
});
|
||||
|
||||
consumer_channel->onError([&](const char * message)
|
||||
{
|
||||
LOG_ERROR(log, "Channel {} error: {}", channel_id, message);
|
||||
|
||||
channel_error.store(true);
|
||||
wait_subscription.store(false);
|
||||
LOG_ERROR(log, "Channel {} in an error state: {}", channel_id, message);
|
||||
state = State::ERROR;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
bool RabbitMQConsumer::needChannelUpdate()
|
||||
{
|
||||
if (wait_subscription)
|
||||
return false;
|
||||
|
||||
return channel_error || !consumer_channel || !consumer_channel->usable();
|
||||
chassert(consumer_channel);
|
||||
return state == State::ERROR;
|
||||
}
|
||||
|
||||
|
||||
void RabbitMQConsumer::iterateEventLoop()
|
||||
{
|
||||
event_handler.iterateLoop();
|
||||
}
|
||||
|
||||
ReadBufferPtr RabbitMQConsumer::consume()
|
||||
{
|
||||
if (stopped || !received.tryPop(current))
|
||||
|
@ -19,6 +19,7 @@ namespace DB
|
||||
{
|
||||
|
||||
class RabbitMQHandler;
|
||||
class RabbitMQConnection;
|
||||
using ChannelPtr = std::unique_ptr<AMQP::TcpChannel>;
|
||||
static constexpr auto SANITY_TIMEOUT = 1000 * 60 * 10; /// 10min.
|
||||
|
||||
@ -27,55 +28,44 @@ class RabbitMQConsumer
|
||||
|
||||
public:
|
||||
RabbitMQConsumer(
|
||||
RabbitMQHandler & event_handler_,
|
||||
std::vector<String> & queues_,
|
||||
size_t channel_id_base_,
|
||||
const String & channel_base_,
|
||||
Poco::Logger * log_,
|
||||
uint32_t queue_size_);
|
||||
RabbitMQHandler & event_handler_,
|
||||
std::vector<String> & queues_,
|
||||
size_t channel_id_base_,
|
||||
const String & channel_base_,
|
||||
Poco::Logger * log_,
|
||||
uint32_t queue_size_);
|
||||
|
||||
struct AckTracker
|
||||
struct CommitInfo
|
||||
{
|
||||
UInt64 delivery_tag;
|
||||
UInt64 delivery_tag = 0;
|
||||
String channel_id;
|
||||
|
||||
AckTracker() = default;
|
||||
AckTracker(UInt64 tag, String id) : delivery_tag(tag), channel_id(id) {}
|
||||
};
|
||||
|
||||
struct MessageData
|
||||
{
|
||||
String message;
|
||||
String message_id;
|
||||
uint64_t timestamp = 0;
|
||||
UInt64 timestamp = 0;
|
||||
bool redelivered = false;
|
||||
AckTracker track{};
|
||||
UInt64 delivery_tag = 0;
|
||||
String channel_id;
|
||||
};
|
||||
const MessageData & currentMessage() { return current; }
|
||||
|
||||
/// Return read buffer containing next available message
|
||||
/// or nullptr if there are no messages to process.
|
||||
ReadBufferPtr consume();
|
||||
|
||||
ChannelPtr & getChannel() { return consumer_channel; }
|
||||
void setupChannel();
|
||||
bool needChannelUpdate();
|
||||
void shutdown();
|
||||
|
||||
void updateQueues(std::vector<String> & queues_) { queues = queues_; }
|
||||
size_t queuesCount() { return queues.size(); }
|
||||
void updateChannel(RabbitMQConnection & connection);
|
||||
|
||||
void stop();
|
||||
bool isConsumerStopped() const { return stopped.load(); }
|
||||
bool ackMessages();
|
||||
void updateAckTracker(AckTracker record = AckTracker());
|
||||
|
||||
bool ackMessages(const CommitInfo & commit_info);
|
||||
|
||||
bool hasPendingMessages() { return !received.empty(); }
|
||||
|
||||
auto getChannelID() const { return current.track.channel_id; }
|
||||
auto getDeliveryTag() const { return current.track.delivery_tag; }
|
||||
auto getRedelivered() const { return current.redelivered; }
|
||||
auto getMessageID() const { return current.message_id; }
|
||||
auto getTimestamp() const { return current.timestamp; }
|
||||
|
||||
void waitForMessages(std::optional<uint64_t> timeout_ms = std::nullopt)
|
||||
{
|
||||
std::unique_lock lock(mutex);
|
||||
@ -88,24 +78,36 @@ public:
|
||||
|
||||
private:
|
||||
void subscribe();
|
||||
void iterateEventLoop();
|
||||
bool isChannelUsable();
|
||||
void updateCommitInfo(CommitInfo record);
|
||||
|
||||
ChannelPtr consumer_channel;
|
||||
RabbitMQHandler & event_handler; /// Used concurrently, but is thread safe.
|
||||
std::vector<String> queues;
|
||||
|
||||
const std::vector<String> queues;
|
||||
const String channel_base;
|
||||
const size_t channel_id_base;
|
||||
|
||||
Poco::Logger * log;
|
||||
std::atomic<bool> stopped;
|
||||
|
||||
String channel_id;
|
||||
std::atomic<bool> channel_error = true, wait_subscription = false;
|
||||
UInt64 channel_id_counter = 0;
|
||||
|
||||
enum class State
|
||||
{
|
||||
NONE,
|
||||
INITIALIZING,
|
||||
OK,
|
||||
ERROR,
|
||||
};
|
||||
std::atomic<State> state = State::NONE;
|
||||
size_t subscriptions_num = 0;
|
||||
|
||||
ConcurrentBoundedQueue<MessageData> received;
|
||||
MessageData current;
|
||||
size_t subscribed = 0;
|
||||
|
||||
AckTracker last_inserted_record_info;
|
||||
UInt64 prev_tag = 0, channel_id_counter = 0;
|
||||
UInt64 last_commited_delivery_tag;
|
||||
|
||||
std::condition_variable cv;
|
||||
std::mutex mutex;
|
||||
|
@ -99,10 +99,7 @@ void RabbitMQSource::updateChannel()
|
||||
if (!consumer)
|
||||
return;
|
||||
|
||||
consumer->updateAckTracker();
|
||||
|
||||
if (storage.updateChannel(consumer->getChannel()))
|
||||
consumer->setupChannel();
|
||||
consumer->updateChannel(storage.getConnection());
|
||||
}
|
||||
|
||||
Chunk RabbitMQSource::generate()
|
||||
@ -137,6 +134,7 @@ Chunk RabbitMQSource::generateImpl()
|
||||
StreamingFormatExecutor executor(non_virtual_header, input_format);
|
||||
size_t total_rows = 0;
|
||||
|
||||
RabbitMQConsumer::CommitInfo current_commit_info;
|
||||
while (true)
|
||||
{
|
||||
size_t new_rows = 0;
|
||||
@ -149,26 +147,21 @@ Chunk RabbitMQSource::generateImpl()
|
||||
|
||||
if (new_rows)
|
||||
{
|
||||
auto exchange_name = storage.getExchange();
|
||||
auto channel_id = consumer->getChannelID();
|
||||
auto delivery_tag = consumer->getDeliveryTag();
|
||||
auto redelivered = consumer->getRedelivered();
|
||||
auto message_id = consumer->getMessageID();
|
||||
auto timestamp = consumer->getTimestamp();
|
||||
|
||||
consumer->updateAckTracker({delivery_tag, channel_id});
|
||||
const auto exchange_name = storage.getExchange();
|
||||
const auto & message = consumer->currentMessage();
|
||||
|
||||
for (size_t i = 0; i < new_rows; ++i)
|
||||
{
|
||||
virtual_columns[0]->insert(exchange_name);
|
||||
virtual_columns[1]->insert(channel_id);
|
||||
virtual_columns[2]->insert(delivery_tag);
|
||||
virtual_columns[3]->insert(redelivered);
|
||||
virtual_columns[4]->insert(message_id);
|
||||
virtual_columns[5]->insert(timestamp);
|
||||
virtual_columns[1]->insert(message.channel_id);
|
||||
virtual_columns[2]->insert(message.delivery_tag);
|
||||
virtual_columns[3]->insert(message.redelivered);
|
||||
virtual_columns[4]->insert(message.message_id);
|
||||
virtual_columns[5]->insert(message.timestamp);
|
||||
}
|
||||
|
||||
total_rows += new_rows;
|
||||
current_commit_info = {message.delivery_tag, message.channel_id};
|
||||
}
|
||||
else if (total_rows == 0)
|
||||
{
|
||||
@ -210,6 +203,7 @@ Chunk RabbitMQSource::generateImpl()
|
||||
for (auto & column : virtual_columns)
|
||||
result_columns.push_back(std::move(column));
|
||||
|
||||
commit_info = current_commit_info;
|
||||
return Chunk(std::move(result_columns), total_rows);
|
||||
}
|
||||
|
||||
@ -219,7 +213,7 @@ bool RabbitMQSource::sendAck()
|
||||
if (!consumer)
|
||||
return false;
|
||||
|
||||
if (!consumer->ackMessages())
|
||||
if (!consumer->ackMessages(commit_info))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
|
@ -51,6 +51,8 @@ private:
|
||||
uint64_t max_execution_time_ms = 0;
|
||||
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE};
|
||||
|
||||
RabbitMQConsumer::CommitInfo commit_info;
|
||||
|
||||
RabbitMQSource(
|
||||
StorageRabbitMQ & storage_,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
|
@ -371,6 +371,14 @@ void StorageRabbitMQ::initRabbitMQ()
|
||||
for (const auto i : collections::range(0, num_queues))
|
||||
bindQueue(i + 1, *rabbit_channel);
|
||||
|
||||
if (queues.size() != num_queues)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"Expected all queues to be initialized (but having {}/{})",
|
||||
queues.size(), num_queues);
|
||||
}
|
||||
|
||||
LOG_TRACE(log, "RabbitMQ setup completed");
|
||||
rabbit_is_ready = true;
|
||||
rabbit_channel->close();
|
||||
@ -593,35 +601,6 @@ void StorageRabbitMQ::bindQueue(size_t queue_id, AMQP::TcpChannel & rabbit_chann
|
||||
}
|
||||
|
||||
|
||||
bool StorageRabbitMQ::updateChannel(ChannelPtr & channel)
|
||||
{
|
||||
try
|
||||
{
|
||||
channel = connection->createChannel();
|
||||
return true;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(log);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void StorageRabbitMQ::prepareChannelForConsumer(RabbitMQConsumerPtr consumer)
|
||||
{
|
||||
if (!consumer)
|
||||
return;
|
||||
|
||||
if (consumer->queuesCount() != queues.size())
|
||||
consumer->updateQueues(queues);
|
||||
|
||||
consumer->updateAckTracker();
|
||||
|
||||
if (updateChannel(consumer->getChannel()))
|
||||
consumer->setupChannel();
|
||||
}
|
||||
|
||||
void StorageRabbitMQ::unbindExchange()
|
||||
{
|
||||
/* This is needed because RabbitMQ (without special adjustments) can't, for example, properly make mv if there was insert query
|
||||
@ -820,7 +799,7 @@ void StorageRabbitMQ::shutdown()
|
||||
shutdown_called = true;
|
||||
|
||||
for (auto & consumer : consumers_ref)
|
||||
consumer.lock()->shutdown();
|
||||
consumer.lock()->stop();
|
||||
|
||||
LOG_TRACE(log, "Deactivating background tasks");
|
||||
|
||||
@ -982,7 +961,7 @@ void StorageRabbitMQ::initializeBuffers()
|
||||
if (!initialized)
|
||||
{
|
||||
for (const auto & consumer : consumers)
|
||||
prepareChannelForConsumer(consumer);
|
||||
consumer->updateChannel(*connection);
|
||||
initialized = true;
|
||||
}
|
||||
}
|
||||
@ -1144,10 +1123,7 @@ bool StorageRabbitMQ::tryStreamToViews()
|
||||
++queue_empty;
|
||||
|
||||
if (source->needChannelUpdate())
|
||||
{
|
||||
auto consumer = source->getBuffer();
|
||||
prepareChannelForConsumer(consumer);
|
||||
}
|
||||
source->getBuffer()->updateChannel(*connection);
|
||||
|
||||
/* false is returned by the sendAck function in only two cases:
|
||||
* 1) if connection failed. In this case all channels will be closed and will be unable to send ack. Also ack is made based on
|
||||
|
@ -72,9 +72,7 @@ public:
|
||||
String getExchange() const { return exchange_name; }
|
||||
void unbindExchange();
|
||||
|
||||
bool updateChannel(ChannelPtr & channel);
|
||||
void updateQueues(std::vector<String> & queues_) { queues_ = queues; }
|
||||
void prepareChannelForConsumer(RabbitMQConsumerPtr consumer);
|
||||
RabbitMQConnection & getConnection() { return *connection; }
|
||||
|
||||
void incrementReader();
|
||||
void decrementReader();
|
||||
|
Loading…
Reference in New Issue
Block a user