ClickHouse/tests/integration/helpers/external_sources.py

759 lines
23 KiB
Python
Raw Normal View History

2019-02-21 12:04:08 +00:00
# -*- coding: utf-8 -*-
import datetime
import os
import uuid
2019-02-21 12:04:08 +00:00
import warnings
import aerospike
2020-05-19 17:48:28 +00:00
import cassandra.cluster
import pymongo
import pymysql.cursors
2019-03-21 18:10:55 +00:00
import redis
2021-03-05 13:39:51 +00:00
import logging
2019-02-21 12:04:08 +00:00
2019-02-21 12:04:08 +00:00
class ExternalSource(object):
    """Common interface for the external dictionary sources used in tests.

    Stores the connection settings shared by every source. Within this file,
    subclasses use the ``internal_*`` address when the test code itself
    connects to the service, and the ``docker_*`` address when generating the
    dictionary source configuration.

    Subclasses are expected to override :meth:`get_source_str`,
    :meth:`prepare` and :meth:`load_data`.
    """

    def __init__(
        self,
        name,
        internal_hostname,
        internal_port,
        docker_hostname,
        docker_port,
        user,
        password,
    ):
        """Remember the source name, both addresses and the credentials.

        Ports are coerced with ``int()``, so numeric strings are accepted.
        """
        self.name = name
        self.internal_hostname = internal_hostname
        self.internal_port = int(internal_port)
        self.docker_hostname = docker_hostname
        self.docker_port = int(docker_port)
        self.user = user
        self.password = password

    def _not_implemented(self, method_name):
        """Build the error raised by the abstract methods of this class."""
        return NotImplementedError(
            "Method {} is not implemented for {}".format(
                method_name, self.__class__.__name__
            )
        )

    def get_source_str(self, table_name):
        """Return the source section of the dictionary config for *table_name*.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise self._not_implemented("get_source_str")

    def prepare(self, structure, table_name, cluster):
        """Create whatever the source needs before data can be loaded.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise self._not_implemented("prepare")

    # data is a bunch of Row objects
    def load_data(self, data, table_name=None):
        """Load *data* into the source table *table_name*.

        ``table_name`` is optional here only to stay compatible with callers
        of the old one-argument form; every subclass in this file takes it.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise self._not_implemented("load_data")

    def compatible_with_layout(self, layout):
        """Tell whether this source can be used with *layout* (default: yes)."""
        return True
2019-02-21 12:04:08 +00:00
class SourceMySQL(ExternalSource):
    """Dictionary source backed by a MySQL table in the ``test`` database."""

    # Column type of the dictionary structure -> MySQL column type used when
    # the backing table is created.
    TYPE_MAPPING = {
        "UInt8": "tinyint unsigned",
        "UInt16": "smallint unsigned",
        "UInt32": "int unsigned",
        "UInt64": "bigint unsigned",
        "Int8": "tinyint",
        "Int16": "smallint",
        "Int32": "int",
        "Int64": "bigint",
        "UUID": "varchar(36)",
        "Date": "date",
        "DateTime": "datetime",
        "String": "text",
        "Float32": "float",
        "Float64": "double",
    }

    def create_mysql_conn(self):
        """Open a pymysql connection and keep it in ``self.connection``."""
        logging.debug(
            f"pymysql connect {self.user}, {self.password}, {self.internal_hostname}, {self.internal_port}"
        )
        connect_kwargs = {
            "user": self.user,
            "password": self.password,
            "host": self.internal_hostname,
            "port": self.internal_port,
        }
        self.connection = pymysql.connect(**connect_kwargs)

    def execute_mysql_query(self, query):
        """Run *query* on the stored connection and commit, muting warnings."""
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            with self.connection.cursor() as cur:
                cur.execute(query)
            self.connection.commit()

    def get_source_str(self, table_name):
        """Return the ``<mysql>`` config section pointing at *table_name*.

        The first replica deliberately uses a wrong port so that failover to
        the second, real replica is exercised.
        """
        template = """
