Diagnostic info refactored

This commit is contained in:
Alexander Tokmakov 2019-04-14 23:01:03 +03:00 committed by Alexander Tokmakov
parent 892b67492d
commit a931e16c6c
6 changed files with 286 additions and 361 deletions

View File

@ -1,5 +1,3 @@
#include <Core/Defines.h>
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
@ -7,6 +5,7 @@
#include <Formats/CSVRowInputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
#include <DataTypes/DataTypeNothing.h>
namespace DB
@ -15,7 +14,6 @@ namespace DB
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
extern const int LOGICAL_ERROR;
}
@ -90,7 +88,7 @@ static void skipRow(ReadBuffer & istr, const FormatSettings::CSV & settings, siz
CSVRowInputStream::CSVRowInputStream(ReadBuffer & istr_, const Block & header_, bool with_names_, const FormatSettings & format_settings)
: istr(istr_), header(header_), with_names(with_names_), format_settings(format_settings)
: RowInputStreamWithDiagnosticInfo(istr_, header_), with_names(with_names_), format_settings(format_settings)
{
const auto num_columns = header.columns();
@ -274,71 +272,7 @@ bool CSVRowInputStream::read(MutableColumns & columns, RowReadExtension & ext)
return true;
}
String CSVRowInputStream::getDiagnosticInfo()
{
if (istr.eof()) /// Buffer has gone, cannot extract information about what has been parsed.
return {};
WriteBufferFromOwnString out;
MutableColumns columns = header.cloneEmptyColumns();
/// It is possible to display detailed diagnostics only if the last and next to last rows are still in the read buffer.
size_t bytes_read_at_start_of_buffer = istr.count() - istr.offset();
if (bytes_read_at_start_of_buffer != bytes_read_at_start_of_buffer_on_prev_row)
{
out << "Could not print diagnostic info because two last rows aren't in buffer (rare case)\n";
return out.str();
}
size_t max_length_of_column_name = 0;
for (size_t i = 0; i < header.columns(); ++i)
if (header.safeGetByPosition(i).name.size() > max_length_of_column_name)
max_length_of_column_name = header.safeGetByPosition(i).name.size();
size_t max_length_of_data_type_name = 0;
for (size_t i = 0; i < header.columns(); ++i)
if (header.safeGetByPosition(i).type->getName().size() > max_length_of_data_type_name)
max_length_of_data_type_name = header.safeGetByPosition(i).type->getName().size();
/// Roll back the cursor to the beginning of the previous or current row and parse all over again. But now we derive detailed information.
if (pos_of_prev_row)
{
istr.position() = pos_of_prev_row;
out << "\nRow " << (row_num - 1) << ":\n";
if (!parseRowAndPrintDiagnosticInfo(columns, out, max_length_of_column_name, max_length_of_data_type_name))
return out.str();
}
else
{
if (!pos_of_current_row)
{
out << "Could not print diagnostic info because parsing of data hasn't started.\n";
return out.str();
}
istr.position() = pos_of_current_row;
}
out << "\nRow " << row_num << ":\n";
parseRowAndPrintDiagnosticInfo(columns, out, max_length_of_column_name, max_length_of_data_type_name);
out << "\n";
return out.str();
}
/** gcc-7 generates wrong code with optimization level greater than 1.
* See tests: dbms/src/IO/tests/write_int.cpp
* and dbms/tests/queries/0_stateless/00898_parsing_bad_diagnostic_message.sh
* This is compiler bug. The bug does not present in gcc-8 and clang-8.
* Nevertheless, we don't need high optimization of this function.
*/
bool OPTIMIZE(1) CSVRowInputStream::parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
WriteBuffer & out, size_t max_length_of_column_name, size_t max_length_of_data_type_name)
bool CSVRowInputStream::parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out)
{
const char delimiter = format_settings.csv.delimiter;
@ -352,100 +286,18 @@ bool OPTIMIZE(1) CSVRowInputStream::parseRowAndPrintDiagnosticInfo(MutableColumn
if (column_indexes_for_input_fields[file_column].has_value())
{
const auto & table_column = *column_indexes_for_input_fields[file_column];
const auto & current_column_type = data_types[table_column];
const bool is_last_file_column =
file_column + 1 == column_indexes_for_input_fields.size();
const bool at_delimiter = *istr.position() == delimiter;
const bool at_last_column_line_end = is_last_file_column
&& (*istr.position() == '\n' || *istr.position() == '\r'
|| istr.eof());
out << "Column " << file_column << ", " << std::string((file_column < 10 ? 2 : file_column < 100 ? 1 : 0), ' ')
<< "name: " << header.safeGetByPosition(table_column).name << ", " << std::string(max_length_of_column_name - header.safeGetByPosition(table_column).name.size(), ' ')
<< "type: " << current_column_type->getName() << ", " << std::string(max_length_of_data_type_name - current_column_type->getName().size(), ' ');
if (format_settings.csv.empty_as_default
&& (at_delimiter || at_last_column_line_end))
{
columns[table_column]->insertDefault();
}
else
{
BufferBase::Position prev_position = istr.position();
BufferBase::Position curr_position = istr.position();
std::exception_ptr exception;
try
{
skipWhitespacesAndTabs(istr);
prev_position = istr.position();
current_column_type->deserializeAsTextCSV(*columns[table_column], istr, format_settings);
curr_position = istr.position();
skipWhitespacesAndTabs(istr);
}
catch (...)
{
exception = std::current_exception();
}
if (curr_position < prev_position)
throw Exception("Logical error: parsing is non-deterministic.", ErrorCodes::LOGICAL_ERROR);
if (isNativeNumber(current_column_type) || isDateOrDateTime(current_column_type))
{
/// An empty string instead of a value.
if (curr_position == prev_position)
{
out << "ERROR: text ";
verbosePrintString(prev_position, std::min(prev_position + 10, istr.buffer().end()), out);
out << " is not like " << current_column_type->getName() << "\n";
return false;
}
}
out << "parsed text: ";
verbosePrintString(prev_position, curr_position, out);
if (exception)
{
if (current_column_type->getName() == "DateTime")
out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n";
else if (current_column_type->getName() == "Date")
out << "ERROR: Date must be in YYYY-MM-DD format.\n";
else
out << "ERROR\n";
return false;
}
out << "\n";
if (current_column_type->haveMaximumSizeOfValue()
&& *curr_position != '\n' && *curr_position != '\r'
&& *curr_position != delimiter)
{
out << "ERROR: garbage after " << current_column_type->getName() << ": ";
verbosePrintString(curr_position, std::min(curr_position + 10, istr.buffer().end()), out);
out << "\n";
if (current_column_type->getName() == "DateTime")
out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n";
else if (current_column_type->getName() == "Date")
out << "ERROR: Date must be in YYYY-MM-DD format.\n";
return false;
}
}
size_t col_idx = column_indexes_for_input_fields[file_column].value();
if (!deserializeFieldAndPrintDiagnosticInfo(header.getByPosition(col_idx).name, data_types[col_idx], *columns[col_idx],
out, file_column))
return false;
}
else
{
static const String skipped_column_str = "<SKIPPED COLUMN>";
out << "Column " << file_column << ", " << std::string((file_column < 10 ? 2 : file_column < 100 ? 1 : 0), ' ')
<< "name: " << skipped_column_str << ", " << std::string(max_length_of_column_name - skipped_column_str.length(), ' ')
<< "type: " << skipped_column_str << ", " << std::string(max_length_of_data_type_name - skipped_column_str.length(), ' ');
String tmp;
readCSVString(tmp, istr, format_settings.csv);
static const DataTypePtr skipped_column_type = std::make_shared<DataTypeNothing>();
static const MutableColumnPtr skipped_column = skipped_column_type->createColumn();
if (!deserializeFieldAndPrintDiagnosticInfo(skipped_column_str, skipped_column_type, *skipped_column, out, file_column))
return false;
}
/// Delimiters
@ -509,15 +361,33 @@ void CSVRowInputStream::syncAfterError()
skipToNextLineOrEOF(istr);
}
void CSVRowInputStream::updateDiagnosticInfo()
void
CSVRowInputStream::tryDeserializeFiled(const DataTypePtr & type, IColumn & column, size_t input_position, ReadBuffer::Position & prev_pos,
ReadBuffer::Position & curr_pos)
{
++row_num;
skipWhitespacesAndTabs(istr);
prev_pos = istr.position();
bytes_read_at_start_of_buffer_on_prev_row = bytes_read_at_start_of_buffer_on_current_row;
bytes_read_at_start_of_buffer_on_current_row = istr.count() - istr.offset();
if (column_indexes_for_input_fields[input_position])
{
const bool is_last_file_column = input_position + 1 == column_indexes_for_input_fields.size();
const bool at_delimiter = *istr.position() == format_settings.csv.delimiter;
const bool at_last_column_line_end = is_last_file_column
&& (*istr.position() == '\n' || *istr.position() == '\r' || istr.eof());
pos_of_prev_row = pos_of_current_row;
pos_of_current_row = istr.position();
if (format_settings.csv.empty_as_default && (at_delimiter || at_last_column_line_end))
column.insertDefault();
else
type->deserializeAsTextCSV(column, istr, format_settings);
}
else
{
String tmp;
readCSVString(tmp, istr, format_settings.csv);
}
curr_pos = istr.position();
skipWhitespacesAndTabs(istr);
}

View File

@ -4,7 +4,7 @@
#include <unordered_map>
#include <Core/Block.h>
#include <Formats/IRowInputStream.h>
#include <Formats/RowInputStreamWithDiagnosticInfo.h>
#include <Formats/FormatSettings.h>
@ -16,7 +16,7 @@ class ReadBuffer;
/** A stream for inputting data in csv format.
* Does not conform with https://tools.ietf.org/html/rfc4180 because it skips spaces and tabs between values.
*/
class CSVRowInputStream : public IRowInputStream
class CSVRowInputStream : public RowInputStreamWithDiagnosticInfo
{
public:
/** with_names - in the first line the header with column names
@ -28,15 +28,10 @@ public:
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
std::string getDiagnosticInfo() override;
private:
ReadBuffer & istr;
Block header;
bool with_names;
DataTypes data_types;
const FormatSettings format_settings;
DataTypes data_types;
using IndexesMap = std::unordered_map<String, size_t>;
IndexesMap column_indexes_by_names;
@ -57,20 +52,13 @@ private:
void addInputColumn(const String & column_name);
/// For convenient diagnostics in case of an error.
size_t row_num = 0;
/// How many bytes were read, not counting those that are still in the buffer.
size_t bytes_read_at_start_of_buffer_on_current_row = 0;
size_t bytes_read_at_start_of_buffer_on_prev_row = 0;
char * pos_of_current_row = nullptr;
char * pos_of_prev_row = nullptr;
void updateDiagnosticInfo();
bool parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
WriteBuffer & out, size_t max_length_of_column_name, size_t max_length_of_data_type_name);
bool parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out) override;
void tryDeserializeFiled(const DataTypePtr & type, IColumn & column, size_t input_position, ReadBuffer::Position & prev_pos,
ReadBuffer::Position & curr_pos) override;
bool isGarbageAfterField(size_t, ReadBuffer::Position pos) override
{
return *pos != '\n' && *pos != '\r' && *pos != format_settings.csv.delimiter;
}
};
}

View File

@ -0,0 +1,165 @@
#include <Formats/RowInputStreamWithDiagnosticInfo.h>
#include <Formats/verbosePrintString.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
DB::RowInputStreamWithDiagnosticInfo::RowInputStreamWithDiagnosticInfo(ReadBuffer & istr_, const Block & header_)
: istr(istr_), header(header_)
{
}
void DB::RowInputStreamWithDiagnosticInfo::updateDiagnosticInfo()
{
++row_num;
bytes_read_at_start_of_buffer_on_prev_row = bytes_read_at_start_of_buffer_on_current_row;
bytes_read_at_start_of_buffer_on_current_row = istr.count() - istr.offset();
pos_of_prev_row = pos_of_current_row;
pos_of_current_row = istr.position();
}
String DB::RowInputStreamWithDiagnosticInfo::getDiagnosticInfo()
{
if (istr.eof()) /// Buffer has gone, cannot extract information about what has been parsed.
return {};
WriteBufferFromOwnString out;
MutableColumns columns = header.cloneEmptyColumns();
/// It is possible to display detailed diagnostics only if the last and next to last rows are still in the read buffer.
size_t bytes_read_at_start_of_buffer = istr.count() - istr.offset();
if (bytes_read_at_start_of_buffer != bytes_read_at_start_of_buffer_on_prev_row)
{
out << "Could not print diagnostic info because two last rows aren't in buffer (rare case)\n";
return out.str();
}
max_length_of_column_name = 0;
for (size_t i = 0; i < header.columns(); ++i)
if (header.safeGetByPosition(i).name.size() > max_length_of_column_name)
max_length_of_column_name = header.safeGetByPosition(i).name.size();
max_length_of_data_type_name = 0;
for (size_t i = 0; i < header.columns(); ++i)
if (header.safeGetByPosition(i).type->getName().size() > max_length_of_data_type_name)
max_length_of_data_type_name = header.safeGetByPosition(i).type->getName().size();
/// Roll back the cursor to the beginning of the previous or current row and parse all over again. But now we derive detailed information.
if (pos_of_prev_row)
{
istr.position() = pos_of_prev_row;
out << "\nRow " << (row_num - 1) << ":\n";
if (!parseRowAndPrintDiagnosticInfo(columns, out))
return out.str();
}
else
{
if (!pos_of_current_row)
{
out << "Could not print diagnostic info because parsing of data hasn't started.\n";
return out.str();
}
istr.position() = pos_of_current_row;
}
out << "\nRow " << row_num << ":\n";
parseRowAndPrintDiagnosticInfo(columns, out);
out << "\n";
return out.str();
}
bool RowInputStreamWithDiagnosticInfo::deserializeFieldAndPrintDiagnosticInfo(const String & col_name, const DataTypePtr & type,
IColumn & column,
WriteBuffer & out,
size_t input_position)
{
out << "Column " << input_position << ", " << std::string((input_position < 10 ? 2 : input_position < 100 ? 1 : 0), ' ')
<< "name: " << alignedName(col_name, max_length_of_column_name)
<< "type: " << alignedName(type->getName(), max_length_of_data_type_name);
auto prev_position = istr.position();
auto curr_position = istr.position();
std::exception_ptr exception;
try
{
tryDeserializeFiled(type, column, input_position, prev_position, curr_position);
}
catch (...)
{
exception = std::current_exception();
}
if (curr_position < prev_position)
throw Exception("Logical error: parsing is non-deterministic.", ErrorCodes::LOGICAL_ERROR);
if (isNativeNumber(type) || isDateOrDateTime(type))
{
/// An empty string instead of a value.
if (curr_position == prev_position)
{
out << "ERROR: text ";
verbosePrintString(prev_position, std::min(prev_position + 10, istr.buffer().end()), out);
out << " is not like " << type->getName() << "\n";
return false;
}
}
out << "parsed text: ";
verbosePrintString(prev_position, curr_position, out);
if (exception)
{
if (type->getName() == "DateTime")
out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n";
else if (type->getName() == "Date")
out << "ERROR: Date must be in YYYY-MM-DD format.\n";
else
out << "ERROR\n";
return false;
}
out << "\n";
if (type->haveMaximumSizeOfValue())
{
if (isGarbageAfterField(input_position, curr_position))
{
out << "ERROR: garbage after " << type->getName() << ": ";
verbosePrintString(curr_position, std::min(curr_position + 10, istr.buffer().end()), out);
out << "\n";
if (type->getName() == "DateTime")
out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n";
else if (type->getName() == "Date")
out << "ERROR: Date must be in YYYY-MM-DD format.\n";
return false;
}
}
return true;
}
String RowInputStreamWithDiagnosticInfo::alignedName(const String & name, size_t max_length) const
{
size_t spaces_count = max_length >= name.size() ? max_length - name.size() : 0;
return name + ", " + std::string(spaces_count, ' ');
}
}

View File

@ -0,0 +1,48 @@
#pragma once
#include <Core/Block.h>
#include <Formats/IRowInputStream.h>
#include <IO/ReadBuffer.h>
namespace DB
{
class RowInputStreamWithDiagnosticInfo : public IRowInputStream
{
public:
RowInputStreamWithDiagnosticInfo(ReadBuffer & istr_, const Block & header_);
String getDiagnosticInfo() override;
protected:
void updateDiagnosticInfo();
bool deserializeFieldAndPrintDiagnosticInfo(const String & col_name, const DataTypePtr & type, IColumn & column,
WriteBuffer & out, size_t input_position);
String alignedName(const String & name, size_t max_length) const;
virtual bool parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out) = 0;
virtual void tryDeserializeFiled(const DataTypePtr & type, IColumn & column, size_t input_position, ReadBuffer::Position & prev_pos,
ReadBuffer::Position & curr_pos) = 0;
virtual bool isGarbageAfterField(size_t after_input_pos_idx, ReadBuffer::Position pos) = 0;
ReadBuffer & istr;
Block header;
/// For convenient diagnostics in case of an error.
size_t row_num = 0;
private:
/// How many bytes were read, not counting those still in the buffer.
size_t bytes_read_at_start_of_buffer_on_current_row = 0;
size_t bytes_read_at_start_of_buffer_on_prev_row = 0;
char * pos_of_current_row = nullptr;
char * pos_of_prev_row = nullptr;
/// For alignment of diagnostic info.
size_t max_length_of_column_name = 0;
size_t max_length_of_data_type_name = 0;
};
}

View File

@ -1,15 +1,11 @@
#include <string>
#include <Core/Defines.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Formats/TabSeparatedRowInputStream.h>
#include <Formats/verbosePrintString.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
#include <DataTypes/DataTypeNothing.h>
namespace DB
@ -18,7 +14,6 @@ namespace DB
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
extern const int LOGICAL_ERROR;
}
@ -49,7 +44,7 @@ static void checkForCarriageReturn(ReadBuffer & istr)
TabSeparatedRowInputStream::TabSeparatedRowInputStream(
ReadBuffer & istr_, const Block & header_, bool with_names_, bool with_types_, const FormatSettings & format_settings)
: istr(istr_), header(header_), with_names(with_names_), with_types(with_types_), format_settings(format_settings)
: RowInputStreamWithDiagnosticInfo(istr_, header_), with_names(with_names_), with_types(with_types_), format_settings(format_settings)
{
const auto num_columns = header.columns();
@ -209,70 +204,7 @@ bool TabSeparatedRowInputStream::read(MutableColumns & columns, RowReadExtension
return true;
}
String TabSeparatedRowInputStream::getDiagnosticInfo()
{
if (istr.eof()) /// Buffer has gone, cannot extract information about what has been parsed.
return {};
WriteBufferFromOwnString out;
MutableColumns columns = header.cloneEmptyColumns();
/// It is possible to display detailed diagnostics only if the last and next to last lines are still in the read buffer.
size_t bytes_read_at_start_of_buffer = istr.count() - istr.offset();
if (bytes_read_at_start_of_buffer != bytes_read_at_start_of_buffer_on_prev_row)
{
out << "Could not print diagnostic info because two last rows aren't in buffer (rare case)\n";
return out.str();
}
size_t max_length_of_column_name = 0;
for (size_t i = 0; i < header.columns(); ++i)
if (header.safeGetByPosition(i).name.size() > max_length_of_column_name)
max_length_of_column_name = header.safeGetByPosition(i).name.size();
size_t max_length_of_data_type_name = 0;
for (size_t i = 0; i < header.columns(); ++i)
if (header.safeGetByPosition(i).type->getName().size() > max_length_of_data_type_name)
max_length_of_data_type_name = header.safeGetByPosition(i).type->getName().size();
/// Roll back the cursor to the beginning of the previous or current line and parse all over again. But now we derive detailed information.
if (pos_of_prev_row)
{
istr.position() = pos_of_prev_row;
out << "\nRow " << (row_num - 1) << ":\n";
if (!parseRowAndPrintDiagnosticInfo(columns, out, max_length_of_column_name, max_length_of_data_type_name))
return out.str();
}
else
{
if (!pos_of_current_row)
{
out << "Could not print diagnostic info because parsing of data hasn't started.\n";
return out.str();
}
istr.position() = pos_of_current_row;
}
out << "\nRow " << row_num << ":\n";
parseRowAndPrintDiagnosticInfo(columns, out, max_length_of_column_name, max_length_of_data_type_name);
out << "\n";
return out.str();
}
/** gcc-7 generates wrong code with optimization level greater than 1.
* See tests: dbms/src/IO/tests/write_int.cpp
* and dbms/tests/queries/0_stateless/00898_parsing_bad_diagnostic_message.sh
* This is compiler bug. The bug does not present in gcc-8 and clang-8.
* Nevertheless, we don't need high optimization of this function.
*/
bool OPTIMIZE(1) TabSeparatedRowInputStream::parseRowAndPrintDiagnosticInfo(
MutableColumns & columns, WriteBuffer & out, size_t max_length_of_column_name, size_t max_length_of_data_type_name)
bool TabSeparatedRowInputStream::parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out)
{
for (size_t input_position = 0; input_position < column_indexes_for_input_fields.size(); ++input_position)
{
@ -284,84 +216,18 @@ bool OPTIMIZE(1) TabSeparatedRowInputStream::parseRowAndPrintDiagnosticInfo(
if (column_indexes_for_input_fields[input_position].has_value())
{
const auto & column_index = *column_indexes_for_input_fields[input_position];
const auto & current_column_type = data_types[column_index];
out << "Column " << input_position << ", " << std::string((input_position < 10 ? 2 : input_position < 100 ? 1 : 0), ' ')
<< "name: " << header.safeGetByPosition(column_index).name << ", " << std::string(max_length_of_column_name - header.safeGetByPosition(column_index).name.size(), ' ')
<< "type: " << current_column_type->getName() << ", " << std::string(max_length_of_data_type_name - current_column_type->getName().size(), ' ');
auto prev_position = istr.position();
std::exception_ptr exception;
try
{
current_column_type->deserializeAsTextEscaped(*columns[column_index], istr, format_settings);
}
catch (...)
{
exception = std::current_exception();
}
auto curr_position = istr.position();
if (curr_position < prev_position)
throw Exception("Logical error: parsing is non-deterministic.", ErrorCodes::LOGICAL_ERROR);
if (isNativeNumber(current_column_type) || isDateOrDateTime(current_column_type))
{
/// An empty string instead of a value.
if (curr_position == prev_position)
{
out << "ERROR: text ";
verbosePrintString(prev_position, std::min(prev_position + 10, istr.buffer().end()), out);
out << " is not like " << current_column_type->getName() << "\n";
return false;
}
}
out << "parsed text: ";
verbosePrintString(prev_position, curr_position, out);
if (exception)
{
if (current_column_type->getName() == "DateTime")
out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n";
else if (current_column_type->getName() == "Date")
out << "ERROR: Date must be in YYYY-MM-DD format.\n";
else
out << "ERROR\n";
size_t col_idx = column_indexes_for_input_fields[input_position].value();
if (!deserializeFieldAndPrintDiagnosticInfo(header.getByPosition(col_idx).name, data_types[col_idx], *columns[col_idx],
out, input_position))
return false;
}
out << "\n";
if (current_column_type->haveMaximumSizeOfValue())
{
if (*curr_position != '\n' && *curr_position != '\t')
{
out << "ERROR: garbage after " << current_column_type->getName() << ": ";
verbosePrintString(curr_position, std::min(curr_position + 10, istr.buffer().end()), out);
out << "\n";
if (current_column_type->getName() == "DateTime")
out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n";
else if (current_column_type->getName() == "Date")
out << "ERROR: Date must be in YYYY-MM-DD format.\n";
return false;
}
}
}
else
{
static const String skipped_column_str = "<SKIPPED COLUMN>";
out << "Column " << input_position << ", " << std::string((input_position < 10 ? 2 : input_position < 100 ? 1 : 0), ' ')
<< "name: " << skipped_column_str << ", " << std::string(max_length_of_column_name - skipped_column_str.length(), ' ')
<< "type: " << skipped_column_str << ", " << std::string(max_length_of_data_type_name - skipped_column_str.length(), ' ');
NullSink null_sink;
readEscapedStringInto(null_sink, istr);
static const DataTypePtr skipped_column_type = std::make_shared<DataTypeNothing>();
static const MutableColumnPtr skipped_column = skipped_column_type->createColumn();
if (!deserializeFieldAndPrintDiagnosticInfo(skipped_column_str, skipped_column_type, *skipped_column, out, input_position))
return false;
}
/// Delimiters
@ -434,16 +300,19 @@ void TabSeparatedRowInputStream::syncAfterError()
skipToUnescapedNextLineOrEOF(istr);
}
void TabSeparatedRowInputStream::updateDiagnosticInfo()
void TabSeparatedRowInputStream::tryDeserializeFiled(const DataTypePtr & type, IColumn & column, size_t input_position,
ReadBuffer::Position & prev_pos,
ReadBuffer::Position & curr_pos)
{
++row_num;
bytes_read_at_start_of_buffer_on_prev_row = bytes_read_at_start_of_buffer_on_current_row;
bytes_read_at_start_of_buffer_on_current_row = istr.count() - istr.offset();
pos_of_prev_row = pos_of_current_row;
pos_of_current_row = istr.position();
prev_pos = istr.position();
if (column_indexes_for_input_fields[input_position])
type->deserializeAsTextEscaped(column, istr, format_settings);
else
{
NullSink null_sink;
readEscapedStringInto(null_sink, istr);
}
curr_pos = istr.position();
}

View File

@ -5,7 +5,7 @@
#include <Core/Block.h>
#include <Formats/FormatSettings.h>
#include <Formats/IRowInputStream.h>
#include <Formats/RowInputStreamWithDiagnosticInfo.h>
namespace DB
@ -16,7 +16,7 @@ class ReadBuffer;
/** A stream to input data in tsv format.
*/
class TabSeparatedRowInputStream : public IRowInputStream
class TabSeparatedRowInputStream : public RowInputStreamWithDiagnosticInfo
{
public:
/** with_names - the first line is the header with the names of the columns
@ -30,11 +30,7 @@ public:
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
std::string getDiagnosticInfo() override;
private:
ReadBuffer & istr;
Block header;
bool with_names;
bool with_types;
const FormatSettings format_settings;
@ -53,21 +49,10 @@ private:
void setupAllColumnsByTableSchema();
void fillUnreadColumnsWithDefaults(MutableColumns & columns, RowReadExtension& ext);
/// For convenient diagnostics in case of an error.
size_t row_num = 0;
/// How many bytes were read, not counting those still in the buffer.
size_t bytes_read_at_start_of_buffer_on_current_row = 0;
size_t bytes_read_at_start_of_buffer_on_prev_row = 0;
char * pos_of_current_row = nullptr;
char * pos_of_prev_row = nullptr;
void updateDiagnosticInfo();
bool parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
WriteBuffer & out, size_t max_length_of_column_name, size_t max_length_of_data_type_name);
bool parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out) override;
void tryDeserializeFiled(const DataTypePtr & type, IColumn & column, size_t input_position, ReadBuffer::Position & prev_pos,
ReadBuffer::Position & curr_pos) override;
bool isGarbageAfterField(size_t, ReadBuffer::Position pos) override { return *pos != '\n' && *pos != '\t'; }
};
}