diff --git a/dbms/src/Core/Settings.h b/dbms/src/Core/Settings.h index 2f571d82f22..19278e76515 100644 --- a/dbms/src/Core/Settings.h +++ b/dbms/src/Core/Settings.h @@ -153,7 +153,8 @@ struct Settings \ M(SettingBool, add_http_cors_header, false, "Write add http CORS header.") \ \ - M(SettingBool, input_format_skip_unknown_fields, false, "Skip columns with unknown names from input data (it works for JSONEachRow and TSKV formats).") \ + M(SettingBool, input_format_skip_unknown_fields, false, "Skip columns with unknown names from input data (it works for JSONEachRow, CSVWithNames, TSVWithNames and TSKV formats).") \ + M(SettingBool, input_format_with_names_use_header, false, "For TSVWithNames and CSVWithNames input formats this controls whether format parser is to assume that column data appear in the input exactly as they are specified in the header.") \ M(SettingBool, input_format_import_nested_json, false, "Map nested JSON data to nested tables (it works for JSONEachRow format).") \ M(SettingBool, input_format_defaults_for_omitted_fields, false, "For input data calculate default expressions for omitted fields (it works for JSONEachRow format).") \ \ diff --git a/dbms/src/Formats/CSVRowInputStream.cpp b/dbms/src/Formats/CSVRowInputStream.cpp index 1c9f32095a0..22e0c28206c 100644 --- a/dbms/src/Formats/CSVRowInputStream.cpp +++ b/dbms/src/Formats/CSVRowInputStream.cpp @@ -19,17 +19,7 @@ namespace ErrorCodes } -CSVRowInputStream::CSVRowInputStream(ReadBuffer & istr_, const Block & header_, bool with_names_, const FormatSettings & format_settings) - : istr(istr_), header(header_), with_names(with_names_), format_settings(format_settings) -{ - size_t num_columns = header.columns(); - data_types.resize(num_columns); - for (size_t i = 0; i < num_columns; ++i) - data_types[i] = header.safeGetByPosition(i).type; -} - - -static void skipEndOfLine(ReadBuffer & istr) +static inline void skipEndOfLine(ReadBuffer & istr) { /// \n (Unix) or \r\n (DOS/Windows) or \n\r (Mac OS 
Classic) @@ -53,7 +43,7 @@ static void skipEndOfLine(ReadBuffer & istr) } -static void skipDelimiter(ReadBuffer & istr, const char delimiter, bool is_last_column) +static inline void skipDelimiter(ReadBuffer & istr, const char delimiter, bool is_last_column) { if (is_last_column) { @@ -99,38 +89,149 @@ static void skipRow(ReadBuffer & istr, const FormatSettings::CSV & settings, siz } +CSVRowInputStream::CSVRowInputStream(ReadBuffer & istr_, const Block & header_, bool with_names_, const FormatSettings & format_settings) + : istr(istr_), header(header_), with_names(with_names_), format_settings(format_settings) +{ + const auto num_columns = header.columns(); + + data_types.resize(num_columns); + column_indexes_by_names.reserve(num_columns); + + for (size_t i = 0; i < num_columns; ++i) + { + const auto & column_info = header.getByPosition(i); + + data_types[i] = column_info.type; + column_indexes_by_names.emplace(column_info.name, i); + } + + input_fields_with_indexes.reserve(num_columns); + read_columns.assign(num_columns, false); +} + + +void CSVRowInputStream::setupAllColumnsByTableSchema() +{ + read_columns.assign(header.columns(), true); + input_fields_with_indexes.resize(header.columns()); + + for (size_t i = 0; i < input_fields_with_indexes.size(); ++i) + { + input_fields_with_indexes[i] = i; + } +} + + +void CSVRowInputStream::addInputColumn(const String & column_name) { + const auto column_it = column_indexes_by_names.find(column_name); + if (column_it == column_indexes_by_names.end()) + { + if (format_settings.skip_unknown_fields) + { + input_fields_with_indexes.push_back(std::nullopt); + return; + } + + throw Exception( + "Unknown field found in CSV header: '" + column_name + "' " + + "at position " + std::to_string(input_fields_with_indexes.size()) + + "\nSet the 'input_format_skip_unknown_fields' parameter explicitly to ignore and proceed", + ErrorCodes::INCORRECT_DATA + ); + } + + const auto column_index = column_it->second; + + if 
(read_columns[column_index]) + throw Exception("Duplicate field found while parsing CSV header: " + column_name, ErrorCodes::INCORRECT_DATA); + + read_columns[column_index] = true; + input_fields_with_indexes.emplace_back(column_index); +} + + +void CSVRowInputStream::fillUnreadColumnsWithDefaults(MutableColumns & columns, RowReadExtension & row_read_extension) +{ + /// It is safe to memorize this on the first run - the format guarantees this does not change + if (unlikely(row_num == 1)) + { + columns_to_fill_with_default_values.clear(); + for (size_t index = 0; index < read_columns.size(); ++index) + if (read_columns[index] == 0) + columns_to_fill_with_default_values.push_back(index); + } + + for (const auto column_index : columns_to_fill_with_default_values) + data_types[column_index]->insertDefaultInto(*columns[column_index]); + + row_read_extension.read_columns = read_columns; +} + + void CSVRowInputStream::readPrefix() { /// In this format, we assume, that if first string field contain BOM as value, it will be written in quotes, /// so BOM at beginning of stream cannot be confused with BOM in first string value, and it is safe to skip it. 
skipBOMIfExists(istr); - size_t num_columns = data_types.size(); - String tmp; - if (with_names) - skipRow(istr, format_settings.csv, num_columns); + { + if (format_settings.with_names_use_header) + { + do + { + String column_name; + + skipWhitespacesAndTabs(istr); + readCSVString(column_name, istr, format_settings.csv); + skipWhitespacesAndTabs(istr); + + addInputColumn(column_name); + } + while(checkChar(format_settings.csv.delimiter, istr)); + + skipDelimiter(istr, format_settings.csv.delimiter, true); + } + else + { + setupAllColumnsByTableSchema(); + skipRow(istr, format_settings.csv, input_fields_with_indexes.size()); + } + } + else + { + setupAllColumnsByTableSchema(); + } } -bool CSVRowInputStream::read(MutableColumns & columns, RowReadExtension &) +bool CSVRowInputStream::read(MutableColumns & columns, RowReadExtension & ext) { if (istr.eof()) return false; updateDiagnosticInfo(); - size_t size = data_types.size(); - - for (size_t i = 0; i < size; ++i) + String tmp; + for (size_t input_position = 0; input_position < input_fields_with_indexes.size(); ++input_position) { - skipWhitespacesAndTabs(istr); - data_types[i]->deserializeAsTextCSV(*columns[i], istr, format_settings); - skipWhitespacesAndTabs(istr); + const auto & column_index = input_fields_with_indexes[input_position]; + if (column_index) + { + skipWhitespacesAndTabs(istr); + data_types[*column_index]->deserializeAsTextCSV(*columns[*column_index], istr, format_settings); + skipWhitespacesAndTabs(istr); + } + else + { + readCSVString(tmp, istr, format_settings.csv); + } - skipDelimiter(istr, format_settings.csv.delimiter, i + 1 == size); + skipDelimiter(istr, format_settings.csv.delimiter, input_position + 1 == input_fields_with_indexes.size()); } + fillUnreadColumnsWithDefaults(columns, ext); + return true; } @@ -202,86 +303,101 @@ bool OPTIMIZE(1) CSVRowInputStream::parseRowAndPrintDiagnosticInfo(MutableColumn { const char delimiter = format_settings.csv.delimiter; - size_t size = 
data_types.size(); - for (size_t i = 0; i < size; ++i) + for (size_t input_position = 0; input_position < input_fields_with_indexes.size(); ++input_position) { - if (i == 0 && istr.eof()) + if (input_position == 0 && istr.eof()) { out << "\n"; return false; } - out << "Column " << i << ", " << std::string((i < 10 ? 2 : i < 100 ? 1 : 0), ' ') - << "name: " << header.safeGetByPosition(i).name << ", " << std::string(max_length_of_column_name - header.safeGetByPosition(i).name.size(), ' ') - << "type: " << data_types[i]->getName() << ", " << std::string(max_length_of_data_type_name - data_types[i]->getName().size(), ' '); - - BufferBase::Position prev_position = istr.position(); - BufferBase::Position curr_position = istr.position(); - std::exception_ptr exception; - - try + if (input_fields_with_indexes[input_position].has_value()) { - skipWhitespacesAndTabs(istr); - prev_position = istr.position(); - data_types[i]->deserializeAsTextCSV(*columns[i], istr, format_settings); - curr_position = istr.position(); - skipWhitespacesAndTabs(istr); - } - catch (...) - { - exception = std::current_exception(); - } + const auto & column_index = *input_fields_with_indexes[input_position]; + const auto & current_column_type = data_types[column_index]; - if (curr_position < prev_position) - throw Exception("Logical error: parsing is non-deterministic.", ErrorCodes::LOGICAL_ERROR); + out << "Column " << input_position << ", " << std::string((input_position < 10 ? 2 : input_position < 100 ? 1 : 0), ' ') + << "name: " << header.safeGetByPosition(column_index).name << ", " << std::string(max_length_of_column_name - header.safeGetByPosition(column_index).name.size(), ' ') + << "type: " << current_column_type->getName() << ", " << std::string(max_length_of_data_type_name - current_column_type->getName().size(), ' '); - if (isNumber(data_types[i]) || isDateOrDateTime(data_types[i])) - { - /// An empty string instead of a value. 
- if (curr_position == prev_position) + BufferBase::Position prev_position = istr.position(); + BufferBase::Position curr_position = istr.position(); + std::exception_ptr exception; + + try { - out << "ERROR: text "; - verbosePrintString(prev_position, std::min(prev_position + 10, istr.buffer().end()), out); - out << " is not like " << data_types[i]->getName() << "\n"; - return false; + skipWhitespacesAndTabs(istr); + prev_position = istr.position(); + current_column_type->deserializeAsTextCSV(*columns[column_index], istr, format_settings); + curr_position = istr.position(); + skipWhitespacesAndTabs(istr); } - } - - out << "parsed text: "; - verbosePrintString(prev_position, curr_position, out); - - if (exception) - { - if (data_types[i]->getName() == "DateTime") - out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n"; - else if (data_types[i]->getName() == "Date") - out << "ERROR: Date must be in YYYY-MM-DD format.\n"; - else - out << "ERROR\n"; - return false; - } - - out << "\n"; - - if (data_types[i]->haveMaximumSizeOfValue()) - { - if (*curr_position != '\n' && *curr_position != '\r' && *curr_position != delimiter) + catch (...) { - out << "ERROR: garbage after " << data_types[i]->getName() << ": "; - verbosePrintString(curr_position, std::min(curr_position + 10, istr.buffer().end()), out); - out << "\n"; + exception = std::current_exception(); + } - if (data_types[i]->getName() == "DateTime") + if (curr_position < prev_position) + throw Exception("Logical error: parsing is non-deterministic.", ErrorCodes::LOGICAL_ERROR); + + if (isNumber(current_column_type) || isDateOrDateTime(current_column_type)) + { + /// An empty string instead of a value. 
+ if (curr_position == prev_position) + { + out << "ERROR: text "; + verbosePrintString(prev_position, std::min(prev_position + 10, istr.buffer().end()), out); + out << " is not like " << current_column_type->getName() << "\n"; + return false; + } + } + + out << "parsed text: "; + verbosePrintString(prev_position, curr_position, out); + + if (exception) + { + if (current_column_type->getName() == "DateTime") out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n"; - else if (data_types[i]->getName() == "Date") + else if (current_column_type->getName() == "Date") out << "ERROR: Date must be in YYYY-MM-DD format.\n"; - + else + out << "ERROR\n"; return false; } + + out << "\n"; + + if (current_column_type->haveMaximumSizeOfValue()) + { + if (*curr_position != '\n' && *curr_position != '\r' && *curr_position != delimiter) + { + out << "ERROR: garbage after " << current_column_type->getName() << ": "; + verbosePrintString(curr_position, std::min(curr_position + 10, istr.buffer().end()), out); + out << "\n"; + + if (current_column_type->getName() == "DateTime") + out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n"; + else if (current_column_type->getName() == "Date") + out << "ERROR: Date must be in YYYY-MM-DD format.\n"; + + return false; + } + } + } + else + { + static const String skipped_column_str = ""; + out << "Column " << input_position << ", " << std::string((input_position < 10 ? 2 : input_position < 100 ? 
1 : 0), ' ') + << "name: " << skipped_column_str << ", " << std::string(max_length_of_column_name - skipped_column_str.length(), ' ') + << "type: " << skipped_column_str << ", " << std::string(max_length_of_data_type_name - skipped_column_str.length(), ' '); + + String tmp; + readCSVString(tmp, istr, format_settings.csv); } /// Delimiters - if (i + 1 == size) + if (input_position + 1 == input_fields_with_indexes.size()) { if (istr.eof()) return false; diff --git a/dbms/src/Formats/CSVRowInputStream.h b/dbms/src/Formats/CSVRowInputStream.h index c04bda57008..a4748e7954a 100644 --- a/dbms/src/Formats/CSVRowInputStream.h +++ b/dbms/src/Formats/CSVRowInputStream.h @@ -21,7 +21,7 @@ public: */ CSVRowInputStream(ReadBuffer & istr_, const Block & header_, bool with_names_, const FormatSettings & format_settings); - bool read(MutableColumns & columns, RowReadExtension &) override; + bool read(MutableColumns & columns, RowReadExtension & ext) override; void readPrefix() override; bool allowSyncAfterError() const override { return true; } void syncAfterError() override; @@ -36,6 +36,19 @@ private: const FormatSettings format_settings; + using IndexesMap = std::unordered_map<String, size_t>; + IndexesMap column_indexes_by_names; + + using OptionalIndexes = std::vector<std::optional<size_t>>; + OptionalIndexes input_fields_with_indexes; + + std::vector<UInt8> read_columns; + std::vector<size_t> columns_to_fill_with_default_values; + + void addInputColumn(const String & column_name); + void setupAllColumnsByTableSchema(); + void fillUnreadColumnsWithDefaults(MutableColumns & columns, RowReadExtension& ext); + /// For convenient diagnostics in case of an error. 
size_t row_num = 0; diff --git a/dbms/src/Formats/FormatFactory.cpp b/dbms/src/Formats/FormatFactory.cpp index 8086236db53..faf3fd7157c 100644 --- a/dbms/src/Formats/FormatFactory.cpp +++ b/dbms/src/Formats/FormatFactory.cpp @@ -40,6 +40,7 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes; format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes; format_settings.values.interpret_expressions = settings.input_format_values_interpret_expressions; + format_settings.with_names_use_header = settings.input_format_with_names_use_header; format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields; format_settings.import_nested_json = settings.input_format_import_nested_json; format_settings.date_time_input_format = settings.date_time_input_format; diff --git a/dbms/src/Formats/FormatSettings.h b/dbms/src/Formats/FormatSettings.h index a2d787193f5..afddd047123 100644 --- a/dbms/src/Formats/FormatSettings.h +++ b/dbms/src/Formats/FormatSettings.h @@ -48,6 +48,7 @@ struct FormatSettings Values values; bool skip_unknown_fields = false; + bool with_names_use_header = false; bool write_statistics = true; bool import_nested_json = false; diff --git a/dbms/src/Formats/TabSeparatedRowInputStream.cpp b/dbms/src/Formats/TabSeparatedRowInputStream.cpp index f63461df3c4..70644b8c0bf 100644 --- a/dbms/src/Formats/TabSeparatedRowInputStream.cpp +++ b/dbms/src/Formats/TabSeparatedRowInputStream.cpp @@ -1,3 +1,5 @@ +#include + #include #include @@ -20,46 +22,14 @@ namespace ErrorCodes } -TabSeparatedRowInputStream::TabSeparatedRowInputStream( - ReadBuffer & istr_, const Block & header_, bool with_names_, bool with_types_, const FormatSettings & format_settings) - : istr(istr_), header(header_), with_names(with_names_), with_types(with_types_), format_settings(format_settings) +static void skipTSVRow(ReadBuffer & istr, const size_t 
num_columns) { - size_t num_columns = header.columns(); - data_types.resize(num_columns); + NullSink null_sink; + for (size_t i = 0; i < num_columns; ++i) - data_types[i] = header.safeGetByPosition(i).type; -} - - -void TabSeparatedRowInputStream::readPrefix() -{ - size_t num_columns = header.columns(); - String tmp; - - if (with_names || with_types) { - /// In this format, we assume that column name or type cannot contain BOM, - /// so, if format has header, - /// then BOM at beginning of stream cannot be confused with name or type of field, and it is safe to skip it. - skipBOMIfExists(istr); - } - - if (with_names) - { - for (size_t i = 0; i < num_columns; ++i) - { - readEscapedString(tmp, istr); - assertChar(i == num_columns - 1 ? '\n' : '\t', istr); - } - } - - if (with_types) - { - for (size_t i = 0; i < num_columns; ++i) - { - readEscapedString(tmp, istr); - assertChar(i == num_columns - 1 ? '\n' : '\t', istr); - } + readEscapedStringInto(null_sink, istr); + assertChar(i == num_columns - 1 ? 
'\n' : '\t', istr); } } @@ -77,34 +47,164 @@ static void checkForCarriageReturn(ReadBuffer & istr) } -bool TabSeparatedRowInputStream::read(MutableColumns & columns, RowReadExtension &) +TabSeparatedRowInputStream::TabSeparatedRowInputStream( + ReadBuffer & istr_, const Block & header_, bool with_names_, bool with_types_, const FormatSettings & format_settings) + : istr(istr_), header(header_), with_names(with_names_), with_types(with_types_), format_settings(format_settings) +{ + const auto num_columns = header.columns(); + + data_types.resize(num_columns); + column_indexes_by_names.reserve(num_columns); + + for (size_t i = 0; i < num_columns; ++i) + { + const auto & column_info = header.getByPosition(i); + + data_types[i] = column_info.type; + column_indexes_by_names.emplace(column_info.name, i); + } + + input_fields_with_indexes.reserve(num_columns); + read_columns.assign(num_columns, false); +} + + +void TabSeparatedRowInputStream::setupAllColumnsByTableSchema() +{ + read_columns.assign(header.columns(), true); + input_fields_with_indexes.resize(header.columns()); + + for (size_t i = 0; i < input_fields_with_indexes.size(); ++i) + input_fields_with_indexes[i] = i; +} + + +void TabSeparatedRowInputStream::addInputColumn(const String & column_name) { + const auto column_it = column_indexes_by_names.find(column_name); + if (column_it == column_indexes_by_names.end()) + { + if (format_settings.skip_unknown_fields) + { + input_fields_with_indexes.push_back(std::nullopt); + return; + } + + throw Exception( + "Unknown field found in TSV header: '" + column_name + "' " + + "at position " + std::to_string(input_fields_with_indexes.size()) + + "\nSet the 'input_format_skip_unknown_fields' parameter explicitly to ignore and proceed", + ErrorCodes::INCORRECT_DATA + ); + } + + const auto column_index = column_it->second; + + if (read_columns[column_index]) + throw Exception("Duplicate field found while parsing TSV header: " + column_name, ErrorCodes::INCORRECT_DATA); + + 
read_columns[column_index] = true; + input_fields_with_indexes.emplace_back(column_index); +} + + +void TabSeparatedRowInputStream::fillUnreadColumnsWithDefaults(MutableColumns & columns, RowReadExtension & row_read_extension) +{ + /// It is safe to memorize this on the first run - the format guarantees this does not change + if (unlikely(row_num == 1)) + { + columns_to_fill_with_default_values.clear(); + for (size_t index = 0; index < read_columns.size(); ++index) + if (read_columns[index] == 0) + columns_to_fill_with_default_values.push_back(index); + } + + for (const auto column_index : columns_to_fill_with_default_values) + data_types[column_index]->insertDefaultInto(*columns[column_index]); + + row_read_extension.read_columns = read_columns; +} + + +void TabSeparatedRowInputStream::readPrefix() +{ + if (with_names || with_types) + { + /// In this format, we assume that column name or type cannot contain BOM, + /// so, if format has header, + /// then BOM at beginning of stream cannot be confused with name or type of field, and it is safe to skip it. 
+ skipBOMIfExists(istr); + } + + if (with_names) + { + if (format_settings.with_names_use_header) + { + do + { + String column_name; + readEscapedString(column_name, istr); + addInputColumn(column_name); + } + while(checkChar('\t', istr)); + + if (!istr.eof()) + { + checkForCarriageReturn(istr); + assertChar('\n', istr); + } + } + else + { + setupAllColumnsByTableSchema(); + skipTSVRow(istr, input_fields_with_indexes.size()); + } + } + else + setupAllColumnsByTableSchema(); + + if (with_types) + { + skipTSVRow(istr, input_fields_with_indexes.size()); + } +} + + +bool TabSeparatedRowInputStream::read(MutableColumns & columns, RowReadExtension & ext) { if (istr.eof()) return false; updateDiagnosticInfo(); - size_t size = data_types.size(); - - for (size_t i = 0; i < size; ++i) + for (size_t input_position = 0; input_position < input_fields_with_indexes.size(); ++input_position) { - data_types[i]->deserializeAsTextEscaped(*columns[i], istr, format_settings); - - /// skip separators - if (i + 1 == size) + const auto & column_index = input_fields_with_indexes[input_position]; + if (column_index) { - if (!istr.eof()) - { - if (unlikely(row_num == 1)) - checkForCarriageReturn(istr); - - assertChar('\n', istr); - } + data_types[*column_index]->deserializeAsTextEscaped(*columns[*column_index], istr, format_settings); } else + { + NullSink null_sink; + readEscapedStringInto(null_sink, istr); + } + + /// skip separators + if (input_position + 1 < input_fields_with_indexes.size()) + { assertChar('\t', istr); + } + else if (!istr.eof()) + { + if (unlikely(row_num == 1)) + checkForCarriageReturn(istr); + + assertChar('\n', istr); + } } + fillUnreadColumnsWithDefaults(columns, ext); + return true; } @@ -135,7 +235,7 @@ String TabSeparatedRowInputStream::getDiagnosticInfo() if (header.safeGetByPosition(i).type->getName().size() > max_length_of_data_type_name) max_length_of_data_type_name = header.safeGetByPosition(i).type->getName().size(); - /// Roll back the cursor to the 
beginning of the previous or current line and pars all over again. But now we derive detailed information. + /// Roll back the cursor to the beginning of the previous or current line and parse all over again. But now we derive detailed information. if (pos_of_prev_row) { @@ -173,83 +273,98 @@ String TabSeparatedRowInputStream::getDiagnosticInfo() bool OPTIMIZE(1) TabSeparatedRowInputStream::parseRowAndPrintDiagnosticInfo( MutableColumns & columns, WriteBuffer & out, size_t max_length_of_column_name, size_t max_length_of_data_type_name) { - size_t size = data_types.size(); - for (size_t i = 0; i < size; ++i) + for (size_t input_position = 0; input_position < input_fields_with_indexes.size(); ++input_position) { - if (i == 0 && istr.eof()) + if (input_position == 0 && istr.eof()) { out << "\n"; return false; } - out << "Column " << i << ", " << std::string((i < 10 ? 2 : i < 100 ? 1 : 0), ' ') - << "name: " << header.safeGetByPosition(i).name << ", " << std::string(max_length_of_column_name - header.safeGetByPosition(i).name.size(), ' ') - << "type: " << data_types[i]->getName() << ", " << std::string(max_length_of_data_type_name - data_types[i]->getName().size(), ' '); - - auto prev_position = istr.position(); - std::exception_ptr exception; - - try + if (input_fields_with_indexes[input_position].has_value()) { - data_types[i]->deserializeAsTextEscaped(*columns[i], istr, format_settings); - } - catch (...) - { - exception = std::current_exception(); - } + const auto & column_index = *input_fields_with_indexes[input_position]; + const auto & current_column_type = data_types[column_index]; - auto curr_position = istr.position(); + out << "Column " << input_position << ", " << std::string((input_position < 10 ? 2 : input_position < 100 ? 
1 : 0), ' ') + << "name: " << header.safeGetByPosition(column_index).name << ", " << std::string(max_length_of_column_name - header.safeGetByPosition(column_index).name.size(), ' ') + << "type: " << current_column_type->getName() << ", " << std::string(max_length_of_data_type_name - current_column_type->getName().size(), ' '); - if (curr_position < prev_position) - throw Exception("Logical error: parsing is non-deterministic.", ErrorCodes::LOGICAL_ERROR); + auto prev_position = istr.position(); + std::exception_ptr exception; - if (isNumber(data_types[i]) || isDateOrDateTime(data_types[i])) - { - /// An empty string instead of a value. - if (curr_position == prev_position) + try { - out << "ERROR: text "; - verbosePrintString(prev_position, std::min(prev_position + 10, istr.buffer().end()), out); - out << " is not like " << data_types[i]->getName() << "\n"; - return false; + current_column_type->deserializeAsTextEscaped(*columns[column_index], istr, format_settings); } - } - - out << "parsed text: "; - verbosePrintString(prev_position, curr_position, out); - - if (exception) - { - if (data_types[i]->getName() == "DateTime") - out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n"; - else if (data_types[i]->getName() == "Date") - out << "ERROR: Date must be in YYYY-MM-DD format.\n"; - else - out << "ERROR\n"; - return false; - } - - out << "\n"; - - if (data_types[i]->haveMaximumSizeOfValue()) - { - if (*curr_position != '\n' && *curr_position != '\t') + catch (...) 
{ - out << "ERROR: garbage after " << data_types[i]->getName() << ": "; - verbosePrintString(curr_position, std::min(curr_position + 10, istr.buffer().end()), out); - out << "\n"; + exception = std::current_exception(); + } - if (data_types[i]->getName() == "DateTime") + auto curr_position = istr.position(); + + if (curr_position < prev_position) + throw Exception("Logical error: parsing is non-deterministic.", ErrorCodes::LOGICAL_ERROR); + + if (isNumber(current_column_type) || isDateOrDateTime(current_column_type)) + { + /// An empty string instead of a value. + if (curr_position == prev_position) + { + out << "ERROR: text "; + verbosePrintString(prev_position, std::min(prev_position + 10, istr.buffer().end()), out); + out << " is not like " << current_column_type->getName() << "\n"; + return false; + } + } + + out << "parsed text: "; + verbosePrintString(prev_position, curr_position, out); + + if (exception) + { + if (current_column_type->getName() == "DateTime") out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n"; - else if (data_types[i]->getName() == "Date") + else if (current_column_type->getName() == "Date") out << "ERROR: Date must be in YYYY-MM-DD format.\n"; - + else + out << "ERROR\n"; return false; } + + out << "\n"; + + if (current_column_type->haveMaximumSizeOfValue()) + { + if (*curr_position != '\n' && *curr_position != '\t') + { + out << "ERROR: garbage after " << current_column_type->getName() << ": "; + verbosePrintString(curr_position, std::min(curr_position + 10, istr.buffer().end()), out); + out << "\n"; + + if (current_column_type->getName() == "DateTime") + out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n"; + else if (current_column_type->getName() == "Date") + out << "ERROR: Date must be in YYYY-MM-DD format.\n"; + + return false; + } + } + } + else + { + static const String skipped_column_str = ""; + out << 
"Column " << input_position << ", " << std::string((input_position < 10 ? 2 : input_position < 100 ? 1 : 0), ' ') + << "name: " << skipped_column_str << ", " << std::string(max_length_of_column_name - skipped_column_str.length(), ' ') + << "type: " << skipped_column_str << ", " << std::string(max_length_of_data_type_name - skipped_column_str.length(), ' '); + + NullSink null_sink; + readEscapedStringInto(null_sink, istr); } /// Delimiters - if (i + 1 == size) + if (input_position + 1 == input_fields_with_indexes.size()) { if (!istr.eof()) { diff --git a/dbms/src/Formats/TabSeparatedRowInputStream.h b/dbms/src/Formats/TabSeparatedRowInputStream.h index 2435d58d703..f8cfba4e331 100644 --- a/dbms/src/Formats/TabSeparatedRowInputStream.h +++ b/dbms/src/Formats/TabSeparatedRowInputStream.h @@ -1,5 +1,8 @@ #pragma once +#include <optional> +#include <unordered_map> + #include <Core/Block.h> #include <Formats/FormatSettings.h> #include <Formats/IRowInputStream.h> @@ -22,7 +25,7 @@ public: TabSeparatedRowInputStream( ReadBuffer & istr_, const Block & header_, bool with_names_, bool with_types_, const FormatSettings & format_settings); - bool read(MutableColumns & columns, RowReadExtension &) override; + bool read(MutableColumns & columns, RowReadExtension & ext) override; void readPrefix() override; bool allowSyncAfterError() const override { return true; } void syncAfterError() override; @@ -37,6 +40,7 @@ private: const FormatSettings format_settings; DataTypes data_types; + using IndexesMap = std::unordered_map<String, size_t>; + IndexesMap column_indexes_by_names; + + using OptionalIndexes = std::vector<std::optional<size_t>>; + OptionalIndexes input_fields_with_indexes; + + std::vector<UInt8> read_columns; + std::vector<size_t> columns_to_fill_with_default_values; + + void addInputColumn(const String & column_name); + void setupAllColumnsByTableSchema(); + void fillUnreadColumnsWithDefaults(MutableColumns & columns, RowReadExtension& ext); + /// For convenient diagnostics in case of an error. 
size_t row_num = 0; diff --git a/dbms/tests/queries/0_stateless/00937_test_use_header_csv.reference b/dbms/tests/queries/0_stateless/00937_test_use_header_csv.reference new file mode 100644 index 00000000000..8a8b605ad41 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00937_test_use_header_csv.reference @@ -0,0 +1,21 @@ +2019-04-18 42 Line1 +2019-04-18 42 Line2 +2019-04-18 42 Line3 +2019-04-18 42 Line4 +2019-04-18 42 Line5 +2019-04-18 42 Line6 +2019-04-18 42 Line7 +2019-04-18 42 Line8 +2019-04-18 42 Line9 +2019-04-18 1 Line10 +2019-04-18 2 Line11 +2019-04-18 1 Line12 +2019-04-18 2 Line13 +2019-04-18 1 +2019-04-18 2 +0000-00-00 1 Line16 +0000-00-00 2 Line17 +2019-04-18 0 Line18 +2019-04-18 0 Line19 +0000-00-00 0 +0000-00-00 0 diff --git a/dbms/tests/queries/0_stateless/00937_test_use_header_csv.sh b/dbms/tests/queries/0_stateless/00937_test_use_header_csv.sh new file mode 100644 index 00000000000..28d8282e291 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00937_test_use_header_csv.sh @@ -0,0 +1,38 @@ +#!/usr/bin/env bash + +#CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +#. 
$CURDIR/../shell_config.sh + +CLICKHOUSE_CLIENT=./clickhouse-client + +$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test.csv" +$CLICKHOUSE_CLIENT --query="CREATE TABLE test.csv (d Date, u UInt8, str String) ENGINE = TinyLog" + +INSERT_QUERY='$CLICKHOUSE_CLIENT --query="INSERT INTO test.csv FORMAT CSVWithNames"' +USE_HEADER='--input_format_with_names_use_header=1' +SKIP_UNKNOWN='--input_format_skip_unknown_fields=1' + +# - Simple check for parsing +echo -ne 'd,u,str\n2019-04-18,42,Line1\n2019-04-18,42,Line2' | eval $INSERT_QUERY +echo -ne 'd,u,str\n2019-04-18,42,Line3\n2019-04-18,42,Line4' | eval $INSERT_QUERY $USE_HEADER +echo -ne 'd,u,str\n2019-04-18,42,Line5\n2019-04-18,42,Line6' | eval $INSERT_QUERY $USE_HEADER $SKIP_UNKNOWN + +# - Check random order of fields +echo -ne 'u,d,str\n42,2019-04-18,Line7\n' | eval $INSERT_QUERY $USE_HEADER +echo -ne 'u,str,d\n42,Line8,2019-04-18\n' | eval $INSERT_QUERY $USE_HEADER +echo -ne 'str,u,d\nLine9,42,2019-04-18\n' | eval $INSERT_QUERY $USE_HEADER + +# - Check excessive fields +echo -ne 'd,u,str,more,unknown,fields\n2019-04-18,1,Line10,,,\n2019-04-18,2,Line11,,,\n' \ +| eval $INSERT_QUERY $USE_HEADER $SKIP_UNKNOWN +echo -ne 'd,unknown,str,more,u,fields\n2019-04-18,blahblah,Line12,,1,\n2019-04-18,,Line13,blahblah,2,\n' \ +| eval $INSERT_QUERY $USE_HEADER $SKIP_UNKNOWN + +# - Check missing fields (defaults) +echo -ne 'd,u\n2019-04-18,1\n2019-04-18,2\n' | eval $INSERT_QUERY $USE_HEADER +echo -ne 'str,u\nLine16,1\nLine17,2\n' | eval $INSERT_QUERY $USE_HEADER +echo -ne 'd,str\n2019-04-18,Line18\n2019-04-18,Line19\n'| eval $INSERT_QUERY $USE_HEADER +echo -ne 'unknown\n\n\n' | eval $INSERT_QUERY $USE_HEADER $SKIP_UNKNOWN + +$CLICKHOUSE_CLIENT --query="SELECT * FROM test.csv" +$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test.csv" diff --git a/dbms/tests/queries/0_stateless/00937_test_use_header_tsv.reference b/dbms/tests/queries/0_stateless/00937_test_use_header_tsv.reference new file mode 100644 index 
00000000000..8a8b605ad41 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00937_test_use_header_tsv.reference @@ -0,0 +1,21 @@ +2019-04-18 42 Line1 +2019-04-18 42 Line2 +2019-04-18 42 Line3 +2019-04-18 42 Line4 +2019-04-18 42 Line5 +2019-04-18 42 Line6 +2019-04-18 42 Line7 +2019-04-18 42 Line8 +2019-04-18 42 Line9 +2019-04-18 1 Line10 +2019-04-18 2 Line11 +2019-04-18 1 Line12 +2019-04-18 2 Line13 +2019-04-18 1 +2019-04-18 2 +0000-00-00 1 Line16 +0000-00-00 2 Line17 +2019-04-18 0 Line18 +2019-04-18 0 Line19 +0000-00-00 0 +0000-00-00 0 diff --git a/dbms/tests/queries/0_stateless/00937_test_use_header_tsv.sh b/dbms/tests/queries/0_stateless/00937_test_use_header_tsv.sh new file mode 100644 index 00000000000..5d7b6f41c70 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00937_test_use_header_tsv.sh @@ -0,0 +1,38 @@ +#!/usr/bin/env bash + +#CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +#. $CURDIR/../shell_config.sh + +CLICKHOUSE_CLIENT=./clickhouse-client + +$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test.tsv" +$CLICKHOUSE_CLIENT --query="CREATE TABLE test.tsv (d Date, u UInt8, str String) ENGINE = TinyLog" + +INSERT_QUERY='$CLICKHOUSE_CLIENT --query="INSERT INTO test.tsv FORMAT TSVWithNames"' +USE_HEADER='--input_format_with_names_use_header=1' +SKIP_UNKNOWN='--input_format_skip_unknown_fields=1' + +# - Simple check for parsing +echo -ne 'd\tu\tstr\n2019-04-18\t42\tLine1\n2019-04-18\t42\tLine2' | eval $INSERT_QUERY +echo -ne 'd\tu\tstr\n2019-04-18\t42\tLine3\n2019-04-18\t42\tLine4' | eval $INSERT_QUERY $USE_HEADER +echo -ne 'd\tu\tstr\n2019-04-18\t42\tLine5\n2019-04-18\t42\tLine6' | eval $INSERT_QUERY $USE_HEADER $SKIP_UNKNOWN + +# - Check random order of fields +echo -ne 'u\td\tstr\n42\t2019-04-18\tLine7\n' | eval $INSERT_QUERY $USE_HEADER +echo -ne 'u\tstr\td\n42\tLine8\t2019-04-18\n' | eval $INSERT_QUERY $USE_HEADER +echo -ne 'str\tu\td\nLine9\t42\t2019-04-18\n' | eval $INSERT_QUERY $USE_HEADER + +# - Check excessive fields +echo -ne 
'd\tu\tstr\tmore\tunknown\tfields\n2019-04-18\t1\tLine10\t\t\t\n2019-04-18\t2\tLine11\t\t\t\n' \ +| eval $INSERT_QUERY $USE_HEADER $SKIP_UNKNOWN +echo -ne 'd\tunknown\tstr\tmore\tu\tfields\n2019-04-18\tblahblah\tLine12\t\t1\t\n2019-04-18\t\tLine13\tblahblah\t2\t\n' \ +| eval $INSERT_QUERY $USE_HEADER $SKIP_UNKNOWN + +# - Check missing fields (defaults) +echo -ne 'd\tu\n2019-04-18\t1\n2019-04-18\t2\n' | eval $INSERT_QUERY $USE_HEADER +echo -ne 'str\tu\nLine16\t1\nLine17\t2\n' | eval $INSERT_QUERY $USE_HEADER +echo -ne 'd\tstr\n2019-04-18\tLine18\n2019-04-18\tLine19\n'| eval $INSERT_QUERY $USE_HEADER +echo -ne 'unknown\n\n\n' | eval $INSERT_QUERY $USE_HEADER $SKIP_UNKNOWN + +$CLICKHOUSE_CLIENT --query="SELECT * FROM test.tsv" +$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test.tsv"