2017-04-01 09:19:00 +00:00
# include <Storages/StorageFile.h>
2016-10-18 14:18:37 +00:00
2017-04-01 09:19:00 +00:00
# include <Interpreters/Context.h>
# include <IO/ReadBufferFromFile.h>
# include <IO/WriteBufferFromFile.h>
# include <IO/WriteHelpers.h>
# include <DataStreams/FormatFactory.h>
# include <DataStreams/IProfilingBlockInputStream.h>
# include <DataStreams/IBlockOutputStream.h>
2016-10-18 14:18:37 +00:00
2017-04-01 09:19:00 +00:00
# include <Common/escapeForFileName.h>
2016-10-18 14:18:37 +00:00
2016-10-25 13:49:07 +00:00
# include <fcntl.h>
2016-10-18 14:18:37 +00:00
namespace DB
{
2016-10-28 17:38:32 +00:00
namespace ErrorCodes
{
2017-04-01 07:20:54 +00:00
extern const int CANNOT_WRITE_TO_FILE_DESCRIPTOR ;
extern const int CANNOT_SEEK_THROUGH_FILE ;
extern const int DATABASE_ACCESS_DENIED ;
2016-10-28 17:38:32 +00:00
} ;
2016-10-18 14:18:37 +00:00
static std : : string getTablePath ( const std : : string & db_dir_path , const std : : string & table_name , const std : : string & format_name )
{
2017-04-01 07:20:54 +00:00
return db_dir_path + escapeForFileName ( table_name ) + " /data. " + escapeForFileName ( format_name ) ;
2016-11-11 17:01:02 +00:00
}
static void checkCreationIsAllowed ( Context & context_global )
{
2017-04-01 07:20:54 +00:00
if ( context_global . getApplicationType ( ) = = Context : : ApplicationType : : SERVER )
throw Exception ( " Using file descriptor or user specified path as source of storage isn't allowed for server daemons " , ErrorCodes : : DATABASE_ACCESS_DENIED ) ;
2016-10-18 14:18:37 +00:00
}
StorageFile : : StorageFile (
2017-04-01 07:20:54 +00:00
const std : : string & table_path_ ,
int table_fd_ ,
const std : : string & db_dir_path ,
const std : : string & table_name_ ,
const std : : string & format_name_ ,
const NamesAndTypesListPtr & columns_ ,
const NamesAndTypesList & materialized_columns_ ,
const NamesAndTypesList & alias_columns_ ,
const ColumnDefaults & column_defaults_ ,
Context & context_ )
: IStorage ( materialized_columns_ , alias_columns_ , column_defaults_ ) ,
table_name ( table_name_ ) , format_name ( format_name_ ) , columns ( columns_ ) , context_global ( context_ ) , table_fd ( table_fd_ )
2016-10-18 14:18:37 +00:00
{
2017-04-01 07:20:54 +00:00
if ( table_fd < 0 ) /// Will use file
{
use_table_fd = false ;
if ( ! table_path_ . empty ( ) ) /// Is user's file
{
checkCreationIsAllowed ( context_global ) ;
path = Poco : : Path ( table_path_ ) . absolute ( ) . toString ( ) ;
is_db_table = false ;
}
else /// Is DB's file
{
path = getTablePath ( db_dir_path , table_name , format_name ) ;
is_db_table = true ;
Poco : : File ( Poco : : Path ( path ) . parent ( ) ) . createDirectories ( ) ;
}
}
else /// Will use FD
{
checkCreationIsAllowed ( context_global ) ;
is_db_table = false ;
use_table_fd = true ;
/// Save initial offset, it will be used for repeating SELECTs
/// If FD isn't seekable (lseek returns -1), then the second and subsequent SELECTs will fail.
table_fd_init_offset = lseek ( table_fd , 0 , SEEK_CUR ) ;
}
2016-10-18 14:18:37 +00:00
}
class StorageFileBlockInputStream : public IProfilingBlockInputStream
{
public :
2017-04-01 07:20:54 +00:00
StorageFileBlockInputStream ( StorageFile & storage_ , const Context & context , size_t max_block_size )
: storage ( storage_ ) , lock ( storage . rwlock , storage . use_table_fd )
{
if ( storage . use_table_fd )
{
/// We could use common ReadBuffer and WriteBuffer in storage to leverage cache
/// and add ability to seek unseekable files, but cache sync isn't supported.
if ( storage . table_fd_was_used ) /// We need seek to initial position
{
if ( storage . table_fd_init_offset < 0 )
throw Exception ( " File descriptor isn't seekable, inside " + storage . getName ( ) , ErrorCodes : : CANNOT_SEEK_THROUGH_FILE ) ;
/// ReadBuffer's seek() doesn't make sence, since cache is empty
if ( lseek ( storage . table_fd , storage . table_fd_init_offset , SEEK_SET ) < 0 )
throwFromErrno ( " Cannot seek file descriptor, inside " + storage . getName ( ) , ErrorCodes : : CANNOT_SEEK_THROUGH_FILE ) ;
}
storage . table_fd_was_used = true ;
read_buf = std : : make_unique < ReadBufferFromFileDescriptor > ( storage . table_fd ) ;
}
else
{
read_buf = std : : make_unique < ReadBufferFromFile > ( storage . path ) ;
}
reader = FormatFactory ( ) . getInput ( storage . format_name , * read_buf , storage . getSampleBlock ( ) , context , max_block_size ) ;
}
String getName ( ) const override
{
return storage . getName ( ) ;
}
String getID ( ) const override
{
std : : stringstream res_stream ;
res_stream < < " File( " < < storage . format_name < < " , " ;
if ( ! storage . path . empty ( ) )
res_stream < < storage . path ;
else
res_stream < < storage . table_fd ;
res_stream < < " ) " ;
return res_stream . str ( ) ;
}
Block readImpl ( ) override
{
return reader - > read ( ) ;
}
void readPrefixImpl ( ) override
{
reader - > readPrefix ( ) ;
}
void readSuffixImpl ( ) override
{
reader - > readSuffix ( ) ;
}
2016-10-18 14:18:37 +00:00
private :
2017-04-01 07:20:54 +00:00
StorageFile & storage ;
Poco : : ScopedRWLock lock ;
Block sample_block ;
std : : unique_ptr < ReadBufferFromFileDescriptor > read_buf ;
BlockInputStreamPtr reader ;
2016-10-18 14:18:37 +00:00
} ;
BlockInputStreams StorageFile : : read (
2017-04-01 07:20:54 +00:00
const Names & column_names ,
2017-05-25 01:12:41 +00:00
const ASTPtr & query ,
2017-04-01 07:20:54 +00:00
const Context & context ,
QueryProcessingStage : : Enum & processed_stage ,
size_t max_block_size ,
2017-06-02 15:54:39 +00:00
unsigned num_streams )
2016-10-18 14:18:37 +00:00
{
2017-04-01 07:20:54 +00:00
return BlockInputStreams ( 1 , std : : make_shared < StorageFileBlockInputStream > ( * this , context , max_block_size ) ) ;
2016-10-18 14:18:37 +00:00
}
class StorageFileBlockOutputStream : public IBlockOutputStream
{
public :
2017-04-01 07:20:54 +00:00
StorageFileBlockOutputStream ( StorageFile & storage_ )
: storage ( storage_ ) , lock ( storage . rwlock )
{
if ( storage . use_table_fd )
{
/** NOTE: Using real file binded to FD may be misleading:
* SELECT * ; INSERT insert_data ; SELECT * ; last SELECT returns initil_fd_data + insert_data
* INSERT data ; SELECT * ; last SELECT returns only insert_data
*/
storage . table_fd_was_used = true ;
write_buf = std : : make_unique < WriteBufferFromFileDescriptor > ( storage . table_fd ) ;
}
else
{
write_buf = std : : make_unique < WriteBufferFromFile > ( storage . path , DBMS_DEFAULT_BUFFER_SIZE , O_WRONLY | O_APPEND | O_CREAT ) ;
}
writer = FormatFactory ( ) . getOutput ( storage . format_name , * write_buf , storage . getSampleBlock ( ) , storage . context_global ) ;
}
void write ( const Block & block ) override
{
writer - > write ( block ) ;
}
void writePrefix ( ) override
{
writer - > writePrefix ( ) ;
}
void writeSuffix ( ) override
{
writer - > writeSuffix ( ) ;
}
void flush ( ) override
{
writer - > flush ( ) ;
}
2016-10-18 14:18:37 +00:00
private :
2017-04-01 07:20:54 +00:00
StorageFile & storage ;
Poco : : ScopedWriteRWLock lock ;
std : : unique_ptr < WriteBufferFromFileDescriptor > write_buf ;
BlockOutputStreamPtr writer ;
2016-10-18 14:18:37 +00:00
} ;
BlockOutputStreamPtr StorageFile : : write (
2017-05-21 22:25:25 +00:00
const ASTPtr & query ,
2017-04-01 07:20:54 +00:00
const Settings & settings )
2016-10-18 14:18:37 +00:00
{
2017-04-01 07:20:54 +00:00
return std : : make_shared < StorageFileBlockOutputStream > ( * this ) ;
2016-10-18 14:18:37 +00:00
}
void StorageFile : : drop ( )
{
2017-04-01 07:20:54 +00:00
/// Extra actions are not required.
2016-10-18 14:18:37 +00:00
}
void StorageFile : : rename ( const String & new_path_to_db , const String & new_database_name , const String & new_table_name )
{
2017-04-01 07:20:54 +00:00
if ( ! is_db_table )
throw Exception ( " Can't rename table ' " + table_name + " ' binded to user-defined file (or FD) " , ErrorCodes::DATABASE_ACCESS_DENIED) ;
2016-10-18 14:18:37 +00:00
2017-04-01 07:20:54 +00:00
Poco : : ScopedWriteRWLock lock ( rwlock ) ;
2016-10-18 14:18:37 +00:00
2017-04-01 07:20:54 +00:00
std : : string path_new = getTablePath ( new_path_to_db , new_table_name , format_name ) ;
Poco : : File ( Poco : : Path ( path_new ) . parent ( ) ) . createDirectories ( ) ;
Poco : : File ( path ) . renameTo ( path_new ) ;
2016-10-18 14:18:37 +00:00
2017-04-01 07:20:54 +00:00
path = std : : move ( path_new ) ;
2016-10-18 14:18:37 +00:00
}
}