Global refactoring

This commit is contained in:
kssenii 2020-08-28 08:52:02 +00:00
parent 4fecfdbe2f
commit 4e0c619721
14 changed files with 328 additions and 324 deletions

View File

@ -14,24 +14,24 @@ namespace DB
RabbitMQBlockInputStream::RabbitMQBlockInputStream(
StorageRabbitMQ & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const Context & context_,
const Names & columns)
const std::shared_ptr<Context> & context_,
const Names & columns,
bool ack_in_suffix_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, context(context_)
, column_names(columns)
, ack_in_suffix(ack_in_suffix_)
, non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized())
, virtual_header(metadata_snapshot->getSampleBlockForColumns(
{"_exchange_name", "_channel_id", "_delivery_tag", "_redelivered", "_message_id"}, storage.getVirtuals(), storage.getStorageID()))
{
if (!storage.getSchemaName().empty())
context.setSetting("format_schema", storage.getSchemaName());
}
RabbitMQBlockInputStream::~RabbitMQBlockInputStream()
{
if (!claimed)
if (!buffer)
return;
storage.pushReadBuffer(buffer);
@ -46,16 +46,29 @@ Block RabbitMQBlockInputStream::getHeader() const
void RabbitMQBlockInputStream::readPrefixImpl()
{
auto timeout = std::chrono::milliseconds(context.getSettingsRef().rabbitmq_max_wait_ms.totalMilliseconds());
auto timeout = std::chrono::milliseconds(context->getSettingsRef().rabbitmq_max_wait_ms.totalMilliseconds());
buffer = storage.popReadBuffer(timeout);
claimed = !!buffer;
}
if (!buffer || finished)
bool RabbitMQBlockInputStream::needManualChannelUpdate()
{
if (!buffer)
return false;
return !buffer->channelUsable() && buffer->channelAllowed() && storage.connectionRunning();
}
void RabbitMQBlockInputStream::updateChannel()
{
if (!buffer)
return;
if (!buffer->channelUsable() && (storage.connectionRunning() || storage.restoreConnection()))
buffer->restoreChannel(storage.getChannel());
buffer->updateAckTracker();
storage.updateChannel(buffer->getChannel());
buffer->setupChannel();
}
@ -70,7 +83,7 @@ Block RabbitMQBlockInputStream::readImpl()
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
auto input_format = FormatFactory::instance().getInputFormat(
storage.getFormatName(), *buffer, non_virtual_header, context, 1);
storage.getFormatName(), *buffer, non_virtual_header, *context, 1);
InputPort port(input_format->getPort().getHeader(), input_format.get());
connect(input_format->getPort(), port);
@ -151,7 +164,7 @@ Block RabbitMQBlockInputStream::readImpl()
buffer->allowNext();
if (!new_rows || !checkTimeLimit())
if (buffer->queueEmpty() || !checkTimeLimit())
break;
}
@ -162,9 +175,7 @@ Block RabbitMQBlockInputStream::readImpl()
auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));
for (const auto & column : virtual_block.getColumnsWithTypeAndName())
{
result_block.insert(column);
}
return result_block;
}
@ -172,10 +183,19 @@ Block RabbitMQBlockInputStream::readImpl()
void RabbitMQBlockInputStream::readSuffixImpl()
{
if (!buffer)
return;
if (ack_in_suffix)
sendAck();
}
buffer->ackMessages();
bool RabbitMQBlockInputStream::sendAck()
{
if (!buffer || !buffer->channelUsable())
return false;
if (!buffer->ackMessages())
return false;
return true;
}
}

View File

@ -16,8 +16,9 @@ public:
RabbitMQBlockInputStream(
StorageRabbitMQ & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const Context & context_,
const Names & columns);
const std::shared_ptr<Context> & context_,
const Names & columns,
bool ack_in_suffix = true);
~RabbitMQBlockInputStream() override;
@ -28,13 +29,18 @@ public:
Block readImpl() override;
void readSuffixImpl() override;
void updateChannel();
bool needManualChannelUpdate();
bool sendAck();
private:
StorageRabbitMQ & storage;
StorageMetadataPtr metadata_snapshot;
Context context;
const std::shared_ptr<Context> context;
Names column_names;
bool ack_in_suffix;
bool finished = false;
bool claimed = false;
const Block non_virtual_header;
const Block virtual_header;

View File

@ -33,7 +33,7 @@ Block RabbitMQBlockOutputStream::getHeader() const
void RabbitMQBlockOutputStream::writePrefix()
{
if (storage.checkBridge())
if (!storage.exchangeRemoved())
storage.unbindExchange();
buffer = storage.createWriteBuffer();

View File

@ -13,17 +13,16 @@ RabbitMQHandler::RabbitMQHandler(uv_loop_t * loop_, Poco::Logger * log_) :
loop(loop_),
log(log_),
connection_running(false),
loop_running(false),
loop_state(Loop::STOP)
{
}
///Method that is called when the connection ends up in an error state.
void RabbitMQHandler::onError(AMQP::TcpConnection * connection, const char * message)
void RabbitMQHandler::onError(AMQP::TcpConnection * /* connection */, const char * message)
{
LOG_ERROR(log, "Library error report: {}", message);
connection_running.store(false);
if (connection)
connection->close();
}
void RabbitMQHandler::onReady(AMQP::TcpConnection * /* connection */)
@ -36,11 +35,15 @@ void RabbitMQHandler::onReady(AMQP::TcpConnection * /* connection */)
void RabbitMQHandler::startLoop()
{
std::lock_guard lock(startup_mutex);
LOG_DEBUG(log, "Background loop started");
loop_running.store(true);
while (loop_state.load() == Loop::RUN)
uv_run(loop, UV_RUN_NOWAIT);
LOG_DEBUG(log, "Background loop ended");
loop_running.store(false);
}
void RabbitMQHandler::iterateLoop()

View File

@ -22,12 +22,16 @@ class RabbitMQHandler : public AMQP::LibUvHandler
public:
RabbitMQHandler(uv_loop_t * loop_, Poco::Logger * log_);
void onError(AMQP::TcpConnection * connection, const char * message) override;
void onReady(AMQP::TcpConnection * connection) override;
void startLoop();
void iterateLoop();
bool connectionRunning() { return connection_running.load(); }
bool loopRunning() { return loop_running.load(); }
void updateLoopState(UInt8 state) { loop_state.store(state); }
UInt8 getLoopState() { return loop_state.load(); }
@ -35,7 +39,7 @@ private:
uv_loop_t * loop;
Poco::Logger * log;
std::atomic<bool> connection_running;
std::atomic<bool> connection_running, loop_running;
std::atomic<UInt8> loop_state;
std::mutex startup_mutex;
};

