Merge pull request #38582 from qoega/integration-postgres-better

This commit is contained in:
Ilya Yatsishin 2022-07-04 13:00:54 +02:00 committed by GitHub
commit 703c3f1938
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 166 additions and 194 deletions

View File

@ -4,7 +4,7 @@
<name>dict0</name>
<source>
<postgresql>
<db>clickhouse</db>
<db>postgres_database</db>
<host>postgres1</host>
<port>5432</port>
<user>postgres</user>
@ -38,7 +38,7 @@
<name>dict1</name>
<source>
<postgresql>
<db>clickhouse</db>
<db>postgres_database</db>
<user>postgres</user>
<password>mysecretpassword</password>
<table>test1</table>

View File

@ -5,7 +5,7 @@
<password>mysecretpassword</password>
<host>postgres1</host>
<port>5432</port>
<database>clickhouse</database>
<database>postgres_database</database>
<table>test_table</table>
</postgres1>
<postgres2>
@ -13,7 +13,7 @@
<password>mysecretpassword</password>
<host>postgres1</host>
<port>5432</port>
<database>clickhouse</database>
<database>postgres_database</database>
<table>test_table</table>
<schema>test_schema</schema>
</postgres2>

View File

@ -1,9 +1,11 @@
import pytest
import time
import logging
import psycopg2
from multiprocessing.dummy import Pool
from helpers.cluster import ClickHouseCluster
from helpers.postgres_utility import get_postgres_conn
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
cluster = ClickHouseCluster(__file__)
@ -18,62 +20,40 @@ node1 = cluster.add_instance(
with_postgres_cluster=True,
)
postgres_dict_table_template = """
CREATE TABLE IF NOT EXISTS {} (
id Integer NOT NULL, key Integer NOT NULL, value Integer NOT NULL, PRIMARY KEY (id))
"""
click_dict_table_template = """
CREATE TABLE IF NOT EXISTS `test`.`dict_table_{}` (
`key` UInt32, `value` UInt32
) ENGINE = Dictionary({})
"""
def get_postgres_conn(ip, port, database=False):
if database == True:
conn_string = "host={} port={} dbname='clickhouse' user='postgres' password='mysecretpassword'".format(
ip, port
)
else:
conn_string = (
"host={} port={} user='postgres' password='mysecretpassword'".format(
ip, port
)
)
conn = psycopg2.connect(conn_string)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = True
return conn
def create_postgres_db(conn, name):
cursor = conn.cursor()
cursor.execute("CREATE DATABASE {}".format(name))
cursor.execute(f"CREATE DATABASE {name}")
def create_postgres_table(cursor, table_name):
cursor.execute(postgres_dict_table_template.format(table_name))
cursor.execute(
f"""
CREATE TABLE IF NOT EXISTS {table_name} (
id Integer NOT NULL, key Integer NOT NULL, value Integer NOT NULL, PRIMARY KEY (id))
"""
)
def create_and_fill_postgres_table(cursor, table_name, port, host):
create_postgres_table(cursor, table_name)
# Fill postgres table using clickhouse postgres table function and check
table_func = """postgresql('{}:{}', 'clickhouse', '{}', 'postgres', 'mysecretpassword')""".format(
host, port, table_name
)
table_func = f"""postgresql('{host}:{port}', 'postgres_database', '{table_name}', 'postgres', 'mysecretpassword')"""
node1.query(
"""INSERT INTO TABLE FUNCTION {} SELECT number, number, number from numbers(10000)
""".format(
table_func, table_name
f"""INSERT INTO TABLE FUNCTION {table_func} SELECT number, number, number from numbers(10000)"""
)
)
result = node1.query("SELECT count() FROM {}".format(table_func))
result = node1.query(f"SELECT count() FROM {table_func}")
assert result.rstrip() == "10000"
def create_dict(table_name, index=0):
node1.query(click_dict_table_template.format(table_name, "dict" + str(index)))
node1.query(
f"""
CREATE TABLE IF NOT EXISTS `test`.`dict_table_{table_name}` (
`key` UInt32, `value` UInt32
) ENGINE = Dictionary(dict{str(index)})
"""
)
@pytest.fixture(scope="module")
@ -85,14 +65,14 @@ def started_cluster():
postgres_conn = get_postgres_conn(
ip=cluster.postgres_ip, port=cluster.postgres_port
)
print("postgres1 connected")
create_postgres_db(postgres_conn, "clickhouse")
logging.debug("postgres1 connected")
create_postgres_db(postgres_conn, "postgres_database")
postgres_conn = get_postgres_conn(
postgres2_conn = get_postgres_conn(
ip=cluster.postgres2_ip, port=cluster.postgres_port
)
print("postgres2 connected")
create_postgres_db(postgres_conn, "clickhouse")
logging.debug("postgres2 connected")
create_postgres_db(postgres2_conn, "postgres_database")
yield cluster
@ -117,27 +97,22 @@ def test_load_dictionaries(started_cluster):
create_dict(table_name)
dict_name = "dict0"
node1.query("SYSTEM RELOAD DICTIONARY {}".format(dict_name))
node1.query(f"SYSTEM RELOAD DICTIONARY {dict_name}")
assert (
node1.query(
"SELECT count() FROM `test`.`dict_table_{}`".format(table_name)
).rstrip()
node1.query(f"SELECT count() FROM `test`.`dict_table_{table_name}`").rstrip()
== "10000"
)
assert (
node1.query("SELECT dictGetUInt32('{}', 'key', toUInt64(0))".format(dict_name))
== "0\n"
node1.query(f"SELECT dictGetUInt32('{dict_name}', 'key', toUInt64(0))") == "0\n"
)
assert (
node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(9999))".format(dict_name)
)
node1.query(f"SELECT dictGetUInt32('{dict_name}', 'value', toUInt64(9999))")
== "9999\n"
)
cursor.execute("DROP TABLE IF EXISTS {}".format(table_name))
node1.query("DROP TABLE IF EXISTS {}".format(table_name))
node1.query("DROP DICTIONARY IF EXISTS {}".format(dict_name))
cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
node1.query(f"DROP TABLE IF EXISTS {table_name}")
node1.query(f"DROP DICTIONARY IF EXISTS {dict_name}")
def test_postgres_dictionaries_custom_query_full_load(started_cluster):
@ -159,7 +134,7 @@ def test_postgres_dictionaries_custom_query_full_load(started_cluster):
query = node1.query
query(
"""
f"""
CREATE DICTIONARY test_dictionary_custom_query
(
id UInt64,
@ -169,16 +144,14 @@ def test_postgres_dictionaries_custom_query_full_load(started_cluster):
PRIMARY KEY id
LAYOUT(FLAT())
SOURCE(PostgreSQL(
DB 'clickhouse'
HOST '{}'
PORT {}
DB 'postgres_database'
HOST '{started_cluster.postgres_ip}'
PORT {started_cluster.postgres_port}
USER 'postgres'
PASSWORD 'mysecretpassword'
QUERY $doc$SELECT id, value_1, value_2 FROM test_table_1 INNER JOIN test_table_2 USING (id);$doc$))
LIFETIME(0)
""".format(
started_cluster.postgres_ip, started_cluster.postgres_port
)
"""
)
result = query("SELECT id, value_1, value_2 FROM test_dictionary_custom_query")
@ -210,7 +183,7 @@ def test_postgres_dictionaries_custom_query_partial_load_simple_key(started_clus
query = node1.query
query(
"""
f"""
CREATE DICTIONARY test_dictionary_custom_query
(
id UInt64,
@ -220,15 +193,13 @@ def test_postgres_dictionaries_custom_query_partial_load_simple_key(started_clus
PRIMARY KEY id
LAYOUT(DIRECT())
SOURCE(PostgreSQL(
DB 'clickhouse'
HOST '{}'
PORT {}
DB 'postgres_database'
HOST '{started_cluster.postgres_ip}'
PORT {started_cluster.postgres_port}
USER 'postgres'
PASSWORD 'mysecretpassword'
QUERY $doc$SELECT id, value_1, value_2 FROM test_table_1 INNER JOIN test_table_2 USING (id) WHERE {{condition}};$doc$))
""".format(
started_cluster.postgres_ip, started_cluster.postgres_port
)
"""
)
result = query(
@ -262,7 +233,7 @@ def test_postgres_dictionaries_custom_query_partial_load_complex_key(started_clu
query = node1.query
query(
"""
f"""
CREATE DICTIONARY test_dictionary_custom_query
(
id UInt64,
@ -273,15 +244,13 @@ def test_postgres_dictionaries_custom_query_partial_load_complex_key(started_clu
PRIMARY KEY id, key
LAYOUT(COMPLEX_KEY_DIRECT())
SOURCE(PostgreSQL(
DB 'clickhouse'
HOST '{}'
PORT {}
DB 'postgres_database'
HOST '{started_cluster.postgres_ip}'
PORT {started_cluster.postgres_port}
USER 'postgres'
PASSWORD 'mysecretpassword'
QUERY $doc$SELECT id, key, value_1, value_2 FROM test_table_1 INNER JOIN test_table_2 USING (id, key) WHERE {{condition}};$doc$))
""".format(
started_cluster.postgres_ip, started_cluster.postgres_port
)
"""
)
result = query(
@ -314,70 +283,56 @@ def test_invalidate_query(started_cluster):
# invalidate query: SELECT value FROM test0 WHERE id = 0
dict_name = "dict0"
create_dict(table_name)
node1.query("SYSTEM RELOAD DICTIONARY {}".format(dict_name))
node1.query(f"SYSTEM RELOAD DICTIONARY {dict_name}")
assert (
node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)
)
node1.query(f"SELECT dictGetUInt32('{dict_name}', 'value', toUInt64(0))")
== "0\n"
)
assert (
node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(1))".format(dict_name)
)
node1.query(f"SELECT dictGetUInt32('{dict_name}', 'value', toUInt64(1))")
== "1\n"
)
# update should happen
cursor.execute("UPDATE {} SET value=value+1 WHERE id = 0".format(table_name))
cursor.execute(f"UPDATE {table_name} SET value=value+1 WHERE id = 0")
while True:
result = node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)
f"SELECT dictGetUInt32('{dict_name}', 'value', toUInt64(0))"
)
if result != "0\n":
break
assert (
node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)
)
node1.query(f"SELECT dictGetUInt32('{dict_name}', 'value', toUInt64(0))")
== "1\n"
)
# no update should happen
cursor.execute("UPDATE {} SET value=value*2 WHERE id != 0".format(table_name))
cursor.execute(f"UPDATE {table_name} SET value=value*2 WHERE id != 0")
time.sleep(5)
assert (
node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)
)
node1.query(f"SELECT dictGetUInt32('{dict_name}', 'value', toUInt64(0))")
== "1\n"
)
assert (
node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(1))".format(dict_name)
)
node1.query(f"SELECT dictGetUInt32('{dict_name}', 'value', toUInt64(1))")
== "1\n"
)
# update should happen
cursor.execute("UPDATE {} SET value=value+1 WHERE id = 0".format(table_name))
cursor.execute(f"UPDATE {table_name} SET value=value+1 WHERE id = 0")
time.sleep(5)
assert (
node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)
)
node1.query(f"SELECT dictGetUInt32('{dict_name}', 'value', toUInt64(0))")
== "2\n"
)
assert (
node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(1))".format(dict_name)
)
node1.query(f"SELECT dictGetUInt32('{dict_name}', 'value', toUInt64(1))")
== "2\n"
)
node1.query("DROP TABLE IF EXISTS {}".format(table_name))
node1.query("DROP DICTIONARY IF EXISTS {}".format(dict_name))
cursor.execute("DROP TABLE IF EXISTS {}".format(table_name))
node1.query(f"DROP TABLE IF EXISTS {table_name}")
node1.query(f"DROP DICTIONARY IF EXISTS {dict_name}")
cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
def test_dictionary_with_replicas(started_cluster):
@ -446,7 +401,7 @@ def test_postgres_schema(started_cluster):
host 'postgres1'
user 'postgres'
password 'mysecretpassword'
db 'clickhouse'
db 'postgres_database'
table 'test_schema.test_table'))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
@ -458,6 +413,8 @@ def test_postgres_schema(started_cluster):
result = node1.query("SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(99))")
assert int(result.strip()) == 99
node1.query("DROP DICTIONARY IF EXISTS postgres_dict")
cursor.execute("DROP TABLE test_schema.test_table")
cursor.execute("DROP SCHEMA test_schema")
def test_predefined_connection_configuration(started_cluster):
@ -566,7 +523,7 @@ def test_bad_configuration(started_cluster):
host 'postgres1'
user 'postgres'
password 'mysecretpassword'
dbbb 'clickhouse'
dbbb 'postgres_database'
table 'test_schema.test_table'))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());

