Support JSONColumnsWithMetadata input format

This commit is contained in:
avogar 2022-09-08 17:58:44 +00:00
parent c380decbbb
commit 46a0318a36
14 changed files with 224 additions and 74 deletions

View File

@ -768,6 +768,31 @@ namespace JSONUtils
return names_and_types;
}
NamesAndTypesList readMetadataAndValidateHeader(ReadBuffer & in, const Block & header)
{
auto names_and_types = JSONUtils::readMetadata(in);
for (const auto & [name, type] : names_and_types)
{
auto header_type = header.getByName(name).type;
if (header.has(name) && !type->equals(*header_type))
throw Exception(
ErrorCodes::INCORRECT_DATA, "Type {} of column '{}' from metadata is not the same as type in header {}", type->getName(), name, header_type->getName());
}
return names_and_types;
}
bool skipUntilFieldInObject(ReadBuffer & in, const String & desired_field_name)
{
while (!checkAndSkipObjectEnd(in))
{
auto field_name = JSONUtils::readFieldName(in);
if (field_name == desired_field_name)
return true;
}
return false;
}
void skipTheRestOfObject(ReadBuffer & in)
{
while (!checkAndSkipObjectEnd(in))

View File

@ -123,7 +123,9 @@ namespace JSONUtils
bool checkAndSkipObjectEnd(ReadBuffer & in);
NamesAndTypesList readMetadata(ReadBuffer & in);
NamesAndTypesList readMetadataAndValidateHeader(ReadBuffer & in, const Block & header);
bool skipUntilFieldInObject(ReadBuffer & in, const String & desired_field_name);
void skipTheRestOfObject(ReadBuffer & in);
}

View File

@ -48,6 +48,8 @@ void registerInputFormatJSONColumns(FormatFactory & factory);
void registerOutputFormatJSONColumns(FormatFactory & factory);
void registerInputFormatJSONCompactColumns(FormatFactory & factory);
void registerOutputFormatJSONCompactColumns(FormatFactory & factory);
void registerInputFormatJSONColumnsWithMetadata(FormatFactory & factory);
void registerOutputFormatJSONColumnsWithMetadata(FormatFactory & factory);
void registerInputFormatProtobuf(FormatFactory & factory);
void registerOutputFormatProtobuf(FormatFactory & factory);
void registerInputFormatProtobufList(FormatFactory & factory);
@ -78,7 +80,6 @@ void registerOutputFormatPrettyCompact(FormatFactory & factory);
void registerOutputFormatPrettySpace(FormatFactory & factory);
void registerOutputFormatVertical(FormatFactory & factory);
void registerOutputFormatJSONEachRowWithProgress(FormatFactory & factory);
void registerOutputFormatJSONColumnsWithMetadata(FormatFactory & factory);
void registerOutputFormatXML(FormatFactory & factory);
void registerOutputFormatODBCDriver2(FormatFactory & factory);
void registerOutputFormatNull(FormatFactory & factory);
@ -118,6 +119,7 @@ void registerJSONAsStringSchemaReader(FormatFactory & factory);
void registerJSONAsObjectSchemaReader(FormatFactory & factory);
void registerJSONColumnsSchemaReader(FormatFactory & factory);
void registerJSONCompactColumnsSchemaReader(FormatFactory & factory);
void registerJSONColumnsWithMetadataSchemaReader(FormatFactory & factory);
void registerNativeSchemaReader(FormatFactory & factory);
void registerRowBinaryWithNamesAndTypesSchemaReader(FormatFactory & factory);
void registerAvroSchemaReader(FormatFactory & factory);
@ -180,6 +182,8 @@ void registerFormats()
registerOutputFormatJSONColumns(factory);
registerInputFormatJSONCompactColumns(factory);
registerOutputFormatJSONCompactColumns(factory);
registerInputFormatJSONColumnsWithMetadata(factory);
registerOutputFormatJSONColumnsWithMetadata(factory);
registerInputFormatProtobuf(factory);
registerOutputFormatProtobufList(factory);
registerInputFormatProtobufList(factory);
@ -207,7 +211,6 @@ void registerFormats()
registerOutputFormatPrettySpace(factory);
registerOutputFormatVertical(factory);
registerOutputFormatJSONEachRowWithProgress(factory);
registerOutputFormatJSONColumnsWithMetadata(factory);
registerOutputFormatXML(factory);
registerOutputFormatODBCDriver2(factory);
registerOutputFormatNull(factory);
@ -246,6 +249,7 @@ void registerFormats()
registerJSONAsObjectSchemaReader(factory);
registerJSONColumnsSchemaReader(factory);
registerJSONCompactColumnsSchemaReader(factory);
registerJSONColumnsWithMetadataSchemaReader(factory);
registerNativeSchemaReader(factory);
registerRowBinaryWithNamesAndTypesSchemaReader(factory);
registerAvroSchemaReader(factory);

