Add comments, fix review issues

Daniil Rubin 2022-11-04 20:49:24 +00:00
parent d6bbd65dde
commit 37048e986a
4 changed files with 49 additions and 16 deletions

View File: src/Storages/StorageDelta.cpp

@ -60,20 +60,20 @@ std::vector<String> DeltaLakeMetadata::ListCurrentFiles() &&
return keys;
}
- JsonMetadataGetter::JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_)
+ JsonMetadataGetter::JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context)
: base_configuration(configuration_), table_path(table_path_)
{
- Init();
+ Init(context);
}
- void JsonMetadataGetter::Init()
+ void JsonMetadataGetter::Init(ContextPtr context)
{
auto keys = getJsonLogFiles();
// read data from every json log file
for (const String & key : keys)
{
- auto buf = createS3ReadBuffer(key);
+ auto buf = createS3ReadBuffer(key, context);
while (!buf->eof())
{
@ -110,6 +110,8 @@ std::vector<String> JsonMetadataGetter::getJsonLogFiles()
const auto bucket{base_configuration.uri.bucket};
request.SetBucket(bucket);
+ // The DeltaLake format stores all metadata JSON files in the _delta_log directory
request.SetPrefix(std::filesystem::path(table_path) / "_delta_log");
while (!is_finished)
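For orientation: the Delta transaction log is a series of zero-padded, version-numbered JSON files stored under _delta_log next to the data files, e.g. (hypothetical names):

    my_table/_delta_log/00000000000000000000.json
    my_table/_delta_log/00000000000000000001.json
    my_table/part-00000-a1b2c3d4.parquet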
@ -129,18 +131,25 @@ std::vector<String> JsonMetadataGetter::getJsonLogFiles()
{
const auto & filename = obj.GetKey();
+ // DeltaLake metadata files have the .json extension
if (std::filesystem::path(filename).extension() == ".json")
keys.push_back(filename);
}
+ // Needed in case more results are available: if so, we continue reading
+ // and do not re-read keys that were already fetched
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
+ /// Set to false if all of the results were returned. Set to true if more keys
+ /// are available to return. If the number of results exceeds that specified by
+ /// MaxKeys, all of the results might not be returned
is_finished = !outcome.GetResult().GetIsTruncated();
}
return keys;
}
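The loop above is standard ListObjectsV2 pagination. A minimal self-contained sketch of the same pattern with the AWS SDK for C++ (a plain Aws::S3::S3Client instead of ClickHouse's wrapped client; error handling reduced to a break where the real code throws):

    #include <aws/s3/S3Client.h>
    #include <aws/s3/model/ListObjectsV2Request.h>
    #include <string>
    #include <vector>

    std::vector<std::string> listKeys(const Aws::S3::S3Client & client, const std::string & bucket, const std::string & prefix)
    {
        std::vector<std::string> keys;
        Aws::S3::Model::ListObjectsV2Request request;
        request.SetBucket(bucket);
        request.SetPrefix(prefix);
        bool is_finished = false;
        while (!is_finished)
        {
            auto outcome = client.ListObjectsV2(request);
            if (!outcome.IsSuccess())
                break; // the real code throws instead
            for (const auto & obj : outcome.GetResult().GetContents())
                keys.emplace_back(obj.GetKey().c_str()); // Aws::String -> std::string
            // Resume the listing from where the previous page ended
            request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
            is_finished = !outcome.GetResult().GetIsTruncated();
        }
        return keys;
    }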
- std::shared_ptr<ReadBuffer> JsonMetadataGetter::createS3ReadBuffer(const String & key)
+ std::shared_ptr<ReadBuffer> JsonMetadataGetter::createS3ReadBuffer(const String & key, ContextPtr context)
{
// TBD: add parallel downloads
return std::make_shared<ReadBufferFromS3>(
@ -149,7 +158,7 @@ std::shared_ptr<ReadBuffer> JsonMetadataGetter::createS3ReadBuffer(const String
key,
base_configuration.uri.version_id,
/* max single read retries */ 10,
- ReadSettings{});
+ context->getReadSettings());
}
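Presumably the point of this change: ReadSettings{} reads with compiled-in defaults, while context->getReadSettings() carries the session's read settings (buffer sizes, remote-read tuning, throttling) into the S3 read, so the metadata fetch behaves like any other context-aware read.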
void JsonMetadataGetter::handleJSON(const JSON & json)
@ -186,7 +195,7 @@ StorageDelta::StorageDelta(
StorageInMemoryMetadata storage_metadata;
StorageS3::updateS3Configuration(context_, base_configuration);
- JsonMetadataGetter getter{base_configuration, table_path};
+ JsonMetadataGetter getter{base_configuration, table_path, context_};
auto keys = getter.getFiles();
@ -245,6 +254,9 @@ Pipe StorageDelta::read(
String StorageDelta::generateQueryFromKeys(std::vector<String> && keys)
{
+ // DeltaLake stores data parts in different files
+ // keys are the filenames of those parts
+ // for StorageS3 to read all parts we need the format {key1,key2,key3,...keyn}
std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ","));
return new_query;
}
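A tiny standalone illustration of the glob this produces, with hypothetical key names (StorageS3 expands {a,b,c} to exactly those keys):

    #include <fmt/format.h>
    #include <fmt/ranges.h> // for fmt::join
    #include <string>
    #include <vector>

    int main()
    {
        std::vector<std::string> keys = {"part-0000.parquet", "part-0001.parquet"};
        // Doubled braces escape to literal '{'/'}'; prints:
        // {part-0000.parquet,part-0001.parquet}
        fmt::print("{{{}}}\n", fmt::join(keys, ","));
    }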
@ -270,6 +282,7 @@ void registerStorageDelta(StorageFactory & factory)
if (engine_args.size() == 4)
configuration.format = checkAndGetLiteralArgument<String>(engine_args[3], "format");
+ // DeltaLake uses Parquet by default
if (configuration.format == "auto")
configuration.format = "Parquet";

View File: src/Storages/StorageDelta.h

@ -42,16 +42,16 @@ private:
class JsonMetadataGetter
{
public:
- JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_);
+ JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context);
std::vector<String> getFiles() { return std::move(metadata).ListCurrentFiles(); }
private:
- void Init();
+ void Init(ContextPtr context);
std::vector<String> getJsonLogFiles();
- std::shared_ptr<ReadBuffer> createS3ReadBuffer(const String & key);
+ std::shared_ptr<ReadBuffer> createS3ReadBuffer(const String & key, ContextPtr context);
void handleJSON(const JSON & json);
@ -63,6 +63,9 @@ private:
class StorageDelta : public IStorage
{
public:
+ // 1. Parses the table's internal file structure
+ // 2. Finds the parts that belong to the latest version
+ // 3. Creates a URL for the underlying StorageS3 engine to handle reads
StorageDelta(
const StorageS3Configuration & configuration_,
const StorageID & table_id_,
@ -74,6 +77,7 @@ public:
String getName() const override { return "DeltaLake"; }
+ // Reads the latest version of a DeltaLake table
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
@ -85,6 +89,11 @@ public:
private:
void Init();
+ // DeltaLake stores data parts in different files
+ // keys is a vector of the parts of the latest version
+ // generateQueryFromKeys constructs a query from the part filenames for the
+ // underlying StorageS3 engine
static String generateQueryFromKeys(std::vector<String> && keys);
StorageS3::S3Configuration base_configuration;

View File: src/Storages/StorageHudi.cpp

@ -142,8 +142,7 @@ std::string StorageHudi::generateQueryFromKeys(std::vector<std::string> && keys,
std::erase_if(keys, [&format](const std::string & s) { return std::filesystem::path(s).extension() != "." + format; });
// for each partition path, take only the latest file
- std::unordered_map<std::string, std::pair<std::string, uint64_t>> latest_parquets;
+ std::unordered_map<std::string, std::pair<std::string, uint64_t>> latest_parts;
for (const auto & key : keys)
{
@ -161,9 +160,9 @@ std::string StorageHudi::generateQueryFromKeys(std::vector<std::string> && keys,
// every filename contains metadata separated by "_"; the timestamp comes after the last "_"
uint64_t timestamp = std::stoul(key.substr(key.find_last_of("_") + 1));
- auto it = latest_parquets.find(path);
+ auto it = latest_parts.find(path);
- if (it != latest_parquets.end())
+ if (it != latest_parts.end())
{
if (it->second.second < timestamp)
{
@ -172,13 +171,13 @@ std::string StorageHudi::generateQueryFromKeys(std::vector<std::string> && keys,
}
else
{
- latest_parquets[path] = {key, timestamp};
+ latest_parts[path] = {key, timestamp};
}
}
std::vector<std::string> filtered_keys;
std::transform(
- latest_parquets.begin(), latest_parquets.end(), std::back_inserter(filtered_keys), [](const auto & kv) { return kv.second.first; });
+ latest_parts.begin(), latest_parts.end(), std::back_inserter(filtered_keys), [](const auto & kv) { return kv.second.first; });
std::string new_query;
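A condensed, self-contained sketch of the per-partition selection above, under the assumption the comments state (the numeric timestamp follows the last '_'; std::stoul stops at the first non-digit, so the extension can stay attached):

    #include <cstdint>
    #include <filesystem>
    #include <string>
    #include <unordered_map>
    #include <utility>
    #include <vector>

    std::vector<std::string> latestPartPerPartition(const std::vector<std::string> & keys)
    {
        // partition path -> {key of the newest part, its timestamp}
        std::unordered_map<std::string, std::pair<std::string, uint64_t>> latest_parts;
        for (const auto & key : keys)
        {
            auto path = std::filesystem::path(key).parent_path().string();
            uint64_t timestamp = std::stoul(key.substr(key.find_last_of('_') + 1));
            auto it = latest_parts.find(path);
            if (it == latest_parts.end() || it->second.second < timestamp)
                latest_parts[path] = {key, timestamp};
        }
        std::vector<std::string> result;
        result.reserve(latest_parts.size());
        for (const auto & kv : latest_parts)
            result.push_back(kv.second.first);
        return result;
    }

    // e.g. {"us/p_1_100.parquet", "us/p_1_200.parquet", "eu/p_1_50.parquet"}
    // yields {"us/p_1_200.parquet", "eu/p_1_50.parquet"} (in hash order)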
@ -218,6 +217,7 @@ void registerStorageHudi(StorageFactory & factory)
if (engine_args.size() == 4)
configuration.format = checkAndGetLiteralArgument<String>(engine_args[3], "format");
+ // Apache Hudi uses Parquet by default
if (configuration.format == "auto")
configuration.format = "Parquet";

View File: src/Storages/StorageHudi.h

@ -23,6 +23,9 @@ namespace DB
class StorageHudi : public IStorage
{
public:
+ // 1. Parses the table's internal file structure
+ // 2. Finds the parts that belong to the latest version
+ // 3. Creates a URL for the underlying StorageS3 engine to handle reads
StorageHudi(
const StorageS3Configuration & configuration_,
const StorageID & table_id_,
@ -34,6 +37,8 @@ public:
String getName() const override { return "Hudi"; }
+ // Reads the latest version of an Apache Hudi table
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
@ -45,6 +50,12 @@ public:
private:
std::vector<std::string> getKeysFromS3();
+ // Apache Hudi stores data parts in different files
+ // Every part filename contains a timestamp
+ // Every partition (directory) in Apache Hudi holds several versions of a part
+ // To find the needed parts we have to pick the latest part file for every partition
+ // The part format is usually Parquet, but can differ
static std::string generateQueryFromKeys(std::vector<std::string> && keys, String format);
StorageS3::S3Configuration base_configuration;