mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 00:22:29 +00:00
69d23f5e67
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
3777 lines
122 KiB
Python
3777 lines
122 KiB
Python
import pytest
|
|
|
|
import json
|
|
import os.path as p
|
|
import random
|
|
import subprocess
|
|
import threading
|
|
import logging
|
|
import time
|
|
from random import randrange
|
|
import math
|
|
|
|
import pika
|
|
from google.protobuf.internal.encoder import _VarintBytes
|
|
from helpers.client import QueryRuntimeException
|
|
from helpers.cluster import ClickHouseCluster, check_rabbitmq_is_available
|
|
from helpers.test_tools import TSV
|
|
|
|
from . import rabbitmq_pb2
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
instance = cluster.add_instance(
|
|
"instance",
|
|
main_configs=[
|
|
"configs/rabbitmq.xml",
|
|
"configs/macros.xml",
|
|
"configs/named_collection.xml",
|
|
],
|
|
user_configs=["configs/users.xml"],
|
|
with_rabbitmq=True,
|
|
stay_alive=True,
|
|
)
|
|
|
|
instance2 = cluster.add_instance(
|
|
"instance2",
|
|
user_configs=["configs/users.xml"],
|
|
with_rabbitmq=True,
|
|
)
|
|
|
|
instance3 = cluster.add_instance(
|
|
"instance3",
|
|
user_configs=["configs/users.xml"],
|
|
main_configs=[
|
|
"configs/rabbitmq.xml",
|
|
"configs/macros.xml",
|
|
"configs/named_collection.xml",
|
|
"configs/mergetree.xml",
|
|
],
|
|
with_rabbitmq=True,
|
|
stay_alive=True,
|
|
)
|
|
|
|
# Helpers
|
|
|
|
|
|
def rabbitmq_check_result(result, check=False, reference=None):
|
|
if reference is None:
|
|
reference = "\n".join([f"{i}\t{i}" for i in range(50)])
|
|
if check:
|
|
assert TSV(result) == TSV(reference)
|
|
else:
|
|
return TSV(result) == TSV(reference)
|
|
|
|
|
|
def wait_rabbitmq_to_start(rabbitmq_docker_id, cookie, timeout=180):
|
|
logging.getLogger("pika").propagate = False
|
|
start = time.time()
|
|
while time.time() - start < timeout:
|
|
try:
|
|
if check_rabbitmq_is_available(rabbitmq_docker_id, cookie):
|
|
logging.debug("RabbitMQ is available")
|
|
return
|
|
time.sleep(0.5)
|
|
except Exception as ex:
|
|
logging.debug("Can't connect to RabbitMQ " + str(ex))
|
|
time.sleep(0.5)
|
|
|
|
|
|
def kill_rabbitmq(rabbitmq_id):
|
|
p = subprocess.Popen(("docker", "stop", rabbitmq_id), stdout=subprocess.PIPE)
|
|
p.communicate()
|
|
return p.returncode == 0
|
|
|
|
|
|
def revive_rabbitmq(rabbitmq_id, cookie):
|
|
p = subprocess.Popen(("docker", "start", rabbitmq_id), stdout=subprocess.PIPE)
|
|
p.communicate()
|
|
wait_rabbitmq_to_start(rabbitmq_id, cookie)
|
|
|
|
|
|
# Fixtures
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def rabbitmq_cluster():
|
|
try:
|
|
cluster.start()
|
|
logging.debug("rabbitmq_id is {}".format(instance.cluster.rabbitmq_docker_id))
|
|
instance.query("CREATE DATABASE test")
|
|
instance3.query("CREATE DATABASE test")
|
|
|
|
yield cluster
|
|
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def rabbitmq_setup_teardown():
|
|
logging.debug("RabbitMQ is available - running test")
|
|
yield # run test
|
|
instance.query("DROP DATABASE test SYNC")
|
|
instance.query("CREATE DATABASE test")
|
|
|
|
|
|
# Tests
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"secure",
|
|
[
|
|
pytest.param(0),
|
|
pytest.param(1),
|
|
],
|
|
)
|
|
def test_rabbitmq_select(rabbitmq_cluster, secure):
|
|
if secure and instance.is_built_with_thread_sanitizer():
|
|
pytest.skip(
|
|
"Data races: see https://github.com/ClickHouse/ClickHouse/issues/56866"
|
|
)
|
|
|
|
port = cluster.rabbitmq_port
|
|
if secure:
|
|
port = cluster.rabbitmq_secure_port
|
|
|
|
# MATERIALIZED and ALIAS columns are not supported in RabbitMQ engine, but we can test that it does not fail
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq (key UInt64, value UInt64, value2 ALIAS value + 1, value3 MATERIALIZED value + 1)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = '{}:{}',
|
|
rabbitmq_exchange_name = 'select',
|
|
rabbitmq_commit_on_select = 1,
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_row_delimiter = '\\n',
|
|
rabbitmq_secure = {};
|
|
""".format(
|
|
rabbitmq_cluster.rabbitmq_host, port, secure
|
|
)
|
|
)
|
|
|
|
assert (
|
|
"RabbitMQ table engine doesn\\'t support ALIAS, DEFAULT or MATERIALIZED columns"
|
|
in instance.query("SELECT * FROM system.warnings")
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
messages = []
|
|
for i in range(50):
|
|
messages.append(json.dumps({"key": i, "value": i}))
|
|
|
|
for message in messages:
|
|
channel.basic_publish(exchange="select", routing_key="", body=message)
|
|
|
|
connection.close()
|
|
# The order of messages in select * from test.rabbitmq is not guaranteed, so sleep to collect everything in one select
|
|
time.sleep(1)
|
|
|
|
result = ""
|
|
while True:
|
|
result += instance.query(
|
|
"SELECT * FROM test.rabbitmq ORDER BY key", ignore_error=True
|
|
)
|
|
if rabbitmq_check_result(result):
|
|
break
|
|
|
|
rabbitmq_check_result(result, True)
|
|
|
|
|
|
def test_rabbitmq_select_empty(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = '{}:5672',
|
|
rabbitmq_exchange_name = 'empty',
|
|
rabbitmq_commit_on_select = 1,
|
|
rabbitmq_format = 'TSV',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_row_delimiter = '\\n';
|
|
""".format(
|
|
rabbitmq_cluster.rabbitmq_host
|
|
)
|
|
)
|
|
|
|
assert int(instance.query("SELECT count() FROM test.rabbitmq")) == 0
|
|
|
|
|
|
def test_rabbitmq_json_without_delimiter(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = '{}:5672',
|
|
rabbitmq_commit_on_select = 1,
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_exchange_name = 'json',
|
|
rabbitmq_format = 'JSONEachRow'
|
|
""".format(
|
|
rabbitmq_cluster.rabbitmq_host
|
|
)
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
messages = ""
|
|
for i in range(25):
|
|
messages += json.dumps({"key": i, "value": i}) + "\n"
|
|
|
|
all_messages = [messages]
|
|
for message in all_messages:
|
|
channel.basic_publish(exchange="json", routing_key="", body=message)
|
|
|
|
messages = ""
|
|
for i in range(25, 50):
|
|
messages += json.dumps({"key": i, "value": i}) + "\n"
|
|
all_messages = [messages]
|
|
for message in all_messages:
|
|
channel.basic_publish(exchange="json", routing_key="", body=message)
|
|
|
|
connection.close()
|
|
time.sleep(1)
|
|
|
|
result = ""
|
|
while True:
|
|
result += instance.query(
|
|
"SELECT * FROM test.rabbitmq ORDER BY key", ignore_error=True
|
|
)
|
|
if rabbitmq_check_result(result):
|
|
break
|
|
|
|
rabbitmq_check_result(result, True)
|
|
|
|
|
|
def test_rabbitmq_csv_with_delimiter(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'csv',
|
|
rabbitmq_commit_on_select = 1,
|
|
rabbitmq_format = 'CSV',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_row_delimiter = '\\n';
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
messages = []
|
|
for i in range(50):
|
|
messages.append("{i}, {i}".format(i=i))
|
|
|
|
for message in messages:
|
|
channel.basic_publish(exchange="csv", routing_key="", body=message)
|
|
|
|
connection.close()
|
|
time.sleep(1)
|
|
|
|
result = ""
|
|
while True:
|
|
result += instance.query(
|
|
"SELECT * FROM test.rabbitmq ORDER BY key", ignore_error=True
|
|
)
|
|
if rabbitmq_check_result(result):
|
|
break
|
|
|
|
rabbitmq_check_result(result, True)
|
|
|
|
|
|
def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'tsv',
|
|
rabbitmq_format = 'TSV',
|
|
rabbitmq_commit_on_select = 1,
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_queue_base = 'tsv',
|
|
rabbitmq_row_delimiter = '\\n';
|
|
CREATE TABLE test.view (key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.rabbitmq;
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
messages = []
|
|
for i in range(50):
|
|
messages.append("{i}\t{i}".format(i=i))
|
|
|
|
for message in messages:
|
|
channel.basic_publish(exchange="tsv", routing_key="", body=message)
|
|
connection.close()
|
|
|
|
result = ""
|
|
while True:
|
|
result = instance.query("SELECT * FROM test.view ORDER BY key")
|
|
if rabbitmq_check_result(result):
|
|
break
|
|
|
|
rabbitmq_check_result(result, True)
|
|
|
|
|
|
def test_rabbitmq_macros(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = '{rabbitmq_host}:{rabbitmq_port}',
|
|
rabbitmq_commit_on_select = 1,
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_exchange_name = '{rabbitmq_exchange_name}',
|
|
rabbitmq_format = '{rabbitmq_format}'
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
message = ""
|
|
for i in range(50):
|
|
message += json.dumps({"key": i, "value": i}) + "\n"
|
|
channel.basic_publish(exchange="macro", routing_key="", body=message)
|
|
|
|
connection.close()
|
|
time.sleep(1)
|
|
|
|
result = ""
|
|
while True:
|
|
result += instance.query(
|
|
"SELECT * FROM test.rabbitmq ORDER BY key", ignore_error=True
|
|
)
|
|
if rabbitmq_check_result(result):
|
|
break
|
|
|
|
rabbitmq_check_result(result, True)
|
|
|
|
|
|
def test_rabbitmq_materialized_view(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq (key UInt64, value UInt64, dt1 DateTime MATERIALIZED now(), value2 ALIAS value + 1)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'mv',
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_row_delimiter = '\\n';
|
|
CREATE TABLE test.view (key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.rabbitmq;
|
|
|
|
CREATE TABLE test.view2 (key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer2 TO test.view2 AS
|
|
SELECT * FROM test.rabbitmq group by (key, value);
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
instance.wait_for_log_line("Started streaming to 2 attached views")
|
|
|
|
messages = []
|
|
for i in range(50):
|
|
message = json.dumps({"key": i, "value": i})
|
|
channel.basic_publish(exchange="mv", routing_key="", body=message)
|
|
|
|
time_limit_sec = 60
|
|
deadline = time.monotonic() + time_limit_sec
|
|
|
|
while time.monotonic() < deadline:
|
|
result = instance.query("SELECT * FROM test.view ORDER BY key")
|
|
if rabbitmq_check_result(result):
|
|
break
|
|
|
|
rabbitmq_check_result(result, True)
|
|
|
|
deadline = time.monotonic() + time_limit_sec
|
|
|
|
while time.monotonic() < deadline:
|
|
result = instance.query("SELECT * FROM test.view2 ORDER BY key")
|
|
logging.debug(f"Result: {result}")
|
|
if rabbitmq_check_result(result):
|
|
break
|
|
time.sleep(1)
|
|
|
|
rabbitmq_check_result(result, True)
|
|
connection.close()
|
|
|
|
|
|
def test_rabbitmq_materialized_view_with_subquery(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'mvsq',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_row_delimiter = '\\n';
|
|
CREATE TABLE test.view (key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM (SELECT * FROM test.rabbitmq);
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
messages = []
|
|
for i in range(50):
|
|
messages.append(json.dumps({"key": i, "value": i}))
|
|
for message in messages:
|
|
channel.basic_publish(exchange="mvsq", routing_key="", body=message)
|
|
|
|
while True:
|
|
result = instance.query("SELECT * FROM test.view ORDER BY key")
|
|
if rabbitmq_check_result(result):
|
|
break
|
|
|
|
connection.close()
|
|
rabbitmq_check_result(result, True)
|
|
|
|
|
|
def test_rabbitmq_many_materialized_views(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.view1;
|
|
DROP TABLE IF EXISTS test.view2;
|
|
DROP TABLE IF EXISTS test.view3;
|
|
DROP TABLE IF EXISTS test.consumer1;
|
|
DROP TABLE IF EXISTS test.consumer2;
|
|
DROP TABLE IF EXISTS test.consumer3;
|
|
CREATE TABLE test.rabbitmq (key UInt64, value UInt64, value2 ALIAS value + 1, value3 MATERIALIZED value + 1, value4 DEFAULT 1)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'mmv',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_row_delimiter = '\\n';
|
|
CREATE TABLE test.view1 (key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
CREATE TABLE test.view2 (key UInt64, value UInt64, value2 UInt64, value3 UInt64, value4 UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
CREATE TABLE test.view3 (key UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer1 TO test.view1 AS
|
|
SELECT * FROM test.rabbitmq;
|
|
CREATE MATERIALIZED VIEW test.consumer2 TO test.view2 AS
|
|
SELECT * FROM test.rabbitmq;
|
|
CREATE MATERIALIZED VIEW test.consumer3 TO test.view3 AS
|
|
SELECT * FROM test.rabbitmq;
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
instance.wait_for_log_line("Started streaming to 3 attached views")
|
|
|
|
messages = []
|
|
for i in range(50):
|
|
messages.append(json.dumps({"key": i, "value": i}))
|
|
for message in messages:
|
|
channel.basic_publish(exchange="mmv", routing_key="", body=message)
|
|
|
|
is_check_passed = False
|
|
deadline = time.monotonic() + 60
|
|
while time.monotonic() < deadline:
|
|
result1 = instance.query("SELECT * FROM test.view1 ORDER BY key")
|
|
result2 = instance.query("SELECT * FROM test.view2 ORDER BY key")
|
|
result3 = instance.query("SELECT * FROM test.view3 ORDER BY key")
|
|
# Note that for view2 result is `i i 0 0 0`, but not `i i i+1 i+1 1` as expected, ALIAS/MATERIALIZED/DEFAULT columns are not supported in RabbitMQ engine
|
|
# We onlt check that at least it do not fail
|
|
if (
|
|
rabbitmq_check_result(result1)
|
|
and rabbitmq_check_result(
|
|
result2, reference="\n".join([f"{i}\t{i}\t0\t0\t0" for i in range(50)])
|
|
)
|
|
and rabbitmq_check_result(
|
|
result3, reference="\n".join([str(i) for i in range(50)])
|
|
)
|
|
):
|
|
is_check_passed = True
|
|
break
|
|
time.sleep(0.1)
|
|
|
|
assert (
|
|
is_check_passed
|
|
), f"References are not equal to results, result1: {result1}, result2: {result2}, result3: {result3}"
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.consumer1;
|
|
DROP TABLE test.consumer2;
|
|
DROP TABLE test.consumer3;
|
|
DROP TABLE test.view1;
|
|
DROP TABLE test.view2;
|
|
DROP TABLE test.view3;
|
|
"""
|
|
)
|
|
|
|
connection.close()
|
|
|
|
|
|
def test_rabbitmq_big_message(rabbitmq_cluster):
|
|
# Create batchs of messages of size ~100Kb
|
|
rabbitmq_messages = 1000
|
|
batch_messages = 1000
|
|
messages = [
|
|
json.dumps({"key": i, "value": "x" * 100}) * batch_messages
|
|
for i in range(rabbitmq_messages)
|
|
]
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq (key UInt64, value String)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'big',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_format = 'JSONEachRow';
|
|
CREATE TABLE test.view (key UInt64, value String)
|
|
ENGINE = MergeTree
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.rabbitmq;
|
|
"""
|
|
)
|
|
|
|
for message in messages:
|
|
channel.basic_publish(exchange="big", routing_key="", body=message)
|
|
|
|
while True:
|
|
result = instance.query("SELECT count() FROM test.view")
|
|
if int(result) == batch_messages * rabbitmq_messages:
|
|
break
|
|
|
|
connection.close()
|
|
|
|
assert (
|
|
int(result) == rabbitmq_messages * batch_messages
|
|
), "ClickHouse lost some messages: {}".format(result)
|
|
|
|
|
|
def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
|
|
NUM_CONSUMERS = 10
|
|
NUM_QUEUES = 10
|
|
logging.getLogger("pika").propagate = False
|
|
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'test_sharding',
|
|
rabbitmq_num_queues = 5,
|
|
rabbitmq_num_consumers = 10,
|
|
rabbitmq_max_block_size = 100,
|
|
rabbitmq_flush_interval_ms=500,
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_row_delimiter = '\\n';
|
|
CREATE TABLE test.view (key UInt64, value UInt64, channel_id String)
|
|
ENGINE = MergeTree
|
|
ORDER BY key
|
|
SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3,
|
|
cleanup_thread_preferred_points_per_iteration=0;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT *, _channel_id AS channel_id FROM test.rabbitmq;
|
|
"""
|
|
)
|
|
|
|
i = [0]
|
|
messages_num = 10000
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
|
|
def produce():
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
messages = []
|
|
for _ in range(messages_num):
|
|
messages.append(json.dumps({"key": i[0], "value": i[0]}))
|
|
i[0] += 1
|
|
current = 0
|
|
for message in messages:
|
|
current += 1
|
|
mes_id = str(current)
|
|
channel.basic_publish(
|
|
exchange="test_sharding",
|
|
routing_key="",
|
|
properties=pika.BasicProperties(message_id=mes_id),
|
|
body=message,
|
|
)
|
|
connection.close()
|
|
|
|
threads = []
|
|
threads_num = 10
|
|
|
|
for _ in range(threads_num):
|
|
threads.append(threading.Thread(target=produce))
|
|
for thread in threads:
|
|
time.sleep(random.uniform(0, 1))
|
|
thread.start()
|
|
|
|
result1 = ""
|
|
while True:
|
|
result1 = instance.query("SELECT count() FROM test.view")
|
|
time.sleep(1)
|
|
expected = messages_num * threads_num
|
|
if int(result1) == expected:
|
|
break
|
|
logging.debug(f"Result {result1} / {expected}")
|
|
|
|
result2 = instance.query("SELECT count(DISTINCT channel_id) FROM test.view")
|
|
|
|
for thread in threads:
|
|
thread.join()
|
|
|
|
assert (
|
|
int(result1) == messages_num * threads_num
|
|
), "ClickHouse lost some messages: {}".format(result1)
|
|
assert int(result2) == 10
|
|
|
|
|
|
def test_rabbitmq_mv_combo(rabbitmq_cluster):
|
|
NUM_MV = 5
|
|
NUM_CONSUMERS = 4
|
|
logging.getLogger("pika").propagate = False
|
|
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'combo',
|
|
rabbitmq_queue_base = 'combo',
|
|
rabbitmq_max_block_size = 100,
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_num_consumers = 2,
|
|
rabbitmq_num_queues = 5,
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_row_delimiter = '\\n';
|
|
"""
|
|
)
|
|
|
|
for mv_id in range(NUM_MV):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.combo_{0};
|
|
DROP TABLE IF EXISTS test.combo_{0}_mv;
|
|
CREATE TABLE test.combo_{0} (key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.combo_{0}_mv TO test.combo_{0} AS
|
|
SELECT * FROM test.rabbitmq;
|
|
""".format(
|
|
mv_id
|
|
)
|
|
)
|
|
|
|
time.sleep(2)
|
|
|
|
i = [0]
|
|
messages_num = 10000
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
|
|
def produce():
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
messages = []
|
|
for _ in range(messages_num):
|
|
messages.append(json.dumps({"key": i[0], "value": i[0]}))
|
|
i[0] += 1
|
|
for msg_id in range(messages_num):
|
|
channel.basic_publish(
|
|
exchange="combo",
|
|
routing_key="",
|
|
properties=pika.BasicProperties(message_id=str(msg_id)),
|
|
body=messages[msg_id],
|
|
)
|
|
connection.close()
|
|
|
|
threads = []
|
|
threads_num = 20
|
|
|
|
for _ in range(threads_num):
|
|
threads.append(threading.Thread(target=produce))
|
|
for thread in threads:
|
|
time.sleep(random.uniform(0, 1))
|
|
thread.start()
|
|
|
|
while True:
|
|
result = 0
|
|
for mv_id in range(NUM_MV):
|
|
result += int(
|
|
instance.query("SELECT count() FROM test.combo_{0}".format(mv_id))
|
|
)
|
|
expected = messages_num * threads_num * NUM_MV
|
|
if int(result) == expected:
|
|
break
|
|
logging.debug(f"Result: {result} / {expected}")
|
|
time.sleep(1)
|
|
|
|
for thread in threads:
|
|
thread.join()
|
|
|
|
for mv_id in range(NUM_MV):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.combo_{0}_mv;
|
|
DROP TABLE test.combo_{0};
|
|
""".format(
|
|
mv_id
|
|
)
|
|
)
|
|
|
|
assert (
|
|
int(result) == messages_num * threads_num * NUM_MV
|
|
), "ClickHouse lost some messages: {}".format(result)
|
|
|
|
|
|
def test_rabbitmq_insert(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'insert',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_exchange_type = 'direct',
|
|
rabbitmq_routing_key_list = 'insert1',
|
|
rabbitmq_format = 'TSV',
|
|
rabbitmq_row_delimiter = '\\n';
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
consumer_connection = pika.BlockingConnection(parameters)
|
|
|
|
consumer = consumer_connection.channel()
|
|
result = consumer.queue_declare(queue="")
|
|
queue_name = result.method.queue
|
|
consumer.queue_bind(exchange="insert", queue=queue_name, routing_key="insert1")
|
|
|
|
values = []
|
|
for i in range(50):
|
|
values.append("({i}, {i})".format(i=i))
|
|
values = ",".join(values)
|
|
|
|
while True:
|
|
try:
|
|
instance.query("INSERT INTO test.rabbitmq VALUES {}".format(values))
|
|
break
|
|
except QueryRuntimeException as e:
|
|
if "Local: Timed out." in str(e):
|
|
continue
|
|
else:
|
|
raise
|
|
|
|
insert_messages = []
|
|
|
|
def onReceived(channel, method, properties, body):
|
|
i = 0
|
|
insert_messages.append(body.decode())
|
|
if len(insert_messages) == 50:
|
|
channel.stop_consuming()
|
|
|
|
consumer.basic_consume(queue_name, onReceived)
|
|
consumer.start_consuming()
|
|
consumer_connection.close()
|
|
|
|
result = "\n".join(insert_messages)
|
|
rabbitmq_check_result(result, True)
|
|
|
|
|
|
def test_rabbitmq_insert_headers_exchange(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'insert_headers',
|
|
rabbitmq_exchange_type = 'headers',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_routing_key_list = 'test=insert,topic=headers',
|
|
rabbitmq_format = 'TSV',
|
|
rabbitmq_row_delimiter = '\\n';
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
consumer_connection = pika.BlockingConnection(parameters)
|
|
|
|
consumer = consumer_connection.channel()
|
|
result = consumer.queue_declare(queue="")
|
|
queue_name = result.method.queue
|
|
consumer.queue_bind(
|
|
exchange="insert_headers",
|
|
queue=queue_name,
|
|
routing_key="",
|
|
arguments={"x-match": "all", "test": "insert", "topic": "headers"},
|
|
)
|
|
|
|
values = []
|
|
for i in range(50):
|
|
values.append("({i}, {i})".format(i=i))
|
|
values = ",".join(values)
|
|
|
|
while True:
|
|
try:
|
|
instance.query("INSERT INTO test.rabbitmq VALUES {}".format(values))
|
|
break
|
|
except QueryRuntimeException as e:
|
|
if "Local: Timed out." in str(e):
|
|
continue
|
|
else:
|
|
raise
|
|
|
|
insert_messages = []
|
|
|
|
def onReceived(channel, method, properties, body):
|
|
i = 0
|
|
insert_messages.append(body.decode())
|
|
if len(insert_messages) == 50:
|
|
channel.stop_consuming()
|
|
|
|
consumer.basic_consume(queue_name, onReceived)
|
|
consumer.start_consuming()
|
|
consumer_connection.close()
|
|
|
|
result = "\n".join(insert_messages)
|
|
rabbitmq_check_result(result, True)
|
|
|
|
|
|
def test_rabbitmq_many_inserts(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.rabbitmq_many;
|
|
DROP TABLE IF EXISTS test.rabbitmq_consume;
|
|
DROP TABLE IF EXISTS test.view_many;
|
|
DROP TABLE IF EXISTS test.consumer_many;
|
|
CREATE TABLE test.rabbitmq_many (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'many_inserts',
|
|
rabbitmq_exchange_type = 'direct',
|
|
rabbitmq_routing_key_list = 'insert2',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_format = 'TSV',
|
|
rabbitmq_row_delimiter = '\\n';
|
|
CREATE TABLE test.rabbitmq_consume (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'many_inserts',
|
|
rabbitmq_exchange_type = 'direct',
|
|
rabbitmq_routing_key_list = 'insert2',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_format = 'TSV',
|
|
rabbitmq_row_delimiter = '\\n';
|
|
"""
|
|
)
|
|
|
|
messages_num = 10000
|
|
values = []
|
|
for i in range(messages_num):
|
|
values.append("({i}, {i})".format(i=i))
|
|
values = ",".join(values)
|
|
|
|
def insert():
|
|
while True:
|
|
try:
|
|
instance.query(
|
|
"INSERT INTO test.rabbitmq_many VALUES {}".format(values)
|
|
)
|
|
break
|
|
except QueryRuntimeException as e:
|
|
if "Local: Timed out." in str(e):
|
|
continue
|
|
else:
|
|
raise
|
|
|
|
threads = []
|
|
threads_num = 10
|
|
for _ in range(threads_num):
|
|
threads.append(threading.Thread(target=insert))
|
|
for thread in threads:
|
|
time.sleep(random.uniform(0, 1))
|
|
thread.start()
|
|
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.view_many (key UInt64, value UInt64)
|
|
ENGINE = MergeTree
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer_many TO test.view_many AS
|
|
SELECT * FROM test.rabbitmq_consume;
|
|
"""
|
|
)
|
|
|
|
for thread in threads:
|
|
thread.join()
|
|
|
|
while True:
|
|
result = instance.query("SELECT count() FROM test.view_many")
|
|
logging.debug(result, messages_num * threads_num)
|
|
if int(result) == messages_num * threads_num:
|
|
break
|
|
time.sleep(1)
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.rabbitmq_consume;
|
|
DROP TABLE test.rabbitmq_many;
|
|
DROP TABLE test.consumer_many;
|
|
DROP TABLE test.view_many;
|
|
"""
|
|
)
|
|
|
|
assert (
|
|
int(result) == messages_num * threads_num
|
|
), "ClickHouse lost some messages: {}".format(result)
|
|
|
|
|
|
def test_rabbitmq_overloaded_insert(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.view_overload;
|
|
DROP TABLE IF EXISTS test.consumer_overload;
|
|
DROP TABLE IF EXISTS test.rabbitmq_consume;
|
|
CREATE TABLE test.rabbitmq_consume (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'over',
|
|
rabbitmq_queue_base = 'over',
|
|
rabbitmq_exchange_type = 'direct',
|
|
rabbitmq_num_consumers = 2,
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size = 100,
|
|
rabbitmq_routing_key_list = 'over',
|
|
rabbitmq_format = 'TSV',
|
|
rabbitmq_row_delimiter = '\\n';
|
|
CREATE TABLE test.rabbitmq_overload (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'over',
|
|
rabbitmq_exchange_type = 'direct',
|
|
rabbitmq_routing_key_list = 'over',
|
|
rabbitmq_format = 'TSV',
|
|
rabbitmq_row_delimiter = '\\n';
|
|
CREATE TABLE test.view_overload (key UInt64, value UInt64)
|
|
ENGINE = MergeTree
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer_overload TO test.view_overload AS
|
|
SELECT * FROM test.rabbitmq_consume;
|
|
"""
|
|
)
|
|
|
|
instance.wait_for_log_line("Started streaming to 1 attached views")
|
|
|
|
messages_num = 100000
|
|
|
|
def insert():
|
|
values = []
|
|
for i in range(messages_num):
|
|
values.append("({i}, {i})".format(i=i))
|
|
values = ",".join(values)
|
|
|
|
while True:
|
|
try:
|
|
instance.query(
|
|
"INSERT INTO test.rabbitmq_overload VALUES {}".format(values)
|
|
)
|
|
break
|
|
except QueryRuntimeException as e:
|
|
if "Local: Timed out." in str(e):
|
|
continue
|
|
else:
|
|
raise
|
|
|
|
threads = []
|
|
threads_num = 2
|
|
for _ in range(threads_num):
|
|
threads.append(threading.Thread(target=insert))
|
|
for thread in threads:
|
|
time.sleep(random.uniform(0, 1))
|
|
thread.start()
|
|
|
|
for thread in threads:
|
|
thread.join()
|
|
|
|
while True:
|
|
result = instance.query("SELECT count() FROM test.view_overload")
|
|
expected = messages_num * threads_num
|
|
if int(result) == expected:
|
|
break
|
|
logging.debug(f"Result: {result} / {expected}")
|
|
time.sleep(1)
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.consumer_overload SYNC;
|
|
DROP TABLE test.view_overload SYNC;
|
|
DROP TABLE test.rabbitmq_consume SYNC;
|
|
DROP TABLE test.rabbitmq_overload SYNC;
|
|
"""
|
|
)
|
|
|
|
assert (
|
|
int(result) == messages_num * threads_num
|
|
), "ClickHouse lost some messages: {}".format(result)
|
|
|
|
|
|
def test_rabbitmq_direct_exchange(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.destination;
|
|
CREATE TABLE test.destination(key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key
|
|
SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3,
|
|
cleanup_thread_preferred_points_per_iteration=0;
|
|
"""
|
|
)
|
|
|
|
num_tables = 5
|
|
for consumer_id in range(num_tables):
|
|
logging.debug(("Setting up table {}".format(consumer_id)))
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.direct_exchange_{0};
|
|
DROP TABLE IF EXISTS test.direct_exchange_{0}_mv;
|
|
CREATE TABLE test.direct_exchange_{0} (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_num_consumers = 2,
|
|
rabbitmq_num_queues = 2,
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_exchange_name = 'direct_exchange_testing',
|
|
rabbitmq_exchange_type = 'direct',
|
|
rabbitmq_routing_key_list = 'direct_{0}',
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_row_delimiter = '\\n';
|
|
CREATE MATERIALIZED VIEW test.direct_exchange_{0}_mv TO test.destination AS
|
|
SELECT key, value FROM test.direct_exchange_{0};
|
|
""".format(
|
|
consumer_id
|
|
)
|
|
)
|
|
|
|
i = [0]
|
|
messages_num = 1000
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
messages = []
|
|
for _ in range(messages_num):
|
|
messages.append(json.dumps({"key": i[0], "value": i[0]}))
|
|
i[0] += 1
|
|
|
|
key_num = 0
|
|
for num in range(num_tables):
|
|
key = "direct_" + str(key_num)
|
|
key_num += 1
|
|
for message in messages:
|
|
mes_id = str(randrange(10))
|
|
channel.basic_publish(
|
|
exchange="direct_exchange_testing",
|
|
routing_key=key,
|
|
properties=pika.BasicProperties(message_id=mes_id),
|
|
body=message,
|
|
)
|
|
|
|
connection.close()
|
|
|
|
while True:
|
|
result = instance.query("SELECT count() FROM test.destination")
|
|
time.sleep(1)
|
|
if int(result) == messages_num * num_tables:
|
|
break
|
|
|
|
for consumer_id in range(num_tables):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.direct_exchange_{0}_mv;
|
|
DROP TABLE test.direct_exchange_{0};
|
|
""".format(
|
|
consumer_id
|
|
)
|
|
)
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.destination;
|
|
"""
|
|
)
|
|
|
|
assert (
|
|
int(result) == messages_num * num_tables
|
|
), "ClickHouse lost some messages: {}".format(result)
|
|
|
|
|
|
def test_rabbitmq_fanout_exchange(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.destination;
|
|
CREATE TABLE test.destination(key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
"""
|
|
)
|
|
|
|
num_tables = 5
|
|
for consumer_id in range(num_tables):
|
|
logging.debug(("Setting up table {}".format(consumer_id)))
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.fanout_exchange_{0};
|
|
DROP TABLE IF EXISTS test.fanout_exchange_{0}_mv;
|
|
CREATE TABLE test.fanout_exchange_{0} (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_num_consumers = 2,
|
|
rabbitmq_num_queues = 2,
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_routing_key_list = 'key_{0}',
|
|
rabbitmq_exchange_name = 'fanout_exchange_testing',
|
|
rabbitmq_exchange_type = 'fanout',
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_row_delimiter = '\\n';
|
|
CREATE MATERIALIZED VIEW test.fanout_exchange_{0}_mv TO test.destination AS
|
|
SELECT key, value FROM test.fanout_exchange_{0};
|
|
""".format(
|
|
consumer_id
|
|
)
|
|
)
|
|
|
|
i = [0]
|
|
messages_num = 1000
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
messages = []
|
|
for _ in range(messages_num):
|
|
messages.append(json.dumps({"key": i[0], "value": i[0]}))
|
|
i[0] += 1
|
|
|
|
for msg_id in range(messages_num):
|
|
channel.basic_publish(
|
|
exchange="fanout_exchange_testing",
|
|
routing_key="",
|
|
properties=pika.BasicProperties(message_id=str(msg_id)),
|
|
body=messages[msg_id],
|
|
)
|
|
|
|
connection.close()
|
|
|
|
while True:
|
|
result = instance.query("SELECT count() FROM test.destination")
|
|
time.sleep(1)
|
|
if int(result) == messages_num * num_tables:
|
|
break
|
|
|
|
for consumer_id in range(num_tables):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.fanout_exchange_{0}_mv;
|
|
DROP TABLE test.fanout_exchange_{0};
|
|
""".format(
|
|
consumer_id
|
|
)
|
|
)
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.destination;
|
|
"""
|
|
)
|
|
|
|
assert (
|
|
int(result) == messages_num * num_tables
|
|
), "ClickHouse lost some messages: {}".format(result)
|
|
|
|
|
|
def test_rabbitmq_topic_exchange(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.destination;
|
|
CREATE TABLE test.destination(key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
"""
|
|
)
|
|
|
|
num_tables = 5
|
|
for consumer_id in range(num_tables):
|
|
logging.debug(("Setting up table {}".format(consumer_id)))
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.topic_exchange_{0};
|
|
DROP TABLE IF EXISTS test.topic_exchange_{0}_mv;
|
|
CREATE TABLE test.topic_exchange_{0} (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_num_consumers = 2,
|
|
rabbitmq_num_queues = 2,
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_exchange_name = 'topic_exchange_testing',
|
|
rabbitmq_exchange_type = 'topic',
|
|
rabbitmq_routing_key_list = '*.{0}',
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_row_delimiter = '\\n';
|
|
CREATE MATERIALIZED VIEW test.topic_exchange_{0}_mv TO test.destination AS
|
|
SELECT key, value FROM test.topic_exchange_{0};
|
|
""".format(
|
|
consumer_id
|
|
)
|
|
)
|
|
|
|
for consumer_id in range(num_tables):
|
|
logging.debug(("Setting up table {}".format(num_tables + consumer_id)))
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.topic_exchange_{0};
|
|
DROP TABLE IF EXISTS test.topic_exchange_{0}_mv;
|
|
CREATE TABLE test.topic_exchange_{0} (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_num_consumers = 2,
|
|
rabbitmq_num_queues = 2,
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_exchange_name = 'topic_exchange_testing',
|
|
rabbitmq_exchange_type = 'topic',
|
|
rabbitmq_routing_key_list = '*.logs',
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_row_delimiter = '\\n';
|
|
CREATE MATERIALIZED VIEW test.topic_exchange_{0}_mv TO test.destination AS
|
|
SELECT key, value FROM test.topic_exchange_{0};
|
|
""".format(
|
|
num_tables + consumer_id
|
|
)
|
|
)
|
|
|
|
i = [0]
|
|
messages_num = 1000
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
messages = []
|
|
for _ in range(messages_num):
|
|
messages.append(json.dumps({"key": i[0], "value": i[0]}))
|
|
i[0] += 1
|
|
|
|
key_num = 0
|
|
for num in range(num_tables):
|
|
key = "topic." + str(key_num)
|
|
key_num += 1
|
|
for message in messages:
|
|
channel.basic_publish(
|
|
exchange="topic_exchange_testing", routing_key=key, body=message
|
|
)
|
|
|
|
key = "random.logs"
|
|
current = 0
|
|
for msg_id in range(messages_num):
|
|
channel.basic_publish(
|
|
exchange="topic_exchange_testing",
|
|
routing_key=key,
|
|
properties=pika.BasicProperties(message_id=str(msg_id)),
|
|
body=messages[msg_id],
|
|
)
|
|
|
|
connection.close()
|
|
|
|
while True:
|
|
result = instance.query("SELECT count() FROM test.destination")
|
|
time.sleep(1)
|
|
if int(result) == messages_num * num_tables + messages_num * num_tables:
|
|
break
|
|
|
|
for consumer_id in range(num_tables * 2):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.topic_exchange_{0}_mv;
|
|
DROP TABLE test.topic_exchange_{0};
|
|
""".format(
|
|
consumer_id
|
|
)
|
|
)
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.destination;
|
|
"""
|
|
)
|
|
|
|
assert (
|
|
int(result) == messages_num * num_tables + messages_num * num_tables
|
|
), "ClickHouse lost some messages: {}".format(result)
|
|
|
|
|
|
def test_rabbitmq_hash_exchange(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.destination;
|
|
CREATE TABLE test.destination(key UInt64, value UInt64, channel_id String)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
"""
|
|
)
|
|
|
|
num_tables = 4
|
|
for consumer_id in range(num_tables):
|
|
table_name = "rabbitmq_consumer{}".format(consumer_id)
|
|
logging.debug(("Setting up {}".format(table_name)))
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.{0};
|
|
DROP TABLE IF EXISTS test.{0}_mv;
|
|
CREATE TABLE test.{0} (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_num_consumers = 4,
|
|
rabbitmq_num_queues = 2,
|
|
rabbitmq_exchange_type = 'consistent_hash',
|
|
rabbitmq_exchange_name = 'hash_exchange_testing',
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_row_delimiter = '\\n';
|
|
CREATE MATERIALIZED VIEW test.{0}_mv TO test.destination AS
|
|
SELECT key, value, _channel_id AS channel_id FROM test.{0};
|
|
""".format(
|
|
table_name
|
|
)
|
|
)
|
|
|
|
i = [0]
|
|
messages_num = 500
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
|
|
def produce():
|
|
# init connection here because otherwise python rabbitmq client might fail
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
messages = []
|
|
for _ in range(messages_num):
|
|
messages.append(json.dumps({"key": i[0], "value": i[0]}))
|
|
i[0] += 1
|
|
for msg_id in range(messages_num):
|
|
channel.basic_publish(
|
|
exchange="hash_exchange_testing",
|
|
routing_key=str(msg_id),
|
|
properties=pika.BasicProperties(message_id=str(msg_id)),
|
|
body=messages[msg_id],
|
|
)
|
|
connection.close()
|
|
|
|
threads = []
|
|
threads_num = 10
|
|
|
|
for _ in range(threads_num):
|
|
threads.append(threading.Thread(target=produce))
|
|
for thread in threads:
|
|
time.sleep(random.uniform(0, 1))
|
|
thread.start()
|
|
|
|
result1 = ""
|
|
while True:
|
|
result1 = instance.query("SELECT count() FROM test.destination")
|
|
time.sleep(1)
|
|
if int(result1) == messages_num * threads_num:
|
|
break
|
|
|
|
result2 = instance.query("SELECT count(DISTINCT channel_id) FROM test.destination")
|
|
|
|
for consumer_id in range(num_tables):
|
|
table_name = "rabbitmq_consumer{}".format(consumer_id)
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.{0}_mv;
|
|
DROP TABLE test.{0};
|
|
""".format(
|
|
table_name
|
|
)
|
|
)
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.destination;
|
|
"""
|
|
)
|
|
|
|
for thread in threads:
|
|
thread.join()
|
|
|
|
assert (
|
|
int(result1) == messages_num * threads_num
|
|
), "ClickHouse lost some messages: {}".format(result1)
|
|
assert int(result2) == 4 * num_tables
|
|
|
|
|
|
def test_rabbitmq_multiple_bindings(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.destination;
|
|
CREATE TABLE test.destination(key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
"""
|
|
)
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.bindings;
|
|
DROP TABLE IF EXISTS test.bindings_mv;
|
|
CREATE TABLE test.bindings (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'multiple_bindings_testing',
|
|
rabbitmq_exchange_type = 'direct',
|
|
rabbitmq_routing_key_list = 'key1,key2,key3,key4,key5',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_row_delimiter = '\\n';
|
|
CREATE MATERIALIZED VIEW test.bindings_mv TO test.destination AS
|
|
SELECT * FROM test.bindings;
|
|
"""
|
|
)
|
|
|
|
i = [0]
|
|
messages_num = 500
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
|
|
def produce():
|
|
# init connection here because otherwise python rabbitmq client might fail
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
messages = []
|
|
for _ in range(messages_num):
|
|
messages.append(json.dumps({"key": i[0], "value": i[0]}))
|
|
i[0] += 1
|
|
|
|
keys = ["key1", "key2", "key3", "key4", "key5"]
|
|
|
|
for key in keys:
|
|
for message in messages:
|
|
channel.basic_publish(
|
|
exchange="multiple_bindings_testing", routing_key=key, body=message
|
|
)
|
|
|
|
connection.close()
|
|
|
|
threads = []
|
|
threads_num = 10
|
|
|
|
for _ in range(threads_num):
|
|
threads.append(threading.Thread(target=produce))
|
|
for thread in threads:
|
|
time.sleep(random.uniform(0, 1))
|
|
thread.start()
|
|
|
|
while True:
|
|
result = instance.query("SELECT count() FROM test.destination")
|
|
time.sleep(1)
|
|
if int(result) == messages_num * threads_num * 5:
|
|
break
|
|
|
|
for thread in threads:
|
|
thread.join()
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.bindings;
|
|
DROP TABLE test.bindings_mv;
|
|
DROP TABLE test.destination;
|
|
"""
|
|
)
|
|
|
|
assert (
|
|
int(result) == messages_num * threads_num * 5
|
|
), "ClickHouse lost some messages: {}".format(result)
|
|
|
|
|
|
def test_rabbitmq_headers_exchange(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.destination;
|
|
CREATE TABLE test.destination(key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
"""
|
|
)
|
|
|
|
num_tables_to_receive = 2
|
|
for consumer_id in range(num_tables_to_receive):
|
|
logging.debug(("Setting up table {}".format(consumer_id)))
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.headers_exchange_{0};
|
|
DROP TABLE IF EXISTS test.headers_exchange_{0}_mv;
|
|
CREATE TABLE test.headers_exchange_{0} (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_num_consumers = 2,
|
|
rabbitmq_exchange_name = 'headers_exchange_testing',
|
|
rabbitmq_exchange_type = 'headers',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_routing_key_list = 'x-match=all,format=logs,type=report,year=2020',
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_row_delimiter = '\\n';
|
|
CREATE MATERIALIZED VIEW test.headers_exchange_{0}_mv TO test.destination AS
|
|
SELECT key, value FROM test.headers_exchange_{0};
|
|
""".format(
|
|
consumer_id
|
|
)
|
|
)
|
|
|
|
num_tables_to_ignore = 2
|
|
for consumer_id in range(num_tables_to_ignore):
|
|
logging.debug(
|
|
("Setting up table {}".format(consumer_id + num_tables_to_receive))
|
|
)
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.headers_exchange_{0};
|
|
DROP TABLE IF EXISTS test.headers_exchange_{0}_mv;
|
|
CREATE TABLE test.headers_exchange_{0} (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'headers_exchange_testing',
|
|
rabbitmq_exchange_type = 'headers',
|
|
rabbitmq_routing_key_list = 'x-match=all,format=logs,type=report,year=2019',
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_row_delimiter = '\\n';
|
|
CREATE MATERIALIZED VIEW test.headers_exchange_{0}_mv TO test.destination AS
|
|
SELECT key, value FROM test.headers_exchange_{0};
|
|
""".format(
|
|
consumer_id + num_tables_to_receive
|
|
)
|
|
)
|
|
|
|
i = [0]
|
|
messages_num = 1000
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
messages = []
|
|
for _ in range(messages_num):
|
|
messages.append(json.dumps({"key": i[0], "value": i[0]}))
|
|
i[0] += 1
|
|
|
|
fields = {}
|
|
fields["format"] = "logs"
|
|
fields["type"] = "report"
|
|
fields["year"] = "2020"
|
|
|
|
for msg_id in range(messages_num):
|
|
channel.basic_publish(
|
|
exchange="headers_exchange_testing",
|
|
routing_key="",
|
|
properties=pika.BasicProperties(headers=fields, message_id=str(msg_id)),
|
|
body=messages[msg_id],
|
|
)
|
|
|
|
connection.close()
|
|
|
|
while True:
|
|
result = instance.query("SELECT count() FROM test.destination")
|
|
time.sleep(1)
|
|
if int(result) == messages_num * num_tables_to_receive:
|
|
break
|
|
|
|
for consumer_id in range(num_tables_to_receive + num_tables_to_ignore):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.headers_exchange_{0}_mv;
|
|
DROP TABLE test.headers_exchange_{0};
|
|
""".format(
|
|
consumer_id
|
|
)
|
|
)
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.destination;
|
|
"""
|
|
)
|
|
|
|
assert (
|
|
int(result) == messages_num * num_tables_to_receive
|
|
), "ClickHouse lost some messages: {}".format(result)
|
|
|
|
|
|
def test_rabbitmq_virtual_columns(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq_virtuals (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'virtuals',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_format = 'JSONEachRow';
|
|
CREATE MATERIALIZED VIEW test.view Engine=Log AS
|
|
SELECT value, key, _exchange_name, _channel_id, _delivery_tag, _redelivered FROM test.rabbitmq_virtuals;
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
message_num = 10
|
|
i = 0
|
|
messages = []
|
|
for _ in range(message_num):
|
|
messages.append(json.dumps({"key": i, "value": i}))
|
|
i += 1
|
|
|
|
for message in messages:
|
|
channel.basic_publish(exchange="virtuals", routing_key="", body=message)
|
|
|
|
while True:
|
|
result = instance.query("SELECT count() FROM test.view")
|
|
time.sleep(1)
|
|
if int(result) == message_num:
|
|
break
|
|
|
|
connection.close()
|
|
|
|
result = instance.query(
|
|
"""
|
|
SELECT key, value, _exchange_name, SUBSTRING(_channel_id, 1, 3), _delivery_tag, _redelivered
|
|
FROM test.view ORDER BY key
|
|
"""
|
|
)
|
|
|
|
expected = """\
|
|
0 0 virtuals 1_0 1 0
|
|
1 1 virtuals 1_0 2 0
|
|
2 2 virtuals 1_0 3 0
|
|
3 3 virtuals 1_0 4 0
|
|
4 4 virtuals 1_0 5 0
|
|
5 5 virtuals 1_0 6 0
|
|
6 6 virtuals 1_0 7 0
|
|
7 7 virtuals 1_0 8 0
|
|
8 8 virtuals 1_0 9 0
|
|
9 9 virtuals 1_0 10 0
|
|
"""
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.rabbitmq_virtuals;
|
|
DROP TABLE test.view;
|
|
"""
|
|
)
|
|
|
|
assert TSV(result) == TSV(expected)
|
|
|
|
|
|
def test_rabbitmq_virtual_columns_with_materialized_view(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq_virtuals_mv (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'virtuals_mv',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_format = 'JSONEachRow';
|
|
CREATE TABLE test.view (key UInt64, value UInt64,
|
|
exchange_name String, channel_id String, delivery_tag UInt64, redelivered UInt8) ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT *, _exchange_name as exchange_name, _channel_id as channel_id, _delivery_tag as delivery_tag, _redelivered as redelivered
|
|
FROM test.rabbitmq_virtuals_mv;
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
message_num = 10
|
|
i = 0
|
|
messages = []
|
|
for _ in range(message_num):
|
|
messages.append(json.dumps({"key": i, "value": i}))
|
|
i += 1
|
|
|
|
for message in messages:
|
|
channel.basic_publish(exchange="virtuals_mv", routing_key="", body=message)
|
|
|
|
while True:
|
|
result = instance.query("SELECT count() FROM test.view")
|
|
time.sleep(1)
|
|
if int(result) == message_num:
|
|
break
|
|
|
|
connection.close()
|
|
|
|
result = instance.query(
|
|
"SELECT key, value, exchange_name, SUBSTRING(channel_id, 1, 3), delivery_tag, redelivered FROM test.view ORDER BY delivery_tag"
|
|
)
|
|
expected = """\
|
|
0 0 virtuals_mv 1_0 1 0
|
|
1 1 virtuals_mv 1_0 2 0
|
|
2 2 virtuals_mv 1_0 3 0
|
|
3 3 virtuals_mv 1_0 4 0
|
|
4 4 virtuals_mv 1_0 5 0
|
|
5 5 virtuals_mv 1_0 6 0
|
|
6 6 virtuals_mv 1_0 7 0
|
|
7 7 virtuals_mv 1_0 8 0
|
|
8 8 virtuals_mv 1_0 9 0
|
|
9 9 virtuals_mv 1_0 10 0
|
|
"""
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.consumer;
|
|
DROP TABLE test.view;
|
|
DROP TABLE test.rabbitmq_virtuals_mv
|
|
"""
|
|
)
|
|
|
|
assert TSV(result) == TSV(expected)
|
|
|
|
|
|
def test_rabbitmq_many_consumers_to_each_queue(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.destination;
|
|
CREATE TABLE test.destination(key UInt64, value UInt64, channel_id String)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
"""
|
|
)
|
|
|
|
num_tables = 4
|
|
for table_id in range(num_tables):
|
|
logging.debug(("Setting up table {}".format(table_id)))
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.many_consumers_{0};
|
|
DROP TABLE IF EXISTS test.many_consumers_{0}_mv;
|
|
CREATE TABLE test.many_consumers_{0} (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'many_consumers',
|
|
rabbitmq_num_queues = 2,
|
|
rabbitmq_num_consumers = 2,
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_queue_base = 'many_consumers',
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_row_delimiter = '\\n';
|
|
CREATE MATERIALIZED VIEW test.many_consumers_{0}_mv TO test.destination AS
|
|
SELECT key, value, _channel_id as channel_id FROM test.many_consumers_{0};
|
|
""".format(
|
|
table_id
|
|
)
|
|
)
|
|
|
|
i = [0]
|
|
messages_num = 1000
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
|
|
def produce():
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
messages = []
|
|
for _ in range(messages_num):
|
|
messages.append(json.dumps({"key": i[0], "value": i[0]}))
|
|
i[0] += 1
|
|
for msg_id in range(messages_num):
|
|
channel.basic_publish(
|
|
exchange="many_consumers",
|
|
routing_key="",
|
|
properties=pika.BasicProperties(message_id=str(msg_id)),
|
|
body=messages[msg_id],
|
|
)
|
|
connection.close()
|
|
|
|
threads = []
|
|
threads_num = 20
|
|
|
|
for _ in range(threads_num):
|
|
threads.append(threading.Thread(target=produce))
|
|
for thread in threads:
|
|
time.sleep(random.uniform(0, 1))
|
|
thread.start()
|
|
|
|
result1 = ""
|
|
while True:
|
|
result1 = instance.query("SELECT count() FROM test.destination")
|
|
time.sleep(1)
|
|
if int(result1) == messages_num * threads_num:
|
|
break
|
|
|
|
result2 = instance.query("SELECT count(DISTINCT channel_id) FROM test.destination")
|
|
|
|
for thread in threads:
|
|
thread.join()
|
|
|
|
for consumer_id in range(num_tables):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.many_consumers_{0};
|
|
DROP TABLE test.many_consumers_{0}_mv;
|
|
""".format(
|
|
consumer_id
|
|
)
|
|
)
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.destination;
|
|
"""
|
|
)
|
|
|
|
assert (
|
|
int(result1) == messages_num * threads_num
|
|
), "ClickHouse lost some messages: {}".format(result1)
|
|
# 4 tables, 2 consumers for each table => 8 consumer tags
|
|
assert int(result2) == 8
|
|
|
|
|
|
def test_rabbitmq_restore_failed_connection_without_losses_1(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.consume;
|
|
CREATE TABLE test.view (key UInt64, value UInt64)
|
|
ENGINE = MergeTree
|
|
ORDER BY key;
|
|
CREATE TABLE test.consume (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_flush_interval_ms=500,
|
|
rabbitmq_max_block_size = 100,
|
|
rabbitmq_exchange_name = 'producer_reconnect',
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_num_consumers = 2,
|
|
rabbitmq_row_delimiter = '\\n';
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.consume;
|
|
DROP TABLE IF EXISTS test.producer_reconnect;
|
|
CREATE TABLE test.producer_reconnect (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'producer_reconnect',
|
|
rabbitmq_persistent = '1',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_row_delimiter = '\\n';
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
messages_num = 100000
|
|
values = []
|
|
for i in range(messages_num):
|
|
values.append("({i}, {i})".format(i=i))
|
|
values = ",".join(values)
|
|
|
|
while True:
|
|
try:
|
|
instance.query(
|
|
"INSERT INTO test.producer_reconnect VALUES {}".format(values)
|
|
)
|
|
break
|
|
except QueryRuntimeException as e:
|
|
if "Local: Timed out." in str(e):
|
|
continue
|
|
else:
|
|
raise
|
|
|
|
while int(instance.query("SELECT count() FROM test.view")) == 0:
|
|
time.sleep(0.1)
|
|
|
|
kill_rabbitmq(rabbitmq_cluster.rabbitmq_docker_id)
|
|
time.sleep(4)
|
|
revive_rabbitmq(
|
|
rabbitmq_cluster.rabbitmq_docker_id, rabbitmq_cluster.rabbitmq_cookie
|
|
)
|
|
|
|
while True:
|
|
result = instance.query("SELECT count(DISTINCT key) FROM test.view")
|
|
time.sleep(1)
|
|
if int(result) == messages_num:
|
|
break
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.consume;
|
|
DROP TABLE test.producer_reconnect;
|
|
"""
|
|
)
|
|
|
|
assert int(result) == messages_num, "ClickHouse lost some messages: {}".format(
|
|
result
|
|
)
|
|
|
|
|
|
def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster):
|
|
logging.getLogger("pika").propagate = False
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.consumer_reconnect (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'consumer_reconnect',
|
|
rabbitmq_num_consumers = 10,
|
|
rabbitmq_flush_interval_ms = 100,
|
|
rabbitmq_max_block_size = 100,
|
|
rabbitmq_num_queues = 10,
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_row_delimiter = '\\n';
|
|
"""
|
|
)
|
|
|
|
i = 0
|
|
messages_num = 150000
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
messages = []
|
|
for _ in range(messages_num):
|
|
messages.append(json.dumps({"key": i, "value": i}))
|
|
i += 1
|
|
for msg_id in range(messages_num):
|
|
channel.basic_publish(
|
|
exchange="consumer_reconnect",
|
|
routing_key="",
|
|
body=messages[msg_id],
|
|
properties=pika.BasicProperties(delivery_mode=2, message_id=str(msg_id)),
|
|
)
|
|
connection.close()
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.view (key UInt64, value UInt64)
|
|
ENGINE = MergeTree
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.consumer_reconnect;
|
|
"""
|
|
)
|
|
|
|
while int(instance.query("SELECT count() FROM test.view")) == 0:
|
|
logging.debug(3)
|
|
time.sleep(0.1)
|
|
|
|
kill_rabbitmq(rabbitmq_cluster.rabbitmq_docker_id)
|
|
time.sleep(8)
|
|
revive_rabbitmq(
|
|
rabbitmq_cluster.rabbitmq_docker_id, rabbitmq_cluster.rabbitmq_cookie
|
|
)
|
|
|
|
# while int(instance.query('SELECT count() FROM test.view')) == 0:
|
|
# time.sleep(0.1)
|
|
|
|
# kill_rabbitmq()
|
|
# time.sleep(2)
|
|
# revive_rabbitmq()
|
|
|
|
while True:
|
|
result = instance.query("SELECT count(DISTINCT key) FROM test.view").strip()
|
|
if int(result) == messages_num:
|
|
break
|
|
logging.debug(f"Result: {result} / {messages_num}")
|
|
time.sleep(1)
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.consumer;
|
|
DROP TABLE test.consumer_reconnect;
|
|
"""
|
|
)
|
|
|
|
assert int(result) == messages_num, "ClickHouse lost some messages: {}".format(
|
|
result
|
|
)
|
|
|
|
|
|
def test_rabbitmq_commit_on_block_write(rabbitmq_cluster):
|
|
logging.getLogger("pika").propagate = False
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'block',
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_queue_base = 'block',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size = 100,
|
|
rabbitmq_row_delimiter = '\\n';
|
|
CREATE TABLE test.view (key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.rabbitmq;
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
cancel = threading.Event()
|
|
|
|
i = [0]
|
|
|
|
def produce():
|
|
while not cancel.is_set():
|
|
messages = []
|
|
for _ in range(101):
|
|
messages.append(json.dumps({"key": i[0], "value": i[0]}))
|
|
i[0] += 1
|
|
for message in messages:
|
|
channel.basic_publish(exchange="block", routing_key="", body=message)
|
|
|
|
rabbitmq_thread = threading.Thread(target=produce)
|
|
rabbitmq_thread.start()
|
|
|
|
while int(instance.query("SELECT count() FROM test.view")) == 0:
|
|
time.sleep(1)
|
|
|
|
cancel.set()
|
|
|
|
instance.query("DETACH TABLE test.rabbitmq;")
|
|
|
|
while (
|
|
int(
|
|
instance.query(
|
|
"SELECT count() FROM system.tables WHERE database='test' AND name='rabbitmq'"
|
|
)
|
|
)
|
|
== 1
|
|
):
|
|
time.sleep(1)
|
|
|
|
instance.query("ATTACH TABLE test.rabbitmq;")
|
|
|
|
while int(instance.query("SELECT uniqExact(key) FROM test.view")) < i[0]:
|
|
time.sleep(1)
|
|
|
|
result = int(instance.query("SELECT count() == uniqExact(key) FROM test.view"))
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.consumer;
|
|
DROP TABLE test.view;
|
|
"""
|
|
)
|
|
|
|
rabbitmq_thread.join()
|
|
connection.close()
|
|
|
|
assert result == 1, "Messages from RabbitMQ get duplicated!"
|
|
|
|
|
|
def test_rabbitmq_no_connection_at_startup_1(rabbitmq_cluster):
|
|
# no connection when table is initialized
|
|
rabbitmq_cluster.pause_container("rabbitmq1")
|
|
instance.query_and_get_error(
|
|
"""
|
|
CREATE TABLE test.cs (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'cs',
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_num_consumers = '5',
|
|
rabbitmq_row_delimiter = '\\n';
|
|
"""
|
|
)
|
|
rabbitmq_cluster.unpause_container("rabbitmq1")
|
|
|
|
|
|
def test_rabbitmq_no_connection_at_startup_2(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.cs (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'cs',
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_num_consumers = '5',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_max_block_size=100,
|
|
rabbitmq_row_delimiter = '\\n';
|
|
CREATE TABLE test.view (key UInt64, value UInt64)
|
|
ENGINE = MergeTree
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.cs;
|
|
"""
|
|
)
|
|
instance.query("DETACH TABLE test.cs")
|
|
rabbitmq_cluster.pause_container("rabbitmq1")
|
|
instance.query("ATTACH TABLE test.cs")
|
|
rabbitmq_cluster.unpause_container("rabbitmq1")
|
|
|
|
messages_num = 1000
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
for i in range(messages_num):
|
|
message = json.dumps({"key": i, "value": i})
|
|
channel.basic_publish(
|
|
exchange="cs",
|
|
routing_key="",
|
|
body=message,
|
|
properties=pika.BasicProperties(delivery_mode=2, message_id=str(i)),
|
|
)
|
|
connection.close()
|
|
|
|
while True:
|
|
result = instance.query("SELECT count() FROM test.view")
|
|
time.sleep(1)
|
|
if int(result) == messages_num:
|
|
break
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.consumer;
|
|
DROP TABLE test.cs;
|
|
"""
|
|
)
|
|
|
|
assert int(result) == messages_num, "ClickHouse lost some messages: {}".format(
|
|
result
|
|
)
|
|
|
|
|
|
def test_rabbitmq_format_factory_settings(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.format_settings (
|
|
id String, date DateTime
|
|
) ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'format_settings',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_format = 'JSONEachRow',
|
|
date_time_input_format = 'best_effort';
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
message = json.dumps(
|
|
{"id": "format_settings_test", "date": "2021-01-19T14:42:33.1829214Z"}
|
|
)
|
|
expected = instance.query(
|
|
"""SELECT parseDateTimeBestEffort(CAST('2021-01-19T14:42:33.1829214Z', 'String'))"""
|
|
)
|
|
|
|
channel.basic_publish(exchange="format_settings", routing_key="", body=message)
|
|
result = ""
|
|
while True:
|
|
result = instance.query("SELECT date FROM test.format_settings")
|
|
if result == expected:
|
|
break
|
|
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.view (
|
|
id String, date DateTime
|
|
) ENGINE = MergeTree ORDER BY id;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.format_settings;
|
|
"""
|
|
)
|
|
|
|
channel.basic_publish(exchange="format_settings", routing_key="", body=message)
|
|
result = ""
|
|
while True:
|
|
result = instance.query("SELECT date FROM test.view")
|
|
if result == expected:
|
|
break
|
|
|
|
connection.close()
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.consumer;
|
|
DROP TABLE test.format_settings;
|
|
"""
|
|
)
|
|
|
|
assert result == expected
|
|
|
|
|
|
def test_rabbitmq_vhost(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq_vhost (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'vhost',
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_vhost = '/'
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
channel.basic_publish(
|
|
exchange="vhost", routing_key="", body=json.dumps({"key": 1, "value": 2})
|
|
)
|
|
connection.close()
|
|
while True:
|
|
result = instance.query(
|
|
"SELECT * FROM test.rabbitmq_vhost ORDER BY key", ignore_error=True
|
|
)
|
|
if result == "1\t2\n":
|
|
break
|
|
|
|
|
|
def test_rabbitmq_drop_table_properly(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq_drop (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_exchange_name = 'drop',
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_queue_base = 'rabbit_queue_drop'
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
channel.basic_publish(
|
|
exchange="drop", routing_key="", body=json.dumps({"key": 1, "value": 2})
|
|
)
|
|
while True:
|
|
result = instance.query(
|
|
"SELECT * FROM test.rabbitmq_drop ORDER BY key", ignore_error=True
|
|
)
|
|
if result == "1\t2\n":
|
|
break
|
|
|
|
exists = channel.queue_declare(queue="rabbit_queue_drop", passive=True)
|
|
assert exists
|
|
|
|
instance.query("DROP TABLE test.rabbitmq_drop")
|
|
time.sleep(30)
|
|
|
|
try:
|
|
exists = channel.queue_declare(queue="rabbit_queue_drop", passive=True)
|
|
except Exception as e:
|
|
exists = False
|
|
|
|
assert not exists
|
|
|
|
|
|
def test_rabbitmq_queue_settings(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq_settings (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'rabbit_exchange',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_queue_base = 'rabbit_queue_settings',
|
|
rabbitmq_queue_settings_list = 'x-max-length=10,x-overflow=reject-publish'
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
for i in range(50):
|
|
channel.basic_publish(
|
|
exchange="rabbit_exchange",
|
|
routing_key="",
|
|
body=json.dumps({"key": 1, "value": 2}),
|
|
)
|
|
connection.close()
|
|
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.view (key UInt64, value UInt64)
|
|
ENGINE = MergeTree ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.rabbitmq_settings;
|
|
"""
|
|
)
|
|
|
|
time.sleep(5)
|
|
|
|
while True:
|
|
result = instance.query("SELECT count() FROM test.view", ignore_error=True)
|
|
if int(result) == 10:
|
|
break
|
|
time.sleep(0.5)
|
|
|
|
instance.query("DROP TABLE test.rabbitmq_settings")
|
|
|
|
# queue size is 10, but 50 messages were sent, they will be dropped (setting x-overflow = reject-publish) and only 10 will remain.
|
|
assert int(result) == 10
|
|
|
|
|
|
def test_rabbitmq_queue_consume(rabbitmq_cluster):
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
channel.queue_declare(queue="rabbit_queue", durable=True)
|
|
|
|
i = [0]
|
|
messages_num = 1000
|
|
|
|
def produce():
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
messages = []
|
|
for _ in range(messages_num):
|
|
message = json.dumps({"key": i[0], "value": i[0]})
|
|
channel.basic_publish(exchange="", routing_key="rabbit_queue", body=message)
|
|
i[0] += 1
|
|
|
|
threads = []
|
|
threads_num = 10
|
|
for _ in range(threads_num):
|
|
threads.append(threading.Thread(target=produce))
|
|
for thread in threads:
|
|
time.sleep(random.uniform(0, 1))
|
|
thread.start()
|
|
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq_queue (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_queue_base = 'rabbit_queue',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_queue_consume = 1;
|
|
CREATE TABLE test.view (key UInt64, value UInt64)
|
|
ENGINE = MergeTree ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.rabbitmq_queue;
|
|
"""
|
|
)
|
|
|
|
result = ""
|
|
while True:
|
|
result = instance.query("SELECT count() FROM test.view")
|
|
if int(result) == messages_num * threads_num:
|
|
break
|
|
time.sleep(1)
|
|
|
|
for thread in threads:
|
|
thread.join()
|
|
|
|
instance.query("DROP TABLE test.rabbitmq_queue")
|
|
|
|
|
|
def test_rabbitmq_produce_consume_avro(rabbitmq_cluster):
|
|
num_rows = 75
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.view;
|
|
DROP TABLE IF EXISTS test.rabbit;
|
|
DROP TABLE IF EXISTS test.rabbit_writer;
|
|
|
|
CREATE TABLE test.rabbit_writer (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_format = 'Avro',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_exchange_name = 'avro',
|
|
rabbitmq_exchange_type = 'direct',
|
|
rabbitmq_routing_key_list = 'avro';
|
|
|
|
CREATE TABLE test.rabbit (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_format = 'Avro',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_exchange_name = 'avro',
|
|
rabbitmq_exchange_type = 'direct',
|
|
rabbitmq_routing_key_list = 'avro';
|
|
|
|
CREATE MATERIALIZED VIEW test.view Engine=Log AS
|
|
SELECT key, value FROM test.rabbit;
|
|
"""
|
|
)
|
|
|
|
instance.query(
|
|
"INSERT INTO test.rabbit_writer select number*10 as key, number*100 as value from numbers({num_rows}) SETTINGS output_format_avro_rows_in_file = 7".format(
|
|
num_rows=num_rows
|
|
)
|
|
)
|
|
|
|
# Ideally we should wait for an event
|
|
time.sleep(3)
|
|
|
|
expected_num_rows = instance.query(
|
|
"SELECT COUNT(1) FROM test.view", ignore_error=True
|
|
)
|
|
assert int(expected_num_rows) == num_rows
|
|
|
|
expected_max_key = instance.query(
|
|
"SELECT max(key) FROM test.view", ignore_error=True
|
|
)
|
|
assert int(expected_max_key) == (num_rows - 1) * 10
|
|
|
|
|
|
def test_rabbitmq_bad_args(rabbitmq_cluster):
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
channel.exchange_declare(exchange="f", exchange_type="fanout")
|
|
assert "Unable to declare exchange" in instance.query_and_get_error(
|
|
"""
|
|
CREATE TABLE test.drop (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_exchange_name = 'f',
|
|
rabbitmq_format = 'JSONEachRow';
|
|
"""
|
|
)
|
|
|
|
|
|
def test_rabbitmq_issue_30691(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq_drop (json String)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_exchange_name = '30691',
|
|
rabbitmq_row_delimiter = '\\n', -- Works only if adding this setting
|
|
rabbitmq_format = 'LineAsString',
|
|
rabbitmq_queue_base = '30691';
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
channel.basic_publish(
|
|
exchange="30691",
|
|
routing_key="",
|
|
body=json.dumps(
|
|
{
|
|
"event_type": "purge",
|
|
"as_src": 1234,
|
|
"as_dst": 0,
|
|
"as_path": "",
|
|
"local_pref": 100,
|
|
"med": 0,
|
|
"peer_as_dst": 0,
|
|
"ip_src": "<redacted ipv6>",
|
|
"ip_dst": "<redacted ipv6>",
|
|
"port_src": 443,
|
|
"port_dst": 41930,
|
|
"ip_proto": "tcp",
|
|
"tos": 0,
|
|
"stamp_inserted": "2021-10-26 15:20:00",
|
|
"stamp_updated": "2021-10-26 15:23:14",
|
|
"packets": 2,
|
|
"bytes": 1216,
|
|
"writer_id": "default_amqp/449206",
|
|
}
|
|
),
|
|
)
|
|
result = ""
|
|
while True:
|
|
result = instance.query("SELECT * FROM test.rabbitmq_drop", ignore_error=True)
|
|
logging.debug(result)
|
|
if result != "":
|
|
break
|
|
assert (
|
|
result.strip()
|
|
== """{"event_type": "purge", "as_src": 1234, "as_dst": 0, "as_path": "", "local_pref": 100, "med": 0, "peer_as_dst": 0, "ip_src": "<redacted ipv6>", "ip_dst": "<redacted ipv6>", "port_src": 443, "port_dst": 41930, "ip_proto": "tcp", "tos": 0, "stamp_inserted": "2021-10-26 15:20:00", "stamp_updated": "2021-10-26 15:23:14", "packets": 2, "bytes": 1216, "writer_id": "default_amqp/449206"}"""
|
|
)
|
|
|
|
|
|
def test_rabbitmq_drop_mv(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.drop_mv (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'mv',
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_queue_base = 'drop_mv';
|
|
"""
|
|
)
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.view (key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
"""
|
|
)
|
|
instance.query(
|
|
"""
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.drop_mv;
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
messages = []
|
|
for i in range(20):
|
|
channel.basic_publish(
|
|
exchange="mv", routing_key="", body=json.dumps({"key": i, "value": i})
|
|
)
|
|
|
|
while True:
|
|
res = instance.query("SELECT COUNT(*) FROM test.view")
|
|
logging.debug(f"Current count (1): {res}")
|
|
if int(res) == 20:
|
|
break
|
|
else:
|
|
logging.debug(f"Number of rows in test.view: {res}")
|
|
|
|
instance.query("DROP VIEW test.consumer SYNC")
|
|
for i in range(20, 40):
|
|
channel.basic_publish(
|
|
exchange="mv", routing_key="", body=json.dumps({"key": i, "value": i})
|
|
)
|
|
|
|
instance.query(
|
|
"""
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.drop_mv;
|
|
"""
|
|
)
|
|
for i in range(40, 50):
|
|
channel.basic_publish(
|
|
exchange="mv", routing_key="", body=json.dumps({"key": i, "value": i})
|
|
)
|
|
|
|
while True:
|
|
result = instance.query("SELECT count() FROM test.view")
|
|
logging.debug(f"Current count (2): {result}")
|
|
if int(result) == 50:
|
|
break
|
|
time.sleep(1)
|
|
|
|
result = instance.query("SELECT * FROM test.view ORDER BY key")
|
|
rabbitmq_check_result(result, True)
|
|
|
|
instance.query("DROP VIEW test.consumer SYNC")
|
|
time.sleep(10)
|
|
for i in range(50, 60):
|
|
channel.basic_publish(
|
|
exchange="mv", routing_key="", body=json.dumps({"key": i, "value": i})
|
|
)
|
|
connection.close()
|
|
|
|
count = 0
|
|
start = time.time()
|
|
while time.time() - start < 30:
|
|
count = int(instance.query("SELECT count() FROM test.drop_mv"))
|
|
if count:
|
|
break
|
|
|
|
instance.query("DROP TABLE test.drop_mv")
|
|
assert count > 0
|
|
|
|
|
|
def test_rabbitmq_random_detach(rabbitmq_cluster):
|
|
NUM_CONSUMERS = 2
|
|
NUM_QUEUES = 2
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'random',
|
|
rabbitmq_queue_base = 'random',
|
|
rabbitmq_num_queues = 2,
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_num_consumers = 2,
|
|
rabbitmq_format = 'JSONEachRow';
|
|
CREATE TABLE test.view (key UInt64, value UInt64, channel_id String)
|
|
ENGINE = MergeTree
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT *, _channel_id AS channel_id FROM test.rabbitmq;
|
|
"""
|
|
)
|
|
|
|
i = [0]
|
|
messages_num = 10000
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
|
|
def produce():
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
messages = []
|
|
for j in range(messages_num):
|
|
messages.append(json.dumps({"key": i[0], "value": i[0]}))
|
|
i[0] += 1
|
|
mes_id = str(i)
|
|
channel.basic_publish(
|
|
exchange="random",
|
|
routing_key="",
|
|
properties=pika.BasicProperties(message_id=mes_id),
|
|
body=messages[-1],
|
|
)
|
|
connection.close()
|
|
|
|
threads = []
|
|
threads_num = 20
|
|
|
|
for _ in range(threads_num):
|
|
threads.append(threading.Thread(target=produce))
|
|
for thread in threads:
|
|
time.sleep(random.uniform(0, 1))
|
|
thread.start()
|
|
|
|
# time.sleep(5)
|
|
# kill_rabbitmq(rabbitmq_cluster.rabbitmq_docker_id)
|
|
# instance.query("detach table test.rabbitmq")
|
|
# revive_rabbitmq(rabbitmq_cluster.rabbitmq_docker_id)
|
|
|
|
for thread in threads:
|
|
thread.join()
|
|
|
|
|
|
def test_rabbitmq_predefined_configuration(rabbitmq_cluster):
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ(rabbit1, rabbitmq_vhost = '/')
|
|
SETTINGS rabbitmq_flush_interval_ms=1000;
|
|
"""
|
|
)
|
|
|
|
channel.basic_publish(
|
|
exchange="named", routing_key="", body=json.dumps({"key": 1, "value": 2})
|
|
)
|
|
while True:
|
|
result = instance.query(
|
|
"SELECT * FROM test.rabbitmq ORDER BY key", ignore_error=True
|
|
)
|
|
if result == "1\t2\n":
|
|
break
|
|
instance.restart_clickhouse()
|
|
channel.basic_publish(
|
|
exchange="named", routing_key="", body=json.dumps({"key": 1, "value": 2})
|
|
)
|
|
while True:
|
|
result = instance.query(
|
|
"SELECT * FROM test.rabbitmq ORDER BY key", ignore_error=True
|
|
)
|
|
if result == "1\t2\n":
|
|
break
|
|
|
|
|
|
def test_rabbitmq_msgpack(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
drop table if exists rabbit_in;
|
|
drop table if exists rabbit_out;
|
|
create table
|
|
rabbit_in (val String)
|
|
engine=RabbitMQ
|
|
settings rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'xhep',
|
|
rabbitmq_format = 'MsgPack',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_num_consumers = 1;
|
|
create table
|
|
rabbit_out (val String)
|
|
engine=RabbitMQ
|
|
settings rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'xhep',
|
|
rabbitmq_format = 'MsgPack',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_num_consumers = 1;
|
|
set stream_like_engine_allow_direct_select=1;
|
|
insert into rabbit_out select 'kek';
|
|
"""
|
|
)
|
|
|
|
result = ""
|
|
try_no = 0
|
|
while True:
|
|
result = instance.query("select * from rabbit_in;")
|
|
if result.strip() == "kek":
|
|
break
|
|
else:
|
|
try_no = try_no + 1
|
|
if try_no == 20:
|
|
break
|
|
time.sleep(1)
|
|
assert result.strip() == "kek"
|
|
|
|
instance.query("drop table rabbit_in sync")
|
|
instance.query("drop table rabbit_out sync")
|
|
|
|
|
|
def test_rabbitmq_address(rabbitmq_cluster):
|
|
instance2.query(
|
|
"""
|
|
drop table if exists rabbit_in;
|
|
drop table if exists rabbit_out;
|
|
create table
|
|
rabbit_in (val String)
|
|
engine=RabbitMQ
|
|
SETTINGS rabbitmq_exchange_name = 'rxhep',
|
|
rabbitmq_format = 'CSV',
|
|
rabbitmq_num_consumers = 1,
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_address='amqp://root:clickhouse@rabbitmq1:5672/';
|
|
create table
|
|
rabbit_out (val String) engine=RabbitMQ
|
|
SETTINGS rabbitmq_exchange_name = 'rxhep',
|
|
rabbitmq_format = 'CSV',
|
|
rabbitmq_num_consumers = 1,
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_address='amqp://root:clickhouse@rabbitmq1:5672/';
|
|
set stream_like_engine_allow_direct_select=1;
|
|
insert into rabbit_out select 'kek';
|
|
"""
|
|
)
|
|
|
|
result = ""
|
|
try_no = 0
|
|
while True:
|
|
result = instance2.query("select * from rabbit_in;")
|
|
if result.strip() == "kek":
|
|
break
|
|
else:
|
|
try_no = try_no + 1
|
|
if try_no == 20:
|
|
break
|
|
time.sleep(1)
|
|
assert result.strip() == "kek"
|
|
|
|
instance2.query("drop table rabbit_in sync")
|
|
instance2.query("drop table rabbit_out sync")
|
|
|
|
|
|
def test_format_with_prefix_and_suffix(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'insert',
|
|
rabbitmq_exchange_type = 'direct',
|
|
rabbitmq_routing_key_list = 'custom',
|
|
rabbitmq_format = 'CustomSeparated';
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
consumer_connection = pika.BlockingConnection(parameters)
|
|
|
|
consumer = consumer_connection.channel()
|
|
result = consumer.queue_declare(queue="")
|
|
queue_name = result.method.queue
|
|
consumer.queue_bind(exchange="insert", queue=queue_name, routing_key="custom")
|
|
|
|
instance.query(
|
|
"INSERT INTO test.rabbitmq select number*10 as key, number*100 as value from numbers(2) settings format_custom_result_before_delimiter='<prefix>\n', format_custom_result_after_delimiter='<suffix>\n'"
|
|
)
|
|
|
|
insert_messages = []
|
|
|
|
def onReceived(channel, method, properties, body):
|
|
message = body.decode()
|
|
insert_messages.append(message)
|
|
logging.debug(f"Received {len(insert_messages)} message: {message}")
|
|
if len(insert_messages) == 2:
|
|
channel.stop_consuming()
|
|
|
|
consumer.basic_consume(queue_name, onReceived)
|
|
|
|
consumer.start_consuming()
|
|
consumer_connection.close()
|
|
|
|
assert (
|
|
"".join(insert_messages)
|
|
== "<prefix>\n0\t0\n<suffix>\n<prefix>\n10\t100\n<suffix>\n"
|
|
)
|
|
|
|
|
|
def test_max_rows_per_message(rabbitmq_cluster):
|
|
num_rows = 5
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.view;
|
|
DROP TABLE IF EXISTS test.rabbit;
|
|
|
|
CREATE TABLE test.rabbit (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_format = 'CustomSeparated',
|
|
rabbitmq_exchange_name = 'custom',
|
|
rabbitmq_exchange_type = 'direct',
|
|
rabbitmq_routing_key_list = 'custom1',
|
|
rabbitmq_max_rows_per_message = 3,
|
|
rabbitmq_flush_interval_ms = 1000,
|
|
format_custom_result_before_delimiter = '<prefix>\n',
|
|
format_custom_result_after_delimiter = '<suffix>\n';
|
|
|
|
CREATE MATERIALIZED VIEW test.view Engine=Log AS
|
|
SELECT key, value FROM test.rabbit;
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
consumer_connection = pika.BlockingConnection(parameters)
|
|
|
|
consumer = consumer_connection.channel()
|
|
result = consumer.queue_declare(queue="")
|
|
queue_name = result.method.queue
|
|
consumer.queue_bind(exchange="custom", queue=queue_name, routing_key="custom1")
|
|
|
|
instance.query(
|
|
f"INSERT INTO test.rabbit select number*10 as key, number*100 as value from numbers({num_rows}) settings format_custom_result_before_delimiter='<prefix>\n', format_custom_result_after_delimiter='<suffix>\n'"
|
|
)
|
|
|
|
insert_messages = []
|
|
|
|
def onReceived(channel, method, properties, body):
|
|
insert_messages.append(body.decode())
|
|
if len(insert_messages) == 2:
|
|
channel.stop_consuming()
|
|
|
|
consumer.basic_consume(queue_name, onReceived)
|
|
consumer.start_consuming()
|
|
consumer_connection.close()
|
|
|
|
assert len(insert_messages) == 2
|
|
|
|
assert (
|
|
"".join(insert_messages)
|
|
== "<prefix>\n0\t0\n10\t100\n20\t200\n<suffix>\n<prefix>\n30\t300\n40\t400\n<suffix>\n"
|
|
)
|
|
|
|
attempt = 0
|
|
rows = 0
|
|
while attempt < 100:
|
|
rows = int(instance.query("SELECT count() FROM test.view"))
|
|
if rows == num_rows:
|
|
break
|
|
attempt += 1
|
|
|
|
assert rows == num_rows
|
|
|
|
result = instance.query("SELECT * FROM test.view")
|
|
assert result == "0\t0\n10\t100\n20\t200\n30\t300\n40\t400\n"
|
|
|
|
|
|
def test_row_based_formats(rabbitmq_cluster):
|
|
num_rows = 10
|
|
|
|
for format_name in [
|
|
"TSV",
|
|
"TSVWithNamesAndTypes",
|
|
"TSKV",
|
|
"CSV",
|
|
"CSVWithNamesAndTypes",
|
|
"CustomSeparatedWithNamesAndTypes",
|
|
"Values",
|
|
"JSON",
|
|
"JSONEachRow",
|
|
"JSONCompactEachRow",
|
|
"JSONCompactEachRowWithNamesAndTypes",
|
|
"JSONObjectEachRow",
|
|
"Avro",
|
|
"RowBinary",
|
|
"RowBinaryWithNamesAndTypes",
|
|
"MsgPack",
|
|
]:
|
|
logging.debug(format_name)
|
|
|
|
instance.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS test.view;
|
|
DROP TABLE IF EXISTS test.rabbit;
|
|
|
|
CREATE TABLE test.rabbit (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_format = '{format_name}',
|
|
rabbitmq_exchange_name = '{format_name}',
|
|
rabbitmq_exchange_type = 'direct',
|
|
rabbitmq_max_block_size = 100,
|
|
rabbitmq_flush_interval_ms = 1000,
|
|
rabbitmq_routing_key_list = '{format_name}',
|
|
rabbitmq_max_rows_per_message = 5;
|
|
|
|
CREATE MATERIALIZED VIEW test.view Engine=Log AS
|
|
SELECT key, value FROM test.rabbit;
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip,
|
|
rabbitmq_cluster.rabbitmq_port,
|
|
"/",
|
|
credentials,
|
|
)
|
|
consumer_connection = pika.BlockingConnection(parameters)
|
|
|
|
consumer = consumer_connection.channel()
|
|
result = consumer.queue_declare(queue="")
|
|
queue_name = result.method.queue
|
|
consumer.queue_bind(
|
|
exchange=format_name, queue=queue_name, routing_key=format_name
|
|
)
|
|
|
|
instance.query(
|
|
f"INSERT INTO test.rabbit SELECT number * 10 as key, number * 100 as value FROM numbers({num_rows});"
|
|
)
|
|
|
|
insert_messages = 0
|
|
|
|
def onReceived(channel, method, properties, body):
|
|
nonlocal insert_messages
|
|
insert_messages += 1
|
|
if insert_messages == 2:
|
|
channel.stop_consuming()
|
|
|
|
consumer.basic_consume(queue_name, onReceived)
|
|
consumer.start_consuming()
|
|
consumer_connection.close()
|
|
|
|
assert insert_messages == 2
|
|
|
|
attempt = 0
|
|
rows = 0
|
|
while attempt < 100:
|
|
rows = int(instance.query("SELECT count() FROM test.view"))
|
|
if rows == num_rows:
|
|
break
|
|
attempt += 1
|
|
|
|
assert rows == num_rows
|
|
|
|
expected = ""
|
|
for i in range(num_rows):
|
|
expected += str(i * 10) + "\t" + str(i * 100) + "\n"
|
|
|
|
result = instance.query("SELECT * FROM test.view")
|
|
assert result == expected
|
|
|
|
|
|
def test_block_based_formats_1(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'PrettySpace',
|
|
rabbitmq_exchange_type = 'direct',
|
|
rabbitmq_max_block_size = 100,
|
|
rabbitmq_flush_interval_ms = 1000,
|
|
rabbitmq_routing_key_list = 'PrettySpace',
|
|
rabbitmq_format = 'PrettySpace';
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
consumer_connection = pika.BlockingConnection(parameters)
|
|
|
|
consumer = consumer_connection.channel()
|
|
result = consumer.queue_declare(queue="")
|
|
queue_name = result.method.queue
|
|
consumer.queue_bind(
|
|
exchange="PrettySpace", queue=queue_name, routing_key="PrettySpace"
|
|
)
|
|
|
|
instance.query(
|
|
"INSERT INTO test.rabbitmq SELECT number * 10 as key, number * 100 as value FROM numbers(5) settings max_block_size=2, optimize_trivial_insert_select=0, output_format_pretty_color=1, output_format_pretty_row_numbers=0;"
|
|
)
|
|
insert_messages = []
|
|
|
|
def onReceived(channel, method, properties, body):
|
|
insert_messages.append(body.decode())
|
|
if len(insert_messages) == 3:
|
|
channel.stop_consuming()
|
|
|
|
consumer.basic_consume(queue_name, onReceived)
|
|
consumer.start_consuming()
|
|
consumer_connection.close()
|
|
|
|
assert len(insert_messages) == 3
|
|
|
|
data = []
|
|
for message in insert_messages:
|
|
splitted = message.split("\n")
|
|
assert splitted[0] == " \x1b[1mkey\x1b[0m \x1b[1mvalue\x1b[0m"
|
|
assert splitted[1] == ""
|
|
assert splitted[-1] == ""
|
|
data += [line.split() for line in splitted[2:-1]]
|
|
|
|
assert data == [
|
|
["0", "0"],
|
|
["10", "100"],
|
|
["20", "200"],
|
|
["30", "300"],
|
|
["40", "400"],
|
|
]
|
|
|
|
|
|
def test_block_based_formats_2(rabbitmq_cluster):
|
|
num_rows = 100
|
|
|
|
for format_name in [
|
|
"JSONColumns",
|
|
"Native",
|
|
"Arrow",
|
|
"Parquet",
|
|
"ORC",
|
|
"JSONCompactColumns",
|
|
]:
|
|
logging.debug(format_name)
|
|
|
|
instance.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS test.view;
|
|
DROP TABLE IF EXISTS test.rabbit;
|
|
|
|
CREATE TABLE test.rabbit (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_format = '{format_name}',
|
|
rabbitmq_exchange_name = '{format_name}',
|
|
rabbitmq_exchange_type = 'direct',
|
|
rabbitmq_max_block_size = 100,
|
|
rabbitmq_flush_interval_ms = 1000,
|
|
rabbitmq_routing_key_list = '{format_name}';
|
|
|
|
CREATE MATERIALIZED VIEW test.view Engine=Log AS
|
|
SELECT key, value FROM test.rabbit;
|
|
"""
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip,
|
|
rabbitmq_cluster.rabbitmq_port,
|
|
"/",
|
|
credentials,
|
|
)
|
|
consumer_connection = pika.BlockingConnection(parameters)
|
|
|
|
consumer = consumer_connection.channel()
|
|
result = consumer.queue_declare(queue="")
|
|
queue_name = result.method.queue
|
|
consumer.queue_bind(
|
|
exchange=format_name, queue=queue_name, routing_key=format_name
|
|
)
|
|
|
|
instance.query(
|
|
f"INSERT INTO test.rabbit SELECT number * 10 as key, number * 100 as value FROM numbers({num_rows}) settings max_block_size=12, optimize_trivial_insert_select=0;"
|
|
)
|
|
|
|
insert_messages = 0
|
|
|
|
def onReceived(channel, method, properties, body):
|
|
nonlocal insert_messages
|
|
insert_messages += 1
|
|
if insert_messages == 9:
|
|
channel.stop_consuming()
|
|
|
|
consumer.basic_consume(queue_name, onReceived)
|
|
consumer.start_consuming()
|
|
consumer_connection.close()
|
|
|
|
assert insert_messages == 9
|
|
|
|
attempt = 0
|
|
rows = 0
|
|
while attempt < 100:
|
|
rows = int(instance.query("SELECT count() FROM test.view"))
|
|
if rows == num_rows:
|
|
break
|
|
attempt += 1
|
|
|
|
assert rows == num_rows
|
|
|
|
result = instance.query("SELECT * FROM test.view ORDER by key")
|
|
expected = ""
|
|
for i in range(num_rows):
|
|
expected += str(i * 10) + "\t" + str(i * 100) + "\n"
|
|
assert result == expected
|
|
|
|
|
|
def test_rabbitmq_flush_by_block_size(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.view;
|
|
DROP TABLE IF EXISTS test.consumer;
|
|
|
|
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'flush_by_block',
|
|
rabbitmq_queue_base = 'flush_by_block',
|
|
rabbitmq_max_block_size = 100,
|
|
rabbitmq_flush_interval_ms = 640000, /* should not flush by time during test */
|
|
rabbitmq_format = 'JSONEachRow';
|
|
|
|
CREATE TABLE test.view (key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.rabbitmq;
|
|
|
|
SYSTEM STOP MERGES;
|
|
"""
|
|
)
|
|
|
|
cancel = threading.Event()
|
|
|
|
def produce():
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip,
|
|
rabbitmq_cluster.rabbitmq_port,
|
|
"/",
|
|
credentials,
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
|
|
while not cancel.is_set():
|
|
try:
|
|
channel = connection.channel()
|
|
channel.basic_publish(
|
|
exchange="flush_by_block",
|
|
routing_key="",
|
|
body=json.dumps({"key": 0, "value": 0}),
|
|
)
|
|
except Exception as e:
|
|
logging.debug(f"Got error: {str(e)}")
|
|
|
|
produce_thread = threading.Thread(target=produce)
|
|
produce_thread.start()
|
|
|
|
while 0 == int(
|
|
instance.query(
|
|
"SELECT count() FROM system.parts WHERE database = 'test' AND table = 'view' AND name = 'all_1_1_0'"
|
|
)
|
|
):
|
|
time.sleep(0.5)
|
|
|
|
cancel.set()
|
|
produce_thread.join()
|
|
|
|
# more flushes can happens during test, we need to check only result of first flush (part named all_1_1_0).
|
|
result = instance.query("SELECT count() FROM test.view WHERE _part='all_1_1_0'")
|
|
# logging.debug(result)
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.consumer;
|
|
DROP TABLE test.view;
|
|
DROP TABLE test.rabbitmq;
|
|
"""
|
|
)
|
|
|
|
# 100 = first poll should return 100 messages (and rows)
|
|
# not waiting for stream_flush_interval_ms
|
|
assert (
|
|
int(result) == 100
|
|
), "Messages from rabbitmq should be flushed when block of size rabbitmq_max_block_size is formed!"
|
|
|
|
|
|
def test_rabbitmq_flush_by_time(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.view;
|
|
DROP TABLE IF EXISTS test.consumer;
|
|
|
|
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
|
rabbitmq_exchange_name = 'flush_by_time',
|
|
rabbitmq_queue_base = 'flush_by_time',
|
|
rabbitmq_max_block_size = 100,
|
|
rabbitmq_flush_interval_ms = 5000,
|
|
rabbitmq_format = 'JSONEachRow';
|
|
|
|
CREATE TABLE test.view (key UInt64, value UInt64, ts DateTime64(3) MATERIALIZED now64(3))
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
"""
|
|
)
|
|
|
|
cancel = threading.Event()
|
|
|
|
def produce():
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip,
|
|
rabbitmq_cluster.rabbitmq_port,
|
|
"/",
|
|
credentials,
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
|
|
while not cancel.is_set():
|
|
try:
|
|
channel = connection.channel()
|
|
channel.basic_publish(
|
|
exchange="flush_by_time",
|
|
routing_key="",
|
|
body=json.dumps({"key": 0, "value": 0}),
|
|
)
|
|
logging.debug("Produced a message")
|
|
time.sleep(0.8)
|
|
except Exception as e:
|
|
logging.debug(f"Got error: {str(e)}")
|
|
|
|
produce_thread = threading.Thread(target=produce)
|
|
produce_thread.start()
|
|
|
|
instance.query(
|
|
"""
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.rabbitmq;
|
|
"""
|
|
)
|
|
|
|
while True:
|
|
time.sleep(0.2)
|
|
count = instance.query(
|
|
"SELECT count() FROM system.parts WHERE database = 'test' AND table = 'view'"
|
|
)
|
|
logging.debug(f"kssenii total count: {count}")
|
|
count = int(
|
|
instance.query(
|
|
"SELECT count() FROM system.parts WHERE database = 'test' AND table = 'view' AND name = 'all_1_1_0'"
|
|
)
|
|
)
|
|
logging.debug(f"kssenii count: {count}")
|
|
if count > 0:
|
|
break
|
|
|
|
time.sleep(12)
|
|
result = instance.query("SELECT uniqExact(ts) FROM test.view")
|
|
|
|
cancel.set()
|
|
produce_thread.join()
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.consumer;
|
|
DROP TABLE test.view;
|
|
DROP TABLE test.rabbitmq;
|
|
"""
|
|
)
|
|
|
|
assert int(result) == 3
|
|
|
|
|
|
def test_rabbitmq_handle_error_mode_stream(rabbitmq_cluster):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.rabbitmq;
|
|
DROP TABLE IF EXISTS test.view;
|
|
DROP TABLE IF EXISTS test.data;
|
|
DROP TABLE IF EXISTS test.errors;
|
|
DROP TABLE IF EXISTS test.errors_view;
|
|
|
|
CREATE TABLE test.rabbit (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = '{}:5672',
|
|
rabbitmq_exchange_name = 'select',
|
|
rabbitmq_commit_on_select = 1,
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_row_delimiter = '\\n',
|
|
rabbitmq_handle_error_mode = 'stream';
|
|
|
|
|
|
CREATE TABLE test.errors (error Nullable(String), broken_message Nullable(String))
|
|
ENGINE = MergeTree()
|
|
ORDER BY tuple();
|
|
|
|
CREATE MATERIALIZED VIEW test.errors_view TO test.errors AS
|
|
SELECT _error as error, _raw_message as broken_message FROM test.rabbit where not isNull(_error);
|
|
|
|
CREATE TABLE test.data (key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
|
|
CREATE MATERIALIZED VIEW test.view TO test.data AS
|
|
SELECT key, value FROM test.rabbit;
|
|
""".format(
|
|
rabbitmq_cluster.rabbitmq_host
|
|
)
|
|
)
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
messages = []
|
|
num_rows = 50
|
|
for i in range(num_rows):
|
|
if i % 2 == 0:
|
|
messages.append(json.dumps({"key": i, "value": i}))
|
|
else:
|
|
messages.append("Broken message " + str(i))
|
|
|
|
for message in messages:
|
|
channel.basic_publish(exchange="select", routing_key="", body=message)
|
|
|
|
connection.close()
|
|
# The order of messages in select * from test.rabbitmq is not guaranteed, so sleep to collect everything in one select
|
|
time.sleep(1)
|
|
|
|
attempt = 0
|
|
rows = 0
|
|
while attempt < 500:
|
|
rows = int(instance.query("SELECT count() FROM test.data"))
|
|
if rows == num_rows:
|
|
break
|
|
attempt += 1
|
|
|
|
assert rows == num_rows
|
|
|
|
result = instance.query("SELECT * FROM test.data ORDER by key")
|
|
expected = "0\t0\n" * (num_rows // 2)
|
|
for i in range(num_rows):
|
|
if i % 2 == 0:
|
|
expected += str(i) + "\t" + str(i) + "\n"
|
|
|
|
assert result == expected
|
|
|
|
attempt = 0
|
|
errors_count = 0
|
|
while attempt < 500:
|
|
errors_count = int(instance.query("SELECT count() FROM test.errors"))
|
|
if errors_count == num_rows:
|
|
break
|
|
attempt += 1
|
|
|
|
assert errors_count == num_rows / 2
|
|
|
|
broken_messages = instance.query(
|
|
"SELECT broken_message FROM test.errors order by broken_message"
|
|
)
|
|
expected = []
|
|
for i in range(num_rows):
|
|
if i % 2 != 0:
|
|
expected.append("Broken message " + str(i) + "\n")
|
|
|
|
expected = "".join(sorted(expected))
|
|
assert broken_messages == expected
|
|
|
|
|
|
def test_attach_broken_table(rabbitmq_cluster):
|
|
instance.query(
|
|
"ATTACH TABLE rabbit_queue UUID '2d1cdf1a-f060-4a61-a7c9-5b59e59992c6' (`payload` String) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'nonexisting:5671', rabbitmq_format = 'JSONEachRow', rabbitmq_username = 'test', rabbitmq_password = 'test'"
|
|
)
|
|
|
|
error = instance.query_and_get_error("SELECT * FROM rabbit_queue")
|
|
assert "CANNOT_CONNECT_RABBITMQ" in error
|
|
error = instance.query_and_get_error("INSERT INTO rabbit_queue VALUES ('test')")
|
|
assert "CANNOT_CONNECT_RABBITMQ" in error
|
|
|
|
|
|
def test_rabbitmq_nack_failed_insert(rabbitmq_cluster):
|
|
table_name = "nack_failed_insert"
|
|
exchange = f"{table_name}_exchange"
|
|
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
channel.exchange_declare(exchange="deadl")
|
|
|
|
result = channel.queue_declare(queue="deadq")
|
|
queue_name = result.method.queue
|
|
channel.queue_bind(exchange="deadl", routing_key="", queue=queue_name)
|
|
|
|
instance3.query(
|
|
f"""
|
|
CREATE TABLE test.{table_name} (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = '{rabbitmq_cluster.rabbitmq_host}:5672',
|
|
rabbitmq_flush_interval_ms=1000,
|
|
rabbitmq_exchange_name = '{exchange}',
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_queue_settings_list='x-dead-letter-exchange=deadl';
|
|
|
|
DROP TABLE IF EXISTS test.view;
|
|
CREATE TABLE test.view (key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
|
|
DROP TABLE IF EXISTS test.consumer;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.{table_name};
|
|
"""
|
|
)
|
|
|
|
num_rows = 25
|
|
for i in range(num_rows):
|
|
message = json.dumps({"key": i, "value": i}) + "\n"
|
|
channel.basic_publish(exchange=exchange, routing_key="", body=message)
|
|
|
|
instance3.wait_for_log_line(
|
|
"Failed to push to views. Error: Code: 252. DB::Exception: Too many parts"
|
|
)
|
|
|
|
instance3.replace_in_config(
|
|
"/etc/clickhouse-server/config.d/mergetree.xml",
|
|
"parts_to_throw_insert>0",
|
|
"parts_to_throw_insert>10",
|
|
)
|
|
instance3.restart_clickhouse()
|
|
|
|
count = [0]
|
|
|
|
def on_consume(channel, method, properties, body):
|
|
channel.basic_publish(exchange=exchange, routing_key="", body=body)
|
|
count[0] += 1
|
|
if count[0] == num_rows:
|
|
channel.stop_consuming()
|
|
|
|
channel.basic_consume(queue_name, on_consume)
|
|
channel.start_consuming()
|
|
|
|
attempt = 0
|
|
count = 0
|
|
while attempt < 100:
|
|
count = int(instance3.query("SELECT count() FROM test.view"))
|
|
if count == num_rows:
|
|
break
|
|
attempt += 1
|
|
|
|
assert count == num_rows
|
|
|
|
instance3.query(
|
|
f"""
|
|
DROP TABLE test.consumer;
|
|
DROP TABLE test.view;
|
|
DROP TABLE test.{table_name};
|
|
"""
|
|
)
|
|
connection.close()
|
|
|
|
|
|
def test_rabbitmq_reject_broken_messages(rabbitmq_cluster):
|
|
credentials = pika.PlainCredentials("root", "clickhouse")
|
|
parameters = pika.ConnectionParameters(
|
|
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
|
|
)
|
|
connection = pika.BlockingConnection(parameters)
|
|
channel = connection.channel()
|
|
|
|
deadletter_exchange = "deadletter_exchange_handle_error_mode_stream"
|
|
deadletter_queue = "deadletter_queue_handle_error_mode_stream"
|
|
channel.exchange_declare(exchange=deadletter_exchange)
|
|
|
|
result = channel.queue_declare(queue=deadletter_queue)
|
|
channel.queue_bind(
|
|
exchange=deadletter_exchange, routing_key="", queue=deadletter_queue
|
|
)
|
|
|
|
instance.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS test.rabbitmq;
|
|
DROP TABLE IF EXISTS test.view;
|
|
DROP TABLE IF EXISTS test.data;
|
|
DROP TABLE IF EXISTS test.errors;
|
|
DROP TABLE IF EXISTS test.errors_view;
|
|
|
|
CREATE TABLE test.rabbit (key UInt64, value UInt64)
|
|
ENGINE = RabbitMQ
|
|
SETTINGS rabbitmq_host_port = '{rabbitmq_cluster.rabbitmq_host}:5672',
|
|
rabbitmq_exchange_name = 'select',
|
|
rabbitmq_commit_on_select = 1,
|
|
rabbitmq_format = 'JSONEachRow',
|
|
rabbitmq_row_delimiter = '\\n',
|
|
rabbitmq_handle_error_mode = 'stream',
|
|
rabbitmq_queue_settings_list='x-dead-letter-exchange={deadletter_exchange}';
|
|
|
|
|
|
CREATE TABLE test.errors (error Nullable(String), broken_message Nullable(String))
|
|
ENGINE = MergeTree()
|
|
ORDER BY tuple();
|
|
|
|
CREATE MATERIALIZED VIEW test.errors_view TO test.errors AS
|
|
SELECT _error as error, _raw_message as broken_message FROM test.rabbit where not isNull(_error);
|
|
|
|
CREATE TABLE test.data (key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
|
|
CREATE MATERIALIZED VIEW test.view TO test.data AS
|
|
SELECT key, value FROM test.rabbit;
|
|
"""
|
|
)
|
|
|
|
messages = []
|
|
num_rows = 50
|
|
for i in range(num_rows):
|
|
if i % 2 == 0:
|
|
messages.append(json.dumps({"key": i, "value": i}))
|
|
else:
|
|
messages.append("Broken message " + str(i))
|
|
|
|
for message in messages:
|
|
channel.basic_publish(exchange="select", routing_key="", body=message)
|
|
|
|
time.sleep(1)
|
|
|
|
attempt = 0
|
|
rows = 0
|
|
while attempt < 500:
|
|
rows = int(instance.query("SELECT count() FROM test.data"))
|
|
if rows == num_rows:
|
|
break
|
|
attempt += 1
|
|
time.sleep(1)
|
|
|
|
assert rows == num_rows
|
|
|
|
dead_letters = []
|
|
|
|
def on_dead_letter(channel, method, properties, body):
|
|
dead_letters.append(body)
|
|
if len(dead_letters) == num_rows / 2:
|
|
channel.stop_consuming()
|
|
|
|
channel.basic_consume(deadletter_queue, on_dead_letter)
|
|
channel.start_consuming()
|
|
|
|
assert len(dead_letters) == num_rows / 2
|
|
|
|
i = 1
|
|
for letter in dead_letters:
|
|
assert f"Broken message {i}" in str(letter)
|
|
i += 2
|
|
|
|
connection.close()
|