Better semantics of constness of DataPartStorage

This commit is contained in:
Anton Popov 2022-10-23 03:29:26 +00:00
parent 56e5daba0c
commit b40d9200d2
60 changed files with 456 additions and 459 deletions

View File

@ -13,9 +13,9 @@ namespace DB
using Checksum = CityHash_v1_0_2::uint128;
CompressionCodecPtr getCompressionCodecForFile(const DataPartStoragePtr & data_part_storage, const String & relative_path)
CompressionCodecPtr getCompressionCodecForFile(const IDataPartStorage & data_part_storage, const String & relative_path)
{
auto read_buffer = data_part_storage->readFile(relative_path, {}, std::nullopt, std::nullopt);
auto read_buffer = data_part_storage.readFile(relative_path, {}, std::nullopt, std::nullopt);
read_buffer->ignore(sizeof(Checksum));
UInt8 header_size = ICompressionCodec::getHeaderSize();

View File

@ -11,6 +11,6 @@ namespace DB
/// clickhouse fashion (with checksums, headers for each block, etc). This
/// method should be used as fallback when we cannot deduce compression codec
/// from metadata.
CompressionCodecPtr getCompressionCodecForFile(const DataPartStoragePtr & data_part_storage, const String & relative_path);
CompressionCodecPtr getCompressionCodecForFile(const IDataPartStorage & data_part_storage, const String & relative_path);
}

View File

