2021-03-05 09:38:00 +00:00
# include "Handlers.h"
2021-03-06 18:21:40 +00:00
# include "SharedLibraryHandlerFactory.h"
2021-03-05 09:38:00 +00:00
# include <DataStreams/copyData.h>
# include <Formats/FormatFactory.h>
# include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
# include <IO/WriteHelpers.h>
# include <IO/ReadHelpers.h>
# include <Poco/Net/HTTPServerRequest.h>
# include <Poco/Net/HTTPServerResponse.h>
# include <Poco/Net/HTMLForm.h>
# include <Poco/ThreadPool.h>
# include <Processors/Formats/InputStreamFromInputFormat.h>
# include <Server/HTTP/HTMLForm.h>
2021-03-24 07:53:15 +00:00
# include <IO/ReadBufferFromString.h>
2021-03-05 09:38:00 +00:00
namespace DB
{
namespace
{
std : : shared_ptr < Block > parseColumns ( std : : string & & column_string )
{
auto sample_block = std : : make_shared < Block > ( ) ;
auto names_and_types = NamesAndTypesList : : parse ( column_string ) ;
for ( const NameAndTypePair & column_data : names_and_types )
sample_block - > insert ( { column_data . type , column_data . name } ) ;
return sample_block ;
}
2021-03-24 07:53:15 +00:00
std : : vector < uint64_t > parseIdsFromBinary ( const std : : string & ids_string )
{
ReadBufferFromString buf ( ids_string ) ;
std : : vector < uint64_t > ids ;
readVectorBinary ( ids , buf ) ;
return ids ;
}
2021-03-24 19:32:31 +00:00
std : : vector < std : : string > parseNamesFromBinary ( const std : : string & names_string )
2021-03-24 07:53:15 +00:00
{
2021-03-24 19:32:31 +00:00
ReadBufferFromString buf ( names_string ) ;
std : : vector < std : : string > names ;
readVectorBinary ( names , buf ) ;
return names ;
2021-03-24 07:53:15 +00:00
}
2021-03-05 09:38:00 +00:00
}
void LibraryRequestHandler : : handleRequest ( HTTPServerRequest & request , HTTPServerResponse & response )
{
LOG_TRACE ( log , " Request URI: {} " , request . getURI ( ) ) ;
HTMLForm params ( request ) ;
if ( ! params . has ( " method " ) )
{
processError ( response , " No 'method' in request URL " ) ;
return ;
}
2021-03-07 11:31:55 +00:00
if ( ! params . has ( " dictionary_id " ) )
{
processError ( response , " No 'dictionary_id in request URL " ) ;
return ;
}
2021-03-05 09:38:00 +00:00
std : : string method = params . get ( " method " ) ;
2021-03-07 11:31:55 +00:00
std : : string dictionary_id = params . get ( " dictionary_id " ) ;
LOG_TRACE ( log , " Library method: '{}', dictionary id: {} " , method , dictionary_id ) ;
2021-03-05 09:38:00 +00:00
WriteBufferFromHTTPServerResponse out ( response , request . getMethod ( ) = = Poco : : Net : : HTTPRequest : : HTTP_HEAD , keep_alive_timeout ) ;
2021-03-07 11:31:55 +00:00
2021-03-05 09:38:00 +00:00
try
{
if ( method = = " libNew " )
{
2021-04-02 15:45:42 +00:00
auto & read_buf = request . getStream ( ) ;
params . read ( read_buf ) ;
2021-03-24 09:23:29 +00:00
2021-03-05 09:38:00 +00:00
if ( ! params . has ( " library_path " ) )
{
processError ( response , " No 'library_path' in request URL " ) ;
return ;
}
if ( ! params . has ( " library_settings " ) )
{
processError ( response , " No 'library_settings' in request URL " ) ;
return ;
}
2021-03-24 09:23:29 +00:00
std : : string library_path = params . get ( " library_path " ) ;
const auto & settings_string = params . get ( " library_settings " ) ;
2021-03-24 19:32:31 +00:00
std : : vector < std : : string > library_settings = parseNamesFromBinary ( settings_string ) ;
2021-03-24 09:23:29 +00:00
2021-04-02 15:45:42 +00:00
/// Needed for library dictionary
2021-03-24 19:32:31 +00:00
if ( ! params . has ( " attributes_names " ) )
2021-03-24 09:23:29 +00:00
{
2021-03-24 19:32:31 +00:00
processError ( response , " No 'attributes_names' in request URL " ) ;
2021-03-24 09:23:29 +00:00
return ;
}
2021-04-02 15:45:42 +00:00
const auto & attributes_string = params . get ( " attributes_names " ) ;
std : : vector < std : : string > attributes_names = parseNamesFromBinary ( attributes_string ) ;
/// Needed to parse block from binary string format
2021-03-24 08:41:42 +00:00
if ( ! params . has ( " sample_block " ) )
{
processError ( response , " No 'sample_block' in request URL " ) ;
return ;
}
2021-04-02 15:45:42 +00:00
std : : string sample_block_string = params . get ( " sample_block " ) ;
2021-03-24 08:41:42 +00:00
std : : shared_ptr < Block > sample_block ;
try
{
2021-04-02 15:45:42 +00:00
sample_block = parseColumns ( std : : move ( sample_block_string ) ) ;
2021-03-24 08:41:42 +00:00
}
catch ( const Exception & ex )
{
processError ( response , " Invalid 'sample_block' parameter in request body ' " + ex . message ( ) + " ' " ) ;
LOG_WARNING ( log , ex . getStackTraceString ( ) ) ;
return ;
}
2021-04-05 13:13:07 +00:00
if ( ! params . has ( " null_values " ) )
2021-04-02 15:45:42 +00:00
{
2021-04-05 13:13:07 +00:00
processError ( response , " No 'null_values' in request URL " ) ;
2021-04-02 15:45:42 +00:00
return ;
}
2021-04-05 13:13:07 +00:00
ReadBufferFromString read_block_buf ( params . get ( " null_values " ) ) ;
2021-04-02 15:45:42 +00:00
auto format = FormatFactory : : instance ( ) . getInput ( FORMAT , read_block_buf , * sample_block , context , DEFAULT_BLOCK_SIZE ) ;
auto reader = std : : make_shared < InputStreamFromInputFormat > ( format ) ;
auto sample_block_with_nulls = reader - > read ( ) ;
2021-04-05 13:13:07 +00:00
LOG_DEBUG ( log , " Dictionary sample block with null values: {} " , sample_block_with_nulls . dumpStructure ( ) ) ;
2021-04-02 15:45:42 +00:00
SharedLibraryHandlerFactory : : instance ( ) . create ( dictionary_id , library_path , library_settings , sample_block_with_nulls , attributes_names ) ;
2021-03-22 15:58:20 +00:00
writeStringBinary ( " 1 " , out ) ;
2021-03-05 09:38:00 +00:00
}
2021-03-05 15:37:43 +00:00
else if ( method = = " libClone " )
2021-03-05 09:38:00 +00:00
{
2021-03-06 18:21:40 +00:00
if ( ! params . has ( " from_dictionary_id " ) )
{
processError ( response , " No 'from_dictionary_id' in request URL " ) ;
return ;
}
std : : string from_dictionary_id = params . get ( " from_dictionary_id " ) ;
LOG_TRACE ( log , " Calling libClone from {} to {} " , from_dictionary_id , dictionary_id ) ;
2021-03-22 15:58:20 +00:00
SharedLibraryHandlerFactory : : instance ( ) . clone ( from_dictionary_id , dictionary_id ) ;
writeStringBinary ( " 1 " , out ) ;
2021-03-06 18:21:40 +00:00
}
else if ( method = = " libDelete " )
{
2021-03-22 15:58:20 +00:00
SharedLibraryHandlerFactory : : instance ( ) . remove ( dictionary_id ) ;
writeStringBinary ( " 1 " , out ) ;
2021-03-05 10:43:47 +00:00
}
else if ( method = = " isModified " )
{
2021-03-06 18:21:40 +00:00
auto library_handler = SharedLibraryHandlerFactory : : instance ( ) . get ( dictionary_id ) ;
bool res = library_handler - > isModified ( ) ;
2021-03-05 10:43:47 +00:00
writeStringBinary ( std : : to_string ( res ) , out ) ;
}
else if ( method = = " supportsSelectiveLoad " )
{
2021-03-06 18:21:40 +00:00
auto library_handler = SharedLibraryHandlerFactory : : instance ( ) . get ( dictionary_id ) ;
bool res = library_handler - > supportsSelectiveLoad ( ) ;
2021-03-05 10:43:47 +00:00
writeStringBinary ( std : : to_string ( res ) , out ) ;
2021-03-05 09:38:00 +00:00
}
else if ( method = = " loadAll " )
{
2021-03-06 18:21:40 +00:00
auto library_handler = SharedLibraryHandlerFactory : : instance ( ) . get ( dictionary_id ) ;
2021-03-24 08:41:42 +00:00
const auto & sample_block = library_handler - > getSampleBlock ( ) ;
2021-03-24 09:23:29 +00:00
auto input = library_handler - > loadAll ( ) ;
2021-04-02 15:45:42 +00:00
2021-03-24 08:41:42 +00:00
BlockOutputStreamPtr output = FormatFactory : : instance ( ) . getOutputStream ( FORMAT , out , sample_block , context ) ;
2021-03-05 10:43:47 +00:00
copyData ( * input , * output ) ;
}
else if ( method = = " loadIds " )
{
2021-03-23 15:41:53 +00:00
params . read ( request . getStream ( ) ) ;
2021-03-05 10:43:47 +00:00
if ( ! params . has ( " ids " ) )
{
processError ( response , " No 'ids' in request URL " ) ;
return ;
}
2021-03-24 07:53:15 +00:00
std : : vector < uint64_t > ids = parseIdsFromBinary ( params . get ( " ids " ) ) ;
2021-03-06 18:21:40 +00:00
auto library_handler = SharedLibraryHandlerFactory : : instance ( ) . get ( dictionary_id ) ;
2021-03-24 08:41:42 +00:00
const auto & sample_block = library_handler - > getSampleBlock ( ) ;
2021-03-24 09:23:29 +00:00
auto input = library_handler - > loadIds ( ids ) ;
2021-03-24 08:41:42 +00:00
BlockOutputStreamPtr output = FormatFactory : : instance ( ) . getOutputStream ( FORMAT , out , sample_block , context ) ;
2021-03-05 09:38:00 +00:00
copyData ( * input , * output ) ;
}
2021-03-06 18:21:40 +00:00
else if ( method = = " loadKeys " )
{
2021-03-24 09:23:29 +00:00
if ( ! params . has ( " requested_block_sample " ) )
2021-03-10 18:02:43 +00:00
{
2021-03-24 09:23:29 +00:00
processError ( response , " No 'requested_block_sample' in request URL " ) ;
2021-03-10 18:02:43 +00:00
return ;
}
2021-03-24 09:23:29 +00:00
std : : string requested_block_string = params . get ( " requested_block_sample " ) ;
2021-03-10 13:10:05 +00:00
2021-03-10 18:02:43 +00:00
std : : shared_ptr < Block > requested_sample_block ;
2021-03-10 13:10:05 +00:00
try
{
2021-03-10 18:02:43 +00:00
requested_sample_block = parseColumns ( std : : move ( requested_block_string ) ) ;
2021-03-10 13:10:05 +00:00
}
catch ( const Exception & ex )
{
2021-03-10 18:02:43 +00:00
processError ( response , " Invalid 'requested_block' parameter in request body ' " + ex . message ( ) + " ' " ) ;
2021-03-10 13:10:05 +00:00
LOG_WARNING ( log , ex . getStackTraceString ( ) ) ;
return ;
}
auto & read_buf = request . getStream ( ) ;
2021-03-10 18:02:43 +00:00
auto format = FormatFactory : : instance ( ) . getInput ( FORMAT , read_buf , * requested_sample_block , context , DEFAULT_BLOCK_SIZE ) ;
2021-03-10 13:10:05 +00:00
auto reader = std : : make_shared < InputStreamFromInputFormat > ( format ) ;
auto block = reader - > read ( ) ;
2021-03-10 18:02:43 +00:00
auto library_handler = SharedLibraryHandlerFactory : : instance ( ) . get ( dictionary_id ) ;
2021-03-24 08:41:42 +00:00
const auto & sample_block = library_handler - > getSampleBlock ( ) ;
auto input = library_handler - > loadKeys ( block . getColumns ( ) ) ;
BlockOutputStreamPtr output = FormatFactory : : instance ( ) . getOutputStream ( FORMAT , out , sample_block , context ) ;
2021-03-10 13:10:05 +00:00
copyData ( * input , * output ) ;
2021-03-06 18:21:40 +00:00
}
2021-03-05 09:38:00 +00:00
}
catch ( . . . )
{
auto message = getCurrentExceptionMessage ( true ) ;
2021-03-26 16:16:31 +00:00
response . setStatusAndReason ( Poco : : Net : : HTTPResponse : : HTTP_INTERNAL_SERVER_ERROR , message ) ; // can't call process_error, because of too soon response sending
2021-03-05 09:38:00 +00:00
try
{
writeStringBinary ( message , out ) ;
out . finalize ( ) ;
}
catch ( . . . )
{
tryLogCurrentException ( log ) ;
}
tryLogCurrentException ( log ) ;
}
try
{
out . finalize ( ) ;
}
catch ( . . . )
{
tryLogCurrentException ( log ) ;
}
}
void LibraryRequestHandler : : processError ( HTTPServerResponse & response , const std : : string & message )
{
response . setStatusAndReason ( HTTPResponse : : HTTP_INTERNAL_SERVER_ERROR ) ;
if ( ! response . sent ( ) )
* response . send ( ) < < message < < std : : endl ;
LOG_WARNING ( log , message ) ;
}
void PingHandler : : handleRequest ( HTTPServerRequest & /* request */ , HTTPServerResponse & response )
{
try
{
setResponseDefaultHeaders ( response , keep_alive_timeout ) ;
const char * data = " Ok. \n " ;
response . sendBuffer ( data , strlen ( data ) ) ;
}
catch ( . . . )
{
tryLogCurrentException ( " PingHandler " ) ;
}
}
}