2019-02-15 11:46:07 +00:00
# include "HTTPHandler.h"
2017-06-02 18:48:33 +00:00
# include <chrono>
2012-03-09 04:45:27 +00:00
# include <iomanip>
2017-02-09 10:10:13 +00:00
# include <Poco/File.h>
2017-08-09 14:33:07 +00:00
# include <Poco/Net/HTTPBasicCredentials.h>
# include <Poco/Net/HTTPServerRequest.h>
2019-02-01 01:48:25 +00:00
# include <Poco/Net/HTTPServerRequestImpl.h>
2017-08-09 14:33:07 +00:00
# include <Poco/Net/HTTPServerResponse.h>
# include <Poco/Net/NetException.h>
2017-06-06 17:18:32 +00:00
# include <ext/scope_guard.h>
2018-08-20 02:34:00 +00:00
# include <Core/ExternalTable.h>
2018-01-15 19:07:47 +00:00
# include <Common/StringUtils/StringUtils.h>
2017-04-01 09:19:00 +00:00
# include <Common/escapeForFileName.h>
2018-03-05 10:20:23 +00:00
# include <Common/getFQDNOrHostName.h>
2018-06-01 19:39:32 +00:00
# include <Common/CurrentThread.h>
2018-08-31 00:59:48 +00:00
# include <Common/setThreadName.h>
2019-02-15 11:46:07 +00:00
# include <Common/config.h>
2019-10-04 13:32:21 +00:00
# include <Common/SettingsChanges.h>
2019-02-10 17:12:22 +00:00
# include <Compression/CompressedReadBuffer.h>
# include <Compression/CompressedWriteBuffer.h>
2017-04-01 09:19:00 +00:00
# include <IO/ReadBufferFromIStream.h>
# include <IO/ZlibInflatingReadBuffer.h>
2019-02-04 22:15:08 +00:00
# include <IO/BrotliReadBuffer.h>
2017-04-01 09:19:00 +00:00
# include <IO/ReadBufferFromString.h>
2017-08-09 14:33:07 +00:00
# include <IO/WriteBufferFromString.h>
2017-04-01 09:19:00 +00:00
# include <IO/WriteBufferFromHTTPServerResponse.h>
# include <IO/WriteBufferFromFile.h>
# include <IO/WriteHelpers.h>
# include <IO/copyData.h>
# include <IO/ConcatReadBuffer.h>
# include <IO/CascadeWriteBuffer.h>
# include <IO/MemoryReadWriteBuffer.h>
# include <IO/WriteBufferFromTemporaryFile.h>
2019-01-23 14:48:50 +00:00
# include <DataStreams/IBlockInputStream.h>
2017-04-01 09:19:00 +00:00
# include <Interpreters/executeQuery.h>
# include <Interpreters/Quota.h>
2017-07-13 20:58:19 +00:00
# include <Common/typeid_cast.h>
2017-09-23 23:32:26 +00:00
# include <Poco/Net/HTTPStream.h>
2012-03-09 03:06:09 +00:00
namespace DB
{
2016-01-11 21:46:36 +00:00
namespace ErrorCodes
{
2017-04-01 07:20:54 +00:00
extern const int READONLY ;
extern const int UNKNOWN_COMPRESSION_METHOD ;
extern const int CANNOT_PARSE_TEXT ;
extern const int CANNOT_PARSE_ESCAPE_SEQUENCE ;
extern const int CANNOT_PARSE_QUOTED_STRING ;
extern const int CANNOT_PARSE_DATE ;
extern const int CANNOT_PARSE_DATETIME ;
extern const int CANNOT_PARSE_NUMBER ;
extern const int CANNOT_OPEN_FILE ;
extern const int UNKNOWN_ELEMENT_IN_AST ;
extern const int UNKNOWN_TYPE_OF_AST_NODE ;
extern const int TOO_DEEP_AST ;
extern const int TOO_BIG_AST ;
extern const int UNEXPECTED_AST_STRUCTURE ;
2018-10-29 18:00:36 +00:00
extern const int SYNTAX_ERROR ;
2019-08-01 13:16:09 +00:00
extern const int INCORRECT_DATA ;
extern const int TYPE_MISMATCH ;
2017-04-01 07:20:54 +00:00
extern const int UNKNOWN_TABLE ;
extern const int UNKNOWN_FUNCTION ;
extern const int UNKNOWN_IDENTIFIER ;
extern const int UNKNOWN_TYPE ;
extern const int UNKNOWN_STORAGE ;
extern const int UNKNOWN_DATABASE ;
extern const int UNKNOWN_SETTING ;
extern const int UNKNOWN_DIRECTION_OF_SORTING ;
extern const int UNKNOWN_AGGREGATE_FUNCTION ;
extern const int UNKNOWN_FORMAT ;
extern const int UNKNOWN_DATABASE_ENGINE ;
extern const int UNKNOWN_TYPE_OF_QUERY ;
extern const int QUERY_IS_TOO_LARGE ;
extern const int NOT_IMPLEMENTED ;
extern const int SOCKET_TIMEOUT ;
extern const int UNKNOWN_USER ;
extern const int WRONG_PASSWORD ;
extern const int REQUIRED_PASSWORD ;
2017-06-02 18:48:33 +00:00
extern const int INVALID_SESSION_TIMEOUT ;
2017-09-25 19:45:15 +00:00
extern const int HTTP_LENGTH_REQUIRED ;
2016-01-11 21:46:36 +00:00
}
2017-06-02 21:01:17 +00:00
2017-02-07 05:21:31 +00:00
static Poco : : Net : : HTTPResponse : : HTTPStatus exceptionCodeToHTTPStatus ( int exception_code )
2017-02-06 08:23:02 +00:00
{
2017-04-01 07:20:54 +00:00
using namespace Poco : : Net ;
2019-08-02 05:07:10 +00:00
if ( exception_code = = ErrorCodes : : REQUIRED_PASSWORD )
return HTTPResponse : : HTTP_UNAUTHORIZED ;
else if ( exception_code = = ErrorCodes : : CANNOT_PARSE_TEXT | |
exception_code = = ErrorCodes : : CANNOT_PARSE_ESCAPE_SEQUENCE | |
exception_code = = ErrorCodes : : CANNOT_PARSE_QUOTED_STRING | |
exception_code = = ErrorCodes : : CANNOT_PARSE_DATE | |
exception_code = = ErrorCodes : : CANNOT_PARSE_DATETIME | |
2019-08-02 05:11:02 +00:00
exception_code = = ErrorCodes : : CANNOT_PARSE_NUMBER | |
exception_code = = ErrorCodes : : UNKNOWN_ELEMENT_IN_AST | |
2019-08-02 05:07:10 +00:00
exception_code = = ErrorCodes : : UNKNOWN_TYPE_OF_AST_NODE | |
exception_code = = ErrorCodes : : TOO_DEEP_AST | |
exception_code = = ErrorCodes : : TOO_BIG_AST | |
2019-08-02 05:11:02 +00:00
exception_code = = ErrorCodes : : UNEXPECTED_AST_STRUCTURE | |
exception_code = = ErrorCodes : : SYNTAX_ERROR | |
exception_code = = ErrorCodes : : INCORRECT_DATA | |
2019-08-02 05:07:10 +00:00
exception_code = = ErrorCodes : : TYPE_MISMATCH )
return HTTPResponse : : HTTP_BAD_REQUEST ;
else if ( exception_code = = ErrorCodes : : UNKNOWN_TABLE | |
exception_code = = ErrorCodes : : UNKNOWN_FUNCTION | |
exception_code = = ErrorCodes : : UNKNOWN_IDENTIFIER | |
exception_code = = ErrorCodes : : UNKNOWN_TYPE | |
exception_code = = ErrorCodes : : UNKNOWN_STORAGE | |
exception_code = = ErrorCodes : : UNKNOWN_DATABASE | |
exception_code = = ErrorCodes : : UNKNOWN_SETTING | |
exception_code = = ErrorCodes : : UNKNOWN_DIRECTION_OF_SORTING | |
exception_code = = ErrorCodes : : UNKNOWN_AGGREGATE_FUNCTION | |
exception_code = = ErrorCodes : : UNKNOWN_FORMAT | |
2019-08-02 05:11:02 +00:00
exception_code = = ErrorCodes : : UNKNOWN_DATABASE_ENGINE | |
exception_code = = ErrorCodes : : UNKNOWN_TYPE_OF_QUERY )
2019-08-02 05:07:10 +00:00
return HTTPResponse : : HTTP_NOT_FOUND ;
else if ( exception_code = = ErrorCodes : : QUERY_IS_TOO_LARGE )
return HTTPResponse : : HTTP_REQUESTENTITYTOOLARGE ;
else if ( exception_code = = ErrorCodes : : NOT_IMPLEMENTED )
return HTTPResponse : : HTTP_NOT_IMPLEMENTED ;
else if ( exception_code = = ErrorCodes : : SOCKET_TIMEOUT | |
exception_code = = ErrorCodes : : CANNOT_OPEN_FILE )
return HTTPResponse : : HTTP_SERVICE_UNAVAILABLE ;
else if ( exception_code = = ErrorCodes : : HTTP_LENGTH_REQUIRED )
return HTTPResponse : : HTTP_LENGTH_REQUIRED ;
return HTTPResponse : : HTTP_INTERNAL_SERVER_ERROR ;
2017-02-06 08:23:02 +00:00
}
2017-06-02 18:48:33 +00:00
2017-08-24 14:51:13 +00:00
static std : : chrono : : steady_clock : : duration parseSessionTimeout (
const Poco : : Util : : AbstractConfiguration & config ,
const HTMLForm & params )
2017-06-02 18:48:33 +00:00
{
2017-06-02 21:01:17 +00:00
unsigned session_timeout = config . getInt ( " default_session_timeout " , 60 ) ;
2017-06-02 18:48:33 +00:00
if ( params . has ( " session_timeout " ) )
{
2017-06-02 21:01:17 +00:00
unsigned max_session_timeout = config . getUInt ( " max_session_timeout " , 3600 ) ;
std : : string session_timeout_str = params . get ( " session_timeout " ) ;
2017-06-02 18:48:33 +00:00
2017-06-27 15:58:33 +00:00
ReadBufferFromString buf ( session_timeout_str ) ;
if ( ! tryReadIntText ( session_timeout , buf ) | | ! buf . eof ( ) )
throw Exception ( " Invalid session timeout: ' " + session_timeout_str + " ' " , ErrorCodes : : INVALID_SESSION_TIMEOUT ) ;
2017-06-02 18:48:33 +00:00
2017-06-02 21:01:17 +00:00
if ( session_timeout > max_session_timeout )
throw Exception ( " Session timeout ' " + session_timeout_str + " ' is larger than max_session_timeout: " + toString ( max_session_timeout )
+ " . Maximum session timeout could be modified in configuration file. " ,
2017-06-02 18:48:33 +00:00
ErrorCodes : : INVALID_SESSION_TIMEOUT ) ;
}
return std : : chrono : : seconds ( session_timeout ) ;
}
2017-02-28 14:15:13 +00:00
void HTTPHandler : : pushDelayedResults ( Output & used_output )
{
2017-04-01 07:20:54 +00:00
std : : vector < WriteBufferPtr > write_buffers ;
std : : vector < ReadBufferPtr > read_buffers ;
std : : vector < ReadBuffer * > read_buffers_raw_ptr ;
auto cascade_buffer = typeid_cast < CascadeWriteBuffer * > ( used_output . out_maybe_delayed_and_compressed . get ( ) ) ;
if ( ! cascade_buffer )
throw Exception ( " Expected CascadeWriteBuffer " , ErrorCodes : : LOGICAL_ERROR ) ;
cascade_buffer - > getResultBuffers ( write_buffers ) ;
if ( write_buffers . empty ( ) )
throw Exception ( " At least one buffer is expected to overwrite result into HTTP response " , ErrorCodes : : LOGICAL_ERROR ) ;
for ( auto & write_buf : write_buffers )
{
IReadableWriteBuffer * write_buf_concrete ;
ReadBufferPtr reread_buf ;
if ( write_buf
& & ( write_buf_concrete = dynamic_cast < IReadableWriteBuffer * > ( write_buf . get ( ) ) )
& & ( reread_buf = write_buf_concrete - > tryGetReadBuffer ( ) ) )
{
read_buffers . emplace_back ( reread_buf ) ;
read_buffers_raw_ptr . emplace_back ( reread_buf . get ( ) ) ;
}
}
ConcatReadBuffer concat_read_buffer ( read_buffers_raw_ptr ) ;
copyData ( concat_read_buffer , * used_output . out_maybe_compressed ) ;
2017-02-28 14:15:13 +00:00
}
2017-08-09 11:57:09 +00:00
HTTPHandler : : HTTPHandler ( IServer & server_ )
2017-04-01 07:20:54 +00:00
: server ( server_ )
, log ( & Logger : : get ( " HTTPHandler " ) )
2017-02-06 08:23:02 +00:00
{
2018-03-08 07:36:58 +00:00
server_display_name = server . config ( ) . getString ( " display_name " , getFQDNOrHostName ( ) ) ;
2017-02-06 08:23:02 +00:00
}
2016-01-11 21:46:36 +00:00
2017-06-02 21:01:17 +00:00
2016-06-25 07:22:12 +00:00
void HTTPHandler : : processQuery (
2017-04-01 07:20:54 +00:00
Poco : : Net : : HTTPServerRequest & request ,
HTMLForm & params ,
Poco : : Net : : HTTPServerResponse & response ,
Output & used_output )
2012-03-09 03:06:09 +00:00
{
2018-06-19 20:30:35 +00:00
Context context = server . context ( ) ;
2018-09-07 23:22:02 +00:00
CurrentThread : : QueryScope query_scope ( context ) ;
2018-06-01 19:39:32 +00:00
2017-04-01 07:20:54 +00:00
LOG_TRACE ( log , " Request URI: " < < request . getURI ( ) ) ;
std : : istream & istr = request . stream ( ) ;
/// Part of the query can be passed in the 'query' parameter and the rest in the request body
/// (http method need not necessarily be POST). In this case the entire query consists of the
/// contents of the 'query' parameter, a line break and the request body.
std : : string query_param = params . get ( " query " , " " ) ;
if ( ! query_param . empty ( ) )
query_param + = ' \n ' ;
2017-09-21 11:16:18 +00:00
/// The user and password can be passed by headers (similar to X-Auth-*),
/// which is used by load balancers to pass authentication information.
std : : string user = request . get ( " X-ClickHouse-User " , " " ) ;
std : : string password = request . get ( " X-ClickHouse-Key " , " " ) ;
std : : string quota_key = request . get ( " X-ClickHouse-Quota " , " " ) ;
2017-04-01 07:20:54 +00:00
2017-09-21 11:16:18 +00:00
if ( user . empty ( ) & & password . empty ( ) & & quota_key . empty ( ) )
2017-04-01 07:20:54 +00:00
{
2017-09-21 11:16:18 +00:00
/// User name and password can be passed using query parameters
/// or using HTTP Basic auth (both methods are insecure).
if ( request . hasCredentials ( ) )
{
Poco : : Net : : HTTPBasicCredentials credentials ( request ) ;
user = credentials . getUsername ( ) ;
password = credentials . getPassword ( ) ;
2017-09-23 23:18:48 +00:00
}
else
{
2017-09-21 11:16:18 +00:00
user = params . get ( " user " , " default " ) ;
password = params . get ( " password " , " " ) ;
}
2017-04-01 07:20:54 +00:00
2017-09-21 11:16:18 +00:00
quota_key = params . get ( " quota_key " , " " ) ;
}
else
{
/// It is prohibited to mix different authorization schemes.
if ( request . hasCredentials ( )
| | params . has ( " user " )
| | params . has ( " password " )
| | params . has ( " quota_key " ) )
{
2017-09-23 23:20:58 +00:00
throw Exception ( " Invalid authentication: it is not allowed to use X-ClickHouse HTTP headers and other authentication methods simultaneously " , ErrorCodes : : REQUIRED_PASSWORD ) ;
2017-09-21 11:16:18 +00:00
}
2017-04-01 07:20:54 +00:00
}
std : : string query_id = params . get ( " query_id " , " " ) ;
context . setUser ( user , password , request . clientAddress ( ) , quota_key ) ;
context . setCurrentQueryId ( query_id ) ;
2017-06-02 21:01:17 +00:00
/// The user could specify session identifier and session timeout.
/// It allows to modify settings, create temporary tables and reuse them in subsequent requests.
2017-06-02 18:48:33 +00:00
std : : shared_ptr < Context > session ;
String session_id ;
std : : chrono : : steady_clock : : duration session_timeout ;
bool session_is_set = params . has ( " session_id " ) ;
2018-06-19 20:30:35 +00:00
const auto & config = server . config ( ) ;
2017-06-02 18:48:33 +00:00
if ( session_is_set )
{
session_id = params . get ( " session_id " ) ;
2017-09-08 11:57:43 +00:00
session_timeout = parseSessionTimeout ( config , params ) ;
2017-06-02 18:48:33 +00:00
std : : string session_check = params . get ( " session_check " , " " ) ;
session = context . acquireSession ( session_id , session_timeout , session_check = = " 1 " ) ;
context = * session ;
2019-07-08 19:41:11 +00:00
context . setSessionContext ( * session ) ;
2017-06-02 18:48:33 +00:00
}
SCOPE_EXIT ( {
if ( session_is_set )
session - > releaseSession ( session_id , session_timeout ) ;
} ) ;
2017-04-01 07:20:54 +00:00
/// The client can pass a HTTP header indicating supported compression method (gzip or deflate).
String http_response_compression_methods = request . get ( " Accept-Encoding " , " " ) ;
bool client_supports_http_compression = false ;
2019-02-13 20:54:12 +00:00
CompressionMethod http_response_compression_method { } ;
2017-04-01 07:20:54 +00:00
if ( ! http_response_compression_methods . empty ( ) )
{
/// Both gzip and deflate are supported. If the client supports both, gzip is preferred.
/// NOTE parsing of the list of methods is slightly incorrect.
if ( std : : string : : npos ! = http_response_compression_methods . find ( " gzip " ) )
{
client_supports_http_compression = true ;
2019-02-13 20:54:12 +00:00
http_response_compression_method = CompressionMethod : : Gzip ;
2017-04-01 07:20:54 +00:00
}
else if ( std : : string : : npos ! = http_response_compression_methods . find ( " deflate " ) )
{
client_supports_http_compression = true ;
2019-02-13 20:54:12 +00:00
http_response_compression_method = CompressionMethod : : Zlib ;
}
2019-06-03 20:27:53 +00:00
# if USE_BROTLI
2019-03-19 20:18:41 +00:00
else if ( http_response_compression_methods = = " br " )
{
2019-02-13 20:54:12 +00:00
client_supports_http_compression = true ;
2019-03-19 20:18:41 +00:00
http_response_compression_method = CompressionMethod : : Brotli ;
2017-04-01 07:20:54 +00:00
}
2019-06-03 20:27:53 +00:00
# endif
2017-04-01 07:20:54 +00:00
}
/// Client can pass a 'compress' flag in the query string. In this case the query result is
/// compressed using internal algorithm. This is not reflected in HTTP headers.
bool internal_compression = params . getParsed < bool > ( " compress " , false ) ;
/// At least, we should postpone sending of first buffer_size result bytes
size_t buffer_size_total = std : : max (
params . getParsed < size_t > ( " buffer_size " , DBMS_DEFAULT_BUFFER_SIZE ) , static_cast < size_t > ( DBMS_DEFAULT_BUFFER_SIZE ) ) ;
/// If it is specified, the whole result will be buffered.
/// First ~buffer_size bytes will be buffered in memory, the remaining bytes will be stored in temporary file.
bool buffer_until_eof = params . getParsed < bool > ( " wait_end_of_query " , false ) ;
size_t buffer_size_http = DBMS_DEFAULT_BUFFER_SIZE ;
size_t buffer_size_memory = ( buffer_size_total > buffer_size_http ) ? buffer_size_total : 0 ;
2017-09-08 16:41:35 +00:00
unsigned keep_alive_timeout = config . getUInt ( " keep_alive_timeout " , 10 ) ;
2017-09-08 11:57:43 +00:00
2017-04-01 07:20:54 +00:00
used_output . out = std : : make_shared < WriteBufferFromHTTPServerResponse > (
2017-09-08 11:57:43 +00:00
request , response , keep_alive_timeout ,
client_supports_http_compression , http_response_compression_method , buffer_size_http ) ;
2017-04-01 07:20:54 +00:00
if ( internal_compression )
used_output . out_maybe_compressed = std : : make_shared < CompressedWriteBuffer > ( * used_output . out ) ;
else
used_output . out_maybe_compressed = used_output . out ;
if ( buffer_size_memory > 0 | | buffer_until_eof )
{
CascadeWriteBuffer : : WriteBufferPtrs cascade_buffer1 ;
CascadeWriteBuffer : : WriteBufferConstructors cascade_buffer2 ;
if ( buffer_size_memory > 0 )
cascade_buffer1 . emplace_back ( std : : make_shared < MemoryWriteBuffer > ( buffer_size_memory ) ) ;
if ( buffer_until_eof )
{
std : : string tmp_path_template = context . getTemporaryPath ( ) + " http_buffers/ " ;
auto create_tmp_disk_buffer = [ tmp_path_template ] ( const WriteBufferPtr & )
{
return WriteBufferFromTemporaryFile : : create ( tmp_path_template ) ;
} ;
cascade_buffer2 . emplace_back ( std : : move ( create_tmp_disk_buffer ) ) ;
}
else
{
auto push_memory_buffer_and_continue = [ next_buffer = used_output . out_maybe_compressed ] ( const WriteBufferPtr & prev_buf )
{
auto prev_memory_buffer = typeid_cast < MemoryWriteBuffer * > ( prev_buf . get ( ) ) ;
if ( ! prev_memory_buffer )
throw Exception ( " Expected MemoryWriteBuffer " , ErrorCodes : : LOGICAL_ERROR ) ;
auto rdbuf = prev_memory_buffer - > tryGetReadBuffer ( ) ;
copyData ( * rdbuf , * next_buffer ) ;
return next_buffer ;
} ;
cascade_buffer2 . emplace_back ( push_memory_buffer_and_continue ) ;
}
used_output . out_maybe_delayed_and_compressed = std : : make_shared < CascadeWriteBuffer > (
std : : move ( cascade_buffer1 ) , std : : move ( cascade_buffer2 ) ) ;
}
else
{
used_output . out_maybe_delayed_and_compressed = used_output . out_maybe_compressed ;
}
std : : unique_ptr < ReadBuffer > in_param = std : : make_unique < ReadBufferFromString > ( query_param ) ;
2017-09-25 19:45:15 +00:00
std : : unique_ptr < ReadBuffer > in_post_raw = std : : make_unique < ReadBufferFromIStream > ( istr ) ;
2017-04-01 07:20:54 +00:00
/// Request body can be compressed using algorithm specified in the Content-Encoding header.
std : : unique_ptr < ReadBuffer > in_post ;
String http_request_compression_method_str = request . get ( " Content-Encoding " , " " ) ;
if ( ! http_request_compression_method_str . empty ( ) )
{
2019-02-05 23:12:31 +00:00
if ( http_request_compression_method_str = = " gzip " )
{
2019-02-13 20:54:12 +00:00
in_post = std : : make_unique < ZlibInflatingReadBuffer > ( * in_post_raw , CompressionMethod : : Gzip ) ;
2019-02-05 23:12:31 +00:00
}
else if ( http_request_compression_method_str = = " deflate " )
{
2019-02-13 20:54:12 +00:00
in_post = std : : make_unique < ZlibInflatingReadBuffer > ( * in_post_raw , CompressionMethod : : Zlib ) ;
2019-02-05 23:12:31 +00:00
}
2019-02-15 11:46:07 +00:00
# if USE_BROTLI
2019-02-05 23:12:31 +00:00
else if ( http_request_compression_method_str = = " br " )
{
in_post = std : : make_unique < BrotliReadBuffer > ( * in_post_raw ) ;
}
2019-02-15 11:46:07 +00:00
# endif
2019-02-05 23:12:31 +00:00
else
{
2017-04-01 07:20:54 +00:00
throw Exception ( " Unknown Content-Encoding of HTTP request: " + http_request_compression_method_str ,
2019-02-05 23:12:31 +00:00
ErrorCodes : : UNKNOWN_COMPRESSION_METHOD ) ;
}
2017-04-01 07:20:54 +00:00
}
else
in_post = std : : move ( in_post_raw ) ;
/// The data can also be compressed using incompatible internal algorithm. This is indicated by
/// 'decompress' query parameter.
std : : unique_ptr < ReadBuffer > in_post_maybe_compressed ;
bool in_post_compressed = false ;
if ( params . getParsed < bool > ( " decompress " , false ) )
{
in_post_maybe_compressed = std : : make_unique < CompressedReadBuffer > ( * in_post ) ;
in_post_compressed = true ;
}
else
in_post_maybe_compressed = std : : move ( in_post ) ;
std : : unique_ptr < ReadBuffer > in ;
2018-06-01 15:32:27 +00:00
static const NameSet reserved_param_names { " query " , " compress " , " decompress " , " user " , " password " , " quota_key " , " query_id " , " stacktrace " ,
2018-06-28 17:47:25 +00:00
" buffer_size " , " wait_end_of_query " , " session_id " , " session_timeout " , " session_check " } ;
2018-06-01 15:32:27 +00:00
Names reserved_param_suffixes ;
auto param_could_be_skipped = [ & ] ( const String & name )
{
if ( reserved_param_names . count ( name ) )
return true ;
for ( const String & suffix : reserved_param_suffixes )
{
if ( endsWith ( name , suffix ) )
return true ;
}
return false ;
} ;
2017-04-01 07:20:54 +00:00
/// Settings can be overridden in the query.
/// Some parameters (database, default_format, everything used in the code above) do not
/// belong to the Settings class.
/// 'readonly' setting values mean:
/// readonly = 0 - any query is allowed, client can change any setting.
/// readonly = 1 - only readonly queries are allowed, client can't change settings.
/// readonly = 2 - only readonly queries are allowed, client can change any setting except 'readonly'.
/// In theory if initially readonly = 0, the client can change any setting and then set readonly
/// to some other value.
2018-03-11 00:15:26 +00:00
auto & settings = context . getSettingsRef ( ) ;
2017-04-01 07:20:54 +00:00
/// Only readonly queries are allowed for HTTP GET requests.
if ( request . getMethod ( ) = = Poco : : Net : : HTTPServerRequest : : HTTP_GET )
{
2018-03-11 00:15:26 +00:00
if ( settings . readonly = = 0 )
settings . readonly = 2 ;
2017-04-01 07:20:54 +00:00
}
2019-06-15 18:25:27 +00:00
bool has_external_data = startsWith ( request . getContentType ( ) . data ( ) , " multipart/form-data " ) ;
2019-05-28 21:31:19 +00:00
2019-06-15 18:25:27 +00:00
if ( has_external_data )
2019-05-28 21:31:19 +00:00
{
/// Skip unneeded parameters to avoid confusing them later with context settings or query parameters.
reserved_param_suffixes . reserve ( 3 ) ;
/// It is a bug and ambiguity with `date_time_input_format` and `low_cardinality_allow_in_native_format` formats/settings.
reserved_param_suffixes . emplace_back ( " _format " ) ;
reserved_param_suffixes . emplace_back ( " _types " ) ;
reserved_param_suffixes . emplace_back ( " _structure " ) ;
}
2019-04-18 23:29:32 +00:00
SettingsChanges settings_changes ;
2019-05-29 00:23:47 +00:00
for ( const auto & [ key , value ] : params )
2017-04-01 07:20:54 +00:00
{
2019-05-28 21:31:19 +00:00
if ( key = = " database " )
2017-04-01 07:20:54 +00:00
{
2019-05-28 21:31:19 +00:00
context . setCurrentDatabase ( value ) ;
2017-04-01 07:20:54 +00:00
}
2019-05-28 21:31:19 +00:00
else if ( key = = " default_format " )
2017-04-01 07:20:54 +00:00
{
2019-05-28 21:31:19 +00:00
context . setDefaultFormat ( value ) ;
2017-04-01 07:20:54 +00:00
}
2019-05-28 21:31:19 +00:00
else if ( param_could_be_skipped ( key ) )
2017-04-01 07:20:54 +00:00
{
}
2019-06-04 20:15:44 +00:00
else if ( startsWith ( key , " param_ " ) )
2019-05-18 21:07:23 +00:00
{
/// Save name and values of substitution in dictionary.
2019-06-04 20:15:44 +00:00
const String parameter_name = key . substr ( strlen ( " param_ " ) ) ;
2019-06-15 17:52:53 +00:00
context . setQueryParameter ( parameter_name , value ) ;
2019-05-18 21:07:23 +00:00
}
2017-04-01 07:20:54 +00:00
else
{
/// All other query parameters are treated as settings.
2019-05-28 21:31:19 +00:00
settings_changes . push_back ( { key , value } ) ;
2017-04-01 07:20:54 +00:00
}
}
2019-05-28 21:31:19 +00:00
/// For external data we also want settings
2019-04-18 23:29:32 +00:00
context . checkSettingsConstraints ( settings_changes ) ;
context . applySettingsChanges ( settings_changes ) ;
2019-05-28 21:31:19 +00:00
/// Used in case of POST request with form-data, but it isn't expected to be deleted after that scope.
std : : string full_query ;
/// Support for "external data for query processing".
2019-06-15 18:25:27 +00:00
if ( has_external_data )
2019-05-28 21:31:19 +00:00
{
ExternalTablesHandler handler ( context , params ) ;
params . load ( request , istr , handler ) ;
/// Params are of both form params POST and uri (GET params)
for ( const auto & it : params )
if ( it . first = = " query " )
full_query + = it . second ;
in = std : : make_unique < ReadBufferFromString > ( full_query ) ;
}
else
in = std : : make_unique < ConcatReadBuffer > ( * in_param , * in_post_maybe_compressed ) ;
2017-04-01 07:20:54 +00:00
/// HTTP response compression is turned on only if the client signalled that they support it
/// (using Accept-Encoding header) and 'enable_http_compression' setting is turned on.
used_output . out - > setCompression ( client_supports_http_compression & & settings . enable_http_compression ) ;
if ( client_supports_http_compression )
used_output . out - > setCompressionLevel ( settings . http_zlib_compression_level ) ;
used_output . out - > setSendProgressInterval ( settings . http_headers_progress_interval_ms ) ;
/// If 'http_native_compression_disable_checksumming_on_decompress' setting is turned on,
/// checksums of client data compressed with internal algorithm are not checked.
if ( in_post_compressed & & settings . http_native_compression_disable_checksumming_on_decompress )
static_cast < CompressedReadBuffer & > ( * in_post_maybe_compressed ) . disableChecksumming ( ) ;
/// Add CORS header if 'add_http_cors_header' setting is turned on and the client passed
/// Origin header.
used_output . out - > addHeaderCORS ( settings . add_http_cors_header & & ! request . get ( " Origin " , " " ) . empty ( ) ) ;
ClientInfo & client_info = context . getClientInfo ( ) ;
client_info . query_kind = ClientInfo : : QueryKind : : INITIAL_QUERY ;
client_info . interface = ClientInfo : : Interface : : HTTP ;
/// Query sent through HTTP interface is initial.
client_info . initial_user = client_info . current_user ;
client_info . initial_query_id = client_info . current_query_id ;
client_info . initial_address = client_info . current_address ;
ClientInfo : : HTTPMethod http_method = ClientInfo : : HTTPMethod : : UNKNOWN ;
if ( request . getMethod ( ) = = Poco : : Net : : HTTPServerRequest : : HTTP_GET )
http_method = ClientInfo : : HTTPMethod : : GET ;
else if ( request . getMethod ( ) = = Poco : : Net : : HTTPServerRequest : : HTTP_POST )
http_method = ClientInfo : : HTTPMethod : : POST ;
client_info . http_method = http_method ;
client_info . http_user_agent = request . get ( " User-Agent " , " " ) ;
2019-02-01 01:48:25 +00:00
auto appendCallback = [ & context ] ( ProgressCallback callback )
{
auto prev = context . getProgressCallback ( ) ;
context . setProgressCallback ( [ prev , callback ] ( const Progress & progress )
{
if ( prev )
prev ( progress ) ;
callback ( progress ) ;
} ) ;
} ;
2017-04-01 07:20:54 +00:00
/// While still no data has been sent, we will report about query execution progress by sending HTTP headers.
if ( settings . send_progress_in_http_headers )
2019-02-01 01:48:25 +00:00
appendCallback ( [ & used_output ] ( const Progress & progress ) { used_output . out - > onProgress ( progress ) ; } ) ;
if ( settings . readonly > 0 & & settings . cancel_http_readonly_queries_on_client_close )
{
Poco : : Net : : StreamSocket & socket = dynamic_cast < Poco : : Net : : HTTPServerRequestImpl & > ( request ) . socket ( ) ;
appendCallback ( [ & context , & socket ] ( const Progress & )
{
/// Assume that at the point this method is called no one is reading data from the socket any more.
/// True for read-only queries.
2019-02-03 20:40:34 +00:00
try
{
2019-02-01 01:48:25 +00:00
char b ;
int status = socket . receiveBytes ( & b , 1 , MSG_DONTWAIT | MSG_PEEK ) ;
if ( status = = 0 )
context . killCurrentQuery ( ) ;
}
2019-02-03 20:40:34 +00:00
catch ( Poco : : TimeoutException & )
{
}
2019-02-01 01:48:25 +00:00
catch ( . . . )
{
context . killCurrentQuery ( ) ;
}
} ) ;
}
2017-04-01 07:20:54 +00:00
2019-03-06 16:41:35 +00:00
customizeContext ( context ) ;
2017-04-01 07:20:54 +00:00
executeQuery ( * in , * used_output . out_maybe_delayed_and_compressed , /* allow_into_outfile = */ false , context ,
2019-02-02 12:24:26 +00:00
[ & response ] ( const String & content_type ) { response . setContentType ( content_type ) ; } ,
2019-04-10 20:13:41 +00:00
[ & response ] ( const String & current_query_id ) { response . add ( " X-ClickHouse-Query-Id " , current_query_id ) ; } ) ;
2017-04-01 07:20:54 +00:00
if ( used_output . hasDelayed ( ) )
{
/// TODO: set Content-Length if possible
pushDelayedResults ( used_output ) ;
}
/// Send HTTP headers with code 200 if no exception happened and the data is still not sent to
/// the client.
used_output . out - > finalize ( ) ;
2012-03-09 03:06:09 +00:00
}
2017-01-17 17:21:48 +00:00
void HTTPHandler : : trySendExceptionToClient ( const std : : string & s , int exception_code ,
2017-04-01 07:20:54 +00:00
Poco : : Net : : HTTPServerRequest & request , Poco : : Net : : HTTPServerResponse & response ,
Output & used_output )
2012-03-09 03:06:09 +00:00
{
2017-04-01 07:20:54 +00:00
try
{
/// If HTTP method is POST and Keep-Alive is turned on, we should read the whole request body
/// to avoid reading part of the current request body in the next request.
if ( request . getMethod ( ) = = Poco : : Net : : HTTPRequest : : HTTP_POST
& & response . getKeepAlive ( )
2017-09-25 19:45:15 +00:00
& & ! request . stream ( ) . eof ( )
& & exception_code ! = ErrorCodes : : HTTP_LENGTH_REQUIRED )
2017-04-01 07:20:54 +00:00
{
request . stream ( ) . ignore ( std : : numeric_limits < std : : streamsize > : : max ( ) ) ;
}
bool auth_fail = exception_code = = ErrorCodes : : UNKNOWN_USER | |
exception_code = = ErrorCodes : : WRONG_PASSWORD | |
exception_code = = ErrorCodes : : REQUIRED_PASSWORD ;
if ( auth_fail )
{
response . requireAuthentication ( " ClickHouse server HTTP API " ) ;
}
else
{
response . setStatusAndReason ( exceptionCodeToHTTPStatus ( exception_code ) ) ;
}
if ( ! response . sent ( ) & & ! used_output . out_maybe_compressed )
{
/// If nothing was sent yet and we don't even know if we must compress the response.
response . send ( ) < < s < < std : : endl ;
}
else if ( used_output . out_maybe_compressed )
{
/// Destroy CascadeBuffer to actualize buffers' positions and reset extra references
if ( used_output . hasDelayed ( ) )
used_output . out_maybe_delayed_and_compressed . reset ( ) ;
/// Send the error message into already used (and possibly compressed) stream.
/// Note that the error message will possibly be sent after some data.
/// Also HTTP code 200 could have already been sent.
/// If buffer has data, and that data wasn't sent yet, then no need to send that data
bool data_sent = used_output . out - > count ( ) ! = used_output . out - > offset ( ) ;
if ( ! data_sent )
{
used_output . out_maybe_compressed - > position ( ) = used_output . out_maybe_compressed - > buffer ( ) . begin ( ) ;
used_output . out - > position ( ) = used_output . out - > buffer ( ) . begin ( ) ;
}
writeString ( s , * used_output . out_maybe_compressed ) ;
writeChar ( ' \n ' , * used_output . out_maybe_compressed ) ;
used_output . out_maybe_compressed - > next ( ) ;
used_output . out - > next ( ) ;
used_output . out - > finalize ( ) ;
}
}
catch ( . . . )
{
tryLogCurrentException ( log , " Cannot send exception to client " ) ;
}
2014-07-14 19:46:06 +00:00
}
2012-06-25 05:07:34 +00:00
2017-01-17 17:21:48 +00:00
2014-07-14 19:46:06 +00:00
void HTTPHandler : : handleRequest ( Poco : : Net : : HTTPServerRequest & request , Poco : : Net : : HTTPServerResponse & response )
{
2018-08-31 00:59:48 +00:00
setThreadName ( " HTTPHandler " ) ;
2019-01-15 18:39:54 +00:00
ThreadStatus thread_status ;
2018-08-31 00:59:48 +00:00
2017-04-01 07:20:54 +00:00
Output used_output ;
2014-08-04 19:48:50 +00:00
2017-04-01 07:20:54 +00:00
/// In case of exception, send stack trace to client.
bool with_stacktrace = false ;
2016-06-25 07:22:12 +00:00
2017-04-01 07:20:54 +00:00
try
{
response . setContentType ( " text/plain; charset=UTF-8 " ) ;
2018-03-08 07:36:58 +00:00
response . set ( " X-ClickHouse-Server-Display-Name " , server_display_name ) ;
2017-04-01 07:20:54 +00:00
/// For keep-alive to work.
if ( request . getVersion ( ) = = Poco : : Net : : HTTPServerRequest : : HTTP_1_1 )
response . setChunkedTransferEncoding ( true ) ;
2014-07-14 19:46:06 +00:00
2017-04-01 07:20:54 +00:00
HTMLForm params ( request ) ;
with_stacktrace = params . getParsed < bool > ( " stacktrace " , false ) ;
2016-06-25 07:22:12 +00:00
2017-09-25 19:45:15 +00:00
/// Workaround. Poco does not detect 411 Length Required case.
if ( request . getMethod ( ) = = Poco : : Net : : HTTPRequest : : HTTP_POST & & ! request . getChunkedTransferEncoding ( ) & &
! request . hasContentLength ( ) )
{
throw Exception ( " There is neither Transfer-Encoding header nor Content-Length header " , ErrorCodes : : HTTP_LENGTH_REQUIRED ) ;
}
2017-04-01 07:20:54 +00:00
processQuery ( request , params , response , used_output ) ;
LOG_INFO ( log , " Done processing query " ) ;
}
catch ( . . . )
{
tryLogCurrentException ( log ) ;
2016-06-25 07:22:12 +00:00
2017-04-01 07:20:54 +00:00
/** If exception is received from remote server, then stack trace is embedded in message.
* If exception is thrown on local server , then stack trace is in separate field .
*/
std : : string exception_message = getCurrentExceptionMessage ( with_stacktrace , true ) ;
int exception_code = getCurrentExceptionCode ( ) ;
2016-06-25 07:22:12 +00:00
2017-04-01 07:20:54 +00:00
trySendExceptionToClient ( exception_message , exception_code , request , response , used_output ) ;
}
2012-03-09 03:06:09 +00:00
}
}