Add tests

This commit is contained in:
avogar 2023-10-17 18:10:47 +00:00
parent 032d82e004
commit 323486f9e8
6 changed files with 261 additions and 85 deletions

View File

@ -160,9 +160,12 @@ static void insertNumber(IColumn & column, WhichDataType type, T value)
} }
template <typename DecimalType> template <typename DecimalType>
static AvroDeserializer::DeserializeFn createDecimalDeserializeFn(const avro::NodePtr & root_node, const DataTypePtr & target_type) static AvroDeserializer::DeserializeFn createDecimalDeserializeFn(const avro::NodePtr & root_node, const DataTypePtr & target_type, bool is_fixed)
{ {
auto logical_type = root_node->logicalType(); auto logical_type = root_node->logicalType();
size_t fixed_size = 0;
if (is_fixed)
fixed_size = root_node->fixedSize();
const auto & decimal_type = assert_cast<const DecimalType &>(*target_type); const auto & decimal_type = assert_cast<const DecimalType &>(*target_type);
if (decimal_type.getScale() != static_cast<UInt32>(logical_type.scale()) || decimal_type.getPrecision() != static_cast<UInt32>(logical_type.precision())) if (decimal_type.getScale() != static_cast<UInt32>(logical_type.scale()) || decimal_type.getPrecision() != static_cast<UInt32>(logical_type.precision()))
throw Exception( throw Exception(
@ -174,14 +177,18 @@ static AvroDeserializer::DeserializeFn createDecimalDeserializeFn(const avro::No
decimal_type.getScale(), decimal_type.getScale(),
decimal_type.getPrecision()); decimal_type.getPrecision());
return [tmp = std::string(), target_type](IColumn & column, avro::Decoder & decoder) mutable return [tmp = std::vector<uint8_t>(), target_type, fixed_size](IColumn & column, avro::Decoder & decoder) mutable
{ {
static constexpr size_t field_type_size = sizeof(typename DecimalType::FieldType); static constexpr size_t field_type_size = sizeof(typename DecimalType::FieldType);
decoder.decodeString(tmp); if (fixed_size)
if (tmp.size() > field_type_size) tmp = decoder.decodeFixed(fixed_size);
else
tmp = decoder.decodeBytes();
if (tmp.size() > field_type_size || tmp.empty())
throw ParsingException( throw ParsingException(
ErrorCodes::CANNOT_PARSE_UUID, ErrorCodes::CANNOT_PARSE_UUID,
"Cannot parse type {}, expected binary data with size equal to or less than {}, got {}", "Cannot parse type {}, expected non-empty binary data with size equal to or less than {}, got {}",
target_type->getName(), target_type->getName(),
field_type_size, field_type_size,
tmp.size()); tmp.size());
@ -189,10 +196,12 @@ static AvroDeserializer::DeserializeFn createDecimalDeserializeFn(const avro::No
{ {
/// Extent value to required size by adding padding. /// Extent value to required size by adding padding.
/// Check if value is negative or positive. /// Check if value is negative or positive.
std::vector<uint8_t> padding;
if (tmp[0] & 128) if (tmp[0] & 128)
tmp = std::string(field_type_size - tmp.size(), 0xff) + tmp; padding = std::vector<uint8_t>(field_type_size - tmp.size(), 0xff);
else else
tmp = std::string(field_type_size - tmp.size(), 0) + tmp; padding = std::vector<uint8_t>(field_type_size - tmp.size(), 0);
tmp.insert(tmp.begin(), padding.begin(), padding.end());
} }
typename DecimalType::FieldType field; typename DecimalType::FieldType field;
@ -282,15 +291,15 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(const avro
}; };
} }
if (target.isDecimal32()) if (target.isDecimal32())
return createDecimalDeserializeFn<DataTypeDecimal32>(root_node, target_type); return createDecimalDeserializeFn<DataTypeDecimal32>(root_node, target_type, false);
if (target.isDecimal64()) if (target.isDecimal64())
return createDecimalDeserializeFn<DataTypeDecimal64>(root_node, target_type); return createDecimalDeserializeFn<DataTypeDecimal64>(root_node, target_type, false);
if (target.isDecimal128()) if (target.isDecimal128())
return createDecimalDeserializeFn<DataTypeDecimal128>(root_node, target_type); return createDecimalDeserializeFn<DataTypeDecimal128>(root_node, target_type, false);
if (target.isDecimal256()) if (target.isDecimal256())
return createDecimalDeserializeFn<DataTypeDecimal256>(root_node, target_type); return createDecimalDeserializeFn<DataTypeDecimal256>(root_node, target_type, false);
if (target.isDateTime64()) if (target.isDateTime64())
return createDecimalDeserializeFn<DataTypeDateTime64>(root_node, target_type); return createDecimalDeserializeFn<DataTypeDateTime64>(root_node, target_type, false);
break; break;
case avro::AVRO_INT: case avro::AVRO_INT:
if (target_type->isValueRepresentedByNumber()) if (target_type->isValueRepresentedByNumber())
@ -515,6 +524,29 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(const avro
return true; return true;
}; };
} }
if (target.isUUID())
{
return [tmp = std::vector<uint8_t>(), fixed_size](IColumn & column, avro::Decoder & decoder) mutable
{
decoder.decodeFixed(fixed_size, tmp);
if (tmp.size() != 36)
throw ParsingException(ErrorCodes::CANNOT_PARSE_UUID, "Cannot parse UUID from type Fixed, because it's size ({}) is not equal to the size of UUID (36)", fixed_size);
const UUID uuid = parseUUID({reinterpret_cast<const UInt8 *>(tmp.data()), tmp.size()});
assert_cast<DataTypeUUID::ColumnType &>(column).insertValue(uuid);
return true;
};
}
if (target.isDecimal32())
return createDecimalDeserializeFn<DataTypeDecimal32>(root_node, target_type, true);
if (target.isDecimal64())
return createDecimalDeserializeFn<DataTypeDecimal64>(root_node, target_type, true);
if (target.isDecimal128())
return createDecimalDeserializeFn<DataTypeDecimal128>(root_node, target_type, true);
if (target.isDecimal256())
return createDecimalDeserializeFn<DataTypeDecimal256>(root_node, target_type, true);
if (target.isDateTime64())
return createDecimalDeserializeFn<DataTypeDateTime64>(root_node, target_type, true);
break; break;
} }
case avro::AVRO_SYMBOLIC: case avro::AVRO_SYMBOLIC:
@ -1210,7 +1242,16 @@ DataTypePtr AvroSchemaReader::avroNodeToDataType(avro::NodePtr node)
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "ClickHouse supports only 8 and 16-bit Enum."); throw Exception(ErrorCodes::ILLEGAL_COLUMN, "ClickHouse supports only 8 and 16-bit Enum.");
} }
case avro::Type::AVRO_FIXED: case avro::Type::AVRO_FIXED:
{
auto logical_type = node->logicalType();
if (logical_type.type() == avro::LogicalType::UUID)
return std::make_shared<DataTypeUUID>();
if (logical_type.type() == avro::LogicalType::DECIMAL)
return createDecimal<DataTypeDecimal>(logical_type.precision(), logical_type.scale());
return std::make_shared<DataTypeFixedString>(node->fixedSize()); return std::make_shared<DataTypeFixedString>(node->fixedSize());
}
case avro::Type::AVRO_ARRAY: case avro::Type::AVRO_ARRAY:
return std::make_shared<DataTypeArray>(avroNodeToDataType(node->leafAt(0))); return std::make_shared<DataTypeArray>(avroNodeToDataType(node->leafAt(0)));
case avro::Type::AVRO_NULL: case avro::Type::AVRO_NULL:

