Parquet: write Date/DateTime as date/datetime-like types instead of plain numbers

This commit is contained in:
Michael Kolupaev 2024-10-23 04:07:18 +00:00
parent 9e2ae7e0c7
commit 5c57b323a5
10 changed files with 85 additions and 19 deletions

View File

@ -971,6 +971,9 @@ Check page size every this many rows. Consider decreasing if you have columns wi
)", 0) \
M(Bool, output_format_parquet_write_page_index, true, R"(
Add a possibility to write page index into parquet files.
)", 0) \
M(Bool, output_format_parquet_datetime_as_uint32, false, R"(
Write DateTime values as raw unix timestamp (read back as UInt32), instead of converting to milliseconds (read back as DateTime64(3)).
)", 0) \
M(String, output_format_avro_codec, "", R"(
Compression codec used for output. Possible values: 'null', 'deflate', 'snappy', 'zstd'.

View File

@ -107,7 +107,8 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"hnsw_candidate_list_size_for_search", 0, 0, "New setting"},
{"allow_reorder_prewhere_conditions", false, true, "New setting"},
{"input_format_parquet_bloom_filter_push_down", false, true, "When reading Parquet files, skip whole row groups based on the WHERE/PREWHERE expressions and bloom filter in the Parquet metadata."},
{"date_time_64_output_format_cut_trailing_zeros_align_to_groups_of_thousands", false, false, "Dynamically trim the trailing zeros of datetime64 values to adjust the output scale to (0, 3, 6), corresponding to 'seconds', 'milliseconds', and 'microseconds'."}
{"date_time_64_output_format_cut_trailing_zeros_align_to_groups_of_thousands", false, false, "Dynamically trim the trailing zeros of datetime64 values to adjust the output scale to (0, 3, 6), corresponding to 'seconds', 'milliseconds', and 'microseconds'."},
{"output_format_parquet_datetime_as_uint32", true, false, "Write DateTime as DateTime64(3) instead of UInt32 (these are the two Parquet types closest to DateTime)."},
}
},
{"24.9",

View File

@ -198,6 +198,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.parquet.skip_columns_with_unsupported_types_in_schema_inference = settings[Setting::input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference];
format_settings.parquet.output_string_as_string = settings[Setting::output_format_parquet_string_as_string];
format_settings.parquet.output_fixed_string_as_fixed_byte_array = settings[Setting::output_format_parquet_fixed_string_as_fixed_byte_array];
format_settings.parquet.output_datetime_as_uint32 = settings[Setting::output_format_parquet_datetime_as_uint32];
format_settings.parquet.max_block_size = settings[Setting::input_format_parquet_max_block_size];
format_settings.parquet.prefer_block_bytes = settings[Setting::input_format_parquet_prefer_block_bytes];
format_settings.parquet.output_compression_method = settings[Setting::output_format_parquet_compression_method];

View File

@ -281,6 +281,7 @@ struct FormatSettings
std::unordered_set<int> skip_row_groups = {};
bool output_string_as_string = false;
bool output_fixed_string_as_fixed_byte_array = true;
bool output_datetime_as_uint32 = false;
bool preserve_order = false;
bool use_custom_encoder = true;
bool parallel_encoding = true;

View File

@ -324,9 +324,9 @@ void preparePrimitiveColumn(ColumnPtr column, DataTypePtr type, const std::strin
case TypeIndex::Enum8: types(T::INT32, C::INT_8, int_type(8, true)); break; // Int8
case TypeIndex::Enum16: types(T::INT32, C::INT_16, int_type(16, true)); break; // Int16
case TypeIndex::IPv4: types(T::INT32, C::UINT_32, int_type(32, false)); break; // UInt32
case TypeIndex::Date: types(T::INT32, C::UINT_16, int_type(16, false)); break; // UInt16
case TypeIndex::DateTime: types(T::INT32, C::UINT_32, int_type(32, false)); break; // UInt32
/// Parquet doesn't have 16-bit date type, so we cast Date to 32 bits.
case TypeIndex::Date:
case TypeIndex::Date32:
{
parq::LogicalType t;
@ -335,6 +335,27 @@ void preparePrimitiveColumn(ColumnPtr column, DataTypePtr type, const std::strin
break;
}
/// Parquet only has timestamps in milli-/micro-/nanoseconds, not seconds.
/// So we either write it as plain UInt32 or multiply by 1000 and write as milliseconds
/// (equivalent to DateTime64(3)).
case TypeIndex::DateTime:
if (options.output_datetime_as_uint32)
types(T::INT32, C::UINT_32, int_type(32, false)); // UInt32
else
{
/// DateTime64(3).
parq::TimeUnit unit;
unit.__set_MILLIS({});
parq::TimestampType tt;
tt.__set_isAdjustedToUTC(true);
tt.__set_unit(unit);
parq::LogicalType t;
t.__set_TIMESTAMP(tt);
types(T::INT64, parq::ConvertedType::TIMESTAMP_MILLIS, t);
state.datetime_multiplier = 1000;
}
break;
case TypeIndex::DateTime64:
{
parq::ConvertedType::type converted;
@ -372,7 +393,7 @@ void preparePrimitiveColumn(ColumnPtr column, DataTypePtr type, const std::strin
parq::LogicalType t;
t.__set_TIMESTAMP(tt);
types(T::INT64, converted, t);
state.datetime64_multiplier = DataTypeDateTime64::getScaleMultiplier(converted_scale - scale);
state.datetime_multiplier = DataTypeDateTime64::getScaleMultiplier(converted_scale - scale);
break;
}

View File

@ -278,6 +278,26 @@ struct ConverterDateTime64WithMultiplier
}
};
/// Multiply DateTime by 1000 to get milliseconds (because Parquet doesn't support seconds).
struct ConverterDateTime
{
using Statistics = StatisticsNumeric<Int64, Int64>;
using Col = ColumnVector<UInt32>;
const Col & column;
PODArray<Int64> buf;
explicit ConverterDateTime(const ColumnPtr & c) : column(assert_cast<const Col &>(*c)) {}
const Int64 * getBatch(size_t offset, size_t count)
{
buf.resize(count);
for (size_t i = 0; i < count; ++i)
buf[i] = static_cast<Int64>(column.getData()[offset + i]) * 1000;
return buf.data();
}
};
struct ConverterString
{
using Statistics = StatisticsStringRef;
@ -360,8 +380,9 @@ struct ConverterNumberAsFixedString
size_t fixedStringSize() { return sizeof(T); }
};
/// Like ConverterNumberAsFixedString, but converts to big-endian. Because that's the byte order
/// Parquet uses for decimal types and literally nothing else, for some reason.
/// Like ConverterNumberAsFixedString, but converts to big-endian. (Parquet uses little-endian
/// for INT32 and INT64, but big-endian for decimals represented as FIXED_LEN_BYTE_ARRAY, presumably
/// to make them comparable lexicographically.)
template <typename T>
struct ConverterDecimal
{
@ -821,18 +842,22 @@ void writeColumnChunkBody(ColumnChunkWriteState & s, const WriteOptions & option
N(UInt8, Int32Type);
break;
case TypeIndex::UInt16 : N(UInt16, Int32Type); break;
case TypeIndex::UInt32 : N(UInt32, Int32Type); break;
case TypeIndex::UInt64 : N(UInt64, Int64Type); break;
case TypeIndex::Int8 : N(Int8, Int32Type); break;
case TypeIndex::Int16 : N(Int16, Int32Type); break;
case TypeIndex::Int32 : N(Int32, Int32Type); break;
case TypeIndex::Int64 : N(Int64, Int64Type); break;
case TypeIndex::Enum8: N(Int8, Int32Type); break;
case TypeIndex::Enum16: N(Int16, Int32Type); break;
case TypeIndex::Date: N(UInt16, Int32Type); break;
case TypeIndex::Date32: N(Int32, Int32Type); break;
case TypeIndex::DateTime: N(UInt32, Int32Type); break;
case TypeIndex::UInt32:
if (s.datetime_multiplier == 1)
N(UInt32, Int32Type);
else
{
/// It's actually a DateTime that needs to be converted to milliseconds.
chassert(s.datetime_multiplier == 1000);
writeColumnImpl<parquet::Int64Type>(s, options, out, ConverterDateTime(s.primitive_column));
}
break;
#undef N
@ -849,14 +874,14 @@ void writeColumnChunkBody(ColumnChunkWriteState & s, const WriteOptions & option
break;
case TypeIndex::DateTime64:
if (s.datetime64_multiplier == 1)
if (s.datetime_multiplier == 1)
writeColumnImpl<parquet::Int64Type>(
s, options, out, ConverterNumeric<ColumnDecimal<DateTime64>, Int64, Int64>(
s.primitive_column));
else
writeColumnImpl<parquet::Int64Type>(
s, options, out, ConverterDateTime64WithMultiplier(
s.primitive_column, s.datetime64_multiplier));
s.primitive_column, s.datetime_multiplier));
break;
case TypeIndex::IPv4:

View File

@ -18,6 +18,7 @@ struct WriteOptions
{
bool output_string_as_string = false;
bool output_fixed_string_as_fixed_byte_array = true;
bool output_datetime_as_uint32 = false;
CompressionMethod compression = CompressionMethod::Lz4;
@ -44,7 +45,7 @@ struct ColumnChunkWriteState
ColumnPtr primitive_column;
CompressionMethod compression; // must match what's inside column_chunk
Int64 datetime64_multiplier = 1; // for converting e.g. seconds to milliseconds
Int64 datetime_multiplier = 1; // for converting e.g. seconds to milliseconds
bool is_bool = false; // bool vs UInt8 have the same column type but are encoded differently
/// Repetition and definition levels. Produced by prepareColumnForWrite().

View File

@ -97,6 +97,7 @@ ParquetBlockOutputFormat::ParquetBlockOutputFormat(WriteBuffer & out_, const Blo
}
options.output_string_as_string = format_settings.parquet.output_string_as_string;
options.output_fixed_string_as_fixed_byte_array = format_settings.parquet.output_fixed_string_as_fixed_byte_array;
options.output_datetime_as_uint32 = format_settings.parquet.output_datetime_as_uint32;
options.data_page_size = format_settings.parquet.data_page_size;
options.write_batch_size = format_settings.parquet.write_batch_size;

View File

@ -6,7 +6,7 @@ i8 Nullable(Int8)
i16 Nullable(Int16)
i32 Nullable(Int32)
i64 Nullable(Int64)
date Nullable(UInt16)
date Nullable(Date32)
date32 Nullable(Date32)
datetime Nullable(UInt32)
datetime64 Nullable(DateTime64(3, \'UTC\'))
@ -27,6 +27,9 @@ decimal256 Nullable(Decimal(76, 40))
ipv4 Nullable(UInt32)
ipv6 Nullable(FixedString(16))
0
datetime Nullable(DateTime64(3, \'UTC\'))
0
0
0
0
0

View File

@ -42,11 +42,20 @@ create temporary table basic_types_02735 as select * from generateRandom('
decimal256 Decimal256(40),
ipv4 IPv4,
ipv6 IPv6') limit 1011;
insert into function file(basic_types_02735.parquet) select * from basic_types_02735;
insert into function file(basic_types_02735.parquet) select * from basic_types_02735 settings output_format_parquet_datetime_as_uint32 = 1;
desc file(basic_types_02735.parquet);
select (select sum(cityHash64(*)) from basic_types_02735) - (select sum(cityHash64(*)) from file(basic_types_02735.parquet));
drop table basic_types_02735;
-- DateTime values don't roundtrip (without output_format_parquet_datetime_as_uint32) because we
-- write them as DateTime64(3) (the closest type supported by Parquet).
drop table if exists datetime_02735;
create temporary table datetime_02735 as select * from generateRandom('datetime DateTime') limit 1011;
insert into function file(datetime_02735.parquet) select * from datetime_02735;
desc file(datetime_02735.parquet);
select (select sum(cityHash64(toDateTime64(datetime, 3))) from datetime_02735) - (select sum(cityHash64(*)) from file(datetime_02735.parquet));
select (select sum(cityHash64(*)) from datetime_02735) - (select sum(cityHash64(*)) from file(datetime_02735.parquet, Parquet, 'datetime DateTime'));
drop table datetime_02735;
drop table if exists nullables_02735;
create temporary table nullables_02735 as select * from generateRandom('
@ -65,7 +74,7 @@ select (select sum(cityHash64(*)) from nullables_02735) - (select sum(cityHash64
drop table nullables_02735;
-- TODO: When cityHash64() fully supports Nullable: https://github.com/ClickHouse/ClickHouse/pull/48625
-- TODO: When cityHash64() fully supports Nullable: https://github.com/ClickHouse/ClickHouse/pull/58754
-- the next two blocks can be simplified: arrays_out_02735 intermediate table is not needed,
-- a.csv and b.csv are not needed.