Checkpoints

kssenii 2023-04-04 16:00:59 +02:00
parent bbe8c121a1
commit 8c0be0c3be
4 changed files with 325 additions and 77 deletions

View File

@@ -6,8 +6,19 @@
#if USE_AWS_S3
#include <Storages/DataLakes/S3MetadataReader.h>
#include <Storages/StorageS3.h>
#include <parquet/file_reader.h>
#include <Processors/Formats/Impl/ArrowBufferedStreams.h>
#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
#include <Processors/Formats/Impl/ArrowColumnToCHColumn.h>
#include <Formats/FormatFactory.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <parquet/arrow/reader.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnNullable.h>
#include <ranges>
namespace fs = std::filesystem;
namespace DB
{
@@ -34,13 +45,57 @@ struct DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::Impl
* For example:
* ./_delta_log/00000000000000000000.json
*/
std::vector<String> getMetadataFiles(const Configuration & configuration)
{
/// DeltaLake format stores all metadata json files in _delta_log directory
static constexpr auto deltalake_metadata_directory = "_delta_log";
static constexpr auto metadata_file_suffix = ".json";
static constexpr auto deltalake_metadata_directory = "_delta_log";
static constexpr auto metadata_file_suffix = ".json";
return MetadataReadHelper::listFiles(configuration, deltalake_metadata_directory, metadata_file_suffix);
std::string withPadding(size_t version)
{
/// File names are zero-padded to 20 digits.
static constexpr auto padding = 20;
const auto version_str = toString(version);
return std::string(padding - version_str.size(), '0') + version_str;
}
/**
* A delta file, n.json, contains an atomic set of actions that should be applied to the
* previous table state (n-1.json) in order to construct the nth snapshot of the table.
* An action changes one aspect of the table's state, for example, adding or removing a file.
* Note: the file is not a single valid json document, but a list of json objects, so we read it in a loop.
*/
std::set<String> processMetadataFiles(const Configuration & configuration, ContextPtr context)
{
std::set<String> result_files;
const auto checkpoint_version = getCheckpointIfExists(result_files, configuration, context);
if (checkpoint_version)
{
auto current_version = checkpoint_version;
while (true)
{
const auto filename = withPadding(++current_version) + metadata_file_suffix;
const auto file_path = fs::path(configuration.getPath()) / deltalake_metadata_directory / filename;
if (!MetadataReadHelper::exists(file_path, configuration))
break;
processMetadataFile(file_path, result_files, configuration, context);
}
LOG_TRACE(
log, "Processed metadata files from checkpoint {} to {}",
checkpoint_version, current_version - 1);
}
else
{
const auto keys = MetadataReadHelper::listFiles(
configuration, deltalake_metadata_directory, metadata_file_suffix);
for (const String & key : keys)
processMetadataFile(key, result_files, configuration, context);
}
return result_files;
}
/**
@@ -72,60 +127,207 @@ struct DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::Impl
* \"nullCount\":{\"col-6c990940-59bb-4709-8f2e-17083a82c01a\":0,\"col-763cd7e2-7627-4d8e-9fb7-9e85d0c8845b\":0}}"}}
* "
*/
void handleJSON(const JSON & json, const String & prefix, std::set<String> & result)
void processMetadataFile(
const String & key,
std::set<String> & result,
const Configuration & configuration,
ContextPtr context)
{
if (json.has("add"))
auto buf = MetadataReadHelper::createReadBuffer(key, context, configuration);
char c;
while (!buf->eof())
{
const auto path = json["add"]["path"].getString();
const auto [_, inserted] = result.insert(fs::path(prefix) / path);
if (!inserted)
throw Exception(ErrorCodes::INCORRECT_DATA, "File already exists {}", path);
}
else if (json.has("remove"))
{
const auto path = json["remove"]["path"].getString();
const bool erase = result.erase(fs::path(prefix) / path);
if (!erase)
throw Exception(ErrorCodes::INCORRECT_DATA, "File doesn't exist {}", path);
/// There may be some invalid characters before the json.
while (buf->peek(c) && c != '{')
buf->ignore();
if (buf->eof())
break;
String json_str;
readJSONObjectPossiblyInvalid(json_str, *buf);
if (json_str.empty())
continue;
const JSON json(json_str);
if (json.has("add"))
{
const auto path = json["add"]["path"].getString();
const auto [_, inserted] = result.insert(fs::path(configuration.getPath()) / path);
if (!inserted)
throw Exception(ErrorCodes::INCORRECT_DATA, "File already exists {}", path);
}
else if (json.has("remove"))
{
const auto path = json["remove"]["path"].getString();
const bool erase = result.erase(fs::path(configuration.getPath()) / path);
if (!erase)
throw Exception(ErrorCodes::INCORRECT_DATA, "File doesn't exist {}", path);
}
}
}
/**
* A delta file, n.json, contains an atomic set of actions that should be applied to the
* previous table state (n-1.json) in order to construct the nth snapshot of the table.
* An action changes one aspect of the table's state, for example, adding or removing a file.
* Note: the file is not a single valid json document, but a list of json objects, so we read it in a loop.
* Checkpoints in Delta Lake are created every 10 commits by default.
* The latest checkpoint is recorded in the _last_checkpoint file: _delta_log/_last_checkpoint
*
* _last_checkpoint contains the following:
* {"version":20,
* "size":23,
* "sizeInBytes":14057,
* "numOfAddFiles":21,
* "checkpointSchema":{...}}
*
* We need to get "version", which is the version of the checkpoint we need to read.
*/
std::set<String> processMetadataFiles(const Configuration & configuration, ContextPtr context)
size_t readLastCheckpointIfExists(const Configuration & configuration, ContextPtr context)
{
std::set<String> result;
const auto keys = getMetadataFiles(configuration);
for (const String & key : keys)
const auto last_checkpoint_file = fs::path(configuration.getPath()) / deltalake_metadata_directory / "_last_checkpoint";
if (!MetadataReadHelper::exists(last_checkpoint_file, configuration))
return 0;
String json_str;
auto buf = MetadataReadHelper::createReadBuffer(last_checkpoint_file, context, configuration);
readJSONObjectPossiblyInvalid(json_str, *buf);
const JSON json(json_str);
const auto version = json["version"].getUInt();
LOG_TRACE(log, "Last checkpoint file version: {}", version);
return version;
}
/**
* The format of the checkpoint file name can take one of two forms:
* 1. A single checkpoint file for version n of the table will be named n.checkpoint.parquet.
* For example:
* 00000000000000000010.checkpoint.parquet
* 2. A multi-part checkpoint for version n can be fragmented into p files. Fragment o of p is
* named n.checkpoint.o.p.parquet. For example:
* 00000000000000000010.checkpoint.0000000001.0000000003.parquet
* 00000000000000000010.checkpoint.0000000002.0000000003.parquet
* 00000000000000000010.checkpoint.0000000003.0000000003.parquet
* TODO: Only (1) is supported, need to support (2).
*
* Such checkpoint files contain parquet data with the following contents:
*
* Row 1:
*
* txn: (NULL,NULL,NULL)
* add: ('part-00000-1e9cd0c1-57b5-43b4-9ed8-39854287b83a-c000.parquet',{},1070,1680614340485,false,{},'{"numRecords":1,"minValues":{"col-360dade5-6d0e-4831-8467-a25d64695975":13,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":"14"},"maxValues":{"col-360dade5-6d0e-4831-8467-a25d64695975":13,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":"14"},"nullCount":{"col-360dade5-6d0e-4831-8467-a25d64695975":0,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":0}}')
* remove: (NULL,NULL,NULL,NULL,{},NULL,{})
* metaData: (NULL,NULL,NULL,(NULL,{}),NULL,[],{},NULL)
* protocol: (NULL,NULL)
*
* Row 2:
*
* txn: (NULL,NULL,NULL)
* add: ('part-00000-8887e898-91dd-4951-a367-48f7eb7bd5fd-c000.parquet',{},1063,1680614318485,false,{},'{"numRecords":1,"minValues":{"col-360dade5-6d0e-4831-8467-a25d64695975":2,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":"3"},"maxValues":{"col-360dade5-6d0e-4831-8467-a25d64695975":2,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":"3"},"nullCount":{"col-360dade5-6d0e-4831-8467-a25d64695975":0,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":0}}')
* remove: (NULL,NULL,NULL,NULL,{},NULL,{})
* metaData: (NULL,NULL,NULL,(NULL,{}),NULL,[],{},NULL)
* protocol: (NULL,NULL)
*
* ...
*/
#define THROW_ARROW_NOT_OK(status) \
do \
{ \
if (const ::arrow::Status & _s = (status); !_s.ok()) \
throw Exception::createDeprecated(_s.ToString(), ErrorCodes::BAD_ARGUMENTS);\
} while (false)
size_t getCheckpointIfExists(std::set<String> & result, const Configuration & configuration, ContextPtr context)
{
const auto version = readLastCheckpointIfExists(configuration, context);
if (!version)
return 0;
const auto checkpoint_filename = withPadding(version) + ".checkpoint.parquet";
const auto checkpoint_path = fs::path(configuration.getPath()) / deltalake_metadata_directory / checkpoint_filename;
LOG_TRACE(log, "Using checkpoint file: {}", checkpoint_path.string());
auto buf = MetadataReadHelper::createReadBuffer(checkpoint_path, context, configuration);
auto format_settings = getFormatSettings(context);
/// Force nullable, because for some reason this parquet file does not mark columns as nullable
/// in its metadata, while the types are in fact nullable.
format_settings.schema_inference_make_columns_nullable = true;
auto columns = ParquetSchemaReader(*buf, format_settings).readSchema();
/// Read only columns that we need.
columns.filter(NameSet{"add", "remove"});
Block header;
for (const auto & column : columns)
header.insert({column.type->createColumn(), column.type, column.name});
std::atomic<int> is_stopped{0};
auto arrow_file = asArrowFile(*buf, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES);
std::unique_ptr<parquet::arrow::FileReader> reader;
THROW_ARROW_NOT_OK(
parquet::arrow::OpenFile(
std::move(arrow_file),
arrow::default_memory_pool(),
&reader));
std::shared_ptr<arrow::Schema> schema;
THROW_ARROW_NOT_OK(reader->GetSchema(&schema));
ArrowColumnToCHColumn column_reader(
header, "Parquet",
format_settings.parquet.import_nested,
format_settings.parquet.allow_missing_columns,
/* null_as_default */true,
format_settings.parquet.case_insensitive_column_matching);
Chunk res;
std::shared_ptr<arrow::Table> table;
THROW_ARROW_NOT_OK(reader->ReadTable(&table));
column_reader.arrowTableToCHChunk(res, table, reader->parquet_reader()->metadata()->num_rows());
const auto & res_columns = res.getColumns();
if (res_columns.size() != 2)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected number of columns: {}", res_columns.size());
/// Process `add` column.
{
auto buf = MetadataReadHelper::createReadBuffer(key, context, configuration);
char c;
while (!buf->eof())
const auto * tuple_column = assert_cast<const ColumnTuple *>(res_columns[0].get());
const auto & nullable_column = assert_cast<const ColumnNullable &>(tuple_column->getColumn(0));
const auto & path_column = assert_cast<const ColumnString &>(nullable_column.getNestedColumn());
for (size_t i = 0; i < path_column.size(); ++i)
{
/// There may be some invalid characters before the json.
while (buf->peek(c) && c != '{')
buf->ignore();
if (buf->eof())
break;
String json_str;
readJSONObjectPossiblyInvalid(json_str, *buf);
if (json_str.empty())
const auto filename = String(path_column.getDataAt(i));
if (filename.empty())
continue;
const JSON json(json_str);
handleJSON(json, configuration.getPath(), result);
LOG_TEST(log, "Adding {}", filename);
const auto [_, inserted] = result.insert(fs::path(configuration.getPath()) / filename);
if (!inserted)
throw Exception(ErrorCodes::INCORRECT_DATA, "File already exists {}", filename);
}
}
return result;
/// Process `remove` column.
{
const auto * tuple_column = assert_cast<const ColumnTuple *>(res_columns[1].get());
const auto & nullable_column = assert_cast<const ColumnNullable &>(tuple_column->getColumn(0));
const auto & path_column = assert_cast<const ColumnString &>(nullable_column.getNestedColumn());
for (size_t i = 0; i < path_column.size(); ++i)
{
const auto filename = String(path_column.getDataAt(i));
if (filename.empty())
continue;
LOG_TEST(log, "Removing {}", filename);
result.erase(fs::path(configuration.getPath()) / filename);
}
}
return version;
}
Poco::Logger * log = &Poco::Logger::get("DeltaLakeMetadataParser");
};
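
Taken together, the parser above resolves a table snapshot in two steps: take the base set of data files from the latest checkpoint, if _last_checkpoint exists, and then replay the newer commit json files on top of it. The following is a minimal Python sketch of that order of operations, not the commit's implementation: it assumes a local _delta_log directory and pyarrow instead of S3 access through MetadataReadHelper and the Arrow C++ reader, and, like the code above, it handles only single-file checkpoints.

import json
import os

import pyarrow.parquet as pq


def with_padding(version: int) -> str:
    # Commit and checkpoint file names are zero-padded to 20 digits, e.g. 10 -> "00000000000000000010".
    return str(version).zfill(20)


def snapshot_data_files(table_path: str) -> set:
    log_dir = os.path.join(table_path, "_delta_log")
    result = set()
    version = -1  # -1 means "no checkpoint, start replaying from 00000000000000000000.json"

    # 1. If _last_checkpoint exists, read the base state from <version>.checkpoint.parquet:
    #    the add column contributes data files, the remove column takes them away.
    last_checkpoint = os.path.join(log_dir, "_last_checkpoint")
    if os.path.exists(last_checkpoint):
        with open(last_checkpoint) as f:
            version = json.load(f)["version"]
        checkpoint = pq.read_table(
            os.path.join(log_dir, with_padding(version) + ".checkpoint.parquet"),
            columns=["add", "remove"],
        )
        for row in checkpoint.to_pylist():
            if row["add"] and row["add"].get("path"):
                result.add(row["add"]["path"])
            if row["remove"] and row["remove"].get("path"):
                result.discard(row["remove"]["path"])

    # 2. Replay commit files newer than the checkpoint (or all of them if there is none).
    #    Each n.json is a sequence of json actions, one per line, not a single valid json document.
    while True:
        commit = os.path.join(log_dir, with_padding(version + 1) + ".json")
        if not os.path.exists(commit):
            break
        version += 1
        with open(commit) as f:
            for line in f:
                if not line.strip():
                    continue
                action = json.loads(line)
                if "add" in action:
                    result.add(action["add"]["path"])
                elif "remove" in action:
                    result.discard(action["remove"]["path"])

    return result

The C++ code differs in the details (it throws on duplicate adds and missing removes, and forces nullable types when reading the checkpoint), but the resolution order is the same.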

View File

@@ -33,6 +33,11 @@ S3DataLakeMetadataReadHelper::createReadBuffer(const String & key, ContextPtr co
context->getReadSettings());
}
bool S3DataLakeMetadataReadHelper::exists(const String & key, const StorageS3::Configuration & configuration)
{
return S3::objectExists(*configuration.client, configuration.url.bucket, key);
}
std::vector<String> S3DataLakeMetadataReadHelper::listFiles(
const StorageS3::Configuration & base_configuration, const String & prefix, const String & suffix)
{

View File

@@ -16,6 +16,8 @@ struct S3DataLakeMetadataReadHelper
static std::shared_ptr<ReadBuffer> createReadBuffer(
const String & key, ContextPtr context, const StorageS3::Configuration & base_configuration);
static bool exists(const String & key, const StorageS3::Configuration & configuration);
static std::vector<String> listFiles(const StorageS3::Configuration & configuration, const std::string & prefix = "", const std::string & suffix = "");
};
}
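
DeltaLakeMetadataParser is templated on Configuration and MetadataReadHelper, so S3DataLakeMetadataReadHelper only has to provide the three static operations used above: createReadBuffer, exists and listFiles. As a hedged illustration of that contract, a hypothetical local-filesystem stand-in (Python, not part of the code base) would look like this:

import glob
import os


class LocalMetadataReadHelper:
    # Duck-typed stand-in for S3DataLakeMetadataReadHelper: the same three operations,
    # but against a local directory instead of an S3 bucket.

    @staticmethod
    def create_read_buffer(key: str):
        return open(key, "rb")

    @staticmethod
    def exists(key: str) -> bool:
        return os.path.exists(key)

    @staticmethod
    def list_files(base_path: str, prefix: str = "", suffix: str = "") -> list:
        return sorted(glob.glob(os.path.join(base_path, prefix, "*" + suffix)))

Swapping in a different storage backend therefore only means supplying another helper with the same static interface; the parser template itself does not change.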

View File

@@ -176,37 +176,76 @@ def test_partition_by(started_cluster):
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 10
# def test_checkpoint(started_cluster):
# instance = started_cluster.instances["node1"]
# minio_client = started_cluster.minio_client
# bucket = started_cluster.minio_bucket
# spark = get_spark()
# TABLE_NAME = "test_checkpoint"
#
# write_delta_from_df(
# spark,
# generate_data(spark, 0, 1),
# f"/{TABLE_NAME}",
# mode="overwrite",
# )
# for i in range(1, 25):
# write_delta_from_df(
# spark,
# generate_data(spark, i, i + 1),
# f"/{TABLE_NAME}",
# mode="append",
# )
#
# files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
# time.sleep(500)
# ok = false
# for file in files:
# if file.endswith("last_checkpoint"):
# ok = true
# assert ok
#
# create_delta_table(instance, TABLE_NAME)
# assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 16
def test_checkpoint(started_cluster):
instance = started_cluster.instances["node1"]
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
spark = get_spark()
TABLE_NAME = "test_checkpoint"
write_delta_from_df(
spark,
generate_data(spark, 0, 1),
f"/{TABLE_NAME}",
mode="overwrite",
)
for i in range(1, 25):
write_delta_from_df(
spark,
generate_data(spark, i, i + 1),
f"/{TABLE_NAME}",
mode="append",
)
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
# 25 data files
# 25 metadata files
# 1 last_checkpoint file
# 2 checkpoints
assert len(files) == 25 * 2 + 3
ok = False
for file in files:
if file.endswith("last_checkpoint"):
ok = True
assert ok
create_delta_table(instance, TABLE_NAME)
assert (
int(
instance.query(
f"SELECT count() FROM {TABLE_NAME} SETTINGS input_format_parquet_allow_missing_columns=1"
)
)
== 25
)
table = DeltaTable.forPath(spark, f"/{TABLE_NAME}")
table.delete("a < 10")
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 15
for i in range(0, 5):
write_delta_from_df(
spark,
generate_data(spark, i, i + 1),
f"/{TABLE_NAME}",
mode="append",
)
# + 1 metadata files (for delete)
# + 5 data files
# + 5 metadata files
# + 1 checkpoint file
# + 1 ?
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
assert len(files) == 53 + 1 + 5 * 2 + 1 + 1
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 20
assert instance.query(
f"SELECT * FROM {TABLE_NAME} ORDER BY 1"
).strip() == instance.query(
"SELECT number, toString(number + 1) FROM numbers(5) "
"UNION ALL SELECT number, toString(number + 1) FROM numbers(10, 15) ORDER BY 1"
).strip()
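
The new test only asserts that some uploaded file ends with last_checkpoint. A possible follow-up check, not part of the commit, is to read that file from MinIO and look at the version it points to; this sketch assumes the minio SDK client used elsewhere in these tests and that the uploaded table lands under TABLE_NAME/ in the bucket:

import json


def read_last_checkpoint_version(minio_client, bucket, table_name):
    # _last_checkpoint is a single json object: {"version": N, "size": ..., ...}
    obj = minio_client.get_object(bucket, f"{table_name}/_delta_log/_last_checkpoint")
    try:
        return json.loads(obj.read())["version"]
    finally:
        obj.close()
        obj.release_conn()


# With the default delta.checkpointInterval of 10, the first 25 commits written above should
# produce checkpoints at versions 10 and 20 (the "2 checkpoints" counted earlier), so a
# conservative assertion after the first upload_directory(...) call would be:
# assert read_last_checkpoint_version(minio_client, bucket, TABLE_NAME) >= 10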
def test_multiple_log_files(started_cluster):