2021-03-05 09:38:00 +00:00
# include "Handlers.h"
2021-03-06 18:21:40 +00:00
# include "SharedLibraryHandlerFactory.h"
2021-03-05 09:38:00 +00:00
# include <Formats/FormatFactory.h>
# include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
# include <IO/WriteHelpers.h>
# include <IO/ReadHelpers.h>
# include <Poco/Net/HTTPServerRequest.h>
# include <Poco/Net/HTTPServerResponse.h>
# include <Poco/Net/HTMLForm.h>
# include <Poco/ThreadPool.h>
2021-10-11 16:11:50 +00:00
# include <Processors/Formats/IOutputFormat.h>
2021-10-14 10:25:43 +00:00
# include <Processors/Formats/IInputFormat.h>
2021-10-16 14:03:50 +00:00
# include <QueryPipeline/QueryPipeline.h>
2021-10-11 16:11:50 +00:00
# include <Processors/Executors/CompletedPipelineExecutor.h>
2021-10-14 10:25:43 +00:00
# include <Processors/Executors/PullingPipelineExecutor.h>
2021-10-11 16:11:50 +00:00
# include <Processors/Sources/SourceFromSingleChunk.h>
2021-10-16 14:03:50 +00:00
# include <QueryPipeline/Pipe.h>
2021-03-05 09:38:00 +00:00
# include <Server/HTTP/HTMLForm.h>
2021-03-24 07:53:15 +00:00
# include <IO/ReadBufferFromString.h>
2021-03-05 09:38:00 +00:00
namespace DB
{
2021-08-02 07:18:51 +00:00
namespace ErrorCodes
{
extern const int BAD_REQUEST_PARAMETER ;
}
2021-03-05 09:38:00 +00:00
namespace
{
2021-07-30 14:20:57 +00:00
/// Reports a failed request: sets the response status to HTTP 500, writes
/// `message` to the client when the response has not been sent yet, and logs
/// the same text as a warning.
void processError(HTTPServerResponse & response, const std::string & message)
{
    response.setStatusAndReason(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);

    if (!response.sent())
        *response.send() << message << std::endl;

    /// Callers build `message` from exception text, so it is passed as an
    /// argument instead of being used as the format string itself.
    LOG_WARNING(&Poco::Logger::get(" LibraryBridge "), "{}", message);
}
2021-03-05 09:38:00 +00:00
std : : shared_ptr < Block > parseColumns ( std : : string & & column_string )
{
auto sample_block = std : : make_shared < Block > ( ) ;
auto names_and_types = NamesAndTypesList : : parse ( column_string ) ;
for ( const NameAndTypePair & column_data : names_and_types )
sample_block - > insert ( { column_data . type , column_data . name } ) ;
return sample_block ;
}
2021-03-24 07:53:15 +00:00
2021-08-01 08:51:40 +00:00
std : : vector < uint64_t > parseIdsFromBinary ( ReadBuffer & buf )
{
std : : vector < uint64_t > ids ;
readVectorBinary ( ids , buf ) ;
return ids ;
}
2021-03-24 07:53:15 +00:00
2021-03-24 19:32:31 +00:00
std : : vector < std : : string > parseNamesFromBinary ( const std : : string & names_string )
2021-03-24 07:53:15 +00:00
{
2021-03-24 19:32:31 +00:00
ReadBufferFromString buf ( names_string ) ;
std : : vector < std : : string > names ;
readVectorBinary ( names , buf ) ;
return names ;
2021-03-24 07:53:15 +00:00
}
2021-03-05 09:38:00 +00:00
}
2021-10-11 16:11:50 +00:00
/// Wraps `data` in a single-chunk source, attaches `format` as the pipeline
/// output and executes the completed pipeline.
static void writeData(Block data, OutputFormatPtr format)
{
    QueryPipeline pipeline(std::make_shared<SourceFromSingleChunk>(std::move(data)));
    pipeline.complete(std::move(format));

    CompletedPipelineExecutor pipeline_executor(pipeline);
    pipeline_executor.execute();
}
2021-03-05 09:38:00 +00:00
void LibraryRequestHandler : : handleRequest ( HTTPServerRequest & request , HTTPServerResponse & response )
{
LOG_TRACE ( log , " Request URI: {} " , request . getURI ( ) ) ;
2021-06-16 14:33:14 +00:00
HTMLForm params ( getContext ( ) - > getSettingsRef ( ) , request ) ;
2021-07-23 09:05:42 +00:00
2021-03-05 09:38:00 +00:00
if ( ! params . has ( " method " ) )
{
processError ( response , " No 'method' in request URL " ) ;
return ;
}
2021-03-07 11:31:55 +00:00
if ( ! params . has ( " dictionary_id " ) )
{
processError ( response , " No 'dictionary_id in request URL " ) ;
return ;
}
2021-03-05 09:38:00 +00:00
std : : string method = params . get ( " method " ) ;
2021-03-07 11:31:55 +00:00
std : : string dictionary_id = params . get ( " dictionary_id " ) ;
2021-03-05 09:38:00 +00:00
2021-07-30 14:20:57 +00:00
LOG_TRACE ( log , " Library method: '{}', dictionary id: {} " , method , dictionary_id ) ;
2021-03-05 09:38:00 +00:00
WriteBufferFromHTTPServerResponse out ( response , request . getMethod ( ) = = Poco : : Net : : HTTPRequest : : HTTP_HEAD , keep_alive_timeout ) ;
2021-03-07 11:31:55 +00:00
2021-03-05 09:38:00 +00:00
try
{
2021-07-30 14:20:57 +00:00
bool lib_new = ( method = = " libNew " ) ;
if ( method = = " libClone " )
{
if ( ! params . has ( " from_dictionary_id " ) )
{
processError ( response , " No 'from_dictionary_id' in request URL " ) ;
return ;
}
std : : string from_dictionary_id = params . get ( " from_dictionary_id " ) ;
bool cloned = false ;
cloned = SharedLibraryHandlerFactory : : instance ( ) . clone ( from_dictionary_id , dictionary_id ) ;
if ( cloned )
{
writeStringBinary ( " 1 " , out ) ;
}
else
{
LOG_TRACE ( log , " Cannot clone from dictionary with id: {}, will call libNew instead " ) ;
lib_new = true ;
}
}
if ( lib_new )
2021-03-05 09:38:00 +00:00
{
2021-04-02 15:45:42 +00:00
auto & read_buf = request . getStream ( ) ;
params . read ( read_buf ) ;
2021-03-24 09:23:29 +00:00
2021-03-05 09:38:00 +00:00
if ( ! params . has ( " library_path " ) )
{
processError ( response , " No 'library_path' in request URL " ) ;
return ;
}
if ( ! params . has ( " library_settings " ) )
{
processError ( response , " No 'library_settings' in request URL " ) ;
return ;
}
2021-03-24 09:23:29 +00:00
std : : string library_path = params . get ( " library_path " ) ;
const auto & settings_string = params . get ( " library_settings " ) ;
2021-07-30 14:20:57 +00:00
LOG_DEBUG ( log , " Parsing library settings from binary string " ) ;
2021-03-24 19:32:31 +00:00
std : : vector < std : : string > library_settings = parseNamesFromBinary ( settings_string ) ;
2021-03-24 09:23:29 +00:00
2021-04-02 15:45:42 +00:00
/// Needed for library dictionary
2021-03-24 19:32:31 +00:00
if ( ! params . has ( " attributes_names " ) )
2021-03-24 09:23:29 +00:00
{
2021-03-24 19:32:31 +00:00
processError ( response , " No 'attributes_names' in request URL " ) ;
2021-03-24 09:23:29 +00:00
return ;
}
2021-04-02 15:45:42 +00:00
const auto & attributes_string = params . get ( " attributes_names " ) ;
2021-07-30 14:20:57 +00:00
LOG_DEBUG ( log , " Parsing attributes names from binary string " ) ;
2021-04-02 15:45:42 +00:00
std : : vector < std : : string > attributes_names = parseNamesFromBinary ( attributes_string ) ;
/// Needed to parse block from binary string format
2021-03-24 08:41:42 +00:00
if ( ! params . has ( " sample_block " ) )
{
processError ( response , " No 'sample_block' in request URL " ) ;
return ;
}
2021-04-02 15:45:42 +00:00
std : : string sample_block_string = params . get ( " sample_block " ) ;
2021-03-24 08:41:42 +00:00
std : : shared_ptr < Block > sample_block ;
try
{
2021-04-02 15:45:42 +00:00
sample_block = parseColumns ( std : : move ( sample_block_string ) ) ;
2021-03-24 08:41:42 +00:00
}
catch ( const Exception & ex )
{
processError ( response , " Invalid 'sample_block' parameter in request body ' " + ex . message ( ) + " ' " ) ;
LOG_WARNING ( log , ex . getStackTraceString ( ) ) ;
return ;
}
2021-04-05 13:13:07 +00:00
if ( ! params . has ( " null_values " ) )
2021-04-02 15:45:42 +00:00
{
2021-04-05 13:13:07 +00:00
processError ( response , " No 'null_values' in request URL " ) ;
2021-04-02 15:45:42 +00:00
return ;
}
2021-04-05 13:13:07 +00:00
ReadBufferFromString read_block_buf ( params . get ( " null_values " ) ) ;
2021-10-11 16:11:50 +00:00
auto format = getContext ( ) - > getInputFormat ( FORMAT , read_block_buf , * sample_block , DEFAULT_BLOCK_SIZE ) ;
2021-10-14 10:25:43 +00:00
QueryPipeline pipeline ( Pipe ( std : : move ( format ) ) ) ;
PullingPipelineExecutor executor ( pipeline ) ;
Block sample_block_with_nulls ;
executor . pull ( sample_block_with_nulls ) ;
2021-04-02 15:45:42 +00:00
2021-04-05 13:13:07 +00:00
LOG_DEBUG ( log , " Dictionary sample block with null values: {} " , sample_block_with_nulls . dumpStructure ( ) ) ;
2021-04-02 15:45:42 +00:00
SharedLibraryHandlerFactory : : instance ( ) . create ( dictionary_id , library_path , library_settings , sample_block_with_nulls , attributes_names ) ;
2021-03-22 15:58:20 +00:00
writeStringBinary ( " 1 " , out ) ;
2021-03-05 09:38:00 +00:00
}
2021-03-06 18:21:40 +00:00
else if ( method = = " libDelete " )
{
2021-07-30 14:20:57 +00:00
auto deleted = SharedLibraryHandlerFactory : : instance ( ) . remove ( dictionary_id ) ;
/// Do not throw, a warning is ok.
if ( ! deleted )
LOG_WARNING ( log , " Cannot delete library for with dictionary id: {}, because such id was not found. " , dictionary_id ) ;
2021-03-22 15:58:20 +00:00
writeStringBinary ( " 1 " , out ) ;
2021-03-05 10:43:47 +00:00
}
else if ( method = = " isModified " )
{
2021-03-06 18:21:40 +00:00
auto library_handler = SharedLibraryHandlerFactory : : instance ( ) . get ( dictionary_id ) ;
2021-07-30 14:20:57 +00:00
if ( ! library_handler )
2021-08-02 07:18:51 +00:00
throw Exception ( ErrorCodes : : BAD_REQUEST_PARAMETER , " Not found dictionary with id: {} " , dictionary_id ) ;
2021-07-30 14:20:57 +00:00
2021-03-06 18:21:40 +00:00
bool res = library_handler - > isModified ( ) ;
2021-03-05 10:43:47 +00:00
writeStringBinary ( std : : to_string ( res ) , out ) ;
}
else if ( method = = " supportsSelectiveLoad " )
{
2021-03-06 18:21:40 +00:00
auto library_handler = SharedLibraryHandlerFactory : : instance ( ) . get ( dictionary_id ) ;
2021-07-30 14:20:57 +00:00
if ( ! library_handler )
2021-08-02 07:18:51 +00:00
throw Exception ( ErrorCodes : : BAD_REQUEST_PARAMETER , " Not found dictionary with id: {} " , dictionary_id ) ;
2021-07-30 14:20:57 +00:00
2021-03-06 18:21:40 +00:00
bool res = library_handler - > supportsSelectiveLoad ( ) ;
2021-03-05 10:43:47 +00:00
writeStringBinary ( std : : to_string ( res ) , out ) ;
2021-03-05 09:38:00 +00:00
}
else if ( method = = " loadAll " )
{
2021-03-06 18:21:40 +00:00
auto library_handler = SharedLibraryHandlerFactory : : instance ( ) . get ( dictionary_id ) ;
2021-07-30 14:20:57 +00:00
if ( ! library_handler )
2021-08-02 07:18:51 +00:00
throw Exception ( ErrorCodes : : BAD_REQUEST_PARAMETER , " Not found dictionary with id: {} " , dictionary_id ) ;
2021-07-30 14:20:57 +00:00
2021-03-24 08:41:42 +00:00
const auto & sample_block = library_handler - > getSampleBlock ( ) ;
2021-08-01 08:51:40 +00:00
LOG_DEBUG ( log , " Calling loadAll() for dictionary id: {} " , dictionary_id ) ;
2021-03-24 09:23:29 +00:00
auto input = library_handler - > loadAll ( ) ;
2021-04-02 15:45:42 +00:00
2021-07-30 14:20:57 +00:00
LOG_DEBUG ( log , " Started sending result data for dictionary id: {} " , dictionary_id ) ;
2021-10-11 16:11:50 +00:00
auto output = FormatFactory : : instance ( ) . getOutputFormat ( FORMAT , out , sample_block , getContext ( ) ) ;
writeData ( std : : move ( input ) , std : : move ( output ) ) ;
2021-03-05 10:43:47 +00:00
}
else if ( method = = " loadIds " )
{
2021-07-30 14:20:57 +00:00
LOG_DEBUG ( log , " Getting diciontary ids for dictionary with id: {} " , dictionary_id ) ;
2021-07-27 13:07:01 +00:00
String ids_string ;
2021-08-01 08:51:40 +00:00
std : : vector < uint64_t > ids = parseIdsFromBinary ( request . getStream ( ) ) ;
2021-03-23 15:41:53 +00:00
2021-03-06 18:21:40 +00:00
auto library_handler = SharedLibraryHandlerFactory : : instance ( ) . get ( dictionary_id ) ;
2021-07-30 14:20:57 +00:00
if ( ! library_handler )
2021-08-02 07:18:51 +00:00
throw Exception ( ErrorCodes : : BAD_REQUEST_PARAMETER , " Not found dictionary with id: {} " , dictionary_id ) ;
2021-07-30 14:20:57 +00:00
2021-03-24 08:41:42 +00:00
const auto & sample_block = library_handler - > getSampleBlock ( ) ;
2021-08-01 08:51:40 +00:00
LOG_DEBUG ( log , " Calling loadIds() for dictionary id: {} " , dictionary_id ) ;
2021-03-24 09:23:29 +00:00
auto input = library_handler - > loadIds ( ids ) ;
2021-07-30 14:20:57 +00:00
LOG_DEBUG ( log , " Started sending result data for dictionary id: {} " , dictionary_id ) ;
2021-10-11 16:11:50 +00:00
auto output = FormatFactory : : instance ( ) . getOutputFormat ( FORMAT , out , sample_block , getContext ( ) ) ;
writeData ( std : : move ( input ) , std : : move ( output ) ) ;
2021-03-05 09:38:00 +00:00
}
2021-03-06 18:21:40 +00:00
else if ( method = = " loadKeys " )
{
2021-03-24 09:23:29 +00:00
if ( ! params . has ( " requested_block_sample " ) )
2021-03-10 18:02:43 +00:00
{
2021-03-24 09:23:29 +00:00
processError ( response , " No 'requested_block_sample' in request URL " ) ;
2021-03-10 18:02:43 +00:00
return ;
}
2021-03-24 09:23:29 +00:00
std : : string requested_block_string = params . get ( " requested_block_sample " ) ;
2021-03-10 13:10:05 +00:00
2021-03-10 18:02:43 +00:00
std : : shared_ptr < Block > requested_sample_block ;
2021-03-10 13:10:05 +00:00
try
{
2021-03-10 18:02:43 +00:00
requested_sample_block = parseColumns ( std : : move ( requested_block_string ) ) ;
2021-03-10 13:10:05 +00:00
}
catch ( const Exception & ex )
{
2021-03-10 18:02:43 +00:00
processError ( response , " Invalid 'requested_block' parameter in request body ' " + ex . message ( ) + " ' " ) ;
2021-03-10 13:10:05 +00:00
LOG_WARNING ( log , ex . getStackTraceString ( ) ) ;
return ;
}
auto & read_buf = request . getStream ( ) ;
2021-10-11 16:11:50 +00:00
auto format = getContext ( ) - > getInputFormat ( FORMAT , read_buf , * requested_sample_block , DEFAULT_BLOCK_SIZE ) ;
2021-10-14 10:25:43 +00:00
QueryPipeline pipeline ( std : : move ( format ) ) ;
PullingPipelineExecutor executor ( pipeline ) ;
Block block ;
executor . pull ( block ) ;
2021-03-10 13:10:05 +00:00
2021-03-10 18:02:43 +00:00
auto library_handler = SharedLibraryHandlerFactory : : instance ( ) . get ( dictionary_id ) ;
2021-07-30 14:20:57 +00:00
if ( ! library_handler )
2021-08-02 07:18:51 +00:00
throw Exception ( ErrorCodes : : BAD_REQUEST_PARAMETER , " Not found dictionary with id: {} " , dictionary_id ) ;
2021-07-30 14:20:57 +00:00
2021-03-24 08:41:42 +00:00
const auto & sample_block = library_handler - > getSampleBlock ( ) ;
2021-08-01 08:51:40 +00:00
LOG_DEBUG ( log , " Calling loadKeys() for dictionary id: {} " , dictionary_id ) ;
2021-03-24 08:41:42 +00:00
auto input = library_handler - > loadKeys ( block . getColumns ( ) ) ;
2021-07-30 14:20:57 +00:00
LOG_DEBUG ( log , " Started sending result data for dictionary id: {} " , dictionary_id ) ;
2021-10-11 16:11:50 +00:00
auto output = FormatFactory : : instance ( ) . getOutputFormat ( FORMAT , out , sample_block , getContext ( ) ) ;
writeData ( std : : move ( input ) , std : : move ( output ) ) ;
2021-03-06 18:21:40 +00:00
}
2021-03-05 09:38:00 +00:00
}
catch ( . . . )
{
auto message = getCurrentExceptionMessage ( true ) ;
2021-07-30 14:20:57 +00:00
LOG_ERROR ( log , " Failed to process request for dictionary_id: {}. Error: {} " , dictionary_id , message ) ;
2021-03-05 09:38:00 +00:00
2021-07-30 14:20:57 +00:00
response . setStatusAndReason ( Poco : : Net : : HTTPResponse : : HTTP_INTERNAL_SERVER_ERROR , message ) ; // can't call process_error, because of too soon response sending
2021-03-05 09:38:00 +00:00
try
{
writeStringBinary ( message , out ) ;
out . finalize ( ) ;
}
catch ( . . . )
{
tryLogCurrentException ( log ) ;
}
}
try
{
out . finalize ( ) ;
}
catch ( . . . )
{
tryLogCurrentException ( log ) ;
}
}
2021-08-02 07:18:51 +00:00
void LibraryExistsHandler : : handleRequest ( HTTPServerRequest & request , HTTPServerResponse & response )
2021-03-05 09:38:00 +00:00
{
2021-07-30 14:20:57 +00:00
try
{
LOG_TRACE ( log , " Request URI: {} " , request . getURI ( ) ) ;
HTMLForm params ( getContext ( ) - > getSettingsRef ( ) , request ) ;
2021-03-05 09:38:00 +00:00
2021-07-30 14:20:57 +00:00
if ( ! params . has ( " dictionary_id " ) )
{
2021-08-02 07:18:51 +00:00
processError ( response , " No 'dictionary_id' in request URL " ) ;
2021-07-30 14:20:57 +00:00
return ;
}
2021-03-05 09:38:00 +00:00
2021-07-30 14:20:57 +00:00
std : : string dictionary_id = params . get ( " dictionary_id " ) ;
auto library_handler = SharedLibraryHandlerFactory : : instance ( ) . get ( dictionary_id ) ;
String res ;
if ( library_handler )
2021-08-02 07:18:51 +00:00
res = " 1 " ;
2021-07-30 14:20:57 +00:00
else
2021-08-02 07:18:51 +00:00
res = " 0 " ;
2021-03-05 09:38:00 +00:00
setResponseDefaultHeaders ( response , keep_alive_timeout ) ;
2021-07-30 14:20:57 +00:00
LOG_TRACE ( log , " Senging ping response: {} (dictionary id: {}) " , res , dictionary_id ) ;
response . sendBuffer ( res . data ( ) , res . size ( ) ) ;
2021-03-05 09:38:00 +00:00
}
catch ( . . . )
{
tryLogCurrentException ( " PingHandler " ) ;
}
}
}