better interface

This commit is contained in:
Anton Popov 2022-10-23 22:29:24 +00:00
parent b40d9200d2
commit cf375c9732
7 changed files with 200 additions and 183 deletions

View File

@ -558,154 +558,6 @@ size_t DataPartStorageOnDisk::getVolumeIndex(const IStoragePolicy & storage_poli
return storage_policy.getVolumeIndexByDisk(volume->getDisk());
}
void DataPartStorageOnDisk::writeChecksums(const MergeTreeDataPartChecksums & checksums, const WriteSettings & settings) const
{
std::string path = fs::path(root_path) / part_dir / "checksums.txt";
try
{
{
auto out = volume->getDisk()->writeFile(path + ".tmp", 4096, WriteMode::Rewrite, settings);
checksums.write(*out);
}
volume->getDisk()->moveFile(path + ".tmp", path);
}
catch (...)
{
try
{
if (volume->getDisk()->exists(path + ".tmp"))
volume->getDisk()->removeFile(path + ".tmp");
}
catch (...)
{
tryLogCurrentException("DataPartStorageOnDisk");
}
throw;
}
}
void DataPartStorageOnDisk::writeColumns(const NamesAndTypesList & columns, const WriteSettings & settings) const
{
std::string path = fs::path(root_path) / part_dir / "columns.txt";
try
{
auto buf = volume->getDisk()->writeFile(path + ".tmp", 4096, WriteMode::Rewrite, settings);
columns.writeText(*buf);
buf->finalize();
volume->getDisk()->moveFile(path + ".tmp", path);
}
catch (...)
{
try
{
if (volume->getDisk()->exists(path + ".tmp"))
volume->getDisk()->removeFile(path + ".tmp");
}
catch (...)
{
tryLogCurrentException("DataPartStorageOnDisk");
}
throw;
}
}
void DataPartStorageOnDisk::writeVersionMetadata(const VersionMetadata & version, bool fsync_part_dir) const
{
std::string path = fs::path(root_path) / part_dir / "txn_version.txt";
try
{
{
/// TODO IDisk interface does not allow to open file with O_EXCL flag (for DiskLocal),
/// so we create empty file at first (expecting that createFile throws if file already exists)
/// and then overwrite it.
volume->getDisk()->createFile(path + ".tmp");
auto buf = volume->getDisk()->writeFile(path + ".tmp", 256);
version.write(*buf);
buf->finalize();
buf->sync();
}
SyncGuardPtr sync_guard;
if (fsync_part_dir)
sync_guard = volume->getDisk()->getDirectorySyncGuard(getRelativePath());
volume->getDisk()->replaceFile(path + ".tmp", path);
}
catch (...)
{
try
{
if (volume->getDisk()->exists(path + ".tmp"))
volume->getDisk()->removeFile(path + ".tmp");
}
catch (...)
{
tryLogCurrentException("DataPartStorageOnDisk");
}
throw;
}
}
void DataPartStorageOnDisk::appendCSNToVersionMetadata(const VersionMetadata & version, VersionMetadata::WhichCSN which_csn) const
{
/// Small enough appends to file are usually atomic,
/// so we append new metadata instead of rewriting file to reduce number of fsyncs.
/// We don't need to do fsync when writing CSN, because in case of hard restart
/// we will be able to restore CSN from transaction log in Keeper.
std::string version_file_name = fs::path(root_path) / part_dir / "txn_version.txt";
DiskPtr disk = volume->getDisk();
auto out = disk->writeFile(version_file_name, 256, WriteMode::Append);
version.writeCSN(*out, which_csn);
out->finalize();
}
void DataPartStorageOnDisk::appendRemovalTIDToVersionMetadata(const VersionMetadata & version, bool clear) const
{
String version_file_name = fs::path(root_path) / part_dir / "txn_version.txt";
DiskPtr disk = volume->getDisk();
auto out = disk->writeFile(version_file_name, 256, WriteMode::Append);
version.writeRemovalTID(*out, clear);
out->finalize();
/// fsync is not required when we clearing removal TID, because after hard restart we will fix metadata
if (!clear)
out->sync();
}
void DataPartStorageOnDisk::writeDeleteOnDestroyMarker(Poco::Logger * log) const
{
String marker_path = fs::path(root_path) / part_dir / "delete-on-destroy.txt";
auto disk = volume->getDisk();
try
{
volume->getDisk()->createFile(marker_path);
}
catch (Poco::Exception & e)
{
LOG_ERROR(log, "{} (while creating DeleteOnDestroy marker: {})", e.what(), backQuote(fullPath(disk, marker_path)));
}
}
void DataPartStorageOnDisk::removeDeleteOnDestroyMarker() const
{
std::string delete_on_destroy_file_name = fs::path(root_path) / part_dir / "delete-on-destroy.txt";
volume->getDisk()->removeFileIfExists(delete_on_destroy_file_name);
}
void DataPartStorageOnDisk::removeVersionMetadata() const
{
std::string version_file_name = fs::path(root_path) / part_dir / "txn_version.txt";
volume->getDisk()->removeFileIfExists(version_file_name);
}
String DataPartStorageOnDisk::getUniqueId() const
{
auto disk = volume->getDisk();
@ -935,6 +787,34 @@ std::unique_ptr<WriteBufferFromFileBase> DataPartStorageOnDisk::writeFile(
return volume->getDisk()->writeFile(fs::path(root_path) / part_dir / name, buf_size, WriteMode::Rewrite, settings);
}
std::unique_ptr<WriteBufferFromFileBase> DataPartStorageOnDisk::writeTransactionFile(WriteMode mode) const
{
return volume->getDisk()->writeFile(fs::path(root_path) / part_dir / "txn_version.txt", 256, mode);
}
void DataPartStorageOnDisk::createFile(const String & name)
{
executeOperation([&](auto & disk) { disk.createFile(fs::path(root_path) / part_dir / name); });
}
void DataPartStorageOnDisk::moveFile(const String & from_name, const String & to_name)
{
executeOperation([&](auto & disk)
{
auto relative_path = fs::path(root_path) / part_dir;
disk.moveFile(relative_path / from_name, relative_path / to_name);
});
}
void DataPartStorageOnDisk::replaceFile(const String & from_name, const String & to_name)
{
executeOperation([&](auto & disk)
{
auto relative_path = fs::path(root_path) / part_dir;
disk.replaceFile(relative_path / from_name, relative_path / to_name);
});
}
void DataPartStorageOnDisk::removeFile(const String & name)
{
executeOperation([&](auto & disk) { disk.removeFile(fs::path(root_path) / part_dir / name); });

View File

@ -78,15 +78,6 @@ public:
ReservationPtr tryReserve(UInt64 bytes) const override;
size_t getVolumeIndex(const IStoragePolicy &) const override;
void writeChecksums(const MergeTreeDataPartChecksums & checksums, const WriteSettings & settings) const override;
void writeColumns(const NamesAndTypesList & columns, const WriteSettings & settings) const override;
void writeVersionMetadata(const VersionMetadata & version, bool fsync_part_dir) const override;
void appendCSNToVersionMetadata(const VersionMetadata & version, VersionMetadata::WhichCSN which_csn) const override;
void appendRemovalTIDToVersionMetadata(const VersionMetadata & version, bool clear) const override;
void writeDeleteOnDestroyMarker(Poco::Logger * log) const override;
void removeDeleteOnDestroyMarker() const override;
void removeVersionMetadata() const override;
String getUniqueId() const override;
bool shallParticipateInMerges(const IStoragePolicy &) const override;
@ -123,6 +114,12 @@ public:
size_t buf_size,
const WriteSettings & settings) override;
std::unique_ptr<WriteBufferFromFileBase> writeTransactionFile(WriteMode mode) const override;
void createFile(const String & name) override;
void moveFile(const String & from_name, const String & to_name) override;
void replaceFile(const String & from_name, const String & to_name) override;
void removeFile(const String & name) override;
void removeFileIfExists(const String & name) override;
void removeRecursive() override;

View File

@ -4,6 +4,7 @@
#include <Core/NamesAndTypes.h>
#include <Interpreters/TransactionVersionMetadata.h>
#include <Storages/MergeTree/MergeTreeDataPartState.h>
#include <Disks/WriteMode.h>
#include <boost/core/noncopyable.hpp>
#include <memory>
#include <optional>
@ -142,6 +143,7 @@ public:
virtual bool supportZeroCopyReplication() const { return false; }
virtual bool supportParallelWrite() const = 0;
virtual bool isBroken() const = 0;
/// TODO: remove or at least remove const.
virtual void syncRevision(UInt64 revision) const = 0;
virtual UInt64 getRevision() const = 0;
@ -160,17 +162,6 @@ public:
virtual ReservationPtr tryReserve(UInt64 /*bytes*/) const { return nullptr; }
virtual size_t getVolumeIndex(const IStoragePolicy &) const { return 0; }
/// Some methods which change data part internals possibly after creation.
/// Probably we should try to remove it later.
virtual void writeChecksums(const MergeTreeDataPartChecksums & checksums, const WriteSettings & settings) const = 0;
virtual void writeColumns(const NamesAndTypesList & columns, const WriteSettings & settings) const = 0;
virtual void writeVersionMetadata(const VersionMetadata & version, bool fsync_part_dir) const = 0;
virtual void appendCSNToVersionMetadata(const VersionMetadata & version, VersionMetadata::WhichCSN which_csn) const = 0;
virtual void appendRemovalTIDToVersionMetadata(const VersionMetadata & version, bool clear) const = 0;
virtual void writeDeleteOnDestroyMarker(Poco::Logger * log) const = 0;
virtual void removeDeleteOnDestroyMarker() const = 0;
virtual void removeVersionMetadata() const = 0;
/// A leak of abstraction.
/// Return some uniq string for file.
/// Required for distinguish different copies of the same part on remote FS.
@ -219,7 +210,16 @@ public:
virtual void createDirectories() = 0;
virtual void createProjection(const std::string & name) = 0;
virtual std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & name, size_t buf_size, const WriteSettings & settings) = 0;
virtual std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & name,
size_t buf_size,
const WriteSettings & settings) = 0;
virtual std::unique_ptr<WriteBufferFromFileBase> writeTransactionFile(WriteMode mode) const = 0;
virtual void createFile(const String & name) = 0;
virtual void moveFile(const String & from_name, const String & to_name) = 0;
virtual void replaceFile(const String & from_name, const String & to_name) = 0;
virtual void removeFile(const String & name) = 0;
virtual void removeFileIfExists(const String & name) = 0;

