Merge pull request #3025 from yandex/CLICKHOUSE-3894

CLICKHOUSE-3894: Some improvements in flapping tests. Bug fix for ATTACH TABLE xxx ON CLUSTER (previously it did not work).
This commit is contained in:
alesapin 2018-09-03 18:30:56 +03:00 committed by GitHub
commit b227e74089
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 111 additions and 84 deletions

View File

@ -244,6 +244,7 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
query->attach = attach;
query->if_not_exists = if_not_exists;
query->cluster = cluster_str;
if (database)
query->database = typeid_cast<ASTIdentifier &>(*database).name;

View File

@ -147,7 +147,7 @@ class ClickHouseCluster:
print "Mysql Started"
return
except Exception as ex:
print "Can't connecto to MySQL " + str(ex)
print "Can't connect to MySQL " + str(ex)
time.sleep(0.5)
raise Exception("Cannot wait MySQL container")
@ -162,7 +162,7 @@ class ClickHouseCluster:
print "All instances of ZooKeeper started"
return
except Exception as ex:
print "Can't connec to to ZooKeeper " + str(ex)
print "Can't connect to ZooKeeper " + str(ex)
time.sleep(0.5)
raise Exception("Cannot wait ZooKeeper container")
@ -322,8 +322,24 @@ class ClickHouseInstance:
self.image = image
# Connects to the instance via clickhouse-client, sends a query (1st argument) and returns the answer
def query(self, *args, **kwargs):
return self.client.query(*args, **kwargs)
def query(self, sql, stdin=None, timeout=None, settings=None, user=None, ignore_error=False):
return self.client.query(sql, stdin, timeout, settings, user, ignore_error)
def query_with_retry(self, sql, stdin=None, timeout=None, settings=None, user=None, ignore_error=False, retry_count=20, sleep_time=0.5, check_callback=lambda x: True):
    """Run ``sql`` via ``self.query`` repeatedly until ``check_callback`` accepts the result.

    ``sql``, ``stdin``, ``timeout``, ``settings``, ``user`` and ``ignore_error``
    are passed to ``self.query`` unchanged on every attempt.

    :param retry_count: maximum number of attempts.
    :param sleep_time: seconds to sleep after every unsuccessful attempt.
    :param check_callback: called with the query result; a truthy return value
        stops the retries and makes this method return that result.
    :returns: the first accepted result, or, if no attempt was accepted, the
        most recent result that ``self.query`` returned without raising.
    :raises Exception: if no attempt produced a result at all; the message
        includes the last exception seen, when there was one.
    """
    result = None
    last_error = None
    for attempt in range(retry_count):
        try:
            result = self.query(sql, stdin, timeout, settings, user, ignore_error)
            if check_callback(result):
                return result
        except Exception as ex:
            # Remember the cause so the final error is diagnosable.
            last_error = ex
            # Single-argument call form prints identically on Python 2 and 3.
            print("Retry {} got exception {}".format(attempt + 1, ex))
        # Both a rejected result and an exception wait before the next attempt.
        time.sleep(sleep_time)
    # Retries exhausted: hand back the last result obtained (even though the
    # callback rejected it) so that the caller can inspect or assert on it.
    if result is not None:
        return result
    message = "Can't execute query {}".format(sql)
    if last_error is not None:
        message += ": {}".format(last_error)
    raise Exception(message)
# Same as query(), but doesn't wait for the response and returns a response handler
def get_query_request(self, *args, **kwargs):

View File

