Treat empty cells in CSV as default values. (#5625)

Treat empty cells in CSV as default values when input_format_defaults_for_omitted_fields = true.

Closes #5349.
This commit is contained in:
akuzm 2019-06-20 15:46:36 +03:00 committed by GitHub
parent 3d8c8ee83c
commit 4cc9f632a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 184 additions and 121 deletions

View File

@ -104,24 +104,9 @@ CSVRowInputStream::CSVRowInputStream(ReadBuffer & istr_, const Block & header_,
data_types[i] = column_info.type;
column_indexes_by_names.emplace(column_info.name, i);
}
column_indexes_for_input_fields.reserve(num_columns);
read_columns.assign(num_columns, false);
}
void CSVRowInputStream::setupAllColumnsByTableSchema()
{
read_columns.assign(header.columns(), true);
column_indexes_for_input_fields.resize(header.columns());
for (size_t i = 0; i < column_indexes_for_input_fields.size(); ++i)
{
column_indexes_for_input_fields[i] = i;
}
}
/// Map an input file column to a table column, based on its name.
void CSVRowInputStream::addInputColumn(const String & column_name)
{
const auto column_it = column_indexes_by_names.find(column_name);
@ -150,25 +135,6 @@ void CSVRowInputStream::addInputColumn(const String & column_name)
column_indexes_for_input_fields.emplace_back(column_index);
}
void CSVRowInputStream::fillUnreadColumnsWithDefaults(MutableColumns & columns, RowReadExtension & row_read_extension)
{
/// It is safe to memorize this on the first run - the format guarantees this does not change
if (unlikely(row_num == 1))
{
columns_to_fill_with_default_values.clear();
for (size_t index = 0; index < read_columns.size(); ++index)
if (read_columns[index] == 0)
columns_to_fill_with_default_values.push_back(index);
}
for (const auto column_index : columns_to_fill_with_default_values)
data_types[column_index]->insertDefaultInto(*columns[column_index]);
row_read_extension.read_columns = read_columns;
}
void CSVRowInputStream::readPrefix()
{
/// In this format, we assume, that if first string field contain BOM as value, it will be written in quotes,
@ -177,11 +143,16 @@ void CSVRowInputStream::readPrefix()
if (with_names)
{
/// This CSV file has a header row with column names. Depending on the
/// settings, use it or skip it.
if (format_settings.with_names_use_header)
{
String column_name;
/// Look at the file header to see which columns we have there.
/// The missing columns are filled with defaults.
read_columns.assign(header.columns(), false);
do
{
String column_name;
skipWhitespacesAndTabs(istr);
readCSVString(column_name, istr, format_settings.csv);
skipWhitespacesAndTabs(istr);
@ -191,20 +162,38 @@ void CSVRowInputStream::readPrefix()
while (checkChar(format_settings.csv.delimiter, istr));
skipDelimiter(istr, format_settings.csv.delimiter, true);
for (size_t column = 0; column < read_columns.size(); column++)
{
if (!read_columns[column])
{
have_always_default_columns = true;
break;
}
}
return;
}
else
{
setupAllColumnsByTableSchema();
skipRow(istr, format_settings.csv, column_indexes_for_input_fields.size());
skipRow(istr, format_settings.csv, header.columns());
}
}
else
/// The default: map each column of the file to the column of the table with
/// the same index.
read_columns.assign(header.columns(), true);
column_indexes_for_input_fields.resize(header.columns());
for (size_t i = 0; i < column_indexes_for_input_fields.size(); ++i)
{
setupAllColumnsByTableSchema();
column_indexes_for_input_fields[i] = i;
}
}
/** If you change this function, don't forget to change its counterpart
* with extended error reporting: parseRowAndPrintDiagnosticInfo().
*/
bool CSVRowInputStream::read(MutableColumns & columns, RowReadExtension & ext)
{
if (istr.eof())
@ -212,25 +201,75 @@ bool CSVRowInputStream::read(MutableColumns & columns, RowReadExtension & ext)
updateDiagnosticInfo();
String tmp;
for (size_t input_position = 0; input_position < column_indexes_for_input_fields.size(); ++input_position)
/// Track whether we have to fill any columns in this row with default
/// values. If not, we return an empty column mask to the caller, so that
/// it doesn't have to check it.
bool have_default_columns = have_always_default_columns;
const auto delimiter = format_settings.csv.delimiter;
for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
{
const auto & column_index = column_indexes_for_input_fields[input_position];
if (column_index)
const auto & table_column = column_indexes_for_input_fields[file_column];
const bool is_last_file_column =
file_column + 1 == column_indexes_for_input_fields.size();
if (table_column)
{
skipWhitespacesAndTabs(istr);
data_types[*column_index]->deserializeAsTextCSV(*columns[*column_index], istr, format_settings);
skipWhitespacesAndTabs(istr);
const auto & type = data_types[*table_column];
const bool at_delimiter = *istr.position() == delimiter;
const bool at_last_column_line_end = is_last_file_column
&& (*istr.position() == '\n' || *istr.position() == '\r'
|| istr.eof());
if (format_settings.csv.empty_as_default
&& (at_delimiter || at_last_column_line_end))
{
/// Treat empty unquoted column value as default value, if
/// specified in the settings. Tuple columns might seem
/// problematic, because they are never quoted but still contain
/// commas, which might be also used as delimiters. However,
/// they do not contain empty unquoted fields, so this check
/// works for tuples as well.
read_columns[*table_column] = false;
have_default_columns = true;
}
else
{
/// Read the column normally.
read_columns[*table_column] = true;
skipWhitespacesAndTabs(istr);
type->deserializeAsTextCSV(*columns[*table_column], istr,
format_settings);
skipWhitespacesAndTabs(istr);
}
}
else
{
/// We never read this column from the file, just skip it.
String tmp;
readCSVString(tmp, istr, format_settings.csv);
}
skipDelimiter(istr, format_settings.csv.delimiter, input_position + 1 == column_indexes_for_input_fields.size());
skipDelimiter(istr, delimiter, is_last_file_column);
}
fillUnreadColumnsWithDefaults(columns, ext);
if (have_default_columns)
{
for (size_t i = 0; i < read_columns.size(); i++)
{
if (!read_columns[i])
{
/// The column value for this row is going to be overwritten
/// with default by the caller, but the general assumption is
/// that the column size increases for each row, so we have
/// to insert something. Since we do not care about the exact
/// value, we do not have to use the default value specified by
/// the data type, and can just use IColumn::insertDefault().
columns[i]->insertDefault();
}
}
ext.read_columns = read_columns;
}
return true;
}
@ -303,74 +342,87 @@ bool OPTIMIZE(1) CSVRowInputStream::parseRowAndPrintDiagnosticInfo(MutableColumn
{
const char delimiter = format_settings.csv.delimiter;
for (size_t input_position = 0; input_position < column_indexes_for_input_fields.size(); ++input_position)
for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
{
if (input_position == 0 && istr.eof())
if (file_column == 0 && istr.eof())
{
out << "<End of stream>\n";
return false;
}
if (column_indexes_for_input_fields[input_position].has_value())
if (column_indexes_for_input_fields[file_column].has_value())
{
const auto & column_index = *column_indexes_for_input_fields[input_position];
const auto & current_column_type = data_types[column_index];
const auto & table_column = *column_indexes_for_input_fields[file_column];
const auto & current_column_type = data_types[table_column];
const bool is_last_file_column =
file_column + 1 == column_indexes_for_input_fields.size();
const bool at_delimiter = *istr.position() == delimiter;
const bool at_last_column_line_end = is_last_file_column
&& (*istr.position() == '\n' || *istr.position() == '\r'
|| istr.eof());
out << "Column " << input_position << ", " << std::string((input_position < 10 ? 2 : input_position < 100 ? 1 : 0), ' ')
<< "name: " << header.safeGetByPosition(column_index).name << ", " << std::string(max_length_of_column_name - header.safeGetByPosition(column_index).name.size(), ' ')
out << "Column " << file_column << ", " << std::string((file_column < 10 ? 2 : file_column < 100 ? 1 : 0), ' ')
<< "name: " << header.safeGetByPosition(table_column).name << ", " << std::string(max_length_of_column_name - header.safeGetByPosition(table_column).name.size(), ' ')
<< "type: " << current_column_type->getName() << ", " << std::string(max_length_of_data_type_name - current_column_type->getName().size(), ' ');
BufferBase::Position prev_position = istr.position();
BufferBase::Position curr_position = istr.position();
std::exception_ptr exception;
try
if (format_settings.csv.empty_as_default
&& (at_delimiter || at_last_column_line_end))
{
skipWhitespacesAndTabs(istr);
prev_position = istr.position();
current_column_type->deserializeAsTextCSV(*columns[column_index], istr, format_settings);
curr_position = istr.position();
skipWhitespacesAndTabs(istr);
columns[table_column]->insertDefault();
}
catch (...)
else
{
exception = std::current_exception();
}
BufferBase::Position prev_position = istr.position();
BufferBase::Position curr_position = istr.position();
std::exception_ptr exception;
if (curr_position < prev_position)
throw Exception("Logical error: parsing is non-deterministic.", ErrorCodes::LOGICAL_ERROR);
if (isNativeNumber(current_column_type) || isDateOrDateTime(current_column_type))
{
/// An empty string instead of a value.
if (curr_position == prev_position)
try
{
out << "ERROR: text ";
verbosePrintString(prev_position, std::min(prev_position + 10, istr.buffer().end()), out);
out << " is not like " << current_column_type->getName() << "\n";
skipWhitespacesAndTabs(istr);
prev_position = istr.position();
current_column_type->deserializeAsTextCSV(*columns[table_column], istr, format_settings);
curr_position = istr.position();
skipWhitespacesAndTabs(istr);
}
catch (...)
{
exception = std::current_exception();
}
if (curr_position < prev_position)
throw Exception("Logical error: parsing is non-deterministic.", ErrorCodes::LOGICAL_ERROR);
if (isNativeNumber(current_column_type) || isDateOrDateTime(current_column_type))
{
/// An empty string instead of a value.
if (curr_position == prev_position)
{
out << "ERROR: text ";
verbosePrintString(prev_position, std::min(prev_position + 10, istr.buffer().end()), out);
out << " is not like " << current_column_type->getName() << "\n";
return false;
}
}
out << "parsed text: ";
verbosePrintString(prev_position, curr_position, out);
if (exception)
{
if (current_column_type->getName() == "DateTime")
out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n";
else if (current_column_type->getName() == "Date")
out << "ERROR: Date must be in YYYY-MM-DD format.\n";
else
out << "ERROR\n";
return false;
}
}
out << "parsed text: ";
verbosePrintString(prev_position, curr_position, out);
out << "\n";
if (exception)
{
if (current_column_type->getName() == "DateTime")
out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n";
else if (current_column_type->getName() == "Date")
out << "ERROR: Date must be in YYYY-MM-DD format.\n";
else
out << "ERROR\n";
return false;
}
out << "\n";
if (current_column_type->haveMaximumSizeOfValue())
{
if (*curr_position != '\n' && *curr_position != '\r' && *curr_position != delimiter)
if (current_column_type->haveMaximumSizeOfValue()
&& *curr_position != '\n' && *curr_position != '\r'
&& *curr_position != delimiter)
{
out << "ERROR: garbage after " << current_column_type->getName() << ": ";
verbosePrintString(curr_position, std::min(curr_position + 10, istr.buffer().end()), out);
@ -388,7 +440,7 @@ bool OPTIMIZE(1) CSVRowInputStream::parseRowAndPrintDiagnosticInfo(MutableColumn
else
{
static const String skipped_column_str = "<SKIPPED COLUMN>";
out << "Column " << input_position << ", " << std::string((input_position < 10 ? 2 : input_position < 100 ? 1 : 0), ' ')
out << "Column " << file_column << ", " << std::string((file_column < 10 ? 2 : file_column < 100 ? 1 : 0), ' ')
<< "name: " << skipped_column_str << ", " << std::string(max_length_of_column_name - skipped_column_str.length(), ' ')
<< "type: " << skipped_column_str << ", " << std::string(max_length_of_data_type_name - skipped_column_str.length(), ' ');
@ -397,7 +449,7 @@ bool OPTIMIZE(1) CSVRowInputStream::parseRowAndPrintDiagnosticInfo(MutableColumn
}
/// Delimiters
if (input_position + 1 == column_indexes_for_input_fields.size())
if (file_column + 1 == column_indexes_for_input_fields.size())
{
if (istr.eof())
return false;

View File

@ -20,7 +20,6 @@ class CSVRowInputStream : public IRowInputStream
{
public:
/** with_names - in the first line the header with column names
* with_types - on the next line header with type names
*/
CSVRowInputStream(ReadBuffer & istr_, const Block & header_, bool with_names_, const FormatSettings & format_settings);
@ -42,18 +41,23 @@ private:
using IndexesMap = std::unordered_map<String, size_t>;
IndexesMap column_indexes_by_names;
/// Maps indexes of columns in the input file to indexes of table columns
using OptionalIndexes = std::vector<std::optional<size_t>>;
OptionalIndexes column_indexes_for_input_fields;
/// Tracks which colums we have read in a single read() call.
/// For columns that are never read, it is initialized to false when we
/// read the file header, and never changed afterwards.
/// For other columns, it is updated on each read() call.
std::vector<UInt8> read_columns;
std::vector<size_t> columns_to_fill_with_default_values;
/// Whether we have any columns that are not read from file at all,
/// and must be always initialized with defaults.
bool have_always_default_columns = false;
void addInputColumn(const String & column_name);
void setupAllColumnsByTableSchema();
void fillUnreadColumnsWithDefaults(MutableColumns & columns, RowReadExtension& ext);
/// For convenient diagnostics in case of an error.
size_t row_num = 0;
/// How many bytes were read, not counting those that are still in the buffer.

View File

@ -39,6 +39,7 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
format_settings.csv.delimiter = settings.format_csv_delimiter;
format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes;
format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes;
format_settings.csv.empty_as_default = settings.input_format_defaults_for_omitted_fields;
format_settings.values.interpret_expressions = settings.input_format_values_interpret_expressions;
format_settings.with_names_use_header = settings.input_format_with_names_use_header;
format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields;

View File

@ -27,6 +27,7 @@ struct FormatSettings
char delimiter = ',';
bool allow_single_quotes = true;
bool allow_double_quotes = true;
bool empty_as_default = false;
};
CSV csv;

View File

@ -15,6 +15,7 @@ struct RowReadExtension
{
/// IRowInputStream.read() output. It contains non zero for columns that actually read from the source and zero otherwise.
/// It's used to attach defaults for partially filled rows.
/// Can be empty, this means that all columns are read.
std::vector<UInt8> read_columns;
};

View File

@ -2,6 +2,8 @@ Hello, world 123 2016-01-01
Hello, "world" 456 2016-01-02
Hello "world" 789 2016-01-03
Hello\n world 100 2016-01-04
default 1 2019-06-19
default-eof 1 2019-06-19
2016-01-01 01:02:03 1
2016-01-02 01:02:03 2
2017-08-15 13:15:01 3

View File

@ -4,13 +4,15 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS csv";
$CLICKHOUSE_CLIENT --query="CREATE TABLE csv (s String, n UInt64, d Date) ENGINE = Memory";
$CLICKHOUSE_CLIENT --query="CREATE TABLE csv (s String, n UInt64 DEFAULT 1, d Date DEFAULT '2019-06-19') ENGINE = Memory";
echo '"Hello, world", 123, "2016-01-01"
printf '"Hello, world", 123, "2016-01-01"
"Hello, ""world""", "456", 2016-01-02,
Hello "world", 789 ,2016-01-03
"Hello
world", 100, 2016-01-04,' | $CLICKHOUSE_CLIENT --query="INSERT INTO csv FORMAT CSV";
world", 100, 2016-01-04,
default,,
default-eof,,' | $CLICKHOUSE_CLIENT --input_format_defaults_for_omitted_fields --query="INSERT INTO csv FORMAT CSV";
$CLICKHOUSE_CLIENT --query="SELECT * FROM csv ORDER BY d";
$CLICKHOUSE_CLIENT --query="DROP TABLE csv";

View File

@ -165,6 +165,11 @@ clickhouse-client --format_csv_delimiter="|" --query="INSERT INTO test.csv FORMA
When parsing, all values can be parsed either with or without quotes. Both double and single quotes are supported. Rows can also be arranged without quotes. In this case, they are parsed up to the delimiter character or line feed (CR or LF). In violation of the RFC, when parsing rows without quotes, the leading and trailing spaces and tabs are ignored. For the line feed, Unix (LF), Windows (CR LF) and Mac OS Classic (CR LF) types are all supported.
Empty unquoted input values are replaced with default values for the respective
columns, if
[input_format_defaults_for_omitted_fields](../operations/settings/settings.md#session_settings-input_format_defaults_for_omitted_fields)
is enabled.
`NULL` is formatted as `\N`.
The CSV format supports the output of totals and extremes the same way as `TabSeparated`.

View File

@ -199,20 +199,15 @@ Ok.
## input_format_defaults_for_omitted_fields {#session_settings-input_format_defaults_for_omitted_fields}
Turns on/off the extended data exchange between a ClickHouse client and a ClickHouse server. This setting applies for `INSERT` queries.
When executing the `INSERT` query, the ClickHouse client prepares data and sends it to the server for writing. The client gets the table structure from the server when preparing the data. In some cases, the client needs more information than the server sends by default. Turn on the extended data exchange with `input_format_defaults_for_omitted_fields = 1`.
When the extended data exchange is enabled, the server sends the additional metadata along with the table structure. The composition of the metadata depends on the operation.
Operations where you may need the extended data exchange enabled:
- Inserting data in [JSONEachRow](../../interfaces/formats.md#jsoneachrow) format.
For all other operations, ClickHouse doesn't apply the setting.
When performing `INSERT` queries, replace omitted input column values with
default values of the respective columns. This option only applies to
[JSONEachRow](../../interfaces/formats.md#jsoneachrow) and
[CSV](../../interfaces/formats.md#csv) formats.
!!! note "Note"
The extended data exchange functionality consumes additional computing resources on the server and can reduce performance.
When this option is enabled, extended table metadata are sent
from server to client. It consumes additional computing resources on the server
and can reduce performance.
Possible values: