More tests

This commit is contained in:
alesapin 2019-02-21 19:43:21 +03:00
parent 010a7e00ee
commit 70fdbca747
3 changed files with 193 additions and 25 deletions

View File

@ -1,4 +1,5 @@
#-*- coding: utf-8 -*- #-*- coding: utf-8 -*-
import copy
class Layout(object): class Layout(object):
@ -135,8 +136,8 @@ class DictionaryStructure(object):
<{key_block_name}> <{key_block_name}>
{key_str} {key_str}
</{key_block_name}> </{key_block_name}>
{attributes_str}
{range_strs} {range_strs}
{attributes_str}
</structure>'''.format( </structure>'''.format(
layout_str=self.layout.get_str(), layout_str=self.layout.get_str(),
key_block_name=self.layout.get_key_block_name(), key_block_name=self.layout.get_key_block_name(),
@ -145,6 +146,18 @@ class DictionaryStructure(object):
range_strs='\n'.join(ranged_strs), range_strs='\n'.join(ranged_strs),
) )
def get_ordered_names(self):
    """Return the names of all fields of this structure as a flat list.

    The order is fixed: key fields first, then range fields, then the
    ordinary (attribute) fields.
    """
    ordered = [key_field.name for key_field in self.keys]
    ordered.extend(range_field.name for range_field in self.range_fields)
    ordered.extend(field.name for field in self.ordinary_fields)
    return ordered
def get_dict_get_expression(self, dict_name, field, row): def get_dict_get_expression(self, dict_name, field, row):
if field in self.keys: if field in self.keys:
raise Exception("Trying to receive key field {} from dictionary".format(field.name)) raise Exception("Trying to receive key field {} from dictionary".format(field.name))
@ -181,8 +194,8 @@ class DictionaryStructure(object):
class Dictionary(object): class Dictionary(object):
def __init__(self, name, structure, source, config_path, table_name): def __init__(self, name, structure, source, config_path, table_name):
self.name = name self.name = name
self.structure = structure self.structure = copy.deepcopy(structure)
self.source = source self.source = copy.deepcopy(source)
self.config_path = config_path self.config_path = config_path
self.table_name = table_name self.table_name = table_name
@ -208,8 +221,8 @@ class Dictionary(object):
source=self.source.get_source_str(self.table_name), source=self.source.get_source_str(self.table_name),
)) ))
def prepare_source(self): def prepare_source(self, cluster):
self.source.prepare(self.structure, self.table_name) self.source.prepare(self.structure, self.table_name, cluster)
def load_data(self, data): def load_data(self, data):
if not self.source.prepared: if not self.source.prepared:

View File

