mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 17:41:59 +00:00
Merge pull request #54405 from Avogar/json-or-jsonl
Parse data in JSON format as JSONEachRow if failed to parse metadata
This commit is contained in:
commit
a3dcce21b3
@ -687,10 +687,9 @@ namespace JSONUtils
|
||||
return names_and_types;
|
||||
}
|
||||
|
||||
NamesAndTypesList readMetadataAndValidateHeader(ReadBuffer & in, const Block & header)
|
||||
void validateMetadataByHeader(const NamesAndTypesList & names_and_types_from_metadata, const Block & header)
|
||||
{
|
||||
auto names_and_types = JSONUtils::readMetadata(in);
|
||||
for (const auto & [name, type] : names_and_types)
|
||||
for (const auto & [name, type] : names_and_types_from_metadata)
|
||||
{
|
||||
if (!header.has(name))
|
||||
continue;
|
||||
@ -698,10 +697,16 @@ namespace JSONUtils
|
||||
auto header_type = header.getByName(name).type;
|
||||
if (!type->equals(*header_type))
|
||||
throw Exception(
|
||||
ErrorCodes::INCORRECT_DATA,
|
||||
"Type {} of column '{}' from metadata is not the same as type in header {}",
|
||||
type->getName(), name, header_type->getName());
|
||||
ErrorCodes::INCORRECT_DATA,
|
||||
"Type {} of column '{}' from metadata is not the same as type in header {}",
|
||||
type->getName(), name, header_type->getName());
|
||||
}
|
||||
}
|
||||
|
||||
NamesAndTypesList readMetadataAndValidateHeader(ReadBuffer & in, const Block & header)
|
||||
{
|
||||
auto names_and_types = JSONUtils::readMetadata(in);
|
||||
validateMetadataByHeader(names_and_types, header);
|
||||
return names_and_types;
|
||||
}
|
||||
|
||||
|
@ -124,6 +124,7 @@ namespace JSONUtils
|
||||
|
||||
NamesAndTypesList readMetadata(ReadBuffer & in);
|
||||
NamesAndTypesList readMetadataAndValidateHeader(ReadBuffer & in, const Block & header);
|
||||
void validateMetadataByHeader(const NamesAndTypesList & names_and_types_from_metadata, const Block & header);
|
||||
|
||||
bool skipUntilFieldInObject(ReadBuffer & in, const String & desired_field_name);
|
||||
void skipTheRestOfObject(ReadBuffer & in);
|
||||
|
@ -32,10 +32,11 @@ public:
|
||||
String getName() const override { return "JSONEachRowRowInputFormat"; }
|
||||
void resetParser() override;
|
||||
|
||||
private:
|
||||
protected:
|
||||
void readPrefix() override;
|
||||
void readSuffix() override;
|
||||
|
||||
private:
|
||||
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
|
||||
bool allowSyncAfterError() const override { return true; }
|
||||
void syncAfterError() override;
|
||||
|
@ -12,42 +12,106 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
JSONRowInputFormat::JSONRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
|
||||
: JSONEachRowRowInputFormat(in_, header_, params_, format_settings_, false), validate_types_from_metadata(format_settings_.json.validate_types_from_metadata)
|
||||
: JSONRowInputFormat(std::make_unique<PeekableReadBuffer>(in_), header_, params_, format_settings_)
|
||||
{
|
||||
}
|
||||
|
||||
JSONRowInputFormat::JSONRowInputFormat(std::unique_ptr<PeekableReadBuffer> buf, const DB::Block & header_, DB::IRowInputFormat::Params params_, const DB::FormatSettings & format_settings_)
|
||||
: JSONEachRowRowInputFormat(*buf, header_, params_, format_settings_, false), validate_types_from_metadata(format_settings_.json.validate_types_from_metadata), peekable_buf(std::move(buf))
|
||||
{
|
||||
}
|
||||
|
||||
void JSONRowInputFormat::readPrefix()
|
||||
{
|
||||
skipBOMIfExists(*in);
|
||||
JSONUtils::skipObjectStart(*in);
|
||||
if (validate_types_from_metadata)
|
||||
JSONUtils::readMetadataAndValidateHeader(*in, getPort().getHeader());
|
||||
else
|
||||
JSONUtils::readMetadata(*in);
|
||||
skipBOMIfExists(*peekable_buf);
|
||||
|
||||
JSONUtils::skipComma(*in);
|
||||
if (!JSONUtils::skipUntilFieldInObject(*in, "data"))
|
||||
throw Exception(ErrorCodes::INCORRECT_DATA, "Expected field \"data\" with table content");
|
||||
PeekableReadBufferCheckpoint checkpoint(*peekable_buf);
|
||||
NamesAndTypesList names_and_types_from_metadata;
|
||||
|
||||
JSONUtils::skipArrayStart(*in);
|
||||
data_in_square_brackets = true;
|
||||
/// Try to parse metadata, if failed, try to parse data as JSONEachRow format.
|
||||
try
|
||||
{
|
||||
JSONUtils::skipObjectStart(*peekable_buf);
|
||||
names_and_types_from_metadata = JSONUtils::readMetadata(*peekable_buf);
|
||||
JSONUtils::skipComma(*peekable_buf);
|
||||
if (!JSONUtils::skipUntilFieldInObject(*peekable_buf, "data"))
|
||||
throw Exception(ErrorCodes::INCORRECT_DATA, "Expected field \"data\" with table content");
|
||||
|
||||
JSONUtils::skipArrayStart(*peekable_buf);
|
||||
data_in_square_brackets = true;
|
||||
}
|
||||
catch (const ParsingException &)
|
||||
{
|
||||
parse_as_json_each_row = true;
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
if (e.code() != ErrorCodes::INCORRECT_DATA)
|
||||
throw;
|
||||
|
||||
parse_as_json_each_row = true;
|
||||
}
|
||||
|
||||
if (parse_as_json_each_row)
|
||||
{
|
||||
peekable_buf->rollbackToCheckpoint();
|
||||
JSONEachRowRowInputFormat::readPrefix();
|
||||
}
|
||||
else if (validate_types_from_metadata)
|
||||
{
|
||||
JSONUtils::validateMetadataByHeader(names_and_types_from_metadata, getPort().getHeader());
|
||||
}
|
||||
}
|
||||
|
||||
void JSONRowInputFormat::readSuffix()
|
||||
{
|
||||
JSONUtils::skipArrayEnd(*in);
|
||||
JSONUtils::skipTheRestOfObject(*in);
|
||||
if (parse_as_json_each_row)
|
||||
{
|
||||
JSONEachRowRowInputFormat::readSuffix();
|
||||
}
|
||||
else
|
||||
{
|
||||
JSONUtils::skipArrayEnd(*peekable_buf);
|
||||
JSONUtils::skipTheRestOfObject(*peekable_buf);
|
||||
}
|
||||
}
|
||||
|
||||
JSONRowSchemaReader::JSONRowSchemaReader(ReadBuffer & in_) : ISchemaReader(in_)
|
||||
void JSONRowInputFormat::setReadBuffer(DB::ReadBuffer & in_)
|
||||
{
|
||||
peekable_buf->setSubBuffer(in_);
|
||||
}
|
||||
|
||||
void JSONRowInputFormat::resetParser()
|
||||
{
|
||||
JSONEachRowRowInputFormat::resetParser();
|
||||
peekable_buf->reset();
|
||||
}
|
||||
|
||||
JSONRowSchemaReader::JSONRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
|
||||
: JSONRowSchemaReader(std::make_unique<PeekableReadBuffer>(in_), format_settings_)
|
||||
{
|
||||
}
|
||||
|
||||
JSONRowSchemaReader::JSONRowSchemaReader(std::unique_ptr<PeekableReadBuffer> buf, const DB::FormatSettings & format_settings_)
|
||||
: JSONEachRowSchemaReader(*buf, format_settings_), peekable_buf(std::move(buf))
|
||||
{
|
||||
}
|
||||
|
||||
NamesAndTypesList JSONRowSchemaReader::readSchema()
|
||||
{
|
||||
skipBOMIfExists(in);
|
||||
JSONUtils::skipObjectStart(in);
|
||||
return JSONUtils::readMetadata(in);
|
||||
skipBOMIfExists(*peekable_buf);
|
||||
PeekableReadBufferCheckpoint checkpoint(*peekable_buf);
|
||||
/// Try to parse metadata, if failed, try to parse data as JSONEachRow format
|
||||
try
|
||||
{
|
||||
JSONUtils::skipObjectStart(*peekable_buf);
|
||||
return JSONUtils::readMetadata(*peekable_buf);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
peekable_buf->rollbackToCheckpoint(true);
|
||||
return JSONEachRowSchemaReader::readSchema();
|
||||
}
|
||||
}
|
||||
|
||||
void registerInputFormatJSON(FormatFactory & factory)
|
||||
@ -69,7 +133,7 @@ void registerJSONSchemaReader(FormatFactory & factory)
|
||||
auto register_schema_reader = [&](const String & format)
|
||||
{
|
||||
factory.registerSchemaReader(
|
||||
format, [](ReadBuffer & buf, const FormatSettings &) { return std::make_unique<JSONRowSchemaReader>(buf); });
|
||||
format, [](ReadBuffer & buf, const FormatSettings & format_settings) { return std::make_unique<JSONRowSchemaReader>(buf, format_settings); });
|
||||
};
|
||||
register_schema_reader("JSON");
|
||||
/// JSONCompact has the same suffix with metadata.
|
||||
|
@ -23,21 +23,38 @@ public:
|
||||
|
||||
String getName() const override { return "JSONRowInputFormat"; }
|
||||
|
||||
void setReadBuffer(ReadBuffer & in_) override;
|
||||
void resetParser() override;
|
||||
|
||||
private:
|
||||
JSONRowInputFormat(
|
||||
std::unique_ptr<PeekableReadBuffer> buf,
|
||||
const Block & header_,
|
||||
Params params_,
|
||||
const FormatSettings & format_settings_);
|
||||
|
||||
void readPrefix() override;
|
||||
void readSuffix() override;
|
||||
|
||||
const bool validate_types_from_metadata;
|
||||
bool parse_as_json_each_row = false;
|
||||
std::unique_ptr<PeekableReadBuffer> peekable_buf;
|
||||
std::exception_ptr reading_metadata_exception;
|
||||
};
|
||||
|
||||
class JSONRowSchemaReader : public ISchemaReader
|
||||
class JSONRowSchemaReader : public JSONEachRowSchemaReader
|
||||
{
|
||||
public:
|
||||
JSONRowSchemaReader(ReadBuffer & in_);
|
||||
JSONRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_);
|
||||
|
||||
NamesAndTypesList readSchema() override;
|
||||
|
||||
bool hasStrictOrderOfColumns() const override { return false; }
|
||||
|
||||
private:
|
||||
JSONRowSchemaReader(std::unique_ptr<PeekableReadBuffer> buf, const FormatSettings & format_settings_);
|
||||
|
||||
std::unique_ptr<PeekableReadBuffer> peekable_buf;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,3 @@
|
||||
a Nullable(Int64)
|
||||
b Nullable(String)
|
||||
10 Hello
|
@ -0,0 +1,3 @@
|
||||
desc format(JSON, '{"a" : 10, "b" : "Hello"}');
|
||||
select * from format(JSON, '{"a" : 10, "b" : "Hello"}');
|
||||
|
Loading…
Reference in New Issue
Block a user