Merge pull request #40068 from Avogar/schema-inference-hints

Allow to specify structure hints in schema inference
This commit is contained in:
Kruglov Pavel 2022-08-18 12:19:45 +02:00 committed by GitHub
commit d7056376eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 195 additions and 89 deletions

View File

@ -3468,6 +3468,24 @@ Default value: `25'000`.
The list of column names to use in schema inference for formats without column names. The format: 'column1,column2,column3,...'
## schema_inference_hints {#schema_inference_hints}
The list of column names and types to use as hints in schema inference for formats without schema.
Example:
Query:
```sql
desc format(JSONEachRow, '{"x" : 1, "y" : "String", "z" : "0.0.0.0" }') settings schema_inference_hints='x UInt8, z IPv4';
```
Result:
```sql
x UInt8
y Nullable(String)
z IPv4
```
## date_time_input_format {#date_time_input_format}
Allows choosing a parser of the text representation of date and time.

View File

@ -706,6 +706,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, input_format_orc_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format ORC", 0) \
M(Bool, input_format_arrow_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format Arrow", 0) \
M(String, column_names_for_schema_inference, "", "The list of column names to use in schema inference for formats without column names. The format: 'column1,column2,column3,...'", 0) \
M(String, schema_inference_hints, "", "The list of column names and types to use in schema inference for formats without column names. The format: 'column1,column2,column3,...'", 0) \
M(Bool, input_format_json_read_bools_as_numbers, true, "Allow to parse bools as numbers in JSON input formats", 0) \
M(Bool, input_format_json_try_infer_numbers_from_strings, true, "Try to infer numbers from string fields while schema inference", 0) \
M(Bool, input_format_try_infer_integers, true, "Try to infer numbers from string fields while schema inference in text formats", 0) \

View File

@ -159,6 +159,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.msgpack.output_uuid_representation = settings.output_format_msgpack_uuid_representation;
format_settings.max_rows_to_read_for_schema_inference = settings.input_format_max_rows_to_read_for_schema_inference;
format_settings.column_names_for_schema_inference = settings.column_names_for_schema_inference;
format_settings.schema_inference_hints = settings.schema_inference_hints;
format_settings.mysql_dump.table_name = settings.input_format_mysql_dump_table_name;
format_settings.mysql_dump.map_column_names = settings.input_format_mysql_dump_map_column_names;
format_settings.sql_insert.max_batch_size = settings.output_format_sql_insert_max_batch_size;
@ -402,7 +403,10 @@ SchemaReaderPtr FormatFactory::getSchemaReader(
throw Exception("FormatFactory: Format " + name + " doesn't support schema inference.", ErrorCodes::LOGICAL_ERROR);
auto format_settings = _format_settings ? *_format_settings : getFormatSettings(context);
return schema_reader_creator(buf, format_settings);
auto schema_reader = schema_reader_creator(buf, format_settings);
if (schema_reader->needContext())
schema_reader->setContext(context);
return schema_reader;
}
ExternalSchemaReaderPtr FormatFactory::getExternalSchemaReader(

View File

@ -38,6 +38,8 @@ struct FormatSettings
UInt64 max_rows_to_read_for_schema_inference = 100;
String column_names_for_schema_inference;
String schema_inference_hints;
bool try_infer_integers = false;
bool try_infer_dates = false;
bool try_infer_datetimes = false;

View File

@ -3,7 +3,7 @@
#include <Parsers/parseQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/Context.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
namespace DB
@ -44,8 +44,15 @@ bool tryParseColumnsListFromString(const std::string & structure, ColumnsDescrip
if (!columns_list)
return false;
columns = InterpreterCreateQuery::getColumnsDescription(*columns_list, context, false);
return true;
try
{
columns = InterpreterCreateQuery::getColumnsDescription(*columns_list, context, false);
return true;
}
catch (...)
{
return false;
}
}
}

View File

