This commit is contained in:
Nikolai Kochetov 2022-04-21 19:19:13 +00:00
parent bcbab2ead8
commit 9133e398b8
17 changed files with 398 additions and 191 deletions

View File

@ -95,30 +95,42 @@ size_t DataPartStorageOnDisk::getFileSize(const String & path) const
return volume->getDisk()->getFileSize(fs::path(root_path) / part_dir / path);
}
DiskDirectoryIteratorPtr DataPartStorageOnDisk::iterate() const
UInt32 DataPartStorageOnDisk::getRefCount(const String & path) const
{
return volume->getDisk()->iterateDirectory(fs::path(root_path) / part_dir);
return volume->getDisk()->getRefCount(fs::path(root_path) / part_dir / path);
}
DiskDirectoryIteratorPtr DataPartStorageOnDisk::iterateDirectory(const String & path) const
class DataPartStorageIteratorOnDisk final : public IDataPartStorageIterator
{
return volume->getDisk()->iterateDirectory(fs::path(root_path) / part_dir / path);
public:
DataPartStorageIteratorOnDisk(DiskPtr disk_, DiskDirectoryIteratorPtr it_)
: disk(std::move(disk_)), it(std::move(it_))
{
}
void next() override { it->next(); }
bool isValid() const override { return it->isValid(); }
bool isFile() const override { return isValid() && disk->isFile(it->path()); }
std::string name() const override { return it->name(); }
private:
DiskPtr disk;
DiskDirectoryIteratorPtr it;
};
DataPartStorageIteratorPtr DataPartStorageOnDisk::iterate() const
{
return std::make_unique<DataPartStorageIteratorOnDisk>(
volume->getDisk(),
volume->getDisk()->iterateDirectory(fs::path(root_path) / part_dir));
}
// namespace
// {
// static constexpr std::string_view non_checksum_files[] =
// {
// "checksums.txt",
// "columns.txt",
// "default_compression_codec.txt",
// "delete-on-destroy.txt",
// "txn_version.txt",
// };
// static constexpr std::span<std::string_view> projection_non_checksum_files(non_checksum_files, 4);
// static constexpr std::span<std::string_view> part_non_checksum_files(non_checksum_files, 5);
// }
DataPartStorageIteratorPtr DataPartStorageOnDisk::iterateDirectory(const String & path) const
{
return std::make_unique<DataPartStorageIteratorOnDisk>(
volume->getDisk(),
volume->getDisk()->iterateDirectory(fs::path(root_path) / part_dir / path));
}
void DataPartStorageOnDisk::remove(
bool keep_shared_data,
@ -244,6 +256,11 @@ DataPartStoragePtr DataPartStorageOnDisk::getProjection(const std::string & name
return std::make_shared<DataPartStorageOnDisk>(volume, std::string(fs::path(root_path) / part_dir), name);
}
DiskPtr DataPartStorageOnDisk::getDisk() const
{
return volume->getDisk();
}
static UInt64 calculateTotalSizeOnDiskImpl(const DiskPtr & disk, const String & from)
{
if (disk->isFile(from))
@ -281,28 +298,62 @@ std::string DataPartStorageOnDisk::getDiskPathForLogs() const
return volume->getDisk()->getPath();
}
void DataPartStorageOnDisk::writeChecksums(MergeTreeDataPartChecksums & checksums) const
void DataPartStorageOnDisk::writeChecksums(const MergeTreeDataPartChecksums & checksums) const
{
std::string path = fs::path(root_path) / part_dir / "checksums.txt";
try
{
auto out = volume->getDisk()->writeFile(path + ".tmp", 4096);
checksums.write(*out);
}
{
auto out = volume->getDisk()->writeFile(path + ".tmp", 4096);
checksums.write(*out);
}
volume->getDisk()->moveFile(path + ".tmp", path);
volume->getDisk()->moveFile(path + ".tmp", path);
}
catch (...)
{
try
{
if (volume->getDisk()->exists(path + ".tmp"))
volume->getDisk()->removeFile(path + ".tmp");
}
catch (...)
{
tryLogCurrentException("DataPartStorageOnDisk");
}
throw;
}
}
void DataPartStorageOnDisk::writeColumns(NamesAndTypesList & columns) const
void DataPartStorageOnDisk::writeColumns(const NamesAndTypesList & columns) const
{
std::string path = fs::path(root_path) / part_dir / "columns.txt";
try
{
auto buf = volume->getDisk()->writeFile(path + ".tmp", 4096);
columns.writeText(*buf);
}
{
auto buf = volume->getDisk()->writeFile(path + ".tmp", 4096);
columns.writeText(*buf);
}
volume->getDisk()->moveFile(path + ".tmp", path);
volume->getDisk()->moveFile(path + ".tmp", path);
}
catch (...)
{
try
{
if (volume->getDisk()->exists(path + ".tmp"))
volume->getDisk()->removeFile(path + ".tmp");
}
catch (...)
{
tryLogCurrentException("DataPartStorageOnDisk");
}
throw;
}
}
void DataPartStorageOnDisk::writeDeleteOnDestroyMarker(Poco::Logger * log) const
@ -406,6 +457,10 @@ std::string DataPartStorageOnDisk::getName() const
return volume->getDisk()->getName();
}
std::string DataPartStorageOnDisk::getDiskType() const
{
return toString(volume->getDisk()->getType());
}
void DataPartStorageOnDisk::backup(
TemporaryFilesOnDisks & temp_dirs,
@ -517,6 +572,30 @@ void DataPartStorageBuilderOnDisk::removeRecursive()
volume->getDisk()->removeRecursive(fs::path(root_path) / part_dir);
}
void DataPartStorageBuilderOnDisk::removeSharedRecursive(bool keep_in_remote_fs)
{
volume->getDisk()->removeSharedRecursive(fs::path(root_path) / part_dir, keep_in_remote_fs);
}
SyncGuardPtr DataPartStorageBuilderOnDisk::getDirectorySyncGuard() const
{
return volume->getDisk()->getDirectorySyncGuard(fs::path(root_path) / part_dir);
}
void DataPartStorageBuilderOnDisk::createHardLinkFrom(const IDataPartStorage & source, const std::string & from, const std::string & to) const
{
const auto * source_on_disk = typeid_cast<const DataPartStorageOnDisk *>(&source);
if (!source_on_disk)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot create hardlink from different storage. Expected DataPartStorageOnDisk, got {}",
typeid(source).name());
volume->getDisk()->createHardLink(
fs::path(source_on_disk->getFullRelativePath()) / from,
fs::path(root_path) / part_dir / to);
}
bool DataPartStorageBuilderOnDisk::exists() const
{
return volume->getDisk()->exists(fs::path(root_path) / part_dir);
@ -533,11 +612,21 @@ std::string DataPartStorageBuilderOnDisk::getFullPath() const
return fs::path(volume->getDisk()->getPath()) / root_path / part_dir;
}
std::string DataPartStorageBuilderOnDisk::getFullRelativePath() const
{
return fs::path(root_path) / part_dir;
}
void DataPartStorageBuilderOnDisk::createDirectories()
{
return volume->getDisk()->createDirectories(fs::path(root_path) / part_dir);
}
void DataPartStorageBuilderOnDisk::createProjection(const std::string & name)
{
return volume->getDisk()->createDirectory(fs::path(root_path) / part_dir / name);
}
ReservationPtr DataPartStorageBuilderOnDisk::reserve(UInt64 bytes)
{
auto res = volume->reserve(bytes);

View File

@ -27,9 +27,10 @@ public:
Poco::Timestamp getLastModified() const override;
size_t getFileSize(const std::string & path) const override;
UInt32 getRefCount(const String & path) const override;
DiskDirectoryIteratorPtr iterate() const override;
DiskDirectoryIteratorPtr iterateDirectory(const std::string & path) const override;
DataPartStorageIteratorPtr iterate() const override;
DataPartStorageIteratorPtr iterateDirectory(const std::string & path) const override;
void remove(
bool keep_shared_data,
@ -53,8 +54,8 @@ public:
bool isBroken() const override;
std::string getDiskPathForLogs() const override;
void writeChecksums(MergeTreeDataPartChecksums & checksums) const override;
void writeColumns(NamesAndTypesList & columns) const override;
void writeChecksums(const MergeTreeDataPartChecksums & checksums) const override;
void writeColumns(const NamesAndTypesList & columns) const override;
void writeDeleteOnDestroyMarker(Poco::Logger * log) const override;
void checkConsistency(const MergeTreeDataPartChecksums & checksums) const override;
@ -84,9 +85,12 @@ public:
void rename(const String & new_relative_path, Poco::Logger * log, bool remove_new_dir_if_exists, bool fsync) override;
std::string getName() const override;
std::string getDiskType() const override;
DataPartStoragePtr getProjection(const std::string & name) const override;
DiskPtr getDisk() const;
private:
VolumePtr volume;
std::string root_path;
@ -103,6 +107,7 @@ private:
class DataPartStorageBuilderOnDisk final : public IDataPartStorageBuilder
{
public:
DataPartStorageBuilderOnDisk(VolumePtr volume_, std::string root_path_, std::string part_dir_);
void setRelativePath(const std::string & path) override;
@ -111,9 +116,11 @@ class DataPartStorageBuilderOnDisk final : public IDataPartStorageBuilder
bool exists(const std::string & path) const override;
void createDirectories() override;
void createProjection(const std::string & name) override;
std::string getRelativePath() const override { return part_dir; }
std::string getFullPath() const override;
std::string getFullRelativePath() const override;
std::unique_ptr<ReadBufferFromFileBase> readFile(
const std::string & path,
@ -127,6 +134,11 @@ class DataPartStorageBuilderOnDisk final : public IDataPartStorageBuilder
void removeFile(const String & path) override;
void removeRecursive() override;
void removeSharedRecursive(bool keep_in_remote_fs) override;
SyncGuardPtr getDirectorySyncGuard() const override;
void createHardLinkFrom(const IDataPartStorage & source, const std::string & from, const std::string & to) const override;
ReservationPtr reserve(UInt64 bytes) override;

View File

@ -13,6 +13,7 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/CurrentMetrics.h>
#include <Common/NetException.h>
#include <Storages/MergeTree/DataPartStorageOnDisk.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <base/scope_guard.h>
#include <Poco/Net/HTTPRequest.h>
@ -166,9 +167,8 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
if (data_settings->allow_remote_fs_zero_copy_replication &&
client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY)
{
auto disk = part->volume->getDisk();
auto disk_type = toString(disk->getType());
if (disk->supportZeroCopyReplication() && std::find(capability.begin(), capability.end(), disk_type) != capability.end())
auto disk_type = part->data_part_storage->getDiskType();
if (part->data_part_storage->supportZeroCopyReplication() && std::find(capability.begin(), capability.end(), disk_type) != capability.end())
{
/// Send metadata if the receiver's capability covers the source disk type.
response.addCookie({"remote_fs_metadata", disk_type});
@ -259,7 +259,7 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
checksums.files[file_name] = {};
}
auto disk = part->volume->getDisk();
//auto disk = part->volume->getDisk();
MergeTreeData::DataPart::Checksums data_checksums;
for (const auto & [name, projection] : part->getProjectionParts())
{
@ -285,14 +285,14 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
{
String file_name = it.first;
String path = fs::path(part->getFullRelativePath()) / file_name;
//String path = fs::path(part->getFullRelativePath()) / file_name;
UInt64 size = disk->getFileSize(path);
UInt64 size = part->data_part_storage->getFileSize(file_name);
writeStringBinary(it.first, out);
writeBinary(size, out);
auto file_in = disk->readFile(path);
auto file_in = part->data_part_storage->readFile(file_name, {}, std::nullopt, std::nullopt);
HashingWriteBuffer hashing_out(out);
copyDataWithThrottler(*file_in, hashing_out, blocker.getCounter(), data.getSendsThrottler());
@ -300,7 +300,11 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
if (hashing_out.count() != size)
throw Exception(ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART, "Unexpected size of file {}, expected {} got {}", path, hashing_out.count(), size);
throw Exception(
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART,
"Unexpected size of file {}, expected {} got {}",
std::string(fs::path(part->data_part_storage->getFullRelativePath()) / file_name),
hashing_out.count(), size);
writePODBinary(hashing_out.getHash(), out);
@ -314,7 +318,11 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
void Service::sendPartFromDiskRemoteMeta(const MergeTreeData::DataPartPtr & part, WriteBuffer & out)
{
auto disk = part->volume->getDisk();
const auto * data_part_storage_on_disk = typeid_cast<const DataPartStorageOnDisk *>(part->data_part_storage.get());
if (!data_part_storage_on_disk)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Storage '{}' doesn't support zero-copy replication", part->data_part_storage->getName());
auto disk = data_part_storage_on_disk->getDisk();
if (!disk->supportZeroCopyReplication())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Disk '{}' doesn't support zero-copy replication", disk->getName());
@ -328,7 +336,7 @@ void Service::sendPartFromDiskRemoteMeta(const MergeTreeData::DataPartPtr & part
std::vector<std::string> paths;
paths.reserve(checksums.files.size());
for (const auto & it : checksums.files)
paths.push_back(fs::path(part->getFullRelativePath()) / it.first);
paths.push_back(fs::path(part->data_part_storage->getFullRelativePath()) / it.first);
/// Serialized metadatadatas with zero ref counts.
auto metadatas = disk->getSerializedMetadata(paths);
@ -340,7 +348,7 @@ void Service::sendPartFromDiskRemoteMeta(const MergeTreeData::DataPartPtr & part
for (const auto & it : checksums.files)
{
const String & file_name = it.first;
String file_path_prefix = fs::path(part->getFullRelativePath()) / file_name;
String file_path_prefix = fs::path(part->data_part_storage->getFullRelativePath()) / file_name;
/// Just some additional checks
String metadata_file_path = fs::path(disk->getPath()) / file_path_prefix;
@ -571,8 +579,19 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
ThrottlerPtr throttler)
{
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(
volume,
data.getRelativeDataPath(),
part_name);
auto data_part_storage_builder = std::make_shared<DataPartStorageBuilderOnDisk>(
volume,
data.getRelativeDataPath(),
part_name);
MergeTreeData::MutableDataPartPtr new_data_part =
std::make_shared<MergeTreeDataPartInMemory>(data, part_name, volume);
std::make_shared<MergeTreeDataPartInMemory>(data, part_name, data_part_storage);
for (auto i = 0ul; i < projections; ++i)
{
@ -586,9 +605,12 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
auto block = block_in.read();
throttler->add(block.bytes());
auto projection_part_storage = data_part_storage->getProjection(projection_name + ".proj");
auto projection_part_storage_builder = data_part_storage_builder->getProjection(projection_name + ".proj");
MergeTreePartInfo new_part_info("all", 0, 0, 0);
MergeTreeData::MutableDataPartPtr new_projection_part =
std::make_shared<MergeTreeDataPartInMemory>(data, projection_name, new_part_info, volume, projection_name, new_data_part.get());
std::make_shared<MergeTreeDataPartInMemory>(data, projection_name, new_part_info, projection_part_storage, new_data_part.get());
new_projection_part->is_temp = false;
new_projection_part->setColumns(block.getNamesAndTypesList());
@ -598,6 +620,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
MergedBlockOutputStream part_out(
new_projection_part,
projection_part_storage_builder,
metadata_snapshot->projections.get(projection_name).metadata,
block.getNamesAndTypesList(),
{},
@ -624,7 +647,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
new_data_part->partition.create(metadata_snapshot, block, 0, context);
MergedBlockOutputStream part_out(
new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {},
new_data_part, data_part_storage_builder, metadata_snapshot, block.getNamesAndTypesList(), {},
CompressionCodecFactory::instance().get("NONE", {}));
part_out.write(block);
@ -636,9 +659,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
void Fetcher::downloadBaseOrProjectionPartToDisk(
const String & replica_path,
const String & part_download_path,
DataPartStorageBuilderPtr & data_part_storage_builder,
bool sync,
DiskPtr disk,
PooledReadWriteBufferFromHTTP & in,
MergeTreeData::DataPart::Checksums & checksums,
ThrottlerPtr throttler) const
@ -656,14 +678,14 @@ void Fetcher::downloadBaseOrProjectionPartToDisk(
/// File must be inside "absolute_part_path" directory.
/// Otherwise malicious ClickHouse replica may force us to write to arbitrary path.
String absolute_file_path = fs::weakly_canonical(fs::path(part_download_path) / file_name);
if (!startsWith(absolute_file_path, fs::weakly_canonical(part_download_path).string()))
String absolute_file_path = fs::weakly_canonical(fs::path(data_part_storage_builder->getFullRelativePath()) / file_name);
if (!startsWith(absolute_file_path, fs::weakly_canonical(data_part_storage_builder->getFullRelativePath()).string()))
throw Exception(ErrorCodes::INSECURE_PATH,
"File path ({}) doesn't appear to be inside part path ({}). "
"This may happen if we are trying to download part from malicious replica or logical error.",
absolute_file_path, part_download_path);
absolute_file_path, data_part_storage_builder->getFullRelativePath());
auto file_out = disk->writeFile(fs::path(part_download_path) / file_name);
auto file_out = data_part_storage_builder->writeFile(file_name, file_size);
HashingWriteBuffer hashing_out(*file_out);
copyDataWithThrottler(in, hashing_out, file_size, blocker.getCounter(), throttler);
@ -672,7 +694,7 @@ void Fetcher::downloadBaseOrProjectionPartToDisk(
/// NOTE The is_cancelled flag also makes sense to check every time you read over the network,
/// performing a poll with a not very large timeout.
/// And now we check it only between read chunks (in the `copyData` function).
disk->removeRecursive(part_download_path);
data_part_storage_builder->removeRecursive();
throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED);
}
@ -682,7 +704,7 @@ void Fetcher::downloadBaseOrProjectionPartToDisk(
if (expected_hash != hashing_out.getHash())
throw Exception(ErrorCodes::CHECKSUM_DOESNT_MATCH,
"Checksum mismatch for file {} transferred from {}",
fullPath(disk, (fs::path(part_download_path) / file_name).string()),
(fs::path(data_part_storage_builder->getFullPath()) / file_name).string(),
replica_path);
if (file_name != "checksums.txt" &&
@ -717,21 +739,33 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
|| std::string::npos != part_name.find_first_of("/."))
throw Exception("Logical error: tmp_prefix and part_name cannot be empty or contain '.' or '/' characters.", ErrorCodes::LOGICAL_ERROR);
String part_relative_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
String part_download_path = data.getRelativeDataPath() + part_relative_path + "/";
String part_dir = tmp_prefix + part_name;
String part_relative_path = data.getRelativeDataPath() + String(to_detached ? "detached/" : "");
if (disk->exists(part_download_path))
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(
volume,
part_relative_path,
part_dir);
DataPartStorageBuilderPtr data_part_storage_builder = std::make_shared<DataPartStorageBuilderOnDisk>(
volume,
part_relative_path,
part_dir);
if (data_part_storage_builder->exists())
{
LOG_WARNING(log, "Directory {} already exists, probably result of a failed fetch. Will remove it before fetching part.",
fullPath(disk, part_download_path));
disk->removeRecursive(part_download_path);
data_part_storage_builder->getFullPath());
data_part_storage_builder->removeRecursive();
}
disk->createDirectories(part_download_path);
data_part_storage_builder->createDirectories();
SyncGuardPtr sync_guard;
if (data.getSettings()->fsync_part_directory)
sync_guard = disk->getDirectorySyncGuard(part_download_path);
sync_guard = disk->getDirectorySyncGuard(data_part_storage->getRelativePath());
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch};
@ -740,19 +774,22 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
String projection_name;
readStringBinary(projection_name, in);
MergeTreeData::DataPart::Checksums projection_checksum;
disk->createDirectories(part_download_path + projection_name + ".proj/");
auto projection_part_storage = data_part_storage->getProjection(projection_name + ".proj");
auto projection_part_storage_builder = data_part_storage_builder->getProjection(projection_name + ".proj");
projection_part_storage_builder->createDirectories();
downloadBaseOrProjectionPartToDisk(
replica_path, part_download_path + projection_name + ".proj/", sync, disk, in, projection_checksum, throttler);
replica_path, projection_part_storage_builder, sync, in, projection_checksum, throttler);
checksums.addFile(
projection_name + ".proj", projection_checksum.getTotalSizeOnDisk(), projection_checksum.getTotalChecksumUInt128());
}
// Download the base part
downloadBaseOrProjectionPartToDisk(replica_path, part_download_path, sync, disk, in, checksums, throttler);
downloadBaseOrProjectionPartToDisk(replica_path, data_part_storage_builder, sync, in, checksums, throttler);
assertEOF(in);
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, volume, part_relative_path);
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, data_part_storage);
new_data_part->is_temp = true;
new_data_part->modification_time = time(nullptr);
new_data_part->loadColumnsChecksumsIndexes(true, false);
@ -785,21 +822,31 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
static const String TMP_PREFIX = "tmp-fetch_";
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
String part_relative_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
String part_download_path = fs::path(data.getRelativeDataPath()) / part_relative_path / "";
String part_dir = tmp_prefix + part_name;
String part_relative_path = data.getRelativeDataPath() + String(to_detached ? "detached/" : "");
if (disk->exists(part_download_path))
throw Exception(ErrorCodes::DIRECTORY_ALREADY_EXISTS, "Directory {} already exists.", fullPath(disk, part_download_path));
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk);
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(
volume,
part_relative_path,
part_dir);
DataPartStorageBuilderPtr data_part_storage_builder = std::make_shared<DataPartStorageBuilderOnDisk>(
volume,
part_relative_path,
part_dir);
if (data_part_storage_builder->exists())
throw Exception(ErrorCodes::DIRECTORY_ALREADY_EXISTS, "Directory {} already exists.", data_part_storage_builder->getFullPath());
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch};
disk->createDirectories(part_download_path);
data_part_storage_builder->createDirectories();
size_t files;
readBinary(files, in);
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk);
for (size_t i = 0; i < files; ++i)
{
String file_name;
@ -808,8 +855,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
readStringBinary(file_name, in);
readBinary(file_size, in);
String data_path = fs::path(part_download_path) / file_name;
String metadata_file = fullPath(disk, data_path);
String metadata_file = fs::path(data_part_storage_builder->getFullPath()) / file_name;
{
auto file_out = std::make_unique<WriteBufferFromFile>(metadata_file, DBMS_DEFAULT_BUFFER_SIZE, -1, 0666, nullptr, 0);
@ -823,7 +869,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
/// NOTE The is_cancelled flag also makes sense to check every time you read over the network,
/// performing a poll with a not very large timeout.
/// And now we check it only between read chunks (in the `copyData` function).
disk->removeSharedRecursive(part_download_path, true);
data_part_storage_builder->removeSharedRecursive(true);
throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED);
}
@ -841,7 +887,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
assertEOF(in);
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, volume, part_relative_path);
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, data_part_storage);
new_data_part->is_temp = true;
new_data_part->modification_time = time(nullptr);
new_data_part->loadColumnsChecksumsIndexes(true, false);

View File

@ -90,9 +90,8 @@ public:
private:
void downloadBaseOrProjectionPartToDisk(
const String & replica_path,
const String & part_download_path,
DataPartStorageBuilderPtr & data_part_storage_builder,
bool sync,
DiskPtr disk,
PooledReadWriteBufferFromHTTP & in,
MergeTreeData::DataPart::Checksums & checksums,
ThrottlerPtr throttler) const;

View File

@ -11,8 +11,25 @@ class ReadBufferFromFileBase;
class WriteBufferFromFileBase;
class IDiskDirectoryIterator;
using DiskDirectoryIteratorPtr = std::unique_ptr<IDiskDirectoryIterator>;
class IDataPartStorageIterator
{
public:
/// Iterate to the next file.
virtual void next() = 0;
/// Return `true` if the iterator points to a valid element.
virtual bool isValid() const = 0;
/// Return `true` if the iterator points to a file.
virtual bool isFile() const = 0;
/// Name of the file that the iterator currently points to.
virtual std::string name() const = 0;
virtual ~IDataPartStorageIterator() = default;
};
using DataPartStorageIteratorPtr = std::unique_ptr<IDataPartStorageIterator>;
struct MergeTreeDataPartChecksums;
@ -24,6 +41,9 @@ class IStoragePolicy;
class IDisk;
using DiskPtr = std::shared_ptr<IDisk>;
class ISyncGuard;
using SyncGuardPtr = std::unique_ptr<ISyncGuard>;
class IBackupEntry;
using BackupEntryPtr = std::unique_ptr<IBackupEntry>;
using BackupEntries = std::vector<std::pair<std::string, BackupEntryPtr>>;
@ -50,8 +70,8 @@ public:
virtual Poco::Timestamp getLastModified() const = 0;
virtual DiskDirectoryIteratorPtr iterate() const = 0;
virtual DiskDirectoryIteratorPtr iterateDirectory(const std::string & path) const = 0;
virtual DataPartStorageIteratorPtr iterate() const = 0;
virtual DataPartStorageIteratorPtr iterateDirectory(const std::string & path) const = 0;
struct ProjectionChecksums
{
@ -66,6 +86,7 @@ public:
Poco::Logger * log) const = 0;
virtual size_t getFileSize(const std::string & path) const = 0;
virtual UInt32 getRefCount(const String &) const { return 0; }
virtual std::string getRelativePathForPrefix(Poco::Logger * log, const String & prefix, bool detached) const = 0;
@ -85,8 +106,8 @@ public:
virtual std::string getDiskPathForLogs() const = 0;
/// Should remove it later
virtual void writeChecksums(MergeTreeDataPartChecksums & checksums) const = 0;
virtual void writeColumns(NamesAndTypesList & columns) const = 0;
virtual void writeChecksums(const MergeTreeDataPartChecksums & checksums) const = 0;
virtual void writeColumns(const NamesAndTypesList & columns) const = 0;
virtual void writeDeleteOnDestroyMarker(Poco::Logger * log) const = 0;
virtual void checkConsistency(const MergeTreeDataPartChecksums & checksums) const = 0;
@ -124,6 +145,7 @@ public:
/// Disk name
virtual std::string getName() const = 0;
virtual std::string getDiskType() const = 0;
virtual std::shared_ptr<IDataPartStorage> getProjection(const std::string & name) const = 0;
};
@ -140,11 +162,13 @@ public:
virtual std::string getRelativePath() const = 0;
virtual std::string getFullPath() const = 0;
virtual std::string getFullRelativePath() const = 0;
virtual bool exists() const = 0;
virtual bool exists(const std::string & path) const = 0;
virtual void createDirectories() = 0;
virtual void createProjection(const std::string & name) = 0;
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(
const std::string & path,
@ -152,12 +176,15 @@ public:
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const = 0;
virtual std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path,
size_t buf_size /* = DBMS_DEFAULT_BUFFER_SIZE*/) = 0;
virtual std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size) = 0;
virtual void removeFile(const String & path) = 0;
virtual void removeRecursive() = 0;
virtual void removeSharedRecursive(bool keep_in_remote_fs) = 0;
virtual SyncGuardPtr getDirectorySyncGuard() const { return nullptr; }
virtual void createHardLinkFrom(const IDataPartStorage & source, const std::string & from, const std::string & to) const = 0;
virtual ReservationPtr reserve(UInt64 /*bytes*/) { return nullptr; }

