RabbitMQ: respect flushing data by flush interval and by block size

This commit is contained in:
kssenii 2022-12-19 21:07:44 +01:00
parent 980d5ce289
commit a0c8d34775
7 changed files with 271 additions and 33 deletions

View File

@ -24,7 +24,7 @@ namespace DB
M(String, rabbitmq_address, "", "Address for connection", 0) \
M(UInt64, rabbitmq_skip_broken_messages, 0, "Skip at least this number of broken messages from RabbitMQ per block", 0) \
M(UInt64, rabbitmq_max_block_size, 0, "Number of row collected before flushing data from RabbitMQ.", 0) \
M(Milliseconds, rabbitmq_flush_interval_ms, 0, "Timeout for flushing data from RabbitMQ.", 0) \
M(UInt64, rabbitmq_flush_interval_ms, 0, "Timeout for flushing data from RabbitMQ.", 0) \
M(String, rabbitmq_vhost, "/", "RabbitMQ vhost.", 0) \
M(String, rabbitmq_queue_settings_list, "", "A list of rabbitmq queue settings", 0) \
M(Bool, rabbitmq_queue_consume, false, "Use user-defined queues and do not make any RabbitMQ setup: declaring exchanges, queues, bindings", 0) \

View File

@ -4,6 +4,7 @@
#include <Interpreters/Context.h>
#include <Processors/Executors/StreamingFormatExecutor.h>
#include <Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -61,6 +62,7 @@ RabbitMQSource::RabbitMQSource(
, ack_in_suffix(ack_in_suffix_)
, non_virtual_header(std::move(headers.first))
, virtual_header(std::move(headers.second))
, log(&Poco::Logger::get("RabbitMQSource"))
{
storage.incrementReader();
}
@ -106,17 +108,15 @@ Chunk RabbitMQSource::generate()
return chunk;
}
bool RabbitMQSource::checkTimeLimit() const
bool RabbitMQSource::isTimeLimitExceeded() const
{
if (max_execution_time != 0)
if (max_execution_time_ms != 0)
{
auto elapsed_ns = total_stopwatch.elapsed();
if (elapsed_ns > static_cast<UInt64>(max_execution_time.totalMicroseconds()) * 1000)
return false;
uint64_t elapsed_time_ms = total_stopwatch.elapsedMilliseconds();
return max_execution_time_ms <= elapsed_time_ms;
}
return true;
return false;
}
Chunk RabbitMQSource::generateImpl()
@ -127,9 +127,11 @@ Chunk RabbitMQSource::generateImpl()
buffer = storage.popReadBuffer(timeout);
}
if (!buffer || is_finished)
if (is_finished || !buffer || buffer->isConsumerStopped())
return {};
/// Currently it is one time usage source: to make sure data is flushed
/// strictly by timeout or by block size.
is_finished = true;
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
@ -137,15 +139,17 @@ Chunk RabbitMQSource::generateImpl()
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size);
StreamingFormatExecutor executor(non_virtual_header, input_format);
size_t total_rows = 0;
buffer->allowNext();
while (true)
{
if (buffer->queueEmpty())
break;
size_t new_rows = 0;
auto new_rows = executor.execute();
if (!buffer->hasPendingMessages())
{
new_rows = executor.execute();
}
if (new_rows)
{
@ -168,15 +172,20 @@ Chunk RabbitMQSource::generateImpl()
virtual_columns[5]->insert(timestamp);
}
total_rows = total_rows + new_rows;
total_rows += new_rows;
}
buffer->allowNext();
if (total_rows >= max_block_size || buffer->queueEmpty() || buffer->isConsumerStopped() || !checkTimeLimit())
if (total_rows >= max_block_size || buffer->isConsumerStopped() || isTimeLimitExceeded())
break;
buffer->allowNext();
}
LOG_TEST(
log,
"Flushing {} rows (max block size: {}, time: {} / {} ms)",
total_rows, max_block_size, total_stopwatch.elapsedMilliseconds(), max_execution_time_ms);
if (total_rows == 0)
return {};

View File

