diff --git a/dbms/src/Formats/FormatFactory.cpp b/dbms/src/Formats/FormatFactory.cpp index 57071b17c28..dfbaef334e0 100644 --- a/dbms/src/Formats/FormatFactory.cpp +++ b/dbms/src/Formats/FormatFactory.cpp @@ -281,6 +281,8 @@ void registerInputFormatProcessorTSKV(FormatFactory & factory); void registerOutputFormatProcessorTSKV(FormatFactory & factory); void registerInputFormatProcessorJSONEachRow(FormatFactory & factory); void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory); +void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory); +void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory); void registerInputFormatProcessorParquet(FormatFactory & factory); void registerInputFormatProcessorORC(FormatFactory & factory); void registerOutputFormatProcessorParquet(FormatFactory & factory); @@ -336,6 +338,8 @@ FormatFactory::FormatFactory() registerOutputFormatProcessorTSKV(*this); registerInputFormatProcessorJSONEachRow(*this); registerOutputFormatProcessorJSONEachRow(*this); + registerInputFormatProcessorJSONCompactEachRow(*this); + registerOutputFormatProcessorJSONCompactEachRow(*this); registerInputFormatProcessorProtobuf(*this); registerOutputFormatProcessorProtobuf(*this); registerInputFormatProcessorCapnProto(*this); diff --git a/dbms/src/Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.cpp b/dbms/src/Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.cpp new file mode 100644 index 00000000000..d4530e7b09d --- /dev/null +++ b/dbms/src/Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.cpp @@ -0,0 +1,238 @@ +#include + +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int INCORRECT_DATA; + extern const int CANNOT_READ_ALL_DATA; + extern const int LOGICAL_ERROR; +} + + +JSONCompactEachRowRowInputFormat::JSONCompactEachRowRowInputFormat(ReadBuffer & in_, + const Block & header_, + Params params_, + const FormatSettings & format_settings_, + bool with_names_) + : IRowInputFormat(header_, in_, std::move(params_)), format_settings(format_settings_), with_names(with_names_) +{ + /// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it. + skipBOMIfExists(in); + auto & sample = getPort().getHeader(); + size_t num_columns = sample.columns(); + + data_types.resize(num_columns); + column_indexes_by_names.reserve(num_columns); + + for (size_t i = 0; i < num_columns; ++i) + { + const auto & column_info = sample.getByPosition(i); + + data_types[i] = column_info.type; + column_indexes_by_names.emplace(column_info.name, i); + } +} + +void JSONCompactEachRowRowInputFormat::readPrefix() +{ + if (with_names) + { + size_t num_columns = getPort().getHeader().columns(); + read_columns.assign(num_columns, false); + + assertChar('[', in); + do + { + skipWhitespaceIfAny(in); + String column_name; + readJSONString(column_name, in); + addInputColumn(column_name); + skipWhitespaceIfAny(in); + } + while (checkChar(',', in)); + assertChar(']', in); + skipEndOfLine(); + + /// Type checking + assertChar('[', in); + for (size_t i = 0; i < column_indexes_for_input_fields.size(); ++i) + { + skipWhitespaceIfAny(in); + String data_type; + readJSONString(data_type, in); + + if (column_indexes_for_input_fields[i] && + data_types[*column_indexes_for_input_fields[i]]->getName() != data_type) + { + throw Exception( + "Type of '" + getPort().getHeader().getByPosition(*column_indexes_for_input_fields[i]).name + + "' must be " + data_types[*column_indexes_for_input_fields[i]]->getName() + + ", not " + data_type, + ErrorCodes::INCORRECT_DATA + ); + } + + if (i != column_indexes_for_input_fields.size() - 1) + assertChar(',', in); + skipWhitespaceIfAny(in); + } + assertChar(']', in); + } + else + { + size_t num_columns = getPort().getHeader().columns(); + read_columns.assign(num_columns, true); + column_indexes_for_input_fields.resize(num_columns); + + for (size_t i = 0; i < num_columns; ++i) + { + column_indexes_for_input_fields[i] = i; + } + } + + for (size_t i = 0; i < read_columns.size(); ++i) + { + if (!read_columns[i]) + { + not_seen_columns.emplace_back(i); + } + } +} + +void JSONCompactEachRowRowInputFormat::addInputColumn(const String & column_name) +{ + names_of_columns.emplace_back(column_name); + + const auto column_it = column_indexes_by_names.find(column_name); + if (column_it == column_indexes_by_names.end()) + { + if (format_settings.skip_unknown_fields) + { + column_indexes_for_input_fields.push_back(std::nullopt); + return; + } + + throw Exception( + "Unknown field found in JSONCompactEachRow header: '" + column_name + "' " + + "at position " + std::to_string(column_indexes_for_input_fields.size()) + + "\nSet the 'input_format_skip_unknown_fields' parameter explicitly to ignore and proceed", + ErrorCodes::INCORRECT_DATA + ); + } + + const auto column_index = column_it->second; + + if (read_columns[column_index]) + throw Exception("Duplicate field found while parsing JSONCompactEachRow header: " + column_name, ErrorCodes::INCORRECT_DATA); + + read_columns[column_index] = true; + column_indexes_for_input_fields.emplace_back(column_index); +} + +bool JSONCompactEachRowRowInputFormat::readRow(DB::MutableColumns &columns, DB::RowReadExtension &ext) +{ + skipEndOfLine(); + + if (in.eof()) + return false; + + size_t num_columns = columns.size(); + + read_columns.assign(num_columns, false); + + assertChar('[', in); + for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column) + { + const auto & table_column = column_indexes_for_input_fields[file_column]; + if (table_column) + { + readField(*table_column, columns); + } + else + { + skipJSONField(in, StringRef(names_of_columns[file_column])); + } + + skipWhitespaceIfAny(in); + if (in.eof()) + throw Exception("Unexpected end of stream while parsing JSONCompactEachRow format", ErrorCodes::CANNOT_READ_ALL_DATA); + if (file_column + 1 != column_indexes_for_input_fields.size()) + { + assertChar(',', in); + skipWhitespaceIfAny(in); + } + } + assertChar(']', in); + + for (size_t i = 0; i < not_seen_columns.size(); i++) + { + columns[not_seen_columns[i]]->insertDefault(); + } + + ext.read_columns = read_columns; + return true; +} + +void JSONCompactEachRowRowInputFormat::skipEndOfLine() +{ + skipWhitespaceIfAny(in); + if (!in.eof() && (*in.position() == ',' || *in.position() == ';')) + ++in.position(); + + skipWhitespaceIfAny(in); +} + +void JSONCompactEachRowRowInputFormat::readField(size_t index, MutableColumns & columns) +{ + try + { + read_columns[index] = true; + const auto & type = data_types[index]; + if (format_settings.null_as_default && !type->isNullable()) + read_columns[index] = DataTypeNullable::deserializeTextJSON(*columns[index], in, format_settings, type); + else + type->deserializeAsTextJSON(*columns[index], in, format_settings); + } + catch (Exception & e) + { + e.addMessage("(while read the value of key " + getPort().getHeader().getByPosition(index).name + ")"); + throw; + } +} + +void JSONCompactEachRowRowInputFormat::syncAfterError() +{ + skipToUnescapedNextLineOrEOF(in); +} + +void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory) +{ + factory.registerInputFormatProcessor("JSONCompactEachRow", []( + ReadBuffer & buf, + const Block & sample, + const Context &, + IRowInputFormat::Params params, + const FormatSettings & settings) + { + return std::make_shared(buf, sample, std::move(params), settings, false); + }); + + factory.registerInputFormatProcessor("JSONCompactEachRowWithNamesAndTypes", []( + ReadBuffer & buf, + const Block & sample, + const Context &, + IRowInputFormat::Params params, + const FormatSettings & settings) + { + return std::make_shared(buf, sample, std::move(params), settings, true); + }); +} + +} diff --git a/dbms/src/Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.h b/dbms/src/Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.h new file mode 100644 index 00000000000..e633475d0f4 --- /dev/null +++ b/dbms/src/Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.h @@ -0,0 +1,54 @@ +#pragma once + +#pragma once + +#include +#include +#include +#include + +namespace DB +{ + +class ReadBuffer; + +/** A stream for reading data in JSONCompactEachRow and JSONCompactEachRowWithNamesAndTypes formats +*/ +class JSONCompactEachRowRowInputFormat : public IRowInputFormat +{ +public: + JSONCompactEachRowRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_, bool with_names_); + + String getName() const override { return "JSONCompactEachRowRowInputFormat"; } + + + void readPrefix() override; + bool readRow(MutableColumns & columns, RowReadExtension & ext) override; + bool allowSyncAfterError() const override { return true; } + void syncAfterError() override; + + +private: + void addInputColumn(const String & column_name); + void skipEndOfLine(); + void readField(size_t index, MutableColumns & columns); + + const FormatSettings format_settings; + + using IndexesMap = std::unordered_map; + IndexesMap column_indexes_by_names; + + using OptionalIndexes = std::vector>; + OptionalIndexes column_indexes_for_input_fields; + + DataTypes data_types; + std::vector read_columns; + std::vector not_seen_columns; + + /// This is for the correct exceptions in skipping unknown fields. + std::vector names_of_columns; + + bool with_names; +}; + +} diff --git a/dbms/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.cpp b/dbms/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.cpp new file mode 100644 index 00000000000..433cc4515ae --- /dev/null +++ b/dbms/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.cpp @@ -0,0 +1,116 @@ +#include +#include +#include +#include + + +namespace DB +{ + + +JSONCompactEachRowRowOutputFormat::JSONCompactEachRowRowOutputFormat(WriteBuffer & out_, + const Block & header_, + FormatFactory::WriteCallback callback, + const FormatSettings & settings_, + bool with_names_) + : IRowOutputFormat(header_, out_, callback), settings(settings_), with_names(with_names_) +{ + auto & sample = getPort(PortKind::Main).getHeader(); + NamesAndTypesList columns(sample.getNamesAndTypesList()); + fields.assign(columns.begin(), columns.end()); +} + + +void JSONCompactEachRowRowOutputFormat::writeField(const IColumn & column, const IDataType & type, size_t row_num) +{ + type.serializeAsTextJSON(column, row_num, out, settings); +} + + +void JSONCompactEachRowRowOutputFormat::writeFieldDelimiter() +{ + writeCString(", ", out); +} + + +void JSONCompactEachRowRowOutputFormat::writeRowStartDelimiter() +{ + writeChar('[', out); +} + + +void JSONCompactEachRowRowOutputFormat::writeRowEndDelimiter() +{ + writeCString("]\n", out); +} + +void JSONCompactEachRowRowOutputFormat::writeTotals(const Columns & columns, size_t row_num) +{ + writeChar('\n', out); + size_t num_columns = columns.size(); + writeChar('[', out); + for (size_t i = 0; i < num_columns; ++i) + { + if (i != 0) + JSONCompactEachRowRowOutputFormat::writeFieldDelimiter(); + + JSONCompactEachRowRowOutputFormat::writeField(*columns[i], *types[i], row_num); + } + writeCString("]\n", out); +} + +void JSONCompactEachRowRowOutputFormat::writePrefix() +{ + if (with_names) + { + writeChar('[', out); + for (size_t i = 0; i < fields.size(); ++i) + { + writeChar('\"', out); + writeString(fields[i].name, out); + writeChar('\"', out); + if (i != fields.size() - 1) + writeCString(", ", out); + } + writeCString("]\n[", out); + for (size_t i = 0; i < fields.size(); ++i) + { + writeJSONString(fields[i].type->getName(), out, settings); + if (i != fields.size() - 1) + writeCString(", ", out); + } + writeCString("]\n", out); + } +} + +void JSONCompactEachRowRowOutputFormat::consumeTotals(DB::Chunk chunk) +{ + if (with_names) + IRowOutputFormat::consumeTotals(std::move(chunk)); +} + +void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory) +{ + factory.registerOutputFormatProcessor("JSONCompactEachRow", []( + WriteBuffer & buf, + const Block & sample, + const Context &, + FormatFactory::WriteCallback callback, + const FormatSettings & format_settings) + { + return std::make_shared(buf, sample, callback, format_settings, false); + }); + + factory.registerOutputFormatProcessor("JSONCompactEachRowWithNamesAndTypes", []( + WriteBuffer &buf, + const Block &sample, + const Context &, + FormatFactory::WriteCallback callback, + const FormatSettings &format_settings) + { + return std::make_shared(buf, sample, callback, format_settings, true); + }); +} + + +} diff --git a/dbms/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.h b/dbms/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.h new file mode 100644 index 00000000000..a7857a82d2d --- /dev/null +++ b/dbms/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.h @@ -0,0 +1,45 @@ +#pragma once + +#include +#include +#include +#include + + +namespace DB +{ + +/** The stream for outputting data in JSON format, by object per line. + * Does not validate UTF-8. + */ +class JSONCompactEachRowRowOutputFormat : public IRowOutputFormat +{ +public: + JSONCompactEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_, bool with_names); + + String getName() const override { return "JSONCompactEachRowRowOutputFormat"; } + + void writePrefix() override; + + void writeBeforeTotals() override {} + void writeTotals(const Columns & columns, size_t row_num) override; + void writeAfterTotals() override {} + + void writeField(const IColumn & column, const IDataType & type, size_t row_num) override; + void writeFieldDelimiter() override; + void writeRowStartDelimiter() override; + void writeRowEndDelimiter() override; + +protected: + void consumeTotals(Chunk) override; + /// No extremes. + void consumeExtremes(Chunk) override {} + +private: + FormatSettings settings; + + NamesAndTypes fields; + + bool with_names; +}; +} diff --git a/dbms/tests/performance/parse_engine_file.xml b/dbms/tests/performance/parse_engine_file.xml index 6bd4af0b45b..8308d8f049f 100644 --- a/dbms/tests/performance/parse_engine_file.xml +++ b/dbms/tests/performance/parse_engine_file.xml @@ -32,6 +32,8 @@ CSVWithNames Values JSONEachRow + JSONCompactEachRow + JSONCompactEachRowWithNamesAndTypes TSKV RowBinary Native diff --git a/dbms/tests/performance/select_format.xml b/dbms/tests/performance/select_format.xml index c5ad1acd396..55ab7b2d458 100644 --- a/dbms/tests/performance/select_format.xml +++ b/dbms/tests/performance/select_format.xml @@ -34,6 +34,7 @@ JSON JSONCompact JSONEachRow + JSONCompactEachRow TSKV Pretty PrettyCompact diff --git a/dbms/tests/queries/0_stateless/01034_JSONCompactEachRow.reference b/dbms/tests/queries/0_stateless/01034_JSONCompactEachRow.reference new file mode 100644 index 00000000000..6ec53e11fc9 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01034_JSONCompactEachRow.reference @@ -0,0 +1,47 @@ +1 +[1, "a"] +[2, "b"] +[3, "c"] +2 +["a", "1"] +["b", "1"] +["c", "1"] +3 +["value", "name"] +["UInt8", "String"] +[1, "a"] +[2, "b"] +[3, "c"] +4 +["name", "c"] +["String", "UInt64"] +["a", "1"] +["b", "1"] +["c", "1"] + +["", "3"] +5 +["first", 1, 2, 0] +["second", 2, 0, 6] +6 +["first", 1, 2, 8] +["second", 2, 32, 6] +7 +[16, [15,16,0], ["first","second","third"]] +8 +["first", 1, 2, 0] +["second", 2, 0, 6] +9 +["first", 1, 2, 8] +["second", 2, 32, 6] +10 +["first", 1, 16, 8] +["second", 2, 32, 8] +11 +["v1", "v2", "v3", "v4"] +["String", "UInt8", "UInt16", "UInt8"] +["", 2, 3, 1] +12 +["v1", "n.id", "n.name"] +["UInt8", "Array(UInt8)", "Array(String)"] +[16, [15,16,0], ["first","second","third"]] diff --git a/dbms/tests/queries/0_stateless/01034_JSONCompactEachRow.sql b/dbms/tests/queries/0_stateless/01034_JSONCompactEachRow.sql new file mode 100644 index 00000000000..46a0e90e69d --- /dev/null +++ b/dbms/tests/queries/0_stateless/01034_JSONCompactEachRow.sql @@ -0,0 +1,63 @@ +DROP TABLE IF EXISTS test_table; +DROP TABLE IF EXISTS test_table_2; +SELECT 1; +/* Check JSONCompactEachRow Output */ +CREATE TABLE test_table (value UInt8, name String) ENGINE = MergeTree() ORDER BY value; +INSERT INTO test_table VALUES (1, 'a'), (2, 'b'), (3, 'c'); +SELECT * FROM test_table FORMAT JSONCompactEachRow; +SELECT 2; +/* Check Totals */ +SELECT name, count() AS c FROM test_table GROUP BY name WITH TOTALS ORDER BY name FORMAT JSONCompactEachRow; +SELECT 3; +/* Check JSONCompactEachRowWithNamesAndTypes Output */ +SELECT * FROM test_table FORMAT JSONCompactEachRowWithNamesAndTypes; +SELECT 4; +/* Check Totals */ +SELECT name, count() AS c FROM test_table GROUP BY name WITH TOTALS ORDER BY name FORMAT JSONCompactEachRowWithNamesAndTypes; +DROP TABLE IF EXISTS test_table; +SELECT 5; +/* Check JSONCompactEachRow Input */ +CREATE TABLE test_table (v1 String, v2 UInt8, v3 DEFAULT v2 * 16, v4 UInt8 DEFAULT 8) ENGINE = MergeTree() ORDER BY v2; +INSERT INTO test_table FORMAT JSONCompactEachRow ["first", 1, "2", null] ["second", 2, null, 6]; +SELECT * FROM test_table FORMAT JSONCompactEachRow; +TRUNCATE TABLE test_table; +SELECT 6; +/* Check input_format_null_as_default = 1 */ +SET input_format_null_as_default = 1; +INSERT INTO test_table FORMAT JSONCompactEachRow ["first", 1, "2", null] ["second", 2, null, 6]; +SELECT * FROM test_table FORMAT JSONCompactEachRow; +TRUNCATE TABLE test_table; +SELECT 7; +/* Check Nested */ +CREATE TABLE test_table_2 (v1 UInt8, n Nested(id UInt8, name String)) ENGINE = MergeTree() ORDER BY v1; +INSERT INTO test_table_2 FORMAT JSONCompactEachRow [16, [15, 16, null], ["first", "second", "third"]]; +SELECT * FROM test_table_2 FORMAT JSONCompactEachRow; +TRUNCATE TABLE test_table_2; +SELECT 8; +/* Check JSONCompactEachRowWithNamesAndTypes Output */ +SET input_format_null_as_default = 0; +INSERT INTO test_table FORMAT JSONCompactEachRowWithNamesAndTypes ["v1", "v2", "v3", "v4"]["String","UInt8","UInt16","UInt8"]["first", 1, "2", null]["second", 2, null, 6]; +SELECT * FROM test_table FORMAT JSONCompactEachRow; +TRUNCATE TABLE test_table; +SELECT 9; +/* Check input_format_null_as_default = 1 */ +SET input_format_null_as_default = 1; +INSERT INTO test_table FORMAT JSONCompactEachRowWithNamesAndTypes ["v1", "v2", "v3", "v4"]["String","UInt8","UInt16","UInt8"]["first", 1, "2", null] ["second", 2, null, 6]; +SELECT * FROM test_table FORMAT JSONCompactEachRow; +SELECT 10; +/* Check Header */ +TRUNCATE TABLE test_table; +SET input_format_skip_unknown_fields = 1; +INSERT INTO test_table FORMAT JSONCompactEachRowWithNamesAndTypes ["v1", "v2", "invalid_column"]["String", "UInt8", "UInt8"]["first", 1, 32]["second", 2, "64"]; +SELECT * FROM test_table FORMAT JSONCompactEachRow; +SELECT 11; +TRUNCATE TABLE test_table; +INSERT INTO test_table FORMAT JSONCompactEachRowWithNamesAndTypes ["v4", "v2", "v3"]["UInt8", "UInt8", "UInt16"][1, 2, 3] +SELECT * FROM test_table FORMAT JSONCompactEachRowWithNamesAndTypes; +SELECT 12; +/* Check Nested */ +INSERT INTO test_table_2 FORMAT JSONCompactEachRowWithNamesAndTypes ["v1", "n.id", "n.name"]["UInt8", "Array(UInt8)", "Array(String)"][16, [15, 16, null], ["first", "second", "third"]]; +SELECT * FROM test_table_2 FORMAT JSONCompactEachRowWithNamesAndTypes; + +DROP TABLE IF EXISTS test_table; +DROP TABLE IF EXISTS test_table_2;