Improve postgres tests. Fix flaky test_postgresql_database_engine

This commit is contained in:
Yatsishin Ilya 2022-06-29 12:54:45 +00:00
parent 50eb364a56
commit c86c6cc2d9
4 changed files with 138 additions and 176 deletions

View File

@ -4,7 +4,7 @@
<name>dict0</name> <name>dict0</name>
<source> <source>
<postgresql> <postgresql>
<db>clickhouse</db> <db>postgres_database</db>
<host>postgres1</host> <host>postgres1</host>
<port>5432</port> <port>5432</port>
<user>postgres</user> <user>postgres</user>
@ -38,7 +38,7 @@
<name>dict1</name> <name>dict1</name>
<source> <source>
<postgresql> <postgresql>
<db>clickhouse</db> <db>postgres_database</db>
<user>postgres</user> <user>postgres</user>
<password>mysecretpassword</password> <password>mysecretpassword</password>
<table>test1</table> <table>test1</table>

View File

@ -1,9 +1,11 @@
import pytest import pytest
import time import time
import logging
import psycopg2 import psycopg2
from multiprocessing.dummy import Pool from multiprocessing.dummy import Pool
from helpers.cluster import ClickHouseCluster from helpers.cluster import ClickHouseCluster
from helpers.postgres_utility import get_postgres_conn
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
cluster = ClickHouseCluster(__file__) cluster = ClickHouseCluster(__file__)
@ -18,63 +20,39 @@ node1 = cluster.add_instance(
with_postgres_cluster=True, with_postgres_cluster=True,
) )
postgres_dict_table_template = """
CREATE TABLE IF NOT EXISTS {} (
id Integer NOT NULL, key Integer NOT NULL, value Integer NOT NULL, PRIMARY KEY (id))
"""
click_dict_table_template = """
CREATE TABLE IF NOT EXISTS `test`.`dict_table_{}` (
`key` UInt32, `value` UInt32
) ENGINE = Dictionary({})
"""
def get_postgres_conn(ip, port, database=False):
if database == True:
conn_string = "host={} port={} dbname='clickhouse' user='postgres' password='mysecretpassword'".format(
ip, port
)
else:
conn_string = (
"host={} port={} user='postgres' password='mysecretpassword'".format(
ip, port
)
)
conn = psycopg2.connect(conn_string)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = True
return conn
def create_postgres_db(conn, name): def create_postgres_db(conn, name):
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("CREATE DATABASE {}".format(name)) cursor.execute(f"CREATE DATABASE {name}")
def create_postgres_table(cursor, table_name): def create_postgres_table(cursor, table_name):
cursor.execute(postgres_dict_table_template.format(table_name)) cursor.execute(
f"""
CREATE TABLE IF NOT EXISTS {table_name} (
id Integer NOT NULL, key Integer NOT NULL, value Integer NOT NULL, PRIMARY KEY (id))
"""
)
def create_and_fill_postgres_table(cursor, table_name, port, host): def create_and_fill_postgres_table(cursor, table_name, port, host):
create_postgres_table(cursor, table_name) create_postgres_table(cursor, table_name)
# Fill postgres table using clickhouse postgres table function and check # Fill postgres table using clickhouse postgres table function and check
table_func = """postgresql('{}:{}', 'clickhouse', '{}', 'postgres', 'mysecretpassword')""".format( table_func = f"""postgresql('{host}:{port}', 'postgres_database', '{table_name}', 'postgres', 'mysecretpassword')"""
host, port, table_name
)
node1.query( node1.query(
"""INSERT INTO TABLE FUNCTION {} SELECT number, number, number from numbers(10000) f"""INSERT INTO TABLE FUNCTION {table_func} SELECT number, number, number from numbers(10000)"""
""".format(
table_func, table_name
)
) )
result = node1.query("SELECT count() FROM {}".format(table_func)) result = node1.query(f"SELECT count() FROM {table_func}")
assert result.rstrip() == "10000" assert result.rstrip() == "10000"
def create_dict(table_name, index=0): def create_dict(table_name, index=0):
node1.query(click_dict_table_template.format(table_name, "dict" + str(index))) node1.query(
f"""
CREATE TABLE IF NOT EXISTS `test`.`dict_table_{table_name}` (
`key` UInt32, `value` UInt32
) ENGINE = Dictionary(dict{str(index)})
"""
)
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def started_cluster(): def started_cluster():
@ -85,14 +63,14 @@ def started_cluster():
postgres_conn = get_postgres_conn( postgres_conn = get_postgres_conn(
ip=cluster.postgres_ip, port=cluster.postgres_port ip=cluster.postgres_ip, port=cluster.postgres_port
) )
print("postgres1 connected") logging.debug("postgres1 connected")
create_postgres_db(postgres_conn, "clickhouse") create_postgres_db(postgres_conn, "postgres_database")
postgres_conn = get_postgres_conn( postgres2_conn = get_postgres_conn(
ip=cluster.postgres2_ip, port=cluster.postgres_port ip=cluster.postgres2_ip, port=cluster.postgres_port
) )
print("postgres2 connected") logging.debug("postgres2 connected")
create_postgres_db(postgres_conn, "clickhouse") create_postgres_db(postgres2_conn, "postgres_database")
yield cluster yield cluster
@ -117,27 +95,27 @@ def test_load_dictionaries(started_cluster):
create_dict(table_name) create_dict(table_name)
dict_name = "dict0" dict_name = "dict0"
node1.query("SYSTEM RELOAD DICTIONARY {}".format(dict_name)) node1.query(f"SYSTEM RELOAD DICTIONARY {dict_name}")
assert ( assert (
node1.query( node1.query(
"SELECT count() FROM `test`.`dict_table_{}`".format(table_name) f"SELECT count() FROM `test`.`dict_table_{table_name}`"
).rstrip() ).rstrip()
== "10000" == "10000"
) )
assert ( assert (
node1.query("SELECT dictGetUInt32('{}', 'key', toUInt64(0))".format(dict_name)) node1.query(f"SELECT dictGetUInt32('{dict_name}', 'key', toUInt64(0))")
== "0\n" == "0\n"
) )
assert ( assert (
node1.query( node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(9999))".format(dict_name) f"SELECT dictGetUInt32('{dict_name}', 'value', toUInt64(9999))"
) )
== "9999\n" == "9999\n"
) )
cursor.execute("DROP TABLE IF EXISTS {}".format(table_name)) cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
node1.query("DROP TABLE IF EXISTS {}".format(table_name)) node1.query(f"DROP TABLE IF EXISTS {table_name}")
node1.query("DROP DICTIONARY IF EXISTS {}".format(dict_name)) node1.query(f"DROP DICTIONARY IF EXISTS {dict_name}")
def test_postgres_dictionaries_custom_query_full_load(started_cluster): def test_postgres_dictionaries_custom_query_full_load(started_cluster):
@ -159,7 +137,7 @@ def test_postgres_dictionaries_custom_query_full_load(started_cluster):
query = node1.query query = node1.query
query( query(
""" f"""
CREATE DICTIONARY test_dictionary_custom_query CREATE DICTIONARY test_dictionary_custom_query
( (
id UInt64, id UInt64,
@ -169,16 +147,14 @@ def test_postgres_dictionaries_custom_query_full_load(started_cluster):
PRIMARY KEY id PRIMARY KEY id
LAYOUT(FLAT()) LAYOUT(FLAT())
SOURCE(PostgreSQL( SOURCE(PostgreSQL(
DB 'clickhouse' DB 'postgres_database'
HOST '{}' HOST '{started_cluster.postgres_ip}'
PORT {} PORT {started_cluster.postgres_port}
USER 'postgres' USER 'postgres'
PASSWORD 'mysecretpassword' PASSWORD 'mysecretpassword'
QUERY $doc$SELECT id, value_1, value_2 FROM test_table_1 INNER JOIN test_table_2 USING (id);$doc$)) QUERY $doc$SELECT id, value_1, value_2 FROM test_table_1 INNER JOIN test_table_2 USING (id);$doc$))
LIFETIME(0) LIFETIME(0)
""".format( """
started_cluster.postgres_ip, started_cluster.postgres_port
)
) )
result = query("SELECT id, value_1, value_2 FROM test_dictionary_custom_query") result = query("SELECT id, value_1, value_2 FROM test_dictionary_custom_query")
@ -210,7 +186,7 @@ def test_postgres_dictionaries_custom_query_partial_load_simple_key(started_clus
query = node1.query query = node1.query
query( query(
""" f"""
CREATE DICTIONARY test_dictionary_custom_query CREATE DICTIONARY test_dictionary_custom_query
( (
id UInt64, id UInt64,
@ -220,15 +196,13 @@ def test_postgres_dictionaries_custom_query_partial_load_simple_key(started_clus
PRIMARY KEY id PRIMARY KEY id
LAYOUT(DIRECT()) LAYOUT(DIRECT())
SOURCE(PostgreSQL( SOURCE(PostgreSQL(
DB 'clickhouse' DB 'postgres_database'
HOST '{}' HOST '{started_cluster.postgres_ip}'
PORT {} PORT {started_cluster.postgres_port}
USER 'postgres' USER 'postgres'
PASSWORD 'mysecretpassword' PASSWORD 'mysecretpassword'
QUERY $doc$SELECT id, value_1, value_2 FROM test_table_1 INNER JOIN test_table_2 USING (id) WHERE {{condition}};$doc$)) QUERY $doc$SELECT id, value_1, value_2 FROM test_table_1 INNER JOIN test_table_2 USING (id) WHERE {{condition}};$doc$))
""".format( """
started_cluster.postgres_ip, started_cluster.postgres_port
)
) )
result = query( result = query(
@ -262,7 +236,7 @@ def test_postgres_dictionaries_custom_query_partial_load_complex_key(started_clu
query = node1.query query = node1.query
query( query(
""" f"""
CREATE DICTIONARY test_dictionary_custom_query CREATE DICTIONARY test_dictionary_custom_query
( (
id UInt64, id UInt64,
@ -273,15 +247,13 @@ def test_postgres_dictionaries_custom_query_partial_load_complex_key(started_clu
PRIMARY KEY id, key PRIMARY KEY id, key
LAYOUT(COMPLEX_KEY_DIRECT()) LAYOUT(COMPLEX_KEY_DIRECT())
SOURCE(PostgreSQL( SOURCE(PostgreSQL(
DB 'clickhouse' DB 'postgres_database'
HOST '{}' HOST '{started_cluster.postgres_ip}'
PORT {} PORT {started_cluster.postgres_port}
USER 'postgres' USER 'postgres'
PASSWORD 'mysecretpassword' PASSWORD 'mysecretpassword'
QUERY $doc$SELECT id, key, value_1, value_2 FROM test_table_1 INNER JOIN test_table_2 USING (id, key) WHERE {{condition}};$doc$)) QUERY $doc$SELECT id, key, value_1, value_2 FROM test_table_1 INNER JOIN test_table_2 USING (id, key) WHERE {{condition}};$doc$))
""".format( """
started_cluster.postgres_ip, started_cluster.postgres_port
)
) )
result = query( result = query(
@ -314,70 +286,70 @@ def test_invalidate_query(started_cluster):
# invalidate query: SELECT value FROM test0 WHERE id = 0 # invalidate query: SELECT value FROM test0 WHERE id = 0
dict_name = "dict0" dict_name = "dict0"
create_dict(table_name) create_dict(table_name)
node1.query("SYSTEM RELOAD DICTIONARY {}".format(dict_name)) node1.query(f"SYSTEM RELOAD DICTIONARY {dict_name}")
assert ( assert (
node1.query( node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name) f"SELECT dictGetUInt32('{dict_name}', 'value', toUInt64(0))"
) )
== "0\n" == "0\n"
) )
assert ( assert (
node1.query( node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(1))".format(dict_name) f"SELECT dictGetUInt32('{dict_name}', 'value', toUInt64(1))"
) )
== "1\n" == "1\n"
) )
# update should happen # update should happen
cursor.execute("UPDATE {} SET value=value+1 WHERE id = 0".format(table_name)) cursor.execute(f"UPDATE {table_name} SET value=value+1 WHERE id = 0")
while True: while True:
result = node1.query( result = node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name) f"SELECT dictGetUInt32('{dict_name}', 'value', toUInt64(0))"
) )
if result != "0\n": if result != "0\n":
break break
assert ( assert (
node1.query( node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name) f"SELECT dictGetUInt32('{dict_name}', 'value', toUInt64(0))"
) )
== "1\n" == "1\n"
) )
# no update should happen # no update should happen
cursor.execute("UPDATE {} SET value=value*2 WHERE id != 0".format(table_name)) cursor.execute(f"UPDATE {table_name} SET value=value*2 WHERE id != 0")
time.sleep(5) time.sleep(5)
assert ( assert (
node1.query( node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name) f"SELECT dictGetUInt32('{dict_name}', 'value', toUInt64(0))"
) )
== "1\n" == "1\n"
) )
assert ( assert (
node1.query( node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(1))".format(dict_name) f"SELECT dictGetUInt32('{dict_name}', 'value', toUInt64(1))"
) )
== "1\n" == "1\n"
) )
# update should happen # update should happen
cursor.execute("UPDATE {} SET value=value+1 WHERE id = 0".format(table_name)) cursor.execute(f"UPDATE {table_name} SET value=value+1 WHERE id = 0")
time.sleep(5) time.sleep(5)
assert ( assert (
node1.query( node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name) f"SELECT dictGetUInt32('{dict_name}', 'value', toUInt64(0))"
) )
== "2\n" == "2\n"
) )
assert ( assert (
node1.query( node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(1))".format(dict_name) f"SELECT dictGetUInt32('{dict_name}', 'value', toUInt64(1))"
) )
== "2\n" == "2\n"
) )
node1.query("DROP TABLE IF EXISTS {}".format(table_name)) node1.query(f"DROP TABLE IF EXISTS {table_name}")
node1.query("DROP DICTIONARY IF EXISTS {}".format(dict_name)) node1.query(f"DROP DICTIONARY IF EXISTS {dict_name}")
cursor.execute("DROP TABLE IF EXISTS {}".format(table_name)) cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
def test_dictionary_with_replicas(started_cluster): def test_dictionary_with_replicas(started_cluster):
@ -446,7 +418,7 @@ def test_postgres_schema(started_cluster):
host 'postgres1' host 'postgres1'
user 'postgres' user 'postgres'
password 'mysecretpassword' password 'mysecretpassword'
db 'clickhouse' db 'postgres_database'
table 'test_schema.test_table')) table 'test_schema.test_table'))
LIFETIME(MIN 1 MAX 2) LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED()); LAYOUT(HASHED());
@ -566,7 +538,7 @@ def test_bad_configuration(started_cluster):
host 'postgres1' host 'postgres1'
user 'postgres' user 'postgres'
password 'mysecretpassword' password 'mysecretpassword'
dbbb 'clickhouse' dbbb 'postgres_database'
table 'test_schema.test_table')) table 'test_schema.test_table'))
LIFETIME(MIN 1 MAX 2) LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED()); LAYOUT(HASHED());

