mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-13 09:52:38 +00:00
70d1adfe4b
* save format string for NetException * format exceptions * format exceptions 2 * format exceptions 3 * format exceptions 4 * format exceptions 5 * format exceptions 6 * fix * format exceptions 7 * format exceptions 8 * Update MergeTreeIndexGin.cpp * Update AggregateFunctionMap.cpp * Update AggregateFunctionMap.cpp * fix
261 lines
8.2 KiB
C++
261 lines
8.2 KiB
C++
#include <Processors/Formats/IRowInputFormat.h>
|
|
#include <DataTypes/ObjectUtils.h>
|
|
#include <IO/WriteHelpers.h> // toString
|
|
#include <IO/WithFileName.h>
|
|
#include <Common/logger_useful.h>
|
|
|
|
|
|
namespace DB
|
|
{
|
|
|
|
namespace ErrorCodes
|
|
{
|
|
extern const int NOT_IMPLEMENTED;
|
|
extern const int CANNOT_PARSE_INPUT_ASSERTION_FAILED;
|
|
extern const int CANNOT_PARSE_QUOTED_STRING;
|
|
extern const int CANNOT_PARSE_DATE;
|
|
extern const int CANNOT_PARSE_DATETIME;
|
|
extern const int CANNOT_PARSE_NUMBER;
|
|
extern const int CANNOT_PARSE_BOOL;
|
|
extern const int CANNOT_PARSE_UUID;
|
|
extern const int CANNOT_READ_ARRAY_FROM_TEXT;
|
|
extern const int CANNOT_READ_MAP_FROM_TEXT;
|
|
extern const int CANNOT_READ_ALL_DATA;
|
|
extern const int TOO_LARGE_STRING_SIZE;
|
|
extern const int INCORRECT_NUMBER_OF_COLUMNS;
|
|
extern const int ARGUMENT_OUT_OF_BOUND;
|
|
extern const int INCORRECT_DATA;
|
|
extern const int CANNOT_PARSE_DOMAIN_VALUE_FROM_STRING;
|
|
extern const int CANNOT_PARSE_IPV4;
|
|
extern const int CANNOT_PARSE_IPV6;
|
|
}
|
|
|
|
|
|
bool isParseError(int code)
|
|
{
|
|
return code == ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED
|
|
|| code == ErrorCodes::CANNOT_PARSE_QUOTED_STRING
|
|
|| code == ErrorCodes::CANNOT_PARSE_DATE
|
|
|| code == ErrorCodes::CANNOT_PARSE_DATETIME
|
|
|| code == ErrorCodes::CANNOT_PARSE_NUMBER
|
|
|| code == ErrorCodes::CANNOT_PARSE_UUID
|
|
|| code == ErrorCodes::CANNOT_PARSE_BOOL
|
|
|| code == ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT
|
|
|| code == ErrorCodes::CANNOT_READ_MAP_FROM_TEXT
|
|
|| code == ErrorCodes::CANNOT_READ_ALL_DATA
|
|
|| code == ErrorCodes::TOO_LARGE_STRING_SIZE
|
|
|| code == ErrorCodes::ARGUMENT_OUT_OF_BOUND /// For Decimals
|
|
|| code == ErrorCodes::INCORRECT_DATA /// For some ReadHelpers
|
|
|| code == ErrorCodes::CANNOT_PARSE_DOMAIN_VALUE_FROM_STRING
|
|
|| code == ErrorCodes::CANNOT_PARSE_IPV4
|
|
|| code == ErrorCodes::CANNOT_PARSE_IPV6;
|
|
}
|
|
|
|
IRowInputFormat::IRowInputFormat(Block header, ReadBuffer & in_, Params params_)
|
|
: IInputFormat(std::move(header), in_), serializations(getPort().getHeader().getSerializations()), params(params_)
|
|
{
|
|
}
|
|
|
|
void IRowInputFormat::logError()
|
|
{
|
|
String diagnostic;
|
|
String raw_data;
|
|
try
|
|
{
|
|
std::tie(diagnostic, raw_data) = getDiagnosticAndRawData();
|
|
}
|
|
catch (const Exception & exception)
|
|
{
|
|
diagnostic = "Cannot get diagnostic: " + exception.message();
|
|
raw_data = "Cannot get raw data: " + exception.message();
|
|
}
|
|
catch (...)
|
|
{
|
|
/// Error while trying to obtain verbose diagnostic. Ok to ignore.
|
|
}
|
|
trimLeft(diagnostic, '\n');
|
|
trimRight(diagnostic, '\n');
|
|
|
|
auto now_time = time(nullptr);
|
|
|
|
errors_logger->logError(InputFormatErrorsLogger::ErrorEntry{now_time, total_rows, diagnostic, raw_data});
|
|
}
|
|
|
|
Chunk IRowInputFormat::generate()
|
|
{
|
|
if (total_rows == 0)
|
|
readPrefix();
|
|
|
|
const Block & header = getPort().getHeader();
|
|
|
|
size_t num_columns = header.columns();
|
|
MutableColumns columns = header.cloneEmptyColumns();
|
|
|
|
block_missing_values.clear();
|
|
|
|
size_t num_rows = 0;
|
|
|
|
try
|
|
{
|
|
RowReadExtension info;
|
|
bool continue_reading = true;
|
|
for (size_t rows = 0; rows < params.max_block_size && continue_reading; ++rows)
|
|
{
|
|
try
|
|
{
|
|
++total_rows;
|
|
|
|
info.read_columns.clear();
|
|
continue_reading = readRow(columns, info);
|
|
|
|
for (size_t column_idx = 0; column_idx < info.read_columns.size(); ++column_idx)
|
|
{
|
|
if (!info.read_columns[column_idx])
|
|
{
|
|
size_t column_size = columns[column_idx]->size();
|
|
if (column_size == 0)
|
|
throw Exception(ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS, "Unexpected empty column");
|
|
block_missing_values.setBit(column_idx, column_size - 1);
|
|
}
|
|
}
|
|
|
|
/// Some formats may read row AND say the read is finished.
|
|
/// For such a case, get the number or rows from first column.
|
|
if (!columns.empty())
|
|
num_rows = columns.front()->size();
|
|
|
|
if (!continue_reading)
|
|
break;
|
|
|
|
/// The case when there is no columns. Just count rows.
|
|
if (columns.empty())
|
|
++num_rows;
|
|
}
|
|
catch (Exception & e)
|
|
{
|
|
/// Logic for possible skipping of errors.
|
|
|
|
if (!isParseError(e.code()))
|
|
throw;
|
|
|
|
if (params.allow_errors_num == 0 && params.allow_errors_ratio == 0)
|
|
throw;
|
|
|
|
if (errors_logger)
|
|
logError();
|
|
|
|
++num_errors;
|
|
Float64 current_error_ratio = static_cast<Float64>(num_errors) / total_rows;
|
|
|
|
if (num_errors > params.allow_errors_num
|
|
&& current_error_ratio > params.allow_errors_ratio)
|
|
{
|
|
e.addMessage("(Already have " + toString(num_errors) + " errors"
|
|
" out of " + toString(total_rows) + " rows"
|
|
", which is " + toString(current_error_ratio) + " of all rows)");
|
|
throw;
|
|
}
|
|
|
|
if (!allowSyncAfterError())
|
|
{
|
|
e.addMessage("(Input format doesn't allow to skip errors)");
|
|
throw;
|
|
}
|
|
|
|
syncAfterError();
|
|
|
|
/// Truncate all columns in block to initial size (remove values, that was appended to only part of columns).
|
|
|
|
for (size_t column_idx = 0; column_idx < num_columns; ++column_idx)
|
|
{
|
|
auto & column = columns[column_idx];
|
|
if (column->size() > num_rows)
|
|
column->popBack(column->size() - num_rows);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
catch (ParsingException & e)
|
|
{
|
|
String verbose_diagnostic;
|
|
try
|
|
{
|
|
verbose_diagnostic = getDiagnosticInfo();
|
|
}
|
|
catch (const Exception & exception)
|
|
{
|
|
verbose_diagnostic = "Cannot get verbose diagnostic: " + exception.message();
|
|
}
|
|
catch (...)
|
|
{
|
|
/// Error while trying to obtain verbose diagnostic. Ok to ignore.
|
|
}
|
|
|
|
e.setFileName(getFileNameFromReadBuffer(getReadBuffer()));
|
|
e.setLineNumber(static_cast<int>(total_rows));
|
|
e.addMessage(verbose_diagnostic);
|
|
throw;
|
|
}
|
|
catch (Exception & e)
|
|
{
|
|
if (!isParseError(e.code()))
|
|
throw;
|
|
|
|
String verbose_diagnostic;
|
|
try
|
|
{
|
|
verbose_diagnostic = getDiagnosticInfo();
|
|
}
|
|
catch (const Exception & exception)
|
|
{
|
|
verbose_diagnostic = "Cannot get verbose diagnostic: " + exception.message();
|
|
}
|
|
catch (...)
|
|
{
|
|
/// Error while trying to obtain verbose diagnostic. Ok to ignore.
|
|
}
|
|
|
|
auto file_name = getFileNameFromReadBuffer(getReadBuffer());
|
|
if (!file_name.empty())
|
|
e.addMessage(fmt::format("(in file/uri {})", file_name));
|
|
|
|
e.addMessage(fmt::format("(at row {})\n", total_rows));
|
|
e.addMessage(verbose_diagnostic);
|
|
throw;
|
|
}
|
|
|
|
if (columns.empty() || columns[0]->empty())
|
|
{
|
|
if (num_errors && (params.allow_errors_num > 0 || params.allow_errors_ratio > 0))
|
|
{
|
|
Poco::Logger * log = &Poco::Logger::get("IRowInputFormat");
|
|
LOG_DEBUG(log, "Skipped {} rows with errors while reading the input stream", num_errors);
|
|
}
|
|
|
|
readSuffix();
|
|
return {};
|
|
}
|
|
|
|
for (const auto & column : columns)
|
|
column->finalize();
|
|
|
|
Chunk chunk(std::move(columns), num_rows);
|
|
return chunk;
|
|
}
|
|
|
|
void IRowInputFormat::syncAfterError()
|
|
{
|
|
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method syncAfterError is not implemented for input format");
|
|
}
|
|
|
|
void IRowInputFormat::resetParser()
|
|
{
|
|
IInputFormat::resetParser();
|
|
total_rows = 0;
|
|
num_errors = 0;
|
|
block_missing_values.clear();
|
|
}
|
|
|
|
|
|
}
|