2021-12-15 11:30:57 +00:00
# include <DataTypes/DataTypesNumber.h>
# include <DataTypes/DataTypeNullable.h>
# include <DataTypes/DataTypeArray.h>
# include <DataTypes/DataTypeTuple.h>
# include <DataTypes/DataTypeMap.h>
# include <DataTypes/DataTypeLowCardinality.h>
# include <Formats/ReadSchemaUtils.h>
# include <Processors/Formats/ISchemaReader.h>
# include <Common/assert_cast.h>
2022-03-25 21:00:00 +00:00
# include <Interpreters/Context.h>
# include <Storages/IStorage.h>
2021-12-15 11:30:57 +00:00
namespace DB
{
/// Error codes referenced in this file; they are only declared here, the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
    extern const int EMPTY_DATA_PASSED;
    extern const int ONLY_NULLS_WHILE_READING_SCHEMA;
}
2022-03-25 21:00:00 +00:00
static std : : optional < NamesAndTypesList > getOrderedColumnsList (
const NamesAndTypesList & columns_list , const Names & columns_order_hint )
{
if ( columns_list . size ( ) ! = columns_order_hint . size ( ) )
return { } ;
std : : unordered_map < String , DataTypePtr > available_columns ;
for ( const auto & [ name , type ] : columns_list )
available_columns . emplace ( name , type ) ;
NamesAndTypesList res ;
for ( const auto & name : columns_order_hint )
{
auto it = available_columns . find ( name ) ;
if ( it = = available_columns . end ( ) )
return { } ;
res . emplace_back ( name , it - > second ) ;
}
return res ;
}
2022-04-13 16:59:04 +00:00
bool isRetryableSchemaInferenceError ( int code )
{
return code = = ErrorCodes : : EMPTY_DATA_PASSED | | code = = ErrorCodes : : ONLY_NULLS_WHILE_READING_SCHEMA ;
}
2022-01-24 18:41:44 +00:00
/// Determines the column structure for data in format `format_name`.
///
/// - If the format has an external schema reader, the schema comes from it and
///   `read_buffer_iterator` is not touched.
/// - Otherwise, if the format has a schema reader, buffers are taken from
///   `read_buffer_iterator` one by one until a schema is read successfully.
///   The buffer that yielded the schema is moved into `buf_out`.
/// - Otherwise BAD_ARGUMENTS is thrown.
///
/// Throws CANNOT_EXTRACT_TABLE_STRUCTURE when a buffer is empty or a reader fails and
/// the iterator is single (or the failure is not retryable), and when no buffer produced
/// a non-empty schema.
ColumnsDescription readSchemaFromFormat(
    const String & format_name,
    const std::optional<FormatSettings> & format_settings,
    ReadBufferIterator & read_buffer_iterator,
    ContextPtr & context,
    std::unique_ptr<ReadBuffer> & buf_out)
{
    if (FormatFactory::instance().checkIfFormatHasExternalSchemaReader(format_name))
    {
        auto external_reader = FormatFactory::instance().getExternalSchemaReader(format_name, context, format_settings);
        NamesAndTypesList external_columns = external_reader->readSchema();
        return ColumnsDescription(external_columns);
    }

    if (!FormatFactory::instance().checkIfFormatHasSchemaReader(format_name))
        throw Exception(ErrorCodes::BAD_ARGUMENTS, " {} file format doesn't support schema inference ", format_name);

    NamesAndTypesList inferred_columns;
    SchemaReaderPtr schema_reader;
    std::string collected_errors;

    for (std::unique_ptr<ReadBuffer> buf = read_buffer_iterator.next(); buf; buf = read_buffer_iterator.next())
    {
        if (buf->eof())
        {
            auto empty_file_message = fmt::format(" Cannot extract table structure from {} format file, file is empty \n ", format_name);
            if (read_buffer_iterator.isSingle())
                throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, empty_file_message);

            /// More inputs may follow: remember the problem and move on.
            collected_errors += " \n " + empty_file_message;
            continue;
        }

        try
        {
            schema_reader = FormatFactory::instance().getSchemaReader(format_name, *buf, context, format_settings);
            inferred_columns = schema_reader->readSchema();
            /// Hand the buffer the schema was read from over to the caller.
            buf_out = std::move(buf);
            break;
        }
        catch (...)
        {
            auto failure_message = getCurrentExceptionMessage(false);

            /// The next buffer is tried only when there is more than one input and the error is a retryable one.
            const bool can_try_next
                = !read_buffer_iterator.isSingle() && isRetryableSchemaInferenceError(getCurrentExceptionCode());
            if (!can_try_next)
                throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, " Cannot extract table structure from {} format file. Error: {} ", format_name, failure_message);

            collected_errors += " \n " + failure_message;
        }
    }

    if (inferred_columns.empty())
        throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, " All attempts to extract table structure from files failed. Errors:{} ", collected_errors);

    /// For an "INSERT SELECT" query and a format without strict column order
    /// (like JSON and TSKV), try to arrange the inferred columns the way they are
    /// ordered in the destination table. This makes plain
    /// "INSERT INTO table SELECT * FROM ..." data loading work.
    const auto & insertion_table = context->getInsertionTable();
    const bool order_is_flexible = !schema_reader->hasStrictOrderOfColumns();
    if (order_is_flexible && !insertion_table.empty())
    {
        auto storage = DatabaseCatalog::instance().getTable(insertion_table, context);
        auto metadata = storage->getInMemoryMetadataPtr();
        auto names_in_storage = metadata->getColumns().getNamesOfPhysical();

        /// The inferred order is kept when the reordering helper returns nothing.
        if (auto reordered = getOrderedColumnsList(inferred_columns, names_in_storage))
            inferred_columns = *reordered;
    }

    return ColumnsDescription(inferred_columns);
}
2022-04-13 16:59:04 +00:00
/// Overload for callers that do not need the buffer the schema was read from:
/// forwards to the five-argument version with a local buffer holder that is
/// discarded on return.
ColumnsDescription readSchemaFromFormat(
    const String & format_name,
    const std::optional<FormatSettings> & format_settings,
    ReadBufferIterator & read_buffer_iterator,
    ContextPtr & context)
{
    std::unique_ptr<ReadBuffer> discarded_buffer;
    return readSchemaFromFormat(format_name, format_settings, read_buffer_iterator, context, discarded_buffer);
}
2022-03-24 12:54:12 +00:00
/// Converts `type` by walking into Nullable, Array, Tuple, Map and LowCardinality
/// and passing every other type through makeNullable().
///
/// Returns nullptr when `type` is null, is Nothing, or contains a component for which
/// the conversion returned nullptr.
DataTypePtr makeNullableRecursivelyAndCheckForNothing(DataTypePtr type)
{
    if (!type)
        return nullptr;

    WhichDataType which(type);

    if (which.isNothing())
        return nullptr;

    if (which.isNullable())
    {
        /// Unwrap and convert the nested type; the result of that conversion is returned as is.
        const auto * as_nullable = assert_cast<const DataTypeNullable *>(type.get());
        return makeNullableRecursivelyAndCheckForNothing(as_nullable->getNestedType());
    }

    if (which.isArray())
    {
        const auto * as_array = assert_cast<const DataTypeArray *>(type.get());
        auto converted_element = makeNullableRecursivelyAndCheckForNothing(as_array->getNestedType());
        if (!converted_element)
            return nullptr;
        return std::make_shared<DataTypeArray>(converted_element);
    }

    if (which.isTuple())
    {
        const auto * as_tuple = assert_cast<const DataTypeTuple *>(type.get());

        /// The resulting tuple is built from the converted element types only.
        DataTypes converted_elements;
        for (const auto & element_type : as_tuple->getElements())
        {
            auto converted = makeNullableRecursivelyAndCheckForNothing(element_type);
            if (!converted)
                return nullptr;
            converted_elements.push_back(converted);
        }
        return std::make_shared<DataTypeTuple>(std::move(converted_elements));
    }

    if (which.isMap())
    {
        const auto * as_map = assert_cast<const DataTypeMap *>(type.get());
        auto converted_key = makeNullableRecursivelyAndCheckForNothing(as_map->getKeyType());
        auto converted_value = makeNullableRecursivelyAndCheckForNothing(as_map->getValueType());
        if (!converted_key || !converted_value)
            return nullptr;

        /// The key goes through removeNullable() before being used; the value is used as converted.
        return std::make_shared<DataTypeMap>(removeNullable(converted_key), converted_value);
    }

    if (which.isLowCarnality())
    {
        const auto * as_low_cardinality = assert_cast<const DataTypeLowCardinality *>(type.get());
        auto converted_dictionary = makeNullableRecursivelyAndCheckForNothing(as_low_cardinality->getDictionaryType());
        if (!converted_dictionary)
            return nullptr;
        return std::make_shared<DataTypeLowCardinality>(converted_dictionary);
    }

    return makeNullable(type);
}
2022-03-24 12:54:12 +00:00
/// Returns the columns of `header` with each type replaced by the result of
/// makeNullableRecursivelyAndCheckForNothing(). That result is stored as is,
/// so a column's type is nullptr when the conversion returns nullptr.
NamesAndTypesList getNamesAndRecursivelyNullableTypes(const Block & header)
{
    NamesAndTypesList converted_columns;
    for (const auto & [column_name, column_type] : header.getNamesAndTypesList())
    {
        auto converted_type = makeNullableRecursivelyAndCheckForNothing(column_type);
        converted_columns.emplace_back(column_name, converted_type);
    }
    return converted_columns;
}
2021-12-15 11:30:57 +00:00
}