Register RabbitMQ storage

This commit is contained in:
kssenii 2020-05-20 08:30:38 +03:00
parent a055e33087
commit 3b75f214c5
8 changed files with 391 additions and 0 deletions

View File

@ -71,6 +71,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingMilliseconds, connection_pool_max_wait_ms, 0, "The wait time when the connection pool is full.", 0) \
M(SettingMilliseconds, replace_running_query_max_wait_ms, 5000, "The wait time for running query with the same query_id to finish when setting 'replace_running_query' is active.", 0) \
M(SettingMilliseconds, kafka_max_wait_ms, 5000, "The wait time for reading from Kafka before retry.", 0) \
M(SettingMilliseconds, rabbitmq_max_wait_ms, 5000, "The wait time for reading from RabbitMQ before retry.", 0) \
M(SettingUInt64, poll_interval, DBMS_DEFAULT_POLL_INTERVAL, "Block at the query wait loop on the server for the specified number of seconds.", 0) \
M(SettingUInt64, idle_connection_timeout, 3600, "Close idle TCP connections after specified number of seconds.", 0) \
M(SettingUInt64, distributed_connections_pool_size, DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE, "Maximum number of connections with one remote server in the pool.", 0) \

View File

@ -5,6 +5,7 @@
#cmakedefine01 USE_ICU
#cmakedefine01 USE_MYSQL
#cmakedefine01 USE_RDKAFKA
#cmakedefine01 USE_AMQPCPP
#cmakedefine01 USE_EMBEDDED_COMPILER
#cmakedefine01 USE_INTERNAL_LLVM_LIBRARY
#cmakedefine01 USE_SSL

View File

@ -0,0 +1,44 @@
#include <Storages/RabbitMQ/RabbitMQSettings.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTFunction.h>
#include <Common/Exception.h>
#include <Core/SettingsCollectionImpl.h>
namespace DB
{
/// Error codes referenced in this file. They are only declared here (extern);
/// loadFromQuery() below compares against UNKNOWN_SETTING and throws BAD_ARGUMENTS.
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_SETTING;
}
/// Instantiates the settings-collection implementation for RabbitMQSettings from the
/// LIST_OF_RABBITMQ_SETTINGS list (presumably the counterpart of DECLARE_SETTINGS_COLLECTION
/// in the header - the macro itself is defined in Core/SettingsCollectionImpl.h, confirm there).
IMPLEMENT_SETTINGS_COLLECTION(RabbitMQSettings, LIST_OF_RABBITMQ_SETTINGS)
/** Loads the settings from the SETTINGS clause of a storage definition.
  *
  * If the definition has no SETTINGS clause, an empty non-standalone ASTSetQuery is attached to it
  * and nothing else happens. Otherwise the listed changes are applied to this collection.
  * An UNKNOWN_SETTING error raised while applying them is re-thrown as BAD_ARGUMENTS with the
  * engine name appended to the message; every other exception is propagated unchanged.
  */
void RabbitMQSettings::loadFromQuery(ASTStorage & storage_def)
{
    /// No SETTINGS clause: attach an empty one to the definition and leave the defaults in place.
    if (!storage_def.settings)
    {
        auto empty_settings = std::make_shared<ASTSetQuery>();
        empty_settings->is_standalone = false;
        storage_def.set(storage_def.settings, empty_settings);
        return;
    }

    try
    {
        applyChanges(storage_def.settings->changes);
    }
    catch (Exception & e)
    {
        /// Anything other than an unknown setting name is not ours to translate.
        if (e.code() != ErrorCodes::UNKNOWN_SETTING)
            e.rethrow();
        else
            throw Exception(e.message() + " for storage " + storage_def.engine->name, ErrorCodes::BAD_ARGUMENTS);
    }
}
}

View File