@ -27,13 +27,12 @@ public:
Chunk generate() override;
bool queueEmpty() const { return !buffer || buffer->queueEmpty(); }
bool queueEmpty() const { return !buffer || buffer->hasPendingMessages(); }
bool needChannelUpdate();
void updateChannel();
bool sendAck();
void setTimeLimit(Poco::Timespan max_execution_time_) { max_execution_time = max_execution_time_; }
void setTimeLimit(uint64_t max_execution_time_ms_) { max_execution_time_ms = max_execution_time_ms_; }
private:
StorageRabbitMQ & storage;
@ -47,12 +46,14 @@ private:
const Block non_virtual_header;
const Block virtual_header;
Poco::Logger * log;
ConsumerBufferPtr buffer;
Poco::Timespan max_execution_time = 0;
uint64_t max_execution_time_ms = 0;
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE};
bool checkTimeLimit() const;
bool isTimeLimitExceeded() const;
RabbitMQSource(
StorageRabbitMQ & storage_,

View File

@ -183,8 +183,8 @@ bool ReadBufferFromRabbitMQConsumer::nextImpl()
{
auto * new_position = const_cast<char *>(current.message.data());
BufferBase::set(new_position, current.message.size(), 0);
allowed = false;
allowed = false;
return true;
}

View File

@ -67,8 +67,9 @@ public:
bool ackMessages();
void updateAckTracker(AckTracker record = AckTracker());
bool queueEmpty() { return received.empty(); }
void allowNext() { allowed = true; } // Allow to read next message.
bool hasPendingMessages() { return received.empty(); }
void allowNext() { allowed = true; }
auto getChannelID() const { return current.track.channel_id; }
auto getDeliveryTag() const { return current.track.delivery_tag; }
@ -89,7 +90,6 @@ private:
const size_t channel_id_base;
Poco::Logger * log;
char row_delimiter;
bool allowed = true;
const std::atomic<bool> & stopped;
String channel_id;
@ -100,6 +100,8 @@ private:
AckTracker last_inserted_record_info;
UInt64 prev_tag = 0, channel_id_counter = 0;
bool allowed = false;
};
}

View File

