Add JSONCompactStrings formats

This commit is contained in:
hcz 2020-09-02 12:05:02 +08:00
parent ac5877e601
commit a80c1adee8
30 changed files with 621 additions and 812 deletions

View File

@ -10,45 +10,51 @@ results of a `SELECT`, and to perform `INSERT`s into a file-backed table.
The supported formats are: The supported formats are:
| Format | Input | Output | | Format | Input | Output |
|-----------------------------------------------------------------|-------|--------| |-----------------------------------------------------------------------------------------|-------|--------|
| [TabSeparated](#tabseparated) | ✔ | ✔ | | [TabSeparated](#tabseparated) | ✔ | ✔ |
| [TabSeparatedRaw](#tabseparatedraw) | ✔ | ✔ | | [TabSeparatedRaw](#tabseparatedraw) | ✔ | ✔ |
| [TabSeparatedWithNames](#tabseparatedwithnames) | ✔ | ✔ | | [TabSeparatedWithNames](#tabseparatedwithnames) | ✔ | ✔ |
| [TabSeparatedWithNamesAndTypes](#tabseparatedwithnamesandtypes) | ✔ | ✔ | | [TabSeparatedWithNamesAndTypes](#tabseparatedwithnamesandtypes) | ✔ | ✔ |
| [Template](#format-template) | ✔ | ✔ | | [Template](#format-template) | ✔ | ✔ |
| [TemplateIgnoreSpaces](#templateignorespaces) | ✔ | ✗ | | [TemplateIgnoreSpaces](#templateignorespaces) | ✔ | ✗ |
| [CSV](#csv) | ✔ | ✔ | | [CSV](#csv) | ✔ | ✔ |
| [CSVWithNames](#csvwithnames) | ✔ | ✔ | | [CSVWithNames](#csvwithnames) | ✔ | ✔ |
| [CustomSeparated](#format-customseparated) | ✔ | ✔ | | [CustomSeparated](#format-customseparated) | ✔ | ✔ |
| [Values](#data-format-values) | ✔ | ✔ | | [Values](#data-format-values) | ✔ | ✔ |
| [Vertical](#vertical) | ✗ | ✔ | | [Vertical](#vertical) | ✗ | ✔ |
| [VerticalRaw](#verticalraw) | ✗ | ✔ | | [VerticalRaw](#verticalraw) | ✗ | ✔ |
| [JSON](#json) | ✗ | ✔ | | [JSON](#json) | ✗ | ✔ |
| [JSONCompact](#jsoncompact) | ✗ | ✔ | | [JSONString](#jsonstring) | ✗ | ✔ |
| [JSONStrings](#jsonstrings) | ✗ | ✔ | | [JSONCompact](#jsoncompact) | ✗ | ✔ |
| [JSONEachRow](#jsoneachrow) | ✔ | ✔ | | [JSONCompactString](#jsoncompactstring) | ✗ | ✔ |
| [JSONCompactEachRow](#jsoncompacteachrow) | ✔ | ✔ | | [JSONEachRow](#jsoneachrow) | ✔ | ✔ |
| [JSONStringsEachRow](#jsonstringseachrow) | ✔ | ✔ | | [JSONEachRowWithProgress](#jsoneachrowwithprogress) | ✗ | ✔ |
| [TSKV](#tskv) | ✔ | ✔ | | [JSONStringEachRow](#jsonstringeachrow) | ✔ | ✔ |
| [Pretty](#pretty) | ✗ | ✔ | | [JSONStringEachRowWithProgress](#jsonstringeachrowwithprogress) | ✗ | ✔ |
| [PrettyCompact](#prettycompact) | ✗ | ✔ | | [JSONCompactEachRow](#jsoncompacteachrow) | ✔ | ✔ |
| [PrettyCompactMonoBlock](#prettycompactmonoblock) | ✗ | ✔ | | [JSONCompactEachRowWithNamesAndTypes](#jsoncompacteachrowwithnamesandtypes) | ✔ | ✔ |
| [PrettyNoEscapes](#prettynoescapes) | ✗ | ✔ | | [JSONCompactStringEachRow](#jsoncompactstringeachrow) | ✔ | ✔ |
| [PrettySpace](#prettyspace) | ✗ | ✔ | | [JSONCompactStringEachRowWithNamesAndTypes](#jsoncompactstringeachrowwithnamesandtypes) | ✔ | ✔ |
| [Protobuf](#protobuf) | ✔ | ✔ | | [TSKV](#tskv) | ✔ | ✔ |
| [Avro](#data-format-avro) | ✔ | ✔ | | [Pretty](#pretty) | ✗ | ✔ |
| [AvroConfluent](#data-format-avro-confluent) | ✔ | ✗ | | [PrettyCompact](#prettycompact) | ✗ | ✔ |
| [Parquet](#data-format-parquet) | ✔ | ✔ | | [PrettyCompactMonoBlock](#prettycompactmonoblock) | ✗ | ✔ |
| [Arrow](#data-format-arrow) | ✔ | ✔ | | [PrettyNoEscapes](#prettynoescapes) | ✗ | ✔ |
| [ArrowStream](#data-format-arrow-stream) | ✔ | ✔ | | [PrettySpace](#prettyspace) | ✗ | ✔ |
| [ORC](#data-format-orc) | ✔ | ✗ | | [Protobuf](#protobuf) | ✔ | ✔ |
| [RowBinary](#rowbinary) | ✔ | ✔ | | [Avro](#data-format-avro) | ✔ | ✔ |
| [RowBinaryWithNamesAndTypes](#rowbinarywithnamesandtypes) | ✔ | ✔ | | [AvroConfluent](#data-format-avro-confluent) | ✔ | ✗ |
| [Native](#native) | ✔ | ✔ | | [Parquet](#data-format-parquet) | ✔ | ✔ |
| [Null](#null) | ✗ | ✔ | | [Arrow](#data-format-arrow) | ✔ | ✔ |
| [XML](#xml) | ✗ | ✔ | | [ArrowStream](#data-format-arrow-stream) | ✔ | ✔ |
| [CapnProto](#capnproto) | ✔ | ✗ | | [ORC](#data-format-orc) | ✔ | ✗ |
| [RowBinary](#rowbinary) | ✔ | ✔ |
| [RowBinaryWithNamesAndTypes](#rowbinarywithnamesandtypes) | ✔ | ✔ |
| [Native](#native) | ✔ | ✔ |
| [Null](#null) | ✗ | ✔ |
| [XML](#xml) | ✗ | ✔ |
| [CapnProto](#capnproto) | ✔ | ✗ |
You can control some format processing parameters with the ClickHouse settings. For more information read the [Settings](../operations/settings/settings.md) section. You can control some format processing parameters with the ClickHouse settings. For more information read the [Settings](../operations/settings/settings.md) section.
@ -395,62 +401,41 @@ SELECT SearchPhrase, count() AS c FROM test.hits GROUP BY SearchPhrase WITH TOTA
"meta": "meta":
[ [
{ {
"name": "SearchPhrase", "name": "'hello'",
"type": "String" "type": "String"
}, },
{ {
"name": "c", "name": "multiply(42, number)",
"type": "UInt64" "type": "UInt64"
},
{
"name": "range(5)",
"type": "Array(UInt8)"
} }
], ],
"data": "data":
[ [
{ {
"SearchPhrase": "", "'hello'": "hello",
"c": "8267016" "multiply(42, number)": "0",
"range(5)": [0,1,2,3,4]
}, },
{ {
"SearchPhrase": "bathroom interior design", "'hello'": "hello",
"c": "2166" "multiply(42, number)": "42",
"range(5)": [0,1,2,3,4]
}, },
{ {
"SearchPhrase": "yandex", "'hello'": "hello",
"c": "1655" "multiply(42, number)": "84",
}, "range(5)": [0,1,2,3,4]
{
"SearchPhrase": "spring 2014 fashion",
"c": "1549"
},
{
"SearchPhrase": "freeform photos",
"c": "1480"
} }
], ],
"totals": "rows": 3,
{
"SearchPhrase": "",
"c": "8873898"
},
"extremes": "rows_before_limit_at_least": 3
{
"min":
{
"SearchPhrase": "",
"c": "1480"
},
"max":
{
"SearchPhrase": "",
"c": "8267016"
}
},
"rows": 5,
"rows_before_limit_at_least": 141137
} }
``` ```
@ -471,73 +456,166 @@ ClickHouse supports [NULL](../sql-reference/syntax.md), which is displayed as `n
See also the [JSONEachRow](#jsoneachrow) format. See also the [JSONEachRow](#jsoneachrow) format.
## JSONCompact {#jsoncompact} ## JSONString {#jsonstring}
Differs from JSON only in that data rows are output in arrays of any element type, not in objects. Differs from JSON only in that data fields are output in strings, not in typed json values.
Example: Example:
``` json ```json
{ {
"meta": "meta":
[ [
{ {
"name": "SearchPhrase", "name": "'hello'",
"type": "String" "type": "String"
}, },
{ {
"name": "c", "name": "multiply(42, number)",
"type": "UInt64" "type": "UInt64"
},
{
"name": "range(5)",
"type": "Array(UInt8)"
} }
], ],
"data": "data":
[ [
["", "8267016"], {
["bathroom interior design", "2166"], "'hello'": "hello",
["yandex", "1655"], "multiply(42, number)": "0",
["fashion trends spring 2014", "1549"], "range(5)": "[0,1,2,3,4]"
["freeform photo", "1480"] },
{
"'hello'": "hello",
"multiply(42, number)": "42",
"range(5)": "[0,1,2,3,4]"
},
{
"'hello'": "hello",
"multiply(42, number)": "84",
"range(5)": "[0,1,2,3,4]"
}
], ],
"totals": ["","8873898"], "rows": 3,
"extremes": "rows_before_limit_at_least": 3
{
"min": ["","1480"],
"max": ["","8267016"]
},
"rows": 5,
"rows_before_limit_at_least": 141137
} }
``` ```
This format is only appropriate for outputting a query result, but not for parsing (retrieving data to insert in a table). ## JSONCompact {#jsoncompact}
See also the `JSONEachRow` format. ## JSONCompactString {#jsoncompactstring}
## JSONStrings {#jsonstrings} Differs from JSON only in that data rows are output in arrays, not in objects.
Differs from JSON and JSONCompact only in that data rows are output in arrays of strings. Example:
This format is only appropriate for outputting a query result, but not for parsing (retrieving data to insert in a table). ``` json
See also the `JSONEachRow` format. // JSONCompact
{
"meta":
[
{
"name": "'hello'",
"type": "String"
},
{
"name": "multiply(42, number)",
"type": "UInt64"
},
{
"name": "range(5)",
"type": "Array(UInt8)"
}
],
"data":
[
["hello", "0", [0,1,2,3,4]],
["hello", "42", [0,1,2,3,4]],
["hello", "84", [0,1,2,3,4]]
],
"rows": 3,
"rows_before_limit_at_least": 3
}
```
```json
// JSONCompactString
{
"meta":
[
{
"name": "'hello'",
"type": "String"
},
{
"name": "multiply(42, number)",
"type": "UInt64"
},
{
"name": "range(5)",
"type": "Array(UInt8)"
}
],
"data":
[
["hello", "0", "[0,1,2,3,4]"],
["hello", "42", "[0,1,2,3,4]"],
["hello", "84", "[0,1,2,3,4]"]
],
"rows": 3,
"rows_before_limit_at_least": 3
}
```
## JSONEachRow {#jsoneachrow} ## JSONEachRow {#jsoneachrow}
## JSONStringEachRow {#jsonstringeachrow}
## JSONCompactEachRow {#jsoncompacteachrow} ## JSONCompactEachRow {#jsoncompacteachrow}
## JSONStringsEachRow {#jsonstringseachrow} ## JSONCompactStringEachRow {#jsoncompactstringeachrow}
When using these formats, ClickHouse outputs rows as separated, newline-delimited JSON values, but the data as a whole is not valid JSON. When using these formats, ClickHouse outputs rows as separated, newline-delimited JSON values, but the data as a whole is not valid JSON.
``` json ``` json
{"some_int":42,"some_str":"hello","some_tuple":[1,"a"]} // JSONEachRow {"some_int":42,"some_str":"hello","some_tuple":[1,"a"]} // JSONEachRow
[42,"hello",[1,"a"]] // JSONCompactEachRow [42,"hello",[1,"a"]] // JSONCompactEachRow
["42","hello","(2,'a')"] // JSONStringsEachRow ["42","hello","(2,'a')"] // JSONCompactStringsEachRow
``` ```
When inserting the data, you should provide a separate JSON value for each row. When inserting the data, you should provide a separate JSON value for each row.
## JSONEachRowWithProgress {#jsoneachrowwithprogress}
## JSONStringEachRowWithProgress {#jsonstringeachrowwithprogress}
Differs from JSONEachRow/JSONStringEachRow in that ClickHouse will also yield progress information as JSON objects.
```json
{"row":{"'hello'":"hello","multiply(42, number)":"0","range(5)":[0,1,2,3,4]}}
{"row":{"'hello'":"hello","multiply(42, number)":"42","range(5)":[0,1,2,3,4]}}
{"row":{"'hello'":"hello","multiply(42, number)":"84","range(5)":[0,1,2,3,4]}}
{"progress":{"read_rows":"3","read_bytes":"24","written_rows":"0","written_bytes":"0","total_rows_to_read":"3"}}
```
## JSONCompactEachRowWithNamesAndTypes {#jsoncompacteachrowwithnamesandtypes}
## JSONCompactStringEachRowWithNamesAndTypes {#jsoncompactstringeachrowwithnamesandtypes}
Differs from JSONCompactEachRow/JSONCompactStringEachRow in that the column names and types are written as the first two rows.
```json
["'hello'", "multiply(42, number)", "range(5)"]
["String", "UInt64", "Array(UInt8)"]
["hello", "0", [0,1,2,3,4]]
["hello", "42", [0,1,2,3,4]]
["hello", "84", [0,1,2,3,4]]
```
### Inserting Data {#inserting-data} ### Inserting Data {#inserting-data}
``` sql ``` sql

View File

@ -352,8 +352,6 @@ void registerInputFormatProcessorJSONEachRow(FormatFactory & factory);
void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory); void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory);
void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory); void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory); void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
void registerInputFormatProcessorJSONStringsEachRow(FormatFactory & factory);
void registerOutputFormatProcessorJSONStringsEachRow(FormatFactory & factory);
void registerInputFormatProcessorProtobuf(FormatFactory & factory); void registerInputFormatProcessorProtobuf(FormatFactory & factory);
void registerOutputFormatProcessorProtobuf(FormatFactory & factory); void registerOutputFormatProcessorProtobuf(FormatFactory & factory);
void registerInputFormatProcessorTemplate(FormatFactory & factory); void registerInputFormatProcessorTemplate(FormatFactory & factory);
@ -380,7 +378,6 @@ void registerOutputFormatProcessorVertical(FormatFactory & factory);
void registerOutputFormatProcessorJSON(FormatFactory & factory); void registerOutputFormatProcessorJSON(FormatFactory & factory);
void registerOutputFormatProcessorJSONCompact(FormatFactory & factory); void registerOutputFormatProcessorJSONCompact(FormatFactory & factory);
void registerOutputFormatProcessorJSONEachRowWithProgress(FormatFactory & factory); void registerOutputFormatProcessorJSONEachRowWithProgress(FormatFactory & factory);
void registerOutputFormatProcessorJSONStrings(FormatFactory & factory);
void registerOutputFormatProcessorXML(FormatFactory & factory); void registerOutputFormatProcessorXML(FormatFactory & factory);
void registerOutputFormatProcessorODBCDriver2(FormatFactory & factory); void registerOutputFormatProcessorODBCDriver2(FormatFactory & factory);
void registerOutputFormatProcessorNull(FormatFactory & factory); void registerOutputFormatProcessorNull(FormatFactory & factory);
@ -421,8 +418,6 @@ FormatFactory::FormatFactory()
registerOutputFormatProcessorJSONEachRow(*this); registerOutputFormatProcessorJSONEachRow(*this);
registerInputFormatProcessorJSONCompactEachRow(*this); registerInputFormatProcessorJSONCompactEachRow(*this);
registerOutputFormatProcessorJSONCompactEachRow(*this); registerOutputFormatProcessorJSONCompactEachRow(*this);
registerInputFormatProcessorJSONStringsEachRow(*this);
registerOutputFormatProcessorJSONStringsEachRow(*this);
registerInputFormatProcessorProtobuf(*this); registerInputFormatProcessorProtobuf(*this);
registerOutputFormatProcessorProtobuf(*this); registerOutputFormatProcessorProtobuf(*this);
registerInputFormatProcessorTemplate(*this); registerInputFormatProcessorTemplate(*this);
@ -449,7 +444,6 @@ FormatFactory::FormatFactory()
registerOutputFormatProcessorJSON(*this); registerOutputFormatProcessorJSON(*this);
registerOutputFormatProcessorJSONCompact(*this); registerOutputFormatProcessorJSONCompact(*this);
registerOutputFormatProcessorJSONEachRowWithProgress(*this); registerOutputFormatProcessorJSONEachRowWithProgress(*this);
registerOutputFormatProcessorJSONStrings(*this);
registerOutputFormatProcessorXML(*this); registerOutputFormatProcessorXML(*this);
registerOutputFormatProcessorODBCDriver2(*this); registerOutputFormatProcessorODBCDriver2(*this);
registerOutputFormatProcessorNull(*this); registerOutputFormatProcessorNull(*this);

View File

@ -1,4 +1,5 @@
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.h> #include <Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
@ -19,8 +20,9 @@ JSONCompactEachRowRowInputFormat::JSONCompactEachRowRowInputFormat(ReadBuffer &
const Block & header_, const Block & header_,
Params params_, Params params_,
const FormatSettings & format_settings_, const FormatSettings & format_settings_,
bool with_names_) bool with_names_,
: IRowInputFormat(header_, in_, std::move(params_)), format_settings(format_settings_), with_names(with_names_) bool yield_strings_)
: IRowInputFormat(header_, in_, std::move(params_)), format_settings(format_settings_), with_names(with_names_), yield_strings(yield_strings_)
{ {
const auto & sample = getPort().getHeader(); const auto & sample = getPort().getHeader();
size_t num_columns = sample.columns(); size_t num_columns = sample.columns();
@ -200,10 +202,25 @@ void JSONCompactEachRowRowInputFormat::readField(size_t index, MutableColumns &
{ {
read_columns[index] = true; read_columns[index] = true;
const auto & type = data_types[index]; const auto & type = data_types[index];
if (format_settings.null_as_default && !type->isNullable())
read_columns[index] = DataTypeNullable::deserializeTextJSON(*columns[index], in, format_settings, type); if (yield_strings)
{
// notice: null_as_default on "null" strings is not supported
String str;
readJSONString(str, in);
ReadBufferFromString buf(str);
type->deserializeAsWholeText(*columns[index], buf, format_settings);
}
else else
type->deserializeAsTextJSON(*columns[index], in, format_settings); {
if (format_settings.null_as_default && !type->isNullable())
read_columns[index] = DataTypeNullable::deserializeTextJSON(*columns[index], in, format_settings, type);
else
type->deserializeAsTextJSON(*columns[index], in, format_settings);
}
} }
catch (Exception & e) catch (Exception & e)
{ {
@ -225,7 +242,7 @@ void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory)
IRowInputFormat::Params params, IRowInputFormat::Params params,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<JSONCompactEachRowRowInputFormat>(buf, sample, std::move(params), settings, false); return std::make_shared<JSONCompactEachRowRowInputFormat>(buf, sample, std::move(params), settings, false, false);
}); });
factory.registerInputFormatProcessor("JSONCompactEachRowWithNamesAndTypes", []( factory.registerInputFormatProcessor("JSONCompactEachRowWithNamesAndTypes", [](
@ -234,7 +251,25 @@ void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory)
IRowInputFormat::Params params, IRowInputFormat::Params params,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<JSONCompactEachRowRowInputFormat>(buf, sample, std::move(params), settings, true); return std::make_shared<JSONCompactEachRowRowInputFormat>(buf, sample, std::move(params), settings, true, false);
});
factory.registerInputFormatProcessor("JSONCompactStringsEachRow", [](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
const FormatSettings & settings)
{
return std::make_shared<JSONCompactEachRowRowInputFormat>(buf, sample, std::move(params), settings, false, true);
});
factory.registerInputFormatProcessor("JSONCompactStringsEachRowWithNamesAndTypes", [](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
const FormatSettings & settings)
{
return std::make_shared<JSONCompactEachRowRowInputFormat>(buf, sample, std::move(params), settings, true, true);
}); });
} }

View File

@ -12,12 +12,18 @@ namespace DB
class ReadBuffer; class ReadBuffer;
/** A stream for reading data in JSONCompactEachRow and JSONCompactEachRowWithNamesAndTypes formats /** A stream for reading data in JSONCompactEachRow- formats
*/ */
class JSONCompactEachRowRowInputFormat : public IRowInputFormat class JSONCompactEachRowRowInputFormat : public IRowInputFormat
{ {
public: public:
JSONCompactEachRowRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_, bool with_names_); JSONCompactEachRowRowInputFormat(
ReadBuffer & in_,
const Block & header_,
Params params_,
const FormatSettings & format_settings_,
bool with_names_,
bool yield_strings_);
String getName() const override { return "JSONCompactEachRowRowInputFormat"; } String getName() const override { return "JSONCompactEachRowRowInputFormat"; }
@ -49,6 +55,7 @@ private:
std::vector<String> names_of_columns; std::vector<String> names_of_columns;
bool with_names; bool with_names;
bool yield_strings;
}; };
} }

View File

@ -12,8 +12,9 @@ JSONCompactEachRowRowOutputFormat::JSONCompactEachRowRowOutputFormat(WriteBuffer
const Block & header_, const Block & header_,
FormatFactory::WriteCallback callback, FormatFactory::WriteCallback callback,
const FormatSettings & settings_, const FormatSettings & settings_,
bool with_names_) bool with_names_,
: IRowOutputFormat(header_, out_, callback), settings(settings_), with_names(with_names_) bool yield_strings_)
: IRowOutputFormat(header_, out_, callback), settings(settings_), with_names(with_names_), yield_strings(yield_strings_)
{ {
const auto & sample = getPort(PortKind::Main).getHeader(); const auto & sample = getPort(PortKind::Main).getHeader();
NamesAndTypesList columns(sample.getNamesAndTypesList()); NamesAndTypesList columns(sample.getNamesAndTypesList());
@ -23,7 +24,15 @@ JSONCompactEachRowRowOutputFormat::JSONCompactEachRowRowOutputFormat(WriteBuffer
void JSONCompactEachRowRowOutputFormat::writeField(const IColumn & column, const IDataType & type, size_t row_num) void JSONCompactEachRowRowOutputFormat::writeField(const IColumn & column, const IDataType & type, size_t row_num)
{ {
type.serializeAsTextJSON(column, row_num, out, settings); if (yield_strings)
{
WriteBufferFromOwnString buf;
type.serializeAsText(column, row_num, buf, settings);
writeJSONString(buf.str(), out, settings);
}
else
type.serializeAsTextJSON(column, row_num, out, settings);
} }
@ -97,7 +106,7 @@ void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory)
FormatFactory::WriteCallback callback, FormatFactory::WriteCallback callback,
const FormatSettings & format_settings) const FormatSettings & format_settings)
{ {
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, callback, format_settings, false); return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, callback, format_settings, false, false);
}); });
factory.registerOutputFormatProcessor("JSONCompactEachRowWithNamesAndTypes", []( factory.registerOutputFormatProcessor("JSONCompactEachRowWithNamesAndTypes", [](
@ -106,7 +115,25 @@ void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory)
FormatFactory::WriteCallback callback, FormatFactory::WriteCallback callback,
const FormatSettings &format_settings) const FormatSettings &format_settings)
{ {
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, callback, format_settings, true); return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, callback, format_settings, true, false);
});
factory.registerOutputFormatProcessor("JSONCompactStringsEachRow", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, callback, format_settings, false, true);
});
factory.registerOutputFormatProcessor("JSONCompactStringsEachRowWithNamesAndTypes", [](
WriteBuffer &buf,
const Block &sample,
FormatFactory::WriteCallback callback,
const FormatSettings &format_settings)
{
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, callback, format_settings, true, true);
}); });
} }

View File

@ -15,7 +15,13 @@ namespace DB
class JSONCompactEachRowRowOutputFormat : public IRowOutputFormat class JSONCompactEachRowRowOutputFormat : public IRowOutputFormat
{ {
public: public:
JSONCompactEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_, bool with_names); JSONCompactEachRowRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
FormatFactory::WriteCallback callback,
const FormatSettings & settings_,
bool with_names_,
bool yield_strings_);
String getName() const override { return "JSONCompactEachRowRowOutputFormat"; } String getName() const override { return "JSONCompactEachRowRowOutputFormat"; }
@ -41,5 +47,6 @@ private:
NamesAndTypes fields; NamesAndTypes fields;
bool with_names; bool with_names;
bool yield_strings;
}; };
} }

View File

@ -8,15 +8,28 @@ namespace DB
{ {
JSONCompactRowOutputFormat::JSONCompactRowOutputFormat( JSONCompactRowOutputFormat::JSONCompactRowOutputFormat(
WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & settings_) WriteBuffer & out_,
: JSONRowOutputFormat(out_, header, callback, settings_) const Block & header,
FormatFactory::WriteCallback callback,
const FormatSettings & settings_,
bool yield_strings_)
: JSONRowOutputFormat(out_, header, callback, settings_, yield_strings_)
{ {
} }
void JSONCompactRowOutputFormat::writeField(const IColumn & column, const IDataType & type, size_t row_num) void JSONCompactRowOutputFormat::writeField(const IColumn & column, const IDataType & type, size_t row_num)
{ {
type.serializeAsTextJSON(column, row_num, *ostr, settings); if (yield_strings)
{
WriteBufferFromOwnString buf;
type.serializeAsText(column, row_num, buf, settings);
writeJSONString(buf.str(), *ostr, settings);
}
else
type.serializeAsTextJSON(column, row_num, *ostr, settings);
++field_number; ++field_number;
} }
@ -83,7 +96,16 @@ void registerOutputFormatProcessorJSONCompact(FormatFactory & factory)
FormatFactory::WriteCallback callback, FormatFactory::WriteCallback callback,
const FormatSettings & format_settings) const FormatSettings & format_settings)
{ {
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, callback, format_settings); return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, callback, format_settings, false);
});
factory.registerOutputFormatProcessor("JSONCompactStrings", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, callback, format_settings, true);
}); });
} }

View File

@ -11,12 +11,17 @@ namespace DB
struct FormatSettings; struct FormatSettings;
/** The stream for outputting data in the JSONCompact format. /** The stream for outputting data in the JSONCompact- formats.
*/ */
class JSONCompactRowOutputFormat : public JSONRowOutputFormat class JSONCompactRowOutputFormat : public JSONRowOutputFormat
{ {
public: public:
JSONCompactRowOutputFormat(WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & settings_); JSONCompactRowOutputFormat(
WriteBuffer & out_,
const Block & header,
FormatFactory::WriteCallback callback,
const FormatSettings & settings_,
bool yield_strings_);
String getName() const override { return "JSONCompactRowOutputFormat"; } String getName() const override { return "JSONCompactRowOutputFormat"; }
@ -37,7 +42,6 @@ protected:
} }
void writeTotalsFieldDelimiter() override; void writeTotalsFieldDelimiter() override;
}; };
} }

View File

@ -1,4 +1,5 @@
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <Processors/Formats/Impl/JSONEachRowRowInputFormat.h> #include <Processors/Formats/Impl/JSONEachRowRowInputFormat.h>
#include <Formats/JSONEachRowUtils.h> #include <Formats/JSONEachRowUtils.h>
@ -29,8 +30,12 @@ enum
JSONEachRowRowInputFormat::JSONEachRowRowInputFormat( JSONEachRowRowInputFormat::JSONEachRowRowInputFormat(
ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_) ReadBuffer & in_,
: IRowInputFormat(header_, in_, std::move(params_)), format_settings(format_settings_), name_map(header_.columns()) const Block & header_,
Params params_,
const FormatSettings & format_settings_,
bool yield_strings_)
: IRowInputFormat(header_, in_, std::move(params_)), format_settings(format_settings_), name_map(header_.columns()), yield_strings(yield_strings_)
{ {
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it. /// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(in); skipBOMIfExists(in);
@ -138,10 +143,25 @@ void JSONEachRowRowInputFormat::readField(size_t index, MutableColumns & columns
{ {
seen_columns[index] = read_columns[index] = true; seen_columns[index] = read_columns[index] = true;
const auto & type = getPort().getHeader().getByPosition(index).type; const auto & type = getPort().getHeader().getByPosition(index).type;
if (format_settings.null_as_default && !type->isNullable())
read_columns[index] = DataTypeNullable::deserializeTextJSON(*columns[index], in, format_settings, type); if (yield_strings)
{
// notice: null_as_default on "null" strings is not supported
String str;
readJSONString(str, in);
ReadBufferFromString buf(str);
type->deserializeAsWholeText(*columns[index], buf, format_settings);
}
else else
type->deserializeAsTextJSON(*columns[index], in, format_settings); {
if (format_settings.null_as_default && !type->isNullable())
read_columns[index] = DataTypeNullable::deserializeTextJSON(*columns[index], in, format_settings, type);
else
type->deserializeAsTextJSON(*columns[index], in, format_settings);
}
} }
catch (Exception & e) catch (Exception & e)
{ {
@ -318,13 +338,23 @@ void registerInputFormatProcessorJSONEachRow(FormatFactory & factory)
IRowInputFormat::Params params, IRowInputFormat::Params params,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<JSONEachRowRowInputFormat>(buf, sample, std::move(params), settings); return std::make_shared<JSONEachRowRowInputFormat>(buf, sample, std::move(params), settings, false);
});
factory.registerInputFormatProcessor("JSONStringsEachRow", [](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
const FormatSettings & settings)
{
return std::make_shared<JSONEachRowRowInputFormat>(buf, sample, std::move(params), settings, true);
}); });
} }
void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory) void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory)
{ {
factory.registerFileSegmentationEngine("JSONEachRow", &fileSegmentationEngineJSONEachRowImpl); factory.registerFileSegmentationEngine("JSONEachRow", &fileSegmentationEngineJSONEachRowImpl);
factory.registerFileSegmentationEngine("JSONStringsEachRow", &fileSegmentationEngineJSONEachRowImpl);
} }
} }

View File

@ -20,7 +20,12 @@ class ReadBuffer;
class JSONEachRowRowInputFormat : public IRowInputFormat class JSONEachRowRowInputFormat : public IRowInputFormat
{ {
public: public:
JSONEachRowRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_); JSONEachRowRowInputFormat(
ReadBuffer & in_,
const Block & header_,
Params params_,
const FormatSettings & format_settings_,
bool yield_strings_);
String getName() const override { return "JSONEachRowRowInputFormat"; } String getName() const override { return "JSONEachRowRowInputFormat"; }
@ -75,6 +80,8 @@ private:
bool data_in_square_brackets = false; bool data_in_square_brackets = false;
bool allow_new_rows = true; bool allow_new_rows = true;
bool yield_strings;
}; };
} }

View File

@ -8,8 +8,13 @@ namespace DB
{ {
JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_) JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat(
: IRowOutputFormat(header_, out_, callback), settings(settings_) WriteBuffer & out_,
const Block & header_,
FormatFactory::WriteCallback callback,
const FormatSettings & settings_,
bool yield_strings_)
: IRowOutputFormat(header_, out_, callback), settings(settings_), yield_strings(yield_strings_)
{ {
const auto & sample = getPort(PortKind::Main).getHeader(); const auto & sample = getPort(PortKind::Main).getHeader();
size_t columns = sample.columns(); size_t columns = sample.columns();
@ -27,7 +32,17 @@ void JSONEachRowRowOutputFormat::writeField(const IColumn & column, const IDataT
{ {
writeString(fields[field_number], out); writeString(fields[field_number], out);
writeChar(':', out); writeChar(':', out);
type.serializeAsTextJSON(column, row_num, out, settings);
if (yield_strings)
{
WriteBufferFromOwnString buf;
type.serializeAsText(column, row_num, buf, settings);
writeJSONString(buf.str(), out, settings);
}
else
type.serializeAsTextJSON(column, row_num, out, settings);
++field_number; ++field_number;
} }
@ -59,7 +74,16 @@ void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory)
FormatFactory::WriteCallback callback, FormatFactory::WriteCallback callback,
const FormatSettings & format_settings) const FormatSettings & format_settings)
{ {
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, callback, format_settings); return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, callback, format_settings, false);
});
factory.registerOutputFormatProcessor("JSONStringsEachRow", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const FormatSettings & format_settings)
{
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, callback, format_settings, true);
}); });
} }

View File

@ -15,7 +15,12 @@ namespace DB
class JSONEachRowRowOutputFormat : public IRowOutputFormat class JSONEachRowRowOutputFormat : public IRowOutputFormat
{ {
public: public:
JSONEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_); JSONEachRowRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
FormatFactory::WriteCallback callback,
const FormatSettings & settings_,
bool yield_strings_);
String getName() const override { return "JSONEachRowRowOutputFormat"; } String getName() const override { return "JSONEachRowRowOutputFormat"; }
@ -35,6 +40,9 @@ private:
Names fields; Names fields;
FormatSettings settings; FormatSettings settings;
protected:
bool yield_strings;
}; };
} }

View File

@ -36,7 +36,16 @@ void registerOutputFormatProcessorJSONEachRowWithProgress(FormatFactory & factor
FormatFactory::WriteCallback callback, FormatFactory::WriteCallback callback,
const FormatSettings & format_settings) const FormatSettings & format_settings)
{ {
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf, sample, callback, format_settings); return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf, sample, callback, format_settings, false);
});
factory.registerOutputFormatProcessor("JSONStringsEachRowWithProgress", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const FormatSettings & format_settings)
{
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf, sample, callback, format_settings, true);
}); });
} }

View File

@ -7,8 +7,13 @@
namespace DB namespace DB
{ {
JSONRowOutputFormat::JSONRowOutputFormat(WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & settings_) JSONRowOutputFormat::JSONRowOutputFormat(
: IRowOutputFormat(header, out_, callback), settings(settings_) WriteBuffer & out_,
const Block & header,
FormatFactory::WriteCallback callback,
const FormatSettings & settings_,
bool yield_strings_)
: IRowOutputFormat(header, out_, callback), settings(settings_), yield_strings(yield_strings_)
{ {
const auto & sample = getPort(PortKind::Main).getHeader(); const auto & sample = getPort(PortKind::Main).getHeader();
NamesAndTypesList columns(sample.getNamesAndTypesList()); NamesAndTypesList columns(sample.getNamesAndTypesList());
@ -71,7 +76,17 @@ void JSONRowOutputFormat::writeField(const IColumn & column, const IDataType & t
writeCString("\t\t\t", *ostr); writeCString("\t\t\t", *ostr);
writeString(fields[field_number].name, *ostr); writeString(fields[field_number].name, *ostr);
writeCString(": ", *ostr); writeCString(": ", *ostr);
type.serializeAsTextJSON(column, row_num, *ostr, settings);
if (yield_strings)
{
WriteBufferFromOwnString buf;
type.serializeAsText(column, row_num, buf, settings);
writeJSONString(buf.str(), *ostr, settings);
}
else
type.serializeAsTextJSON(column, row_num, *ostr, settings);
++field_number; ++field_number;
} }
@ -80,7 +95,17 @@ void JSONRowOutputFormat::writeTotalsField(const IColumn & column, const IDataTy
writeCString("\t\t", *ostr); writeCString("\t\t", *ostr);
writeString(fields[field_number].name, *ostr); writeString(fields[field_number].name, *ostr);
writeCString(": ", *ostr); writeCString(": ", *ostr);
type.serializeAsTextJSON(column, row_num, *ostr, settings);
if (yield_strings)
{
WriteBufferFromOwnString buf;
type.serializeAsText(column, row_num, buf, settings);
writeJSONString(buf.str(), *ostr, settings);
}
else
type.serializeAsTextJSON(column, row_num, *ostr, settings);
++field_number; ++field_number;
} }
@ -249,7 +274,16 @@ void registerOutputFormatProcessorJSON(FormatFactory & factory)
FormatFactory::WriteCallback callback, FormatFactory::WriteCallback callback,
const FormatSettings & format_settings) const FormatSettings & format_settings)
{ {
return std::make_shared<JSONRowOutputFormat>(buf, sample, callback, format_settings); return std::make_shared<JSONRowOutputFormat>(buf, sample, callback, format_settings, false);
});
factory.registerOutputFormatProcessor("JSONStrings", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const FormatSettings & format_settings)
{
return std::make_shared<JSONRowOutputFormat>(buf, sample, callback, format_settings, true);
}); });
} }

View File

@ -16,7 +16,12 @@ namespace DB
class JSONRowOutputFormat : public IRowOutputFormat class JSONRowOutputFormat : public IRowOutputFormat
{ {
public: public:
JSONRowOutputFormat(WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & settings_); JSONRowOutputFormat(
WriteBuffer & out_,
const Block & header,
FormatFactory::WriteCallback callback,
const FormatSettings & settings_,
bool yield_strings_);
String getName() const override { return "JSONRowOutputFormat"; } String getName() const override { return "JSONRowOutputFormat"; }
@ -78,6 +83,8 @@ protected:
Progress progress; Progress progress;
Stopwatch watch; Stopwatch watch;
FormatSettings settings; FormatSettings settings;
bool yield_strings;
}; };
} }

View File

@ -1,245 +0,0 @@
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <Processors/Formats/Impl/JSONStringsEachRowRowInputFormat.h>
#include <Formats/FormatFactory.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
extern const int CANNOT_READ_ALL_DATA;
}
JSONStringsEachRowRowInputFormat::JSONStringsEachRowRowInputFormat(ReadBuffer & in_,
const Block & header_,
Params params_,
const FormatSettings & format_settings_,
bool with_names_)
: IRowInputFormat(header_, in_, std::move(params_)), format_settings(format_settings_), with_names(with_names_)
{
const auto & sample = getPort().getHeader();
size_t num_columns = sample.columns();
data_types.resize(num_columns);
column_indexes_by_names.reserve(num_columns);
for (size_t i = 0; i < num_columns; ++i)
{
const auto & column_info = sample.getByPosition(i);
data_types[i] = column_info.type;
column_indexes_by_names.emplace(column_info.name, i);
}
}
void JSONStringsEachRowRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
column_indexes_for_input_fields.clear();
not_seen_columns.clear();
}
void JSONStringsEachRowRowInputFormat::readPrefix()
{
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(in);
if (with_names)
{
size_t num_columns = getPort().getHeader().columns();
read_columns.assign(num_columns, false);
assertChar('[', in);
do
{
skipWhitespaceIfAny(in);
String column_name;
readJSONString(column_name, in);
addInputColumn(column_name);
skipWhitespaceIfAny(in);
}
while (checkChar(',', in));
assertChar(']', in);
skipEndOfLine();
/// Type checking
assertChar('[', in);
for (size_t i = 0; i < column_indexes_for_input_fields.size(); ++i)
{
skipWhitespaceIfAny(in);
String data_type;
readJSONString(data_type, in);
if (column_indexes_for_input_fields[i] &&
data_types[*column_indexes_for_input_fields[i]]->getName() != data_type)
{
throw Exception(
"Type of '" + getPort().getHeader().getByPosition(*column_indexes_for_input_fields[i]).name
+ "' must be " + data_types[*column_indexes_for_input_fields[i]]->getName() +
", not " + data_type,
ErrorCodes::INCORRECT_DATA
);
}
if (i != column_indexes_for_input_fields.size() - 1)
assertChar(',', in);
skipWhitespaceIfAny(in);
}
assertChar(']', in);
}
else
{
size_t num_columns = getPort().getHeader().columns();
read_columns.assign(num_columns, true);
column_indexes_for_input_fields.resize(num_columns);
for (size_t i = 0; i < num_columns; ++i)
{
column_indexes_for_input_fields[i] = i;
}
}
for (size_t i = 0; i < read_columns.size(); ++i)
{
if (!read_columns[i])
{
not_seen_columns.emplace_back(i);
}
}
}
void JSONStringsEachRowRowInputFormat::addInputColumn(const String & column_name)
{
names_of_columns.emplace_back(column_name);
const auto column_it = column_indexes_by_names.find(column_name);
if (column_it == column_indexes_by_names.end())
{
if (format_settings.skip_unknown_fields)
{
column_indexes_for_input_fields.push_back(std::nullopt);
return;
}
throw Exception(
"Unknown field found in JSONStringsEachRow header: '" + column_name + "' " +
"at position " + std::to_string(column_indexes_for_input_fields.size()) +
"\nSet the 'input_format_skip_unknown_fields' parameter explicitly to ignore and proceed",
ErrorCodes::INCORRECT_DATA
);
}
const auto column_index = column_it->second;
if (read_columns[column_index])
throw Exception("Duplicate field found while parsing JSONStringsEachRow header: " + column_name, ErrorCodes::INCORRECT_DATA);
read_columns[column_index] = true;
column_indexes_for_input_fields.emplace_back(column_index);
}
bool JSONStringsEachRowRowInputFormat::readRow(DB::MutableColumns &columns, DB::RowReadExtension &ext)
{
skipEndOfLine();
if (in.eof())
return false;
size_t num_columns = columns.size();
read_columns.assign(num_columns, false);
assertChar('[', in);
for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
{
const auto & table_column = column_indexes_for_input_fields[file_column];
if (table_column)
{
readField(*table_column, columns);
}
else
{
skipJSONField(in, StringRef(names_of_columns[file_column]));
}
skipWhitespaceIfAny(in);
if (in.eof())
throw Exception("Unexpected end of stream while parsing JSONStringsEachRow format", ErrorCodes::CANNOT_READ_ALL_DATA);
if (file_column + 1 != column_indexes_for_input_fields.size())
{
assertChar(',', in);
skipWhitespaceIfAny(in);
}
}
assertChar(']', in);
for (const auto & name : not_seen_columns)
columns[name]->insertDefault();
ext.read_columns = read_columns;
return true;
}
void JSONStringsEachRowRowInputFormat::skipEndOfLine()
{
skipWhitespaceIfAny(in);
if (!in.eof() && (*in.position() == ',' || *in.position() == ';'))
++in.position();
skipWhitespaceIfAny(in);
}
void JSONStringsEachRowRowInputFormat::readField(size_t index, MutableColumns & columns)
{
try
{
read_columns[index] = true;
const auto & type = data_types[index];
String str;
readJSONString(str, in);
ReadBufferFromString buf(str);
type->deserializeAsWholeText(*columns[index], buf, format_settings);
}
catch (Exception & e)
{
e.addMessage("(while read the value of key " + getPort().getHeader().getByPosition(index).name + ")");
throw;
}
}
void JSONStringsEachRowRowInputFormat::syncAfterError()
{
skipToUnescapedNextLineOrEOF(in);
}
void registerInputFormatProcessorJSONStringsEachRow(FormatFactory & factory)
{
factory.registerInputFormatProcessor("JSONStringsEachRow", [](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
const FormatSettings & settings)
{
return std::make_shared<JSONStringsEachRowRowInputFormat>(buf, sample, std::move(params), settings, false);
});
factory.registerInputFormatProcessor("JSONStringsEachRowWithNamesAndTypes", [](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
const FormatSettings & settings)
{
return std::make_shared<JSONStringsEachRowRowInputFormat>(buf, sample, std::move(params), settings, true);
});
}
}

View File

@ -1,54 +0,0 @@
#pragma once
#pragma once
#include <Core/Block.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Formats/FormatSettings.h>
#include <Common/HashTable/HashMap.h>
namespace DB
{
class ReadBuffer;
/** A stream for reading data in JSONStringsEachRow and JSONStringsEachRowWithNamesAndTypes formats
*/
class JSONStringsEachRowRowInputFormat : public IRowInputFormat
{
public:
JSONStringsEachRowRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_, bool with_names_);
String getName() const override { return "JSONStringsEachRowRowInputFormat"; }
void readPrefix() override;
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
void resetParser() override;
private:
void addInputColumn(const String & column_name);
void skipEndOfLine();
void readField(size_t index, MutableColumns & columns);
const FormatSettings format_settings;
using IndexesMap = std::unordered_map<String, size_t>;
IndexesMap column_indexes_by_names;
using OptionalIndexes = std::vector<std::optional<size_t>>;
OptionalIndexes column_indexes_for_input_fields;
DataTypes data_types;
std::vector<UInt8> read_columns;
std::vector<size_t> not_seen_columns;
/// This is for the correct exceptions in skipping unknown fields.
std::vector<String> names_of_columns;
bool with_names;
};
}

View File

@ -1,117 +0,0 @@
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferValidUTF8.h>
#include <Processors/Formats/Impl/JSONStringsEachRowRowOutputFormat.h>
#include <Formats/FormatFactory.h>
namespace DB
{
JSONStringsEachRowRowOutputFormat::JSONStringsEachRowRowOutputFormat(WriteBuffer & out_,
const Block & header_,
FormatFactory::WriteCallback callback,
const FormatSettings & settings_,
bool with_names_)
: IRowOutputFormat(header_, out_, callback), settings(settings_), with_names(with_names_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
NamesAndTypesList columns(sample.getNamesAndTypesList());
fields.assign(columns.begin(), columns.end());
}
void JSONStringsEachRowRowOutputFormat::writeField(const IColumn & column, const IDataType & type, size_t row_num)
{
WriteBufferFromOwnString buf;
type.serializeAsText(column, row_num, buf, settings);
writeJSONString(buf.str(), out, settings);
}
void JSONStringsEachRowRowOutputFormat::writeFieldDelimiter()
{
writeCString(", ", out);
}
void JSONStringsEachRowRowOutputFormat::writeRowStartDelimiter()
{
writeChar('[', out);
}
void JSONStringsEachRowRowOutputFormat::writeRowEndDelimiter()
{
writeCString("]\n", out);
}
void JSONStringsEachRowRowOutputFormat::writeTotals(const Columns & columns, size_t row_num)
{
writeChar('\n', out);
size_t num_columns = columns.size();
writeChar('[', out);
for (size_t i = 0; i < num_columns; ++i)
{
if (i != 0)
JSONStringsEachRowRowOutputFormat::writeFieldDelimiter();
JSONStringsEachRowRowOutputFormat::writeField(*columns[i], *types[i], row_num);
}
writeCString("]\n", out);
}
void JSONStringsEachRowRowOutputFormat::writePrefix()
{
if (with_names)
{
writeChar('[', out);
for (size_t i = 0; i < fields.size(); ++i)
{
writeChar('\"', out);
writeString(fields[i].name, out);
writeChar('\"', out);
if (i != fields.size() - 1)
writeCString(", ", out);
}
writeCString("]\n[", out);
for (size_t i = 0; i < fields.size(); ++i)
{
writeJSONString(fields[i].type->getName(), out, settings);
if (i != fields.size() - 1)
writeCString(", ", out);
}
writeCString("]\n", out);
}
}
void JSONStringsEachRowRowOutputFormat::consumeTotals(DB::Chunk chunk)
{
if (with_names)
IRowOutputFormat::consumeTotals(std::move(chunk));
}
void registerOutputFormatProcessorJSONStringsEachRow(FormatFactory & factory)
{
factory.registerOutputFormatProcessor("JSONStringsEachRow", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const FormatSettings & format_settings)
{
return std::make_shared<JSONStringsEachRowRowOutputFormat>(buf, sample, callback, format_settings, false);
});
factory.registerOutputFormatProcessor("JSONStringsEachRowWithNamesAndTypes", [](
WriteBuffer &buf,
const Block &sample,
FormatFactory::WriteCallback callback,
const FormatSettings &format_settings)
{
return std::make_shared<JSONStringsEachRowRowOutputFormat>(buf, sample, callback, format_settings, true);
});
}
}

View File

@ -1,45 +0,0 @@
#pragma once
#include <Core/Block.h>
#include <IO/WriteBuffer.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Formats/FormatSettings.h>
namespace DB
{
/** The stream for outputting data in JSON format, by object per line.
* Does not validate UTF-8.
*/
class JSONStringsEachRowRowOutputFormat : public IRowOutputFormat
{
public:
JSONStringsEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_, bool with_names);
String getName() const override { return "JSONStringsEachRowRowOutputFormat"; }
void writePrefix() override;
void writeBeforeTotals() override {}
void writeTotals(const Columns & columns, size_t row_num) override;
void writeAfterTotals() override {}
void writeField(const IColumn & column, const IDataType & type, size_t row_num) override;
void writeFieldDelimiter() override;
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
protected:
void consumeTotals(Chunk) override;
/// No extremes.
void consumeExtremes(Chunk) override {}
private:
FormatSettings settings;
NamesAndTypes fields;
bool with_names;
};
}

View File

@ -1,93 +0,0 @@
#include <Processors/Formats/Impl/JSONStringsRowOutputFormat.h>
#include <Formats/FormatFactory.h>
#include <IO/WriteHelpers.h>
namespace DB
{
JSONStringsRowOutputFormat::JSONStringsRowOutputFormat(
WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & settings_)
: JSONRowOutputFormat(out_, header, callback, settings_)
{
}
void JSONStringsRowOutputFormat::writeField(const IColumn & column, const IDataType & type, size_t row_num)
{
WriteBufferFromOwnString buf;
type.serializeAsText(column, row_num, buf, settings);
writeJSONString(buf.str(), *ostr, settings);
++field_number;
}
void JSONStringsRowOutputFormat::writeFieldDelimiter()
{
writeCString(", ", *ostr);
}
void JSONStringsRowOutputFormat::writeTotalsFieldDelimiter()
{
writeCString(",", *ostr);
}
void JSONStringsRowOutputFormat::writeRowStartDelimiter()
{
writeCString("\t\t[", *ostr);
}
void JSONStringsRowOutputFormat::writeRowEndDelimiter()
{
writeChar(']', *ostr);
field_number = 0;
++row_count;
}
void JSONStringsRowOutputFormat::writeBeforeTotals()
{
writeCString(",\n", *ostr);
writeChar('\n', *ostr);
writeCString("\t\"totals\": [", *ostr);
}
void JSONStringsRowOutputFormat::writeAfterTotals()
{
writeChar(']', *ostr);
}
void JSONStringsRowOutputFormat::writeExtremesElement(const char * title, const Columns & columns, size_t row_num)
{
writeCString("\t\t\"", *ostr);
writeCString(title, *ostr);
writeCString("\": [", *ostr);
size_t extremes_columns = columns.size();
for (size_t i = 0; i < extremes_columns; ++i)
{
if (i != 0)
writeTotalsFieldDelimiter();
writeField(*columns[i], *types[i], row_num);
}
writeChar(']', *ostr);
}
void registerOutputFormatProcessorJSONStrings(FormatFactory & factory)
{
factory.registerOutputFormatProcessor("JSONStrings", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const FormatSettings & format_settings)
{
return std::make_shared<JSONStringsRowOutputFormat>(buf, sample, callback, format_settings);
});
}
}

View File

@ -1,43 +0,0 @@
#pragma once
#include <Core/Block.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferValidUTF8.h>
#include <Processors/Formats/Impl/JSONRowOutputFormat.h>
namespace DB
{
struct FormatSettings;
/** The stream for outputting data in the JSONStrings format.
*/
class JSONStringsRowOutputFormat : public JSONRowOutputFormat
{
public:
JSONStringsRowOutputFormat(WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & settings_);
String getName() const override { return "JSONStringsRowOutputFormat"; }
void writeField(const IColumn & column, const IDataType & type, size_t row_num) override;
void writeFieldDelimiter() override;
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
void writeBeforeTotals() override;
void writeAfterTotals() override;
protected:
void writeExtremesElement(const char * title, const Columns & columns, size_t row_num) override;
void writeTotalsField(const IColumn & column, const IDataType & type, size_t row_num) override
{
return writeField(column, type, row_num);
}
void writeTotalsFieldDelimiter() override;
};
}

View File

@ -1,63 +0,0 @@
DROP TABLE IF EXISTS test_table;
DROP TABLE IF EXISTS test_table_2;
SELECT 1;
/* Check JSONStringsEachRow Output */
CREATE TABLE test_table (value UInt8, name String) ENGINE = MergeTree() ORDER BY value;
INSERT INTO test_table VALUES (1, 'a'), (2, 'b'), (3, 'c');
SELECT * FROM test_table FORMAT JSONStringsEachRow;
SELECT 2;
/* Check Totals */
SELECT name, count() AS c FROM test_table GROUP BY name WITH TOTALS ORDER BY name FORMAT JSONStringsEachRow;
SELECT 3;
/* Check JSONStringsEachRowWithNamesAndTypes Output */
SELECT * FROM test_table FORMAT JSONStringsEachRowWithNamesAndTypes;
SELECT 4;
/* Check Totals */
SELECT name, count() AS c FROM test_table GROUP BY name WITH TOTALS ORDER BY name FORMAT JSONStringsEachRowWithNamesAndTypes;
DROP TABLE IF EXISTS test_table;
SELECT 5;
/* Check JSONStringsEachRow Input */
CREATE TABLE test_table (v1 String, v2 UInt8, v3 DEFAULT v2 * 16, v4 UInt8 DEFAULT 8) ENGINE = MergeTree() ORDER BY v2;
INSERT INTO test_table FORMAT JSONStringsEachRow ["first", "1", "2", "NULL"] ["second", "2", "null", "6"];
SELECT * FROM test_table FORMAT JSONStringsEachRow;
TRUNCATE TABLE test_table;
SELECT 6;
/* Check input_format_null_as_default = 1 */
SET input_format_null_as_default = 1;
INSERT INTO test_table FORMAT JSONStringsEachRow ["first", "1", "2", "ᴺᵁᴸᴸ"] ["second", "2", "null", "6"];
SELECT * FROM test_table FORMAT JSONStringsEachRow;
TRUNCATE TABLE test_table;
SELECT 7;
/* Check Nested */
CREATE TABLE test_table_2 (v1 UInt8, n Nested(id UInt8, name String)) ENGINE = MergeTree() ORDER BY v1;
INSERT INTO test_table_2 FORMAT JSONStringsEachRow ["16", "[15, 16, 17]", "['first', 'second', 'third']"];
SELECT * FROM test_table_2 FORMAT JSONStringsEachRow;
TRUNCATE TABLE test_table_2;
SELECT 8;
/* Check JSONStringsEachRowWithNamesAndTypes Output */
SET input_format_null_as_default = 0;
INSERT INTO test_table FORMAT JSONStringsEachRowWithNamesAndTypes ["v1", "v2", "v3", "v4"]["String","UInt8","UInt16","UInt8"]["first", "1", "2", "null"]["second", "2", "null", "6"];
SELECT * FROM test_table FORMAT JSONStringsEachRow;
TRUNCATE TABLE test_table;
SELECT 9;
/* Check input_format_null_as_default = 1 */
SET input_format_null_as_default = 1;
INSERT INTO test_table FORMAT JSONStringsEachRowWithNamesAndTypes ["v1", "v2", "v3", "v4"]["String","UInt8","UInt16","UInt8"]["first", "1", "2", "null"] ["second", "2", "null", "6"];
SELECT * FROM test_table FORMAT JSONStringsEachRow;
SELECT 10;
/* Check Header */
TRUNCATE TABLE test_table;
SET input_format_skip_unknown_fields = 1;
INSERT INTO test_table FORMAT JSONStringsEachRowWithNamesAndTypes ["v1", "v2", "invalid_column"]["String", "UInt8", "UInt8"]["first", "1", "32"]["second", "2", "64"];
SELECT * FROM test_table FORMAT JSONStringsEachRow;
SELECT 11;
TRUNCATE TABLE test_table;
INSERT INTO test_table FORMAT JSONStringsEachRowWithNamesAndTypes ["v4", "v2", "v3"]["UInt8", "UInt8", "UInt16"]["1", "2", "3"]
SELECT * FROM test_table FORMAT JSONStringsEachRowWithNamesAndTypes;
SELECT 12;
/* Check Nested */
INSERT INTO test_table_2 FORMAT JSONStringsEachRowWithNamesAndTypes ["v1", "n.id", "n.name"]["UInt8", "Array(UInt8)", "Array(String)"]["16", "[15, 16, 17]", "['first', 'second', 'third']"];
SELECT * FROM test_table_2 FORMAT JSONStringsEachRowWithNamesAndTypes;
DROP TABLE IF EXISTS test_table;
DROP TABLE IF EXISTS test_table_2;

View File

@ -0,0 +1,22 @@
1
{"value":"1","name":"a"}
{"value":"2","name":"b"}
{"value":"3","name":"c"}
2
{"name":"a","c":"1"}
{"name":"b","c":"1"}
{"name":"c","c":"1"}
3
{"row":{"a":"1"}}
{"progress":{"read_rows":"1","read_bytes":"1","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}
4
{"row":{"a":"1"}}
{"progress":{"read_rows":"1","read_bytes":"1","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}
5
{"v1":"first","v2":"1","v3":"2","v4":"0"}
{"v1":"second","v2":"2","v3":"0","v4":"6"}
6
{"v1":"first","v2":"1","v3":"2","v4":"0"}
{"v1":"second","v2":"2","v3":"0","v4":"6"}
7
{"v1":"16","n.id":"[15,16,17]","n.name":"['first','second','third']"}

View File

@ -0,0 +1,38 @@
DROP TABLE IF EXISTS test_table;
DROP TABLE IF EXISTS test_table_2;
SELECT 1;
/* Check JSONStringsEachRow Output */
CREATE TABLE test_table (value UInt8, name String) ENGINE = MergeTree() ORDER BY value;
INSERT INTO test_table VALUES (1, 'a'), (2, 'b'), (3, 'c');
SELECT * FROM test_table FORMAT JSONStringsEachRow;
SELECT 2;
/* Check Totals */
SELECT name, count() AS c FROM test_table GROUP BY name WITH TOTALS ORDER BY name FORMAT JSONStringsEachRow;
SELECT 3;
/* Check JSONStringsEachRowWithProgress Output */
SELECT 1 as a FROM system.one FORMAT JSONStringsEachRowWithProgress;
SELECT 4;
/* Check Totals */
SELECT 1 as a FROM system.one GROUP BY a WITH TOTALS ORDER BY a FORMAT JSONStringsEachRowWithProgress;
DROP TABLE IF EXISTS test_table;
SELECT 5;
/* Check JSONStringsEachRow Input */
CREATE TABLE test_table (v1 String, v2 UInt8, v3 DEFAULT v2 * 16, v4 UInt8 DEFAULT 8) ENGINE = MergeTree() ORDER BY v2;
INSERT INTO test_table FORMAT JSONStringsEachRow {"v1": "first", "v2": "1", "v3": "2", "v4": "NULL"} {"v1": "second", "v2": "2", "v3": "null", "v4": "6"};
SELECT * FROM test_table FORMAT JSONStringsEachRow;
TRUNCATE TABLE test_table;
SELECT 6;
/* Check input_format_null_as_default = 1 */
SET input_format_null_as_default = 1;
INSERT INTO test_table FORMAT JSONStringsEachRow {"v1": "first", "v2": "1", "v3": "2", "v4": "ᴺᵁᴸᴸ"} {"v1": "second", "v2": "2", "v3": "null", "v4": "6"};
SELECT * FROM test_table FORMAT JSONStringsEachRow;
TRUNCATE TABLE test_table;
SELECT 7;
/* Check Nested */
CREATE TABLE test_table_2 (v1 UInt8, n Nested(id UInt8, name String)) ENGINE = MergeTree() ORDER BY v1;
INSERT INTO test_table_2 FORMAT JSONStringsEachRow {"v1": "16", "n.id": "[15, 16, 17]", "n.name": "['first', 'second', 'third']"};
SELECT * FROM test_table_2 FORMAT JSONStringsEachRow;
TRUNCATE TABLE test_table_2;
DROP TABLE IF EXISTS test_table;
DROP TABLE IF EXISTS test_table_2;

View File

@ -0,0 +1,43 @@
{
"meta":
[
{
"name": "1",
"type": "UInt8"
},
{
"name": "'a'",
"type": "String"
},
{
"name": "[1, 2, 3]",
"type": "Array(UInt8)"
},
{
"name": "tuple(1, 'a')",
"type": "Tuple(UInt8, String)"
},
{
"name": "NULL",
"type": "Nullable(Nothing)"
},
{
"name": "nan",
"type": "Float64"
}
],
"data":
[
{
"1": "1",
"'a'": "a",
"[1, 2, 3]": "[1,2,3]",
"tuple(1, 'a')": "(1,'a')",
"NULL": "ᴺᵁᴸᴸ",
"nan": "nan"
}
],
"rows": 1
}

View File

@ -0,0 +1,63 @@
DROP TABLE IF EXISTS test_table;
DROP TABLE IF EXISTS test_table_2;
SELECT 1;
/* Check JSONCompactStringsEachRow Output */
CREATE TABLE test_table (value UInt8, name String) ENGINE = MergeTree() ORDER BY value;
INSERT INTO test_table VALUES (1, 'a'), (2, 'b'), (3, 'c');
SELECT * FROM test_table FORMAT JSONCompactStringsEachRow;
SELECT 2;
/* Check Totals */
SELECT name, count() AS c FROM test_table GROUP BY name WITH TOTALS ORDER BY name FORMAT JSONCompactStringsEachRow;
SELECT 3;
/* Check JSONCompactStringsEachRowWithNamesAndTypes Output */
SELECT * FROM test_table FORMAT JSONCompactStringsEachRowWithNamesAndTypes;
SELECT 4;
/* Check Totals */
SELECT name, count() AS c FROM test_table GROUP BY name WITH TOTALS ORDER BY name FORMAT JSONCompactStringsEachRowWithNamesAndTypes;
DROP TABLE IF EXISTS test_table;
SELECT 5;
/* Check JSONCompactStringsEachRow Input */
CREATE TABLE test_table (v1 String, v2 UInt8, v3 DEFAULT v2 * 16, v4 UInt8 DEFAULT 8) ENGINE = MergeTree() ORDER BY v2;
INSERT INTO test_table FORMAT JSONCompactStringsEachRow ["first", "1", "2", "NULL"] ["second", "2", "null", "6"];
SELECT * FROM test_table FORMAT JSONCompactStringsEachRow;
TRUNCATE TABLE test_table;
SELECT 6;
/* Check input_format_null_as_default = 1 */
SET input_format_null_as_default = 1;
INSERT INTO test_table FORMAT JSONCompactStringsEachRow ["first", "1", "2", "ᴺᵁᴸᴸ"] ["second", "2", "null", "6"];
SELECT * FROM test_table FORMAT JSONCompactStringsEachRow;
TRUNCATE TABLE test_table;
SELECT 7;
/* Check Nested */
CREATE TABLE test_table_2 (v1 UInt8, n Nested(id UInt8, name String)) ENGINE = MergeTree() ORDER BY v1;
INSERT INTO test_table_2 FORMAT JSONCompactStringsEachRow ["16", "[15, 16, 17]", "['first', 'second', 'third']"];
SELECT * FROM test_table_2 FORMAT JSONCompactStringsEachRow;
TRUNCATE TABLE test_table_2;
SELECT 8;
/* Check JSONCompactStringsEachRowWithNamesAndTypes Output */
SET input_format_null_as_default = 0;
INSERT INTO test_table FORMAT JSONCompactStringsEachRowWithNamesAndTypes ["v1", "v2", "v3", "v4"]["String","UInt8","UInt16","UInt8"]["first", "1", "2", "null"]["second", "2", "null", "6"];
SELECT * FROM test_table FORMAT JSONCompactStringsEachRow;
TRUNCATE TABLE test_table;
SELECT 9;
/* Check input_format_null_as_default = 1 */
SET input_format_null_as_default = 1;
INSERT INTO test_table FORMAT JSONCompactStringsEachRowWithNamesAndTypes ["v1", "v2", "v3", "v4"]["String","UInt8","UInt16","UInt8"]["first", "1", "2", "null"] ["second", "2", "null", "6"];
SELECT * FROM test_table FORMAT JSONCompactStringsEachRow;
SELECT 10;
/* Check Header */
TRUNCATE TABLE test_table;
SET input_format_skip_unknown_fields = 1;
INSERT INTO test_table FORMAT JSONCompactStringsEachRowWithNamesAndTypes ["v1", "v2", "invalid_column"]["String", "UInt8", "UInt8"]["first", "1", "32"]["second", "2", "64"];
SELECT * FROM test_table FORMAT JSONCompactStringsEachRow;
SELECT 11;
TRUNCATE TABLE test_table;
INSERT INTO test_table FORMAT JSONCompactStringsEachRowWithNamesAndTypes ["v4", "v2", "v3"]["UInt8", "UInt8", "UInt16"]["1", "2", "3"]
SELECT * FROM test_table FORMAT JSONCompactStringsEachRowWithNamesAndTypes;
SELECT 12;
/* Check Nested */
INSERT INTO test_table_2 FORMAT JSONCompactStringsEachRowWithNamesAndTypes ["v1", "n.id", "n.name"]["UInt8", "Array(UInt8)", "Array(String)"]["16", "[15, 16, 17]", "['first', 'second', 'third']"];
SELECT * FROM test_table_2 FORMAT JSONCompactStringsEachRowWithNamesAndTypes;
DROP TABLE IF EXISTS test_table;
DROP TABLE IF EXISTS test_table_2;

View File

@ -0,0 +1,10 @@
SET output_format_write_statistics = 0;
SELECT
1,
'a',
[1, 2, 3],
(1, 'a'),
null,
nan
FORMAT JSONCompactStrings;