Merge pull request #72725 from ClickHouse/backport/24.10/71439

Backport #71439 to 24.10: Fix transaction rollback if `WriteBuffer::finalize` fails in plain_rewritable disk during directory creation
This commit is contained in:
robot-ch-test-poll2 2024-12-03 10:15:22 +04:00 committed by GitHub
commit b819c26401
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 127 additions and 53 deletions

View File

@ -42,7 +42,7 @@ static struct InitFiu
REGULAR(use_delayed_remote_source) \
REGULAR(cluster_discovery_faults) \
REGULAR(replicated_sends_failpoint) \
REGULAR(stripe_log_sink_write_fallpoint)\
REGULAR(stripe_log_sink_write_fallpoint) \
ONCE(smt_commit_merge_mutate_zk_fail_after_op) \
ONCE(smt_commit_merge_mutate_zk_fail_before_op) \
ONCE(smt_commit_write_zk_fail_after_op) \

View File

@ -57,6 +57,12 @@ struct InMemoryDirectoryPathMap
return it->second;
}
bool removePathIfExists(const std::filesystem::path & path)
{
std::lock_guard lock(mutex);
return map.erase(path) != 0;
}
mutable SharedMutex mutex;
#ifdef OS_LINUX

View File

@ -7,6 +7,7 @@
#include <IO/WriteHelpers.h>
#include <Poco/Timestamp.h>
#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Common/SharedLockGuard.h>
#include <Common/logger_useful.h>
@ -18,8 +19,15 @@ namespace ErrorCodes
extern const int FILE_DOESNT_EXIST;
extern const int FILE_ALREADY_EXISTS;
extern const int INCORRECT_DATA;
extern const int FAULT_INJECTED;
};
namespace FailPoints
{
extern const char plain_object_storage_write_fail_on_directory_create[];
extern const char plain_object_storage_write_fail_on_directory_move[];
}
namespace
{
@ -72,8 +80,14 @@ void MetadataStorageFromPlainObjectStorageCreateDirectoryOperation::execute(std:
/* buf_size */ DBMS_DEFAULT_BUFFER_SIZE,
/* settings */ {});
write_created = true;
writeString(path.string(), *buf);
fiu_do_on(FailPoints::plain_object_storage_write_fail_on_directory_create, {
throw Exception(ErrorCodes::FAULT_INJECTED, "Injecting fault when creating '{}' directory", path);
});
buf->finalize();
auto event = object_storage->getMetadataStorageMetrics().directory_created;
ProfileEvents::increment(event);
{
std::lock_guard lock(path_map.mutex);
auto & map = path_map.map;
@ -83,34 +97,20 @@ void MetadataStorageFromPlainObjectStorageCreateDirectoryOperation::execute(std:
}
auto metric = object_storage->getMetadataStorageMetrics().directory_map_size;
CurrentMetrics::add(metric, 1);
writeString(path.string(), *buf);
buf->finalize();
write_finalized = true;
auto event = object_storage->getMetadataStorageMetrics().directory_created;
ProfileEvents::increment(event);
}
void MetadataStorageFromPlainObjectStorageCreateDirectoryOperation::undo(std::unique_lock<SharedMutex> &)
{
auto metadata_object_key = createMetadataObjectKey(object_key_prefix, metadata_key_prefix);
if (write_finalized)
LOG_TRACE(getLogger("MetadataStorageFromPlainObjectStorageCreateDirectoryOperation"), "Undoing '{}' directory creation", path);
const auto base_path = path.parent_path();
if (path_map.removePathIfExists(base_path))
{
const auto base_path = path.parent_path();
{
std::lock_guard lock(path_map.mutex);
path_map.map.erase(base_path);
}
auto metric = object_storage->getMetadataStorageMetrics().directory_map_size;
CurrentMetrics::sub(metric, 1);
object_storage->removeObject(StoredObject(metadata_object_key.serialize(), path / PREFIX_PATH_FILE_NAME));
}
else if (write_created)
object_storage->removeObjectIfExists(StoredObject(metadata_object_key.serialize(), path / PREFIX_PATH_FILE_NAME));
auto metadata_object_key = createMetadataObjectKey(object_key_prefix, metadata_key_prefix);
object_storage->removeObjectIfExists(StoredObject(metadata_object_key.serialize(), path / PREFIX_PATH_FILE_NAME));
}
MetadataStorageFromPlainObjectStorageMoveDirectoryOperation::MetadataStorageFromPlainObjectStorageMoveDirectoryOperation(
@ -184,8 +184,10 @@ void MetadataStorageFromPlainObjectStorageMoveDirectoryOperation::execute(std::u
getLogger("MetadataStorageFromPlainObjectStorageMoveDirectoryOperation"), "Moving directory '{}' to '{}'", path_from, path_to);
auto write_buf = createWriteBuf(path_from, path_to, /* validate_content */ true);
write_created = true;
writeString(path_to.string(), *write_buf);
fiu_do_on(FailPoints::plain_object_storage_write_fail_on_directory_move, {
throw Exception(ErrorCodes::FAULT_INJECTED, "Injecting fault when moving from '{}' to '{}'", path_from, path_to);
});
write_buf->finalize();
/// parent_path() removes the trailing '/'.
@ -207,13 +209,12 @@ void MetadataStorageFromPlainObjectStorageMoveDirectoryOperation::undo(std::uniq
{
if (write_finalized)
{
std::lock_guard lock(path_map.mutex);
auto & map = path_map.map;
map.emplace(path_from.parent_path(), map.extract(path_to.parent_path()).mapped());
}
{
std::lock_guard lock(path_map.mutex);
auto & map = path_map.map;
map.emplace(path_from.parent_path(), map.extract(path_to.parent_path()).mapped());
}
if (write_created)
{
auto write_buf = createWriteBuf(path_to, path_from, /* verify_content */ false);
writeString(path_from.string(), *write_buf);
write_buf->finalize();
@ -249,26 +250,31 @@ void MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation::execute(std:
auto metadata_object = StoredObject(/*remote_path*/ metadata_object_key.serialize(), /*local_path*/ path / PREFIX_PATH_FILE_NAME);
object_storage->removeObject(metadata_object);
if (path_map.removePathIfExists(base_path))
{
std::lock_guard lock(path_map.mutex);
auto & map = path_map.map;
map.erase(base_path);
auto metric = object_storage->getMetadataStorageMetrics().directory_map_size;
CurrentMetrics::sub(metric, 1);
auto event = object_storage->getMetadataStorageMetrics().directory_removed;
ProfileEvents::increment(event);
}
auto metric = object_storage->getMetadataStorageMetrics().directory_map_size;
CurrentMetrics::sub(metric, 1);
removed = true;
auto event = object_storage->getMetadataStorageMetrics().directory_removed;
ProfileEvents::increment(event);
remove_attempted = true;
}
void MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation::undo(std::unique_lock<SharedMutex> &)
{
if (!removed)
if (!remove_attempted)
return;
{
std::lock_guard lock(path_map.mutex);
auto & map = path_map.map;
map.emplace(path.parent_path(), key_prefix);
}
auto metric = object_storage->getMetadataStorageMetrics().directory_map_size;
CurrentMetrics::add(metric, 1);
auto metadata_object_key = createMetadataObjectKey(key_prefix, metadata_key_prefix);
auto metadata_object = StoredObject(metadata_object_key.serialize(), path / PREFIX_PATH_FILE_NAME);
auto buf = object_storage->writeObject(
@ -279,14 +285,6 @@ void MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation::undo(std::un
/* settings */ {});
writeString(path.string(), *buf);
buf->finalize();
{
std::lock_guard lock(path_map.mutex);
auto & map = path_map.map;
map.emplace(path.parent_path(), std::move(key_prefix));
}
auto metric = object_storage->getMetadataStorageMetrics().directory_map_size;
CurrentMetrics::add(metric, 1);
}
MetadataStorageFromPlainObjectStorageWriteFileOperation::MetadataStorageFromPlainObjectStorageWriteFileOperation(

View File

@ -19,9 +19,6 @@ private:
const std::string metadata_key_prefix;
const std::string object_key_prefix;
bool write_created = false;
bool write_finalized = false;
public:
MetadataStorageFromPlainObjectStorageCreateDirectoryOperation(
/// path_ must end with a trailing '/'.
@ -43,7 +40,6 @@ private:
ObjectStoragePtr object_storage;
const std::string metadata_key_prefix;
bool write_created = false;
bool write_finalized = false;
std::unique_ptr<WriteBufferFromFileBase>
@ -73,7 +69,7 @@ private:
const std::string metadata_key_prefix;
std::string key_prefix;
bool removed = false;
bool remove_attempted = false;
public:
MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation(

View File

@ -0,0 +1,12 @@
1 2
2 2
3 1
4 7
5 10
6 12
1 2
2 2
3 1
4 7
5 10
6 12

View File

@ -0,0 +1,62 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-shared-merge-tree, no-parallel
# Tag no-fasttest: requires S3
# Tag no-shared-merge-tree: does not support replication
# Tag no-parallel: uses failpoints
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
on_exit() {
${CLICKHOUSE_CLIENT} -m --query "
SYSTEM DISABLE FAILPOINT plain_object_storage_write_fail_on_directory_create;
SYSTEM DISABLE FAILPOINT plain_object_storage_write_fail_on_directory_move;
"
}
trap on_exit EXIT
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test_s3_mt_fault"
${CLICKHOUSE_CLIENT} --query "
CREATE TABLE test_s3_mt_fault (a Int32, b Int64) engine = MergeTree() ORDER BY tuple(a, b)
SETTINGS disk = disk(
name = 03008_s3_plain_rewritable_fault,
type = s3_plain_rewritable,
endpoint = 'http://localhost:11111/test/03008_test_s3_mt_fault/',
access_key_id = clickhouse,
secret_access_key = clickhouse);
"
${CLICKHOUSE_CLIENT} --query "
INSERT INTO test_s3_mt_fault (*) VALUES (1, 2), (2, 2), (3, 1), (4, 7), (5, 10), (6, 12);
OPTIMIZE TABLE test_s3_mt_fault FINAL;
"
${CLICKHOUSE_CLIENT} --query "
SYSTEM ENABLE FAILPOINT plain_object_storage_write_fail_on_directory_create
"
${CLICKHOUSE_CLIENT} --query "
INSERT INTO test_s3_mt_fault (*) select number, number from numbers_mt(100)" 2>&1 | grep -Fq "FAULT_INJECTED"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM test_s3_mt_fault;"
${CLICKHOUSE_CLIENT} --query "
SYSTEM DISABLE FAILPOINT plain_object_storage_write_fail_on_directory_create;
SYSTEM ENABLE FAILPOINT plain_object_storage_write_fail_on_directory_move;
"
${CLICKHOUSE_CLIENT} --query "
INSERT INTO test_s3_mt_fault (*) select number, number from numbers_mt(100);
" 2>&1 | grep -Fq "FAULT_INJECTED"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM test_s3_mt_fault;"
${CLICKHOUSE_CLIENT} --query "
SYSTEM DISABLE FAILPOINT plain_object_storage_write_fail_on_directory_move;
"
# Filter out 'Removing temporary directory' because the fault injection prevents directory rename.
${CLICKHOUSE_CLIENT} --query "DROP TABLE test_s3_mt_fault SYNC" 2>&1 | grep -v 'Removing temporary directory' ||: