Resolve some issues

This commit is contained in:
divanik 2024-10-24 13:56:26 +00:00
parent a228e4fa89
commit a3f0d27d23
10 changed files with 36 additions and 47 deletions

View File

@ -1,23 +1,19 @@
#pragma once #pragma once
#include "config.h" #include <Storages/IStorage.h>
#include <Storages/ObjectStorage/Azure/Configuration.h>
#include <Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h>
#include <Storages/ObjectStorage/DataLakes/HudiMetadata.h>
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
#include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
#include <Storages/ObjectStorage/HDFS/Configuration.h>
#include <Storages/ObjectStorage/Local/Configuration.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/StorageFactory.h>
#include <Common/logger_useful.h>
#if USE_AVRO #include <memory>
# include <Storages/IStorage.h>
# include <Storages/ObjectStorage/Azure/Configuration.h>
# include <Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h>
# include <Storages/ObjectStorage/DataLakes/HudiMetadata.h>
# include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
# include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
# include <Storages/ObjectStorage/HDFS/Configuration.h>
# include <Storages/ObjectStorage/Local/Configuration.h>
# include <Storages/ObjectStorage/S3/Configuration.h>
# include <Storages/ObjectStorage/StorageObjectStorage.h>
# include <Storages/StorageFactory.h>
# include <Common/logger_useful.h>
# include <memory>
namespace DB namespace DB
@ -96,5 +92,3 @@ using StorageS3HudiConfiguration = DataLakeConfiguration<StorageS3Configuration,
} }
#endif

View File

@ -56,17 +56,17 @@ namespace ErrorCodes
struct DeltaLakeMetadataImpl struct DeltaLakeMetadataImpl
{ {
using ConfigurationObservePtr = DeltaLakeMetadata::ConfigurationObservePtr; using ConfigurationObserverPtr = DeltaLakeMetadata::ConfigurationObserverPtr;
ObjectStoragePtr object_storage; ObjectStoragePtr object_storage;
ConfigurationObservePtr configuration; ConfigurationObserverPtr configuration;
ContextPtr context; ContextPtr context;
/** /**
* Useful links: * Useful links:
* - https://github.com/delta-io/delta/blob/master/PROTOCOL.md#data-files * - https://github.com/delta-io/delta/blob/master/PROTOCOL.md#data-files
*/ */
DeltaLakeMetadataImpl(ObjectStoragePtr object_storage_, ConfigurationObservePtr configuration_, ContextPtr context_) DeltaLakeMetadataImpl(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_)
: object_storage(object_storage_), configuration(configuration_), context(context_) : object_storage(object_storage_), configuration(configuration_), context(context_)
{ {
} }
@ -687,7 +687,7 @@ struct DeltaLakeMetadataImpl
LoggerPtr log = getLogger("DeltaLakeMetadataParser"); LoggerPtr log = getLogger("DeltaLakeMetadataParser");
}; };
DeltaLakeMetadata::DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObservePtr configuration_, ContextPtr context_) DeltaLakeMetadata::DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_)
{ {
auto impl = DeltaLakeMetadataImpl(object_storage_, configuration_, context_); auto impl = DeltaLakeMetadataImpl(object_storage_, configuration_, context_);
auto result = impl.processMetadataFiles(); auto result = impl.processMetadataFiles();

View File

@ -12,10 +12,10 @@ namespace DB
class DeltaLakeMetadata final : public IDataLakeMetadata class DeltaLakeMetadata final : public IDataLakeMetadata
{ {
public: public:
using ConfigurationObservePtr = StorageObjectStorage::ConfigurationObservePtr; using ConfigurationObserverPtr = StorageObjectStorage::ConfigurationObserverPtr;
static constexpr auto name = "DeltaLake"; static constexpr auto name = "DeltaLake";
DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObservePtr configuration_, ContextPtr context_); DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);
Strings getDataFiles() const override { return data_files; } Strings getDataFiles() const override { return data_files; }
@ -33,7 +33,7 @@ public:
&& data_files == deltalake_metadata->data_files; && data_files == deltalake_metadata->data_files;
} }
static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObservePtr configuration, ContextPtr local_context) static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context)
{ {
return std::make_unique<DeltaLakeMetadata>(object_storage, configuration, local_context); return std::make_unique<DeltaLakeMetadata>(object_storage, configuration, local_context);
} }

View File

@ -87,7 +87,7 @@ Strings HudiMetadata::getDataFilesImpl() const
return result; return result;
} }
HudiMetadata::HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObservePtr configuration_, ContextPtr context_) HudiMetadata::HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_)
: WithContext(context_), object_storage(object_storage_), configuration(configuration_) : WithContext(context_), object_storage(object_storage_), configuration(configuration_)
{ {
} }

View File

