Properly read schema and partition columns from checkpoint file

This commit is contained in:
kssenii 2024-07-09 16:24:35 +02:00
parent 9b1003527d
commit 8a202d91ad
4 changed files with 255 additions and 81 deletions

View File

@ -8,6 +8,7 @@
#include <Common/logger_useful.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnArray.h>
#include <Formats/FormatFactory.h>
#include <IO/ReadBufferFromFileBase.h>
@ -30,6 +31,7 @@
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/NestedUtils.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <parquet/file_reader.h>
@ -111,7 +113,7 @@ struct DeltaLakeMetadataImpl
std::set<String> result_files;
NamesAndTypesList current_schema;
DataLakePartitionColumns current_partition_columns;
const auto checkpoint_version = getCheckpointIfExists(result_files);
const auto checkpoint_version = getCheckpointIfExists(result_files, current_schema, current_partition_columns);
if (checkpoint_version)
{
@ -205,9 +207,9 @@ struct DeltaLakeMetadataImpl
Poco::Dynamic::Var json = parser.parse(json_str);
Poco::JSON::Object::Ptr object = json.extract<Poco::JSON::Object::Ptr>();
// std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
// object->stringify(oss);
// LOG_TEST(log, "Metadata: {}", oss.str());
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
object->stringify(oss);
LOG_TEST(log, "Metadata: {}", oss.str());
if (object->has("metaData"))
{
@ -216,30 +218,9 @@ struct DeltaLakeMetadataImpl
Poco::JSON::Parser p;
Poco::Dynamic::Var fields_json = parser.parse(schema_object);
Poco::JSON::Object::Ptr fields_object = fields_json.extract<Poco::JSON::Object::Ptr>();
const auto fields = fields_object->get("fields").extract<Poco::JSON::Array::Ptr>();
NamesAndTypesList current_schema;
for (size_t i = 0; i < fields->size(); ++i)
{
const auto field = fields->getObject(static_cast<UInt32>(i));
auto column_name = field->getValue<String>("name");
auto type = field->getValue<String>("type");
auto is_nullable = field->getValue<bool>("nullable");
std::string physical_name;
auto schema_metadata_object = field->get("metadata").extract<Poco::JSON::Object::Ptr>();
if (schema_metadata_object->has("delta.columnMapping.physicalName"))
physical_name = schema_metadata_object->getValue<String>("delta.columnMapping.physicalName");
else
physical_name = column_name;
LOG_TEST(log, "Found column: {}, type: {}, nullable: {}, physical name: {}",
column_name, type, is_nullable, physical_name);
current_schema.push_back({physical_name, getFieldType(field, "type", is_nullable)});
}
const Poco::JSON::Object::Ptr & fields_object = fields_json.extract<Poco::JSON::Object::Ptr>();
auto current_schema = parseMetadata(fields_object);
if (file_schema.empty())
{
file_schema = current_schema;
@ -274,7 +255,12 @@ struct DeltaLakeMetadataImpl
const auto value = partition_values->getValue<String>(partition_name);
auto name_and_type = file_schema.tryGetByName(partition_name);
if (!name_and_type)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No such column in schema: {}", partition_name);
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"No such column in schema: {} (schema: {})",
partition_name, file_schema.toNamesAndTypesDescription());
}
auto field = getFieldValue(value, name_and_type->type);
current_partition_columns.emplace_back(*name_and_type, field);
@ -293,6 +279,32 @@ struct DeltaLakeMetadataImpl
}
}
NamesAndTypesList parseMetadata(const Poco::JSON::Object::Ptr & metadata_json)
{
NamesAndTypesList schema;
const auto fields = metadata_json->get("fields").extract<Poco::JSON::Array::Ptr>();
for (size_t i = 0; i < fields->size(); ++i)
{
const auto field = fields->getObject(static_cast<UInt32>(i));
auto column_name = field->getValue<String>("name");
auto type = field->getValue<String>("type");
auto is_nullable = field->getValue<bool>("nullable");
std::string physical_name;
auto schema_metadata_object = field->get("metadata").extract<Poco::JSON::Object::Ptr>();
if (schema_metadata_object->has("delta.columnMapping.physicalName"))
physical_name = schema_metadata_object->getValue<String>("delta.columnMapping.physicalName");
else
physical_name = column_name;
LOG_TEST(log, "Found column: {}, type: {}, nullable: {}, physical name: {}",
column_name, type, is_nullable, physical_name);
schema.push_back({physical_name, getFieldType(field, "type", is_nullable)});
}
return schema;
}
DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & type_key, bool is_nullable)
{
if (field->isObject(type_key))
@ -506,7 +518,10 @@ struct DeltaLakeMetadataImpl
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Arrow error: {}", _s.ToString()); \
} while (false)
size_t getCheckpointIfExists(std::set<String> & result)
size_t getCheckpointIfExists(
std::set<String> & result,
NamesAndTypesList & file_schema,
DataLakePartitionColumns & file_partition_columns)
{
const auto version = readLastCheckpointIfExists();
if (!version)
@ -527,7 +542,8 @@ struct DeltaLakeMetadataImpl
auto columns = ParquetSchemaReader(*buf, format_settings).readSchema();
/// Read only columns that we need.
columns.filterColumns(NameSet{"add", "remove"});
auto filter_column_names = NameSet{"add", "metaData"};
columns.filterColumns(filter_column_names);
Block header;
for (const auto & column : columns)
header.insert({column.type->createColumn(), column.type, column.name});
@ -541,9 +557,6 @@ struct DeltaLakeMetadataImpl
ArrowMemoryPool::instance(),
&reader));
std::shared_ptr<arrow::Schema> file_schema;
THROW_ARROW_NOT_OK(reader->GetSchema(&file_schema));
ArrowColumnToCHColumn column_reader(
header, "Parquet",
format_settings.parquet.allow_missing_columns,
@ -554,29 +567,85 @@ struct DeltaLakeMetadataImpl
std::shared_ptr<arrow::Table> table;
THROW_ARROW_NOT_OK(reader->ReadTable(&table));
Chunk res = column_reader.arrowTableToCHChunk(table, reader->parquet_reader()->metadata()->num_rows());
const auto & res_columns = res.getColumns();
Chunk chunk = column_reader.arrowTableToCHChunk(table, reader->parquet_reader()->metadata()->num_rows());
auto res_block = header.cloneWithColumns(chunk.detachColumns());
res_block = Nested::flatten(res_block);
if (res_columns.size() != 2)
{
throw Exception(
ErrorCodes::INCORRECT_DATA,
"Unexpected number of columns: {} (having: {}, expected: {})",
res_columns.size(), res.dumpStructure(), header.dumpStructure());
}
const auto * nullable_path_column = assert_cast<const ColumnNullable *>(res_block.getByName("add.path").column.get());
const auto & path_column = assert_cast<const ColumnString &>(nullable_path_column->getNestedColumn());
const auto * nullable_schema_column = assert_cast<const ColumnNullable *>(res_block.getByName("metaData.schemaString").column.get());
const auto & schema_column = assert_cast<const ColumnString &>(nullable_schema_column->getNestedColumn());
auto partition_values_column_raw = res_block.getByName("add.partitionValues").column;
const auto & partition_values_column = assert_cast<const ColumnMap &>(*partition_values_column_raw);
const auto * tuple_column = assert_cast<const ColumnTuple *>(res_columns[0].get());
const auto & nullable_column = assert_cast<const ColumnNullable &>(tuple_column->getColumn(0));
const auto & path_column = assert_cast<const ColumnString &>(nullable_column.getNestedColumn());
for (size_t i = 0; i < path_column.size(); ++i)
{
const auto filename = String(path_column.getDataAt(i));
if (filename.empty())
const auto metadata = String(schema_column.getDataAt(i));
if (!metadata.empty())
{
Poco::JSON::Parser parser;
Poco::Dynamic::Var json = parser.parse(metadata);
const Poco::JSON::Object::Ptr & object = json.extract<Poco::JSON::Object::Ptr>();
auto current_schema = parseMetadata(object);
if (file_schema.empty())
{
file_schema = current_schema;
LOG_TEST(log, "Processed schema from checkpoint: {}", file_schema.toString());
}
else if (file_schema != current_schema)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Reading from files with different schema is not possible "
"({} is different from {})",
file_schema.toString(), current_schema.toString());
}
}
}
for (size_t i = 0; i < path_column.size(); ++i)
{
const auto path = String(path_column.getDataAt(i));
if (path.empty())
continue;
LOG_TEST(log, "Adding {}", filename);
const auto [_, inserted] = result.insert(std::filesystem::path(configuration->getPath()) / filename);
auto filename = fs::path(path).filename().string();
auto it = file_partition_columns.find(filename);
if (it == file_partition_columns.end())
{
Field map;
partition_values_column.get(i, map);
auto partition_values_map = map.safeGet<Map>();
if (!partition_values_map.empty())
{
auto & current_partition_columns = file_partition_columns[filename];
for (const auto & map_value : partition_values_map)
{
const auto tuple = map_value.safeGet<Tuple>();
const auto partition_name = tuple[0].safeGet<String>();
auto name_and_type = file_schema.tryGetByName(partition_name);
if (!name_and_type)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"No such column in schema: {} (schema: {})",
partition_name, file_schema.toString());
}
const auto value = tuple[1].safeGet<String>();
auto field = getFieldValue(value, name_and_type->type);
current_partition_columns.emplace_back(*name_and_type, field);
LOG_TEST(log, "Partition {} value is {} (for {})", partition_name, value, filename);
}
}
}
LOG_TEST(log, "Adding {}", path);
const auto [_, inserted] = result.insert(std::filesystem::path(configuration->getPath()) / path);
if (!inserted)
throw Exception(ErrorCodes::INCORRECT_DATA, "File already exists {}", filename);
throw Exception(ErrorCodes::INCORRECT_DATA, "File already exists {}", path);
}
return version;

