Merge pull request #44404 from kssenii/storage-rabbitmq-fix-flush

RabbitMQ - fix writing many small blocks by flushing data only exactly by flush_interval_ms or by max_block_size
This commit is contained in:
Kseniia Sumarokova 2023-02-03 11:50:12 +01:00 committed by GitHub
commit ef7acb9b66
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 358 additions and 63 deletions

View File

@ -77,9 +77,12 @@ Optional parameters:
- `rabbitmq_password` - RabbitMQ password.
- `rabbitmq_commit_on_select` - Commit messages when select query is made. Default: `false`.
- `rabbitmq_max_rows_per_message` — The maximum number of rows written in one RabbitMQ message for row-based formats. Default : `1`.
- `rabbitmq_empty_queue_backoff_start` — A start backoff point to reschedule read if the rabbitmq queue is empty.
- `rabbitmq_empty_queue_backoff_end` — An end backoff point to reschedule read if the rabbitmq queue is empty.
SSL connection:
* [ ] SSL connection:
Use either `rabbitmq_secure = 1` or `amqps` in connection address: `rabbitmq_address = 'amqps://guest:guest@localhost/vhost'`.
The default behaviour of the used library is not to check if the created TLS connection is sufficiently secure. Whether the certificate is expired, self-signed, missing or invalid: the connection is simply permitted. More strict checking of certificates can possibly be implemented in the future.

View File

@ -68,7 +68,7 @@ public:
bool ackMessages();
void updateAckTracker(AckTracker record = AckTracker());
bool queueEmpty() { return received.empty(); }
bool hasPendingMessages() { return received.empty(); }
auto getChannelID() const { return current.track.channel_id; }
auto getDeliveryTag() const { return current.track.delivery_tag; }

View File

@ -24,9 +24,12 @@ namespace DB
M(String, rabbitmq_address, "", "Address for connection", 0) \
M(UInt64, rabbitmq_skip_broken_messages, 0, "Skip at least this number of broken messages from RabbitMQ per block", 0) \
M(UInt64, rabbitmq_max_block_size, 0, "Number of row collected before flushing data from RabbitMQ.", 0) \
M(Milliseconds, rabbitmq_flush_interval_ms, 0, "Timeout for flushing data from RabbitMQ.", 0) \
M(UInt64, rabbitmq_flush_interval_ms, 0, "Timeout for flushing data from RabbitMQ.", 0) \
M(String, rabbitmq_vhost, "/", "RabbitMQ vhost.", 0) \
M(String, rabbitmq_queue_settings_list, "", "A list of rabbitmq queue settings", 0) \
M(UInt64, rabbitmq_empty_queue_backoff_start_ms, 10, "A minimum backoff point to reschedule read if the rabbitmq queue is empty", 0) \
M(UInt64, rabbitmq_empty_queue_backoff_end_ms, 10000, "A maximum backoff point to reschedule read if the rabbitmq queue is empty", 0) \
M(UInt64, rabbitmq_empty_queue_backoff_step_ms, 100, "A maximum backoff point to reschedule read if the rabbitmq queue is empty", 0) \
M(Bool, rabbitmq_queue_consume, false, "Use user-defined queues and do not make any RabbitMQ setup: declaring exchanges, queues, bindings", 0) \
M(String, rabbitmq_username, "", "RabbitMQ username", 0) \
M(String, rabbitmq_password, "", "RabbitMQ password", 0) \

View File

