diff --git a/src/Storages/MergeTree/DataPartStorageOnDisk.cpp b/src/Storages/MergeTree/DataPartStorageOnDisk.cpp
index ea12fdfb9cf..0781afa07e5 100644
--- a/src/Storages/MergeTree/DataPartStorageOnDisk.cpp
+++ b/src/Storages/MergeTree/DataPartStorageOnDisk.cpp
@@ -95,30 +95,42 @@ size_t DataPartStorageOnDisk::getFileSize(const String & path) const
     return volume->getDisk()->getFileSize(fs::path(root_path) / part_dir / path);
 }
 
-DiskDirectoryIteratorPtr DataPartStorageOnDisk::iterate() const
+UInt32 DataPartStorageOnDisk::getRefCount(const String & path) const
 {
-    return volume->getDisk()->iterateDirectory(fs::path(root_path) / part_dir);
+    return volume->getDisk()->getRefCount(fs::path(root_path) / part_dir / path);
 }
 
-DiskDirectoryIteratorPtr DataPartStorageOnDisk::iterateDirectory(const String & path) const
+class DataPartStorageIteratorOnDisk final : public IDataPartStorageIterator
 {
-    return volume->getDisk()->iterateDirectory(fs::path(root_path) / part_dir / path);
+public:
+    DataPartStorageIteratorOnDisk(DiskPtr disk_, DiskDirectoryIteratorPtr it_)
+        : disk(std::move(disk_)), it(std::move(it_))
+    {
+    }
+
+    void next() override { it->next(); }
+    bool isValid() const override { return it->isValid(); }
+    bool isFile() const override { return isValid() && disk->isFile(it->path()); }
+    std::string name() const override { return it->name(); }
+
+private:
+    DiskPtr disk;
+    DiskDirectoryIteratorPtr it;
+};
+
+DataPartStorageIteratorPtr DataPartStorageOnDisk::iterate() const
+{
+    return std::make_unique<DataPartStorageIteratorOnDisk>(
+        volume->getDisk(),
+        volume->getDisk()->iterateDirectory(fs::path(root_path) / part_dir));
 }
 
-// namespace
-// {
-//     static constexpr std::string_view non_checksum_files[] =
-//     {
-//         "checksums.txt",
-//         "columns.txt",
-//         "default_compression_codec.txt",
-//         "delete-on-destroy.txt",
-//         "txn_version.txt",
-//     };
-
-//     static constexpr std::span projection_non_checksum_files(non_checksum_files, 4);
-//     static constexpr std::span part_non_checksum_files(non_checksum_files, 5);
-// }
+DataPartStorageIteratorPtr DataPartStorageOnDisk::iterateDirectory(const String & path) const
+{
+    return std::make_unique<DataPartStorageIteratorOnDisk>(
+        volume->getDisk(),
+        volume->getDisk()->iterateDirectory(fs::path(root_path) / part_dir / path));
+}
 
 void DataPartStorageOnDisk::remove(
     bool keep_shared_data,
@@ -244,6 +256,11 @@ DataPartStoragePtr DataPartStorageOnDisk::getProjection(const std::string & name
     return std::make_shared<DataPartStorageOnDisk>(volume, std::string(fs::path(root_path) / part_dir), name);
 }
 
+DiskPtr DataPartStorageOnDisk::getDisk() const
+{
+    return volume->getDisk();
+}
+
 static UInt64 calculateTotalSizeOnDiskImpl(const DiskPtr & disk, const String & from)
 {
     if (disk->isFile(from))
@@ -281,28 +298,62 @@ std::string DataPartStorageOnDisk::getDiskPathForLogs() const
     return volume->getDisk()->getPath();
 }
 
-void DataPartStorageOnDisk::writeChecksums(MergeTreeDataPartChecksums & checksums) const
+void DataPartStorageOnDisk::writeChecksums(const MergeTreeDataPartChecksums & checksums) const
 {
     std::string path = fs::path(root_path) / part_dir / "checksums.txt";
 
+    try
     {
-        auto out = volume->getDisk()->writeFile(path + ".tmp", 4096);
-        checksums.write(*out);
-    }
+        {
+            auto out = volume->getDisk()->writeFile(path + ".tmp", 4096);
+            checksums.write(*out);
+        }
 
-    volume->getDisk()->moveFile(path + ".tmp", path);
+        volume->getDisk()->moveFile(path + ".tmp", path);
+    }
+    catch (...)
+    {
+        try
+        {
+            if (volume->getDisk()->exists(path + ".tmp"))
+                volume->getDisk()->removeFile(path + ".tmp");
+        }
+        catch (...)
+ { + tryLogCurrentException("DataPartStorageOnDisk"); + } + + throw; + } } -void DataPartStorageOnDisk::writeColumns(NamesAndTypesList & columns) const +void DataPartStorageOnDisk::writeColumns(const NamesAndTypesList & columns) const { std::string path = fs::path(root_path) / part_dir / "columns.txt"; + try { - auto buf = volume->getDisk()->writeFile(path + ".tmp", 4096); - columns.writeText(*buf); - } + { + auto buf = volume->getDisk()->writeFile(path + ".tmp", 4096); + columns.writeText(*buf); + } - volume->getDisk()->moveFile(path + ".tmp", path); + volume->getDisk()->moveFile(path + ".tmp", path); + } + catch (...) + { + try + { + if (volume->getDisk()->exists(path + ".tmp")) + volume->getDisk()->removeFile(path + ".tmp"); + } + catch (...) + { + tryLogCurrentException("DataPartStorageOnDisk"); + } + + throw; + } } void DataPartStorageOnDisk::writeDeleteOnDestroyMarker(Poco::Logger * log) const @@ -406,6 +457,10 @@ std::string DataPartStorageOnDisk::getName() const return volume->getDisk()->getName(); } +std::string DataPartStorageOnDisk::getDiskType() const +{ + return toString(volume->getDisk()->getType()); +} void DataPartStorageOnDisk::backup( TemporaryFilesOnDisks & temp_dirs, @@ -517,6 +572,30 @@ void DataPartStorageBuilderOnDisk::removeRecursive() volume->getDisk()->removeRecursive(fs::path(root_path) / part_dir); } +void DataPartStorageBuilderOnDisk::removeSharedRecursive(bool keep_in_remote_fs) +{ + volume->getDisk()->removeSharedRecursive(fs::path(root_path) / part_dir, keep_in_remote_fs); +} + +SyncGuardPtr DataPartStorageBuilderOnDisk::getDirectorySyncGuard() const +{ + return volume->getDisk()->getDirectorySyncGuard(fs::path(root_path) / part_dir); +} + +void DataPartStorageBuilderOnDisk::createHardLinkFrom(const IDataPartStorage & source, const std::string & from, const std::string & to) const +{ + const auto * source_on_disk = typeid_cast(&source); + if (!source_on_disk) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Cannot create hardlink from different storage. 
Expected DataPartStorageOnDisk, got {}", + typeid(source).name()); + + volume->getDisk()->createHardLink( + fs::path(source_on_disk->getFullRelativePath()) / from, + fs::path(root_path) / part_dir / to); +} + bool DataPartStorageBuilderOnDisk::exists() const { return volume->getDisk()->exists(fs::path(root_path) / part_dir); @@ -533,11 +612,21 @@ std::string DataPartStorageBuilderOnDisk::getFullPath() const return fs::path(volume->getDisk()->getPath()) / root_path / part_dir; } +std::string DataPartStorageBuilderOnDisk::getFullRelativePath() const +{ + return fs::path(root_path) / part_dir; +} + void DataPartStorageBuilderOnDisk::createDirectories() { return volume->getDisk()->createDirectories(fs::path(root_path) / part_dir); } +void DataPartStorageBuilderOnDisk::createProjection(const std::string & name) +{ + return volume->getDisk()->createDirectory(fs::path(root_path) / part_dir / name); +} + ReservationPtr DataPartStorageBuilderOnDisk::reserve(UInt64 bytes) { auto res = volume->reserve(bytes); diff --git a/src/Storages/MergeTree/DataPartStorageOnDisk.h b/src/Storages/MergeTree/DataPartStorageOnDisk.h index 4f75d7bd20d..f83be366ab4 100644 --- a/src/Storages/MergeTree/DataPartStorageOnDisk.h +++ b/src/Storages/MergeTree/DataPartStorageOnDisk.h @@ -27,9 +27,10 @@ public: Poco::Timestamp getLastModified() const override; size_t getFileSize(const std::string & path) const override; + UInt32 getRefCount(const String & path) const override; - DiskDirectoryIteratorPtr iterate() const override; - DiskDirectoryIteratorPtr iterateDirectory(const std::string & path) const override; + DataPartStorageIteratorPtr iterate() const override; + DataPartStorageIteratorPtr iterateDirectory(const std::string & path) const override; void remove( bool keep_shared_data, @@ -53,8 +54,8 @@ public: bool isBroken() const override; std::string getDiskPathForLogs() const override; - void writeChecksums(MergeTreeDataPartChecksums & checksums) const override; - void writeColumns(NamesAndTypesList & columns) const override; + void writeChecksums(const MergeTreeDataPartChecksums & checksums) const override; + void writeColumns(const NamesAndTypesList & columns) const override; void writeDeleteOnDestroyMarker(Poco::Logger * log) const override; void checkConsistency(const MergeTreeDataPartChecksums & checksums) const override; @@ -84,9 +85,12 @@ public: void rename(const String & new_relative_path, Poco::Logger * log, bool remove_new_dir_if_exists, bool fsync) override; std::string getName() const override; + std::string getDiskType() const override; DataPartStoragePtr getProjection(const std::string & name) const override; + DiskPtr getDisk() const; + private: VolumePtr volume; std::string root_path; @@ -103,6 +107,7 @@ private: class DataPartStorageBuilderOnDisk final : public IDataPartStorageBuilder { +public: DataPartStorageBuilderOnDisk(VolumePtr volume_, std::string root_path_, std::string part_dir_); void setRelativePath(const std::string & path) override; @@ -111,9 +116,11 @@ class DataPartStorageBuilderOnDisk final : public IDataPartStorageBuilder bool exists(const std::string & path) const override; void createDirectories() override; + void createProjection(const std::string & name) override; std::string getRelativePath() const override { return part_dir; } std::string getFullPath() const override; + std::string getFullRelativePath() const override; std::unique_ptr readFile( const std::string & path, @@ -127,6 +134,11 @@ class DataPartStorageBuilderOnDisk final : public IDataPartStorageBuilder void 
removeFile(const String & path) override;
     void removeRecursive() override;
+    void removeSharedRecursive(bool keep_in_remote_fs) override;
+
+    SyncGuardPtr getDirectorySyncGuard() const override;
+
+    void createHardLinkFrom(const IDataPartStorage & source, const std::string & from, const std::string & to) const override;
 
     ReservationPtr reserve(UInt64 bytes) override;
 
diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp
index 4e7dcc60696..cba52a26c4d 100644
--- a/src/Storages/MergeTree/DataPartsExchange.cpp
+++ b/src/Storages/MergeTree/DataPartsExchange.cpp
@@ -13,6 +13,7 @@
 #include
 #include
 #include
+#include <Storages/MergeTree/DataPartStorageOnDisk.h>
 #include
 #include
 #include
@@ -166,9 +167,8 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
     if (data_settings->allow_remote_fs_zero_copy_replication &&
         client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY)
     {
-        auto disk = part->volume->getDisk();
-        auto disk_type = toString(disk->getType());
-        if (disk->supportZeroCopyReplication() && std::find(capability.begin(), capability.end(), disk_type) != capability.end())
+        auto disk_type = part->data_part_storage->getDiskType();
+        if (part->data_part_storage->supportZeroCopyReplication() && std::find(capability.begin(), capability.end(), disk_type) != capability.end())
         {
             /// Send metadata if the receiver's capability covers the source disk type.
             response.addCookie({"remote_fs_metadata", disk_type});
@@ -259,7 +259,7 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
             checksums.files[file_name] = {};
     }
 
-    auto disk = part->volume->getDisk();
+    //auto disk = part->volume->getDisk();
     MergeTreeData::DataPart::Checksums data_checksums;
     for (const auto & [name, projection] : part->getProjectionParts())
     {
@@ -285,14 +285,14 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
     {
         String file_name = it.first;
 
-        String path = fs::path(part->getFullRelativePath()) / file_name;
+        //String path = fs::path(part->getFullRelativePath()) / file_name;
 
-        UInt64 size = disk->getFileSize(path);
+        UInt64 size = part->data_part_storage->getFileSize(file_name);
 
         writeStringBinary(it.first, out);
         writeBinary(size, out);
 
-        auto file_in = disk->readFile(path);
+        auto file_in = part->data_part_storage->readFile(file_name, {}, std::nullopt, std::nullopt);
         HashingWriteBuffer hashing_out(out);
         copyDataWithThrottler(*file_in, hashing_out, blocker.getCounter(), data.getSendsThrottler());
 
@@ -300,7 +300,11 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
             throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
 
         if (hashing_out.count() != size)
-            throw Exception(ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART, "Unexpected size of file {}, expected {} got {}", path, hashing_out.count(), size);
+            throw Exception(
+                ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART,
+                "Unexpected size of file {}, expected {} got {}",
+                std::string(fs::path(part->data_part_storage->getFullRelativePath()) / file_name),
+                hashing_out.count(), size);
 
         writePODBinary(hashing_out.getHash(), out);
 
@@ -314,7 +318,11 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
 
 void Service::sendPartFromDiskRemoteMeta(const MergeTreeData::DataPartPtr & part, WriteBuffer & out)
 {
-    auto disk = part->volume->getDisk();
+    const auto * data_part_storage_on_disk = typeid_cast<const DataPartStorageOnDisk *>(part->data_part_storage.get());
+    if (!data_part_storage_on_disk)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Storage '{}' doesn't support zero-copy replication",
part->data_part_storage->getName());
+
+    auto disk = data_part_storage_on_disk->getDisk();
     if (!disk->supportZeroCopyReplication())
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Disk '{}' doesn't support zero-copy replication", disk->getName());
 
@@ -328,7 +336,7 @@ void Service::sendPartFromDiskRemoteMeta(const MergeTreeData::DataPartPtr & part
     std::vector<String> paths;
     paths.reserve(checksums.files.size());
     for (const auto & it : checksums.files)
-        paths.push_back(fs::path(part->getFullRelativePath()) / it.first);
+        paths.push_back(fs::path(part->data_part_storage->getFullRelativePath()) / it.first);
 
     /// Serialized metadatadatas with zero ref counts.
     auto metadatas = disk->getSerializedMetadata(paths);
@@ -340,7 +348,7 @@ void Service::sendPartFromDiskRemoteMeta(const MergeTreeData::DataPartPtr & part
     for (const auto & it : checksums.files)
     {
         const String & file_name = it.first;
-        String file_path_prefix = fs::path(part->getFullRelativePath()) / file_name;
+        String file_path_prefix = fs::path(part->data_part_storage->getFullRelativePath()) / file_name;
 
         /// Just some additional checks
         String metadata_file_path = fs::path(disk->getPath()) / file_path_prefix;
@@ -571,8 +579,19 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
     ThrottlerPtr throttler)
 {
     auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);
+
+    auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(
+        volume,
+        data.getRelativeDataPath(),
+        part_name);
+
+    auto data_part_storage_builder = std::make_shared<DataPartStorageBuilderOnDisk>(
+        volume,
+        data.getRelativeDataPath(),
+        part_name);
+
     MergeTreeData::MutableDataPartPtr new_data_part =
-        std::make_shared<MergeTreeDataPartInMemory>(data, part_name, volume);
+        std::make_shared<MergeTreeDataPartInMemory>(data, part_name, data_part_storage);
 
     for (auto i = 0ul; i < projections; ++i)
     {
@@ -586,9 +605,12 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
         auto block = block_in.read();
         throttler->add(block.bytes());
 
+        auto projection_part_storage = data_part_storage->getProjection(projection_name + ".proj");
+        auto projection_part_storage_builder = data_part_storage_builder->getProjection(projection_name + ".proj");
+
         MergeTreePartInfo new_part_info("all", 0, 0, 0);
         MergeTreeData::MutableDataPartPtr new_projection_part =
-            std::make_shared<MergeTreeDataPartInMemory>(data, projection_name, new_part_info, volume, projection_name, new_data_part.get());
+            std::make_shared<MergeTreeDataPartInMemory>(data, projection_name, new_part_info, projection_part_storage, new_data_part.get());
 
         new_projection_part->is_temp = false;
         new_projection_part->setColumns(block.getNamesAndTypesList());
@@ -598,6 +620,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
 
         MergedBlockOutputStream part_out(
             new_projection_part,
+            projection_part_storage_builder,
             metadata_snapshot->projections.get(projection_name).metadata,
             block.getNamesAndTypesList(),
             {},
@@ -624,7 +647,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
     new_data_part->partition.create(metadata_snapshot, block, 0, context);
 
     MergedBlockOutputStream part_out(
-        new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {},
+        new_data_part, data_part_storage_builder, metadata_snapshot, block.getNamesAndTypesList(), {},
         CompressionCodecFactory::instance().get("NONE", {}));
 
     part_out.write(block);
@@ -636,9 +659,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
 
 void Fetcher::downloadBaseOrProjectionPartToDisk(
     const String & replica_path,
-    const String & part_download_path,
+    DataPartStorageBuilderPtr & data_part_storage_builder,
     bool sync,
-    DiskPtr disk,
     PooledReadWriteBufferFromHTTP & in,
MergeTreeData::DataPart::Checksums & checksums, ThrottlerPtr throttler) const @@ -656,14 +678,14 @@ void Fetcher::downloadBaseOrProjectionPartToDisk( /// File must be inside "absolute_part_path" directory. /// Otherwise malicious ClickHouse replica may force us to write to arbitrary path. - String absolute_file_path = fs::weakly_canonical(fs::path(part_download_path) / file_name); - if (!startsWith(absolute_file_path, fs::weakly_canonical(part_download_path).string())) + String absolute_file_path = fs::weakly_canonical(fs::path(data_part_storage_builder->getFullRelativePath()) / file_name); + if (!startsWith(absolute_file_path, fs::weakly_canonical(data_part_storage_builder->getFullRelativePath()).string())) throw Exception(ErrorCodes::INSECURE_PATH, "File path ({}) doesn't appear to be inside part path ({}). " "This may happen if we are trying to download part from malicious replica or logical error.", - absolute_file_path, part_download_path); + absolute_file_path, data_part_storage_builder->getFullRelativePath()); - auto file_out = disk->writeFile(fs::path(part_download_path) / file_name); + auto file_out = data_part_storage_builder->writeFile(file_name, file_size); HashingWriteBuffer hashing_out(*file_out); copyDataWithThrottler(in, hashing_out, file_size, blocker.getCounter(), throttler); @@ -672,7 +694,7 @@ void Fetcher::downloadBaseOrProjectionPartToDisk( /// NOTE The is_cancelled flag also makes sense to check every time you read over the network, /// performing a poll with a not very large timeout. /// And now we check it only between read chunks (in the `copyData` function). - disk->removeRecursive(part_download_path); + data_part_storage_builder->removeRecursive(); throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED); } @@ -682,7 +704,7 @@ void Fetcher::downloadBaseOrProjectionPartToDisk( if (expected_hash != hashing_out.getHash()) throw Exception(ErrorCodes::CHECKSUM_DOESNT_MATCH, "Checksum mismatch for file {} transferred from {}", - fullPath(disk, (fs::path(part_download_path) / file_name).string()), + (fs::path(data_part_storage_builder->getFullPath()) / file_name).string(), replica_path); if (file_name != "checksums.txt" && @@ -717,21 +739,33 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk( || std::string::npos != part_name.find_first_of("/.")) throw Exception("Logical error: tmp_prefix and part_name cannot be empty or contain '.' or '/' characters.", ErrorCodes::LOGICAL_ERROR); - String part_relative_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name; - String part_download_path = data.getRelativeDataPath() + part_relative_path + "/"; + String part_dir = tmp_prefix + part_name; + String part_relative_path = data.getRelativeDataPath() + String(to_detached ? "detached/" : ""); - if (disk->exists(part_download_path)) + auto volume = std::make_shared("volume_" + part_name, disk, 0); + + auto data_part_storage = std::make_shared( + volume, + part_relative_path, + part_dir); + + DataPartStorageBuilderPtr data_part_storage_builder = std::make_shared( + volume, + part_relative_path, + part_dir); + + if (data_part_storage_builder->exists()) { LOG_WARNING(log, "Directory {} already exists, probably result of a failed fetch. 
Will remove it before fetching part.", - fullPath(disk, part_download_path)); - disk->removeRecursive(part_download_path); + data_part_storage_builder->getFullPath()); + data_part_storage_builder->removeRecursive(); } - disk->createDirectories(part_download_path); + data_part_storage_builder->createDirectories(); SyncGuardPtr sync_guard; if (data.getSettings()->fsync_part_directory) - sync_guard = disk->getDirectorySyncGuard(part_download_path); + sync_guard = disk->getDirectorySyncGuard(data_part_storage->getRelativePath()); CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch}; @@ -740,19 +774,22 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk( String projection_name; readStringBinary(projection_name, in); MergeTreeData::DataPart::Checksums projection_checksum; - disk->createDirectories(part_download_path + projection_name + ".proj/"); + + auto projection_part_storage = data_part_storage->getProjection(projection_name + ".proj"); + auto projection_part_storage_builder = data_part_storage_builder->getProjection(projection_name + ".proj"); + + projection_part_storage_builder->createDirectories(); downloadBaseOrProjectionPartToDisk( - replica_path, part_download_path + projection_name + ".proj/", sync, disk, in, projection_checksum, throttler); + replica_path, projection_part_storage_builder, sync, in, projection_checksum, throttler); checksums.addFile( projection_name + ".proj", projection_checksum.getTotalSizeOnDisk(), projection_checksum.getTotalChecksumUInt128()); } // Download the base part - downloadBaseOrProjectionPartToDisk(replica_path, part_download_path, sync, disk, in, checksums, throttler); + downloadBaseOrProjectionPartToDisk(replica_path, data_part_storage_builder, sync, in, checksums, throttler); assertEOF(in); - auto volume = std::make_shared("volume_" + part_name, disk, 0); - MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, volume, part_relative_path); + MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, data_part_storage); new_data_part->is_temp = true; new_data_part->modification_time = time(nullptr); new_data_part->loadColumnsChecksumsIndexes(true, false); @@ -785,21 +822,31 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta( static const String TMP_PREFIX = "tmp-fetch_"; String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_; - String part_relative_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name; - String part_download_path = fs::path(data.getRelativeDataPath()) / part_relative_path / ""; + String part_dir = tmp_prefix + part_name; + String part_relative_path = data.getRelativeDataPath() + String(to_detached ? 
"detached/" : ""); - if (disk->exists(part_download_path)) - throw Exception(ErrorCodes::DIRECTORY_ALREADY_EXISTS, "Directory {} already exists.", fullPath(disk, part_download_path)); + auto volume = std::make_shared("volume_" + part_name, disk); + + auto data_part_storage = std::make_shared( + volume, + part_relative_path, + part_dir); + + DataPartStorageBuilderPtr data_part_storage_builder = std::make_shared( + volume, + part_relative_path, + part_dir); + + if (data_part_storage_builder->exists()) + throw Exception(ErrorCodes::DIRECTORY_ALREADY_EXISTS, "Directory {} already exists.", data_part_storage_builder->getFullPath()); CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch}; - disk->createDirectories(part_download_path); + data_part_storage_builder->createDirectories(); size_t files; readBinary(files, in); - auto volume = std::make_shared("volume_" + part_name, disk); - for (size_t i = 0; i < files; ++i) { String file_name; @@ -808,8 +855,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta( readStringBinary(file_name, in); readBinary(file_size, in); - String data_path = fs::path(part_download_path) / file_name; - String metadata_file = fullPath(disk, data_path); + String metadata_file = fs::path(data_part_storage_builder->getFullPath()) / file_name; { auto file_out = std::make_unique(metadata_file, DBMS_DEFAULT_BUFFER_SIZE, -1, 0666, nullptr, 0); @@ -823,7 +869,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta( /// NOTE The is_cancelled flag also makes sense to check every time you read over the network, /// performing a poll with a not very large timeout. /// And now we check it only between read chunks (in the `copyData` function). - disk->removeSharedRecursive(part_download_path, true); + data_part_storage_builder->removeSharedRecursive(true); throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED); } @@ -841,7 +887,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta( assertEOF(in); - MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, volume, part_relative_path); + MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, data_part_storage); new_data_part->is_temp = true; new_data_part->modification_time = time(nullptr); new_data_part->loadColumnsChecksumsIndexes(true, false); diff --git a/src/Storages/MergeTree/DataPartsExchange.h b/src/Storages/MergeTree/DataPartsExchange.h index 8dfcfef9a8b..0e19bf4cdcd 100644 --- a/src/Storages/MergeTree/DataPartsExchange.h +++ b/src/Storages/MergeTree/DataPartsExchange.h @@ -90,9 +90,8 @@ public: private: void downloadBaseOrProjectionPartToDisk( const String & replica_path, - const String & part_download_path, + DataPartStorageBuilderPtr & data_part_storage_builder, bool sync, - DiskPtr disk, PooledReadWriteBufferFromHTTP & in, MergeTreeData::DataPart::Checksums & checksums, ThrottlerPtr throttler) const; diff --git a/src/Storages/MergeTree/IDataPartStorage.h b/src/Storages/MergeTree/IDataPartStorage.h index 3628f9c3ad6..856a786cb96 100644 --- a/src/Storages/MergeTree/IDataPartStorage.h +++ b/src/Storages/MergeTree/IDataPartStorage.h @@ -11,8 +11,25 @@ class ReadBufferFromFileBase; class WriteBufferFromFileBase; -class IDiskDirectoryIterator; -using DiskDirectoryIteratorPtr = std::unique_ptr; +class IDataPartStorageIterator +{ +public: + /// Iterate to the next file. + virtual void next() = 0; + + /// Return `true` if the iterator points to a valid element. 
+    virtual bool isValid() const = 0;
+
+    /// Return `true` if the iterator points to a file.
+    virtual bool isFile() const = 0;
+
+    /// Name of the file that the iterator currently points to.
+    virtual std::string name() const = 0;
+
+    virtual ~IDataPartStorageIterator() = default;
+};
+
+using DataPartStorageIteratorPtr = std::unique_ptr<IDataPartStorageIterator>;
 
 struct MergeTreeDataPartChecksums;
 
@@ -24,6 +41,9 @@ class IStoragePolicy;
 class IDisk;
 using DiskPtr = std::shared_ptr<IDisk>;
 
+class ISyncGuard;
+using SyncGuardPtr = std::unique_ptr<ISyncGuard>;
+
 class IBackupEntry;
 using BackupEntryPtr = std::unique_ptr<IBackupEntry>;
 using BackupEntries = std::vector<std::pair<String, BackupEntryPtr>>;
@@ -50,8 +70,8 @@ public:
 
     virtual Poco::Timestamp getLastModified() const = 0;
 
-    virtual DiskDirectoryIteratorPtr iterate() const = 0;
-    virtual DiskDirectoryIteratorPtr iterateDirectory(const std::string & path) const = 0;
+    virtual DataPartStorageIteratorPtr iterate() const = 0;
+    virtual DataPartStorageIteratorPtr iterateDirectory(const std::string & path) const = 0;
 
     struct ProjectionChecksums
     {
@@ -66,6 +86,7 @@ public:
         Poco::Logger * log) const = 0;
 
     virtual size_t getFileSize(const std::string & path) const = 0;
+    virtual UInt32 getRefCount(const String &) const { return 0; }
 
     virtual std::string getRelativePathForPrefix(Poco::Logger * log, const String & prefix, bool detached) const = 0;
 
@@ -85,8 +106,8 @@ public:
     virtual std::string getDiskPathForLogs() const = 0;
 
     /// Should remove it later
-    virtual void writeChecksums(MergeTreeDataPartChecksums & checksums) const = 0;
-    virtual void writeColumns(NamesAndTypesList & columns) const = 0;
+    virtual void writeChecksums(const MergeTreeDataPartChecksums & checksums) const = 0;
+    virtual void writeColumns(const NamesAndTypesList & columns) const = 0;
     virtual void writeDeleteOnDestroyMarker(Poco::Logger * log) const = 0;
 
     virtual void checkConsistency(const MergeTreeDataPartChecksums & checksums) const = 0;
@@ -124,6 +145,7 @@ public:
 
     /// Disk name
     virtual std::string getName() const = 0;
+    virtual std::string getDiskType() const = 0;
 
     virtual std::shared_ptr<IDataPartStorage> getProjection(const std::string & name) const = 0;
 };
@@ -140,11 +162,13 @@ public:
 
     virtual std::string getRelativePath() const = 0;
     virtual std::string getFullPath() const = 0;
+    virtual std::string getFullRelativePath() const = 0;
 
     virtual bool exists() const = 0;
     virtual bool exists(const std::string & path) const = 0;
 
     virtual void createDirectories() = 0;
+    virtual void createProjection(const std::string & name) = 0;
 
     virtual std::unique_ptr<ReadBufferFromFileBase> readFile(
         const std::string & path,
@@ -152,12 +176,15 @@ public:
         std::optional<size_t> read_hint,
         std::optional<size_t> file_size) const = 0;
 
-    virtual std::unique_ptr<WriteBufferFromFileBase> writeFile(
-        const String & path,
-        size_t buf_size /* = DBMS_DEFAULT_BUFFER_SIZE*/) = 0;
+    virtual std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size) = 0;
 
     virtual void removeFile(const String & path) = 0;
     virtual void removeRecursive() = 0;
+    virtual void removeSharedRecursive(bool keep_in_remote_fs) = 0;
+
+    virtual SyncGuardPtr getDirectorySyncGuard() const { return nullptr; }
+
+    virtual void createHardLinkFrom(const IDataPartStorage & source, const std::string & from, const std::string & to) const = 0;
 
     virtual ReservationPtr reserve(UInt64 /*bytes*/) { return nullptr; }
 
diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
index 7cbb237d45a..b0a857e6c97 100644
--- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp
+++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
@@ -1298,12 +1298,7 @@ void IMergeTreeDataPart::remove()
const return; if (isProjectionPart()) - { - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Projection part {} should be removed by its parent {}.", - name, parent_part->name); - } + LOG_WARNING(storage.log, "Projection part {} should be removed by its parent {}.", name, parent_part->name); metadata_manager->deleteAll(false); metadata_manager->assertAllDeleted(false); diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index 4097ec178ab..f4b904256df 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -87,8 +87,8 @@ public: UncompressedCache * uncompressed_cache, MarkCache * mark_cache, const MergeTreeReaderSettings & reader_settings_, - const ValueSizeMap & avg_value_size_hints_ = ValueSizeMap{}, - const ReadBufferFromFileBase::ProfileCallback & profile_callback_ = ReadBufferFromFileBase::ProfileCallback{}) const = 0; + const ValueSizeMap & avg_value_size_hints_, + const ReadBufferFromFileBase::ProfileCallback & profile_callback_) const = 0; virtual MergeTreeWriterPtr getWriter( DataPartStorageBuilderPtr data_part_storage_builder, @@ -97,7 +97,7 @@ public: const std::vector & indices_to_recalc, const CompressionCodecPtr & default_codec_, const MergeTreeWriterSettings & writer_settings, - const MergeTreeIndexGranularity & computed_index_granularity = {}) const = 0; + const MergeTreeIndexGranularity & computed_index_granularity) const = 0; virtual bool isStoredOnDisk() const = 0; diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index e8241ffe080..84c2cecb122 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -151,12 +152,22 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() auto local_single_disk_volume = std::make_shared("volume_" + global_ctx->future_part->name, ctx->disk, 0); + + auto data_part_storage = std::make_shared( + local_single_disk_volume, + local_part_path, + local_tmp_part_basename); + + global_ctx->data_part_storage_builder = std::make_shared( + local_single_disk_volume, + local_part_path, + local_tmp_part_basename); + global_ctx->new_data_part = global_ctx->data->createPart( global_ctx->future_part->name, global_ctx->future_part->type, global_ctx->future_part->part_info, - local_single_disk_volume, - local_tmp_part_basename, + data_part_storage, global_ctx->parent_part); global_ctx->new_data_part->uuid = global_ctx->future_part->uuid; @@ -256,6 +267,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() global_ctx->to = std::make_shared( global_ctx->new_data_part, + global_ctx->data_part_storage_builder, global_ctx->metadata_snapshot, global_ctx->merging_columns, MergeTreeIndexFactory::instance().getMany(global_ctx->metadata_snapshot->getSecondaryIndices()), @@ -438,6 +450,7 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const ctx->executor = std::make_unique(ctx->column_parts_pipeline); ctx->column_to = std::make_unique( + global_ctx->data_part_storage_builder, global_ctx->new_data_part, global_ctx->metadata_snapshot, ctx->executor->getHeader(), diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h index 04da9ad77c4..6425e997432 100644 --- a/src/Storages/MergeTree/MergeTask.h +++ b/src/Storages/MergeTree/MergeTask.h @@ -157,6 +157,7 @@ private: std::unique_ptr merging_executor; MergeTreeData::MutableDataPartPtr new_data_part{nullptr}; + 
DataPartStorageBuilderPtr data_part_storage_builder; size_t rows_written{0}; UInt64 watch_prev_elapsed{0}; diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp index f6cbf54b752..1f3602fa110 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp @@ -64,11 +64,11 @@ void MergeTreeSelectProcessor::initializeReaders() owned_mark_cache = storage.getContext()->getMarkCache(); reader = data_part->getReader(task_columns.columns, storage_snapshot->getMetadataForQuery(), - all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings); + all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, {}, {}); if (prewhere_info) pre_reader = data_part->getReader(task_columns.pre_columns, storage_snapshot->getMetadataForQuery(), - all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings); + all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, {}, {}); } diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp index c5a3b7935d9..81888608623 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp @@ -66,7 +66,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource( reader = data_part->getReader(columns_for_reader, storage_snapshot->metadata, MarkRanges{MarkRange(0, data_part->getMarksCount())}, - /* uncompressed_cache = */ nullptr, mark_cache.get(), reader_settings); + /* uncompressed_cache = */ nullptr, mark_cache.get(), reader_settings, {}, {}); } Chunk MergeTreeSequentialSource::generate() diff --git a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp index 5a706165000..e73dbc148d4 100644 --- a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp +++ b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp @@ -10,6 +10,7 @@ namespace ErrorCodes } MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream( + DataPartStorageBuilderPtr data_part_storage_builder_, const MergeTreeDataPartPtr & data_part, const StorageMetadataPtr & metadata_snapshot_, const Block & header_, @@ -18,7 +19,7 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream( WrittenOffsetColumns * offset_columns_, const MergeTreeIndexGranularity & index_granularity, const MergeTreeIndexGranularityInfo * index_granularity_info) - : IMergedBlockOutputStream(data_part, metadata_snapshot_, header_.getNamesAndTypesList(), /*reset_columns=*/ true) + : IMergedBlockOutputStream(std::move(data_part_storage_builder_), data_part, metadata_snapshot_, header_.getNamesAndTypesList(), /*reset_columns=*/ true) , header(header_) { const auto & global_settings = data_part->storage.getContext()->getSettings(); @@ -31,6 +32,7 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream( /* rewrite_primary_key = */false); writer = data_part->getWriter( + data_part_storage_builder, header.getNamesAndTypesList(), metadata_snapshot_, indices_to_recalc, @@ -75,13 +77,11 @@ MergedColumnOnlyOutputStream::fillChecksums( auto removed_files = removeEmptyColumnsFromPart(new_part, columns, serialization_infos, checksums); - auto disk = new_part->volume->getDisk(); for (const String & removed_file : removed_files) { - auto file_path = new_part->getFullRelativePath() + removed_file; /// Can be called multiple 
times, don't need to remove file twice - if (disk->exists(file_path)) - disk->removeFile(file_path); + if (data_part_storage_builder->exists(removed_file)) + data_part_storage_builder->removeFile(removed_file); if (all_checksums.files.count(removed_file)) all_checksums.files.erase(removed_file); diff --git a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h index 7b587d01dab..1fd1c752226 100644 --- a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h +++ b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h @@ -14,6 +14,7 @@ public: /// Pass empty 'already_written_offset_columns' first time then and pass the same object to subsequent instances of MergedColumnOnlyOutputStream /// if you want to serialize elements of Nested data structure in different instances of MergedColumnOnlyOutputStream. MergedColumnOnlyOutputStream( + DataPartStorageBuilderPtr data_part_storage_builder_, const MergeTreeDataPartPtr & data_part, const StorageMetadataPtr & metadata_snapshot_, const Block & header_, diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 1fe701c54ae..43bcd7cdef4 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -417,16 +418,17 @@ static NameToNameVector collectFilesForRenames( /// Initialize and write to disk new part fields like checksums, columns, etc. void finalizeMutatedPart( const MergeTreeDataPartPtr & source_part, + const DataPartStorageBuilderPtr & data_part_storage_builder, MergeTreeData::MutableDataPartPtr new_data_part, ExecuteTTLType execute_ttl_type, const CompressionCodecPtr & codec) { - auto disk = new_data_part->volume->getDisk(); - auto part_path = fs::path(new_data_part->getFullRelativePath()); + //auto disk = new_data_part->volume->getDisk(); + //auto part_path = fs::path(new_data_part->getFullRelativePath()); if (new_data_part->uuid != UUIDHelpers::Nil) { - auto out = disk->writeFile(part_path / IMergeTreeDataPart::UUID_FILE_NAME, 4096); + auto out = data_part_storage_builder->writeFile(IMergeTreeDataPart::UUID_FILE_NAME, 4096); HashingWriteBuffer out_hashing(*out); writeUUIDText(new_data_part->uuid, out_hashing); new_data_part->checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_size = out_hashing.count(); @@ -436,7 +438,7 @@ void finalizeMutatedPart( if (execute_ttl_type != ExecuteTTLType::NONE) { /// Write a file with ttl infos in json format. - auto out_ttl = disk->writeFile(part_path / "ttl.txt", 4096); + auto out_ttl = data_part_storage_builder->writeFile("ttl.txt", 4096); HashingWriteBuffer out_hashing(*out_ttl); new_data_part->ttl_infos.write(out_hashing); new_data_part->checksums.files["ttl.txt"].file_size = out_hashing.count(); @@ -445,7 +447,7 @@ void finalizeMutatedPart( if (!new_data_part->getSerializationInfos().empty()) { - auto out = disk->writeFile(part_path / IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096); + auto out = data_part_storage_builder->writeFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096); HashingWriteBuffer out_hashing(*out); new_data_part->getSerializationInfos().writeJSON(out_hashing); new_data_part->checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_size = out_hashing.count(); @@ -454,18 +456,18 @@ void finalizeMutatedPart( { /// Write file with checksums. 
- auto out_checksums = disk->writeFile(part_path / "checksums.txt", 4096); + auto out_checksums = data_part_storage_builder->writeFile("checksums.txt", 4096); new_data_part->checksums.write(*out_checksums); } /// close fd { - auto out = disk->writeFile(part_path / IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096); + auto out = data_part_storage_builder->writeFile(IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096); DB::writeText(queryToString(codec->getFullCodecDesc()), *out); } { /// Write a file with a description of columns. - auto out_columns = disk->writeFile(part_path / "columns.txt", 4096); + auto out_columns = data_part_storage_builder->writeFile("columns.txt", 4096); new_data_part->getColumns().writeText(*out_columns); } /// close fd @@ -475,8 +477,7 @@ void finalizeMutatedPart( new_data_part->minmax_idx = source_part->minmax_idx; new_data_part->modification_time = time(nullptr); new_data_part->loadProjections(false, false); - new_data_part->setBytesOnDisk( - MergeTreeData::DataPart::calculateTotalSizeOnDisk(new_data_part->volume->getDisk(), part_path)); + new_data_part->setBytesOnDisk(new_data_part->data_part_storage->calculateTotalSizeOnDisk()); new_data_part->default_codec = codec; new_data_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(); new_data_part->storage.lockSharedData(*new_data_part); @@ -525,10 +526,11 @@ struct MutationContext MutationsInterpreter::MutationKind::MutationKindEnum mutation_kind = MutationsInterpreter::MutationKind::MutationKindEnum::MUTATE_UNKNOWN; - VolumePtr single_disk_volume; + //VolumePtr single_disk_volume; MergeTreeData::MutableDataPartPtr new_data_part; - DiskPtr disk; - String new_part_tmp_path; + //DiskPtr disk; + DataPartStorageBuilderPtr data_part_storage_builder; + //String new_part_tmp_path; IMergedBlockOutputStreamPtr out{nullptr}; @@ -810,7 +812,7 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() if (projection_block) { auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart( - *ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num); + *ctx->data, ctx->log, projection_block, projection, ctx->data_part_storage_builder, ctx->new_data_part.get(), ++block_num); tmp_part.finalize(); projection_parts[projection.name].emplace_back(std::move(tmp_part.part)); } @@ -832,7 +834,7 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() if (projection_block) { auto temp_part = MergeTreeDataWriter::writeTempProjectionPart( - *ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num); + *ctx->data, ctx->log, projection_block, projection, ctx->data_part_storage_builder, ctx->new_data_part.get(), ++block_num); temp_part.finalize(); projection_parts[projection.name].emplace_back(std::move(temp_part.part)); } @@ -932,7 +934,7 @@ private: void prepare() { - ctx->disk->createDirectories(ctx->new_part_tmp_path); + ctx->data_part_storage_builder->createDirectories(); /// Note: this is done before creating input streams, because otherwise data.data_parts_mutex /// (which is locked in data.getTotalActiveSizeInBytes()) @@ -968,6 +970,7 @@ private: ctx->out = std::make_shared( ctx->new_data_part, + ctx->data_part_storage_builder, ctx->metadata_snapshot, ctx->new_data_part->getColumns(), skip_part_indices, @@ -1056,15 +1059,15 @@ private: if (ctx->execute_ttl_type != ExecuteTTLType::NONE) ctx->files_to_skip.insert("ttl.txt"); - ctx->disk->createDirectories(ctx->new_part_tmp_path); + ctx->data_part_storage_builder->createDirectories(); 
/// Create hardlinks for unchanged files
-        for (auto it = ctx->disk->iterateDirectory(ctx->source_part->getFullRelativePath()); it->isValid(); it->next())
+        for (auto it = ctx->source_part->data_part_storage->iterate(); it->isValid(); it->next())
         {
             if (ctx->files_to_skip.count(it->name()))
                 continue;
 
-            String destination = ctx->new_part_tmp_path;
+            String destination; // = ctx->new_part_tmp_path;
             String file_name = it->name();
 
             auto rename_it = std::find_if(ctx->files_to_rename.begin(), ctx->files_to_rename.end(), [&file_name](const auto & rename_pair)
@@ -1075,23 +1078,28 @@ private:
             {
                 if (rename_it->second.empty())
                     continue;
-                destination += rename_it->second;
+                destination = rename_it->second;
             }
             else
             {
-                destination += it->name();
+                destination = it->name();
             }
 
-            if (!ctx->disk->isDirectory(it->path()))
-                ctx->disk->createHardLink(it->path(), destination);
+            if (it->isFile())
+                ctx->data_part_storage_builder->createHardLinkFrom(
+                    *ctx->source_part->data_part_storage, it->name(), destination);
             else if (!endsWith(".tmp_proj", it->name())) // ignore projection tmp merge dir
             {
                 // it's a projection part directory
-                ctx->disk->createDirectories(destination);
-                for (auto p_it = ctx->disk->iterateDirectory(it->path()); p_it->isValid(); p_it->next())
+                ctx->data_part_storage_builder->createDirectory(destination);
+
+                auto projection_data_part_storage = ctx->source_part->data_part_storage->getProjection(destination);
+                auto projection_data_part_storage_builder = ctx->data_part_storage_builder->getProjection(destination);
+
+                for (auto p_it = projection_data_part_storage->iterate(); p_it->isValid(); p_it->next())
                 {
-                    String p_destination = fs::path(destination) / p_it->name();
-                    ctx->disk->createHardLink(p_it->path(), p_destination);
+                    projection_data_part_storage_builder->createHardLinkFrom(
+                        *projection_data_part_storage, p_it->name(), p_it->name());
                 }
             }
         }
@@ -1162,7 +1170,7 @@ private:
             }
         }
 
-        MutationHelpers::finalizeMutatedPart(ctx->source_part, ctx->new_data_part, ctx->execute_ttl_type, ctx->compression_codec);
+        MutationHelpers::finalizeMutatedPart(ctx->source_part, ctx->data_part_storage_builder, ctx->new_data_part, ctx->execute_ttl_type, ctx->compression_codec);
     }
@@ -1293,8 +1301,19 @@ bool MutateTask::prepare()
     }
 
     ctx->single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + ctx->future_part->name, ctx->space_reservation->getDisk(), 0);
+
+    auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(
+        ctx->single_disk_volume,
+        ctx->data->getRelativeDataPath(),
+        "tmp_mut_" + ctx->future_part->name);
+
+    ctx->data_part_storage_builder = std::make_shared<DataPartStorageBuilderOnDisk>(
+        ctx->single_disk_volume,
+        ctx->data->getRelativeDataPath(),
+        "tmp_mut_" + ctx->future_part->name);
+
     ctx->new_data_part = ctx->data->createPart(
-        ctx->future_part->name, ctx->future_part->type, ctx->future_part->part_info, ctx->single_disk_volume, "tmp_mut_" + ctx->future_part->name);
+        ctx->future_part->name, ctx->future_part->type, ctx->future_part->part_info, data_part_storage);
 
     ctx->new_data_part->uuid = ctx->future_part->uuid;
     ctx->new_data_part->is_temp = true;
@@ -1311,8 +1330,8 @@ bool MutateTask::prepare()
     ctx->new_data_part->setSerializationInfos(new_infos);
     ctx->new_data_part->partition.assign(ctx->source_part->partition);
 
-    ctx->disk = ctx->new_data_part->volume->getDisk();
-    ctx->new_part_tmp_path = ctx->new_data_part->getFullRelativePath();
+    // ctx->disk = ctx->new_data_part->volume->getDisk();
+    //ctx->new_part_tmp_path = ctx->new_data_part->getFullRelativePath();
 
     /// Don't change granularity type while mutating subset of columns
ctx->mrk_extension = ctx->source_part->index_granularity_info.is_adaptive ? getAdaptiveMrkExtension(ctx->new_data_part->getType()) diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index b1392f073ea..8de730b93a7 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -1371,7 +1371,7 @@ void StorageMergeTree::dropPartsImpl(DataPartsVector && parts_to_remove, bool de /// NOTE: no race with background cleanup until we hold pointers to parts for (const auto & part : parts_to_remove) { - LOG_INFO(log, "Detaching {}", part->relative_path); + LOG_INFO(log, "Detaching {}", part->data_part_storage->getRelativePath()); part->makeCloneInDetached("", metadata_snapshot); } } @@ -1606,29 +1606,25 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_ for (auto & part : data_parts) { - auto disk = part->volume->getDisk(); - String part_path = part->getFullRelativePath(); + //auto disk = part->volume->getDisk(); + //String part_path = part->getFullRelativePath(); /// If the checksums file is not present, calculate the checksums and write them to disk. - String checksums_path = fs::path(part_path) / "checksums.txt"; - String tmp_checksums_path = fs::path(part_path) / "checksums.txt.tmp"; - if (part->isStoredOnDisk() && !disk->exists(checksums_path)) + String checksums_path = "checksums.txt"; + String tmp_checksums_path = "checksums.txt.tmp"; + if (part->isStoredOnDisk() && !part->data_part_storage->exists(checksums_path)) { try { auto calculated_checksums = checkDataPart(part, false); calculated_checksums.checkEqual(part->checksums, true); - auto out = disk->writeFile(tmp_checksums_path, 4096); - part->checksums.write(*out); - disk->moveFile(tmp_checksums_path, checksums_path); + + part->data_part_storage->writeChecksums(part->checksums); part->checkMetadata(); results.emplace_back(part->name, true, "Checksums recounted and written to disk."); } catch (const Exception & ex) { - if (disk->exists(tmp_checksums_path)) - disk->removeFile(tmp_checksums_path); - results.emplace_back(part->name, false, "Check of part finished with error: '" + ex.message() + "'"); } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 52678899b9b..ae7d512a1b9 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include @@ -1437,12 +1438,16 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::attachPartHelperFo continue; const String part_old_name = part_info->getPartName(); - const String part_path = fs::path("detached") / part_old_name; const VolumePtr volume = std::make_shared("volume_" + part_old_name, disk); + auto data_part_storage = std::make_shared( + volume, + fs::path(relative_data_path) / "detached", + part_old_name); + /// actual_part_info is more recent than part_info so we use it - MergeTreeData::MutableDataPartPtr part = createPart(part_new_name, actual_part_info, volume, part_path); + MergeTreeData::MutableDataPartPtr part = createPart(part_new_name, actual_part_info, data_part_storage); try { @@ -1457,7 +1462,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::attachPartHelperFo if (entry.part_checksum == part->checksums.getTotalChecksumHex()) { - part->modification_time = disk->getLastModified(part->getFullRelativePath()).epochTime(); + part->modification_time = data_part_storage->getLastModified().epochTime(); return part; } } @@ 
-1823,7 +1828,7 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
         /// If DETACH clone parts to detached/ directory
         for (const auto & part : parts_to_remove)
         {
-            LOG_INFO(log, "Detaching {}", part->relative_path);
+            LOG_INFO(log, "Detaching {}", part->data_part_storage->getRelativePath());
             part->makeCloneInDetached("", metadata_snapshot);
         }
     }
@@ -2448,7 +2453,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
 
         for (const auto & part : parts_to_remove_from_working_set)
         {
-            LOG_INFO(log, "Detaching {}", part->relative_path);
+            LOG_INFO(log, "Detaching {}", part->data_part_storage->getRelativePath());
             part->makeCloneInDetached("clone", metadata_snapshot);
         }
     }
@@ -4037,10 +4042,10 @@ bool StorageReplicatedMergeTree::fetchExistsPart(const String & part_name, const
         {
             part = get_part();
 
-            if (part->volume->getDisk()->getName() != replaced_disk->getName())
-                throw Exception("Part " + part->name + " fetched on wrong disk " + part->volume->getDisk()->getName(), ErrorCodes::LOGICAL_ERROR);
+            if (part->data_part_storage->getName() != replaced_disk->getName())
+                throw Exception("Part " + part->name + " fetched on wrong disk " + part->data_part_storage->getName(), ErrorCodes::LOGICAL_ERROR);
             replaced_disk->removeFileIfExists(replaced_part_path);
-            replaced_disk->moveDirectory(part->getFullRelativePath(), replaced_part_path);
+            replaced_disk->moveDirectory(part->data_part_storage->getFullRelativePath(), replaced_part_path);
         }
         catch (const Exception & e)
         {
@@ -7115,7 +7120,7 @@ void StorageReplicatedMergeTree::checkBrokenDisks()
                 for (auto & part : *parts)
                 {
-                    if (part->volume && part->volume->getDisk()->getName() == disk_ptr->getName())
+                    if (part->data_part_storage && part->data_part_storage->getName() == disk_ptr->getName())
                         broken_part_callback(part->name);
                 }
                 continue;
@@ -7216,7 +7221,7 @@ void StorageReplicatedMergeTree::lockSharedDataTemporary(const String & part_nam
     String id = part_id;
     boost::replace_all(id, "/", "_");
 
-    Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), disk->getType(), getTableSharedID(),
+    Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(disk->getType()), getTableSharedID(),
         part_name, zookeeper_path);
 
     for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
@@ -7230,11 +7235,10 @@ void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part, bool replace_existing_lock) const
 {
-    if (!part.volume || !part.isStoredOnDisk())
+    if (!part.data_part_storage || !part.isStoredOnDisk())
         return;
 
-    DiskPtr disk = part.volume->getDisk();
-    if (!disk || !disk->supportZeroCopyReplication())
+    if (!part.data_part_storage->supportZeroCopyReplication())
         return;
 
     zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
@@ -7244,7 +7248,7 @@ void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part,
     String id = part.getUniqueId();
     boost::replace_all(id, "/", "_");
 
-    Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), disk->getType(), getTableSharedID(),
+    Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), part.data_part_storage->getDiskType(), getTableSharedID(),
         part.name, zookeeper_path);
     for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
     {
@@ -7265,18 +7269,16 @@ bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & par
 
 bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part, const String & name)
const { - if (!part.volume || !part.isStoredOnDisk()) + if (!part.data_part_storage || !part.isStoredOnDisk()) return true; - DiskPtr disk = part.volume->getDisk(); - if (!disk || !disk->supportZeroCopyReplication()) + if (!part.data_part_storage->supportZeroCopyReplication()) return true; /// If part is temporary refcount file may be absent - auto ref_count_path = fs::path(part.getFullRelativePath()) / IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK; - if (disk->exists(ref_count_path)) + if (part.data_part_storage->exists(IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK)) { - auto ref_count = disk->getRefCount(ref_count_path); + auto ref_count = part.data_part_storage->getRefCount(IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK); if (ref_count > 0) /// Keep part shard info for frozen backups return false; } @@ -7286,18 +7288,18 @@ bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & par return true; } - return unlockSharedDataByID(part.getUniqueId(), getTableSharedID(), name, replica_name, disk, getZooKeeper(), *getSettings(), log, + return unlockSharedDataByID(part.getUniqueId(), getTableSharedID(), name, replica_name, part.data_part_storage->getDiskType(), getZooKeeper(), *getSettings(), log, zookeeper_path); } bool StorageReplicatedMergeTree::unlockSharedDataByID(String part_id, const String & table_uuid, const String & part_name, - const String & replica_name_, DiskPtr disk, zkutil::ZooKeeperPtr zookeeper_ptr, const MergeTreeSettings & settings, + const String & replica_name_, std::string disk_type, zkutil::ZooKeeperPtr zookeeper_ptr, const MergeTreeSettings & settings, Poco::Logger * logger, const String & zookeeper_path_old) { boost::replace_all(part_id, "/", "_"); - Strings zc_zookeeper_paths = getZeroCopyPartPath(settings, disk->getType(), table_uuid, part_name, zookeeper_path_old); + Strings zc_zookeeper_paths = getZeroCopyPartPath(settings, disk_type, table_uuid, part_name, zookeeper_path_old); bool part_has_no_more_locks = true; @@ -7381,7 +7383,7 @@ String StorageReplicatedMergeTree::getSharedDataReplica( if (!zookeeper) return ""; - Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), disk_type, getTableSharedID(), part.name, + Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(disk_type), getTableSharedID(), part.name, zookeeper_path); std::set replicas; @@ -7452,12 +7454,12 @@ String StorageReplicatedMergeTree::getSharedDataReplica( } -Strings StorageReplicatedMergeTree::getZeroCopyPartPath(const MergeTreeSettings & settings, DiskType disk_type, const String & table_uuid, +Strings StorageReplicatedMergeTree::getZeroCopyPartPath(const MergeTreeSettings & settings, std::string disk_type, const String & table_uuid, const String & part_name, const String & zookeeper_path_old) { Strings res; - String zero_copy = fmt::format("zero_copy_{}", toString(disk_type)); + String zero_copy = fmt::format("zero_copy_{}", disk_type); String new_path = fs::path(settings.remote_fs_zero_copy_zookeeper_path.toString()) / zero_copy / table_uuid / part_name; res.push_back(new_path); @@ -7491,7 +7493,7 @@ std::optional StorageReplicatedMergeTree::getZeroCopyPartPath(const Stri if (!disk || !disk->supportZeroCopyReplication()) return std::nullopt; - return getZeroCopyPartPath(*getSettings(), disk->getType(), getTableSharedID(), part_name, zookeeper_path)[0]; + return getZeroCopyPartPath(*getSettings(), toString(disk->getType()), getTableSharedID(), part_name, zookeeper_path)[0]; } std::optional 
StorageReplicatedMergeTree::tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk) @@ -7586,12 +7588,22 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP auto minmax_idx = std::make_shared(); minmax_idx->update(block, getMinMaxColumnsNames(metadata_snapshot->getPartitionKey())); + auto new_volume = createVolumeFromReservation(reservation, volume); + auto data_part_storage = std::make_shared( + new_volume, + relative_data_path, + TMP_PREFIX + lost_part_name); + + DataPartStorageBuilderPtr data_part_storage_builder = std::make_shared( + new_volume, + relative_data_path, + TMP_PREFIX + lost_part_name); + auto new_data_part = createPart( lost_part_name, choosePartType(0, block.rows()), new_part_info, - createVolumeFromReservation(reservation, volume), - TMP_PREFIX + lost_part_name); + data_part_storage); if (settings->assign_part_uuids) new_data_part->uuid = UUIDHelpers::generateV4(); @@ -7628,19 +7640,16 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP if (new_data_part->isStoredOnDisk()) { /// The name could be non-unique in case of stale files from previous runs. - String full_path = new_data_part->getFullRelativePath(); - - if (new_data_part->volume->getDisk()->exists(full_path)) + if (data_part_storage_builder->exists()) { - LOG_WARNING(log, "Removing old temporary directory {}", fullPath(new_data_part->volume->getDisk(), full_path)); - new_data_part->volume->getDisk()->removeRecursive(full_path); + LOG_WARNING(log, "Removing old temporary directory {}", new_data_part->data_part_storage->getFullPath()); + data_part_storage_builder->removeRecursive(); } - const auto disk = new_data_part->volume->getDisk(); - disk->createDirectories(full_path); + data_part_storage_builder->createDirectories(); if (getSettings()->fsync_part_directory) - sync_guard = disk->getDirectorySyncGuard(full_path); + sync_guard = data_part_storage_builder->getDirectorySyncGuard(); } /// This effectively chooses minimal compression method: @@ -7648,7 +7657,7 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP auto compression_codec = getContext()->chooseCompressionCodec(0, 0); const auto & index_factory = MergeTreeIndexFactory::instance(); - MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, + MergedBlockOutputStream out(new_data_part, data_part_storage_builder, metadata_snapshot, columns, index_factory.getMany(metadata_snapshot->getSecondaryIndices()), compression_codec); bool sync_on_insert = settings->fsync_after_insert; @@ -7909,7 +7918,7 @@ bool StorageReplicatedMergeTree::removeSharedDetachedPart(DiskPtr disk, const St { String id = disk->getUniqueId(checksums); keep_shared = !StorageReplicatedMergeTree::unlockSharedDataByID(id, table_uuid, part_name, - detached_replica_name, disk, zookeeper, getContext()->getReplicatedMergeTreeSettings(), log, + detached_replica_name, toString(disk->getType()), zookeeper, getContext()->getReplicatedMergeTreeSettings(), log, detached_zookeeper_path); } else diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index c567447e9f2..7e7988e65e8 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -247,7 +247,7 @@ public: /// Return true if data unlocked /// Return false if data is still used by another node static bool unlockSharedDataByID(String part_id, const String & table_uuid, const String & part_name, const String & replica_name_, - 
DiskPtr disk, zkutil::ZooKeeperPtr zookeeper_, const MergeTreeSettings & settings, Poco::Logger * logger, + std::string disk_type, zkutil::ZooKeeperPtr zookeeper_, const MergeTreeSettings & settings, Poco::Logger * logger, const String & zookeeper_path_old); /// Fetch part only if some replica has it on shared storage like S3 @@ -758,7 +758,7 @@ private: PartitionBlockNumbersHolder allocateBlockNumbersInAffectedPartitions( const MutationCommands & commands, ContextPtr query_context, const zkutil::ZooKeeperPtr & zookeeper) const; - static Strings getZeroCopyPartPath(const MergeTreeSettings & settings, DiskType disk_type, const String & table_uuid, + static Strings getZeroCopyPartPath(const MergeTreeSettings & settings, std::string disk_type, const String & table_uuid, const String & part_name, const String & zookeeper_path_old); static void createZeroCopyLockNode(const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node, int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false);
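
Reviewer note: a minimal usage sketch (not part of the patch) of the iterator API added in IDataPartStorage.h. It assumes only the interface shown above and that it lives in namespace DB, as the rest of the tree does; `collectFileNames` is a hypothetical helper used purely for illustration.

```cpp
#include <Storages/MergeTree/IDataPartStorage.h>

#include <string>
#include <vector>

/// Walk a part's directory through the storage abstraction instead of a raw DiskPtr.
std::vector<std::string> collectFileNames(const DB::IDataPartStorage & storage)
{
    std::vector<std::string> files;

    /// iterate() returns a DataPartStorageIteratorPtr (a std::unique_ptr), so the caller
    /// never needs to know which disk backs the part or how its paths are composed.
    for (auto it = storage.iterate(); it->isValid(); it->next())
    {
        /// isFile() lets callers skip nested projection directories such as "<name>.proj".
        if (it->isFile())
            files.push_back(it->name());
    }

    return files;
}
```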
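The new writeChecksums()/writeColumns() bodies above follow a write-to-temporary-then-rename pattern with best-effort cleanup of the temporary file on failure. A condensed sketch of that pattern under the same assumptions (the helper name, the generic string payload, and the simplified error handling are illustrative, not part of the patch):

```cpp
#include <Disks/IDisk.h>
#include <IO/WriteHelpers.h>

#include <string>

/// Publish `payload` at `path` atomically: write to "<path>.tmp", then rename over `path`.
/// On failure, try to remove the temporary file so no stale ".tmp" is left behind.
void writeFileAtomically(const DB::DiskPtr & disk, const std::string & path, const std::string & payload)
{
    const std::string tmp_path = path + ".tmp";
    try
    {
        {
            auto out = disk->writeFile(tmp_path, 4096); /// same buffer size as in the patch
            DB::writeString(payload, *out);
        } /// the write buffer is flushed and closed at the end of this scope, as in the patch
        disk->moveFile(tmp_path, path); /// the rename makes the new contents visible
    }
    catch (...)
    {
        if (disk->exists(tmp_path))
            disk->removeFile(tmp_path); /// best-effort cleanup, then rethrow
        throw;
    }
}
```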