Add integration test for JDBC bridge

This commit is contained in:
Zhichun Wu 2021-06-07 20:56:29 +08:00
parent a056fadefd
commit 7f2b444d6f
5 changed files with 188 additions and 2 deletions

View File

@ -0,0 +1,23 @@
version: '2.3'
services:
  # ClickHouse JDBC bridge container used by the integration tests.
  bridge1:
    image: yandex/clickhouse-jdbc-bridge
    # Generate a datasource named "self" that points back at the ClickHouse
    # instance under test, then start the bridge normally.
    command: |
      /bin/bash -c 'cat << EOF > config/datasources/self.json
      {
        "self": {
          "jdbcUrl": "jdbc:clickhouse://instance:8123/test",
          "username": "default",
          "password": "",
          "maximumPoolSize": 5
        }
      }
      EOF
      ./docker-entrypoint.sh'
    ports:
      # Host port 9020 forwards to the bridge's port 9019 inside the container.
      - 9020:9019
    healthcheck:
      test: ["CMD", "curl", "-s", "localhost:9019/ping"]
      interval: 5s
      timeout: 3s
      retries: 30

View File

@ -223,6 +223,7 @@ class ClickHouseCluster:
self.base_kerberized_kafka_cmd = [] self.base_kerberized_kafka_cmd = []
self.base_rabbitmq_cmd = [] self.base_rabbitmq_cmd = []
self.base_cassandra_cmd = [] self.base_cassandra_cmd = []
self.base_jdbc_bridge_cmd = []
self.base_redis_cmd = [] self.base_redis_cmd = []
self.pre_zookeeper_commands = [] self.pre_zookeeper_commands = []
self.instances = {} self.instances = {}
@ -244,6 +245,7 @@ class ClickHouseCluster:
self.with_net_trics = False self.with_net_trics = False
self.with_redis = False self.with_redis = False
self.with_cassandra = False self.with_cassandra = False
self.with_jdbc_bridge = False
self.with_minio = False self.with_minio = False
self.minio_dir = os.path.join(self.instances_dir, "minio") self.minio_dir = os.path.join(self.instances_dir, "minio")
@ -663,12 +665,19 @@ class ClickHouseCluster:
'--file', p.join(docker_compose_yml_dir, 'docker_compose_cassandra.yml')] '--file', p.join(docker_compose_yml_dir, 'docker_compose_cassandra.yml')]
return self.base_cassandra_cmd return self.base_cassandra_cmd
def setup_jdbc_bridge_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Enable the JDBC bridge service for this cluster.

    Appends the bridge compose file to the main docker-compose command and
    builds a standalone command for managing just the bridge container.

    :return: the standalone docker-compose command for the JDBC bridge
    """
    compose_file = p.join(docker_compose_yml_dir, 'docker_compose_jdbc_bridge.yml')
    self.with_jdbc_bridge = True
    self.base_cmd.extend(['--file', compose_file])
    self.base_jdbc_bridge_cmd = ['docker-compose',
                                 '--env-file', instance.env_file,
                                 '--project-name', self.project_name,
                                 '--file', compose_file]
    return self.base_jdbc_bridge_cmd
def add_instance(self, name, base_config_dir=None, main_configs=None, user_configs=None, dictionaries=None, def add_instance(self, name, base_config_dir=None, main_configs=None, user_configs=None, dictionaries=None,
macros=None, with_zookeeper=False, with_zookeeper_secure=False, macros=None, with_zookeeper=False, with_zookeeper_secure=False,
with_mysql_client=False, with_mysql=False, with_mysql8=False, with_mysql_cluster=False, with_mysql_client=False, with_mysql=False, with_mysql8=False, with_mysql_cluster=False,
with_kafka=False, with_kerberized_kafka=False, with_rabbitmq=False, clickhouse_path_dir=None, with_kafka=False, with_kerberized_kafka=False, with_rabbitmq=False, clickhouse_path_dir=None,
with_odbc_drivers=False, with_postgres=False, with_postgres_cluster=False, with_hdfs=False, with_kerberized_hdfs=False, with_mongo=False, with_odbc_drivers=False, with_postgres=False, with_postgres_cluster=False, with_hdfs=False, with_kerberized_hdfs=False, with_mongo=False,
with_redis=False, with_minio=False, with_cassandra=False, with_redis=False, with_minio=False, with_cassandra=False, with_jdbc_bridge=False,
hostname=None, env_variables=None, image="yandex/clickhouse-integration-test", tag=None, hostname=None, env_variables=None, image="yandex/clickhouse-integration-test", tag=None,
stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, tmpfs=None, stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, tmpfs=None,
zookeeper_docker_compose_path=None, minio_certs_dir=None, use_keeper=True, zookeeper_docker_compose_path=None, minio_certs_dir=None, use_keeper=True,
@ -724,6 +733,7 @@ class ClickHouseCluster:
with_redis=with_redis, with_redis=with_redis,
with_minio=with_minio, with_minio=with_minio,
with_cassandra=with_cassandra, with_cassandra=with_cassandra,
with_jdbc_bridge=with_jdbc_bridge,
server_bin_path=self.server_bin_path, server_bin_path=self.server_bin_path,
odbc_bridge_bin_path=self.odbc_bridge_bin_path, odbc_bridge_bin_path=self.odbc_bridge_bin_path,
library_bridge_bin_path=self.library_bridge_bin_path, library_bridge_bin_path=self.library_bridge_bin_path,
@ -826,6 +836,9 @@ class ClickHouseCluster:
if with_cassandra and not self.with_cassandra: if with_cassandra and not self.with_cassandra:
cmds.append(self.setup_cassandra_cmd(instance, env_variables, docker_compose_yml_dir)) cmds.append(self.setup_cassandra_cmd(instance, env_variables, docker_compose_yml_dir))
if with_jdbc_bridge and not self.with_jdbc_bridge:
cmds.append(self.setup_jdbc_bridge_cmd(instance, env_variables, docker_compose_yml_dir))
logging.debug("Cluster name:{} project_name:{}. Added instance name:{} tag:{} base_cmd:{} docker_compose_yml_dir:{}".format( logging.debug("Cluster name:{} project_name:{}. Added instance name:{} tag:{} base_cmd:{} docker_compose_yml_dir:{}".format(
self.name, self.project_name, name, tag, self.base_cmd, docker_compose_yml_dir)) self.name, self.project_name, name, tag, self.base_cmd, docker_compose_yml_dir))
return instance return instance
@ -925,6 +938,33 @@ class ClickHouseCluster:
["bash", "-c", "echo {} | base64 --decode > {}".format(encodedStr, dest_path)], ["bash", "-c", "echo {} | base64 --decode > {}".format(encodedStr, dest_path)],
user='root') user='root')
def wait_for_url(self, url="http://localhost:8123/ping", conn_timeout=2, interval=2, timeout=60):
    """Block until *url* answers an HTTP GET with a success status.

    :param url: target URL; "http://" is prepended when no scheme is present
    :param conn_timeout: per-request timeout in seconds
    :param interval: pause between retries; non-positive falls back to 2s
    :param timeout: overall deadline in seconds; non-positive falls back to 60s
    :raises Exception: if the URL did not become available before the deadline
    """
    if not url.startswith('http'):
        url = "http://" + url
    if interval <= 0:
        interval = 2
    if timeout <= 0:
        timeout = 60

    errors = []
    attempts = 1
    start = time.time()
    while time.time() - start < timeout:
        try:
            # verify=False: certificate validation is skipped for test endpoints.
            response = requests.get(url, allow_redirects=True, timeout=conn_timeout, verify=False)
            response.raise_for_status()
        except Exception as ex:
            logging.debug("{} Attempt {} failed, retrying in {} seconds".format(ex, attempts, interval))
            attempts += 1
            errors.append(str(ex))
            time.sleep(interval)
        else:
            logging.debug("{} is available after {} seconds".format(url, time.time() - start))
            return

    # Dump the container list to help diagnose why the service never came up.
    run_and_check(['docker-compose', 'ps', '--services', '--all'])
    logging.error("Can't connect to URL:{}".format(errors))
    raise Exception("Cannot wait URL {}(interval={}, timeout={}, attempts={})".format(
        url, interval, timeout, attempts))
def wait_mysql_client_to_start(self, timeout=180): def wait_mysql_client_to_start(self, timeout=180):
start = time.time() start = time.time()
errors = [] errors = []
@ -1413,6 +1453,10 @@ class ClickHouseCluster:
subprocess_check_call(self.base_cassandra_cmd + ['up', '-d']) subprocess_check_call(self.base_cassandra_cmd + ['up', '-d'])
self.wait_cassandra_to_start() self.wait_cassandra_to_start()
if self.with_jdbc_bridge and self.base_jdbc_bridge_cmd:
subprocess_check_call(self.base_jdbc_bridge_cmd + ['up', '-d'])
self.wait_for_url("http://localhost:9020/ping")
clickhouse_start_cmd = self.base_cmd + ['up', '-d', '--no-recreate'] clickhouse_start_cmd = self.base_cmd + ['up', '-d', '--no-recreate']
logging.debug(("Trying to create ClickHouse instance by command %s", ' '.join(map(str, clickhouse_start_cmd)))) logging.debug(("Trying to create ClickHouse instance by command %s", ' '.join(map(str, clickhouse_start_cmd))))
self.up_called = True self.up_called = True
@ -1610,7 +1654,7 @@ class ClickHouseInstance:
self, cluster, base_path, name, base_config_dir, custom_main_configs, custom_user_configs, self, cluster, base_path, name, base_config_dir, custom_main_configs, custom_user_configs,
custom_dictionaries, custom_dictionaries,
macros, with_zookeeper, zookeeper_config_path, with_mysql_client, with_mysql, with_mysql8, with_mysql_cluster, with_kafka, with_kerberized_kafka, macros, with_zookeeper, zookeeper_config_path, with_mysql_client, with_mysql, with_mysql8, with_mysql_cluster, with_kafka, with_kerberized_kafka,
with_rabbitmq, with_kerberized_hdfs, with_mongo, with_redis, with_minio, with_rabbitmq, with_kerberized_hdfs, with_mongo, with_redis, with_minio, with_jdbc_bridge,
with_cassandra, server_bin_path, odbc_bridge_bin_path, library_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, with_postgres, with_postgres_cluster, with_cassandra, server_bin_path, odbc_bridge_bin_path, library_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, with_postgres, with_postgres_cluster,
clickhouse_start_command=CLICKHOUSE_START_COMMAND, clickhouse_start_command=CLICKHOUSE_START_COMMAND,
main_config_name="config.xml", users_config_name="users.xml", copy_common_configs=True, main_config_name="config.xml", users_config_name="users.xml", copy_common_configs=True,
@ -1653,6 +1697,7 @@ class ClickHouseInstance:
self.with_redis = with_redis self.with_redis = with_redis
self.with_minio = with_minio self.with_minio = with_minio
self.with_cassandra = with_cassandra self.with_cassandra = with_cassandra
self.with_jdbc_bridge = with_jdbc_bridge
self.main_config_name = main_config_name self.main_config_name = main_config_name
self.users_config_name = users_config_name self.users_config_name = users_config_name

View File

@ -0,0 +1,7 @@
<?xml version="1.0"?>
<yandex>
    <!-- Point ClickHouse at the JDBC bridge container: "bridge1" is the
         service name from docker_compose_jdbc_bridge.yml, 9019 its port. -->
    <jdbc_bridge>
        <host>bridge1</host>
        <port>9019</port>
    </jdbc_bridge>
</yandex>

View File

@ -0,0 +1,111 @@
import contextlib
import os.path as p
import pytest
import time
import uuid
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
from string import Template
# Cluster for this test module; compose files resolve relative to this file.
cluster = ClickHouseCluster(__file__)
# Single ClickHouse instance wired to the JDBC bridge via configs/jdbc_bridge.xml.
instance = cluster.add_instance("instance", main_configs=["configs/jdbc_bridge.xml"], with_jdbc_bridge=True)
# Datasource name configured in the bridge; it points back at this instance.
datasource = "self"
# Number of rows seeded into test.ClickHouseTable by the started_cluster fixture.
records = 1000
@pytest.fixture(scope="module")
def started_cluster():
    """Start the cluster once per module and seed test.ClickHouseTable with *records* rows."""
    try:
        cluster.start()
        instance.query('''
            CREATE DATABASE test;
            CREATE TABLE test.ClickHouseTable(Num UInt32, Str String, Desc Nullable(String)) engine = Memory;
            INSERT INTO test.ClickHouseTable(Num, Str)
            SELECT number, toString(number) FROM system.numbers LIMIT {};
            '''.format(records))
        yield cluster
    finally:
        # Always tear the containers down, even if startup or seeding failed.
        cluster.shutdown()
def test_jdbc_query(started_cluster):
    """Query via the jdbc() table function using an inline schema and URL parameters."""
    query = '''
        SELECT * FROM jdbc(
            '{}?datasource_column&fetch_size=1',
            'rows UInt32',
            'SELECT count(1) AS rows FROM test.ClickHouseTable'
        )
        '''.format(datasource)
    # datasource_column prepends the datasource name as an extra column.
    expected = "{}\t{}".format(datasource, records)
    actual = instance.query(query)
    assert TSV(actual) == TSV(expected), "expecting {} but got {}".format(expected, actual)
def test_jdbc_distributed_query(started_cluster):
    """Join two jdbc() table functions with a local ClickHouse table in one query."""
    query = '''
        SELECT a.Num + 1
        FROM jdbc('{0}', 'SELECT * FROM test.ClickHouseTable') a
        INNER JOIN jdbc('{0}', 'num UInt32', 'SELECT {1} - 1 AS num') b
        on a.Num = b.num
        INNER JOIN test.ClickHouseTable c on b.num = c.Num
        '''.format(datasource, records)
    # The join pins a single row (Num = records - 1), so a.Num + 1 == records.
    actual = instance.query(query)
    assert int(actual) == records, "expecting {} but got {}".format(records, actual)
def test_jdbc_insert(started_cluster):
    """Insert a new row through jdbc() in mutation mode, then read it back."""
    setup = '''
        CREATE TABLE test.test_insert engine = Memory AS
        SELECT * FROM test.ClickHouseTable;
        SELECT *
        FROM jdbc('{0}?mutation', 'INSERT INTO test.test_insert VALUES({1}, ''{1}'', ''{1}'')');
        '''.format(datasource, records)
    instance.query(setup)

    # The inserted row stores str(records) in Desc, so int(Desc) == records.
    expected = records
    actual = instance.query(
        "SELECT Desc FROM jdbc('{}', 'SELECT * FROM test.test_insert WHERE Num = {}')".format(datasource, records))
    assert int(actual) == expected, "expecting {} but got {}".format(records, actual)
def test_jdbc_update(started_cluster):
    """Run ALTER TABLE ... UPDATE through jdbc() in mutation mode, then verify the new value."""
    # A random token makes the updated value unambiguous on readback.
    secrets = str(uuid.uuid1())
    mutation = '''
        CREATE TABLE test.test_update engine = Memory AS
        SELECT * FROM test.ClickHouseTable;
        SELECT *
        FROM jdbc(
            '{}?mutation',
            'SET mutations_sync = 1; ALTER TABLE test.test_update UPDATE Str=''{}'' WHERE Num = {} - 1;'
        )
        '''.format(datasource, secrets, records)
    instance.query(mutation)

    actual = instance.query('''
        SELECT Str
        FROM jdbc('{}', 'SELECT * FROM test.test_update WHERE Num = {} - 1')
        '''.format(datasource, records))
    assert TSV(actual) == TSV(secrets), "expecting {} but got {}".format(secrets, actual)
def test_jdbc_delete(started_cluster):
    """Delete all but the last seeded row through jdbc() in mutation mode, then verify.

    After the DELETE, the only remaining row is Num = records - 1 whose Str
    column holds that same number as text, so int(Str) == records - 1.
    """
    instance.query('''
        CREATE TABLE test.test_delete engine = Memory AS
        SELECT * FROM test.ClickHouseTable;
        SELECT *
        FROM jdbc(
            '{}?mutation',
            'SET mutations_sync = 1; ALTER TABLE test.test_delete DELETE WHERE Num < {} - 1;'
        )
        '''.format(datasource, records))

    expected = records - 1
    # Fixed: the template has a single placeholder; the previous version passed
    # a stray second argument (records) that str.format silently ignored.
    actual = instance.query(
        "SELECT Str FROM jdbc('{}', 'SELECT * FROM test.test_delete')".format(datasource))
    assert int(actual) == expected, "expecting {} but got {}".format(expected, actual)
def test_jdbc_table_engine(started_cluster):
    """Create a table backed by the JDBC engine and count its rows."""
    row_count = instance.query('''
        CREATE TABLE test.jdbc_table(Str String)
        ENGINE = JDBC('{}', 'test', 'ClickHouseTable');
        SELECT count(1) FROM test.jdbc_table;
        '''.format(datasource))
    assert int(row_count) == records, "expecting {} but got {}".format(records, row_count)