Clean up broken detached parts with timeout

This commit is contained in:
kssenii 2022-06-09 16:59:12 +02:00
parent 780f7c87c7
commit 7a2676c7ab
23 changed files with 248 additions and 1 deletions

View File

@ -280,7 +280,15 @@ time_t getModificationTime(const std::string & path)
struct stat st; struct stat st;
if (stat(path.c_str(), &st) == 0) if (stat(path.c_str(), &st) == 0)
return st.st_mtime; return st.st_mtime;
DB::throwFromErrnoWithPath("Cannot check modification time for file: " + path, path, DB::ErrorCodes::PATH_ACCESS_DENIED); DB::throwFromErrnoWithPath("Cannot modification change time for file: " + path, path, DB::ErrorCodes::PATH_ACCESS_DENIED);
}
time_t getChangeTime(const std::string & path)
{
struct stat st;
if (stat(path.c_str(), &st) == 0)
return st.st_ctime;
DB::throwFromErrnoWithPath("Cannot check change time for file: " + path, path, DB::ErrorCodes::PATH_ACCESS_DENIED);
} }
Poco::Timestamp getModificationTimestamp(const std::string & path) Poco::Timestamp getModificationTimestamp(const std::string & path)
@ -288,6 +296,11 @@ Poco::Timestamp getModificationTimestamp(const std::string & path)
return Poco::Timestamp::fromEpochTime(getModificationTime(path)); return Poco::Timestamp::fromEpochTime(getModificationTime(path));
} }
Poco::Timestamp getChangeTimestamp(const std::string & path)
{
return Poco::Timestamp::fromEpochTime(getChangeTime(path));
}
void setModificationTime(const std::string & path, time_t time) void setModificationTime(const std::string & path, time_t time)
{ {
struct utimbuf tb; struct utimbuf tb;

View File

@ -75,7 +75,11 @@ bool canRead(const std::string & path);
bool canWrite(const std::string & path); bool canWrite(const std::string & path);
bool canExecute(const std::string & path); bool canExecute(const std::string & path);
/// st_mtime
time_t getModificationTime(const std::string & path); time_t getModificationTime(const std::string & path);
Poco::Timestamp getModificationTimestamp(const std::string & path); Poco::Timestamp getModificationTimestamp(const std::string & path);
void setModificationTime(const std::string & path, time_t time); void setModificationTime(const std::string & path, time_t time);
/// st_ctime
time_t getChangeTime(const std::string & path);
Poco::Timestamp getChangeTimestamp(const std::string & path);
} }

View File

