mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-18 20:32:43 +00:00
508 lines
23 KiB
Python
508 lines
23 KiB
Python
import time
|
|
|
|
import psycopg2
|
|
import pymysql.cursors
|
|
import pytest
|
|
from helpers.cluster import ClickHouseCluster
|
|
from helpers.test_tools import assert_eq_with_retry
|
|
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
|
|
from multiprocessing.dummy import Pool
|
|
|
|
# Single-instance test cluster shared by every test in this module.
cluster = ClickHouseCluster(__file__)

# node1 is started with the ODBC drivers and a MySQL companion, and loads the
# dictionary definitions exercised by the sqlite/postgres dictionary tests.
# NOTE(review): stay_alive=True is presumably what lets
# test_bridge_dies_with_parent kill and restart the server -- confirm in helpers.
node1 = cluster.add_instance(
    'node1',
    with_odbc_drivers=True,
    with_mysql=True,
    main_configs=[
        'configs/openssl.xml',
        'configs/odbc_logging.xml',
        'configs/enable_dictionaries.xml',
        'configs/dictionaries/sqlite3_odbc_hashed_dictionary.xml',
        'configs/dictionaries/sqlite3_odbc_cached_dictionary.xml',
        'configs/dictionaries/postgres_odbc_hashed_dictionary.xml',
    ],
    stay_alive=True,
)
|
|
|
|
# DDL template for the MySQL-side test tables. The single `{}` placeholder is
# the table name; the table is always created in the `clickhouse` database.
create_table_sql_template = """
    CREATE TABLE `clickhouse`.`{}` (
    `id` int(11) NOT NULL,
    `name` varchar(50) NOT NULL,
    `age` int NOT NULL default 0,
    `money` int NOT NULL default 0,
    `column_x` int default NULL,
    PRIMARY KEY (`id`)) ENGINE=InnoDB;
    """
|
|
|
|
|
|
def get_mysql_conn():
    """Open and return a new pymysql connection to the test MySQL server.

    Connects as ``root`` (password ``clickhouse``) to 127.0.0.1:3308.
    The caller owns the connection and is responsible for closing it.
    """
    return pymysql.connect(user='root', password='clickhouse', host='127.0.0.1', port=3308)
|
|
|
|
|
|
def create_mysql_db(conn, name):
    """Create MySQL database *name* with a utf8 default character set.

    :param conn: open connection whose ``cursor()`` works as a context manager.
    :param name: database name; interpolated into the statement unescaped,
        so it must be a trusted identifier.
    """
    statement = "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name)
    with conn.cursor() as cur:
        cur.execute(statement)
|
|
|
|
|
|
def create_mysql_table(conn, table_name):
    """Create *table_name* in the MySQL ``clickhouse`` database.

    The DDL comes from ``create_table_sql_template``; *table_name* is
    interpolated unescaped, so it must be a trusted identifier.
    """
    ddl = create_table_sql_template.format(table_name)
    with conn.cursor() as cur:
        cur.execute(ddl)
|
|
|
|
|
|
def get_postgres_conn():
    """Open and return an autocommit psycopg2 connection to the test PostgreSQL server.

    Autocommit is requested both through the isolation level and through the
    ``autocommit`` attribute, so statements issued by the tests take effect
    without an explicit ``commit()``.
    """
    pg_conn = psycopg2.connect("host='localhost' user='postgres' password='mysecretpassword'")
    pg_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    pg_conn.autocommit = True
    return pg_conn
|
|
|
|
|
|
def create_postgres_db(conn, name):
    """Create PostgreSQL schema *name* through *conn*.

    Despite the function name, this issues ``CREATE SCHEMA``, not
    ``CREATE DATABASE``. *name* is interpolated unescaped.
    """
    conn.cursor().execute("CREATE SCHEMA {}".format(name))
