mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 00:22:29 +00:00
adding checking url in HTTPSource
This commit is contained in:
parent
acb09f01d4
commit
bd519075b7
@ -90,7 +90,7 @@ BlockInputStreamPtr HTTPDictionarySource::createWrappedBuffer(std::unique_ptr<Re
|
|||||||
{
|
{
|
||||||
String http_request_compression_method_str = http_buffer_ptr->getCompressMethod();
|
String http_request_compression_method_str = http_buffer_ptr->getCompressMethod();
|
||||||
auto in_ptr_wrapped
|
auto in_ptr_wrapped
|
||||||
= wrapReadBufferWithCompressionMethod(std::move(http_buffer_ptr), chooseCompressionMethod({}, http_request_compression_method_str));
|
= wrapReadBufferWithCompressionMethod(std::move(http_buffer_ptr), chooseCompressionMethod(url, http_request_compression_method_str));
|
||||||
auto input_stream = context->getInputFormat(format, *in_ptr_wrapped, sample_block, max_block_size);
|
auto input_stream = context->getInputFormat(format, *in_ptr_wrapped, sample_block, max_block_size);
|
||||||
return std::make_shared<OwningBlockInputStream<ReadBuffer>>(input_stream, std::move(in_ptr_wrapped));
|
return std::make_shared<OwningBlockInputStream<ReadBuffer>>(input_stream, std::move(in_ptr_wrapped));
|
||||||
}
|
}
|
||||||
|
@ -1,14 +1,14 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <IO/CompressionMethod.h>
|
|
||||||
#include <IO/ConnectionTimeouts.h>
|
#include <IO/ConnectionTimeouts.h>
|
||||||
#include <IO/ReadWriteBufferFromHTTP.h>
|
#include <IO/ReadWriteBufferFromHTTP.h>
|
||||||
#include <Interpreters/Context.h>
|
|
||||||
#include <Poco/Net/HTTPBasicCredentials.h>
|
#include <Poco/Net/HTTPBasicCredentials.h>
|
||||||
#include <Poco/URI.h>
|
#include <Poco/URI.h>
|
||||||
#include <common/LocalDateTime.h>
|
#include <common/LocalDateTime.h>
|
||||||
#include "DictionaryStructure.h"
|
#include "DictionaryStructure.h"
|
||||||
#include "IDictionarySource.h"
|
#include "IDictionarySource.h"
|
||||||
|
#include <Interpreters/Context.h>
|
||||||
|
#include <IO/CompressionMethod.h>
|
||||||
|
|
||||||
namespace Poco
|
namespace Poco
|
||||||
{
|
{
|
||||||
@ -74,3 +74,4 @@ private:
|
|||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,6 +46,12 @@ std::string toContentEncodingName(CompressionMethod method)
|
|||||||
__builtin_unreachable();
|
__builtin_unreachable();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool pathExtensionIsCorrect(const std::string& ending)
|
||||||
|
{
|
||||||
|
return ending == "gzip" || ending == "gz" || ending == "deflate" ||
|
||||||
|
ending == "brotli" || ending == "br" || ending == "lzma" ||
|
||||||
|
ending == "xz" || ending == "zstd" || ending == "zst";
|
||||||
|
}
|
||||||
|
|
||||||
CompressionMethod chooseCompressionMethod(const std::string & path, const std::string & hint)
|
CompressionMethod chooseCompressionMethod(const std::string & path, const std::string & hint)
|
||||||
{
|
{
|
||||||
@ -57,6 +63,9 @@ CompressionMethod chooseCompressionMethod(const std::string & path, const std::s
|
|||||||
file_extension = path.substr(pos + 1, std::string::npos);
|
file_extension = path.substr(pos + 1, std::string::npos);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!pathExtensionIsCorrect(file_extension))
|
||||||
|
file_extension.clear();
|
||||||
|
|
||||||
std::string method_str = file_extension.empty() ? hint : std::move(file_extension);
|
std::string method_str = file_extension.empty() ? hint : std::move(file_extension);
|
||||||
boost::algorithm::to_lower(method_str);
|
boost::algorithm::to_lower(method_str);
|
||||||
|
|
||||||
|
@ -37,6 +37,8 @@ enum class CompressionMethod
|
|||||||
/// How the compression method is named in HTTP.
|
/// How the compression method is named in HTTP.
|
||||||
std::string toContentEncodingName(CompressionMethod method);
|
std::string toContentEncodingName(CompressionMethod method);
|
||||||
|
|
||||||
|
bool pathExtensionIsCorrect(const std::string& ending);
|
||||||
|
|
||||||
/** Choose compression method from path and hint.
|
/** Choose compression method from path and hint.
|
||||||
* if hint is "auto" or empty string, then path is analyzed,
|
* if hint is "auto" or empty string, then path is analyzed,
|
||||||
* otherwise path parameter is ignored and hint is used as compression method name.
|
* otherwise path parameter is ignored and hint is used as compression method name.
|
||||||
|
14
tests/integration/test_http_source_dictionary/test.py → tests/queries/0_stateless/01854_HTTP_dict_decompression.python
Normal file → Executable file
14
tests/integration/test_http_source_dictionary/test.py → tests/queries/0_stateless/01854_HTTP_dict_decompression.python
Normal file → Executable file
@ -43,6 +43,8 @@ CSV_DATA = "Hello, 1\nWorld, 2\nThis, 152\nis, 9283\ntesting, 2313213\ndata, 555
|
|||||||
# Choose compression method
|
# Choose compression method
|
||||||
# (Will change during test, need to check standart data sending, to make sure that nothing broke)
|
# (Will change during test, need to check standart data sending, to make sure that nothing broke)
|
||||||
COMPRESS_METHOD = 'none'
|
COMPRESS_METHOD = 'none'
|
||||||
|
ADDING_ENDING = ''
|
||||||
|
ENDINGS = ['gz', 'xz']
|
||||||
|
|
||||||
def get_ch_answer(query):
|
def get_ch_answer(query):
|
||||||
url = os.environ.get('CLICKHOUSE_URL', 'http://{host}:{port}'.format(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT_HTTP))
|
url = os.environ.get('CLICKHOUSE_URL', 'http://{host}:{port}'.format(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT_HTTP))
|
||||||
@ -109,28 +111,34 @@ def start_server(requests_amount):
|
|||||||
#####################################################################
|
#####################################################################
|
||||||
|
|
||||||
def test_select(dict_name="", schema="word String, counter UInt32", requests=[], answers=[], test_data=""):
|
def test_select(dict_name="", schema="word String, counter UInt32", requests=[], answers=[], test_data=""):
|
||||||
|
global ADDING_ENDING
|
||||||
for i in range(len(requests)):
|
for i in range(len(requests)):
|
||||||
|
if i > 2:
|
||||||
|
ADDING_ENDING = ENDINGS[i-3]
|
||||||
if dict_name:
|
if dict_name:
|
||||||
get_ch_answer("drop dictionary if exists {}".format(dict_name))
|
get_ch_answer("drop dictionary if exists {}".format(dict_name))
|
||||||
get_ch_answer('''CREATE DICTIONARY {} ({})
|
get_ch_answer('''CREATE DICTIONARY {} ({})
|
||||||
PRIMARY KEY word
|
PRIMARY KEY word
|
||||||
SOURCE(HTTP(url '{}' format 'CSV'))
|
SOURCE(HTTP(url '{}' format 'CSV'))
|
||||||
LAYOUT(complex_key_hashed())
|
LAYOUT(complex_key_hashed())
|
||||||
LIFETIME(0)'''.format(dict_name, schema, HTTP_SERVER_URL_STR+'/test.csv'))
|
LIFETIME(0)'''.format(dict_name, schema, HTTP_SERVER_URL_STR+'/test.csv' + ADDING_ENDING))
|
||||||
|
|
||||||
COMPRESS_METHOD = requests[i]
|
COMPRESS_METHOD = requests[i]
|
||||||
# print(get_ch_answer("select * from {}".format(dict_name)))
|
# print(get_ch_answer("select * from {}".format(dict_name)))
|
||||||
check_answers("select * from {}".format(dict_name), answers[i])
|
check_answers("select * from {}".format(dict_name), answers[i])
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
# first three for encoding, second three for url
|
||||||
insert_requests = [
|
insert_requests = [
|
||||||
'none',
|
'none',
|
||||||
'gzip',
|
'gzip',
|
||||||
'lzma'
|
'lzma',
|
||||||
|
'none',
|
||||||
|
'none'
|
||||||
]
|
]
|
||||||
|
|
||||||
# This answers got experemently in non compressed mode and they are correct
|
# This answers got experemently in non compressed mode and they are correct
|
||||||
answers = ['''This 152\nHello 1\nis 9283\ndata 555\nWorld 2\ntesting 2313213'''] * 3
|
answers = ['''This 152\nHello 1\nis 9283\ndata 555\nWorld 2\ntesting 2313213'''] * 5
|
||||||
|
|
||||||
t = start_server(len(insert_requests))
|
t = start_server(len(insert_requests))
|
||||||
t.start()
|
t.start()
|
7
tests/queries/0_stateless/01854_HTTP_dict_decompression.sh
Executable file
7
tests/queries/0_stateless/01854_HTTP_dict_decompression.sh
Executable file
@ -0,0 +1,7 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
|
# shellcheck source=../shell_config.sh
|
||||||
|
. "$CURDIR"/../shell_config.sh
|
||||||
|
|
||||||
|
python3 "$CURDIR"/01854_HTTP_dict_decompression.python
|
Loading…
Reference in New Issue
Block a user