improvements

This commit is contained in:
Alexander Tokmakov 2019-07-19 15:10:05 +03:00
parent 4c8c516208
commit 8146126dfd
4 changed files with 89 additions and 80 deletions

View File

@ -272,9 +272,58 @@ void DataTypeNullable::serializeTextCSV(const IColumn & column, size_t row_num,
/** Deserialize a single CSV field into a Nullable column.
  *
  * The field is treated as NULL when checkStringByFirstCharacterAndAssertTheRest
  * accepts the "\N" literal, or, if settings.csv.unquoted_null_literal_as_null is set,
  * when the input starts with the unquoted literal NULL.
  * Otherwise the value is parsed by the nested data type.
  *
  * The actual insertion is done by safeDeserialize, which receives the two callbacks
  * defined below: the NULL check and the nested-value parser.
  */
void DataTypeNullable::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
    constexpr char const * null_literal = "NULL";
    constexpr size_t len = 4;

    /// How many leading characters of "NULL" have been taken from istr and not given back.
    /// Shared between both callbacks: non-zero on entry to deserialize_nested means
    /// that these characters have to be re-supplied to the nested parser.
    size_t null_prefix_len = 0;

    auto check_for_null = [&istr, &settings, &null_prefix_len]
    {
        if (checkStringByFirstCharacterAndAssertTheRest("\\N", istr))
            return true;
        if (!settings.csv.unquoted_null_literal_as_null)
            return false;

        /// Check for unquoted NULL
        while (!istr.eof() && null_prefix_len < len && null_literal[null_prefix_len] == *istr.position())
        {
            ++null_prefix_len;
            ++istr.position();
        }

        if (null_prefix_len == len)
            return true;

        /// Value and "NULL" have a common prefix, but the value is not "NULL".
        /// Restore the previous buffer position if possible, i.e. if the offset inside
        /// the current buffer is large enough to step back over the whole consumed prefix.
        if (null_prefix_len <= istr.offset())
        {
            istr.position() -= null_prefix_len;
            null_prefix_len = 0;
        }
        return false;
    };

    auto deserialize_nested = [this, &settings, &istr, &null_prefix_len] (IColumn & nested)
    {
        if (likely(!null_prefix_len))
            nested_data_type->deserializeAsTextCSV(nested, istr, settings);
        else
        {
            /// The previous buffer position was not restored,
            /// so we need to prepend the extracted characters (rare case).
            ReadBufferFromMemory prepend(null_literal, null_prefix_len);
            ConcatReadBuffer buf(prepend, istr);

            nested_data_type->deserializeAsTextCSV(nested, buf, settings);

            /// Check that all extracted characters were read by the nested parser and update the buffer position.
            if (null_prefix_len < buf.count())
                istr.position() = buf.position();
            else if (null_prefix_len > buf.count())
                throw DB::Exception("Some characters were extracted from buffer, but nested parser did not read them",
                                    ErrorCodes::LOGICAL_ERROR);
        }
    };

    safeDeserialize(column, check_for_null, deserialize_nested);
}
void DataTypeNullable::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const

View File

@ -61,7 +61,8 @@ public:
* 1. \N
* 2. empty string (without quotes)
* 3. NULL
* All three are supported (originally only the first one was); however, the second variant is handled by CSVRowInputStream, not by deserializeTextCSV
* (see also the input_format_defaults_for_omitted_fields and format_csv_unquoted_null_literal_as_null settings).
* In CSV, a non-NULL string value starting with the characters \N must be placed in quotes to avoid ambiguity.
*/
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;

View File

