ClickHouse/dbms/tests/integration/test_odbc_interaction/test.py

120 lines
5.4 KiB
Python
Raw Normal View History

2018-08-22 15:42:27 +00:00
import time
import pytest
2018-09-13 11:38:20 +00:00
import os
2018-08-22 16:14:51 +00:00
import pymysql.cursors
2018-08-22 15:42:27 +00:00
from helpers.cluster import ClickHouseCluster
2018-09-13 11:38:20 +00:00
# Directory that contains this test file; the 'configs' directory next to it
# is passed to the cluster as base_configs_dir.
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))

# Single ClickHouse instance with ODBC drivers and MySQL enabled, loaded with
# the two SQLite3-over-ODBC dictionary configs used by the tests in this file.
node1 = cluster.add_instance(
    'node1',
    with_odbc_drivers=True,
    with_mysql=True,
    image='alesapin/ubuntu_with_odbc:14.04',
    main_configs=[
        'configs/dictionaries/sqlite3_odbc_hashed_dictionary.xml',
        'configs/dictionaries/sqlite3_odbc_cached_dictionary.xml',
    ]
)
2018-08-22 15:42:27 +00:00
2018-08-22 16:14:51 +00:00
# MySQL DDL template for the test table in the `clickhouse` database.
# The single `{}` placeholder is the table name (filled in via str.format).
create_table_sql_template = (
    "\n"
    "CREATE TABLE `clickhouse`.`{}` (\n"
    "`id` int(11) NOT NULL,\n"
    "`name` varchar(50) NOT NULL,\n"
    "`age` int NOT NULL default 0,\n"
    "`money` int NOT NULL default 0,\n"
    "PRIMARY KEY (`id`)) ENGINE=InnoDB;\n"
)
def get_mysql_conn():
    """Open and return a new pymysql connection to the test MySQL server.

    Connects as user 'root' with password 'clickhouse' to 127.0.0.1:3308.
    The caller is responsible for closing the returned connection.
    """
    return pymysql.connect(
        user='root',
        password='clickhouse',
        host='127.0.0.1',
        port=3308
    )
def create_mysql_db(conn, name):
    """Create a MySQL database called *name* with utf8 as default charset.

    conn -- an open connection whose cursor() result works as a context manager
    name -- database name; interpolated into the statement as-is (not escaped)
    """
    statement = "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name)
    with conn.cursor() as cur:
        cur.execute(statement)
def create_mysql_table(conn, table_name):
    """Create the test table *table_name* in the `clickhouse` MySQL database.

    The DDL comes from create_table_sql_template; table_name is substituted
    into it as-is (not escaped).
    """
    ddl = create_table_sql_template.format(table_name)
    with conn.cursor() as cur:
        cur.execute(ddl)
