ClickHouse/programs/static-files-disk-uploader/static-files-disk-uploader.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

208 lines
6.6 KiB
C++
Raw Normal View History

2021-08-29 14:18:04 +00:00
#include <Common/Exception.h>
#include <Common/TerminalSize.h>
2024-01-07 22:28:08 +00:00
#include <Common/re2.h>
2021-08-29 14:18:04 +00:00
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFile.h>
2024-10-24 11:57:26 +00:00
#include <IO/ReadSettings.h>
2021-08-29 14:18:04 +00:00
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromHTTP.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>
2022-03-08 17:05:55 +00:00
#include <Disks/IO/createReadBufferFromFileBase.h>
2021-08-29 14:18:04 +00:00
#include <boost/program_options.hpp>
#include <filesystem>
2024-12-03 11:05:15 +00:00
#include <iostream>
2021-08-29 14:18:04 +00:00
namespace fs = std::filesystem;
2021-09-08 17:22:24 +00:00
/// Regular expression (used with RE2::FullMatch below) for paths that contain "/store/".
/// Its single capture group receives everything that follows "/store/".
#define EXTRACT_PATH_PATTERN ".*\\/store/(.*)"
2021-08-29 14:18:04 +00:00
namespace DB
{
/// Error codes referenced by this tool. They are only declared here (extern);
/// the definitions live elsewhere in the project.
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
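/*
 * A tool that collects table data files from the local filesystem as is, placing them into the
 * current directory or into the directory given by the --output-dir option. With --link, symlinks
 * are created instead of copies.
 * If the --test-mode option is passed, the files are instead uploaded to the given URL via HTTP PUT requests.
 */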
2024-10-15 16:24:06 +00:00
/// Registers a single directory entry in a metadata index and, for regular files, transfers it.
///
/// One line is appended to `metadata_buf` per entry:
///     <file name> \t <is-directory flag> [\t <size in bytes>] \n
/// where the size column is written for non-directories only. Directories are only listed;
/// nothing is copied for them.
///
/// For a regular file the destination is `dst_path / <part of file_path after "/store/">`, and:
///   - if `link` is set, a symlink pointing to `file_path` is created at the destination;
///   - otherwise, if `test_mode` is set, the file content is sent to the destination URL via HTTP PUT;
///   - otherwise the file content is copied into a local file at the destination.
///
/// Throws Exception(BAD_ARGUMENTS) if a regular file's path has no "/store/" part: without it the
/// destination would collapse to `dst_path` itself.
static void processFile(const fs::path & file_path, const fs::path & dst_path, bool test_mode, bool link, WriteBuffer & metadata_buf)
{
    const bool is_directory = fs::is_directory(file_path);

    /// Validate the path before anything is written, so that a bad path does not leave a dangling index entry.
    String remote_path;
    if (!is_directory && !RE2::FullMatch(file_path.string(), EXTRACT_PATH_PATTERN, &remote_path))
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot extract the path following '/store/' from {}", file_path.string());

    writeText(file_path.filename().string(), metadata_buf);
    writeChar('\t', metadata_buf);
    writeBoolText(is_directory, metadata_buf);

    if (is_directory)
    {
        writeChar('\n', metadata_buf);
        return;
    }

    /// Query the size once so that the index entry and the read hint below always agree.
    const auto size_bytes = fs::file_size(file_path);
    writeChar('\t', metadata_buf);
    writeIntText(size_bytes, metadata_buf);
    writeChar('\n', metadata_buf);

    const auto dst_file_path = dst_path / remote_path;

    if (link)
    {
        fs::create_symlink(file_path, dst_file_path);
        return;
    }

    ReadSettings read_settings{};
    read_settings.local_fs_method = LocalFSReadMethod::pread;
    auto src_buf = createReadBufferFromFileBase(file_path, read_settings, size_bytes);

    std::shared_ptr<WriteBuffer> dst_buf;
    /// test mode for integration tests.
    if (test_mode)
        dst_buf = std::make_shared<WriteBufferFromHTTP>(HTTPConnectionGroupType::HTTP, Poco::URI(dst_file_path), Poco::Net::HTTPRequest::HTTP_PUT);
    else
        dst_buf = std::make_shared<WriteBufferFromFile>(dst_file_path);

    copyData(*src_buf, *dst_buf);
    dst_buf->next();
    dst_buf->finalize();
}
2021-08-29 14:18:04 +00:00
2024-10-15 16:24:06 +00:00
/// Exports the contents of a table data directory.
///
/// `data_path` is scanned one level deep: every top-level entry is listed in a root `.index`
/// file, and every top-level subdirectory additionally gets its own `.index` listing its
/// entries. Regular files at both levels are transferred by processFile(). Directories nested
/// deeper than one level are listed but not descended into.
///
/// Destination layout:
///   - test mode: `dst_path` is treated as a URL; "store" is appended to it and all `.index`
///     files are sent via HTTP PUT;
///   - otherwise: `dst_path` must be an existing directory (it is canonicalized); the required
///     subdirectories are created under it and `.index` files are written locally.
/// In both cases paths below the destination mirror the part of the source path after "/store/".
///
/// Throws Exception(BAD_ARGUMENTS) if a source path has no "/store/" part, because otherwise
/// the `.index` files would silently end up directly in the destination root.
static void processTableFiles(const fs::path & data_path, fs::path dst_path, bool test_mode, bool link)
{
    std::cerr << "Data path: " << data_path << ", destination path: " << dst_path << '\n';

    /// Returns the part of `path` that follows "/store/".
    const auto extract_store_relative_path = [](const fs::path & path) -> String
    {
        String relative_path;
        if (!RE2::FullMatch(path.string(), EXTRACT_PATH_PATTERN, &relative_path))
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot extract the path following '/store/' from {}", path.string());
        return relative_path;
    };

    /// Opens the `.index` file of `directory` for writing, creating the directory first when writing locally.
    const auto open_index = [test_mode](const fs::path & directory) -> std::shared_ptr<WriteBuffer>
    {
        if (test_mode)
            return std::make_shared<WriteBufferFromHTTP>(HTTPConnectionGroupType::HTTP, Poco::URI(directory / ".index"), Poco::Net::HTTPRequest::HTTP_PUT);

        fs::create_directories(directory);
        return std::make_shared<WriteBufferFromFile>(directory / ".index");
    };

    const String prefix = extract_store_relative_path(data_path);

    /// The destination root is fixed once here; it does not change while iterating.
    if (test_mode)
        dst_path /= "store";
    else
        dst_path = fs::canonical(dst_path);

    auto root_meta = open_index(dst_path / prefix);

    for (const auto & entry : fs::directory_iterator(data_path))
    {
        /// Every top-level entry is listed in the root index; regular files are transferred as well.
        processFile(entry.path(), dst_path, test_mode, link, *root_meta);

        if (!entry.is_directory())
            continue;

        auto directory_meta = open_index(dst_path / extract_store_relative_path(entry.path()));

        for (const auto & nested_entry : fs::directory_iterator(entry.path()))
            processFile(nested_entry.path(), dst_path, test_mode, link, *directory_meta);

        directory_meta->next();
        directory_meta->finalize();
    }

    root_meta->next();
    root_meta->finalize();
}
}
int mainEntryClickHouseStaticFilesDiskUploader(int argc, char ** argv)
try
{
using namespace DB;
namespace po = boost::program_options;
po::options_description description("Allowed options", getTerminalWidth());
2021-08-29 14:18:04 +00:00
description.add_options()
("help,h", "produce help message")
("metadata-path", po::value<std::string>(), "Metadata path (SELECT data_paths FROM system.tables WHERE name = 'table_name' AND database = 'database_name')")
2021-08-29 14:18:04 +00:00
("test-mode", "Use test mode, which will put data on given url via PUT")
("link", "Create symlinks instead of copying")
2021-08-29 14:18:04 +00:00
("url", po::value<std::string>(), "Web server url for test mode")
2021-09-03 21:43:15 +00:00
("output-dir", po::value<std::string>(), "Directory to put files in non-test mode");
2021-08-29 14:18:04 +00:00
po::parsed_options parsed = po::command_line_parser(argc, argv).options(description).run();
po::variables_map options;
po::store(parsed, options);
po::notify(options);
if (options.empty() || options.count("help"))
{
std::cout << description << std::endl;
exit(0); // NOLINT(concurrency-mt-unsafe)
2021-08-29 14:18:04 +00:00
}
2021-09-08 17:22:24 +00:00
String metadata_path;
2021-08-29 14:18:04 +00:00
if (options.count("metadata-path"))
metadata_path = options["metadata-path"].as<std::string>();
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No metadata-path option passed");
fs::path fs_path = fs::weakly_canonical(metadata_path);
if (!fs::exists(fs_path))
{
std::cerr << fmt::format("Data path ({}) does not exist", fs_path.string());
return 1;
}
String root_path;
2021-09-03 21:43:15 +00:00
auto test_mode = options.contains("test-mode");
if (test_mode)
2021-08-29 14:18:04 +00:00
{
if (options.count("url"))
2021-09-08 17:22:24 +00:00
root_path = options["url"].as<std::string>();
2021-08-29 14:18:04 +00:00
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No url option passed for test mode");
}
else
{
if (options.count("output-dir"))
root_path = options["output-dir"].as<std::string>();
else
root_path = fs::current_path();
}
processTableFiles(fs_path, root_path, test_mode, options.count("link"));
2021-08-29 14:18:04 +00:00
return 0;
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(false) << '\n';
2021-08-29 14:18:04 +00:00
return 1;
}