@ -213,53 +213,14 @@ bool CSVRowInputStream::read(MutableColumns & columns, RowReadExtension & ext)
for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
{
const auto & table_column = column_indexes_for_input_fields[file_column];
const bool is_last_file_column =
file_column + 1 == column_indexes_for_input_fields.size();
const bool is_last_file_column = file_column + 1 == column_indexes_for_input_fields.size();
if (table_column)
{
skipWhitespacesAndTabs(istr);
const auto & type = data_types[*table_column];
const bool at_delimiter = *istr.position() == delimiter;
const bool at_last_column_line_end = is_last_file_column
&& (*istr.position() == '\n' || *istr.position() == '\r'
|| istr.eof());
if (format_settings.csv.empty_as_default
&& (at_delimiter || at_last_column_line_end))
{
/// Treat empty unquoted column value as default value, if
/// specified in the settings. Tuple columns might seem
/// problematic, because they are never quoted but still contain
/// commas, which might be also used as delimiters. However,
/// they do not contain empty unquoted fields, so this check
/// works for tuples as well.
read_columns[*table_column] = false;
read_columns[*table_column] = readField(*columns[*table_column], data_types[*table_column], is_last_file_column);
if (!read_columns[*table_column])
have_default_columns = true;
}
else if (format_settings.csv.null_as_default && !type->isNullable() && type->canBeInsideNullable())
{
/// If value is null but type is not nullable then use default value instead.
DataTypeNullable nullable(type);
auto tmp_col = nullable.createColumn();
readField(*tmp_col, nullable);
if (tmp_col->isNullAt(0))
{
read_columns[*table_column] = false;
have_default_columns = true;
}
else
{
columns[*table_column]->insert((*tmp_col)[0]);
read_columns[*table_column] = true;
}
}
else
{
/// Read the column normally.
readField(*columns[*table_column], *type);
read_columns[*table_column] = true;
}
skipWhitespacesAndTabs(istr);
}
else
@ -399,7 +360,7 @@ bool OPTIMIZE(1) CSVRowInputStream::parseRowAndPrintDiagnosticInfo(MutableColumn
{
skipWhitespacesAndTabs(istr);
prev_position = istr.position();
current_column_type->deserializeAsTextCSV(*columns[table_column], istr, format_settings);
readField(*columns[table_column], current_column_type, is_last_file_column);
curr_position = istr.position();
skipWhitespacesAndTabs(istr);
}
@ -539,47 +500,45 @@ void CSVRowInputStream::updateDiagnosticInfo()
pos_of_current_row = istr.position();
}
void CSVRowInputStream::readField(IColumn & column, const IDataType & type)
bool CSVRowInputStream::readField(IColumn & column, const DataTypePtr & type, bool is_last_file_column)
{
if (format_settings.csv.unquoted_null_literal_as_null && type.isNullable())
const bool at_delimiter = *istr.position() == format_settings.csv.delimiter;
const bool at_last_column_line_end = is_last_file_column
&& (*istr.position() == '\n' || *istr.position() == '\r'
|| istr.eof());
if (format_settings.csv.empty_as_default
&& (at_delimiter || at_last_column_line_end))
{
/// Check for unquoted NULL
constexpr char const * null_literal = "NULL";
constexpr size_t len = 4;
size_t count = 0;
while (!istr.eof() && count < len && null_literal[count] == *istr.position())
{
++count;
++istr.position();
}
if (count == len)
{
column.insert(Field()); /// insert null
}
else if (count == 0)
{
type.deserializeAsTextCSV(column, istr, format_settings); /// parse value
}
else
{
/// Prepend extracted data and parse value (rare case)
ReadBufferFromMemory prepend(null_literal, count);
ConcatReadBuffer buf(prepend, istr);
type.deserializeAsTextCSV(column, buf, format_settings);
if (count < buf.count())
istr.position() = buf.position();
}
/// Treat empty unquoted column value as default value, if
/// specified in the settings. Tuple columns might seem
/// problematic, because they are never quoted but still contain
/// commas, which might be also used as delimiters. However,
/// they do not contain empty unquoted fields, so this check
/// works for tuples as well.
return false;
}
else if (format_settings.csv.null_as_default && !type->isNullable() && type->canBeInsideNullable())
{
/// If value is null but type is not nullable then use default value instead.
DataTypeNullable nullable(type);
auto tmp_col = nullable.createColumn();
nullable.deserializeAsTextCSV(*tmp_col, istr, format_settings);
if (tmp_col->isNullAt(0))
return false;
column.insert((*tmp_col)[0]);
return true;
}
else
{
type.deserializeAsTextCSV(column, istr, format_settings);
/// Read the column normally.
type->deserializeAsTextCSV(column, istr, format_settings);
return true;
}
}
void registerInputFormatCSV(FormatFactory & factory)
void registerInputFormatCSV(FormatFactory & factory)
{
for (bool with_names : {false, true})
{

View File

@ -72,7 +72,7 @@ private:
bool parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
WriteBuffer & out, size_t max_length_of_column_name, size_t max_length_of_data_type_name);
void readField(IColumn & column, const IDataType & type);
bool readField(IColumn & column, const DataTypePtr & type, bool is_last_file_column);
};
}