mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
more
This commit is contained in:
parent
1d6253b9dc
commit
e04957ce82
@ -1,6 +1,7 @@
|
||||
import os
|
||||
import subprocess as sp
|
||||
import tempfile
|
||||
import logging
|
||||
from threading import Timer
|
||||
|
||||
|
||||
@ -105,6 +106,7 @@ class CommandRequest:
|
||||
stderr = self.stderr_file.read().decode('utf-8', errors='replace')
|
||||
|
||||
if self.timer is not None and not self.process_finished_before_timeout and not self.ignore_error:
|
||||
logging.debug(f"Timed out. Last stdout:{stdout}, stderr:{stderr}")
|
||||
raise QueryTimeoutExceedException('Client timed out!')
|
||||
|
||||
if (self.process.returncode != 0 or stderr) and not self.ignore_error:
|
||||
|
@ -1,6 +1,6 @@
|
||||
<yandex>
|
||||
<zookeeper>
|
||||
<!-- Required for correct timing in current test case -->
|
||||
<session_timeout_ms replace="1">10000</session_timeout_ms>
|
||||
<session_timeout_ms replace="1">15000</session_timeout_ms>
|
||||
</zookeeper>
|
||||
</yandex>
|
||||
|
@ -1,6 +1,6 @@
|
||||
<yandex>
|
||||
<zookeeper>
|
||||
<!-- Required for correct timing in current test case -->
|
||||
<session_timeout_ms replace="1">10000</session_timeout_ms>
|
||||
<session_timeout_ms replace="1">15000</session_timeout_ms>
|
||||
</zookeeper>
|
||||
</yandex>
|
||||
|
@ -93,27 +93,6 @@ def test_on_server_fail(test_cluster):
|
||||
test_cluster.ddl_check_query(instance, "DROP TABLE test.test_server_fail ON CLUSTER 'cluster'")
|
||||
|
||||
|
||||
def _test_on_connection_losses(test_cluster, zk_timeout):
|
||||
instance = test_cluster.instances['ch1']
|
||||
kill_instance = test_cluster.instances['ch2']
|
||||
|
||||
with PartitionManager() as pm:
|
||||
pm.drop_instance_zk_connections(kill_instance)
|
||||
request = instance.get_query_request("DROP TABLE IF EXISTS test.__nope__ ON CLUSTER 'cluster'", timeout=40)
|
||||
time.sleep(zk_timeout)
|
||||
pm.restore_instance_zk_connections(kill_instance)
|
||||
|
||||
test_cluster.check_all_hosts_successfully_executed(request.get_answer())
|
||||
|
||||
|
||||
def test_on_connection_loss(test_cluster):
|
||||
_test_on_connection_losses(test_cluster, 15) # connection loss will occur only (10 sec ZK timeout in config)
|
||||
|
||||
|
||||
def test_on_session_expired(test_cluster):
|
||||
_test_on_connection_losses(test_cluster, 60) # session should be expired (10 sec ZK timeout in config)
|
||||
|
||||
|
||||
def test_simple_alters(test_cluster):
|
||||
instance = test_cluster.instances['ch2']
|
||||
|
||||
@ -191,7 +170,7 @@ def test_implicit_macros(test_cluster):
|
||||
|
||||
instance = test_cluster.instances['ch2']
|
||||
|
||||
test_cluster.ddl_check_query(instance, "DROP DATABASE IF EXISTS test_db ON CLUSTER '{cluster}'")
|
||||
test_cluster.ddl_check_query(instance, "DROP DATABASE IF EXISTS test_db ON CLUSTER '{cluster}' SYNC")
|
||||
test_cluster.ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS test_db ON CLUSTER '{cluster}'")
|
||||
|
||||
test_cluster.ddl_check_query(instance, """
|
||||
@ -271,6 +250,15 @@ def test_create_reserved(test_cluster):
|
||||
def test_rename(test_cluster):
|
||||
instance = test_cluster.instances['ch1']
|
||||
rules = test_cluster.pm_random_drops.pop_rules()
|
||||
test_cluster.ddl_check_query(instance,
|
||||
"DROP TABLE IF EXISTS rename_shard ON CLUSTER cluster SYNC")
|
||||
test_cluster.ddl_check_query(instance,
|
||||
"DROP TABLE IF EXISTS rename_new ON CLUSTER cluster SYNC")
|
||||
test_cluster.ddl_check_query(instance,
|
||||
"DROP TABLE IF EXISTS rename_old ON CLUSTER cluster SYNC")
|
||||
test_cluster.ddl_check_query(instance,
|
||||
"DROP TABLE IF EXISTS rename ON CLUSTER cluster SYNC")
|
||||
|
||||
test_cluster.ddl_check_query(instance,
|
||||
"CREATE TABLE rename_shard ON CLUSTER cluster (id Int64, sid String DEFAULT concat('old', toString(id))) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/staging/test_shard', '{replica}') ORDER BY (id)")
|
||||
test_cluster.ddl_check_query(instance,
|
||||
@ -327,12 +315,15 @@ def test_socket_timeout(test_cluster):
|
||||
def test_replicated_without_arguments(test_cluster):
|
||||
rules = test_cluster.pm_random_drops.pop_rules()
|
||||
instance = test_cluster.instances['ch1']
|
||||
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS test_atomic.rmt ON CLUSTER cluster SYNC")
|
||||
test_cluster.ddl_check_query(instance, "DROP DATABASE IF EXISTS test_atomic ON CLUSTER cluster SYNC")
|
||||
|
||||
test_cluster.ddl_check_query(instance, "CREATE DATABASE test_atomic ON CLUSTER cluster ENGINE=Atomic")
|
||||
assert "are supported only for ON CLUSTER queries with Atomic database engine" in \
|
||||
instance.query_and_get_error("CREATE TABLE test_atomic.rmt (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n")
|
||||
test_cluster.ddl_check_query(instance,
|
||||
"CREATE TABLE test_atomic.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree() ORDER BY n")
|
||||
test_cluster.ddl_check_query(instance, "DROP TABLE test_atomic.rmt ON CLUSTER cluster")
|
||||
test_cluster.ddl_check_query(instance, "DROP TABLE test_atomic.rmt ON CLUSTER cluster SYNC")
|
||||
test_cluster.ddl_check_query(instance,
|
||||
"CREATE TABLE test_atomic.rmt UUID '12345678-0000-4000-8000-000000000001' ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n")
|
||||
assert instance.query("SHOW CREATE test_atomic.rmt FORMAT TSVRaw") == \
|
||||
@ -350,7 +341,7 @@ def test_replicated_without_arguments(test_cluster):
|
||||
"CREATE TABLE test_atomic.rsmt ON CLUSTER cluster (n UInt64, m UInt64, k UInt64) ENGINE=ReplicatedSummingMergeTree((m, k)) ORDER BY n")
|
||||
test_cluster.ddl_check_query(instance,
|
||||
"CREATE TABLE test_atomic.rvcmt ON CLUSTER cluster (n UInt64, m Int8, k UInt64) ENGINE=ReplicatedVersionedCollapsingMergeTree(m, k) ORDER BY n")
|
||||
test_cluster.ddl_check_query(instance, "DROP DATABASE test_atomic ON CLUSTER cluster")
|
||||
test_cluster.ddl_check_query(instance, "DROP DATABASE test_atomic ON CLUSTER cluster SYNC")
|
||||
|
||||
test_cluster.ddl_check_query(instance, "CREATE DATABASE test_ordinary ON CLUSTER cluster ENGINE=Ordinary")
|
||||
assert "are supported only for ON CLUSTER queries with Atomic database engine" in \
|
||||
@ -360,7 +351,7 @@ def test_replicated_without_arguments(test_cluster):
|
||||
test_cluster.ddl_check_query(instance, "CREATE TABLE test_ordinary.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree('/{shard}/{table}/', '{replica}') ORDER BY n")
|
||||
assert instance.query("SHOW CREATE test_ordinary.rmt FORMAT TSVRaw") == \
|
||||
"CREATE TABLE test_ordinary.rmt\n(\n `n` UInt64,\n `s` String\n)\nENGINE = ReplicatedMergeTree('/{shard}/rmt/', '{replica}')\nORDER BY n\nSETTINGS index_granularity = 8192\n"
|
||||
test_cluster.ddl_check_query(instance, "DROP DATABASE test_ordinary ON CLUSTER cluster")
|
||||
test_cluster.ddl_check_query(instance, "DROP DATABASE test_ordinary ON CLUSTER cluster SYNC")
|
||||
test_cluster.pm_random_drops.push_rules(rules)
|
||||
|
||||
|
||||
|
@ -38,9 +38,9 @@ def test_cluster(request):
|
||||
def test_replicated_alters(test_cluster):
|
||||
instance = test_cluster.instances['ch2']
|
||||
|
||||
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS merge_for_alter ON CLUSTER cluster")
|
||||
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_32 ON CLUSTER cluster")
|
||||
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_64 ON CLUSTER cluster")
|
||||
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS merge_for_alter ON CLUSTER cluster SYNC")
|
||||
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_32 ON CLUSTER cluster SYNC")
|
||||
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_64 ON CLUSTER cluster SYNC")
|
||||
|
||||
# Temporarily disable random ZK packet drops, they might broke creation if ReplicatedMergeTree replicas
|
||||
firewall_drops_rules = test_cluster.pm_random_drops.pop_rules()
|
||||
@ -90,10 +90,10 @@ ENGINE = Distributed(cluster, default, merge_for_alter, i)
|
||||
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(
|
||||
''.join(['{}\t{}\n'.format(x, x) for x in range(4)]))
|
||||
|
||||
test_cluster.ddl_check_query(instance, "DROP TABLE merge_for_alter ON CLUSTER cluster")
|
||||
test_cluster.ddl_check_query(instance, "DROP TABLE merge_for_alter ON CLUSTER cluster SYNC")
|
||||
|
||||
# Enable random ZK packet drops
|
||||
test_cluster.pm_random_drops.push_rules(firewall_drops_rules)
|
||||
|
||||
test_cluster.ddl_check_query(instance, "DROP TABLE all_merge_32 ON CLUSTER cluster")
|
||||
test_cluster.ddl_check_query(instance, "DROP TABLE all_merge_64 ON CLUSTER cluster")
|
||||
test_cluster.ddl_check_query(instance, "DROP TABLE all_merge_32 ON CLUSTER cluster SYNC")
|
||||
test_cluster.ddl_check_query(instance, "DROP TABLE all_merge_64 ON CLUSTER cluster SYNC")
|
||||
|
@ -21,16 +21,27 @@ create_table_sql_template = """
|
||||
PRIMARY KEY (`id`)) ENGINE=InnoDB;
|
||||
"""
|
||||
|
||||
def create_mysql_db(conn, name):
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute(
|
||||
"CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))
|
||||
drop_table_sql_template = """
|
||||
DROP TABLE IF EXISTS `clickhouse`.`{}`;
|
||||
"""
|
||||
|
||||
def get_mysql_conn(started_cluster, host):
|
||||
conn = pymysql.connect(user='root', password='clickhouse', host=host, port=started_cluster.mysql_port)
|
||||
return conn
|
||||
|
||||
def create_mysql_table(conn, tableName):
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute(create_table_sql_template.format(tableName))
|
||||
|
||||
def drop_mysql_table(conn, tableName):
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute(drop_table_sql_template.format(tableName))
|
||||
|
||||
def create_mysql_db(conn, name):
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute("DROP DATABASE IF EXISTS {}".format(name))
|
||||
cursor.execute("CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
@ -51,7 +62,10 @@ def started_cluster():
|
||||
|
||||
def test_many_connections(started_cluster):
|
||||
table_name = 'test_many_connections'
|
||||
node1.query(f'DROP TABLE IF EXISTS {table_name}')
|
||||
|
||||
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
|
||||
drop_mysql_table(conn, table_name)
|
||||
create_mysql_table(conn, table_name)
|
||||
|
||||
node1.query('''
|
||||
@ -66,14 +80,18 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL
|
||||
query += "SELECT id FROM {t})"
|
||||
|
||||
assert node1.query(query.format(t=table_name)) == '250\n'
|
||||
drop_mysql_table(conn, table_name)
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_insert_select(started_cluster):
|
||||
table_name = 'test_insert_select'
|
||||
node1.query(f'DROP TABLE IF EXISTS {table_name}')
|
||||
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
|
||||
drop_mysql_table(conn, table_name)
|
||||
create_mysql_table(conn, table_name)
|
||||
|
||||
|
||||
node1.query('''
|
||||
CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL('mysql57:3306', 'clickhouse', '{}', 'root', 'clickhouse');
|
||||
'''.format(table_name, table_name))
|
||||
@ -87,7 +105,9 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL
|
||||
|
||||
def test_replace_select(started_cluster):
|
||||
table_name = 'test_replace_select'
|
||||
node1.query(f'DROP TABLE IF EXISTS {table_name}')
|
||||
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
|
||||
drop_mysql_table(conn, table_name)
|
||||
create_mysql_table(conn, table_name)
|
||||
|
||||
node1.query('''
|
||||
@ -106,7 +126,9 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL
|
||||
|
||||
def test_insert_on_duplicate_select(started_cluster):
|
||||
table_name = 'test_insert_on_duplicate_select'
|
||||
node1.query(f'DROP TABLE IF EXISTS {table_name}')
|
||||
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
|
||||
drop_mysql_table(conn, table_name)
|
||||
create_mysql_table(conn, table_name)
|
||||
|
||||
node1.query('''
|
||||
@ -125,7 +147,10 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL
|
||||
|
||||
def test_where(started_cluster):
|
||||
table_name = 'test_where'
|
||||
node1.query(f'DROP TABLE IF EXISTS {table_name}')
|
||||
|
||||
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
|
||||
drop_mysql_table(conn, table_name)
|
||||
create_mysql_table(conn, table_name)
|
||||
node1.query('''
|
||||
CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL('mysql57:3306', 'clickhouse', '{}', 'root', 'clickhouse');
|
||||
@ -146,6 +171,7 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL
|
||||
|
||||
def test_table_function(started_cluster):
|
||||
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
|
||||
drop_mysql_table(conn, 'table_function')
|
||||
create_mysql_table(conn, 'table_function')
|
||||
table_function = "mysql('mysql57:3306', 'clickhouse', '{}', 'root', 'clickhouse')".format('table_function')
|
||||
assert node1.query("SELECT count() FROM {}".format(table_function)).rstrip() == '0'
|
||||
@ -168,6 +194,8 @@ def test_table_function(started_cluster):
|
||||
|
||||
def test_binary_type(started_cluster):
|
||||
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
|
||||
drop_mysql_table(conn, 'binary_type')
|
||||
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute("CREATE TABLE clickhouse.binary_type (id INT PRIMARY KEY, data BINARY(16) NOT NULL)")
|
||||
table_function = "mysql('mysql57:3306', 'clickhouse', '{}', 'root', 'clickhouse')".format('binary_type')
|
||||
@ -177,7 +205,10 @@ def test_binary_type(started_cluster):
|
||||
|
||||
def test_enum_type(started_cluster):
|
||||
table_name = 'test_enum_type'
|
||||
node1.query(f'DROP TABLE IF EXISTS {table_name}')
|
||||
|
||||
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
|
||||
drop_mysql_table(conn, table_name)
|
||||
create_mysql_table(conn, table_name)
|
||||
node1.query('''
|
||||
CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32, source Enum8('IP' = 1, 'URL' = 2)) ENGINE = MySQL('mysql57:3306', 'clickhouse', '{}', 'root', 'clickhouse', 1);
|
||||
@ -186,20 +217,8 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32, source Enum8('
|
||||
assert node1.query("SELECT source FROM {} LIMIT 1".format(table_name)).rstrip() == 'URL'
|
||||
conn.close()
|
||||
|
||||
def get_mysql_conn(started_cluster, host):
|
||||
conn = pymysql.connect(user='root', password='clickhouse', host=host, port=started_cluster.mysql_port)
|
||||
return conn
|
||||
|
||||
|
||||
def create_mysql_db(conn, name):
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute("DROP DATABASE IF EXISTS {}".format(name))
|
||||
cursor.execute("CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))
|
||||
|
||||
def create_mysql_table(conn, tableName):
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute(create_table_sql_template.format(tableName))
|
||||
|
||||
def test_mysql_distributed(started_cluster):
|
||||
table_name = 'test_replicas'
|
||||
|
||||
@ -218,6 +237,8 @@ def test_mysql_distributed(started_cluster):
|
||||
create_mysql_table(conn3, table_name)
|
||||
create_mysql_table(conn4, table_name)
|
||||
|
||||
node2.query('DROP TABLE IF EXISTS test_replicas')
|
||||
|
||||
# Storage with with 3 replicas
|
||||
node2.query('''
|
||||
CREATE TABLE test_replicas
|
||||
@ -227,6 +248,7 @@ def test_mysql_distributed(started_cluster):
|
||||
# Fill remote tables with different data to be able to check
|
||||
nodes = [node1, node2, node2, node2]
|
||||
for i in range(1, 5):
|
||||
nodes[i-1].query('DROP TABLE IF EXISTS test_replica{}'.format(i))
|
||||
nodes[i-1].query('''
|
||||
CREATE TABLE test_replica{}
|
||||
(id UInt32, name String, age UInt32, money UInt32)
|
||||
@ -249,6 +271,8 @@ def test_mysql_distributed(started_cluster):
|
||||
assert(result == 'host2\nhost3\nhost4\n')
|
||||
|
||||
# Storage with with two shards, each has 2 replicas
|
||||
node2.query('DROP TABLE IF EXISTS test_shards')
|
||||
|
||||
node2.query('''
|
||||
CREATE TABLE test_shards
|
||||
(id UInt32, name String, age UInt32, money UInt32)
|
||||
@ -275,9 +299,12 @@ def test_mysql_distributed(started_cluster):
|
||||
|
||||
def test_external_settings(started_cluster):
|
||||
table_name = 'test_external_settings'
|
||||
node1.query(f'DROP TABLE IF EXISTS {table_name}')
|
||||
conn = get_mysql_conn(started_cluster, started_cluster.mysql_ip)
|
||||
drop_mysql_table(conn, table_name)
|
||||
create_mysql_table(conn, table_name)
|
||||
|
||||
node3.query(f'DROP TABLE IF EXISTS {table_name}')
|
||||
node3.query('''
|
||||
CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL('mysql57:3306', 'clickhouse', '{}', 'root', 'clickhouse');
|
||||
'''.format(table_name, table_name))
|
||||
|
Loading…
Reference in New Issue
Block a user