DDLWorkerQueueTable - add integration test

bharatnc 2020-12-05 13:36:21 -08:00
parent 56a8532781
commit b008c49bd9
9 changed files with 215 additions and 0 deletions


@@ -0,0 +1,93 @@
<yandex>
    <remote_servers>
        <!-- Main cluster -->
        <cluster>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>ch1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>ch2</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>ch3</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>ch4</host>
                    <port>9000</port>
                </replica>
            </shard>
        </cluster>
        <!-- Cluster with specified default database -->
        <cluster2>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>ch1</host>
                    <port>9000</port>
                    <default_database>default</default_database>
                </replica>
                <replica>
                    <host>ch2</host>
                    <port>9000</port>
                    <default_database>test2</default_database>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>ch3</host>
                    <port>9000</port>
                    <default_database>default</default_database>
                </replica>
                <replica>
                    <host>ch4</host>
                    <port>9000</port>
                    <default_database>test2</default_database>
                </replica>
            </shard>
        </cluster2>
        <!-- Cluster without replicas -->
        <cluster_no_replicas>
            <shard>
                <internal_replication>false</internal_replication>
                <replica>
                    <host>ch1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>false</internal_replication>
                <replica>
                    <host>ch2</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>false</internal_replication>
                <replica>
                    <host>ch3</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>false</internal_replication>
                <replica>
                    <host>ch4</host>
                    <port>9000</port>
                </replica>
            </shard>
        </cluster_no_replicas>
    </remote_servers>
</yandex>
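The file above defines three topologies over the same four nodes: cluster (2 shards x 2 replicas with internal replication), cluster2 (the same layout plus a per-replica default_database), and cluster_no_replicas (4 single-replica shards). As a minimal sketch, not part of the commit, this is how an ON CLUSTER DDL would fan out over them; it assumes `cluster` is a started ClickHouseClusterWithDDLHelpers instance (see the test file at the end), and the table name `t` is purely illustrative:

    # Illustrative sketch. Assumes `cluster` is a started
    # ClickHouseClusterWithDDLHelpers instance from the test fixture.
    instance = cluster.instances['ch1']

    # 'cluster': 2 shards x 2 replicas -- the DDL is queued once in ZooKeeper
    # and executed on all 4 hosts.
    instance.query("CREATE TABLE t ON CLUSTER 'cluster' (x Int32) ENGINE = Memory")

    # 'cluster_no_replicas': 4 single-replica shards -- the same 4 hosts,
    # but with internal_replication disabled.
    instance.query("DROP TABLE IF EXISTS t ON CLUSTER 'cluster_no_replicas'")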


@@ -0,0 +1,8 @@
<yandex>
    <distributed_ddl>
        <path>/clickhouse/task_queue/ddl</path>
        <max_tasks_in_queue>10</max_tasks_in_queue>
        <task_max_lifetime>3600</task_max_lifetime>
        <cleanup_delay_period>5</cleanup_delay_period>
    </distributed_ddl>
</yandex>
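These settings bound the ZooKeeper-backed queue that the new system.ddl_worker_queue table exposes: tasks live under /clickhouse/task_queue/ddl, at most 10 are kept, finished tasks expire after 3600 s, and the cleanup thread wakes every 5 s. As a sketch, the raw task znodes can be listed through the standard system.zookeeper table (assuming a started `instance` as in the first sketch; the exact entry names depend on how many DDLs have run):

    # Illustrative: list the task znodes that back system.ddl_worker_queue.
    print(instance.query(
        "SELECT name FROM system.zookeeper "
        "WHERE path = '/clickhouse/task_queue/ddl' ORDER BY name"))
    # Entries look like 'query-0000000000', 'query-0000000001', ...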


@@ -0,0 +1,5 @@
<yandex>
    <macros>
        <cluster>cluster_no_replicas</cluster>
    </macros>
</yandex>
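This macro is why the test can write ON CLUSTER '{cluster}' instead of hard-coding a name: on these nodes the substitution resolves to cluster_no_replicas. A sketch of verifying that through the standard system.macros table (assuming a started `instance` as above):

    # Illustrative: macro substitutions are queryable via system.macros.
    assert instance.query(
        "SELECT substitution FROM system.macros WHERE macro = 'cluster'"
    ).strip() == 'cluster_no_replicas'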


@@ -0,0 +1,14 @@
<yandex>
    <!-- Query log. Used only for queries with setting log_queries = 1. -->
    <query_log>
        <!-- Which table to insert data into. If the table does not exist, it is created.
             When the query log structure changes after a server upgrade,
             the old table is renamed and a new one is created automatically.
        -->
        <database>system</database>
        <table>query_log</table>
        <!-- Interval for flushing data. -->
        <flush_interval_milliseconds>1000</flush_interval_milliseconds>
    </query_log>
</yandex>
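With log_queries = 1 (enabled in the profile below) every query is recorded in system.query_log, and the 1000 ms flush interval is why the test fixture sleeps 1.5 s before checking for duplicates. A hedged sketch of such a duplicate check, in the spirit of the ddl_check_there_are_no_dublicates() helper the fixture calls (assuming a started `instance`; the LIKE pattern is illustrative):

    # Illustrative: fail if any DDL statement was started more than once.
    rows = instance.query(
        "SELECT query, count() FROM system.query_log "
        "WHERE type = 'QueryStart' AND query LIKE '%DETACH PARTITION%' "
        "GROUP BY query HAVING count() > 1")
    assert rows == '', 'a DDL query was executed more than once'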


@@ -0,0 +1,7 @@
<yandex>
    <zookeeper>
        <!-- Required for correct timing in the current test case -->
        <session_timeout_ms replace="1">3000</session_timeout_ms>
    </zookeeper>
</yandex>


@@ -0,0 +1,8 @@
<yandex>
    <profiles>
        <!-- Default profile settings. -->
        <default>
            <log_queries>1</log_queries>
        </default>
    </profiles>
</yandex>
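The profile turns query logging on for every query by default. The same effect can be had per statement; a sketch using the settings parameter of the integration framework's query helper (assuming instance.query supports it, as in ClickHouse's test helpers):

    # Illustrative: enable query logging for a single statement
    # instead of via a profile.
    instance.query("SELECT 1", settings={"log_queries": 1})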


@@ -0,0 +1,16 @@
<yandex>
    <users>
        <restricted_user>
            <password></password>
            <profile>default</profile>
            <quota>default</quota>
            <networks>
                <ip>::/0</ip>
            </networks>
            <allow_databases>
                <database>db1</database>
            </allow_databases>
        </restricted_user>
    </users>
</yandex>
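restricted_user may only touch databases listed under allow_databases, so db1 is reachable while everything else is rejected. A sketch of how a test could assert that (assuming a database db1 exists, `import pytest`, and that instance.query accepts a user parameter, as in ClickHouse's test helpers):

    # Illustrative: the allow_databases restriction in action.
    instance.query("SHOW TABLES FROM db1", user="restricted_user")  # permitted
    with pytest.raises(Exception):
        instance.query("SHOW TABLES FROM default", user="restricted_user")  # denied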


@@ -0,0 +1,64 @@
import os
import sys
import time
from contextlib import contextmanager

import pytest

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Just import the cluster setup from test_distributed_ddl to avoid code duplication.
from test_distributed_ddl.cluster import ClickHouseClusterWithDDLHelpers


@pytest.fixture(scope="module", params=["configs"])
def test_cluster(request):
    cluster = ClickHouseClusterWithDDLHelpers(__file__, request.param)
    try:
        cluster.prepare()
        yield cluster

        instance = cluster.instances['ch1']
        cluster.ddl_check_query(instance, "DROP DATABASE test ON CLUSTER 'cluster'")

        # Check the query log to ensure that DDL queries are not executed twice.
        time.sleep(1.5)
        for instance in list(cluster.instances.values()):
            cluster.ddl_check_there_are_no_dublicates(instance)

        cluster.pm_random_drops.heal_all()
    finally:
        cluster.shutdown()


def test_ddl_worker_queue_table_entries(test_cluster):
    instance = test_cluster.instances['ch2']
    test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS merge ON CLUSTER '{cluster}'")
    test_cluster.ddl_check_query(instance, """
        CREATE TABLE IF NOT EXISTS merge ON CLUSTER '{cluster}' (p Date, i Int32)
        ENGINE = MergeTree(p, p, 1)
        """)
    # Spread a few rows across the nodes (integer division: i / 2 would yield a float on Python 3).
    for i in range(0, 4, 2):
        k = (i // 2) * 2
        test_cluster.instances['ch{}'.format(i + 1)].query("INSERT INTO merge (i) VALUES ({})({})".format(k, k + 1))
    time.sleep(5)
    test_cluster.ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER '{cluster}' DETACH PARTITION 197002")

    # Query the ddl_worker_queue table to ensure that the columns are populated as expected:
    # every replica must show up in `finished`, and none may still be listed in `active`.
    expected_finished_nodes = ['ch1:9000', 'ch3:9000', 'ch2:9000', 'ch4:9000']
    for exp in expected_finished_nodes:
        assert exp in instance.query("SELECT finished FROM system.ddl_worker_queue WHERE name='query-0000000000'")
        assert exp not in instance.query("SELECT active FROM system.ddl_worker_queue WHERE name='query-0000000000'")
    test_cluster.ddl_check_query(instance, "DROP TABLE merge ON CLUSTER '{cluster}'")


if __name__ == '__main__':
    with contextmanager(test_cluster)() as ctx_cluster:
        for name, instance in list(ctx_cluster.instances.items()):
            print(name, instance.ip_address)
        input("Cluster created, press any key to destroy...")