Merge pull request #4477 from yandex/external_dict_integration_test

External dict integration test
This commit is contained in:
alesapin 2019-02-25 15:27:22 +03:00 committed by GitHub
commit 3b30b2f856
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1175 additions and 10 deletions

View File

@ -192,7 +192,8 @@ MongoDBDictionarySource::MongoDBDictionarySource(
{ {
# if POCO_VERSION >= 0x01070800 # if POCO_VERSION >= 0x01070800
Poco::MongoDB::Database poco_db(db); Poco::MongoDB::Database poco_db(db);
poco_db.authenticate(*connection, user, password, method.empty() ? Poco::MongoDB::Database::AUTH_SCRAM_SHA1 : method); if (!poco_db.authenticate(*connection, user, password, method.empty() ? Poco::MongoDB::Database::AUTH_SCRAM_SHA1 : method))
throw Exception("Cannot authenticate in MongoDB, incorrect user or password", ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
# else # else
authenticate(*connection, db, user, password); authenticate(*connection, db, user, password);
# endif # endif

View File

@ -14,7 +14,7 @@ Don't use Docker from your system repository.
* [pip](https://pypi.python.org/pypi/pip). To install: `sudo apt-get install python-pip` * [pip](https://pypi.python.org/pypi/pip). To install: `sudo apt-get install python-pip`
* [py.test](https://docs.pytest.org/) testing framework. To install: `sudo -H pip install pytest` * [py.test](https://docs.pytest.org/) testing framework. To install: `sudo -H pip install pytest`
* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL psycopg2` * [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL psycopg2 pymongo tzlocal`
(highly not recommended) If you really want to use OS packages on modern debian/ubuntu instead of "pip": `sudo apt install -y docker docker-compose python-pytest python-dicttoxml python-docker python-pymysql python-kazoo python-psycopg2` (highly not recommended) If you really want to use OS packages on modern debian/ubuntu instead of "pip": `sudo apt install -y docker docker-compose python-pytest python-dicttoxml python-docker python-pymysql python-kazoo python-psycopg2`

View File

@ -15,6 +15,8 @@ from kazoo.client import KazooClient
from kazoo.exceptions import KazooException from kazoo.exceptions import KazooException
import psycopg2 import psycopg2
import requests import requests
import base64
import pymongo
import docker import docker
from docker.errors import ContainerError from docker.errors import ContainerError
@ -98,6 +100,7 @@ class ClickHouseCluster:
self.with_kafka = False self.with_kafka = False
self.with_odbc_drivers = False self.with_odbc_drivers = False
self.with_hdfs = False self.with_hdfs = False
self.with_mongo = False
self.docker_client = None self.docker_client = None
self.is_up = False self.is_up = False
@ -109,7 +112,7 @@ class ClickHouseCluster:
cmd += " client" cmd += " client"
return cmd return cmd
def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, with_postgres=False, with_hdfs=False, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test", stay_alive=False, ipv4_address=None, ipv6_address=None): def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test", stay_alive=False, ipv4_address=None, ipv6_address=None):
"""Add an instance to the cluster. """Add an instance to the cluster.
name - the name of the instance directory and the value of the 'instance' macro in ClickHouse. name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
@ -127,7 +130,7 @@ class ClickHouseCluster:
instance = ClickHouseInstance( instance = ClickHouseInstance(
self, self.base_dir, name, config_dir, main_configs, user_configs, macros, with_zookeeper, self, self.base_dir, name, config_dir, main_configs, user_configs, macros, with_zookeeper,
self.zookeeper_config_path, with_mysql, with_kafka, self.base_configs_dir, self.server_bin_path, self.zookeeper_config_path, with_mysql, with_kafka, with_mongo, self.base_configs_dir, self.server_bin_path,
self.odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, hostname=hostname, self.odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, hostname=hostname,
env_variables=env_variables, image=image, stay_alive=stay_alive, ipv4_address=ipv4_address, ipv6_address=ipv6_address) env_variables=env_variables, image=image, stay_alive=stay_alive, ipv4_address=ipv4_address, ipv6_address=ipv6_address)
@ -176,6 +179,11 @@ class ClickHouseCluster:
self.base_hdfs_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', self.base_hdfs_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_hdfs.yml')] self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_hdfs.yml')]
if with_mongo and not self.with_mongo:
self.with_mongo = True
self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mongo.yml')])
self.base_mongo_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mongo.yml')]
return instance return instance
@ -248,6 +256,20 @@ class ClickHouseCluster:
raise Exception("Can't wait HDFS to start") raise Exception("Can't wait HDFS to start")
def wait_mongo_to_start(self, timeout=30):
connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format(
host='localhost', port='27018', user='root', password='clickhouse')
connection = pymongo.MongoClient(connection_str)
start = time.time()
while time.time() - start < timeout:
try:
connection.database_names()
print "Connected to Mongo dbs:", connection.database_names()
return
except Exception as ex:
print "Can't connect to Mongo " + str(ex)
time.sleep(1)
def start(self, destroy_dirs=True): def start(self, destroy_dirs=True):
if self.is_up: if self.is_up:
return return
@ -290,6 +312,10 @@ class ClickHouseCluster:
subprocess_check_call(self.base_hdfs_cmd + ['up', '-d', '--force-recreate']) subprocess_check_call(self.base_hdfs_cmd + ['up', '-d', '--force-recreate'])
self.wait_hdfs_to_start(120) self.wait_hdfs_to_start(120)
if self.with_mongo and self.base_mongo_cmd:
subprocess_check_call(self.base_mongo_cmd + ['up', '-d', '--force-recreate'])
self.wait_mongo_to_start(30)
subprocess_check_call(self.base_cmd + ['up', '-d', '--no-recreate']) subprocess_check_call(self.base_cmd + ['up', '-d', '--no-recreate'])
start_deadline = time.time() + 20.0 # seconds start_deadline = time.time() + 20.0 # seconds
@ -388,7 +414,7 @@ class ClickHouseInstance:
def __init__( def __init__(
self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macros, self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macros,
with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, base_configs_dir, server_bin_path, odbc_bridge_bin_path, with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_mongo, base_configs_dir, server_bin_path, odbc_bridge_bin_path,
clickhouse_path_dir, with_odbc_drivers, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test", clickhouse_path_dir, with_odbc_drivers, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test",
stay_alive=False, ipv4_address=None, ipv6_address=None): stay_alive=False, ipv4_address=None, ipv6_address=None):
@ -412,6 +438,7 @@ class ClickHouseInstance:
self.with_mysql = with_mysql self.with_mysql = with_mysql
self.with_kafka = with_kafka self.with_kafka = with_kafka
self.with_mongo = with_mongo
self.path = p.join(self.cluster.instances_dir, name) self.path = p.join(self.cluster.instances_dir, name)
self.docker_compose_path = p.join(self.path, 'docker_compose.yml') self.docker_compose_path = p.join(self.path, 'docker_compose.yml')
@ -456,10 +483,10 @@ class ClickHouseInstance:
return self.client.get_query_request(*args, **kwargs) return self.client.get_query_request(*args, **kwargs)
def exec_in_container(self, cmd, **kwargs): def exec_in_container(self, cmd, detach=False, **kwargs):
container = self.get_docker_handle() container = self.get_docker_handle()
exec_id = self.docker_client.api.exec_create(container.id, cmd, **kwargs) exec_id = self.docker_client.api.exec_create(container.id, cmd, **kwargs)
output = self.docker_client.api.exec_start(exec_id, detach=False) output = self.docker_client.api.exec_start(exec_id, detach=detach)
output = output.decode('utf8') output = output.decode('utf8')
exit_code = self.docker_client.api.exec_inspect(exec_id)['ExitCode'] exit_code = self.docker_client.api.exec_inspect(exec_id)['ExitCode']
@ -467,6 +494,13 @@ class ClickHouseInstance:
raise Exception('Cmd "{}" failed! Return code {}. Output: {}'.format(' '.join(cmd), exit_code, output)) raise Exception('Cmd "{}" failed! Return code {}. Output: {}'.format(' '.join(cmd), exit_code, output))
return output return output
def copy_file_to_container(self, local_path, dest_path):
with open(local_path, 'r') as fdata:
data = fdata.read()
encoded_data = base64.b64encode(data)
self.exec_in_container(["bash", "-c", "echo {} | base64 --decode > {}".format(encoded_data, dest_path)])
def get_docker_handle(self): def get_docker_handle(self):
return self.docker_client.containers.get(self.docker_id) return self.docker_client.containers.get(self.docker_id)

View File

@ -0,0 +1,10 @@
version: '2.2'
services:
mongo1:
image: mongo:3.6
restart: always
environment:
MONGO_INITDB_ROOT_USERNAME: root
MONGO_INITDB_ROOT_PASSWORD: clickhouse
ports:
- 27018:27017

View File

@ -24,7 +24,7 @@ RUN apt-get update && env DEBIAN_FRONTEND=noninteractive apt-get install --yes -
ENV TZ=Europe/Moscow ENV TZ=Europe/Moscow
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN pip install pytest docker-compose==1.22.0 docker dicttoxml kazoo PyMySQL psycopg2 RUN pip install pytest docker-compose==1.22.0 docker dicttoxml kazoo PyMySQL psycopg2 pymongo tzlocal
ENV DOCKER_CHANNEL stable ENV DOCKER_CHANNEL stable
ENV DOCKER_VERSION 17.09.1-ce ENV DOCKER_VERSION 17.09.1-ce

View File

@ -0,0 +1,30 @@
<?xml version="1.0"?>
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
</logger>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<openSSL>
<client>
<cacheSessions>true</cacheSessions>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
<max_concurrent_queries>500</max_concurrent_queries>
<mark_cache_size>5368709120</mark_cache_size>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
<dictionaries_config>/etc/clickhouse-server/config.d/*.xml</dictionaries_config>
</yandex>

View File

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<quotas>
<default>
</default>
</quotas>
</yandex>

View File

@ -0,0 +1,337 @@
#-*- coding: utf-8 -*-
import copy
class Layout(object):
LAYOUTS_STR_DICT = {
'flat': '<flat/>',
'hashed': '<hashed/>',
'cache': '<cache><size_in_cells>128</size_in_cells></cache>',
'complex_key_hashed': '<complex_key_hashed/>',
'complex_key_cache': '<complex_key_cache><size_in_cells>128</size_in_cells></complex_key_cache>',
'range_hashed': '<range_hashed/>'
}
def __init__(self, name):
self.name = name
self.is_complex = False
self.is_simple = False
self.is_ranged = False
if self.name.startswith('complex'):
self.layout_type = "complex"
self.is_complex = True
elif name.startswith("range"):
self.layout_type = "ranged"
self.is_ranged = True
else:
self.layout_type = "simple"
self.is_simple = True
def get_str(self):
return self.LAYOUTS_STR_DICT[self.name]
def get_key_block_name(self):
if self.is_complex:
return 'key'
else:
return 'id'
class Row(object):
def __init__(self, fields, values):
self.data = {}
for field, value in zip(fields, values):
self.data[field.name] = value
def get_value_by_name(self, name):
return self.data[name]
class Field(object):
def __init__(self, name, field_type, is_key=False, is_range_key=False, default=None, hierarchical=False, range_hash_type=None, default_value_for_get=None):
self.name = name
self.field_type = field_type
self.is_key = is_key
self.default = default
self.hierarchical = hierarchical
self.range_hash_type = range_hash_type
self.is_range = self.range_hash_type is not None
self.is_range_key = is_range_key
self.default_value_for_get = default_value_for_get
def get_attribute_str(self):
return '''
<attribute>
<name>{name}</name>
<type>{field_type}</type>
<null_value>{default}</null_value>
<hierarchical>{hierarchical}</hierarchical>
</attribute>'''.format(
name=self.name,
field_type=self.field_type,
default=self.default if self.default else '',
hierarchical='true' if self.hierarchical else 'false',
)
def get_simple_index_str(self):
return '<name>{name}</name>'.format(name=self.name)
def get_range_hash_str(self):
if not self.range_hash_type:
raise Exception("Field {} is not range hashed".format(self.name))
return '''
<range_{type}>
<name>{name}</name>
</range_{type}>
'''.format(type=self.range_hash_type, name=self.name)
class DictionaryStructure(object):
def __init__(self, layout, fields):
self.layout = layout
self.keys = []
self.range_key = None
self.ordinary_fields = []
self.range_fields = []
for field in fields:
if field.is_key:
self.keys.append(field)
elif field.is_range:
self.range_fields.append(field)
else:
self.ordinary_fields.append(field)
if field.is_range_key:
if self.range_key is not None:
raise Exception("Duplicate range key {}".format(field.name))
self.range_key = field
if not self.layout.is_complex and len(self.keys) > 1:
raise Exception("More than one key {} field in non complex layout {}".format(len(self.keys), self.layout.name))
if self.layout.is_ranged and (not self.range_key or len(self.range_fields) != 2):
raise Exception("Inconsistent configuration of ranged dictionary")
def get_structure_str(self):
fields_strs = []
for field in self.ordinary_fields:
fields_strs.append(field.get_attribute_str())
key_strs = []
if self.layout.is_complex:
for key_field in self.keys:
key_strs.append(key_field.get_attribute_str())
else: # same for simple and ranged
for key_field in self.keys:
key_strs.append(key_field.get_simple_index_str())
ranged_strs = []
if self.layout.is_ranged:
for range_field in self.range_fields:
ranged_strs.append(range_field.get_range_hash_str())
return '''
<layout>
{layout_str}
</layout>
<structure>
<{key_block_name}>
{key_str}
</{key_block_name}>
{range_strs}
{attributes_str}
</structure>'''.format(
layout_str=self.layout.get_str(),
key_block_name=self.layout.get_key_block_name(),
key_str='\n'.join(key_strs),
attributes_str='\n'.join(fields_strs),
range_strs='\n'.join(ranged_strs),
)
def get_ordered_names(self):
fields_strs = []
for key_field in self.keys:
fields_strs.append(key_field.name)
for range_field in self.range_fields:
fields_strs.append(range_field.name)
for field in self.ordinary_fields:
fields_strs.append(field.name)
return fields_strs
def get_all_fields(self):
return self.keys + self.range_fields + self.ordinary_fields
def _get_dict_get_common_expression(self, dict_name, field, row, or_default, with_type, has):
if field in self.keys:
raise Exception("Trying to receive key field {} from dictionary".format(field.name))
if not self.layout.is_complex:
if not or_default:
key_expr = ', toUInt64({})'.format(row.data[self.keys[0].name])
else:
key_expr = ', toUInt64({})'.format(self.keys[0].default_value_for_get)
else:
key_exprs_strs = []
for key in self.keys:
if not or_default:
val = row.data[key.name]
else:
val = key.default_value_for_get
if isinstance(val, str):
val = "'" + val + "'"
key_exprs_strs.append('to{type}({value})'.format(type=key.field_type, value=val))
key_expr = ', (' + ','.join(key_exprs_strs) + ')'
date_expr = ''
if self.layout.is_ranged:
val = row.data[self.range_key.name]
if isinstance(val, str):
val = "'" + val + "'"
val = "to{type}({val})".format(type=self.range_key.field_type, val=val)
date_expr = ', ' + val
if or_default:
raise Exception("Can create 'dictGetOrDefault' query for ranged dictionary")
if or_default:
or_default_expr = 'OrDefault'
if field.default_value_for_get is None:
raise Exception("Can create 'dictGetOrDefault' query for field {} without default_value_for_get".format(field.name))
val = field.default_value_for_get
if isinstance(val, str):
val = "'" + val + "'"
default_value_for_get = ', to{type}({value})'.format(type=field.field_type, value=val)
else:
or_default_expr = ''
default_value_for_get = ''
if with_type:
field_type = field.field_type
else:
field_type = ''
field_name = ", '" + field.name + "'"
if has:
what = "Has"
field_type = ''
or_default = ''
field_name = ''
date_expr = ''
def_for_get = ''
else:
what = "Get"
return "dict{what}{field_type}{or_default}('{dict_name}'{field_name}{key_expr}{date_expr}{def_for_get})".format(
what=what,
field_type=field_type,
dict_name=dict_name,
field_name=field_name,
key_expr=key_expr,
date_expr=date_expr,
or_default=or_default_expr,
def_for_get=default_value_for_get,
)
def get_get_expressions(self, dict_name, field, row):
return [
self._get_dict_get_common_expression(dict_name, field, row, or_default=False, with_type=False, has=False),
self._get_dict_get_common_expression(dict_name, field, row, or_default=False, with_type=True, has=False),
]
def get_get_or_default_expressions(self, dict_name, field, row):
if not self.layout.is_ranged:
return [
self._get_dict_get_common_expression(dict_name, field, row, or_default=True, with_type=False, has=False),
self._get_dict_get_common_expression(dict_name, field, row, or_default=True, with_type=True, has=False),
]
return []
def get_has_expressions(self, dict_name, field, row):
if not self.layout.is_ranged:
return [self._get_dict_get_common_expression(dict_name, field, row, or_default=False, with_type=False, has=True)]
return []
def get_hierarchical_expressions(self, dict_name, row):
if self.layout.is_simple:
key_expr = 'toUInt64({})'.format(row.data[self.keys[0].name])
return [
"dictGetHierarchy('{dict_name}', {key})".format(
dict_name=dict_name,
key=key_expr,
),
]
return []
def get_is_in_expressions(self, dict_name, row, parent_row):
if self.layout.is_simple:
child_key_expr = 'toUInt64({})'.format(row.data[self.keys[0].name])
parent_key_expr = 'toUInt64({})'.format(parent_row.data[self.keys[0].name])
return [
"dictIsIn('{dict_name}', {child_key}, {parent_key})".format(
dict_name=dict_name,
child_key=child_key_expr,
parent_key=parent_key_expr,)
]
return []
class Dictionary(object):
def __init__(self, name, structure, source, config_path, table_name):
self.name = name
self.structure = copy.deepcopy(structure)
self.source = copy.deepcopy(source)
self.config_path = config_path
self.table_name = table_name
def generate_config(self):
with open(self.config_path, 'w') as result:
result.write('''
<dictionaries>
<dictionary>
<lifetime>
<min>3</min>
<max>5</max>
</lifetime>
<name>{name}</name>
{structure}
<source>
{source}
</source>
</dictionary>
</dictionaries>
'''.format(
name=self.name,
structure=self.structure.get_structure_str(),
source=self.source.get_source_str(self.table_name),
))
def prepare_source(self, cluster):
self.source.prepare(self.structure, self.table_name, cluster)
def load_data(self, data):
if not self.source.prepared:
raise Exception("Cannot load data for dictionary {}, source is not prepared".format(self.name))
self.source.load_data(data, self.table_name)
def get_select_get_queries(self, field, row):
return ['select {}'.format(expr) for expr in self.structure.get_get_expressions(self.name, field, row)]
def get_select_get_or_default_queries(self, field, row):
return ['select {}'.format(expr) for expr in self.structure.get_get_or_default_expressions(self.name, field, row)]
def get_select_has_queries(self, field, row):
return ['select {}'.format(expr) for expr in self.structure.get_has_expressions(self.name, field, row)]
def get_hierarchical_queries(self, row):
return ['select {}'.format(expr) for expr in self.structure.get_hierarchical_expressions(self.name, row)]
def get_is_in_queries(self, row, parent_row):
return ['select {}'.format(expr) for expr in self.structure.get_is_in_expressions(self.name, row, parent_row)]
def is_complex(self):
return self.structure.layout.is_complex

View File

@ -0,0 +1,374 @@
# -*- coding: utf-8 -*-
import warnings
import pymysql.cursors
import pymongo
from tzlocal import get_localzone
import datetime
import os
class ExternalSource(object):
def __init__(self, name, internal_hostname, internal_port,
docker_hostname, docker_port, user, password):
self.name = name
self.internal_hostname = internal_hostname
self.internal_port = int(internal_port)
self.docker_hostname = docker_hostname
self.docker_port = int(docker_port)
self.user = user
self.password = password
def get_source_str(self, table_name):
raise NotImplementedError("Method {} is not implemented for {}".format(
"get_source_config_part", self.__class__.__name__))
def prepare(self, structure, table_name, cluster):
raise NotImplementedError("Method {} is not implemented for {}".format(
"prepare_remote_source", self.__class__.__name__))
# data is banch of Row
def load_data(self, data):
raise NotImplementedError("Method {} is not implemented for {}".format(
"prepare_remote_source", self.__class__.__name__))
def compatible_with_layout(self, layout):
return True
class SourceMySQL(ExternalSource):
TYPE_MAPPING = {
'UInt8': 'tinyint unsigned',
'UInt16': 'smallint unsigned',
'UInt32': 'int unsigned',
'UInt64': 'bigint unsigned',
'Int8': 'tinyint',
'Int16': 'smallint',
'Int32': 'int',
'Int64': 'bigint',
'UUID': 'varchar(36)',
'Date': 'date',
'DateTime': 'datetime',
'String': 'text',
'Float32': 'float',
'Float64': 'double'
}
def create_mysql_conn(self):
self.connection = pymysql.connect(
user=self.user,
password=self.password,
host=self.internal_hostname,
port=self.internal_port)
def execute_mysql_query(self, query):
with warnings.catch_warnings():
warnings.simplefilter("ignore")
with self.connection.cursor() as cursor:
cursor.execute(query)
self.connection.commit()
def get_source_str(self, table_name):
return '''
<mysql>
<replica>
<priority>1</priority>
<host>127.0.0.1</host>
<port>3333</port> <!-- Wrong port, for testing basic failover to work. -->
</replica>
<replica>
<priority>2</priority>
<host>{hostname}</host>
<port>{port}</port>
</replica>
<user>{user}</user>
<password>{password}</password>
<db>test</db>
<table>{tbl}</table>
</mysql>'''.format(
hostname=self.docker_hostname,
port=self.docker_port,
user=self.user,
password=self.password,
tbl=table_name,
)
def prepare(self, structure, table_name, cluster):
self.create_mysql_conn()
self.execute_mysql_query("create database if not exists test default character set 'utf8'")
fields_strs = []
for field in structure.keys + structure.ordinary_fields + structure.range_fields:
fields_strs.append(field.name + ' ' + self.TYPE_MAPPING[field.field_type])
create_query = '''create table test.{table_name} (
{fields_str});
'''.format(table_name=table_name, fields_str=','.join(fields_strs))
self.execute_mysql_query(create_query)
self.ordered_names = structure.get_ordered_names()
self.prepared = True
def load_data(self, data, table_name):
values_strs = []
if not data:
return
for row in data:
sorted_row = []
for name in self.ordered_names:
data = row.data[name]
if isinstance(row.data[name], str):
data = "'" + data + "'"
else:
data = str(data)
sorted_row.append(data)
values_strs.append('(' + ','.join(sorted_row) + ')')
query = 'insert into test.{} ({}) values {}'.format(
table_name,
','.join(self.ordered_names),
','.join(values_strs))
self.execute_mysql_query(query)
class SourceMongo(ExternalSource):
def get_source_str(self, table_name):
return '''
<mongodb>
<host>{host}</host>
<port>{port}</port>
<user>{user}</user>
<password>{password}</password>
<db>test</db>
<collection>{tbl}</collection>
</mongodb>
'''.format(
host=self.docker_hostname,
port=self.docker_port,
user=self.user,
password=self.password,
tbl=table_name,
)
def prepare(self, structure, table_name, cluster):
connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format(
host=self.internal_hostname, port=self.internal_port,
user=self.user, password=self.password)
self.connection = pymongo.MongoClient(connection_str)
self.converters = {}
for field in structure.get_all_fields():
if field.field_type == "Date":
self.converters[field.name] = lambda x: datetime.datetime.strptime(x, "%Y-%m-%d")
elif field.field_type == "DateTime":
self.converters[field.name] = lambda x: get_localzone().localize(datetime.datetime.strptime(x, "%Y-%m-%d %H:%M:%S"))
else:
self.converters[field.name] = lambda x: x
self.db = self.connection['test']
self.db.add_user(self.user, self.password)
self.prepared = True
def load_data(self, data, table_name):
tbl = self.db[table_name]
to_insert = []
for row in data:
row_dict = {}
for cell_name, cell_value in row.data.items():
row_dict[cell_name] = self.converters[cell_name](cell_value)
to_insert.append(row_dict)
result = tbl.insert_many(to_insert)
class SourceClickHouse(ExternalSource):
def get_source_str(self, table_name):
return '''
<clickhouse>
<host>{host}</host>
<port>{port}</port>
<user>{user}</user>
<password>{password}</password>
<db>test</db>
<table>{tbl}</table>
</clickhouse>
'''.format(
host=self.docker_hostname,
port=self.docker_port,
user=self.user,
password=self.password,
tbl=table_name,
)
def prepare(self, structure, table_name, cluster):
self.node = cluster.instances[self.docker_hostname]
self.node.query("CREATE DATABASE IF NOT EXISTS test")
fields_strs = []
for field in structure.keys + structure.ordinary_fields + structure.range_fields:
fields_strs.append(field.name + ' ' + field.field_type)
create_query = '''CREATE TABLE test.{table_name} (
{fields_str}) ENGINE MergeTree ORDER BY tuple();
'''.format(table_name=table_name, fields_str=','.join(fields_strs))
self.node.query(create_query)
self.ordered_names = structure.get_ordered_names()
self.prepared = True
def load_data(self, data, table_name):
values_strs = []
if not data:
return
for row in data:
sorted_row = []
for name in self.ordered_names:
row_data = row.data[name]
if isinstance(row_data, str):
row_data = "'" + row_data + "'"
else:
row_data = str(row_data)
sorted_row.append(row_data)
values_strs.append('(' + ','.join(sorted_row) + ')')
query = 'INSERT INTO test.{} ({}) values {}'.format(
table_name,
','.join(self.ordered_names),
','.join(values_strs))
self.node.query(query)
class SourceFile(ExternalSource):
def get_source_str(self, table_name):
table_path = "/" + table_name + ".tsv"
return '''
<file>
<path>{path}</path>
<format>TabSeparated</format>
</file>
'''.format(
path=table_path,
)
def prepare(self, structure, table_name, cluster):
self.node = cluster.instances[self.docker_hostname]
path = "/" + table_name + ".tsv"
self.node.exec_in_container(["bash", "-c", "touch {}".format(path)])
self.ordered_names = structure.get_ordered_names()
self.prepared = True
def load_data(self, data, table_name):
if not data:
return
path = "/" + table_name + ".tsv"
for row in list(data):
sorted_row = []
for name in self.ordered_names:
sorted_row.append(str(row.data[name]))
str_data = '\t'.join(sorted_row)
self.node.exec_in_container(["bash", "-c", "echo \"{row}\" >> {fname}".format(row=str_data, fname=path)])
def compatible_with_layout(self, layout):
return 'cache' not in layout.name
class _SourceExecutableBase(ExternalSource):
def _get_cmd(self, path):
raise NotImplementedError("Method {} is not implemented for {}".format(
"_get_cmd", self.__class__.__name__))
def get_source_str(self, table_name):
table_path = "/" + table_name + ".tsv"
return '''
<executable>
<command>{cmd}</command>
<format>TabSeparated</format>
</executable>
'''.format(
cmd=self._get_cmd(table_path),
)
def prepare(self, structure, table_name, cluster):
self.node = cluster.instances[self.docker_hostname]
path = "/" + table_name + ".tsv"
self.node.exec_in_container(["bash", "-c", "touch {}".format(path)])
self.ordered_names = structure.get_ordered_names()
self.prepared = True
def load_data(self, data, table_name):
if not data:
return
path = "/" + table_name + ".tsv"
for row in list(data):
sorted_row = []
for name in self.ordered_names:
sorted_row.append(str(row.data[name]))
str_data = '\t'.join(sorted_row)
self.node.exec_in_container(["bash", "-c", "echo \"{row}\" >> {fname}".format(row=str_data, fname=path)])
class SourceExecutableCache(_SourceExecutableBase):
def _get_cmd(self, path):
return "cat {}".format(path)
def compatible_with_layout(self, layout):
return 'cache' not in layout.name
class SourceExecutableHashed(_SourceExecutableBase):
def _get_cmd(self, path):
return "cat - >/dev/null;cat {}".format(path)
def compatible_with_layout(self, layout):
return 'cache' in layout.name
class SourceHTTPBase(ExternalSource):
PORT_COUNTER = 5555
def get_source_str(self, table_name):
self.http_port = SourceHTTPBase.PORT_COUNTER
url = "{schema}://{host}:{port}/".format(schema=self._get_schema(), host=self.docker_hostname, port=self.http_port)
SourceHTTPBase.PORT_COUNTER += 1
return '''
<http>
<url>{url}</url>
<format>TabSeparated</format>
</http>
'''.format(url=url)
def prepare(self, structure, table_name, cluster):
self.node = cluster.instances[self.docker_hostname]
path = "/" + table_name + ".tsv"
self.node.exec_in_container(["bash", "-c", "touch {}".format(path)])
script_dir = os.path.dirname(os.path.realpath(__file__))
self.node.copy_file_to_container(os.path.join(script_dir, './http_server.py'), '/http_server.py')
self.node.copy_file_to_container(os.path.join(script_dir, './fake_cert.pem'), '/fake_cert.pem')
self.node.exec_in_container([
"bash",
"-c",
"python2 /http_server.py --data-path={tbl} --schema={schema} --host={host} --port={port} --cert-path=/fake_cert.pem".format(
tbl=path, schema=self._get_schema(), host=self.docker_hostname, port=self.http_port)
], detach=True)
self.ordered_names = structure.get_ordered_names()
self.prepared = True
def load_data(self, data, table_name):
if not data:
return
path = "/" + table_name + ".tsv"
for row in list(data):
sorted_row = []
for name in self.ordered_names:
sorted_row.append(str(row.data[name]))
str_data = '\t'.join(sorted_row)
self.node.exec_in_container(["bash", "-c", "echo \"{row}\" >> {fname}".format(row=str_data, fname=path)])
class SourceHTTP(SourceHTTPBase):
def _get_schema(self):
return "http"
class SourceHTTPS(SourceHTTPBase):
def _get_schema(self):
return "https"

View File

@ -0,0 +1,49 @@
-----BEGIN PRIVATE KEY-----
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDDHnGYqN/ztiFE
rMQizbYiEpI/q/91bCDQ+xRes+gucKrr4qvQbosANYfpXgsaGizH24CpAXDvnFwC
oHqPmotHunJvG9uKiVvshy+tx1SNLZEN9DySri+8V+8fetn5PFxWQsKclMGCypyE
REV6H0vflPWmZRZWvAb5aaIxcRa2m3bTVUZPuY0wzCtc+ELPQ/sRc62gWH4bMlBo
0Wdai4+wcmpdcSR+rlZVDPt+ysxF/PcJFMAQ9CIRJRhXuK7Q/XCmAkagpH9tPPwY
SDMONTPhumXY7gCX4lmV9CflGJ6IpGmpEL04Rpr3gAcvz/w4JiMXgGpvtDjiJku9
qOdCYS/FAgMBAAECggEBAL/miULjlJ9VWZL5eE3ilGcebMhCmZUbK4td5cLenlRO
a0xkOydcEUm7XFihLboWVEScFgYibLi8x6Gtw9zI2oNJVJMCiwHN5qLSsonvqbDQ
SAG5XHnG5xwOQBht80O1ofsU3eKyS0AflaBgpRRfA3h6QL/OXBIiC5nx0ptd5kDh
HR0IHUcleBHt8I0d/PZbQE9oMOBlnMf8v2jGe80JXscQt2UabA/quCalDihhDt5J
qySfh4mDOrBOQEsmO/C1JCztQ6WZ2FVwRiITb/fRmsPadKJsIiMyy2w6NmP96v2a
V2ZqMvz9OZym8M2is4HR2pbn8XJ6vmW52fwNQhpWDgECgYEA8aiqF5df3j8YEDAX
XVAhIaubSLcS50qSk/p0/ZS9ETR1Uv8zjJDs6xBVBd4xXe/G2/XvvV6sGp4JcW3V
U66Ll3S1veMlnvCTjZUEi931EJbIdoyGACEG19QIVteSEhQkoSOk/Zx1lFSVm9UZ
hUV4JvWifQvLetS/v6MhnxSbTdUCgYEAzrK7+0gVT0a0szMs7CbeQVm80EWcqPea
p5jyLQHu+7vzcC8c9RRlqBPkxeG9BTt0sbBBJTrtvls15QaFoKCtTyjnrrLEHqu3
VZfIpjjrIhhvoRWP3A3r4DFMDGm/TOTUWEMSPJPXKe3uVm3buwVXWj4ipvhnAdr5
kJ+x1YqNIjECgYEAo0ISHzv53Vh8tjr3HehLacbYcmiUEcOUgPo8XTBGBsCM3pRg
S/+Av1FaT0uLyG17yBA/dYzm8liAAqxz6UPLNHf5bB5vxQ+8b3MUDjXWIO3s4gIP
aTjmuZqaQ6kBGsuW73H4PgmceagnJo7x3dJP2OoraxUz03i1Tg80YJd4UD0CgYBC
dzL/gJRpo6DjpuchIPaDKSoQBvJzWvt+PS5SzrZceHm1b1DudhqiS5NbFlXD4vSJ
VtX79NESTx4rgUdi+YgBVnP5tz5dZnZTrbU1zkO9+QGcWOSjrE5XD0MXEsITJdoq
b5bjp96eewYTAMyRfQwz1psp+eKVtCZgHRoAQsdTYQKBgQC7yBABJ4LDTie2C2n0
itO7SRT1tMfkNx8gK9RrgGawBUhD1EokmOKk+O1Ht6Cx7hqCd3Hsa4zc9se++jV1
Er+T8LW8FOFfAwtv8xggJtA8h6U8n6gIoq0EsSsWREJ4m9fDfZQnVTj8IPYvPHMr
Jv++IPqtFGG4O8IeWG+HY8mHxQ==
-----END PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIIDYDCCAkigAwIBAgIJAKSJ3I0ORzjtMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwHhcNMTkwMjIyMDgxNTIzWhcNMjAwMjIyMDgxNTIzWjBF
MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB
CgKCAQEAwx5xmKjf87YhRKzEIs22IhKSP6v/dWwg0PsUXrPoLnCq6+Kr0G6LADWH
6V4LGhosx9uAqQFw75xcAqB6j5qLR7pybxvbiolb7IcvrcdUjS2RDfQ8kq4vvFfv
H3rZ+TxcVkLCnJTBgsqchERFeh9L35T1pmUWVrwG+WmiMXEWtpt201VGT7mNMMwr
XPhCz0P7EXOtoFh+GzJQaNFnWouPsHJqXXEkfq5WVQz7fsrMRfz3CRTAEPQiESUY
V7iu0P1wpgJGoKR/bTz8GEgzDjUz4bpl2O4Al+JZlfQn5RieiKRpqRC9OEaa94AH
L8/8OCYjF4Bqb7Q44iZLvajnQmEvxQIDAQABo1MwUTAdBgNVHQ4EFgQU6P39PMY3
jRgJM0svz9XpHH8z7xUwHwYDVR0jBBgwFoAU6P39PMY3jRgJM0svz9XpHH8z7xUw
DwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAdIKBKlCIprCDGTtn
xatBlcpkbys4hQhHwkWn5aAPKE2oZlUOTEe90xxLJuciK+vCXTwQ3mgjGFc+ioAo
B7m3VL1DLmHCw5DQ2T/g8TjVjlKoaCj+9SZZPga5ygYJChx5HKFO4eek9stWo6hA
BmXndKhdX7mphUoSqUnQ+RwQ9XA0n6eTPqXAThWVqvLQgDj7Msz1XeFfqFqyD9MN
RocFg87aASTtwxYneG3IZCOQudlbHaRuEflHjlty2V5mNPjzcS2QK598i/5vmIoD
ZiUBXg+P8n+dklEa4qnQplDKERD20GtDgWtgYrfmpspLWNv8/bZ4h4gmGsH0+3uz
dHQNQA==
-----END CERTIFICATE-----

View File

@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
import argparse
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
import ssl
import csv
import os
def start_server(server_address, cert_path, data_path, schema):
class TSVHTTPHandler(BaseHTTPRequestHandler):
def _set_headers(self):
self.send_response(200)
self.send_header('Content-type', 'text/tsv')
self.end_headers()
def do_GET(self):
self._set_headers()
with open(data_path, 'r') as fl:
reader = csv.reader(fl, delimiter='\t')
for row in reader:
self.wfile.write('\t'.join(row) + '\n')
return
def do_POST(self):
return self.do_GET()
httpd = HTTPServer(server_address, TSVHTTPHandler)
if schema == 'https':
httpd.socket = ssl.wrap_socket(httpd.socket, certfile=cert_path, server_side=True)
httpd.serve_forever()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Simple HTTP server returns data from file")
parser.add_argument("--data-path", required=True)
parser.add_argument("--schema", choices=("http", "https"), required=True)
parser.add_argument("--host", default="localhost")
parser.add_argument("--port", default=5555, type=int)
parser.add_argument("--cert-path", default="./fake_cert.pem")
args = parser.parse_args()
start_server((args.host, args.port), args.cert_path, args.data_path, args.schema)

View File

@ -0,0 +1,264 @@
import pytest
import os
import time
from helpers.cluster import ClickHouseCluster
from dictionary import Field, Row, Dictionary, DictionaryStructure, Layout
from external_sources import SourceMySQL, SourceClickHouse, SourceFile, SourceExecutableCache, SourceExecutableHashed, SourceMongo
from external_sources import SourceHTTP, SourceHTTPS
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
FIELDS = {
"simple": [
Field("KeyField", 'UInt64', is_key=True, default_value_for_get=9999999),
Field("UInt8_", 'UInt8', default_value_for_get=55),
Field("UInt16_", 'UInt16', default_value_for_get=66),
Field("UInt32_", 'UInt32', default_value_for_get=77),
Field("UInt64_", 'UInt64', default_value_for_get=88),
Field("Int8_", 'Int8', default_value_for_get=-55),
Field("Int16_", 'Int16', default_value_for_get=-66),
Field("Int32_", 'Int32', default_value_for_get=-77),
Field("Int64_", 'Int64', default_value_for_get=-88),
Field("UUID_", 'UUID', default_value_for_get='550e8400-0000-0000-0000-000000000000'),
Field("Date_", 'Date', default_value_for_get='2018-12-30'),
Field("DateTime_", 'DateTime', default_value_for_get='2018-12-30 00:00:00'),
Field("String_", 'String', default_value_for_get='hi'),
Field("Float32_", 'Float32', default_value_for_get=555.11),
Field("Float64_", 'Float64', default_value_for_get=777.11),
Field("ParentKeyField", "UInt64", default_value_for_get=444, hierarchical=True)
],
"complex": [
Field("KeyField1", 'UInt64', is_key=True, default_value_for_get=9999999),
Field("KeyField2", 'String', is_key=True, default_value_for_get='xxxxxxxxx'),
Field("UInt8_", 'UInt8', default_value_for_get=55),
Field("UInt16_", 'UInt16', default_value_for_get=66),
Field("UInt32_", 'UInt32', default_value_for_get=77),
Field("UInt64_", 'UInt64', default_value_for_get=88),
Field("Int8_", 'Int8', default_value_for_get=-55),
Field("Int16_", 'Int16', default_value_for_get=-66),
Field("Int32_", 'Int32', default_value_for_get=-77),
Field("Int64_", 'Int64', default_value_for_get=-88),
Field("UUID_", 'UUID', default_value_for_get='550e8400-0000-0000-0000-000000000000'),
Field("Date_", 'Date', default_value_for_get='2018-12-30'),
Field("DateTime_", 'DateTime', default_value_for_get='2018-12-30 00:00:00'),
Field("String_", 'String', default_value_for_get='hi'),
Field("Float32_", 'Float32', default_value_for_get=555.11),
Field("Float64_", 'Float64', default_value_for_get=777.11),
],
"ranged": [
Field("KeyField1", 'UInt64', is_key=True),
Field("KeyField2", 'Date', is_range_key=True),
Field("StartDate", 'Date', range_hash_type='min'),
Field("EndDate", 'Date', range_hash_type='max'),
Field("UInt8_", 'UInt8', default_value_for_get=55),
Field("UInt16_", 'UInt16', default_value_for_get=66),
Field("UInt32_", 'UInt32', default_value_for_get=77),
Field("UInt64_", 'UInt64', default_value_for_get=88),
Field("Int8_", 'Int8', default_value_for_get=-55),
Field("Int16_", 'Int16', default_value_for_get=-66),
Field("Int32_", 'Int32', default_value_for_get=-77),
Field("Int64_", 'Int64', default_value_for_get=-88),
Field("UUID_", 'UUID', default_value_for_get='550e8400-0000-0000-0000-000000000000'),
Field("Date_", 'Date', default_value_for_get='2018-12-30'),
Field("DateTime_", 'DateTime', default_value_for_get='2018-12-30 00:00:00'),
Field("String_", 'String', default_value_for_get='hi'),
Field("Float32_", 'Float32', default_value_for_get=555.11),
Field("Float64_", 'Float64', default_value_for_get=777.11),
]
}
LAYOUTS = [
Layout("hashed"),
Layout("cache"),
Layout("flat"),
Layout("complex_key_hashed"),
Layout("complex_key_cache"),
Layout("range_hashed")
]
SOURCES = [
# some troubles with that dictionary
SourceMongo("MongoDB", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"),
SourceMySQL("MySQL", "localhost", "3308", "mysql1", "3306", "root", "clickhouse"),
SourceClickHouse("RemoteClickHouse", "localhost", "9000", "clickhouse1", "9000", "default", ""),
SourceClickHouse("LocalClickHouse", "localhost", "9000", "node", "9000", "default", ""),
SourceFile("File", "localhost", "9000", "node", "9000", "", ""),
SourceExecutableHashed("ExecutableHashed", "localhost", "9000", "node", "9000", "", ""),
SourceExecutableCache("ExecutableCache", "localhost", "9000", "node", "9000", "", ""),
SourceHTTP("SourceHTTP", "localhost", "9000", "clickhouse1", "9000", "", ""),
SourceHTTPS("SourceHTTPS", "localhost", "9000", "clickhouse1", "9000", "", ""),
]
DICTIONARIES = []
cluster = None
node = None
def setup_module(module):
global DICTIONARIES
global cluster
global node
dict_configs_path = os.path.join(SCRIPT_DIR, 'configs/dictionaries')
for f in os.listdir(dict_configs_path):
os.remove(os.path.join(dict_configs_path, f))
for layout in LAYOUTS:
for source in SOURCES:
if source.compatible_with_layout(layout):
structure = DictionaryStructure(layout, FIELDS[layout.layout_type])
dict_name = source.name + "_" + layout.name
dict_path = os.path.join(dict_configs_path, dict_name + '.xml')
dictionary = Dictionary(dict_name, structure, source, dict_path, "table_" + dict_name)
dictionary.generate_config()
DICTIONARIES.append(dictionary)
else:
print "Source", source.name, "incompatible with layout", layout.name
main_configs = []
for fname in os.listdir(dict_configs_path):
main_configs.append(os.path.join(dict_configs_path, fname))
cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))
node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True, with_mongo=True)
cluster.add_instance('clickhouse1')
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
for dictionary in DICTIONARIES:
print "Preparing", dictionary.name
dictionary.prepare_source(cluster)
print "Prepared"
yield cluster
finally:
cluster.shutdown()
def test_simple_dictionaries(started_cluster):
fields = FIELDS["simple"]
data = [
Row(fields,
[1, 22, 333, 4444, 55555, -6, -77,
-888, -999, '550e8400-e29b-41d4-a716-446655440003',
'1973-06-28', '1985-02-28 23:43:25', 'hello', 22.543, 3332154213.4, 0]),
Row(fields,
[2, 3, 4, 5, 6, -7, -8,
-9, -10, '550e8400-e29b-41d4-a716-446655440002',
'1978-06-28', '1986-02-28 23:42:25', 'hello', 21.543, 3222154213.4, 1]),
]
simple_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "simple"]
for dct in simple_dicts:
dct.load_data(data)
node.query("system reload dictionaries")
queries_with_answers = []
for dct in simple_dicts:
for row in data:
for field in fields:
if not field.is_key:
for query in dct.get_select_get_queries(field, row):
queries_with_answers.append((query, row.get_value_by_name(field.name)))
for query in dct.get_select_has_queries(field, row):
queries_with_answers.append((query, 1))
for query in dct.get_select_get_or_default_queries(field, row):
queries_with_answers.append((query, field.default_value_for_get))
for query in dct.get_hierarchical_queries(data[0]):
queries_with_answers.append((query, [1]))
for query in dct.get_hierarchical_queries(data[1]):
queries_with_answers.append((query, [2, 1]))
for query in dct.get_is_in_queries(data[0], data[1]):
queries_with_answers.append((query, 0))
for query in dct.get_is_in_queries(data[1], data[0]):
queries_with_answers.append((query, 1))
for query, answer in queries_with_answers:
print query
if isinstance(answer, list):
answer = str(answer).replace(' ', '')
assert node.query(query) == str(answer) + '\n'
def test_complex_dictionaries(started_cluster):
fields = FIELDS["complex"]
data = [
Row(fields,
[1, 'world', 22, 333, 4444, 55555, -6,
-77, -888, -999, '550e8400-e29b-41d4-a716-446655440003',
'1973-06-28', '1985-02-28 23:43:25',
'hello', 22.543, 3332154213.4]),
Row(fields,
[2, 'qwerty2', 52, 2345, 6544, 9191991, -2,
-717, -81818, -92929, '550e8400-e29b-41d4-a716-446655440007',
'1975-09-28', '2000-02-28 23:33:24',
'my', 255.543, 3332221.44]),
]
complex_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "complex"]
for dct in complex_dicts:
dct.load_data(data)
node.query("system reload dictionaries")
queries_with_answers = []
for dct in complex_dicts:
for row in data:
for field in fields:
if not field.is_key:
for query in dct.get_select_get_queries(field, row):
queries_with_answers.append((query, row.get_value_by_name(field.name)))
for query in dct.get_select_has_queries(field, row):
queries_with_answers.append((query, 1))
for query in dct.get_select_get_or_default_queries(field, row):
queries_with_answers.append((query, field.default_value_for_get))
for query, answer in queries_with_answers:
print query
assert node.query(query) == str(answer) + '\n'
def test_ranged_dictionaries(started_cluster):
fields = FIELDS["ranged"]
data = [
Row(fields,
[1, '2019-02-10', '2019-02-01', '2019-02-28',
22, 333, 4444, 55555, -6, -77, -888, -999,
'550e8400-e29b-41d4-a716-446655440003',
'1973-06-28', '1985-02-28 23:43:25', 'hello',
22.543, 3332154213.4]),
Row(fields,
[1, '2019-04-10', '2019-04-01', '2019-04-28',
11, 3223, 41444, 52515, -65, -747, -8388, -9099,
'550e8400-e29b-41d4-a716-446655440004',
'1973-06-29', '2002-02-28 23:23:25', '!!!!',
32.543, 3332543.4]),
]
ranged_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "ranged"]
for dct in ranged_dicts:
dct.load_data(data)
node.query("system reload dictionaries")
queries_with_answers = []
for dct in ranged_dicts:
for row in data:
for field in fields:
if not field.is_key and not field.is_range:
for query in dct.get_select_get_queries(field, row):
queries_with_answers.append((query, row.get_value_by_name(field.name)))
for query, answer in queries_with_answers:
print query
assert node.query(query) == str(answer) + '\n'

View File

@ -1,6 +1,6 @@
FROM ubuntu:18.04 FROM ubuntu:18.04
RUN apt-get update && apt-get -y install tzdata RUN apt-get update && apt-get -y install tzdata python
ENV TZ=Europe/Moscow ENV TZ=Europe/Moscow
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone