Backport #69075 to 24.3: Remove stale moving parts without zookeeper

robot-clickhouse 2024-09-26 16:09:39 +00:00
parent 5ffd137c09
commit 678a06e619
5 changed files with 192 additions and 1 deletion


@@ -54,6 +54,13 @@ static struct InitFiu
     PAUSEABLE_ONCE(finish_set_quorum_failed_parts) \
     PAUSEABLE_ONCE(finish_clean_quorum_failed_parts) \
     PAUSEABLE(dummy_pausable_failpoint) \
+    ONCE(execute_query_calling_empty_set_result_func_on_exception) \
+    ONCE(receive_timeout_on_table_status_response) \
+    REGULAR(keepermap_fail_drop_data) \
+    REGULAR(lazy_pipe_fds_fail_close) \
+    PAUSEABLE(infinite_sleep) \
+    PAUSEABLE(stop_moving_part_before_swap_with_active) \
     ONCE(execute_query_calling_empty_set_result_func_on_exception)
 
 namespace FailPoints
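
Failpoints registered in this list are toggled at runtime over SQL. A minimal sketch, assuming a "node" handle from the ClickHouse integration-test helpers (the test added at the bottom of this commit does exactly this):

# Sketch only: toggle the new PAUSEABLE failpoint from a test.
# "node" is assumed to be a ClickHouseCluster instance handle.
node.query("SYSTEM ENABLE FAILPOINT stop_moving_part_before_swap_with_active")
# Any part move started now pauses just before the clone is swapped with the active part.
node.query("SYSTEM DISABLE FAILPOINT stop_moving_part_before_swap_with_active")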


@@ -1,4 +1,6 @@
 #include <Storages/MergeTree/MergeTreePartsMover.h>
+#include <Storages/MergeTree/MergeTreeSettings.h>
+#include <Common/FailPoint.h>
 #include <Storages/MergeTree/MergeTreeData.h>
 #include <Common/logger_useful.h>
@@ -14,6 +16,11 @@ namespace ErrorCodes
     extern const int DIRECTORY_ALREADY_EXISTS;
 }
 
+namespace FailPoints
+{
+    extern const char stop_moving_part_before_swap_with_active[];
+}
+
 namespace
 {
@@ -225,6 +232,7 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
     cloned_part.temporary_directory_lock = data->getTemporaryPartDirectoryHolder(part->name);
 
     MutableDataPartStoragePtr cloned_part_storage;
+    bool preserve_blobs = false;
     if (disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)
     {
         /// Try zero-copy replication and fallback to default copy if it's not possible
@@ -252,6 +260,7 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
         if (zero_copy_part)
         {
             /// FIXME for some reason we cannot just use this part, we have to re-create it through MergeTreeDataPartBuilder
+            preserve_blobs = true;
             zero_copy_part->is_temp = false; /// Do not remove it in dtor
             cloned_part_storage = zero_copy_part->getDataPartStoragePtr();
         }
@@ -271,7 +280,17 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
     cloned_part.part = std::move(builder).withPartFormatFromDisk().build();
 
     LOG_TRACE(log, "Part {} was cloned to {}", part->name, cloned_part.part->getDataPartStorage().getFullPath());
-    cloned_part.part->is_temp = data->allowRemoveStaleMovingParts();
+    cloned_part.part->is_temp = false;
+    if (data->allowRemoveStaleMovingParts())
+    {
+        cloned_part.part->is_temp = true;
+        /// Setting it in case connection to zookeeper is lost while moving
+        /// Otherwise part might be stuck in the moving directory due to the KEEPER_EXCEPTION in part's destructor
+        if (preserve_blobs)
+            cloned_part.part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::PRESERVE_BLOBS;
+        else
+            cloned_part.part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::REMOVE_BLOBS;
+    }
     cloned_part.part->loadColumnsChecksumsIndexes(true, true);
     cloned_part.part->loadVersionMetadata();
     cloned_part.part->modification_time = cloned_part.part->getDataPartStorage().getLastModified().epochTime();
@@ -281,6 +300,8 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
 
 void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) const
 {
+    /// Used to get some stuck parts in the moving directory by stopping moves while pause is active
+    FailPointInjection::pauseFailPoint(FailPoints::stop_moving_part_before_swap_with_active);
     if (moves_blocker.isCancelled())
         throw Exception(ErrorCodes::ABORTED, "Cancelled moving parts.");
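
Combined with the temporary-part policy set in clonePart() above, this pause lets a test wedge a cloned part in the table's moving/ directory and then cancel the move while ZooKeeper is unavailable. A condensed sketch of that sequence, assuming the same integration-test helpers (the full test follows below):

# Condensed sketch of the scenario the failpoint enables; see the full test below.
node.query("SYSTEM ENABLE FAILPOINT stop_moving_part_before_swap_with_active")
# Issue the move asynchronously: it will pause just before swapClonedPart().
move = node.get_query_request("ALTER TABLE test_remove MOVE PART 'all_0_0_0' TO DISK 's3'")
node.query("SYSTEM STOP MOVES")  # cancels the paused move
node.query("SYSTEM DISABLE FAILPOINT stop_moving_part_before_swap_with_active")
# With allow_remove_stale_moving_parts enabled, the stale clone left in moving/
# is removed even though ZooKeeper cannot be reached.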


@@ -0,0 +1,46 @@
<clickhouse>
    <remote_servers>
        <cluster>
            <shard>
                <replica>
                    <host>ch1</host>
                    <port>9000</port>
                </replica>
            </shard>
        </cluster>
    </remote_servers>
    <macros>
        <shard>01</shard>
    </macros>
    <storage_configuration>
        <disks>
            <s3>
                <type>s3</type>
                <endpoint>http://minio1:9001/root/data/</endpoint>
                <access_key_id>minio</access_key_id>
                <secret_access_key>minio123</secret_access_key>
            </s3>
        </disks>
        <policies>
            <s3>
                <volumes>
                    <default>
                        <disk>default</disk>
                        <perform_ttl_move_on_insert>False</perform_ttl_move_on_insert>
                    </default>
                    <s3>
                        <disk>s3</disk>
                        <perform_ttl_move_on_insert>False</perform_ttl_move_on_insert>
                    </s3>
                </volumes>
                <move_factor>0.0</move_factor>
            </s3>
        </policies>
    </storage_configuration>
    <merge_tree>
        <allow_remote_fs_zero_copy_replication>true</allow_remote_fs_zero_copy_replication>
        <storage_policy>s3</storage_policy>
    </merge_tree>
    <allow_remove_stale_moving_parts>true</allow_remove_stale_moving_parts>
</clickhouse>


@@ -0,0 +1,117 @@
from pathlib import Path
import time
import pytest
from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)

ch1 = cluster.add_instance(
    "ch1",
    main_configs=[
        "config.xml",
    ],
    macros={"replica": "node1"},
    with_zookeeper=True,
    with_minio=True,
)

DATABASE_NAME = "stale_moving_parts"


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


def q(node, query):
    return node.query(database=DATABASE_NAME, sql=query)


# .../disks/s3/store/
def get_table_path(node, table):
    return (
        node.query(
            sql=f"SELECT data_paths FROM system.tables WHERE table = '{table}' and database = '{DATABASE_NAME}' LIMIT 1"
        )
        .strip('"\n[]')
        .split(",")[1]
        .strip("'")
    )


def exec(node, cmd, path):
    return node.exec_in_container(
        [
            "bash",
            "-c",
            f"{cmd} {path}",
        ]
    )


def wait_part_is_stuck(node, table_moving_path, moving_part):
    num_tries = 5
    while q(node, "SELECT part_name FROM system.moves").strip() != moving_part:
        if num_tries == 0:
            raise Exception("Part has not started to move")
        num_tries -= 1
        time.sleep(1)
    num_tries = 5
    while exec(node, "ls", table_moving_path).strip() != moving_part:
        if num_tries == 0:
            raise Exception("Part is not stuck in the moving directory")
        num_tries -= 1
        time.sleep(1)


def wait_zookeeper_node_to_start(zk_nodes, timeout=60):
    start = time.time()
    while time.time() - start < timeout:
        try:
            for instance in zk_nodes:
                conn = cluster.get_kazoo_client(instance)
                conn.get_children("/")
            print("All instances of ZooKeeper started")
            return
        except Exception as ex:
            print(("Can't connect to ZooKeeper " + str(ex)))
            time.sleep(0.5)


def test_remove_stale_moving_parts_without_zookeeper(started_cluster):
    ch1.query(f"CREATE DATABASE IF NOT EXISTS {DATABASE_NAME}")
    q(
        ch1,
        "CREATE TABLE test_remove ON CLUSTER cluster ( id UInt32 ) ENGINE ReplicatedMergeTree() ORDER BY id;",
    )
    table_moving_path = Path(get_table_path(ch1, "test_remove")) / "moving"
    q(ch1, "SYSTEM ENABLE FAILPOINT stop_moving_part_before_swap_with_active")
    q(ch1, "INSERT INTO test_remove SELECT number FROM numbers(100);")
    moving_part = "all_0_0_0"
    move_response = ch1.get_query_request(
        sql=f"ALTER TABLE test_remove MOVE PART '{moving_part}' TO DISK 's3'",
        database=DATABASE_NAME,
    )
    wait_part_is_stuck(ch1, table_moving_path, moving_part)
    cluster.stop_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
    # Stop moves in case table is not read-only yet
    q(ch1, "SYSTEM STOP MOVES")
    q(ch1, "SYSTEM DISABLE FAILPOINT stop_moving_part_before_swap_with_active")
    assert "Cancelled moving parts" in move_response.get_error()
    assert exec(ch1, "ls", table_moving_path).strip() == ""
    cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
    wait_zookeeper_node_to_start(["zoo1", "zoo2", "zoo3"])
    q(ch1, "SYSTEM START MOVES")
    q(ch1, f"DROP TABLE test_remove")