Add dynamic schemas

This commit is contained in:
divanik 2024-11-07 16:45:09 +00:00
parent 3fde214467
commit ba67604364
4 changed files with 69 additions and 17 deletions

View File

@ -15,12 +15,20 @@
#include "Storages/ColumnsDescription.h" #include "Storages/ColumnsDescription.h"
#include <memory> #include <memory>
#include <string>
#include <unordered_map> #include <unordered_map>
#include <Common/ErrorCodes.h>
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int FORMAT_VERSION_TOO_OLD;
}
template <typename T> template <typename T>
concept StorageConfiguration = std::derived_from<T, StorageObjectStorage::Configuration>; concept StorageConfiguration = std::derived_from<T, StorageObjectStorage::Configuration>;
@ -38,12 +46,19 @@ public:
{ {
BaseStorageConfiguration::update(object_storage, local_context); BaseStorageConfiguration::update(object_storage, local_context);
auto new_metadata = DataLakeMetadata::create(object_storage, weak_from_this(), local_context); auto new_metadata = DataLakeMetadata::create(object_storage, weak_from_this(), local_context);
if (current_metadata && *current_metadata == *new_metadata)
return;
current_metadata = std::move(new_metadata); if (!current_metadata || (*current_metadata != *new_metadata))
BaseStorageConfiguration::setPaths(current_metadata->getDataFiles()); {
BaseStorageConfiguration::setPartitionColumns(current_metadata->getPartitionColumns()); throw Exception(
ErrorCodes::FORMAT_VERSION_TOO_OLD,
"Storage thinks that actual metadata version is {}, but actual metadata version is {}",
(dynamic_cast<IcebergMetadata *>(current_metadata.get()) != nullptr)
? std::to_string(dynamic_cast<IcebergMetadata *>(current_metadata.get())->getVersion())
: "Absent",
(dynamic_cast<IcebergMetadata *>(new_metadata.get()) != nullptr)
? std::to_string(dynamic_cast<IcebergMetadata *>(new_metadata.get())->getVersion())
: "Absent");
}
} }
std::optional<ColumnsDescription> tryGetTableStructureFromMetadata() const override std::optional<ColumnsDescription> tryGetTableStructureFromMetadata() const override
@ -72,6 +87,20 @@ public:
return current_metadata->getSchemaTransformer(data_path); return current_metadata->getSchemaTransformer(data_path);
} }
ColumnsDescription updateAndGetCurrentSchema(ObjectStoragePtr object_storage, ContextPtr context) override
{
BaseStorageConfiguration::update(object_storage, context);
auto new_metadata = DataLakeMetadata::create(object_storage, weak_from_this(), context);
if (!current_metadata || (*current_metadata != *new_metadata))
{
current_metadata = std::move(new_metadata);
BaseStorageConfiguration::setPaths(current_metadata->getDataFiles());
BaseStorageConfiguration::setPartitionColumns(current_metadata->getPartitionColumns());
}
return ColumnsDescription{current_metadata->getTableSchema()};
}
private: private:
DataLakeMetadataPtr current_metadata; DataLakeMetadataPtr current_metadata;

View File