@ -2,6 +2,7 @@
#include <Formats/ReadSchemaUtils.h>
#include <Formats/EscapingRuleUtils.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <boost/algorithm/string.hpp>
namespace DB
@ -45,7 +46,8 @@ void chooseResultColumnType(
{
throw Exception(
ErrorCodes::TYPE_MISMATCH,
"Automatically defined type {} for column {} in row {} differs from type defined by previous rows: {}",
"Automatically defined type {} for column {} in row {} differs from type defined by previous rows: {}. "
"You can specify the type for this column using setting schema_inference_hints",
type->getName(),
column_name,
row,
@ -60,33 +62,42 @@ void checkResultColumnTypeAndAppend(NamesAndTypesList & result, DataTypePtr & ty
if (!default_type)
throw Exception(
ErrorCodes::ONLY_NULLS_WHILE_READING_SCHEMA,
"Cannot determine table structure by first {} rows of data, because some columns contain only Nulls", rows_read);
"Cannot determine type for column {} by first {} rows of data, most likely this column contains only Nulls or empty "
"Arrays/Maps. You can specify the type for this column using setting schema_inference_hints",
name,
rows_read);
type = default_type;
}
result.emplace_back(name, type);
}
IRowSchemaReader::IRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
: ISchemaReader(in_), format_settings(format_settings_)
IIRowSchemaReader::IIRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_, DataTypePtr default_type_)
: ISchemaReader(in_), default_type(default_type_), hints_str(format_settings_.schema_inference_hints), format_settings(format_settings_)
{
if (!format_settings.column_names_for_schema_inference.empty())
}
void IIRowSchemaReader::setContext(ContextPtr & context)
{
ColumnsDescription columns;
if (tryParseColumnsListFromString(hints_str, columns, context))
{
/// column_names_for_schema_inference is a string in format 'column1,column2,column3,...'
boost::split(column_names, format_settings.column_names_for_schema_inference, boost::is_any_of(","));
for (auto & column_name : column_names)
{
std::string col_name_trimmed = boost::trim_copy(column_name);
if (!col_name_trimmed.empty())
column_name = col_name_trimmed;
}
for (const auto & [name, type] : columns.getAll())
hints[name] = type;
}
}
IRowSchemaReader::IRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_, DataTypePtr default_type_)
: IRowSchemaReader(in_, format_settings_)
IRowSchemaReader::IRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
: IIRowSchemaReader(in_, format_settings_)
{
default_type = default_type_;
initColumnNames(format_settings.column_names_for_schema_inference);
}
IRowSchemaReader::IRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_, DataTypePtr default_type_)
: IIRowSchemaReader(in_, format_settings_, default_type_)
{
initColumnNames(format_settings.column_names_for_schema_inference);
}
IRowSchemaReader::IRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_, const DataTypes & default_types_)
@ -104,31 +115,6 @@ NamesAndTypesList IRowSchemaReader::readSchema()
"Most likely setting input_format_max_rows_to_read_for_schema_inference is set to 0");
DataTypes data_types = readRowAndGetDataTypes();
for (rows_read = 1; rows_read < max_rows_to_read; ++rows_read)
{
DataTypes new_data_types = readRowAndGetDataTypes();
if (new_data_types.empty())
/// We reached eof.
break;
if (new_data_types.size() != data_types.size())
throw Exception(ErrorCodes::INCORRECT_DATA, "Rows have different amount of values");
for (size_t i = 0; i != data_types.size(); ++i)
{
/// We couldn't determine the type of this column in a new row, just skip it.
if (!new_data_types[i])
continue;
auto transform_types_if_needed = [&](DataTypePtr & type, DataTypePtr & new_type){ transformTypesIfNeeded(type, new_type, i); };
chooseResultColumnType(data_types[i], new_data_types[i], transform_types_if_needed, getDefaultType(i), std::to_string(i + 1), rows_read);
}
}
/// Check that we read at list one column.
if (data_types.empty())
throw Exception(ErrorCodes::EMPTY_DATA_PASSED, "Cannot read rows from the data");
/// If column names weren't set, use default names 'c1', 'c2', ...
if (column_names.empty())
{
@ -142,6 +128,39 @@ NamesAndTypesList IRowSchemaReader::readSchema()
ErrorCodes::INCORRECT_DATA,
"The number of column names {} differs with the number of types {}", column_names.size(), data_types.size());
for (size_t i = 0; i != column_names.size(); ++i)
{
auto hint_it = hints.find(column_names[i]);
if (hint_it != hints.end())
data_types[i] = hint_it->second;
}
for (rows_read = 1; rows_read < max_rows_to_read; ++rows_read)
{
DataTypes new_data_types = readRowAndGetDataTypes();
if (new_data_types.empty())
/// We reached eof.
break;
if (new_data_types.size() != data_types.size())
throw Exception(ErrorCodes::INCORRECT_DATA, "Rows have different amount of values");
for (size_t i = 0; i != data_types.size(); ++i)
{
/// Check if we couldn't determine the type of this column in a new row
/// or the type for this column was taken from hints.
if (!new_data_types[i] || hints.contains(column_names[i]))
continue;
auto transform_types_if_needed = [&](DataTypePtr & type, DataTypePtr & new_type){ transformTypesIfNeeded(type, new_type, i); };
chooseResultColumnType(data_types[i], new_data_types[i], transform_types_if_needed, getDefaultType(i), std::to_string(i + 1), rows_read);
}
}
/// Check that we read at list one column.
if (data_types.empty())
throw Exception(ErrorCodes::EMPTY_DATA_PASSED, "Cannot read rows from the data");
NamesAndTypesList result;
for (size_t i = 0; i != data_types.size(); ++i)
{
@ -152,6 +171,21 @@ NamesAndTypesList IRowSchemaReader::readSchema()
return result;
}
void IRowSchemaReader::initColumnNames(const String & column_names_str)
{
if (column_names_str.empty())
return;
/// column_names_for_schema_inference is a string in format 'column1,column2,column3,...'
boost::split(column_names, column_names_str, boost::is_any_of(","));
for (auto & column_name : column_names)
{
std::string col_name_trimmed = boost::trim_copy(column_name);
if (!col_name_trimmed.empty())
column_name = col_name_trimmed;
}
}
DataTypePtr IRowSchemaReader::getDefaultType(size_t column) const
{
if (default_type)
@ -167,7 +201,7 @@ void IRowSchemaReader::transformTypesIfNeeded(DataTypePtr & type, DataTypePtr &
}
IRowWithNamesSchemaReader::IRowWithNamesSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_, DataTypePtr default_type_)
: ISchemaReader(in_), format_settings(format_settings_), default_type(default_type_)
: IIRowSchemaReader(in_, format_settings_, default_type_)
{
}
@ -187,7 +221,11 @@ NamesAndTypesList IRowWithNamesSchemaReader::readSchema()
names_order.reserve(names_and_types.size());
for (const auto & [name, type] : names_and_types)
{
names_to_types[name] = type;
auto hint_it = hints.find(name);
if (hint_it != hints.end())
names_to_types[name] = hint_it->second;
else
names_to_types[name] = type;
names_order.push_back(name);
}
@ -205,11 +243,18 @@ NamesAndTypesList IRowWithNamesSchemaReader::readSchema()
/// If we didn't see this column before, just add it.
if (it == names_to_types.end())
{
names_to_types[name] = new_type;
auto hint_it = hints.find(name);
if (hint_it != hints.end())
names_to_types[name] = hint_it->second;
else
names_to_types[name] = new_type;
names_order.push_back(name);
continue;
}
if (hints.contains(name))
continue;
auto & type = it->second;
chooseResultColumnType(type, new_type, transform_types_if_needed, default_type, name, rows_read);
}

