drop detached partition

This commit is contained in:
Alexander Tokmakov 2019-07-31 17:44:55 +03:00
parent c6717e0d3f
commit f0836553d4
10 changed files with 103 additions and 42 deletions

View File

@ -1720,12 +1720,35 @@ MergeTreeData::AlterDataPartTransaction::~AlterDataPartTransaction()
void MergeTreeData::PartsTemporaryRename::addPart(const String & old_name, const String & new_name)
{
Poco::File(base_dir + old_name).renameTo(base_dir + new_name);
old_and_new_names.push_back({old_name, new_name});
}
void MergeTreeData::PartsTemporaryRename::tryRenameAll()
{
renamed = true;
for (size_t i = 0; i < old_and_new_names.size(); ++i)
{
try
{
const auto & names = old_and_new_names[i];
if (names.first.empty() || names.second.empty())
throw DB::Exception("Empty part name. Most likely it's a bug.", ErrorCodes::INCORRECT_FILE_NAME);
Poco::File(base_dir + names.first).renameTo(base_dir + names.second);
}
catch (...)
{
old_and_new_names.resize(i);
LOG_WARNING(storage.log, "Cannot rename parts to perform operation on them: " << getCurrentExceptionMessage(false));
throw;
}
}
}
MergeTreeData::PartsTemporaryRename::~PartsTemporaryRename()
{
// TODO what if server had crashed before this destructor was called?
if (!renamed)
return;
for (const auto & names : old_and_new_names)
{
if (names.first.empty())
@ -2621,46 +2644,60 @@ void MergeTreeData::validateDetachedPartName(const String & name) const
Poco::File detached_part_dir(full_path + "detached/" + name);
if (!detached_part_dir.exists())
throw DB::Exception("Detached part \"" + name + "\" not found" , ErrorCodes::BAD_DATA_PART_NAME);
if (startsWith(name, "attaching_") || startsWith(name, "deleting_"))
throw DB::Exception("Cannot drop part " + name + ": "
"most likely it is used by another DROP or ATTACH query.",
ErrorCodes::BAD_DATA_PART_NAME);
}
void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, const Context &)
void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, const Context & context)
{
if (!part) // TODO
throw DB::Exception("DROP DETACHED PARTITION is not implemented, use DROP DETACHED PART", ErrorCodes::NOT_IMPLEMENTED);
PartsTemporaryRename renamed_parts(*this, full_path + "detached/");
String part_id = partition->as<ASTLiteral &>().value.safeGet<String>();
validateDetachedPartName(part_id);
if (startsWith(part_id, "attaching_") || startsWith(part_id, "deleting_"))
throw DB::Exception("Cannot drop part " + part_id + ": "
"most likely it is used by another DROP or ATTACH query.", ErrorCodes::BAD_DATA_PART_NAME);
if (part)
{
String part_name = partition->as<ASTLiteral &>().value.safeGet<String>();
validateDetachedPartName(part_name);
renamed_parts.addPart(part_name, "deleting_" + part_name);
}
else
{
String partition_id = getPartitionIDFromQuery(partition, context);
DetachedPartsInfo detached_parts = getDetachedParts();
for (const auto & part_info : detached_parts)
if (part_info.valid_name && part_info.partition_id == partition_id
&& part_info.prefix != "attaching" && part_info.prefix != "deleting")
renamed_parts.addPart(part_info.dir_name, "deleting_" + part_info.dir_name);
}
PartsTemporaryRename renamed_parts(full_path + "detached/");
renamed_parts.addPart(part_id, "deleting_" + part_id);
Poco::File(renamed_parts.base_dir + renamed_parts.old_and_new_names.front().second).remove(true);
renamed_parts.old_and_new_names.front().first.clear();
LOG_DEBUG(log, "Will drop " << renamed_parts.old_and_new_names.size() << " detached parts.");
renamed_parts.tryRenameAll();
for (auto & names : renamed_parts.old_and_new_names)
{
Poco::File(renamed_parts.base_dir + names.second).remove(true);
LOG_DEBUG(log, "Dropped detached part " << names.first);
names.first.clear();
}
}
MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const ASTPtr & partition, bool attach_part,
const Context & context, PartsTemporaryRename & renamed_parts)
{
String partition_id;
if (attach_part)
partition_id = partition->as<ASTLiteral &>().value.safeGet<String>();
else
partition_id = getPartitionIDFromQuery(partition, context);
String source_dir = "detached/";
/// Let's compose a list of parts that should be added.
Strings parts;
if (attach_part)
{
validateDetachedPartName(partition_id);
parts.push_back(partition_id);
String part_id = partition->as<ASTLiteral &>().value.safeGet<String>();
validateDetachedPartName(part_id);
renamed_parts.addPart(part_id, "attaching_" + part_id);
}
else
{
String partition_id = getPartitionIDFromQuery(partition, context);
LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir);
ActiveDataPartSet active_parts(format_version);
@ -2670,6 +2707,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
String name = it.name();
MergeTreePartInfo part_info;
// TODO what if name contains "_tryN" suffix?
/// Parts with prefix in name (e.g. attaching_1_3_3_0, deleting_1_3_3_0) will be ignored
if (!MergeTreePartInfo::tryParsePartName(name, &part_info, format_version))
continue;
if (part_info.partition_id != partition_id)
@ -2679,26 +2717,26 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
part_names.insert(name);
}
LOG_DEBUG(log, active_parts.size() << " of them are active");
parts = active_parts.getParts();
/// Inactive parts rename so they can not be attached in case of repeated ATTACH.
for (const auto & name : part_names)
{
// TODO maybe use PartsTemporaryRename here?
String containing_part = active_parts.getContainingPart(name);
if (!containing_part.empty() && containing_part != name)
// TODO maybe use PartsTemporaryRename here?
Poco::File(full_path + source_dir + name).renameTo(full_path + source_dir + "inactive_" + name);
else
renamed_parts.addPart(name, "attaching_" + name);
}
}
/// Try to rename all parts before attaching to prevent race with DROP DETACHED and another ATTACH.
for (const auto & source_part_name : parts)
renamed_parts.addPart(source_part_name, "attaching_" + source_part_name);
renamed_parts.tryRenameAll();
/// Synchronously check that added parts exist and are not broken. We will write checksums.txt if it does not exist.
LOG_DEBUG(log, "Checking parts");
MutableDataPartsVector loaded_parts;
loaded_parts.reserve(parts.size());
loaded_parts.reserve(renamed_parts.old_and_new_names.size());
for (const auto & part_names : renamed_parts.old_and_new_names)
{
LOG_DEBUG(log, "Checking part " << part_names.second);

View File

@ -251,16 +251,20 @@ public:
struct PartsTemporaryRename : private boost::noncopyable
{
PartsTemporaryRename(const String & base_dir_) : base_dir(base_dir_) {}
PartsTemporaryRename(const MergeTreeData & storage_, const String & base_dir_) : storage(storage_), base_dir(base_dir_) {}
void addPart(const String & old_name, const String & new_name);
/// Renames part from old_name to new_name
void addPart(const String & old_name, const String & new_name);
void tryRenameAll();
/// Renames all added parts from new_name to old_name if old name is not empty
~PartsTemporaryRename();
const MergeTreeData & storage;
String base_dir;
std::vector<std::pair<String, String>> old_and_new_names;
bool renamed = false;
};
/// Parameters for various modes.
@ -401,7 +405,7 @@ public:
DataPartsVector getAllDataPartsVector(DataPartStateVector * out_states = nullptr) const;
/// Returns all detached parts
std::vector<DetachedPartInfo> getDetachedParts() const;
DetachedPartsInfo getDetachedParts() const;
void validateDetachedPartName(const String & name) const;

View File

@ -194,7 +194,7 @@ bool DetachedPartInfo::tryParseDetachedPartName(const String & dir_name, Detache
part_info.dir_name = dir_name;
/// First, try to parse as <part_name>.
// TODO what if tryParsePartName will parse prefix as partition_id?
// TODO what if tryParsePartName will parse prefix as partition_id? It can happen if dir_name doesn't contain mutation number at the end
if (MergeTreePartInfo::tryParsePartName(dir_name, &part_info, format_version))
return part_info.valid_name = true;

View File

@ -92,7 +92,6 @@ struct MergeTreePartInfo
/// addition to the above fields.
struct DetachedPartInfo : public MergeTreePartInfo
{
/// Suddenly, name of detached part may contain suffix (such as _tryN), which is ignored by MergeTreePartInfo::tryParsePartName(...)
String dir_name;
String prefix;
@ -102,4 +101,6 @@ struct DetachedPartInfo : public MergeTreePartInfo
static bool tryParseDetachedPartName(const String & dir_name, DetachedPartInfo & part_info, MergeTreeDataFormatVersion format_version);
};
using DetachedPartsInfo = std::vector<DetachedPartInfo>;
}

View File

@ -25,9 +25,6 @@ std::optional<PartitionCommand> PartitionCommand::parse(const ASTAlterCommand *
}
else if (command_ast->type == ASTAlterCommand::DROP_DETACHED_PARTITION)
{
if (!command_ast->part) // TODO
throw DB::Exception("Not implemented yet", ErrorCodes::NOT_IMPLEMENTED);
PartitionCommand res;
res.type = DROP_DETACHED_PARTITION;
res.partition = command_ast->partition;

View File

@ -1018,7 +1018,7 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par
{
// TODO: should get some locks to prevent race with 'alter … modify column'
PartsTemporaryRename renamed_parts(full_path + "detached/");
PartsTemporaryRename renamed_parts(*this, full_path + "detached/");
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, context, renamed_parts);
for (size_t i = 0; i < loaded_parts.size(); ++i)

View File

@ -3545,7 +3545,7 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, bool
assertNotReadonly();
PartsTemporaryRename renamed_parts(full_path + "detached/");
PartsTemporaryRename renamed_parts(*this, full_path + "detached/");
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);
ReplicatedMergeTreeBlockOutputStream output(*this, 0, 0, 0, false); /// TODO Allow to use quorum here.

View File

@ -28,10 +28,10 @@ $CLICKHOUSE_CLIENT --query="INSERT INTO attach_partitions SELECT number FROM sys
$CLICKHOUSE_CLIENT --query="ALTER TABLE attach_partitions DETACH PARTITION 0";
sudo -n mkdir --mode=777 $ch_dir/data/$cur_db/attach_partitions/detached/0_5_5_0/ 2>/dev/null || \
mkdir --mode=777 $ch_dir/data/$cur_db/attach_partitions/detached/0_5_5_0/ # broken part
sudo -n cp -r $ch_dir/data/$cur_db/attach_partitions/detached/0_1_1_0/ $ch_dir/data/$cur_db/attach_partitions/detached/attaching_0_6_6_0/ 2>/dev/null || \
cp -r $ch_dir/data/$cur_db/attach_partitions/detached/0_1_1_0/ $ch_dir/data/$cur_db/attach_partitions/detached/attaching_0_6_6_0/
sudo -n cp -r $ch_dir/data/$cur_db/attach_partitions/detached/0_3_3_0/ $ch_dir/data/$cur_db/attach_partitions/detached/deleting_0_7_7_0/ 2>/dev/null || \
cp -r $ch_dir/data/$cur_db/attach_partitions/detached/0_3_3_0/ $ch_dir/data/$cur_db/attach_partitions/detached/deleting_0_7_7_0/
sudo -n cp -pr $ch_dir/data/$cur_db/attach_partitions/detached/0_1_1_0/ $ch_dir/data/$cur_db/attach_partitions/detached/attaching_0_6_6_0/ 2>/dev/null || \
cp -pr $ch_dir/data/$cur_db/attach_partitions/detached/0_1_1_0/ $ch_dir/data/$cur_db/attach_partitions/detached/attaching_0_6_6_0/
sudo -n cp -pr $ch_dir/data/$cur_db/attach_partitions/detached/0_3_3_0/ $ch_dir/data/$cur_db/attach_partitions/detached/deleting_0_7_7_0/ 2>/dev/null || \
cp -pr $ch_dir/data/$cur_db/attach_partitions/detached/0_3_3_0/ $ch_dir/data/$cur_db/attach_partitions/detached/deleting_0_7_7_0/
echo '=== check all parts before attaching ===';
$CLICKHOUSE_CLIENT --query="ALTER TABLE attach_partitions ATTACH PARTITION 0" 2>&1 | grep "No columns in part 0_5_5_0" > /dev/null && echo 'OK2';

View File

@ -1,6 +1,15 @@
=== validate part name ===
OK1
OK2
OK3
=== drop detached part ===
0_3_3_0
1_2_2_0
1_4_4_0
attaching_0_6_6_0
deleting_0_7_7_0
prefix_1_2_2_0_0
=== drop detached partition ===
0_3_3_0
attaching_0_6_6_0
deleting_0_7_7_0

View File

@ -15,19 +15,31 @@ $CLICKHOUSE_CLIENT --query="INSERT INTO drop_detached SELECT number FROM system.
$CLICKHOUSE_CLIENT --query="INSERT INTO drop_detached SELECT number FROM system.numbers WHERE number % 2 = 1 LIMIT 8";
$CLICKHOUSE_CLIENT --query="ALTER TABLE drop_detached DETACH PARTITION 0";
$CLICKHOUSE_CLIENT --query="ALTER TABLE drop_detached DETACH PARTITION 1";
sudo -n mkdir --mode=777 $ch_dir/data/$cur_db/drop_detached/detached/attaching_0_6_6_0/ 2>/dev/null || \
mkdir --mode=777 $ch_dir/data/$cur_db/drop_detached/detached/attaching_0_6_6_0/
sudo -n mkdir --mode=777 $ch_dir/data/$cur_db/drop_detached/detached/deleting_0_7_7_0/ 2>/dev/null || \
mkdir --mode=777 $ch_dir/data/$cur_db/drop_detached/detached/deleting_0_7_7_0/
sudo -n mkdir --mode=777 $ch_dir/data/$cur_db/drop_detached/detached/any_other_name/ 2>/dev/null || \
mkdir --mode=777 $ch_dir/data/$cur_db/drop_detached/detached/any_other_name/
sudo -n mkdir --mode=777 $ch_dir/data/$cur_db/drop_detached/detached/prefix_1_2_2_0_0/ 2>/dev/null || \
mkdir --mode=777 $ch_dir/data/$cur_db/drop_detached/detached/prefix_1_2_2_0_0/
#sudo -n mkdir --mode=777 $ch_dir/data/$cur_db/drop_detached/detached/prefix_1_2_2_0/ 2>/dev/null || \
# mkdir --mode=777 $ch_dir/data/$cur_db/drop_detached/detached/prefix_1_2_2_0/
echo '=== validate part name ==='
$CLICKHOUSE_CLIENT --allow_drop_detached=1 --query="ALTER TABLE drop_detached DROP DETACHED PART '../1_2_2_0'" 2>&1 | grep "Invalid part name" > /dev/null && echo 'OK1'
$CLICKHOUSE_CLIENT --allow_drop_detached=1 --query="ALTER TABLE drop_detached DROP DETACHED PART '0_1_1_0'"
$CLICKHOUSE_CLIENT --allow_drop_detached=1 --query="ALTER TABLE drop_detached DROP DETACHED PART 'attaching_0_6_6_0'" 2>&1 | grep "Cannot drop part" > /dev/null && echo 'OK2'
$CLICKHOUSE_CLIENT --allow_drop_detached=1 --query="ALTER TABLE drop_detached DROP DETACHED PART 'deleting_0_7_7_0'" 2>&1 | grep "Cannot drop part" > /dev/null && echo 'OK3'
$CLICKHOUSE_CLIENT --allow_drop_detached=1 --query="ALTER TABLE drop_detached DROP DETACHED PART 'any_other_name'"
echo '=== drop detached part ==='
$CLICKHOUSE_CLIENT --query="SElECT name FROM system.detached_parts WHERE table='drop_detached' AND database='${cur_db}' ORDER BY name FORMAT TSV";
echo '=== drop detached partition ==='
$CLICKHOUSE_CLIENT --allow_drop_detached=1 --query="ALTER TABLE drop_detached DROP DETACHED PARTITION 1"
$CLICKHOUSE_CLIENT --query="SElECT name FROM system.detached_parts WHERE table='drop_detached' AND database='${cur_db}' ORDER BY name FORMAT TSV";
$CLICKHOUSE_CLIENT --query="DROP TABLE drop_detached";
$CLICKHOUSE_CLIENT --query="SYSTEM START MERGES";