<mysql>
<replica>
<priority>1</priority>
<host>127.0.0.1</host>
<port>3333</port> <!-- Wrong port, for testing basic failover to work. -->
</replica>
<replica>
<priority>2</priority>
<host>{hostname}</host>
<port>{port}</port>
</replica>
<user>{user}</user>
<password>{password}</password>
<db>test</db>
<table>{tbl}</table>
</mysql>"""
        return template.format(
            hostname=self.docker_hostname,
            port=self.docker_port,
            user=self.user,
            password=self.password,
            tbl=table_name,
        )

    def prepare(self, structure, table_name, cluster):
        """Connect to MySQL and (re)create ``test.<table_name>`` for *structure*."""
        # Fall back to the address the cluster reports when none was given.
        if self.internal_hostname is None:
            self.internal_hostname = cluster.mysql_ip

        self.create_mysql_conn()
        self.execute_mysql_query(
            "create database if not exists test default character set 'utf8'"
        )
        self.execute_mysql_query("drop table if exists test.{}".format(table_name))

        all_fields = structure.keys + structure.ordinary_fields + structure.range_fields
        column_defs = [
            column.name + " " + self.TYPE_MAPPING[column.field_type]
            for column in all_fields
        ]
        create_query = """create table test.{table_name} (
{fields_str});
""".format(
            table_name=table_name, fields_str=",".join(column_defs)
        )
        self.execute_mysql_query(create_query)

        self.ordered_names = structure.get_ordered_names()
        self.prepared = True

    def load_data(self, data, table_name):
        """Insert every row of *data* into ``test.<table_name>`` in one query."""
        if not data:
            return

        def as_sql_literal(value):
            # Strings are wrapped in single quotes, everything else is str()'d.
            if isinstance(value, str):
                return "'" + value + "'"
            return str(value)

        tuples = []
        for row in data:
            cells = [as_sql_literal(row.data[name]) for name in self.ordered_names]
            tuples.append("(" + ",".join(cells) + ")")

        insert_query = "insert into test.{} ({}) values {}".format(
            table_name, ",".join(self.ordered_names), ",".join(tuples)
        )
        self.execute_mysql_query(insert_query)
2019-02-25 10:45:22 +00:00
class SourceMongo(ExternalSource):
    """Dictionary source backed by a MongoDB collection in the ``test`` db."""

    def get_source_str(self, table_name):
        """Return the ``<mongodb>`` config section for collection *table_name*."""
        return """
<mongodb>
<host>{host}</host>
<port>{port}</port>
<user>{user}</user>
<password>{password}</password>
<db>test</db>
<collection>{tbl}</collection>
</mongodb>
""".format(
            host=self.docker_hostname,
            port=self.docker_port,
            user=self.user,
            password=self.password,
            tbl=table_name,
        )

    def prepare(self, structure, table_name, cluster):
        """Connect to MongoDB and build per-field value converters.

        ``Date`` and ``DateTime`` fields arrive as strings and are parsed into
        ``datetime.datetime`` objects before insertion; every other field is
        stored unchanged.
        """
        connection_str = "mongodb://{user}:{password}@{host}:{port}".format(
            host=self.internal_hostname,
            port=self.internal_port,
            user=self.user,
            password=self.password,
        )
        self.connection = pymongo.MongoClient(connection_str)

        self.converters = {}
        for field in structure.get_all_fields():
            if field.field_type == "Date":
                self.converters[field.name] = lambda x: datetime.datetime.strptime(
                    x, "%Y-%m-%d"
                )
            elif field.field_type == "DateTime":

                def converter(x):
                    return datetime.datetime.strptime(x, "%Y-%m-%d %H:%M:%S")

                self.converters[field.name] = converter
            else:
                self.converters[field.name] = lambda x: x

        self.db = self.connection["test"]
        # NOTE(review): Database.add_user is only available in older PyMongo
        # releases; confirm the pinned version before upgrading the driver.
        self.db.add_user(self.user, self.password)
        self.prepared = True

    def load_data(self, data, table_name):
        """Convert every row of *data* and insert them into *table_name*.

        Does nothing when there are no rows: ``insert_many`` rejects an empty
        document list, and the other sources in this file also treat empty
        data as a no-op.
        """
        to_insert = [
            {
                cell_name: self.converters[cell_name](cell_value)
                for cell_name, cell_value in row.data.items()
            }
            for row in data
        ]
        if not to_insert:
            return
        self.db[table_name].insert_many(to_insert)
2019-02-21 16:43:21 +00:00
2020-05-14 11:36:19 +00:00
class SourceMongoURI(SourceMongo):
    """MongoDB source configured through a single connection URI."""

    def compatible_with_layout(self, layout):
        """Accept only the ``flat`` layout.

        One layout is enough for this dictionary, because the only thing
        under test is that connecting through a URI works.
        """
        is_flat = layout.name == "flat"
        return is_flat

    def get_source_str(self, table_name):
        """Return the ``<mongodb>`` config section that uses a ``<uri>``."""
        template = """
