2017-06-14 14:38:08 +00:00
import time
2020-09-16 04:26:10 +00:00
import pytest
2021-04-28 06:43:56 +00:00
from helpers . client import QueryRuntimeException
2017-06-14 14:38:08 +00:00
from helpers . cluster import ClickHouseCluster
from helpers . network import PartitionManager
2017-07-11 11:44:16 +00:00
from helpers . test_tools import TSV
2017-06-14 14:38:08 +00:00
# One cluster shared by every test in this module; instances are only declared
# here and are started by the `started_cluster` fixture.
cluster = ClickHouseCluster(__file__)

# Sender used by test_reconnect.
instance_test_reconnect = cluster.add_instance(
    "instance_test_reconnect", main_configs=["configs/remote_servers.xml"]
)
# Sender used by test_inserts_batching; batching of distributed inserts is
# switched on through its user config.
instance_test_inserts_batching = cluster.add_instance(
    "instance_test_inserts_batching",
    main_configs=["configs/remote_servers.xml"],
    user_configs=["configs/enable_distributed_inserts_batching.xml"],
)
# Receiving side for the two senders above. Background merges are forbidden by
# its config, which test_inserts_batching relies on to map parts to blocks.
remote = cluster.add_instance(
    "remote", main_configs=["configs/forbid_background_merges.xml"]
)

# Instance whose Distributed table points at a cluster consisting of itself.
instance_test_inserts_local_cluster = cluster.add_instance(
    "instance_test_inserts_local_cluster", main_configs=["configs/remote_servers.xml"]
)

