This commit is contained in:
Nikolai Kochetov 2022-04-19 19:34:41 +00:00
parent 76870ad92a
commit bcbab2ead8
9 changed files with 511 additions and 315 deletions

View File

@ -1,9 +1,14 @@
#include <Storages/MergeTree/DataPartStorageOnDisk.h>
#include <Storages/MergeTree/MergeTreeDataPartChecksum.h>
#include <Disks/IVolume.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <IO/WriteBufferFromFileBase.h>
#include <base/logger_useful.h>
#include <Disks/IStoragePolicy.h>
#include <Backups/BackupEntryFromSmallFile.h>
#include <Backups/BackupEntryFromImmutableFile.h>
#include <Storages/MergeTree/localBackup.h>
#include <Disks/SingleDiskVolume.h>
namespace DB
{
@ -20,6 +25,11 @@ DataPartStorageOnDisk::DataPartStorageOnDisk(VolumePtr volume_, std::string root
{
}
void DataPartStorageOnDisk::setRelativePath(const std::string & path)
{
part_dir = path;
}
std::string DataPartStorageOnDisk::getFullRelativePath() const
{
return fs::path(root_path) / part_dir;
@ -30,6 +40,32 @@ std::string DataPartStorageOnDisk::getFullPath() const
return fs::path(volume->getDisk()->getPath()) / root_path / part_dir;
}
std::string DataPartStorageOnDisk::getFullRootPath() const
{
return fs::path(volume->getDisk()->getPath()) / root_path;
}
std::string DataPartStorageOnDisk::getRelativePathForPrefix(Poco::Logger * log, const String & prefix, bool detached) const
{
String res;
auto full_relative_path = fs::path(root_path);
if (detached)
full_relative_path /= "detached";
for (int try_no = 0; try_no < 10; ++try_no)
{
res = (prefix.empty() ? "" : prefix + "_") + part_dir + (try_no ? "_try" + DB::toString(try_no) : "");
if (!volume->getDisk()->exists(full_relative_path / res))
return res;
LOG_WARNING(log, "Directory {} (to detach to) already exists. Will detach to directory with '_tryN' suffix.", res);
}
return res;
}
std::unique_ptr<ReadBufferFromFileBase> DataPartStorageOnDisk::readFile(
const std::string & path,
const ReadSettings & settings,
@ -69,6 +105,140 @@ DiskDirectoryIteratorPtr DataPartStorageOnDisk::iterateDirectory(const String &
return volume->getDisk()->iterateDirectory(fs::path(root_path) / part_dir / path);
}
// namespace
// {
// static constexpr std::string_view non_checksum_files[] =
// {
// "checksums.txt",
// "columns.txt",
// "default_compression_codec.txt",
// "delete-on-destroy.txt",
// "txn_version.txt",
// };
// static constexpr std::span<std::string_view> projection_non_checksum_files(non_checksum_files, 4);
// static constexpr std::span<std::string_view> part_non_checksum_files(non_checksum_files, 5);
// }
void DataPartStorageOnDisk::remove(
bool keep_shared_data,
const MergeTreeDataPartChecksums & checksums,
std::list<ProjectionChecksums> projections,
Poco::Logger * log) const
{
/// NOTE We rename part to delete_tmp_<relative_path> instead of delete_tmp_<name> to avoid race condition
/// when we try to remove two parts with the same name, but different relative paths,
/// for example all_1_2_1 (in Deleting state) and tmp_merge_all_1_2_1 (in Temporary state).
fs::path from = fs::path(root_path) / part_dir;
fs::path to = fs::path(root_path) / ("delete_tmp_" + part_dir);
// TODO directory delete_tmp_<name> is never removed if server crashes before returning from this function
auto disk = volume->getDisk();
if (disk->exists(to))
{
LOG_WARNING(log, "Directory {} (to which part must be renamed before removing) already exists. Most likely this is due to unclean restart or race condition. Removing it.", fullPath(disk, to));
try
{
disk->removeSharedRecursive(fs::path(to) / "", keep_shared_data);
}
catch (...)
{
LOG_ERROR(log, "Cannot recursively remove directory {}. Exception: {}", fullPath(disk, to), getCurrentExceptionMessage(false));
throw;
}
}
try
{
disk->moveDirectory(from, to);
}
catch (const fs::filesystem_error & e)
{
if (e.code() == std::errc::no_such_file_or_directory)
{
LOG_ERROR(log, "Directory {} (part to remove) doesn't exist or one of nested files has gone. Most likely this is due to manual removing. This should be discouraged. Ignoring.", fullPath(disk, to));
return;
}
throw;
}
// Record existing projection directories so we don't remove them twice
std::unordered_set<String> projection_directories;
for (const auto & projection : projections)
{
std::string proj_dir_name = projection.name + ".proj";
projection_directories.emplace(proj_dir_name);
}
clearDirectory(to, keep_shared_data, checksums, projection_directories, log, false);
}
void DataPartStorageOnDisk::clearDirectory(
const std::string & dir,
bool keep_shared_data,
const MergeTreeDataPartChecksums & checksums,
const std::unordered_set<String> & skip_directories,
Poco::Logger * log,
bool is_projection) const
{
auto disk = volume->getDisk();
if (checksums.empty())
{
if (is_projection)
{
LOG_ERROR(
log,
"Cannot quickly remove directory {} by removing files; fallback to recursive removal. Reason: checksums.txt is missing",
fullPath(disk, dir));
}
/// If the part is not completely written, we cannot use fast path by listing files.
disk->removeSharedRecursive(fs::path(dir) / "", keep_shared_data);
return;
}
try
{
/// Remove each expected file in directory, then remove directory itself.
IDisk::RemoveBatchRequest request;
#if !defined(__clang__)
# pragma GCC diagnostic push
# pragma GCC diagnostic ignored "-Wunused-variable"
#endif
for (const auto & [file, _] : checksums.files)
{
if (skip_directories.find(file) == skip_directories.end())
request.emplace_back(fs::path(dir) / file);
}
#if !defined(__clang__)
# pragma GCC diagnostic pop
#endif
for (const auto & file : {"checksums.txt", "columns.txt"})
request.emplace_back(fs::path(dir) / file);
request.emplace_back(fs::path(dir) / "default_compression_codec.txt", true);
request.emplace_back(fs::path(dir) / "delete-on-destroy.txt", true);
if (!is_projection)
request.emplace_back(fs::path(dir) / "txn_version.txt", true);
disk->removeSharedFiles(request, keep_shared_data);
disk->removeDirectory(dir);
}
catch (...)
{
/// Recursive directory removal does many excessive "stat" syscalls under the hood.
LOG_ERROR(log, "Cannot quickly remove directory {} by removing files; fallback to recursive removal. Reason: {}", fullPath(disk, dir), getCurrentExceptionMessage(false));
disk->removeSharedRecursive(fs::path(dir) / "", keep_shared_data);
}
}
DataPartStoragePtr DataPartStorageOnDisk::getProjection(const std::string & name) const
{
return std::make_shared<DataPartStorageOnDisk>(volume, std::string(fs::path(root_path) / part_dir), name);
@ -101,6 +271,16 @@ bool DataPartStorageOnDisk::supportZeroCopyReplication() const
return volume->getDisk()->supportZeroCopyReplication();
}
bool DataPartStorageOnDisk::isBroken() const
{
return volume->getDisk()->isBroken();
}
std::string DataPartStorageOnDisk::getDiskPathForLogs() const
{
return volume->getDisk()->getPath();
}
void DataPartStorageOnDisk::writeChecksums(MergeTreeDataPartChecksums & checksums) const
{
std::string path = fs::path(root_path) / part_dir / "checksums.txt";
@ -139,6 +319,20 @@ void DataPartStorageOnDisk::writeDeleteOnDestroyMarker(Poco::Logger * log) const
}
}
void DataPartStorageOnDisk::checkConsistency(const MergeTreeDataPartChecksums & checksums) const
{
checksums.checkSizes(volume->getDisk(), getFullRelativePath());
}
ReservationPtr DataPartStorageOnDisk::reserve(UInt64 bytes)
{
auto res = volume->reserve(bytes);
if (!res)
throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Cannot reserve {}, not enough space", ReadableSize(bytes));
return res;
}
void DataPartStorageOnDisk::rename(const String & new_relative_path, Poco::Logger * log, bool remove_new_dir_if_exists, bool fsync)
{
if (!exists())
@ -198,12 +392,100 @@ bool DataPartStorageOnDisk::shallParticipateInMerges(const IStoragePolicy & stor
return !volume_ptr->areMergesAvoided();
}
String DataPartStorageOnDisk::getUniqueId() const
{
auto disk = volume->getDisk();
if (!disk->supportZeroCopyReplication())
throw Exception(fmt::format("Disk {} doesn't support zero-copy replication", disk->getName()), ErrorCodes::LOGICAL_ERROR);
return disk->getUniqueId(fs::path(getFullRelativePath()) / "checksums.txt");
}
std::string DataPartStorageOnDisk::getName() const
{
return volume->getDisk()->getName();
}
void DataPartStorageOnDisk::backup(
TemporaryFilesOnDisks & temp_dirs,
const MergeTreeDataPartChecksums & checksums,
const NameSet & files_without_checksums,
BackupEntries & backup_entries) const
{
auto disk = volume->getDisk();
auto temp_dir_it = temp_dirs.find(disk);
if (temp_dir_it == temp_dirs.end())
temp_dir_it = temp_dirs.emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp/backup_")).first;
auto temp_dir_owner = temp_dir_it->second;
fs::path temp_dir = temp_dir_owner->getPath();
fs::path temp_part_dir = temp_dir / part_dir;
disk->createDirectories(temp_part_dir);
for (const auto & [filepath, checksum] : checksums.files)
{
String relative_filepath = fs::path(part_dir) / filepath;
String full_filepath = fs::path(root_path) / part_dir / filepath;
String hardlink_filepath = temp_part_dir / filepath;
disk->createHardLink(full_filepath, hardlink_filepath);
UInt128 file_hash{checksum.file_hash.first, checksum.file_hash.second};
backup_entries.emplace_back(
relative_filepath,
std::make_unique<BackupEntryFromImmutableFile>(disk, hardlink_filepath, checksum.file_size, file_hash, temp_dir_owner));
}
for (const auto & filepath : files_without_checksums)
{
String relative_filepath = fs::path(part_dir) / filepath;
String full_filepath = fs::path(root_path) / part_dir / filepath;
backup_entries.emplace_back(relative_filepath, std::make_unique<BackupEntryFromSmallFile>(disk, full_filepath));
}
}
DataPartStoragePtr DataPartStorageOnDisk::freeze(
const std::string & to,
const std::string & dir_path,
std::function<void(const DiskPtr &)> save_metadata_callback) const
{
auto disk = volume->getDisk();
disk->createDirectories(to);
localBackup(disk, getFullRelativePath(), fs::path(to) / dir_path);
if (save_metadata_callback)
save_metadata_callback(disk);
disk->removeFileIfExists(fs::path(to) / dir_path / "delete-on-destroy.txt");
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
return std::make_shared<DataPartStorageOnDisk>(single_disk_volume, to, dir_path);
}
DataPartStoragePtr DataPartStorageOnDisk::clone(
const std::string & to,
const std::string & dir_path,
Poco::Logger * log) const
{
auto disk = volume->getDisk();
String path_to_clone = fs::path(to) / dir_path / "";
if (disk->exists(path_to_clone))
{
LOG_WARNING(log, "Path {} already exists. Will remove it and clone again.", fullPath(disk, path_to_clone));
disk->removeRecursive(path_to_clone);
}
disk->createDirectories(to);
volume->getDisk()->copy(getFullRelativePath(), disk, path_to_clone);
volume->getDisk()->removeFileIfExists(fs::path(path_to_clone) / "delete-on-destroy.txt");
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
return std::make_shared<DataPartStorageOnDisk>(single_disk_volume, to, dir_path);
}
DataPartStorageBuilderOnDisk::DataPartStorageBuilderOnDisk(VolumePtr volume_, std::string root_path_, std::string part_dir_)
: volume(std::move(volume_)), root_path(std::move(root_path_)), part_dir(std::move(part_dir_))
{
@ -270,4 +552,14 @@ DataPartStorageBuilderPtr DataPartStorageBuilderOnDisk::getProjection(const std:
return std::make_shared<DataPartStorageBuilderOnDisk>(volume, std::string(fs::path(root_path) / part_dir), name);
}
DataPartStoragePtr DataPartStorageBuilderOnDisk::getStorage() const
{
return std::make_shared<DataPartStorageOnDisk>(volume, root_path, part_dir);
}
void DataPartStorageBuilderOnDisk::setRelativePath(const std::string & path)
{
part_dir = path;
}
}

View File

@ -21,8 +21,8 @@ public:
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const override;
bool exists(const std::string & path) const override;
bool exists() const override;
bool exists(const std::string & path) const override;
Poco::Timestamp getLastModified() const override;
@ -31,20 +31,56 @@ public:
DiskDirectoryIteratorPtr iterate() const override;
DiskDirectoryIteratorPtr iterateDirectory(const std::string & path) const override;
void remove(
bool keep_shared_data,
const MergeTreeDataPartChecksums & checksums,
std::list<ProjectionChecksums> projections,
Poco::Logger * log) const override;
void setRelativePath(const std::string & path) override;
std::string getRelativePathForPrefix(Poco::Logger * log, const String & prefix, bool detached) const override;
std::string getRelativePath() const override { return part_dir; }
std::string getFullPath() const override;
std::string getFullRootPath() const override;
std::string getFullRelativePath() const override;
UInt64 calculateTotalSizeOnDisk() const override;
bool isStoredOnRemoteDisk() const override;
bool supportZeroCopyReplication() const override;
bool isBroken() const override;
std::string getDiskPathForLogs() const override;
void writeChecksums(MergeTreeDataPartChecksums & checksums) const override;
void writeColumns(NamesAndTypesList & columns) const override;
void writeDeleteOnDestroyMarker(Poco::Logger * log) const override;
void checkConsistency(const MergeTreeDataPartChecksums & checksums) const override;
ReservationPtr reserve(UInt64 bytes) override;
String getUniqueId() const override;
bool shallParticipateInMerges(const IStoragePolicy &) const override;
void backup(
TemporaryFilesOnDisks & temp_dirs,
const MergeTreeDataPartChecksums & checksums,
const NameSet & files_without_checksums,
BackupEntries & backup_entries) const override;
DataPartStoragePtr freeze(
const std::string & to,
const std::string & dir_path,
std::function<void(const DiskPtr &)> save_metadata_callback) const override;
DataPartStoragePtr clone(
const std::string & to,
const std::string & dir_path,
Poco::Logger * log) const override;
void rename(const String & new_relative_path, Poco::Logger * log, bool remove_new_dir_if_exists, bool fsync) override;
std::string getName() const override;
@ -55,17 +91,28 @@ private:
VolumePtr volume;
std::string root_path;
std::string part_dir;
void clearDirectory(
const std::string & dir,
bool keep_shared_data,
const MergeTreeDataPartChecksums & checksums,
const std::unordered_set<String> & skip_directories,
Poco::Logger * log,
bool is_projection) const;
};
class DataPartStorageBuilderOnDisk final : public IDataPartStorageBuilder
{
DataPartStorageBuilderOnDisk(VolumePtr volume_, std::string root_path_, std::string part_dir_);
void setRelativePath(const std::string & path) override;
bool exists() const override;
bool exists(const std::string & path) const override;
void createDirectories() override;
std::string getRelativePath() const override { return part_dir; }
std::string getFullPath() const override;
std::unique_ptr<ReadBufferFromFileBase> readFile(
@ -85,6 +132,8 @@ class DataPartStorageBuilderOnDisk final : public IDataPartStorageBuilder
DataPartStorageBuilderPtr getProjection(const std::string & name) const override;
DataPartStoragePtr getStorage() const override;
private:
VolumePtr volume;
std::string root_path;

View File

@ -21,6 +21,15 @@ using ReservationPtr = std::unique_ptr<IReservation>;
class IStoragePolicy;
class IDisk;
using DiskPtr = std::shared_ptr<IDisk>;
class IBackupEntry;
using BackupEntryPtr = std::unique_ptr<IBackupEntry>;
using BackupEntries = std::vector<std::pair<std::string, BackupEntryPtr>>;
class TemporaryFileOnDisk;
/// This is an abstraction of storage for data part files.
/// Generally, it contains read-only methods from IDisk.
class IDataPartStorage
@ -44,24 +53,73 @@ public:
virtual DiskDirectoryIteratorPtr iterate() const = 0;
virtual DiskDirectoryIteratorPtr iterateDirectory(const std::string & path) const = 0;
struct ProjectionChecksums
{
const std::string & name;
const MergeTreeDataPartChecksums & checksums;
};
virtual void remove(
bool keep_shared_data,
const MergeTreeDataPartChecksums & checksums,
std::list<ProjectionChecksums> projections,
Poco::Logger * log) const = 0;
virtual size_t getFileSize(const std::string & path) const = 0;
virtual std::string getRelativePathForPrefix(Poco::Logger * log, const String & prefix, bool detached) const = 0;
/// Reset part directory, used for im-memory parts
virtual void setRelativePath(const std::string & path) = 0;
virtual std::string getRelativePath() const = 0;
virtual std::string getFullPath() const = 0;
virtual std::string getFullRootPath() const = 0;
virtual std::string getFullRelativePath() const = 0;
virtual UInt64 calculateTotalSizeOnDisk() const = 0;
virtual bool isStoredOnRemoteDisk() const { return false; }
virtual bool supportZeroCopyReplication() const { return false; }
virtual bool isBroken() const = 0;
virtual std::string getDiskPathForLogs() const = 0;
/// Should remove it later
virtual void writeChecksums(MergeTreeDataPartChecksums & checksums) const = 0;
virtual void writeColumns(NamesAndTypesList & columns) const = 0;
virtual void writeDeleteOnDestroyMarker(Poco::Logger * log) const = 0;
virtual void checkConsistency(const MergeTreeDataPartChecksums & checksums) const = 0;
virtual ReservationPtr reserve(UInt64 /*bytes*/) { return nullptr; }
/// A leak of abstraction.
/// Return some uniq string for file.
/// Required for distinguish different copies of the same part on remote FS.
virtual String getUniqueId() const = 0;
/// A leak of abstraction
virtual bool shallParticipateInMerges(const IStoragePolicy &) const { return true; }
/// A leak of abstraction
using TemporaryFilesOnDisks = std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>>;
virtual void backup(
TemporaryFilesOnDisks & temp_dirs,
const MergeTreeDataPartChecksums & checksums,
const NameSet & files_without_checksums,
BackupEntries & backup_entries) const = 0;
/// A leak of abstraction
virtual std::shared_ptr<IDataPartStorage> freeze(
const std::string & to,
const std::string & dir_path,
std::function<void(const DiskPtr &)> save_metadata_callback) const = 0;
virtual std::shared_ptr<IDataPartStorage> clone(
const std::string & to,
const std::string & dir_path,
Poco::Logger * log) const = 0;
virtual void rename(const String & new_relative_path, Poco::Logger * log, bool remove_new_dir_if_exists, bool fsync);
/// Disk name
@ -77,11 +135,15 @@ class IDataPartStorageBuilder
public:
virtual ~IDataPartStorageBuilder() = default;
/// Reset part directory, used for im-memory parts
virtual void setRelativePath(const std::string & path) = 0;
virtual std::string getRelativePath() const = 0;
virtual std::string getFullPath() const = 0;
virtual bool exists() const = 0;
virtual bool exists(const std::string & path) const = 0;
virtual std::string getFullPath() const = 0;
virtual void createDirectories() = 0;
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(
@ -100,6 +162,8 @@ public:
virtual ReservationPtr reserve(UInt64 /*bytes*/) { return nullptr; }
virtual std::shared_ptr<IDataPartStorageBuilder> getProjection(const std::string & name) const = 0;
virtual DataPartStoragePtr getStorage() const = 0;
};
using DataPartStorageBuilderPtr = std::shared_ptr<IDataPartStorageBuilder>;

View File

@ -475,10 +475,10 @@ void IMergeTreeDataPart::removeIfNeeded()
if (is_temp)
{
String file_name = fileName(relative_path);
String file_name = fileName(data_part_storage->getRelativePath());
if (file_name.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "relative_path {} of part {} is invalid or not set", relative_path, name);
throw Exception(ErrorCodes::LOGICAL_ERROR, "relative_path {} of part {} is invalid or not set", data_part_storage->getRelativePath(), name);
if (!startsWith(file_name, "tmp") && !endsWith(file_name, ".tmp_proj"))
{
@ -491,15 +491,7 @@ void IMergeTreeDataPart::removeIfNeeded()
}
}
if (parent_part)
{
std::optional<bool> keep_shared_data = keepSharedDataInDecoupledStorage();
if (!keep_shared_data.has_value())
return;
projectionRemove(parent_part->getFullRelativePath(), *keep_shared_data);
}
else
remove();
remove();
if (state == State::DeleteOnDestroy)
{
@ -1305,175 +1297,29 @@ void IMergeTreeDataPart::remove() const
if (!isStoredOnDisk())
return;
if (relative_path.empty())
throw Exception("Part relative_path cannot be empty. This is bug.", ErrorCodes::LOGICAL_ERROR);
if (isProjectionPart())
{
LOG_WARNING(storage.log, "Projection part {} should be removed by its parent {}.", name, parent_part->name);
projectionRemove(parent_part->getFullRelativePath(), *keep_shared_data);
return;
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Projection part {} should be removed by its parent {}.",
name, parent_part->name);
}
metadata_manager->deleteAll(false);
metadata_manager->assertAllDeleted(false);
/** Atomic directory removal:
* - rename directory to temporary name;
* - remove it recursive.
*
* For temporary name we use "delete_tmp_" prefix.
*
* NOTE: We cannot use "tmp_delete_" prefix, because there is a second thread,
* that calls "clearOldTemporaryDirectories" and removes all directories, that begin with "tmp_" and are old enough.
* But when we removing data part, it can be old enough. And rename doesn't change mtime.
* And a race condition can happen that will lead to "File not found" error here.
*/
std::list<IDataPartStorage::ProjectionChecksums> projection_checksums;
/// NOTE We rename part to delete_tmp_<relative_path> instead of delete_tmp_<name> to avoid race condition
/// when we try to remove two parts with the same name, but different relative paths,
/// for example all_1_2_1 (in Deleting state) and tmp_merge_all_1_2_1 (in Temporary state).
fs::path from = fs::path(storage.relative_data_path) / relative_path;
fs::path to = fs::path(storage.relative_data_path) / ("delete_tmp_" + relative_path);
// TODO directory delete_tmp_<name> is never removed if server crashes before returning from this function
auto disk = volume->getDisk();
if (disk->exists(to))
{
LOG_WARNING(storage.log, "Directory {} (to which part must be renamed before removing) already exists. Most likely this is due to unclean restart or race condition. Removing it.", fullPath(disk, to));
try
{
disk->removeSharedRecursive(fs::path(to) / "", *keep_shared_data);
}
catch (...)
{
LOG_ERROR(storage.log, "Cannot recursively remove directory {}. Exception: {}", fullPath(disk, to), getCurrentExceptionMessage(false));
throw;
}
}
try
{
disk->moveDirectory(from, to);
}
catch (const fs::filesystem_error & e)
{
if (e.code() == std::errc::no_such_file_or_directory)
{
LOG_ERROR(storage.log, "Directory {} (part to remove) doesn't exist or one of nested files has gone. Most likely this is due to manual removing. This should be discouraged. Ignoring.", fullPath(disk, to));
return;
}
throw;
}
// Record existing projection directories so we don't remove them twice
std::unordered_set<String> projection_directories;
for (const auto & [p_name, projection_part] : projection_parts)
{
projection_part->projectionRemove(to, *keep_shared_data);
projection_directories.emplace(p_name + ".proj");
projection_part->metadata_manager->deleteAll(false);
projection_part->metadata_manager->assertAllDeleted(false);
projection_checksums.emplace_back(p_name, projection_part->checksums);
}
if (checksums.empty())
{
/// If the part is not completely written, we cannot use fast path by listing files.
disk->removeSharedRecursive(fs::path(to) / "", *keep_shared_data);
}
else
{
try
{
/// Remove each expected file in directory, then remove directory itself.
IDisk::RemoveBatchRequest request;
#if !defined(__clang__)
# pragma GCC diagnostic push
# pragma GCC diagnostic ignored "-Wunused-variable"
#endif
for (const auto & [file, _] : checksums.files)
{
if (projection_directories.find(file) == projection_directories.end())
request.emplace_back(fs::path(to) / file);
}
#if !defined(__clang__)
# pragma GCC diagnostic pop
#endif
for (const auto & file : {"checksums.txt", "columns.txt"})
request.emplace_back(fs::path(to) / file);
request.emplace_back(fs::path(to) / DEFAULT_COMPRESSION_CODEC_FILE_NAME, true);
request.emplace_back(fs::path(to) / DELETE_ON_DESTROY_MARKER_FILE_NAME, true);
disk->removeSharedFiles(request, *keep_shared_data);
disk->removeDirectory(to);
}
catch (...)
{
/// Recursive directory removal does many excessive "stat" syscalls under the hood.
LOG_ERROR(storage.log, "Cannot quickly remove directory {} by removing files; fallback to recursive removal. Reason: {}", fullPath(disk, to), getCurrentExceptionMessage(false));
disk->removeSharedRecursive(fs::path(to) / "", *keep_shared_data);
}
}
data_part_storage->remove(*keep_shared_data, checksums, projection_checksums, storage.log);
}
void IMergeTreeDataPart::projectionRemove(const String & parent_to, bool keep_shared_data) const
{
metadata_manager->deleteAll(false);
metadata_manager->assertAllDeleted(false);
String to = fs::path(parent_to) / relative_path;
auto disk = volume->getDisk();
if (checksums.empty())
{
LOG_ERROR(
storage.log,
"Cannot quickly remove directory {} by removing files; fallback to recursive removal. Reason: checksums.txt is missing",
fullPath(disk, to));
/// If the part is not completely written, we cannot use fast path by listing files.
disk->removeSharedRecursive(fs::path(to) / "", keep_shared_data);
}
else
{
try
{
/// Remove each expected file in directory, then remove directory itself.
IDisk::RemoveBatchRequest request;
#if !defined(__clang__)
# pragma GCC diagnostic push
# pragma GCC diagnostic ignored "-Wunused-variable"
#endif
for (const auto & [file, _] : checksums.files)
request.emplace_back(fs::path(to) / file);
#if !defined(__clang__)
# pragma GCC diagnostic pop
#endif
for (const auto & file : {"checksums.txt", "columns.txt"})
request.emplace_back(fs::path(to) / file);
request.emplace_back(fs::path(to) / DEFAULT_COMPRESSION_CODEC_FILE_NAME, true);
request.emplace_back(fs::path(to) / DELETE_ON_DESTROY_MARKER_FILE_NAME, true);
disk->removeSharedFiles(request, keep_shared_data);
disk->removeSharedRecursive(to, keep_shared_data);
}
catch (...)
{
/// Recursive directory removal does many excessive "stat" syscalls under the hood.
LOG_ERROR(storage.log, "Cannot quickly remove directory {} by removing files; fallback to recursive removal. Reason: {}", fullPath(disk, to), getCurrentExceptionMessage(false));
disk->removeSharedRecursive(fs::path(to) / "", keep_shared_data);
}
}
}
String IMergeTreeDataPart::getRelativePathForPrefix(const String & prefix, bool detached) const
{
String res;
@ -1484,25 +1330,10 @@ String IMergeTreeDataPart::getRelativePathForPrefix(const String & prefix, bool
* No more than 10 attempts are made so that there are not too many junk directories left.
*/
auto full_relative_path = fs::path(storage.relative_data_path);
if (detached)
full_relative_path /= "detached";
if (detached && parent_part)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot detach projection");
else if (parent_part)
full_relative_path /= parent_part->relative_path;
for (int try_no = 0; try_no < 10; ++try_no)
{
res = (prefix.empty() ? "" : prefix + "_") + name + (try_no ? "_try" + DB::toString(try_no) : "");
if (!volume->getDisk()->exists(full_relative_path / res))
return res;
LOG_WARNING(storage.log, "Directory {} (to detach to) already exists. Will detach to directory with '_tryN' suffix.", res);
}
return res;
return data_part_storage->getRelativePathForPrefix(storage.log, prefix, detached);
}
String IMergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix) const
@ -1522,36 +1353,24 @@ void IMergeTreeDataPart::renameToDetached(const String & prefix) const
void IMergeTreeDataPart::makeCloneInDetached(const String & prefix, const StorageMetadataPtr & /*metadata_snapshot*/) const
{
String destination_path = fs::path(storage.relative_data_path) / getRelativePathForDetachedPart(prefix);
localBackup(volume->getDisk(), getFullRelativePath(), destination_path);
volume->getDisk()->removeFileIfExists(fs::path(destination_path) / DELETE_ON_DESTROY_MARKER_FILE_NAME);
data_part_storage->freeze(storage.relative_data_path, getRelativePathForDetachedPart(prefix), {});
}
void IMergeTreeDataPart::makeCloneOnDisk(const DiskPtr & disk, const String & directory_name) const
{
assertOnDisk();
if (disk->getName() == volume->getDisk()->getName())
throw Exception("Can not clone data part " + name + " to same disk " + volume->getDisk()->getName(), ErrorCodes::LOGICAL_ERROR);
if (disk->getName() == data_part_storage->getName())
throw Exception("Can not clone data part " + name + " to same disk " + data_part_storage->getName(), ErrorCodes::LOGICAL_ERROR);
if (directory_name.empty())
throw Exception("Can not clone data part " + name + " to empty directory.", ErrorCodes::LOGICAL_ERROR);
String path_to_clone = fs::path(storage.relative_data_path) / directory_name / "";
if (disk->exists(fs::path(path_to_clone) / relative_path))
{
LOG_WARNING(storage.log, "Path {} already exists. Will remove it and clone again.", fullPath(disk, path_to_clone + relative_path));
disk->removeRecursive(fs::path(path_to_clone) / relative_path / "");
}
disk->createDirectories(path_to_clone);
volume->getDisk()->copy(getFullRelativePath(), disk, path_to_clone);
volume->getDisk()->removeFileIfExists(fs::path(path_to_clone) / DELETE_ON_DESTROY_MARKER_FILE_NAME);
data_part_storage->clone(path_to_clone, data_part_storage->getRelativePath(), storage.log);
}
void IMergeTreeDataPart::checkConsistencyBase() const
{
String path = getFullRelativePath();
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
if (parent_part)
metadata_snapshot = metadata_snapshot->projections.get(name).metadata;
@ -1585,33 +1404,37 @@ void IMergeTreeDataPart::checkConsistencyBase() const
}
}
checksums.checkSizes(volume->getDisk(), path);
data_part_storage->checkConsistency(checksums);
}
else
{
auto check_file_not_empty = [&path](const DiskPtr & disk_, const String & file_path)
auto check_file_not_empty = [this](const String & file_path)
{
UInt64 file_size;
if (!disk_->exists(file_path) || (file_size = disk_->getFileSize(file_path)) == 0)
throw Exception("Part " + fullPath(disk_, path) + " is broken: " + fullPath(disk_, file_path) + " is empty", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
if (!data_part_storage->exists(file_path) || (file_size = data_part_storage->getFileSize(file_path)) == 0)
throw Exception(
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART,
"Part {} is broken: {} is empty",
data_part_storage->getFullPath(),
std::string(fs::path(data_part_storage->getFullPath()) / file_path));
return file_size;
};
/// Check that the primary key index is not empty.
if (!pk.column_names.empty())
check_file_not_empty(volume->getDisk(), fs::path(path) / "primary.idx");
check_file_not_empty("primary.idx");
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
check_file_not_empty(volume->getDisk(), fs::path(path) / "count.txt");
check_file_not_empty("count.txt");
if (metadata_snapshot->hasPartitionKey())
check_file_not_empty(volume->getDisk(), fs::path(path) / "partition.dat");
check_file_not_empty("partition.dat");
if (!parent_part)
{
for (const String & col_name : storage.getMinMaxColumnsNames(partition_key))
check_file_not_empty(volume->getDisk(), fs::path(path) / ("minmax_" + escapeForFileName(col_name) + ".idx"));
check_file_not_empty("minmax_" + escapeForFileName(col_name) + ".idx");
}
}
}
@ -1741,11 +1564,7 @@ bool IMergeTreeDataPart::checkAllTTLCalculated(const StorageMetadataPtr & metada
String IMergeTreeDataPart::getUniqueId() const
{
auto disk = volume->getDisk();
if (!disk->supportZeroCopyReplication())
throw Exception(fmt::format("Disk {} doesn't support zero-copy replication", disk->getName()), ErrorCodes::LOGICAL_ERROR);
return disk->getUniqueId(fs::path(getFullRelativePath()) / FILE_FOR_REFERENCES_CHECK);
return data_part_storage->getUniqueId();
}
String IMergeTreeDataPart::getZeroLevelPartBlockID(std::string_view token) const
@ -1785,11 +1604,11 @@ IMergeTreeDataPart::uint128 IMergeTreeDataPart::getActualChecksumByFile(const St
return it->second.file_hash;
}
if (!volume->getDisk()->exists(file_path))
if (!data_part_storage->exists(file_path))
{
return {};
}
std::unique_ptr<ReadBufferFromFileBase> in_file = volume->getDisk()->readFile(file_path);
std::unique_ptr<ReadBufferFromFileBase> in_file = data_part_storage->readFile(file_path, {}, std::nullopt, std::nullopt);
HashingReadBuffer in_hash(*in_file);
String value;

View File

@ -146,8 +146,6 @@ public:
void remove() const;
void projectionRemove(const String & parent_to, bool keep_shared_data = false) const;
/// Initialize columns (from columns.txt if exists, or create from column files if not).
/// Load checksums from checksums.txt if exists. Load index if required.
void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency);

View File

@ -1585,11 +1585,9 @@ void MergeTreeData::flushAllInMemoryPartsIfNeeded()
{
if (auto part_in_memory = asInMemoryPart(part))
{
const auto & storage_relative_path = part_in_memory->storage.relative_data_path;
part_in_memory->flushToDisk(storage_relative_path, part_in_memory->data_part_storage->getRelativePath(), metadata_snapshot);
part_in_memory->flushToDisk(part_in_memory->data_part_storage->getRelativePath(), metadata_snapshot);
}
}
}
size_t MergeTreeData::clearOldPartsFromFilesystem(bool force)
@ -3663,36 +3661,7 @@ BackupEntries MergeTreeData::backupDataParts(const DataPartsVector & data_parts)
std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>> temp_dirs;
for (const auto & part : data_parts)
{
auto disk = part->volume->getDisk();
auto temp_dir_it = temp_dirs.find(disk);
if (temp_dir_it == temp_dirs.end())
temp_dir_it = temp_dirs.emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp/backup_")).first;
auto temp_dir_owner = temp_dir_it->second;
fs::path temp_dir = temp_dir_owner->getPath();
fs::path part_dir = part->getFullRelativePath();
fs::path temp_part_dir = temp_dir / part->relative_path;
disk->createDirectories(temp_part_dir);
for (const auto & [filepath, checksum] : part->checksums.files)
{
String relative_filepath = fs::path(part->relative_path) / filepath;
String hardlink_filepath = temp_part_dir / filepath;
disk->createHardLink(part_dir / filepath, hardlink_filepath);
UInt128 file_hash{checksum.file_hash.first, checksum.file_hash.second};
backup_entries.emplace_back(
relative_filepath,
std::make_unique<BackupEntryFromImmutableFile>(disk, hardlink_filepath, checksum.file_size, file_hash, temp_dir_owner));
}
for (const auto & filepath : part->getFileNamesWithoutChecksums())
{
String relative_filepath = fs::path(part->relative_path) / filepath;
backup_entries.emplace_back(relative_filepath, std::make_unique<BackupEntryFromSmallFile>(disk, part_dir / filepath));
}
}
part->data_part_storage->backup(temp_dirs, part->checksums, part->getFileNamesWithoutChecksums(), backup_entries);
return backup_entries;
}
@ -4200,7 +4169,7 @@ ReservationPtr MergeTreeData::reserveSpace(UInt64 expected_size, SpacePtr space)
return checkAndReturnReservation(expected_size, std::move(reservation));
}
ReservationPtr MergeTreeData::reserveSpace(UInt64 expected_size, const DataPartStorageBuilderPtr & data_part_storage_builder) const
ReservationPtr MergeTreeData::reserveSpace(UInt64 expected_size, const DataPartStorageBuilderPtr & data_part_storage_builder)
{
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
return data_part_storage_builder->reserve(expected_size);
@ -4322,11 +4291,11 @@ bool MergeTreeData::isPartInTTLDestination(const TTLDescription & ttl, const IMe
if (ttl.destination_type == DataDestinationType::VOLUME)
{
for (const auto & disk : policy->getVolumeByName(ttl.destination_name)->getDisks())
if (disk->getName() == part.volume->getDisk()->getName())
if (disk->getName() == part.data_part_storage->getName())
return true;
}
else if (ttl.destination_type == DataDestinationType::DISK)
return policy->getDiskByName(ttl.destination_name)->getName() == part.volume->getDisk()->getName();
return policy->getDiskByName(ttl.destination_name)->getName() == part.data_part_storage->getName();
return false;
}
@ -5323,7 +5292,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
bool does_storage_policy_allow_same_disk = false;
for (const DiskPtr & disk : getStoragePolicy()->getDisks())
{
if (disk->getName() == src_part->volume->getDisk()->getName())
if (disk->getName() == src_part->data_part_storage->getName())
{
does_storage_policy_allow_same_disk = true;
break;
@ -5331,41 +5300,36 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
}
if (!does_storage_policy_allow_same_disk)
throw Exception(
"Could not clone and load part " + quoteString(src_part->getFullPath()) + " because disk does not belong to storage policy",
ErrorCodes::BAD_ARGUMENTS);
ErrorCodes::BAD_ARGUMENTS,
"Could not clone and load part {} because disk does not belong to storage policy",
quoteString(src_part->data_part_storage->getFullPath()));
String dst_part_name = src_part->getNewName(dst_part_info);
String tmp_dst_part_name = tmp_part_prefix + dst_part_name;
auto reservation = reserveSpace(src_part->getBytesOnDisk(), src_part->volume->getDisk());
auto disk = reservation->getDisk();
String src_part_path = src_part->getFullRelativePath();
String dst_part_path = relative_data_path + tmp_dst_part_name;
/// Why it is needed if we only hardlink files?
auto reservation = src_part->data_part_storage->reserve(src_part->getBytesOnDisk()); //reserveSpace(src_part->getBytesOnDisk(), src_part->volume->getDisk());
if (disk->exists(dst_part_path))
throw Exception("Part in " + fullPath(disk, dst_part_path) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
auto src_part_storage = src_part->data_part_storage;
/// If source part is in memory, flush it to disk and clone it already in on-disk format
if (auto src_part_in_memory = asInMemoryPart(src_part))
{
const auto & src_relative_data_path = src_part_in_memory->storage.relative_data_path;
auto flushed_part_path = src_part_in_memory->getRelativePathForPrefix(tmp_part_prefix);
src_part_in_memory->flushToDisk(src_relative_data_path, flushed_part_path, metadata_snapshot);
src_part_path = fs::path(src_relative_data_path) / flushed_part_path / "";
src_part_storage = src_part_in_memory->flushToDisk(flushed_part_path, metadata_snapshot);
}
LOG_DEBUG(log, "Cloning part {} to {}", fullPath(disk, src_part_path), fullPath(disk, dst_part_path));
localBackup(disk, src_part_path, dst_part_path);
disk->removeFileIfExists(fs::path(dst_part_path) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME);
LOG_DEBUG(log, "Cloning part {} to {}",
src_part_storage->getFullPath(),
std::string(fs::path(src_part_storage->getFullRootPath()) / tmp_dst_part_name));
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(single_disk_volume, relative_data_path, tmp_dst_part_name);
auto dst_data_part = createPart(dst_part_name, dst_part_info, data_part_storage);
auto dst_part_storage = src_part_storage->freeze(relative_data_path, tmp_dst_part_name, {});
auto dst_data_part = createPart(dst_part_name, dst_part_info, dst_part_storage);
dst_data_part->is_temp = true;
dst_data_part->loadColumnsChecksumsIndexes(require_part_metadata, true);
dst_data_part->modification_time = disk->getLastModified(dst_part_path).epochTime();
dst_data_part->modification_time = dst_part_storage->getLastModified().epochTime();
return dst_data_part;
}
@ -5407,14 +5371,13 @@ Strings MergeTreeData::getDataPaths() const
void MergeTreeData::reportBrokenPart(MergeTreeData::DataPartPtr & data_part) const
{
if (data_part->volume && data_part->volume->getDisk()->isBroken())
if (data_part->data_part_storage && data_part->data_part_storage->isBroken())
{
auto disk = data_part->volume->getDisk();
auto parts = getDataParts();
LOG_WARNING(log, "Scanning parts to recover on broken disk {}.", disk->getName() + "@" + disk->getPath());
LOG_WARNING(log, "Scanning parts to recover on broken disk {}@{}.", data_part->data_part_storage->getName(), data_part->data_part_storage->getDiskPathForLogs());
for (const auto & part : parts)
{
if (part->volume && part->volume->getDisk()->getName() == disk->getName())
if (part->data_part_storage && part->data_part_storage->getName() == data_part->data_part_storage->getName())
broken_part_callback(part->name);
}
}
@ -5503,33 +5466,34 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(
LOG_DEBUG(log, "Freezing part {} snapshot will be placed at {}", part->name, backup_path);
auto disk = part->volume->getDisk();
//auto disk = part->volume->getDisk();
disk->createDirectories(backup_path);
String src_part_path = part->getFullRelativePath();
String backup_part_path = fs::path(backup_path) / relative_data_path / part->relative_path;
String src_part_path = part->data_part_storage->getFullRelativePath();
String backup_part_path = fs::path(backup_path) / relative_data_path;
if (auto part_in_memory = asInMemoryPart(part))
{
auto flushed_part_path = part_in_memory->getRelativePathForPrefix("tmp_freeze");
part_in_memory->flushToDisk(relative_data_path, flushed_part_path, metadata_snapshot);
src_part_path = fs::path(relative_data_path) / flushed_part_path / "";
src_part_path = part_in_memory->flushToDisk(flushed_part_path, metadata_snapshot)->getFullRelativePath();
}
localBackup(disk, src_part_path, backup_part_path);
auto new_storage = part->data_part_storage->freeze(
backup_part_path,
part->data_part_storage->getRelativePath(),
[this, &part, &backup_part_path](const DiskPtr & disk)
{
// Store metadata for replicated table.
// Do nothing for non-replocated.
createAndStoreFreezeMetadata(disk, part, backup_part_path);
disk->removeFileIfExists(fs::path(backup_part_path) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME);
// Store metadata for replicated table.
// Do nothing for non-replocated.
createAndStoreFreezeMetadata(disk, part, fs::path(backup_part_path) / part->data_part_storage->getRelativePath());
});
part->is_frozen.store(true, std::memory_order_relaxed);
result.push_back(PartitionCommandResultInfo{
.partition_id = part->info.partition_id,
.part_name = part->name,
.backup_path = fs::path(disk->getPath()) / backup_path,
.part_backup_path = fs::path(disk->getPath()) / backup_part_path,
.backup_path = new_storage->getFullRootPath(),
.part_backup_path = new_storage->getFullPath(),
.backup_name = backup_name,
});
++parts_processed;
@ -5667,8 +5631,8 @@ try
if (result_part)
{
part_log_elem.disk_name = result_part->volume->getDisk()->getName();
part_log_elem.path_on_disk = result_part->getFullPath();
part_log_elem.disk_name = result_part->data_part_storage->getName();
part_log_elem.path_on_disk = result_part->data_part_storage->getFullPath();
part_log_elem.bytes_compressed_on_disk = result_part->getBytesOnDisk();
part_log_elem.rows = result_part->rows_count;
}
@ -6084,7 +6048,7 @@ ReservationPtr MergeTreeData::balancedReservation(
if (part->isStoredOnDisk() && part->getBytesOnDisk() >= min_bytes_to_rebalance_partition_over_jbod
&& part_info.partition_id == part->info.partition_id)
{
auto name = part->volume->getDisk()->getName();
auto name = part->data_part_storage->getName();
auto it = disk_occupation.find(name);
if (it != disk_occupation.end())
{

View File

@ -409,7 +409,7 @@ public:
SelectQueryInfo & info) const override;
ReservationPtr reserveSpace(UInt64 expected_size, VolumePtr & volume) const;
ReservationPtr reserveSpace(UInt64 expected_size, const DataPartStorageBuilderPtr & data_part_storage_builder) const;
static ReservationPtr reserveSpace(UInt64 expected_size, const DataPartStorageBuilderPtr & data_part_storage_builder);
static bool partsContainSameProjections(const DataPartPtr & left, const DataPartPtr & right);

View File

@ -54,7 +54,7 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartInMemory::getReader(
}
IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartInMemory::getWriter(
DataPartStorageBuilderPtr,
DataPartStorageBuilderPtr data_part_storage_builder_,
const NamesAndTypesList & columns_list,
const StorageMetadataPtr & metadata_snapshot,
const std::vector<MergeTreeIndexPtr> & /* indices_to_recalc */,
@ -62,65 +62,70 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartInMemory::getWriter(
const MergeTreeWriterSettings & writer_settings,
const MergeTreeIndexGranularity & /* computed_index_granularity */) const
{
data_part_storage_builder = data_part_storage_builder_;
auto ptr = std::static_pointer_cast<const MergeTreeDataPartInMemory>(shared_from_this());
return std::make_unique<MergeTreeDataPartWriterInMemory>(
ptr, columns_list, metadata_snapshot, writer_settings);
}
void MergeTreeDataPartInMemory::flushToDisk(const String & base_path, const String & new_relative_path, const StorageMetadataPtr & metadata_snapshot) const
DataPartStoragePtr MergeTreeDataPartInMemory::flushToDisk(const String & new_relative_path, const StorageMetadataPtr & metadata_snapshot) const
{
const auto & disk = volume->getDisk();
String destination_path = base_path + new_relative_path;
auto current_full_path = data_part_storage_builder->getFullPath();
data_part_storage_builder->setRelativePath(new_relative_path);
auto new_type = storage.choosePartTypeOnDisk(block.bytes(), rows_count);
auto new_data_part = storage.createPart(name, new_type, info, volume, new_relative_path);
auto new_data_part_storage = data_part_storage_builder->getStorage();
auto new_data_part = storage.createPart(name, new_type, info, new_data_part_storage);
new_data_part->uuid = uuid;
new_data_part->setColumns(columns);
new_data_part->partition.value = partition.value;
new_data_part->minmax_idx = minmax_idx;
if (disk->exists(destination_path))
if (data_part_storage_builder->exists())
{
throw Exception("Could not flush part " + quoteString(getFullPath())
+ ". Part in " + fullPath(disk, destination_path) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
throw Exception(
ErrorCodes::DIRECTORY_ALREADY_EXISTS,
"Could not flush part {}. Part in {} already exists",
quoteString(current_full_path),
data_part_storage_builder->getFullPath());
}
disk->createDirectories(destination_path);
data_part_storage_builder->createDirectories();
auto compression_codec = storage.getContext()->chooseCompressionCodec(0, 0);
auto indices = MergeTreeIndexFactory::instance().getMany(metadata_snapshot->getSecondaryIndices());
MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, indices, compression_codec);
MergedBlockOutputStream out(new_data_part, data_part_storage_builder, metadata_snapshot, columns, indices, compression_codec);
out.write(block);
const auto & projections = metadata_snapshot->getProjections();
for (const auto & [projection_name, projection] : projection_parts)
{
if (projections.has(projection_name))
{
String projection_destination_path = fs::path(destination_path) / projection_name / ".proj";
if (disk->exists(projection_destination_path))
auto projection_part_storage_builder = data_part_storage_builder->getProjection(projection_name + ".proj");
if (projection_part_storage_builder->exists())
{
throw Exception(
ErrorCodes::DIRECTORY_ALREADY_EXISTS,
"Could not flush projection part {}. Projection part in {} already exists",
projection_name,
fullPath(disk, projection_destination_path));
projection_part_storage_builder->getFullPath());
}
auto projection_part = asInMemoryPart(projection);
auto projection_type = storage.choosePartTypeOnDisk(projection_part->block.bytes(), rows_count);
MergeTreePartInfo projection_info("all", 0, 0, 0);
auto projection_data_part
= storage.createPart(projection_name, projection_type, projection_info, volume, projection_name + ".proj", parent_part);
= storage.createPart(projection_name, projection_type, projection_info, projection_part_storage_builder->getStorage(), parent_part);
projection_data_part->is_temp = false; // clean up will be done on parent part
projection_data_part->setColumns(projection->getColumns());
disk->createDirectories(projection_destination_path);
projection_part_storage_builder->createDirectories();
const auto & desc = projections.get(name);
auto projection_compression_codec = storage.getContext()->chooseCompressionCodec(0, 0);
auto projection_indices = MergeTreeIndexFactory::instance().getMany(desc.metadata->getSecondaryIndices());
MergedBlockOutputStream projection_out(
projection_data_part, desc.metadata, projection_part->columns, projection_indices,
projection_data_part, projection_part_storage_builder, desc.metadata, projection_part->columns, projection_indices,
projection_compression_codec);
projection_out.write(projection_part->block);
@ -130,17 +135,21 @@ void MergeTreeDataPartInMemory::flushToDisk(const String & base_path, const Stri
}
out.finalizePart(new_data_part, false);
return new_data_part_storage;
}
void MergeTreeDataPartInMemory::makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const
{
String detached_path = getRelativePathForDetachedPart(prefix);
flushToDisk(storage.getRelativeDataPath(), detached_path, metadata_snapshot);
flushToDisk(detached_path, metadata_snapshot);
}
void MergeTreeDataPartInMemory::renameTo(const String & new_relative_path, bool /* remove_new_dir_if_exists */) const
{
relative_path = new_relative_path;
data_part_storage->setRelativePath(new_relative_path);
if (data_part_storage_builder)
data_part_storage_builder->setRelativePath(new_relative_path);
}
void MergeTreeDataPartInMemory::calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const

View File

@ -34,7 +34,7 @@ public:
const ReadBufferFromFileBase::ProfileCallback & profile_callback) const override;
MergeTreeWriterPtr getWriter(
DataPartStorageBuilderPtr,
DataPartStorageBuilderPtr data_part_storage_builder_,
const NamesAndTypesList & columns_list,
const StorageMetadataPtr & metadata_snapshot,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
@ -49,12 +49,13 @@ public:
void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const override;
void makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const override;
void flushToDisk(const String & base_path, const String & new_relative_path, const StorageMetadataPtr & metadata_snapshot) const;
DataPartStoragePtr flushToDisk(const String & new_relative_path, const StorageMetadataPtr & metadata_snapshot) const;
/// Returns hash of parts's block
Checksum calculateBlockChecksum() const;
mutable Block block;
mutable DataPartStorageBuilderPtr data_part_storage_builder;
private:
mutable std::condition_variable is_merged;