Replaced the iteration algorithm in part finder

Now, instead of iterating through the directories, we iterate though
directories of on of the table disks (which doesn't give us a
substantial boost but is a bit neater to read).

- Updated the system.replication_queue command types.
- Fixed the part ptr being empty (added the checksum loading and
initialization).
- Removed extra logging.
This commit is contained in:
Mike Kot 2021-03-17 00:59:12 +03:00
parent 2ccdb7ef5c
commit 831b90f272
5 changed files with 48 additions and 48 deletions

View File

@ -14,7 +14,17 @@ Columns:
- `node_name` ([String](../../sql-reference/data-types/string.md)) — Node name in ZooKeeper.
- `type` ([String](../../sql-reference/data-types/string.md)) — Type of the task in the queue: `GET_PARTS`, `MERGE_PARTS`, `DETACH_PARTS`, `DROP_PARTS`, or `MUTATE_PARTS`.
- `type` ([String](../../sql-reference/data-types/string.md)) — Type of the task in the queue, one of:
- `GET_PART` - Get the part from another replica.
- `ATTACH_PART` - Attach the part, possibly from our own replica (if found in `detached` folder).
You may think of it as a `GET_PART` with some optimisations as they're nearly identical.
- `MERGE_PARTS` - Merge the parts.
- `DROP_RANGE` - Delete the parts in the specified partition in the specified number range.
- `CLEAR_COLUMN` - NOTE: Deprecated. Drop specific column from specified partition.
- `CLEAR_INDEX` - NOTE: Deprecated. Drop specific index from specified partition.
- `REPLACE_RANGE` - Drop certain range of partitions and replace them by new ones
- `MUTATE_PART` - Apply one or several mutations to the part.
- `ALTER_METADATA` - Apply alter modification according to global /metadata and /columns paths
- `create_time` ([Datetime](../../sql-reference/data-types/datetime.md)) — Date and time when the task was submitted for execution.

View File