View File

@ -854,6 +854,120 @@ void IMergeTreeDataPart::loadDefaultCompressionCodec()
}
}
template <typename Writer>
void IMergeTreeDataPart::writeMetadata(const String & filename, const WriteSettings & settings, Writer && writer)
{
auto & data_part_storage = getDataPartStorage();
auto tmp_filename = filename + ".tmp";
try
{
{
auto out = data_part_storage.writeFile(tmp_filename, 4096, settings);
writer(*out);
out->finalize();
}
data_part_storage.moveFile(tmp_filename, filename);
}
catch (...)
{
try
{
if (data_part_storage.exists(tmp_filename))
data_part_storage.removeFile(tmp_filename);
}
catch (...)
{
tryLogCurrentException("DataPartStorageOnDisk");
}
throw;
}
}
void IMergeTreeDataPart::writeChecksums(const MergeTreeDataPartChecksums & checksums_, const WriteSettings & settings)
{
writeMetadata("checksums.txt", settings, [&checksums_](auto & buffer)
{
checksums_.write(buffer);
});
}
void IMergeTreeDataPart::writeColumns(const NamesAndTypesList & columns_, const WriteSettings & settings)
{
writeMetadata("columns.txt", settings, [&columns_](auto & buffer)
{
columns_.writeText(buffer);
});
}
void IMergeTreeDataPart::writeVersionMetadata(const VersionMetadata & version_, bool fsync_part_dir) const
{
auto & data_part_storage = const_cast<IDataPartStorage &>(getDataPartStorage());
static constexpr auto filename = "txn_version.txt";
static constexpr auto tmp_filename = "txn_version.txt.tmp";
try
{
{
/// TODO IDisk interface does not allow to open file with O_EXCL flag (for DiskLocal),
/// so we create empty file at first (expecting that createFile throws if file already exists)
/// and then overwrite it.
data_part_storage.createFile(tmp_filename);
auto write_settings = storage.getContext()->getWriteSettings();
auto buf = data_part_storage.writeFile(tmp_filename, 256, write_settings);
version_.write(*buf);
buf->finalize();
buf->sync();
}
SyncGuardPtr sync_guard;
if (fsync_part_dir)
sync_guard = data_part_storage.getDirectorySyncGuard();
data_part_storage.replaceFile(tmp_filename, filename);
}
catch (...)
{
try
{
if (data_part_storage.exists(tmp_filename))
data_part_storage.removeFile(tmp_filename);
}
catch (...)
{
tryLogCurrentException("DataPartStorageOnDisk");
}
throw;
}
}
void IMergeTreeDataPart::writeDeleteOnDestroyMarker()
{
static constexpr auto marker_path = "delete-on-destroy.txt";
try
{
getDataPartStorage().createFile(marker_path);
}
catch (Poco::Exception & e)
{
LOG_ERROR(storage.log, "{} (while creating DeleteOnDestroy marker: {})",
e.what(), (fs::path(getDataPartStorage().getFullPath()) / marker_path).string());
}
}
void IMergeTreeDataPart::removeDeleteOnDestroyMarker()
{
getDataPartStorage().removeFileIfExists("delete-on-destroy.txt");
}
void IMergeTreeDataPart::removeVersionMetadata()
{
getDataPartStorage().removeFileIfExists("txn_version.txt");
}
void IMergeTreeDataPart::appendFilesOfDefaultCompressionCodec(Strings & files)
{
files.push_back(DEFAULT_COMPRESSION_CODEC_FILE_NAME);
@ -980,7 +1094,7 @@ void IMergeTreeDataPart::loadChecksums(bool require)
LOG_WARNING(storage.log, "Checksums for part {} not found. Will calculate them from data on disk.", name);
checksums = checkDataPart(shared_from_this(), false);
getDataPartStorage().writeChecksums(checksums, {});
writeChecksums(checksums, {});
bytes_on_disk = checksums.getTotalSizeOnDisk();
}
@ -993,8 +1107,6 @@ void IMergeTreeDataPart::appendFilesOfChecksums(Strings & files)
void IMergeTreeDataPart::loadRowsCount()
{
//String path = fs::path(getRelativePath()) / "count.txt";
auto read_rows_count = [&]()
{
auto buf = metadata_manager->read("count.txt");
@ -1186,7 +1298,7 @@ void IMergeTreeDataPart::loadColumns(bool require)
if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
getDataPartStorage().writeColumns(loaded_columns, {});
writeColumns(loaded_columns, {});
}
else
{
@ -1245,7 +1357,7 @@ void IMergeTreeDataPart::storeVersionMetadata(bool force) const
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Transactions are not supported for in-memory parts (table: {}, part: {})",
storage.getStorageID().getNameForLogs(), name);
getDataPartStorage().writeVersionMetadata(version, storage.getSettings()->fsync_part_directory);
writeVersionMetadata(version, storage.getSettings()->fsync_part_directory);
}
void IMergeTreeDataPart::appendCSNToVersionMetadata(VersionMetadata::WhichCSN which_csn) const
@ -1257,7 +1369,14 @@ void IMergeTreeDataPart::appendCSNToVersionMetadata(VersionMetadata::WhichCSN wh
chassert(!(which_csn == VersionMetadata::WhichCSN::REMOVAL && version.removal_csn == 0));
chassert(isStoredOnDisk());
getDataPartStorage().appendCSNToVersionMetadata(version, which_csn);
/// Small enough appends to file are usually atomic,
/// so we append new metadata instead of rewriting file to reduce number of fsyncs.
/// We don't need to do fsync when writing CSN, because in case of hard restart
/// we will be able to restore CSN from transaction log in Keeper.
auto out = getDataPartStorage().writeTransactionFile(WriteMode::Append);
version.writeCSN(*out, which_csn);
out->finalize();
}
void IMergeTreeDataPart::appendRemovalTIDToVersionMetadata(bool clear) const
@ -1280,7 +1399,13 @@ void IMergeTreeDataPart::appendRemovalTIDToVersionMetadata(bool clear) const
else
LOG_TEST(storage.log, "Appending removal TID for {} (creation: {}, removal {})", name, version.creation_tid, version.removal_tid);
getDataPartStorage().appendRemovalTIDToVersionMetadata(version, clear);
auto out = getDataPartStorage().writeTransactionFile(WriteMode::Append);
version.writeRemovalTID(*out, clear);
out->finalize();
/// fsync is not required when we clearing removal TID, because after hard restart we will fix metadata
if (!clear)
out->sync();
}
void IMergeTreeDataPart::loadVersionMetadata() const

