Fixed a bug in ClickHouseDictionarySource and added a test covering all dictionary sources

This commit is contained in:
Артем Стрельцов 2020-04-09 00:48:00 +03:00
parent 64a4640e0e
commit c0051d9cd9
11 changed files with 399 additions and 0 deletions

View File

@ -74,6 +74,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
/// We should set user info even for the case when the dictionary is loaded in-process (without TCP communication). /// We should set user info even for the case when the dictionary is loaded in-process (without TCP communication).
context.setUser(user, password, Poco::Net::SocketAddress("127.0.0.1", 0), {}); context.setUser(user, password, Poco::Net::SocketAddress("127.0.0.1", 0), {});
/// Processors are not supported here yet. /// Processors are not supported here yet.
context.setSettings(context_.getSettings());
context.setSetting("experimental_use_processors", false); context.setSetting("experimental_use_processors", false);
/// Query context is needed because some code in executeQuery function may assume it exists. /// Query context is needed because some code in executeQuery function may assume it exists.
/// Current example is Context::getSampleBlockCache from InterpreterSelectWithUnionQuery::getSampleBlock. /// Current example is Context::getSampleBlockCache from InterpreterSelectWithUnionQuery::getSampleBlock.
@ -217,6 +218,8 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
bool /* check_config */) -> DictionarySourcePtr bool /* check_config */) -> DictionarySourcePtr
{ {
Context context_local_copy = copyContextAndApplySettings(config_prefix, context, config); Context context_local_copy = copyContextAndApplySettings(config_prefix, context, config);
std::cerr << "initialization: " << context_local_copy.getSettings().max_bytes_to_read << '\n';
/// Note that processors are not supported yet (see constructor), /// Note that processors are not supported yet (see constructor),
/// hence it is not possible to override experimental_use_processors setting /// hence it is not possible to override experimental_use_processors setting
return std::make_unique<ClickHouseDictionarySource>(dict_struct, config, config_prefix + ".clickhouse", sample_block, context_local_copy); return std::make_unique<ClickHouseDictionarySource>(dict_struct, config, config_prefix + ".clickhouse", sample_block, context_local_copy);

View File

@ -0,0 +1,30 @@
<?xml version="1.0"?>
<!-- Server configuration used by the test instance. -->
<yandex>
<!-- Trace-level logging to files under /var/log/clickhouse-server/. -->
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
</logger>
<!-- Port 9000 on 127.0.0.1; the test_clickhouse dictionary source connects to localhost:9000. -->
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<!-- Client-side TLS: verification disabled and any certificate accepted. -->
<openSSL>
<client>
<cacheSessions>true</cacheSessions>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
<max_concurrent_queries>500</max_concurrent_queries>
<mark_cache_size>5368709120</mark_cache_size>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
<!-- Every XML file in config.d is treated as a dictionary definition. -->
<dictionaries_config>/etc/clickhouse-server/config.d/*.xml</dictionaries_config>
</yandex>

View File

@ -0,0 +1,48 @@
<!-- Dictionary "test_clickhouse": flat layout, loaded from table default.source
     through a ClickHouse source at localhost:9000 as user "default". -->
<yandex>
<dictionary>
<name>test_clickhouse</name>
<source>
<clickhouse>
<host>localhost</host>
<port>9000</port>
<user>default</user>
<password></password>
<db>default</db>
<table>source</table>
</clickhouse>
<!-- Per-source settings. max_result_bytes is set to 1 on purpose: the
     accompanying test expects the dictGetString call on this dictionary to
     fail with "Limit for result exceeded". -->
<settings>
<max_result_bytes>1</max_result_bytes>
</settings>
</source>
<lifetime>600</lifetime>
<layout>
<flat/>
</layout>
<!-- Key column "id" plus three String attributes with an empty null_value. -->
<structure>
<id>
<name>id</name>
</id>
<attribute>
<name>first</name>
<type>String</type>
<null_value></null_value>
</attribute>
<attribute>
<name>second</name>
<type>String</type>
<null_value></null_value>
</attribute>
<attribute>
<name>third</name>
<type>String</type>
<null_value></null_value>
</attribute>
</structure>
</dictionary>
</yandex>

View File

@ -0,0 +1,45 @@
<!-- Dictionary "test_executable": flat layout, loaded from the output of a
     command that prints source.csv, parsed as CSVWithNames. -->
<yandex>
<dictionary>
<name>test_executable</name>
<source>
<executable>
<command>cat /etc/clickhouse-server/config.d/source.csv</command>
<format>CSVWithNames</format>
</executable>
<!-- Per-source settings: both CSV quote characters are disabled. The
     accompanying test expects the leading ' and " in source.csv to come back
     as part of the attribute values. -->
<settings>
<format_csv_allow_single_quotes>0</format_csv_allow_single_quotes>
<format_csv_allow_double_quotes>0</format_csv_allow_double_quotes>
</settings>
</source>
<lifetime>600</lifetime>
<layout>
<flat/>
</layout>
<!-- Key column "id" plus three String attributes with an empty null_value. -->
<structure>
<id>
<name>id</name>
</id>
<attribute>
<name>first</name>
<type>String</type>
<null_value></null_value>
</attribute>
<attribute>
<name>second</name>
<type>String</type>
<null_value></null_value>
</attribute>
<attribute>
<name>third</name>
<type>String</type>
<null_value></null_value>
</attribute>
</structure>
</dictionary>
</yandex>

View File

@ -0,0 +1,45 @@
<!-- Dictionary "test_file": flat layout, loaded directly from source.csv on
     disk, parsed as CSVWithNames. -->
<yandex>
<dictionary>
<name>test_file</name>
<source>
<file>
<path>/etc/clickhouse-server/config.d/source.csv</path>
<format>CSVWithNames</format>
</file>
<!-- Per-source settings: both CSV quote characters are disabled. The
     accompanying test expects the leading ' and " in source.csv to come back
     as part of the attribute values. -->
<settings>
<format_csv_allow_single_quotes>0</format_csv_allow_single_quotes>
<format_csv_allow_double_quotes>0</format_csv_allow_double_quotes>
</settings>
</source>
<lifetime>600</lifetime>
<layout>
<flat/>
</layout>
<!-- Key column "id" plus three String attributes with an empty null_value. -->
<structure>
<id>
<name>id</name>
</id>
<attribute>
<name>first</name>
<type>String</type>
<null_value></null_value>
</attribute>
<attribute>
<name>second</name>
<type>String</type>
<null_value></null_value>
</attribute>
<attribute>
<name>third</name>
<type>String</type>
<null_value></null_value>
</attribute>
</structure>
</dictionary>
</yandex>

View File

@ -0,0 +1,54 @@
<!-- Dictionary "test_http": flat layout, loaded over HTTP from the helper
     server on localhost:5555, parsed as CSVWithNames. -->
<yandex>
<dictionary>
<name>test_http</name>
<source>
<http>
<url>http://localhost:5555/source.csv</url>
<format>CSVWithNames</format>
<!-- Basic-auth credentials and the api-key header match what check_auth in
     the helper HTTP server requires ("Basic Zm9vOmJhcg==" is base64 of
     foo:bar, api-key must equal "secret"). -->
<credentials>
<user>foo</user>
<password>bar</password>
</credentials>
<headers>
<header>
<name>api-key</name>
<value>secret</value>
</header>
</headers>
</http>
<!-- Per-source settings: both CSV quote characters are disabled. The
     accompanying test expects the leading ' and " in source.csv to come back
     as part of the attribute values. -->
<settings>
<format_csv_allow_single_quotes>0</format_csv_allow_single_quotes>
<format_csv_allow_double_quotes>0</format_csv_allow_double_quotes>
</settings>
</source>
<lifetime>600</lifetime>
<layout>
<flat/>
</layout>
<!-- Key column "id" plus three String attributes with an empty null_value. -->
<structure>
<id>
<name>id</name>
</id>
<attribute>
<name>first</name>
<type>String</type>
<null_value></null_value>
</attribute>
<attribute>
<name>second</name>
<type>String</type>
<null_value></null_value>
</attribute>
<attribute>
<name>third</name>
<type>String</type>
<null_value></null_value>
</attribute>
</structure>
</dictionary>
</yandex>

View File

@ -0,0 +1,3 @@
id,first,second,third
1,'a,"b,c
2,'d,"e,f
Can't render this file because it contains an unexpected character in line 3 and column 6.

View File

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<!-- Users configuration for the test instance: one empty "default" profile,
     one "default" user and one empty "default" quota. -->
<yandex>
<profiles>
<default>
</default>
</profiles>
<users>
<!-- The "default" user has an empty password; the dictionary sources in this
     test connect as this user. -->
<default>
<password></password>
<!-- NOTE(review): ::/0 presumably admits connections from any address; confirm. -->
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<quotas>
<default>
</default>
</quotas>
</yandex>

View File

@ -0,0 +1,86 @@
# -*- coding: utf-8 -*-
import argparse
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
import socket
import ssl
import csv
def check_auth(fn):
    """Decorator that guards an HTTP handler method with fixed credentials.

    Used to check that authentication works for external dictionaries that
    use an HTTP source. The wrapped handler ``fn`` is invoked only when the
    request carries both

    * ``authorization: Basic Zm9vOmJhcg==`` (base64 of ``foo:bar``), and
    * ``api-key: secret``.

    Any other request is answered with status 401 and ``fn`` is not called.
    The wrapper always returns ``None``.
    """
    def wrapper(req):
        auth_header = req.headers.get('authorization', None)
        api_key = req.headers.get('api-key', None)
        # A missing header yields None, which already differs from the
        # expected value, so no separate emptiness check is needed.
        if auth_header != 'Basic Zm9vOmJhcg==' or api_key != 'secret':
            req.send_response(401)
            # send_response() only emits the status line and buffered headers;
            # the header section must be closed explicitly, otherwise the
            # client never receives a complete response.
            req.end_headers()
        else:
            fn(req)
    return wrapper
def start_server(server_address, data_path, schema, cert_path, address_family):
    """Serve the contents of ``data_path`` over HTTP(S); never returns.

    server_address -- ``(host, port)`` tuple handed to the HTTP server.
    data_path      -- file that is re-opened and streamed back on every request.
    schema         -- ``"https"`` wraps the listening socket in TLS using
                      ``cert_path``; any other value serves plain HTTP.
    cert_path      -- certificate file, only used when ``schema == "https"``.
    address_family -- ``"ipv6"`` listens on an AF_INET6 socket; any other
                      value keeps the HTTPServer default.

    Both GET and POST are protected by ``check_auth``. The function ends in
    ``serve_forever()``.
    """
    class TSVHTTPHandler(BaseHTTPRequestHandler):
        @check_auth
        def do_GET(self):
            # GET returns every row of the data file.
            self.__send_headers()
            self.__send_data()

        @check_auth
        def do_POST(self):
            # POST body is a whitespace-separated list of ids; only rows whose
            # first field is one of them are returned.
            ids = self.__read_and_decode_post_ids()
            # Single pre-formatted argument: prints the same text as the
            # Python 2 statement `print "ids=", ids` and also parses on Python 3.
            print("ids= %s" % (ids,))
            self.__send_headers()
            self.__send_data(ids)

        def __send_headers(self):
            self.send_response(200)
            self.send_header('Content-type', 'text/csv')
            self.end_headers()

        def __send_data(self, only_ids = None):
            # Rows are parsed and re-emitted with TAB as the field separator.
            # NOTE(review): for a comma-separated data file row[0] is then the
            # whole line, so the only_ids filter would match nothing; confirm
            # whether POST filtering is meant to work for such files.
            with open(data_path, 'r') as fl:
                reader = csv.reader(fl, delimiter='\t')
                for row in reader:
                    if not only_ids or (row[0] in only_ids):
                        self.wfile.write('\t'.join(row) + '\n')

        def __read_and_decode_post_ids(self):
            data = self.__read_and_decode_post_data()
            return filter(None, data.split())

        def __read_and_decode_post_data(self):
            transfer_encoding = self.headers.get("Transfer-encoding")
            decoded = ""
            if transfer_encoding == "chunked":
                # Each chunk is a hexadecimal length line, the payload, and a
                # line terminator; a zero length ends the body.
                while True:
                    s = self.rfile.readline()
                    chunk_length = int(s, 16)
                    if not chunk_length:
                        break
                    decoded += self.rfile.read(chunk_length)
                    self.rfile.readline()
            else:
                content_length = int(self.headers.get("Content-Length", 0))
                decoded = self.rfile.read(content_length)
            return decoded

    # Select the address family on a private subclass instead of assigning to
    # HTTPServer.address_family, which would change the stdlib class for every
    # other user in the process.
    server_class = HTTPServer
    if address_family == "ipv6":
        class IPv6HTTPServer(HTTPServer):
            address_family = socket.AF_INET6
        server_class = IPv6HTTPServer

    httpd = server_class(server_address, TSVHTTPHandler)
    if schema == "https":
        httpd.socket = ssl.wrap_socket(httpd.socket, certfile=cert_path, server_side=True)
    httpd.serve_forever()
if __name__ == "__main__":
    # Command-line entry point: declare the options in a table, parse them and
    # hand the values to start_server().
    cli = argparse.ArgumentParser(description="Simple HTTP server returns data from file")
    option_table = (
        ("--host", dict(default="localhost")),
        ("--port", dict(default=5555, type=int)),
        ("--data-path", dict(required=True)),
        ("--schema", dict(choices=("http", "https"), required=True)),
        ("--cert-path", dict(default="./fake_cert.pem")),
        ("--address-family", dict(choices=("ipv4", "ipv6"), default="ipv4")),
    )
    for flag, spec in option_table:
        cli.add_argument(flag, **spec)
    opts = cli.parse_args()
    listen_on = (opts.host, opts.port)
    start_server(listen_on, opts.data_path, opts.schema, opts.cert_path, opts.address_family)

View File

@ -0,0 +1,62 @@
import os
import pytest
from helpers.cluster import ClickHouseCluster
# Directory containing this test module (symlinks resolved).
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
# Config directory passed to the cluster helper, both as base_configs_dir and
# as the instance's config_dir.
config_dir = os.path.join(SCRIPT_DIR, './configs')
# Files passed to the instance as main_configs: one dictionary definition per
# source type, plus the CSV data file.
# NOTE(review): the dictionary configs read the CSV from
# /etc/clickhouse-server/config.d/source.csv, so main_configs files presumably
# end up in that directory; confirm against helpers.cluster.
DICTIONARY_FILES = [
'configs/dictionaries/FileSourceConfig.xml',
'configs/dictionaries/ExecutableSourceConfig.xml',
'configs/dictionaries/source.csv',
'configs/dictionaries/HTTPSourceConfig.xml',
'configs/dictionaries/ClickHouseSourceConfig.xml'
]
# Single-node cluster; `instance` is the node every query in this module runs on.
cluster = ClickHouseCluster(__file__, base_configs_dir=config_dir)
instance = cluster.add_instance('node', main_configs=DICTIONARY_FILES, config_dir=config_dir)
def prepare():
    """Stage and launch the helper HTTP server inside the test container.

    Copies ``http_server.py`` and ``source.csv`` into the container, then
    starts the server detached on localhost:5555 so that the ``test_http``
    dictionary can load its data from it.
    """
    node = instance
    path = "/source.csv"
    script_dir = os.path.dirname(os.path.realpath(__file__))
    node.copy_file_to_container(os.path.join(script_dir, './http_server.py'), '/http_server.py')
    # Copy the data file to exactly the path handed to --data-path below; a
    # relative destination would only match it if the container's working
    # directory happened to be "/".
    node.copy_file_to_container(os.path.join(script_dir, 'configs/dictionaries/source.csv'), path)
    node.exec_in_container([
        "bash",
        "-c",
        "python2 /http_server.py --data-path={tbl} --schema=http --host=localhost --port=5555".format(
            tbl=path)
    ], detach=True)
@pytest.fixture(scope="module")
def start_cluster():
    """Module-scoped fixture that yields the running cluster.

    Starts the cluster, runs ``prepare()`` to bring up the helper HTTP server,
    and hands the cluster to the tests. ``cluster.shutdown()`` runs in the
    ``finally`` clause, so it is called whether start-up, preparation or the
    tests themselves fail.
    """
    try:
        cluster.start()
        prepare()
        yield cluster
    finally:
        cluster.shutdown()
def test_work(start_cluster):
    """Check that per-source <settings> are applied for every dictionary source.

    For the file, executable and HTTP sources the CSV quote characters are
    disabled, so the leading ' and " from source.csv must come back as part
    of the attribute values. For the ClickHouse source max_result_bytes is 1,
    so reading the dictionary must fail with "Limit for result exceeded".
    """
    run = instance.query

    # File source.
    assert run("SELECT dictGetString('test_file', 'first', toUInt64(1))") == "\\\'a\n"
    assert run("SELECT dictGetString('test_file', 'second', toUInt64(1))") == "\"b\n"

    # Executable source.
    assert run("SELECT dictGetString('test_executable', 'first', toUInt64(1))") == "\\\'a\n"
    assert run("SELECT dictGetString('test_executable', 'second', toUInt64(1))") == "\"b\n"

    # ClickHouse source: create and fill the backing table, then query the
    # dictionary. Any exception raised along the way is captured as text.
    error_text = ''
    statements = (
        "CREATE TABLE source (id UInt64, first String, second String, third String) ENGINE=File(CSVWithNames);",
        "INSERT INTO default.source VALUES (1, 'aaa', 'bbb', 'cccc'), (2, 'ddd', 'eee', 'fff')",
        "SELECT dictGetString('test_clickhouse', 'second', toUInt64(1))",
    )
    try:
        for statement in statements:
            instance.query(statement)
    except Exception as e:
        error_text = str(e)
    assert "Limit for result exceeded" in error_text

    # HTTP source.
    assert run("SELECT dictGetString('test_http', 'first', toUInt64(1))") == "\\\'a\n"
    assert run("SELECT dictGetString('test_http', 'second', toUInt64(1))") == "\"b\n"