mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 01:51:59 +00:00
Update CSVRowInputFormat.
This commit is contained in:
parent
95dd6222fe
commit
d4da486b51
@ -18,16 +18,55 @@ namespace ErrorCodes
|
||||
|
||||
CSVRowInputFormat::CSVRowInputFormat(
|
||||
ReadBuffer & in_, Block header, Params params, bool with_names_, const FormatSettings & format_settings)
|
||||
: IRowInputFormat(std::move(header), in_, params), with_names(with_names_), format_settings(format_settings)
|
||||
: IRowInputFormat(std::move(header), in_, std::move(params))
|
||||
, with_names(with_names_)
|
||||
, format_settings(format_settings)
|
||||
{
|
||||
auto & sample = getPort().getHeader();
|
||||
size_t num_columns = sample.columns();
|
||||
|
||||
data_types.resize(num_columns);
|
||||
column_indexes_by_names.reserve(num_columns);
|
||||
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
data_types[i] = sample.safeGetByPosition(i).type;
|
||||
{
|
||||
const auto & column_info = sample.getByPosition(i);
|
||||
|
||||
data_types[i] = column_info.type;
|
||||
column_indexes_by_names.emplace(column_info.name, i);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Map an input file column to a table column, based on its name.
|
||||
void CSVRowInputFormat::addInputColumn(const String & column_name)
|
||||
{
|
||||
const auto column_it = column_indexes_by_names.find(column_name);
|
||||
if (column_it == column_indexes_by_names.end())
|
||||
{
|
||||
if (format_settings.skip_unknown_fields)
|
||||
{
|
||||
column_indexes_for_input_fields.push_back(std::nullopt);
|
||||
return;
|
||||
}
|
||||
|
||||
throw Exception(
|
||||
"Unknown field found in CSV header: '" + column_name + "' " +
|
||||
"at position " + std::to_string(column_indexes_for_input_fields.size()) +
|
||||
"\nSet the 'input_format_skip_unknown_fields' parameter explicitly to ignore and proceed",
|
||||
ErrorCodes::INCORRECT_DATA
|
||||
);
|
||||
}
|
||||
|
||||
const auto column_index = column_it->second;
|
||||
|
||||
if (read_columns[column_index])
|
||||
throw Exception("Duplicate field found while parsing CSV header: " + column_name, ErrorCodes::INCORRECT_DATA);
|
||||
|
||||
read_columns[column_index] = true;
|
||||
column_indexes_for_input_fields.emplace_back(column_index);
|
||||
}
|
||||
|
||||
static void skipEndOfLine(ReadBuffer & istr)
|
||||
{
|
||||
/// \n (Unix) or \r\n (DOS/Windows) or \n\r (Mac OS Classic)
|
||||
@ -108,26 +147,119 @@ void CSVRowInputFormat::readPrefix()
|
||||
String tmp;
|
||||
|
||||
if (with_names)
|
||||
skipRow(in, format_settings.csv, num_columns);
|
||||
{
|
||||
/// This CSV file has a header row with column names. Depending on the
|
||||
/// settings, use it or skip it.
|
||||
if (format_settings.with_names_use_header)
|
||||
{
|
||||
/// Look at the file header to see which columns we have there.
|
||||
/// The missing columns are filled with defaults.
|
||||
read_columns.assign(getPort().getHeader().columns(), false);
|
||||
do
|
||||
{
|
||||
String column_name;
|
||||
skipWhitespacesAndTabs(in);
|
||||
readCSVString(column_name, in, format_settings.csv);
|
||||
skipWhitespacesAndTabs(in);
|
||||
|
||||
addInputColumn(column_name);
|
||||
}
|
||||
while (checkChar(format_settings.csv.delimiter, in));
|
||||
|
||||
skipDelimiter(in, format_settings.csv.delimiter, true);
|
||||
|
||||
for (auto read_column : read_columns)
|
||||
{
|
||||
if (read_column)
|
||||
{
|
||||
have_always_default_columns = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
else
|
||||
skipRow(in, format_settings.csv, num_columns);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
bool CSVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
|
||||
bool CSVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
|
||||
{
|
||||
if (in.eof())
|
||||
return false;
|
||||
|
||||
updateDiagnosticInfo();
|
||||
|
||||
size_t size = data_types.size();
|
||||
/// Track whether we have to fill any columns in this row with default
|
||||
/// values. If not, we return an empty column mask to the caller, so that
|
||||
/// it doesn't have to check it.
|
||||
bool have_default_columns = have_always_default_columns;
|
||||
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
const auto delimiter = format_settings.csv.delimiter;
|
||||
for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
|
||||
{
|
||||
skipWhitespacesAndTabs(in);
|
||||
data_types[i]->deserializeAsTextCSV(*columns[i], in, format_settings);
|
||||
skipWhitespacesAndTabs(in);
|
||||
const auto & table_column = column_indexes_for_input_fields[file_column];
|
||||
const bool is_last_file_column =
|
||||
file_column + 1 == column_indexes_for_input_fields.size();
|
||||
|
||||
skipDelimiter(in, format_settings.csv.delimiter, i + 1 == size);
|
||||
if (table_column)
|
||||
{
|
||||
const auto & type = data_types[*table_column];
|
||||
const bool at_delimiter = *in.position() == delimiter;
|
||||
const bool at_last_column_line_end = is_last_file_column
|
||||
&& (*in.position() == '\n' || *in.position() == '\r'
|
||||
|| in.eof());
|
||||
|
||||
if (format_settings.csv.empty_as_default
|
||||
&& (at_delimiter || at_last_column_line_end))
|
||||
{
|
||||
/// Treat empty unquoted column value as default value, if
|
||||
/// specified in the settings. Tuple columns might seem
|
||||
/// problematic, because they are never quoted but still contain
|
||||
/// commas, which might be also used as delimiters. However,
|
||||
/// they do not contain empty unquoted fields, so this check
|
||||
/// works for tuples as well.
|
||||
read_columns[*table_column] = false;
|
||||
have_default_columns = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Read the column normally.
|
||||
read_columns[*table_column] = true;
|
||||
skipWhitespacesAndTabs(in);
|
||||
type->deserializeAsTextCSV(*columns[*table_column], in,
|
||||
format_settings);
|
||||
skipWhitespacesAndTabs(in);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/// We never read this column from the file, just skip it.
|
||||
String tmp;
|
||||
readCSVString(tmp, in, format_settings.csv);
|
||||
}
|
||||
|
||||
skipDelimiter(in, delimiter, is_last_file_column);
|
||||
}
|
||||
|
||||
if (have_default_columns)
|
||||
{
|
||||
for (size_t i = 0; i < read_columns.size(); i++)
|
||||
{
|
||||
if (!read_columns[i])
|
||||
{
|
||||
/// The column value for this row is going to be overwritten
|
||||
/// with default by the caller, but the general assumption is
|
||||
/// that the column size increases for each row, so we have
|
||||
/// to insert something. Since we do not care about the exact
|
||||
/// value, we do not have to use the default value specified by
|
||||
/// the data type, and can just use IColumn::insertDefault().
|
||||
columns[i]->insertDefault();
|
||||
}
|
||||
}
|
||||
ext.read_columns = read_columns;
|
||||
}
|
||||
|
||||
return true;
|
||||
@ -190,93 +322,126 @@ String CSVRowInputFormat::getDiagnosticInfo()
|
||||
return out.str();
|
||||
}
|
||||
|
||||
|
||||
bool CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
|
||||
/** gcc-7 generates wrong code with optimization level greater than 1.
|
||||
* See tests: dbms/src/IO/tests/write_int.cpp
|
||||
* and dbms/tests/queries/0_stateless/00898_parsing_bad_diagnostic_message.sh
|
||||
* This is compiler bug. The bug does not present in gcc-8 and clang-8.
|
||||
* Nevertheless, we don't need high optimization of this function.
|
||||
*/
|
||||
bool OPTIMIZE(1) CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
|
||||
WriteBuffer & out, size_t max_length_of_column_name, size_t max_length_of_data_type_name)
|
||||
{
|
||||
const char delimiter = format_settings.csv.delimiter;
|
||||
auto & header = getPort().getHeader();
|
||||
|
||||
size_t size = data_types.size();
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
|
||||
{
|
||||
if (i == 0 && in.eof())
|
||||
if (file_column == 0 && in.eof())
|
||||
{
|
||||
out << "<End of stream>\n";
|
||||
return false;
|
||||
}
|
||||
|
||||
out << "Column " << i << ", " << std::string((i < 10 ? 2 : i < 100 ? 1 : 0), ' ')
|
||||
<< "name: " << header.safeGetByPosition(i).name << ", " << std::string(max_length_of_column_name - header.safeGetByPosition(i).name.size(), ' ')
|
||||
<< "type: " << data_types[i]->getName() << ", " << std::string(max_length_of_data_type_name - data_types[i]->getName().size(), ' ');
|
||||
|
||||
BufferBase::Position prev_position = in.position();
|
||||
BufferBase::Position curr_position = in.position();
|
||||
std::exception_ptr exception;
|
||||
|
||||
try
|
||||
if (column_indexes_for_input_fields[file_column].has_value())
|
||||
{
|
||||
skipWhitespacesAndTabs(in);
|
||||
prev_position = in.position();
|
||||
data_types[i]->deserializeAsTextCSV(*columns[i], in, format_settings);
|
||||
curr_position = in.position();
|
||||
skipWhitespacesAndTabs(in);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
exception = std::current_exception();
|
||||
}
|
||||
const auto & table_column = *column_indexes_for_input_fields[file_column];
|
||||
const auto & current_column_type = data_types[table_column];
|
||||
const bool is_last_file_column =
|
||||
file_column + 1 == column_indexes_for_input_fields.size();
|
||||
const bool at_delimiter = *in.position() == delimiter;
|
||||
const bool at_last_column_line_end = is_last_file_column
|
||||
&& (*in.position() == '\n' || *in.position() == '\r'
|
||||
|| in.eof());
|
||||
|
||||
if (curr_position < prev_position)
|
||||
throw Exception("Logical error: parsing is non-deterministic.", ErrorCodes::LOGICAL_ERROR);
|
||||
auto & header = getPort().getHeader();
|
||||
out << "Column " << file_column << ", " << std::string((file_column < 10 ? 2 : file_column < 100 ? 1 : 0), ' ')
|
||||
<< "name: " << header.safeGetByPosition(table_column).name << ", " << std::string(max_length_of_column_name - header.safeGetByPosition(table_column).name.size(), ' ')
|
||||
<< "type: " << current_column_type->getName() << ", " << std::string(max_length_of_data_type_name - current_column_type->getName().size(), ' ');
|
||||
|
||||
if (isNumber(data_types[i]) || isDateOrDateTime(data_types[i]))
|
||||
{
|
||||
/// An empty string instead of a value.
|
||||
if (curr_position == prev_position)
|
||||
if (format_settings.csv.empty_as_default
|
||||
&& (at_delimiter || at_last_column_line_end))
|
||||
{
|
||||
out << "ERROR: text ";
|
||||
verbosePrintString(prev_position, std::min(prev_position + 10, in.buffer().end()), out);
|
||||
out << " is not like " << data_types[i]->getName() << "\n";
|
||||
return false;
|
||||
columns[table_column]->insertDefault();
|
||||
}
|
||||
}
|
||||
|
||||
out << "parsed text: ";
|
||||
verbosePrintString(prev_position, curr_position, out);
|
||||
|
||||
if (exception)
|
||||
{
|
||||
if (data_types[i]->getName() == "DateTime")
|
||||
out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n";
|
||||
else if (data_types[i]->getName() == "Date")
|
||||
out << "ERROR: Date must be in YYYY-MM-DD format.\n";
|
||||
else
|
||||
out << "ERROR\n";
|
||||
return false;
|
||||
}
|
||||
|
||||
out << "\n";
|
||||
|
||||
if (data_types[i]->haveMaximumSizeOfValue())
|
||||
{
|
||||
if (*curr_position != '\n' && *curr_position != '\r' && *curr_position != delimiter)
|
||||
{
|
||||
out << "ERROR: garbage after " << data_types[i]->getName() << ": ";
|
||||
verbosePrintString(curr_position, std::min(curr_position + 10, in.buffer().end()), out);
|
||||
BufferBase::Position prev_position = in.position();
|
||||
BufferBase::Position curr_position = in.position();
|
||||
std::exception_ptr exception;
|
||||
|
||||
try
|
||||
{
|
||||
skipWhitespacesAndTabs(in);
|
||||
prev_position = in.position();
|
||||
current_column_type->deserializeAsTextCSV(*columns[table_column], in, format_settings);
|
||||
curr_position = in.position();
|
||||
skipWhitespacesAndTabs(in);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
exception = std::current_exception();
|
||||
}
|
||||
|
||||
if (curr_position < prev_position)
|
||||
throw Exception("Logical error: parsing is non-deterministic.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (isNativeNumber(current_column_type) || isDateOrDateTime(current_column_type))
|
||||
{
|
||||
/// An empty string instead of a value.
|
||||
if (curr_position == prev_position)
|
||||
{
|
||||
out << "ERROR: text ";
|
||||
verbosePrintString(prev_position, std::min(prev_position + 10, in.buffer().end()), out);
|
||||
out << " is not like " << current_column_type->getName() << "\n";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
out << "parsed text: ";
|
||||
verbosePrintString(prev_position, curr_position, out);
|
||||
|
||||
if (exception)
|
||||
{
|
||||
if (current_column_type->getName() == "DateTime")
|
||||
out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n";
|
||||
else if (current_column_type->getName() == "Date")
|
||||
out << "ERROR: Date must be in YYYY-MM-DD format.\n";
|
||||
else
|
||||
out << "ERROR\n";
|
||||
return false;
|
||||
}
|
||||
|
||||
out << "\n";
|
||||
|
||||
if (data_types[i]->getName() == "DateTime")
|
||||
out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n";
|
||||
else if (data_types[i]->getName() == "Date")
|
||||
out << "ERROR: Date must be in YYYY-MM-DD format.\n";
|
||||
if (current_column_type->haveMaximumSizeOfValue()
|
||||
&& *curr_position != '\n' && *curr_position != '\r'
|
||||
&& *curr_position != delimiter)
|
||||
{
|
||||
out << "ERROR: garbage after " << current_column_type->getName() << ": ";
|
||||
verbosePrintString(curr_position, std::min(curr_position + 10, in.buffer().end()), out);
|
||||
out << "\n";
|
||||
|
||||
return false;
|
||||
if (current_column_type->getName() == "DateTime")
|
||||
out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n";
|
||||
else if (current_column_type->getName() == "Date")
|
||||
out << "ERROR: Date must be in YYYY-MM-DD format.\n";
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
static const String skipped_column_str = "<SKIPPED COLUMN>";
|
||||
out << "Column " << file_column << ", " << std::string((file_column < 10 ? 2 : file_column < 100 ? 1 : 0), ' ')
|
||||
<< "name: " << skipped_column_str << ", " << std::string(max_length_of_column_name - skipped_column_str.length(), ' ')
|
||||
<< "type: " << skipped_column_str << ", " << std::string(max_length_of_data_type_name - skipped_column_str.length(), ' ');
|
||||
|
||||
String tmp;
|
||||
readCSVString(tmp, in, format_settings.csv);
|
||||
}
|
||||
|
||||
/// Delimiters
|
||||
if (i + 1 == size)
|
||||
if (file_column + 1 == column_indexes_for_input_fields.size())
|
||||
{
|
||||
if (in.eof())
|
||||
return false;
|
||||
@ -294,8 +459,8 @@ bool CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
|
||||
out << "ERROR: There is no line feed. ";
|
||||
verbosePrintString(in.position(), in.position() + 1, out);
|
||||
out << " found instead.\n"
|
||||
" It's like your file has more columns than expected.\n"
|
||||
"And if your file have right number of columns, maybe it have unquoted string value with comma.\n";
|
||||
" It's like your file has more columns than expected.\n"
|
||||
"And if your file have right number of columns, maybe it have unquoted string value with comma.\n";
|
||||
|
||||
return false;
|
||||
}
|
||||
@ -313,8 +478,8 @@ bool CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
|
||||
if (*in.position() == '\n' || *in.position() == '\r')
|
||||
{
|
||||
out << "ERROR: Line feed found where delimiter (" << delimiter << ") is expected."
|
||||
" It's like your file has less columns than expected.\n"
|
||||
"And if your file have right number of columns, maybe it have unescaped quotes in values.\n";
|
||||
" It's like your file has less columns than expected.\n"
|
||||
"And if your file have right number of columns, maybe it have unescaped quotes in values.\n";
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -359,7 +524,7 @@ void registerInputFormatProcessorCSV(FormatFactory & factory)
|
||||
IRowInputFormat::Params params,
|
||||
const FormatSettings & settings)
|
||||
{
|
||||
return std::make_shared<CSVRowInputFormat>(buf, sample, params, with_names, settings);
|
||||
return std::make_shared<CSVRowInputFormat>(buf, sample, std::move(params), with_names, settings);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -36,8 +36,26 @@ private:
|
||||
|
||||
const FormatSettings format_settings;
|
||||
|
||||
/// For convenient diagnostics in case of an error.
|
||||
using IndexesMap = std::unordered_map<String, size_t>;
|
||||
IndexesMap column_indexes_by_names;
|
||||
|
||||
/// Maps indexes of columns in the input file to indexes of table columns
|
||||
using OptionalIndexes = std::vector<std::optional<size_t>>;
|
||||
OptionalIndexes column_indexes_for_input_fields;
|
||||
|
||||
/// Tracks which colums we have read in a single read() call.
|
||||
/// For columns that are never read, it is initialized to false when we
|
||||
/// read the file header, and never changed afterwards.
|
||||
/// For other columns, it is updated on each read() call.
|
||||
std::vector<UInt8> read_columns;
|
||||
|
||||
/// Whether we have any columns that are not read from file at all,
|
||||
/// and must be always initialized with defaults.
|
||||
bool have_always_default_columns = false;
|
||||
|
||||
void addInputColumn(const String & column_name);
|
||||
|
||||
/// For convenient diagnostics in case of an error.
|
||||
size_t row_num = 0;
|
||||
|
||||
/// How many bytes were read, not counting those that are still in the buffer.
|
||||
|
Loading…
Reference in New Issue
Block a user