kssenii 2023-03-28 20:51:02 +02:00
parent 13f29a7242
commit 1573790fe7
19 changed files with 595 additions and 477 deletions

View File

@ -1,13 +1,11 @@
#pragma once
#include <Storages/DataLakes/DeltaLakeMetadataParser.h>
#include <base/JSON.h>
#include "config.h"
#if USE_AWS_S3
#include <Storages/IStorageDataLake.h>
#include <Storages/S3DataLakeMetadataReadHelper.h>
#include <Storages/DataLakes/S3MetadataReader.h>
#include <Storages/StorageS3.h>
#include <base/JSON.h>
#endif
namespace DB
{
@ -17,93 +15,30 @@ namespace ErrorCodes
extern const int INCORRECT_DATA;
}
/**
* Documentation links:
* - https://github.com/delta-io/delta/blob/master/PROTOCOL.md#data-files
*/
template <typename Configuration, typename MetadataReadHelper>
class DeltaLakeMetadataParser : private WithContext
namespace
{
public:
DeltaLakeMetadataParser(const Configuration & storage_configuration_, ContextPtr context_)
: WithContext(context_)
, storage_configuration(storage_configuration_)
{
}
/**
* Documentation links:
* - https://github.com/delta-io/delta/blob/master/PROTOCOL.md#data-files
*/
Strings getFiles() const
{
auto data_files = processMetadataFiles();
return Strings(data_files.begin(), data_files.end());
}
static String generateQueryFromKeys(const std::vector<String> & keys, const std::string & /* format */)
{
if (keys.size() == 1)
{
return fmt::format("{}", keys[0]);
}
else
{
return fmt::format("{{{}}}", fmt::join(keys, ","));
}
}
private:
/**
* Delta files are stored as JSON in a directory at the root of the table named _delta_log,
* and together with checkpoints make up the log of all changes that have occurred to a table.
*
* Delta files are the unit of atomicity for a table,
* and are named using the next available version number, zero-padded to 20 digits.
* For example:
* ./_delta_log/00000000000000000000.json
*/
std::vector<String> getMetadataFiles() const
template <typename Configuration, typename MetadataReadHelper>
std::vector<String> getMetadataFiles(const Configuration & configuration)
{
/// The DeltaLake format stores all metadata JSON files in the _delta_log directory
static constexpr auto deltalake_metadata_directory = "_delta_log";
static constexpr auto metadata_file_suffix = ".json";
return MetadataReadHelper::listFiles(
storage_configuration, deltalake_metadata_directory, metadata_file_suffix);
}
/**
* Delta files are the unit of atomicity for a table,
* and are named using the next available version number, zero-padded to 20 digits.
* For example:
* ./_delta_log/00000000000000000000.json
*
* A delta file, n.json, contains an atomic set of actions that should be applied to the
* previous table state (n-1.json) in order to construct the nth snapshot of the table.
* An action changes one aspect of the table's state, for example, adding or removing a file.
*/
std::set<String> processMetadataFiles() const
{
std::set<String> result;
const auto keys = getMetadataFiles();
for (const String & key : keys)
{
auto buf = MetadataReadHelper::createReadBuffer(key, getContext(), storage_configuration);
char c;
while (!buf->eof())
{
/// There may be some invalid characters before the JSON.
while (buf->peek(c) && c != '{')
buf->ignore();
if (buf->eof())
break;
String json_str;
readJSONObjectPossiblyInvalid(json_str, *buf);
if (json_str.empty())
continue;
const JSON json(json_str);
handleJSON(json, result);
}
}
return result;
return MetadataReadHelper::listFiles(configuration, deltalake_metadata_directory, metadata_file_suffix);
}
/**
@ -140,7 +75,7 @@ private:
* \"nullCount\":{\"col-6c990940-59bb-4709-8f2e-17083a82c01a\":0,\"col-763cd7e2-7627-4d8e-9fb7-9e85d0c8845b\":0}}"}}
* "
*/
void handleJSON(const JSON & json, std::set<String> & result) const
void handleJSON(const JSON & json, std::set<String> & result)
{
if (json.has("add"))
{
@ -158,16 +93,53 @@ private:
}
}
Configuration storage_configuration;
};
/**
* A delta file, n.json, contains an atomic set of actions that should be applied to the
* previous table state (n-1.json) in order to construct the nth snapshot of the table.
* An action changes one aspect of the table's state, for example, adding or removing a file.
*/
template <typename Configuration, typename MetadataReadHelper>
std::set<String> processMetadataFiles(const Configuration & configuration, ContextPtr context)
{
std::set<String> result;
const auto keys = getMetadataFiles<Configuration, MetadataReadHelper>(configuration);
for (const String & key : keys)
{
auto buf = MetadataReadHelper::createReadBuffer(key, context, configuration);
struct StorageDeltaLakeName
{
static constexpr auto name = "DeltaLake";
};
char c;
while (!buf->eof())
{
/// There may be some invalid characters before the JSON.
while (buf->peek(c) && c != '{')
buf->ignore();
using StorageDeltaLake
= IStorageDataLake<StorageS3, StorageDeltaLakeName, DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>;
if (buf->eof())
break;
String json_str;
readJSONObjectPossiblyInvalid(json_str, *buf);
if (json_str.empty())
continue;
const JSON json(json_str);
handleJSON(json, result);
}
}
return result;
}
}
template <typename Configuration, typename MetadataReadHelper>
Strings DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::getFiles(const Configuration & configuration, ContextPtr context)
{
auto data_files = processMetadataFiles<Configuration, MetadataReadHelper>(configuration, context);
return Strings(data_files.begin(), data_files.end());
}
#if USE_AWS_S3
template Strings DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getFiles(
const StorageS3::Configuration & configuration, ContextPtr);
#endif
}
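
For context on the Delta log handling in this file: processMetadataFiles() walks every _delta_log/*.json file and feeds each JSON object to handleJSON(), which collects the data file paths of "add" actions. Only the "add" branch of handleJSON() is visible in this hunk, so the following is a minimal standalone sketch of the idea, using Poco::JSON instead of ClickHouse's base/JSON.h helper; the "remove" handling and the exact action layout are assumptions taken from the Delta protocol, not from this commit.

#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <iostream>
#include <set>
#include <string>
#include <vector>

/// Collect data file paths from Delta log actions. Each _delta_log/*.json file
/// contains one JSON object per action; "add" introduces a data file and
/// "remove" (assumed here, per the Delta protocol) retires one.
std::set<std::string> collectDataFiles(const std::vector<std::string> & json_objects)
{
    std::set<std::string> result;
    for (const auto & object_str : json_objects)
    {
        Poco::JSON::Parser parser;
        auto object = parser.parse(object_str).extract<Poco::JSON::Object::Ptr>();
        if (object->has("add"))
            result.insert(object->getObject("add")->getValue<std::string>("path"));
        else if (object->has("remove"))
            result.erase(object->getObject("remove")->getValue<std::string>("path"));
    }
    return result;
}

int main()
{
    const std::vector<std::string> log = {
        R"({"add":{"path":"part-0000.parquet"}})",
        R"({"add":{"path":"part-0001.parquet"}})",
        R"({"remove":{"path":"part-0000.parquet"}})",
    };
    for (const auto & file : collectDataFiles(log))
        std::cout << file << '\n'; /// part-0001.parquet
}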

View File

@ -0,0 +1,15 @@
#pragma once
#include <Interpreters/Context_fwd.h>
#include <Core/Types.h>
namespace DB
{
template <typename Configuration, typename MetadataReadHelper>
struct DeltaLakeMetadataParser
{
static Strings getFiles(const Configuration & configuration, ContextPtr context);
};
}

View File

@ -1,41 +1,31 @@
#pragma once
#include <Storages/DataLakes/HudiMetadataParser.h>
#include <Storages/DataLakes/S3MetadataReader.h>
#include <Common/logger_useful.h>
#include <ranges>
#include "config.h"
#if USE_AWS_S3
#include <Storages/IStorage.h>
#include <Storages/IStorageDataLake.h>
#include <Storages/S3DataLakeMetadataReadHelper.h>
#include <Storages/DataLakes/S3MetadataReader.h>
#include <Storages/StorageS3.h>
#include <Common/logger_useful.h>
#include <ranges>
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace DB
namespace
{
template <typename Configuration, typename MetadataReadHelper>
class HudiMetadataParser
{
public:
HudiMetadataParser(const Configuration & configuration_, ContextPtr context_)
: configuration(configuration_), context(context_), log(&Poco::Logger::get("StorageHudi")) {}
Strings getFiles() const { return MetadataReadHelper::listFiles(configuration); }
/** Apache Hudi stores parts of data in different files.
* Every part file has a timestamp in it.
* Every partition (directory) in Apache Hudi has different versions of a part.
* To find the needed parts we need to find the latest part file for every partition.
* The part format is usually parquet, but it can differ.
*/
static String generateQueryFromKeys(const std::vector<String> & keys, const String & format)
Strings processMetadataFiles(const std::vector<String> & keys, const String & format)
{
auto * log = &Poco::Logger::get("HudiMetadataParser");
@ -76,35 +66,23 @@ public:
LOG_TRACE(log, "Having {} result partitions", latest_parts.size());
std::string list_of_keys;
for (const auto & [directory, file_info] : latest_parts)
{
if (!list_of_keys.empty())
list_of_keys += ",";
LOG_TEST(log, "Partition: {}, file: {}, timestamp: {}", directory, file_info.filename, file_info.timestamp);
list_of_keys += file_info.filename;
}
if (latest_parts.size() == 1)
return list_of_keys;
return "{" + list_of_keys + "}";
Strings result;
for (const auto & [_, file_info] : latest_parts)
result.push_back(file_info.filename);
return result;
}
private:
Configuration configuration;
ContextPtr context;
Poco::Logger * log;
};
struct StorageHudiName
{
static constexpr auto name = "Hudi";
};
using StorageHudi
= IStorageDataLake<StorageS3, StorageHudiName, HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>;
}
template <typename Configuration, typename MetadataReadHelper>
Strings HudiMetadataParser<Configuration, MetadataReadHelper>::getFiles(const Configuration & configuration, ContextPtr)
{
const Strings files = MetadataReadHelper::listFiles(configuration);
return processMetadataFiles(files, "parquet");
}
#if USE_AWS_S3
template Strings HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getFiles(
const StorageS3::Configuration & configuration, ContextPtr);
#endif
}
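
The Hudi comment above describes the selection rule that processMetadataFiles() implements: group part files by partition directory, compare the timestamps embedded in the file names, and keep only the newest part per partition. Below is a minimal standalone sketch of that rule; the assumption that the timestamp is the last underscore-separated component of the file name comes from common Hudi base file naming, and how the real parser extracts the timestamp is not shown in this hunk.

#include <cstdint>
#include <filesystem>
#include <iostream>
#include <map>
#include <string>
#include <vector>

/// Keep only the newest part file per partition directory. This sketch assumes the
/// Hudi base file name ends in an instant time, i.e. <file_id>_<write_token>_<timestamp>.<format>.
std::vector<std::string> latestPartPerPartition(const std::vector<std::string> & keys)
{
    struct FileInfo { std::string key; uint64_t timestamp = 0; };
    std::map<std::string, FileInfo> latest_parts; /// partition directory -> newest part file

    for (const auto & key : keys)
    {
        const std::filesystem::path path(key);
        const auto stem = path.stem().string();          /// file name without ".parquet"
        const auto pos = stem.find_last_of('_');
        if (pos == std::string::npos)
            continue;                                    /// not a base file this sketch understands
        const uint64_t timestamp = std::stoull(stem.substr(pos + 1));

        auto & info = latest_parts[path.parent_path().string()];
        if (timestamp >= info.timestamp)
            info = {key, timestamp};
    }

    std::vector<std::string> result;
    for (const auto & [_, info] : latest_parts)
        result.push_back(info.key);
    return result;
}

int main()
{
    for (const auto & key : latestPartPerPartition({
            "part=a/f1_0-1-1_20230101.parquet",
            "part=a/f1_0-1-1_20230202.parquet",
            "part=b/f2_0-1-1_20230101.parquet"}))
        std::cout << key << '\n'; /// the 20230202 file for part=a, the 20230101 file for part=b
}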

View File

@ -0,0 +1,15 @@
#pragma once
#include <Interpreters/Context_fwd.h>
#include <Core/Types.h>
namespace DB
{
template <typename Configuration, typename MetadataReadHelper>
struct HudiMetadataParser
{
static Strings getFiles(const Configuration & configuration, ContextPtr context);
};
}

View File

@ -0,0 +1,105 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include <Storages/IStorage.h>
#include <Common/logger_useful.h>
#include <Storages/StorageFactory.h>
#include <Formats/FormatFactory.h>
#include <filesystem>
namespace DB
{
template <typename Storage, typename Name, typename MetadataParser>
class IStorageDataLake : public Storage
{
public:
static constexpr auto name = Name::name;
using Configuration = typename Storage::Configuration;
template <class ...Args>
explicit IStorageDataLake(const Configuration & configuration_, ContextPtr context_, Args && ...args)
: Storage(getConfigurationForDataRead(configuration_, context_), context_, std::forward<Args>(args)...)
, base_configuration(configuration_)
, log(&Poco::Logger::get(getName())) {}
String getName() const override { return name; }
static ColumnsDescription getTableStructureFromData(
Configuration & base_configuration,
const std::optional<FormatSettings> & format_settings,
ContextPtr local_context)
{
auto configuration = getConfigurationForDataRead(base_configuration, local_context);
return Storage::getTableStructureFromData(configuration, format_settings, local_context);
}
static Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context)
{
return Storage::getConfiguration(engine_args, local_context, /* get_format_from_file */false);
}
void updateConfigurationIfChanged(ContextPtr local_context) override
{
const bool updated = base_configuration.update(local_context);
auto new_keys = getDataFiles(base_configuration, local_context);
if (!updated && new_keys == Storage::getConfiguration().keys)
return;
Storage::useConfiguration(getConfigurationForDataRead(base_configuration, local_context, new_keys));
}
private:
static Configuration getConfigurationForDataRead(
const Configuration & base_configuration, ContextPtr local_context, const Strings & keys = {})
{
auto configuration{base_configuration};
configuration.update(local_context);
configuration.static_configuration = true;
if (keys.empty())
configuration.keys = getDataFiles(configuration, local_context);
else
configuration.keys = keys;
LOG_TRACE(&Poco::Logger::get("DataLake"), "New configuration path: {}", configuration.getPath());
configuration.connect(local_context);
return configuration;
}
static Strings getDataFiles(const Configuration & configuration, ContextPtr local_context)
{
auto files = MetadataParser::getFiles(configuration, local_context);
for (auto & file : files)
file = std::filesystem::path(configuration.getPath()) / file;
return files;
}
Configuration base_configuration;
Poco::Logger * log;
};
template <typename DataLake>
static StoragePtr createDataLakeStorage(const StorageFactory::Arguments & args)
{
auto configuration = DataLake::getConfiguration(args.engine_args, args.getLocalContext());
/// Data lakes use the parquet format, so there is no need for schema inference from the file name.
if (configuration.format == "auto")
configuration.format = "Parquet";
return std::make_shared<DataLake>(
configuration, args.getContext(), args.table_id, args.columns, args.constraints,
args.comment, getFormatSettings(args.getContext()));
}
}
#endif
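
IStorageDataLake is a thin mixin: it derives from the concrete storage (StorageS3 here), asks the MetadataParser for the table's current data files, and hands the base storage a configuration whose keys are those files resolved against the table path. To illustrate how the three template parameters fit together, here is a compilable standalone sketch with mock types standing in for StorageS3 and the real parser; none of the Mock* names exist in ClickHouse.

#include <iostream>
#include <string>
#include <utility>
#include <vector>

/// Mock stand-ins, used only to show the composition pattern of
/// IStorageDataLake<Storage, Name, MetadataParser>.
struct MockConfiguration
{
    std::string path;
    std::vector<std::string> keys;
};

struct MockStorage
{
    using Configuration = MockConfiguration;
    explicit MockStorage(Configuration config_) : config(std::move(config_)) {}
    Configuration config;
};

struct MockLakeName { static constexpr auto name = "MockLake"; };

struct MockMetadataParser
{
    static std::vector<std::string> getFiles(const MockConfiguration &)
    {
        return {"part-0.parquet", "part-1.parquet"};
    }
};

template <typename Storage, typename Name, typename MetadataParser>
class DataLakeSketch : public Storage
{
public:
    using Configuration = typename Storage::Configuration;

    explicit DataLakeSketch(const Configuration & base_configuration)
        : Storage(getConfigurationForDataRead(base_configuration)) {}

    static std::string getName() { return Name::name; }

private:
    /// Mirror getConfigurationForDataRead(): resolve the data files through the
    /// metadata parser and prefix them with the table path before the base
    /// storage ever sees the configuration.
    static Configuration getConfigurationForDataRead(Configuration configuration)
    {
        for (const auto & file : MetadataParser::getFiles(configuration))
            configuration.keys.push_back(configuration.path + "/" + file);
        return configuration;
    }
};

int main()
{
    using MockLake = DataLakeSketch<MockStorage, MockLakeName, MockMetadataParser>;
    MockLake lake(MockConfiguration{"bucket/table", {}});
    std::cout << MockLake::getName() << " reads " << lake.config.keys.size() << " keys\n";
}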

View File

@ -0,0 +1,225 @@
#include "config.h"
#if USE_AWS_S3 && USE_AVRO
#include <Storages/DataLakes/IcebergMetadataParser.h>
#include <Common/logger_useful.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnTuple.h>
#include <Columns/IColumn.h>
#include <Storages/StorageFactory.h>
#include <Formats/FormatFactory.h>
#include <fmt/format.h>
#include <Processors/Formats/Impl/AvroRowInputFormat.h>
#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#if USE_AWS_S3
#include <Storages/DataLakes/S3MetadataReader.h>
#include <Storages/StorageS3.h>
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int FILE_DOESNT_EXIST;
extern const int ILLEGAL_COLUMN;
}
namespace
{
constexpr auto metadata_directory = "metadata";
template <typename Configuration, typename MetadataReadHelper>
String fetchMetadataFile(const Configuration & configuration)
{
/// Iceberg stores all metadata.json files in the metadata directory, and the
/// newest version has the largest version name, so we should list all of them
/// and then pick the newest metadata file.
static constexpr auto meta_file_suffix = ".json";
auto metadata_files = MetadataReadHelper::listFiles(configuration, metadata_directory, meta_file_suffix);
if (metadata_files.empty())
{
throw Exception(
ErrorCodes::FILE_DOESNT_EXIST,
"The metadata file for Iceberg table with path {} doesn't exist",
configuration.url.key);
}
/// See comment above
auto it = std::max_element(metadata_files.begin(), metadata_files.end());
return *it;
}
template <typename Configuration, typename MetadataReadHelper>
String getManifestList(const String & metadata_name, const Configuration & configuration, ContextPtr context)
{
auto buffer = MetadataReadHelper::createReadBuffer(metadata_name, context, configuration);
String json_str;
readJSONObjectPossiblyInvalid(json_str, *buffer);
/// It looks like base/base/JSON.h cannot parse this JSON file
Poco::JSON::Parser parser;
Poco::Dynamic::Var json = parser.parse(json_str);
Poco::JSON::Object::Ptr object = json.extract<Poco::JSON::Object::Ptr>();
auto current_snapshot_id = object->getValue<Int64>("current-snapshot-id");
auto snapshots = object->get("snapshots").extract<Poco::JSON::Array::Ptr>();
for (size_t i = 0; i < snapshots->size(); ++i)
{
auto snapshot = snapshots->getObject(static_cast<UInt32>(i));
if (snapshot->getValue<Int64>("snapshot-id") == current_snapshot_id)
{
auto path = snapshot->getValue<String>("manifest-list");
return std::filesystem::path(configuration.url.key) / metadata_directory / std::filesystem::path(path).filename();
}
}
return {};
}
MutableColumns parseAvro(const std::unique_ptr<avro::DataFileReaderBase> & file_reader, const DataTypePtr & data_type, const String & field_name)
{
auto deserializer = std::make_unique<AvroDeserializer>(
Block{{data_type->createColumn(), data_type, field_name}}, file_reader->dataSchema(), true, true);
file_reader->init();
MutableColumns columns;
columns.emplace_back(data_type->createColumn());
RowReadExtension ext;
while (file_reader->hasMore())
{
file_reader->decr();
deserializer->deserializeRow(columns, file_reader->decoder(), ext);
}
return columns;
}
template <typename Configuration, typename MetadataReadHelper>
Strings getManifestFiles(const String & manifest_list, const Configuration & configuration, ContextPtr context)
{
auto buffer = MetadataReadHelper::createReadBuffer(manifest_list, context, configuration);
auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer));
static constexpr auto manifest_path = "manifest_path";
/// The manifest_path is the first field in the manifest list file
/// and it has String data type:
/// {'manifest_path': 'xxx', ...}
auto data_type = AvroSchemaReader::avroNodeToDataType(file_reader->dataSchema().root()->leafAt(0));
auto columns = parseAvro(file_reader, data_type, manifest_path);
auto & col = columns.at(0);
std::vector<String> res;
if (col->getDataType() == TypeIndex::String)
{
const auto * col_str = typeid_cast<ColumnString *>(col.get());
size_t col_size = col_str->size();
for (size_t i = 0; i < col_size; ++i)
{
auto file_path = col_str->getDataAt(i).toView();
/// We only need to obtain the file name
std::filesystem::path path(file_path);
res.emplace_back(std::filesystem::path(configuration.url.key) / metadata_directory / path.filename());
}
return res;
}
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `manifest_path` field should be String type, got {}",
col->getFamilyName());
}
template <typename Configuration, typename MetadataReadHelper>
Strings getFilesForRead(const std::vector<String> & manifest_files, const Configuration & configuration, ContextPtr context)
{
Strings keys;
for (const auto & manifest_file : manifest_files)
{
auto buffer = MetadataReadHelper::createReadBuffer(manifest_file, context, configuration);
auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer));
static constexpr auto manifest_path = "data_file";
/// The data_file field is at the 3rd position of the manifest file:
/// {'status': xx, 'snapshot_id': xx, 'data_file': {'file_path': 'xxx', ...}, ...}
/// It is also a nested record, so its result type is a nested Tuple.
auto data_type = AvroSchemaReader::avroNodeToDataType(file_reader->dataSchema().root()->leafAt(2));
auto columns = parseAvro(file_reader, data_type, manifest_path);
auto & col = columns.at(0);
if (col->getDataType() == TypeIndex::Tuple)
{
auto * col_tuple = typeid_cast<ColumnTuple *>(col.get());
auto & col_str = col_tuple->getColumnPtr(0);
if (col_str->getDataType() == TypeIndex::String)
{
const auto * str_col = typeid_cast<const ColumnString *>(col_str.get());
size_t col_size = str_col->size();
for (size_t i = 0; i < col_size; ++i)
{
auto file_path = str_col->getDataAt(i).toView();
/// We only need the partition/file name
std::filesystem::path path(file_path);
keys.emplace_back(path.parent_path().filename() / path.filename());
}
}
else
{
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `file_path` field should be String type, got {}",
col_str->getFamilyName());
}
}
else
{
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `data_file` field should be Tuple type, got {}",
col->getFamilyName());
}
}
return keys;
}
}
template <typename Configuration, typename MetadataReadHelper>
Strings IcebergMetadataParser<Configuration, MetadataReadHelper>::getFiles(const Configuration & configuration, ContextPtr context)
{
auto metadata = fetchMetadataFile<Configuration, MetadataReadHelper>(configuration);
auto manifest_list = getManifestList<Configuration, MetadataReadHelper>(metadata, configuration, context);
/// When the table has just been created it does not have any data yet
if (manifest_list.empty())
return {};
auto manifest_files = getManifestFiles<Configuration, MetadataReadHelper>(manifest_list, configuration, context);
return getFilesForRead<Configuration, MetadataReadHelper>(manifest_files, configuration, context);
}
#if USE_AWS_S3
template Strings IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getFiles(const StorageS3::Configuration & configuration, ContextPtr);
#endif
}
#endif
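
The Iceberg layering in this file goes metadata.json -> manifest list (Avro) -> manifest files (Avro) -> data files, and getManifestList() picks the manifest list of the snapshot whose snapshot-id equals current-snapshot-id. A standalone sketch of that lookup follows, using the same Poco::JSON calls as above; the inline metadata is a trimmed illustrative example, not a complete Iceberg metadata file.

#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <iostream>
#include <string>

/// Return the manifest list of the current snapshot, mirroring getManifestList().
std::string currentManifestList(const std::string & metadata_json)
{
    Poco::JSON::Parser parser;
    auto object = parser.parse(metadata_json).extract<Poco::JSON::Object::Ptr>();

    const auto current_snapshot_id = object->getValue<Poco::Int64>("current-snapshot-id");
    auto snapshots = object->get("snapshots").extract<Poco::JSON::Array::Ptr>();

    for (unsigned i = 0; i < snapshots->size(); ++i)
    {
        auto snapshot = snapshots->getObject(i);
        if (snapshot->getValue<Poco::Int64>("snapshot-id") == current_snapshot_id)
            return snapshot->getValue<std::string>("manifest-list");
    }
    return {};
}

int main()
{
    const std::string metadata = R"({
        "current-snapshot-id": 2,
        "snapshots": [
            {"snapshot-id": 1, "manifest-list": "metadata/snap-1.avro"},
            {"snapshot-id": 2, "manifest-list": "metadata/snap-2.avro"}
        ]
    })";
    std::cout << currentManifestList(metadata) << '\n'; /// metadata/snap-2.avro
}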

View File

@ -0,0 +1,20 @@
#pragma once
/// StorageIceberg depends on Avro to parse metadata in Avro format.
#if USE_AVRO
#include <Interpreters/Context_fwd.h>
#include <Core/Types.h>
namespace DB
{
template <typename Configuration, typename MetadataReadHelper>
struct IcebergMetadataParser
{
static Strings getFiles(const Configuration & configuration, ContextPtr context);
};
}
#endif

View File

@ -2,13 +2,13 @@
#if USE_AWS_S3
# include <IO/ReadBufferFromS3.h>
# include <IO/S3/Requests.h>
# include <Interpreters/Context.h>
# include <Storages/S3DataLakeMetadataReadHelper.h>
# include <aws/core/auth/AWSCredentials.h>
# include <aws/s3/S3Client.h>
# include <aws/s3/model/ListObjectsV2Request.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/S3/Requests.h>
#include <Interpreters/Context.h>
#include <Storages/DataLakes/S3MetadataReader.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/ListObjectsV2Request.h>
namespace DB

View File

@ -4,7 +4,7 @@
#if USE_AWS_S3
# include <Storages/StorageS3.h>
#include <Storages/StorageS3.h>
class ReadBuffer;

View File

@ -0,0 +1,25 @@
#pragma once
#include <Storages/IStorage.h>
#include <Storages/DataLakes/IStorageDataLake.h>
#include <Storages/DataLakes/DeltaLakeMetadataParser.h>
#include "config.h"
#if USE_AWS_S3
#include <Storages/DataLakes/S3MetadataReader.h>
#include <Storages/StorageS3.h>
#endif
namespace DB
{
struct StorageDeltaLakeName
{
static constexpr auto name = "DeltaLake";
};
#if USE_AWS_S3
using StorageDeltaLakeS3 = IStorageDataLake<StorageS3, StorageDeltaLakeName, DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>;
#endif
}

View File

@ -0,0 +1,25 @@
#pragma once
#include <Storages/IStorage.h>
#include <Storages/DataLakes/IStorageDataLake.h>
#include <Storages/DataLakes/HudiMetadataParser.h>
#include "config.h"
#if USE_AWS_S3
#include <Storages/DataLakes/S3MetadataReader.h>
#include <Storages/StorageS3.h>
#endif
namespace DB
{
struct StorageHudiName
{
static constexpr auto name = "Hudi";
};
#if USE_AWS_S3
using StorageHudiS3 = IStorageDataLake<StorageS3, StorageHudiName, HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>;
#endif
}

View File

@ -0,0 +1,25 @@
#pragma once
#include <Storages/IStorage.h>
#include <Storages/DataLakes/IStorageDataLake.h>
#include <Storages/DataLakes/IcebergMetadataParser.h>
#include "config.h"
#if USE_AWS_S3
#include <Storages/DataLakes/S3MetadataReader.h>
#include <Storages/StorageS3.h>
#endif
namespace DB
{
struct StorageIcebergName
{
static constexpr auto name = "Iceberg";
};
#if USE_AWS_S3
using StorageIcebergS3 = IStorageDataLake<StorageS3, StorageIcebergName, IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>;
#endif
}

View File

@ -1,11 +1,11 @@
#include <Storages/IStorageDataLake.h>
#include <Storages/DataLakes/IStorageDataLake.h>
#include "config.h"
#if USE_AWS_S3
#include <Storages/StorageHudi.h>
#include <Storages/StorageDeltaLake.h>
#include <Storages/StorageIceberg.h>
#include <Storages/DataLakes/StorageDeltaLake.h>
#include <Storages/DataLakes/StorageIceberg.h>
#include <Storages/DataLakes/StorageHudi.h>
namespace DB
@ -27,17 +27,17 @@ namespace DB
void registerStorageDeltaLake(StorageFactory & factory)
{
REGISTER_DATA_LAKE_STORAGE(StorageDeltaLake, StorageDeltaLakeName::name)
REGISTER_DATA_LAKE_STORAGE(StorageDeltaLakeS3, StorageDeltaLakeName::name)
}
void registerStorageIceberg(StorageFactory & factory)
{
REGISTER_DATA_LAKE_STORAGE(StorageIceberg, StorageIcebergName::name)
REGISTER_DATA_LAKE_STORAGE(StorageIcebergS3, StorageIcebergName::name)
}
void registerStorageHudi(StorageFactory & factory)
{
REGISTER_DATA_LAKE_STORAGE(StorageHudi, StorageHudiName::name)
REGISTER_DATA_LAKE_STORAGE(StorageHudiS3, StorageHudiName::name)
}
}

View File

@ -1,248 +0,0 @@
#include "config.h"
#if USE_AWS_S3 && USE_AVRO
# include <Storages/StorageIceberg.h>
# include <Common/logger_useful.h>
# include <Columns/ColumnString.h>
# include <Columns/ColumnTuple.h>
# include <Columns/IColumn.h>
# include <Storages/StorageFactory.h>
# include <Formats/FormatFactory.h>
# include <fmt/format.h>
# include <Processors/Formats/Impl/AvroRowInputFormat.h>
# include <Poco/JSON/Array.h>
# include <Poco/JSON/Object.h>
# include <Poco/JSON/Parser.h>
namespace DB
{
namespace ErrorCodes
{
extern const int S3_ERROR;
extern const int FILE_DOESNT_EXIST;
extern const int ILLEGAL_COLUMN;
}
template <typename Configuration, typename MetadataReadHelper>
IcebergMetadataParser<Configuration, MetadataReadHelper>::IcebergMetadataParser(const Configuration & configuration_, ContextPtr context_)
: base_configuration(configuration_), context(context_)
{
}
template <typename Configuration, typename MetadataReadHelper>
std::vector<String> IcebergMetadataParser<Configuration, MetadataReadHelper>::getFiles() const
{
auto metadata = fetchMetadataFile();
auto manifest_list = getManifestList(metadata);
/// When the table has just been created it does not have any data yet
if (manifest_list.empty())
{
return {};
}
auto manifest_files = getManifestFiles(manifest_list);
return getFilesForRead(manifest_files);
}
template <typename Configuration, typename MetadataReadHelper>
String IcebergMetadataParser<Configuration, MetadataReadHelper>::fetchMetadataFile() const
{
/// Iceberg stores all metadata.json files in the metadata directory, and the
/// newest version has the largest version name, so we should list all of them
/// and then pick the newest metadata file.
static constexpr auto meta_file_suffix = ".json";
auto metadata_files = MetadataReadHelper::listFiles(base_configuration, metadata_directory, meta_file_suffix);
if (metadata_files.empty())
throw Exception(
ErrorCodes::FILE_DOESNT_EXIST, "The metadata file for Iceberg table with path {} doesn't exist", base_configuration.url.key);
/// See comment above
auto it = std::max_element(metadata_files.begin(), metadata_files.end());
return *it;
}
template <typename Configuration, typename MetadataReadHelper>
String IcebergMetadataParser<Configuration, MetadataReadHelper>::getManifestList(const String & metadata_name) const
{
auto buffer = MetadataReadHelper::createReadBuffer(metadata_name, context, base_configuration);
String json_str;
readJSONObjectPossiblyInvalid(json_str, *buffer);
/// It looks like base/base/JSON.h cannot parse this JSON file
Poco::JSON::Parser parser;
Poco::Dynamic::Var json = parser.parse(json_str);
Poco::JSON::Object::Ptr object = json.extract<Poco::JSON::Object::Ptr>();
auto current_snapshot_id = object->getValue<Int64>("current-snapshot-id");
auto snapshots = object->get("snapshots").extract<Poco::JSON::Array::Ptr>();
for (size_t i = 0; i < snapshots->size(); ++i)
{
auto snapshot = snapshots->getObject(static_cast<UInt32>(i));
if (snapshot->getValue<Int64>("snapshot-id") == current_snapshot_id)
{
auto path = snapshot->getValue<String>("manifest-list");
return std::filesystem::path(base_configuration.url.key) / metadata_directory / std::filesystem::path(path).filename();
}
}
return {};
}
static MutableColumns
parseAvro(const std::unique_ptr<avro::DataFileReaderBase> & file_reader, const DataTypePtr & data_type, const String & field_name)
{
auto deserializer = std::make_unique<AvroDeserializer>(
Block{{data_type->createColumn(), data_type, field_name}}, file_reader->dataSchema(), true, true);
file_reader->init();
MutableColumns columns;
columns.emplace_back(data_type->createColumn());
RowReadExtension ext;
while (file_reader->hasMore())
{
file_reader->decr();
deserializer->deserializeRow(columns, file_reader->decoder(), ext);
}
return columns;
}
template <typename Configuration, typename MetadataReadHelper>
std::vector<String> IcebergMetadataParser<Configuration, MetadataReadHelper>::getManifestFiles(const String & manifest_list) const
{
auto buffer = MetadataReadHelper::createReadBuffer(manifest_list, context, base_configuration);
auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer));
static constexpr auto manifest_path = "manifest_path";
/// The manifest_path is the first field in the manifest list file
/// and it has String data type:
/// {'manifest_path': 'xxx', ...}
auto data_type = AvroSchemaReader::avroNodeToDataType(file_reader->dataSchema().root()->leafAt(0));
auto columns = parseAvro(file_reader, data_type, manifest_path);
auto & col = columns.at(0);
std::vector<String> res;
if (col->getDataType() == TypeIndex::String)
{
const auto * col_str = typeid_cast<ColumnString *>(col.get());
size_t col_size = col_str->size();
for (size_t i = 0; i < col_size; ++i)
{
auto file_path = col_str->getDataAt(i).toView();
/// We only need to obtain the file name
std::filesystem::path path(file_path);
res.emplace_back(std::filesystem::path(base_configuration.url.key) / metadata_directory / path.filename());
}
return res;
}
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `manifest_path` field should be String type, got {}",
col->getFamilyName());
}
template <typename Configuration, typename MetadataReadHelper>
std::vector<String> IcebergMetadataParser<Configuration, MetadataReadHelper>::getFilesForRead(const std::vector<String> & manifest_files) const
{
std::vector<String> keys;
for (const auto & manifest_file : manifest_files)
{
auto buffer = MetadataReadHelper::createReadBuffer(manifest_file, context, base_configuration);
auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer));
static constexpr auto manifest_path = "data_file";
/// The data_file field is at the 3rd position of the manifest file:
/// {'status': xx, 'snapshot_id': xx, 'data_file': {'file_path': 'xxx', ...}, ...}
/// It is also a nested record, so its result type is a nested Tuple.
auto data_type = AvroSchemaReader::avroNodeToDataType(file_reader->dataSchema().root()->leafAt(2));
auto columns = parseAvro(file_reader, data_type, manifest_path);
auto & col = columns.at(0);
if (col->getDataType() == TypeIndex::Tuple)
{
auto * col_tuple = typeid_cast<ColumnTuple *>(col.get());
auto & col_str = col_tuple->getColumnPtr(0);
if (col_str->getDataType() == TypeIndex::String)
{
const auto * str_col = typeid_cast<const ColumnString *>(col_str.get());
size_t col_size = str_col->size();
for (size_t i = 0; i < col_size; ++i)
{
auto file_path = str_col->getDataAt(i).toView();
/// We only need the partition/file name
std::filesystem::path path(file_path);
keys.emplace_back(path.parent_path().filename() / path.filename());
}
}
else
{
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `file_path` field should be String type, got {}",
col_str->getFamilyName());
}
}
else
{
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `data_file` field should be Tuple type, got {}",
col->getFamilyName());
}
}
return keys;
}
// generateQueryFromKeys constructs a glob query from all parquet file names
// for the underlying StorageS3 engine
template <typename Configuration, typename MetadataReadHelper>
String IcebergMetadataParser<Configuration, MetadataReadHelper>::generateQueryFromKeys(const std::vector<String> & keys, const String &)
{
if (keys.size() == 1)
{
return fmt::format("{}", keys[0]);
}
else
{
return fmt::format("{{{}}}", fmt::join(keys, ","));
}
}
template IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::IcebergMetadataParser(
const StorageS3::Configuration & configuration_, ContextPtr context_);
template std::vector<String> IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getFiles() const;
template String IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::generateQueryFromKeys(
const std::vector<String> & keys, const String & format);
template String IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::fetchMetadataFile() const;
template String IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getManifestList(const String & metadata_name) const;
template std::vector<String>
IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getManifestFiles(const String & manifest_list) const;
template std::vector<String>
IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getFilesForRead(const std::vector<String> & manifest_files) const;
}
#endif
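
For reference, the generateQueryFromKeys() deleted here (like its DeltaLake and Hudi counterparts) joined the resolved data file keys into a brace-expansion glob for StorageS3, e.g. {a.parquet,b.parquet}; the refactored code passes the keys directly via configuration.keys instead. A standalone sketch of the removed joining logic:

#include <fmt/format.h>
#include <fmt/ranges.h>
#include <iostream>
#include <string>
#include <vector>

/// Join data file keys into the glob that the removed generateQueryFromKeys()
/// handed to StorageS3: a single key is returned as-is, several keys become {k1,k2,...}.
std::string generateGlobFromKeys(const std::vector<std::string> & keys)
{
    if (keys.size() == 1)
        return keys[0];
    return fmt::format("{{{}}}", fmt::join(keys, ","));
}

int main()
{
    std::cout << generateGlobFromKeys({"a.parquet"}) << '\n';              /// a.parquet
    std::cout << generateGlobFromKeys({"a.parquet", "b.parquet"}) << '\n'; /// {a.parquet,b.parquet}
}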

View File

@ -1,51 +0,0 @@
#pragma once
#include "config.h"
// StorageIceberg depends on Avro to parse metadata in Avro format.
#if USE_AWS_S3 && USE_AVRO
# include <Storages/IStorageDataLake.h>
# include <Storages/S3DataLakeMetadataReadHelper.h>
# include <Storages/StorageS3.h>
namespace DB
{
// Class to parse Iceberg metadata and find the files needed for a query on the table
// Iceberg table directory outlook:
// table/
// data/
// metadata/
// The metadata has three layers: metadata -> manifest list -> manifest files
template <typename Configuration, typename MetadataReadHelper>
class IcebergMetadataParser
{
public:
IcebergMetadataParser(const Configuration & configuration_, ContextPtr context_);
std::vector<String> getFiles() const;
static String generateQueryFromKeys(const std::vector<String> & keys, const String & format);
private:
static constexpr auto metadata_directory = "metadata";
Configuration base_configuration;
ContextPtr context;
String fetchMetadataFile() const;
String getManifestList(const String & metadata_name) const;
std::vector<String> getManifestFiles(const String & manifest_list) const;
std::vector<String> getFilesForRead(const std::vector<String> & manifest_files) const;
};
struct StorageIcebergName
{
static constexpr auto name = "Iceberg";
};
using StorageIceberg
= IStorageDataLake<StorageS3, StorageIcebergName, IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>;
}
#endif

View File

@ -2,11 +2,11 @@
#if USE_AWS_S3
# include <Storages/StorageDeltaLake.h>
# include <TableFunctions/ITableFunctionDataLake.h>
# include <TableFunctions/TableFunctionFactory.h>
# include <TableFunctions/TableFunctionS3.h>
# include "registerTableFunctions.h"
#include <Storages/DataLakes/StorageDeltaLake.h>
#include <TableFunctions/ITableFunctionDataLake.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionS3.h>
#include "registerTableFunctions.h"
namespace DB
{
@ -16,7 +16,7 @@ struct TableFunctionDeltaLakeName
static constexpr auto name = "deltaLake";
};
using TableFunctionDeltaLake = ITableFunctionDataLake<TableFunctionDeltaLakeName, StorageDeltaLake, TableFunctionS3>;
using TableFunctionDeltaLake = ITableFunctionDataLake<TableFunctionDeltaLakeName, StorageDeltaLakeS3, TableFunctionS3>;
void registerTableFunctionDeltaLake(TableFunctionFactory & factory)
{

View File

@ -2,11 +2,11 @@
#if USE_AWS_S3
# include <Storages/StorageHudi.h>
# include <TableFunctions/ITableFunctionDataLake.h>
# include <TableFunctions/TableFunctionFactory.h>
# include <TableFunctions/TableFunctionS3.h>
# include "registerTableFunctions.h"
#include <Storages/DataLakes/StorageHudi.h>
#include <TableFunctions/ITableFunctionDataLake.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionS3.h>
#include "registerTableFunctions.h"
namespace DB
{
@ -15,7 +15,7 @@ struct TableFunctionHudiName
{
static constexpr auto name = "hudi";
};
using TableFunctionHudi = ITableFunctionDataLake<TableFunctionHudiName, StorageHudi, TableFunctionS3>;
using TableFunctionHudi = ITableFunctionDataLake<TableFunctionHudiName, StorageHudiS3, TableFunctionS3>;
void registerTableFunctionHudi(TableFunctionFactory & factory)
{

View File

@ -2,11 +2,11 @@
#if USE_AWS_S3 && USE_AVRO
# include <Storages/StorageIceberg.h>
# include <TableFunctions/ITableFunctionDataLake.h>
# include <TableFunctions/TableFunctionFactory.h>
# include <TableFunctions/TableFunctionS3.h>
# include "registerTableFunctions.h"
#include <Storages/DataLakes/StorageIceberg.h>
#include <TableFunctions/ITableFunctionDataLake.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionS3.h>
#include "registerTableFunctions.h"
namespace DB
@ -17,7 +17,7 @@ struct TableFunctionIcebergName
static constexpr auto name = "iceberg";
};
using TableFunctionIceberg = ITableFunctionDataLake<TableFunctionIcebergName, StorageIceberg, TableFunctionS3>;
using TableFunctionIceberg = ITableFunctionDataLake<TableFunctionIcebergName, StorageIcebergS3, TableFunctionS3>;
void registerTableFunctionIceberg(TableFunctionFactory & factory)
{

View File

@ -30,7 +30,6 @@ from helpers.s3_tools import prepare_s3_bucket, upload_directory, get_file_conte
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
TABLE_NAME = "test_delta_table"
USER_FILES_PATH = os.path.join(SCRIPT_DIR, "./_instances/node1/database/user_files")
@ -86,8 +85,12 @@ def generate_data(spark, start, end):
b = spark.range(start + 1, end + 1, 1).toDF("b")
b = b.withColumn("b", b["b"].cast(StringType()))
a = a.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
b = b.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
a = a.withColumn(
"row_index", row_number().over(Window.orderBy(monotonically_increasing_id()))
)
b = b.withColumn(
"row_index", row_number().over(Window.orderBy(monotonically_increasing_id()))
)
df = a.join(b, on=["row_index"]).drop("row_index")
return df
@ -129,6 +132,7 @@ def test_single_log_file(started_cluster):
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
spark = get_spark()
TABLE_NAME = "test_single_log_file"
inserted_data = "SELECT number, toString(number + 1) FROM numbers(100)"
parquet_data_path = create_initial_data_file(instance, inserted_data, TABLE_NAME)
@ -150,8 +154,11 @@ def test_multiple_log_files(started_cluster):
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
spark = get_spark()
TABLE_NAME = "test_multiple_log_files"
write_delta_from_df(spark, generate_data(spark, 0, 100), f"/{TABLE_NAME}", mode="append")
write_delta_from_df(
spark, generate_data(spark, 0, 100), f"/{TABLE_NAME}", mode="overwrite"
)
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
assert len(files) == 1
@ -163,7 +170,9 @@ def test_multiple_log_files(started_cluster):
create_delta_table(instance, TABLE_NAME)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
write_delta_from_df(spark, generate_data(spark, 100, 200), f"/{TABLE_NAME}", mode="append")
write_delta_from_df(
spark, generate_data(spark, 100, 200), f"/{TABLE_NAME}", mode="append"
)
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
assert len(files) == 2
@ -173,7 +182,7 @@ def test_multiple_log_files(started_cluster):
assert len(s3_objects) == 2
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 200
assert instance.query(f"SELECT * FROM {TABLE_NAME}") == instance.query(
assert instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY 1") == instance.query(
"SELECT number, toString(number + 1) FROM numbers(200)"
)
@ -183,6 +192,7 @@ def test_metadata(started_cluster):
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
spark = get_spark()
TABLE_NAME = "test_metadata"
parquet_data_path = create_initial_data_file(
instance, "SELECT number, toString(number) FROM numbers(100)", TABLE_NAME
@ -209,7 +219,9 @@ def test_metadata(started_cluster):
def test_types(started_cluster):
spark = get_spark()
TABLE_NAME = "test_types"
result_file = f"{TABLE_NAME}_result_2"
delta_table = (
DeltaTable.create(spark)
.tableName(TABLE_NAME)