@ -176,6 +176,11 @@ Poco::Timestamp DiskDecorator::getLastModified(const String & path)
return delegate->getLastModified(path); return delegate->getLastModified(path);
} }
Poco::Timestamp DiskDecorator::getLastChanged(const String & path)
{
return delegate->getLastChanged(path);
}
void DiskDecorator::setReadOnly(const String & path) void DiskDecorator::setReadOnly(const String & path)
{ {
delegate->setReadOnly(path); delegate->setReadOnly(path);

View File

@ -57,6 +57,7 @@ public:
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override; void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override;
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override; void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
Poco::Timestamp getLastModified(const String & path) override; Poco::Timestamp getLastModified(const String & path) override;
Poco::Timestamp getLastChanged(const String & path) override;
void setReadOnly(const String & path) override; void setReadOnly(const String & path) override;
void createHardLink(const String & src_path, const String & dst_path) override; void createHardLink(const String & src_path, const String & dst_path) override;
void truncateFile(const String & path, size_t size) override; void truncateFile(const String & path, size_t size) override;

View File

@ -404,6 +404,11 @@ Poco::Timestamp DiskLocal::getLastModified(const String & path)
return FS::getModificationTimestamp(fs::path(disk_path) / path); return FS::getModificationTimestamp(fs::path(disk_path) / path);
} }
Poco::Timestamp DiskLocal::getLastChanged(const String & path)
{
return FS::getChangeTimestamp(fs::path(disk_path) / path);
}
void DiskLocal::createHardLink(const String & src_path, const String & dst_path) void DiskLocal::createHardLink(const String & src_path, const String & dst_path)
{ {
DB::createHardLink(fs::path(disk_path) / src_path, fs::path(disk_path) / dst_path); DB::createHardLink(fs::path(disk_path) / src_path, fs::path(disk_path) / dst_path);

View File

@ -93,6 +93,8 @@ public:
Poco::Timestamp getLastModified(const String & path) override; Poco::Timestamp getLastModified(const String & path) override;
Poco::Timestamp getLastChanged(const String & path) override;
void setReadOnly(const String & path) override; void setReadOnly(const String & path) override;
void createHardLink(const String & src_path, const String & dst_path) override; void createHardLink(const String & src_path, const String & dst_path) override;

View File

@ -83,6 +83,8 @@ public:
Poco::Timestamp getLastModified(const String &) override { return Poco::Timestamp(); } Poco::Timestamp getLastModified(const String &) override { return Poco::Timestamp(); }
Poco::Timestamp getLastChanged(const String &) override { return Poco::Timestamp(); }
void setReadOnly(const String & path) override; void setReadOnly(const String & path) override;
void createHardLink(const String & src_path, const String & dst_path) override; void createHardLink(const String & src_path, const String & dst_path) override;

View File

@ -100,6 +100,8 @@ public:
Poco::Timestamp getLastModified(const String &) override { return Poco::Timestamp{}; } Poco::Timestamp getLastModified(const String &) override { return Poco::Timestamp{}; }
Poco::Timestamp getLastChanged(const String &) override { return Poco::Timestamp{}; }
/// Write and modification part /// Write and modification part
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String &, size_t, WriteMode, const WriteSettings &) override std::unique_ptr<WriteBufferFromFileBase> writeFile(const String &, size_t, WriteMode, const WriteSettings &) override

View File

@ -258,6 +258,10 @@ public:
/// Get last modified time of file or directory at `path`. /// Get last modified time of file or directory at `path`.
virtual Poco::Timestamp getLastModified(const String & path) = 0; virtual Poco::Timestamp getLastModified(const String & path) = 0;
/// Get last changed time of file or directory at `path`.
/// Meaning is the same as stat.mt_ctime (e.g. different from getLastModified()).
virtual Poco::Timestamp getLastChanged(const String & path) = 0;
/// Set file at `path` as read-only. /// Set file at `path` as read-only.
virtual void setReadOnly(const String & path) = 0; virtual void setReadOnly(const String & path) = 0;

View File

@ -424,6 +424,11 @@ Poco::Timestamp DiskObjectStorage::getLastModified(const String & path)
return metadata_disk->getLastModified(path); return metadata_disk->getLastModified(path);
} }
Poco::Timestamp DiskObjectStorage::getLastChanged(const String & path)
{
return metadata_disk->getLastChanged(path);
}
void DiskObjectStorage::removeMetadata(const String & path, std::vector<String> & paths_to_remove) void DiskObjectStorage::removeMetadata(const String & path, std::vector<String> & paths_to_remove)
{ {
LOG_TRACE(log, "Remove file by path: {}", backQuote(metadata_disk->getPath() + path)); LOG_TRACE(log, "Remove file by path: {}", backQuote(metadata_disk->getPath() + path));

View File

@ -147,6 +147,8 @@ public:
Poco::Timestamp getLastModified(const String & path) override; Poco::Timestamp getLastModified(const String & path) override;
Poco::Timestamp getLastChanged(const String & path) override;
bool isRemote() const override { return true; } bool isRemote() const override { return true; }
void shutdown() override; void shutdown() override;

View File

@ -1846,6 +1846,51 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
} }
} }
size_t MergeTreeData::clearOldBrokenPartsFromDetachedDirecory()
{
DetachedPartsInfo detached_parts = getDetachedParts();
if (detached_parts.empty())
return 0;
PartsTemporaryRename renamed_parts(*this, "detached/");
for (const auto & part_info : detached_parts)
{
if (!part_info.valid_name || part_info.prefix.empty())
continue;
const auto & removable_detached_parts_prefixes = DetachedPartInfo::DETACHED_REASONS_REMOVABLE_BY_TIMEOUT;
bool can_be_removed_by_timeout = std::find(
removable_detached_parts_prefixes.begin(),
removable_detached_parts_prefixes.end(),
part_info.prefix) != removable_detached_parts_prefixes.end();
if (!can_be_removed_by_timeout)
continue;
time_t current_time = time(nullptr);
ssize_t threshold = current_time - getSettings()->merge_tree_clear_old_broken_detached_parts_interval_seconds;
time_t last_change_time = part_info.disk->getLastChanged(fs::path(relative_data_path) / "detached" / part_info.dir_name / "").epochTime();
if (last_change_time >= threshold)
continue;
renamed_parts.addPart(part_info.dir_name, "deleting_" + part_info.dir_name, part_info.disk);
}
LOG_DEBUG(log, "Will clean up {} detached parts", renamed_parts.old_and_new_names.size());
renamed_parts.tryRenameAll();
for (const auto & [old_name, new_name, disk] : renamed_parts.old_and_new_names)
{
removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name, false);
LOG_DEBUG(log, "Removed broken detached part {} due to a timeout for broken detached parts", old_name);
}
return renamed_parts.old_and_new_names.size();
}
size_t MergeTreeData::clearOldWriteAheadLogs() size_t MergeTreeData::clearOldWriteAheadLogs()
{ {
DataPartsVector parts = getDataPartsVectorForInternalUsage(); DataPartsVector parts = getDataPartsVectorForInternalUsage();
@ -1892,6 +1937,7 @@ size_t MergeTreeData::clearOldWriteAheadLogs()
auto disk_ptr = *disk_it; auto disk_ptr = *disk_it;
if (disk_ptr->isBroken()) if (disk_ptr->isBroken())
continue; continue;
for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next()) for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
{ {
auto min_max_block_number = MergeTreeWriteAheadLog::tryParseMinMaxBlockNumber(it->name()); auto min_max_block_number = MergeTreeWriteAheadLog::tryParseMinMaxBlockNumber(it->name());

View File

@ -622,6 +622,8 @@ public:
/// Delete WAL files containing parts, that all already stored on disk. /// Delete WAL files containing parts, that all already stored on disk.
size_t clearOldWriteAheadLogs(); size_t clearOldWriteAheadLogs();
size_t clearOldBrokenPartsFromDetachedDirecory();
/// Delete all directories which names begin with "tmp" /// Delete all directories which names begin with "tmp"
/// Must be called with locked lockForShare() because it's using relative_data_path. /// Must be called with locked lockForShare() because it's using relative_data_path.
size_t clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds, const NameSet & valid_prefixes = {"tmp_", }); size_t clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds, const NameSet & valid_prefixes = {"tmp_", });

View File

@ -156,6 +156,15 @@ struct DetachedPartInfo : public MergeTreePartInfo
"covered-by-broken", "covered-by-broken",
}); });
static constexpr auto DETACHED_REASONS_REMOVABLE_BY_TIMEOUT = std::to_array<std::string_view>({
"broken",
"unexpected",
"noquorum",
"ignored",
"broken-on-start",
"covered-by-broken",
});
/// NOTE: It may parse part info incorrectly. /// NOTE: It may parse part info incorrectly.
/// For example, if prefix contains '_' or if DETACH_REASONS doesn't contain prefix. /// For example, if prefix contains '_' or if DETACH_REASONS doesn't contain prefix.
// This method has different semantics with MergeTreePartInfo::tryParsePartName. // This method has different semantics with MergeTreePartInfo::tryParsePartName.

