This commit is contained in:
Nikolai Kochetov 2022-04-08 18:56:08 +00:00
parent bc3e1ec3f2
commit 7c4b288638
5 changed files with 75 additions and 40 deletions

View File

@ -43,6 +43,11 @@ bool DataPartStorageOnDisk::exists() const
return volume->getDisk()->exists(root_path);
}
Poco::Timestamp DataPartStorageOnDisk::getLastModified() const
{
return volume->getDisk()->getLastModified(root_path);
}
size_t DataPartStorageOnDisk::getFileSize(const String & path) const
{
return volume->getDisk()->getFileSize(fs::path(root_path) / path);
@ -80,6 +85,16 @@ UInt64 DataPartStorageOnDisk::calculateTotalSizeOnDisk() const
return calculateTotalSizeOnDiskImpl(volume->getDisk(), root_path);
}
bool DataPartStorageOnDisk::isStoredOnRemoteDisk() const
{
return volume->getDisk()->isRemote();
}
bool DataPartStorageOnDisk::supportZeroCopyReplication() const
{
return volume->getDisk()->supportZeroCopyReplication();
}
void DataPartStorageOnDisk::writeChecksums(MergeTreeDataPartChecksums & checksums) const
{
std::string path = fs::path(root_path) / "checksums.txt";
@ -104,6 +119,20 @@ void DataPartStorageOnDisk::writeColumns(NamesAndTypesList & columns) const
volume->getDisk()->moveFile(path + ".tmp", path);
}
void DataPartStorageOnDisk::writeDeleteOnDestroyMarker(Poco::Logger * log) const
{
String marker_path = fs::path(root_path) / "delete-on-destroy.txt";
auto disk = volume->getDisk();
try
{
volume->getDisk()->createFile(marker_path);
}
catch (Poco::Exception & e)
{
LOG_ERROR(log, "{} (while creating DeleteOnDestroy marker: {})", e.what(), backQuote(fullPath(disk, marker_path)));
}
}
void DataPartStorageOnDisk::rename(const String & new_relative_path, Poco::Logger * log, bool remove_new_dir_if_exists, bool fsync)
{
if (!volume->getDisk()->exists(root_path))

View File

@ -24,6 +24,8 @@ public:
bool exists(const std::string & path) const override;
bool exists() const override;
Poco::Timestamp getLastModified() const override;
size_t getFileSize(const std::string & path) const override;
DiskDirectoryIteratorPtr iterate() const override;
@ -35,9 +37,11 @@ public:
UInt64 calculateTotalSizeOnDisk() const override;
bool isStoredOnRemoteDisk() const override;
bool supportZeroCopyReplication() const override;
void writeChecksums(MergeTreeDataPartChecksums & checksums) const override;
void writeColumns(NamesAndTypesList & columns) const override;
void writeDeleteOnDestroyMarker(Poco::Logger * log) const override;
bool shallParticipateInMerges(const IStoragePolicy &) const override;

View File

@ -35,6 +35,8 @@ public:
virtual bool exists(const std::string & path) const = 0;
virtual bool exists() const = 0;
virtual Poco::Timestamp getLastModified() const = 0;
virtual DiskDirectoryIteratorPtr iterate() const = 0;
virtual DiskDirectoryIteratorPtr iterateDirectory(const std::string & path) const = 0;
@ -46,10 +48,12 @@ public:
virtual UInt64 calculateTotalSizeOnDisk() const = 0;
virtual bool isStoredOnRemoteDisk() const { return false; }
virtual bool supportZeroCopyReplication() const { return false; }
/// Should remove it later
virtual void writeChecksums(MergeTreeDataPartChecksums & checksums) const = 0;
virtual void writeColumns(NamesAndTypesList & columns) const = 0;
virtual void writeDeleteOnDestroyMarker(Poco::Logger * log) const = 0;
/// A leak of abstraction
virtual bool shallParticipateInMerges(const IStoragePolicy &) const { return true; }

View File

@ -51,6 +51,7 @@
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
#include <Storages/MergeTree/DataPartStorageOnDisk.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/MergeTree/localBackup.h>
#include <Storages/StorageMergeTree.h>
@ -1008,7 +1009,8 @@ void MergeTreeData::loadDataPartsFromDisk(
return;
const auto & part_info = *part_opt;
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, part_disk_ptr, 0);
auto part = createPart(part_name, part_info, single_disk_volume, part_name);
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(single_disk_volume, relative_data_path, part_name);
auto part = createPart(part_name, part_info, data_part_storage);
bool broken = false;
String part_path = fs::path(relative_data_path) / part_name;
@ -1016,7 +1018,7 @@ void MergeTreeData::loadDataPartsFromDisk(
if (part_disk_ptr->exists(marker_path))
{
/// NOTE: getBytesOnDisk() cannot be used here, since it maybe zero of checksums.txt will not exist
size_t size_of_part = IMergeTreeDataPart::calculateTotalSizeOnDisk(part->volume->getDisk(), part->getFullRelativePath());
size_t size_of_part = data_part_storage->calculateTotalSizeOnDisk();
LOG_WARNING(log,
"Detaching stale part {}{} (size: {}), which should have been deleted after a move. "
"That can only happen after unclean restart of ClickHouse after move of a part having an operation blocking that stale copy of part.",
@ -1053,7 +1055,7 @@ void MergeTreeData::loadDataPartsFromDisk(
if (broken)
{
/// NOTE: getBytesOnDisk() cannot be used here, since it maybe zero of checksums.txt will not exist
size_t size_of_part = IMergeTreeDataPart::calculateTotalSizeOnDisk(part->volume->getDisk(), part->getFullRelativePath());
size_t size_of_part = data_part_storage->calculateTotalSizeOnDisk();
LOG_ERROR(log,
"Detaching broken part {}{} (size: {}). "
@ -1082,7 +1084,7 @@ void MergeTreeData::loadDataPartsFromDisk(
{
if ((*it)->checksums.getTotalChecksumHex() == part->checksums.getTotalChecksumHex())
{
LOG_ERROR(log, "Remove duplicate part {}", part->getFullPath());
LOG_ERROR(log, "Remove duplicate part {}", data_part_storage->getFullPath());
duplicate_parts_to_remove.push_back(part);
}
else
@ -1182,7 +1184,7 @@ void MergeTreeData::loadDataPartsFromWAL(
{
if ((*it)->checksums.getTotalChecksumHex() == part->checksums.getTotalChecksumHex())
{
LOG_ERROR(log, "Remove duplicate part {}", part->getFullPath());
LOG_ERROR(log, "Remove duplicate part {}", part->data_part_storage->getFullPath());
duplicate_parts_to_remove.push_back(part);
}
else
@ -1584,7 +1586,7 @@ void MergeTreeData::flushAllInMemoryPartsIfNeeded()
if (auto part_in_memory = asInMemoryPart(part))
{
const auto & storage_relative_path = part_in_memory->storage.relative_data_path;
part_in_memory->flushToDisk(storage_relative_path, part_in_memory->relative_path, metadata_snapshot);
part_in_memory->flushToDisk(storage_relative_path, part_in_memory->data_part_storage->getRelativePath(), metadata_snapshot);
}
}
@ -2273,7 +2275,7 @@ MergeTreeDataPartType MergeTreeData::choosePartTypeOnDisk(size_t bytes_uncompres
MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name,
MergeTreeDataPartType type, const MergeTreePartInfo & part_info,
const DataPartStoragePtr & data_part_storage, const String & relative_path, const IMergeTreeDataPart * parent_part) const
const DataPartStoragePtr & data_part_storage, const IMergeTreeDataPart * parent_part) const
{
if (type == MergeTreeDataPartType::COMPACT)
return std::make_shared<MergeTreeDataPartCompact>(*this, name, part_info, data_part_storage, parent_part);
@ -2282,7 +2284,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name,
else if (type == MergeTreeDataPartType::IN_MEMORY)
return std::make_shared<MergeTreeDataPartInMemory>(*this, name, part_info, data_part_storage, parent_part);
else
throw Exception("Unknown type of part " + relative_path, ErrorCodes::UNKNOWN_PART_TYPE);
throw Exception("Unknown type of part " + data_part_storage->getFullRelativePath(), ErrorCodes::UNKNOWN_PART_TYPE);
}
static MergeTreeDataPartType getPartTypeFromMarkExtension(const String & mrk_ext)
@ -2298,14 +2300,14 @@ static MergeTreeDataPartType getPartTypeFromMarkExtension(const String & mrk_ext
}
MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
const String & name, const DataPartStoragePtr & data_part_storage, const String & relative_path, const IMergeTreeDataPart * parent_part) const
const String & name, const DataPartStoragePtr & data_part_storage, const IMergeTreeDataPart * parent_part) const
{
return createPart(name, MergeTreePartInfo::fromPartName(name, format_version), data_part_storage, relative_path, parent_part);
return createPart(name, MergeTreePartInfo::fromPartName(name, format_version), data_part_storage, parent_part);
}
MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
const String & name, const MergeTreePartInfo & part_info,
const DataPartStoragePtr & data_part_storage, const String & relative_path, const IMergeTreeDataPart * parent_part) const
const DataPartStoragePtr & data_part_storage, const IMergeTreeDataPart * parent_part) const
{
MergeTreeDataPartType type;
// auto full_path = fs::path(relative_data_path) / (parent_part ? parent_part->relative_path : "") / relative_path / "";
@ -2319,7 +2321,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
type = choosePartTypeOnDisk(0, 0);
}
return createPart(name, type, part_info, data_part_storage, relative_path, parent_part);
return createPart(name, type, part_info, data_part_storage, parent_part);
}
void MergeTreeData::changeSettings(
@ -2564,7 +2566,7 @@ bool MergeTreeData::renameTempPartAndReplace(
else /// Parts from ReplicatedMergeTree already have names
part_name = part->name;
LOG_TRACE(log, "Renaming temporary part {} to {}.", part->relative_path, part_name);
LOG_TRACE(log, "Renaming temporary part {} to {}.", part->data_part_storage->getRelativePath(), part_name);
if (auto it_duplicate = data_parts_by_info.find(part_info); it_duplicate != data_parts_by_info.end())
{
@ -2827,9 +2829,9 @@ MergeTreeData::DataPartsVector MergeTreeData::removePartsInRangeFromWorkingSet(c
void MergeTreeData::forgetPartAndMoveToDetached(const MergeTreeData::DataPartPtr & part_to_detach, const String & prefix, bool restore_covered)
{
if (prefix.empty())
LOG_INFO(log, "Renaming {} to {} and forgetting it.", part_to_detach->relative_path, part_to_detach->name);
LOG_INFO(log, "Renaming {} to {} and forgetting it.", part_to_detach->data_part_storage->getRelativePath(), part_to_detach->name);
else
LOG_INFO(log, "Renaming {} to {}_{} and forgetting it.", part_to_detach->relative_path, prefix, part_to_detach->name);
LOG_INFO(log, "Renaming {} to {}_{} and forgetting it.", part_to_detach->data_part_storage->getRelativePath(), prefix, part_to_detach->name);
auto lock = lockParts();
bool removed_active_part = false;
@ -3191,8 +3193,8 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
original_active_part->force_keep_shared_data = false;
if (original_active_part->volume->getDisk()->supportZeroCopyReplication() &&
part_copy->volume->getDisk()->supportZeroCopyReplication() &&
if (original_active_part->data_part_storage->supportZeroCopyReplication() &&
part_copy->data_part_storage->supportZeroCopyReplication() &&
original_active_part->getUniqueId() == part_copy->getUniqueId())
{
/// May be when several volumes use the same S3/HDFS storage
@ -3209,16 +3211,7 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
ssize_t diff_rows = part_copy->rows_count - original_active_part->rows_count;
increaseDataVolume(diff_bytes, diff_rows, /* parts= */ 0);
auto disk = original_active_part->volume->getDisk();
String marker_path = fs::path(original_active_part->getFullRelativePath()) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME;
try
{
disk->createFile(marker_path);
}
catch (Poco::Exception & e)
{
LOG_ERROR(log, "{} (while creating DeleteOnDestroy marker: {})", e.what(), backQuote(fullPath(disk, marker_path)));
}
original_active_part->data_part_storage->writeDeleteOnDestroyMarker(log);
return;
}
}
@ -3287,11 +3280,8 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na
static void loadPartAndFixMetadataImpl(MergeTreeData::MutableDataPartPtr part)
{
auto disk = part->volume->getDisk();
String full_part_path = part->getFullRelativePath();
part->loadColumnsChecksumsIndexes(false, true);
part->modification_time = disk->getLastModified(full_part_path).epochTime();
part->modification_time = part->data_part_storage->getLastModified().epochTime();
}
void MergeTreeData::calculateColumnAndSecondaryIndexSizesImpl()
@ -3443,7 +3433,7 @@ void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String &
auto disk = getStoragePolicy()->getDiskByName(name);
parts.erase(std::remove_if(parts.begin(), parts.end(), [&](auto part_ptr)
{
return part_ptr->volume->getDisk()->getName() == disk->getName();
return part_ptr->data_part_storage->getName() == disk->getName();
}), parts.end());
if (parts.empty())
@ -3493,7 +3483,7 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
{
for (const auto & disk : volume->getDisks())
{
if (part_ptr->volume->getDisk()->getName() == disk->getName())
if (part_ptr->data_part_storage->getName() == disk->getName())
{
return true;
}
@ -3798,7 +3788,8 @@ private:
}
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
auto part = storage->createPart(part_name, part_info, single_disk_volume, relative_temp_part_dir);
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(single_disk_volume, relative_data_path, relative_temp_part_dir);
auto part = storage->createPart(part_name, part_info, data_part_storage);
part->loadColumnsChecksumsIndexes(false, true);
storage->renameTempPartAndAdd(part, increment);
return {};
@ -4173,7 +4164,8 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
LOG_DEBUG(log, "Checking part {}", new_name);
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + old_name, disk);
MutableDataPartPtr part = createPart(old_name, single_disk_volume, source_dir + new_name);
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(single_disk_volume, relative_data_path, source_dir + new_name);
MutableDataPartPtr part = createPart(old_name, data_part_storage);
loadPartAndFixMetadataImpl(part);
loaded_parts.push_back(part);
@ -4392,7 +4384,7 @@ void MergeTreeData::Transaction::rollbackPartsToTemporaryState()
WriteBufferFromOwnString buf;
buf << " Rollbacking parts state to temporary and removing from working set:";
for (const auto & part : precommitted_parts)
buf << " " << part->relative_path;
buf << " " << part->data_part_storage->getRelativePath();
buf << ".";
LOG_DEBUG(data.log, "Undoing transaction.{}", buf.str());
@ -4410,7 +4402,7 @@ void MergeTreeData::Transaction::rollback()
WriteBufferFromOwnString buf;
buf << " Removing parts:";
for (const auto & part : precommitted_parts)
buf << " " << part->relative_path;
buf << " " << part->data_part_storage->getRelativePath();
buf << ".";
LOG_DEBUG(data.log, "Undoing transaction.{}", buf.str());
@ -5361,7 +5353,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
disk->removeFileIfExists(fs::path(dst_part_path) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME);
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
auto dst_data_part = createPart(dst_part_name, dst_part_info, single_disk_volume, tmp_dst_part_name);
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(single_disk_volume, relative_data_path, tmp_dst_part_name);
auto dst_data_part = createPart(dst_part_name, dst_part_info, data_part_storage);
dst_data_part->is_temp = true;

View File

@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/DataPartStorageOnDisk.h>
#include <Columns/ColumnConst.h>
#include <Common/HashTable/HashMap.h>
#include <Common/Exception.h>
@ -367,12 +368,16 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart(
ReservationPtr reservation = data.reserveSpacePreferringTTLRules(metadata_snapshot, expected_size, move_ttl_infos, time(nullptr), 0, true);
VolumePtr volume = data.getStoragePolicy()->getVolume(0);
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(
createVolumeFromReservation(reservation, volume),
data.relative_data_path,
TMP_PREFIX + part_name);
auto new_data_part = data.createPart(
part_name,
data.choosePartType(expected_size, block.rows()),
new_part_info,
createVolumeFromReservation(reservation, volume),
TMP_PREFIX + part_name);
data_part_storage);
if (data.storage_settings.get()->assign_part_uuids)
new_data_part->uuid = UUIDHelpers::generateV4();