@ -715,6 +715,12 @@ void StorageRabbitMQ::read(
auto rabbit_source = std::make_shared<RabbitMQSource>(
*this, storage_snapshot, modified_context, column_names, 1, rabbitmq_settings->rabbitmq_commit_on_select);
uint64_t max_execution_time_ms = rabbitmq_settings->rabbitmq_flush_interval_ms.changed
? rabbitmq_settings->rabbitmq_flush_interval_ms
: (static_cast<UInt64>(getContext()->getSettingsRef().stream_flush_interval_ms) * 1000);
rabbit_source->setTimeLimit(max_execution_time_ms);
auto converting_dag = ActionsDAG::makeConvertingActions(
rabbit_source->getPort().getHeader().getColumnsWithTypeAndName(),
sample_block.getColumnsWithTypeAndName(),
@ -1075,14 +1081,15 @@ bool StorageRabbitMQ::streamToViews()
{
auto source = std::make_shared<RabbitMQSource>(
*this, storage_snapshot, rabbitmq_context, column_names, block_size, false);
uint64_t max_execution_time_ms = rabbitmq_settings->rabbitmq_flush_interval_ms.changed
? rabbitmq_settings->rabbitmq_flush_interval_ms
: (static_cast<UInt64>(getContext()->getSettingsRef().stream_flush_interval_ms) * 1000);
source->setTimeLimit(max_execution_time_ms);
sources.emplace_back(source);
pipes.emplace_back(source);
Poco::Timespan max_execution_time = rabbitmq_settings->rabbitmq_flush_interval_ms.changed
? rabbitmq_settings->rabbitmq_flush_interval_ms
: getContext()->getSettingsRef().stream_flush_interval_ms;
source->setTimeLimit(max_execution_time);
}
block_io.pipeline.complete(Pipe::unitePipes(std::move(pipes)));

View File

@ -153,6 +153,7 @@ def test_rabbitmq_select_empty(rabbitmq_cluster):
rabbitmq_exchange_name = 'empty',
rabbitmq_commit_on_select = 1,
rabbitmq_format = 'TSV',
rabbitmq_flush_interval_ms=1000,
rabbitmq_row_delimiter = '\\n';
""".format(
rabbitmq_cluster.rabbitmq_host
@ -169,6 +170,7 @@ def test_rabbitmq_json_without_delimiter(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = '{}:5672',
rabbitmq_commit_on_select = 1,
rabbitmq_flush_interval_ms=1000,
rabbitmq_exchange_name = 'json',
rabbitmq_format = 'JSONEachRow'
""".format(
@ -221,6 +223,7 @@ def test_rabbitmq_csv_with_delimiter(rabbitmq_cluster):
rabbitmq_exchange_name = 'csv',
rabbitmq_commit_on_select = 1,
rabbitmq_format = 'CSV',
rabbitmq_flush_interval_ms=1000,
rabbitmq_row_delimiter = '\\n';
"""
)
@ -262,6 +265,7 @@ def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster):
rabbitmq_exchange_name = 'tsv',
rabbitmq_format = 'TSV',
rabbitmq_commit_on_select = 1,
rabbitmq_flush_interval_ms=1000,
rabbitmq_queue_base = 'tsv',
rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64)
@ -303,6 +307,7 @@ def test_rabbitmq_macros(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = '{rabbitmq_host}:{rabbitmq_port}',
rabbitmq_commit_on_select = 1,
rabbitmq_flush_interval_ms=1000,
rabbitmq_exchange_name = '{rabbitmq_exchange_name}',
rabbitmq_format = '{rabbitmq_format}'
"""
@ -342,6 +347,7 @@ def test_rabbitmq_materialized_view(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'mv',
rabbitmq_format = 'JSONEachRow',
rabbitmq_flush_interval_ms=1000,
rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree()
@ -398,6 +404,7 @@ def test_rabbitmq_materialized_view_with_subquery(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'mvsq',
rabbitmq_flush_interval_ms=1000,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64)
@ -441,6 +448,7 @@ def test_rabbitmq_many_materialized_views(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'mmv',
rabbitmq_flush_interval_ms=1000,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.view1 (key UInt64, value UInt64)
@ -498,6 +506,7 @@ def test_rabbitmq_protobuf(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'pb',
rabbitmq_format = 'Protobuf',
rabbitmq_flush_interval_ms=1000,
rabbitmq_schema = 'rabbitmq.proto:KeyValueProto';
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree()
@ -572,6 +581,7 @@ def test_rabbitmq_big_message(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'big',
rabbitmq_flush_interval_ms=1000,
rabbitmq_format = 'JSONEachRow';
CREATE TABLE test.view (key UInt64, value String)
ENGINE = MergeTree
@ -608,6 +618,7 @@ def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
rabbitmq_exchange_name = 'test_sharding',
rabbitmq_num_queues = 10,
rabbitmq_num_consumers = 10,
rabbitmq_flush_interval_ms=1000,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64, channel_id String)
@ -685,6 +696,7 @@ def test_rabbitmq_mv_combo(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'combo',
rabbitmq_queue_base = 'combo',
rabbitmq_flush_interval_ms=1000,
rabbitmq_num_consumers = 2,
rabbitmq_num_queues = 5,
rabbitmq_format = 'JSONEachRow',
@ -778,6 +790,7 @@ def test_rabbitmq_insert(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'insert',
rabbitmq_flush_interval_ms=1000,
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'insert1',
rabbitmq_format = 'TSV',
@ -835,6 +848,7 @@ def test_rabbitmq_insert_headers_exchange(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'insert_headers',
rabbitmq_exchange_type = 'headers',
rabbitmq_flush_interval_ms=1000,
rabbitmq_routing_key_list = 'test=insert,topic=headers',
rabbitmq_format = 'TSV',
rabbitmq_row_delimiter = '\\n';
@ -901,6 +915,7 @@ def test_rabbitmq_many_inserts(rabbitmq_cluster):
rabbitmq_exchange_name = 'many_inserts',
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'insert2',
rabbitmq_flush_interval_ms=1000,
rabbitmq_format = 'TSV',
rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.rabbitmq_consume (key UInt64, value UInt64)
@ -909,6 +924,7 @@ def test_rabbitmq_many_inserts(rabbitmq_cluster):
rabbitmq_exchange_name = 'many_inserts',
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'insert2',
rabbitmq_flush_interval_ms=1000,
rabbitmq_format = 'TSV',
rabbitmq_row_delimiter = '\\n';
"""
@ -988,6 +1004,7 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster):
rabbitmq_queue_base = 'over',
rabbitmq_exchange_type = 'direct',
rabbitmq_num_consumers = 5,
rabbitmq_flush_interval_ms=1000,
rabbitmq_num_queues = 10,
rabbitmq_max_block_size = 10000,
rabbitmq_routing_key_list = 'over',
@ -999,6 +1016,7 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster):
rabbitmq_exchange_name = 'over',
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'over',
rabbitmq_flush_interval_ms=1000,
rabbitmq_format = 'TSV',
rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.view_overload (key UInt64, value UInt64)
@ -1084,6 +1102,7 @@ def test_rabbitmq_direct_exchange(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_num_consumers = 2,
rabbitmq_num_queues = 2,
rabbitmq_flush_interval_ms=1000,
rabbitmq_exchange_name = 'direct_exchange_testing',
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'direct_{0}',
@ -1175,6 +1194,7 @@ def test_rabbitmq_fanout_exchange(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_num_consumers = 2,
rabbitmq_num_queues = 2,
rabbitmq_flush_interval_ms=1000,
rabbitmq_routing_key_list = 'key_{0}',
rabbitmq_exchange_name = 'fanout_exchange_testing',
rabbitmq_exchange_type = 'fanout',
@ -1261,6 +1281,7 @@ def test_rabbitmq_topic_exchange(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_num_consumers = 2,
rabbitmq_num_queues = 2,
rabbitmq_flush_interval_ms=1000,
rabbitmq_exchange_name = 'topic_exchange_testing',
rabbitmq_exchange_type = 'topic',
rabbitmq_routing_key_list = '*.{0}',
@ -1284,6 +1305,7 @@ def test_rabbitmq_topic_exchange(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_num_consumers = 2,
rabbitmq_num_queues = 2,
rabbitmq_flush_interval_ms=1000,
rabbitmq_exchange_name = 'topic_exchange_testing',
rabbitmq_exchange_type = 'topic',
rabbitmq_routing_key_list = '*.logs',
@ -1385,6 +1407,7 @@ def test_rabbitmq_hash_exchange(rabbitmq_cluster):
rabbitmq_exchange_type = 'consistent_hash',
rabbitmq_exchange_name = 'hash_exchange_testing',
rabbitmq_format = 'JSONEachRow',
rabbitmq_flush_interval_ms=1000,
rabbitmq_row_delimiter = '\\n';
CREATE MATERIALIZED VIEW test.{0}_mv TO test.destination AS
SELECT key, value, _channel_id AS channel_id FROM test.{0};
@ -1482,6 +1505,7 @@ def test_rabbitmq_multiple_bindings(rabbitmq_cluster):
rabbitmq_exchange_name = 'multiple_bindings_testing',
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'key1,key2,key3,key4,key5',
rabbitmq_flush_interval_ms=1000,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
CREATE MATERIALIZED VIEW test.bindings_mv TO test.destination AS
@ -1571,6 +1595,7 @@ def test_rabbitmq_headers_exchange(rabbitmq_cluster):
rabbitmq_num_consumers = 2,
rabbitmq_exchange_name = 'headers_exchange_testing',
rabbitmq_exchange_type = 'headers',
rabbitmq_flush_interval_ms=1000,
rabbitmq_routing_key_list = 'x-match=all,format=logs,type=report,year=2020',
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
@ -1595,6 +1620,7 @@ def test_rabbitmq_headers_exchange(rabbitmq_cluster):
rabbitmq_exchange_type = 'headers',
rabbitmq_routing_key_list = 'x-match=all,format=logs,type=report,year=2019',
rabbitmq_format = 'JSONEachRow',
rabbitmq_flush_interval_ms=1000,
rabbitmq_row_delimiter = '\\n';
CREATE MATERIALIZED VIEW test.headers_exchange_{0}_mv TO test.destination AS
SELECT key, value FROM test.headers_exchange_{0};
@ -1667,6 +1693,7 @@ def test_rabbitmq_virtual_columns(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'virtuals',
rabbitmq_flush_interval_ms=1000,
rabbitmq_format = 'JSONEachRow';
CREATE MATERIALIZED VIEW test.view Engine=Log AS
SELECT value, key, _exchange_name, _channel_id, _delivery_tag, _redelivered FROM test.rabbitmq_virtuals;
@ -1735,6 +1762,7 @@ def test_rabbitmq_virtual_columns_with_materialized_view(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'virtuals_mv',
rabbitmq_flush_interval_ms=1000,
rabbitmq_format = 'JSONEachRow';
CREATE TABLE test.view (key UInt64, value UInt64,
exchange_name String, channel_id String, delivery_tag UInt64, redelivered UInt8) ENGINE = MergeTree()
@ -1820,6 +1848,7 @@ def test_rabbitmq_many_consumers_to_each_queue(rabbitmq_cluster):
rabbitmq_exchange_name = 'many_consumers',
rabbitmq_num_queues = 2,
rabbitmq_num_consumers = 2,
rabbitmq_flush_interval_ms=1000,
rabbitmq_queue_base = 'many_consumers',
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
@ -1909,6 +1938,7 @@ def test_rabbitmq_restore_failed_connection_without_losses_1(rabbitmq_cluster):
CREATE TABLE test.consume (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_flush_interval_ms=1000,
rabbitmq_exchange_name = 'producer_reconnect',
rabbitmq_format = 'JSONEachRow',
rabbitmq_num_consumers = 2,
@ -1921,6 +1951,7 @@ def test_rabbitmq_restore_failed_connection_without_losses_1(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'producer_reconnect',
rabbitmq_persistent = '1',
rabbitmq_flush_interval_ms=1000,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
"""
@ -1984,6 +2015,7 @@ def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'consumer_reconnect',
rabbitmq_num_consumers = 10,
rabbitmq_flush_interval_ms=1000,
rabbitmq_num_queues = 10,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
@ -2064,6 +2096,7 @@ def test_rabbitmq_commit_on_block_write(rabbitmq_cluster):
rabbitmq_exchange_name = 'block',
rabbitmq_format = 'JSONEachRow',
rabbitmq_queue_base = 'block',
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size = 100,
rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64)
@ -2144,6 +2177,7 @@ def test_rabbitmq_no_connection_at_startup_1(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'cs',
rabbitmq_format = 'JSONEachRow',
rabbitmq_flush_interval_ms=1000,
rabbitmq_num_consumers = '5',
rabbitmq_row_delimiter = '\\n';
"""
@ -2160,6 +2194,7 @@ def test_rabbitmq_no_connection_at_startup_2(rabbitmq_cluster):
rabbitmq_exchange_name = 'cs',
rabbitmq_format = 'JSONEachRow',
rabbitmq_num_consumers = '5',
rabbitmq_flush_interval_ms=1000,
rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree
@ -2216,6 +2251,7 @@ def test_rabbitmq_format_factory_settings(rabbitmq_cluster):
) ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'format_settings',
rabbitmq_flush_interval_ms=1000,
rabbitmq_format = 'JSONEachRow',
date_time_input_format = 'best_effort';
"""
@ -2278,6 +2314,7 @@ def test_rabbitmq_vhost(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'vhost',
rabbitmq_format = 'JSONEachRow',
rabbitmq_flush_interval_ms=1000,
rabbitmq_vhost = '/'
"""
)
@ -2306,6 +2343,7 @@ def test_rabbitmq_drop_table_properly(rabbitmq_cluster):
CREATE TABLE test.rabbitmq_drop (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_flush_interval_ms=1000,
rabbitmq_exchange_name = 'drop',
rabbitmq_format = 'JSONEachRow',
rabbitmq_queue_base = 'rabbit_queue_drop'
@ -2352,6 +2390,7 @@ def test_rabbitmq_queue_settings(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'rabbit_exchange',
rabbitmq_flush_interval_ms=1000,
rabbitmq_format = 'JSONEachRow',
rabbitmq_queue_base = 'rabbit_queue_settings',
rabbitmq_queue_settings_list = 'x-max-length=10,x-overflow=reject-publish'
@ -2433,6 +2472,7 @@ def test_rabbitmq_queue_consume(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_format = 'JSONEachRow',
rabbitmq_queue_base = 'rabbit_queue',
rabbitmq_flush_interval_ms=1000,
rabbitmq_queue_consume = 1;
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree ORDER BY key;
@ -2467,6 +2507,7 @@ def test_rabbitmq_produce_consume_avro(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_format = 'Avro',
rabbitmq_flush_interval_ms=1000,
rabbitmq_exchange_name = 'avro',
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'avro';
@ -2475,6 +2516,7 @@ def test_rabbitmq_produce_consume_avro(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_format = 'Avro',
rabbitmq_flush_interval_ms=1000,
rabbitmq_exchange_name = 'avro',
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'avro';
@ -2517,6 +2559,7 @@ def test_rabbitmq_bad_args(rabbitmq_cluster):
CREATE TABLE test.drop (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_flush_interval_ms=1000,
rabbitmq_exchange_name = 'f',
rabbitmq_format = 'JSONEachRow';
"""
@ -2529,6 +2572,7 @@ def test_rabbitmq_issue_30691(rabbitmq_cluster):
CREATE TABLE test.rabbitmq_drop (json String)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_flush_interval_ms=1000,
rabbitmq_exchange_name = '30691',
rabbitmq_row_delimiter = '\\n', -- Works only if adding this setting
rabbitmq_format = 'LineAsString',
@ -2589,6 +2633,7 @@ def test_rabbitmq_drop_mv(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'mv',
rabbitmq_format = 'JSONEachRow',
rabbitmq_flush_interval_ms=1000,
rabbitmq_queue_base = 'drop_mv';
"""
)
@ -2679,6 +2724,7 @@ def test_rabbitmq_random_detach(rabbitmq_cluster):
rabbitmq_exchange_name = 'random',
rabbitmq_queue_base = 'random',
rabbitmq_num_queues = 2,
rabbitmq_flush_interval_ms=1000,
rabbitmq_num_consumers = 2,
rabbitmq_format = 'JSONEachRow';
CREATE TABLE test.view (key UInt64, value UInt64, channel_id String)
@ -2743,7 +2789,9 @@ def test_rabbitmq_predefined_configuration(rabbitmq_cluster):
instance.query(
"""
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
ENGINE = RabbitMQ(rabbit1, rabbitmq_vhost = '/') """
ENGINE = RabbitMQ(rabbit1, rabbitmq_vhost = '/')
SETTINGS rabbitmq_flush_interval_ms=1000;
"""
)
channel.basic_publish(
@ -2779,6 +2827,7 @@ def test_rabbitmq_msgpack(rabbitmq_cluster):
settings rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'xhep',
rabbitmq_format = 'MsgPack',
rabbitmq_flush_interval_ms=1000,
rabbitmq_num_consumers = 1;
create table
rabbit_out (val String)
@ -2786,6 +2835,7 @@ def test_rabbitmq_msgpack(rabbitmq_cluster):
settings rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'xhep',
rabbitmq_format = 'MsgPack',
rabbitmq_flush_interval_ms=1000,
rabbitmq_num_consumers = 1;
set stream_like_engine_allow_direct_select=1;
insert into rabbit_out select 'kek';
@ -2821,12 +2871,14 @@ def test_rabbitmq_address(rabbitmq_cluster):
SETTINGS rabbitmq_exchange_name = 'rxhep',
rabbitmq_format = 'CSV',
rabbitmq_num_consumers = 1,
rabbitmq_flush_interval_ms=1000,
rabbitmq_address='amqp://root:clickhouse@rabbitmq1:5672/';
create table
rabbit_out (val String) engine=RabbitMQ
SETTINGS rabbitmq_exchange_name = 'rxhep',
rabbitmq_format = 'CSV',
rabbitmq_num_consumers = 1,
rabbitmq_flush_interval_ms=1000,
rabbitmq_address='amqp://root:clickhouse@rabbitmq1:5672/';
set stream_like_engine_allow_direct_select=1;
insert into rabbit_out select 'kek';
@ -2848,3 +2900,170 @@ def test_rabbitmq_address(rabbitmq_cluster):
instance2.query("drop table rabbit_in sync")
instance2.query("drop table rabbit_out sync")
def test_rabbitmq_flush_by_block_size(rabbitmq_cluster):
    """Verify that a block is flushed as soon as rabbitmq_max_block_size rows
    are collected, without waiting for rabbitmq_flush_interval_ms.

    A background thread publishes messages continuously; the flush interval is
    set far beyond the test duration, so the first MergeTree part (all_1_1_0)
    can only appear because the block-size limit (100) was reached.
    """
    instance.query(
        """
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;

        CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_exchange_name = 'flush_by_block',
                     rabbitmq_queue_base = 'flush_by_block',
                     rabbitmq_max_block_size = 100,
                     rabbitmq_flush_interval_ms = 640000, /* should not flush by time during test */
                     rabbitmq_format = 'JSONEachRow';

        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;

        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.rabbitmq;
        """
    )

    cancel = threading.Event()

    def produce():
        # Keep publishing single-row JSON messages until the main thread
        # signals completion via `cancel`.
        credentials = pika.PlainCredentials("root", "clickhouse")
        parameters = pika.ConnectionParameters(
            rabbitmq_cluster.rabbitmq_ip,
            rabbitmq_cluster.rabbitmq_port,
            "/",
            credentials,
        )
        connection = pika.BlockingConnection(parameters)

        while not cancel.is_set():
            try:
                channel = connection.channel()
                channel.basic_publish(
                    exchange="flush_by_block",
                    routing_key="",
                    body=json.dumps({"key": 0, "value": 0}),
                )
            except Exception as e:
                # Best-effort producer: log and keep retrying until cancelled.
                print(f"Got error: {str(e)}")

        connection.close()

    produce_thread = threading.Thread(target=produce)
    produce_thread.start()

    # Wait until the very first part is created by the first flush.
    while 0 == int(
        instance.query(
            "SELECT count() FROM system.parts WHERE database = 'test' AND table = 'view' AND name = 'all_1_1_0'"
        )
    ):
        time.sleep(0.5)

    cancel.set()
    produce_thread.join()

    # More flushes can happen during the test; we only check the result of the
    # first flush (the part named all_1_1_0).
    result = instance.query("SELECT count() FROM test.view WHERE _part='all_1_1_0'")
    # logging.debug(result)

    instance.query(
        """
        DROP TABLE test.consumer;
        DROP TABLE test.view;
        DROP TABLE test.rabbitmq;
        """
    )

    # 100 = first poll should return 100 messages (and rows)
    # not waiting for stream_flush_interval_ms
    assert (
        int(result) == 100
    ), "Messages from rabbitmq should be flushed when block of size rabbitmq_max_block_size is formed!"
def test_rabbitmq_flush_by_time(rabbitmq_cluster):
    """Verify that data is flushed by rabbitmq_flush_interval_ms even though
    the block-size limit (100 rows) is never reached.

    The producer emits roughly one message per 0.8 s, so with a 5000 ms flush
    interval each flush carries only a handful of rows. After waiting ~12 s we
    expect about 3 distinct flushes, observed via distinct insertion
    timestamps (materialized `ts` column) in the view.
    """
    instance.query(
        """
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;

        CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_exchange_name = 'flush_by_time',
                     rabbitmq_queue_base = 'flush_by_time',
                     rabbitmq_max_block_size = 100,
                     rabbitmq_flush_interval_ms = 5000,
                     rabbitmq_format = 'JSONEachRow';

        CREATE TABLE test.view (key UInt64, value UInt64, ts DateTime64(3) MATERIALIZED now64(3))
            ENGINE = MergeTree()
            ORDER BY key;
        """
    )

    cancel = threading.Event()

    def produce():
        # Publish one message roughly every 0.8 seconds until cancelled, so a
        # 100-row block can never form within one 5 s flush interval.
        credentials = pika.PlainCredentials("root", "clickhouse")
        parameters = pika.ConnectionParameters(
            rabbitmq_cluster.rabbitmq_ip,
            rabbitmq_cluster.rabbitmq_port,
            "/",
            credentials,
        )
        connection = pika.BlockingConnection(parameters)

        while not cancel.is_set():
            try:
                channel = connection.channel()
                channel.basic_publish(
                    exchange="flush_by_time",
                    routing_key="",
                    body=json.dumps({"key": 0, "value": 0}),
                )
                print("Produced a message")
                time.sleep(0.8)
            except Exception as e:
                # Best-effort producer: log and keep retrying until cancelled.
                print(f"Got error: {str(e)}")

        connection.close()

    produce_thread = threading.Thread(target=produce)
    produce_thread.start()

    instance.query(
        """
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.rabbitmq;
        """
    )

    # Wait for the first part to appear (i.e. the first flush by time).
    while True:
        time.sleep(0.2)
        count = instance.query(
            "SELECT count() FROM system.parts WHERE database = 'test' AND table = 'view'"
        )
        print(f"kssenii total count: {count}")
        count = int(
            instance.query(
                "SELECT count() FROM system.parts WHERE database = 'test' AND table = 'view' AND name = 'all_1_1_0'"
            )
        )
        print(f"kssenii count: {count}")
        if count > 0:
            break

    # Let a couple more 5-second flush intervals elapse.
    time.sleep(12)
    result = instance.query("SELECT uniqExact(ts) FROM test.view")

    cancel.set()
    produce_thread.join()

    instance.query(
        """
        DROP TABLE test.consumer;
        DROP TABLE test.view;
        DROP TABLE test.rabbitmq;
        """
    )

    # ~3 flushes expected: first one observed in the loop plus ~12s / 5000ms more.
    assert int(result) == 3