Merge pull request #41652 from kssenii/fix-disk-web

Fixes for disk web
This commit is contained in:
Kseniia Sumarokova 2022-09-23 12:43:50 +02:00 committed by GitHub
commit a0c5da3459
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 50 additions and 35 deletions

View File

@ -2,6 +2,7 @@
#include <Disks/IDisk.h> #include <Disks/IDisk.h>
#include <Common/filesystemHelpers.h> #include <Common/filesystemHelpers.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Common/escapeForFileName.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
@ -53,7 +54,31 @@ const std::string & MetadataStorageFromStaticFilesWebServer::getPath() const
bool MetadataStorageFromStaticFilesWebServer::exists(const std::string & path) const bool MetadataStorageFromStaticFilesWebServer::exists(const std::string & path) const
{ {
return object_storage.files.contains(path); fs::path fs_path(path);
if (fs_path.has_extension())
fs_path = fs_path.parent_path();
initializeIfNeeded(fs_path, false);
if (object_storage.files.empty())
return false;
if (object_storage.files.contains(path))
return true;
/// `object_storage.files` contains files + directories only inside `metadata_path / uuid_3_digit / uuid /`
/// (specific table files only), but we need to be able to also tell if `exists(<metadata_path>)`, for example.
auto it = std::lower_bound(
object_storage.files.begin(),
object_storage.files.end(),
path,
[](const auto & file, const std::string & path_) { return file.first < path_; }
);
if (startsWith(it->first, path)
|| (it != object_storage.files.begin() && startsWith(std::prev(it)->first, path)))
return true;
return false;
} }
void MetadataStorageFromStaticFilesWebServer::assertExists(const std::string & path) const void MetadataStorageFromStaticFilesWebServer::assertExists(const std::string & path) const
@ -98,7 +123,10 @@ uint64_t MetadataStorageFromStaticFilesWebServer::getFileSize(const String & pat
StoredObjects MetadataStorageFromStaticFilesWebServer::getStorageObjects(const std::string & path) const StoredObjects MetadataStorageFromStaticFilesWebServer::getStorageObjects(const std::string & path) const
{ {
assertExists(path); assertExists(path);
return {StoredObject::create(object_storage, path, object_storage.files.at(path).size, true)}; auto fs_path = fs::path(object_storage.url) / path;
std::string remote_path = fs_path.parent_path() / (escapeForFileName(fs_path.stem()) + fs_path.extension().string());
remote_path = remote_path.substr(object_storage.url.size());
return {StoredObject::create(object_storage, remote_path, object_storage.files.at(path).size, true)};
} }
std::vector<std::string> MetadataStorageFromStaticFilesWebServer::listDirectory(const std::string & path) const std::vector<std::string> MetadataStorageFromStaticFilesWebServer::listDirectory(const std::string & path) const
@ -112,7 +140,7 @@ std::vector<std::string> MetadataStorageFromStaticFilesWebServer::listDirectory(
return result; return result;
} }
bool MetadataStorageFromStaticFilesWebServer::initializeIfNeeded(const std::string & path) const bool MetadataStorageFromStaticFilesWebServer::initializeIfNeeded(const std::string & path, std::optional<bool> throw_on_error) const
{ {
if (object_storage.files.find(path) == object_storage.files.end()) if (object_storage.files.find(path) == object_storage.files.end())
{ {
@ -123,7 +151,7 @@ bool MetadataStorageFromStaticFilesWebServer::initializeIfNeeded(const std::stri
catch (...) catch (...)
{ {
const auto message = getCurrentExceptionMessage(false); const auto message = getCurrentExceptionMessage(false);
bool can_throw = CurrentThread::isInitialized() && CurrentThread::get().getQueryContext(); bool can_throw = throw_on_error.has_value() ? *throw_on_error : CurrentThread::isInitialized() && CurrentThread::get().getQueryContext();
if (can_throw) if (can_throw)
throw Exception(ErrorCodes::NETWORK_ERROR, "Cannot load disk metadata. Error: {}", message); throw Exception(ErrorCodes::NETWORK_ERROR, "Cannot load disk metadata. Error: {}", message);
@ -140,13 +168,17 @@ DirectoryIteratorPtr MetadataStorageFromStaticFilesWebServer::iterateDirectory(c
std::vector<fs::path> dir_file_paths; std::vector<fs::path> dir_file_paths;
if (!initializeIfNeeded(path)) if (!initializeIfNeeded(path))
{
return std::make_unique<DiskWebServerDirectoryIterator>(std::move(dir_file_paths)); return std::make_unique<DiskWebServerDirectoryIterator>(std::move(dir_file_paths));
}
assertExists(path); assertExists(path);
for (const auto & [file_path, _] : object_storage.files) for (const auto & [file_path, _] : object_storage.files)
if (parentPath(file_path) == path) {
if (fs::path(parentPath(file_path)) / "" == fs::path(path) / "")
dir_file_paths.emplace_back(file_path); dir_file_paths.emplace_back(file_path);
}
LOG_TRACE(object_storage.log, "Iterate directory {} with {} files", path, dir_file_paths.size()); LOG_TRACE(object_storage.log, "Iterate directory {} with {} files", path, dir_file_paths.size());
return std::make_unique<DiskWebServerDirectoryIterator>(std::move(dir_file_paths)); return std::make_unique<DiskWebServerDirectoryIterator>(std::move(dir_file_paths));

View File

@ -19,7 +19,7 @@ private:
void assertExists(const std::string & path) const; void assertExists(const std::string & path) const;
bool initializeIfNeeded(const std::string & path) const; bool initializeIfNeeded(const std::string & path, std::optional<bool> throw_on_error = std::nullopt) const;
public: public:
explicit MetadataStorageFromStaticFilesWebServer(const WebObjectStorage & object_storage_); explicit MetadataStorageFromStaticFilesWebServer(const WebObjectStorage & object_storage_);

View File

@ -30,7 +30,6 @@ namespace ErrorCodes
{ {
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED; extern const int NOT_IMPLEMENTED;
extern const int FILE_DOESNT_EXIST;
extern const int NETWORK_ERROR; extern const int NETWORK_ERROR;
} }
@ -153,20 +152,6 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
std::optional<size_t>, std::optional<size_t>,
std::optional<size_t>) const std::optional<size_t>) const
{ {
const auto & path = object.absolute_path;
LOG_TRACE(log, "Read from path: {}", path);
auto iter = files.find(path);
if (iter == files.end())
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File path {} does not exist", path);
auto fs_path = fs::path(url) / path;
auto remote_path = fs_path.parent_path() / (escapeForFileName(fs_path.stem()) + fs_path.extension().string());
remote_path = remote_path.string().substr(url.size());
StoredObjects objects;
objects.emplace_back(remote_path, iter->second.size);
auto read_buffer_creator = auto read_buffer_creator =
[this, read_settings] [this, read_settings]
(const std::string & path_, size_t read_until_position) -> std::shared_ptr<ReadBufferFromFileBase> (const std::string & path_, size_t read_until_position) -> std::shared_ptr<ReadBufferFromFileBase>
@ -179,7 +164,7 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
read_until_position); read_until_position);
}; };
auto web_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), objects, read_settings); auto web_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), StoredObjects{object}, read_settings);
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool) if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{ {

View File

@ -114,7 +114,7 @@ protected:
size_t size = 0; size_t size = 0;
}; };
using Files = std::unordered_map<String, FileData>; /// file path -> file data using Files = std::map<String, FileData>; /// file path -> file data
mutable Files files; mutable Files files;
String url; String url;