View File

@ -2,7 +2,7 @@ import pytest
from helpers.cluster import ClickHouseCluster from helpers.cluster import ClickHouseCluster
import redis import redis
cluster = ClickHouseCluster(__file__) cluster = ClickHouseCluster(__file__, name="long")
node = cluster.add_instance("node", with_redis=True) node = cluster.add_instance("node", with_redis=True)

View File

@ -3,6 +3,7 @@ import psycopg2
from helpers.cluster import ClickHouseCluster from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry from helpers.test_tools import assert_eq_with_retry
from helpers.postgres_utility import get_postgres_conn
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
cluster = ClickHouseCluster(__file__) cluster = ClickHouseCluster(__file__)
@ -20,17 +21,6 @@ postgres_drop_table_template = """
""" """
def get_postgres_conn(cluster, database=False):
if database == True:
conn_string = f"host={cluster.postgres_ip} port={cluster.postgres_port} dbname='test_database' user='postgres' password='mysecretpassword'"
else:
conn_string = f"host={cluster.postgres_ip} port={cluster.postgres_port} user='postgres' password='mysecretpassword'"
conn = psycopg2.connect(conn_string)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = True
return conn
def create_postgres_db(cursor, name): def create_postgres_db(cursor, name):
cursor.execute("CREATE DATABASE {}".format(name)) cursor.execute("CREATE DATABASE {}".format(name))
@ -49,9 +39,9 @@ def drop_postgres_table(cursor, table_name):
def started_cluster(): def started_cluster():
try: try:
cluster.start() cluster.start()
conn = get_postgres_conn(cluster) conn = get_postgres_conn(cluster.postgres_ip, cluster.postgres_port)
cursor = conn.cursor() cursor = conn.cursor()
create_postgres_db(cursor, "test_database") create_postgres_db(cursor, "postgres_database")
yield cluster yield cluster
finally: finally:
@ -60,93 +50,93 @@ def started_cluster():
def test_postgres_database_engine_with_postgres_ddl(started_cluster): def test_postgres_database_engine_with_postgres_ddl(started_cluster):
# connect to database as well # connect to database as well
conn = get_postgres_conn(started_cluster, True) conn = get_postgres_conn(started_cluster.postgres_ip, started_cluster.postgres_port, database=True)
cursor = conn.cursor() cursor = conn.cursor()
node1.query( node1.query(
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword')" "CREATE DATABASE postgres_database ENGINE = PostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')"
) )
assert "test_database" in node1.query("SHOW DATABASES") assert "postgres_database" in node1.query("SHOW DATABASES")
create_postgres_table(cursor, "test_table") create_postgres_table(cursor, "test_table")
assert "test_table" in node1.query("SHOW TABLES FROM test_database") assert "test_table" in node1.query("SHOW TABLES FROM postgres_database")
cursor.execute("ALTER TABLE test_table ADD COLUMN data Text") cursor.execute("ALTER TABLE test_table ADD COLUMN data Text")
assert "data" in node1.query( assert "data" in node1.query(
"SELECT name FROM system.columns WHERE table = 'test_table' AND database = 'test_database'" "SELECT name FROM system.columns WHERE table = 'test_table' AND database = 'postgres_database'"
) )
cursor.execute("ALTER TABLE test_table DROP COLUMN data") cursor.execute("ALTER TABLE test_table DROP COLUMN data")
assert "data" not in node1.query( assert "data" not in node1.query(
"SELECT name FROM system.columns WHERE table = 'test_table' AND database = 'test_database'" "SELECT name FROM system.columns WHERE table = 'test_table' AND database = 'postgres_database'"
) )
node1.query("DROP DATABASE test_database") node1.query("DROP DATABASE postgres_database")
assert "test_database" not in node1.query("SHOW DATABASES") assert "postgres_database" not in node1.query("SHOW DATABASES")
drop_postgres_table(cursor, "test_table") drop_postgres_table(cursor, "test_table")
def test_postgresql_database_engine_with_clickhouse_ddl(started_cluster): def test_postgresql_database_engine_with_clickhouse_ddl(started_cluster):
conn = get_postgres_conn(started_cluster, True) conn = get_postgres_conn(started_cluster.postgres_ip, started_cluster.postgres_port, database=True)
cursor = conn.cursor() cursor = conn.cursor()
node1.query( node1.query(
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword')" "CREATE DATABASE postgres_database ENGINE = PostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')"
) )
create_postgres_table(cursor, "test_table") create_postgres_table(cursor, "test_table")
assert "test_table" in node1.query("SHOW TABLES FROM test_database") assert "test_table" in node1.query("SHOW TABLES FROM postgres_database")
node1.query("DROP TABLE test_database.test_table") node1.query("DROP TABLE postgres_database.test_table")
assert "test_table" not in node1.query("SHOW TABLES FROM test_database") assert "test_table" not in node1.query("SHOW TABLES FROM postgres_database")
node1.query("ATTACH TABLE test_database.test_table") node1.query("ATTACH TABLE postgres_database.test_table")
assert "test_table" in node1.query("SHOW TABLES FROM test_database") assert "test_table" in node1.query("SHOW TABLES FROM postgres_database")
node1.query("DETACH TABLE test_database.test_table") node1.query("DETACH TABLE postgres_database.test_table")
assert "test_table" not in node1.query("SHOW TABLES FROM test_database") assert "test_table" not in node1.query("SHOW TABLES FROM postgres_database")
node1.query("ATTACH TABLE test_database.test_table") node1.query("ATTACH TABLE postgres_database.test_table")
assert "test_table" in node1.query("SHOW TABLES FROM test_database") assert "test_table" in node1.query("SHOW TABLES FROM postgres_database")
node1.query("DROP DATABASE test_database") node1.query("DROP DATABASE postgres_database")
assert "test_database" not in node1.query("SHOW DATABASES") assert "postgres_database" not in node1.query("SHOW DATABASES")
drop_postgres_table(cursor, "test_table") drop_postgres_table(cursor, "test_table")
def test_postgresql_database_engine_queries(started_cluster): def test_postgresql_database_engine_queries(started_cluster):
conn = get_postgres_conn(started_cluster, True) conn = get_postgres_conn(started_cluster.postgres_ip, started_cluster.postgres_port, database=True)
cursor = conn.cursor() cursor = conn.cursor()
node1.query( node1.query(
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword')" "CREATE DATABASE postgres_database ENGINE = PostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')"
) )
create_postgres_table(cursor, "test_table") create_postgres_table(cursor, "test_table")
assert node1.query("SELECT count() FROM test_database.test_table").rstrip() == "0" assert node1.query("SELECT count() FROM postgres_database.test_table").rstrip() == "0"
node1.query( node1.query(
"INSERT INTO test_database.test_table SELECT number, number from numbers(10000)" "INSERT INTO postgres_database.test_table SELECT number, number from numbers(10000)"
) )
assert ( assert (
node1.query("SELECT count() FROM test_database.test_table").rstrip() == "10000" node1.query("SELECT count() FROM postgres_database.test_table").rstrip() == "10000"
) )
drop_postgres_table(cursor, "test_table") drop_postgres_table(cursor, "test_table")
assert "test_table" not in node1.query("SHOW TABLES FROM test_database") assert "test_table" not in node1.query("SHOW TABLES FROM postgres_database")
node1.query("DROP DATABASE test_database") node1.query("DROP DATABASE postgres_database")
assert "test_database" not in node1.query("SHOW DATABASES") assert "postgres_database" not in node1.query("SHOW DATABASES")
def test_get_create_table_query_with_multidim_arrays(started_cluster): def test_get_create_table_query_with_multidim_arrays(started_cluster):
conn = get_postgres_conn(started_cluster, True) conn = get_postgres_conn(started_cluster.postgres_ip, started_cluster.postgres_port, database=True)
cursor = conn.cursor() cursor = conn.cursor()
node1.query( node1.query(
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword')" "CREATE DATABASE postgres_database ENGINE = PostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')"
) )
cursor.execute( cursor.execute(
@ -157,11 +147,11 @@ def test_get_create_table_query_with_multidim_arrays(started_cluster):
)""" )"""
) )
node1.query("DETACH TABLE test_database.array_columns") node1.query("DETACH TABLE postgres_database.array_columns")
node1.query("ATTACH TABLE test_database.array_columns") node1.query("ATTACH TABLE postgres_database.array_columns")
node1.query( node1.query(
"INSERT INTO test_database.array_columns " "INSERT INTO postgres_database.array_columns "
"VALUES (" "VALUES ("
"[[[1, 1], [1, 1]], [[3, 3], [3, 3]], [[4, 4], [5, 5]]], " "[[[1, 1], [1, 1]], [[3, 3], [3, 3]], [[4, 4], [5, 5]]], "
"[[[1, NULL], [NULL, 1]], [[NULL, NULL], [NULL, NULL]], [[4, 4], [5, 5]]] " "[[[1, NULL], [NULL, 1]], [[NULL, NULL], [NULL, NULL]], [[4, 4], [5, 5]]] "
@ -169,7 +159,7 @@ def test_get_create_table_query_with_multidim_arrays(started_cluster):
) )
result = node1.query( result = node1.query(
""" """
SELECT * FROM test_database.array_columns""" SELECT * FROM postgres_database.array_columns"""
) )
expected = ( expected = (
"[[[1,1],[1,1]],[[3,3],[3,3]],[[4,4],[5,5]]]\t" "[[[1,1],[1,1]],[[3,3],[3,3]],[[4,4],[5,5]]]\t"
@ -177,64 +167,64 @@ def test_get_create_table_query_with_multidim_arrays(started_cluster):
) )
assert result == expected assert result == expected
node1.query("DROP DATABASE test_database") node1.query("DROP DATABASE postgres_database")
assert "test_database" not in node1.query("SHOW DATABASES") assert "postgres_database" not in node1.query("SHOW DATABASES")
drop_postgres_table(cursor, "array_columns") drop_postgres_table(cursor, "array_columns")
def test_postgresql_database_engine_table_cache(started_cluster): def test_postgresql_database_engine_table_cache(started_cluster):
conn = get_postgres_conn(started_cluster, True) conn = get_postgres_conn(started_cluster.postgres_ip, started_cluster.postgres_port, database=True)
cursor = conn.cursor() cursor = conn.cursor()
node1.query( node1.query(
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword', '', 1)" "CREATE DATABASE postgres_database ENGINE = PostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword', '', 1)"
) )
create_postgres_table(cursor, "test_table") create_postgres_table(cursor, "test_table")
assert ( assert (
node1.query("DESCRIBE TABLE test_database.test_table").rstrip() node1.query("DESCRIBE TABLE postgres_database.test_table").rstrip()
== "id\tInt32\t\t\t\t\t\nvalue\tNullable(Int32)" == "id\tInt32\t\t\t\t\t\nvalue\tNullable(Int32)"
) )
cursor.execute("ALTER TABLE test_table ADD COLUMN data Text") cursor.execute("ALTER TABLE test_table ADD COLUMN data Text")
assert ( assert (
node1.query("DESCRIBE TABLE test_database.test_table").rstrip() node1.query("DESCRIBE TABLE postgres_database.test_table").rstrip()
== "id\tInt32\t\t\t\t\t\nvalue\tNullable(Int32)" == "id\tInt32\t\t\t\t\t\nvalue\tNullable(Int32)"
) )
node1.query("DETACH TABLE test_database.test_table") node1.query("DETACH TABLE postgres_database.test_table")
assert "test_table" not in node1.query("SHOW TABLES FROM test_database") assert "test_table" not in node1.query("SHOW TABLES FROM postgres_database")
node1.query("ATTACH TABLE test_database.test_table") node1.query("ATTACH TABLE postgres_database.test_table")
assert "test_table" in node1.query("SHOW TABLES FROM test_database") assert "test_table" in node1.query("SHOW TABLES FROM postgres_database")
assert ( assert (
node1.query("DESCRIBE TABLE test_database.test_table").rstrip() node1.query("DESCRIBE TABLE postgres_database.test_table").rstrip()
== "id\tInt32\t\t\t\t\t\nvalue\tNullable(Int32)\t\t\t\t\t\ndata\tNullable(String)" == "id\tInt32\t\t\t\t\t\nvalue\tNullable(Int32)\t\t\t\t\t\ndata\tNullable(String)"
) )
node1.query("DROP TABLE test_database.test_table") node1.query("DROP TABLE postgres_database.test_table")
assert "test_table" not in node1.query("SHOW TABLES FROM test_database") assert "test_table" not in node1.query("SHOW TABLES FROM postgres_database")
node1.query("ATTACH TABLE test_database.test_table") node1.query("ATTACH TABLE postgres_database.test_table")
assert "test_table" in node1.query("SHOW TABLES FROM test_database") assert "test_table" in node1.query("SHOW TABLES FROM postgres_database")
node1.query( node1.query(
"INSERT INTO test_database.test_table SELECT number, number, toString(number) from numbers(10000)" "INSERT INTO postgres_database.test_table SELECT number, number, toString(number) from numbers(10000)"
) )
assert ( assert (
node1.query("SELECT count() FROM test_database.test_table").rstrip() == "10000" node1.query("SELECT count() FROM postgres_database.test_table").rstrip() == "10000"
) )
cursor.execute("DROP TABLE test_table;") cursor.execute("DROP TABLE test_table;")
assert "test_table" not in node1.query("SHOW TABLES FROM test_database") assert "test_table" not in node1.query("SHOW TABLES FROM postgres_database")
node1.query("DROP DATABASE test_database") node1.query("DROP DATABASE postgres_database")
assert "test_database" not in node1.query("SHOW DATABASES") assert "postgres_database" not in node1.query("SHOW DATABASES")
def test_postgresql_database_with_schema(started_cluster): def test_postgresql_database_with_schema(started_cluster):
conn = get_postgres_conn(started_cluster, True) conn = get_postgres_conn(started_cluster.postgres_ip, started_cluster.postgres_port, database=True)
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("CREATE SCHEMA test_schema") cursor.execute("CREATE SCHEMA test_schema")
@ -243,17 +233,17 @@ def test_postgresql_database_with_schema(started_cluster):
cursor.execute("CREATE TABLE table3 (a integer)") cursor.execute("CREATE TABLE table3 (a integer)")
node1.query( node1.query(
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword', 'test_schema')" "CREATE DATABASE postgres_database ENGINE = PostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword', 'test_schema')"
) )
assert node1.query("SHOW TABLES FROM test_database") == "table1\ntable2\n" assert node1.query("SHOW TABLES FROM postgres_database") == "table1\ntable2\n"
node1.query("INSERT INTO test_database.table1 SELECT number from numbers(10000)") node1.query("INSERT INTO postgres_database.table1 SELECT number from numbers(10000)")
assert node1.query("SELECT count() FROM test_database.table1").rstrip() == "10000" assert node1.query("SELECT count() FROM postgres_database.table1").rstrip() == "10000"
node1.query("DETACH TABLE test_database.table1") node1.query("DETACH TABLE postgres_database.table1")
node1.query("ATTACH TABLE test_database.table1") node1.query("ATTACH TABLE postgres_database.table1")
assert node1.query("SELECT count() FROM test_database.table1").rstrip() == "10000" assert node1.query("SELECT count() FROM postgres_database.table1").rstrip() == "10000"
node1.query("DROP DATABASE test_database") node1.query("DROP DATABASE postgres_database")
cursor.execute("DROP SCHEMA test_schema CASCADE") cursor.execute("DROP SCHEMA test_schema CASCADE")
cursor.execute("DROP TABLE table3") cursor.execute("DROP TABLE table3")
@ -321,18 +311,18 @@ def test_predefined_connection_configuration(started_cluster):
def test_postgres_database_old_syntax(started_cluster): def test_postgres_database_old_syntax(started_cluster):
conn = get_postgres_conn(started_cluster, True) conn = get_postgres_conn(started_cluster.postgres_ip, started_cluster.postgres_port, database=True)
cursor = conn.cursor() cursor = conn.cursor()
node1.query( node1.query(
""" """
DROP DATABASE IF EXISTS test_database; CREATE DATABASE postgres_database ENGINE = PostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword', 1);
CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword', 1);
""" """
) )
create_postgres_table(cursor, "test_table") create_postgres_table(cursor, "test_table")
assert "test_table" in node1.query("SHOW TABLES FROM test_database") assert "test_table" in node1.query("SHOW TABLES FROM postgres_database")
cursor.execute(f"DROP TABLE test_table ")
node1.query("DROP DATABASE IF EXISTS postgres_database;")
if __name__ == "__main__": if __name__ == "__main__":
cluster.start() cluster.start()