@ -0,0 +1,26 @@
#pragma once
#include <Core/SettingsCollection.h>
namespace DB
{
class ASTStorage;
/** Settings of the RabbitMQ table engine, given in the SETTINGS clause of the storage definition.
  * Every entry of the list below is (type, name, default value, description, flags).
  */
struct RabbitMQSettings : public SettingsCollection<RabbitMQSettings>
{
    /// NOTE(review): the default of rabbitmq_routing_key is "5672", which is the default RabbitMQ port
    /// rather than an obvious routing key - looks like a copy-paste from the port default; confirm intent.
    /// The last entry of the list must NOT end with a line continuation, otherwise the
    /// DECLARE_SETTINGS_COLLECTION line that follows becomes part of the #define.
#define LIST_OF_RABBITMQ_SETTINGS(M) \
    M(SettingString, rabbitmq_host_port, "", "A host-port to connect to RabbitMQ server.", 0) \
    M(SettingString, rabbitmq_routing_key, "5672", "A routing key to connect producer->exchange->queue<->consumer.", 0) \
    M(SettingString, rabbitmq_exchange_name, "clickhouse-exchange", "The exchange name, to which messages are sent. Needed to bind queues to it.", 0) \
    M(SettingString, rabbitmq_format, "", "The message format.", 0) \
    M(SettingChar, rabbitmq_row_delimiter, '\0', "The character to be considered as a delimiter.", 0) \
    M(SettingUInt64, rabbitmq_bind_by_id, 0, "A flag which indicates that binding should be done in range [0, num_consumers).", 0) \
    M(SettingUInt64, rabbitmq_num_consumers, 1, "The number of consumer channels per table.", 0) \
    M(SettingUInt64, rabbitmq_num_queues, 1, "The number of queues per consumer.", 0) \
    M(SettingUInt64, rabbitmq_hash_exchange, 0, "A flag which indicates whether consistent-hash-exchange should be used.", 0)

    DECLARE_SETTINGS_COLLECTION(LIST_OF_RABBITMQ_SETTINGS)

    /// Applies the SETTINGS clause of the given storage definition to this collection.
    void loadFromQuery(ASTStorage & storage_def);
};
}

View File

@ -0,0 +1,249 @@
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageMaterializedView.h>
#include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <Common/Exception.h>
#include <Common/Macros.h>
#include <Common/config_version.h>
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
#include <common/logger_useful.h>
#include <Common/quoteString.h>
#include <Common/parseAddress.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
#include <Storages/RabbitMQ/RabbitMQSettings.h>
#include <amqpcpp.h>
namespace DB
{
/// Error codes made visible to this file; they are only declared here (extern).
/// In the code shown in this file only BAD_ARGUMENTS is actually referenced.
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
/** Builds the storage object from already-parsed engine parameters.
  *
  * routing_key_, format_name_ and host_port_ are passed through the macro expansion of the global
  * context; exchange_name_ is stored exactly as given. The expanded host_port_ is handed to
  * parseAddress with 5672 as the default port.
  */
StorageRabbitMQ::StorageRabbitMQ(
        const StorageID & table_id_,
        Context & context_,
        const ColumnsDescription & columns_,
        const String & host_port_,
        const String & routing_key_,
        const String & exchange_name_,
        const String & format_name_,
        char row_delimiter_,
        size_t num_consumers_,
        bool bind_by_id_,
        size_t num_queues_,
        bool hash_exchange_)
    : IStorage(table_id_)
    , global_context(context_.getGlobalContext())
    , rabbitmq_context(Context(global_context))
    , routing_key(global_context.getMacros()->expand(routing_key_))
    , exchange_name(exchange_name_)
    , format_name(global_context.getMacros()->expand(format_name_))
    , row_delimiter(row_delimiter_)
    , num_consumers(num_consumers_)
    , bind_by_id(bind_by_id_)
    , num_queues(num_queues_)
    , hash_exchange(hash_exchange_)
    , log(&Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")"))
    /// Members are initialized in the order they are declared in the class, not in the order written
    /// here; parsed_address is declared before semaphore, so list them the same way (avoids -Wreorder).
    , parsed_address(parseAddress(global_context.getMacros()->expand(host_port_), 5672))
    , semaphore(0, num_consumers_)
{
    /// NOTE(review): columns_ is never used, so the column description passed by the factory is
    /// dropped here. Presumably it has to be handed to the IStorage base - confirm against IStorage.
}
/** Registers the "RabbitMQ" table engine in the storage factory.
  *
  * Every engine parameter starts from its value in RabbitMQSettings (loaded from the SETTINGS clause
  * when one is present) and may be overridden by a positional engine argument:
  *   1. host:port            - String literal
  *   2. routing key          - String literal
  *   3. exchange name        - constant expression evaluating to String
  *   4. message format       - constant expression evaluating to String
  *   5. row delimiter        - constant expression evaluating to a String of at most one character
  *                             (an empty string means '\0')
  *   6. bind-by-id flag      - UInt64 literal, converted to bool
  *   7. number of consumers  - UInt64 literal
  *   8. number of queues     - UInt64 literal
  *   9. hash-exchange flag   - UInt64 literal, converted to bool
  *
  * The creator throws BAD_ARGUMENTS when a positional argument has an unexpected type.
  */