@ -1,17 +1,40 @@
import difflib
import time
class TSV:
"""Helper to get pretty diffs between expected and actual tab-separated value files"""
def __init__(self, contents):
self.lines = contents.readlines() if isinstance(contents, file) else contents.splitlines(True)
raw_lines = contents.readlines() if isinstance(contents, file) else contents.splitlines(True)
self.lines = [l.strip() for l in raw_lines if l.strip()]
def __eq__(self, other):
return self.lines == other.lines
def diff(self, other):
return list(line.rstrip() for line in difflib.context_diff(self.lines, other.lines))[2:]
def __ne__(self, other):
return self.lines != other.lines
def diff(self, other, n1=None, n2=None):
return list(line.rstrip() for line in difflib.unified_diff(self.lines, other.lines, fromfile=n1, tofile=n2))[2:]
def __str__(self):
return '\n'.join(self.lines)
@staticmethod
def toMat(contents):
return [line.split("\t") for line in contents.split("\n") if line.strip()]
def assert_eq_with_retry(instance, query, expectation, retry_count=20, sleep_time=0.5, stdin=None, timeout=None, settings=None, user=None, ignore_error=False):
    """Assert that ``query`` on ``instance`` eventually yields ``expectation``.

    The query is re-run up to ``retry_count`` times, sleeping ``sleep_time``
    seconds after every mismatch or exception. Results are compared as ``TSV``
    objects. If no attempt matched, the query is executed one final time and
    an ``AssertionError`` carrying a diff is raised when it still differs.

    ``stdin``, ``timeout``, ``settings``, ``user`` and ``ignore_error`` are
    forwarded to ``instance.query`` on every call.

    :raises AssertionError: if the result never matches ``expectation``.
    """
    expectation_tsv = TSV(expectation)
    # Forward the query options; previously they were accepted but silently dropped.
    query_kwargs = dict(stdin=stdin, timeout=timeout, settings=settings, user=user, ignore_error=ignore_error)
    for i in range(retry_count):
        try:
            if TSV(instance.query(query, **query_kwargs)) == expectation_tsv:
                break
            time.sleep(sleep_time)
        except Exception as ex:
            # Single-argument call form prints identically on Python 2 and 3.
            print("assert_eq_with_retry retry {} exception {}".format(i + 1, ex))
            time.sleep(sleep_time)
    else:
        # Reached only when the loop finished without a match: take one last
        # reading (exceptions propagate here) and report the difference.
        val = TSV(instance.query(query, **query_kwargs))
        if expectation_tsv != val:
            raise AssertionError("'{}' != '{}'\n{}".format(expectation_tsv, val, '\n'.join(expectation_tsv.diff(val, n1="expectation", n2="query"))))

View File

@ -5,7 +5,7 @@ import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import TSV
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
@ -56,14 +56,14 @@ CREATE TABLE distributed(date Date, id UInt32, shard_id UInt32)
def test(started_cluster):
# Check that the data has been inserted into correct tables.
assert node1.query("SELECT id FROM shard_0.replicated") == '111\n'
assert node1.query("SELECT id FROM shard_2.replicated") == '333\n'
assert_eq_with_retry(node1, "SELECT id FROM shard_0.replicated", '111')
assert_eq_with_retry(node1, "SELECT id FROM shard_2.replicated", '333')
assert node2.query("SELECT id FROM shard_0.replicated") == '111\n'
assert node2.query("SELECT id FROM shard_1.replicated") == '222\n'
assert_eq_with_retry(node2, "SELECT id FROM shard_0.replicated", '111')
assert_eq_with_retry(node2, "SELECT id FROM shard_1.replicated", '222')
assert node3.query("SELECT id FROM shard_1.replicated") == '222\n'
assert node3.query("SELECT id FROM shard_2.replicated") == '333\n'
assert_eq_with_retry(node3, "SELECT id FROM shard_1.replicated", '222')
assert_eq_with_retry(node3, "SELECT id FROM shard_2.replicated", '333')
# Check that SELECT from the Distributed table works.
expected_from_distributed = '''\
@ -71,20 +71,20 @@ def test(started_cluster):
2017-06-16 222 1
2017-06-16 333 2
'''
assert TSV(node1.query("SELECT * FROM distributed ORDER BY id")) == TSV(expected_from_distributed)
assert TSV(node2.query("SELECT * FROM distributed ORDER BY id")) == TSV(expected_from_distributed)
assert TSV(node3.query("SELECT * FROM distributed ORDER BY id")) == TSV(expected_from_distributed)
assert_eq_with_retry(node1, "SELECT * FROM distributed ORDER BY id", expected_from_distributed)
assert_eq_with_retry(node2, "SELECT * FROM distributed ORDER BY id", expected_from_distributed)
assert_eq_with_retry(node3, "SELECT * FROM distributed ORDER BY id", expected_from_distributed)
# Now isolate node3 from other nodes and check that SELECTs on other nodes still work.
with PartitionManager() as pm:
pm.partition_instances(node3, node1, action='REJECT --reject-with tcp-reset')
pm.partition_instances(node3, node2, action='REJECT --reject-with tcp-reset')
assert TSV(node1.query("SELECT * FROM distributed ORDER BY id")) == TSV(expected_from_distributed)
assert TSV(node2.query("SELECT * FROM distributed ORDER BY id")) == TSV(expected_from_distributed)
assert_eq_with_retry(node1, "SELECT * FROM distributed ORDER BY id", expected_from_distributed)
assert_eq_with_retry(node2, "SELECT * FROM distributed ORDER BY id", expected_from_distributed)
with pytest.raises(Exception):
print node3.query("SELECT * FROM distributed ORDER BY id")
print node3.query_with_retry("SELECT * FROM distributed ORDER BY id", retry_count=5)
if __name__ == '__main__':