2018-08-22 15:42:27 +00:00
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture: start the cluster, prepare backends, yield it.

    Setup steps:
      * start the cluster;
      * create the empty SQLite tables t1, t2 and t3 in the database file
        configured for the SQLite3 ODBC driver on node1;
      * create the `clickhouse` database on the MySQL server.

    Yields the started ClickHouseCluster. The cluster is always shut down
    afterwards, whether or not setup succeeded.

    Raises: any exception raised during setup is printed and re-raised, so
    pytest reports the real cause instead of a "fixture did not yield" error.
    """
    try:
        cluster.start()

        sqlite_db = node1.odbc_drivers["SQLite3"]["Database"]

        # One table per SQLite test in this file: t1 is read through the
        # odbc() table function, t2 and t3 are written to by the hashed and
        # cached dictionary tests respectively.
        sqlite_tables_ddl = (
            "CREATE TABLE t1(x INTEGER PRIMARY KEY ASC, y, z);",
            "CREATE TABLE t2(X INTEGER PRIMARY KEY ASC, Y, Z);",
            "CREATE TABLE t3(X INTEGER PRIMARY KEY ASC, Y, Z);",
        )
        for ddl in sqlite_tables_ddl:
            node1.exec_in_container(
                ["bash", "-c", "echo '{}' | sqlite3 {}".format(ddl, sqlite_db)],
                privileged=True, user='root')

        # Only the MySQL database is created here; tables are created by the
        # individual tests. Close the connection as soon as it is not needed.
        conn = get_mysql_conn()
        try:
            create_mysql_db(conn, 'clickhouse')
        finally:
            conn.close()

        yield cluster
    except Exception as ex:
        print(ex)
        # Do not swallow the failure: without re-raising, a setup error would
        # surface only as an unrelated "did not yield" fixture error.
        raise
    finally:
        cluster.shutdown()
2018-08-22 16:14:51 +00:00
def test_mysql_simple_select_works(started_cluster):
    """Check that the server stays alive after an ODBC select against MySQL.

    Creates the MySQL table 'test_insert_select', maps it into ClickHouse
    with the MySQL table engine, inserts 100 rows through ClickHouse, then
    runs a select through the odbc() table function with errors ignored.
    The only assertion is that a trivial query still works afterwards.
    """
    mysql_setup = node1.odbc_drivers["MySQL"]

    table_name = 'test_insert_select'
    conn = get_mysql_conn()
    try:
        create_mysql_table(conn, table_name)

        create_query = (
            "\n"
            "CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL('mysql1:3306', 'clickhouse', '{}', 'root', 'clickhouse');\n"
        ).format(table_name, table_name)
        node1.query(create_query)

        node1.query("INSERT INTO {}(id, name, money) select number, concat('name_', toString(number)), 3 from numbers(100) ".format(table_name))

        # For reasons not yet understood, libmyodbc always segfaults with this
        # connection string, so the result and any error are ignored here.
        node1.query("SELECT * FROM odbc('DSN={}', '{}')".format(mysql_setup["DSN"], table_name), ignore_error=True)

        # The server must still work after the segfault.
        assert node1.query("select 1") == "1\n"
    finally:
        # Close the connection even if a query or the assertion above fails.
        conn.close()
def test_sqlite_simple_select_works(started_cluster):
    """Insert one row into SQLite table t1 and read it back via odbc().

    The row (1, 2, 3) is written with the sqlite3 CLI inside the node1
    container; the select through the SQLite3 DSN must return it as a
    single tab-separated line.
    """
    odbc_config = node1.odbc_drivers["SQLite3"]
    db_path = odbc_config["Database"]

    insert_command = "echo 'INSERT INTO t1 values(1, 2, 3);' | sqlite3 {}".format(db_path)
    node1.exec_in_container(["bash", "-c", insert_command], privileged=True, user='root')

    select_query = "select * from odbc('DSN={}', '{}')".format(odbc_config["DSN"], 't1')
    result = node1.query(select_query)
    assert result == "1\t2\t3\n"
2018-09-13 10:12:11 +00:00
def test_sqlite_odbc_hashed_dictionary(started_cluster):
    """Check reload behaviour of the 'sqlite3_odbc_hashed' dictionary over t2.

    Sequence exercised:
      1. insert key 1 (Z=3): key 1 reads 3, the missing key 200 reads 1;
      2. wait, insert key 200 (Z=7), wait again: values are unchanged
         (no reload, attributed by the original author to the dictionary's
         invalidate query -- the query itself lives in the XML config);
      3. replace key 1 with Z=5 and wait: both the new key 1 value and the
         earlier key 200 insert become visible.
    """
    db_path = node1.odbc_drivers["SQLite3"]["Database"]

    def run_sqlite(statement):
        # Pipe a single SQL statement into the sqlite3 CLI inside the container.
        shell_command = "echo '{}' | sqlite3 {}".format(statement, db_path)
        node1.exec_in_container(["bash", "-c", shell_command], privileged=True, user='root')

    def dict_z(key):
        # Read attribute 'Z' of the hashed dictionary for the given key.
        return node1.query("select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64({}))".format(key))

    run_sqlite('INSERT INTO t2 values(1, 2, 3);')
    assert dict_z(1) == "3\n"
    assert dict_z(200) == "1\n"  # default

    time.sleep(5)  # first reload
    run_sqlite('INSERT INTO t2 values(200, 2, 7);')

    # No reload because of invalidate query
    time.sleep(5)
    assert dict_z(1) == "3\n"
    assert dict_z(200) == "1\n"  # still default

    run_sqlite('REPLACE INTO t2 values(1, 2, 5);')

    # waiting for reload
    time.sleep(5)
    assert dict_z(1) == "5\n"
    assert dict_z(200) == "7\n"  # new value
def test_sqlite_odbc_cached_dictionary(started_cluster):
    """Check that the 'sqlite3_odbc_cached' dictionary follows changes in t3.

    A newly inserted key (200) is expected to be readable immediately, while
    a replaced value for an already-read key (1) is only asserted after a
    5 second wait (presumably for the cached entry to expire -- confirm
    against the dictionary config).
    """
    db_path = node1.odbc_drivers["SQLite3"]["Database"]

    def run_sqlite(statement):
        # Pipe a single SQL statement into the sqlite3 CLI inside the container.
        shell_command = "echo '{}' | sqlite3 {}".format(statement, db_path)
        node1.exec_in_container(["bash", "-c", shell_command], privileged=True, user='root')

    def dict_z(key):
        # Read attribute 'Z' of the cached dictionary for the given key.
        return node1.query("select dictGetUInt8('sqlite3_odbc_cached', 'Z', toUInt64({}))".format(key))

    run_sqlite('INSERT INTO t3 values(1, 2, 3);')
    assert dict_z(1) == "3\n"

    run_sqlite('INSERT INTO t3 values(200, 2, 7);')
    assert dict_z(200) == "7\n"  # new value

    run_sqlite('REPLACE INTO t3 values(1, 2, 12);')
    time.sleep(5)
    assert dict_z(1) == "12\n"