2018-08-19 17:09:54 +00:00
# include "MainHandler.h"
2024-09-18 12:20:53 +00:00
# include <Core/Settings.h>
2020-08-14 13:41:44 +00:00
# include <DataTypes/DataTypeFactory.h>
2018-08-08 16:15:29 +00:00
# include <Formats/FormatFactory.h>
2024-09-18 12:20:53 +00:00
# include <IO/Operators.h>
2020-08-14 13:41:44 +00:00
# include <IO/ReadBufferFromIStream.h>
2024-09-18 12:20:53 +00:00
# include <IO/ReadHelpers.h>
# include <IO/WriteHelpers.h>
# include <Processors/Executors/CompletedPipelineExecutor.h>
# include <Processors/Formats/IInputFormat.h>
# include <QueryPipeline/QueryPipeline.h>
# include <Server/HTTP/HTMLForm.h>
# include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
# include <Poco/Net/HTMLForm.h>
2018-08-08 16:15:29 +00:00
# include <Poco/Net/HTTPServerRequest.h>
# include <Poco/Net/HTTPServerResponse.h>
2020-08-14 13:41:44 +00:00
# include <Poco/ThreadPool.h>
2022-08-08 14:40:19 +00:00
# include <Common/BridgeProtocolVersion.h>
2022-04-27 15:05:45 +00:00
# include <Common/logger_useful.h>
2024-09-18 12:20:53 +00:00
# include "ODBCSink.h"
# include "ODBCSource.h"
2022-09-28 08:45:15 +00:00
# include "config.h"
2024-09-18 12:20:53 +00:00
# include "getIdentifierQuote.h"
# include "validateODBCConnectionString.h"
2020-08-14 13:41:44 +00:00
2019-04-17 17:36:58 +00:00
# include <mutex>
2020-08-14 13:41:44 +00:00
# include <memory>
2018-08-19 17:09:54 +00:00
2018-08-08 16:15:29 +00:00
namespace DB
{
2024-09-18 12:20:53 +00:00
namespace Setting
{
extern const SettingsUInt64 odbc_bridge_connection_pool_size ;
}
2021-03-22 11:40:29 +00:00
2018-08-08 16:15:29 +00:00
namespace
{
2018-08-09 12:57:34 +00:00
std : : unique_ptr < Block > parseColumns ( std : : string & & column_string )
2018-08-08 16:15:29 +00:00
{
2018-08-09 12:57:34 +00:00
std : : unique_ptr < Block > sample_block = std : : make_unique < Block > ( ) ;
auto names_and_types = NamesAndTypesList : : parse ( column_string ) ;
for ( const NameAndTypePair & column_data : names_and_types )
sample_block - > insert ( { column_data . type , column_data . name } ) ;
return sample_block ;
}
2018-08-08 16:15:29 +00:00
}
2018-08-09 12:57:34 +00:00
2021-02-19 12:51:26 +00:00
void ODBCHandler : : processError ( HTTPServerResponse & response , const std : : string & message )
2020-04-28 00:56:44 +00:00
{
2021-02-19 12:51:26 +00:00
response . setStatusAndReason ( HTTPResponse : : HTTP_INTERNAL_SERVER_ERROR ) ;
2020-04-28 00:56:44 +00:00
if ( ! response . sent ( ) )
2024-01-03 16:47:15 +00:00
* response . send ( ) < < message < < ' \n ' ;
Use fmt::runtime() for LOG_* for non constexpr
Here is oneliner:
$ gg 'LOG_\(DEBUG\|TRACE\|INFO\|TEST\|WARNING\|ERROR\|FATAL\)([^,]*, [a-zA-Z]' -- :*.cpp :*.h | cut -d: -f1 | sort -u | xargs -r sed -E -i 's#(LOG_[A-Z]*)\(([^,]*), ([A-Za-z][^,)]*)#\1(\2, fmt::runtime(\3)#'
Note, that I tried to do this with coccinelle (tool for semantic
patchin), but it cannot parse C++:
$ cat fmt.cocci
@@
expression log;
expression var;
@@
-LOG_DEBUG(log, var)
+LOG_DEBUG(log, fmt::runtime(var))
I've also tried to use some macros/templates magic to do this implicitly
in logger_useful.h, but I failed to do so, and apparently it is not
possible for now.
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
v2: manual fixes
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-02-01 09:10:27 +00:00
LOG_WARNING ( log , fmt : : runtime ( message ) ) ;
2020-04-28 00:56:44 +00:00
}
2021-03-22 11:40:29 +00:00
2024-01-03 16:47:15 +00:00
void ODBCHandler : : handleRequest ( HTTPServerRequest & request , HTTPServerResponse & response , const ProfileEvents : : Event & /*write_event*/ )
2018-08-08 16:15:29 +00:00
{
2021-06-16 14:33:14 +00:00
HTMLForm params ( getContext ( ) - > getSettingsRef ( ) , request ) ;
2021-04-17 08:09:22 +00:00
LOG_TRACE ( log , " Request URI: {} " , request . getURI ( ) ) ;
2022-08-10 07:39:32 +00:00
size_t version ;
2022-08-08 14:40:19 +00:00
if ( ! params . has ( " version " ) )
2022-08-10 07:39:32 +00:00
version = 0 ; /// assumed version for too old servers which do not send a version
2022-08-08 14:40:19 +00:00
else
{
String version_str = params . get ( " version " ) ;
2022-08-10 07:39:32 +00:00
if ( ! tryParse ( version , version_str ) )
2022-08-08 14:40:19 +00:00
{
processError ( response , " Unable to parse 'version' string in request URL: ' " + version_str + " ' Check if the server and library-bridge have the same version. " ) ;
return ;
}
}
2022-08-10 07:39:32 +00:00
if ( version ! = XDBC_BRIDGE_PROTOCOL_VERSION )
{
/// backwards compatibility is considered unnecessary for now, just let the user know that the server and the bridge must be upgraded together
processError ( response , " Server and library-bridge have different versions: ' " + std : : to_string ( version ) + " ' vs. ' " + std : : to_string ( LIBRARY_BRIDGE_PROTOCOL_VERSION ) + " ' " ) ;
return ;
}
2020-04-28 00:56:44 +00:00
if ( mode = = " read " )
2021-02-19 12:51:26 +00:00
params . read ( request . getStream ( ) ) ;
2018-08-08 16:15:29 +00:00
2020-05-14 21:51:07 +00:00
if ( mode = = " read " & & ! params . has ( " query " ) )
{
processError ( response , " No 'query' in request body " ) ;
return ;
}
2021-04-17 08:09:22 +00:00
if ( ! params . has ( " connection_string " ) )
2018-08-08 16:15:29 +00:00
{
2021-04-17 08:09:22 +00:00
processError ( response , " No 'connection_string' in request URL " ) ;
2018-08-08 16:15:29 +00:00
return ;
}
2021-04-17 08:09:22 +00:00
if ( ! params . has ( " sample_block " ) )
2018-08-08 16:15:29 +00:00
{
2021-04-17 08:09:22 +00:00
processError ( response , " No 'sample_block' in request URL " ) ;
2018-08-08 16:15:29 +00:00
return ;
}
2021-04-17 08:09:22 +00:00
std : : string format = params . get ( " format " , " RowBinary " ) ;
std : : string connection_string = params . get ( " connection_string " ) ;
2023-01-19 00:36:25 +00:00
bool use_connection_pooling = params . getParsed < bool > ( " use_connection_pooling " , true ) ;
2021-04-17 08:09:22 +00:00
LOG_TRACE ( log , " Connection string: '{}' " , connection_string ) ;
2023-01-19 00:36:25 +00:00
LOG_TRACE ( log , " Use pooling: {} " , use_connection_pooling ) ;
2021-04-17 08:09:22 +00:00
2020-05-14 21:51:07 +00:00
UInt64 max_block_size = DEFAULT_BLOCK_SIZE ;
if ( params . has ( " max_block_size " ) )
{
std : : string max_block_size_str = params . get ( " max_block_size " , " " ) ;
if ( max_block_size_str . empty ( ) )
{
processError ( response , " Empty max_block_size specified " ) ;
return ;
}
max_block_size = parse < size_t > ( max_block_size_str ) ;
}
2021-04-17 08:09:22 +00:00
std : : string sample_block_string = params . get ( " sample_block " ) ;
2018-08-09 12:57:34 +00:00
std : : unique_ptr < Block > sample_block ;
2018-08-08 16:15:29 +00:00
try
{
2021-04-17 08:09:22 +00:00
sample_block = parseColumns ( std : : move ( sample_block_string ) ) ;
2018-08-09 12:57:34 +00:00
}
catch ( const Exception & ex )
{
2021-04-17 08:09:22 +00:00
processError ( response , " Invalid 'sample_block' parameter in request body ' " + ex . message ( ) + " ' " ) ;
Use fmt::runtime() for LOG_* for non constexpr
Here is oneliner:
$ gg 'LOG_\(DEBUG\|TRACE\|INFO\|TEST\|WARNING\|ERROR\|FATAL\)([^,]*, [a-zA-Z]' -- :*.cpp :*.h | cut -d: -f1 | sort -u | xargs -r sed -E -i 's#(LOG_[A-Z]*)\(([^,]*), ([A-Za-z][^,)]*)#\1(\2, fmt::runtime(\3)#'
Note, that I tried to do this with coccinelle (tool for semantic
patchin), but it cannot parse C++:
$ cat fmt.cocci
@@
expression log;
expression var;
@@
-LOG_DEBUG(log, var)
+LOG_DEBUG(log, fmt::runtime(var))
I've also tried to use some macros/templates magic to do this implicitly
in logger_useful.h, but I failed to do so, and apparently it is not
possible for now.
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
v2: manual fixes
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-02-01 09:10:27 +00:00
LOG_ERROR ( log , fmt : : runtime ( ex . getStackTraceString ( ) ) ) ;
2018-08-09 12:57:34 +00:00
return ;
}
2018-08-08 16:15:29 +00:00
2024-03-22 20:07:12 +00:00
WriteBufferFromHTTPServerResponse out ( response , request . getMethod ( ) = = Poco : : Net : : HTTPRequest : : HTTP_HEAD ) ;
2020-05-14 21:51:07 +00:00
try
2018-08-09 12:57:34 +00:00
{
2022-05-31 15:50:13 +00:00
nanodbc : : ConnectionHolderPtr connection_handler ;
2023-01-19 00:36:25 +00:00
if ( use_connection_pooling )
2022-06-01 09:00:39 +00:00
connection_handler = ODBCPooledConnectionFactory : : instance ( ) . get (
2024-09-18 12:20:53 +00:00
validateODBCConnectionString ( connection_string ) , getContext ( ) - > getSettingsRef ( ) [ Setting : : odbc_bridge_connection_pool_size ] ) ;
2022-05-31 15:50:13 +00:00
else
connection_handler = std : : make_shared < nanodbc : : ConnectionHolder > ( validateODBCConnectionString ( connection_string ) ) ;
2021-04-06 18:59:34 +00:00
2020-05-14 21:51:07 +00:00
if ( mode = = " write " )
2020-04-28 00:56:44 +00:00
{
if ( ! params . has ( " db_name " ) )
{
processError ( response , " No 'db_name' in request URL " ) ;
return ;
}
if ( ! params . has ( " table_name " ) )
{
processError ( response , " No 'table_name' in request URL " ) ;
return ;
}
std : : string db_name = params . get ( " db_name " ) ;
std : : string table_name = params . get ( " table_name " ) ;
2020-05-23 22:24:01 +00:00
LOG_TRACE ( log , " DB name: '{}', table name: '{}' " , db_name , table_name ) ;
2020-05-14 21:51:07 +00:00
2024-09-28 01:33:17 +00:00
auto quoting_style = IdentifierQuotingStyle : : Backticks ;
2020-05-15 11:26:51 +00:00
# if USE_ODBC
2021-06-07 18:09:16 +00:00
quoting_style = getQuotingStyle ( connection_handler ) ;
2020-05-15 11:26:51 +00:00
# endif
2021-02-19 12:51:26 +00:00
auto & read_buf = request . getStream ( ) ;
2021-10-11 16:11:50 +00:00
auto input_format = getContext ( ) - > getInputFormat ( format , read_buf , * sample_block , max_block_size ) ;
auto sink = std : : make_shared < ODBCSink > ( std : : move ( connection_handler ) , db_name , table_name , * sample_block , getContext ( ) , quoting_style ) ;
QueryPipeline pipeline ( std : : move ( input_format ) ) ;
pipeline . complete ( std : : move ( sink ) ) ;
CompletedPipelineExecutor executor ( pipeline ) ;
executor . execute ( ) ;
2020-04-28 00:56:44 +00:00
writeStringBinary ( " Ok. " , out ) ;
}
2020-05-14 21:51:07 +00:00
else
2020-04-28 00:56:44 +00:00
{
2020-05-14 21:51:07 +00:00
std : : string query = params . get ( " query " ) ;
2020-05-23 22:24:01 +00:00
LOG_TRACE ( log , " Query: {} " , query ) ;
2020-04-28 00:56:44 +00:00
2021-10-11 16:11:50 +00:00
auto writer = FormatFactory : : instance ( ) . getOutputFormatParallelIfPossible ( format , out , * sample_block , getContext ( ) ) ;
auto source = std : : make_shared < ODBCSource > ( std : : move ( connection_handler ) , query , * sample_block , max_block_size ) ;
QueryPipeline pipeline ( std : : move ( source ) ) ;
pipeline . complete ( std : : move ( writer ) ) ;
CompletedPipelineExecutor executor ( pipeline ) ;
executor . execute ( ) ;
2020-04-28 00:56:44 +00:00
}
2020-05-14 21:51:07 +00:00
}
catch ( . . . )
{
auto message = getCurrentExceptionMessage ( true ) ;
response . setStatusAndReason (
Poco : : Net : : HTTPResponse : : HTTP_INTERNAL_SERVER_ERROR ) ; // can't call process_error, because of too soon response sending
2021-02-20 05:31:05 +00:00
try
{
writeStringBinary ( message , out ) ;
out . finalize ( ) ;
}
catch ( . . . )
{
tryLogCurrentException ( log ) ;
}
2020-05-14 21:51:07 +00:00
tryLogCurrentException ( log ) ;
2021-02-20 05:31:05 +00:00
}
2020-04-28 00:56:44 +00:00
2021-02-20 05:31:05 +00:00
try
{
out . finalize ( ) ;
}
catch ( . . . )
{
tryLogCurrentException ( log ) ;
2018-08-08 16:15:29 +00:00
}
}
2020-04-28 00:56:44 +00:00
2018-08-08 16:15:29 +00:00
}