@ -91,6 +91,7 @@ StorageObjectStorage::StorageObjectStorage(
{ {
try try
{ {
configuration->updateAndGetCurrentSchema(object_storage, context);
configuration->update(object_storage, context); configuration->update(object_storage, context);
} }
catch (...) catch (...)
@ -148,6 +149,24 @@ void StorageObjectStorage::Configuration::update(ObjectStoragePtr object_storage
IObjectStorage::ApplyNewSettingsOptions options{.allow_client_change = !isStaticConfiguration()}; IObjectStorage::ApplyNewSettingsOptions options{.allow_client_change = !isStaticConfiguration()};
object_storage_ptr->applyNewSettings(context->getConfigRef(), getTypeName() + ".", context, options); object_storage_ptr->applyNewSettings(context->getConfigRef(), getTypeName() + ".", context, options);
} }
bool StorageObjectStorage::hasExternalDynamicMetadata() const
{
return configuration->isDataLakeConfiguration();
}
void StorageObjectStorage::updateExternalDynamicMetadata(ContextPtr context_ptr)
{
if (configuration->isDataLakeConfiguration())
{
StorageInMemoryMetadata metadata;
metadata.setColumns(configuration->updateAndGetCurrentSchema(object_storage, context_ptr));
setInMemoryMetadata(metadata);
return;
}
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method updateExternalDynamicMetadata is not supported by storage {}", getName());
}
namespace namespace
{ {
class ReadFromObjectStorageStep : public SourceStepWithFilter class ReadFromObjectStorageStep : public SourceStepWithFilter
@ -363,7 +382,7 @@ SinkToStoragePtr StorageObjectStorage::write(
auto paths = configuration->getPaths(); auto paths = configuration->getPaths();
if (auto new_key = checkAndGetNewFileOnInsertIfNeeded(*object_storage, *configuration, settings, paths.front(), paths.size())) if (auto new_key = checkAndGetNewFileOnInsertIfNeeded(*object_storage, *configuration, settings, paths.front(), paths.size()))
{ {
paths.emplace_back(*new_key); paths.push_back(*new_key);
} }
configuration->setPaths(paths); configuration->setPaths(paths);
@ -434,6 +453,7 @@ ColumnsDescription StorageObjectStorage::resolveSchemaFromData(
{ {
if (configuration->isDataLakeConfiguration()) if (configuration->isDataLakeConfiguration())
{ {
configuration->updateAndGetCurrentSchema(object_storage, context);
configuration->update(object_storage, context); configuration->update(object_storage, context);
auto table_structure = configuration->tryGetTableStructureFromMetadata(); auto table_structure = configuration->tryGetTableStructureFromMetadata();
if (table_structure) if (table_structure)

View File

@ -1,6 +1,4 @@
#pragma once #pragma once
#include <cstddef>
#include <optional>
#include <Core/SchemaInferenceMode.h> #include <Core/SchemaInferenceMode.h>
#include <Disks/ObjectStorages/IObjectStorage.h> #include <Disks/ObjectStorages/IObjectStorage.h>
#include <Parsers/IAST_fwd.h> #include <Parsers/IAST_fwd.h>
@ -9,12 +7,10 @@
#include <Storages/ObjectStorage/DataLakes/PartitionColumns.h> #include <Storages/ObjectStorage/DataLakes/PartitionColumns.h>
#include <Storages/prepareReadingFromFormat.h> #include <Storages/prepareReadingFromFormat.h>
#include <Common/threadPoolCallbackRunner.h> #include <Common/threadPoolCallbackRunner.h>
#include <memory>
#include "Interpreters/ActionsDAG.h" #include "Interpreters/ActionsDAG.h"
#include "Storages/ColumnsDescription.h" #include "Storages/ColumnsDescription.h"
#include <memory>
namespace DB namespace DB
{ {
@ -129,6 +125,10 @@ public:
std::string & sample_path, std::string & sample_path,
const ContextPtr & context); const ContextPtr & context);
bool hasExternalDynamicMetadata() const override;
void updateExternalDynamicMetadata(ContextPtr) override;
protected: protected:
String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context); String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context);
@ -215,6 +215,11 @@ public:
virtual std::shared_ptr<const ActionsDAG> getSchemaTransformer(const String&) const { return {}; } virtual std::shared_ptr<const ActionsDAG> getSchemaTransformer(const String&) const { return {}; }
virtual ColumnsDescription updateAndGetCurrentSchema(ObjectStoragePtr, ContextPtr)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method updateAndGetCurrentSchema is not supported by storage {}", getEngineName());
}
virtual ReadFromFormatInfo prepareReadingFromFormat( virtual ReadFromFormatInfo prepareReadingFromFormat(
ObjectStoragePtr object_storage, ObjectStoragePtr object_storage,
const Strings & requested_columns, const Strings & requested_columns,

View File

@ -35,7 +35,8 @@ namespace CurrentMetrics
extern const Metric StorageObjectStorageThreads; extern const Metric StorageObjectStorageThreads;
extern const Metric StorageObjectStorageThreadsActive; extern const Metric StorageObjectStorageThreadsActive;
extern const Metric StorageObjectStorageThreadsScheduled; extern const Metric StorageObjectStorageThreadsScheduled;
} }
namespace DB namespace DB
{ {
namespace Setting namespace Setting
@ -682,10 +683,7 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::GlobIterator::ne
return {}; return {};
} }
for (const auto & relative_metadata : *result) new_batch = std::move(result.value());
{
new_batch.emplace_back(std::make_shared<ObjectInfo>(relative_metadata->relative_path, relative_metadata->metadata));
}
for (auto it = new_batch.begin(); it != new_batch.end();) for (auto it = new_batch.begin(); it != new_batch.end();)
{ {
if (!recursive && !re2::RE2::FullMatch((*it)->getPath(), *matcher)) if (!recursive && !re2::RE2::FullMatch((*it)->getPath(), *matcher))
@ -755,7 +753,7 @@ StorageObjectStorageSource::KeysIterator::KeysIterator(
/// TODO: should we add metadata if we anyway fetch it if file_progress_callback is passed? /// TODO: should we add metadata if we anyway fetch it if file_progress_callback is passed?
for (auto && key : keys) for (auto && key : keys)
{ {
auto object_info = std::make_shared<ObjectInfo>(key, std::nullopt); auto object_info = std::make_shared<ObjectInfo>(key);
read_keys_->emplace_back(object_info); read_keys_->emplace_back(object_info);
} }
} }