<mongodb>
<uri>mongodb://{user}:{password}@{host}:{port}/test</uri>
<collection>{tbl}</collection>
</mongodb>
"""
        params = {
            "host": self.docker_hostname,
            "port": self.docker_port,
            "user": self.user,
            "password": self.password,
            "tbl": table_name,
        }
        return template.format(**params)
2019-02-21 16:43:21 +00:00
class SourceClickHouse(ExternalSource):
    """Dictionary source backed by a table on another ClickHouse instance."""

    def get_source_str(self, table_name):
        """Return the ``<clickhouse>`` config section for *table_name*."""
        template = """
<clickhouse>
<host>{host}</host>
<port>{port}</port>
<user>{user}</user>
<password>{password}</password>
<db>test</db>
<table>{tbl}</table>
</clickhouse>
"""
        return template.format(
            host=self.docker_hostname,
            port=self.docker_port,
            user=self.user,
            password=self.password,
            tbl=table_name,
        )

    def prepare(self, structure, table_name, cluster):
        """Create ``test.<table_name>`` on the instance named ``docker_hostname``."""
        self.node = cluster.instances[self.docker_hostname]
        self.node.query("CREATE DATABASE IF NOT EXISTS test")

        all_fields = structure.keys + structure.ordinary_fields + structure.range_fields
        column_defs = [column.name + " " + column.field_type for column in all_fields]
        create_query = """CREATE TABLE test.{table_name} (
{fields_str}) ENGINE MergeTree ORDER BY tuple();
""".format(
            table_name=table_name, fields_str=",".join(column_defs)
        )
        self.node.query(create_query)

        self.ordered_names = structure.get_ordered_names()
        self.prepared = True

    def load_data(self, data, table_name):
        """Insert every row of *data* into ``test.<table_name>`` in one query."""
        if not data:
            return

        def as_sql_literal(value):
            # Strings are wrapped in single quotes, everything else is str()'d.
            if isinstance(value, str):
                return "'" + value + "'"
            return str(value)

        tuples = []
        for row in data:
            cells = [as_sql_literal(row.data[name]) for name in self.ordered_names]
            tuples.append("(" + ",".join(cells) + ")")

        insert_query = "INSERT INTO test.{} ({}) values {}".format(
            table_name, ",".join(self.ordered_names), ",".join(tuples)
        )
        self.node.query(insert_query)
class SourceFile(ExternalSource):
    """Dictionary source backed by a TSV file inside the instance container."""

    def get_source_str(self, table_name):
        """Return the ``<file>`` config section pointing at ``/<table_name>.tsv``."""
        tsv_path = "/" + table_name + ".tsv"
        template = """
