kssenii 2023-03-21 20:19:30 +01:00
parent d9053b8a7c
commit 19819f12f3
8 changed files with 246 additions and 294 deletions

View File

@@ -117,6 +117,8 @@ std::vector<String> S3DataLakeMetadataReadHelper::listFiles(const StorageS3::Con
is_finished = !outcome.GetResult().GetIsTruncated();
}
LOG_TRACE(&Poco::Logger::get("S3DataLakeMetadataReadHelper"), "Listed {} files", keys.size());
return keys;
}
}
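The hunk above only adds a trace log, but the surrounding loop is the usual paginated S3 listing: keep calling ListObjectsV2 with the previous continuation token until the result is no longer truncated. A rough standalone sketch of that pattern with the AWS SDK for C++ (this is not the real S3DataLakeMetadataReadHelper API, which goes through StorageS3::Configuration; the bucket/prefix handling below is an assumption):

/// Sketch only: a bare ListObjectsV2 pagination loop, not the actual helper.
#include <aws/core/Aws.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <stdexcept>
#include <string>
#include <vector>

std::vector<std::string> listKeys(const Aws::S3::S3Client & client, const std::string & bucket, const std::string & prefix)
{
    std::vector<std::string> keys;
    Aws::S3::Model::ListObjectsV2Request request;
    request.SetBucket(bucket.c_str());
    request.SetPrefix(prefix.c_str());

    bool is_finished = false;
    while (!is_finished)
    {
        auto outcome = client.ListObjectsV2(request);
        if (!outcome.IsSuccess())
            throw std::runtime_error(outcome.GetError().GetMessage().c_str());

        for (const auto & object : outcome.GetResult().GetContents())
            keys.emplace_back(object.GetKey().c_str());

        /// Same termination condition as in listFiles(): stop once the listing is no longer truncated.
        request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
        is_finished = !outcome.GetResult().GetIsTruncated();
    }
    return keys;
}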

View File

@@ -1,149 +1,15 @@
#include "config.h"
#if USE_AWS_S3
#include <Storages/StorageDeltaLake.h>
#include <Common/logger_useful.h>
#include <Storages/StorageFactory.h>
#include <Formats/FormatFactory.h>
#include <QueryPipeline/Pipe.h>
#include <fmt/format.h>
namespace DB
{
namespace ErrorCodes
{
extern const int S3_ERROR;
extern const int INCORRECT_DATA;
}
void DeltaLakeMetadata::setLastModifiedTime(const String & filename, uint64_t timestamp)
{
file_update_time[filename] = timestamp;
}
void DeltaLakeMetadata::remove(const String & filename, uint64_t /*timestamp */)
{
bool erase = file_update_time.erase(filename);
if (!erase)
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid table metadata, tried to remove {} before adding it", filename);
}
std::vector<String> DeltaLakeMetadata::listCurrentFiles() &&
{
std::vector<String> keys;
keys.reserve(file_update_time.size());
for (auto && [k, _] : file_update_time)
keys.push_back(k);
return keys;
}
template <typename Configuration, typename MetadataReadHelper>
DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::DeltaLakeMetadataParser(const Configuration & configuration_, ContextPtr context)
: base_configuration(configuration_)
{
init(context);
}
template <typename Configuration, typename MetadataReadHelper>
void DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::init(ContextPtr context)
{
auto keys = getJsonLogFiles();
// read data from every json log file
for (const String & key : keys)
{
auto buf = MetadataReadHelper::createReadBuffer(key, context, base_configuration);
char c;
while (!buf->eof())
{
/// May be some invalid characters before json.
while (buf->peek(c) && c != '{')
buf->ignore();
if (buf->eof())
break;
String json_str;
readJSONObjectPossiblyInvalid(json_str, *buf);
if (json_str.empty())
continue;
const JSON json(json_str);
handleJSON(json);
}
}
}
template <typename Configuration, typename MetadataReadHelper>
std::vector<String> DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::getJsonLogFiles() const
{
/// DeltaLake format stores all metadata json files in _delta_log directory
static constexpr auto deltalake_metadata_directory = "_delta_log";
static constexpr auto meta_file_suffix = ".json";
return MetadataReadHelper::listFilesMatchSuffix(base_configuration, deltalake_metadata_directory, meta_file_suffix);
}
template <typename Configuration, typename MetadataReadHelper>
void DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::handleJSON(const JSON & json)
{
if (json.has("add"))
{
auto path = json["add"]["path"].getString();
auto timestamp = json["add"]["modificationTime"].getInt();
metadata.setLastModifiedTime(path, timestamp);
}
else if (json.has("remove"))
{
auto path = json["remove"]["path"].getString();
auto timestamp = json["remove"]["deletionTimestamp"].getInt();
metadata.remove(path, timestamp);
}
}
// DeltaLake stores data in parts in different files
// keys is vector of parts with latest version
// generateQueryFromKeys constructs query from parts filenames for
// underlying StorageS3 engine
template <typename Configuration, typename MetadataReadHelper>
String DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::generateQueryFromKeys(const std::vector<String> & keys, const String &)
{
if (keys.size() == 1)
{
return fmt::format("{}", keys[0]);
}
else
{
return fmt::format("{{{}}}", fmt::join(keys, ","));
}
}
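As the comment above says, the parser hands StorageS3 a single path expression built from the selected keys: one key is passed through as-is, several keys are wrapped into a {a,b,c} alternative that StorageS3 expands like a glob. A minimal standalone sketch of the same fmt-based formatting (the key names are invented):

#include <fmt/format.h>
#include <fmt/ranges.h>  /// fmt::join
#include <iostream>
#include <string>
#include <vector>

int main()
{
    const std::vector<std::string> keys = {"part-000.parquet", "part-001.parquet"};  /// invented key names

    const std::string query = keys.size() == 1
        ? fmt::format("{}", keys[0])
        : fmt::format("{{{}}}", fmt::join(keys, ","));

    std::cout << query << '\n';  /// prints {part-000.parquet,part-001.parquet}
}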
template DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::DeltaLakeMetadataParser(
const StorageS3::Configuration & configuration_, ContextPtr context);
template std::vector<String> DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getFiles();
template String DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::generateQueryFromKeys(
const std::vector<String> & keys, const String & format);
template void DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::init(ContextPtr context);
template std::vector<String> DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getJsonLogFiles() const;
template void DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::handleJSON(const JSON & json);
void registerStorageDeltaLake(StorageFactory & factory)
{
factory.registerStorage(
@@ -151,11 +17,8 @@ void registerStorageDeltaLake(StorageFactory & factory)
[](const StorageFactory::Arguments & args)
{
StorageS3::Configuration configuration = StorageDeltaLake::getConfiguration(args.engine_args, args.getLocalContext());
auto format_settings = getFormatSettings(args.getContext());
return std::make_shared<StorageDeltaLake>(
configuration, args.table_id, args.columns, args.constraints, args.comment, args.getContext(), format_settings);
configuration, args.table_id, args.columns, args.constraints, args.comment, args.getContext(), getFormatSettings(args.getContext()));
},
{
.supports_settings = false,

View File

@@ -4,50 +4,151 @@
#if USE_AWS_S3
# include <Storages/IStorageDataLake.h>
# include <Storages/S3DataLakeMetadataReadHelper.h>
# include <Storages/StorageS3.h>
# include <base/JSON.h>
#include <Storages/IStorageDataLake.h>
#include <Storages/S3DataLakeMetadataReadHelper.h>
#include <Storages/StorageS3.h>
#include <base/JSON.h>
namespace DB
{
// class to parse json deltalake metadata and find files needed for query in table
class DeltaLakeMetadata
{
public:
DeltaLakeMetadata() = default;
/**
* Documentation links:
* - https://github.com/delta-io/delta/blob/master/PROTOCOL.md#data-files
*/
void setLastModifiedTime(const String & filename, uint64_t timestamp);
void remove(const String & filename, uint64_t timestamp);
std::vector<String> listCurrentFiles() &&;
private:
std::unordered_map<String, uint64_t> file_update_time;
};
// class to get deltalake log json files and read json from them
template <typename Configuration, typename MetadataReadHelper>
class DeltaLakeMetadataParser
{
public:
DeltaLakeMetadataParser(const Configuration & configuration_, ContextPtr context);
DeltaLakeMetadataParser(const Configuration & storage_configuration_, ContextPtr context)
: storage_configuration(storage_configuration_)
{
processMetadataFiles(context);
}
std::vector<String> getFiles() { return std::move(metadata).listCurrentFiles(); }
Strings getFiles() { return Strings(data_files.begin(), data_files.end()); }
static String generateQueryFromKeys(const std::vector<String> & keys, const String & format);
static String generateQueryFromKeys(const std::vector<String> & keys, const std::string & /* format */)
{
if (keys.size() == 1)
{
return fmt::format("{}", keys[0]);
}
else
{
return fmt::format("{{{}}}", fmt::join(keys, ","));
}
}
private:
void init(ContextPtr context);
/**
* Delta files are stored as JSON in a directory at the root of the table named _delta_log,
* and together with checkpoints make up the log of all changes that have occurred to a table.
*/
std::vector<String> getMetadataFiles() const
{
/// DeltaLake format stores all metadata json files in _delta_log directory
static constexpr auto deltalake_metadata_directory = "_delta_log";
static constexpr auto metadata_file_suffix = ".json";
std::vector<String> getJsonLogFiles() const;
return MetadataReadHelper::listFilesMatchSuffix(
storage_configuration, deltalake_metadata_directory, metadata_file_suffix);
}
void handleJSON(const JSON & json);
/**
* Delta files are the unit of atomicity for a table,
* and are named using the next available version number, zero-padded to 20 digits.
* For example:
* ./_delta_log/00000000000000000000.json
*
* A delta file, n.json, contains an atomic set of actions that should be applied to the
* previous table state (n-1.json) in order to the construct nth snapshot of the table.
* An action changes one aspect of the table's state, for example, adding or removing a file.
*/
void processMetadataFiles(ContextPtr context)
{
const auto keys = getMetadataFiles();
for (const String & key : keys)
{
auto buf = MetadataReadHelper::createReadBuffer(key, context, storage_configuration);
Configuration base_configuration;
DeltaLakeMetadata metadata;
char c;
while (!buf->eof())
{
/// May be some invalid characters before json.
while (buf->peek(c) && c != '{')
buf->ignore();
if (buf->eof())
break;
String json_str;
readJSONObjectPossiblyInvalid(json_str, *buf);
if (json_str.empty())
continue;
const JSON json(json_str);
handleJSON(json);
}
}
}
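The doc comment before processMetadataFiles() describes the log layout: each commit to the table lands in _delta_log as a JSON file whose name is the version number zero-padded to 20 digits, and n.json is applied on top of the n-1 snapshot. A tiny sketch of producing and parsing such a name (illustrative only, not code from this commit):

#include <fmt/format.h>
#include <cassert>
#include <cstdint>
#include <string>

int main()
{
    /// Version 42 of the table -> "00000000000000000042.json"
    const std::string name = fmt::format("{:020}.json", 42);
    assert(name == "00000000000000000042.json");

    /// And back: drop the ".json" suffix and parse the zero-padded digits.
    const uint64_t version = std::stoull(name.substr(0, name.size() - 5));
    assert(version == 42);
}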
/**
* Example of content of a single .json metadata file:
* "
* {"commitInfo":{
* "timestamp":1679424650713,
* "operation":"WRITE",
* "operationParameters":{"mode":"Overwrite","partitionBy":"[]"},
* "isolationLevel":"Serializable",
* "isBlindAppend":false,
* "operationMetrics":{"numFiles":"1","numOutputRows":"100","numOutputBytes":"2560"},
* "engineInfo":"Apache-Spark/3.3.2 Delta-Lake/2.2.0",
* "txnId":"8cb5814d-1009-46ad-a2f8-f1e7fdf4da56"}}
* {"protocol":{"minReaderVersion":2,"minWriterVersion":5}}
* {"metaData":{
* "id":"bd11ad96-bc2c-40b0-be1f-6fdd90d04459",
* "format":{"provider":"parquet","options":{}},
* "schemaString":"{
* \"type\":\"struct\",\"fields\":[{\"name\":\"number\",\"type\":\"decimal(20,0)\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":1,\"delta.columnMapping.physicalName\":\"col-6c990940-59bb-4709-8f2e-17083a82c01a\"}},{\"name\":\"toString(number)\",\"type\":\"binary\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":2,\"delta.columnMapping.physicalName\":\"col-763cd7e2-7627-4d8e-9fb7-9e85d0c8845b\"}}]}",
* "partitionColumns":[],
* "configuration":{"delta.columnMapping.mode":"name","delta.columnMapping.maxColumnId":"2"},
* "createdTime":1679424648640}}
* {"add":{
* "path":"part-00000-ecf8ed08-d04a-4a71-a5ec-57d8bb2ab4ee-c000.parquet",
* "partitionValues":{},
* "size":2560,
* "modificationTime":1679424649568,
* "dataChange":true,
* "stats":"{
* \"numRecords\":100,
* \"minValues\":{\"col-6c990940-59bb-4709-8f2e-17083a82c01a\":0},
* \"maxValues\":{\"col-6c990940-59bb-4709-8f2e-17083a82c01a\":99},
* \"nullCount\":{\"col-6c990940-59bb-4709-8f2e-17083a82c01a\":0,\"col-763cd7e2-7627-4d8e-9fb7-9e85d0c8845b\":0}}"}}
* "
*/
void handleJSON(const JSON & json)
{
if (json.has("add"))
{
const auto path = json["add"]["path"].getString();
const auto [_, inserted] = data_files.insert(path);
if (!inserted)
throw Exception(ErrorCodes::INCORRECT_DATA, "File already exists {}", path);
}
else if (json.has("remove"))
{
const auto path = json["remove"]["path"].getString();
const bool erase = data_files.erase(path);
if (!erase)
throw Exception(ErrorCodes::INCORRECT_DATA, "File doesn't exist {}", path);
}
}
Configuration storage_configuration;
std::set<String> data_files;
};
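Putting the pieces together, processMetadataFiles() replays the add/remove actions from every log file into data_files, so whatever is left in the set is the current snapshot that getFiles() returns. A self-contained sketch of that replay over made-up actions (file names and ordering are invented, and the duplicate/missing checks from handleJSON are omitted):

#include <cassert>
#include <set>
#include <string>
#include <utility>
#include <vector>

int main()
{
    /// (action, path) pairs in log order; the file names are made up.
    const std::vector<std::pair<std::string, std::string>> actions = {
        {"add", "part-000.parquet"},
        {"add", "part-001.parquet"},
        {"remove", "part-000.parquet"},  /// e.g. rewritten by a later commit
        {"add", "part-002.parquet"},
    };

    std::set<std::string> data_files;
    for (const auto & [action, path] : actions)
    {
        if (action == "add")
            data_files.insert(path);
        else
            data_files.erase(path);
    }

    /// Only the files that survive the whole log make up the current table state.
    assert((data_files == std::set<std::string>{"part-001.parquet", "part-002.parquet"}));
}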
struct StorageDeltaLakeName

View File

@@ -3,107 +3,13 @@
#if USE_AWS_S3
#include <Storages/StorageHudi.h>
#include <Common/logger_useful.h>
#include <Formats/FormatFactory.h>
#include <Storages/StorageFactory.h>
#include <QueryPipeline/Pipe.h>
#include <ranges>
namespace DB
{
namespace ErrorCodes
{
extern const int S3_ERROR;
extern const int LOGICAL_ERROR;
}
template <typename Configuration, typename MetadataReadHelper>
HudiMetadataParser<Configuration, MetadataReadHelper>::HudiMetadataParser(const Configuration & configuration_, ContextPtr context_)
: configuration(configuration_), context(context_), log(&Poco::Logger::get("StorageHudi"))
{
}
/// Apache Hudi store parts of data in different files.
/// Every part file has timestamp in it.
/// Every partition(directory) in Apache Hudi has different versions of part.
/// To find needed parts we need to find out latest part file for every partition.
/// Part format is usually parquet, but can differ.
template <typename Configuration, typename MetadataReadHelper>
String HudiMetadataParser<Configuration, MetadataReadHelper>::generateQueryFromKeys(const std::vector<std::string> & keys, const String & format)
{
/// For each partition path take only latest file.
struct FileInfo
{
String filename;
UInt64 timestamp;
};
std::unordered_map<String, FileInfo> latest_parts; /// Partition path (directory) -> latest part file info.
/// Make format lowercase.
const auto expected_extension= "." + Poco::toLower(format);
/// Filter only files with specific format.
auto keys_filter = [&](const String & key) { return std::filesystem::path(key).extension() == expected_extension; };
for (const auto & key : keys | std::views::filter(keys_filter))
{
const auto key_path = fs::path(key);
const String filename = key_path.filename();
const String partition_path = key_path.parent_path();
/// Every filename contains metadata split by "_", timestamp is after last "_".
const auto delim = key.find_last_of('_') + 1;
if (delim == std::string::npos)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected format of metadata files");
const auto timestamp = parse<UInt64>(key.substr(delim + 1));
auto it = latest_parts.find(partition_path);
if (it == latest_parts.end())
{
latest_parts.emplace(partition_path, FileInfo{filename, timestamp});
}
else if (it->second.timestamp < timestamp)
{
it->second = {filename, timestamp};
}
}
std::string list_of_keys;
LOG_TEST(&Poco::Logger::get("kssenii"), "files: {}", latest_parts.size());
for (const auto & [directory, file_info] : latest_parts)
{
if (!list_of_keys.empty())
list_of_keys += ",";
LOG_TEST(&Poco::Logger::get("kssenii"), "Directory: {}, file: {}", directory, file_info.filename);
list_of_keys += file_info.filename;
}
if (latest_parts.size() == 1)
return list_of_keys;
return "{" + list_of_keys + "}";
}
template <typename Configuration, typename MetadataReadHelper>
std::vector<std::string> HudiMetadataParser<Configuration, MetadataReadHelper>::getFiles() const
{
auto result = MetadataReadHelper::listFiles(configuration);
LOG_TEST(&Poco::Logger::get("kssenii"), "result files: {}", result.size());
return result;
}
template HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::HudiMetadataParser(
const StorageS3::Configuration & configuration_, ContextPtr context_);
template std::vector<String> HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getFiles() const;
template String HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::generateQueryFromKeys(
const std::vector<String> & keys, const String & format);
void registerStorageHudi(StorageFactory & factory)
{
factory.registerStorage(
@@ -111,11 +17,9 @@ void registerStorageHudi(StorageFactory & factory)
[](const StorageFactory::Arguments & args)
{
StorageS3::Configuration configuration = StorageHudi::getConfiguration(args.engine_args, args.getLocalContext());
auto format_settings = getFormatSettings(args.getContext());
return std::make_shared<StorageHudi>(
configuration, args.table_id, args.columns, args.constraints, args.comment, args.getContext(), format_settings);
configuration, args.table_id, args.columns, args.constraints, args.comment,
args.getContext(), getFormatSettings(args.getContext()));
},
{
.supports_settings = false,

View File

@@ -4,24 +4,93 @@
#if USE_AWS_S3
# include <Storages/IStorage.h>
# include <Storages/IStorageDataLake.h>
# include <Storages/S3DataLakeMetadataReadHelper.h>
# include <Storages/StorageS3.h>
#include <Storages/IStorage.h>
#include <Storages/IStorageDataLake.h>
#include <Storages/S3DataLakeMetadataReadHelper.h>
#include <Storages/StorageS3.h>
#include <Common/logger_useful.h>
#include <ranges>
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace DB
{
template <typename Configuration, typename MetadataReadHelper>
class HudiMetadataParser
{
public:
HudiMetadataParser(const Configuration & configuration_, ContextPtr context_);
HudiMetadataParser(const Configuration & configuration_, ContextPtr context_)
: configuration(configuration_), context(context_), log(&Poco::Logger::get("StorageHudi")) {}
std::vector<String> getFiles() const;
Strings getFiles() const { return MetadataReadHelper::listFiles(configuration); }
static String generateQueryFromKeys(const std::vector<String> & keys, const String & format);
/** Apache Hudi store parts of data in different files.
* Every part file has timestamp in it.
* Every partition(directory) in Apache Hudi has different versions of part.
* To find needed parts we need to find out latest part file for every partition.
* Part format is usually parquet, but can differ.
*/
static String generateQueryFromKeys(const std::vector<String> & keys, const String & format)
{
auto * log = &Poco::Logger::get("HudiMetadataParser");
struct FileInfo
{
String filename;
UInt64 timestamp;
};
std::unordered_map<String, FileInfo> latest_parts; /// Partition path (directory) -> latest part file info.
/// Make format lowercase.
const auto expected_extension= "." + Poco::toLower(format);
/// Filter only files with specific format.
auto keys_filter = [&](const String & key) { return std::filesystem::path(key).extension() == expected_extension; };
/// For each partition path take only latest file.
for (const auto & key : keys | std::views::filter(keys_filter))
{
const auto key_path = fs::path(key);
/// Every filename contains metadata split by "_", timestamp is after last "_".
const auto delim = key.find_last_of('_') + 1;
if (delim == std::string::npos)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected format of metadata files");
const auto timestamp = parse<UInt64>(key.substr(delim + 1));
const auto [it, inserted] = latest_parts.emplace(/* partition_path */key_path.parent_path(), FileInfo{});
if (inserted)
{
it->second = FileInfo{key_path.filename(), timestamp};
}
else if (it->second.timestamp < timestamp)
{
it->second = {key_path.filename(), timestamp};
}
}
LOG_TRACE(log, "Having {} result partitions", latest_parts.size());
std::string list_of_keys;
for (const auto & [directory, file_info] : latest_parts)
{
if (!list_of_keys.empty())
list_of_keys += ",";
LOG_TEST(log, "Partition: {}, file: {}, timestamp: {}", directory, file_info.filename, file_info.timestamp);
list_of_keys += file_info.filename;
}
if (latest_parts.size() == 1)
return list_of_keys;
return "{" + list_of_keys + "}";
}
private:
Configuration configuration;
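generateQueryFromKeys() above keeps, for each partition directory, the part file with the largest timestamp found after the last '_' in its name. A small standalone sketch of that decomposition on a hypothetical key (real Hudi data files follow a fileId_writeToken_instantTime.parquet convention, but this exact key is made up):

#include <cassert>
#include <cstdint>
#include <filesystem>
#include <string>

int main()
{
    /// Hypothetical Hudi data file key: <partition dir>/<fileId>_<writeToken>_<instantTime>.parquet
    const std::string key = "year=2023/abc123_0-7-24_20230321120530.parquet";

    const std::filesystem::path key_path(key);
    const std::string partition = key_path.parent_path();  /// "year=2023"
    const std::string filename = key_path.filename();      /// "abc123_0-7-24_20230321120530.parquet"

    /// The commit timestamp (Hudi "instant time") sits after the last '_'.
    const auto pos = filename.find_last_of('_');
    assert(pos != std::string::npos);
    const uint64_t timestamp = std::stoull(filename.substr(pos + 1));  /// stoull stops at the '.' before "parquet"

    assert(partition == "year=2023" && timestamp == 20230321120530ULL);
}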

View File

@@ -5,6 +5,7 @@ import json
def upload_directory(minio_client, bucket_name, local_path, s3_path):
result_files=[]
for local_file in glob.glob(local_path + "/**"):
if os.path.isfile(local_file):
result_local_path = os.path.join(local_path, local_file)
@@ -15,6 +16,7 @@ def upload_directory(minio_client, bucket_name, local_path, s3_path):
object_name=result_s3_path,
file_path=result_local_path,
)
result_files.append(result_s3_path)
else:
upload_directory(
minio_client,
@@ -22,6 +24,7 @@ def upload_directory(minio_client, bucket_name, local_path, s3_path):
os.path.join(local_path, local_file),
os.path.join(s3_path, local_file),
)
return result_files
def get_file_contents(minio_client, bucket, s3_path):

View File

@@ -29,9 +29,7 @@ from helpers.s3_tools import prepare_s3_bucket, upload_directory, get_file_conte
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
TABLE_NAME = "test_delta_table"
USER_FILES_PATH = os.path.join(
SCRIPT_DIR, "./_instances/node1/database/user_files"
)
USER_FILES_PATH = os.path.join(SCRIPT_DIR, "./_instances/node1/database/user_files")
@pytest.fixture(scope="module")
@@ -73,35 +71,36 @@ def get_delta_metadata(delta_metadata_file):
return combined_json
def create_initial_data_file(node, query, table_name):
data_path = f"/var/lib/clickhouse/user_files/{table_name}.parquet"
node.query(
f"INSERT INTO TABLE FUNCTION file('{data_path}') SETTINGS output_format_parquet_compression_method='none', s3_truncate_on_insert=1 {query} FORMAT Parquet"
)
def write_delta(spark, path, result_path):
spark.read.load(path).write.mode("overwrite").option("compression", "none").format(
"delta"
).option("delta.columnMapping.mode", "name").save(result_path)
def test_basic(started_cluster):
instance = started_cluster.instances["node1"]
data_path = f"/var/lib/clickhouse/user_files/{TABLE_NAME}.parquet"
inserted_data = "SELECT number, toString(number) FROM numbers(100)"
instance.query(
f"INSERT INTO TABLE FUNCTION file('{data_path}') {inserted_data} FORMAT Parquet"
)
create_initial_data_file(instance, inserted_data, TABLE_NAME)
instance.exec_in_container(
["bash", "-c", "chmod 777 -R /var/lib/clickhouse/user_files"],
user="root",
)
spark = get_spark()
data_path = f"{USER_FILES_PATH}/{TABLE_NAME}.parquet"
result_path = f"/{TABLE_NAME}_result"
spark.read.load(f"file://{USER_FILES_PATH}/{TABLE_NAME}.parquet").write.mode(
"overwrite"
).option("compression", "none").format("delta").option(
"delta.columnMapping.mode", "name"
).save(
result_path
)
spark = get_spark()
write_delta(spark, data_path, result_path)
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
upload_directory(minio_client, bucket, result_path, "")
output_format_parquet_compression_method = "none"
data = get_file_contents(
minio_client,
bucket,
@@ -124,6 +123,15 @@ def test_basic(started_cluster):
)
# def test_several_actions(started_cluster):
# instance = started_cluster.instances["node1"]
#
# inserted_data = "SELECT number, toString(number) FROM numbers(100)"
# create_initial_data_file(query, TABLE_NAME)
#
# spark = get_spark()
def test_types(started_cluster):
spark = get_spark()
result_file = f"{TABLE_NAME}_result_2"

View File

@@ -106,7 +106,9 @@ def test_basic(started_cluster):
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
upload_directory(minio_client, bucket, result_path, "")
paths = upload_directory(minio_client, bucket, result_path, "")
assert len(paths) == 1
assert paths[0].endswith(".parquet")
instance.query(
f"""