mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Get rid of regexp
This commit is contained in:
parent
97e0a0310a
commit
7141b5d041
@ -15,8 +15,7 @@
|
|||||||
|
|
||||||
namespace fs = std::filesystem;
|
namespace fs = std::filesystem;
|
||||||
|
|
||||||
#define UUID_PATTERN "[\\w]{8}-[\\w]{4}-[\\w]{4}-[\\w]{4}-[\\w]{12}"
|
#define EXTRACT_PATH_PATTERN ".*\\/store/(.*)"
|
||||||
#define EXTRACT_UUID_PATTERN fmt::format(".*\\/({})\\/.*", UUID_PATTERN)
|
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -32,40 +31,102 @@ namespace ErrorCodes
|
|||||||
* If test-mode option is added, files will be put by given url via PUT request.
|
* If test-mode option is added, files will be put by given url via PUT request.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
void processTableFiles(const fs::path & path, String root_path, String uuid,
|
void processFile(const fs::path & file_path, const fs::path & dst_path, bool test_mode, WriteBuffer & metadata_buf)
|
||||||
WriteBuffer & metadata_buf, std::function<std::shared_ptr<WriteBuffer>(const String &)> create_dst_buf)
|
|
||||||
{
|
{
|
||||||
fs::directory_iterator dir_end;
|
String remote_path;
|
||||||
auto process_file = [&](const String & file_name, const String & file_path)
|
RE2::FullMatch(file_path.string(), EXTRACT_PATH_PATTERN, &remote_path);
|
||||||
|
bool is_directory = fs::is_directory(file_path);
|
||||||
|
|
||||||
|
writeText(file_path.filename(), metadata_buf);
|
||||||
|
writeChar('\t', metadata_buf);
|
||||||
|
writeBoolText(is_directory, metadata_buf);
|
||||||
|
if (!is_directory)
|
||||||
{
|
{
|
||||||
auto remote_file_name = uuid + "/" + file_name;
|
|
||||||
writeText(remote_file_name, metadata_buf);
|
|
||||||
writeChar('\t', metadata_buf);
|
writeChar('\t', metadata_buf);
|
||||||
writeIntText(fs::file_size(file_path), metadata_buf);
|
writeIntText(fs::file_size(file_path), metadata_buf);
|
||||||
writeChar('\n', metadata_buf);
|
}
|
||||||
|
writeChar('\n', metadata_buf);
|
||||||
|
|
||||||
auto src_buf = createReadBufferFromFileBase(file_path, {}, fs::file_size(file_path));
|
if (is_directory)
|
||||||
fs::create_directories((fs::path(root_path) / remote_file_name).parent_path());
|
return;
|
||||||
auto dst_buf = create_dst_buf(remote_file_name);
|
|
||||||
|
|
||||||
copyData(*src_buf, *dst_buf);
|
auto dst_file_path = fs::path(dst_path) / remote_path;
|
||||||
dst_buf->next();
|
|
||||||
dst_buf->finalize();
|
|
||||||
};
|
|
||||||
|
|
||||||
for (fs::directory_iterator dir_it(path); dir_it != dir_end; ++dir_it)
|
auto src_buf = createReadBufferFromFileBase(file_path, {}, fs::file_size(file_path));
|
||||||
|
std::shared_ptr<WriteBuffer> dst_buf;
|
||||||
|
|
||||||
|
/// test mode for integration tests.
|
||||||
|
if (test_mode)
|
||||||
|
dst_buf = std::make_shared<WriteBufferFromHTTP>(Poco::URI(dst_file_path), Poco::Net::HTTPRequest::HTTP_PUT);
|
||||||
|
else
|
||||||
|
dst_buf = std::make_shared<WriteBufferFromFile>(dst_file_path);
|
||||||
|
|
||||||
|
copyData(*src_buf, *dst_buf);
|
||||||
|
dst_buf->next();
|
||||||
|
dst_buf->finalize();
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
void processTableFiles(const fs::path & data_path, fs::path dst_path, bool test_mode)
|
||||||
|
{
|
||||||
|
std::cerr << "Data path: " << data_path << ", destination path: " << dst_path << std::endl;
|
||||||
|
|
||||||
|
String prefix;
|
||||||
|
RE2::FullMatch(data_path.string(), EXTRACT_PATH_PATTERN, &prefix);
|
||||||
|
|
||||||
|
std::shared_ptr<WriteBuffer> root_meta;
|
||||||
|
if (test_mode)
|
||||||
|
{
|
||||||
|
dst_path /= "store";
|
||||||
|
auto files_root = dst_path / prefix;
|
||||||
|
root_meta = std::make_shared<WriteBufferFromHTTP>(Poco::URI(files_root / ".index"), Poco::Net::HTTPRequest::HTTP_PUT);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
dst_path = fs::canonical(dst_path);
|
||||||
|
auto files_root = dst_path / prefix;
|
||||||
|
fs::create_directories(files_root);
|
||||||
|
root_meta = std::make_shared<WriteBufferFromFile>(files_root / ".index");
|
||||||
|
}
|
||||||
|
|
||||||
|
fs::directory_iterator dir_end;
|
||||||
|
for (fs::directory_iterator dir_it(data_path); dir_it != dir_end; ++dir_it)
|
||||||
{
|
{
|
||||||
if (dir_it->is_directory())
|
if (dir_it->is_directory())
|
||||||
{
|
{
|
||||||
|
processFile(dir_it->path(), dst_path, test_mode, *root_meta);
|
||||||
|
|
||||||
|
String directory_prefix;
|
||||||
|
RE2::FullMatch(dir_it->path().string(), EXTRACT_PATH_PATTERN, &directory_prefix);
|
||||||
|
|
||||||
|
std::shared_ptr<WriteBuffer> directory_meta;
|
||||||
|
if (test_mode)
|
||||||
|
{
|
||||||
|
auto files_root = dst_path / prefix;
|
||||||
|
directory_meta = std::make_shared<WriteBufferFromHTTP>(Poco::URI(dst_path / directory_prefix / ".index"), Poco::Net::HTTPRequest::HTTP_PUT);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
dst_path = fs::canonical(dst_path);
|
||||||
|
auto files_root = dst_path / prefix;
|
||||||
|
fs::create_directories(dst_path / directory_prefix);
|
||||||
|
directory_meta = std::make_shared<WriteBufferFromFile>(dst_path / directory_prefix / ".index");
|
||||||
|
}
|
||||||
|
|
||||||
fs::directory_iterator files_end;
|
fs::directory_iterator files_end;
|
||||||
for (fs::directory_iterator file_it(dir_it->path()); file_it != files_end; ++file_it)
|
for (fs::directory_iterator file_it(dir_it->path()); file_it != files_end; ++file_it)
|
||||||
process_file(dir_it->path().filename() / file_it->path().filename(), file_it->path());
|
processFile(file_it->path(), dst_path, test_mode, *directory_meta);
|
||||||
|
|
||||||
|
directory_meta->next();
|
||||||
|
directory_meta->finalize();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
process_file(dir_it->path().filename(), dir_it->path());
|
processFile(dir_it->path(), dst_path, test_mode, *root_meta);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
root_meta->next();
|
||||||
|
root_meta->finalize();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -94,7 +155,7 @@ try
|
|||||||
exit(0);
|
exit(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
String url, metadata_path;
|
String metadata_path;
|
||||||
|
|
||||||
if (options.count("metadata-path"))
|
if (options.count("metadata-path"))
|
||||||
metadata_path = options["metadata-path"].as<std::string>();
|
metadata_path = options["metadata-path"].as<std::string>();
|
||||||
@ -108,28 +169,14 @@ try
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
String uuid;
|
|
||||||
if (!RE2::Extract(metadata_path, EXTRACT_UUID_PATTERN, "\\1", &uuid))
|
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot extract uuid for: {}", metadata_path);
|
|
||||||
|
|
||||||
std::shared_ptr<WriteBuffer> metadata_buf;
|
|
||||||
std::function<std::shared_ptr<WriteBuffer>(const String &)> create_dst_buf;
|
|
||||||
String root_path;
|
String root_path;
|
||||||
|
|
||||||
auto test_mode = options.contains("test-mode");
|
auto test_mode = options.contains("test-mode");
|
||||||
if (test_mode)
|
if (test_mode)
|
||||||
{
|
{
|
||||||
if (options.count("url"))
|
if (options.count("url"))
|
||||||
url = options["url"].as<std::string>();
|
root_path = options["url"].as<std::string>();
|
||||||
else
|
else
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No url option passed for test mode");
|
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No url option passed for test mode");
|
||||||
|
|
||||||
metadata_buf = std::make_shared<WriteBufferFromHTTP>(Poco::URI(fs::path(url) / (".index-" + uuid)), Poco::Net::HTTPRequest::HTTP_PUT);
|
|
||||||
|
|
||||||
create_dst_buf = [&](const String & remote_file_name)
|
|
||||||
{
|
|
||||||
return std::make_shared<WriteBufferFromHTTP>(Poco::URI(fs::path(url) / remote_file_name), Poco::Net::HTTPRequest::HTTP_PUT);
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -137,17 +184,9 @@ try
|
|||||||
root_path = options["output-dir"].as<std::string>();
|
root_path = options["output-dir"].as<std::string>();
|
||||||
else
|
else
|
||||||
root_path = fs::current_path();
|
root_path = fs::current_path();
|
||||||
|
|
||||||
metadata_buf = std::make_shared<WriteBufferFromFile>(fs::path(root_path) / (".index-" + uuid));
|
|
||||||
create_dst_buf = [&](const String & remote_file_name)
|
|
||||||
{
|
|
||||||
return std::make_shared<WriteBufferFromFile>(fs::path(root_path) / remote_file_name);
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
processTableFiles(fs_path, root_path, uuid, *metadata_buf, create_dst_buf);
|
processTableFiles(fs_path, root_path, test_mode);
|
||||||
metadata_buf->next();
|
|
||||||
metadata_buf->finalize();
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -17,14 +17,6 @@
|
|||||||
#include <re2/re2.h>
|
#include <re2/re2.h>
|
||||||
|
|
||||||
|
|
||||||
#define UUID_PATTERN "[\\w]{8}-[\\w]{4}-[\\w]{4}-[\\w]{4}-[\\w]{12}"
|
|
||||||
#define EXTRACT_UUID_PATTERN fmt::format(".*/({})\\/.*", UUID_PATTERN)
|
|
||||||
|
|
||||||
#define MATCH_DIRECTORY_FILE_PATTERN fmt::format(".*({})\\/(\\w+)\\/(.*)", UUID_PATTERN)
|
|
||||||
#define MATCH_DIRECTORY_PATTERN fmt::format(".*({})\\/(\\w+)\\/", UUID_PATTERN)
|
|
||||||
#define MATCH_ROOT_FILE_PATTERN fmt::format(".*({})\\/(\\w+\\.\\w+)", UUID_PATTERN)
|
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -33,84 +25,82 @@ namespace ErrorCodes
|
|||||||
extern const int BAD_ARGUMENTS;
|
extern const int BAD_ARGUMENTS;
|
||||||
extern const int LOGICAL_ERROR;
|
extern const int LOGICAL_ERROR;
|
||||||
extern const int NETWORK_ERROR;
|
extern const int NETWORK_ERROR;
|
||||||
|
extern const int FILE_DOESNT_EXIST;
|
||||||
|
extern const int DIRECTORY_DOESNT_EXIST;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void DiskWebServer::Metadata::initialize(const String & uri_with_path, const String & table_uuid, ContextPtr context) const
|
void DiskWebServer::initialize(const String & uri_path) const
|
||||||
{
|
{
|
||||||
ReadWriteBufferFromHTTP metadata_buf(Poco::URI(fs::path(uri_with_path) / (".index-" + table_uuid)),
|
std::vector<String> directories_to_load;
|
||||||
Poco::Net::HTTPRequest::HTTP_GET,
|
LOG_TRACE(log, "Loading metadata for directory: {}", uri_path);
|
||||||
ReadWriteBufferFromHTTP::OutStreamCallback(),
|
try
|
||||||
ConnectionTimeouts::getHTTPTimeouts(context));
|
|
||||||
String uuid, directory, file, remote_file_name;
|
|
||||||
size_t file_size;
|
|
||||||
|
|
||||||
while (!metadata_buf.eof())
|
|
||||||
{
|
{
|
||||||
readText(remote_file_name, metadata_buf);
|
ReadWriteBufferFromHTTP metadata_buf(Poco::URI(fs::path(uri_path) / ".index"),
|
||||||
assertChar('\t', metadata_buf);
|
Poco::Net::HTTPRequest::HTTP_GET,
|
||||||
readIntText(file_size, metadata_buf);
|
ReadWriteBufferFromHTTP::OutStreamCallback(),
|
||||||
assertChar('\n', metadata_buf);
|
ConnectionTimeouts::getHTTPTimeouts(getContext()));
|
||||||
LOG_TRACE(&Poco::Logger::get("DiskWeb"), "Read file: {}, size: {}", remote_file_name, file_size);
|
String file_name;
|
||||||
|
FileData file_data;
|
||||||
|
|
||||||
/*
|
String dir_name = fs::path(uri_path.substr(url.size())) / "";
|
||||||
* URI/ {uri}/{uuid}/all_x_x_x/{file}
|
LOG_TRACE(&Poco::Logger::get("DiskWeb"), "Adding directory: {}", dir_name);
|
||||||
* ...
|
|
||||||
* {uri}/{uuid}/format_version.txt
|
while (!metadata_buf.eof())
|
||||||
* {uri}/{uuid}/detached-{file}
|
|
||||||
* ...
|
|
||||||
**/
|
|
||||||
if (RE2::FullMatch(remote_file_name, MATCH_DIRECTORY_FILE_PATTERN, &uuid, &directory, &file))
|
|
||||||
{
|
{
|
||||||
if (uuid != table_uuid)
|
readText(file_name, metadata_buf);
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected uuid: {}, expected: {}", uuid, table_uuid);
|
assertChar('\t', metadata_buf);
|
||||||
|
|
||||||
tables_data[uuid][directory].emplace(std::make_pair(file, file_size));
|
bool is_directory;
|
||||||
}
|
readBoolText(is_directory, metadata_buf);
|
||||||
else if (RE2::FullMatch(remote_file_name, MATCH_ROOT_FILE_PATTERN, &uuid, &file))
|
if (!is_directory)
|
||||||
{
|
{
|
||||||
if (uuid != table_uuid)
|
assertChar('\t', metadata_buf);
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected uuid: {}, expected: {}", uuid, table_uuid);
|
readIntText(file_data.size, metadata_buf);
|
||||||
|
}
|
||||||
|
assertChar('\n', metadata_buf);
|
||||||
|
|
||||||
tables_data[uuid][file].emplace(std::make_pair(file, file_size));
|
file_data.type = is_directory ? FileType::Directory : FileType::File;
|
||||||
|
String file_path = fs::path(uri_path) / file_name;
|
||||||
|
if(file_data.type == FileType::Directory)
|
||||||
|
directories_to_load.push_back(file_path);
|
||||||
|
file_path = file_path.substr(url.size());
|
||||||
|
|
||||||
|
files.emplace(std::make_pair(file_path, file_data));
|
||||||
|
LOG_TRACE(&Poco::Logger::get("DiskWeb"), "Adding file: {}, size: {}", file_path, file_data.size);
|
||||||
}
|
}
|
||||||
else
|
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected file: {}", remote_file_name);
|
LOG_TRACE(&Poco::Logger::get("DiskWeb"), "Adding directory: {}", dir_name);
|
||||||
|
files.emplace(std::make_pair(dir_name, FileData({ .type = FileType::Directory })));
|
||||||
}
|
}
|
||||||
|
catch (Exception & e)
|
||||||
|
{
|
||||||
|
e.addMessage("while loadng disk metadata");
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const auto & directory_path : directories_to_load)
|
||||||
|
initialize(directory_path);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
template <typename Directory>
|
class DiskWebServerDirectoryIterator final : public IDiskDirectoryIterator
|
||||||
class DiskWebDirectoryIterator final : public IDiskDirectoryIterator
|
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
explicit DiskWebServerDirectoryIterator(std::vector<fs::path> && dir_file_paths_)
|
||||||
DiskWebDirectoryIterator(Directory & directory_, const String & directory_root_)
|
: dir_file_paths(std::move(dir_file_paths_)), iter(dir_file_paths.begin()) {}
|
||||||
: directory(directory_), iter(directory.begin()), directory_root(directory_root_)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
void next() override { ++iter; }
|
void next() override { ++iter; }
|
||||||
|
|
||||||
bool isValid() const override
|
bool isValid() const override { return iter != dir_file_paths.end(); }
|
||||||
{
|
|
||||||
return iter != directory.end();
|
|
||||||
}
|
|
||||||
|
|
||||||
String path() const override
|
String path() const override { return iter->string(); }
|
||||||
{
|
|
||||||
return fs::path(directory_root) / name();
|
|
||||||
}
|
|
||||||
|
|
||||||
String name() const override
|
String name() const override { return iter->filename(); }
|
||||||
{
|
|
||||||
return iter->first;
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Directory & directory;
|
std::vector<fs::path> dir_file_paths;
|
||||||
typename Directory::iterator iter;
|
std::vector<fs::path>::iterator iter;
|
||||||
const String directory_root;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
@ -146,177 +136,95 @@ private:
|
|||||||
|
|
||||||
DiskWebServer::DiskWebServer(
|
DiskWebServer::DiskWebServer(
|
||||||
const String & disk_name_,
|
const String & disk_name_,
|
||||||
const String & uri_,
|
const String & url_,
|
||||||
const String & metadata_path_,
|
const String &,
|
||||||
ContextPtr context_,
|
ContextPtr context_,
|
||||||
SettingsPtr settings_)
|
SettingsPtr settings_)
|
||||||
: WithContext(context_->getGlobalContext())
|
: WithContext(context_->getGlobalContext())
|
||||||
, log(&Poco::Logger::get("DiskWeb"))
|
, log(&Poco::Logger::get("DiskWeb"))
|
||||||
, uri(uri_)
|
, url(url_)
|
||||||
, name(disk_name_)
|
, name(disk_name_)
|
||||||
, metadata_path(metadata_path_)
|
, metadata_path(url)
|
||||||
, settings(std::move(settings_))
|
, settings(std::move(settings_))
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
String DiskWebServer::getFileName(const String & path)
|
|
||||||
{
|
|
||||||
String result;
|
|
||||||
|
|
||||||
if (RE2::FullMatch(path, MATCH_DIRECTORY_FILE_PATTERN)
|
|
||||||
&& RE2::Extract(path, MATCH_DIRECTORY_FILE_PATTERN, R"(\1/\2/\3)", &result))
|
|
||||||
return result;
|
|
||||||
|
|
||||||
if (RE2::FullMatch(path, MATCH_ROOT_FILE_PATTERN)
|
|
||||||
&& RE2::Extract(path, MATCH_ROOT_FILE_PATTERN, R"(\1/\2)", &result))
|
|
||||||
return result;
|
|
||||||
|
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected file: {}", path);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
bool DiskWebServer::findFileInMetadata(const String & path, File & file_info) const
|
|
||||||
{
|
|
||||||
String table_uuid, directory_name, file_name;
|
|
||||||
|
|
||||||
if (RE2::FullMatch(path, MATCH_DIRECTORY_FILE_PATTERN, &table_uuid, &directory_name, &file_name)
|
|
||||||
|| RE2::FullMatch(path, MATCH_ROOT_FILE_PATTERN, &table_uuid, &file_name)
|
|
||||||
|| RE2::FullMatch(path, MATCH_DIRECTORY_PATTERN, &table_uuid, &directory_name))
|
|
||||||
{
|
|
||||||
if (directory_name.empty())
|
|
||||||
directory_name = file_name;
|
|
||||||
|
|
||||||
if (!metadata.tables_data.count(table_uuid))
|
|
||||||
return false;
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
if (!metadata.tables_data.count(table_uuid))
|
|
||||||
metadata.initialize(uri, table_uuid, getContext());
|
|
||||||
}
|
|
||||||
catch (const Poco::Exception &)
|
|
||||||
{
|
|
||||||
const auto message = getCurrentExceptionMessage(false);
|
|
||||||
bool can_throw = CurrentThread::isInitialized() && CurrentThread::get().getQueryContext();
|
|
||||||
if (can_throw)
|
|
||||||
throw Exception(ErrorCodes::NETWORK_ERROR, "Cannot load disk metadata. Error: {}", message);
|
|
||||||
|
|
||||||
LOG_TRACE(&Poco::Logger::get("DiskWeb"), "Cannot load disk metadata. Error: {}", message);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!metadata.tables_data[table_uuid].count(directory_name))
|
|
||||||
return false;
|
|
||||||
|
|
||||||
if (file_name.empty())
|
|
||||||
return true;
|
|
||||||
|
|
||||||
const auto & files = metadata.tables_data[table_uuid][directory_name];
|
|
||||||
auto file = files.find(file_name);
|
|
||||||
if (file == files.end())
|
|
||||||
return false;
|
|
||||||
|
|
||||||
file_info = File(file->first, file->second);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
bool DiskWebServer::exists(const String & path) const
|
bool DiskWebServer::exists(const String & path) const
|
||||||
{
|
{
|
||||||
LOG_TRACE(log, "Checking existence of file: {}", path);
|
LOG_TRACE(log, "Checkig existance of path: {}", path);
|
||||||
|
// if (files.find(path) == files.end())
|
||||||
File file;
|
// initialize(fs::path(url) / path);
|
||||||
return findFileInMetadata(path, file);
|
return files.find(path) != files.end();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & path, const ReadSettings & read_settings, size_t) const
|
std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & path, const ReadSettings & read_settings, size_t) const
|
||||||
{
|
{
|
||||||
|
LOG_TRACE(log, "Read from path: {}", path);
|
||||||
|
auto iter = files.find(path);
|
||||||
|
if (iter == files.end())
|
||||||
|
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File path {} does not exist", path);
|
||||||
|
|
||||||
File file;
|
auto fs_path = fs::path(url) / path;
|
||||||
if (!findFileInMetadata(path, file))
|
auto remote_path = fs_path.parent_path() / (escapeForFileName(fs_path.stem()) + fs_path.extension().string());
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "File {} not found", path);
|
remote_path = remote_path.string().substr(url.size());
|
||||||
|
|
||||||
auto file_name = escapeForFileName(fs::path(path).stem()) + fs::path(path).extension().string();
|
RemoteMetadata meta(path, remote_path);
|
||||||
auto remote_path = fs::path(path).parent_path() / file_name;
|
meta.remote_fs_objects.emplace_back(std::make_pair(remote_path, iter->second.size));
|
||||||
LOG_TRACE(log, "Read from file by path: {}", remote_path.string());
|
|
||||||
|
|
||||||
RemoteMetadata meta(uri, remote_path);
|
auto reader = std::make_unique<ReadBufferFromWebServer>(url, meta, getContext(), settings->max_read_tries, read_settings.remote_fs_buffer_size);
|
||||||
meta.remote_fs_objects.emplace_back(std::make_pair(getFileName(remote_path), file.size));
|
|
||||||
|
|
||||||
auto reader = std::make_unique<ReadBufferFromWebServer>(uri, meta, getContext(), settings->max_read_tries, read_settings.remote_fs_buffer_size);
|
|
||||||
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), settings->min_bytes_for_seek);
|
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), settings->min_bytes_for_seek);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
DiskDirectoryIteratorPtr DiskWebServer::iterateDirectory(const String & path)
|
DiskDirectoryIteratorPtr DiskWebServer::iterateDirectory(const String & path)
|
||||||
{
|
{
|
||||||
LOG_TRACE(log, "Iterate directory: {}", path);
|
if (files.find(path) == files.end())
|
||||||
String uuid;
|
initialize(fs::path(url) / path);
|
||||||
|
|
||||||
if (RE2::FullMatch(path, ".*/store/"))
|
if (files.find(path) == files.end())
|
||||||
return std::make_unique<DiskWebDirectoryIterator<UUIDDirectoryListing>>(metadata.tables_data, path);
|
throw Exception("Directory '" + path + "' does not exist", ErrorCodes::DIRECTORY_DOESNT_EXIST);
|
||||||
|
|
||||||
if (!RE2::Extract(path, EXTRACT_UUID_PATTERN, "\\1", &uuid))
|
std::vector<fs::path> dir_file_paths;
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot extract uuid for: {}", path);
|
for (const auto & file : files)
|
||||||
|
if (parentPath(file.first) == path)
|
||||||
|
dir_file_paths.emplace_back(file.first);
|
||||||
|
|
||||||
/// Do not throw if it is not a query, but disk load.
|
LOG_TRACE(log, "Iterate directory {} with {} files", path, dir_file_paths.size());
|
||||||
bool can_throw = CurrentThread::isInitialized() && CurrentThread::get().getQueryContext();
|
if (dir_file_paths.empty())
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "EMpty dir: {}", path);
|
||||||
try
|
return std::make_unique<DiskWebServerDirectoryIterator>(std::move(dir_file_paths));
|
||||||
{
|
|
||||||
if (!metadata.tables_data.count(uuid))
|
|
||||||
metadata.initialize(uri, uuid, getContext());
|
|
||||||
}
|
|
||||||
catch (const Poco::Exception &)
|
|
||||||
{
|
|
||||||
const auto message = getCurrentExceptionMessage(false);
|
|
||||||
if (can_throw)
|
|
||||||
{
|
|
||||||
throw Exception(ErrorCodes::NETWORK_ERROR, "Cannot load disk metadata. Error: {}", message);
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG_TRACE(&Poco::Logger::get("DiskWeb"), "Cannot load disk metadata. Error: {}", message);
|
|
||||||
/// Empty iterator.
|
|
||||||
return std::make_unique<DiskWebDirectoryIterator<RootDirectoryListing>>(metadata.tables_data[""], path);
|
|
||||||
}
|
|
||||||
|
|
||||||
String directory_name;
|
|
||||||
if (RE2::FullMatch(path, MATCH_DIRECTORY_PATTERN, &uuid, &directory_name))
|
|
||||||
{
|
|
||||||
if (metadata.tables_data[uuid].contains(directory_name))
|
|
||||||
return std::make_unique<DiskWebDirectoryIterator<DirectoryListing>>(metadata.tables_data[uuid][directory_name], path);
|
|
||||||
if (can_throw)
|
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Directory {} does not exist. (uuid: {})", directory_name, uuid);
|
|
||||||
return std::make_unique<DiskWebDirectoryIterator<RootDirectoryListing>>(metadata.tables_data[""], path); /// Empty directory.
|
|
||||||
}
|
|
||||||
|
|
||||||
return std::make_unique<DiskWebDirectoryIterator<RootDirectoryListing>>(metadata.tables_data[uuid], path);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
size_t DiskWebServer::getFileSize(const String & path) const
|
size_t DiskWebServer::getFileSize(const String & path) const
|
||||||
{
|
{
|
||||||
File file;
|
auto iter = files.find(path);
|
||||||
if (!findFileInMetadata(path, file))
|
if (iter == files.end())
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "File {} not found", path);
|
return false;
|
||||||
return file.size;
|
|
||||||
|
return iter->second.size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
bool DiskWebServer::isFile(const String & path) const
|
bool DiskWebServer::isFile(const String & path) const
|
||||||
{
|
{
|
||||||
return RE2::FullMatch(path, ".*/\\w+.\\w+");
|
auto iter = files.find(path);
|
||||||
|
if (iter == files.end())
|
||||||
|
return false;
|
||||||
|
|
||||||
|
return iter->second.type == FileType::File;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
bool DiskWebServer::isDirectory(const String & path) const
|
bool DiskWebServer::isDirectory(const String & path) const
|
||||||
{
|
{
|
||||||
return RE2::FullMatch(path, ".*/\\w+");
|
auto iter = files.find(path);
|
||||||
|
if (iter == files.end())
|
||||||
|
return false;
|
||||||
|
|
||||||
|
return iter->second.type == FileType::Directory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -66,45 +66,8 @@ public:
|
|||||||
ContextPtr context,
|
ContextPtr context,
|
||||||
SettingsPtr settings_);
|
SettingsPtr settings_);
|
||||||
|
|
||||||
struct File
|
|
||||||
{
|
|
||||||
String name;
|
|
||||||
size_t size;
|
|
||||||
File(const String & name_ = "", const size_t size_ = 0) : name(name_), size(size_) {}
|
|
||||||
};
|
|
||||||
|
|
||||||
using Directory = std::unordered_map<String, size_t>;
|
|
||||||
|
|
||||||
/* Each root directory contains either directories like
|
|
||||||
* all_x_x_x/{file}, detached/, etc, or root files like format_version.txt.
|
|
||||||
*/
|
|
||||||
using RootDirectory = std::unordered_map<String, Directory>;
|
|
||||||
|
|
||||||
/* Each table is attached via ATTACH TABLE table UUID <uuid> <def>.
|
|
||||||
* Then there is a mapping: {table uuid} -> {root directory}
|
|
||||||
*/
|
|
||||||
using TableDirectories = std::unordered_map<String, RootDirectory>;
|
|
||||||
|
|
||||||
struct Metadata
|
|
||||||
{
|
|
||||||
/// Fetch meta only when required.
|
|
||||||
mutable TableDirectories tables_data;
|
|
||||||
|
|
||||||
Metadata() = default;
|
|
||||||
|
|
||||||
void initialize(const String & uri_with_path, const String & uuid, ContextPtr context) const;
|
|
||||||
};
|
|
||||||
|
|
||||||
using UUIDDirectoryListing = std::unordered_map<String, RootDirectory>;
|
|
||||||
using RootDirectoryListing = std::unordered_map<String, Directory>;
|
|
||||||
using DirectoryListing = std::unordered_map<String, size_t>;
|
|
||||||
|
|
||||||
bool findFileInMetadata(const String & path, File & file_info) const;
|
|
||||||
|
|
||||||
bool supportZeroCopyReplication() const override { return false; }
|
bool supportZeroCopyReplication() const override { return false; }
|
||||||
|
|
||||||
static String getFileName(const String & path);
|
|
||||||
|
|
||||||
DiskType getType() const override { return DiskType::WebServer; }
|
DiskType getType() const override { return DiskType::WebServer; }
|
||||||
|
|
||||||
bool isRemote() const override { return true; }
|
bool isRemote() const override { return true; }
|
||||||
@ -223,13 +186,28 @@ public:
|
|||||||
void createHardLink(const String &, const String &) override {}
|
void createHardLink(const String &, const String &) override {}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
void initialize(const String & uri_path) const;
|
||||||
|
|
||||||
|
enum class FileType
|
||||||
|
{
|
||||||
|
File = 1,
|
||||||
|
Directory = 2
|
||||||
|
};
|
||||||
|
|
||||||
|
struct FileData
|
||||||
|
{
|
||||||
|
FileType type;
|
||||||
|
size_t size;
|
||||||
|
};
|
||||||
|
|
||||||
|
using Files = std::unordered_map<String, FileData>; /// file path -> file data
|
||||||
|
mutable Files files;
|
||||||
|
|
||||||
Poco::Logger * log;
|
Poco::Logger * log;
|
||||||
String uri, name;
|
String url;
|
||||||
|
String name;
|
||||||
const String metadata_path;
|
const String metadata_path;
|
||||||
SettingsPtr settings;
|
SettingsPtr settings;
|
||||||
|
|
||||||
Metadata metadata;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,7 @@ import math
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
from boto.s3.connection import S3Connection
|
from boto.s3.connection import S3Connection
|
||||||
|
from boto.s3.key import Key
|
||||||
except ImportError:
|
except ImportError:
|
||||||
raise ImportError("You have to install boto package 'pip install boto'")
|
raise ImportError("You have to install boto package 'pip install boto'")
|
||||||
|
|
||||||
@ -63,28 +64,24 @@ class S3API(object):
|
|||||||
key.set_contents_from_filename(local_file_path, cb=call_back)
|
key.set_contents_from_filename(local_file_path, cb=call_back)
|
||||||
|
|
||||||
def upload_data_for_static_files_disk(self, bucket_name, directory_path, s3_path):
|
def upload_data_for_static_files_disk(self, bucket_name, directory_path, s3_path):
|
||||||
logging.info("Will upload files to bucket '%s' from directory '%s' to s3 path '%s'", bucket_name, directory_path, s3_path)
|
|
||||||
bucket = self.connection.get_bucket(bucket_name)
|
bucket = self.connection.get_bucket(bucket_name)
|
||||||
for file_name in os.listdir(directory_path):
|
if s3_path.endswith("/"):
|
||||||
if file_name.startswith('.index'):
|
s3_path += "store/"
|
||||||
local_file_path = os.path.join(directory_path, file_name)
|
else:
|
||||||
s3_file_path = os.path.join(s3_path, file_name)
|
s3_path += "/store/"
|
||||||
|
print(s3_path)
|
||||||
|
for root, dirs, files in os.walk(directory_path):
|
||||||
|
path = root.split(os.sep)
|
||||||
|
for file in files:
|
||||||
|
local_file_path = os.path.join(root, file)
|
||||||
|
s3_file = local_file_path[len(directory_path) + 1:]
|
||||||
|
s3_file_path = os.path.join(s3_path, s3_file)
|
||||||
self.set_file_contents(bucket, local_file_path, s3_file_path)
|
self.set_file_contents(bucket, local_file_path, s3_file_path)
|
||||||
|
|
||||||
file_path = os.path.join(directory_path, file_name)
|
|
||||||
with open(file_path) as file:
|
|
||||||
lines = file.readlines()
|
|
||||||
files = [line.rstrip() for line in lines]
|
|
||||||
for file_line in files:
|
|
||||||
file_path = file_line.split('\t')[0]
|
|
||||||
s3_file_path = os.path.join(s3_path, file_path)
|
|
||||||
local_file_path = os.path.join(directory_path, file_path)
|
|
||||||
self.set_file_contents(bucket, local_file_path, s3_file_path)
|
|
||||||
|
|
||||||
logging.info("Uploading finished")
|
logging.info("Uploading finished")
|
||||||
return "https://{bucket}.{mds_url}/{path}".format(bucket=bucket_name, mds_url=self.mds_url, path=s3_path)
|
return "https://{bucket}.{mds_url}/{path}".format(bucket=bucket_name, mds_url=self.mds_url, path=s3_path)
|
||||||
|
|
||||||
def list_bucket_keys(self, bucket_name, s3_path):
|
def list_bucket_keys(self, bucket_name):
|
||||||
bucket = self.connection.get_bucket(bucket_name)
|
bucket = self.connection.get_bucket(bucket_name)
|
||||||
for obj in bucket.get_all_keys():
|
for obj in bucket.get_all_keys():
|
||||||
print(obj.key)
|
print(obj.key)
|
||||||
@ -156,20 +153,17 @@ if __name__ == "__main__":
|
|||||||
args.s3_api_url, args.s3_common_url)
|
args.s3_api_url, args.s3_common_url)
|
||||||
|
|
||||||
file_path = ''
|
file_path = ''
|
||||||
directory_path = ''
|
directory_path = args.directory_path
|
||||||
s3_path = args.s3_path
|
s3_path = args.s3_path
|
||||||
|
|
||||||
if args.s3_path and args.list_directory:
|
if args.list_directory:
|
||||||
s3_conn.list_bucket_keys(args.bucket_name, s3_path)
|
s3_conn.list_bucket_keys(args.bucket_name)
|
||||||
|
elif args.remove_directory:
|
||||||
|
print('Removing s3 path: ' + args.remove_directory)
|
||||||
|
s3_conn.remove_folder_from_bucket(args.bucket_name, args.remove_directory)
|
||||||
elif args.directory_path is not None:
|
elif args.directory_path is not None:
|
||||||
directory_path = args.directory_path
|
url = s3_conn.upload_data_for_static_files_disk(args.bucket_name, directory_path, s3_path)
|
||||||
|
logging.info("Data uploaded: %s", url)
|
||||||
if args.remove_directory:
|
|
||||||
print('Removing s3 path: ' + args.remove_directory)
|
|
||||||
s3_conn.remove_folder_from_bucket(args.bucket_name, args.remove_directory)
|
|
||||||
else:
|
|
||||||
url = s3_conn.upload_data_for_static_files_disk(args.bucket_name, directory_path, s3_path)
|
|
||||||
logging.info("Data uploaded: %s", url)
|
|
||||||
else:
|
else:
|
||||||
|
|
||||||
if args.table_name is not None:
|
if args.table_name is not None:
|
||||||
|
Loading…
Reference in New Issue
Block a user