2017-04-01 09:19:00 +00:00
# include <Storages/StorageFile.h>
2017-12-30 00:36:06 +00:00
# include <Storages/StorageFactory.h>
2016-10-18 14:18:37 +00:00
2017-04-01 09:19:00 +00:00
# include <Interpreters/Context.h>
2017-12-30 00:36:06 +00:00
# include <Interpreters/evaluateConstantExpression.h>
2020-11-02 07:50:38 +00:00
# include <Parsers/ASTCreateQuery.h>
2017-12-30 00:36:06 +00:00
# include <Parsers/ASTLiteral.h>
# include <Parsers/ASTIdentifier.h>
2017-04-01 09:19:00 +00:00
# include <IO/ReadBufferFromFile.h>
2019-11-20 14:48:01 +00:00
# include <IO/ReadHelpers.h>
2017-04-01 09:19:00 +00:00
# include <IO/WriteBufferFromFile.h>
# include <IO/WriteHelpers.h>
2017-12-30 00:36:06 +00:00
2018-06-10 19:22:49 +00:00
# include <Formats/FormatFactory.h>
2020-01-15 07:52:45 +00:00
# include <DataTypes/DataTypeString.h>
2019-01-23 14:48:50 +00:00
# include <DataStreams/IBlockInputStream.h>
2017-04-01 09:19:00 +00:00
# include <DataStreams/IBlockOutputStream.h>
2018-07-11 12:05:04 +00:00
# include <DataStreams/AddingDefaultsBlockInputStream.h>
2016-10-18 14:18:37 +00:00
2017-04-01 09:19:00 +00:00
# include <Common/escapeForFileName.h>
2017-12-30 00:36:06 +00:00
# include <Common/typeid_cast.h>
2019-07-21 13:15:04 +00:00
# include <Common/parseGlobs.h>
2021-03-30 17:57:21 +00:00
# include "Storages/ColumnsDescription.h"
# include "Storages/StorageInMemoryMetadata.h"
# include "Storages/StorageMergeTree.h"
2016-10-18 14:18:37 +00:00
2016-10-25 13:49:07 +00:00
# include <fcntl.h>
2020-01-05 02:57:09 +00:00
# include <unistd.h>
2016-10-25 13:49:07 +00:00
2018-04-06 09:53:29 +00:00
# include <Poco/Path.h>
2018-04-10 07:09:50 +00:00
# include <Poco/File.h>
2017-12-18 02:37:08 +00:00
2019-07-21 13:15:04 +00:00
# include <re2/re2.h>
2019-08-27 15:20:31 +00:00
# include <filesystem>
2020-01-04 14:45:11 +00:00
# include <Storages/Distributed/DirectoryMonitor.h>
2020-01-31 13:12:11 +00:00
# include <Processors/Sources/SourceWithProgress.h>
2020-05-18 10:00:22 +00:00
# include <Processors/Formats/InputStreamFromInputFormat.h>
2021-02-16 14:50:11 +00:00
# include <Processors/Sources/NullSource.h>
2020-01-31 13:12:11 +00:00
# include <Processors/Pipe.h>
2019-07-21 13:15:04 +00:00
2019-08-27 15:20:31 +00:00
namespace fs = std : : filesystem ;
2019-07-21 13:15:04 +00:00
2016-10-18 14:18:37 +00:00
namespace DB
{
2016-10-28 17:38:32 +00:00
namespace ErrorCodes
{
2020-02-25 18:02:41 +00:00
extern const int BAD_ARGUMENTS ;
extern const int NOT_IMPLEMENTED ;
2017-04-01 07:20:54 +00:00
extern const int CANNOT_SEEK_THROUGH_FILE ;
2020-01-05 02:57:09 +00:00
extern const int CANNOT_TRUNCATE_FILE ;
2017-04-01 07:20:54 +00:00
extern const int DATABASE_ACCESS_DENIED ;
2017-12-30 00:36:06 +00:00
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH ;
extern const int UNKNOWN_IDENTIFIER ;
2017-11-03 19:53:10 +00:00
extern const int INCORRECT_FILE_NAME ;
2018-04-10 08:54:31 +00:00
extern const int FILE_DOESNT_EXIST ;
2020-09-24 23:29:16 +00:00
extern const int TIMEOUT_EXCEEDED ;
2020-10-14 12:19:29 +00:00
extern const int INCOMPATIBLE_COLUMNS ;
2018-08-10 04:02:56 +00:00
}
2016-10-28 17:38:32 +00:00
2019-08-02 15:00:12 +00:00
namespace
{
2019-09-22 22:13:42 +00:00
2019-08-10 16:00:01 +00:00
/* Recursive directory listing with matched paths as a result.
* Have the same method in StorageHDFS .
*/
2020-03-09 01:03:43 +00:00
std : : vector < std : : string > listFilesWithRegexpMatching ( const std : : string & path_for_ls , const std : : string & for_match )
2019-08-02 15:00:12 +00:00
{
2019-08-30 15:19:05 +00:00
const size_t first_glob = for_match . find_first_of ( " *?{ " ) ;
2019-08-05 23:10:19 +00:00
2019-08-30 15:19:05 +00:00
const size_t end_of_path_without_globs = for_match . substr ( 0 , first_glob ) . rfind ( ' / ' ) ;
const std : : string suffix_with_globs = for_match . substr ( end_of_path_without_globs ) ; /// begin with '/'
2019-08-08 14:26:02 +00:00
2019-08-30 15:19:05 +00:00
const size_t next_slash = suffix_with_globs . find ( ' / ' , 1 ) ;
2020-01-05 20:11:26 +00:00
auto regexp = makeRegexpPatternFromGlobs ( suffix_with_globs . substr ( 0 , next_slash ) ) ;
re2 : : RE2 matcher ( regexp ) ;
2019-08-05 23:10:19 +00:00
2019-08-08 14:26:02 +00:00
std : : vector < std : : string > result ;
2019-08-30 15:19:05 +00:00
const std : : string prefix_without_globs = path_for_ls + for_match . substr ( 1 , end_of_path_without_globs ) ;
2020-01-05 20:11:26 +00:00
if ( ! fs : : exists ( fs : : path ( prefix_without_globs ) ) )
2019-08-10 16:00:01 +00:00
{
return result ;
}
2019-08-30 15:19:05 +00:00
const fs : : directory_iterator end ;
for ( fs : : directory_iterator it ( prefix_without_globs ) ; it ! = end ; + + it )
2019-08-02 15:00:12 +00:00
{
2019-08-30 15:19:05 +00:00
const std : : string full_path = it - > path ( ) . string ( ) ;
const size_t last_slash = full_path . rfind ( ' / ' ) ;
const String file_name = full_path . substr ( last_slash ) ;
const bool looking_for_directory = next_slash ! = std : : string : : npos ;
2019-08-10 16:00:01 +00:00
/// Condition is_directory means what kind of path is it in current iteration of ls
2019-08-30 15:19:05 +00:00
if ( ! fs : : is_directory ( it - > path ( ) ) & & ! looking_for_directory )
2019-08-02 15:00:12 +00:00
{
2019-08-08 14:26:02 +00:00
if ( re2 : : RE2 : : FullMatch ( file_name , matcher ) )
2019-08-02 15:00:12 +00:00
{
result . push_back ( it - > path ( ) . string ( ) ) ;
}
}
2019-08-30 15:19:05 +00:00
else if ( fs : : is_directory ( it - > path ( ) ) & & looking_for_directory )
2019-08-02 15:00:12 +00:00
{
2019-08-08 14:26:02 +00:00
if ( re2 : : RE2 : : FullMatch ( file_name , matcher ) )
2019-08-02 15:00:12 +00:00
{
2019-09-23 14:50:33 +00:00
/// Recursion depth is limited by pattern. '*' works only for depth = 1, for depth = 2 pattern path is '*/*'. So we do not need additional check.
2019-09-22 22:13:42 +00:00
Strings result_part = listFilesWithRegexpMatching ( full_path + " / " , suffix_with_globs . substr ( next_slash ) ) ;
2019-08-02 15:00:12 +00:00
std : : move ( result_part . begin ( ) , result_part . end ( ) , std : : back_inserter ( result ) ) ;
}
}
}
return result ;
}
2016-10-18 14:18:37 +00:00
2020-03-09 01:03:43 +00:00
std : : string getTablePath ( const std : : string & table_dir_path , const std : : string & format_name )
2016-10-18 14:18:37 +00:00
{
2019-10-25 19:07:47 +00:00
return table_dir_path + " /data. " + escapeForFileName ( format_name ) ;
2016-11-11 17:01:02 +00:00
}
2018-04-19 04:39:16 +00:00
/// Both db_dir_path and table_path must be converted to absolute paths (in particular, path cannot contain '..').
2020-03-09 01:03:43 +00:00
void checkCreationIsAllowed ( const Context & context_global , const std : : string & db_dir_path , const std : : string & table_path )
2016-11-11 17:01:02 +00:00
{
2018-04-06 09:53:29 +00:00
if ( context_global . getApplicationType ( ) ! = Context : : ApplicationType : : SERVER )
return ;
2019-12-29 07:03:39 +00:00
/// "/dev/null" is allowed for perf testing
if ( ! startsWith ( table_path , db_dir_path ) & & table_path ! = " /dev/null " )
2020-01-05 20:11:26 +00:00
throw Exception ( " File is not inside " + db_dir_path , ErrorCodes : : DATABASE_ACCESS_DENIED ) ;
2018-04-10 07:09:50 +00:00
Poco : : File table_path_poco_file = Poco : : File ( table_path ) ;
2019-12-17 08:06:39 +00:00
if ( table_path_poco_file . exists ( ) & & table_path_poco_file . isDirectory ( ) )
2020-01-05 20:11:26 +00:00
throw Exception ( " File must not be a directory " , ErrorCodes : : INCORRECT_FILE_NAME ) ;
2016-10-18 14:18:37 +00:00
}
2019-09-06 18:29:41 +00:00
}
2016-10-18 14:18:37 +00:00
2020-10-14 12:19:29 +00:00
Strings StorageFile : : getPathsList ( const String & table_path , const String & user_files_path , const Context & context )
{
String user_files_absolute_path = Poco : : Path ( user_files_path ) . makeAbsolute ( ) . makeDirectory ( ) . toString ( ) ;
Poco : : Path poco_path = Poco : : Path ( table_path ) ;
if ( poco_path . isRelative ( ) )
poco_path = Poco : : Path ( user_files_absolute_path , poco_path ) ;
Strings paths ;
const String path = poco_path . absolute ( ) . toString ( ) ;
if ( path . find_first_of ( " *?{ " ) = = std : : string : : npos )
paths . push_back ( path ) ;
else
paths = listFilesWithRegexpMatching ( " / " , path ) ;
for ( const auto & cur_path : paths )
checkCreationIsAllowed ( context , user_files_absolute_path , cur_path ) ;
return paths ;
}
2019-10-30 14:17:55 +00:00
StorageFile : : StorageFile ( int table_fd_ , CommonArguments args )
: StorageFile ( args )
2016-10-18 14:18:37 +00:00
{
2019-10-30 14:17:55 +00:00
if ( args . context . getApplicationType ( ) = = Context : : ApplicationType : : SERVER )
2019-10-30 14:17:55 +00:00
throw Exception ( " Using file descriptor as source of storage isn't allowed for server daemons " , ErrorCodes : : DATABASE_ACCESS_DENIED ) ;
2020-10-14 12:19:29 +00:00
if ( args . format_name = = " Distributed " )
throw Exception ( " Distributed format is allowed only with explicit file path " , ErrorCodes : : INCORRECT_FILE_NAME ) ;
2019-08-24 21:20:20 +00:00
2019-10-30 14:17:55 +00:00
is_db_table = false ;
use_table_fd = true ;
table_fd = table_fd_ ;
2017-04-01 07:20:54 +00:00
2019-10-30 14:17:55 +00:00
/// Save initial offset, it will be used for repeating SELECTs
/// If FD isn't seekable (lseek returns -1), then the second and subsequent SELECTs will fail.
table_fd_init_offset = lseek ( table_fd , 0 , SEEK_CUR ) ;
}
2017-11-03 19:53:10 +00:00
2019-12-11 20:05:53 +00:00
StorageFile : : StorageFile ( const std : : string & table_path_ , const std : : string & user_files_path , CommonArguments args )
2019-10-30 14:17:55 +00:00
: StorageFile ( args )
{
is_db_table = false ;
2020-10-14 12:19:29 +00:00
paths = getPathsList ( table_path_ , user_files_path , args . context ) ;
2020-01-04 18:05:42 +00:00
if ( args . format_name = = " Distributed " )
{
2020-10-14 12:19:29 +00:00
if ( paths . empty ( ) )
throw Exception ( " Cannot get table structure from file, because no files match specified name " , ErrorCodes : : INCORRECT_FILE_NAME ) ;
auto & first_path = paths [ 0 ] ;
Block header = StorageDistributedDirectoryMonitor : : createStreamFromFile ( first_path ) - > getHeader ( ) ;
StorageInMemoryMetadata storage_metadata ;
auto columns = ColumnsDescription ( header . getNamesAndTypesList ( ) ) ;
if ( ! args . columns . empty ( ) & & columns ! = args . columns )
throw Exception ( " Table structure and file structure are different " , ErrorCodes : : INCOMPATIBLE_COLUMNS ) ;
storage_metadata . setColumns ( columns ) ;
setInMemoryMetadata ( storage_metadata ) ;
2020-01-04 18:05:42 +00:00
}
2019-10-30 14:17:55 +00:00
}
2018-04-06 09:53:29 +00:00
2019-10-30 14:17:55 +00:00
StorageFile : : StorageFile ( const std : : string & relative_table_dir_path , CommonArguments args )
: StorageFile ( args )
{
if ( relative_table_dir_path . empty ( ) )
throw Exception ( " Storage " + getName ( ) + " requires data path " , ErrorCodes : : INCORRECT_FILE_NAME ) ;
2020-10-14 12:19:29 +00:00
if ( args . format_name = = " Distributed " )
throw Exception ( " Distributed format is allowed only with explicit file path " , ErrorCodes : : INCORRECT_FILE_NAME ) ;
2017-04-01 07:20:54 +00:00
2019-12-24 13:29:53 +00:00
String table_dir_path = base_path + relative_table_dir_path + " / " ;
2019-10-30 14:17:55 +00:00
Poco : : File ( table_dir_path ) . createDirectories ( ) ;
paths = { getTablePath ( table_dir_path , format_name ) } ;
2016-10-18 14:18:37 +00:00
}
2019-10-30 14:17:55 +00:00
StorageFile : : StorageFile ( CommonArguments args )
2020-04-27 13:55:30 +00:00
: IStorage ( args . table_id )
2019-12-04 16:06:55 +00:00
, format_name ( args . format_name )
2020-11-02 07:50:38 +00:00
, format_settings ( args . format_settings )
2019-12-04 16:06:55 +00:00
, compression_method ( args . compression_method )
2019-12-25 21:17:49 +00:00
, base_path ( args . context . getPath ( ) )
2019-10-30 14:17:55 +00:00
{
2020-06-19 15:39:41 +00:00
StorageInMemoryMetadata storage_metadata ;
2020-01-04 18:37:31 +00:00
if ( args . format_name ! = " Distributed " )
2020-06-19 15:39:41 +00:00
storage_metadata . setColumns ( args . columns ) ;
2020-01-04 18:37:31 +00:00
2020-06-19 15:39:41 +00:00
storage_metadata . setConstraints ( args . constraints ) ;
setInMemoryMetadata ( storage_metadata ) ;
2019-10-30 14:17:55 +00:00
}
2016-10-18 14:18:37 +00:00
2020-09-24 23:29:16 +00:00
static std : : chrono : : seconds getLockTimeout ( const Context & context )
{
const Settings & settings = context . getSettingsRef ( ) ;
Int64 lock_timeout = settings . lock_acquire_timeout . totalSeconds ( ) ;
if ( settings . max_execution_time . totalSeconds ( ) ! = 0 & & settings . max_execution_time . totalSeconds ( ) < lock_timeout )
lock_timeout = settings . max_execution_time . totalSeconds ( ) ;
return std : : chrono : : seconds { lock_timeout } ;
}
2021-03-30 17:57:21 +00:00
using StorageFilePtr = std : : shared_ptr < StorageFile > ;
2020-09-24 23:29:16 +00:00
2020-01-31 13:12:11 +00:00
class StorageFileSource : public SourceWithProgress
2016-10-18 14:18:37 +00:00
{
public :
2020-01-31 13:12:11 +00:00
struct FilesInfo
{
std : : vector < std : : string > files ;
std : : atomic < size_t > next_file_to_read = 0 ;
bool need_path_column = false ;
bool need_file_column = false ;
} ;
using FilesInfoPtr = std : : shared_ptr < FilesInfo > ;
2021-03-01 14:11:25 +00:00
static Block getHeader ( const StorageMetadataPtr & metadata_snapshot , bool need_path_column , bool need_file_column )
{
auto header = metadata_snapshot - > getSampleBlock ( ) ;
/// Note: AddingDefaultsBlockInputStream doesn't change header.
if ( need_path_column )
header . insert ( { DataTypeString ( ) . createColumn ( ) , std : : make_shared < DataTypeString > ( ) , " _path " } ) ;
if ( need_file_column )
header . insert ( { DataTypeString ( ) . createColumn ( ) , std : : make_shared < DataTypeString > ( ) , " _file " } ) ;
return header ;
}
2021-03-30 17:57:21 +00:00
static Block getBlockForSource (
const StorageFilePtr & storage ,
const StorageMetadataPtr & metadata_snapshot ,
const ColumnsDescription & columns_description ,
const FilesInfoPtr & files_info )
{
if ( FormatFactory : : instance ( ) . checkIfFormatIsColumnOriented ( storage - > format_name ) ) {
return metadata_snapshot - > getSampleBlockForColumns ( columns_description . getNamesOfPhysical ( ) , storage - > getVirtuals ( ) , storage - > getStorageID ( ) ) ;
} else {
return getHeader ( metadata_snapshot , files_info - > need_path_column , files_info - > need_file_column ) ;
}
}
2020-01-31 13:12:11 +00:00
StorageFileSource (
std : : shared_ptr < StorageFile > storage_ ,
2020-06-16 15:51:29 +00:00
const StorageMetadataPtr & metadata_snapshot_ ,
2020-01-31 13:12:11 +00:00
const Context & context_ ,
UInt64 max_block_size_ ,
FilesInfoPtr files_info_ ,
2020-10-02 12:38:50 +00:00
ColumnsDescription columns_description_ )
2021-03-30 17:57:21 +00:00
: SourceWithProgress ( getBlockForSource ( storage_ , metadata_snapshot_ , columns_description_ , files_info_ ) )
2020-01-31 13:12:11 +00:00
, storage ( std : : move ( storage_ ) )
2020-06-16 15:51:29 +00:00
, metadata_snapshot ( metadata_snapshot_ )
2020-01-31 13:12:11 +00:00
, files_info ( std : : move ( files_info_ ) )
2020-10-02 12:38:50 +00:00
, columns_description ( std : : move ( columns_description_ ) )
2020-01-31 13:12:11 +00:00
, context ( context_ )
, max_block_size ( max_block_size_ )
2017-04-01 07:20:54 +00:00
{
2019-11-13 12:17:31 +00:00
if ( storage - > use_table_fd )
2017-04-01 07:20:54 +00:00
{
2020-09-24 23:29:16 +00:00
unique_lock = std : : unique_lock ( storage - > rwlock , getLockTimeout ( context ) ) ;
if ( ! unique_lock )
throw Exception ( " Lock timeout exceeded " , ErrorCodes : : TIMEOUT_EXCEEDED ) ;
2017-07-28 17:34:02 +00:00
2017-04-01 07:20:54 +00:00
/// We could use common ReadBuffer and WriteBuffer in storage to leverage cache
/// and add ability to seek unseekable files, but cache sync isn't supported.
2019-11-13 12:17:31 +00:00
if ( storage - > table_fd_was_used ) /// We need seek to initial position
2017-04-01 07:20:54 +00:00
{
2019-11-13 12:17:31 +00:00
if ( storage - > table_fd_init_offset < 0 )
throw Exception ( " File descriptor isn't seekable, inside " + storage - > getName ( ) , ErrorCodes : : CANNOT_SEEK_THROUGH_FILE ) ;
2017-04-01 07:20:54 +00:00
2019-01-22 19:56:53 +00:00
/// ReadBuffer's seek() doesn't make sense, since cache is empty
2019-11-13 12:17:31 +00:00
if ( lseek ( storage - > table_fd , storage - > table_fd_init_offset , SEEK_SET ) < 0 )
throwFromErrno ( " Cannot seek file descriptor, inside " + storage - > getName ( ) , ErrorCodes : : CANNOT_SEEK_THROUGH_FILE ) ;
2017-04-01 07:20:54 +00:00
}
2019-11-13 12:17:31 +00:00
storage - > table_fd_was_used = true ;
2017-04-01 07:20:54 +00:00
}
else
{
2020-09-24 23:29:16 +00:00
shared_lock = std : : shared_lock ( storage - > rwlock , getLockTimeout ( context ) ) ;
if ( ! shared_lock )
throw Exception ( " Lock timeout exceeded " , ErrorCodes : : TIMEOUT_EXCEEDED ) ;
2017-04-01 07:20:54 +00:00
}
}
String getName ( ) const override
{
2019-11-13 12:17:31 +00:00
return storage - > getName ( ) ;
2017-04-01 07:20:54 +00:00
}
2020-01-31 13:12:11 +00:00
Chunk generate ( ) override
2017-04-01 07:20:54 +00:00
{
2020-01-31 13:12:11 +00:00
while ( ! finished_generate )
2020-01-27 17:06:32 +00:00
{
2020-01-31 13:12:11 +00:00
/// Open file lazily on first read. This is needed to avoid too many open files from different streams.
if ( ! reader )
{
if ( ! storage - > use_table_fd )
{
auto current_file = files_info - > next_file_to_read . fetch_add ( 1 ) ;
if ( current_file > = files_info - > files . size ( ) )
return { } ;
current_path = files_info - > files [ current_file ] ;
/// Special case for distributed format. Defaults are not needed here.
if ( storage - > format_name = = " Distributed " )
{
reader = StorageDistributedDirectoryMonitor : : createStreamFromFile ( current_path ) ;
continue ;
}
}
std : : unique_ptr < ReadBuffer > nested_buffer ;
CompressionMethod method ;
if ( storage - > use_table_fd )
{
nested_buffer = std : : make_unique < ReadBufferFromFileDescriptor > ( storage - > table_fd ) ;
method = chooseCompressionMethod ( " " , storage - > compression_method ) ;
}
else
{
nested_buffer = std : : make_unique < ReadBufferFromFile > ( current_path ) ;
method = chooseCompressionMethod ( current_path , storage - > compression_method ) ;
}
2020-08-03 17:38:11 +00:00
read_buf = wrapReadBufferWithCompressionMethod ( std : : move ( nested_buffer ) , method ) ;
2021-03-30 17:57:21 +00:00
auto get_block_for_format = [ & ] ( ) - > Block
{
if ( FormatFactory : : instance ( ) . checkIfFormatIsColumnOriented ( storage - > format_name ) )
return metadata_snapshot - > getSampleBlockForColumns ( columns_description . getNamesOfPhysical ( ) ) ;
return metadata_snapshot - > getSampleBlock ( ) ;
} ;
auto format = FormatFactory : : instance ( ) . getInput (
storage - > format_name , * read_buf , get_block_for_format ( ) , context , max_block_size , storage - > format_settings ) ;
2020-05-18 10:00:22 +00:00
reader = std : : make_shared < InputStreamFromInputFormat > ( format ) ;
2020-01-31 13:12:11 +00:00
2020-10-02 12:38:50 +00:00
if ( columns_description . hasDefaults ( ) )
reader = std : : make_shared < AddingDefaultsBlockInputStream > ( reader , columns_description , context ) ;
2020-01-31 13:12:11 +00:00
reader - > readPrefix ( ) ;
}
2020-01-27 17:06:32 +00:00
2020-01-31 13:12:11 +00:00
if ( auto res = reader - > read ( ) )
2020-01-15 07:52:45 +00:00
{
2020-01-31 13:12:11 +00:00
Columns columns = res . getColumns ( ) ;
UInt64 num_rows = res . rows ( ) ;
/// Enrich with virtual columns.
if ( files_info - > need_path_column )
{
auto column = DataTypeString ( ) . createColumnConst ( num_rows , current_path ) ;
columns . push_back ( column - > convertToFullColumnIfConst ( ) ) ;
}
if ( files_info - > need_file_column )
{
size_t last_slash_pos = current_path . find_last_of ( ' / ' ) ;
auto file_name = current_path . substr ( last_slash_pos + 1 ) ;
auto column = DataTypeString ( ) . createColumnConst ( num_rows , std : : move ( file_name ) ) ;
columns . push_back ( column - > convertToFullColumnIfConst ( ) ) ;
}
return Chunk ( std : : move ( columns ) , num_rows ) ;
2020-01-15 07:52:45 +00:00
}
2020-01-27 17:06:32 +00:00
2020-01-31 13:12:11 +00:00
/// Read only once for file descriptor.
if ( storage - > use_table_fd )
finished_generate = true ;
/// Close file prematurely if stream was ended.
2020-01-27 18:26:58 +00:00
reader - > readSuffix ( ) ;
2020-01-27 17:06:32 +00:00
reader . reset ( ) ;
read_buf . reset ( ) ;
}
2020-01-31 13:12:11 +00:00
return { } ;
2017-04-01 07:20:54 +00:00
}
2016-10-18 14:18:37 +00:00
private :
2019-11-13 12:17:31 +00:00
std : : shared_ptr < StorageFile > storage ;
2020-06-16 15:51:29 +00:00
StorageMetadataPtr metadata_snapshot ;
2020-01-31 13:12:11 +00:00
FilesInfoPtr files_info ;
String current_path ;
2017-04-01 07:20:54 +00:00
Block sample_block ;
2019-11-19 12:46:07 +00:00
std : : unique_ptr < ReadBuffer > read_buf ;
2017-04-01 07:20:54 +00:00
BlockInputStreamPtr reader ;
2019-01-27 00:38:30 +00:00
2020-10-02 12:38:50 +00:00
ColumnsDescription columns_description ;
2020-01-31 13:12:11 +00:00
2020-01-27 18:13:02 +00:00
const Context & context ; /// TODO Untangle potential issues with context lifetime.
UInt64 max_block_size ;
2020-01-31 13:12:11 +00:00
bool finished_generate = false ;
2020-01-27 18:13:02 +00:00
2020-09-24 23:29:16 +00:00
std : : shared_lock < std : : shared_timed_mutex > shared_lock ;
std : : unique_lock < std : : shared_timed_mutex > unique_lock ;
2016-10-18 14:18:37 +00:00
} ;
2020-08-03 13:54:14 +00:00
Pipe StorageFile : : read (
2020-01-15 07:52:45 +00:00
const Names & column_names ,
2020-06-16 15:51:29 +00:00
const StorageMetadataPtr & metadata_snapshot ,
2020-09-20 17:52:17 +00:00
SelectQueryInfo & /*query_info*/ ,
2017-04-01 07:20:54 +00:00
const Context & context ,
2018-09-08 11:29:23 +00:00
QueryProcessingStage : : Enum /*processed_stage*/ ,
2019-02-18 23:38:44 +00:00
size_t max_block_size ,
2019-12-02 11:25:41 +00:00
unsigned num_streams )
2016-10-18 14:18:37 +00:00
{
2021-03-30 17:57:21 +00:00
std : : cout < < " Names " < < std : : endl ;
for ( const auto & name : column_names ) {
std : : cout < < name < < std : : endl ;
}
std : : cout < < " ----- " < < std : : endl ;
2019-07-21 13:15:04 +00:00
BlockInputStreams blocks_input ;
2020-01-27 17:06:32 +00:00
2019-09-06 18:29:41 +00:00
if ( use_table_fd ) /// need to call ctr BlockInputStream
paths = { " " } ; /// when use fd, paths are empty
2019-12-17 08:06:39 +00:00
else
if ( paths . size ( ) = = 1 & & ! Poco : : File ( paths [ 0 ] ) . exists ( ) )
2021-02-16 14:50:11 +00:00
{
if ( context . getSettingsRef ( ) . engine_file_empty_if_not_exists )
return Pipe ( std : : make_shared < NullSource > ( metadata_snapshot - > getSampleBlockForColumns ( column_names , getVirtuals ( ) , getStorageID ( ) ) ) ) ;
else
throw Exception ( " File " + paths [ 0 ] + " doesn't exist " , ErrorCodes : : FILE_DOESNT_EXIST ) ;
}
2020-01-27 17:06:32 +00:00
2020-01-31 13:12:11 +00:00
auto files_info = std : : make_shared < StorageFileSource : : FilesInfo > ( ) ;
files_info - > files = paths ;
2020-01-15 07:52:45 +00:00
for ( const auto & column : column_names )
{
if ( column = = " _path " )
2020-01-31 13:12:11 +00:00
files_info - > need_path_column = true ;
2020-01-15 07:52:45 +00:00
if ( column = = " _file " )
2020-01-31 13:12:11 +00:00
files_info - > need_file_column = true ;
2020-01-15 07:52:45 +00:00
}
2020-01-04 14:45:11 +00:00
2020-01-31 13:12:11 +00:00
auto this_ptr = std : : static_pointer_cast < StorageFile > ( shared_from_this ( ) ) ;
2020-01-04 14:45:11 +00:00
2020-02-14 10:22:05 +00:00
if ( num_streams > paths . size ( ) )
2020-01-31 13:12:11 +00:00
num_streams = paths . size ( ) ;
Pipes pipes ;
pipes . reserve ( num_streams ) ;
for ( size_t i = 0 ; i < num_streams ; + + i )
2020-11-02 07:50:38 +00:00
{
2021-03-30 17:57:21 +00:00
const auto get_columns_for_format = [ & ] ( ) - > ColumnsDescription
{
if ( FormatFactory : : instance ( ) . checkIfFormatIsColumnOriented ( format_name ) )
return metadata_snapshot - > getColumnsForNames ( column_names , getVirtuals ( ) , getStorageID ( ) ) ;
else
return metadata_snapshot - > getColumns ( ) ;
} ;
2020-03-23 02:12:31 +00:00
pipes . emplace_back ( std : : make_shared < StorageFileSource > (
2021-03-30 17:57:21 +00:00
this_ptr , metadata_snapshot , context , max_block_size , files_info , get_columns_for_format ( ) ) ) ;
2020-11-02 07:50:38 +00:00
}
2020-01-31 13:12:11 +00:00
2020-08-06 12:24:05 +00:00
return Pipe : : unitePipes ( std : : move ( pipes ) ) ;
2016-10-18 14:18:37 +00:00
}
class StorageFileBlockOutputStream : public IBlockOutputStream
{
public :
2020-06-16 15:51:29 +00:00
explicit StorageFileBlockOutputStream (
StorageFile & storage_ ,
const StorageMetadataPtr & metadata_snapshot_ ,
2020-09-24 23:29:16 +00:00
std : : unique_lock < std : : shared_timed_mutex > & & lock_ ,
2019-10-30 14:17:55 +00:00
const CompressionMethod compression_method ,
2020-11-02 07:50:38 +00:00
const Context & context ,
2021-02-19 23:27:23 +00:00
const std : : optional < FormatSettings > & format_settings ,
int & flags )
2020-06-16 15:51:29 +00:00
: storage ( storage_ )
, metadata_snapshot ( metadata_snapshot_ )
2020-09-24 23:29:16 +00:00
, lock ( std : : move ( lock_ ) )
2017-04-01 07:20:54 +00:00
{
2020-09-24 23:29:16 +00:00
if ( ! lock )
throw Exception ( " Lock timeout exceeded " , ErrorCodes : : TIMEOUT_EXCEEDED ) ;
2020-07-07 11:45:20 +00:00
std : : unique_ptr < WriteBufferFromFileDescriptor > naked_buffer = nullptr ;
2017-04-01 07:20:54 +00:00
if ( storage . use_table_fd )
{
/** NOTE: Using real file binded to FD may be misleading:
* SELECT * ; INSERT insert_data ; SELECT * ; last SELECT returns initil_fd_data + insert_data
* INSERT data ; SELECT * ; last SELECT returns only insert_data
*/
storage . table_fd_was_used = true ;
2021-02-19 23:27:23 +00:00
naked_buffer = std : : make_unique < WriteBufferFromFileDescriptor > ( storage . table_fd , DBMS_DEFAULT_BUFFER_SIZE ) ;
2017-04-01 07:20:54 +00:00
}
else
{
2019-09-06 18:29:41 +00:00
if ( storage . paths . size ( ) ! = 1 )
2020-01-09 14:33:58 +00:00
throw Exception ( " Table ' " + storage . getStorageID ( ) . getNameForLogs ( ) + " ' is in readonly mode because of globs in filepath " , ErrorCodes : : DATABASE_ACCESS_DENIED ) ;
2021-02-19 23:27:23 +00:00
flags | = O_WRONLY | O_APPEND | O_CREAT ;
naked_buffer = std : : make_unique < WriteBufferFromFile > ( storage . paths [ 0 ] , DBMS_DEFAULT_BUFFER_SIZE , flags ) ;
2017-04-01 07:20:54 +00:00
}
2020-07-07 11:45:20 +00:00
/// In case of CSVWithNames we have already written prefix.
if ( naked_buffer - > size ( ) )
prefix_written = true ;
write_buf = wrapWriteBufferWithCompressionMethod ( std : : move ( naked_buffer ) , compression_method , 3 ) ;
2020-12-30 03:07:30 +00:00
writer = FormatFactory : : instance ( ) . getOutputStreamParallelIfPossible ( storage . format_name ,
2020-11-02 07:50:38 +00:00
* write_buf , metadata_snapshot - > getSampleBlock ( ) , context ,
{ } , format_settings ) ;
2017-04-01 07:20:54 +00:00
}
2020-06-16 15:51:29 +00:00
Block getHeader ( ) const override { return metadata_snapshot - > getSampleBlock ( ) ; }
2018-02-19 00:45:32 +00:00
2017-04-01 07:20:54 +00:00
void write ( const Block & block ) override
{
writer - > write ( block ) ;
}
void writePrefix ( ) override
{
2020-07-07 11:45:20 +00:00
if ( ! prefix_written )
writer - > writePrefix ( ) ;
prefix_written = true ;
2017-04-01 07:20:54 +00:00
}
void writeSuffix ( ) override
{
writer - > writeSuffix ( ) ;
}
void flush ( ) override
{
writer - > flush ( ) ;
}
2016-10-18 14:18:37 +00:00
private :
2017-04-01 07:20:54 +00:00
StorageFile & storage ;
2020-06-16 15:51:29 +00:00
StorageMetadataPtr metadata_snapshot ;
2020-09-24 23:29:16 +00:00
std : : unique_lock < std : : shared_timed_mutex > lock ;
2019-11-19 12:46:07 +00:00
std : : unique_ptr < WriteBuffer > write_buf ;
2017-04-01 07:20:54 +00:00
BlockOutputStreamPtr writer ;
2020-07-07 11:45:20 +00:00
bool prefix_written { false } ;
2016-10-18 14:18:37 +00:00
} ;
BlockOutputStreamPtr StorageFile : : write (
2017-12-01 21:13:25 +00:00
const ASTPtr & /*query*/ ,
2020-06-16 15:51:29 +00:00
const StorageMetadataPtr & metadata_snapshot ,
2019-10-30 14:17:55 +00:00
const Context & context )
2016-10-18 14:18:37 +00:00
{
2020-01-04 14:45:11 +00:00
if ( format_name = = " Distributed " )
throw Exception ( " Method write is not implemented for Distributed format " , ErrorCodes : : NOT_IMPLEMENTED ) ;
2021-02-19 23:27:23 +00:00
int flags = 0 ;
2020-09-16 19:58:27 +00:00
std : : string path ;
2021-02-16 14:50:11 +00:00
if ( context . getSettingsRef ( ) . engine_file_truncate_on_insert )
2021-02-19 23:27:23 +00:00
flags | = O_TRUNC ;
2021-02-16 14:50:11 +00:00
2020-09-16 19:58:27 +00:00
if ( ! paths . empty ( ) )
2020-12-08 14:13:35 +00:00
{
2020-09-16 19:58:27 +00:00
path = paths [ 0 ] ;
2020-12-08 14:13:35 +00:00
Poco : : File ( Poco : : Path ( path ) . makeParent ( ) ) . createDirectories ( ) ;
}
2020-09-16 19:58:27 +00:00
2020-12-21 21:41:52 +00:00
return std : : make_shared < StorageFileBlockOutputStream > (
* this ,
metadata_snapshot ,
std : : unique_lock { rwlock , getLockTimeout ( context ) } ,
chooseCompressionMethod ( path , compression_method ) ,
context ,
2021-02-19 23:27:23 +00:00
format_settings ,
flags ) ;
2016-10-18 14:18:37 +00:00
}
2020-09-16 19:58:27 +00:00
2020-11-01 17:38:43 +00:00
bool StorageFile : : storesDataOnDisk ( ) const
{
return is_db_table ;
2016-10-18 14:18:37 +00:00
}
2019-09-06 08:53:32 +00:00
Strings StorageFile : : getDataPaths ( ) const
2019-09-04 19:55:56 +00:00
{
2019-09-05 18:09:19 +00:00
if ( paths . empty ( ) )
2019-12-03 16:25:32 +00:00
throw Exception ( " Table ' " + getStorageID ( ) . getNameForLogs ( ) + " ' is in readonly mode " , ErrorCodes : : DATABASE_ACCESS_DENIED ) ;
2019-09-06 08:53:32 +00:00
return paths ;
2019-09-04 19:55:56 +00:00
}
2016-10-18 14:18:37 +00:00
2020-04-07 14:05:51 +00:00
void StorageFile : : rename ( const String & new_path_to_table_data , const StorageID & new_table_id )
2016-10-18 14:18:37 +00:00
{
2017-04-01 07:20:54 +00:00
if ( ! is_db_table )
2019-12-03 16:25:32 +00:00
throw Exception ( " Can't rename table " + getStorageID ( ) . getNameForLogs ( ) + " binded to user-defined file (or FD) " , ErrorCodes : : DATABASE_ACCESS_DENIED ) ;
2016-10-18 14:18:37 +00:00
2019-09-04 11:11:30 +00:00
if ( paths . size ( ) ! = 1 )
2019-12-03 16:25:32 +00:00
throw Exception ( " Can't rename table " + getStorageID ( ) . getNameForLogs ( ) + " in readonly mode " , ErrorCodes : : DATABASE_ACCESS_DENIED ) ;
2019-09-04 11:11:30 +00:00
2020-09-17 19:50:43 +00:00
std : : string path_new = getTablePath ( base_path + new_path_to_table_data , format_name ) ;
if ( path_new = = paths [ 0 ] )
return ;
2017-04-01 07:20:54 +00:00
Poco : : File ( Poco : : Path ( path_new ) . parent ( ) ) . createDirectories ( ) ;
2019-09-04 11:11:30 +00:00
Poco : : File ( paths [ 0 ] ) . renameTo ( path_new ) ;
2016-10-18 14:18:37 +00:00
2019-09-04 11:11:30 +00:00
paths [ 0 ] = std : : move ( path_new ) ;
2020-04-07 14:05:51 +00:00
renameInMemory ( new_table_id ) ;
2016-10-18 14:18:37 +00:00
}
2020-06-18 10:29:13 +00:00
void StorageFile : : truncate (
const ASTPtr & /*query*/ ,
const StorageMetadataPtr & /* metadata_snapshot */ ,
const Context & /* context */ ,
2020-06-18 16:10:47 +00:00
TableExclusiveLockHolder & )
2020-01-05 02:57:09 +00:00
{
if ( paths . size ( ) ! = 1 )
2020-01-09 14:33:58 +00:00
throw Exception ( " Can't truncate table ' " + getStorageID ( ) . getNameForLogs ( ) + " ' in readonly mode " , ErrorCodes : : DATABASE_ACCESS_DENIED ) ;
2020-01-05 02:57:09 +00:00
if ( use_table_fd )
{
if ( 0 ! = : : ftruncate ( table_fd , 0 ) )
throwFromErrno ( " Cannot truncate file at fd " + toString ( table_fd ) , ErrorCodes : : CANNOT_TRUNCATE_FILE ) ;
}
else
{
2020-01-05 20:11:26 +00:00
if ( ! Poco : : File ( paths [ 0 ] ) . exists ( ) )
return ;
2020-01-05 02:57:09 +00:00
if ( 0 ! = : : truncate ( paths [ 0 ] . c_str ( ) , 0 ) )
throwFromErrnoWithPath ( " Cannot truncate file " + paths [ 0 ] , paths [ 0 ] , ErrorCodes : : CANNOT_TRUNCATE_FILE ) ;
}
}
2017-12-30 00:36:06 +00:00
void registerStorageFile ( StorageFactory & factory )
{
2020-11-02 07:50:38 +00:00
StorageFactory : : StorageFeatures storage_features {
. supports_settings = true ,
. source_access_type = AccessType : : FILE
} ;
2020-04-06 05:19:40 +00:00
factory . registerStorage (
" File " ,
2020-11-02 07:50:38 +00:00
[ ] ( const StorageFactory : : Arguments & factory_args )
2020-04-06 05:19:40 +00:00
{
2020-11-02 07:50:38 +00:00
StorageFile : : CommonArguments storage_args {
. table_id = factory_args . table_id ,
. columns = factory_args . columns ,
. constraints = factory_args . constraints ,
. context = factory_args . context
} ;
2017-12-30 00:36:06 +00:00
2020-11-02 07:50:38 +00:00
ASTs & engine_args_ast = factory_args . engine_args ;
2017-12-30 00:36:06 +00:00
2020-11-02 07:50:38 +00:00
if ( ! ( engine_args_ast . size ( ) > = 1 & & engine_args_ast . size ( ) < = 3 ) ) // NOLINT
2020-04-06 05:19:40 +00:00
throw Exception (
" Storage File requires from 1 to 3 arguments: name of used format, source and compression_method. " ,
ErrorCodes : : NUMBER_OF_ARGUMENTS_DOESNT_MATCH ) ;
2017-12-30 00:36:06 +00:00
2020-11-02 07:50:38 +00:00
engine_args_ast [ 0 ] = evaluateConstantExpressionOrIdentifierAsLiteral ( engine_args_ast [ 0 ] , factory_args . local_context ) ;
storage_args . format_name = engine_args_ast [ 0 ] - > as < ASTLiteral & > ( ) . value . safeGet < String > ( ) ;
2017-12-30 00:36:06 +00:00
2020-11-05 11:28:20 +00:00
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current
// session and user are ignored.
2020-11-02 07:50:38 +00:00
if ( factory_args . storage_def - > settings )
{
2020-11-07 08:53:39 +00:00
FormatFactorySettings user_format_settings ;
2019-10-30 14:17:55 +00:00
2020-11-07 08:53:39 +00:00
// Apply changed settings from global context, but ignore the
// unknown ones, because we only have the format settings here.
const auto & changes = factory_args . context . getSettingsRef ( ) . changes ( ) ;
for ( const auto & change : changes )
{
if ( user_format_settings . has ( change . name ) )
{
user_format_settings . set ( change . name , change . value ) ;
}
}
// Apply changes from SETTINGS clause, with validation.
user_format_settings . applyChanges (
2020-11-02 07:50:38 +00:00
factory_args . storage_def - > settings - > changes ) ;
2017-12-30 00:36:06 +00:00
2020-11-02 07:50:38 +00:00
storage_args . format_settings = getFormatSettings (
2020-11-07 08:53:39 +00:00
factory_args . context , user_format_settings ) ;
2020-11-02 07:50:38 +00:00
}
else
{
storage_args . format_settings = getFormatSettings (
2020-11-05 11:28:20 +00:00
factory_args . context ) ;
2020-11-02 07:50:38 +00:00
}
2019-10-30 14:17:55 +00:00
2020-11-02 07:50:38 +00:00
if ( engine_args_ast . size ( ) = = 1 ) /// Table in database
return StorageFile : : create ( factory_args . relative_data_path , storage_args ) ;
2019-10-30 14:17:55 +00:00
2020-04-06 05:19:40 +00:00
/// Will use FD if engine_args[1] is int literal or identifier with std* name
int source_fd = - 1 ;
String source_path ;
2019-11-25 13:01:16 +00:00
2020-11-02 07:50:38 +00:00
if ( auto opt_name = tryGetIdentifierName ( engine_args_ast [ 1 ] ) )
2020-04-06 05:19:40 +00:00
{
if ( * opt_name = = " stdin " )
source_fd = STDIN_FILENO ;
else if ( * opt_name = = " stdout " )
source_fd = STDOUT_FILENO ;
else if ( * opt_name = = " stderr " )
source_fd = STDERR_FILENO ;
else
throw Exception (
" Unknown identifier ' " + * opt_name + " ' in second arg of File storage constructor " , ErrorCodes : : UNKNOWN_IDENTIFIER ) ;
}
2020-11-02 07:50:38 +00:00
else if ( const auto * literal = engine_args_ast [ 1 ] - > as < ASTLiteral > ( ) )
2020-04-06 05:19:40 +00:00
{
auto type = literal - > value . getType ( ) ;
if ( type = = Field : : Types : : Int64 )
source_fd = static_cast < int > ( literal - > value . get < Int64 > ( ) ) ;
else if ( type = = Field : : Types : : UInt64 )
source_fd = static_cast < int > ( literal - > value . get < UInt64 > ( ) ) ;
else if ( type = = Field : : Types : : String )
source_path = literal - > value . get < String > ( ) ;
else
throw Exception ( " Second argument must be path or file descriptor " , ErrorCodes : : BAD_ARGUMENTS ) ;
}
2020-11-02 07:50:38 +00:00
if ( engine_args_ast . size ( ) = = 3 )
2020-04-06 05:19:40 +00:00
{
2020-11-02 07:50:38 +00:00
engine_args_ast [ 2 ] = evaluateConstantExpressionOrIdentifierAsLiteral ( engine_args_ast [ 2 ] , factory_args . local_context ) ;
storage_args . compression_method = engine_args_ast [ 2 ] - > as < ASTLiteral & > ( ) . value . safeGet < String > ( ) ;
2020-04-06 05:19:40 +00:00
}
2019-10-30 14:17:55 +00:00
else
2020-11-02 07:50:38 +00:00
storage_args . compression_method = " auto " ;
2017-12-30 00:36:06 +00:00
2020-04-06 05:19:40 +00:00
if ( 0 < = source_fd ) /// File descriptor
2020-11-02 07:50:38 +00:00
return StorageFile : : create ( source_fd , storage_args ) ;
2020-04-06 05:19:40 +00:00
else /// User's file
2020-11-02 07:50:38 +00:00
return StorageFile : : create ( source_path , factory_args . context . getUserFilesPath ( ) , storage_args ) ;
2020-04-06 05:19:40 +00:00
} ,
2020-11-02 07:50:38 +00:00
storage_features ) ;
2017-12-30 00:36:06 +00:00
}
2020-11-02 07:50:38 +00:00
2020-04-28 10:38:57 +00:00
NamesAndTypesList StorageFile : : getVirtuals ( ) const
2020-04-27 13:55:30 +00:00
{
2020-04-28 10:38:57 +00:00
return NamesAndTypesList {
2020-04-27 13:55:30 +00:00
{ " _path " , std : : make_shared < DataTypeString > ( ) } ,
{ " _file " , std : : make_shared < DataTypeString > ( ) }
} ;
}
2021-03-30 17:57:21 +00:00
}