import pytest

# FIXME This test is too flaky
# https://github.com/ClickHouse/ClickHouse/issues/39185
pytestmark = pytest.mark.skip

import json
import os.path as p
import random
import subprocess
import threading
import logging
import time
from random import randrange
import math
import asyncio

from google.protobuf.internal.encoder import _VarintBytes

from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster, check_nats_is_available, nats_connect_ssl
from helpers.test_tools import TSV

from . import nats_pb2

# Single ClickHouse instance wired to a NATS broker (with_nats=True);
# configs provide the NATS endpoint, macros and a named collection.
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
    "instance",
    main_configs=[
        "configs/nats.xml",
        "configs/macros.xml",
        "configs/named_collection.xml",
    ],
    user_configs=["configs/users.xml"],
    with_nats=True,
    clickhouse_path_dir="clickhouse_path",
)
# Helpers


def wait_nats_to_start(nats_port, ssl_ctx=None, timeout=180):
    """Block until the NATS broker on ``nats_port`` accepts connections.

    Polls every 0.5 s; gives up silently after ``timeout`` seconds.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            if asyncio.run(check_nats_is_available(nats_port, ssl_ctx=ssl_ctx)):
                logging.debug("NATS is available")
                return
            time.sleep(0.5)
        except Exception as ex:
            logging.debug("Can't connect to NATS " + str(ex))
            time.sleep(0.5)
def nats_check_result(result, check=False, ref_file="test_nats_json.reference"):
    """Compare ``result`` with the reference file next to this test.

    With ``check=True`` assert equality; otherwise return the boolean outcome.
    """
    reference_path = p.join(p.dirname(__file__), ref_file)
    with open(reference_path) as reference:
        matches = TSV(result) == TSV(reference)
    if check:
        assert matches
    else:
        return matches
def kill_nats(nats_id):
    """Stop the NATS docker container; return True iff `docker stop` succeeded."""
    # Renamed local from `p`, which shadowed the module alias `import os.path as p`.
    proc = subprocess.Popen(("docker", "stop", nats_id), stdout=subprocess.PIPE)
    proc.communicate()
    return proc.returncode == 0
def revive_nats(nats_id, nats_port):
    """Restart a previously stopped NATS container and wait until it is reachable."""
    # Renamed local from `p`, which shadowed the module alias `import os.path as p`.
    proc = subprocess.Popen(("docker", "start", nats_id), stdout=subprocess.PIPE)
    proc.communicate()
    wait_nats_to_start(nats_port)
# Fixtures


@pytest.fixture(scope="module")
def nats_cluster():
    """Start the whole cluster once per module; always shut it down at the end."""
    try:
        cluster.start()
        logging.debug("nats_id is {}".format(instance.cluster.nats_docker_id))
        instance.query("CREATE DATABASE test")
        yield cluster
    finally:
        cluster.shutdown()
@pytest.fixture(autouse=True)
def nats_setup_teardown():
    """Give every test a fresh `test` database (recreated after each test)."""
    print("NATS is available - running test")
    yield  # run test
    instance.query("DROP DATABASE test SYNC")
    instance.query("CREATE DATABASE test")
# Tests


async def nats_produce_messages(cluster_inst, subject, messages=(), bytes=None):
    """Publish every string in ``messages`` (encoded) and, optionally, one raw
    ``bytes`` payload to ``subject`` over an SSL NATS connection.

    NOTE: the ``bytes`` parameter shadows the builtin; it is kept unchanged
    because callers pass it by keyword (``bytes=data``).
    """
    nc = await nats_connect_ssl(
        cluster_inst.nats_port,
        user="click",
        password="house",
        ssl_ctx=cluster_inst.nats_ssl_context,
    )
    logging.debug("NATS connection status: " + str(nc.is_connected))

    for message in messages:
        await nc.publish(subject, message.encode())
    if bytes is not None:
        await nc.publish(subject, bytes)
    # Fixed log-message typo: "publising" -> "publishing".
    logging.debug("Finished publishing to " + subject)

    await nc.close()
    return messages
def check_table_is_ready(instance, table_name):
    """Return True once ``SELECT * FROM table_name`` succeeds on ``instance``."""
    try:
        instance.query("SELECT * FROM {}".format(table_name))
    except Exception:
        return False
    return True
def test_nats_select(nats_cluster):
    """Basic consumption: rows published to NATS can be read with SELECT."""
    instance.query(
        """
        CREATE TABLE test.nats (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'select',
                     nats_format = 'JSONEachRow',
                     nats_row_delimiter = '\\n';
        """
    )
    while not check_table_is_ready(instance, "test.nats"):
        logging.debug("Table test.nats is not yet ready")
        time.sleep(0.5)

    messages = [json.dumps({"key": i, "value": i}) for i in range(50)]
    asyncio.run(nats_produce_messages(nats_cluster, "select", messages))

    # The order of messages in select * from test.nats is not guaranteed,
    # so sleep to collect everything in one select
    time.sleep(1)

    result = ""
    # Bounded wait (was an unbounded `while True`), matching the deadline
    # pattern used by the other tests in this file.
    deadline = time.monotonic() + 60
    while time.monotonic() < deadline:
        result += instance.query(
            "SELECT * FROM test.nats ORDER BY key", ignore_error=True
        )
        if nats_check_result(result):
            break

    nats_check_result(result, True)
def test_nats_select_empty(nats_cluster):
    """A freshly created NATS table with nothing published must be empty."""
    instance.query(
        """
        CREATE TABLE test.nats (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'empty',
                     nats_format = 'TSV',
                     nats_row_delimiter = '\\n';
        """
    )

    assert int(instance.query("SELECT count() FROM test.nats")) == 0
def test_nats_json_without_delimiter(nats_cluster):
    """Multi-row JSONEachRow payloads are parsed without an explicit row delimiter."""
    instance.query(
        """
        CREATE TABLE test.nats (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'json',
                     nats_format = 'JSONEachRow';
        """
    )
    while not check_table_is_ready(instance, "test.nats"):
        logging.debug("Table test.nats is not yet ready")
        time.sleep(0.5)

    # Two multi-row payloads: keys 0..24 and 25..49, one publish each.
    for lo, hi in ((0, 25), (25, 50)):
        payload = "".join(
            json.dumps({"key": i, "value": i}) + "\n" for i in range(lo, hi)
        )
        asyncio.run(nats_produce_messages(nats_cluster, "json", [payload]))

    time.sleep(1)

    result = ""
    deadline = time.monotonic() + 60
    while time.monotonic() < deadline:
        result += instance.query(
            "SELECT * FROM test.nats ORDER BY key", ignore_error=True
        )
        if nats_check_result(result):
            break

    nats_check_result(result, True)
def test_nats_csv_with_delimiter(nats_cluster):
    """CSV rows with a '\\n' delimiter are consumed correctly."""
    instance.query(
        """
        CREATE TABLE test.nats (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'csv',
                     nats_format = 'CSV',
                     nats_row_delimiter = '\\n';
        """
    )
    while not check_table_is_ready(instance, "test.nats"):
        logging.debug("Table test.nats is not yet ready")
        time.sleep(0.5)

    messages = ["{i}, {i}".format(i=i) for i in range(50)]
    asyncio.run(nats_produce_messages(nats_cluster, "csv", messages))

    time.sleep(1)

    result = ""
    for _ in range(60):
        result += instance.query(
            "SELECT * FROM test.nats ORDER BY key", ignore_error=True
        )
        if nats_check_result(result):
            break

    nats_check_result(result, True)
def test_nats_tsv_with_delimiter(nats_cluster):
    """TSV rows reach the MergeTree target through a materialized view."""
    instance.query(
        """
        CREATE TABLE test.nats (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'tsv',
                     nats_format = 'TSV',
                     nats_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.nats;
        """
    )
    while not check_table_is_ready(instance, "test.nats"):
        logging.debug("Table test.nats is not yet ready")
        time.sleep(0.5)

    messages = ["{i}\t{i}".format(i=i) for i in range(50)]
    asyncio.run(nats_produce_messages(nats_cluster, "tsv", messages))

    result = ""
    for _ in range(60):
        result = instance.query("SELECT * FROM test.view ORDER BY key")
        if nats_check_result(result):
            break

    nats_check_result(result, True)
def test_nats_macros(nats_cluster):
    """NATS settings given as macro placeholders (presumably expanded from
    configs/macros.xml loaded by the instance — confirm against that file)."""
    instance.query(
        """
        CREATE TABLE test.nats (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = '{nats_url}',
                     nats_subjects = '{nats_subjects}',
                     nats_format = '{nats_format}'
        """
    )
    while not check_table_is_ready(instance, "test.nats"):
        logging.debug("Table test.nats is not yet ready")
        time.sleep(0.5)

    message = "".join(json.dumps({"key": i, "value": i}) + "\n" for i in range(50))
    asyncio.run(nats_produce_messages(nats_cluster, "macro", [message]))

    time.sleep(1)

    result = ""
    for _ in range(60):
        result += instance.query(
            "SELECT * FROM test.nats ORDER BY key", ignore_error=True
        )
        if nats_check_result(result):
            break

    nats_check_result(result, True)
def test_nats_materialized_view(nats_cluster):
    """Two materialized views on one NATS table both receive every row."""
    instance.query(
        """
        CREATE TABLE test.nats (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'mv',
                     nats_format = 'JSONEachRow',
                     nats_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.nats;
        CREATE TABLE test.view2 (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer2 TO test.view2 AS
            SELECT * FROM test.nats group by (key, value);
        """
    )
    while not check_table_is_ready(instance, "test.nats"):
        logging.debug("Table test.nats is not yet ready")
        time.sleep(0.5)

    messages = [json.dumps({"key": i, "value": i}) for i in range(50)]
    asyncio.run(nats_produce_messages(nats_cluster, "mv", messages))

    # Each target table must eventually hold the full reference data set.
    for view_name in ("test.view", "test.view2"):
        deadline = time.monotonic() + 60
        while time.monotonic() < deadline:
            result = instance.query(
                "SELECT * FROM {} ORDER BY key".format(view_name)
            )
            if nats_check_result(result):
                break
        nats_check_result(result, True)
def test_nats_materialized_view_with_subquery(nats_cluster):
    """A materialized view whose SELECT wraps the NATS table in a subquery."""
    instance.query(
        """
        CREATE TABLE test.nats (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'mvsq',
                     nats_format = 'JSONEachRow',
                     nats_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM (SELECT * FROM test.nats);
        """
    )
    while not check_table_is_ready(instance, "test.nats"):
        logging.debug("Table test.nats is not yet ready")
        time.sleep(0.5)

    messages = [json.dumps({"key": i, "value": i}) for i in range(50)]
    asyncio.run(nats_produce_messages(nats_cluster, "mvsq", messages))

    deadline = time.monotonic() + 60
    while time.monotonic() < deadline:
        result = instance.query("SELECT * FROM test.view ORDER BY key")
        if nats_check_result(result):
            break

    nats_check_result(result, True)
def test_nats_many_materialized_views(nats_cluster):
    """Two independent consumers (MV -> table) on the same NATS source."""
    instance.query(
        """
        DROP TABLE IF EXISTS test.view1;
        DROP TABLE IF EXISTS test.view2;
        DROP TABLE IF EXISTS test.consumer1;
        DROP TABLE IF EXISTS test.consumer2;
        CREATE TABLE test.nats (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'mmv',
                     nats_format = 'JSONEachRow',
                     nats_row_delimiter = '\\n';
        CREATE TABLE test.view1 (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE TABLE test.view2 (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer1 TO test.view1 AS
            SELECT * FROM test.nats;
        CREATE MATERIALIZED VIEW test.consumer2 TO test.view2 AS
            SELECT * FROM test.nats;
        """
    )
    while not check_table_is_ready(instance, "test.nats"):
        logging.debug("Table test.nats is not yet ready")
        time.sleep(0.5)

    messages = [json.dumps({"key": i, "value": i}) for i in range(50)]
    asyncio.run(nats_produce_messages(nats_cluster, "mmv", messages))

    deadline = time.monotonic() + 60
    while time.monotonic() < deadline:
        result1 = instance.query("SELECT * FROM test.view1 ORDER BY key")
        result2 = instance.query("SELECT * FROM test.view2 ORDER BY key")
        if nats_check_result(result1) and nats_check_result(result2):
            break

    instance.query(
        """
        DROP TABLE test.consumer1;
        DROP TABLE test.consumer2;
        DROP TABLE test.view1;
        DROP TABLE test.view2;
        """
    )

    nats_check_result(result1, True)
    nats_check_result(result2, True)
def test_nats_protobuf(nats_cluster):
    """Varint-length-prefixed Protobuf messages end up in the MV target table.

    The serialization loop was copy-pasted three times in the original; it is
    factored into a local helper, keeping the same three publishes.
    """
    instance.query(
        """
        CREATE TABLE test.nats (key UInt64, value String)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'pb',
                     nats_format = 'Protobuf',
                     nats_schema = 'nats.proto:ProtoKeyValue';
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.nats;
        """
    )
    while not check_table_is_ready(instance, "test.nats"):
        logging.debug("Table test.nats is not yet ready")
        time.sleep(0.5)

    def serialize_range(begin, end):
        # One payload: a varint-length-prefixed ProtoKeyValue per key.
        data = b""
        for i in range(begin, end):
            msg = nats_pb2.ProtoKeyValue()
            msg.key = i
            msg.value = str(i)
            serialized_msg = msg.SerializeToString()
            data += _VarintBytes(len(serialized_msg)) + serialized_msg
        return data

    # Same key ranges as before (0..19, 20, 21..49), one publish each.
    for begin, end in ((0, 20), (20, 21), (21, 50)):
        asyncio.run(
            nats_produce_messages(
                nats_cluster, "pb", bytes=serialize_range(begin, end)
            )
        )

    result = ""
    deadline = time.monotonic() + 60
    while time.monotonic() < deadline:
        result = instance.query("SELECT * FROM test.view ORDER BY key")
        if nats_check_result(result):
            break

    nats_check_result(result, True)
def test_nats_big_message(nats_cluster):
    """Large multi-row payloads (~100 KB each) are consumed without loss."""
    # Create batches of messages of size ~100Kb
    nats_messages = 1000
    batch_messages = 1000
    messages = [
        json.dumps({"key": i, "value": "x" * 100}) * batch_messages
        for i in range(nats_messages)
    ]

    instance.query(
        """
        CREATE TABLE test.nats (key UInt64, value String)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'big',
                     nats_format = 'JSONEachRow';
        CREATE TABLE test.view (key UInt64, value String)
            ENGINE = MergeTree
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.nats;
        """
    )
    while not check_table_is_ready(instance, "test.nats"):
        logging.debug("Table test.nats is not yet ready")
        time.sleep(0.5)

    asyncio.run(nats_produce_messages(nats_cluster, "big", messages))

    # Bounded wait (was an unbounded `while True`): on a stuck consumer the
    # assert below fails instead of hanging the test forever.
    deadline = time.monotonic() + 300
    while time.monotonic() < deadline:
        result = instance.query("SELECT count() FROM test.view")
        if int(result) == batch_messages * nats_messages:
            break

    assert (
        int(result) == nats_messages * batch_messages
    ), "ClickHouse lost some messages: {}".format(result)
def test_nats_mv_combo(nats_cluster):
    """Several materialized views and several consumers fed by 20 producer threads."""
    NUM_MV = 5
    NUM_CONSUMERS = 4

    instance.query(
        """
        CREATE TABLE test.nats (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'combo',
                     nats_num_consumers = {},
                     nats_format = 'JSONEachRow',
                     nats_row_delimiter = '\\n';
        """.format(
            NUM_CONSUMERS
        )
    )
    while not check_table_is_ready(instance, "test.nats"):
        logging.debug("Table test.nats is not yet ready")
        time.sleep(0.5)

    for mv_id in range(NUM_MV):
        instance.query(
            """
            DROP TABLE IF EXISTS test.combo_{0};
            DROP TABLE IF EXISTS test.combo_{0}_mv;
            CREATE TABLE test.combo_{0} (key UInt64, value UInt64)
                ENGINE = MergeTree()
                ORDER BY key;
            CREATE MATERIALIZED VIEW test.combo_{0}_mv TO test.combo_{0} AS
                SELECT * FROM test.nats;
            """.format(
                mv_id
            )
        )

    time.sleep(2)

    # Shared mutable counter so every producer thread emits distinct keys.
    i = [0]
    messages_num = 10000

    def produce():
        messages = []
        for _ in range(messages_num):
            messages.append(json.dumps({"key": i[0], "value": i[0]}))
            i[0] += 1
        asyncio.run(nats_produce_messages(nats_cluster, "combo", messages))

    threads = []
    threads_num = 20
    for _ in range(threads_num):
        threads.append(threading.Thread(target=produce))
    for thread in threads:
        time.sleep(random.uniform(0, 1))
        thread.start()

    # Bounded wait (was an unbounded `while True`): on a stuck consumer the
    # assert below fails instead of hanging the test forever.
    deadline = time.monotonic() + 300
    while time.monotonic() < deadline:
        result = 0
        for mv_id in range(NUM_MV):
            result += int(
                instance.query("SELECT count() FROM test.combo_{0}".format(mv_id))
            )
        if int(result) == messages_num * threads_num * NUM_MV:
            break
        time.sleep(1)

    for thread in threads:
        thread.join()

    for mv_id in range(NUM_MV):
        instance.query(
            """
            DROP TABLE test.combo_{0}_mv;
            DROP TABLE test.combo_{0};
            """.format(
                mv_id
            )
        )

    assert (
        int(result) == messages_num * threads_num * NUM_MV
    ), "ClickHouse lost some messages: {}".format(result)
def test_nats_insert(nats_cluster):
    """INSERT into a NATS table publishes the rows; verify via a raw subscriber."""
    instance.query(
        """
        CREATE TABLE test.nats (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'insert',
                     nats_format = 'TSV',
                     nats_row_delimiter = '\\n';
        """
    )
    while not check_table_is_ready(instance, "test.nats"):
        logging.debug("Table test.nats is not yet ready")
        time.sleep(0.5)

    values = ",".join("({i}, {i})".format(i=i) for i in range(50))

    insert_messages = []

    async def sub_to_nats():
        # Collect exactly 50 messages from the 'insert' subject, then drain.
        nc = await nats_connect_ssl(
            nats_cluster.nats_port,
            user="click",
            password="house",
            ssl_ctx=nats_cluster.nats_ssl_context,
        )
        sub = await nc.subscribe("insert")
        await sub.unsubscribe(50)
        async for msg in sub.messages:
            insert_messages.append(msg.data.decode())
        await sub.drain()
        await nc.drain()

    def run_sub():
        asyncio.run(sub_to_nats())

    thread = threading.Thread(target=run_sub)
    thread.start()
    time.sleep(1)

    # Retry only on client-side timeouts; re-raise anything else.
    while True:
        try:
            instance.query("INSERT INTO test.nats VALUES {}".format(values))
            break
        except QueryRuntimeException as e:
            if "Local: Timed out." in str(e):
                continue
            raise

    thread.join()
    nats_check_result("\n".join(insert_messages), True)
def test_nats_many_subjects_insert_wrong(nats_cluster):
    """Inserts must be rejected without a subject, into wildcard subjects,
    and into subjects that are not among the engine's subjects."""
    instance.query(
        """
        CREATE TABLE test.nats (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'insert1,insert2.>,insert3.*.foo',
                     nats_format = 'TSV',
                     nats_row_delimiter = '\\n';
        """
    )
    while not check_table_is_ready(instance, "test.nats"):
        logging.debug("Table test.nats is not yet ready")
        time.sleep(0.5)

    values = ",".join("({i}, {i})".format(i=i) for i in range(50))

    # no subject specified
    instance.query_and_get_error("INSERT INTO test.nats VALUES {}".format(values))

    # Wildcard subjects ('insert2.>', 'insert3.*.foo') and subjects that are
    # not among the engine's subjects must all fail the same way.
    for bad_subject in (
        "insert2.>",
        "insert3.*.foo",
        "insert4",
        "insert3.foo.baz",
        "foo.insert2",
    ):
        instance.query_and_get_error(
            "INSERT INTO test.nats SETTINGS stream_like_engine_insert_queue='{}' VALUES {}".format(
                bad_subject, values
            )
        )
def test_nats_many_subjects_insert_right(nats_cluster):
    """Insert into one concrete subject of a multi-subject engine and verify
    the published rows with a raw subscriber."""
    instance.query(
        """
        CREATE TABLE test.nats (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'right_insert1,right_insert2',
                     nats_format = 'TSV',
                     nats_row_delimiter = '\\n';
        """
    )
    while not check_table_is_ready(instance, "test.nats"):
        logging.debug("Table test.nats is not yet ready")
        time.sleep(0.5)

    values = ",".join("({i}, {i})".format(i=i) for i in range(50))

    insert_messages = []

    async def sub_to_nats():
        # Collect exactly 50 messages from 'right_insert1', then drain.
        nc = await nats_connect_ssl(
            nats_cluster.nats_port,
            user="click",
            password="house",
            ssl_ctx=nats_cluster.nats_ssl_context,
        )
        sub = await nc.subscribe("right_insert1")
        await sub.unsubscribe(50)
        async for msg in sub.messages:
            insert_messages.append(msg.data.decode())
        await sub.drain()
        await nc.drain()

    def run_sub():
        asyncio.run(sub_to_nats())

    thread = threading.Thread(target=run_sub)
    thread.start()
    time.sleep(1)

    # Retry only on client-side timeouts; re-raise anything else.
    while True:
        try:
            instance.query(
                "INSERT INTO test.nats SETTINGS stream_like_engine_insert_queue='right_insert1' VALUES {}".format(
                    values
                )
            )
            break
        except QueryRuntimeException as e:
            if "Local: Timed out." in str(e):
                continue
            raise

    thread.join()
    nats_check_result("\n".join(insert_messages), True)
def test_nats_many_inserts(nats_cluster):
    """10 threads each insert 10k rows through one NATS table; a second NATS
    table consumes them back into a MergeTree via a materialized view."""
    instance.query(
        """
        DROP TABLE IF EXISTS test.nats_many;
        DROP TABLE IF EXISTS test.nats_consume;
        DROP TABLE IF EXISTS test.view_many;
        DROP TABLE IF EXISTS test.consumer_many;
        CREATE TABLE test.nats_many (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'many_inserts',
                     nats_format = 'TSV',
                     nats_row_delimiter = '\\n';
        CREATE TABLE test.nats_consume (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'many_inserts',
                     nats_format = 'TSV',
                     nats_row_delimiter = '\\n';
        CREATE TABLE test.view_many (key UInt64, value UInt64)
            ENGINE = MergeTree
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer_many TO test.view_many AS
            SELECT * FROM test.nats_consume;
        """
    )
    while not check_table_is_ready(instance, "test.nats_consume"):
        logging.debug("Table test.nats_consume is not yet ready")
        time.sleep(0.5)

    messages_num = 10000
    values = ",".join("({i}, {i})".format(i=i) for i in range(messages_num))

    def insert():
        # Retry only on client-side timeouts; re-raise anything else.
        while True:
            try:
                instance.query("INSERT INTO test.nats_many VALUES {}".format(values))
                break
            except QueryRuntimeException as e:
                if "Local: Timed out." in str(e):
                    continue
                raise

    threads = []
    threads_num = 10
    for _ in range(threads_num):
        threads.append(threading.Thread(target=insert))
    for thread in threads:
        time.sleep(random.uniform(0, 1))
        thread.start()
    for thread in threads:
        thread.join()

    deadline = time.monotonic() + 300
    while time.monotonic() < deadline:
        result = instance.query("SELECT count() FROM test.view_many")
        print(result, messages_num * threads_num)
        if int(result) >= messages_num * threads_num:
            break
        time.sleep(1)

    instance.query(
        """
        DROP TABLE test.nats_consume;
        DROP TABLE test.nats_many;
        DROP TABLE test.consumer_many;
        DROP TABLE test.view_many;
        """
    )

    assert (
        int(result) == messages_num * threads_num
    ), "ClickHouse lost some messages or got duplicated ones. Total count: {}".format(
        result
    )
def test_nats_overloaded_insert(nats_cluster):
    """Heavy concurrent inserts (5 threads x 100k rows) through one NATS table
    consumed back by another with multiple consumers and a large block size."""
    instance.query(
        """
        DROP TABLE IF EXISTS test.view_overload;
        DROP TABLE IF EXISTS test.consumer_overload;
        DROP TABLE IF EXISTS test.nats_consume;
        CREATE TABLE test.nats_consume (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'over',
                     nats_num_consumers = 5,
                     nats_max_block_size = 10000,
                     nats_format = 'TSV',
                     nats_row_delimiter = '\\n';
        CREATE TABLE test.nats_overload (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'over',
                     nats_format = 'TSV',
                     nats_row_delimiter = '\\n';
        CREATE TABLE test.view_overload (key UInt64, value UInt64)
            ENGINE = MergeTree
            ORDER BY key
            SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3,
            cleanup_thread_preferred_points_per_iteration=0;
        CREATE MATERIALIZED VIEW test.consumer_overload TO test.view_overload AS
            SELECT * FROM test.nats_consume;
        """
    )
    while not check_table_is_ready(instance, "test.nats_consume"):
        logging.debug("Table test.nats_consume is not yet ready")
        time.sleep(0.5)

    messages_num = 100000

    def insert():
        values = ",".join("({i}, {i})".format(i=i) for i in range(messages_num))
        # Retry only on client-side timeouts; re-raise anything else.
        while True:
            try:
                instance.query(
                    "INSERT INTO test.nats_overload VALUES {}".format(values)
                )
                break
            except QueryRuntimeException as e:
                if "Local: Timed out." in str(e):
                    continue
                raise

    threads = []
    threads_num = 5
    for _ in range(threads_num):
        threads.append(threading.Thread(target=insert))
    for thread in threads:
        time.sleep(random.uniform(0, 1))
        thread.start()

    deadline = time.monotonic() + 300
    while time.monotonic() < deadline:
        result = instance.query("SELECT count() FROM test.view_overload")
        time.sleep(1)
        if int(result) >= messages_num * threads_num:
            break

    instance.query(
        """
        DROP TABLE test.consumer_overload;
        DROP TABLE test.view_overload;
        DROP TABLE test.nats_consume;
        DROP TABLE test.nats_overload;
        """
    )

    for thread in threads:
        thread.join()

    assert (
        int(result) == messages_num * threads_num
    ), "ClickHouse lost some messages or got duplicated ones. Total count: {}".format(
        result
    )
def test_nats_virtual_column(nats_cluster):
    """The _subject virtual column is exposed to a Log-engine materialized view."""
    instance.query(
        """
        CREATE TABLE test.nats_virtuals (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'virtuals',
                     nats_format = 'JSONEachRow';
        CREATE MATERIALIZED VIEW test.view Engine=Log AS
            SELECT value, key, _subject FROM test.nats_virtuals;
        """
    )
    while not check_table_is_ready(instance, "test.nats_virtuals"):
        logging.debug("Table test.nats_virtuals is not yet ready")
        time.sleep(0.5)

    message_num = 10
    messages = [json.dumps({"key": i, "value": i}) for i in range(message_num)]
    asyncio.run(nats_produce_messages(nats_cluster, "virtuals", messages))

    while True:
        result = instance.query("SELECT count() FROM test.view")
        time.sleep(1)
        if int(result) == message_num:
            break

    result = instance.query(
        """
        SELECT key, value, _subject
        FROM test.view ORDER BY key
        """
    )
    # Every row must carry its originating subject in _subject.
    expected = "".join(
        "{0}\t{0}\tvirtuals\n".format(i) for i in range(message_num)
    )

    instance.query(
        """
        DROP TABLE test.nats_virtuals;
        DROP TABLE test.view;
        """
    )

    assert TSV(result) == TSV(expected)
def test_nats_virtual_column_with_materialized_view(nats_cluster):
    """_subject can be materialized into a regular column through an MV."""
    instance.query(
        """
        CREATE TABLE test.nats_virtuals_mv (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'virtuals_mv',
                     nats_format = 'JSONEachRow';
        CREATE TABLE test.view (key UInt64, value UInt64, subject String) ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT *, _subject as subject
            FROM test.nats_virtuals_mv;
        """
    )
    while not check_table_is_ready(instance, "test.nats_virtuals_mv"):
        logging.debug("Table test.nats_virtuals_mv is not yet ready")
        time.sleep(0.5)

    message_num = 10
    messages = [json.dumps({"key": i, "value": i}) for i in range(message_num)]
    asyncio.run(nats_produce_messages(nats_cluster, "virtuals_mv", messages))

    while True:
        result = instance.query("SELECT count() FROM test.view")
        time.sleep(1)
        if int(result) == message_num:
            break

    result = instance.query("SELECT key, value, subject FROM test.view ORDER BY key")
    # Every row must carry its originating subject in the materialized column.
    expected = "".join(
        "{0}\t{0}\tvirtuals_mv\n".format(i) for i in range(message_num)
    )

    instance.query(
        """
        DROP TABLE test.consumer;
        DROP TABLE test.view;
        DROP TABLE test.nats_virtuals_mv
        """
    )

    assert TSV(result) == TSV(expected)
def test_nats_many_consumers_to_each_queue(nats_cluster):
    """Stress test: many producer threads, several NATS tables sharing one queue group.

    4 tables x 2 consumers each subscribe to the same subject within one queue
    group, all feeding a single destination table; 20 threads produce 1000
    messages each. The test passes when no message is lost.
    """
    instance.query(
        """
        DROP TABLE IF EXISTS test.destination;
        CREATE TABLE test.destination(key UInt64, value UInt64)
        ENGINE = MergeTree()
        ORDER BY key;
        """
    )

    num_tables = 4
    for table_id in range(num_tables):
        print(("Setting up table {}".format(table_id)))
        instance.query(
            """
            DROP TABLE IF EXISTS test.many_consumers_{0};
            DROP TABLE IF EXISTS test.many_consumers_{0}_mv;
            CREATE TABLE test.many_consumers_{0} (key UInt64, value UInt64)
                ENGINE = NATS
                SETTINGS nats_url = 'nats1:4444',
                         nats_subjects = 'many_consumers',
                         nats_num_consumers = 2,
                         nats_queue_group = 'many_consumers',
                         nats_format = 'JSONEachRow',
                         nats_row_delimiter = '\\n';
            CREATE MATERIALIZED VIEW test.many_consumers_{0}_mv TO test.destination AS
                SELECT key, value FROM test.many_consumers_{0};
            """.format(
                table_id
            )
        )
        while not check_table_is_ready(
            instance, "test.many_consumers_{}".format(table_id)
        ):
            logging.debug(
                "Table test.many_consumers_{} is not yet ready".format(table_id)
            )
            time.sleep(0.5)

    # Shared mutable counter so all producer threads draw from one key sequence.
    # NOTE(review): the unsynchronized `i[0] += 1` can race and duplicate keys;
    # the test only asserts on the total count, so this does not affect the check.
    i = [0]
    messages_num = 1000

    def produce():
        messages = []
        for _ in range(messages_num):
            messages.append(json.dumps({"key": i[0], "value": i[0]}))
            i[0] += 1
        asyncio.run(nats_produce_messages(nats_cluster, "many_consumers", messages))

    threads = []
    threads_num = 20

    for _ in range(threads_num):
        threads.append(threading.Thread(target=produce))
    for thread in threads:
        # Stagger the thread starts to interleave producers.
        time.sleep(random.uniform(0, 1))
        thread.start()

    result1 = ""
    while True:
        result1 = instance.query("SELECT count() FROM test.destination")
        time.sleep(1)
        if int(result1) == messages_num * threads_num:
            break

    for thread in threads:
        thread.join()

    for consumer_id in range(num_tables):
        instance.query(
            """
            DROP TABLE test.many_consumers_{0};
            DROP TABLE test.many_consumers_{0}_mv;
            """.format(
                consumer_id
            )
        )

    instance.query(
        """
        DROP TABLE test.destination;
        """
    )

    assert (
        int(result1) == messages_num * threads_num
    ), "ClickHouse lost some messages: {}".format(result1)
def test_nats_restore_failed_connection_without_losses_on_write(nats_cluster):
    """Kill the NATS server mid-stream and verify no written messages are lost.

    A producer table inserts 100k rows, the broker is killed once consumption
    has started, then revived; all distinct keys must eventually appear in
    the consuming view.
    """
    instance.query(
        """
        DROP TABLE IF EXISTS test.consume;
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree
            ORDER BY key;
        CREATE TABLE test.consume (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'producer_reconnect',
                     nats_format = 'JSONEachRow',
                     nats_num_consumers = 2,
                     nats_row_delimiter = '\\n';
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.consume;
        DROP TABLE IF EXISTS test.producer_reconnect;
        CREATE TABLE test.producer_reconnect (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'producer_reconnect',
                     nats_format = 'JSONEachRow',
                     nats_row_delimiter = '\\n';
        """
    )
    while not check_table_is_ready(instance, "test.consume"):
        logging.debug("Table test.consume is not yet ready")
        time.sleep(0.5)

    messages_num = 100000
    values = []
    for i in range(messages_num):
        values.append("({i}, {i})".format(i=i))
    values = ",".join(values)

    # The insert may time out while the engine (re)connects; retry on that
    # specific error only, anything else is a real failure.
    while True:
        try:
            instance.query(
                "INSERT INTO test.producer_reconnect VALUES {}".format(values)
            )
            break
        except QueryRuntimeException as e:
            if "Local: Timed out." in str(e):
                continue
            else:
                raise

    # Wait until consumption has actually started before killing the broker.
    while int(instance.query("SELECT count() FROM test.view")) == 0:
        time.sleep(0.1)

    kill_nats(nats_cluster.nats_docker_id)
    time.sleep(4)
    revive_nats(nats_cluster.nats_docker_id, nats_cluster.nats_port)

    while True:
        result = instance.query("SELECT count(DISTINCT key) FROM test.view")
        time.sleep(1)
        if int(result) == messages_num:
            break

    instance.query(
        """
        DROP TABLE test.consume;
        DROP TABLE test.producer_reconnect;
        """
    )

    assert int(result) == messages_num, "ClickHouse lost some messages: {}".format(
        result
    )
def test_nats_no_connection_at_startup_1(nats_cluster):
    """CREATE TABLE must fail when the NATS broker is unreachable."""
    # no connection when table is initialized
    nats_cluster.pause_container("nats1")
    instance.query_and_get_error(
        """
        CREATE TABLE test.cs (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'cs',
                     nats_format = 'JSONEachRow',
                     nats_num_consumers = '5',
                     nats_row_delimiter = '\\n';
        """
    )
    nats_cluster.unpause_container("nats1")
def test_nats_no_connection_at_startup_2(nats_cluster):
    """ATTACH with the broker down must succeed and recover once it is back up."""
    instance.query(
        """
        CREATE TABLE test.cs (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'cs',
                     nats_format = 'JSONEachRow',
                     nats_num_consumers = '5',
                     nats_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.cs;
        """
    )
    instance.query("DETACH TABLE test.cs")
    # Re-attach while the broker is unreachable: ATTACH (unlike CREATE) must not fail.
    nats_cluster.pause_container("nats1")
    instance.query("ATTACH TABLE test.cs")
    nats_cluster.unpause_container("nats1")

    while not check_table_is_ready(instance, "test.cs"):
        logging.debug("Table test.cs is not yet ready")
        time.sleep(0.5)

    messages_num = 1000
    messages = []
    for i in range(messages_num):
        messages.append(json.dumps({"key": i, "value": i}))
    asyncio.run(nats_produce_messages(nats_cluster, "cs", messages))

    # Bounded wait (~20s) for the reconnected table to ingest everything.
    for _ in range(20):
        result = instance.query("SELECT count() FROM test.view")
        time.sleep(1)
        if int(result) == messages_num:
            break

    instance.query(
        """
        DROP TABLE test.consumer;
        DROP TABLE test.cs;
        """
    )

    assert int(result) == messages_num, "ClickHouse lost some messages: {}".format(
        result
    )
def test_nats_format_factory_settings(nats_cluster):
    """Format-factory settings (date_time_input_format) must apply to NATS parsing."""
    instance.query(
        """
        CREATE TABLE test.format_settings (
            id String, date DateTime
        ) ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'format_settings',
                     nats_format = 'JSONEachRow',
                     date_time_input_format = 'best_effort';
        """
    )
    while not check_table_is_ready(instance, "test.format_settings"):
        logging.debug("Table test.format_settings is not yet ready")
        time.sleep(0.5)

    message = json.dumps(
        {"id": "format_settings_test", "date": "2021-01-19T14:42:33.1829214Z"}
    )
    # Reference value parsed with the same best-effort rules server-side.
    expected = instance.query(
        """SELECT parseDateTimeBestEffort(CAST('2021-01-19T14:42:33.1829214Z', 'String'))"""
    )

    asyncio.run(nats_produce_messages(nats_cluster, "format_settings", [message]))

    # First check direct reads from the engine table...
    while True:
        result = instance.query("SELECT date FROM test.format_settings")
        if result == expected:
            break

    # ...then that the setting also applies when consumed through an MV.
    instance.query(
        """
        CREATE TABLE test.view (
            id String, date DateTime
        ) ENGINE = MergeTree ORDER BY id;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.format_settings;
        """
    )

    asyncio.run(nats_produce_messages(nats_cluster, "format_settings", [message]))

    while True:
        result = instance.query("SELECT date FROM test.view")
        if result == expected:
            break

    instance.query(
        """
        DROP TABLE test.consumer;
        DROP TABLE test.format_settings;
        """
    )

    assert result == expected
def test_nats_bad_args(nats_cluster):
    """CREATE TABLE without the mandatory nats_subjects setting must fail."""
    instance.query_and_get_error(
        """
        CREATE TABLE test.drop (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_secure = true,
                     nats_format = 'JSONEachRow';
        """
    )
def test_nats_drop_mv(nats_cluster):
    """Dropping and recreating the MV must not break the NATS table.

    Messages published while no MV is attached (keys 20-39) are expected to be
    dropped; the reference file only contains keys 0-19 and 40-49.
    """
    instance.query(
        """
        CREATE TABLE test.nats (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'mv',
                     nats_format = 'JSONEachRow';
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.nats;
        """
    )
    while not check_table_is_ready(instance, "test.nats"):
        logging.debug("Table test.nats is not yet ready")
        time.sleep(0.5)

    messages = []
    for i in range(20):
        messages.append(json.dumps({"key": i, "value": i}))
    asyncio.run(nats_produce_messages(nats_cluster, "mv", messages))

    instance.query("DROP VIEW test.consumer")
    # These messages arrive while no MV is attached and should be lost.
    messages = []
    for i in range(20, 40):
        messages.append(json.dumps({"key": i, "value": i}))
    asyncio.run(nats_produce_messages(nats_cluster, "mv", messages))

    instance.query(
        """
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.nats;
        """
    )
    messages = []
    for i in range(40, 50):
        messages.append(json.dumps({"key": i, "value": i}))
    asyncio.run(nats_produce_messages(nats_cluster, "mv", messages))

    while True:
        result = instance.query("SELECT * FROM test.view ORDER BY key")
        if nats_check_result(result):
            break

    nats_check_result(result, True)

    instance.query("DROP VIEW test.consumer")
    # With no MV at all, direct SELECTs from the engine table must still work.
    messages = []
    for i in range(50, 60):
        messages.append(json.dumps({"key": i, "value": i}))
    asyncio.run(nats_produce_messages(nats_cluster, "mv", messages))

    count = 0
    while True:
        count = int(instance.query("SELECT count() FROM test.nats"))
        if count:
            break

    assert count > 0
def test_nats_predefined_configuration(nats_cluster):
    """The engine must be configurable from a named collection (configs/named_collection.xml)."""
    instance.query(
        """
        CREATE TABLE test.nats (key UInt64, value UInt64)
            ENGINE = NATS(nats1) """
    )
    while not check_table_is_ready(instance, "test.nats"):
        logging.debug("Table test.nats is not yet ready")
        time.sleep(0.5)

    # The subject 'named' comes from the nats1 named collection, not the DDL.
    asyncio.run(
        nats_produce_messages(
            nats_cluster, "named", [json.dumps({"key": 1, "value": 2})]
        )
    )
    while True:
        result = instance.query(
            "SELECT * FROM test.nats ORDER BY key", ignore_error=True
        )
        if result == "1\t2\n":
            break
def test_format_with_prefix_and_suffix(nats_cluster):
    """CustomSeparated result delimiters must wrap each produced message."""
    instance.query(
        """
        DROP TABLE IF EXISTS test.nats;
        CREATE TABLE test.nats (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'custom',
                     nats_format = 'CustomSeparated';
        """
    )
    while not check_table_is_ready(instance, "test.nats"):
        logging.debug("Table test.nats is not yet ready")
        time.sleep(0.5)

    insert_messages = []

    # Background subscriber that collects exactly 2 raw messages from NATS.
    async def sub_to_nats():
        nc = await nats_connect_ssl(
            nats_cluster.nats_port,
            user="click",
            password="house",
            ssl_ctx=nats_cluster.nats_ssl_context,
        )
        sub = await nc.subscribe("custom")
        await sub.unsubscribe(2)  # auto-unsubscribe after 2 messages
        async for msg in sub.messages:
            insert_messages.append(msg.data.decode())
        await sub.drain()
        await nc.drain()

    def run_sub():
        asyncio.run(sub_to_nats())

    thread = threading.Thread(target=run_sub)
    thread.start()
    time.sleep(1)  # give the subscriber time to attach before inserting

    instance.query(
        "INSERT INTO test.nats select number*10 as key, number*100 as value from numbers(2) settings format_custom_result_before_delimiter='<prefix>\n', format_custom_result_after_delimiter='<suffix>\n'"
    )

    thread.join()

    assert (
        "".join(insert_messages)
        == "<prefix>\n0\t0\n<suffix>\n<prefix>\n10\t100\n<suffix>\n"
    )
def test_max_rows_per_message(nats_cluster):
    """nats_max_rows_per_message=3 must split a 5-row insert into 2 messages."""
    instance.query(
        """
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.nats;
        CREATE TABLE test.nats (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'custom1',
                     nats_format = 'CustomSeparated',
                     nats_max_rows_per_message = 3,
                     format_custom_result_before_delimiter = '<prefix>\n',
                     format_custom_result_after_delimiter = '<suffix>\n';
        CREATE MATERIALIZED VIEW test.view Engine=Log AS
            SELECT key, value FROM test.nats;
        """
    )
    while not check_table_is_ready(instance, "test.nats"):
        logging.debug("Table test.nats is not yet ready")
        time.sleep(0.5)

    num_rows = 5
    insert_messages = []

    # Background subscriber that captures the 2 raw messages produced by the insert.
    async def sub_to_nats():
        nc = await nats_connect_ssl(
            nats_cluster.nats_port,
            user="click",
            password="house",
            ssl_ctx=nats_cluster.nats_ssl_context,
        )
        sub = await nc.subscribe("custom1")
        await sub.unsubscribe(2)
        async for msg in sub.messages:
            insert_messages.append(msg.data.decode())
        await sub.drain()
        await nc.drain()

    def run_sub():
        asyncio.run(sub_to_nats())

    thread = threading.Thread(target=run_sub)
    thread.start()
    time.sleep(1)

    instance.query(
        f"INSERT INTO test.nats select number*10 as key, number*100 as value from numbers({num_rows}) settings format_custom_result_before_delimiter='<prefix>\n', format_custom_result_after_delimiter='<suffix>\n'"
    )

    thread.join()

    # 5 rows, 3 rows per message -> one message of 3 rows and one of 2.
    assert (
        "".join(insert_messages)
        == "<prefix>\n0\t0\n10\t100\n20\t200\n<suffix>\n<prefix>\n30\t300\n40\t400\n<suffix>\n"
    )

    attempt = 0
    rows = 0
    while attempt < 100:
        rows = int(instance.query("SELECT count() FROM test.view"))
        if rows == num_rows:
            break
        attempt += 1

    assert rows == num_rows

    result = instance.query("SELECT * FROM test.view")
    assert result == "0\t0\n10\t100\n20\t200\n30\t300\n40\t400\n"
def test_row_based_formats(nats_cluster):
    """Every row-based output format must round-trip 10 rows in exactly 2 messages."""
    num_rows = 10

    for format_name in [
        "TSV",
        "TSVWithNamesAndTypes",
        "TSKV",
        "CSV",
        "CSVWithNamesAndTypes",
        "CustomSeparatedWithNamesAndTypes",
        "Values",
        "JSON",
        "JSONEachRow",
        "JSONCompactEachRow",
        "JSONCompactEachRowWithNamesAndTypes",
        "JSONObjectEachRow",
        "Avro",
        "RowBinary",
        "RowBinaryWithNamesAndTypes",
        "MsgPack",
    ]:
        print(format_name)

        instance.query(
            f"""
            DROP TABLE IF EXISTS test.view;
            DROP TABLE IF EXISTS test.nats;
            CREATE TABLE test.nats (key UInt64, value UInt64)
                ENGINE = NATS
                SETTINGS nats_url = 'nats1:4444',
                         nats_subjects = '{format_name}',
                         nats_format = '{format_name}';
            CREATE MATERIALIZED VIEW test.view Engine=Log AS
                SELECT key, value FROM test.nats;
            """
        )
        while not check_table_is_ready(instance, "test.nats"):
            logging.debug("Table test.nats is not yet ready")
            time.sleep(0.5)

        insert_messages = 0

        # Only count the raw messages: the payload is format-specific binary/text.
        async def sub_to_nats():
            nc = await nats_connect_ssl(
                nats_cluster.nats_port,
                user="click",
                password="house",
                ssl_ctx=nats_cluster.nats_ssl_context,
            )
            sub = await nc.subscribe(format_name)
            await sub.unsubscribe(2)
            async for msg in sub.messages:
                nonlocal insert_messages
                insert_messages += 1
            await sub.drain()
            await nc.drain()

        def run_sub():
            asyncio.run(sub_to_nats())

        thread = threading.Thread(target=run_sub)
        thread.start()
        time.sleep(1)

        instance.query(
            f"INSERT INTO test.nats select number*10 as key, number*100 as value from numbers({num_rows})"
        )

        thread.join()

        assert insert_messages == 2

        attempt = 0
        rows = 0
        while attempt < 100:
            rows = int(instance.query("SELECT count() FROM test.view"))
            if rows == num_rows:
                break
            attempt += 1

        assert rows == num_rows

        expected = ""
        for i in range(num_rows):
            expected += str(i * 10) + "\t" + str(i * 100) + "\n"

        result = instance.query("SELECT * FROM test.view")
        assert result == expected
def test_block_based_formats_1(nats_cluster):
    """PrettySpace output must be emitted block-by-block (3 messages for 5 rows at max_block_size=2)."""
    instance.query(
        """
        DROP TABLE IF EXISTS test.nats;
        CREATE TABLE test.nats (key UInt64, value UInt64)
            ENGINE = NATS
            SETTINGS nats_url = 'nats1:4444',
                     nats_subjects = 'PrettySpace',
                     nats_format = 'PrettySpace';
        """
    )

    insert_messages = []

    # Background subscriber that collects the 3 per-block messages.
    async def sub_to_nats():
        nc = await nats_connect_ssl(
            nats_cluster.nats_port,
            user="click",
            password="house",
            ssl_ctx=nats_cluster.nats_ssl_context,
        )
        sub = await nc.subscribe("PrettySpace")
        await sub.unsubscribe(3)
        async for msg in sub.messages:
            insert_messages.append(msg.data.decode())
        await sub.drain()
        await nc.drain()

    def run_sub():
        asyncio.run(sub_to_nats())

    thread = threading.Thread(target=run_sub)
    thread.start()
    time.sleep(1)

    # The table may still be connecting; retry the insert until it succeeds.
    attempt = 0
    while attempt < 100:
        try:
            instance.query(
                "INSERT INTO test.nats SELECT number * 10 as key, number * 100 as value FROM numbers(5) settings max_block_size=2, optimize_trivial_insert_select=0;"
            )
            break
        except Exception:
            logging.debug("Table test.nats is not yet ready")
            time.sleep(0.5)
            attempt += 1
    thread.join()

    data = []
    for message in insert_messages:
        splitted = message.split("\n")
        # ANSI-bold column headers produced by PrettySpace
        # (NOTE(review): exact inter-column spacing reconstructed — confirm
        # against the reference test file).
        assert splitted[0] == "\x1b[1mkey\x1b[0m   \x1b[1mvalue\x1b[0m"
        assert splitted[1] == ""
        assert splitted[-1] == ""
        # Rows live between the blank line after the header and the trailing blank line.
        data += [line.split() for line in splitted[2:-1]]

    assert data == [
        ["0", "0"],
        ["10", "100"],
        ["20", "200"],
        ["30", "300"],
        ["40", "400"],
    ]
def test_block_based_formats_2(nats_cluster):
    """Block-based formats must emit one message per block: 100 rows / block size 12 -> 9 messages."""
    num_rows = 100

    for format_name in [
        "JSONColumns",
        "Native",
        "Arrow",
        "Parquet",
        "ORC",
        "JSONCompactColumns",
    ]:
        print(format_name)

        instance.query(
            f"""
            DROP TABLE IF EXISTS test.view;
            DROP TABLE IF EXISTS test.nats;
            CREATE TABLE test.nats (key UInt64, value UInt64)
                ENGINE = NATS
                SETTINGS nats_url = 'nats1:4444',
                         nats_subjects = '{format_name}',
                         nats_format = '{format_name}';
            CREATE MATERIALIZED VIEW test.view Engine=Log AS
                SELECT key, value FROM test.nats;
            """
        )
        while not check_table_is_ready(instance, "test.nats"):
            logging.debug("Table test.nats is not yet ready")
            time.sleep(0.5)

        insert_messages = 0

        # Only count raw messages: payloads are binary and format-specific.
        async def sub_to_nats():
            nc = await nats_connect_ssl(
                nats_cluster.nats_port,
                user="click",
                password="house",
                ssl_ctx=nats_cluster.nats_ssl_context,
            )
            sub = await nc.subscribe(format_name)
            await sub.unsubscribe(9)
            async for msg in sub.messages:
                nonlocal insert_messages
                insert_messages += 1
            await sub.drain()
            await nc.drain()

        def run_sub():
            asyncio.run(sub_to_nats())

        thread = threading.Thread(target=run_sub)
        thread.start()
        time.sleep(1)

        instance.query(
            f"INSERT INTO test.nats SELECT number * 10 as key, number * 100 as value FROM numbers({num_rows}) settings max_block_size=12, optimize_trivial_insert_select=0;"
        )

        thread.join()

        # ceil(100 / 12) == 9 blocks, one message each.
        assert insert_messages == 9

        attempt = 0
        rows = 0
        while attempt < 100:
            rows = int(instance.query("SELECT count() FROM test.view"))
            if rows == num_rows:
                break
            attempt += 1

        assert rows == num_rows

        result = instance.query("SELECT * FROM test.view ORDER by key")
        expected = ""
        for i in range(num_rows):
            expected += str(i * 10) + "\t" + str(i * 100) + "\n"
        assert result == expected
if __name__ == "__main__":
    # Manual debugging entry point: bring the cluster up and keep it alive
    # until the operator presses a key.
    cluster.start()
    input("Cluster created, press any key to destroy...")
    cluster.shutdown()