View File

@ -122,6 +122,8 @@ static DataTypePtr parseORCType(const orc::Type * orc_type, bool skip_columns_wi
return std::make_shared<DataTypeDate32>(); return std::make_shared<DataTypeDate32>();
case orc::TypeKind::TIMESTAMP: case orc::TypeKind::TIMESTAMP:
return std::make_shared<DataTypeDateTime64>(9); return std::make_shared<DataTypeDateTime64>(9);
case orc::TypeKind::TIMESTAMP_INSTANT:
return std::make_shared<DataTypeDateTime64>(9, "UTC");
case orc::TypeKind::VARCHAR: case orc::TypeKind::VARCHAR:
case orc::TypeKind::BINARY: case orc::TypeKind::BINARY:
case orc::TypeKind::STRING: case orc::TypeKind::STRING:
@ -795,7 +797,8 @@ static ColumnWithTypeAndName readColumnFromORCColumn(
return readColumnWithNumericData<Float64, orc::DoubleVectorBatch>(orc_column, orc_type, column_name); return readColumnWithNumericData<Float64, orc::DoubleVectorBatch>(orc_column, orc_type, column_name);
case orc::DATE: case orc::DATE:
return readColumnWithDateData(orc_column, orc_type, column_name, type_hint); return readColumnWithDateData(orc_column, orc_type, column_name, type_hint);
case orc::TIMESTAMP: case orc::TIMESTAMP: [[fallthrough]];
case orc::TIMESTAMP_INSTANT:
return readColumnWithTimestampData(orc_column, orc_type, column_name); return readColumnWithTimestampData(orc_column, orc_type, column_name);
case orc::DECIMAL: { case orc::DECIMAL: {
auto interal_type = parseORCType(orc_type, false, skipped); auto interal_type = parseORCType(orc_type, false, skipped);

View File

@ -182,7 +182,7 @@ DataTypePtr getSimpleTypeByName(const String & type_name)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown Iceberg type: {}", type_name); throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown Iceberg type: {}", type_name);
} }
DataTypePtr getFieldType(const Poco::Dynamic::Var & type, bool required); DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & type_key, bool required);
DataTypePtr getComplexTypeFromObject(const Poco::JSON::Object::Ptr & type) DataTypePtr getComplexTypeFromObject(const Poco::JSON::Object::Ptr & type)
{ {
@ -190,15 +190,15 @@ DataTypePtr getComplexTypeFromObject(const Poco::JSON::Object::Ptr & type)
if (type_name == "list") if (type_name == "list")
{ {
bool element_required = type->getValue<bool>("element-required"); bool element_required = type->getValue<bool>("element-required");
auto element_type = getFieldType(type->get("element"), element_required); auto element_type = getFieldType(type, "element", element_required);
return std::make_shared<DataTypeArray>(element_type); return std::make_shared<DataTypeArray>(element_type);
} }
if (type_name == "map") if (type_name == "map")
{ {
auto key_type = getFieldType(type->get("key"), true); auto key_type = getFieldType(type, "key", true);
auto value_required = type->getValue<bool>("value-required"); auto value_required = type->getValue<bool>("value-required");
auto value_type = getFieldType(type->get("value"), value_required); auto value_type = getFieldType(type, "value", value_required);
return std::make_shared<DataTypeMap>(key_type, value_type); return std::make_shared<DataTypeMap>(key_type, value_type);
} }
@ -214,7 +214,7 @@ DataTypePtr getComplexTypeFromObject(const Poco::JSON::Object::Ptr & type)
auto field = fields->getObject(static_cast<Int32>(i)); auto field = fields->getObject(static_cast<Int32>(i));
element_names.push_back(field->getValue<String>("name")); element_names.push_back(field->getValue<String>("name"));
auto required = field->getValue<bool>("required"); auto required = field->getValue<bool>("required");
element_types.push_back(getFieldType(field->get("type"), required)); element_types.push_back(getFieldType(field, "type", required));
} }
return std::make_shared<DataTypeTuple>(element_types, element_names); return std::make_shared<DataTypeTuple>(element_types, element_names);
@ -223,8 +223,12 @@ DataTypePtr getComplexTypeFromObject(const Poco::JSON::Object::Ptr & type)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown Iceberg type: {}", type_name); throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown Iceberg type: {}", type_name);
} }
DataTypePtr getFieldType(const Poco::Dynamic::Var & type, bool required) DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & type_key, bool required)
{ {
if (field->isObject(type_key))
return getComplexTypeFromObject(field->getObject(type_key));
auto type = field->get(type_key);
if (type.isString()) if (type.isString())
{ {
const String & type_name = type.extract<String>(); const String & type_name = type.extract<String>();
@ -232,10 +236,8 @@ DataTypePtr getFieldType(const Poco::Dynamic::Var & type, bool required)
return required ? data_type : makeNullable(data_type); return required ? data_type : makeNullable(data_type);
} }
if (type.isStruct())
return getComplexTypeFromObject(type.extract<Poco::JSON::Object::Ptr>());
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected 'type' field: {}", type.toString()); throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected 'type' field: {}", type.toString());
} }
std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version) std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version)
@ -252,10 +254,7 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
current_schema_id = metadata_object->getValue<int>("current-schema-id"); current_schema_id = metadata_object->getValue<int>("current-schema-id");
auto schemas = metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>(); auto schemas = metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>();
if (schemas->size() != 1) if (schemas->size() != 1)
throw Exception( throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not supported");
ErrorCodes::UNSUPPORTED_METHOD,
"Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "
"supported");
/// Now we sure that there is only one schema. /// Now we sure that there is only one schema.
schema = schemas->getObject(0); schema = schemas->getObject(0);
@ -269,10 +268,7 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
/// Field "schemas" is optional for version 1, but after version 2 was introduced, /// Field "schemas" is optional for version 1, but after version 2 was introduced,
/// in most cases this field is added for new tables in version 1 as well. /// in most cases this field is added for new tables in version 1 as well.
if (metadata_object->has("schemas") && metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>()->size() > 1) if (metadata_object->has("schemas") && metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>()->size() > 1)
throw Exception( throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not supported");
ErrorCodes::UNSUPPORTED_METHOD,
"Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "
"supported");
} }
NamesAndTypesList names_and_types; NamesAndTypesList names_and_types;
@ -282,13 +278,16 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
auto field = fields->getObject(static_cast<UInt32>(i)); auto field = fields->getObject(static_cast<UInt32>(i));
auto name = field->getValue<String>("name"); auto name = field->getValue<String>("name");
bool required = field->getValue<bool>("required"); bool required = field->getValue<bool>("required");
names_and_types.push_back({name, getFieldType(field->get("type"), required)}); names_and_types.push_back({name, getFieldType(field, "type", required)});
} }
return {std::move(names_and_types), current_schema_id}; return {std::move(names_and_types), current_schema_id};
} }
MutableColumns parseAvro(avro::DataFileReaderBase & file_reader, const Block & header, const FormatSettings & settings) MutableColumns parseAvro(
avro::DataFileReaderBase & file_reader,
const Block & header,
const FormatSettings & settings)
{ {
auto deserializer = std::make_unique<AvroDeserializer>(header, file_reader.dataSchema(), true, true, settings); auto deserializer = std::make_unique<AvroDeserializer>(header, file_reader.dataSchema(), true, true, settings);
MutableColumns columns = header.cloneEmptyColumns(); MutableColumns columns = header.cloneEmptyColumns();
@ -313,7 +312,9 @@ std::pair<Int32, String> getMetadataFileAndVersion(const StorageS3::Configuratio
if (metadata_files.empty()) if (metadata_files.empty())
{ {
throw Exception( throw Exception(
ErrorCodes::FILE_DOESNT_EXIST, "The metadata file for Iceberg table with path {} doesn't exist", configuration.url.key); ErrorCodes::FILE_DOESNT_EXIST,
"The metadata file for Iceberg table with path {} doesn't exist",
configuration.url.key);
} }
std::vector<std::pair<UInt32, String>> metadata_files_with_versions; std::vector<std::pair<UInt32, String>> metadata_files_with_versions;
@ -323,8 +324,7 @@ std::pair<Int32, String> getMetadataFileAndVersion(const StorageS3::Configuratio
String file_name(path.begin() + path.find_last_of('/') + 1, path.end()); String file_name(path.begin() + path.find_last_of('/') + 1, path.end());
String version_str(file_name.begin() + 1, file_name.begin() + file_name.find_first_of('.')); String version_str(file_name.begin() + 1, file_name.begin() + file_name.find_first_of('.'));
if (!std::all_of(version_str.begin(), version_str.end(), isdigit)) if (!std::all_of(version_str.begin(), version_str.end(), isdigit))
throw Exception( throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file_name);
ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file_name);
metadata_files_with_versions.emplace_back(std::stoi(version_str), path); metadata_files_with_versions.emplace_back(std::stoi(version_str), path);
} }
@ -364,8 +364,7 @@ std::unique_ptr<IcebergMetadata> parseIcebergMetadata(const StorageS3::Configura
} }
} }
return std::make_unique<IcebergMetadata>( return std::make_unique<IcebergMetadata>(configuration, context_, metadata_version, format_version, manifest_list_file, schema_id, schema);
configuration, context_, metadata_version, format_version, manifest_list_file, schema_id, schema);
} }
/** /**
@ -405,8 +404,7 @@ Strings IcebergMetadata::getDataFiles()
LOG_TEST(log, "Collect manifest files from manifest list {}", manifest_list_file); LOG_TEST(log, "Collect manifest files from manifest list {}", manifest_list_file);
auto manifest_list_buf = S3DataLakeMetadataReadHelper::createReadBuffer(manifest_list_file, getContext(), configuration); auto manifest_list_buf = S3DataLakeMetadataReadHelper::createReadBuffer(manifest_list_file, getContext(), configuration);
auto manifest_list_file_reader auto manifest_list_file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*manifest_list_buf));
= std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*manifest_list_buf));
auto data_type = AvroSchemaReader::avroNodeToDataType(manifest_list_file_reader->dataSchema().root()->leafAt(0)); auto data_type = AvroSchemaReader::avroNodeToDataType(manifest_list_file_reader->dataSchema().root()->leafAt(0));
Block header{{data_type->createColumn(), data_type, "manifest_path"}}; Block header{{data_type->createColumn(), data_type, "manifest_path"}};
@ -447,10 +445,7 @@ Strings IcebergMetadata::getDataFiles()
Poco::Dynamic::Var json = parser.parse(schema_json_string); Poco::Dynamic::Var json = parser.parse(schema_json_string);
Poco::JSON::Object::Ptr schema_object = json.extract<Poco::JSON::Object::Ptr>(); Poco::JSON::Object::Ptr schema_object = json.extract<Poco::JSON::Object::Ptr>();
if (schema_object->getValue<int>("schema-id") != current_schema_id) if (schema_object->getValue<int>("schema-id") != current_schema_id)
throw Exception( throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not supported");
ErrorCodes::UNSUPPORTED_METHOD,
"Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "
"supported");
avro::NodePtr root_node = manifest_file_reader->dataSchema().root(); avro::NodePtr root_node = manifest_file_reader->dataSchema().root();
size_t leaves_num = root_node->leaves(); size_t leaves_num = root_node->leaves();
@ -458,7 +453,9 @@ Strings IcebergMetadata::getDataFiles()
if (leaves_num < expected_min_num) if (leaves_num < expected_min_num)
{ {
throw Exception( throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Unexpected number of columns {}. Expected at least {}", root_node->leaves(), expected_min_num); ErrorCodes::BAD_ARGUMENTS,
"Unexpected number of columns {}. Expected at least {}",
root_node->leaves(), expected_min_num);
} }
avro::NodePtr status_node = root_node->leafAt(0); avro::NodePtr status_node = root_node->leafAt(0);
@ -481,8 +478,8 @@ Strings IcebergMetadata::getDataFiles()
auto status_col_data_type = AvroSchemaReader::avroNodeToDataType(status_node); auto status_col_data_type = AvroSchemaReader::avroNodeToDataType(status_node);
auto data_col_data_type = AvroSchemaReader::avroNodeToDataType(data_file_node); auto data_col_data_type = AvroSchemaReader::avroNodeToDataType(data_file_node);
Block manifest_file_header{ Block manifest_file_header
{status_col_data_type->createColumn(), status_col_data_type, "status"}, = {{status_col_data_type->createColumn(), status_col_data_type, "status"},
{data_col_data_type->createColumn(), data_col_data_type, "data_file"}}; {data_col_data_type->createColumn(), data_col_data_type, "data_file"}};
columns = parseAvro(*manifest_file_reader, manifest_file_header, getFormatSettings(getContext())); columns = parseAvro(*manifest_file_reader, manifest_file_header, getFormatSettings(getContext()));
@ -551,8 +548,7 @@ Strings IcebergMetadata::getDataFiles()
{ {
Int32 content_type = content_int_column->getElement(i); Int32 content_type = content_int_column->getElement(i);
if (DataFileContent(content_type) != DataFileContent::DATA) if (DataFileContent(content_type) != DataFileContent::DATA)
throw Exception( throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: positional and equality deletes are not supported");
ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: positional and equality deletes are not supported");
} }
const auto status = status_int_column->getInt(i); const auto status = status_int_column->getInt(i);

View File

@ -37,7 +37,7 @@ StorageIceberg::StorageIceberg(
{ {
} }
ColumnsDescription StorageIceberg::getTableStructureFromMetadata( ColumnsDescription StorageIceberg::getTableStructureFromData(
Configuration & base_configuration, Configuration & base_configuration,
const std::optional<FormatSettings> &, const std::optional<FormatSettings> &,
ContextPtr local_context) ContextPtr local_context)

View File

@ -48,7 +48,7 @@ public:
String getName() const override { return name; } String getName() const override { return name; }
static ColumnsDescription getTableStructureFromMetadata( static ColumnsDescription getTableStructureFromData(
Configuration & base_configuration, Configuration & base_configuration,
const std::optional<FormatSettings> &, const std::optional<FormatSettings> &,
ContextPtr local_context); ContextPtr local_context);

View File

@ -41,6 +41,7 @@ def get_spark():
.config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.spark_catalog.type", "hadoop") .config("spark.sql.catalog.spark_catalog.type", "hadoop")
.config("spark.sql.catalog.spark_catalog.warehouse", "/iceberg_data") .config("spark.sql.catalog.spark_catalog.warehouse", "/iceberg_data")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.master("local") .master("local")
) )
return builder.master("local").getOrCreate() return builder.master("local").getOrCreate()
@ -129,12 +130,12 @@ def generate_data(spark, start, end):
return df return df
def create_iceberg_table(node, table_name): def create_iceberg_table(node, table_name, format="Parquet"):
node.query( node.query(
f""" f"""
DROP TABLE IF EXISTS {table_name}; DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name} CREATE TABLE {table_name}
ENGINE=Iceberg(s3, filename = 'iceberg_data/default/{table_name}/')""" ENGINE=Iceberg(s3, filename = 'iceberg_data/default/{table_name}/', format={format})"""
) )
@ -165,7 +166,7 @@ def test_single_iceberg_file(started_cluster, format_version):
bucket = started_cluster.minio_bucket bucket = started_cluster.minio_bucket
TABLE_NAME = "test_single_iceberg_file_" + format_version TABLE_NAME = "test_single_iceberg_file_" + format_version
inserted_data = "SELECT number, toString(number) FROM numbers(100)" inserted_data = "SELECT number, toString(number) as string FROM numbers(100)"
parquet_data_path = create_initial_data_file( parquet_data_path = create_initial_data_file(
started_cluster, instance, inserted_data, TABLE_NAME started_cluster, instance, inserted_data, TABLE_NAME
) )
@ -308,7 +309,7 @@ def test_types(started_cluster, format_version):
[ [
["a", "Nullable(Int32)"], ["a", "Nullable(Int32)"],
["b", "Nullable(String)"], ["b", "Nullable(String)"],
["c", "Nullable(Date32)"], ["c", "Nullable(Date)"],
["d", "Array(Nullable(String))"], ["d", "Array(Nullable(String))"],
["e", "Nullable(Bool)"], ["e", "Nullable(Bool)"],
] ]
@ -367,3 +368,138 @@ def test_delete_files(started_cluster, format_version):
) )
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 50 assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 50
@pytest.mark.parametrize("format_version", ["1", "2"])
def test_evolved_schema(started_cluster, format_version):
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_evolved_schema_" + format_version
write_iceberg_from_df(
spark,
generate_data(spark, 0, 100),
TABLE_NAME,
mode="overwrite",
format_version=format_version,
)
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
)
create_iceberg_table(instance, TABLE_NAME)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
spark.sql(f"ALTER TABLE {TABLE_NAME} ADD COLUMNS (x bigint)")
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
)
error = instance.query_and_get_error(f"SELECT * FROM {TABLE_NAME}")
assert "UNSUPPORTED_METHOD" in error
def test_row_based_deletes(started_cluster):
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_row_based_deletes"
spark.sql(
f"CREATE TABLE {TABLE_NAME} (id bigint, data string) USING iceberg TBLPROPERTIES ('format-version' = '2', 'write.update.mode'='merge-on-read', 'write.delete.mode'='merge-on-read', 'write.merge.mode'='merge-on-read')"
)
spark.sql(f"INSERT INTO {TABLE_NAME} select id, char(id + ascii('a')) from range(100)")
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
)
create_iceberg_table(instance, TABLE_NAME)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
spark.sql(f"DELETE FROM {TABLE_NAME} WHERE id < 10")
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
)
error = instance.query_and_get_error(f"SELECT * FROM {TABLE_NAME}")
assert "UNSUPPORTED_METHOD" in error
@pytest.mark.parametrize("format_version", ["1", "2"])
def test_schema_inference(started_cluster, format_version):
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
for format in ["Parquet", "ORC", "Avro"]:
TABLE_NAME = "test_schema_inference_" + format + "_" + format_version
# Types time, timestamptz, fixed are not supported in Spark.
spark.sql(
f"CREATE TABLE {TABLE_NAME} (intC int, longC long, floatC float, doubleC double, decimalC1 decimal(10, 3), decimalC2 decimal(20, 10), decimalC3 decimal(38, 30), dateC date, timestampC timestamp, stringC string, binaryC binary, arrayC1 array<int>, mapC1 map<string, string>, structC1 struct<field1: int, field2: string>, complexC array<struct<field1: map<string, array<map<string, int>>>, field2: struct<field3: int, field4: string>>>) USING iceberg TBLPROPERTIES ('format-version' = '{format_version}', 'write.format.default' = '{format}')"
)
spark.sql(
f"insert into {TABLE_NAME} select 42, 4242, 42.42, 4242.4242, decimal(42.42), decimal(42.42), decimal(42.42), date('2020-01-01'), timestamp('2020-01-01 20:00:00'), 'hello', binary('hello'), array(1,2,3), map('key', 'value'), struct(42, 'hello'), array(struct(map('key', array(map('key', 42))), struct(42, 'hello')))"
)
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
)
create_iceberg_table(instance, TABLE_NAME, format)
res = instance.query(f"DESC {TABLE_NAME} FORMAT TSVRaw")
expected = TSV(
[
["intC", "Nullable(Int32)"],
["longC", "Nullable(Int64)"],
["floatC", "Nullable(Float32)"],
["doubleC", "Nullable(Float64)"],
["decimalC1", "Nullable(Decimal(10, 3))"],
["decimalC2", "Nullable(Decimal(20, 10))"],
["decimalC3", "Nullable(Decimal(38, 30))"],
["dateC", "Nullable(Date)"],
["timestampC", "Nullable(DateTime64(6, \'UTC\'))"],
["stringC", "Nullable(String)"],
["binaryC", "Nullable(String)"],
["arrayC1", "Array(Nullable(Int32))"],
["mapC1", "Map(String, Nullable(String))"],
["structC1", "Tuple(field1 Nullable(Int32), field2 Nullable(String))"],
["complexC", "Array(Tuple(field1 Map(String, Array(Map(String, Nullable(Int32)))), field2 Tuple(field3 Nullable(Int32), field4 Nullable(String))))"],
]
)
assert res == expected
# Check that we can parse data
instance.query(f"SELECT * FROM {TABLE_NAME}")
@pytest.mark.parametrize("format_version", ["1", "2"])
def test_metadata_file_selection(started_cluster, format_version):
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_metadata_selection_" + format_version
spark.sql(
f"CREATE TABLE {TABLE_NAME} (id bigint, data string) USING iceberg TBLPROPERTIES ('format-version' = '2', 'write.update.mode'='merge-on-read', 'write.delete.mode'='merge-on-read', 'write.merge.mode'='merge-on-read')"
)
for i in range(50):
spark.sql(f"INSERT INTO {TABLE_NAME} select id, char(id + ascii('a')) from range(10)")
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
)
create_iceberg_table(instance, TABLE_NAME)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 500