mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-12 09:22:05 +00:00
Fix tests, less manual concatination of paths
This commit is contained in:
parent
2b67341f28
commit
9ec92ec514
@ -69,13 +69,7 @@ public:
|
||||
|
||||
bool isValid() const override { return entry != fs::directory_iterator(); }
|
||||
|
||||
String path() const override
|
||||
{
|
||||
if (entry->is_directory())
|
||||
return dir_path / entry->path().filename() / "";
|
||||
else
|
||||
return dir_path / entry->path().filename();
|
||||
}
|
||||
String path() const override { return dir_path / entry->path().filename(); }
|
||||
|
||||
String name() const override { return entry->path().filename(); }
|
||||
|
||||
@ -119,9 +113,9 @@ UInt64 DiskLocal::getTotalSpace() const
|
||||
{
|
||||
struct statvfs fs;
|
||||
if (name == "default") /// for default disk we get space from path/data/
|
||||
fs = getStatVFS(fs::path(disk_path / "data/").string());
|
||||
fs = getStatVFS((fs::path(disk_path) / "data/").string());
|
||||
else
|
||||
fs = getStatVFS(disk_path.string());
|
||||
fs = getStatVFS(disk_path);
|
||||
UInt64 total_size = fs.f_blocks * fs.f_bsize;
|
||||
if (total_size < keep_free_space_bytes)
|
||||
return 0;
|
||||
@ -134,9 +128,9 @@ UInt64 DiskLocal::getAvailableSpace() const
|
||||
/// available for superuser only and for system purposes
|
||||
struct statvfs fs;
|
||||
if (name == "default") /// for default disk we get space from path/data/
|
||||
fs = getStatVFS(fs::path(disk_path / "data/").string());
|
||||
fs = getStatVFS((fs::path(disk_path) / "data/").string());
|
||||
else
|
||||
fs = getStatVFS(disk_path.string());
|
||||
fs = getStatVFS(disk_path);
|
||||
UInt64 total_size = fs.f_bavail * fs.f_bsize;
|
||||
if (total_size < keep_free_space_bytes)
|
||||
return 0;
|
||||
@ -153,64 +147,64 @@ UInt64 DiskLocal::getUnreservedSpace() const
|
||||
|
||||
bool DiskLocal::exists(const String & path) const
|
||||
{
|
||||
return fs::exists(disk_path / path);
|
||||
return fs::exists(fs::path(disk_path) / path);
|
||||
}
|
||||
|
||||
bool DiskLocal::isFile(const String & path) const
|
||||
{
|
||||
return fs::is_regular_file(disk_path / path);
|
||||
return fs::is_regular_file(fs::path(disk_path) / path);
|
||||
}
|
||||
|
||||
bool DiskLocal::isDirectory(const String & path) const
|
||||
{
|
||||
return fs::is_directory(disk_path / path);
|
||||
return fs::is_directory(fs::path(disk_path) / path);
|
||||
}
|
||||
|
||||
size_t DiskLocal::getFileSize(const String & path) const
|
||||
{
|
||||
return fs::file_size(disk_path / path);
|
||||
return fs::file_size(fs::path(disk_path) / path);
|
||||
}
|
||||
|
||||
void DiskLocal::createDirectory(const String & path)
|
||||
{
|
||||
fs::create_directory(disk_path / path);
|
||||
fs::create_directory(fs::path(disk_path) / path);
|
||||
}
|
||||
|
||||
void DiskLocal::createDirectories(const String & path)
|
||||
{
|
||||
fs::create_directories(disk_path / path);
|
||||
fs::create_directories(fs::path(disk_path) / path);
|
||||
}
|
||||
|
||||
void DiskLocal::clearDirectory(const String & path)
|
||||
{
|
||||
for (const auto & entry : fs::directory_iterator(disk_path / path))
|
||||
for (const auto & entry : fs::directory_iterator(fs::path(disk_path) / path))
|
||||
fs::remove(entry.path());
|
||||
}
|
||||
|
||||
void DiskLocal::moveDirectory(const String & from_path, const String & to_path)
|
||||
{
|
||||
fs::rename(disk_path / from_path, disk_path / to_path);
|
||||
fs::rename(fs::path(disk_path) / from_path, fs::path(disk_path) / to_path);
|
||||
}
|
||||
|
||||
DiskDirectoryIteratorPtr DiskLocal::iterateDirectory(const String & path)
|
||||
{
|
||||
return std::make_unique<DiskLocalDirectoryIterator>(disk_path, path);
|
||||
return std::make_unique<DiskLocalDirectoryIterator>(fs::path(disk_path), path);
|
||||
}
|
||||
|
||||
void DiskLocal::moveFile(const String & from_path, const String & to_path)
|
||||
{
|
||||
fs::rename(disk_path / from_path, disk_path / to_path);
|
||||
fs::rename(fs::path(disk_path) / from_path, fs::path(disk_path) / to_path);
|
||||
}
|
||||
|
||||
void DiskLocal::replaceFile(const String & from_path, const String & to_path)
|
||||
{
|
||||
fs::path from_file = disk_path / from_path;
|
||||
fs::path to_file = disk_path / to_path;
|
||||
fs::path from_file = fs::path(disk_path) / from_path;
|
||||
fs::path to_file = fs::path(disk_path) / to_path;
|
||||
if (fs::exists(to_file))
|
||||
{
|
||||
fs::path tmp_file(to_file.string() + ".old");
|
||||
fs::rename(to_file, tmp_file);
|
||||
fs::rename(from_file, disk_path / to_path);
|
||||
fs::rename(from_file, fs::path(disk_path) / to_path);
|
||||
fs::remove(tmp_file);
|
||||
}
|
||||
else
|
||||
@ -223,33 +217,33 @@ std::unique_ptr<ReadBufferFromFileBase>
|
||||
DiskLocal::readFile(
|
||||
const String & path, size_t buf_size, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold, MMappedFileCache * mmap_cache) const
|
||||
{
|
||||
return createReadBufferFromFileBase(disk_path / path, estimated_size, aio_threshold, mmap_threshold, mmap_cache, buf_size);
|
||||
return createReadBufferFromFileBase(fs::path(disk_path) / path, estimated_size, aio_threshold, mmap_threshold, mmap_cache, buf_size);
|
||||
}
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase>
|
||||
DiskLocal::writeFile(const String & path, size_t buf_size, WriteMode mode)
|
||||
{
|
||||
int flags = (mode == WriteMode::Append) ? (O_APPEND | O_CREAT | O_WRONLY) : -1;
|
||||
return std::make_unique<WriteBufferFromFile>(disk_path / path, buf_size, flags);
|
||||
return std::make_unique<WriteBufferFromFile>(fs::path(disk_path) / path, buf_size, flags);
|
||||
}
|
||||
|
||||
void DiskLocal::removeFile(const String & path)
|
||||
{
|
||||
auto fs_path = disk_path / path;
|
||||
auto fs_path = fs::path(disk_path) / path;
|
||||
if (0 != unlink(fs_path.c_str()))
|
||||
throwFromErrnoWithPath("Cannot unlink file " + fs_path.string(), fs_path, ErrorCodes::CANNOT_UNLINK);
|
||||
}
|
||||
|
||||
void DiskLocal::removeFileIfExists(const String & path)
|
||||
{
|
||||
auto fs_path = disk_path / path;
|
||||
auto fs_path = fs::path(disk_path) / path;
|
||||
if (0 != unlink(fs_path.c_str()) && errno != ENOENT)
|
||||
throwFromErrnoWithPath("Cannot unlink file " + fs_path.string(), fs_path, ErrorCodes::CANNOT_UNLINK);
|
||||
}
|
||||
|
||||
void DiskLocal::removeDirectory(const String & path)
|
||||
{
|
||||
auto fs_path = disk_path / path;
|
||||
auto fs_path = fs::path(disk_path) / path;
|
||||
if (0 != rmdir(fs_path.c_str()))
|
||||
throwFromErrnoWithPath("Cannot rmdir " + fs_path.string(), fs_path, ErrorCodes::CANNOT_RMDIR);
|
||||
}
|
||||
@ -262,42 +256,42 @@ void DiskLocal::removeRecursive(const String & path)
|
||||
void DiskLocal::listFiles(const String & path, std::vector<String> & file_names)
|
||||
{
|
||||
file_names.clear();
|
||||
for (const auto & entry : fs::directory_iterator(disk_path / path))
|
||||
for (const auto & entry : fs::directory_iterator(fs::path(disk_path) / path))
|
||||
file_names.emplace_back(entry.path().filename());
|
||||
}
|
||||
|
||||
void DiskLocal::setLastModified(const String & path, const Poco::Timestamp & timestamp)
|
||||
{
|
||||
fs::last_write_time(disk_path / path, static_cast<fs::file_time_type>(std::chrono::microseconds(timestamp.epochMicroseconds())));
|
||||
fs::last_write_time(fs::path(disk_path) / path, static_cast<fs::file_time_type>(std::chrono::microseconds(timestamp.epochMicroseconds())));
|
||||
}
|
||||
|
||||
Poco::Timestamp DiskLocal::getLastModified(const String & path)
|
||||
{
|
||||
fs::file_time_type fs_time = fs::last_write_time(disk_path / path);
|
||||
fs::file_time_type fs_time = fs::last_write_time(fs::path(disk_path) / path);
|
||||
auto micro_sec = std::chrono::duration_cast<std::chrono::microseconds>(fs_time.time_since_epoch());
|
||||
return Poco::Timestamp(micro_sec.count());
|
||||
}
|
||||
|
||||
void DiskLocal::createHardLink(const String & src_path, const String & dst_path)
|
||||
{
|
||||
DB::createHardLink(disk_path / src_path, disk_path / dst_path);
|
||||
DB::createHardLink(fs::path(disk_path) / src_path, fs::path(disk_path) / dst_path);
|
||||
}
|
||||
|
||||
void DiskLocal::truncateFile(const String & path, size_t size)
|
||||
{
|
||||
int res = truncate((disk_path / path).string().data(), size);
|
||||
int res = truncate((fs::path(disk_path) / path).string().data(), size);
|
||||
if (-1 == res)
|
||||
throwFromErrnoWithPath("Cannot truncate file " + path, path, ErrorCodes::CANNOT_TRUNCATE_FILE);
|
||||
}
|
||||
|
||||
void DiskLocal::createFile(const String & path)
|
||||
{
|
||||
Poco::File(disk_path / path).createFile();
|
||||
Poco::File(fs::path(disk_path) / path).createFile();
|
||||
}
|
||||
|
||||
void DiskLocal::setReadOnly(const String & path)
|
||||
{
|
||||
Poco::File(disk_path / path).setReadOnly(true);
|
||||
Poco::File(fs::path(disk_path) / path).setReadOnly(true);
|
||||
}
|
||||
|
||||
bool inline isSameDiskType(const IDisk & one, const IDisk & another)
|
||||
@ -308,14 +302,14 @@ bool inline isSameDiskType(const IDisk & one, const IDisk & another)
|
||||
void DiskLocal::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
|
||||
{
|
||||
if (isSameDiskType(*this, *to_disk))
|
||||
Poco::File(disk_path / from_path).copyTo(to_disk->getPath() + to_path); /// Use more optimal way.
|
||||
Poco::File(fs::path(disk_path) / from_path).copyTo(fs::path(to_disk->getPath()) / to_path); /// Use more optimal way.
|
||||
else
|
||||
IDisk::copy(from_path, to_disk, to_path); /// Copy files through buffers.
|
||||
}
|
||||
|
||||
SyncGuardPtr DiskLocal::getDirectorySyncGuard(const String & path) const
|
||||
{
|
||||
return std::make_unique<LocalDirectorySyncGuard>(disk_path / path);
|
||||
return std::make_unique<LocalDirectorySyncGuard>(fs::path(disk_path) / path);
|
||||
}
|
||||
|
||||
DiskPtr DiskLocalReservation::getDisk(size_t i) const
|
||||
|
@ -22,15 +22,15 @@ public:
|
||||
friend class DiskLocalReservation;
|
||||
|
||||
DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_)
|
||||
: name(name_), disk_path(path_), disk_path_str(path_), keep_free_space_bytes(keep_free_space_bytes_)
|
||||
: name(name_), disk_path(path_), keep_free_space_bytes(keep_free_space_bytes_)
|
||||
{
|
||||
if (disk_path_str.back() != '/')
|
||||
throw Exception("Disk path must ends with '/', but '" + disk_path_str + "' doesn't.", ErrorCodes::LOGICAL_ERROR);
|
||||
if (disk_path.back() != '/')
|
||||
throw Exception("Disk path must end with '/', but '" + disk_path + "' doesn't.", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
const String & getName() const override { return name; }
|
||||
|
||||
const String & getPath() const override { return disk_path_str; }
|
||||
const String & getPath() const override { return disk_path; }
|
||||
|
||||
ReservationPtr reserve(UInt64 bytes) override;
|
||||
|
||||
@ -107,8 +107,7 @@ private:
|
||||
|
||||
private:
|
||||
const String name;
|
||||
const fs::path disk_path;
|
||||
const String disk_path_str;
|
||||
const String disk_path;
|
||||
const UInt64 keep_free_space_bytes;
|
||||
|
||||
UInt64 reserved_bytes = 0;
|
||||
|
@ -22,7 +22,8 @@ bool IDisk::isDirectoryEmpty(const String & path)
|
||||
|
||||
void copyFile(IDisk & from_disk, const String & from_path, IDisk & to_disk, const String & to_path)
|
||||
{
|
||||
LOG_DEBUG(&Poco::Logger::get("IDisk"), "Copying from {} {} to {} {}.", from_disk.getName(), from_path, to_disk.getName(), to_path);
|
||||
LOG_DEBUG(&Poco::Logger::get("IDisk"), "Copying from {} (path: {}) {} to {} (path: {}) {}.",
|
||||
from_disk.getName(), from_disk.getPath(), from_path, to_disk.getName(), to_disk.getPath(), to_path);
|
||||
|
||||
auto in = from_disk.readFile(from_path);
|
||||
auto out = to_disk.writeFile(to_path);
|
||||
@ -41,16 +42,16 @@ void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_p
|
||||
[&from_disk, from_path, &to_disk, to_path]()
|
||||
{
|
||||
setThreadName("DiskCopier");
|
||||
DB::copyFile(from_disk, from_path, to_disk, to_path + fileName(from_path));
|
||||
DB::copyFile(from_disk, from_path, to_disk, fs::path(to_path) / fileName(from_path));
|
||||
});
|
||||
|
||||
results.push_back(std::move(result));
|
||||
}
|
||||
else
|
||||
{
|
||||
const String & dir_name = directoryPath(from_path);
|
||||
fs::path dir_name = fs::path(from_path).parent_path().filename();
|
||||
fs::path dest(fs::path(to_path) / dir_name);
|
||||
fs::create_directories(dest);
|
||||
to_disk.createDirectories(dest);
|
||||
|
||||
for (auto it = from_disk.iterateDirectory(from_path); it->isValid(); it->next())
|
||||
asyncCopy(from_disk, it->path(), to_disk, dest, exec, results);
|
||||
|
@ -213,10 +213,10 @@ public:
|
||||
virtual DiskType::Type getType() const = 0;
|
||||
|
||||
/// Invoked when Global Context is shutdown.
|
||||
virtual void shutdown() { }
|
||||
virtual void shutdown() {}
|
||||
|
||||
/// Performs action on disk startup.
|
||||
virtual void startup() { }
|
||||
virtual void startup() {}
|
||||
|
||||
/// Return some uniq string for file, overrode for S3
|
||||
/// Required for distinguish different copies of the same part on S3
|
||||
@ -234,7 +234,7 @@ public:
|
||||
virtual SyncGuardPtr getDirectorySyncGuard(const String & path) const;
|
||||
|
||||
/// Applies new settings for disk in runtime.
|
||||
virtual void applyNewSettings(const Poco::Util::AbstractConfiguration &, ContextConstPtr) { }
|
||||
virtual void applyNewSettings(const Poco::Util::AbstractConfiguration &, ContextConstPtr) {}
|
||||
|
||||
protected:
|
||||
friend class DiskDecorator;
|
||||
@ -295,7 +295,7 @@ public:
|
||||
/// Return full path to a file on disk.
|
||||
inline String fullPath(const DiskPtr & disk, const String & path)
|
||||
{
|
||||
return disk->getPath() + path;
|
||||
return fs::path(disk->getPath()) / path;
|
||||
}
|
||||
|
||||
/// Return parent path for the specified path.
|
||||
@ -313,7 +313,7 @@ inline String fileName(const String & path)
|
||||
/// Return directory path for the specified path.
|
||||
inline String directoryPath(const String & path)
|
||||
{
|
||||
return fs::is_directory(path) ? path : fs::path(path).parent_path().string();
|
||||
return Poco::Path(path).setFileName("").toString();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -117,12 +117,12 @@ struct DiskS3::Metadata
|
||||
using PathAndSize = std::pair<String, size_t>;
|
||||
|
||||
/// S3 root path.
|
||||
const String & s3_root_path;
|
||||
fs::path s3_root_path;
|
||||
|
||||
/// Disk path.
|
||||
const String & disk_path;
|
||||
fs::path disk_path;
|
||||
/// Relative path to metadata file on local FS.
|
||||
String metadata_file_path;
|
||||
fs::path metadata_file_path;
|
||||
/// Total size of all S3 objects.
|
||||
size_t total_size;
|
||||
/// S3 objects paths and their sizes.
|
||||
@ -141,14 +141,14 @@ struct DiskS3::Metadata
|
||||
|
||||
try
|
||||
{
|
||||
ReadBufferFromFile buf(disk_path + metadata_file_path, 1024); /* reasonable buffer size for small file */
|
||||
ReadBufferFromFile buf(disk_path / metadata_file_path, 1024); /* reasonable buffer size for small file */
|
||||
|
||||
UInt32 version;
|
||||
readIntText(version, buf);
|
||||
|
||||
if (version < VERSION_ABSOLUTE_PATHS || version > VERSION_READ_ONLY_FLAG)
|
||||
throw Exception(
|
||||
"Unknown metadata file version. Path: " + disk_path + metadata_file_path
|
||||
"Unknown metadata file version. Path: " + (disk_path / metadata_file_path).string()
|
||||
+ " Version: " + std::to_string(version) + ", Maximum expected version: " + std::to_string(VERSION_READ_ONLY_FLAG),
|
||||
ErrorCodes::UNKNOWN_FORMAT);
|
||||
|
||||
@ -169,12 +169,12 @@ struct DiskS3::Metadata
|
||||
readEscapedString(s3_object_path, buf);
|
||||
if (version == VERSION_ABSOLUTE_PATHS)
|
||||
{
|
||||
if (!boost::algorithm::starts_with(s3_object_path, s3_root_path))
|
||||
if (!boost::algorithm::starts_with(s3_object_path, s3_root_path.string()))
|
||||
throw Exception(
|
||||
"Path in metadata does not correspond S3 root path. Path: " + s3_object_path
|
||||
+ ", root path: " + s3_root_path + ", disk path: " + disk_path_,
|
||||
+ ", root path: " + s3_root_path.string() + ", disk path: " + disk_path_,
|
||||
ErrorCodes::UNKNOWN_FORMAT);
|
||||
s3_object_path = s3_object_path.substr(s3_root_path.size());
|
||||
s3_object_path = s3_object_path.substr(s3_root_path.string().size());
|
||||
}
|
||||
assertChar('\n', buf);
|
||||
s3_objects[i] = {s3_object_path, s3_object_size};
|
||||
@ -207,7 +207,7 @@ struct DiskS3::Metadata
|
||||
/// Fsync metadata file if 'sync' flag is set.
|
||||
void save(bool sync = false)
|
||||
{
|
||||
WriteBufferFromFile buf(disk_path + metadata_file_path, 1024);
|
||||
WriteBufferFromFile buf(disk_path / metadata_file_path, 1024);
|
||||
|
||||
writeIntText(VERSION_RELATIVE_PATHS, buf);
|
||||
writeChar('\n', buf);
|
||||
@ -338,7 +338,7 @@ private:
|
||||
const auto & [path, size] = metadata.s3_objects[i];
|
||||
if (size > offset)
|
||||
{
|
||||
auto buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path + path, s3_max_single_read_retries, buf_size);
|
||||
auto buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path / path, s3_max_single_read_retries, buf_size);
|
||||
buf->seek(offset, SEEK_SET);
|
||||
return buf;
|
||||
}
|
||||
@ -367,7 +367,7 @@ private:
|
||||
|
||||
++current_buf_idx;
|
||||
const auto & path = metadata.s3_objects[current_buf_idx].first;
|
||||
current_buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path + path, s3_max_single_read_retries, buf_size);
|
||||
current_buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path / path, s3_max_single_read_retries, buf_size);
|
||||
current_buf->next();
|
||||
working_buffer = current_buf->buffer();
|
||||
absolute_position += working_buffer.size();
|
||||
@ -447,16 +447,16 @@ public:
|
||||
String path() const override
|
||||
{
|
||||
if (fs::is_directory(iter->path()))
|
||||
return folder_path + iter->path().filename().string() + '/';
|
||||
return folder_path / iter->path().filename().string() / "";
|
||||
else
|
||||
return folder_path + iter->path().filename().string();
|
||||
return folder_path / iter->path().filename().string();
|
||||
}
|
||||
|
||||
String name() const override { return iter->path().filename(); }
|
||||
|
||||
private:
|
||||
fs::directory_iterator iter;
|
||||
String folder_path;
|
||||
fs::path folder_path;
|
||||
};
|
||||
|
||||
|
||||
@ -623,13 +623,13 @@ String DiskS3::getUniqueId(const String & path) const
|
||||
Metadata metadata(s3_root_path, metadata_path, path);
|
||||
String id;
|
||||
if (!metadata.s3_objects.empty())
|
||||
id = metadata.s3_root_path + metadata.s3_objects[0].first;
|
||||
id = metadata.s3_root_path / metadata.s3_objects[0].first;
|
||||
return id;
|
||||
}
|
||||
|
||||
DiskDirectoryIteratorPtr DiskS3::iterateDirectory(const String & path)
|
||||
{
|
||||
return std::make_unique<DiskS3DirectoryIterator>(metadata_path + path, path);
|
||||
return std::make_unique<DiskS3DirectoryIterator>(fs::path(metadata_path) / path, path);
|
||||
}
|
||||
|
||||
void DiskS3::clearDirectory(const String & path)
|
||||
@ -683,7 +683,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, si
|
||||
auto metadata = readMeta(path);
|
||||
|
||||
LOG_DEBUG(log, "Read from file by path: {}. Existing S3 objects: {}",
|
||||
backQuote(metadata_path + path), metadata.s3_objects.size());
|
||||
backQuote((fs::path(metadata_path) / path).string()), metadata.s3_objects.size());
|
||||
|
||||
auto reader = std::make_unique<ReadIndirectBufferFromS3>(settings->client, bucket, metadata, settings->s3_max_single_read_retries, buf_size);
|
||||
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), settings->min_bytes_for_seek);
|
||||
@ -708,12 +708,12 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, "{} to file by path: {}. S3 path: {}",
|
||||
mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_path + path), s3_root_path + s3_path);
|
||||
mode == WriteMode::Rewrite ? "Write" : "Append", backQuote((fs::path(metadata_path) / path).string()), (fs::path(s3_root_path) / s3_path).string());
|
||||
|
||||
auto s3_buffer = std::make_unique<WriteBufferFromS3>(
|
||||
settings->client,
|
||||
bucket,
|
||||
metadata.s3_root_path + s3_path,
|
||||
fs::path(metadata.s3_root_path) / s3_path,
|
||||
settings->s3_min_upload_part_size,
|
||||
settings->s3_max_single_part_upload_size,
|
||||
std::move(object_metadata),
|
||||
@ -724,7 +724,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
|
||||
|
||||
void DiskS3::removeMeta(const String & path, AwsS3KeyKeeper & keys)
|
||||
{
|
||||
LOG_DEBUG(log, "Remove file by path: {}", backQuote(metadata_path + path));
|
||||
LOG_DEBUG(log, "Remove file by path: {}", backQuote((fs::path(metadata_path) / path).string()));
|
||||
|
||||
fs::path file = fs::path(metadata_path) / path;
|
||||
|
||||
@ -741,7 +741,7 @@ void DiskS3::removeMeta(const String & path, AwsS3KeyKeeper & keys)
|
||||
fs::remove(file);
|
||||
|
||||
for (const auto & [s3_object_path, _] : metadata.s3_objects)
|
||||
keys.addKey(s3_root_path + s3_object_path);
|
||||
keys.addKey(fs::path(s3_root_path) / s3_object_path);
|
||||
}
|
||||
else /// In other case decrement number of references, save metadata and delete file.
|
||||
{
|
||||
@ -904,7 +904,7 @@ void DiskS3::createHardLink(const String & src_path, const String & dst_path, bo
|
||||
src.save();
|
||||
|
||||
/// Create FS hardlink to metadata file.
|
||||
DB::createHardLink(metadata_path + src_path, metadata_path + dst_path);
|
||||
DB::createHardLink(fs::path(metadata_path) / src_path, fs::path(metadata_path) / dst_path);
|
||||
}
|
||||
|
||||
void DiskS3::createFile(const String & path)
|
||||
@ -940,7 +940,7 @@ void DiskS3::createFileOperationObject(const String & operation_name, UInt64 rev
|
||||
WriteBufferFromS3 buffer(
|
||||
settings->client,
|
||||
bucket,
|
||||
s3_root_path + key,
|
||||
fs::path(s3_root_path) / key,
|
||||
settings->s3_min_upload_part_size,
|
||||
settings->s3_max_single_part_upload_size,
|
||||
metadata);
|
||||
@ -993,14 +993,14 @@ void DiskS3::findLastRevision()
|
||||
int DiskS3::readSchemaVersion(const String & source_bucket, const String & source_path)
|
||||
{
|
||||
int version = 0;
|
||||
if (!checkObjectExists(source_bucket, source_path + SCHEMA_VERSION_OBJECT))
|
||||
if (!checkObjectExists(source_bucket, fs::path(source_path) / SCHEMA_VERSION_OBJECT))
|
||||
return version;
|
||||
|
||||
auto settings = current_settings.get();
|
||||
ReadBufferFromS3 buffer(
|
||||
settings->client,
|
||||
source_bucket,
|
||||
source_path + SCHEMA_VERSION_OBJECT,
|
||||
fs::path(source_path) / SCHEMA_VERSION_OBJECT,
|
||||
settings->s3_max_single_read_retries);
|
||||
|
||||
readIntText(version, buffer);
|
||||
@ -1015,7 +1015,7 @@ void DiskS3::saveSchemaVersion(const int & version)
|
||||
WriteBufferFromS3 buffer(
|
||||
settings->client,
|
||||
bucket,
|
||||
s3_root_path + SCHEMA_VERSION_OBJECT,
|
||||
fs::path(s3_root_path) / SCHEMA_VERSION_OBJECT,
|
||||
settings->s3_min_upload_part_size,
|
||||
settings->s3_max_single_part_upload_size);
|
||||
|
||||
@ -1027,7 +1027,7 @@ void DiskS3::updateObjectMetadata(const String & key, const ObjectMetadata & met
|
||||
{
|
||||
auto settings = current_settings.get();
|
||||
Aws::S3::Model::CopyObjectRequest request;
|
||||
request.SetCopySource(bucket + "/" + key);
|
||||
request.SetCopySource(fs::path(bucket) / key);
|
||||
request.SetBucket(bucket);
|
||||
request.SetKey(key);
|
||||
request.SetMetadata(metadata);
|
||||
@ -1039,7 +1039,7 @@ void DiskS3::updateObjectMetadata(const String & key, const ObjectMetadata & met
|
||||
|
||||
void DiskS3::migrateFileToRestorableSchema(const String & path)
|
||||
{
|
||||
LOG_DEBUG(log, "Migrate file {} to restorable schema", metadata_path + path);
|
||||
LOG_DEBUG(log, "Migrate file {} to restorable schema", (fs::path(metadata_path) / path).string());
|
||||
|
||||
auto meta = readMeta(path);
|
||||
|
||||
@ -1048,7 +1048,7 @@ void DiskS3::migrateFileToRestorableSchema(const String & path)
|
||||
ObjectMetadata metadata {
|
||||
{"path", path}
|
||||
};
|
||||
updateObjectMetadata(s3_root_path + key, metadata);
|
||||
updateObjectMetadata(fs::path(s3_root_path) / key, metadata);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1056,7 +1056,7 @@ void DiskS3::migrateToRestorableSchemaRecursive(const String & path, Futures & r
|
||||
{
|
||||
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
|
||||
|
||||
LOG_DEBUG(log, "Migrate directory {} to restorable schema", metadata_path + path);
|
||||
LOG_DEBUG(log, "Migrate directory {} to restorable schema", (fs::path(metadata_path) / path).string());
|
||||
|
||||
bool dir_contains_only_files = true;
|
||||
for (auto it = iterateDirectory(path); it->isValid(); it->next())
|
||||
@ -1105,7 +1105,7 @@ void DiskS3::migrateToRestorableSchema()
|
||||
|
||||
for (const auto & root : data_roots)
|
||||
if (exists(root))
|
||||
migrateToRestorableSchemaRecursive(root + '/', results);
|
||||
migrateToRestorableSchemaRecursive(root, results);
|
||||
|
||||
for (auto & result : results)
|
||||
result.wait();
|
||||
@ -1194,7 +1194,7 @@ void DiskS3::copyObject(const String & src_bucket, const String & src_key, const
|
||||
{
|
||||
auto settings = current_settings.get();
|
||||
Aws::S3::Model::CopyObjectRequest request;
|
||||
request.SetCopySource(src_bucket + "/" + src_key);
|
||||
request.SetCopySource(fs::path(src_bucket) / src_key);
|
||||
request.SetBucket(dst_bucket);
|
||||
request.SetKey(dst_key);
|
||||
|
||||
@ -1212,7 +1212,7 @@ struct DiskS3::RestoreInformation
|
||||
|
||||
void DiskS3::readRestoreInformation(DiskS3::RestoreInformation & restore_information)
|
||||
{
|
||||
ReadBufferFromFile buffer(metadata_path + RESTORE_FILE_NAME, 512);
|
||||
ReadBufferFromFile buffer(fs::path(metadata_path) / RESTORE_FILE_NAME, 512);
|
||||
buffer.next();
|
||||
|
||||
try
|
||||
@ -1302,7 +1302,7 @@ void DiskS3::restore()
|
||||
bool cleanup_s3 = information.source_bucket != bucket || information.source_path != s3_root_path;
|
||||
for (const auto & root : data_roots)
|
||||
if (exists(root))
|
||||
removeSharedRecursive(root + '/', !cleanup_s3);
|
||||
removeSharedRecursive(root, !cleanup_s3);
|
||||
|
||||
restoreFiles(information);
|
||||
restoreFileOperations(information);
|
||||
@ -1393,8 +1393,8 @@ void DiskS3::processRestoreFiles(const String & source_bucket, const String & so
|
||||
auto relative_key = shrinkKey(source_path, key);
|
||||
|
||||
/// Copy object if we restore to different bucket / path.
|
||||
if (bucket != source_bucket || s3_root_path != source_path)
|
||||
copyObject(source_bucket, key, bucket, s3_root_path + relative_key);
|
||||
if (bucket != source_bucket || fs::path(s3_root_path) != fs::path(source_path))
|
||||
copyObject(source_bucket, key, bucket, fs::path(s3_root_path) / relative_key);
|
||||
|
||||
metadata.addObject(relative_key, head_result.GetContentLength());
|
||||
metadata.save();
|
||||
@ -1482,7 +1482,7 @@ void DiskS3::restoreFileOperations(const RestoreInformation & restore_informatio
|
||||
};
|
||||
|
||||
/// Execute.
|
||||
listObjects(restore_information.source_bucket, restore_information.source_path + "operations/", restore_file_operations);
|
||||
listObjects(restore_information.source_bucket, fs::path(restore_information.source_path) / "operations/", restore_file_operations);
|
||||
|
||||
if (restore_information.detached)
|
||||
{
|
||||
@ -1505,7 +1505,7 @@ void DiskS3::restoreFileOperations(const RestoreInformation & restore_informatio
|
||||
|
||||
LOG_DEBUG(log, "Move directory to 'detached' {} -> {}", path, detached_path);
|
||||
|
||||
fs::rename(fs::path(metadata_path) / path, fs::path(metadata_path) / detached_path);
|
||||
Poco::File(fs::path(metadata_path) / path).moveTo(fs::path(metadata_path) / detached_path);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1537,13 +1537,13 @@ String DiskS3::revisionToString(UInt64 revision)
|
||||
|
||||
String DiskS3::pathToDetached(const String & source_path)
|
||||
{
|
||||
return fs::path(source_path).parent_path() / "detached" / "";
|
||||
return Poco::Path(source_path).parent().append(Poco::Path("detached")).toString() + '/';
|
||||
}
|
||||
|
||||
void DiskS3::onFreeze(const String & path)
|
||||
{
|
||||
createDirectories(path);
|
||||
WriteBufferFromFile revision_file_buf(metadata_path + path + "revision.txt", 32);
|
||||
WriteBufferFromFile revision_file_buf(fs::path(metadata_path) / path / "revision.txt", 32);
|
||||
writeIntText(revision_counter.load(), revision_file_buf);
|
||||
revision_file_buf.finalize();
|
||||
}
|
||||
|
@ -230,7 +230,7 @@ void Service::sendPartFromDisk(const MergeTreeData::DataPartPtr & part, WriteBuf
|
||||
{
|
||||
String file_name = it.first;
|
||||
|
||||
String path = part->getFullRelativePath() + file_name;
|
||||
String path = fs::path(part->getFullRelativePath()) / file_name;
|
||||
|
||||
UInt64 size = disk->getFileSize(path);
|
||||
|
||||
@ -279,7 +279,7 @@ void Service::sendPartS3Metadata(const MergeTreeData::DataPartPtr & part, WriteB
|
||||
{
|
||||
String file_name = it.first;
|
||||
|
||||
String metadata_file = disk->getPath() + part->getFullRelativePath() + file_name;
|
||||
String metadata_file = fs::path(disk->getPath()) / part->getFullRelativePath() / file_name;
|
||||
|
||||
fs::path metadata(metadata_file);
|
||||
|
||||
@ -480,7 +480,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
|
||||
readUUIDText(part_uuid, in);
|
||||
|
||||
auto storage_id = data.getStorageID();
|
||||
String new_part_path = part_type == "InMemory" ? "memory" : data.getFullPathOnDisk(reservation->getDisk()) + part_name + "/";
|
||||
String new_part_path = part_type == "InMemory" ? "memory" : fs::path(data.getFullPathOnDisk(reservation->getDisk())) / part_name / "";
|
||||
auto entry = data.getContext()->getReplicatedFetchList().insert(
|
||||
storage_id.getDatabaseName(), storage_id.getTableName(),
|
||||
part_info.partition_id, part_name, new_part_path,
|
||||
@ -551,7 +551,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
|
||||
throw Exception("Logical error: tmp_prefix and part_name cannot be empty or contain '.' or '/' characters.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
String part_relative_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
|
||||
String part_download_path = data.getRelativeDataPath() + part_relative_path + "/";
|
||||
String part_download_path = fs::path(data.getRelativeDataPath()) / part_relative_path / "";
|
||||
|
||||
if (disk->exists(part_download_path))
|
||||
{
|
||||
@ -583,7 +583,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
|
||||
" This may happen if we are trying to download part from malicious replica or logical error.",
|
||||
ErrorCodes::INSECURE_PATH);
|
||||
|
||||
auto file_out = disk->writeFile(part_download_path + file_name);
|
||||
auto file_out = disk->writeFile(fs::path(part_download_path) / file_name);
|
||||
HashingWriteBuffer hashing_out(*file_out);
|
||||
copyData(in, hashing_out, file_size, blocker.getCounter());
|
||||
|
||||
@ -600,7 +600,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
|
||||
readPODBinary(expected_hash, in);
|
||||
|
||||
if (expected_hash != hashing_out.getHash())
|
||||
throw Exception("Checksum mismatch for file " + fullPath(disk, part_download_path + file_name) + " transferred from " + replica_path,
|
||||
throw Exception("Checksum mismatch for file " + fullPath(disk, (fs::path(part_download_path) / file_name).string()) + " transferred from " + replica_path,
|
||||
ErrorCodes::CHECKSUM_DOESNT_MATCH);
|
||||
|
||||
if (file_name != "checksums.txt" &&
|
||||
@ -654,7 +654,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToS3(
|
||||
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
|
||||
|
||||
String part_relative_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
|
||||
String part_download_path = data.getRelativeDataPath() + part_relative_path + "/";
|
||||
String part_download_path = fs::path(data.getRelativeDataPath()) / part_relative_path / "";
|
||||
|
||||
if (disk->exists(part_download_path))
|
||||
throw Exception("Directory " + fullPath(disk, part_download_path) + " already exists.", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
|
||||
@ -677,7 +677,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToS3(
|
||||
readStringBinary(file_name, in);
|
||||
readBinary(file_size, in);
|
||||
|
||||
String data_path = new_data_part->getFullRelativePath() + file_name;
|
||||
String data_path = fs::path(new_data_part->getFullRelativePath()) / file_name;
|
||||
String metadata_file = fullPath(disk, data_path);
|
||||
|
||||
{
|
||||
|
@ -69,7 +69,7 @@ void IMergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const Dis
|
||||
hyperrectangle.reserve(minmax_idx_size);
|
||||
for (size_t i = 0; i < minmax_idx_size; ++i)
|
||||
{
|
||||
String file_name = part_path + "minmax_" + escapeForFileName(minmax_column_names[i]) + ".idx";
|
||||
String file_name = fs::path(part_path) / ("minmax_" + escapeForFileName(minmax_column_names[i]) + ".idx");
|
||||
auto file = openForReading(disk_, file_name);
|
||||
auto serialization = minmax_column_types[i]->getDefaultSerialization();
|
||||
|
||||
@ -111,7 +111,7 @@ void IMergeTreeDataPart::MinMaxIndex::store(
|
||||
String file_name = "minmax_" + escapeForFileName(column_names[i]) + ".idx";
|
||||
auto serialization = data_types.at(i)->getDefaultSerialization();
|
||||
|
||||
auto out = disk_->writeFile(part_path + file_name);
|
||||
auto out = disk_->writeFile(fs::path(part_path) / file_name);
|
||||
HashingWriteBuffer out_hashing(*out);
|
||||
serialization->serializeBinary(hyperrectangle[i].left, out_hashing);
|
||||
serialization->serializeBinary(hyperrectangle[i].right, out_hashing);
|
||||
@ -543,7 +543,7 @@ String IMergeTreeDataPart::getFullPath() const
|
||||
if (relative_path.empty())
|
||||
throw Exception("Part relative_path cannot be empty. It's bug.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
return storage.getFullPathOnDisk(volume->getDisk()) + relative_path + "/";
|
||||
return fs::path(storage.getFullPathOnDisk(volume->getDisk())) / relative_path / "";
|
||||
}
|
||||
|
||||
String IMergeTreeDataPart::getFullRelativePath() const
|
||||
@ -551,7 +551,7 @@ String IMergeTreeDataPart::getFullRelativePath() const
|
||||
if (relative_path.empty())
|
||||
throw Exception("Part relative_path cannot be empty. It's bug.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
return storage.relative_data_path + relative_path + "/";
|
||||
return fs::path(storage.relative_data_path) / relative_path / "";
|
||||
}
|
||||
|
||||
void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency)
|
||||
@ -604,7 +604,7 @@ void IMergeTreeDataPart::loadIndex()
|
||||
loaded_index[i]->reserve(index_granularity.getMarksCount());
|
||||
}
|
||||
|
||||
String index_path = getFullRelativePath() + "primary.idx";
|
||||
String index_path = fs::path(getFullRelativePath()) / "primary.idx";
|
||||
auto index_file = openForReading(volume->getDisk(), index_path);
|
||||
|
||||
size_t marks_count = index_granularity.getMarksCount();
|
||||
@ -639,7 +639,7 @@ NameSet IMergeTreeDataPart::getFileNamesWithoutChecksums() const
|
||||
return {};
|
||||
|
||||
NameSet result = {"checksums.txt", "columns.txt"};
|
||||
String default_codec_path = getFullRelativePath() + DEFAULT_COMPRESSION_CODEC_FILE_NAME;
|
||||
String default_codec_path = fs::path(getFullRelativePath()) / DEFAULT_COMPRESSION_CODEC_FILE_NAME;
|
||||
|
||||
if (volume->getDisk()->exists(default_codec_path))
|
||||
result.emplace(DEFAULT_COMPRESSION_CODEC_FILE_NAME);
|
||||
@ -656,7 +656,7 @@ void IMergeTreeDataPart::loadDefaultCompressionCodec()
|
||||
return;
|
||||
}
|
||||
|
||||
String path = getFullRelativePath() + DEFAULT_COMPRESSION_CODEC_FILE_NAME;
|
||||
String path = fs::path(getFullRelativePath()) / DEFAULT_COMPRESSION_CODEC_FILE_NAME;
|
||||
if (!volume->getDisk()->exists(path))
|
||||
{
|
||||
default_codec = detectDefaultCompressionCodec();
|
||||
@ -717,7 +717,7 @@ CompressionCodecPtr IMergeTreeDataPart::detectDefaultCompressionCodec() const
|
||||
{
|
||||
if (path_to_data_file.empty())
|
||||
{
|
||||
String candidate_path = getFullRelativePath() + ISerialization::getFileNameForStream(part_column, substream_path) + ".bin";
|
||||
String candidate_path = fs::path(getFullRelativePath()) / (ISerialization::getFileNameForStream(part_column, substream_path) + ".bin");
|
||||
|
||||
/// We can have existing, but empty .bin files. Example: LowCardinality(Nullable(...)) columns and column_name.dict.null.bin file.
|
||||
if (volume->getDisk()->exists(candidate_path) && volume->getDisk()->getFileSize(candidate_path) != 0)
|
||||
@ -773,7 +773,7 @@ void IMergeTreeDataPart::loadPartitionAndMinMaxIndex()
|
||||
|
||||
void IMergeTreeDataPart::loadChecksums(bool require)
|
||||
{
|
||||
const String path = getFullRelativePath() + "checksums.txt";
|
||||
const String path = fs::path(getFullRelativePath()) / "checksums.txt";
|
||||
|
||||
if (volume->getDisk()->exists(path))
|
||||
{
|
||||
@ -798,11 +798,11 @@ void IMergeTreeDataPart::loadChecksums(bool require)
|
||||
checksums = checkDataPart(shared_from_this(), false);
|
||||
|
||||
{
|
||||
auto out = volume->getDisk()->writeFile(getFullRelativePath() + "checksums.txt.tmp", 4096);
|
||||
auto out = volume->getDisk()->writeFile(fs::path(getFullRelativePath()) / "checksums.txt.tmp", 4096);
|
||||
checksums.write(*out);
|
||||
}
|
||||
|
||||
volume->getDisk()->moveFile(getFullRelativePath() + "checksums.txt.tmp", getFullRelativePath() + "checksums.txt");
|
||||
volume->getDisk()->moveFile(fs::path(getFullRelativePath()) / "checksums.txt.tmp", fs::path(getFullRelativePath()) / "checksums.txt");
|
||||
|
||||
bytes_on_disk = checksums.getTotalSizeOnDisk();
|
||||
}
|
||||
@ -810,7 +810,7 @@ void IMergeTreeDataPart::loadChecksums(bool require)
|
||||
|
||||
void IMergeTreeDataPart::loadRowsCount()
|
||||
{
|
||||
String path = getFullRelativePath() + "count.txt";
|
||||
String path = fs::path(getFullRelativePath()) / "count.txt";
|
||||
if (index_granularity.empty())
|
||||
{
|
||||
rows_count = 0;
|
||||
@ -911,7 +911,7 @@ void IMergeTreeDataPart::loadRowsCount()
|
||||
|
||||
void IMergeTreeDataPart::loadTTLInfos()
|
||||
{
|
||||
String path = getFullRelativePath() + "ttl.txt";
|
||||
String path = fs::path(getFullRelativePath()) / "ttl.txt";
|
||||
if (volume->getDisk()->exists(path))
|
||||
{
|
||||
auto in = openForReading(volume->getDisk(), path);
|
||||
@ -938,7 +938,7 @@ void IMergeTreeDataPart::loadTTLInfos()
|
||||
|
||||
void IMergeTreeDataPart::loadUUID()
|
||||
{
|
||||
String path = getFullRelativePath() + UUID_FILE_NAME;
|
||||
String path = fs::path(getFullRelativePath()) / UUID_FILE_NAME;
|
||||
|
||||
if (volume->getDisk()->exists(path))
|
||||
{
|
||||
@ -951,7 +951,7 @@ void IMergeTreeDataPart::loadUUID()
|
||||
|
||||
void IMergeTreeDataPart::loadColumns(bool require)
|
||||
{
|
||||
String path = getFullRelativePath() + "columns.txt";
|
||||
String path = fs::path(getFullRelativePath()) / "columns.txt";
|
||||
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
|
||||
NamesAndTypesList loaded_columns;
|
||||
|
||||
@ -964,7 +964,7 @@ void IMergeTreeDataPart::loadColumns(bool require)
|
||||
|
||||
/// If there is no file with a list of columns, write it down.
|
||||
for (const NameAndTypePair & column : metadata_snapshot->getColumns().getAllPhysical())
|
||||
if (volume->getDisk()->exists(getFullRelativePath() + getFileNameForColumn(column) + ".bin"))
|
||||
if (volume->getDisk()->exists(fs::path(getFullRelativePath()) / (getFileNameForColumn(column) + ".bin")))
|
||||
loaded_columns.push_back(column);
|
||||
|
||||
if (columns.empty())
|
||||
@ -1002,7 +1002,7 @@ UInt64 IMergeTreeDataPart::calculateTotalSizeOnDisk(const DiskPtr & disk_, const
|
||||
disk_->listFiles(from, files);
|
||||
UInt64 res = 0;
|
||||
for (const auto & file : files)
|
||||
res += calculateTotalSizeOnDisk(disk_, from + file);
|
||||
res += calculateTotalSizeOnDisk(disk_, fs::path(from) / file);
|
||||
return res;
|
||||
}
|
||||
|
||||
@ -1012,7 +1012,7 @@ void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_
|
||||
assertOnDisk();
|
||||
|
||||
String from = getFullRelativePath();
|
||||
String to = storage.relative_data_path + new_relative_path + "/";
|
||||
String to = fs::path(storage.relative_data_path) / new_relative_path / "";
|
||||
|
||||
if (!volume->getDisk()->exists(from))
|
||||
throw Exception("Part directory " + fullPath(volume->getDisk(), from) + " doesn't exist. Most likely it is a logical error.", ErrorCodes::FILE_DOESNT_EXIST);
|
||||
@ -1066,8 +1066,8 @@ void IMergeTreeDataPart::remove(bool keep_s3) const
|
||||
* And a race condition can happen that will lead to "File not found" error here.
|
||||
*/
|
||||
|
||||
String from = storage.relative_data_path + relative_path;
|
||||
String to = storage.relative_data_path + "delete_tmp_" + name;
|
||||
fs::path from = fs::path(storage.relative_data_path) / relative_path;
|
||||
fs::path to = fs::path(storage.relative_data_path) / ("delete_tmp_" + name);
|
||||
// TODO directory delete_tmp_<name> is never removed if server crashes before returning from this function
|
||||
|
||||
if (volume->getDisk()->exists(to))
|
||||
@ -1076,7 +1076,7 @@ void IMergeTreeDataPart::remove(bool keep_s3) const
|
||||
|
||||
try
|
||||
{
|
||||
volume->getDisk()->removeSharedRecursive(to + "/", keep_s3);
|
||||
volume->getDisk()->removeSharedRecursive(to / "", keep_s3);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -1099,7 +1099,7 @@ void IMergeTreeDataPart::remove(bool keep_s3) const
|
||||
if (checksums.empty())
|
||||
{
|
||||
/// If the part is not completely written, we cannot use fast path by listing files.
|
||||
volume->getDisk()->removeSharedRecursive(to + "/", keep_s3);
|
||||
volume->getDisk()->removeSharedRecursive(to / "", keep_s3);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1112,16 +1112,16 @@ void IMergeTreeDataPart::remove(bool keep_s3) const
|
||||
# pragma GCC diagnostic ignored "-Wunused-variable"
|
||||
#endif
|
||||
for (const auto & [file, _] : checksums.files)
|
||||
volume->getDisk()->removeSharedFile(to + "/" + file, keep_s3);
|
||||
volume->getDisk()->removeSharedFile(to / file, keep_s3);
|
||||
#if !defined(__clang__)
|
||||
# pragma GCC diagnostic pop
|
||||
#endif
|
||||
|
||||
for (const auto & file : {"checksums.txt", "columns.txt"})
|
||||
volume->getDisk()->removeSharedFile(to + "/" + file, keep_s3);
|
||||
volume->getDisk()->removeSharedFile(to / file, keep_s3);
|
||||
|
||||
volume->getDisk()->removeSharedFileIfExists(to + "/" + DEFAULT_COMPRESSION_CODEC_FILE_NAME, keep_s3);
|
||||
volume->getDisk()->removeSharedFileIfExists(to + "/" + DELETE_ON_DESTROY_MARKER_FILE_NAME, keep_s3);
|
||||
volume->getDisk()->removeSharedFileIfExists(to / DEFAULT_COMPRESSION_CODEC_FILE_NAME, keep_s3);
|
||||
volume->getDisk()->removeSharedFileIfExists(to / DELETE_ON_DESTROY_MARKER_FILE_NAME, keep_s3);
|
||||
|
||||
volume->getDisk()->removeDirectory(to);
|
||||
}
|
||||
@ -1131,7 +1131,7 @@ void IMergeTreeDataPart::remove(bool keep_s3) const
|
||||
|
||||
LOG_ERROR(storage.log, "Cannot quickly remove directory {} by removing files; fallback to recursive removal. Reason: {}", fullPath(volume->getDisk(), to), getCurrentExceptionMessage(false));
|
||||
|
||||
volume->getDisk()->removeSharedRecursive(to + "/", keep_s3);
|
||||
volume->getDisk()->removeSharedRecursive(to / "", keep_s3);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1149,7 +1149,7 @@ String IMergeTreeDataPart::getRelativePathForPrefix(const String & prefix) const
|
||||
{
|
||||
res = (prefix.empty() ? "" : prefix + "_") + name + (try_no ? "_try" + DB::toString(try_no) : "");
|
||||
|
||||
if (!volume->getDisk()->exists(getFullRelativePath() + res))
|
||||
if (!volume->getDisk()->exists(fs::path(getFullRelativePath()) / res))
|
||||
return res;
|
||||
|
||||
LOG_WARNING(storage.log, "Directory {} (to detach to) already exists. Will detach to directory with '_tryN' suffix.", res);
|
||||
@ -1172,11 +1172,11 @@ void IMergeTreeDataPart::renameToDetached(const String & prefix) const
|
||||
|
||||
void IMergeTreeDataPart::makeCloneInDetached(const String & prefix, const StorageMetadataPtr & /*metadata_snapshot*/) const
|
||||
{
|
||||
String destination_path = storage.relative_data_path + getRelativePathForDetachedPart(prefix);
|
||||
String destination_path = fs::path(storage.relative_data_path) / getRelativePathForDetachedPart(prefix);
|
||||
|
||||
/// Backup is not recursive (max_level is 0), so do not copy inner directories
|
||||
localBackup(volume->getDisk(), getFullRelativePath(), destination_path, 0);
|
||||
volume->getDisk()->removeFileIfExists(destination_path + "/" + DELETE_ON_DESTROY_MARKER_FILE_NAME);
|
||||
volume->getDisk()->removeFileIfExists(fs::path(destination_path) / DELETE_ON_DESTROY_MARKER_FILE_NAME);
|
||||
}
|
||||
|
||||
void IMergeTreeDataPart::makeCloneOnDisk(const DiskPtr & disk, const String & directory_name) const
|
||||
@ -1188,16 +1188,16 @@ void IMergeTreeDataPart::makeCloneOnDisk(const DiskPtr & disk, const String & di
|
||||
if (directory_name.empty())
|
||||
throw Exception("Can not clone data part " + name + " to empty directory.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
String path_to_clone = storage.relative_data_path + directory_name + '/';
|
||||
String path_to_clone = fs::path(storage.relative_data_path) / directory_name / "";
|
||||
|
||||
if (disk->exists(path_to_clone + relative_path))
|
||||
if (disk->exists(fs::path(path_to_clone) / relative_path))
|
||||
{
|
||||
LOG_WARNING(storage.log, "Path " + fullPath(disk, path_to_clone + relative_path) + " already exists. Will remove it and clone again.");
|
||||
disk->removeRecursive(path_to_clone + relative_path + '/');
|
||||
disk->removeRecursive(fs::path(path_to_clone) / relative_path / "");
|
||||
}
|
||||
disk->createDirectories(path_to_clone);
|
||||
volume->getDisk()->copy(getFullRelativePath(), disk, path_to_clone);
|
||||
volume->getDisk()->removeFileIfExists(path_to_clone + '/' + DELETE_ON_DESTROY_MARKER_FILE_NAME);
|
||||
volume->getDisk()->removeFileIfExists(fs::path(path_to_clone) / DELETE_ON_DESTROY_MARKER_FILE_NAME);
|
||||
}
|
||||
|
||||
void IMergeTreeDataPart::checkConsistencyBase() const
|
||||
@ -1244,17 +1244,17 @@ void IMergeTreeDataPart::checkConsistencyBase() const
|
||||
|
||||
/// Check that the primary key index is not empty.
|
||||
if (!pk.column_names.empty())
|
||||
check_file_not_empty(volume->getDisk(), path + "primary.idx");
|
||||
check_file_not_empty(volume->getDisk(), fs::path(path) / "primary.idx");
|
||||
|
||||
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
|
||||
{
|
||||
check_file_not_empty(volume->getDisk(), path + "count.txt");
|
||||
check_file_not_empty(volume->getDisk(), fs::path(path) / "count.txt");
|
||||
|
||||
if (metadata_snapshot->hasPartitionKey())
|
||||
check_file_not_empty(volume->getDisk(), path + "partition.dat");
|
||||
check_file_not_empty(volume->getDisk(), fs::path(path) / "partition.dat");
|
||||
|
||||
for (const String & col_name : storage.getMinMaxColumnsNames(partition_key))
|
||||
check_file_not_empty(volume->getDisk(), path + "minmax_" + escapeForFileName(col_name) + ".idx");
|
||||
check_file_not_empty(volume->getDisk(), fs::path(path) / ("minmax_" + escapeForFileName(col_name) + ".idx"));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1348,7 +1348,7 @@ String IMergeTreeDataPart::getUniqueId() const
|
||||
auto disk = volume->getDisk();
|
||||
|
||||
if (disk->getType() == DB::DiskType::Type::S3)
|
||||
id = disk->getUniqueId(getFullRelativePath() + "checksums.txt");
|
||||
id = disk->getUniqueId(fs::path(getFullRelativePath()) / "checksums.txt");
|
||||
|
||||
if (id.empty())
|
||||
throw Exception("Can't get unique S3 object", ErrorCodes::LOGICAL_ERROR);
|
||||
|
@ -1868,7 +1868,7 @@ void MergeTreeDataMergerMutator::finalizeMutatedPart(
|
||||
if (need_remove_expired_values)
|
||||
{
|
||||
/// Write a file with ttl infos in json format.
|
||||
auto out_ttl = disk->writeFile(new_data_part->getFullRelativePath() + "ttl.txt", 4096);
|
||||
auto out_ttl = disk->writeFile(fs::path(new_data_part->getFullRelativePath()) / "ttl.txt", 4096);
|
||||
HashingWriteBuffer out_hashing(*out_ttl);
|
||||
new_data_part->ttl_infos.write(out_hashing);
|
||||
new_data_part->checksums.files["ttl.txt"].file_size = out_hashing.count();
|
||||
@ -1877,7 +1877,7 @@ void MergeTreeDataMergerMutator::finalizeMutatedPart(
|
||||
|
||||
{
|
||||
/// Write file with checksums.
|
||||
auto out_checksums = disk->writeFile(new_data_part->getFullRelativePath() + "checksums.txt", 4096);
|
||||
auto out_checksums = disk->writeFile(fs::path(new_data_part->getFullRelativePath()) / "checksums.txt", 4096);
|
||||
new_data_part->checksums.write(*out_checksums);
|
||||
} /// close fd
|
||||
|
||||
@ -1888,7 +1888,7 @@ void MergeTreeDataMergerMutator::finalizeMutatedPart(
|
||||
|
||||
{
|
||||
/// Write a file with a description of columns.
|
||||
auto out_columns = disk->writeFile(new_data_part->getFullRelativePath() + "columns.txt", 4096);
|
||||
auto out_columns = disk->writeFile(fs::path(new_data_part->getFullRelativePath()) / "columns.txt", 4096);
|
||||
new_data_part->getColumns().writeText(*out_columns);
|
||||
} /// close fd
|
||||
|
||||
|
@ -121,7 +121,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
|
||||
throw Exception("MinMax index was not initialized for new non-empty part " + new_part->name
|
||||
+ ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
auto count_out = volume->getDisk()->writeFile(part_path + "count.txt", 4096);
|
||||
auto count_out = volume->getDisk()->writeFile(fs::path(part_path) / "count.txt", 4096);
|
||||
HashingWriteBuffer count_out_hashing(*count_out);
|
||||
writeIntText(rows_count, count_out_hashing);
|
||||
count_out_hashing.next();
|
||||
@ -135,7 +135,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
|
||||
if (!new_part->ttl_infos.empty())
|
||||
{
|
||||
/// Write a file with ttl infos in json format.
|
||||
auto out = volume->getDisk()->writeFile(part_path + "ttl.txt", 4096);
|
||||
auto out = volume->getDisk()->writeFile(fs::path(part_path) / "ttl.txt", 4096);
|
||||
HashingWriteBuffer out_hashing(*out);
|
||||
new_part->ttl_infos.write(out_hashing);
|
||||
checksums.files["ttl.txt"].file_size = out_hashing.count();
|
||||
@ -149,7 +149,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
|
||||
|
||||
{
|
||||
/// Write a file with a description of columns.
|
||||
auto out = volume->getDisk()->writeFile(part_path + "columns.txt", 4096);
|
||||
auto out = volume->getDisk()->writeFile(fs::path(part_path) / "columns.txt", 4096);
|
||||
part_columns.writeText(*out);
|
||||
out->finalize();
|
||||
if (sync)
|
||||
@ -170,7 +170,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
|
||||
|
||||
{
|
||||
/// Write file with checksums.
|
||||
auto out = volume->getDisk()->writeFile(part_path + "checksums.txt", 4096);
|
||||
auto out = volume->getDisk()->writeFile(fs::path(part_path) / "checksums.txt", 4096);
|
||||
checksums.write(*out);
|
||||
out->finalize();
|
||||
if (sync)
|
||||
|
@ -68,7 +68,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
|
||||
NamesAndTypesList columns_txt;
|
||||
|
||||
{
|
||||
auto buf = disk->readFile(path + "columns.txt");
|
||||
auto buf = disk->readFile(fs::path(path) / "columns.txt");
|
||||
columns_txt.readText(*buf);
|
||||
assertEOF(*buf);
|
||||
}
|
||||
@ -141,9 +141,9 @@ IMergeTreeDataPart::Checksums checkDataPart(
|
||||
/// Checksums from the rest files listed in checksums.txt. May be absent. If present, they are subsequently compared with the actual data checksums.
|
||||
IMergeTreeDataPart::Checksums checksums_txt;
|
||||
|
||||
if (require_checksums || disk->exists(path + "checksums.txt"))
|
||||
if (require_checksums || disk->exists(fs::path(path) / "checksums.txt"))
|
||||
{
|
||||
auto buf = disk->readFile(path + "checksums.txt");
|
||||
auto buf = disk->readFile(fs::path(path) / "checksums.txt");
|
||||
checksums_txt.read(*buf);
|
||||
assertEOF(*buf);
|
||||
}
|
||||
|
@ -1482,8 +1482,8 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
|
||||
auto disk = part->volume->getDisk();
|
||||
String part_path = part->getFullRelativePath();
|
||||
/// If the checksums file is not present, calculate the checksums and write them to disk.
|
||||
String checksums_path = part_path + "checksums.txt";
|
||||
String tmp_checksums_path = part_path + "checksums.txt.tmp";
|
||||
String checksums_path = fs::path(part_path) / "checksums.txt";
|
||||
String tmp_checksums_path = fs::path(part_path) / "checksums.txt.tmp";
|
||||
if (part->isStoredOnDisk() && !disk->exists(checksums_path))
|
||||
{
|
||||
try
|
||||
|
Loading…
Reference in New Issue
Block a user