/// CSVRowInputFormat.cpp — CSV input format implementation.
/// Part of ClickHouse: dbms/src/Processors/Formats/Impl/CSVRowInputFormat.cpp
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>

#include <Formats/verbosePrintString.h>

#include <Processors/Formats/Impl/CSVRowInputFormat.h>
#include <Formats/FormatFactory.h>

#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeNothing.h>

namespace DB
{
namespace ErrorCodes
{
/// Codes are defined centrally (Common/ErrorCodes.cpp); declare the ones used in this file.
extern const int BAD_ARGUMENTS;     /// thrown by the constructor for unusable delimiters
extern const int INCORRECT_DATA;
extern const int LOGICAL_ERROR;
}
2019-08-30 14:38:24 +00:00
CSVRowInputFormat::CSVRowInputFormat(const Block & header_, ReadBuffer & in_, const Params & params_,
bool with_names_, const FormatSettings & format_settings_)
2019-08-23 19:47:22 +00:00
: RowInputFormatWithDiagnosticInfo(header_, in_, params_)
2019-07-30 14:55:59 +00:00
, with_names(with_names_)
2019-08-03 11:02:40 +00:00
, format_settings(format_settings_)
2019-02-19 18:41:18 +00:00
{
2019-10-16 14:22:22 +00:00
const String bad_delimiters = " \t\"'.UL";
if (bad_delimiters.find(format_settings.csv.delimiter) != String::npos)
throw Exception(String("CSV format may not work correctly with delimiter '") + format_settings.csv.delimiter +
"'. Try use CustomSeparated format instead.", ErrorCodes::BAD_ARGUMENTS);
2019-02-19 18:41:18 +00:00
auto & sample = getPort().getHeader();
size_t num_columns = sample.columns();
2019-07-30 14:55:59 +00:00
2019-02-19 18:41:18 +00:00
data_types.resize(num_columns);
2019-07-30 14:55:59 +00:00
column_indexes_by_names.reserve(num_columns);
2019-02-19 18:41:18 +00:00
for (size_t i = 0; i < num_columns; ++i)
2019-07-30 14:55:59 +00:00
{
const auto & column_info = sample.getByPosition(i);
data_types[i] = column_info.type;
column_indexes_by_names.emplace(column_info.name, i);
}
2019-02-19 18:41:18 +00:00
}
2019-07-30 14:55:59 +00:00
/// Map an input file column to a table column, based on its name.
void CSVRowInputFormat::addInputColumn(const String & column_name)
{
const auto column_it = column_indexes_by_names.find(column_name);
if (column_it == column_indexes_by_names.end())
{
if (format_settings.skip_unknown_fields)
{
column_indexes_for_input_fields.push_back(std::nullopt);
return;
}
throw Exception(
"Unknown field found in CSV header: '" + column_name + "' " +
"at position " + std::to_string(column_indexes_for_input_fields.size()) +
"\nSet the 'input_format_skip_unknown_fields' parameter explicitly to ignore and proceed",
ErrorCodes::INCORRECT_DATA
);
}
const auto column_index = column_it->second;
if (read_columns[column_index])
throw Exception("Duplicate field found while parsing CSV header: " + column_name, ErrorCodes::INCORRECT_DATA);
read_columns[column_index] = true;
column_indexes_for_input_fields.emplace_back(column_index);
}
2019-08-23 19:47:22 +00:00
static void skipEndOfLine(ReadBuffer & in)
2019-02-19 18:41:18 +00:00
{
/// \n (Unix) or \r\n (DOS/Windows) or \n\r (Mac OS Classic)
2019-08-23 19:47:22 +00:00
if (*in.position() == '\n')
2019-02-19 18:41:18 +00:00
{
2019-08-23 19:47:22 +00:00
++in.position();
if (!in.eof() && *in.position() == '\r')
++in.position();
2019-02-19 18:41:18 +00:00
}
2019-08-23 19:47:22 +00:00
else if (*in.position() == '\r')
2019-02-19 18:41:18 +00:00
{
2019-08-23 19:47:22 +00:00
++in.position();
if (!in.eof() && *in.position() == '\n')
++in.position();
2019-02-19 18:41:18 +00:00
else
throw Exception("Cannot parse CSV format: found \\r (CR) not followed by \\n (LF)."
" Line must end by \\n (LF) or \\r\\n (CR LF) or \\n\\r.", ErrorCodes::INCORRECT_DATA);
}
2019-08-23 19:47:22 +00:00
else if (!in.eof())
2019-02-19 18:41:18 +00:00
throw Exception("Expected end of line", ErrorCodes::INCORRECT_DATA);
}
2019-08-23 19:47:22 +00:00
static void skipDelimiter(ReadBuffer & in, const char delimiter, bool is_last_column)
2019-02-19 18:41:18 +00:00
{
if (is_last_column)
{
2019-08-23 19:47:22 +00:00
if (in.eof())
2019-02-19 18:41:18 +00:00
return;
/// we support the extra delimiter at the end of the line
2019-08-23 19:47:22 +00:00
if (*in.position() == delimiter)
2019-02-19 18:41:18 +00:00
{
2019-08-23 19:47:22 +00:00
++in.position();
if (in.eof())
2019-02-19 18:41:18 +00:00
return;
}
2019-08-23 19:47:22 +00:00
skipEndOfLine(in);
2019-02-19 18:41:18 +00:00
}
else
2019-08-23 19:47:22 +00:00
assertChar(delimiter, in);
2019-02-19 18:41:18 +00:00
}
/// Skip `whitespace` symbols allowed in CSV.
2019-08-23 19:47:22 +00:00
static inline void skipWhitespacesAndTabs(ReadBuffer & in)
2019-02-19 18:41:18 +00:00
{
2019-08-23 19:47:22 +00:00
while (!in.eof()
&& (*in.position() == ' '
|| *in.position() == '\t'))
++in.position();
2019-02-19 18:41:18 +00:00
}
2019-08-23 19:47:22 +00:00
static void skipRow(ReadBuffer & in, const FormatSettings::CSV & settings, size_t num_columns)
2019-02-19 18:41:18 +00:00
{
String tmp;
for (size_t i = 0; i < num_columns; ++i)
{
2019-08-23 19:47:22 +00:00
skipWhitespacesAndTabs(in);
readCSVString(tmp, in, settings);
skipWhitespacesAndTabs(in);
2019-02-19 18:41:18 +00:00
2019-08-23 19:47:22 +00:00
skipDelimiter(in, settings.delimiter, i + 1 == num_columns);
2019-02-19 18:41:18 +00:00
}
}
/// Prepare for reading rows: skip a possible BOM, then either consume the
/// header row (building the file-column -> table-column mapping) or fall back
/// to a one-to-one positional mapping.
void CSVRowInputFormat::readPrefix()
{
    /// In this format, we assume, that if first string field contain BOM as value, it will be written in quotes,
    /// so BOM at beginning of stream cannot be confused with BOM in first string value, and it is safe to skip it.
    skipBOMIfExists(in);

    size_t num_columns = data_types.size();
    auto & header = getPort().getHeader();

    if (with_names)
    {
        /// This CSV file has a header row with column names. Depending on the
        /// settings, use it or skip it.
        if (format_settings.with_names_use_header)
        {
            /// Look at the file header to see which columns we have there.
            /// The missing columns are filled with defaults.
            read_columns.assign(header.columns(), false);
            do
            {
                String column_name;
                skipWhitespacesAndTabs(in);
                readCSVString(column_name, in, format_settings.csv);
                skipWhitespacesAndTabs(in);

                addInputColumn(column_name);
            }
            while (checkChar(format_settings.csv.delimiter, in));

            /// Consume the optional trailing delimiter and the line break.
            skipDelimiter(in, format_settings.csv.delimiter, true);

            /// If any table column never appeared in the header, every row
            /// will need default values filled in for it.
            for (auto read_column : read_columns)
            {
                if (!read_column)
                {
                    have_always_default_columns = true;
                    break;
                }
            }

            return;
        }
        else
            skipRow(in, format_settings.csv, num_columns);
    }

    /// The default: map each column of the file to the column of the table with
    /// the same index.
    read_columns.assign(header.columns(), true);
    column_indexes_for_input_fields.resize(header.columns());

    for (size_t i = 0; i < column_indexes_for_input_fields.size(); ++i)
    {
        column_indexes_for_input_fields[i] = i;
    }
}
/// Read the next CSV row into `columns`. Returns false at end of stream.
/// On return, ext.read_columns[i] is false for table columns that received a
/// default value rather than one parsed from the input.
bool CSVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
{
    if (in.eof())
        return false;

    updateDiagnosticInfo();

    /// Track whether we have to fill any columns in this row with default
    /// values. If not, we return an empty column mask to the caller, so that
    /// it doesn't have to check it.
    bool have_default_columns = have_always_default_columns;

    ext.read_columns.assign(read_columns.size(), true);
    const auto delimiter = format_settings.csv.delimiter;
    for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
    {
        const auto & table_column = column_indexes_for_input_fields[file_column];
        const bool is_last_file_column = file_column + 1 == column_indexes_for_input_fields.size();

        if (table_column)
        {
            /// readField() returns false when it inserted a default value
            /// instead of parsing one (see its empty/NULL-as-default logic).
            skipWhitespacesAndTabs(in);
            ext.read_columns[*table_column] = readField(*columns[*table_column], data_types[*table_column], is_last_file_column);
            if (!ext.read_columns[*table_column])
                have_default_columns = true;
            skipWhitespacesAndTabs(in);
        }
        else
        {
            /// We never read this column from the file, just skip it.
            String tmp;
            readCSVString(tmp, in, format_settings.csv);
        }

        skipDelimiter(in, delimiter, is_last_file_column);
    }

    if (have_default_columns)
    {
        for (size_t i = 0; i < read_columns.size(); i++)
        {
            if (!read_columns[i])
            {
                /// The column value for this row is going to be overwritten
                /// with default by the caller, but the general assumption is
                /// that the column size increases for each row, so we have
                /// to insert something. Since we do not care about the exact
                /// value, we do not have to use the default value specified by
                /// the data type, and can just use IColumn::insertDefault().
                columns[i]->insertDefault();
                ext.read_columns[i] = false;
            }
        }
    }

    return true;
}
/// Re-parse the current row and write a human-readable description of the
/// first problem encountered into `out`. Returns false as soon as a problem
/// (or end of stream) is found, true if the whole row parses cleanly.
bool CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out)
{
    const char delimiter = format_settings.csv.delimiter;

    for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
    {
        if (file_column == 0 && in.eof())
        {
            out << "<End of stream>\n";
            return false;
        }

        if (column_indexes_for_input_fields[file_column].has_value())
        {
            auto & header = getPort().getHeader();
            size_t col_idx = column_indexes_for_input_fields[file_column].value();
            if (!deserializeFieldAndPrintDiagnosticInfo(header.getByPosition(col_idx).name, data_types[col_idx], *columns[col_idx],
                                                        out, file_column))
                return false;
        }
        else
        {
            /// Skipped column: parse into a throwaway Nothing-typed column so
            /// the stream still advances and errors are reported uniformly.
            static const String skipped_column_str = "<SKIPPED COLUMN>";
            static const DataTypePtr skipped_column_type = std::make_shared<DataTypeNothing>();
            static const MutableColumnPtr skipped_column = skipped_column_type->createColumn();
            if (!deserializeFieldAndPrintDiagnosticInfo(skipped_column_str, skipped_column_type, *skipped_column, out, file_column))
                return false;
        }

        /// Delimiters
        if (file_column + 1 == column_indexes_for_input_fields.size())
        {
            if (in.eof())
                return false;

            /// we support the extra delimiter at the end of the line
            if (*in.position() == delimiter)
            {
                ++in.position();
                if (in.eof())
                    break;
            }

            if (!in.eof() && *in.position() != '\n' && *in.position() != '\r')
            {
                out << "ERROR: There is no line feed. ";
                verbosePrintString(in.position(), in.position() + 1, out);
                out << " found instead.\n"
                    " It's like your file has more columns than expected.\n"
                    "And if your file have right number of columns, maybe it have unquoted string value with comma.\n";

                return false;
            }

            skipEndOfLine(in);
        }
        else
        {
            try
            {
                assertChar(delimiter, in);
            }
            catch (const DB::Exception &)
            {
                /// assertChar failed: report whether the row ended too early
                /// or an unexpected character sits where the delimiter should be.
                if (*in.position() == '\n' || *in.position() == '\r')
                {
                    out << "ERROR: Line feed found where delimiter (" << delimiter << ") is expected."
                        " It's like your file has less columns than expected.\n"
                        "And if your file have right number of columns, maybe it have unescaped quotes in values.\n";
                }
                else
                {
                    out << "ERROR: There is no delimiter (" << delimiter << "). ";
                    verbosePrintString(in.position(), in.position() + 1, out);
                    out << " found instead.\n";
                }
                return false;
            }
        }
    }

    return true;
}
/// After a parsing error (when error recovery is enabled), resynchronize by
/// discarding the rest of the broken line.
void CSVRowInputFormat::syncAfterError()
{
    skipToNextLineOrEOF(in);
}
2019-08-30 14:38:24 +00:00
void CSVRowInputFormat::tryDeserializeFiled(const DataTypePtr & type, IColumn & column, size_t file_column,
ReadBuffer::Position & prev_pos, ReadBuffer::Position & curr_pos)
2019-02-19 18:41:18 +00:00
{
2019-08-23 19:47:22 +00:00
skipWhitespacesAndTabs(in);
prev_pos = in.position();
2019-02-19 18:41:18 +00:00
2019-08-30 14:38:24 +00:00
if (column_indexes_for_input_fields[file_column])
2019-08-23 19:47:22 +00:00
{
2019-08-30 14:38:24 +00:00
const bool is_last_file_column = file_column + 1 == column_indexes_for_input_fields.size();
2019-10-04 17:19:49 +00:00
readField(column, type, is_last_file_column);
2019-08-23 19:47:22 +00:00
}
else
{
String tmp;
readCSVString(tmp, in, format_settings.csv);
}
2019-02-19 18:41:18 +00:00
2019-08-23 19:47:22 +00:00
curr_pos = in.position();
skipWhitespacesAndTabs(in);
2019-02-19 18:41:18 +00:00
}
/// Read one field value into `column`. Returns true when a value was parsed
/// from the input; false when a default value was inserted instead (the
/// caller then marks the column in the read mask).
bool CSVRowInputFormat::readField(IColumn & column, const DataTypePtr & type, bool is_last_file_column)
{
    /// The field is empty and unquoted if the cursor is already at the next
    /// delimiter, or (for the last column) at the line break / end of stream.
    const bool at_delimiter = !in.eof() && *in.position() == format_settings.csv.delimiter;
    const bool at_last_column_line_end = is_last_file_column
        && (in.eof() || *in.position() == '\n' || *in.position() == '\r');

    /// Note: Tuples are serialized in CSV as separate columns, but with empty_as_default or null_as_default
    /// only one empty or NULL column will be expected
    if (format_settings.csv.empty_as_default
        && (at_delimiter || at_last_column_line_end))
    {
        /// Treat empty unquoted column value as default value, if
        /// specified in the settings. Tuple columns might seem
        /// problematic, because they are never quoted but still contain
        /// commas, which might be also used as delimiters. However,
        /// they do not contain empty unquoted fields, so this check
        /// works for tuples as well.
        column.insertDefault();
        return false;
    }
    else if (format_settings.null_as_default && !type->isNullable())
    {
        /// If value is null but type is not nullable then use default value instead.
        /// DataTypeNullable's helper parses a possibly-NULL CSV value and
        /// reports whether a real value (rather than a default) was inserted.
        return DataTypeNullable::deserializeTextCSV(column, in, format_settings, type);
    }
    else
    {
        /// Read the column normally.
        type->deserializeAsTextCSV(column, in, format_settings);
        return true;
    }
}
2019-02-19 18:41:18 +00:00
void registerInputFormatProcessorCSV(FormatFactory & factory)
{
for (bool with_names : {false, true})
{
factory.registerInputFormatProcessor(with_names ? "CSVWithNames" : "CSV", [=](
ReadBuffer & buf,
const Block & sample,
const Context &,
IRowInputFormat::Params params,
const FormatSettings & settings)
{
2019-08-30 14:38:24 +00:00
return std::make_shared<CSVRowInputFormat>(sample, buf, params, with_names, settings);
2019-02-19 18:41:18 +00:00
});
}
}
/// Segmentation engine for parallel parsing: accumulate at least
/// `min_chunk_size` bytes into `memory`, then cut at a row boundary — a line
/// break that is outside of a double-quoted field. Returns false when the
/// input is exhausted.
bool fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
{
    char * pos = in.position();
    bool quotes = false;            /// currently inside a double-quoted field
    bool need_more_data = true;

    while (loadAtPosition(in, memory, pos) && need_more_data)
    {
        if (quotes)
        {
            /// Inside quotes only a closing quote matters; delimiters and
            /// line breaks are literal data.
            pos = find_first_symbols<'"'>(pos, in.buffer().end());
            if (pos == in.buffer().end())
                continue;
            if (*pos == '"')
            {
                ++pos;
                /// "" inside a quoted field is an escaped quote, not the end.
                if (loadAtPosition(in, memory, pos) && *pos == '"')
                    ++pos;
                else
                    quotes = false;
            }
        }
        else
        {
            pos = find_first_symbols<'"','\r', '\n'>(pos, in.buffer().end());
            if (pos == in.buffer().end())
                continue;
            if (*pos == '"')
            {
                quotes = true;
                ++pos;
            }
            else if (*pos == '\n')
            {
                /// Row boundary: stop once enough bytes are accumulated,
                /// consuming the full \n or \n\r terminator either way.
                if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
                    need_more_data = false;
                ++pos;
                if (loadAtPosition(in, memory, pos) && *pos == '\r')
                    ++pos;
            }
            else if (*pos == '\r')
            {
                if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
                    need_more_data = false;
                ++pos;
                if (loadAtPosition(in, memory, pos) && *pos == '\n')
                    ++pos;
            }
        }
    }

    /// Copy everything up to the chosen cut point into `memory`.
    saveUpToPosition(in, memory, pos);
    return loadAtPosition(in, memory, pos);
}
/// Register the CSV row-boundary segmentation engine (enables parallel parsing).
void registerFileSegmentationEngineCSV(FormatFactory & factory)
{
    factory.registerFileSegmentationEngine("CSV", &fileSegmentationEngineCSVImpl);
}
2019-02-19 18:41:18 +00:00
}