2017-10-06 11:29:58 +00:00
import time
import os
2018-05-13 13:31:36 +00:00
import threading
import random
2017-10-06 11:29:58 +00:00
from contextlib import contextmanager
import pytest
from helpers . cluster import ClickHouseCluster
from helpers . network import PartitionManager
from helpers . test_tools import TSV
from helpers . client import CommandRequest
cluster = ClickHouseCluster ( __file__ )
2018-07-25 16:00:51 +00:00
node1 = cluster . add_instance ( ' node1 ' , config_dir = ' configs ' , with_zookeeper = True , macros = { " layer " : 0 , " shard " : 0 , " replica " : 1 } )
node2 = cluster . add_instance ( ' node2 ' , config_dir = ' configs ' , with_zookeeper = True , macros = { " layer " : 0 , " shard " : 0 , " replica " : 2 } )
2017-10-06 11:29:58 +00:00
nodes = [ node1 , node2 ]
@pytest.fixture ( scope = " module " )
def started_cluster ( ) :
try :
cluster . start ( )
yield cluster
finally :
pass
cluster . shutdown ( )
2017-11-20 19:33:12 +00:00
2017-10-06 11:29:58 +00:00
def test_random_inserts ( started_cluster ) :
# Duration of the test, reduce it if don't want to wait
DURATION_SECONDS = 10 # * 60
node1 . query ( """
CREATE TABLE simple ON CLUSTER test_cluster ( date Date , i UInt32 , s String )
ENGINE = ReplicatedMergeTree ( ' /clickhouse/tables/ {shard} /simple ' , ' {replica} ' , date , i , 8192 ) """ )
with PartitionManager ( ) as pm_random_drops :
for sacrifice in nodes :
pass # This test doesn't work with partition problems still
#pm_random_drops._add_rule({'probability': 0.01, 'destination': sacrifice.ip_address, 'source_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})
#pm_random_drops._add_rule({'probability': 0.01, 'source': sacrifice.ip_address, 'destination_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})
min_timestamp = int ( time . time ( ) )
max_timestamp = min_timestamp + DURATION_SECONDS
num_timestamps = max_timestamp - min_timestamp + 1
bash_script = os . path . join ( os . path . dirname ( __file__ ) , " test.sh " )
inserters = [ ]
for node in nodes :
2018-09-07 11:51:51 +00:00
cmd = [ ' /bin/bash ' , bash_script , node . ip_address , str ( min_timestamp ) , str ( max_timestamp ) , str ( cluster . get_client_cmd ( ) ) ]
2017-10-06 11:29:58 +00:00
inserters . append ( CommandRequest ( cmd , timeout = DURATION_SECONDS * 2 , stdin = ' ' ) )
print node . name , node . ip_address
for inserter in inserters :
inserter . get_answer ( )
answer = " {} \t {} \t {} \t {} \n " . format ( num_timestamps , num_timestamps , min_timestamp , max_timestamp )
2017-11-20 19:33:12 +00:00
2017-10-06 11:29:58 +00:00
for node in nodes :
2018-09-07 11:51:51 +00:00
res = node . query_with_retry ( " SELECT count(), uniqExact(i), min(i), max(i) FROM simple " , check_callback = lambda res : TSV ( res ) == TSV ( answer ) )
2017-11-20 19:33:12 +00:00
assert TSV ( res ) == TSV ( answer ) , node . name + " : " + node . query ( " SELECT groupArray(_part), i, count() AS c FROM simple GROUP BY i ORDER BY c DESC LIMIT 1 " )
2017-10-06 11:29:58 +00:00
node1 . query ( """ DROP TABLE simple ON CLUSTER test_cluster """ )
2018-05-13 13:31:36 +00:00
class Runner :
def __init__ ( self ) :
self . mtx = threading . Lock ( )
self . total_inserted = 0
self . inserted_vals = set ( )
self . inserted_payloads = set ( )
self . stop_ev = threading . Event ( )
def do_insert ( self , thread_num ) :
self . stop_ev . wait ( random . random ( ) )
year = 2000
month = ' 01 '
day = str ( thread_num + 1 ) . zfill ( 2 )
x = 1
while not self . stop_ev . is_set ( ) :
payload = """
{ year } - { month } - { day } { x1 }
{ year } - { month } - { day } { x2 }
""" .format(year=year, month=month, day=day, x1=x, x2=(x + 1)).strip()
try :
random . choice ( nodes ) . query ( " INSERT INTO repl_test FORMAT TSV " , payload )
# print 'thread {}: insert {}, {}'.format(thread_num, i, i + 1)
self . mtx . acquire ( )
if payload not in self . inserted_payloads :
self . inserted_payloads . add ( payload )
self . inserted_vals . add ( x )
self . inserted_vals . add ( x + 1 )
self . total_inserted + = 2 * x + 1
self . mtx . release ( )
except Exception , e :
print ' Exception: ' , e
x + = 2
self . stop_ev . wait ( 0.1 + random . random ( ) / 10 )
def test_insert_multithreaded ( started_cluster ) :
DURATION_SECONDS = 50
for node in nodes :
node . query ( " DROP TABLE IF EXISTS repl_test " )
for node in nodes :
node . query ( " CREATE TABLE repl_test(d Date, x UInt32) ENGINE ReplicatedMergeTree( ' /clickhouse/tables/test/repl_test ' , ' {replica} ' ) ORDER BY x PARTITION BY toYYYYMM(d) " )
runner = Runner ( )
threads = [ ]
for thread_num in range ( 5 ) :
threads . append ( threading . Thread ( target = runner . do_insert , args = ( thread_num , ) ) )
for t in threads :
t . start ( )
time . sleep ( DURATION_SECONDS )
runner . stop_ev . set ( )
for t in threads :
t . join ( )
# Sanity check: at least something was inserted
assert runner . total_inserted > 0
2018-07-05 13:29:22 +00:00
all_replicated = False
2018-09-06 13:03:42 +00:00
for i in range ( 100 ) : # wait for replication 50 seconds max
time . sleep ( 0.5 )
2018-05-13 13:31:36 +00:00
def get_delay ( node ) :
return int ( node . query ( " SELECT absolute_delay FROM system.replicas WHERE table = ' repl_test ' " ) . rstrip ( ) )
if all ( [ get_delay ( n ) == 0 for n in nodes ] ) :
2018-07-05 13:29:22 +00:00
all_replicated = True
2018-05-13 13:31:36 +00:00
break
2018-07-05 13:29:22 +00:00
assert all_replicated
2018-05-13 13:31:36 +00:00
actual_inserted = [ ]
for i , node in enumerate ( nodes ) :
actual_inserted . append ( int ( node . query ( " SELECT sum(x) FROM repl_test " ) . rstrip ( ) ) )
assert actual_inserted [ i ] == runner . total_inserted