mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 17:41:59 +00:00
Try refactor something (1)
This commit is contained in:
parent
4479b68980
commit
5a1392a8e3
@ -5,6 +5,7 @@
|
||||
#include <Compression/CompressionCodecMultiple.h>
|
||||
#include <Common/PODArray.h>
|
||||
#include <base/logger_useful.h>
|
||||
#include <Storages/MergeTree/IDataPartStorage.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -12,9 +13,9 @@ namespace DB
|
||||
|
||||
using Checksum = CityHash_v1_0_2::uint128;
|
||||
|
||||
CompressionCodecPtr getCompressionCodecForFile(const DiskPtr & disk, const String & relative_path)
|
||||
CompressionCodecPtr getCompressionCodecForFile(const DataPartStoragePtr & data_part_storage, const String & relative_path)
|
||||
{
|
||||
auto read_buffer = disk->readFile(relative_path);
|
||||
auto read_buffer = data_part_storage->readFile(relative_path);
|
||||
read_buffer->ignore(sizeof(Checksum));
|
||||
|
||||
UInt8 header_size = ICompressionCodec::getHeaderSize();
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include <Compression/ICompressionCodec.h>
|
||||
#include <Disks/IDisk.h>
|
||||
#include <Storages/MergeTree/IDataPartStorage.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -10,6 +11,6 @@ namespace DB
|
||||
/// clickhouse fashion (with checksums, headers for each block, etc). This
|
||||
/// method should be used as fallback when we cannot deduce compression codec
|
||||
/// from metadata.
|
||||
CompressionCodecPtr getCompressionCodecForFile(const DiskPtr & disk, const String & relative_path);
|
||||
CompressionCodecPtr getCompressionCodecForFile(const DataPartStoragePtr & data_part_storage, const String & relative_path);
|
||||
|
||||
}
|
||||
|
99
src/Storages/MergeTree/DataPartStorageOnDisk.cpp
Normal file
99
src/Storages/MergeTree/DataPartStorageOnDisk.cpp
Normal file
@ -0,0 +1,99 @@
|
||||
#include <Storages/MergeTree/DataPartStorageOnDisk.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataPartChecksum.h>
|
||||
#include <Disks/IVolume.h>
|
||||
#include <IO/WriteBufferFromFileBase.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
DataPartStorageOnDisk::DataPartStorageOnDisk(VolumePtr volume_, std::string root_path_, std::string relative_root_path_)
|
||||
: volume(std::move(volume_)), root_path(std::move(root_path_)), relative_root_path(std::move(relative_root_path_))
|
||||
{
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> DataPartStorageOnDisk::readFile(
|
||||
const std::string & path,
|
||||
const ReadSettings & settings,
|
||||
std::optional<size_t> read_hint,
|
||||
std::optional<size_t> file_size) const
|
||||
{
|
||||
return volume->getDisk()->readFile(fs::path(relative_root_path) / path, settings, read_hint, file_size);
|
||||
}
|
||||
|
||||
bool DataPartStorageOnDisk::exists(const std::string & path) const
|
||||
{
|
||||
return volume->getDisk()->exists(fs::path(relative_root_path) / path);
|
||||
}
|
||||
|
||||
bool DataPartStorageOnDisk::exists() const
|
||||
{
|
||||
return volume->getDisk()->exists(relative_root_path);
|
||||
}
|
||||
|
||||
size_t DataPartStorageOnDisk::getFileSize(const String & path) const
|
||||
{
|
||||
return volume->getDisk()->getFileSize(fs::path(relative_root_path) / path);
|
||||
}
|
||||
|
||||
DiskDirectoryIteratorPtr DataPartStorageOnDisk::iterate() const
|
||||
{
|
||||
return volume->getDisk()->iterateDirectory(relative_root_path);
|
||||
}
|
||||
|
||||
DiskDirectoryIteratorPtr DataPartStorageOnDisk::iterateDirectory(const String & path) const
|
||||
{
|
||||
return volume->getDisk()->iterateDirectory(fs::path(relative_root_path) / path);
|
||||
}
|
||||
|
||||
DataPartStoragePtr DataPartStorageOnDisk::getProjection(const std::string & name) const
|
||||
{
|
||||
return std::make_shared<DataPartStorageOnDisk>(volume, fs::path(relative_root_path) / name);
|
||||
}
|
||||
|
||||
static UInt64 calculateTotalSizeOnDiskImpl(const DiskPtr & disk, const String & from)
|
||||
{
|
||||
if (disk->isFile(from))
|
||||
return disk->getFileSize(from);
|
||||
std::vector<std::string> files;
|
||||
disk->listFiles(from, files);
|
||||
UInt64 res = 0;
|
||||
for (const auto & file : files)
|
||||
res += calculateTotalSizeOnDiskImpl(disk, fs::path(from) / file);
|
||||
return res;
|
||||
}
|
||||
|
||||
UInt64 DataPartStorageOnDisk::calculateTotalSizeOnDisk() const
|
||||
{
|
||||
return calculateTotalSizeOnDiskImpl(volume->getDisk(), relative_root_path);
|
||||
}
|
||||
|
||||
void DataPartStorageOnDisk::writeChecksums(MergeTreeDataPartChecksums & checksums) const
|
||||
{
|
||||
std::string path = fs::path(relative_root_path) / "checksums.txt";
|
||||
|
||||
{
|
||||
auto out = volume->getDisk()->writeFile(path + ".tmp", 4096);
|
||||
checksums.write(*out);
|
||||
}
|
||||
|
||||
volume->getDisk()->moveFile(path + ".tmp", path);
|
||||
}
|
||||
|
||||
void DataPartStorageOnDisk::writeColumns(NamesAndTypesList & columns) const
|
||||
{
|
||||
std::string path = fs::path(relative_root_path) / "columns.txt";
|
||||
|
||||
{
|
||||
auto buf = volume->getDisk()->writeFile(path + ".tmp", 4096);
|
||||
columns.writeText(*buf);
|
||||
}
|
||||
|
||||
volume->getDisk()->moveFile(path + ".tmp", path);
|
||||
}
|
||||
|
||||
std::string DataPartStorageOnDisk::getName() const
|
||||
{
|
||||
return volume->getDisk()->getName();
|
||||
}
|
||||
|
||||
}
|
50
src/Storages/MergeTree/DataPartStorageOnDisk.h
Normal file
50
src/Storages/MergeTree/DataPartStorageOnDisk.h
Normal file
@ -0,0 +1,50 @@
|
||||
#pragma once
|
||||
#include <Storages/MergeTree/IDataPartStorage.h>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class IVolume;
|
||||
using VolumePtr = std::shared_ptr<IVolume>;
|
||||
|
||||
|
||||
class DataPartStorageOnDisk final : public IDataPartStorage
|
||||
{
|
||||
public:
|
||||
explicit DataPartStorageOnDisk(VolumePtr volume_, std::string root_path_, std::string relative_root_path_);
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> readFile(
|
||||
const std::string & path,
|
||||
const ReadSettings & settings,
|
||||
std::optional<size_t> read_hint,
|
||||
std::optional<size_t> file_size) const override;
|
||||
|
||||
bool exists(const std::string & path) const override;
|
||||
bool exists() const override;
|
||||
|
||||
size_t getFileSize(const std::string & path) const override;
|
||||
|
||||
DiskDirectoryIteratorPtr iterate() const override;
|
||||
DiskDirectoryIteratorPtr iterateDirectory(const std::string & path) const override;
|
||||
|
||||
std::string getFullPath() const override { return root_path; }
|
||||
std::string getFullRelativePath() const override { return relative_root_path; }
|
||||
|
||||
UInt64 calculateTotalSizeOnDisk() const override;
|
||||
|
||||
void writeChecksums(MergeTreeDataPartChecksums & checksums) const override;
|
||||
void writeColumns(NamesAndTypesList & columns) const override;
|
||||
|
||||
std::string getName() const override;
|
||||
|
||||
DataPartStoragePtr getProjection(const std::string & name) const override;
|
||||
|
||||
private:
|
||||
VolumePtr volume;
|
||||
std::string root_path;
|
||||
std::string relative_root_path;
|
||||
};
|
||||
|
||||
}
|
64
src/Storages/MergeTree/IDataPartStorage.h
Normal file
64
src/Storages/MergeTree/IDataPartStorage.h
Normal file
@ -0,0 +1,64 @@
|
||||
#pragma once
|
||||
#include <IO/ReadSettings.h>
|
||||
#include <base/types.h>
|
||||
#include <Core/NamesAndTypes.h>
|
||||
#include <optional>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ReadBufferFromFileBase;
|
||||
|
||||
|
||||
class IDiskDirectoryIterator;
|
||||
using DiskDirectoryIteratorPtr = std::unique_ptr<IDiskDirectoryIterator>;
|
||||
|
||||
struct MergeTreeDataPartChecksums;
|
||||
|
||||
/// This is an abstraction of storage for data part files.
|
||||
/// Generally, it contains read-only methods from IDisk.
|
||||
class IDataPartStorage
|
||||
{
|
||||
private:
|
||||
public:
|
||||
virtual ~IDataPartStorage() = default;
|
||||
|
||||
/// Open the file for read and return ReadBufferFromFileBase object.
|
||||
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(
|
||||
const std::string & path,
|
||||
const ReadSettings & settings,
|
||||
std::optional<size_t> read_hint,
|
||||
std::optional<size_t> file_size) const = 0;
|
||||
|
||||
virtual bool exists(const std::string & path) const = 0;
|
||||
virtual bool exists() const = 0;
|
||||
|
||||
virtual DiskDirectoryIteratorPtr iterate() const = 0;
|
||||
virtual DiskDirectoryIteratorPtr iterateDirectory(const std::string & path) const = 0;
|
||||
|
||||
virtual size_t getFileSize(const std::string & path) const = 0;
|
||||
|
||||
virtual std::string getFullPath() const = 0;
|
||||
virtual std::string getFullRelativePath() const = 0;
|
||||
|
||||
virtual UInt64 calculateTotalSizeOnDisk() const = 0;
|
||||
|
||||
/// Should remove it later
|
||||
virtual void writeChecksums(MergeTreeDataPartChecksums & checksums) const = 0;
|
||||
virtual void writeColumns(NamesAndTypesList & columns) const = 0;
|
||||
|
||||
/// Disk name
|
||||
virtual std::string getName() const = 0;
|
||||
|
||||
virtual std::shared_ptr<IDataPartStorage> getProjection(const std::string & name) const = 0;
|
||||
};
|
||||
|
||||
class IDataPartStorageBuilder
|
||||
{
|
||||
public:
|
||||
virtual ~IDataPartStorageBuilder() = default;
|
||||
};
|
||||
|
||||
using DataPartStoragePtr = std::shared_ptr<IDataPartStorage>;
|
||||
|
||||
}
|
@ -298,14 +298,14 @@ static void decrementTypeMetric(MergeTreeDataPartType type)
|
||||
IMergeTreeDataPart::IMergeTreeDataPart(
|
||||
const MergeTreeData & storage_,
|
||||
const String & name_,
|
||||
const VolumePtr & volume_,
|
||||
const DataPartStoragePtr & data_part_storage_,
|
||||
const std::optional<String> & relative_path_,
|
||||
Type part_type_,
|
||||
const IMergeTreeDataPart * parent_part_)
|
||||
: storage(storage_)
|
||||
, name(name_)
|
||||
, info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
|
||||
, volume(parent_part_ ? parent_part_->volume : volume_)
|
||||
, data_part_storage(parent_part_ ? parent_part_->data_part_storage : data_part_storage_)
|
||||
, relative_path(relative_path_.value_or(name_))
|
||||
, index_granularity_info(storage_, part_type_)
|
||||
, part_type(part_type_)
|
||||
@ -326,14 +326,14 @@ IMergeTreeDataPart::IMergeTreeDataPart(
|
||||
const MergeTreeData & storage_,
|
||||
const String & name_,
|
||||
const MergeTreePartInfo & info_,
|
||||
const VolumePtr & volume_,
|
||||
const DataPartStoragePtr & data_part_storage_,
|
||||
const std::optional<String> & relative_path_,
|
||||
Type part_type_,
|
||||
const IMergeTreeDataPart * parent_part_)
|
||||
: storage(storage_)
|
||||
, name(name_)
|
||||
, info(info_)
|
||||
, volume(parent_part_ ? parent_part_->volume : volume_)
|
||||
, data_part_storage(parent_part_ ? parent_part_->data_part_storage : data_part_storage_)
|
||||
, relative_path(relative_path_.value_or(name_))
|
||||
, index_granularity_info(storage_, part_type_)
|
||||
, part_type(part_type_)
|
||||
@ -471,9 +471,9 @@ void IMergeTreeDataPart::removeIfNeeded()
|
||||
|
||||
try
|
||||
{
|
||||
auto path = getFullRelativePath();
|
||||
auto path = data_part_storage->getFullRelativePath();
|
||||
|
||||
if (!volume->getDisk()->exists(path))
|
||||
if (!data_part_storage->exists()) // path
|
||||
return;
|
||||
|
||||
if (is_temp)
|
||||
@ -605,26 +605,26 @@ String IMergeTreeDataPart::getColumnNameWithMinimumCompressedSize(const StorageS
|
||||
}
|
||||
|
||||
if (!minimum_size_column)
|
||||
throw Exception("Could not find a column of minimum size in MergeTree, part " + getFullPath(), ErrorCodes::LOGICAL_ERROR);
|
||||
throw Exception("Could not find a column of minimum size in MergeTree, part " + data_part_storage->getFullPath(), ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
return *minimum_size_column;
|
||||
}
|
||||
|
||||
String IMergeTreeDataPart::getFullPath() const
|
||||
{
|
||||
if (relative_path.empty())
|
||||
throw Exception("Part relative_path cannot be empty. It's bug.", ErrorCodes::LOGICAL_ERROR);
|
||||
// String IMergeTreeDataPart::getFullPath() const
|
||||
// {
|
||||
// if (relative_path.empty())
|
||||
// throw Exception("Part relative_path cannot be empty. It's bug.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
return fs::path(storage.getFullPathOnDisk(volume->getDisk())) / (parent_part ? parent_part->relative_path : "") / relative_path / "";
|
||||
}
|
||||
// return fs::path(storage.getFullPathOnDisk(volume->getDisk())) / (parent_part ? parent_part->relative_path : "") / relative_path / "";
|
||||
// }
|
||||
|
||||
String IMergeTreeDataPart::getFullRelativePath() const
|
||||
{
|
||||
if (relative_path.empty())
|
||||
throw Exception("Part relative_path cannot be empty. It's bug.", ErrorCodes::LOGICAL_ERROR);
|
||||
// String IMergeTreeDataPart::getFullRelativePath() const
|
||||
// {
|
||||
// if (relative_path.empty())
|
||||
// throw Exception("Part relative_path cannot be empty. It's bug.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
return fs::path(storage.relative_data_path) / (parent_part ? parent_part->relative_path : "") / relative_path / "";
|
||||
}
|
||||
// return fs::path(storage.relative_data_path) / (parent_part ? parent_part->relative_path : "") / relative_path / "";
|
||||
// }
|
||||
|
||||
void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency)
|
||||
{
|
||||
@ -698,10 +698,11 @@ void IMergeTreeDataPart::loadProjections(bool require_columns_checksums, bool ch
|
||||
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
|
||||
for (const auto & projection : metadata_snapshot->projections)
|
||||
{
|
||||
String path = getFullRelativePath() + projection.name + ".proj";
|
||||
if (volume->getDisk()->exists(path))
|
||||
String path = /*getFullRelativePath() + */ projection.name + ".proj";
|
||||
if (data_part_storage->exists(path))
|
||||
{
|
||||
auto part = storage.createPart(projection.name, {"all", 0, 0, 0}, volume, projection.name + ".proj", this);
|
||||
auto projection_part_storage = data_part_storage->getProjection(projection.name + ".proj");
|
||||
auto part = storage.createPart(projection.name, {"all", 0, 0, 0}, projection_part_storage, this);
|
||||
part->loadColumnsChecksumsIndexes(require_columns_checksums, check_consistency);
|
||||
projection_parts.emplace(projection.name, std::move(part));
|
||||
}
|
||||
@ -742,7 +743,7 @@ void IMergeTreeDataPart::loadIndex()
|
||||
}
|
||||
|
||||
String index_name = "primary.idx";
|
||||
String index_path = fs::path(getFullRelativePath()) / index_name;
|
||||
String index_path = fs::path(data_part_storage->getFullRelativePath()) / index_name;
|
||||
auto index_file = metadata_manager->read(index_name);
|
||||
size_t marks_count = index_granularity.getMarksCount();
|
||||
|
||||
@ -764,7 +765,7 @@ void IMergeTreeDataPart::loadIndex()
|
||||
}
|
||||
|
||||
if (!index_file->eof())
|
||||
throw Exception("Index file " + fullPath(volume->getDisk(), index_path) + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);
|
||||
throw Exception("Index file " + index_path + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);
|
||||
|
||||
index.assign(std::make_move_iterator(loaded_index.begin()), std::make_move_iterator(loaded_index.end()));
|
||||
}
|
||||
@ -789,9 +790,9 @@ NameSet IMergeTreeDataPart::getFileNamesWithoutChecksums() const
|
||||
return {};
|
||||
|
||||
NameSet result = {"checksums.txt", "columns.txt"};
|
||||
String default_codec_path = fs::path(getFullRelativePath()) / DEFAULT_COMPRESSION_CODEC_FILE_NAME;
|
||||
//String default_codec_path = fs::path(getFullRelativePath()) / DEFAULT_COMPRESSION_CODEC_FILE_NAME;
|
||||
|
||||
if (volume->getDisk()->exists(default_codec_path))
|
||||
if (data_part_storage->exists(DEFAULT_COMPRESSION_CODEC_FILE_NAME))
|
||||
result.emplace(DEFAULT_COMPRESSION_CODEC_FILE_NAME);
|
||||
|
||||
return result;
|
||||
@ -806,7 +807,7 @@ void IMergeTreeDataPart::loadDefaultCompressionCodec()
|
||||
return;
|
||||
}
|
||||
|
||||
String path = fs::path(getFullRelativePath()) / DEFAULT_COMPRESSION_CODEC_FILE_NAME;
|
||||
String path = fs::path(data_part_storage->getFullRelativePath()) / DEFAULT_COMPRESSION_CODEC_FILE_NAME;
|
||||
bool exists = metadata_manager->exists(DEFAULT_COMPRESSION_CODEC_FILE_NAME);
|
||||
if (!exists)
|
||||
{
|
||||
@ -872,10 +873,10 @@ CompressionCodecPtr IMergeTreeDataPart::detectDefaultCompressionCodec() const
|
||||
{
|
||||
if (path_to_data_file.empty())
|
||||
{
|
||||
String candidate_path = fs::path(getFullRelativePath()) / (ISerialization::getFileNameForStream(part_column, substream_path) + ".bin");
|
||||
String candidate_path = /*fs::path(getFullRelativePath()) */ (ISerialization::getFileNameForStream(part_column, substream_path) + ".bin");
|
||||
|
||||
/// We can have existing, but empty .bin files. Example: LowCardinality(Nullable(...)) columns and column_name.dict.null.bin file.
|
||||
if (volume->getDisk()->exists(candidate_path) && volume->getDisk()->getFileSize(candidate_path) != 0)
|
||||
if (data_part_storage->exists(candidate_path) && data_part_storage->getFileSize(candidate_path) != 0)
|
||||
path_to_data_file = candidate_path;
|
||||
}
|
||||
});
|
||||
@ -886,7 +887,7 @@ CompressionCodecPtr IMergeTreeDataPart::detectDefaultCompressionCodec() const
|
||||
continue;
|
||||
}
|
||||
|
||||
result = getCompressionCodecForFile(volume->getDisk(), path_to_data_file);
|
||||
result = getCompressionCodecForFile(data_part_storage, path_to_data_file);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -911,7 +912,7 @@ void IMergeTreeDataPart::loadPartitionAndMinMaxIndex()
|
||||
}
|
||||
else
|
||||
{
|
||||
String path = getFullRelativePath();
|
||||
//String path = getFullRelativePath();
|
||||
if (!parent_part)
|
||||
partition.load(storage, metadata_manager);
|
||||
|
||||
@ -931,7 +932,7 @@ void IMergeTreeDataPart::loadPartitionAndMinMaxIndex()
|
||||
String calculated_partition_id = partition.getID(metadata_snapshot->getPartitionKey().sample_block);
|
||||
if (calculated_partition_id != info.partition_id)
|
||||
throw Exception(
|
||||
"While loading part " + getFullPath() + ": calculated partition ID: " + calculated_partition_id
|
||||
"While loading part " + data_part_storage->getFullPath() + ": calculated partition ID: " + calculated_partition_id
|
||||
+ " differs from partition ID in part name: " + info.partition_id,
|
||||
ErrorCodes::CORRUPTED_DATA);
|
||||
}
|
||||
@ -951,7 +952,7 @@ void IMergeTreeDataPart::appendFilesOfPartitionAndMinMaxIndex(Strings & files) c
|
||||
|
||||
void IMergeTreeDataPart::loadChecksums(bool require)
|
||||
{
|
||||
const String path = fs::path(getFullRelativePath()) / "checksums.txt";
|
||||
//const String path = fs::path(getFullRelativePath()) / "checksums.txt";
|
||||
bool exists = metadata_manager->exists("checksums.txt");
|
||||
if (exists)
|
||||
{
|
||||
@ -962,7 +963,7 @@ void IMergeTreeDataPart::loadChecksums(bool require)
|
||||
bytes_on_disk = checksums.getTotalSizeOnDisk();
|
||||
}
|
||||
else
|
||||
bytes_on_disk = calculateTotalSizeOnDisk(volume->getDisk(), getFullRelativePath());
|
||||
bytes_on_disk = data_part_storage->calculateTotalSizeOnDisk(); //calculateTotalSizeOnDisk(volume->getDisk(), getFullRelativePath());
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -974,13 +975,7 @@ void IMergeTreeDataPart::loadChecksums(bool require)
|
||||
LOG_WARNING(storage.log, "Checksums for part {} not found. Will calculate them from data on disk.", name);
|
||||
|
||||
checksums = checkDataPart(shared_from_this(), false);
|
||||
|
||||
{
|
||||
auto out = volume->getDisk()->writeFile(fs::path(getFullRelativePath()) / "checksums.txt.tmp", 4096);
|
||||
checksums.write(*out);
|
||||
}
|
||||
|
||||
volume->getDisk()->moveFile(fs::path(getFullRelativePath()) / "checksums.txt.tmp", fs::path(getFullRelativePath()) / "checksums.txt");
|
||||
data_part_storage->writeChecksums(checksums);
|
||||
|
||||
bytes_on_disk = checksums.getTotalSizeOnDisk();
|
||||
}
|
||||
@ -993,7 +988,7 @@ void IMergeTreeDataPart::appendFilesOfChecksums(Strings & files)
|
||||
|
||||
void IMergeTreeDataPart::loadRowsCount()
|
||||
{
|
||||
String path = fs::path(getFullRelativePath()) / "count.txt";
|
||||
//String path = fs::path(getFullRelativePath()) / "count.txt";
|
||||
|
||||
auto read_rows_count = [&]()
|
||||
{
|
||||
@ -1065,7 +1060,7 @@ void IMergeTreeDataPart::loadRowsCount()
|
||||
}
|
||||
else
|
||||
{
|
||||
if (volume->getDisk()->exists(path))
|
||||
if (data_part_storage->exists("count.txt"))
|
||||
{
|
||||
read_rows_count();
|
||||
return;
|
||||
@ -1164,7 +1159,7 @@ void IMergeTreeDataPart::appendFilesOfUUID(Strings & files)
|
||||
|
||||
void IMergeTreeDataPart::loadColumns(bool require)
|
||||
{
|
||||
String path = fs::path(getFullRelativePath()) / "columns.txt";
|
||||
String path = fs::path(data_part_storage->getFullRelativePath()) / "columns.txt";
|
||||
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
|
||||
if (parent_part)
|
||||
metadata_snapshot = metadata_snapshot->projections.get(name).metadata;
|
||||
@ -1175,22 +1170,18 @@ void IMergeTreeDataPart::loadColumns(bool require)
|
||||
{
|
||||
/// We can get list of columns only from columns.txt in compact parts.
|
||||
if (require || part_type == Type::COMPACT)
|
||||
throw Exception("No columns.txt in part " + name + ", expected path " + path + " on drive " + volume->getDisk()->getName(),
|
||||
throw Exception("No columns.txt in part " + name + ", expected path " + path + " on drive " + data_part_storage->getName(),
|
||||
ErrorCodes::NO_FILE_IN_DATA_PART);
|
||||
|
||||
/// If there is no file with a list of columns, write it down.
|
||||
for (const NameAndTypePair & column : metadata_snapshot->getColumns().getAllPhysical())
|
||||
if (volume->getDisk()->exists(fs::path(getFullRelativePath()) / (getFileNameForColumn(column) + ".bin")))
|
||||
if (data_part_storage->exists(getFileNameForColumn(column) + ".bin"))
|
||||
loaded_columns.push_back(column);
|
||||
|
||||
if (columns.empty())
|
||||
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
|
||||
|
||||
{
|
||||
auto buf = volume->getDisk()->writeFile(path + ".tmp", 4096);
|
||||
loaded_columns.writeText(*buf);
|
||||
}
|
||||
volume->getDisk()->moveFile(path + ".tmp", path);
|
||||
data_part_storage->writeColumns(loaded_columns);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1239,17 +1230,17 @@ bool IMergeTreeDataPart::shallParticipateInMerges(const StoragePolicyPtr & stora
|
||||
return !volume_ptr->areMergesAvoided();
|
||||
}
|
||||
|
||||
UInt64 IMergeTreeDataPart::calculateTotalSizeOnDisk(const DiskPtr & disk_, const String & from)
|
||||
{
|
||||
if (disk_->isFile(from))
|
||||
return disk_->getFileSize(from);
|
||||
std::vector<std::string> files;
|
||||
disk_->listFiles(from, files);
|
||||
UInt64 res = 0;
|
||||
for (const auto & file : files)
|
||||
res += calculateTotalSizeOnDisk(disk_, fs::path(from) / file);
|
||||
return res;
|
||||
}
|
||||
// UInt64 IMergeTreeDataPart::calculateTotalSizeOnDisk(const DataPartStoragePtr & data_part_storage_, const String & from)
|
||||
// {
|
||||
// if (data_part_storage_->isFile(from))
|
||||
// return data_part_storage_->getFileSize(from);
|
||||
// std::vector<std::string> files;
|
||||
// disk_->listFiles(from, files);
|
||||
// UInt64 res = 0;
|
||||
// for (const auto & file : files)
|
||||
// res += calculateTotalSizeOnDisk(data_part_storage_, fs::path(from) / file);
|
||||
// return res;
|
||||
// }
|
||||
|
||||
|
||||
void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <base/types.h>
|
||||
#include <Core/NamesAndTypes.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Storages/MergeTree/IDataPartStorage.h>
|
||||
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
|
||||
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
|
||||
#include <Storages/MergeTree/MergeTreeIndices.h>
|
||||
@ -68,7 +69,7 @@ public:
|
||||
const MergeTreeData & storage_,
|
||||
const String & name_,
|
||||
const MergeTreePartInfo & info_,
|
||||
const VolumePtr & volume,
|
||||
const DataPartStoragePtr & data_part_storage_,
|
||||
const std::optional<String> & relative_path,
|
||||
Type part_type_,
|
||||
const IMergeTreeDataPart * parent_part_);
|
||||
@ -76,7 +77,7 @@ public:
|
||||
IMergeTreeDataPart(
|
||||
const MergeTreeData & storage_,
|
||||
const String & name_,
|
||||
const VolumePtr & volume,
|
||||
const DataPartStoragePtr & data_part_storage_,
|
||||
const std::optional<String> & relative_path,
|
||||
Type part_type_,
|
||||
const IMergeTreeDataPart * parent_part_);
|
||||
@ -194,7 +195,7 @@ public:
|
||||
/// processed by multiple shards.
|
||||
UUID uuid = UUIDHelpers::Nil;
|
||||
|
||||
VolumePtr volume;
|
||||
DataPartStoragePtr data_part_storage;
|
||||
|
||||
/// A directory path (relative to storage's path) where part data is actually stored
|
||||
/// Examples: 'detached/tmp_fetch_<name>', 'tmp_<name>', '<name>'
|
||||
@ -338,10 +339,10 @@ public:
|
||||
size_t getFileSizeOrZero(const String & file_name) const;
|
||||
|
||||
/// Returns path to part dir relatively to disk mount point
|
||||
String getFullRelativePath() const;
|
||||
// String getFullRelativePath() const;
|
||||
|
||||
/// Returns full path to part dir
|
||||
String getFullPath() const;
|
||||
// String getFullPath() const;
|
||||
|
||||
/// Moves a part to detached/ directory and adds prefix to its name
|
||||
void renameToDetached(const String & prefix) const;
|
||||
|
@ -2273,14 +2273,14 @@ MergeTreeDataPartType MergeTreeData::choosePartTypeOnDisk(size_t bytes_uncompres
|
||||
|
||||
MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name,
|
||||
MergeTreeDataPartType type, const MergeTreePartInfo & part_info,
|
||||
const VolumePtr & volume, const String & relative_path, const IMergeTreeDataPart * parent_part) const
|
||||
const DataPartStoragePtr & data_part_storage, const String & relative_path, const IMergeTreeDataPart * parent_part) const
|
||||
{
|
||||
if (type == MergeTreeDataPartType::COMPACT)
|
||||
return std::make_shared<MergeTreeDataPartCompact>(*this, name, part_info, volume, relative_path, parent_part);
|
||||
return std::make_shared<MergeTreeDataPartCompact>(*this, name, part_info, data_part_storage, parent_part);
|
||||
else if (type == MergeTreeDataPartType::WIDE)
|
||||
return std::make_shared<MergeTreeDataPartWide>(*this, name, part_info, volume, relative_path, parent_part);
|
||||
return std::make_shared<MergeTreeDataPartWide>(*this, name, part_info, data_part_storage, parent_part);
|
||||
else if (type == MergeTreeDataPartType::IN_MEMORY)
|
||||
return std::make_shared<MergeTreeDataPartInMemory>(*this, name, part_info, volume, relative_path, parent_part);
|
||||
return std::make_shared<MergeTreeDataPartInMemory>(*this, name, part_info, data_part_storage, parent_part);
|
||||
else
|
||||
throw Exception("Unknown type of part " + relative_path, ErrorCodes::UNKNOWN_PART_TYPE);
|
||||
}
|
||||
@ -2298,18 +2298,18 @@ static MergeTreeDataPartType getPartTypeFromMarkExtension(const String & mrk_ext
|
||||
}
|
||||
|
||||
MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
|
||||
const String & name, const VolumePtr & volume, const String & relative_path, const IMergeTreeDataPart * parent_part) const
|
||||
const String & name, const DataPartStoragePtr & data_part_storage, const String & relative_path, const IMergeTreeDataPart * parent_part) const
|
||||
{
|
||||
return createPart(name, MergeTreePartInfo::fromPartName(name, format_version), volume, relative_path, parent_part);
|
||||
return createPart(name, MergeTreePartInfo::fromPartName(name, format_version), data_part_storage, relative_path, parent_part);
|
||||
}
|
||||
|
||||
MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
|
||||
const String & name, const MergeTreePartInfo & part_info,
|
||||
const VolumePtr & volume, const String & relative_path, const IMergeTreeDataPart * parent_part) const
|
||||
const DataPartStoragePtr & data_part_storage, const String & relative_path, const IMergeTreeDataPart * parent_part) const
|
||||
{
|
||||
MergeTreeDataPartType type;
|
||||
auto full_path = fs::path(relative_data_path) / (parent_part ? parent_part->relative_path : "") / relative_path / "";
|
||||
auto mrk_ext = MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(volume->getDisk(), full_path);
|
||||
// auto full_path = fs::path(relative_data_path) / (parent_part ? parent_part->relative_path : "") / relative_path / "";
|
||||
auto mrk_ext = MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(data_part_storage);
|
||||
|
||||
if (mrk_ext)
|
||||
type = getPartTypeFromMarkExtension(*mrk_ext);
|
||||
@ -2319,7 +2319,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
|
||||
type = choosePartTypeOnDisk(0, 0);
|
||||
}
|
||||
|
||||
return createPart(name, type, part_info, volume, relative_path, parent_part);
|
||||
return createPart(name, type, part_info, data_part_storage, relative_path, parent_part);
|
||||
}
|
||||
|
||||
void MergeTreeData::changeSettings(
|
||||
|
@ -229,15 +229,15 @@ public:
|
||||
/// After this method setColumns must be called
|
||||
MutableDataPartPtr createPart(const String & name,
|
||||
MergeTreeDataPartType type, const MergeTreePartInfo & part_info,
|
||||
const VolumePtr & volume, const String & relative_path, const IMergeTreeDataPart * parent_part = nullptr) const;
|
||||
const DataPartStoragePtr & data_part_storage, const IMergeTreeDataPart * parent_part = nullptr) const;
|
||||
|
||||
/// Create part, that already exists on filesystem.
|
||||
/// After this methods 'loadColumnsChecksumsIndexes' must be called.
|
||||
MutableDataPartPtr createPart(const String & name,
|
||||
const VolumePtr & volume, const String & relative_path, const IMergeTreeDataPart * parent_part = nullptr) const;
|
||||
const DataPartStoragePtr & data_part_storage, const IMergeTreeDataPart * parent_part = nullptr) const;
|
||||
|
||||
MutableDataPartPtr createPart(const String & name, const MergeTreePartInfo & part_info,
|
||||
const VolumePtr & volume, const String & relative_path, const IMergeTreeDataPart * parent_part = nullptr) const;
|
||||
const DataPartStoragePtr & data_part_storage, const IMergeTreeDataPart * parent_part = nullptr) const;
|
||||
|
||||
/// Auxiliary object to add a set of parts into the working set in two steps:
|
||||
/// * First, as PreActive parts (the parts are ready, but not yet in the active set).
|
||||
|
@ -13,11 +13,11 @@ namespace ErrorCodes
|
||||
extern const int UNKNOWN_PART_TYPE;
|
||||
}
|
||||
|
||||
std::optional<std::string> MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(const DiskPtr & disk, const String & path_to_part)
|
||||
std::optional<std::string> MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage)
|
||||
{
|
||||
if (disk->exists(path_to_part))
|
||||
if (data_part_storage->exists())
|
||||
{
|
||||
for (DiskDirectoryIteratorPtr it = disk->iterateDirectory(path_to_part); it->isValid(); it->next())
|
||||
for (DiskDirectoryIteratorPtr it = data_part_storage->iterate(); it->isValid(); it->next())
|
||||
{
|
||||
const auto & ext = fs::path(it->path()).extension();
|
||||
if (ext == getNonAdaptiveMrkExtension()
|
||||
@ -46,9 +46,9 @@ MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(const MergeTreeData
|
||||
setAdaptive(storage_settings->index_granularity_bytes);
|
||||
}
|
||||
|
||||
void MergeTreeIndexGranularityInfo::changeGranularityIfRequired(const DiskPtr & disk, const String & path_to_part)
|
||||
void MergeTreeIndexGranularityInfo::changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage)
|
||||
{
|
||||
auto mrk_ext = getMarksExtensionFromFilesystem(disk, path_to_part);
|
||||
auto mrk_ext = getMarksExtensionFromFilesystem(data_part_storage);
|
||||
if (mrk_ext && *mrk_ext == getNonAdaptiveMrkExtension())
|
||||
setNonAdaptive();
|
||||
}
|
||||
|
@ -28,7 +28,7 @@ public:
|
||||
|
||||
MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MergeTreeDataPartType type_);
|
||||
|
||||
void changeGranularityIfRequired(const DiskPtr & disk, const String & path_to_part);
|
||||
void changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage);
|
||||
|
||||
String getMarksFilePath(const String & path_prefix) const
|
||||
{
|
||||
@ -37,7 +37,7 @@ public:
|
||||
|
||||
size_t getMarkSizeInBytes(size_t columns_num = 1) const;
|
||||
|
||||
static std::optional<std::string> getMarksExtensionFromFilesystem(const DiskPtr & disk, const String & path_to_part);
|
||||
static std::optional<std::string> getMarksExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage);
|
||||
|
||||
private:
|
||||
MergeTreeDataPartType type;
|
||||
|
Loading…
Reference in New Issue
Block a user