Fix hdfs assertion

This commit is contained in:
kssenii 2024-05-23 16:39:16 +02:00
parent dd7f3d1ba2
commit 47578772e4
8 changed files with 32 additions and 13 deletions

View File

@ -36,7 +36,7 @@ public:
void setPaths(const Paths & paths) override { blobs_paths = paths; }
String getNamespace() const override { return container; }
String getDataSourceDescription() override { return std::filesystem::path(connection_url) / container; }
String getDataSourceDescription() const override { return std::filesystem::path(connection_url) / container; }
StorageObjectStorage::QuerySettings getQuerySettings(const ContextPtr &) const override;
void check(ContextPtr context) const override;

View File

@ -31,7 +31,7 @@ public:
std::string getPathWithoutGlobs() const override;
String getNamespace() const override { return ""; }
String getDataSourceDescription() override { return url; }
String getDataSourceDescription() const override { return url; }
StorageObjectStorage::QuerySettings getQuerySettings(const ContextPtr &) const override;
void check(ContextPtr context) const override;

View File

@ -37,8 +37,7 @@ ReadBufferIterator::ReadBufferIterator(
SchemaCache::Key ReadBufferIterator::getKeyForSchemaCache(const ObjectInfo & object_info, const String & format_name) const
{
chassert(!object_info.getPath().starts_with("/"));
auto source = std::filesystem::path(configuration->getDataSourceDescription()) / object_info.getPath();
auto source = StorageObjectStorageSource::getUniqueStoragePathIdentifier(*configuration, object_info);
return DB::getKeyForSchemaCache(source, format_name, format_settings, getContext());
}
@ -51,8 +50,7 @@ SchemaCache::Keys ReadBufferIterator::getKeysForSchemaCache() const
std::back_inserter(sources),
[&](const auto & elem)
{
chassert(!elem->getPath().starts_with("/"));
return std::filesystem::path(configuration->getDataSourceDescription()) / elem->getPath();
return StorageObjectStorageSource::getUniqueStoragePathIdentifier(*configuration, *elem);
});
return DB::getKeysForSchemaCache(sources, *format, format_settings, getContext());
}

View File

@ -50,7 +50,7 @@ static const std::unordered_set<std::string_view> optional_configuration_keys =
"no_sign_request"
};
String StorageS3Configuration::getDataSourceDescription()
String StorageS3Configuration::getDataSourceDescription() const
{
return std::filesystem::path(url.uri.getHost() + std::to_string(url.uri.getPort())) / url.bucket;
}

View File

@ -31,7 +31,7 @@ public:
void setPaths(const Paths & paths) override { keys = paths; }
String getNamespace() const override { return url.bucket; }
String getDataSourceDescription() override;
String getDataSourceDescription() const override;
StorageObjectStorage::QuerySettings getQuerySettings(const ContextPtr &) const override;
bool isArchive() const override { return url.archive_pattern.has_value(); }

View File

@ -161,7 +161,7 @@ public:
virtual const Paths & getPaths() const = 0;
virtual void setPaths(const Paths & paths) = 0;
virtual String getDataSourceDescription() = 0;
virtual String getDataSourceDescription() const = 0;
virtual String getNamespace() const = 0;
virtual StorageObjectStorage::QuerySettings getQuerySettings(const ContextPtr &) const = 0;

View File

@ -82,6 +82,21 @@ void StorageObjectStorageSource::setKeyCondition(const ActionsDAGPtr & filter_ac
setKeyConditionImpl(filter_actions_dag, context_, read_from_format_info.format_header);
}
/// Builds a path-like identifier for an object: a prefix taken from `configuration`
/// joined with the object's path.
///
/// The prefix is configuration.getDataSourceDescription() when `include_connection_info`
/// is true, and configuration.getNamespace() otherwise.
/// A single leading "/" is removed from the object's path before joining.
std::string StorageObjectStorageSource::getUniqueStoragePathIdentifier(
    const Configuration & configuration,
    const ObjectInfo & object_info,
    bool include_connection_info)
{
    auto relative_path = object_info.getPath();

    /// Assuming `fs` aliases std::filesystem: operator/ drops its left-hand side when the
    /// right-hand side is an absolute path, so the leading slash must not reach the join below.
    if (relative_path.starts_with("/"))
        relative_path = relative_path.substr(1);

    const auto prefix = include_connection_info
        ? configuration.getDataSourceDescription()
        : configuration.getNamespace();

    return fs::path(prefix) / relative_path;
}
std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSource::createFileIterator(
ConfigurationPtr configuration,
ObjectStoragePtr object_storage,
@ -183,7 +198,7 @@ Chunk StorageObjectStorageSource::generate()
VirtualColumnUtils::addRequestedPathFileAndSizeVirtualsToChunk(
chunk,
read_from_format_info.requested_virtual_columns,
fs::path(configuration->getNamespace()) / reader.getObjectInfo().getPath(),
getUniqueStoragePathIdentifier(*configuration, reader.getObjectInfo(), false),
object_info.metadata->size_bytes, &filename);
return chunk;
@ -212,7 +227,7 @@ Chunk StorageObjectStorageSource::generate()
void StorageObjectStorageSource::addNumRowsToCache(const ObjectInfo & object_info, size_t num_rows)
{
const auto cache_key = getKeyForSchemaCache(
fs::path(configuration->getDataSourceDescription()) / object_info.getPath(),
getUniqueStoragePathIdentifier(*configuration, object_info),
configuration->format,
format_settings,
getContext());
@ -222,7 +237,7 @@ void StorageObjectStorageSource::addNumRowsToCache(const ObjectInfo & object_inf
std::optional<size_t> StorageObjectStorageSource::tryGetNumRowsFromCache(const ObjectInfo & object_info)
{
const auto cache_key = getKeyForSchemaCache(
fs::path(configuration->getDataSourceDescription()) / object_info.getPath(),
getUniqueStoragePathIdentifier(*configuration, object_info),
configuration->format,
format_settings,
getContext());
@ -511,7 +526,7 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::GlobIterator::ne
for (const auto & object_info : new_batch)
{
chassert(object_info);
paths.push_back(fs::path(configuration->getNamespace()) / object_info->getPath());
paths.push_back(getUniqueStoragePathIdentifier(*configuration, *object_info, false));
}
VirtualColumnUtils::filterByPathOrFile(new_batch, paths, filter_dag, virtual_columns, getContext());

View File

@ -17,6 +17,7 @@ class StorageObjectStorageSource : public SourceWithKeyCondition, WithContext
{
friend class StorageS3QueueSource;
public:
using Configuration = StorageObjectStorage::Configuration;
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
using ObjectInfo = StorageObjectStorage::ObjectInfo;
using ObjectInfos = StorageObjectStorage::ObjectInfos;
@ -58,6 +59,11 @@ public:
ObjectInfos * read_keys,
std::function<void(FileProgress)> file_progress_callback = {});
/// Returns a path-like identifier for `object_info`: the object's path (with one leading "/"
/// removed, if present) appended to a prefix obtained from `configuration`.
/// If `include_connection_info` is true (the default), the prefix is
/// configuration.getDataSourceDescription(); otherwise it is configuration.getNamespace().
static std::string getUniqueStoragePathIdentifier(
const Configuration & configuration,
const ObjectInfo & object_info,
bool include_connection_info = true);
protected:
const String name;
ObjectStoragePtr object_storage;