View File

@ -2,7 +2,7 @@ import pytest
from helpers.cluster import ClickHouseCluster
import redis
cluster = ClickHouseCluster(__file__)
cluster = ClickHouseCluster(__file__, name="long")
node = cluster.add_instance("node", with_redis=True)

View File

@ -3,6 +3,7 @@ import psycopg2
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
from helpers.postgres_utility import get_postgres_conn
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
cluster = ClickHouseCluster(__file__)
@ -20,17 +21,6 @@ postgres_drop_table_template = """
"""
def get_postgres_conn(cluster, database=False):
if database == True:
conn_string = f"host={cluster.postgres_ip} port={cluster.postgres_port} dbname='test_database' user='postgres' password='mysecretpassword'"
else:
conn_string = f"host={cluster.postgres_ip} port={cluster.postgres_port} user='postgres' password='mysecretpassword'"
conn = psycopg2.connect(conn_string)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = True
return conn
def create_postgres_db(cursor, name):
cursor.execute("CREATE DATABASE {}".format(name))
@ -49,9 +39,9 @@ def drop_postgres_table(cursor, table_name):
def started_cluster():
try:
cluster.start()
conn = get_postgres_conn(cluster)
conn = get_postgres_conn(cluster.postgres_ip, cluster.postgres_port)
cursor = conn.cursor()
create_postgres_db(cursor, "test_database")
create_postgres_db(cursor, "postgres_database")
yield cluster
finally:
@ -60,93 +50,104 @@ def started_cluster():
def test_postgres_database_engine_with_postgres_ddl(started_cluster):
# connect to database as well
conn = get_postgres_conn(started_cluster, True)
conn = get_postgres_conn(
started_cluster.postgres_ip, started_cluster.postgres_port, database=True
)
cursor = conn.cursor()
node1.query(
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword')"
"CREATE DATABASE postgres_database ENGINE = PostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')"
)
assert "test_database" in node1.query("SHOW DATABASES")
assert "postgres_database" in node1.query("SHOW DATABASES")
create_postgres_table(cursor, "test_table")
assert "test_table" in node1.query("SHOW TABLES FROM test_database")
assert "test_table" in node1.query("SHOW TABLES FROM postgres_database")
cursor.execute("ALTER TABLE test_table ADD COLUMN data Text")
assert "data" in node1.query(
"SELECT name FROM system.columns WHERE table = 'test_table' AND database = 'test_database'"
"SELECT name FROM system.columns WHERE table = 'test_table' AND database = 'postgres_database'"
)
cursor.execute("ALTER TABLE test_table DROP COLUMN data")
assert "data" not in node1.query(
"SELECT name FROM system.columns WHERE table = 'test_table' AND database = 'test_database'"
"SELECT name FROM system.columns WHERE table = 'test_table' AND database = 'postgres_database'"
)
node1.query("DROP DATABASE test_database")
assert "test_database" not in node1.query("SHOW DATABASES")
node1.query("DROP DATABASE postgres_database")
assert "postgres_database" not in node1.query("SHOW DATABASES")
drop_postgres_table(cursor, "test_table")
def test_postgresql_database_engine_with_clickhouse_ddl(started_cluster):
conn = get_postgres_conn(started_cluster, True)
conn = get_postgres_conn(
started_cluster.postgres_ip, started_cluster.postgres_port, database=True
)
cursor = conn.cursor()
node1.query(
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword')"
"CREATE DATABASE postgres_database ENGINE = PostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')"
)
create_postgres_table(cursor, "test_table")
assert "test_table" in node1.query("SHOW TABLES FROM test_database")
assert "test_table" in node1.query("SHOW TABLES FROM postgres_database")
node1.query("DROP TABLE test_database.test_table")
assert "test_table" not in node1.query("SHOW TABLES FROM test_database")
node1.query("DROP TABLE postgres_database.test_table")
assert "test_table" not in node1.query("SHOW TABLES FROM postgres_database")
node1.query("ATTACH TABLE test_database.test_table")
assert "test_table" in node1.query("SHOW TABLES FROM test_database")
node1.query("ATTACH TABLE postgres_database.test_table")
assert "test_table" in node1.query("SHOW TABLES FROM postgres_database")
node1.query("DETACH TABLE test_database.test_table")
assert "test_table" not in node1.query("SHOW TABLES FROM test_database")
node1.query("DETACH TABLE postgres_database.test_table")
assert "test_table" not in node1.query("SHOW TABLES FROM postgres_database")
node1.query("ATTACH TABLE test_database.test_table")
assert "test_table" in node1.query("SHOW TABLES FROM test_database")
node1.query("ATTACH TABLE postgres_database.test_table")
assert "test_table" in node1.query("SHOW TABLES FROM postgres_database")
node1.query("DROP DATABASE test_database")
assert "test_database" not in node1.query("SHOW DATABASES")
node1.query("DROP DATABASE postgres_database")
assert "postgres_database" not in node1.query("SHOW DATABASES")
drop_postgres_table(cursor, "test_table")
def test_postgresql_database_engine_queries(started_cluster):
conn = get_postgres_conn(started_cluster, True)
conn = get_postgres_conn(
started_cluster.postgres_ip, started_cluster.postgres_port, database=True
)
cursor = conn.cursor()
node1.query(
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword')"
"CREATE DATABASE postgres_database ENGINE = PostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')"
)
create_postgres_table(cursor, "test_table")
assert node1.query("SELECT count() FROM test_database.test_table").rstrip() == "0"
assert (
node1.query("SELECT count() FROM postgres_database.test_table").rstrip() == "0"
)
node1.query(
"INSERT INTO test_database.test_table SELECT number, number from numbers(10000)"
"INSERT INTO postgres_database.test_table SELECT number, number from numbers(10000)"
)
assert (
node1.query("SELECT count() FROM test_database.test_table").rstrip() == "10000"
node1.query("SELECT count() FROM postgres_database.test_table").rstrip()
== "10000"
)
drop_postgres_table(cursor, "test_table")
assert "test_table" not in node1.query("SHOW TABLES FROM test_database")
assert "test_table" not in node1.query("SHOW TABLES FROM postgres_database")
node1.query("DROP DATABASE test_database")
assert "test_database" not in node1.query("SHOW DATABASES")
node1.query("DROP DATABASE postgres_database")
assert "postgres_database" not in node1.query("SHOW DATABASES")
def test_get_create_table_query_with_multidim_arrays(started_cluster):
conn = get_postgres_conn(started_cluster, True)
conn = get_postgres_conn(
started_cluster.postgres_ip, started_cluster.postgres_port, database=True
)
cursor = conn.cursor()
node1.query(
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword')"
"CREATE DATABASE postgres_database ENGINE = PostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')"
)
cursor.execute(
@ -157,11 +158,11 @@ def test_get_create_table_query_with_multidim_arrays(started_cluster):
)"""
)
node1.query("DETACH TABLE test_database.array_columns")
node1.query("ATTACH TABLE test_database.array_columns")
node1.query("DETACH TABLE postgres_database.array_columns")
node1.query("ATTACH TABLE postgres_database.array_columns")
node1.query(
"INSERT INTO test_database.array_columns "
"INSERT INTO postgres_database.array_columns "
"VALUES ("
"[[[1, 1], [1, 1]], [[3, 3], [3, 3]], [[4, 4], [5, 5]]], "
"[[[1, NULL], [NULL, 1]], [[NULL, NULL], [NULL, NULL]], [[4, 4], [5, 5]]] "
@ -169,7 +170,7 @@ def test_get_create_table_query_with_multidim_arrays(started_cluster):
)
result = node1.query(
"""
SELECT * FROM test_database.array_columns"""
SELECT * FROM postgres_database.array_columns"""
)
expected = (
"[[[1,1],[1,1]],[[3,3],[3,3]],[[4,4],[5,5]]]\t"
@ -177,64 +178,69 @@ def test_get_create_table_query_with_multidim_arrays(started_cluster):
)
assert result == expected
node1.query("DROP DATABASE test_database")
assert "test_database" not in node1.query("SHOW DATABASES")
node1.query("DROP DATABASE postgres_database")
assert "postgres_database" not in node1.query("SHOW DATABASES")
drop_postgres_table(cursor, "array_columns")
def test_postgresql_database_engine_table_cache(started_cluster):
conn = get_postgres_conn(started_cluster, True)
conn = get_postgres_conn(
started_cluster.postgres_ip, started_cluster.postgres_port, database=True
)
cursor = conn.cursor()
node1.query(
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword', '', 1)"
"CREATE DATABASE postgres_database ENGINE = PostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword', '', 1)"
)
create_postgres_table(cursor, "test_table")
assert (
node1.query("DESCRIBE TABLE test_database.test_table").rstrip()
node1.query("DESCRIBE TABLE postgres_database.test_table").rstrip()
== "id\tInt32\t\t\t\t\t\nvalue\tNullable(Int32)"
)
cursor.execute("ALTER TABLE test_table ADD COLUMN data Text")
assert (
node1.query("DESCRIBE TABLE test_database.test_table").rstrip()
node1.query("DESCRIBE TABLE postgres_database.test_table").rstrip()
== "id\tInt32\t\t\t\t\t\nvalue\tNullable(Int32)"
)
node1.query("DETACH TABLE test_database.test_table")
assert "test_table" not in node1.query("SHOW TABLES FROM test_database")
node1.query("DETACH TABLE postgres_database.test_table")
assert "test_table" not in node1.query("SHOW TABLES FROM postgres_database")
node1.query("ATTACH TABLE test_database.test_table")
assert "test_table" in node1.query("SHOW TABLES FROM test_database")
node1.query("ATTACH TABLE postgres_database.test_table")
assert "test_table" in node1.query("SHOW TABLES FROM postgres_database")
assert (
node1.query("DESCRIBE TABLE test_database.test_table").rstrip()
node1.query("DESCRIBE TABLE postgres_database.test_table").rstrip()
== "id\tInt32\t\t\t\t\t\nvalue\tNullable(Int32)\t\t\t\t\t\ndata\tNullable(String)"
)
node1.query("DROP TABLE test_database.test_table")
assert "test_table" not in node1.query("SHOW TABLES FROM test_database")
node1.query("DROP TABLE postgres_database.test_table")
assert "test_table" not in node1.query("SHOW TABLES FROM postgres_database")
node1.query("ATTACH TABLE test_database.test_table")
assert "test_table" in node1.query("SHOW TABLES FROM test_database")
node1.query("ATTACH TABLE postgres_database.test_table")
assert "test_table" in node1.query("SHOW TABLES FROM postgres_database")
node1.query(
"INSERT INTO test_database.test_table SELECT number, number, toString(number) from numbers(10000)"
"INSERT INTO postgres_database.test_table SELECT number, number, toString(number) from numbers(10000)"
)
assert (
node1.query("SELECT count() FROM test_database.test_table").rstrip() == "10000"
node1.query("SELECT count() FROM postgres_database.test_table").rstrip()
== "10000"
)
cursor.execute("DROP TABLE test_table;")
assert "test_table" not in node1.query("SHOW TABLES FROM test_database")
assert "test_table" not in node1.query("SHOW TABLES FROM postgres_database")
node1.query("DROP DATABASE test_database")
assert "test_database" not in node1.query("SHOW DATABASES")
node1.query("DROP DATABASE postgres_database")
assert "postgres_database" not in node1.query("SHOW DATABASES")
def test_postgresql_database_with_schema(started_cluster):
conn = get_postgres_conn(started_cluster, True)
conn = get_postgres_conn(
started_cluster.postgres_ip, started_cluster.postgres_port, database=True
)
cursor = conn.cursor()
cursor.execute("CREATE SCHEMA test_schema")
@ -243,17 +249,23 @@ def test_postgresql_database_with_schema(started_cluster):
cursor.execute("CREATE TABLE table3 (a integer)")
node1.query(
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword', 'test_schema')"
"CREATE DATABASE postgres_database ENGINE = PostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword', 'test_schema')"
)
assert node1.query("SHOW TABLES FROM test_database") == "table1\ntable2\n"
assert node1.query("SHOW TABLES FROM postgres_database") == "table1\ntable2\n"
node1.query("INSERT INTO test_database.table1 SELECT number from numbers(10000)")
assert node1.query("SELECT count() FROM test_database.table1").rstrip() == "10000"
node1.query("DETACH TABLE test_database.table1")
node1.query("ATTACH TABLE test_database.table1")
assert node1.query("SELECT count() FROM test_database.table1").rstrip() == "10000"
node1.query("DROP DATABASE test_database")
node1.query(
"INSERT INTO postgres_database.table1 SELECT number from numbers(10000)"
)
assert (
node1.query("SELECT count() FROM postgres_database.table1").rstrip() == "10000"
)
node1.query("DETACH TABLE postgres_database.table1")
node1.query("ATTACH TABLE postgres_database.table1")
assert (
node1.query("SELECT count() FROM postgres_database.table1").rstrip() == "10000"
)
node1.query("DROP DATABASE postgres_database")
cursor.execute("DROP SCHEMA test_schema CASCADE")
cursor.execute("DROP TABLE table3")
@ -321,17 +333,20 @@ def test_predefined_connection_configuration(started_cluster):
def test_postgres_database_old_syntax(started_cluster):
conn = get_postgres_conn(started_cluster, True)
conn = get_postgres_conn(
started_cluster.postgres_ip, started_cluster.postgres_port, database=True
)
cursor = conn.cursor()
node1.query(
"""
DROP DATABASE IF EXISTS test_database;
CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword', 1);
CREATE DATABASE postgres_database ENGINE = PostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword', 1);
"""
)
create_postgres_table(cursor, "test_table")
assert "test_table" in node1.query("SHOW TABLES FROM test_database")
assert "test_table" in node1.query("SHOW TABLES FROM postgres_database")
cursor.execute(f"DROP TABLE test_table")
node1.query("DROP DATABASE IF EXISTS postgres_database;")
if __name__ == "__main__":