View File

@ -1298,12 +1298,7 @@ void IMergeTreeDataPart::remove() const
return;
if (isProjectionPart())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Projection part {} should be removed by its parent {}.",
name, parent_part->name);
}
LOG_WARNING(storage.log, "Projection part {} should be removed by its parent {}.", name, parent_part->name);
metadata_manager->deleteAll(false);
metadata_manager->assertAllDeleted(false);

View File

@ -87,8 +87,8 @@ public:
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
const MergeTreeReaderSettings & reader_settings_,
const ValueSizeMap & avg_value_size_hints_ = ValueSizeMap{},
const ReadBufferFromFileBase::ProfileCallback & profile_callback_ = ReadBufferFromFileBase::ProfileCallback{}) const = 0;
const ValueSizeMap & avg_value_size_hints_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback_) const = 0;
virtual MergeTreeWriterPtr getWriter(
DataPartStorageBuilderPtr data_part_storage_builder,
@ -97,7 +97,7 @@ public:
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity = {}) const = 0;
const MergeTreeIndexGranularity & computed_index_granularity) const = 0;
virtual bool isStoredOnDisk() const = 0;

View File

@ -5,6 +5,7 @@
#include <base/logger_useful.h>
#include <Common/ActionBlocker.h>
#include <Storages/MergeTree/DataPartStorageOnDisk.h>
#include <DataTypes/ObjectUtils.h>
#include <DataTypes/Serializations/SerializationInfo.h>
@ -151,12 +152,22 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
auto local_single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + global_ctx->future_part->name, ctx->disk, 0);
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(
local_single_disk_volume,
local_part_path,
local_tmp_part_basename);
global_ctx->data_part_storage_builder = std::make_shared<DataPartStorageBuilderOnDisk>(
local_single_disk_volume,
local_part_path,
local_tmp_part_basename);
global_ctx->new_data_part = global_ctx->data->createPart(
global_ctx->future_part->name,
global_ctx->future_part->type,
global_ctx->future_part->part_info,
local_single_disk_volume,
local_tmp_part_basename,
data_part_storage,
global_ctx->parent_part);
global_ctx->new_data_part->uuid = global_ctx->future_part->uuid;
@ -256,6 +267,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
global_ctx->to = std::make_shared<MergedBlockOutputStream>(
global_ctx->new_data_part,
global_ctx->data_part_storage_builder,
global_ctx->metadata_snapshot,
global_ctx->merging_columns,
MergeTreeIndexFactory::instance().getMany(global_ctx->metadata_snapshot->getSecondaryIndices()),
@ -438,6 +450,7 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
ctx->executor = std::make_unique<PullingPipelineExecutor>(ctx->column_parts_pipeline);
ctx->column_to = std::make_unique<MergedColumnOnlyOutputStream>(
global_ctx->data_part_storage_builder,
global_ctx->new_data_part,
global_ctx->metadata_snapshot,
ctx->executor->getHeader(),

View File

@ -157,6 +157,7 @@ private:
std::unique_ptr<PullingPipelineExecutor> merging_executor;
MergeTreeData::MutableDataPartPtr new_data_part{nullptr};
DataPartStorageBuilderPtr data_part_storage_builder;
size_t rows_written{0};
UInt64 watch_prev_elapsed{0};

View File

@ -64,11 +64,11 @@ void MergeTreeSelectProcessor::initializeReaders()
owned_mark_cache = storage.getContext()->getMarkCache();
reader = data_part->getReader(task_columns.columns, storage_snapshot->getMetadataForQuery(),
all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, {}, {});
if (prewhere_info)
pre_reader = data_part->getReader(task_columns.pre_columns, storage_snapshot->getMetadataForQuery(),
all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, {}, {});
}

View File

@ -66,7 +66,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
reader = data_part->getReader(columns_for_reader, storage_snapshot->metadata,
MarkRanges{MarkRange(0, data_part->getMarksCount())},
/* uncompressed_cache = */ nullptr, mark_cache.get(), reader_settings);
/* uncompressed_cache = */ nullptr, mark_cache.get(), reader_settings, {}, {});
}
Chunk MergeTreeSequentialSource::generate()

