From ae50ab69073211a408978833ab40a5875ea19fe1 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 2 Aug 2019 18:36:36 +0300 Subject: [PATCH] Remove JSONEachRowRowInputStream. --- dbms/src/Formats/FormatFactory.cpp | 2 - .../src/Formats/JSONEachRowRowInputStream.cpp | 272 ------------------ dbms/src/Formats/JSONEachRowRowInputStream.h | 68 ----- .../Formats/Impl/JSONEachRowRowInputFormat.h | 2 +- 4 files changed, 1 insertion(+), 343 deletions(-) delete mode 100644 dbms/src/Formats/JSONEachRowRowInputStream.cpp delete mode 100644 dbms/src/Formats/JSONEachRowRowInputStream.h diff --git a/dbms/src/Formats/FormatFactory.cpp b/dbms/src/Formats/FormatFactory.cpp index 0448e45a444..d49c630cd82 100644 --- a/dbms/src/Formats/FormatFactory.cpp +++ b/dbms/src/Formats/FormatFactory.cpp @@ -207,7 +207,6 @@ void registerInputFormatCSV(FormatFactory & factory); void registerOutputFormatCSV(FormatFactory & factory); void registerInputFormatTSKV(FormatFactory & factory); void registerOutputFormatTSKV(FormatFactory & factory); -void registerInputFormatJSONEachRow(FormatFactory & factory); void registerOutputFormatJSONEachRow(FormatFactory & factory); void registerInputFormatParquet(FormatFactory & factory); void registerOutputFormatParquet(FormatFactory & factory); @@ -273,7 +272,6 @@ FormatFactory::FormatFactory() registerOutputFormatCSV(*this); registerInputFormatTSKV(*this); registerOutputFormatTSKV(*this); - registerInputFormatJSONEachRow(*this); registerOutputFormatJSONEachRow(*this); registerInputFormatProtobuf(*this); registerOutputFormatProtobuf(*this); diff --git a/dbms/src/Formats/JSONEachRowRowInputStream.cpp b/dbms/src/Formats/JSONEachRowRowInputStream.cpp deleted file mode 100644 index 72acf722ae7..00000000000 --- a/dbms/src/Formats/JSONEachRowRowInputStream.cpp +++ /dev/null @@ -1,272 +0,0 @@ -#include - -#include -#include -#include -#include - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int INCORRECT_DATA; - extern const int CANNOT_READ_ALL_DATA; - extern const int LOGICAL_ERROR; -} - -namespace -{ - -enum -{ - UNKNOWN_FIELD = size_t(-1), - NESTED_FIELD = size_t(-2) -}; - -} - - -JSONEachRowRowInputStream::JSONEachRowRowInputStream(ReadBuffer & istr_, const Block & header_, const FormatSettings & format_settings) - : istr(istr_), header(header_), format_settings(format_settings), name_map(header.columns()) -{ - /// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it. - skipBOMIfExists(istr); - - size_t num_columns = header.columns(); - for (size_t i = 0; i < num_columns; ++i) - { - const String & colname = columnName(i); - name_map[colname] = i; /// NOTE You could place names more cache-locally. - if (format_settings.import_nested_json) - { - const auto splitted = Nested::splitName(colname); - if (!splitted.second.empty()) - { - const StringRef table_name(colname.data(), splitted.first.size()); - name_map[table_name] = NESTED_FIELD; - } - } - } - - prev_positions.assign(num_columns, name_map.end()); -} - -const String & JSONEachRowRowInputStream::columnName(size_t i) const -{ - return header.getByPosition(i).name; -} - -inline size_t JSONEachRowRowInputStream::columnIndex(const StringRef & name, size_t key_index) -{ - /// Optimization by caching the order of fields (which is almost always the same) - /// and a quick check to match the next expected field, instead of searching the hash table. - - if (prev_positions.size() > key_index - && prev_positions[key_index] != name_map.end() - && name == prev_positions[key_index]->getFirst()) - { - return prev_positions[key_index]->getSecond(); - } - else - { - const auto it = name_map.find(name); - - if (name_map.end() != it) - { - if (key_index < prev_positions.size()) - prev_positions[key_index] = it; - - return it->getSecond(); - } - else - return UNKNOWN_FIELD; - } -} - -/** Read the field name and convert it to column name - * (taking into account the current nested name prefix) - * Resulting StringRef is valid only before next read from buf. - */ -StringRef JSONEachRowRowInputStream::readColumnName(ReadBuffer & buf) -{ - // This is just an optimization: try to avoid copying the name into current_column_name - - if (nested_prefix_length == 0 && buf.position() + 1 < buf.buffer().end()) - { - char * next_pos = find_first_symbols<'\\', '"'>(buf.position() + 1, buf.buffer().end()); - - if (next_pos != buf.buffer().end() && *next_pos != '\\') - { - /// The most likely option is that there is no escape sequence in the key name, and the entire name is placed in the buffer. - assertChar('"', buf); - StringRef res(buf.position(), next_pos - buf.position()); - buf.position() = next_pos + 1; - return res; - } - } - - current_column_name.resize(nested_prefix_length); - readJSONStringInto(current_column_name, buf); - return current_column_name; -} - - -static inline void skipColonDelimeter(ReadBuffer & istr) -{ - skipWhitespaceIfAny(istr); - assertChar(':', istr); - skipWhitespaceIfAny(istr); -} - -void JSONEachRowRowInputStream::skipUnknownField(const StringRef & name_ref) -{ - if (!format_settings.skip_unknown_fields) - throw Exception("Unknown field found while parsing JSONEachRow format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA); - - skipJSONField(istr, name_ref); -} - -void JSONEachRowRowInputStream::readField(size_t index, MutableColumns & columns) -{ - if (read_columns[index]) - throw Exception("Duplicate field found while parsing JSONEachRow format: " + columnName(index), ErrorCodes::INCORRECT_DATA); - - try - { - header.getByPosition(index).type->deserializeAsTextJSON(*columns[index], istr, format_settings); - } - catch (Exception & e) - { - e.addMessage("(while read the value of key " + columnName(index) + ")"); - throw; - } - - read_columns[index] = true; -} - -inline bool JSONEachRowRowInputStream::advanceToNextKey(size_t key_index) -{ - skipWhitespaceIfAny(istr); - - if (istr.eof()) - throw Exception("Unexpected end of stream while parsing JSONEachRow format", ErrorCodes::CANNOT_READ_ALL_DATA); - else if (*istr.position() == '}') - { - ++istr.position(); - return false; - } - - if (key_index > 0) - { - assertChar(',', istr); - skipWhitespaceIfAny(istr); - } - return true; -} - -void JSONEachRowRowInputStream::readJSONObject(MutableColumns & columns) -{ - assertChar('{', istr); - - for (size_t key_index = 0; advanceToNextKey(key_index); ++key_index) - { - StringRef name_ref = readColumnName(istr); - const size_t column_index = columnIndex(name_ref, key_index); - - if (unlikely(ssize_t(column_index) < 0)) - { - /// name_ref may point directly to the input buffer - /// and input buffer may be filled with new data on next read - /// If we want to use name_ref after another reads from buffer, we must copy it to temporary string. - - current_column_name.assign(name_ref.data, name_ref.size); - name_ref = StringRef(current_column_name); - - skipColonDelimeter(istr); - - if (column_index == UNKNOWN_FIELD) - skipUnknownField(name_ref); - else if (column_index == NESTED_FIELD) - readNestedData(name_ref.toString(), columns); - else - throw Exception("Logical error: illegal value of column_index", ErrorCodes::LOGICAL_ERROR); - } - else - { - skipColonDelimeter(istr); - readField(column_index, columns); - } - } -} - -void JSONEachRowRowInputStream::readNestedData(const String & name, MutableColumns & columns) -{ - current_column_name = name; - current_column_name.push_back('.'); - nested_prefix_length = current_column_name.size(); - readJSONObject(columns); - nested_prefix_length = 0; -} - - -bool JSONEachRowRowInputStream::read(MutableColumns & columns, RowReadExtension & ext) -{ - skipWhitespaceIfAny(istr); - - /// We consume ;, or \n before scanning a new row, instead scanning to next row at the end. - /// The reason is that if we want an exact number of rows read with LIMIT x - /// from a streaming table engine with text data format, like File or Kafka - /// then seeking to next ;, or \n would trigger reading of an extra row at the end. - - /// Semicolon is added for convenience as it could be used at end of INSERT query. - if (!istr.eof() && (*istr.position() == ',' || *istr.position() == ';')) - ++istr.position(); - - skipWhitespaceIfAny(istr); - if (istr.eof()) - return false; - - size_t num_columns = columns.size(); - - /// Set of columns for which the values were read. The rest will be filled with default values. - read_columns.assign(num_columns, false); - - nested_prefix_length = 0; - readJSONObject(columns); - - /// Fill non-visited columns with the default values. - for (size_t i = 0; i < num_columns; ++i) - if (!read_columns[i]) - header.getByPosition(i).type->insertDefaultInto(*columns[i]); - - /// return info about defaults set - ext.read_columns = read_columns; - return true; -} - - -void JSONEachRowRowInputStream::syncAfterError() -{ - skipToUnescapedNextLineOrEOF(istr); -} - - -void registerInputFormatJSONEachRow(FormatFactory & factory) -{ - factory.registerInputFormat("JSONEachRow", []( - ReadBuffer & buf, - const Block & sample, - const Context &, - UInt64 max_block_size, - UInt64 rows_portion_size, - FormatFactory::ReadCallback callback, - const FormatSettings & settings) - { - return std::make_shared( - std::make_shared(buf, sample, settings), - sample, max_block_size, rows_portion_size, callback, settings); - }); -} - -} diff --git a/dbms/src/Formats/JSONEachRowRowInputStream.h b/dbms/src/Formats/JSONEachRowRowInputStream.h deleted file mode 100644 index 726b63b084e..00000000000 --- a/dbms/src/Formats/JSONEachRowRowInputStream.h +++ /dev/null @@ -1,68 +0,0 @@ -#pragma once - -#include -#include -#include -#include - - -namespace DB -{ - -class ReadBuffer; - - -/** A stream for reading data in JSON format, where each row is represented by a separate JSON object. - * Objects can be separated by line feed, other whitespace characters in any number and possibly a comma. - * Fields can be listed in any order (including, in different lines there may be different order), - * and some fields may be missing. - */ -class JSONEachRowRowInputStream : public IRowInputStream -{ -public: - JSONEachRowRowInputStream(ReadBuffer & istr_, const Block & header_, const FormatSettings & format_settings); - - bool read(MutableColumns & columns, RowReadExtension & ext) override; - bool allowSyncAfterError() const override { return true; } - void syncAfterError() override; - -private: - const String & columnName(size_t i) const; - size_t columnIndex(const StringRef & name, size_t key_index); - bool advanceToNextKey(size_t key_index); - void skipUnknownField(const StringRef & name_ref); - StringRef readColumnName(ReadBuffer & buf); - void readField(size_t index, MutableColumns & columns); - void readJSONObject(MutableColumns & columns); - void readNestedData(const String & name, MutableColumns & columns); - -private: - ReadBuffer & istr; - Block header; - - const FormatSettings format_settings; - - /// Buffer for the read from the stream field name. Used when you have to copy it. - /// Also, if processing of Nested data is in progress, it holds the common prefix - /// of the nested column names (so that appending the field name to it produces - /// the full column name) - String current_column_name; - - /// If processing Nested data, holds the length of the common prefix - /// of the names of related nested columns. For example, for a table - /// created as follows - /// CREATE TABLE t (n Nested (i Int32, s String)) - /// the nested column names are 'n.i' and 'n.s' and the nested prefix is 'n.' - size_t nested_prefix_length = 0; - - std::vector read_columns; - - /// Hash table match `field name -> position in the block`. NOTE You can use perfect hash map. - using NameMap = HashMap; - NameMap name_map; - - /// Cached search results for previous row (keyed as index in JSON object) - used as a hint. - std::vector prev_positions; -}; - -} diff --git a/dbms/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.h b/dbms/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.h index 901e35af6e8..1aed7c9dc49 100644 --- a/dbms/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.h +++ b/dbms/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.h @@ -13,7 +13,7 @@ class ReadBuffer; /** A stream for reading data in JSON format, where each row is represented by a separate JSON object. - * Objects can be separated by feed return, other whitespace characters in any number and possibly a comma. + * Objects can be separated by line feed, other whitespace characters in any number and possibly a comma. * Fields can be listed in any order (including, in different lines there may be different order), * and some fields may be missing. */