<file>
<path>{path}</path>
<format>TabSeparated</format>
</file>
"""
        return template.format(path=tsv_path)

    def prepare(self, structure, table_name, cluster):
        """Create the empty TSV file in the container and remember column order."""
        self.node = cluster.instances[self.docker_hostname]
        tsv_path = "/" + table_name + ".tsv"
        touch_cmd = ["bash", "-c", "touch {}".format(tsv_path)]
        self.node.exec_in_container(touch_cmd, user="root")
        self.ordered_names = structure.get_ordered_names()
        self.prepared = True

    def load_data(self, data, table_name):
        """Append each row of *data* to the TSV file, one shell ``echo`` per row."""
        if not data:
            return

        tsv_path = "/" + table_name + ".tsv"
        for row in data:
            line = "\t".join(str(row.data[name]) for name in self.ordered_names)
            append_cmd = [
                "bash",
                "-c",
                'echo "{row}" >> {fname}'.format(row=line, fname=tsv_path),
            ]
            self.node.exec_in_container(append_cmd, user="root")

    def compatible_with_layout(self, layout):
        """Reject layouts whose name mentions ``cache`` or ``direct``."""
        return not ("cache" in layout.name or "direct" in layout.name)
2019-02-21 16:43:21 +00:00
class _SourceExecutableBase(ExternalSource):
    """Shared logic for sources that serve a TSV file through a shell command."""

    def _get_cmd(self, path):
        """Return the shell command that outputs the file at *path*.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        message = "Method {} is not implemented for {}".format(
            "_get_cmd", self.__class__.__name__
        )
        raise NotImplementedError(message)

    def get_source_str(self, table_name):
        """Return the ``<executable>`` config section for ``/<table_name>.tsv``."""
        tsv_path = "/" + table_name + ".tsv"
        template = """
<executable>
<command>{cmd}</command>
<format>TabSeparated</format>
</executable>
"""
        return template.format(cmd=self._get_cmd(tsv_path))

    def prepare(self, structure, table_name, cluster):
        """Create the empty TSV file in the container and remember column order."""
        self.node = cluster.instances[self.docker_hostname]
        tsv_path = "/" + table_name + ".tsv"
        touch_cmd = ["bash", "-c", "touch {}".format(tsv_path)]
        self.node.exec_in_container(touch_cmd, user="root")
        self.ordered_names = structure.get_ordered_names()
        self.prepared = True

    def load_data(self, data, table_name):
        """Append each row of *data* to the TSV file, one shell ``echo`` per row."""
        if not data:
            return

        tsv_path = "/" + table_name + ".tsv"
        for row in data:
            line = "\t".join(str(row.data[name]) for name in self.ordered_names)
            append_cmd = [
                "bash",
                "-c",
                'echo "{row}" >> {fname}'.format(row=line, fname=tsv_path),
            ]
            self.node.exec_in_container(append_cmd, user="root")
2019-02-21 16:43:21 +00:00
2020-09-23 19:31:47 +00:00
class SourceExecutableHashed(_SourceExecutableBase):
    """Executable source that simply prints the whole TSV file."""

    def _get_cmd(self, path):
        """Return a ``cat`` command for the file at *path*."""
        command = "cat {}".format(path)
        return command

    def compatible_with_layout(self, layout):
        """Accept only layouts whose name contains ``hashed``."""
        layout_name = layout.name
        return "hashed" in layout_name
2019-02-21 16:43:21 +00:00
2020-09-23 19:31:47 +00:00
class SourceExecutableCache(_SourceExecutableBase):
    """Executable source for cache layouts: drains stdin, then prints the file."""

    def _get_cmd(self, path):
        """Return a command that discards stdin and then ``cat``s *path*."""
        command = "cat - >/dev/null;cat {}".format(path)
        return command

    def compatible_with_layout(self, layout):
        """Accept only layouts whose name contains ``cache``."""
        layout_name = layout.name
        return "cache" in layout_name