View File

@ -61,6 +61,7 @@ struct Settings;
M(UInt64, merge_selecting_sleep_ms, 5000, "Sleep time for merge selecting when no part selected, a lower setting will trigger selecting tasks in background_schedule_pool frequently which result in large amount of requests to zookeeper in large-scale clusters", 0) \ M(UInt64, merge_selecting_sleep_ms, 5000, "Sleep time for merge selecting when no part selected, a lower setting will trigger selecting tasks in background_schedule_pool frequently which result in large amount of requests to zookeeper in large-scale clusters", 0) \
M(UInt64, merge_tree_clear_old_temporary_directories_interval_seconds, 60, "The period of executing the clear old temporary directories operation in background.", 0) \ M(UInt64, merge_tree_clear_old_temporary_directories_interval_seconds, 60, "The period of executing the clear old temporary directories operation in background.", 0) \
M(UInt64, merge_tree_clear_old_parts_interval_seconds, 1, "The period of executing the clear old parts operation in background.", 0) \ M(UInt64, merge_tree_clear_old_parts_interval_seconds, 1, "The period of executing the clear old parts operation in background.", 0) \
M(UInt64, merge_tree_clear_old_broken_detached_parts_interval_seconds, 1ULL * 3600 * 24 * 30, "The period of executing the clear old broken detached parts operation in background.", 0) \
M(Bool, remove_rolled_back_parts_immediately, 1, "Setting for an incomplete experimental feature.", 0) \ M(Bool, remove_rolled_back_parts_immediately, 1, "Setting for an incomplete experimental feature.", 0) \
\ \
/** Inserts settings. */ \ /** Inserts settings. */ \

