Add insert part

kssenii 2020-06-01 15:37:23 +00:00
parent e80b405359
commit 5757dd1d57
7 changed files with 348 additions and 0 deletions
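This commit adds the INSERT half of the RabbitMQ engine: StorageRabbitMQ::write() now returns a RabbitMQBlockOutputStream, which serializes inserted blocks through a WriteBufferToRabbitMQProducer that packs rows into messages and publishes them to the exchange. A minimal sketch of how the new pieces are driven, assuming hypothetical storage, context and block objects (the calls themselves are the ones added in this diff):

BlockOutputStreamPtr stream = storage.write(nullptr, context); // returns a RabbitMQBlockOutputStream
stream->writePrefix(); // creates the producer buffer; throws CANNOT_CREATE_IO_BUFFER on failure
stream->write(block);  // serializes rows; each completed row triggers count_row() on the buffer
stream->writeSuffix(); // finishes the output format; remaining messages are flushed when the producer buffer is destroyed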

View File

@@ -8,4 +8,7 @@ namespace DB
class ReadBufferFromRabbitMQConsumer;
using ConsumerBufferPtr = std::shared_ptr<ReadBufferFromRabbitMQConsumer>;
class WriteBufferToRabbitMQProducer;
using ProducerBufferPtr = std::shared_ptr<WriteBufferToRabbitMQProducer>;
}

View File Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp

@@ -0,0 +1,57 @@
#include <Storages/RabbitMQ/RabbitMQBlockOutputStream.h>
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
#include <Formats/FormatFactory.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_CREATE_IO_BUFFER;
}
RabbitMQBlockOutputStream::RabbitMQBlockOutputStream(
StorageRabbitMQ & storage_, const Context & context_) : storage(storage_), context(context_)
{
}
Block RabbitMQBlockOutputStream::getHeader() const
{
return storage.getSampleBlockNonMaterialized();
}
void RabbitMQBlockOutputStream::writePrefix()
{
buffer = storage.createWriteBuffer();
if (!buffer)
throw Exception("Failed to create RabbitMQ producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
child = FormatFactory::instance().getOutput(
storage.getFormatName(), *buffer, getHeader(), context, [this](const Columns & /* columns */, size_t /* rows */)
{
buffer->count_row();
});
}
void RabbitMQBlockOutputStream::write(const Block & block)
{
child->write(block);
if (buffer)
buffer->flush();
}
void RabbitMQBlockOutputStream::writeSuffix()
{
child->writeSuffix();
}
}

View File Storages/RabbitMQ/RabbitMQBlockOutputStream.h

@@ -0,0 +1,29 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Interpreters/Context.h>
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
namespace DB
{
class RabbitMQBlockOutputStream : public IBlockOutputStream
{
public:
explicit RabbitMQBlockOutputStream(StorageRabbitMQ & storage_, const Context & context_);
Block getHeader() const override;
void writePrefix() override;
void write(const Block & block) override;
void writeSuffix() override;
private:
StorageRabbitMQ & storage;
Context context;
ProducerBufferPtr buffer;
BlockOutputStreamPtr child;
};
}

View File Storages/RabbitMQ/StorageRabbitMQ.cpp

@@ -15,6 +15,8 @@
#include <Parsers/ASTLiteral.h>
#include <Storages/RabbitMQ/RabbitMQSettings.h>
#include <Storages/RabbitMQ/RabbitMQBlockInputStream.h>
#include <Storages/RabbitMQ/RabbitMQBlockOutputStream.h>
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageMaterializedView.h>
@@ -124,6 +126,12 @@ Pipes StorageRabbitMQ::read(
}
BlockOutputStreamPtr StorageRabbitMQ::write(const ASTPtr &, const Context & context)
{
return std::make_shared<RabbitMQBlockOutputStream>(*this, context);
}
void StorageRabbitMQ::startup()
{
for (size_t i = 0; i < num_consumers; ++i)
@@ -205,6 +213,14 @@ ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
}
ProducerBufferPtr StorageRabbitMQ::createWriteBuffer()
{
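/// The two trailing literals are rows_per_message = 1 and chunk_size = 1024, matching the last two parameters of the WriteBufferToRabbitMQProducer constructor.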
return std::make_shared<WriteBufferToRabbitMQProducer>(parsed_address, routing_key, exchange_name,
log, num_consumers, bind_by_id, hash_exchange,
row_delimiter ? std::optional<char>{row_delimiter} : std::nullopt, 1, 1024);
}
bool StorageRabbitMQ::checkDependencies(const StorageID & table_id)
{
// Check if all dependencies are attached

View File Storages/RabbitMQ/StorageRabbitMQ.h

@@ -25,6 +25,7 @@ public:
std::string getName() const override { return "RabbitMQ"; }
bool supportsSettings() const override { return true; }
bool noPushingToViews() const override { return true; }
void startup() override;
void shutdown() override;
@@ -37,10 +38,16 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(
const ASTPtr & query,
const Context & context) override;
void pushReadBuffer(ConsumerBufferPtr buf);
ConsumerBufferPtr popReadBuffer();
ConsumerBufferPtr popReadBuffer(std::chrono::milliseconds timeout);
ProducerBufferPtr createWriteBuffer();
const String & getExchangeName() const { return exchange_name; }
const String & getRoutingKey() const { return routing_key; }

View File Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp

@@ -0,0 +1,169 @@
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
#include "Core/Block.h"
#include "Columns/ColumnString.h"
#include "Columns/ColumnsNumber.h"
#include <common/logger_useful.h>
#include <amqpcpp.h>
#include <chrono>
#include <thread>
#include <cassert>
namespace DB
{
enum
{
Connection_setup_sleep = 200, /// ms to sleep between connection readiness checks
Connection_setup_retries_max = 1000, /// give up on the connection after this many checks
Buffer_limit_to_flush = 50000 /// force a flush once this many messages have accumulated
};
WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
std::pair<std::string, UInt16> & parsed_address,
const String & routing_key_,
const String & exchange_,
Poco::Logger * log_,
const size_t num_queues_,
const bool bind_by_id_,
const bool hash_exchange_,
std::optional<char> delimiter,
size_t rows_per_message,
size_t chunk_size_)
: WriteBuffer(nullptr, 0)
, routing_key(routing_key_)
, exchange_name(exchange_)
, log(log_)
, num_queues(num_queues_)
, bind_by_id(bind_by_id_)
, hash_exchange(hash_exchange_)
, delim(delimiter)
, max_rows(rows_per_message)
, chunk_size(chunk_size_)
, producerEvbase(event_base_new())
, eventHandler(producerEvbase, log)
, connection(&eventHandler, AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login("root", "clickhouse"), "/"))
{
/* The reason for making a separate connection for each concurrent producer is explained here:
* https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/128#issuecomment-300780086
* - publishing from different threads (output streams are asynchronous) leads to internal library errors.
*/
size_t cnt_retries = 0;
while (!connection.ready() && ++cnt_retries != Connection_setup_retries_max)
{
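/// Pump libevent once without blocking, then give the connection some time to become ready.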
event_base_loop(producerEvbase, EVLOOP_NONBLOCK | EVLOOP_ONCE);
std::this_thread::sleep_for(std::chrono::milliseconds(Connection_setup_sleep));
}
if (!connection.ready())
{
LOG_ERROR(log, "Cannot set up connection for producer!");
}
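/// A single channel on the producer's own connection; all publishing in flush() goes through it.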
producer_channel = std::make_shared<AMQP::TcpChannel>(&connection);
}
WriteBufferToRabbitMQProducer::~WriteBufferToRabbitMQProducer()
{
flush();
connection.close();
assert(rows == 0 && chunks.empty());
}
void WriteBufferToRabbitMQProducer::count_row()
{
if (++rows % max_rows == 0)
{
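/// A full message is ready: stitch together all completed chunks plus the used
/// prefix of the last chunk (offset() bytes), dropping a trailing row delimiter if present.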
const std::string & last_chunk = chunks.back();
size_t last_chunk_size = offset();
if (delim && last_chunk[last_chunk_size - 1] == delim)
--last_chunk_size;
std::string payload;
payload.reserve((chunks.size() - 1) * chunk_size + last_chunk_size);
for (auto i = chunks.begin(), e = --chunks.end(); i != e; ++i)
payload.append(*i);
payload.append(last_chunk, 0, last_chunk_size);
rows = 0;
chunks.clear();
set(nullptr, 0);
messages.emplace_back(payload);
++message_counter;
if (messages.size() >= Buffer_limit_to_flush)
{
flush();
}
}
}
void WriteBufferToRabbitMQProducer::flush()
{
/* Why accumulate payloads instead of publishing each one right away in count_row()? Because publishing
* has to be wrapped inside the declareExchange() callback, and declaring the exchange every time we
* publish is too expensive. Declaring it once and then publishing without the onSuccess callback wrapper
* leads to the exchange becoming inactive at some point, and some messages are lost as a result.
*/
std::atomic<bool> exchange_declared = false, exchange_error = false;
producer_channel->declareExchange(exchange_name + "_direct", AMQP::direct, AMQP::passive)
.onSuccess([&]()
{
for (auto & payload : messages)
{
if (!message_counter)
return;
next_queue = next_queue % num_queues + 1;
if (bind_by_id || hash_exchange)
{
producer_channel->publish(exchange_name, std::to_string(next_queue), payload);
}
else
{
producer_channel->publish(exchange_name, routing_key, payload);
}
--message_counter;
}
exchange_declared = true;
messages.clear();
})
.onError([&](const char * message)
{
exchange_error = true;
exchange_declared = false;
LOG_ERROR(log, "Exchange was not declared: {}", message);
});
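/// Drive the event loop until either the onSuccess or the onError callback above has fired.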
while (!exchange_declared && !exchange_error)
{
startEventLoop(exchange_declared);
}
}
void WriteBufferToRabbitMQProducer::nextImpl()
{
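/// Allocate a fresh fixed-size chunk and point the WriteBuffer's working region at it; count_row() later stitches the chunks into one message payload.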
chunks.push_back(std::string());
chunks.back().resize(chunk_size);
set(chunks.back().data(), chunk_size);
}
void WriteBufferToRabbitMQProducer::startEventLoop(std::atomic<bool> & check_param)
{
eventHandler.start(check_param);
}
}

View File Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h

@@ -0,0 +1,67 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <Columns/IColumn.h>
#include <list>
#include <amqpcpp.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
namespace DB
{
using ProducerPtr = std::shared_ptr<AMQP::TcpChannel>;
using Messages = std::vector<String>;
class WriteBufferToRabbitMQProducer : public WriteBuffer
{
public:
WriteBufferToRabbitMQProducer(
std::pair<std::string, UInt16> & parsed_address,
const String & routing_key_,
const String & exchange_,
Poco::Logger * log_,
const size_t num_queues_,
const bool bind_by_id_,
const bool hash_exchange_,
std::optional<char> delimiter,
size_t rows_per_message,
size_t chunk_size_
);
~WriteBufferToRabbitMQProducer() override;
void count_row();
void flush();
private:
void nextImpl() override;
void checkExchange();
void startEventLoop(std::atomic<bool> & check_param);
const String routing_key;
const String exchange_name;
const bool bind_by_id;
const bool hash_exchange;
const size_t num_queues;
event_base * producerEvbase;
RabbitMQHandler eventHandler;
AMQP::TcpConnection connection;
ProducerPtr producer_channel;
size_t next_queue = 0;
UInt64 message_counter = 0;
String channel_id;
Messages messages;
Poco::Logger * log;
const std::optional<char> delim;
const size_t max_rows;
const size_t chunk_size;
size_t count_mes = 0;
size_t rows = 0;
std::list<std::string> chunks;
};
}