Merge pull request #9415 from Jokser/merge-tree-s3-initial

Storage MergeTree initial support for S3.
Nikita Mikhaylov 2020-03-10 13:22:44 +03:00 committed by GitHub
commit b15532dbdf
60 changed files with 771 additions and 470 deletions
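The central change: MergeTree parts stop touching the filesystem directly (Poco::File, ReadBufferFromFile) and go through the IDisk abstraction instead, so a part stored on S3 is read the same way as a local one. A minimal sketch of the pattern that recurs throughout the diff below (illustrative fragment, not code from the commit; `disk` is whatever the storage policy returns):

    DiskPtr disk = storage.getStoragePolicy()->getAnyDisk();
    String path = part->getFullRelativePath() + "checksums.txt";  /// paths are now relative to the disk root
    if (disk->exists(path))
    {
        auto buf = disk->readFile(path);  /// ReadBufferFromFileBase; local file or S3 object alike
        checksums.read(*buf);
    }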

View File

@ -1,10 +1,10 @@
#include "CachedCompressedReadBuffer.h"
#include <IO/createReadBufferFromFileBase.h>
#include <IO/WriteHelpers.h>
#include <Compression/CompressionInfo.h>
#include <Compression/LZ4_decompress_faster.h>
#include <utility>
namespace DB
{
@ -19,7 +19,7 @@ void CachedCompressedReadBuffer::initInput()
{
if (!file_in)
{
file_in = createReadBufferFromFileBase(path, estimated_size, aio_threshold, mmap_threshold, buf_size);
file_in = file_in_creator();
compressed_in = file_in.get();
if (profile_callback)
@ -71,17 +71,12 @@ bool CachedCompressedReadBuffer::nextImpl()
return true;
}
CachedCompressedReadBuffer::CachedCompressedReadBuffer(
const std::string & path_, UncompressedCache * cache_,
size_t estimated_size_, size_t aio_threshold_, size_t mmap_threshold_,
size_t buf_size_)
: ReadBuffer(nullptr, 0), path(path_), cache(cache_), buf_size(buf_size_), estimated_size(estimated_size_),
aio_threshold(aio_threshold_), mmap_threshold(mmap_threshold_), file_pos(0)
const std::string & path_, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator_, UncompressedCache * cache_)
: ReadBuffer(nullptr, 0), file_in_creator(std::move(file_in_creator_)), cache(cache_), path(path_), file_pos(0)
{
}
void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
{
if (owned_cell &&

View File

@ -20,14 +20,11 @@ namespace DB
class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadBuffer
{
private:
const std::string path;
std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator;
UncompressedCache * cache;
size_t buf_size;
size_t estimated_size;
size_t aio_threshold;
size_t mmap_threshold;
std::unique_ptr<ReadBufferFromFileBase> file_in;
const std::string path;
size_t file_pos;
/// A piece of data from the cache, or a piece of read data that we put into the cache.
@ -41,11 +38,7 @@ private:
clockid_t clock_type {};
public:
CachedCompressedReadBuffer(
const std::string & path_, UncompressedCache * cache_,
size_t estimated_size_, size_t aio_threshold_, size_t mmap_threshold_,
size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
CachedCompressedReadBuffer(const std::string & path, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator, UncompressedCache * cache_);
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);

View File

@ -5,6 +5,7 @@
#include <Compression/CompressionFactory.h>
#include <Compression/CachedCompressedReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/createReadBufferFromFileBase.h>
#include <IO/copyData.h>
#include <Common/Stopwatch.h>
@ -32,7 +33,14 @@ int main(int argc, char ** argv)
{
Stopwatch watch;
CachedCompressedReadBuffer in(path, &cache, 0, 0, 0);
CachedCompressedReadBuffer in(
path,
[&]()
{
return createReadBufferFromFileBase(path, 0, 0, 0);
},
&cache
);
WriteBufferFromFile out("/dev/null");
copyData(in, out);
@ -44,7 +52,14 @@ int main(int argc, char ** argv)
{
Stopwatch watch;
CachedCompressedReadBuffer in(path, &cache, 0, 0, 0);
CachedCompressedReadBuffer in(
path,
[&]()
{
return createReadBufferFromFileBase(path, 0, 0, 0);
},
&cache
);
WriteBufferFromFile out("/dev/null");
copyData(in, out);

View File

@ -67,6 +67,8 @@ public:
return dir_path + iter.name();
}
String name() const override { return iter.name(); }
private:
String dir_path;
Poco::DirectoryIterator iter;
@ -237,6 +239,21 @@ void DiskLocal::removeRecursive(const String & path)
Poco::File(disk_path + path).remove(true);
}
void DiskLocal::listFiles(const String & path, std::vector<String> & file_names)
{
Poco::File(disk_path + path).list(file_names);
}
void DiskLocal::setLastModified(const String & path, const Poco::Timestamp & timestamp)
{
Poco::File(disk_path + path).setLastModified(timestamp);
}
Poco::Timestamp DiskLocal::getLastModified(const String & path)
{
return Poco::File(disk_path + path).getLastModified();
}
void DiskLocalReservation::update(UInt64 new_size)
{

View File

@ -67,6 +67,8 @@ public:
void copyFile(const String & from_path, const String & to_path) override;
void listFiles(const String & path, std::vector<String> & file_names) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
@ -85,6 +87,10 @@ public:
void removeRecursive(const String & path) override;
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
Poco::Timestamp getLastModified(const String & path) override;
private:
bool tryReserve(UInt64 bytes);

View File

@ -6,6 +6,7 @@
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteBufferFromString.h>
#include <Interpreters/Context.h>
#include <Poco/Path.h>
namespace DB
@ -23,7 +24,7 @@ namespace ErrorCodes
class DiskMemoryDirectoryIterator : public IDiskDirectoryIterator
{
public:
explicit DiskMemoryDirectoryIterator(std::vector<String> && dir_file_paths_)
explicit DiskMemoryDirectoryIterator(std::vector<Poco::Path> && dir_file_paths_)
: dir_file_paths(std::move(dir_file_paths_)), iter(dir_file_paths.begin())
{
}
@ -32,11 +33,13 @@ public:
bool isValid() const override { return iter != dir_file_paths.end(); }
String path() const override { return *iter; }
String path() const override { return (*iter).toString(); }
String name() const override { return (*iter).getFileName(); }
private:
std::vector<String> dir_file_paths;
std::vector<String>::iterator iter;
std::vector<Poco::Path> dir_file_paths;
std::vector<Poco::Path>::iterator iter;
};
/// Adapter that behaves like ReadBufferFromString.
@ -264,10 +267,10 @@ DiskDirectoryIteratorPtr DiskMemory::iterateDirectory(const String & path)
if (!path.empty() && files.find(path) == files.end())
throw Exception("Directory '" + path + "' does not exist", ErrorCodes::DIRECTORY_DOESNT_EXIST);
std::vector<String> dir_file_paths;
std::vector<Poco::Path> dir_file_paths;
for (const auto & file : files)
if (parentPath(file.first) == path)
dir_file_paths.push_back(file.first);
dir_file_paths.emplace_back(file.first);
return std::make_unique<DiskMemoryDirectoryIterator>(std::move(dir_file_paths));
}
@ -381,6 +384,12 @@ void DiskMemory::removeRecursive(const String & path)
}
}
void DiskMemory::listFiles(const String & path, std::vector<String> & file_names)
{
for (auto it = iterateDirectory(path); it->isValid(); it->next())
file_names.push_back(it->name());
}
using DiskMemoryPtr = std::shared_ptr<DiskMemory>;

View File

@ -60,6 +60,8 @@ public:
void copyFile(const String & from_path, const String & to_path) override;
void listFiles(const String & path, std::vector<String> & file_names) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
@ -78,6 +80,10 @@ public:
void removeRecursive(const String & path) override;
void setLastModified(const String &, const Poco::Timestamp &) override { }
Poco::Timestamp getLastModified(const String &) override { return Poco::Timestamp(); }
private:
void createDirectoriesImpl(const String & path);
void replaceFileImpl(const String & from_path, const String & to_path);

View File

@ -28,7 +28,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int FILE_ALREADY_EXISTS;
extern const int PATH_ACCESS_DENIED;
extern const int SEEK_POSITION_OUT_OF_BOUND;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int UNKNOWN_FORMAT;
}
@ -157,19 +156,36 @@ namespace
off_t seek(off_t offset_, int whence) override
{
if (whence != SEEK_SET)
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (offset_ < 0 || metadata.total_size <= static_cast<UInt64>(offset_))
throw Exception(
"Seek position is out of bounds. "
"Offset: "
+ std::to_string(offset_) + ", Max: " + std::to_string(metadata.total_size),
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
if (whence == SEEK_CUR)
{
/// If position within current working buffer - shift pos.
if (working_buffer.size() && size_t(getPosition() + offset_) < absolute_position)
{
pos += offset_;
return getPosition();
}
else
{
absolute_position += offset_;
}
}
else if (whence == SEEK_SET)
{
/// If position within current working buffer - shift pos.
if (working_buffer.size() && size_t(offset_) >= absolute_position - working_buffer.size()
&& size_t(offset_) < absolute_position)
{
pos = working_buffer.end() - (absolute_position - offset_);
return getPosition();
}
else
{
absolute_position = offset_;
}
}
else
throw Exception("Only SEEK_SET or SEEK_CUR modes are allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
/// TODO: Do not re-initialize buffer if current position within working buffer.
current_buf = initialize();
pos = working_buffer.end();
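The two fast paths above avoid re-creating the S3 read buffer when the target offset already lies inside the working buffer. The invariant: absolute_position is the global offset one past the last byte of working_buffer, so a global offset maps into the buffer as end - (absolute_position - offset). A self-contained sketch of that arithmetic (illustrative, not from the commit):

    #include <cassert>
    #include <cstddef>

    /// Index into a buffer for a global `offset`, or -1 if outside the buffer.
    /// `absolute_position` is the global offset one past the last buffered byte.
    ptrdiff_t offsetInBuffer(size_t offset, size_t absolute_position, size_t buffer_size)
    {
        if (buffer_size && offset >= absolute_position - buffer_size && offset < absolute_position)
            return buffer_size - (absolute_position - offset);
        return -1;  /// caller must re-initialize the underlying buffer
    }

    int main()
    {
        /// Buffer holds global bytes [100, 150): absolute_position = 150, size = 50.
        assert(offsetInBuffer(120, 150, 50) == 20);
        assert(offsetInBuffer(99, 150, 50) == -1);   /// before the buffer
        assert(offsetInBuffer(150, 150, 50) == -1);  /// past the buffer
        return 0;
    }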
@ -187,8 +203,7 @@ namespace
for (UInt32 i = 0; i < metadata.s3_objects_count; ++i)
{
current_buf_idx = i;
auto path = metadata.s3_objects[i].first;
auto size = metadata.s3_objects[i].second;
auto [path, size] = metadata.s3_objects[i];
if (size > offset)
{
auto buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, path, buf_size);
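initialize() has to map a global position onto one of several S3 objects that together store the part. The loop walks the (path, size) list until it finds the object containing the byte; the offset decrement between iterations is implied by the surrounding code rather than shown in this hunk. A self-contained sketch of the resolution (illustrative):

    #include <cassert>
    #include <cstddef>
    #include <string>
    #include <utility>
    #include <vector>

    /// Resolve a global offset to (object index, offset within that object).
    std::pair<size_t, size_t> resolve(const std::vector<std::pair<std::string, size_t>> & objects, size_t offset)
    {
        for (size_t i = 0; i < objects.size(); ++i)
        {
            size_t size = objects[i].second;
            if (size > offset)
                return {i, offset};  /// the byte lives in object i
            offset -= size;          /// skip this object entirely
        }
        return {objects.size(), 0};  /// offset is past the end of the part
    }

    int main()
    {
        std::vector<std::pair<std::string, size_t>> objects{{"obj0", 100}, {"obj1", 50}};
        assert(resolve(objects, 30) == std::make_pair(size_t(0), size_t(30)));
        assert(resolve(objects, 120) == std::make_pair(size_t(1), size_t(20)));
        return 0;
    }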
@ -325,6 +340,8 @@ public:
return folder_path + iter.name();
}
String name() const override { return iter.name(); }
private:
Poco::DirectoryIterator iter;
String folder_path;
@ -547,7 +564,7 @@ void DiskS3::removeRecursive(const String & path)
Poco::File file(metadata_path + path);
if (file.isFile())
{
remove(metadata_path + path);
remove(path);
}
else
{
@ -591,6 +608,22 @@ bool DiskS3::tryReserve(UInt64 bytes)
return false;
}
void DiskS3::listFiles(const String & path, std::vector<String> & file_names)
{
for (auto it = iterateDirectory(path); it->isValid(); it->next())
file_names.push_back(it->name());
}
void DiskS3::setLastModified(const String & path, const Poco::Timestamp & timestamp)
{
Poco::File(metadata_path + path).setLastModified(timestamp);
}
Poco::Timestamp DiskS3::getLastModified(const String & path)
{
return Poco::File(metadata_path + path).getLastModified();
}
DiskS3Reservation::~DiskS3Reservation()
{

View File

@ -67,6 +67,8 @@ public:
void copyFile(const String & from_path, const String & to_path) override;
void listFiles(const String & path, std::vector<String> & file_names) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
@ -85,6 +87,10 @@ public:
void removeRecursive(const String & path) override;
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
Poco::Timestamp getLastModified(const String & path) override;
private:
String getRandomName() const;

View File

@ -10,6 +10,7 @@
#include <utility>
#include <boost/noncopyable.hpp>
#include <Poco/Path.h>
#include <Poco/Timestamp.h>
namespace CurrentMetrics
@ -121,6 +122,9 @@ public:
/// Copy the file from `from_path` to `to_path`.
virtual void copyFile(const String & from_path, const String & to_path) = 0;
/// List files at `path` and add their names to `file_names`
virtual void listFiles(const String & path, std::vector<String> & file_names) = 0;
/// Open the file for read and return ReadBufferFromFileBase object.
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
@ -142,6 +146,12 @@ public:
/// Remove file or directory with all children. Use with extra caution. Throws exception if file doesn't exist.
virtual void removeRecursive(const String & path) = 0;
/// Set last modified time to file or directory at `path`.
virtual void setLastModified(const String & path, const Poco::Timestamp & timestamp) = 0;
/// Get last modified time of file or directory at `path`.
virtual Poco::Timestamp getLastModified(const String & path) = 0;
};
using DiskPtr = std::shared_ptr<IDisk>;
@ -162,6 +172,9 @@ public:
/// Path to the file that the iterator currently points to.
virtual String path() const = 0;
/// Name of the file that the iterator currently points to.
virtual String name() const = 0;
virtual ~IDiskDirectoryIterator() = default;
};
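With listFiles() and getLastModified() promoted to pure virtuals, every disk becomes enumerable and stat-able through the interface alone. A hypothetical helper that works against any DiskPtr (sketch; only the IDisk calls come from this header, the helper itself is invented):

    /// Newest modification time among the files directly under `dir`, on any disk.
    Poco::Timestamp newestFileTime(const DiskPtr & disk, const String & dir)
    {
        std::vector<String> names;
        disk->listFiles(dir, names);
        Poco::Timestamp newest(0);
        for (const auto & name : names)
            newest = std::max(newest, disk->getLastModified(dir + name));  /// assumes `dir` ends with '/'
        return newest;
    }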

View File

@ -116,16 +116,17 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
{
String file_name = it.first;
String path = part->getFullPath() + file_name;
auto disk = part->disk;
String path = part->getFullRelativePath() + file_name;
UInt64 size = Poco::File(path).getSize();
UInt64 size = disk->getFileSize(path);
writeStringBinary(it.first, out);
writeBinary(size, out);
ReadBufferFromFile file_in(path);
auto file_in = disk->readFile(path);
HashingWriteBuffer hashing_out(out);
copyData(file_in, hashing_out, blocker.getCounter());
copyData(*file_in, hashing_out, blocker.getCounter());
if (blocker.isCancelled())
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);

View File

@ -1,30 +1,23 @@
#include "IMergeTreeDataPart.h"
#include <optional>
#include <Core/Defines.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/HashingWriteBuffer.h>
#include <Core/Defines.h>
#include <Common/SipHash.h>
#include <Common/escapeForFileName.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/localBackup.h>
#include <Compression/CompressionInfo.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Poco/File.h>
#include <Poco/Path.h>
#include <Poco/DirectoryIterator.h>
#include <common/logger_useful.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/escapeForFileName.h>
#include <Common/localBackup.h>
#include <common/JSON.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int DIRECTORY_ALREADY_EXISTS;
@ -42,37 +35,43 @@ namespace ErrorCodes
}
static ReadBufferFromFile openForReading(const String & path)
static std::unique_ptr<ReadBufferFromFileBase> openForReading(const DiskPtr & disk, const String & path)
{
return ReadBufferFromFile(path, std::min(static_cast<Poco::File::FileSize>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
return disk->readFile(path, std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), disk->getFileSize(path)));
}
void IMergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const String & part_path)
void IMergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const DiskPtr & disk_, const String & part_path)
{
size_t minmax_idx_size = data.minmax_idx_column_types.size();
parallelogram.reserve(minmax_idx_size);
for (size_t i = 0; i < minmax_idx_size; ++i)
{
String file_name = part_path + "minmax_" + escapeForFileName(data.minmax_idx_columns[i]) + ".idx";
ReadBufferFromFile file = openForReading(file_name);
auto file = openForReading(disk_, file_name);
const DataTypePtr & data_type = data.minmax_idx_column_types[i];
Field min_val;
data_type->deserializeBinary(min_val, file);
data_type->deserializeBinary(min_val, *file);
Field max_val;
data_type->deserializeBinary(max_val, file);
data_type->deserializeBinary(max_val, *file);
parallelogram.emplace_back(min_val, true, max_val, true);
}
initialized = true;
}
void IMergeTreeDataPart::MinMaxIndex::store(const MergeTreeData & data, const String & part_path, Checksums & out_checksums) const
void IMergeTreeDataPart::MinMaxIndex::store(
const MergeTreeData & data, const DiskPtr & disk_, const String & part_path, Checksums & out_checksums) const
{
store(data.minmax_idx_columns, data.minmax_idx_column_types, part_path, out_checksums);
store(data.minmax_idx_columns, data.minmax_idx_column_types, disk_, part_path, out_checksums);
}
void IMergeTreeDataPart::MinMaxIndex::store(const Names & column_names, const DataTypes & data_types, const String & part_path, Checksums & out_checksums) const
void IMergeTreeDataPart::MinMaxIndex::store(
const Names & column_names,
const DataTypes & data_types,
const DiskPtr & disk_,
const String & part_path,
Checksums & out_checksums) const
{
if (!initialized)
throw Exception("Attempt to store uninitialized MinMax index for part " + part_path + ". This is a bug.",
@ -83,8 +82,8 @@ void IMergeTreeDataPart::MinMaxIndex::store(const Names & column_names, const Da
String file_name = "minmax_" + escapeForFileName(column_names[i]) + ".idx";
const DataTypePtr & data_type = data_types.at(i);
WriteBufferFromFile out(part_path + file_name);
HashingWriteBuffer out_hashing(out);
auto out = disk_->writeFile(part_path + file_name);
HashingWriteBuffer out_hashing(*out);
data_type->serializeBinary(parallelogram[i].left, out_hashing);
data_type->serializeBinary(parallelogram[i].right, out_hashing);
out_hashing.next();
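Each minmax_<column>.idx file holds nothing but the serialized min and max value of one column; store() and load() must agree on the order (min first, then max). The round trip condensed from the two functions above (sketch; variable names abbreviated):

    /// store(): one small file per minmax column, hashed into the part checksums
    auto out = disk_->writeFile(part_path + file_name);
    HashingWriteBuffer out_hashing(*out);
    data_type->serializeBinary(min_value, out_hashing);  /// min first
    data_type->serializeBinary(max_value, out_hashing);  /// then max

    /// load(): read them back in the same order
    auto in = openForReading(disk_, part_path + file_name);
    data_type->deserializeBinary(min_value, *in);
    data_type->deserializeBinary(max_value, *in);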
@ -139,11 +138,7 @@ void IMergeTreeDataPart::MinMaxIndex::merge(const MinMaxIndex & other)
IMergeTreeDataPart::IMergeTreeDataPart(
MergeTreeData & storage_,
const String & name_,
const DiskPtr & disk_,
const std::optional<String> & relative_path_,
Type part_type_)
MergeTreeData & storage_, const String & name_, const DiskPtr & disk_, const std::optional<String> & relative_path_, Type part_type_)
: storage(storage_)
, name(name_)
, info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
@ -249,10 +244,9 @@ void IMergeTreeDataPart::removeIfNeeded()
{
try
{
std::string path = getFullPath();
auto path = getFullRelativePath();
Poco::File dir(path);
if (!dir.exists())
if (!disk->exists(path))
return;
if (is_temp)
@ -385,7 +379,6 @@ String IMergeTreeDataPart::getColumnNameWithMinumumCompressedSize() const
return *minimum_size_column;
}
String IMergeTreeDataPart::getFullPath() const
{
assertOnDisk();
@ -396,6 +389,16 @@ String IMergeTreeDataPart::getFullPath() const
return storage.getFullPathOnDisk(disk) + relative_path + "/";
}
String IMergeTreeDataPart::getFullRelativePath() const
{
assertOnDisk();
if (relative_path.empty())
throw Exception("Part relative_path cannot be empty. It's bug.", ErrorCodes::LOGICAL_ERROR);
return storage.relative_data_path + relative_path + "/";
}
void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency)
{
assertOnDisk();
@ -441,12 +444,12 @@ void IMergeTreeDataPart::loadIndex()
loaded_index[i]->reserve(index_granularity.getMarksCount());
}
String index_path = getFullPath() + "primary.idx";
ReadBufferFromFile index_file = openForReading(index_path);
String index_path = getFullRelativePath() + "primary.idx";
auto index_file = openForReading(disk, index_path);
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i) //-V756
for (size_t j = 0; j < key_size; ++j)
storage.primary_key_data_types[j]->deserializeBinary(*loaded_index[j], index_file);
storage.primary_key_data_types[j]->deserializeBinary(*loaded_index[j], *index_file);
for (size_t i = 0; i < key_size; ++i)
{
@ -457,8 +460,8 @@ void IMergeTreeDataPart::loadIndex()
ErrorCodes::CANNOT_READ_ALL_DATA);
}
if (!index_file.eof())
throw Exception("Index file " + index_path + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);
if (!index_file->eof())
throw Exception("Index file " + fullPath(disk, index_path) + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);
index.assign(std::make_move_iterator(loaded_index.begin()), std::make_move_iterator(loaded_index.end()));
}
@ -478,10 +481,10 @@ void IMergeTreeDataPart::loadPartitionAndMinMaxIndex()
}
else
{
String path = getFullPath();
partition.load(storage, path);
String path = getFullRelativePath();
partition.load(storage, disk, path);
if (!isEmpty())
minmax_idx.load(storage, path);
minmax_idx.load(storage, disk, path);
}
String calculated_partition_id = partition.getID(storage.partition_key_sample);
@ -494,43 +497,42 @@ void IMergeTreeDataPart::loadPartitionAndMinMaxIndex()
void IMergeTreeDataPart::loadChecksums(bool require)
{
String path = getFullPath() + "checksums.txt";
Poco::File checksums_file(path);
if (checksums_file.exists())
String path = getFullRelativePath() + "checksums.txt";
if (disk->exists(path))
{
ReadBufferFromFile file = openForReading(path);
if (checksums.read(file))
auto buf = openForReading(disk, path);
if (checksums.read(*buf))
{
assertEOF(file);
assertEOF(*buf);
bytes_on_disk = checksums.getTotalSizeOnDisk();
}
else
bytes_on_disk = calculateTotalSizeOnDisk(getFullPath());
bytes_on_disk = calculateTotalSizeOnDisk(disk, getFullRelativePath());
}
else
{
if (require)
throw Exception("No checksums.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
bytes_on_disk = calculateTotalSizeOnDisk(getFullPath());
bytes_on_disk = calculateTotalSizeOnDisk(disk, getFullRelativePath());
}
}
void IMergeTreeDataPart::loadRowsCount()
{
String path = getFullPath() + "count.txt";
String path = getFullRelativePath() + "count.txt";
if (index_granularity.empty())
{
rows_count = 0;
}
else if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || part_type == Type::COMPACT)
{
if (!Poco::File(path).exists())
if (!disk->exists(path))
throw Exception("No count.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
ReadBufferFromFile file = openForReading(path);
readIntText(rows_count, file);
assertEOF(file);
auto buf = openForReading(disk, path);
readIntText(rows_count, *buf);
assertEOF(*buf);
}
else
{
@ -572,20 +574,20 @@ void IMergeTreeDataPart::loadRowsCount()
void IMergeTreeDataPart::loadTTLInfos()
{
String path = getFullPath() + "ttl.txt";
if (Poco::File(path).exists())
String path = getFullRelativePath() + "ttl.txt";
if (disk->exists(path))
{
ReadBufferFromFile in = openForReading(path);
assertString("ttl format version: ", in);
auto in = openForReading(disk, path);
assertString("ttl format version: ", *in);
size_t format_version;
readText(format_version, in);
assertChar('\n', in);
readText(format_version, *in);
assertChar('\n', *in);
if (format_version == 1)
{
try
{
ttl_infos.read(in);
ttl_infos.read(*in);
}
catch (const JSONException &)
{
@ -599,9 +601,8 @@ void IMergeTreeDataPart::loadTTLInfos()
void IMergeTreeDataPart::loadColumns(bool require)
{
String path = getFullPath() + "columns.txt";
Poco::File poco_file_path{path};
if (!poco_file_path.exists())
String path = getFullRelativePath() + "columns.txt";
if (!disk->exists(path))
{
/// We can get list of columns only from columns.txt in compact parts.
if (require || part_type == Type::COMPACT)
@ -609,23 +610,21 @@ void IMergeTreeDataPart::loadColumns(bool require)
/// If there is no file with a list of columns, write it down.
for (const NameAndTypePair & column : storage.getColumns().getAllPhysical())
if (Poco::File(getFullPath() + getFileNameForColumn(column) + ".bin").exists())
if (disk->exists(getFullRelativePath() + getFileNameForColumn(column) + ".bin"))
columns.push_back(column);
if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
{
WriteBufferFromFile out(path + ".tmp", 4096);
columns.writeText(out);
auto buf = disk->writeFile(path + ".tmp", 4096);
columns.writeText(*buf);
}
Poco::File(path + ".tmp").renameTo(path);
disk->moveFile(path + ".tmp", path);
}
else
{
is_frozen = !poco_file_path.canWrite();
ReadBufferFromFile file = openForReading(path);
columns.readText(file);
columns.readText(*disk->readFile(path));
}
size_t pos = 0;
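Writing columns.txt uses the classic atomic-replace idiom, now expressed through IDisk: write the new contents to a `.tmp` sibling, let the buffer flush on scope exit, then rename over the target so readers never observe a half-written file. The idiom in isolation (sketch):

    {
        auto buf = disk->writeFile(path + ".tmp", 4096);  /// write new contents to a sibling
        columns.writeText(*buf);
    }  /// buffer flushed and closed when it goes out of scope
    disk->moveFile(path + ".tmp", path);  /// replace the old file in one step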
@ -633,16 +632,15 @@ void IMergeTreeDataPart::loadColumns(bool require)
column_name_to_position.emplace(column.name, pos++);
}
UInt64 IMergeTreeDataPart::calculateTotalSizeOnDisk(const String & from)
UInt64 IMergeTreeDataPart::calculateTotalSizeOnDisk(const DiskPtr & disk_, const String & from)
{
Poco::File cur(from);
if (cur.isFile())
return cur.getSize();
if (disk_->isFile(from))
return disk_->getFileSize(from);
std::vector<std::string> files;
cur.list(files);
disk_->listFiles(from, files);
UInt64 res = 0;
for (const auto & file : files)
res += calculateTotalSizeOnDisk(from + file);
res += calculateTotalSizeOnDisk(disk_, from + file);
return res;
}
@ -651,34 +649,32 @@ void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_
{
assertOnDisk();
String from = getFullPath();
String to = storage.getFullPathOnDisk(disk) + new_relative_path + "/";
String from = getFullRelativePath();
String to = storage.relative_data_path + new_relative_path + "/";
Poco::File from_file(from);
if (!from_file.exists())
throw Exception("Part directory " + from + " doesn't exist. Most likely it is logical error.", ErrorCodes::FILE_DOESNT_EXIST);
if (!disk->exists(from))
throw Exception("Part directory " + fullPath(disk, from) + " doesn't exist. Most likely it is logical error.", ErrorCodes::FILE_DOESNT_EXIST);
Poco::File to_file(to);
if (to_file.exists())
if (disk->exists(to))
{
if (remove_new_dir_if_exists)
{
Names files;
Poco::File(from).list(files);
disk->listFiles(to, files);
LOG_WARNING(storage.log, "Part directory " << to << " already exists"
LOG_WARNING(storage.log, "Part directory " << fullPath(disk, to) << " already exists"
<< " and contains " << files.size() << " files. Removing it.");
to_file.remove(true);
disk->removeRecursive(to);
}
else
{
throw Exception("Part directory " + to + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
throw Exception("Part directory " + fullPath(disk, to) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
}
}
from_file.setLastModified(Poco::Timestamp::fromEpochTime(time(nullptr)));
from_file.renameTo(to);
disk->setLastModified(from, Poco::Timestamp::fromEpochTime(time(nullptr)));
disk->moveFile(from, to);
relative_path = new_relative_path;
}
@ -703,37 +699,33 @@ void IMergeTreeDataPart::remove() const
* And a race condition can happen that will lead to "File not found" error here.
*/
String full_path = storage.getFullPathOnDisk(disk);
String from = full_path + relative_path;
String to = full_path + "delete_tmp_" + name;
String from_ = storage.relative_data_path + relative_path;
String to_ = storage.relative_data_path + "delete_tmp_" + name;
// TODO directory delete_tmp_<name> is never removed if server crashes before returning from this function
Poco::File from_dir{from};
Poco::File to_dir{to};
if (to_dir.exists())
if (disk->exists(to_))
{
LOG_WARNING(storage.log, "Directory " << to << " (to which part must be renamed before removing) already exists."
LOG_WARNING(storage.log, "Directory " << fullPath(disk, to_) << " (to which part must be renamed before removing) already exists."
" Most likely this is due to unclean restart. Removing it.");
try
{
to_dir.remove(true);
disk->removeRecursive(to_);
}
catch (...)
{
LOG_ERROR(storage.log, "Cannot remove directory " << to << ". Check owner and access rights.");
LOG_ERROR(storage.log, "Cannot remove directory " << fullPath(disk, to_) << ". Check owner and access rights.");
throw;
}
}
try
{
from_dir.renameTo(to);
disk->moveFile(from_, to_);
}
catch (const Poco::FileNotFoundException &)
{
LOG_ERROR(storage.log, "Directory " << from << " (part to remove) doesn't exist or one of nested files has gone."
LOG_ERROR(storage.log, "Directory " << fullPath(disk, to_) << " (part to remove) doesn't exist or one of nested files has gone."
" Most likely this is due to manual removing. This should be discouraged. Ignoring.");
return;
@ -744,28 +736,29 @@ void IMergeTreeDataPart::remove() const
/// Remove each expected file in directory, then remove directory itself.
#if !__clang__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-variable"
# pragma GCC diagnostic push
# pragma GCC diagnostic ignored "-Wunused-variable"
#endif
std::shared_lock<std::shared_mutex> lock(columns_lock);
/// TODO: IDisk doesn't support `unlink()` and `rmdir()` functionality.
auto to = fullPath(disk, to_);
for (const auto & [file, _] : checksums.files)
{
String path_to_remove = to + "/" + file;
if (0 != unlink(path_to_remove.c_str()))
throwFromErrnoWithPath("Cannot unlink file " + path_to_remove, path_to_remove,
ErrorCodes::CANNOT_UNLINK);
throwFromErrnoWithPath("Cannot unlink file " + path_to_remove, path_to_remove, ErrorCodes::CANNOT_UNLINK);
}
#if !__clang__
#pragma GCC diagnostic pop
# pragma GCC diagnostic pop
#endif
for (const auto & file : {"checksums.txt", "columns.txt"})
{
String path_to_remove = to + "/" + file;
if (0 != unlink(path_to_remove.c_str()))
throwFromErrnoWithPath("Cannot unlink file " + path_to_remove, path_to_remove,
ErrorCodes::CANNOT_UNLINK);
throwFromErrnoWithPath("Cannot unlink file " + path_to_remove, path_to_remove, ErrorCodes::CANNOT_UNLINK);
}
if (0 != rmdir(to.c_str()))
@ -775,10 +768,10 @@ void IMergeTreeDataPart::remove() const
{
/// Recursive directory removal does many excessive "stat" syscalls under the hood.
LOG_ERROR(storage.log, "Cannot quickly remove directory " << to << " by removing files; fallback to recursive removal. Reason: "
LOG_ERROR(storage.log, "Cannot quickly remove directory " << fullPath(disk, to_) << " by removing files; fallback to recursive removal. Reason: "
<< getCurrentExceptionMessage(false));
to_dir.remove(true);
disk->removeRecursive(to_ + "/");
}
}
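remove() first renames the part directory to delete_tmp_<name>, then unlinks each file it knows about from the checksums and rmdir()s the directory, falling back to recursive removal only on error, because recursive removal stats every entry. The fast-removal core as a standalone sketch (POSIX; illustrative, not library code):

    #include <unistd.h>
    #include <string>
    #include <vector>

    /// Remove a directory whose file names are known in advance, without a recursive walk.
    /// Returns false so the caller can fall back to slow recursive removal.
    bool removeKnownFiles(const std::string & dir, const std::vector<std::string> & files)
    {
        for (const auto & file : files)
            if (0 != unlink((dir + "/" + file).c_str()))
                return false;
        return 0 == rmdir(dir.c_str());
    }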
@ -796,8 +789,7 @@ String IMergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix)
*/
for (int try_no = 0; try_no < 10; try_no++)
{
res = "detached/" + (prefix.empty() ? "" : prefix + "_")
+ name + (try_no ? "_try" + DB::toString(try_no) : "");
res = "detached/" + (prefix.empty() ? "" : prefix + "_") + name + (try_no ? "_try" + DB::toString(try_no) : "");
if (!Poco::File(storage.getFullPathOnDisk(disk) + res).exists())
return res;
@ -845,7 +837,7 @@ void IMergeTreeDataPart::makeCloneOnDiskDetached(const ReservationPtr & reservat
void IMergeTreeDataPart::checkConsistencyBase() const
{
String path = getFullPath();
String path = getFullRelativePath();
if (!checksums.empty())
{
@ -870,31 +862,31 @@ void IMergeTreeDataPart::checkConsistencyBase() const
}
}
checksums.checkSizes(path);
checksums.checkSizes(disk, path);
}
else
{
auto check_file_not_empty = [&path](const String & file_path)
auto check_file_not_empty = [&path](const DiskPtr & disk_, const String & file_path)
{
Poco::File file(file_path);
if (!file.exists() || file.getSize() == 0)
throw Exception("Part " + path + " is broken: " + file_path + " is empty", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
return file.getSize();
UInt64 file_size;
if (!disk_->exists(file_path) || (file_size = disk_->getFileSize(file_path)) == 0)
throw Exception("Part " + fullPath(disk_, path) + " is broken: " + fullPath(disk_, file_path) + " is empty", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
return file_size;
};
/// Check that the primary key index is not empty.
if (!storage.primary_key_columns.empty())
check_file_not_empty(path + "primary.idx");
check_file_not_empty(disk, path + "primary.idx");
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
check_file_not_empty(path + "count.txt");
check_file_not_empty(disk, path + "count.txt");
if (storage.partition_key_expr)
check_file_not_empty(path + "partition.dat");
check_file_not_empty(disk, path + "partition.dat");
for (const String & col_name : storage.minmax_idx_columns)
check_file_not_empty(path + "minmax_" + escapeForFileName(col_name) + ".idx");
check_file_not_empty(disk, path + "minmax_" + escapeForFileName(col_name) + ".idx");
}
}
}

View File

@ -268,9 +268,9 @@ public:
{
}
void load(const MergeTreeData & storage, const String & part_path);
void store(const MergeTreeData & storage, const String & part_path, Checksums & checksums) const;
void store(const Names & column_names, const DataTypes & data_types, const String & part_path, Checksums & checksums) const;
void load(const MergeTreeData & storage, const DiskPtr & disk_, const String & part_path);
void store(const MergeTreeData & storage, const DiskPtr & disk_, const String & part_path, Checksums & checksums) const;
void store(const Names & column_names, const DataTypes & data_types, const DiskPtr & disk_, const String & part_path, Checksums & checksums) const;
void update(const Block & block, const Names & column_names);
void merge(const MinMaxIndex & other);
@ -294,6 +294,7 @@ public:
UInt64 getMarksCount() const;
size_t getFileSizeOrZero(const String & file_name) const;
String getFullRelativePath() const;
String getFullPath() const;
void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists = false) const;
void renameToDetached(const String & prefix) const;
@ -305,7 +306,7 @@ public:
/// Checks that .bin and .mrk files exist
virtual bool hasColumnFiles(const String & /* column */, const IDataType & /* type */) const { return false; }
static UInt64 calculateTotalSizeOnDisk(const String & from);
static UInt64 calculateTotalSizeOnDisk(const DiskPtr & disk_, const String & from);
protected:
/// Columns description.

View File

@ -1,6 +1,6 @@
#include <Storages/MergeTree/IMergeTreeDataPartWriter.h>
#include <IO/createWriteBufferFromFileBase.h>
#include <Poco/File.h>
#include <utility>
namespace DB
{
@ -24,11 +24,12 @@ void IMergeTreeDataPartWriter::Stream::finalize()
void IMergeTreeDataPartWriter::Stream::sync()
{
plain_file->sync();
marks_file.sync();
marks_file->sync();
}
IMergeTreeDataPartWriter::Stream::Stream(
const String & escaped_column_name_,
DiskPtr disk_,
const String & data_path_,
const std::string & data_file_extension_,
const std::string & marks_path_,
@ -40,9 +41,9 @@ IMergeTreeDataPartWriter::Stream::Stream(
escaped_column_name(escaped_column_name_),
data_file_extension{data_file_extension_},
marks_file_extension{marks_file_extension_},
plain_file(createWriteBufferFromFileBase(data_path_ + data_file_extension, estimated_size_, aio_threshold_, max_compress_block_size_)),
plain_file(disk_->writeFile(data_path_ + data_file_extension, max_compress_block_size_, WriteMode::Rewrite, estimated_size_, aio_threshold_)),
plain_hashing(*plain_file), compressed_buf(plain_hashing, compression_codec_), compressed(compressed_buf),
marks_file(marks_path_ + marks_file_extension, 4096, O_TRUNC | O_CREAT | O_WRONLY), marks(marks_file)
marks_file(disk_->writeFile(marks_path_ + marks_file_extension, 4096, WriteMode::Rewrite)), marks(*marks_file)
{
}
@ -62,6 +63,7 @@ void IMergeTreeDataPartWriter::Stream::addToChecksums(MergeTreeData::DataPart::C
IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
DiskPtr disk_,
const String & part_path_,
const MergeTreeData & storage_,
const NamesAndTypesList & columns_list_,
@ -71,7 +73,8 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
const MergeTreeWriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_,
bool need_finish_last_granule_)
: part_path(part_path_)
: disk(std::move(disk_))
, part_path(part_path_)
, storage(storage_)
, columns_list(columns_list_)
, marks_file_extension(marks_file_extension_)
@ -86,10 +89,8 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
if (settings.blocks_are_granules_size && !index_granularity.empty())
throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);
Poco::File part_dir(part_path);
if (!part_dir.exists())
part_dir.createDirectories();
if (!disk->exists(part_path))
disk->createDirectories(part_path);
}
IMergeTreeDataPartWriter::~IMergeTreeDataPartWriter() = default;
@ -172,8 +173,7 @@ void IMergeTreeDataPartWriter::initPrimaryIndex()
{
if (storage.hasPrimaryKey())
{
index_file_stream = std::make_unique<WriteBufferFromFile>(
part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
index_file_stream = disk->writeFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
index_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
}
@ -188,6 +188,7 @@ void IMergeTreeDataPartWriter::initSkipIndices()
skip_indices_streams.emplace_back(
std::make_unique<IMergeTreeDataPartWriter::Stream>(
stream_name,
disk,
part_path + stream_name, INDEX_FILE_EXTENSION,
part_path + stream_name, marks_file_extension,
default_codec, settings.max_compress_block_size,

View File

@ -7,6 +7,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Disks/IDisk.h>
namespace DB
@ -26,6 +27,7 @@ public:
{
Stream(
const String & escaped_column_name_,
DiskPtr disk_,
const String & data_path_,
const std::string & data_file_extension_,
const std::string & marks_path_,
@ -46,7 +48,7 @@ public:
HashingWriteBuffer compressed;
/// marks -> marks_file
WriteBufferFromFile marks_file;
std::unique_ptr<WriteBufferFromFileBase> marks_file;
HashingWriteBuffer marks;
void finalize();
@ -59,6 +61,7 @@ public:
using StreamPtr = std::unique_ptr<Stream>;
IMergeTreeDataPartWriter(
DiskPtr disk,
const String & part_path,
const MergeTreeData & storage,
const NamesAndTypesList & columns_list,
@ -113,6 +116,7 @@ protected:
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
using SerializationStates = std::unordered_map<String, SerializationState>;
DiskPtr disk;
String part_path;
const MergeTreeData & storage;
NamesAndTypesList columns_list;
@ -146,7 +150,7 @@ protected:
MergeTreeIndexAggregators skip_indices_aggregators;
std::vector<size_t> skip_index_filling;
std::unique_ptr<WriteBufferFromFile> index_file_stream;
std::unique_ptr<WriteBufferFromFileBase> index_file_stream;
std::unique_ptr<HashingWriteBuffer> index_stream;
MutableColumns index_columns;
DataTypes index_types;

View File

@ -26,7 +26,7 @@ IMergeTreeReader::IMergeTreeReader(const MergeTreeData::DataPartPtr & data_part_
const NamesAndTypesList & columns_, UncompressedCache * uncompressed_cache_, MarkCache * mark_cache_,
const MarkRanges & all_mark_ranges_, const MergeTreeReaderSettings & settings_,
const ValueSizeMap & avg_value_size_hints_)
: data_part(data_part_), avg_value_size_hints(avg_value_size_hints_), path(data_part_->getFullPath())
: data_part(data_part_), avg_value_size_hints(avg_value_size_hints_)
, columns(columns_), uncompressed_cache(uncompressed_cache_), mark_cache(mark_cache_)
, settings(settings_), storage(data_part_->storage)
, all_mark_ranges(all_mark_ranges_)
@ -140,7 +140,7 @@ void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_e
catch (Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading from part " + path + ")");
e.addMessage("(while reading from part " + data_part->getFullPath() + ")");
throw;
}
}
@ -177,7 +177,7 @@ void IMergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
catch (Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading from part " + path + ")");
e.addMessage("(while reading from part " + data_part->getFullPath() + ")");
throw;
}
}
@ -226,7 +226,7 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns)
catch (Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading from part " + path + ")");
e.addMessage("(while reading from part " + data_part->getFullPath() + ")");
throw;
}
}

View File

@ -63,8 +63,6 @@ protected:
ValueSizeMap avg_value_size_hints;
/// Stores states for IDataType::deserializeBinaryBulk
DeserializeBinaryBulkStateMap deserialize_binary_bulk_state_map;
/// Path to the directory containing the part
String path;
/// Columns that are read.
NamesAndTypesList columns;

View File

@ -9,7 +9,8 @@ namespace DB
IMergedBlockOutputStream::IMergedBlockOutputStream(
const MergeTreeDataPartPtr & data_part)
: storage(data_part->storage)
, part_path(data_part->getFullPath())
, disk(data_part->disk)
, part_path(data_part->getFullRelativePath())
{
}

View File

@ -30,6 +30,7 @@ protected:
protected:
const MergeTreeData & storage;
DiskPtr disk;
String part_path;
static Block getBlockAndPermute(const Block & block, const Names & names, const IColumn::Permutation * permutation);

View File

@ -199,48 +199,47 @@ MergeTreeData::MergeTreeData(
setTTLExpressions(metadata.columns.getColumnTTLs(), metadata.ttl_for_table_ast);
// format_file always contained on any data path
String version_file_path;
/// format_version.txt may reside on any of the data paths
PathWithDisk version_file;
/// Creating directories, if not exist.
auto paths = getDataPaths();
for (const String & path : paths)
for (const auto & [path, disk] : getRelativeDataPathsWithDisks())
{
Poco::File(path).createDirectories();
Poco::File(path + "detached").createDirectory();
disk->createDirectories(path);
disk->createDirectories(path + "detached");
auto current_version_file_path = path + "format_version.txt";
if (Poco::File{current_version_file_path}.exists())
if (disk->exists(current_version_file_path))
{
if (!version_file_path.empty())
if (!version_file.first.empty())
{
LOG_ERROR(log, "Duplication of version file " << version_file_path << " and " << current_version_file_path);
LOG_ERROR(log, "Duplication of version file " <<
fullPath(version_file.second, version_file.first) << " and " << current_version_file_path);
throw Exception("Multiple format_version.txt file", ErrorCodes::CORRUPTED_DATA);
}
version_file_path = current_version_file_path;
version_file = {current_version_file_path, disk};
}
}
/// If not choose any
if (version_file_path.empty())
version_file_path = getFullPathOnDisk(getStoragePolicy()->getAnyDisk()) + "format_version.txt";
if (version_file.first.empty())
version_file = {relative_data_path + "format_version.txt", getStoragePolicy()->getAnyDisk()};
bool version_file_exists = Poco::File(version_file_path).exists();
bool version_file_exists = version_file.second->exists(version_file.first);
// When data path or file not exists, ignore the format_version check
if (!attach || !version_file_exists)
{
format_version = min_format_version;
WriteBufferFromFile buf(version_file_path);
writeIntText(format_version.toUnderType(), buf);
auto buf = version_file.second->writeFile(version_file.first);
writeIntText(format_version.toUnderType(), *buf);
}
else
{
ReadBufferFromFile buf(version_file_path);
auto buf = version_file.second->readFile(version_file.first);
UInt32 read_format_version;
readIntText(read_format_version, buf);
readIntText(read_format_version, *buf);
format_version = read_format_version;
if (!buf.eof())
throw Exception("Bad version file: " + version_file_path, ErrorCodes::CORRUPTED_DATA);
if (!buf->eof())
throw Exception("Bad version file: " + fullPath(version_file.second, version_file.first), ErrorCodes::CORRUPTED_DATA);
}
if (format_version < min_format_version)
@ -872,15 +871,15 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
for (const auto & disk_ptr : disks)
defined_disk_names.insert(disk_ptr->getName());
for (auto & [disk_name, disk_ptr] : global_context.getDiskSelector()->getDisksMap())
for (auto & [disk_name, disk] : global_context.getDiskSelector()->getDisksMap())
{
if (defined_disk_names.count(disk_name) == 0 && Poco::File(getFullPathOnDisk(disk_ptr)).exists())
if (defined_disk_names.count(disk_name) == 0 && disk->exists(relative_data_path))
{
for (Poco::DirectoryIterator it(getFullPathOnDisk(disk_ptr)); it != end; ++it)
for (const auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next())
{
MergeTreePartInfo part_info;
if (MergeTreePartInfo::tryParsePartName(it.name(), &part_info, format_version))
throw Exception("Part " + backQuote(it.name()) + " was found on disk " + backQuote(disk_name) + " which is not defined in the storage policy", ErrorCodes::UNKNOWN_DISK);
if (MergeTreePartInfo::tryParsePartName(it->name(), &part_info, format_version))
throw Exception("Part " + backQuote(it->name()) + " was found on disk " + backQuote(disk_name) + " which is not defined in the storage policy", ErrorCodes::UNKNOWN_DISK);
}
}
}
@ -891,13 +890,13 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
for (auto disk_it = disks.rbegin(); disk_it != disks.rend(); ++disk_it)
{
auto disk_ptr = *disk_it;
for (Poco::DirectoryIterator it(getFullPathOnDisk(disk_ptr)); it != end; ++it)
for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
{
/// Skip temporary directories.
if (startsWith(it.name(), "tmp"))
if (startsWith(it->name(), "tmp"))
continue;
part_names_with_disks.emplace_back(it.name(), disk_ptr);
part_names_with_disks.emplace_back(it->name(), disk_ptr);
}
}
@ -938,9 +937,9 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
auto part = createPart(part_name, part_info, part_disk_ptr, part_name);
bool broken = false;
Poco::Path part_path(getFullPathOnDisk(part_disk_ptr), part_name);
Poco::Path marker_path(part_path, DELETE_ON_DESTROY_MARKER_PATH);
if (Poco::File(marker_path).exists())
String part_path = relative_data_path + "/" + part_name;
String marker_path = part_path + "/" + DELETE_ON_DESTROY_MARKER_PATH;
if (part_disk_ptr->exists(marker_path))
{
LOG_WARNING(log, "Detaching stale part " << getFullPathOnDisk(part_disk_ptr) << part_name << ", which should have been deleted after a move. That can only happen after unclean restart of ClickHouse after move of a part having an operation blocking that stale copy of part.");
std::lock_guard loading_lock(mutex);
@ -1031,7 +1030,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
else
has_adaptive_parts.store(true, std::memory_order_relaxed);
part->modification_time = Poco::File(getFullPathOnDisk(part_disk_ptr) + part_name).getLastModified().epochTime();
part->modification_time = part_disk_ptr->getLastModified(relative_data_path + part_name).epochTime();
/// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
part->state = DataPartState::Committed;
@ -1114,14 +1113,13 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
/// Is the part directory old.
/// True if its modification time and the modification time of all files inside it are less than the threshold.
/// (Only files on the first level of nesting are considered).
static bool isOldPartDirectory(Poco::File & directory, time_t threshold)
static bool isOldPartDirectory(const DiskPtr & disk, const String & directory_path, time_t threshold)
{
if (directory.getLastModified().epochTime() >= threshold)
if (disk->getLastModified(directory_path).epochTime() >= threshold)
return false;
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator it(directory); it != end; ++it)
if (it->getLastModified().epochTime() >= threshold)
for (auto it = disk->iterateDirectory(directory_path); it->isValid(); it->next())
if (disk->getLastModified(it->path()).epochTime() >= threshold)
return false;
return true;
@ -1141,24 +1139,19 @@ void MergeTreeData::clearOldTemporaryDirectories(ssize_t custom_directories_life
? current_time - custom_directories_lifetime_seconds
: current_time - settings->temporary_directories_lifetime.totalSeconds();
const auto full_paths = getDataPaths();
/// Delete temporary directories older than a day.
Poco::DirectoryIterator end;
for (auto && full_data_path : full_paths)
for (const auto & [path, disk] : getRelativeDataPathsWithDisks())
{
for (Poco::DirectoryIterator it{full_data_path}; it != end; ++it)
for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
{
if (startsWith(it.name(), "tmp_"))
if (startsWith(it->name(), "tmp_"))
{
Poco::File tmp_dir(full_data_path + it.name());
try
{
if (tmp_dir.isDirectory() && isOldPartDirectory(tmp_dir, deadline))
if (disk->isDirectory(it->path()) && isOldPartDirectory(disk, it->path(), deadline))
{
LOG_WARNING(log, "Removing temporary directory " << full_data_path << it.name());
Poco::File(full_data_path + it.name()).remove(true);
LOG_WARNING(log, "Removing temporary directory " << fullPath(disk, it->path()));
disk->removeRecursive(it->path());
}
}
catch (const Poco::FileNotFoundException &)
@ -1356,10 +1349,8 @@ void MergeTreeData::dropAllData()
/// Removing of each data part before recursive removal of directory is to speed-up removal, because there will be less number of syscalls.
clearPartsFromFilesystem(all_parts);
auto full_paths = getDataPaths();
for (auto && full_data_path : full_paths)
Poco::File(full_data_path).remove(true);
for (const auto & [path, disk] : getRelativeDataPathsWithDisks())
disk->removeRecursive(path);
LOG_TRACE(log, "dropAllData: done.");
}
@ -1669,8 +1660,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
const DiskPtr & disk, const String & relative_path) const
{
MergeTreeDataPartType type;
auto full_path = getFullPathOnDisk(disk) + relative_path + "/";
auto mrk_ext = MergeTreeIndexGranularityInfo::getMrkExtensionFromFS(full_path);
auto full_path = relative_data_path + relative_path + "/";
auto mrk_ext = MergeTreeIndexGranularityInfo::getMrkExtensionFromFS(disk, full_path);
if (mrk_ext)
type = getPartTypeFromMarkExtension(*mrk_ext);
@ -3660,6 +3651,15 @@ MergeTreeData::PathsWithDisks MergeTreeData::getDataPathsWithDisks() const
return res;
}
MergeTreeData::PathsWithDisks MergeTreeData::getRelativeDataPathsWithDisks() const
{
PathsWithDisks res;
auto disks = getStoragePolicy()->getDisks();
for (const auto & disk : disks)
res.emplace_back(relative_data_path, disk);
return res;
}
void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String & with_name, const Context & context)
{
String clickhouse_path = Poco::Path(context.getPath()).makeAbsolute().toString();

View File

@ -683,6 +683,7 @@ public:
using PathWithDisk = std::pair<String, DiskPtr>;
using PathsWithDisks = std::vector<PathWithDisk>;
PathsWithDisks getDataPathsWithDisks() const;
PathsWithDisks getRelativeDataPathsWithDisks() const;
/// Reserves space at least 1MB.
ReservationPtr reserveSpace(UInt64 expected_size) const;

View File

@ -1211,7 +1211,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
new_data_part->partition.assign(source_part->partition);
new_data_part->minmax_idx = source_part->minmax_idx;
new_data_part->modification_time = time(nullptr);
new_data_part->bytes_on_disk = MergeTreeData::DataPart::calculateTotalSizeOnDisk(new_data_part->getFullPath());
new_data_part->bytes_on_disk = MergeTreeData::DataPart::calculateTotalSizeOnDisk(new_data_part->disk, new_data_part->getFullRelativePath());
}
return new_data_part;

View File

@ -44,14 +44,13 @@ void MergeTreeDataPartChecksum::checkEqual(const MergeTreeDataPartChecksum & rhs
throw Exception("Checksum mismatch for file " + name + " in data part", ErrorCodes::CHECKSUM_DOESNT_MATCH);
}
void MergeTreeDataPartChecksum::checkSize(const String & path) const
void MergeTreeDataPartChecksum::checkSize(const DiskPtr & disk, const String & path) const
{
Poco::File file(path);
if (!file.exists())
throw Exception(path + " doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
UInt64 size = file.getSize();
if (!disk->exists(path))
throw Exception(fullPath(disk, path) + " doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
UInt64 size = disk->getFileSize(path);
if (size != file_size)
throw Exception(path + " has unexpected size: " + toString(size) + " instead of " + toString(file_size),
throw Exception(fullPath(disk, path) + " has unexpected size: " + toString(size) + " instead of " + toString(file_size),
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
}
@ -78,12 +77,12 @@ void MergeTreeDataPartChecksums::checkEqual(const MergeTreeDataPartChecksums & r
}
}
void MergeTreeDataPartChecksums::checkSizes(const String & path) const
void MergeTreeDataPartChecksums::checkSizes(const DiskPtr & disk, const String & path) const
{
for (const auto & it : files)
{
const String & name = it.first;
it.second.checkSize(path + name);
it.second.checkSize(disk, path + name);
}
}

View File

@ -1,10 +1,11 @@
#pragma once
#include <Core/Types.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <city.h>
#include <map>
#include <optional>
#include <city.h>
#include <Core/Types.h>
#include <Disks/IDisk.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
class SipHash;
@ -32,7 +33,7 @@ struct MergeTreeDataPartChecksum
uncompressed_size(uncompressed_size_), uncompressed_hash(uncompressed_hash_) {}
void checkEqual(const MergeTreeDataPartChecksum & rhs, bool have_uncompressed, const String & name) const;
void checkSize(const String & path) const;
void checkSize(const DiskPtr & disk, const String & path) const;
};
@ -64,7 +65,7 @@ struct MergeTreeDataPartChecksums
static bool isBadChecksumsErrorCode(int code);
/// Checks that the directory contains all the needed files of the correct size. Does not check the checksum.
void checkSizes(const String & path) const;
void checkSizes(const DiskPtr & disk, const String & path) const;
/// Returns false if the checksum is too old.
bool read(ReadBuffer & in);

View File

@ -68,7 +68,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter(
{ return *getColumnPosition(lhs.name) < *getColumnPosition(rhs.name); });
return std::make_unique<MergeTreeDataPartWriterCompact>(
getFullPath(), storage, ordered_columns_list, indices_to_recalc,
disk, getFullRelativePath(), storage, ordered_columns_list, indices_to_recalc,
index_granularity_info.marks_file_extension,
default_codec, writer_settings, computed_index_granularity);
}
@ -92,7 +92,7 @@ ColumnSize MergeTreeDataPartCompact::getTotalColumnsSize() const
void MergeTreeDataPartCompact::loadIndexGranularity()
{
String full_path = getFullPath();
String full_path = getFullRelativePath();
if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
@ -100,19 +100,19 @@ void MergeTreeDataPartCompact::loadIndexGranularity()
if (!index_granularity_info.is_adaptive)
throw Exception("MergeTreeDataPartCompact cannot be created with non-adaptive granulary.", ErrorCodes::NOT_IMPLEMENTED);
std::string marks_file_path = index_granularity_info.getMarksFilePath(full_path + "data");
if (!Poco::File(marks_file_path).exists())
throw Exception("Marks file '" + marks_file_path + "' doesn't exist", ErrorCodes::NO_FILE_IN_DATA_PART);
auto marks_file_path = index_granularity_info.getMarksFilePath(full_path + "data");
if (!disk->exists(marks_file_path))
throw Exception("Marks file '" + fullPath(disk, marks_file_path) + "' doesn't exist", ErrorCodes::NO_FILE_IN_DATA_PART);
size_t marks_file_size = Poco::File(marks_file_path).getSize();
size_t marks_file_size = disk->getFileSize(marks_file_path);
ReadBufferFromFile buffer(marks_file_path, marks_file_size);
while (!buffer.eof())
auto buffer = disk->readFile(marks_file_path, marks_file_size);
while (!buffer->eof())
{
/// Skip offsets for columns
buffer.seek(columns.size() * sizeof(MarkInCompressedFile), SEEK_CUR);
buffer->seek(columns.size() * sizeof(MarkInCompressedFile), SEEK_CUR);
size_t granularity;
readIntBinary(granularity, buffer);
readIntBinary(granularity, *buffer);
index_granularity.appendMark(granularity);
}
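The seek above works because a compact part keeps all columns' marks in a single file, one entry per granule: columns.size() MarkInCompressedFile records followed by the granule's row count. Layout sketch (field names are assumptions consistent with the reads above; treat it as illustrative):

    /// One entry per granule in a compact part's marks file:
    ///   [ MarkInCompressedFile x columns.size() ][ granularity ]
    struct MarkInCompressedFile
    {
        size_t offset_in_compressed_file;     /// where the compressed block starts
        size_t offset_in_decompressed_block;  /// offset of the granule inside that block
    };
    /// Skipping columns.size() * sizeof(MarkInCompressedFile) therefore leaves the
    /// stream positioned exactly at the granularity value, which is all the
    /// index-granularity loader needs.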
@ -187,7 +187,7 @@ NameToNameMap MergeTreeDataPartCompact::createRenameMapForAlter(
void MergeTreeDataPartCompact::checkConsistency(bool require_part_metadata) const
{
checkConsistencyBase();
String path = getFullPath();
String path = getFullRelativePath();
String mrk_file_name = DATA_FILE_NAME + index_granularity_info.marks_file_extension;
if (!checksums.empty())
@ -199,34 +199,34 @@ void MergeTreeDataPartCompact::checkConsistency(bool require_part_metadata) cons
if (require_part_metadata)
{
if (!checksums.files.count(mrk_file_name))
throw Exception("No marks file checksum for column in part " + path, ErrorCodes::NO_FILE_IN_DATA_PART);
throw Exception("No marks file checksum for column in part " + fullPath(disk, path), ErrorCodes::NO_FILE_IN_DATA_PART);
if (!checksums.files.count(DATA_FILE_NAME_WITH_EXTENSION))
throw Exception("No data file checksum for in part " + path, ErrorCodes::NO_FILE_IN_DATA_PART);
throw Exception("No data file checksum for in part " + fullPath(disk, path), ErrorCodes::NO_FILE_IN_DATA_PART);
}
}
else
{
{
/// count.txt should be present even in non custom-partitioned parts
Poco::File file(path + "count.txt");
if (!file.exists() || file.getSize() == 0)
throw Exception("Part " + path + " is broken: " + file.path() + " is empty", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
auto file_path = path + "count.txt";
if (!disk->exists(file_path) || disk->getFileSize(file_path) == 0)
throw Exception("Part " + path + " is broken: " + fullPath(disk, file_path) + " is empty", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
}
/// Check that marks are nonempty and have the consistent size with columns number.
Poco::File file(path + mrk_file_name);
auto mrk_file_path = path + mrk_file_name;
if (file.exists())
if (disk->exists(mrk_file_path))
{
UInt64 file_size = file.getSize();
UInt64 file_size = disk->getFileSize(mrk_file_path);
if (!file_size)
throw Exception("Part " + path + " is broken: " + file.path() + " is empty.",
throw Exception("Part " + path + " is broken: " + fullPath(disk, mrk_file_name) + " is empty.",
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
UInt64 expected_file_size = index_granularity_info.getMarkSizeInBytes(columns.size()) * index_granularity.getMarksCount();
if (expected_file_size != file_size)
throw Exception(
"Part " + path + " is broken: bad size of marks file '" + file.path() + "': " + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size),
"Part " + path + " is broken: bad size of marks file '" + fullPath(disk, mrk_file_name) + "': " + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size),
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
}
}
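Almost every hunk in this file (and in the files below) is the same mechanical substitution: Poco::File and ReadBufferFromFile/WriteBufferFromFile operations on absolute paths become calls on the part's DiskPtr with disk-relative paths, so identical code runs against a local disk or S3. A hedged summary of the IDisk surface these hunks rely on; the method names are taken verbatim from calls in this diff, while the exact signatures live in Disks/IDisk.h:

class IDiskSketch   /// illustrative subset, not the real declaration
{
public:
    virtual bool exists(const String & path) const = 0;
    virtual size_t getFileSize(const String & path) const = 0;
    virtual void createDirectories(const String & path) = 0;
    virtual void remove(const String & path) = 0;                     /// single file
    virtual void removeRecursive(const String & path) = 0;            /// whole tree
    virtual void moveFile(const String & from_path, const String & to_path) = 0;
    virtual DiskDirectoryIteratorPtr iterateDirectory(const String & path) = 0;
    /// The real readFile also takes buffer size, estimated size and aio/mmap
    /// thresholds; the calls in this diff pass up to five arguments.
    virtual std::unique_ptr<ReadBufferFromFileBase> readFile(const String & path) const = 0;
    virtual std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path) = 0;
    virtual ~IDiskSketch() = default;
};

/// Plus the free helper used in error messages, assumed to join the disk root
/// and a relative path into an absolute, human-readable path:
/// String fullPath(const DiskPtr & disk, const String & relative_path);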

View File

@ -59,7 +59,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartWide::getWriter(
const MergeTreeIndexGranularity & computed_index_granularity) const
{
return std::make_unique<MergeTreeDataPartWriterWide>(
getFullPath(), storage, columns_list, indices_to_recalc,
disk, getFullRelativePath(), storage, columns_list, indices_to_recalc,
index_granularity_info.marks_file_extension,
default_codec, writer_settings, computed_index_granularity);
}
@ -115,8 +115,8 @@ ColumnSize MergeTreeDataPartWide::getColumnSize(const String & column_name, cons
void MergeTreeDataPartWide::loadIndexGranularity()
{
String full_path = getFullPath();
index_granularity_info.changeGranularityIfRequired(full_path);
String full_path = getFullRelativePath();
index_granularity_info.changeGranularityIfRequired(disk, full_path);
if (columns.empty())
@ -124,10 +124,10 @@ void MergeTreeDataPartWide::loadIndexGranularity()
/// We can use any column, it doesn't matter
std::string marks_file_path = index_granularity_info.getMarksFilePath(full_path + getFileNameForColumn(columns.front()));
if (!Poco::File(marks_file_path).exists())
throw Exception("Marks file '" + marks_file_path + "' doesn't exist", ErrorCodes::NO_FILE_IN_DATA_PART);
if (!disk->exists(marks_file_path))
throw Exception("Marks file '" + fullPath(disk, marks_file_path) + "' doesn't exist", ErrorCodes::NO_FILE_IN_DATA_PART);
size_t marks_file_size = Poco::File(marks_file_path).getSize();
size_t marks_file_size = disk->getFileSize(marks_file_path);
if (!index_granularity_info.is_adaptive)
{
@ -136,17 +136,17 @@ void MergeTreeDataPartWide::loadIndexGranularity()
}
else
{
ReadBufferFromFile buffer(marks_file_path, marks_file_size, -1);
while (!buffer.eof())
auto buffer = disk->readFile(marks_file_path, marks_file_size);
while (!buffer->eof())
{
buffer.seek(sizeof(size_t) * 2, SEEK_CUR); /// skip offset_in_compressed_file and offset_in_decompressed_block
buffer->seek(sizeof(size_t) * 2, SEEK_CUR); /// skip offset_in_compressed_file and offset_in_decompressed_block
size_t granularity;
readIntBinary(granularity, buffer);
readIntBinary(granularity, *buffer);
index_granularity.appendMark(granularity);
}
if (index_granularity.getMarksCount() * index_granularity_info.getMarkSizeInBytes() != marks_file_size)
throw Exception("Cannot read all marks from file " + marks_file_path, ErrorCodes::CANNOT_READ_ALL_DATA);
throw Exception("Cannot read all marks from file " + fullPath(disk, marks_file_path), ErrorCodes::CANNOT_READ_ALL_DATA);
}
index_granularity.setInitialized();
@ -166,9 +166,9 @@ void MergeTreeDataPartWide::accumulateColumnSizes(ColumnToSize & column_to_size)
IDataType::SubstreamPath path;
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
Poco::File bin_file(getFullPath() + IDataType::getFileNameForStream(name_type.name, substream_path) + ".bin");
if (bin_file.exists())
column_to_size[name_type.name] += bin_file.getSize();
auto bin_file_path = getFullRelativePath() + IDataType::getFileNameForStream(name_type.name, substream_path) + ".bin";
if (disk->exists(bin_file_path))
column_to_size[name_type.name] += disk->getFileSize(bin_file_path);
}, path);
}
}
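A single column can own several .bin streams: enumerateStreams above visits every substream the type produces, and getFileNameForStream maps each (column, substream) pair to a file stem. The names below are the conventional MergeTree ones, assumed rather than shown in this diff:

/// Plain column "c":        c.bin
/// Nullable(T) column "n":  n.bin + n.null.bin    (values + null map)
/// Array(T) column "a":     a.bin + a.size0.bin   (elements + offsets)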
@ -176,7 +176,7 @@ void MergeTreeDataPartWide::accumulateColumnSizes(ColumnToSize & column_to_size)
void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
{
checkConsistencyBase();
String path = getFullPath();
String path = getFullRelativePath();
if (!checksums.empty())
{
@ -191,10 +191,10 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
String mrk_file_name = file_name + index_granularity_info.marks_file_extension;
String bin_file_name = file_name + ".bin";
if (!checksums.files.count(mrk_file_name))
throw Exception("No " + mrk_file_name + " file checksum for column " + name_type.name + " in part " + path,
throw Exception("No " + mrk_file_name + " file checksum for column " + name_type.name + " in part " + fullPath(disk, path),
ErrorCodes::NO_FILE_IN_DATA_PART);
if (!checksums.files.count(bin_file_name))
throw Exception("No " + bin_file_name + " file checksum for column " + name_type.name + " in part " + path,
throw Exception("No " + bin_file_name + " file checksum for column " + name_type.name + " in part " + fullPath(disk, path),
ErrorCodes::NO_FILE_IN_DATA_PART);
}, stream_path);
}
@ -209,15 +209,15 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
{
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
Poco::File file(IDataType::getFileNameForStream(name_type.name, substream_path) + index_granularity_info.marks_file_extension);
auto file_path = path + IDataType::getFileNameForStream(name_type.name, substream_path) + index_granularity_info.marks_file_extension;
/// Missing file is Ok for case when new column was added.
if (file.exists())
if (disk->exists(file_path))
{
UInt64 file_size = file.getSize();
UInt64 file_size = disk->getFileSize(file_path);
if (!file_size)
throw Exception("Part " + path + " is broken: " + file.path() + " is empty.",
throw Exception("Part " + path + " is broken: " + fullPath(disk, file_path) + " is empty.",
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
if (!marks_size)

View File

@ -6,6 +6,7 @@ namespace DB
MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
DiskPtr disk_,
const String & part_path_,
const MergeTreeData & storage_,
const NamesAndTypesList & columns_list_,
@ -14,7 +15,7 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_)
: IMergeTreeDataPartWriter(part_path_,
: IMergeTreeDataPartWriter(disk_, part_path_,
storage_, columns_list_,
indices_to_recalc_, marks_file_extension_,
default_codec_, settings_, index_granularity_, true)
@ -26,6 +27,7 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
stream = std::make_unique<Stream>(
data_file_name,
disk_,
part_path + data_file_name, DataPart::DATA_FILE_EXTENSION,
part_path + data_file_name, marks_file_extension,
default_codec,

View File

@ -8,6 +8,7 @@ class MergeTreeDataPartWriterCompact : public IMergeTreeDataPartWriter
{
public:
MergeTreeDataPartWriterCompact(
DiskPtr disk,
const String & part_path,
const MergeTreeData & storage,
const NamesAndTypesList & columns_list,

View File

@ -13,6 +13,7 @@ namespace
}
MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
DiskPtr disk_,
const String & part_path_,
const MergeTreeData & storage_,
const NamesAndTypesList & columns_list_,
@ -21,7 +22,7 @@ MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_)
: IMergeTreeDataPartWriter(part_path_,
: IMergeTreeDataPartWriter(disk_, part_path_,
storage_, columns_list_, indices_to_recalc_,
marks_file_extension_, default_codec_, settings_, index_granularity_, false)
{
@ -48,6 +49,7 @@ void MergeTreeDataPartWriterWide::addStreams(
column_streams[stream_name] = std::make_unique<Stream>(
stream_name,
disk,
part_path + stream_name, DATA_FILE_EXTENSION,
part_path + stream_name, marks_file_extension,
effective_codec,

View File

@ -11,6 +11,7 @@ public:
using ColumnToSize = std::map<std::string, UInt64>;
MergeTreeDataPartWriterWide(
DiskPtr disk,
const String & part_path,
const MergeTreeData & storage,
const NamesAndTypesList & columns_list,

View File

@ -1263,7 +1263,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
const MarkRanges & ranges,
const Settings & settings) const
{
if (!Poco::File(part->getFullPath() + index->getFileName() + ".idx").exists())
if (!part->disk->exists(part->getFullRelativePath() + index->getFileName() + ".idx"))
{
LOG_DEBUG(log, "File for index " << backQuote(index->name) << " does not exist. Skipping it.");
return ranges;

View File

@ -244,16 +244,15 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
new_data_part->is_temp = true;
/// The name could be non-unique in case of stale files from previous runs.
String full_path = new_data_part->getFullPath();
Poco::File dir(full_path);
String full_path = new_data_part->getFullRelativePath();
if (dir.exists())
if (new_data_part->disk->exists(full_path))
{
LOG_WARNING(log, "Removing old temporary directory " + full_path);
dir.remove(true);
LOG_WARNING(log, "Removing old temporary directory " + fullPath(new_data_part->disk, full_path));
new_data_part->disk->removeRecursive(full_path);
}
dir.createDirectories();
new_data_part->disk->createDirectories(full_path);
/// If we need to calculate some columns to sort.
if (data.hasSortingKey() || data.hasSkipIndices())

View File

@ -1,9 +1,6 @@
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Poco/Path.h>
#include <Poco/File.h>
#include <Poco/DirectoryIterator.h>
#include <iostream>
namespace DB
{
@ -14,14 +11,14 @@ namespace ErrorCodes
extern const int UNKNOWN_PART_TYPE;
}
std::optional<std::string> MergeTreeIndexGranularityInfo::getMrkExtensionFromFS(const std::string & path_to_part)
std::optional<std::string> MergeTreeIndexGranularityInfo::getMrkExtensionFromFS(const DiskPtr & disk, const String & path_to_part)
{
if (Poco::File(path_to_part).exists())
if (disk->exists(path_to_part))
{
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator part_it(path_to_part); part_it != end; ++part_it)
for (DiskDirectoryIteratorPtr it = disk->iterateDirectory(path_to_part); it->isValid(); it->next())
{
const auto & ext = "." + part_it.path().getExtension();
Poco::Path path(it->path());
const auto & ext = "." + path.getExtension();
if (ext == getNonAdaptiveMrkExtension()
|| ext == getAdaptiveMrkExtension(MergeTreeDataPartType::WIDE)
|| ext == getAdaptiveMrkExtension(MergeTreeDataPartType::COMPACT))
@ -48,9 +45,9 @@ MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(const MergeTreeData
setAdaptive(storage_settings->index_granularity_bytes);
}
void MergeTreeIndexGranularityInfo::changeGranularityIfRequired(const std::string & path_to_part)
void MergeTreeIndexGranularityInfo::changeGranularityIfRequired(const DiskPtr & disk, const String & path_to_part)
{
auto mrk_ext = getMrkExtensionFromFS(path_to_part);
auto mrk_ext = getMrkExtensionFromFS(disk, path_to_part);
if (mrk_ext && *mrk_ext == getNonAdaptiveMrkExtension())
setNonAdaptive();
}
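The loop above is the iterator idiom that replaces Poco::DirectoryIterator throughout the PR; sketched in isolation (isValid/next/path as used in this diff, with disk and path_to_part as in the surrounding function):

for (auto it = disk->iterateDirectory(path_to_part); it->isValid(); it->next())
{
    /// it->path() is disk-relative; Poco::Path survives only to split off the extension.
    const String ext = "." + Poco::Path(it->path()).getExtension();
    /// ... compare ext against the known .mrk flavours ...
}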

View File

@ -3,6 +3,7 @@
#include <optional>
#include <Core/Types.h>
#include <Storages/MergeTree/MergeTreeDataPartType.h>
#include <Disks/IDisk.h>
namespace DB
{
@ -27,7 +28,7 @@ public:
MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MergeTreeDataPartType type_);
void changeGranularityIfRequired(const std::string & path_to_part);
void changeGranularityIfRequired(const DiskPtr & disk, const String & path_to_part);
String getMarksFilePath(const String & path_prefix) const
{
@ -36,7 +37,7 @@ public:
size_t getMarkSizeInBytes(size_t columns_num = 1) const;
static std::optional<std::string> getMrkExtensionFromFS(const std::string & path_to_table);
static std::optional<std::string> getMrkExtensionFromFS(const DiskPtr & disk, const String & path_to_table);
private:
MergeTreeDataPartType type;

View File

@ -7,7 +7,8 @@ namespace DB
MergeTreeIndexReader::MergeTreeIndexReader(
MergeTreeIndexPtr index_, MergeTreeData::DataPartPtr part_, size_t marks_count_, const MarkRanges & all_mark_ranges_)
: index(index_), stream(
part_->getFullPath() + index->getFileName(), ".idx", marks_count_,
part_->disk,
part_->getFullRelativePath() + index->getFileName(), ".idx", marks_count_,
all_mark_ranges_,
MergeTreeReaderSettings{}, nullptr, nullptr,
part_->getFileSizeOrZero(index->getFileName() + ".idx"),

View File

@ -3,6 +3,8 @@
#include <IO/ReadBufferFromFile.h>
#include <Poco/File.h>
#include <utility>
namespace DB
{
@ -14,13 +16,15 @@ namespace ErrorCodes
}
MergeTreeMarksLoader::MergeTreeMarksLoader(
DiskPtr disk_,
MarkCache * mark_cache_,
const String & mrk_path_,
size_t marks_count_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
bool save_marks_in_cache_,
size_t columns_in_mark_)
: mark_cache(mark_cache_)
: disk(std::move(disk_))
, mark_cache(mark_cache_)
, mrk_path(mrk_path_)
, marks_count(marks_count_)
, index_granularity_info(index_granularity_info_)
@ -46,13 +50,13 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
/// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
size_t file_size = Poco::File(mrk_path).getSize();
size_t file_size = disk->getFileSize(mrk_path);
size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark);
size_t expected_file_size = mark_size * marks_count;
if (expected_file_size != file_size)
throw Exception(
"Bad size of marks file '" + mrk_path + "': " + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size),
"Bad size of marks file '" + fullPath(disk, mrk_path) + "': " + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size),
ErrorCodes::CORRUPTED_DATA);
auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns_in_mark);
@ -60,19 +64,21 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
if (!index_granularity_info.is_adaptive)
{
/// Read directly to marks.
ReadBufferFromFile buffer(mrk_path, file_size, -1, reinterpret_cast<char *>(res->data()));
auto buffer = disk->readFile(mrk_path, file_size);
buffer->readStrict(reinterpret_cast<char *>(res->data()), file_size);
if (buffer.eof() || buffer.buffer().size() != file_size)
throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
if (!buffer->eof())
throw Exception("Cannot read all marks from file " + mrk_path + ", eof: " + std::to_string(buffer->eof())
+ ", buffer size: " + std::to_string(buffer->buffer().size()) + ", file size: " + std::to_string(file_size), ErrorCodes::CANNOT_READ_ALL_DATA);
}
else
{
ReadBufferFromFile buffer(mrk_path, file_size, -1);
auto buffer = disk->readFile(mrk_path, file_size);
size_t i = 0;
while (!buffer.eof())
while (!buffer->eof())
{
res->read(buffer, i * columns_in_mark, columns_in_mark);
buffer.seek(sizeof(size_t), SEEK_CUR);
res->read(*buffer, i * columns_in_mark, columns_in_mark);
buffer->seek(sizeof(size_t), SEEK_CUR);
++i;
}
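For orientation, the two branches above imply the following on-disk mark layouts, reconstructed from the seek()/readStrict() logic rather than taken from a spec: the non-adaptive file is a bare array of per-column mark entries, which is why a single readStrict suffices, while the adaptive file appends a rows-in-granule counter after each row of entries, which the loader skips with seek(sizeof(size_t), SEEK_CUR).

/// non-adaptive: marks_count x [columns_in_mark x MarkInCompressedFile]
/// adaptive:     marks_count x ([columns_in_mark x MarkInCompressedFile], size_t rows_in_granule)
struct MarkInCompressedFileSketch   /// field names inferred from the comment in MergeTreeDataPartWide
{
    size_t offset_in_compressed_file;      /// start of the compressed block in the .bin file
    size_t offset_in_decompressed_block;   /// offset of the granule inside that block
};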

View File

@ -1,3 +1,4 @@
#include <Disks/IDisk.h>
#include <Storages/MarkCache.h>
namespace DB
@ -11,6 +12,7 @@ public:
using MarksPtr = MarkCache::MappedPtr;
MergeTreeMarksLoader(
DiskPtr disk_,
MarkCache * mark_cache_,
const String & mrk_path,
size_t marks_count_,
@ -23,6 +25,7 @@ public:
bool initialized() const { return marks != nullptr; }
private:
DiskPtr disk;
MarkCache * mark_cache = nullptr;
String mrk_path;
size_t marks_count;

View File

@ -7,26 +7,29 @@
#include <Poco/File.h>
#include <Poco/Path.h>
#include <utility>
namespace DB
{
MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, const String & path_prefix_, Int64 tmp_number)
MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk_, const String & path_prefix_, Int64 tmp_number)
: create_time(time(nullptr))
, commands(std::move(commands_))
, disk(std::move(disk_))
, path_prefix(path_prefix_)
, file_name("tmp_mutation_" + toString(tmp_number) + ".txt")
, is_temp(true)
{
try
{
WriteBufferFromFile out(path_prefix + file_name);
out << "format version: 1\n"
auto out = disk->writeFile(path_prefix + file_name);
*out << "format version: 1\n"
<< "create time: " << LocalDateTime(create_time) << "\n";
out << "commands: ";
commands.writeText(out);
out << "\n";
out.sync();
*out << "commands: ";
commands.writeText(*out);
*out << "\n";
out->sync();
}
catch (...)
{
@ -39,7 +42,7 @@ void MergeTreeMutationEntry::commit(Int64 block_number_)
{
block_number = block_number_;
String new_file_name = "mutation_" + toString(block_number) + ".txt";
Poco::File(path_prefix + file_name).renameTo(path_prefix + new_file_name);
disk->moveFile(path_prefix + file_name, path_prefix + new_file_name);
is_temp = false;
file_name = new_file_name;
}
@ -48,17 +51,17 @@ void MergeTreeMutationEntry::removeFile()
{
if (!file_name.empty())
{
Poco::File file(path_prefix + file_name);
if (!file.exists())
if (!disk->exists(path_prefix + file_name))
return;
file.remove(false);
disk->remove(path_prefix + file_name);
file_name.clear();
}
}
MergeTreeMutationEntry::MergeTreeMutationEntry(const String & path_prefix_, const String & file_name_)
: path_prefix(path_prefix_)
MergeTreeMutationEntry::MergeTreeMutationEntry(DiskPtr disk_, const String & path_prefix_, const String & file_name_)
: disk(std::move(disk_))
, path_prefix(path_prefix_)
, file_name(file_name_)
, is_temp(false)
{
@ -66,20 +69,19 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(const String & path_prefix_, cons
file_name_buf >> "mutation_" >> block_number >> ".txt";
assertEOF(file_name_buf);
ReadBufferFromFile buf(path_prefix + file_name);
auto buf = disk->readFile(path_prefix + file_name);
buf >> "format version: 1\n";
*buf >> "format version: 1\n";
LocalDateTime create_time_dt;
buf >> "create time: " >> create_time_dt >> "\n";
*buf >> "create time: " >> create_time_dt >> "\n";
create_time = create_time_dt;
buf >> "commands: ";
commands.readText(buf);
buf >> "\n";
assertEOF(buf);
*buf >> "commands: ";
commands.readText(*buf);
*buf >> "\n";
assertEOF(*buf);
}
MergeTreeMutationEntry::~MergeTreeMutationEntry()
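For reference, the entry serialized above is a small text file, tmp_mutation_<N>.txt, which commit() later renames to mutation_<block_number>.txt. With placeholder values its contents look roughly like this (the last line is whatever MutationCommands::writeText emits):

/// format version: 1
/// create time: 2020-03-10 13:22:44
/// commands: <MutationCommands::writeText output>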

View File

@ -1,8 +1,9 @@
#pragma once
#include <Core/Types.h>
#include <Storages/MutationCommands.h>
#include <Disks/IDisk.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MutationCommands.h>
namespace DB
@ -15,6 +16,7 @@ struct MergeTreeMutationEntry
time_t create_time = 0;
MutationCommands commands;
DiskPtr disk;
String path_prefix;
String file_name;
bool is_temp = false;
@ -27,7 +29,7 @@ struct MergeTreeMutationEntry
String latest_fail_reason;
/// Create a new entry and write it to a temporary file.
MergeTreeMutationEntry(MutationCommands commands_, const String & path_prefix_, Int64 tmp_number);
MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk, const String & path_prefix_, Int64 tmp_number);
MergeTreeMutationEntry(const MergeTreeMutationEntry &) = delete;
MergeTreeMutationEntry(MergeTreeMutationEntry &&) = default;
@ -37,7 +39,7 @@ struct MergeTreeMutationEntry
void removeFile();
/// Load an existing entry.
MergeTreeMutationEntry(const String & path_prefix_, const String & file_name_);
MergeTreeMutationEntry(DiskPtr disk_, const String & path_prefix_, const String & file_name_);
~MergeTreeMutationEntry();
};

View File

@ -1,7 +1,6 @@
#include <Storages/MergeTree/MergeTreePartition.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/HashingWriteBuffer.h>
#include <Common/FieldVisitors.h>
#include <DataTypes/DataTypeDate.h>
@ -12,7 +11,6 @@
#include <Common/hex.h>
#include <Core/Block.h>
#include <Poco/File.h>
namespace DB
{
@ -21,9 +19,9 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
static ReadBufferFromFile openForReading(const String & path)
static std::unique_ptr<ReadBufferFromFileBase> openForReading(const DiskPtr & disk, const String & path)
{
return ReadBufferFromFile(path, std::min(static_cast<Poco::File::FileSize>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
return disk->readFile(path, std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), disk->getFileSize(path)));
}
String MergeTreePartition::getID(const MergeTreeData & storage) const
@ -123,29 +121,30 @@ void MergeTreePartition::serializeText(const MergeTreeData & storage, WriteBuffe
}
}
void MergeTreePartition::load(const MergeTreeData & storage, const String & part_path)
void MergeTreePartition::load(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path)
{
if (!storage.partition_key_expr)
return;
ReadBufferFromFile file = openForReading(part_path + "partition.dat");
auto partition_file_path = part_path + "partition.dat";
auto file = openForReading(disk, partition_file_path);
value.resize(storage.partition_key_sample.columns());
for (size_t i = 0; i < storage.partition_key_sample.columns(); ++i)
storage.partition_key_sample.getByPosition(i).type->deserializeBinary(value[i], file);
storage.partition_key_sample.getByPosition(i).type->deserializeBinary(value[i], *file);
}
void MergeTreePartition::store(const MergeTreeData & storage, const String & part_path, MergeTreeDataPartChecksums & checksums) const
void MergeTreePartition::store(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const
{
store(storage.partition_key_sample, part_path, checksums);
store(storage.partition_key_sample, disk, part_path, checksums);
}
void MergeTreePartition::store(const Block & partition_key_sample, const String & part_path, MergeTreeDataPartChecksums & checksums) const
void MergeTreePartition::store(const Block & partition_key_sample, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const
{
if (!partition_key_sample)
return;
WriteBufferFromFile out(part_path + "partition.dat");
HashingWriteBuffer out_hashing(out);
auto out = disk->writeFile(part_path + "partition.dat");
HashingWriteBuffer out_hashing(*out);
for (size_t i = 0; i < value.size(); ++i)
partition_key_sample.getByPosition(i).type->serializeBinary(value[i], out_hashing);
out_hashing.next();

View File

@ -1,7 +1,8 @@
#pragma once
#include <Core/Types.h>
#include <Core/Row.h>
#include <Core/Types.h>
#include <Disks/IDisk.h>
#include <IO/WriteBuffer.h>
namespace DB
@ -30,9 +31,9 @@ public:
void serializeText(const MergeTreeData & storage, WriteBuffer & out, const FormatSettings & format_settings) const;
void load(const MergeTreeData & storage, const String & part_path);
void store(const MergeTreeData & storage, const String & part_path, MergeTreeDataPartChecksums & checksums) const;
void store(const Block & partition_key_sample, const String & part_path, MergeTreeDataPartChecksums & checksums) const;
void load(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path);
void store(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const;
void store(const Block & partition_key_sample, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const;
void assign(const MergeTreePartition & other) { value.assign(other.value); }
};

View File

@ -27,18 +27,30 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
: IMergeTreeReader(data_part_, columns_,
uncompressed_cache_, mark_cache_, mark_ranges_,
settings_, avg_value_size_hints_)
, marks_loader(mark_cache,
data_part->index_granularity_info.getMarksFilePath(path + MergeTreeDataPartCompact::DATA_FILE_NAME),
, marks_loader(
data_part->disk,
mark_cache,
data_part->index_granularity_info.getMarksFilePath(data_part->getFullRelativePath() + MergeTreeDataPartCompact::DATA_FILE_NAME),
data_part->getMarksCount(), data_part->index_granularity_info,
settings.save_marks_in_cache, data_part->getColumns().size())
{
size_t buffer_size = settings.max_read_buffer_size;
const String full_data_path = path + MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION;
const String full_data_path = data_part->getFullRelativePath() + MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION;
if (uncompressed_cache)
{
auto buffer = std::make_unique<CachedCompressedReadBuffer>(
full_data_path, uncompressed_cache, 0, settings.min_bytes_to_use_direct_io, buffer_size);
fullPath(data_part->disk, full_data_path),
[this, full_data_path, buffer_size]()
{
return data_part->disk->readFile(
full_data_path,
buffer_size,
0,
settings.min_bytes_to_use_direct_io,
0);
},
uncompressed_cache);
if (profile_callback_)
buffer->setProfileCallback(profile_callback_, clock_type_);
@ -48,8 +60,9 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
}
else
{
auto buffer = std::make_unique<CompressedReadBufferFromFile>(
full_data_path, 0, settings.min_bytes_to_use_direct_io, buffer_size);
auto buffer =
std::make_unique<CompressedReadBufferFromFile>(
data_part->disk->readFile(full_data_path, buffer_size, 0, settings.min_bytes_to_use_direct_io, 0));
if (profile_callback_)
buffer->setProfileCallback(profile_callback_, clock_type_);
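Note the split in the cached branch above: the first constructor argument is the absolute fullPath(...), used only as the uncompressed-cache key so entries stay unique across disks, while the creator lambda opens the file through IDisk only when the buffer is first read. The same shape in isolation, with disk, rel_path and cache standing in for the reader's members:

auto in = std::make_unique<CachedCompressedReadBuffer>(
    fullPath(disk, rel_path),                               /// cache key, unique per disk
    [disk, rel_path] { return disk->readFile(rel_path); },  /// lazy, disk-agnostic open
    cache);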

View File

@ -1,6 +1,7 @@
#include <Storages/MergeTree/MergeTreeReaderStream.h>
#include <Compression/CachedCompressedReadBuffer.h>
#include <Poco/File.h>
#include <utility>
namespace DB
@ -13,6 +14,7 @@ namespace ErrorCodes
MergeTreeReaderStream::MergeTreeReaderStream(
DiskPtr disk_,
const String & path_prefix_, const String & data_file_extension_, size_t marks_count_,
const MarkRanges & all_mark_ranges,
const MergeTreeReaderSettings & settings,
@ -20,10 +22,10 @@ MergeTreeReaderStream::MergeTreeReaderStream(
UncompressedCache * uncompressed_cache, size_t file_size,
const MergeTreeIndexGranularityInfo * index_granularity_info_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
: path_prefix(path_prefix_), data_file_extension(data_file_extension_), marks_count(marks_count_)
: disk(std::move(disk_)), path_prefix(path_prefix_), data_file_extension(data_file_extension_), marks_count(marks_count_)
, mark_cache(mark_cache_), save_marks_in_cache(settings.save_marks_in_cache)
, index_granularity_info(index_granularity_info_)
, marks_loader(mark_cache, index_granularity_info->getMarksFilePath(path_prefix),
, marks_loader(disk, mark_cache, index_granularity_info->getMarksFilePath(path_prefix),
marks_count, *index_granularity_info, save_marks_in_cache)
{
/// Compute the size of the buffer.
@ -77,8 +79,17 @@ MergeTreeReaderStream::MergeTreeReaderStream(
if (uncompressed_cache)
{
auto buffer = std::make_unique<CachedCompressedReadBuffer>(
path_prefix + data_file_extension, uncompressed_cache, sum_mark_range_bytes,
settings.min_bytes_to_use_direct_io, settings.min_bytes_to_use_mmap_io, buffer_size);
fullPath(disk, path_prefix + data_file_extension),
[this, buffer_size, sum_mark_range_bytes,
min_bytes_direct_io = settings.min_bytes_to_use_direct_io,
min_bytes_mmap_io = settings.min_bytes_to_use_mmap_io]()
{
/// The thresholds are captured by value: `settings` is a constructor parameter,
/// and this creator runs lazily, after the constructor has returned.
return disk->readFile(
path_prefix + data_file_extension,
buffer_size,
sum_mark_range_bytes,
min_bytes_direct_io,
min_bytes_mmap_io);
},
uncompressed_cache);
if (profile_callback)
buffer->setProfileCallback(profile_callback, clock_type);
@ -89,8 +100,9 @@ MergeTreeReaderStream::MergeTreeReaderStream(
else
{
auto buffer = std::make_unique<CompressedReadBufferFromFile>(
path_prefix + data_file_extension, sum_mark_range_bytes,
settings.min_bytes_to_use_direct_io, settings.min_bytes_to_use_mmap_io, buffer_size);
disk->readFile(path_prefix + data_file_extension, buffer_size,
sum_mark_range_bytes, settings.min_bytes_to_use_direct_io, settings.min_bytes_to_use_mmap_io)
);
if (profile_callback)
buffer->setProfileCallback(profile_callback, clock_type);

View File

@ -17,6 +17,7 @@ class MergeTreeReaderStream
{
public:
MergeTreeReaderStream(
DiskPtr disk_,
const String & path_prefix_, const String & data_file_extension_, size_t marks_count_,
const MarkRanges & all_mark_ranges,
const MergeTreeReaderSettings & settings_,
@ -31,6 +32,7 @@ public:
ReadBuffer * data_buffer;
private:
DiskPtr disk;
std::string path_prefix;
std::string data_file_extension;

View File

@ -154,7 +154,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
storage.reportBrokenPart(data_part->name);
/// Better diagnostics.
e.addMessage("(while reading from part " + path + " "
e.addMessage("(while reading from part " + data_part->getFullPath() + " "
"from mark " + toString(from_mark) + " "
"with max_rows_to_read = " + toString(max_rows_to_read) + ")");
throw;
@ -188,8 +188,8 @@ void MergeTreeReaderWide::addStreams(const String & name, const IDataType & type
return;
streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
path + stream_name, DATA_FILE_EXTENSION, data_part->getMarksCount(),
all_mark_ranges, settings, mark_cache,
data_part->disk, data_part->getFullRelativePath() + stream_name, DATA_FILE_EXTENSION,
data_part->getMarksCount(), all_mark_ranges, settings, mark_cache,
uncompressed_cache, data_part->getFileSizeOrZero(stream_name + DATA_FILE_EXTENSION),
&data_part->index_granularity_info,
profile_callback, clock_type));

View File

@ -57,7 +57,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
part_columns_lock(data_part->columns_lock),
all_mark_ranges(std::move(mark_ranges_)),
part_index_in_query(part_index_in_query_),
path(data_part->getFullPath())
path(data_part->getFullRelativePath())
{
/// Let's estimate total number of rows for progress bar.
for (const auto & range : all_mark_ranges)

View File

@ -38,7 +38,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
all_mark_ranges(std::move(mark_ranges_)),
part_index_in_query(part_index_in_query_),
check_columns(check_columns_),
path(data_part->getFullPath())
path(data_part->getFullRelativePath())
{
/// Let's estimate total number of rows for progress bar.
for (const auto & range : all_mark_ranges)

View File

@ -42,6 +42,7 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
M(SettingUInt64, number_of_free_entries_in_pool_to_execute_mutation, 10, "When there is less than specified number of free entries in pool, do not execute part mutations. This is to leave free threads for regular merges and avoid \"Too many parts\"", 0) \
M(SettingSeconds, old_parts_lifetime, 8 * 60, "How many seconds to keep obsolete parts.", 0) \
M(SettingSeconds, temporary_directories_lifetime, 86400, "How many seconds to keep tmp_-directories.", 0) \
M(SettingBool, disable_background_merges, false, "Disable background merges.", 0) \
\
/** Inserts settings. */ \
M(SettingUInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table.", 0) \

View File

@ -57,7 +57,7 @@ bool MergeTreeThreadSelectBlockInputProcessor::getNewTask()
return false;
}
const std::string path = task->data_part->getFullPath();
const std::string path = task->data_part->getFullRelativePath();
/// Allows pool to reduce number of threads in case of too slow reads.
auto profile_callback = [this](ReadBufferFromFileBase::ProfileInfo info_) { pool->profileFeedback(info_); };

View File

@ -47,18 +47,13 @@ MergedBlockOutputStream::MergedBlockOutputStream(
}
}
Poco::File(part_path).createDirectories();
disk->createDirectories(part_path);
writer = data_part->getWriter(columns_list, data_part->storage.getSkipIndices(), default_codec, writer_settings);
writer->initPrimaryIndex();
writer->initSkipIndices();
}
std::string MergedBlockOutputStream::getPartPath() const
{
return part_path;
}
/// If data is pre-sorted.
void MergedBlockOutputStream::write(const Block & block)
{
@ -99,15 +94,15 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || isCompactPart(new_part))
{
new_part->partition.store(storage, part_path, checksums);
new_part->partition.store(storage, disk, part_path, checksums);
if (new_part->minmax_idx.initialized)
new_part->minmax_idx.store(storage, part_path, checksums);
new_part->minmax_idx.store(storage, disk, part_path, checksums);
else if (rows_count)
throw Exception("MinMax index was not initialized for new non-empty part " + new_part->name
+ ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
WriteBufferFromFile count_out(part_path + "count.txt", 4096);
HashingWriteBuffer count_out_hashing(count_out);
auto count_out = disk->writeFile(part_path + "count.txt", 4096);
HashingWriteBuffer count_out_hashing(*count_out);
writeIntText(rows_count, count_out_hashing);
count_out_hashing.next();
checksums.files["count.txt"].file_size = count_out_hashing.count();
@ -117,8 +112,8 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
if (!new_part->ttl_infos.empty())
{
/// Write a file with ttl infos in json format.
WriteBufferFromFile out(part_path + "ttl.txt", 4096);
HashingWriteBuffer out_hashing(out);
auto out = disk->writeFile(part_path + "ttl.txt", 4096);
HashingWriteBuffer out_hashing(*out);
new_part->ttl_infos.write(out_hashing);
checksums.files["ttl.txt"].file_size = out_hashing.count();
checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
@ -126,14 +121,14 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
{
/// Write a file with a description of columns.
WriteBufferFromFile out(part_path + "columns.txt", 4096);
total_column_list->writeText(out);
auto out = disk->writeFile(part_path + "columns.txt", 4096);
total_column_list->writeText(*out);
}
{
/// Write file with checksums.
WriteBufferFromFile out(part_path + "checksums.txt", 4096);
checksums.write(out);
auto out = disk->writeFile(part_path + "checksums.txt", 4096);
checksums.write(*out);
}
new_part->rows_count = rows_count;
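Each small metadata file above follows one recipe: open through disk->writeFile, wrap in HashingWriteBuffer so the checksum is computed inline while writing (no second read, which matters once the part lives on S3), flush, then record size and hash. Condensed, for a hypothetical file name meta.txt:

auto out = disk->writeFile(part_path + "meta.txt", 4096);   /// "meta.txt" is illustrative
HashingWriteBuffer out_hashing(*out);
writeString(payload, out_hashing);    /// any serialization that targets a WriteBuffer
out_hashing.next();                   /// flush so count()/getHash() cover every byte
checksums.files["meta.txt"].file_size = out_hashing.count();
checksums.files["meta.txt"].file_hash = out_hashing.getHash();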

View File

@ -27,8 +27,6 @@ public:
size_t aio_threshold,
bool blocks_are_granules_size = false);
std::string getPartPath() const;
Block getHeader() const override { return storage.getSampleBlock(); }
/// If the data is pre-sorted.

View File

@ -28,7 +28,8 @@ namespace ErrorCodes
IMergeTreeDataPart::Checksums checkDataPart(
const String & full_path,
const DiskPtr & disk,
const String & full_relative_path,
const NamesAndTypesList & columns_list,
const MergeTreeDataPartType & part_type,
bool require_checksums,
@ -42,16 +43,16 @@ IMergeTreeDataPart::Checksums checkDataPart(
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedChecks};
String path = full_path;
String path = full_relative_path;
if (!path.empty() && path.back() != '/')
path += "/";
NamesAndTypesList columns_txt;
{
ReadBufferFromFile buf(path + "columns.txt");
columns_txt.readText(buf);
assertEOF(buf);
auto buf = disk->readFile(path + "columns.txt");
columns_txt.readText(*buf);
assertEOF(*buf);
}
if (columns_txt != columns_list)
@ -62,10 +63,10 @@ IMergeTreeDataPart::Checksums checkDataPart(
/// Real checksums based on contents of data. Must correspond to checksums.txt. If not - it means the data is broken.
IMergeTreeDataPart::Checksums checksums_data;
auto checksum_compressed_file = [](const String & file_path)
auto checksum_compressed_file = [](const DiskPtr & disk_, const String & file_path)
{
ReadBufferFromFile file_buf(file_path);
HashingReadBuffer compressed_hashing_buf(file_buf);
auto file_buf = disk_->readFile(file_path);
HashingReadBuffer compressed_hashing_buf(*file_buf);
CompressedReadBuffer uncompressing_buf(compressed_hashing_buf);
HashingReadBuffer uncompressed_hashing_buf(uncompressing_buf);
@ -80,7 +81,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
if (part_type == MergeTreeDataPartType::COMPACT)
{
const auto & file_name = MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION;
checksums_data.files[file_name] = checksum_compressed_file(path + file_name);
checksums_data.files[file_name] = checksum_compressed_file(disk, path + file_name);
}
else if (part_type == MergeTreeDataPartType::WIDE)
{
@ -89,7 +90,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
column.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
String file_name = IDataType::getFileNameForStream(column.name, substream_path) + ".bin";
checksums_data.files[file_name] = checksum_compressed_file(path + file_name);
checksums_data.files[file_name] = checksum_compressed_file(disk, path + file_name);
}, {});
}
}
@ -99,14 +100,14 @@ IMergeTreeDataPart::Checksums checkDataPart(
}
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator dir_it(path); dir_it != dir_end; ++dir_it)
for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
{
const String & file_name = dir_it.name();
const String & file_name = it->name();
auto checksum_it = checksums_data.files.find(file_name);
if (checksum_it == checksums_data.files.end() && file_name != "checksums.txt" && file_name != "columns.txt")
{
ReadBufferFromFile file_buf(dir_it->path());
HashingReadBuffer hashing_buf(file_buf);
auto file_buf = disk->readFile(it->path());
HashingReadBuffer hashing_buf(*file_buf);
hashing_buf.tryIgnore(std::numeric_limits<size_t>::max());
checksums_data.files[file_name] = IMergeTreeDataPart::Checksums::Checksum(hashing_buf.count(), hashing_buf.getHash());
}
@ -115,11 +116,11 @@ IMergeTreeDataPart::Checksums checkDataPart(
/// Checksums from file checksums.txt. May be absent. If present, they are subsequently compared with the actual data checksums.
IMergeTreeDataPart::Checksums checksums_txt;
if (require_checksums || Poco::File(path + "checksums.txt").exists())
if (require_checksums || disk->exists(path + "checksums.txt"))
{
ReadBufferFromFile buf(path + "checksums.txt");
checksums_txt.read(buf);
assertEOF(buf);
auto buf = disk->readFile(path + "checksums.txt");
checksums_txt.read(*buf);
assertEOF(*buf);
}
if (is_cancelled())
@ -137,7 +138,8 @@ IMergeTreeDataPart::Checksums checkDataPart(
std::function<bool()> is_cancelled)
{
return checkDataPart(
data_part->getFullPath(),
data_part->disk,
data_part->getFullRelativePath(),
data_part->getColumns(),
data_part->getType(),
require_checksums,
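The checksum_compressed_file lambda above stacks hashing buffers around the decompressor so a single pass yields both the raw and the decompressed hashes; the hunk cuts off before the drain-and-return step, which presumably looks like this (tryIgnore as used for the uncompressed files later in the same function):

auto file_buf = disk->readFile(file_path);
HashingReadBuffer compressed_hashing_buf(*file_buf);            /// hashes the raw bytes
CompressedReadBuffer uncompressing_buf(compressed_hashing_buf);
HashingReadBuffer uncompressed_hashing_buf(uncompressing_buf);  /// hashes decompressed bytes
uncompressed_hashing_buf.tryIgnore(std::numeric_limits<size_t>::max());  /// pull everything through
return IMergeTreeDataPart::Checksums::Checksum(compressed_hashing_buf.count(), compressed_hashing_buf.getHash());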

View File

@ -13,6 +13,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
std::function<bool()> is_cancelled = []{ return false; });
IMergeTreeDataPart::Checksums checkDataPart(
const DiskPtr & disk,
const String & full_path,
const NamesAndTypesList & columns_list,
const MergeTreeDataPartType & part_type,

View File

@ -95,6 +95,7 @@ void StorageMergeTree::startup()
/// NOTE background task will also do the above cleanups periodically.
time_after_previous_cleanup.restart();
if (!getSettings()->disable_background_merges)
merging_mutating_task_handle = global_context.getBackgroundPool().addTask([this] { return mergeMutateTask(); });
if (areBackgroundMovesNeeded())
moving_task_handle = global_context.getBackgroundMovePool().addTask([this] { return movePartsTask(); });
@ -417,7 +418,7 @@ void StorageMergeTree::mutate(const MutationCommands & commands, const Context &
/// Choose any disk, because when we load mutations we search them at each disk
/// where storage can be placed. See loadMutations().
auto disk = getStoragePolicy()->getAnyDisk();
MergeTreeMutationEntry entry(commands, getFullPathOnDisk(disk), insert_increment.get());
MergeTreeMutationEntry entry(commands, disk, relative_data_path, insert_increment.get());
String file_name;
Int64 version;
{
@ -558,22 +559,20 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
void StorageMergeTree::loadMutations()
{
Poco::DirectoryIterator end;
const auto full_paths = getDataPaths();
for (const String & full_path : full_paths)
for (const auto & [path, disk] : getRelativeDataPathsWithDisks())
{
for (auto it = Poco::DirectoryIterator(full_path); it != end; ++it)
for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
{
if (startsWith(it.name(), "mutation_"))
if (startsWith(it->name(), "mutation_"))
{
MergeTreeMutationEntry entry(full_path, it.name());
MergeTreeMutationEntry entry(disk, path, it->name());
Int64 block_number = entry.block_number;
auto insertion = current_mutations_by_id.emplace(it.name(), std::move(entry));
auto insertion = current_mutations_by_id.emplace(it->name(), std::move(entry));
current_mutations_by_version.emplace(block_number, insertion.first->second);
}
else if (startsWith(it.name(), "tmp_mutation_"))
else if (startsWith(it->name(), "tmp_mutation_"))
{
it->remove();
disk->remove(it->path());
}
}
}
@ -1326,26 +1325,26 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, const Context & c
for (auto & part : data_parts)
{
String full_part_path = part->getFullPath();
auto disk = part->disk;
String part_path = part->getFullRelativePath();
/// If the checksums file is not present, calculate the checksums and write them to disk.
String checksums_path = full_part_path + "checksums.txt";
String tmp_checksums_path = full_part_path + "checksums.txt.tmp";
if (!Poco::File(checksums_path).exists())
String checksums_path = part_path + "checksums.txt";
String tmp_checksums_path = part_path + "checksums.txt.tmp";
if (!disk->exists(checksums_path))
{
try
{
auto calculated_checksums = checkDataPart(part, false);
calculated_checksums.checkEqual(part->checksums, true);
WriteBufferFromFile out(tmp_checksums_path, 4096);
part->checksums.write(out);
Poco::File(tmp_checksums_path).renameTo(checksums_path);
auto out = disk->writeFile(tmp_checksums_path, 4096);
part->checksums.write(*out);
disk->moveFile(tmp_checksums_path, checksums_path);
results.emplace_back(part->name, true, "Checksums recounted and written to disk.");
}
catch (const Exception & ex)
{
Poco::File tmp_file(tmp_checksums_path);
if (tmp_file.exists())
tmp_file.remove();
if (disk->exists(tmp_checksums_path))
disk->remove(tmp_checksums_path);
results.emplace_back(part->name, false,
"Check of part finished with error: '" + ex.message() + "'");

View File

@ -0,0 +1,12 @@
<yandex>
<shutdown_wait_unfinished>3</shutdown_wait_unfinished>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

View File

@ -0,0 +1,40 @@
<?xml version="1.0"?>
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
</logger>
<storage_configuration>
<disks>
<default>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</default>
</disks>
</storage_configuration>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<openSSL>
<client>
<cacheSessions>true</cacheSessions>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
<max_concurrent_queries>500</max_concurrent_queries>
<mark_cache_size>5368709120</mark_cache_size>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
</yandex>

View File

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<quotas>
<default>
</default>
</quotas>
</yandex>

View File

@ -0,0 +1,91 @@
import logging
import random
import string
import pytest
from helpers.cluster import ClickHouseCluster
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
# Recreates the S3 bucket used by the tests, dropping a stale one from previous runs if present.
def prepare_s3_bucket(cluster):
minio_client = cluster.minio_client
if minio_client.bucket_exists(cluster.minio_bucket):
minio_client.remove_bucket(cluster.minio_bucket)
minio_client.make_bucket(cluster.minio_bucket)
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("node", config_dir="configs", with_minio=True)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
prepare_s3_bucket(cluster)
logging.info("S3 bucket created")
yield cluster
finally:
cluster.shutdown()
def random_string(length):
letters = string.ascii_letters
return ''.join(random.choice(letters) for i in range(length))
def generate_values(date_str, count):
data = [[date_str, i, random_string(10)] for i in range(count)]
data.sort(key=lambda tup: tup[1])
return ",".join(["('{}',{},'{}')".format(x, y, z) for x, y, z in data])
@pytest.mark.parametrize(
"min_rows_for_wide_part,files_overhead,files_per_part",
[
(0, 1, 14),
(8192, 1, 10)
]
)
def test_merge_tree_s3(cluster, min_rows_for_wide_part, files_overhead, files_per_part):
node = cluster.instances["node"]
minio = cluster.minio_client
node.query(
"""
CREATE TABLE s3_test(
dt Date,
id UInt64,
data String,
INDEX min_max (id) TYPE minmax GRANULARITY 3
) ENGINE=MergeTree()
PARTITION BY dt
ORDER BY (dt, id)
SETTINGS disable_background_merges='true', index_granularity=512, min_rows_for_wide_part={}
"""
.format(min_rows_for_wide_part)
)
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 1
values1 = generate_values('2020-01-03', 4096)
node.query("INSERT INTO s3_test VALUES {}".format(values1))
assert node.query("SELECT * FROM s3_test order by dt, id FORMAT Values") == values1
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == files_overhead + files_per_part
values2 = generate_values('2020-01-04', 4096)
node.query("INSERT INTO s3_test VALUES {}".format(values2))
assert node.query("SELECT * FROM s3_test ORDER BY dt, id FORMAT Values") == values1 + "," + values2
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == files_overhead + 2 * files_per_part
assert node.query("SELECT count(*) FROM s3_test where id = 0 FORMAT Values") == "(2)"
node.query("DROP TABLE s3_test")
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 0
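For orientation on the asserted counts: files_overhead = 1 is presumably the table-level format_version.txt, and files_per_part matches the part layout. A wide part here would hold dt.bin/dt.mrk2, id.bin/id.mrk2, data.bin/data.mrk2, the skip-index pair skp_idx_min_max.idx/.mrk2, plus primary.idx, minmax_dt.idx, partition.dat, count.txt, columns.txt and checksums.txt (14 files), while a compact part collapses the three column pairs into a single data.bin/data.mrk3 (10 files).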

View File

@ -14,6 +14,7 @@
#include <Poco/File.h>
#include <iostream>
#include <Disks/DiskLocal.h>
namespace DB
{
@ -27,6 +28,7 @@ namespace ErrorCodes
void run(String part_path, String date_column, String dest_path)
{
std::shared_ptr<IDisk> disk = std::make_shared<DiskLocal>("local", "/", 0);
auto old_part_path = Poco::Path::forDirectory(part_path);
String old_part_name = old_part_path.directory(old_part_path.depth() - 1);
String old_part_path_str = old_part_path.toString();
@ -83,12 +85,12 @@ void run(String part_path, String date_column, String dest_path)
IMergeTreeDataPart::MinMaxIndex minmax_idx(min_date, max_date);
Names minmax_idx_columns = {date_column};
DataTypes minmax_idx_column_types = {std::make_shared<DataTypeDate>()};
minmax_idx.store(minmax_idx_columns, minmax_idx_column_types, new_tmp_part_path_str, checksums);
minmax_idx.store(minmax_idx_columns, minmax_idx_column_types, disk, new_tmp_part_path_str, checksums);
Block partition_key_sample{{nullptr, std::make_shared<DataTypeUInt32>(), makeASTFunction("toYYYYMM", std::make_shared<ASTIdentifier>(date_column))->getColumnName()}};
MergeTreePartition partition(yyyymm);
partition.store(partition_key_sample, new_tmp_part_path_str, checksums);
partition.store(partition_key_sample, disk, new_tmp_part_path_str, checksums);
String partition_id = partition.getID(partition_key_sample);
Poco::File(new_tmp_part_path_str + "checksums.txt").setWriteable();