View File

@ -18,7 +18,6 @@ namespace DB
M(String, rabbitmq_exchange_type, "default", "The exchange type.", 0) \
M(UInt64, rabbitmq_num_consumers, 1, "The number of consumer channels per table.", 0) \
M(UInt64, rabbitmq_num_queues, 1, "The number of queues per consumer.", 0) \
M(Bool, rabbitmq_transactional_channel, false, "Use transactional channel for publishing.", 0) \
M(String, rabbitmq_queue_base, "", "Base for queue names to be able to reopen non-empty queues in case of failure.", 0) \
M(String, rabbitmq_deadletter_exchange, "", "Exchange name to be passed as a dead-letter-exchange name.", 0) \
M(Bool, rabbitmq_persistent_mode, false, "If set, delivery mode will be set to 2 (makes messages 'persistent', durable).", 0) \

View File

@ -50,26 +50,12 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
, row_delimiter(row_delimiter_)
, stopped(stopped_)
, received(QUEUE_SIZE * num_queues)
, last_inserted_record(AckTracker())
{
for (size_t queue_id = 0; queue_id < num_queues; ++queue_id)
bindQueue(queue_id);
consumer_channel->onReady([&]()
{
channel_id = std::to_string(channel_id_base) + "_" + std::to_string(channel_id_counter++) + "_" + channel_base;
LOG_TRACE(log, "Channel {} is created", channel_id);
consumer_channel->onError([&](const char * message)
{
LOG_ERROR(log, "Channel {} error: {}", channel_id, message);
channel_error.store(true);
});
updateAckTracker(AckTracker());
subscribe();
channel_error.store(false);
});
setupChannel();
}
@ -93,7 +79,7 @@ void ReadBufferFromRabbitMQConsumer::bindQueue(size_t queue_id)
/* Here we bind either to sharding exchange (consistent-hash) or to bridge exchange (fanout). All bindings to routing keys are
* done between client's exchange and local bridge exchange. Binding key must be a string integer in case of hash exchange, for
* fanout exchange it can be arbitrary.
* fanout exchange it can be arbitrary
*/
setup_channel->bindQueue(exchange_name, queue_name, std::to_string(channel_id_base))
.onSuccess([&]
@ -118,7 +104,7 @@ void ReadBufferFromRabbitMQConsumer::bindQueue(size_t queue_id)
}
/* The first option not just simplifies queue_name, but also implements the possibility to be able to resume reading from one
* specific queue when its name is specified in queue_base setting.
* specific queue when its name is specified in queue_base setting
*/
const String queue_name = !hash_exchange ? queue_base : std::to_string(channel_id_base) + "_" + std::to_string(queue_id) + "_" + queue_base;
setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
@ -138,6 +124,9 @@ void ReadBufferFromRabbitMQConsumer::subscribe()
.onSuccess([&](const std::string & /* consumer_tag */)
{
LOG_TRACE(log, "Consumer on channel {} is subscribed to queue {}", channel_id, queue_name);
if (++subscribed == queues.size())
wait_subscription.store(false);
})
.onReceived([&](const AMQP::Message & message, uint64_t delivery_tag, bool redelivered)
{
@ -155,39 +144,39 @@ void ReadBufferFromRabbitMQConsumer::subscribe()
})
.onError([&](const char * message)
{
/* End up here either if channel ends up in an error state (then there will be resubscription) or consume call error, which
* arises from queue settings mismatch or queue level error, which should not happen as noone else is supposed to touch them
*/
LOG_ERROR(log, "Consumer failed on channel {}. Reason: {}", channel_id, message);
wait_subscription.store(false);
});
}
}
void ReadBufferFromRabbitMQConsumer::ackMessages()
bool ReadBufferFromRabbitMQConsumer::ackMessages()
{
/* Delivery tags are scoped per channel, so if channel fails, then all previous delivery tags become invalid. Also this check ensures
* that there is no data race with onReady callback in restoreChannel() (they can be called at the same time from different threads).
* And there is no need to synchronize this method with updateAckTracker() as they are not supposed to be called at the same time.
*/
if (channel_error.load())
return;
AckTracker record = last_inserted_record;
/// Do not send ack to server if message's channel is not the same as current running channel.
if (record.channel_id == channel_id && record.delivery_tag && record.delivery_tag > prev_tag && event_handler->connectionRunning())
/* Do not send ack to server if message's channel is not the same as current running channel because delivery tags are scoped per
* channel, so if channel fails, all previous delivery tags become invalid
*/
if (record.channel_id == channel_id && record.delivery_tag && record.delivery_tag > prev_tag)
{
consumer_channel->ack(record.delivery_tag, AMQP::multiple); /// Will ack all up to last tag starting from last acked.
prev_tag = record.delivery_tag;
/// Commit all received messages with delivery tags from last commited to last inserted
if (!consumer_channel->ack(record.delivery_tag, AMQP::multiple))
return false;
LOG_TRACE(log, "Consumer acknowledged messages with deliveryTags up to {} on the channel {}", record.delivery_tag, channel_id);
prev_tag = record.delivery_tag;
LOG_TRACE(log, "Consumer acknowledged messages with deliveryTags up to {} on channel {}", record.delivery_tag, channel_id);
}
return true;
}
void ReadBufferFromRabbitMQConsumer::updateAckTracker(AckTracker record)
{
/* This method can be called from readImpl and from channel->onError() callback, but channel_error check ensures that it is not done
* at the same time, so no synchronization needed.
*/
if (record.delivery_tag && channel_error.load())
return;
@ -198,29 +187,31 @@ void ReadBufferFromRabbitMQConsumer::updateAckTracker(AckTracker record)
}
void ReadBufferFromRabbitMQConsumer::restoreChannel(ChannelPtr new_channel)
void ReadBufferFromRabbitMQConsumer::setupChannel()
{
consumer_channel = std::move(new_channel);
wait_subscription.store(true);
consumer_channel->onReady([&]()
{
/* First number indicates current consumer buffer; second number indicates serial number of created channel for current buffer,
* i.e. if channel fails - another one is created and its serial number is incremented; channel_base is to guarantee that
* channel_id is unique for each table.
* channel_id is unique for each table
*/
channel_id = std::to_string(channel_id_base) + "_" + std::to_string(channel_id_counter++) + "_" + channel_base;
LOG_TRACE(log, "Channel {} is created", channel_id);
consumer_channel->onError([&](const char * message)
{
LOG_ERROR(log, "Channel {} error: {}", channel_id, message);
channel_error.store(true);
});
updateAckTracker(AckTracker());
subscribed = 0;
subscribe();
channel_error.store(false);
});
consumer_channel->onError([&](const char * message)
{
LOG_ERROR(log, "Channel {} error: {}", channel_id, message);
channel_error.store(true);
wait_subscription.store(false);
});
}

