implement storage iceberg

This commit is contained in:
flynn 2023-01-18 03:43:20 +00:00
parent 92646e3c27
commit a8e1363151
4 changed files with 216 additions and 120 deletions

View File

@ -68,35 +68,35 @@ namespace ErrorCodes
extern const int CANNOT_READ_ALL_DATA;
}
-class InputStreamReadBufferAdapter : public avro::InputStream
-{
-public:
-    explicit InputStreamReadBufferAdapter(ReadBuffer & in_) : in(in_) {}
-
-    bool next(const uint8_t ** data, size_t * len) override
-    {
-        if (in.eof())
-        {
-            *len = 0;
-            return false;
-        }
-
-        *data = reinterpret_cast<const uint8_t *>(in.position());
-        *len = in.available();
-        in.position() += in.available();
-        return true;
-    }
-
-    void backup(size_t len) override { in.position() -= len; }
-
-    void skip(size_t len) override { in.tryIgnore(len); }
-
-    size_t byteCount() const override { return in.count(); }
-
-private:
-    ReadBuffer & in;
-};
+bool AvroInputStreamReadBufferAdapter::next(const uint8_t ** data, size_t * len)
+{
+    if (in.eof())
+    {
+        *len = 0;
+        return false;
+    }
+
+    *data = reinterpret_cast<const uint8_t *>(in.position());
+    *len = in.available();
+    in.position() += in.available();
+    return true;
+}
+
+void AvroInputStreamReadBufferAdapter::backup(size_t len)
+{
+    in.position() -= len;
+}
+
+void AvroInputStreamReadBufferAdapter::skip(size_t len)
+{
+    in.tryIgnore(len);
+}
+
+size_t AvroInputStreamReadBufferAdapter::byteCount() const
+{
+    return in.count();
+}
/// Insert value with conversion to the column of target type.
template <typename T>
@ -758,7 +758,7 @@ AvroRowInputFormat::AvroRowInputFormat(const Block & header_, ReadBuffer & in_,
void AvroRowInputFormat::readPrefix()
{
-    file_reader_ptr = std::make_unique<avro::DataFileReaderBase>(std::make_unique<InputStreamReadBufferAdapter>(*in));
+    file_reader_ptr = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*in));
deserializer_ptr = std::make_unique<AvroDeserializer>(
output.getHeader(), file_reader_ptr->dataSchema(), format_settings.avro.allow_missing_fields, format_settings.avro.null_as_default);
file_reader_ptr->init();
@ -915,7 +915,7 @@ AvroConfluentRowInputFormat::AvroConfluentRowInputFormat(
void AvroConfluentRowInputFormat::readPrefix()
{
-    input_stream = std::make_unique<InputStreamReadBufferAdapter>(*in);
+    input_stream = std::make_unique<AvroInputStreamReadBufferAdapter>(*in);
decoder = avro::binaryDecoder();
decoder->init(*input_stream);
}
@ -972,7 +972,7 @@ NamesAndTypesList AvroSchemaReader::readSchema()
}
else
{
-        auto file_reader_ptr = std::make_unique<avro::DataFileReaderBase>(std::make_unique<InputStreamReadBufferAdapter>(in));
+        auto file_reader_ptr = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(in));
root_node = file_reader_ptr->dataSchema().root();
}
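The adapter just exposes ClickHouse's ReadBuffer through Avro's pull interface: next() hands out whatever is currently buffered, and backup() returns bytes the decoder did not consume. A minimal standalone sketch of the same avro::InputStream contract over a std::string (the class name here is illustrative, not part of this commit):

#include <Stream.hh>  // avro::InputStream (avro/Stream.hh on a standard Avro C++ install)
#include <algorithm>
#include <cstdint>
#include <string>

/// Same contract as AvroInputStreamReadBufferAdapter, but over an in-memory string.
class StringInputStream : public avro::InputStream
{
public:
    explicit StringInputStream(const std::string & data_) : data(data_) {}

    bool next(const uint8_t ** out, size_t * len) override
    {
        if (pos == data.size())
        {
            *len = 0;
            return false;   /// no more chunks
        }
        *out = reinterpret_cast<const uint8_t *>(data.data() + pos);
        *len = data.size() - pos;   /// hand out everything buffered, like in.available()
        pos = data.size();
        return true;
    }

    /// Give back the last `len` bytes handed out by next(); Avro calls this when
    /// the decoder consumed less than a full chunk.
    void backup(size_t len) override { pos -= len; }

    void skip(size_t len) override { pos = std::min(pos + len, data.size()); }

    size_t byteCount() const override { return pos; }

private:
    const std::string & data;
    size_t pos = 0;
};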

View File

@ -28,6 +28,23 @@ namespace ErrorCodes
extern const int INCORRECT_DATA;
}
class AvroInputStreamReadBufferAdapter : public avro::InputStream
{
public:
explicit AvroInputStreamReadBufferAdapter(ReadBuffer & in_) : in(in_) {}
bool next(const uint8_t ** data, size_t * len) override;
void backup(size_t len) override;
void skip(size_t len) override;
size_t byteCount() const override;
private:
ReadBuffer & in;
};
class AvroDeserializer
{
public:
@ -185,8 +202,8 @@ public:
    NamesAndTypesList readSchema() override;
+    static DataTypePtr avroNodeToDataType(avro::NodePtr node);

private:
-    DataTypePtr avroNodeToDataType(avro::NodePtr node);
    bool confluent;
    const FormatSettings format_settings;

View File

@ -25,7 +25,11 @@
# include <fmt/ranges.h>
# include <ranges>
# include <DataFile.hh>
# include <Processors/Formats/Impl/AvroRowInputFormat.h>
# include <Poco/JSON/Array.h>
# include <Poco/JSON/Object.h>
# include <Poco/JSON/Parser.h>
namespace DB
{
@ -35,48 +39,35 @@ namespace ErrorCodes
extern const int S3_ERROR;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int INCORRECT_DATA;
+    extern const int FILE_DOESNT_EXIST;
+    extern const int ILLEGAL_COLUMN;
}
-JsonMetadataGetter::JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context)
-    : base_configuration(configuration_), table_path(table_path_)
-{
-    init(context);
-}
+IcebergMetaParser::IcebergMetaParser(const StorageS3Configuration & configuration_, const String & table_path_, ContextPtr context_)
+    : base_configuration(configuration_), table_path(table_path_), context(context_)
+{
+}
-void JsonMetadataGetter::init(ContextPtr context)
-{
-    auto keys = getJsonLogFiles();
-
-    // read data from every json log file
-    for (const String & key : keys)
-    {
-        auto buf = createS3ReadBuffer(key, context);
-
-        char c;
-        while (!buf->eof())
-        {
-            /// May be some invalid characters before json.
-            while (buf->peek(c) && c != '{')
-                buf->ignore();
-
-            if (buf->eof())
-                break;
-
-            String json_str;
-            readJSONObjectPossiblyInvalid(json_str, *buf);
-
-            if (json_str.empty())
-                continue;
-
-            const JSON json(json_str);
-            handleJSON(json);
-        }
-    }
-}
+std::vector<String> IcebergMetaParser::getFiles() const
+{
+    auto metadata = getNewestMetaFile();
+    auto manifest_list = getManiFestList(metadata);
+
+    /// When the table is first created it does not have any data
+    if (manifest_list.empty())
+    {
+        return {};
+    }
+
+    auto manifest_files = getManifestFiles(manifest_list);
+    return getFilesForRead(manifest_files);
+}
-std::vector<String> JsonMetadataGetter::getJsonLogFiles()
-{
-    std::vector<String> keys;
+String IcebergMetaParser::getNewestMetaFile() const
+{
+    /// Iceberg stores all the metadata.json files in the metadata directory, and the
+    /// newest version has the max version name, so we should list all of them,
+    /// then find the newest metadata.
+    std::vector<String> metadata_files;

    const auto & client = base_configuration.client;
@ -88,9 +79,8 @@ std::vector<String> JsonMetadataGetter::getJsonLogFiles()
request.SetBucket(bucket);
-    /// DeltaLake format stores all metadata json files in _delta_log directory
-    static constexpr auto deltalake_metadata_directory = "_delta_log";
-    request.SetPrefix(std::filesystem::path(table_path) / deltalake_metadata_directory);
+    static constexpr auto metadata_directory = "metadata";
+    request.SetPrefix(std::filesystem::path(table_path) / metadata_directory);
while (!is_finished)
{
@ -109,27 +99,156 @@ std::vector<String> JsonMetadataGetter::getJsonLogFiles()
{
const auto & filename = obj.GetKey();
-            // DeltaLake metadata files have json extension
            if (std::filesystem::path(filename).extension() == ".json")
-                keys.push_back(filename);
+                metadata_files.push_back(filename);
}
/// Needed in case any more results are available
/// if so, we will continue reading, and not read keys that were already read
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
/// Set to false if all of the results were returned. Set to true if more keys
/// are available to return. If the number of results exceeds that specified by
/// MaxKeys, all of the results might not be returned
is_finished = !outcome.GetResult().GetIsTruncated();
}
-    return keys;
+    if (metadata_files.empty())
+        throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "The metadata file for Iceberg table with path {} doesn't exist", table_path);
+
+    auto it = std::max_element(metadata_files.begin(), metadata_files.end());
+    return *it;
}
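The std::max_element call works because Iceberg names its metadata files v<N>.metadata.json, so the newest version is the lexicographic maximum of the listing. A small self-contained sketch with hypothetical file names:

#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

int main()
{
    /// Hypothetical listing of <table_path>/metadata/ on S3.
    std::vector<std::string> metadata_files{
        "tbl/metadata/v1.metadata.json",
        "tbl/metadata/v3.metadata.json",
        "tbl/metadata/v2.metadata.json"};

    /// Lexicographic max picks the newest version while the version numbers
    /// have the same number of digits ("v10..." would sort before "v2...").
    std::cout << *std::max_element(metadata_files.begin(), metadata_files.end()) << '\n';
    /// prints "tbl/metadata/v3.metadata.json"
}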
-std::shared_ptr<ReadBuffer> JsonMetadataGetter::createS3ReadBuffer(const String & key, ContextPtr context)
+String IcebergMetaParser::getManiFestList(const String & metadata_name) const
+{
+    auto buffer = createS3ReadBuffer(metadata_name, context);
+    String json_str;
+    readJSONObjectPossiblyInvalid(json_str, *buffer);
+
+    /// Looks like base/base/JSON.h can not parse this json file
+    Poco::JSON::Parser parser;
+    Poco::Dynamic::Var json = parser.parse(json_str);
+    Poco::JSON::Object::Ptr object = json.extract<Poco::JSON::Object::Ptr>();
+
+    auto current_snapshot_id = object->getValue<Int64>("current-snapshot-id");
+    auto snapshots = object->get("snapshots").extract<Poco::JSON::Array::Ptr>();
+
+    for (size_t i = 0; i < snapshots->size(); ++i)
+    {
+        auto snapshot = snapshots->getObject(static_cast<UInt32>(i));
+        if (snapshot->getValue<Int64>("snapshot-id") == current_snapshot_id)
+            return snapshot->getValue<String>("manifest-list");
+    }
+
+    return {};
+}
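For reference, a trimmed, hypothetical metadata.json showing just the fields walked above, parsed with the same Poco calls:

#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <iostream>

int main()
{
    /// Hypothetical vN.metadata.json, cut down to the fields read above.
    static constexpr auto metadata_json = R"({
        "current-snapshot-id": 1000,
        "snapshots": [
            {"snapshot-id": 999,  "manifest-list": "s3://bucket/tbl/metadata/snap-999.avro"},
            {"snapshot-id": 1000, "manifest-list": "s3://bucket/tbl/metadata/snap-1000.avro"}
        ]
    })";

    Poco::JSON::Parser parser;
    auto object = parser.parse(metadata_json).extract<Poco::JSON::Object::Ptr>();
    auto current_snapshot_id = object->getValue<Poco::Int64>("current-snapshot-id");
    auto snapshots = object->get("snapshots").extract<Poco::JSON::Array::Ptr>();

    for (unsigned i = 0; i < snapshots->size(); ++i)
    {
        auto snapshot = snapshots->getObject(i);
        if (snapshot->getValue<Poco::Int64>("snapshot-id") == current_snapshot_id)
            std::cout << snapshot->getValue<std::string>("manifest-list") << '\n';
        /// prints "s3://bucket/tbl/metadata/snap-1000.avro"
    }
}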
+static ColumnPtr
+parseAvro(const std::unique_ptr<avro::DataFileReaderBase> & file_reader, const DataTypePtr & data_type, const String & field_name)
+{
+    auto deserializer = std::make_unique<AvroDeserializer>(
+        Block{{data_type->createColumn(), data_type, field_name}}, file_reader->dataSchema(), true, true);
+    file_reader->init();
+    MutableColumns columns;
+    columns.emplace_back(data_type->createColumn());
+
+    RowReadExtension ext;
+    while (file_reader->hasMore())
+    {
+        file_reader->decr();
+        deserializer->deserializeRow(columns, file_reader->decoder(), ext);
+    }
+    return std::move(columns.at(0));
+}
+std::vector<String> IcebergMetaParser::getManifestFiles(const String & manifest_list) const
+{
+    auto buffer = createS3ReadBuffer(manifest_list, context);
+
+    auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer));
+
+    static constexpr auto manifest_path = "manifest_path";
+
+    /// The manifest_path is the first field in the manifest list file,
+    /// and it has String data type:
+    /// {'manifest_path': 'xxx', ...}
+    auto data_type = AvroSchemaReader::avroNodeToDataType(file_reader->dataSchema().root()->leafAt(0));
+    auto col = parseAvro(file_reader, data_type, manifest_path);
+
+    std::vector<String> res;
+    if (col->getDataType() == TypeIndex::String)
+    {
+        const auto * col_str = typeid_cast<const ColumnString *>(col.get());
+        size_t col_size = col_str->size();
+        for (size_t i = 0; i < col_size; ++i)
+        {
+            auto file_path = (*col_str)[i].safeGet<String>();
+            /// We just need to obtain the file name
+            std::filesystem::path path(file_path);
+            res.emplace_back(path.filename());
+        }
+        return res;
+    }
+
+    throw Exception(
+        ErrorCodes::ILLEGAL_COLUMN,
+        "The parsed column from Avro file for manifest_path should have data type String, but got {}",
+        col->getFamilyName());
+}
+std::vector<String> IcebergMetaParser::getFilesForRead(const std::vector<String> & manifest_files) const
+{
+    std::vector<String> keys;
+    for (const auto & manifest_file : manifest_files)
+    {
+        auto buffer = createS3ReadBuffer(manifest_file, context);
+
+        auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer));
+
+        static constexpr auto manifest_path = "data_file";
+
+        /// The data_file field is at the 3rd position of the manifest file:
+        /// {'status': xx, 'snapshot_id': xx, 'data_file': {'file_path': 'xxx', ...}, ...}
+        /// and it is also a nested record, so its result type is a nested Tuple
+        auto data_type = AvroSchemaReader::avroNodeToDataType(file_reader->dataSchema().root()->leafAt(2));
+        auto col = parseAvro(file_reader, data_type, manifest_path);
+
+        if (col->getDataType() == TypeIndex::Tuple)
+        {
+            const auto * col_tuple = typeid_cast<const ColumnTuple *>(col.get());
+            const auto & col_str = col_tuple->getColumnPtr(0);
+            if (col_str->getDataType() == TypeIndex::String)
+            {
+                const auto * str_col = typeid_cast<const ColumnString *>(col_str.get());
+                size_t col_size = str_col->size();
+                for (size_t i = 0; i < col_size; ++i)
+                {
+                    auto file_path = (*str_col)[i].safeGet<String>();
+                    /// We just need to obtain the partition/file name
+                    std::filesystem::path path(file_path);
+                    keys.emplace_back(path.parent_path().filename() / path.filename());
+                }
+            }
+            else
+            {
+                throw Exception(
+                    ErrorCodes::ILLEGAL_COLUMN,
+                    "The parsed column from Avro file for file_path should have data type String, got {}",
+                    col_str->getFamilyName());
+            }
+        }
+        else
+        {
+            throw Exception(
+                ErrorCodes::ILLEGAL_COLUMN,
+                "The parsed column from Avro file for data_file field should have data type Tuple, got {}",
+                col->getFamilyName());
+        }
+    }
+
+    return keys;
+}
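The key kept per data file is just the last two path components of its full URI. A sketch with a hypothetical file_path:

#include <filesystem>
#include <iostream>

int main()
{
    /// Hypothetical data_file.file_path from a manifest entry.
    std::filesystem::path path("s3://bucket/tbl/data/dt=2023-01-18/00000-0-abc.parquet");

    /// parent_path().filename() is the partition directory, filename() the data file.
    std::cout << (path.parent_path().filename() / path.filename()) << '\n';
    /// prints "dt=2023-01-18/00000-0-abc.parquet" (quoted by the path inserter)
}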
+std::shared_ptr<ReadBuffer> IcebergMetaParser::createS3ReadBuffer(const String & key, ContextPtr context) const
{
/// TODO: add parallel downloads
S3Settings::RequestSettings request_settings;
request_settings.max_single_read_retries = 10;
return std::make_shared<ReadBufferFromS3>(
@ -141,24 +260,6 @@ std::shared_ptr<ReadBuffer> JsonMetadataGetter::createS3ReadBuffer(const String
context->getReadSettings());
}
-void JsonMetadataGetter::handleJSON(const JSON & json)
-{
-    if (json.has("add"))
-    {
-        auto path = json["add"]["path"].getString();
-        auto timestamp = json["add"]["modificationTime"].getInt();
-        metadata.setLastModifiedTime(path, timestamp);
-    }
-    else if (json.has("remove"))
-    {
-        auto path = json["remove"]["path"].getString();
-        auto timestamp = json["remove"]["deletionTimestamp"].getInt();
-        metadata.remove(path, timestamp);
-    }
-}
namespace
{
@ -183,10 +284,11 @@ StorageS3Configuration getAdjustedS3Configuration(
const std::string & table_path,
Poco::Logger * log)
{
-    JsonMetadataGetter getter{base_configuration, table_path, context};
+    IcebergMetaParser parser{base_configuration, table_path, context};

-    auto keys = getter.getFiles();
-    auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys);
+    auto keys = parser.getFiles();
+    static constexpr auto iceberg_data_directory = "data";
+    auto new_uri = (std::filesystem::path(base_configuration.uri.uri.toString()) / iceberg_data_directory / generateQueryFromKeys(keys)).string();
LOG_DEBUG(log, "New uri: {}", new_uri);
LOG_DEBUG(log, "Table path: {}", table_path);

View File

@ -34,43 +34,20 @@ class IcebergMetaParser
public:
    IcebergMetaParser(const StorageS3Configuration & configuration_, const String & table_path_, ContextPtr context_);

-    void parseMeta();
-
-    String getNewestMetaFile();
-    String getManiFestList(String metadata);
-    std::vector<String> getManifestFiles(String manifest_list);
-    void getFilesForRead(const std::vector<String> manifest_files);
-
-    auto getFiles() const {return keys};
+    std::vector<String> getFiles() const;

private:
-    std::vector<String> keys;
+    /// Just get file name
+    String getNewestMetaFile() const;
+    String getManiFestList(const String & metadata_name) const;
+    std::vector<String> getManifestFiles(const String & manifest_list) const;
+    std::vector<String> getFilesForRead(const std::vector<String> & manifest_files) const;
+
+    std::shared_ptr<ReadBuffer> createS3ReadBuffer(const String & key, ContextPtr context) const;

    StorageS3Configuration base_configuration;
    String table_path;
    ContextPtr context;
};

-// class to get deltalake log json files and read json from them
-class JsonMetadataGetter
-{
-public:
-    JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context);
-
-    std::vector<String> getFiles() { return std::move(metadata).listCurrentFiles(); }
-
-private:
-    void init(ContextPtr context);
-
-    std::vector<String> getJsonLogFiles();
-
-    std::shared_ptr<ReadBuffer> createS3ReadBuffer(const String & key, ContextPtr context);
-
-    void handleJSON(const JSON & json);
-
-    StorageS3::S3Configuration base_configuration;
-    String table_path;
-    DeltaLakeMetadata metadata;
-};
class StorageIceberg : public IStorage