diff --git a/dbms/include/DB/DataStreams/JSONEachRowRowInputStream.h b/dbms/include/DB/DataStreams/JSONEachRowRowInputStream.h index b67c916ed3a..8ae8214a1c5 100644 --- a/dbms/include/DB/DataStreams/JSONEachRowRowInputStream.h +++ b/dbms/include/DB/DataStreams/JSONEachRowRowInputStream.h @@ -19,13 +19,14 @@ class ReadBuffer; class JSONEachRowRowInputStream : public IRowInputStream { public: - JSONEachRowRowInputStream(ReadBuffer & istr_, const Block & sample_); + JSONEachRowRowInputStream(ReadBuffer& istr_, const Block& sample_, bool skip_unknown_); bool read(Block & block) override; private: ReadBuffer & istr; const Block sample; + bool skip_unknown; /// Буфер для прочитанного из потока имени поля. Используется, если его потребовалось скопировать. String name_buf; diff --git a/dbms/include/DB/IO/ReadHelpers.h b/dbms/include/DB/IO/ReadHelpers.h index 03cfbb6b9cd..65d6b0bd1bd 100644 --- a/dbms/include/DB/IO/ReadHelpers.h +++ b/dbms/include/DB/IO/ReadHelpers.h @@ -810,6 +810,9 @@ inline void skipWhitespaceIfAny(ReadBuffer & buf) ++buf.position(); } +/// Skip json value (except array and object). +void skipJSONFieldPlain(ReadBuffer & buf, const String & name_of_filed = ""); + /** Прочитать сериализованный эксепшен. * При сериализации/десериализации часть информации теряется diff --git a/dbms/include/DB/Interpreters/Settings.h b/dbms/include/DB/Interpreters/Settings.h index 9b5779c786a..6124be139a3 100644 --- a/dbms/include/DB/Interpreters/Settings.h +++ b/dbms/include/DB/Interpreters/Settings.h @@ -223,6 +223,9 @@ struct Settings \ /** Write add http CORS header */ \ M(SettingBool, add_http_cors_header, false) \ + \ + /** Skip columns with unknown names from input data (it works for JSONEachRow and TSKV formats). */ \ + M(SettingBool, input_format_skip_unknown_fields, false) /// Всевозможные ограничения на выполнение запроса. Limits limits; diff --git a/dbms/src/DataStreams/FormatFactory.cpp b/dbms/src/DataStreams/FormatFactory.cpp index e7a8bdfc1c5..671efc310a7 100644 --- a/dbms/src/DataStreams/FormatFactory.cpp +++ b/dbms/src/DataStreams/FormatFactory.cpp @@ -61,9 +61,15 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu else if (name == "CSVWithNames") return std::make_shared(std::make_shared(buf, sample, ',', true), sample, max_block_size); else if (name == "TSKV") - return std::make_shared(std::make_shared(buf, sample, false), sample, max_block_size); + { + auto row_stream = std::make_shared(buf, sample, context.getSettingsRef().input_format_skip_unknown_fields); + return std::make_shared(std::move(row_stream), sample, max_block_size); + } else if (name == "JSONEachRow") - return std::make_shared(std::make_shared(buf, sample), sample, max_block_size); + { + auto row_stream = std::make_shared(buf, sample, context.getSettingsRef().input_format_skip_unknown_fields); + return std::make_shared(std::move(row_stream), sample, max_block_size); + } else if (name == "TabSeparatedRaw" || name == "BlockTabSeparated" || name == "Pretty" diff --git a/dbms/src/DataStreams/JSONEachRowRowInputStream.cpp b/dbms/src/DataStreams/JSONEachRowRowInputStream.cpp index ba530c53355..862e3c2113e 100644 --- a/dbms/src/DataStreams/JSONEachRowRowInputStream.cpp +++ b/dbms/src/DataStreams/JSONEachRowRowInputStream.cpp @@ -12,8 +12,8 @@ namespace ErrorCodes } -JSONEachRowRowInputStream::JSONEachRowRowInputStream(ReadBuffer & istr_, const Block & sample_) - : istr(istr_), sample(sample_), name_map(sample.columns()) +JSONEachRowRowInputStream::JSONEachRowRowInputStream(ReadBuffer & istr_, const Block & sample_, bool skip_unknown_) + : istr(istr_), sample(sample_), skip_unknown(skip_unknown_), name_map(sample.columns()) { /// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it. skipBOMIfExists(istr); @@ -50,6 +50,14 @@ static StringRef readName(ReadBuffer & buf, String & tmp) } +static void skipColonDelimeter(ReadBuffer & istr) +{ + skipWhitespaceIfAny(istr); + assertChar(':', istr); + skipWhitespaceIfAny(istr); +} + + bool JSONEachRowRowInputStream::read(Block & block) { skipWhitespaceIfAny(istr); @@ -93,16 +101,21 @@ bool JSONEachRowRowInputStream::read(Block & block) auto it = name_map.find(name_ref); if (name_map.end() == it) - throw Exception("Unknown field found while parsing JSONEachRow format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA); + { + if (!skip_unknown) + throw Exception("Unknown field found while parsing JSONEachRow format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA); + + skipColonDelimeter(istr); + skipJSONFieldPlain(istr, name_ref.toString()); + continue; + } size_t index = it->second; if (read_columns[index]) throw Exception("Duplicate field found while parsing JSONEachRow format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA); - skipWhitespaceIfAny(istr); - assertChar(':', istr); - skipWhitespaceIfAny(istr); + skipColonDelimeter(istr); read_columns[index] = true; diff --git a/dbms/src/IO/ReadHelpers.cpp b/dbms/src/IO/ReadHelpers.cpp index 809155b218d..b5a93653ef6 100644 --- a/dbms/src/IO/ReadHelpers.cpp +++ b/dbms/src/IO/ReadHelpers.cpp @@ -20,6 +20,7 @@ namespace ErrorCodes extern const int CANNOT_PARSE_INPUT_ASSERTION_FAILED; extern const int CANNOT_PARSE_ESCAPE_SEQUENCE; extern const int CANNOT_PARSE_QUOTED_STRING; + extern const int INCORRECT_DATA; } @@ -380,6 +381,7 @@ void readQuotedString(String & s, ReadBuffer & buf) } template void readQuotedStringInto>(PaddedPODArray & s, ReadBuffer & buf); +template void readDoubleQuotedStringInto(NullSink & s, ReadBuffer & buf); void readDoubleQuotedString(String & s, ReadBuffer & buf) { @@ -522,6 +524,7 @@ void readJSONString(String & s, ReadBuffer & buf) } template void readJSONStringInto>(PaddedPODArray & s, ReadBuffer & buf); +template void readJSONStringInto(NullSink & s, ReadBuffer & buf); void readDateTimeTextFallback(time_t & datetime, ReadBuffer & buf) @@ -569,6 +572,44 @@ void readDateTimeTextFallback(time_t & datetime, ReadBuffer & buf) } +void skipJSONFieldPlain(ReadBuffer & buf, const String & name_of_filed) +{ + if (buf.eof()) + throw Exception("Unexpected EOF for key '" + name_of_filed + "'", ErrorCodes::INCORRECT_DATA); + else if (*buf.position() == '"') /// skip double-quoted string + { + NullSink sink; + readJSONStringInto(sink, buf); + } + else if (isdigit(*buf.position())) /// skip number + { + double v; + if (!tryReadFloatText(v, buf)) + throw Exception("Expected a number field for key '" + name_of_filed + "'", ErrorCodes::INCORRECT_DATA); + } + else if (*buf.position() == 'n') /// skip null + { + assertString("null", buf); + } + else if (*buf.position() == 't') /// skip true + { + assertString("true", buf); + } + else if (*buf.position() == 'f')/// skip false + { + assertString("false", buf); + } + else if (*buf.position() == '{' || *buf.position() == '[') /// fail on nested objects + { + throw Exception("Unexpected nested field for key '" + name_of_filed + "'", ErrorCodes::INCORRECT_DATA); + } + else + { + throw Exception("Unexpected symbol for key '" + name_of_filed + "'", ErrorCodes::INCORRECT_DATA); + } +} + + void readException(Exception & e, ReadBuffer & buf, const String & additional_message) { int code = 0;