View File

@ -401,13 +401,16 @@ void registerCSVSchemaReader(FormatFactory & factory)
{
return std::make_shared<CSVSchemaReader>(buf, with_names, with_types, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter(format_name, [with_names](const FormatSettings & settings)
if (!with_types)
{
String result = getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::CSV);
if (!with_names)
result += fmt::format(", column_names_for_schema_inference={}", settings.column_names_for_schema_inference);
return result;
});
factory.registerAdditionalInfoForSchemaCacheGetter(format_name, [with_names](const FormatSettings & settings)
{
String result = getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::CSV);
if (!with_names)
result += fmt::format(", column_names_for_schema_inference={}", settings.column_names_for_schema_inference);
return result;
});
}
};
registerWithNamesAndTypes("CSV", register_func);

View File

@ -353,19 +353,24 @@ void registerCustomSeparatedSchemaReader(FormatFactory & factory)
{
return std::make_shared<CustomSeparatedSchemaReader>(buf, with_names, with_types, ignore_spaces, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter(format_name, [](const FormatSettings & settings)
if (!with_types)
{
String result = getAdditionalFormatInfoByEscapingRule(settings, settings.custom.escaping_rule);
return result + fmt::format(
", result_before_delimiter={}, row_before_delimiter={}, field_delimiter={},"
" row_after_delimiter={}, row_between_delimiter={}, result_after_delimiter={}",
settings.custom.result_before_delimiter,
settings.custom.row_before_delimiter,
settings.custom.field_delimiter,
settings.custom.row_after_delimiter,
settings.custom.row_between_delimiter,
settings.custom.result_after_delimiter);
});
factory.registerAdditionalInfoForSchemaCacheGetter(format_name, [with_names](const FormatSettings & settings)
{
String result = getAdditionalFormatInfoByEscapingRule(settings, settings.custom.escaping_rule);
if (!with_names)
result += fmt::format(", column_names_for_schema_inference={}", settings.column_names_for_schema_inference);
return result + fmt::format(
", result_before_delimiter={}, row_before_delimiter={}, field_delimiter={},"
" row_after_delimiter={}, row_between_delimiter={}, result_after_delimiter={}",
settings.custom.result_before_delimiter,
settings.custom.row_before_delimiter,
settings.custom.field_delimiter,
settings.custom.row_after_delimiter,
settings.custom.row_between_delimiter,
settings.custom.result_after_delimiter);
});
}
};
registerWithNamesAndTypes(ignore_spaces ? "CustomSeparatedIgnoreSpaces" : "CustomSeparated", register_func);

View File

@ -0,0 +1,79 @@
#include <Processors/Formats/Impl/JSONColumnsWithMetadataBlockInputFormat.h>
#include <IO/ReadHelpers.h>
#include <Formats/FormatFactory.h>
#include <Formats/EscapingRuleUtils.h>
#include <Formats/JSONUtils.h>
namespace DB
{
JSONColumnsWithMetadataReader::JSONColumnsWithMetadataReader(ReadBuffer & in_, const Block & header_, const FormatSettings & settings)
: JSONColumnsReader(in_), header(header_), use_metadata(settings.json.use_metadata)
{
}
void JSONColumnsWithMetadataReader::readChunkStart()
{
skipBOMIfExists(*in);
JSONUtils::skipObjectStart(*in);
if (use_metadata)
JSONUtils::readMetadataAndValidateHeader(*in, header);
else
JSONUtils::readMetadata(*in);
JSONUtils::skipComma(*in);
if (!JSONUtils::skipUntilFieldInObject(*in, "data"))
throw Exception(ErrorCodes::INCORRECT_DATA, "Expected field \"data\" with table content");
JSONUtils::skipObjectStart(*in);
}
bool JSONColumnsWithMetadataReader::checkChunkEnd()
{
if (!JSONUtils::checkAndSkipObjectEnd(*in))
return false;
JSONUtils::skipTheRestOfObject(*in);
assertEOF(*in);
return true;
}
JSONColumnsWithMetadataSchemaReader::JSONColumnsWithMetadataSchemaReader(ReadBuffer & in_) : ISchemaReader(in_)
{
}
NamesAndTypesList JSONColumnsWithMetadataSchemaReader::readSchema()
{
skipBOMIfExists(in);
JSONUtils::skipObjectStart(in);
return JSONUtils::readMetadata(in);
}
void registerInputFormatJSONColumnsWithMetadata(FormatFactory & factory)
{
factory.registerInputFormat(
"JSONColumnsWithMetadata",
[](ReadBuffer & buf,
const Block &sample,
const RowInputFormatParams &,
const FormatSettings & settings)
{
return std::make_shared<JSONColumnsBlockInputFormatBase>(buf, sample, settings, std::make_unique<JSONColumnsWithMetadataReader>(buf, sample, settings));
}
);
factory.markFormatSupportsSubsetOfColumns("JSONColumnsWithMetadata");
}
void registerJSONColumnsWithMetadataSchemaReader(FormatFactory & factory)
{
factory.registerSchemaReader(
"JSONColumnsWithMetadata",
[](ReadBuffer & buf, const FormatSettings &)
{
return std::make_shared<JSONColumnsWithMetadataSchemaReader>(buf);
}
);
}
}

