diff --git a/dbms/src/Processors/Formats/Impl/CSVRowInputFormat.cpp b/dbms/src/Processors/Formats/Impl/CSVRowInputFormat.cpp index 3038c9e02f6..6dc7493187b 100644 --- a/dbms/src/Processors/Formats/Impl/CSVRowInputFormat.cpp +++ b/dbms/src/Processors/Formats/Impl/CSVRowInputFormat.cpp @@ -18,16 +18,55 @@ namespace ErrorCodes CSVRowInputFormat::CSVRowInputFormat( ReadBuffer & in_, Block header, Params params, bool with_names_, const FormatSettings & format_settings) - : IRowInputFormat(std::move(header), in_, params), with_names(with_names_), format_settings(format_settings) + : IRowInputFormat(std::move(header), in_, std::move(params)) + , with_names(with_names_) + , format_settings(format_settings) { auto & sample = getPort().getHeader(); size_t num_columns = sample.columns(); + data_types.resize(num_columns); + column_indexes_by_names.reserve(num_columns); + for (size_t i = 0; i < num_columns; ++i) - data_types[i] = sample.safeGetByPosition(i).type; + { + const auto & column_info = sample.getByPosition(i); + + data_types[i] = column_info.type; + column_indexes_by_names.emplace(column_info.name, i); + } } +/// Map an input file column to a table column, based on its name. +void CSVRowInputFormat::addInputColumn(const String & column_name) +{ + const auto column_it = column_indexes_by_names.find(column_name); + if (column_it == column_indexes_by_names.end()) + { + if (format_settings.skip_unknown_fields) + { + column_indexes_for_input_fields.push_back(std::nullopt); + return; + } + + throw Exception( + "Unknown field found in CSV header: '" + column_name + "' " + + "at position " + std::to_string(column_indexes_for_input_fields.size()) + + "\nSet the 'input_format_skip_unknown_fields' parameter explicitly to ignore and proceed", + ErrorCodes::INCORRECT_DATA + ); + } + + const auto column_index = column_it->second; + + if (read_columns[column_index]) + throw Exception("Duplicate field found while parsing CSV header: " + column_name, ErrorCodes::INCORRECT_DATA); + + read_columns[column_index] = true; + column_indexes_for_input_fields.emplace_back(column_index); +} + static void skipEndOfLine(ReadBuffer & istr) { /// \n (Unix) or \r\n (DOS/Windows) or \n\r (Mac OS Classic) @@ -108,26 +147,119 @@ void CSVRowInputFormat::readPrefix() String tmp; if (with_names) - skipRow(in, format_settings.csv, num_columns); + { + /// This CSV file has a header row with column names. Depending on the + /// settings, use it or skip it. + if (format_settings.with_names_use_header) + { + /// Look at the file header to see which columns we have there. + /// The missing columns are filled with defaults. + read_columns.assign(getPort().getHeader().columns(), false); + do + { + String column_name; + skipWhitespacesAndTabs(in); + readCSVString(column_name, in, format_settings.csv); + skipWhitespacesAndTabs(in); + + addInputColumn(column_name); + } + while (checkChar(format_settings.csv.delimiter, in)); + + skipDelimiter(in, format_settings.csv.delimiter, true); + + for (auto read_column : read_columns) + { + if (read_column) + { + have_always_default_columns = true; + break; + } + } + + return; + } + else + skipRow(in, format_settings.csv, num_columns); + } } -bool CSVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &) +bool CSVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext) { if (in.eof()) return false; updateDiagnosticInfo(); - size_t size = data_types.size(); + /// Track whether we have to fill any columns in this row with default + /// values. If not, we return an empty column mask to the caller, so that + /// it doesn't have to check it. + bool have_default_columns = have_always_default_columns; - for (size_t i = 0; i < size; ++i) + const auto delimiter = format_settings.csv.delimiter; + for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column) { - skipWhitespacesAndTabs(in); - data_types[i]->deserializeAsTextCSV(*columns[i], in, format_settings); - skipWhitespacesAndTabs(in); + const auto & table_column = column_indexes_for_input_fields[file_column]; + const bool is_last_file_column = + file_column + 1 == column_indexes_for_input_fields.size(); - skipDelimiter(in, format_settings.csv.delimiter, i + 1 == size); + if (table_column) + { + const auto & type = data_types[*table_column]; + const bool at_delimiter = *in.position() == delimiter; + const bool at_last_column_line_end = is_last_file_column + && (*in.position() == '\n' || *in.position() == '\r' + || in.eof()); + + if (format_settings.csv.empty_as_default + && (at_delimiter || at_last_column_line_end)) + { + /// Treat empty unquoted column value as default value, if + /// specified in the settings. Tuple columns might seem + /// problematic, because they are never quoted but still contain + /// commas, which might be also used as delimiters. However, + /// they do not contain empty unquoted fields, so this check + /// works for tuples as well. + read_columns[*table_column] = false; + have_default_columns = true; + } + else + { + /// Read the column normally. + read_columns[*table_column] = true; + skipWhitespacesAndTabs(in); + type->deserializeAsTextCSV(*columns[*table_column], in, + format_settings); + skipWhitespacesAndTabs(in); + } + } + else + { + /// We never read this column from the file, just skip it. + String tmp; + readCSVString(tmp, in, format_settings.csv); + } + + skipDelimiter(in, delimiter, is_last_file_column); + } + + if (have_default_columns) + { + for (size_t i = 0; i < read_columns.size(); i++) + { + if (!read_columns[i]) + { + /// The column value for this row is going to be overwritten + /// with default by the caller, but the general assumption is + /// that the column size increases for each row, so we have + /// to insert something. Since we do not care about the exact + /// value, we do not have to use the default value specified by + /// the data type, and can just use IColumn::insertDefault(). + columns[i]->insertDefault(); + } + } + ext.read_columns = read_columns; } return true; @@ -190,93 +322,126 @@ String CSVRowInputFormat::getDiagnosticInfo() return out.str(); } - -bool CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns, +/** gcc-7 generates wrong code with optimization level greater than 1. + * See tests: dbms/src/IO/tests/write_int.cpp + * and dbms/tests/queries/0_stateless/00898_parsing_bad_diagnostic_message.sh + * This is compiler bug. The bug does not present in gcc-8 and clang-8. + * Nevertheless, we don't need high optimization of this function. + */ +bool OPTIMIZE(1) CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out, size_t max_length_of_column_name, size_t max_length_of_data_type_name) { const char delimiter = format_settings.csv.delimiter; - auto & header = getPort().getHeader(); - size_t size = data_types.size(); - for (size_t i = 0; i < size; ++i) + for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column) { - if (i == 0 && in.eof()) + if (file_column == 0 && in.eof()) { out << "\n"; return false; } - out << "Column " << i << ", " << std::string((i < 10 ? 2 : i < 100 ? 1 : 0), ' ') - << "name: " << header.safeGetByPosition(i).name << ", " << std::string(max_length_of_column_name - header.safeGetByPosition(i).name.size(), ' ') - << "type: " << data_types[i]->getName() << ", " << std::string(max_length_of_data_type_name - data_types[i]->getName().size(), ' '); - - BufferBase::Position prev_position = in.position(); - BufferBase::Position curr_position = in.position(); - std::exception_ptr exception; - - try + if (column_indexes_for_input_fields[file_column].has_value()) { - skipWhitespacesAndTabs(in); - prev_position = in.position(); - data_types[i]->deserializeAsTextCSV(*columns[i], in, format_settings); - curr_position = in.position(); - skipWhitespacesAndTabs(in); - } - catch (...) - { - exception = std::current_exception(); - } + const auto & table_column = *column_indexes_for_input_fields[file_column]; + const auto & current_column_type = data_types[table_column]; + const bool is_last_file_column = + file_column + 1 == column_indexes_for_input_fields.size(); + const bool at_delimiter = *in.position() == delimiter; + const bool at_last_column_line_end = is_last_file_column + && (*in.position() == '\n' || *in.position() == '\r' + || in.eof()); - if (curr_position < prev_position) - throw Exception("Logical error: parsing is non-deterministic.", ErrorCodes::LOGICAL_ERROR); + auto & header = getPort().getHeader(); + out << "Column " << file_column << ", " << std::string((file_column < 10 ? 2 : file_column < 100 ? 1 : 0), ' ') + << "name: " << header.safeGetByPosition(table_column).name << ", " << std::string(max_length_of_column_name - header.safeGetByPosition(table_column).name.size(), ' ') + << "type: " << current_column_type->getName() << ", " << std::string(max_length_of_data_type_name - current_column_type->getName().size(), ' '); - if (isNumber(data_types[i]) || isDateOrDateTime(data_types[i])) - { - /// An empty string instead of a value. - if (curr_position == prev_position) + if (format_settings.csv.empty_as_default + && (at_delimiter || at_last_column_line_end)) { - out << "ERROR: text "; - verbosePrintString(prev_position, std::min(prev_position + 10, in.buffer().end()), out); - out << " is not like " << data_types[i]->getName() << "\n"; - return false; + columns[table_column]->insertDefault(); } - } - - out << "parsed text: "; - verbosePrintString(prev_position, curr_position, out); - - if (exception) - { - if (data_types[i]->getName() == "DateTime") - out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n"; - else if (data_types[i]->getName() == "Date") - out << "ERROR: Date must be in YYYY-MM-DD format.\n"; else - out << "ERROR\n"; - return false; - } - - out << "\n"; - - if (data_types[i]->haveMaximumSizeOfValue()) - { - if (*curr_position != '\n' && *curr_position != '\r' && *curr_position != delimiter) { - out << "ERROR: garbage after " << data_types[i]->getName() << ": "; - verbosePrintString(curr_position, std::min(curr_position + 10, in.buffer().end()), out); + BufferBase::Position prev_position = in.position(); + BufferBase::Position curr_position = in.position(); + std::exception_ptr exception; + + try + { + skipWhitespacesAndTabs(in); + prev_position = in.position(); + current_column_type->deserializeAsTextCSV(*columns[table_column], in, format_settings); + curr_position = in.position(); + skipWhitespacesAndTabs(in); + } + catch (...) + { + exception = std::current_exception(); + } + + if (curr_position < prev_position) + throw Exception("Logical error: parsing is non-deterministic.", ErrorCodes::LOGICAL_ERROR); + + if (isNativeNumber(current_column_type) || isDateOrDateTime(current_column_type)) + { + /// An empty string instead of a value. + if (curr_position == prev_position) + { + out << "ERROR: text "; + verbosePrintString(prev_position, std::min(prev_position + 10, in.buffer().end()), out); + out << " is not like " << current_column_type->getName() << "\n"; + return false; + } + } + + out << "parsed text: "; + verbosePrintString(prev_position, curr_position, out); + + if (exception) + { + if (current_column_type->getName() == "DateTime") + out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n"; + else if (current_column_type->getName() == "Date") + out << "ERROR: Date must be in YYYY-MM-DD format.\n"; + else + out << "ERROR\n"; + return false; + } + out << "\n"; - if (data_types[i]->getName() == "DateTime") - out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n"; - else if (data_types[i]->getName() == "Date") - out << "ERROR: Date must be in YYYY-MM-DD format.\n"; + if (current_column_type->haveMaximumSizeOfValue() + && *curr_position != '\n' && *curr_position != '\r' + && *curr_position != delimiter) + { + out << "ERROR: garbage after " << current_column_type->getName() << ": "; + verbosePrintString(curr_position, std::min(curr_position + 10, in.buffer().end()), out); + out << "\n"; - return false; + if (current_column_type->getName() == "DateTime") + out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n"; + else if (current_column_type->getName() == "Date") + out << "ERROR: Date must be in YYYY-MM-DD format.\n"; + + return false; + } } } + else + { + static const String skipped_column_str = ""; + out << "Column " << file_column << ", " << std::string((file_column < 10 ? 2 : file_column < 100 ? 1 : 0), ' ') + << "name: " << skipped_column_str << ", " << std::string(max_length_of_column_name - skipped_column_str.length(), ' ') + << "type: " << skipped_column_str << ", " << std::string(max_length_of_data_type_name - skipped_column_str.length(), ' '); + + String tmp; + readCSVString(tmp, in, format_settings.csv); + } /// Delimiters - if (i + 1 == size) + if (file_column + 1 == column_indexes_for_input_fields.size()) { if (in.eof()) return false; @@ -294,8 +459,8 @@ bool CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns, out << "ERROR: There is no line feed. "; verbosePrintString(in.position(), in.position() + 1, out); out << " found instead.\n" - " It's like your file has more columns than expected.\n" - "And if your file have right number of columns, maybe it have unquoted string value with comma.\n"; + " It's like your file has more columns than expected.\n" + "And if your file have right number of columns, maybe it have unquoted string value with comma.\n"; return false; } @@ -313,8 +478,8 @@ bool CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns, if (*in.position() == '\n' || *in.position() == '\r') { out << "ERROR: Line feed found where delimiter (" << delimiter << ") is expected." - " It's like your file has less columns than expected.\n" - "And if your file have right number of columns, maybe it have unescaped quotes in values.\n"; + " It's like your file has less columns than expected.\n" + "And if your file have right number of columns, maybe it have unescaped quotes in values.\n"; } else { @@ -359,7 +524,7 @@ void registerInputFormatProcessorCSV(FormatFactory & factory) IRowInputFormat::Params params, const FormatSettings & settings) { - return std::make_shared(buf, sample, params, with_names, settings); + return std::make_shared(buf, sample, std::move(params), with_names, settings); }); } } diff --git a/dbms/src/Processors/Formats/Impl/CSVRowInputFormat.h b/dbms/src/Processors/Formats/Impl/CSVRowInputFormat.h index db7041bc90a..b7e29157e0f 100644 --- a/dbms/src/Processors/Formats/Impl/CSVRowInputFormat.h +++ b/dbms/src/Processors/Formats/Impl/CSVRowInputFormat.h @@ -36,8 +36,26 @@ private: const FormatSettings format_settings; - /// For convenient diagnostics in case of an error. + using IndexesMap = std::unordered_map; + IndexesMap column_indexes_by_names; + /// Maps indexes of columns in the input file to indexes of table columns + using OptionalIndexes = std::vector>; + OptionalIndexes column_indexes_for_input_fields; + + /// Tracks which colums we have read in a single read() call. + /// For columns that are never read, it is initialized to false when we + /// read the file header, and never changed afterwards. + /// For other columns, it is updated on each read() call. + std::vector read_columns; + + /// Whether we have any columns that are not read from file at all, + /// and must be always initialized with defaults. + bool have_always_default_columns = false; + + void addInputColumn(const String & column_name); + + /// For convenient diagnostics in case of an error. size_t row_num = 0; /// How many bytes were read, not counting those that are still in the buffer.