mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 08:02:02 +00:00
Merge pull request #41323 from ClickHouse/remove_bad_exception
Fix parts removal in case of connection loss
This commit is contained in:
commit
33216dedf9
@ -131,18 +131,18 @@ bool ReadBufferFromS3::nextImpl()
|
||||
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3Microseconds, watch.elapsedMicroseconds());
|
||||
break;
|
||||
}
|
||||
catch (const Exception & e)
|
||||
catch (Exception & e)
|
||||
{
|
||||
watch.stop();
|
||||
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3Microseconds, watch.elapsedMicroseconds());
|
||||
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3RequestsErrors, 1);
|
||||
|
||||
if (const auto * s3_exception = dynamic_cast<const S3Exception *>(&e))
|
||||
if (auto * s3_exception = dynamic_cast<S3Exception *>(&e))
|
||||
{
|
||||
/// It doesn't make sense to retry Access Denied or No Such Key
|
||||
if (!s3_exception->isRetryableError())
|
||||
{
|
||||
tryLogCurrentException(log, fmt::format("while reading key: {}, from bucket: {}", key, bucket));
|
||||
s3_exception->addMessage("while reading key: {}, from bucket: {}", key, bucket);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
@ -55,7 +55,7 @@ public:
|
||||
bool isRetryableError() const;
|
||||
|
||||
private:
|
||||
const Aws::S3::S3Errors code;
|
||||
Aws::S3::S3Errors code;
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -204,13 +204,12 @@ DataPartStorageBuilderPtr DataPartStorageOnDisk::getBuilder() const
|
||||
}
|
||||
|
||||
void DataPartStorageOnDisk::remove(
|
||||
bool can_remove_shared_data,
|
||||
const NameSet & names_not_to_remove,
|
||||
CanRemoveCallback && can_remove_callback,
|
||||
const MergeTreeDataPartChecksums & checksums,
|
||||
std::list<ProjectionChecksums> projections,
|
||||
bool is_temp,
|
||||
MergeTreeDataPartState state,
|
||||
Poco::Logger * log) const
|
||||
Poco::Logger * log)
|
||||
{
|
||||
/// NOTE We rename part to delete_tmp_<relative_path> instead of delete_tmp_<name> to avoid race condition
|
||||
/// when we try to remove two parts with the same name, but different relative paths,
|
||||
@ -239,13 +238,16 @@ void DataPartStorageOnDisk::remove(
|
||||
|
||||
fs::path to = fs::path(root_path) / part_dir_without_slash;
|
||||
|
||||
std::optional<CanRemoveDescription> can_remove_description;
|
||||
|
||||
auto disk = volume->getDisk();
|
||||
if (disk->exists(to))
|
||||
{
|
||||
LOG_WARNING(log, "Directory {} (to which part must be renamed before removing) already exists. Most likely this is due to unclean restart or race condition. Removing it.", fullPath(disk, to));
|
||||
try
|
||||
{
|
||||
disk->removeSharedRecursive(fs::path(to) / "", !can_remove_shared_data, names_not_to_remove);
|
||||
can_remove_description.emplace(can_remove_callback());
|
||||
disk->removeSharedRecursive(fs::path(to) / "", !can_remove_description->can_remove_anything, can_remove_description->files_not_to_remove);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -257,6 +259,7 @@ void DataPartStorageOnDisk::remove(
|
||||
try
|
||||
{
|
||||
disk->moveDirectory(from, to);
|
||||
onRename(root_path, part_dir_without_slash);
|
||||
}
|
||||
catch (const fs::filesystem_error & e)
|
||||
{
|
||||
@ -268,6 +271,9 @@ void DataPartStorageOnDisk::remove(
|
||||
throw;
|
||||
}
|
||||
|
||||
if (!can_remove_description)
|
||||
can_remove_description.emplace(can_remove_callback());
|
||||
|
||||
// Record existing projection directories so we don't remove them twice
|
||||
std::unordered_set<String> projection_directories;
|
||||
std::string proj_suffix = ".proj";
|
||||
@ -278,7 +284,7 @@ void DataPartStorageOnDisk::remove(
|
||||
|
||||
clearDirectory(
|
||||
fs::path(to) / proj_dir_name,
|
||||
can_remove_shared_data, names_not_to_remove, projection.checksums, {}, is_temp, state, log, true);
|
||||
can_remove_description->can_remove_anything, can_remove_description->files_not_to_remove, projection.checksums, {}, is_temp, state, log, true);
|
||||
}
|
||||
|
||||
/// It is possible that we are removing the part which have a written but not loaded projection.
|
||||
@ -305,7 +311,7 @@ void DataPartStorageOnDisk::remove(
|
||||
|
||||
clearDirectory(
|
||||
fs::path(to) / name,
|
||||
can_remove_shared_data, names_not_to_remove, tmp_checksums, {}, is_temp, state, log, true);
|
||||
can_remove_description->can_remove_anything, can_remove_description->files_not_to_remove, tmp_checksums, {}, is_temp, state, log, true);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -315,7 +321,7 @@ void DataPartStorageOnDisk::remove(
|
||||
}
|
||||
}
|
||||
|
||||
clearDirectory(to, can_remove_shared_data, names_not_to_remove, checksums, projection_directories, is_temp, state, log, false);
|
||||
clearDirectory(to, can_remove_description->can_remove_anything, can_remove_description->files_not_to_remove, checksums, projection_directories, is_temp, state, log, false);
|
||||
}
|
||||
|
||||
void DataPartStorageOnDisk::clearDirectory(
|
||||
|
@ -45,13 +45,12 @@ public:
|
||||
void checkConsistency(const MergeTreeDataPartChecksums & checksums) const override;
|
||||
|
||||
void remove(
|
||||
bool can_remove_shared_data,
|
||||
const NameSet & names_not_to_remove,
|
||||
CanRemoveCallback && can_remove_callback,
|
||||
const MergeTreeDataPartChecksums & checksums,
|
||||
std::list<ProjectionChecksums> projections,
|
||||
bool is_temp,
|
||||
MergeTreeDataPartState state,
|
||||
Poco::Logger * log) const override;
|
||||
Poco::Logger * log) override;
|
||||
|
||||
std::string getRelativePathForPrefix(Poco::Logger * log, const String & prefix, bool detached) const override;
|
||||
|
||||
|
@ -12,6 +12,13 @@ namespace DB
|
||||
class ReadBufferFromFileBase;
|
||||
class WriteBufferFromFileBase;
|
||||
|
||||
struct CanRemoveDescription
|
||||
{
|
||||
bool can_remove_anything;
|
||||
NameSet files_not_to_remove;
|
||||
|
||||
};
|
||||
using CanRemoveCallback = std::function<CanRemoveDescription()>;
|
||||
|
||||
class IDataPartStorageIterator
|
||||
{
|
||||
@ -113,13 +120,12 @@ public:
|
||||
/// can_remove_shared_data, names_not_to_remove are specific for DiskObjectStorage.
|
||||
/// projections, checksums are needed to avoid recursive listing
|
||||
virtual void remove(
|
||||
bool can_remove_shared_data,
|
||||
const NameSet & names_not_to_remove,
|
||||
CanRemoveCallback && can_remove_callback,
|
||||
const MergeTreeDataPartChecksums & checksums,
|
||||
std::list<ProjectionChecksums> projections,
|
||||
bool is_temp,
|
||||
MergeTreeDataPartState state,
|
||||
Poco::Logger * log) const = 0;
|
||||
Poco::Logger * log) = 0;
|
||||
|
||||
/// Get a name like 'prefix_partdir_tryN' which does not exist in a root dir.
|
||||
/// TODO: remove it.
|
||||
|
@ -1433,13 +1433,18 @@ void IMergeTreeDataPart::remove() const
|
||||
assert(assertHasValidVersionMetadata());
|
||||
part_is_probably_removed_from_disk = true;
|
||||
|
||||
auto [can_remove, files_not_to_remove] = canRemovePart();
|
||||
auto can_remove_callback = [this] ()
|
||||
{
|
||||
auto [can_remove, files_not_to_remove] = canRemovePart();
|
||||
if (!can_remove)
|
||||
LOG_TRACE(storage.log, "Blobs of part {} cannot be removed", name);
|
||||
|
||||
if (!can_remove)
|
||||
LOG_TRACE(storage.log, "Blobs of part {} cannot be removed", name);
|
||||
if (!files_not_to_remove.empty())
|
||||
LOG_TRACE(storage.log, "Some blobs ({}) of part {} cannot be removed", fmt::join(files_not_to_remove, ", "), name);
|
||||
|
||||
return CanRemoveDescription{.can_remove_anything = can_remove, .files_not_to_remove = files_not_to_remove };
|
||||
};
|
||||
|
||||
if (!files_not_to_remove.empty())
|
||||
LOG_TRACE(storage.log, "Some blobs ({}) of part {} cannot be removed", fmt::join(files_not_to_remove, ", "), name);
|
||||
|
||||
if (!isStoredOnDisk())
|
||||
return;
|
||||
@ -1459,7 +1464,7 @@ void IMergeTreeDataPart::remove() const
|
||||
projection_checksums.emplace_back(IDataPartStorage::ProjectionChecksums{.name = p_name, .checksums = projection_part->checksums});
|
||||
}
|
||||
|
||||
data_part_storage->remove(can_remove, files_not_to_remove, checksums, projection_checksums, is_temp, getState(), storage.log);
|
||||
data_part_storage->remove(std::move(can_remove_callback), checksums, projection_checksums, is_temp, getState(), storage.log);
|
||||
}
|
||||
|
||||
String IMergeTreeDataPart::getRelativePathForPrefix(const String & prefix, bool detached) const
|
||||
|
@ -27,6 +27,7 @@
|
||||
#include <IO/Operators.h>
|
||||
#include <IO/ReadBufferFromMemory.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/S3Common.h>
|
||||
#include <Interpreters/Aggregator.h>
|
||||
#include <Interpreters/ExpressionAnalyzer.h>
|
||||
#include <Interpreters/PartLog.h>
|
||||
@ -1635,11 +1636,10 @@ size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lif
|
||||
/// We don't control the amount of refs for temporary parts so we cannot decide can we remove blobs
|
||||
/// or not. So we are not doing it
|
||||
bool keep_shared = false;
|
||||
if (it->path().find("fetch") != std::string::npos)
|
||||
if (disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)
|
||||
{
|
||||
keep_shared = disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication;
|
||||
if (keep_shared)
|
||||
LOG_WARNING(log, "Since zero-copy replication is enabled we are not going to remove blobs from shared storage for {}", full_path);
|
||||
LOG_WARNING(log, "Since zero-copy replication is enabled we are not going to remove blobs from shared storage for {}", full_path);
|
||||
keep_shared = true;
|
||||
}
|
||||
|
||||
disk->removeSharedRecursive(it->path(), keep_shared, {});
|
||||
@ -2135,6 +2135,7 @@ void MergeTreeData::renameInMemory(const StorageID & new_table_id)
|
||||
void MergeTreeData::dropAllData()
|
||||
{
|
||||
LOG_TRACE(log, "dropAllData: waiting for locks.");
|
||||
auto settings_ptr = getSettings();
|
||||
|
||||
auto lock = lockParts();
|
||||
|
||||
|
@ -7604,7 +7604,7 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedData(const IMer
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_TRACE(log, "Part {} looks temporary, because checksums file doesn't exists, blobs can be removed", part.name);
|
||||
LOG_TRACE(log, "Part {} looks temporary, because {} file doesn't exists, blobs can be removed", part.name, IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK);
|
||||
/// Temporary part with some absent file cannot be locked in shared mode
|
||||
return std::make_pair(true, NameSet{});
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user