2019-02-21 17:02:33 +00:00
class SourceHTTPBase(ExternalSource):
    """Shared logic for sources served by a small HTTP(S) server in a container.

    Subclasses choose the URL scheme by overriding :meth:`_get_schema`.
    """

    # Next port to hand out; shared by all HTTP-based sources so that every
    # generated config gets its own port.
    PORT_COUNTER = 5555

    def _get_schema(self):
        """Return the URL scheme (e.g. ``"http"``) used by this source.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError(
            "Method {} is not implemented for {}".format(
                "_get_schema", self.__class__.__name__
            )
        )

    def get_source_str(self, table_name):
        """Allocate a port and return the ``<http>`` config section.

        Each call takes the current ``PORT_COUNTER`` value, stores it in
        ``self.http_port`` and advances the counter. :meth:`prepare` reads
        ``self.http_port``, so this method has to be called first.
        """
        self.http_port = SourceHTTPBase.PORT_COUNTER
        url = "{schema}://{host}:{port}/".format(
            schema=self._get_schema(), host=self.docker_hostname, port=self.http_port
        )
        SourceHTTPBase.PORT_COUNTER += 1
        return """
<http>
<url>{url}</url>
<format>TabSeparated</format>
<credentials>
<user>foo</user>
<password>bar</password>
</credentials>
<headers>
<header>
<name>api-key</name>
<value>secret</value>
</header>
</headers>
</http>
""".format(
            url=url
        )

    def prepare(self, structure, table_name, cluster):
        """Create the TSV file and start ``http_server.py`` in the container.

        The server script and certificate are copied from the directory of
        this module. The server is started detached, on ``self.http_port``.
        """
        self.node = cluster.instances[self.docker_hostname]
        path = "/" + table_name + ".tsv"
        self.node.exec_in_container(
            ["bash", "-c", "touch {}".format(path)], user="root"
        )

        script_dir = os.path.dirname(os.path.realpath(__file__))
        self.node.copy_file_to_container(
            os.path.join(script_dir, "./http_server.py"), "/http_server.py"
        )
        self.node.copy_file_to_container(
            os.path.join(script_dir, "./fake_cert.pem"), "/fake_cert.pem"
        )
        self.node.exec_in_container(
            [
                "bash",
                "-c",
                "python3 /http_server.py --data-path={tbl} --schema={schema} --host={host} --port={port} --cert-path=/fake_cert.pem".format(
                    tbl=path,
                    schema=self._get_schema(),
                    host=self.docker_hostname,
                    port=self.http_port,
                ),
            ],
            detach=True,
        )
        self.ordered_names = structure.get_ordered_names()
        self.prepared = True

    def load_data(self, data, table_name):
        """Append each row of *data* to the TSV file, one shell ``echo`` per row."""
        if not data:
            return

        path = "/" + table_name + ".tsv"
        for row in data:
            str_data = "\t".join(str(row.data[name]) for name in self.ordered_names)
            self.node.exec_in_container(
                [
                    "bash",
                    "-c",
                    'echo "{row}" >> {fname}'.format(row=str_data, fname=path),
                ],
                user="root",
            )
2019-02-22 10:55:12 +00:00
class SourceHTTP(SourceHTTPBase):
    """HTTP-based source served over plain ``http``."""

    def _get_schema(self):
        """Return the URL scheme used for this source."""
        scheme = "http"
        return scheme
class SourceHTTPS(SourceHTTPBase):
    """HTTP-based source served over ``https``."""

    def _get_schema(self):
        """Return the URL scheme used for this source."""
        scheme = "https"
        return scheme
2019-04-11 09:05:01 +00:00
2019-04-11 09:05:01 +00:00
class SourceCassandra(ExternalSource):
    """Dictionary source backed by a Cassandra table in the ``test`` keyspace."""

    # Column type of the dictionary structure -> Cassandra column type used
    # when the backing table is created.
    TYPE_MAPPING = {
        "UInt8": "tinyint",
        "UInt16": "smallint",
        "UInt32": "int",
        "UInt64": "bigint",
        "Int8": "tinyint",
        "Int16": "smallint",
        "Int32": "int",
        "Int64": "bigint",
        "UUID": "uuid",
        "Date": "date",
        "DateTime": "timestamp",
        "String": "text",
        "Float32": "float",
        "Float64": "double",
    }

    def __init__(
        self,
        name,
        internal_hostname,
        internal_port,
        docker_hostname,
        docker_port,
        user,
        password,
    ):
        """Initialise the base settings and an empty table -> structure map."""
        super().__init__(
            name,
            internal_hostname,
            internal_port,
            docker_hostname,
            docker_port,
            user,
            password,
        )
        # Filled in by prepare(); load_data() looks structures up by table name.
        self.structure = {}

    def get_source_str(self, table_name):
        """Return the ``<cassandra>`` config section for *table_name*."""
        template = """