@ -2,6 +2,8 @@
import warnings import warnings
import pymysql.cursors import pymysql.cursors
import pymongo import pymongo
import subprocess
import copy
class ExternalSource(object): class ExternalSource(object):
def __init__(self, name, internal_hostname, internal_port, def __init__(self, name, internal_hostname, internal_port,
@ -14,11 +16,11 @@ class ExternalSource(object):
self.user = user self.user = user
self.password = password self.password = password
def get_source_str(self): def get_source_str(self, table_name):
raise NotImplementedError("Method {} is not implemented for {}".format( raise NotImplementedError("Method {} is not implemented for {}".format(
"get_source_config_part", self.__class__.__name__)) "get_source_config_part", self.__class__.__name__))
def prepare(self, structure): def prepare(self, structure, table_name, cluster):
raise NotImplementedError("Method {} is not implemented for {}".format( raise NotImplementedError("Method {} is not implemented for {}".format(
"prepare_remote_source", self.__class__.__name__)) "prepare_remote_source", self.__class__.__name__))
@ -27,6 +29,10 @@ class ExternalSource(object):
raise NotImplementedError("Method {} is not implemented for {}".format( raise NotImplementedError("Method {} is not implemented for {}".format(
"prepare_remote_source", self.__class__.__name__)) "prepare_remote_source", self.__class__.__name__))
def compatible_with_layout(self, layout):
    """Tell whether this source may be combined with ``layout``.

    The default accepts every layout; sources that cannot serve some
    layouts override this method.
    """
    return True
class SourceMySQL(ExternalSource): class SourceMySQL(ExternalSource):
TYPE_MAPPING = { TYPE_MAPPING = {
'UInt8': 'tinyint unsigned', 'UInt8': 'tinyint unsigned',
@ -83,7 +89,7 @@ class SourceMySQL(ExternalSource):
tbl=table_name, tbl=table_name,
) )
def prepare(self, structure, table_name): def prepare(self, structure, table_name, cluster):
self.create_mysql_conn() self.create_mysql_conn()
self.execute_mysql_query("create database if not exists test default character set 'utf8'") self.execute_mysql_query("create database if not exists test default character set 'utf8'")
fields_strs = [] fields_strs = []
@ -93,16 +99,16 @@ class SourceMySQL(ExternalSource):
{fields_str}); {fields_str});
'''.format(table_name=table_name, fields_str=','.join(fields_strs)) '''.format(table_name=table_name, fields_str=','.join(fields_strs))
self.execute_mysql_query(create_query) self.execute_mysql_query(create_query)
self.ordered_names = structure.get_ordered_names()
self.prepared = True self.prepared = True
def load_data(self, data, table_name): def load_data(self, data, table_name):
values_strs = [] values_strs = []
if not data: if not data:
return return
ordered_names = [name for name in data[0].data]
for row in data: for row in data:
sorted_row = [] sorted_row = []
for name in ordered_names: for name in self.ordered_names:
data = row.data[name] data = row.data[name]
if isinstance(row.data[name], str): if isinstance(row.data[name], str):
data = "'" + data + "'" data = "'" + data + "'"
@ -112,7 +118,7 @@ class SourceMySQL(ExternalSource):
values_strs.append('(' + ','.join(sorted_row) + ')') values_strs.append('(' + ','.join(sorted_row) + ')')
query = 'insert into test.{} ({}) values {}'.format( query = 'insert into test.{} ({}) values {}'.format(
table_name, table_name,
','.join(ordered_names), ','.join(self.ordered_names),
''.join(values_strs)) ''.join(values_strs))
self.execute_mysql_query(query) self.execute_mysql_query(query)
@ -137,7 +143,7 @@ class SourceMongo(ExternalSource):
tbl=table_name, tbl=table_name,
) )
def prepare(self, structure, table_name): def prepare(self, structure, table_name, cluster):
connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format( connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format(
host=self.internal_hostname, port=self.internal_port, host=self.internal_hostname, port=self.internal_port,
user=self.user, password=self.password) user=self.user, password=self.password)
@ -151,6 +157,147 @@ class SourceMongo(ExternalSource):
tbl = self.db[table_name] tbl = self.db[table_name]
to_insert = [dict(row.data) for row in data] to_insert = [dict(row.data) for row in data]
result = tbl.insert_many(to_insert) result = tbl.insert_many(to_insert)
print "IDS:", result.inserted_ids
for r in tbl.find(): class SourceClickHouse(ExternalSource):
print "RESULT:", r
def get_source_str(self, table_name):
    """Render the ``<clickhouse>`` source section of a dictionary config.

    The host, port, user and password come from this source's
    ``docker_hostname``, ``docker_port``, ``user`` and ``password``
    attributes; the database is always ``test``.

    :param table_name: name of the table the dictionary reads from.
    :return: the XML fragment as a string.
    """
    template = (
        "\n"
        "<clickhouse>\n"
        "<host>{host}</host>\n"
        "<port>{port}</port>\n"
        "<user>{user}</user>\n"
        "<password>{password}</password>\n"
        "<db>test</db>\n"
        "<table>{tbl}</table>\n"
        "</clickhouse>\n"
    )
    params = {
        'host': self.docker_hostname,
        'port': self.docker_port,
        'user': self.user,
        'password': self.password,
        'tbl': table_name,
    }
    return template.format(**params)
