import os
import time

import psycopg2
import pymysql.cursors
import pytest
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

from helpers.cluster import ClickHouseCluster

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))

node1 = cluster.add_instance('node1', with_odbc_drivers=True, with_mysql=True,
                             image='yandex/clickhouse-integration-test',
                             main_configs=['configs/dictionaries/sqlite3_odbc_hashed_dictionary.xml',
                                           'configs/dictionaries/sqlite3_odbc_cached_dictionary.xml',
                                           'configs/dictionaries/postgres_odbc_hashed_dictionary.xml'],
                             stay_alive=True)
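
# MySQL-side DDL template; each test formats in its own table name.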
create_table_sql_template = """
    CREATE TABLE `clickhouse`.`{}` (
    `id` int(11) NOT NULL,
    `name` varchar(50) NOT NULL,
    `age` int NOT NULL default 0,
    `money` int NOT NULL default 0,
    `column_x` int default NULL,
    PRIMARY KEY (`id`)) ENGINE = InnoDB;
    """
def get_mysql_conn():
    conn = pymysql.connect(user='root', password='clickhouse', host='127.0.0.1', port=3308)
    return conn


def create_mysql_db(conn, name):
    with conn.cursor() as cursor:
        cursor.execute("CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))


def create_mysql_table(conn, table_name):
    with conn.cursor() as cursor:
        cursor.execute(create_table_sql_template.format(table_name))
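

# PostgreSQL helpers; the two autocommit settings below are redundant but
# harmless, and make DDL take effect without an explicit commit.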
def get_postgres_conn():
    conn_string = "host='localhost' user='postgres' password='mysecretpassword'"
    conn = psycopg2.connect(conn_string)
    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    conn.autocommit = True
    return conn


def create_postgres_db(conn, name):
    cursor = conn.cursor()
    cursor.execute("CREATE SCHEMA {}".format(name))
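

# Module-scoped fixture: one cluster for all tests; prepares the SQLite tables,
# the MySQL database and the PostgreSQL schema/table the ODBC tests query.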
@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()
        sqlite_db = node1.odbc_drivers["SQLite3"]["Database"]
        print("sqlite data received")

        node1.exec_in_container(["bash", "-c", "echo 'CREATE TABLE t1(x INTEGER PRIMARY KEY ASC, y, z);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
        node1.exec_in_container(["bash", "-c", "echo 'CREATE TABLE t2(X INTEGER PRIMARY KEY ASC, Y, Z);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
        node1.exec_in_container(["bash", "-c", "echo 'CREATE TABLE t3(X INTEGER PRIMARY KEY ASC, Y, Z);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
        node1.exec_in_container(["bash", "-c", "echo 'CREATE TABLE t4(X INTEGER PRIMARY KEY ASC, Y, Z);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
        print("sqlite tables created")

        mysql_conn = get_mysql_conn()
        print("mysql connection received")

        # create mysql db and table
        create_mysql_db(mysql_conn, 'clickhouse')
        print("mysql database created")

        postgres_conn = get_postgres_conn()
        print("postgres connection received")

        create_postgres_db(postgres_conn, 'clickhouse')
        print("postgres db created")

        cursor = postgres_conn.cursor()
        cursor.execute("create table if not exists clickhouse.test_table (column1 int primary key, column2 varchar(40) not null)")

        yield cluster

    except Exception as ex:
        print(ex)
        raise
    finally:
        cluster.shutdown()


def test_mysql_simple_select_works(started_cluster):
    mysql_setup = node1.odbc_drivers["MySQL"]

    table_name = 'test_insert_select'
    conn = get_mysql_conn()
    create_mysql_table(conn, table_name)

    # Check that NULL-values are handled correctly by the ODBC-bridge
    with conn.cursor() as cursor:
        cursor.execute("INSERT INTO clickhouse.{} VALUES(50, 'null-guy', 127, 255, NULL), (100, 'non-null-guy', 127, 255, 511);".format(table_name))
        conn.commit()
    assert node1.query("SELECT column_x FROM odbc('DSN={}', '{}')".format(mysql_setup["DSN"], table_name), settings={"external_table_functions_use_nulls": "1"}) == '\\N\n511\n'
    assert node1.query("SELECT column_x FROM odbc('DSN={}', '{}')".format(mysql_setup["DSN"], table_name), settings={"external_table_functions_use_nulls": "0"}) == '0\n511\n'

    node1.query('''
CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32, column_x Nullable(UInt32)) ENGINE = MySQL('mysql1:3306', 'clickhouse', '{}', 'root', 'clickhouse');
'''.format(table_name, table_name))

    node1.query("INSERT INTO {}(id, name, money, column_x) select number, concat('name_', toString(number)), 3, NULL from numbers(49)".format(table_name))
    node1.query("INSERT INTO {}(id, name, money, column_x) select number, concat('name_', toString(number)), 3, 42 from numbers(51, 49)".format(table_name))

    assert node1.query("SELECT count() FROM {} WHERE column_x IS NOT NULL".format(table_name)) == '50\n'
    assert node1.query("SELECT count() FROM {} WHERE column_x IS NULL".format(table_name)) == '50\n'
    assert node1.query("SELECT count(*) FROM odbc('DSN={}', '{}')".format(mysql_setup["DSN"], table_name)) == '100\n'

    # previously this test failed with a segfault; just to be sure :)
    assert node1.query("select 1") == "1\n"
    conn.close()


def test_mysql_insert(started_cluster):
    mysql_setup = node1.odbc_drivers["MySQL"]
    table_name = 'test_insert'
    conn = get_mysql_conn()
    create_mysql_table(conn, table_name)
    odbc_args = "'DSN={}', '{}', '{}'".format(mysql_setup["DSN"], mysql_setup["Database"], table_name)

    node1.query("create table mysql_insert (id Int64, name String, age UInt8, money Float, column_x Nullable(Int16)) Engine=ODBC({})".format(odbc_args))
    node1.query("insert into mysql_insert values (1, 'test', 11, 111, 1111), (2, 'odbc', 22, 222, NULL)")
    assert node1.query("select * from mysql_insert") == "1\ttest\t11\t111\t1111\n2\todbc\t22\t222\t\\N\n"

    node1.query("insert into table function odbc({}) values (3, 'insert', 33, 333, 3333)".format(odbc_args))
    node1.query("insert into table function odbc({}) (id, name, age, money) select id*4, upper(name), age*4, money*4 from odbc({}) where id=1".format(odbc_args, odbc_args))
    assert node1.query("select * from mysql_insert where id in (3, 4)") == "3\tinsert\t33\t333\t3333\n4\tTEST\t44\t444\t\\N\n"


def test_sqlite_simple_select_function_works(started_cluster):
    sqlite_setup = node1.odbc_drivers["SQLite3"]
    sqlite_db = sqlite_setup["Database"]

    node1.exec_in_container(["bash", "-c", "echo 'INSERT INTO t1 values(1, 2, 3);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
    assert node1.query("select * from odbc('DSN={}', '{}')".format(sqlite_setup["DSN"], 't1')) == "1\t2\t3\n"

    assert node1.query("select y from odbc('DSN={}', '{}')".format(sqlite_setup["DSN"], 't1')) == "2\n"
    assert node1.query("select z from odbc('DSN={}', '{}')".format(sqlite_setup["DSN"], 't1')) == "3\n"
    assert node1.query("select x from odbc('DSN={}', '{}')".format(sqlite_setup["DSN"], 't1')) == "1\n"
    assert node1.query("select x, y from odbc('DSN={}', '{}')".format(sqlite_setup["DSN"], 't1')) == "1\t2\n"
    assert node1.query("select z, x, y from odbc('DSN={}', '{}')".format(sqlite_setup["DSN"], 't1')) == "3\t1\t2\n"
    assert node1.query("select count(), sum(x) from odbc('DSN={}', '{}') group by x".format(sqlite_setup["DSN"], 't1')) == "1\t1\n"


def test_sqlite_simple_select_storage_works(started_cluster):
    sqlite_setup = node1.odbc_drivers["SQLite3"]
    sqlite_db = sqlite_setup["Database"]

    node1.exec_in_container(["bash", "-c", "echo 'INSERT INTO t4 values(1, 2, 3);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
    node1.query("create table SqliteODBC (x Int32, y String, z String) engine = ODBC('DSN={}', '', 't4')".format(sqlite_setup["DSN"]))

    assert node1.query("select * from SqliteODBC") == "1\t2\t3\n"
    assert node1.query("select y from SqliteODBC") == "2\n"
    assert node1.query("select z from SqliteODBC") == "3\n"
    assert node1.query("select x from SqliteODBC") == "1\n"
    assert node1.query("select x, y from SqliteODBC") == "1\t2\n"
    assert node1.query("select z, x, y from SqliteODBC") == "3\t1\t2\n"
    assert node1.query("select count(), sum(x) from SqliteODBC group by x") == "1\t1\n"
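

# The dictionary tests below rely on the reload settings in the dictionary XML
# configs; the time.sleep(5) calls wait out one reload interval.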
def test_sqlite_odbc_hashed_dictionary(started_cluster):
    sqlite_db = node1.odbc_drivers["SQLite3"]["Database"]
    node1.exec_in_container(["bash", "-c", "echo 'INSERT INTO t2 values(1, 2, 3);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
    assert node1.query("select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(1))") == "3\n"
    assert node1.query("select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(200))") == "1\n"  # default

    time.sleep(5)  # first reload
    node1.exec_in_container(["bash", "-c", "echo 'INSERT INTO t2 values(200, 2, 7);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')

    # No reload because of invalidate query
    time.sleep(5)
    assert node1.query("select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(1))") == "3\n"
    assert node1.query("select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(200))") == "1\n"  # still default

    node1.exec_in_container(["bash", "-c", "echo 'REPLACE INTO t2 values(1, 2, 5);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
    # waiting for reload
    time.sleep(5)
    assert node1.query("select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(1))") == "5\n"
    assert node1.query("select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(200))") == "7\n"  # new value


def test_sqlite_odbc_cached_dictionary(started_cluster):
    sqlite_db = node1.odbc_drivers["SQLite3"]["Database"]
    node1.exec_in_container(["bash", "-c", "echo 'INSERT INTO t3 values(1, 2, 3);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
    assert node1.query("select dictGetUInt8('sqlite3_odbc_cached', 'Z', toUInt64(1))") == "3\n"

    # Allow insert
    node1.exec_in_container(["bash", "-c", "chmod a+rw /tmp"], privileged=True, user='root')
    node1.exec_in_container(["bash", "-c", "chmod a+rw {}".format(sqlite_db)], privileged=True, user='root')

    node1.query("insert into table function odbc('DSN={};', '', 't3') values (200, 2, 7)".format(node1.odbc_drivers["SQLite3"]["DSN"]))
    assert node1.query("select dictGetUInt8('sqlite3_odbc_cached', 'Z', toUInt64(200))") == "7\n"  # new value

    node1.exec_in_container(["bash", "-c", "echo 'REPLACE INTO t3 values(1, 2, 12);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
    time.sleep(5)
    assert node1.query("select dictGetUInt8('sqlite3_odbc_cached', 'Z', toUInt64(1))") == "12\n"


def test_postgres_odbc_hashed_dictionary_with_schema(started_cluster):
    conn = get_postgres_conn()
    cursor = conn.cursor()
    cursor.execute("insert into clickhouse.test_table values(1, 'hello'),(2, 'world')")
    time.sleep(5)  # wait for reload
    assert node1.query("select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(1))") == "hello\n"
    assert node1.query("select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(2))") == "world\n"


def test_postgres_odbc_hashed_dictionary_no_tty_pipe_overflow(started_cluster):
    conn = get_postgres_conn()
    cursor = conn.cursor()
    cursor.execute("insert into clickhouse.test_table values(3, 'xxx')")
    for i in range(100):
        try:
            node1.query("system reload dictionary postgres_odbc_hashed", timeout=5)
        except Exception as ex:
            assert False, "Exception occurred -- odbc-bridge hangs: " + str(ex)
    assert node1.query("select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(3))") == "xxx\n"
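

# Round-trip INSERTs into PostgreSQL through the ODBC storage engine and the
# odbc() table function.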
def test_postgres_insert(started_cluster):
    conn = get_postgres_conn()
    conn.cursor().execute("truncate table clickhouse.test_table")

    # Also test with Servername containing '.' and '-' symbols (defined in
    # postgres .yml file). This is needed to check parsing, validation and
    # reconstruction of the connection string.
    node1.query("create table pg_insert (column1 UInt8, column2 String) engine=ODBC('DSN=postgresql_odbc;Servername=postgre-sql.local', 'clickhouse', 'test_table')")
    node1.query("insert into pg_insert values (1, 'hello'), (2, 'world')")
    assert node1.query("select * from pg_insert") == '1\thello\n2\tworld\n'

    node1.query("insert into table function odbc('DSN=postgresql_odbc;', 'clickhouse', 'test_table') format CSV 3,test")
    node1.query("insert into table function odbc('DSN=postgresql_odbc;Servername=postgre-sql.local', 'clickhouse', 'test_table') select number, 's' || toString(number) from numbers (4, 7)")
    assert node1.query("select sum(column1), count(column1) from pg_insert") == "55\t10\n"
    assert node1.query("select sum(n), count(n) from (select (*,).1 as n from (select * from odbc('DSN=postgresql_odbc;', 'clickhouse', 'test_table')))") == "55\t10\n"
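

# The odbc-bridge is a child process of clickhouse-server; after the server is
# killed, the bridge must not outlive it.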
def test_bridge_dies_with_parent(started_cluster):
    node1.query("select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(1))")

    clickhouse_pid = node1.get_process_pid("clickhouse server")
    bridge_pid = node1.get_process_pid("odbc-bridge")
    assert clickhouse_pid is not None
    assert bridge_pid is not None

    while clickhouse_pid is not None:
        try:
            node1.exec_in_container(["bash", "-c", "kill {}".format(clickhouse_pid)], privileged=True, user='root')
        except Exception:
            pass
        clickhouse_pid = node1.get_process_pid("clickhouse server")
        time.sleep(1)

    for i in range(5):
        time.sleep(1)  # just to be sure that the odbc-bridge caught the signal
        bridge_pid = node1.get_process_pid("odbc-bridge")
        if bridge_pid is None:
            break

    if bridge_pid:
        out = node1.exec_in_container(["gdb", "-p", str(bridge_pid), "--ex", "thread apply all bt", "--ex", "q"], privileged=True, user='root')
        print("Bridge is running, gdb output:")
        print(out)

    assert clickhouse_pid is None
    assert bridge_pid is None