View File

@ -279,7 +279,9 @@ ENGINE = Distributed(cluster_without_replication, default, merge, i)
assert TSV(instance.query("SELECT i FROM all_merge_32 ORDER BY i")) == TSV(''.join(['{}\n'.format(x) for x in xrange(4)]))
time.sleep(5)
ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster_without_replication MODIFY COLUMN i Int64")
time.sleep(5)
ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster_without_replication ADD COLUMN s DEFAULT toString(i) FORMAT TSV")
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(4)]))

View File

@ -3,6 +3,8 @@ import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
"""
Both ssl_conf.xml and no_ssl_conf.xml have the same port
"""
@ -35,16 +37,14 @@ def both_https_cluster():
def test_both_https(both_https_cluster):
node1.query("insert into test_table values ('2017-06-16', 111, 0)")
time.sleep(1)
assert node1.query("SELECT id FROM test_table order by id") == '111\n'
assert node2.query("SELECT id FROM test_table order by id") == '111\n'
assert_eq_with_retry(node1, "SELECT id FROM test_table order by id", '111')
assert_eq_with_retry(node2, "SELECT id FROM test_table order by id", '111')
node2.query("insert into test_table values ('2017-06-17', 222, 1)")
time.sleep(1)
assert node1.query("SELECT id FROM test_table order by id") == '111\n222\n'
assert node2.query("SELECT id FROM test_table order by id") == '111\n222\n'
assert_eq_with_retry(node1, "SELECT id FROM test_table order by id", '111\n222')
assert_eq_with_retry(node2, "SELECT id FROM test_table order by id", '111\n222')
node3 = cluster.add_instance('node3', config_dir="configs", main_configs=['configs/remote_servers.xml', 'configs/no_ssl_conf.xml'], with_zookeeper=True)
node4 = cluster.add_instance('node4', config_dir="configs", main_configs=['configs/remote_servers.xml', 'configs/no_ssl_conf.xml'], with_zookeeper=True)
@ -63,16 +63,14 @@ def both_http_cluster():
def test_both_http(both_http_cluster):
node3.query("insert into test_table values ('2017-06-16', 111, 0)")
time.sleep(1)
assert node3.query("SELECT id FROM test_table order by id") == '111\n'
assert node4.query("SELECT id FROM test_table order by id") == '111\n'
assert_eq_with_retry(node3, "SELECT id FROM test_table order by id", '111')
assert_eq_with_retry(node4, "SELECT id FROM test_table order by id", '111')
node4.query("insert into test_table values ('2017-06-17', 222, 1)")
time.sleep(1)
assert node3.query("SELECT id FROM test_table order by id") == '111\n222\n'
assert node4.query("SELECT id FROM test_table order by id") == '111\n222\n'
assert_eq_with_retry(node3, "SELECT id FROM test_table order by id", '111\n222')
assert_eq_with_retry(node4, "SELECT id FROM test_table order by id", '111\n222')
node5 = cluster.add_instance('node5', config_dir="configs", main_configs=['configs/remote_servers.xml', 'configs/ssl_conf.xml'], with_zookeeper=True)
node6 = cluster.add_instance('node6', config_dir="configs", main_configs=['configs/remote_servers.xml', 'configs/no_ssl_conf.xml'], with_zookeeper=True)
@ -91,13 +89,11 @@ def mixed_protocol_cluster():
def test_mixed_protocol(mixed_protocol_cluster):
node5.query("insert into test_table values ('2017-06-16', 111, 0)")
time.sleep(1)
assert node5.query("SELECT id FROM test_table order by id") == '111\n'
assert node6.query("SELECT id FROM test_table order by id") == ''
assert_eq_with_retry(node5, "SELECT id FROM test_table order by id", '111')
assert_eq_with_retry(node6, "SELECT id FROM test_table order by id", '')
node6.query("insert into test_table values ('2017-06-17', 222, 1)")
time.sleep(1)
assert node5.query("SELECT id FROM test_table order by id") == '111\n'
assert node6.query("SELECT id FROM test_table order by id") == '222\n'
assert_eq_with_retry(node5, "SELECT id FROM test_table order by id", '111')
assert_eq_with_retry(node6, "SELECT id FROM test_table order by id", '222')