View File

@ -25,19 +25,17 @@ def cluster():
global uuids global uuids
for i in range(3): for i in range(3):
node1.query( node1.query(
""" CREATE TABLE data{} (id Int32) ENGINE = MergeTree() ORDER BY id SETTINGS storage_policy = 'def';""".format( f"CREATE TABLE data{i} (id Int32) ENGINE = MergeTree() ORDER BY id SETTINGS storage_policy = 'def', min_bytes_for_wide_part=1;"
i
)
) )
node1.query(
"INSERT INTO data{} SELECT number FROM numbers(500000 * {})".format( for _ in range(10):
i, i + 1 node1.query(
f"INSERT INTO data{i} SELECT number FROM numbers(500000 * {i+1})"
) )
) expected = node1.query(f"SELECT * FROM data{i} ORDER BY id")
expected = node1.query("SELECT * FROM data{} ORDER BY id".format(i))
metadata_path = node1.query( metadata_path = node1.query(
"SELECT data_paths FROM system.tables WHERE name='data{}'".format(i) f"SELECT data_paths FROM system.tables WHERE name='data{i}'"
) )
metadata_path = metadata_path[ metadata_path = metadata_path[
metadata_path.find("/") : metadata_path.rfind("/") + 1 metadata_path.find("/") : metadata_path.rfind("/") + 1
@ -84,7 +82,7 @@ def test_usage(cluster, node_name):
result = node2.query("SELECT * FROM test{} settings max_threads=20".format(i)) result = node2.query("SELECT * FROM test{} settings max_threads=20".format(i))
result = node2.query("SELECT count() FROM test{}".format(i)) result = node2.query("SELECT count() FROM test{}".format(i))
assert int(result) == 500000 * (i + 1) assert int(result) == 5000000 * (i + 1)
result = node2.query( result = node2.query(
"SELECT id FROM test{} WHERE id % 56 = 3 ORDER BY id".format(i) "SELECT id FROM test{} WHERE id % 56 = 3 ORDER BY id".format(i)
@ -123,7 +121,7 @@ def test_incorrect_usage(cluster):
) )
result = node2.query("SELECT count() FROM test0") result = node2.query("SELECT count() FROM test0")
assert int(result) == 500000 assert int(result) == 5000000
result = node2.query_and_get_error("ALTER TABLE test0 ADD COLUMN col1 Int32 first") result = node2.query_and_get_error("ALTER TABLE test0 ADD COLUMN col1 Int32 first")
assert "Table is read-only" in result assert "Table is read-only" in result
@ -169,7 +167,7 @@ def test_cache(cluster, node_name):
assert int(result) > 0 assert int(result) > 0
result = node2.query("SELECT count() FROM test{}".format(i)) result = node2.query("SELECT count() FROM test{}".format(i))
assert int(result) == 500000 * (i + 1) assert int(result) == 5000000 * (i + 1)
result = node2.query( result = node2.query(
"SELECT id FROM test{} WHERE id % 56 = 3 ORDER BY id".format(i) "SELECT id FROM test{} WHERE id % 56 = 3 ORDER BY id".format(i)