View File

@ -10,6 +10,7 @@ namespace ErrorCodes
}
MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
DataPartStorageBuilderPtr data_part_storage_builder_,
const MergeTreeDataPartPtr & data_part,
const StorageMetadataPtr & metadata_snapshot_,
const Block & header_,
@ -18,7 +19,7 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
WrittenOffsetColumns * offset_columns_,
const MergeTreeIndexGranularity & index_granularity,
const MergeTreeIndexGranularityInfo * index_granularity_info)
: IMergedBlockOutputStream(data_part, metadata_snapshot_, header_.getNamesAndTypesList(), /*reset_columns=*/ true)
: IMergedBlockOutputStream(std::move(data_part_storage_builder_), data_part, metadata_snapshot_, header_.getNamesAndTypesList(), /*reset_columns=*/ true)
, header(header_)
{
const auto & global_settings = data_part->storage.getContext()->getSettings();
@ -31,6 +32,7 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
/* rewrite_primary_key = */false);
writer = data_part->getWriter(
data_part_storage_builder,
header.getNamesAndTypesList(),
metadata_snapshot_,
indices_to_recalc,
@ -75,13 +77,11 @@ MergedColumnOnlyOutputStream::fillChecksums(
auto removed_files = removeEmptyColumnsFromPart(new_part, columns, serialization_infos, checksums);
auto disk = new_part->volume->getDisk();
for (const String & removed_file : removed_files)
{
auto file_path = new_part->getFullRelativePath() + removed_file;
/// Can be called multiple times, don't need to remove file twice
if (disk->exists(file_path))
disk->removeFile(file_path);
if (data_part_storage_builder->exists(removed_file))
data_part_storage_builder->removeFile(removed_file);
if (all_checksums.files.count(removed_file))
all_checksums.files.erase(removed_file);

View File

@ -14,6 +14,7 @@ public:
/// Pass empty 'already_written_offset_columns' first time then and pass the same object to subsequent instances of MergedColumnOnlyOutputStream
/// if you want to serialize elements of Nested data structure in different instances of MergedColumnOnlyOutputStream.
MergedColumnOnlyOutputStream(
DataPartStorageBuilderPtr data_part_storage_builder_,
const MergeTreeDataPartPtr & data_part,
const StorageMetadataPtr & metadata_snapshot_,
const Block & header_,

View File

@ -2,6 +2,7 @@
#include <base/logger_useful.h>
#include <Common/escapeForFileName.h>
#include <Storages/MergeTree/DataPartStorageOnDisk.h>
#include <Parsers/queryToString.h>
#include <Interpreters/SquashingTransform.h>
#include <Processors/Transforms/TTLTransform.h>
@ -417,16 +418,17 @@ static NameToNameVector collectFilesForRenames(
/// Initialize and write to disk new part fields like checksums, columns, etc.
void finalizeMutatedPart(
const MergeTreeDataPartPtr & source_part,
const DataPartStorageBuilderPtr & data_part_storage_builder,
MergeTreeData::MutableDataPartPtr new_data_part,
ExecuteTTLType execute_ttl_type,
const CompressionCodecPtr & codec)
{
auto disk = new_data_part->volume->getDisk();
auto part_path = fs::path(new_data_part->getFullRelativePath());
//auto disk = new_data_part->volume->getDisk();
//auto part_path = fs::path(new_data_part->getFullRelativePath());
if (new_data_part->uuid != UUIDHelpers::Nil)
{
auto out = disk->writeFile(part_path / IMergeTreeDataPart::UUID_FILE_NAME, 4096);
auto out = data_part_storage_builder->writeFile(IMergeTreeDataPart::UUID_FILE_NAME, 4096);
HashingWriteBuffer out_hashing(*out);
writeUUIDText(new_data_part->uuid, out_hashing);
new_data_part->checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_size = out_hashing.count();
@ -436,7 +438,7 @@ void finalizeMutatedPart(
if (execute_ttl_type != ExecuteTTLType::NONE)
{
/// Write a file with ttl infos in json format.
auto out_ttl = disk->writeFile(part_path / "ttl.txt", 4096);
auto out_ttl = data_part_storage_builder->writeFile("ttl.txt", 4096);
HashingWriteBuffer out_hashing(*out_ttl);
new_data_part->ttl_infos.write(out_hashing);
new_data_part->checksums.files["ttl.txt"].file_size = out_hashing.count();
@ -445,7 +447,7 @@ void finalizeMutatedPart(
if (!new_data_part->getSerializationInfos().empty())
{
auto out = disk->writeFile(part_path / IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096);
auto out = data_part_storage_builder->writeFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096);
HashingWriteBuffer out_hashing(*out);
new_data_part->getSerializationInfos().writeJSON(out_hashing);
new_data_part->checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_size = out_hashing.count();
@ -454,18 +456,18 @@ void finalizeMutatedPart(
{
/// Write file with checksums.
auto out_checksums = disk->writeFile(part_path / "checksums.txt", 4096);
auto out_checksums = data_part_storage_builder->writeFile("checksums.txt", 4096);
new_data_part->checksums.write(*out_checksums);
} /// close fd
{
auto out = disk->writeFile(part_path / IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096);
auto out = data_part_storage_builder->writeFile(IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096);
DB::writeText(queryToString(codec->getFullCodecDesc()), *out);
}
{
/// Write a file with a description of columns.
auto out_columns = disk->writeFile(part_path / "columns.txt", 4096);
auto out_columns = data_part_storage_builder->writeFile("columns.txt", 4096);
new_data_part->getColumns().writeText(*out_columns);
} /// close fd
@ -475,8 +477,7 @@ void finalizeMutatedPart(
new_data_part->minmax_idx = source_part->minmax_idx;
new_data_part->modification_time = time(nullptr);
new_data_part->loadProjections(false, false);
new_data_part->setBytesOnDisk(
MergeTreeData::DataPart::calculateTotalSizeOnDisk(new_data_part->volume->getDisk(), part_path));
new_data_part->setBytesOnDisk(new_data_part->data_part_storage->calculateTotalSizeOnDisk());
new_data_part->default_codec = codec;
new_data_part->calculateColumnsAndSecondaryIndicesSizesOnDisk();
new_data_part->storage.lockSharedData(*new_data_part);
@ -525,10 +526,11 @@ struct MutationContext
MutationsInterpreter::MutationKind::MutationKindEnum mutation_kind
= MutationsInterpreter::MutationKind::MutationKindEnum::MUTATE_UNKNOWN;
VolumePtr single_disk_volume;
//VolumePtr single_disk_volume;
MergeTreeData::MutableDataPartPtr new_data_part;
DiskPtr disk;
String new_part_tmp_path;
//DiskPtr disk;
DataPartStorageBuilderPtr data_part_storage_builder;
//String new_part_tmp_path;
IMergedBlockOutputStreamPtr out{nullptr};
@ -810,7 +812,7 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
if (projection_block)
{
auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart(
*ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num);
*ctx->data, ctx->log, projection_block, projection, ctx->data_part_storage_builder, ctx->new_data_part.get(), ++block_num);
tmp_part.finalize();
projection_parts[projection.name].emplace_back(std::move(tmp_part.part));
}
@ -832,7 +834,7 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
if (projection_block)
{
auto temp_part = MergeTreeDataWriter::writeTempProjectionPart(
*ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num);
*ctx->data, ctx->log, projection_block, projection, ctx->data_part_storage_builder, ctx->new_data_part.get(), ++block_num);
temp_part.finalize();
projection_parts[projection.name].emplace_back(std::move(temp_part.part));
}
@ -932,7 +934,7 @@ private:
void prepare()
{
ctx->disk->createDirectories(ctx->new_part_tmp_path);
ctx->data_part_storage_builder->createDirectories();
/// Note: this is done before creating input streams, because otherwise data.data_parts_mutex
/// (which is locked in data.getTotalActiveSizeInBytes())
@ -968,6 +970,7 @@ private:
ctx->out = std::make_shared<MergedBlockOutputStream>(
ctx->new_data_part,
ctx->data_part_storage_builder,
ctx->metadata_snapshot,
ctx->new_data_part->getColumns(),
skip_part_indices,
@ -1056,15 +1059,15 @@ private:
if (ctx->execute_ttl_type != ExecuteTTLType::NONE)
ctx->files_to_skip.insert("ttl.txt");
ctx->disk->createDirectories(ctx->new_part_tmp_path);
ctx->data_part_storage_builder->createDirectories();
/// Create hardlinks for unchanged files
for (auto it = ctx->disk->iterateDirectory(ctx->source_part->getFullRelativePath()); it->isValid(); it->next())
for (auto it = ctx->source_part->data_part_storage->iterate(); it->isValid(); it->next())
{
if (ctx->files_to_skip.count(it->name()))
continue;
String destination = ctx->new_part_tmp_path;
String destination; // = ctx->new_part_tmp_path;
String file_name = it->name();
auto rename_it = std::find_if(ctx->files_to_rename.begin(), ctx->files_to_rename.end(), [&file_name](const auto & rename_pair)
@ -1075,23 +1078,28 @@ private:
{
if (rename_it->second.empty())
continue;
destination += rename_it->second;
destination = rename_it->second;
}
else
{
destination += it->name();
destination = it->name();
}
if (!ctx->disk->isDirectory(it->path()))
ctx->disk->createHardLink(it->path(), destination);
if (it->isFile())
ctx->data_part_storage_builder->createHardLinkFrom(
*ctx->source_part->data_part_storage, it->name(), destination);
else if (!endsWith(".tmp_proj", it->name())) // ignore projection tmp merge dir
{
// it's a projection part directory
ctx->disk->createDirectories(destination);
for (auto p_it = ctx->disk->iterateDirectory(it->path()); p_it->isValid(); p_it->next())
ctx->data_part_storage_builder->createDirectory(destination);
auto projection_data_part_storage = ctx->source_part->data_part_storage->getProjection(destination);
auto projection_data_part_storage_builder = ctx->data_part_storage_builder->getProjection(destination);
for (auto p_it = projection_data_part_storage->iterate(); p_it->isValid(); p_it->next())
{
String p_destination = fs::path(destination) / p_it->name();
ctx->disk->createHardLink(p_it->path(), p_destination);
projection_data_part_storage_builder->createHardLinkFrom(
*projection_data_part_storage, p_it->name(), p_it->name());
}
}
}
@ -1162,7 +1170,7 @@ private:
}
}
MutationHelpers::finalizeMutatedPart(ctx->source_part, ctx->new_data_part, ctx->execute_ttl_type, ctx->compression_codec);
MutationHelpers::finalizeMutatedPart(ctx->source_part, ctx->new_data_part, ctx->data_part_storage_builder, ctx->execute_ttl_type, ctx->compression_codec);
}
@ -1293,8 +1301,19 @@ bool MutateTask::prepare()
}
ctx->single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + ctx->future_part->name, ctx->space_reservation->getDisk(), 0);
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(
ctx->single_disk_volume,
ctx->data->getRelativeDataPath(),
"tmp_mut_" + ctx->future_part->name);
ctx->data_part_storage_builder = std::make_shared<DataPartStorageBuilderOnDisk>(
ctx->single_disk_volume,
ctx->data->getRelativeDataPath(),
"tmp_mut_" + ctx->future_part->name);
ctx->new_data_part = ctx->data->createPart(
ctx->future_part->name, ctx->future_part->type, ctx->future_part->part_info, ctx->single_disk_volume, "tmp_mut_" + ctx->future_part->name);
ctx->future_part->name, ctx->future_part->type, ctx->future_part->part_info, data_part_storage);
ctx->new_data_part->uuid = ctx->future_part->uuid;
ctx->new_data_part->is_temp = true;
@ -1311,8 +1330,8 @@ bool MutateTask::prepare()
ctx->new_data_part->setSerializationInfos(new_infos);
ctx->new_data_part->partition.assign(ctx->source_part->partition);
ctx->disk = ctx->new_data_part->volume->getDisk();
ctx->new_part_tmp_path = ctx->new_data_part->getFullRelativePath();
// ctx->disk = ctx->new_data_part->volume->getDisk();
//ctx->new_part_tmp_path = ctx->new_data_part->getFullRelativePath();
/// Don't change granularity type while mutating subset of columns
ctx->mrk_extension = ctx->source_part->index_granularity_info.is_adaptive ? getAdaptiveMrkExtension(ctx->new_data_part->getType())

View File

@ -1371,7 +1371,7 @@ void StorageMergeTree::dropPartsImpl(DataPartsVector && parts_to_remove, bool de
/// NOTE: no race with background cleanup until we hold pointers to parts
for (const auto & part : parts_to_remove)
{
LOG_INFO(log, "Detaching {}", part->relative_path);
LOG_INFO(log, "Detaching {}", part->data_part_storage->getRelativePath());
part->makeCloneInDetached("", metadata_snapshot);
}
}
@ -1606,29 +1606,25 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
for (auto & part : data_parts)
{
auto disk = part->volume->getDisk();
String part_path = part->getFullRelativePath();
//auto disk = part->volume->getDisk();
//String part_path = part->getFullRelativePath();
/// If the checksums file is not present, calculate the checksums and write them to disk.
String checksums_path = fs::path(part_path) / "checksums.txt";
String tmp_checksums_path = fs::path(part_path) / "checksums.txt.tmp";
if (part->isStoredOnDisk() && !disk->exists(checksums_path))
String checksums_path = "checksums.txt";
String tmp_checksums_path = "checksums.txt.tmp";
if (part->isStoredOnDisk() && !part->data_part_storage->exists(checksums_path))
{
try
{
auto calculated_checksums = checkDataPart(part, false);
calculated_checksums.checkEqual(part->checksums, true);
auto out = disk->writeFile(tmp_checksums_path, 4096);
part->checksums.write(*out);
disk->moveFile(tmp_checksums_path, checksums_path);
part->data_part_storage->writeChecksums(part->checksums);
part->checkMetadata();
results.emplace_back(part->name, true, "Checksums recounted and written to disk.");
}
catch (const Exception & ex)
{
if (disk->exists(tmp_checksums_path))
disk->removeFile(tmp_checksums_path);
results.emplace_back(part->name, false,
"Check of part finished with error: '" + ex.message() + "'");
}

View File

@ -9,6 +9,7 @@
#include <Common/formatReadable.h>
#include <Common/thread_local_rng.h>
#include <Common/typeid_cast.h>
#include <Storages/MergeTree/DataPartStorageOnDisk.h>
#include <base/sort.h>
@ -1437,12 +1438,16 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::attachPartHelperFo
continue;
const String part_old_name = part_info->getPartName();
const String part_path = fs::path("detached") / part_old_name;
const VolumePtr volume = std::make_shared<SingleDiskVolume>("volume_" + part_old_name, disk);
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(
volume,
fs::path(relative_data_path) / "detached",
part_old_name);
/// actual_part_info is more recent than part_info so we use it
MergeTreeData::MutableDataPartPtr part = createPart(part_new_name, actual_part_info, volume, part_path);
MergeTreeData::MutableDataPartPtr part = createPart(part_new_name, actual_part_info, data_part_storage);
try
{
@ -1457,7 +1462,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::attachPartHelperFo
if (entry.part_checksum == part->checksums.getTotalChecksumHex())
{
part->modification_time = disk->getLastModified(part->getFullRelativePath()).epochTime();
part->modification_time = data_part_storage->getLastModified().epochTime();
return part;
}
}
@ -1823,7 +1828,7 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
/// If DETACH clone parts to detached/ directory
for (const auto & part : parts_to_remove)
{
LOG_INFO(log, "Detaching {}", part->relative_path);
LOG_INFO(log, "Detaching {}", part->data_part_storage->getRelativePath());
part->makeCloneInDetached("", metadata_snapshot);
}
}
@ -2448,7 +2453,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
for (const auto & part : parts_to_remove_from_working_set)
{
LOG_INFO(log, "Detaching {}", part->relative_path);
LOG_INFO(log, "Detaching {}", part->data_part_storage->getRelativePath());
part->makeCloneInDetached("clone", metadata_snapshot);
}
}
@ -4037,10 +4042,10 @@ bool StorageReplicatedMergeTree::fetchExistsPart(const String & part_name, const
{
part = get_part();
if (part->volume->getDisk()->getName() != replaced_disk->getName())
throw Exception("Part " + part->name + " fetched on wrong disk " + part->volume->getDisk()->getName(), ErrorCodes::LOGICAL_ERROR);
if (part->data_part_storage->getName() != replaced_disk->getName())
throw Exception("Part " + part->name + " fetched on wrong disk " + part->data_part_storage->getName(), ErrorCodes::LOGICAL_ERROR);
replaced_disk->removeFileIfExists(replaced_part_path);
replaced_disk->moveDirectory(part->getFullRelativePath(), replaced_part_path);
replaced_disk->moveDirectory(part->data_part_storage->getFullRelativePath(), replaced_part_path);
}
catch (const Exception & e)
{
@ -7115,7 +7120,7 @@ void StorageReplicatedMergeTree::checkBrokenDisks()
for (auto & part : *parts)
{
if (part->volume && part->volume->getDisk()->getName() == disk_ptr->getName())
if (part->data_part_storage && part->data_part_storage->getName() == disk_ptr->getName())
broken_part_callback(part->name);
}
continue;
@ -7216,7 +7221,7 @@ void StorageReplicatedMergeTree::lockSharedDataTemporary(const String & part_nam
String id = part_id;
boost::replace_all(id, "/", "_");
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), disk->getType(), getTableSharedID(),
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(disk->getType()), getTableSharedID(),
part_name, zookeeper_path);
for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
@ -7230,11 +7235,10 @@ void StorageReplicatedMergeTree::lockSharedDataTemporary(const String & part_nam
void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part, bool replace_existing_lock) const
{
if (!part.volume || !part.isStoredOnDisk())
if (!part.data_part_storage || !part.isStoredOnDisk())
return;
DiskPtr disk = part.volume->getDisk();
if (!disk || !disk->supportZeroCopyReplication())
if (part.data_part_storage->supportZeroCopyReplication())
return;
zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
@ -7244,7 +7248,7 @@ void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part,
String id = part.getUniqueId();
boost::replace_all(id, "/", "_");
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), disk->getType(), getTableSharedID(),
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), part.data_part_storage->getDiskType(), getTableSharedID(),
part.name, zookeeper_path);
for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
{
@ -7265,18 +7269,16 @@ bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & par
bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part, const String & name) const
{
if (!part.volume || !part.isStoredOnDisk())
if (!part.data_part_storage || !part.isStoredOnDisk())
return true;
DiskPtr disk = part.volume->getDisk();
if (!disk || !disk->supportZeroCopyReplication())
if (!part.data_part_storage->supportZeroCopyReplication())
return true;
/// If part is temporary refcount file may be absent
auto ref_count_path = fs::path(part.getFullRelativePath()) / IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK;
if (disk->exists(ref_count_path))
if (part.data_part_storage->exists(IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK))
{
auto ref_count = disk->getRefCount(ref_count_path);
auto ref_count = part.data_part_storage->getRefCount(IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK);
if (ref_count > 0) /// Keep part shard info for frozen backups
return false;
}
@ -7286,18 +7288,18 @@ bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & par
return true;
}
return unlockSharedDataByID(part.getUniqueId(), getTableSharedID(), name, replica_name, disk, getZooKeeper(), *getSettings(), log,
return unlockSharedDataByID(part.getUniqueId(), getTableSharedID(), name, replica_name, part.data_part_storage->getDiskType(), getZooKeeper(), *getSettings(), log,
zookeeper_path);
}
bool StorageReplicatedMergeTree::unlockSharedDataByID(String part_id, const String & table_uuid, const String & part_name,
const String & replica_name_, DiskPtr disk, zkutil::ZooKeeperPtr zookeeper_ptr, const MergeTreeSettings & settings,
const String & replica_name_, std::string disk_type, zkutil::ZooKeeperPtr zookeeper_ptr, const MergeTreeSettings & settings,
Poco::Logger * logger, const String & zookeeper_path_old)
{
boost::replace_all(part_id, "/", "_");
Strings zc_zookeeper_paths = getZeroCopyPartPath(settings, disk->getType(), table_uuid, part_name, zookeeper_path_old);
Strings zc_zookeeper_paths = getZeroCopyPartPath(settings, disk_type, table_uuid, part_name, zookeeper_path_old);
bool part_has_no_more_locks = true;
@ -7381,7 +7383,7 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
if (!zookeeper)
return "";
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), disk_type, getTableSharedID(), part.name,
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(disk_type), getTableSharedID(), part.name,
zookeeper_path);
std::set<String> replicas;
@ -7452,12 +7454,12 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
}
Strings StorageReplicatedMergeTree::getZeroCopyPartPath(const MergeTreeSettings & settings, DiskType disk_type, const String & table_uuid,
Strings StorageReplicatedMergeTree::getZeroCopyPartPath(const MergeTreeSettings & settings, std::string disk_type, const String & table_uuid,
const String & part_name, const String & zookeeper_path_old)
{
Strings res;
String zero_copy = fmt::format("zero_copy_{}", toString(disk_type));
String zero_copy = fmt::format("zero_copy_{}", disk_type);
String new_path = fs::path(settings.remote_fs_zero_copy_zookeeper_path.toString()) / zero_copy / table_uuid / part_name;
res.push_back(new_path);
@ -7491,7 +7493,7 @@ std::optional<String> StorageReplicatedMergeTree::getZeroCopyPartPath(const Stri
if (!disk || !disk->supportZeroCopyReplication())
return std::nullopt;
return getZeroCopyPartPath(*getSettings(), disk->getType(), getTableSharedID(), part_name, zookeeper_path)[0];
return getZeroCopyPartPath(*getSettings(), toString(disk->getType()), getTableSharedID(), part_name, zookeeper_path)[0];
}
std::optional<ZeroCopyLock> StorageReplicatedMergeTree::tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk)
@ -7586,12 +7588,22 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
auto minmax_idx = std::make_shared<IMergeTreeDataPart::MinMaxIndex>();
minmax_idx->update(block, getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
auto new_volume = createVolumeFromReservation(reservation, volume);
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(
new_volume,
relative_data_path,
TMP_PREFIX + lost_part_name);
DataPartStorageBuilderPtr data_part_storage_builder = std::make_shared<DataPartStorageBuilderOnDisk>(
new_volume,
relative_data_path,
TMP_PREFIX + lost_part_name);
auto new_data_part = createPart(
lost_part_name,
choosePartType(0, block.rows()),
new_part_info,
createVolumeFromReservation(reservation, volume),
TMP_PREFIX + lost_part_name);
data_part_storage);
if (settings->assign_part_uuids)
new_data_part->uuid = UUIDHelpers::generateV4();
@ -7628,19 +7640,16 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
if (new_data_part->isStoredOnDisk())
{
/// The name could be non-unique in case of stale files from previous runs.
String full_path = new_data_part->getFullRelativePath();
if (new_data_part->volume->getDisk()->exists(full_path))
if (data_part_storage_builder->exists())
{
LOG_WARNING(log, "Removing old temporary directory {}", fullPath(new_data_part->volume->getDisk(), full_path));
new_data_part->volume->getDisk()->removeRecursive(full_path);
LOG_WARNING(log, "Removing old temporary directory {}", new_data_part->data_part_storage->getFullPath());
data_part_storage_builder->removeRecursive();
}
const auto disk = new_data_part->volume->getDisk();
disk->createDirectories(full_path);
data_part_storage_builder->createDirectories();
if (getSettings()->fsync_part_directory)
sync_guard = disk->getDirectorySyncGuard(full_path);
sync_guard = data_part_storage_builder->getDirectorySyncGuard();
}
/// This effectively chooses minimal compression method:
@ -7648,7 +7657,7 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
auto compression_codec = getContext()->chooseCompressionCodec(0, 0);
const auto & index_factory = MergeTreeIndexFactory::instance();
MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns,
MergedBlockOutputStream out(new_data_part, data_part_storage_builder, metadata_snapshot, columns,
index_factory.getMany(metadata_snapshot->getSecondaryIndices()), compression_codec);
bool sync_on_insert = settings->fsync_after_insert;
@ -7909,7 +7918,7 @@ bool StorageReplicatedMergeTree::removeSharedDetachedPart(DiskPtr disk, const St
{
String id = disk->getUniqueId(checksums);
keep_shared = !StorageReplicatedMergeTree::unlockSharedDataByID(id, table_uuid, part_name,
detached_replica_name, disk, zookeeper, getContext()->getReplicatedMergeTreeSettings(), log,
detached_replica_name, toString(disk->getType()), zookeeper, getContext()->getReplicatedMergeTreeSettings(), log,
detached_zookeeper_path);
}
else

View File

@ -247,7 +247,7 @@ public:
/// Return true if data unlocked
/// Return false if data is still used by another node
static bool unlockSharedDataByID(String part_id, const String & table_uuid, const String & part_name, const String & replica_name_,
DiskPtr disk, zkutil::ZooKeeperPtr zookeeper_, const MergeTreeSettings & settings, Poco::Logger * logger,
std::string disk_type, zkutil::ZooKeeperPtr zookeeper_, const MergeTreeSettings & settings, Poco::Logger * logger,
const String & zookeeper_path_old);
/// Fetch part only if some replica has it on shared storage like S3
@ -758,7 +758,7 @@ private:
PartitionBlockNumbersHolder allocateBlockNumbersInAffectedPartitions(
const MutationCommands & commands, ContextPtr query_context, const zkutil::ZooKeeperPtr & zookeeper) const;
static Strings getZeroCopyPartPath(const MergeTreeSettings & settings, DiskType disk_type, const String & table_uuid,
static Strings getZeroCopyPartPath(const MergeTreeSettings & settings, std::string disk_type, const String & table_uuid,
const String & part_name, const String & zookeeper_path_old);
static void createZeroCopyLockNode(const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node, int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false);