diff --git a/dbms/tests/integration/test_external_dictionaries/dictionary.py b/dbms/tests/integration/test_external_dictionaries/dictionary.py
index 41c5d0bcf8e..86014c0bd40 100644
--- a/dbms/tests/integration/test_external_dictionaries/dictionary.py
+++ b/dbms/tests/integration/test_external_dictionaries/dictionary.py
@@ -1,4 +1,5 @@
#-*- coding: utf-8 -*-
+import copy
class Layout(object):
@@ -135,8 +136,8 @@ class DictionaryStructure(object):
<{key_block_name}>
{key_str}
{key_block_name}>
- {attributes_str}
{range_strs}
+ {attributes_str}
'''.format(
layout_str=self.layout.get_str(),
key_block_name=self.layout.get_key_block_name(),
@@ -145,6 +146,18 @@ class DictionaryStructure(object):
range_strs='\n'.join(ranged_strs),
)
+ def get_ordered_names(self):
+ fields_strs = []
+ for key_field in self.keys:
+ fields_strs.append(key_field.name)
+ for range_field in self.range_fields:
+ fields_strs.append(range_field.name)
+ for field in self.ordinary_fields:
+ fields_strs.append(field.name)
+ return fields_strs
+
+
+
def get_dict_get_expression(self, dict_name, field, row):
if field in self.keys:
raise Exception("Trying to receive key field {} from dictionary".format(field.name))
@@ -181,8 +194,8 @@ class DictionaryStructure(object):
class Dictionary(object):
def __init__(self, name, structure, source, config_path, table_name):
self.name = name
- self.structure = structure
- self.source = source
+ self.structure = copy.deepcopy(structure)
+ self.source = copy.deepcopy(source)
self.config_path = config_path
self.table_name = table_name
@@ -208,8 +221,8 @@ class Dictionary(object):
source=self.source.get_source_str(self.table_name),
))
- def prepare_source(self):
- self.source.prepare(self.structure, self.table_name)
+ def prepare_source(self, cluster):
+ self.source.prepare(self.structure, self.table_name, cluster)
def load_data(self, data):
if not self.source.prepared:
diff --git a/dbms/tests/integration/test_external_dictionaries/external_sources.py b/dbms/tests/integration/test_external_dictionaries/external_sources.py
index 7e3ecfa5007..5a7f2536d2c 100644
--- a/dbms/tests/integration/test_external_dictionaries/external_sources.py
+++ b/dbms/tests/integration/test_external_dictionaries/external_sources.py
@@ -2,6 +2,8 @@
import warnings
import pymysql.cursors
import pymongo
+import subprocess
+import copy
class ExternalSource(object):
def __init__(self, name, internal_hostname, internal_port,
@@ -14,11 +16,11 @@ class ExternalSource(object):
self.user = user
self.password = password
- def get_source_str(self):
+ def get_source_str(self, table_name):
raise NotImplementedError("Method {} is not implemented for {}".format(
"get_source_config_part", self.__class__.__name__))
- def prepare(self, structure):
+ def prepare(self, structure, table_name, cluster):
raise NotImplementedError("Method {} is not implemented for {}".format(
"prepare_remote_source", self.__class__.__name__))
@@ -27,6 +29,10 @@ class ExternalSource(object):
raise NotImplementedError("Method {} is not implemented for {}".format(
"prepare_remote_source", self.__class__.__name__))
+ def compatible_with_layout(self, layout):
+ return True
+
+
class SourceMySQL(ExternalSource):
TYPE_MAPPING = {
'UInt8': 'tinyint unsigned',
@@ -83,7 +89,7 @@ class SourceMySQL(ExternalSource):
tbl=table_name,
)
- def prepare(self, structure, table_name):
+ def prepare(self, structure, table_name, cluster):
self.create_mysql_conn()
self.execute_mysql_query("create database if not exists test default character set 'utf8'")
fields_strs = []
@@ -93,16 +99,16 @@ class SourceMySQL(ExternalSource):
{fields_str});
'''.format(table_name=table_name, fields_str=','.join(fields_strs))
self.execute_mysql_query(create_query)
+ self.ordered_names = structure.get_ordered_names()
self.prepared = True
def load_data(self, data, table_name):
values_strs = []
if not data:
return
- ordered_names = [name for name in data[0].data]
for row in data:
sorted_row = []
- for name in ordered_names:
+ for name in self.ordered_names:
data = row.data[name]
if isinstance(row.data[name], str):
data = "'" + data + "'"
@@ -112,7 +118,7 @@ class SourceMySQL(ExternalSource):
values_strs.append('(' + ','.join(sorted_row) + ')')
query = 'insert into test.{} ({}) values {}'.format(
table_name,
- ','.join(ordered_names),
+ ','.join(self.ordered_names),
''.join(values_strs))
self.execute_mysql_query(query)
@@ -137,7 +143,7 @@ class SourceMongo(ExternalSource):
tbl=table_name,
)
- def prepare(self, structure, table_name):
+ def prepare(self, structure, table_name, cluster):
connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format(
host=self.internal_hostname, port=self.internal_port,
user=self.user, password=self.password)
@@ -151,6 +157,147 @@ class SourceMongo(ExternalSource):
tbl = self.db[table_name]
to_insert = [dict(row.data) for row in data]
result = tbl.insert_many(to_insert)
- print "IDS:", result.inserted_ids
- for r in tbl.find():
- print "RESULT:", r
+
+class SourceClickHouse(ExternalSource):
+
+ def get_source_str(self, table_name):
+ return '''
+
+ {host}
+ {port}
+ {user}
+ {password}
+ test
+
+
+ '''.format(
+ host=self.docker_hostname,
+ port=self.docker_port,
+ user=self.user,
+ password=self.password,
+ tbl=table_name,
+ )
+
+ def prepare(self, structure, table_name, cluster):
+ self.node = cluster.instances[self.docker_hostname]
+ self.node.query("CREATE DATABASE IF NOT EXISTS test")
+ fields_strs = []
+ for field in structure.keys + structure.ordinary_fields + structure.range_fields:
+ fields_strs.append(field.name + ' ' + field.field_type)
+ create_query = '''CREATE TABLE test.{table_name} (
+ {fields_str}) ENGINE MergeTree ORDER BY tuple();
+ '''.format(table_name=table_name, fields_str=','.join(fields_strs))
+ self.node.query(create_query)
+ self.ordered_names = structure.get_ordered_names()
+ self.prepared = True
+
+ def load_data(self, data, table_name):
+ values_strs = []
+ if not data:
+ return
+ for row in data:
+ sorted_row = []
+ for name in self.ordered_names:
+ row_data = row.data[name]
+ if isinstance(row_data, str):
+ row_data = "'" + row_data + "'"
+ else:
+ row_data = str(row_data)
+ sorted_row.append(row_data)
+ values_strs.append('(' + ','.join(sorted_row) + ')')
+ query = 'INSERT INTO test.{} ({}) values {}'.format(
+ table_name,
+ ','.join(self.ordered_names),
+ ''.join(values_strs))
+ self.node.query(query)
+
+
+class SourceFile(ExternalSource):
+
+ def get_source_str(self, table_name):
+ table_path = "/" + table_name + ".tsv"
+ return '''
+
+ {path}
+ TabSeparated
+
+ '''.format(
+ path=table_path,
+ )
+
+ def prepare(self, structure, table_name, cluster):
+ self.node = cluster.instances[self.docker_hostname]
+ path = "/" + table_name + ".tsv"
+ self.node.exec_in_container(["bash", "-c", "touch {}".format(path)])
+ self.ordered_names = structure.get_ordered_names()
+ self.prepared = True
+
+ def load_data(self, data, table_name):
+ if not data:
+ return
+ path = "/" + table_name + ".tsv"
+ for row in list(data):
+ sorted_row = []
+ for name in self.ordered_names:
+ sorted_row.append(str(row.data[name]))
+
+ str_data = '\t'.join(sorted_row)
+ self.node.exec_in_container(["bash", "-c", "echo \"{row}\" >> {fname}".format(row=str_data, fname=path)])
+
+ def compatible_with_layout(self, layout):
+ return 'cache' not in layout.name
+
+
+class _SourceExecutableBase(ExternalSource):
+
+ def _get_cmd(self, path):
+ raise NotImplementedError("Method {} is not implemented for {}".format(
+ "_get_cmd", self.__class__.__name__))
+
+ def get_source_str(self, table_name):
+ table_path = "/" + table_name + ".tsv"
+ return '''
+
+ {cmd}
+ TabSeparated
+
+ '''.format(
+ cmd=self._get_cmd(table_path),
+ )
+
+ def prepare(self, structure, table_name, cluster):
+ self.node = cluster.instances[self.docker_hostname]
+ path = "/" + table_name + ".tsv"
+ self.node.exec_in_container(["bash", "-c", "touch {}".format(path)])
+ self.ordered_names = structure.get_ordered_names()
+ self.prepared = True
+
+ def load_data(self, data, table_name):
+ if not data:
+ return
+ path = "/" + table_name + ".tsv"
+ for row in list(data):
+ sorted_row = []
+ for name in self.ordered_names:
+ sorted_row.append(str(row.data[name]))
+
+ str_data = '\t'.join(sorted_row)
+ self.node.exec_in_container(["bash", "-c", "echo \"{row}\" >> {fname}".format(row=str_data, fname=path)])
+
+
+class SourceExecutableCache(_SourceExecutableBase):
+
+ def _get_cmd(self, path):
+ return "cat {}".format(path)
+
+ def compatible_with_layout(self, layout):
+ return 'cache' not in layout.name
+
+
+class SourceExecutableHashed(_SourceExecutableBase):
+
+ def _get_cmd(self, path):
+ return "cat - >/dev/null;cat {}".format(path)
+
+ def compatible_with_layout(self, layout):
+ return 'cache' in layout.name
diff --git a/dbms/tests/integration/test_external_dictionaries/test.py b/dbms/tests/integration/test_external_dictionaries/test.py
index ac4ce47f63d..12fa08dd9c6 100644
--- a/dbms/tests/integration/test_external_dictionaries/test.py
+++ b/dbms/tests/integration/test_external_dictionaries/test.py
@@ -4,7 +4,7 @@ import time
from helpers.cluster import ClickHouseCluster
from dictionary import Field, Row, Dictionary, DictionaryStructure, Layout
-from external_sources import SourceMySQL, SourceMongo
+from external_sources import SourceMySQL, SourceMongo, SourceClickHouse, SourceFile, SourceExecutableCache, SourceExecutableHashed
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@@ -68,8 +68,8 @@ FIELDS = {
}
LAYOUTS = [
- Layout("cache"),
Layout("hashed"),
+ Layout("cache"),
Layout("flat"),
Layout("complex_key_hashed"),
Layout("complex_key_cache"),
@@ -79,6 +79,11 @@ LAYOUTS = [
SOURCES = [
#SourceMongo("MongoDB", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"),
SourceMySQL("MySQL", "localhost", "3308", "mysql1", "3306", "root", "clickhouse"),
+ SourceClickHouse("RemoteClickHouse", "localhost", "9000", "clickhouse1", "9000", "default", ""),
+ SourceClickHouse("LocalClickHouse", "localhost", "9000", "node", "9000", "default", ""),
+ SourceFile("File", "localhost", "9000", "node", "9000", "", ""),
+ SourceExecutableHashed("ExecutableHashed", "localhost", "9000", "node", "9000", "", ""),
+ SourceExecutableCache("ExecutableCache", "localhost", "9000", "node", "9000", "", ""),
]
DICTIONARIES = []
@@ -97,19 +102,22 @@ def setup_module(module):
for layout in LAYOUTS:
for source in SOURCES:
- structure = DictionaryStructure(layout, FIELDS[layout.layout_type])
- dict_name = source.name + "_" + layout.name
- dict_path = os.path.join(dict_configs_path, dict_name + '.xml')
- dictionary = Dictionary(dict_name, structure, source, dict_path, "table_" + dict_name)
- print dict_name
- dictionary.generate_config()
- DICTIONARIES.append(dictionary)
+ if source.compatible_with_layout(layout):
+ structure = DictionaryStructure(layout, FIELDS[layout.layout_type])
+ dict_name = source.name + "_" + layout.name
+ dict_path = os.path.join(dict_configs_path, dict_name + '.xml')
+ dictionary = Dictionary(dict_name, structure, source, dict_path, "table_" + dict_name)
+ dictionary.generate_config()
+ DICTIONARIES.append(dictionary)
+ else:
+ print "Source", source.name, "incompatible with layout", layout.name
main_configs = []
for fname in os.listdir(dict_configs_path):
main_configs.append(os.path.join(dict_configs_path, fname))
cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))
node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True, with_mongo=True)
+ cluster.add_instance('clickhouse1')
@pytest.fixture(scope="module")
def started_cluster():
@@ -117,7 +125,7 @@ def started_cluster():
cluster.start()
for dictionary in DICTIONARIES:
print "Preparing", dictionary.name
- dictionary.prepare_source()
+ dictionary.prepare_source(cluster)
print "Prepared"
yield cluster