mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-17 13:13:36 +00:00
b738d1baed
Fixed handling of obsolete parts. Fixed conflict resolution between simultaneous PreCommitted covering parts. Fixed memory leak caused by ordinary MergeTree parts stuck in Deleting state. Added hidden _state column to system.parts.
65 lines
2.5 KiB
Python
65 lines
2.5 KiB
Python
import time
|
|
import os
|
|
from contextlib import contextmanager
|
|
|
|
import pytest
|
|
|
|
from helpers.cluster import ClickHouseCluster
|
|
from helpers.network import PartitionManager
|
|
from helpers.test_tools import TSV
|
|
from helpers.client import CommandRequest
|
|
|
|
|
|
# Cluster definition shared by every test in this module.
cluster = ClickHouseCluster(__file__)

# Two instances that differ only in their name and "replica" macro value.
node1 = cluster.add_instance(
    'node1',
    config_dir='configs',
    with_zookeeper=True,
    macroses={"layer": 0, "shard": 0, "replica": 1}
)
node2 = cluster.add_instance(
    'node2',
    config_dir='configs',
    with_zookeeper=True,
    macroses={"layer": 0, "shard": 0, "replica": 2}
)

# Every instance the tests iterate over.
nodes = [node1, node2]
|
|
|
|
@pytest.fixture(scope="module")
def started_cluster():
    """Start the module-level cluster, yield it to the tests, then shut it down.

    The shutdown sits in ``finally`` so it runs even when ``cluster.start()``
    or the code consuming the fixture raises.
    """
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
|
|
|
|
|
|
def test_random_inserts(started_cluster):
    """Run ``test.sh`` against every node and verify the replicated table.

    A ``CommandRequest`` executing ``test.sh`` is created for each node and is
    given the node's IP address plus the inclusive
    ``[min_timestamp, max_timestamp]`` range; all requests are created before
    any answer is awaited.  After every request has answered, each node must
    report ``count()`` and ``uniqExact(i)`` equal to the size of that range and
    ``min(i)``/``max(i)`` equal to its bounds, i.e. exactly one row per integer
    in the range.

    The table is dropped in ``finally`` so a failed run does not leave it
    behind on the cluster.
    """
    # Duration of the test; reduce it if you don't want to wait.
    DURATION_SECONDS = 10  # * 60

    node1.query("""
        CREATE TABLE simple ON CLUSTER test_cluster (date Date, i UInt32, s String)
        ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/simple', '{replica}', date, i, 8192)""")

    try:
        with PartitionManager():
            # Random TCP resets on port 2181 traffic are disabled: this test
            # doesn't work with partition problems still.  To re-enable, bind
            # the manager (``as pm``) and add for every ``sacrifice`` in ``nodes``:
            #   pm._add_rule({'probability': 0.01, 'destination': sacrifice.ip_address, 'source_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})
            #   pm._add_rule({'probability': 0.01, 'source': sacrifice.ip_address, 'destination_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})

            min_timestamp = int(time.time())
            max_timestamp = min_timestamp + DURATION_SECONDS
            # Both bounds are inclusive, hence the +1.
            num_timestamps = max_timestamp - min_timestamp + 1

            bash_script = os.path.join(os.path.dirname(__file__), "test.sh")
            inserters = []
            for node in nodes:
                cmd = ['/bin/bash', bash_script, node.ip_address, str(min_timestamp), str(max_timestamp)]
                inserters.append(CommandRequest(cmd, timeout=DURATION_SECONDS * 2, stdin=''))
                # Single-argument call form: prints the same text on Python 2 and 3.
                print("{} {}".format(node.name, node.ip_address))

            # Wait for every inserter only after all of them have been created.
            for inserter in inserters:
                inserter.get_answer()

            answer = "{}\t{}\t{}\t{}\n".format(num_timestamps, num_timestamps, min_timestamp, max_timestamp)

            for node in nodes:
                res = node.query("SELECT count(), uniqExact(i), min(i), max(i) FROM simple")
                # On mismatch, the message shows the most frequent value of i
                # together with the parts that contain it.
                assert TSV(res) == TSV(answer), node.name + " : " + node.query("SELECT groupArray(_part), i, count() AS c FROM simple GROUP BY i ORDER BY c DESC LIMIT 1")
    finally:
        node1.query("""DROP TABLE simple ON CLUSTER test_cluster""")
|