Fix START MERGES and add test

author zhangxiao871 2022-01-28 14:30:57 +08:00
parent d1bc65d684
commit 78e2794ada
2 changed files with 105 additions and 17 deletions
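
SYSTEM START MERGES ON VOLUME <policy>.<volume> (and the STOP form) no longer parsed: the unconditional parseQueryWithOnCluster(...) call ran before the ON VOLUME check, and the bare ON keyword it consumes swallowed the ON of ON VOLUME (see the helper sketch after the first file's diff). The parser now tries ON VOLUME first, and only when it is absent parses the optional ON CLUSTER clause and retries ON VOLUME after it, so SYSTEM START MERGES ON CLUSTER <cluster> ON VOLUME <policy>.<volume> is accepted as well. Integration tests for the ON CLUSTER variants are added.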

src/Parsers/ParserSystemQuery.cpp

@@ -112,13 +112,8 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
         }
         case Type::RELOAD_MODEL:
         {
-            String cluster_str;
-            if (ParserKeyword{"ON"}.ignore(pos, expected))
-            {
-                if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
-                    return false;
-            }
-            res->cluster = cluster_str;
+            parseQueryWithOnCluster(res, pos, expected);
+
             ASTPtr ast;
             if (ParserStringLiteral{}.parse(pos, ast, expected))
             {
@@ -141,13 +136,8 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
         }
         case Type::RELOAD_FUNCTION:
         {
-            String cluster_str;
-            if (ParserKeyword{"ON"}.ignore(pos, expected))
-            {
-                if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
-                    return false;
-            }
-            res->cluster = cluster_str;
+            parseQueryWithOnCluster(res, pos, expected);
+
             ASTPtr ast;
             if (ParserStringLiteral{}.parse(pos, ast, expected))
             {
@@ -253,12 +243,10 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
         case Type::STOP_MERGES:
         case Type::START_MERGES:
         {
-            parseQueryWithOnCluster(res, pos, expected);
-
             String storage_policy_str;
             String volume_str;
 
-            if (ParserKeyword{"ON VOLUME"}.ignore(pos, expected))
+            auto parse_on_volume = [&]() -> bool
             {
                 ASTPtr ast;
                 if (ParserIdentifier{}.parse(pos, ast, expected))
@@ -273,7 +261,25 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
                     volume_str = ast->as<ASTIdentifier &>().name();
                 else
                     return false;
+                return true;
+            };
+
+            if (ParserKeyword{"ON VOLUME"}.ignore(pos, expected))
+            {
+                if (!parse_on_volume())
+                    return false;
             }
+            else
+            {
+                parseQueryWithOnCluster(res, pos, expected);
+
+                if (ParserKeyword{"ON VOLUME"}.ignore(pos, expected))
+                {
+                    if (!parse_on_volume())
+                        return false;
+                }
+            }
+
             res->storage_policy = storage_policy_str;
             res->volume = volume_str;
             if (res->volume.empty() && res->storage_policy.empty())
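
The helper parseQueryWithOnCluster is defined outside this diff. Below is a minimal sketch of it, reconstructed from the inline code it replaced in the two RELOAD hunks above; the actual definition in ParserSystemQuery.cpp may differ, and the call sites here ignore its return value:

    /// Sketch only (assumption), mirroring the removed inline code above:
    /// optionally parses "ON CLUSTER <name>" into res->cluster.
    static bool parseQueryWithOnCluster(std::shared_ptr<ASTSystemQuery> & res,
                                        IParser::Pos & pos, Expected & expected)
    {
        String cluster_str;
        /// The bare "ON" keyword is consumed first. If the statement actually
        /// continues with "ON VOLUME", that "ON" is eaten here and the VOLUME
        /// branch downstream can never match -- which is why START/STOP MERGES
        /// now tries "ON VOLUME" before calling this helper.
        if (ParserKeyword{"ON"}.ignore(pos, expected))
        {
            if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
                return false;
        }
        res->cluster = cluster_str;
        return true;
    }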

tests/integration/test_multiple_disks/test.py

@@ -1135,6 +1135,70 @@ def test_simple_replication_and_moves(start_cluster):
     for node in [node1, node2]:
         node.query("DROP TABLE IF EXISTS replicated_table_for_moves SYNC")
 
 
+def test_simple_replication_and_moves_on_cluster(start_cluster):
+    try:
+        for i, node in enumerate([node1, node2]):
+            node.query_with_retry("""
+                CREATE TABLE IF NOT EXISTS replicated_table_for_moves (
+                    s1 String
+                ) ENGINE = ReplicatedMergeTree('/clickhouse/replicated_table_for_moves', '{}')
+                ORDER BY tuple()
+                SETTINGS storage_policy='moving_jbod_with_external', old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=2
+            """.format(i + 1))
+
+        def insert(num):
+            for i in range(num):
+                node = random.choice([node1, node2])
+                data = []  # 1MB in total
+                for i in range(2):
+                    data.append(get_random_string(512 * 1024))  # 500KB value
+                node.query_with_retry("INSERT INTO replicated_table_for_moves VALUES {}".format(
+                    ','.join(["('" + x + "')" for x in data])))
+
+        def optimize(num):
+            for i in range(num):
+                node = random.choice([node1, node2])
+                node.query_with_retry("OPTIMIZE TABLE replicated_table_for_moves FINAL")
+
+        p = Pool(60)
+        tasks = []
+        tasks.append(p.apply_async(insert, (20,)))
+        tasks.append(p.apply_async(optimize, (20,)))
+
+        for task in tasks:
+            task.get(timeout=60)
+
+        node1.query_with_retry("SYSTEM SYNC REPLICA ON CLUSTER test_cluster replicated_table_for_moves", timeout=5)
+
+        assert node1.query("SELECT COUNT() FROM replicated_table_for_moves") == "40\n"
+        assert node2.query("SELECT COUNT() FROM replicated_table_for_moves") == "40\n"
+
+        data = []  # 1MB in total
+        for i in range(2):
+            data.append(get_random_string(512 * 1024))  # 500KB value
+
+        time.sleep(3)  # wait until old parts are deleted
+        node1.query("SYSTEM STOP MERGES")
+        node2.query("SYSTEM STOP MERGES")
+
+        node1.query_with_retry(
+            "INSERT INTO replicated_table_for_moves VALUES {}".format(','.join(["('" + x + "')" for x in data])))
+        node2.query_with_retry(
+            "INSERT INTO replicated_table_for_moves VALUES {}".format(','.join(["('" + x + "')" for x in data])))
+
+        time.sleep(3)  # nothing should have been moved yet
+
+        disks1 = get_used_disks_for_table(node1, "replicated_table_for_moves")
+        disks2 = get_used_disks_for_table(node2, "replicated_table_for_moves")
+
+        node2.query("SYSTEM START MERGES ON CLUSTER test_cluster")
+
+        assert set(disks1) == set(["jbod1", "external"])
+        assert set(disks2) == set(["jbod1", "external"])
+    finally:
+        for node in [node1, node2]:
+            node.query("DROP TABLE IF EXISTS replicated_table_for_moves SYNC")
+
+
 def test_download_appropriate_disk(start_cluster):
     try:
@@ -1513,6 +1577,24 @@ def test_no_merges_in_configuration_allow_from_query_with_reload(start_cluster):
     finally:
         node1.query("SYSTEM STOP MERGES ON VOLUME {}.external".format(policy))
 
 
+def test_no_merges_in_configuration_allow_from_query_with_reload_on_cluster(start_cluster):
+    try:
+        name = "test_no_merges_in_configuration_allow_from_query_with_reload"
+        policy = "small_jbod_with_external_no_merges"
+        node1.restart_clickhouse(kill=True)
+        assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 1]
+        _check_merges_are_working(node1, policy, "external", False)
+
+        _insert_merge_execute(node1, name, policy, 2, [
+            "SYSTEM START MERGES ON CLUSTER test_cluster ON VOLUME {}.external".format(policy),
+            "SYSTEM RELOAD CONFIG ON CLUSTER test_cluster"
+        ], 2, 1)
+        assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 0]
+        _check_merges_are_working(node1, policy, "external", True)
+    finally:
+        node1.query("SYSTEM STOP MERGES ON CLUSTER test_cluster ON VOLUME {}.external".format(policy))
+
+
 def test_yes_merges_in_configuration_disallow_from_query_without_reload(start_cluster):
     try:
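
Taken together, the new tests exercise the clause combinations the parser change must accept: plain SYSTEM STOP MERGES, SYSTEM START MERGES ON CLUSTER test_cluster, SYSTEM RELOAD CONFIG ON CLUSTER test_cluster, and the combined SYSTEM START/STOP MERGES ON CLUSTER test_cluster ON VOLUME <policy>.external, alongside the pre-existing volume-only form SYSTEM STOP MERGES ON VOLUME <policy>.external.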