@ -4,6 +4,7 @@
#include <Interpreters/Context.h>
#include <Processors/Executors/StreamingFormatExecutor.h>
#include <Storages/RabbitMQ/RabbitMQConsumer.h>
#include <Common/logger_useful.h>
#include <IO/EmptyReadBuffer.h>
namespace DB
@ -62,6 +63,7 @@ RabbitMQSource::RabbitMQSource(
, ack_in_suffix(ack_in_suffix_)
, non_virtual_header(std::move(headers.first))
, virtual_header(std::move(headers.second))
, log(&Poco::Logger::get("RabbitMQSource"))
{
storage.incrementReader();
}
@ -107,17 +109,15 @@ Chunk RabbitMQSource::generate()
return chunk;
}
bool RabbitMQSource::checkTimeLimit() const
bool RabbitMQSource::isTimeLimitExceeded() const
{
if (max_execution_time != 0)
if (max_execution_time_ms != 0)
{
auto elapsed_ns = total_stopwatch.elapsed();
if (elapsed_ns > static_cast<UInt64>(max_execution_time.totalMicroseconds()) * 1000)
return false;
uint64_t elapsed_time_ms = total_stopwatch.elapsedMilliseconds();
return max_execution_time_ms <= elapsed_time_ms;
}
return true;
return false;
}
Chunk RabbitMQSource::generateImpl()
@ -128,9 +128,11 @@ Chunk RabbitMQSource::generateImpl()
consumer = storage.popConsumer(timeout);
}
if (!consumer || is_finished)
if (is_finished || !consumer || consumer->isConsumerStopped())
return {};
/// Currently it is one time usage source: to make sure data is flushed
/// strictly by timeout or by block size.
is_finished = true;
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
@ -139,17 +141,17 @@ Chunk RabbitMQSource::generateImpl()
storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size);
StreamingFormatExecutor executor(non_virtual_header, input_format);
size_t total_rows = 0;
while (true)
{
if (consumer->queueEmpty())
break;
size_t new_rows = 0;
if (auto buf = consumer->consume())
new_rows = executor.execute(*buf);
if (!consumer->hasPendingMessages())
{
if (auto buf = consumer->consume())
new_rows = executor.execute(*buf);
}
if (new_rows)
{
@ -172,13 +174,18 @@ Chunk RabbitMQSource::generateImpl()
virtual_columns[5]->insert(timestamp);
}
total_rows = total_rows + new_rows;
total_rows += new_rows;
}
if (total_rows >= max_block_size || consumer->queueEmpty() || consumer->isConsumerStopped() || !checkTimeLimit())
if (total_rows >= max_block_size || consumer->isConsumerStopped() || isTimeLimitExceeded())
break;
}
LOG_TEST(
log,
"Flushing {} rows (max block size: {}, time: {} / {} ms)",
total_rows, max_block_size, total_stopwatch.elapsedMilliseconds(), max_execution_time_ms);
if (total_rows == 0)
return {};

View File