void registerStorageRabbitMQ(StorageFactory & factory)
{
    auto creator_fn = [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;
        size_t args_count = engine_args.size();
        bool has_settings = args.storage_def->settings;

        RabbitMQSettings rabbitmq_settings;
        if (has_settings)
        {
            rabbitmq_settings.loadFromQuery(*args.storage_def);
        }

        /// 1. host:port
        String host_port = rabbitmq_settings.rabbitmq_host_port;
        if (args_count >= 1)
        {
            const auto * ast = engine_args[0]->as<ASTLiteral>();
            if (!ast || ast->value.getType() != Field::Types::String)
                throw Exception(String("RabbitMQ host:port must be a string"), ErrorCodes::BAD_ARGUMENTS);

            host_port = safeGet<String>(ast->value);
        }

        /// 2. routing key
        String routing_key = rabbitmq_settings.rabbitmq_routing_key.value;
        if (args_count >= 2)
        {
            const auto * ast = engine_args[1]->as<ASTLiteral>();
            if (!ast || ast->value.getType() != Field::Types::String)
                throw Exception(String("RabbitMQ routing key must be a string"), ErrorCodes::BAD_ARGUMENTS);

            routing_key = safeGet<String>(ast->value);
        }

        /// 3. exchange name
        String exchange = rabbitmq_settings.rabbitmq_exchange_name.value;
        if (args_count >= 3)
        {
            engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);

            const auto * ast = engine_args[2]->as<ASTLiteral>();
            /// A value of the wrong type used to be silently ignored (falling back to the setting);
            /// report it like every other argument does.
            if (!ast || ast->value.getType() != Field::Types::String)
                throw Exception("RabbitMQ exchange name must be a string", ErrorCodes::BAD_ARGUMENTS);

            exchange = safeGet<String>(ast->value);
        }

        /// 4. message format
        String format = rabbitmq_settings.rabbitmq_format.value;
        if (args_count >= 4)
        {
            engine_args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[3], args.local_context);

            const auto * ast = engine_args[3]->as<ASTLiteral>();
            if (!ast || ast->value.getType() != Field::Types::String)
                throw Exception("Format must be a string", ErrorCodes::BAD_ARGUMENTS);

            format = safeGet<String>(ast->value);
        }

        /// 5. row delimiter: a string of zero or one characters
        char row_delimiter = rabbitmq_settings.rabbitmq_row_delimiter;
        if (args_count >= 5)
        {
            engine_args[4] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[4], args.local_context);

            const auto * ast = engine_args[4]->as<ASTLiteral>();
            if (!ast || ast->value.getType() != Field::Types::String)
                throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);

            const String arg = safeGet<String>(ast->value);
            if (arg.size() > 1)
                throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);

            row_delimiter = arg.empty() ? '\0' : arg[0];
        }

        /// 6. bind-by-id flag
        bool bind_by_id = static_cast<bool>(rabbitmq_settings.rabbitmq_bind_by_id);
        if (args_count >= 6)
        {
            const auto * ast = engine_args[5]->as<ASTLiteral>();
            if (!ast || ast->value.getType() != Field::Types::UInt64)
                throw Exception("Bind-by-id flag must be a boolean", ErrorCodes::BAD_ARGUMENTS);

            bind_by_id = static_cast<bool>(safeGet<UInt64>(ast->value));
        }

        /// 7. number of consumers
        UInt64 num_consumers = rabbitmq_settings.rabbitmq_num_consumers;
        if (args_count >= 7)
        {
            const auto * ast = engine_args[6]->as<ASTLiteral>();
            if (!ast || ast->value.getType() != Field::Types::UInt64)
                throw Exception("Number of consumers must be a positive integer", ErrorCodes::BAD_ARGUMENTS);

            num_consumers = safeGet<UInt64>(ast->value);
        }

        /// 8. number of queues
        UInt64 num_queues = rabbitmq_settings.rabbitmq_num_queues;
        if (args_count >= 8)
        {
            const auto * ast = engine_args[7]->as<ASTLiteral>();
            if (!ast || ast->value.getType() != Field::Types::UInt64)
                throw Exception("Number of queues must be a positive integer", ErrorCodes::BAD_ARGUMENTS);

            /// This value must go to num_queues (it used to overwrite num_consumers).
            num_queues = safeGet<UInt64>(ast->value);
        }

        /// 9. hash-exchange flag
        bool hash_exchange = static_cast<bool>(rabbitmq_settings.rabbitmq_hash_exchange);
        if (args_count >= 9)
        {
            const auto * ast = engine_args[8]->as<ASTLiteral>();
            if (!ast || ast->value.getType() != Field::Types::UInt64)
                throw Exception("Hash exchange flag must be a boolean", ErrorCodes::BAD_ARGUMENTS);

            hash_exchange = static_cast<bool>(safeGet<UInt64>(ast->value));
        }

        return StorageRabbitMQ::create(args.table_id, args.context, args.columns, host_port, routing_key, exchange,
                format, row_delimiter, num_consumers, bind_by_id, num_queues, hash_exchange);
    };

    factory.registerStorage("RabbitMQ", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
}
/// Returns the virtual columns of the table: "_exchange" and "_routingKey", both of type String.
NamesAndTypesList StorageRabbitMQ::getVirtuals() const
{
    auto exchange_column_type = std::make_shared<DataTypeString>();
    auto routing_key_column_type = std::make_shared<DataTypeString>();

    NamesAndTypesList virtual_columns{
        {"_exchange", exchange_column_type},
        {"_routingKey", routing_key_column_type}
    };
    return virtual_columns;
}
}