View File

@ -0,0 +1,38 @@
#pragma once
#include <Processors/Formats/Impl/JSONColumnsBlockInputFormat.h>
#include <Processors/Formats/ISchemaReader.h>
namespace DB
{
/* Format JSONCompactColumns reads each block of data in the next format:
* [
* [value1, value2, value3, ...],
* [value1, value2m value3, ...],
* ...
* ]
*/
class JSONColumnsWithMetadataReader : public JSONColumnsReader
{
public:
JSONColumnsWithMetadataReader(ReadBuffer & in_, const Block & header_, const FormatSettings & settings);
void readChunkStart() override;
bool checkChunkEnd() override;
private:
const Block & header;
bool use_metadata;
};
class JSONColumnsWithMetadataSchemaReader : public ISchemaReader
{
public:
JSONColumnsWithMetadataSchemaReader(ReadBuffer & in_);
NamesAndTypesList readSchema() override;
};
}

View File

@ -241,11 +241,16 @@ void registerJSONCompactEachRowSchemaReader(FormatFactory & factory)
{
return std::make_shared<JSONCompactEachRowRowSchemaReader>(buf, with_names, with_types, json_strings, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter(format_name, [](const FormatSettings & settings)
if (!with_types)
{
auto result = getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::JSON);
return result + fmt::format(", column_names_for_schema_inference={}", settings.column_names_for_schema_inference);
});
factory.registerAdditionalInfoForSchemaCacheGetter(format_name, [with_names](const FormatSettings & settings)
{
auto result = getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::JSON);
if (!with_names)
result += fmt::format(", column_names_for_schema_inference={}", settings.column_names_for_schema_inference);
return result;
});
}
};
registerWithNamesAndTypes(json_strings ? "JSONCompactStringsEachRow" : "JSONCompactEachRow", register_func);
}

View File

@ -24,38 +24,25 @@ void JSONCompactRowInputFormat::readPrefix()
{
skipBOMIfExists(*in);
JSONUtils::skipObjectStart(*in);
auto names_and_types = JSONUtils::readMetadata(*in);
if (use_metadata)
{
auto names_and_types = JSONUtils::readMetadataAndValidateHeader(*in, getPort().getHeader());
Names column_names;
auto header = getPort().getHeader();
for (const auto & [name, type] : names_and_types)
{
auto header_type = header.getByName(name).type;
if (header.has(name) && !type->equals(*header_type))
throw Exception(
ErrorCodes::INCORRECT_DATA, "Type {} of column '{}' from metadata is not the same as type in header {}", type->getName(), name, header_type->getName());
column_names.push_back(name);
}
column_mapping->addColumns(column_names, column_indexes_by_names, format_settings);
}
else
{
JSONUtils::readMetadata(*in);
column_mapping->setupByHeader(getPort().getHeader());
}
JSONUtils::skipComma(*in);
while (!JSONUtils::checkAndSkipObjectEnd(*in))
{
auto field_name = JSONUtils::readFieldName(*in);
if (field_name == "data")
{
JSONUtils::skipArrayStart(*in);
return;
}
}
if (!JSONUtils::skipUntilFieldInObject(*in, "data"))
throw Exception(ErrorCodes::INCORRECT_DATA, "Expected field \"data\" with table content");
throw Exception(ErrorCodes::INCORRECT_DATA, "Expected field \"data\" with table content");
JSONUtils::skipArrayStart(*in);
}
void JSONCompactRowInputFormat::readSuffix()

