ClickHouse/dbms/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp

426 lines
14 KiB
C++
Raw Normal View History

2019-02-19 18:41:18 +00:00
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Processors/Formats/Impl/TabSeparatedRowInputFormat.h>
2019-04-10 13:29:27 +00:00
#include <Formats/verbosePrintString.h>
2019-02-19 18:41:18 +00:00
#include <Formats/FormatFactory.h>
2019-08-23 19:47:22 +00:00
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h>
2019-02-19 18:41:18 +00:00
namespace DB
{
/// Error codes used by this translation unit. They are only declared here (extern);
/// the definitions live outside of this file.
namespace ErrorCodes
{
/// Raised in this file for malformed TSV input: an unknown or duplicate field in the header,
/// or a carriage return found at the end of a line.
extern const int INCORRECT_DATA;
/// Declared here, but not referenced anywhere in the visible part of this file.
extern const int LOGICAL_ERROR;
}
2019-08-23 19:47:22 +00:00
/// Reads one TSV line consisting of exactly `num_columns` escaped fields and throws the contents away.
/// Each field except the last one must be followed by a tab, the last one by a line feed;
/// assertChar() is responsible for reporting a mismatch.
/// With `num_columns == 0` nothing is consumed at all.
static void skipTSVRow(ReadBuffer & in, const size_t num_columns)
{
    NullSink discarded;

    for (size_t field = 0; field < num_columns; ++field)
    {
        readEscapedStringInto(discarded, in);

        const bool is_last_field = (field + 1 == num_columns);
        assertChar(is_last_field ? '\n' : '\t', in);
    }
}
/** Check for a common error case - usage of Windows line feed.
*/
2019-08-23 19:47:22 +00:00
static void checkForCarriageReturn(ReadBuffer & in)
2019-07-31 14:43:08 +00:00
{
2019-08-23 19:47:22 +00:00
if (in.position()[0] == '\r' || (in.position() != in.buffer().begin() && in.position()[-1] == '\r'))
2019-07-31 14:43:08 +00:00
throw Exception("\nYou have carriage return (\\r, 0x0D, ASCII 13) at end of first row."
"\nIt's like your input data has DOS/Windows style line separators, that are illegal in TabSeparated format."
" You must transform your file to Unix format."
"\nBut if you really need carriage return at end of string value of last column, you need to escape it as \\r.",
ErrorCodes::INCORRECT_DATA);
}
2019-08-30 14:38:24 +00:00
/// Prepares the parser for the given header block.
/// `with_names_` / `with_types_` tell whether the stream starts with a row of column names
/// and a row of column types respectively (see readPrefix()).
/// The constructor only caches per-column metadata: the data type of every header column and
/// a name -> position lookup table. No input field is mapped to a column yet.
TabSeparatedRowInputFormat::TabSeparatedRowInputFormat(const Block & header_, ReadBuffer & in_, const Params & params_,
    bool with_names_, bool with_types_, const FormatSettings & format_settings_)
    : RowInputFormatWithDiagnosticInfo(header_, in_, params_)
    , with_names(with_names_)
    , with_types(with_types_)
    , format_settings(format_settings_)
{
    auto & header_block = getPort().getHeader();
    const size_t columns_count = header_block.columns();

    /// Initially no header column is marked as present in the input.
    read_columns.assign(columns_count, false);
    column_indexes_for_input_fields.reserve(columns_count);

    data_types.resize(columns_count);
    column_indexes_by_names.reserve(columns_count);

    for (size_t position = 0; position < columns_count; ++position)
    {
        const auto & column = header_block.getByPosition(position);
        data_types[position] = column.type;
        column_indexes_by_names.emplace(column.name, position);
    }
}
2019-07-31 14:43:08 +00:00
void TabSeparatedRowInputFormat::setupAllColumnsByTableSchema()
2019-02-19 18:41:18 +00:00
{
auto & header = getPort().getHeader();
2019-07-31 14:43:08 +00:00
read_columns.assign(header.columns(), true);
column_indexes_for_input_fields.resize(header.columns());
for (size_t i = 0; i < column_indexes_for_input_fields.size(); ++i)
column_indexes_for_input_fields[i] = i;
}
2019-02-19 18:41:18 +00:00
2019-07-31 14:43:08 +00:00
/// Registers the next field of the names row.
/// A known name is bound to its header column; seeing the same name twice is an error.
/// An unknown name is either remembered as a field to be skipped (when the
/// `skip_unknown_fields` setting is on) or reported with INCORRECT_DATA.
void TabSeparatedRowInputFormat::addInputColumn(const String & column_name)
{
    const auto found = column_indexes_by_names.find(column_name);

    if (found != column_indexes_by_names.end())
    {
        const auto table_column = found->second;

        if (read_columns[table_column])
            throw Exception("Duplicate field found while parsing TSV header: " + column_name, ErrorCodes::INCORRECT_DATA);

        read_columns[table_column] = true;
        column_indexes_for_input_fields.emplace_back(table_column);
        return;
    }

    if (!format_settings.skip_unknown_fields)
    {
        throw Exception(
            "Unknown field found in TSV header: '" + column_name + "' " +
            "at position " + std::to_string(column_indexes_for_input_fields.size()) +
            "\nSet the 'input_format_skip_unknown_fields' parameter explicitly to ignore and proceed",
            ErrorCodes::INCORRECT_DATA
        );
    }

    /// An empty optional marks a field whose value is parsed and thrown away.
    column_indexes_for_input_fields.push_back(std::nullopt);
}
/// Appends a default value to every header column that has no corresponding field in the input
/// and flags those columns as "not read" in `row_read_extension`.
void TabSeparatedRowInputFormat::fillUnreadColumnsWithDefaults(MutableColumns & columns, RowReadExtension & row_read_extension)
{
    /// The list of absent columns is computed only while row_num == 1 and reused afterwards:
    /// the format guarantees that the set of columns does not change between rows.
    if (unlikely(row_num == 1))
    {
        columns_to_fill_with_default_values.clear();

        const size_t total_columns = read_columns.size();
        for (size_t position = 0; position != total_columns; ++position)
        {
            if (!read_columns[position])
                columns_to_fill_with_default_values.push_back(position);
        }
    }

    for (const auto absent_column : columns_to_fill_with_default_values)
    {
        data_types[absent_column]->insertDefaultInto(*columns[absent_column]);
        row_read_extension.read_columns[absent_column] = false;
    }
}
void TabSeparatedRowInputFormat::readPrefix()
{
2019-02-19 18:41:18 +00:00
if (with_names || with_types)
{
/// In this format, we assume that column name or type cannot contain BOM,
/// so, if format has header,
/// then BOM at beginning of stream cannot be confused with name or type of field, and it is safe to skip it.
skipBOMIfExists(in);
}
if (with_names)
{
2019-07-31 14:43:08 +00:00
if (format_settings.with_names_use_header)
{
String column_name;
do
{
readEscapedString(column_name, in);
addInputColumn(column_name);
}
while (checkChar('\t', in));
if (!in.eof())
{
checkForCarriageReturn(in);
assertChar('\n', in);
}
}
else
2019-02-19 18:41:18 +00:00
{
2019-07-31 14:43:08 +00:00
setupAllColumnsByTableSchema();
skipTSVRow(in, column_indexes_for_input_fields.size());
2019-02-19 18:41:18 +00:00
}
}
2019-07-31 14:43:08 +00:00
else
setupAllColumnsByTableSchema();
2019-02-19 18:41:18 +00:00
if (with_types)
{
2019-07-31 14:43:08 +00:00
skipTSVRow(in, column_indexes_for_input_fields.size());
2019-02-19 18:41:18 +00:00
}
}
2019-07-31 14:43:08 +00:00
/// Parses one row of the input into `columns`.
/// Returns false when the stream is exhausted before the row starts, true otherwise.
/// `ext.read_columns` is filled with a flag per header column: false for columns that received
/// a default value instead of a parsed one.
bool TabSeparatedRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
{
    if (in.eof())
        return false;

    updateDiagnosticInfo();

    ext.read_columns.assign(read_columns.size(), true);

    const size_t fields_in_row = column_indexes_for_input_fields.size();
    for (size_t file_column = 0; file_column < fields_in_row; ++file_column)
    {
        const auto & target_column = column_indexes_for_input_fields[file_column];
        const bool is_last_file_column = (file_column + 1 == fields_in_row);

        if (!target_column)
        {
            /// The field does not correspond to any header column: parse and drop it.
            NullSink discarded;
            readEscapedStringInto(discarded, in);
        }
        else
        {
            const size_t table_column = *target_column;
            ext.read_columns[table_column]
                = readField(*columns[table_column], data_types[table_column], is_last_file_column);
        }

        /// Skip the separator that follows the field.
        if (!is_last_file_column)
        {
            assertChar('\t', in);
            continue;
        }

        /// The final row is allowed to end without a line feed.
        if (in.eof())
            continue;

        if (unlikely(row_num == 1))
            checkForCarriageReturn(in);

        assertChar('\n', in);
    }

    fillUnreadColumnsWithDefaults(columns, ext);

    return true;
}
/// Deserializes a single field into `column`.
/// Returns false when a default value was inserted because the field is empty and
/// `tsv.empty_as_default` is set. With `null_as_default` and a non-Nullable type the result of
/// DataTypeNullable::deserializeTextEscaped is forwarded. Otherwise returns true.
bool TabSeparatedRowInputFormat::readField(IColumn & column, const DataTypePtr & type, bool is_last_file_column)
{
    /// An empty field is one where the terminator comes immediately: a tab for a non-last
    /// field, a line feed or the end of stream for the last field.
    /// in.eof() is evaluated unconditionally (exactly once), regardless of the settings.
    bool field_is_empty;
    if (is_last_file_column)
        field_is_empty = in.eof() || *in.position() == '\n';
    else
        field_is_empty = !in.eof() && *in.position() == '\t';

    if (field_is_empty && format_settings.tsv.empty_as_default)
    {
        column.insertDefault();
        return false;
    }

    const bool use_nullable_deserialization = format_settings.null_as_default && !type->isNullable();
    if (use_nullable_deserialization)
        return DataTypeNullable::deserializeTextEscaped(column, in, format_settings, type);

    type->deserializeAsTextEscaped(column, in, format_settings);
    return true;
}
2019-08-23 19:47:22 +00:00
/// Parses one row field by field, writing a human-readable explanation to `out` as soon as
/// something goes wrong. Returns false on the first problem (or when the stream has ended
/// before the row starts), true when the whole row has been parsed.
bool TabSeparatedRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out)
{
    const size_t fields_in_row = column_indexes_for_input_fields.size();

    for (size_t file_column = 0; file_column < fields_in_row; ++file_column)
    {
        if (file_column == 0 && in.eof())
        {
            out << "<End of stream>\n";
            return false;
        }

        const auto & target_column = column_indexes_for_input_fields[file_column];
        if (!target_column.has_value())
        {
            /// A field without a matching header column is parsed into a placeholder column.
            static const String skipped_column_str = "<SKIPPED COLUMN>";
            static const DataTypePtr skipped_column_type = std::make_shared<DataTypeNothing>();
            static const MutableColumnPtr skipped_column = skipped_column_type->createColumn();

            if (!deserializeFieldAndPrintDiagnosticInfo(skipped_column_str, skipped_column_type, *skipped_column, out, file_column))
                return false;
        }
        else
        {
            auto & header = getPort().getHeader();
            const size_t col_idx = target_column.value();

            if (!deserializeFieldAndPrintDiagnosticInfo(header.getByPosition(col_idx).name, data_types[col_idx], *columns[col_idx],
                    out, file_column))
                return false;
        }

        /// Delimiters
        const bool is_last_file_column = (file_column + 1 == fields_in_row);

        /// The last field may be terminated by the end of stream instead of a line feed.
        if (is_last_file_column && in.eof())
            continue;

        try
        {
            assertChar(is_last_file_column ? '\n' : '\t', in);
        }
        catch (const DB::Exception &)
        {
            /// The position is not moved while reporting, so the offending byte can be read once.
            const char found = *in.position();

            if (is_last_file_column)
            {
                if (found == '\t')
                {
                    out << "ERROR: Tab found where line feed is expected."
                           " It's like your file has more columns than expected.\n"
                           "And if your file have right number of columns, maybe it have unescaped tab in value.\n";
                }
                else if (found == '\r')
                {
                    out << "ERROR: Carriage return found where line feed is expected."
                           " It's like your file has DOS/Windows style line separators, that is illegal in TabSeparated format.\n";
                }
                else
                {
                    out << "ERROR: There is no line feed. ";
                    verbosePrintString(in.position(), in.position() + 1, out);
                    out << " found instead.\n";
                }
            }
            else
            {
                if (found == '\n')
                {
                    out << "ERROR: Line feed found where tab is expected."
                           " It's like your file has less columns than expected.\n"
                           "And if your file have right number of columns, "
                           "maybe it have unescaped backslash in value before tab, which cause tab has escaped.\n";
                }
                else if (found == '\r')
                {
                    out << "ERROR: Carriage return found where tab is expected.\n";
                }
                else
                {
                    out << "ERROR: There is no tab. ";
                    verbosePrintString(in.position(), in.position() + 1, out);
                    out << " found instead.\n";
                }
            }

            return false;
        }
    }

    return true;
}
2019-08-30 14:38:24 +00:00
void TabSeparatedRowInputFormat::tryDeserializeFiled(const DataTypePtr & type, IColumn & column, size_t file_column,
ReadBuffer::Position & prev_pos, ReadBuffer::Position & curr_pos)
2019-02-19 18:41:18 +00:00
{
2019-08-23 19:47:22 +00:00
prev_pos = in.position();
2019-08-30 14:38:24 +00:00
if (column_indexes_for_input_fields[file_column])
{
const bool is_last_file_column = file_column + 1 == column_indexes_for_input_fields.size();
readField(column, type, is_last_file_column);
}
2019-08-23 19:47:22 +00:00
else
{
NullSink null_sink;
readEscapedStringInto(null_sink, in);
}
curr_pos = in.position();
2019-02-19 18:41:18 +00:00
}
2019-08-23 19:47:22 +00:00
void TabSeparatedRowInputFormat::syncAfterError()
2019-02-19 18:41:18 +00:00
{
2019-08-23 19:47:22 +00:00
skipToUnescapedNextLineOrEOF(in);
2019-02-19 18:41:18 +00:00
}
/// Registers the three flavours of the TabSeparated input format (plain, with a names row,
/// with names and types rows) in `factory`, each under its long and its short (TSV...) name.
void registerInputFormatProcessorTabSeparated(FormatFactory & factory)
{
    /// Registers every name from `names` with a creator that builds the parser
    /// configured with the given header flags.
    const auto register_flavour = [&factory](std::initializer_list<const char *> names, bool with_names, bool with_types)
    {
        for (const char * name : names)
        {
            factory.registerInputFormatProcessor(name, [with_names, with_types](
                ReadBuffer & buf,
                const Block & sample,
                const Context &,
                IRowInputFormat::Params params,
                const FormatSettings & settings)
            {
                return std::make_shared<TabSeparatedRowInputFormat>(sample, buf, params, with_names, with_types, settings);
            });
        }
    };

    register_flavour({"TabSeparated", "TSV"}, false, false);
    register_flavour({"TabSeparatedWithNames", "TSVWithNames"}, true, false);
    register_flavour({"TabSeparatedWithNamesAndTypes", "TSVWithNamesAndTypes"}, true, true);
}
2019-11-20 17:24:44 +00:00
/// Cuts a chunk out of `in` that ends right after a line break, scanning forward until the
/// accumulated size (bytes already in `memory` plus bytes scanned in the current buffer)
/// reaches `min_chunk_size`. A backslash together with the byte that follows it is stepped
/// over, so an escaped line break does not terminate the chunk.
/// The scanned data is handed to saveUpToPosition(); the return value is the result of the
/// final loadAtPosition() call (presumably "more data is available" - confirm against the helper).
bool fileSegmentationEngineTabSeparatedImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
{
    char * cursor = in.position();
    bool need_more_data = true;

    /// Note: loadAtPosition() is deliberately evaluated before the flag on every iteration.
    while (loadAtPosition(in, memory, cursor) && need_more_data)
    {
        cursor = find_first_symbols<'\\', '\r', '\n'>(cursor, in.buffer().end());

        /// Nothing interesting in the rest of the current buffer.
        if (cursor == in.buffer().end())
            continue;

        const char symbol = *cursor;

        if (symbol == '\\')
        {
            /// Step over the backslash and, if there is one, the escaped byte after it.
            ++cursor;
            if (loadAtPosition(in, memory, cursor))
                ++cursor;
        }
        else if (symbol == '\n' || symbol == '\r')
        {
            const size_t scanned_in_buffer = static_cast<size_t>(cursor - in.position());
            if (memory.size() + scanned_in_buffer >= min_chunk_size)
                need_more_data = false;

            ++cursor;
        }
    }

    saveUpToPosition(in, memory, cursor);

    return loadAtPosition(in, memory, cursor);
}
2019-10-01 10:48:46 +00:00
/// Registers the TSV segmentation engine in `factory` for every format it is used with.
void registerFileSegmentationEngineTabSeparated(FormatFactory & factory)
{
    /// The same segmentation engine can be used for TSKV as well.
    const char * const format_names[] = {"TabSeparated", "TSV", "TSKV"};

    for (const char * format_name : format_names)
        factory.registerFileSegmentationEngine(format_name, &fileSegmentationEngineTabSeparatedImpl);
}
2019-02-19 18:41:18 +00:00
}