MergeTree full support for S3 (#9646)

* IMergeTreeDataPart full S3 support.

* MergeTreeData full S3 support.

* Compilation fixes.

* Mutations and merges S3 support.

* Fixed removing data part.

* MergeTree for S3 integration tests and fixes.

* Code style issues.

* Enable AWS logging.

* Fixed hardlink creation for DiskLocal.

* Fixed localBackup.cpp compilation.

* Fixed attaching partition.

* Get rid of extra methods in IDisk.

* Fixed storage config reloading.

* More tests with table manipulations.

* Remove unused error codes.

* Move localBackup to MergeTree folder.

* Minor fixes.
This commit is contained in:
Pavel Kovalenko 2020-03-19 19:37:55 +03:00 committed by GitHub
parent 4dda8e11ec
commit f2dca656f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 599 additions and 343 deletions

View File

@ -1,4 +1,5 @@
#include "DiskLocal.h"
#include <Common/createHardLink.h>
#include "DiskFactory.h"
#include <Interpreters/Context.h>
@ -11,7 +12,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
@ -254,6 +254,33 @@ Poco::Timestamp DiskLocal::getLastModified(const String & path)
return Poco::File(disk_path + path).getLastModified();
}
/// Creates a hard link at `dst_path` that refers to the file at `src_path`.
/// Both paths are relative to the disk; `disk_path` is prepended before delegating to DB::createHardLink.
void DiskLocal::createHardLink(const String & src_path, const String & dst_path)
{
    const auto source = disk_path + src_path;
    const auto destination = disk_path + dst_path;
    DB::createHardLink(source, destination);
}
void DiskLocal::createFile(const String & path)
{
Poco::File(disk_path + path).createFile();
}
void DiskLocal::setReadOnly(const String & path)
{
Poco::File(disk_path + path).setReadOnly(true);
}
/// Returns true when `typeid` reports the same type for both disk references.
inline bool isSameDiskType(const IDisk & one, const IDisk & another)
{
    const auto & first_type = typeid(one);
    const auto & second_type = typeid(another);
    return first_type == second_type;
}
void DiskLocal::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
{
if (isSameDiskType(*this, *to_disk))
Poco::File(disk_path + from_path).copyTo(to_disk->getPath() + to_path); /// Use more optimal way.
else
IDisk::copy(from_path, to_disk, to_path); /// Copy files through buffers.
}
void DiskLocalReservation::update(UInt64 new_size)
{

View File

@ -61,12 +61,16 @@ public:
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
void createFile(const String & path) override;
void moveFile(const String & from_path, const String & to_path) override;
void replaceFile(const String & from_path, const String & to_path) override;
void copyFile(const String & from_path, const String & to_path) override;
void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override;
void listFiles(const String & path, std::vector<String> & file_names) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(
@ -91,6 +95,10 @@ public:
Poco::Timestamp getLastModified(const String & path) override;
void setReadOnly(const String & path) override;
void createHardLink(const String & src_path, const String & dst_path) override;
private:
bool tryReserve(UInt64 bytes);

View File

@ -386,10 +386,27 @@ void DiskMemory::removeRecursive(const String & path)
/// Appends the names of all entries found under `path` to `file_names`.
/// NOTE(review): `mutex` is held while iterateDirectory() runs; if iterateDirectory() also locks `mutex`
/// and the mutex is not recursive, this would self-deadlock — confirm against its implementation.
void DiskMemory::listFiles(const String & path, std::vector<String> & file_names)
{
    std::lock_guard lock(mutex);

    auto entry = iterateDirectory(path);
    while (entry->isValid())
    {
        file_names.push_back(entry->name());
        entry->next();
    }
}
/// Hard links are not supported by the memory disk: always throws NOT_IMPLEMENTED.
void DiskMemory::createHardLink(const String & /* src_path */, const String & /* dst_path */)
{
    const char * message = "Method createHardLink is not implemented for memory disks";
    throw Exception(message, ErrorCodes::NOT_IMPLEMENTED);
}
/// Creating an empty file is not supported by the memory disk: always throws NOT_IMPLEMENTED.
void DiskMemory::createFile(const String & /* path */)
{
    const char * message = "Method createFile is not implemented for memory disks";
    throw Exception(message, ErrorCodes::NOT_IMPLEMENTED);
}
/// Read-only flags are not supported by the memory disk: always throws NOT_IMPLEMENTED.
void DiskMemory::setReadOnly(const String & /* path */)
{
    const char * message = "Method setReadOnly is not implemented for memory disks";
    throw Exception(message, ErrorCodes::NOT_IMPLEMENTED);
}
using DiskMemoryPtr = std::shared_ptr<DiskMemory>;

View File

@ -54,6 +54,8 @@ public:
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
void createFile(const String & path) override;
void moveFile(const String & from_path, const String & to_path) override;
void replaceFile(const String & from_path, const String & to_path) override;
@ -80,10 +82,14 @@ public:
void removeRecursive(const String & path) override;
void setLastModified(const String &, const Poco::Timestamp &) override { }
void setLastModified(const String &, const Poco::Timestamp &) override {}
Poco::Timestamp getLastModified(const String &) override { return Poco::Timestamp(); }
void setReadOnly(const String & path) override;
void createHardLink(const String & src_path, const String & dst_path) override;
private:
void createDirectoriesImpl(const String & path);
void replaceFileImpl(const String & from_path, const String & to_path);

View File

@ -303,7 +303,12 @@ namespace
finalized = true;
}
void sync() override { metadata.save(true); }
/// Saves the metadata, but only when the `finalized` flag is already set; otherwise does nothing.
void sync() override
{
    if (!finalized)
        return;

    metadata.save(true);
}
/// Returns `metadata.metadata_file_path`.
std::string getFileName() const override
{
    return metadata.metadata_file_path;
}
private:
@ -480,14 +485,12 @@ void DiskS3::copyFile(const String & from_path, const String & to_path)
Metadata from(metadata_path + from_path);
Metadata to(metadata_path + to_path, true);
for (UInt32 i = 0; i < from.s3_objects_count; ++i)
for (const auto & [path, size] : from.s3_objects)
{
auto path = from.s3_objects[i].first;
auto size = from.s3_objects[i].second;
auto new_path = s3_root_path + getRandomName();
Aws::S3::Model::CopyObjectRequest req;
req.SetCopySource(bucket + "/" + path);
req.SetBucket(bucket);
req.SetCopySource(path);
req.SetKey(new_path);
throwIfError(client->CopyObject(req));
@ -621,6 +624,27 @@ Poco::Timestamp DiskS3::getLastModified(const String & path)
return Poco::File(metadata_path + path).getLastModified();
}
/// Emulates a hard link by making a copy of `src_path` at `dst_path` with copyFile.
void DiskS3::createHardLink(const String & src_path, const String & dst_path)
{
    /// TODO: Replace with optimal implementation:
    /// keep a list of links in the metadata file, so that creating a hard link
    /// only adds a new link to that list and copies the metadata file.
    copyFile(src_path, dst_path);
}
void DiskS3::createFile(const String & path)
{
/// Create empty metadata file.
Metadata metadata(metadata_path + path, true);
metadata.save();
}
void DiskS3::setReadOnly(const String & path)
{
Poco::File(metadata_path + path).setReadOnly(true);
}
DiskS3Reservation::~DiskS3Reservation()
{

View File

@ -31,7 +31,7 @@ public:
const String & getName() const override { return name; }
const String & getPath() const override { return s3_root_path; }
const String & getPath() const override { return metadata_path; }
ReservationPtr reserve(UInt64 bytes) override;
@ -87,10 +87,16 @@ public:
void removeRecursive(const String & path) override;
void createHardLink(const String & src_path, const String & dst_path) override;
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
Poco::Timestamp getLastModified(const String & path) override;
void createFile(const String & path) override;
void setReadOnly(const String & path) override;
private:
bool tryReserve(UInt64 bytes);

View File

@ -1,4 +1,9 @@
#include "IDisk.h"
#include <IO/copyData.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFileBase.h>
#include <Poco/Logger.h>
#include <common/logger_useful.h>
namespace DB
{
@ -7,4 +12,36 @@ bool IDisk::isDirectoryEmpty(const String & path)
{
return !iterateDirectory(path)->isValid();
}
/// Copies a single file from `from_path` on `from_disk` to `to_path` on `to_disk`
/// by reading it through the source disk's read buffer and writing it to the destination disk's write buffer.
void copyFile(IDisk & from_disk, const String & from_path, IDisk & to_disk, const String & to_path)
{
    auto * log = &Poco::Logger::get("IDisk");
    LOG_DEBUG(log, "Copying from " << from_disk.getName() << " " << from_path << " to " << to_disk.getName() << " " << to_path);

    auto source = from_disk.readFile(from_path);
    auto destination = to_disk.writeFile(to_path);
    copyData(*source, *destination);
}
void IDisk::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
{
if (isFile(from_path))
{
DB::copyFile(*this, from_path, *to_disk, to_path + fileName(from_path));
}
else
{
Poco::Path path(from_path);
const String & dir_name = path.directory(path.depth() - 1);
const String dest = to_path + dir_name + "/";
to_disk->createDirectories(dest);
for (auto it = iterateDirectory(from_path); it->isValid(); it->next())
{
copy(it->path(), to_disk, dest);
}
}
}
}

View File

@ -111,6 +111,9 @@ public:
/// Return `true` if the specified directory is empty.
bool isDirectoryEmpty(const String & path);
/// Create empty file at `path`.
virtual void createFile(const String & path) = 0;
/// Move the file from `from_path` to `to_path`.
/// If a file with `to_path` path already exists, an exception will be thrown.
virtual void moveFile(const String & from_path, const String & to_path) = 0;
@ -122,6 +125,9 @@ public:
/// Copy the file from `from_path` to `to_path`.
virtual void copyFile(const String & from_path, const String & to_path) = 0;
/// Recursively copy the data located at `from_path` to `to_path` on `to_disk`.
virtual void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path);
/// List files at `path` and add their names to `file_names`
virtual void listFiles(const String & path, std::vector<String> & file_names) = 0;
@ -147,11 +153,24 @@ public:
/// Remove file or directory with all children. Use with extra caution. Throws exception if file doesn't exist.
virtual void removeRecursive(const String & path) = 0;
/// Remove the file or directory at `path`; do nothing when `exists(path)` is false.
void removeIfExists(const String & path)
{
    if (!exists(path))
        return;

    remove(path);
}
/// Set last modified time to file or directory at `path`.
virtual void setLastModified(const String & path, const Poco::Timestamp & timestamp) = 0;
/// Get last modified time of file or directory at `path`.
virtual Poco::Timestamp getLastModified(const String & path) = 0;
/// Set file at `path` as read-only.
virtual void setReadOnly(const String & path) = 0;
/// Create hardlink from `src_path` to `dst_path`.
virtual void createHardLink(const String & src_path, const String & dst_path) = 0;
};
using DiskPtr = std::shared_ptr<IDisk>;

