Merge pull request #31738 from ClickHouse/fix_segfault_on_attach_partition

Fix segfault on attach partition
This commit is contained in:
tavplubix 2021-11-30 14:32:50 +03:00 committed by GitHub
commit 78224ef273
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 141 additions and 125 deletions

View File

@ -258,11 +258,11 @@ MergeTreeData::MergeTreeData(
/// format_file always contained on any data path
PathWithDisk version_file;
/// Creating directories, if not exist.
for (const auto & [path, disk] : getRelativeDataPathsWithDisks())
for (const auto & disk : getDisks())
{
disk->createDirectories(path);
disk->createDirectories(fs::path(path) / MergeTreeData::DETACHED_DIR_NAME);
String current_version_file_path = fs::path(path) / MergeTreeData::FORMAT_VERSION_FILE_NAME;
disk->createDirectories(relative_data_path);
disk->createDirectories(fs::path(relative_data_path) / MergeTreeData::DETACHED_DIR_NAME);
String current_version_file_path = fs::path(relative_data_path) / MergeTreeData::FORMAT_VERSION_FILE_NAME;
if (disk->exists(current_version_file_path))
{
if (!version_file.first.empty())
@ -1359,9 +1359,9 @@ size_t MergeTreeData::clearOldTemporaryDirectories(const MergeTreeDataMergerMuta
size_t cleared_count = 0;
/// Delete temporary directories older than a day.
for (const auto & [path, disk] : getRelativeDataPathsWithDisks())
for (const auto & disk : getDisks())
{
for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
for (auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next())
{
const std::string & basename = it->name();
if (!startsWith(basename, "tmp_"))
@ -1700,11 +1700,11 @@ void MergeTreeData::dropAllData()
/// Removing of each data part before recursive removal of directory is to speed-up removal, because there will be less number of syscalls.
clearPartsFromFilesystem(all_parts);
for (const auto & [path, disk] : getRelativeDataPathsWithDisks())
for (const auto & disk : getDisks())
{
try
{
disk->removeRecursive(path);
disk->removeRecursive(relative_data_path);
}
catch (const fs::filesystem_error & e)
{
@ -1734,12 +1734,12 @@ void MergeTreeData::dropIfEmpty()
try
{
for (const auto & [path, disk] : getRelativeDataPathsWithDisks())
for (const auto & disk : getDisks())
{
/// Non recursive, exception is thrown if there are more files.
disk->removeFileIfExists(fs::path(path) / MergeTreeData::FORMAT_VERSION_FILE_NAME);
disk->removeDirectory(fs::path(path) / MergeTreeData::DETACHED_DIR_NAME);
disk->removeDirectory(path);
disk->removeFileIfExists(fs::path(relative_data_path) / MergeTreeData::FORMAT_VERSION_FILE_NAME);
disk->removeDirectory(fs::path(relative_data_path) / MergeTreeData::DETACHED_DIR_NAME);
disk->removeDirectory(relative_data_path);
}
}
catch (...)
@ -2291,20 +2291,9 @@ void MergeTreeData::changeSettings(
}
}
void MergeTreeData::PartsTemporaryRename::addPart(const String & old_name, const String & new_name)
void MergeTreeData::PartsTemporaryRename::addPart(const String & old_name, const String & new_name, const DiskPtr & disk)
{
old_and_new_names.push_back({old_name, new_name});
for (const auto & [path, disk] : storage.getRelativeDataPathsWithDisks())
{
for (auto it = disk->iterateDirectory(fs::path(path) / source_dir); it->isValid(); it->next())
{
if (it->name() == old_name)
{
old_part_name_to_path_and_disk[old_name] = {path, disk};
break;
}
}
}
old_and_new_names.push_back({old_name, new_name, disk});
}
void MergeTreeData::PartsTemporaryRename::tryRenameAll()
@ -2314,11 +2303,10 @@ void MergeTreeData::PartsTemporaryRename::tryRenameAll()
{
try
{
const auto & [old_name, new_name] = old_and_new_names[i];
const auto & [old_name, new_name, disk] = old_and_new_names[i];
if (old_name.empty() || new_name.empty())
throw DB::Exception("Empty part name. Most likely it's a bug.", ErrorCodes::INCORRECT_FILE_NAME);
const auto & [path, disk] = old_part_name_to_path_and_disk[old_name];
const auto full_path = fs::path(path) / source_dir; /// for old_name
throw DB::Exception("Empty part name. Most likely it's a bug.", ErrorCodes::LOGICAL_ERROR);
const auto full_path = fs::path(storage.relative_data_path) / source_dir;
disk->moveFile(fs::path(full_path) / old_name, fs::path(full_path) / new_name);
}
catch (...)
@ -2335,15 +2323,14 @@ MergeTreeData::PartsTemporaryRename::~PartsTemporaryRename()
// TODO what if server had crashed before this destructor was called?
if (!renamed)
return;
for (const auto & [old_name, new_name] : old_and_new_names)
for (const auto & [old_name, new_name, disk] : old_and_new_names)
{
if (old_name.empty())
continue;
try
{
const auto & [path, disk] = old_part_name_to_path_and_disk[old_name];
const String full_path = fs::path(path) / source_dir; /// for old_name
const String full_path = fs::path(storage.relative_data_path) / source_dir;
disk->moveFile(fs::path(full_path) / new_name, fs::path(full_path) / old_name);
}
catch (...)
@ -3823,39 +3810,31 @@ MergeTreeData::getAllDataPartsVector(MergeTreeData::DataPartStateVector * out_st
return res;
}
std::vector<DetachedPartInfo> MergeTreeData::getDetachedParts() const
DetachedPartsInfo MergeTreeData::getDetachedParts() const
{
std::vector<DetachedPartInfo> res;
DetachedPartsInfo res;
for (const auto & [path, disk] : getRelativeDataPathsWithDisks())
for (const auto & disk : getDisks())
{
String detached_path = fs::path(path) / MergeTreeData::DETACHED_DIR_NAME;
String detached_path = fs::path(relative_data_path) / MergeTreeData::DETACHED_DIR_NAME;
/// Note: we don't care about TOCTOU issue here.
if (disk->exists(detached_path))
{
for (auto it = disk->iterateDirectory(detached_path); it->isValid(); it->next())
{
auto part = DetachedPartInfo::parseDetachedPartName(it->name(), format_version);
part.disk = disk->getName();
res.push_back(std::move(part));
res.push_back(DetachedPartInfo::parseDetachedPartName(disk, it->name(), format_version));
}
}
}
return res;
}
void MergeTreeData::validateDetachedPartName(const String & name) const
void MergeTreeData::validateDetachedPartName(const String & name)
{
if (name.find('/') != std::string::npos || name == "." || name == "..")
throw DB::Exception("Invalid part name '" + name + "'", ErrorCodes::INCORRECT_FILE_NAME);
auto full_path = getFullRelativePathForPart(name, "detached/");
if (!full_path)
throw DB::Exception("Detached part \"" + name + "\" not found" , ErrorCodes::BAD_DATA_PART_NAME);
if (startsWith(name, "attaching_") || startsWith(name, "deleting_"))
throw DB::Exception("Cannot drop part " + name + ": "
"most likely it is used by another DROP or ATTACH query.",
@ -3870,7 +3849,8 @@ void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, ContextPtr
{
String part_name = partition->as<ASTLiteral &>().value.safeGet<String>();
validateDetachedPartName(part_name);
renamed_parts.addPart(part_name, "deleting_" + part_name);
auto disk = getDiskForDetachedPart(part_name);
renamed_parts.addPart(part_name, "deleting_" + part_name, disk);
}
else
{
@ -3879,17 +3859,16 @@ void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, ContextPtr
for (const auto & part_info : detached_parts)
if (part_info.valid_name && part_info.partition_id == partition_id
&& part_info.prefix != "attaching" && part_info.prefix != "deleting")
renamed_parts.addPart(part_info.dir_name, "deleting_" + part_info.dir_name);
renamed_parts.addPart(part_info.dir_name, "deleting_" + part_info.dir_name, part_info.disk);
}
LOG_DEBUG(log, "Will drop {} detached parts.", renamed_parts.old_and_new_names.size());
renamed_parts.tryRenameAll();
for (auto & [old_name, new_name] : renamed_parts.old_and_new_names)
for (auto & [old_name, new_name, disk] : renamed_parts.old_and_new_names)
{
const auto & [path, disk] = renamed_parts.old_part_name_to_path_and_disk[old_name];
disk->removeRecursive(fs::path(path) / "detached" / new_name / "");
disk->removeRecursive(fs::path(relative_data_path) / "detached" / new_name / "");
LOG_DEBUG(log, "Dropped detached part {}", old_name);
old_name.clear();
}
@ -3906,53 +3885,45 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
if (attach_part)
{
const String part_id = partition->as<ASTLiteral &>().value.safeGet<String>();
validateDetachedPartName(part_id);
renamed_parts.addPart(part_id, "attaching_" + part_id);
auto disk = getDiskForDetachedPart(part_id);
renamed_parts.addPart(part_id, "attaching_" + part_id, disk);
if (MergeTreePartInfo::tryParsePartName(part_id, format_version))
name_to_disk[part_id] = getDiskForPart(part_id, source_dir);
name_to_disk[part_id] = getDiskForDetachedPart(part_id);
}
else
{
String partition_id = getPartitionIDFromQuery(partition, local_context);
LOG_DEBUG(log, "Looking for parts for partition {} in {}", partition_id, source_dir);
ActiveDataPartSet active_parts(format_version);
const auto disks = getStoragePolicy()->getDisks();
for (const auto & disk : disks)
auto detached_parts = getDetachedParts();
auto new_end_it = std::remove_if(detached_parts.begin(), detached_parts.end(), [&partition_id](const DetachedPartInfo & part_info)
{
for (auto it = disk->iterateDirectory(relative_data_path + source_dir); it->isValid(); it->next())
{
const String & name = it->name();
return !part_info.valid_name || !part_info.prefix.empty() || part_info.partition_id != partition_id;
});
detached_parts.resize(std::distance(detached_parts.begin(), new_end_it));
// TODO what if name contains "_tryN" suffix?
/// Parts with prefix in name (e.g. attaching_1_3_3_0, deleting_1_3_3_0) will be ignored
if (auto part_opt = MergeTreePartInfo::tryParsePartName(name, format_version);
!part_opt || part_opt->partition_id != partition_id)
{
continue;
}
LOG_DEBUG(log, "Found part {}", name);
active_parts.add(name);
name_to_disk[name] = disk;
}
for (const auto & part_info : detached_parts)
{
LOG_DEBUG(log, "Found part {}", part_info.dir_name);
active_parts.add(part_info.dir_name);
}
LOG_DEBUG(log, "{} of them are active", active_parts.size());
/// Inactive parts are renamed so they can not be attached in case of repeated ATTACH.
for (const auto & [name, disk] : name_to_disk)
for (const auto & part_info : detached_parts)
{
const String containing_part = active_parts.getContainingPart(name);
const String containing_part = active_parts.getContainingPart(part_info.dir_name);
if (!containing_part.empty() && containing_part != name)
// TODO maybe use PartsTemporaryRename here?
disk->moveDirectory(fs::path(relative_data_path) / source_dir / name,
fs::path(relative_data_path) / source_dir / ("inactive_" + name));
if (!containing_part.empty() && containing_part != part_info.dir_name)
part_info.disk->moveDirectory(fs::path(relative_data_path) / source_dir / part_info.dir_name,
fs::path(relative_data_path) / source_dir / ("inactive_" + part_info.dir_name));
else
renamed_parts.addPart(name, "attaching_" + name);
renamed_parts.addPart(part_info.dir_name, "attaching_" + part_info.dir_name, part_info.disk);
}
}
@ -3965,11 +3936,11 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
MutableDataPartsVector loaded_parts;
loaded_parts.reserve(renamed_parts.old_and_new_names.size());
for (const auto & [old_name, new_name] : renamed_parts.old_and_new_names)
for (const auto & [old_name, new_name, disk] : renamed_parts.old_and_new_names)
{
LOG_DEBUG(log, "Checking part {}", new_name);
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + old_name, name_to_disk[old_name]);
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + old_name, disk);
MutableDataPartPtr part = createPart(old_name, single_disk_volume, source_dir + new_name);
loadPartAndFixMetadataImpl(part);
@ -5075,27 +5046,26 @@ String MergeTreeData::getFullPathOnDisk(const DiskPtr & disk) const
}
DiskPtr MergeTreeData::getDiskForPart(const String & part_name, const String & additional_path) const
DiskPtr MergeTreeData::tryGetDiskForDetachedPart(const String & part_name) const
{
String additional_path = "detached/";
const auto disks = getStoragePolicy()->getDisks();
for (const DiskPtr & disk : disks)
for (auto it = disk->iterateDirectory(relative_data_path + additional_path); it->isValid(); it->next())
if (it->name() == part_name)
return disk;
if (disk->exists(relative_data_path + additional_path + part_name))
return disk;
return nullptr;
}
std::optional<String> MergeTreeData::getFullRelativePathForPart(const String & part_name, const String & additional_path) const
DiskPtr MergeTreeData::getDiskForDetachedPart(const String & part_name) const
{
auto disk = getDiskForPart(part_name, additional_path);
if (disk)
return relative_data_path + additional_path;
return {};
if (auto disk = tryGetDiskForDetachedPart(part_name))
return disk;
throw DB::Exception(ErrorCodes::BAD_DATA_PART_NAME, "Detached part \"{}\" not found", part_name);
}
Strings MergeTreeData::getDataPaths() const
{
Strings res;
@ -5105,14 +5075,6 @@ Strings MergeTreeData::getDataPaths() const
return res;
}
MergeTreeData::PathsWithDisks MergeTreeData::getRelativeDataPathsWithDisks() const
{
PathsWithDisks res;
auto disks = getStoragePolicy()->getDisks();
for (const auto & disk : disks)
res.emplace_back(relative_data_path, disk);
return res;
}
MergeTreeData::MatcherFn MergeTreeData::getPartitionMatcher(const ASTPtr & partition_ast, ContextPtr local_context) const
{

View File

@ -288,7 +288,8 @@ public:
{
}
void addPart(const String & old_name, const String & new_name);
/// Adds part to rename. Both names are relative to relative_data_path.
void addPart(const String & old_name, const String & new_name, const DiskPtr & disk);
/// Renames part from old_name to new_name
void tryRenameAll();
@ -296,10 +297,17 @@ public:
/// Renames all added parts from new_name to old_name if old name is not empty
~PartsTemporaryRename();
/// One scheduled rename of a part directory inside source_dir:
/// tryRenameAll() moves old_name to new_name on `disk`, and the destructor
/// moves it back unless old_name has been cleared (an empty old_name is skipped there).
struct RenameInfo
{
/// Directory name before the rename; callers clear it once the part no longer needs to be rolled back.
String old_name;
/// Directory name after the rename.
String new_name;
/// Disk cannot be changed
DiskPtr disk;
};
const MergeTreeData & storage;
const String source_dir;
std::vector<std::pair<String, String>> old_and_new_names;
std::unordered_map<String, PathWithDisk> old_part_name_to_path_and_disk;
std::vector<RenameInfo> old_and_new_names;
bool renamed = false;
};
@ -437,7 +445,7 @@ public:
/// Returns all detached parts
DetachedPartsInfo getDetachedParts() const;
void validateDetachedPartName(const String & name) const;
static void validateDetachedPartName(const String & name);
void dropDetached(const ASTPtr & partition, bool part, ContextPtr context);
@ -711,20 +719,14 @@ public:
/// Get table path on disk
String getFullPathOnDisk(const DiskPtr & disk) const;
/// Get disk where part is located.
/// `additional_path` can be set if part is not located directly in table data path (e.g. 'detached/')
DiskPtr getDiskForPart(const String & part_name, const String & additional_path = "") const;
/// Get full path for part. Uses getDiskForPart and returns the full relative path.
/// `additional_path` can be set if part is not located directly in table data path (e.g. 'detached/')
std::optional<String> getFullRelativePathForPart(const String & part_name, const String & additional_path = "") const;
/// Looks for detached part on all disks,
/// returns pointer to the disk where part is found or nullptr (the second function throws an exception)
DiskPtr tryGetDiskForDetachedPart(const String & part_name) const;
DiskPtr getDiskForDetachedPart(const String & part_name) const;
bool storesDataOnDisk() const override { return true; }
Strings getDataPaths() const override;
using PathsWithDisks = std::vector<PathWithDisk>;
PathsWithDisks getRelativeDataPathsWithDisks() const;
/// Reserves space at least 1MB.
ReservationPtr reserveSpace(UInt64 expected_size) const;

View File

@ -236,10 +236,10 @@ String MergeTreePartInfo::getPartNameV0(DayNum left_date, DayNum right_date) con
}
DetachedPartInfo DetachedPartInfo::parseDetachedPartName(
std::string_view dir_name, MergeTreeDataFormatVersion format_version)
const DiskPtr & disk, std::string_view dir_name, MergeTreeDataFormatVersion format_version)
{
DetachedPartInfo part_info;
part_info.disk = disk;
part_info.dir_name = dir_name;
/// First, try to find known prefix and parse dir_name as <prefix>_<part_name>.
@ -298,6 +298,7 @@ DetachedPartInfo DetachedPartInfo::parseDetachedPartName(
else
part_info.valid_name = false;
// TODO what if name contains "_tryN" suffix?
return part_info;
}

View File

@ -118,6 +118,9 @@ struct MergeTreePartInfo
static constexpr UInt32 LEGACY_MAX_LEVEL = std::numeric_limits<decltype(level)>::max();
};
class IDisk;
using DiskPtr = std::shared_ptr<IDisk>;
/// Information about detached part, which includes its prefix in
/// addition to the above fields.
struct DetachedPartInfo : public MergeTreePartInfo
@ -125,7 +128,7 @@ struct DetachedPartInfo : public MergeTreePartInfo
String dir_name;
String prefix;
String disk;
DiskPtr disk;
/// If false, MergeTreePartInfo is in invalid state (directory name was not successfully parsed).
bool valid_name;
@ -148,7 +151,7 @@ struct DetachedPartInfo : public MergeTreePartInfo
// This method has different semantics with MergeTreePartInfo::tryParsePartName.
// Detached parts are always parsed regardless of their validity.
// DetachedPartInfo::valid_name field specifies whether parsing was successful or not.
static DetachedPartInfo parseDetachedPartName(std::string_view dir_name, MergeTreeDataFormatVersion format_version);
static DetachedPartInfo parseDetachedPartName(const DiskPtr & disk, std::string_view dir_name, MergeTreeDataFormatVersion format_version);
private:
void addParsedPartInfo(const MergeTreePartInfo& part);

View File

@ -643,13 +643,13 @@ void StorageMergeTree::loadDeduplicationLog()
void StorageMergeTree::loadMutations()
{
for (const auto & [path, disk] : getRelativeDataPathsWithDisks())
for (const auto & disk : getDisks())
{
for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
for (auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next())
{
if (startsWith(it->name(), "mutation_"))
{
MergeTreeMutationEntry entry(disk, path, it->name());
MergeTreeMutationEntry entry(disk, relative_data_path, it->name());
UInt64 block_number = entry.block_number;
LOG_DEBUG(log, "Loading mutation: {} entry, commands size: {}", it->name(), entry.commands.size());
auto inserted = current_mutations_by_version.try_emplace(block_number, std::move(entry)).second;
@ -1377,10 +1377,10 @@ PartitionCommandsResultInfo StorageMergeTree::attachPartition(
for (size_t i = 0; i < loaded_parts.size(); ++i)
{
LOG_INFO(log, "Attaching part {} from {}", loaded_parts[i]->name, renamed_parts.old_and_new_names[i].second);
String old_name = renamed_parts.old_and_new_names[i].first;
LOG_INFO(log, "Attaching part {} from {}", loaded_parts[i]->name, renamed_parts.old_and_new_names[i].new_name);
String old_name = renamed_parts.old_and_new_names[i].old_name;
renameTempPartAndAdd(loaded_parts[i], &increment);
renamed_parts.old_and_new_names[i].first.clear();
renamed_parts.old_and_new_names[i].old_name.clear();
results.push_back(PartitionCommandResultInfo{
.partition_id = loaded_parts[i]->info.partition_id,

View File

@ -5027,7 +5027,7 @@ PartitionCommandsResultInfo StorageReplicatedMergeTree::attachPartition(
output.writeExistingPart(loaded_parts[i]);
renamed_parts.old_and_new_names[i].first.clear();
renamed_parts.old_and_new_names[i].old_name.clear();
LOG_DEBUG(log, "Attached part {} as {}", old_name, loaded_parts[i]->name);

View File

@ -54,7 +54,7 @@ Pipe StorageSystemDetachedParts::read(
new_columns[i++]->insert(info.table);
new_columns[i++]->insert(p.valid_name ? p.partition_id : Field());
new_columns[i++]->insert(p.dir_name);
new_columns[i++]->insert(p.disk);
new_columns[i++]->insert(p.disk->getName());
new_columns[i++]->insert(p.valid_name ? p.prefix : Field());
new_columns[i++]->insert(p.valid_name ? p.min_block : Field());
new_columns[i++]->insert(p.valid_name ? p.max_block : Field());

View File

@ -0,0 +1,48 @@
#!/usr/bin/env bash
# Tags: race
# Race test: runs concurrent INSERT, DETACH/ATTACH PARTITION and
# DROP DETACHED PARTITION queries against a single MergeTree table.
# Resolve the directory this script lives in, so the shared config can be sourced by path.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# NOTE(review): CLICKHOUSE_CLIENT is presumably defined by shell_config.sh -- confirm.
# The table has no PARTITION BY clause; the queries below address its data as partition id 'all'.
$CLICKHOUSE_CLIENT -q "create table mt (n int) engine=MergeTree order by n"
# Seed the table with three separate single-row inserts.
$CLICKHOUSE_CLIENT -q "insert into mt values (1)"
$CLICKHOUSE_CLIENT -q "insert into mt values (2)"
$CLICKHOUSE_CLIENT -q "insert into mt values (3)"
# Worker: keeps inserting one random-valued row into mt.
# Never returns on its own; the driver stops it with `timeout`.
thread_insert() {
    while :
    do
        $CLICKHOUSE_CLIENT -q "insert into mt values (rand())"
    done
}
# Worker: alternately detaches and re-attaches partition 'all' of mt.
# Never returns on its own; the driver stops it with `timeout`.
thread_detach_attach() {
    while :
    do
        $CLICKHOUSE_CLIENT -q "alter table mt detach partition id 'all'"
        $CLICKHOUSE_CLIENT -q "alter table mt attach partition id 'all'"
    done
}
# Worker: keeps dropping the detached parts of partition 'all' of mt.
# The query is run with the --allow_drop_detached client option.
# Never returns on its own; the driver stops it with `timeout`.
thread_drop_detached() {
    while :
    do
        $CLICKHOUSE_CLIENT --allow_drop_detached -q "alter table mt drop detached partition id 'all'"
    done
}
# Export the workers so the child `bash -c` shells started below can call them.
export -f thread_insert thread_detach_attach thread_drop_detached

# Every worker loops forever, so each one is bounded by `timeout`.
# stderr is discarded for the detach/attach and drop-detached workers only.
TIMEOUT=10
timeout "$TIMEOUT" bash -c thread_insert &
timeout "$TIMEOUT" bash -c thread_detach_attach 2> /dev/null &
timeout "$TIMEOUT" bash -c thread_detach_attach 2> /dev/null &
timeout "$TIMEOUT" bash -c thread_drop_detached 2> /dev/null &

# Block until all background workers have exited, then clean up the table.
wait
$CLICKHOUSE_CLIENT -q "drop table mt"