View File

@ -37,6 +37,26 @@ protected:
using CommonDataTypeChecker = std::function<DataTypePtr(const DataTypePtr &, const DataTypePtr &)>;
class IIRowSchemaReader : public ISchemaReader
{
public:
IIRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_, DataTypePtr default_type_ = nullptr);
bool needContext() const override { return !hints_str.empty(); }
void setContext(ContextPtr & context) override;
protected:
void setMaxRowsToRead(size_t max_rows) override { max_rows_to_read = max_rows; }
size_t getNumRowsRead() const override { return rows_read; }
size_t max_rows_to_read;
size_t rows_read = 0;
DataTypePtr default_type;
String hints_str;
FormatSettings format_settings;
std::unordered_map<String, DataTypePtr> hints;
};
/// Base class for schema inference for formats that read data row by row.
/// It reads data row by row (up to max_rows_to_read), determines types of columns
/// for each row and compare them with types from the previous rows. If some column
@ -44,12 +64,12 @@ using CommonDataTypeChecker = std::function<DataTypePtr(const DataTypePtr &, con
/// (from argument default_type_) will be used for this column or the exception
/// will be thrown (if default type is not set). If different columns have different
/// default types, you can provide them by default_types_ argument.
class IRowSchemaReader : public ISchemaReader
class IRowSchemaReader : public IIRowSchemaReader
{
public:
IRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings);
IRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings, DataTypePtr default_type_);
IRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings, const DataTypes & default_types_);
IRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_);
IRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_, DataTypePtr default_type_);
IRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_, const DataTypes & default_types_);
NamesAndTypesList readSchema() override;
@ -62,19 +82,12 @@ protected:
void setColumnNames(const std::vector<String> & names) { column_names = names; }
void setMaxRowsToRead(size_t max_rows) override { max_rows_to_read = max_rows; }
size_t getNumRowsRead() const override { return rows_read; }
FormatSettings format_settings;
virtual void transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type, size_t column_idx);
virtual void transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type, size_t index);
private:
DataTypePtr getDefaultType(size_t column) const;
size_t max_rows_to_read;
size_t rows_read = 0;
DataTypePtr default_type;
void initColumnNames(const String & column_names_str);
DataTypes default_types;
std::vector<String> column_names;
};
@ -84,7 +97,7 @@ private:
/// Differ from IRowSchemaReader in that after reading a row we get
/// a map {column_name : type} and some columns may be missed in a single row
/// (in this case we will use types from the previous rows for missed columns).
class IRowWithNamesSchemaReader : public ISchemaReader
class IRowWithNamesSchemaReader : public IIRowSchemaReader
{
public:
IRowWithNamesSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_, DataTypePtr default_type_ = nullptr);
@ -98,17 +111,7 @@ protected:
/// Set eof = true if can't read more data.
virtual NamesAndTypesList readRowAndGetNamesAndDataTypes(bool & eof) = 0;
void setMaxRowsToRead(size_t max_rows) override { max_rows_to_read = max_rows; }
size_t getNumRowsRead() const override { return rows_read; }
virtual void transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type);
FormatSettings format_settings;
private:
size_t max_rows_to_read;
size_t rows_read = 0;
DataTypePtr default_type;
};
/// Base class for schema inference for formats that don't need any data to

