2018-08-20 02:34:00 +00:00
# include <boost/program_options.hpp>
# include <DataStreams/AsynchronousBlockInputStream.h>
# include <DataTypes/DataTypeFactory.h>
# include <Interpreters/Context.h>
# include <IO/copyData.h>
# include <IO/ReadBufferFromIStream.h>
# include <IO/ReadBufferFromFile.h>
# include <IO/LimitReadBuffer.h>
# include <Storages/StorageMemory.h>
2020-02-19 15:09:32 +00:00
# include <Processors/Sources/SourceFromInputStream.h>
# include <Processors/Pipe.h>
# include <Processors/Sources/SinkToOutputStream.h>
# include <Processors/Executors/PipelineExecutor.h>
2020-03-19 23:48:53 +00:00
# include <Core/ExternalTable.h>
# include <Poco/Net/MessageHeader.h>
# include <common/find_symbols.h>
2018-08-20 02:34:00 +00:00
namespace DB
{
/// Error codes referenced in this file. BAD_ARGUMENTS is only declared here (extern),
/// so its definition has to come from another translation unit.
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS ;
}
2020-02-19 15:09:32 +00:00
/// Builds the description of the external table held by this object.
/// Initializes the read buffer and the sample block, asks `context` for an input format
/// reading `format` from that buffer with the sample block as header, and returns
/// an ExternalTableData carrying the table name and a pipe over that input.
ExternalTableDataPtr BaseExternalTable::getData(const Context & context)
{
    /// The input format below is given both the buffer and the sample block,
    /// so they are prepared first.
    initReadBuffer();
    initSampleBlock();

    auto format_input = context.getInputFormat(format, *read_buffer, sample_block, DEFAULT_BLOCK_SIZE);

    /// Wrap the format in an asynchronous stream and expose it as a processor source.
    auto source = std::make_shared<SourceFromInputStream>(
        std::make_shared<AsynchronousBlockInputStream>(format_input));

    auto result = std::make_unique<ExternalTableData>();
    result->table_name = name;
    result->pipe = std::make_unique<Pipe>(std::move(source));
    return result;
}
void BaseExternalTable : : clean ( )
{
2020-03-19 23:48:53 +00:00
name . clear ( ) ;
file . clear ( ) ;
format . clear ( ) ;
2018-08-20 02:34:00 +00:00
structure . clear ( ) ;
2020-03-19 23:48:53 +00:00
sample_block . clear ( ) ;
2018-08-20 02:34:00 +00:00
read_buffer . reset ( ) ;
}
/// Debugging helper: dumps the current state of the table description to std::cerr.
/// Prints the `file`, `name` and `format` fields, one per line, then a header line
/// followed by one line per element of `structure` showing its `first` and `second` members.
void BaseExternalTable : : write ( )
{
std : : cerr < < " file " < < file < < std : : endl ;
std : : cerr < < " name " < < name < < std : : endl ;
std : : cerr < < " format " < < format < < std : : endl ;
std : : cerr < < " structure: \n " ;
2020-03-08 22:08:39 +00:00
for ( const auto & elem : structure )
2020-03-19 23:48:53 +00:00
std : : cerr < < ' \t ' < < elem . first < < ' ' < < elem . second < < std : : endl ;
2018-08-20 02:34:00 +00:00
}
/// Fills `structure` from a flat list of alternating values taken from `argument`.
/// The argument is split by splitInto using the separator characters given as template
/// arguments; consecutive tokens (vals[i], vals[i + 1]) are appended to `structure` as pairs.
/// Throws Exception with ErrorCodes::BAD_ARGUMENTS when the number of tokens is odd.
/// Existing entries of `structure` are kept: new pairs are appended with emplace_back.
void BaseExternalTable : : parseStructureFromStructureField ( const std : : string & argument )
{
2020-03-19 23:48:53 +00:00
std : : vector < std : : string > vals ;
/// NOTE(review): the meaning of the trailing `true` is defined by splitInto, which is not
/// visible here — presumably it merges adjacent separators; confirm against its declaration.
splitInto < ' ' , ' , ' > ( vals , argument , true ) ;
2018-08-20 02:34:00 +00:00
2020-03-21 02:52:37 +00:00
/// Tokens are consumed two at a time below, so an odd count cannot form complete pairs.
if ( vals . size ( ) % 2 ! = 0 )
throw Exception ( " Odd number of attributes in section structure: " + std : : to_string ( vals . size ( ) ) , ErrorCodes : : BAD_ARGUMENTS ) ;
2018-08-20 02:34:00 +00:00
/// The even-size check above guarantees that vals[i + 1] is in range.
for ( size_t i = 0 ; i < vals . size ( ) ; i + = 2 )
structure . emplace_back ( vals [ i ] , vals [ i + 1 ] ) ;
}
/// Fills `structure` from a list of values taken from `argument`, generating the first
/// member of each pair automatically.
/// The argument is split by splitInto with the same separators as in
/// parseStructureFromStructureField. For the token at index i, the appended pair consists of
/// the literal prefix concatenated with toString(i + 1) (numbering therefore starts at 1)
/// and the token itself.
void BaseExternalTable : : parseStructureFromTypesField ( const std : : string & argument )
{
2020-03-19 23:48:53 +00:00
std : : vector < std : : string > vals ;
splitInto < ' ' , ' , ' > ( vals , argument , true ) ;
2018-08-20 02:34:00 +00:00
for ( size_t i = 0 ; i < vals . size ( ) ; + + i )
structure . emplace_back ( " _ " + toString ( i + 1 ) , vals [ i ] ) ;
}
void BaseExternalTable : : initSampleBlock ( )
{
const DataTypeFactory & data_type_factory = DataTypeFactory : : instance ( ) ;
2020-03-08 22:08:39 +00:00
for ( const auto & elem : structure )
2018-08-20 02:34:00 +00:00
{
ColumnWithTypeAndName column ;
2020-03-08 22:08:39 +00:00
column . name = elem . first ;
column . type = data_type_factory . get ( elem . second ) ;
2018-08-20 02:34:00 +00:00
column . column = column . type - > createColumn ( ) ;
sample_block . insert ( std : : move ( column ) ) ;
}
}
/// Creates `read_buffer` for the configured `file`.
/// When `file` equals the dash literal compared below, the buffer reads from the
/// STDIN_FILENO descriptor; otherwise `file` is passed to ReadBufferFromFile.
void ExternalTable : : initReadBuffer ( )
{
if ( file = = " - " )
read_buffer = std : : make_unique < ReadBufferFromFileDescriptor > ( STDIN_FILENO ) ;
else
read_buffer = std : : make_unique < ReadBufferFromFile > ( file ) ;
}
/// Builds an external table description from parsed program options.
/// The file, name and format options are each mandatory and are copied into the members of
/// the same name. The column layout is taken from the structure option when it is present,
/// otherwise from the types option.
/// Throws Exception with ErrorCodes::BAD_ARGUMENTS when a mandatory option is missing or
/// when neither of the two layout options is given.
ExternalTable : : ExternalTable ( const boost : : program_options : : variables_map & external_options )
{
if ( external_options . count ( " file " ) )
file = external_options [ " file " ] . as < std : : string > ( ) ;
else
throw Exception ( " --file field have not been provided for external table " , ErrorCodes : : BAD_ARGUMENTS ) ;
if ( external_options . count ( " name " ) )
name = external_options [ " name " ] . as < std : : string > ( ) ;
else
throw Exception ( " --name field have not been provided for external table " , ErrorCodes : : BAD_ARGUMENTS ) ;
if ( external_options . count ( " format " ) )
format = external_options [ " format " ] . as < std : : string > ( ) ;
else
throw Exception ( " --format field have not been provided for external table " , ErrorCodes : : BAD_ARGUMENTS ) ;
/// The structure option wins over the types option when both are supplied.
if ( external_options . count ( " structure " ) )
parseStructureFromStructureField ( external_options [ " structure " ] . as < std : : string > ( ) ) ;
else if ( external_options . count ( " types " ) )
parseStructureFromTypesField ( external_options [ " types " ] . as < std : : string > ( ) ) ;
else
/// NOTE(review): the message below reads as a double negative ("Neither ... have not");
/// the text is left untouched here because callers or tests may match on it.
throw Exception ( " Neither --structure nor --types have not been provided for external table " , ErrorCodes : : BAD_ARGUMENTS ) ;
}
/// Handles one part of a multipart message: reads it as an external table and loads it
/// into a temporary table registered in `context`.
/// Steps, in order: wrap `stream` in a read buffer (size-limited when the
/// http_max_multipart_form_data_size setting is non-zero); take the table name from the
/// Content-Disposition header and the format/structure from `params`; build the input pipe
/// via getData; create the temporary table and register it under the table name; run a
/// pipeline that writes the pipe's data into the table; finally reset this object with clean().
/// Throws Exception with ErrorCodes::BAD_ARGUMENTS when `params` has neither the
/// structure nor the types entry for the table.
/// NOTE(review): clean() is only reached on normal completion; if any earlier statement
/// throws, the parsed fields stay populated — confirm that callers do not reuse the handler then.
void ExternalTablesHandler : : handlePart ( const Poco : : Net : : MessageHeader & header , std : : istream & stream )
{
const Settings & settings = context . getSettingsRef ( ) ;
/// The buffer is initialized here, not in the virtual function initReadBuffer
read_buffer_impl = std : : make_unique < ReadBufferFromIStream > ( stream ) ;
/// A non-zero setting puts a LimitReadBuffer in front of the stream buffer. It is handed
/// *read_buffer_impl, so read_buffer_impl presumably has to stay alive while read_buffer is
/// in use. Otherwise ownership of the stream buffer is moved into read_buffer directly.
if ( settings . http_max_multipart_form_data_size )
read_buffer = std : : make_unique < LimitReadBuffer > (
* read_buffer_impl , settings . http_max_multipart_form_data_size ,
true , " the maximum size of multipart/form-data. This limit can be tuned by 'http_max_multipart_form_data_size' setting " ) ;
else
read_buffer = std : : move ( read_buffer_impl ) ;
/// Retrieve a collection of parameters from MessageHeader
Poco : : Net : : NameValueCollection content ;
std : : string label ;
Poco : : Net : : MessageHeader : : splitParameters ( header . get ( " Content-Disposition " ) , label , content ) ;
/// Get parameters: the table name comes from the header (with a default), everything else
/// is looked up in `params` under keys derived from that name.
name = content . get ( " name " , " _data " ) ;
format = params . get ( name + " _format " , " TabSeparated " ) ;
/// The structure entry takes precedence over the types entry.
if ( params . has ( name + " _structure " ) )
parseStructureFromStructureField ( params . get ( name + " _structure " ) ) ;
else if ( params . has ( name + " _types " ) )
parseStructureFromTypesField ( params . get ( name + " _types " ) ) ;
else
throw Exception ( " Neither structure nor types have not been provided for external table " + name + " . Use fields " + name + " _structure or " + name + " _types to do so. " , ErrorCodes : : BAD_ARGUMENTS ) ;
2020-02-19 15:09:32 +00:00
/// Builds the sample block and the input pipe; read_buffer has already been set above.
ExternalTableDataPtr data = getData ( context ) ;
2018-08-20 02:34:00 +00:00
/// Create table
NamesAndTypesList columns = sample_block . getNamesAndTypesList ( ) ;
2020-05-29 02:08:48 +00:00
auto temporary_table = TemporaryTableHolder ( context , ColumnsDescription { columns } , { } ) ;
2020-03-10 19:36:17 +00:00
/// The storage pointer is taken before the holder is moved into the context.
auto storage = temporary_table . getTable ( ) ;
context . addExternalTable ( data - > table_name , std : : move ( temporary_table ) ) ;
2020-06-15 19:08:58 +00:00
BlockOutputStreamPtr output = storage - > write ( ASTPtr ( ) , storage - > getInMemoryMetadataPtr ( ) , context ) ;
2018-08-20 02:34:00 +00:00
/// Write data
2020-10-12 09:30:05 +00:00
/// Presumably leaves the pipe with a single output, since only port 0 is connected below.
data - > pipe - > resize ( 1 ) ;
2020-08-06 12:24:05 +00:00
2020-02-19 15:09:32 +00:00
auto sink = std : : make_shared < SinkToOutputStream > ( std : : move ( output ) ) ;
2020-08-06 12:24:05 +00:00
connect ( * data - > pipe - > getOutputPort ( 0 ) , sink - > getPort ( ) ) ;
2020-02-19 15:09:32 +00:00
2020-08-06 12:24:05 +00:00
/// Take the processors out of the pipe and add the sink so the executor gets the whole chain.
auto processors = Pipe : : detachProcessors ( std : : move ( * data - > pipe ) ) ;
2020-02-19 15:09:32 +00:00
processors . push_back ( std : : move ( sink ) ) ;
auto executor = std : : make_shared < PipelineExecutor > ( processors ) ;
executor - > execute ( /*num_threads = */ 1 ) ;
2018-08-20 02:34:00 +00:00
/// We are ready to receive the next file, for this we clear all the information received
clean ( ) ;
}
}