@ -13,14 +13,11 @@ namespace DB
class HudiMetadata final : public IDataLakeMetadata, private WithContext class HudiMetadata final : public IDataLakeMetadata, private WithContext
{ {
public: public:
using ConfigurationObservePtr = StorageObjectStorage::ConfigurationObservePtr; using ConfigurationObserverPtr = StorageObjectStorage::ConfigurationObserverPtr;
static constexpr auto name = "Hudi"; static constexpr auto name = "Hudi";
HudiMetadata( HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);
ObjectStoragePtr object_storage_,
ConfigurationObservePtr configuration_,
ContextPtr context_);
Strings getDataFiles() const override; Strings getDataFiles() const override;
@ -38,17 +35,14 @@ public:
&& data_files == hudi_metadata->data_files; && data_files == hudi_metadata->data_files;
} }
static DataLakeMetadataPtr create( static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context)
ObjectStoragePtr object_storage,
ConfigurationObservePtr configuration,
ContextPtr local_context)
{ {
return std::make_unique<HudiMetadata>(object_storage, configuration, local_context); return std::make_unique<HudiMetadata>(object_storage, configuration, local_context);
} }
private: private:
const ObjectStoragePtr object_storage; const ObjectStoragePtr object_storage;
const ConfigurationObservePtr configuration; const ConfigurationObserverPtr configuration;
mutable Strings data_files; mutable Strings data_files;
std::unordered_map<String, String> column_name_to_physical_name; std::unordered_map<String, String> column_name_to_physical_name;
DataLakePartitionColumns partition_columns; DataLakePartitionColumns partition_columns;

View File

@ -51,7 +51,7 @@ extern const int UNSUPPORTED_METHOD;
IcebergMetadata::IcebergMetadata( IcebergMetadata::IcebergMetadata(
ObjectStoragePtr object_storage_, ObjectStoragePtr object_storage_,
ConfigurationObservePtr configuration_, ConfigurationObserverPtr configuration_,
DB::ContextPtr context_, DB::ContextPtr context_,
Int32 metadata_version_, Int32 metadata_version_,
Int32 format_version_, Int32 format_version_,
@ -383,7 +383,7 @@ std::pair<Int32, String> getMetadataFileAndVersion(
} }
DataLakeMetadataPtr DataLakeMetadataPtr
IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObservePtr configuration, ContextPtr local_context) IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context)
{ {
auto configuration_ptr = configuration.lock(); auto configuration_ptr = configuration.lock();

View File

@ -1,5 +1,7 @@
#pragma once #pragma once
#include "config.h"
#if USE_AVRO /// StorageIceberg depending on Avro to parse metadata with Avro format. #if USE_AVRO /// StorageIceberg depending on Avro to parse metadata with Avro format.
#include <Interpreters/Context_fwd.h> #include <Interpreters/Context_fwd.h>
@ -61,13 +63,13 @@ namespace DB
class IcebergMetadata : public IDataLakeMetadata, private WithContext class IcebergMetadata : public IDataLakeMetadata, private WithContext
{ {
public: public:
using ConfigurationObservePtr = StorageObjectStorage::ConfigurationObservePtr; using ConfigurationObserverPtr = StorageObjectStorage::ConfigurationObserverPtr;
static constexpr auto name = "Iceberg"; static constexpr auto name = "Iceberg";
IcebergMetadata( IcebergMetadata(
ObjectStoragePtr object_storage_, ObjectStoragePtr object_storage_,
ConfigurationObservePtr configuration_, ConfigurationObserverPtr configuration_,
ContextPtr context_, ContextPtr context_,
Int32 metadata_version_, Int32 metadata_version_,
Int32 format_version_, Int32 format_version_,
@ -92,16 +94,13 @@ public:
return iceberg_metadata && getVersion() == iceberg_metadata->getVersion(); return iceberg_metadata && getVersion() == iceberg_metadata->getVersion();
} }
static DataLakeMetadataPtr create( static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context);
ObjectStoragePtr object_storage,
ConfigurationObservePtr configuration,
ContextPtr local_context);
private: private:
size_t getVersion() const { return metadata_version; } size_t getVersion() const { return metadata_version; }
const ObjectStoragePtr object_storage; const ObjectStoragePtr object_storage;
const ConfigurationObservePtr configuration; const ConfigurationObserverPtr configuration;
Int32 metadata_version; Int32 metadata_version;
Int32 format_version; Int32 format_version;
String manifest_list_file; String manifest_list_file;

View File

@ -87,6 +87,7 @@ StorageObjectStorage::StorageObjectStorage(
, distributed_processing(distributed_processing_) , distributed_processing(distributed_processing_)
, log(getLogger(fmt::format("Storage{}({})", configuration->getEngineName(), table_id_.getFullTableName()))) , log(getLogger(fmt::format("Storage{}({})", configuration->getEngineName(), table_id_.getFullTableName())))
{ {
configuration_->update(object_storage_, context);
ColumnsDescription columns{columns_}; ColumnsDescription columns{columns_};
std::string sample_path; std::string sample_path;

View File

@ -26,7 +26,7 @@ class StorageObjectStorage : public IStorage
public: public:
class Configuration; class Configuration;
using ConfigurationPtr = std::shared_ptr<Configuration>; using ConfigurationPtr = std::shared_ptr<Configuration>;
using ConfigurationObservePtr = std::weak_ptr<Configuration>; using ConfigurationObserverPtr = std::weak_ptr<Configuration>;
using ObjectInfo = RelativePathWithMetadata; using ObjectInfo = RelativePathWithMetadata;
using ObjectInfoPtr = std::shared_ptr<ObjectInfo>; using ObjectInfoPtr = std::shared_ptr<ObjectInfo>;
using ObjectInfos = std::vector<ObjectInfoPtr>; using ObjectInfos = std::vector<ObjectInfoPtr>;

View File

@ -29,6 +29,7 @@ static std::shared_ptr<StorageObjectStorage> createStorageObjectStorage(
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, context, false); StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, context, false);
// Use format settings from global server context + settings from // Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current // the SETTINGS clause of the create query. Settings from current
// session and user are ignored. // session and user are ignored.