2018-06-11 12:13:00 +00:00
# include <Storages/StorageURL.h>
# include <Interpreters/Context.h>
# include <Interpreters/evaluateConstantExpression.h>
2020-11-05 11:28:20 +00:00
# include <Parsers/ASTCreateQuery.h>
2018-06-11 12:13:00 +00:00
# include <Parsers/ASTLiteral.h>
2021-10-26 09:31:01 +00:00
# include <Parsers/ASTInsertQuery.h>
2018-06-11 12:13:00 +00:00
2019-11-20 14:48:01 +00:00
# include <IO/ReadHelpers.h>
2018-06-14 21:20:39 +00:00
# include <IO/WriteBufferFromHTTP.h>
2019-11-20 14:48:01 +00:00
# include <IO/WriteHelpers.h>
2020-12-10 22:05:02 +00:00
# include <IO/ConnectionTimeouts.h>
# include <IO/ConnectionTimeoutsContext.h>
2018-06-11 12:13:00 +00:00
2018-06-13 07:36:47 +00:00
# include <Formats/FormatFactory.h>
2021-10-11 16:11:50 +00:00
# include <Processors/Formats/IOutputFormat.h>
2021-10-13 18:22:02 +00:00
# include <Processors/Formats/IInputFormat.h>
2018-06-13 07:36:47 +00:00
2021-10-26 09:31:01 +00:00
# include <Common/parseRemoteDescription.h>
2021-07-21 16:13:17 +00:00
# include <Processors/Transforms/AddingDefaultsTransform.h>
2021-10-26 09:31:01 +00:00
# include <Storages/PartitionedSink.h>
2018-06-11 12:13:00 +00:00
# include <Poco/Net/HTTPRequest.h>
2020-02-17 15:01:03 +00:00
# include <Processors/Sources/SourceWithProgress.h>
2021-10-16 14:03:50 +00:00
# include <QueryPipeline/QueryPipelineBuilder.h>
2021-07-20 18:18:43 +00:00
# include <Processors/Executors/PullingPipelineExecutor.h>
2021-10-02 07:13:14 +00:00
# include <base/logger_useful.h>
2021-04-21 14:36:04 +00:00
# include <algorithm>
2018-06-11 12:13:00 +00:00
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH ;
2021-04-21 14:36:04 +00:00
extern const int NETWORK_ERROR ;
2021-09-10 11:11:52 +00:00
extern const int BAD_ARGUMENTS ;
2018-08-10 04:02:56 +00:00
}
2018-06-11 12:13:00 +00:00
2021-09-10 11:11:52 +00:00
2019-08-24 21:20:20 +00:00
/// Common constructor of every URL-backed storage.
/// Stores the connection parameters (uri, compression, format, headers, HTTP method, partitioning
/// expression) and publishes columns, constraints and the comment as the table's in-memory metadata.
/// The context argument is accepted but not used here.
IStorageURLBase::IStorageURLBase(
    const String & uri_,
    ContextPtr /*context_*/,
    const StorageID & table_id_,
    const String & format_name_,
    const std::optional<FormatSettings> & format_settings_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const String & comment,
    const String & compression_method_,
    const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_,
    const String & http_method_,
    ASTPtr partition_by_)
    : IStorage(table_id_)
    , uri(uri_)
    , compression_method(compression_method_)
    , format_name(format_name_)
    , format_settings(format_settings_)
    , headers(headers_)
    , http_method(http_method_)
    , partition_by(partition_by_)
{
    StorageInMemoryMetadata table_metadata;
    table_metadata.setColumns(columns_);
    table_metadata.setConstraints(constraints_);
    table_metadata.setComment(comment);

    setInMemoryMetadata(table_metadata);
}
namespace
{
2021-10-03 13:53:24 +00:00
/// Returns a copy of the given HTTP headers. If the current thread is initialized and carries
/// a non-empty OpenTelemetry trace id, the `traceparent` header (and `tracestate`, when it is
/// not empty) is appended so that the trace context is propagated downstream.
ReadWriteBufferFromHTTP::HTTPHeaderEntries getHeaders(const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_)
{
    ReadWriteBufferFromHTTP::HTTPHeaderEntries result(headers_.begin(), headers_.end());

    if (!CurrentThread::isInitialized())
        return result;

    const auto & trace_context = CurrentThread::get().thread_trace_context;
    if (trace_context.trace_id == UUID())
        return result;

    result.emplace_back("traceparent", trace_context.composeTraceparentHeader());

    if (!trace_context.tracestate.empty())
        result.emplace_back("tracestate", trace_context.tracestate);

    return result;
}
2021-10-26 09:31:01 +00:00
2020-02-17 15:01:03 +00:00
class StorageURLSource : public SourceWithProgress
2018-06-11 12:13:00 +00:00
{
2021-10-03 13:53:24 +00:00
using URIParams = std : : vector < std : : pair < String , String > > ;
2018-06-11 12:13:00 +00:00
public :
2021-10-03 13:53:24 +00:00
StorageURLSource (
2021-10-26 09:31:01 +00:00
const std : : vector < String > & uri_options ,
2021-10-28 12:44:12 +00:00
const std : : string & http_method ,
2018-08-09 18:49:05 +00:00
std : : function < void ( std : : ostream & ) > callback ,
2018-06-11 12:13:00 +00:00
const String & format ,
2020-11-07 08:53:39 +00:00
const std : : optional < FormatSettings > & format_settings ,
2020-02-17 15:01:03 +00:00
String name_ ,
2018-06-11 12:13:00 +00:00
const Block & sample_block ,
2021-04-10 23:33:54 +00:00
ContextPtr context ,
2020-10-02 12:38:50 +00:00
const ColumnsDescription & columns ,
2019-02-10 16:55:12 +00:00
UInt64 max_block_size ,
2019-11-19 12:46:07 +00:00
const ConnectionTimeouts & timeouts ,
2021-10-03 13:53:24 +00:00
const String & compression_method ,
const ReadWriteBufferFromHTTP : : HTTPHeaderEntries & headers_ = { } ,
const URIParams & params = { } )
2020-02-17 15:01:03 +00:00
: SourceWithProgress ( sample_block ) , name ( std : : move ( name_ ) )
2018-06-11 12:13:00 +00:00
{
2021-10-03 13:53:24 +00:00
auto headers = getHeaders ( headers_ ) ;
/// Lazy initialization. We should not perform requests in constructor, because we need to do it in query pipeline.
initialize = [ = , this ]
2020-08-28 01:21:08 +00:00
{
2021-10-03 13:53:24 +00:00
WriteBufferFromOwnString error_message ;
for ( auto option = uri_options . begin ( ) ; option < uri_options . end ( ) ; + + option )
2020-08-28 01:21:08 +00:00
{
2021-10-26 09:31:01 +00:00
auto request_uri = Poco : : URI ( * option ) ;
2021-10-03 13:53:24 +00:00
for ( const auto & [ param , value ] : params )
request_uri . addQueryParameter ( param , value ) ;
2020-11-18 17:43:18 +00:00
2021-10-03 13:53:24 +00:00
try
2020-11-18 17:43:18 +00:00
{
2021-11-30 12:46:54 +00:00
std : : string user_info = request_uri . getUserInfo ( ) ;
if ( ! user_info . empty ( ) )
2021-11-22 09:59:30 +00:00
{
2021-11-30 12:46:54 +00:00
std : : size_t n = user_info . find ( ' : ' ) ;
2021-11-30 07:07:18 +00:00
if ( n ! = std : : string : : npos )
2021-11-25 15:03:01 +00:00
{
2021-11-30 12:46:54 +00:00
credentials . setUsername ( user_info . substr ( 0 , n ) ) ;
credentials . setPassword ( user_info . substr ( n + 1 ) ) ;
2021-11-25 15:03:01 +00:00
}
2021-11-22 09:59:30 +00:00
}
2021-10-03 13:53:24 +00:00
read_buf = wrapReadBufferWithCompressionMethod (
std : : make_unique < ReadWriteBufferFromHTTP > (
request_uri ,
2021-10-28 12:44:12 +00:00
http_method ,
2021-10-03 16:49:02 +00:00
callback ,
2021-10-03 13:53:24 +00:00
timeouts ,
2021-10-27 18:30:25 +00:00
credentials ,
2021-10-28 10:28:05 +00:00
context - > getSettingsRef ( ) . max_http_get_redirects ,
2021-10-03 13:53:24 +00:00
DBMS_DEFAULT_BUFFER_SIZE ,
2021-10-07 13:39:54 +00:00
context - > getReadSettings ( ) ,
2021-10-03 13:53:24 +00:00
headers ,
2021-10-29 09:24:53 +00:00
ReadWriteBufferFromHTTP : : Range { } ,
2021-10-03 13:53:24 +00:00
context - > getRemoteHostFilter ( ) ) ,
chooseCompressionMethod ( request_uri . getPath ( ) , compression_method ) ) ;
2020-11-18 17:43:18 +00:00
}
2021-10-03 13:53:24 +00:00
catch ( . . . )
{
if ( uri_options . size ( ) = = 1 )
throw ;
2020-08-28 01:21:08 +00:00
2021-10-03 13:53:24 +00:00
if ( option = = uri_options . end ( ) - 1 )
throw Exception ( ErrorCodes : : NETWORK_ERROR , " All uri options are unreachable. {} " , error_message . str ( ) ) ;
2021-10-26 09:31:01 +00:00
error_message < < * option < < " error: " < < getCurrentExceptionMessage ( false ) < < " \n " ;
2021-10-03 13:53:24 +00:00
tryLogCurrentException ( __PRETTY_FUNCTION__ ) ;
}
}
2021-10-03 04:28:28 +00:00
auto input_format = FormatFactory : : instance ( ) . getInput ( format , * read_buf , sample_block , context , max_block_size , format_settings ) ;
QueryPipelineBuilder builder ;
builder . init ( Pipe ( input_format ) ) ;
builder . addSimpleTransform ( [ & ] ( const Block & cur_header )
{
return std : : make_shared < AddingDefaultsTransform > ( cur_header , columns , * input_format , context ) ;
} ) ;
2021-07-20 18:18:43 +00:00
2021-10-03 04:28:28 +00:00
pipeline = std : : make_unique < QueryPipeline > ( QueryPipelineBuilder : : getPipeline ( std : : move ( builder ) ) ) ;
reader = std : : make_unique < PullingPipelineExecutor > ( * pipeline ) ;
} ;
2018-06-11 12:13:00 +00:00
}
String getName ( ) const override
{
return name ;
}
2020-02-17 15:01:03 +00:00
Chunk generate ( ) override
2018-06-11 12:13:00 +00:00
{
2021-10-03 04:28:28 +00:00
if ( initialize )
{
initialize ( ) ;
initialize = { } ;
}
2020-02-17 15:01:03 +00:00
if ( ! reader )
return { } ;
2018-06-11 12:13:00 +00:00
2021-07-20 18:18:43 +00:00
Chunk chunk ;
if ( reader - > pull ( chunk ) )
return chunk ;
2018-06-11 12:13:00 +00:00
2021-07-20 18:18:43 +00:00
pipeline - > reset ( ) ;
2020-02-17 15:01:03 +00:00
reader . reset ( ) ;
return { } ;
2018-06-11 12:13:00 +00:00
}
private :
2021-10-03 04:28:28 +00:00
std : : function < void ( ) > initialize ;
2018-06-11 12:13:00 +00:00
String name ;
2019-11-19 12:46:07 +00:00
std : : unique_ptr < ReadBuffer > read_buf ;
2021-09-16 17:40:42 +00:00
std : : unique_ptr < QueryPipeline > pipeline ;
2021-07-20 18:18:43 +00:00
std : : unique_ptr < PullingPipelineExecutor > reader ;
2021-10-27 18:30:25 +00:00
Poco : : Net : : HTTPBasicCredentials credentials { } ;
2018-06-11 12:13:00 +00:00
} ;
}
2018-06-16 05:54:06 +00:00
2021-07-23 14:25:35 +00:00
/// Creates the HTTP write buffer (optionally wrapped into a compressing buffer) and the output
/// format that serializes blocks into it.
StorageURLSink::StorageURLSink(
    const String & uri,
    const String & format,
    const std::optional<FormatSettings> & format_settings,
    const Block & sample_block,
    ContextPtr context,
    const ConnectionTimeouts & timeouts,
    const CompressionMethod compression_method,
    const String & http_method)
    : SinkToStorage(sample_block)
{
    /// Get the content type first.
    ///
    /// The code here may look a little weird.
    /// getContentType() is provided by IOutputFormat, which needs a WriteBuffer to be constructed,
    /// while the real WriteBuffer here is WriteBufferFromHTTP, which itself needs the Content-Type header.
    /// So this is a cyclic dependency.
    /// Breaking it the other way round would require setting the header on WriteBufferFromHTTP after the
    /// output format is created, but by then the buffer may already be wrapped into a compressing buffer
    /// and is not accessible anymore.
    ///
    /// So we first instantiate a throw-away output format only to ask it for the Content-Type.
    /// It is created over an in-memory scratch buffer, so whatever it may write is simply discarded
    /// (and never reaches the standard output of the process).
    std::string content_type;
    {
        WriteBufferFromOwnString scratch_buffer;
        auto probe_format = FormatFactory::instance().getOutputFormat(
            format,
            scratch_buffer,
            sample_block,
            context,
            {} /* write callback */,
            format_settings);
        content_type = probe_format->getContentType();
    }

    write_buf = wrapWriteBufferWithCompressionMethod(
        std::make_unique<WriteBufferFromHTTP>(Poco::URI(uri), http_method, content_type, timeouts),
        compression_method, 3);

    writer = FormatFactory::instance().getOutputFormat(format, *write_buf, sample_block,
        context, {} /* write callback */, format_settings);
}
2018-06-16 05:54:06 +00:00
2020-07-09 01:00:16 +00:00
2021-07-23 14:25:35 +00:00
void StorageURLSink : : consume ( Chunk chunk )
2020-07-09 01:00:16 +00:00
{
2021-09-03 17:29:36 +00:00
writer - > write ( getHeader ( ) . cloneWithColumns ( chunk . detachColumns ( ) ) ) ;
2020-07-09 01:00:16 +00:00
}
2021-07-23 14:25:35 +00:00
void StorageURLSink : : onFinish ( )
2020-07-09 01:00:16 +00:00
{
2021-11-11 18:09:21 +00:00
writer - > finalize ( ) ;
2020-07-09 01:00:16 +00:00
writer - > flush ( ) ;
write_buf - > finalize ( ) ;
}
2021-10-26 09:31:01 +00:00
class PartitionedStorageURLSink : public PartitionedSink
{
public :
PartitionedStorageURLSink (
const ASTPtr & partition_by ,
const String & uri_ ,
const String & format_ ,
const std : : optional < FormatSettings > & format_settings_ ,
const Block & sample_block_ ,
ContextPtr context_ ,
const ConnectionTimeouts & timeouts_ ,
const CompressionMethod compression_method_ ,
2021-10-28 12:44:12 +00:00
const String & http_method_ )
2021-10-26 09:31:01 +00:00
: PartitionedSink ( partition_by , context_ , sample_block_ )
, uri ( uri_ )
, format ( format_ )
, format_settings ( format_settings_ )
, sample_block ( sample_block_ )
, context ( context_ )
, timeouts ( timeouts_ )
, compression_method ( compression_method_ )
2021-10-28 12:44:12 +00:00
, http_method ( http_method_ )
2021-10-26 09:31:01 +00:00
{
}
SinkPtr createSinkForPartition ( const String & partition_id ) override
{
auto partition_path = PartitionedSink : : replaceWildcards ( uri , partition_id ) ;
context - > getRemoteHostFilter ( ) . checkURL ( Poco : : URI ( partition_path ) ) ;
return std : : make_shared < StorageURLSink > ( partition_path , format ,
2021-10-28 12:44:12 +00:00
format_settings , sample_block , context , timeouts , compression_method , http_method ) ;
2021-10-26 09:31:01 +00:00
}
private :
const String uri ;
const String format ;
const std : : optional < FormatSettings > format_settings ;
const Block sample_block ;
ContextPtr context ;
const ConnectionTimeouts timeouts ;
const CompressionMethod compression_method ;
2021-10-28 12:44:12 +00:00
const String http_method ;
2021-10-26 09:31:01 +00:00
} ;
2020-07-09 01:00:16 +00:00
2018-08-09 18:49:05 +00:00
std : : string IStorageURLBase : : getReadMethod ( ) const
{
return Poco : : Net : : HTTPRequest : : HTTP_GET ;
}
2020-06-17 16:39:58 +00:00
std : : vector < std : : pair < std : : string , std : : string > > IStorageURLBase : : getReadURIParams (
const Names & /*column_names*/ ,
const StorageMetadataPtr & /*metadata_snapshot*/ ,
2018-06-11 12:13:00 +00:00
const SelectQueryInfo & /*query_info*/ ,
2021-04-10 23:33:54 +00:00
ContextPtr /*context*/ ,
2018-06-11 12:13:00 +00:00
QueryProcessingStage : : Enum & /*processed_stage*/ ,
2018-08-09 18:49:05 +00:00
size_t /*max_block_size*/ ) const
{
return { } ;
}
2020-06-17 16:39:58 +00:00
std : : function < void ( std : : ostream & ) > IStorageURLBase : : getReadPOSTDataCallback (
const Names & /*column_names*/ ,
const StorageMetadataPtr & /*metadata_snapshot*/ ,
2018-06-11 12:13:00 +00:00
const SelectQueryInfo & /*query_info*/ ,
2021-04-10 23:33:54 +00:00
ContextPtr /*context*/ ,
2018-08-09 18:49:05 +00:00
QueryProcessingStage : : Enum & /*processed_stage*/ ,
size_t /*max_block_size*/ ) const
{
return nullptr ;
}
2020-08-03 13:54:14 +00:00
/// Creates the reading pipe.
///
/// If the uri contains globs (a `{...}` pair or `|`), it is first split by ',' into shards;
/// each shard is split by '|' into failover options and gets its own source, and the sources
/// are united into one pipe. Otherwise a single source reading the uri as is is returned.
/// The number of expanded addresses is limited by the `glob_expansion_max_elements` setting.
Pipe IStorageURLBase::read(
    const Names & column_names,
    const StorageMetadataPtr & metadata_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr local_context,
    QueryProcessingStage::Enum processed_stage,
    size_t max_block_size,
    unsigned /*num_streams*/)
{
    auto params = getReadURIParams(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size);

    /// Both branches below build their sources in exactly the same way; only the list of URIs differs.
    auto make_source = [&](const std::vector<String> & uri_options)
    {
        return std::make_shared<StorageURLSource>(
            uri_options,
            getReadMethod(),
            getReadPOSTDataCallback(
                column_names, metadata_snapshot, query_info,
                local_context, processed_stage, max_block_size),
            format_name,
            format_settings,
            getName(),
            getHeaderBlock(column_names, metadata_snapshot),
            local_context,
            metadata_snapshot->getColumns(),
            max_block_size,
            ConnectionTimeouts::getHTTPTimeouts(local_context),
            compression_method, headers, params);
    };

    const bool with_globs = (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos)
        || uri.find('|') != std::string::npos;

    if (!with_globs)
        return Pipe(make_source(std::vector<String>{uri}));

    const size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
    const std::vector<String> url_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses);

    Pipes pipes;
    pipes.reserve(url_descriptions.size());
    for (const auto & url_description : url_descriptions)
    {
        /// For each uri (which acts like shard) check if it has failover options
        const std::vector<String> uri_options = parseRemoteDescription(url_description, 0, url_description.size(), '|', max_addresses);
        pipes.emplace_back(make_source(uri_options));
    }

    return Pipe::unitePipes(std::move(pipes));
}
2021-04-21 14:36:04 +00:00
/// Creates a single source that reads from the failover options of this storage.
///
/// The options are tried in a random order. The shuffle is applied to a private copy:
/// read() must not modify the storage's own list, and the order has to be chosen
/// before the source is built to have an effect on this read.
Pipe StorageURLWithFailover::read(
    const Names & column_names,
    const StorageMetadataPtr & metadata_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr local_context,
    QueryProcessingStage::Enum processed_stage,
    size_t max_block_size,
    unsigned /*num_streams*/)
{
    auto params = getReadURIParams(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size);

    auto shuffled_options = uri_options;
    std::shuffle(shuffled_options.begin(), shuffled_options.end(), thread_local_rng);

    return Pipe(std::make_shared<StorageURLSource>(
        shuffled_options,
        getReadMethod(),
        getReadPOSTDataCallback(
            column_names, metadata_snapshot, query_info,
            local_context, processed_stage, max_block_size),
        format_name,
        format_settings,
        getName(),
        getHeaderBlock(column_names, metadata_snapshot),
        local_context,
        metadata_snapshot->getColumns(),
        max_block_size,
        ConnectionTimeouts::getHTTPTimeouts(local_context),
        compression_method, headers, params));
}
2021-10-26 09:31:01 +00:00
/// Creates the sink for an INSERT.
///
/// A partitioned sink is used when there is a PARTITION BY expression (the one of the INSERT query
/// takes precedence over the one of the table) and the uri contains the partition id wildcard;
/// otherwise a plain sink writing to the uri is created.
/// If no HTTP method has been configured, POST is stored as the method of the storage.
SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
    if (http_method.empty())
        http_method = Poco::Net::HTTPRequest::HTTP_POST;

    /// Without an INSERT query there is no partitioning at all.
    ASTPtr partition_by_ast;
    if (const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(query.get()))
        partition_by_ast = insert_query->partition_by ? insert_query->partition_by : partition_by;

    const bool has_wildcards = uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;
    const bool is_partitioned_implementation = partition_by_ast && has_wildcards;

    if (!is_partitioned_implementation)
    {
        return std::make_shared<StorageURLSink>(
            uri,
            format_name,
            format_settings,
            metadata_snapshot->getSampleBlock(),
            context,
            ConnectionTimeouts::getHTTPTimeouts(context),
            chooseCompressionMethod(uri, compression_method),
            http_method);
    }

    return std::make_shared<PartitionedStorageURLSink>(
        partition_by_ast,
        uri,
        format_name,
        format_settings,
        metadata_snapshot->getSampleBlock(),
        context,
        ConnectionTimeouts::getHTTPTimeouts(context),
        chooseCompressionMethod(uri, compression_method),
        http_method);
}
2018-06-16 05:54:06 +00:00
2021-04-23 12:18:23 +00:00
/// Forwards everything to IStorageURLBase and then checks the uri
/// against the remote host filter of the context.
StorageURL::StorageURL(
    const String & uri_,
    const StorageID & table_id_,
    const String & format_name_,
    const std::optional<FormatSettings> & format_settings_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const String & comment,
    ContextPtr context_,
    const String & compression_method_,
    const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_,
    const String & http_method_,
    ASTPtr partition_by_)
    : IStorageURLBase(
        uri_,
        context_,
        table_id_,
        format_name_,
        format_settings_,
        columns_,
        constraints_,
        comment,
        compression_method_,
        headers_,
        http_method_,
        partition_by_)
{
    const Poco::URI parsed_uri(uri);
    context_->getRemoteHostFilter().checkURL(parsed_uri);
}
2021-04-21 12:32:57 +00:00
2021-04-21 14:36:04 +00:00
/// Storage over several interchangeable URLs. The base StorageURL is created with an empty uri;
/// every option is checked against the remote host filter of the context, logged and remembered
/// in `uri_options`.
StorageURLWithFailover::StorageURLWithFailover(
    const std::vector<String> & uri_options_,
    const StorageID & table_id_,
    const String & format_name_,
    const std::optional<FormatSettings> & format_settings_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    ContextPtr context_,
    const String & compression_method_)
    : StorageURL("", table_id_, format_name_, format_settings_, columns_, constraints_, String{}, context_, compression_method_)
{
    uri_options.reserve(uri_options_.size());
    for (const auto & uri_option : uri_options_)
    {
        Poco::URI poco_uri(uri_option);
        context_->getRemoteHostFilter().checkURL(poco_uri);
        LOG_DEBUG(&Poco::Logger::get("StorageURLDistributed"), "Adding URL option: {}", uri_option);

        /// The option is owned by the caller's (const) list, so it is copied: there is nothing to move from.
        uri_options.emplace_back(uri_option);
    }
}
2021-04-21 12:32:57 +00:00
/// Builds the format settings of a table.
///
/// Use format settings from global server context + settings from
/// the SETTINGS clause of the create query. Settings from current
/// session and user are ignored.
FormatSettings StorageURL::getFormatSettingsFromArgs(const StorageFactory::Arguments & args)
{
    /// No SETTINGS clause: the settings of the context are all we have.
    if (!args.storage_def->settings)
        return getFormatSettings(args.getContext());

    FormatFactorySettings user_format_settings;

    /// Apply changed settings from global context, but ignore the
    /// unknown ones, because we only have the format settings here.
    const auto & context_changes = args.getContext()->getSettingsRef().changes();
    for (const auto & change : context_changes)
    {
        if (!user_format_settings.has(change.name))
            continue;

        user_format_settings.set(change.name, change.value);
    }

    /// Apply changes from SETTINGS clause, with validation.
    user_format_settings.applyChanges(args.storage_def->settings->changes);

    return getFormatSettings(args.getContext(), user_format_settings);
}
2021-09-07 11:17:25 +00:00
/// Extracts the configuration of a URL storage from the engine arguments.
///
/// Two forms are supported:
///  - a named collection: its common part is taken as is; the HTTP method, if given, must be
///    POST or PUT, and any storage specific argument is rejected;
///  - positional arguments: url, format and, optionally, compression method. The arguments are
///    evaluated to literals in place.
URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, ContextPtr local_context)
{
    URLBasedDataSourceConfiguration configuration;

    auto named_collection = getURLBasedDataSourceConfiguration(args, local_context);
    if (!named_collection)
    {
        const size_t num_args = args.size();
        if (num_args < 2 || num_args > 3)
            throw Exception(
                "Storage URL requires 2 or 3 arguments: url, name of used format and optional compression method.",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        for (auto & arg : args)
            arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, local_context);

        configuration.url = args[0]->as<ASTLiteral &>().value.safeGet<String>();
        configuration.format = args[1]->as<ASTLiteral &>().value.safeGet<String>();
        if (num_args == 3)
            configuration.compression_method = args[2]->as<ASTLiteral &>().value.safeGet<String>();

        return configuration;
    }

    auto [common_configuration, storage_specific_args] = named_collection.value();
    configuration.set(common_configuration);

    const bool is_http_method_allowed = configuration.http_method.empty()
        || configuration.http_method == Poco::Net::HTTPRequest::HTTP_POST
        || configuration.http_method == Poco::Net::HTTPRequest::HTTP_PUT;

    if (!is_http_method_allowed)
        throw Exception(ErrorCodes::BAD_ARGUMENTS,
            "Http method can be POST or PUT (current: {}). For insert default is POST, for select GET",
            configuration.http_method);

    if (!storage_specific_args.empty())
    {
        /// Nothing storage specific is supported: list the names of all offending arguments.
        String illegal_args;
        for (const auto & arg : storage_specific_args)
        {
            if (!illegal_args.empty())
                illegal_args += ", ";
            illegal_args += arg.first;
        }

        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown argument `{}` for storage URL", illegal_args);
    }

    return configuration;
}
2018-06-11 12:13:00 +00:00
2021-09-07 11:17:25 +00:00
/// Registers the `URL` table engine in the storage factory.
///
/// The creator parses the engine arguments, builds the format settings, converts the configured
/// headers (a `Range` header is rejected, in any letter case) and clones the PARTITION BY
/// expression of the table definition, if there is one.
void registerStorageURL(StorageFactory & factory)
{
    factory.registerStorage("URL", [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;
        auto configuration = StorageURL::getConfiguration(engine_args, args.getLocalContext());
        auto format_settings = StorageURL::getFormatSettingsFromArgs(args);

        /// HTTP header field names are case-insensitive, so "range", "RANGE" etc. are the same header as "Range".
        const auto is_range_header = [](const String & header_name)
        {
            static constexpr char lowercase_range[] = "range";
            return header_name.size() == sizeof(lowercase_range) - 1
                && std::equal(header_name.begin(), header_name.end(), lowercase_range,
                    [](char actual, char expected)
                    {
                        const char folded = (actual >= 'A' && actual <= 'Z') ? static_cast<char>(actual - 'A' + 'a') : actual;
                        return folded == expected;
                    });
        };

        ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
        for (const auto & [header, value] : configuration.headers)
        {
            auto value_literal = value.safeGet<String>();
            if (is_range_header(header))
                throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
            headers.emplace_back(std::make_pair(header, value_literal));
        }

        ASTPtr partition_by;
        if (args.storage_def->partition_by)
            partition_by = args.storage_def->partition_by->clone();

        return StorageURL::create(
            configuration.url,
            args.table_id,
            configuration.format,
            format_settings,
            args.columns,
            args.constraints,
            args.comment,
            args.getContext(),
            configuration.compression_method,
            headers,
            configuration.http_method,
            partition_by);
    },
    {
        .supports_settings = true,
        .source_access_type = AccessType::URL,
    });
}
}