<cassandra>
<host>{host}</host>
<port>{port}</port>
<keyspace>test</keyspace>
<column_family>{table}</column_family>
<allow_filtering>1</allow_filtering>
<where>"Int64_" &lt; 1000000000000000000</where>
</cassandra>
"""
        return template.format(
            host=self.docker_hostname,
            port=self.docker_port,
            table=table_name,
        )

    def prepare(self, structure, table_name, cluster):
        """Connect to Cassandra and (re)create ``test."<table_name>"``."""
        # Fall back to the address the cluster reports when none was given.
        if self.internal_hostname is None:
            self.internal_hostname = cluster.cassandra_ip

        self.client = cassandra.cluster.Cluster(
            [self.internal_hostname], port=self.internal_port
        )
        self.session = self.client.connect()
        self.session.execute(
            "create keyspace if not exists test with replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};"
        )
        self.session.execute('drop table if exists test."{}"'.format(table_name))
        self.structure[table_name] = structure

        column_defs = []
        for field in structure.get_all_fields():
            column_defs.append(
                '"' + field.name + '" ' + self.TYPE_MAPPING[field.field_type]
            )
        key_names = []
        for key in structure.keys:
            key_names.append('"' + key.name + '"')

        create_query = 'create table test."{name}" ({columns}, primary key ({pk}));'.format(
            name=table_name, columns=", ".join(column_defs), pk=", ".join(key_names)
        )
        self.session.execute(create_query)
        self.prepared = True

    def get_value_to_insert(self, value, type):
        """Convert *value* of dictionary type *type* for the Cassandra driver.

        ``UUID`` strings become ``uuid.UUID`` objects and ``DateTime`` strings
        become ``datetime.datetime`` objects; other values pass through.
        """
        if type == "UUID":
            return uuid.UUID(value)
        if type == "DateTime":
            return datetime.datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
        return value

    def load_data(self, data, table_name):
        """Insert every row of *data* into ``test."<table_name>"``, one by one."""
        fields = self.structure[table_name].get_all_fields()
        names_and_types = [(field.name, field.field_type) for field in fields]

        quoted_columns = ['"' + name + '"' for name, _ in names_and_types]
        placeholders = ["%s"] * len(quoted_columns)
        insert = 'insert into test."{table}" ({columns}) values ({args})'.format(
            table=table_name,
            columns=",".join(quoted_columns),
            args=",".join(placeholders),
        )

        for row in data:
            values = []
            for name, field_type in names_and_types:
                raw_value = row.get_value_by_name(name)
                values.append(self.get_value_to_insert(raw_value, field_type))
            self.session.execute(insert, values)
2019-03-21 18:10:55 +00:00
2019-03-21 18:10:55 +00:00
class SourceRedis(ExternalSource):
    """Dictionary source backed by a Redis database."""

    def __init__(
        self,
        name,
        internal_hostname,
        internal_port,
        docker_hostname,
        docker_port,
        user,
        password,
        db_index,
        storage_type,
    ):
        """Initialise the base settings plus the Redis db index and storage type."""
        super().__init__(
            name,
            internal_hostname,
            internal_port,
            docker_hostname,
            docker_port,
            user,
            password,
        )
        self.db_index = db_index
        self.storage_type = storage_type

    def get_source_str(self, table_name):
        """Return the ``<redis>`` config section; *table_name* is not used."""
        template = """
