Update CSVRowInputFormat.

This commit is contained in:
Nikolai Kochetov 2019-08-04 13:19:51 +03:00
parent fa9b27eec2
commit 511f3050a6
2 changed files with 64 additions and 27 deletions

View File

@ -4,6 +4,7 @@
#include <Formats/verbosePrintString.h>
#include <Processors/Formats/Impl/CSVRowInputFormat.h>
#include <Formats/FormatFactory.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
@ -27,6 +28,7 @@ CSVRowInputFormat::CSVRowInputFormat(
data_types.resize(num_columns);
column_indexes_by_names.reserve(num_columns);
column_idx_to_nullable_column_idx.resize(num_columns);
for (size_t i = 0; i < num_columns; ++i)
{
@ -34,6 +36,16 @@ CSVRowInputFormat::CSVRowInputFormat(
data_types[i] = column_info.type;
column_indexes_by_names.emplace(column_info.name, i);
/// If input_format_null_as_default=1 we need ColumnNullable of type DataTypeNullable(nested_type)
/// to parse value as nullable before inserting it in corresponding column of not-nullable type.
/// Constructing temporary column for each row is slow, so we prepare it here
if (format_settings.csv.null_as_default && !column_info.type->isNullable() && column_info.type->canBeInsideNullable())
{
column_idx_to_nullable_column_idx[i] = nullable_columns.size();
nullable_types.emplace_back(std::make_shared<DataTypeNullable>(column_info.type));
nullable_columns.emplace_back(nullable_types.back()->createColumn());
}
}
}
@ -217,33 +229,12 @@ bool CSVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext
if (table_column)
{
const auto & type = data_types[*table_column];
const bool at_delimiter = *in.position() == delimiter;
const bool at_last_column_line_end = is_last_file_column
&& (*in.position() == '\n' || *in.position() == '\r'
|| in.eof());
if (format_settings.csv.empty_as_default
&& (at_delimiter || at_last_column_line_end))
{
/// Treat empty unquoted column value as default value, if
/// specified in the settings. Tuple columns might seem
/// problematic, because they are never quoted but still contain
/// commas, which might be also used as delimiters. However,
/// they do not contain empty unquoted fields, so this check
/// works for tuples as well.
read_columns[*table_column] = false;
skipWhitespacesAndTabs(in);
read_columns[*table_column] = readField(*columns[*table_column], data_types[*table_column],
is_last_file_column, *table_column);
if (!read_columns[*table_column])
have_default_columns = true;
}
else
{
/// Read the column normally.
read_columns[*table_column] = true;
skipWhitespacesAndTabs(in);
type->deserializeAsTextCSV(*columns[*table_column], in,
format_settings);
skipWhitespacesAndTabs(in);
}
skipWhitespacesAndTabs(in);
}
else
{
@ -383,7 +374,7 @@ bool OPTIMIZE(1) CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumn
{
skipWhitespacesAndTabs(in);
prev_position = in.position();
current_column_type->deserializeAsTextCSV(*columns[table_column], in, format_settings);
readField(*columns[table_column], current_column_type, is_last_file_column, table_column);
curr_position = in.position();
skipWhitespacesAndTabs(in);
}
@ -523,6 +514,45 @@ void CSVRowInputFormat::updateDiagnosticInfo()
pos_of_current_row = in.position();
}
bool CSVRowInputFormat::readField(IColumn & column, const DataTypePtr & type, bool is_last_file_column, size_t column_idx)
{
const bool at_delimiter = *in.position() == format_settings.csv.delimiter;
const bool at_last_column_line_end = is_last_file_column
&& (*in.position() == '\n' || *in.position() == '\r'
|| in.eof());
if (format_settings.csv.empty_as_default
&& (at_delimiter || at_last_column_line_end))
{
/// Treat empty unquoted column value as default value, if
/// specified in the settings. Tuple columns might seem
/// problematic, because they are never quoted but still contain
/// commas, which might be also used as delimiters. However,
/// they do not contain empty unquoted fields, so this check
/// works for tuples as well.
return false;
}
else if (column_idx_to_nullable_column_idx[column_idx])
{
/// If value is null but type is not nullable then use default value instead.
const size_t nullable_idx = *column_idx_to_nullable_column_idx[column_idx];
auto & tmp_col = *nullable_columns[nullable_idx];
nullable_types[nullable_idx]->deserializeAsTextCSV(tmp_col, in, format_settings);
Field value = tmp_col[0];
tmp_col.popBack(1); /// do not store copy of values in memory
if (value.isNull())
return false;
column.insert(value);
return true;
}
else
{
/// Read the column normally.
type->deserializeAsTextCSV(column, in, format_settings);
return true;
}
}
void registerInputFormatProcessorCSV(FormatFactory & factory)
{

View File

@ -65,10 +65,17 @@ private:
char * pos_of_current_row = nullptr;
char * pos_of_prev_row = nullptr;
/// For setting input_format_null_as_default
DataTypes nullable_types;
MutableColumns nullable_columns;
OptionalIndexes column_idx_to_nullable_column_idx;
void updateDiagnosticInfo();
bool parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
WriteBuffer & out, size_t max_length_of_column_name, size_t max_length_of_data_type_name);
bool readField(IColumn & column, const DataTypePtr & type, bool is_last_file_column, size_t column_idx);
};
}