Change in Storages/*

This commit is contained in:
kssenii 2021-04-27 03:05:43 +03:00
parent 6d8efe5d5b
commit eeb71672a0
11 changed files with 71 additions and 82 deletions

View File

@ -25,11 +25,9 @@
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/Operators.h>
#include <Disks/IDisk.h>
#include <boost/algorithm/string/find_iterator.hpp>
#include <boost/algorithm/string/finder.hpp>
#include <Poco/DirectoryIterator.h>
#include <filesystem>
namespace CurrentMetrics
@ -38,6 +36,8 @@ namespace CurrentMetrics
extern const Metric DistributedFilesToInsert;
}
namespace fs = std::filesystem;
namespace DB
{
@ -332,7 +332,7 @@ void StorageDistributedDirectoryMonitor::shutdownAndDropAllData()
}
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
Poco::File(path).remove(true);
fs::remove_all(path);
}
@ -475,16 +475,15 @@ std::map<UInt64, std::string> StorageDistributedDirectoryMonitor::getFiles()
std::map<UInt64, std::string> files;
size_t new_bytes_count = 0;
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator it{path}; it != end; ++it)
fs::directory_iterator end;
for (fs::directory_iterator it{path}; it != end; ++it)
{
const auto & file_path_str = it->path();
Poco::Path file_path{file_path_str};
if (!it->isDirectory() && startsWith(file_path.getExtension(), "bin"))
fs::path fs_file_path(file_path_str);
if (!it->is_directory() && startsWith(fs_file_path.extension(), ".bin"))
{
files[parse<UInt64>(file_path.getBaseName())] = file_path_str;
new_bytes_count += Poco::File(file_path).getSize();
files[parse<UInt64>(fs_file_path.stem())] = file_path_str;
new_bytes_count += fs::file_size(fs_file_path);
}
}
@ -646,8 +645,7 @@ struct StorageDistributedDirectoryMonitor::Batch
String tmp_file{parent.current_batch_file_path + ".tmp"};
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, parent.disk, parent.relative_path);
if (Poco::File{tmp_file}.exists())
if (fs::exists(tmp_file))
LOG_ERROR(parent.log, "Temporary file {} exists. Unclean shutdown?", backQuote(tmp_file));
{
@ -659,7 +657,7 @@ struct StorageDistributedDirectoryMonitor::Batch
out.sync();
}
Poco::File{tmp_file}.renameTo(parent.current_batch_file_path);
fs::rename(tmp_file, parent.current_batch_file_path);
}
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(parent.storage.getContext()->getSettingsRef());
auto connection = parent.pool->get(timeouts);
@ -732,7 +730,7 @@ struct StorageDistributedDirectoryMonitor::Batch
total_bytes = 0;
recovered = false;
Poco::File{parent.current_batch_file_path}.setSize(0);
fs::resize_file(parent.current_batch_file_path, 0);
}
void writeText(WriteBuffer & out)
@ -832,7 +830,7 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
{
std::unordered_set<UInt64> file_indices_to_skip;
if (Poco::File{current_batch_file_path}.exists())
if (fs::exists(current_batch_file_path))
{
/// Possibly, we failed to send a batch on the previous iteration. Try to send exactly the same batch.
Batch batch(*this, files);
@ -933,8 +931,8 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
/// current_batch.txt will not exist if there was no send
/// (this is the case when all batches that was pending has been marked as pending)
if (Poco::File{current_batch_file_path}.exists())
Poco::File{current_batch_file_path}.remove();
if (fs::exists(current_batch_file_path))
fs::remove(current_batch_file_path);
}
}
@ -946,30 +944,26 @@ void StorageDistributedDirectoryMonitor::markAsBroken(const std::string & file_p
const auto & broken_path = base_path + "broken/";
const auto & broken_file_path = broken_path + file_name;
Poco::File{broken_path}.createDirectory();
fs::create_directory(broken_path);
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
auto broken_dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path + "/broken/");
Poco::File file(file_path);
{
std::lock_guard metrics_lock(metrics_mutex);
size_t file_size = file.getSize();
size_t file_size = fs::file_size(file_path);
--files_count;
bytes_count -= file_size;
}
file.renameTo(broken_file_path);
fs::rename(file_path, broken_file_path);
LOG_ERROR(log, "Renamed `{}` to `{}`", file_path, broken_file_path);
}
void StorageDistributedDirectoryMonitor::markAsSend(const std::string & file_path)
{
Poco::File file(file_path);
size_t file_size = file.getSize();
size_t file_size = fs::file_size(file_path);
{
std::lock_guard metrics_lock(metrics_mutex);
@ -978,7 +972,7 @@ void StorageDistributedDirectoryMonitor::markAsSend(const std::string & file_pat
bytes_count -= file_size;
}
file.remove();
fs::remove(file_path);
}
bool StorageDistributedDirectoryMonitor::maybeMarkAsBroken(const std::string & file_path, const Exception & e)

View File

@ -33,11 +33,10 @@
#include <ext/range.h>
#include <ext/scope_guard.h>
#include <Poco/DirectoryIterator.h>
#include <future>
#include <condition_variable>
#include <mutex>
#include <filesystem>
namespace CurrentMetrics
@ -50,10 +49,11 @@ namespace ProfileEvents
extern const Event DistributedSyncInsertionTimeoutExceeded;
}
namespace fs = std::filesystem;
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
@ -656,10 +656,10 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
/// hardlinking to ensure the inode is not freed until we're done
{
const std::string path(disk_path + data_path + *it);
Poco::File(path).createDirectory();
const std::string tmp_path(path + "/tmp/");
Poco::File(tmp_path).createDirectory();
fs::create_directory(path);
fs::create_directory(tmp_path);
const std::string file_name(toString(storage.file_names_increment.get()) + ".bin");
@ -723,17 +723,17 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
for (; it != dir_names.end(); ++it)
{
const std::string path(disk_path + data_path + *it);
Poco::File(path).createDirectory();
fs::create_directory(path);
const std::string block_file_path(path + '/' + toString(storage.file_names_increment.get()) + ".bin");
createHardLink(first_file_tmp_path, block_file_path);
auto dir_sync_guard = make_directory_sync_guard(*it);
}
auto file_size = Poco::File(first_file_tmp_path).getSize();
auto file_size = fs::file_size(first_file_tmp_path);
/// remove the temporary file, enabling the OS to reclaim inode after all threads
/// have removed their corresponding files
Poco::File(first_file_tmp_path).remove();
fs::remove(first_file_tmp_path);
/// Notify
auto sleep_ms = context->getSettingsRef().distributed_directory_monitor_sleep_time_ms;

View File

@ -13,11 +13,11 @@
#include <Common/NetException.h>
#include <IO/createReadBufferFromFileBase.h>
#include <ext/scope_guard.h>
#include <Poco/File.h>
#include <Poco/Net/HTTPRequest.h>
namespace fs = std::filesystem;
namespace CurrentMetrics
{
extern const Metric ReplicatedSend;
@ -281,13 +281,13 @@ void Service::sendPartS3Metadata(const MergeTreeData::DataPartPtr & part, WriteB
String metadata_file = disk->getPath() + part->getFullRelativePath() + file_name;
Poco::File metadata(metadata_file);
fs::path metadata(metadata_file);
if (!metadata.exists())
if (!fs::exists(metadata))
throw Exception("S3 metadata '" + file_name + "' is not exists", ErrorCodes::CORRUPTED_DATA);
if (!metadata.isFile())
if (!fs::is_regular_file(metadata))
throw Exception("S3 metadata '" + file_name + "' is not a file", ErrorCodes::CORRUPTED_DATA);
UInt64 file_size = metadata.getSize();
UInt64 file_size = fs::file_size(metadata);
writeStringBinary(it.first, out);
writeBinary(file_size, out);

View File

@ -51,8 +51,6 @@
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
#include <Poco/DirectoryIterator.h>
#include <boost/range/adaptor/filtered.hpp>
#include <boost/algorithm/string/join.hpp>
@ -66,8 +64,11 @@
#include <typeinfo>
#include <typeindex>
#include <unordered_set>
#include <filesystem>
namespace fs = std::filesystem;
namespace ProfileEvents
{
extern const Event RejectedInserts;
@ -3834,9 +3835,9 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(
const String & with_name,
ContextPtr local_context)
{
String clickhouse_path = Poco::Path(local_context->getPath()).makeAbsolute().toString();
String clickhouse_path = fs::absolute(local_context->getPath());
String default_shadow_path = clickhouse_path + "shadow/";
Poco::File(default_shadow_path).createDirectories();
fs::create_directories(default_shadow_path);
auto increment = Increment(default_shadow_path + "increment.txt").get(true);
const String shadow_path = "shadow/";

View File

@ -3,8 +3,6 @@
#include <optional>
#include <unordered_set>
#include <Poco/File.h>
#include <Common/FieldVisitors.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>

View File

@ -1,6 +1,8 @@
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Poco/Path.h>
namespace fs = std::filesystem;
namespace DB
{
@ -17,8 +19,7 @@ std::optional<std::string> MergeTreeIndexGranularityInfo::getMarksExtensionFromF
{
for (DiskDirectoryIteratorPtr it = disk->iterateDirectory(path_to_part); it->isValid(); it->next())
{
Poco::Path path(it->path());
const auto & ext = "." + path.getExtension();
const auto & ext = fs::path(it->path()).extension();
if (ext == getNonAdaptiveMrkExtension()
|| ext == getAdaptiveMrkExtension(MergeTreeDataPartType::WIDE)
|| ext == getAdaptiveMrkExtension(MergeTreeDataPartType::COMPACT))

View File

@ -3,7 +3,6 @@
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/NestedUtils.h>
#include <Columns/ColumnArray.h>
#include <Poco/File.h>
namespace DB
{

View File

@ -26,14 +26,16 @@
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/convertFieldToType.h>
#include <Poco/File.h>
#include <Poco/Path.h>
#include <Poco/Logger.h>
#include <common/logger_useful.h>
#include <rocksdb/db.h>
#include <rocksdb/table.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
@ -253,7 +255,7 @@ StorageEmbeddedRocksDB::StorageEmbeddedRocksDB(const StorageID & table_id_,
rocksdb_dir = context_->getPath() + relative_data_path_;
if (!attach)
{
Poco::File(rocksdb_dir).createDirectories();
fs::create_directories(rocksdb_dir);
}
initDb();
}
@ -261,8 +263,8 @@ StorageEmbeddedRocksDB::StorageEmbeddedRocksDB(const StorageID & table_id_,
void StorageEmbeddedRocksDB::truncate(const ASTPtr &, const StorageMetadataPtr & , ContextPtr, TableExclusiveLockHolder &)
{
rocksdb_ptr->Close();
Poco::File(rocksdb_dir).remove(true);
Poco::File(rocksdb_dir).createDirectories();
fs::remove_all(rocksdb_dir);
fs::create_directories(rocksdb_dir);
initDb();
}

View File

@ -76,6 +76,8 @@
#include <cassert>
namespace fs = std::filesystem;
namespace
{
const UInt64 FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_HAS_SHARDING_KEY = 1;
@ -845,7 +847,7 @@ StoragePolicyPtr StorageDistributed::getStoragePolicy() const
void StorageDistributed::createDirectoryMonitors(const DiskPtr & disk)
{
const std::string path(disk->getPath() + relative_data_path);
Poco::File{path}.createDirectories();
fs::create_directories(path);
std::filesystem::directory_iterator begin(path);
std::filesystem::directory_iterator end;

View File

@ -28,9 +28,6 @@
#include <fcntl.h>
#include <unistd.h>
#include <Poco/Path.h>
#include <Poco/File.h>
#include <re2/re2.h>
#include <filesystem>
#include <Storages/Distributed/DirectoryMonitor.h>
@ -39,6 +36,7 @@
#include <Processors/Sources/NullSource.h>
#include <Processors/Pipe.h>
namespace fs = std::filesystem;
namespace DB
@ -78,10 +76,9 @@ std::vector<std::string> listFilesWithRegexpMatching(const std::string & path_fo
std::vector<std::string> result;
const std::string prefix_without_globs = path_for_ls + for_match.substr(1, end_of_path_without_globs);
if (!fs::exists(fs::path(prefix_without_globs)))
{
if (!fs::exists(prefix_without_globs))
return result;
}
const fs::directory_iterator end;
for (fs::directory_iterator it(prefix_without_globs); it != end; ++it)
{
@ -125,21 +122,20 @@ void checkCreationIsAllowed(ContextPtr context_global, const std::string & db_di
if (!startsWith(table_path, db_dir_path) && table_path != "/dev/null")
throw Exception("File is not inside " + db_dir_path, ErrorCodes::DATABASE_ACCESS_DENIED);
Poco::File table_path_poco_file = Poco::File(table_path);
if (table_path_poco_file.exists() && table_path_poco_file.isDirectory())
if (fs::exists(table_path) && fs::is_directory(table_path))
throw Exception("File must not be a directory", ErrorCodes::INCORRECT_FILE_NAME);
}
}
Strings StorageFile::getPathsList(const String & table_path, const String & user_files_path, ContextPtr context)
{
String user_files_absolute_path = Poco::Path(user_files_path).makeAbsolute().makeDirectory().toString();
Poco::Path poco_path = Poco::Path(table_path);
if (poco_path.isRelative())
poco_path = Poco::Path(user_files_absolute_path, poco_path);
fs::path user_files_absolute_path = fs::absolute(user_files_path);
fs::path fs_table_path(table_path);
if (fs_table_path.is_relative())
fs_table_path = user_files_absolute_path / fs_table_path;
Strings paths;
const String path = poco_path.absolute().toString();
const String path = fs::absolute(fs_table_path);
if (path.find_first_of("*?{") == std::string::npos)
paths.push_back(path);
else
@ -205,7 +201,7 @@ StorageFile::StorageFile(const std::string & relative_table_dir_path, CommonArgu
throw Exception("Distributed format is allowed only with explicit file path", ErrorCodes::INCORRECT_FILE_NAME);
String table_dir_path = base_path + relative_table_dir_path + "/";
Poco::File(table_dir_path).createDirectories();
fs::create_directories(table_dir_path);
paths = {getTablePath(table_dir_path, format_name)};
}
@ -455,7 +451,7 @@ Pipe StorageFile::read(
if (use_table_fd) /// need to call ctr BlockInputStream
paths = {""}; /// when use fd, paths are empty
else
if (paths.size() == 1 && !Poco::File(paths[0]).exists())
if (paths.size() == 1 && !fs::exists(paths[0]))
{
if (context->getSettingsRef().engine_file_empty_if_not_exists)
return Pipe(std::make_shared<NullSource>(metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID())));
@ -598,7 +594,7 @@ BlockOutputStreamPtr StorageFile::write(
if (!paths.empty())
{
path = paths[0];
Poco::File(Poco::Path(path).makeParent()).createDirectories();
fs::create_directories(fs::path(path).parent_path());
}
return std::make_shared<StorageFileBlockOutputStream>(
@ -635,8 +631,8 @@ void StorageFile::rename(const String & new_path_to_table_data, const StorageID
if (path_new == paths[0])
return;
Poco::File(Poco::Path(path_new).parent()).createDirectories();
Poco::File(paths[0]).renameTo(path_new);
fs::create_directories(fs::path(path_new).parent_path());
fs::rename(paths[0], path_new);
paths[0] = std::move(path_new);
renameInMemory(new_table_id);
@ -658,7 +654,7 @@ void StorageFile::truncate(
}
else
{
if (!Poco::File(paths[0]).exists())
if (!fs::exists(paths[0]))
return;
if (0 != ::truncate(paths[0].c_str(), 0))

View File

@ -1,10 +1,6 @@
#pragma once
#include <Storages/IStorage.h>
#include <Poco/File.h>
#include <Poco/Path.h>
#include <common/logger_useful.h>
#include <atomic>