mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-15 19:02:04 +00:00
1875 lines
54 KiB
Python
1875 lines
54 KiB
Python
import pytest
|
|
|
|
# FIXME This test is too flaky
|
|
# https://github.com/ClickHouse/ClickHouse/issues/39185
|
|
|
|
pytestmark = pytest.mark.skip
|
|
|
|
import json
|
|
import os.path as p
|
|
import random
|
|
import subprocess
|
|
import threading
|
|
import logging
|
|
import time
|
|
from random import randrange
|
|
import math
|
|
|
|
import asyncio
|
|
from google.protobuf.internal.encoder import _VarintBytes
|
|
from helpers.client import QueryRuntimeException
|
|
from helpers.cluster import ClickHouseCluster, check_nats_is_available, nats_connect_ssl
|
|
from helpers.test_tools import TSV
|
|
|
|
from . import nats_pb2
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
instance = cluster.add_instance(
|
|
"instance",
|
|
main_configs=[
|
|
"configs/nats.xml",
|
|
"configs/macros.xml",
|
|
"configs/named_collection.xml",
|
|
],
|
|
user_configs=["configs/users.xml"],
|
|
with_nats=True,
|
|
clickhouse_path_dir="clickhouse_path",
|
|
)
|
|
|
|
|
|
# Helpers
|
|
|
|
|
|
def wait_nats_to_start(nats_port, ssl_ctx=None, timeout=180):
|
|
start = time.time()
|
|
while time.time() - start < timeout:
|
|
try:
|
|
if asyncio.run(check_nats_is_available(nats_port, ssl_ctx=ssl_ctx)):
|
|
logging.debug("NATS is available")
|
|
return
|
|
time.sleep(0.5)
|
|
except Exception as ex:
|
|
logging.debug("Can't connect to NATS " + str(ex))
|
|
time.sleep(0.5)
|
|
|
|
|
|
def nats_check_result(result, check=False, ref_file="test_nats_json.reference"):
|
|
fpath = p.join(p.dirname(__file__), ref_file)
|
|
with open(fpath) as reference:
|
|
if check:
|
|
assert TSV(result) == TSV(reference)
|
|
else:
|
|
return TSV(result) == TSV(reference)
|
|
|
|
|
|
def kill_nats(nats_id):
|
|
p = subprocess.Popen(("docker", "stop", nats_id), stdout=subprocess.PIPE)
|
|
p.communicate()
|
|
return p.returncode == 0
|
|
|
|
|
|
def revive_nats(nats_id, nats_port):
|
|
p = subprocess.Popen(("docker", "start", nats_id), stdout=subprocess.PIPE)
|
|
p.communicate()
|
|
wait_nats_to_start(nats_port)
|
|
|
|
|
|
# Fixtures
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def nats_cluster():
|
|
try:
|
|
cluster.start()
|
|
logging.debug("nats_id is {}".format(instance.cluster.nats_docker_id))
|
|
instance.query("CREATE DATABASE test")
|
|
|
|
yield cluster
|
|
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def nats_setup_teardown():
|
|
print("NATS is available - running test")
|
|
yield # run test
|
|
instance.query("DROP DATABASE test SYNC")
|
|
instance.query("CREATE DATABASE test")
|
|
|
|
|
|
# Tests
|
|
|
|
|
|
async def nats_produce_messages(cluster_inst, subject, messages=(), bytes=None):
|
|
nc = await nats_connect_ssl(
|
|
cluster_inst.nats_port,
|
|
user="click",
|
|
password="house",
|
|
ssl_ctx=cluster_inst.nats_ssl_context,
|
|
)
|
|
logging.debug("NATS connection status: " + str(nc.is_connected))
|
|
|
|
for message in messages:
|
|
await nc.publish(subject, message.encode())
|
|
if bytes is not None:
|
|
await nc.publish(subject, bytes)
|
|
logging.debug("Finished publising to " + subject)
|
|
|
|
await nc.close()
|
|
return messages
|
|
|
|
|
|
def check_table_is_ready(instance, table_name):
|
|
try:
|
|
instance.query("SELECT * FROM {}".format(table_name))
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def test_nats_select(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.nats (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'select',
|
|
nats_format = 'JSONEachRow',
|
|
nats_row_delimiter = '\\n';
|
|
"""
|
|
)
|
|
while not check_table_is_ready(instance, "test.nats"):
|
|
logging.debug("Table test.nats is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
messages = []
|
|
for i in range(50):
|
|
messages.append(json.dumps({"key": i, "value": i}))
|
|
asyncio.run(nats_produce_messages(nats_cluster, "select", messages))
|
|
|
|
# The order of messages in select * from test.nats is not guaranteed, so sleep to collect everything in one select
|
|
time.sleep(1)
|
|
|
|
result = ""
|
|
while True:
|
|
result += instance.query(
|
|
"SELECT * FROM test.nats ORDER BY key", ignore_error=True
|
|
)
|
|
if nats_check_result(result):
|
|
break
|
|
|
|
nats_check_result(result, True)
|
|
|
|
|
|
def test_nats_select_empty(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.nats (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'empty',
|
|
nats_format = 'TSV',
|
|
nats_row_delimiter = '\\n';
|
|
"""
|
|
)
|
|
|
|
assert int(instance.query("SELECT count() FROM test.nats")) == 0
|
|
|
|
|
|
def test_nats_json_without_delimiter(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.nats (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'json',
|
|
nats_format = 'JSONEachRow';
|
|
"""
|
|
)
|
|
while not check_table_is_ready(instance, "test.nats"):
|
|
logging.debug("Table test.nats is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
messages = ""
|
|
for i in range(25):
|
|
messages += json.dumps({"key": i, "value": i}) + "\n"
|
|
|
|
all_messages = [messages]
|
|
asyncio.run(nats_produce_messages(nats_cluster, "json", all_messages))
|
|
|
|
messages = ""
|
|
for i in range(25, 50):
|
|
messages += json.dumps({"key": i, "value": i}) + "\n"
|
|
all_messages = [messages]
|
|
asyncio.run(nats_produce_messages(nats_cluster, "json", all_messages))
|
|
|
|
time.sleep(1)
|
|
|
|
result = ""
|
|
time_limit_sec = 60
|
|
deadline = time.monotonic() + time_limit_sec
|
|
|
|
while time.monotonic() < deadline:
|
|
result += instance.query(
|
|
"SELECT * FROM test.nats ORDER BY key", ignore_error=True
|
|
)
|
|
if nats_check_result(result):
|
|
break
|
|
|
|
nats_check_result(result, True)
|
|
|
|
|
|
def test_nats_csv_with_delimiter(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.nats (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'csv',
|
|
nats_format = 'CSV',
|
|
nats_row_delimiter = '\\n';
|
|
"""
|
|
)
|
|
while not check_table_is_ready(instance, "test.nats"):
|
|
logging.debug("Table test.nats is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
messages = []
|
|
for i in range(50):
|
|
messages.append("{i}, {i}".format(i=i))
|
|
|
|
asyncio.run(nats_produce_messages(nats_cluster, "csv", messages))
|
|
|
|
time.sleep(1)
|
|
|
|
result = ""
|
|
for _ in range(60):
|
|
result += instance.query(
|
|
"SELECT * FROM test.nats ORDER BY key", ignore_error=True
|
|
)
|
|
if nats_check_result(result):
|
|
break
|
|
|
|
nats_check_result(result, True)
|
|
|
|
|
|
def test_nats_tsv_with_delimiter(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.nats (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'tsv',
|
|
nats_format = 'TSV',
|
|
nats_row_delimiter = '\\n';
|
|
CREATE TABLE test.view (key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.nats;
|
|
"""
|
|
)
|
|
while not check_table_is_ready(instance, "test.nats"):
|
|
logging.debug("Table test.nats is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
messages = []
|
|
for i in range(50):
|
|
messages.append("{i}\t{i}".format(i=i))
|
|
|
|
asyncio.run(nats_produce_messages(nats_cluster, "tsv", messages))
|
|
|
|
result = ""
|
|
for _ in range(60):
|
|
result = instance.query("SELECT * FROM test.view ORDER BY key")
|
|
if nats_check_result(result):
|
|
break
|
|
|
|
nats_check_result(result, True)
|
|
|
|
|
|
#
|
|
|
|
|
|
def test_nats_macros(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.nats (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = '{nats_url}',
|
|
nats_subjects = '{nats_subjects}',
|
|
nats_format = '{nats_format}'
|
|
"""
|
|
)
|
|
while not check_table_is_ready(instance, "test.nats"):
|
|
logging.debug("Table test.nats is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
message = ""
|
|
for i in range(50):
|
|
message += json.dumps({"key": i, "value": i}) + "\n"
|
|
asyncio.run(nats_produce_messages(nats_cluster, "macro", [message]))
|
|
|
|
time.sleep(1)
|
|
|
|
result = ""
|
|
for _ in range(60):
|
|
result += instance.query(
|
|
"SELECT * FROM test.nats ORDER BY key", ignore_error=True
|
|
)
|
|
if nats_check_result(result):
|
|
break
|
|
|
|
nats_check_result(result, True)
|
|
|
|
|
|
def test_nats_materialized_view(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.nats (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'mv',
|
|
nats_format = 'JSONEachRow',
|
|
nats_row_delimiter = '\\n';
|
|
CREATE TABLE test.view (key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.nats;
|
|
|
|
CREATE TABLE test.view2 (key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer2 TO test.view2 AS
|
|
SELECT * FROM test.nats group by (key, value);
|
|
"""
|
|
)
|
|
while not check_table_is_ready(instance, "test.nats"):
|
|
logging.debug("Table test.nats is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
messages = []
|
|
for i in range(50):
|
|
messages.append(json.dumps({"key": i, "value": i}))
|
|
|
|
asyncio.run(nats_produce_messages(nats_cluster, "mv", messages))
|
|
|
|
time_limit_sec = 60
|
|
deadline = time.monotonic() + time_limit_sec
|
|
|
|
while time.monotonic() < deadline:
|
|
result = instance.query("SELECT * FROM test.view ORDER BY key")
|
|
if nats_check_result(result):
|
|
break
|
|
|
|
nats_check_result(result, True)
|
|
|
|
deadline = time.monotonic() + time_limit_sec
|
|
|
|
while time.monotonic() < deadline:
|
|
result = instance.query("SELECT * FROM test.view2 ORDER BY key")
|
|
if nats_check_result(result):
|
|
break
|
|
|
|
nats_check_result(result, True)
|
|
|
|
|
|
def test_nats_materialized_view_with_subquery(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.nats (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'mvsq',
|
|
nats_format = 'JSONEachRow',
|
|
nats_row_delimiter = '\\n';
|
|
CREATE TABLE test.view (key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM (SELECT * FROM test.nats);
|
|
"""
|
|
)
|
|
while not check_table_is_ready(instance, "test.nats"):
|
|
logging.debug("Table test.nats is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
messages = []
|
|
for i in range(50):
|
|
messages.append(json.dumps({"key": i, "value": i}))
|
|
asyncio.run(nats_produce_messages(nats_cluster, "mvsq", messages))
|
|
|
|
time_limit_sec = 60
|
|
deadline = time.monotonic() + time_limit_sec
|
|
|
|
while time.monotonic() < deadline:
|
|
result = instance.query("SELECT * FROM test.view ORDER BY key")
|
|
if nats_check_result(result):
|
|
break
|
|
|
|
nats_check_result(result, True)
|
|
|
|
|
|
def test_nats_many_materialized_views(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.view1;
|
|
DROP TABLE IF EXISTS test.view2;
|
|
DROP TABLE IF EXISTS test.consumer1;
|
|
DROP TABLE IF EXISTS test.consumer2;
|
|
CREATE TABLE test.nats (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'mmv',
|
|
nats_format = 'JSONEachRow',
|
|
nats_row_delimiter = '\\n';
|
|
CREATE TABLE test.view1 (key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
CREATE TABLE test.view2 (key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer1 TO test.view1 AS
|
|
SELECT * FROM test.nats;
|
|
CREATE MATERIALIZED VIEW test.consumer2 TO test.view2 AS
|
|
SELECT * FROM test.nats;
|
|
"""
|
|
)
|
|
while not check_table_is_ready(instance, "test.nats"):
|
|
logging.debug("Table test.nats is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
messages = []
|
|
for i in range(50):
|
|
messages.append(json.dumps({"key": i, "value": i}))
|
|
asyncio.run(nats_produce_messages(nats_cluster, "mmv", messages))
|
|
|
|
time_limit_sec = 60
|
|
deadline = time.monotonic() + time_limit_sec
|
|
|
|
while time.monotonic() < deadline:
|
|
result1 = instance.query("SELECT * FROM test.view1 ORDER BY key")
|
|
result2 = instance.query("SELECT * FROM test.view2 ORDER BY key")
|
|
if nats_check_result(result1) and nats_check_result(result2):
|
|
break
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.consumer1;
|
|
DROP TABLE test.consumer2;
|
|
DROP TABLE test.view1;
|
|
DROP TABLE test.view2;
|
|
"""
|
|
)
|
|
|
|
nats_check_result(result1, True)
|
|
nats_check_result(result2, True)
|
|
|
|
|
|
def test_nats_protobuf(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.nats (key UInt64, value String)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'pb',
|
|
nats_format = 'Protobuf',
|
|
nats_schema = 'nats.proto:ProtoKeyValue';
|
|
CREATE TABLE test.view (key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.nats;
|
|
"""
|
|
)
|
|
while not check_table_is_ready(instance, "test.nats"):
|
|
logging.debug("Table test.nats is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
data = b""
|
|
for i in range(0, 20):
|
|
msg = nats_pb2.ProtoKeyValue()
|
|
msg.key = i
|
|
msg.value = str(i)
|
|
serialized_msg = msg.SerializeToString()
|
|
data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
|
|
asyncio.run(nats_produce_messages(nats_cluster, "pb", bytes=data))
|
|
data = b""
|
|
for i in range(20, 21):
|
|
msg = nats_pb2.ProtoKeyValue()
|
|
msg.key = i
|
|
msg.value = str(i)
|
|
serialized_msg = msg.SerializeToString()
|
|
data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
|
|
asyncio.run(nats_produce_messages(nats_cluster, "pb", bytes=data))
|
|
data = b""
|
|
for i in range(21, 50):
|
|
msg = nats_pb2.ProtoKeyValue()
|
|
msg.key = i
|
|
msg.value = str(i)
|
|
serialized_msg = msg.SerializeToString()
|
|
data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
|
|
asyncio.run(nats_produce_messages(nats_cluster, "pb", bytes=data))
|
|
|
|
result = ""
|
|
time_limit_sec = 60
|
|
deadline = time.monotonic() + time_limit_sec
|
|
|
|
while time.monotonic() < deadline:
|
|
result = instance.query("SELECT * FROM test.view ORDER BY key")
|
|
if nats_check_result(result):
|
|
break
|
|
|
|
nats_check_result(result, True)
|
|
|
|
|
|
def test_nats_big_message(nats_cluster):
|
|
# Create batchs of messages of size ~100Kb
|
|
nats_messages = 1000
|
|
batch_messages = 1000
|
|
messages = [
|
|
json.dumps({"key": i, "value": "x" * 100}) * batch_messages
|
|
for i in range(nats_messages)
|
|
]
|
|
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.nats (key UInt64, value String)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'big',
|
|
nats_format = 'JSONEachRow';
|
|
CREATE TABLE test.view (key UInt64, value String)
|
|
ENGINE = MergeTree
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.nats;
|
|
"""
|
|
)
|
|
while not check_table_is_ready(instance, "test.nats"):
|
|
logging.debug("Table test.nats is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
asyncio.run(nats_produce_messages(nats_cluster, "big", messages))
|
|
|
|
while True:
|
|
result = instance.query("SELECT count() FROM test.view")
|
|
if int(result) == batch_messages * nats_messages:
|
|
break
|
|
|
|
assert (
|
|
int(result) == nats_messages * batch_messages
|
|
), "ClickHouse lost some messages: {}".format(result)
|
|
|
|
|
|
def test_nats_mv_combo(nats_cluster):
|
|
NUM_MV = 5
|
|
NUM_CONSUMERS = 4
|
|
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.nats (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'combo',
|
|
nats_num_consumers = {},
|
|
nats_format = 'JSONEachRow',
|
|
nats_row_delimiter = '\\n';
|
|
""".format(
|
|
NUM_CONSUMERS
|
|
)
|
|
)
|
|
while not check_table_is_ready(instance, "test.nats"):
|
|
logging.debug("Table test.nats is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
for mv_id in range(NUM_MV):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.combo_{0};
|
|
DROP TABLE IF EXISTS test.combo_{0}_mv;
|
|
CREATE TABLE test.combo_{0} (key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.combo_{0}_mv TO test.combo_{0} AS
|
|
SELECT * FROM test.nats;
|
|
""".format(
|
|
mv_id
|
|
)
|
|
)
|
|
|
|
time.sleep(2)
|
|
|
|
i = [0]
|
|
messages_num = 10000
|
|
|
|
def produce():
|
|
messages = []
|
|
for _ in range(messages_num):
|
|
messages.append(json.dumps({"key": i[0], "value": i[0]}))
|
|
i[0] += 1
|
|
asyncio.run(nats_produce_messages(nats_cluster, "combo", messages))
|
|
|
|
threads = []
|
|
threads_num = 20
|
|
|
|
for _ in range(threads_num):
|
|
threads.append(threading.Thread(target=produce))
|
|
for thread in threads:
|
|
time.sleep(random.uniform(0, 1))
|
|
thread.start()
|
|
|
|
while True:
|
|
result = 0
|
|
for mv_id in range(NUM_MV):
|
|
result += int(
|
|
instance.query("SELECT count() FROM test.combo_{0}".format(mv_id))
|
|
)
|
|
if int(result) == messages_num * threads_num * NUM_MV:
|
|
break
|
|
time.sleep(1)
|
|
|
|
for thread in threads:
|
|
thread.join()
|
|
|
|
for mv_id in range(NUM_MV):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.combo_{0}_mv;
|
|
DROP TABLE test.combo_{0};
|
|
""".format(
|
|
mv_id
|
|
)
|
|
)
|
|
|
|
assert (
|
|
int(result) == messages_num * threads_num * NUM_MV
|
|
), "ClickHouse lost some messages: {}".format(result)
|
|
|
|
|
|
def test_nats_insert(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.nats (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'insert',
|
|
nats_format = 'TSV',
|
|
nats_row_delimiter = '\\n';
|
|
"""
|
|
)
|
|
while not check_table_is_ready(instance, "test.nats"):
|
|
logging.debug("Table test.nats is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
values = []
|
|
for i in range(50):
|
|
values.append("({i}, {i})".format(i=i))
|
|
values = ",".join(values)
|
|
|
|
insert_messages = []
|
|
|
|
async def sub_to_nats():
|
|
nc = await nats_connect_ssl(
|
|
nats_cluster.nats_port,
|
|
user="click",
|
|
password="house",
|
|
ssl_ctx=nats_cluster.nats_ssl_context,
|
|
)
|
|
sub = await nc.subscribe("insert")
|
|
await sub.unsubscribe(50)
|
|
async for msg in sub.messages:
|
|
insert_messages.append(msg.data.decode())
|
|
|
|
await sub.drain()
|
|
await nc.drain()
|
|
|
|
def run_sub():
|
|
asyncio.run(sub_to_nats())
|
|
|
|
thread = threading.Thread(target=run_sub)
|
|
thread.start()
|
|
time.sleep(1)
|
|
|
|
while True:
|
|
try:
|
|
instance.query("INSERT INTO test.nats VALUES {}".format(values))
|
|
break
|
|
except QueryRuntimeException as e:
|
|
if "Local: Timed out." in str(e):
|
|
continue
|
|
else:
|
|
raise
|
|
thread.join()
|
|
|
|
result = "\n".join(insert_messages)
|
|
nats_check_result(result, True)
|
|
|
|
|
|
def test_nats_many_subjects_insert_wrong(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.nats (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'insert1,insert2.>,insert3.*.foo',
|
|
nats_format = 'TSV',
|
|
nats_row_delimiter = '\\n';
|
|
"""
|
|
)
|
|
while not check_table_is_ready(instance, "test.nats"):
|
|
logging.debug("Table test.nats is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
values = []
|
|
for i in range(50):
|
|
values.append("({i}, {i})".format(i=i))
|
|
values = ",".join(values)
|
|
|
|
# no subject specified
|
|
instance.query_and_get_error("INSERT INTO test.nats VALUES {}".format(values))
|
|
|
|
# can't insert into wildcard subjects
|
|
instance.query_and_get_error(
|
|
"INSERT INTO test.nats SETTINGS stream_like_engine_insert_queue='insert2.>' VALUES {}".format(
|
|
values
|
|
)
|
|
)
|
|
instance.query_and_get_error(
|
|
"INSERT INTO test.nats SETTINGS stream_like_engine_insert_queue='insert3.*.foo' VALUES {}".format(
|
|
values
|
|
)
|
|
)
|
|
|
|
# specified subject is not among engine's subjects
|
|
instance.query_and_get_error(
|
|
"INSERT INTO test.nats SETTINGS stream_like_engine_insert_queue='insert4' VALUES {}".format(
|
|
values
|
|
)
|
|
)
|
|
instance.query_and_get_error(
|
|
"INSERT INTO test.nats SETTINGS stream_like_engine_insert_queue='insert3.foo.baz' VALUES {}".format(
|
|
values
|
|
)
|
|
)
|
|
instance.query_and_get_error(
|
|
"INSERT INTO test.nats SETTINGS stream_like_engine_insert_queue='foo.insert2' VALUES {}".format(
|
|
values
|
|
)
|
|
)
|
|
|
|
|
|
def test_nats_many_subjects_insert_right(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.nats (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'right_insert1,right_insert2',
|
|
nats_format = 'TSV',
|
|
nats_row_delimiter = '\\n';
|
|
"""
|
|
)
|
|
while not check_table_is_ready(instance, "test.nats"):
|
|
logging.debug("Table test.nats is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
values = []
|
|
for i in range(50):
|
|
values.append("({i}, {i})".format(i=i))
|
|
values = ",".join(values)
|
|
|
|
insert_messages = []
|
|
|
|
async def sub_to_nats():
|
|
nc = await nats_connect_ssl(
|
|
nats_cluster.nats_port,
|
|
user="click",
|
|
password="house",
|
|
ssl_ctx=nats_cluster.nats_ssl_context,
|
|
)
|
|
sub = await nc.subscribe("right_insert1")
|
|
await sub.unsubscribe(50)
|
|
async for msg in sub.messages:
|
|
insert_messages.append(msg.data.decode())
|
|
|
|
await sub.drain()
|
|
await nc.drain()
|
|
|
|
def run_sub():
|
|
asyncio.run(sub_to_nats())
|
|
|
|
thread = threading.Thread(target=run_sub)
|
|
thread.start()
|
|
time.sleep(1)
|
|
|
|
while True:
|
|
try:
|
|
instance.query(
|
|
"INSERT INTO test.nats SETTINGS stream_like_engine_insert_queue='right_insert1' VALUES {}".format(
|
|
values
|
|
)
|
|
)
|
|
break
|
|
except QueryRuntimeException as e:
|
|
if "Local: Timed out." in str(e):
|
|
continue
|
|
else:
|
|
raise
|
|
thread.join()
|
|
|
|
result = "\n".join(insert_messages)
|
|
nats_check_result(result, True)
|
|
|
|
|
|
def test_nats_many_inserts(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.nats_many;
|
|
DROP TABLE IF EXISTS test.nats_consume;
|
|
DROP TABLE IF EXISTS test.view_many;
|
|
DROP TABLE IF EXISTS test.consumer_many;
|
|
CREATE TABLE test.nats_many (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'many_inserts',
|
|
nats_format = 'TSV',
|
|
nats_row_delimiter = '\\n';
|
|
CREATE TABLE test.nats_consume (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'many_inserts',
|
|
nats_format = 'TSV',
|
|
nats_row_delimiter = '\\n';
|
|
CREATE TABLE test.view_many (key UInt64, value UInt64)
|
|
ENGINE = MergeTree
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer_many TO test.view_many AS
|
|
SELECT * FROM test.nats_consume;
|
|
"""
|
|
)
|
|
while not check_table_is_ready(instance, "test.nats_consume"):
|
|
logging.debug("Table test.nats_consume is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
messages_num = 10000
|
|
values = []
|
|
for i in range(messages_num):
|
|
values.append("({i}, {i})".format(i=i))
|
|
values = ",".join(values)
|
|
|
|
def insert():
|
|
while True:
|
|
try:
|
|
instance.query("INSERT INTO test.nats_many VALUES {}".format(values))
|
|
break
|
|
except QueryRuntimeException as e:
|
|
if "Local: Timed out." in str(e):
|
|
continue
|
|
else:
|
|
raise
|
|
|
|
threads = []
|
|
threads_num = 10
|
|
for _ in range(threads_num):
|
|
threads.append(threading.Thread(target=insert))
|
|
for thread in threads:
|
|
time.sleep(random.uniform(0, 1))
|
|
thread.start()
|
|
|
|
for thread in threads:
|
|
thread.join()
|
|
|
|
time_limit_sec = 300
|
|
deadline = time.monotonic() + time_limit_sec
|
|
|
|
while time.monotonic() < deadline:
|
|
result = instance.query("SELECT count() FROM test.view_many")
|
|
print(result, messages_num * threads_num)
|
|
if int(result) >= messages_num * threads_num:
|
|
break
|
|
time.sleep(1)
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.nats_consume;
|
|
DROP TABLE test.nats_many;
|
|
DROP TABLE test.consumer_many;
|
|
DROP TABLE test.view_many;
|
|
"""
|
|
)
|
|
|
|
assert (
|
|
int(result) == messages_num * threads_num
|
|
), "ClickHouse lost some messages or got duplicated ones. Total count: {}".format(
|
|
result
|
|
)
|
|
|
|
|
|
def test_nats_overloaded_insert(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.view_overload;
|
|
DROP TABLE IF EXISTS test.consumer_overload;
|
|
DROP TABLE IF EXISTS test.nats_consume;
|
|
CREATE TABLE test.nats_consume (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'over',
|
|
nats_num_consumers = 5,
|
|
nats_max_block_size = 10000,
|
|
nats_format = 'TSV',
|
|
nats_row_delimiter = '\\n';
|
|
CREATE TABLE test.nats_overload (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'over',
|
|
nats_format = 'TSV',
|
|
nats_row_delimiter = '\\n';
|
|
CREATE TABLE test.view_overload (key UInt64, value UInt64)
|
|
ENGINE = MergeTree
|
|
ORDER BY key
|
|
SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3;
|
|
CREATE MATERIALIZED VIEW test.consumer_overload TO test.view_overload AS
|
|
SELECT * FROM test.nats_consume;
|
|
"""
|
|
)
|
|
while not check_table_is_ready(instance, "test.nats_consume"):
|
|
logging.debug("Table test.nats_consume is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
messages_num = 100000
|
|
|
|
def insert():
|
|
values = []
|
|
for i in range(messages_num):
|
|
values.append("({i}, {i})".format(i=i))
|
|
values = ",".join(values)
|
|
|
|
while True:
|
|
try:
|
|
instance.query(
|
|
"INSERT INTO test.nats_overload VALUES {}".format(values)
|
|
)
|
|
break
|
|
except QueryRuntimeException as e:
|
|
if "Local: Timed out." in str(e):
|
|
continue
|
|
else:
|
|
raise
|
|
|
|
threads = []
|
|
threads_num = 5
|
|
for _ in range(threads_num):
|
|
threads.append(threading.Thread(target=insert))
|
|
for thread in threads:
|
|
time.sleep(random.uniform(0, 1))
|
|
thread.start()
|
|
|
|
time_limit_sec = 300
|
|
deadline = time.monotonic() + time_limit_sec
|
|
|
|
while time.monotonic() < deadline:
|
|
result = instance.query("SELECT count() FROM test.view_overload")
|
|
time.sleep(1)
|
|
if int(result) >= messages_num * threads_num:
|
|
break
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.consumer_overload;
|
|
DROP TABLE test.view_overload;
|
|
DROP TABLE test.nats_consume;
|
|
DROP TABLE test.nats_overload;
|
|
"""
|
|
)
|
|
|
|
for thread in threads:
|
|
thread.join()
|
|
|
|
assert (
|
|
int(result) == messages_num * threads_num
|
|
), "ClickHouse lost some messages or got duplicated ones. Total count: {}".format(
|
|
result
|
|
)
|
|
|
|
|
|
def test_nats_virtual_column(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.nats_virtuals (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'virtuals',
|
|
nats_format = 'JSONEachRow';
|
|
CREATE MATERIALIZED VIEW test.view Engine=Log AS
|
|
SELECT value, key, _subject FROM test.nats_virtuals;
|
|
"""
|
|
)
|
|
while not check_table_is_ready(instance, "test.nats_virtuals"):
|
|
logging.debug("Table test.nats_virtuals is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
message_num = 10
|
|
i = 0
|
|
messages = []
|
|
for _ in range(message_num):
|
|
messages.append(json.dumps({"key": i, "value": i}))
|
|
i += 1
|
|
|
|
asyncio.run(nats_produce_messages(nats_cluster, "virtuals", messages))
|
|
|
|
while True:
|
|
result = instance.query("SELECT count() FROM test.view")
|
|
time.sleep(1)
|
|
if int(result) == message_num:
|
|
break
|
|
|
|
result = instance.query(
|
|
"""
|
|
SELECT key, value, _subject
|
|
FROM test.view ORDER BY key
|
|
"""
|
|
)
|
|
|
|
expected = """\
|
|
0 0 virtuals
|
|
1 1 virtuals
|
|
2 2 virtuals
|
|
3 3 virtuals
|
|
4 4 virtuals
|
|
5 5 virtuals
|
|
6 6 virtuals
|
|
7 7 virtuals
|
|
8 8 virtuals
|
|
9 9 virtuals
|
|
"""
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.nats_virtuals;
|
|
DROP TABLE test.view;
|
|
"""
|
|
)
|
|
|
|
assert TSV(result) == TSV(expected)
|
|
|
|
|
|
def test_nats_virtual_column_with_materialized_view(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.nats_virtuals_mv (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'virtuals_mv',
|
|
nats_format = 'JSONEachRow';
|
|
CREATE TABLE test.view (key UInt64, value UInt64, subject String) ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT *, _subject as subject
|
|
FROM test.nats_virtuals_mv;
|
|
"""
|
|
)
|
|
while not check_table_is_ready(instance, "test.nats_virtuals_mv"):
|
|
logging.debug("Table test.nats_virtuals_mv is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
message_num = 10
|
|
i = 0
|
|
messages = []
|
|
for _ in range(message_num):
|
|
messages.append(json.dumps({"key": i, "value": i}))
|
|
i += 1
|
|
|
|
asyncio.run(nats_produce_messages(nats_cluster, "virtuals_mv", messages))
|
|
|
|
while True:
|
|
result = instance.query("SELECT count() FROM test.view")
|
|
time.sleep(1)
|
|
if int(result) == message_num:
|
|
break
|
|
|
|
result = instance.query("SELECT key, value, subject FROM test.view ORDER BY key")
|
|
expected = """\
|
|
0 0 virtuals_mv
|
|
1 1 virtuals_mv
|
|
2 2 virtuals_mv
|
|
3 3 virtuals_mv
|
|
4 4 virtuals_mv
|
|
5 5 virtuals_mv
|
|
6 6 virtuals_mv
|
|
7 7 virtuals_mv
|
|
8 8 virtuals_mv
|
|
9 9 virtuals_mv
|
|
"""
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.consumer;
|
|
DROP TABLE test.view;
|
|
DROP TABLE test.nats_virtuals_mv
|
|
"""
|
|
)
|
|
|
|
assert TSV(result) == TSV(expected)
|
|
|
|
|
|
def test_nats_many_consumers_to_each_queue(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.destination;
|
|
CREATE TABLE test.destination(key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
"""
|
|
)
|
|
|
|
num_tables = 4
|
|
for table_id in range(num_tables):
|
|
print(("Setting up table {}".format(table_id)))
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.many_consumers_{0};
|
|
DROP TABLE IF EXISTS test.many_consumers_{0}_mv;
|
|
CREATE TABLE test.many_consumers_{0} (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'many_consumers',
|
|
nats_num_consumers = 2,
|
|
nats_queue_group = 'many_consumers',
|
|
nats_format = 'JSONEachRow',
|
|
nats_row_delimiter = '\\n';
|
|
CREATE MATERIALIZED VIEW test.many_consumers_{0}_mv TO test.destination AS
|
|
SELECT key, value FROM test.many_consumers_{0};
|
|
""".format(
|
|
table_id
|
|
)
|
|
)
|
|
while not check_table_is_ready(
|
|
instance, "test.many_consumers_{}".format(table_id)
|
|
):
|
|
logging.debug(
|
|
"Table test.many_consumers_{} is not yet ready".format(table_id)
|
|
)
|
|
time.sleep(0.5)
|
|
|
|
i = [0]
|
|
messages_num = 1000
|
|
|
|
def produce():
|
|
messages = []
|
|
for _ in range(messages_num):
|
|
messages.append(json.dumps({"key": i[0], "value": i[0]}))
|
|
i[0] += 1
|
|
asyncio.run(nats_produce_messages(nats_cluster, "many_consumers", messages))
|
|
|
|
threads = []
|
|
threads_num = 20
|
|
|
|
for _ in range(threads_num):
|
|
threads.append(threading.Thread(target=produce))
|
|
for thread in threads:
|
|
time.sleep(random.uniform(0, 1))
|
|
thread.start()
|
|
|
|
result1 = ""
|
|
while True:
|
|
result1 = instance.query("SELECT count() FROM test.destination")
|
|
time.sleep(1)
|
|
if int(result1) == messages_num * threads_num:
|
|
break
|
|
|
|
for thread in threads:
|
|
thread.join()
|
|
|
|
for consumer_id in range(num_tables):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.many_consumers_{0};
|
|
DROP TABLE test.many_consumers_{0}_mv;
|
|
""".format(
|
|
consumer_id
|
|
)
|
|
)
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.destination;
|
|
"""
|
|
)
|
|
|
|
assert (
|
|
int(result1) == messages_num * threads_num
|
|
), "ClickHouse lost some messages: {}".format(result1)
|
|
|
|
|
|
def test_nats_restore_failed_connection_without_losses_on_write(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.consume;
|
|
CREATE TABLE test.view (key UInt64, value UInt64)
|
|
ENGINE = MergeTree
|
|
ORDER BY key;
|
|
CREATE TABLE test.consume (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'producer_reconnect',
|
|
nats_format = 'JSONEachRow',
|
|
nats_num_consumers = 2,
|
|
nats_row_delimiter = '\\n';
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.consume;
|
|
DROP TABLE IF EXISTS test.producer_reconnect;
|
|
CREATE TABLE test.producer_reconnect (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'producer_reconnect',
|
|
nats_format = 'JSONEachRow',
|
|
nats_row_delimiter = '\\n';
|
|
"""
|
|
)
|
|
while not check_table_is_ready(instance, "test.consume"):
|
|
logging.debug("Table test.consume is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
messages_num = 100000
|
|
values = []
|
|
for i in range(messages_num):
|
|
values.append("({i}, {i})".format(i=i))
|
|
values = ",".join(values)
|
|
|
|
while True:
|
|
try:
|
|
instance.query(
|
|
"INSERT INTO test.producer_reconnect VALUES {}".format(values)
|
|
)
|
|
break
|
|
except QueryRuntimeException as e:
|
|
if "Local: Timed out." in str(e):
|
|
continue
|
|
else:
|
|
raise
|
|
|
|
while int(instance.query("SELECT count() FROM test.view")) == 0:
|
|
time.sleep(0.1)
|
|
|
|
kill_nats(nats_cluster.nats_docker_id)
|
|
time.sleep(4)
|
|
revive_nats(nats_cluster.nats_docker_id, nats_cluster.nats_port)
|
|
|
|
while True:
|
|
result = instance.query("SELECT count(DISTINCT key) FROM test.view")
|
|
time.sleep(1)
|
|
if int(result) == messages_num:
|
|
break
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.consume;
|
|
DROP TABLE test.producer_reconnect;
|
|
"""
|
|
)
|
|
|
|
assert int(result) == messages_num, "ClickHouse lost some messages: {}".format(
|
|
result
|
|
)
|
|
|
|
|
|
def test_nats_no_connection_at_startup_1(nats_cluster):
|
|
# no connection when table is initialized
|
|
nats_cluster.pause_container("nats1")
|
|
instance.query_and_get_error(
|
|
"""
|
|
CREATE TABLE test.cs (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'cs',
|
|
nats_format = 'JSONEachRow',
|
|
nats_num_consumers = '5',
|
|
nats_row_delimiter = '\\n';
|
|
"""
|
|
)
|
|
nats_cluster.unpause_container("nats1")
|
|
|
|
|
|
def test_nats_no_connection_at_startup_2(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.cs (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'cs',
|
|
nats_format = 'JSONEachRow',
|
|
nats_num_consumers = '5',
|
|
nats_row_delimiter = '\\n';
|
|
CREATE TABLE test.view (key UInt64, value UInt64)
|
|
ENGINE = MergeTree
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.cs;
|
|
"""
|
|
)
|
|
|
|
instance.query("DETACH TABLE test.cs")
|
|
nats_cluster.pause_container("nats1")
|
|
instance.query("ATTACH TABLE test.cs")
|
|
nats_cluster.unpause_container("nats1")
|
|
while not check_table_is_ready(instance, "test.cs"):
|
|
logging.debug("Table test.cs is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
messages_num = 1000
|
|
messages = []
|
|
for i in range(messages_num):
|
|
messages.append(json.dumps({"key": i, "value": i}))
|
|
asyncio.run(nats_produce_messages(nats_cluster, "cs", messages))
|
|
|
|
for _ in range(20):
|
|
result = instance.query("SELECT count() FROM test.view")
|
|
time.sleep(1)
|
|
if int(result) == messages_num:
|
|
break
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.consumer;
|
|
DROP TABLE test.cs;
|
|
"""
|
|
)
|
|
|
|
assert int(result) == messages_num, "ClickHouse lost some messages: {}".format(
|
|
result
|
|
)
|
|
|
|
|
|
def test_nats_format_factory_settings(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.format_settings (
|
|
id String, date DateTime
|
|
) ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'format_settings',
|
|
nats_format = 'JSONEachRow',
|
|
date_time_input_format = 'best_effort';
|
|
"""
|
|
)
|
|
while not check_table_is_ready(instance, "test.format_settings"):
|
|
logging.debug("Table test.format_settings is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
message = json.dumps(
|
|
{"id": "format_settings_test", "date": "2021-01-19T14:42:33.1829214Z"}
|
|
)
|
|
expected = instance.query(
|
|
"""SELECT parseDateTimeBestEffort(CAST('2021-01-19T14:42:33.1829214Z', 'String'))"""
|
|
)
|
|
|
|
asyncio.run(nats_produce_messages(nats_cluster, "format_settings", [message]))
|
|
|
|
while True:
|
|
result = instance.query("SELECT date FROM test.format_settings")
|
|
if result == expected:
|
|
break
|
|
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.view (
|
|
id String, date DateTime
|
|
) ENGINE = MergeTree ORDER BY id;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.format_settings;
|
|
"""
|
|
)
|
|
|
|
asyncio.run(nats_produce_messages(nats_cluster, "format_settings", [message]))
|
|
while True:
|
|
result = instance.query("SELECT date FROM test.view")
|
|
if result == expected:
|
|
break
|
|
|
|
instance.query(
|
|
"""
|
|
DROP TABLE test.consumer;
|
|
DROP TABLE test.format_settings;
|
|
"""
|
|
)
|
|
|
|
assert result == expected
|
|
|
|
|
|
def test_nats_bad_args(nats_cluster):
|
|
instance.query_and_get_error(
|
|
"""
|
|
CREATE TABLE test.drop (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_secure = true,
|
|
nats_format = 'JSONEachRow';
|
|
"""
|
|
)
|
|
|
|
|
|
def test_nats_drop_mv(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.nats (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'mv',
|
|
nats_format = 'JSONEachRow';
|
|
CREATE TABLE test.view (key UInt64, value UInt64)
|
|
ENGINE = MergeTree()
|
|
ORDER BY key;
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.nats;
|
|
"""
|
|
)
|
|
while not check_table_is_ready(instance, "test.nats"):
|
|
logging.debug("Table test.nats is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
messages = []
|
|
for i in range(20):
|
|
messages.append(json.dumps({"key": i, "value": i}))
|
|
asyncio.run(nats_produce_messages(nats_cluster, "mv", messages))
|
|
|
|
instance.query("DROP VIEW test.consumer")
|
|
messages = []
|
|
for i in range(20, 40):
|
|
messages.append(json.dumps({"key": i, "value": i}))
|
|
asyncio.run(nats_produce_messages(nats_cluster, "mv", messages))
|
|
|
|
instance.query(
|
|
"""
|
|
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
|
|
SELECT * FROM test.nats;
|
|
"""
|
|
)
|
|
messages = []
|
|
for i in range(40, 50):
|
|
messages.append(json.dumps({"key": i, "value": i}))
|
|
asyncio.run(nats_produce_messages(nats_cluster, "mv", messages))
|
|
|
|
while True:
|
|
result = instance.query("SELECT * FROM test.view ORDER BY key")
|
|
if nats_check_result(result):
|
|
break
|
|
|
|
nats_check_result(result, True)
|
|
|
|
instance.query("DROP VIEW test.consumer")
|
|
messages = []
|
|
for i in range(50, 60):
|
|
messages.append(json.dumps({"key": i, "value": i}))
|
|
asyncio.run(nats_produce_messages(nats_cluster, "mv", messages))
|
|
|
|
count = 0
|
|
while True:
|
|
count = int(instance.query("SELECT count() FROM test.nats"))
|
|
if count:
|
|
break
|
|
|
|
assert count > 0
|
|
|
|
|
|
def test_nats_predefined_configuration(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
CREATE TABLE test.nats (key UInt64, value UInt64)
|
|
ENGINE = NATS(nats1) """
|
|
)
|
|
while not check_table_is_ready(instance, "test.nats"):
|
|
logging.debug("Table test.nats is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
asyncio.run(
|
|
nats_produce_messages(
|
|
nats_cluster, "named", [json.dumps({"key": 1, "value": 2})]
|
|
)
|
|
)
|
|
while True:
|
|
result = instance.query(
|
|
"SELECT * FROM test.nats ORDER BY key", ignore_error=True
|
|
)
|
|
if result == "1\t2\n":
|
|
break
|
|
|
|
|
|
def test_format_with_prefix_and_suffix(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.nats;
|
|
|
|
CREATE TABLE test.nats (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'custom',
|
|
nats_format = 'CustomSeparated';
|
|
"""
|
|
)
|
|
while not check_table_is_ready(instance, "test.nats"):
|
|
logging.debug("Table test.nats is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
insert_messages = []
|
|
|
|
async def sub_to_nats():
|
|
nc = await nats_connect_ssl(
|
|
nats_cluster.nats_port,
|
|
user="click",
|
|
password="house",
|
|
ssl_ctx=nats_cluster.nats_ssl_context,
|
|
)
|
|
sub = await nc.subscribe("custom")
|
|
await sub.unsubscribe(2)
|
|
async for msg in sub.messages:
|
|
insert_messages.append(msg.data.decode())
|
|
|
|
await sub.drain()
|
|
await nc.drain()
|
|
|
|
def run_sub():
|
|
asyncio.run(sub_to_nats())
|
|
|
|
thread = threading.Thread(target=run_sub)
|
|
thread.start()
|
|
time.sleep(1)
|
|
|
|
instance.query(
|
|
"INSERT INTO test.nats select number*10 as key, number*100 as value from numbers(2) settings format_custom_result_before_delimiter='<prefix>\n', format_custom_result_after_delimiter='<suffix>\n'"
|
|
)
|
|
|
|
thread.join()
|
|
|
|
assert (
|
|
"".join(insert_messages)
|
|
== "<prefix>\n0\t0\n<suffix>\n<prefix>\n10\t100\n<suffix>\n"
|
|
)
|
|
|
|
|
|
def test_max_rows_per_message(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.view;
|
|
DROP TABLE IF EXISTS test.nats;
|
|
|
|
CREATE TABLE test.nats (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'custom1',
|
|
nats_format = 'CustomSeparated',
|
|
nats_max_rows_per_message = 3,
|
|
format_custom_result_before_delimiter = '<prefix>\n',
|
|
format_custom_result_after_delimiter = '<suffix>\n';
|
|
|
|
CREATE MATERIALIZED VIEW test.view Engine=Log AS
|
|
SELECT key, value FROM test.nats;
|
|
"""
|
|
)
|
|
while not check_table_is_ready(instance, "test.nats"):
|
|
logging.debug("Table test.nats is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
num_rows = 5
|
|
|
|
insert_messages = []
|
|
|
|
async def sub_to_nats():
|
|
nc = await nats_connect_ssl(
|
|
nats_cluster.nats_port,
|
|
user="click",
|
|
password="house",
|
|
ssl_ctx=nats_cluster.nats_ssl_context,
|
|
)
|
|
sub = await nc.subscribe("custom1")
|
|
await sub.unsubscribe(2)
|
|
async for msg in sub.messages:
|
|
insert_messages.append(msg.data.decode())
|
|
|
|
await sub.drain()
|
|
await nc.drain()
|
|
|
|
def run_sub():
|
|
asyncio.run(sub_to_nats())
|
|
|
|
thread = threading.Thread(target=run_sub)
|
|
thread.start()
|
|
time.sleep(1)
|
|
|
|
instance.query(
|
|
f"INSERT INTO test.nats select number*10 as key, number*100 as value from numbers({num_rows}) settings format_custom_result_before_delimiter='<prefix>\n', format_custom_result_after_delimiter='<suffix>\n'"
|
|
)
|
|
|
|
thread.join()
|
|
|
|
assert (
|
|
"".join(insert_messages)
|
|
== "<prefix>\n0\t0\n10\t100\n20\t200\n<suffix>\n<prefix>\n30\t300\n40\t400\n<suffix>\n"
|
|
)
|
|
|
|
attempt = 0
|
|
rows = 0
|
|
while attempt < 100:
|
|
rows = int(instance.query("SELECT count() FROM test.view"))
|
|
if rows == num_rows:
|
|
break
|
|
attempt += 1
|
|
|
|
assert rows == num_rows
|
|
|
|
result = instance.query("SELECT * FROM test.view")
|
|
assert result == "0\t0\n10\t100\n20\t200\n30\t300\n40\t400\n"
|
|
|
|
|
|
def test_row_based_formats(nats_cluster):
|
|
num_rows = 10
|
|
|
|
for format_name in [
|
|
"TSV",
|
|
"TSVWithNamesAndTypes",
|
|
"TSKV",
|
|
"CSV",
|
|
"CSVWithNamesAndTypes",
|
|
"CustomSeparatedWithNamesAndTypes",
|
|
"Values",
|
|
"JSON",
|
|
"JSONEachRow",
|
|
"JSONCompactEachRow",
|
|
"JSONCompactEachRowWithNamesAndTypes",
|
|
"JSONObjectEachRow",
|
|
"Avro",
|
|
"RowBinary",
|
|
"RowBinaryWithNamesAndTypes",
|
|
"MsgPack",
|
|
]:
|
|
print(format_name)
|
|
|
|
instance.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS test.view;
|
|
DROP TABLE IF EXISTS test.nats;
|
|
|
|
CREATE TABLE test.nats (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = '{format_name}',
|
|
nats_format = '{format_name}';
|
|
|
|
CREATE MATERIALIZED VIEW test.view Engine=Log AS
|
|
SELECT key, value FROM test.nats;
|
|
"""
|
|
)
|
|
|
|
while not check_table_is_ready(instance, "test.nats"):
|
|
logging.debug("Table test.nats is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
insert_messages = 0
|
|
|
|
async def sub_to_nats():
|
|
nc = await nats_connect_ssl(
|
|
nats_cluster.nats_port,
|
|
user="click",
|
|
password="house",
|
|
ssl_ctx=nats_cluster.nats_ssl_context,
|
|
)
|
|
sub = await nc.subscribe(format_name)
|
|
await sub.unsubscribe(2)
|
|
async for msg in sub.messages:
|
|
nonlocal insert_messages
|
|
insert_messages += 1
|
|
|
|
await sub.drain()
|
|
await nc.drain()
|
|
|
|
def run_sub():
|
|
asyncio.run(sub_to_nats())
|
|
|
|
thread = threading.Thread(target=run_sub)
|
|
thread.start()
|
|
time.sleep(1)
|
|
|
|
instance.query(
|
|
f"INSERT INTO test.nats select number*10 as key, number*100 as value from numbers({num_rows})"
|
|
)
|
|
|
|
thread.join()
|
|
|
|
assert insert_messages == 2
|
|
|
|
attempt = 0
|
|
rows = 0
|
|
while attempt < 100:
|
|
rows = int(instance.query("SELECT count() FROM test.view"))
|
|
if rows == num_rows:
|
|
break
|
|
attempt += 1
|
|
|
|
assert rows == num_rows
|
|
|
|
expected = ""
|
|
for i in range(num_rows):
|
|
expected += str(i * 10) + "\t" + str(i * 100) + "\n"
|
|
|
|
result = instance.query("SELECT * FROM test.view")
|
|
assert result == expected
|
|
|
|
|
|
def test_block_based_formats_1(nats_cluster):
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.nats;
|
|
|
|
CREATE TABLE test.nats (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = 'PrettySpace',
|
|
nats_format = 'PrettySpace';
|
|
"""
|
|
)
|
|
|
|
insert_messages = []
|
|
|
|
async def sub_to_nats():
|
|
nc = await nats_connect_ssl(
|
|
nats_cluster.nats_port,
|
|
user="click",
|
|
password="house",
|
|
ssl_ctx=nats_cluster.nats_ssl_context,
|
|
)
|
|
sub = await nc.subscribe("PrettySpace")
|
|
await sub.unsubscribe(3)
|
|
async for msg in sub.messages:
|
|
insert_messages.append(msg.data.decode())
|
|
|
|
await sub.drain()
|
|
await nc.drain()
|
|
|
|
def run_sub():
|
|
asyncio.run(sub_to_nats())
|
|
|
|
thread = threading.Thread(target=run_sub)
|
|
thread.start()
|
|
time.sleep(1)
|
|
|
|
attempt = 0
|
|
while attempt < 100:
|
|
try:
|
|
instance.query(
|
|
"INSERT INTO test.nats SELECT number * 10 as key, number * 100 as value FROM numbers(5) settings max_block_size=2, optimize_trivial_insert_select=0;"
|
|
)
|
|
break
|
|
except Exception:
|
|
logging.debug("Table test.nats is not yet ready")
|
|
time.sleep(0.5)
|
|
attempt += 1
|
|
thread.join()
|
|
|
|
data = []
|
|
for message in insert_messages:
|
|
splitted = message.split("\n")
|
|
assert splitted[0] == " \x1b[1mkey\x1b[0m \x1b[1mvalue\x1b[0m"
|
|
assert splitted[1] == ""
|
|
assert splitted[-1] == ""
|
|
data += [line.split() for line in splitted[2:-1]]
|
|
|
|
assert data == [
|
|
["0", "0"],
|
|
["10", "100"],
|
|
["20", "200"],
|
|
["30", "300"],
|
|
["40", "400"],
|
|
]
|
|
|
|
|
|
def test_block_based_formats_2(nats_cluster):
|
|
num_rows = 100
|
|
|
|
for format_name in [
|
|
"JSONColumns",
|
|
"Native",
|
|
"Arrow",
|
|
"Parquet",
|
|
"ORC",
|
|
"JSONCompactColumns",
|
|
]:
|
|
print(format_name)
|
|
|
|
instance.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS test.view;
|
|
DROP TABLE IF EXISTS test.nats;
|
|
|
|
CREATE TABLE test.nats (key UInt64, value UInt64)
|
|
ENGINE = NATS
|
|
SETTINGS nats_url = 'nats1:4444',
|
|
nats_subjects = '{format_name}',
|
|
nats_format = '{format_name}';
|
|
|
|
CREATE MATERIALIZED VIEW test.view Engine=Log AS
|
|
SELECT key, value FROM test.nats;
|
|
"""
|
|
)
|
|
|
|
while not check_table_is_ready(instance, "test.nats"):
|
|
logging.debug("Table test.nats is not yet ready")
|
|
time.sleep(0.5)
|
|
|
|
insert_messages = 0
|
|
|
|
async def sub_to_nats():
|
|
nc = await nats_connect_ssl(
|
|
nats_cluster.nats_port,
|
|
user="click",
|
|
password="house",
|
|
ssl_ctx=nats_cluster.nats_ssl_context,
|
|
)
|
|
sub = await nc.subscribe(format_name)
|
|
await sub.unsubscribe(9)
|
|
async for msg in sub.messages:
|
|
nonlocal insert_messages
|
|
insert_messages += 1
|
|
|
|
await sub.drain()
|
|
await nc.drain()
|
|
|
|
def run_sub():
|
|
asyncio.run(sub_to_nats())
|
|
|
|
thread = threading.Thread(target=run_sub)
|
|
thread.start()
|
|
time.sleep(1)
|
|
|
|
instance.query(
|
|
f"INSERT INTO test.nats SELECT number * 10 as key, number * 100 as value FROM numbers({num_rows}) settings max_block_size=12, optimize_trivial_insert_select=0;"
|
|
)
|
|
|
|
thread.join()
|
|
|
|
assert insert_messages == 9
|
|
|
|
attempt = 0
|
|
rows = 0
|
|
while attempt < 100:
|
|
rows = int(instance.query("SELECT count() FROM test.view"))
|
|
if rows == num_rows:
|
|
break
|
|
attempt += 1
|
|
|
|
assert rows == num_rows
|
|
|
|
result = instance.query("SELECT * FROM test.view ORDER by key")
|
|
expected = ""
|
|
for i in range(num_rows):
|
|
expected += str(i * 10) + "\t" + str(i * 100) + "\n"
|
|
assert result == expected
|
|
|
|
|
|
if __name__ == "__main__":
|
|
cluster.start()
|
|
input("Cluster created, press any key to destroy...")
|
|
cluster.shutdown()
|