View File

@ -81,6 +81,10 @@ void registerJSONObjectEachRowSchemaReader(FormatFactory & factory)
{
return std::make_unique<JSONObjectEachRowSchemaReader>(buf, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter("JSONObjectEachRow", [](const FormatSettings & settings)
{
return getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::JSON);
});
}
}

View File

@ -1,7 +1,6 @@
#include <Processors/Formats/Impl/JSONRowInputFormat.h>
#include <Formats/JSONUtils.h>
#include <Formats/FormatFactory.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -20,32 +19,17 @@ void JSONRowInputFormat::readPrefix()
{
skipBOMIfExists(*in);
JSONUtils::skipObjectStart(*in);
auto names_and_types = JSONUtils::readMetadata(*in);
if (use_metadata)
{
auto header = getPort().getHeader();
for (const auto & [name, type] : names_and_types)
{
auto header_type = header.getByName(name).type;
if (header.has(name) && !type->equals(*header_type))
throw Exception(
ErrorCodes::INCORRECT_DATA, "Type {} of column '{}' from metadata is not the same as type in header {}", type->getName(), name, header_type->getName());
}
}
JSONUtils::skipComma(*in);
while (!JSONUtils::checkAndSkipObjectEnd(*in))
{
auto field_name = JSONUtils::readFieldName(*in);
LOG_DEBUG(&Poco::Logger::get("JSONRowInputFormat"), "Field {}", field_name);
if (field_name == "data")
{
JSONUtils::skipArrayStart(*in);
data_in_square_brackets = true;
return;
}
}
JSONUtils::readMetadataAndValidateHeader(*in, getPort().getHeader());
else
JSONUtils::readMetadata(*in);
throw Exception(ErrorCodes::INCORRECT_DATA, "Expected field \"data\" with table content");
JSONUtils::skipComma(*in);
if (!JSONUtils::skipUntilFieldInObject(*in, "data"))
throw Exception(ErrorCodes::INCORRECT_DATA, "Expected field \"data\" with table content");
JSONUtils::skipArrayStart(*in);
data_in_square_brackets = true;
}
void JSONRowInputFormat::readSuffix()
@ -60,6 +44,7 @@ JSONRowSchemaReader::JSONRowSchemaReader(ReadBuffer & in_) : ISchemaReader(in_)
NamesAndTypesList JSONRowSchemaReader::readSchema()
{
skipBOMIfExists(in);
JSONUtils::skipObjectStart(in);
return JSONUtils::readMetadata(in);
}

View File

@ -302,14 +302,17 @@ void registerTSVSchemaReader(FormatFactory & factory)
{
return std::make_shared<TabSeparatedSchemaReader>(buf, with_names, with_types, is_raw, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter(format_name, [with_names, is_raw](const FormatSettings & settings)
if (!with_types)
{
String result = getAdditionalFormatInfoByEscapingRule(
settings, is_raw ? FormatSettings::EscapingRule::Raw : FormatSettings::EscapingRule::Escaped);
if (!with_names)
result += fmt::format(", column_names_for_schema_inference={}", settings.column_names_for_schema_inference);
return result;
});
factory.registerAdditionalInfoForSchemaCacheGetter(format_name, [with_names, is_raw](const FormatSettings & settings)
{
String result = getAdditionalFormatInfoByEscapingRule(
settings, is_raw ? FormatSettings::EscapingRule::Raw : FormatSettings::EscapingRule::Escaped);
if (!with_names)
result += fmt::format(", column_names_for_schema_inference={}", settings.column_names_for_schema_inference);
return result;
});
}
};
registerWithNamesAndTypes(is_raw ? "TabSeparatedRaw" : "TabSeparated", register_func);

View File

@ -10,3 +10,9 @@ a Array(UInt64)
0 Hello []
1 Hello [0]
2 Hello [0,1]
n UInt32
s String
a Array(UInt64)
0 Hello []
1 Hello [0]
2 Hello [0,1]

View File

@ -7,3 +7,7 @@ insert into function file(02416_data.jsonCompact) select number::UInt32 as n, 'H
desc file(02416_data.jsonCompact);
select * from file(02416_data.jsonCompact);
insert into function file(02416_data.jsonColumnsWithMetadata) select number::UInt32 as n, 'Hello' as s, range(number) as a from numbers(3) settings engine_file_truncate_on_insert=1;
desc file(02416_data.jsonColumnsWithMetadata);
select * from file(02416_data.jsonColumnsWithMetadata);