@ -70,9 +70,6 @@ ALTER TABLE table_name DROP DETACHED PARTITION|PART partition_expr
Removes the specified part or all parts of the specified partition from `detached`.
Read more about setting the partition expression in a section [How to specify the partition expression](#alter-how-to-specify-part-expr).
Note: the command does NOT throw an exception if the specified part does not exist,
e.g. `ALTER TABLE mt DROP DETACHED PART[ITION] 'i_do_not_exist'` will succeed.
## ATTACH PARTITION\|PART {#alter_attach-partition}
``` sql

View File

@ -3160,15 +3160,18 @@ void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, const Cont
MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const ASTPtr & partition, bool attach_part,
const Context & context, PartsTemporaryRename & renamed_parts)
{
String source_dir = "detached/";
const String source_dir = "detached/";
std::map<String, DiskPtr> name_to_disk;
/// Let's compose a list of parts that should be added.
if (attach_part)
{
String part_id = partition->as<ASTLiteral &>().value.safeGet<String>();
const String part_id = partition->as<ASTLiteral &>().value.safeGet<String>();
validateDetachedPartName(part_id);
renamed_parts.addPart(part_id, "attaching_" + part_id);
if (MergeTreePartInfo::tryParsePartName(part_id, nullptr, format_version))
name_to_disk[part_id] = getDiskForPart(part_id, source_dir);
}
@ -3179,12 +3182,14 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
ActiveDataPartSet active_parts(format_version);
const auto disks = getStoragePolicy()->getDisks();
for (const auto & disk : disks)
{
for (auto it = disk->iterateDirectory(relative_data_path + source_dir); it->isValid(); it->next())
{
const String & name = it->name();
MergeTreePartInfo part_info;
// TODO what if name contains "_tryN" suffix?
/// Parts with prefix in name (e.g. attaching_1_3_3_0, deleting_1_3_3_0) will be ignored
if (!MergeTreePartInfo::tryParsePartName(name, &part_info, format_version)
@ -3192,21 +3197,23 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
{
continue;
}
LOG_DEBUG(log, "Found part {}", name);
active_parts.add(name);
name_to_disk[name] = disk;
}
}
LOG_DEBUG(log, "{} of them are active", active_parts.size());
/// Inactive parts rename so they can not be attached in case of repeated ATTACH.
/// Inactive parts are renamed so they can not be attached in case of repeated ATTACH.
for (const auto & [name, disk] : name_to_disk)
{
String containing_part = active_parts.getContainingPart(name);
const String containing_part = active_parts.getContainingPart(name);
if (!containing_part.empty() && containing_part != name)
{
// TODO maybe use PartsTemporaryRename here?
disk->moveDirectory(relative_data_path + source_dir + name, relative_data_path + source_dir + "inactive_" + name);
}
disk->moveDirectory(relative_data_path + source_dir + name,
relative_data_path + source_dir + "inactive_" + name);
else
renamed_parts.addPart(name, "attaching_" + name);
}
@ -3221,11 +3228,13 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
MutableDataPartsVector loaded_parts;
loaded_parts.reserve(renamed_parts.old_and_new_names.size());
for (const auto & part_names : renamed_parts.old_and_new_names)
for (const auto & [old_name, new_name] : renamed_parts.old_and_new_names)
{
LOG_DEBUG(log, "Checking part {}", part_names.second);
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_names.first, name_to_disk[part_names.first], 0);
MutableDataPartPtr part = createPart(part_names.first, single_disk_volume, source_dir + part_names.second);
LOG_DEBUG(log, "Checking part {}", new_name);
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + old_name, name_to_disk[old_name]);
MutableDataPartPtr part = createPart(old_name, single_disk_volume, source_dir + new_name);
loadPartAndFixMetadataImpl(part);
loaded_parts.push_back(part);
}

View File

@ -270,9 +270,6 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
/// the MutableDataPartPtr here, we already have the data thus being able to
/// calculate the checksums.
log_entry.part_checksum = part->checksums.getTotalChecksumHex();
for (auto && [name, checksum] : part->checksums.files)
LOG_TRACE(log, "> On created file {}, file size {}", name, checksum.file_size);
}
else
log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART;

View File

@ -1357,43 +1357,33 @@ String StorageReplicatedMergeTree::getChecksumsForZooKeeper(const MergeTreeDataP
MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::attachPartHelperFoundValidPart(const LogEntry& entry) const
{
const MergeTreePartInfo target_part = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
const String& part_checksum = entry.part_checksum;
const MergeTreePartInfo actual_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
const String part_new_name = actual_part_info.getPartName();
Poco::DirectoryIterator dir_end;
LOG_TRACE(log, "Trying to attach part {} from local data", part_new_name);
for (const String& path : getDataPaths())
{
for (Poco::DirectoryIterator it{path + "detached/"}; it != dir_end; ++it)
for (const DiskPtr & disk : getStoragePolicy()->getDisks())
for (const auto it = disk->iterateDirectory(relative_data_path + "detached/"); it->isValid(); it->next())
{
MergeTreePartInfo part_iter;
MergeTreePartInfo part_info;
if (!MergeTreePartInfo::tryParsePartName(it.name(), &part_iter, format_version) ||
part_iter.partition_id != target_part.partition_id)
if (!MergeTreePartInfo::tryParsePartName(it->name(), &part_info, format_version) ||
part_info.partition_id != actual_part_info.partition_id)
continue;
const String& part_name = part_iter.getPartName();
const String part_to_path = "detached/" + part_name;
const String part_old_name = part_info.getPartName();
const String part_path = "detached/" + part_old_name;
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name,
getDiskForPart(part_name, "detached/"));
const VolumePtr volume = std::make_shared<SingleDiskVolume>("volume_" + part_old_name, disk);
MergeTreeData::MutableDataPartPtr part = createPart(part_new_name, part_info, volume, part_path);
MergeTreeData::MutableDataPartPtr iter_part_ptr =
createPart(part_name, part_iter, single_disk_volume, part_to_path);
// We don't check consistency as in that case this method will throw.
// The faster way is to load invalid data and just check the checksums -- they won't match.
part->loadColumnsChecksumsIndexes(true, false);
const String iter_part_checksums = iter_part_ptr->checksums.getTotalChecksumHex();
LOG_TRACE(log, "Candidate part: {}, path: {}, checksums: {}", part_name, part_to_path, iter_part_checksums);
for (auto && [name, checksum] : iter_part_ptr->checksums.files)
LOG_TRACE(log, "> File {}, file size {}", name, checksum.file_size);
if (part_checksum != iter_part_checksums)
continue;
return iter_part_ptr;
if (entry.part_checksum == part->checksums.getTotalChecksumHex())
return part;
}
}
return {};
}
@ -1437,8 +1427,6 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
if (entry.type == LogEntry::ATTACH_PART)
{
LOG_TRACE(log, "Trying to find part in detached/");
if (MutableDataPartPtr part = attachPartHelperFoundValidPart(entry); part)
{
LOG_TRACE(log, "Found valid part {} to attach from local data, preparing the transaction",
@ -1446,7 +1434,6 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
Transaction transaction(*this);
// don't need the replaced parts
renameTempPartAndReplace(part, nullptr, &transaction);
checkPartChecksumsAndCommit(transaction, part);