View File

@ -0,0 +1,62 @@
#pragma once
#include <Core/BackgroundSchedulePool.h>
#include <Storages/IStorage.h>
#include <Interpreters/Context.h>
#include <Poco/Semaphore.h>
#include <ext/shared_ptr_helper.h>
#include <atomic>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <amqpcpp.h>
#include <event2/event.h>
namespace DB
{
/// Shared-ownership pointer to an AMQP-CPP TCP channel (AMQP::TcpChannel).
using ChannelPtr = std::shared_ptr<AMQP::TcpChannel>;
/** Storage of the "RabbitMQ" table engine.
  *
  * Objects are created through ext::shared_ptr_helper (the constructor is protected).
  * The part of the class visible here stores the engine parameters and exposes the engine name,
  * settings support and the virtual columns.
  */
class StorageRabbitMQ final: public ext::shared_ptr_helper<StorageRabbitMQ>, public IStorage
{
    friend struct ext::shared_ptr_helper<StorageRabbitMQ>;

public:
    std::string getName() const override { return "RabbitMQ"; }

    bool supportsSettings() const override { return true; }

    /// Virtual columns of the table. Defined in StorageRabbitMQ.cpp, so it has to be declared here.
    /// NOTE(review): `override` assumes IStorage declares a virtual getVirtuals() const - confirm.
    NamesAndTypesList getVirtuals() const override;

protected:
    StorageRabbitMQ(
            const StorageID & table_id_,
            Context & context_,
            const ColumnsDescription & columns_,
            const String & host_port_,
            const String & routing_key_, const String & exchange_name_,
            const String & format_name_, char row_delimiter_,
            size_t num_consumers_, bool bind_by_id_, size_t num_queues_, bool hash_exchange_);

private:
    /// Non-static members are initialized in the order they are declared below.

    Context global_context;     /// Copy of context_.getGlobalContext() taken in the constructor.
    Context rabbitmq_context;   /// Separate Context object copy-constructed from global_context.

    String routing_key;         /// Routing key after macro expansion.
    const String exchange_name; /// Exchange name exactly as passed to the constructor.

    const String format_name;   /// Message format name after macro expansion.
    char row_delimiter;
    size_t num_consumers;
    size_t num_created_consumers = 0;   /// Not used in the code visible so far.
    bool bind_by_id;
    size_t num_queues;
    const bool hash_exchange;

    Poco::Logger * log;
    std::pair<std::string, UInt16> parsed_address;  /// Result of parseAddress(host_port, 5672) in the constructor.

    Poco::Semaphore semaphore;  /// Constructed as (0, num_consumers_).
    std::mutex mutex;

    size_t consumer_id = 0;

    /// The members below are not used in the code visible so far.
    BackgroundSchedulePool::TaskHolder task;
    std::atomic<bool> stream_cancelled{false};
};
}

View File

@ -46,6 +46,10 @@ void registerStorages()
#if USE_RDKAFKA
registerStorageKafka(factory);
#endif
#if USE_AMQPCPP
registerStorageRabbitMQ(factory);
#endif
}
}

View File

@ -47,6 +47,10 @@ void registerStorageMySQL(StorageFactory & factory);
void registerStorageKafka(StorageFactory & factory);
#endif
#if USE_AMQPCPP
void registerStorageRabbitMQ(StorageFactory & factory);
#endif
void registerStorages();
}