ClickHouse/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp

480 lines
17 KiB
C++
Raw Normal View History

2019-02-19 18:41:18 +00:00
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Processors/Formats/Impl/TabSeparatedRowInputFormat.h>
2020-06-28 08:41:56 +00:00
#include <Processors/Formats/Impl/TabSeparatedRawRowInputFormat.h>
2019-04-10 13:29:27 +00:00
#include <Formats/verbosePrintString.h>
2019-02-19 18:41:18 +00:00
#include <Formats/FormatFactory.h>
2019-08-23 19:47:22 +00:00
#include <DataTypes/DataTypeNothing.h>
2021-03-09 14:46:52 +00:00
#include <DataTypes/Serializations/SerializationNullable.h>
2019-02-19 18:41:18 +00:00
namespace DB
{
/// Error codes used in this file; they are only declared here, the definitions are provided elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int INCORRECT_DATA;
}
2019-08-23 19:47:22 +00:00
/// Skips one TSV row made of `num_columns` escaped fields without storing the values.
/// Fields must be separated by '\t'. The last field must be followed by '\n', unless the input
/// ends right after it: a missing final line feed is tolerated here in the same way as in
/// TabSeparatedRowInputFormat::readPrefix and TabSeparatedRowInputFormat::readRow.
/// An unexpected delimiter is reported by assertChar.
static void skipTSVRow(ReadBuffer & in, const size_t num_columns)
{
    NullOutput null_sink;

    for (size_t i = 0; i < num_columns; ++i)
    {
        readEscapedStringInto(null_sink, in);

        const bool is_last_column = i + 1 == num_columns;
        if (!is_last_column)
            assertChar('\t', in);
        else if (!in.eof())
            assertChar('\n', in);
    }
}
/** Check for a common error case - usage of Windows line feed.
*/
2019-08-23 19:47:22 +00:00
static void checkForCarriageReturn(ReadBuffer & in)
2019-07-31 14:43:08 +00:00
{
2019-08-23 19:47:22 +00:00
if (in.position()[0] == '\r' || (in.position() != in.buffer().begin() && in.position()[-1] == '\r'))
2019-07-31 14:43:08 +00:00
throw Exception("\nYou have carriage return (\\r, 0x0D, ASCII 13) at end of first row."
"\nIt's like your input data has DOS/Windows style line separators, that are illegal in TabSeparated format."
" You must transform your file to Unix format."
2020-08-15 08:11:10 +00:00
"\nBut if you really need carriage return at end of string value of last column, you need to escape it as \\r.",
2019-07-31 14:43:08 +00:00
ErrorCodes::INCORRECT_DATA);
}
2019-08-30 14:38:24 +00:00
/// Creates a reader of TSV data from `in_` for the columns described by `header_`.
/// `with_names_` / `with_types_` say whether the data is preceded by a row of column names / a row of type names
/// (see readPrefix for how these rows are consumed).
TabSeparatedRowInputFormat::TabSeparatedRowInputFormat(const Block & header_, ReadBuffer & in_, const Params & params_,
    bool with_names_, bool with_types_, const FormatSettings & format_settings_)
    : RowInputFormatWithDiagnosticInfo(header_, in_, params_)
    , with_names(with_names_)
    , with_types(with_types_)
    , format_settings(format_settings_)
{
    const auto & header = getPort().getHeader();
    const size_t total_columns = header.columns();

    /// Remember the type of every header column and the position of every column name.
    data_types.resize(total_columns);
    column_indexes_by_names.reserve(total_columns);
    for (size_t position = 0; position < total_columns; ++position)
    {
        const auto & column = header.getByPosition(position);
        data_types[position] = column.type;
        column_indexes_by_names.emplace(column.name, position);
    }

    /// Nothing is mapped yet: no column is marked as read and no input field is known.
    column_mapping->read_columns.assign(total_columns, false);
    column_mapping->column_indexes_for_input_fields.reserve(total_columns);
}
2019-07-31 14:43:08 +00:00
void TabSeparatedRowInputFormat::setupAllColumnsByTableSchema()
2019-02-19 18:41:18 +00:00
{
2020-04-22 06:34:20 +00:00
const auto & header = getPort().getHeader();
2021-03-01 19:58:55 +00:00
column_mapping->read_columns.assign(header.columns(), true);
column_mapping->column_indexes_for_input_fields.resize(header.columns());
2019-07-31 14:43:08 +00:00
2021-03-01 19:58:55 +00:00
for (size_t i = 0; i < column_mapping->column_indexes_for_input_fields.size(); ++i)
column_mapping->column_indexes_for_input_fields[i] = i;
2019-07-31 14:43:08 +00:00
}
2019-02-19 18:41:18 +00:00
2019-07-31 14:43:08 +00:00
/// Appends the mapping for the next input field, whose name in the TSV header is `column_name`.
/// A name that is absent from the header of this format is mapped to "skip" when
/// format_settings.skip_unknown_fields is set and rejected with INCORRECT_DATA otherwise.
/// A name that was already added is always rejected with INCORRECT_DATA.
void TabSeparatedRowInputFormat::addInputColumn(const String & column_name)
{
    auto & input_fields = column_mapping->column_indexes_for_input_fields;

    const auto found = column_indexes_by_names.find(column_name);
    if (found != column_indexes_by_names.end())
    {
        const auto table_index = found->second;

        if (column_mapping->read_columns[table_index])
            throw Exception("Duplicate field found while parsing TSV header: " + column_name, ErrorCodes::INCORRECT_DATA);

        column_mapping->read_columns[table_index] = true;
        input_fields.emplace_back(table_index);
        return;
    }

    if (!format_settings.skip_unknown_fields)
    {
        throw Exception(
            "Unknown field found in TSV header: '" + column_name + "' " +
            "at position " + std::to_string(input_fields.size()) +
            "\nSet the 'input_format_skip_unknown_fields' parameter explicitly to ignore and proceed",
            ErrorCodes::INCORRECT_DATA
        );
    }

    /// Unknown field that we were asked to ignore: it has no target column.
    input_fields.push_back(std::nullopt);
}
/// Inserts a default value into every column that has no corresponding input field
/// and reports those columns as not read in `row_read_extension`.
void TabSeparatedRowInputFormat::fillUnreadColumnsWithDefaults(MutableColumns & columns, RowReadExtension & row_read_extension)
{
    /// It is safe to memorize this on the first run - the format guarantees this does not change
    if (unlikely(row_num == 1))
    {
        columns_to_fill_with_default_values.clear();

        const auto & was_read = column_mapping->read_columns;
        for (size_t position = 0; position < was_read.size(); ++position)
        {
            if (!was_read[position])
                columns_to_fill_with_default_values.push_back(position);
        }
    }

    for (const auto position : columns_to_fill_with_default_values)
    {
        data_types[position]->insertDefaultInto(*columns[position]);
        row_read_extension.read_columns[position] = false;
    }
}
void TabSeparatedRowInputFormat::readPrefix()
{
if (with_names || with_types || data_types.at(0)->textCanContainOnlyValidUTF8())
2019-02-19 18:41:18 +00:00
{
/// In this format, we assume that column name or type cannot contain BOM,
/// so, if format has header,
/// then BOM at beginning of stream cannot be confused with name or type of field, and it is safe to skip it.
skipBOMIfExists(in);
}
2021-02-24 17:12:22 +00:00
/// This is a bit of abstraction leakage, but we have almost the same code in other places.
2021-02-24 18:03:02 +00:00
/// Thus, we check if this InputFormat is working with the "real" beginning of the data in case of parallel parsing.
2021-02-24 17:04:37 +00:00
if (with_names && getCurrentUnitNumber() == 0)
2019-02-19 18:41:18 +00:00
{
2019-07-31 14:43:08 +00:00
if (format_settings.with_names_use_header)
{
String column_name;
for (;;)
2019-07-31 14:43:08 +00:00
{
readEscapedString(column_name, in);
if (!checkChar('\t', in))
{
/// Check last column for \r before adding it, otherwise an error will be:
/// "Unknown field found in TSV header"
checkForCarriageReturn(in);
addInputColumn(column_name);
break;
}
else
addInputColumn(column_name);
2019-07-31 14:43:08 +00:00
}
2019-07-31 14:43:08 +00:00
if (!in.eof())
{
assertChar('\n', in);
}
}
else
2019-02-19 18:41:18 +00:00
{
2019-07-31 14:43:08 +00:00
setupAllColumnsByTableSchema();
2021-03-01 19:58:55 +00:00
skipTSVRow(in, column_mapping->column_indexes_for_input_fields.size());
2019-02-19 18:41:18 +00:00
}
}
2021-03-01 22:32:11 +00:00
else if (!column_mapping->is_set)
2019-07-31 14:43:08 +00:00
setupAllColumnsByTableSchema();
2019-02-19 18:41:18 +00:00
if (with_types)
{
2021-03-01 19:58:55 +00:00
skipTSVRow(in, column_mapping->column_indexes_for_input_fields.size());
2019-02-19 18:41:18 +00:00
}
}
2019-07-31 14:43:08 +00:00
/// Reads one row into `columns`.
/// Returns false if the input has ended before the row started, true otherwise.
/// `ext.read_columns` tells for each column whether a value was actually read (as opposed to a default being inserted).
bool TabSeparatedRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
{
    if (in.eof())
        return false;

    updateDiagnosticInfo();

    const auto & input_fields = column_mapping->column_indexes_for_input_fields;
    const size_t fields_in_row = input_fields.size();

    ext.read_columns.assign(column_mapping->read_columns.size(), true);

    for (size_t file_column = 0; file_column < fields_in_row; ++file_column)
    {
        const bool is_last_file_column = file_column + 1 == fields_in_row;
        const auto & target = input_fields[file_column];

        if (target)
        {
            const size_t position = *target;
            ext.read_columns[position] = readField(*columns[position], data_types[position], serializations[position], is_last_file_column);
        }
        else
        {
            /// The field has no target column: parse it and throw the value away.
            NullOutput null_sink;
            readEscapedStringInto(null_sink, in);
        }

        /// skip separators
        if (!is_last_file_column)
        {
            assertChar('\t', in);
        }
        else if (!in.eof())
        {
            if (unlikely(row_num == 1))
                checkForCarriageReturn(in);

            assertChar('\n', in);
        }
    }

    fillUnreadColumnsWithDefaults(columns, ext);
    return true;
}
2021-03-09 14:46:52 +00:00
/// Reads one escaped field into `column`.
/// Returns false if a default value was inserted because the field is empty and tsv.empty_as_default is set;
/// with null_as_default and a non-Nullable type returns what SerializationNullable::deserializeTextEscapedImpl returns;
/// returns true otherwise.
bool TabSeparatedRowInputFormat::readField(IColumn & column, const DataTypePtr & type,
    const SerializationPtr & serialization, bool is_last_file_column)
{
    /// An empty field ends right where it starts: at a tab for a non-last column,
    /// at a line feed or at the end of input for the last one.
    const bool input_exhausted = in.eof();
    const bool field_is_empty = is_last_file_column
        ? (input_exhausted || *in.position() == '\n')
        : (!input_exhausted && *in.position() == '\t');

    if (field_is_empty && format_settings.tsv.empty_as_default)
    {
        column.insertDefault();
        return false;
    }

    if (format_settings.null_as_default && !type->isNullable())
        return SerializationNullable::deserializeTextEscapedImpl(column, in, format_settings, serialization);

    serialization->deserializeTextEscaped(column, in, format_settings);
    return true;
}
2019-08-23 19:47:22 +00:00
/// Parses one row field by field and writes a human-readable report about it into `out`.
/// Returns false if the input has ended before the row started, or if a field or a delimiter
/// could not be parsed (the reason is written into `out`); returns true if the whole row was parsed.
bool TabSeparatedRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out)
{
    const auto & input_fields = column_mapping->column_indexes_for_input_fields;

    for (size_t file_column = 0; file_column < input_fields.size(); ++file_column)
    {
        if (file_column == 0 && in.eof())
        {
            out << "<End of stream>\n";
            return false;
        }

        if (input_fields[file_column].has_value())
        {
            const auto & header = getPort().getHeader();
            const size_t col_idx = input_fields[file_column].value();
            if (!deserializeFieldAndPrintDiagnosticInfo(header.getByPosition(col_idx).name, data_types[col_idx], *columns[col_idx],
                                                        out, file_column))
                return false;
        }
        else
        {
            /// The field has no target column; it is parsed into a placeholder column of type Nothing.
            static const String skipped_column_str = "<SKIPPED COLUMN>";
            static const DataTypePtr skipped_column_type = std::make_shared<DataTypeNothing>();
            static const MutableColumnPtr skipped_column = skipped_column_type->createColumn();
            if (!deserializeFieldAndPrintDiagnosticInfo(skipped_column_str, skipped_column_type, *skipped_column, out, file_column))
                return false;
        }

        /// Delimiters
        if (file_column + 1 == input_fields.size())
        {
            /// After the last field: a line feed, or nothing at all if the input ends here.
            if (!in.eof())
            {
                try
                {
                    assertChar('\n', in);
                }
                catch (const DB::Exception &)
                {
                    if (*in.position() == '\t')
                    {
                        out << "ERROR: Tab found where line feed is expected."
                            " It's like your file has more columns than expected.\n"
                            "And if your file have right number of columns, maybe it have unescaped tab in value.\n";
                    }
                    else if (*in.position() == '\r')
                    {
                        out << "ERROR: Carriage return found where line feed is expected."
                            " It's like your file has DOS/Windows style line separators, that is illegal in TabSeparated format.\n";
                    }
                    else
                    {
                        out << "ERROR: There is no line feed. ";
                        verbosePrintString(in.position(), in.position() + 1, out);
                        out << " found instead.\n";
                    }
                    return false;
                }
            }
        }
        else
        {
            try
            {
                assertChar('\t', in);
            }
            catch (const DB::Exception &)
            {
                /// The tab may be missing because the input has ended in the middle of the row.
                /// In that case there is no current character to describe, so report it explicitly
                /// instead of looking at in.position().
                if (in.eof())
                {
                    out << "ERROR: End of stream found where tab is expected."
                        " It's like your file has less columns than expected.\n";
                }
                else if (*in.position() == '\n')
                {
                    out << "ERROR: Line feed found where tab is expected."
                        " It's like your file has less columns than expected.\n"
                        "And if your file have right number of columns, "
                        "maybe it have unescaped backslash in value before tab, which cause tab has escaped.\n";
                }
                else if (*in.position() == '\r')
                {
                    out << "ERROR: Carriage return found where tab is expected.\n";
                }
                else
                {
                    out << "ERROR: There is no tab. ";
                    verbosePrintString(in.position(), in.position() + 1, out);
                    out << " found instead.\n";
                }
                return false;
            }
        }
    }

    return true;
}
void TabSeparatedRowInputFormat::tryDeserializeField(const DataTypePtr & type, IColumn & column, size_t file_column)
2019-02-19 18:41:18 +00:00
{
const auto & index = column_mapping->column_indexes_for_input_fields[file_column];
2021-03-09 14:46:52 +00:00
if (index)
{
2020-08-20 10:06:41 +00:00
// check null value for type is not nullable. don't cross buffer bound for simplicity, so maybe missing some case
2020-08-20 10:42:14 +00:00
if (!type->isNullable() && !in.eof())
2020-08-20 10:06:41 +00:00
{
2020-08-20 10:42:14 +00:00
if (*in.position() == '\\' && in.available() >= 2)
2020-08-20 10:06:41 +00:00
{
++in.position();
2020-08-20 10:42:14 +00:00
if (*in.position() == 'N')
2020-08-20 10:06:41 +00:00
{
++in.position();
2020-08-20 17:32:49 +00:00
throw Exception(ErrorCodes::INCORRECT_DATA, "Unexpected NULL value of not Nullable type {}", type->getName());
2020-08-20 10:06:41 +00:00
}
else
{
--in.position();
}
}
2020-08-20 03:25:28 +00:00
}
2021-03-01 19:58:55 +00:00
const bool is_last_file_column = file_column + 1 == column_mapping->column_indexes_for_input_fields.size();
2021-03-09 14:46:52 +00:00
readField(column, type, serializations[*index], is_last_file_column);
}
2019-08-23 19:47:22 +00:00
else
{
2020-10-29 17:22:48 +00:00
NullOutput null_sink;
2019-08-23 19:47:22 +00:00
readEscapedStringInto(null_sink, in);
}
2019-02-19 18:41:18 +00:00
}
2019-08-23 19:47:22 +00:00
void TabSeparatedRowInputFormat::syncAfterError()
2019-02-19 18:41:18 +00:00
{
2019-08-23 19:47:22 +00:00
skipToUnescapedNextLineOrEOF(in);
2019-02-19 18:41:18 +00:00
}
/// Resets the base parser and then the state kept by this class:
/// the list of columns filled with defaults and the mapping of input fields to columns.
void TabSeparatedRowInputFormat::resetParser()
{
    RowInputFormatWithDiagnosticInfo::resetParser();

    const size_t total_columns = getPort().getHeader().columns();
    column_mapping->read_columns.assign(total_columns, false);
    column_mapping->column_indexes_for_input_fields.clear();
    columns_to_fill_with_default_values.clear();
}
2019-02-19 18:41:18 +00:00
/// Registers the input formats of the TabSeparated family in `factory`, each under its long and short name.
void registerInputFormatProcessorTabSeparated(FormatFactory & factory)
{
    /// Escaped TSV without header rows.
    const auto create_plain = [](
        ReadBuffer & buf,
        const Block & sample,
        IRowInputFormat::Params params,
        const FormatSettings & settings)
    {
        return std::make_shared<TabSeparatedRowInputFormat>(sample, buf, params, false, false, settings);
    };

    /// Raw TSV without header rows.
    const auto create_raw = [](
        ReadBuffer & buf,
        const Block & sample,
        IRowInputFormat::Params params,
        const FormatSettings & settings)
    {
        return std::make_shared<TabSeparatedRawRowInputFormat>(sample, buf, params, false, false, settings);
    };

    /// Escaped TSV preceded by a row of column names.
    const auto create_with_names = [](
        ReadBuffer & buf,
        const Block & sample,
        IRowInputFormat::Params params,
        const FormatSettings & settings)
    {
        return std::make_shared<TabSeparatedRowInputFormat>(sample, buf, params, true, false, settings);
    };

    /// Escaped TSV preceded by a row of column names and a row of type names.
    const auto create_with_names_and_types = [](
        ReadBuffer & buf,
        const Block & sample,
        IRowInputFormat::Params params,
        const FormatSettings & settings)
    {
        return std::make_shared<TabSeparatedRowInputFormat>(sample, buf, params, true, true, settings);
    };

    for (const auto * format_name : {"TabSeparated", "TSV"})
        factory.registerInputFormatProcessor(format_name, create_plain);

    for (const auto * format_name : {"TabSeparatedRaw", "TSVRaw"})
        factory.registerInputFormatProcessor(format_name, create_raw);

    for (const auto * format_name : {"TabSeparatedWithNames", "TSVWithNames"})
        factory.registerInputFormatProcessor(format_name, create_with_names);

    for (const auto * format_name : {"TabSeparatedWithNamesAndTypes", "TSVWithNamesAndTypes"})
        factory.registerInputFormatProcessor(format_name, create_with_names_and_types);
}
2020-11-30 16:42:41 +00:00
static std::pair<bool, size_t> fileSegmentationEngineTabSeparatedImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
2019-11-20 17:24:44 +00:00
{
bool need_more_data = true;
char * pos = in.position();
2020-11-30 16:42:41 +00:00
size_t number_of_rows = 0;
2019-11-22 10:38:08 +00:00
2019-11-20 17:24:44 +00:00
while (loadAtPosition(in, memory, pos) && need_more_data)
{
pos = find_first_symbols<'\\', '\r', '\n'>(pos, in.buffer().end());
2019-11-22 10:38:08 +00:00
if (pos > in.buffer().end())
throw Exception("Position in buffer is out of bounds. There must be a bug.", ErrorCodes::LOGICAL_ERROR);
else if (pos == in.buffer().end())
2019-11-20 17:24:44 +00:00
continue;
else if (*pos == '\\')
{
2019-11-20 17:24:44 +00:00
++pos;
if (loadAtPosition(in, memory, pos))
++pos;
}
else if (*pos == '\n' || *pos == '\r')
{
2020-11-30 16:42:41 +00:00
if (*pos == '\n')
++number_of_rows;
2019-11-20 17:24:44 +00:00
if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
need_more_data = false;
++pos;
}
}
2019-11-22 10:38:08 +00:00
2019-11-20 17:24:44 +00:00
saveUpToPosition(in, memory, pos);
2020-11-30 16:42:41 +00:00
return {loadAtPosition(in, memory, pos), number_of_rows};
}
2019-10-01 10:48:46 +00:00
/// Registers the TSV segmentation engine in `factory` for the formats that can share it.
void registerFileSegmentationEngineTabSeparated(FormatFactory & factory)
{
    // We can use the same segmentation engine for TSKV.
    for (const auto & format_name : {"TabSeparated", "TSV", "TSKV", "TabSeparatedWithNames", "TSVWithNames"})
        factory.registerFileSegmentationEngine(format_name, &fileSegmentationEngineTabSeparatedImpl);
}
2019-02-19 18:41:18 +00:00
}