From 010a7e00ee6fc2c5e8a6a6a2acf57ef27a84f3d9 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 21 Feb 2019 15:04:08 +0300 Subject: [PATCH 01/13] Add mysql dictionaries tests --- dbms/tests/integration/helpers/cluster.py | 24 +- .../helpers/docker_compose_mongo.yml | 10 + .../test_external_dictionaries/__init__.py | 0 .../configs/config.xml | 30 +++ .../configs/users.xml | 23 ++ .../test_external_dictionaries/dictionary.py | 224 ++++++++++++++++++ .../external_sources.py | 156 ++++++++++++ .../test_external_dictionaries/test.py | 204 ++++++++++++++++ 8 files changed, 663 insertions(+), 8 deletions(-) create mode 100644 dbms/tests/integration/helpers/docker_compose_mongo.yml create mode 100644 dbms/tests/integration/test_external_dictionaries/__init__.py create mode 100644 dbms/tests/integration/test_external_dictionaries/configs/config.xml create mode 100644 dbms/tests/integration/test_external_dictionaries/configs/users.xml create mode 100644 dbms/tests/integration/test_external_dictionaries/dictionary.py create mode 100644 dbms/tests/integration/test_external_dictionaries/external_sources.py create mode 100644 dbms/tests/integration/test_external_dictionaries/test.py diff --git a/dbms/tests/integration/helpers/cluster.py b/dbms/tests/integration/helpers/cluster.py index df8c066afa5..9924de0f504 100644 --- a/dbms/tests/integration/helpers/cluster.py +++ b/dbms/tests/integration/helpers/cluster.py @@ -98,6 +98,7 @@ class ClickHouseCluster: self.with_kafka = False self.with_odbc_drivers = False self.with_hdfs = False + self.with_mongo = False self.docker_client = None self.is_up = False @@ -109,7 +110,7 @@ class ClickHouseCluster: cmd += " client" return cmd - def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, with_postgres=False, with_hdfs=False, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test", stay_alive=False, ipv4_address=None, ipv6_address=None): + def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test", stay_alive=False, ipv4_address=None, ipv6_address=None): """Add an instance to the cluster. name - the name of the instance directory and the value of the 'instance' macro in ClickHouse. @@ -127,7 +128,7 @@ class ClickHouseCluster: instance = ClickHouseInstance( self, self.base_dir, name, config_dir, main_configs, user_configs, macros, with_zookeeper, - self.zookeeper_config_path, with_mysql, with_kafka, self.base_configs_dir, self.server_bin_path, + self.zookeeper_config_path, with_mysql, with_kafka, with_mongo, self.base_configs_dir, self.server_bin_path, self.odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, hostname=hostname, env_variables=env_variables, image=image, stay_alive=stay_alive, ipv4_address=ipv4_address, ipv6_address=ipv6_address) @@ -176,6 +177,11 @@ class ClickHouseCluster: self.base_hdfs_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_hdfs.yml')] + if with_mongo and not self.with_mongo: + self.with_mongo = True + self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mongo.yml')]) + self.base_mongo_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', + self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mongo.yml')] return instance @@ -290,6 +296,10 @@ class ClickHouseCluster: subprocess_check_call(self.base_hdfs_cmd + ['up', '-d', '--force-recreate']) self.wait_hdfs_to_start(120) + if self.with_mongo and self.base_mongo_cmd: + subprocess_check_call(self.base_mongo_cmd + ['up', '-d', '--force-recreate']) + time.sleep(10) + subprocess_check_call(self.base_cmd + ['up', '-d', '--no-recreate']) start_deadline = time.time() + 20.0 # seconds @@ -361,11 +371,8 @@ services: cap_add: - SYS_PTRACE depends_on: {depends_on} - user: '{user}' env_file: - {env_file} - security_opt: - - label:disable {networks} {app_net} {ipv4_address} @@ -388,8 +395,9 @@ class ClickHouseInstance: def __init__( self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macros, - with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, base_configs_dir, server_bin_path, odbc_bridge_bin_path, - clickhouse_path_dir, with_odbc_drivers, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test", + with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_mongo, base_configs_dir, + server_bin_path, odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, hostname=None, + env_variables={}, image="yandex/clickhouse-integration-test", stay_alive=False, ipv4_address=None, ipv6_address=None): self.name = name @@ -412,6 +420,7 @@ class ClickHouseInstance: self.with_mysql = with_mysql self.with_kafka = with_kafka + self.with_mongo = with_mongo self.path = p.join(self.cluster.instances_dir, name) self.docker_compose_path = p.join(self.path, 'docker_compose.yml') @@ -672,7 +681,6 @@ class ClickHouseInstance: db_dir=db_dir, logs_dir=logs_dir, depends_on=str(depends_on), - user=os.getuid(), env_file=env_file, odbc_ini_path=odbc_ini_path, entrypoint_cmd=entrypoint_cmd, diff --git a/dbms/tests/integration/helpers/docker_compose_mongo.yml b/dbms/tests/integration/helpers/docker_compose_mongo.yml new file mode 100644 index 00000000000..f9d3b15ba0a --- /dev/null +++ b/dbms/tests/integration/helpers/docker_compose_mongo.yml @@ -0,0 +1,10 @@ +version: '2.2' +services: + mongo1: + image: mongo + restart: always + environment: + MONGO_INITDB_ROOT_USERNAME: root + MONGO_INITDB_ROOT_PASSWORD: clickhouse + ports: + - 27018:27017 diff --git a/dbms/tests/integration/test_external_dictionaries/__init__.py b/dbms/tests/integration/test_external_dictionaries/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/integration/test_external_dictionaries/configs/config.xml b/dbms/tests/integration/test_external_dictionaries/configs/config.xml new file mode 100644 index 00000000000..c102ff1ed91 --- /dev/null +++ b/dbms/tests/integration/test_external_dictionaries/configs/config.xml @@ -0,0 +1,30 @@ + + + + trace + /var/log/clickhouse-server/clickhouse-server.log + /var/log/clickhouse-server/clickhouse-server.err.log + 1000M + 10 + + + 9000 + 127.0.0.1 + + + + true + none + + AcceptCertificateHandler + + + + + 500 + 5368709120 + ./clickhouse/ + users.xml + + /etc/clickhouse-server/config.d/*.xml + diff --git a/dbms/tests/integration/test_external_dictionaries/configs/users.xml b/dbms/tests/integration/test_external_dictionaries/configs/users.xml new file mode 100644 index 00000000000..60112f99a18 --- /dev/null +++ b/dbms/tests/integration/test_external_dictionaries/configs/users.xml @@ -0,0 +1,23 @@ + + + + + + + + + + + + ::/0 + + default + default + + + + + + + + diff --git a/dbms/tests/integration/test_external_dictionaries/dictionary.py b/dbms/tests/integration/test_external_dictionaries/dictionary.py new file mode 100644 index 00000000000..41c5d0bcf8e --- /dev/null +++ b/dbms/tests/integration/test_external_dictionaries/dictionary.py @@ -0,0 +1,224 @@ +#-*- coding: utf-8 -*- + + +class Layout(object): + LAYOUTS_STR_DICT = { + 'flat': '', + 'hashed': '', + 'cache': '128', + 'complex_key_hashed': '', + 'complex_key_cache': '128', + 'range_hashed': '' + } + + def __init__(self, name): + self.name = name + self.is_complex = False + self.is_simple = False + self.is_ranged = False + if self.name.startswith('complex'): + self.layout_type = "complex" + self.is_complex = True + elif name.startswith("range"): + self.layout_type = "ranged" + self.is_ranged = True + else: + self.layout_type = "simple" + self.is_simple = True + + def get_str(self): + return self.LAYOUTS_STR_DICT[self.name] + + def get_key_block_name(self): + if self.is_complex: + return 'key' + else: + return 'id' + + +class Row(object): + def __init__(self, fields, values): + self.data = {} + for field, value in zip(fields, values): + self.data[field.name] = value + + def get_value_by_name(self, name): + return self.data[name] + + +class Field(object): + def __init__(self, name, field_type, is_key=False, is_range_key=False, default=None, hierarchical=False, range_hash_type=None): + self.name = name + self.field_type = field_type + self.is_key = is_key + self.default = default + self.hierarchical = hierarchical + self.range_hash_type = range_hash_type + self.is_range = self.range_hash_type is not None + self.is_range_key = is_range_key + + def get_attribute_str(self): + return ''' + + {name} + {field_type} + {default} + {hierarchical} + '''.format( + name=self.name, + field_type=self.field_type, + default=self.default if self.default else '', + hierarchical='true' if self.hierarchical else 'false', + ) + + def get_simple_index_str(self): + return '{name}'.format(name=self.name) + + def get_range_hash_str(self): + if not self.range_hash_type: + raise Exception("Field {} is not range hashed".format(self.name)) + return ''' + + {name} + + '''.format(type=self.range_hash_type, name=self.name) + + +class DictionaryStructure(object): + def __init__(self, layout, fields): + self.layout = layout + self.keys = [] + self.range_key = None + self.ordinary_fields = [] + self.range_fields = [] + for field in fields: + if field.is_key: + self.keys.append(field) + elif field.is_range: + self.range_fields.append(field) + else: + self.ordinary_fields.append(field) + + if field.is_range_key: + if self.range_key is not None: + raise Exception("Duplicate range key {}".format(field.name)) + self.range_key = field + + if not self.layout.is_complex and len(self.keys) > 1: + raise Exception("More than one key {} field in non complex layout {}".format(len(self.keys), self.layout.name)) + + if self.layout.is_ranged and (not self.range_key or len(self.range_fields) != 2): + raise Exception("Inconsistent configuration of ranged dictionary") + + def get_structure_str(self): + fields_strs = [] + for field in self.ordinary_fields: + fields_strs.append(field.get_attribute_str()) + key_strs = [] + if self.layout.is_complex: + for key_field in self.keys: + key_strs.append(key_field.get_attribute_str()) + else: # same for simple and ranged + for key_field in self.keys: + key_strs.append(key_field.get_simple_index_str()) + + ranged_strs = [] + if self.layout.is_ranged: + for range_field in self.range_fields: + ranged_strs.append(range_field.get_range_hash_str()) + + return ''' + + {layout_str} + + + <{key_block_name}> + {key_str} + + {attributes_str} + {range_strs} + '''.format( + layout_str=self.layout.get_str(), + key_block_name=self.layout.get_key_block_name(), + key_str='\n'.join(key_strs), + attributes_str='\n'.join(fields_strs), + range_strs='\n'.join(ranged_strs), + ) + + def get_dict_get_expression(self, dict_name, field, row): + if field in self.keys: + raise Exception("Trying to receive key field {} from dictionary".format(field.name)) + + if not self.layout.is_complex: + key_expr = 'toUInt64({})'.format(row.data[self.keys[0].name]) + else: + key_exprs_strs = [] + for key in self.keys: + val = row.data[key.name] + if isinstance(val, str): + val = "'" + val + "'" + key_exprs_strs.append('to{type}({value})'.format(type=key.field_type, value=val)) + key_expr = '(' + ','.join(key_exprs_strs) + ')' + + date_expr = '' + if self.layout.is_ranged: + val = row.data[self.range_key.name] + if isinstance(val, str): + val = "'" + val + "'" + val = "to{type}({val})".format(type=self.range_key.field_type, val=val) + + date_expr = ', ' + val + + return "dictGet{field_type}('{dict_name}', '{field_name}', {key_expr}{date_expr})".format( + field_type=field.field_type, + dict_name=dict_name, + field_name=field.name, + key_expr=key_expr, + date_expr=date_expr, + ) + + +class Dictionary(object): + def __init__(self, name, structure, source, config_path, table_name): + self.name = name + self.structure = structure + self.source = source + self.config_path = config_path + self.table_name = table_name + + def generate_config(self): + with open(self.config_path, 'w') as result: + result.write(''' + + + + 3 + 5 + + {name} + {structure} + + {source} + + + + '''.format( + name=self.name, + structure=self.structure.get_structure_str(), + source=self.source.get_source_str(self.table_name), + )) + + def prepare_source(self): + self.source.prepare(self.structure, self.table_name) + + def load_data(self, data): + if not self.source.prepared: + raise Exception("Cannot load data for dictionary {}, source is not prepared".format(self.name)) + + self.source.load_data(data, self.table_name) + + def get_select_query(self, field, row): + return 'select {}'.format(self.structure.get_dict_get_expression(self.name, field, row)) + + def is_complex(self): + return self.structure.layout.is_complex diff --git a/dbms/tests/integration/test_external_dictionaries/external_sources.py b/dbms/tests/integration/test_external_dictionaries/external_sources.py new file mode 100644 index 00000000000..7e3ecfa5007 --- /dev/null +++ b/dbms/tests/integration/test_external_dictionaries/external_sources.py @@ -0,0 +1,156 @@ +# -*- coding: utf-8 -*- +import warnings +import pymysql.cursors +import pymongo + +class ExternalSource(object): + def __init__(self, name, internal_hostname, internal_port, + docker_hostname, docker_port, user, password): + self.name = name + self.internal_hostname = internal_hostname + self.internal_port = int(internal_port) + self.docker_hostname = docker_hostname + self.docker_port = int(docker_port) + self.user = user + self.password = password + + def get_source_str(self): + raise NotImplementedError("Method {} is not implemented for {}".format( + "get_source_config_part", self.__class__.__name__)) + + def prepare(self, structure): + raise NotImplementedError("Method {} is not implemented for {}".format( + "prepare_remote_source", self.__class__.__name__)) + + # data is banch of Row + def load_data(self, data): + raise NotImplementedError("Method {} is not implemented for {}".format( + "prepare_remote_source", self.__class__.__name__)) + +class SourceMySQL(ExternalSource): + TYPE_MAPPING = { + 'UInt8': 'tinyint unsigned', + 'UInt16': 'smallint unsigned', + 'UInt32': 'int unsigned', + 'UInt64': 'bigint unsigned', + 'Int8': 'tinyint', + 'Int16': 'smallint', + 'Int32': 'int', + 'Int64': 'bigint', + 'UUID': 'varchar(36)', + 'Date': 'date', + 'DateTime': 'datetime', + 'String': 'text', + 'Float32': 'float', + 'Float64': 'double' + } + def create_mysql_conn(self): + self.connection = pymysql.connect( + user=self.user, + password=self.password, + host=self.internal_hostname, + port=self.internal_port) + + def execute_mysql_query(self, query): + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + with self.connection.cursor() as cursor: + cursor.execute(query) + self.connection.commit() + + def get_source_str(self, table_name): + return ''' + + + 1 + 127.0.0.1 + 3333 + + + 2 + {hostname} + {port} + + {user} + {password} + test + {tbl}
+
'''.format( + hostname=self.docker_hostname, + port=self.docker_port, + user=self.user, + password=self.password, + tbl=table_name, + ) + + def prepare(self, structure, table_name): + self.create_mysql_conn() + self.execute_mysql_query("create database if not exists test default character set 'utf8'") + fields_strs = [] + for field in structure.keys + structure.ordinary_fields + structure.range_fields: + fields_strs.append(field.name + ' ' + self.TYPE_MAPPING[field.field_type]) + create_query = '''create table test.{table_name} ( + {fields_str}); + '''.format(table_name=table_name, fields_str=','.join(fields_strs)) + self.execute_mysql_query(create_query) + self.prepared = True + + def load_data(self, data, table_name): + values_strs = [] + if not data: + return + ordered_names = [name for name in data[0].data] + for row in data: + sorted_row = [] + for name in ordered_names: + data = row.data[name] + if isinstance(row.data[name], str): + data = "'" + data + "'" + else: + data = str(data) + sorted_row.append(data) + values_strs.append('(' + ','.join(sorted_row) + ')') + query = 'insert into test.{} ({}) values {}'.format( + table_name, + ','.join(ordered_names), + ''.join(values_strs)) + self.execute_mysql_query(query) + + +class SourceMongo(ExternalSource): + + def get_source_str(self, table_name): + return ''' + + {host} + {port} + {user} + {password} + test + {tbl} + + '''.format( + host=self.docker_hostname, + port=self.docker_port, + user=self.user, + password=self.password, + tbl=table_name, + ) + + def prepare(self, structure, table_name): + connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format( + host=self.internal_hostname, port=self.internal_port, + user=self.user, password=self.password) + self.connection = pymongo.MongoClient(connection_str) + self.connection.create + self.structure = structure + self.db = self.connection['test'] + self.prepared = True + + def load_data(self, data, table_name): + tbl = self.db[table_name] + to_insert = [dict(row.data) for row in data] + result = tbl.insert_many(to_insert) + print "IDS:", result.inserted_ids + for r in tbl.find(): + print "RESULT:", r diff --git a/dbms/tests/integration/test_external_dictionaries/test.py b/dbms/tests/integration/test_external_dictionaries/test.py new file mode 100644 index 00000000000..ac4ce47f63d --- /dev/null +++ b/dbms/tests/integration/test_external_dictionaries/test.py @@ -0,0 +1,204 @@ +import pytest +import os +import time + +from helpers.cluster import ClickHouseCluster +from dictionary import Field, Row, Dictionary, DictionaryStructure, Layout +from external_sources import SourceMySQL, SourceMongo + +SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) + +FIELDS = { + "simple": [ + Field("KeyField", 'UInt64', is_key=True), + Field("UInt8_", 'UInt8'), + Field("UInt16_", 'UInt16'), + Field("UInt32_", 'UInt32'), + Field("UInt64_", 'UInt64'), + Field("Int8_", 'Int8'), + Field("Int16_", 'Int16'), + Field("Int32_", 'Int32'), + Field("Int64_", 'Int64'), + Field("UUID_", 'UUID'), + Field("Date_", 'Date'), + Field("DateTime_", 'DateTime'), + Field("String_", 'String'), + Field("Float32_", 'Float32'), + Field("Float64_", 'Float64'), + ], + "complex": [ + Field("KeyField1", 'UInt64', is_key=True), + Field("KeyField2", 'String', is_key=True), + Field("UInt8_", 'UInt8'), + Field("UInt16_", 'UInt16'), + Field("UInt32_", 'UInt32'), + Field("UInt64_", 'UInt64'), + Field("Int8_", 'Int8'), + Field("Int16_", 'Int16'), + Field("Int32_", 'Int32'), + Field("Int64_", 'Int64'), + Field("UUID_", 'UUID'), + Field("Date_", 'Date'), + Field("DateTime_", 'DateTime'), + Field("String_", 'String'), + Field("Float32_", 'Float32'), + Field("Float64_", 'Float64'), + ], + "ranged": [ + Field("KeyField1", 'UInt64', is_key=True), + Field("KeyField2", 'Date', is_range_key=True), + Field("StartDate", 'Date', range_hash_type='min'), + Field("EndDate", 'Date', range_hash_type='max'), + Field("UInt8_", 'UInt8'), + Field("UInt16_", 'UInt16'), + Field("UInt32_", 'UInt32'), + Field("UInt64_", 'UInt64'), + Field("Int8_", 'Int8'), + Field("Int16_", 'Int16'), + Field("Int32_", 'Int32'), + Field("Int64_", 'Int64'), + Field("UUID_", 'UUID'), + Field("Date_", 'Date'), + Field("DateTime_", 'DateTime'), + Field("String_", 'String'), + Field("Float32_", 'Float32'), + Field("Float64_", 'Float64'), + ] + +} + +LAYOUTS = [ + Layout("cache"), + Layout("hashed"), + Layout("flat"), + Layout("complex_key_hashed"), + Layout("complex_key_cache"), + Layout("range_hashed") +] + +SOURCES = [ + #SourceMongo("MongoDB", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"), + SourceMySQL("MySQL", "localhost", "3308", "mysql1", "3306", "root", "clickhouse"), +] + +DICTIONARIES = [] + +cluster = None +node = None + +def setup_module(module): + global DICTIONARIES + global cluster + global node + + dict_configs_path = os.path.join(SCRIPT_DIR, 'configs/dictionaries') + for f in os.listdir(dict_configs_path): + os.remove(os.path.join(dict_configs_path, f)) + + for layout in LAYOUTS: + for source in SOURCES: + structure = DictionaryStructure(layout, FIELDS[layout.layout_type]) + dict_name = source.name + "_" + layout.name + dict_path = os.path.join(dict_configs_path, dict_name + '.xml') + dictionary = Dictionary(dict_name, structure, source, dict_path, "table_" + dict_name) + print dict_name + dictionary.generate_config() + DICTIONARIES.append(dictionary) + + main_configs = [] + for fname in os.listdir(dict_configs_path): + main_configs.append(os.path.join(dict_configs_path, fname)) + cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs')) + node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True, with_mongo=True) + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + for dictionary in DICTIONARIES: + print "Preparing", dictionary.name + dictionary.prepare_source() + print "Prepared" + + yield cluster + + finally: + pass + cluster.shutdown() + + +def test_simple_dictionaries(started_cluster): + fields = FIELDS["simple"] + data = [ + Row(fields, [1, 22, 333, 4444, 55555, -6, -77, + -888, -999, '550e8400-e29b-41d4-a716-446655440003', + '1973-06-28', '1985-02-28 23:43:25', 'hello', 22.543, 3332154213.4]), + ] + + simple_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "simple"] + for dct in simple_dicts: + dct.load_data(data) + + node.query("system reload dictionaries") + queries_with_answers = [] + for dct in simple_dicts: + for row in data: + for field in fields: + if not field.is_key: + queries_with_answers.append((dct.get_select_query(field, row), row.get_value_by_name(field.name))) + + for query, answer in queries_with_answers: + print query + assert node.query(query) == str(answer) + '\n' + +def test_complex_dictionaries(started_cluster): + fields = FIELDS["complex"] + data = [ + Row(fields, [1, 'world', 22, 333, 4444, 55555, -6, + -77, -888, -999, '550e8400-e29b-41d4-a716-446655440003', + '1973-06-28', '1985-02-28 23:43:25', + 'hello', 22.543, 3332154213.4]), + ] + + complex_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "complex"] + for dct in complex_dicts: + dct.load_data(data) + + node.query("system reload dictionaries") + queries_with_answers = [] + for dct in complex_dicts: + for row in data: + for field in fields: + if not field.is_key: + queries_with_answers.append((dct.get_select_query(field, row), row.get_value_by_name(field.name))) + + for query, answer in queries_with_answers: + print query + assert node.query(query) == str(answer) + '\n' + +def test_ranged_dictionaries(started_cluster): + fields = FIELDS["ranged"] + data = [ + Row(fields, [1, '2019-02-10', '2019-02-01', '2019-02-28', + 22, 333, 4444, 55555, -6, -77, -888, -999, + '550e8400-e29b-41d4-a716-446655440003', + '1973-06-28', '1985-02-28 23:43:25', 'hello', + 22.543, 3332154213.4]), + ] + + ranged_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "ranged"] + for dct in ranged_dicts: + dct.load_data(data) + + node.query("system reload dictionaries") + + queries_with_answers = [] + for dct in ranged_dicts: + for row in data: + for field in fields: + if not field.is_key and not field.is_range: + queries_with_answers.append((dct.get_select_query(field, row), row.get_value_by_name(field.name))) + + for query, answer in queries_with_answers: + print query + assert node.query(query) == str(answer) + '\n' From 70fdbca7479af4501d2eb1542d946c405bad97e6 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 21 Feb 2019 19:43:21 +0300 Subject: [PATCH 02/13] More tests --- .../test_external_dictionaries/dictionary.py | 23 ++- .../external_sources.py | 167 ++++++++++++++++-- .../test_external_dictionaries/test.py | 28 +-- 3 files changed, 193 insertions(+), 25 deletions(-) diff --git a/dbms/tests/integration/test_external_dictionaries/dictionary.py b/dbms/tests/integration/test_external_dictionaries/dictionary.py index 41c5d0bcf8e..86014c0bd40 100644 --- a/dbms/tests/integration/test_external_dictionaries/dictionary.py +++ b/dbms/tests/integration/test_external_dictionaries/dictionary.py @@ -1,4 +1,5 @@ #-*- coding: utf-8 -*- +import copy class Layout(object): @@ -135,8 +136,8 @@ class DictionaryStructure(object): <{key_block_name}> {key_str} - {attributes_str} {range_strs} + {attributes_str} '''.format( layout_str=self.layout.get_str(), key_block_name=self.layout.get_key_block_name(), @@ -145,6 +146,18 @@ class DictionaryStructure(object): range_strs='\n'.join(ranged_strs), ) + def get_ordered_names(self): + fields_strs = [] + for key_field in self.keys: + fields_strs.append(key_field.name) + for range_field in self.range_fields: + fields_strs.append(range_field.name) + for field in self.ordinary_fields: + fields_strs.append(field.name) + return fields_strs + + + def get_dict_get_expression(self, dict_name, field, row): if field in self.keys: raise Exception("Trying to receive key field {} from dictionary".format(field.name)) @@ -181,8 +194,8 @@ class DictionaryStructure(object): class Dictionary(object): def __init__(self, name, structure, source, config_path, table_name): self.name = name - self.structure = structure - self.source = source + self.structure = copy.deepcopy(structure) + self.source = copy.deepcopy(source) self.config_path = config_path self.table_name = table_name @@ -208,8 +221,8 @@ class Dictionary(object): source=self.source.get_source_str(self.table_name), )) - def prepare_source(self): - self.source.prepare(self.structure, self.table_name) + def prepare_source(self, cluster): + self.source.prepare(self.structure, self.table_name, cluster) def load_data(self, data): if not self.source.prepared: diff --git a/dbms/tests/integration/test_external_dictionaries/external_sources.py b/dbms/tests/integration/test_external_dictionaries/external_sources.py index 7e3ecfa5007..5a7f2536d2c 100644 --- a/dbms/tests/integration/test_external_dictionaries/external_sources.py +++ b/dbms/tests/integration/test_external_dictionaries/external_sources.py @@ -2,6 +2,8 @@ import warnings import pymysql.cursors import pymongo +import subprocess +import copy class ExternalSource(object): def __init__(self, name, internal_hostname, internal_port, @@ -14,11 +16,11 @@ class ExternalSource(object): self.user = user self.password = password - def get_source_str(self): + def get_source_str(self, table_name): raise NotImplementedError("Method {} is not implemented for {}".format( "get_source_config_part", self.__class__.__name__)) - def prepare(self, structure): + def prepare(self, structure, table_name, cluster): raise NotImplementedError("Method {} is not implemented for {}".format( "prepare_remote_source", self.__class__.__name__)) @@ -27,6 +29,10 @@ class ExternalSource(object): raise NotImplementedError("Method {} is not implemented for {}".format( "prepare_remote_source", self.__class__.__name__)) + def compatible_with_layout(self, layout): + return True + + class SourceMySQL(ExternalSource): TYPE_MAPPING = { 'UInt8': 'tinyint unsigned', @@ -83,7 +89,7 @@ class SourceMySQL(ExternalSource): tbl=table_name, ) - def prepare(self, structure, table_name): + def prepare(self, structure, table_name, cluster): self.create_mysql_conn() self.execute_mysql_query("create database if not exists test default character set 'utf8'") fields_strs = [] @@ -93,16 +99,16 @@ class SourceMySQL(ExternalSource): {fields_str}); '''.format(table_name=table_name, fields_str=','.join(fields_strs)) self.execute_mysql_query(create_query) + self.ordered_names = structure.get_ordered_names() self.prepared = True def load_data(self, data, table_name): values_strs = [] if not data: return - ordered_names = [name for name in data[0].data] for row in data: sorted_row = [] - for name in ordered_names: + for name in self.ordered_names: data = row.data[name] if isinstance(row.data[name], str): data = "'" + data + "'" @@ -112,7 +118,7 @@ class SourceMySQL(ExternalSource): values_strs.append('(' + ','.join(sorted_row) + ')') query = 'insert into test.{} ({}) values {}'.format( table_name, - ','.join(ordered_names), + ','.join(self.ordered_names), ''.join(values_strs)) self.execute_mysql_query(query) @@ -137,7 +143,7 @@ class SourceMongo(ExternalSource): tbl=table_name, ) - def prepare(self, structure, table_name): + def prepare(self, structure, table_name, cluster): connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format( host=self.internal_hostname, port=self.internal_port, user=self.user, password=self.password) @@ -151,6 +157,147 @@ class SourceMongo(ExternalSource): tbl = self.db[table_name] to_insert = [dict(row.data) for row in data] result = tbl.insert_many(to_insert) - print "IDS:", result.inserted_ids - for r in tbl.find(): - print "RESULT:", r + +class SourceClickHouse(ExternalSource): + + def get_source_str(self, table_name): + return ''' + + {host} + {port} + {user} + {password} + test + {tbl}
+
+ '''.format( + host=self.docker_hostname, + port=self.docker_port, + user=self.user, + password=self.password, + tbl=table_name, + ) + + def prepare(self, structure, table_name, cluster): + self.node = cluster.instances[self.docker_hostname] + self.node.query("CREATE DATABASE IF NOT EXISTS test") + fields_strs = [] + for field in structure.keys + structure.ordinary_fields + structure.range_fields: + fields_strs.append(field.name + ' ' + field.field_type) + create_query = '''CREATE TABLE test.{table_name} ( + {fields_str}) ENGINE MergeTree ORDER BY tuple(); + '''.format(table_name=table_name, fields_str=','.join(fields_strs)) + self.node.query(create_query) + self.ordered_names = structure.get_ordered_names() + self.prepared = True + + def load_data(self, data, table_name): + values_strs = [] + if not data: + return + for row in data: + sorted_row = [] + for name in self.ordered_names: + row_data = row.data[name] + if isinstance(row_data, str): + row_data = "'" + row_data + "'" + else: + row_data = str(row_data) + sorted_row.append(row_data) + values_strs.append('(' + ','.join(sorted_row) + ')') + query = 'INSERT INTO test.{} ({}) values {}'.format( + table_name, + ','.join(self.ordered_names), + ''.join(values_strs)) + self.node.query(query) + + +class SourceFile(ExternalSource): + + def get_source_str(self, table_name): + table_path = "/" + table_name + ".tsv" + return ''' + + {path} + TabSeparated + + '''.format( + path=table_path, + ) + + def prepare(self, structure, table_name, cluster): + self.node = cluster.instances[self.docker_hostname] + path = "/" + table_name + ".tsv" + self.node.exec_in_container(["bash", "-c", "touch {}".format(path)]) + self.ordered_names = structure.get_ordered_names() + self.prepared = True + + def load_data(self, data, table_name): + if not data: + return + path = "/" + table_name + ".tsv" + for row in list(data): + sorted_row = [] + for name in self.ordered_names: + sorted_row.append(str(row.data[name])) + + str_data = '\t'.join(sorted_row) + self.node.exec_in_container(["bash", "-c", "echo \"{row}\" >> {fname}".format(row=str_data, fname=path)]) + + def compatible_with_layout(self, layout): + return 'cache' not in layout.name + + +class _SourceExecutableBase(ExternalSource): + + def _get_cmd(self, path): + raise NotImplementedError("Method {} is not implemented for {}".format( + "_get_cmd", self.__class__.__name__)) + + def get_source_str(self, table_name): + table_path = "/" + table_name + ".tsv" + return ''' + + {cmd} + TabSeparated + + '''.format( + cmd=self._get_cmd(table_path), + ) + + def prepare(self, structure, table_name, cluster): + self.node = cluster.instances[self.docker_hostname] + path = "/" + table_name + ".tsv" + self.node.exec_in_container(["bash", "-c", "touch {}".format(path)]) + self.ordered_names = structure.get_ordered_names() + self.prepared = True + + def load_data(self, data, table_name): + if not data: + return + path = "/" + table_name + ".tsv" + for row in list(data): + sorted_row = [] + for name in self.ordered_names: + sorted_row.append(str(row.data[name])) + + str_data = '\t'.join(sorted_row) + self.node.exec_in_container(["bash", "-c", "echo \"{row}\" >> {fname}".format(row=str_data, fname=path)]) + + +class SourceExecutableCache(_SourceExecutableBase): + + def _get_cmd(self, path): + return "cat {}".format(path) + + def compatible_with_layout(self, layout): + return 'cache' not in layout.name + + +class SourceExecutableHashed(_SourceExecutableBase): + + def _get_cmd(self, path): + return "cat - >/dev/null;cat {}".format(path) + + def compatible_with_layout(self, layout): + return 'cache' in layout.name diff --git a/dbms/tests/integration/test_external_dictionaries/test.py b/dbms/tests/integration/test_external_dictionaries/test.py index ac4ce47f63d..12fa08dd9c6 100644 --- a/dbms/tests/integration/test_external_dictionaries/test.py +++ b/dbms/tests/integration/test_external_dictionaries/test.py @@ -4,7 +4,7 @@ import time from helpers.cluster import ClickHouseCluster from dictionary import Field, Row, Dictionary, DictionaryStructure, Layout -from external_sources import SourceMySQL, SourceMongo +from external_sources import SourceMySQL, SourceMongo, SourceClickHouse, SourceFile, SourceExecutableCache, SourceExecutableHashed SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) @@ -68,8 +68,8 @@ FIELDS = { } LAYOUTS = [ - Layout("cache"), Layout("hashed"), + Layout("cache"), Layout("flat"), Layout("complex_key_hashed"), Layout("complex_key_cache"), @@ -79,6 +79,11 @@ LAYOUTS = [ SOURCES = [ #SourceMongo("MongoDB", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"), SourceMySQL("MySQL", "localhost", "3308", "mysql1", "3306", "root", "clickhouse"), + SourceClickHouse("RemoteClickHouse", "localhost", "9000", "clickhouse1", "9000", "default", ""), + SourceClickHouse("LocalClickHouse", "localhost", "9000", "node", "9000", "default", ""), + SourceFile("File", "localhost", "9000", "node", "9000", "", ""), + SourceExecutableHashed("ExecutableHashed", "localhost", "9000", "node", "9000", "", ""), + SourceExecutableCache("ExecutableCache", "localhost", "9000", "node", "9000", "", ""), ] DICTIONARIES = [] @@ -97,19 +102,22 @@ def setup_module(module): for layout in LAYOUTS: for source in SOURCES: - structure = DictionaryStructure(layout, FIELDS[layout.layout_type]) - dict_name = source.name + "_" + layout.name - dict_path = os.path.join(dict_configs_path, dict_name + '.xml') - dictionary = Dictionary(dict_name, structure, source, dict_path, "table_" + dict_name) - print dict_name - dictionary.generate_config() - DICTIONARIES.append(dictionary) + if source.compatible_with_layout(layout): + structure = DictionaryStructure(layout, FIELDS[layout.layout_type]) + dict_name = source.name + "_" + layout.name + dict_path = os.path.join(dict_configs_path, dict_name + '.xml') + dictionary = Dictionary(dict_name, structure, source, dict_path, "table_" + dict_name) + dictionary.generate_config() + DICTIONARIES.append(dictionary) + else: + print "Source", source.name, "incompatible with layout", layout.name main_configs = [] for fname in os.listdir(dict_configs_path): main_configs.append(os.path.join(dict_configs_path, fname)) cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs')) node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True, with_mongo=True) + cluster.add_instance('clickhouse1') @pytest.fixture(scope="module") def started_cluster(): @@ -117,7 +125,7 @@ def started_cluster(): cluster.start() for dictionary in DICTIONARIES: print "Preparing", dictionary.name - dictionary.prepare_source() + dictionary.prepare_source(cluster) print "Prepared" yield cluster From 7c0af3c914f3f95320c1a0f1961d75a3447545a0 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 21 Feb 2019 20:02:33 +0300 Subject: [PATCH 03/13] Adding http --- .../external_sources.py | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/dbms/tests/integration/test_external_dictionaries/external_sources.py b/dbms/tests/integration/test_external_dictionaries/external_sources.py index 5a7f2536d2c..a16d6665812 100644 --- a/dbms/tests/integration/test_external_dictionaries/external_sources.py +++ b/dbms/tests/integration/test_external_dictionaries/external_sources.py @@ -301,3 +301,26 @@ class SourceExecutableHashed(_SourceExecutableBase): def compatible_with_layout(self, layout): return 'cache' in layout.name + +class SourceHTTPBase(ExternalSource): + + def get_source_str(self, table_name): + url = "{schema}://{host}:5555/".format(schema=self._get_schema(), self.docker_hostname) + return ''' + + {url} + TabSeparated + + '''.format(url=url) + + def prepare(self, structure, table_name, cluster): + self.node = cluster.instances[self.docker_hostname] + path = "/" + table_name + ".tsv" + self.node.exec_in_container(["bash", "-c", "touch {}".format(path)]) + with open('http_server.py', 'r') as server_code: + for line in server_code: + self.node.exec_in_container(["bash", "-c", "echo ".format(path)]) + + self.node.exec_in_container([]) + self.ordered_names = structure.get_ordered_names() + self.prepared = True From 0d2e562b02ff6e2254d384c3debacb7da94c2fe3 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 21 Feb 2019 20:34:19 +0300 Subject: [PATCH 04/13] tryin to add http server --- dbms/tests/integration/helpers/cluster.py | 32 +++++++++---------- .../external_sources.py | 29 +++++++++++++---- 2 files changed, 38 insertions(+), 23 deletions(-) diff --git a/dbms/tests/integration/helpers/cluster.py b/dbms/tests/integration/helpers/cluster.py index 9924de0f504..f64eac4c301 100644 --- a/dbms/tests/integration/helpers/cluster.py +++ b/dbms/tests/integration/helpers/cluster.py @@ -15,6 +15,7 @@ from kazoo.client import KazooClient from kazoo.exceptions import KazooException import psycopg2 import requests +import base64 import docker from docker.errors import ContainerError @@ -98,7 +99,6 @@ class ClickHouseCluster: self.with_kafka = False self.with_odbc_drivers = False self.with_hdfs = False - self.with_mongo = False self.docker_client = None self.is_up = False @@ -110,7 +110,7 @@ class ClickHouseCluster: cmd += " client" return cmd - def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test", stay_alive=False, ipv4_address=None, ipv6_address=None): + def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, with_postgres=False, with_hdfs=False, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test", stay_alive=False, ipv4_address=None, ipv6_address=None): """Add an instance to the cluster. name - the name of the instance directory and the value of the 'instance' macro in ClickHouse. @@ -128,7 +128,7 @@ class ClickHouseCluster: instance = ClickHouseInstance( self, self.base_dir, name, config_dir, main_configs, user_configs, macros, with_zookeeper, - self.zookeeper_config_path, with_mysql, with_kafka, with_mongo, self.base_configs_dir, self.server_bin_path, + self.zookeeper_config_path, with_mysql, with_kafka, self.base_configs_dir, self.server_bin_path, self.odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, hostname=hostname, env_variables=env_variables, image=image, stay_alive=stay_alive, ipv4_address=ipv4_address, ipv6_address=ipv6_address) @@ -177,11 +177,6 @@ class ClickHouseCluster: self.base_hdfs_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_hdfs.yml')] - if with_mongo and not self.with_mongo: - self.with_mongo = True - self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mongo.yml')]) - self.base_mongo_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', - self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mongo.yml')] return instance @@ -296,10 +291,6 @@ class ClickHouseCluster: subprocess_check_call(self.base_hdfs_cmd + ['up', '-d', '--force-recreate']) self.wait_hdfs_to_start(120) - if self.with_mongo and self.base_mongo_cmd: - subprocess_check_call(self.base_mongo_cmd + ['up', '-d', '--force-recreate']) - time.sleep(10) - subprocess_check_call(self.base_cmd + ['up', '-d', '--no-recreate']) start_deadline = time.time() + 20.0 # seconds @@ -371,8 +362,11 @@ services: cap_add: - SYS_PTRACE depends_on: {depends_on} + user: '{user}' env_file: - {env_file} + security_opt: + - label:disable {networks} {app_net} {ipv4_address} @@ -395,9 +389,8 @@ class ClickHouseInstance: def __init__( self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macros, - with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_mongo, base_configs_dir, - server_bin_path, odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, hostname=None, - env_variables={}, image="yandex/clickhouse-integration-test", + with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, base_configs_dir, server_bin_path, odbc_bridge_bin_path, + clickhouse_path_dir, with_odbc_drivers, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test", stay_alive=False, ipv4_address=None, ipv6_address=None): self.name = name @@ -420,7 +413,6 @@ class ClickHouseInstance: self.with_mysql = with_mysql self.with_kafka = with_kafka - self.with_mongo = with_mongo self.path = p.join(self.cluster.instances_dir, name) self.docker_compose_path = p.join(self.path, 'docker_compose.yml') @@ -476,6 +468,13 @@ class ClickHouseInstance: raise Exception('Cmd "{}" failed! Return code {}. Output: {}'.format(' '.join(cmd), exit_code, output)) return output + def copy_file_to_container(self, local_path, dest_path): + with open(local_path, 'r') as fdata: + data = fdata.read() + encoded_data = base64.b64encode(data) + self.exec_in_container(["bash", "-c", "echo {} | base64 --decode > {}".format(encoded_data, dest_path)]) + + def get_docker_handle(self): return self.docker_client.containers.get(self.docker_id) @@ -681,6 +680,7 @@ class ClickHouseInstance: db_dir=db_dir, logs_dir=logs_dir, depends_on=str(depends_on), + user=os.getuid(), env_file=env_file, odbc_ini_path=odbc_ini_path, entrypoint_cmd=entrypoint_cmd, diff --git a/dbms/tests/integration/test_external_dictionaries/external_sources.py b/dbms/tests/integration/test_external_dictionaries/external_sources.py index a16d6665812..df6943b5181 100644 --- a/dbms/tests/integration/test_external_dictionaries/external_sources.py +++ b/dbms/tests/integration/test_external_dictionaries/external_sources.py @@ -4,6 +4,7 @@ import pymysql.cursors import pymongo import subprocess import copy +import base64 class ExternalSource(object): def __init__(self, name, internal_hostname, internal_port, @@ -305,7 +306,8 @@ class SourceExecutableHashed(_SourceExecutableBase): class SourceHTTPBase(ExternalSource): def get_source_str(self, table_name): - url = "{schema}://{host}:5555/".format(schema=self._get_schema(), self.docker_hostname) + self.http_port = 5555 + url = "{schema}://{host}:{port}/".format(schema=self._get_schema(), host=self.docker_hostname, port=self.http_port) return ''' {url} @@ -315,12 +317,25 @@ class SourceHTTPBase(ExternalSource): def prepare(self, structure, table_name, cluster): self.node = cluster.instances[self.docker_hostname] - path = "/" + table_name + ".tsv" self.node.exec_in_container(["bash", "-c", "touch {}".format(path)]) - with open('http_server.py', 'r') as server_code: - for line in server_code: - self.node.exec_in_container(["bash", "-c", "echo ".format(path)]) - - self.node.exec_in_container([]) + self.node.copy_file_to_container('./http_server.py', '/http_server.py') + self.node.exec_in_container([ + "bash", + "-c", + "python /http_server.py --data-path /{tbl}.tsv --schema={schema} --host={host} --port={port}".format( + tbl=table_name, schema=self._get_schema(), host=self.docker_hostname, port=self.http_port) + ]) self.ordered_names = structure.get_ordered_names() self.prepared = True + + def load_data(self, data, table_name): + if not data: + return + path = "/" + table_name + ".tsv" + for row in list(data): + sorted_row = [] + for name in self.ordered_names: + sorted_row.append(str(row.data[name])) + + str_data = '\t'.join(sorted_row) + self.node.exec_in_container(["bash", "-c", "echo \"{row}\" >> {fname}".format(row=str_data, fname=path)]) From 2bb54b0b38577b87456be74328f3626a00223c23 Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 22 Feb 2019 13:55:12 +0300 Subject: [PATCH 05/13] Add http tests and different queries --- dbms/tests/integration/helpers/cluster.py | 4 +- .../configs/dictionaries/.gitkeep | 0 .../test_external_dictionaries/dictionary.py | 89 +++++++++++-- .../external_sources.py | 31 +++-- .../test_external_dictionaries/fake_cert.pem | 49 +++++++ .../test_external_dictionaries/http_server.py | 43 +++++++ .../test_external_dictionaries/test.py | 120 ++++++++++-------- 7 files changed, 264 insertions(+), 72 deletions(-) create mode 100644 dbms/tests/integration/test_external_dictionaries/configs/dictionaries/.gitkeep create mode 100644 dbms/tests/integration/test_external_dictionaries/fake_cert.pem create mode 100644 dbms/tests/integration/test_external_dictionaries/http_server.py diff --git a/dbms/tests/integration/helpers/cluster.py b/dbms/tests/integration/helpers/cluster.py index f64eac4c301..c6b18634320 100644 --- a/dbms/tests/integration/helpers/cluster.py +++ b/dbms/tests/integration/helpers/cluster.py @@ -457,10 +457,10 @@ class ClickHouseInstance: return self.client.get_query_request(*args, **kwargs) - def exec_in_container(self, cmd, **kwargs): + def exec_in_container(self, cmd, detach=False, **kwargs): container = self.get_docker_handle() exec_id = self.docker_client.api.exec_create(container.id, cmd, **kwargs) - output = self.docker_client.api.exec_start(exec_id, detach=False) + output = self.docker_client.api.exec_start(exec_id, detach=detach) output = output.decode('utf8') exit_code = self.docker_client.api.exec_inspect(exec_id)['ExitCode'] diff --git a/dbms/tests/integration/test_external_dictionaries/configs/dictionaries/.gitkeep b/dbms/tests/integration/test_external_dictionaries/configs/dictionaries/.gitkeep new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/integration/test_external_dictionaries/dictionary.py b/dbms/tests/integration/test_external_dictionaries/dictionary.py index 86014c0bd40..e909d0d54ab 100644 --- a/dbms/tests/integration/test_external_dictionaries/dictionary.py +++ b/dbms/tests/integration/test_external_dictionaries/dictionary.py @@ -48,7 +48,7 @@ class Row(object): class Field(object): - def __init__(self, name, field_type, is_key=False, is_range_key=False, default=None, hierarchical=False, range_hash_type=None): + def __init__(self, name, field_type, is_key=False, is_range_key=False, default=None, hierarchical=False, range_hash_type=None, default_value_for_get=None): self.name = name self.field_type = field_type self.is_key = is_key @@ -57,6 +57,7 @@ class Field(object): self.range_hash_type = range_hash_type self.is_range = self.range_hash_type is not None self.is_range_key = is_range_key + self.default_value_for_get = default_value_for_get def get_attribute_str(self): return ''' @@ -157,21 +158,26 @@ class DictionaryStructure(object): return fields_strs - - def get_dict_get_expression(self, dict_name, field, row): + def _get_dict_get_common_expression(self, dict_name, field, row, or_default, with_type, has): if field in self.keys: raise Exception("Trying to receive key field {} from dictionary".format(field.name)) if not self.layout.is_complex: - key_expr = 'toUInt64({})'.format(row.data[self.keys[0].name]) + if not or_default: + key_expr = ', toUInt64({})'.format(row.data[self.keys[0].name]) + else: + key_expr = ', toUInt64({})'.format(self.keys[0].default_value_for_get) else: key_exprs_strs = [] for key in self.keys: - val = row.data[key.name] + if not or_default: + val = row.data[key.name] + else: + val = key.default_value_for_get if isinstance(val, str): val = "'" + val + "'" key_exprs_strs.append('to{type}({value})'.format(type=key.field_type, value=val)) - key_expr = '(' + ','.join(key_exprs_strs) + ')' + key_expr = ', (' + ','.join(key_exprs_strs) + ')' date_expr = '' if self.layout.is_ranged: @@ -182,14 +188,69 @@ class DictionaryStructure(object): date_expr = ', ' + val - return "dictGet{field_type}('{dict_name}', '{field_name}', {key_expr}{date_expr})".format( - field_type=field.field_type, + if or_default: + raise Exception("Can create 'dictGetOrDefault' query for ranged dictionary") + + if or_default: + or_default_expr = 'OrDefault' + if field.default_value_for_get is None: + raise Exception("Can create 'dictGetOrDefault' query for field {} without default_value_for_get".format(field.name)) + + val = field.default_value_for_get + if isinstance(val, str): + val = "'" + val + "'" + default_value_for_get = ', to{type}({value})'.format(type=field.field_type, value=val) + else: + or_default_expr = '' + default_value_for_get = '' + + if with_type: + field_type = field.field_type + else: + field_type = '' + + field_name = ", '" + field.name + "'" + if has: + what = "Has" + field_type = '' + or_default = '' + field_name = '' + date_expr = '' + def_for_get = '' + else: + what = "Get" + + return "dict{what}{field_type}{or_default}('{dict_name}'{field_name}{key_expr}{date_expr}{def_for_get})".format( + what=what, + field_type=field_type, dict_name=dict_name, - field_name=field.name, + field_name=field_name, key_expr=key_expr, date_expr=date_expr, + or_default=or_default_expr, + def_for_get=default_value_for_get, ) + def get_get_expressions(self, dict_name, field, row): + return [ + self._get_dict_get_common_expression(dict_name, field, row, or_default=False, with_type=False, has=False), + self._get_dict_get_common_expression(dict_name, field, row, or_default=False, with_type=True, has=False), + ] + + def get_get_or_default_expressions(self, dict_name, field, row): + if not self.layout.is_ranged: + return [ + self._get_dict_get_common_expression(dict_name, field, row, or_default=True, with_type=False, has=False), + self._get_dict_get_common_expression(dict_name, field, row, or_default=True, with_type=True, has=False), + ] + return [] + + + def get_has_expressions(self, dict_name, field, row): + if not self.layout.is_ranged: + return [self._get_dict_get_common_expression(dict_name, field, row, or_default=False, with_type=False, has=True)] + return [] + class Dictionary(object): def __init__(self, name, structure, source, config_path, table_name): @@ -230,8 +291,14 @@ class Dictionary(object): self.source.load_data(data, self.table_name) - def get_select_query(self, field, row): - return 'select {}'.format(self.structure.get_dict_get_expression(self.name, field, row)) + def get_select_get_queries(self, field, row): + return ['select {}'.format(expr) for expr in self.structure.get_get_expressions(self.name, field, row)] + + def get_select_get_or_default_queries(self, field, row): + return ['select {}'.format(expr) for expr in self.structure.get_get_or_default_expressions(self.name, field, row)] + + def get_select_has_queries(self, field, row): + return ['select {}'.format(expr) for expr in self.structure.get_has_expressions(self.name, field, row)] def is_complex(self): return self.structure.layout.is_complex diff --git a/dbms/tests/integration/test_external_dictionaries/external_sources.py b/dbms/tests/integration/test_external_dictionaries/external_sources.py index df6943b5181..251e85b8523 100644 --- a/dbms/tests/integration/test_external_dictionaries/external_sources.py +++ b/dbms/tests/integration/test_external_dictionaries/external_sources.py @@ -2,9 +2,8 @@ import warnings import pymysql.cursors import pymongo -import subprocess -import copy -import base64 +import os + class ExternalSource(object): def __init__(self, name, internal_hostname, internal_port, @@ -305,9 +304,11 @@ class SourceExecutableHashed(_SourceExecutableBase): class SourceHTTPBase(ExternalSource): + PORT_COUNTER = 5555 def get_source_str(self, table_name): - self.http_port = 5555 + self.http_port = SourceHTTPBase.PORT_COUNTER url = "{schema}://{host}:{port}/".format(schema=self._get_schema(), host=self.docker_hostname, port=self.http_port) + SourceHTTPBase.PORT_COUNTER += 1 return ''' {url} @@ -317,14 +318,18 @@ class SourceHTTPBase(ExternalSource): def prepare(self, structure, table_name, cluster): self.node = cluster.instances[self.docker_hostname] + path = "/" + table_name + ".tsv" self.node.exec_in_container(["bash", "-c", "touch {}".format(path)]) - self.node.copy_file_to_container('./http_server.py', '/http_server.py') + + script_dir = os.path.dirname(os.path.realpath(__file__)) + self.node.copy_file_to_container(os.path.join(script_dir, './http_server.py'), '/http_server.py') + self.node.copy_file_to_container(os.path.join(script_dir, './fake_cert.pem'), '/fake_cert.pem') self.node.exec_in_container([ "bash", "-c", - "python /http_server.py --data-path /{tbl}.tsv --schema={schema} --host={host} --port={port}".format( - tbl=table_name, schema=self._get_schema(), host=self.docker_hostname, port=self.http_port) - ]) + "python2 /http_server.py --data-path={tbl} --schema={schema} --host={host} --port={port} --cert-path=/fake_cert.pem".format( + tbl=path, schema=self._get_schema(), host=self.docker_hostname, port=self.http_port) + ], detach=True) self.ordered_names = structure.get_ordered_names() self.prepared = True @@ -339,3 +344,13 @@ class SourceHTTPBase(ExternalSource): str_data = '\t'.join(sorted_row) self.node.exec_in_container(["bash", "-c", "echo \"{row}\" >> {fname}".format(row=str_data, fname=path)]) + + +class SourceHTTP(SourceHTTPBase): + def _get_schema(self): + return "http" + + +class SourceHTTPS(SourceHTTPBase): + def _get_schema(self): + return "https" diff --git a/dbms/tests/integration/test_external_dictionaries/fake_cert.pem b/dbms/tests/integration/test_external_dictionaries/fake_cert.pem new file mode 100644 index 00000000000..aa846f4f49a --- /dev/null +++ b/dbms/tests/integration/test_external_dictionaries/fake_cert.pem @@ -0,0 +1,49 @@ +-----BEGIN PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDDHnGYqN/ztiFE +rMQizbYiEpI/q/91bCDQ+xRes+gucKrr4qvQbosANYfpXgsaGizH24CpAXDvnFwC +oHqPmotHunJvG9uKiVvshy+tx1SNLZEN9DySri+8V+8fetn5PFxWQsKclMGCypyE +REV6H0vflPWmZRZWvAb5aaIxcRa2m3bTVUZPuY0wzCtc+ELPQ/sRc62gWH4bMlBo +0Wdai4+wcmpdcSR+rlZVDPt+ysxF/PcJFMAQ9CIRJRhXuK7Q/XCmAkagpH9tPPwY +SDMONTPhumXY7gCX4lmV9CflGJ6IpGmpEL04Rpr3gAcvz/w4JiMXgGpvtDjiJku9 +qOdCYS/FAgMBAAECggEBAL/miULjlJ9VWZL5eE3ilGcebMhCmZUbK4td5cLenlRO +a0xkOydcEUm7XFihLboWVEScFgYibLi8x6Gtw9zI2oNJVJMCiwHN5qLSsonvqbDQ +SAG5XHnG5xwOQBht80O1ofsU3eKyS0AflaBgpRRfA3h6QL/OXBIiC5nx0ptd5kDh +HR0IHUcleBHt8I0d/PZbQE9oMOBlnMf8v2jGe80JXscQt2UabA/quCalDihhDt5J +qySfh4mDOrBOQEsmO/C1JCztQ6WZ2FVwRiITb/fRmsPadKJsIiMyy2w6NmP96v2a +V2ZqMvz9OZym8M2is4HR2pbn8XJ6vmW52fwNQhpWDgECgYEA8aiqF5df3j8YEDAX +XVAhIaubSLcS50qSk/p0/ZS9ETR1Uv8zjJDs6xBVBd4xXe/G2/XvvV6sGp4JcW3V +U66Ll3S1veMlnvCTjZUEi931EJbIdoyGACEG19QIVteSEhQkoSOk/Zx1lFSVm9UZ +hUV4JvWifQvLetS/v6MhnxSbTdUCgYEAzrK7+0gVT0a0szMs7CbeQVm80EWcqPea +p5jyLQHu+7vzcC8c9RRlqBPkxeG9BTt0sbBBJTrtvls15QaFoKCtTyjnrrLEHqu3 +VZfIpjjrIhhvoRWP3A3r4DFMDGm/TOTUWEMSPJPXKe3uVm3buwVXWj4ipvhnAdr5 +kJ+x1YqNIjECgYEAo0ISHzv53Vh8tjr3HehLacbYcmiUEcOUgPo8XTBGBsCM3pRg +S/+Av1FaT0uLyG17yBA/dYzm8liAAqxz6UPLNHf5bB5vxQ+8b3MUDjXWIO3s4gIP +aTjmuZqaQ6kBGsuW73H4PgmceagnJo7x3dJP2OoraxUz03i1Tg80YJd4UD0CgYBC +dzL/gJRpo6DjpuchIPaDKSoQBvJzWvt+PS5SzrZceHm1b1DudhqiS5NbFlXD4vSJ +VtX79NESTx4rgUdi+YgBVnP5tz5dZnZTrbU1zkO9+QGcWOSjrE5XD0MXEsITJdoq +b5bjp96eewYTAMyRfQwz1psp+eKVtCZgHRoAQsdTYQKBgQC7yBABJ4LDTie2C2n0 +itO7SRT1tMfkNx8gK9RrgGawBUhD1EokmOKk+O1Ht6Cx7hqCd3Hsa4zc9se++jV1 +Er+T8LW8FOFfAwtv8xggJtA8h6U8n6gIoq0EsSsWREJ4m9fDfZQnVTj8IPYvPHMr +Jv++IPqtFGG4O8IeWG+HY8mHxQ== +-----END PRIVATE KEY----- +-----BEGIN CERTIFICATE----- +MIIDYDCCAkigAwIBAgIJAKSJ3I0ORzjtMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX +aWRnaXRzIFB0eSBMdGQwHhcNMTkwMjIyMDgxNTIzWhcNMjAwMjIyMDgxNTIzWjBF +MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50 +ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB +CgKCAQEAwx5xmKjf87YhRKzEIs22IhKSP6v/dWwg0PsUXrPoLnCq6+Kr0G6LADWH +6V4LGhosx9uAqQFw75xcAqB6j5qLR7pybxvbiolb7IcvrcdUjS2RDfQ8kq4vvFfv +H3rZ+TxcVkLCnJTBgsqchERFeh9L35T1pmUWVrwG+WmiMXEWtpt201VGT7mNMMwr +XPhCz0P7EXOtoFh+GzJQaNFnWouPsHJqXXEkfq5WVQz7fsrMRfz3CRTAEPQiESUY +V7iu0P1wpgJGoKR/bTz8GEgzDjUz4bpl2O4Al+JZlfQn5RieiKRpqRC9OEaa94AH +L8/8OCYjF4Bqb7Q44iZLvajnQmEvxQIDAQABo1MwUTAdBgNVHQ4EFgQU6P39PMY3 +jRgJM0svz9XpHH8z7xUwHwYDVR0jBBgwFoAU6P39PMY3jRgJM0svz9XpHH8z7xUw +DwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAdIKBKlCIprCDGTtn +xatBlcpkbys4hQhHwkWn5aAPKE2oZlUOTEe90xxLJuciK+vCXTwQ3mgjGFc+ioAo +B7m3VL1DLmHCw5DQ2T/g8TjVjlKoaCj+9SZZPga5ygYJChx5HKFO4eek9stWo6hA +BmXndKhdX7mphUoSqUnQ+RwQ9XA0n6eTPqXAThWVqvLQgDj7Msz1XeFfqFqyD9MN +RocFg87aASTtwxYneG3IZCOQudlbHaRuEflHjlty2V5mNPjzcS2QK598i/5vmIoD +ZiUBXg+P8n+dklEa4qnQplDKERD20GtDgWtgYrfmpspLWNv8/bZ4h4gmGsH0+3uz +dHQNQA== +-----END CERTIFICATE----- diff --git a/dbms/tests/integration/test_external_dictionaries/http_server.py b/dbms/tests/integration/test_external_dictionaries/http_server.py new file mode 100644 index 00000000000..ec7ce9ebeb0 --- /dev/null +++ b/dbms/tests/integration/test_external_dictionaries/http_server.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +import argparse +from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer +import ssl +import csv +import os + + +def start_server(server_address, cert_path, data_path, schema): + class TSVHTTPHandler(BaseHTTPRequestHandler): + def _set_headers(self): + self.send_response(200) + self.send_header('Content-type', 'text/tsv') + self.end_headers() + + def do_GET(self): + self._set_headers() + with open(data_path, 'r') as fl: + reader = csv.reader(fl, delimiter='\t') + for row in reader: + self.wfile.write('\t'.join(row) + '\n') + return + + def do_POST(self): + return self.do_GET() + + httpd = HTTPServer(server_address, TSVHTTPHandler) + if schema == 'https': + httpd.socket = ssl.wrap_socket(httpd.socket, certfile=cert_path, server_side=True) + httpd.serve_forever() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Simple HTTP server returns data from file") + parser.add_argument("--data-path", required=True) + parser.add_argument("--schema", choices=("http", "https"), required=True) + parser.add_argument("--host", default="localhost") + parser.add_argument("--port", default=5555, type=int) + parser.add_argument("--cert-path", default="./fake_cert.pem") + + args = parser.parse_args() + + start_server((args.host, args.port), args.cert_path, args.data_path, args.schema) diff --git a/dbms/tests/integration/test_external_dictionaries/test.py b/dbms/tests/integration/test_external_dictionaries/test.py index 12fa08dd9c6..5a15fc8b725 100644 --- a/dbms/tests/integration/test_external_dictionaries/test.py +++ b/dbms/tests/integration/test_external_dictionaries/test.py @@ -5,64 +5,65 @@ import time from helpers.cluster import ClickHouseCluster from dictionary import Field, Row, Dictionary, DictionaryStructure, Layout from external_sources import SourceMySQL, SourceMongo, SourceClickHouse, SourceFile, SourceExecutableCache, SourceExecutableHashed +from external_sources import SourceHTTP, SourceHTTPS SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) FIELDS = { "simple": [ - Field("KeyField", 'UInt64', is_key=True), - Field("UInt8_", 'UInt8'), - Field("UInt16_", 'UInt16'), - Field("UInt32_", 'UInt32'), - Field("UInt64_", 'UInt64'), - Field("Int8_", 'Int8'), - Field("Int16_", 'Int16'), - Field("Int32_", 'Int32'), - Field("Int64_", 'Int64'), - Field("UUID_", 'UUID'), - Field("Date_", 'Date'), - Field("DateTime_", 'DateTime'), - Field("String_", 'String'), - Field("Float32_", 'Float32'), - Field("Float64_", 'Float64'), + Field("KeyField", 'UInt64', is_key=True, default_value_for_get=9999999), + Field("UInt8_", 'UInt8', default_value_for_get=55), + Field("UInt16_", 'UInt16', default_value_for_get=66), + Field("UInt32_", 'UInt32', default_value_for_get=77), + Field("UInt64_", 'UInt64', default_value_for_get=88), + Field("Int8_", 'Int8', default_value_for_get=-55), + Field("Int16_", 'Int16', default_value_for_get=-66), + Field("Int32_", 'Int32', default_value_for_get=-77), + Field("Int64_", 'Int64', default_value_for_get=-88), + Field("UUID_", 'UUID', default_value_for_get='550e8400-0000-0000-0000-000000000000'), + Field("Date_", 'Date', default_value_for_get='2018-12-30'), + Field("DateTime_", 'DateTime', default_value_for_get='2018-12-30 00:00:00'), + Field("String_", 'String', default_value_for_get='hi'), + Field("Float32_", 'Float32', default_value_for_get=555.11), + Field("Float64_", 'Float64', default_value_for_get=777.11), ], "complex": [ - Field("KeyField1", 'UInt64', is_key=True), - Field("KeyField2", 'String', is_key=True), - Field("UInt8_", 'UInt8'), - Field("UInt16_", 'UInt16'), - Field("UInt32_", 'UInt32'), - Field("UInt64_", 'UInt64'), - Field("Int8_", 'Int8'), - Field("Int16_", 'Int16'), - Field("Int32_", 'Int32'), - Field("Int64_", 'Int64'), - Field("UUID_", 'UUID'), - Field("Date_", 'Date'), - Field("DateTime_", 'DateTime'), - Field("String_", 'String'), - Field("Float32_", 'Float32'), - Field("Float64_", 'Float64'), + Field("KeyField1", 'UInt64', is_key=True, default_value_for_get=9999999), + Field("KeyField2", 'String', is_key=True, default_value_for_get='xxxxxxxxx'), + Field("UInt8_", 'UInt8', default_value_for_get=55), + Field("UInt16_", 'UInt16', default_value_for_get=66), + Field("UInt32_", 'UInt32', default_value_for_get=77), + Field("UInt64_", 'UInt64', default_value_for_get=88), + Field("Int8_", 'Int8', default_value_for_get=-55), + Field("Int16_", 'Int16', default_value_for_get=-66), + Field("Int32_", 'Int32', default_value_for_get=-77), + Field("Int64_", 'Int64', default_value_for_get=-88), + Field("UUID_", 'UUID', default_value_for_get='550e8400-0000-0000-0000-000000000000'), + Field("Date_", 'Date', default_value_for_get='2018-12-30'), + Field("DateTime_", 'DateTime', default_value_for_get='2018-12-30 00:00:00'), + Field("String_", 'String', default_value_for_get='hi'), + Field("Float32_", 'Float32', default_value_for_get=555.11), + Field("Float64_", 'Float64', default_value_for_get=777.11), ], "ranged": [ Field("KeyField1", 'UInt64', is_key=True), Field("KeyField2", 'Date', is_range_key=True), Field("StartDate", 'Date', range_hash_type='min'), Field("EndDate", 'Date', range_hash_type='max'), - Field("UInt8_", 'UInt8'), - Field("UInt16_", 'UInt16'), - Field("UInt32_", 'UInt32'), - Field("UInt64_", 'UInt64'), - Field("Int8_", 'Int8'), - Field("Int16_", 'Int16'), - Field("Int32_", 'Int32'), - Field("Int64_", 'Int64'), - Field("UUID_", 'UUID'), - Field("Date_", 'Date'), - Field("DateTime_", 'DateTime'), - Field("String_", 'String'), - Field("Float32_", 'Float32'), - Field("Float64_", 'Float64'), + Field("UInt8_", 'UInt8', default_value_for_get=55), + Field("UInt16_", 'UInt16', default_value_for_get=66), + Field("UInt32_", 'UInt32', default_value_for_get=77), + Field("UInt64_", 'UInt64', default_value_for_get=88), + Field("Int8_", 'Int8', default_value_for_get=-55), + Field("Int16_", 'Int16', default_value_for_get=-66), + Field("Int32_", 'Int32', default_value_for_get=-77), + Field("Int64_", 'Int64', default_value_for_get=-88), + Field("UUID_", 'UUID', default_value_for_get='550e8400-0000-0000-0000-000000000000'), + Field("Date_", 'Date', default_value_for_get='2018-12-30'), + Field("DateTime_", 'DateTime', default_value_for_get='2018-12-30 00:00:00'), + Field("String_", 'String', default_value_for_get='hi'), + Field("Float32_", 'Float32', default_value_for_get=555.11), + Field("Float64_", 'Float64', default_value_for_get=777.11), ] } @@ -77,6 +78,7 @@ LAYOUTS = [ ] SOURCES = [ + # some kind of troubles with that dictionary #SourceMongo("MongoDB", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"), SourceMySQL("MySQL", "localhost", "3308", "mysql1", "3306", "root", "clickhouse"), SourceClickHouse("RemoteClickHouse", "localhost", "9000", "clickhouse1", "9000", "default", ""), @@ -84,6 +86,8 @@ SOURCES = [ SourceFile("File", "localhost", "9000", "node", "9000", "", ""), SourceExecutableHashed("ExecutableHashed", "localhost", "9000", "node", "9000", "", ""), SourceExecutableCache("ExecutableCache", "localhost", "9000", "node", "9000", "", ""), + SourceHTTP("SourceHTTP", "localhost", "9000", "clickhouse1", "9000", "", ""), + SourceHTTPS("SourceHTTPS", "localhost", "9000", "clickhouse1", "9000", "", ""), ] DICTIONARIES = [] @@ -116,8 +120,8 @@ def setup_module(module): for fname in os.listdir(dict_configs_path): main_configs.append(os.path.join(dict_configs_path, fname)) cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs')) - node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True, with_mongo=True) - cluster.add_instance('clickhouse1') + node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True) + cluster.add_instance('clickhouse1', image="python") @pytest.fixture(scope="module") def started_cluster(): @@ -131,7 +135,6 @@ def started_cluster(): yield cluster finally: - pass cluster.shutdown() @@ -153,7 +156,14 @@ def test_simple_dictionaries(started_cluster): for row in data: for field in fields: if not field.is_key: - queries_with_answers.append((dct.get_select_query(field, row), row.get_value_by_name(field.name))) + for query in dct.get_select_get_queries(field, row): + queries_with_answers.append((query, row.get_value_by_name(field.name))) + + for query in dct.get_select_has_queries(field, row): + queries_with_answers.append((query, 1)) + + for query in dct.get_select_get_or_default_queries(field, row): + queries_with_answers.append((query, field.default_value_for_get)) for query, answer in queries_with_answers: print query @@ -178,7 +188,14 @@ def test_complex_dictionaries(started_cluster): for row in data: for field in fields: if not field.is_key: - queries_with_answers.append((dct.get_select_query(field, row), row.get_value_by_name(field.name))) + for query in dct.get_select_get_queries(field, row): + queries_with_answers.append((query, row.get_value_by_name(field.name))) + + for query in dct.get_select_has_queries(field, row): + queries_with_answers.append((query, 1)) + + for query in dct.get_select_get_or_default_queries(field, row): + queries_with_answers.append((query, field.default_value_for_get)) for query, answer in queries_with_answers: print query @@ -205,7 +222,8 @@ def test_ranged_dictionaries(started_cluster): for row in data: for field in fields: if not field.is_key and not field.is_range: - queries_with_answers.append((dct.get_select_query(field, row), row.get_value_by_name(field.name))) + for query in dct.get_select_get_queries(field, row): + queries_with_answers.append((query, row.get_value_by_name(field.name))) for query, answer in queries_with_answers: print query From 5a73a98619fe1f99027a6b77da8fae02850a3f6d Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 22 Feb 2019 15:13:55 +0300 Subject: [PATCH 06/13] Remove pymongo --- .../external_sources.py | 70 +++++++++---------- .../test_external_dictionaries/test.py | 2 + 2 files changed, 37 insertions(+), 35 deletions(-) diff --git a/dbms/tests/integration/test_external_dictionaries/external_sources.py b/dbms/tests/integration/test_external_dictionaries/external_sources.py index 251e85b8523..030d32f7e51 100644 --- a/dbms/tests/integration/test_external_dictionaries/external_sources.py +++ b/dbms/tests/integration/test_external_dictionaries/external_sources.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- import warnings import pymysql.cursors -import pymongo +#import pymongo import os @@ -123,40 +123,40 @@ class SourceMySQL(ExternalSource): self.execute_mysql_query(query) -class SourceMongo(ExternalSource): - - def get_source_str(self, table_name): - return ''' - - {host} - {port} - {user} - {password} - test - {tbl} - - '''.format( - host=self.docker_hostname, - port=self.docker_port, - user=self.user, - password=self.password, - tbl=table_name, - ) - - def prepare(self, structure, table_name, cluster): - connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format( - host=self.internal_hostname, port=self.internal_port, - user=self.user, password=self.password) - self.connection = pymongo.MongoClient(connection_str) - self.connection.create - self.structure = structure - self.db = self.connection['test'] - self.prepared = True - - def load_data(self, data, table_name): - tbl = self.db[table_name] - to_insert = [dict(row.data) for row in data] - result = tbl.insert_many(to_insert) +#class SourceMongo(ExternalSource): +# +# def get_source_str(self, table_name): +# return ''' +# +# {host} +# {port} +# {user} +# {password} +# test +# {tbl} +# +# '''.format( +# host=self.docker_hostname, +# port=self.docker_port, +# user=self.user, +# password=self.password, +# tbl=table_name, +# ) +# +# def prepare(self, structure, table_name, cluster): +# connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format( +# host=self.internal_hostname, port=self.internal_port, +# user=self.user, password=self.password) +# self.connection = pymongo.MongoClient(connection_str) +# self.connection.create +# self.structure = structure +# self.db = self.connection['test'] +# self.prepared = True +# +# def load_data(self, data, table_name): +# tbl = self.db[table_name] +# to_insert = [dict(row.data) for row in data] +# result = tbl.insert_many(to_insert) class SourceClickHouse(ExternalSource): diff --git a/dbms/tests/integration/test_external_dictionaries/test.py b/dbms/tests/integration/test_external_dictionaries/test.py index 5a15fc8b725..11b93e9874f 100644 --- a/dbms/tests/integration/test_external_dictionaries/test.py +++ b/dbms/tests/integration/test_external_dictionaries/test.py @@ -151,6 +151,7 @@ def test_simple_dictionaries(started_cluster): dct.load_data(data) node.query("system reload dictionaries") + queries_with_answers = [] for dct in simple_dicts: for row in data: @@ -183,6 +184,7 @@ def test_complex_dictionaries(started_cluster): dct.load_data(data) node.query("system reload dictionaries") + queries_with_answers = [] for dct in complex_dicts: for row in data: From e22a6cd6ecdbe7145d66dd4c8341821409efc01f Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 22 Feb 2019 16:43:31 +0300 Subject: [PATCH 07/13] Remove mongo --- dbms/tests/integration/test_external_dictionaries/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/tests/integration/test_external_dictionaries/test.py b/dbms/tests/integration/test_external_dictionaries/test.py index 11b93e9874f..af3a73c902b 100644 --- a/dbms/tests/integration/test_external_dictionaries/test.py +++ b/dbms/tests/integration/test_external_dictionaries/test.py @@ -4,7 +4,7 @@ import time from helpers.cluster import ClickHouseCluster from dictionary import Field, Row, Dictionary, DictionaryStructure, Layout -from external_sources import SourceMySQL, SourceMongo, SourceClickHouse, SourceFile, SourceExecutableCache, SourceExecutableHashed +from external_sources import SourceMySQL, SourceClickHouse, SourceFile, SourceExecutableCache, SourceExecutableHashed from external_sources import SourceHTTP, SourceHTTPS SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) From 22a54bbd77f9d68cfbee9e4d826588d39736edee Mon Sep 17 00:00:00 2001 From: alesapin Date: Sun, 24 Feb 2019 13:28:47 +0300 Subject: [PATCH 08/13] Add python into integration tests image --- dbms/tests/integration/test_external_dictionaries/test.py | 2 +- docker/test/integration/Dockerfile | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/tests/integration/test_external_dictionaries/test.py b/dbms/tests/integration/test_external_dictionaries/test.py index af3a73c902b..8fe775ca966 100644 --- a/dbms/tests/integration/test_external_dictionaries/test.py +++ b/dbms/tests/integration/test_external_dictionaries/test.py @@ -121,7 +121,7 @@ def setup_module(module): main_configs.append(os.path.join(dict_configs_path, fname)) cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs')) node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True) - cluster.add_instance('clickhouse1', image="python") + cluster.add_instance('clickhouse1') @pytest.fixture(scope="module") def started_cluster(): diff --git a/docker/test/integration/Dockerfile b/docker/test/integration/Dockerfile index e3237814fae..57d770acc1f 100644 --- a/docker/test/integration/Dockerfile +++ b/docker/test/integration/Dockerfile @@ -1,6 +1,6 @@ FROM ubuntu:18.04 -RUN apt-get update && apt-get -y install tzdata +RUN apt-get update && apt-get -y install tzdata python ENV TZ=Europe/Moscow RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone From da33310f1b8ccd7cd2fadbb0eb65ca45b31d6d0e Mon Sep 17 00:00:00 2001 From: alesapin Date: Sun, 24 Feb 2019 13:58:52 +0300 Subject: [PATCH 09/13] Add multiple rows to test, fix bug --- .../external_sources.py | 4 +- .../test_external_dictionaries/test.py | 42 +++++++++++++------ 2 files changed, 32 insertions(+), 14 deletions(-) diff --git a/dbms/tests/integration/test_external_dictionaries/external_sources.py b/dbms/tests/integration/test_external_dictionaries/external_sources.py index 030d32f7e51..a6213fd14ec 100644 --- a/dbms/tests/integration/test_external_dictionaries/external_sources.py +++ b/dbms/tests/integration/test_external_dictionaries/external_sources.py @@ -119,7 +119,7 @@ class SourceMySQL(ExternalSource): query = 'insert into test.{} ({}) values {}'.format( table_name, ','.join(self.ordered_names), - ''.join(values_strs)) + ','.join(values_strs)) self.execute_mysql_query(query) @@ -208,7 +208,7 @@ class SourceClickHouse(ExternalSource): query = 'INSERT INTO test.{} ({}) values {}'.format( table_name, ','.join(self.ordered_names), - ''.join(values_strs)) + ','.join(values_strs)) self.node.query(query) diff --git a/dbms/tests/integration/test_external_dictionaries/test.py b/dbms/tests/integration/test_external_dictionaries/test.py index 8fe775ca966..a79bde7e3ec 100644 --- a/dbms/tests/integration/test_external_dictionaries/test.py +++ b/dbms/tests/integration/test_external_dictionaries/test.py @@ -141,9 +141,14 @@ def started_cluster(): def test_simple_dictionaries(started_cluster): fields = FIELDS["simple"] data = [ - Row(fields, [1, 22, 333, 4444, 55555, -6, -77, - -888, -999, '550e8400-e29b-41d4-a716-446655440003', - '1973-06-28', '1985-02-28 23:43:25', 'hello', 22.543, 3332154213.4]), + Row(fields, + [1, 22, 333, 4444, 55555, -6, -77, + -888, -999, '550e8400-e29b-41d4-a716-446655440003', + '1973-06-28', '1985-02-28 23:43:25', 'hello', 22.543, 3332154213.4]), + Row(fields, + [2, 3, 4, 5, 6, -7, -8, + -9, -10, '550e8400-e29b-41d4-a716-446655440002', + '1978-06-28', '1986-02-28 23:42:25', 'hello', 21.543, 3222154213.4]), ] simple_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "simple"] @@ -173,10 +178,16 @@ def test_simple_dictionaries(started_cluster): def test_complex_dictionaries(started_cluster): fields = FIELDS["complex"] data = [ - Row(fields, [1, 'world', 22, 333, 4444, 55555, -6, - -77, -888, -999, '550e8400-e29b-41d4-a716-446655440003', - '1973-06-28', '1985-02-28 23:43:25', - 'hello', 22.543, 3332154213.4]), + Row(fields, + [1, 'world', 22, 333, 4444, 55555, -6, + -77, -888, -999, '550e8400-e29b-41d4-a716-446655440003', + '1973-06-28', '1985-02-28 23:43:25', + 'hello', 22.543, 3332154213.4]), + Row(fields, + [2, 'qwerty2', 52, 2345, 6544, 9191991, -2, + -717, -81818, -92929, '550e8400-e29b-41d4-a716-446655440007', + '1975-09-28', '2000-02-28 23:33:24', + 'my', 255.543, 333222154213.4]), ] complex_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "complex"] @@ -206,11 +217,18 @@ def test_complex_dictionaries(started_cluster): def test_ranged_dictionaries(started_cluster): fields = FIELDS["ranged"] data = [ - Row(fields, [1, '2019-02-10', '2019-02-01', '2019-02-28', - 22, 333, 4444, 55555, -6, -77, -888, -999, - '550e8400-e29b-41d4-a716-446655440003', - '1973-06-28', '1985-02-28 23:43:25', 'hello', - 22.543, 3332154213.4]), + Row(fields, + [1, '2019-02-10', '2019-02-01', '2019-02-28', + 22, 333, 4444, 55555, -6, -77, -888, -999, + '550e8400-e29b-41d4-a716-446655440003', + '1973-06-28', '1985-02-28 23:43:25', 'hello', + 22.543, 3332154213.4]), + Row(fields, + [1, '2019-04-10', '2019-04-01', '2019-04-28', + 555, 32323, 414144, 5255515, -65, -747, -8388, -9099, + '550e8400-e29b-41d4-a716-446655440004', + '1973-06-29', '2002-02-28 23:23:25', '!!!!', + 32.543, 33321545513.4]), ] ranged_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "ranged"] From 40cbe4c25380c0d47328428e69c3c954c061d423 Mon Sep 17 00:00:00 2001 From: alesapin Date: Sun, 24 Feb 2019 20:45:10 +0300 Subject: [PATCH 10/13] Add hierarchical tests --- .../test_external_dictionaries/dictionary.py | 31 +++++++++++++++++++ .../test_external_dictionaries/test.py | 26 ++++++++++++---- 2 files changed, 51 insertions(+), 6 deletions(-) diff --git a/dbms/tests/integration/test_external_dictionaries/dictionary.py b/dbms/tests/integration/test_external_dictionaries/dictionary.py index e909d0d54ab..d67fc1d92a1 100644 --- a/dbms/tests/integration/test_external_dictionaries/dictionary.py +++ b/dbms/tests/integration/test_external_dictionaries/dictionary.py @@ -251,6 +251,31 @@ class DictionaryStructure(object): return [self._get_dict_get_common_expression(dict_name, field, row, or_default=False, with_type=False, has=True)] return [] + def get_hierarchical_expressions(self, dict_name, row): + if self.layout.is_simple: + key_expr = 'toUInt64({})'.format(row.data[self.keys[0].name]) + return [ + "dictGetHierarchy('{dict_name}', {key})".format( + dict_name=dict_name, + key=key_expr, + ), + ] + + return [] + + def get_is_in_expressions(self, dict_name, row, parent_row): + if self.layout.is_simple: + child_key_expr = 'toUInt64({})'.format(row.data[self.keys[0].name]) + parent_key_expr = 'toUInt64({})'.format(parent_row.data[self.keys[0].name]) + return [ + "dictIsIn('{dict_name}', {child_key}, {parent_key})".format( + dict_name=dict_name, + child_key=child_key_expr, + parent_key=parent_key_expr,) + ] + + return [] + class Dictionary(object): def __init__(self, name, structure, source, config_path, table_name): @@ -300,5 +325,11 @@ class Dictionary(object): def get_select_has_queries(self, field, row): return ['select {}'.format(expr) for expr in self.structure.get_has_expressions(self.name, field, row)] + def get_hierarchical_queries(self, row): + return ['select {}'.format(expr) for expr in self.structure.get_hierarchical_expressions(self.name, row)] + + def get_is_in_queries(self, row, parent_row): + return ['select {}'.format(expr) for expr in self.structure.get_is_in_expressions(self.name, row, parent_row)] + def is_complex(self): return self.structure.layout.is_complex diff --git a/dbms/tests/integration/test_external_dictionaries/test.py b/dbms/tests/integration/test_external_dictionaries/test.py index a79bde7e3ec..64006d22f7f 100644 --- a/dbms/tests/integration/test_external_dictionaries/test.py +++ b/dbms/tests/integration/test_external_dictionaries/test.py @@ -26,6 +26,7 @@ FIELDS = { Field("String_", 'String', default_value_for_get='hi'), Field("Float32_", 'Float32', default_value_for_get=555.11), Field("Float64_", 'Float64', default_value_for_get=777.11), + Field("ParentKeyField", "UInt64", default_value_for_get=444, hierarchical=True) ], "complex": [ Field("KeyField1", 'UInt64', is_key=True, default_value_for_get=9999999), @@ -78,7 +79,7 @@ LAYOUTS = [ ] SOURCES = [ - # some kind of troubles with that dictionary + # some troubles with that dictionary #SourceMongo("MongoDB", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"), SourceMySQL("MySQL", "localhost", "3308", "mysql1", "3306", "root", "clickhouse"), SourceClickHouse("RemoteClickHouse", "localhost", "9000", "clickhouse1", "9000", "default", ""), @@ -144,11 +145,11 @@ def test_simple_dictionaries(started_cluster): Row(fields, [1, 22, 333, 4444, 55555, -6, -77, -888, -999, '550e8400-e29b-41d4-a716-446655440003', - '1973-06-28', '1985-02-28 23:43:25', 'hello', 22.543, 3332154213.4]), + '1973-06-28', '1985-02-28 23:43:25', 'hello', 22.543, 3332154213.4, 0]), Row(fields, [2, 3, 4, 5, 6, -7, -8, -9, -10, '550e8400-e29b-41d4-a716-446655440002', - '1978-06-28', '1986-02-28 23:42:25', 'hello', 21.543, 3222154213.4]), + '1978-06-28', '1986-02-28 23:42:25', 'hello', 21.543, 3222154213.4, 1]), ] simple_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "simple"] @@ -170,9 +171,22 @@ def test_simple_dictionaries(started_cluster): for query in dct.get_select_get_or_default_queries(field, row): queries_with_answers.append((query, field.default_value_for_get)) + for query in dct.get_hierarchical_queries(data[0]): + queries_with_answers.append((query, [1])) + + for query in dct.get_hierarchical_queries(data[1]): + queries_with_answers.append((query, [2, 1])) + + for query in dct.get_is_in_queries(data[0], data[1]): + queries_with_answers.append((query, 0)) + + for query in dct.get_is_in_queries(data[1], data[0]): + queries_with_answers.append((query, 1)) for query, answer in queries_with_answers: print query + if isinstance(answer, list): + answer = str(answer).replace(' ', '') assert node.query(query) == str(answer) + '\n' def test_complex_dictionaries(started_cluster): @@ -187,7 +201,7 @@ def test_complex_dictionaries(started_cluster): [2, 'qwerty2', 52, 2345, 6544, 9191991, -2, -717, -81818, -92929, '550e8400-e29b-41d4-a716-446655440007', '1975-09-28', '2000-02-28 23:33:24', - 'my', 255.543, 333222154213.4]), + 'my', 255.543, 3332221.44]), ] complex_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "complex"] @@ -225,10 +239,10 @@ def test_ranged_dictionaries(started_cluster): 22.543, 3332154213.4]), Row(fields, [1, '2019-04-10', '2019-04-01', '2019-04-28', - 555, 32323, 414144, 5255515, -65, -747, -8388, -9099, + 11, 3223, 41444, 52515, -65, -747, -8388, -9099, '550e8400-e29b-41d4-a716-446655440004', '1973-06-29', '2002-02-28 23:23:25', '!!!!', - 32.543, 33321545513.4]), + 32.543, 3332543.4]), ] ranged_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "ranged"] From 3515d9bb8c45b50704ee4f6a27e068001b22c6d5 Mon Sep 17 00:00:00 2001 From: alesapin Date: Sun, 24 Feb 2019 20:57:03 +0300 Subject: [PATCH 11/13] Better intendation --- .../configs/users.xml | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/dbms/tests/integration/test_external_dictionaries/configs/users.xml b/dbms/tests/integration/test_external_dictionaries/configs/users.xml index 60112f99a18..6061af8e33d 100644 --- a/dbms/tests/integration/test_external_dictionaries/configs/users.xml +++ b/dbms/tests/integration/test_external_dictionaries/configs/users.xml @@ -1,23 +1,23 @@ - - + + - + - - - - - ::/0 - - default - default - - + + + + + ::/0 + + default + default + + - - - - - + + + + + From 9d91dbb7c591d88d4d504145725279f6a913381d Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 25 Feb 2019 13:45:22 +0300 Subject: [PATCH 12/13] Finally add mongo source --- .../Dictionaries/MongoDBDictionarySource.cpp | 3 +- dbms/tests/integration/README.md | 2 +- dbms/tests/integration/helpers/cluster.py | 32 ++++++- .../helpers/docker_compose_mongo.yml | 2 +- dbms/tests/integration/image/Dockerfile | 4 +- .../test_external_dictionaries/dictionary.py | 2 + .../external_sources.py | 88 +++++++++++-------- .../test_external_dictionaries/test.py | 6 +- 8 files changed, 93 insertions(+), 46 deletions(-) diff --git a/dbms/src/Dictionaries/MongoDBDictionarySource.cpp b/dbms/src/Dictionaries/MongoDBDictionarySource.cpp index ec7ddc033f5..73ffd4727fa 100644 --- a/dbms/src/Dictionaries/MongoDBDictionarySource.cpp +++ b/dbms/src/Dictionaries/MongoDBDictionarySource.cpp @@ -192,7 +192,8 @@ MongoDBDictionarySource::MongoDBDictionarySource( { # if POCO_VERSION >= 0x01070800 Poco::MongoDB::Database poco_db(db); - poco_db.authenticate(*connection, user, password, method.empty() ? Poco::MongoDB::Database::AUTH_SCRAM_SHA1 : method); + if (!poco_db.authenticate(*connection, user, password, method.empty() ? Poco::MongoDB::Database::AUTH_SCRAM_SHA1 : method)) + throw Exception("Cannot authenticate in MongoDB, incorrect user or password", ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); # else authenticate(*connection, db, user, password); # endif diff --git a/dbms/tests/integration/README.md b/dbms/tests/integration/README.md index a1482a7c7c1..6a707f1ea01 100644 --- a/dbms/tests/integration/README.md +++ b/dbms/tests/integration/README.md @@ -14,7 +14,7 @@ Don't use Docker from your system repository. * [pip](https://pypi.python.org/pypi/pip). To install: `sudo apt-get install python-pip` * [py.test](https://docs.pytest.org/) testing framework. To install: `sudo -H pip install pytest` -* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL psycopg2` +* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL psycopg2 pymongo tzlocal` (highly not recommended) If you really want to use OS packages on modern debian/ubuntu instead of "pip": `sudo apt install -y docker docker-compose python-pytest python-dicttoxml python-docker python-pymysql python-kazoo python-psycopg2` diff --git a/dbms/tests/integration/helpers/cluster.py b/dbms/tests/integration/helpers/cluster.py index c6b18634320..8bd930ac1b0 100644 --- a/dbms/tests/integration/helpers/cluster.py +++ b/dbms/tests/integration/helpers/cluster.py @@ -16,6 +16,7 @@ from kazoo.exceptions import KazooException import psycopg2 import requests import base64 +import pymongo import docker from docker.errors import ContainerError @@ -99,6 +100,7 @@ class ClickHouseCluster: self.with_kafka = False self.with_odbc_drivers = False self.with_hdfs = False + self.with_mongo = False self.docker_client = None self.is_up = False @@ -110,7 +112,7 @@ class ClickHouseCluster: cmd += " client" return cmd - def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, with_postgres=False, with_hdfs=False, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test", stay_alive=False, ipv4_address=None, ipv6_address=None): + def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test", stay_alive=False, ipv4_address=None, ipv6_address=None): """Add an instance to the cluster. name - the name of the instance directory and the value of the 'instance' macro in ClickHouse. @@ -128,7 +130,7 @@ class ClickHouseCluster: instance = ClickHouseInstance( self, self.base_dir, name, config_dir, main_configs, user_configs, macros, with_zookeeper, - self.zookeeper_config_path, with_mysql, with_kafka, self.base_configs_dir, self.server_bin_path, + self.zookeeper_config_path, with_mysql, with_kafka, with_mongo, self.base_configs_dir, self.server_bin_path, self.odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, hostname=hostname, env_variables=env_variables, image=image, stay_alive=stay_alive, ipv4_address=ipv4_address, ipv6_address=ipv6_address) @@ -177,6 +179,11 @@ class ClickHouseCluster: self.base_hdfs_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_hdfs.yml')] + if with_mongo and not self.with_mongo: + self.with_mongo = True + self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mongo.yml')]) + self.base_mongo_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', + self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mongo.yml')] return instance @@ -249,6 +256,20 @@ class ClickHouseCluster: raise Exception("Can't wait HDFS to start") + def wait_mongo_to_start(self, timeout=30): + connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format( + host='localhost', port='27018', user='root', password='clickhouse') + connection = pymongo.MongoClient(connection_str) + start = time.time() + while time.time() - start < timeout: + try: + connection.database_names() + print "Connected to Mongo dbs:", connection.database_names() + return + except Exception as ex: + print "Can't connect to Mongo " + str(ex) + time.sleep(1) + def start(self, destroy_dirs=True): if self.is_up: return @@ -291,6 +312,10 @@ class ClickHouseCluster: subprocess_check_call(self.base_hdfs_cmd + ['up', '-d', '--force-recreate']) self.wait_hdfs_to_start(120) + if self.with_mongo and self.base_mongo_cmd: + subprocess_check_call(self.base_mongo_cmd + ['up', '-d', '--force-recreate']) + self.wait_mongo_to_start(30) + subprocess_check_call(self.base_cmd + ['up', '-d', '--no-recreate']) start_deadline = time.time() + 20.0 # seconds @@ -389,7 +414,7 @@ class ClickHouseInstance: def __init__( self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macros, - with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, base_configs_dir, server_bin_path, odbc_bridge_bin_path, + with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_mongo, base_configs_dir, server_bin_path, odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test", stay_alive=False, ipv4_address=None, ipv6_address=None): @@ -413,6 +438,7 @@ class ClickHouseInstance: self.with_mysql = with_mysql self.with_kafka = with_kafka + self.with_mongo = with_mongo self.path = p.join(self.cluster.instances_dir, name) self.docker_compose_path = p.join(self.path, 'docker_compose.yml') diff --git a/dbms/tests/integration/helpers/docker_compose_mongo.yml b/dbms/tests/integration/helpers/docker_compose_mongo.yml index f9d3b15ba0a..a593c3e123b 100644 --- a/dbms/tests/integration/helpers/docker_compose_mongo.yml +++ b/dbms/tests/integration/helpers/docker_compose_mongo.yml @@ -1,7 +1,7 @@ version: '2.2' services: mongo1: - image: mongo + image: mongo:3.6 restart: always environment: MONGO_INITDB_ROOT_USERNAME: root diff --git a/dbms/tests/integration/image/Dockerfile b/dbms/tests/integration/image/Dockerfile index 118968bd745..6ee674448a4 100644 --- a/dbms/tests/integration/image/Dockerfile +++ b/dbms/tests/integration/image/Dockerfile @@ -24,7 +24,7 @@ RUN apt-get update && env DEBIAN_FRONTEND=noninteractive apt-get install --yes - ENV TZ=Europe/Moscow RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone -RUN pip install pytest docker-compose==1.22.0 docker dicttoxml kazoo PyMySQL psycopg2 +RUN pip install pytest docker-compose==1.22.0 docker dicttoxml kazoo PyMySQL psycopg2 pymongo tzlocal ENV DOCKER_CHANNEL stable ENV DOCKER_VERSION 17.09.1-ce @@ -61,4 +61,4 @@ RUN set -x \ VOLUME /var/lib/docker EXPOSE 2375 ENTRYPOINT ["dockerd-entrypoint.sh"] -CMD [] \ No newline at end of file +CMD [] diff --git a/dbms/tests/integration/test_external_dictionaries/dictionary.py b/dbms/tests/integration/test_external_dictionaries/dictionary.py index d67fc1d92a1..bdddc7a9604 100644 --- a/dbms/tests/integration/test_external_dictionaries/dictionary.py +++ b/dbms/tests/integration/test_external_dictionaries/dictionary.py @@ -157,6 +157,8 @@ class DictionaryStructure(object): fields_strs.append(field.name) return fields_strs + def get_all_fields(self): + return self.keys + self.range_fields + self.ordinary_fields def _get_dict_get_common_expression(self, dict_name, field, row, or_default, with_type, has): if field in self.keys: diff --git a/dbms/tests/integration/test_external_dictionaries/external_sources.py b/dbms/tests/integration/test_external_dictionaries/external_sources.py index a6213fd14ec..71dc05ca78c 100644 --- a/dbms/tests/integration/test_external_dictionaries/external_sources.py +++ b/dbms/tests/integration/test_external_dictionaries/external_sources.py @@ -1,7 +1,9 @@ # -*- coding: utf-8 -*- import warnings import pymysql.cursors -#import pymongo +import pymongo +from tzlocal import get_localzone +import datetime import os @@ -50,6 +52,7 @@ class SourceMySQL(ExternalSource): 'Float32': 'float', 'Float64': 'double' } + def create_mysql_conn(self): self.connection = pymysql.connect( user=self.user, @@ -123,40 +126,55 @@ class SourceMySQL(ExternalSource): self.execute_mysql_query(query) -#class SourceMongo(ExternalSource): -# -# def get_source_str(self, table_name): -# return ''' -# -# {host} -# {port} -# {user} -# {password} -# test -# {tbl} -# -# '''.format( -# host=self.docker_hostname, -# port=self.docker_port, -# user=self.user, -# password=self.password, -# tbl=table_name, -# ) -# -# def prepare(self, structure, table_name, cluster): -# connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format( -# host=self.internal_hostname, port=self.internal_port, -# user=self.user, password=self.password) -# self.connection = pymongo.MongoClient(connection_str) -# self.connection.create -# self.structure = structure -# self.db = self.connection['test'] -# self.prepared = True -# -# def load_data(self, data, table_name): -# tbl = self.db[table_name] -# to_insert = [dict(row.data) for row in data] -# result = tbl.insert_many(to_insert) +class SourceMongo(ExternalSource): + + def get_source_str(self, table_name): + return ''' + + {host} + {port} + {user} + {password} + test + {tbl} + + '''.format( + host=self.docker_hostname, + port=self.docker_port, + user=self.user, + password=self.password, + tbl=table_name, + ) + + def prepare(self, structure, table_name, cluster): + connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format( + host=self.internal_hostname, port=self.internal_port, + user=self.user, password=self.password) + self.connection = pymongo.MongoClient(connection_str) + self.converters = {} + for field in structure.get_all_fields(): + if field.field_type == "Date": + self.converters[field.name] = lambda x: datetime.datetime.strptime(x, "%Y-%m-%d") + elif field.field_type == "DateTime": + self.converters[field.name] = lambda x: get_localzone().localize(datetime.datetime.strptime(x, "%Y-%m-%d %H:%M:%S")) + else: + self.converters[field.name] = lambda x: x + + self.db = self.connection['test'] + self.db.add_user(self.user, self.password) + self.prepared = True + + def load_data(self, data, table_name): + tbl = self.db[table_name] + + to_insert = [] + for row in data: + row_dict = {} + for cell_name, cell_value in row.data.items(): + row_dict[cell_name] = self.converters[cell_name](cell_value) + to_insert.append(row_dict) + + result = tbl.insert_many(to_insert) class SourceClickHouse(ExternalSource): diff --git a/dbms/tests/integration/test_external_dictionaries/test.py b/dbms/tests/integration/test_external_dictionaries/test.py index 64006d22f7f..7878620a65d 100644 --- a/dbms/tests/integration/test_external_dictionaries/test.py +++ b/dbms/tests/integration/test_external_dictionaries/test.py @@ -4,7 +4,7 @@ import time from helpers.cluster import ClickHouseCluster from dictionary import Field, Row, Dictionary, DictionaryStructure, Layout -from external_sources import SourceMySQL, SourceClickHouse, SourceFile, SourceExecutableCache, SourceExecutableHashed +from external_sources import SourceMySQL, SourceClickHouse, SourceFile, SourceExecutableCache, SourceExecutableHashed, SourceMongo from external_sources import SourceHTTP, SourceHTTPS SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) @@ -80,7 +80,7 @@ LAYOUTS = [ SOURCES = [ # some troubles with that dictionary - #SourceMongo("MongoDB", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"), + SourceMongo("MongoDB", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"), SourceMySQL("MySQL", "localhost", "3308", "mysql1", "3306", "root", "clickhouse"), SourceClickHouse("RemoteClickHouse", "localhost", "9000", "clickhouse1", "9000", "default", ""), SourceClickHouse("LocalClickHouse", "localhost", "9000", "node", "9000", "default", ""), @@ -121,7 +121,7 @@ def setup_module(module): for fname in os.listdir(dict_configs_path): main_configs.append(os.path.join(dict_configs_path, fname)) cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs')) - node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True) + node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True, with_mongo=True) cluster.add_instance('clickhouse1') @pytest.fixture(scope="module") From 350d5a78dd96b67fba0eef0129173f58b92452a7 Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 25 Feb 2019 13:47:29 +0300 Subject: [PATCH 13/13] Better config --- .../configs/config.xml | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/dbms/tests/integration/test_external_dictionaries/configs/config.xml b/dbms/tests/integration/test_external_dictionaries/configs/config.xml index c102ff1ed91..97cbaf97d4a 100644 --- a/dbms/tests/integration/test_external_dictionaries/configs/config.xml +++ b/dbms/tests/integration/test_external_dictionaries/configs/config.xml @@ -1,30 +1,30 @@ - - trace - /var/log/clickhouse-server/clickhouse-server.log - /var/log/clickhouse-server/clickhouse-server.err.log - 1000M - 10 + + trace + /var/log/clickhouse-server/clickhouse-server.log + /var/log/clickhouse-server/clickhouse-server.err.log + 1000M + 10 9000 127.0.0.1 - - true - none - - AcceptCertificateHandler + + true + none + + AcceptCertificateHandler - - + + - 500 - 5368709120 - ./clickhouse/ - users.xml + 500 + 5368709120 + ./clickhouse/ + users.xml - /etc/clickhouse-server/config.d/*.xml + /etc/clickhouse-server/config.d/*.xml