diff --git a/src/Common/CounterInFile.h b/src/Common/CounterInFile.h index 993ed97966a..854bf7cc675 100644 --- a/src/Common/CounterInFile.h +++ b/src/Common/CounterInFile.h @@ -88,7 +88,7 @@ public: { /// A more understandable error message. if (e.code() == DB::ErrorCodes::CANNOT_READ_ALL_DATA || e.code() == DB::ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF) - throw DB::ParsingException(e.code(), "File {} is empty. You must fill it manually with appropriate value.", path); + throw DB::Exception(e.code(), "File {} is empty. You must fill it manually with appropriate value.", path); else throw; } diff --git a/src/Common/Exception.cpp b/src/Common/Exception.cpp index d5f1984a5ff..e1f010cc740 100644 --- a/src/Common/Exception.cpp +++ b/src/Common/Exception.cpp @@ -616,48 +616,4 @@ ExecutionStatus ExecutionStatus::fromText(const std::string & data) return status; } -ParsingException::ParsingException() = default; -ParsingException::ParsingException(const std::string & msg, int code) - : Exception(msg, code) -{ -} - -/// We use additional field formatted_message_ to make this method const. -std::string ParsingException::displayText() const -{ - try - { - formatted_message = message(); - bool need_newline = false; - if (!file_name.empty()) - { - formatted_message += fmt::format(": (in file/uri {})", file_name); - need_newline = true; - } - - if (line_number != -1) - { - formatted_message += fmt::format(": (at row {})", line_number); - need_newline = true; - } - - if (need_newline) - formatted_message += "\n"; - } - catch (...) {} // NOLINT(bugprone-empty-catch) - - if (!formatted_message.empty()) - { - std::string result = name(); - result.append(": "); - result.append(formatted_message); - return result; - } - else - { - return Exception::displayText(); - } -} - - } diff --git a/src/Common/Exception.h b/src/Common/Exception.h index aabc848b230..6f30fde3876 100644 --- a/src/Common/Exception.h +++ b/src/Common/Exception.h @@ -235,43 +235,6 @@ private: const char * className() const noexcept override { return "DB::ErrnoException"; } }; - -/// Special class of exceptions, used mostly in ParallelParsingInputFormat for -/// more convenient calculation of problem line number. -class ParsingException : public Exception -{ - ParsingException(const std::string & msg, int code); -public: - ParsingException(); - - // Format message with fmt::format, like the logging functions. - template - ParsingException(int code, FormatStringHelper fmt, Args &&... args) : Exception(fmt::format(fmt.fmt_str, std::forward(args)...), code) - { - message_format_string = fmt.message_format_string; - } - - std::string displayText() const override; - - ssize_t getLineNumber() const { return line_number; } - void setLineNumber(int line_number_) { line_number = line_number_;} - - String getFileName() const { return file_name; } - void setFileName(const String & file_name_) { file_name = file_name_; } - - Exception * clone() const override { return new ParsingException(*this); } - void rethrow() const override { throw *this; } // NOLINT - -private: - ssize_t line_number{-1}; - String file_name; - mutable std::string formatted_message; - - const char * name() const noexcept override { return "DB::ParsingException"; } - const char * className() const noexcept override { return "DB::ParsingException"; } -}; - - using Exceptions = std::vector; /** Try to write an exception to the log (and forget about it). diff --git a/src/DataTypes/Serializations/SerializationArray.cpp b/src/DataTypes/Serializations/SerializationArray.cpp index 1a21a45d7b8..0d99b741a23 100644 --- a/src/DataTypes/Serializations/SerializationArray.cpp +++ b/src/DataTypes/Serializations/SerializationArray.cpp @@ -390,7 +390,7 @@ void SerializationArray::deserializeBinaryBulkWithMultipleStreams( /// Check consistency between offsets and elements subcolumns. /// But if elements column is empty - it's ok for columns of Nested types that was added by ALTER. if (!nested_column->empty() && nested_column->size() != last_offset) - throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all array values: read just {} of {}", + throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all array values: read just {} of {}", toString(nested_column->size()), toString(last_offset)); column = std::move(mutable_column); @@ -445,7 +445,7 @@ static void deserializeTextImpl(IColumn & column, ReadBuffer & istr, Reader && r if (*istr.position() == ',') ++istr.position(); else - throw ParsingException(ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT, + throw Exception(ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT, "Cannot read array from text, expected comma or end of array, found '{}'", *istr.position()); } diff --git a/src/DataTypes/Serializations/SerializationNullable.cpp b/src/DataTypes/Serializations/SerializationNullable.cpp index 15203bdc9fa..d9efc6fff10 100644 --- a/src/DataTypes/Serializations/SerializationNullable.cpp +++ b/src/DataTypes/Serializations/SerializationNullable.cpp @@ -359,7 +359,7 @@ ReturnType SerializationNullable::deserializeTextEscapedAndRawImpl(IColumn & col nested_column.popBack(1); if (null_representation.find('\t') != std::string::npos || null_representation.find('\n') != std::string::npos) - throw DB::ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "TSV custom null representation " + throw DB::Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "TSV custom null representation " "containing '\\t' or '\\n' may not work correctly for large input."); WriteBufferFromOwnString parsed_value; @@ -367,7 +367,7 @@ ReturnType SerializationNullable::deserializeTextEscapedAndRawImpl(IColumn & col nested_serialization->serializeTextEscaped(nested_column, nested_column.size() - 1, parsed_value, settings); else nested_serialization->serializeTextRaw(nested_column, nested_column.size() - 1, parsed_value, settings); - throw DB::ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while parsing \"{}{}\" as Nullable" + throw DB::Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while parsing \"{}{}\" as Nullable" " at position {}: got \"{}\", which was deserialized as \"{}\". " "It seems that input data is ill-formatted.", std::string(pos, buf.buffer().end()), @@ -452,7 +452,7 @@ ReturnType SerializationNullable::deserializeTextQuotedImpl(IColumn & column, Re /// It can happen only if there is an unquoted string instead of a number. /// We also should delete incorrectly deserialized value from nested column. nested_column.popBack(1); - throw DB::ParsingException( + throw DB::Exception( ErrorCodes::CANNOT_READ_ALL_DATA, "Error while parsing Nullable: got an unquoted string {} instead of a number", String(buf.position(), std::min(10ul, buf.available()))); @@ -589,12 +589,12 @@ ReturnType SerializationNullable::deserializeTextCSVImpl(IColumn & column, ReadB if (null_representation.find(settings.csv.delimiter) != std::string::npos || null_representation.find('\r') != std::string::npos || null_representation.find('\n') != std::string::npos) - throw DB::ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "CSV custom null representation containing " + throw DB::Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "CSV custom null representation containing " "format_csv_delimiter, '\\r' or '\\n' may not work correctly for large input."); WriteBufferFromOwnString parsed_value; nested_serialization->serializeTextCSV(nested_column, nested_column.size() - 1, parsed_value, settings); - throw DB::ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while parsing \"{}{}\" as Nullable" + throw DB::Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while parsing \"{}{}\" as Nullable" " at position {}: got \"{}\", which was deserialized as \"{}\". " "It seems that input data is ill-formatted.", std::string(pos, buf.buffer().end()), diff --git a/src/Formats/JSONUtils.cpp b/src/Formats/JSONUtils.cpp index b8b9a9ecb0d..779f38032d8 100644 --- a/src/Formats/JSONUtils.cpp +++ b/src/Formats/JSONUtils.cpp @@ -43,7 +43,7 @@ namespace JSONUtils { const auto current_object_size = memory.size() + static_cast(pos - in.position()); if (min_bytes != 0 && current_object_size > 10 * min_bytes) - throw ParsingException(ErrorCodes::INCORRECT_DATA, + throw Exception(ErrorCodes::INCORRECT_DATA, "Size of JSON object at position {} is extremely large. Expected not greater than {} bytes, but current is {} bytes per row. " "Increase the value setting 'min_chunk_bytes_for_parallel_parsing' or check your data manually, " "most likely JSON is malformed", in.count(), min_bytes, current_object_size); diff --git a/src/Formats/NativeReader.cpp b/src/Formats/NativeReader.cpp index 4c25460eb63..8286b24d0a6 100644 --- a/src/Formats/NativeReader.cpp +++ b/src/Formats/NativeReader.cpp @@ -120,7 +120,7 @@ Block NativeReader::read() if (istr.eof()) { if (use_index) - throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Input doesn't contain all data for index."); + throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Input doesn't contain all data for index."); return res; } diff --git a/src/IO/ReadHelpers.cpp b/src/IO/ReadHelpers.cpp index ff5743a63af..256354b2833 100644 --- a/src/IO/ReadHelpers.cpp +++ b/src/IO/ReadHelpers.cpp @@ -89,7 +89,7 @@ void NO_INLINE throwAtAssertionFailed(const char * s, ReadBuffer & buf) else out << " before: " << quote << String(buf.position(), std::min(SHOW_CHARS_ON_SYNTAX_ERROR, buf.buffer().end() - buf.position())); - throw ParsingException(ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED, "Cannot parse input: expected {}", out.str()); + throw Exception(ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED, "Cannot parse input: expected {}", out.str()); } @@ -562,7 +562,7 @@ static ReturnType readAnyQuotedStringInto(Vector & s, ReadBuffer & buf) if (buf.eof() || *buf.position() != quote) { if constexpr (throw_exception) - throw ParsingException(ErrorCodes::CANNOT_PARSE_QUOTED_STRING, + throw Exception(ErrorCodes::CANNOT_PARSE_QUOTED_STRING, "Cannot parse quoted string: expected opening quote '{}', got '{}'", std::string{quote}, buf.eof() ? "EOF" : std::string{*buf.position()}); else @@ -608,7 +608,7 @@ static ReturnType readAnyQuotedStringInto(Vector & s, ReadBuffer & buf) } if constexpr (throw_exception) - throw ParsingException(ErrorCodes::CANNOT_PARSE_QUOTED_STRING, "Cannot parse quoted string: expected closing quote"); + throw Exception(ErrorCodes::CANNOT_PARSE_QUOTED_STRING, "Cannot parse quoted string: expected closing quote"); else return ReturnType(false); } @@ -958,7 +958,7 @@ ReturnType readJSONStringInto(Vector & s, ReadBuffer & buf) auto error = [](FormatStringHelper<> message [[maybe_unused]], int code [[maybe_unused]]) { if constexpr (throw_exception) - throw ParsingException(code, std::move(message)); + throw Exception(code, std::move(message)); return ReturnType(false); }; @@ -1009,7 +1009,7 @@ ReturnType readJSONObjectOrArrayPossiblyInvalid(Vector & s, ReadBuffer & buf) auto error = [](FormatStringHelper<> message [[maybe_unused]], int code [[maybe_unused]]) { if constexpr (throw_exception) - throw ParsingException(code, std::move(message)); + throw Exception(code, std::move(message)); return ReturnType(false); }; @@ -1185,7 +1185,7 @@ ReturnType readDateTimeTextFallback(time_t & datetime, ReadBuffer & buf, const D else { if constexpr (throw_exception) - throw ParsingException(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot parse DateTime"); + throw Exception(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot parse DateTime"); else return false; } @@ -1212,7 +1212,7 @@ ReturnType readDateTimeTextFallback(time_t & datetime, ReadBuffer & buf, const D s_pos[size] = 0; if constexpr (throw_exception) - throw ParsingException(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot parse DateTime {}", s); + throw Exception(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot parse DateTime {}", s); else return false; } @@ -1235,7 +1235,7 @@ ReturnType readDateTimeTextFallback(time_t & datetime, ReadBuffer & buf, const D s_pos[size] = 0; if constexpr (throw_exception) - throw ParsingException(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot parse time component of DateTime {}", s); + throw Exception(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot parse time component of DateTime {}", s); else return false; } @@ -1266,7 +1266,7 @@ ReturnType readDateTimeTextFallback(time_t & datetime, ReadBuffer & buf, const D if (too_short && negative_multiplier != -1) { if constexpr (throw_exception) - throw ParsingException(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot parse DateTime"); + throw Exception(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot parse DateTime"); else return false; } diff --git a/src/IO/ReadHelpers.h b/src/IO/ReadHelpers.h index bba0b694d23..85584d63ee8 100644 --- a/src/IO/ReadHelpers.h +++ b/src/IO/ReadHelpers.h @@ -296,7 +296,7 @@ inline void readBoolTextWord(bool & x, ReadBuffer & buf, bool support_upper_case [[fallthrough]]; } default: - throw ParsingException(ErrorCodes::CANNOT_PARSE_BOOL, "Unexpected Bool value"); + throw Exception(ErrorCodes::CANNOT_PARSE_BOOL, "Unexpected Bool value"); } } @@ -340,7 +340,7 @@ ReturnType readIntTextImpl(T & x, ReadBuffer & buf) if (has_sign) { if constexpr (throw_exception) - throw ParsingException(ErrorCodes::CANNOT_PARSE_NUMBER, + throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Cannot parse number with multiple sign (+/-) characters"); else return ReturnType(false); @@ -357,7 +357,7 @@ ReturnType readIntTextImpl(T & x, ReadBuffer & buf) if (has_sign) { if constexpr (throw_exception) - throw ParsingException(ErrorCodes::CANNOT_PARSE_NUMBER, + throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Cannot parse number with multiple sign (+/-) characters"); else return ReturnType(false); @@ -368,7 +368,7 @@ ReturnType readIntTextImpl(T & x, ReadBuffer & buf) else { if constexpr (throw_exception) - throw ParsingException(ErrorCodes::CANNOT_PARSE_NUMBER, "Unsigned type must not contain '-' symbol"); + throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Unsigned type must not contain '-' symbol"); else return ReturnType(false); } @@ -430,7 +430,7 @@ end: if (has_sign && !has_number) { if constexpr (throw_exception) - throw ParsingException(ErrorCodes::CANNOT_PARSE_NUMBER, + throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Cannot parse number with a sign character but without any numeric character"); else return ReturnType(false); @@ -837,7 +837,7 @@ inline ReturnType readUUIDTextImpl(UUID & uuid, ReadBuffer & buf) if constexpr (throw_exception) { - throw ParsingException(ErrorCodes::CANNOT_PARSE_UUID, "Cannot parse uuid {}", s); + throw Exception(ErrorCodes::CANNOT_PARSE_UUID, "Cannot parse uuid {}", s); } else { @@ -855,7 +855,7 @@ inline ReturnType readUUIDTextImpl(UUID & uuid, ReadBuffer & buf) if constexpr (throw_exception) { - throw ParsingException(ErrorCodes::CANNOT_PARSE_UUID, "Cannot parse uuid {}", s); + throw Exception(ErrorCodes::CANNOT_PARSE_UUID, "Cannot parse uuid {}", s); } else { @@ -881,7 +881,7 @@ inline ReturnType readIPv4TextImpl(IPv4 & ip, ReadBuffer & buf) return ReturnType(true); if constexpr (std::is_same_v) - throw ParsingException(ErrorCodes::CANNOT_PARSE_IPV4, "Cannot parse IPv4 {}", std::string_view(buf.position(), buf.available())); + throw Exception(ErrorCodes::CANNOT_PARSE_IPV4, "Cannot parse IPv4 {}", std::string_view(buf.position(), buf.available())); else return ReturnType(false); } @@ -903,7 +903,7 @@ inline ReturnType readIPv6TextImpl(IPv6 & ip, ReadBuffer & buf) return ReturnType(true); if constexpr (std::is_same_v) - throw ParsingException(ErrorCodes::CANNOT_PARSE_IPV6, "Cannot parse IPv6 {}", std::string_view(buf.position(), buf.available())); + throw Exception(ErrorCodes::CANNOT_PARSE_IPV6, "Cannot parse IPv6 {}", std::string_view(buf.position(), buf.available())); else return ReturnType(false); } @@ -944,7 +944,7 @@ inline ReturnType readDateTimeTextImpl(time_t & datetime, ReadBuffer & buf, cons if (!buf.eof() && !isNumericASCII(*buf.position())) { if constexpr (throw_exception) - throw ParsingException(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot parse datetime"); + throw Exception(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot parse datetime"); else return false; } @@ -1017,7 +1017,7 @@ inline ReturnType readDateTimeTextImpl(DateTime64 & datetime64, UInt32 scale, Re { readDateTimeTextImpl(whole, buf, date_lut); } - catch (const DB::ParsingException &) + catch (const DB::Exception &) { if (buf.eof() || *buf.position() != '.') throw; @@ -1125,7 +1125,7 @@ inline void readDateTimeText(LocalDateTime & datetime, ReadBuffer & buf) if (10 != size) { s[size] = 0; - throw ParsingException(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot parse DateTime {}", s); + throw Exception(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot parse DateTime {}", s); } datetime.year((s[0] - '0') * 1000 + (s[1] - '0') * 100 + (s[2] - '0') * 10 + (s[3] - '0')); @@ -1141,7 +1141,7 @@ inline void readDateTimeText(LocalDateTime & datetime, ReadBuffer & buf) if (8 != size) { s[size] = 0; - throw ParsingException(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot parse time component of DateTime {}", s); + throw Exception(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot parse time component of DateTime {}", s); } datetime.hour((s[0] - '0') * 10 + (s[1] - '0')); @@ -1174,7 +1174,7 @@ inline ReturnType readTimeTextImpl(time_t & time, ReadBuffer & buf) s[size] = 0; if constexpr (throw_exception) - throw ParsingException(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot parse DateTime {}", s); + throw Exception(ErrorCodes::CANNOT_PARSE_DATETIME, "Cannot parse DateTime {}", s); else return false; } @@ -1482,7 +1482,7 @@ void readQuoted(std::vector & x, ReadBuffer & buf) if (*buf.position() == ',') ++buf.position(); else - throw ParsingException(ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT, "Cannot read array from text"); + throw Exception(ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT, "Cannot read array from text"); } first = false; @@ -1505,7 +1505,7 @@ void readDoubleQuoted(std::vector & x, ReadBuffer & buf) if (*buf.position() == ',') ++buf.position(); else - throw ParsingException(ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT, "Cannot read array from text"); + throw Exception(ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT, "Cannot read array from text"); } first = false; diff --git a/src/IO/parseDateTimeBestEffort.cpp b/src/IO/parseDateTimeBestEffort.cpp index 83fde8e8830..9734ba1c84f 100644 --- a/src/IO/parseDateTimeBestEffort.cpp +++ b/src/IO/parseDateTimeBestEffort.cpp @@ -95,7 +95,7 @@ ReturnType parseDateTimeBestEffortImpl( FmtArgs && ...fmt_args [[maybe_unused]]) { if constexpr (std::is_same_v) - throw ParsingException(error_code, std::move(fmt_string), std::forward(fmt_args)...); + throw Exception(error_code, std::move(fmt_string), std::forward(fmt_args)...); else return false; }; diff --git a/src/IO/readDecimalText.h b/src/IO/readDecimalText.h index 9fd9c439b87..3417310a990 100644 --- a/src/IO/readDecimalText.h +++ b/src/IO/readDecimalText.h @@ -121,7 +121,7 @@ inline bool readDigits(ReadBuffer & buf, T & x, uint32_t & digits, int32_t & exp if (!tryReadIntText(addition_exp, buf)) { if constexpr (_throw_on_error) - throw ParsingException(ErrorCodes::CANNOT_PARSE_NUMBER, "Cannot parse exponent while reading decimal"); + throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Cannot parse exponent while reading decimal"); else return false; } @@ -134,7 +134,7 @@ inline bool readDigits(ReadBuffer & buf, T & x, uint32_t & digits, int32_t & exp if (digits_only) { if constexpr (_throw_on_error) - throw ParsingException(ErrorCodes::CANNOT_PARSE_NUMBER, "Unexpected symbol while reading decimal"); + throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Unexpected symbol while reading decimal"); return false; } stop = true; diff --git a/src/IO/readFloatText.h b/src/IO/readFloatText.h index b0682576183..23e904f305a 100644 --- a/src/IO/readFloatText.h +++ b/src/IO/readFloatText.h @@ -160,7 +160,7 @@ ReturnType readFloatTextPreciseImpl(T & x, ReadBuffer & buf) if (unlikely(res.ec != std::errc())) { if constexpr (throw_exception) - throw ParsingException( + throw Exception( ErrorCodes::CANNOT_PARSE_NUMBER, "Cannot read floating point value here: {}", String(initial_position, buf.buffer().end() - initial_position)); @@ -253,7 +253,7 @@ ReturnType readFloatTextPreciseImpl(T & x, ReadBuffer & buf) if (unlikely(res.ec != std::errc() || res.ptr - tmp_buf != num_copied_chars)) { if constexpr (throw_exception) - throw ParsingException( + throw Exception( ErrorCodes::CANNOT_PARSE_NUMBER, "Cannot read floating point value here: {}", String(tmp_buf, num_copied_chars)); else return ReturnType(false); @@ -342,7 +342,7 @@ ReturnType readFloatTextFastImpl(T & x, ReadBuffer & in) if (in.eof()) { if constexpr (throw_exception) - throw ParsingException(ErrorCodes::CANNOT_PARSE_NUMBER, "Cannot read floating point value"); + throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Cannot read floating point value"); else return false; } @@ -400,7 +400,7 @@ ReturnType readFloatTextFastImpl(T & x, ReadBuffer & in) if (in.eof()) { if constexpr (throw_exception) - throw ParsingException(ErrorCodes::CANNOT_PARSE_NUMBER, "Cannot read floating point value: nothing after exponent"); + throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Cannot read floating point value: nothing after exponent"); else return false; } @@ -438,7 +438,7 @@ ReturnType readFloatTextFastImpl(T & x, ReadBuffer & in) if (in.eof()) { if constexpr (throw_exception) - throw ParsingException(ErrorCodes::CANNOT_PARSE_NUMBER, "Cannot read floating point value: no digits read"); + throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Cannot read floating point value: no digits read"); else return false; } @@ -449,14 +449,14 @@ ReturnType readFloatTextFastImpl(T & x, ReadBuffer & in) if (in.eof()) { if constexpr (throw_exception) - throw ParsingException(ErrorCodes::CANNOT_PARSE_NUMBER, "Cannot read floating point value: nothing after plus sign"); + throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Cannot read floating point value: nothing after plus sign"); else return false; } else if (negative) { if constexpr (throw_exception) - throw ParsingException(ErrorCodes::CANNOT_PARSE_NUMBER, "Cannot read floating point value: plus after minus sign"); + throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Cannot read floating point value: plus after minus sign"); else return false; } diff --git a/src/Processors/Formats/IInputFormat.cpp b/src/Processors/Formats/IInputFormat.cpp index e487a0054e7..3009e91c45a 100644 --- a/src/Processors/Formats/IInputFormat.cpp +++ b/src/Processors/Formats/IInputFormat.cpp @@ -1,6 +1,7 @@ #include #include - +#include +#include namespace DB { @@ -11,6 +12,21 @@ IInputFormat::IInputFormat(Block header, ReadBuffer * in_) column_mapping = std::make_shared(); } +Chunk IInputFormat::generate() +{ + try + { + return read(); + } + catch (Exception & e) + { + auto file_name = getFileNameFromReadBuffer(getReadBuffer()); + if (!file_name.empty()) + e.addMessage(fmt::format("(in file/uri {})", file_name)); + throw; + } +} + void IInputFormat::resetParser() { chassert(in); diff --git a/src/Processors/Formats/IInputFormat.h b/src/Processors/Formats/IInputFormat.h index 6722f5ebebf..713c1089d28 100644 --- a/src/Processors/Formats/IInputFormat.h +++ b/src/Processors/Formats/IInputFormat.h @@ -27,6 +27,11 @@ public: /// ReadBuffer can be nullptr for random-access formats. IInputFormat(Block header, ReadBuffer * in_); + Chunk generate() override; + + /// All data reading from the read buffer must be performed by this method. + virtual Chunk read() = 0; + /** In some usecase (hello Kafka) we need to read a lot of tiny streams in exactly the same format. * The recreating of parser for each small stream takes too long, so we introduce a method * resetParser() which allow to reset the state of parser to continue reading of @@ -49,8 +54,9 @@ public: /// Must be called from ParallelParsingInputFormat before readPrefix void setColumnMapping(ColumnMappingPtr column_mapping_) { column_mapping = column_mapping_; } - size_t getCurrentUnitNumber() const { return current_unit_number; } - void setCurrentUnitNumber(size_t current_unit_number_) { current_unit_number = current_unit_number_; } + /// Set the number of rows that was already read in + /// parallel parsing before creating this parser. + virtual void setRowsReadBefore(size_t /*rows*/) {} void addBuffer(std::unique_ptr buffer) { owned_buffers.emplace_back(std::move(buffer)); } @@ -72,9 +78,6 @@ protected: bool need_only_count = false; private: - /// Number of currently parsed chunk (if parallel parsing is enabled) - size_t current_unit_number = 0; - std::vector> owned_buffers; }; diff --git a/src/Processors/Formats/IRowInputFormat.cpp b/src/Processors/Formats/IRowInputFormat.cpp index 8c563b6f13b..5f27fa78c55 100644 --- a/src/Processors/Formats/IRowInputFormat.cpp +++ b/src/Processors/Formats/IRowInputFormat.cpp @@ -83,7 +83,7 @@ void IRowInputFormat::logError() errors_logger->logError(InputFormatErrorsLogger::ErrorEntry{now_time, total_rows, diagnostic, raw_data}); } -Chunk IRowInputFormat::generate() +Chunk IRowInputFormat::read() { if (total_rows == 0) { @@ -93,10 +93,6 @@ Chunk IRowInputFormat::generate() } catch (Exception & e) { - auto file_name = getFileNameFromReadBuffer(getReadBuffer()); - if (!file_name.empty()) - e.addMessage(fmt::format("(in file/uri {})", file_name)); - e.addMessage("(while reading header)"); throw; } @@ -132,8 +128,6 @@ Chunk IRowInputFormat::generate() { try { - ++total_rows; - info.read_columns.clear(); continue_reading = readRow(columns, info); @@ -148,6 +142,8 @@ Chunk IRowInputFormat::generate() } } + ++total_rows; + /// Some formats may read row AND say the read is finished. /// For such a case, get the number or rows from first column. if (!columns.empty()) @@ -162,6 +158,8 @@ Chunk IRowInputFormat::generate() } catch (Exception & e) { + ++total_rows; + /// Logic for possible skipping of errors. if (!isParseError(e.code())) @@ -204,27 +202,6 @@ Chunk IRowInputFormat::generate() } } } - catch (ParsingException & e) - { - String verbose_diagnostic; - try - { - verbose_diagnostic = getDiagnosticInfo(); - } - catch (const Exception & exception) - { - verbose_diagnostic = "Cannot get verbose diagnostic: " + exception.message(); - } - catch (...) // NOLINT(bugprone-empty-catch) - { - /// Error while trying to obtain verbose diagnostic. Ok to ignore. - } - - e.setFileName(getFileNameFromReadBuffer(getReadBuffer())); - e.setLineNumber(static_cast(total_rows)); - e.addMessage(verbose_diagnostic); - throw; - } catch (Exception & e) { if (!isParseError(e.code())) @@ -244,10 +221,6 @@ Chunk IRowInputFormat::generate() /// Error while trying to obtain verbose diagnostic. Ok to ignore. } - auto file_name = getFileNameFromReadBuffer(getReadBuffer()); - if (!file_name.empty()) - e.addMessage(fmt::format("(in file/uri {})", file_name)); - e.addMessage(fmt::format("(at row {})\n", total_rows)); e.addMessage(verbose_diagnostic); throw; diff --git a/src/Processors/Formats/IRowInputFormat.h b/src/Processors/Formats/IRowInputFormat.h index 1b48647a224..f8796df8604 100644 --- a/src/Processors/Formats/IRowInputFormat.h +++ b/src/Processors/Formats/IRowInputFormat.h @@ -42,7 +42,7 @@ public: IRowInputFormat(Block header, ReadBuffer & in_, Params params_); - Chunk generate() override; + Chunk read() override; void resetParser() override; @@ -79,10 +79,12 @@ protected: const BlockMissingValues & getMissingValues() const override { return block_missing_values; } - size_t getTotalRows() const { return total_rows; } + size_t getRowNum() const { return total_rows; } size_t getApproxBytesReadForChunk() const override { return approx_bytes_read_for_chunk; } + void setRowsReadBefore(size_t rows) override { total_rows = rows; } + Serializations serializations; private: diff --git a/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp b/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp index bac6c540381..206e244c75f 100644 --- a/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp @@ -28,7 +28,7 @@ ArrowBlockInputFormat::ArrowBlockInputFormat(ReadBuffer & in_, const Block & hea { } -Chunk ArrowBlockInputFormat::generate() +Chunk ArrowBlockInputFormat::read() { Chunk res; block_missing_values.clear(); @@ -64,7 +64,7 @@ Chunk ArrowBlockInputFormat::generate() { auto rows = file_reader->RecordBatchCountRows(record_batch_current++); if (!rows.ok()) - throw ParsingException( + throw Exception( ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading batch of Arrow data: {}", rows.status().ToString()); return getChunkForCount(*rows); } @@ -73,12 +73,12 @@ Chunk ArrowBlockInputFormat::generate() } if (!batch_result.ok()) - throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, + throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading batch of Arrow data: {}", batch_result.status().ToString()); auto table_result = arrow::Table::FromRecordBatches({*batch_result}); if (!table_result.ok()) - throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, + throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading batch of Arrow data: {}", table_result.status().ToString()); ++record_batch_current; @@ -213,7 +213,7 @@ std::optional ArrowSchemaReader::readNumberOrRows() auto rows = file_reader->CountRows(); if (!rows.ok()) - throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading batch of Arrow data: {}", rows.status().ToString()); + throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading batch of Arrow data: {}", rows.status().ToString()); return *rows; } diff --git a/src/Processors/Formats/Impl/ArrowBlockInputFormat.h b/src/Processors/Formats/Impl/ArrowBlockInputFormat.h index 06a7b470312..cdbc5e57e4e 100644 --- a/src/Processors/Formats/Impl/ArrowBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ArrowBlockInputFormat.h @@ -30,7 +30,7 @@ public: size_t getApproxBytesReadForChunk() const override { return approx_bytes_read_for_chunk; } private: - Chunk generate() override; + Chunk read() override; void onCancel() override { diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index 9841b5e70c6..46d1c426ef4 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -186,7 +186,7 @@ static AvroDeserializer::DeserializeFn createDecimalDeserializeFn(const avro::No tmp = decoder.decodeBytes(); if (tmp.size() > field_type_size || tmp.empty()) - throw ParsingException( + throw Exception( ErrorCodes::CANNOT_PARSE_UUID, "Cannot parse type {}, expected non-empty binary data with size equal to or less than {}, got {}", target_type->getName(), @@ -274,7 +274,7 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(const avro { decoder.decodeString(tmp); if (tmp.length() != 36) - throw ParsingException(ErrorCodes::CANNOT_PARSE_UUID, "Cannot parse uuid {}", tmp); + throw Exception(ErrorCodes::CANNOT_PARSE_UUID, "Cannot parse uuid {}", tmp); const UUID uuid = parseUUID({reinterpret_cast(tmp.data()), tmp.length()}); assert_cast(column).insertValue(uuid); @@ -530,7 +530,7 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(const avro { decoder.decodeFixed(fixed_size, tmp); if (tmp.size() != 36) - throw ParsingException(ErrorCodes::CANNOT_PARSE_UUID, "Cannot parse UUID from type Fixed, because it's size ({}) is not equal to the size of UUID (36)", fixed_size); + throw Exception(ErrorCodes::CANNOT_PARSE_UUID, "Cannot parse UUID from type Fixed, because it's size ({}) is not equal to the size of UUID (36)", fixed_size); const UUID uuid = parseUUID({reinterpret_cast(tmp.data()), tmp.size()}); assert_cast(column).insertValue(uuid); diff --git a/src/Processors/Formats/Impl/BSONEachRowRowInputFormat.cpp b/src/Processors/Formats/Impl/BSONEachRowRowInputFormat.cpp index b38aaa426fd..340bcc8aae5 100644 --- a/src/Processors/Formats/Impl/BSONEachRowRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/BSONEachRowRowInputFormat.cpp @@ -1031,17 +1031,17 @@ fileSegmentationEngineBSONEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t readBinaryLittleEndian(document_size, in); if (document_size < sizeof(document_size)) - throw ParsingException(ErrorCodes::INCORRECT_DATA, "Size of BSON document is invalid"); + throw Exception(ErrorCodes::INCORRECT_DATA, "Size of BSON document is invalid"); if (min_bytes != 0 && document_size > 10 * min_bytes) - throw ParsingException( + throw Exception( ErrorCodes::INCORRECT_DATA, "Size of BSON document is extremely large. Expected not greater than {} bytes, but current is {} bytes per row. Increase " "the value setting 'min_chunk_bytes_for_parallel_parsing' or check your data manually, most likely BSON is malformed", min_bytes, document_size); if (document_size < sizeof(document_size)) - throw ParsingException(ErrorCodes::INCORRECT_DATA, "Size of BSON document is invalid"); + throw Exception(ErrorCodes::INCORRECT_DATA, "Size of BSON document is invalid"); size_t old_size = memory.size(); memory.resize(old_size + document_size); diff --git a/src/Processors/Formats/Impl/BSONEachRowRowInputFormat.h b/src/Processors/Formats/Impl/BSONEachRowRowInputFormat.h index 5e8bee50963..a1f197557b4 100644 --- a/src/Processors/Formats/Impl/BSONEachRowRowInputFormat.h +++ b/src/Processors/Formats/Impl/BSONEachRowRowInputFormat.h @@ -57,9 +57,6 @@ public: void resetParser() override; private: - void readPrefix() override { } - void readSuffix() override { } - bool readRow(MutableColumns & columns, RowReadExtension & ext) override; bool allowSyncAfterError() const override { return true; } void syncAfterError() override; diff --git a/src/Processors/Formats/Impl/DWARFBlockInputFormat.cpp b/src/Processors/Formats/Impl/DWARFBlockInputFormat.cpp index 4c3bb219415..43ef2521032 100644 --- a/src/Processors/Formats/Impl/DWARFBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/DWARFBlockInputFormat.cpp @@ -888,7 +888,7 @@ void DWARFBlockInputFormat::parseRanges( } } -Chunk DWARFBlockInputFormat::generate() +Chunk DWARFBlockInputFormat::read() { initializeIfNeeded(); diff --git a/src/Processors/Formats/Impl/DWARFBlockInputFormat.h b/src/Processors/Formats/Impl/DWARFBlockInputFormat.h index e1409dd3373..0345a264d47 100644 --- a/src/Processors/Formats/Impl/DWARFBlockInputFormat.h +++ b/src/Processors/Formats/Impl/DWARFBlockInputFormat.h @@ -30,7 +30,7 @@ public: size_t getApproxBytesReadForChunk() const override { return approx_bytes_read_for_chunk; } protected: - Chunk generate() override; + Chunk read() override; void onCancel() override { diff --git a/src/Processors/Formats/Impl/JSONColumnsBlockInputFormatBase.cpp b/src/Processors/Formats/Impl/JSONColumnsBlockInputFormatBase.cpp index 1c148f5b3d3..53cb5a77898 100644 --- a/src/Processors/Formats/Impl/JSONColumnsBlockInputFormatBase.cpp +++ b/src/Processors/Formats/Impl/JSONColumnsBlockInputFormatBase.cpp @@ -109,7 +109,7 @@ void JSONColumnsBlockInputFormatBase::setReadBuffer(ReadBuffer & in_) IInputFormat::setReadBuffer(in_); } -Chunk JSONColumnsBlockInputFormatBase::generate() +Chunk JSONColumnsBlockInputFormatBase::read() { MutableColumns columns = getPort().getHeader().cloneEmptyColumns(); block_missing_values.clear(); diff --git a/src/Processors/Formats/Impl/JSONColumnsBlockInputFormatBase.h b/src/Processors/Formats/Impl/JSONColumnsBlockInputFormatBase.h index 53d65bb3539..fe80d77cd87 100644 --- a/src/Processors/Formats/Impl/JSONColumnsBlockInputFormatBase.h +++ b/src/Processors/Formats/Impl/JSONColumnsBlockInputFormatBase.h @@ -56,7 +56,7 @@ public: size_t getApproxBytesReadForChunk() const override { return approx_bytes_read_for_chunk; } protected: - Chunk generate() override; + Chunk read() override; size_t readColumn(IColumn & column, const DataTypePtr & type, const SerializationPtr & serialization, const String & column_name); diff --git a/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp b/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp index 95563fd2f62..0ef19a9c14f 100644 --- a/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp @@ -142,7 +142,7 @@ inline bool JSONEachRowRowInputFormat::advanceToNextKey(size_t key_index) skipWhitespaceIfAny(*in); if (in->eof()) - throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected end of stream while parsing JSONEachRow format"); + throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected end of stream while parsing JSONEachRow format"); else if (*in->position() == '}') { ++in->position(); @@ -205,7 +205,7 @@ bool JSONEachRowRowInputFormat::readRow(MutableColumns & columns, RowReadExtensi return false; skipWhitespaceIfAny(*in); - bool is_first_row = getCurrentUnitNumber() == 0 && getTotalRows() == 1; + bool is_first_row = getRowNum() == 0; if (checkEndOfData(is_first_row)) return false; @@ -308,7 +308,7 @@ size_t JSONEachRowRowInputFormat::countRows(size_t max_block_size) return 0; size_t num_rows = 0; - bool is_first_row = getCurrentUnitNumber() == 0 && getTotalRows() == 0; + bool is_first_row = getRowNum() == 0; skipWhitespaceIfAny(*in); while (num_rows < max_block_size && !checkEndOfData(is_first_row)) { diff --git a/src/Processors/Formats/Impl/NativeFormat.cpp b/src/Processors/Formats/Impl/NativeFormat.cpp index 65ea87479a3..73ffc02bbc1 100644 --- a/src/Processors/Formats/Impl/NativeFormat.cpp +++ b/src/Processors/Formats/Impl/NativeFormat.cpp @@ -35,7 +35,7 @@ public: reader->resetParser(); } - Chunk generate() override + Chunk read() override { block_missing_values.clear(); size_t block_start = getDataOffsetMaybeCompressed(*in); diff --git a/src/Processors/Formats/Impl/NativeORCBlockInputFormat.cpp b/src/Processors/Formats/Impl/NativeORCBlockInputFormat.cpp index 4629127186a..2fa5c1d2850 100644 --- a/src/Processors/Formats/Impl/NativeORCBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/NativeORCBlockInputFormat.cpp @@ -905,7 +905,7 @@ bool NativeORCBlockInputFormat::prepareStripeReader() return true; } -Chunk NativeORCBlockInputFormat::generate() +Chunk NativeORCBlockInputFormat::read() { block_missing_values.clear(); diff --git a/src/Processors/Formats/Impl/NativeORCBlockInputFormat.h b/src/Processors/Formats/Impl/NativeORCBlockInputFormat.h index 6ea7a063e0d..a3ef9ed4b8f 100644 --- a/src/Processors/Formats/Impl/NativeORCBlockInputFormat.h +++ b/src/Processors/Formats/Impl/NativeORCBlockInputFormat.h @@ -62,7 +62,7 @@ public: size_t getApproxBytesReadForChunk() const override { return approx_bytes_read_for_chunk; } protected: - Chunk generate() override; + Chunk read() override; void onCancel() override { is_stopped = 1; } diff --git a/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp b/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp index 5cde51a4927..a41eacf26b7 100644 --- a/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp @@ -27,7 +27,7 @@ ORCBlockInputFormat::ORCBlockInputFormat(ReadBuffer & in_, Block header_, const { } -Chunk ORCBlockInputFormat::generate() +Chunk ORCBlockInputFormat::read() { block_missing_values.clear(); @@ -48,7 +48,7 @@ Chunk ORCBlockInputFormat::generate() auto batch_result = file_reader->ReadStripe(stripe_current, include_indices); if (!batch_result.ok()) - throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Failed to create batch reader: {}", batch_result.status().ToString()); + throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Failed to create batch reader: {}", batch_result.status().ToString()); auto batch = batch_result.ValueOrDie(); if (!batch) @@ -56,7 +56,7 @@ Chunk ORCBlockInputFormat::generate() auto table_result = arrow::Table::FromRecordBatches({batch}); if (!table_result.ok()) - throw ParsingException( + throw Exception( ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading batch of ORC data: {}", table_result.status().ToString()); /// We should extract the number of rows directly from the stripe, because in case when diff --git a/src/Processors/Formats/Impl/ORCBlockInputFormat.h b/src/Processors/Formats/Impl/ORCBlockInputFormat.h index 4d878f85255..34630345849 100644 --- a/src/Processors/Formats/Impl/ORCBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ORCBlockInputFormat.h @@ -32,7 +32,7 @@ public: size_t getApproxBytesReadForChunk() const override { return approx_bytes_read_for_chunk; } protected: - Chunk generate() override; + Chunk read() override; void onCancel() override { diff --git a/src/Processors/Formats/Impl/OneFormat.cpp b/src/Processors/Formats/Impl/OneFormat.cpp index 4a9c8caebf3..f190cce6425 100644 --- a/src/Processors/Formats/Impl/OneFormat.cpp +++ b/src/Processors/Formats/Impl/OneFormat.cpp @@ -23,7 +23,7 @@ OneInputFormat::OneInputFormat(const Block & header, ReadBuffer & in_) : IInputF header.getByPosition(0).type->getName()); } -Chunk OneInputFormat::generate() +Chunk OneInputFormat::read() { if (done) return {}; diff --git a/src/Processors/Formats/Impl/OneFormat.h b/src/Processors/Formats/Impl/OneFormat.h index f73b2dab66a..060b9b21def 100644 --- a/src/Processors/Formats/Impl/OneFormat.h +++ b/src/Processors/Formats/Impl/OneFormat.h @@ -14,7 +14,7 @@ public: String getName() const override { return "One"; } protected: - Chunk generate() override; + Chunk read() override; private: bool done = false; diff --git a/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp b/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp index 24f1bcde6aa..8b6969bbfcc 100644 --- a/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp @@ -61,7 +61,7 @@ void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupPtr thread } catch (...) { - onBackgroundException(successfully_read_rows_count); + onBackgroundException(); } } @@ -90,7 +90,7 @@ void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupPtr thread_grou ReadBuffer read_buffer(unit.segment.data(), unit.segment.size(), 0); InputFormatPtr input_format = internal_parser_creator(read_buffer); - input_format->setCurrentUnitNumber(current_ticket_number); + input_format->setRowsReadBefore(unit.offset); input_format->setErrorsLogger(errors_logger); InternalParser parser(input_format); @@ -132,28 +132,16 @@ void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupPtr thread_grou } catch (...) { - onBackgroundException(unit.offset); + onBackgroundException(); } } -void ParallelParsingInputFormat::onBackgroundException(size_t offset) +void ParallelParsingInputFormat::onBackgroundException() { std::lock_guard lock(mutex); if (!background_exception) - { background_exception = std::current_exception(); - if (ParsingException * e = exception_cast(background_exception)) - { - /// NOTE: it is not that safe to use line number hack here (may exceed INT_MAX) - if (e->getLineNumber() != -1) - e->setLineNumber(static_cast(e->getLineNumber() + offset)); - - auto file_name = getFileNameFromReadBuffer(getReadBuffer()); - if (!file_name.empty()) - e->setFileName(file_name); - } - } if (is_server) tryLogCurrentException(__PRETTY_FUNCTION__); @@ -164,7 +152,7 @@ void ParallelParsingInputFormat::onBackgroundException(size_t offset) segmentator_condvar.notify_all(); } -Chunk ParallelParsingInputFormat::generate() +Chunk ParallelParsingInputFormat::read() { /// Delayed launching of segmentator thread if (unlikely(!parsing_started.exchange(true))) diff --git a/src/Processors/Formats/Impl/ParallelParsingInputFormat.h b/src/Processors/Formats/Impl/ParallelParsingInputFormat.h index 8432e053eba..ff97afa8348 100644 --- a/src/Processors/Formats/Impl/ParallelParsingInputFormat.h +++ b/src/Processors/Formats/Impl/ParallelParsingInputFormat.h @@ -135,7 +135,7 @@ public: private: - Chunk generate() override final; + Chunk read() override final; void onCancel() override final { @@ -333,7 +333,7 @@ private: /// threads. This function is used by segmentator and parsed threads. /// readImpl() is called from the main thread, so the exception handling /// is different. - void onBackgroundException(size_t offset); + void onBackgroundException(); }; } diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index d37c2dc1160..62e576d4953 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -570,7 +570,7 @@ void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_batch_idx, std::un // We may be able to schedule more work now, but can't call scheduleMoreWorkIfNeeded() right // here because we're running on the same thread pool, so it'll deadlock if thread limit is - // reached. Wake up generate() instead. + // reached. Wake up read() instead. condvar.notify_all(); }; @@ -579,7 +579,7 @@ void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_batch_idx, std::un auto batch = row_group_batch.record_batch_reader->Next(); if (!batch.ok()) - throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", batch.status().ToString()); + throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", batch.status().ToString()); if (!*batch) { @@ -637,7 +637,7 @@ void ParquetBlockInputFormat::scheduleMoreWorkIfNeeded(std::optional row } } -Chunk ParquetBlockInputFormat::generate() +Chunk ParquetBlockInputFormat::read() { initializeIfNeeded(); diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h index 7fdf03a0606..b5b884b5efa 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h @@ -65,7 +65,7 @@ public: size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; } private: - Chunk generate() override; + Chunk read() override; void onCancel() override { @@ -142,7 +142,7 @@ private: // reading its data (using RAM). Row group becomes inactive when we finish reading and // delivering all its blocks and free the RAM. Size of the window is max_decoding_threads. // - // Decoded blocks are placed in `pending_chunks` queue, then picked up by generate(). + // Decoded blocks are placed in `pending_chunks` queue, then picked up by read(). // If row group decoding runs too far ahead of delivery (by `max_pending_chunks_per_row_group` // chunks), we pause the stream for the row group, to avoid using too much memory when decoded // chunks are much bigger than the compressed data. @@ -150,7 +150,7 @@ private: // Also: // * If preserve_order = true, we deliver chunks strictly in order of increasing row group. // Decoding may still proceed in later row groups. - // * If max_decoding_threads <= 1, we run all tasks inline in generate(), without thread pool. + // * If max_decoding_threads <= 1, we run all tasks inline in read(), without thread pool. // Potential improvements: // * Plan all read ranges ahead of time, for the whole file, and do prefetching for them @@ -189,7 +189,7 @@ private: Status status = Status::NotStarted; - // Window of chunks that were decoded but not returned from generate(): + // Window of chunks that were decoded but not returned from read(): // // (delivered) next_chunk_idx // v v v @@ -215,7 +215,7 @@ private: std::unique_ptr arrow_column_to_ch_column; }; - // Chunk ready to be delivered by generate(). + // Chunk ready to be delivered by read(). struct PendingChunk { Chunk chunk; @@ -265,7 +265,7 @@ private: // Done NotStarted std::mutex mutex; - // Wakes up the generate() call, if any. + // Wakes up the read() call, if any. std::condition_variable condvar; std::vector row_group_batches; diff --git a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp index 1f81f5ac201..7fd6e93dd80 100644 --- a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp @@ -140,7 +140,7 @@ ParquetMetadataInputFormat::ParquetMetadataInputFormat(ReadBuffer & in_, Block h checkHeader(getPort().getHeader()); } -Chunk ParquetMetadataInputFormat::generate() +Chunk ParquetMetadataInputFormat::read() { Chunk res; if (done) diff --git a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h index 2d027e5000f..1aa2d99ca76 100644 --- a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h @@ -63,7 +63,7 @@ public: void resetParser() override; private: - Chunk generate() override; + Chunk read() override; void onCancel() override { diff --git a/src/Processors/Formats/Impl/ProtobufListInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufListInputFormat.cpp index 220a24b3c8c..2382b3cf27a 100644 --- a/src/Processors/Formats/Impl/ProtobufListInputFormat.cpp +++ b/src/Processors/Formats/Impl/ProtobufListInputFormat.cpp @@ -61,7 +61,7 @@ bool ProtobufListInputFormat::readRow(MutableColumns & columns, RowReadExtension size_t ProtobufListInputFormat::countRows(size_t max_block_size) { - if (getTotalRows() == 0) + if (getRowNum() == 0) reader->startMessage(true); if (reader->eof()) diff --git a/src/Processors/Formats/Impl/TSKVRowInputFormat.cpp b/src/Processors/Formats/Impl/TSKVRowInputFormat.cpp index f4f92583473..432e944a246 100644 --- a/src/Processors/Formats/Impl/TSKVRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/TSKVRowInputFormat.cpp @@ -92,7 +92,7 @@ static bool readName(ReadBuffer & buf, StringRef & ref, String & tmp) } } - throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected end of stream while reading key name from TSKV format"); + throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected end of stream while reading key name from TSKV format"); } @@ -161,7 +161,7 @@ bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ex if (in->eof()) { - throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected end of stream after field in TSKV format: {}", name_ref.toString()); + throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected end of stream after field in TSKV format: {}", name_ref.toString()); } else if (*in->position() == '\t') { diff --git a/src/Processors/Formats/Impl/TemplateRowInputFormat.cpp b/src/Processors/Formats/Impl/TemplateRowInputFormat.cpp index ede0426a0a2..a6e4600d83b 100644 --- a/src/Processors/Formats/Impl/TemplateRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/TemplateRowInputFormat.cpp @@ -21,7 +21,7 @@ namespace ErrorCodes [[noreturn]] static void throwUnexpectedEof(size_t row_num) { - throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected EOF while parsing row {}. " + throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected EOF while parsing row {}. " "Maybe last row has wrong format or input doesn't contain specified suffix before EOF.", std::to_string(row_num)); } @@ -121,7 +121,7 @@ bool TemplateRowInputFormat::readRow(MutableColumns & columns, RowReadExtension updateDiagnosticInfo(); - if (likely(row_num != 1)) + if (likely(getRowNum() != 0)) format_reader->skipRowBetweenDelimiter(); extra.read_columns.assign(columns.size(), false); @@ -160,7 +160,7 @@ bool TemplateRowInputFormat::deserializeField(const DataTypePtr & type, catch (Exception & e) { if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF) - throwUnexpectedEof(row_num); + throwUnexpectedEof(getRowNum()); throw; } } @@ -198,7 +198,7 @@ bool TemplateRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & col out << "\nUsing format string (from format_schema_rows): " << row_format.dump() << "\n"; out << "\nTrying to parse next row, because suffix does not match:\n"; - if (likely(row_num != 1) && !parseDelimiterWithDiagnosticInfo(out, *buf, row_between_delimiter, "delimiter between rows", ignore_spaces)) + if (likely(getRowNum() != 0) && !parseDelimiterWithDiagnosticInfo(out, *buf, row_between_delimiter, "delimiter between rows", ignore_spaces)) return false; for (size_t i = 0; i < row_format.columnsCount(); ++i) diff --git a/src/Processors/Formats/Impl/ValuesBlockInputFormat.cpp b/src/Processors/Formats/Impl/ValuesBlockInputFormat.cpp index 1a203302238..aa193ffd36a 100644 --- a/src/Processors/Formats/Impl/ValuesBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ValuesBlockInputFormat.cpp @@ -98,7 +98,7 @@ bool ValuesBlockInputFormat::skipToNextRow(ReadBuffer * buf, size_t min_chunk_by return true; } -Chunk ValuesBlockInputFormat::generate() +Chunk ValuesBlockInputFormat::read() { if (total_rows == 0) readPrefix(); diff --git a/src/Processors/Formats/Impl/ValuesBlockInputFormat.h b/src/Processors/Formats/Impl/ValuesBlockInputFormat.h index 9ea7407f12d..bf2765bfd1e 100644 --- a/src/Processors/Formats/Impl/ValuesBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ValuesBlockInputFormat.h @@ -58,7 +58,7 @@ private: using ConstantExpressionTemplates = std::vector>; - Chunk generate() override; + Chunk read() override; void readRow(MutableColumns & columns, size_t row_num); void readUntilTheEndOfRowAndReTokenize(size_t current_column_idx); diff --git a/src/Processors/Formats/RowInputFormatWithDiagnosticInfo.cpp b/src/Processors/Formats/RowInputFormatWithDiagnosticInfo.cpp index 6358a99d6b4..a56c24a740a 100644 --- a/src/Processors/Formats/RowInputFormatWithDiagnosticInfo.cpp +++ b/src/Processors/Formats/RowInputFormatWithDiagnosticInfo.cpp @@ -26,8 +26,6 @@ RowInputFormatWithDiagnosticInfo::RowInputFormatWithDiagnosticInfo(const Block & void RowInputFormatWithDiagnosticInfo::updateDiagnosticInfo() { - ++row_num; - bytes_read_at_start_of_buffer_on_prev_row = bytes_read_at_start_of_buffer_on_current_row; bytes_read_at_start_of_buffer_on_current_row = in->count() - in->offset(); @@ -73,7 +71,7 @@ std::pair RowInputFormatWithDiagnosticInfo::getDiagnosticAndRawD { in->position() = in->buffer().begin() + offset_of_prev_row; - out_diag << "\nRow " << (row_num - 1) << ":\n"; + out_diag << "\nRow " << getRowNum() - 1 << ":\n"; if (!parseRowAndPrintDiagnosticInfo(columns, out_diag)) return std::make_pair(out_diag.str(), out_data.str()); } @@ -96,7 +94,7 @@ std::pair RowInputFormatWithDiagnosticInfo::getDiagnosticAndRawD ++data; } - out_diag << "\nRow " << row_num << ":\n"; + out_diag << "\nRow " << getRowNum() << ":\n"; parseRowAndPrintDiagnosticInfo(columns, out_diag); out_diag << "\n"; @@ -193,7 +191,6 @@ bool RowInputFormatWithDiagnosticInfo::deserializeFieldAndPrintDiagnosticInfo(co void RowInputFormatWithDiagnosticInfo::resetParser() { IRowInputFormat::resetParser(); - row_num = 0; bytes_read_at_start_of_buffer_on_current_row = 0; bytes_read_at_start_of_buffer_on_prev_row = 0; offset_of_current_row = std::numeric_limits::max(); diff --git a/src/Processors/Formats/RowInputFormatWithDiagnosticInfo.h b/src/Processors/Formats/RowInputFormatWithDiagnosticInfo.h index 49793fcd208..f067ebd7583 100644 --- a/src/Processors/Formats/RowInputFormatWithDiagnosticInfo.h +++ b/src/Processors/Formats/RowInputFormatWithDiagnosticInfo.h @@ -29,9 +29,6 @@ protected: virtual void tryDeserializeField(const DataTypePtr & type, IColumn & column, size_t file_column) = 0; virtual bool isGarbageAfterField(size_t after_input_pos_idx, ReadBuffer::Position pos) = 0; - /// For convenient diagnostics in case of an error. - size_t row_num = 0; - private: /// How many bytes were read, not counting those still in the buffer. size_t bytes_read_at_start_of_buffer_on_current_row = 0; diff --git a/src/Processors/Formats/RowInputFormatWithNamesAndTypes.cpp b/src/Processors/Formats/RowInputFormatWithNamesAndTypes.cpp index f7345848559..478ce41f924 100644 --- a/src/Processors/Formats/RowInputFormatWithNamesAndTypes.cpp +++ b/src/Processors/Formats/RowInputFormatWithNamesAndTypes.cpp @@ -66,11 +66,6 @@ RowInputFormatWithNamesAndTypes::RowInputFormatWithNamesAndTypes( void RowInputFormatWithNamesAndTypes::readPrefix() { - /// This is a bit of abstraction leakage, but we need it in parallel parsing: - /// we check if this InputFormat is working with the "real" beginning of the data. - if (getCurrentUnitNumber() != 0) - return; - /// Search and remove BOM only in textual formats (CSV, TSV etc), not in binary ones (RowBinary*). /// Also, we assume that column name or type cannot contain BOM, so, if format has header, /// then BOM at beginning of stream cannot be confused with name or type of field, and it is safe to skip it. @@ -206,7 +201,7 @@ bool RowInputFormatWithNamesAndTypes::readRow(MutableColumns & columns, RowReadE updateDiagnosticInfo(); - if (likely(row_num != 1 || getCurrentUnitNumber() != 0 || (getCurrentUnitNumber() == 0 && (with_names || with_types || is_header_detected)))) + if (likely(getRowNum() != 0 || with_names || with_types || is_header_detected)) format_reader->skipRowBetweenDelimiter(); format_reader->skipRowStartDelimiter(); @@ -270,7 +265,7 @@ size_t RowInputFormatWithNamesAndTypes::countRows(size_t max_block_size) return 0; size_t num_rows = 0; - bool is_first_row = getTotalRows() == 0 && !with_names && !with_types && !is_header_detected; + bool is_first_row = getRowNum() == 0 && !with_names && !with_types && !is_header_detected; while (!format_reader->checkForSuffix() && num_rows < max_block_size) { if (likely(!is_first_row)) @@ -323,7 +318,7 @@ bool RowInputFormatWithNamesAndTypes::parseRowAndPrintDiagnosticInfo(MutableColu if (!format_reader->tryParseSuffixWithDiagnosticInfo(out)) return false; - if (likely(row_num != 1) && !format_reader->parseRowBetweenDelimiterWithDiagnosticInfo(out)) + if (likely(getRowNum() != 0) && !format_reader->parseRowBetweenDelimiterWithDiagnosticInfo(out)) return false; if (!format_reader->parseRowStartWithDiagnosticInfo(out)) diff --git a/src/Server/ProxyV1Handler.cpp b/src/Server/ProxyV1Handler.cpp index 56621940a23..d5e6ab23360 100644 --- a/src/Server/ProxyV1Handler.cpp +++ b/src/Server/ProxyV1Handler.cpp @@ -29,38 +29,38 @@ void ProxyV1Handler::run() // read "PROXY" if (!readWord(5, word, eol) || word != "PROXY" || eol) - throw ParsingException(ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED, "PROXY protocol violation"); + throw Exception(ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED, "PROXY protocol violation"); // read "TCP4" or "TCP6" or "UNKNOWN" if (!readWord(7, word, eol)) - throw ParsingException(ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED, "PROXY protocol violation"); + throw Exception(ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED, "PROXY protocol violation"); if (word != "TCP4" && word != "TCP6" && word != "UNKNOWN") - throw ParsingException(ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED, "PROXY protocol violation"); + throw Exception(ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED, "PROXY protocol violation"); if (word == "UNKNOWN" && eol) return; if (eol) - throw ParsingException(ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED, "PROXY protocol violation"); + throw Exception(ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED, "PROXY protocol violation"); // read address if (!readWord(39, word, eol) || eol) - throw ParsingException(ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED, "PROXY protocol violation"); + throw Exception(ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED, "PROXY protocol violation"); stack_data.forwarded_for = std::move(word); // read address if (!readWord(39, word, eol) || eol) - throw ParsingException(ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED, "PROXY protocol violation"); + throw Exception(ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED, "PROXY protocol violation"); // read port if (!readWord(5, word, eol) || eol) - throw ParsingException(ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED, "PROXY protocol violation"); + throw Exception(ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED, "PROXY protocol violation"); // read port and "\r\n" if (!readWord(5, word, eol) || !eol) - throw ParsingException(ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED, "PROXY protocol violation"); + throw Exception(ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED, "PROXY protocol violation"); if (!stack_data.forwarded_for.empty()) LOG_TRACE(log, "Forwarded client address from PROXY header: {}", stack_data.forwarded_for); diff --git a/tests/integration/test_kafka_bad_messages/test.py b/tests/integration/test_kafka_bad_messages/test.py index 1633f230f83..954b6042305 100644 --- a/tests/integration/test_kafka_bad_messages/test.py +++ b/tests/integration/test_kafka_bad_messages/test.py @@ -294,7 +294,7 @@ def test_bad_messages_parsing_exception(kafka_cluster, max_retries=20): ]: print(format_name) - kafka_create_topic(admin_client, f"{format_name}_err") + kafka_create_topic(admin_client, f"{format_name}_parsing_err") instance.query( f""" @@ -305,7 +305,7 @@ def test_bad_messages_parsing_exception(kafka_cluster, max_retries=20): CREATE TABLE kafka_{format_name} (key UInt64, value UInt64) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka1:19092', - kafka_topic_list = '{format_name}_err', + kafka_topic_list = '{format_name}_parsing_err', kafka_group_name = '{format_name}', kafka_format = '{format_name}', kafka_num_consumers = 1; @@ -316,16 +316,18 @@ def test_bad_messages_parsing_exception(kafka_cluster, max_retries=20): ) kafka_produce( - kafka_cluster, f"{format_name}_err", ["qwertyuiop", "asdfghjkl", "zxcvbnm"] + kafka_cluster, + f"{format_name}_parsing_err", + ["qwertyuiop", "asdfghjkl", "zxcvbnm"], ) - expected_result = """avro::Exception: Invalid data file. Magic does not match: : while parsing Kafka message (topic: Avro_err, partition: 0, offset: 0)\\'|1|1|1|default|kafka_Avro -Cannot parse input: expected \\'{\\' before: \\'qwertyuiop\\': while parsing Kafka message (topic: JSONEachRow_err, partition: 0, offset: 0|1|1|1|default|kafka_JSONEachRow + expected_result = """avro::Exception: Invalid data file. Magic does not match: : while parsing Kafka message (topic: Avro_parsing_err, partition: 0, offset: 0)\\'|1|1|1|default|kafka_Avro +Cannot parse input: expected \\'{\\' before: \\'qwertyuiop\\': (at row 1)\\n: while parsing Kafka message (topic: JSONEachRow_parsing_err, partition:|1|1|1|default|kafka_JSONEachRow """ # filter out stacktrace in exceptions.text[1] because it is hardly stable enough result_system_kafka_consumers = instance.query_with_retry( """ - SELECT substr(exceptions.text[1], 1, 131), length(exceptions.text) > 1 AND length(exceptions.text) < 15, length(exceptions.time) > 1 AND length(exceptions.time) < 15, abs(dateDiff('second', exceptions.time[1], now())) < 40, database, table FROM system.kafka_consumers WHERE table in('kafka_Avro', 'kafka_JSONEachRow') ORDER BY table, assignments.partition_id[1] + SELECT substr(exceptions.text[1], 1, 139), length(exceptions.text) > 1 AND length(exceptions.text) < 15, length(exceptions.time) > 1 AND length(exceptions.time) < 15, abs(dateDiff('second', exceptions.time[1], now())) < 40, database, table FROM system.kafka_consumers WHERE table in('kafka_Avro', 'kafka_JSONEachRow') ORDER BY table, assignments.partition_id[1] """, retry_count=max_retries, sleep_time=1, @@ -338,7 +340,7 @@ Cannot parse input: expected \\'{\\' before: \\'qwertyuiop\\': while parsing Kaf "Avro", "JSONEachRow", ]: - kafka_delete_topic(admin_client, f"{format_name}_err") + kafka_delete_topic(admin_client, f"{format_name}_parsing_err") def test_bad_messages_to_mv(kafka_cluster, max_retries=20): diff --git a/tests/queries/0_stateless/02889_file_log_save_errors.reference b/tests/queries/0_stateless/02889_file_log_save_errors.reference index c4a7c1f0bda..849da6ad6fa 100644 --- a/tests/queries/0_stateless/02889_file_log_save_errors.reference +++ b/tests/queries/0_stateless/02889_file_log_save_errors.reference @@ -1,20 +1,20 @@ -Cannot parse input: expected \'{\' before: \'Error 0\' Error 0 a.jsonl -Cannot parse input: expected \'{\' before: \'Error 1\' Error 1 a.jsonl -Cannot parse input: expected \'{\' before: \'Error 2\' Error 2 a.jsonl -Cannot parse input: expected \'{\' before: \'Error 3\' Error 3 a.jsonl -Cannot parse input: expected \'{\' before: \'Error 4\' Error 4 a.jsonl -Cannot parse input: expected \'{\' before: \'Error 5\' Error 5 a.jsonl -Cannot parse input: expected \'{\' before: \'Error 6\' Error 6 a.jsonl -Cannot parse input: expected \'{\' before: \'Error 7\' Error 7 a.jsonl -Cannot parse input: expected \'{\' before: \'Error 8\' Error 8 a.jsonl -Cannot parse input: expected \'{\' before: \'Error 9\' Error 9 a.jsonl -Cannot parse input: expected \'{\' before: \'Error 10\' Error 10 b.jsonl -Cannot parse input: expected \'{\' before: \'Error 11\' Error 11 b.jsonl -Cannot parse input: expected \'{\' before: \'Error 12\' Error 12 b.jsonl -Cannot parse input: expected \'{\' before: \'Error 13\' Error 13 b.jsonl -Cannot parse input: expected \'{\' before: \'Error 14\' Error 14 b.jsonl -Cannot parse input: expected \'{\' before: \'Error 15\' Error 15 b.jsonl -Cannot parse input: expected \'{\' before: \'Error 16\' Error 16 b.jsonl -Cannot parse input: expected \'{\' before: \'Error 17\' Error 17 b.jsonl -Cannot parse input: expected \'{\' before: \'Error 18\' Error 18 b.jsonl -Cannot parse input: expected \'{\' before: \'Error 19\' Error 19 b.jsonl +Cannot parse input: expected \'{\' before: \'Error 0\': (at row 1)\n Error 0 a.jsonl +Cannot parse input: expected \'{\' before: \'Error 1\': (at row 1)\n Error 1 a.jsonl +Cannot parse input: expected \'{\' before: \'Error 2\': (at row 1)\n Error 2 a.jsonl +Cannot parse input: expected \'{\' before: \'Error 3\': (at row 1)\n Error 3 a.jsonl +Cannot parse input: expected \'{\' before: \'Error 4\': (at row 1)\n Error 4 a.jsonl +Cannot parse input: expected \'{\' before: \'Error 5\': (at row 1)\n Error 5 a.jsonl +Cannot parse input: expected \'{\' before: \'Error 6\': (at row 1)\n Error 6 a.jsonl +Cannot parse input: expected \'{\' before: \'Error 7\': (at row 1)\n Error 7 a.jsonl +Cannot parse input: expected \'{\' before: \'Error 8\': (at row 1)\n Error 8 a.jsonl +Cannot parse input: expected \'{\' before: \'Error 9\': (at row 1)\n Error 9 a.jsonl +Cannot parse input: expected \'{\' before: \'Error 10\': (at row 1)\n Error 10 b.jsonl +Cannot parse input: expected \'{\' before: \'Error 11\': (at row 1)\n Error 11 b.jsonl +Cannot parse input: expected \'{\' before: \'Error 12\': (at row 1)\n Error 12 b.jsonl +Cannot parse input: expected \'{\' before: \'Error 13\': (at row 1)\n Error 13 b.jsonl +Cannot parse input: expected \'{\' before: \'Error 14\': (at row 1)\n Error 14 b.jsonl +Cannot parse input: expected \'{\' before: \'Error 15\': (at row 1)\n Error 15 b.jsonl +Cannot parse input: expected \'{\' before: \'Error 16\': (at row 1)\n Error 16 b.jsonl +Cannot parse input: expected \'{\' before: \'Error 17\': (at row 1)\n Error 17 b.jsonl +Cannot parse input: expected \'{\' before: \'Error 18\': (at row 1)\n Error 18 b.jsonl +Cannot parse input: expected \'{\' before: \'Error 19\': (at row 1)\n Error 19 b.jsonl