mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Merge pull request #69953 from ClickHouse/backport/24.8/69075
Backport #69075 to 24.8: Remove stale moving parts without zookeeper
This commit is contained in:
commit
0c109aee8f
@ -63,6 +63,7 @@ static struct InitFiu
|
|||||||
REGULAR(keepermap_fail_drop_data) \
|
REGULAR(keepermap_fail_drop_data) \
|
||||||
REGULAR(lazy_pipe_fds_fail_close) \
|
REGULAR(lazy_pipe_fds_fail_close) \
|
||||||
PAUSEABLE(infinite_sleep) \
|
PAUSEABLE(infinite_sleep) \
|
||||||
|
PAUSEABLE(stop_moving_part_before_swap_with_active) \
|
||||||
|
|
||||||
|
|
||||||
namespace FailPoints
|
namespace FailPoints
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
#include <Storages/MergeTree/MergeTreeData.h>
|
#include <Storages/MergeTree/MergeTreeData.h>
|
||||||
#include <Storages/MergeTree/MergeTreePartsMover.h>
|
#include <Storages/MergeTree/MergeTreePartsMover.h>
|
||||||
#include <Storages/MergeTree/MergeTreeSettings.h>
|
#include <Storages/MergeTree/MergeTreeSettings.h>
|
||||||
|
#include <Common/FailPoint.h>
|
||||||
#include <Common/logger_useful.h>
|
#include <Common/logger_useful.h>
|
||||||
|
|
||||||
#include <set>
|
#include <set>
|
||||||
@ -15,6 +16,11 @@ namespace ErrorCodes
|
|||||||
extern const int DIRECTORY_ALREADY_EXISTS;
|
extern const int DIRECTORY_ALREADY_EXISTS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
namespace FailPoints
|
||||||
|
{
|
||||||
|
extern const char stop_moving_part_before_swap_with_active[];
|
||||||
|
}
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -226,6 +232,7 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
|
|||||||
cloned_part.temporary_directory_lock = data->getTemporaryPartDirectoryHolder(part->name);
|
cloned_part.temporary_directory_lock = data->getTemporaryPartDirectoryHolder(part->name);
|
||||||
|
|
||||||
MutableDataPartStoragePtr cloned_part_storage;
|
MutableDataPartStoragePtr cloned_part_storage;
|
||||||
|
bool preserve_blobs = false;
|
||||||
if (disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)
|
if (disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)
|
||||||
{
|
{
|
||||||
/// Try zero-copy replication and fallback to default copy if it's not possible
|
/// Try zero-copy replication and fallback to default copy if it's not possible
|
||||||
@ -253,6 +260,7 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
|
|||||||
if (zero_copy_part)
|
if (zero_copy_part)
|
||||||
{
|
{
|
||||||
/// FIXME for some reason we cannot just use this part, we have to re-create it through MergeTreeDataPartBuilder
|
/// FIXME for some reason we cannot just use this part, we have to re-create it through MergeTreeDataPartBuilder
|
||||||
|
preserve_blobs = true;
|
||||||
zero_copy_part->is_temp = false; /// Do not remove it in dtor
|
zero_copy_part->is_temp = false; /// Do not remove it in dtor
|
||||||
cloned_part_storage = zero_copy_part->getDataPartStoragePtr();
|
cloned_part_storage = zero_copy_part->getDataPartStoragePtr();
|
||||||
}
|
}
|
||||||
@ -272,7 +280,17 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
|
|||||||
cloned_part.part = std::move(builder).withPartFormatFromDisk().build();
|
cloned_part.part = std::move(builder).withPartFormatFromDisk().build();
|
||||||
LOG_TRACE(log, "Part {} was cloned to {}", part->name, cloned_part.part->getDataPartStorage().getFullPath());
|
LOG_TRACE(log, "Part {} was cloned to {}", part->name, cloned_part.part->getDataPartStorage().getFullPath());
|
||||||
|
|
||||||
cloned_part.part->is_temp = data->allowRemoveStaleMovingParts();
|
cloned_part.part->is_temp = false;
|
||||||
|
if (data->allowRemoveStaleMovingParts())
|
||||||
|
{
|
||||||
|
cloned_part.part->is_temp = true;
|
||||||
|
/// Setting it in case connection to zookeeper is lost while moving
|
||||||
|
/// Otherwise part might be stuck in the moving directory due to the KEEPER_EXCEPTION in part's destructor
|
||||||
|
if (preserve_blobs)
|
||||||
|
cloned_part.part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::PRESERVE_BLOBS;
|
||||||
|
else
|
||||||
|
cloned_part.part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::REMOVE_BLOBS;
|
||||||
|
}
|
||||||
cloned_part.part->loadColumnsChecksumsIndexes(true, true);
|
cloned_part.part->loadColumnsChecksumsIndexes(true, true);
|
||||||
cloned_part.part->loadVersionMetadata();
|
cloned_part.part->loadVersionMetadata();
|
||||||
cloned_part.part->modification_time = cloned_part.part->getDataPartStorage().getLastModified().epochTime();
|
cloned_part.part->modification_time = cloned_part.part->getDataPartStorage().getLastModified().epochTime();
|
||||||
@ -282,6 +300,8 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
|
|||||||
|
|
||||||
void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) const
|
void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) const
|
||||||
{
|
{
|
||||||
|
/// Used to get some stuck parts in the moving directory by stopping moves while pause is active
|
||||||
|
FailPointInjection::pauseFailPoint(FailPoints::stop_moving_part_before_swap_with_active);
|
||||||
if (moves_blocker.isCancelled())
|
if (moves_blocker.isCancelled())
|
||||||
throw Exception(ErrorCodes::ABORTED, "Cancelled moving parts.");
|
throw Exception(ErrorCodes::ABORTED, "Cancelled moving parts.");
|
||||||
|
|
||||||
|
46
tests/integration/test_remove_stale_moving_parts/config.xml
Normal file
46
tests/integration/test_remove_stale_moving_parts/config.xml
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
<clickhouse>
|
||||||
|
<remote_servers>
|
||||||
|
<cluster>
|
||||||
|
<shard>
|
||||||
|
<replica>
|
||||||
|
<host>ch1</host>
|
||||||
|
<port>9000</port>
|
||||||
|
</replica>
|
||||||
|
</shard>
|
||||||
|
</cluster>
|
||||||
|
</remote_servers>
|
||||||
|
<macros>
|
||||||
|
<shard>01</shard>
|
||||||
|
</macros>
|
||||||
|
<storage_configuration>
|
||||||
|
<disks>
|
||||||
|
<s3>
|
||||||
|
<type>s3</type>
|
||||||
|
<endpoint>http://minio1:9001/root/data/</endpoint>
|
||||||
|
<access_key_id>minio</access_key_id>
|
||||||
|
<secret_access_key>minio123</secret_access_key>
|
||||||
|
</s3>
|
||||||
|
</disks>
|
||||||
|
<policies>
|
||||||
|
<s3>
|
||||||
|
<volumes>
|
||||||
|
<default>
|
||||||
|
<disk>default</disk>
|
||||||
|
<perform_ttl_move_on_insert>False</perform_ttl_move_on_insert>
|
||||||
|
</default>
|
||||||
|
<s3>
|
||||||
|
<disk>s3</disk>
|
||||||
|
<perform_ttl_move_on_insert>False</perform_ttl_move_on_insert>
|
||||||
|
</s3>
|
||||||
|
</volumes>
|
||||||
|
<move_factor>0.0</move_factor>
|
||||||
|
</s3>
|
||||||
|
</policies>
|
||||||
|
</storage_configuration>
|
||||||
|
|
||||||
|
<merge_tree>
|
||||||
|
<allow_remote_fs_zero_copy_replication>true</allow_remote_fs_zero_copy_replication>
|
||||||
|
<storage_policy>s3</storage_policy>
|
||||||
|
</merge_tree>
|
||||||
|
<allow_remove_stale_moving_parts>true</allow_remove_stale_moving_parts>
|
||||||
|
</clickhouse>
|
117
tests/integration/test_remove_stale_moving_parts/test.py
Normal file
117
tests/integration/test_remove_stale_moving_parts/test.py
Normal file
@ -0,0 +1,117 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
import time
|
||||||
|
import pytest
|
||||||
|
from helpers.cluster import ClickHouseCluster
|
||||||
|
|
||||||
|
cluster = ClickHouseCluster(__file__)
|
||||||
|
ch1 = cluster.add_instance(
|
||||||
|
"ch1",
|
||||||
|
main_configs=[
|
||||||
|
"config.xml",
|
||||||
|
],
|
||||||
|
macros={"replica": "node1"},
|
||||||
|
with_zookeeper=True,
|
||||||
|
with_minio=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
DATABASE_NAME = "stale_moving_parts"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module")
|
||||||
|
def started_cluster():
|
||||||
|
try:
|
||||||
|
cluster.start()
|
||||||
|
yield cluster
|
||||||
|
|
||||||
|
finally:
|
||||||
|
cluster.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
def q(node, query):
|
||||||
|
return node.query(database=DATABASE_NAME, sql=query)
|
||||||
|
|
||||||
|
|
||||||
|
# .../disks/s3/store/
|
||||||
|
def get_table_path(node, table):
|
||||||
|
return (
|
||||||
|
node.query(
|
||||||
|
sql=f"SELECT data_paths FROM system.tables WHERE table = '{table}' and database = '{DATABASE_NAME}' LIMIT 1"
|
||||||
|
)
|
||||||
|
.strip('"\n[]')
|
||||||
|
.split(",")[1]
|
||||||
|
.strip("'")
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def exec(node, cmd, path):
|
||||||
|
return node.exec_in_container(
|
||||||
|
[
|
||||||
|
"bash",
|
||||||
|
"-c",
|
||||||
|
f"{cmd} {path}",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def wait_part_is_stuck(node, table_moving_path, moving_part):
|
||||||
|
num_tries = 5
|
||||||
|
while q(node, "SELECT part_name FROM system.moves").strip() != moving_part:
|
||||||
|
if num_tries == 0:
|
||||||
|
raise Exception("Part has not started to move")
|
||||||
|
num_tries -= 1
|
||||||
|
time.sleep(1)
|
||||||
|
num_tries = 5
|
||||||
|
while exec(node, "ls", table_moving_path).strip() != moving_part:
|
||||||
|
if num_tries == 0:
|
||||||
|
raise Exception("Part is not stuck in the moving directory")
|
||||||
|
num_tries -= 1
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
|
||||||
|
def wait_zookeeper_node_to_start(zk_nodes, timeout=60):
|
||||||
|
start = time.time()
|
||||||
|
while time.time() - start < timeout:
|
||||||
|
try:
|
||||||
|
for instance in zk_nodes:
|
||||||
|
conn = cluster.get_kazoo_client(instance)
|
||||||
|
conn.get_children("/")
|
||||||
|
print("All instances of ZooKeeper started")
|
||||||
|
return
|
||||||
|
except Exception as ex:
|
||||||
|
print(("Can't connect to ZooKeeper " + str(ex)))
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
|
||||||
|
def test_remove_stale_moving_parts_without_zookeeper(started_cluster):
|
||||||
|
ch1.query(f"CREATE DATABASE IF NOT EXISTS {DATABASE_NAME}")
|
||||||
|
|
||||||
|
q(
|
||||||
|
ch1,
|
||||||
|
"CREATE TABLE test_remove ON CLUSTER cluster ( id UInt32 ) ENGINE ReplicatedMergeTree() ORDER BY id;",
|
||||||
|
)
|
||||||
|
|
||||||
|
table_moving_path = Path(get_table_path(ch1, "test_remove")) / "moving"
|
||||||
|
|
||||||
|
q(ch1, "SYSTEM ENABLE FAILPOINT stop_moving_part_before_swap_with_active")
|
||||||
|
q(ch1, "INSERT INTO test_remove SELECT number FROM numbers(100);")
|
||||||
|
moving_part = "all_0_0_0"
|
||||||
|
move_response = ch1.get_query_request(
|
||||||
|
sql=f"ALTER TABLE test_remove MOVE PART '{moving_part}' TO DISK 's3'",
|
||||||
|
database=DATABASE_NAME,
|
||||||
|
)
|
||||||
|
|
||||||
|
wait_part_is_stuck(ch1, table_moving_path, moving_part)
|
||||||
|
|
||||||
|
cluster.stop_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
|
||||||
|
# Stop moves in case table is not read-only yet
|
||||||
|
q(ch1, "SYSTEM STOP MOVES")
|
||||||
|
q(ch1, "SYSTEM DISABLE FAILPOINT stop_moving_part_before_swap_with_active")
|
||||||
|
|
||||||
|
assert "Cancelled moving parts" in move_response.get_error()
|
||||||
|
assert exec(ch1, "ls", table_moving_path).strip() == ""
|
||||||
|
|
||||||
|
cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
|
||||||
|
wait_zookeeper_node_to_start(["zoo1", "zoo2", "zoo3"])
|
||||||
|
q(ch1, "SYSTEM START MOVES")
|
||||||
|
|
||||||
|
q(ch1, f"DROP TABLE test_remove")
|
Loading…
Reference in New Issue
Block a user