View File

@ -2,6 +2,7 @@ import time
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
def fill_nodes(nodes, shard):
for node in nodes:
@ -40,10 +41,6 @@ def test_recovery(start_cluster):
for i in range(100):
node1.query("INSERT INTO test_table VALUES (1, {})".format(i))
time.sleep(2)
node2.query_with_retry("ATTACH TABLE test_table", check_callback=lambda x: len(node2.query("select * from test_table")) > 0)
node2.query("ATTACH TABLE test_table")
time.sleep(2)
assert node1.query("SELECT count(*) FROM test_table") == node2.query("SELECT count(*) FROM test_table")
assert_eq_with_retry(node2, "SELECT count(*) FROM test_table", node1.query("SELECT count(*) FROM test_table"))

View File

@ -5,6 +5,8 @@ import sys
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
def _fill_nodes(nodes, shard):
@ -42,17 +44,15 @@ def normal_work():
def test_normal_work(normal_work):
node1.query("insert into test_table values ('2017-06-16', 111, 0)")
node1.query("insert into real_table values ('2017-06-16', 222, 0)")
time.sleep(1)
assert node1.query("SELECT id FROM test_table order by id") == '111\n'
assert node1.query("SELECT id FROM real_table order by id") == '222\n'
assert node2.query("SELECT id FROM test_table order by id") == '111\n'
assert_eq_with_retry(node1, "SELECT id FROM test_table order by id", '111')
assert_eq_with_retry(node1, "SELECT id FROM real_table order by id", '222')
assert_eq_with_retry(node2, "SELECT id FROM test_table order by id", '111')
node1.query("ALTER TABLE test_table REPLACE PARTITION 201706 FROM real_table")
time.sleep(1)
assert node1.query("SELECT id FROM test_table order by id") == '222\n'
assert node2.query("SELECT id FROM test_table order by id") == '222\n'
assert_eq_with_retry(node1, "SELECT id FROM test_table order by id", '222')
assert_eq_with_retry(node2, "SELECT id FROM test_table order by id", '222')
node3 = cluster.add_instance('node3', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
node4 = cluster.add_instance('node4', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
@ -72,11 +72,10 @@ def drop_failover():
def test_drop_failover(drop_failover):
node3.query("insert into test_table values ('2017-06-16', 111, 0)")
node3.query("insert into real_table values ('2017-06-16', 222, 0)")
time.sleep(1)
assert node3.query("SELECT id FROM test_table order by id") == '111\n'
assert node3.query("SELECT id FROM real_table order by id") == '222\n'
assert node4.query("SELECT id FROM test_table order by id") == '111\n'
assert_eq_with_retry(node3, "SELECT id FROM test_table order by id", '111')
assert_eq_with_retry(node3, "SELECT id FROM real_table order by id", '222')
assert_eq_with_retry(node4, "SELECT id FROM test_table order by id", '111')
with PartitionManager() as pm:
@ -88,23 +87,18 @@ def test_drop_failover(drop_failover):
node3.query("ALTER TABLE test_table REPLACE PARTITION 201706 FROM real_table")
# Node3 replace is ok
assert node3.query("SELECT id FROM test_table order by id") == '222\n'
assert_eq_with_retry(node3, "SELECT id FROM test_table order by id", '222')
# Network interrupted -- replace is not ok, but it's ok
assert node4.query("SELECT id FROM test_table order by id") == '111\n'
assert_eq_with_retry(node4, "SELECT id FROM test_table order by id", '111')
#Drop partition on source node
node3.query("ALTER TABLE test_table DROP PARTITION 201706")
time.sleep(1)
# connection restored
counter = 0
while counter < 10: # will lasts forever
if 'Not found part' not in node4.query("select last_exception from system.replication_queue where type = 'REPLACE_RANGE'"):
break
time.sleep(1)
counter += 1
node4.query_with_retry("select last_exception from system.replication_queue where type = 'REPLACE_RANGE'", check_callback=lambda x: 'Not found part' not in x, sleep_time=1)
assert 'Not found part' not in node4.query("select last_exception from system.replication_queue where type = 'REPLACE_RANGE'")
assert node4.query("SELECT id FROM test_table order by id") == ''
assert_eq_with_retry(node4, "SELECT id FROM test_table order by id", '')
node5 = cluster.add_instance('node5', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
node6 = cluster.add_instance('node6', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
@ -125,12 +119,11 @@ def test_replace_after_replace_failover(replace_after_replace_failover):
node5.query("insert into test_table values ('2017-06-16', 111, 0)")
node5.query("insert into real_table values ('2017-06-16', 222, 0)")
node5.query("insert into other_table values ('2017-06-16', 333, 0)")
time.sleep(1)
assert node5.query("SELECT id FROM test_table order by id") == '111\n'
assert node5.query("SELECT id FROM real_table order by id") == '222\n'
assert node5.query("SELECT id FROM other_table order by id") == '333\n'
assert node6.query("SELECT id FROM test_table order by id") == '111\n'
assert_eq_with_retry(node5, "SELECT id FROM test_table order by id", '111')
assert_eq_with_retry(node5, "SELECT id FROM real_table order by id", '222')
assert_eq_with_retry(node5, "SELECT id FROM other_table order by id", '333')
assert_eq_with_retry(node6, "SELECT id FROM test_table order by id", '111')
with PartitionManager() as pm:
@ -142,22 +135,15 @@ def test_replace_after_replace_failover(replace_after_replace_failover):
node5.query("ALTER TABLE test_table REPLACE PARTITION 201706 FROM real_table")
# Node5 replace is ok
assert node5.query("SELECT id FROM test_table order by id") == '222\n'
assert_eq_with_retry(node5, "SELECT id FROM test_table order by id", '222')
# Network interrupted -- replace is not ok, but it's ok
assert node6.query("SELECT id FROM test_table order by id") == '111\n'
assert_eq_with_retry(node6, "SELECT id FROM test_table order by id", '111')
#Replace partition on source node
node5.query("ALTER TABLE test_table REPLACE PARTITION 201706 FROM other_table")
assert node5.query("SELECT id FROM test_table order by id") == '333\n'
assert_eq_with_retry(node5, "SELECT id FROM test_table order by id", '333')
time.sleep(1)
# connection restored
counter = 0
while counter < 10: # will lasts forever
if 'Not found part' not in node6.query("select last_exception from system.replication_queue where type = 'REPLACE_RANGE'"):
break
time.sleep(1)
counter += 1
node6.query_with_retry("select last_exception from system.replication_queue where type = 'REPLACE_RANGE'", check_callback=lambda x: 'Not found part' not in x, sleep_time=1)
assert 'Not found part' not in node6.query("select last_exception from system.replication_queue where type = 'REPLACE_RANGE'")
assert node6.query("SELECT id FROM test_table order by id") == '333\n'
assert_eq_with_retry(node6, "SELECT id FROM test_table order by id", '333')

View File

@ -57,7 +57,10 @@ def test_SYSTEM_RELOAD_DICTIONARY(started_cluster):
def test_DROP_DNS_CACHE(started_cluster):
instance = cluster.instances['ch1']
instance.exec_in_container(['bash', '-c', 'echo 127.255.255.255 lost_host > /etc/hosts'], privileged=True, user='root')
instance.exec_in_container(['bash', '-c', 'echo 127.0.0.1 localhost > /etc/hosts'], privileged=True, user='root')
instance.exec_in_container(['bash', '-c', 'echo ::1 localhost >> /etc/hosts'], privileged=True, user='root')
instance.exec_in_container(['bash', '-c', 'echo 127.255.255.255 lost_host >> /etc/hosts'], privileged=True, user='root')
instance.query("SYSTEM DROP DNS CACHE")
with pytest.raises(QueryRuntimeException):
@ -67,7 +70,10 @@ def test_DROP_DNS_CACHE(started_cluster):
with pytest.raises(QueryRuntimeException):
instance.query("SELECT * FROM distributed_lost_host")
instance.exec_in_container(['bash', '-c', 'echo 127.0.0.1 lost_host > /etc/hosts'], privileged=True, user='root')
instance.exec_in_container(['bash', '-c', 'echo 127.0.0.1 localhost > /etc/hosts'], privileged=True, user='root')
instance.exec_in_container(['bash', '-c', 'echo ::1 localhost >> /etc/hosts'], privileged=True, user='root')
instance.exec_in_container(['bash', '-c', 'echo 127.0.0.1 lost_host >> /etc/hosts'], privileged=True, user='root')
instance.query("SYSTEM DROP DNS CACHE")
instance.query("SELECT * FROM remote('lost_host', 'system', 'one')")