# Pair of replicas (ZooKeeper is required for ReplicatedMergeTree).
node1 = cluster.add_instance(
    "node1", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node2 = cluster.add_instance(
    "node2", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)

# Pair of shards used by the LowCardinality sharding-key test.
shard1 = cluster.add_instance(
    "shard1", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
shard2 = cluster.add_instance(
    "shard2", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
2017-08-23 10:45:23 +00:00
2020-09-16 04:26:10 +00:00
2017-06-14 14:38:08 +00:00
@pytest.fixture(scope="module")
def started_cluster():
    """Start the cluster once per module and create every table the tests use.

    Yields the running ``cluster``. The cluster is shut down when the module's
    tests finish, and also when start-up or table creation fails.
    """
    try:
        cluster.start()

        # --- test_reconnect: Log table on `remote`, Distributed table on the sender.
        remote.query("CREATE TABLE local1 (x UInt32) ENGINE = Log")
        instance_test_reconnect.query(
            """
CREATE TABLE distributed (x UInt32) ENGINE = Distributed('test_cluster', 'default', 'local1')
"""
        )

        # --- test_inserts_batching: note that local2 already has the `s` column
        # which the test later adds to the Distributed table with ALTER.
        remote.query(
            "CREATE TABLE local2 (d Date, x UInt32, s String) ENGINE = MergeTree PARTITION BY toYYYYMM(d) ORDER BY x"
        )
        instance_test_inserts_batching.query(
            """
CREATE TABLE distributed (d Date, x UInt32) ENGINE = Distributed('test_cluster', 'default', 'local2') SETTINGS fsync_after_insert=1, fsync_directories=1
"""
        )

        # --- test_inserts_local: local table plus a Distributed table over it.
        instance_test_inserts_local_cluster.query(
            "CREATE TABLE local (d Date, x UInt32) ENGINE = MergeTree PARTITION BY toYYYYMM(d) ORDER BY x"
        )
        instance_test_inserts_local_cluster.query(
            """
CREATE TABLE distributed_on_local (d Date, x UInt32) ENGINE = Distributed('test_local_cluster', 'default', 'local')
"""
        )

        # --- test_prefer_localhost_replica: one replicated table with a replica
        # on each node (the replica name is the only difference) ...
        node1.query(
            """
CREATE TABLE replicated (date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/replicated', 'node1') PARTITION BY toYYYYMM(date) ORDER BY id
"""
        )
        node2.query(
            """
CREATE TABLE replicated (date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/replicated', 'node2') PARTITION BY toYYYYMM(date) ORDER BY id
"""
        )
        # ... and an identical Distributed table over it on both nodes.
        for node in (node1, node2):
            node.query(
                """
CREATE TABLE distributed (date Date, id UInt32) ENGINE = Distributed('shard_with_local_replica', 'default', 'replicated')
"""
            )

        # --- test_inserts_low_cardinality: local table on both shards, and a
        # Distributed table on shard1 sharded by the LowCardinality column.
        for shard in (shard1, shard2):
            shard.query(
                """
CREATE TABLE low_cardinality (d Date, x UInt32, s LowCardinality(String)) ENGINE = MergeTree PARTITION BY toYYYYMM(d) ORDER BY x"""
            )
        shard1.query(
            """
CREATE TABLE low_cardinality_all (d Date, x UInt32, s LowCardinality(String)) ENGINE = Distributed('shard_with_low_cardinality', 'default', 'low_cardinality', sipHash64(s))"""
        )

        # --- test_table_function: plain MergeTree target on both nodes.
        for node in (node1, node2):
            node.query(
                """
CREATE TABLE table_function (n UInt8, s String) ENGINE = MergeTree() ORDER BY n"""
            )

        # --- test_inserts_single_replica_*: Distributed tables on both nodes
        # over `single_replicated`, through two different cluster definitions.
        for node in (node1, node2):
            node.query(
                """
CREATE TABLE distributed_one_replica_internal_replication (date Date, id UInt32) ENGINE = Distributed('shard_with_local_replica_internal_replication', 'default', 'single_replicated')
"""
            )
        for node in (node1, node2):
            node.query(
                """
CREATE TABLE distributed_one_replica_no_internal_replication (date Date, id UInt32) ENGINE = Distributed('shard_with_local_replica', 'default', 'single_replicated')
"""
            )

        # The underlying table is created on node2 only; the single-replica
        # tests expect "does not exist" errors when an insert lands on node1.
        node2.query(
            """
CREATE TABLE single_replicated (date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/single_replicated', 'node2') PARTITION BY toYYYYMM(date) ORDER BY id
"""
        )

        yield cluster

    finally:
        cluster.shutdown()
def test_reconnect(started_cluster):
    """Check that distributed inserts are delivered after a broken connection heals.

    Inserts one row over a working connection, one while the link to `remote`
    is rejected with TCP resets, and one after healing, then expects all three
    rows to be present in `local1` on `remote`.
    """
    instance = instance_test_reconnect

    with PartitionManager() as pm:
        # Open a connection for insertion.
        instance.query("INSERT INTO distributed VALUES (1)")
        time.sleep(1)
        assert remote.query("SELECT count(*) FROM local1").strip() == "1"

        # Now break the connection.
        pm.partition_instances(
            instance, remote, action="REJECT --reject-with tcp-reset"
        )
        instance.query("INSERT INTO distributed VALUES (2)")
        time.sleep(1)

        # Heal the partition and insert more data.
        # The connection must be reestablished and after some time all data must be inserted.
        pm.heal_all()
        time.sleep(1)
        instance.query("INSERT INTO distributed VALUES (3)")
        # Give the background sender time to deliver the rows queued so far.
        time.sleep(5)

        assert remote.query("SELECT count(*) FROM local1").strip() == "3"
2017-07-11 11:44:16 +00:00
def test_inserts_batching(started_cluster):
    """Check how queued distributed inserts are grouped into batches.

    Rows are inserted while `remote` is unreachable, using two different
    column orders and, after an ALTER, a different table structure. Once the
    queue is flushed, the parts of `local2` are compared with the expected
    batch layout.
    """
    instance = instance_test_inserts_batching

    with PartitionManager() as pm:
        pm.partition_instances(instance, remote)

        instance.query("INSERT INTO distributed(d, x) VALUES ('2000-01-01', 1)")
        # Sleep a bit so that this INSERT forms a batch of its own.
        time.sleep(0.1)

        instance.query("INSERT INTO distributed(x, d) VALUES (2, '2000-01-01')")

        for i in range(3, 7):
            instance.query(
                "INSERT INTO distributed(d, x) VALUES ('2000-01-01', {})".format(i)
            )

        for i in range(7, 9):
            instance.query(
                "INSERT INTO distributed(x, d) VALUES ({}, '2000-01-01')".format(i)
            )

        instance.query("INSERT INTO distributed(d, x) VALUES ('2000-01-01', 9)")

        # After ALTER the structure of the saved blocks will be different
        instance.query("ALTER TABLE distributed ADD COLUMN s String")

        for i in range(10, 13):
            instance.query(
                "INSERT INTO distributed(d, x) VALUES ('2000-01-01', {})".format(i)
            )

    # NOTE(review): the flush is placed after the `with` block on the assumption
    # that leaving PartitionManager restores connectivity - confirm against
    # helpers.network.
    instance.query("SYSTEM FLUSH DISTRIBUTED distributed")
    time.sleep(1.0)

    result = remote.query(
        "SELECT _part, groupArray(x) FROM local2 GROUP BY _part ORDER BY _part"
    )

    # Explanation: as merges are turned off on remote instance, active parts in local2 table correspond 1-to-1
    # to inserted blocks.
    # Batches of max 3 rows are formed as min_insert_block_size_rows = 3.
    # Blocks:
    # 1. Failed batch that is retried with the same contents.
    # 2. Full batch of inserts before ALTER.
    # 3. Full batch of inserts before ALTER.
    # 4. Full batch of inserts after ALTER (that have different block structure).
    # 5. What was left to insert with the column structure before ALTER.
    expected = """\
200001_1_1_0\t[1]
200001_2_2_0\t[2,3,4]
200001_3_3_0\t[5,6,7]
200001_4_4_0\t[10,11,12]
200001_5_5_0\t[8,9]
"""
    assert TSV(result) == TSV(expected)
2017-08-23 10:45:23 +00:00
def test_inserts_local(started_cluster):
    """Check that an insert into `distributed_on_local` ends up in `local`.

    Both tables live on the same instance; one row is inserted through the
    Distributed table and then counted in the underlying table.
    """
    instance = instance_test_inserts_local_cluster
    instance.query("INSERT INTO distributed_on_local VALUES ('2000-01-01', 1)")
    # Short pause before counting, in case the row is delivered asynchronously.
    time.sleep(0.5)
    assert instance.query("SELECT count(*) FROM local").strip() == "1"
2018-08-10 01:27:54 +00:00
2020-09-16 04:26:10 +00:00
2021-04-28 06:43:56 +00:00
def test_inserts_single_replica_local_internal_replication(started_cluster):
    """Synchronous insert with prefer_localhost_replica=1 must fail on node1.

    `single_replicated` exists only on node2, so the insert issued on node1 is
    expected to raise "does not exist" and leave node2's table empty.
    """
    with pytest.raises(
        QueryRuntimeException, match="Table default.single_replicated does not exist"
    ):
        node1.query(
            "INSERT INTO distributed_one_replica_internal_replication VALUES ('2000-01-01', 1)",
            settings={
                "insert_distributed_sync": "1",
                "prefer_localhost_replica": "1",
                # to make the test more deterministic
                "load_balancing": "first_or_random",
            },
        )
    assert node2.query("SELECT count(*) FROM single_replicated").strip() == "0"
2021-04-28 06:43:56 +00:00
def test_inserts_single_replica_internal_replication(started_cluster):
    """Synchronous insert with prefer_localhost_replica=0 must reach node2.

    The insert is issued on node1 and is expected to produce exactly one row
    in node2's `single_replicated`. The table is truncated afterwards whether
    or not the assertion holds, so other tests see it empty.
    """
    try:
        node1.query(
            "INSERT INTO distributed_one_replica_internal_replication VALUES ('2000-01-01', 1)",
            settings={
                "insert_distributed_sync": "1",
                "prefer_localhost_replica": "0",
                # to make the test more deterministic
                "load_balancing": "first_or_random",
            },
        )
        assert node2.query("SELECT count(*) FROM single_replicated").strip() == "1"
    finally:
        node2.query("TRUNCATE TABLE single_replicated")
def test_inserts_single_replica_no_internal_replication(started_cluster):
    """Synchronous insert through the `shard_with_local_replica` cluster must fail.

    The insert is issued on node1, where `single_replicated` was never created,
    and is expected to raise "does not exist". node2's table must hold no rows
    at the time of the check, and it is truncated afterwards in any case.
    """
    try:
        with pytest.raises(
            QueryRuntimeException,
            match="Table default.single_replicated does not exist",
        ):
            node1.query(
                "INSERT INTO distributed_one_replica_no_internal_replication VALUES ('2000-01-01', 1)",
                settings={
                    "insert_distributed_sync": "1",
                    "prefer_localhost_replica": "0",
                },
            )
        assert node2.query("SELECT count(*) FROM single_replicated").strip() == "0"
    finally:
        node2.query("TRUNCATE TABLE single_replicated")
2021-04-28 06:43:56 +00:00
2018-08-10 01:27:54 +00:00
def test_prefer_localhost_replica(started_cluster):
    """Check which replica serves a SELECT from the Distributed table.

    After both replicas hold the same two rows, merges and fetches are stopped
    and a different extra row is written directly to each replica, so the
    result of the SELECT reveals which replica answered it. Merges and fetches
    are restarted at the end so the stopped state does not leak into other
    tests that share the module-scoped cluster.
    """
    test_query = "SELECT * FROM distributed ORDER BY id"

    node1.query("INSERT INTO distributed VALUES (toDate('2017-06-17'), 11)")
    node2.query("INSERT INTO distributed VALUES (toDate('2017-06-17'), 22)")
    time.sleep(1.0)

    expected_distributed = """\
2017-06-17\t11
2017-06-17\t22
"""

    expected_from_node2 = """\
2017-06-17\t11
2017-06-17\t22
2017-06-17\t44
"""

    expected_from_node1 = """\
2017-06-17\t11
2017-06-17\t22
2017-06-17\t33
"""

    assert TSV(node1.query(test_query)) == TSV(expected_distributed)
    assert TSV(node2.query(test_query)) == TSV(expected_distributed)

    try:
        # Make the replicas inconsistent by disabling merges and fetches, so
        # that it is possible to tell which replica the query was sent to.
        for node in (node1, node2):
            node.query("SYSTEM STOP MERGES")
            node.query("SYSTEM STOP FETCHES")

        node1.query("INSERT INTO replicated VALUES (toDate('2017-06-17'), 33)")
        node2.query("INSERT INTO replicated VALUES (toDate('2017-06-17'), 44)")
        time.sleep(1.0)

        # The query is sent to node2, as it is local and prefer_localhost_replica=1.
        assert TSV(node2.query(test_query)) == TSV(expected_from_node2)

        # Now the query is sent to node1, as it is higher in order.
        assert TSV(
            node2.query(
                test_query
                + " SETTINGS load_balancing='in_order', prefer_localhost_replica=0"
            )
        ) == TSV(expected_from_node1)
    finally:
        # Undo the SYSTEM STOP statements even when an assertion fails.
        for node in (node1, node2):
            node.query("SYSTEM START FETCHES")
            node.query("SYSTEM START MERGES")
2020-09-16 04:26:10 +00:00
2018-12-03 13:11:26 +00:00
def test_inserts_low_cardinality(started_cluster):
    """Check an insert into a Distributed table sharded by a LowCardinality column.

    One row is inserted through `low_cardinality_all` on shard1 and must be
    visible when counting through the same table.
    """
    instance = shard1
    instance.query(
        "INSERT INTO low_cardinality_all (d,x,s) VALUES ('2018-11-12',1,'123')"
    )
    # Short pause before counting, in case the row is delivered asynchronously.
    time.sleep(0.5)
    assert instance.query("SELECT count(*) FROM low_cardinality_all").strip() == "1"
2019-11-13 18:35:35 +00:00
2020-09-16 04:26:10 +00:00
2019-11-13 18:35:35 +00:00
def test_table_function(started_cluster):
    """Check INSERT INTO TABLE FUNCTION cluster(...) followed by a count.

    100000 generated rows are inserted from node1 through the `cluster` table
    function, and a count through the same function must report all of them.
    """
    node1.query(
        "insert into table function cluster('shard_with_local_replica', 'default', 'table_function') select number, concat('str_', toString(number)) from numbers(100000)"
    )
    assert (
        node1.query(
            "select count() from cluster('shard_with_local_replica', 'default', 'table_function')"
        ).rstrip()
        == "100000"
    )