few more changes and simplification to integration test

This commit is contained in:
bharatnc 2020-12-14 22:57:31 -08:00
parent c796b0c1cb
commit 24d67082a9
11 changed files with 65 additions and 205 deletions

View File

@ -90,7 +90,7 @@ NamesAndTypesList StorageSystemDDLWorkerQueue::getNamesAndTypes()
{"port", std::make_shared<DataTypeUInt16>()},
{"status", std::make_shared<DataTypeEnum8>(getStatusEnumsAndValues())},
{"cluster", std::make_shared<DataTypeString>()},
{"values", std::make_shared<DataTypeString>()},
{"value", std::make_shared<DataTypeString>()},
{"query_start_time", std::make_shared<DataTypeDateTime>()},
{"query_finish_time", std::make_shared<DataTypeDateTime>()},
{"query_duration_ms", std::make_shared<DataTypeUInt64>()},
@ -275,7 +275,7 @@ void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const C
Coordination::GetResponse res;
if (!futures.empty())
res = futures[query_id].get();
res_columns[i++]->insert(res.data); // values
res_columns[i++]->insert(res.data); // value
auto query_start_time = res.stat.mtime;
res_columns[i++]->insert(UInt64(query_start_time / 1000)); // query_start_time

View File

@ -136,7 +136,7 @@ void attachSystemTablesServer(IDatabase & system_database, bool has_zookeeper)
attach<StorageSystemMutations>(system_database, "mutations");
attach<StorageSystemReplicas>(system_database, "replicas");
attach<StorageSystemReplicationQueue>(system_database, "replication_queue");
attach<StorageSystemDDLWorkerQueue>(system_database, "ddl_worker_queue");
attach<StorageSystemDDLWorkerQueue>(system_database, "distributed_ddl_queue");
attach<StorageSystemDistributionQueue>(system_database, "distribution_queue");
attach<StorageSystemDictionaries>(system_database, "dictionaries");
attach<StorageSystemModels>(system_database, "models");

View File

@ -1,93 +0,0 @@
<yandex>
<remote_servers>
<!-- Main cluster -->
<cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>ch1</host>
<port>9000</port>
</replica>
<replica>
<host>ch2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>ch3</host>
<port>9000</port>
</replica>
<replica>
<host>ch4</host>
<port>9000</port>
</replica>
</shard>
</cluster>
<!-- Cluster with specified default database -->
<cluster2>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>ch1</host>
<port>9000</port>
<default_database>default</default_database>
</replica>
<replica>
<host>ch2</host>
<port>9000</port>
<default_database>test2</default_database>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>ch3</host>
<port>9000</port>
<default_database>default</default_database>
</replica>
<replica>
<host>ch4</host>
<port>9000</port>
<default_database>test2</default_database>
</replica>
</shard>
</cluster2>
<!-- Cluster without replicas -->
<cluster_no_replicas>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch3</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch4</host>
<port>9000</port>
</replica>
</shard>
</cluster_no_replicas>
</remote_servers>
</yandex>

View File

@ -1,8 +0,0 @@
<yandex>
<distributed_ddl>
<path>/clickhouse/task_queue/ddl</path>
<max_tasks_in_queue>10</max_tasks_in_queue>
<task_max_lifetime>3600</task_max_lifetime>
<cleanup_delay_period>5</cleanup_delay_period>
</distributed_ddl>
</yandex>

View File

@ -1,5 +0,0 @@
<yandex>
<macros>
<cluster>cluster_no_replicas</cluster>
</macros>
</yandex>

View File

@ -1,14 +0,0 @@
<yandex>
<!-- Query log. Used only for queries with setting log_queries = 1. -->
<query_log>
<!-- What table to insert data. If table is not exist, it will be created.
When query log structure is changed after system update,
then old table will be renamed and new table will be created automatically.
-->
<database>system</database>
<table>query_log</table>
<!-- Interval of flushing data. -->
<flush_interval_milliseconds>1000</flush_interval_milliseconds>
</query_log>
</yandex>

View File

@ -1,7 +0,0 @@
<yandex>
<zookeeper>
<!-- Required for correct timing in current test case -->
<session_timeout_ms replace="1">3000</session_timeout_ms>
</zookeeper>
</yandex>

View File

@ -0,0 +1,28 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
<replica>
<host>node4</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -1,8 +0,0 @@
<yandex>
<profiles>
<!-- Default profile settings. -->
<default>
<log_queries>1</log_queries>
</default>
</profiles>
</yandex>

