static-files-disk-uploader: add mode to create symlinks

This commit is contained in:
Alexey Milovidov 2022-02-09 03:50:50 +03:00
parent 63bffb0110
commit 742620209a
2 changed files with 26 additions and 30 deletions

View File

@ -31,7 +31,7 @@ namespace ErrorCodes
* If test-mode option is added, files will be put by given url via PUT request.
*/
void processFile(const fs::path & file_path, const fs::path & dst_path, bool test_mode, WriteBuffer & metadata_buf)
void processFile(const fs::path & file_path, const fs::path & dst_path, bool test_mode, bool link, WriteBuffer & metadata_buf)
{
String remote_path;
RE2::FullMatch(file_path.string(), EXTRACT_PATH_PATTERN, &remote_path);
@ -52,22 +52,29 @@ void processFile(const fs::path & file_path, const fs::path & dst_path, bool tes
auto dst_file_path = fs::path(dst_path) / remote_path;
auto src_buf = createReadBufferFromFileBase(file_path, {}, fs::file_size(file_path));
std::shared_ptr<WriteBuffer> dst_buf;
/// test mode for integration tests.
if (test_mode)
dst_buf = std::make_shared<WriteBufferFromHTTP>(Poco::URI(dst_file_path), Poco::Net::HTTPRequest::HTTP_PUT);
if (link)
{
fs::create_symlink(file_path, dst_file_path);
}
else
dst_buf = std::make_shared<WriteBufferFromFile>(dst_file_path);
{
auto src_buf = createReadBufferFromFileBase(file_path, {}, fs::file_size(file_path));
std::shared_ptr<WriteBuffer> dst_buf;
copyData(*src_buf, *dst_buf);
dst_buf->next();
dst_buf->finalize();
/// test mode for integration tests.
if (test_mode)
dst_buf = std::make_shared<WriteBufferFromHTTP>(Poco::URI(dst_file_path), Poco::Net::HTTPRequest::HTTP_PUT);
else
dst_buf = std::make_shared<WriteBufferFromFile>(dst_file_path);
copyData(*src_buf, *dst_buf);
dst_buf->next();
dst_buf->finalize();
}
};
void processTableFiles(const fs::path & data_path, fs::path dst_path, bool test_mode)
void processTableFiles(const fs::path & data_path, fs::path dst_path, bool test_mode, bool link)
{
std::cerr << "Data path: " << data_path << ", destination path: " << dst_path << std::endl;
@ -94,7 +101,7 @@ void processTableFiles(const fs::path & data_path, fs::path dst_path, bool test_
{
if (dir_it->is_directory())
{
processFile(dir_it->path(), dst_path, test_mode, *root_meta);
processFile(dir_it->path(), dst_path, test_mode, link, *root_meta);
String directory_prefix;
RE2::FullMatch(dir_it->path().string(), EXTRACT_PATH_PATTERN, &directory_prefix);
@ -115,14 +122,14 @@ void processTableFiles(const fs::path & data_path, fs::path dst_path, bool test_
fs::directory_iterator files_end;
for (fs::directory_iterator file_it(dir_it->path()); file_it != files_end; ++file_it)
processFile(file_it->path(), dst_path, test_mode, *directory_meta);
processFile(file_it->path(), dst_path, test_mode, link, *directory_meta);
directory_meta->next();
directory_meta->finalize();
}
else
{
processFile(dir_it->path(), dst_path, test_mode, *root_meta);
processFile(dir_it->path(), dst_path, test_mode, link, *root_meta);
}
}
root_meta->next();
@ -141,6 +148,7 @@ try
("help,h", "produce help message")
("metadata-path", po::value<std::string>(), "Metadata path (select data_paths from system.tables where name='table_name'")
("test-mode", "Use test mode, which will put data on given url via PUT")
("link", "Create symlinks instead of copying")
("url", po::value<std::string>(), "Web server url for test mode")
("output-dir", po::value<std::string>(), "Directory to put files in non-test mode");
@ -186,7 +194,7 @@ try
root_path = fs::current_path();
}
processTableFiles(fs_path, root_path, test_mode);
processTableFiles(fs_path, root_path, test_mode, options.count("link"));
return 0;
}

View File

@ -2,17 +2,9 @@
#if USE_HDFS
#include <Common/Exception.h>
#include <Common/Throttler.h>
#include <Client/Connection.h>
#include <Core/QueryProcessingStage.h>
#include <Core/UUID.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Interpreters/SelectQueryOptions.h>
@ -21,7 +13,6 @@
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <QueryPipeline/narrowBlockInputStreams.h>
#include <QueryPipeline/Pipe.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Sources/RemoteSource.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <Parsers/queryToString.h>
@ -29,16 +20,13 @@
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/HDFS/StorageHDFSCluster.h>
#include <base/logger_useful.h>
#include <ios>
#include <memory>
#include <string>
#include <thread>
#include <cassert>
namespace DB
{
StorageHDFSCluster::StorageHDFSCluster(
String cluster_name_,
const String & uri_,