@ -207,8 +207,8 @@ bool PartLog::addNewParts(
elem.table_name = table_id.table_name;
elem.partition_id = part->info.partition_id;
elem.part_name = part->name;
elem.disk_name = part->data_part_storage->getDiskName();
elem.path_on_disk = part->data_part_storage->getFullPath();
elem.disk_name = part->getDataPartStorage().getDiskName();
elem.path_on_disk = part->getDataPartStorage().getFullPath();
elem.part_type = part->getType();
elem.bytes_compressed_on_disk = part->getBytesOnDisk();

View File

@ -30,11 +30,6 @@ DataPartStorageOnDisk::DataPartStorageOnDisk(VolumePtr volume_, std::string root
{
}
std::shared_ptr<IDataPartStorage> DataPartStorageOnDisk::clone() const
{
return std::make_shared<DataPartStorageOnDisk>(volume, root_path, part_dir);
}
std::string DataPartStorageOnDisk::getFullPath() const
{
return fs::path(volume->getDisk()->getPath()) / root_path / part_dir / "";
@ -55,12 +50,7 @@ std::string DataPartStorageOnDisk::getFullRootPath() const
return fs::path(volume->getDisk()->getPath()) / root_path / "";
}
DataPartStoragePtr DataPartStorageOnDisk::getProjection(const std::string & name) const
{
return std::make_shared<DataPartStorageOnDisk>(volume, std::string(fs::path(root_path) / part_dir), name);
}
MutableDataPartStoragePtr DataPartStorageOnDisk::getProjection(const std::string & name)
MutableDataPartStoragePtr DataPartStorageOnDisk::getProjection(const std::string & name) const
{
return std::make_shared<DataPartStorageOnDisk>(volume, std::string(fs::path(root_path) / part_dir), name);
}
@ -279,7 +269,7 @@ void DataPartStorageOnDisk::remove(
try
{
disk->moveDirectory(from, to);
onRename(root_path, part_dir_without_slash);
part_dir = part_dir_without_slash;
}
catch (const Exception & e)
{
@ -524,7 +514,7 @@ bool DataPartStorageOnDisk::isBroken() const
return volume->getDisk()->isBroken();
}
void DataPartStorageOnDisk::syncRevision(UInt64 revision)
void DataPartStorageOnDisk::syncRevision(UInt64 revision) const
{
volume->getDisk()->syncRevision(revision);
}
@ -549,7 +539,7 @@ DataPartStorageOnDisk::DisksSet::const_iterator DataPartStorageOnDisk::isStoredO
return disks.find(volume->getDisk());
}
ReservationPtr DataPartStorageOnDisk::reserve(UInt64 bytes)
ReservationPtr DataPartStorageOnDisk::reserve(UInt64 bytes) const
{
auto res = volume->reserve(bytes);
if (!res)
@ -558,7 +548,7 @@ ReservationPtr DataPartStorageOnDisk::reserve(UInt64 bytes)
return res;
}
ReservationPtr DataPartStorageOnDisk::tryReserve(UInt64 bytes)
ReservationPtr DataPartStorageOnDisk::tryReserve(UInt64 bytes) const
{
return volume->reserve(bytes);
}
@ -845,12 +835,6 @@ MutableDataPartStoragePtr DataPartStorageOnDisk::clonePart(
return std::make_shared<DataPartStorageOnDisk>(single_disk_volume, to, dir_path);
}
void DataPartStorageOnDisk::onRename(const std::string & new_root_path, const std::string & new_part_dir)
{
part_dir = new_part_dir;
root_path = new_root_path;
}
void DataPartStorageOnDisk::rename(
const std::string & new_root_path,
const std::string & new_part_dir,

View File

@ -15,15 +15,13 @@ class DataPartStorageOnDisk final : public IDataPartStorage
{
public:
DataPartStorageOnDisk(VolumePtr volume_, std::string root_path_, std::string part_dir_);
std::shared_ptr<IDataPartStorage> clone() const override;
std::string getFullPath() const override;
std::string getRelativePath() const override;
std::string getPartDirectory() const override { return part_dir; }
std::string getFullRootPath() const override;
DataPartStoragePtr getProjection(const std::string & name) const override;
MutableDataPartStoragePtr getProjection(const std::string & name) override;
MutableDataPartStoragePtr getProjection(const std::string & name) const override;
bool exists() const override;
bool exists(const std::string & name) const override;
@ -62,7 +60,6 @@ public:
std::optional<Strings> & original_files_list) const;
void setRelativePath(const std::string & path) override;
void onRename(const std::string & new_root_path, const std::string & new_part_dir) override;
std::string getDiskName() const override;
std::string getDiskType() const override;
@ -70,15 +67,15 @@ public:
bool supportZeroCopyReplication() const override;
bool supportParallelWrite() const override;
bool isBroken() const override;
void syncRevision(UInt64 revision) override;
void syncRevision(UInt64 revision) const override;
UInt64 getRevision() const override;
std::unordered_map<String, String> getSerializedMetadata(const std::vector<String> & paths) const override;
std::string getDiskPath() const override;
DisksSet::const_iterator isStoredOnDisk(const DisksSet & disks) const override;
ReservationPtr reserve(UInt64 bytes) override;
ReservationPtr tryReserve(UInt64 bytes) override;
ReservationPtr reserve(UInt64 bytes) const override;
ReservationPtr tryReserve(UInt64 bytes) const override;
size_t getVolumeIndex(const IStoragePolicy &) const override;
void writeChecksums(const MergeTreeDataPartChecksums & checksums, const WriteSettings & settings) const override;

View File

@ -147,12 +147,12 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedSend};
if (part->data_part_storage->isStoredOnRemoteDisk())
if (part->getDataPartStorage().isStoredOnRemoteDisk())
{
UInt64 revision = parse<UInt64>(params.get("disk_revision", "0"));
if (revision)
part->data_part_storage->syncRevision(revision);
revision = part->data_part_storage->getRevision();
part->getDataPartStorage().syncRevision(revision);
revision = part->getDataPartStorage().getRevision();
if (revision)
response.addCookie({"disk_revision", toString(revision)});
}
@ -184,8 +184,8 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
!isInMemoryPart(part) &&
client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY)
{
auto disk_type = part->data_part_storage->getDiskType();
if (part->data_part_storage->supportZeroCopyReplication() && std::find(capability.begin(), capability.end(), disk_type) != capability.end())
auto disk_type = part->getDataPartStorage().getDiskType();
if (part->getDataPartStorage().supportZeroCopyReplication() && std::find(capability.begin(), capability.end(), disk_type) != capability.end())
{
/// Send metadata if the receiver's capability covers the source disk type.
response.addCookie({"remote_fs_metadata", disk_type});
@ -307,12 +307,12 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
{
String file_name = it.first;
UInt64 size = part->data_part_storage->getFileSize(file_name);
UInt64 size = part->getDataPartStorage().getFileSize(file_name);
writeStringBinary(it.first, out);
writeBinary(size, out);
auto file_in = part->data_part_storage->readFile(file_name, {}, std::nullopt, std::nullopt);
auto file_in = part->getDataPartStorage().readFile(file_name, {}, std::nullopt, std::nullopt);
HashingWriteBuffer hashing_out(out);
copyDataWithThrottler(*file_in, hashing_out, blocker.getCounter(), data.getSendsThrottler());
@ -323,7 +323,7 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
throw Exception(
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART,
"Unexpected size of file {}, expected {} got {}",
std::string(fs::path(part->data_part_storage->getRelativePath()) / file_name),
std::string(fs::path(part->getDataPartStorage().getRelativePath()) / file_name),
hashing_out.count(), size);
writePODBinary(hashing_out.getHash(), out);
@ -342,9 +342,9 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDiskRemoteMeta(
bool send_part_id,
const std::map<String, std::shared_ptr<IMergeTreeDataPart>> & projections)
{
const auto * data_part_storage_on_disk = dynamic_cast<const DataPartStorageOnDisk *>(part->data_part_storage.get());
const auto * data_part_storage_on_disk = dynamic_cast<const DataPartStorageOnDisk *>(&part->getDataPartStorage());
if (!data_part_storage_on_disk)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Storage '{}' doesn't support zero-copy replication", part->data_part_storage->getDiskName());
throw Exception(ErrorCodes::LOGICAL_ERROR, "Storage '{}' doesn't support zero-copy replication", part->getDataPartStorage().getDiskName());
if (!data_part_storage_on_disk->supportZeroCopyReplication())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Disk '{}' doesn't support zero-copy replication", data_part_storage_on_disk->getDiskName());
@ -365,7 +365,7 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDiskRemoteMeta(
std::vector<std::string> paths;
paths.reserve(checksums.files.size());
for (const auto & it : checksums.files)
paths.push_back(fs::path(part->data_part_storage->getRelativePath()) / it.first);
paths.push_back(fs::path(part->getDataPartStorage().getRelativePath()) / it.first);
/// Serialized metadatadatas with zero ref counts.
auto metadatas = data_part_storage_on_disk->getSerializedMetadata(paths);
@ -399,7 +399,7 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDiskRemoteMeta(
for (const auto & it : checksums.files)
{
const String & file_name = it.first;
String file_path_prefix = fs::path(part->data_part_storage->getRelativePath()) / file_name;
String file_path_prefix = fs::path(part->getDataPartStorage().getRelativePath()) / file_name;
/// Just some additional checks
String metadata_file_path = fs::path(data_part_storage_on_disk->getDiskPath()) / file_path_prefix;

View File

@ -4,6 +4,7 @@
#include <Core/NamesAndTypes.h>
#include <Interpreters/TransactionVersionMetadata.h>
#include <Storages/MergeTree/MergeTreeDataPartState.h>
#include <boost/core/noncopyable.hpp>
#include <memory>
#include <optional>
@ -70,7 +71,6 @@ class IDataPartStorage : public boost::noncopyable
{
public:
virtual ~IDataPartStorage() = default;
virtual std::shared_ptr<IDataPartStorage> clone() const = 0;
/// Methods to get path components of a data part.
virtual std::string getFullPath() const = 0; /// '/var/lib/clickhouse/data/database/table/moving/all_1_5_1'
@ -81,8 +81,7 @@ public:
/// virtual std::string getRelativeRootPath() const = 0;
/// Get a storage for projection.
virtual std::shared_ptr<const IDataPartStorage> getProjection(const std::string & name) const = 0;
virtual std::shared_ptr<IDataPartStorage> getProjection(const std::string & name) = 0;
virtual std::shared_ptr<IDataPartStorage> getProjection(const std::string & name) const = 0;
/// Part directory exists.
virtual bool exists() const = 0;
@ -132,10 +131,9 @@ public:
/// TODO: remove it.
virtual std::optional<String> getRelativePathForPrefix(Poco::Logger * log, const String & prefix, bool detached, bool broken) const = 0;
/// Reset part directory, used for im-memory parts.
/// Reset part directory, used for in-memory parts.
/// TODO: remove it.
virtual void setRelativePath(const std::string & path) = 0;
virtual void onRename(const std::string & new_root_path, const std::string & new_part_dir) = 0;
/// Some methods from IDisk. Needed to avoid getting internal IDisk interface.
virtual std::string getDiskName() const = 0;
@ -144,7 +142,8 @@ public:
virtual bool supportZeroCopyReplication() const { return false; }
virtual bool supportParallelWrite() const = 0;
virtual bool isBroken() const = 0;
virtual void syncRevision(UInt64 revision) = 0;
/// TODO: remove or at least remove const.
virtual void syncRevision(UInt64 revision) const = 0;
virtual UInt64 getRevision() const = 0;
virtual std::unordered_map<String, String> getSerializedMetadata(const std::vector<String> & paths) const = 0;
/// Get a path for internal disk if relevant. It is used mainly for logging.
@ -156,8 +155,9 @@ public:
/// Reserve space on the same disk.
/// Probably we should try to remove it later.
virtual ReservationPtr reserve(UInt64 /*bytes*/) { return nullptr; }
virtual ReservationPtr tryReserve(UInt64 /*bytes*/) { return nullptr; }
/// TODO: remove constness
virtual ReservationPtr reserve(UInt64 /*bytes*/) const { return nullptr; }
virtual ReservationPtr tryReserve(UInt64 /*bytes*/) const { return nullptr; }
virtual size_t getVolumeIndex(const IStoragePolicy &) const { return 0; }
/// Some methods which change data part internals possibly after creation.
@ -234,8 +234,6 @@ public:
/// Ideally, new_root_path should be the same as current root (but it is not true).
/// Examples are: 'all_1_2_1' -> 'detached/all_1_2_1'
/// 'moving/tmp_all_1_2_1' -> 'all_1_2_1'
///
/// To notify storage also call onRename for it with first two args
virtual void rename(
const std::string & new_root_path,
const std::string & new_part_dir,
@ -251,4 +249,22 @@ public:
using DataPartStoragePtr = std::shared_ptr<const IDataPartStorage>;
using MutableDataPartStoragePtr = std::shared_ptr<IDataPartStorage>;
class DataPartStorageHolder : public boost::noncopyable
{
public:
explicit DataPartStorageHolder(MutableDataPartStoragePtr storage_)
: storage(std::move(storage_))
{
}
IDataPartStorage & getDataPartStorage() { return *storage; }
const IDataPartStorage & getDataPartStorage() const { return *storage; }
MutableDataPartStoragePtr getDataPartStoragePtr() { return storage; }
DataPartStoragePtr getDataPartStoragePtr() const { return storage; }
private:
MutableDataPartStoragePtr storage;
};
}

View File

@ -1,4 +1,5 @@
#include "IMergeTreeDataPart.h"
#include "Storages/MergeTree/IDataPartStorage.h"
#include <optional>
#include <boost/algorithm/string/join.hpp>
@ -101,7 +102,7 @@ void IMergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const Par
}
IMergeTreeDataPart::MinMaxIndex::WrittenFiles IMergeTreeDataPart::MinMaxIndex::store(
const MergeTreeData & data, const MutableDataPartStoragePtr & part_storage, Checksums & out_checksums) const
const MergeTreeData & data, IDataPartStorage & part_storage, Checksums & out_checksums) const
{
auto metadata_snapshot = data.getInMemoryMetadataPtr();
const auto & partition_key = metadata_snapshot->getPartitionKey();
@ -115,14 +116,14 @@ IMergeTreeDataPart::MinMaxIndex::WrittenFiles IMergeTreeDataPart::MinMaxIndex::s
IMergeTreeDataPart::MinMaxIndex::WrittenFiles IMergeTreeDataPart::MinMaxIndex::store(
const Names & column_names,
const DataTypes & data_types,
const MutableDataPartStoragePtr & part_storage,
IDataPartStorage & part_storage,
Checksums & out_checksums) const
{
if (!initialized)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to store uninitialized MinMax index for part {}. This is a bug",
part_storage->getFullPath());
part_storage.getFullPath());
WrittenFiles written_files;
@ -131,7 +132,7 @@ IMergeTreeDataPart::MinMaxIndex::WrittenFiles IMergeTreeDataPart::MinMaxIndex::s
String file_name = "minmax_" + escapeForFileName(column_names[i]) + ".idx";
auto serialization = data_types.at(i)->getDefaultSerialization();
auto out = part_storage->writeFile(file_name, DBMS_DEFAULT_BUFFER_SIZE, {});
auto out = part_storage.writeFile(file_name, DBMS_DEFAULT_BUFFER_SIZE, {});
HashingWriteBuffer out_hashing(*out);
serialization->serializeBinary(hyperrectangle[i].left, out_hashing);
serialization->serializeBinary(hyperrectangle[i].right, out_hashing);
@ -304,10 +305,10 @@ IMergeTreeDataPart::IMergeTreeDataPart(
const MutableDataPartStoragePtr & data_part_storage_,
Type part_type_,
const IMergeTreeDataPart * parent_part_)
: storage(storage_)
: DataPartStorageHolder(data_part_storage_)
, storage(storage_)
, name(name_)
, info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
, data_part_storage(parent_part_ ? parent_part_->data_part_storage : data_part_storage_)
, index_granularity_info(storage_, part_type_)
, part_type(part_type_)
, parent_part(parent_part_)
@ -332,10 +333,10 @@ IMergeTreeDataPart::IMergeTreeDataPart(
const MutableDataPartStoragePtr & data_part_storage_,
Type part_type_,
const IMergeTreeDataPart * parent_part_)
: storage(storage_)
: DataPartStorageHolder(data_part_storage_)
, storage(storage_)
, name(name_)
, info(info_)
, data_part_storage(data_part_storage_)
, index_granularity_info(storage_, part_type_)
, part_type(part_type_)
, parent_part(parent_part_)
@ -343,6 +344,7 @@ IMergeTreeDataPart::IMergeTreeDataPart(
{
if (parent_part)
state = MergeTreeDataPartState::Active;
incrementStateMetric(state);
incrementTypeMetric(part_type);
@ -506,17 +508,17 @@ void IMergeTreeDataPart::removeIfNeeded()
std::string path;
try
{
path = data_part_storage->getRelativePath();
path = getDataPartStorage().getRelativePath();
if (!data_part_storage->exists()) // path
if (!getDataPartStorage().exists()) // path
return;
if (is_temp)
{
String file_name = fileName(data_part_storage->getPartDirectory());
String file_name = fileName(getDataPartStorage().getPartDirectory());
if (file_name.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "relative_path {} of part {} is invalid or not set", data_part_storage->getPartDirectory(), name);
throw Exception(ErrorCodes::LOGICAL_ERROR, "relative_path {} of part {} is invalid or not set", getDataPartStorage().getPartDirectory(), name);
if (!startsWith(file_name, "tmp") && !endsWith(file_name, ".tmp_proj"))
{
@ -621,7 +623,7 @@ String IMergeTreeDataPart::getColumnNameWithMinimumCompressedSize(bool with_subc
}
if (!minimum_size_column)
throw Exception("Could not find a column of minimum size in MergeTree, part " + data_part_storage->getFullPath(), ErrorCodes::LOGICAL_ERROR);
throw Exception("Could not find a column of minimum size in MergeTree, part " + getDataPartStorage().getFullPath(), ErrorCodes::LOGICAL_ERROR);
return *minimum_size_column;
}
@ -699,9 +701,9 @@ void IMergeTreeDataPart::loadProjections(bool require_columns_checksums, bool ch
for (const auto & projection : metadata_snapshot->projections)
{
String path = /*getRelativePath() + */ projection.name + ".proj";
if (data_part_storage->exists(path))
if (getDataPartStorage().exists(path))
{
auto projection_part_storage = data_part_storage->getProjection(projection.name + ".proj");
auto projection_part_storage = getDataPartStorage().getProjection(projection.name + ".proj");
auto part = storage.createPart(projection.name, {"all", 0, 0, 0}, projection_part_storage, this);
part->loadColumnsChecksumsIndexes(require_columns_checksums, check_consistency);
projection_parts.emplace(projection.name, std::move(part));
@ -742,8 +744,8 @@ void IMergeTreeDataPart::loadIndex()
loaded_index[i]->reserve(index_granularity.getMarksCount());
}
String index_name = "primary" + getIndexExtensionFromFilesystem(data_part_storage).value();
String index_path = fs::path(data_part_storage->getRelativePath()) / index_name;
String index_name = "primary" + getIndexExtensionFromFilesystem(getDataPartStorage()).value();
String index_path = fs::path(getDataPartStorage().getRelativePath()) / index_name;
auto index_file = metadata_manager->read(index_name);
size_t marks_count = index_granularity.getMarksCount();
@ -782,7 +784,7 @@ void IMergeTreeDataPart::appendFilesOfIndex(Strings & files) const
if (metadata_snapshot->hasPrimaryKey())
{
String index_name = "primary" + getIndexExtensionFromFilesystem(data_part_storage).value();
String index_name = "primary" + getIndexExtensionFromFilesystem(getDataPartStorage()).value();
files.push_back(index_name);
}
}
@ -794,10 +796,10 @@ NameSet IMergeTreeDataPart::getFileNamesWithoutChecksums() const
NameSet result = {"checksums.txt", "columns.txt"};
if (data_part_storage->exists(DEFAULT_COMPRESSION_CODEC_FILE_NAME))
if (getDataPartStorage().exists(DEFAULT_COMPRESSION_CODEC_FILE_NAME))
result.emplace(DEFAULT_COMPRESSION_CODEC_FILE_NAME);
if (data_part_storage->exists(TXN_VERSION_METADATA_FILE_NAME))
if (getDataPartStorage().exists(TXN_VERSION_METADATA_FILE_NAME))
result.emplace(TXN_VERSION_METADATA_FILE_NAME);
return result;
@ -812,7 +814,7 @@ void IMergeTreeDataPart::loadDefaultCompressionCodec()
return;
}
String path = fs::path(data_part_storage->getRelativePath()) / DEFAULT_COMPRESSION_CODEC_FILE_NAME;
String path = fs::path(getDataPartStorage().getRelativePath()) / DEFAULT_COMPRESSION_CODEC_FILE_NAME;
bool exists = metadata_manager->exists(DEFAULT_COMPRESSION_CODEC_FILE_NAME);
if (!exists)
{
@ -881,7 +883,7 @@ CompressionCodecPtr IMergeTreeDataPart::detectDefaultCompressionCodec() const
String candidate_path = /*fs::path(getRelativePath()) */ (ISerialization::getFileNameForStream(part_column, substream_path) + ".bin");
/// We can have existing, but empty .bin files. Example: LowCardinality(Nullable(...)) columns and column_name.dict.null.bin file.
if (data_part_storage->exists(candidate_path) && data_part_storage->getFileSize(candidate_path) != 0)
if (getDataPartStorage().exists(candidate_path) && getDataPartStorage().getFileSize(candidate_path) != 0)
path_to_data_file = candidate_path;
}
});
@ -892,7 +894,7 @@ CompressionCodecPtr IMergeTreeDataPart::detectDefaultCompressionCodec() const
continue;
}
result = getCompressionCodecForFile(data_part_storage, path_to_data_file);
result = getCompressionCodecForFile(getDataPartStorage(), path_to_data_file);
break;
}
}
@ -937,7 +939,7 @@ void IMergeTreeDataPart::loadPartitionAndMinMaxIndex()
String calculated_partition_id = partition.getID(metadata_snapshot->getPartitionKey().sample_block);
if (calculated_partition_id != info.partition_id)
throw Exception(
"While loading part " + data_part_storage->getFullPath() + ": calculated partition ID: " + calculated_partition_id
"While loading part " + getDataPartStorage().getFullPath() + ": calculated partition ID: " + calculated_partition_id
+ " differs from partition ID in part name: " + info.partition_id,
ErrorCodes::CORRUPTED_DATA);
}
@ -966,7 +968,7 @@ void IMergeTreeDataPart::loadChecksums(bool require)
bytes_on_disk = checksums.getTotalSizeOnDisk();
}
else
bytes_on_disk = data_part_storage->calculateTotalSizeOnDisk();
bytes_on_disk = getDataPartStorage().calculateTotalSizeOnDisk();
}
else
{
@ -978,7 +980,7 @@ void IMergeTreeDataPart::loadChecksums(bool require)
LOG_WARNING(storage.log, "Checksums for part {} not found. Will calculate them from data on disk.", name);
checksums = checkDataPart(shared_from_this(), false);
data_part_storage->writeChecksums(checksums, {});
getDataPartStorage().writeChecksums(checksums, {});
bytes_on_disk = checksums.getTotalSizeOnDisk();
}
@ -1063,7 +1065,7 @@ void IMergeTreeDataPart::loadRowsCount()
}
else
{
if (data_part_storage->exists("count.txt"))
if (getDataPartStorage().exists("count.txt"))
{
read_rows_count();
return;
@ -1162,7 +1164,7 @@ void IMergeTreeDataPart::appendFilesOfUUID(Strings & files)
void IMergeTreeDataPart::loadColumns(bool require)
{
String path = fs::path(data_part_storage->getRelativePath()) / "columns.txt";
String path = fs::path(getDataPartStorage().getRelativePath()) / "columns.txt";
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
if (parent_part)
metadata_snapshot = metadata_snapshot->projections.get(name).metadata;
@ -1173,18 +1175,18 @@ void IMergeTreeDataPart::loadColumns(bool require)
{
/// We can get list of columns only from columns.txt in compact parts.
if (require || part_type == Type::Compact)
throw Exception("No columns.txt in part " + name + ", expected path " + path + " on drive " + data_part_storage->getDiskName(),
throw Exception("No columns.txt in part " + name + ", expected path " + path + " on drive " + getDataPartStorage().getDiskName(),
ErrorCodes::NO_FILE_IN_DATA_PART);
/// If there is no file with a list of columns, write it down.
for (const NameAndTypePair & column : metadata_snapshot->getColumns().getAllPhysical())
if (data_part_storage->exists(getFileNameForColumn(column) + ".bin"))
if (getDataPartStorage().exists(getFileNameForColumn(column) + ".bin"))
loaded_columns.push_back(column);
if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
data_part_storage->writeColumns(loaded_columns, {});
getDataPartStorage().writeColumns(loaded_columns, {});
}
else
{
@ -1228,7 +1230,7 @@ void IMergeTreeDataPart::assertHasVersionMetadata(MergeTreeTransaction * txn) co
name, storage.getStorageID().getNameForLogs(), version.creation_tid, txn ? txn->dumpDescription() : "<none>");
assert(!txn || storage.supportsTransactions());
assert(!txn || data_part_storage->exists(TXN_VERSION_METADATA_FILE_NAME));
assert(!txn || getDataPartStorage().exists(TXN_VERSION_METADATA_FILE_NAME));
}
void IMergeTreeDataPart::storeVersionMetadata(bool force) const
@ -1243,7 +1245,7 @@ void IMergeTreeDataPart::storeVersionMetadata(bool force) const
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Transactions are not supported for in-memory parts (table: {}, part: {})",
storage.getStorageID().getNameForLogs(), name);
data_part_storage->writeVersionMetadata(version, storage.getSettings()->fsync_part_directory);
getDataPartStorage().writeVersionMetadata(version, storage.getSettings()->fsync_part_directory);
}
void IMergeTreeDataPart::appendCSNToVersionMetadata(VersionMetadata::WhichCSN which_csn) const
@ -1255,7 +1257,7 @@ void IMergeTreeDataPart::appendCSNToVersionMetadata(VersionMetadata::WhichCSN wh
chassert(!(which_csn == VersionMetadata::WhichCSN::REMOVAL && version.removal_csn == 0));
chassert(isStoredOnDisk());
data_part_storage->appendCSNToVersionMetadata(version, which_csn);
getDataPartStorage().appendCSNToVersionMetadata(version, which_csn);
}
void IMergeTreeDataPart::appendRemovalTIDToVersionMetadata(bool clear) const
@ -1278,13 +1280,13 @@ void IMergeTreeDataPart::appendRemovalTIDToVersionMetadata(bool clear) const
else
LOG_TEST(storage.log, "Appending removal TID for {} (creation: {}, removal {})", name, version.creation_tid, version.removal_tid);
data_part_storage->appendRemovalTIDToVersionMetadata(version, clear);
getDataPartStorage().appendRemovalTIDToVersionMetadata(version, clear);
}
void IMergeTreeDataPart::loadVersionMetadata() const
try
{
data_part_storage->loadVersionMetadata(version, storage.log);
getDataPartStorage().loadVersionMetadata(version, storage.log);
}
catch (Exception & e)
{
@ -1321,15 +1323,15 @@ bool IMergeTreeDataPart::assertHasValidVersionMetadata() const
if (state == MergeTreeDataPartState::Temporary)
return true;
if (!data_part_storage->exists())
if (!getDataPartStorage().exists())
return true;
String content;
String version_file_name = TXN_VERSION_METADATA_FILE_NAME;
try
{
size_t file_size = data_part_storage->getFileSize(TXN_VERSION_METADATA_FILE_NAME);
auto buf = data_part_storage->readFile(TXN_VERSION_METADATA_FILE_NAME, ReadSettings().adjustBufferSize(file_size), file_size, std::nullopt);
size_t file_size = getDataPartStorage().getFileSize(TXN_VERSION_METADATA_FILE_NAME);
auto buf = getDataPartStorage().readFile(TXN_VERSION_METADATA_FILE_NAME, ReadSettings().adjustBufferSize(file_size), file_size, std::nullopt);
readStringUntilEOF(content, *buf);
ReadBufferFromString str_buf{content};
@ -1363,10 +1365,10 @@ void IMergeTreeDataPart::appendFilesOfColumns(Strings & files)
bool IMergeTreeDataPart::shallParticipateInMerges(const StoragePolicyPtr & storage_policy) const
{
return data_part_storage->shallParticipateInMerges(*storage_policy);
return getDataPartStorage().shallParticipateInMerges(*storage_policy);
}
void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const
void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_new_dir_if_exists)
try
{
assertOnDisk();
@ -1377,20 +1379,21 @@ try
if (parent_part)
{
/// For projections, move is only possible inside parent part dir.
relative_path = parent_part->data_part_storage->getRelativePath();
relative_path = parent_part->getDataPartStorage().getRelativePath();
}
String from = data_part_storage->getRelativePath();
auto to = fs::path(relative_path) / new_relative_path;
metadata_manager->deleteAll(true);
metadata_manager->assertAllDeleted(true);
data_part_storage->rename(to.parent_path(), to.filename(), storage.log, remove_new_dir_if_exists, fsync_dir);
data_part_storage->onRename(to.parent_path(), to.filename());
getDataPartStorage().rename(to.parent_path(), to.filename(), storage.log, remove_new_dir_if_exists, fsync_dir);
metadata_manager->updateAll(true);
for (const auto & [p_name, part] : projection_parts)
part->data_part_storage = data_part_storage->getProjection(p_name + ".proj");
auto old_projection_root_path = getDataPartStorage().getRelativePath();
auto new_projection_root_path = to.string();
for (const auto & [_, part] : projection_parts)
part->getDataPartStorage().changeRootPath(old_projection_root_path, new_projection_root_path);
}
catch (...)
{
@ -1431,14 +1434,14 @@ void IMergeTreeDataPart::initializePartMetadataManager()
void IMergeTreeDataPart::initializeIndexGranularityInfo()
{
auto mrk_ext = MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(data_part_storage);
auto mrk_ext = MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(getDataPartStorage());
if (mrk_ext)
index_granularity_info = MergeTreeIndexGranularityInfo(storage, MarkType{*mrk_ext});
else
index_granularity_info = MergeTreeIndexGranularityInfo(storage, part_type);
}
void IMergeTreeDataPart::remove() const
void IMergeTreeDataPart::remove()
{
assert(assertHasValidVersionMetadata());
part_is_probably_removed_from_disk = true;
@ -1455,7 +1458,6 @@ void IMergeTreeDataPart::remove() const
return CanRemoveDescription{.can_remove_anything = can_remove, .files_not_to_remove = files_not_to_remove };
};
if (!isStoredOnDisk())
return;
@ -1474,7 +1476,7 @@ void IMergeTreeDataPart::remove() const
projection_checksums.emplace_back(IDataPartStorage::ProjectionChecksums{.name = p_name, .checksums = projection_part->checksums});
}
data_part_storage->remove(std::move(can_remove_callback), checksums, projection_checksums, is_temp, getState(), storage.log);
getDataPartStorage().remove(std::move(can_remove_callback), checksums, projection_checksums, is_temp, getState(), storage.log);
}
std::optional<String> IMergeTreeDataPart::getRelativePathForPrefix(const String & prefix, bool detached, bool broken) const
@ -1491,7 +1493,7 @@ std::optional<String> IMergeTreeDataPart::getRelativePathForPrefix(const String
if (detached && parent_part)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot detach projection");
return data_part_storage->getRelativePathForPrefix(storage.log, prefix, detached, broken);
return getDataPartStorage().getRelativePathForPrefix(storage.log, prefix, detached, broken);
}
std::optional<String> IMergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix, bool broken) const
@ -1506,7 +1508,7 @@ std::optional<String> IMergeTreeDataPart::getRelativePathForDetachedPart(const S
return {};
}
void IMergeTreeDataPart::renameToDetached(const String & prefix) const
void IMergeTreeDataPart::renameToDetached(const String & prefix)
{
auto path_to_detach = getRelativePathForDetachedPart(prefix, /* broken */ false);
assert(path_to_detach);
@ -1529,7 +1531,7 @@ void IMergeTreeDataPart::makeCloneInDetached(const String & prefix, const Storag
if (!maybe_path_in_detached)
return;
data_part_storage->freeze(
getDataPartStorage().freeze(
storage.relative_data_path,
*maybe_path_in_detached,
/*make_source_readonly*/ true,
@ -1542,13 +1544,13 @@ MutableDataPartStoragePtr IMergeTreeDataPart::makeCloneOnDisk(const DiskPtr & di
{
assertOnDisk();
if (disk->getName() == data_part_storage->getDiskName())
throw Exception("Can not clone data part " + name + " to same disk " + data_part_storage->getDiskName(), ErrorCodes::LOGICAL_ERROR);
if (disk->getName() == getDataPartStorage().getDiskName())
throw Exception("Can not clone data part " + name + " to same disk " + getDataPartStorage().getDiskName(), ErrorCodes::LOGICAL_ERROR);
if (directory_name.empty())
throw Exception("Can not clone data part " + name + " to empty directory.", ErrorCodes::LOGICAL_ERROR);
String path_to_clone = fs::path(storage.relative_data_path) / directory_name / "";
return data_part_storage->clonePart(path_to_clone, data_part_storage->getPartDirectory(), disk, storage.log);
return getDataPartStorage().clonePart(path_to_clone, getDataPartStorage().getPartDirectory(), disk, storage.log);
}
void IMergeTreeDataPart::checkConsistencyBase() const
@ -1589,26 +1591,26 @@ void IMergeTreeDataPart::checkConsistencyBase() const
}
}
data_part_storage->checkConsistency(checksums);
getDataPartStorage().checkConsistency(checksums);
}
else
{
auto check_file_not_empty = [this](const String & file_path)
{
UInt64 file_size;
if (!data_part_storage->exists(file_path) || (file_size = data_part_storage->getFileSize(file_path)) == 0)
if (!getDataPartStorage().exists(file_path) || (file_size = getDataPartStorage().getFileSize(file_path)) == 0)
throw Exception(
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART,
"Part {} is broken: {} is empty",
data_part_storage->getFullPath(),
std::string(fs::path(data_part_storage->getFullPath()) / file_path));
getDataPartStorage().getFullPath(),
std::string(fs::path(getDataPartStorage().getFullPath()) / file_path));
return file_size;
};
/// Check that the primary key index is not empty.
if (!pk.column_names.empty())
{
String index_name = "primary" + getIndexExtensionFromFilesystem(data_part_storage).value();
String index_name = "primary" + getIndexExtensionFromFilesystem(getDataPartStorage()).value();
check_file_not_empty(index_name);
}
@ -1752,7 +1754,7 @@ bool IMergeTreeDataPart::checkAllTTLCalculated(const StorageMetadataPtr & metada
String IMergeTreeDataPart::getUniqueId() const
{
return data_part_storage->getUniqueId();
return getDataPartStorage().getUniqueId();
}
String IMergeTreeDataPart::getZeroLevelPartBlockID(std::string_view token) const
@ -1791,11 +1793,11 @@ IMergeTreeDataPart::uint128 IMergeTreeDataPart::getActualChecksumByFile(const St
return it->second.file_hash;
}
if (!data_part_storage->exists(file_name))
if (!getDataPartStorage().exists(file_name))
{
return {};
}
std::unique_ptr<ReadBufferFromFileBase> in_file = data_part_storage->readFile(file_name, {}, std::nullopt, std::nullopt);
std::unique_ptr<ReadBufferFromFileBase> in_file = getDataPartStorage().readFile(file_name, {}, std::nullopt, std::nullopt);
HashingReadBuffer in_hash(*in_file);
String value;
@ -1823,11 +1825,11 @@ bool isInMemoryPart(const MergeTreeDataPartPtr & data_part)
return (data_part && data_part->getType() == MergeTreeDataPartType::InMemory);
}
std::optional<std::string> getIndexExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage)
std::optional<std::string> getIndexExtensionFromFilesystem(const IDataPartStorage & data_part_storage)
{
if (data_part_storage->exists())
if (data_part_storage.exists())
{
for (auto it = data_part_storage->iterate(); it->isValid(); it->next())
for (auto it = data_part_storage.iterate(); it->isValid(); it->next())
{
const auto & extension = fs::path(it->name()).extension();
if (extension == getIndexExtension(false)

View File

@ -46,7 +46,7 @@ class UncompressedCache;
class MergeTreeTransaction;
/// Description of the data part.
class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPart>
class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPart>, public DataPartStorageHolder
{
public:
static constexpr auto DATA_FILE_EXTENSION = ".bin";
@ -150,7 +150,7 @@ public:
/// Throws an exception if part is not stored in on-disk format.
void assertOnDisk() const;
void remove() const;
void remove();
/// Initialize columns (from columns.txt if exists, or create from column files if not).
/// Load checksums from checksums.txt if exists. Load index if required.
@ -198,10 +198,6 @@ public:
/// processed by multiple shards.
UUID uuid = UUIDHelpers::Nil;
/// This is an object which encapsulates all the operations with disk.
/// Contains a path to stored data.
MutableDataPartStoragePtr data_part_storage;
MergeTreeIndexGranularityInfo index_granularity_info;
size_t rows_count = 0;
@ -287,8 +283,8 @@ public:
using WrittenFiles = std::vector<std::unique_ptr<WriteBufferFromFileBase>>;
[[nodiscard]] WrittenFiles store(const MergeTreeData & data, const MutableDataPartStoragePtr & part_storage, Checksums & checksums) const;
[[nodiscard]] WrittenFiles store(const Names & column_names, const DataTypes & data_types, const MutableDataPartStoragePtr & part_storage, Checksums & checksums) const;
[[nodiscard]] WrittenFiles store(const MergeTreeData & data, IDataPartStorage & part_storage, Checksums & checksums) const;
[[nodiscard]] WrittenFiles store(const Names & column_names, const DataTypes & data_types, IDataPartStorage & part_storage, Checksums & checksums) const;
void update(const Block & block, const Names & column_names);
void merge(const MinMaxIndex & other);
@ -319,11 +315,11 @@ public:
size_t getFileSizeOrZero(const String & file_name) const;
/// Moves a part to detached/ directory and adds prefix to its name
void renameToDetached(const String & prefix) const;
void renameToDetached(const String & prefix);
/// Makes checks and move part to new directory
/// Changes only relative_dir_name, you need to update other metadata (name, is_temp) explicitly
virtual void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const;
virtual void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists);
/// Makes clone of a part in detached/ directory via hard links
virtual void makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const;
@ -583,7 +579,7 @@ bool isCompactPart(const MergeTreeDataPartPtr & data_part);
bool isWidePart(const MergeTreeDataPartPtr & data_part);
bool isInMemoryPart(const MergeTreeDataPartPtr & data_part);
inline String getIndexExtension(bool is_compressed_primary_key) { return is_compressed_primary_key ? ".cidx" : ".idx"; }
std::optional<String> getIndexExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage);
std::optional<String> getIndexExtensionFromFilesystem(const IDataPartStorage & data_part_storage);
bool isCompressedFromIndexExtension(const String & index_extension);
}

View File

@ -12,7 +12,7 @@ IMergedBlockOutputStream::IMergedBlockOutputStream(
bool reset_columns_)
: storage(data_part->storage)
, metadata_snapshot(metadata_snapshot_)
, data_part_storage(data_part->data_part_storage)
, data_part_storage(data_part->getDataPartStoragePtr())
, reset_columns(reset_columns_)
{
if (reset_columns)

View File

@ -23,7 +23,7 @@ public:
bool isProjectionPart() const override { return data_part->isProjectionPart(); }
DataPartStoragePtr getDataPartStorage() const override { return data_part->data_part_storage; }
DataPartStoragePtr getDataPartStorage() const override { return data_part->getDataPartStoragePtr(); }
const NamesAndTypesList & getColumns() const override { return data_part->getColumns(); }

View File

@ -160,7 +160,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
for (auto & part_ptr : parts)
{
ttl_infos.update(part_ptr->ttl_infos);
max_volume_index = std::max(max_volume_index, part_ptr->data_part_storage->getVolumeIndex(*storage.getStoragePolicy()));
max_volume_index = std::max(max_volume_index, part_ptr->getDataPartStorage().getVolumeIndex(*storage.getStoragePolicy()));
}
/// It will live until the whole task is being destroyed

View File

@ -65,7 +65,7 @@ MergeListElement::MergeListElement(
for (const auto & source_part : future_part->parts)
{
source_part_names.emplace_back(source_part->name);
source_part_paths.emplace_back(source_part->data_part_storage->getFullPath());
source_part_paths.emplace_back(source_part->getDataPartStorage().getFullPath());
total_size_bytes_compressed += source_part->getBytesOnDisk();
total_size_marks += source_part->getMarksCount();

View File

@ -130,7 +130,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
if (global_ctx->parent_part)
{
data_part_storage = global_ctx->parent_part->data_part_storage->getProjection(local_tmp_part_basename);
data_part_storage = global_ctx->parent_part->getDataPartStorage().getProjection(local_tmp_part_basename);
}
else
{

View File

@ -59,7 +59,7 @@ public:
bool deduplicate_,
Names deduplicate_by_columns_,
MergeTreeData::MergingParams merging_params_,
const IMergeTreeDataPart * parent_part_,
IMergeTreeDataPart * parent_part_,
String suffix_,
MergeTreeTransactionPtr txn,
MergeTreeData * data_,

View File

@ -943,8 +943,8 @@ Int64 MergeTreeData::getMaxBlockNumber() const
}
void MergeTreeData::loadDataPartsFromDisk(
DataPartsVector & broken_parts_to_detach,
DataPartsVector & duplicate_parts_to_remove,
MutableDataPartsVector & broken_parts_to_detach,
MutableDataPartsVector & duplicate_parts_to_remove,
ThreadPool & pool,
size_t num_parts,
std::queue<std::vector<std::pair<String, DiskPtr>>> & parts_queue,
@ -1199,8 +1199,7 @@ void MergeTreeData::loadDataPartsFromDisk(
void MergeTreeData::loadDataPartsFromWAL(
DataPartsVector & /* broken_parts_to_detach */,
DataPartsVector & duplicate_parts_to_remove,
MutableDataPartsVector & duplicate_parts_to_remove,
MutableDataPartsVector & parts_from_wal)
{
for (auto & part : parts_from_wal)
@ -1214,7 +1213,7 @@ void MergeTreeData::loadDataPartsFromWAL(
{
if ((*it)->checksums.getTotalChecksumHex() == part->checksums.getTotalChecksumHex())
{
LOG_ERROR(log, "Remove duplicate part {}", part->data_part_storage->getFullPath());
LOG_ERROR(log, "Remove duplicate part {}", part->getDataPartStorage().getFullPath());
duplicate_parts_to_remove.push_back(part);
}
else
@ -1328,8 +1327,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
auto part_lock = lockParts();
data_parts_indexes.clear();
DataPartsVector broken_parts_to_detach;
DataPartsVector duplicate_parts_to_remove;
MutableDataPartsVector broken_parts_to_detach;
MutableDataPartsVector duplicate_parts_to_remove;
if (num_parts > 0)
loadDataPartsFromDisk(
@ -1383,7 +1382,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
parts_from_wal.insert(
parts_from_wal.end(), std::make_move_iterator(disk_wal_parts.begin()), std::make_move_iterator(disk_wal_parts.end()));
loadDataPartsFromWAL(broken_parts_to_detach, duplicate_parts_to_remove, parts_from_wal);
loadDataPartsFromWAL(duplicate_parts_to_remove, parts_from_wal);
num_parts += parts_from_wal.size();
}
@ -1684,6 +1683,14 @@ scope_guard MergeTreeData::getTemporaryPartDirectoryHolder(const String & part_d
return [this, part_dir_name]() { temporary_parts.remove(part_dir_name); };
}
MergeTreeData::MutableDataPartPtr MergeTreeData::preparePartForRemoval(const DataPartPtr & part)
{
if (part->getState() != DataPartState::Deleting)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot remove part {}, because it has state: {}", part->name, magic_enum::enum_name(part->getState()));
return std::const_pointer_cast<IMergeTreeDataPart>(part);
}
MergeTreeData::DataPartsVector MergeTreeData::grabOldParts(bool force)
{
@ -1859,7 +1866,7 @@ void MergeTreeData::flushAllInMemoryPartsIfNeeded()
{
if (auto part_in_memory = asInMemoryPart(part))
{
part_in_memory->flushToDisk(part_in_memory->data_part_storage->getPartDirectory(), metadata_snapshot);
part_in_memory->flushToDisk(part_in_memory->getDataPartStorage().getPartDirectory(), metadata_snapshot);
}
}
}
@ -1943,7 +1950,7 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
part->remove();
preparePartForRemoval(part)->remove();
if (part_names_succeed)
{
std::lock_guard lock(part_names_mutex);
@ -1959,7 +1966,7 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
LOG_DEBUG(log, "Removing {} parts from filesystem: {}", parts_to_remove.size(), fmt::join(parts_to_remove, ", "));
for (const DataPartPtr & part : parts_to_remove)
{
part->remove();
preparePartForRemoval(part)->remove();
if (part_names_succeed)
part_names_succeed->insert(part->name);
}
@ -2139,11 +2146,14 @@ void MergeTreeData::rename(const String & new_table_path, const StorageID & new_
if (!getStorageID().hasUUID())
getContext()->dropCaches();
/// TODO: remove const_cast
for (const auto & part : data_parts_by_info)
part->data_part_storage->changeRootPath(relative_data_path, new_table_path);
{
auto & part_mutable = const_cast<IMergeTreeDataPart &>(*part);
part_mutable.getDataPartStorage().changeRootPath(relative_data_path, new_table_path);
}
relative_data_path = new_table_path;
renameInMemory(new_table_id);
}
@ -2744,7 +2754,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
const MutableDataPartStoragePtr & data_part_storage, const IMergeTreeDataPart * parent_part) const
{
MergeTreeDataPartType type;
auto mrk_ext = MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(data_part_storage);
auto mrk_ext = MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(*data_part_storage);
if (mrk_ext)
{
@ -2983,7 +2993,7 @@ void MergeTreeData::preparePartForCommit(MutableDataPartPtr & part, Transaction
assert([&]()
{
String dir_name = fs::path(part->data_part_storage->getRelativePath()).filename();
String dir_name = fs::path(part->getDataPartStorage().getRelativePath()).filename();
bool may_be_cleaned_up = dir_name.starts_with("tmp_") || dir_name.starts_with("tmp-fetch_");
return !may_be_cleaned_up || temporary_parts.contains(dir_name);
}());
@ -3000,7 +3010,7 @@ bool MergeTreeData::renameTempPartAndReplaceImpl(
DataPartsLock & lock,
DataPartsVector * out_covered_parts)
{
LOG_TRACE(log, "Renaming temporary part {} to {}.", part->data_part_storage->getPartDirectory(), part->name);
LOG_TRACE(log, "Renaming temporary part {} to {}.", part->getDataPartStorage().getPartDirectory(), part->name);
if (&out_transaction.data != this)
throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
@ -3231,9 +3241,9 @@ void MergeTreeData::outdateBrokenPartAndCloneToDetached(const DataPartPtr & part
{
auto metadata_snapshot = getInMemoryMetadataPtr();
if (prefix.empty())
LOG_INFO(log, "Cloning part {} to {} and making it obsolete.", part_to_detach->data_part_storage->getPartDirectory(), part_to_detach->name);
LOG_INFO(log, "Cloning part {} to {} and making it obsolete.", part_to_detach->getDataPartStorage().getPartDirectory(), part_to_detach->name);
else
LOG_INFO(log, "Cloning part {} to {}_{} and making it obsolete.", part_to_detach->data_part_storage->getPartDirectory(), prefix, part_to_detach->name);
LOG_INFO(log, "Cloning part {} to {}_{} and making it obsolete.", part_to_detach->getDataPartStorage().getPartDirectory(), prefix, part_to_detach->name);
part_to_detach->makeCloneInDetached(prefix, metadata_snapshot);
@ -3245,9 +3255,9 @@ void MergeTreeData::outdateBrokenPartAndCloneToDetached(const DataPartPtr & part
void MergeTreeData::forcefullyMovePartToDetachedAndRemoveFromMemory(const MergeTreeData::DataPartPtr & part_to_detach, const String & prefix, bool restore_covered)
{
if (prefix.empty())
LOG_INFO(log, "Renaming {} to {} and forgetting it.", part_to_detach->data_part_storage->getPartDirectory(), part_to_detach->name);
LOG_INFO(log, "Renaming {} to {} and forgetting it.", part_to_detach->getDataPartStorage().getPartDirectory(), part_to_detach->name);
else
LOG_INFO(log, "Renaming {} to {}_{} and forgetting it.", part_to_detach->data_part_storage->getPartDirectory(), prefix, part_to_detach->name);
LOG_INFO(log, "Renaming {} to {}_{} and forgetting it.", part_to_detach->getDataPartStorage().getPartDirectory(), prefix, part_to_detach->name);
auto lock = lockParts();
bool removed_active_part = false;
@ -3270,9 +3280,7 @@ void MergeTreeData::forcefullyMovePartToDetachedAndRemoveFromMemory(const MergeT
}
modifyPartState(it_part, DataPartState::Deleting);
part->renameToDetached(prefix);
preparePartForRemoval(part)->renameToDetached(prefix);
data_parts_indexes.erase(it_part);
if (restore_covered && part->info.level == 0)
@ -3426,7 +3434,7 @@ void MergeTreeData::tryRemovePartImmediately(DataPartPtr && part)
try
{
part_to_delete->remove();
preparePartForRemoval(part_to_delete)->remove();
}
catch (...)
{
@ -3636,9 +3644,9 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
/// when allow_remote_fs_zero_copy_replication turned on and off again
original_active_part->force_keep_shared_data = false;
if (original_active_part->data_part_storage->supportZeroCopyReplication() &&
part_copy->data_part_storage->supportZeroCopyReplication() &&
original_active_part->data_part_storage->getUniqueId() == part_copy->data_part_storage->getUniqueId())
if (original_active_part->getDataPartStorage().supportZeroCopyReplication() &&
part_copy->getDataPartStorage().supportZeroCopyReplication() &&
original_active_part->getDataPartStorage().getUniqueId() == part_copy->getDataPartStorage().getUniqueId())
{
/// May be when several volumes use the same S3/HDFS storage
original_active_part->force_keep_shared_data = true;
@ -3658,7 +3666,7 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
/// All other locks are taken in StorageReplicatedMergeTree
lockSharedData(*part_copy);
original_active_part->data_part_storage->writeDeleteOnDestroyMarker(log);
original_active_part->getDataPartStorage().writeDeleteOnDestroyMarker(log);
return;
}
}
@ -3792,9 +3800,9 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na
static void loadPartAndFixMetadataImpl(MergeTreeData::MutableDataPartPtr part)
{
part->loadColumnsChecksumsIndexes(false, true);
part->modification_time = part->data_part_storage->getLastModified().epochTime();
part->data_part_storage->removeDeleteOnDestroyMarker();
part->data_part_storage->removeVersionMetadata();
part->modification_time = part->getDataPartStorage().getLastModified().epochTime();
part->getDataPartStorage().removeDeleteOnDestroyMarker();
part->getDataPartStorage().removeVersionMetadata();
}
void MergeTreeData::calculateColumnAndSecondaryIndexSizesImpl()
@ -3954,7 +3962,7 @@ void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String &
auto disk = getStoragePolicy()->getDiskByName(name);
std::erase_if(parts, [&](auto part_ptr)
{
return part_ptr->data_part_storage->getDiskName() == disk->getName();
return part_ptr->getDataPartStorage().getDiskName() == disk->getName();
});
if (parts.empty())
@ -4004,7 +4012,7 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
{
for (const auto & disk : volume->getDisks())
{
if (part_ptr->data_part_storage->getDiskName() == disk->getName())
if (part_ptr->getDataPartStorage().getDiskName() == disk->getName())
{
return true;
}
@ -4201,7 +4209,7 @@ BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, con
make_temporary_hard_links = false;
hold_storage_and_part_ptrs = true;
}
else if (supportsReplication() && part->data_part_storage->supportZeroCopyReplication() && getSettings()->allow_remote_fs_zero_copy_replication)
else if (supportsReplication() && part->getDataPartStorage().supportZeroCopyReplication() && getSettings()->allow_remote_fs_zero_copy_replication)
{
/// Hard links don't work correctly with zero copy replication.
make_temporary_hard_links = false;
@ -4213,7 +4221,7 @@ BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, con
table_lock = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
BackupEntries backup_entries_from_part;
part->data_part_storage->backup(
part->getDataPartStorage().backup(
part->checksums,
part->getFileNamesWithoutChecksums(),
data_path_in_backup,
@ -4224,7 +4232,7 @@ BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, con
auto projection_parts = part->getProjectionParts();
for (const auto & [projection_name, projection_part] : projection_parts)
{
projection_part->data_part_storage->backup(
projection_part->getDataPartStorage().backup(
projection_part->checksums,
projection_part->getFileNamesWithoutChecksums(),
fs::path{data_path_in_backup} / part->name,
@ -4900,16 +4908,16 @@ ReservationPtr MergeTreeData::reserveSpace(UInt64 expected_size, SpacePtr space)
return checkAndReturnReservation(expected_size, std::move(reservation));
}
ReservationPtr MergeTreeData::reserveSpace(UInt64 expected_size, const MutableDataPartStoragePtr & data_part_storage)
ReservationPtr MergeTreeData::reserveSpace(UInt64 expected_size, const IDataPartStorage & data_part_storage)
{
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
return data_part_storage->reserve(expected_size);
return data_part_storage.reserve(expected_size);
}
ReservationPtr MergeTreeData::tryReserveSpace(UInt64 expected_size, const MutableDataPartStoragePtr & data_part_storage)
ReservationPtr MergeTreeData::tryReserveSpace(UInt64 expected_size, const IDataPartStorage & data_part_storage)
{
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
return data_part_storage->tryReserve(expected_size);
return data_part_storage.tryReserve(expected_size);
}
ReservationPtr MergeTreeData::tryReserveSpace(UInt64 expected_size, SpacePtr space)
@ -5058,11 +5066,11 @@ bool MergeTreeData::isPartInTTLDestination(const TTLDescription & ttl, const IMe
if (ttl.destination_type == DataDestinationType::VOLUME)
{
for (const auto & disk : policy->getVolumeByName(ttl.destination_name)->getDisks())
if (disk->getName() == part.data_part_storage->getDiskName())
if (disk->getName() == part.getDataPartStorage().getDiskName())
return true;
}
else if (ttl.destination_type == DataDestinationType::DISK)
return policy->getDiskByName(ttl.destination_name)->getName() == part.data_part_storage->getDiskName();
return policy->getDiskByName(ttl.destination_name)->getName() == part.getDataPartStorage().getDiskName();
return false;
}
@ -5134,7 +5142,7 @@ void MergeTreeData::Transaction::rollbackPartsToTemporaryState()
WriteBufferFromOwnString buf;
buf << " Rollbacking parts state to temporary and removing from working set:";
for (const auto & part : precommitted_parts)
buf << " " << part->data_part_storage->getPartDirectory();
buf << " " << part->getDataPartStorage().getPartDirectory();
buf << ".";
LOG_DEBUG(data.log, "Undoing transaction.{}", buf.str());
@ -5159,7 +5167,7 @@ void MergeTreeData::Transaction::rollback()
WriteBufferFromOwnString buf;
buf << " Removing parts:";
for (const auto & part : precommitted_parts)
buf << " " << part->data_part_storage->getPartDirectory();
buf << " " << part->getDataPartStorage().getPartDirectory();
buf << ".";
LOG_DEBUG(data.log, "Undoing transaction.{}", buf.str());
@ -5188,8 +5196,8 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(MergeTreeData:
auto * owing_parts_lock = acquired_parts_lock ? acquired_parts_lock : &parts_lock;
for (const auto & part : precommitted_parts)
if (part->data_part_storage->hasActiveTransaction())
part->data_part_storage->commitTransaction();
if (part->getDataPartStorage().hasActiveTransaction())
part->getDataPartStorage().commitTransaction();
bool commit_to_wal = has_in_memory_parts && settings->in_memory_parts_enable_wal;
if (txn || commit_to_wal)
@ -6210,7 +6218,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAn
bool does_storage_policy_allow_same_disk = false;
for (const DiskPtr & disk : getStoragePolicy()->getDisks())
{
if (disk->getName() == src_part->data_part_storage->getDiskName())
if (disk->getName() == src_part->getDataPartStorage().getDiskName())
{
does_storage_policy_allow_same_disk = true;
break;
@ -6220,7 +6228,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAn
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Could not clone and load part {} because disk does not belong to storage policy",
quoteString(src_part->data_part_storage->getFullPath()));
quoteString(src_part->getDataPartStorage().getFullPath()));
String dst_part_name = src_part->getNewName(dst_part_info);
assert(!tmp_part_prefix.empty());
@ -6228,9 +6236,8 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAn
auto temporary_directory_lock = getTemporaryPartDirectoryHolder(tmp_dst_part_name);
/// Why it is needed if we only hardlink files?
auto reservation = src_part->data_part_storage->reserve(src_part->getBytesOnDisk());
auto src_part_storage = src_part->data_part_storage;
auto reservation = src_part->getDataPartStorage().reserve(src_part->getBytesOnDisk());
auto src_part_storage = src_part->getDataPartStoragePtr();
/// If source part is in memory, flush it to disk and clone it already in on-disk format
if (auto src_part_in_memory = asInMemoryPart(src_part))
@ -6257,7 +6264,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAn
hardlinked_files->source_part_name = src_part->name;
hardlinked_files->source_table_shared_id = src_part->storage.getTableSharedID();
for (auto it = src_part->data_part_storage->iterate(); it->isValid(); it->next())
for (auto it = src_part->getDataPartStorage().iterate(); it->isValid(); it->next())
{
if (!files_to_copy_instead_of_hardlinks.contains(it->name())
&& it->name() != IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME
@ -6316,14 +6323,14 @@ Strings MergeTreeData::getDataPaths() const
void MergeTreeData::reportBrokenPart(MergeTreeData::DataPartPtr & data_part) const
{
if (data_part->data_part_storage && data_part->data_part_storage->isBroken())
if (data_part->getDataPartStorage().isBroken())
{
auto parts = getDataPartsForInternalUsage();
LOG_WARNING(log, "Scanning parts to recover on broken disk {}@{}.", data_part->data_part_storage->getDiskName(), data_part->data_part_storage->getDiskPath());
LOG_WARNING(log, "Scanning parts to recover on broken disk {}@{}.", data_part->getDataPartStorage().getDiskName(), data_part->getDataPartStorage().getDiskPath());
for (const auto & part : parts)
{
if (part->data_part_storage && part->data_part_storage->getDiskName() == data_part->data_part_storage->getDiskName())
if (part->getDataPartStorage().getDiskName() == data_part->getDataPartStorage().getDiskName())
broken_part_callback(part->name);
}
}
@ -6414,7 +6421,7 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(
LOG_DEBUG(log, "Freezing part {} snapshot will be placed at {}", part->name, backup_path);
auto data_part_storage = part->data_part_storage;
auto data_part_storage = part->getDataPartStoragePtr();
String src_part_path = data_part_storage->getRelativePath();
String backup_part_path = fs::path(backup_path) / relative_data_path;
if (auto part_in_memory = asInMemoryPart(part))
@ -6428,12 +6435,12 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(
// Store metadata for replicated table.
// Do nothing for non-replicated.
createAndStoreFreezeMetadata(disk, part, fs::path(backup_part_path) / part->data_part_storage->getPartDirectory());
createAndStoreFreezeMetadata(disk, part, fs::path(backup_part_path) / part->getDataPartStorage().getPartDirectory());
};
auto new_storage = data_part_storage->freeze(
backup_part_path,
part->data_part_storage->getPartDirectory(),
part->getDataPartStorage().getPartDirectory(),
/*make_source_readonly*/ true,
callback,
/*copy_instead_of_hardlink*/ false,
@ -6555,8 +6562,8 @@ try
if (result_part)
{
part_log_elem.disk_name = result_part->data_part_storage->getDiskName();
part_log_elem.path_on_disk = result_part->data_part_storage->getFullPath();
part_log_elem.disk_name = result_part->getDataPartStorage().getDiskName();
part_log_elem.path_on_disk = result_part->getDataPartStorage().getFullPath();
part_log_elem.bytes_compressed_on_disk = result_part->getBytesOnDisk();
part_log_elem.rows = result_part->rows_count;
part_log_elem.part_type = result_part->getType();
@ -6712,7 +6719,7 @@ bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagge
for (const auto & moving_part : moving_tagger->parts_to_move)
{
Stopwatch stopwatch;
DataPartPtr cloned_part;
MutableDataPartPtr cloned_part;
auto write_part_log = [&](const ExecutionStatus & execution_status)
{
@ -6975,7 +6982,7 @@ ReservationPtr MergeTreeData::balancedReservation(
if (part->isStoredOnDisk() && part->getBytesOnDisk() >= min_bytes_to_rebalance_partition_over_jbod
&& part_info.partition_id == part->info.partition_id)
{
auto name = part->data_part_storage->getDiskName();
auto name = part->getDataPartStorage().getDiskName();
auto it = disk_occupation.find(name);
if (it != disk_occupation.end())
{

View File

@ -214,6 +214,7 @@ public:
};
using DataParts = std::set<DataPartPtr, LessDataPart>;
using MutableDataParts = std::set<MutableDataPartPtr, LessDataPart>;
using DataPartsVector = std::vector<DataPartPtr>;
using DataPartsLock = std::unique_lock<std::mutex>;
@ -275,8 +276,8 @@ public:
MergeTreeData & data;
MergeTreeTransaction * txn;
DataParts precommitted_parts;
DataParts locked_parts;
MutableDataParts precommitted_parts;
MutableDataParts locked_parts;
bool has_in_memory_parts = false;
void clear();
@ -413,8 +414,8 @@ public:
SelectQueryInfo & info) const override;
ReservationPtr reserveSpace(UInt64 expected_size, VolumePtr & volume) const;
static ReservationPtr tryReserveSpace(UInt64 expected_size, const MutableDataPartStoragePtr & data_part_storage);
static ReservationPtr reserveSpace(UInt64 expected_size, const MutableDataPartStoragePtr & data_part_storage);
static ReservationPtr tryReserveSpace(UInt64 expected_size, const IDataPartStorage & data_part_storage);
static ReservationPtr reserveSpace(UInt64 expected_size, const IDataPartStorage & data_part_storage);
static bool partsContainSameProjections(const DataPartPtr & left, const DataPartPtr & right);
@ -974,7 +975,7 @@ public:
/// Fetch part only if some replica has it on shared storage like S3
/// Overridden in StorageReplicatedMergeTree
virtual bool tryToFetchIfShared(const IMergeTreeDataPart &, const DiskPtr &, const String &) { return false; }
virtual MutableDataPartStoragePtr tryToFetchIfShared(const IMergeTreeDataPart &, const DiskPtr &, const String &) { return nullptr; }
/// Check shared data usage on other replicas for detached/freezed part
/// Remove local files and remote files if needed
@ -1259,7 +1260,6 @@ protected:
static void incrementMergedPartsProfileEvent(MergeTreeDataPartType type);
private:
/// Checking that candidate part doesn't break invariants: correct partition and doesn't exist already
void checkPartCanBeAddedToTable(MutableDataPartPtr & part, DataPartsLock & lock) const;
@ -1328,8 +1328,8 @@ private:
virtual std::unique_ptr<MergeTreeSettings> getDefaultSettings() const = 0;
void loadDataPartsFromDisk(
DataPartsVector & broken_parts_to_detach,
DataPartsVector & duplicate_parts_to_remove,
MutableDataPartsVector & broken_parts_to_detach,
MutableDataPartsVector & duplicate_parts_to_remove,
ThreadPool & pool,
size_t num_parts,
std::queue<std::vector<std::pair<String, DiskPtr>>> & parts_queue,
@ -1337,8 +1337,7 @@ private:
const MergeTreeSettingsPtr & settings);
void loadDataPartsFromWAL(
DataPartsVector & broken_parts_to_detach,
DataPartsVector & duplicate_parts_to_remove,
MutableDataPartsVector & duplicate_parts_to_remove,
MutableDataPartsVector & parts_from_wal);
/// Create zero-copy exclusive lock for part and disk. Useful for coordination of
@ -1350,6 +1349,8 @@ private:
/// Otherwise, in non-parallel case will break and return.
void clearPartsFromFilesystemImpl(const DataPartsVector & parts, NameSet * part_names_succeed);
static MutableDataPartPtr preparePartForRemoval(const DataPartPtr & part);
TemporaryParts temporary_parts;
};

View File

@ -482,7 +482,7 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
const Names & deduplicate_by_columns,
const MergeTreeData::MergingParams & merging_params,
const MergeTreeTransactionPtr & txn,
const IMergeTreeDataPart * parent_part,
IMergeTreeDataPart * parent_part,
const String & suffix)
{
return std::make_shared<MergeTask>(

View File

@ -113,7 +113,7 @@ public:
const Names & deduplicate_by_columns,
const MergeTreeData::MergingParams & merging_params,
const MergeTreeTransactionPtr & txn,
const IMergeTreeDataPart * parent_part = nullptr,
IMergeTreeDataPart * parent_part = nullptr,
const String & suffix = "");
/// Mutate a single data part with the specified commands. Will create and return a temporary part.

View File

@ -96,21 +96,21 @@ void MergeTreeDataPartCompact::calculateEachColumnSizes(ColumnSizeByName & /*eac
void MergeTreeDataPartCompact::loadIndexGranularityImpl(
MergeTreeIndexGranularity & index_granularity_, const MergeTreeIndexGranularityInfo & index_granularity_info_,
size_t columns_count, const DataPartStoragePtr & data_part_storage_)
size_t columns_count, const IDataPartStorage & data_part_storage_)
{
if (!index_granularity_info_.mark_type.adaptive)
throw Exception("MergeTreeDataPartCompact cannot be created with non-adaptive granulary.", ErrorCodes::NOT_IMPLEMENTED);
auto marks_file_path = index_granularity_info_.getMarksFilePath("data");
if (!data_part_storage_->exists(marks_file_path))
if (!data_part_storage_.exists(marks_file_path))
throw Exception(
ErrorCodes::NO_FILE_IN_DATA_PART,
"Marks file '{}' doesn't exist",
std::string(fs::path(data_part_storage_->getFullPath()) / marks_file_path));
std::string(fs::path(data_part_storage_.getFullPath()) / marks_file_path));
size_t marks_file_size = data_part_storage_->getFileSize(marks_file_path);
size_t marks_file_size = data_part_storage_.getFileSize(marks_file_path);
std::unique_ptr<ReadBufferFromFileBase> buffer = data_part_storage_->readFile(
std::unique_ptr<ReadBufferFromFileBase> buffer = data_part_storage_.readFile(
marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt);
std::unique_ptr<ReadBuffer> marks_reader;
@ -139,7 +139,7 @@ void MergeTreeDataPartCompact::loadIndexGranularity()
if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
loadIndexGranularityImpl(index_granularity, index_granularity_info, columns.size(), data_part_storage);
loadIndexGranularityImpl(index_granularity, index_granularity_info, columns.size(), getDataPartStorage());
}
bool MergeTreeDataPartCompact::hasColumnFiles(const NameAndTypePair & column) const
@ -170,12 +170,12 @@ void MergeTreeDataPartCompact::checkConsistency(bool require_part_metadata) cons
throw Exception(
ErrorCodes::NO_FILE_IN_DATA_PART,
"No marks file checksum for column in part {}",
data_part_storage->getFullPath());
getDataPartStorage().getFullPath());
if (!checksums.files.contains(DATA_FILE_NAME_WITH_EXTENSION))
throw Exception(
ErrorCodes::NO_FILE_IN_DATA_PART,
"No data file checksum for in part {}",
data_part_storage->getFullPath());
getDataPartStorage().getFullPath());
}
}
else
@ -183,33 +183,33 @@ void MergeTreeDataPartCompact::checkConsistency(bool require_part_metadata) cons
{
/// count.txt should be present even in non custom-partitioned parts
std::string file_path = "count.txt";
if (!data_part_storage->exists(file_path) || data_part_storage->getFileSize(file_path) == 0)
if (!getDataPartStorage().exists(file_path) || getDataPartStorage().getFileSize(file_path) == 0)
throw Exception(
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART,
"Part {} is broken: {} is empty",
data_part_storage->getRelativePath(),
std::string(fs::path(data_part_storage->getFullPath()) / file_path));
getDataPartStorage().getRelativePath(),
std::string(fs::path(getDataPartStorage().getFullPath()) / file_path));
}
/// Check that marks are nonempty and have the consistent size with columns number.
if (data_part_storage->exists(mrk_file_name))
if (getDataPartStorage().exists(mrk_file_name))
{
UInt64 file_size = data_part_storage->getFileSize(mrk_file_name);
UInt64 file_size = getDataPartStorage().getFileSize(mrk_file_name);
if (!file_size)
throw Exception(
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART,
"Part {} is broken: {} is empty.",
data_part_storage->getRelativePath(),
std::string(fs::path(data_part_storage->getFullPath()) / mrk_file_name));
getDataPartStorage().getRelativePath(),
std::string(fs::path(getDataPartStorage().getFullPath()) / mrk_file_name));
UInt64 expected_file_size = index_granularity_info.getMarkSizeInBytes(columns.size()) * index_granularity.getMarksCount();
if (expected_file_size != file_size)
throw Exception(
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART,
"Part {} is broken: bad size of marks file '{}': {}, must be: {}",
data_part_storage->getRelativePath(),
std::string(fs::path(data_part_storage->getFullPath()) / mrk_file_name),
getDataPartStorage().getRelativePath(),
std::string(fs::path(getDataPartStorage().getFullPath()) / mrk_file_name),
std::to_string(file_size), std::to_string(expected_file_size));
}
}
@ -217,12 +217,12 @@ void MergeTreeDataPartCompact::checkConsistency(bool require_part_metadata) cons
bool MergeTreeDataPartCompact::isStoredOnRemoteDisk() const
{
return data_part_storage->isStoredOnRemoteDisk();
return getDataPartStorage().isStoredOnRemoteDisk();
}
bool MergeTreeDataPartCompact::isStoredOnRemoteDiskWithZeroCopySupport() const
{
return data_part_storage->supportZeroCopyReplication();
return getDataPartStorage().supportZeroCopyReplication();
}
MergeTreeDataPartCompact::~MergeTreeDataPartCompact()

View File

@ -67,7 +67,7 @@ public:
protected:
static void loadIndexGranularityImpl(
MergeTreeIndexGranularity & index_granularity_, const MergeTreeIndexGranularityInfo & index_granularity_info_,
size_t columns_count, const DataPartStoragePtr & data_part_storage_);
size_t columns_count, const IDataPartStorage & data_part_storage_);
private:
void checkConsistency(bool require_part_metadata) const override;

View File

@ -6,6 +6,7 @@
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <Storages/MergeTree/DataPartStorageOnDisk.h>
#include <DataTypes/NestedUtils.h>
#include <Disks/createVolume.h>
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
@ -71,12 +72,18 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartInMemory::getWriter(
MutableDataPartStoragePtr MergeTreeDataPartInMemory::flushToDisk(const String & new_relative_path, const StorageMetadataPtr & metadata_snapshot) const
{
auto current_full_path = data_part_storage->getFullPath();
auto new_data_part_storage = data_part_storage->clone();
auto reservation = storage.reserveSpace(block.bytes(), getDataPartStorage());
VolumePtr volume = storage.getStoragePolicy()->getVolume(0);
VolumePtr data_part_volume = createVolumeFromReservation(reservation, volume);
auto new_data_part_storage = std::make_shared<DataPartStorageOnDisk>(
data_part_volume,
storage.getRelativeDataPath(),
new_relative_path);
new_data_part_storage->setRelativePath(new_relative_path);
new_data_part_storage->beginTransaction();
auto current_full_path = getDataPartStorage().getFullPath();
auto new_type = storage.choosePartTypeOnDisk(block.bytes(), rows_count);
auto new_data_part = storage.createPart(name, new_type, info, new_data_part_storage);
@ -148,12 +155,9 @@ void MergeTreeDataPartInMemory::makeCloneInDetached(const String & prefix, const
flushToDisk(detached_path, metadata_snapshot);
}
void MergeTreeDataPartInMemory::renameTo(const String & new_relative_path, bool /* remove_new_dir_if_exists */) const
void MergeTreeDataPartInMemory::renameTo(const String & new_relative_path, bool /* remove_new_dir_if_exists */)
{
data_part_storage->setRelativePath(new_relative_path);
if (data_part_storage)
data_part_storage->setRelativePath(new_relative_path);
getDataPartStorage().setRelativePath(new_relative_path);
}
void MergeTreeDataPartInMemory::calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const

View File

@ -46,7 +46,7 @@ public:
bool isStoredOnRemoteDiskWithZeroCopySupport() const override { return false; }
bool hasColumnFiles(const NameAndTypePair & column) const override { return !!getColumnPosition(column.getNameInStorage()); }
String getFileNameForColumn(const NameAndTypePair & /* column */) const override { return ""; }
void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const override;
void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) override;
void makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const override;
MutableDataPartStoragePtr flushToDisk(const String & new_relative_path, const StorageMetadataPtr & metadata_snapshot) const;

View File

@ -104,18 +104,18 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
void MergeTreeDataPartWide::loadIndexGranularityImpl(
MergeTreeIndexGranularity & index_granularity_, MergeTreeIndexGranularityInfo & index_granularity_info_,
const DataPartStoragePtr & data_part_storage_, const std::string & any_column_file_name)
const IDataPartStorage & data_part_storage_, const std::string & any_column_file_name)
{
index_granularity_info_.changeGranularityIfRequired(data_part_storage_);
/// We can use any column, it doesn't matter
std::string marks_file_path = index_granularity_info_.getMarksFilePath(any_column_file_name);
if (!data_part_storage_->exists(marks_file_path))
if (!data_part_storage_.exists(marks_file_path))
throw Exception(
ErrorCodes::NO_FILE_IN_DATA_PART, "Marks file '{}' doesn't exist",
std::string(fs::path(data_part_storage_->getFullPath()) / marks_file_path));
std::string(fs::path(data_part_storage_.getFullPath()) / marks_file_path));
size_t marks_file_size = data_part_storage_->getFileSize(marks_file_path);
size_t marks_file_size = data_part_storage_.getFileSize(marks_file_path);
if (!index_granularity_info_.mark_type.adaptive && !index_granularity_info_.mark_type.compressed)
{
@ -125,7 +125,7 @@ void MergeTreeDataPartWide::loadIndexGranularityImpl(
}
else
{
auto marks_file = data_part_storage_->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt);
auto marks_file = data_part_storage_.readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt);
std::unique_ptr<ReadBuffer> marks_reader;
if (!index_granularity_info_.mark_type.compressed)
@ -162,18 +162,18 @@ void MergeTreeDataPartWide::loadIndexGranularity()
if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
loadIndexGranularityImpl(index_granularity, index_granularity_info, data_part_storage, getFileNameForColumn(columns.front()));
loadIndexGranularityImpl(index_granularity, index_granularity_info, getDataPartStorage(), getFileNameForColumn(columns.front()));
}
bool MergeTreeDataPartWide::isStoredOnRemoteDisk() const
{
return data_part_storage->isStoredOnRemoteDisk();
return getDataPartStorage().isStoredOnRemoteDisk();
}
bool MergeTreeDataPartWide::isStoredOnRemoteDiskWithZeroCopySupport() const
{
return data_part_storage->supportZeroCopyReplication();
return getDataPartStorage().supportZeroCopyReplication();
}
MergeTreeDataPartWide::~MergeTreeDataPartWide()
@ -202,13 +202,13 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
throw Exception(
ErrorCodes::NO_FILE_IN_DATA_PART,
"No {} file checksum for column {} in part {} ",
mrk_file_name, name_type.name, data_part_storage->getFullPath());
mrk_file_name, name_type.name, getDataPartStorage().getFullPath());
if (!checksums.files.contains(bin_file_name))
throw Exception(
ErrorCodes::NO_FILE_IN_DATA_PART,
"No {} file checksum for column {} in part ",
bin_file_name, name_type.name, data_part_storage->getFullPath());
bin_file_name, name_type.name, getDataPartStorage().getFullPath());
});
}
}
@ -224,23 +224,23 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
auto file_path = ISerialization::getFileNameForStream(name_type, substream_path) + marks_file_extension;
/// Missing file is Ok for case when new column was added.
if (data_part_storage->exists(file_path))
if (getDataPartStorage().exists(file_path))
{
UInt64 file_size = data_part_storage->getFileSize(file_path);
UInt64 file_size = getDataPartStorage().getFileSize(file_path);
if (!file_size)
throw Exception(
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART,
"Part {} is broken: {} is empty.",
data_part_storage->getFullPath(),
std::string(fs::path(data_part_storage->getFullPath()) / file_path));
getDataPartStorage().getFullPath(),
std::string(fs::path(getDataPartStorage().getFullPath()) / file_path));
if (!marks_size)
marks_size = file_size;
else if (file_size != *marks_size)
throw Exception(
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART,
"Part {} is broken: marks have different sizes.", data_part_storage->getFullPath());
"Part {} is broken: marks have different sizes.", getDataPartStorage().getFullPath());
}
});
}

View File

@ -1,5 +1,6 @@
#pragma once
#include "Storages/MergeTree/IDataPartStorage.h"
#include <Storages/MergeTree/IMergeTreeDataPart.h>
namespace DB
@ -63,7 +64,7 @@ public:
protected:
static void loadIndexGranularityImpl(
MergeTreeIndexGranularity & index_granularity_, MergeTreeIndexGranularityInfo & index_granularity_info_,
const DataPartStoragePtr & data_part_storage_, const std::string & any_column_file_name);
const IDataPartStorage & data_part_storage_, const std::string & any_column_file_name);
private:
void checkConsistency(bool require_part_metadata) const override;

View File

@ -21,13 +21,13 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
: MergeTreeDataPartWriterOnDisk(data_part_, columns_list_, metadata_snapshot_,
indices_to_recalc_, marks_file_extension_,
default_codec_, settings_, index_granularity_)
, plain_file(data_part_->data_part_storage->writeFile(
, plain_file(data_part_->getDataPartStorage().writeFile(
MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION,
settings.max_compress_block_size,
settings_.query_write_settings))
, plain_hashing(*plain_file)
{
marks_file = data_part_->data_part_storage->writeFile(
marks_file = data_part_->getDataPartStorage().writeFile(
MergeTreeDataPartCompact::DATA_FILE_NAME + marks_file_extension_,
4096,
settings_.query_write_settings);

View File

@ -114,8 +114,8 @@ MergeTreeDataPartWriterOnDisk::MergeTreeDataPartWriterOnDisk(
if (settings.blocks_are_granules_size && !index_granularity.empty())
throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);
if (!data_part->data_part_storage->exists())
data_part->data_part_storage->createDirectories();
if (!data_part->getDataPartStorage().exists())
data_part->getDataPartStorage().createDirectories();
if (settings.rewrite_primary_key)
initPrimaryIndex();
@ -176,7 +176,7 @@ void MergeTreeDataPartWriterOnDisk::initPrimaryIndex()
if (metadata_snapshot->hasPrimaryKey())
{
String index_name = "primary" + getIndexExtension(compress_primary_key);
index_file_stream = data_part->data_part_storage->writeFile(index_name, DBMS_DEFAULT_BUFFER_SIZE, settings.query_write_settings);
index_file_stream = data_part->getDataPartStorage().writeFile(index_name, DBMS_DEFAULT_BUFFER_SIZE, settings.query_write_settings);
index_file_hashing_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
if (compress_primary_key)
@ -202,7 +202,7 @@ void MergeTreeDataPartWriterOnDisk::initSkipIndices()
skip_indices_streams.emplace_back(
std::make_unique<MergeTreeDataPartWriterOnDisk::Stream>(
stream_name,
data_part->data_part_storage,
data_part->getDataPartStoragePtr(),
stream_name, index_helper->getSerializedFileExtension(),
stream_name, marks_file_extension,
default_codec, settings.max_compress_block_size,

View File

@ -116,7 +116,7 @@ void MergeTreeDataPartWriterWide::addStreams(
column_streams[stream_name] = std::make_unique<Stream>(
stream_name,
data_part->data_part_storage,
data_part->getDataPartStoragePtr(),
stream_name, DATA_FILE_EXTENSION,
stream_name, marks_file_extension,
compression_codec,
@ -421,17 +421,17 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
String bin_path = escaped_name + DATA_FILE_EXTENSION;
/// Some columns may be removed because of ttl. Skip them.
if (!data_part->data_part_storage->exists(mrk_path))
if (!data_part->getDataPartStorage().exists(mrk_path))
return;
auto mrk_file_in = data_part->data_part_storage->readFile(mrk_path, {}, std::nullopt, std::nullopt);
auto mrk_file_in = data_part->getDataPartStorage().readFile(mrk_path, {}, std::nullopt, std::nullopt);
std::unique_ptr<ReadBuffer> mrk_in;
if (data_part->index_granularity_info.mark_type.compressed)
mrk_in = std::make_unique<CompressedReadBufferFromFile>(std::move(mrk_file_in));
else
mrk_in = std::move(mrk_file_in);
DB::CompressedReadBufferFromFile bin_in(data_part->data_part_storage->readFile(bin_path, {}, std::nullopt, std::nullopt));
DB::CompressedReadBufferFromFile bin_in(data_part->getDataPartStorage().readFile(bin_path, {}, std::nullopt, std::nullopt));
bool must_be_last = false;
UInt64 offset_in_compressed_file = 0;
UInt64 offset_in_decompressed_block = 0;
@ -482,7 +482,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
if (index_granularity_rows != index_granularity.getMarkRows(mark_num))
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Incorrect mark rows for part {} for mark #{} (compressed offset {}, decompressed offset {}), in-memory {}, on disk {}, total marks {}",
data_part->data_part_storage->getFullPath(), mark_num, offset_in_compressed_file, offset_in_decompressed_block, index_granularity.getMarkRows(mark_num), index_granularity_rows, index_granularity.getMarksCount());
data_part->getDataPartStorage().getFullPath(), mark_num, offset_in_compressed_file, offset_in_decompressed_block, index_granularity.getMarkRows(mark_num), index_granularity_rows, index_granularity.getMarksCount());
auto column = type->createColumn();

View File

@ -1618,10 +1618,10 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
UncompressedCache * uncompressed_cache,
Poco::Logger * log)
{
if (!index_helper->getDeserializedFormat(part->data_part_storage, index_helper->getFileName()))
if (!index_helper->getDeserializedFormat(part->getDataPartStorage(), index_helper->getFileName()))
{
LOG_DEBUG(log, "File for index {} does not exist ({}.*). Skipping it.", backQuote(index_helper->index.name),
(fs::path(part->data_part_storage->getFullPath()) / index_helper->getFileName()).string());
(fs::path(part->getDataPartStorage().getFullPath()) / index_helper->getFileName()).string());
return ranges;
}
@ -1736,7 +1736,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingMergedIndex(
{
for (const auto & index_helper : indices)
{
if (!part->data_part_storage->exists(index_helper->getFileName() + ".idx"))
if (!part->getDataPartStorage().exists(index_helper->getFileName() + ".idx"))
{
LOG_DEBUG(log, "File for index {} does not exist. Skipping it.", backQuote(index_helper->index.name));
return ranges;

View File

@ -405,9 +405,9 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart(
if (new_data_part->isStoredOnDisk())
{
/// The name could be non-unique in case of stale files from previous runs.
String full_path = new_data_part->data_part_storage->getFullPath();
String full_path = new_data_part->getDataPartStorage().getFullPath();
if (new_data_part->data_part_storage->exists())
if (new_data_part->getDataPartStorage().exists())
{
LOG_WARNING(log, "Removing old temporary directory {}", full_path);
data_part_storage->removeRecursive();
@ -493,7 +493,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
const StorageMetadataPtr & metadata_snapshot = projection.metadata;
MergeTreePartInfo new_part_info("all", 0, 0, 0);
auto projection_part_storage = parent_part->data_part_storage->getProjection(relative_path);
auto projection_part_storage = parent_part->getDataPartStorage().getProjection(relative_path);
auto new_data_part = data.createPart(
part_name,
part_type,
@ -600,7 +600,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPart(
/// Size of part would not be greater than block.bytes() + epsilon
size_t expected_size = block.bytes();
// just check if there is enough space on parent volume
data.reserveSpace(expected_size, parent_part->data_part_storage);
data.reserveSpace(expected_size, parent_part->getDataPartStorage());
part_type = data.choosePartTypeOnDisk(expected_size, block.rows());
}
@ -637,7 +637,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempProjectionPart(
/// Size of part would not be greater than block.bytes() + epsilon
size_t expected_size = block.bytes();
// just check if there is enough space on parent volume
data.reserveSpace(expected_size, parent_part->data_part_storage);
data.reserveSpace(expected_size, parent_part->getDataPartStorage());
part_type = data.choosePartTypeOnDisk(expected_size, block.rows());
}

View File

@ -89,10 +89,10 @@ std::string MarkType::getFileExtension() const
}
std::optional<std::string> MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage)
std::optional<std::string> MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(const IDataPartStorage & data_part_storage)
{
if (data_part_storage->exists())
for (auto it = data_part_storage->iterate(); it->isValid(); it->next())
if (data_part_storage.exists())
for (auto it = data_part_storage.iterate(); it->isValid(); it->next())
if (it->isFile())
if (std::string ext = fs::path(it->name()).extension(); MarkType::isMarkFileExtension(ext))
return ext;
@ -110,7 +110,7 @@ MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(const MergeTreeData
fixed_index_granularity = storage.getSettings()->index_granularity;
}
void MergeTreeIndexGranularityInfo::changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage)
void MergeTreeIndexGranularityInfo::changeGranularityIfRequired(const IDataPartStorage & data_part_storage)
{
auto mrk_ext = getMarksExtensionFromFilesystem(data_part_storage);
if (mrk_ext && !MarkType(*mrk_ext).adaptive)

View File

@ -48,7 +48,7 @@ public:
MergeTreeIndexGranularityInfo(MergeTreeDataPartType type_, bool is_adaptive_, size_t index_granularity_, size_t index_granularity_bytes_);
void changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage);
void changeGranularityIfRequired(const IDataPartStorage & data_part_storage);
String getMarksFilePath(const String & path_prefix) const
{
@ -57,7 +57,7 @@ public:
size_t getMarkSizeInBytes(size_t columns_num = 1) const;
static std::optional<std::string> getMarksExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage);
static std::optional<std::string> getMarksExtensionFromFilesystem(const IDataPartStorage & data_part_storage);
};
constexpr inline auto getNonAdaptiveMrkSizeWide() { return sizeof(UInt64) * 2; }

View File

@ -211,11 +211,11 @@ bool MergeTreeIndexMinMax::mayBenefitFromIndexForIn(const ASTPtr & node) const
return false;
}
MergeTreeIndexFormat MergeTreeIndexMinMax::getDeserializedFormat(const DataPartStoragePtr & data_part_storage, const std::string & relative_path_prefix) const
MergeTreeIndexFormat MergeTreeIndexMinMax::getDeserializedFormat(const IDataPartStorage & data_part_storage, const std::string & relative_path_prefix) const
{
if (data_part_storage->exists(relative_path_prefix + ".idx2"))
if (data_part_storage.exists(relative_path_prefix + ".idx2"))
return {2, ".idx2"};
else if (data_part_storage->exists(relative_path_prefix + ".idx"))
else if (data_part_storage.exists(relative_path_prefix + ".idx"))
return {1, ".idx"};
return {0 /* unknown */, ""};
}

View File

@ -83,7 +83,7 @@ public:
bool mayBenefitFromIndexForIn(const ASTPtr & node) const override;
const char* getSerializedFileExtension() const override { return ".idx2"; }
MergeTreeIndexFormat getDeserializedFormat(const DataPartStoragePtr & data_part_storage, const std::string & path_prefix) const override; /// NOLINT
MergeTreeIndexFormat getDeserializedFormat(const IDataPartStorage & data_part_storage, const std::string & path_prefix) const override; /// NOLINT
};
}

View File

@ -20,7 +20,7 @@ std::unique_ptr<MergeTreeReaderStream> makeIndexReader(
auto * load_marks_threadpool = settings.read_settings.load_marks_asynchronously ? &context->getLoadMarksThreadpool() : nullptr;
return std::make_unique<MergeTreeReaderStream>(
part->data_part_storage,
part->getDataPartStoragePtr(),
index->getFileName(), extension, marks_count,
all_mark_ranges,
std::move(settings), mark_cache, uncompressed_cache,
@ -44,7 +44,7 @@ MergeTreeIndexReader::MergeTreeIndexReader(
MergeTreeReaderSettings settings)
: index(index_)
{
auto index_format = index->getDeserializedFormat(part_->data_part_storage, index->getFileName());
auto index_format = index->getDeserializedFormat(part_->getDataPartStorage(), index->getFileName());
stream = makeIndexReader(
index_format.extension,

View File

@ -148,9 +148,9 @@ struct IMergeTreeIndex
/// Returns extension for deserialization.
///
/// Return pair<extension, version>.
virtual MergeTreeIndexFormat getDeserializedFormat(const DataPartStoragePtr & data_part_storage, const std::string & relative_path_prefix) const
virtual MergeTreeIndexFormat getDeserializedFormat(const IDataPartStorage & data_part_storage, const std::string & relative_path_prefix) const
{
if (data_part_storage->exists(relative_path_prefix + ".idx"))
if (data_part_storage.exists(relative_path_prefix + ".idx"))
return {1, ".idx"};
return {0 /*unknown*/, ""};
}

View File

@ -382,7 +382,7 @@ void MergeTreePartition::load(const MergeTreeData & storage, const PartMetadataM
partition_key_sample.getByPosition(i).type->getDefaultSerialization()->deserializeBinary(value[i], *file);
}
std::unique_ptr<WriteBufferFromFileBase> MergeTreePartition::store(const MergeTreeData & storage, const MutableDataPartStoragePtr & data_part_storage, MergeTreeDataPartChecksums & checksums) const
std::unique_ptr<WriteBufferFromFileBase> MergeTreePartition::store(const MergeTreeData & storage, IDataPartStorage & data_part_storage, MergeTreeDataPartChecksums & checksums) const
{
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
const auto & context = storage.getContext();
@ -390,12 +390,12 @@ std::unique_ptr<WriteBufferFromFileBase> MergeTreePartition::store(const MergeTr
return store(partition_key_sample, data_part_storage, checksums, context->getWriteSettings());
}
std::unique_ptr<WriteBufferFromFileBase> MergeTreePartition::store(const Block & partition_key_sample, const MutableDataPartStoragePtr & data_part_storage, MergeTreeDataPartChecksums & checksums, const WriteSettings & settings) const
std::unique_ptr<WriteBufferFromFileBase> MergeTreePartition::store(const Block & partition_key_sample, IDataPartStorage & data_part_storage, MergeTreeDataPartChecksums & checksums, const WriteSettings & settings) const
{
if (!partition_key_sample)
return nullptr;
auto out = data_part_storage->writeFile("partition.dat", DBMS_DEFAULT_BUFFER_SIZE, settings);
auto out = data_part_storage.writeFile("partition.dat", DBMS_DEFAULT_BUFFER_SIZE, settings);
HashingWriteBuffer out_hashing(*out);
for (size_t i = 0; i < value.size(); ++i)
{

View File

@ -44,8 +44,8 @@ public:
/// Store functions return write buffer with written but not finalized data.
/// User must call finish() for returned object.
[[nodiscard]] std::unique_ptr<WriteBufferFromFileBase> store(const MergeTreeData & storage, const MutableDataPartStoragePtr & data_part_storage, MergeTreeDataPartChecksums & checksums) const;
[[nodiscard]] std::unique_ptr<WriteBufferFromFileBase> store(const Block & partition_key_sample, const MutableDataPartStoragePtr & data_part_storage, MergeTreeDataPartChecksums & checksums, const WriteSettings & settings) const;
[[nodiscard]] std::unique_ptr<WriteBufferFromFileBase> store(const MergeTreeData & storage, IDataPartStorage & data_part_storage, MergeTreeDataPartChecksums & checksums) const;
[[nodiscard]] std::unique_ptr<WriteBufferFromFileBase> store(const Block & partition_key_sample, IDataPartStorage & data_part_storage, MergeTreeDataPartChecksums & checksums, const WriteSettings & settings) const;
void assign(const MergeTreePartition & other) { value = other.value; }

View File

@ -140,7 +140,7 @@ bool MergeTreePartsMover::selectPartsForMove(
auto ttl_entry = selectTTLDescriptionForTTLInfos(metadata_snapshot->getMoveTTLs(), part->ttl_infos.moves_ttl, time_of_move, true);
auto to_insert = need_to_move.end();
if (auto disk_it = part->data_part_storage->isStoredOnDisk(need_to_move_disks); disk_it != need_to_move_disks.end())
if (auto disk_it = part->getDataPartStorage().isStoredOnDisk(need_to_move_disks); disk_it != need_to_move_disks.end())
to_insert = need_to_move.find(*disk_it);
ReservationPtr reservation;
@ -199,7 +199,7 @@ bool MergeTreePartsMover::selectPartsForMove(
return false;
}
MergeTreeData::DataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEntry & moving_part) const
MergeTreeMutableDataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEntry & moving_part) const
{
if (moves_blocker.isCancelled())
throw Exception("Cancelled moving parts.", ErrorCodes::ABORTED);
@ -207,7 +207,7 @@ MergeTreeData::DataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEnt
auto settings = data->getSettings();
auto part = moving_part.part;
auto disk = moving_part.reserved_space->getDisk();
LOG_DEBUG(log, "Cloning part {} from '{}' to '{}'", part->name, part->data_part_storage->getDiskName(), disk->getName());
LOG_DEBUG(log, "Cloning part {} from '{}' to '{}'", part->name, part->getDataPartStorage().getDiskName(), disk->getName());
MutableDataPartStoragePtr cloned_part_storage;
if (disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)
@ -215,7 +215,7 @@ MergeTreeData::DataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEnt
/// Try zero-copy replication and fallback to default copy if it's not possible
moving_part.part->assertOnDisk();
String path_to_clone = fs::path(data->getRelativeDataPath()) / MergeTreeData::MOVING_DIR_NAME / "";
String relative_path = part->data_part_storage->getPartDirectory();
String relative_path = part->getDataPartStorage().getPartDirectory();
if (disk->exists(path_to_clone + relative_path))
{
LOG_WARNING(log, "Path {} already exists. Will remove it and clone again.", fullPath(disk, path_to_clone + relative_path));
@ -224,16 +224,12 @@ MergeTreeData::DataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEnt
disk->createDirectories(path_to_clone);
bool is_fetched = data->tryToFetchIfShared(*part, disk, fs::path(path_to_clone) / part->name);
cloned_part_storage = data->tryToFetchIfShared(*part, disk, fs::path(path_to_clone) / part->name);
if (!is_fetched)
if (!cloned_part_storage)
{
LOG_INFO(log, "Part {} was not fetched, we are the first who move it to another disk, so we will copy it", part->name);
cloned_part_storage = part->data_part_storage->clonePart(path_to_clone, part->data_part_storage->getPartDirectory(), disk, log);
}
else
{
cloned_part_storage = part->data_part_storage->clone();
cloned_part_storage = part->getDataPartStorage().clonePart(path_to_clone, part->getDataPartStorage().getPartDirectory(), disk, log);
}
}
else
@ -242,16 +238,16 @@ MergeTreeData::DataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEnt
}
auto cloned_part = data->createPart(part->name, cloned_part_storage);
LOG_TRACE(log, "Part {} was cloned to {}", part->name, cloned_part->data_part_storage->getFullPath());
LOG_TRACE(log, "Part {} was cloned to {}", part->name, cloned_part->getDataPartStorage().getFullPath());
cloned_part->loadColumnsChecksumsIndexes(true, true);
cloned_part->loadVersionMetadata();
cloned_part->modification_time = cloned_part->data_part_storage->getLastModified().epochTime();
cloned_part->modification_time = cloned_part->getDataPartStorage().getLastModified().epochTime();
return cloned_part;
}
void MergeTreePartsMover::swapClonedPart(const MergeTreeData::DataPartPtr & cloned_part) const
void MergeTreePartsMover::swapClonedPart(const MergeTreeMutableDataPartPtr & cloned_part) const
{
if (moves_blocker.isCancelled())
throw Exception("Cancelled moving parts.", ErrorCodes::ABORTED);
@ -261,7 +257,7 @@ void MergeTreePartsMover::swapClonedPart(const MergeTreeData::DataPartPtr & clon
/// It's ok, because we don't block moving parts for merges or mutations
if (!active_part || active_part->name != cloned_part->name)
{
LOG_INFO(log, "Failed to swap {}. Active part doesn't exist. Possible it was merged or mutated. Will remove copy on path '{}'.", cloned_part->name, cloned_part->data_part_storage->getFullPath());
LOG_INFO(log, "Failed to swap {}. Active part doesn't exist. Possible it was merged or mutated. Will remove copy on path '{}'.", cloned_part->name, cloned_part->getDataPartStorage().getFullPath());
return;
}
@ -271,7 +267,7 @@ void MergeTreePartsMover::swapClonedPart(const MergeTreeData::DataPartPtr & clon
/// TODO what happen if server goes down here?
data->swapActivePart(cloned_part);
LOG_TRACE(log, "Part {} was moved to {}", cloned_part->name, cloned_part->data_part_storage->getFullPath());
LOG_TRACE(log, "Part {} was moved to {}", cloned_part->name, cloned_part->getDataPartStorage().getFullPath());
}
}

View File

@ -50,14 +50,14 @@ public:
const std::lock_guard<std::mutex> & moving_parts_lock);
/// Copies part to selected reservation in detached folder. Throws exception if part already exists.
MergeTreeDataPartPtr clonePart(const MergeTreeMoveEntry & moving_part) const;
MergeTreeMutableDataPartPtr clonePart(const MergeTreeMoveEntry & moving_part) const;
/// Replaces cloned part from detached directory into active data parts set.
/// Replacing part changes state to DeleteOnDestroy and will be removed from disk after destructor of
/// IMergeTreeDataPart called. If replacing part doesn't exist or is not active (committed) then
/// cloned part will be removed and log message will be reported. It may happen in case of concurrent
/// merge or mutation.
void swapClonedPart(const MergeTreeDataPartPtr & cloned_parts) const;
void swapClonedPart(const MergeTreeMutableDataPartPtr & cloned_parts) const;
/// Can stop background moves and moves from queries
ActionBlocker moves_blocker;

View File

@ -263,7 +263,7 @@ void MergeTreeReadPool::fillPerThreadInfo(
{
PartInfo part_info{parts[i], per_part_sum_marks[i], i};
if (parts[i].data_part->isStoredOnDisk())
parts_per_disk[parts[i].data_part->data_part_storage->getDiskName()].push_back(std::move(part_info));
parts_per_disk[parts[i].data_part->getDataPartStorage().getDiskName()].push_back(std::move(part_info));
else
parts_per_disk[""].push_back(std::move(part_info));
}

View File

@ -59,13 +59,15 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read to empty buffer.");
const String path = MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION;
auto data_part_storage = data_part_info_for_read->getDataPartStorage();
if (uncompressed_cache)
{
auto buffer = std::make_unique<CachedCompressedReadBuffer>(
std::string(fs::path(data_part_info_for_read->getDataPartStorage()->getFullPath()) / path),
[this, path]()
std::string(fs::path(data_part_storage->getFullPath()) / path),
[this, path, data_part_storage]()
{
return data_part_info_for_read->getDataPartStorage()->readFile(
return data_part_storage->readFile(
path,
settings.read_settings,
std::nullopt, std::nullopt);
@ -87,7 +89,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
{
auto buffer =
std::make_unique<CompressedReadBufferFromFile>(
data_part_info_for_read->getDataPartStorage()->readFile(
data_part_storage->readFile(
path,
settings.read_settings,
std::nullopt, std::nullopt),

View File

@ -81,7 +81,7 @@ void MergeTreeSink::consume(Chunk chunk)
if (!temp_part.part)
continue;
if (!support_parallel_write && temp_part.part->data_part_storage->supportParallelWrite())
if (!support_parallel_write && temp_part.part->getDataPartStorage().supportParallelWrite())
support_parallel_write = true;
if (storage.getDeduplicationLog())

View File

@ -94,7 +94,7 @@ void MergedBlockOutputStream::Finalizer::Impl::finish()
writer.finish(sync);
for (const auto & file_name : files_to_remove_after_finish)
part->data_part_storage->removeFile(file_name);
part->getDataPartStorage().removeFile(file_name);
for (auto & file : written_files)
{
@ -121,19 +121,19 @@ MergedBlockOutputStream::Finalizer & MergedBlockOutputStream::Finalizer::operato
MergedBlockOutputStream::Finalizer::Finalizer(std::unique_ptr<Impl> impl_) : impl(std::move(impl_)) {}
void MergedBlockOutputStream::finalizePart(
MergeTreeData::MutableDataPartPtr & new_part,
bool sync,
const NamesAndTypesList * total_columns_list,
MergeTreeData::DataPart::Checksums * additional_column_checksums)
const MergeTreeMutableDataPartPtr & new_part,
bool sync,
const NamesAndTypesList * total_columns_list,
MergeTreeData::DataPart::Checksums * additional_column_checksums)
{
finalizePartAsync(new_part, sync, total_columns_list, additional_column_checksums).finish();
}
MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
MergeTreeData::MutableDataPartPtr & new_part,
bool sync,
const NamesAndTypesList * total_columns_list,
MergeTreeData::DataPart::Checksums * additional_column_checksums)
const MergeTreeMutableDataPartPtr & new_part,
bool sync,
const NamesAndTypesList * total_columns_list,
MergeTreeData::DataPart::Checksums * additional_column_checksums)
{
/// Finish write and get checksums.
MergeTreeData::DataPart::Checksums checksums;
@ -183,7 +183,7 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
}
MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDisk(
const MergeTreeData::DataPartPtr & new_part,
const MergeTreeMutableDataPartPtr & new_part,
MergeTreeData::DataPart::Checksums & checksums)
{
WrittenFiles written_files;
@ -191,7 +191,7 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
{
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || isCompactPart(new_part))
{
auto count_out = new_part->data_part_storage->writeFile("count.txt", 4096, write_settings);
auto count_out = new_part->getDataPartStorage().writeFile("count.txt", 4096, write_settings);
HashingWriteBuffer count_out_hashing(*count_out);
writeIntText(rows_count, count_out_hashing);
count_out_hashing.next();
@ -205,7 +205,7 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
{
if (new_part->uuid != UUIDHelpers::Nil)
{
auto out = new_part->data_part_storage->writeFile(IMergeTreeDataPart::UUID_FILE_NAME, 4096, write_settings);
auto out = new_part->getDataPartStorage().writeFile(IMergeTreeDataPart::UUID_FILE_NAME, 4096, write_settings);
HashingWriteBuffer out_hashing(*out);
writeUUIDText(new_part->uuid, out_hashing);
checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_size = out_hashing.count();
@ -216,12 +216,12 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
if (auto file = new_part->partition.store(storage, new_part->data_part_storage, checksums))
if (auto file = new_part->partition.store(storage, new_part->getDataPartStorage(), checksums))
written_files.emplace_back(std::move(file));
if (new_part->minmax_idx->initialized)
{
auto files = new_part->minmax_idx->store(storage, new_part->data_part_storage, checksums);
auto files = new_part->minmax_idx->store(storage, new_part->getDataPartStorage(), checksums);
for (auto & file : files)
written_files.emplace_back(std::move(file));
}
@ -231,7 +231,7 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
}
{
auto count_out = new_part->data_part_storage->writeFile("count.txt", 4096, write_settings);
auto count_out = new_part->getDataPartStorage().writeFile("count.txt", 4096, write_settings);
HashingWriteBuffer count_out_hashing(*count_out);
writeIntText(rows_count, count_out_hashing);
count_out_hashing.next();
@ -245,7 +245,7 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
if (!new_part->ttl_infos.empty())
{
/// Write a file with ttl infos in json format.
auto out = new_part->data_part_storage->writeFile("ttl.txt", 4096, write_settings);
auto out = new_part->getDataPartStorage().writeFile("ttl.txt", 4096, write_settings);
HashingWriteBuffer out_hashing(*out);
new_part->ttl_infos.write(out_hashing);
checksums.files["ttl.txt"].file_size = out_hashing.count();
@ -256,7 +256,7 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
if (!new_part->getSerializationInfos().empty())
{
auto out = new_part->data_part_storage->writeFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096, write_settings);
auto out = new_part->getDataPartStorage().writeFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096, write_settings);
HashingWriteBuffer out_hashing(*out);
new_part->getSerializationInfos().writeJSON(out_hashing);
checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_size = out_hashing.count();
@ -267,7 +267,7 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
{
/// Write a file with a description of columns.
auto out = new_part->data_part_storage->writeFile("columns.txt", 4096, write_settings);
auto out = new_part->getDataPartStorage().writeFile("columns.txt", 4096, write_settings);
new_part->getColumns().writeText(*out);
out->preFinalize();
written_files.emplace_back(std::move(out));
@ -275,7 +275,7 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
if (default_codec != nullptr)
{
auto out = new_part->data_part_storage->writeFile(IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096, write_settings);
auto out = new_part->getDataPartStorage().writeFile(IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096, write_settings);
DB::writeText(queryToString(default_codec->getFullCodecDesc()), *out);
out->preFinalize();
written_files.emplace_back(std::move(out));
@ -288,7 +288,7 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
{
/// Write file with checksums.
auto out = new_part->data_part_storage->writeFile("checksums.txt", 4096, write_settings);
auto out = new_part->getDataPartStorage().writeFile("checksums.txt", 4096, write_settings);
checksums.write(*out);
out->preFinalize();
written_files.emplace_back(std::move(out));

View File

@ -54,16 +54,16 @@ public:
/// Finalize writing part and fill inner structures
/// If part is new and contains projections, they should be added before invoking this method.
Finalizer finalizePartAsync(
MergeTreeData::MutableDataPartPtr & new_part,
bool sync,
const NamesAndTypesList * total_columns_list = nullptr,
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr);
const MergeTreeMutableDataPartPtr & new_part,
bool sync,
const NamesAndTypesList * total_columns_list = nullptr,
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr);
void finalizePart(
MergeTreeData::MutableDataPartPtr & new_part,
bool sync,
const NamesAndTypesList * total_columns_list = nullptr,
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr);
const MergeTreeMutableDataPartPtr & new_part,
bool sync,
const NamesAndTypesList * total_columns_list = nullptr,
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr);
private:
/** If `permutation` is given, it rearranges the values in the columns when writing.
@ -73,8 +73,8 @@ private:
using WrittenFiles = std::vector<std::unique_ptr<WriteBufferFromFileBase>>;
WrittenFiles finalizePartOnDisk(
const MergeTreeData::DataPartPtr & new_part,
MergeTreeData::DataPart::Checksums & checksums);
const MergeTreeMutableDataPartPtr & new_part,
MergeTreeData::DataPart::Checksums & checksums);
NamesAndTypesList columns_list;
IMergeTreeDataPart::MinMaxIndex minmax_idx;

View File

@ -79,7 +79,7 @@ MergedColumnOnlyOutputStream::fillChecksums(
for (const String & removed_file : removed_files)
{
new_part->data_part_storage->removeFileIfExists(removed_file);
new_part->getDataPartStorage().removeFileIfExists(removed_file);
if (all_checksums.files.contains(removed_file))
all_checksums.files.erase(removed_file);

View File

@ -92,7 +92,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
/// Once we mutate part, we must reserve space on the same disk, because mutations can possibly create hardlinks.
/// Can throw an exception.
reserved_space = storage.reserveSpace(estimated_space_for_result, source_part->data_part_storage);
reserved_space = storage.reserveSpace(estimated_space_for_result, source_part->getDataPartStorage());
table_lock_holder = storage.lockForShare(
RWLockImpl::NO_QUERY, storage_settings_ptr->lock_acquire_timeout_for_background_operations);

View File

@ -626,7 +626,7 @@ void finalizeMutatedPart(
{
if (new_data_part->uuid != UUIDHelpers::Nil)
{
auto out = new_data_part->data_part_storage->writeFile(IMergeTreeDataPart::UUID_FILE_NAME, 4096, context->getWriteSettings());
auto out = new_data_part->getDataPartStorage().writeFile(IMergeTreeDataPart::UUID_FILE_NAME, 4096, context->getWriteSettings());
HashingWriteBuffer out_hashing(*out);
writeUUIDText(new_data_part->uuid, out_hashing);
new_data_part->checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_size = out_hashing.count();
@ -636,7 +636,7 @@ void finalizeMutatedPart(
if (execute_ttl_type != ExecuteTTLType::NONE)
{
/// Write a file with ttl infos in json format.
auto out_ttl = new_data_part->data_part_storage->writeFile("ttl.txt", 4096, context->getWriteSettings());
auto out_ttl = new_data_part->getDataPartStorage().writeFile("ttl.txt", 4096, context->getWriteSettings());
HashingWriteBuffer out_hashing(*out_ttl);
new_data_part->ttl_infos.write(out_hashing);
new_data_part->checksums.files["ttl.txt"].file_size = out_hashing.count();
@ -645,7 +645,7 @@ void finalizeMutatedPart(
if (!new_data_part->getSerializationInfos().empty())
{
auto out = new_data_part->data_part_storage->writeFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096, context->getWriteSettings());
auto out = new_data_part->getDataPartStorage().writeFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096, context->getWriteSettings());
HashingWriteBuffer out_hashing(*out);
new_data_part->getSerializationInfos().writeJSON(out_hashing);
new_data_part->checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_size = out_hashing.count();
@ -654,18 +654,18 @@ void finalizeMutatedPart(
{
/// Write file with checksums.
auto out_checksums = new_data_part->data_part_storage->writeFile("checksums.txt", 4096, context->getWriteSettings());
auto out_checksums = new_data_part->getDataPartStorage().writeFile("checksums.txt", 4096, context->getWriteSettings());
new_data_part->checksums.write(*out_checksums);
} /// close fd
{
auto out = new_data_part->data_part_storage->writeFile(IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096, context->getWriteSettings());
auto out = new_data_part->getDataPartStorage().writeFile(IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096, context->getWriteSettings());
DB::writeText(queryToString(codec->getFullCodecDesc()), *out);
} /// close fd
{
/// Write a file with a description of columns.
auto out_columns = new_data_part->data_part_storage->writeFile("columns.txt", 4096, context->getWriteSettings());
auto out_columns = new_data_part->getDataPartStorage().writeFile("columns.txt", 4096, context->getWriteSettings());
new_data_part->getColumns().writeText(*out_columns);
} /// close fd
@ -1141,7 +1141,7 @@ private:
void prepare()
{
ctx->new_data_part->data_part_storage->createDirectories();
ctx->new_data_part->getDataPartStorage().createDirectories();
/// Note: this is done before creating input streams, because otherwise data.data_parts_mutex
/// (which is locked in data.getTotalActiveSizeInBytes())
@ -1271,7 +1271,7 @@ private:
if (ctx->execute_ttl_type != ExecuteTTLType::NONE)
ctx->files_to_skip.insert("ttl.txt");
ctx->new_data_part->data_part_storage->createDirectories();
ctx->new_data_part->getDataPartStorage().createDirectories();
/// We should write version metadata on part creation to distinguish it from parts that were created without transaction.
TransactionID tid = ctx->txn ? ctx->txn->tid : Tx::PrehistoricTID;
@ -1282,7 +1282,7 @@ private:
NameSet hardlinked_files;
/// Create hardlinks for unchanged files
for (auto it = ctx->source_part->data_part_storage->iterate(); it->isValid(); it->next())
for (auto it = ctx->source_part->getDataPartStorage().iterate(); it->isValid(); it->next())
{
if (ctx->files_to_skip.contains(it->name()))
continue;
@ -1308,17 +1308,17 @@ private:
if (it->isFile())
{
ctx->new_data_part->data_part_storage->createHardLinkFrom(
*ctx->source_part->data_part_storage, it->name(), destination);
ctx->new_data_part->getDataPartStorage().createHardLinkFrom(
ctx->source_part->getDataPartStorage(), it->name(), destination);
hardlinked_files.insert(it->name());
}
else if (!endsWith(it->name(), ".tmp_proj")) // ignore projection tmp merge dir
{
// it's a projection part directory
ctx->new_data_part->data_part_storage->createProjection(destination);
ctx->new_data_part->getDataPartStorage().createProjection(destination);
auto projection_data_part_storage_src = ctx->source_part->data_part_storage->getProjection(destination);
auto projection_data_part_storage_dst = ctx->new_data_part->data_part_storage->getProjection(destination);
auto projection_data_part_storage_src = ctx->source_part->getDataPartStorage().getProjection(destination);
auto projection_data_part_storage_dst = ctx->new_data_part->getDataPartStorage().getProjection(destination);
for (auto p_it = projection_data_part_storage_src->iterate(); p_it->isValid(); p_it->next())
{

View File

@ -8,20 +8,10 @@
namespace DB
{
static std::unique_ptr<ReadBufferFromFileBase> openForReading(const DataPartStoragePtr & data_part_storage, const String & path)
{
size_t file_size = data_part_storage->getFileSize(path);
return data_part_storage->readFile(path, ReadSettings().adjustBufferSize(file_size), file_size, std::nullopt);
}
PartMetadataManagerOrdinary::PartMetadataManagerOrdinary(const IMergeTreeDataPart * part_) : IPartMetadataManager(part_)
{
}
std::unique_ptr<ReadBuffer> PartMetadataManagerOrdinary::read(const String & file_name) const
{
auto res = openForReading(part->data_part_storage, file_name);
size_t file_size = part->getDataPartStorage().getFileSize(file_name);
auto res = part->getDataPartStorage().readFile(file_name, ReadSettings().adjustBufferSize(file_size), file_size, std::nullopt);
if (isCompressedFromFileName(file_name))
return std::make_unique<CompressedReadBufferFromFile>(std::move(res));
@ -31,7 +21,7 @@ std::unique_ptr<ReadBuffer> PartMetadataManagerOrdinary::read(const String & fil
bool PartMetadataManagerOrdinary::exists(const String & file_name) const
{
return part->data_part_storage->exists(file_name);
return part->getDataPartStorage().exists(file_name);
}

View File

@ -8,7 +8,7 @@ namespace DB
class PartMetadataManagerOrdinary : public IPartMetadataManager
{
public:
explicit PartMetadataManagerOrdinary(const IMergeTreeDataPart * part_);
explicit PartMetadataManagerOrdinary(const IMergeTreeDataPart * part_) : IPartMetadataManager(part_) {}
~PartMetadataManagerOrdinary() override = default;

View File

@ -31,24 +31,24 @@ PartMetadataManagerWithCache::PartMetadataManagerWithCache(const IMergeTreeDataP
String PartMetadataManagerWithCache::getKeyFromFilePath(const String & file_path) const
{
return part->data_part_storage->getDiskName() + ":" + file_path;
return part->getDataPartStorage().getDiskName() + ":" + file_path;
}
String PartMetadataManagerWithCache::getFilePathFromKey(const String & key) const
{
return key.substr(part->data_part_storage->getDiskName().size() + 1);
return key.substr(part->getDataPartStorage().getDiskName().size() + 1);
}
std::unique_ptr<ReadBuffer> PartMetadataManagerWithCache::read(const String & file_name) const
{
String file_path = fs::path(part->data_part_storage->getRelativePath()) / file_name;
String file_path = fs::path(part->getDataPartStorage().getRelativePath()) / file_name;
String key = getKeyFromFilePath(file_path);
String value;
auto status = cache->get(key, value);
if (!status.ok())
{
ProfileEvents::increment(ProfileEvents::MergeTreeMetadataCacheMiss);
auto in = part->data_part_storage->readFile(file_name, {}, std::nullopt, std::nullopt);
auto in = part->getDataPartStorage().readFile(file_name, {}, std::nullopt, std::nullopt);
std::unique_ptr<ReadBuffer> reader;
if (!isCompressedFromFileName(file_name))
reader = std::move(in);
@ -67,7 +67,7 @@ std::unique_ptr<ReadBuffer> PartMetadataManagerWithCache::read(const String & fi
bool PartMetadataManagerWithCache::exists(const String & file_name) const
{
String file_path = fs::path(part->data_part_storage->getRelativePath()) / file_name;
String file_path = fs::path(part->getDataPartStorage().getRelativePath()) / file_name;
String key = getKeyFromFilePath(file_path);
String value;
auto status = cache->get(key, value);
@ -79,7 +79,7 @@ bool PartMetadataManagerWithCache::exists(const String & file_name) const
else
{
ProfileEvents::increment(ProfileEvents::MergeTreeMetadataCacheMiss);
return part->data_part_storage->exists(file_name);
return part->getDataPartStorage().exists(file_name);
}
}
@ -91,7 +91,7 @@ void PartMetadataManagerWithCache::deleteAll(bool include_projection)
String value;
for (const auto & file_name : file_names)
{
String file_path = fs::path(part->data_part_storage->getRelativePath()) / file_name;
String file_path = fs::path(part->getDataPartStorage().getRelativePath()) / file_name;
String key = getKeyFromFilePath(file_path);
auto status = cache->del(key);
if (!status.ok())
@ -119,10 +119,10 @@ void PartMetadataManagerWithCache::updateAll(bool include_projection)
String read_value;
for (const auto & file_name : file_names)
{
String file_path = fs::path(part->data_part_storage->getRelativePath()) / file_name;
if (!part->data_part_storage->exists(file_name))
String file_path = fs::path(part->getDataPartStorage().getRelativePath()) / file_name;
if (!part->getDataPartStorage().exists(file_name))
continue;
auto in = part->data_part_storage->readFile(file_name, {}, std::nullopt, std::nullopt);
auto in = part->getDataPartStorage().readFile(file_name, {}, std::nullopt, std::nullopt);
readStringUntilEOF(value, *in);
String key = getKeyFromFilePath(file_path);
@ -159,7 +159,7 @@ void PartMetadataManagerWithCache::assertAllDeleted(bool include_projection) con
file_name = fs::path(file_path).filename();
/// Metadata file belongs to current part
if (fs::path(part->data_part_storage->getRelativePath()) / file_name == file_path)
if (fs::path(part->getDataPartStorage().getRelativePath()) / file_name == file_path)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Data part {} with type {} with meta file {} still in cache",
@ -173,7 +173,7 @@ void PartMetadataManagerWithCache::assertAllDeleted(bool include_projection) con
const auto & projection_parts = part->getProjectionParts();
for (const auto & [projection_name, projection_part] : projection_parts)
{
if (fs::path(part->data_part_storage->getRelativePath()) / (projection_name + ".proj") / file_name == file_path)
if (fs::path(part->getDataPartStorage().getRelativePath()) / (projection_name + ".proj") / file_name == file_path)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
@ -190,7 +190,7 @@ void PartMetadataManagerWithCache::assertAllDeleted(bool include_projection) con
void PartMetadataManagerWithCache::getKeysAndCheckSums(Strings & keys, std::vector<uint128> & checksums) const
{
String prefix = getKeyFromFilePath(fs::path(part->data_part_storage->getRelativePath()) / "");
String prefix = getKeyFromFilePath(fs::path(part->getDataPartStorage().getRelativePath()) / "");
Strings values;
cache->getByPrefix(prefix, keys, values);
size_t size = keys.size();
@ -225,7 +225,7 @@ std::unordered_map<String, IPartMetadataManager::uint128> PartMetadataManagerWit
results.emplace(file_name, cache_checksums[i]);
/// File belongs to normal part
if (fs::path(part->data_part_storage->getRelativePath()) / file_name == file_path)
if (fs::path(part->getDataPartStorage().getRelativePath()) / file_name == file_path)
{
auto disk_checksum = part->getActualChecksumByFile(file_name);
if (disk_checksum != cache_checksums[i])

View File

@ -325,7 +325,7 @@ void ReplicatedMergeTreeSink::commitPart(
assertSessionIsNotExpired(zookeeper);
String temporary_part_relative_path = part->data_part_storage->getPartDirectory();
String temporary_part_relative_path = part->getDataPartStorage().getPartDirectory();
/// There is one case when we need to retry transaction in a loop.
/// But don't do it too many times - just as defensive measure.

View File

@ -1,3 +1,4 @@
#include "Storages/MergeTree/IDataPartStorage.h"
#include <algorithm>
#include <optional>
@ -46,7 +47,7 @@ bool isNotEnoughMemoryErrorCode(int code)
IMergeTreeDataPart::Checksums checkDataPart(
MergeTreeData::DataPartPtr data_part,
const DataPartStoragePtr & data_part_storage,
const IDataPartStorage & data_part_storage,
const NamesAndTypesList & columns_list,
const MergeTreeDataPartType & part_type,
const NameSet & files_without_checksums,
@ -64,13 +65,13 @@ IMergeTreeDataPart::Checksums checkDataPart(
NamesAndTypesList columns_txt;
{
auto buf = data_part_storage->readFile("columns.txt", {}, std::nullopt, std::nullopt);
auto buf = data_part_storage.readFile("columns.txt", {}, std::nullopt, std::nullopt);
columns_txt.readText(*buf);
assertEOF(*buf);
}
if (columns_txt != columns_list)
throw Exception("Columns doesn't match in part " + data_part_storage->getFullPath()
throw Exception("Columns doesn't match in part " + data_part_storage.getFullPath()
+ ". Expected: " + columns_list.toString()
+ ". Found: " + columns_txt.toString(), ErrorCodes::CORRUPTED_DATA);
@ -78,9 +79,9 @@ IMergeTreeDataPart::Checksums checkDataPart(
IMergeTreeDataPart::Checksums checksums_data;
/// This function calculates checksum for both compressed and decompressed contents of compressed file.
auto checksum_compressed_file = [](const DataPartStoragePtr & data_part_storage_, const String & file_path)
auto checksum_compressed_file = [](const IDataPartStorage & data_part_storage_, const String & file_path)
{
auto file_buf = data_part_storage_->readFile(file_path, {}, std::nullopt, std::nullopt);
auto file_buf = data_part_storage_.readFile(file_path, {}, std::nullopt, std::nullopt);
HashingReadBuffer compressed_hashing_buf(*file_buf);
CompressedReadBuffer uncompressing_buf(compressed_hashing_buf);
HashingReadBuffer uncompressed_hashing_buf(uncompressing_buf);
@ -96,9 +97,9 @@ IMergeTreeDataPart::Checksums checkDataPart(
auto ratio_of_defaults = data_part->storage.getSettings()->ratio_of_defaults_for_sparse_serialization;
SerializationInfoByName serialization_infos(columns_txt, SerializationInfo::Settings{ratio_of_defaults, false});
if (data_part_storage->exists(IMergeTreeDataPart::SERIALIZATION_FILE_NAME))
if (data_part_storage.exists(IMergeTreeDataPart::SERIALIZATION_FILE_NAME))
{
auto serialization_file = data_part_storage->readFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, {}, std::nullopt, std::nullopt);
auto serialization_file = data_part_storage.readFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, {}, std::nullopt, std::nullopt);
serialization_infos.readJSON(*serialization_file);
}
@ -114,7 +115,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
/// It also calculates checksum of projections.
auto checksum_file = [&](const String & file_name)
{
if (data_part_storage->isDirectory(file_name) && endsWith(file_name, ".proj"))
if (data_part_storage.isDirectory(file_name) && endsWith(file_name, ".proj"))
{
auto projection_name = file_name.substr(0, file_name.size() - sizeof(".proj") + 1);
auto pit = data_part->getProjectionParts().find(projection_name);
@ -129,7 +130,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
const auto & projection = pit->second;
IMergeTreeDataPart::Checksums projection_checksums_data;
auto projection_part_storage = data_part_storage->getProjection(file_name);
auto projection_part_storage = data_part_storage.getProjection(file_name);
if (projection->getType() == MergeTreeDataPartType::Compact)
{
@ -148,7 +149,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
[&](const ISerialization::SubstreamPath & substream_path)
{
String projection_file_name = ISerialization::getFileNameForStream(projection_column, substream_path) + ".bin";
projection_checksums_data.files[projection_file_name] = checksum_compressed_file(projection_part_storage, projection_file_name);
projection_checksums_data.files[projection_file_name] = checksum_compressed_file(*projection_part_storage, projection_file_name);
});
}
}
@ -183,7 +184,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
}
else
{
projection_checksums_data.files[projection_file_name] = checksum_compressed_file(projection_part_storage, projection_file_name);
projection_checksums_data.files[projection_file_name] = checksum_compressed_file(*projection_part_storage, projection_file_name);
}
}
}
@ -195,7 +196,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
}
else
{
auto file_buf = data_part_storage->readFile(file_name, {}, std::nullopt, std::nullopt);
auto file_buf = data_part_storage.readFile(file_name, {}, std::nullopt, std::nullopt);
HashingReadBuffer hashing_buf(*file_buf);
hashing_buf.ignoreAll();
checksums_data.files[file_name] = IMergeTreeDataPart::Checksums::Checksum(hashing_buf.count(), hashing_buf.getHash());
@ -224,21 +225,21 @@ IMergeTreeDataPart::Checksums checkDataPart(
}
else
{
throw Exception("Unknown type in part " + data_part_storage->getFullPath(), ErrorCodes::UNKNOWN_PART_TYPE);
throw Exception("Unknown type in part " + data_part_storage.getFullPath(), ErrorCodes::UNKNOWN_PART_TYPE);
}
/// Checksums from the rest files listed in checksums.txt. May be absent. If present, they are subsequently compared with the actual data checksums.
IMergeTreeDataPart::Checksums checksums_txt;
if (require_checksums || data_part_storage->exists("checksums.txt"))
if (require_checksums || data_part_storage.exists("checksums.txt"))
{
auto buf = data_part_storage->readFile("checksums.txt", {}, std::nullopt, std::nullopt);
auto buf = data_part_storage.readFile("checksums.txt", {}, std::nullopt, std::nullopt);
checksums_txt.read(*buf);
assertEOF(*buf);
}
const auto & checksum_files_txt = checksums_txt.files;
for (auto it = data_part_storage->iterate(); it->isValid(); it->next())
for (auto it = data_part_storage.iterate(); it->isValid(); it->next())
{
const String & file_name = it->name();
auto checksum_it = checksums_data.files.find(file_name);
@ -285,7 +286,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
return checkDataPart(
data_part,
data_part->data_part_storage,
data_part->getDataPartStorage(),
data_part->getColumns(),
data_part->getType(),
data_part->getFileNamesWithoutChecksums(),

View File

@ -378,7 +378,9 @@ CurrentlyMergingPartsTagger::CurrentlyMergingPartsTagger(
/// if we mutate part, than we should reserve space on the same disk, because mutations possible can create hardlinks
if (is_mutation)
reserved_space = storage.tryReserveSpace(total_size, future_part->parts[0]->data_part_storage);
{
reserved_space = storage.tryReserveSpace(total_size, future_part->parts[0]->getDataPartStorage());
}
else
{
IMergeTreeDataPart::TTLInfos ttl_infos;
@ -386,7 +388,7 @@ CurrentlyMergingPartsTagger::CurrentlyMergingPartsTagger(
for (auto & part_ptr : future_part->parts)
{
ttl_infos.update(part_ptr->ttl_infos);
max_volume_index = std::max(max_volume_index, part_ptr->data_part_storage->getVolumeIndex(*storage.getStoragePolicy()));
max_volume_index = std::max(max_volume_index, part_ptr->getDataPartStorage().getVolumeIndex(*storage.getStoragePolicy()));
}
reserved_space = storage.balancedReservation(
@ -1474,7 +1476,7 @@ void StorageMergeTree::dropPartsImpl(DataPartsVector && parts_to_remove, bool de
/// NOTE: no race with background cleanup until we hold pointers to parts
for (const auto & part : parts_to_remove)
{
LOG_INFO(log, "Detaching {}", part->data_part_storage->getPartDirectory());
LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
part->makeCloneInDetached("", metadata_snapshot);
}
}
@ -1739,14 +1741,14 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
/// If the checksums file is not present, calculate the checksums and write them to disk.
String checksums_path = "checksums.txt";
String tmp_checksums_path = "checksums.txt.tmp";
if (part->isStoredOnDisk() && !part->data_part_storage->exists(checksums_path))
if (part->isStoredOnDisk() && !part->getDataPartStorage().exists(checksums_path))
{
try
{
auto calculated_checksums = checkDataPart(part, false);
calculated_checksums.checkEqual(part->checksums, true);
part->data_part_storage->writeChecksums(part->checksums, local_context->getWriteSettings());
part->getDataPartStorage().writeChecksums(part->checksums, local_context->getWriteSettings());
part->checkMetadata();
results.emplace_back(part->name, true, "Checksums recounted and written to disk.");

View File

@ -1,5 +1,6 @@
#include <Core/Defines.h>
#include <cstddef>
#include <ranges>
#include "Common/hex.h"
#include <Common/Macros.h>
@ -1781,7 +1782,7 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry, bool need_to_che
}
bool StorageReplicatedMergeTree::executeFetchShared(
MutableDataPartStoragePtr StorageReplicatedMergeTree::executeFetchShared(
const String & source_replica,
const String & new_part_name,
const DiskPtr & disk,
@ -1790,7 +1791,7 @@ bool StorageReplicatedMergeTree::executeFetchShared(
if (source_replica.empty())
{
LOG_INFO(log, "No active replica has part {} on shared storage.", new_part_name);
return false;
return nullptr;
}
const auto storage_settings_ptr = getSettings();
@ -1847,7 +1848,7 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
/// If DETACH clone parts to detached/ directory
for (const auto & part : parts_to_remove)
{
LOG_INFO(log, "Detaching {}", part->data_part_storage->getPartDirectory());
LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
part->makeCloneInDetached("", metadata_snapshot);
}
}
@ -2538,7 +2539,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
for (const auto & part : parts_to_remove_from_working_set)
{
LOG_INFO(log, "Detaching {}", part->data_part_storage->getPartDirectory());
LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
part->makeCloneInDetached("clone", metadata_snapshot);
}
}
@ -3890,7 +3891,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
auto source_part = getActiveContainingPart(covered_part_info);
/// Fetch for zero-copy replication is cheap and straightforward, so we don't use local clone here
if (source_part && (!settings_ptr->allow_remote_fs_zero_copy_replication || !source_part->data_part_storage->supportZeroCopyReplication()))
if (source_part && (!settings_ptr->allow_remote_fs_zero_copy_replication || !source_part->getDataPartStorage().supportZeroCopyReplication()))
{
auto source_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
source_part->getColumns(), source_part->checksums);
@ -4067,7 +4068,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
}
bool StorageReplicatedMergeTree::fetchExistsPart(
MutableDataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
const String & part_name,
const StorageMetadataPtr & metadata_snapshot,
const String & source_replica_path,
@ -4082,7 +4083,7 @@ bool StorageReplicatedMergeTree::fetchExistsPart(
LOG_DEBUG(log, "Part {} should be deleted after previous attempt before fetch", part->name);
/// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt.
cleanup_thread.wakeup();
return false;
return nullptr;
}
{
@ -4090,7 +4091,7 @@ bool StorageReplicatedMergeTree::fetchExistsPart(
if (!currently_fetching_parts.insert(part_name).second)
{
LOG_DEBUG(log, "Part {} is already fetching right now", part_name);
return false;
return nullptr;
}
}
@ -4142,11 +4143,11 @@ bool StorageReplicatedMergeTree::fetchExistsPart(
{
part = get_part();
if (part->data_part_storage->getDiskName() != replaced_disk->getName())
throw Exception("Part " + part->name + " fetched on wrong disk " + part->data_part_storage->getDiskName(), ErrorCodes::LOGICAL_ERROR);
if (part->getDataPartStorage().getDiskName() != replaced_disk->getName())
throw Exception("Part " + part->name + " fetched on wrong disk " + part->getDataPartStorage().getDiskName(), ErrorCodes::LOGICAL_ERROR);
auto replaced_path = fs::path(replaced_part_path);
part->data_part_storage->rename(replaced_path.parent_path(), replaced_path.filename(), nullptr, true, false);
part->getDataPartStorage().rename(replaced_path.parent_path(), replaced_path.filename(), nullptr, true, false);
}
catch (const Exception & e)
{
@ -4155,7 +4156,7 @@ bool StorageReplicatedMergeTree::fetchExistsPart(
if (e.code() == ErrorCodes::DIRECTORY_ALREADY_EXISTS)
{
LOG_TRACE(log, "Not fetching part: {}", e.message());
return false;
return nullptr;
}
throw;
@ -4169,7 +4170,7 @@ bool StorageReplicatedMergeTree::fetchExistsPart(
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);
LOG_DEBUG(log, "Fetched part {} from {}", part_name, source_replica_path);
return true;
return part->getDataPartStoragePtr();
}
void StorageReplicatedMergeTree::startup()
@ -7409,7 +7410,7 @@ void StorageReplicatedMergeTree::checkBrokenDisks()
for (auto & part : *parts)
{
if (part->data_part_storage && part->data_part_storage->getDiskName() == disk_ptr->getName())
if (part->getDataPartStorage().getDiskName() == disk_ptr->getName())
broken_part_callback(part->name);
}
continue;
@ -7572,10 +7573,10 @@ void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part,
{
auto settings = getSettings();
if (!part.data_part_storage || !part.isStoredOnDisk() || !settings->allow_remote_fs_zero_copy_replication)
if (!part.isStoredOnDisk() || !settings->allow_remote_fs_zero_copy_replication)
return;
if (!part.data_part_storage->supportZeroCopyReplication())
if (!part.getDataPartStorage().supportZeroCopyReplication())
return;
zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
@ -7586,7 +7587,7 @@ void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part,
boost::replace_all(id, "/", "_");
Strings zc_zookeeper_paths = getZeroCopyPartPath(
*getSettings(), part.data_part_storage->getDiskType(), getTableSharedID(),
*getSettings(), part.getDataPartStorage().getDiskType(), getTableSharedID(),
part.name, zookeeper_path);
String path_to_set_hardlinked_files;
@ -7595,7 +7596,7 @@ void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part,
if (hardlinked_files.has_value() && !hardlinked_files->hardlinks_from_source_part.empty())
{
path_to_set_hardlinked_files = getZeroCopyPartPath(
*getSettings(), part.data_part_storage->getDiskType(), hardlinked_files->source_table_shared_id,
*getSettings(), part.getDataPartStorage().getDiskType(), hardlinked_files->source_table_shared_id,
hardlinked_files->source_part_name, zookeeper_path)[0];
hardlinks = hardlinked_files->hardlinks_from_source_part;
@ -7619,25 +7620,22 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedData(const IMer
if (!settings->allow_remote_fs_zero_copy_replication)
return std::make_pair(true, NameSet{});
if (!part.data_part_storage)
LOG_WARNING(log, "Datapart storage for part {} (temp: {}) is not initialzied", part.name, part.is_temp);
if (!part.data_part_storage || !part.isStoredOnDisk())
if (!part.isStoredOnDisk())
{
LOG_TRACE(log, "Part {} is not stored on disk, blobs can be removed", part.name);
return std::make_pair(true, NameSet{});
}
if (!part.data_part_storage || !part.data_part_storage->supportZeroCopyReplication())
if (!part.getDataPartStorage().supportZeroCopyReplication())
{
LOG_TRACE(log, "Part {} is not stored on zero-copy replicated disk, blobs can be removed", part.name);
return std::make_pair(true, NameSet{});
}
/// If part is temporary refcount file may be absent
if (part.data_part_storage->exists(IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK))
if (part.getDataPartStorage().exists(IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK))
{
auto ref_count = part.data_part_storage->getRefCount(IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK);
auto ref_count = part.getDataPartStorage().getRefCount(IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK);
if (ref_count > 0) /// Keep part shard info for frozen backups
{
LOG_TRACE(log, "Part {} has more than zero local references ({}), blobs cannot be removed", part.name, ref_count);
@ -7675,7 +7673,7 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedData(const IMer
return unlockSharedDataByID(
part.getUniqueId(), getTableSharedID(), part.name, replica_name,
part.data_part_storage->getDiskType(), zookeeper, *getSettings(), log, zookeeper_path, format_version);
part.getDataPartStorage().getDiskType(), zookeeper, *getSettings(), log, zookeeper_path, format_version);
}
namespace
@ -7874,7 +7872,7 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedDataByID(
}
bool StorageReplicatedMergeTree::tryToFetchIfShared(
MutableDataPartStoragePtr StorageReplicatedMergeTree::tryToFetchIfShared(
const IMergeTreeDataPart & part,
const DiskPtr & disk,
const String & path)
@ -7882,13 +7880,13 @@ bool StorageReplicatedMergeTree::tryToFetchIfShared(
const auto settings = getSettings();
auto data_source_description = disk->getDataSourceDescription();
if (!(disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication))
return false;
return nullptr;
String replica = getSharedDataReplica(part, data_source_description.type);
/// We can't fetch part when none replicas have this part on a same type remote disk
if (replica.empty())
return false;
return nullptr;
return executeFetchShared(replica, part.name, disk, path);
}
@ -8160,7 +8158,7 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
/// The name could be non-unique in case of stale files from previous runs.
if (data_part_storage->exists())
{
LOG_WARNING(log, "Removing old temporary directory {}", new_data_part->data_part_storage->getFullPath());
LOG_WARNING(log, "Removing old temporary directory {}", new_data_part->getDataPartStorage().getFullPath());
data_part_storage->removeRecursive();
}

View File

@ -263,7 +263,7 @@ public:
bool canExecuteFetch(const ReplicatedMergeTreeLogEntry & entry, String & disable_reason) const;
/// Fetch part only when it stored on shared storage like S3
bool executeFetchShared(const String & source_replica, const String & new_part_name, const DiskPtr & disk, const String & path);
MutableDataPartStoragePtr executeFetchShared(const String & source_replica, const String & new_part_name, const DiskPtr & disk, const String & path);
/// Lock part in zookeeper for use shared data in several nodes
void lockSharedData(const IMergeTreeDataPart & part, bool replace_existing_lock, std::optional<HardlinkedFiles> hardlinked_files) const override;
@ -283,7 +283,7 @@ public:
const String & zookeeper_path_old, MergeTreeDataFormatVersion data_format_version);
/// Fetch part only if some replica has it on shared storage like S3
bool tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override;
MutableDataPartStoragePtr tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override;
/// Get best replica having this partition on a same type remote disk
String getSharedDataReplica(const IMergeTreeDataPart & part, DataSourceType data_source_type) const;
@ -682,7 +682,7 @@ private:
* Used for replace local part on the same s3-shared part in hybrid storage.
* Returns false if part is already fetching right now.
*/
bool fetchExistsPart(
MutableDataPartStoragePtr fetchExistsPart(
const String & part_name,
const StorageMetadataPtr & metadata_snapshot,
const String & replica_path,

View File

@ -198,9 +198,9 @@ void StorageSystemParts::processNextStorage(
if (part->isStoredOnDisk())
{
if (columns_mask[src_index++])
columns[res_index++]->insert(part->data_part_storage->getDiskName());
columns[res_index++]->insert(part->getDataPartStorage().getDiskName());
if (columns_mask[src_index++])
columns[res_index++]->insert(part->data_part_storage->getFullPath());
columns[res_index++]->insert(part->getDataPartStorage().getFullPath());
}
else
{

View File

@ -190,9 +190,9 @@ void StorageSystemPartsColumns::processNextStorage(
if (columns_mask[src_index++])
columns[res_index++]->insert(info.engine);
if (columns_mask[src_index++])
columns[res_index++]->insert(part->data_part_storage->getDiskName());
columns[res_index++]->insert(part->getDataPartStorage().getDiskName());
if (columns_mask[src_index++])
columns[res_index++]->insert(part->data_part_storage->getFullPath());
columns[res_index++]->insert(part->getDataPartStorage().getFullPath());
if (columns_mask[src_index++])
columns[res_index++]->insert(column.name);

View File

@ -200,9 +200,9 @@ void StorageSystemProjectionParts::processNextStorage(
if (part->isStoredOnDisk())
{
if (columns_mask[src_index++])
columns[res_index++]->insert(part->data_part_storage->getDiskName());
columns[res_index++]->insert(part->getDataPartStorage().getDiskName());
if (columns_mask[src_index++])
columns[res_index++]->insert(part->data_part_storage->getFullPath());
columns[res_index++]->insert(part->getDataPartStorage().getFullPath());
}
else
{

View File

@ -211,9 +211,9 @@ void StorageSystemProjectionPartsColumns::processNextStorage(
if (columns_mask[src_index++])
columns[res_index++]->insert(info.engine);
if (columns_mask[src_index++])
columns[res_index++]->insert(part->data_part_storage->getDiskName());
columns[res_index++]->insert(part->getDataPartStorage().getDiskName());
if (columns_mask[src_index++])
columns[res_index++]->insert(part->data_part_storage->getFullPath());
columns[res_index++]->insert(part->getDataPartStorage().getFullPath());
if (columns_mask[src_index++])
columns[res_index++]->insert(column.name);