Merge pull request #11069 from kssenii/add-storage-rabbitmq-read-only
Add storage RabbitMQ
Commit a2b6d58053
3 .gitmodules (vendored)
@@ -157,6 +157,9 @@
[submodule "contrib/openldap"]
	path = contrib/openldap
	url = https://github.com/openldap/openldap.git
[submodule "contrib/AMQP-CPP"]
	path = contrib/AMQP-CPP
	url = https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
[submodule "contrib/cassandra"]
	path = contrib/cassandra
	url = https://github.com/ClickHouse-Extras/cpp-driver.git
@@ -342,6 +342,7 @@ include (cmake/find/sparsehash.cmake)
include (cmake/find/re2.cmake)
include (cmake/find/libgsasl.cmake)
include (cmake/find/rdkafka.cmake)
include (cmake/find/amqpcpp.cmake)
include (cmake/find/capnp.cmake)
include (cmake/find/llvm.cmake)
include (cmake/find/opencl.cmake)
20 cmake/find/amqpcpp.cmake (new file)
@@ -0,0 +1,20 @@
SET(ENABLE_AMQPCPP 1)
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/AMQP-CPP/CMakeLists.txt")
    message (WARNING "submodule contrib/AMQP-CPP is missing. to fix try run: \n git submodule update --init --recursive")
    set (ENABLE_AMQPCPP 0)
endif ()

if (ENABLE_AMQPCPP)

    set (USE_AMQPCPP 1)
    set (AMQPCPP_LIBRARY AMQP-CPP)

    set (AMQPCPP_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/AMQP-CPP/include")

    list (APPEND AMQPCPP_INCLUDE_DIR
            "${ClickHouse_SOURCE_DIR}/contrib/AMQP-CPP/include"
            "${ClickHouse_SOURCE_DIR}/contrib/AMQP-CPP")

endif()

message (STATUS "Using AMQP-CPP=${USE_AMQPCPP}: ${AMQPCPP_INCLUDE_DIR} : ${AMQPCPP_LIBRARY}")
1 contrib/AMQP-CPP (new vendored submodule)
@@ -0,0 +1 @@
Subproject commit 1c08399ab0ab9e4042ef8e2bbe9e208e5dcbc13b
4 contrib/CMakeLists.txt (vendored)
@@ -301,6 +301,10 @@ if (USE_FASTOPS)
    add_subdirectory (fastops-cmake)
endif()

if (USE_AMQPCPP)
    add_subdirectory (amqpcpp-cmake)
endif()

if (USE_CASSANDRA)
    add_subdirectory (libuv)
    add_subdirectory (cassandra)
44 contrib/amqpcpp-cmake/CMakeLists.txt (new file)
@@ -0,0 +1,44 @@
set (LIBRARY_DIR ${ClickHouse_SOURCE_DIR}/contrib/AMQP-CPP)

set (SRCS
    ${LIBRARY_DIR}/src/array.cpp
    ${LIBRARY_DIR}/src/channel.cpp
    ${LIBRARY_DIR}/src/channelimpl.cpp
    ${LIBRARY_DIR}/src/connectionimpl.cpp
    ${LIBRARY_DIR}/src/deferredcancel.cpp
    ${LIBRARY_DIR}/src/deferredconfirm.cpp
    ${LIBRARY_DIR}/src/deferredconsumer.cpp
    ${LIBRARY_DIR}/src/deferredextreceiver.cpp
    ${LIBRARY_DIR}/src/deferredget.cpp
    ${LIBRARY_DIR}/src/deferredpublisher.cpp
    ${LIBRARY_DIR}/src/deferredreceiver.cpp
    ${LIBRARY_DIR}/src/field.cpp
    ${LIBRARY_DIR}/src/flags.cpp
    ${LIBRARY_DIR}/src/linux_tcp/openssl.cpp
    ${LIBRARY_DIR}/src/linux_tcp/tcpconnection.cpp
    ${LIBRARY_DIR}/src/receivedframe.cpp
    ${LIBRARY_DIR}/src/table.cpp
    ${LIBRARY_DIR}/src/watchable.cpp
)

add_library(amqp-cpp ${SRCS})

target_compile_options (amqp-cpp
    PUBLIC
    -Wno-old-style-cast
    -Wno-inconsistent-missing-destructor-override
    -Wno-deprecated
    -Wno-unused-parameter
    -Wno-shadow
    -Wno-tautological-type-limit-compare
    -Wno-extra-semi
    # NOTE: disable all warnings at last because the warning:
    # "conversion function converting 'XXX' to itself will never be used"
    # doesn't have its own diagnostic flag yet.
    -w
)

target_include_directories (amqp-cpp PUBLIC ${LIBRARY_DIR}/include)

target_link_libraries (amqp-cpp PUBLIC ssl)
12 docker/test/integration/compose/docker_compose_rabbitmq.yml (new file)
@@ -0,0 +1,12 @@
version: '2.3'

services:
    rabbitmq1:
        image: rabbitmq:3-management
        hostname: rabbitmq1
        ports:
            - "5672:5672"
            - "15672:15672"
        environment:
            RABBITMQ_DEFAULT_USER: "root"
            RABBITMQ_DEFAULT_PASS: "clickhouse"
@@ -66,7 +66,6 @@
    <https_port>8443</https_port>
    <tcp_port_secure>9440</tcp_port_secure>
    -->

    <!-- Used with https_port and tcp_port_secure. Full ssl options list: https://github.com/ClickHouse-Extras/poco/blob/master/NetSSL_OpenSSL/include/Poco/Net/SSLManager.h#L71 -->
    <openSSL>
        <server> <!-- Used for https server AND secure tcp port -->
@@ -73,6 +73,10 @@ if(USE_RDKAFKA)
    add_headers_and_sources(dbms Storages/Kafka)
endif()

if (USE_AMQPCPP)
    add_headers_and_sources(dbms Storages/RabbitMQ)
endif()

if (USE_AWS_S3)
    add_headers_and_sources(dbms Common/S3)
    add_headers_and_sources(dbms Disks/S3)
@@ -262,6 +266,9 @@ if (USE_RDKAFKA)
    endif()
endif()

if (USE_AMQPCPP)
    dbms_target_link_libraries(PUBLIC amqp-cpp)
endif()

if(RE2_INCLUDE_DIR)
    target_include_directories(clickhouse_common_io SYSTEM BEFORE PUBLIC ${RE2_INCLUDE_DIR})
@@ -496,6 +496,7 @@ namespace ErrorCodes
    extern const int NO_SUITABLE_FUNCTION_IMPLEMENTATION = 527;
    extern const int CASSANDRA_INTERNAL_ERROR = 528;
    extern const int NOT_A_LEADER = 529;
    extern const int CANNOT_CONNECT_RABBITMQ = 530;

    extern const int KEEPER_EXCEPTION = 999;
    extern const int POCO_EXCEPTION = 1000;
@@ -74,6 +74,7 @@ struct Settings : public SettingsCollection<Settings>
    M(SettingMilliseconds, connection_pool_max_wait_ms, 0, "The wait time when the connection pool is full.", 0) \
    M(SettingMilliseconds, replace_running_query_max_wait_ms, 5000, "The wait time for running query with the same query_id to finish when setting 'replace_running_query' is active.", 0) \
    M(SettingMilliseconds, kafka_max_wait_ms, 5000, "The wait time for reading from Kafka before retry.", 0) \
    M(SettingMilliseconds, rabbitmq_max_wait_ms, 5000, "The wait time for reading from RabbitMQ before retry.", 0) \
    M(SettingUInt64, poll_interval, DBMS_DEFAULT_POLL_INTERVAL, "Block at the query wait loop on the server for the specified number of seconds.", 0) \
    M(SettingUInt64, idle_connection_timeout, 3600, "Close idle TCP connections after specified number of seconds.", 0) \
    M(SettingUInt64, distributed_connections_pool_size, DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE, "Maximum number of connections with one remote server in the pool.", 0) \
@@ -5,6 +5,7 @@
#cmakedefine01 USE_ICU
#cmakedefine01 USE_MYSQL
#cmakedefine01 USE_RDKAFKA
#cmakedefine01 USE_AMQPCPP
#cmakedefine01 USE_EMBEDDED_COMPILER
#cmakedefine01 USE_INTERNAL_LLVM_LIBRARY
#cmakedefine01 USE_SSL
14 src/Storages/RabbitMQ/Buffer_fwd.h (new file)
@@ -0,0 +1,14 @@
#pragma once

#include <memory>

namespace DB
{

class ReadBufferFromRabbitMQConsumer;
using ConsumerBufferPtr = std::shared_ptr<ReadBufferFromRabbitMQConsumer>;

class WriteBufferToRabbitMQProducer;
using ProducerBufferPtr = std::shared_ptr<WriteBufferToRabbitMQProducer>;

}
156 src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp (new file)
@@ -0,0 +1,156 @@
#include <Formats/FormatFactory.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Storages/RabbitMQ/RabbitMQBlockInputStream.h>
#include <Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h>

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

namespace DB
{

RabbitMQBlockInputStream::RabbitMQBlockInputStream(
        StorageRabbitMQ & storage_,
        const StorageMetadataPtr & metadata_snapshot_,
        const Context & context_,
        const Names & columns,
        Poco::Logger * log_)
        : storage(storage_)
        , metadata_snapshot(metadata_snapshot_)
        , context(context_)
        , column_names(columns)
        , log(log_)
        , non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized())
        , virtual_header(metadata_snapshot->getSampleBlockForColumns({"_exchange"}, storage.getVirtuals(), storage.getStorageID()))
{
}


RabbitMQBlockInputStream::~RabbitMQBlockInputStream()
{
    if (!claimed)
        return;

    storage.pushReadBuffer(buffer);
}


Block RabbitMQBlockInputStream::getHeader() const
{
    return metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals(), storage.getStorageID());
}


void RabbitMQBlockInputStream::readPrefixImpl()
{
    auto timeout = std::chrono::milliseconds(context.getSettingsRef().rabbitmq_max_wait_ms.totalMilliseconds());

    buffer = storage.popReadBuffer(timeout);
    claimed = !!buffer;

    if (!buffer || finished)
        return;

    buffer->checkSubscription();
}


Block RabbitMQBlockInputStream::readImpl()
{
    if (!buffer || finished)
        return Block();

    finished = true;

    MutableColumns result_columns = non_virtual_header.cloneEmptyColumns();
    MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();

    auto input_format = FormatFactory::instance().getInputFormat(
            storage.getFormatName(), *buffer, non_virtual_header, context, 1);

    InputPort port(input_format->getPort().getHeader(), input_format.get());
    connect(input_format->getPort(), port);
    port.setNeeded();

    auto read_rabbitmq_message = [&]
    {
        size_t new_rows = 0;

        while (true)
        {
            auto status = input_format->prepare();

            switch (status)
            {
                case IProcessor::Status::Ready:
                    input_format->work();
                    break;

                case IProcessor::Status::Finished:
                    input_format->resetParser();
                    return new_rows;

                case IProcessor::Status::PortFull:
                {
                    auto chunk = port.pull();

                    auto chunk_rows = chunk.getNumRows();
                    new_rows += chunk_rows;

                    auto columns = chunk.detachColumns();

                    for (size_t i = 0, s = columns.size(); i < s; ++i)
                    {
                        result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
                    }
                    break;
                }
                case IProcessor::Status::NeedData:
                case IProcessor::Status::Async:
                case IProcessor::Status::Wait:
                case IProcessor::Status::ExpandPipeline:
                    throw Exception("Source processor returned status " + IProcessor::statusToName(status), ErrorCodes::LOGICAL_ERROR);
            }
        }
    };

    size_t total_rows = 0;

    while (true)
    {
        if (buffer->eof())
            break;

        auto new_rows = read_rabbitmq_message();

        auto exchange_name = buffer->getExchange();

        for (size_t i = 0; i < new_rows; ++i)
        {
            virtual_columns[0]->insert(exchange_name);
        }

        total_rows = total_rows + new_rows;
        buffer->allowNext();

        if (!new_rows || !checkTimeLimit())
            break;
    }

    if (total_rows == 0)
        return Block();

    auto result_block = non_virtual_header.cloneWithColumns(std::move(result_columns));
    auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));

    for (const auto & column : virtual_block.getColumnsWithTypeAndName())
    {
        result_block.insert(column);
    }

    return result_block;
}

}
42 src/Storages/RabbitMQ/RabbitMQBlockInputStream.h (new file)
@@ -0,0 +1,42 @@
#pragma once

#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
#include <Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h>


namespace DB
{
class RabbitMQBlockInputStream : public IBlockInputStream
{

public:
    RabbitMQBlockInputStream(
            StorageRabbitMQ & storage_,
            const StorageMetadataPtr & metadata_snapshot_,
            const Context & context_,
            const Names & columns,
            Poco::Logger * log_);

    ~RabbitMQBlockInputStream() override;

    String getName() const override { return storage.getName(); }
    Block getHeader() const override;

    void readPrefixImpl() override;
    Block readImpl() override;

private:
    StorageRabbitMQ & storage;
    StorageMetadataPtr metadata_snapshot;
    Context context;
    Names column_names;
    Poco::Logger * log;
    bool finished = false, claimed = false;
    const Block non_virtual_header, virtual_header;

    ConsumerBufferPtr buffer;
};

}
61 src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp (new file)
@@ -0,0 +1,61 @@
#include <Storages/RabbitMQ/RabbitMQBlockOutputStream.h>
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
#include <Formats/FormatFactory.h>
#include <common/logger_useful.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int CANNOT_CREATE_IO_BUFFER;
}


RabbitMQBlockOutputStream::RabbitMQBlockOutputStream(
        StorageRabbitMQ & storage_,
        const StorageMetadataPtr & metadata_snapshot_,
        const Context & context_)
        : storage(storage_)
        , metadata_snapshot(metadata_snapshot_)
        , context(context_)
{
}


Block RabbitMQBlockOutputStream::getHeader() const
{
    return metadata_snapshot->getSampleBlockNonMaterialized();
}


void RabbitMQBlockOutputStream::writePrefix()
{
    buffer = storage.createWriteBuffer();
    if (!buffer)
        throw Exception("Failed to create RabbitMQ producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER);

    buffer->activateWriting();

    child = FormatFactory::instance().getOutput(
            storage.getFormatName(), *buffer, getHeader(), context, [this](const Columns & /* columns */, size_t /* rows */)
            {
                buffer->countRow();
            });
}


void RabbitMQBlockOutputStream::write(const Block & block)
{
    child->write(block);
}


void RabbitMQBlockOutputStream::writeSuffix()
{
    child->writeSuffix();
}

}
30 src/Storages/RabbitMQ/RabbitMQBlockOutputStream.h (new file)
@@ -0,0 +1,30 @@
#pragma once

#include <DataStreams/IBlockOutputStream.h>
#include <Interpreters/Context.h>
#include <Storages/RabbitMQ/StorageRabbitMQ.h>


namespace DB
{

class RabbitMQBlockOutputStream : public IBlockOutputStream
{

public:
    explicit RabbitMQBlockOutputStream(StorageRabbitMQ & storage_, const StorageMetadataPtr & metadata_snapshot_, const Context & context_);

    Block getHeader() const override;

    void writePrefix() override;
    void write(const Block & block) override;
    void writeSuffix() override;

private:
    StorageRabbitMQ & storage;
    StorageMetadataPtr metadata_snapshot;
    Context context;
    ProducerBufferPtr buffer;
    BlockOutputStreamPtr child;
};
}
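
Not part of the diff: a minimal sketch of how this output stream is driven during an INSERT, assuming the usual IBlockOutputStream call sequence and the members declared above.

    // Sketch only: `storage`, `metadata_snapshot`, `context` and `blocks` are assumed to exist.
    auto out = std::make_shared<DB::RabbitMQBlockOutputStream>(storage, metadata_snapshot, context);
    out->writePrefix();        // creates the producer buffer and the row output format
    for (const auto & block : blocks)
        out->write(block);     // countRow() is invoked per written row via the format callback
    out->writeSuffix();        // flushes the format into the producer buffer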
46 src/Storages/RabbitMQ/RabbitMQHandler.cpp (new file)
@@ -0,0 +1,46 @@
#include <common/logger_useful.h>
#include <Common/Exception.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int CANNOT_CONNECT_RABBITMQ;
}

/* The object of this class is shared between concurrent consumers (who share the same connection == share the same
 * event loop and handler).
 */
RabbitMQHandler::RabbitMQHandler(uv_loop_t * loop_, Poco::Logger * log_) :
    AMQP::LibUvHandler(loop_),
    loop(loop_),
    log(log_)
{
}

void RabbitMQHandler::onError(AMQP::TcpConnection * connection, const char * message)
{
    LOG_ERROR(log, "Library error report: {}", message);

    if (!connection->usable() || !connection->ready())
        throw Exception("Connection error", ErrorCodes::CANNOT_CONNECT_RABBITMQ);
}

void RabbitMQHandler::startLoop()
{
    std::lock_guard lock(startup_mutex);
    /// stop_loop variable is updated in a separate thread
    while (!stop_loop.load())
        uv_run(loop, UV_RUN_NOWAIT);
}

void RabbitMQHandler::iterateLoop()
{
    std::unique_lock lock(startup_mutex, std::defer_lock);
    if (lock.try_lock())
        uv_run(loop, UV_RUN_NOWAIT);
}

}
33 src/Storages/RabbitMQ/RabbitMQHandler.h (new file)
@@ -0,0 +1,33 @@
#pragma once

#include <thread>
#include <memory>
#include <mutex>
#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>
#include <common/types.h>
#include <amqpcpp/libuv.h>

namespace DB
{

class RabbitMQHandler : public AMQP::LibUvHandler
{

public:
    RabbitMQHandler(uv_loop_t * loop_, Poco::Logger * log_);
    void onError(AMQP::TcpConnection * connection, const char * message) override;

    void stop() { stop_loop.store(true); }
    void startLoop();
    void iterateLoop();

private:
    uv_loop_t * loop;
    Poco::Logger * log;

    std::atomic<bool> stop_loop = false;
    std::mutex startup_mutex;
};

}
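
Not part of the diff: a rough sketch of how this handler, the libuv loop and the AMQP-CPP connection are wired together. It mirrors what StorageRabbitMQ::StorageRabbitMQ does further down; the host and credentials are simply the ones from the integration-test compose file above.

    // Sketch only, assuming the same libuv/AMQP-CPP usage as StorageRabbitMQ below.
    auto loop = std::make_unique<uv_loop_t>();
    uv_loop_init(loop.get());

    auto event_handler = std::make_shared<DB::RabbitMQHandler>(loop.get(), log);
    auto connection = std::make_shared<AMQP::TcpConnection>(
        event_handler.get(),
        AMQP::Address("rabbitmq1", 5672, AMQP::Login("root", "clickhouse"), "/"));

    // Pump the loop until the connection is ready (StorageRabbitMQ caps this with RETRIES_MAX).
    while (!connection->ready())
        event_handler->iterateLoop();

    // One background task then keeps the loop running for all consumers sharing this connection.
    event_handler->startLoop();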
42 src/Storages/RabbitMQ/RabbitMQSettings.cpp (new file)
@@ -0,0 +1,42 @@
#include <Storages/RabbitMQ/RabbitMQSettings.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTFunction.h>
#include <Common/Exception.h>
#include <Core/SettingsCollectionImpl.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int UNKNOWN_SETTING;
}

IMPLEMENT_SETTINGS_COLLECTION(RabbitMQSettings, LIST_OF_RABBITMQ_SETTINGS)

void RabbitMQSettings::loadFromQuery(ASTStorage & storage_def)
{
    if (storage_def.settings)
    {
        try
        {
            applyChanges(storage_def.settings->changes);
        }
        catch (Exception & e)
        {
            if (e.code() == ErrorCodes::UNKNOWN_SETTING)
                throw Exception(e.message() + " for storage " + storage_def.engine->name, ErrorCodes::BAD_ARGUMENTS);
            else
                e.rethrow();
        }
    }
    else
    {
        auto settings_ast = std::make_shared<ASTSetQuery>();
        settings_ast->is_standalone = false;
        storage_def.set(storage_def.settings, settings_ast);
    }
}
}
27 src/Storages/RabbitMQ/RabbitMQSettings.h (new file)
@@ -0,0 +1,27 @@
#pragma once

#include <Core/SettingsCollection.h>

namespace DB
{
class ASTStorage;

struct RabbitMQSettings : public SettingsCollection<RabbitMQSettings>
{

#define LIST_OF_RABBITMQ_SETTINGS(M) \
    M(SettingString, rabbitmq_host_port, "", "A host-port to connect to RabbitMQ server.", 0) \
    M(SettingString, rabbitmq_routing_key_list, "5672", "A string of routing keys, separated by dots.", 0) \
    M(SettingString, rabbitmq_exchange_name, "clickhouse-exchange", "The exchange name, to which messages are sent.", 0) \
    M(SettingString, rabbitmq_format, "", "The message format.", 0) \
    M(SettingChar, rabbitmq_row_delimiter, '\0', "The character to be considered as a delimiter.", 0) \
    M(SettingString, rabbitmq_exchange_type, "default", "The exchange type.", 0) \
    M(SettingUInt64, rabbitmq_num_consumers, 1, "The number of consumer channels per table.", 0) \
    M(SettingUInt64, rabbitmq_num_queues, 1, "The number of queues per consumer.", 0) \
    M(SettingBool, rabbitmq_transactional_channel, false, "Use transactional channel for publishing.", 0) \

    DECLARE_SETTINGS_COLLECTION(LIST_OF_RABBITMQ_SETTINGS)

    void loadFromQuery(ASTStorage & storage_def);
};
}
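
Not part of the diff: a short sketch of how these settings are consumed, mirroring registerStorageRabbitMQ at the end of this change; the defaults listed above apply when a setting is not given explicitly.

    // Sketch only: `args` is a StorageFactory::Arguments, as in registerStorageRabbitMQ below.
    DB::RabbitMQSettings rabbitmq_settings;
    if (args.storage_def->settings)
        rabbitmq_settings.loadFromQuery(*args.storage_def);   // applies SETTINGS of the CREATE query

    String host_port = rabbitmq_settings.rabbitmq_host_port;           // e.g. "localhost:5672"
    String format = rabbitmq_settings.rabbitmq_format.value;           // message format name
    UInt64 num_consumers = rabbitmq_settings.rabbitmq_num_consumers;   // consumer channels per table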
421 src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp (new file)
@@ -0,0 +1,421 @@
#include <utility>
#include <chrono>
#include <thread>
#include <mutex>
#include <atomic>
#include <memory>
#include <Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <boost/algorithm/string/split.hpp>
#include <common/logger_useful.h>
#include "Poco/Timer.h"
#include <amqpcpp.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
}

namespace ExchangeType
{
    /// Note that default here means default by implementation and not by rabbitmq settings
    static const String DEFAULT = "default";
    static const String FANOUT = "fanout";
    static const String DIRECT = "direct";
    static const String TOPIC = "topic";
    static const String HASH = "consistent_hash";
    static const String HEADERS = "headers";
}

static const auto QUEUE_SIZE = 50000; /// Equals capacity of a single rabbitmq queue

ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
        ChannelPtr consumer_channel_,
        HandlerPtr event_handler_,
        const String & exchange_name_,
        const Names & routing_keys_,
        size_t channel_id_,
        Poco::Logger * log_,
        char row_delimiter_,
        bool bind_by_id_,
        size_t num_queues_,
        const String & exchange_type_,
        const String & local_exchange_,
        const std::atomic<bool> & stopped_)
        : ReadBuffer(nullptr, 0)
        , consumer_channel(std::move(consumer_channel_))
        , event_handler(event_handler_)
        , exchange_name(exchange_name_)
        , routing_keys(routing_keys_)
        , channel_id(channel_id_)
        , log(log_)
        , row_delimiter(row_delimiter_)
        , bind_by_id(bind_by_id_)
        , num_queues(num_queues_)
        , exchange_type(exchange_type_)
        , local_exchange(local_exchange_)
        , local_default_exchange(local_exchange + "_" + ExchangeType::DIRECT)
        , local_hash_exchange(local_exchange + "_" + ExchangeType::HASH)
        , stopped(stopped_)
        , messages(QUEUE_SIZE * num_queues)
{
    exchange_type_set = exchange_type != ExchangeType::DEFAULT;

    /* One queue per consumer can handle up to 50000 messages. More queues per consumer can be added.
     * By default there is one queue per consumer.
     */
    for (size_t queue_id = 0; queue_id < num_queues; ++queue_id)
    {
        /// Queue bindings must be declared before any publishing => it must be done here and not in readPrefix()
        initQueueBindings(queue_id);
    }
}


ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer()
{
    consumer_channel->close();

    messages.clear();
    BufferBase::set(nullptr, 0, 0);
}


void ReadBufferFromRabbitMQConsumer::initExchange()
{
    /* This direct-exchange is used for default implementation and for INSERT query (so it is always declared). If exchange_type
     * is not set, then there are only two exchanges - external, defined by the client, and local, unique for each table (default).
     * This strict division to external and local exchanges is needed to avoid too much complexity with defining exchange_name
     * for INSERT query producer and, in general, it is better to distinguish them into separate ones.
     */
    consumer_channel->declareExchange(local_default_exchange, AMQP::direct).onError([&](const char * message)
    {
        local_exchange_declared = false;
        LOG_ERROR(log, "Failed to declare local direct-exchange. Reason: {}", message);
    });

    if (!exchange_type_set)
    {
        consumer_channel->declareExchange(exchange_name, AMQP::fanout).onError([&](const char * message)
        {
            local_exchange_declared = false;
            LOG_ERROR(log, "Failed to declare default fanout-exchange. Reason: {}", message);
        });

        /// With fanout exchange the binding key is ignored - a parameter might be arbitrary. All distribution lies on local_exchange.
        consumer_channel->bindExchange(exchange_name, local_default_exchange, routing_keys[0]).onError([&](const char * message)
        {
            local_exchange_declared = false;
            LOG_ERROR(log, "Failed to bind local direct-exchange to fanout-exchange. Reason: {}", message);
        });

        return;
    }

    AMQP::ExchangeType type;
    if (exchange_type == ExchangeType::FANOUT) type = AMQP::ExchangeType::fanout;
    else if (exchange_type == ExchangeType::DIRECT) type = AMQP::ExchangeType::direct;
    else if (exchange_type == ExchangeType::TOPIC) type = AMQP::ExchangeType::topic;
    else if (exchange_type == ExchangeType::HASH) type = AMQP::ExchangeType::consistent_hash;
    else if (exchange_type == ExchangeType::HEADERS) type = AMQP::ExchangeType::headers;
    else throw Exception("Invalid exchange type", ErrorCodes::BAD_ARGUMENTS);

    /* Declare client's exchange of the specified type and bind it to hash-exchange (if it is not already hash-exchange), which
     * will evenly distribute messages between all consumers. (This enables better scaling as without hash-exchange - the only
     * option to avoid getting the same messages more than once - is having only one consumer with one queue)
     */
    consumer_channel->declareExchange(exchange_name, type).onError([&](const char * message)
    {
        local_exchange_declared = false;
        LOG_ERROR(log, "Failed to declare client's {} exchange. Reason: {}", exchange_type, message);
    });

    /// No need for declaring hash-exchange if there is only one consumer with one queue or exchange type is already hash
    if (!bind_by_id)
        return;

    hash_exchange = true;

    if (exchange_type == ExchangeType::HASH)
        return;

    /* By default hash exchange distributes messages based on a hash value of a routing key, which must be a string integer. But
     * in current case we use hash exchange for binding to another exchange of some other type, which needs its own routing keys
     * of other types: headers, patterns and string-keys. This means that hash property must be changed.
     */
    AMQP::Table binding_arguments;
    binding_arguments["hash-property"] = "message_id";

    /// Declare exchange for sharding.
    consumer_channel->declareExchange(local_hash_exchange, AMQP::consistent_hash, binding_arguments)
    .onError([&](const char * message)
    {
        local_exchange_declared = false;
        LOG_ERROR(log, "Failed to declare {} exchange: {}", exchange_type, message);
    });

    /// Then bind client's exchange to sharding exchange (by keys, specified by the client):

    if (exchange_type == ExchangeType::HEADERS)
    {
        AMQP::Table binding_arguments;
        std::vector<String> matching;

        for (const auto & header : routing_keys)
        {
            boost::split(matching, header, [](char c){ return c == '='; });
            binding_arguments[matching[0]] = matching[1];
            matching.clear();
        }

        /// Routing key can be arbitrary here.
        consumer_channel->bindExchange(exchange_name, local_hash_exchange, routing_keys[0], binding_arguments)
        .onError([&](const char * message)
        {
            local_exchange_declared = false;
            LOG_ERROR(log, "Failed to bind local hash exchange to client's exchange. Reason: {}", message);
        });
    }
    else
    {
        for (const auto & routing_key : routing_keys)
        {
            consumer_channel->bindExchange(exchange_name, local_hash_exchange, routing_key).onError([&](const char * message)
            {
                local_exchange_declared = false;
                LOG_ERROR(log, "Failed to bind local hash exchange to client's exchange. Reason: {}", message);
            });
        }
    }
}


void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
{
    /// These variables might be updated later from a separate thread in onError callbacks.
    if (!local_exchange_declared || (exchange_type_set && !local_hash_exchange_declared))
    {
        initExchange();
        local_exchange_declared = true;
        local_hash_exchange_declared = true;
    }

    bool default_bindings_created = false, default_bindings_error = false;
    bool bindings_created = false, bindings_error = false;

    consumer_channel->declareQueue(AMQP::exclusive)
    .onSuccess([&](const std::string & queue_name_, int /* msgcount */, int /* consumercount */)
    {
        queues.emplace_back(queue_name_);
        subscribed_queue[queue_name_] = false;

        String binding_key = routing_keys[0];

        /* Every consumer has at least one unique queue. Bind the queues to exchange based on the consumer_channel_id
         * in case there is one queue per consumer and bind by queue_id in case there is more than 1 queue per consumer.
         * (queue_id is based on channel_id)
         */
        if (bind_by_id || hash_exchange)
        {
            if (queues.size() == 1)
            {
                binding_key = std::to_string(channel_id);
            }
            else
            {
                binding_key = std::to_string(channel_id + queue_id);
            }
        }

        /// Bind queue to exchange that is used for INSERT query and also for default implementation.
        consumer_channel->bindQueue(local_default_exchange, queue_name_, binding_key)
        .onSuccess([&]
        {
            default_bindings_created = true;
        })
        .onError([&](const char * message)
        {
            default_bindings_error = true;
            LOG_ERROR(log, "Failed to bind to key {}. Reason: {}", binding_key, message);
        });

        /* Subscription can probably be moved back to readPrefix(), but not sure whether it is better in regard to speed, because
         * if moved there, it must(!) be wrapped inside a channel->onSuccess callback or any other, otherwise
         * consumer might fail to subscribe and no resubscription will help.
         */
        subscribe(queues.back());

        LOG_DEBUG(log, "Queue " + queue_name_ + " is declared");

        if (exchange_type_set)
        {
            if (hash_exchange)
            {
                /* If exchange_type == hash, then bind directly to this client's exchange (because there is no need for a distributor
                 * exchange as it is already hash-exchange), otherwise hash-exchange is a local distributor exchange.
                 */
                String current_hash_exchange = exchange_type == ExchangeType::HASH ? exchange_name : local_hash_exchange;

                /// If hash-exchange is used for messages distribution, then the binding key is ignored - can be arbitrary.
                consumer_channel->bindQueue(current_hash_exchange, queue_name_, binding_key)
                .onSuccess([&]
                {
                    bindings_created = true;
                })
                .onError([&](const char * message)
                {
                    bindings_error = true;
                    LOG_ERROR(log, "Failed to create queue binding to key {}. Reason: {}", binding_key, message);
                });
            }
            else if (exchange_type == ExchangeType::HEADERS)
            {
                AMQP::Table binding_arguments;
                std::vector<String> matching;

                /// It is not parsed for the second time - if it was parsed above, then it would never end up here.
                for (const auto & header : routing_keys)
                {
                    boost::split(matching, header, [](char c){ return c == '='; });
                    binding_arguments[matching[0]] = matching[1];
                    matching.clear();
                }

                consumer_channel->bindQueue(exchange_name, queue_name_, routing_keys[0], binding_arguments)
                .onSuccess([&]
                {
                    bindings_created = true;
                })
                .onError([&](const char * message)
                {
                    bindings_error = true;
                    LOG_ERROR(log, "Failed to bind queue to key. Reason: {}", message);
                });
            }
            else
            {
                /// Means there is only one queue with one consumer - no even distribution needed - no hash-exchange.
                for (const auto & routing_key : routing_keys)
                {
                    /// Binding directly to exchange, specified by the client.
                    consumer_channel->bindQueue(exchange_name, queue_name_, routing_key)
                    .onSuccess([&]
                    {
                        bindings_created = true;
                    })
                    .onError([&](const char * message)
                    {
                        bindings_error = true;
                        LOG_ERROR(log, "Failed to bind queue to key. Reason: {}", message);
                    });
                }
            }
        }
    })
    .onError([&](const char * message)
    {
        default_bindings_error = true;
        LOG_ERROR(log, "Failed to declare queue on the channel. Reason: {}", message);
    });

    /* Run event loop (which updates local variables in a separate thread) until bindings are created or failed to be created.
     * It is important at this moment to make sure that queue bindings are created before any publishing can happen because
     * otherwise messages will be routed nowhere.
     */
    while (!default_bindings_created && !default_bindings_error || (exchange_type_set && !bindings_created && !bindings_error))
    {
        iterateEventLoop();
    }
}


void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
{
    if (subscribed_queue[queue_name])
        return;

    consumer_channel->consume(queue_name, AMQP::noack)
    .onSuccess([&](const std::string & /* consumer */)
    {
        subscribed_queue[queue_name] = true;
        consumer_error = false;
        ++count_subscribed;

        LOG_TRACE(log, "Consumer {} is subscribed to queue {}", channel_id, queue_name);
    })
    .onReceived([&](const AMQP::Message & message, uint64_t /* deliveryTag */, bool /* redelivered */)
    {
        size_t message_size = message.bodySize();
        if (message_size && message.body() != nullptr)
        {
            String message_received = std::string(message.body(), message.body() + message_size);
            if (row_delimiter != '\0')
            {
                message_received += row_delimiter;
            }

            messages.push(message_received);
        }
    })
    .onError([&](const char * message)
    {
        consumer_error = true;
        LOG_ERROR(log, "Consumer {} failed. Reason: {}", channel_id, message);
    });
}


void ReadBufferFromRabbitMQConsumer::checkSubscription()
{
    if (count_subscribed == num_queues)
        return;

    wait_subscribed = num_queues;

    /// These variables are updated in a separate thread.
    while (count_subscribed != wait_subscribed && !consumer_error)
    {
        iterateEventLoop();
    }

    LOG_TRACE(log, "Consumer {} is subscribed to {} queues", channel_id, count_subscribed);

    /// Updated in callbacks which are run by the loop.
    if (count_subscribed == num_queues)
        return;

    /// A case that should never normally happen.
    for (auto & queue : queues)
    {
        subscribe(queue);
    }
}


void ReadBufferFromRabbitMQConsumer::iterateEventLoop()
{
    event_handler->iterateLoop();
}


bool ReadBufferFromRabbitMQConsumer::nextImpl()
{
    if (stopped || !allowed)
        return false;

    if (messages.tryPop(current))
    {
        auto * new_position = const_cast<char *>(current.data());
        BufferBase::set(new_position, current.size(), 0);
        allowed = false;

        return true;
    }

    return false;
}

}
87 src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h (new file)
@@ -0,0 +1,87 @@
#pragma once

#include <Core/Names.h>
#include <Core/Types.h>
#include <IO/ReadBuffer.h>
#include <amqpcpp.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <Common/ConcurrentBoundedQueue.h>

namespace Poco
{
    class Logger;
}

namespace DB
{

using ChannelPtr = std::shared_ptr<AMQP::TcpChannel>;
using HandlerPtr = std::shared_ptr<RabbitMQHandler>;

class ReadBufferFromRabbitMQConsumer : public ReadBuffer
{

public:
    ReadBufferFromRabbitMQConsumer(
            ChannelPtr consumer_channel_,
            HandlerPtr event_handler_,
            const String & exchange_name_,
            const Names & routing_keys_,
            size_t channel_id_,
            Poco::Logger * log_,
            char row_delimiter_,
            bool bind_by_id_,
            size_t num_queues_,
            const String & exchange_type_,
            const String & local_exchange_,
            const std::atomic<bool> & stopped_);

    ~ReadBufferFromRabbitMQConsumer() override;

    void allowNext() { allowed = true; } // Allow to read next message.
    void checkSubscription();

    auto getExchange() const { return exchange_name; }

private:
    ChannelPtr consumer_channel;
    HandlerPtr event_handler;

    const String exchange_name;
    const Names routing_keys;
    const size_t channel_id;
    const bool bind_by_id;
    const size_t num_queues;

    const String exchange_type;
    const String local_exchange;
    const String local_default_exchange;
    const String local_hash_exchange;

    Poco::Logger * log;
    char row_delimiter;
    bool stalled = false;
    bool allowed = true;
    const std::atomic<bool> & stopped;

    String default_local_exchange;
    bool local_exchange_declared = false, local_hash_exchange_declared = false;
    bool exchange_type_set = false, hash_exchange = false;

    std::atomic<bool> consumer_error = false;
    std::atomic<size_t> count_subscribed = 0, wait_subscribed;

    ConcurrentBoundedQueue<String> messages;
    String current;
    std::vector<String> queues;
    std::unordered_map<String, bool> subscribed_queue;

    bool nextImpl() override;

    void initExchange();
    void initQueueBindings(const size_t queue_id);
    void subscribe(const String & queue_name);
    void iterateEventLoop();

};
}
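
Not part of the diff: a compressed sketch of the read cycle that RabbitMQBlockInputStream (earlier in this change) performs against one consumer buffer.

    // Sketch only: `buffer` is a ConsumerBufferPtr obtained from StorageRabbitMQ::popReadBuffer().
    buffer->checkSubscription();               // wait until this consumer's queues are subscribed

    while (!buffer->eof())                     // nextImpl() pops one message from the internal queue
    {
        // ... feed the buffer contents to the input format and collect the resulting rows ...
        auto exchange = buffer->getExchange(); // value for the _exchange virtual column
        buffer->allowNext();                   // permit the next message to be popped
    }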
|
580
src/Storages/RabbitMQ/StorageRabbitMQ.cpp
Normal file
580
src/Storages/RabbitMQ/StorageRabbitMQ.cpp
Normal file
@ -0,0 +1,580 @@
|
||||
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include <DataStreams/ConvertingBlockInputStream.h>
|
||||
#include <DataStreams/LimitBlockInputStream.h>
|
||||
#include <DataStreams/UnionBlockInputStream.h>
|
||||
#include <DataStreams/copyData.h>
|
||||
#include <DataTypes/DataTypeDateTime.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <Interpreters/InterpreterInsertQuery.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Parsers/ASTExpressionList.h>
|
||||
#include <Parsers/ASTInsertQuery.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Storages/RabbitMQ/RabbitMQSettings.h>
|
||||
#include <Storages/RabbitMQ/RabbitMQBlockInputStream.h>
|
||||
#include <Storages/RabbitMQ/RabbitMQBlockOutputStream.h>
|
||||
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
|
||||
#include <Storages/RabbitMQ/RabbitMQHandler.h>
|
||||
#include <Storages/StorageFactory.h>
|
||||
#include <Storages/StorageMaterializedView.h>
|
||||
#include <boost/algorithm/string/replace.hpp>
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
#include <boost/algorithm/string/trim.hpp>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/Macros.h>
|
||||
#include <Common/config_version.h>
|
||||
#include <Common/setThreadName.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <Common/parseAddress.h>
|
||||
#include <Processors/Sources/SourceFromInputStream.h>
|
||||
#include <amqpcpp.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
static const auto CONNECT_SLEEP = 200;
|
||||
static const auto RETRIES_MAX = 1000;
|
||||
static const auto HEARTBEAT_RESCHEDULE_MS = 3000;
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int CANNOT_CONNECT_RABBITMQ;
|
||||
}
|
||||
|
||||
|
||||
StorageRabbitMQ::StorageRabbitMQ(
|
||||
const StorageID & table_id_,
|
||||
Context & context_,
|
||||
const ColumnsDescription & columns_,
|
||||
const String & host_port_,
|
||||
const Names & routing_keys_,
|
||||
const String & exchange_name_,
|
||||
const String & format_name_,
|
||||
char row_delimiter_,
|
||||
const String & exchange_type_,
|
||||
size_t num_consumers_,
|
||||
size_t num_queues_,
|
||||
const bool use_transactional_channel_)
|
||||
: IStorage(table_id_)
|
||||
, global_context(context_.getGlobalContext())
|
||||
, rabbitmq_context(Context(global_context))
|
||||
, routing_keys(global_context.getMacros()->expand(routing_keys_))
|
||||
, exchange_name(exchange_name_)
|
||||
, format_name(global_context.getMacros()->expand(format_name_))
|
||||
, row_delimiter(row_delimiter_)
|
||||
, num_consumers(num_consumers_)
|
||||
, num_queues(num_queues_)
|
||||
, exchange_type(exchange_type_)
|
||||
, use_transactional_channel(use_transactional_channel_)
|
||||
, log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")"))
|
||||
, semaphore(0, num_consumers_)
|
||||
, login_password(std::make_pair(
|
||||
global_context.getConfigRef().getString("rabbitmq.username"),
|
||||
global_context.getConfigRef().getString("rabbitmq.password")))
|
||||
, parsed_address(parseAddress(global_context.getMacros()->expand(host_port_), 5672))
|
||||
{
|
||||
loop = std::make_unique<uv_loop_t>();
|
||||
uv_loop_init(loop.get());
|
||||
|
||||
event_handler = std::make_shared<RabbitMQHandler>(loop.get(), log);
|
||||
connection = std::make_shared<AMQP::TcpConnection>(event_handler.get(), AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/"));
|
||||
|
||||
size_t cnt_retries = 0;
|
||||
while (!connection->ready() && ++cnt_retries != RETRIES_MAX)
|
||||
{
|
||||
event_handler->iterateLoop();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
|
||||
}
|
||||
|
||||
if (!connection->ready())
|
||||
throw Exception("Cannot set up connection for consumers", ErrorCodes::CANNOT_CONNECT_RABBITMQ);
|
||||
|
||||
rabbitmq_context.makeQueryContext();
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
storage_metadata.setColumns(columns_);
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
|
||||
streaming_task = global_context.getSchedulePool().createTask("RabbitMQStreamingTask", [this]{ threadFunc(); });
|
||||
streaming_task->deactivate();
|
||||
heartbeat_task = global_context.getSchedulePool().createTask("RabbitMQHeartbeatTask", [this]{ heartbeatFunc(); });
|
||||
heartbeat_task->deactivate();
|
||||
|
||||
bind_by_id = num_consumers > 1 || num_queues > 1;
|
||||
|
||||
auto table_id = getStorageID();
|
||||
String table_name = table_id.table_name;
|
||||
|
||||
/// Make sure that local exchange name is unique for each table and is not the same as client's exchange name
|
||||
local_exchange_name = exchange_name + "_" + table_name;
|
||||
|
||||
/// One looping task for all consumers as they share the same connection == the same handler == the same event loop
|
||||
looping_task = global_context.getSchedulePool().createTask("RabbitMQLoopingTask", [this]{ loopingFunc(); });
|
||||
looping_task->deactivate();
|
||||
}
|
||||
|
||||
|
||||
void StorageRabbitMQ::heartbeatFunc()
|
||||
{
|
||||
if (!stream_cancelled)
|
||||
{
|
||||
LOG_TRACE(log, "Sending RabbitMQ heartbeat");
|
||||
connection->heartbeat();
|
||||
heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void StorageRabbitMQ::loopingFunc()
|
||||
{
|
||||
LOG_DEBUG(log, "Starting event looping iterations");
|
||||
event_handler->startLoop();
|
||||
}
|
||||
|
||||
|
||||
Pipes StorageRabbitMQ::read(
|
||||
const Names & column_names,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const SelectQueryInfo & /* query_info */,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum /* processed_stage */,
|
||||
size_t /* max_block_size */,
|
||||
unsigned /* num_streams */)
|
||||
{
|
||||
if (num_created_consumers == 0)
|
||||
return {};
|
||||
|
||||
Pipes pipes;
|
||||
pipes.reserve(num_created_consumers);
|
||||
|
||||
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
|
||||
for (size_t i = 0; i < num_created_consumers; ++i)
|
||||
{
|
||||
auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(
|
||||
*this, metadata_snapshot, context, column_names, log);
|
||||
auto converting_stream = std::make_shared<ConvertingBlockInputStream>(
|
||||
rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name);
|
||||
pipes.emplace_back(std::make_shared<SourceFromInputStream>(converting_stream));
|
||||
}
|
||||
|
||||
if (!loop_started)
|
||||
{
|
||||
loop_started = true;
|
||||
looping_task->activateAndSchedule();
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
|
||||
return pipes;
|
||||
}
|
||||
|
||||
|
||||
BlockOutputStreamPtr StorageRabbitMQ::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context & context)
|
||||
{
|
||||
return std::make_shared<RabbitMQBlockOutputStream>(*this, metadata_snapshot, context);
|
||||
}
|
||||
|
||||
|
||||
void StorageRabbitMQ::startup()
|
||||
{
|
||||
for (size_t i = 0; i < num_consumers; ++i)
|
||||
{
|
||||
try
|
||||
{
|
||||
pushReadBuffer(createReadBuffer());
|
||||
++num_created_consumers;
|
||||
}
|
||||
catch (const AMQP::Exception & e)
|
||||
{
|
||||
std::cerr << e.what();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
streaming_task->activateAndSchedule();
|
||||
heartbeat_task->activateAndSchedule();
|
||||
}
|
||||
|
||||
|
||||
void StorageRabbitMQ::shutdown()
|
||||
{
|
||||
stream_cancelled = true;
|
||||
|
||||
event_handler->stop();
|
||||
|
||||
looping_task->deactivate();
|
||||
streaming_task->deactivate();
|
||||
heartbeat_task->deactivate();
|
||||
|
||||
for (size_t i = 0; i < num_created_consumers; ++i)
|
||||
{
|
||||
popReadBuffer();
|
||||
}
|
||||
|
||||
connection->close();
|
||||
}
|
||||
|
||||
|
||||
void StorageRabbitMQ::pushReadBuffer(ConsumerBufferPtr buffer)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
buffers.push_back(buffer);
|
||||
semaphore.set();
|
||||
}
|
||||
|
||||
|
||||
ConsumerBufferPtr StorageRabbitMQ::popReadBuffer()
|
||||
{
|
||||
return popReadBuffer(std::chrono::milliseconds::zero());
|
||||
}
|
||||
|
||||
|
||||
ConsumerBufferPtr StorageRabbitMQ::popReadBuffer(std::chrono::milliseconds timeout)
|
||||
{
|
||||
// Wait for the first free buffer
|
||||
if (timeout == std::chrono::milliseconds::zero())
|
||||
semaphore.wait();
|
||||
else
|
||||
{
|
||||
if (!semaphore.tryWait(timeout.count()))
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Take the first available buffer from the list
|
||||
std::lock_guard lock(mutex);
|
||||
auto buffer = buffers.back();
|
||||
buffers.pop_back();
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
|
||||
ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
|
||||
{
|
||||
if (update_channel_id)
|
||||
next_channel_id += num_queues;
|
||||
update_channel_id = true;
|
||||
|
||||
ChannelPtr consumer_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
|
||||
|
||||
return std::make_shared<ReadBufferFromRabbitMQConsumer>(
|
||||
consumer_channel, event_handler, exchange_name, routing_keys,
|
||||
next_channel_id, log, row_delimiter, bind_by_id, num_queues,
|
||||
exchange_type, local_exchange_name, stream_cancelled);
|
||||
}
|
||||
|
||||
|
||||
ProducerBufferPtr StorageRabbitMQ::createWriteBuffer()
|
||||
{
|
||||
return std::make_shared<WriteBufferToRabbitMQProducer>(
|
||||
parsed_address, global_context, login_password, routing_keys[0], local_exchange_name,
|
||||
log, num_consumers * num_queues, bind_by_id, use_transactional_channel,
|
||||
row_delimiter ? std::optional<char>{row_delimiter} : std::nullopt, 1, 1024);
|
||||
}
|
||||
|
||||
|
||||
bool StorageRabbitMQ::checkDependencies(const StorageID & table_id)
|
||||
{
|
||||
// Check if all dependencies are attached
|
||||
auto dependencies = DatabaseCatalog::instance().getDependencies(table_id);
|
||||
if (dependencies.empty())
|
||||
return true;
|
||||
|
||||
// Check the dependencies are ready?
|
||||
for (const auto & db_tab : dependencies)
|
||||
{
|
||||
auto table = DatabaseCatalog::instance().tryGetTable(db_tab, global_context);
|
||||
if (!table)
|
||||
return false;
|
||||
|
||||
// If it materialized view, check it's target table
|
||||
auto * materialized_view = dynamic_cast<StorageMaterializedView *>(table.get());
|
||||
if (materialized_view && !materialized_view->tryGetTargetTable())
|
||||
return false;
|
||||
|
||||
// Check all its dependencies
|
||||
if (!checkDependencies(db_tab))
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
void StorageRabbitMQ::threadFunc()
|
||||
{
|
||||
try
|
||||
{
|
||||
auto table_id = getStorageID();
|
||||
// Check if at least one direct dependency is attached
|
||||
size_t dependencies_count = DatabaseCatalog::instance().getDependencies(table_id).size();
|
||||
|
||||
if (dependencies_count)
|
||||
{
|
||||
// Keep streaming as long as there are attached views and streaming is not cancelled
|
||||
while (!stream_cancelled && num_created_consumers > 0)
|
||||
{
|
||||
if (!checkDependencies(table_id))
|
||||
break;
|
||||
|
||||
LOG_DEBUG(log, "Started streaming to {} attached views", dependencies_count);
|
||||
|
||||
if (!streamToViews())
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
|
||||
/// Wait for attached views
|
||||
if (!stream_cancelled)
|
||||
streaming_task->schedule();
|
||||
}
|
||||
|
||||
|
||||
bool StorageRabbitMQ::streamToViews()
|
||||
{
|
||||
auto table_id = getStorageID();
|
||||
auto table = DatabaseCatalog::instance().getTable(table_id, global_context);
|
||||
if (!table)
|
||||
throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
// Create an INSERT query for streaming data
|
||||
auto insert = std::make_shared<ASTInsertQuery>();
|
||||
insert->table_id = table_id;
|
||||
|
||||
InterpreterInsertQuery interpreter(insert, rabbitmq_context, false, true, true);
|
||||
auto block_io = interpreter.execute();
|
||||
|
||||
// Create a stream for each consumer and join them in a union stream
|
||||
BlockInputStreams streams;
|
||||
streams.reserve(num_created_consumers);
|
||||
|
||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||
auto column_names = block_io.out->getHeader().getNames();
|
||||
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
|
||||
for (size_t i = 0; i < num_created_consumers; ++i)
|
||||
{
|
||||
auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(*this, metadata_snapshot, rabbitmq_context, column_names, log);
|
||||
auto converting_stream = std::make_shared<ConvertingBlockInputStream>(rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name);
|
||||
|
||||
streams.emplace_back(converting_stream);
|
||||
|
||||
// Limit read batch to maximum block size to allow DDL
|
||||
IBlockInputStream::LocalLimits limits;
|
||||
const Settings & settings = global_context.getSettingsRef();
|
||||
limits.speed_limits.max_execution_time = settings.stream_flush_interval_ms;
|
||||
limits.timeout_overflow_mode = OverflowMode::BREAK;
|
||||
rabbit_stream->setLimits(limits);
|
||||
}
|
||||
|
||||
if (!loop_started)
|
||||
{
|
||||
loop_started = true;
|
||||
looping_task->activateAndSchedule();
|
||||
}
|
||||
|
||||
// Join multiple streams if necessary
|
||||
BlockInputStreamPtr in;
|
||||
if (streams.size() > 1)
|
||||
in = std::make_shared<UnionBlockInputStream>(streams, nullptr, streams.size());
|
||||
else
|
||||
in = streams[0];
|
||||
|
||||
std::atomic<bool> stub = {false};
|
||||
copyData(*in, *block_io.out, &stub);
|
||||
|
||||
// Check whether the limits were applied during query execution
|
||||
bool limits_applied = false;
|
||||
const BlockStreamProfileInfo & info = in->getProfileInfo();
|
||||
limits_applied = info.hasAppliedLimit();
|
||||
|
||||
return limits_applied;
|
||||
}
|
||||
|
||||
|
||||
void registerStorageRabbitMQ(StorageFactory & factory)
{
    auto creator_fn = [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;
        size_t args_count = engine_args.size();
        bool has_settings = args.storage_def->settings;

        RabbitMQSettings rabbitmq_settings;
        if (has_settings)
        {
            rabbitmq_settings.loadFromQuery(*args.storage_def);
        }

        String host_port = rabbitmq_settings.rabbitmq_host_port;
        if (args_count >= 1)
        {
            const auto * ast = engine_args[0]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::String)
            {
                host_port = safeGet<String>(ast->value);
            }
            else
            {
                throw Exception(String("RabbitMQ host:port must be a string"), ErrorCodes::BAD_ARGUMENTS);
            }
        }

        String routing_key_list = rabbitmq_settings.rabbitmq_routing_key_list.value;
        if (args_count >= 2)
        {
            engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.local_context);
            routing_key_list = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
        }

        Names routing_keys;
        boost::split(routing_keys, routing_key_list, [](char c){ return c == ','; });
        for (String & key : routing_keys)
        {
            boost::trim(key);
        }

        String exchange = rabbitmq_settings.rabbitmq_exchange_name.value;
        if (args_count >= 3)
        {
            engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);

            const auto * ast = engine_args[2]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::String)
            {
                exchange = safeGet<String>(ast->value);
            }
        }

        String format = rabbitmq_settings.rabbitmq_format.value;
        if (args_count >= 4)
        {
            engine_args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[3], args.local_context);

            const auto * ast = engine_args[3]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::String)
            {
                format = safeGet<String>(ast->value);
            }
            else
            {
                throw Exception("Format must be a string", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        char row_delimiter = rabbitmq_settings.rabbitmq_row_delimiter;
        if (args_count >= 5)
        {
            engine_args[4] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[4], args.local_context);

            const auto * ast = engine_args[4]->as<ASTLiteral>();
            String arg;
            if (ast && ast->value.getType() == Field::Types::String)
            {
                arg = safeGet<String>(ast->value);
            }
            else
            {
                throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
            }
            if (arg.size() > 1)
            {
                throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
            }
            else if (arg.empty())
            {
                row_delimiter = '\0';
            }
            else
            {
                row_delimiter = arg[0];
            }
        }

        String exchange_type = rabbitmq_settings.rabbitmq_exchange_type.value;
        if (args_count >= 6)
        {
            engine_args[5] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[5], args.local_context);

            const auto * ast = engine_args[5]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::String)
            {
                exchange_type = safeGet<String>(ast->value);
            }

            if (exchange_type != "fanout" && exchange_type != "direct" && exchange_type != "topic"
                && exchange_type != "headers" && exchange_type != "consistent_hash")
                throw Exception("Invalid exchange type", ErrorCodes::BAD_ARGUMENTS);
        }

        UInt64 num_consumers = rabbitmq_settings.rabbitmq_num_consumers;
        if (args_count >= 7)
        {
            const auto * ast = engine_args[6]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::UInt64)
            {
                num_consumers = safeGet<UInt64>(ast->value);
            }
            else
            {
                throw Exception("Number of consumers must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        UInt64 num_queues = rabbitmq_settings.rabbitmq_num_queues;
        if (args_count >= 8)
        {
            const auto * ast = engine_args[7]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::UInt64)
            {
                num_queues = safeGet<UInt64>(ast->value);
            }
            else
            {
                throw Exception("Number of queues must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        bool use_transactional_channel = static_cast<bool>(rabbitmq_settings.rabbitmq_transactional_channel);
        if (args_count >= 9)
        {
            const auto * ast = engine_args[8]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::UInt64)
            {
                use_transactional_channel = static_cast<bool>(safeGet<UInt64>(ast->value));
            }
            else
            {
                throw Exception("Transactional channel parameter must be a bool (0 or 1)", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        return StorageRabbitMQ::create(
                args.table_id, args.context, args.columns,
                host_port, routing_keys, exchange, format, row_delimiter, exchange_type, num_consumers,
                num_queues, use_transactional_channel);
    };

    factory.registerStorage("RabbitMQ", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
}


NamesAndTypesList StorageRabbitMQ::getVirtuals() const
{
    return NamesAndTypesList{
            {"_exchange", std::make_shared<DataTypeString>()}
    };
}

}
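Editor's note on the routing-key handling in registerStorageRabbitMQ() above: boost::split with a comma predicate followed by boost::trim simply turns "key1, key2" into trimmed tokens. A standard-library-only sketch of the same behaviour is below; parseRoutingKeys is an illustrative name and is not part of this patch.

#include <algorithm>
#include <cctype>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

// Split a comma-separated routing key list and trim surrounding whitespace,
// mirroring what the boost::split / boost::trim calls in the patch do.
static std::vector<std::string> parseRoutingKeys(const std::string & routing_key_list)
{
    std::vector<std::string> keys;
    std::istringstream stream(routing_key_list);
    std::string key;
    while (std::getline(stream, key, ','))
    {
        auto is_space = [](unsigned char c) { return std::isspace(c) != 0; };
        key.erase(key.begin(), std::find_if_not(key.begin(), key.end(), is_space));
        key.erase(std::find_if_not(key.rbegin(), key.rend(), is_space).base(), key.end());
        keys.push_back(key);
    }
    return keys;
}

int main()
{
    // Prints each trimmed key in brackets, one per line: [key1] [key2] [key3]
    for (const auto & key : parseRoutingKeys("key1, key2 ,key3"))
        std::cout << '[' << key << "]\n";
}
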
123
src/Storages/RabbitMQ/StorageRabbitMQ.h
Normal file
@ -0,0 +1,123 @@
#pragma once

#include <Core/BackgroundSchedulePool.h>
#include <Storages/IStorage.h>
#include <Interpreters/Context.h>
#include <Poco/Semaphore.h>
#include <ext/shared_ptr_helper.h>
#include <mutex>
#include <atomic>
#include <Storages/RabbitMQ/Buffer_fwd.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <amqpcpp/libuv.h>
#include <uv.h>


namespace DB
{

using ChannelPtr = std::shared_ptr<AMQP::TcpChannel>;

class StorageRabbitMQ final: public ext::shared_ptr_helper<StorageRabbitMQ>, public IStorage
{
    friend struct ext::shared_ptr_helper<StorageRabbitMQ>;

public:
    std::string getName() const override { return "RabbitMQ"; }

    bool supportsSettings() const override { return true; }
    bool noPushingToViews() const override { return true; }

    void startup() override;
    void shutdown() override;

    Pipes read(
        const Names & column_names,
        const StorageMetadataPtr & metadata_snapshot,
        const SelectQueryInfo & query_info,
        const Context & context,
        QueryProcessingStage::Enum processed_stage,
        size_t max_block_size,
        unsigned num_streams) override;

    BlockOutputStreamPtr write(
        const ASTPtr & query,
        const StorageMetadataPtr & metadata_snapshot,
        const Context & context) override;

    void pushReadBuffer(ConsumerBufferPtr buf);
    ConsumerBufferPtr popReadBuffer();
    ConsumerBufferPtr popReadBuffer(std::chrono::milliseconds timeout);

    ProducerBufferPtr createWriteBuffer();

    const String & getFormatName() const { return format_name; }
    NamesAndTypesList getVirtuals() const override;


protected:
    StorageRabbitMQ(
            const StorageID & table_id_,
            Context & context_,
            const ColumnsDescription & columns_,
            const String & host_port_,
            const Names & routing_keys_,
            const String & exchange_name_,
            const String & format_name_,
            char row_delimiter_,
            const String & exchange_type_,
            size_t num_consumers_,
            size_t num_queues_,
            const bool use_transactional_channel_);

private:
    Context global_context;
    Context rabbitmq_context;

    Names routing_keys;
    const String exchange_name;
    String local_exchange_name;

    const String format_name;
    char row_delimiter;
    size_t num_consumers;
    size_t num_created_consumers = 0;
    bool bind_by_id;
    size_t num_queues;
    const String exchange_type;
    const bool use_transactional_channel;

    Poco::Logger * log;
    std::pair<String, UInt16> parsed_address;
    std::pair<String, String> login_password;

    std::shared_ptr<uv_loop_t> loop;
    std::shared_ptr<RabbitMQHandler> event_handler;
    std::shared_ptr<AMQP::TcpConnection> connection; /// Connection for all consumers

    Poco::Semaphore semaphore;
    std::mutex mutex;
    std::vector<ConsumerBufferPtr> buffers; /// Available buffers for RabbitMQ consumers

    size_t next_channel_id = 1; /// Must be >= 1 because it is used as a binding key, which has to be > 0
    bool update_channel_id = false;
    std::atomic<bool> loop_started = false;

    BackgroundSchedulePool::TaskHolder streaming_task;
    BackgroundSchedulePool::TaskHolder heartbeat_task;
    BackgroundSchedulePool::TaskHolder looping_task;

    std::atomic<bool> stream_cancelled{false};

    ConsumerBufferPtr createReadBuffer();

    void threadFunc();
    void heartbeatFunc();
    void loopingFunc();

    void pingConnection() { connection->heartbeat(); }
    bool streamToViews();
    bool checkDependencies(const StorageID & table_id);
};

}
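Editor's note: the pushReadBuffer()/popReadBuffer() pair declared above, together with the Poco::Semaphore, the mutex and the buffers vector, implement a blocking pool of consumer buffers. A minimal sketch of that pattern using only the standard library follows; BlockingPool is an illustrative name, and the real code blocks on the semaphore rather than a condition variable.

#include <condition_variable>
#include <memory>
#include <mutex>
#include <vector>

// A tiny blocking pool: consumers take an item, work with it, and return it.
template <typename T>
class BlockingPool
{
public:
    void push(std::shared_ptr<T> item)
    {
        {
            std::lock_guard lock(mutex);
            items.push_back(std::move(item));
        }
        available.notify_one();
    }

    std::shared_ptr<T> pop()
    {
        std::unique_lock lock(mutex);
        available.wait(lock, [this] { return !items.empty(); });
        auto item = std::move(items.back());
        items.pop_back();
        return item;
    }

private:
    std::mutex mutex;
    std::condition_variable available;
    std::vector<std::shared_ptr<T>> items;
};

int main()
{
    BlockingPool<int> pool;
    pool.push(std::make_shared<int>(42));
    auto value = pool.pop();   // returns the pushed item
    return *value == 42 ? 0 : 1;
}
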
230
src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp
Normal file
@ -0,0 +1,230 @@
#include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
#include "Core/Block.h"
#include "Columns/ColumnString.h"
#include "Columns/ColumnsNumber.h"
#include <common/logger_useful.h>
#include <amqpcpp.h>
#include <uv.h>
#include <chrono>
#include <thread>
#include <atomic>


namespace DB
{

namespace ErrorCodes
{
    extern const int CANNOT_CONNECT_RABBITMQ;
}

static const auto QUEUE_SIZE = 50000;
static const auto CONNECT_SLEEP = 200;
static const auto RETRIES_MAX = 1000;
static const auto LOOP_WAIT = 10;

WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
        std::pair<String, UInt16> & parsed_address,
        Context & global_context,
        const std::pair<String, String> & login_password_,
        const String & routing_key_,
        const String & exchange_,
        Poco::Logger * log_,
        size_t num_queues_,
        bool bind_by_id_,
        bool use_transactional_channel_,
        std::optional<char> delimiter,
        size_t rows_per_message,
        size_t chunk_size_)
        : WriteBuffer(nullptr, 0)
        , login_password(login_password_)
        , routing_key(routing_key_)
        , exchange_name(exchange_ + "_direct")
        , log(log_)
        , num_queues(num_queues_)
        , bind_by_id(bind_by_id_)
        , use_transactional_channel(use_transactional_channel_)
        , delim(delimiter)
        , max_rows(rows_per_message)
        , chunk_size(chunk_size_)
        , payloads(QUEUE_SIZE * num_queues)
{
    loop = std::make_unique<uv_loop_t>();
    uv_loop_init(loop.get());

    event_handler = std::make_unique<RabbitMQHandler>(loop.get(), log);
    connection = std::make_unique<AMQP::TcpConnection>(
            event_handler.get(),
            AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/"));

    /* The reason behind making a separate connection for each concurrent producer is explained here:
     * https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/128#issuecomment-300780086 - publishing from
     * different threads (as outputStreams are asynchronous) with the same connection leads to internal library errors.
     */
    size_t cnt_retries = 0;
    while (!connection->ready() && ++cnt_retries != RETRIES_MAX)
    {
        event_handler->iterateLoop();
        std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
    }

    if (!connection->ready())
    {
        throw Exception("Cannot set up connection for producer", ErrorCodes::CANNOT_CONNECT_RABBITMQ);
    }

    producer_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
    checkExchange();

    /// If publishing should be wrapped in transactions
    if (use_transactional_channel)
    {
        producer_channel->startTransaction();
    }

    writing_task = global_context.getSchedulePool().createTask("RabbitMQWritingTask", [this]{ writingFunc(); });
    writing_task->deactivate();
}


WriteBufferToRabbitMQProducer::~WriteBufferToRabbitMQProducer()
{
    stop_loop.store(true);
    writing_task->deactivate();
    checkExchange();

    connection->close();
    assert(rows == 0 && chunks.empty());
}


void WriteBufferToRabbitMQProducer::countRow()
{
    if (++rows % max_rows == 0)
    {
        const std::string & last_chunk = chunks.back();
        size_t last_chunk_size = offset();

        if (delim && last_chunk[last_chunk_size - 1] == delim)
            --last_chunk_size;

        std::string payload;
        payload.reserve((chunks.size() - 1) * chunk_size + last_chunk_size);

        for (auto i = chunks.begin(), e = --chunks.end(); i != e; ++i)
            payload.append(*i);

        payload.append(last_chunk, 0, last_chunk_size);

        rows = 0;
        chunks.clear();
        set(nullptr, 0);

        payloads.push(payload);
    }
}


void WriteBufferToRabbitMQProducer::writingFunc()
{
    String payload;

    while (!stop_loop || !payloads.empty())
    {
        while (!payloads.empty())
        {
            payloads.pop(payload);
            next_queue = next_queue % num_queues + 1;

            if (bind_by_id)
            {
                producer_channel->publish(exchange_name, std::to_string(next_queue), payload);
            }
            else
            {
                producer_channel->publish(exchange_name, routing_key, payload);
            }
        }
        iterateEventLoop();
    }
}


void WriteBufferToRabbitMQProducer::checkExchange()
{
    std::atomic<bool> exchange_declared = false, exchange_error = false;

    /// The AMQP::passive flag indicates that it should only be checked if there is a valid exchange with the given name.
    producer_channel->declareExchange(exchange_name, AMQP::direct, AMQP::passive)
    .onSuccess([&]()
    {
        exchange_declared = true;
    })
    .onError([&](const char * message)
    {
        exchange_error = true;
        LOG_ERROR(log, "Exchange for INSERT query was not declared. Reason: {}", message);
    });

    /// These variables are updated in a separate thread and starting the loop blocks the current thread
    while (!exchange_declared && !exchange_error)
    {
        iterateEventLoop();
    }
}


void WriteBufferToRabbitMQProducer::finalizeProducer()
{
    /// This will make sure everything is published
    checkExchange();

    if (use_transactional_channel)
    {
        std::atomic<bool> answer_received = false, wait_rollback = false;
        producer_channel->commitTransaction()
        .onSuccess([&]()
        {
            answer_received = true;
            LOG_TRACE(log, "All messages were successfully published");
        })
        .onError([&](const char * message)
        {
            answer_received = true;
            wait_rollback = true;
            LOG_TRACE(log, "Publishing was not successful: {}", message);
            producer_channel->rollbackTransaction()
            .onSuccess([&]()
            {
                wait_rollback = false;
            })
            .onError([&](const char * message)
            {
                LOG_ERROR(log, "Failed to rollback transaction: {}", message);
                wait_rollback = false;
            });
        });

        size_t count_retries = 0;
        while ((!answer_received || wait_rollback) && ++count_retries != RETRIES_MAX)
        {
            iterateEventLoop();
            std::this_thread::sleep_for(std::chrono::milliseconds(LOOP_WAIT));
        }
    }
}


void WriteBufferToRabbitMQProducer::nextImpl()
{
    chunks.push_back(std::string());
    chunks.back().resize(chunk_size);
    set(chunks.back().data(), chunk_size);
}


void WriteBufferToRabbitMQProducer::iterateEventLoop()
{
    event_handler->iterateLoop();
}

}
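Editor's note on countRow() above: rows are accumulated in fixed-size chunks (allocated by nextImpl), and a complete message is only assembled every max_rows rows by concatenating all full chunks plus the used prefix of the last one. A stripped-down sketch of that assembly step, using only the standard library (the function name and the 8-byte chunk size are illustrative, not from this patch):

#include <cassert>
#include <iterator>
#include <list>
#include <string>

// Assemble one payload from a list of fixed-size chunks, where only
// `last_chunk_size` bytes of the final chunk are meaningful.
static std::string assemblePayload(const std::list<std::string> & chunks, size_t chunk_size, size_t last_chunk_size)
{
    std::string payload;
    payload.reserve((chunks.size() - 1) * chunk_size + last_chunk_size);

    for (auto it = chunks.begin(), last = std::prev(chunks.end()); it != last; ++it)
        payload.append(*it);

    payload.append(chunks.back(), 0, last_chunk_size);
    return payload;
}

int main()
{
    const size_t chunk_size = 8;
    std::list<std::string> chunks = {std::string("abcdefgh"), std::string("ij______")};
    assert(assemblePayload(chunks, chunk_size, 2) == "abcdefghij");
    return 0;
}
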
76
src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h
Normal file
@ -0,0 +1,76 @@
#pragma once

#include <IO/WriteBuffer.h>
#include <Columns/IColumn.h>
#include <list>
#include <mutex>
#include <atomic>
#include <amqpcpp.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Core/BackgroundSchedulePool.h>
#include <Interpreters/Context.h>

namespace DB
{

using ChannelPtr = std::shared_ptr<AMQP::TcpChannel>;

class WriteBufferToRabbitMQProducer : public WriteBuffer
{
public:
    WriteBufferToRabbitMQProducer(
            std::pair<String, UInt16> & parsed_address,
            Context & global_context,
            const std::pair<String, String> & login_password_,
            const String & routing_key_,
            const String & exchange_,
            Poco::Logger * log_,
            size_t num_queues_,
            bool bind_by_id_,
            bool use_transactional_channel_,
            std::optional<char> delimiter,
            size_t rows_per_message,
            size_t chunk_size_
    );

    ~WriteBufferToRabbitMQProducer() override;

    void countRow();
    void activateWriting() { writing_task->activateAndSchedule(); }

private:
    void nextImpl() override;
    void checkExchange();
    void iterateEventLoop();
    void writingFunc();
    void finalizeProducer();

    const std::pair<String, String> login_password;
    const String routing_key;
    const String exchange_name;
    const bool bind_by_id;
    const size_t num_queues;
    const bool use_transactional_channel;

    BackgroundSchedulePool::TaskHolder writing_task;
    std::atomic<bool> stop_loop = false;

    std::unique_ptr<uv_loop_t> loop;
    std::unique_ptr<RabbitMQHandler> event_handler;
    std::unique_ptr<AMQP::TcpConnection> connection;
    ChannelPtr producer_channel;

    ConcurrentBoundedQueue<String> payloads;
    size_t next_queue = 0;

    Poco::Logger * log;
    const std::optional<char> delim;
    const size_t max_rows;
    const size_t chunk_size;
    size_t count_mes = 0;
    size_t rows = 0;
    std::list<std::string> chunks;
};

}
@ -48,6 +48,10 @@ void registerStorages()
#if USE_RDKAFKA
    registerStorageKafka(factory);
#endif

#if USE_AMQPCPP
    registerStorageRabbitMQ(factory);
#endif
}

}

@ -49,6 +49,10 @@ void registerStorageMongoDB(StorageFactory & factory);
void registerStorageKafka(StorageFactory & factory);
#endif

#if USE_AMQPCPP
void registerStorageRabbitMQ(StorageFactory & factory);
#endif

void registerStorages();

}
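Editor's note on the registerStorageRabbitMQ(factory) call above: StorageFactory is a name-to-creator registry, so registering the engine under USE_AMQPCPP is what makes "RabbitMQ" resolvable when a table with that engine is created. A reduced, dependency-free sketch of such a registry follows; it illustrates the pattern only and is not ClickHouse's actual StorageFactory, and RabbitMQStorageStub is an invented placeholder type.

#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <string>

// Illustrative stand-ins for IStorage and a concrete storage.
struct Storage { virtual std::string name() const = 0; virtual ~Storage() = default; };
struct RabbitMQStorageStub : Storage { std::string name() const override { return "RabbitMQ"; } };

class Factory
{
public:
    using Creator = std::function<std::unique_ptr<Storage>()>;

    // Register a creator under an engine name.
    void registerStorage(const std::string & name, Creator creator) { creators[name] = std::move(creator); }

    // Look up the creator by engine name and build the storage.
    std::unique_ptr<Storage> create(const std::string & name) const { return creators.at(name)(); }

private:
    std::map<std::string, Creator> creators;
};

int main()
{
    Factory factory;
    factory.registerStorage("RabbitMQ", [] { return std::make_unique<RabbitMQStorageStub>(); });
    std::cout << factory.create("RabbitMQ")->name() << '\n';   // prints RabbitMQ
}
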
@ -109,6 +109,7 @@ class ClickHouseCluster:
        self.base_zookeeper_cmd = None
        self.base_mysql_cmd = []
        self.base_kafka_cmd = []
        self.base_rabbitmq_cmd = []
        self.base_cassandra_cmd = []
        self.pre_zookeeper_commands = []
        self.instances = {}
@ -116,6 +117,7 @@ class ClickHouseCluster:
        self.with_mysql = False
        self.with_postgres = False
        self.with_kafka = False
        self.with_rabbitmq = False
        self.with_odbc_drivers = False
        self.with_hdfs = False
        self.with_mongo = False
@ -148,7 +150,7 @@ class ClickHouseCluster:
        return cmd

    def add_instance(self, name, config_dir=None, main_configs=None, user_configs=None, macros=None,
                     with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None,
                     with_zookeeper=False, with_mysql=False, with_kafka=False, with_rabbitmq=False, clickhouse_path_dir=None,
                     with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False,
                     with_redis=False, with_minio=False, with_cassandra=False,
                     hostname=None, env_variables=None, image="yandex/clickhouse-integration-test",
@ -172,7 +174,7 @@ class ClickHouseCluster:
        instance = ClickHouseInstance(
            self, self.base_dir, name, config_dir, main_configs or [], user_configs or [], macros or {},
            with_zookeeper,
            self.zookeeper_config_path, with_mysql, with_kafka, with_mongo, with_redis, with_minio, with_cassandra,
            self.zookeeper_config_path, with_mysql, with_kafka, with_rabbitmq, with_mongo, with_redis, with_minio, with_cassandra,
            self.base_configs_dir, self.server_bin_path,
            self.odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, hostname=hostname,
            env_variables=env_variables or {}, image=image, stay_alive=stay_alive, ipv4_address=ipv4_address,
@ -237,6 +239,13 @@ class ClickHouseCluster:
                                   self.project_name, '--file', p.join(DOCKER_COMPOSE_DIR, 'docker_compose_kafka.yml')]
            cmds.append(self.base_kafka_cmd)

        if with_rabbitmq and not self.with_rabbitmq:
            self.with_rabbitmq = True
            self.base_cmd.extend(['--file', p.join(DOCKER_COMPOSE_DIR, 'docker_compose_rabbitmq.yml')])
            self.base_rabbitmq_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                      self.project_name, '--file', p.join(DOCKER_COMPOSE_DIR, 'docker_compose_rabbitmq.yml')]
            cmds.append(self.base_rabbitmq_cmd)

        if with_hdfs and not self.with_hdfs:
            self.with_hdfs = True
            self.base_cmd.extend(['--file', p.join(DOCKER_COMPOSE_DIR, 'docker_compose_hdfs.yml')])
@ -529,6 +538,10 @@ class ClickHouseCluster:
            self.kafka_docker_id = self.get_instance_docker_id('kafka1')
            self.wait_schema_registry_to_start(120)

        if self.with_rabbitmq and self.base_rabbitmq_cmd:
            subprocess_check_call(self.base_rabbitmq_cmd + common_opts + ['--renew-anon-volumes'])
            self.rabbitmq_docker_id = self.get_instance_docker_id('rabbitmq1')

        if self.with_hdfs and self.base_hdfs_cmd:
            subprocess_check_call(self.base_hdfs_cmd + common_opts)
            self.wait_hdfs_to_start(120)
@ -681,7 +694,7 @@ class ClickHouseInstance:

    def __init__(
            self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macros,
            with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_mongo, with_redis, with_minio, with_cassandra,
            with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_rabbitmq, with_mongo, with_redis, with_minio, with_cassandra,
            base_configs_dir, server_bin_path, odbc_bridge_bin_path,
            clickhouse_path_dir, with_odbc_drivers, hostname=None, env_variables=None,
            image="yandex/clickhouse-integration-test",
@ -708,6 +721,7 @@ class ClickHouseInstance:

        self.with_mysql = with_mysql
        self.with_kafka = with_kafka
        self.with_rabbitmq = with_rabbitmq
        self.with_mongo = with_mongo
        self.with_redis = with_redis
        self.with_minio = with_minio
@ -1058,6 +1072,9 @@ class ClickHouseInstance:
            depends_on.append("kafka1")
            depends_on.append("schema-registry")

        if self.with_rabbitmq:
            depends_on.append("rabbitmq1")

        if self.with_zookeeper:
            depends_on.append("zoo1")
            depends_on.append("zoo2")
@ -1137,3 +1154,4 @@ class ClickHouseKiller(object):

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.clickhouse_node.restore_clickhouse()

0
tests/integration/test_storage_rabbitmq/__init__.py
Normal file
11
tests/integration/test_storage_rabbitmq/configs/log_conf.xml
Normal file
@ -0,0 +1,11 @@
<yandex>
    <logger>
        <level>trace</level>
        <log>/var/log/clickhouse-server/log.log</log>
        <errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
        <size>1000M</size>
        <count>10</count>
        <stderr>/var/log/clickhouse-server/stderr.log</stderr>
        <stdout>/var/log/clickhouse-server/stdout.log</stdout>
    </logger>
</yandex>
@ -0,0 +1,6 @@
<yandex>
    <rabbitmq>
        <username>root</username>
        <password>clickhouse</password>
    </rabbitmq>
</yandex>
1532
tests/integration/test_storage_rabbitmq/test.py
Normal file
File diff suppressed because it is too large
@ -0,0 +1,50 @@
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
10 10
11 11
12 12
13 13
14 14
15 15
16 16
17 17
18 18
19 19
20 20
21 21
22 22
23 23
24 24
25 25
26 26
27 27
28 28
29 29
30 30
31 31
32 32
33 33
34 34
35 35
36 36
37 37
38 38
39 39
40 40
41 41
42 42
43 43
44 44
45 45
46 46
47 47
48 48
49 49