Add values column and fix tests

bharatnc 2020-12-14 09:33:20 -08:00
parent 2a122905f1
commit 57126d1901
2 changed files with 42 additions and 16 deletions

View File

@@ -24,7 +24,7 @@
namespace fs = std::filesystem;
enum status
enum Status
{
active,
finished,
@@ -32,13 +32,18 @@ enum status
errored
};
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
std::vector<std::pair<String, Int8>> getStatusEnumsAndValues()
{
return std::vector<std::pair<String, Int8>>{
{"active", static_cast<Int8>(status::active)},
{"finished", static_cast<Int8>(status::finished)},
{"unknown", static_cast<Int8>(status::unknown)},
{"errored", static_cast<Int8>(status::errored)},
{"active", static_cast<Int8>(Status::active)},
{"finished", static_cast<Int8>(Status::finished)},
{"unknown", static_cast<Int8>(Status::unknown)},
{"errored", static_cast<Int8>(Status::errored)},
};
}
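
Note: the Status members above are exposed to SQL through the (name, value) pairs returned by getStatusEnumsAndValues(), which getNamesAndTypes() below passes to DataTypeEnum8 for the status column. A minimal standalone sketch of the same mapping pattern, outside the ClickHouse sources (String/Int8 swapped for std::string/int8_t, and the underlying type fixed to int8_t only for this sketch):

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Mirrors the renamed Status enum; values are kept in Int8 range because the
// column is declared as DataTypeEnum8 in getNamesAndTypes().
enum Status : int8_t
{
    active,
    finished,
    unknown,
    errored
};

// Same shape as getStatusEnumsAndValues(): one (name, value) pair per member.
std::vector<std::pair<std::string, int8_t>> getStatusEnumsAndValues()
{
    return {
        {"active", static_cast<int8_t>(Status::active)},
        {"finished", static_cast<int8_t>(Status::finished)},
        {"unknown", static_cast<int8_t>(Status::unknown)},
        {"errored", static_cast<int8_t>(Status::errored)},
    };
}

int main()
{
    for (const auto & [name, value] : getStatusEnumsAndValues())
        std::cout << name << " = " << static_cast<int>(value) << '\n';
}
```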
@@ -53,6 +58,7 @@ NamesAndTypesList StorageSystemDDLWorkerQueue::getNamesAndTypes()
{"port", std::make_shared<DataTypeUInt16>()},
{"status", std::make_shared<DataTypeEnum8>(getStatusEnumsAndValues())},
{"cluster", std::make_shared<DataTypeString>()},
{"values", std::make_shared<DataTypeString>()},
};
}
@@ -146,6 +152,8 @@ void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const C
if (cluster == nullptr)
throw Exception("No cluster with the name: " + cluster_name + " was found.", ErrorCodes::BAD_ARGUMENTS);
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
{
const auto & shard_addresses = addresses_with_failover[shard_index];
@@ -158,6 +166,12 @@ void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const C
[zk: localhost:2181(CONNECTED) 53] ls /clickhouse/task_queue/ddl/query-0000000004
[active, finished]
*/
std::vector<std::future<Coordination::GetResponse>> futures;
futures.reserve(queries.size());
for (const String & q : queries)
{
futures.push_back(zookeeper->asyncTryGet(fs::path(ddl_zookeeper_path) / q));
}
for (size_t query_id = 0; query_id < queries.size(); query_id++)
{
size_t i = 0;
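
Note: fillData() now schedules one asyncTryGet per queue entry up front and only consumes the futures later in the per-query loop, skipping entries whose znode came back as ZNONODE because it was deleted in the meantime. A self-contained sketch of that schedule-then-consume pattern, using std::async and a hypothetical fetchNode() in place of the real ZooKeeper client:

```cpp
#include <future>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-in for zookeeper->asyncTryGet(path): returns the node's
// data, or std::nullopt when the node does not exist (the ZNONODE case).
std::optional<std::string> fetchNode(const std::string & path)
{
    if (path == "/clickhouse/task_queue/ddl/query-0000000001")
        return std::nullopt; // pretend this node was deleted meanwhile
    return "raw DDL entry stored under " + path;
}

int main()
{
    const std::vector<std::string> queries = {"query-0000000000", "query-0000000001"};

    // 1. Schedule all reads first, as the diff does with futures.reserve()/push_back().
    std::vector<std::future<std::optional<std::string>>> futures;
    futures.reserve(queries.size());
    for (const auto & q : queries)
        futures.push_back(std::async(std::launch::async, fetchNode, "/clickhouse/task_queue/ddl/" + q));

    // 2. Consume the results later, one per output row; missing nodes are skipped.
    for (size_t i = 0; i < queries.size(); ++i)
    {
        auto data = futures[i].get();
        if (!data)
            continue; // node was deleted meanwhile
        std::cout << queries[i] << " -> " << *data << '\n';
    }
}
```

Scheduling all reads before blocking on any of them lets the round trips overlap instead of running strictly one after another.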
@@ -174,24 +188,34 @@ void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const C
// status
if (std::find(active_nodes.begin(), active_nodes.end(), address.toString()) != active_nodes.end())
{
res_columns[i++]->insert(static_cast<Int8>(status::active));
res_columns[i++]->insert(static_cast<Int8>(Status::active));
}
else if (std::find(finished_nodes.begin(), finished_nodes.end(), address.toString()) != finished_nodes.end())
{
if (pool_status[replica_index].error_count != 0)
{
res_columns[i++]->insert(static_cast<Int8>(status::errored));
res_columns[i++]->insert(static_cast<Int8>(Status::errored));
}
else
{
res_columns[i++]->insert(static_cast<Int8>(status::finished));
res_columns[i++]->insert(static_cast<Int8>(Status::finished));
}
}
else
{
res_columns[i++]->insert(static_cast<Int8>(status::unknown));
res_columns[i++]->insert(static_cast<Int8>(Status::unknown));
}
res_columns[i++]->insert(cluster_name); // cluster_name from the query.
// This is the original cluster_name from the query. In order to process the request, a WHERE condition on this column should be applied.
res_columns[i++]->insert(cluster_name);
// values
if (futures.empty())
continue;
auto res = futures[query_id].get();
if (res.error == Coordination::Error::ZNONODE)
continue; /// Node was deleted meanwhile.
res_columns[i++]->insert(res.data);
}
}
}
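
Note: the status branch in the hunk above reduces to a pure function of the task's active/finished children and the replica's error count: active while the host is listed under the active children, finished once it is among the finished children, errored if it finished but its connection pool reported errors, and unknown otherwise. A small illustrative refactoring of that logic (determineStatus and the host strings are made up for the example, not taken from the ClickHouse sources):

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_set>

enum Status : int8_t { active, finished, unknown, errored };

// Illustrative helper mirroring the branch in fillData().
Status determineStatus(
    const std::unordered_set<std::string> & active_nodes,
    const std::unordered_set<std::string> & finished_nodes,
    const std::string & host,
    uint64_t error_count)
{
    if (active_nodes.count(host))
        return Status::active;
    if (finished_nodes.count(host))
        return error_count != 0 ? Status::errored : Status::finished;
    return Status::unknown;
}

int main()
{
    const std::unordered_set<std::string> active_nodes = {"ch1:9000"};
    const std::unordered_set<std::string> finished_nodes = {"ch2:9000", "ch3:9000"};

    std::cout << static_cast<int>(determineStatus(active_nodes, finished_nodes, "ch1:9000", 0)) << '\n'; // 0 -> active
    std::cout << static_cast<int>(determineStatus(active_nodes, finished_nodes, "ch2:9000", 1)) << '\n'; // 3 -> errored
    std::cout << static_cast<int>(determineStatus(active_nodes, finished_nodes, "ch4:9000", 0)) << '\n'; // 2 -> unknown
}
```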

View File

@@ -9,6 +9,7 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# just import cluster setup from test_distributed_ddl to avoid code duplication
from test_distributed_ddl.cluster import ClickHouseClusterWithDDLHelpers
from helpers.test_tools import assert_eq_with_retry
@pytest.fixture(scope="module", params=["configs"])
@@ -35,7 +36,7 @@ def test_cluster(request):
def test_ddl_worker_queue_table_entries(test_cluster):
instance = test_cluster.instances['ch2']
instance = test_cluster.instances['ch1']
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS merge ON CLUSTER '{cluster}'")
test_cluster.ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS merge ON CLUSTER '{cluster}' (p Date, i Int32)
@@ -48,11 +49,12 @@ ENGINE = MergeTree(p, p, 1)
test_cluster.ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER '{cluster}' DETACH PARTITION 197002")
time.sleep(2)
# query the ddl_worker_queue table to ensure that the columns are populated as expected
assert "ok\n" in instance.query(
"SELECT If((SELECT count(finished) FROM system.ddl_worker_queue WHERE name='query-0000000000') > 0, 'ok', 'fail')");
assert "ok\n" in instance.query(
"SELECT If((SELECT count(active) FROM system.ddl_worker_queue WHERE name='query-0000000000') >= 0, 'ok', 'fail')")
assert_eq_with_retry(instance,
"SELECT If((SELECT count(*) FROM system.ddl_worker_queue WHERE cluster='cluster' AND status='finished') > 0, 'ok', 'fail')",
"ok\n");
assert_eq_with_retry(instance,
"SELECT If((SELECT count(*) FROM system.ddl_worker_queue WHERE cluster='cluster' AND status='active') >= 0, 'ok', 'fail')",
"ok\n")
test_cluster.ddl_check_query(instance, "DROP TABLE merge ON CLUSTER '{cluster}'")