2021-12-15 11:30:57 +00:00
# include <DataTypes/DataTypeNullable.h>
# include <DataTypes/DataTypeArray.h>
# include <DataTypes/DataTypeTuple.h>
# include <DataTypes/DataTypeMap.h>
# include <DataTypes/DataTypeLowCardinality.h>
# include <Formats/ReadSchemaUtils.h>
# include <Processors/Formats/ISchemaReader.h>
# include <Common/assert_cast.h>
2022-03-25 21:00:00 +00:00
# include <Interpreters/Context.h>
# include <Storages/IStorage.h>
2021-12-15 11:30:57 +00:00
namespace DB
{
/// Error codes referenced by the schema inference helpers in this file.
/// They are only declared here (`extern`); the definitions live in another translation unit.
///  - EMPTY_DATA_PASSED and ONLY_NULLS_WHILE_READING_SCHEMA are the codes treated as retryable
///    by isRetryableSchemaInferenceError().
///  - CANNOT_EXTRACT_TABLE_STRUCTURE is thrown when schema inference fails.
///  - BAD_ARGUMENTS is thrown when the format does not support schema inference at all.
namespace ErrorCodes
{
2022-04-13 16:59:04 +00:00
extern const int EMPTY_DATA_PASSED ;
2021-12-15 11:30:57 +00:00
extern const int BAD_ARGUMENTS ;
2022-04-13 16:59:04 +00:00
extern const int ONLY_NULLS_WHILE_READING_SCHEMA ;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE ;
2021-12-15 11:30:57 +00:00
}
2022-03-25 21:00:00 +00:00
static std : : optional < NamesAndTypesList > getOrderedColumnsList (
const NamesAndTypesList & columns_list , const Names & columns_order_hint )
{
if ( columns_list . size ( ) ! = columns_order_hint . size ( ) )
return { } ;
std : : unordered_map < String , DataTypePtr > available_columns ;
for ( const auto & [ name , type ] : columns_list )
available_columns . emplace ( name , type ) ;
NamesAndTypesList res ;
for ( const auto & name : columns_order_hint )
{
auto it = available_columns . find ( name ) ;
if ( it = = available_columns . end ( ) )
return { } ;
res . emplace_back ( name , it - > second ) ;
}
return res ;
}
2022-04-13 16:59:04 +00:00
bool isRetryableSchemaInferenceError ( int code )
{
return code = = ErrorCodes : : EMPTY_DATA_PASSED | | code = = ErrorCodes : : ONLY_NULLS_WHILE_READING_SCHEMA ;
}
2022-01-24 18:41:44 +00:00
/// Determines the table structure (column names and types) for data in format `format_name`.
///
/// Three cases are distinguished through FormatFactory:
///  1. The format has an external schema reader: the schema is read from it directly,
///     no data buffer is requested and `buf` is left untouched.
///  2. The format has a regular schema reader: buffers are requested one by one from
///     `read_buffer_iterator` and the schema is inferred from the first buffer that works.
///  3. Neither: BAD_ARGUMENTS is thrown.
///
/// @param format_name           Name of the input format.
/// @param format_settings       Optional format settings; when set, they provide the limit of rows
///                              to read for inference, otherwise the limit is taken from the context settings.
/// @param read_buffer_iterator  Callable returning the next buffer to try, or null when there are no more.
///                              It receives a ColumnsDescription that it may fill; if it is non-empty
///                              after the loop, it is returned as the result.
/// @param retry                 Whether to go on to the next buffer after an empty file or a retryable error.
/// @param context               Query context (settings, insertion table).
/// @param buf                   Output: holds the last buffer obtained from `read_buffer_iterator`.
/// @return The inferred (or cached) columns description.
/// @throws Exception with CANNOT_EXTRACT_TABLE_STRUCTURE when inference fails,
///         BAD_ARGUMENTS when the format does not support schema inference.
ColumnsDescription readSchemaFromFormat(
    const String & format_name,
    const std::optional<FormatSettings> & format_settings,
    ReadBufferIterator & read_buffer_iterator,
    bool retry,
    ContextPtr & context,
    std::unique_ptr<ReadBuffer> & buf)
{
    NamesAndTypesList names_and_types;
    if (FormatFactory::instance().checkIfFormatHasExternalSchemaReader(format_name))
    {
        auto external_schema_reader = FormatFactory::instance().getExternalSchemaReader(format_name, context, format_settings);
        try
        {
            names_and_types = external_schema_reader->readSchema();
        }
        catch (const DB::Exception & e)
        {
            throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, " Cannot extract table structure from {} format file. Error: {}. You can specify the structure manually ", format_name, e.message());
        }
    }
    else if (FormatFactory::instance().checkIfFormatHasSchemaReader(format_name))
    {
        /// Messages of all failed attempts, reported together if every attempt fails.
        std::string exception_messages;
        SchemaReaderPtr schema_reader;
        /// Remaining budget of rows; it is shared by all attempts.
        size_t max_rows_to_read = format_settings ? format_settings->max_rows_to_read_for_schema_inference : context->getSettingsRef().input_format_max_rows_to_read_for_schema_inference;
        size_t iterations = 0;
        ColumnsDescription cached_columns;

        while (true)
        {
            /// Drop the reader left over from the previous attempt before its buffer is replaced.
            /// Otherwise, if creating the next reader throws, the catch block below would consult
            /// the stale reader and subtract its row count from the budget a second time.
            schema_reader = nullptr;

            bool is_eof = false;
            try
            {
                buf = read_buffer_iterator(cached_columns);
                /// No more sources to try.
                if (!buf)
                    break;
                is_eof = buf->eof();
            }
            catch (...)
            {
                auto exception_message = getCurrentExceptionMessage(false);
                throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, " Cannot extract table structure from {} format file: {}. You can specify the structure manually ", format_name, exception_message);
            }

            ++iterations;

            if (is_eof)
            {
                auto exception_message = fmt::format(" Cannot extract table structure from {} format file, file is empty ", format_name);

                if (!retry)
                    throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, " {}. You can specify the structure manually ", exception_message);

                exception_messages += " \n " + exception_message;
                continue;
            }