|
|
|
|
|
|
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture: start the cluster and prepare the external databases.

    Setup performed before yielding ``cluster``:
      * sqlite: creates tables t1, t2, t3, t4 and tf1 (each one is populated by
        a different test below);
      * MySQL: creates the ``clickhouse`` database;
      * PostgreSQL: creates the ``clickhouse`` schema and ``clickhouse.test_table``.

    The cluster is always shut down on exit, whether setup or the tests fail.
    """
    try:
        cluster.start()
        sqlite_db = node1.odbc_drivers["SQLite3"]["Database"]

        print("sqlite data received")
        sqlite_ddl = (
            'CREATE TABLE t1(x INTEGER PRIMARY KEY ASC, y, z);',
            'CREATE TABLE t2(X INTEGER PRIMARY KEY ASC, Y, Z);',
            'CREATE TABLE t3(X INTEGER PRIMARY KEY ASC, Y, Z);',
            'CREATE TABLE t4(X INTEGER PRIMARY KEY ASC, Y, Z);',
            'CREATE TABLE tf1(x INTEGER PRIMARY KEY ASC, y, z);',
        )
        for ddl in sqlite_ddl:
            node1.exec_in_container(
                ["bash", "-c", "echo '{}' | sqlite3 {}".format(ddl, sqlite_db)],
                privileged=True, user='root')
        print("sqlite tables created")

        # The setup connections are only needed here; close them instead of
        # leaking them for the lifetime of the module.
        mysql_conn = get_mysql_conn()
        try:
            print("mysql connection received")
            ## create mysql db and table
            create_mysql_db(mysql_conn, 'clickhouse')
            print("mysql database created")
        finally:
            mysql_conn.close()

        postgres_conn = get_postgres_conn()
        try:
            print("postgres connection received")

            create_postgres_db(postgres_conn, 'clickhouse')
            print("postgres db created")

            cursor = postgres_conn.cursor()
            cursor.execute(
                "create table if not exists clickhouse.test_table (column1 int primary key, column2 varchar(40) not null)")
        finally:
            postgres_conn.close()

        yield cluster

    except Exception as ex:
        print(ex)
        # Bare raise re-raises the active exception with its original traceback.
        raise
    finally:
        cluster.shutdown()
|
|
|
|
|
|
def test_mysql_simple_select_works(started_cluster):
    """Read a MySQL table through the odbc() table function and the MySQL engine.

    Checks NULL handling under both values of
    ``external_table_functions_use_nulls``, then inserts through a MySQL-engine
    table and verifies the row counts seen through both access paths.
    """
    mysql_setup = node1.odbc_drivers["MySQL"]

    table_name = 'test_insert_select'
    conn = get_mysql_conn()
    # try/finally so the connection is closed even when an assertion fails.
    try:
        create_mysql_table(conn, table_name)

        # Check that NULL-values are handled correctly by the ODBC-bridge
        with conn.cursor() as cursor:
            cursor.execute(
                "INSERT INTO clickhouse.{} VALUES(50, 'null-guy', 127, 255, NULL), (100, 'non-null-guy', 127, 255, 511);".format(
                    table_name))
            conn.commit()

        odbc_source = "odbc('DSN={}', '{}')".format(mysql_setup["DSN"], table_name)
        select_column_x = "SELECT column_x FROM " + odbc_source
        assert node1.query(select_column_x,
                           settings={"external_table_functions_use_nulls": "1"}) == '\\N\n511\n'
        assert node1.query(select_column_x,
                           settings={"external_table_functions_use_nulls": "0"}) == '0\n511\n'

        node1.query('''
            CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32, column_x Nullable(UInt32)) ENGINE = MySQL('mysql1:3306', 'clickhouse', '{}', 'root', 'clickhouse');
            '''.format(table_name, table_name))

        # ids 0..48 with NULL and ids 51..99 with 42: together with the two rows
        # inserted above this gives 50 NULL and 50 non-NULL values, 100 rows total.
        node1.query(
            "INSERT INTO {}(id, name, money, column_x) select number, concat('name_', toString(number)), 3, NULL from numbers(49) ".format(
                table_name))
        node1.query(
            "INSERT INTO {}(id, name, money, column_x) select number, concat('name_', toString(number)), 3, 42 from numbers(51, 49) ".format(
                table_name))

        assert node1.query("SELECT COUNT () FROM {} WHERE column_x IS NOT NULL".format(table_name)) == '50\n'
        assert node1.query("SELECT COUNT () FROM {} WHERE column_x IS NULL".format(table_name)) == '50\n'
        assert node1.query("SELECT count(*) FROM " + odbc_source) == '100\n'

        # previously this test fails with segfault
        # just to be sure :)
        assert node1.query("select 1") == "1\n"
    finally:
        conn.close()
|
|
|
|
|
|
def test_mysql_insert(started_cluster):
    """Insert into MySQL through the ODBC engine and the odbc() table function.

    Covers a plain VALUES insert (including a NULL), an insert through the
    table function, and an INSERT ... SELECT that reads from and writes to the
    same ODBC source.
    """
    mysql_setup = node1.odbc_drivers["MySQL"]
    table_name = 'test_insert'

    # The direct MySQL connection is only needed to create the table; close it
    # right away instead of leaking it.
    conn = get_mysql_conn()
    try:
        create_mysql_table(conn, table_name)
    finally:
        conn.close()

    odbc_args = "'DSN={}', '{}', '{}'".format(mysql_setup["DSN"], mysql_setup["Database"], table_name)

    node1.query(
        "create table mysql_insert (id Int64, name String, age UInt8, money Float, column_x Nullable(Int16)) Engine=ODBC({})".format(
            odbc_args))
    node1.query("insert into mysql_insert values (1, 'test', 11, 111, 1111), (2, 'odbc', 22, 222, NULL)")
    assert node1.query("select * from mysql_insert") == "1\ttest\t11\t111\t1111\n2\todbc\t22\t222\t\\N\n"

    node1.query("insert into table function odbc({}) values (3, 'insert', 33, 333, 3333)".format(odbc_args))
    # column_x is omitted from the column list, so the copied row (id=4) gets NULL.
    node1.query(
        "insert into table function odbc({}) (id, name, age, money) select id*4, upper(name), age*4, money*4 from odbc({}) where id=1".format(
            odbc_args, odbc_args))
    assert node1.query(
        "select * from mysql_insert where id in (3, 4)") == "3\tinsert\t33\t333\t3333\n4\tTEST\t44\t444\t\\N\n"
|
|
|
|
|
|
def test_sqlite_simple_select_function_works(started_cluster):
    """Select from sqlite table t1 through the odbc() table function.

    Inserts one row (1, 2, 3) and checks full-row, single-column, reordered
    column and aggregate projections.
    """
    sqlite_setup = node1.odbc_drivers["SQLite3"]
    sqlite_db = sqlite_setup["Database"]

    node1.exec_in_container(
        ["bash", "-c", "echo 'INSERT INTO t1 values(1, 2, 3);' | sqlite3 {}".format(sqlite_db)],
        privileged=True, user='root')

    source = "odbc('DSN={}', '{}')".format(sqlite_setup["DSN"], 't1')
    # (query template, expected TSV output); `{}` is replaced by the odbc() source.
    checks = [
        ("select * from {}", "1\t2\t3\n"),
        ("select y from {}", "2\n"),
        ("select z from {}", "3\n"),
        ("select x from {}", "1\n"),
        ("select x, y from {}", "1\t2\n"),
        ("select z, x, y from {}", "3\t1\t2\n"),
        ("select count(), sum(x) from {} group by x", "1\t1\n"),
    ]
    for template, expected in checks:
        assert node1.query(template.format(source)) == expected
|
|
|
|
def test_sqlite_table_function(started_cluster):
    """Create a ClickHouse table ``AS odbc(...)`` over sqlite table tf1 and query it.

    Inserts one row (1, 2, 3) into tf1 and checks the same set of projections
    as the other sqlite select tests.
    """
    sqlite_setup = node1.odbc_drivers["SQLite3"]
    sqlite_db = sqlite_setup["Database"]

    node1.exec_in_container(
        ["bash", "-c", "echo 'INSERT INTO tf1 values(1, 2, 3);' | sqlite3 {}".format(sqlite_db)],
        privileged=True, user='root')
    node1.query("create table odbc_tf as odbc('DSN={}', '{}')".format(sqlite_setup["DSN"], 'tf1'))

    # (query, expected TSV output)
    checks = [
        ("select * from odbc_tf", "1\t2\t3\n"),
        ("select y from odbc_tf", "2\n"),
        ("select z from odbc_tf", "3\n"),
        ("select x from odbc_tf", "1\n"),
        ("select x, y from odbc_tf", "1\t2\n"),
        ("select z, x, y from odbc_tf", "3\t1\t2\n"),
        ("select count(), sum(x) from odbc_tf group by x", "1\t1\n"),
    ]
    for sql, expected in checks:
        assert node1.query(sql) == expected
|
|
|
|
def test_sqlite_simple_select_storage_works(started_cluster):
    """Select from sqlite table t4 through a table with the ODBC storage engine.

    Inserts one row (1, 2, 3) into t4, creates ``SqliteODBC`` over it with an
    empty database argument, and checks the usual set of projections.
    """
    sqlite_setup = node1.odbc_drivers["SQLite3"]
    sqlite_db = sqlite_setup["Database"]

    node1.exec_in_container(
        ["bash", "-c", "echo 'INSERT INTO t4 values(1, 2, 3);' | sqlite3 {}".format(sqlite_db)],
        privileged=True, user='root')
    create_sql = "create table SqliteODBC (x Int32, y String, z String) engine = ODBC('DSN={}', '', 't4')".format(
        sqlite_setup["DSN"])
    node1.query(create_sql)

    # (query, expected TSV output)
    checks = [
        ("select * from SqliteODBC", "1\t2\t3\n"),
        ("select y from SqliteODBC", "2\n"),
        ("select z from SqliteODBC", "3\n"),
        ("select x from SqliteODBC", "1\n"),
        ("select x, y from SqliteODBC", "1\t2\n"),
        ("select z, x, y from SqliteODBC", "3\t1\t2\n"),
        ("select count(), sum(x) from SqliteODBC group by x", "1\t1\n"),
    ]
    for sql, expected in checks:
        assert node1.query(sql) == expected
|
|
|
|
|
|
def test_sqlite_odbc_hashed_dictionary(started_cluster):
    """Check reload behaviour of the ``sqlite3_odbc_hashed`` dictionary over sqlite table t2.

    Steps:
      1. insert key 1 and force a reload; key 1 is visible, key 200 yields the default;
      2. wait until ``last_successful_update_time`` changes once more;
      3. insert key 200 and poll for a bounded time; the dictionary must still
         return the default for it;
      4. REPLACE key 1; afterwards both the new value of key 1 and key 200 are visible.
    """
    sqlite_db = node1.odbc_drivers["SQLite3"]["Database"]

    def run_sqlite(statement):
        # Pipe a single SQL statement into the sqlite3 CLI inside the container.
        node1.exec_in_container(
            ["bash", "-c", "echo '{}' | sqlite3 {}".format(statement, sqlite_db)],
            privileged=True, user='root')

    def last_update_time():
        return node1.query(
            "SELECT last_successful_update_time FROM system.dictionaries WHERE name = 'sqlite3_odbc_hashed'")

    run_sqlite('INSERT INTO t2 values(1, 2, 3);')

    node1.query("SYSTEM RELOAD DICTIONARY sqlite3_odbc_hashed")
    first_update_time = last_update_time()
    print("First update time", first_update_time)

    assert_eq_with_retry(node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(1))", "3")
    assert_eq_with_retry(node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(200))", "1")  # default

    second_update_time = last_update_time()
    # Reloaded with new data
    print("Second update time", second_update_time)
    # NOTE(review): this wait is unbounded; it relies on the dictionary being
    # refreshed periodically (configured outside this file).
    while first_update_time == second_update_time:
        second_update_time = last_update_time()
        print("Waiting dictionary to update for the second time")
        time.sleep(0.1)

    run_sqlite('INSERT INTO t2 values(200, 2, 7);')

    # No reload because of invalidate query
    third_update_time = last_update_time()
    # Fixed: this used to print second_update_time under the "Third" label.
    print("Third update time", third_update_time)
    # Bounded poll: at most 52 queries, 0.1 s apart, same limit as before.
    for _ in range(52):
        if third_update_time != second_update_time:
            break
        third_update_time = last_update_time()
        time.sleep(0.1)

    assert_eq_with_retry(node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(1))", "3")
    assert_eq_with_retry(node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(200))", "1")  # still default

    run_sqlite('REPLACE INTO t2 values(1, 2, 5);')

    assert_eq_with_retry(node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(1))", "5")
    assert_eq_with_retry(node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(200))", "7")
|
|
|
|
|
|
def test_sqlite_odbc_cached_dictionary(started_cluster):
    """Check the ``sqlite3_odbc_cached`` dictionary over sqlite table t3.

    Verifies a key inserted with the sqlite3 CLI, a key inserted through the
    odbc() table function (after making the database writable), and that a
    REPLACE of an existing key eventually becomes visible.
    """
    sqlite_db = node1.odbc_drivers["SQLite3"]["Database"]

    def as_root(shell_command):
        # Run a shell command inside the node1 container as root.
        node1.exec_in_container(["bash", "-c", shell_command], privileged=True, user='root')

    as_root("echo 'INSERT INTO t3 values(1, 2, 3);' | sqlite3 {}".format(sqlite_db))

    assert node1.query("select dictGetUInt8('sqlite3_odbc_cached', 'Z', toUInt64(1))") == "3\n"

    # Allow insert
    as_root("chmod a+rw /tmp")
    as_root("chmod a+rw {}".format(sqlite_db))

    insert_sql = "insert into table function odbc('DSN={};ReadOnly=0', '', 't3') values (200, 2, 7)".format(
        node1.odbc_drivers["SQLite3"]["DSN"])
    node1.query(insert_sql)

    assert node1.query("select dictGetUInt8('sqlite3_odbc_cached', 'Z', toUInt64(200))") == "7\n"  # new value

    as_root("echo 'REPLACE INTO t3 values(1, 2, 12);' | sqlite3 {}".format(sqlite_db))

    assert_eq_with_retry(node1, "select dictGetUInt8('sqlite3_odbc_cached', 'Z', toUInt64(1))", "12")
|
|
|
|
|
|
def test_postgres_odbc_hashed_dictionary_with_schema(started_cluster):
    """Reload ``postgres_odbc_hashed`` and read the two rows placed in clickhouse.test_table."""
    pg_conn = get_postgres_conn()
    pg_cursor = pg_conn.cursor()
    for statement in (
            "truncate table clickhouse.test_table",
            "insert into clickhouse.test_table values(1, 'hello'),(2, 'world')",
    ):
        pg_cursor.execute(statement)

    node1.query("SYSTEM RELOAD DICTIONARY postgres_odbc_hashed")

    assert_eq_with_retry(node1, "select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(1))", "hello")
    assert_eq_with_retry(node1, "select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(2))", "world")
|
|
|
|
|
|
def test_postgres_odbc_hashed_dictionary_no_tty_pipe_overflow(started_cluster):
    """Reload ``postgres_odbc_hashed`` 100 times in a row, each within a 5 s timeout.

    Any exception from a reload (including a timeout) fails the test with a
    message pointing at a hanging odbc-bridge; afterwards the dictionary must
    still serve the freshly inserted row.
    """
    pg_conn = get_postgres_conn()
    pg_cursor = pg_conn.cursor()
    pg_cursor.execute("truncate table clickhouse.test_table")
    pg_cursor.execute("insert into clickhouse.test_table values(3, 'xxx')")

    for _ in range(100):
        try:
            node1.query("system reload dictionary postgres_odbc_hashed", timeout=5)
        except Exception as ex:
            assert False, "Exception occured -- odbc-bridge hangs: " + str(ex)

    assert_eq_with_retry(node1, "select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(3))", "xxx")
|
|
|
|
|
|
def test_postgres_insert(started_cluster):
    """Insert into PostgreSQL through the ODBC engine and the odbc() table function.

    Rows 1-2 go through the engine table, row 3 through the table function with
    FORMAT CSV, and rows 4..10 through INSERT ... SELECT, giving
    sum(column1) = 55 over 10 rows.
    """
    pg_conn = get_postgres_conn()
    pg_conn.cursor().execute("truncate table clickhouse.test_table")

    # Also test with Servername containing '.' and '-' symbols (defined in
    # postgres .yml file). This is needed to check parsing, validation and
    # reconstruction of connection string.

    create_sql = "create table pg_insert (column1 UInt8, column2 String) engine=ODBC('DSN=postgresql_odbc;Servername=postgre-sql.local', 'clickhouse', 'test_table')"
    node1.query(create_sql)
    node1.query("insert into pg_insert values (1, 'hello'), (2, 'world')")
    assert node1.query("select * from pg_insert") == '1\thello\n2\tworld\n'

    node1.query("insert into table function odbc('DSN=postgresql_odbc;', 'clickhouse', 'test_table') format CSV 3,test")
    insert_select_sql = "insert into table function odbc('DSN=postgresql_odbc;Servername=postgre-sql.local', 'clickhouse', 'test_table') select number, 's' || toString(number) from numbers (4, 7)"
    node1.query(insert_select_sql)

    expected_totals = "55\t10\n"
    assert node1.query("select sum(column1), count(column1) from pg_insert") == expected_totals
    tuple_sql = "select sum(n), count(n) from (select (*,).1 as n from (select * from odbc('DSN=postgresql_odbc;', 'clickhouse', 'test_table')))"
    assert node1.query(tuple_sql) == expected_totals
|
|
|
|
|
|
def test_bridge_dies_with_parent(started_cluster):
    """Check that clickhouse-odbc-bridge exits once the ClickHouse server is killed.

    A dictionary lookup is issued first, after which the test requires both an
    odbc-bridge and a server process to exist. The server is killed repeatedly
    until its pid disappears; the bridge then gets up to 30 seconds to exit. If
    it is still alive, its thread backtraces are printed via gdb before the
    test fails. The server is restarted at the end for the remaining tests.
    """
    if node1.is_built_with_address_sanitizer():
        # TODO: Leak sanitizer falsely reports about a leak of 16 bytes in clickhouse-odbc-bridge in this test and
        # that's linked somehow with that we have replaced getauxval() in glibc-compatibility.
        # The leak sanitizer calls getauxval() for its own purposes, and our replaced version doesn't seem to be equivalent in that case.
        return

    node1.query("select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(1))")

    clickhouse_pid = node1.get_process_pid("clickhouse server")
    bridge_pid = node1.get_process_pid("odbc-bridge")
    assert clickhouse_pid is not None
    assert bridge_pid is not None

    while clickhouse_pid is not None:
        try:
            node1.exec_in_container(["bash", "-c", "kill {}".format(clickhouse_pid)], privileged=True, user='root')
        except Exception:
            # Best effort: the process may already be gone. Narrowed from a bare
            # `except:` so KeyboardInterrupt/SystemExit are no longer swallowed.
            pass
        clickhouse_pid = node1.get_process_pid("clickhouse server")
        time.sleep(1)

    for _ in range(30):
        time.sleep(1)  # just for sure, that odbc-bridge caught signal
        bridge_pid = node1.get_process_pid("odbc-bridge")
        if bridge_pid is None:
            break

    if bridge_pid:
        out = node1.exec_in_container(["gdb", "-p", str(bridge_pid), "--ex", "thread apply all bt", "--ex", "q"],
                                      privileged=True, user='root')
        print("Bridge is running, gdb output:")
        print(out)

    assert clickhouse_pid is None
    assert bridge_pid is None
    node1.start_clickhouse(20)
|
|
|
|
|
|
def test_odbc_postgres_date_data_type(started_cluster):
    """Read a PostgreSQL ``date`` column as ClickHouse ``Date`` through the ODBC engine.

    Both the PostgreSQL table and the ClickHouse table are dropped at the end,
    but only when the assertion passes.
    """
    pg_conn = get_postgres_conn()
    pg_cursor = pg_conn.cursor()
    pg_cursor.execute("CREATE TABLE IF NOT EXISTS clickhouse.test_date (column1 integer, column2 date)")

    for row in ("(1, '2020-12-01')", "(2, '2020-12-02')", "(3, '2020-12-03')"):
        pg_cursor.execute("INSERT INTO clickhouse.test_date VALUES " + row)
    pg_conn.commit()

    node1.query(
        '''
        CREATE TABLE test_date (column1 UInt64, column2 Date)
        ENGINE=ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_date')''')

    expected = '1\t2020-12-01\n2\t2020-12-02\n3\t2020-12-03\n'
    result = node1.query('SELECT * FROM test_date')
    assert result == expected

    pg_cursor.execute("DROP TABLE IF EXISTS clickhouse.test_date")
    node1.query("DROP TABLE IF EXISTS test_date")
|
|
|
|
|
|
def test_odbc_postgres_conversions(started_cluster):
    """Round-trip values between ClickHouse and PostgreSQL types over ODBC.

    Part one writes boundary integer values, floats and a timestamp through
    the odbc() table function and reads them back. Part two recreates the
    PostgreSQL table with Timestamp/Numeric columns and checks that a
    DateTime64/Decimal row inserted through an ODBC-engine table reads back
    equal to the same expressions evaluated locally.
    """
    pg_conn = get_postgres_conn()
    pg_cursor = pg_conn.cursor()

    # Part one: integer, float and timestamp columns through the table function.
    pg_cursor.execute(
        '''CREATE TABLE IF NOT EXISTS clickhouse.test_types (
        a smallint, b integer, c bigint, d real, e double precision, f serial, g bigserial,
        h timestamp)''')

    node1.query('''
        INSERT INTO TABLE FUNCTION
        odbc('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_types')
        VALUES (-32768, -2147483648, -9223372036854775808, 1.12345, 1.1234567890, 2147483647, 9223372036854775807, '2000-05-12 12:12:12')''')

    first_result = node1.query('''
        SELECT a, b, c, d, e, f, g, h
        FROM odbc('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_types')
        ''')

    assert first_result == '-32768\t-2147483648\t-9223372036854775808\t1.12345\t1.123456789\t2147483647\t9223372036854775807\t2000-05-12 12:12:12\n'
    pg_cursor.execute("DROP TABLE IF EXISTS clickhouse.test_types")

    # Part two: Timestamp/Numeric columns through an ODBC-engine table.
    pg_cursor.execute("""CREATE TABLE IF NOT EXISTS clickhouse.test_types (column1 Timestamp, column2 Numeric)""")

    node1.query(
        '''
        CREATE TABLE test_types (column1 DateTime64, column2 Decimal(5, 1))
        ENGINE=ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_types')''')

    node1.query(
        """INSERT INTO test_types
        SELECT toDateTime64('2019-01-01 00:00:00', 3, 'Europe/Moscow'), toDecimal32(1.1, 1)""")

    expected = node1.query("SELECT toDateTime64('2019-01-01 00:00:00', 3, 'Europe/Moscow'), toDecimal32(1.1, 1)")
    second_result = node1.query("SELECT * FROM test_types")
    print(second_result)
    # The PostgreSQL table is dropped before asserting, so it is removed even on failure.
    pg_cursor.execute("DROP TABLE IF EXISTS clickhouse.test_types")
    assert second_result == expected
|
|
|
|
|
|
def test_odbc_cyrillic_with_varchar(started_cluster):
    """Read an 11-character Cyrillic value from a ``varchar(11)`` column over ODBC.

    The same two rows are read through an ODBC-engine table (ordered by name)
    and through the odbc() table function.
    """
    pg_conn = get_postgres_conn()
    pg_cursor = pg_conn.cursor()

    pg_cursor.execute("DROP TABLE IF EXISTS clickhouse.test_cyrillic")
    pg_cursor.execute("CREATE TABLE clickhouse.test_cyrillic (name varchar(11))")

    node1.query('''
        CREATE TABLE test_cyrillic (name String)
        ENGINE = ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_cyrillic')''')

    pg_cursor.execute("INSERT INTO clickhouse.test_cyrillic VALUES ('A-nice-word')")
    pg_cursor.execute("INSERT INTO clickhouse.test_cyrillic VALUES ('Красивенько')")

    expected_rows = 'A-nice-word\nКрасивенько\n'

    via_engine = node1.query(''' SELECT * FROM test_cyrillic ORDER BY name''')
    assert via_engine == expected_rows

    # NOTE(review): this query has no ORDER BY, so it relies on PostgreSQL
    # returning the rows in insertion order.
    via_function = node1.query(''' SELECT name FROM odbc('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_cyrillic') ''')
    assert via_function == expected_rows
|
|
|
|
|
|
def test_many_connections(started_cluster):
    """Query one ODBC-engine table 25 times inside a single UNION ALL query.

    The table holds 10 rows, so the 25 sub-selects must yield 250 rows in total.
    """
    pg_conn = get_postgres_conn()
    pg_cursor = pg_conn.cursor()

    pg_cursor.execute('DROP TABLE IF EXISTS clickhouse.test_pg_table')
    pg_cursor.execute('CREATE TABLE clickhouse.test_pg_table (key integer, value integer)')

    node1.query('''
        DROP TABLE IF EXISTS test_pg_table;
        CREATE TABLE test_pg_table (key UInt32, value UInt32)
        ENGINE = ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_pg_table')''')

    node1.query("INSERT INTO test_pg_table SELECT number, number FROM numbers(10)")

    # 24 sub-selects each followed by UNION ALL, plus a final one: 25 in total.
    sub_selects = ["SELECT key FROM {t}"] * 25
    query = "SELECT count() FROM (" + " UNION ALL ".join(sub_selects) + ")"

    assert node1.query(query.format(t='test_pg_table')) == '250\n'
|
|
|
|
|
|
def test_concurrent_queries(started_cluster):
    """Run concurrent inserts and selects against one ODBC-engine table.

    Phase one: 5 workers x 5 inserts x 1000 rows. Phase two: the same inserts
    interleaved with ``SELECT ... LIMIT 100``. After each phase the total row
    count must match exactly. Both the ClickHouse and the PostgreSQL table are
    dropped at the end (only reached when the assertions pass).
    """
    conn = get_postgres_conn()
    cursor = conn.cursor()

    node1.query('''
        DROP TABLE IF EXISTS test_pg_table;
        CREATE TABLE test_pg_table (key UInt32, value UInt32)
        ENGINE = ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_pg_table')''')

    cursor.execute('DROP TABLE IF EXISTS clickhouse.test_pg_table')
    cursor.execute('CREATE TABLE clickhouse.test_pg_table (key integer, value integer)')

    def run_in_pool(worker):
        # Run `worker` 5 times on a 5-thread pool and wait for completion.
        # The pool is closed and joined afterwards; it used to be leaked.
        # As before, wait() does not re-raise worker exceptions; lost inserts
        # are caught by the row-count assertions below.
        busy_pool = Pool(5)
        try:
            busy_pool.map_async(worker, range(5)).wait()
        finally:
            busy_pool.close()
            busy_pool.join()

    def node_insert(_):
        for _attempt in range(5):
            node1.query("INSERT INTO test_pg_table SELECT number, number FROM numbers(1000)", user='default')

    run_in_pool(node_insert)
    result = node1.query("SELECT count() FROM test_pg_table", user='default')
    print(result)
    assert int(result) == 5 * 5 * 1000

    def node_insert_select(_):
        for _attempt in range(5):
            node1.query("INSERT INTO test_pg_table SELECT number, number FROM numbers(1000)", user='default')
            node1.query("SELECT * FROM test_pg_table LIMIT 100", user='default')

    run_in_pool(node_insert_select)
    result = node1.query("SELECT count() FROM test_pg_table", user='default')
    print(result)
    assert int(result) == 5 * 5 * 1000 * 2

    node1.query('DROP TABLE test_pg_table;')
    cursor.execute('DROP TABLE clickhouse.test_pg_table;')
|