<redis>
<host>{host}</host>
<port>{port}</port>
<password>{password}</password>
<db_index>{db_index}</db_index>
<storage_type>{storage_type}</storage_type>
</redis>
"""
        params = {
            "host": self.docker_hostname,
            "port": self.docker_port,
            "password": self.password,
            "storage_type": self.storage_type,  # simple or hash_map
            "db_index": self.db_index,
        }
        return template.format(**params)

    def prepare(self, structure, table_name, cluster):
        """Create the Redis client and remember the column order."""
        self.client = redis.StrictRedis(
            host=self.internal_hostname,
            port=self.internal_port,
            db=self.db_index,
            # An empty password is passed to the client as None.
            password=self.password or None,
        )
        self.ordered_names = structure.get_ordered_names()
        self.prepared = True

    def load_data(self, data, table_name):
        """Flush the database and store every row of *data*.

        Rows with exactly two columns are written with ``SET``; all other
        rows are written with ``HSET``.
        """
        self.client.flushdb()
        for row in data:
            cells = [str(row.data[name]) for name in self.ordered_names]
            store = self.client.set if len(cells) == 2 else self.client.hset
            store(*cells)

    def compatible_with_layout(self, layout):
        """Match simple layouts with ``simple`` storage, complex with ``hash_map``."""
        return (layout.is_simple and self.storage_type == "simple") or (
            layout.is_complex and self.storage_type == "hash_map"
        )
2019-05-28 20:17:30 +00:00
2019-05-28 20:17:30 +00:00
class SourceAerospike(ExternalSource):
def __init__(
self,
name,
internal_hostname,
internal_port,
docker_hostname,
docker_port,
user,
password,
):
ExternalSource.__init__(
self,
name,
internal_hostname,
internal_port,
docker_hostname,
docker_port,
user,
password,
)
2019-05-28 20:17:30 +00:00
self.namespace = "test"
self.set = "test_set"
2019-04-17 11:35:02 +00:00
2019-05-28 20:17:30 +00:00
def get_source_str(self, table_name):
print("AEROSPIKE get source str")
return """
2019-05-28 20:17:30 +00:00
<aerospike>
<host>{host}</host>
<port>{port}</port>
</aerospike>
""".format(
2019-05-28 20:17:30 +00:00
host=self.docker_hostname,
port=self.docker_port,
)
def prepare(self, structure, table_name, cluster):
config = {"hosts": [(self.internal_hostname, self.internal_port)]}
2019-05-28 20:17:30 +00:00
self.client = aerospike.client(config).connect()
self.prepared = True
print("PREPARED AEROSPIKE")
print(config)
def compatible_with_layout(self, layout):
print("compatible AEROSPIKE")
return layout.is_simple
def _flush_aerospike_db(self):
keys = []
2020-10-02 16:54:07 +00:00
def handle_record(xxx_todo_changeme):
(key, metadata, record) = xxx_todo_changeme
print(("Handle record {} {}".format(key, record)))
2019-05-28 20:17:30 +00:00
keys.append(key)
2020-10-02 16:54:07 +00:00
def print_record(xxx_todo_changeme1):
(key, metadata, record) = xxx_todo_changeme1
print(("Print record {} {}".format(key, record)))
2019-05-28 20:17:30 +00:00
scan = self.client.scan(self.namespace, self.set)
scan.foreach(handle_record)
[self.client.remove(key) for key in keys]
def load_kv_data(self, values):
self._flush_aerospike_db()
print("Load KV Data Aerospike")
if len(values[0]) == 2:
for value in values:
key = (self.namespace, self.set, value[0])
print(key)
self.client.put(
key,
{"bin_value": value[1]},
policy={"key": aerospike.POLICY_KEY_SEND},
)
2019-05-28 20:17:30 +00:00
assert self.client.exists(key)
else:
assert "VALUES SIZE != 2"
2019-05-28 20:17:30 +00:00
# print(values)
def load_data(self, data, table_name):
print("Load Data Aerospike")
# print(data)