            try
            {
                schema_reader = FormatFactory::instance().getSchemaReader(format_name, *buf, context, format_settings);
                schema_reader->setMaxRowsToRead(max_rows_to_read);
                names_and_types = schema_reader->readSchema();
                break;
            }
            catch (...)
            {
                auto exception_message = getCurrentExceptionMessage(false);
                if (schema_reader)
                {
                    size_t rows_read = schema_reader->getNumRowsRead();
                    assert(rows_read <= max_rows_to_read);
                    /// Saturating subtraction: never wrap around, even if the reader reports
                    /// more rows than the limit in a build where the assert is compiled out.
                    max_rows_to_read = rows_read < max_rows_to_read ? max_rows_to_read - rows_read : 0;
                    if (rows_read != 0 && max_rows_to_read == 0)
                    {
                        exception_message += " \n To increase the maximum number of rows to read for structure determination, use setting input_format_max_rows_to_read_for_schema_inference ";
                        /// The budget is exhausted. If earlier sources were already tried, stop and
                        /// report all collected errors; on the first source, fail right away below.
                        if (iterations > 1)
                        {
                            exception_messages += " \n " + exception_message;
                            break;
                        }
                        retry = false;
                    }
                }

                if (!retry || !isRetryableSchemaInferenceError(getCurrentExceptionCode()))
                    throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, " Cannot extract table structure from {} format file. Error: {}. You can specify the structure manually ", format_name, exception_message);