View File

@ -56,12 +56,18 @@ public:
AckTracker track;
};
void allowNext() { allowed = true; } // Allow to read next message.
bool channelUsable() { return !channel_error.load(); }
void restoreChannel(ChannelPtr new_channel);
/// Do not allow to update channel untill current channel is properly set up and subscribed
bool channelAllowed() { return !wait_subscription.load(); }
void ackMessages();
void updateAckTracker(AckTracker record);
ChannelPtr & getChannel() { return consumer_channel; }
void setupChannel();
bool ackMessages();
void updateAckTracker(AckTracker record = AckTracker());
bool queueEmpty() { return received.empty(); }
void allowNext() { allowed = true; } // Allow to read next message.
auto getChannelID() const { return current.track.channel_id; }
auto getDeliveryTag() const { return current.track.delivery_tag; }
@ -93,10 +99,11 @@ private:
const std::atomic<bool> & stopped;
String channel_id;
std::atomic<bool> channel_error = true;
std::atomic<bool> channel_error = true, wait_subscription = false;
std::vector<String> queues;
ConcurrentBoundedQueue<MessageData> received;
MessageData current;
size_t subscribed = 0;
AckTracker last_inserted_record;
UInt64 prev_tag = 0, channel_id_counter = 0;

View File

@ -78,7 +78,6 @@ StorageRabbitMQ::StorageRabbitMQ(
const bool persistent_)
: IStorage(table_id_)
, global_context(context_.getGlobalContext())
, rabbitmq_context(Context(global_context))
, routing_keys(global_context.getMacros()->expand(routing_keys_))
, exchange_name(exchange_name_)
, format_name(global_context.getMacros()->expand(format_name_))
@ -99,23 +98,15 @@ StorageRabbitMQ::StorageRabbitMQ(
loop = std::make_unique<uv_loop_t>();
uv_loop_init(loop.get());
event_handler = std::make_shared<RabbitMQHandler>(loop.get(), log);
connection = std::make_shared<AMQP::TcpConnection>(event_handler.get(), AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/"));
size_t cnt_retries = 0;
while (!connection->ready() && ++cnt_retries != RETRIES_MAX)
{
event_handler->iterateLoop();
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
}
if (!connection->ready())
if (!restoreConnection(false))
{
if (!connection->closed())
connection->close(true);
throw Exception("Cannot connect to RabbitMQ", ErrorCodes::CANNOT_CONNECT_RABBITMQ);
}
rabbitmq_context.makeQueryContext();
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
setInMemoryMetadata(storage_metadata);
@ -153,14 +144,14 @@ StorageRabbitMQ::StorageRabbitMQ(
if (queue_base.empty())
{
/* Make sure that local exchange name is unique for each table and is not the same as client's exchange name. It also needs to
* be table_name and not just a random string, because local exchanges should be declared the same for same tables.
* be table_name and not just a random string, because local exchanges should be declared the same for same tables
*/
sharding_exchange = exchange_name + "_" + table_name;
/* By default without a specified queue name in queue's declaration - its name will be generated by the library, but its better
* to specify it unique for each table to reuse them once the table is recreated. So it means that queues remain the same for every
* table unless queue_base table setting is specified (which allows to register consumers to specific queues). Now this is a base
* for the names of later declared queues.
* for the names of later declared queues
*/
queue_base = table_name;
}
@ -168,7 +159,7 @@ StorageRabbitMQ::StorageRabbitMQ(
{
/* In case different tables are used to register multiple consumers to the same queues (so queues are shared between tables) and
* at the same time sharding exchange is needed (if there are multiple shared queues), then those tables also need to share
* sharding exchange and bridge exchange.
* sharding exchange and bridge exchange
*/
sharding_exchange = exchange_name + "_" + queue_base;
}
@ -186,7 +177,6 @@ void StorageRabbitMQ::heartbeatFunc()
{
if (!stream_cancelled && event_handler->connectionRunning())
{
LOG_TRACE(log, "Sending RabbitMQ heartbeat");
connection->heartbeat();
heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS);
}
@ -196,17 +186,14 @@ void StorageRabbitMQ::heartbeatFunc()
void StorageRabbitMQ::loopingFunc()
{
if (event_handler->connectionRunning())
{
LOG_DEBUG(log, "Starting event looping iterations");
event_handler->startLoop();
}
}
void StorageRabbitMQ::initExchange()
{
/* Binding scheme is the following: client's exchange -> key bindings by routing key list -> bridge exchange (fanout) ->
* -> sharding exchange (only if needed) -> queues.
* -> sharding exchange (only if needed) -> queues
*/
setup_channel->declareExchange(exchange_name, exchange_type, AMQP::durable)
.onError([&](const char * message)
@ -215,7 +202,7 @@ void StorageRabbitMQ::initExchange()
+ std::string(message), ErrorCodes::LOGICAL_ERROR);
});
/// Bridge exchange is needed to easily disconnect consumer queues and also simplifies queue bindings.
/// Bridge exchange is needed to easily disconnect consumer queues and also simplifies queue bindings
setup_channel->declareExchange(bridge_exchange, AMQP::fanout, AMQP::durable + AMQP::autodelete)
.onError([&](const char * message)
{
@ -229,7 +216,7 @@ void StorageRabbitMQ::initExchange()
}
/* Change hash property because by default it will be routing key, which has to be an integer, but with support for any exchange
* type - routing keys might be of any type.
* type - routing keys might be of any type
*/
AMQP::Table binding_arguments;
binding_arguments["hash-property"] = "message_id";
@ -313,23 +300,66 @@ void StorageRabbitMQ::bindExchange()
}
bool StorageRabbitMQ::restoreConnection(bool reconnecting)
{
size_t cnt_retries = 0;
if (reconnecting)
{
heartbeat_task->deactivate();
connection->close(); /// Connection might be unusable, but not closed
/* Connection is not closed immediately (firstly, all pending operations are completed, and then
* an AMQP closing-handshake is performed). But cannot open a new connection untill previous one is properly closed
*/
while (!connection->closed() && ++cnt_retries != RETRIES_MAX)
event_handler->iterateLoop();
/// This will force immediate closure if not yet closed
if (!connection->closed())
connection->close(true);
LOG_TRACE(log, "Trying to restore consumer connection");
}
connection = std::make_shared<AMQP::TcpConnection>(event_handler.get(),
AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/"));
cnt_retries = 0;
while (!connection->ready() && ++cnt_retries != RETRIES_MAX)
{
event_handler->iterateLoop();
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
}
return event_handler->connectionRunning();
}
void StorageRabbitMQ::updateChannel(ChannelPtr & channel)
{
channel = std::make_shared<AMQP::TcpChannel>(connection.get());
}
void StorageRabbitMQ::unbindExchange()
{
/* This is needed because with RabbitMQ can't (without special adjustments) use the same table for reading and writing (alternating them),
* because publishing is done to exchange, publisher never knows to which queues the message will go, every application interested in
/* This is needed because with RabbitMQ (without special adjustments) can't, for example, properly make mv if there was insert query
* on the same table before, and in another direction it will make redundant copies, but most likely nobody will do that.
* As publishing is done to exchange, publisher never knows to which queues the message will go, every application interested in
* consuming from certain exchange - declares its owns exchange-bound queues, messages go to all such exchange-bound queues, and as
* input streams are always created at startup, then they will also declare its own exchange bound queues, but they will not be visible
* externally - client declares its own exchange-bound queues, from which to consume, so this means that if not disconnecting this local
* queues, then messages will go both ways and in one of them they will remain not consumed. Therefore, if insert query is called, need
* to desconnect local consumers, but then MV cannot be afterwards created on the same table. It can be reverted to allow alternating
* these queries, but it will be ugly and seems pointless because probably nobody uses tables alternating INSERT and MV queries on the
* same table.
* queues, then messages will go both ways and in one of them they will remain not consumed. So need to disconnect local exchange
* bindings to remove redunadant message copies, but after that mv cannot work unless thoso bindings recreated. Recreating them is not
* difficult but very ugly and as probably nobody will do such thing - bindings will not be recreated.
*/
std::call_once(flag, [&]()
{
heartbeat_task->deactivate();
streaming_task->deactivate();
event_handler->updateLoopState(Loop::STOP);
looping_task->deactivate();
heartbeat_task->deactivate();
setup_channel->removeExchange(bridge_exchange)
.onSuccess([&]()
@ -349,73 +379,6 @@ void StorageRabbitMQ::unbindExchange()
}
bool StorageRabbitMQ::restoreConnection()
{
if (restore_connection.try_lock())
{
/// This lock is to synchronize with getChannel().
std::lock_guard lk(connection_mutex);
if (!event_handler->connectionRunning())
{
/// Stopping loop now and not right after connection error, because need to run it to let it properly close connection.
if (event_handler->getLoopState() == Loop::RUN)
{
event_handler->updateLoopState(Loop::STOP);
looping_task->deactivate();
heartbeat_task->deactivate();
}
/* connection->close() is called in onError() method (which is called by the AMQP library when a fatal error occurs on the
* connection) inside event_handler, but it is not closed immediately (firstly, all pending operations are completed, and then
* an AMQP closing-handshake is performed). But cannot open a new connection untill previous one is properly closed.
*/
size_t cnt_retries = 0;
while (!connection->closed() && ++cnt_retries != RETRIES_MAX)
event_handler->iterateLoop();
/// This will force immediate closure if not yet closed.
if (!connection->closed())
connection->close(true);
LOG_TRACE(log, "Trying to restore consumer connection");
connection = std::make_shared<AMQP::TcpConnection>(event_handler.get(),
AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/"));
cnt_retries = 0;
while (!connection->ready() && ++cnt_retries != RETRIES_MAX)
{
event_handler->iterateLoop();
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
}
if (event_handler->connectionRunning())
{
looping_task->activateAndSchedule();
heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS);
}
}
restore_connection.unlock();
}
else
{
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
}
return event_handler->connectionRunning();
}
ChannelPtr StorageRabbitMQ::getChannel()
{
std::lock_guard lk(connection_mutex);
ChannelPtr new_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
return new_channel;
}
Pipe StorageRabbitMQ::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -428,20 +391,55 @@ Pipe StorageRabbitMQ::read(
if (num_created_consumers == 0)
return {};
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
auto new_context = std::make_shared<Context>(context);
if (!schema_name.empty())
new_context->setSetting("format_schema", schema_name);
bool update_channels = false;
if (!event_handler->connectionRunning())
{
if (event_handler->loopRunning())
{
event_handler->updateLoopState(Loop::STOP);
looping_task->deactivate();
}
if ((update_channels = restoreConnection(true)))
heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS);
}
Pipes pipes;
pipes.reserve(num_created_consumers);
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(
*this, metadata_snapshot, context, column_names);
auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(*this, metadata_snapshot, new_context, column_names);
/* It is a possible but rare case when channel gets into error state and does not also close connection, so need manual update.
* But I believe that in current context and with local rabbitmq settings this will never happen and any channel error will also
* close connection, but checking anyway (in second condition of if statement). This must be done here (and also in streamToViews())
* and not in readPrefix as it requires to stop heartbeats and looping tasks to avoid race conditions inside the library
*/
if (update_channels || rabbit_stream->needManualChannelUpdate())
{
if (event_handler->loopRunning())
{
event_handler->updateLoopState(Loop::STOP);
looping_task->deactivate();
heartbeat_task->deactivate();
}
rabbit_stream->updateChannel();
}
auto converting_stream = std::make_shared<ConvertingBlockInputStream>(
rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name);
pipes.emplace_back(std::make_shared<SourceFromInputStream>(converting_stream));
}
if (!std::exchange(loop_started, true))
if (!event_handler->loopRunning() && event_handler->connectionRunning())
looping_task->activateAndSchedule();
LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
@ -484,23 +482,21 @@ void StorageRabbitMQ::startup()
void StorageRabbitMQ::shutdown()
{
stream_cancelled = true;
event_handler->updateLoopState(Loop::STOP);
wait_confirm.store(false);
looping_task->deactivate();
streaming_task->deactivate();
heartbeat_task->deactivate();
event_handler->updateLoopState(Loop::STOP);
looping_task->deactivate();
connection->close();
size_t cnt_retries = 0;
while (!connection->closed() && ++cnt_retries != (RETRIES_MAX >> 1))
{
while (!connection->closed() && ++cnt_retries != RETRIES_MAX)
event_handler->iterateLoop();
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP >> 3));
}
/// Should actually force closure, if not yet closed, but it generates distracting error logs.
/// Should actually force closure, if not yet closed, but it generates distracting error logs
//if (!connection->closed())
// connection->close(true);
@ -558,7 +554,7 @@ ProducerBufferPtr StorageRabbitMQ::createWriteBuffer()
{
return std::make_shared<WriteBufferToRabbitMQProducer>(
parsed_address, global_context, login_password, routing_keys, exchange_name, exchange_type,
++producer_id, unique_strbase, persistent, wait_confirm, log,
producer_id.fetch_add(1), unique_strbase, persistent, wait_confirm, log,
row_delimiter ? std::optional<char>{row_delimiter} : std::nullopt, 1, 1024);
}
@ -636,29 +632,38 @@ bool StorageRabbitMQ::streamToViews()
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = table_id;
InterpreterInsertQuery interpreter(insert, rabbitmq_context, false, true, true);
auto rabbitmq_context = std::make_shared<Context>(global_context);
rabbitmq_context->makeQueryContext();
if (!schema_name.empty())
rabbitmq_context->setSetting("format_schema", schema_name);
// Only insert into dependent views and expect that input blocks contain virtual columns
InterpreterInsertQuery interpreter(insert, *rabbitmq_context, false, true, true);
auto block_io = interpreter.execute();
auto metadata_snapshot = getInMemoryMetadataPtr();
auto column_names = block_io.out->getHeader().getNames();
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
if (!event_handler->loopRunning() && event_handler->connectionRunning())
looping_task->activateAndSchedule();
// Create a stream for each consumer and join them in a union stream
BlockInputStreams streams;
streams.reserve(num_created_consumers);
auto metadata_snapshot = getInMemoryMetadataPtr();
auto column_names = block_io.out->getHeader().getNames();
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(*this, metadata_snapshot, rabbitmq_context, column_names);
auto converting_stream = std::make_shared<ConvertingBlockInputStream>(rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name);
streams.emplace_back(converting_stream);
auto stream = std::make_shared<RabbitMQBlockInputStream>(*this, metadata_snapshot, rabbitmq_context, column_names, false);
streams.emplace_back(stream);
// Limit read batch to maximum block size to allow DDL
IBlockInputStream::LocalLimits limits;
const Settings & settings = global_context.getSettingsRef();
limits.speed_limits.max_execution_time = settings.stream_flush_interval_ms;
limits.speed_limits.max_execution_time = global_context.getSettingsRef().stream_flush_interval_ms;
limits.timeout_overflow_mode = OverflowMode::BREAK;
rabbit_stream->setLimits(limits);
stream->setLimits(limits);
}
// Join multiple streams if necessary
@ -668,12 +673,56 @@ bool StorageRabbitMQ::streamToViews()
else
in = streams[0];
if (!std::exchange(loop_started, true))
looping_task->activateAndSchedule();
std::atomic<bool> stub = {false};
copyData(*in, *block_io.out, &stub);
/* Need to stop loop even if connection is ok, because sending ack() with loop running in another thread will lead to a lot of data
* races inside the library, but only in case any error occurs or connection is lost while ack is being sent
*/
if (event_handler->loopRunning())
{
event_handler->updateLoopState(Loop::STOP);
looping_task->deactivate();
}
if (!event_handler->connectionRunning())
{
if (restoreConnection(true))
{
for (auto & stream : streams)
stream->as<RabbitMQBlockInputStream>()->updateChannel();
}
else
{
/// Reschedule if unable to connect to rabbitmq
return false;
}
}
else
{
heartbeat_task->deactivate();
/// Commit
for (auto & stream : streams)
{
if (!stream->as<RabbitMQBlockInputStream>()->sendAck())
{
/* Almost any error with channel will lead to connection closure, but if so happens that channel errored and connection
* is not closed - also need to restore channels
*/
if (!stream->as<RabbitMQBlockInputStream>()->needManualChannelUpdate())
stream->as<RabbitMQBlockInputStream>()->updateChannel();
else
break;
}
}
}
event_handler->updateLoopState(Loop::RUN);
looping_task->activateAndSchedule();
heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS); /// It is also deactivated in restoreConnection(), so reschedule anyway
// Check whether the limits were applied during query execution
bool limits_applied = false;
const BlockStreamProfileInfo & info = in->getProfileInfo();
@ -808,10 +857,6 @@ void registerStorageRabbitMQ(StorageFactory & factory)
{
exchange_type = safeGet<String>(ast->value);
}
if (exchange_type != "fanout" && exchange_type != "direct" && exchange_type != "topic"
&& exchange_type != "headers" && exchange_type != "consistent_hash")
throw Exception("Invalid exchange type", ErrorCodes::BAD_ARGUMENTS);
}
UInt64 num_consumers = rabbitmq_settings.rabbitmq_num_consumers;

View File

@ -55,15 +55,14 @@ public:
const String & getFormatName() const { return format_name; }
NamesAndTypesList getVirtuals() const override;
const auto & getSchemaName() const { return schema_name; }
const String getExchange() const { return exchange_name; }
bool checkBridge() const { return !exchange_removed.load(); }
void unbindExchange();
bool exchangeRemoved() { return exchange_removed.load(); }
bool connectionRunning() { return event_handler->connectionRunning(); }
bool restoreConnection();
ChannelPtr getChannel();
bool restoreConnection(bool reconnecting);
void updateChannel(ChannelPtr & channel);
protected:
StorageRabbitMQ(
@ -85,7 +84,6 @@ protected:
private:
Context global_context;
Context rabbitmq_context;
Names routing_keys;
const String exchange_name;
@ -117,11 +115,10 @@ private:
String unique_strbase;
String sharding_exchange, bridge_exchange, consumer_exchange;
std::once_flag flag;
size_t producer_id = 0, consumer_id = 0;
bool loop_started = false;
std::atomic<bool> exchange_removed = false, wait_confirm = true;
size_t consumer_id = 0;
std::atomic<size_t> producer_id = 1;
std::atomic<bool> wait_confirm = true, exchange_removed = false;
ChannelPtr setup_channel;
std::mutex connection_mutex, restore_connection;
BackgroundSchedulePool::TaskHolder streaming_task;
BackgroundSchedulePool::TaskHolder heartbeat_task;
@ -134,6 +131,7 @@ private:
void threadFunc();
void heartbeatFunc();
void loopingFunc();
void initExchange();
void bindExchange();

View File

@ -120,19 +120,20 @@ void WriteBufferToRabbitMQProducer::countRow()
bool WriteBufferToRabbitMQProducer::setupConnection(bool reconnecting)
{
size_t cnt_retries = 0;
if (reconnecting)
{
/* connection->close() is called in onError() method (called by the AMQP library when a fatal error occurs on the connection)
* inside event_handler, but it is not closed immediately (firstly, all pending operations are completed, and then an AMQP
* closing-handshake is performed). But cannot open a new connection untill previous one is properly closed.
*/
while (!connection->closed() && ++cnt_retries != (RETRIES_MAX >> 1))
connection->close();
while (!connection->closed() && ++cnt_retries != RETRIES_MAX)
event_handler->iterateLoop();
if (!connection->closed())
connection->close(true);
LOG_TRACE(log, "Trying to set up connection");
}
LOG_TRACE(log, "Trying to set up connection");
connection = std::make_unique<AMQP::TcpConnection>(event_handler.get(),
AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/"));
@ -143,7 +144,7 @@ bool WriteBufferToRabbitMQProducer::setupConnection(bool reconnecting)
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
}
return connection->ready();
return event_handler->connectionRunning();
}
@ -159,7 +160,7 @@ void WriteBufferToRabbitMQProducer::setupChannel()
producer_channel->close();
/* Save records that have not received ack/nack from server before channel closure. They are removed and pushed back again once
* they are republished because after channel recovery they will acquire new delivery tags, so all previous records become invalid.
* they are republished because after channel recovery they will acquire new delivery tags, so all previous records become invalid
*/
for (const auto & record : delivery_record)
returned.tryPush(record.second);
@ -235,31 +236,31 @@ void WriteBufferToRabbitMQProducer::publish(ConcurrentBoundedQueue<std::pair<UIn
std::pair<UInt64, String> payload;
/* It is important to make sure that delivery_record.size() is never bigger than returned.size(), i.e. number if unacknowledged
* messages cannot exceed returned.size(), because they all might end up there.
* messages cannot exceed returned.size(), because they all might end up there
*/
while (!messages.empty() && producer_channel->usable() && delivery_record.size() < RETURNED_LIMIT)
{
messages.pop(payload);
AMQP::Envelope envelope(payload.second.data(), payload.second.size());
/// if headers exchange is used, routing keys are added here via headers, if not - it is just empty.
/// if headers exchange is used, routing keys are added here via headers, if not - it is just empty
AMQP::Table message_settings = key_arguments;
/* There is the case when connection is lost in the period after some messages were published and before ack/nack was sent by the
* server, then it means that publisher will never know whether those messages were delivered or not, and therefore those records
* that received no ack/nack before connection loss will be republished (see onError() callback), so there might be duplicates. To
* let consumer know that received message might be a possible duplicate - a "republished" field is added to message metadata.
* let consumer know that received message might be a possible duplicate - a "republished" field is added to message metadata
*/
message_settings["republished"] = std::to_string(republishing);
envelope.setHeaders(message_settings);
/* Adding here a messageID property to message metadata. Since RabbitMQ does not guarantee exactly-once delivery, then on the
* consumer side "republished" field of message metadata can be checked and, if it set to 1, consumer might also check "messageID"
* property. This way detection of duplicates is guaranteed.
* property. This way detection of duplicates is guaranteed
*/
envelope.setMessageID(std::to_string(payload.first));
/// Delivery mode is 1 or 2. 1 is default. 2 makes a message durable, but makes performance 1.5-2 times worse.
/// Delivery mode is 1 or 2. 1 is default. 2 makes a message durable, but makes performance 1.5-2 times worse
if (persistent)
envelope.setDeliveryMode(2);
@ -276,11 +277,11 @@ void WriteBufferToRabbitMQProducer::publish(ConcurrentBoundedQueue<std::pair<UIn
producer_channel->publish(exchange_name, routing_keys[0], envelope);
}
/// This is needed for "publisher confirms", which guarantees at-least-once delivery.
/// This is needed for "publisher confirms", which guarantees at-least-once delivery
++delivery_tag;
delivery_record.insert(delivery_record.end(), {delivery_tag, payload});
/// Need to break at some point to let event loop run, because no publishing actually happens before looping.
/// Need to break at some point to let event loop run, because no publishing actually happens before looping
if (delivery_tag % BATCH == 0)
break;
}
@ -291,11 +292,11 @@ void WriteBufferToRabbitMQProducer::publish(ConcurrentBoundedQueue<std::pair<UIn
void WriteBufferToRabbitMQProducer::writingFunc()
{
/// wait_confirm == false when shutdown is called, needed because table might be dropped before all acks are received.
/// wait_confirm == false when shutdown is called, needed because table might be dropped before all acks are received
while ((!payloads.empty() || wait_all) && wait_confirm.load())
{
/* Publish main paylods only when there are no returned messages. This way it is ensured that returned messages are republished
* as fast as possible and no new publishes are made before returned messages are handled.
* as fast as possible and no new publishes are made before returned messages are handled
*/
if (!returned.empty() && producer_channel->usable())
publish(returned, true);
@ -306,11 +307,11 @@ void WriteBufferToRabbitMQProducer::writingFunc()
/* wait_num != 0 if there will be no new payloads pushed to payloads.queue in countRow(), delivery_record is empty if there are
* no more pending acknowldgements from the server (if receieved ack(), records are deleted, if received nack(), records are pushed
* to returned.queue and deleted, because server will attach new delivery tags to them).
* to returned.queue and deleted, because server will attach new delivery tags to them)
*/
if (wait_num.load() && delivery_record.empty() && payloads.empty() && returned.empty())
wait_all = false;
else if ((!producer_channel->usable() && connection->usable()) || (!connection->usable() && setupConnection(true)))
else if ((!producer_channel->usable() && event_handler->connectionRunning()) || (!event_handler->connectionRunning() && setupConnection(true)))
setupChannel();
}

View File

@ -1,6 +1,6 @@
syntax = "proto3";
message KeyValuePair {
uint64 key = 1;
string value = 2;
}
message KeyValueProto {
uint64 key = 1;
string value = 2;
}

View File

@ -19,29 +19,29 @@ DESCRIPTOR = _descriptor.FileDescriptor(
syntax='proto3',
serialized_options=None,
create_key=_descriptor._internal_create_key,
serialized_pb=b'\n-clickhouse_path/format_schemas/rabbitmq.proto\"*\n\x0cKeyValuePair\x12\x0b\n\x03key\x18\x01 \x01(\x04\x12\r\n\x05value\x18\x02 \x01(\tb\x06proto3'
serialized_pb=b'\n-clickhouse_path/format_schemas/rabbitmq.proto\"+\n\rKeyValueProto\x12\x0b\n\x03key\x18\x01 \x01(\x04\x12\r\n\x05value\x18\x02 \x01(\tb\x06proto3'
)
_KEYVALUEPAIR = _descriptor.Descriptor(
name='KeyValuePair',
full_name='KeyValuePair',
_KEYVALUEPROTO = _descriptor.Descriptor(
name='KeyValueProto',
full_name='KeyValueProto',
filename=None,
file=DESCRIPTOR,
containing_type=None,
create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='key', full_name='KeyValuePair.key', index=0,
name='key', full_name='KeyValueProto.key', index=0,
number=1, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='value', full_name='KeyValuePair.value', index=1,
name='value', full_name='KeyValueProto.value', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
@ -60,18 +60,18 @@ _KEYVALUEPAIR = _descriptor.Descriptor(
oneofs=[
],
serialized_start=49,
serialized_end=91,
serialized_end=92,
)
DESCRIPTOR.message_types_by_name['KeyValuePair'] = _KEYVALUEPAIR
DESCRIPTOR.message_types_by_name['KeyValueProto'] = _KEYVALUEPROTO
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
KeyValuePair = _reflection.GeneratedProtocolMessageType('KeyValuePair', (_message.Message,), {
'DESCRIPTOR' : _KEYVALUEPAIR,
KeyValueProto = _reflection.GeneratedProtocolMessageType('KeyValueProto', (_message.Message,), {
'DESCRIPTOR' : _KEYVALUEPROTO,
'__module__' : 'clickhouse_path.format_schemas.rabbitmq_pb2'
# @@protoc_insertion_point(class_scope:KeyValuePair)
# @@protoc_insertion_point(class_scope:KeyValueProto)
})
_sym_db.RegisterMessage(KeyValuePair)
_sym_db.RegisterMessage(KeyValueProto)
# @@protoc_insertion_point(module_scope)

View File

@ -328,7 +328,7 @@ def test_rabbitmq_protobuf(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'pb',
rabbitmq_format = 'Protobuf',
rabbitmq_schema = 'rabbitmq.proto:KeyValuePair';
rabbitmq_schema = 'rabbitmq.proto:KeyValueProto';
''')
credentials = pika.PlainCredentials('root', 'clickhouse')
@ -338,7 +338,7 @@ def test_rabbitmq_protobuf(rabbitmq_cluster):
data = ''
for i in range(0, 20):
msg = rabbitmq_pb2.KeyValuePair()
msg = rabbitmq_pb2.KeyValueProto()
msg.key = i
msg.value = str(i)
serialized_msg = msg.SerializeToString()
@ -346,7 +346,7 @@ def test_rabbitmq_protobuf(rabbitmq_cluster):
channel.basic_publish(exchange='pb', routing_key='', body=data)
data = ''
for i in range(20, 21):
msg = rabbitmq_pb2.KeyValuePair()
msg = rabbitmq_pb2.KeyValueProto()
msg.key = i
msg.value = str(i)
serialized_msg = msg.SerializeToString()
@ -354,7 +354,7 @@ def test_rabbitmq_protobuf(rabbitmq_cluster):
channel.basic_publish(exchange='pb', routing_key='', body=data)
data = ''
for i in range(21, 50):
msg = rabbitmq_pb2.KeyValuePair()
msg = rabbitmq_pb2.KeyValueProto()
msg.key = i
msg.value = str(i)
serialized_msg = msg.SerializeToString()
@ -1583,7 +1583,7 @@ def test_rabbitmq_virtual_columns_with_materialized_view(rabbitmq_cluster):
@pytest.mark.timeout(420)
def test_rabbitmq_queue_resume(rabbitmq_cluster):
def test_rabbitmq_no_loss_on_table_drop(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.rabbitmq_queue_resume (key UInt64, value UInt64)
ENGINE = RabbitMQ
@ -1655,7 +1655,7 @@ def test_rabbitmq_queue_resume(rabbitmq_cluster):
while True:
result1 = instance.query('SELECT count() FROM test.view')
time.sleep(1)
if int(result1) >= messages_num * threads_num:
if int(result1) == messages_num * threads_num:
break
instance.query('''
@ -1664,77 +1664,7 @@ def test_rabbitmq_queue_resume(rabbitmq_cluster):
DROP TABLE test.view;
''')
assert int(result1) >= messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)
@pytest.mark.timeout(420)
def test_rabbitmq_no_loss_on_table_drop(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.rabbitmq_consumer_acks (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'consumer_acks',
rabbitmq_queue_base = 'consumer_resume',
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
''')
i = 0
messages_num = 100000
credentials = pika.PlainCredentials('root', 'clickhouse')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
messages = []
for _ in range(messages_num):
messages.append(json.dumps({'key': i, 'value': i}))
i += 1
for message in messages:
channel.basic_publish(exchange='consumer_acks', routing_key='', body=message, properties=pika.BasicProperties(delivery_mode = 2))
connection.close()
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.rabbitmq_consumer_acks;
''')
while int(instance.query('SELECT count() FROM test.view')) == 0:
time.sleep(1)
instance.query('''
DROP TABLE IF EXISTS test.rabbitmq_consumer_acks;
''')
#collected = int(instance.query('SELECT count() FROM test.view'))
instance.query('''
CREATE TABLE test.rabbitmq_consumer_acks (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_queue_base = 'consumer_resume',
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
''')
while True:
result = instance.query('SELECT count(DISTINCT key) FROM test.view')
time.sleep(1)
if int(result) == messages_num:
break
instance.query('''
DROP TABLE test.consumer;
DROP TABLE test.view;
DROP TABLE test.rabbitmq_consumer_acks;
''')
assert int(result) == messages_num, 'ClickHouse lost some messages: {}'.format(result)
assert int(result1) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)
@pytest.mark.timeout(420)