@ -27,13 +27,12 @@ public:
Chunk generate() override;
bool queueEmpty() const { return !consumer || consumer->queueEmpty(); }
bool queueEmpty() const { return !consumer || consumer->hasPendingMessages(); }
bool needChannelUpdate();
void updateChannel();
bool sendAck();
void setTimeLimit(Poco::Timespan max_execution_time_) { max_execution_time = max_execution_time_; }
void setTimeLimit(uint64_t max_execution_time_ms_) { max_execution_time_ms = max_execution_time_ms_; }
private:
StorageRabbitMQ & storage;
@ -47,12 +46,13 @@ private:
const Block non_virtual_header;
const Block virtual_header;
Poco::Logger * log;
RabbitMQConsumerPtr consumer;
Poco::Timespan max_execution_time = 0;
uint64_t max_execution_time_ms = 0;
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE};
bool checkTimeLimit() const;
bool isTimeLimitExceeded() const;
RabbitMQSource(
StorageRabbitMQ & storage_,

View File

@ -36,7 +36,6 @@ namespace DB
static const uint32_t QUEUE_SIZE = 100000;
static const auto MAX_FAILED_READ_ATTEMPTS = 10;
static const auto RESCHEDULE_MS = 500;
static const auto BACKOFF_TRESHOLD = 32000;
static const auto MAX_THREAD_WORK_DURATION_MS = 60000;
namespace ErrorCodes
@ -90,7 +89,7 @@ StorageRabbitMQ::StorageRabbitMQ(
, semaphore(0, static_cast<int>(num_consumers))
, unique_strbase(getRandomName())
, queue_size(std::max(QUEUE_SIZE, static_cast<uint32_t>(getMaxBlockSize())))
, milliseconds_to_wait(RESCHEDULE_MS)
, milliseconds_to_wait(rabbitmq_settings->rabbitmq_empty_queue_backoff_start_ms)
, is_attach(is_attach_)
{
const auto & config = getContext()->getConfigRef();
@ -717,6 +716,12 @@ void StorageRabbitMQ::read(
auto rabbit_source = std::make_shared<RabbitMQSource>(
*this, storage_snapshot, modified_context, column_names, 1, rabbitmq_settings->rabbitmq_commit_on_select);
uint64_t max_execution_time_ms = rabbitmq_settings->rabbitmq_flush_interval_ms.changed
? rabbitmq_settings->rabbitmq_flush_interval_ms
: (static_cast<UInt64>(getContext()->getSettingsRef().stream_flush_interval_ms) * 1000);
rabbit_source->setTimeLimit(max_execution_time_ms);
auto converting_dag = ActionsDAG::makeConvertingActions(
rabbit_source->getPort().getHeader().getColumnsWithTypeAndName(),
sample_block.getColumnsWithTypeAndName(),
@ -1015,14 +1020,14 @@ void StorageRabbitMQ::streamingToViewsFunc()
if (streamToViews())
{
/// Reschedule with backoff.
if (milliseconds_to_wait < BACKOFF_TRESHOLD)
milliseconds_to_wait *= 2;
if (milliseconds_to_wait < rabbitmq_settings->rabbitmq_empty_queue_backoff_end_ms)
milliseconds_to_wait += rabbitmq_settings->rabbitmq_empty_queue_backoff_step_ms;
stopLoopIfNoReaders();
break;
}
else
{
milliseconds_to_wait = RESCHEDULE_MS;
milliseconds_to_wait = rabbitmq_settings->rabbitmq_empty_queue_backoff_start_ms;
}
auto end_time = std::chrono::steady_clock::now();
@ -1085,14 +1090,15 @@ bool StorageRabbitMQ::streamToViews()
{
auto source = std::make_shared<RabbitMQSource>(
*this, storage_snapshot, rabbitmq_context, column_names, block_size, false);
uint64_t max_execution_time_ms = rabbitmq_settings->rabbitmq_flush_interval_ms.changed
? rabbitmq_settings->rabbitmq_flush_interval_ms
: (static_cast<UInt64>(getContext()->getSettingsRef().stream_flush_interval_ms) * 1000);
source->setTimeLimit(max_execution_time_ms);
sources.emplace_back(source);
pipes.emplace_back(source);
Poco::Timespan max_execution_time = rabbitmq_settings->rabbitmq_flush_interval_ms.changed
? rabbitmq_settings->rabbitmq_flush_interval_ms
: getContext()->getSettingsRef().stream_flush_interval_ms;
source->setTimeLimit(max_execution_time);
}
block_io.pipeline.complete(Pipe::unitePipes(std::move(pipes)));

View File

@ -1,10 +1,5 @@
import pytest
# FIXME This test is too flaky
# https://github.com/ClickHouse/ClickHouse/issues/45160
pytestmark = pytest.mark.skip
import json
import os.path as p
import random
@ -55,6 +50,7 @@ def rabbitmq_check_result(result, check=False, ref_file="test_rabbitmq_json.refe
def wait_rabbitmq_to_start(rabbitmq_docker_id, timeout=180):
logging.getLogger("pika").propagate = False
start = time.time()
while time.time() - start < timeout:
try:
@ -159,6 +155,7 @@ def test_rabbitmq_select_empty(rabbitmq_cluster):
rabbitmq_exchange_name = 'empty',
rabbitmq_commit_on_select = 1,
rabbitmq_format = 'TSV',
rabbitmq_flush_interval_ms=1000,
rabbitmq_row_delimiter = '\\n';
""".format(
rabbitmq_cluster.rabbitmq_host
@ -175,6 +172,8 @@ def test_rabbitmq_json_without_delimiter(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = '{}:5672',
rabbitmq_commit_on_select = 1,
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_exchange_name = 'json',
rabbitmq_format = 'JSONEachRow'
""".format(
@ -227,6 +226,8 @@ def test_rabbitmq_csv_with_delimiter(rabbitmq_cluster):
rabbitmq_exchange_name = 'csv',
rabbitmq_commit_on_select = 1,
rabbitmq_format = 'CSV',
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_row_delimiter = '\\n';
"""
)
@ -268,6 +269,8 @@ def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster):
rabbitmq_exchange_name = 'tsv',
rabbitmq_format = 'TSV',
rabbitmq_commit_on_select = 1,
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_queue_base = 'tsv',
rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64)
@ -309,6 +312,8 @@ def test_rabbitmq_macros(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = '{rabbitmq_host}:{rabbitmq_port}',
rabbitmq_commit_on_select = 1,
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_exchange_name = '{rabbitmq_exchange_name}',
rabbitmq_format = '{rabbitmq_format}'
"""
@ -348,6 +353,8 @@ def test_rabbitmq_materialized_view(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'mv',
rabbitmq_format = 'JSONEachRow',
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree()
@ -370,10 +377,11 @@ def test_rabbitmq_materialized_view(rabbitmq_cluster):
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
instance.wait_for_log_line("Started streaming to 2 attached views")
messages = []
for i in range(50):
messages.append(json.dumps({"key": i, "value": i}))
for message in messages:
message = json.dumps({"key": i, "value": i})
channel.basic_publish(exchange="mv", routing_key="", body=message)
time_limit_sec = 60
@ -390,8 +398,10 @@ def test_rabbitmq_materialized_view(rabbitmq_cluster):
while time.monotonic() < deadline:
result = instance.query("SELECT * FROM test.view2 ORDER BY key")
print(f"Result: {result}")
if rabbitmq_check_result(result):
break
time.sleep(1)
rabbitmq_check_result(result, True)
connection.close()
@ -404,6 +414,8 @@ def test_rabbitmq_materialized_view_with_subquery(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'mvsq',
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64)
@ -447,6 +459,8 @@ def test_rabbitmq_many_materialized_views(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'mmv',
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.view1 (key UInt64, value UInt64)
@ -469,6 +483,8 @@ def test_rabbitmq_many_materialized_views(rabbitmq_cluster):
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
instance.wait_for_log_line("Started streaming to 2 attached views")
messages = []
for i in range(50):
messages.append(json.dumps({"key": i, "value": i}))
@ -504,6 +520,8 @@ def test_rabbitmq_protobuf(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'pb',
rabbitmq_format = 'Protobuf',
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_schema = 'rabbitmq.proto:KeyValueProto';
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree()
@ -578,6 +596,8 @@ def test_rabbitmq_big_message(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'big',
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_format = 'JSONEachRow';
CREATE TABLE test.view (key UInt64, value String)
ENGINE = MergeTree
@ -605,6 +625,7 @@ def test_rabbitmq_big_message(rabbitmq_cluster):
def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
NUM_CONSUMERS = 10
NUM_QUEUES = 10
logging.getLogger("pika").propagate = False
instance.query(
"""
@ -612,8 +633,10 @@ def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'test_sharding',
rabbitmq_num_queues = 10,
rabbitmq_num_queues = 5,
rabbitmq_num_consumers = 10,
rabbitmq_max_block_size = 100,
rabbitmq_flush_interval_ms=500,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64, channel_id String)
@ -654,7 +677,7 @@ def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
connection.close()
threads = []
threads_num = 20
threads_num = 10
for _ in range(threads_num):
threads.append(threading.Thread(target=produce))
@ -666,8 +689,10 @@ def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
while True:
result1 = instance.query("SELECT count() FROM test.view")
time.sleep(1)
if int(result1) == messages_num * threads_num:
expected = messages_num * threads_num
if int(result1) == expected:
break
print(f"Result {result1} / {expected}")
result2 = instance.query("SELECT count(DISTINCT channel_id) FROM test.view")
@ -683,6 +708,7 @@ def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
def test_rabbitmq_mv_combo(rabbitmq_cluster):
NUM_MV = 5
NUM_CONSUMERS = 4
logging.getLogger("pika").propagate = False
instance.query(
"""
@ -691,6 +717,8 @@ def test_rabbitmq_mv_combo(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'combo',
rabbitmq_queue_base = 'combo',
rabbitmq_max_block_size = 100,
rabbitmq_flush_interval_ms=1000,
rabbitmq_num_consumers = 2,
rabbitmq_num_queues = 5,
rabbitmq_format = 'JSONEachRow',
@ -755,8 +783,10 @@ def test_rabbitmq_mv_combo(rabbitmq_cluster):
result += int(
instance.query("SELECT count() FROM test.combo_{0}".format(mv_id))
)
if int(result) == messages_num * threads_num * NUM_MV:
expected = messages_num * threads_num * NUM_MV
if int(result) == expected:
break
print(f"Result: {result} / {expected}")
time.sleep(1)
for thread in threads:
@ -784,6 +814,8 @@ def test_rabbitmq_insert(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'insert',
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'insert1',
rabbitmq_format = 'TSV',
@ -841,6 +873,8 @@ def test_rabbitmq_insert_headers_exchange(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'insert_headers',
rabbitmq_exchange_type = 'headers',
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_routing_key_list = 'test=insert,topic=headers',
rabbitmq_format = 'TSV',
rabbitmq_row_delimiter = '\\n';
@ -907,6 +941,8 @@ def test_rabbitmq_many_inserts(rabbitmq_cluster):
rabbitmq_exchange_name = 'many_inserts',
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'insert2',
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_format = 'TSV',
rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.rabbitmq_consume (key UInt64, value UInt64)
@ -915,6 +951,8 @@ def test_rabbitmq_many_inserts(rabbitmq_cluster):
rabbitmq_exchange_name = 'many_inserts',
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'insert2',
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_format = 'TSV',
rabbitmq_row_delimiter = '\\n';
"""
@ -993,9 +1031,10 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster):
rabbitmq_exchange_name = 'over',
rabbitmq_queue_base = 'over',
rabbitmq_exchange_type = 'direct',
rabbitmq_num_consumers = 5,
rabbitmq_num_queues = 10,
rabbitmq_max_block_size = 10000,
rabbitmq_num_consumers = 3,
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size = 100,
rabbitmq_num_queues = 2,
rabbitmq_routing_key_list = 'over',
rabbitmq_format = 'TSV',
rabbitmq_row_delimiter = '\\n';
@ -1005,6 +1044,8 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster):
rabbitmq_exchange_name = 'over',
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'over',
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size = 100,
rabbitmq_format = 'TSV',
rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.view_overload (key UInt64, value UInt64)
@ -1016,6 +1057,8 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster):
"""
)
instance.wait_for_log_line("Started streaming to 1 attached views")
messages_num = 100000
def insert():
@ -1037,7 +1080,7 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster):
raise
threads = []
threads_num = 5
threads_num = 3
for _ in range(threads_num):
threads.append(threading.Thread(target=insert))
for thread in threads:
@ -1047,8 +1090,10 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster):
while True:
result = instance.query("SELECT count() FROM test.view_overload")
time.sleep(1)
if int(result) == messages_num * threads_num:
expected = messages_num * threads_num
if int(result) == expected:
break
print(f"Result: {result} / {expected}")
instance.query(
"""
@ -1090,6 +1135,8 @@ def test_rabbitmq_direct_exchange(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_num_consumers = 2,
rabbitmq_num_queues = 2,
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_exchange_name = 'direct_exchange_testing',
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'direct_{0}',
@ -1181,6 +1228,8 @@ def test_rabbitmq_fanout_exchange(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_num_consumers = 2,
rabbitmq_num_queues = 2,
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_routing_key_list = 'key_{0}',
rabbitmq_exchange_name = 'fanout_exchange_testing',
rabbitmq_exchange_type = 'fanout',
@ -1267,6 +1316,8 @@ def test_rabbitmq_topic_exchange(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_num_consumers = 2,
rabbitmq_num_queues = 2,
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_exchange_name = 'topic_exchange_testing',
rabbitmq_exchange_type = 'topic',
rabbitmq_routing_key_list = '*.{0}',
@ -1290,6 +1341,8 @@ def test_rabbitmq_topic_exchange(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_num_consumers = 2,
rabbitmq_num_queues = 2,
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_exchange_name = 'topic_exchange_testing',
rabbitmq_exchange_type = 'topic',
rabbitmq_routing_key_list = '*.logs',
@ -1391,6 +1444,7 @@ def test_rabbitmq_hash_exchange(rabbitmq_cluster):
rabbitmq_exchange_type = 'consistent_hash',
rabbitmq_exchange_name = 'hash_exchange_testing',
rabbitmq_format = 'JSONEachRow',
rabbitmq_flush_interval_ms=1000,
rabbitmq_row_delimiter = '\\n';
CREATE MATERIALIZED VIEW test.{0}_mv TO test.destination AS
SELECT key, value, _channel_id AS channel_id FROM test.{0};
@ -1488,6 +1542,8 @@ def test_rabbitmq_multiple_bindings(rabbitmq_cluster):
rabbitmq_exchange_name = 'multiple_bindings_testing',
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'key1,key2,key3,key4,key5',
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
CREATE MATERIALIZED VIEW test.bindings_mv TO test.destination AS
@ -1577,6 +1633,8 @@ def test_rabbitmq_headers_exchange(rabbitmq_cluster):
rabbitmq_num_consumers = 2,
rabbitmq_exchange_name = 'headers_exchange_testing',
rabbitmq_exchange_type = 'headers',
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_routing_key_list = 'x-match=all,format=logs,type=report,year=2020',
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
@ -1601,6 +1659,8 @@ def test_rabbitmq_headers_exchange(rabbitmq_cluster):
rabbitmq_exchange_type = 'headers',
rabbitmq_routing_key_list = 'x-match=all,format=logs,type=report,year=2019',
rabbitmq_format = 'JSONEachRow',
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_row_delimiter = '\\n';
CREATE MATERIALIZED VIEW test.headers_exchange_{0}_mv TO test.destination AS
SELECT key, value FROM test.headers_exchange_{0};
@ -1673,6 +1733,8 @@ def test_rabbitmq_virtual_columns(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'virtuals',
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_format = 'JSONEachRow';
CREATE MATERIALIZED VIEW test.view Engine=Log AS
SELECT value, key, _exchange_name, _channel_id, _delivery_tag, _redelivered FROM test.rabbitmq_virtuals;
@ -1741,6 +1803,8 @@ def test_rabbitmq_virtual_columns_with_materialized_view(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'virtuals_mv',
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_format = 'JSONEachRow';
CREATE TABLE test.view (key UInt64, value UInt64,
exchange_name String, channel_id String, delivery_tag UInt64, redelivered UInt8) ENGINE = MergeTree()
@ -1826,6 +1890,8 @@ def test_rabbitmq_many_consumers_to_each_queue(rabbitmq_cluster):
rabbitmq_exchange_name = 'many_consumers',
rabbitmq_num_queues = 2,
rabbitmq_num_consumers = 2,
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_queue_base = 'many_consumers',
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
@ -1915,6 +1981,8 @@ def test_rabbitmq_restore_failed_connection_without_losses_1(rabbitmq_cluster):
CREATE TABLE test.consume (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_flush_interval_ms=500,
rabbitmq_max_block_size = 100,
rabbitmq_exchange_name = 'producer_reconnect',
rabbitmq_format = 'JSONEachRow',
rabbitmq_num_consumers = 2,
@ -1927,6 +1995,7 @@ def test_rabbitmq_restore_failed_connection_without_losses_1(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'producer_reconnect',
rabbitmq_persistent = '1',
rabbitmq_flush_interval_ms=1000,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
"""
@ -1982,7 +2051,9 @@ def test_rabbitmq_restore_failed_connection_without_losses_1(rabbitmq_cluster):
)
@pytest.mark.skip(reason="Timeout: FIXME")
def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster):
logging.getLogger("pika").propagate = False
instance.query(
"""
CREATE TABLE test.consumer_reconnect (key UInt64, value UInt64)
@ -1990,6 +2061,8 @@ def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'consumer_reconnect',
rabbitmq_num_consumers = 10,
rabbitmq_flush_interval_ms = 100,
rabbitmq_max_block_size = 100,
rabbitmq_num_queues = 10,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
@ -2044,10 +2117,11 @@ def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster):
# revive_rabbitmq()
while True:
result = instance.query("SELECT count(DISTINCT key) FROM test.view")
time.sleep(1)
result = instance.query("SELECT count(DISTINCT key) FROM test.view").strip()
if int(result) == messages_num:
break
print(f"Result: {result} / {messages_num}")
time.sleep(1)
instance.query(
"""
@ -2062,6 +2136,7 @@ def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster):
def test_rabbitmq_commit_on_block_write(rabbitmq_cluster):
logging.getLogger("pika").propagate = False
instance.query(
"""
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
@ -2070,6 +2145,7 @@ def test_rabbitmq_commit_on_block_write(rabbitmq_cluster):
rabbitmq_exchange_name = 'block',
rabbitmq_format = 'JSONEachRow',
rabbitmq_queue_base = 'block',
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size = 100,
rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64)
@ -2150,6 +2226,7 @@ def test_rabbitmq_no_connection_at_startup_1(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'cs',
rabbitmq_format = 'JSONEachRow',
rabbitmq_flush_interval_ms=1000,
rabbitmq_num_consumers = '5',
rabbitmq_row_delimiter = '\\n';
"""
@ -2166,6 +2243,8 @@ def test_rabbitmq_no_connection_at_startup_2(rabbitmq_cluster):
rabbitmq_exchange_name = 'cs',
rabbitmq_format = 'JSONEachRow',
rabbitmq_num_consumers = '5',
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size=100,
rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree
@ -2222,6 +2301,7 @@ def test_rabbitmq_format_factory_settings(rabbitmq_cluster):
) ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'format_settings',
rabbitmq_flush_interval_ms=1000,
rabbitmq_format = 'JSONEachRow',
date_time_input_format = 'best_effort';
"""
@ -2284,6 +2364,7 @@ def test_rabbitmq_vhost(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'vhost',
rabbitmq_format = 'JSONEachRow',
rabbitmq_flush_interval_ms=1000,
rabbitmq_vhost = '/'
"""
)
@ -2312,6 +2393,7 @@ def test_rabbitmq_drop_table_properly(rabbitmq_cluster):
CREATE TABLE test.rabbitmq_drop (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_flush_interval_ms=1000,
rabbitmq_exchange_name = 'drop',
rabbitmq_format = 'JSONEachRow',
rabbitmq_queue_base = 'rabbit_queue_drop'
@ -2358,6 +2440,7 @@ def test_rabbitmq_queue_settings(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'rabbit_exchange',
rabbitmq_flush_interval_ms=1000,
rabbitmq_format = 'JSONEachRow',
rabbitmq_queue_base = 'rabbit_queue_settings',
rabbitmq_queue_settings_list = 'x-max-length=10,x-overflow=reject-publish'
@ -2390,12 +2473,11 @@ def test_rabbitmq_queue_settings(rabbitmq_cluster):
time.sleep(5)
result = instance.query(
"SELECT count() FROM test.rabbitmq_settings", ignore_error=True
)
while int(result) != 10:
time.sleep(0.5)
while True:
result = instance.query("SELECT count() FROM test.view", ignore_error=True)
if int(result) == 10:
break
time.sleep(0.5)
instance.query("DROP TABLE test.rabbitmq_settings")
@ -2439,6 +2521,7 @@ def test_rabbitmq_queue_consume(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_format = 'JSONEachRow',
rabbitmq_queue_base = 'rabbit_queue',
rabbitmq_flush_interval_ms=1000,
rabbitmq_queue_consume = 1;
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree ORDER BY key;
@ -2473,6 +2556,7 @@ def test_rabbitmq_produce_consume_avro(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_format = 'Avro',
rabbitmq_flush_interval_ms=1000,
rabbitmq_exchange_name = 'avro',
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'avro';
@ -2481,6 +2565,7 @@ def test_rabbitmq_produce_consume_avro(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_format = 'Avro',
rabbitmq_flush_interval_ms=1000,
rabbitmq_exchange_name = 'avro',
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'avro';
@ -2523,6 +2608,7 @@ def test_rabbitmq_bad_args(rabbitmq_cluster):
CREATE TABLE test.drop (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_flush_interval_ms=1000,
rabbitmq_exchange_name = 'f',
rabbitmq_format = 'JSONEachRow';
"""
@ -2535,6 +2621,7 @@ def test_rabbitmq_issue_30691(rabbitmq_cluster):
CREATE TABLE test.rabbitmq_drop (json String)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_flush_interval_ms=1000,
rabbitmq_exchange_name = '30691',
rabbitmq_row_delimiter = '\\n', -- Works only if adding this setting
rabbitmq_format = 'LineAsString',
@ -2595,6 +2682,7 @@ def test_rabbitmq_drop_mv(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'mv',
rabbitmq_format = 'JSONEachRow',
rabbitmq_flush_interval_ms=1000,
rabbitmq_queue_base = 'drop_mv';
"""
)
@ -2654,10 +2742,12 @@ def test_rabbitmq_drop_mv(rabbitmq_cluster):
result = instance.query("SELECT * FROM test.view ORDER BY key")
if rabbitmq_check_result(result):
break
time.sleep(1)
rabbitmq_check_result(result, True)
instance.query("DROP VIEW test.consumer")
instance.query("DROP VIEW test.consumer NO DELAY")
time.sleep(10)
for i in range(50, 60):
channel.basic_publish(
exchange="mv", routing_key="", body=json.dumps({"key": i, "value": i})
@ -2685,6 +2775,7 @@ def test_rabbitmq_random_detach(rabbitmq_cluster):
rabbitmq_exchange_name = 'random',
rabbitmq_queue_base = 'random',
rabbitmq_num_queues = 2,
rabbitmq_flush_interval_ms=1000,
rabbitmq_num_consumers = 2,
rabbitmq_format = 'JSONEachRow';
CREATE TABLE test.view (key UInt64, value UInt64, channel_id String)
@ -2749,7 +2840,9 @@ def test_rabbitmq_predefined_configuration(rabbitmq_cluster):
instance.query(
"""
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
ENGINE = RabbitMQ(rabbit1, rabbitmq_vhost = '/') """
ENGINE = RabbitMQ(rabbit1, rabbitmq_vhost = '/')
SETTINGS rabbitmq_flush_interval_ms=1000;
"""
)
channel.basic_publish(
@ -2785,6 +2878,7 @@ def test_rabbitmq_msgpack(rabbitmq_cluster):
settings rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'xhep',
rabbitmq_format = 'MsgPack',
rabbitmq_flush_interval_ms=1000,
rabbitmq_num_consumers = 1;
create table
rabbit_out (val String)
@ -2792,6 +2886,7 @@ def test_rabbitmq_msgpack(rabbitmq_cluster):
settings rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'xhep',
rabbitmq_format = 'MsgPack',
rabbitmq_flush_interval_ms=1000,
rabbitmq_num_consumers = 1;
set stream_like_engine_allow_direct_select=1;
insert into rabbit_out select 'kek';
@ -2827,12 +2922,14 @@ def test_rabbitmq_address(rabbitmq_cluster):
SETTINGS rabbitmq_exchange_name = 'rxhep',
rabbitmq_format = 'CSV',
rabbitmq_num_consumers = 1,
rabbitmq_flush_interval_ms=1000,
rabbitmq_address='amqp://root:clickhouse@rabbitmq1:5672/';
create table
rabbit_out (val String) engine=RabbitMQ
SETTINGS rabbitmq_exchange_name = 'rxhep',
rabbitmq_format = 'CSV',
rabbitmq_num_consumers = 1,
rabbitmq_flush_interval_ms=1000,
rabbitmq_address='amqp://root:clickhouse@rabbitmq1:5672/';
set stream_like_engine_allow_direct_select=1;
insert into rabbit_out select 'kek';
@ -2887,11 +2984,14 @@ def test_format_with_prefix_and_suffix(rabbitmq_cluster):
insert_messages = []
def onReceived(channel, method, properties, body):
insert_messages.append(body.decode())
message = body.decode()
insert_messages.append(message)
print(f"Received {len(insert_messages)} message: {message}")
if len(insert_messages) == 2:
channel.stop_consuming()
consumer.basic_consume(onReceived, queue_name)
consumer.start_consuming()
consumer_connection.close()
@ -2917,6 +3017,7 @@ def test_max_rows_per_message(rabbitmq_cluster):
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'custom1',
rabbitmq_max_rows_per_message = 3,
rabbitmq_flush_interval_ms = 1000,
format_custom_result_before_delimiter = '<prefix>\n',
format_custom_result_after_delimiter = '<suffix>\n';
@ -3006,9 +3107,11 @@ def test_row_based_formats(rabbitmq_cluster):
rabbitmq_format = '{format_name}',
rabbitmq_exchange_name = '{format_name}',
rabbitmq_exchange_type = 'direct',
rabbitmq_max_block_size = 100,
rabbitmq_flush_interval_ms = 1000,
rabbitmq_routing_key_list = '{format_name}',
rabbitmq_max_rows_per_message = 5;
CREATE MATERIALIZED VIEW test.view Engine=Log AS
SELECT key, value FROM test.rabbit;
"""
@ -3074,6 +3177,8 @@ def test_block_based_formats_1(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'PrettySpace',
rabbitmq_exchange_type = 'direct',
rabbitmq_max_block_size = 100,
rabbitmq_flush_interval_ms = 1000,
rabbitmq_routing_key_list = 'PrettySpace',
rabbitmq_format = 'PrettySpace';
"""
@ -3150,8 +3255,10 @@ def test_block_based_formats_2(rabbitmq_cluster):
rabbitmq_format = '{format_name}',
rabbitmq_exchange_name = '{format_name}',
rabbitmq_exchange_type = 'direct',
rabbitmq_max_block_size = 100,
rabbitmq_flush_interval_ms = 1000,
rabbitmq_routing_key_list = '{format_name}';
CREATE MATERIALIZED VIEW test.view Engine=Log AS
SELECT key, value FROM test.rabbit;
"""
@ -3206,3 +3313,172 @@ def test_block_based_formats_2(rabbitmq_cluster):
for i in range(num_rows):
expected += str(i * 10) + "\t" + str(i * 100) + "\n"
assert result == expected
def test_rabbitmq_flush_by_block_size(rabbitmq_cluster):
instance.query(
"""
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'flush_by_block',
rabbitmq_queue_base = 'flush_by_block',
rabbitmq_max_block_size = 100,
rabbitmq_flush_interval_ms = 640000, /* should not flush by time during test */
rabbitmq_format = 'JSONEachRow';
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree()
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.rabbitmq;
SYSTEM STOP MERGES;
"""
)
cancel = threading.Event()
def produce():
credentials = pika.PlainCredentials("root", "clickhouse")
parameters = pika.ConnectionParameters(
rabbitmq_cluster.rabbitmq_ip,
rabbitmq_cluster.rabbitmq_port,
"/",
credentials,
)
connection = pika.BlockingConnection(parameters)
while not cancel.is_set():
try:
channel = connection.channel()
channel.basic_publish(
exchange="flush_by_block",
routing_key="",
body=json.dumps({"key": 0, "value": 0}),
)
except e:
print(f"Got error: {str(e)}")
produce_thread = threading.Thread(target=produce)
produce_thread.start()
while 0 == int(
instance.query(
"SELECT count() FROM system.parts WHERE database = 'test' AND table = 'view' AND name = 'all_1_1_0'"
)
):
time.sleep(0.5)
cancel.set()
produce_thread.join()
# more flushes can happens during test, we need to check only result of first flush (part named all_1_1_0).
result = instance.query("SELECT count() FROM test.view WHERE _part='all_1_1_0'")
# logging.debug(result)
instance.query(
"""
DROP TABLE test.consumer;
DROP TABLE test.view;
DROP TABLE test.rabbitmq;
"""
)
# 100 = first poll should return 100 messages (and rows)
# not waiting for stream_flush_interval_ms
assert (
int(result) == 100
), "Messages from rabbitmq should be flushed when block of size rabbitmq_max_block_size is formed!"
def test_rabbitmq_flush_by_time(rabbitmq_cluster):
instance.query(
"""
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'flush_by_time',
rabbitmq_queue_base = 'flush_by_time',
rabbitmq_max_block_size = 100,
rabbitmq_flush_interval_ms = 5000,
rabbitmq_format = 'JSONEachRow';
CREATE TABLE test.view (key UInt64, value UInt64, ts DateTime64(3) MATERIALIZED now64(3))
ENGINE = MergeTree()
ORDER BY key;
"""
)
cancel = threading.Event()
def produce():
credentials = pika.PlainCredentials("root", "clickhouse")
parameters = pika.ConnectionParameters(
rabbitmq_cluster.rabbitmq_ip,
rabbitmq_cluster.rabbitmq_port,
"/",
credentials,
)
connection = pika.BlockingConnection(parameters)
while not cancel.is_set():
try:
channel = connection.channel()
channel.basic_publish(
exchange="flush_by_time",
routing_key="",
body=json.dumps({"key": 0, "value": 0}),
)
print("Produced a message")
time.sleep(0.8)
except e:
print(f"Got error: {str(e)}")
produce_thread = threading.Thread(target=produce)
produce_thread.start()
instance.query(
"""
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.rabbitmq;
"""
)
while True:
time.sleep(0.2)
count = instance.query(
"SELECT count() FROM system.parts WHERE database = 'test' AND table = 'view'"
)
print(f"kssenii total count: {count}")
count = int(
instance.query(
"SELECT count() FROM system.parts WHERE database = 'test' AND table = 'view' AND name = 'all_1_1_0'"
)
)
print(f"kssenii count: {count}")
if count > 0:
break
time.sleep(12)
result = instance.query("SELECT uniqExact(ts) FROM test.view")
cancel.set()
produce_thread.join()
instance.query(
"""
DROP TABLE test.consumer;
DROP TABLE test.view;
DROP TABLE test.rabbitmq;
"""
)
assert int(result) == 3