View File

@ -88,7 +88,7 @@ private:
class JSONEachRowSchemaReader : public IRowWithNamesSchemaReader
{
public:
JSONEachRowSchemaReader(ReadBuffer & in_, bool json_strings, const FormatSettings & format_settings);
JSONEachRowSchemaReader(ReadBuffer & in_, bool json_strings, const FormatSettings & format_settings_);
private:
NamesAndTypesList readRowAndGetNamesAndDataTypes(bool & eof) override;

View File

@ -14,7 +14,7 @@
#include <Storages/Hive/StorageHive.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Common/logger_useful.h>
namespace DB

View File

@ -1,5 +1,5 @@
#include <TableFunctions/ITableFunctionFileLike.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>

View File

@ -2,7 +2,7 @@
#include <Common/Exception.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTSelectWithUnionQuery.h>

View File

@ -1,5 +1,5 @@
#include <TableFunctions/TableFunctionFile.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include "Parsers/IAST_fwd.h"
#include "registerTableFunctions.h"

View File

@ -12,7 +12,7 @@
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionGenerateRandom.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include "registerTableFunctions.h"

View File

@ -6,7 +6,7 @@
#include <Storages/ColumnsDescription.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionHDFS.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
namespace DB
{

View File

@ -13,7 +13,7 @@
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionHDFS.h>
#include <TableFunctions/TableFunctionHDFSCluster.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/IAST_fwd.h>

View File

@ -1,6 +1,6 @@
#include <TableFunctions/TableFunctionInput.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Common/Exception.h>

View File

@ -9,7 +9,7 @@
#include <TableFunctions/TableFunctionMongoDB.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <TableFunctions/registerTableFunctions.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/ColumnsDescription.h>

View File

@ -3,7 +3,7 @@
#include <Parsers/ASTFunction.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/StorageNull.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionNull.h>
#include <Interpreters/evaluateConstantExpression.h>

View File

@ -7,7 +7,7 @@
#include <Interpreters/Context.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionS3.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/StorageS3.h>

View File

@ -14,7 +14,7 @@
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionS3.h>
#include <TableFunctions/TableFunctionS3Cluster.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/IAST_fwd.h>

View File

@ -9,7 +9,7 @@
#include <Storages/StorageURL.h>
#include <Storages/StorageExternalDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Formats/FormatFactory.h>

View File

@ -11,7 +11,7 @@
#include <TableFunctions/TableFunctionValues.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Interpreters/convertFieldToType.h>
#include <Interpreters/evaluateConstantExpression.h>

View File

@ -8,7 +8,7 @@
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionViewIfPermitted.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Interpreters/evaluateConstantExpression.h>
#include "registerTableFunctions.h"

View File

@ -0,0 +1,16 @@
x UInt8
y Nullable(String)
z IPv4
x UInt8
y Nullable(String)
z IPv4
x Nullable(UInt32)
x Array(UInt32)
x Map(String, String)
c1 UInt8
c2 Nullable(String)
c3 IPv4
x UInt8
y Nullable(String)
z IPv4
x Nullable(UInt32)

View File

@ -0,0 +1,10 @@
-- Tags: no-fasttest
desc format(JSONEachRow, '{"x" : 1, "y" : "String", "z" : "0.0.0.0" }') settings schema_inference_hints='x UInt8, z IPv4';
desc format(JSONEachRow, '{"x" : 1, "y" : "String"}\n{"z" : "0.0.0.0", "y" : "String2"}\n{"x" : 2}') settings schema_inference_hints='x UInt8, z IPv4';
desc format(JSONEachRow, '{"x" : null}') settings schema_inference_hints='x Nullable(UInt32)';
desc format(JSONEachRow, '{"x" : []}') settings schema_inference_hints='x Array(UInt32)';
desc format(JSONEachRow, '{"x" : {}}') settings schema_inference_hints='x Map(String, String)';
desc format(CSV, '1,"String","0.0.0.0"') settings schema_inference_hints='c1 UInt8, c3 IPv4';
desc format(CSV, '1,"String","0.0.0.0"') settings schema_inference_hints='x UInt8, z IPv4', column_names_for_schema_inference='x, y, z';
desc format(CSV, '\\N') settings schema_inference_hints='x Nullable(UInt32)', column_names_for_schema_inference='x';