Merge pull request #62613 from ClickHouse/stream_system_remote_data_paths

Stream chunks from system.remote_data_paths
This commit is contained in:
Alexander Gololobov 2024-04-16 05:27:08 +00:00 committed by GitHub
commit ea42c98dd7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 315 additions and 132 deletions

View File

@ -320,15 +320,6 @@ public:
{}
};
virtual void getRemotePathsRecursive(
const String &, std::vector<LocalPathWithObjectStoragePaths> &, const std::function<bool(const String &)> & /* skip_predicate */)
{
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Method `getRemotePathsRecursive() not implemented for disk: {}`",
getDataSourceDescription().toString());
}
/// Batch request to remove multiple files.
/// May be much faster for blob storage.
/// Second bool param is a flag to remove (true) or keep (false) shared data on S3.

View File

@ -23,10 +23,6 @@ namespace DB
namespace ErrorCodes
{
extern const int INCORRECT_DISK_INDEX;
extern const int FILE_DOESNT_EXIST;
extern const int ATTEMPT_TO_READ_AFTER_EOF;
extern const int CANNOT_READ_ALL_DATA;
extern const int DIRECTORY_DOESNT_EXIST;
}
@ -91,67 +87,6 @@ StoredObjects DiskObjectStorage::getStorageObjects(const String & local_path) co
return metadata_storage->getStorageObjects(local_path);
}
void DiskObjectStorage::getRemotePathsRecursive(
const String & local_path,
std::vector<LocalPathWithObjectStoragePaths> & paths_map,
const std::function<bool(const String &)> & skip_predicate)
{
if (!metadata_storage->exists(local_path))
return;
if (skip_predicate && skip_predicate(local_path))
return;
/// Protect against concurrent delition of files (for example because of a merge).
if (metadata_storage->isFile(local_path))
{
try
{
paths_map.emplace_back(local_path, getStorageObjects(local_path));
}
catch (const Exception & e)
{
/// Unfortunately in rare cases it can happen when files disappear
/// or can be empty in case of operation interruption (like cancelled metadata fetch)
if (e.code() == ErrorCodes::FILE_DOESNT_EXIST ||
e.code() == ErrorCodes::DIRECTORY_DOESNT_EXIST ||
e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF ||
e.code() == ErrorCodes::CANNOT_READ_ALL_DATA)
return;
throw;
}
}
else
{
DirectoryIteratorPtr it;
try
{
it = iterateDirectory(local_path);
}
catch (const Exception & e)
{
/// Unfortunately in rare cases it can happen when files disappear
/// or can be empty in case of operation interruption (like cancelled metadata fetch)
if (e.code() == ErrorCodes::FILE_DOESNT_EXIST ||
e.code() == ErrorCodes::DIRECTORY_DOESNT_EXIST ||
e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF ||
e.code() == ErrorCodes::CANNOT_READ_ALL_DATA)
return;
throw;
}
catch (const fs::filesystem_error & e)
{
if (e.code() == std::errc::no_such_file_or_directory)
return;
throw;
}
for (; it->isValid(); it->next())
DiskObjectStorage::getRemotePathsRecursive(fs::path(local_path) / it->name(), paths_map, skip_predicate);
}
}
bool DiskObjectStorage::exists(const String & path) const
{

View File

@ -48,11 +48,6 @@ public:
StoredObjects getStorageObjects(const String & local_path) const override;
void getRemotePathsRecursive(
const String & local_path,
std::vector<LocalPathWithObjectStoragePaths> & paths_map,
const std::function<bool(const String &)> & skip_predicate) override;
const std::string & getCacheName() const override { return object_storage->getCacheName(); }
std::optional<UInt64> getTotalSpace() const override { return {}; }

View File

@ -6,16 +6,146 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Disks/IDisk.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Interpreters/Context.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Interpreters/ProcessList.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
namespace fs = std::filesystem;
namespace DB
{
namespace ErrorCodes
{
extern const int FILE_DOESNT_EXIST;
extern const int ATTEMPT_TO_READ_AFTER_EOF;
extern const int CANNOT_READ_ALL_DATA;
extern const int DIRECTORY_DOESNT_EXIST;
}
class SystemRemoteDataPathsSource : public ISource
{
public:
SystemRemoteDataPathsSource(
const DisksMap & disks_,
Block header_,
UInt64 max_block_size_,
ContextPtr context_)
: ISource(header_)
, max_block_size(max_block_size_)
, context(std::move(context_))
{
for (const auto & disk : disks_)
{
if (disk.second->isRemote())
disks.push_back(disk);
}
/// Position at the first disk
nextDisk();
}
String getName() const override { return "SystemRemoteDataPaths"; }
protected:
Chunk generate() override;
private:
/// Moves to the next file or disk in DFS order, if no more files or disks returns false
bool nextFile();
/// Moves to the next disk in the list, if no more disks returns false
bool nextDisk();
/// Returns full local path of the current file
fs::path getCurrentPath() const
{
fs::path path;
for (const auto & dir : paths_stack)
path /= dir.names[dir.position].name;
return path;
}
/// Returns the skip predicate for the current path
const auto & getCurrentSkipPredicate() const
{
chassert(!paths_stack.empty());
chassert(paths_stack.back().position < static_cast<ssize_t>(paths_stack.back().names.size()));
return paths_stack.back().names[paths_stack.back().position].skip_predicate;
}
static bool skipPredicateForShadowDir(const String & local_path)
{
// `shadow/{backup_name}/revision.txt` is not an object metadata file
const auto path = fs::path(local_path);
return path.filename() == "revision.txt" &&
path.parent_path().has_parent_path() &&
path.parent_path().parent_path().filename() == "shadow";
}
const UInt64 max_block_size;
std::vector<std::pair<std::string, DiskPtr>> disks;
ContextPtr context;
/// Directory entry with optional predicate to skip some files
struct NameAndFilter
{
std::string name;
std::function<bool(const String &)> skip_predicate; /// Skip files that match the predicate in the subtree
};
/// Directory contents
struct DirListingAndPosition
{
std::vector<NameAndFilter> names;
ssize_t position = -1; /// Index of the name we a currently pointing at, -1 means not started yet
};
ssize_t current_disk = -1; /// Start from -1 to move to the first disk on the first call to nextDisk()
std::vector<DirListingAndPosition> paths_stack; /// Represents the current path for DFS order traversal
};
class ReadFromSystemRemoteDataPaths final : public SourceStepWithFilter
{
public:
ReadFromSystemRemoteDataPaths(
DisksMap && disks_,
const Names & column_names_,
const SelectQueryInfo & query_info_,
const StorageSnapshotPtr & storage_snapshot_,
const ContextPtr & context_,
const Block & header,
UInt64 max_block_size_)
: SourceStepWithFilter(
{.header = header},
column_names_,
query_info_,
storage_snapshot_,
context_)
, storage_limits(query_info.storage_limits)
, max_block_size(max_block_size_)
, disks(std::move(disks_))
{
}
String getName() const override { return "ReadFromSystemRemoteDataPaths"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) override;
/// TODO: void applyFilters(ActionDAGNodes added_filter_nodes) can be implemented to filter out disk names
private:
std::shared_ptr<const StorageLimitsList> storage_limits;
const UInt64 max_block_size;
DisksMap disks;
};
StorageSystemRemoteDataPaths::StorageSystemRemoteDataPaths(const StorageID & table_id_)
: IStorage(table_id_)
{
@ -34,16 +164,121 @@ StorageSystemRemoteDataPaths::StorageSystemRemoteDataPaths(const StorageID & tab
setInMemoryMetadata(storage_metadata);
}
Pipe StorageSystemRemoteDataPaths::read(
void StorageSystemRemoteDataPaths::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum /*processed_stage*/,
const size_t /*max_block_size*/,
const size_t max_block_size,
const size_t /*num_streams*/)
{
storage_snapshot->check(column_names);
auto header = storage_snapshot->metadata->getSampleBlockWithVirtuals(getVirtualsList());
auto read_step = std::make_unique<ReadFromSystemRemoteDataPaths>(
context->getDisksMap(),
column_names,
query_info,
storage_snapshot,
context,
header,
max_block_size);
query_plan.addStep(std::move(read_step));
}
void ReadFromSystemRemoteDataPaths::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*settings*/)
{
const auto & header = getOutputStream().header;
auto source = std::make_shared<SystemRemoteDataPathsSource>(std::move(disks), header, max_block_size, context);
source->setStorageLimits(storage_limits);
processors.emplace_back(source);
pipeline.init(Pipe(std::move(source)));
}
bool SystemRemoteDataPathsSource::nextDisk()
{
while (current_disk < static_cast<ssize_t>(disks.size()))
{
paths_stack.clear();
++current_disk;
if (current_disk >= static_cast<ssize_t>(disks.size()))
break;
auto & current = paths_stack.emplace_back();
/// Add dirs that we want to traverse. It's ok if some of them don't exist because traversal logic handles
/// cases when children of a directory get deleted while traversal is running.
current.names.push_back({"store", nullptr});
current.names.push_back({"data", nullptr});
if (context->getSettingsRef().traverse_shadow_remote_data_paths)
current.names.push_back({"shadow", skipPredicateForShadowDir});
/// Start and move to the first file
current.position = -1;
if (nextFile())
return true;
}
return false;
}
bool SystemRemoteDataPathsSource::nextFile()
{
while (true)
{
while (!paths_stack.empty())
{
auto & current = paths_stack.back();
++current.position;
/// Move to the next child in the current directory
if (current.position < static_cast<ssize_t>(current.names.size()))
break;
/// Move up to the parent directory if this was the last child
paths_stack.pop_back();
}
/// Done with the current disk?
if (paths_stack.empty())
return false;
try
{
const auto & disk = disks[current_disk].second;
/// Stop if current path is a file
if (disk->isFile(getCurrentPath()))
return true;
/// If current path is a directory list its contents and step into it
std::vector<std::string> children;
disk->listFiles(getCurrentPath(), children);
/// Use current predicate for all children
const auto & skip_predicate = getCurrentSkipPredicate();
DirListingAndPosition dir;
for (const auto & child : children)
dir.names.push_back({child, skip_predicate});
dir.position = -1;
paths_stack.emplace_back(std::move(dir));
}
catch (const Exception & e)
{
/// Files or directories can disappear due to concurrent operations
if (e.code() == ErrorCodes::FILE_DOESNT_EXIST ||
e.code() == ErrorCodes::DIRECTORY_DOESNT_EXIST)
continue;
throw;
}
}
}
Chunk SystemRemoteDataPathsSource::generate()
{
/// Finish if all disks are processed
if (current_disk >= static_cast<ssize_t>(disks.size()))
return {};
MutableColumnPtr col_disk_name = ColumnString::create();
MutableColumnPtr col_base_path = ColumnString::create();
@ -54,61 +289,87 @@ Pipe StorageSystemRemoteDataPaths::read(
MutableColumnPtr col_namespace = ColumnString::create();
MutableColumnPtr col_cache_paths = ColumnArray::create(ColumnString::create());
auto disks = context->getDisksMap();
for (const auto & [disk_name, disk] : disks)
QueryStatusPtr query_status = context->getProcessListElement();
size_t row_count = 0;
do
{
if (disk->isRemote())
if (query_status)
query_status->checkTimeLimit();
/// Check if the block is big enough already
if (max_block_size > 0 && row_count > 0)
{
std::vector<IDisk::LocalPathWithObjectStoragePaths> remote_paths_by_local_path;
disk->getRemotePathsRecursive("store", remote_paths_by_local_path, /* skip_predicate = */ {});
disk->getRemotePathsRecursive("data", remote_paths_by_local_path, /* skip_predicate = */ {});
if (context->getSettingsRef().traverse_shadow_remote_data_paths)
disk->getRemotePathsRecursive(
"shadow",
remote_paths_by_local_path,
[](const String & local_path)
{
// `shadow/{backup_name}/revision.txt` is not an object metadata file
const auto path = fs::path(local_path);
return path.filename() == "revision.txt" &&
path.parent_path().has_parent_path() &&
path.parent_path().parent_path().filename() == "shadow";
});
size_t total_size =
col_disk_name->byteSize() +
col_base_path->byteSize() +
col_cache_base_path->byteSize() +
col_local_path->byteSize() +
col_remote_path->byteSize() +
col_size->byteSize() +
col_namespace->byteSize() +
col_cache_paths->byteSize();
if (total_size > max_block_size)
break;
}
FileCachePtr cache;
const auto & [disk_name, disk] = disks[current_disk];
auto local_path = getCurrentPath();
if (disk->supportsCache())
cache = FileCacheFactory::instance().getByName(disk->getCacheName())->cache;
const auto & skip_predicate = getCurrentSkipPredicate();
if (skip_predicate && skip_predicate(local_path))
continue;
for (const auto & [local_path, storage_objects] : remote_paths_by_local_path)
FileCachePtr cache;
if (disk->supportsCache())
cache = FileCacheFactory::instance().getByName(disk->getCacheName())->cache;
StoredObjects storage_objects;
try
{
storage_objects = disk->getMetadataStorage()->getStorageObjects(local_path);
}
catch (const Exception & e)
{
/// Unfortunately in rare cases it can happen when files disappear
/// or can be empty in case of operation interruption (like cancelled metadata fetch)
if (e.code() == ErrorCodes::FILE_DOESNT_EXIST ||
e.code() == ErrorCodes::DIRECTORY_DOESNT_EXIST ||
e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF ||
e.code() == ErrorCodes::CANNOT_READ_ALL_DATA)
continue;
throw;
}
for (const auto & object : storage_objects)
{
++row_count;
col_disk_name->insert(disk_name);
col_base_path->insert(disk->getPath());
if (cache)
col_cache_base_path->insert(cache->getBasePath());
else
col_cache_base_path->insertDefault();
col_local_path->insert(local_path.string());
col_remote_path->insert(object.remote_path);
col_size->insert(object.bytes_size);
col_namespace->insertDefault();
if (cache)
{
for (const auto & object : storage_objects)
{
col_disk_name->insert(disk_name);
col_base_path->insert(disk->getPath());
if (cache)
col_cache_base_path->insert(cache->getBasePath());
else
col_cache_base_path->insertDefault();
col_local_path->insert(local_path);
col_remote_path->insert(object.remote_path);
col_size->insert(object.bytes_size);
col_namespace->insertDefault();
if (cache)
{
auto cache_paths = cache->tryGetCachePaths(cache->createKeyForPath(object.remote_path));
col_cache_paths->insert(Array(cache_paths.begin(), cache_paths.end()));
}
else
{
col_cache_paths->insertDefault();
}
}
auto cache_paths = cache->tryGetCachePaths(cache->createKeyForPath(object.remote_path));
col_cache_paths->insert(Array(cache_paths.begin(), cache_paths.end()));
}
else
{
col_cache_paths->insertDefault();
}
}
}
while (nextFile() || nextDisk());
Columns res_columns;
res_columns.emplace_back(std::move(col_disk_name));
@ -123,7 +384,7 @@ Pipe StorageSystemRemoteDataPaths::read(
UInt64 num_rows = res_columns.at(0)->size();
Chunk chunk(std::move(res_columns), num_rows);
return Pipe(std::make_shared<SourceFromSingleChunk>(storage_snapshot->metadata->getSampleBlock(), std::move(chunk)));
return chunk;
}
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <Storages/System/IStorageSystemOneBlock.h>
#include <Storages/IStorage.h>
namespace DB
{
@ -14,7 +14,8 @@ public:
bool isSystemStorage() const override { return true; }
Pipe read(
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,