Update CSVRowInputStream.

This commit is contained in:
Nikolai Kochetov 2019-07-31 17:43:08 +03:00
parent f0bf083efc
commit 3a8fefdda8
2 changed files with 235 additions and 107 deletions

View File

@ -16,24 +16,114 @@ namespace ErrorCodes
} }
static void skipTSVRow(ReadBuffer & istr, const size_t num_columns)
{
NullSink null_sink;
for (size_t i = 0; i < num_columns; ++i)
{
readEscapedStringInto(null_sink, istr);
assertChar(i == num_columns - 1 ? '\n' : '\t', istr);
}
}
/** Check for a common error case - usage of Windows line feed.
*/
static void checkForCarriageReturn(ReadBuffer & istr)
{
if (istr.position()[0] == '\r' || (istr.position() != istr.buffer().begin() && istr.position()[-1] == '\r'))
throw Exception("\nYou have carriage return (\\r, 0x0D, ASCII 13) at end of first row."
"\nIt's like your input data has DOS/Windows style line separators, that are illegal in TabSeparated format."
" You must transform your file to Unix format."
"\nBut if you really need carriage return at end of string value of last column, you need to escape it as \\r.",
ErrorCodes::INCORRECT_DATA);
}
TabSeparatedRowInputFormat::TabSeparatedRowInputFormat( TabSeparatedRowInputFormat::TabSeparatedRowInputFormat(
ReadBuffer & in_, Block header, bool with_names, bool with_types, Params params, const FormatSettings & format_settings) ReadBuffer & in_, Block header, bool with_names, bool with_types, Params params, const FormatSettings & format_settings)
: IRowInputFormat(std::move(header), in_, params), with_names(with_names), with_types(with_types), format_settings(format_settings) : IRowInputFormat(std::move(header), in_, std::move(params)), with_names(with_names), with_types(with_types), format_settings(format_settings)
{ {
auto & sample = getPort().getHeader(); auto & sample = getPort().getHeader();
size_t num_columns = sample.columns(); size_t num_columns = sample.columns();
data_types.resize(num_columns); data_types.resize(num_columns);
column_indexes_by_names.reserve(num_columns);
for (size_t i = 0; i < num_columns; ++i) for (size_t i = 0; i < num_columns; ++i)
data_types[i] = sample.safeGetByPosition(i).type; {
const auto & column_info = sample.getByPosition(i);
data_types[i] = column_info.type;
column_indexes_by_names.emplace(column_info.name, i);
}
column_indexes_for_input_fields.reserve(num_columns);
read_columns.assign(num_columns, false);
}
void TabSeparatedRowInputFormat::setupAllColumnsByTableSchema()
{
auto & header = getPort().getHeader();
read_columns.assign(header.columns(), true);
column_indexes_for_input_fields.resize(header.columns());
for (size_t i = 0; i < column_indexes_for_input_fields.size(); ++i)
column_indexes_for_input_fields[i] = i;
}
void TabSeparatedRowInputFormat::addInputColumn(const String & column_name)
{
const auto column_it = column_indexes_by_names.find(column_name);
if (column_it == column_indexes_by_names.end())
{
if (format_settings.skip_unknown_fields)
{
column_indexes_for_input_fields.push_back(std::nullopt);
return;
}
throw Exception(
"Unknown field found in TSV header: '" + column_name + "' " +
"at position " + std::to_string(column_indexes_for_input_fields.size()) +
"\nSet the 'input_format_skip_unknown_fields' parameter explicitly to ignore and proceed",
ErrorCodes::INCORRECT_DATA
);
}
const auto column_index = column_it->second;
if (read_columns[column_index])
throw Exception("Duplicate field found while parsing TSV header: " + column_name, ErrorCodes::INCORRECT_DATA);
read_columns[column_index] = true;
column_indexes_for_input_fields.emplace_back(column_index);
}
void TabSeparatedRowInputFormat::fillUnreadColumnsWithDefaults(MutableColumns & columns, RowReadExtension & row_read_extension)
{
/// It is safe to memorize this on the first run - the format guarantees this does not change
if (unlikely(row_num == 1))
{
columns_to_fill_with_default_values.clear();
for (size_t index = 0; index < read_columns.size(); ++index)
if (read_columns[index] == 0)
columns_to_fill_with_default_values.push_back(index);
}
for (const auto column_index : columns_to_fill_with_default_values)
data_types[column_index]->insertDefaultInto(*columns[column_index]);
row_read_extension.read_columns = read_columns;
} }
void TabSeparatedRowInputFormat::readPrefix() void TabSeparatedRowInputFormat::readPrefix()
{ {
auto & header = getPort().getHeader();
size_t num_columns = header.columns();
String tmp;
if (with_names || with_types) if (with_names || with_types)
{ {
/// In this format, we assume that column name or type cannot contain BOM, /// In this format, we assume that column name or type cannot contain BOM,
@ -44,54 +134,64 @@ void TabSeparatedRowInputFormat::readPrefix()
if (with_names) if (with_names)
{ {
for (size_t i = 0; i < num_columns; ++i) if (format_settings.with_names_use_header)
{ {
readEscapedString(tmp, in); String column_name;
assertChar(i == num_columns - 1 ? '\n' : '\t', in); do
{
readEscapedString(column_name, in);
addInputColumn(column_name);
}
while (checkChar('\t', in));
if (!in.eof())
{
checkForCarriageReturn(in);
assertChar('\n', in);
} }
} }
else
{
setupAllColumnsByTableSchema();
skipTSVRow(in, column_indexes_for_input_fields.size());
}
}
else
setupAllColumnsByTableSchema();
if (with_types) if (with_types)
{ {
for (size_t i = 0; i < num_columns; ++i) skipTSVRow(in, column_indexes_for_input_fields.size());
{
readEscapedString(tmp, in);
assertChar(i == num_columns - 1 ? '\n' : '\t', in);
}
} }
} }
/** Check for a common error case - usage of Windows line feed. bool TabSeparatedRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
*/
static void checkForCarriageReturn(ReadBuffer & in)
{
if (in.position()[0] == '\r' || (in.position() != in.buffer().begin() && in.position()[-1] == '\r'))
throw Exception("\nYou have carriage return (\\r, 0x0D, ASCII 13) at end of first row."
"\nIt's like your input data has DOS/Windows style line separators, that are illegal in TabSeparated format."
" You must transform your file to Unix format."
"\nBut if you really need carriage return at end of string value of last column, you need to escape it as \\r.",
ErrorCodes::INCORRECT_DATA);
}
bool TabSeparatedRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{ {
if (in.eof()) if (in.eof())
return false; return false;
updateDiagnosticInfo(); updateDiagnosticInfo();
size_t size = data_types.size(); for (size_t input_position = 0; input_position < column_indexes_for_input_fields.size(); ++input_position)
for (size_t i = 0; i < size; ++i)
{ {
data_types[i]->deserializeAsTextEscaped(*columns[i], in, format_settings); const auto & column_index = column_indexes_for_input_fields[input_position];
if (column_index)
{
data_types[*column_index]->deserializeAsTextEscaped(*columns[*column_index], in, format_settings);
}
else
{
NullSink null_sink;
readEscapedStringInto(null_sink, in);
}
/// skip separators /// skip separators
if (i + 1 == size) if (input_position + 1 < column_indexes_for_input_fields.size())
{ {
if (!in.eof()) assertChar('\t', in);
}
else if (!in.eof())
{ {
if (unlikely(row_num == 1)) if (unlikely(row_num == 1))
checkForCarriageReturn(in); checkForCarriageReturn(in);
@ -99,9 +199,8 @@ bool TabSeparatedRowInputFormat::readRow(MutableColumns & columns, RowReadExtens
assertChar('\n', in); assertChar('\n', in);
} }
} }
else
assertChar('\t', in); fillUnreadColumnsWithDefaults(columns, ext);
}
return true; return true;
} }
@ -166,26 +265,31 @@ String TabSeparatedRowInputFormat::getDiagnosticInfo()
bool TabSeparatedRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns, bool TabSeparatedRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
WriteBuffer & out, size_t max_length_of_column_name, size_t max_length_of_data_type_name) WriteBuffer & out, size_t max_length_of_column_name, size_t max_length_of_data_type_name)
{ {
auto & header = getPort().getHeader(); for (size_t input_position = 0; input_position < column_indexes_for_input_fields.size(); ++input_position)
size_t size = data_types.size();
for (size_t i = 0; i < size; ++i)
{ {
if (i == 0 && in.eof()) if (input_position == 0 && in.eof())
{ {
out << "<End of stream>\n"; out << "<End of stream>\n";
return false; return false;
} }
out << "Column " << i << ", " << std::string((i < 10 ? 2 : i < 100 ? 1 : 0), ' ') if (column_indexes_for_input_fields[input_position].has_value())
<< "name: " << header.safeGetByPosition(i).name << ", " << std::string(max_length_of_column_name - header.safeGetByPosition(i).name.size(), ' ') {
<< "type: " << data_types[i]->getName() << ", " << std::string(max_length_of_data_type_name - data_types[i]->getName().size(), ' '); const auto & column_index = *column_indexes_for_input_fields[input_position];
const auto & current_column_type = data_types[column_index];
const auto & header = getPort().getHeader();
out << "Column " << input_position << ", " << std::string((input_position < 10 ? 2 : input_position < 100 ? 1 : 0), ' ')
<< "name: " << header.safeGetByPosition(column_index).name << ", " << std::string(max_length_of_column_name - header.safeGetByPosition(column_index).name.size(), ' ')
<< "type: " << current_column_type->getName() << ", " << std::string(max_length_of_data_type_name - current_column_type->getName().size(), ' ');
auto prev_position = in.position(); auto prev_position = in.position();
std::exception_ptr exception; std::exception_ptr exception;
try try
{ {
data_types[i]->deserializeAsTextEscaped(*columns[i], in, format_settings); current_column_type->deserializeAsTextEscaped(*columns[column_index], in, format_settings);
} }
catch (...) catch (...)
{ {
@ -197,14 +301,14 @@ bool TabSeparatedRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns &
if (curr_position < prev_position) if (curr_position < prev_position)
throw Exception("Logical error: parsing is non-deterministic.", ErrorCodes::LOGICAL_ERROR); throw Exception("Logical error: parsing is non-deterministic.", ErrorCodes::LOGICAL_ERROR);
if (isNumber(data_types[i]) || isDateOrDateTime(data_types[i])) if (isNativeNumber(current_column_type) || isDateOrDateTime(current_column_type))
{ {
/// An empty string instead of a value. /// An empty string instead of a value.
if (curr_position == prev_position) if (curr_position == prev_position)
{ {
out << "ERROR: text "; out << "ERROR: text ";
verbosePrintString(prev_position, std::min(prev_position + 10, in.buffer().end()), out); verbosePrintString(prev_position, std::min(prev_position + 10, in.buffer().end()), out);
out << " is not like " << data_types[i]->getName() << "\n"; out << " is not like " << current_column_type->getName() << "\n";
return false; return false;
} }
} }
@ -214,9 +318,9 @@ bool TabSeparatedRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns &
if (exception) if (exception)
{ {
if (data_types[i]->getName() == "DateTime") if (current_column_type->getName() == "DateTime")
out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n"; out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n";
else if (data_types[i]->getName() == "Date") else if (current_column_type->getName() == "Date")
out << "ERROR: Date must be in YYYY-MM-DD format.\n"; out << "ERROR: Date must be in YYYY-MM-DD format.\n";
else else
out << "ERROR\n"; out << "ERROR\n";
@ -225,25 +329,36 @@ bool TabSeparatedRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns &
out << "\n"; out << "\n";
if (data_types[i]->haveMaximumSizeOfValue()) if (current_column_type->haveMaximumSizeOfValue())
{ {
if (*curr_position != '\n' && *curr_position != '\t') if (*curr_position != '\n' && *curr_position != '\t')
{ {
out << "ERROR: garbage after " << data_types[i]->getName() << ": "; out << "ERROR: garbage after " << current_column_type->getName() << ": ";
verbosePrintString(curr_position, std::min(curr_position + 10, in.buffer().end()), out); verbosePrintString(curr_position, std::min(curr_position + 10, in.buffer().end()), out);
out << "\n"; out << "\n";
if (data_types[i]->getName() == "DateTime") if (current_column_type->getName() == "DateTime")
out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n"; out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n";
else if (data_types[i]->getName() == "Date") else if (current_column_type->getName() == "Date")
out << "ERROR: Date must be in YYYY-MM-DD format.\n"; out << "ERROR: Date must be in YYYY-MM-DD format.\n";
return false; return false;
} }
} }
}
else
{
static const String skipped_column_str = "<SKIPPED COLUMN>";
out << "Column " << input_position << ", " << std::string((input_position < 10 ? 2 : input_position < 100 ? 1 : 0), ' ')
<< "name: " << skipped_column_str << ", " << std::string(max_length_of_column_name - skipped_column_str.length(), ' ')
<< "type: " << skipped_column_str << ", " << std::string(max_length_of_data_type_name - skipped_column_str.length(), ' ');
NullSink null_sink;
readEscapedStringInto(null_sink, in);
}
/// Delimiters /// Delimiters
if (i + 1 == size) if (input_position + 1 == column_indexes_for_input_fields.size())
{ {
if (!in.eof()) if (!in.eof())
{ {
@ -336,7 +451,7 @@ void registerInputFormatProcessorTabSeparated(FormatFactory & factory)
IRowInputFormat::Params params, IRowInputFormat::Params params,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<TabSeparatedRowInputFormat>(buf, sample, false, false, params, settings); return std::make_shared<TabSeparatedRowInputFormat>(buf, sample, false, false, std::move(params), settings);
}); });
} }
@ -349,7 +464,7 @@ void registerInputFormatProcessorTabSeparated(FormatFactory & factory)
IRowInputFormat::Params params, IRowInputFormat::Params params,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<TabSeparatedRowInputFormat>(buf, sample, true, false, params, settings); return std::make_shared<TabSeparatedRowInputFormat>(buf, sample, true, false, std::move(params), settings);
}); });
} }
@ -362,7 +477,7 @@ void registerInputFormatProcessorTabSeparated(FormatFactory & factory)
IRowInputFormat::Params params, IRowInputFormat::Params params,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<TabSeparatedRowInputFormat>(buf, sample, true, true, params, settings); return std::make_shared<TabSeparatedRowInputFormat>(buf, sample, true, true, std::move(params), settings);
}); });
} }
} }

View File

@ -37,6 +37,19 @@ private:
const FormatSettings format_settings; const FormatSettings format_settings;
DataTypes data_types; DataTypes data_types;
using IndexesMap = std::unordered_map<String, size_t>;
IndexesMap column_indexes_by_names;
using OptionalIndexes = std::vector<std::optional<size_t>>;
OptionalIndexes column_indexes_for_input_fields;
std::vector<UInt8> read_columns;
std::vector<size_t> columns_to_fill_with_default_values;
void addInputColumn(const String & column_name);
void setupAllColumnsByTableSchema();
void fillUnreadColumnsWithDefaults(MutableColumns & columns, RowReadExtension& ext);
/// For convenient diagnostics in case of an error. /// For convenient diagnostics in case of an error.
size_t row_num = 0; size_t row_num = 0;