View File

@ -1321,7 +1321,18 @@ BackgroundProcessingPool & Context::getBackgroundPool()
{
/// Returns the shared background processing pool, creating it on first use.
/// `lock` is a local object and stays alive until the function returns.
auto lock = getLock();
if (!shared->background_pool)
shared->background_pool.emplace(settings.background_pool_size);
{
/// Pool timing settings are read from the server config; the second argument of each
/// getDouble() call is the default passed for that key.
BackgroundProcessingPool::PoolSettings pool_settings;
auto & config = getConfigRef();
pool_settings.thread_sleep_seconds = config.getDouble("background_processing_pool_thread_sleep_seconds", 10);
pool_settings.thread_sleep_seconds_random_part = config.getDouble("background_processing_pool_thread_sleep_seconds_random_part", 1.0);
pool_settings.thread_sleep_seconds_if_nothing_to_do = config.getDouble("background_processing_pool_thread_sleep_seconds_if_nothing_to_do", 0.1);
pool_settings.task_sleep_seconds_when_no_work_min = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_min", 10);
pool_settings.task_sleep_seconds_when_no_work_max = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_max", 600);
pool_settings.task_sleep_seconds_when_no_work_multiplier = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_multiplier", 1.1);
pool_settings.task_sleep_seconds_when_no_work_random_part = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_random_part", 1.0);
/// The pool is constructed with the size from `settings.background_pool_size` and the settings read above.
shared->background_pool.emplace(settings.background_pool_size, pool_settings);
}
return *shared->background_pool;
}

View File

