From 13a6e03849eca87d7ee20e0a09185fe4c6870c65 Mon Sep 17 00:00:00 2001 From: FArthur-cmd <613623@mail.ru> Date: Sat, 8 May 2021 10:15:14 +0300 Subject: [PATCH] solving style-check problems --- src/Dictionaries/HTTPDictionarySource.cpp | 54 ++++--- src/Dictionaries/HTTPDictionarySource.h | 4 +- src/IO/ReadWriteBufferFromHTTP.h | 12 +- .../01854_HTTP_dict_decompression.python | 153 ++++++++++++++++++ .../01854_HTTP_dict_decompression.sh | 7 + 5 files changed, 199 insertions(+), 31 deletions(-) create mode 100644 tests/queries/0_stateless/01854_HTTP_dict_decompression.python create mode 100755 tests/queries/0_stateless/01854_HTTP_dict_decompression.sh diff --git a/src/Dictionaries/HTTPDictionarySource.cpp b/src/Dictionaries/HTTPDictionarySource.cpp index 805432427bc..f593b64f92e 100644 --- a/src/Dictionaries/HTTPDictionarySource.cpp +++ b/src/Dictionaries/HTTPDictionarySource.cpp @@ -42,7 +42,6 @@ HTTPDictionarySource::HTTPDictionarySource( , context(context_) , timeouts(ConnectionTimeouts::getHTTPTimeouts(context)) { - if (check_config) context->getRemoteHostFilter().checkURL(Poco::URI(url)); @@ -87,12 +86,11 @@ HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other) credentials.setPassword(other.credentials.getPassword()); } -BlockInputStreamPtr HTTPDictionarySource::createWrappedBuffer(std::unique_ptr http_buffer_ptr) { +BlockInputStreamPtr HTTPDictionarySource::createWrappedBuffer(std::unique_ptr http_buffer_ptr) +{ String http_request_compression_method_str = http_buffer_ptr->getCompressMethod(); - auto in_ptr_wrapped = wrapReadBufferWithCompressionMethod( - std::move(http_buffer_ptr), - chooseCompressionMethod({}, http_request_compression_method_str)); - + auto in_ptr_wrapped + = wrapReadBufferWithCompressionMethod(std::move(http_buffer_ptr), chooseCompressionMethod({}, http_request_compression_method_str)); auto input_stream = context->getInputFormat(format, *in_ptr_wrapped, sample_block, max_block_size); return std::make_shared>(input_stream, std::move(in_ptr_wrapped)); } @@ -119,9 +117,15 @@ BlockInputStreamPtr HTTPDictionarySource::loadAll() LOG_TRACE(log, "loadAll {}", toString()); Poco::URI uri(url); auto in_ptr = std::make_unique( - uri, Poco::Net::HTTPRequest::HTTP_GET, ReadWriteBufferFromHTTP::OutStreamCallback(), timeouts, - 0, credentials, DBMS_DEFAULT_BUFFER_SIZE, header_entries); - return createWrappedBuffer(std::move(in_ptr)); + uri, + Poco::Net::HTTPRequest::HTTP_GET, + ReadWriteBufferFromHTTP::OutStreamCallback(), + timeouts, + 0, + credentials, + DBMS_DEFAULT_BUFFER_SIZE, + header_entries); + return createWrappedBuffer(std::move(in_ptr)); } BlockInputStreamPtr HTTPDictionarySource::loadUpdatedAll() @@ -130,8 +134,14 @@ BlockInputStreamPtr HTTPDictionarySource::loadUpdatedAll() getUpdateFieldAndDate(uri); LOG_TRACE(log, "loadUpdatedAll {}", uri.toString()); auto in_ptr = std::make_unique( - uri, Poco::Net::HTTPRequest::HTTP_GET, ReadWriteBufferFromHTTP::OutStreamCallback(), timeouts, - 0, credentials, DBMS_DEFAULT_BUFFER_SIZE, header_entries); + uri, + Poco::Net::HTTPRequest::HTTP_GET, + ReadWriteBufferFromHTTP::OutStreamCallback(), + timeouts, + 0, + credentials, + DBMS_DEFAULT_BUFFER_SIZE, + header_entries); return createWrappedBuffer(std::move(in_ptr)); } @@ -150,8 +160,7 @@ BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector & id Poco::URI uri(url); auto in_ptr = std::make_unique( - uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback, timeouts, - 0, credentials, DBMS_DEFAULT_BUFFER_SIZE, header_entries); + uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback, timeouts, 0, credentials, DBMS_DEFAULT_BUFFER_SIZE, header_entries); return createWrappedBuffer(std::move(in_ptr)); } @@ -170,8 +179,7 @@ BlockInputStreamPtr HTTPDictionarySource::loadKeys(const Columns & key_columns, Poco::URI uri(url); auto in_ptr = std::make_unique( - uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback, timeouts, - 0, credentials, DBMS_DEFAULT_BUFFER_SIZE, header_entries); + uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback, timeouts, 0, credentials, DBMS_DEFAULT_BUFFER_SIZE, header_entries); return createWrappedBuffer(std::move(in_ptr)); } @@ -204,21 +212,19 @@ std::string HTTPDictionarySource::toString() const void registerDictionarySourceHTTP(DictionarySourceFactory & factory) { auto create_table_source = [=](const DictionaryStructure & dict_struct, - const Poco::Util::AbstractConfiguration & config, - const std::string & config_prefix, - Block & sample_block, - ContextPtr context, - const std::string & /* default_database */, - bool check_config) -> DictionarySourcePtr - { + const Poco::Util::AbstractConfiguration & config, + const std::string & config_prefix, + Block & sample_block, + ContextPtr context, + const std::string & /* default_database */, + bool check_config) -> DictionarySourcePtr { if (dict_struct.has_expressions) throw Exception(ErrorCodes::LOGICAL_ERROR, "Dictionary source of type `http` does not support attribute expressions"); auto context_local_copy = copyContextAndApplySettings(config_prefix, context, config); return std::make_unique( - dict_struct, config, config_prefix + ".http", - sample_block, context_local_copy, check_config); + dict_struct, config, config_prefix + ".http", sample_block, context_local_copy, check_config); }; factory.registerSource("http", create_table_source); } diff --git a/src/Dictionaries/HTTPDictionarySource.h b/src/Dictionaries/HTTPDictionarySource.h index 3670f2ef443..e4b85b2efa3 100644 --- a/src/Dictionaries/HTTPDictionarySource.h +++ b/src/Dictionaries/HTTPDictionarySource.h @@ -1,14 +1,14 @@ #pragma once +#include #include #include +#include #include #include #include #include "DictionaryStructure.h" #include "IDictionarySource.h" -#include -#include namespace Poco { diff --git a/src/IO/ReadWriteBufferFromHTTP.h b/src/IO/ReadWriteBufferFromHTTP.h index 095153862f9..c8416020d2a 100644 --- a/src/IO/ReadWriteBufferFromHTTP.h +++ b/src/IO/ReadWriteBufferFromHTTP.h @@ -92,7 +92,7 @@ namespace detail protected: Poco::URI uri; std::string method; - std::string compress_method; + std::string compress_method; UpdatableSessionPtr session; std::istream * istr; /// owned by session @@ -138,7 +138,7 @@ namespace detail istr = receiveResponse(*sess, request, response, true); response.getCookies(cookies); - compress_method = response.get("Content-Encoding"); + compress_method = response.get("Content-Encoding"); return istr; } @@ -167,6 +167,7 @@ namespace detail : ReadBuffer(nullptr, 0) , uri {uri_} , method {!method_.empty() ? method_ : out_stream_callback_ ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET} + , compress_method{} , session {session_} , out_stream_callback {out_stream_callback_} , credentials {credentials_} @@ -233,9 +234,10 @@ namespace detail next_callback(count()); } - std::string getCompressMethod() const { - return compress_method; - } + std::string getCompressMethod() const + { + return compress_method; + } }; } diff --git a/tests/queries/0_stateless/01854_HTTP_dict_decompression.python b/tests/queries/0_stateless/01854_HTTP_dict_decompression.python new file mode 100644 index 00000000000..ae7d9bdd79a --- /dev/null +++ b/tests/queries/0_stateless/01854_HTTP_dict_decompression.python @@ -0,0 +1,153 @@ +#!/usr/bin/env python3 + +from http.server import SimpleHTTPRequestHandler,HTTPServer +import socket +import csv +import sys +import tempfile +import threading +import os +import gzip +import traceback +import urllib.request +import subprocess +import lzma + +def get_local_port(host): + with socket.socket() as fd: + fd.bind((host, 0)) + return fd.getsockname()[1] + +CLICKHOUSE_HOST = os.environ.get('CLICKHOUSE_HOST', '127.0.0.1') +CLICKHOUSE_PORT_HTTP = os.environ.get('CLICKHOUSE_PORT_HTTP', '8123') + +##################################################################################### +# This test starts an HTTP server and serves data to clickhouse url-engine based table. +# The main goal of this test is checking that compress methods are working. +# In order for it to work ip+port of http server (given below) should be +# accessible from clickhouse server. +##################################################################################### + +# IP-address of this host accessible from outside world. +HTTP_SERVER_HOST = subprocess.check_output(['hostname', '-i']).decode('utf-8').strip() +HTTP_SERVER_PORT = get_local_port(HTTP_SERVER_HOST) + +# IP address and port of the HTTP server started from this script. +HTTP_SERVER_ADDRESS = (HTTP_SERVER_HOST, HTTP_SERVER_PORT) +HTTP_SERVER_URL_STR = 'http://' + ':'.join(str(s) for s in HTTP_SERVER_ADDRESS) + "/" + +# Because we need to check content of file.csv we can create this content and avoid reading csv +CSV_DATA = "Hello, 1\nWorld, 2\nThis, 152\nis, 9283\ntesting, 2313213\ndata, 555\n" + + +# Choose compression method +# (Will change during test, need to check standart data sending, to make sure that nothing broke) +COMPRESS_METHOD = 'none' + +def get_ch_answer(query): + url = os.environ.get('CLICKHOUSE_URL', 'http://{host}:{port}'.format(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT_HTTP)) + return urllib.request.urlopen(url, data=query.encode()).read().decode() + +def check_answers(query, answer): + ch_answer = get_ch_answer(query) + if ch_answer.strip() != answer.strip(): + print("FAIL on query:", query, file=sys.stderr) + print("Expected answer:", answer, file=sys.stderr) + print("Fetched answer :", ch_answer, file=sys.stderr) + raise Exception("Fail on query") + +# Server with head method which is useful for debuging by hands +class HttpProcessor(SimpleHTTPRequestHandler): + def _set_headers(self): + self.send_response(200) + self.send_header('Content-Encoding', COMPRESS_METHOD) + if COMPRESS_METHOD == 'none': + self.send_header('Content-Length', len(CSV_DATA.encode())) + else: + self.compress_data() + self.send_header('Content-Length', len(self.data)) + self.send_header('Content-Type', 'text/csv') + self.end_headers() + + def do_HEAD(self): + self._set_headers() + return + + def compress_data(self): + if COMPRESS_METHOD == 'gzip': + self.data = gzip.compress((CSV_DATA).encode()) + elif COMPRESS_METHOD == 'lzma': + self.data = lzma.compress((CSV_DATA).encode()) + else: + self.data = 'WRONG CONVERSATION'.encode() + + + def do_GET(self): + self._set_headers() + + if COMPRESS_METHOD == 'none': + self.wfile.write(CSV_DATA.encode()) + else: + self.wfile.write(self.data) + return + + def log_message(self, format, *args): + return + +def start_server(requests_amount): + httpd = HTTPServer(HTTP_SERVER_ADDRESS, HttpProcessor) + + def real_func(): + for i in range(requests_amount): + httpd.handle_request() + + t = threading.Thread(target=real_func) + return t + +##################################################################### +# Testing area. +##################################################################### + +def test_select(dict_name="", schema="word String, counter UInt32", requests=[], answers=[], test_data=""): + for i in range(len(requests)): + if dict_name: + get_ch_answer("drop dictionary if exists {}".format(dict_name)) + get_ch_answer('''CREATE DICTIONARY {} ({}) + PRIMARY KEY word + SOURCE(HTTP(url '{}' format 'CSV')) + LAYOUT(complex_key_hashed()) + LIFETIME(0)'''.format(dict_name, schema, HTTP_SERVER_URL_STR+'/test.csv')) + + COMPRESS_METHOD = requests[i] + # print(get_ch_answer("select * from {}".format(dict_name))) + check_answers("select * from {}".format(dict_name), answers[i]) + +def main(): + insert_requests = [ + 'none', + 'gzip', + 'lzma' + ] + + # This answers got experemently in non compressed mode and they are correct + answers = ['''This 152\nHello 1\nis 9283\ndata 555\nWorld 2\ntesting 2313213'''] * 3 + + t = start_server(len(insert_requests)) + t.start() + test_select(dict_name="test_table_select", requests=insert_requests, answers=answers) + t.join() + print("PASSED") + + + +if __name__ == "__main__": + try: + main() + except Exception as ex: + exc_type, exc_value, exc_traceback = sys.exc_info() + traceback.print_tb(exc_traceback, file=sys.stderr) + print(ex, file=sys.stderr) + sys.stderr.flush() + + os._exit(1) + diff --git a/tests/queries/0_stateless/01854_HTTP_dict_decompression.sh b/tests/queries/0_stateless/01854_HTTP_dict_decompression.sh new file mode 100755 index 00000000000..cca710e85cf --- /dev/null +++ b/tests/queries/0_stateless/01854_HTTP_dict_decompression.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +python3 "$CURDIR"/01854_HTTP_dict_decompression.python