View File

@ -65,6 +65,7 @@ void ReplicatedMergeTreeCleanupThread::iterate()
/// do it under share lock /// do it under share lock
storage.clearOldWriteAheadLogs(); storage.clearOldWriteAheadLogs();
storage.clearOldTemporaryDirectories(storage.getSettings()->temporary_directories_lifetime.totalSeconds()); storage.clearOldTemporaryDirectories(storage.getSettings()->temporary_directories_lifetime.totalSeconds());
storage.clearOldBrokenPartsFromDetachedDirecory();
} }
/// This is loose condition: no problem if we actually had lost leadership at this moment /// This is loose condition: no problem if we actually had lost leadership at this moment

View File

@ -124,6 +124,7 @@ void StorageMergeTree::startup()
/// Temporary directories contain incomplete results of merges (after forced restart) /// Temporary directories contain incomplete results of merges (after forced restart)
/// and don't allow to reinitialize them, so delete each of them immediately /// and don't allow to reinitialize them, so delete each of them immediately
clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_"}); clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_"});
clearOldBrokenPartsFromDetachedDirecory();
/// NOTE background task will also do the above cleanups periodically. /// NOTE background task will also do the above cleanups periodically.
time_after_previous_cleanup_parts.restart(); time_after_previous_cleanup_parts.restart();
@ -1184,6 +1185,18 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
}, common_assignee_trigger, getStorageID()), /* need_trigger */ false); }, common_assignee_trigger, getStorageID()), /* need_trigger */ false);
scheduled = true; scheduled = true;
} }
if (auto lock = time_after_previous_cleanup_broken_detached_parts.compareAndRestartDeferred(
getSettings()->merge_tree_clear_old_broken_detached_parts_interval_seconds))
{
assignee.scheduleCommonTask(std::make_shared<ExecutableLambdaAdapter>(
[this, share_lock] ()
{
return clearOldBrokenPartsFromDetachedDirecory();
}, common_assignee_trigger, getStorageID()), /* need_trigger */ false);
scheduled = true;
}
if (auto lock = time_after_previous_cleanup_parts.compareAndRestartDeferred( if (auto lock = time_after_previous_cleanup_parts.compareAndRestartDeferred(
getSettings()->merge_tree_clear_old_parts_interval_seconds)) getSettings()->merge_tree_clear_old_parts_interval_seconds))
{ {

View File

@ -130,6 +130,8 @@ private:
AtomicStopwatch time_after_previous_cleanup_parts; AtomicStopwatch time_after_previous_cleanup_parts;
/// For clearOldTemporaryDirectories. /// For clearOldTemporaryDirectories.
AtomicStopwatch time_after_previous_cleanup_temporary_directories; AtomicStopwatch time_after_previous_cleanup_temporary_directories;
/// For clearOldBrokenDetachedParts
AtomicStopwatch time_after_previous_cleanup_broken_detached_parts;
/// Mutex for parts currently processing in background /// Mutex for parts currently processing in background
/// merging (also with TTL), mutating or moving. /// merging (also with TTL), mutating or moving.

View File

@ -464,6 +464,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
/// don't allow to reinitialize them, delete each of them immediately. /// don't allow to reinitialize them, delete each of them immediately.
clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_"}); clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_"});
clearOldWriteAheadLogs(); clearOldWriteAheadLogs();
clearOldBrokenPartsFromDetachedDirecory();
} }
createNewZooKeeperNodes(); createNewZooKeeperNodes();