View File

@ -1,16 +0,0 @@
<yandex>
<users>
<restricted_user>
<password></password>
<profile>default</profile>
<quota>default</quota>
<networks>
<ip>::/0</ip>
</networks>
<allow_databases>
<database>db1</database>
</allow_databases>
</restricted_user>
</users>
</yandex>

View File

@ -1,65 +1,48 @@
import os
import sys
import time
from contextlib import contextmanager
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from helpers.cluster import ClickHouseCluster
# just import cluster setup from test_didstributed_ddl to avoid code duplication
from test_distributed_ddl.cluster import ClickHouseClusterWithDDLHelpers
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
node3 = cluster.add_instance('node3', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
node4 = cluster.add_instance('node4', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
nodes = [node1, node2, node3, node4]
@pytest.fixture(scope="module", params=["configs"])
def test_cluster(request):
cluster = ClickHouseClusterWithDDLHelpers(__file__, request.param)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.prepare()
cluster.start()
for i, node in enumerate([node1, node2]):
node.query("CREATE DATABASE testdb")
node.query(
'''CREATE TABLE testdb.test_table(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/test_table1', '{}') ORDER BY id;'''.format(
i))
for i, node in enumerate([node3, node4]):
node.query("CREATE DATABASE testdb")
node.query(
'''CREATE TABLE testdb.test_table(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/test_table2', '{}') ORDER BY id;'''.format(
i))
yield cluster
instance = cluster.instances['ch1']
cluster.ddl_check_query(instance, "DROP DATABASE test ON CLUSTER 'cluster'")
# Check query log to ensure that DDL queries are not executed twice
time.sleep(1.5)
for instance in list(cluster.instances.values()):
cluster.ddl_check_there_are_no_dublicates(instance)
cluster.pm_random_drops.heal_all()
finally:
cluster.shutdown()
def test_ddl_worker_queue_table_entries(test_cluster):
instance = test_cluster.instances['ch1']
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS merge ON CLUSTER '{cluster}'")
test_cluster.ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS merge ON CLUSTER '{cluster}' (p Date, i Int32)
ENGINE = MergeTree(p, p, 1)
""")
for i in range(0, 4, 2):
k = (i / 2) * 2
test_cluster.instances['ch{}'.format(i + 1)].query("INSERT INTO merge (i) VALUES ({})({})".format(k, k + 1))
time.sleep(2)
test_cluster.ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER '{cluster}' DETACH PARTITION 197002")
time.sleep(2)
# query the ddl_worker_queue table to ensure that the columns are populated as expected
assert_eq_with_retry(instance,
"SELECT If((SELECT count(*) FROM system.ddl_worker_queue WHERE cluster='cluster' AND status='finished') > 0, 'ok', 'fail')",
"ok\n");
assert_eq_with_retry(instance,
"SELECT If((SELECT count(*) FROM system.ddl_worker_queue WHERE cluster='cluster' AND status='active') >= 0, 'ok', 'fail')",
"ok\n")
test_cluster.ddl_check_query(instance, "DROP TABLE merge ON CLUSTER '{cluster}'")
def test_distributed_ddl_queue(started_cluster):
node1.query("INSERT INTO testdb.test_table SELECT number, toString(number) FROM numbers(100)")
node3.query("INSERT INTO testdb.test_table SELECT number, toString(number) FROM numbers(100)")
node2.query("SYSTEM SYNC REPLICA testdb.test_table")
node4.query("SYSTEM SYNC REPLICA testdb.test_table")
if __name__ == '__main__':
with contextmanager(test_cluster)() as ctx_cluster:
for name, instance in list(ctx_cluster.instances.items()):
print(name, instance.ip_address)
input("Cluster created, press any key to destroy...")
node1.query("ALTER TABLE testdb.test_table ON CLUSTER test_cluster ADD COLUMN somecolumn UInt8 AFTER val",
settings={"replication_alter_partitions_sync": "2"})
for node in nodes:
node.query("SYSTEM SYNC REPLICA testdb.test_table")
assert node.query("SELECT somecolumn FROM testdb.test_table LIMIT 1") == "0\n"
assert node.query(
"SELECT If((SELECT count(*) FROM system.distributed_ddl_queue WHERE cluster='test_cluster' AND entry='query-0000000000') > 0, 'ok', 'fail')") == "ok\n"