diff --git a/src/Storages/System/StorageSystemDDLWorkerQueue.cpp b/src/Storages/System/StorageSystemDDLWorkerQueue.cpp index 0bed63da57c..83107f50dd4 100644 --- a/src/Storages/System/StorageSystemDDLWorkerQueue.cpp +++ b/src/Storages/System/StorageSystemDDLWorkerQueue.cpp @@ -24,7 +24,7 @@ namespace fs = std::filesystem; -enum status +enum Status { active, finished, @@ -32,13 +32,18 @@ enum status errored }; +namespace ErrorCodes +{ +extern const int BAD_ARGUMENTS; +} + std::vector> getStatusEnumsAndValues() { return std::vector>{ - {"active", static_cast(status::active)}, - {"finished", static_cast(status::finished)}, - {"unknown", static_cast(status::unknown)}, - {"errored", static_cast(status::errored)}, + {"active", static_cast(Status::active)}, + {"finished", static_cast(Status::finished)}, + {"unknown", static_cast(Status::unknown)}, + {"errored", static_cast(Status::errored)}, }; } @@ -53,6 +58,7 @@ NamesAndTypesList StorageSystemDDLWorkerQueue::getNamesAndTypes() {"port", std::make_shared()}, {"status", std::make_shared(getStatusEnumsAndValues())}, {"cluster", std::make_shared()}, + {"values", std::make_shared()}, }; } @@ -146,6 +152,8 @@ void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const C if (cluster == nullptr) throw Exception("No cluster with the name: " + cluster_name + " was found.", ErrorCodes::BAD_ARGUMENTS); + + for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index) { const auto & shard_addresses = addresses_with_failover[shard_index]; @@ -158,6 +166,12 @@ void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const C [zk: localhost:2181(CONNECTED) 53] ls /clickhouse/task_queue/ddl/query-0000000004 [active, finished] */ + std::vector> futures; + futures.reserve(queries.size()); + for (const String & q : queries) + { + futures.push_back(zookeeper->asyncTryGet(fs::path(ddl_zookeeper_path) / q)); + } for (size_t query_id = 0; query_id < queries.size(); query_id++) { size_t i = 0; @@ -174,24 +188,34 @@ void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const C // status if (std::find(active_nodes.begin(), active_nodes.end(), address.toString()) != active_nodes.end()) { - res_columns[i++]->insert(static_cast(status::active)); + res_columns[i++]->insert(static_cast(Status::active)); } else if (std::find(finished_nodes.begin(), finished_nodes.end(), address.toString()) != finished_nodes.end()) { if (pool_status[replica_index].error_count != 0) { - res_columns[i++]->insert(static_cast(status::errored)); + res_columns[i++]->insert(static_cast(Status::errored)); } else { - res_columns[i++]->insert(static_cast(status::finished)); + res_columns[i++]->insert(static_cast(Status::finished)); } } else { - res_columns[i++]->insert(static_cast(status::unknown)); + res_columns[i++]->insert(static_cast(Status::unknown)); } - res_columns[i++]->insert(cluster_name); // cluster_name from the query. + + // This is the original cluster_name from the query. In order to process the request, condition is WHERE should be triggered. + res_columns[i++]->insert(cluster_name); + + // values + if (futures.empty()) + continue; + auto res = futures[query_id].get(); + if (res.error == Coordination::Error::ZNONODE) + continue; /// Node was deleted meanwhile. + res_columns[i++]->insert(res.data); } } } diff --git a/tests/integration/test_system_ddl_worker_queue/test.py b/tests/integration/test_system_ddl_worker_queue/test.py index 7557d71ce9b..69ce6859cbb 100644 --- a/tests/integration/test_system_ddl_worker_queue/test.py +++ b/tests/integration/test_system_ddl_worker_queue/test.py @@ -9,6 +9,7 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # just import cluster setup from test_didstributed_ddl to avoid code duplication from test_distributed_ddl.cluster import ClickHouseClusterWithDDLHelpers +from helpers.test_tools import assert_eq_with_retry @pytest.fixture(scope="module", params=["configs"]) @@ -35,7 +36,7 @@ def test_cluster(request): def test_ddl_worker_queue_table_entries(test_cluster): - instance = test_cluster.instances['ch2'] + instance = test_cluster.instances['ch1'] test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS merge ON CLUSTER '{cluster}'") test_cluster.ddl_check_query(instance, """ CREATE TABLE IF NOT EXISTS merge ON CLUSTER '{cluster}' (p Date, i Int32) @@ -48,11 +49,12 @@ ENGINE = MergeTree(p, p, 1) test_cluster.ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER '{cluster}' DETACH PARTITION 197002") time.sleep(2) # query the ddl_worker_queue table to ensure that the columns are populated as expected - assert "ok\n" in instance.query( - "SELECT If((SELECT count(finished) FROM system.ddl_worker_queue WHERE name='query-0000000000') > 0, 'ok', 'fail')"); - assert "ok\n" in instance.query( - "SELECT If((SELECT count(active) FROM system.ddl_worker_queue WHERE name='query-0000000000') >= 0, 'ok', 'fail')") - + assert_eq_with_retry(instance, + "SELECT If((SELECT count(*) FROM system.ddl_worker_queue WHERE cluster='cluster' AND status='finished') > 0, 'ok', 'fail')", + "ok\n"); + assert_eq_with_retry(instance, + "SELECT If((SELECT count(*) FROM system.ddl_worker_queue WHERE cluster='cluster' AND status='active') >= 0, 'ok', 'fail')", + "ok\n") test_cluster.ddl_check_query(instance, "DROP TABLE merge ON CLUSTER '{cluster}'")