mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
solving style-check problems
This commit is contained in:
parent
4ea9851e99
commit
13a6e03849
@ -42,7 +42,6 @@ HTTPDictionarySource::HTTPDictionarySource(
|
||||
, context(context_)
|
||||
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context))
|
||||
{
|
||||
|
||||
if (check_config)
|
||||
context->getRemoteHostFilter().checkURL(Poco::URI(url));
|
||||
|
||||
@ -87,12 +86,11 @@ HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other)
|
||||
credentials.setPassword(other.credentials.getPassword());
|
||||
}
|
||||
|
||||
BlockInputStreamPtr HTTPDictionarySource::createWrappedBuffer(std::unique_ptr<ReadWriteBufferFromHTTP> http_buffer_ptr) {
|
||||
BlockInputStreamPtr HTTPDictionarySource::createWrappedBuffer(std::unique_ptr<ReadWriteBufferFromHTTP> http_buffer_ptr)
|
||||
{
|
||||
String http_request_compression_method_str = http_buffer_ptr->getCompressMethod();
|
||||
auto in_ptr_wrapped = wrapReadBufferWithCompressionMethod(
|
||||
std::move(http_buffer_ptr),
|
||||
chooseCompressionMethod({}, http_request_compression_method_str));
|
||||
|
||||
auto in_ptr_wrapped
|
||||
= wrapReadBufferWithCompressionMethod(std::move(http_buffer_ptr), chooseCompressionMethod({}, http_request_compression_method_str));
|
||||
auto input_stream = context->getInputFormat(format, *in_ptr_wrapped, sample_block, max_block_size);
|
||||
return std::make_shared<OwningBlockInputStream<ReadBuffer>>(input_stream, std::move(in_ptr_wrapped));
|
||||
}
|
||||
@ -119,9 +117,15 @@ BlockInputStreamPtr HTTPDictionarySource::loadAll()
|
||||
LOG_TRACE(log, "loadAll {}", toString());
|
||||
Poco::URI uri(url);
|
||||
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(
|
||||
uri, Poco::Net::HTTPRequest::HTTP_GET, ReadWriteBufferFromHTTP::OutStreamCallback(), timeouts,
|
||||
0, credentials, DBMS_DEFAULT_BUFFER_SIZE, header_entries);
|
||||
return createWrappedBuffer(std::move(in_ptr));
|
||||
uri,
|
||||
Poco::Net::HTTPRequest::HTTP_GET,
|
||||
ReadWriteBufferFromHTTP::OutStreamCallback(),
|
||||
timeouts,
|
||||
0,
|
||||
credentials,
|
||||
DBMS_DEFAULT_BUFFER_SIZE,
|
||||
header_entries);
|
||||
return createWrappedBuffer(std::move(in_ptr));
|
||||
}
|
||||
|
||||
BlockInputStreamPtr HTTPDictionarySource::loadUpdatedAll()
|
||||
@ -130,8 +134,14 @@ BlockInputStreamPtr HTTPDictionarySource::loadUpdatedAll()
|
||||
getUpdateFieldAndDate(uri);
|
||||
LOG_TRACE(log, "loadUpdatedAll {}", uri.toString());
|
||||
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(
|
||||
uri, Poco::Net::HTTPRequest::HTTP_GET, ReadWriteBufferFromHTTP::OutStreamCallback(), timeouts,
|
||||
0, credentials, DBMS_DEFAULT_BUFFER_SIZE, header_entries);
|
||||
uri,
|
||||
Poco::Net::HTTPRequest::HTTP_GET,
|
||||
ReadWriteBufferFromHTTP::OutStreamCallback(),
|
||||
timeouts,
|
||||
0,
|
||||
credentials,
|
||||
DBMS_DEFAULT_BUFFER_SIZE,
|
||||
header_entries);
|
||||
return createWrappedBuffer(std::move(in_ptr));
|
||||
}
|
||||
|
||||
@ -150,8 +160,7 @@ BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector<UInt64> & id
|
||||
|
||||
Poco::URI uri(url);
|
||||
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(
|
||||
uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback, timeouts,
|
||||
0, credentials, DBMS_DEFAULT_BUFFER_SIZE, header_entries);
|
||||
uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback, timeouts, 0, credentials, DBMS_DEFAULT_BUFFER_SIZE, header_entries);
|
||||
return createWrappedBuffer(std::move(in_ptr));
|
||||
}
|
||||
|
||||
@ -170,8 +179,7 @@ BlockInputStreamPtr HTTPDictionarySource::loadKeys(const Columns & key_columns,
|
||||
|
||||
Poco::URI uri(url);
|
||||
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(
|
||||
uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback, timeouts,
|
||||
0, credentials, DBMS_DEFAULT_BUFFER_SIZE, header_entries);
|
||||
uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback, timeouts, 0, credentials, DBMS_DEFAULT_BUFFER_SIZE, header_entries);
|
||||
return createWrappedBuffer(std::move(in_ptr));
|
||||
}
|
||||
|
||||
@ -204,21 +212,19 @@ std::string HTTPDictionarySource::toString() const
|
||||
void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
|
||||
{
|
||||
auto create_table_source = [=](const DictionaryStructure & dict_struct,
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix,
|
||||
Block & sample_block,
|
||||
ContextPtr context,
|
||||
const std::string & /* default_database */,
|
||||
bool check_config) -> DictionarySourcePtr
|
||||
{
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix,
|
||||
Block & sample_block,
|
||||
ContextPtr context,
|
||||
const std::string & /* default_database */,
|
||||
bool check_config) -> DictionarySourcePtr {
|
||||
if (dict_struct.has_expressions)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Dictionary source of type `http` does not support attribute expressions");
|
||||
|
||||
auto context_local_copy = copyContextAndApplySettings(config_prefix, context, config);
|
||||
|
||||
return std::make_unique<HTTPDictionarySource>(
|
||||
dict_struct, config, config_prefix + ".http",
|
||||
sample_block, context_local_copy, check_config);
|
||||
dict_struct, config, config_prefix + ".http", sample_block, context_local_copy, check_config);
|
||||
};
|
||||
factory.registerSource("http", create_table_source);
|
||||
}
|
||||
|
@ -1,14 +1,14 @@
|
||||
#pragma once
|
||||
|
||||
#include <IO/CompressionMethod.h>
|
||||
#include <IO/ConnectionTimeouts.h>
|
||||
#include <IO/ReadWriteBufferFromHTTP.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Poco/Net/HTTPBasicCredentials.h>
|
||||
#include <Poco/URI.h>
|
||||
#include <common/LocalDateTime.h>
|
||||
#include "DictionaryStructure.h"
|
||||
#include "IDictionarySource.h"
|
||||
#include <Interpreters/Context.h>
|
||||
#include <IO/CompressionMethod.h>
|
||||
|
||||
namespace Poco
|
||||
{
|
||||
|
@ -92,7 +92,7 @@ namespace detail
|
||||
protected:
|
||||
Poco::URI uri;
|
||||
std::string method;
|
||||
std::string compress_method;
|
||||
std::string compress_method;
|
||||
|
||||
UpdatableSessionPtr session;
|
||||
std::istream * istr; /// owned by session
|
||||
@ -138,7 +138,7 @@ namespace detail
|
||||
istr = receiveResponse(*sess, request, response, true);
|
||||
response.getCookies(cookies);
|
||||
|
||||
compress_method = response.get("Content-Encoding");
|
||||
compress_method = response.get("Content-Encoding");
|
||||
return istr;
|
||||
|
||||
}
|
||||
@ -167,6 +167,7 @@ namespace detail
|
||||
: ReadBuffer(nullptr, 0)
|
||||
, uri {uri_}
|
||||
, method {!method_.empty() ? method_ : out_stream_callback_ ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET}
|
||||
, compress_method{}
|
||||
, session {session_}
|
||||
, out_stream_callback {out_stream_callback_}
|
||||
, credentials {credentials_}
|
||||
@ -233,9 +234,10 @@ namespace detail
|
||||
next_callback(count());
|
||||
}
|
||||
|
||||
std::string getCompressMethod() const {
|
||||
return compress_method;
|
||||
}
|
||||
std::string getCompressMethod() const
|
||||
{
|
||||
return compress_method;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
153
tests/queries/0_stateless/01854_HTTP_dict_decompression.python
Normal file
153
tests/queries/0_stateless/01854_HTTP_dict_decompression.python
Normal file
@ -0,0 +1,153 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from http.server import SimpleHTTPRequestHandler,HTTPServer
|
||||
import socket
|
||||
import csv
|
||||
import sys
|
||||
import tempfile
|
||||
import threading
|
||||
import os
|
||||
import gzip
|
||||
import traceback
|
||||
import urllib.request
|
||||
import subprocess
|
||||
import lzma
|
||||
|
||||
def get_local_port(host):
|
||||
with socket.socket() as fd:
|
||||
fd.bind((host, 0))
|
||||
return fd.getsockname()[1]
|
||||
|
||||
CLICKHOUSE_HOST = os.environ.get('CLICKHOUSE_HOST', '127.0.0.1')
|
||||
CLICKHOUSE_PORT_HTTP = os.environ.get('CLICKHOUSE_PORT_HTTP', '8123')
|
||||
|
||||
#####################################################################################
|
||||
# This test starts an HTTP server and serves data to clickhouse url-engine based table.
|
||||
# The main goal of this test is checking that compress methods are working.
|
||||
# In order for it to work ip+port of http server (given below) should be
|
||||
# accessible from clickhouse server.
|
||||
#####################################################################################
|
||||
|
||||
# IP-address of this host accessible from outside world.
|
||||
HTTP_SERVER_HOST = subprocess.check_output(['hostname', '-i']).decode('utf-8').strip()
|
||||
HTTP_SERVER_PORT = get_local_port(HTTP_SERVER_HOST)
|
||||
|
||||
# IP address and port of the HTTP server started from this script.
|
||||
HTTP_SERVER_ADDRESS = (HTTP_SERVER_HOST, HTTP_SERVER_PORT)
|
||||
HTTP_SERVER_URL_STR = 'http://' + ':'.join(str(s) for s in HTTP_SERVER_ADDRESS) + "/"
|
||||
|
||||
# Because we need to check content of file.csv we can create this content and avoid reading csv
|
||||
CSV_DATA = "Hello, 1\nWorld, 2\nThis, 152\nis, 9283\ntesting, 2313213\ndata, 555\n"
|
||||
|
||||
|
||||
# Choose compression method
|
||||
# (Will change during test, need to check standart data sending, to make sure that nothing broke)
|
||||
COMPRESS_METHOD = 'none'
|
||||
|
||||
def get_ch_answer(query):
|
||||
url = os.environ.get('CLICKHOUSE_URL', 'http://{host}:{port}'.format(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT_HTTP))
|
||||
return urllib.request.urlopen(url, data=query.encode()).read().decode()
|
||||
|
||||
def check_answers(query, answer):
|
||||
ch_answer = get_ch_answer(query)
|
||||
if ch_answer.strip() != answer.strip():
|
||||
print("FAIL on query:", query, file=sys.stderr)
|
||||
print("Expected answer:", answer, file=sys.stderr)
|
||||
print("Fetched answer :", ch_answer, file=sys.stderr)
|
||||
raise Exception("Fail on query")
|
||||
|
||||
# Server with head method which is useful for debuging by hands
|
||||
class HttpProcessor(SimpleHTTPRequestHandler):
|
||||
def _set_headers(self):
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Encoding', COMPRESS_METHOD)
|
||||
if COMPRESS_METHOD == 'none':
|
||||
self.send_header('Content-Length', len(CSV_DATA.encode()))
|
||||
else:
|
||||
self.compress_data()
|
||||
self.send_header('Content-Length', len(self.data))
|
||||
self.send_header('Content-Type', 'text/csv')
|
||||
self.end_headers()
|
||||
|
||||
def do_HEAD(self):
|
||||
self._set_headers()
|
||||
return
|
||||
|
||||
def compress_data(self):
|
||||
if COMPRESS_METHOD == 'gzip':
|
||||
self.data = gzip.compress((CSV_DATA).encode())
|
||||
elif COMPRESS_METHOD == 'lzma':
|
||||
self.data = lzma.compress((CSV_DATA).encode())
|
||||
else:
|
||||
self.data = 'WRONG CONVERSATION'.encode()
|
||||
|
||||
|
||||
def do_GET(self):
|
||||
self._set_headers()
|
||||
|
||||
if COMPRESS_METHOD == 'none':
|
||||
self.wfile.write(CSV_DATA.encode())
|
||||
else:
|
||||
self.wfile.write(self.data)
|
||||
return
|
||||
|
||||
def log_message(self, format, *args):
|
||||
return
|
||||
|
||||
def start_server(requests_amount):
|
||||
httpd = HTTPServer(HTTP_SERVER_ADDRESS, HttpProcessor)
|
||||
|
||||
def real_func():
|
||||
for i in range(requests_amount):
|
||||
httpd.handle_request()
|
||||
|
||||
t = threading.Thread(target=real_func)
|
||||
return t
|
||||
|
||||
#####################################################################
|
||||
# Testing area.
|
||||
#####################################################################
|
||||
|
||||
def test_select(dict_name="", schema="word String, counter UInt32", requests=[], answers=[], test_data=""):
|
||||
for i in range(len(requests)):
|
||||
if dict_name:
|
||||
get_ch_answer("drop dictionary if exists {}".format(dict_name))
|
||||
get_ch_answer('''CREATE DICTIONARY {} ({})
|
||||
PRIMARY KEY word
|
||||
SOURCE(HTTP(url '{}' format 'CSV'))
|
||||
LAYOUT(complex_key_hashed())
|
||||
LIFETIME(0)'''.format(dict_name, schema, HTTP_SERVER_URL_STR+'/test.csv'))
|
||||
|
||||
COMPRESS_METHOD = requests[i]
|
||||
# print(get_ch_answer("select * from {}".format(dict_name)))
|
||||
check_answers("select * from {}".format(dict_name), answers[i])
|
||||
|
||||
def main():
|
||||
insert_requests = [
|
||||
'none',
|
||||
'gzip',
|
||||
'lzma'
|
||||
]
|
||||
|
||||
# This answers got experemently in non compressed mode and they are correct
|
||||
answers = ['''This 152\nHello 1\nis 9283\ndata 555\nWorld 2\ntesting 2313213'''] * 3
|
||||
|
||||
t = start_server(len(insert_requests))
|
||||
t.start()
|
||||
test_select(dict_name="test_table_select", requests=insert_requests, answers=answers)
|
||||
t.join()
|
||||
print("PASSED")
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
main()
|
||||
except Exception as ex:
|
||||
exc_type, exc_value, exc_traceback = sys.exc_info()
|
||||
traceback.print_tb(exc_traceback, file=sys.stderr)
|
||||
print(ex, file=sys.stderr)
|
||||
sys.stderr.flush()
|
||||
|
||||
os._exit(1)
|
||||
|
7
tests/queries/0_stateless/01854_HTTP_dict_decompression.sh
Executable file
7
tests/queries/0_stateless/01854_HTTP_dict_decompression.sh
Executable file
@ -0,0 +1,7 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
python3 "$CURDIR"/01854_HTTP_dict_decompression.python
|
Loading…
Reference in New Issue
Block a user