View File

@ -17,6 +17,10 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Storage for read-only integration with Apache Iceberg tables in Amazon S3 (see https://iceberg.apache.org/)
/// Right now it's implemented on top of StorageS3 and right now it doesn't support
@ -41,6 +45,7 @@ public:
auto object_storage = base_configuration->createObjectStorage(context, /* is_readonly */true);
DataLakeMetadataPtr metadata;
NamesAndTypesList schema_from_metadata;
const bool use_schema_from_metadata = columns_.empty();
if (base_configuration->format == "auto")
base_configuration->format = "Parquet";
@ -50,8 +55,9 @@ public:
try
{
metadata = DataLakeMetadata::create(object_storage, base_configuration, context);
schema_from_metadata = metadata->getTableSchema();
configuration->setPaths(metadata->getDataFiles());
if (use_schema_from_metadata)
schema_from_metadata = metadata->getTableSchema();
}
catch (...)
{
@ -66,7 +72,7 @@ public:
return std::make_shared<IStorageDataLake<DataLakeMetadata>>(
base_configuration, std::move(metadata), configuration, object_storage,
context, table_id_,
columns_.empty() ? ColumnsDescription(schema_from_metadata) : columns_,
use_schema_from_metadata ? ColumnsDescription(schema_from_metadata) : columns_,
constraints_, comment_, format_settings_);
}

View File

@ -206,23 +206,25 @@ Chunk StorageObjectStorageSource::generate()
if (!partition_columns.empty() && chunk_size && chunk.hasColumns())
{
auto partition_values = partition_columns.find(filename);
for (const auto & [name_and_type, value] : partition_values->second)
if (partition_values != partition_columns.end())
{
if (!read_from_format_info.source_header.has(name_and_type.name))
continue;
for (const auto & [name_and_type, value] : partition_values->second)
{
if (!read_from_format_info.source_header.has(name_and_type.name))
continue;
const auto column_pos = read_from_format_info.source_header.getPositionByName(name_and_type.name);
auto partition_column = name_and_type.type->createColumnConst(chunk.getNumRows(), value)->convertToFullColumnIfConst();
const auto column_pos = read_from_format_info.source_header.getPositionByName(name_and_type.name);
auto partition_column = name_and_type.type->createColumnConst(chunk.getNumRows(), value)->convertToFullColumnIfConst();
/// This column is filled with default value now, remove it.
chunk.erase(column_pos);
/// This column is filled with default value now, remove it.
chunk.erase(column_pos);
/// Add correct values.
if (chunk.hasColumns())
chunk.addColumn(column_pos, std::move(partition_column));
else
chunk.addColumn(std::move(partition_column));
/// Add correct values.
if (column_pos < chunk.getNumColumns())
chunk.addColumn(column_pos, std::move(partition_column));
else
chunk.addColumn(std::move(partition_column));
}
}
}
return chunk;

View File

@ -596,19 +596,116 @@ def test_partition_columns(started_cluster):
)
assert result == 1
# instance.query(
# f"""
# DROP TABLE IF EXISTS {TABLE_NAME};
# CREATE TABLE {TABLE_NAME} (a Int32, b String, c DateTime)
# ENGINE=DeltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123')"""
# )
# assert (
# int(
# instance.query(
# f"SELECT count() FROM {TABLE_NAME} WHERE c != toDateTime('2000/01/05')"
# )
# )
# == num_rows - 1
# )
# instance.query(f"SELECT a, b, c, FROM {TABLE_NAME}")
# assert False
instance.query(
f"""
DROP TABLE IF EXISTS {TABLE_NAME};
CREATE TABLE {TABLE_NAME} (a Nullable(Int32), b Nullable(String), c Nullable(Date32), d Nullable(Int32), e Nullable(Bool))
ENGINE=DeltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123')"""
)
assert (
"""1 test1 2000-01-01 1 false
2 test2 2000-01-02 2 false
3 test3 2000-01-03 3 false
4 test4 2000-01-04 4 false
5 test5 2000-01-05 5 false
6 test6 2000-01-06 6 false
7 test7 2000-01-07 7 false
8 test8 2000-01-08 8 false
9 test9 2000-01-09 9 false"""
== instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY b").strip()
)
assert (
int(
instance.query(
f"SELECT count() FROM {TABLE_NAME} WHERE c == toDateTime('2000/01/05')"
)
)
== 1
)
# Subset of columns should work.
instance.query(
f"""
DROP TABLE IF EXISTS {TABLE_NAME};
CREATE TABLE {TABLE_NAME} (b Nullable(String), c Nullable(Date32), d Nullable(Int32))
ENGINE=DeltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123')"""
)
assert (
"""test1 2000-01-01 1
test2 2000-01-02 2
test3 2000-01-03 3
test4 2000-01-04 4
test5 2000-01-05 5
test6 2000-01-06 6
test7 2000-01-07 7
test8 2000-01-08 8
test9 2000-01-09 9"""
== instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY b").strip()
)
for i in range(num_rows + 1, 2 * num_rows + 1):
data = [
(
i,
"test" + str(i),
datetime.strptime(f"2000-01-{i}", "%Y-%m-%d"),
i,
False,
)
]
df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df.write.mode("append").format("delta").partitionBy(partition_columns).save(
f"/{TABLE_NAME}"
)
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
ok = False
for file in files:
if file.endswith("last_checkpoint"):
ok = True
assert ok
result = int(
instance.query(
f"""SELECT count()
FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123')
"""
)
)
assert result == num_rows * 2
assert (
"""1 test1 2000-01-01 1 false
2 test2 2000-01-02 2 false
3 test3 2000-01-03 3 false
4 test4 2000-01-04 4 false
5 test5 2000-01-05 5 false
6 test6 2000-01-06 6 false
7 test7 2000-01-07 7 false
8 test8 2000-01-08 8 false
9 test9 2000-01-09 9 false
10 test10 2000-01-10 10 false
11 test11 2000-01-11 11 false
12 test12 2000-01-12 12 false
13 test13 2000-01-13 13 false
14 test14 2000-01-14 14 false
15 test15 2000-01-15 15 false
16 test16 2000-01-16 16 false
17 test17 2000-01-17 17 false
18 test18 2000-01-18 18 false"""
== instance.query(
f"""
SELECT * FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123') ORDER BY c
"""
).strip()
)
assert (
int(
instance.query(
f"SELECT count() FROM {TABLE_NAME} WHERE c == toDateTime('2000/01/15')"
)
)
== 1
)