Add ParquetMetadata input format to read Parquet file metadata

avogar 2023-04-18 11:07:08 +00:00
parent 8bc0a3a899
commit b277a5c943
7 changed files with 952 additions and 75 deletions


@@ -10,80 +10,82 @@ results of a `SELECT`, and to perform `INSERT`s into a file-backed table.
The supported formats are:
| Format | Input | Output |
|------------------------------------------------------------------------------------------|------|--------|
| [TabSeparated](#tabseparated) | ✔ | ✔ |
| [TabSeparatedRaw](#tabseparatedraw) | ✔ | ✔ |
| [TabSeparatedWithNames](#tabseparatedwithnames) | ✔ | ✔ |
| [TabSeparatedWithNamesAndTypes](#tabseparatedwithnamesandtypes) | ✔ | ✔ |
| [TabSeparatedRawWithNames](#tabseparatedrawwithnames) | ✔ | ✔ |
| [TabSeparatedRawWithNamesAndTypes](#tabseparatedrawwithnamesandtypes) | ✔ | ✔ |
| [Template](#format-template) | ✔ | ✔ |
| [TemplateIgnoreSpaces](#templateignorespaces) | ✔ | ✗ |
| [CSV](#csv) | ✔ | ✔ |
| [CSVWithNames](#csvwithnames) | ✔ | ✔ |
| [CSVWithNamesAndTypes](#csvwithnamesandtypes) | ✔ | ✔ |
| [CustomSeparated](#format-customseparated) | ✔ | ✔ |
| [CustomSeparatedWithNames](#customseparatedwithnames) | ✔ | ✔ |
| [CustomSeparatedWithNamesAndTypes](#customseparatedwithnamesandtypes) | ✔ | ✔ |
| [SQLInsert](#sqlinsert) | ✗ | ✔ |
| [Values](#data-format-values) | ✔ | ✔ |
| [Vertical](#vertical) | ✗ | ✔ |
| [JSON](#json) | ✔ | ✔ |
| [JSONAsString](#jsonasstring) | ✔ | ✗ |
| [JSONStrings](#jsonstrings) | ✔ | ✔ |
| [JSONColumns](#jsoncolumns) | ✔ | ✔ |
| [JSONColumnsWithMetadata](#jsoncolumnswithmetadata) | ✔ | ✔ |
| [JSONCompact](#jsoncompact) | ✔ | ✔ |
| [JSONCompactStrings](#jsoncompactstrings) | ✗ | ✔ |
| [JSONCompactColumns](#jsoncompactcolumns) | ✔ | ✔ |
| [JSONEachRow](#jsoneachrow) | ✔ | ✔ |
| [PrettyJSONEachRow](#prettyjsoneachrow) | ✗ | ✔ |
| [JSONEachRowWithProgress](#jsoneachrowwithprogress) | ✗ | ✔ |
| [JSONStringsEachRow](#jsonstringseachrow) | ✔ | ✔ |
| [JSONStringsEachRowWithProgress](#jsonstringseachrowwithprogress) | ✗ | ✔ |
| [JSONCompactEachRow](#jsoncompacteachrow) | ✔ | ✔ |
| [JSONCompactEachRowWithNames](#jsoncompacteachrowwithnames) | ✔ | ✔ |
| [JSONCompactEachRowWithNamesAndTypes](#jsoncompacteachrowwithnamesandtypes) | ✔ | ✔ |
| [JSONCompactStringsEachRow](#jsoncompactstringseachrow) | ✔ | ✔ |
| [JSONCompactStringsEachRowWithNames](#jsoncompactstringseachrowwithnames) | ✔ | ✔ |
| [JSONCompactStringsEachRowWithNamesAndTypes](#jsoncompactstringseachrowwithnamesandtypes) | ✔ | ✔ |
| [JSONObjectEachRow](#jsonobjecteachrow) | ✔ | ✔ |
| [BSONEachRow](#bsoneachrow) | ✔ | ✔ |
| [TSKV](#tskv) | ✔ | ✔ |
| [Pretty](#pretty) | ✗ | ✔ |
| [PrettyNoEscapes](#prettynoescapes) | ✗ | ✔ |
| [PrettyMonoBlock](#prettymonoblock) | ✗ | ✔ |
| [PrettyNoEscapesMonoBlock](#prettynoescapesmonoblock) | ✗ | ✔ |
| [PrettyCompact](#prettycompact) | ✗ | ✔ |
| [PrettyCompactNoEscapes](#prettycompactnoescapes) | ✗ | ✔ |
| [PrettyCompactMonoBlock](#prettycompactmonoblock) | ✗ | ✔ |
| [PrettyCompactNoEscapesMonoBlock](#prettycompactnoescapesmonoblock) | ✗ | ✔ |
| [PrettySpace](#prettyspace) | ✗ | ✔ |
| [PrettySpaceNoEscapes](#prettyspacenoescapes) | ✗ | ✔ |
| [PrettySpaceMonoBlock](#prettyspacemonoblock) | ✗ | ✔ |
| [PrettySpaceNoEscapesMonoBlock](#prettyspacenoescapesmonoblock) | ✗ | ✔ |
| [Prometheus](#prometheus) | ✗ | ✔ |
| [Protobuf](#protobuf) | ✔ | ✔ |
| [ProtobufSingle](#protobufsingle) | ✔ | ✔ |
| [Avro](#data-format-avro) | ✔ | ✔ |
| [AvroConfluent](#data-format-avro-confluent) | ✔ | ✗ |
| [Parquet](#data-format-parquet) | ✔ | ✔ |
| [ParquetMetadata](#data-format-parquet-metadata) | ✔ | ✗ |
| [Arrow](#data-format-arrow) | ✔ | ✔ |
| [ArrowStream](#data-format-arrow-stream) | ✔ | ✔ |
| [ORC](#data-format-orc) | ✔ | ✔ |
| [RowBinary](#rowbinary) | ✔ | ✔ |
| [RowBinaryWithNames](#rowbinarywithnames) | ✔ | ✔ |
| [RowBinaryWithNamesAndTypes](#rowbinarywithnamesandtypes) | ✔ | ✔ |
| [Native](#native) | ✔ | ✔ |
| [Null](#null) | ✗ | ✔ |
| [XML](#xml) | ✗ | ✔ |
| [CapnProto](#capnproto) | ✔ | ✔ |
| [LineAsString](#lineasstring) | ✔ | ✔ |
| [Regexp](#data-format-regexp) | ✔ | ✗ |
| [RawBLOB](#rawblob) | ✔ | ✔ |
| [MsgPack](#msgpack) | ✔ | ✔ |
| [MySQLDump](#mysqldump) | ✔ | ✗ |
| [Markdown](#markdown) | ✗ | ✔ |
You can control some format processing parameters with the ClickHouse settings. For more information read the [Settings](/docs/en/operations/settings/settings-formats.md) section.
@@ -915,8 +917,6 @@ Example:
{"num":44,"str":"hello","arr":[0,1,2,3]}
```
While importing data, columns with unknown names will be skipped if the setting [input_format_skip_unknown_fields](/docs/en/operations/settings/settings-formats.md/#input_format_skip_unknown_fields) is set to 1.
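For example (a sketch; `test` is a hypothetical table with a single `num` column), the unknown key is dropped instead of raising an error:

```sql
SET input_format_skip_unknown_fields = 1;
INSERT INTO test FORMAT JSONEachRow {"num": 7, "unexpected_key": "ignored"}
```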
## JSONStringsEachRow {#jsonstringseachrow}
Differs from JSONEachRow only in that data fields are output in strings, not in typed JSON values.
@@ -2003,6 +2003,128 @@ To exchange data with Hadoop, you can use [HDFS table engine](/docs/en/engines/t
- [output_format_parquet_version](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_version) - The version of Parquet format used in output format. Default value - `2.latest`.
- [output_format_parquet_compression_method](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_compression_method) - compression method used in output Parquet format. Default value - `snappy`.
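For instance, the compression method can be overridden per query (a sketch; `out.parquet` is a hypothetical output path):

```sql
INSERT INTO FUNCTION file('out.parquet', Parquet)
SELECT number FROM numbers(10)
SETTINGS output_format_parquet_compression_method = 'zstd';
```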
## ParquetMetadata {#data-format-parquet-metadata}
Special format for reading Parquet file [metadata](https://parquet.apache.org/docs/file-format/metadata/). It always outputs one row with the following structure:
- `num_columns` - the number of columns
- `num_rows` - the total number of rows
- `num_row_groups` - the total number of row groups
- `format_version` - Parquet format version, always 1.0 or 2.6
- `total_byte_size` - total uncompressed size of the data in bytes, calculated as the sum of `total_byte_size` over all row groups
- `total_compressed_size` - total compressed size of the data in bytes, calculated as the sum of `total_compressed_size` over all row groups
- `columns` - the list of column metadata with the following structure:
  - `name` - column name
  - `path` - column path (differs from name for nested columns)
  - `max_definition_level` - maximum definition level
  - `max_repetition_level` - maximum repetition level
  - `physical_type` - column physical type
  - `logical_type` - column logical type
  - `compression` - compression used for this column
  - `encodings` - the list of encodings used for this column
- `row_groups` - the list of row group metadata with the following structure:
  - `num_columns` - the number of columns in the row group
  - `num_rows` - the number of rows in the row group
  - `total_byte_size` - total uncompressed size of the row group in bytes
  - `total_compressed_size` - total compressed size of the row group in bytes
  - `columns` - the list of column chunk metadata with the following structure:
    - `name` - column name
    - `path` - column path
    - `total_compressed_size` - total compressed size of the column in bytes
    - `total_uncompressed_size` - total uncompressed size of the column in bytes
    - `have_statistics` - boolean flag indicating whether the column chunk metadata contains column statistics
    - `statistics` - column chunk statistics (all fields are NULL if `have_statistics = false`) with the following structure:
      - `num_values` - the number of non-NULL values in the column chunk
      - `null_count` - the number of NULL values in the column chunk
      - `distinct_count` - the number of distinct values in the column chunk
      - `min` - the minimum value in the column chunk
      - `max` - the maximum value in the column chunk
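This structure is also what the schema reader for this format returns, so it can be inspected with `DESCRIBE` (a sketch, assuming a local `data.parquet`):

```sql
DESC file('data.parquet', ParquetMetadata);
```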
Example:
```sql
SELECT * FROM file('data.parquet', ParquetMetadata) FORMAT PrettyJSONEachRow
```
```json
{
"num_columns": "2",
"num_rows": "1000000",
"num_row_groups": "16",
"format_version": "2.6",
"total_byte_size": "10001981",
"total_compressed_size": "6011415",
"columns": [
{
"name": "number",
"path": "number",
"max_definition_level": "0",
"max_repetition_level": "0",
"physical_type": "INT64",
"logical_type": "Int(bitWidth=64, isSigned=false)",
"compression": "LZ4",
"encodings": [
"RLE_DICTIONARY",
"PLAIN",
"RLE"
]
},
{
"name": "'Hello'",
"path": "'Hello'",
"max_definition_level": "0",
"max_repetition_level": "0",
"physical_type": "BYTE_ARRAY",
"logical_type": "None",
"compression": "LZ4",
"encodings": [
"RLE_DICTIONARY",
"PLAIN",
"RLE"
]
}
],
"row_groups": [
{
"num_columns": "2",
"num_rows": "65409",
"total_byte_size": "654367",
"total_compressed_size": "393396",
"columns": [
{
"name": "number",
"path": "number",
"total_compressed_size": "393329",
"total_uncompressed_size": "654302",
"have_statistics": true,
"statistics": {
"num_values": "65409",
"null_count": "0",
"distinct_count": null,
"min": "0",
"max": "65408"
}
},
{
"name": "'Hello'",
"path": "'Hello'",
"total_compressed_size": "67",
"total_uncompressed_size": "65",
"have_statistics": true,
"statistics": {
"num_values": "65409",
"null_count": "0",
"distinct_count": null,
"min": "Hello",
"max": "Hello"
}
}
]
},
...
]
}
```
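The format supports reading subcolumns and a subset of columns, so individual metadata fields can be selected without reading the whole row (a sketch, assuming a local `data.parquet`; `row_groups.num_rows` reads a subcolumn of the `row_groups` field):

```sql
SELECT num_row_groups, row_groups.num_rows
FROM file('data.parquet', ParquetMetadata);
```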
## Arrow {#data-format-arrow}
[Apache Arrow](https://arrow.apache.org/) comes with two built-in columnar storage formats. ClickHouse supports read and write operations for these formats.


@@ -100,6 +100,7 @@ void registerInputFormatJSONAsString(FormatFactory & factory);
void registerInputFormatJSONAsObject(FormatFactory & factory);
void registerInputFormatLineAsString(FormatFactory & factory);
void registerInputFormatMySQLDump(FormatFactory & factory);
void registerInputFormatParquetMetadata(FormatFactory & factory);
#if USE_HIVE
void registerInputFormatHiveText(FormatFactory & factory);
@@ -140,6 +141,7 @@ void registerValuesSchemaReader(FormatFactory & factory);
void registerTemplateSchemaReader(FormatFactory & factory);
void registerMySQLSchemaReader(FormatFactory & factory);
void registerBSONEachRowSchemaReader(FormatFactory & factory);
void registerParquetMetadataSchemaReader(FormatFactory & factory);
void registerFileExtensions(FormatFactory & factory);
@@ -240,6 +242,8 @@ void registerFormats()
registerInputFormatCapnProto(factory);
registerInputFormatMySQLDump(factory);
registerInputFormatParquetMetadata(factory);
registerNonTrivialPrefixAndSuffixCheckerJSONEachRow(factory);
registerNonTrivialPrefixAndSuffixCheckerJSONAsString(factory);
registerNonTrivialPrefixAndSuffixCheckerJSONAsObject(factory);
@@ -274,6 +278,7 @@ void registerFormats()
registerTemplateSchemaReader(factory);
registerMySQLSchemaReader(factory);
registerBSONEachRowSchemaReader(factory);
registerParquetMetadataSchemaReader(factory);
}
}


@@ -0,0 +1,499 @@
#include "ParquetMetadataInputFormat.h"
#if USE_PARQUET
#include <Formats/FormatFactory.h>
#include <IO/ReadBufferFromMemory.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeNullable.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnNullable.h>
#include <Core/NamesAndTypes.h>
#include <arrow/api.h>
#include <arrow/status.h>
#include <parquet/file_reader.h>
#include <parquet/statistics.h>
#include "ArrowBufferedStreams.h"
#include <DataTypes/NestedUtils.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
static NamesAndTypesList getHeaderForParquetMetadata()
{
NamesAndTypesList names_and_types{
{"num_columns", std::make_shared<DataTypeUInt64>()},
{"num_rows", std::make_shared<DataTypeUInt64>()},
{"num_row_groups", std::make_shared<DataTypeUInt64>()},
{"format_version", std::make_shared<DataTypeString>()},
{"total_byte_size", std::make_shared<DataTypeUInt64>()},
{"total_compressed_size", std::make_shared<DataTypeUInt64>()},
{"columns",
std::make_shared<DataTypeArray>(
std::make_shared<DataTypeTuple>(
DataTypes{
std::make_shared<DataTypeString>(),
std::make_shared<DataTypeString>(),
std::make_shared<DataTypeUInt64>(),
std::make_shared<DataTypeUInt64>(),
std::make_shared<DataTypeString>(),
std::make_shared<DataTypeString>(),
std::make_shared<DataTypeString>(),
std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
Names{
"name",
"path",
"max_definition_level",
"max_repetition_level",
"physical_type",
"logical_type",
"compression",
"encodings"}))},
{"row_groups",
std::make_shared<DataTypeArray>(std::make_shared<DataTypeTuple>(
DataTypes{
std::make_shared<DataTypeUInt64>(),
std::make_shared<DataTypeUInt64>(),
std::make_shared<DataTypeUInt64>(),
std::make_shared<DataTypeUInt64>(),
std::make_shared<DataTypeArray>(
std::make_shared<DataTypeTuple>(
DataTypes{
std::make_shared<DataTypeString>(),
std::make_shared<DataTypeString>(),
std::make_shared<DataTypeUInt64>(),
std::make_shared<DataTypeUInt64>(),
DataTypeFactory::instance().get("Bool"),
std::make_shared<DataTypeTuple>(
DataTypes{
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()),
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()),
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()),
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()),
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>())},
Names{"num_values", "null_count", "distinct_count", "min", "max"}),
},
Names{"name", "path", "total_compressed_size", "total_uncompressed_size", "have_statistics", "statistics"}))},
Names{"num_columns", "num_rows", "total_byte_size", "total_compressed_size", "columns"}))},
};
return names_and_types;
}
void checkHeader(const Block & header)
{
auto expected_names_and_types = getHeaderForParquetMetadata();
std::unordered_map<String, DataTypePtr> name_to_type;
for (const auto & [name, type] : expected_names_and_types)
name_to_type[name] = type;
for (const auto & [name, type] : header.getNamesAndTypes())
{
auto it = name_to_type.find(name);
if (it == name_to_type.end())
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Unexpected column: {}. ParquetMetadata format allows only the next columns: num_columns, num_rows, num_row_groups, "
"format_version, columns, row_groups", name);
if (!it->second->equals(*type))
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Unexpected type {} for column {}. Expected type: {}",
type->getName(),
name,
it->second->getName());
}
}
static std::shared_ptr<parquet::FileMetaData> getFileMetadata(
ReadBuffer & in,
const FormatSettings & format_settings,
std::atomic<int> & is_stopped)
{
auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES);
return parquet::ReadMetaData(arrow_file);
}
ParquetMetadataInputFormat::ParquetMetadataInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_)
: IInputFormat(std::move(header_), in_), format_settings(format_settings_)
{
checkHeader(getPort().getHeader());
}
Chunk ParquetMetadataInputFormat::generate()
{
Chunk res;
if (done)
return res;
auto metadata = getFileMetadata(*in, format_settings, is_stopped);
const auto & header = getPort().getHeader();
auto names_and_types = getHeaderForParquetMetadata();
auto names = names_and_types.getNames();
auto types = names_and_types.getTypes();
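/// Fill only the columns present in the requested header: the format is registered with
/// markFormatSupportsSubsetOfColumns, so the header may contain any subset of the metadata fields.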
for (const auto & name : header.getNames())
{
/// num_columns
if (name == names[0])
{
auto column = types[0]->createColumn();
assert_cast<ColumnUInt64 &>(*column).insertValue(metadata->num_columns());
res.addColumn(std::move(column));
}
/// num_rows
else if (name == names[1])
{
auto column = types[1]->createColumn();
assert_cast<ColumnUInt64 &>(*column).insertValue(metadata->num_rows());
res.addColumn(std::move(column));
}
/// num_row_groups
else if (name == names[2])
{
auto column = types[2]->createColumn();
assert_cast<ColumnUInt64 &>(*column).insertValue(metadata->num_row_groups());
res.addColumn(std::move(column));
}
/// format_version
else if (name == names[3])
{
auto column = types[3]->createColumn();
String version = metadata->version() == parquet::ParquetVersion::PARQUET_1_0 ? "1.0" : "2.6";
assert_cast<ColumnString &>(*column).insertData(version.data(), version.size());
res.addColumn(std::move(column));
}
/// total_byte_size
else if (name == names[4])
{
auto column = types[4]->createColumn();
size_t total_byte_size = 0;
for (int32_t i = 0; i != metadata->num_row_groups(); ++i)
total_byte_size += metadata->RowGroup(i)->total_byte_size();
assert_cast<ColumnUInt64 &>(*column).insertValue(total_byte_size);
res.addColumn(std::move(column));
}
/// total_compressed_size
else if (name == names[5])
{
auto column = types[5]->createColumn();
size_t total_compressed_size = 0;
for (int32_t i = 0; i != metadata->num_row_groups(); ++i)
total_compressed_size += metadata->RowGroup(i)->total_compressed_size();
assert_cast<ColumnUInt64 &>(*column).insertValue(total_compressed_size);
res.addColumn(std::move(column));
}
/// columns
else if (name == names[6])
{
auto column = types[6]->createColumn();
fillColumnsMetadata(metadata, column);
res.addColumn(std::move(column));
}
/// row_groups
else if (name == names[7])
{
auto column = types[7]->createColumn();
fillRowGroupsMetadata(metadata, column);
res.addColumn(std::move(column));
}
}
done = true;
return res;
}
void ParquetMetadataInputFormat::fillColumnsMetadata(const std::shared_ptr<parquet::FileMetaData> & metadata, MutableColumnPtr & column)
{
auto & array_column = assert_cast<ColumnArray &>(*column);
auto & tuple_column = assert_cast<ColumnTuple &>(array_column.getData());
int32_t num_columns = metadata->num_columns();
for (int32_t i = 0; i != num_columns; ++i)
{
const auto * column_info = metadata->schema()->Column(i);
/// name
String column_name = column_info->name();
assert_cast<ColumnString &>(tuple_column.getColumn(0)).insertData(column_name.data(), column_name.size());
/// path
String path = column_info->path()->ToDotString();
assert_cast<ColumnString &>(tuple_column.getColumn(1)).insertData(path.data(), path.size());
/// max_definition_level
assert_cast<ColumnUInt64 &>(tuple_column.getColumn(2)).insertValue(column_info->max_definition_level());
/// max_repetition_level
assert_cast<ColumnUInt64 &>(tuple_column.getColumn(3)).insertValue(column_info->max_repetition_level());
/// physical_type
std::string_view physical_type = magic_enum::enum_name(column_info->physical_type());
assert_cast<ColumnString &>(tuple_column.getColumn(4)).insertData(physical_type.data(), physical_type.size());
/// logical_type
String logical_type = column_info->logical_type()->ToString();
assert_cast<ColumnString &>(tuple_column.getColumn(5)).insertData(logical_type.data(), logical_type.size());
if (metadata->num_row_groups() > 0)
{
auto column_chunk_metadata = metadata->RowGroup(0)->ColumnChunk(i);
std::string_view compression = magic_enum::enum_name(column_chunk_metadata->compression());
assert_cast<ColumnString &>(tuple_column.getColumn(6)).insertData(compression.data(), compression.size());
auto & encodings_array_column = assert_cast<ColumnArray &>(tuple_column.getColumn(7));
auto & encodings_nested_column = assert_cast<ColumnString &>(encodings_array_column.getData());
for (auto codec : column_chunk_metadata->encodings())
{
auto codec_name = magic_enum::enum_name(codec);
encodings_nested_column.insertData(codec_name.data(), codec_name.size());
}
encodings_array_column.getOffsets().push_back(encodings_nested_column.size());
}
else
{
String compression = "NONE";
assert_cast<ColumnString &>(tuple_column.getColumn(5)).insertData(compression.data(), compression.size());
tuple_column.getColumn(6).insertDefault();
}
}
array_column.getOffsets().push_back(tuple_column.size());
}
void ParquetMetadataInputFormat::fillRowGroupsMetadata(const std::shared_ptr<parquet::FileMetaData> & metadata, MutableColumnPtr & column)
{
auto & row_groups_array_column = assert_cast<ColumnArray &>(*column);
auto & row_groups_column = assert_cast<ColumnTuple &>(row_groups_array_column.getData());
for (int32_t i = 0; i != metadata->num_row_groups(); ++i)
{
auto row_group_metadata = metadata->RowGroup(i);
/// num_columns
assert_cast<ColumnUInt64 &>(row_groups_column.getColumn(0)).insertValue(row_group_metadata->num_columns());
/// num_rows
assert_cast<ColumnUInt64 &>(row_groups_column.getColumn(1)).insertValue(row_group_metadata->num_rows());
/// total_byte_size
assert_cast<ColumnUInt64 &>(row_groups_column.getColumn(2)).insertValue(row_group_metadata->total_byte_size());
/// total_compressed_size
assert_cast<ColumnUInt64 &>(row_groups_column.getColumn(3)).insertValue(row_group_metadata->total_compressed_size());
/// columns
fillColumnChunksMetadata(row_group_metadata, row_groups_column.getColumn(4));
}
row_groups_array_column.getOffsets().push_back(row_groups_column.size());
}
void ParquetMetadataInputFormat::fillColumnChunksMetadata(const std::unique_ptr<parquet::RowGroupMetaData> & row_group_metadata, IColumn & column)
{
auto & array_column = assert_cast<ColumnArray &>(column);
auto & tuple_column = assert_cast<ColumnTuple &>(array_column.getData());
for (int32_t column_i = 0; column_i != row_group_metadata->num_columns(); ++column_i)
{
auto column_chunk_metadata = row_group_metadata->ColumnChunk(column_i);
/// name
String column_name = row_group_metadata->schema()->Column(column_i)->name();
assert_cast<ColumnString &>(tuple_column.getColumn(0)).insertData(column_name.data(), column_name.size());
/// path
String path = row_group_metadata->schema()->Column(column_i)->path()->ToDotString();
assert_cast<ColumnString &>(tuple_column.getColumn(1)).insertData(path.data(), path.size());
/// total_compressed_size
assert_cast<ColumnUInt64 &>(tuple_column.getColumn(2)).insertValue(column_chunk_metadata->total_compressed_size());
/// total_uncompressed_size
assert_cast<ColumnUInt64 &>(tuple_column.getColumn(3)).insertValue(column_chunk_metadata->total_uncompressed_size());
/// have_statistics
bool have_statistics = column_chunk_metadata->is_stats_set();
assert_cast<ColumnUInt8 &>(tuple_column.getColumn(4)).insertValue(have_statistics);
if (have_statistics)
fillColumnStatistics(column_chunk_metadata->statistics(), tuple_column.getColumn(5), row_group_metadata->schema()->Column(column_i)->type_length());
else
tuple_column.getColumn(5).insertDefault();
}
array_column.getOffsets().push_back(tuple_column.size());
}
template <typename T>
static void getMinMaxNumberStatistics(const std::shared_ptr<parquet::Statistics> & statistics, String & min, String & max)
{
const auto & typed_statistics = dynamic_cast<parquet::TypedStatistics<T> &>(*statistics);
min = std::to_string(typed_statistics.min());
max = std::to_string(typed_statistics.max());
}
void ParquetMetadataInputFormat::fillColumnStatistics(const std::shared_ptr<parquet::Statistics> & statistics, IColumn & column, int32_t type_length)
{
auto & statistics_column = assert_cast<ColumnTuple &>(column);
/// num_values
auto & nullable_num_values = assert_cast<ColumnNullable &>(statistics_column.getColumn(0));
assert_cast<ColumnUInt64 &>(nullable_num_values.getNestedColumn()).insertValue(statistics->num_values());
nullable_num_values.getNullMapData().push_back(0);
/// null_count
if (statistics->HasNullCount())
{
auto & nullable_null_count = assert_cast<ColumnNullable &>(statistics_column.getColumn(1));
assert_cast<ColumnUInt64 &>(nullable_null_count.getNestedColumn()).insertValue(statistics->null_count());
nullable_null_count.getNullMapData().push_back(0);
}
else
{
statistics_column.getColumn(1).insertDefault();
}
/// distinct_count
if (statistics->HasDistinctCount())
{
auto & nullable_distinct_count = assert_cast<ColumnNullable &>(statistics_column.getColumn(2));
size_t distinct_count = statistics->distinct_count();
/// It can be set but still be 0 because of a bug: https://github.com/apache/arrow/issues/27644
/// If we see distinct_count = 0 with non 0 values in chunk, set it to NULL.
if (distinct_count == 0 && statistics->num_values() != 0)
{
nullable_distinct_count.insertDefault();
}
else
{
assert_cast<ColumnUInt64 &>(nullable_distinct_count.getNestedColumn()).insertValue(distinct_count);
nullable_distinct_count.getNullMapData().push_back(0);
}
}
else
{
statistics_column.getColumn(2).insertDefault();
}
/// min/max
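/// Min and max are rendered as strings so that a single Nullable(String) column fits every physical type.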
if (statistics->HasMinMax() && statistics->physical_type() != parquet::Type::type::UNDEFINED)
{
String min;
String max;
switch (statistics->physical_type())
{
case parquet::Type::type::FLOAT:
{
getMinMaxNumberStatistics<parquet::FloatType>(statistics, min, max);
break;
}
case parquet::Type::type::DOUBLE:
{
getMinMaxNumberStatistics<parquet::DoubleType>(statistics, min, max);
break;
}
case parquet::Type::type::INT32:
{
getMinMaxNumberStatistics<parquet::Int32Type>(statistics, min, max);
break;
}
case parquet::Type::type::INT64:
{
getMinMaxNumberStatistics<parquet::Int64Type>(statistics, min, max);
break;
}
case parquet::Type::type::INT96:
{
const auto & int96_statistics = dynamic_cast<parquet::TypedStatistics<parquet::Int96Type> &>(*statistics);
min = parquet::Int96ToString(int96_statistics.min());
max = parquet::Int96ToString(int96_statistics.max());
break;
}
case parquet::Type::type::BOOLEAN:
{
getMinMaxNumberStatistics<parquet::BooleanType>(statistics, min, max);
break;
}
case parquet::Type::type::BYTE_ARRAY:
{
const auto & byte_array_statistics = dynamic_cast<parquet::ByteArrayStatistics &>(*statistics);
min = parquet::ByteArrayToString(byte_array_statistics.min());
max = parquet::ByteArrayToString(byte_array_statistics.max());
break;
}
case parquet::Type::type::FIXED_LEN_BYTE_ARRAY:
{
const auto & flba_statistics = dynamic_cast<parquet::FLBAStatistics &>(*statistics);
min = parquet::FixedLenByteArrayToString(flba_statistics.min(), type_length);
max = parquet::FixedLenByteArrayToString(flba_statistics.max(), type_length);
break;
}
case parquet::Type::type::UNDEFINED:
{
break; /// unreachable
}
}
auto & nullable_min = assert_cast<ColumnNullable &>(statistics_column.getColumn(3));
assert_cast<ColumnString &>(nullable_min.getNestedColumn()).insertData(min.data(), min.size());
nullable_min.getNullMapData().push_back(0);
auto & nullable_max = assert_cast<ColumnNullable &>(statistics_column.getColumn(4));
assert_cast<ColumnString &>(nullable_max.getNestedColumn()).insertData(max.data(), max.size());
nullable_max.getNullMapData().push_back(0);
}
else
{
statistics_column.getColumn(3).insertDefault();
statistics_column.getColumn(4).insertDefault();
}
}
void ParquetMetadataInputFormat::resetParser()
{
IInputFormat::resetParser();
done = false;
}
ParquetMetadataSchemaReader::ParquetMetadataSchemaReader(ReadBuffer & in_)
: ISchemaReader(in_)
{
}
NamesAndTypesList ParquetMetadataSchemaReader::readSchema()
{
return getHeaderForParquetMetadata();
}
void registerInputFormatParquetMetadata(FormatFactory & factory)
{
factory.registerInputFormat(
"ParquetMetadata",
[](ReadBuffer &buf,
const Block &sample,
const RowInputFormatParams &,
const FormatSettings & settings)
{
return std::make_shared<ParquetMetadataInputFormat>(buf, sample, settings);
});
factory.markFormatSupportsSubcolumns("ParquetMetadata");
factory.markFormatSupportsSubsetOfColumns("ParquetMetadata");
}
void registerParquetMetadataSchemaReader(FormatFactory & factory)
{
factory.registerSchemaReader(
"ParquetMetadata",
[](ReadBuffer & buf, const FormatSettings &)
{
return std::make_shared<ParquetMetadataSchemaReader>(buf);
}
);
}
}
#else
namespace DB
{
class FormatFactory;
void registerInputFormatParquetMetadata(FormatFactory &)
{
}
void registerParquetMetadataSchemaReader(FormatFactory &) {}
}
#endif


@@ -0,0 +1,90 @@
#pragma once
#include "config.h"
#if USE_PARQUET
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Formats/FormatSettings.h>
#include <parquet/metadata.h>
namespace parquet::arrow { class FileReader; }
namespace arrow { class Buffer; class RecordBatchReader;}
namespace DB
{
/* Special format that always returns just one row with Parquet file metadata (see https://parquet.apache.org/docs/file-format/metadata/).
 * The result row has the following structure:
 * num_columns - the number of columns
 * num_rows - the total number of rows
 * num_row_groups - the total number of row groups
 * format_version - Parquet format version, always 1.0 or 2.6
 * total_byte_size - total uncompressed size of the data in bytes, calculated as the sum of total_byte_size over all row groups
 * total_compressed_size - total compressed size of the data in bytes, calculated as the sum of total_compressed_size over all row groups
 * columns - the list of column metadata with the following structure:
 *     name - column name
 *     path - column path (differs from name for nested columns)
 *     max_definition_level - maximum definition level
 *     max_repetition_level - maximum repetition level
 *     physical_type - column physical type
 *     logical_type - column logical type
 *     compression - compression used for this column
 *     encodings - the list of encodings used for this column
 * row_groups - the list of row group metadata with the following structure:
 *     num_columns - the number of columns in the row group
 *     num_rows - the number of rows in the row group
 *     total_byte_size - total uncompressed size of the row group in bytes
 *     total_compressed_size - total compressed size of the row group in bytes
 *     columns - the list of column chunk metadata with the following structure:
 *         name - column name
 *         path - column path
 *         total_compressed_size - total compressed size of the column in bytes
 *         total_uncompressed_size - total uncompressed size of the column in bytes
 *         have_statistics - boolean flag indicating whether the column chunk metadata contains column statistics
 *         statistics - column chunk statistics (all fields are NULL if have_statistics = false) with the following structure:
 *             num_values - the number of non-NULL values in the column chunk
 *             null_count - the number of NULL values in the column chunk
 *             distinct_count - the number of distinct values in the column chunk
 *             min - the minimum value in the column chunk
 *             max - the maximum value in the column chunk
 */
class ParquetMetadataInputFormat : public IInputFormat
{
public:
ParquetMetadataInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_);
String getName() const override { return "ParquetMetadataInputFormat"; }
void resetParser() override;
private:
Chunk generate() override;
void onCancel() override
{
is_stopped = 1;
}
void fillColumnsMetadata(const std::shared_ptr<parquet::FileMetaData> & metadata, MutableColumnPtr & column);
void fillRowGroupsMetadata(const std::shared_ptr<parquet::FileMetaData> & metadata, MutableColumnPtr & column);
void fillColumnChunksMetadata(const std::unique_ptr<parquet::RowGroupMetaData> & row_group_metadata, IColumn & column);
void fillColumnStatistics(const std::shared_ptr<parquet::Statistics> & statistics, IColumn & column, int32_t type_length);
const FormatSettings format_settings;
bool done = false;
std::atomic<int> is_stopped{0};
};
class ParquetMetadataSchemaReader : public ISchemaReader
{
public:
ParquetMetadataSchemaReader(ReadBuffer & in_);
NamesAndTypesList readSchema() override;
};
}
#endif


@@ -0,0 +1,154 @@
{
"num_columns": "3",
"num_rows": "100000",
"num_row_groups": "2",
"format_version": "2.6",
"total_byte_size": "314147",
"total_compressed_size": "27081",
"columns": [
{
"name": "number",
"path": "number",
"max_definition_level": "0",
"max_repetition_level": "0",
"physical_type": "INT32",
"logical_type": "Int(bitWidth=16, isSigned=false)",
"compression": "LZ4",
"encodings": [
"RLE_DICTIONARY",
"PLAIN",
"RLE"
]
},
{
"name": "str",
"path": "str",
"max_definition_level": "0",
"max_repetition_level": "0",
"physical_type": "BYTE_ARRAY",
"logical_type": "None",
"compression": "LZ4",
"encodings": [
"RLE_DICTIONARY",
"PLAIN",
"RLE"
]
},
{
"name": "mod",
"path": "mod",
"max_definition_level": "1",
"max_repetition_level": "0",
"physical_type": "INT32",
"logical_type": "Int(bitWidth=8, isSigned=false)",
"compression": "LZ4",
"encodings": [
"RLE_DICTIONARY",
"PLAIN",
"RLE"
]
}
],
"row_groups": [
{
"num_columns": "3",
"num_rows": "65409",
"total_byte_size": "200527",
"total_compressed_size": "14406",
"columns": [
{
"name": "number",
"path": "number",
"total_compressed_size": "7070",
"total_uncompressed_size": "85956",
"have_statistics": true,
"statistics": {
"num_values": "65409",
"null_count": "0",
"distinct_count": null,
"min": "0",
"max": "999"
}
},
{
"name": "str",
"path": "str",
"total_compressed_size": "7093",
"total_uncompressed_size": "93853",
"have_statistics": true,
"statistics": {
"num_values": "65409",
"null_count": "0",
"distinct_count": null,
"min": "Hello0",
"max": "Hello999"
}
},
{
"name": "mod",
"path": "mod",
"total_compressed_size": "243",
"total_uncompressed_size": "20718",
"have_statistics": true,
"statistics": {
"num_values": "32705",
"null_count": "32704",
"distinct_count": null,
"min": "0",
"max": "8"
}
}
]
},
{
"num_columns": "3",
"num_rows": "34591",
"total_byte_size": "113620",
"total_compressed_size": "12675",
"columns": [
{
"name": "number",
"path": "number",
"total_compressed_size": "6223",
"total_uncompressed_size": "47365",
"have_statistics": true,
"statistics": {
"num_values": "34591",
"null_count": "0",
"distinct_count": null,
"min": "0",
"max": "999"
}
},
{
"name": "str",
"path": "str",
"total_compressed_size": "6247",
"total_uncompressed_size": "55262",
"have_statistics": true,
"statistics": {
"num_values": "34591",
"null_count": "0",
"distinct_count": null,
"min": "Hello0",
"max": "Hello999"
}
},
{
"name": "mod",
"path": "mod",
"total_compressed_size": "205",
"total_uncompressed_size": "10993",
"have_statistics": true,
"statistics": {
"num_values": "17295",
"null_count": "17296",
"distinct_count": null,
"min": "0",
"max": "8"
}
}
]
}
]
}


@@ -0,0 +1,7 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_LOCAL -q "select * from file('$CURDIR/data_parquet/02718_data.parquet', ParquetMetadata) format JSONEachRow" | python3 -m json.tool