View File

@ -1,5 +1,6 @@
#pragma once
#include "IO/WriteSettings.h"
#include <Core/Block.h>
#include <base/types.h>
#include <Core/NamesAndTypes.h>
@ -439,6 +440,12 @@ public:
/// True if here is lightweight deleted mask file in part.
bool hasLightweightDelete() const { return columns.contains(LightweightDeleteDescription::FILTER_COLUMN.name); }
void writeChecksums(const MergeTreeDataPartChecksums & checksums_, const WriteSettings & settings);
void writeDeleteOnDestroyMarker();
void removeDeleteOnDestroyMarker();
void removeVersionMetadata();
protected:
/// Total size of all columns, calculated once in calcuateColumnSizesOnDisk
@ -560,6 +567,12 @@ private:
/// any specifial compression.
void loadDefaultCompressionCodec();
void writeColumns(const NamesAndTypesList & columns_, const WriteSettings & settings);
void writeVersionMetadata(const VersionMetadata & version_, bool fsync_part_dir) const;
template <typename Writer>
void writeMetadata(const String & filename, const WriteSettings & settings, Writer && writer);
static void appendFilesOfDefaultCompressionCodec(Strings & files);
/// Found column without specific compression and return codec

View File

@ -1685,7 +1685,8 @@ scope_guard MergeTreeData::getTemporaryPartDirectoryHolder(const String & part_d
MergeTreeData::MutableDataPartPtr MergeTreeData::preparePartForRemoval(const DataPartPtr & part)
{
if (part->getState() != DataPartState::Deleting)
auto state = part->getState();
if (state != DataPartState::Deleting && state != DataPartState::DeleteOnDestroy)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot remove part {}, because it has state: {}", part->name, magic_enum::enum_name(part->getState()));
@ -3666,7 +3667,7 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
/// All other locks are taken in StorageReplicatedMergeTree
lockSharedData(*part_copy);
original_active_part->getDataPartStorage().writeDeleteOnDestroyMarker(log);
preparePartForRemoval(original_active_part)->writeDeleteOnDestroyMarker();
return;
}
}
@ -3801,8 +3802,8 @@ static void loadPartAndFixMetadataImpl(MergeTreeData::MutableDataPartPtr part)
{
part->loadColumnsChecksumsIndexes(false, true);
part->modification_time = part->getDataPartStorage().getLastModified().epochTime();
part->getDataPartStorage().removeDeleteOnDestroyMarker();
part->getDataPartStorage().removeVersionMetadata();
part->removeDeleteOnDestroyMarker();
part->removeVersionMetadata();
}
void MergeTreeData::calculateColumnAndSecondaryIndexSizesImpl()

View File

@ -1,4 +1,5 @@
#include "StorageMergeTree.h"
#include "Storages/MergeTree/IMergeTreeDataPart.h"
#include <optional>
@ -1739,8 +1740,7 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
for (auto & part : data_parts)
{
/// If the checksums file is not present, calculate the checksums and write them to disk.
String checksums_path = "checksums.txt";
String tmp_checksums_path = "checksums.txt.tmp";
static constexpr auto checksums_path = "checksums.txt";
if (part->isStoredOnDisk() && !part->getDataPartStorage().exists(checksums_path))
{
try
@ -1748,7 +1748,8 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
auto calculated_checksums = checkDataPart(part, false);
calculated_checksums.checkEqual(part->checksums, true);
part->getDataPartStorage().writeChecksums(part->checksums, local_context->getWriteSettings());
auto & part_mutable = const_cast<IMergeTreeDataPart &>(*part);
part_mutable.writeChecksums(part->checksums, local_context->getWriteSettings());
part->checkMetadata();
results.emplace_back(part->name, true, "Checksums recounted and written to disk.");