Fix checks

commit 379e26bcbc
Author: kssenii
Date: 2021-05-25 17:37:35 +00:00
Parent: e01abf807e

2 changed files with 27 additions and 25 deletions

DiskHDFS.cpp

@@ -24,7 +24,7 @@ public:
     using Chunk = std::vector<std::string>;
     using Chunks = std::list<Chunk>;
 
-    HDFSPathKeeper(size_t chunk_limit_) : RemoteFSPathKeeper(chunk_limit_) {}
+    explicit HDFSPathKeeper(size_t chunk_limit_) : RemoteFSPathKeeper(chunk_limit_) {}
 
     void addPath(const String & path) override
     {
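Aside: the only change in this hunk is marking the single-argument constructor explicit. A minimal sketch of what that buys, using a hypothetical PathKeeper stand-in rather than the commit's classes: without explicit, a bare integer silently converts into a keeper at any call site expecting one.

#include <cstddef>
#include <iostream>

// Hypothetical stand-in for a RemoteFSPathKeeper subclass.
struct PathKeeper
{
    explicit PathKeeper(std::size_t chunk_limit_) : chunk_limit(chunk_limit_) {}
    std::size_t chunk_limit;
};

void report(const PathKeeper & keeper)
{
    std::cout << "chunk limit: " << keeper.chunk_limit << '\n';
}

int main()
{
    // report(1000);           // compiles without explicit: 1000 silently becomes a PathKeeper
    report(PathKeeper{1000});  // with explicit, the construction must be spelled out
}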
@@ -137,19 +137,20 @@ RemoteFSPathKeeperPtr DiskHDFS::createFSPathKeeper() const
 
 void DiskHDFS::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper)
 {
     auto * hdfs_paths_keeper = dynamic_cast<HDFSPathKeeper *>(fs_paths_keeper.get());
-    hdfs_paths_keeper->removePaths([&](std::vector<std::string> && chunk)
-    {
-        for (const auto & hdfs_object_path : chunk)
-        {
-            const String & hdfs_path = hdfs_object_path;
-            const size_t begin_of_path = hdfs_path.find('/', hdfs_path.find("//") + 2);
-
-            /// Add path from root to file name
-            int res = hdfsDelete(hdfs_fs.get(), hdfs_path.substr(begin_of_path).c_str(), 0);
-            if (res == -1)
-                throw Exception(ErrorCodes::LOGICAL_ERROR, "HDFSDelete failed with path: " + hdfs_path);
-        }
-    });
+    if (hdfs_paths_keeper)
+        hdfs_paths_keeper->removePaths([&](std::vector<std::string> && chunk)
+        {
+            for (const auto & hdfs_object_path : chunk)
+            {
+                const String & hdfs_path = hdfs_object_path;
+                const size_t begin_of_path = hdfs_path.find('/', hdfs_path.find("//") + 2);
+
+                /// Add path from root to file name
+                int res = hdfsDelete(hdfs_fs.get(), hdfs_path.substr(begin_of_path).c_str(), 0);
+                if (res == -1)
+                    throw Exception(ErrorCodes::LOGICAL_ERROR, "HDFSDelete failed with path: " + hdfs_path);
+            }
+        });
 }
 
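This is the substance of the fix: dynamic_cast on a pointer yields nullptr when fs_paths_keeper does not actually hold an HDFSPathKeeper, and the old code called removePaths through that pointer unconditionally. A minimal sketch of the guard pattern, with hypothetical class names:

#include <iostream>

struct Base { virtual ~Base() = default; };
struct Derived : Base { void use() const { std::cout << "ok\n"; } };
struct Other : Base {};

void handle(Base * base)
{
    // A pointer dynamic_cast reports a type mismatch as nullptr
    // (it only throws for reference casts), so check before use.
    if (auto * derived = dynamic_cast<Derived *>(base))
        derived->use();
}

int main()
{
    Derived d;
    Other o;
    handle(&d);  // prints "ok"
    handle(&o);  // does nothing instead of dereferencing nullptr
}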

DiskS3.cpp

@@ -47,7 +47,7 @@ public:
     using Chunk = Aws::Vector<Aws::S3::Model::ObjectIdentifier>;
     using Chunks = std::list<Chunk>;
 
-    S3PathKeeper(size_t chunk_limit_) : RemoteFSPathKeeper(chunk_limit_) {}
+    explicit S3PathKeeper(size_t chunk_limit_) : RemoteFSPathKeeper(chunk_limit_) {}
 
     void addPath(const String & path) override
     {
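Both keepers share the same shape: addPath accumulates paths into chunks capped at chunk_limit, and removePaths hands each chunk to a callback. A minimal sketch of that batching, assuming the accumulate-then-flush behavior the interfaces above suggest (the class is illustrative, not the commit's implementation):

#include <functional>
#include <list>
#include <string>
#include <vector>

class SketchPathKeeper
{
public:
    using Chunk = std::vector<std::string>;
    using Chunks = std::list<Chunk>;

    explicit SketchPathKeeper(size_t chunk_limit_) : chunk_limit(chunk_limit_) {}

    // Open a fresh chunk whenever the current one reaches the limit.
    void addPath(const std::string & path)
    {
        if (chunks.empty() || chunks.back().size() >= chunk_limit)
            chunks.emplace_back();
        chunks.back().push_back(path);
    }

    // Feed every accumulated chunk to the caller-supplied remover.
    void removePaths(std::function<void(Chunk &&)> remove_chunk_func)
    {
        for (auto & chunk : chunks)
            remove_chunk_func(std::move(chunk));
        chunks.clear();
    }

private:
    size_t chunk_limit;
    Chunks chunks;
};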
@@ -178,18 +178,19 @@ void DiskS3::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper)
     auto settings = current_settings.get();
     auto * s3_paths_keeper = dynamic_cast<S3PathKeeper *>(fs_paths_keeper.get());
 
-    s3_paths_keeper->removePaths([&](S3PathKeeper::Chunk && chunk)
-    {
-        LOG_DEBUG(log, "Remove AWS keys {}", S3PathKeeper::getChunkKeys(chunk));
-        Aws::S3::Model::Delete delkeys;
-        delkeys.SetObjects(chunk);
-        /// TODO: Make operation idempotent. Do not throw exception if key is already deleted.
-        Aws::S3::Model::DeleteObjectsRequest request;
-        request.SetBucket(bucket);
-        request.SetDelete(delkeys);
-        auto outcome = settings->client->DeleteObjects(request);
-        throwIfError(outcome);
-    });
+    if (s3_paths_keeper)
+        s3_paths_keeper->removePaths([&](S3PathKeeper::Chunk && chunk)
+        {
+            LOG_DEBUG(log, "Remove AWS keys {}", S3PathKeeper::getChunkKeys(chunk));
+            Aws::S3::Model::Delete delkeys;
+            delkeys.SetObjects(chunk);
+            /// TODO: Make operation idempotent. Do not throw exception if key is already deleted.
+            Aws::S3::Model::DeleteObjectsRequest request;
+            request.SetBucket(bucket);
+            request.SetDelete(delkeys);
+            auto outcome = settings->client->DeleteObjects(request);
+            throwIfError(outcome);
+        });
 }
 
 void DiskS3::moveFile(const String & from_path, const String & to_path)
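For context, the lambda above issues one batched DeleteObjects call per chunk; S3 accepts at most 1000 keys per request, which is why paths are chunked in the first place. A standalone sketch of that per-chunk call using the same SDK types (the helper function and its error handling are assumptions, not code from this commit):

#include <stdexcept>

#include <aws/s3/S3Client.h>
#include <aws/s3/model/Delete.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/ObjectIdentifier.h>

// Hypothetical helper mirroring the per-chunk work done in the lambda above.
void deleteChunk(const Aws::S3::S3Client & client,
                 const Aws::String & bucket,
                 Aws::Vector<Aws::S3::Model::ObjectIdentifier> chunk)
{
    Aws::S3::Model::Delete delkeys;
    delkeys.SetObjects(std::move(chunk));

    Aws::S3::Model::DeleteObjectsRequest request;
    request.SetBucket(bucket);
    request.SetDelete(delkeys);

    // One round trip deletes the whole chunk.
    auto outcome = client.DeleteObjects(request);
    if (!outcome.IsSuccess())
        throw std::runtime_error(outcome.GetError().GetMessage().c_str());
}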