                exception_messages += " \n " + exception_message;
            }
        }

        /// Columns provided through the iterator take precedence over anything inferred.
        if (!cached_columns.empty())
            return cached_columns;

        if (names_and_types.empty())
            throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, " All attempts to extract table structure from files failed. Errors:{} \n You can specify the structure manually ", exception_messages);

        /// If we have "INSERT SELECT" query then try to order
        /// columns as they are ordered in table schema for formats
        /// without strict column order (like JSON and TSKV).
        /// It will allow to execute simple data loading with query
        /// "INSERT INTO table SELECT * FROM ..."
        ///
        /// Note: this point is reached only after a successful readSchema() call,
        /// so schema_reader is the reader created in that same (last) iteration.
        const auto & insertion_table = context->getInsertionTable();
        if (!schema_reader->hasStrictOrderOfColumns() && !insertion_table.empty())
        {
            auto storage = DatabaseCatalog::instance().getTable(insertion_table, context);
            auto metadata = storage->getInMemoryMetadataPtr();
            auto names_in_storage = metadata->getColumns().getNamesOfPhysical();
            auto ordered_list = getOrderedColumnsList(names_and_types, names_in_storage);
            if (ordered_list)
                names_and_types = *ordered_list;
        }
    }
    else
        throw Exception(ErrorCodes::BAD_ARGUMENTS, " {} file format doesn't support schema inference. You must specify the structure manually ", format_name);

    return ColumnsDescription(names_and_types);
}
2022-04-19 19:16:47 +00:00
/// Convenience overload for callers that do not need the read buffer back:
/// forwards to the six-argument readSchemaFromFormat() with a local buffer holder
/// that is destroyed on return.
ColumnsDescription readSchemaFromFormat(
    const String & format_name,
    const std::optional<FormatSettings> & format_settings,
    ReadBufferIterator & read_buffer_iterator,
    bool retry,
    ContextPtr & context)
{
    std::unique_ptr<ReadBuffer> discarded_buffer;
    return readSchemaFromFormat(format_name, format_settings, read_buffer_iterator, retry, context, discarded_buffer);
}
2022-03-24 12:54:12 +00:00
/// Builds a copy of `type` in which leaf types are wrapped into Nullable, descending
/// through Nullable, Array, Tuple, Map and LowCardinality.
///
/// Returns nullptr if `type` is null or if the Nothing type occurs anywhere inside it.
/// Specifics visible here:
///  - an already Nullable type is unwrapped first and its nested type is processed;
///  - Map keys are processed recursively and then passed through removeNullable();
///  - a Tuple is rebuilt from its element types only.
DataTypePtr makeNullableRecursivelyAndCheckForNothing(DataTypePtr type)
{
    if (!type)
        return nullptr;

    WhichDataType which(type);

    if (which.isNothing())
        return nullptr;

    if (which.isNullable())
    {
        const auto * as_nullable = assert_cast<const DataTypeNullable *>(type.get());
        return makeNullableRecursivelyAndCheckForNothing(as_nullable->getNestedType());
    }

    if (which.isArray())
    {
        const auto * as_array = assert_cast<const DataTypeArray *>(type.get());
        auto converted_element = makeNullableRecursivelyAndCheckForNothing(as_array->getNestedType());
        if (!converted_element)
            return nullptr;
        return std::make_shared<DataTypeArray>(converted_element);
    }

    if (which.isTuple())
    {
        const auto * as_tuple = assert_cast<const DataTypeTuple *>(type.get());
        DataTypes converted_elements;
        for (const auto & element_type : as_tuple->getElements())
        {
            auto converted = makeNullableRecursivelyAndCheckForNothing(element_type);
            /// A single Nothing element invalidates the whole tuple.
            if (!converted)
                return nullptr;
            converted_elements.push_back(converted);
        }
        return std::make_shared<DataTypeTuple>(std::move(converted_elements));
    }

    if (which.isMap())
    {
        const auto * as_map = assert_cast<const DataTypeMap *>(type.get());
        auto converted_key = makeNullableRecursivelyAndCheckForNothing(as_map->getKeyType());
        auto converted_value = makeNullableRecursivelyAndCheckForNothing(as_map->getValueType());
        if (!converted_key || !converted_value)
            return nullptr;
        return std::make_shared<DataTypeMap>(removeNullable(converted_key), converted_value);
    }

    if (which.isLowCarnality())
    {
        const auto * as_low_cardinality = assert_cast<const DataTypeLowCardinality *>(type.get());
        auto converted_dictionary = makeNullableRecursivelyAndCheckForNothing(as_low_cardinality->getDictionaryType());
        if (!converted_dictionary)
            return nullptr;
        return std::make_shared<DataTypeLowCardinality>(converted_dictionary);
    }

    /// Any other type is a leaf: wrap it directly.
    return makeNullable(type);
}
2022-03-24 12:54:12 +00:00
/// Returns the columns of `header` with every type passed through
/// makeNullableRecursivelyAndCheckForNothing(). Column names and order are kept.
/// A column's type in the result is nullptr when that function returns nullptr for it.
NamesAndTypesList getNamesAndRecursivelyNullableTypes(const Block & header)
{
    NamesAndTypesList nullable_columns;
    for (const auto & [column_name, column_type] : header.getNamesAndTypesList())
    {
        auto nullable_type = makeNullableRecursivelyAndCheckForNothing(column_type);
        nullable_columns.emplace_back(column_name, std::move(nullable_type));
    }
    return nullable_columns;
}
2022-06-27 12:43:24 +00:00
/// Single-source variant of getKeysForSchemaCache(): builds the schema cache key for one source
/// by delegating to the multi-source function and returning its only element.
String getKeyForSchemaCache(const String & source, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context)
{
    const Strings single_source{source};
    const Strings keys = getKeysForSchemaCache(single_source, format, format_settings, context);
    return keys.front();
}
/// Builds one schema cache key per entry of `sources`, in the same order.
/// Each key is the concatenation: source + format + additional format info.
Strings getKeysForSchemaCache(const Strings & sources, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context)
{
    /// For some formats the data schema depends on settings, so two queries to the same
    /// source may produce two different schemas. To handle this case, format-specific
    /// information is appended to the cache key.
    /// For example, for the Protobuf format the additional information is the path to the schema
    /// and the message name.
    const String additional_format_info = FormatFactory::instance().getAdditionalInfoForSchemaCache(format, context, format_settings);

    Strings cache_keys;
    cache_keys.reserve(sources.size());
    for (const auto & source : sources)
        cache_keys.push_back(source + format + additional_format_info);

    return cache_keys;
}
2021-12-15 11:30:57 +00:00
}