# -*- coding: utf-8 -*- import datetime import logging import os import uuid import warnings import cassandra.cluster import pymongo import pymysql.cursors import redis class ExternalSource(object): def __init__( self, name, internal_hostname, internal_port, docker_hostname, docker_port, user, password, ): self.name = name self.internal_hostname = internal_hostname self.internal_port = int(internal_port) self.docker_hostname = docker_hostname self.docker_port = int(docker_port) self.user = user self.password = password def get_source_str(self, table_name): raise NotImplementedError( "Method {} is not implemented for {}".format( "get_source_config_part", self.__class__.__name__ ) ) def prepare(self, structure, table_name, cluster): raise NotImplementedError( "Method {} is not implemented for {}".format( "prepare_remote_source", self.__class__.__name__ ) ) # data is banch of Row def load_data(self, data): raise NotImplementedError( "Method {} is not implemented for {}".format( "prepare_remote_source", self.__class__.__name__ ) ) def compatible_with_layout(self, layout): return True class SourceMySQL(ExternalSource): TYPE_MAPPING = { "UInt8": "tinyint unsigned", "UInt16": "smallint unsigned", "UInt32": "int unsigned", "UInt64": "bigint unsigned", "Int8": "tinyint", "Int16": "smallint", "Int32": "int", "Int64": "bigint", "UUID": "varchar(36)", "Date": "date", "DateTime": "datetime", "String": "text", "Float32": "float", "Float64": "double", } def create_mysql_conn(self): logging.debug( f"pymysql connect {self.user}, {self.password}, {self.internal_hostname}, {self.internal_port}" ) self.connection = pymysql.connect( user=self.user, password=self.password, host=self.internal_hostname, port=self.internal_port, ) def execute_mysql_query(self, query): with warnings.catch_warnings(): warnings.simplefilter("ignore") with self.connection.cursor() as cursor: cursor.execute(query) self.connection.commit() def get_source_str(self, table_name): return """ 1 127.0.0.1 3333 2 {hostname} {port} {user} {password} test {tbl}
""".format( hostname=self.docker_hostname, port=self.docker_port, user=self.user, password=self.password, tbl=table_name, ) def prepare(self, structure, table_name, cluster): if self.internal_hostname is None: self.internal_hostname = cluster.mysql8_ip self.create_mysql_conn() self.execute_mysql_query( "create database if not exists test default character set 'utf8'" ) self.execute_mysql_query("drop table if exists test.{}".format(table_name)) fields_strs = [] for field in ( structure.keys + structure.ordinary_fields + structure.range_fields ): fields_strs.append(field.name + " " + self.TYPE_MAPPING[field.field_type]) create_query = """create table test.{table_name} ( {fields_str}); """.format( table_name=table_name, fields_str=",".join(fields_strs) ) self.execute_mysql_query(create_query) self.ordered_names = structure.get_ordered_names() self.prepared = True def load_data(self, data, table_name): values_strs = [] if not data: return for row in data: sorted_row = [] for name in self.ordered_names: data = row.data[name] if isinstance(row.data[name], str): data = "'" + data + "'" else: data = str(data) sorted_row.append(data) values_strs.append("(" + ",".join(sorted_row) + ")") query = "insert into test.{} ({}) values {}".format( table_name, ",".join(self.ordered_names), ",".join(values_strs) ) self.execute_mysql_query(query) class SourceMongo(ExternalSource): def __init__( self, name, internal_hostname, internal_port, docker_hostname, docker_port, user, password, secure=False, ): ExternalSource.__init__( self, name, internal_hostname, internal_port, docker_hostname, docker_port, user, password, ) self.secure = secure def get_source_str(self, table_name): options = "" if self.secure: options = "tls=true&tlsAllowInvalidCertificates=true" return """ {host} {port} {user} {password} test {tbl} {options} """.format( host=self.docker_hostname, port=self.docker_port, user=self.user, password=self.password, tbl=table_name, options=options, ) def prepare(self, structure, table_name, cluster): connection_str = "mongodb://{user}:{password}@{host}:{port}".format( host=self.internal_hostname, port=self.internal_port, user=self.user, password=self.password, ) if self.secure: connection_str += "/?tls=true&tlsAllowInvalidCertificates=true" self.connection = pymongo.MongoClient(connection_str) self.converters = {} for field in structure.get_all_fields(): if field.field_type == "Date": self.converters[field.name] = lambda x: datetime.datetime.strptime( x, "%Y-%m-%d" ) elif field.field_type == "DateTime": def converter(x): return datetime.datetime.strptime(x, "%Y-%m-%d %H:%M:%S") self.converters[field.name] = converter else: self.converters[field.name] = lambda x: x self.db = self.connection["test"] user_info = self.db.command("usersInfo", self.user) if user_info["users"]: self.db.command("updateUser", self.user, pwd=self.password) else: self.db.command( "createUser", self.user, pwd=self.password, roles=["readWrite"] ) self.prepared = True def load_data(self, data, table_name): tbl = self.db[table_name] to_insert = [] for row in data: row_dict = {} for cell_name, cell_value in list(row.data.items()): row_dict[cell_name] = self.converters[cell_name](cell_value) to_insert.append(row_dict) result = tbl.insert_many(to_insert) class SourceMongoURI(SourceMongo): def compatible_with_layout(self, layout): # It is enough to test one layout for this dictionary, since we're # only testing that the connection with URI works. return layout.name == "flat" def get_source_str(self, table_name): options = "" if self.secure: options = "tls=true&tlsAllowInvalidCertificates=true" return """ mongodb://{user}:{password}@{host}:{port}/test?{options} {tbl} """.format( host=self.docker_hostname, port=self.docker_port, user=self.user, password=self.password, tbl=table_name, options=options, ) class SourceClickHouse(ExternalSource): def get_source_str(self, table_name): return """ {host} {port} {user} {password} test {tbl}
""".format( host=self.docker_hostname, port=self.docker_port, user=self.user, password=self.password, tbl=table_name, ) def prepare(self, structure, table_name, cluster): self.node = cluster.instances[self.docker_hostname] self.node.query("CREATE DATABASE IF NOT EXISTS test") fields_strs = [] for field in ( structure.keys + structure.ordinary_fields + structure.range_fields ): fields_strs.append(field.name + " " + field.field_type) create_query = """CREATE TABLE test.{table_name} ( {fields_str}) ENGINE MergeTree ORDER BY tuple(); """.format( table_name=table_name, fields_str=",".join(fields_strs) ) self.node.query(create_query) self.ordered_names = structure.get_ordered_names() self.prepared = True def load_data(self, data, table_name): values_strs = [] if not data: return for row in data: sorted_row = [] for name in self.ordered_names: row_data = row.data[name] if isinstance(row_data, str): row_data = "'" + row_data + "'" else: row_data = str(row_data) sorted_row.append(row_data) values_strs.append("(" + ",".join(sorted_row) + ")") query = "INSERT INTO test.{} ({}) values {}".format( table_name, ",".join(self.ordered_names), ",".join(values_strs) ) self.node.query(query) class SourceFile(ExternalSource): def get_source_str(self, table_name): table_path = "/" + table_name + ".tsv" return """ {path} TabSeparated """.format( path=table_path, ) def prepare(self, structure, table_name, cluster): self.node = cluster.instances[self.docker_hostname] path = "/" + table_name + ".tsv" self.node.exec_in_container( ["bash", "-c", "touch {}".format(path)], user="root" ) self.ordered_names = structure.get_ordered_names() self.prepared = True def load_data(self, data, table_name): if not data: return path = "/" + table_name + ".tsv" for row in list(data): sorted_row = [] for name in self.ordered_names: sorted_row.append(str(row.data[name])) str_data = "\t".join(sorted_row) self.node.exec_in_container( [ "bash", "-c", 'echo "{row}" >> {fname}'.format(row=str_data, fname=path), ], user="root", ) def compatible_with_layout(self, layout): return "cache" not in layout.name and "direct" not in layout.name class _SourceExecutableBase(ExternalSource): def _get_cmd(self, path): raise NotImplementedError( "Method {} is not implemented for {}".format( "_get_cmd", self.__class__.__name__ ) ) def get_source_str(self, table_name): table_path = "/" + table_name + ".tsv" return """ {cmd} TabSeparated """.format( cmd=self._get_cmd(table_path), ) def prepare(self, structure, table_name, cluster): self.node = cluster.instances[self.docker_hostname] path = "/" + table_name + ".tsv" self.node.exec_in_container( ["bash", "-c", "touch {}".format(path)], user="root" ) self.ordered_names = structure.get_ordered_names() self.prepared = True def load_data(self, data, table_name): if not data: return path = "/" + table_name + ".tsv" for row in list(data): sorted_row = [] for name in self.ordered_names: sorted_row.append(str(row.data[name])) str_data = "\t".join(sorted_row) self.node.exec_in_container( [ "bash", "-c", 'echo "{row}" >> {fname}'.format(row=str_data, fname=path), ], user="root", ) class SourceExecutableHashed(_SourceExecutableBase): def _get_cmd(self, path): return "cat {}".format(path) def compatible_with_layout(self, layout): return "hashed" in layout.name class SourceExecutableCache(_SourceExecutableBase): def _get_cmd(self, path): return "cat - >/dev/null;cat {}".format(path) def compatible_with_layout(self, layout): return "cache" in layout.name class SourceHTTPBase(ExternalSource): PORT_COUNTER = 5555 def get_source_str(self, table_name): self.http_port = SourceHTTPBase.PORT_COUNTER url = "{schema}://{host}:{port}/".format( schema=self._get_schema(), host=self.docker_hostname, port=self.http_port ) SourceHTTPBase.PORT_COUNTER += 1 return """ {url} TabSeparated foo bar
api-key secret
""".format( url=url ) def prepare(self, structure, table_name, cluster): self.node = cluster.instances[self.docker_hostname] path = "/" + table_name + ".tsv" self.node.exec_in_container( ["bash", "-c", "touch {}".format(path)], user="root" ) script_dir = os.path.dirname(os.path.realpath(__file__)) self.node.copy_file_to_container( os.path.join(script_dir, "./http_server.py"), "/http_server.py" ) self.node.copy_file_to_container( os.path.join(script_dir, "./fake_cert.pem"), "/fake_cert.pem" ) self.node.exec_in_container( [ "bash", "-c", "python3 /http_server.py --data-path={tbl} --schema={schema} --host={host} --port={port} --cert-path=/fake_cert.pem".format( tbl=path, schema=self._get_schema(), host=self.docker_hostname, port=self.http_port, ), ], detach=True, ) self.ordered_names = structure.get_ordered_names() self.prepared = True def load_data(self, data, table_name): if not data: return path = "/" + table_name + ".tsv" for row in list(data): sorted_row = [] for name in self.ordered_names: sorted_row.append(str(row.data[name])) str_data = "\t".join(sorted_row) self.node.exec_in_container( [ "bash", "-c", 'echo "{row}" >> {fname}'.format(row=str_data, fname=path), ], user="root", ) class SourceHTTP(SourceHTTPBase): def _get_schema(self): return "http" class SourceHTTPS(SourceHTTPBase): def _get_schema(self): return "https" class SourceCassandra(ExternalSource): TYPE_MAPPING = { "UInt8": "tinyint", "UInt16": "smallint", "UInt32": "int", "UInt64": "bigint", "Int8": "tinyint", "Int16": "smallint", "Int32": "int", "Int64": "bigint", "UUID": "uuid", "Date": "date", "DateTime": "timestamp", "String": "text", "Float32": "float", "Float64": "double", } def __init__( self, name, internal_hostname, internal_port, docker_hostname, docker_port, user, password, ): ExternalSource.__init__( self, name, internal_hostname, internal_port, docker_hostname, docker_port, user, password, ) self.structure = dict() def get_source_str(self, table_name): return """ {host} {port} test {table} 1 "Int64_" < 1000000000000000000 """.format( host=self.docker_hostname, port=self.docker_port, table=table_name, ) def prepare(self, structure, table_name, cluster): if self.internal_hostname is None: self.internal_hostname = cluster.cassandra_ip self.client = cassandra.cluster.Cluster( [self.internal_hostname], port=self.internal_port ) self.session = self.client.connect() self.session.execute( "create keyspace if not exists test with replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};" ) self.session.execute('drop table if exists test."{}"'.format(table_name)) self.structure[table_name] = structure columns = [ '"' + col.name + '" ' + self.TYPE_MAPPING[col.field_type] for col in structure.get_all_fields() ] keys = ['"' + col.name + '"' for col in structure.keys] query = 'create table test."{name}" ({columns}, primary key ({pk}));'.format( name=table_name, columns=", ".join(columns), pk=", ".join(keys) ) self.session.execute(query) self.prepared = True def get_value_to_insert(self, value, type): if type == "UUID": return uuid.UUID(value) elif type == "DateTime": return datetime.datetime.strptime(value, "%Y-%m-%d %H:%M:%S") return value def load_data(self, data, table_name): names_and_types = [ (field.name, field.field_type) for field in self.structure[table_name].get_all_fields() ] columns = ['"' + col[0] + '"' for col in names_and_types] insert = 'insert into test."{table}" ({columns}) values ({args})'.format( table=table_name, columns=",".join(columns), args=",".join(["%s"] * len(columns)), ) for row in data: values = [ self.get_value_to_insert(row.get_value_by_name(col[0]), col[1]) for col in names_and_types ] self.session.execute(insert, values) class SourceRedis(ExternalSource): def __init__( self, name, internal_hostname, internal_port, docker_hostname, docker_port, user, password, db_index, storage_type, ): super(SourceRedis, self).__init__( name, internal_hostname, internal_port, docker_hostname, docker_port, user, password, ) self.storage_type = storage_type self.db_index = db_index def get_source_str(self, table_name): return """ {host} {port} {password} {db_index} {storage_type} """.format( host=self.docker_hostname, port=self.docker_port, password=self.password, storage_type=self.storage_type, # simple or hash_map db_index=self.db_index, ) def prepare(self, structure, table_name, cluster): self.client = redis.StrictRedis( host=self.internal_hostname, port=self.internal_port, db=self.db_index, password=self.password or None, ) self.prepared = True self.ordered_names = structure.get_ordered_names() def load_data(self, data, table_name): self.client.flushdb() for row in list(data): values = [] for name in self.ordered_names: values.append(str(row.data[name])) if len(values) == 2: self.client.set(*values) else: self.client.hset(*values) def compatible_with_layout(self, layout): return ( layout.is_simple and self.storage_type == "simple" or layout.is_complex and self.storage_type == "hash_map" )