def prepare(self, structure, table_name, cluster):
    """Create the backing table ``test.<table_name>`` on the source node.

    The node is looked up in ``cluster.instances`` by this source's
    ``docker_hostname`` and kept in ``self.node`` for later ``load_data``
    calls. Columns are declared for the key, ordinary and range fields
    (in that order), while ``self.ordered_names`` stores the order
    reported by ``structure.get_ordered_names()``.

    :param structure: dictionary structure describing the fields.
    :param table_name: name of the table to create in database ``test``.
    :param cluster: object whose ``instances`` mapping holds the nodes.
    """
    node = cluster.instances[self.docker_hostname]
    self.node = node
    node.query("CREATE DATABASE IF NOT EXISTS test")

    all_fields = structure.keys + structure.ordinary_fields + structure.range_fields
    column_defs = [' '.join((field.name, field.field_type)) for field in all_fields]
    create_template = (
        "CREATE TABLE test.{table_name} (\n"
        "{fields_str}) ENGINE MergeTree ORDER BY tuple();\n"
    )
    node.query(create_template.format(
        table_name=table_name,
        fields_str=','.join(column_defs),
    ))

    self.ordered_names = structure.get_ordered_names()
    self.prepared = True
def load_data(self, data, table_name):
    """Insert ``data`` into ``test.<table_name>`` with a single INSERT.

    Values are emitted in the column order stored in
    ``self.ordered_names`` by ``prepare``. ``str`` values are wrapped in
    single quotes; everything else is rendered with ``str()``.

    :param data: sequence of rows; each row exposes a ``data`` mapping
        from field name to value. Nothing is executed when it is empty.
    :param table_name: target table inside database ``test``.
    """
    if not data:
        return

    def render(value):
        # NOTE: string values are quoted but not escaped, so a value
        # containing a single quote would still break the query.
        if isinstance(value, str):
            return "'" + value + "'"
        return str(value)

    columns = self.ordered_names
    row_tuples = [
        '(' + ','.join(render(row.data[name]) for name in columns) + ')'
        for row in data
    ]
    # Row tuples must be comma-separated: "(..),(..)" rather than "(..)(..)".
    query = 'INSERT INTO test.{} ({}) values {}'.format(
        table_name,
        ','.join(columns),
        ','.join(row_tuples))
    self.node.query(query)
class SourceFile(ExternalSource):
    """Dictionary source backed by a TabSeparated file in the node's container.

    The file lives at ``/<table_name>.tsv`` inside the container of the
    instance named by ``docker_hostname``.
    """

    @staticmethod
    def _table_path(table_name):
        """Return the in-container path of the data file for ``table_name``."""
        return "/" + table_name + ".tsv"

    def get_source_str(self, table_name):
        """Render the ``<file>`` source section of a dictionary config.

        :param table_name: name used to derive the data file path.
        :return: the XML fragment as a string.
        """
        template = (
            "\n"
            "<file>\n"
            "<path>{path}</path>\n"
            "<format>TabSeparated</format>\n"
            "</file>\n"
        )
        return template.format(path=self._table_path(table_name))

    def prepare(self, structure, table_name, cluster):
        """Create an empty data file and remember the column order.

        :param structure: dictionary structure; supplies the field order.
        :param table_name: name used to derive the data file path.
        :param cluster: object whose ``instances`` mapping holds the nodes.
        """
        self.node = cluster.instances[self.docker_hostname]
        path = self._table_path(table_name)
        self.node.exec_in_container(["bash", "-c", "touch {}".format(path)])
        self.ordered_names = structure.get_ordered_names()
        self.prepared = True

    def load_data(self, data, table_name):
        """Append each row of ``data`` to the file as a tab-separated line.

        Values are written in ``self.ordered_names`` order, each rendered
        with ``str()``. One container command is issued per row.

        :param data: iterable of rows; each row exposes a ``data`` mapping
            from field name to value. Nothing is written when it is empty.
        :param table_name: name used to derive the data file path.
        """
        if not data:
            return
        path = self._table_path(table_name)
        # The line and the path are handed to bash as positional arguments
        # ($1, $2) instead of being pasted into the script text, so quotes,
        # "$", backticks or backslashes inside a value are written verbatim
        # rather than being interpreted by the shell.
        append_script = 'printf "%s\\n" "$1" >> "$2"'
        for row in data:
            line = '\t'.join(str(row.data[name]) for name in self.ordered_names)
            self.node.exec_in_container(
                ["bash", "-c", append_script, "bash", line, path])

    def compatible_with_layout(self, layout):
        """Accept only layouts whose name does not contain ``cache``."""
        return 'cache' not in layout.name
