Fix issues with tests

divanik 2024-10-24 13:28:32 +00:00
parent 860fcbdbef
commit a228e4fa89
6 changed files with 89 additions and 31 deletions

View File

@@ -10,6 +10,7 @@
 #    include <Storages/ObjectStorage/DataLakes/HudiMetadata.h>
 #    include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
 #    include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
+#    include <Storages/ObjectStorage/HDFS/Configuration.h>
 #    include <Storages/ObjectStorage/Local/Configuration.h>
 #    include <Storages/ObjectStorage/S3/Configuration.h>
 #    include <Storages/ObjectStorage/StorageObjectStorage.h>
@@ -46,6 +47,18 @@ public:
         BaseStorageConfiguration::setPartitionColumns(current_metadata->getPartitionColumns());
     }

+    std::optional<ColumnsDescription> tryGetTableStructureFromMetadata() const override
+    {
+        if (!current_metadata)
+            return std::nullopt;
+        auto schema_from_metadata = current_metadata->getTableSchema();
+        if (!schema_from_metadata.empty())
+        {
+            return ColumnsDescription(std::move(schema_from_metadata));
+        }
+        return std::nullopt;
+    }
+
 private:
     DataLakeMetadataPtr current_metadata;
@@ -77,6 +90,7 @@ private:
 using StorageS3IcebergConfiguration = DataLakeConfiguration<StorageS3Configuration, IcebergMetadata>;
 using StorageAzureIcebergConfiguration = DataLakeConfiguration<StorageAzureConfiguration, IcebergMetadata>;
 using StorageLocalIcebergConfiguration = DataLakeConfiguration<StorageLocalConfiguration, IcebergMetadata>;
+using StorageHDFSIcebergConfiguration = DataLakeConfiguration<StorageHDFSConfiguration, IcebergMetadata>;
 using StorageS3DeltaLakeConfiguration = DataLakeConfiguration<StorageS3Configuration, DeltaLakeMetadata>;
 using StorageS3HudiConfiguration = DataLakeConfiguration<StorageS3Configuration, HudiMetadata>;
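Note: with this override, a data-lake configuration can answer schema queries straight from the lake metadata; callers (see the resolveSchemaFromData change below) fall back to file-based inference only when the metadata carries no schema. A minimal standalone sketch of the same try/fallback pattern (hypothetical names, not the ClickHouse API):

    #include <optional>
    #include <string>
    #include <vector>

    struct Column { std::string name; std::string type; };

    // Hypothetical metadata holder mirroring tryGetTableStructureFromMetadata():
    // hand out the schema only when the metadata actually provides one.
    struct LakeMetadata
    {
        std::vector<Column> schema;  // may be empty if the snapshot has no schema

        std::optional<std::vector<Column>> trySchema() const
        {
            if (schema.empty())
                return std::nullopt;
            return schema;
        }
    };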

View File

@@ -14,14 +14,15 @@
 #include <Processors/Executors/PullingPipelineExecutor.h>
 #include <Processors/Transforms/ExtractColumnsTransform.h>
-#include <Storages/StorageFactory.h>
 #include <Storages/Cache/SchemaCache.h>
-#include <Storages/VirtualColumnUtils.h>
-#include <Storages/ObjectStorage/Utils.h>
 #include <Storages/NamedCollectionsHelpers.h>
+#include <Storages/ObjectStorage/ReadBufferIterator.h>
 #include <Storages/ObjectStorage/StorageObjectStorageSink.h>
 #include <Storages/ObjectStorage/StorageObjectStorageSource.h>
-#include <Storages/ObjectStorage/ReadBufferIterator.h>
+#include <Storages/ObjectStorage/Utils.h>
+#include <Storages/StorageFactory.h>
+#include <Storages/VirtualColumnUtils.h>
+#include "Storages/ColumnsDescription.h"

 namespace DB
@@ -252,6 +253,11 @@ ReadFromFormatInfo StorageObjectStorage::Configuration::prepareReadingFromFormat
     return DB::prepareReadingFromFormat(requested_columns, storage_snapshot, local_context, supports_subset_of_columns);
 }

+std::optional<ColumnsDescription> StorageObjectStorage::Configuration::tryGetTableStructureFromMetadata() const
+{
+    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method tryGetTableStructureFromMetadata is not implemented for basic configuration");
+}
+
 void StorageObjectStorage::read(
     QueryPlan & query_plan,
     const Names & column_names,
@@ -409,6 +415,16 @@ ColumnsDescription StorageObjectStorage::resolveSchemaFromData(
     std::string & sample_path,
     const ContextPtr & context)
 {
+    if (configuration->isDataLakeConfiguration())
+    {
+        configuration->update(object_storage, context);
+        auto table_structure = configuration->tryGetTableStructureFromMetadata();
+        if (table_structure)
+        {
+            return table_structure.value();
+        }
+    }
+
     ObjectInfos read_keys;
     auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context);
     auto schema = readSchemaFromFormat(configuration->format, format_settings, *iterator, context);
@@ -489,10 +505,17 @@ void StorageObjectStorage::Configuration::initialize(
     if (configuration.format == "auto")
     {
-        configuration.format = FormatFactory::instance().tryGetFormatFromFileName(
-            configuration.isArchive()
-            ? configuration.getPathInArchive()
-            : configuration.getPath()).value_or("auto");
+        if (configuration.isDataLakeConfiguration())
+        {
+            configuration.format = "Parquet";
+        }
+        else
+        {
+            configuration.format
+                = FormatFactory::instance()
+                      .tryGetFormatFromFileName(configuration.isArchive() ? configuration.getPathInArchive() : configuration.getPath())
+                      .value_or("auto");
+        }
     }
     else
         FormatFactory::instance().checkFormatName(configuration.format);
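Two behavioral changes in this file: resolveSchemaFromData now consults the refreshed data-lake metadata first and only falls back to sampling files, and initialize() pins a data lake's default format to Parquet instead of guessing from a file name. The resolution order, reduced to a compact sketch (hypothetical helpers; Schema stands in for ColumnsDescription):

    #include <optional>
    #include <string>

    using Schema = std::string;  // stand-in for ColumnsDescription

    // Assumed helpers: real code refreshes metadata and reads sample files.
    std::optional<Schema> schemaFromLakeMetadata() { return Schema{"a Int32"}; }
    Schema inferSchemaFromFiles() { return Schema{"inferred"}; }

    Schema resolveSchema(bool is_data_lake)
    {
        if (is_data_lake)
            if (auto schema = schemaFromLakeMetadata())  // metadata wins when present
                return *schema;
        return inferSchemaFromFiles();  // otherwise infer from the data itself
    }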

View File

@@ -1,12 +1,13 @@
 #pragma once
-#include <Disks/ObjectStorages/IObjectStorage.h>
-#include <Common/threadPoolCallbackRunner.h>
 #include <Core/SchemaInferenceMode.h>
-#include <Storages/IStorage.h>
+#include <Disks/ObjectStorages/IObjectStorage.h>
 #include <Parsers/IAST_fwd.h>
-#include <Storages/prepareReadingFromFormat.h>
 #include <Processors/Formats/IInputFormat.h>
+#include <Storages/IStorage.h>
 #include <Storages/ObjectStorage/DataLakes/PartitionColumns.h>
+#include <Storages/prepareReadingFromFormat.h>
+#include <Common/threadPoolCallbackRunner.h>
+#include "Storages/ColumnsDescription.h"

 namespace DB
 {
@@ -208,6 +209,8 @@ public:
         bool supports_subset_of_columns,
         ContextPtr local_context);

+    virtual std::optional<ColumnsDescription> tryGetTableStructureFromMetadata() const;
+
     String format = "auto";
     String compression_method = "auto";
     String structure = "auto";
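This declaration pairs with the .cpp definition above: the base implementation throws NOT_IMPLEMENTED, so only configurations that actually override it (the data-lake ones) should be asked, which is why the caller guards with isDataLakeConfiguration(). The virtual-with-throwing-default pattern in miniature (hypothetical types, a sketch only):

    #include <optional>
    #include <stdexcept>

    struct Config
    {
        virtual ~Config() = default;

        // Default: unsupported; mirrors the NOT_IMPLEMENTED throw in the base class.
        virtual std::optional<int> tryGetStructure() const
        {
            throw std::runtime_error("not implemented for basic configuration");
        }
    };

    struct LakeConfig : Config
    {
        std::optional<int> tryGetStructure() const override { return 42; }
    };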

View File

@@ -153,6 +153,7 @@ void registerStorageObjectStorage(StorageFactory & factory)
 void registerStorageIceberg(StorageFactory & factory)
 {
+#if USE_AWS_S3
     factory.registerStorage(
         "Iceberg",
         [&](const StorageFactory::Arguments & args)
@@ -182,7 +183,8 @@ void registerStorageIceberg(StorageFactory & factory)
             .supports_schema_inference = true,
             .source_access_type = AccessType::S3,
         });
+#endif
+#if USE_AZURE_BLOB_STORAGE
     factory.registerStorage(
         "IcebergAzure",
         [&](const StorageFactory::Arguments & args)
@@ -197,7 +199,7 @@
             .supports_schema_inference = true,
             .source_access_type = AccessType::AZURE,
         });
+#endif
     factory.registerStorage(
         "IcebergLocal",
         [&](const StorageFactory::Arguments & args)
@@ -212,6 +214,22 @@
             .supports_schema_inference = true,
             .source_access_type = AccessType::FILE,
         });
+#if USE_HDFS
+    factory.registerStorage(
+        "IcebergHDFS",
+        [&](const StorageFactory::Arguments & args)
+        {
+            auto configuration = std::make_shared<StorageHDFSIcebergConfiguration>();
+            StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
+            return createStorageObjectStorage(args, configuration, args.getLocalContext());
+        },
+        {
+            .supports_settings = false,
+            .supports_schema_inference = true,
+            .source_access_type = AccessType::HDFS,
+        });
+#endif
 }
 #endif
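Each engine registration is now compiled only when the matching backend is available (USE_AWS_S3, USE_AZURE_BLOB_STORAGE, USE_HDFS), so a build without HDFS simply never registers IcebergHDFS. The conditional-registration idea, reduced to a sketch (hypothetical registry; the macro value is an assumption, as the real one comes from the build system):

    #include <functional>
    #include <map>
    #include <string>

    #define USE_HDFS 1  // assumption: defined by the build system in real code

    std::map<std::string, std::function<void()>> registry;

    void registerEngines()
    {
    #if USE_HDFS
        // Present only in builds with HDFS support, like "IcebergHDFS" above.
        registry.emplace("IcebergHDFS", [] { /* construct storage */ });
    #endif
    }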

View File

@@ -251,6 +251,14 @@ void registerTableFunctionIceberg(TableFunctionFactory & factory)
             .categories{"DataLake"}},
          .allow_readonly = false});
 #    endif
+#    if USE_HDFS
+    factory.registerFunction<TableFunctionIcebergHDFS>(
+        {.documentation
+         = {.description = R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem.)",
+            .examples{{"icebergHDFS", "SELECT * FROM icebergHDFS(url)", ""}},
+            .categories{"DataLake"}},
+         .allow_readonly = false});
+#    endif
     factory.registerFunction<TableFunctionIcebergLocal>(
         {.documentation
          = {.description = R"(The table function can be used to read the Iceberg table stored locally.)",
@@ -297,21 +305,4 @@ void registerDataLakeTableFunctions(TableFunctionFactory & factory)
     registerTableFunctionHudi(factory);
 #endif
 }
-
-#if USE_AVRO
-#    if USE_AWS_S3
-template class TableFunctionObjectStorage<IcebergDefinition, StorageS3IcebergConfiguration>;
-template class TableFunctionObjectStorage<IcebergS3Definition, StorageS3IcebergConfiguration>;
-#    endif
-#    if USE_AZURE_BLOB_STORAGE
-template class TableFunctionObjectStorage<IcebergAzureDefinition, StorageAzureIcebergConfiguration>;
-#    endif
-template class TableFunctionObjectStorage<IcebergLocalDefinition, StorageLocalIcebergConfiguration>;
-#endif
-#if USE_AWS_S3
-#    if USE_PARQUET
-template class TableFunctionObjectStorage<DeltaLakeDefinition, StorageS3DeltaLakeConfiguration>;
-#    endif
-template class TableFunctionObjectStorage<HudiDefinition, StorageS3HudiConfiguration>;
-#endif
 }
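The deleted block held explicit template instantiations; since registerTableFunctionIceberg and friends now reference each TableFunctionObjectStorage specialization directly, the compiler instantiates them implicitly at the point of use, and the hand-maintained list only risked drifting out of sync. The difference in miniature (generic illustration, not the ClickHouse types):

    template <typename T>
    struct Holder
    {
        T value;
        T get() const { return value; }
    };

    // Explicit instantiation (what was removed): emits all members in this
    // translation unit, whether or not anything here uses them.
    template struct Holder<int>;

    // Implicit instantiation (what remains): generated on first use.
    Holder<double> h{1.0};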

View File

@@ -86,6 +86,12 @@ struct IcebergLocalDefinition
     static constexpr auto storage_type_name = "Local";
 };

+struct IcebergHDFSDefinition
+{
+    static constexpr auto name = "icebergHDFS";
+    static constexpr auto storage_type_name = "HDFS";
+};
+
 struct DeltaLakeDefinition
 {
     static constexpr auto name = "deltaLake";
@@ -184,6 +190,9 @@ using TableFunctionIcebergS3 = TableFunctionObjectStorage<IcebergS3Definition, StorageS3IcebergConfiguration>;
 #    if USE_AZURE_BLOB_STORAGE
 using TableFunctionIcebergAzure = TableFunctionObjectStorage<IcebergAzureDefinition, StorageAzureIcebergConfiguration>;
 #    endif
+#    if USE_HDFS
+using TableFunctionIcebergHDFS = TableFunctionObjectStorage<IcebergHDFSDefinition, StorageHDFSIcebergConfiguration>;
+#    endif
 using TableFunctionIcebergLocal = TableFunctionObjectStorage<IcebergLocalDefinition, StorageLocalIcebergConfiguration>;
 #endif
 #if USE_AWS_S3
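Adding a backend is thus three declarative steps: a Definition struct (function name plus storage type), a guarded alias binding it to a configuration, and the registration shown earlier. The traits pattern in miniature (hypothetical and heavily simplified, a sketch only):

    #include <string_view>

    template <typename Definition, typename Configuration>
    struct TableFunction
    {
        static constexpr std::string_view name = Definition::name;  // e.g. "icebergHDFS"
        Configuration configuration;
    };

    struct HDFSDefinition { static constexpr std::string_view name = "icebergHDFS"; };
    struct HDFSConfiguration { };

    using TableFunctionHDFS = TableFunction<HDFSDefinition, HDFSConfiguration>;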