"""Integration tests for concurrent inserts into ReplicatedMergeTree tables.

Both tests run against a two-node cluster (``node1`` and ``node2``) and check
that each replica ends up with the expected data.
"""
import time
import os
import threading
import random
from contextlib import contextmanager

import pytest

from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import TSV
from helpers.client import CommandRequest


cluster = ClickHouseCluster(__file__)

node1 = cluster.add_instance('node1',
                             config_dir='configs',
                             with_zookeeper=True,
                             macros={"layer": 0, "shard": 0, "replica": 1})

node2 = cluster.add_instance('node2',
                             config_dir='configs',
                             with_zookeeper=True,
                             macros={"layer": 0, "shard": 0, "replica": 2})

nodes = [node1, node2]


@pytest.fixture(scope="module")
def started_cluster():
    """Start the cluster once per module and shut it down afterwards."""
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


def test_random_inserts(started_cluster):
    """Run the ``test.sh`` inserter against every node and compare replicas.

    One inserter process is launched per node with the same
    ``[min_timestamp, max_timestamp]`` range. Afterwards every node must
    report ``num_timestamps`` rows, ``num_timestamps`` distinct values of
    ``i``, and ``min(i)`` / ``max(i)`` equal to the range bounds.
    """
    # Duration of the test, reduce it if you don't want to wait
    DURATION_SECONDS = 10  # * 60

    node1.query("""
        CREATE TABLE simple ON CLUSTER test_cluster (date Date, i UInt32, s String)
        ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/simple', '{replica}', date, i, 8192)""")

    with PartitionManager() as pm_random_drops:
        for sacrifice in nodes:
            pass  # This test doesn't work with partition problems still
            # pm_random_drops._add_rule({'probability': 0.01, 'destination': sacrifice.ip_address, 'source_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})
            # pm_random_drops._add_rule({'probability': 0.01, 'source': sacrifice.ip_address, 'destination_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})

        min_timestamp = int(time.time())
        max_timestamp = min_timestamp + DURATION_SECONDS
        num_timestamps = max_timestamp - min_timestamp + 1

        bash_script = os.path.join(os.path.dirname(__file__), "test.sh")
        inserters = []
        for node in nodes:
            cmd = ['/bin/bash', bash_script, node.ip_address,
                   str(min_timestamp), str(max_timestamp),
                   str(cluster.get_client_cmd())]
            inserters.append(CommandRequest(cmd, timeout=DURATION_SECONDS * 2, stdin=''))
            # Single pre-formatted argument: prints identically on Python 2 and 3.
            print("{} {}".format(node.name, node.ip_address))

        for inserter in inserters:
            inserter.get_answer()

    answer = "{}\t{}\t{}\t{}\n".format(num_timestamps, num_timestamps, min_timestamp, max_timestamp)

    for node in nodes:
        res = node.query_with_retry(
            "SELECT count(), uniqExact(i), min(i), max(i) FROM simple",
            check_callback=lambda res: TSV(res) == TSV(answer))
        # On mismatch, the message shows the most duplicated value of `i`
        # together with the parts that contain it.
        assert TSV(res) == TSV(answer), node.name + " : " + node.query("SELECT groupArray(_part), i, count() AS c FROM simple GROUP BY i ORDER BY c DESC LIMIT 1")

    node1.query("""DROP TABLE simple ON CLUSTER test_cluster""")


class Runner(object):
    """Shared state for the insert threads of ``test_insert_multithreaded``.

    ``total_inserted`` accumulates the sum of all ``x`` values whose INSERT
    returned without raising; the test later compares it with ``sum(x)`` read
    from every replica.
    """

    def __init__(self):
        # Guards the three bookkeeping attributes below.
        self.mtx = threading.Lock()
        self.total_inserted = 0
        self.inserted_vals = set()
        self.inserted_payloads = set()

        # Set by the test to ask every worker to stop.
        self.stop_ev = threading.Event()

    def do_insert(self, thread_num):
        """Insert two-row TSV blocks into ``repl_test`` until stopped.

        Each block holds the rows ``(date, x)`` and ``(date, x + 1)``, where
        the day of month is ``thread_num + 1`` and ``x`` advances by 2 after
        every attempt, successful or not. Failed inserts are printed and
        skipped; they are not counted in ``total_inserted``.

        :param thread_num: zero-based worker index.
        """
        # Random initial delay of up to one second (returns early if stopped).
        self.stop_ev.wait(random.random())

        year = 2000
        month = '01'
        day = str(thread_num + 1).zfill(2)
        date = "{year}-{month}-{day}".format(year=year, month=month, day=day)

        x = 1
        while not self.stop_ev.is_set():
            # Explicit escapes keep the tab/newline separators required by
            # FORMAT TSV intact regardless of how this file is reformatted.
            payload = "{date}\t{x1}\n{date}\t{x2}".format(date=date, x1=x, x2=(x + 1))

            try:
                random.choice(nodes).query("INSERT INTO repl_test FORMAT TSV", payload)
            except Exception as e:
                print('Exception: {}'.format(e))
            else:
                with self.mtx:
                    if payload not in self.inserted_payloads:
                        self.inserted_payloads.add(payload)
                        self.inserted_vals.add(x)
                        self.inserted_vals.add(x + 1)
                        # x + (x + 1): sum of the two values just inserted.
                        self.total_inserted += 2 * x + 1

            x += 2
            self.stop_ev.wait(0.1 + random.random() / 10)


def test_insert_multithreaded(started_cluster):
    """Insert from five threads into random nodes and compare ``sum(x)``.

    After the workers stop, the test waits until ``absolute_delay`` in
    ``system.replicas`` is 0 on every node, runs ``SYSTEM SYNC REPLICA`` and
    then requires ``sum(x)`` on each node to equal ``Runner.total_inserted``.
    """
    DURATION_SECONDS = 50

    for node in nodes:
        node.query("DROP TABLE IF EXISTS repl_test")

    for node in nodes:
        node.query("CREATE TABLE repl_test(d Date, x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/repl_test', '{replica}') ORDER BY x PARTITION BY toYYYYMM(d)")

    runner = Runner()

    threads = []
    for thread_num in range(5):
        threads.append(threading.Thread(target=runner.do_insert, args=(thread_num, )))

    for t in threads:
        t.start()

    time.sleep(DURATION_SECONDS)
    runner.stop_ev.set()

    for t in threads:
        t.join()

    # Sanity check: at least something was inserted
    assert runner.total_inserted > 0

    def get_delay(node):
        return int(node.query("SELECT absolute_delay FROM system.replicas WHERE table = 'repl_test'").rstrip())

    all_replicated = False
    for _ in range(100):  # wait for replication 50 seconds max
        time.sleep(0.5)

        if all(get_delay(n) == 0 for n in nodes):
            all_replicated = True
            break

    assert all_replicated

    # Now we can be sure that all replicated fetches started, but they may not
    # be finished yet, so we additionally sync replicas to be sure that we have
    # all data on both replicas
    for node in nodes:
        node.query("SYSTEM SYNC REPLICA repl_test", timeout=10)

    for node in nodes:
        actual_inserted = int(node.query("SELECT sum(x) FROM repl_test").rstrip())
        assert actual_inserted == runner.total_inserted, \
            "{}: sum(x) is {}, expected {}".format(node.name, actual_inserted, runner.total_inserted)