class _SourceExecutableBase(ExternalSource):
    """Common part of the ``<executable>`` dictionary sources.

    Data is kept in ``/<table_name>.tsv`` inside the node's container;
    subclasses supply, through ``_get_cmd``, the shell command that
    prints that file for the dictionary.
    """

    @staticmethod
    def _table_path(table_name):
        """Return the in-container path of the data file for ``table_name``."""
        return "/" + table_name + ".tsv"

    def _get_cmd(self, path):
        """Return the command that outputs the data stored at ``path``.

        :raises NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError("Method {} is not implemented for {}".format(
            "_get_cmd", self.__class__.__name__))

    def get_source_str(self, table_name):
        """Render the ``<executable>`` source section of a dictionary config.

        :param table_name: name used to derive the data file path.
        :return: the XML fragment as a string.
        """
        template = (
            "\n"
            "<executable>\n"
            "<command>{cmd}</command>\n"
            "<format>TabSeparated</format>\n"
            "</executable>\n"
        )
        return template.format(cmd=self._get_cmd(self._table_path(table_name)))

    def prepare(self, structure, table_name, cluster):
        """Create an empty data file and remember the column order.

        :param structure: dictionary structure; supplies the field order.
        :param table_name: name used to derive the data file path.
        :param cluster: object whose ``instances`` mapping holds the nodes.
        """
        self.node = cluster.instances[self.docker_hostname]
        path = self._table_path(table_name)
        self.node.exec_in_container(["bash", "-c", "touch {}".format(path)])
        self.ordered_names = structure.get_ordered_names()
        self.prepared = True

    def load_data(self, data, table_name):
        """Append each row of ``data`` to the file as a tab-separated line.

        Values are written in ``self.ordered_names`` order, each rendered
        with ``str()``. One container command is issued per row.

        :param data: iterable of rows; each row exposes a ``data`` mapping
            from field name to value. Nothing is written when it is empty.
        :param table_name: name used to derive the data file path.
        """
        if not data:
            return
        path = self._table_path(table_name)
        # Pass the line and the path as positional arguments ($1, $2) so the
        # shell never re-parses the row contents (quotes, "$", backticks and
        # backslashes are written verbatim).
        append_script = 'printf "%s\\n" "$1" >> "$2"'
        for row in data:
            line = '\t'.join(str(row.data[name]) for name in self.ordered_names)
            self.node.exec_in_container(
                ["bash", "-c", append_script, "bash", line, path])
class SourceExecutableCache(_SourceExecutableBase):
    """Executable source whose command simply prints the data file."""

    def _get_cmd(self, path):
        """Return a ``cat`` command for the file at ``path``."""
        return "cat {path}".format(path=path)

    def compatible_with_layout(self, layout):
        """Accept only layouts whose name does not contain ``cache``.

        NOTE(review): the class name suggests the opposite pairing;
        the predicate is kept exactly as written - confirm the intent.
        """
        return not ('cache' in layout.name)
class SourceExecutableHashed(_SourceExecutableBase):
    """Executable source whose command discards stdin, then prints the data file."""

    def _get_cmd(self, path):
        """Return a command that drains stdin to /dev/null and then cats ``path``.

        Presumably stdin carries the keys requested by the dictionary and
        must be consumed before answering - TODO confirm.
        """
        return "cat - >/dev/null;cat {path}".format(path=path)

    def compatible_with_layout(self, layout):
        """Accept only layouts whose name contains ``cache``.

        NOTE(review): the class name suggests the opposite pairing;
        the predicate is kept exactly as written - confirm the intent.
        """
        has_cache = 'cache' in layout.name
        return has_cache

View File

@ -4,7 +4,7 @@ import time
from helpers.cluster import ClickHouseCluster from helpers.cluster import ClickHouseCluster
from dictionary import Field, Row, Dictionary, DictionaryStructure, Layout from dictionary import Field, Row, Dictionary, DictionaryStructure, Layout
from external_sources import SourceMySQL, SourceMongo from external_sources import SourceMySQL, SourceMongo, SourceClickHouse, SourceFile, SourceExecutableCache, SourceExecutableHashed
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@ -68,8 +68,8 @@ FIELDS = {
} }
LAYOUTS = [ LAYOUTS = [
Layout("cache"),
Layout("hashed"), Layout("hashed"),
Layout("cache"),
Layout("flat"), Layout("flat"),
Layout("complex_key_hashed"), Layout("complex_key_hashed"),
Layout("complex_key_cache"), Layout("complex_key_cache"),
@ -79,6 +79,11 @@ LAYOUTS = [
SOURCES = [ SOURCES = [
#SourceMongo("MongoDB", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"), #SourceMongo("MongoDB", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"),
SourceMySQL("MySQL", "localhost", "3308", "mysql1", "3306", "root", "clickhouse"), SourceMySQL("MySQL", "localhost", "3308", "mysql1", "3306", "root", "clickhouse"),
SourceClickHouse("RemoteClickHouse", "localhost", "9000", "clickhouse1", "9000", "default", ""),
SourceClickHouse("LocalClickHouse", "localhost", "9000", "node", "9000", "default", ""),
SourceFile("File", "localhost", "9000", "node", "9000", "", ""),
SourceExecutableHashed("ExecutableHashed", "localhost", "9000", "node", "9000", "", ""),
SourceExecutableCache("ExecutableCache", "localhost", "9000", "node", "9000", "", ""),
] ]
DICTIONARIES = [] DICTIONARIES = []
@ -97,19 +102,22 @@ def setup_module(module):
for layout in LAYOUTS: for layout in LAYOUTS:
for source in SOURCES: for source in SOURCES:
structure = DictionaryStructure(layout, FIELDS[layout.layout_type]) if source.compatible_with_layout(layout):
dict_name = source.name + "_" + layout.name structure = DictionaryStructure(layout, FIELDS[layout.layout_type])
dict_path = os.path.join(dict_configs_path, dict_name + '.xml') dict_name = source.name + "_" + layout.name
dictionary = Dictionary(dict_name, structure, source, dict_path, "table_" + dict_name) dict_path = os.path.join(dict_configs_path, dict_name + '.xml')
print dict_name dictionary = Dictionary(dict_name, structure, source, dict_path, "table_" + dict_name)
dictionary.generate_config() dictionary.generate_config()
DICTIONARIES.append(dictionary) DICTIONARIES.append(dictionary)
else:
print "Source", source.name, "incompatible with layout", layout.name
main_configs = [] main_configs = []
for fname in os.listdir(dict_configs_path): for fname in os.listdir(dict_configs_path):
main_configs.append(os.path.join(dict_configs_path, fname)) main_configs.append(os.path.join(dict_configs_path, fname))
cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs')) cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))
node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True, with_mongo=True) node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True, with_mongo=True)
cluster.add_instance('clickhouse1')
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def started_cluster(): def started_cluster():
@ -117,7 +125,7 @@ def started_cluster():
cluster.start() cluster.start()
for dictionary in DICTIONARIES: for dictionary in DICTIONARIES:
print "Preparing", dictionary.name print "Preparing", dictionary.name
dictionary.prepare_source() dictionary.prepare_source(cluster)
print "Prepared" print "Prepared"
yield cluster yield cluster