@ -3,16 +3,13 @@
#include <optional>
#include <Core/Defines.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Poco/File.h>
#include <Poco/Path.h>
#include <Storages/MergeTree/localBackup.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/escapeForFileName.h>
#include <Common/localBackup.h>
#include <common/JSON.h>
#include <common/logger_useful.h>
@ -30,7 +27,6 @@ namespace ErrorCodes
extern const int NOT_FOUND_EXPECTED_DATA_PART;
extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
extern const int BAD_TTL_FILE;
extern const int CANNOT_UNLINK;
extern const int NOT_IMPLEMENTED;
}
@ -251,7 +247,7 @@ void IMergeTreeDataPart::removeIfNeeded()
if (is_temp)
{
String file_name = Poco::Path(relative_path).getFileName();
String file_name = fileName(relative_path);
if (file_name.empty())
throw Exception("relative_path " + relative_path + " of part " + name + " is invalid or not set", ErrorCodes::LOGICAL_ERROR);
@ -699,33 +695,33 @@ void IMergeTreeDataPart::remove() const
* And a race condition can happen that will lead to "File not found" error here.
*/
String from_ = storage.relative_data_path + relative_path;
String to_ = storage.relative_data_path + "delete_tmp_" + name;
String from = storage.relative_data_path + relative_path;
String to = storage.relative_data_path + "delete_tmp_" + name;
// TODO directory delete_tmp_<name> is never removed if server crashes before returning from this function
if (disk->exists(to_))
if (disk->exists(to))
{
LOG_WARNING(storage.log, "Directory " << fullPath(disk, to_) << " (to which part must be renamed before removing) already exists."
LOG_WARNING(storage.log, "Directory " << fullPath(disk, to) << " (to which part must be renamed before removing) already exists."
" Most likely this is due to unclean restart. Removing it.");
try
{
disk->removeRecursive(to_);
disk->removeRecursive(to);
}
catch (...)
{
LOG_ERROR(storage.log, "Cannot recursively remove directory " << fullPath(disk, to_) << ". Exception: " << getCurrentExceptionMessage(false));
LOG_ERROR(storage.log, "Cannot recursively remove directory " << fullPath(disk, to) << ". Exception: " << getCurrentExceptionMessage(false));
throw;
}
}
try
{
disk->moveFile(from_, to_);
disk->moveFile(from, to);
}
catch (const Poco::FileNotFoundException &)
{
LOG_ERROR(storage.log, "Directory " << fullPath(disk, to_) << " (part to remove) doesn't exist or one of nested files has gone."
LOG_ERROR(storage.log, "Directory " << fullPath(disk, to) << " (part to remove) doesn't exist or one of nested files has gone."
" Most likely this is due to manual removing. This should be discouraged. Ignoring.");
return;
@ -741,37 +737,25 @@ void IMergeTreeDataPart::remove() const
#endif
std::shared_lock<std::shared_mutex> lock(columns_lock);
/// TODO: IDisk doesn't support `unlink()` and `rmdir()` functionality.
auto to = fullPath(disk, to_);
for (const auto & [file, _] : checksums.files)
{
String path_to_remove = to + "/" + file;
if (0 != unlink(path_to_remove.c_str()))
throwFromErrnoWithPath("Cannot unlink file " + path_to_remove, path_to_remove, ErrorCodes::CANNOT_UNLINK);
}
disk->remove(to + "/" + file);
#if !__clang__
# pragma GCC diagnostic pop
#endif
for (const auto & file : {"checksums.txt", "columns.txt"})
{
String path_to_remove = to + "/" + file;
if (0 != unlink(path_to_remove.c_str()))
throwFromErrnoWithPath("Cannot unlink file " + path_to_remove, path_to_remove, ErrorCodes::CANNOT_UNLINK);
}
disk->remove(to + "/" + file);
if (0 != rmdir(to.c_str()))
throwFromErrnoWithPath("Cannot rmdir file " + to, to, ErrorCodes::CANNOT_UNLINK);
disk->remove(to);
}
catch (...)
{
/// Recursive directory removal does many excessive "stat" syscalls under the hood.
LOG_ERROR(storage.log, "Cannot quickly remove directory " << fullPath(disk, to_) << " by removing files; fallback to recursive removal. Reason: "
LOG_ERROR(storage.log, "Cannot quickly remove directory " << fullPath(disk, to) << " by removing files; fallback to recursive removal. Reason: "
<< getCurrentExceptionMessage(false));
disk->removeRecursive(to_ + "/");
disk->removeRecursive(to + "/");
}
}
@ -791,7 +775,7 @@ String IMergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix)
{
res = "detached/" + (prefix.empty() ? "" : prefix + "_") + name + (try_no ? "_try" + DB::toString(try_no) : "");
if (!Poco::File(storage.getFullPathOnDisk(disk) + res).exists())
if (!disk->exists(getFullRelativePath() + res))
return res;
LOG_WARNING(storage.log, "Directory " << res << " (to detach to) already exists."
@ -812,10 +796,8 @@ void IMergeTreeDataPart::makeCloneInDetached(const String & prefix) const
assertOnDisk();
LOG_INFO(storage.log, "Detaching " << relative_path);
Poco::Path src(getFullPath());
Poco::Path dst(storage.getFullPathOnDisk(disk) + getRelativePathForDetachedPart(prefix));
/// Backup is not recursive (max_level is 0), so do not copy inner directories
localBackup(src, dst, 0);
localBackup(disk, getFullRelativePath(), storage.relative_data_path + getRelativePathForDetachedPart(prefix), 0);
}
void IMergeTreeDataPart::makeCloneOnDiskDetached(const ReservationPtr & reservation) const
@ -825,14 +807,13 @@ void IMergeTreeDataPart::makeCloneOnDiskDetached(const ReservationPtr & reservat
if (reserved_disk->getName() == disk->getName())
throw Exception("Can not clone data part " + name + " to same disk " + disk->getName(), ErrorCodes::LOGICAL_ERROR);
String path_to_clone = storage.getFullPathOnDisk(reserved_disk) + "detached/";
String path_to_clone = storage.relative_data_path + "detached/";
if (Poco::File(path_to_clone + relative_path).exists())
throw Exception("Path " + path_to_clone + relative_path + " already exists. Can not clone ", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
Poco::File(path_to_clone).createDirectory();
if (reserved_disk->exists(path_to_clone + relative_path))
throw Exception("Path " + fullPath(reserved_disk, path_to_clone + relative_path) + " already exists. Can not clone ", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
reserved_disk->createDirectory(path_to_clone);
Poco::File cloning_directory(getFullPath());
cloning_directory.copyTo(path_to_clone);
disk->copy(getFullRelativePath(), reserved_disk, path_to_clone);
}
void IMergeTreeDataPart::checkConsistencyBase() const

View File

@ -1,50 +1,49 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Interpreters/SyntaxAnalyzer.h>
#include <Compression/CompressedReadBuffer.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/NestedUtils.h>
#include <Formats/FormatFactory.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/HexWriteBuffer.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/WriteBufferFromString.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Storages/MergeTree/MergeTreeSequentialBlockInputStream.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartWide.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/AlterCommands.h>
#include <Parsers/ASTNameTypePair.h>
#include <Parsers/ASTLiteral.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/SyntaxAnalyzer.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTNameTypePair.h>
#include <Parsers/ASTPartition.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <Formats/FormatFactory.h>
#include <DataStreams/copyData.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromString.h>
#include <Compression/CompressedReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/HexWriteBuffer.h>
#include <IO/Operators.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNullable.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Storages/AlterCommands.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartWide.h>
#include <Storages/MergeTree/MergeTreeSequentialBlockInputStream.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/MergeTree/localBackup.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/Increment.h>
#include <Common/SimpleIncrement.h>
#include <Common/Stopwatch.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/escapeForFileName.h>
#include <Common/quoteString.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/Stopwatch.h>
#include <Common/typeid_cast.h>
#include <Common/localBackup.h>
#include <Interpreters/PartLog.h>
#include <Poco/DirectoryIterator.h>
@ -859,7 +858,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
const auto settings = getSettings();
std::vector<std::pair<String, DiskPtr>> part_names_with_disks;
Strings part_file_names;
Poco::DirectoryIterator end;
auto disks = getStoragePolicy()->getDisks();
@ -1318,7 +1316,7 @@ void MergeTreeData::rename(
for (const auto & disk : disks)
{
auto new_table_path_parent = Poco::Path(new_table_path).makeParent().toString();
auto new_table_path_parent = parentPath(new_table_path);
disk->createDirectory(new_table_path_parent);
disk->moveDirectory(relative_data_path, new_table_path);
}
@ -1713,8 +1711,8 @@ void MergeTreeData::alterDataPart(
size_t num_files_to_modify = transaction->rename_map.size();
size_t num_files_to_remove = 0;
for (const auto & from_to : transaction->rename_map)
if (from_to.second.empty())
for (const auto & [from, to] : transaction->rename_map)
if (to.empty())
++num_files_to_remove;
if (!skip_sanity_checks
@ -1732,18 +1730,18 @@ void MergeTreeData::alterDataPart(
<< ") files (";
bool first = true;
for (const auto & from_to : transaction->rename_map)
for (const auto & [from, to] : transaction->rename_map)
{
if (!first)
exception_message << ", ";
if (forbidden_because_of_modify)
{
exception_message << "from " << backQuote(from_to.first) << " to " << backQuote(from_to.second);
exception_message << "from " << backQuote(from) << " to " << backQuote(to);
first = false;
}
else if (from_to.second.empty())
else if (to.empty())
{
exception_message << backQuote(from_to.first);
exception_message << backQuote(from);
first = false;
}
}
@ -1813,28 +1811,28 @@ void MergeTreeData::alterDataPart(
/// Update the checksums.
DataPart::Checksums new_checksums = part->checksums;
for (const auto & it : transaction->rename_map)
for (const auto & [from, to] : transaction->rename_map)
{
if (it.second.empty())
new_checksums.files.erase(it.first);
if (to.empty())
new_checksums.files.erase(from);
else
new_checksums.files[it.second] = add_checksums.files[it.first];
new_checksums.files[to] = add_checksums.files[from];
}
/// Write the checksums to the temporary file.
if (!part->checksums.empty())
{
transaction->new_checksums = new_checksums;
WriteBufferFromFile checksums_file(part->getFullPath() + "checksums.txt.tmp", 4096);
new_checksums.write(checksums_file);
auto checksums_file = part->disk->writeFile(part->getFullRelativePath() + "checksums.txt.tmp", 4096);
new_checksums.write(*checksums_file);
transaction->rename_map["checksums.txt.tmp"] = "checksums.txt";
}
/// Write the new column list to the temporary file.
{
transaction->new_columns = new_columns.filter(part->getColumns().getNames());
WriteBufferFromFile columns_file(part->getFullPath() + "columns.txt.tmp", 4096);
transaction->new_columns.writeText(columns_file);
auto columns_file = part->disk->writeFile(part->getFullRelativePath() + "columns.txt.tmp", 4096);
transaction->new_columns.writeText(*columns_file);
transaction->rename_map["columns.txt.tmp"] = "columns.txt";
}
}
@ -1863,16 +1861,16 @@ void MergeTreeData::changeSettings(
for (const String & disk_name : all_diff_disk_names)
{
const auto & path = getFullPathOnDisk(new_storage_policy->getDiskByName(disk_name));
if (Poco::File(path).exists())
auto disk = new_storage_policy->getDiskByName(disk_name);
if (disk->exists(relative_data_path))
throw Exception("New storage policy contain disks which already contain data of a table with the same name", ErrorCodes::LOGICAL_ERROR);
}
for (const String & disk_name : all_diff_disk_names)
{
const auto & path = getFullPathOnDisk(new_storage_policy->getDiskByName(disk_name));
Poco::File(path).createDirectories();
Poco::File(path + "detached").createDirectory();
auto disk = new_storage_policy->getDiskByName(disk_name);
disk->createDirectories(relative_data_path);
disk->createDirectories(relative_data_path + "detached");
}
/// FIXME how would that be done while reloading configuration???
}
@ -1939,7 +1937,8 @@ void MergeTreeData::AlterDataPartTransaction::commit()
{
std::unique_lock<std::shared_mutex> lock(data_part->columns_lock);
String path = data_part->getFullPath();
auto disk = data_part->disk;
String path = data_part->getFullRelativePath();
/// NOTE: checking that a file exists before renaming or deleting it
/// is justified by the fact that, when converting an ordinary column
@ -1947,19 +1946,18 @@ void MergeTreeData::AlterDataPartTransaction::commit()
/// before, i.e. they do not have older versions.
/// 1) Rename the old files.
for (const auto & from_to : rename_map)
for (const auto & [from, to] : rename_map)
{
String name = from_to.second.empty() ? from_to.first : from_to.second;
Poco::File file{path + name};
if (file.exists())
file.renameTo(path + name + ".tmp2");
String name = to.empty() ? from : to;
if (disk->exists(path + name))
disk->moveFile(path + name, path + name + ".tmp2");
}
/// 2) Move new files in the place of old and update the metadata in memory.
for (const auto & from_to : rename_map)
for (const auto & [from, to] : rename_map)
{
if (!from_to.second.empty())
Poco::File{path + from_to.first}.renameTo(path + from_to.second);
if (!to.empty())
disk->moveFile(path + from, path + to);
}
auto & mutable_part = const_cast<DataPart &>(*data_part);
@ -1967,12 +1965,10 @@ void MergeTreeData::AlterDataPartTransaction::commit()
mutable_part.setColumns(new_columns);
/// 3) Delete the old files and drop required columns (DROP COLUMN)
for (const auto & from_to : rename_map)
for (const auto & [from, to] : rename_map)
{
String name = from_to.second.empty() ? from_to.first : from_to.second;
Poco::File file{path + name + ".tmp2"};
if (file.exists())
file.remove();
String name = to.empty() ? from : to;
disk->removeIfExists(path + name + ".tmp2");
}
mutable_part.bytes_on_disk = new_checksums.getTotalSizeOnDisk();
@ -2002,20 +1998,18 @@ MergeTreeData::AlterDataPartTransaction::~AlterDataPartTransaction()
{
LOG_WARNING(data_part->storage.log, "Aborting ALTER of part " << data_part->relative_path);
String path = data_part->getFullPath();
for (const auto & from_to : rename_map)
String path = data_part->getFullRelativePath();
for (const auto & [from, to] : rename_map)
{
if (!from_to.second.empty())
if (!to.empty())
{
try
{
Poco::File file(path + from_to.first);
if (file.exists())
file.remove();
data_part->disk->removeIfExists(path + from);
}
catch (Poco::Exception & e)
{
LOG_WARNING(data_part->storage.log, "Can't remove " << path + from_to.first << ": " << e.displayText());
LOG_WARNING(data_part->storage.log, "Can't remove " << fullPath(data_part->disk, path + from) << ": " << e.displayText());
}
}
}
@ -2029,14 +2023,13 @@ MergeTreeData::AlterDataPartTransaction::~AlterDataPartTransaction()
void MergeTreeData::PartsTemporaryRename::addPart(const String & old_name, const String & new_name)
{
old_and_new_names.push_back({old_name, new_name});
const auto paths = storage.getDataPaths();
for (const auto & full_path : paths)
for (const auto & [path, disk] : storage.getRelativeDataPathsWithDisks())
{
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
for (auto it = disk->iterateDirectory(path + source_dir); it->isValid(); it->next())
{
if (it.name() == old_name)
if (it->name() == old_name)
{
old_part_name_to_full_path[old_name] = full_path;
old_part_name_to_path_and_disk[old_name] = {path, disk};
break;
}
}
@ -2050,11 +2043,12 @@ void MergeTreeData::PartsTemporaryRename::tryRenameAll()
{
try
{
const auto & names = old_and_new_names[i];
if (names.first.empty() || names.second.empty())
const auto & [old_name, new_name] = old_and_new_names[i];
if (old_name.empty() || new_name.empty())
throw DB::Exception("Empty part name. Most likely it's a bug.", ErrorCodes::INCORRECT_FILE_NAME);
const auto full_path = old_part_name_to_full_path[names.first] + source_dir; /// old_name
Poco::File(full_path + names.first).renameTo(full_path + names.second);
const auto & [path, disk] = old_part_name_to_path_and_disk[old_name];
const auto full_path = path + source_dir; /// for old_name
disk->moveFile(full_path + old_name, full_path + new_name);
}
catch (...)
{
@ -2070,15 +2064,16 @@ MergeTreeData::PartsTemporaryRename::~PartsTemporaryRename()
// TODO what if server had crashed before this destructor was called?
if (!renamed)
return;
for (const auto & names : old_and_new_names)
for (const auto & [old_name, new_name] : old_and_new_names)
{
if (names.first.empty())
if (old_name.empty())
continue;
try
{
const auto full_path = old_part_name_to_full_path[names.first] + source_dir; /// old_name
Poco::File(full_path + names.second).renameTo(full_path + names.first);
const auto & [path, disk] = old_part_name_to_path_and_disk[old_name];
const auto full_path = path + source_dir; /// for old_name
disk->moveFile(full_path + new_name, full_path + old_name);
}
catch (...)
{
@ -2690,14 +2685,15 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
auto part_it = data_parts_indexes.insert(part_copy).first;
modifyPartState(part_it, DataPartState::Committed);
Poco::Path marker_path(Poco::Path(original_active_part->getFullPath()), DELETE_ON_DESTROY_MARKER_PATH);
auto disk = original_active_part->disk;
String marker_path = original_active_part->getFullRelativePath() + DELETE_ON_DESTROY_MARKER_PATH;
try
{
Poco::File(marker_path).createFile();
disk->createFile(marker_path);
}
catch (Poco::Exception & e)
{
LOG_ERROR(log, e.what() << " (while creating DeleteOnDestroy marker: " + backQuote(marker_path.toString()) + ")");
LOG_ERROR(log, e.what() << " (while creating DeleteOnDestroy marker: " + backQuote(fullPath(disk, marker_path)) + ")");
}
return;
}
@ -2754,15 +2750,16 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na
static void loadPartAndFixMetadataImpl(MergeTreeData::MutableDataPartPtr part)
{
String full_part_path = part->getFullPath();
auto disk = part->disk;
String full_part_path = part->getFullRelativePath();
/// Earlier the list of columns was written incorrectly. Delete it and re-create.
/// But in compact parts we can't get list of columns without this file.
if (isWidePart(part) && Poco::File(full_part_path + "columns.txt").exists())
Poco::File(full_part_path + "columns.txt").remove();
if (isWidePart(part))
disk->removeIfExists(full_part_path + "columns.txt");
part->loadColumnsChecksumsIndexes(false, true);
part->modification_time = Poco::File(full_part_path).getLastModified().epochTime();
part->modification_time = disk->getLastModified(full_part_path).epochTime();
/// If the checksums file is not present, calculate the checksums and write them to disk.
/// Check the data while we are at it.
@ -2770,11 +2767,11 @@ static void loadPartAndFixMetadataImpl(MergeTreeData::MutableDataPartPtr part)
{
part->checksums = checkDataPart(part, false);
{
WriteBufferFromFile out(full_part_path + "checksums.txt.tmp", 4096);
part->checksums.write(out);
auto out = disk->writeFile(full_part_path + "checksums.txt.tmp", 4096);
part->checksums.write(*out);
}
Poco::File(full_part_path + "checksums.txt.tmp").renameTo(full_part_path + "checksums.txt");
disk->moveFile(full_part_path + "checksums.txt.tmp", full_part_path + "checksums.txt");
}
}
@ -3097,15 +3094,14 @@ MergeTreeData::getDetachedParts() const
{
std::vector<DetachedPartInfo> res;
for (const auto & [path, disk] : getDataPathsWithDisks())
for (const auto & [path, disk] : getRelativeDataPathsWithDisks())
{
for (Poco::DirectoryIterator it(path + "detached");
it != Poco::DirectoryIterator(); ++it)
for (auto it = disk->iterateDirectory(path + "detached"); it->isValid(); it->next())
{
res.emplace_back();
auto & part = res.back();
DetachedPartInfo::tryParseDetachedPartName(it.name(), part, format_version);
DetachedPartInfo::tryParseDetachedPartName(it->name(), part, format_version);
part.disk = disk->getName();
}
}
@ -3117,9 +3113,9 @@ void MergeTreeData::validateDetachedPartName(const String & name) const
if (name.find('/') != std::string::npos || name == "." || name == "..")
throw DB::Exception("Invalid part name '" + name + "'", ErrorCodes::INCORRECT_FILE_NAME);
String full_path = getFullPathForPart(name, "detached/");
auto full_path = getFullRelativePathForPart(name, "detached/");
if (full_path.empty() || !Poco::File(full_path + name).exists())
if (!full_path)
throw DB::Exception("Detached part \"" + name + "\" not found" , ErrorCodes::BAD_DATA_PART_NAME);
if (startsWith(name, "attaching_") || startsWith(name, "deleting_"))
@ -3154,7 +3150,8 @@ void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, const Cont
for (auto & [old_name, new_name] : renamed_parts.old_and_new_names)
{
Poco::File(renamed_parts.old_part_name_to_full_path[old_name] + "detached/" + new_name).remove(true);
const auto & [path, disk] = renamed_parts.old_part_name_to_path_and_disk[old_name];
disk->removeRecursive(path + "detached/" + new_name + "/");
LOG_DEBUG(log, "Dropped detached part " << old_name);
old_name.clear();
}
@ -3182,12 +3179,11 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
ActiveDataPartSet active_parts(format_version);
const auto disks = getStoragePolicy()->getDisks();
for (const DiskPtr & disk : disks)
for (auto & disk : disks)
{
const auto full_path = getFullPathOnDisk(disk);
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
for (auto it = disk->iterateDirectory(relative_data_path + source_dir); it->isValid(); it->next())
{
const String & name = it.name();
const String & name = it->name();
MergeTreePartInfo part_info;
// TODO what if name contains "_tryN" suffix?
/// Parts with prefix in name (e.g. attaching_1_3_3_0, deleting_1_3_3_0) will be ignored
@ -3208,10 +3204,8 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
String containing_part = active_parts.getContainingPart(name);
if (!containing_part.empty() && containing_part != name)
{
auto full_path = getFullPathOnDisk(disk);
// TODO maybe use PartsTemporaryRename here?
Poco::File(full_path + source_dir + name)
.renameTo(full_path + source_dir + "inactive_" + name);
disk->moveDirectory(relative_data_path + source_dir + name, relative_data_path + source_dir + "inactive_" + name);
}
else
renamed_parts.addPart(name, "attaching_" + name);
@ -3576,22 +3570,22 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
String tmp_dst_part_name = tmp_part_prefix + dst_part_name;
auto reservation = reserveSpace(src_part->bytes_on_disk, src_part->disk);
String dst_part_path = getFullPathOnDisk(reservation->getDisk());
Poco::Path dst_part_absolute_path = Poco::Path(dst_part_path + tmp_dst_part_name).absolute();
Poco::Path src_part_absolute_path = Poco::Path(src_part->getFullPath()).absolute();
auto disk = reservation->getDisk();
String src_part_path = src_part->getFullRelativePath();
String dst_part_path = relative_data_path + tmp_dst_part_name;
if (Poco::File(dst_part_absolute_path).exists())
throw Exception("Part in " + dst_part_absolute_path.toString() + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
if (disk->exists(dst_part_path))
throw Exception("Part in " + fullPath(disk, dst_part_path) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
LOG_DEBUG(log, "Cloning part " << src_part_absolute_path.toString() << " to " << dst_part_absolute_path.toString());
localBackup(src_part_absolute_path, dst_part_absolute_path);
LOG_DEBUG(log, "Cloning part " << fullPath(disk, src_part_path) << " to " << fullPath(disk, dst_part_path));
localBackup(disk, src_part_path, dst_part_path);
auto dst_data_part = createPart(dst_part_name, dst_part_info, reservation->getDisk(), tmp_dst_part_name);
dst_data_part->is_temp = true;
dst_data_part->loadColumnsChecksumsIndexes(require_part_metadata, true);
dst_data_part->modification_time = Poco::File(dst_part_absolute_path).getLastModified().epochTime();
dst_data_part->modification_time = disk->getLastModified(dst_part_path).epochTime();
return dst_data_part;
}
@ -3601,26 +3595,25 @@ String MergeTreeData::getFullPathOnDisk(const DiskPtr & disk) const
}
/// Finds the disk of the storage policy on which the part directory `part_name` is located.
/// The lookup lists the directory `relative_data_path + additional_path` on every disk, so
/// `additional_path` selects a subdirectory of the table data path (e.g. "detached/").
/// Returns nullptr if no disk lists an entry with that name.
DiskPtr MergeTreeData::getDiskForPart(const String & part_name, const String & additional_path) const
{
    const auto disks = getStoragePolicy()->getDisks();

    /// The directory to scan is the same for every disk, so build the path once.
    const String parts_dir = relative_data_path + additional_path;

    for (const DiskPtr & disk : disks)
    {
        for (auto it = disk->iterateDirectory(parts_dir); it->isValid(); it->next())
            if (it->name() == part_name)
                return disk;
    }

    return nullptr;
}
String MergeTreeData::getFullPathForPart(const String & part_name, const String & relative_path) const
std::optional<String> MergeTreeData::getFullRelativePathForPart(const String & part_name, const String & additional_path) const
{
auto disk = getDiskForPart(part_name, relative_path);
auto disk = getDiskForPart(part_name, additional_path);
if (disk)
return getFullPathOnDisk(disk) + relative_path;
return "";
return relative_data_path + additional_path;
return {};
}
Strings MergeTreeData::getDataPaths() const
@ -3632,15 +3625,6 @@ Strings MergeTreeData::getDataPaths() const
return res;
}
/// For every disk of the storage policy, pairs the table path on that disk
/// (as returned by getFullPathOnDisk) with the disk itself.
MergeTreeData::PathsWithDisks MergeTreeData::getDataPathsWithDisks() const
{
    auto policy_disks = getStoragePolicy()->getDisks();

    PathsWithDisks paths_with_disks;
    for (const auto & policy_disk : policy_disks)
        paths_with_disks.emplace_back(getFullPathOnDisk(policy_disk), policy_disk);

    return paths_with_disks;
}
MergeTreeData::PathsWithDisks MergeTreeData::getRelativeDataPathsWithDisks() const
{
PathsWithDisks res;
@ -3657,6 +3641,8 @@ void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String &
Poco::File(default_shadow_path).createDirectories();
auto increment = Increment(default_shadow_path + "increment.txt").get(true);
const String shadow_path = "shadow/";
/// Acquire a snapshot of active data parts to prevent removing while doing backup.
const auto data_parts = getDataParts();
@ -3666,9 +3652,8 @@ void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String &
if (!matcher(part))
continue;
String shadow_path = part->disk->getPath() + "shadow/";
part->disk->createDirectories(shadow_path);
Poco::File(shadow_path).createDirectories();
String backup_path = shadow_path
+ (!with_name.empty()
? escapeForFileName(with_name)
@ -3677,11 +3662,8 @@ void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String &
LOG_DEBUG(log, "Freezing part " << part->name << " snapshot will be placed at " + backup_path);
String part_absolute_path = Poco::Path(part->getFullPath()).absolute().toString();
String backup_part_absolute_path = backup_path
+ relative_data_path
+ part->relative_path;
localBackup(part_absolute_path, backup_part_absolute_path);
String backup_part_path = backup_path + relative_data_path + part->relative_path;
localBackup(part->disk, part->getFullRelativePath(), backup_part_path);
part->is_frozen.store(true, std::memory_order_relaxed);
++parts_processed;
}
@ -3853,11 +3835,10 @@ MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::checkPartsForMove(const
throw Exception("Move is not possible. Not enough space on '" + space->getName() + "'", ErrorCodes::NOT_ENOUGH_SPACE);
auto reserved_disk = reservation->getDisk();
String path_to_clone = getFullPathOnDisk(reserved_disk);
if (Poco::File(path_to_clone + part->name).exists())
if (reserved_disk->exists(relative_data_path + part->name))
throw Exception(
"Move is not possible: " + path_to_clone + part->name + " already exists",
"Move is not possible: " + fullPath(reserved_disk, relative_data_path + part->name) + " already exists",
ErrorCodes::DIRECTORY_ALREADY_EXISTS);
if (currently_moving_parts.count(part) || partIsAssignedToBackgroundOperation(part))

View File

@ -263,6 +263,7 @@ public:
};
using AlterDataPartTransactionPtr = std::unique_ptr<AlterDataPartTransaction>;
using PathWithDisk = std::pair<String, DiskPtr>;
struct PartsTemporaryRename : private boost::noncopyable
{
@ -285,7 +286,7 @@ public:
const MergeTreeData & storage;
const String source_dir;
std::vector<std::pair<String, String>> old_and_new_names;
std::unordered_map<String, String> old_part_name_to_full_path;
std::unordered_map<String, PathWithDisk> old_part_name_to_path_and_disk;
bool renamed = false;
};
@ -670,18 +671,17 @@ public:
/// Get table path on disk
String getFullPathOnDisk(const DiskPtr & disk) const;
/// Get disk for part. Looping through directories on FS because some parts maybe not in
/// active dataparts set (detached)
DiskPtr getDiskForPart(const String & part_name, const String & relative_path = "") const;
/// Get disk where part is located.
/// `additional_path` can be set if part is not located directly in table data path (e.g. 'detached/')
DiskPtr getDiskForPart(const String & part_name, const String & additional_path = "") const;
/// Get full path for part. Uses getDiskForPart and returns the full path
String getFullPathForPart(const String & part_name, const String & relative_path = "") const;
/// Get full path for part. Uses getDiskForPart and returns the full relative path.
/// `additional_path` can be set if part is not located directly in table data path (e.g. 'detached/')
std::optional<String> getFullRelativePathForPart(const String & part_name, const String & additional_path = "") const;
Strings getDataPaths() const override;
using PathWithDisk = std::pair<String, DiskPtr>;
using PathsWithDisks = std::vector<PathWithDisk>;
PathsWithDisks getDataPathsWithDisks() const;
PathsWithDisks getRelativeDataPathsWithDisks() const;
/// Reserves space at least 1MB.

View File

@ -26,9 +26,6 @@
#include <Common/SimpleIncrement.h>
#include <Common/interpolate.h>
#include <Common/typeid_cast.h>
#include <Common/createHardLink.h>
#include <Poco/File.h>
#include <Poco/DirectoryIterator.h>
#include <cmath>
#include <numeric>
#include <iomanip>
@ -576,10 +573,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
<< parts.front()->name << " to " << parts.back()->name
<< " into " << TMP_PREFIX + future_part.name + " with type " + future_part.type.toString());
String part_path = data.getFullPathOnDisk(space_reservation->getDisk());
auto disk = space_reservation->getDisk();
String part_path = data.relative_data_path;
String new_part_tmp_path = part_path + TMP_PREFIX + future_part.name + "/";
if (Poco::File(new_part_tmp_path).exists())
throw Exception("Directory " + new_part_tmp_path + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
if (disk->exists(new_part_tmp_path))
throw Exception("Directory " + fullPath(disk, new_part_tmp_path) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
MergeTreeData::DataPart::ColumnToSize merged_column_to_size;
@ -598,7 +596,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
future_part.name,
future_part.type,
future_part.part_info,
space_reservation->getDisk(),
disk,
TMP_PREFIX + future_part.name);
new_data_part->setColumns(all_columns);
@ -633,16 +631,17 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
merge_entry->total_size_bytes_compressed,
static_cast<double> (merge_entry->total_size_bytes_compressed) / data.getTotalActiveSizeInBytes());
/// TODO: Should it go through IDisk interface?
String rows_sources_file_path;
std::unique_ptr<WriteBuffer> rows_sources_uncompressed_write_buf;
std::unique_ptr<WriteBufferFromFileBase> rows_sources_uncompressed_write_buf;
std::unique_ptr<WriteBuffer> rows_sources_write_buf;
std::optional<ColumnSizeEstimator> column_sizes;
if (merge_alg == MergeAlgorithm::Vertical)
{
Poco::File(new_part_tmp_path).createDirectories();
disk->createDirectories(new_part_tmp_path);
rows_sources_file_path = new_part_tmp_path + "rows_sources";
rows_sources_uncompressed_write_buf = std::make_unique<WriteBufferFromFile>(rows_sources_file_path);
rows_sources_uncompressed_write_buf = disk->writeFile(rows_sources_file_path);
rows_sources_write_buf = std::make_unique<CompressedWriteBuffer>(*rows_sources_uncompressed_write_buf);
for (const MergeTreeData::DataPartPtr & part : parts)
@ -832,6 +831,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
rows_sources_write_buf->next();
rows_sources_uncompressed_write_buf->next();
/// Ensure the data has been written to disk.
rows_sources_uncompressed_write_buf->finalize();
size_t rows_sources_count = rows_sources_write_buf->count();
/// In special case, when there is only one source part, and no rows were skipped, we may have
@ -842,7 +843,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
+ ") differs from number of bytes written to rows_sources file (" + toString(rows_sources_count)
+ "). It is a bug.", ErrorCodes::LOGICAL_ERROR);
CompressedReadBufferFromFile rows_sources_read_buf(rows_sources_file_path, 0, 0, 0);
CompressedReadBufferFromFile rows_sources_read_buf(disk->readFile(rows_sources_file_path));
IMergedBlockOutputStream::WrittenOffsetColumns written_offset_columns;
for (size_t column_num = 0, gathering_column_names_size = gathering_column_names.size();
@ -909,7 +910,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
merge_entry->progress.store(progress_before + column_sizes->columnWeight(column_name), std::memory_order_relaxed);
}
Poco::File(rows_sources_file_path).remove();
disk->remove(rows_sources_file_path);
}
for (const auto & part : parts)
@ -1018,7 +1019,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
new_data_part->index_granularity_info = source_part->index_granularity_info;
new_data_part->setColumns(getColumnsForNewDataPart(source_part, updated_header, all_columns));
String new_part_tmp_path = new_data_part->getFullPath();
auto disk = new_data_part->disk;
String new_part_tmp_path = new_data_part->getFullRelativePath();
/// Note: this is done before creating input streams, because otherwise data.data_parts_mutex
/// (which is locked in data.getTotalActiveSizeInBytes()) is locked after part->columns_lock
@ -1029,7 +1031,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
source_part->bytes_on_disk, static_cast<double>(source_part->bytes_on_disk) / data.getTotalActiveSizeInBytes());
Poco::File(new_part_tmp_path).createDirectories();
disk->createDirectories(new_part_tmp_path);
/// Don't change granularity type while mutating subset of columns
auto mrk_extension = source_part->index_granularity_info.is_adaptive ? getAdaptiveMrkExtension(new_data_part->getType())
@ -1125,17 +1127,15 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
if (need_remove_expired_values)
files_to_skip.insert("ttl.txt");
Poco::DirectoryIterator dir_end;
/// Create hardlinks for unchanged files
for (Poco::DirectoryIterator dir_it(source_part->getFullPath()); dir_it != dir_end; ++dir_it)
for (auto it = disk->iterateDirectory(source_part->getFullRelativePath()); it->isValid(); it->next())
{
if (files_to_skip.count(dir_it.name()) || files_to_remove.count(dir_it.name()))
if (files_to_skip.count(it->name()) || files_to_remove.count(it->name()))
continue;
Poco::Path destination(new_part_tmp_path);
destination.append(dir_it.name());
String destination = new_part_tmp_path + "/" + it->name();
createHardLink(dir_it.path().toString(), destination.toString());
disk->createHardLink(it->path(), destination);
}
merge_entry->columns_written = all_columns.size() - updated_header.columns();
@ -1181,8 +1181,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
if (need_remove_expired_values)
{
/// Write a file with ttl infos in json format.
WriteBufferFromFile out_ttl(new_part_tmp_path + "ttl.txt", 4096);
HashingWriteBuffer out_hashing(out_ttl);
auto out_ttl = disk->writeFile(new_part_tmp_path + "ttl.txt", 4096);
HashingWriteBuffer out_hashing(*out_ttl);
new_data_part->ttl_infos.write(out_hashing);
new_data_part->checksums.files["ttl.txt"].file_size = out_hashing.count();
new_data_part->checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
@ -1193,15 +1193,15 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
new_data_part->checksums.files.erase(removed_file);
{
/// Write file with checksums.
WriteBufferFromFile out_checksums(new_part_tmp_path + "checksums.txt", 4096);
new_data_part->checksums.write(out_checksums);
auto out_checksums = disk->writeFile(new_part_tmp_path + "checksums.txt", 4096);
new_data_part->checksums.write(*out_checksums);
} /// close fd
{
/// Write a file with a description of columns.
WriteBufferFromFile out_columns(new_part_tmp_path + "columns.txt", 4096);
new_data_part->getColumns().writeText(out_columns);
auto out_columns = disk->writeFile(new_part_tmp_path + "columns.txt", 4096);
new_data_part->getColumns().writeText(*out_columns);
} /// close
new_data_part->rows_count = source_part->rows_count;

View File

@ -42,7 +42,6 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
M(SettingUInt64, number_of_free_entries_in_pool_to_execute_mutation, 10, "When there is less than specified number of free entries in pool, do not execute part mutations. This is to leave free threads for regular merges and avoid \"Too many parts\"", 0) \
M(SettingSeconds, old_parts_lifetime, 8 * 60, "How many seconds to keep obsolete parts.", 0) \
M(SettingSeconds, temporary_directories_lifetime, 86400, "How many seconds to keep tmp_-directories.", 0) \
M(SettingBool, disable_background_merges, false, "Disable background merges.", 0) \
\
/** Inserts settings. */ \
M(SettingUInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table.", 0) \

View File

@ -99,7 +99,6 @@ IMergeTreeDataPart::Checksums checkDataPart(
throw Exception("Unknown type in part " + path, ErrorCodes::UNKNOWN_PART_TYPE);
}
Poco::DirectoryIterator dir_end;
for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
{
const String & file_name = it->name();

View File

@ -1,13 +1,8 @@
#include "localBackup.h"
#include <Common/createHardLink.h>
#include <Common/Exception.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/Path.h>
#include <Poco/File.h>
#include <string>
#include <iostream>
#include <errno.h>
#include <cerrno>
namespace DB
@ -20,7 +15,7 @@ namespace ErrorCodes
}
static void localBackupImpl(const Poco::Path & source_path, const Poco::Path & destination_path, size_t level,
static void localBackupImpl(const DiskPtr & disk, const String & source_path, const String & destination_path, size_t level,
std::optional<size_t> max_level)
{
if (max_level && level > *max_level)
@ -29,34 +24,30 @@ static void localBackupImpl(const Poco::Path & source_path, const Poco::Path & d
if (level >= 1000)
throw DB::Exception("Too deep recursion", DB::ErrorCodes::TOO_DEEP_RECURSION);
Poco::File(destination_path).createDirectories();
disk->createDirectories(destination_path);
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator dir_it(source_path); dir_it != dir_end; ++dir_it)
for (auto it = disk->iterateDirectory(source_path); it->isValid(); it->next())
{
Poco::Path source = dir_it.path();
Poco::Path destination = destination_path;
destination.append(dir_it.name());
auto source = it->path();
auto destination = destination_path + "/" + it->name();
if (!dir_it->isDirectory())
if (!disk->isDirectory(source))
{
dir_it->setReadOnly();
createHardLink(source.toString(), destination.toString());
disk->setReadOnly(source);
disk->createHardLink(source, destination);
}
else
{
localBackupImpl(source, destination, level + 1, max_level);
localBackupImpl(disk, source, destination, level + 1, max_level);
}
}
}
void localBackup(const Poco::Path & source_path, const Poco::Path & destination_path, std::optional<size_t> max_level)
void localBackup(const DiskPtr & disk, const String & source_path, const String & destination_path, std::optional<size_t> max_level)
{
if (Poco::File(destination_path).exists()
&& Poco::DirectoryIterator(destination_path) != Poco::DirectoryIterator())
if (disk->exists(destination_path) && !disk->isDirectoryEmpty(destination_path))
{
throw DB::Exception("Directory " + destination_path.toString() + " already exists and is not empty.", DB::ErrorCodes::DIRECTORY_ALREADY_EXISTS);
throw DB::Exception("Directory " + fullPath(disk, destination_path) + " already exists and is not empty.", DB::ErrorCodes::DIRECTORY_ALREADY_EXISTS);
}
size_t try_no = 0;
@ -70,7 +61,7 @@ void localBackup(const Poco::Path & source_path, const Poco::Path & destination_
{
try
{
localBackupImpl(source_path, destination_path, 0, max_level);
localBackupImpl(disk, source_path, destination_path, 0, max_level);
}
catch (const DB::ErrnoException & e)
{

View File

@ -1,8 +1,8 @@
#pragma once
#include <optional>
namespace Poco { class Path; }
#include <Core/Types.h>
#include <Disks/IDisk.h>
namespace DB
{
@ -20,6 +20,6 @@ namespace DB
* If max_level is specified, then only files whose depth relative to source_path is less than or equal to max_level will be copied.
* So, if max_level=0 then only direct child files are copied.
*/
void localBackup(const Poco::Path & source_path, const Poco::Path & destination_path, std::optional<size_t> max_level = {});
void localBackup(const DiskPtr & disk, const String & source_path, const String & destination_path, std::optional<size_t> max_level = {});
}

View File

@ -21,8 +21,6 @@
#include <Disks/DiskSpaceMonitor.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/File.h>
#include <optional>
#include <Interpreters/MutationsInterpreter.h>
#include <Processors/Pipe.h>
@ -95,8 +93,7 @@ void StorageMergeTree::startup()
/// NOTE background task will also do the above cleanups periodically.
time_after_previous_cleanup.restart();
if (!getSettings()->disable_background_merges)
merging_mutating_task_handle = global_context.getBackgroundPool().addTask([this] { return mergeMutateTask(); });
merging_mutating_task_handle = global_context.getBackgroundPool().addTask([this] { return mergeMutateTask(); });
if (areBackgroundMovesNeeded())
moving_task_handle = global_context.getBackgroundMovePool().addTask([this] { return movePartsTask(); });
}

View File

@ -0,0 +1,5 @@
<yandex>
<!-- Sets the three background processing pool sleep intervals below to 0.5 seconds each.
     NOTE(review): presumably lowered so that background merges and cleanup run promptly in the integration tests; confirm. -->
<background_processing_pool_thread_sleep_seconds>0.5</background_processing_pool_thread_sleep_seconds>
<background_processing_pool_task_sleep_seconds_when_no_work_min>0.5</background_processing_pool_task_sleep_seconds_when_no_work_min>
<background_processing_pool_task_sleep_seconds_when_no_work_max>0.5</background_processing_pool_task_sleep_seconds_when_no_work_max>
</yandex>

View File

@ -0,0 +1,28 @@
<yandex>
<storage_configuration>
<!-- Two disks are declared: `s3` (type s3, endpoint on host minio1) and `hdd` (type local). -->
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
<hdd>
<type>local</type>
<!-- NOTE(review): the local disk path is the filesystem root "/"; confirm this is intended for the test container. -->
<path>/</path>
</hdd>
</disks>
<!-- The `default` policy has two volumes: `main` on disk s3 and `external` on disk hdd. -->
<policies>
<default>
<volumes>
<main>
<disk>s3</disk>
</main>
<external>
<disk>hdd</disk>
</external>
</volumes>
</default>
</policies>
</storage_configuration>
</yandex>

View File

@ -0,0 +1,6 @@
<?xml version="1.0"?>
<yandex>
<!-- Declares a single, empty `default` profile and nothing else. -->
<profiles>
<default/>
</profiles>
</yandex>

View File

@ -1,25 +1,5 @@
<?xml version="1.0"?>
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
</logger>
<storage_configuration>
<disks>
<default>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</default>
</disks>
</storage_configuration>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>

View File

@ -1,23 +0,0 @@
<?xml version="1.0"?>
<yandex>
<!-- Declares an empty `default` profile, a `default` user and an empty `default` quota. -->
<profiles>
<default>
</default>
</profiles>
<users>
<!-- The `default` user has an empty password, lists ::/0 under networks, and uses the `default` profile and quota. -->
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<quotas>
<default>
</default>
</quotas>
</yandex>

View File

@ -1,6 +1,7 @@
import logging
import random
import string
import time
import pytest
from helpers.cluster import ClickHouseCluster
@ -36,56 +37,212 @@ def cluster():
cluster.shutdown()
FILES_OVERHEAD = 1
FILES_OVERHEAD_PER_COLUMN = 2 # Data and mark files
FILES_OVERHEAD_PER_PART_WIDE = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6
FILES_OVERHEAD_PER_PART_COMPACT = 10
def random_string(length):
    """Return a string of `length` characters picked at random from the ASCII letters."""
    return ''.join(random.choice(string.ascii_letters) for _ in range(length))


def generate_values(date_str, count, sign=1):
    """Build the body of an INSERT ... VALUES clause with `count` rows of (date, id, data).

    Row ids are sign * 1 .. sign * count, so sign=-1 produces the negated ids of the
    default call. Rows are ordered by ascending id and `data` is a random 10-letter
    string. Returns the rows as comma-separated "('date',id,'data')" tuples; an empty
    string when count is 0.
    """
    rows = [[date_str, sign * (i + 1), random_string(10)] for i in range(count)]
    # Keep ascending id order (matters for sign=-1) so the text matches "ORDER BY dt, id" output.
    rows.sort(key=lambda row: row[1])
    return ",".join("('{}',{},'{}')".format(dt, row_id, payload) for dt, row_id, payload in rows)
@pytest.mark.parametrize(
"min_rows_for_wide_part,files_overhead,files_per_part",
[
(0, 1, 14),
(8192, 1, 10)
]
)
def test_log_family_s3(cluster, min_rows_for_wide_part, files_overhead, files_per_part):
def create_table(cluster, additional_settings=None):
node = cluster.instances["node"]
minio = cluster.minio_client
node.query(
"""
create_table_statement = """
CREATE TABLE s3_test(
dt Date,
id UInt64,
id Int64,
data String,
INDEX min_max (id) TYPE minmax GRANULARITY 3
) ENGINE=MergeTree()
PARTITION BY dt
ORDER BY (dt, id)
SETTINGS disable_background_merges='true', index_granularity=512, min_rows_for_wide_part={}
SETTINGS
old_parts_lifetime=0, index_granularity=512
"""
.format(min_rows_for_wide_part)
)
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 1
values1 = generate_values('2020-01-03', 4096)
node.query("INSERT INTO s3_test VALUES {}".format(values1))
assert node.query("SELECT * FROM s3_test order by dt, id FORMAT Values") == values1
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == files_overhead + files_per_part
if additional_settings:
create_table_statement += ","
create_table_statement += additional_settings
values2 = generate_values('2020-01-04', 4096)
node.query("INSERT INTO s3_test VALUES {}".format(values2))
assert node.query("SELECT * FROM s3_test ORDER BY dt, id FORMAT Values") == values1 + "," + values2
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == files_overhead + 2 * files_per_part
node.query(create_table_statement)
assert node.query("SELECT count(*) FROM s3_test where id = 0 FORMAT Values") == "(2)"
@pytest.fixture(autouse=True)
def drop_table(cluster):
    """Autouse teardown: after each test, drop `s3_test` and check that no objects are left under 'data/'."""
    yield
    node = cluster.instances["node"]
    minio = cluster.minio_client

    # IF EXISTS keeps the teardown from raising a second, misleading error
    # when the test failed before the table was created.
    node.query("DROP TABLE IF EXISTS s3_test")
    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 0
@pytest.mark.parametrize(
    "min_rows_for_wide_part,files_per_part",
    [
        (0, FILES_OVERHEAD_PER_PART_WIDE),
        (8192, FILES_OVERHEAD_PER_PART_COMPACT)
    ]
)
def test_simple_insert_select(cluster, min_rows_for_wide_part, files_per_part):
    """Insert two partitions and check both the selected rows and the number of stored objects.

    Parametrized over `min_rows_for_wide_part` together with the number of objects
    each inserted part is expected to add under 'data/'.
    """
    create_table(cluster, "min_rows_for_wide_part={}".format(min_rows_for_wide_part))

    node = cluster.instances["node"]
    minio = cluster.minio_client

    def stored_objects():
        return len(list(minio.list_objects(cluster.minio_bucket, 'data/')))

    first_batch = generate_values('2020-01-03', 4096)
    node.query("INSERT INTO s3_test VALUES {}".format(first_batch))
    assert node.query("SELECT * FROM s3_test order by dt, id FORMAT Values") == first_batch
    assert stored_objects() == FILES_OVERHEAD + files_per_part

    second_batch = generate_values('2020-01-04', 4096)
    node.query("INSERT INTO s3_test VALUES {}".format(second_batch))
    assert node.query("SELECT * FROM s3_test ORDER BY dt, id FORMAT Values") == first_batch + "," + second_batch
    assert stored_objects() == FILES_OVERHEAD + files_per_part*2

    # Each batch contains exactly one row with id = 1.
    assert node.query("SELECT count(*) FROM s3_test where id = 1 FORMAT Values") == "(2)"
@pytest.mark.parametrize(
    "merge_vertical", [False, True]
)
def test_insert_same_partition_and_merge(cluster, merge_vertical):
    """Insert six parts into one partition with merges stopped, then let them merge.

    Checks the data and the stored object count before the merge (six parts) and
    after it (a single part). With `merge_vertical` the table is created with both
    vertical merge activation thresholds set to 0.
    """
    settings = None
    if merge_vertical:
        settings = """
            vertical_merge_algorithm_min_rows_to_activate=0,
            vertical_merge_algorithm_min_columns_to_activate=0
        """
    create_table(cluster, settings)

    node = cluster.instances["node"]
    minio = cluster.minio_client

    def stored_objects():
        return len(list(minio.list_objects(cluster.minio_bucket, 'data/')))

    node.query("SYSTEM STOP MERGES s3_test")
    # Positive ids first, then the mirrored negative ids, so that sum(id) is 0.
    for sign in (1, -1):
        for rows in (1024, 2048, 4096):
            node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-03', rows, sign)))
    assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
    assert node.query("SELECT count(distinct(id)) FROM s3_test FORMAT Values") == "(8192)"
    assert stored_objects() == FILES_OVERHEAD_PER_PART_WIDE*6 + FILES_OVERHEAD

    node.query("SYSTEM START MERGES s3_test")

    # Wait for merges and old parts deletion. Poll (up to ~30 s) instead of sleeping a
    # fixed time: a fixed sleep is either too short on a slow machine or wasted on a fast one.
    expected_objects = FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD
    for _ in range(60):
        if stored_objects() == expected_objects:
            break
        time.sleep(0.5)

    assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
    assert node.query("SELECT count(distinct(id)) FROM s3_test FORMAT Values") == "(8192)"
    assert stored_objects() == expected_objects
def test_alter_table_columns(cluster):
    """Add, modify and drop a column and check the data and the stored object count after each step."""
    create_table(cluster)

    node = cluster.instances["node"]
    minio = cluster.minio_client

    def wait_for_objects(expected, attempts=60, delay=0.5):
        """Poll the number of objects under 'data/' until it equals `expected` or the
        attempts run out; return the last observed count so the caller can assert on it."""
        observed = len(list(minio.list_objects(cluster.minio_bucket, 'data/')))
        for _ in range(attempts):
            if observed == expected:
                break
            time.sleep(delay)
            observed = len(list(minio.list_objects(cluster.minio_bucket, 'data/')))
        return observed

    one_part = FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE

    node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-03', 4096)))
    node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-03', 4096, -1)))

    node.query("ALTER TABLE s3_test ADD COLUMN col1 UInt64 DEFAULT 1")
    # To ensure parts have merged
    node.query("OPTIMIZE TABLE s3_test")

    # Wait for merges, mutations and old parts deletion. Polling replaces a fixed sleep,
    # which is either too short on a slow machine or wasted on a fast one.
    assert wait_for_objects(one_part + FILES_OVERHEAD_PER_COLUMN) == one_part + FILES_OVERHEAD_PER_COLUMN

    assert node.query("SELECT sum(col1) FROM s3_test FORMAT Values") == "(8192)"
    assert node.query("SELECT sum(col1) FROM s3_test WHERE id > 0 FORMAT Values") == "(4096)"

    node.query("ALTER TABLE s3_test MODIFY COLUMN col1 String")

    assert node.query("SELECT distinct(col1) FROM s3_test FORMAT Values") == "('1')"
    assert wait_for_objects(one_part + FILES_OVERHEAD_PER_COLUMN) == one_part + FILES_OVERHEAD_PER_COLUMN

    node.query("ALTER TABLE s3_test DROP COLUMN col1")

    assert wait_for_objects(one_part) == one_part
def test_attach_detach_partition(cluster):
    """Detach, attach, drop and drop-detached partitions of s3_test.

    After every step both the visible row count and the number of objects listed
    under the 'data/' prefix of the MinIO bucket are checked.
    """
    create_table(cluster)

    instance = cluster.instances["node"]
    s3_client = cluster.minio_client

    def row_count():
        # Raw query output, e.g. "(8192)".
        return instance.query("SELECT count(*) FROM s3_test FORMAT Values")

    def stored_objects():
        # Number of objects currently listed under 'data/' in the test bucket.
        return len(list(s3_client.list_objects(cluster.minio_bucket, 'data/')))

    # Two inserts with different dates.
    instance.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-03', 4096)))
    instance.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-04', 4096)))
    assert row_count() == "(8192)"
    assert stored_objects() == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE*2

    # Detaching hides the rows; the expected object count stays the same.
    instance.query("ALTER TABLE s3_test DETACH PARTITION '2020-01-03'")
    assert row_count() == "(4096)"
    assert stored_objects() == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE*2

    # Attaching makes the rows visible again.
    instance.query("ALTER TABLE s3_test ATTACH PARTITION '2020-01-03'")
    assert row_count() == "(8192)"
    assert stored_objects() == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE*2

    # Dropping an attached partition removes one part's worth of objects.
    instance.query("ALTER TABLE s3_test DROP PARTITION '2020-01-03'")
    assert row_count() == "(4096)"
    assert stored_objects() == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE

    # Detach the remaining partition, then drop it from the detached state.
    instance.query("ALTER TABLE s3_test DETACH PARTITION '2020-01-04'")
    instance.query("SET allow_drop_detached=1; ALTER TABLE s3_test DROP DETACHED PARTITION '2020-01-04'")
    assert row_count() == "(0)"
    assert stored_objects() == FILES_OVERHEAD
def test_move_partition(cluster):
    """Move one partition of s3_test to disk 'hdd' and check rows and S3 object counts.

    The row count must be unchanged by the move, while the number of objects listed
    under the 'data/' prefix of the MinIO bucket drops by one part's worth.
    """
    create_table(cluster)

    instance = cluster.instances["node"]
    s3_client = cluster.minio_client

    def row_count():
        # Raw query output, e.g. "(8192)".
        return instance.query("SELECT count(*) FROM s3_test FORMAT Values")

    def stored_objects():
        # Number of objects currently listed under 'data/' in the test bucket.
        return len(list(s3_client.list_objects(cluster.minio_bucket, 'data/')))

    # Two inserts with different dates.
    instance.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-03', 4096)))
    instance.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-04', 4096)))
    assert row_count() == "(8192)"
    assert stored_objects() == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE*2

    instance.query("ALTER TABLE s3_test MOVE PARTITION '2020-01-04' TO DISK 'hdd'")
    assert row_count() == "(8192)"
    assert stored_objects() == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE
def test_table_manipulations(cluster):
    """Rename, detach/attach and truncate s3_test, checking rows and S3 object counts.

    The number of objects listed under the 'data/' prefix of the MinIO bucket is
    compared with the expected combination of the FILES_OVERHEAD* constants after
    the rename, after the re-attach and after the truncate.
    """
    create_table(cluster)

    instance = cluster.instances["node"]
    s3_client = cluster.minio_client

    def stored_objects():
        # Number of objects currently listed under 'data/' in the test bucket.
        return len(list(s3_client.list_objects(cluster.minio_bucket, 'data/')))

    # Two inserts with different dates.
    instance.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-03', 4096)))
    instance.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-04', 4096)))

    # Rename the table, verify it, then restore the original name.
    instance.query("RENAME TABLE s3_test TO s3_renamed")
    renamed_rows = instance.query("SELECT count(*) FROM s3_renamed FORMAT Values")
    assert renamed_rows == "(8192)"
    assert stored_objects() == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE*2
    instance.query("RENAME TABLE s3_renamed TO s3_test")

    # TODO: Doesn't work with min_max index.
    #assert node.query("SET check_query_single_value_result='false'; CHECK TABLE s3_test FORMAT Values") == "(1)"

    # Detach and re-attach the whole table; data must still be there.
    instance.query("DETACH TABLE s3_test")
    instance.query("ATTACH TABLE s3_test")
    reattached_rows = instance.query("SELECT count(*) FROM s3_test FORMAT Values")
    assert reattached_rows == "(8192)"
    assert stored_objects() == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE*2

    # Truncating leaves only the base overhead objects.
    instance.query("TRUNCATE TABLE s3_test")
    truncated_rows = instance.query("SELECT count(*) FROM s3_test FORMAT Values")
    assert truncated_rows == "(0)"
    assert stored_objects() == FILES_OVERHEAD

View File

@ -1,13 +1,13 @@
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <DataTypes/DataTypeDate.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/HashingWriteBuffer.h>
#include <Common/Exception.h>
#include <Common/localBackup.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/localBackup.h>
#include <Common/Exception.h>
#include <boost/program_options.hpp>
#include <Poco/Path.h>
@ -73,7 +73,7 @@ void run(String part_path, String date_column, String dest_path)
{
/// If the file is already deleted, do nothing.
}
localBackup(old_part_path, new_tmp_part_path, {});
localBackup(disk, old_part_path.toString(), new_tmp_part_path.toString(), {});
WriteBufferFromFile count_out(new_tmp_part_path_str + "count.txt", 4096);
HashingWriteBuffer count_out_hashing(count_out);