View File

@ -27,3 +27,14 @@ def corrupt_part_data_by_path(node, part_path):
], ],
privileged=True, privileged=True,
) )
def break_part(node, table, part_name):
path = f"/var/lib/clickhouse/data/default/{table}/{part_name}/columns.txt"
print(f"Corrupting part {part_name}, removing file: {path}")
node.exec_in_container(
[
"bash",
"-c",
f"rm {path}"
]
)

View File

@ -0,0 +1,17 @@
<clickhouse>
<zookeeper>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
</node>
<node index="3">
<host>zoo3</host>
<port>2181</port>
</node>
<session_timeout_ms>20000</session_timeout_ms>
</zookeeper>
</clickhouse>

View File

@ -0,0 +1,99 @@
import pytest
from helpers.cluster import ClickHouseCluster
from multiprocessing.dummy import Pool
from helpers.corrupt_part_data_on_disk import corrupt_part_data_on_disk
from helpers.corrupt_part_data_on_disk import break_part
import time
cluster = ClickHouseCluster(__file__, zookeeper_config_path="configs/zookeeper.xml")
node1 = cluster.add_instance(
"node1",
stay_alive=True,
with_zookeeper=True,
main_configs=["configs/zookeeper.xml"],
)
path_to_data = "/var/lib/clickhouse/"
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def remove_broken_detached_part_impl(table, node, expect_broken_prefix):
assert (
node.query(
f"SELECT COUNT() FROM system.parts WHERE table='{table}' AND active=1"
)
== "4\n"
)
path_to_detached = path_to_data + f"data/default/{table}/detached/"
result = node.exec_in_container(["ls", f"{path_to_detached}"])
assert result.strip() == ""
corrupt_part_data_on_disk(node, table, "all_3_3_0")
break_part(node, table, "all_3_3_0")
result = node.query(
f"CHECK TABLE {table}", settings={"check_query_single_value_result": 0}
)
assert "all_3_3_0\t0" in result
node.query(f"DETACH TABLE {table}")
node.query(f"ATTACH TABLE {table}")
result = node.exec_in_container(["ls", f"{path_to_detached}"])
print(result)
assert f"{expect_broken_prefix}_all_3_3_0" in result
time.sleep(15)
result = node.exec_in_container(["ls", f"{path_to_detached}"])
print(result)
assert f"{expect_broken_prefix}_all_3_3_0" not in result
node.query(f"DROP TABLE {table} SYNC")
def test_remove_broken_detached_part_merge_tree(started_cluster):
node1.query(
"""
CREATE TABLE mt(id UInt32, value Int32)
ENGINE = MergeTree() ORDER BY id
SETTINGS merge_tree_clear_old_broken_detached_parts_interval_seconds=5;
"""
)
for i in range(4):
node1.query(
f"INSERT INTO mt SELECT number, number * number FROM numbers ({i * 100000}, 100000)"
)
remove_broken_detached_part_impl("mt", node1, "broken-on-start")
def test_remove_broken_detached_part_replicated_merge_tree(started_cluster):
node1.query(
f"""
CREATE TABLE replicated_mt(date Date, id UInt32, value Int32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/replicated_mt', '{node1.name}') ORDER BY id
SETTINGS merge_tree_clear_old_broken_detached_parts_interval_seconds=5, cleanup_delay_period=1, cleanup_delay_period_random_add=0;
"""
)
for i in range(4):
node1.query(
f"INSERT INTO replicated_mt SELECT toDate('2019-10-01'), number, number * number FROM numbers ({i * 100000}, 100000)"
)
remove_broken_detached_part_impl("replicated_mt", node1, "broken")