ClickHouse/tests/integration/test_odbc_interaction/test.py
Rodolphe Dugé de Bernonville c8bca3135d fix odbc and nullable fields
2024-06-27 09:55:24 +02:00

1022 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import psycopg2
import pymysql.cursors
import pytest
import logging
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from multiprocessing.dummy import Pool
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
with_odbc_drivers=True,
with_mysql8=True,
with_postgres=True,
main_configs=["configs/openssl.xml", "configs/odbc_logging.xml"],
dictionaries=[
"configs/dictionaries/sqlite3_odbc_hashed_dictionary.xml",
"configs/dictionaries/sqlite3_odbc_cached_dictionary.xml",
"configs/dictionaries/postgres_odbc_hashed_dictionary.xml",
"configs/dictionaries/postgres_odbc_no_connection_pool_dictionary.xml",
],
)
drop_table_sql_template = """
DROP TABLE IF EXISTS `clickhouse`.`{}`
"""
create_table_sql_template = """
CREATE TABLE `clickhouse`.`{}` (
`id` int(11) NOT NULL,
`name` varchar(50) NOT NULL,
`age` int NOT NULL default 0,
`money` int NOT NULL default 0,
`column_x` int default NULL,
PRIMARY KEY (`id`)) ENGINE=InnoDB;
"""
create_table_sql_nullable_template = """
CREATE TABLE `clickhouse`.`{}` (
`id` integer not null,
`col1` integer,
`col2` decimal(15,10),
`col3` varchar(32),
`col4` datetime
)
"""
def skip_test_msan(instance):
if instance.is_built_with_memory_sanitizer():
pytest.skip("Memory Sanitizer cannot work with third-party shared libraries")
def get_mysql_conn():
errors = []
conn = None
for _ in range(15):
try:
if conn is None:
conn = pymysql.connect(
user="root",
password="clickhouse",
host=cluster.mysql8_ip,
port=cluster.mysql8_port,
)
else:
conn.ping(reconnect=True)
logging.debug(
f"MySQL Connection establised: {cluster.mysql8_ip}:{cluster.mysql8_port}"
)
return conn
except Exception as e:
errors += [str(e)]
time.sleep(1)
raise Exception("Connection not establised, {}".format(errors))
def create_mysql_db(conn, name):
with conn.cursor() as cursor:
cursor.execute("DROP DATABASE IF EXISTS {}".format(name))
cursor.execute("CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))
def create_mysql_nullable_table(conn, table_name):
with conn.cursor() as cursor:
cursor.execute(create_table_sql_nullable_template.format(table_name))
def create_mysql_table(conn, table_name):
with conn.cursor() as cursor:
cursor.execute(create_table_sql_template.format(table_name))
def drop_mysql_table(conn, table_name):
with conn.cursor() as cursor:
cursor.execute(drop_table_sql_template.format(table_name))
def get_postgres_conn(started_cluster):
conn_string = "host={} port={} user='postgres' password='mysecretpassword'".format(
started_cluster.postgres_ip, started_cluster.postgres_port
)
errors = []
for _ in range(15):
try:
conn = psycopg2.connect(conn_string)
logging.debug("Postgre Connection establised: {}".format(conn_string))
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = True
return conn
except Exception as e:
errors += [str(e)]
time.sleep(1)
raise Exception(
"Postgre connection not establised DSN={}, {}".format(conn_string, errors)
)
def create_postgres_db(conn, name):
cursor = conn.cursor()
cursor.execute("CREATE SCHEMA {}".format(name))
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
sqlite_db = node1.odbc_drivers["SQLite3"]["Database"]
logging.debug(f"sqlite data received: {sqlite_db}")
node1.exec_in_container(
[
"sqlite3",
sqlite_db,
"CREATE TABLE t1(id INTEGER PRIMARY KEY ASC, x INTEGER, y, z);",
],
privileged=True,
user="root",
)
node1.exec_in_container(
[
"sqlite3",
sqlite_db,
"CREATE TABLE t2(id INTEGER PRIMARY KEY ASC, X INTEGER, Y, Z);",
],
privileged=True,
user="root",
)
node1.exec_in_container(
[
"sqlite3",
sqlite_db,
"CREATE TABLE t3(id INTEGER PRIMARY KEY ASC, X INTEGER, Y, Z);",
],
privileged=True,
user="root",
)
node1.exec_in_container(
[
"sqlite3",
sqlite_db,
"CREATE TABLE t4(id INTEGER PRIMARY KEY ASC, X INTEGER, Y, Z);",
],
privileged=True,
user="root",
)
node1.exec_in_container(
[
"sqlite3",
sqlite_db,
"CREATE TABLE tf1(id INTEGER PRIMARY KEY ASC, x INTEGER, y, z);",
],
privileged=True,
user="root",
)
logging.debug("sqlite tables created")
mysql_conn = get_mysql_conn()
logging.debug("mysql connection received")
## create mysql db and table
create_mysql_db(mysql_conn, "clickhouse")
logging.debug("mysql database created")
postgres_conn = get_postgres_conn(cluster)
logging.debug("postgres connection received")
create_postgres_db(postgres_conn, "clickhouse")
logging.debug("postgres db created")
cursor = postgres_conn.cursor()
cursor.execute(
"create table if not exists clickhouse.test_table (id int primary key, column1 int not null, column2 varchar(40) not null)"
)
yield cluster
except Exception as ex:
logging.exception(ex)
raise ex
finally:
cluster.shutdown()
def test_mysql_odbc_select_nullable(started_cluster):
skip_test_msan(node1)
mysql_setup = node1.odbc_drivers["MySQL"]
table_name = "test_insert_nullable_select"
conn = get_mysql_conn()
create_mysql_nullable_table(conn, table_name)
with conn.cursor() as cursor:
cursor.execute(
"INSERT INTO clickhouse.{} VALUES(1, 1, 1.23456, 'data1', '2010-01-01 00:00:00');".format(
table_name
)
)
cursor.execute(
"INSERT INTO clickhouse.{} VALUES(2, NULL, NULL, NULL, NULL);".format(
table_name
)
)
conn.commit()
node1.query(
"""
CREATE TABLE {}(id UInt32, col1 Nullable(UInt32), col2 Nullable(Decimal(15, 10)), col3 Nullable(String), col4 Nullable(DateTime)) ENGINE = ODBC('DSN={}', 'clickhouse', '{}');
""".format(
table_name, mysql_setup["DSN"], table_name
)
)
assert (
node1.query(
"SELECT id, col1, col2, col3, col4 from {} order by id asc".format(
table_name
)
)
== "1\t1\t1.23456\tdata1\t2010-01-01 00:00:00\n2\t\\N\t\\N\t\\N\t\\N\n"
)
drop_mysql_table(conn, table_name)
conn.close()
def test_mysql_simple_select_works(started_cluster):
skip_test_msan(node1)
mysql_setup = node1.odbc_drivers["MySQL"]
table_name = "test_insert_select"
conn = get_mysql_conn()
create_mysql_table(conn, table_name)
# Check that NULL-values are handled correctly by the ODBC-bridge
with conn.cursor() as cursor:
cursor.execute(
"INSERT INTO clickhouse.{} VALUES(50, 'null-guy', 127, 255, NULL), (100, 'non-null-guy', 127, 255, 511);".format(
table_name
)
)
conn.commit()
assert (
node1.query(
"SELECT column_x FROM odbc('DSN={}', '{}')".format(
mysql_setup["DSN"], table_name
),
settings={"external_table_functions_use_nulls": "1"},
)
== "\\N\n511\n"
)
assert (
node1.query(
"SELECT column_x FROM odbc('DSN={}', '{}')".format(
mysql_setup["DSN"], table_name
),
settings={"external_table_functions_use_nulls": "0"},
)
== "0\n511\n"
)
node1.query(
"""
CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32, column_x Nullable(UInt32)) ENGINE = MySQL('mysql80:3306', 'clickhouse', '{}', 'root', 'clickhouse');
""".format(
table_name, table_name
)
)
node1.query(
"INSERT INTO {}(id, name, money, column_x) select number, concat('name_', toString(number)), 3, NULL from numbers(49) ".format(
table_name
)
)
node1.query(
"INSERT INTO {}(id, name, money, column_x) select number, concat('name_', toString(number)), 3, 42 from numbers(51, 49) ".format(
table_name
)
)
assert (
node1.query(
"SELECT COUNT () FROM {} WHERE column_x IS NOT NULL".format(table_name)
)
== "50\n"
)
assert (
node1.query("SELECT COUNT () FROM {} WHERE column_x IS NULL".format(table_name))
== "50\n"
)
assert (
node1.query(
"SELECT count(*) FROM odbc('DSN={}', '{}')".format(
mysql_setup["DSN"], table_name
)
)
== "100\n"
)
# previously this test fails with segfault
# just to be sure :)
assert node1.query("select 1") == "1\n"
node1.query(f"DROP TABLE {table_name}")
drop_mysql_table(conn, table_name)
conn.close()
def test_mysql_insert(started_cluster):
skip_test_msan(node1)
mysql_setup = node1.odbc_drivers["MySQL"]
table_name = "test_insert"
conn = get_mysql_conn()
create_mysql_table(conn, table_name)
odbc_args = "'DSN={}', '{}', '{}'".format(
mysql_setup["DSN"], mysql_setup["Database"], table_name
)
node1.query(
"create table mysql_insert (id Int64, name String, age UInt8, money Float, column_x Nullable(Int16)) Engine=ODBC({})".format(
odbc_args
)
)
node1.query(
"insert into mysql_insert values (1, 'test', 11, 111, 1111), (2, 'odbc', 22, 222, NULL)"
)
assert (
node1.query("select * from mysql_insert")
== "1\ttest\t11\t111\t1111\n2\todbc\t22\t222\t\\N\n"
)
node1.query(
"insert into table function odbc({}) values (3, 'insert', 33, 333, 3333)".format(
odbc_args
)
)
node1.query(
"insert into table function odbc({}) (id, name, age, money) select id*4, upper(name), age*4, money*4 from odbc({}) where id=1".format(
odbc_args, odbc_args
)
)
assert (
node1.query("select * from mysql_insert where id in (3, 4)")
== "3\tinsert\t33\t333\t3333\n4\tTEST\t44\t444\t\\N\n"
)
node1.query("DROP TABLE mysql_insert")
drop_mysql_table(conn, table_name)
def test_sqlite_simple_select_function_works(started_cluster):
skip_test_msan(node1)
sqlite_setup = node1.odbc_drivers["SQLite3"]
sqlite_db = sqlite_setup["Database"]
node1.exec_in_container(
["sqlite3", sqlite_db, "INSERT INTO t1 values(1, 1, 2, 3);"],
privileged=True,
user="root",
)
assert (
node1.query(
"select * from odbc('DSN={}', '{}')".format(sqlite_setup["DSN"], "t1")
)
== "1\t1\t2\t3\n"
)
assert (
node1.query(
"select y from odbc('DSN={}', '{}')".format(sqlite_setup["DSN"], "t1")
)
== "2\n"
)
assert (
node1.query(
"select z from odbc('DSN={}', '{}')".format(sqlite_setup["DSN"], "t1")
)
== "3\n"
)
assert (
node1.query(
"select x from odbc('DSN={}', '{}')".format(sqlite_setup["DSN"], "t1")
)
== "1\n"
)
assert (
node1.query(
"select x, y from odbc('DSN={}', '{}')".format(sqlite_setup["DSN"], "t1")
)
== "1\t2\n"
)
assert (
node1.query(
"select z, x, y from odbc('DSN={}', '{}')".format(sqlite_setup["DSN"], "t1")
)
== "3\t1\t2\n"
)
assert (
node1.query(
"select count(), sum(x) from odbc('DSN={}', '{}') group by x".format(
sqlite_setup["DSN"], "t1"
)
)
== "1\t1\n"
)
node1.exec_in_container(
["sqlite3", sqlite_db, "DELETE FROM t1;"],
privileged=True,
user="root",
)
def test_sqlite_table_function(started_cluster):
skip_test_msan(node1)
sqlite_setup = node1.odbc_drivers["SQLite3"]
sqlite_db = sqlite_setup["Database"]
node1.exec_in_container(
["sqlite3", sqlite_db, "INSERT INTO tf1 values(1, 1, 2, 3);"],
privileged=True,
user="root",
)
node1.query(
"create table odbc_tf as odbc('DSN={}', '{}')".format(
sqlite_setup["DSN"], "tf1"
)
)
assert node1.query("select * from odbc_tf") == "1\t1\t2\t3\n"
assert node1.query("select y from odbc_tf") == "2\n"
assert node1.query("select z from odbc_tf") == "3\n"
assert node1.query("select x from odbc_tf") == "1\n"
assert node1.query("select x, y from odbc_tf") == "1\t2\n"
assert node1.query("select z, x, y from odbc_tf") == "3\t1\t2\n"
assert node1.query("select count(), sum(x) from odbc_tf group by x") == "1\t1\n"
node1.query("DROP TABLE odbc_tf")
node1.exec_in_container(
["sqlite3", sqlite_db, "DELETE FROM tf1;"],
privileged=True,
user="root",
)
def test_sqlite_simple_select_storage_works(started_cluster):
skip_test_msan(node1)
sqlite_setup = node1.odbc_drivers["SQLite3"]
sqlite_db = sqlite_setup["Database"]
node1.exec_in_container(
["sqlite3", sqlite_db, "INSERT INTO t4 values(1, 1, 2, 3);"],
privileged=True,
user="root",
)
node1.query(
"create table SqliteODBC (x Int32, y String, z String) engine = ODBC('DSN={}', '', 't4')".format(
sqlite_setup["DSN"]
)
)
assert node1.query("select * from SqliteODBC") == "1\t2\t3\n"
assert node1.query("select y from SqliteODBC") == "2\n"
assert node1.query("select z from SqliteODBC") == "3\n"
assert node1.query("select x from SqliteODBC") == "1\n"
assert node1.query("select x, y from SqliteODBC") == "1\t2\n"
assert node1.query("select z, x, y from SqliteODBC") == "3\t1\t2\n"
assert node1.query("select count(), sum(x) from SqliteODBC group by x") == "1\t1\n"
node1.query("DROP TABLE SqliteODBC")
node1.exec_in_container(
["sqlite3", sqlite_db, "DELETE FROM t4;"],
privileged=True,
user="root",
)
def test_sqlite_odbc_hashed_dictionary(started_cluster):
skip_test_msan(node1)
sqlite_db = node1.odbc_drivers["SQLite3"]["Database"]
node1.exec_in_container(
["sqlite3", sqlite_db, "INSERT INTO t2 values(1, 1, 2, 3);"],
privileged=True,
user="root",
)
node1.query("SYSTEM RELOAD DICTIONARY sqlite3_odbc_hashed")
first_update_time = node1.query(
"SELECT last_successful_update_time FROM system.dictionaries WHERE name = 'sqlite3_odbc_hashed'"
)
logging.debug(f"First update time {first_update_time}")
assert_eq_with_retry(
node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(1))", "3"
)
assert_eq_with_retry(
node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(200))", "1"
) # default
second_update_time = node1.query(
"SELECT last_successful_update_time FROM system.dictionaries WHERE name = 'sqlite3_odbc_hashed'"
)
# Reloaded with new data
logging.debug(f"Second update time {second_update_time}")
while first_update_time == second_update_time:
second_update_time = node1.query(
"SELECT last_successful_update_time FROM system.dictionaries WHERE name = 'sqlite3_odbc_hashed'"
)
logging.debug("Waiting dictionary to update for the second time")
time.sleep(0.1)
node1.exec_in_container(
["sqlite3", sqlite_db, "INSERT INTO t2 values(200, 200, 2, 7);"],
privileged=True,
user="root",
)
# No reload because of invalidate query
third_update_time = node1.query(
"SELECT last_successful_update_time FROM system.dictionaries WHERE name = 'sqlite3_odbc_hashed'"
)
logging.debug(f"Third update time {second_update_time}")
counter = 0
while third_update_time == second_update_time:
third_update_time = node1.query(
"SELECT last_successful_update_time FROM system.dictionaries WHERE name = 'sqlite3_odbc_hashed'"
)
time.sleep(0.1)
if counter > 50:
break
counter += 1
assert_eq_with_retry(
node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(1))", "3"
)
assert_eq_with_retry(
node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(200))", "1"
) # still default
node1.exec_in_container(
["sqlite3", sqlite_db, "REPLACE INTO t2 values(1, 1, 2, 5);"],
privileged=True,
user="root",
)
assert_eq_with_retry(
node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(1))", "5"
)
assert_eq_with_retry(
node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(200))", "7"
)
node1.exec_in_container(
["sqlite3", sqlite_db, "DELETE FROM t2;"],
privileged=True,
user="root",
)
def test_sqlite_odbc_cached_dictionary(started_cluster):
skip_test_msan(node1)
sqlite_db = node1.odbc_drivers["SQLite3"]["Database"]
node1.exec_in_container(
["sqlite3", sqlite_db, "INSERT INTO t3 values(1, 1, 2, 3);"],
privileged=True,
user="root",
)
assert (
node1.query("select dictGetUInt8('sqlite3_odbc_cached', 'Z', toUInt64(1))")
== "3\n"
)
# Allow insert
node1.exec_in_container(["chmod", "a+rw", "/tmp"], privileged=True, user="root")
node1.exec_in_container(["chmod", "a+rw", sqlite_db], privileged=True, user="root")
node1.query(
"insert into table function odbc('DSN={};ReadOnly=0', '', 't3') values (200, 200, 2, 7)".format(
node1.odbc_drivers["SQLite3"]["DSN"]
)
)
assert (
node1.query("select dictGetUInt8('sqlite3_odbc_cached', 'Z', toUInt64(200))")
== "7\n"
) # new value
node1.exec_in_container(
["sqlite3", sqlite_db, "REPLACE INTO t3 values(1, 1, 2, 12);"],
privileged=True,
user="root",
)
assert_eq_with_retry(
node1, "select dictGetUInt8('sqlite3_odbc_cached', 'Z', toUInt64(1))", "12"
)
node1.exec_in_container(
["sqlite3", sqlite_db, "DELETE FROM t3;"],
privileged=True,
user="root",
)
node1.query("SYSTEM RELOAD DICTIONARIES")
def test_postgres_odbc_hashed_dictionary_with_schema(started_cluster):
skip_test_msan(node1)
try:
conn = get_postgres_conn(started_cluster)
cursor = conn.cursor()
cursor.execute(
"insert into clickhouse.test_table values(1, 1, 'hello'),(2, 2, 'world')"
)
node1.query("SYSTEM RELOAD DICTIONARY postgres_odbc_hashed")
node1.exec_in_container(
["ss", "-K", "dport", "postgresql"], privileged=True, user="root"
)
node1.query("SYSTEM RELOAD DICTIONARY postgres_odbc_hashed")
assert_eq_with_retry(
node1,
"select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(1))",
"hello",
)
assert_eq_with_retry(
node1,
"select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(2))",
"world",
)
finally:
cursor.execute("truncate table clickhouse.test_table")
def test_postgres_odbc_hashed_dictionary_no_tty_pipe_overflow(started_cluster):
skip_test_msan(node1)
try:
conn = get_postgres_conn(started_cluster)
cursor = conn.cursor()
cursor.execute("insert into clickhouse.test_table values(3, 3, 'xxx')")
for i in range(100):
try:
node1.query("system reload dictionary postgres_odbc_hashed", timeout=15)
except Exception as ex:
assert False, "Exception occured -- odbc-bridge hangs: " + str(ex)
assert_eq_with_retry(
node1,
"select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(3))",
"xxx",
)
finally:
cursor.execute("truncate table clickhouse.test_table")
def test_no_connection_pooling(started_cluster):
skip_test_msan(node1)
try:
conn = get_postgres_conn(started_cluster)
cursor = conn.cursor()
cursor.execute(
"insert into clickhouse.test_table values(1, 1, 'hello'),(2, 2, 'world')"
)
node1.exec_in_container(
["ss", "-K", "dport", "5432"], privileged=True, user="root"
)
node1.query("SYSTEM RELOAD DICTIONARY postgres_odbc_nopool")
assert_eq_with_retry(
node1,
"select dictGetString('postgres_odbc_nopool', 'column2', toUInt64(1))",
"hello",
)
assert_eq_with_retry(
node1,
"select dictGetString('postgres_odbc_nopool', 'column2', toUInt64(2))",
"world",
)
# No open connections should be left because we don't use connection pooling.
assert "" == node1.exec_in_container(
["ss", "-H", "dport", "5432"], privileged=True, user="root"
)
finally:
cursor.execute("truncate table clickhouse.test_table")
def test_postgres_insert(started_cluster):
skip_test_msan(node1)
conn = get_postgres_conn(started_cluster)
# Also test with Servername containing '.' and '-' symbols (defined in
# postgres .yml file). This is needed to check parsing, validation and
# reconstruction of connection string.
try:
node1.query(
"create table pg_insert (id UInt64, column1 UInt8, column2 String) engine=ODBC('DSN=postgresql_odbc;Servername=postgre-sql.local', 'clickhouse', 'test_table')"
)
node1.query("insert into pg_insert values (1, 1, 'hello'), (2, 2, 'world')")
assert node1.query("select * from pg_insert") == "1\t1\thello\n2\t2\tworld\n"
node1.query(
"insert into table function odbc('DSN=postgresql_odbc', 'clickhouse', 'test_table') format CSV 3,3,test"
)
node1.query(
"insert into table function odbc('DSN=postgresql_odbc;Servername=postgre-sql.local', 'clickhouse', 'test_table')"
" select number, number, 's' || toString(number) from numbers (4, 7)"
)
assert (
node1.query("select sum(column1), count(column1) from pg_insert")
== "55\t10\n"
)
assert (
node1.query(
"select sum(n), count(n) from (select (*,).1 as n from (select * from odbc('DSN=postgresql_odbc', 'clickhouse', 'test_table')))"
)
== "55\t10\n"
)
finally:
node1.query("DROP TABLE IF EXISTS pg_insert")
conn.cursor().execute("truncate table clickhouse.test_table")
def test_odbc_postgres_date_data_type(started_cluster):
skip_test_msan(node1)
try:
conn = get_postgres_conn(started_cluster)
cursor = conn.cursor()
cursor.execute(
"CREATE TABLE clickhouse.test_date (id integer, column1 integer, column2 date)"
)
cursor.execute("INSERT INTO clickhouse.test_date VALUES (1, 1, '2020-12-01')")
cursor.execute("INSERT INTO clickhouse.test_date VALUES (2, 2, '2020-12-02')")
cursor.execute("INSERT INTO clickhouse.test_date VALUES (3, 3, '2020-12-03')")
conn.commit()
node1.query(
"""
CREATE TABLE test_date (id UInt64, column1 UInt64, column2 Date)
ENGINE=ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_date')"""
)
expected = "1\t1\t2020-12-01\n2\t2\t2020-12-02\n3\t3\t2020-12-03\n"
result = node1.query("SELECT * FROM test_date")
assert result == expected
finally:
cursor.execute("DROP TABLE clickhouse.test_date")
node1.query("DROP TABLE IF EXISTS test_date")
def test_odbc_postgres_conversions(started_cluster):
skip_test_msan(node1)
try:
conn = get_postgres_conn(started_cluster)
cursor = conn.cursor()
cursor.execute(
"""CREATE TABLE clickhouse.test_types (
a smallint, b integer, c bigint, d real, e double precision, f serial, g bigserial,
h timestamp)"""
)
node1.query(
"""
INSERT INTO TABLE FUNCTION
odbc('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_types')
VALUES (-32768, -2147483648, -9223372036854775808, 1.12345, 1.1234567890, 2147483647, 9223372036854775807, '2000-05-12 12:12:12')"""
)
result = node1.query(
"""
SELECT a, b, c, d, e, f, g, h
FROM odbc('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_types')
"""
)
assert (
result
== "-32768\t-2147483648\t-9223372036854775808\t1.12345\t1.123456789\t2147483647\t9223372036854775807\t2000-05-12 12:12:12\n"
)
cursor.execute("DROP TABLE IF EXISTS clickhouse.test_types")
cursor.execute(
"""CREATE TABLE clickhouse.test_types (column1 Timestamp, column2 Numeric)"""
)
node1.query(
"""
CREATE TABLE test_types (column1 DateTime64, column2 Decimal(5, 1))
ENGINE=ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_types')"""
)
node1.query(
"""INSERT INTO test_types
SELECT toDateTime64('2019-01-01 00:00:00', 3, 'Etc/UTC'), toDecimal32(1.1, 1)"""
)
expected = node1.query(
"SELECT toDateTime64('2019-01-01 00:00:00', 3, 'Etc/UTC'), toDecimal32(1.1, 1)"
)
result = node1.query("SELECT * FROM test_types")
assert result == expected
finally:
cursor.execute("DROP TABLE IF EXISTS clickhouse.test_types")
node1.query("DROP TABLE IF EXISTS test_types")
def test_odbc_cyrillic_with_varchar(started_cluster):
skip_test_msan(node1)
conn = get_postgres_conn(started_cluster)
cursor = conn.cursor()
cursor.execute("DROP TABLE IF EXISTS clickhouse.test_cyrillic")
cursor.execute("CREATE TABLE clickhouse.test_cyrillic (name varchar(11))")
node1.query(
"""
CREATE TABLE test_cyrillic (name String)
ENGINE = ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_cyrillic')"""
)
cursor.execute("INSERT INTO clickhouse.test_cyrillic VALUES ('A-nice-word')")
cursor.execute("INSERT INTO clickhouse.test_cyrillic VALUES ('Красивенько')")
result = node1.query(""" SELECT * FROM test_cyrillic ORDER BY name""")
assert result == "A-nice-word\nКрасивенько\n"
result = node1.query(
""" SELECT name FROM odbc('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_cyrillic') """
)
assert result == "A-nice-word\nКрасивенько\n"
node1.query("DROP TABLE test_cyrillic")
def test_many_connections(started_cluster):
skip_test_msan(node1)
conn = get_postgres_conn(started_cluster)
cursor = conn.cursor()
cursor.execute("CREATE TABLE clickhouse.test_pg_table (key integer, value integer)")
node1.query(
"""
DROP TABLE IF EXISTS test_pg_table;
CREATE TABLE test_pg_table (key UInt32, value UInt32)
ENGINE = ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_pg_table')"""
)
node1.query("INSERT INTO test_pg_table SELECT number, number FROM numbers(10)")
query = "SELECT count() FROM ("
for i in range(24):
query += "SELECT key FROM {t} UNION ALL "
query += "SELECT key FROM {t})"
assert node1.query(query.format(t="test_pg_table")) == "250\n"
cursor.execute("DROP TABLE clickhouse.test_pg_table")
def test_concurrent_queries(started_cluster):
skip_test_msan(node1)
conn = get_postgres_conn(started_cluster)
cursor = conn.cursor()
node1.query(
"""
DROP TABLE IF EXISTS test_pg_table;
CREATE TABLE test_pg_table (key UInt32, value UInt32)
ENGINE = ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_pg_table')"""
)
cursor.execute("DROP TABLE IF EXISTS clickhouse.test_pg_table")
cursor.execute("CREATE TABLE clickhouse.test_pg_table (key integer, value integer)")
def node_insert(_):
for i in range(5):
node1.query(
"INSERT INTO test_pg_table SELECT number, number FROM numbers(1000)",
user="default",
)
busy_pool = Pool(5)
p = busy_pool.map_async(node_insert, range(5))
p.wait()
assert_eq_with_retry(
node1, "SELECT count() FROM test_pg_table", str(5 * 5 * 1000), retry_count=100
)
def node_insert_select(_):
for i in range(5):
result = node1.query(
"INSERT INTO test_pg_table SELECT number, number FROM numbers(1000)",
user="default",
)
result = node1.query(
"SELECT * FROM test_pg_table LIMIT 100", user="default"
)
busy_pool = Pool(5)
p = busy_pool.map_async(node_insert_select, range(5))
p.wait()
assert_eq_with_retry(
node1,
"SELECT count() FROM test_pg_table",
str(5 * 5 * 1000 * 2),
retry_count=100,
)
node1.query("DROP TABLE test_pg_table;")
cursor.execute("DROP TABLE clickhouse.test_pg_table;")
def test_odbc_long_column_names(started_cluster):
skip_test_msan(node1)
conn = get_postgres_conn(started_cluster)
cursor = conn.cursor()
column_name = "column" * 8
create_table = "CREATE TABLE clickhouse.test_long_column_names ("
for i in range(1000):
if i != 0:
create_table += ", "
create_table += "{} integer".format(column_name + str(i))
create_table += ")"
cursor.execute(create_table)
insert = (
"INSERT INTO clickhouse.test_long_column_names SELECT i"
+ ", i" * 999
+ " FROM generate_series(0, 99) as t(i)"
)
cursor.execute(insert)
conn.commit()
create_table = "CREATE TABLE test_long_column_names ("
for i in range(1000):
if i != 0:
create_table += ", "
create_table += "{} UInt32".format(column_name + str(i))
create_table += ") ENGINE=ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_long_column_names')"
result = node1.query(create_table)
result = node1.query("SELECT * FROM test_long_column_names")
expected = node1.query("SELECT number" + ", number" * 999 + " FROM numbers(100)")
assert result == expected
cursor.execute("DROP TABLE IF EXISTS clickhouse.test_long_column_names")
node1.query("DROP TABLE IF EXISTS test_long_column_names")
def test_odbc_long_text(started_cluster):
skip_test_msan(node1)
conn = get_postgres_conn(started_cluster)
cursor = conn.cursor()
cursor.execute("create table clickhouse.test_long_text(flen int, field1 text)")
# sample test from issue 9363
text_from_issue = """BEGIN These examples only show the order that data is arranged in. The values from different columns are stored separately, and data from the same column is stored together. Examples of a column-oriented DBMS: Vertica, Paraccel (Actian Matrix and Amazon Redshift), Sybase IQ, Exasol, Infobright, InfiniDB, MonetDB (VectorWise and Actian Vector), LucidDB, SAP HANA, Google Dremel, Google PowerDrill, Druid, and kdb+. Different orders for storing data are better suited to different scenarios. The data access scenario refers to what queries are made, how often, and in what proportion; how much data is read for each type of query rows, columns, and bytes; the relationship between reading and updating data; the working size of the data and how locally it is used; whether transactions are used, and how isolated they are; requirements for data replication and logical integrity; requirements for latency and throughput for each type of query, and so on. The higher the load on the system, the more important it is to customize the system set up to match the requirements of the usage scenario, and the more fine grained this customization becomes. There is no system that is equally well-suited to significantly different scenarios. If a system is adaptable to a wide set of scenarios, under a high load, the system will handle all the scenarios equally poorly, or will work well for just one or few of possible scenarios. Key Properties of OLAP Scenario¶ The vast majority of requests are for read access. Data is updated in fairly large batches (> 1000 rows), not by single rows; or it is not updated at all. Data is added to the DB but is not modified. For reads, quite a large number of rows are extracted from the DB, but only a small subset of columns. Tables are "wide," meaning they contain a large number of columns. Queries are relatively rare (usually hundreds of queries per server or less per second). For simple queries, latencies around 50 ms are allowed. Column values are fairly small: numbers and short strings (for example, 60 bytes per URL). Requires high throughput when processing a single query (up to billions of rows per second per server). Transactions are not necessary. Low requirements for data consistency. There is one large table per query. All tables are small, except for one. A query result is significantly smaller than the source data. In other words, data is filtered or aggregated, so the result fits in a single server"s RAM. It is easy to see that the OLAP scenario is very different from other popular scenarios (such as OLTP or Key-Value access). So it doesn"t make sense to try to use OLTP or a Key-Value DB for processing analytical queries if you want to get decent performance. For example, if you try to use MongoDB or Redis for analytics, you will get very poor performance compared to OLAP databases. Why Column-Oriented Databases Work Better in the OLAP Scenario¶ Column-oriented databases are better suited to OLAP scenarios: they are at least 100 times faster in processing most queries. The reasons are explained in detail below, but the fact is easier to demonstrate visually. END"""
cursor.execute(
"""insert into clickhouse.test_long_text (flen, field1) values (3248, '{}')""".format(
text_from_issue
)
)
node1.query(
"""
DROP TABLE IF EXISTS test_long_test;
CREATE TABLE test_long_text (flen UInt32, field1 String)
ENGINE = ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_long_text')"""
)
result = node1.query("select field1 from test_long_text;")
assert result.strip() == text_from_issue
long_text = "text" * 1000000
cursor.execute(
"""insert into clickhouse.test_long_text (flen, field1) values (400000, '{}')""".format(
long_text
)
)
result = node1.query("select field1 from test_long_text where flen=400000;")
assert result.strip() == long_text
node1.query("DROP TABLE test_long_text")
cursor.execute("drop table clickhouse.test_long_text")