2014-04-08 13:43:20 +00:00
# pragma once
# include <boost/program_options.hpp>
2014-04-22 18:35:40 +00:00
# include <boost/algorithm/string.hpp>
2017-04-01 09:19:00 +00:00
# include <DataStreams/AsynchronousBlockInputStream.h>
# include <DataTypes/DataTypeFactory.h>
# include <Interpreters/Context.h>
# include <IO/copyData.h>
# include <IO/ReadBufferFromIStream.h>
# include <IO/ReadBufferFromFile.h>
# include <Storages/StorageMemory.h>
# include <Client/Connection.h>
2014-04-08 13:43:20 +00:00
# include <Poco/Net/HTMLForm.h>
# include <Poco/Net/PartHandler.h>
# include <Poco/Net/MessageHeader.h>
2017-04-01 09:19:00 +00:00
# include <Common/HTMLForm.h>
2014-04-08 13:43:20 +00:00
namespace DB
{
2017-04-08 01:32:05 +00:00
namespace ErrorCodes
{
/// Declaration only (`extern`): the value is defined in another translation unit.
/// In this file it is the code attached to every Exception thrown for missing or malformed table parameters.
extern const int BAD_ARGUMENTS ;
}
2017-05-07 20:25:26 +00:00
/// The base class containing the basic information about external table and
/// basic functions for extracting this information from text fields.
2014-04-08 13:43:20 +00:00
class BaseExternalTable
{
public :
2017-05-07 20:25:26 +00:00
std : : string file ; /// File with data or '-' if stdin
std : : string name ; /// The name of the table
std : : string format ; /// Name of the data storage format
2017-04-01 07:20:54 +00:00
2017-05-07 20:25:26 +00:00
/// Description of the table structure: (column name, data type name)
2017-04-01 07:20:54 +00:00
std : : vector < std : : pair < std : : string , std : : string > > structure ;
std : : unique_ptr < ReadBuffer > read_buffer ;
Block sample_block ;
virtual ~ BaseExternalTable ( ) { } ;
2017-05-07 20:25:26 +00:00
/// Initialize read_buffer, depending on the data source. By default, does nothing.
2017-04-01 07:20:54 +00:00
virtual void initReadBuffer ( ) { } ;
2017-05-07 20:25:26 +00:00
/// Initialize sample_block according to the structure of the table stored in the `structure`
2017-04-01 07:20:54 +00:00
virtual void initSampleBlock ( const Context & context )
{
const DataTypeFactory & data_type_factory = DataTypeFactory : : instance ( ) ;
for ( size_t i = 0 ; i < structure . size ( ) ; + + i )
{
ColumnWithTypeAndName column ;
column . name = structure [ i ] . first ;
column . type = data_type_factory . get ( structure [ i ] . second ) ;
column . column = column . type - > createColumn ( ) ;
sample_block . insert ( std : : move ( column ) ) ;
}
}
2017-05-07 20:25:26 +00:00
/// Get the table data - a pair (a thread with the contents of the table, the name of the table)
2017-04-01 07:20:54 +00:00
virtual ExternalTableData getData ( const Context & context )
{
initReadBuffer ( ) ;
initSampleBlock ( context ) ;
ExternalTableData res = std : : make_pair ( std : : make_shared < AsynchronousBlockInputStream > ( context . getInputFormat (
format , * read_buffer , sample_block , DEFAULT_BLOCK_SIZE ) ) , name ) ;
return res ;
}
2014-04-08 13:43:20 +00:00
protected :
2017-05-07 20:25:26 +00:00
/// Clear all accumulated information
2017-04-01 07:20:54 +00:00
void clean ( )
{
name = " " ;
file = " " ;
format = " " ;
structure . clear ( ) ;
sample_block = Block ( ) ;
read_buffer . reset ( ) ;
}
2017-05-07 20:25:26 +00:00
/// Function for debugging information output
2017-04-01 07:20:54 +00:00
void write ( )
{
std : : cerr < < " file " < < file < < std : : endl ;
std : : cerr < < " name " < < name < < std : : endl ;
std : : cerr < < " format " < < format < < std : : endl ;
std : : cerr < < " structure: \n " ;
for ( size_t i = 0 ; i < structure . size ( ) ; + + i )
std : : cerr < < " \t " < < structure [ i ] . first < < " " < < structure [ i ] . second < < std : : endl ;
}
static std : : vector < std : : string > split ( const std : : string & s , const std : : string & d )
{
std : : vector < std : : string > res ;
boost : : split ( res , s , boost : : algorithm : : is_any_of ( d ) , boost : : algorithm : : token_compress_on ) ;
return res ;
}
2017-05-07 20:25:26 +00:00
/// Construct the `structure` vector from the text field `structure`
2017-04-01 07:20:54 +00:00
virtual void parseStructureFromStructureField ( const std : : string & argument )
{
std : : vector < std : : string > vals = split ( argument , " , " ) ;
if ( vals . size ( ) & 1 )
throw Exception ( " Odd number of attributes in section structure " , ErrorCodes : : BAD_ARGUMENTS ) ;
for ( size_t i = 0 ; i < vals . size ( ) ; i + = 2 )
structure . emplace_back ( vals [ i ] , vals [ i + 1 ] ) ;
}
2017-05-07 20:25:26 +00:00
/// Construct the `structure` vector from the text field `types`
2017-04-01 07:20:54 +00:00
virtual void parseStructureFromTypesField ( const std : : string & argument )
{
std : : vector < std : : string > vals = split ( argument , " , " ) ;
for ( size_t i = 0 ; i < vals . size ( ) ; + + i )
structure . emplace_back ( " _ " + toString ( i + 1 ) , vals [ i ] ) ;
}
2014-04-08 13:43:20 +00:00
} ;
2017-05-07 20:25:26 +00:00
/// Parsing of external table used in the tcp client.
2014-04-08 13:43:20 +00:00
class ExternalTable : public BaseExternalTable
{
public :
2017-04-01 07:20:54 +00:00
void initReadBuffer ( )
{
if ( file = = " - " )
read_buffer = std : : make_unique < ReadBufferFromFileDescriptor > ( STDIN_FILENO ) ;
else
read_buffer = std : : make_unique < ReadBufferFromFile > ( file ) ;
}
2017-05-07 20:25:26 +00:00
/// Extract parameters from variables_map, which is built on the client command line
2017-04-01 07:20:54 +00:00
ExternalTable ( const boost : : program_options : : variables_map & external_options )
{
if ( external_options . count ( " file " ) )
file = external_options [ " file " ] . as < std : : string > ( ) ;
else
throw Exception ( " --file field have not been provided for external table " , ErrorCodes : : BAD_ARGUMENTS ) ;
if ( external_options . count ( " name " ) )
name = external_options [ " name " ] . as < std : : string > ( ) ;
else
throw Exception ( " --name field have not been provided for external table " , ErrorCodes : : BAD_ARGUMENTS ) ;
if ( external_options . count ( " format " ) )
format = external_options [ " format " ] . as < std : : string > ( ) ;
else
throw Exception ( " --format field have not been provided for external table " , ErrorCodes : : BAD_ARGUMENTS ) ;
if ( external_options . count ( " structure " ) )
parseStructureFromStructureField ( external_options [ " structure " ] . as < std : : string > ( ) ) ;
else if ( external_options . count ( " types " ) )
parseStructureFromTypesField ( external_options [ " types " ] . as < std : : string > ( ) ) ;
else
throw Exception ( " Neither --structure nor --types have not been provided for external table " , ErrorCodes : : BAD_ARGUMENTS ) ;
}
2014-04-08 13:43:20 +00:00
} ;
2017-05-07 20:25:26 +00:00
/// Parsing of external table used when sending tables via http
/// The `handlePart` function will be called for each table passed,
/// so it's also necessary to call `clean` at the end of the `handlePart`.
2014-04-08 13:43:20 +00:00
class ExternalTablesHandler : public Poco : : Net : : PartHandler , BaseExternalTable
{
public :
2017-04-01 07:20:54 +00:00
std : : vector < std : : string > names ;
ExternalTablesHandler ( Context & context_ , Poco : : Net : : NameValueCollection params_ ) : context ( context_ ) , params ( params_ ) { }
void handlePart ( const Poco : : Net : : MessageHeader & header , std : : istream & stream )
{
2017-05-07 20:25:26 +00:00
/// The buffer is initialized here, not in the virtual function initReadBuffer
2017-04-01 07:20:54 +00:00
read_buffer = std : : make_unique < ReadBufferFromIStream > ( stream ) ;
2017-05-07 20:25:26 +00:00
/// Retrieve a collection of parameters from MessageHeader
2017-04-01 07:20:54 +00:00
Poco : : Net : : NameValueCollection content ;
std : : string label ;
Poco : : Net : : MessageHeader : : splitParameters ( header . get ( " Content-Disposition " ) , label , content ) ;
2017-05-07 20:25:26 +00:00
/// Get parameters
2017-04-01 07:20:54 +00:00
name = content . get ( " name " , " _data " ) ;
format = params . get ( name + " _format " , " TabSeparated " ) ;
if ( params . has ( name + " _structure " ) )
parseStructureFromStructureField ( params . get ( name + " _structure " ) ) ;
else if ( params . has ( name + " _types " ) )
parseStructureFromTypesField ( params . get ( name + " _types " ) ) ;
else
throw Exception ( " Neither structure nor types have not been provided for external table " + name + " . Use fields " + name + " _structure or " + name + " _types to do so. " , ErrorCodes : : BAD_ARGUMENTS ) ;
ExternalTableData data = getData ( context ) ;
2017-05-07 20:25:26 +00:00
/// Create table
2017-04-01 07:20:54 +00:00
NamesAndTypesListPtr columns = std : : make_shared < NamesAndTypesList > ( sample_block . getColumnsList ( ) ) ;
StoragePtr storage = StorageMemory : : create ( data . second , columns ) ;
2017-06-06 17:06:14 +00:00
storage - > startup ( ) ;
2017-04-01 07:20:54 +00:00
context . addExternalTable ( data . second , storage ) ;
BlockOutputStreamPtr output = storage - > write ( ASTPtr ( ) , context . getSettingsRef ( ) ) ;
2017-05-07 20:25:26 +00:00
/// Write data
2017-04-01 07:20:54 +00:00
data . first - > readPrefix ( ) ;
output - > writePrefix ( ) ;
while ( Block block = data . first - > read ( ) )
output - > write ( block ) ;
data . first - > readSuffix ( ) ;
output - > writeSuffix ( ) ;
names . push_back ( name ) ;
2017-05-07 20:25:26 +00:00
/// We are ready to receive the next file, for this we clear all the information received
2017-04-01 07:20:54 +00:00
clean ( ) ;
}
2014-04-08 13:43:20 +00:00
private :
2017-04-01 07:20:54 +00:00
Context & context ;
Poco : : Net : : NameValueCollection params ;
2014-04-08 13:43:20 +00:00
} ;
}