Merge pull request #40910 from Avogar/new-json-formats

Add new JSON formats, add improvements and refactoring
This commit is contained in:
Kruglov Pavel 2022-09-21 14:19:08 +02:00 committed by GitHub
commit 22e11aef2d
67 changed files with 1329 additions and 426 deletions


@@ -29,12 +29,12 @@ The supported formats are:
| [SQLInsert](#sqlinsert) | ✗ | ✔ |
| [Values](#data-format-values) | ✔ | ✔ |
| [Vertical](#vertical) | ✗ | ✔ |
-| [JSON](#json) | ✗ | ✔ |
+| [JSON](#json) | ✔ | ✔ |
| [JSONAsString](#jsonasstring) | ✔ | ✗ |
-| [JSONStrings](#jsonstrings) | ✗ | ✔ |
+| [JSONStrings](#jsonstrings) | ✔ | ✔ |
| [JSONColumns](#jsoncolumns) | ✔ | ✔ |
-| [JSONColumnsWithMetadata](#jsoncolumnswithmetadata) | ✗ | ✔ |
-| [JSONCompact](#jsoncompact) | ✗ | ✔ |
+| [JSONColumnsWithMetadata](#jsoncolumnswithmetadata) | ✔ | ✔ |
+| [JSONCompact](#jsoncompact) | ✔ | ✔ |
| [JSONCompactStrings](#jsoncompactstrings) | ✗ | ✔ |
| [JSONCompactColumns](#jsoncompactcolumns) | ✔ | ✔ |
| [JSONEachRow](#jsoneachrow) | ✔ | ✔ |
@@ -47,6 +47,7 @@ The supported formats are:
| [JSONCompactStringsEachRow](#jsoncompactstringseachrow) | ✔ | ✔ |
| [JSONCompactStringsEachRowWithNames](#jsoncompactstringseachrowwithnames) | ✔ | ✔ |
| [JSONCompactStringsEachRowWithNamesAndTypes](#jsoncompactstringseachrowwithnamesandtypes) | ✔ | ✔ |
| [JSONObjectEachRow](#jsonobjecteachrow) | ✔ | ✔ |
| [TSKV](#tskv) | ✔ | ✔ |
| [Pretty](#pretty) | ✗ | ✔ |
| [PrettyNoEscapes](#prettynoescapes) | ✗ | ✔ |
@@ -608,8 +609,6 @@ If the query contains GROUP BY, rows_before_limit_at_least is the exact number o
`extremes` Extreme values (when extremes are set to 1).
This format is only appropriate for outputting a query result, but not for parsing (retrieving data to insert in a table).
ClickHouse supports [NULL](../sql-reference/syntax.md), which is displayed as `null` in the JSON output. To enable `+nan`, `-nan`, `+inf`, `-inf` values in output, set the [output_format_json_quote_denormals](../operations/settings/settings.md#output_format_json_quote_denormals) to 1.
**See Also**
@@ -617,6 +616,9 @@ ClickHouse supports [NULL](../sql-reference/syntax.md), which is displayed as `n
- [JSONEachRow](#jsoneachrow) format
- [output_format_json_array_of_rows](../operations/settings/settings.md#output_format_json_array_of_rows) setting
For the JSON input format, if the setting [input_format_json_validate_types_from_metadata](../operations/settings/settings.md#input_format_json_validate_types_from_metadata) is set to 1,
the types from the metadata in the input data are compared with the types of the corresponding columns in the table.
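As a sketch (the column names `num` and `str` and their types are illustrative), metadata-carrying input in the JSON format looks like this; with the setting enabled, the types declared under `meta` must match the target table:

```json
{
    "meta":
    [
        {"name": "num", "type": "UInt32"},
        {"name": "str", "type": "String"}
    ],
    "data":
    [
        {"num": 42, "str": "hello"}
    ]
}
```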
## JSONStrings {#jsonstrings}
Differs from JSON only in that data fields are output in strings, not in typed JSON values.
@@ -693,8 +695,8 @@ Columns that are not present in the block will be filled with default values (yo
## JSONColumnsWithMetadata {#jsoncolumnsmonoblock}
-Differs from JSONColumns output format in that it also outputs some metadata and statistics (similar to JSON output format).
-This format buffers all data in memory and then outputs them as a single block, so, it can lead to high memory consumption.
+Differs from the JSONColumns format in that it also contains some metadata and statistics (similar to the JSON format).
+This format buffers all data in memory and then outputs it as a single block, so it can lead to high memory consumption.
Example:
```json
@@ -736,6 +738,9 @@ Example:
}
```
For the JSONColumnsWithMetadata input format, if the setting [input_format_json_validate_types_from_metadata](../operations/settings/settings.md#input_format_json_validate_types_from_metadata) is set to 1,
the types from the metadata in the input data are compared with the types of the corresponding columns in the table.
## JSONAsString {#jsonasstring}
In this format, a single JSON object is interpreted as a single value. If the input has several JSON objects (comma separated), they are interpreted as separate rows. If the input data is enclosed in square brackets, it is interpreted as an array of JSONs.
@@ -1001,6 +1006,21 @@ the types from input data will be compared with the types of the corresponding c
[44, "hello", [0,1,2,3]]
```
## JSONObjectEachRow {#jsonobjecteachrow}
In this format, all data is represented as a single JSON object, and each row is represented as a separate field of this object, similar to the JSONEachRow format.
Example:
```json
{
"row_1": {"num": 42, "str": "hello", "arr": [0,1]},
"row_2": {"num": 43, "str": "hello", "arr": [0,1,2]},
"row_3": {"num": 44, "str": "hello", "arr": [0,1,2,3]}
}
```
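A query of the following shape can produce the example above (a sketch; `test_table` and its columns are illustrative):

```sql
SELECT num, str, arr
FROM test_table
FORMAT JSONObjectEachRow
```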
### Inserting Data {#json-inserting-data}
``` sql
@@ -1124,11 +1144,15 @@ SELECT * FROM json_each_row_nested
- [input_format_import_nested_json](../operations/settings/settings.md#input_format_import_nested_json) - map nested JSON data to nested tables (it works for JSONEachRow format). Default value - `false`.
- [input_format_json_read_bools_as_numbers](../operations/settings/settings.md#input_format_json_read_bools_as_numbers) - allow parsing bools as numbers in JSON input formats. Default value - `true`.
- [input_format_json_read_numbers_as_strings](../operations/settings/settings.md#input_format_json_read_numbers_as_strings) - allow parsing numbers as strings in JSON input formats. Default value - `false`.
- [output_format_json_quote_64bit_integers](../operations/settings/settings.md#output_format_json_quote_64bit_integers) - controls quoting of 64-bit integers in JSON output format. Default value - `true`.
- [output_format_json_quote_64bit_floats](../operations/settings/settings.md#output_format_json_quote_64bit_floats) - controls quoting of 64-bit floats in JSON output format. Default value - `false`.
- [output_format_json_quote_denormals](../operations/settings/settings.md#output_format_json_quote_denormals) - enables '+nan', '-nan', '+inf', '-inf' outputs in JSON output format. Default value - `false`.
- [output_format_json_quote_decimals](../operations/settings/settings.md#output_format_json_quote_decimals) - controls quoting of decimals in JSON output format. Default value - `false`.
- [output_format_json_escape_forward_slashes](../operations/settings/settings.md#output_format_json_escape_forward_slashes) - controls escaping forward slashes for string outputs in JSON output format. Default value - `true`.
- [output_format_json_named_tuples_as_objects](../operations/settings/settings.md#output_format_json_named_tuples_as_objects) - serialize named tuple columns as JSON objects. Default value - `false`.
- [output_format_json_array_of_rows](../operations/settings/settings.md#output_format_json_array_of_rows) - output a JSON array of all rows in JSONEachRow(Compact) format. Default value - `false`.
- [output_format_json_validate_utf8](../operations/settings/settings.md#output_format_json_validate_utf8) - enables validation of UTF-8 sequences in JSON output formats (note that it doesn't affect the JSON/JSONCompact/JSONColumnsWithMetadata formats, which always validate UTF-8). Default value - `false`.
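The output settings above can be combined per session; a minimal sketch using real setting names (the exact output is not shown, since it depends on the server):

```sql
SET output_format_json_quote_decimals = 1;
SET output_format_json_quote_64bit_floats = 1;

SELECT toDecimal32(3.14, 2) AS d, toFloat64(1) AS f
FORMAT JSONEachRow
```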
## Native {#native}


@@ -3705,6 +3705,19 @@ Allow parsing bools as numbers in JSON input formats.
Enabled by default.
### input_format_json_read_numbers_as_strings {#input_format_json_read_numbers_as_strings}
Allow parsing numbers as strings in JSON input formats.
Disabled by default.
### input_format_json_validate_types_from_metadata {#input_format_json_validate_types_from_metadata}
For the JSON/JSONCompact/JSONColumnsWithMetadata input formats, if this setting is set to 1,
the types from the metadata in the input data are compared with the types of the corresponding columns in the table.
Enabled by default.
### output_format_json_quote_64bit_integers {#output_format_json_quote_64bit_integers}
Controls quoting of 64-bit or bigger [integers](../../sql-reference/data-types/int-uint.md) (like `UInt64` or `Int128`) when they are output in a [JSON](../../interfaces/formats.md#json) format.
@@ -3717,6 +3730,12 @@ Possible values:
Default value: 1.
### output_format_json_quote_64bit_floats {#output_format_json_quote_64bit_floats}
Controls quoting of 64-bit [floats](../../sql-reference/data-types/float.md) when they are output in JSON* formats.
Disabled by default.
### output_format_json_quote_denormals {#output_format_json_quote_denormals}
Enables `+nan`, `-nan`, `+inf`, `-inf` outputs in [JSON](../../interfaces/formats.md#json) output format.
@@ -3816,6 +3835,12 @@ When `output_format_json_quote_denormals = 1`, the query returns:
}
```
### output_format_json_quote_decimals {#output_format_json_quote_decimals}
Controls quoting of decimals in JSON output formats.
Disabled by default.
### output_format_json_escape_forward_slashes {#output_format_json_escape_forward_slashes}
Controls escaping of forward slashes for string outputs in JSON output format. This is intended for compatibility with JavaScript. Do not confuse this with backslashes, which are always escaped.
@@ -3875,6 +3900,12 @@ Result:
{"number":"2"}
```
### output_format_json_validate_utf8 {#output_format_json_validate_utf8}
Controls validation of UTF-8 sequences in JSON output formats. It doesn't affect the JSON/JSONCompact/JSONColumnsWithMetadata formats, which always validate UTF-8.
Disabled by default.
## TSV format settings {#tsv-format-settings}
### input_format_tsv_empty_as_default {#input_format_tsv_empty_as_default}


@ -734,6 +734,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(String, schema_inference_hints, "", "The list of column names and types to use in schema inference for formats without column names. The format: 'column_name1 column_type1, column_name2 column_type2, ...'", 0) \
M(Bool, input_format_json_read_bools_as_numbers, true, "Allow to parse bools as numbers in JSON input formats", 0) \
M(Bool, input_format_json_try_infer_numbers_from_strings, true, "Try to infer numbers from string fields while schema inference", 0) \
M(Bool, input_format_json_validate_types_from_metadata, true, "For JSON/JSONCompact/JSONColumnsWithMetadata input formats this controls whether format parser should check if data types from input metadata match data types of the corresponding columns from the table", 0) \
M(Bool, input_format_json_read_numbers_as_strings, false, "Allow to parse numbers as strings in JSON input formats", 0) \
M(Bool, input_format_try_infer_integers, true, "Try to infer numbers from string fields while schema inference in text formats", 0) \
M(Bool, input_format_try_infer_dates, true, "Try to infer dates from string fields while schema inference in text formats", 0) \
M(Bool, input_format_try_infer_datetimes, true, "Try to infer datetimes from string fields while schema inference in text formats", 0) \
@ -759,10 +761,13 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
\
M(Bool, output_format_json_quote_64bit_integers, true, "Controls quoting of 64-bit integers in JSON output format.", 0) \
M(Bool, output_format_json_quote_denormals, false, "Enables '+nan', '-nan', '+inf', '-inf' outputs in JSON output format.", 0) \
M(Bool, output_format_json_quote_decimals, false, "Controls quoting of decimals in JSON output format.", 0) \
M(Bool, output_format_json_quote_64bit_floats, false, "Controls quoting of 64-bit float numbers in JSON output format.", 0) \
\
M(Bool, output_format_json_escape_forward_slashes, true, "Controls escaping forward slashes for string outputs in JSON output format. This is intended for compatibility with JavaScript. Don't confuse with backslashes that are always escaped.", 0) \
M(Bool, output_format_json_named_tuples_as_objects, true, "Serialize named tuple columns as JSON objects.", 0) \
M(Bool, output_format_json_array_of_rows, false, "Output a JSON array of all rows in JSONEachRow(Compact) format.", 0) \
M(Bool, output_format_json_validate_utf8, false, "Validate UTF-8 sequences in JSON output formats, doesn't impact formats JSON/JSONCompact/JSONColumnsWithMetadata, they always validate utf8", 0) \
\
M(UInt64, output_format_pretty_max_rows, 10000, "Rows limit for Pretty formats.", 0) \
M(UInt64, output_format_pretty_max_column_pad_width, 250, "Maximum width to pad all values in a column in Pretty formats.", 0) \


@ -69,6 +69,28 @@ void SerializationDecimal<T>::deserializeTextCSV(IColumn & column, ReadBuffer &
assert_cast<ColumnType &>(column).getData().push_back(x);
}
template <typename T>
void SerializationDecimal<T>::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
if (settings.json.quote_decimals)
writeChar('"', ostr);
serializeText(column, row_num, ostr, settings);
if (settings.json.quote_decimals)
writeChar('"', ostr);
}
template <typename T>
void SerializationDecimal<T>::deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
bool have_quotes = checkChar('"', istr);
deserializeText(column, istr, settings, false);
if (have_quotes)
assertChar('"', istr);
}
template class SerializationDecimal<Decimal32>;
template class SerializationDecimal<Decimal64>;
template class SerializationDecimal<Decimal128>;
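The quote-aware pair above can be sketched in isolation. This is a hypothetical stand-in (plain strings instead of ClickHouse columns and WriteBuffer/ReadBuffer) that mirrors the serialize/deserialize logic:

```cpp
#include <sstream>
#include <stdexcept>
#include <string>

// Mirrors serializeTextJSON: optionally wrap the textual decimal value
// in double quotes when the quote_decimals setting is on.
std::string serializeDecimalJSON(const std::string & text, bool quote_decimals)
{
    std::ostringstream out;
    if (quote_decimals)
        out << '"';
    out << text;
    if (quote_decimals)
        out << '"';
    return out.str();
}

// Mirrors deserializeTextJSON: accept the value with or without
// surrounding quotes, requiring the closing quote only when one opened.
std::string deserializeDecimalJSON(const std::string & input)
{
    bool have_quotes = !input.empty() && input.front() == '"';
    if (!have_quotes)
        return input;
    if (input.size() < 2 || input.back() != '"')
        throw std::runtime_error("expected closing quote after decimal value");
    return input.substr(1, input.size() - 2);
}
```

Note that the reader accepts both quoted and unquoted input regardless of the output setting, so data written with either value of `quote_decimals` round-trips.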


@ -18,6 +18,9 @@ public:
void deserializeText(IColumn & column, ReadBuffer & istr, const FormatSettings &, bool whole) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const override;
void deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
void readText(T & x, ReadBuffer & istr, bool csv = false) const { readText(x, istr, this->precision, this->scale, csv); }
static void readText(T & x, ReadBuffer & istr, UInt32 precision_, UInt32 scale_, bool csv = false);


@ -12,6 +12,7 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/VarInt.h>
#include <IO/ReadBufferFromString.h>
#ifdef __SSE2__
#include <emmintrin.h>
@ -21,6 +22,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
}
void SerializationString::serializeBinary(const Field & field, WriteBuffer & ostr) const
{
const String & s = field.get<const String &>();
@@ -271,9 +277,21 @@ void SerializationString::serializeTextJSON(const IColumn & column, size_t row_n
}
-void SerializationString::deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
+void SerializationString::deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
-read(column, [&](ColumnString::Chars & data) { readJSONStringInto(data, istr); });
+if (settings.json.read_numbers_as_strings && !istr.eof() && *istr.position() != '"')
+{
+String field;
+readJSONField(field, istr);
+Float64 tmp;
+ReadBufferFromString buf(field);
+if (tryReadFloatText(tmp, buf))
+read(column, [&](ColumnString::Chars & data) { data.insert(field.begin(), field.end()); });
+else
+throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot parse JSON String value here: {}", field);
+}
+else
+read(column, [&](ColumnString::Chars & data) { readJSONStringInto(data, istr); });
}
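The acceptance rule in the new branch can be sketched on its own. This is a hypothetical simplification (stdlib `strtod` standing in for `tryReadFloatText`): a field that does not start with `"` goes into a String column only when `read_numbers_as_strings` is on and the whole field parses as a number:

```cpp
#include <cstdlib>
#include <string>

// Decide whether a raw JSON field may be stored into a String column,
// mirroring the read_numbers_as_strings branch above.
bool acceptJSONFieldAsString(const std::string & field, bool read_numbers_as_strings)
{
    if (!field.empty() && field.front() == '"')
        return true; // an ordinary JSON string literal
    if (!read_numbers_as_strings || field.empty())
        return false;
    char * end = nullptr;
    std::strtod(field.c_str(), &end);
    // Accept only if the number parser consumed the entire field.
    return end != field.c_str() && *end == '\0';
}
```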


@@ -332,7 +332,7 @@ void transformInferredTypesIfNeededImpl(DataTypes & types, const FormatSettings
/// Check settings specific for JSON formats.
/// If we have numbers and strings, convert numbers to strings.
-if (settings.json.try_infer_numbers_from_strings)
+if (settings.json.try_infer_numbers_from_strings || settings.json.read_numbers_as_strings)
{
bool have_strings = false;
bool have_numbers = false;
@@ -346,7 +346,9 @@ void transformInferredTypesIfNeededImpl(DataTypes & types, const FormatSettings
{
for (auto & type : data_types)
{
-if (isNumber(type) && (!numbers_parsed_from_json_strings || numbers_parsed_from_json_strings->contains(type.get())))
+if (isNumber(type)
+&& (settings.json.read_numbers_as_strings || !numbers_parsed_from_json_strings
+|| numbers_parsed_from_json_strings->contains(type.get())))
type = std::make_shared<DataTypeString>();
}
}


@ -92,9 +92,14 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.json.escape_forward_slashes = settings.output_format_json_escape_forward_slashes;
format_settings.json.named_tuples_as_objects = settings.output_format_json_named_tuples_as_objects;
format_settings.json.quote_64bit_integers = settings.output_format_json_quote_64bit_integers;
format_settings.json.quote_64bit_floats = settings.output_format_json_quote_64bit_floats;
format_settings.json.quote_denormals = settings.output_format_json_quote_denormals;
format_settings.json.quote_decimals = settings.output_format_json_quote_decimals;
format_settings.json.read_bools_as_numbers = settings.input_format_json_read_bools_as_numbers;
format_settings.json.read_numbers_as_strings = settings.input_format_json_read_numbers_as_strings;
format_settings.json.try_infer_numbers_from_strings = settings.input_format_json_try_infer_numbers_from_strings;
format_settings.json.validate_types_from_metadata = settings.input_format_json_validate_types_from_metadata;
format_settings.json.validate_utf8 = settings.output_format_json_validate_utf8;
format_settings.null_as_default = settings.input_format_null_as_default;
format_settings.decimal_trailing_zeros = settings.output_format_decimal_trailing_zeros;
format_settings.parquet.row_group_size = settings.output_format_parquet_row_group_size;


@ -142,12 +142,17 @@ struct FormatSettings
{
bool array_of_rows = false;
bool quote_64bit_integers = true;
bool quote_64bit_floats = false;
bool quote_denormals = true;
bool quote_decimals = false;
bool escape_forward_slashes = true;
bool named_tuples_as_objects = false;
bool serialize_as_strings = false;
bool read_bools_as_numbers = true;
bool read_numbers_as_strings = true;
bool try_infer_numbers_from_strings = false;
bool validate_types_from_metadata = true;
bool validate_utf8 = false;
} json;
struct


@ -21,6 +21,7 @@
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
@ -412,40 +413,6 @@ namespace JSONUtils
}
}
DataTypePtr getCommonTypeForJSONFormats(const DataTypePtr & first, const DataTypePtr & second, bool allow_bools_as_numbers)
{
if (allow_bools_as_numbers)
{
auto not_nullable_first = removeNullable(first);
auto not_nullable_second = removeNullable(second);
/// Check if we have Bool and Number and if so make the result type Number
bool bool_type_presents = isBool(not_nullable_first) || isBool(not_nullable_second);
bool number_type_presents = isNumber(not_nullable_first) || isNumber(not_nullable_second);
if (bool_type_presents && number_type_presents)
{
if (isBool(not_nullable_first))
return second;
return first;
}
}
/// If we have Map and Object, make result type Object
bool object_type_presents = isObject(first) || isObject(second);
bool map_type_presents = isMap(first) || isMap(second);
if (object_type_presents && map_type_presents)
{
if (isObject(first))
return first;
return second;
}
/// If we have different Maps, make result type Object
if (isMap(first) && isMap(second) && !first->equals(*second))
return std::make_shared<DataTypeObject>("json", true);
return nullptr;
}
void writeFieldDelimiter(WriteBuffer & out, size_t new_lines)
{
writeChar(',', out);
@@ -454,26 +421,35 @@
void writeFieldCompactDelimiter(WriteBuffer & out) { writeCString(", ", out); }
-template <bool with_space>
-void writeTitle(const char * title, WriteBuffer & out, size_t indent)
+void writeTitle(const char * title, WriteBuffer & out, size_t indent, const char * after_delimiter)
{
writeChar('\t', indent, out);
writeChar('"', out);
writeCString(title, out);
-if constexpr (with_space)
-writeCString("\": ", out);
-else
-writeCString("\":\n", out);
+writeCString("\":", out);
+writeCString(after_delimiter, out);
}
void writeObjectStart(WriteBuffer & out, size_t indent, const char * title)
{
if (title)
-writeTitle<false>(title, out, indent);
+writeTitle(title, out, indent, "\n");
writeChar('\t', indent, out);
writeCString("{\n", out);
}
void writeCompactObjectStart(WriteBuffer & out, size_t indent, const char * title)
{
if (title)
writeTitle(title, out, indent, " ");
writeCString("{", out);
}
void writeCompactObjectEnd(WriteBuffer & out)
{
writeChar('}', out);
}
void writeObjectEnd(WriteBuffer & out, size_t indent)
{
writeChar('\n', out);
@@ -484,7 +460,7 @@
void writeArrayStart(WriteBuffer & out, size_t indent, const char * title)
{
if (title)
-writeTitle<false>(title, out, indent);
+writeTitle(title, out, indent, "\n");
writeChar('\t', indent, out);
writeCString("[\n", out);
}
@@ -492,7 +468,7 @@
void writeCompactArrayStart(WriteBuffer & out, size_t indent, const char * title)
{
if (title)
-writeTitle<true>(title, out, indent);
+writeTitle(title, out, indent, " ");
else
writeChar('\t', indent, out);
writeCString("[", out);
@@ -515,10 +491,11 @@
const FormatSettings & settings,
WriteBuffer & out,
const std::optional<String> & name,
-size_t indent)
+size_t indent,
+const char * title_after_delimiter)
{
if (name.has_value())
-writeTitle<true>(name->data(), out, indent);
+writeTitle(name->data(), out, indent, title_after_delimiter);
if (yield_strings)
{
@@ -533,7 +510,7 @@
void writeColumns(
const Columns & columns,
-const NamesAndTypes & fields,
+const Names & names,
const Serializations & serializations,
size_t row_num,
bool yield_strings,
@@ -545,7 +522,7 @@
{
if (i != 0)
writeFieldDelimiter(out);
-writeFieldFromColumn(*columns[i], *serializations[i], row_num, yield_strings, settings, out, fields[i].name, indent);
+writeFieldFromColumn(*columns[i], *serializations[i], row_num, yield_strings, settings, out, names[i], indent);
}
}
@@ -565,27 +542,27 @@
}
}
-void writeMetadata(const NamesAndTypes & fields, const FormatSettings & settings, WriteBuffer & out)
+void writeMetadata(const Names & names, const DataTypes & types, const FormatSettings & settings, WriteBuffer & out)
{
writeArrayStart(out, 1, "meta");
-for (size_t i = 0; i < fields.size(); ++i)
+for (size_t i = 0; i < names.size(); ++i)
{
writeObjectStart(out, 2);
-writeTitle<true>("name", out, 3);
+writeTitle("name", out, 3, " ");
/// The field names are pre-escaped to be put into JSON string literal.
writeChar('"', out);
-writeString(fields[i].name, out);
+writeString(names[i], out);
writeChar('"', out);
writeFieldDelimiter(out);
-writeTitle<true>("type", out, 3);
-writeJSONString(fields[i].type->getName(), out, settings);
+writeTitle("type", out, 3, " ");
+writeJSONString(types[i]->getName(), out, settings);
writeObjectEnd(out, 2);
-if (i + 1 < fields.size())
+if (i + 1 < names.size())
writeFieldDelimiter(out);
}
@@ -602,13 +579,13 @@
WriteBuffer & out)
{
writeFieldDelimiter(out, 2);
-writeTitle<true>("rows", out, 1);
+writeTitle("rows", out, 1, " ");
writeIntText(rows, out);
if (applied_limit)
{
writeFieldDelimiter(out, 2);
-writeTitle<true>("rows_before_limit_at_least", out, 1);
+writeTitle("rows_before_limit_at_least", out, 1, " ");
writeIntText(rows_before_limit, out);
}
@@ -617,34 +594,210 @@
writeFieldDelimiter(out, 2);
writeObjectStart(out, 1, "statistics");
-writeTitle<true>("elapsed", out, 2);
+writeTitle("elapsed", out, 2, " ");
writeText(watch.elapsedSeconds(), out);
writeFieldDelimiter(out);
-writeTitle<true>("rows_read", out, 2);
+writeTitle("rows_read", out, 2, " ");
writeText(progress.read_rows.load(), out);
writeFieldDelimiter(out);
-writeTitle<true>("bytes_read", out, 2);
+writeTitle("bytes_read", out, 2, " ");
writeText(progress.read_bytes.load(), out);
writeObjectEnd(out, 1);
}
}
-void makeNamesAndTypesWithValidUTF8(NamesAndTypes & fields, const FormatSettings & settings, bool & need_validate_utf8)
+Strings makeNamesValidJSONStrings(const Strings & names, const FormatSettings & settings, bool validate_utf8)
{
-for (auto & field : fields)
+Strings result;
+result.reserve(names.size());
+for (const auto & name : names)
{
-if (!field.type->textCanContainOnlyValidUTF8())
-need_validate_utf8 = true;
WriteBufferFromOwnString buf;
+if (validate_utf8)
{
WriteBufferValidUTF8 validating_buf(buf);
-writeJSONString(field.name, validating_buf, settings);
+writeJSONString(name, validating_buf, settings);
}
-field.name = buf.str().substr(1, buf.str().size() - 2);
+else
+writeJSONString(name, buf, settings);
+result.push_back(buf.str().substr(1, buf.str().size() - 2));
}
+return result;
}
void skipColon(ReadBuffer & in)
{
skipWhitespaceIfAny(in);
assertChar(':', in);
skipWhitespaceIfAny(in);
}
String readFieldName(ReadBuffer & in)
{
skipWhitespaceIfAny(in);
String field;
readJSONString(field, in);
skipColon(in);
return field;
}
String readStringField(ReadBuffer & in)
{
skipWhitespaceIfAny(in);
String value;
readJSONString(value, in);
skipWhitespaceIfAny(in);
return value;
}
void skipArrayStart(ReadBuffer & in)
{
skipWhitespaceIfAny(in);
assertChar('[', in);
skipWhitespaceIfAny(in);
}
bool checkAndSkipArrayStart(ReadBuffer & in)
{
skipWhitespaceIfAny(in);
if (!checkChar('[', in))
return false;
skipWhitespaceIfAny(in);
return true;
}
void skipArrayEnd(ReadBuffer & in)
{
skipWhitespaceIfAny(in);
assertChar(']', in);
skipWhitespaceIfAny(in);
}
bool checkAndSkipArrayEnd(ReadBuffer & in)
{
skipWhitespaceIfAny(in);
if (!checkChar(']', in))
return false;
skipWhitespaceIfAny(in);
return true;
}
void skipObjectStart(ReadBuffer & in)
{
skipWhitespaceIfAny(in);
assertChar('{', in);
skipWhitespaceIfAny(in);
}
void skipObjectEnd(ReadBuffer & in)
{
skipWhitespaceIfAny(in);
assertChar('}', in);
skipWhitespaceIfAny(in);
}
bool checkAndSkipObjectEnd(ReadBuffer & in)
{
skipWhitespaceIfAny(in);
if (!checkChar('}', in))
return false;
skipWhitespaceIfAny(in);
return true;
}
void skipComma(ReadBuffer & in)
{
skipWhitespaceIfAny(in);
assertChar(',', in);
skipWhitespaceIfAny(in);
}
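Each of the skip helpers above follows the same "skip whitespace, require one character, skip whitespace" pattern. A hypothetical string-based stand-in (with `pos` playing the role of the ReadBuffer position) makes the pattern explicit:

```cpp
#include <cctype>
#include <stdexcept>
#include <string>

// Consume optional whitespace, require `expected`, consume whitespace again;
// `pos` is advanced in place, and a mismatch throws, like assertChar.
void skipCharWithWhitespace(const std::string & in, size_t & pos, char expected)
{
    while (pos < in.size() && std::isspace(static_cast<unsigned char>(in[pos])))
        ++pos;
    if (pos >= in.size() || in[pos] != expected)
        throw std::runtime_error(std::string("expected '") + expected + "'");
    ++pos; // consume the expected character
    while (pos < in.size() && std::isspace(static_cast<unsigned char>(in[pos])))
        ++pos;
}
```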
std::pair<String, String> readStringFieldNameAndValue(ReadBuffer & in)
{
auto field_name = readFieldName(in);
auto field_value = readStringField(in);
return {field_name, field_value};
}
NameAndTypePair readObjectWithNameAndType(ReadBuffer & in)
{
skipObjectStart(in);
auto [first_field_name, first_field_value] = readStringFieldNameAndValue(in);
skipComma(in);
auto [second_field_name, second_field_value] = readStringFieldNameAndValue(in);
NameAndTypePair name_and_type;
if (first_field_name == "name" && second_field_name == "type")
name_and_type = {first_field_value, DataTypeFactory::instance().get(second_field_value)};
else if (second_field_name == "name" && first_field_name == "type")
name_and_type = {second_field_value, DataTypeFactory::instance().get(first_field_value)};
else
throw Exception(
ErrorCodes::INCORRECT_DATA,
R"(Expected two fields "name" and "type" with column name and type, found fields "{}" and "{}")",
first_field_name,
second_field_name);
skipObjectEnd(in);
return name_and_type;
}
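The order-independent pairing in `readObjectWithNameAndType` can be sketched separately. This is a hypothetical simplification that works on already-read field name/value pairs instead of a ReadBuffer:

```cpp
#include <stdexcept>
#include <string>
#include <utility>

// A metadata entry carries "name" and "type" fields in either order;
// return them as a (column name, type name) pair or throw.
std::pair<std::string, std::string> pairNameAndType(
    const std::string & first_name, const std::string & first_value,
    const std::string & second_name, const std::string & second_value)
{
    if (first_name == "name" && second_name == "type")
        return {first_value, second_value};
    if (first_name == "type" && second_name == "name")
        return {second_value, first_value};
    throw std::runtime_error(
        "expected fields \"name\" and \"type\", found \"" + first_name + "\" and \"" + second_name + "\"");
}
```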
NamesAndTypesList readMetadata(ReadBuffer & in)
{
auto field_name = readFieldName(in);
if (field_name != "meta")
throw Exception(ErrorCodes::INCORRECT_DATA, "Expected field \"meta\" with columns names and types, found field {}", field_name);
skipArrayStart(in);
NamesAndTypesList names_and_types;
bool first = true;
while (!checkAndSkipArrayEnd(in))
{
if (!first)
skipComma(in);
else
first = false;
names_and_types.push_back(readObjectWithNameAndType(in));
}
return names_and_types;
}
NamesAndTypesList readMetadataAndValidateHeader(ReadBuffer & in, const Block & header)
{
auto names_and_types = JSONUtils::readMetadata(in);
for (const auto & [name, type] : names_and_types)
{
/// Skip columns absent from the header (getByName would throw for them).
if (!header.has(name))
continue;
auto header_type = header.getByName(name).type;
if (!type->equals(*header_type))
throw Exception(
ErrorCodes::INCORRECT_DATA, "Type {} of column '{}' from metadata is not the same as type in header {}", type->getName(), name, header_type->getName());
}
return names_and_types;
}
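The validation loop can be sketched without ClickHouse types. This is a hypothetical stand-in with type names as plain strings instead of `DataTypePtr`: every metadata column that also exists in the header must declare exactly the header's type:

```cpp
#include <map>
#include <stdexcept>
#include <string>

// Compare declared metadata types against the table header; columns the
// header does not contain are ignored, mismatches throw.
void validateMetadataAgainstHeader(
    const std::map<std::string, std::string> & metadata_types,
    const std::map<std::string, std::string> & header_types)
{
    for (const auto & [name, type] : metadata_types)
    {
        auto it = header_types.find(name);
        if (it != header_types.end() && it->second != type)
            throw std::runtime_error(
                "type " + type + " of column '" + name + "' from metadata does not match header type " + it->second);
    }
}
```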
bool skipUntilFieldInObject(ReadBuffer & in, const String & desired_field_name)
{
while (!checkAndSkipObjectEnd(in))
{
auto field_name = JSONUtils::readFieldName(in);
if (field_name == desired_field_name)
return true;
/// Skip the value of a non-matching field and the trailing comma, if any.
skipWhitespaceIfAny(in);
skipJSONField(in, field_name);
checkChar(',', in);
}
return false;
}
void skipTheRestOfObject(ReadBuffer & in)
{
while (!checkAndSkipObjectEnd(in))
{
skipComma(in);
auto name = readFieldName(in);
skipWhitespaceIfAny(in);
skipJSONField(in, name);
}
}


@@ -44,9 +44,7 @@
const FormatSettings & format_settings,
bool yield_strings);
-DataTypePtr getCommonTypeForJSONFormats(const DataTypePtr & first, const DataTypePtr & second, bool allow_bools_as_numbers);
-void makeNamesAndTypesWithValidUTF8(NamesAndTypes & fields, const FormatSettings & settings, bool & need_validate_utf8);
+Strings makeNamesValidJSONStrings(const Strings & names, const FormatSettings & settings, bool validate_utf8);
/// Functions helpers for writing JSON data to WriteBuffer.
@ -56,8 +54,12 @@ namespace JSONUtils
void writeObjectStart(WriteBuffer & out, size_t indent = 0, const char * title = nullptr);
void writeCompactObjectStart(WriteBuffer & out, size_t indent = 0, const char * title = nullptr);
void writeObjectEnd(WriteBuffer & out, size_t indent = 0);
void writeCompactObjectEnd(WriteBuffer & out);
void writeArrayStart(WriteBuffer & out, size_t indent = 0, const char * title = nullptr);
void writeCompactArrayStart(WriteBuffer & out, size_t indent = 0, const char * title = nullptr);
@@ -74,11 +76,12 @@
const FormatSettings & settings,
WriteBuffer & out,
const std::optional<String> & name = std::nullopt,
-size_t indent = 0);
+size_t indent = 0,
+const char * title_after_delimiter = " ");
void writeColumns(
const Columns & columns,
-const NamesAndTypes & fields,
+const Names & names,
const Serializations & serializations,
size_t row_num,
bool yield_strings,
@@ -94,7 +97,7 @@
const FormatSettings & settings,
WriteBuffer & out);
-void writeMetadata(const NamesAndTypes & fields, const FormatSettings & settings, WriteBuffer & out);
+void writeMetadata(const Names & names, const DataTypes & types, const FormatSettings & settings, WriteBuffer & out);
void writeAdditionalInfo(
size_t rows,
@ -104,6 +107,26 @@ namespace JSONUtils
const Progress & progress,
bool write_statistics,
WriteBuffer & out);
void skipColon(ReadBuffer & in);
void skipComma(ReadBuffer & in);
String readFieldName(ReadBuffer & in);
void skipArrayStart(ReadBuffer & in);
void skipArrayEnd(ReadBuffer & in);
bool checkAndSkipArrayStart(ReadBuffer & in);
bool checkAndSkipArrayEnd(ReadBuffer & in);
void skipObjectStart(ReadBuffer & in);
void skipObjectEnd(ReadBuffer & in);
bool checkAndSkipObjectEnd(ReadBuffer & in);
NamesAndTypesList readMetadata(ReadBuffer & in);
NamesAndTypesList readMetadataAndValidateHeader(ReadBuffer & in, const Block & header);
bool skipUntilFieldInObject(ReadBuffer & in, const String & desired_field_name);
void skipTheRestOfObject(ReadBuffer & in);
}
}


@ -34,14 +34,22 @@ void registerInputFormatCSV(FormatFactory & factory);
void registerOutputFormatCSV(FormatFactory & factory);
void registerInputFormatTSKV(FormatFactory & factory);
void registerOutputFormatTSKV(FormatFactory & factory);
void registerOutputFormatJSON(FormatFactory & factory);
void registerInputFormatJSON(FormatFactory & factory);
void registerOutputFormatJSONCompact(FormatFactory & factory);
void registerInputFormatJSONCompact(FormatFactory & factory);
void registerInputFormatJSONEachRow(FormatFactory & factory);
void registerOutputFormatJSONEachRow(FormatFactory & factory);
void registerInputFormatJSONObjectEachRow(FormatFactory & factory);
void registerOutputFormatJSONObjectEachRow(FormatFactory & factory);
void registerInputFormatJSONCompactEachRow(FormatFactory & factory);
void registerOutputFormatJSONCompactEachRow(FormatFactory & factory);
void registerInputFormatJSONColumns(FormatFactory & factory);
void registerOutputFormatJSONColumns(FormatFactory & factory);
void registerInputFormatJSONCompactColumns(FormatFactory & factory);
void registerOutputFormatJSONCompactColumns(FormatFactory & factory);
void registerInputFormatJSONColumnsWithMetadata(FormatFactory & factory);
void registerOutputFormatJSONColumnsWithMetadata(FormatFactory & factory);
void registerInputFormatProtobuf(FormatFactory & factory);
void registerOutputFormatProtobuf(FormatFactory & factory);
void registerInputFormatProtobufList(FormatFactory & factory);
@@ -71,10 +79,7 @@ void registerOutputFormatPretty(FormatFactory & factory);
void registerOutputFormatPrettyCompact(FormatFactory & factory);
void registerOutputFormatPrettySpace(FormatFactory & factory);
void registerOutputFormatVertical(FormatFactory & factory);
void registerOutputFormatJSON(FormatFactory & factory);
void registerOutputFormatJSONCompact(FormatFactory & factory);
void registerOutputFormatJSONEachRowWithProgress(FormatFactory & factory);
void registerOutputFormatJSONColumnsWithMetadata(FormatFactory & factory);
void registerOutputFormatXML(FormatFactory & factory);
void registerOutputFormatODBCDriver2(FormatFactory & factory);
void registerOutputFormatNull(FormatFactory & factory);
@@ -107,11 +112,14 @@ void registerORCSchemaReader(FormatFactory & factory);
void registerTSVSchemaReader(FormatFactory & factory);
void registerCSVSchemaReader(FormatFactory & factory);
void registerJSONCompactEachRowSchemaReader(FormatFactory & factory);
void registerJSONSchemaReader(FormatFactory & factory);
void registerJSONEachRowSchemaReader(FormatFactory & factory);
void registerJSONObjectEachRowSchemaReader(FormatFactory & factory);
void registerJSONAsStringSchemaReader(FormatFactory & factory);
void registerJSONAsObjectSchemaReader(FormatFactory & factory);
void registerJSONColumnsSchemaReader(FormatFactory & factory);
void registerJSONCompactColumnsSchemaReader(FormatFactory & factory);
void registerJSONColumnsWithMetadataSchemaReader(FormatFactory & factory);
void registerNativeSchemaReader(FormatFactory & factory);
void registerRowBinaryWithNamesAndTypesSchemaReader(FormatFactory & factory);
void registerAvroSchemaReader(FormatFactory & factory);
@@ -160,14 +168,22 @@ void registerFormats()
registerOutputFormatCSV(factory);
registerInputFormatTSKV(factory);
registerOutputFormatTSKV(factory);
registerOutputFormatJSON(factory);
registerInputFormatJSON(factory);
registerOutputFormatJSONCompact(factory);
registerInputFormatJSONCompact(factory);
registerInputFormatJSONEachRow(factory);
registerOutputFormatJSONEachRow(factory);
registerInputFormatJSONObjectEachRow(factory);
registerOutputFormatJSONObjectEachRow(factory);
registerInputFormatJSONCompactEachRow(factory);
registerOutputFormatJSONCompactEachRow(factory);
registerInputFormatJSONColumns(factory);
registerOutputFormatJSONColumns(factory);
registerInputFormatJSONCompactColumns(factory);
registerOutputFormatJSONCompactColumns(factory);
registerInputFormatJSONColumnsWithMetadata(factory);
registerOutputFormatJSONColumnsWithMetadata(factory);
registerInputFormatProtobuf(factory);
registerOutputFormatProtobufList(factory);
registerInputFormatProtobufList(factory);
@@ -194,10 +210,7 @@ void registerFormats()
registerOutputFormatPrettyCompact(factory);
registerOutputFormatPrettySpace(factory);
registerOutputFormatVertical(factory);
registerOutputFormatJSON(factory);
registerOutputFormatJSONCompact(factory);
registerOutputFormatJSONEachRowWithProgress(factory);
registerOutputFormatJSONColumnsWithMetadata(factory);
registerOutputFormatXML(factory);
registerOutputFormatODBCDriver2(factory);
registerOutputFormatNull(factory);
@@ -228,12 +241,15 @@ void registerFormats()
registerORCSchemaReader(factory);
registerTSVSchemaReader(factory);
registerCSVSchemaReader(factory);
registerJSONSchemaReader(factory);
registerJSONCompactEachRowSchemaReader(factory);
registerJSONEachRowSchemaReader(factory);
registerJSONObjectEachRowSchemaReader(factory);
registerJSONAsStringSchemaReader(factory);
registerJSONAsObjectSchemaReader(factory);
registerJSONColumnsSchemaReader(factory);
registerJSONCompactColumnsSchemaReader(factory);
registerJSONColumnsWithMetadataSchemaReader(factory);
registerNativeSchemaReader(factory);
registerRowBinaryWithNamesAndTypesSchemaReader(factory);
registerAvroSchemaReader(factory);

View File

@@ -372,7 +372,7 @@ void writeJSONNumber(T x, WriteBuffer & ostr, const FormatSettings & settings)
bool is_finite = isFinite(x);
const bool need_quote = (is_integer<T> && (sizeof(T) >= 8) && settings.json.quote_64bit_integers)
|| (settings.json.quote_denormals && !is_finite);
|| (settings.json.quote_denormals && !is_finite) || (is_floating_point<T> && (sizeof(T) >= 8) && settings.json.quote_64bit_floats);
if (need_quote)
writeChar('"', ostr);
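The extended condition quotes 64-bit floats the same way 64-bit integers are already quoted, alongside the existing quoting of non-finite values (plain JSON has no literal for nan/inf). A sketch of just this decision; the settings struct mirrors the flag names visible in the diff but is otherwise an assumption:

```cpp
#include <cassert>
#include <cmath>
#include <type_traits>

// Illustrative subset of FormatSettings.json relevant to number quoting.
struct JsonNumberSettings
{
    bool quote_64bit_integers = true;
    bool quote_64bit_floats = false;
    bool quote_denormals = false;
};

// Mirrors the need_quote expression: quote wide integers and wide floats
// when the respective setting is on, and non-finite values when
// quote_denormals is on.
template <typename T>
bool needQuote(T x, const JsonNumberSettings & s)
{
    const bool is_finite = std::isfinite(static_cast<double>(x));
    return (std::is_integral_v<T> && sizeof(T) >= 8 && s.quote_64bit_integers)
        || (std::is_floating_point_v<T> && sizeof(T) >= 8 && s.quote_64bit_floats)
        || (s.quote_denormals && !is_finite);
}
```

Quoting 64-bit values protects consumers (notably JavaScript) whose native number type cannot represent them exactly.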

View File

@@ -548,9 +548,15 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (insert_query)
{
if (insert_query->table_id)
{
insert_query->table_id = context->resolveStorageID(insert_query->table_id);
LOG_DEBUG(&Poco::Logger::get("executeQuery"), "2) database: {}", insert_query->table_id.getDatabaseName());
}
else if (auto table = insert_query->getTable(); !table.empty())
{
insert_query->table_id = context->resolveStorageID(StorageID{insert_query->getDatabase(), table});
LOG_DEBUG(&Poco::Logger::get("executeQuery"), "2) database: {}", insert_query->table_id.getDatabaseName());
}
}
if (insert_query && insert_query->select)

View File

@@ -46,7 +46,7 @@ void chooseResultColumnType(
{
throw Exception(
ErrorCodes::TYPE_MISMATCH,
"Automatically defined type {} for column {} in row {} differs from type defined by previous rows: {}. "
"Automatically defined type {} for column '{}' in row {} differs from type defined by previous rows: {}. "
"You can specify the type for this column using setting schema_inference_hints",
type->getName(),
column_name,
@@ -62,7 +62,7 @@ void checkResultColumnTypeAndAppend(NamesAndTypesList & result, DataTypePtr & ty
if (!default_type)
throw Exception(
ErrorCodes::ONLY_NULLS_WHILE_READING_SCHEMA,
"Cannot determine type for column {} by first {} rows of data, most likely this column contains only Nulls or empty "
"Cannot determine type for column '{}' by first {} rows of data, most likely this column contains only Nulls or empty "
"Arrays/Maps. You can specify the type for this column using setting schema_inference_hints",
name,
rows_read);

View File

@@ -401,13 +401,16 @@ void registerCSVSchemaReader(FormatFactory & factory)
{
return std::make_shared<CSVSchemaReader>(buf, with_names, with_types, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter(format_name, [with_names](const FormatSettings & settings)
if (!with_types)
{
String result = getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::CSV);
if (!with_names)
result += fmt::format(", column_names_for_schema_inference={}", settings.column_names_for_schema_inference);
return result;
});
factory.registerAdditionalInfoForSchemaCacheGetter(format_name, [with_names](const FormatSettings & settings)
{
String result = getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::CSV);
if (!with_names)
result += fmt::format(", column_names_for_schema_inference={}", settings.column_names_for_schema_inference);
return result;
});
}
};
registerWithNamesAndTypes("CSV", register_func);
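The cache-getter registration is now skipped entirely when the format carries type information (the inferred schema cannot depend on inference settings then), and the configured fallback column names enter the cache key only when the format carries no names. A rough stand-alone model of that key construction; the struct and function names are illustrative only:

```cpp
#include <cassert>
#include <string>

// Stand-ins for the settings that influence schema inference.
struct InferenceSettings
{
    std::string escaping_rule_info;
    std::string column_names_for_schema_inference;
};

// Builds the extra schema-cache key, mirroring the with_names/with_types
// logic from the diff: no key at all when types are present, and the
// fallback column names only when header names are absent.
std::string schemaCacheKey(bool with_names, bool with_types, const InferenceSettings & s)
{
    if (with_types)
        return {};  // schema is read from the data itself; settings are irrelevant
    std::string result = s.escaping_rule_info;
    if (!with_names)
        result += ", column_names_for_schema_inference=" + s.column_names_for_schema_inference;
    return result;
}
```

Including only the settings that can actually change the inferred schema keeps cache hits frequent without risking stale entries.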

View File

@@ -353,19 +353,24 @@ void registerCustomSeparatedSchemaReader(FormatFactory & factory)
{
return std::make_shared<CustomSeparatedSchemaReader>(buf, with_names, with_types, ignore_spaces, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter(format_name, [](const FormatSettings & settings)
if (!with_types)
{
String result = getAdditionalFormatInfoByEscapingRule(settings, settings.custom.escaping_rule);
return result + fmt::format(
", result_before_delimiter={}, row_before_delimiter={}, field_delimiter={},"
" row_after_delimiter={}, row_between_delimiter={}, result_after_delimiter={}",
settings.custom.result_before_delimiter,
settings.custom.row_before_delimiter,
settings.custom.field_delimiter,
settings.custom.row_after_delimiter,
settings.custom.row_between_delimiter,
settings.custom.result_after_delimiter);
});
factory.registerAdditionalInfoForSchemaCacheGetter(format_name, [with_names](const FormatSettings & settings)
{
String result = getAdditionalFormatInfoByEscapingRule(settings, settings.custom.escaping_rule);
if (!with_names)
result += fmt::format(", column_names_for_schema_inference={}", settings.column_names_for_schema_inference);
return result + fmt::format(
", result_before_delimiter={}, row_before_delimiter={}, field_delimiter={},"
" row_after_delimiter={}, row_between_delimiter={}, result_after_delimiter={}",
settings.custom.result_before_delimiter,
settings.custom.row_before_delimiter,
settings.custom.field_delimiter,
settings.custom.row_after_delimiter,
settings.custom.row_between_delimiter,
settings.custom.result_after_delimiter);
});
}
};
registerWithNamesAndTypes(ignore_spaces ? "CustomSeparatedIgnoreSpaces" : "CustomSeparated", register_func);

View File

@@ -2,6 +2,7 @@
#include <IO/ReadHelpers.h>
#include <Formats/FormatFactory.h>
#include <Formats/EscapingRuleUtils.h>
#include <Formats/JSONUtils.h>
namespace DB
{
@@ -12,34 +13,19 @@ JSONColumnsReader::JSONColumnsReader(ReadBuffer & in_) : JSONColumnsReaderBase(i
void JSONColumnsReader::readChunkStart()
{
skipWhitespaceIfAny(*in);
assertChar('{', *in);
skipWhitespaceIfAny(*in);
JSONUtils::skipObjectStart(*in);
}
std::optional<String> JSONColumnsReader::readColumnStart()
{
skipWhitespaceIfAny(*in);
String name;
readJSONString(name, *in);
skipWhitespaceIfAny(*in);
assertChar(':', *in);
skipWhitespaceIfAny(*in);
assertChar('[', *in);
skipWhitespaceIfAny(*in);
auto name = JSONUtils::readFieldName(*in);
JSONUtils::skipArrayStart(*in);
return name;
}
bool JSONColumnsReader::checkChunkEnd()
{
skipWhitespaceIfAny(*in);
if (!in->eof() && *in->position() == '}')
{
++in->position();
skipWhitespaceIfAny(*in);
return true;
}
return false;
return JSONUtils::checkAndSkipObjectEnd(*in);
}

View File

@@ -21,23 +21,14 @@ JSONColumnsReaderBase::JSONColumnsReaderBase(ReadBuffer & in_) : in(&in_)
bool JSONColumnsReaderBase::checkColumnEnd()
{
skipWhitespaceIfAny(*in);
if (!in->eof() && *in->position() == ']')
{
++in->position();
skipWhitespaceIfAny(*in);
return true;
}
return false;
return JSONUtils::checkAndSkipArrayEnd(*in);
}
bool JSONColumnsReaderBase::checkColumnEndOrSkipFieldDelimiter()
{
if (checkColumnEnd())
return true;
skipWhitespaceIfAny(*in);
assertChar(',', *in);
skipWhitespaceIfAny(*in);
JSONUtils::skipComma(*in);
return false;
}
@@ -45,9 +36,7 @@ bool JSONColumnsReaderBase::checkChunkEndOrSkipColumnDelimiter()
{
if (checkChunkEnd())
return true;
skipWhitespaceIfAny(*in);
assertChar(',', *in);
skipWhitespaceIfAny(*in);
JSONUtils::skipComma(*in);
return false;
}

View File

@@ -7,15 +7,10 @@
namespace DB
{
JSONColumnsBlockOutputFormat::JSONColumnsBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_, size_t indent_)
: JSONColumnsBlockOutputFormatBase(out_, header_, format_settings_), fields(header_.getNamesAndTypes()), indent(indent_)
JSONColumnsBlockOutputFormat::JSONColumnsBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_, bool validate_utf8, size_t indent_)
: JSONColumnsBlockOutputFormatBase(out_, header_, format_settings_, validate_utf8), indent(indent_)
{
for (auto & field : fields)
{
WriteBufferFromOwnString buf;
writeJSONString(field.name, buf, format_settings);
field.name = buf.str().substr(1, buf.str().size() - 2);
}
names = JSONUtils::makeNamesValidJSONStrings(header_.getNames(), format_settings, validate_utf8);
}
void JSONColumnsBlockOutputFormat::writeChunkStart()
@@ -25,7 +20,7 @@ void JSONColumnsBlockOutputFormat::writeChunkStart()
void JSONColumnsBlockOutputFormat::writeColumnStart(size_t column_index)
{
JSONUtils::writeCompactArrayStart(*ostr, indent + 1, fields[column_index].name.data());
JSONUtils::writeCompactArrayStart(*ostr, indent + 1, names[column_index].data());
}
void JSONColumnsBlockOutputFormat::writeChunkEnd()
@@ -42,7 +37,7 @@ void registerOutputFormatJSONColumns(FormatFactory & factory)
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<JSONColumnsBlockOutputFormat>(buf, sample, format_settings);
return std::make_shared<JSONColumnsBlockOutputFormat>(buf, sample, format_settings, format_settings.json.validate_utf8);
});
}

View File

@@ -8,14 +8,14 @@ namespace DB
/* Format JSONColumns outputs all data as a single block in the next format:
* {
* "name1": [value1, value2, value3, ...],
* "name2": [value1, value2m value3, ...],
* "name2": [value1, value2, value3, ...],
* ...
* }
*/
class JSONColumnsBlockOutputFormat : public JSONColumnsBlockOutputFormatBase
{
public:
JSONColumnsBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_, size_t indent_ = 0);
JSONColumnsBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_, bool validate_utf8, size_t indent_ = 0);
String getName() const override { return "JSONColumnsBlockOutputFormat"; }
@@ -25,7 +25,7 @@ protected:
void writeColumnStart(size_t column_index) override;
NamesAndTypes fields;
Names names;
size_t indent;
};
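The columnar layout described in the header comment of this file can be sketched as a toy serializer; plain pre-rendered strings stand in for real serialized ClickHouse values:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Emits the JSONColumns shape: one object whose keys are column names and
// whose values are arrays holding that column's values.
std::string writeJSONColumns(
    const std::vector<std::string> & names,
    const std::vector<std::vector<std::string>> & columns)
{
    std::string out = "{";
    for (size_t c = 0; c < names.size(); ++c)
    {
        if (c)
            out += ", ";
        out += "\"" + names[c] + "\": [";
        for (size_t r = 0; r < columns[c].size(); ++r)
        {
            if (r)
                out += ", ";
            out += columns[c][r];
        }
        out += "]";
    }
    out += "}";
    return out;
}
```

Because whole columns are emitted at once, the real format buffers all data and writes it as a single block, as the base class below notes.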

View File

@@ -1,5 +1,6 @@
#include <Processors/Formats/Impl/JSONColumnsBlockOutputFormatBase.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferValidUTF8.h>
#include <Formats/JSONUtils.h>
@@ -7,11 +8,10 @@ namespace DB
{
JSONColumnsBlockOutputFormatBase::JSONColumnsBlockOutputFormatBase(
WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
: IOutputFormat(header_, out_)
WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_, bool validate_utf8)
: OutputFormatWithUTF8ValidationAdaptor(validate_utf8, header_, out_)
, format_settings(format_settings_)
, serializations(header_.getSerializations())
, ostr(&out)
{
}

View File

@@ -1,8 +1,9 @@
#pragma once
#include <Core/Block.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/OutputFormatWithUTF8ValidationAdaptor.h>
#include <Formats/FormatSettings.h>
#include <IO/WriteBuffer.h>
namespace DB
@@ -12,10 +13,10 @@ class WriteBuffer;
/// Base class for Columnar JSON output formats.
/// It buffers all data and outputs it as a single block in writeSuffix() method.
class JSONColumnsBlockOutputFormatBase : public IOutputFormat
class JSONColumnsBlockOutputFormatBase : public OutputFormatWithUTF8ValidationAdaptor
{
public:
JSONColumnsBlockOutputFormatBase(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_);
JSONColumnsBlockOutputFormatBase(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_, bool validate_utf8);
String getName() const override { return "JSONColumnsBlockOutputFormatBase"; }
@@ -34,8 +35,6 @@ protected:
const FormatSettings format_settings;
const Serializations serializations;
WriteBuffer * ostr;
Chunk mono_chunk;
};
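The new OutputFormatWithUTF8ValidationAdaptor base routes output through a UTF-8 validating buffer when validate_utf8 is set. A much-reduced sketch of such a filter, replacing the bytes of invalid sequences with U+FFFD; the real WriteBufferValidUTF8 is considerably more thorough (overlong encodings, surrogate ranges, streaming across buffer boundaries):

```cpp
#include <cassert>
#include <string>

// Replaces each byte that cannot start or continue a well-formed UTF-8
// sequence with the replacement character U+FFFD.
std::string replaceInvalidUTF8(const std::string & in)
{
    static const std::string replacement = "\xEF\xBF\xBD";  // U+FFFD
    std::string out;
    size_t i = 0;
    while (i < in.size())
    {
        unsigned char b = in[i];
        // Sequence length from the lead byte: 0 means invalid lead.
        size_t len = b < 0x80 ? 1
            : (b >> 5) == 0x6 ? 2
            : (b >> 4) == 0xE ? 3
            : (b >> 3) == 0x1E ? 4
            : 0;
        bool ok = len != 0 && i + len <= in.size();
        for (size_t k = 1; ok && k < len; ++k)
            ok = (static_cast<unsigned char>(in[i + k]) >> 6) == 0x2;  // 10xxxxxx
        if (ok)
        {
            out.append(in, i, len);
            i += len;
        }
        else
        {
            out += replacement;
            ++i;
        }
    }
    return out;
}
```

Centralizing this in an adaptor lets every JSON output format opt in through one constructor flag instead of managing its own validating buffer, as the replaced code in JSONColumnsWithMetadataBlockOutputFormat did.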

View File

@@ -0,0 +1,84 @@
#include <Processors/Formats/Impl/JSONColumnsWithMetadataBlockInputFormat.h>
#include <IO/ReadHelpers.h>
#include <Formats/FormatFactory.h>
#include <Formats/EscapingRuleUtils.h>
#include <Formats/JSONUtils.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
}
JSONColumnsWithMetadataReader::JSONColumnsWithMetadataReader(ReadBuffer & in_, const Block & header_, const FormatSettings & settings)
: JSONColumnsReader(in_), header(header_), validate_types_from_metadata(settings.json.validate_types_from_metadata)
{
}
void JSONColumnsWithMetadataReader::readChunkStart()
{
skipBOMIfExists(*in);
JSONUtils::skipObjectStart(*in);
if (validate_types_from_metadata)
JSONUtils::readMetadataAndValidateHeader(*in, header);
else
JSONUtils::readMetadata(*in);
JSONUtils::skipComma(*in);
if (!JSONUtils::skipUntilFieldInObject(*in, "data"))
throw Exception(ErrorCodes::INCORRECT_DATA, "Expected field \"data\" with table content");
JSONUtils::skipObjectStart(*in);
}
bool JSONColumnsWithMetadataReader::checkChunkEnd()
{
if (!JSONUtils::checkAndSkipObjectEnd(*in))
return false;
JSONUtils::skipTheRestOfObject(*in);
assertEOF(*in);
return true;
}
JSONColumnsWithMetadataSchemaReader::JSONColumnsWithMetadataSchemaReader(ReadBuffer & in_) : ISchemaReader(in_)
{
}
NamesAndTypesList JSONColumnsWithMetadataSchemaReader::readSchema()
{
skipBOMIfExists(in);
JSONUtils::skipObjectStart(in);
return JSONUtils::readMetadata(in);
}
void registerInputFormatJSONColumnsWithMetadata(FormatFactory & factory)
{
factory.registerInputFormat(
"JSONColumnsWithMetadata",
[](ReadBuffer & buf,
const Block &sample,
const RowInputFormatParams &,
const FormatSettings & settings)
{
return std::make_shared<JSONColumnsBlockInputFormatBase>(buf, sample, settings, std::make_unique<JSONColumnsWithMetadataReader>(buf, sample, settings));
}
);
factory.markFormatSupportsSubsetOfColumns("JSONColumnsWithMetadata");
}
void registerJSONColumnsWithMetadataSchemaReader(FormatFactory & factory)
{
factory.registerSchemaReader(
"JSONColumnsWithMetadata",
[](ReadBuffer & buf, const FormatSettings &)
{
return std::make_shared<JSONColumnsWithMetadataSchemaReader>(buf);
}
);
}
}

View File

@@ -0,0 +1,31 @@
#pragma once
#include <Processors/Formats/Impl/JSONColumnsBlockInputFormat.h>
#include <Processors/Formats/ISchemaReader.h>
namespace DB
{
class JSONColumnsWithMetadataReader : public JSONColumnsReader
{
public:
JSONColumnsWithMetadataReader(ReadBuffer & in_, const Block & header_, const FormatSettings & settings);
void readChunkStart() override;
bool checkChunkEnd() override;
private:
const Block & header;
const bool validate_types_from_metadata;
};
class JSONColumnsWithMetadataSchemaReader : public ISchemaReader
{
public:
JSONColumnsWithMetadataSchemaReader(ReadBuffer & in_);
NamesAndTypesList readSchema() override;
};
}

View File

@@ -13,22 +13,14 @@ namespace ErrorCodes
}
JSONColumnsWithMetadataBlockOutputFormat::JSONColumnsWithMetadataBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
: JSONColumnsBlockOutputFormat(out_, header_, format_settings_, 1)
: JSONColumnsBlockOutputFormat(out_, header_, format_settings_, true, 1), types(header_.getDataTypes())
{
bool need_validate_utf8 = false;
JSONUtils::makeNamesAndTypesWithValidUTF8(fields, format_settings, need_validate_utf8);
if (need_validate_utf8)
{
validating_ostr = std::make_unique<WriteBufferValidUTF8>(out);
ostr = validating_ostr.get();
}
}
void JSONColumnsWithMetadataBlockOutputFormat::writePrefix()
{
JSONUtils::writeObjectStart(*ostr);
JSONUtils::writeMetadata(fields, format_settings, *ostr);
JSONUtils::writeMetadata(names, types, format_settings, *ostr);
}
void JSONColumnsWithMetadataBlockOutputFormat::writeSuffix()
@@ -66,7 +58,7 @@ void JSONColumnsWithMetadataBlockOutputFormat::consumeExtremes(Chunk chunk)
void JSONColumnsWithMetadataBlockOutputFormat::writeExtremesElement(const char * title, const Columns & columns, size_t row_num)
{
JSONUtils::writeObjectStart(*ostr, 2, title);
JSONUtils::writeColumns(columns, fields, serializations, row_num, false, format_settings, *ostr, 3);
JSONUtils::writeColumns(columns, names, serializations, row_num, false, format_settings, *ostr, 3);
JSONUtils::writeObjectEnd(*ostr, 2);
}
@@ -79,7 +71,7 @@ void JSONColumnsWithMetadataBlockOutputFormat::consumeTotals(Chunk chunk)
const auto & columns = chunk.getColumns();
JSONUtils::writeFieldDelimiter(*ostr, 2);
JSONUtils::writeObjectStart(*ostr, 1, "totals");
JSONUtils::writeColumns(columns, fields, serializations, 0, false, format_settings, *ostr, 2);
JSONUtils::writeColumns(columns, names, serializations, 0, false, format_settings, *ostr, 2);
JSONUtils::writeObjectEnd(*ostr, 1);
}

View File

@@ -59,8 +59,8 @@ protected:
void writeExtremesElement(const char * title, const Columns & columns, size_t row_num);
DataTypes types;
Statistics statistics;
std::unique_ptr<WriteBuffer> validating_ostr; /// Validates UTF-8 sequences, replaces bad sequences with replacement character.
size_t rows;
};

View File

@@ -2,6 +2,7 @@
#include <IO/ReadHelpers.h>
#include <Formats/FormatFactory.h>
#include <Formats/EscapingRuleUtils.h>
#include <Formats/JSONUtils.h>
namespace DB
{
@@ -12,29 +13,18 @@ JSONCompactColumnsReader::JSONCompactColumnsReader(ReadBuffer & in_) : JSONColum
void JSONCompactColumnsReader::readChunkStart()
{
skipWhitespaceIfAny(*in);
assertChar('[', *in);
skipWhitespaceIfAny(*in);
JSONUtils::skipArrayStart(*in);
}
std::optional<String> JSONCompactColumnsReader::readColumnStart()
{
skipWhitespaceIfAny(*in);
assertChar('[', *in);
skipWhitespaceIfAny(*in);
JSONUtils::skipArrayStart(*in);
return std::nullopt;
}
bool JSONCompactColumnsReader::checkChunkEnd()
{
skipWhitespaceIfAny(*in);
if (!in->eof() && *in->position() == ']')
{
++in->position();
skipWhitespaceIfAny(*in);
return true;
}
return false;
return JSONUtils::checkAndSkipArrayEnd(*in);
}

View File

@@ -7,7 +7,7 @@ namespace DB
{
JSONCompactColumnsBlockOutputFormat::JSONCompactColumnsBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
: JSONColumnsBlockOutputFormatBase(out_, header_, format_settings_), column_names(header_.getNames())
: JSONColumnsBlockOutputFormatBase(out_, header_, format_settings_, format_settings_.json.validate_utf8), column_names(header_.getNames())
{
}

View File

@@ -49,20 +49,17 @@ JSONCompactEachRowFormatReader::JSONCompactEachRowFormatReader(ReadBuffer & in_,
void JSONCompactEachRowFormatReader::skipRowStartDelimiter()
{
skipWhitespaceIfAny(*in);
assertChar('[', *in);
JSONUtils::skipArrayStart(*in);
}
void JSONCompactEachRowFormatReader::skipFieldDelimiter()
{
skipWhitespaceIfAny(*in);
assertChar(',', *in);
JSONUtils::skipComma(*in);
}
void JSONCompactEachRowFormatReader::skipRowEndDelimiter()
{
skipWhitespaceIfAny(*in);
assertChar(']', *in);
JSONUtils::skipArrayEnd(*in);
skipWhitespaceIfAny(*in);
if (!in->eof() && (*in->position() == ',' || *in->position() == ';'))
@@ -130,8 +127,7 @@ bool JSONCompactEachRowFormatReader::parseFieldDelimiterWithDiagnosticInfo(Write
{
try
{
skipWhitespaceIfAny(*in);
assertChar(',', *in);
JSONUtils::skipComma(*in);
}
catch (const DB::Exception &)
{
@@ -245,11 +241,16 @@ void registerJSONCompactEachRowSchemaReader(FormatFactory & factory)
{
return std::make_shared<JSONCompactEachRowRowSchemaReader>(buf, with_names, with_types, json_strings, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter(format_name, [](const FormatSettings & settings)
if (!with_types)
{
auto result = getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::JSON);
return result + fmt::format(", column_names_for_schema_inference={}", settings.column_names_for_schema_inference);
});
factory.registerAdditionalInfoForSchemaCacheGetter(format_name, [with_names](const FormatSettings & settings)
{
auto result = getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::JSON);
if (!with_names)
result += fmt::format(", column_names_for_schema_inference={}", settings.column_names_for_schema_inference);
return result;
});
}
};
registerWithNamesAndTypes(json_strings ? "JSONCompactStringsEachRow" : "JSONCompactEachRow", register_func);
}

View File

@@ -38,7 +38,7 @@ private:
void syncAfterError() override;
};
class JSONCompactEachRowFormatReader final : public FormatWithNamesAndTypesReader
class JSONCompactEachRowFormatReader : public FormatWithNamesAndTypesReader
{
public:
JSONCompactEachRowFormatReader(ReadBuffer & in_, bool yield_strings_, const FormatSettings & format_settings_);

View File

@@ -3,7 +3,7 @@
#include <Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.h>
#include <Formats/FormatFactory.h>
#include <Formats/registerWithNamesAndTypes.h>
#include <Formats/JSONUtils.h>
namespace DB
{
@@ -16,7 +16,11 @@ JSONCompactEachRowRowOutputFormat::JSONCompactEachRowRowOutputFormat(WriteBuffer
bool with_names_,
bool with_types_,
bool yield_strings_)
: IRowOutputFormat(header_, out_, params_), settings(settings_), with_names(with_names_), with_types(with_types_), yield_strings(yield_strings_)
: RowOutputFormatWithUTF8ValidationAdaptor(settings_.json.validate_utf8, header_, out_, params_)
, settings(settings_)
, with_names(with_names_)
, with_types(with_types_)
, yield_strings(yield_strings_)
{
}
@@ -37,7 +41,7 @@ void JSONCompactEachRowRowOutputFormat::writeField(const IColumn & column, const
void JSONCompactEachRowRowOutputFormat::writeFieldDelimiter()
{
writeCString(", ", out);
writeCString(", ", *ostr);
}
@@ -49,12 +53,12 @@ void JSONCompactEachRowRowOutputFormat::writeRowStartDelimiter()
void JSONCompactEachRowRowOutputFormat::writeRowEndDelimiter()
{
writeCString("]\n", out);
writeCString("]\n", *ostr);
}
void JSONCompactEachRowRowOutputFormat::writeTotals(const Columns & columns, size_t row_num)
{
writeChar('\n', out);
writeChar('\n', *ostr);
size_t columns_size = columns.size();
writeRowStartDelimiter();
for (size_t i = 0; i < columns_size; ++i)
@@ -69,6 +73,7 @@ void JSONCompactEachRowRowOutputFormat::writeTotals(const Columns & columns, siz
void JSONCompactEachRowRowOutputFormat::writeLine(const std::vector<String> & values)
{
JSONUtils::makeNamesValidJSONStrings(values, settings, settings.json.validate_utf8);
writeRowStartDelimiter();
for (size_t i = 0; i < values.size(); ++i)
{
@@ -86,10 +91,10 @@ void JSONCompactEachRowRowOutputFormat::writePrefix()
const auto & header = getPort(PortKind::Main).getHeader();
if (with_names)
writeLine(header.getNames());
writeLine(JSONUtils::makeNamesValidJSONStrings(header.getNames(), settings, settings.json.validate_utf8));
if (with_types)
writeLine(header.getDataTypeNames());
writeLine(JSONUtils::makeNamesValidJSONStrings(header.getDataTypeNames(), settings, settings.json.validate_utf8));
}
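writePrefix now funnels header names and type names through JSONUtils::makeNamesValidJSONStrings so they are JSON-escaped once up front. A sketch of the escaping step, assumed to mirror writing each name with writeJSONString and keeping the body between the quotes, as the replaced JSONColumnsBlockOutputFormat constructor code did:

```cpp
#include <cassert>
#include <cstdio>
#include <string>

// Produces the body of a JSON string for a column name: quotes,
// backslashes and control characters are escaped so the result can be
// embedded between double quotes verbatim.
std::string escapeJSONName(const std::string & name)
{
    std::string out;
    for (unsigned char ch : name)
    {
        switch (ch)
        {
            case '"':  out += "\\\""; break;
            case '\\': out += "\\\\"; break;
            case '\n': out += "\\n";  break;
            case '\t': out += "\\t";  break;
            default:
                if (ch < 0x20)
                {
                    char buf[8];
                    std::snprintf(buf, sizeof(buf), "\\u%04x", static_cast<unsigned>(ch));
                    out += buf;
                }
                else
                    out += static_cast<char>(ch);
        }
    }
    return out;
}
```

Escaping the names once at prefix time means every later writeColumnStart or writeLine call can emit them without revisiting the escaping rules.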
void JSONCompactEachRowRowOutputFormat::consumeTotals(DB::Chunk chunk)

View File

@@ -2,7 +2,7 @@
#include <Core/Block.h>
#include <IO/WriteBuffer.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Processors/Formats/OutputFormatWithUTF8ValidationAdaptor.h>
#include <Formats/FormatSettings.h>
@@ -10,9 +10,8 @@ namespace DB
{
/** The stream for outputting data in JSON format, by object per line.
* Does not validate UTF-8.
*/
class JSONCompactEachRowRowOutputFormat final : public IRowOutputFormat
class JSONCompactEachRowRowOutputFormat final : public RowOutputFormatWithUTF8ValidationAdaptor
{
public:
JSONCompactEachRowRowOutputFormat(

View File

@@ -0,0 +1,83 @@
#include <Processors/Formats/Impl/JSONCompactRowInputFormat.h>
#include <Formats/JSONUtils.h>
#include <Formats/FormatFactory.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
}
JSONCompactRowInputFormat::JSONCompactRowInputFormat(
const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_)
: RowInputFormatWithNamesAndTypes(
header_, in_, params_, false, false, false, format_settings_, std::make_unique<JSONCompactFormatReader>(in_, format_settings_))
, validate_types_from_metadata(format_settings_.json.validate_types_from_metadata)
{
}
void JSONCompactRowInputFormat::readPrefix()
{
skipBOMIfExists(*in);
JSONUtils::skipObjectStart(*in);
if (validate_types_from_metadata)
{
auto names_and_types = JSONUtils::readMetadataAndValidateHeader(*in, getPort().getHeader());
Names column_names;
for (const auto & [name, type] : names_and_types)
column_names.push_back(name);
column_mapping->addColumns(column_names, column_indexes_by_names, format_settings);
}
else
{
JSONUtils::readMetadata(*in);
column_mapping->setupByHeader(getPort().getHeader());
}
JSONUtils::skipComma(*in);
if (!JSONUtils::skipUntilFieldInObject(*in, "data"))
throw Exception(ErrorCodes::INCORRECT_DATA, "Expected field \"data\" with table content");
JSONUtils::skipArrayStart(*in);
}
void JSONCompactRowInputFormat::readSuffix()
{
/// Array end was skipped in JSONCompactFormatReader::checkForSuffix
JSONUtils::skipTheRestOfObject(*in);
}
void JSONCompactRowInputFormat::syncAfterError()
{
skipToUnescapedNextLineOrEOF(*in);
}
JSONCompactFormatReader::JSONCompactFormatReader(ReadBuffer & in_, const FormatSettings & format_settings_)
: JSONCompactEachRowFormatReader(in_, false, format_settings_)
{
}
bool JSONCompactFormatReader::checkForSuffix()
{
return JSONUtils::checkAndSkipArrayEnd(*in);
}
void registerInputFormatJSONCompact(FormatFactory & factory)
{
factory.registerInputFormat("JSONCompact", [](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
const FormatSettings & settings)
{
return std::make_shared<JSONCompactRowInputFormat>(sample, buf, std::move(params), settings);
});
factory.markFormatSupportsSubsetOfColumns("JSONCompact");
}
}
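readPrefix above walks the JSONCompact envelope, shaped like {"meta": [...], "data": [[...]], ...}, and must stop at the top-level "data" field while ignoring anything nested inside "meta". A toy depth-tracking scanner illustrating that idea; string escapes are not handled and this is not the actual skipUntilFieldInObject implementation:

```cpp
#include <cassert>
#include <string>

// Returns the position just past `"field":` at nesting depth 1 of the
// given JSON text, or std::string::npos if the field is absent. Bracket
// depth is tracked so identical names inside nested values are ignored.
size_t findTopLevelField(const std::string & json, const std::string & field)
{
    int depth = 0;
    bool in_string = false;
    const std::string token = "\"" + field + "\":";
    for (size_t i = 0; i < json.size(); ++i)
    {
        char ch = json[i];
        if (ch == '"')
            in_string = !in_string;
        // Just entered a string at the top level of the object: this can
        // only be a field name, so test it against the wanted token.
        if (in_string && depth == 1 && json.compare(i, token.size(), token) == 0)
            return i + token.size();
        if (!in_string && (ch == '{' || ch == '['))
            ++depth;
        else if (!in_string && (ch == '}' || ch == ']'))
            --depth;
    }
    return std::string::npos;
}
```

Once positioned after "data":, the real reader skips the array start and hands each inner array to the row parser.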

View File

@@ -0,0 +1,34 @@
#pragma once
#include <Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.h>
#include <Processors/Formats/ISchemaReader.h>
namespace DB
{
class JSONCompactRowInputFormat final : public RowInputFormatWithNamesAndTypes
{
public:
JSONCompactRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_);
String getName() const override { return "JSONCompactRowInputFormat"; }
private:
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
void readPrefix() override;
void readSuffix() override;
const bool validate_types_from_metadata;
};
class JSONCompactFormatReader : public JSONCompactEachRowFormatReader
{
public:
JSONCompactFormatReader(ReadBuffer & in_, const FormatSettings & format_settings_);
bool checkForSuffix() override;
};
}

View File

@@ -117,14 +117,6 @@ StringRef JSONEachRowRowInputFormat::readColumnName(ReadBuffer & buf)
return current_column_name;
}
static inline void skipColonDelimeter(ReadBuffer & istr)
{
skipWhitespaceIfAny(istr);
assertChar(':', istr);
skipWhitespaceIfAny(istr);
}
void JSONEachRowRowInputFormat::skipUnknownField(StringRef name_ref)
{
if (!format_settings.skip_unknown_fields)
@@ -157,10 +149,7 @@ inline bool JSONEachRowRowInputFormat::advanceToNextKey(size_t key_index)
}
if (key_index > 0)
{
assertChar(',', *in);
skipWhitespaceIfAny(*in);
}
JSONUtils::skipComma(*in);
return true;
}
@@ -182,7 +171,7 @@ void JSONEachRowRowInputFormat::readJSONObject(MutableColumns & columns)
current_column_name.assign(name_ref.data, name_ref.size);
name_ref = StringRef(current_column_name);
skipColonDelimeter(*in);
JSONUtils::skipColon(*in);
if (column_index == UNKNOWN_FIELD)
skipUnknownField(name_ref);
@@ -193,7 +182,7 @@ void JSONEachRowRowInputFormat::readJSONObject(MutableColumns & columns)
}
else
{
skipColonDelimeter(*in);
JSONUtils::skipColon(*in);
readField(column_index, columns);
}
}
@@ -215,32 +204,8 @@ bool JSONEachRowRowInputFormat::readRow(MutableColumns & columns, RowReadExtensi
return false;
skipWhitespaceIfAny(*in);
/// We consume , or \n before scanning a new row, instead scanning to next row at the end.
/// The reason is that if we want an exact number of rows read with LIMIT x
/// from a streaming table engine with text data format, like File or Kafka
/// then seeking to next ;, or \n would trigger reading of an extra row at the end.
/// Semicolon is added for convenience as it could be used at end of INSERT query.
bool is_first_row = getCurrentUnitNumber() == 0 && getTotalRows() == 1;
if (!in->eof())
{
/// There may be optional ',' (but not before the first row)
if (!is_first_row && *in->position() == ',')
++in->position();
else if (!data_in_square_brackets && *in->position() == ';')
{
/// ';' means the end of query (but it cannot be before ']')
return allow_new_rows = false;
}
else if (data_in_square_brackets && *in->position() == ']')
{
/// ']' means the end of query
return allow_new_rows = false;
}
}
skipWhitespaceIfAny(*in);
if (in->eof())
if (checkEndOfData(is_first_row))
return false;
size_t num_columns = columns.size();
@@ -249,6 +214,7 @@ bool JSONEachRowRowInputFormat::readRow(MutableColumns & columns, RowReadExtensi
seen_columns.assign(num_columns, false);
nested_prefix_length = 0;
readRowStart();
readJSONObject(columns);
const auto & header = getPort().getHeader();
@@ -267,6 +233,37 @@ bool JSONEachRowRowInputFormat::readRow(MutableColumns & columns, RowReadExtensi
return true;
}
bool JSONEachRowRowInputFormat::checkEndOfData(bool is_first_row)
{
/// We consume ',' or '\n' before scanning a new row, instead of scanning to the next row at the end.
/// The reason is that if we want an exact number of rows read with LIMIT x
/// from a streaming table engine with a text data format, like File or Kafka,
/// then seeking to the next ',', ';' or '\n' would trigger reading an extra row at the end.
/// The semicolon is supported for convenience, as it can be used at the end of an INSERT query.
if (!in->eof())
{
/// There may be optional ',' (but not before the first row)
if (!is_first_row && *in->position() == ',')
++in->position();
else if (!data_in_square_brackets && *in->position() == ';')
{
/// ';' means the end of query (but it cannot be before ']')
allow_new_rows = false;
return true;
}
else if (data_in_square_brackets && *in->position() == ']')
{
/// ']' means the end of query
allow_new_rows = false;
return true;
}
}
skipWhitespaceIfAny(*in);
return in->eof();
}
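The logic moved into `checkEndOfData` can be exercised in isolation. The sketch below models it against a plain in-memory cursor; `Cursor` and its members are hypothetical stand-ins for the `ReadBuffer` API, not ClickHouse code:

```cpp
#include <cassert>
#include <string_view>

// Minimal model of the delimiter handling factored into checkEndOfData().
struct Cursor
{
    std::string_view data;
    size_t pos = 0;

    bool eof() const { return pos >= data.size(); }
    char peek() const { return data[pos]; }
    void skipWhitespace() { while (!eof() && (peek() == ' ' || peek() == '\n')) ++pos; }
};

/// Returns true when no more rows should be read. An optional ',' is
/// consumed before each row except the first; ';' (outside brackets) or
/// ']' (inside brackets) terminates the stream for good.
bool checkEndOfData(Cursor & in, bool is_first_row, bool data_in_square_brackets, bool & allow_new_rows)
{
    if (!in.eof())
    {
        if (!is_first_row && in.peek() == ',')
            ++in.pos;
        else if (!data_in_square_brackets && in.peek() == ';')
        {
            allow_new_rows = false;
            return true;
        }
        else if (data_in_square_brackets && in.peek() == ']')
        {
            allow_new_rows = false;
            return true;
        }
    }
    in.skipWhitespace();
    return in.eof();
}
```

Consuming the delimiter before a row rather than after it is what lets `LIMIT x` over a streaming source stop without pulling one extra row.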
void JSONEachRowRowInputFormat::syncAfterError()
{
@ -286,19 +283,15 @@ void JSONEachRowRowInputFormat::readPrefix()
{
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(*in);
skipWhitespaceIfAny(*in);
data_in_square_brackets = checkChar('[', *in);
data_in_square_brackets = JSONUtils::checkAndSkipArrayStart(*in);
}
void JSONEachRowRowInputFormat::readSuffix()
{
skipWhitespaceIfAny(*in);
if (data_in_square_brackets)
{
assertChar(']', *in);
skipWhitespaceIfAny(*in);
}
JSONUtils::skipArrayEnd(*in);
if (!in->eof() && *in->position() == ';')
{
++in->position();
@ -318,9 +311,7 @@ NamesAndTypesList JSONEachRowSchemaReader::readRowAndGetNamesAndDataTypes(bool &
if (first_row)
{
skipBOMIfExists(in);
skipWhitespaceIfAny(in);
if (checkChar('[', in))
data_in_square_brackets = true;
data_in_square_brackets = JSONUtils::checkAndSkipArrayStart(in);
first_row = false;
}
else


@ -18,7 +18,7 @@ class ReadBuffer;
* Fields can be listed in any order (including, in different lines there may be different order),
* and some fields may be missing.
*/
class JSONEachRowRowInputFormat final : public IRowInputFormat
class JSONEachRowRowInputFormat : public IRowInputFormat
{
public:
JSONEachRowRowInputFormat(
@ -48,6 +48,9 @@ private:
void readJSONObject(MutableColumns & columns);
void readNestedData(const String & name, MutableColumns & columns);
virtual void readRowStart() {}
virtual bool checkEndOfData(bool is_first_row);
const FormatSettings format_settings;
/// Buffer for the field name read from the stream. Used when the name has to be copied.
@ -77,12 +80,13 @@ private:
/// Cached search results for previous row (keyed as index in JSON object) - used as a hint.
std::vector<NameMap::LookupResult> prev_positions;
/// This flag is needed to know if data is in square brackets.
bool data_in_square_brackets = false;
bool allow_new_rows = true;
bool yield_strings;
protected:
/// This flag is needed to know if data is in square brackets.
bool data_in_square_brackets = false;
};
class JSONEachRowSchemaReader : public IRowWithNamesSchemaReader


@ -2,6 +2,7 @@
#include <IO/WriteBufferValidUTF8.h>
#include <Processors/Formats/Impl/JSONEachRowRowOutputFormat.h>
#include <Formats/FormatFactory.h>
#include <Formats/JSONUtils.h>
namespace DB
@ -13,36 +14,16 @@ JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat(
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & settings_)
: IRowOutputFormat(header_, out_, params_),
: RowOutputFormatWithUTF8ValidationAdaptor(settings_.json.validate_utf8, header_, out_, params_),
settings(settings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
size_t columns = sample.columns();
fields.resize(columns);
for (size_t i = 0; i < columns; ++i)
{
WriteBufferFromString buf(fields[i]);
writeJSONString(sample.getByPosition(i).name, buf, settings);
}
fields = JSONUtils::makeNamesValidJSONStrings(getPort(PortKind::Main).getHeader().getNames(), settings, settings.json.validate_utf8);
}
void JSONEachRowRowOutputFormat::writeField(const IColumn & column, const ISerialization & serialization, size_t row_num)
{
writeString(fields[field_number], out);
writeChar(':', out);
if (settings.json.serialize_as_strings)
{
WriteBufferFromOwnString buf;
serialization.serializeText(column, row_num, buf, settings);
writeJSONString(buf.str(), out, settings);
}
else
serialization.serializeTextJSON(column, row_num, out, settings);
JSONUtils::writeFieldFromColumn(column, serialization, row_num, settings.json.serialize_as_strings, settings, *ostr, fields[field_number], 0, "");
++field_number;
}
@ -87,11 +68,11 @@ void JSONEachRowRowOutputFormat::writeRowEndDelimiter()
// output.
if (settings.json.array_of_rows)
{
writeCString("}", out);
writeChar('}', *ostr);
}
else
{
writeCString("}\n", out);
writeCString("}\n", *ostr);
}
field_number = 0;
}
@ -102,7 +83,7 @@ void JSONEachRowRowOutputFormat::writeRowBetweenDelimiter()
// We preserve an existing bug here for compatibility. See the comment above.
if (settings.json.array_of_rows)
{
writeCString(",\n", out);
writeCString(",\n", *ostr);
}
}
@ -120,64 +101,33 @@ void JSONEachRowRowOutputFormat::writeSuffix()
{
if (settings.json.array_of_rows)
{
writeCString("\n]\n", out);
writeCString("\n]\n", *ostr);
}
}
void registerOutputFormatJSONEachRow(FormatFactory & factory)
{
factory.registerOutputFormat("JSONEachRow", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & _format_settings)
auto register_function = [&](const String & format, bool serialize_as_strings)
{
FormatSettings settings = _format_settings;
settings.json.serialize_as_strings = false;
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, params,
settings);
});
factory.markOutputFormatSupportsParallelFormatting("JSONEachRow");
factory.registerOutputFormat(format, [serialize_as_strings](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & _format_settings)
{
FormatSettings settings = _format_settings;
settings.json.serialize_as_strings = serialize_as_strings;
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, params,
settings);
});
factory.markOutputFormatSupportsParallelFormatting(format);
};
factory.registerOutputFormat("JSONLines", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & _format_settings)
{
FormatSettings settings = _format_settings;
settings.json.serialize_as_strings = false;
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, params,
settings);
});
factory.markOutputFormatSupportsParallelFormatting("JSONLines");
factory.registerOutputFormat("NDJSON", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & _format_settings)
{
FormatSettings settings = _format_settings;
settings.json.serialize_as_strings = false;
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, params,
settings);
});
factory.markOutputFormatSupportsParallelFormatting("NDJSON");
factory.registerOutputFormat("JSONStringsEachRow", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & _format_settings)
{
FormatSettings settings = _format_settings;
settings.json.serialize_as_strings = true;
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, params,
settings);
});
factory.markOutputFormatSupportsParallelFormatting("JSONStringEachRow");
register_function("JSONEachRow", false);
register_function("JSONLines", false);
register_function("NDJSON", false);
register_function("JSONStringsEachRow", true);
}
}


@ -2,7 +2,7 @@
#include <Core/Block.h>
#include <IO/WriteBuffer.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Processors/Formats/OutputFormatWithUTF8ValidationAdaptor.h>
#include <Formats/FormatSettings.h>
@ -10,9 +10,8 @@ namespace DB
{
/** The stream for outputting data in JSON format, one object per line.
* Does not validate UTF-8.
*/
class JSONEachRowRowOutputFormat : public IRowOutputFormat
class JSONEachRowRowOutputFormat : public RowOutputFormatWithUTF8ValidationAdaptor
{
public:
JSONEachRowRowOutputFormat(
@ -23,7 +22,6 @@ public:
String getName() const override { return "JSONEachRowRowOutputFormat"; }
public:
/// Content-Type to set when sending HTTP response.
String getContentType() const override
{


@ -11,12 +11,12 @@ void JSONEachRowWithProgressRowOutputFormat::writeRowStartDelimiter()
{
if (has_progress)
writeProgress();
writeCString("{\"row\":{", out);
writeCString("{\"row\":{", *ostr);
}
void JSONEachRowWithProgressRowOutputFormat::writeRowEndDelimiter()
{
writeCString("}}\n", out);
writeCString("}}\n", *ostr);
field_number = 0;
}
@ -24,11 +24,11 @@ void JSONEachRowWithProgressRowOutputFormat::onProgress(const Progress & value)
{
progress.incrementPiecewiseAtomically(value);
String progress_line;
WriteBufferFromString ostr(progress_line);
writeCString("{\"progress\":", ostr);
progress.writeJSON(ostr);
writeCString("}\n", ostr);
ostr.finalize();
WriteBufferFromString buf(progress_line);
writeCString("{\"progress\":", buf);
progress.writeJSON(buf);
writeCString("}\n", buf);
buf.finalize();
std::lock_guard lock(progress_lines_mutex);
progress_lines.emplace_back(std::move(progress_line));
has_progress = true;
@ -38,7 +38,7 @@ void JSONEachRowWithProgressRowOutputFormat::flush()
{
if (has_progress)
writeProgress();
IOutputFormat::flush();
JSONEachRowRowOutputFormat::flush();
}
void JSONEachRowWithProgressRowOutputFormat::writeSuffix()
@ -52,7 +52,7 @@ void JSONEachRowWithProgressRowOutputFormat::writeProgress()
{
std::lock_guard lock(progress_lines_mutex);
for (const auto & progress_line : progress_lines)
writeString(progress_line, out);
writeString(progress_line, *ostr);
progress_lines.clear();
has_progress = false;
}


@ -0,0 +1,90 @@
#include <Processors/Formats/Impl/JSONObjectEachRowRowInputFormat.h>
#include <Formats/JSONUtils.h>
#include <Formats/FormatFactory.h>
#include <Formats/EscapingRuleUtils.h>
namespace DB
{
JSONObjectEachRowInputFormat::JSONObjectEachRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
: JSONEachRowRowInputFormat(in_, header_, params_, format_settings_, false)
{
}
void JSONObjectEachRowInputFormat::readPrefix()
{
JSONUtils::skipObjectStart(*in);
}
void JSONObjectEachRowInputFormat::readRowStart()
{
JSONUtils::readFieldName(*in);
}
bool JSONObjectEachRowInputFormat::checkEndOfData(bool is_first_row)
{
if (in->eof() || JSONUtils::checkAndSkipObjectEnd(*in))
return true;
if (!is_first_row)
JSONUtils::skipComma(*in);
return false;
}
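The object-wrapped variant overrides `checkEndOfData` because its terminator differs: rows end at the outer object's closing `}` rather than at `]` or `;`. A sketch with a plain cursor (`checkEndOfObjectData` is an illustrative stand-in; `skipComma`/`checkAndSkipObjectEnd` are the real `JSONUtils` helpers):

```cpp
#include <cassert>
#include <string_view>

// End-of-data check for an object-wrapped row stream: stop at EOF or at
// the outer object's closing '}'; a ',' separates rows, none before the first.
bool checkEndOfObjectData(std::string_view data, size_t & pos, bool is_first_row)
{
    while (pos < data.size() && (data[pos] == ' ' || data[pos] == '\n'))
        ++pos;
    if (pos >= data.size())
        return true;
    if (data[pos] == '}')
    {
        ++pos;
        return true;
    }
    if (!is_first_row && data[pos] == ',')
        ++pos;
    return false;
}
```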
JSONObjectEachRowSchemaReader::JSONObjectEachRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
: IRowWithNamesSchemaReader(in_, format_settings_)
{
}
NamesAndTypesList JSONObjectEachRowSchemaReader::readRowAndGetNamesAndDataTypes(bool & eof)
{
if (first_row)
JSONUtils::skipObjectStart(in);
if (in.eof() || JSONUtils::checkAndSkipObjectEnd(in))
{
eof = true;
return {};
}
if (first_row)
first_row = false;
else
JSONUtils::skipComma(in);
JSONUtils::readFieldName(in);
return JSONUtils::readRowAndGetNamesAndDataTypesForJSONEachRow(in, format_settings, false);
}
void JSONObjectEachRowSchemaReader::transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type)
{
transformInferredJSONTypesIfNeeded(type, new_type, format_settings);
}
void registerInputFormatJSONObjectEachRow(FormatFactory & factory)
{
factory.registerInputFormat("JSONObjectEachRow", [](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
const FormatSettings & settings)
{
return std::make_shared<JSONObjectEachRowInputFormat>(buf, sample, std::move(params), settings);
});
factory.markFormatSupportsSubsetOfColumns("JSONObjectEachRow");
}
void registerJSONObjectEachRowSchemaReader(FormatFactory & factory)
{
factory.registerSchemaReader("JSONObjectEachRow", [](ReadBuffer & buf, const FormatSettings & settings)
{
return std::make_unique<JSONObjectEachRowSchemaReader>(buf, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter("JSONObjectEachRow", [](const FormatSettings & settings)
{
return getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::JSON);
});
}
}


@ -0,0 +1,47 @@
#pragma once
#include <Core/Block.h>
#include <Processors/Formats/Impl/JSONEachRowRowInputFormat.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Formats/FormatSettings.h>
#include <Common/HashTable/HashMap.h>
namespace DB
{
class ReadBuffer;
class JSONObjectEachRowInputFormat final : public JSONEachRowRowInputFormat
{
public:
JSONObjectEachRowInputFormat(
ReadBuffer & in_,
const Block & header_,
Params params_,
const FormatSettings & format_settings_);
String getName() const override { return "JSONObjectEachRowInputFormat"; }
private:
void readPrefix() override;
void readSuffix() override {}
void readRowStart() override;
bool checkEndOfData(bool is_first_row) override;
};
class JSONObjectEachRowSchemaReader : public IRowWithNamesSchemaReader
{
public:
JSONObjectEachRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_);
private:
NamesAndTypesList readRowAndGetNamesAndDataTypes(bool & eof) override;
void transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type) override;
bool first_row = true;
};
}


@ -0,0 +1,57 @@
#include <Processors/Formats/Impl/JSONObjectEachRowRowOutputFormat.h>
#include <Formats/JSONUtils.h>
#include <IO/WriteHelpers.h>
namespace DB
{
JSONObjectEachRowRowOutputFormat::JSONObjectEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_)
: JSONEachRowRowOutputFormat(out_, header_, params_, settings_)
{
}
void JSONObjectEachRowRowOutputFormat::writePrefix()
{
JSONUtils::writeObjectStart(*ostr);
}
void JSONObjectEachRowRowOutputFormat::writeRowStartDelimiter()
{
++row_num;
String title = "row_" + std::to_string(row_num);
JSONUtils::writeCompactObjectStart(*ostr, 1, title.c_str());
}
void JSONObjectEachRowRowOutputFormat::writeRowEndDelimiter()
{
JSONUtils::writeCompactObjectEnd(*ostr);
field_number = 0;
}
void JSONObjectEachRowRowOutputFormat::writeRowBetweenDelimiter()
{
JSONUtils::writeFieldDelimiter(*ostr, 1);
}
void JSONObjectEachRowRowOutputFormat::writeSuffix()
{
JSONUtils::writeObjectEnd(*ostr);
writeChar('\n', *ostr);
}
void registerOutputFormatJSONObjectEachRow(FormatFactory & factory)
{
factory.registerOutputFormat("JSONObjectEachRow", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & _format_settings)
{
FormatSettings settings = _format_settings;
settings.json.serialize_as_strings = false;
return std::make_shared<JSONObjectEachRowRowOutputFormat>(buf, sample, params, settings);
});
factory.markOutputFormatSupportsParallelFormatting("JSONObjectEachRow");
}
}


@ -0,0 +1,42 @@
#pragma once
#include <Core/Block.h>
#include <IO/WriteBuffer.h>
#include <Processors/Formats/Impl/JSONEachRowRowOutputFormat.h>
#include <Formats/FormatSettings.h>
namespace DB
{
/* Outputs data as a single JSON Object with rows as fields:
* {
* "row_1": {"num": 42, "str": "hello", "arr": [0,1]},
* "row_2": {"num": 43, "str": "hello", "arr": [0,1,2]},
* "row_3": {"num": 44, "str": "hello", "arr": [0,1,2,3]}
* }
*/
class JSONObjectEachRowRowOutputFormat : public JSONEachRowRowOutputFormat
{
public:
JSONObjectEachRowRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & settings_);
String getName() const override { return "JSONObjectEachRowRowOutputFormat"; }
private:
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
void writeRowBetweenDelimiter() override;
void writePrefix() override;
void writeSuffix() override;
size_t row_num = 0;
};
}
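Hypothetically, the output shape documented in the class comment above can be reproduced in a few lines of standalone C++ (`writeJSONObjectEachRow` is an illustrative helper operating on pre-serialized rows, not part of the format class):

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <vector>

// Emit pre-serialized row objects as fields of one outer JSON object,
// keyed "row_1", "row_2", ... as JSONObjectEachRow does.
std::string writeJSONObjectEachRow(const std::vector<std::string> & rows)
{
    std::ostringstream out;
    out << "{\n";
    for (size_t i = 0; i < rows.size(); ++i)
    {
        if (i != 0)
            out << ",\n";
        out << "    \"row_" << (i + 1) << "\": " << rows[i];
    }
    out << "\n}\n";
    return out.str();
}
```

The delimiter is written between rows, not after each one, which is why the format class splits the work across `writeRowStartDelimiter`, `writeRowBetweenDelimiter`, and `writeSuffix`.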


@ -0,0 +1,78 @@
#include <Processors/Formats/Impl/JSONRowInputFormat.h>
#include <Formats/JSONUtils.h>
#include <Formats/FormatFactory.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
}
JSONRowInputFormat::JSONRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
: JSONEachRowRowInputFormat(in_, header_, params_, format_settings_, false), validate_types_from_metadata(format_settings_.json.validate_types_from_metadata)
{
}
void JSONRowInputFormat::readPrefix()
{
skipBOMIfExists(*in);
JSONUtils::skipObjectStart(*in);
if (validate_types_from_metadata)
JSONUtils::readMetadataAndValidateHeader(*in, getPort().getHeader());
else
JSONUtils::readMetadata(*in);
JSONUtils::skipComma(*in);
if (!JSONUtils::skipUntilFieldInObject(*in, "data"))
throw Exception(ErrorCodes::INCORRECT_DATA, "Expected field \"data\" with table content");
JSONUtils::skipArrayStart(*in);
data_in_square_brackets = true;
}
void JSONRowInputFormat::readSuffix()
{
JSONUtils::skipArrayEnd(*in);
JSONUtils::skipTheRestOfObject(*in);
}
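The framing that `readPrefix`/`readSuffix` skip over is a metadata object, then a `"data"` array holding the rows, then trailing statistics. A crude scan illustrates that framing (`extractDataArray` is illustrative only and assumes well-formed input; the real reader streams via `JSONUtils` and never materializes the whole document):

```cpp
#include <cassert>
#include <string>

// Pull the "data" array out of a JSON-format document, assuming
// well-formed input with row arrays nested one level deep.
std::string extractDataArray(const std::string & doc)
{
    size_t key = doc.find("\"data\"");
    if (key == std::string::npos)
        return {};
    size_t open = doc.find('[', key);
    size_t depth = 0;
    size_t i = open;
    for (; i < doc.size(); ++i)
    {
        if (doc[i] == '[')
            ++depth;
        else if (doc[i] == ']' && --depth == 0)
            break;
    }
    return doc.substr(open, i - open + 1);
}
```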
JSONRowSchemaReader::JSONRowSchemaReader(ReadBuffer & in_) : ISchemaReader(in_)
{
}
NamesAndTypesList JSONRowSchemaReader::readSchema()
{
skipBOMIfExists(in);
JSONUtils::skipObjectStart(in);
return JSONUtils::readMetadata(in);
}
void registerInputFormatJSON(FormatFactory & factory)
{
factory.registerInputFormat("JSON", [](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
const FormatSettings & settings)
{
return std::make_shared<JSONRowInputFormat>(buf, sample, std::move(params), settings);
});
factory.markFormatSupportsSubsetOfColumns("JSON");
}
void registerJSONSchemaReader(FormatFactory & factory)
{
auto register_schema_reader = [&](const String & format)
{
factory.registerSchemaReader(
format, [](ReadBuffer & buf, const FormatSettings &) { return std::make_unique<JSONRowSchemaReader>(buf); });
};
register_schema_reader("JSON");
/// JSONCompact has the same metadata prefix, so the same schema reader works for it.
register_schema_reader("JSONCompact");
}
}


@ -0,0 +1,43 @@
#pragma once
#include <Core/Block.h>
#include <Processors/Formats/Impl/JSONEachRowRowInputFormat.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Formats/FormatSettings.h>
namespace DB
{
class ReadBuffer;
class JSONRowInputFormat final : public JSONEachRowRowInputFormat
{
public:
JSONRowInputFormat(
ReadBuffer & in_,
const Block & header_,
Params params_,
const FormatSettings & format_settings_);
String getName() const override { return "JSONRowInputFormat"; }
private:
void readPrefix() override;
void readSuffix() override;
const bool validate_types_from_metadata;
};
class JSONRowSchemaReader : public ISchemaReader
{
public:
JSONRowSchemaReader(ReadBuffer & in_);
NamesAndTypesList readSchema() override;
bool hasStrictOrderOfColumns() const override { return false; }
};
}


@ -14,26 +14,16 @@ JSONRowOutputFormat::JSONRowOutputFormat(
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool yield_strings_)
: IRowOutputFormat(header, out_, params_), settings(settings_), yield_strings(yield_strings_)
: RowOutputFormatWithUTF8ValidationAdaptor(true, header, out_, params_), settings(settings_), yield_strings(yield_strings_)
{
bool need_validate_utf8 = false;
fields = header.getNamesAndTypes();
JSONUtils::makeNamesAndTypesWithValidUTF8(fields, settings, need_validate_utf8);
if (need_validate_utf8)
{
validating_ostr = std::make_unique<WriteBufferValidUTF8>(out);
ostr = validating_ostr.get();
}
else
ostr = &out;
names = JSONUtils::makeNamesValidJSONStrings(header.getNames(), settings, true);
}
void JSONRowOutputFormat::writePrefix()
{
JSONUtils::writeObjectStart(*ostr);
JSONUtils::writeMetadata(fields, settings, *ostr);
JSONUtils::writeMetadata(names, types, settings, *ostr);
JSONUtils::writeFieldDelimiter(*ostr, 2);
JSONUtils::writeArrayStart(*ostr, 1, "data");
}
@ -41,7 +31,7 @@ void JSONRowOutputFormat::writePrefix()
void JSONRowOutputFormat::writeField(const IColumn & column, const ISerialization & serialization, size_t row_num)
{
JSONUtils::writeFieldFromColumn(column, serialization, row_num, yield_strings, settings, *ostr, fields[field_number].name, 3);
JSONUtils::writeFieldFromColumn(column, serialization, row_num, yield_strings, settings, *ostr, names[field_number], 3);
++field_number;
}
@ -84,7 +74,7 @@ void JSONRowOutputFormat::writeBeforeTotals()
void JSONRowOutputFormat::writeTotals(const Columns & columns, size_t row_num)
{
JSONUtils::writeColumns(columns, fields, serializations, row_num, yield_strings, settings, *ostr, 2);
JSONUtils::writeColumns(columns, names, serializations, row_num, yield_strings, settings, *ostr, 2);
}
void JSONRowOutputFormat::writeAfterTotals()
@ -101,7 +91,7 @@ void JSONRowOutputFormat::writeBeforeExtremes()
void JSONRowOutputFormat::writeExtremesElement(const char * title, const Columns & columns, size_t row_num)
{
JSONUtils::writeObjectStart(*ostr, 2, title);
JSONUtils::writeColumns(columns, fields, serializations, row_num, yield_strings, settings, *ostr, 3);
JSONUtils::writeColumns(columns, names, serializations, row_num, yield_strings, settings, *ostr, 3);
JSONUtils::writeObjectEnd(*ostr, 2);
}


@ -4,7 +4,7 @@
#include <IO/Progress.h>
#include <IO/WriteBuffer.h>
#include <Common/Stopwatch.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Processors/Formats/OutputFormatWithUTF8ValidationAdaptor.h>
#include <Formats/FormatSettings.h>
@ -13,7 +13,7 @@ namespace DB
/** Stream for outputting data in JSON format.
*/
class JSONRowOutputFormat : public IRowOutputFormat
class JSONRowOutputFormat : public RowOutputFormatWithUTF8ValidationAdaptor
{
public:
JSONRowOutputFormat(
@ -29,14 +29,6 @@ public:
String getContentType() const override { return "application/json; charset=UTF-8"; }
void flush() override
{
ostr->next();
if (validating_ostr)
out.next();
}
void setRowsBeforeLimit(size_t rows_before_limit_) override
{
statistics.applied_limit = true;
@ -70,12 +62,9 @@ protected:
void onRowsReadBeforeUpdate() override { row_count = getRowsReadBefore(); }
std::unique_ptr<WriteBuffer> validating_ostr; /// Validates UTF-8 sequences, replaces bad sequences with replacement character.
WriteBuffer * ostr;
size_t field_number = 0;
size_t row_count = 0;
NamesAndTypes fields; /// The field names are pre-escaped to be put into JSON string literal.
Names names; /// The column names are pre-escaped to be put into JSON string literal.
Statistics statistics;
FormatSettings settings;


@ -302,14 +302,17 @@ void registerTSVSchemaReader(FormatFactory & factory)
{
return std::make_shared<TabSeparatedSchemaReader>(buf, with_names, with_types, is_raw, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter(format_name, [with_names, is_raw](const FormatSettings & settings)
if (!with_types)
{
String result = getAdditionalFormatInfoByEscapingRule(
settings, is_raw ? FormatSettings::EscapingRule::Raw : FormatSettings::EscapingRule::Escaped);
if (!with_names)
result += fmt::format(", column_names_for_schema_inference={}", settings.column_names_for_schema_inference);
return result;
});
factory.registerAdditionalInfoForSchemaCacheGetter(format_name, [with_names, is_raw](const FormatSettings & settings)
{
String result = getAdditionalFormatInfoByEscapingRule(
settings, is_raw ? FormatSettings::EscapingRule::Raw : FormatSettings::EscapingRule::Escaped);
if (!with_names)
result += fmt::format(", column_names_for_schema_inference={}", settings.column_names_for_schema_inference);
return result;
});
}
};
registerWithNamesAndTypes(is_raw ? "TabSeparatedRaw" : "TabSeparated", register_func);


@ -8,17 +8,13 @@ namespace DB
{
XMLRowOutputFormat::XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), fields(header_.getNamesAndTypes()), format_settings(format_settings_)
: RowOutputFormatWithUTF8ValidationAdaptor(true, header_, out_, params_), fields(header_.getNamesAndTypes()), format_settings(format_settings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
field_tag_names.resize(sample.columns());
bool need_validate_utf8 = false;
for (size_t i = 0; i < sample.columns(); ++i)
{
if (!sample.getByPosition(i).type->textCanContainOnlyValidUTF8())
need_validate_utf8 = true;
/// As element names, we will use the column name if it has a valid form, or "field" otherwise.
/// The condition below is more strict than the XML standard requires.
bool is_column_name_suitable = true;
@ -42,14 +38,6 @@ XMLRowOutputFormat::XMLRowOutputFormat(WriteBuffer & out_, const Block & header_
? fields[i].name
: "field";
}
if (need_validate_utf8)
{
validating_ostr = std::make_unique<WriteBufferValidUTF8>(out);
ostr = validating_ostr.get();
}
else
ostr = &out;
}


@ -5,7 +5,7 @@
#include <IO/WriteBuffer.h>
#include <Common/Stopwatch.h>
#include <Formats/FormatSettings.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Processors/Formats/OutputFormatWithUTF8ValidationAdaptor.h>
namespace DB
@ -13,7 +13,7 @@ namespace DB
/** A stream for outputting data in XML format.
*/
class XMLRowOutputFormat final : public IRowOutputFormat
class XMLRowOutputFormat final : public RowOutputFormatWithUTF8ValidationAdaptor
{
public:
XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_);
@ -40,14 +40,6 @@ private:
void writeBeforeExtremes() override;
void writeAfterExtremes() override;
void flush() override
{
ostr->next();
if (validating_ostr)
out.next();
}
void setRowsBeforeLimit(size_t rows_before_limit_) override
{
statistics.applied_limit = true;
@ -64,9 +56,6 @@ private:
void writeRowsBeforeLimitAtLeast();
void writeStatistics();
std::unique_ptr<WriteBuffer> validating_ostr; /// Validates UTF-8 sequences, replaces bad sequences with replacement character.
WriteBuffer * ostr;
size_t field_number = 0;
size_t row_count = 0;
NamesAndTypes fields;


@ -0,0 +1,56 @@
#pragma once
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferValidUTF8.h>
namespace DB
{
template <typename Base, typename... Args>
class OutputFormatWithUTF8ValidationAdaptorBase : public Base
{
public:
OutputFormatWithUTF8ValidationAdaptorBase(bool validate_utf8, const Block & header, WriteBuffer & out_, Args... args)
: Base(header, out_, std::forward<Args>(args)...)
{
bool values_can_contain_invalid_utf8 = false;
for (const auto & type : this->getPort(IOutputFormat::PortKind::Main).getHeader().getDataTypes())
{
if (!type->textCanContainOnlyValidUTF8())
values_can_contain_invalid_utf8 = true;
}
if (validate_utf8 && values_can_contain_invalid_utf8)
{
validating_ostr = std::make_unique<WriteBufferValidUTF8>(this->out);
ostr = validating_ostr.get();
}
else
ostr = &this->out;
}
void flush() override
{
ostr->next();
if (validating_ostr)
this->out.next();
}
protected:
/// Points to validating_ostr or to out from IOutputFormat; derived classes should write here instead of using out directly.
WriteBuffer * ostr;
private:
/// Validates UTF-8 sequences, replaces bad sequences with replacement character.
std::unique_ptr<WriteBuffer> validating_ostr;
};
using OutputFormatWithUTF8ValidationAdaptor = OutputFormatWithUTF8ValidationAdaptorBase<IOutputFormat>;
using RowOutputFormatWithUTF8ValidationAdaptor = OutputFormatWithUTF8ValidationAdaptorBase<IRowOutputFormat, const IRowOutputFormat::Params &>;
}
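The adaptor pattern introduced above — wrapping the output sink in a validating buffer only when validation is both requested and actually needed — can be sketched with plain types. `Sink`, `ValidatingSink`, and `FormatterBase` below are hypothetical stand-ins for the `WriteBuffer` hierarchy; the toy validator replaces any non-ASCII byte with `?`, whereas the real `WriteBufferValidUTF8` replaces invalid UTF-8 sequences with the replacement character:

```cpp
#include <cassert>
#include <memory>
#include <string>

struct Sink
{
    virtual ~Sink() = default;
    virtual void write(const std::string & s) { data += s; }
    std::string data;
};

// Toy stand-in for WriteBufferValidUTF8.
struct ValidatingSink : Sink
{
    explicit ValidatingSink(Sink & target_) : target(target_) {}
    void write(const std::string & s) override
    {
        std::string fixed = s;
        for (char & c : fixed)
            if (static_cast<unsigned char>(c) >= 0x80)
                c = '?';
        target.write(fixed);
    }
    Sink & target;
};

// Wrap the sink only when validation is requested and some column can
// actually contain invalid UTF-8; otherwise write straight through.
struct FormatterBase
{
    FormatterBase(bool validate_utf8, bool values_can_contain_invalid_utf8, Sink & out_)
        : out(out_)
    {
        if (validate_utf8 && values_can_contain_invalid_utf8)
        {
            validating_ostr = std::make_unique<ValidatingSink>(out);
            ostr = validating_ostr.get();
        }
        else
            ostr = &out;
    }
    Sink & out;
    Sink * ostr; // derived formatters write here instead of `out`
    std::unique_ptr<Sink> validating_ostr;
};
```

Skipping the wrapper when every column's text representation is guaranteed-valid UTF-8 avoids paying the validation cost on the common path.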


@ -57,6 +57,8 @@ private:
bool with_names;
bool with_types;
std::unique_ptr<FormatWithNamesAndTypesReader> format_reader;
protected:
std::unordered_map<String, size_t> column_indexes_by_names;
};


@ -29,7 +29,7 @@ ERROR: There is no delimiter after last field: expected "<LINE FEED>", got "<TAB
ERROR: There is no delimiter after last field: expected "<LINE FEED>", got "Hello<LINE FEED>"
Column 0, name: t, type: DateTime, ERROR: text "<LINE FEED>" is not like DateTime
JSONCompactEachRow
Column 2, name: d, type: Decimal(18, 10), parsed text: " 123456789"ERROR
Column 2, name: d, type: Decimal(18, 10), parsed text: "123456789"ERROR
Column 0, name: t, type: DateTime, parsed text: "<DOUBLE QUOTE>2020-04-21 12:34:56"ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.
ERROR: garbage after DateTime: "7, <DOUBLE QUOTE>Hello<DOUBLE QUOTE>"
ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.


@ -7,7 +7,10 @@ CSVWithNamesAndTypes
CustomSeparated
CustomSeparatedWithNames
CustomSeparatedWithNamesAndTypes
JSON
JSONColumns
JSONColumnsWithMetadata
JSONCompact
JSONCompactColumns
JSONCompactEachRow
JSONCompactEachRowWithNames
@ -17,6 +20,7 @@ JSONCompactStringsEachRowWithNames
JSONCompactStringsEachRowWithNamesAndTypes
JSONEachRow
JSONLines
JSONObjectEachRow
JSONStringsEachRow
MsgPack
NDJSON


@ -0,0 +1,18 @@
n UInt32
s String
a Array(UInt64)
0 Hello []
1 Hello [0]
2 Hello [0,1]
n UInt32
s String
a Array(UInt64)
0 Hello []
1 Hello [0]
2 Hello [0,1]
n UInt32
s String
a Array(UInt64)
0 Hello []
1 Hello [0]
2 Hello [0,1]


@ -0,0 +1,13 @@
-- Tags: no-parallel, no-fasttest
insert into function file(02416_data.json) select number::UInt32 as n, 'Hello' as s, range(number) as a from numbers(3) settings engine_file_truncate_on_insert=1;
desc file(02416_data.json);
select * from file(02416_data.json);
insert into function file(02416_data.jsonCompact) select number::UInt32 as n, 'Hello' as s, range(number) as a from numbers(3) settings engine_file_truncate_on_insert=1;
desc file(02416_data.jsonCompact);
select * from file(02416_data.jsonCompact);
insert into function file(02416_data.jsonColumnsWithMetadata) select number::UInt32 as n, 'Hello' as s, range(number) as a from numbers(3) settings engine_file_truncate_on_insert=1;
desc file(02416_data.jsonColumnsWithMetadata);
select * from file(02416_data.jsonColumnsWithMetadata);


@ -0,0 +1,11 @@
{
"row_1": {"number":"0","str":"Hello","arr":[]},
"row_2": {"number":"1","str":"Hello","arr":["0"]},
"row_3": {"number":"2","str":"Hello","arr":["0","1"]}
}
number Nullable(Int64)
str Nullable(String)
arr Array(Nullable(Int64))
0 Hello []
1 Hello [0]
2 Hello [0,1]


@ -0,0 +1,6 @@
-- Tags: no-parallel, no-fasttest
select number, 'Hello' as str, range(number) as arr from numbers(3) format JSONObjectEachRow;
insert into function file(02417_data.jsonObjectEachRow) select number, 'Hello' as str, range(number) as arr from numbers(3) settings engine_file_truncate_on_insert=1;
desc file(02417_data.jsonObjectEachRow);
select * from file(02417_data.jsonObjectEachRow);


@ -0,0 +1,2 @@
{"d":"42.42"}
42.42


@ -0,0 +1,4 @@
select toDecimal128(42.42, 5) as d format JSONEachRow settings output_format_json_quote_decimals=1;
insert into function file(02421_data.jsonl) select '42.42' as d settings engine_file_truncate_on_insert=1;
select * from file(02421_data.jsonl, auto, 'd Decimal32(3)');


@ -0,0 +1,5 @@
123
str
['123','str']
['123','456']
['str','rts']


@ -0,0 +1,7 @@
-- Tags: no-fasttest
set input_format_json_read_numbers_as_strings=1;
select * from format(JSONEachRow, '{"x" : 123}\n{"x" : "str"}');
select * from format(JSONEachRow, '{"x" : [123, "str"]}');
select * from format(JSONEachRow, '{"x" : [123, "456"]}\n{"x" : ["str", "rts"]}');


@ -0,0 +1,2 @@
{"x":"42424.4242424242","arr":["42.42","42.42"],"tuple":["42.42"]}
{"x":42424.4242424242,"arr":[42.42,42.42],"tuple":[42.42]}


@ -0,0 +1,3 @@
select 42424.4242424242::Float64 as x, [42.42::Float64, 42.42::Float64] as arr, tuple(42.42::Float64) as tuple format JSONEachRow settings output_format_json_quote_64bit_floats=1;
select 42424.4242424242::Float64 as x, [42.42::Float64, 42.42::Float64] as arr, tuple(42.42::Float64) as tuple format JSONEachRow settings output_format_json_quote_64bit_floats=0;


@ -19,10 +19,10 @@ e3231b1c8187de4da6752d692b2ddba9 -
JSONCompactStringsEachRowWithNames, true
e3231b1c8187de4da6752d692b2ddba9 -
JSONCompactEachRowWithNamesAndTypes, false
21302d11da0bf8d37ab599e28a51bac2 -
d40c4327c63eded184eee185a5330e12 -
JSONCompactEachRowWithNamesAndTypes, true
21302d11da0bf8d37ab599e28a51bac2 -
d40c4327c63eded184eee185a5330e12 -
JSONCompactStringsEachRowWithNamesAndTypes, false
21302d11da0bf8d37ab599e28a51bac2 -
d40c4327c63eded184eee185a5330e12 -
JSONCompactStringsEachRowWithNamesAndTypes, true
21302d11da0bf8d37ab599e28a51bac2 -
d40c4327c63eded184eee185a5330e12 -


@ -64,6 +64,7 @@ JSONCompactStringsEachRowWithNames
JSONCompactStringsEachRowWithNamesAndTypes
JSONEachRow
JSONEachRowWithProgress
JSONObjectEachRow
JSONStrings
JSONStringsEachRow
JSONStringsEachRowWithProgress
@ -300,6 +301,7 @@ jsoncompactstringseachrowwithnames
jsoncompactstringseachrowwithnamesandtypes
jsoneachrow
jsoneachrowwithprogress
jsonobjecteachrow
jsonstrings
jsonstringseachrow
jsonstringseachrowwithprogress