2021-05-27 12:54:47 +00:00
#!/usr/bin/env python3
from helpers . cluster import ClickHouseCluster
import pytest
import random
import string
from helpers . network import NetThroughput
import subprocess
import time
2021-05-27 15:02:06 +00:00
import statistics
2021-05-27 12:54:47 +00:00
cluster = ClickHouseCluster ( __file__ )
node1 = cluster . add_instance ( ' node1 ' , with_zookeeper = True )
node2 = cluster . add_instance ( ' node2 ' , with_zookeeper = True )
node3 = cluster . add_instance ( ' node3 ' , user_configs = [ ' configs/limit_replication_config.xml ' ] , with_zookeeper = True )
@pytest.fixture ( scope = " module " )
def start_cluster ( ) :
try :
cluster . start ( )
yield cluster
finally :
cluster . shutdown ( )
def get_random_string ( length ) :
return ' ' . join ( random . choice ( string . ascii_uppercase + string . digits ) for _ in range ( length ) )
def test_limited_fetch_single_table ( start_cluster ) :
2021-06-02 08:22:44 +00:00
print ( " Limited fetches single table " )
2021-05-27 12:54:47 +00:00
try :
for i , node in enumerate ( [ node1 , node2 ] ) :
node . query ( f " CREATE TABLE limited_fetch_table(key UInt64, data String) ENGINE = ReplicatedMergeTree( ' /clickhouse/tables/limited_fetch_table ' , ' { i } ' ) ORDER BY tuple() PARTITION BY key SETTINGS max_replicated_fetches_network_bandwidth=10485760 " )
node2 . query ( " SYSTEM STOP FETCHES limited_fetch_table " )
for i in range ( 5 ) :
node1 . query ( " INSERT INTO limited_fetch_table SELECT {} , ' {} ' FROM numbers(300) " . format ( i , get_random_string ( 104857 ) ) )
n1_net = NetThroughput ( node1 )
n2_net = NetThroughput ( node2 )
node2 . query ( " SYSTEM START FETCHES limited_fetch_table " )
n2_fetch_speed = [ ]
for i in range ( 10 ) :
n1_in , n1_out = n1_net . measure_speed ( ' megabytes ' )
n2_in , n2_out = n2_net . measure_speed ( ' megabytes ' )
print ( " [N1] input: " , n1_in , ' MB/s ' , " output: " , n1_out , " MB/s " )
print ( " [N2] input: " , n2_in , ' MB/s ' , " output: " , n2_out , " MB/s " )
n2_fetch_speed . append ( n2_in )
time . sleep ( 0.5 )
2021-06-01 22:07:29 +00:00
median_speed = statistics . median ( n2_fetch_speed )
2021-05-27 15:02:06 +00:00
# approximate border. Without limit we will have more than 100 MB/s for very slow builds.
2021-06-01 22:07:29 +00:00
assert median_speed < = 15 , " We exceeded max fetch speed for more than 10MB/s. Must be around 10 (+- 5), got " + str ( median_speed )
2021-05-27 12:54:47 +00:00
finally :
2021-05-27 13:00:08 +00:00
for node in [ node1 , node2 ] :
node . query ( " DROP TABLE IF EXISTS limited_fetch_table SYNC " )
2021-05-27 12:54:47 +00:00
def test_limited_send_single_table ( start_cluster ) :
2021-06-02 08:22:44 +00:00
print ( " Limited sends single table " )
2021-05-27 12:54:47 +00:00
try :
for i , node in enumerate ( [ node1 , node2 ] ) :
node . query ( f " CREATE TABLE limited_send_table(key UInt64, data String) ENGINE = ReplicatedMergeTree( ' /clickhouse/tables/limited_fetch_table ' , ' { i } ' ) ORDER BY tuple() PARTITION BY key SETTINGS max_replicated_sends_network_bandwidth=5242880 " )
2021-06-01 22:07:29 +00:00
node2 . query ( " SYSTEM STOP FETCHES limited_send_table " )
2021-05-27 12:54:47 +00:00
for i in range ( 5 ) :
node1 . query ( " INSERT INTO limited_send_table SELECT {} , ' {} ' FROM numbers(150) " . format ( i , get_random_string ( 104857 ) ) )
n1_net = NetThroughput ( node1 )
n2_net = NetThroughput ( node2 )
2021-06-01 22:07:29 +00:00
node2 . query ( " SYSTEM START FETCHES limited_send_table " )
2021-05-27 12:54:47 +00:00
n1_sends_speed = [ ]
for i in range ( 10 ) :
n1_in , n1_out = n1_net . measure_speed ( ' megabytes ' )
n2_in , n2_out = n2_net . measure_speed ( ' megabytes ' )
print ( " [N1] input: " , n1_in , ' MB/s ' , " output: " , n1_out , " MB/s " )
print ( " [N2] input: " , n2_in , ' MB/s ' , " output: " , n2_out , " MB/s " )
n1_sends_speed . append ( n1_out )
time . sleep ( 0.5 )
2021-06-01 22:07:29 +00:00
median_speed = statistics . median ( n1_sends_speed )
2021-05-27 15:02:06 +00:00
# approximate border. Without limit we will have more than 100 MB/s for very slow builds.
2021-06-01 22:07:29 +00:00
assert median_speed < = 10 , " We exceeded max send speed for more than 5MB/s. Must be around 5 (+- 5), got " + str ( median_speed )
2021-05-27 12:54:47 +00:00
finally :
2021-05-27 13:00:08 +00:00
for node in [ node1 , node2 ] :
node . query ( " DROP TABLE IF EXISTS limited_send_table SYNC " )
2021-05-27 12:54:47 +00:00
def test_limited_fetches_for_server ( start_cluster ) :
2021-06-02 08:22:44 +00:00
print ( " Limited fetches for server " )
2021-05-27 12:54:47 +00:00
try :
for i , node in enumerate ( [ node1 , node3 ] ) :
for j in range ( 5 ) :
node . query ( f " CREATE TABLE limited_fetches { j } (key UInt64, data String) ENGINE = ReplicatedMergeTree( ' /clickhouse/tables/limited_fetches { j } ' , ' { i } ' ) ORDER BY tuple() PARTITION BY key " )
for j in range ( 5 ) :
node3 . query ( f " SYSTEM STOP FETCHES limited_fetches { j } " )
for i in range ( 5 ) :
node1 . query ( " INSERT INTO limited_fetches {} SELECT {} , ' {} ' FROM numbers(50) " . format ( j , i , get_random_string ( 104857 ) ) )
n1_net = NetThroughput ( node1 )
n3_net = NetThroughput ( node3 )
for j in range ( 5 ) :
node3 . query ( f " SYSTEM START FETCHES limited_fetches { j } " )
n3_fetches_speed = [ ]
2021-06-01 22:07:29 +00:00
for i in range ( 5 ) :
2021-05-27 12:54:47 +00:00
n1_in , n1_out = n1_net . measure_speed ( ' megabytes ' )
n3_in , n3_out = n3_net . measure_speed ( ' megabytes ' )
print ( " [N1] input: " , n1_in , ' MB/s ' , " output: " , n1_out , " MB/s " )
print ( " [N3] input: " , n3_in , ' MB/s ' , " output: " , n3_out , " MB/s " )
n3_fetches_speed . append ( n3_in )
time . sleep ( 0.5 )
2021-06-01 22:07:29 +00:00
median_speed = statistics . median ( n3_fetches_speed )
2021-05-27 15:02:06 +00:00
# approximate border. Without limit we will have more than 100 MB/s for very slow builds.
2021-06-01 22:07:29 +00:00
assert median_speed < = 15 , " We exceeded max fetch speed for more than 15MB/s. Must be around 5 (+- 10), got " + str ( median_speed )
2021-05-27 12:54:47 +00:00
finally :
2021-05-27 13:00:08 +00:00
for node in [ node1 , node3 ] :
2021-05-27 12:54:47 +00:00
for j in range ( 5 ) :
2021-05-27 13:00:08 +00:00
node . query ( f " DROP TABLE IF EXISTS limited_fetches { j } SYNC " )
2021-05-27 12:54:47 +00:00
def test_limited_sends_for_server ( start_cluster ) :
2021-06-02 08:22:44 +00:00
print ( " Limited sends for server " )
2021-05-27 12:54:47 +00:00
try :
for i , node in enumerate ( [ node1 , node3 ] ) :
for j in range ( 5 ) :
node . query ( f " CREATE TABLE limited_sends { j } (key UInt64, data String) ENGINE = ReplicatedMergeTree( ' /clickhouse/tables/limited_sends { j } ' , ' { i } ' ) ORDER BY tuple() PARTITION BY key " )
for j in range ( 5 ) :
2021-06-01 22:07:29 +00:00
node1 . query ( f " SYSTEM STOP FETCHES limited_sends { j } " )
2021-05-27 12:54:47 +00:00
for i in range ( 5 ) :
node3 . query ( " INSERT INTO limited_sends {} SELECT {} , ' {} ' FROM numbers(50) " . format ( j , i , get_random_string ( 104857 ) ) )
n1_net = NetThroughput ( node1 )
n3_net = NetThroughput ( node3 )
for j in range ( 5 ) :
2021-06-01 22:07:29 +00:00
node1 . query ( f " SYSTEM START FETCHES limited_sends { j } " )
2021-05-27 12:54:47 +00:00
n3_sends_speed = [ ]
2021-06-01 22:07:29 +00:00
for i in range ( 5 ) :
2021-05-27 12:54:47 +00:00
n1_in , n1_out = n1_net . measure_speed ( ' megabytes ' )
n3_in , n3_out = n3_net . measure_speed ( ' megabytes ' )
print ( " [N1] input: " , n1_in , ' MB/s ' , " output: " , n1_out , " MB/s " )
print ( " [N3] input: " , n3_in , ' MB/s ' , " output: " , n3_out , " MB/s " )
n3_sends_speed . append ( n3_out )
time . sleep ( 0.5 )
2021-06-01 22:07:29 +00:00
median_speed = statistics . median ( n3_sends_speed )
2021-05-27 15:02:06 +00:00
# approximate border. Without limit we will have more than 100 MB/s for very slow builds.
2021-06-01 22:07:29 +00:00
assert median_speed < = 20 , " We exceeded max send speed for more than 20MB/s. Must be around 5 (+- 10), got " + str ( median_speed )
2021-05-27 12:54:47 +00:00
finally :
2021-05-27 13:00:08 +00:00
for node in [ node1 , node3 ] :
2021-05-27 12:54:47 +00:00
for j in range ( 5 ) :
2021-05-27 13:00:08 +00:00
node . query ( f " DROP TABLE IF EXISTS limited_sends { j } SYNC " )
2021-05-27 12:54:47 +00:00
def test_should_execute_fetch ( start_cluster ) :
2021-06-02 08:22:44 +00:00
print ( " Should execute fetch " )
2021-05-27 12:54:47 +00:00
try :
for i , node in enumerate ( [ node1 , node2 ] ) :
node . query ( f " CREATE TABLE should_execute_table(key UInt64, data String) ENGINE = ReplicatedMergeTree( ' /clickhouse/tables/should_execute_table ' , ' { i } ' ) ORDER BY tuple() PARTITION BY key SETTINGS max_replicated_fetches_network_bandwidth=3505253 " )
node2 . query ( " SYSTEM STOP FETCHES should_execute_table " )
for i in range ( 3 ) :
node1 . query ( " INSERT INTO should_execute_table SELECT {} , ' {} ' FROM numbers(200) " . format ( i , get_random_string ( 104857 ) ) )
n1_net = NetThroughput ( node1 )
n2_net = NetThroughput ( node2 )
node2 . query ( " SYSTEM START FETCHES should_execute_table " )
for i in range ( 10 ) :
node1 . query ( " INSERT INTO should_execute_table SELECT {} , ' {} ' FROM numbers(3) " . format ( i , get_random_string ( 104857 ) ) )
n2_fetch_speed = [ ]
replication_queue_data = [ ]
for i in range ( 10 ) :
n1_in , n1_out = n1_net . measure_speed ( ' megabytes ' )
n2_in , n2_out = n2_net . measure_speed ( ' megabytes ' )
fetches_count = node2 . query ( " SELECT count() FROM system.replicated_fetches " )
if fetches_count == " 0 \n " :
break
print ( " Fetches count " , fetches_count )
replication_queue_data . append ( node2 . query ( " SELECT count() FROM system.replication_queue WHERE postpone_reason like ' %f etches have already throttled % ' " ) )
n2_fetch_speed . append ( n2_in )
time . sleep ( 0.5 )
node2 . query ( " SYSTEM SYNC REPLICA should_execute_table " )
assert any ( int ( f . strip ( ) ) != 0 for f in replication_queue_data )
assert node2 . query ( " SELECT COUNT() FROM should_execute_table " ) == " 630 \n "
finally :
2021-05-27 13:00:08 +00:00
for node in [ node1 , node2 ] :
node . query ( " DROP TABLE IF EXISTS should_execute_table SYNC " )