ClickHouse/tests/integration/test_dictionaries_postgresql/test.py
2022-03-22 17:39:58 +01:00

586 lines
17 KiB
Python

import pytest
import time
import psycopg2
from multiprocessing.dummy import Pool
from helpers.cluster import ClickHouseCluster
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=[
"configs/config.xml",
"configs/dictionaries/postgres_dict.xml",
"configs/named_collections.xml",
],
with_postgres=True,
with_postgres_cluster=True,
)
postgres_dict_table_template = """
CREATE TABLE IF NOT EXISTS {} (
id Integer NOT NULL, key Integer NOT NULL, value Integer NOT NULL, PRIMARY KEY (id))
"""
click_dict_table_template = """
CREATE TABLE IF NOT EXISTS `test`.`dict_table_{}` (
`key` UInt32, `value` UInt32
) ENGINE = Dictionary({})
"""
def get_postgres_conn(ip, port, database=False):
if database == True:
conn_string = "host={} port={} dbname='clickhouse' user='postgres' password='mysecretpassword'".format(
ip, port
)
else:
conn_string = (
"host={} port={} user='postgres' password='mysecretpassword'".format(
ip, port
)
)
conn = psycopg2.connect(conn_string)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = True
return conn
def create_postgres_db(conn, name):
cursor = conn.cursor()
cursor.execute("CREATE DATABASE {}".format(name))
def create_postgres_table(cursor, table_name):
cursor.execute(postgres_dict_table_template.format(table_name))
def create_and_fill_postgres_table(cursor, table_name, port, host):
create_postgres_table(cursor, table_name)
# Fill postgres table using clickhouse postgres table function and check
table_func = """postgresql('{}:{}', 'clickhouse', '{}', 'postgres', 'mysecretpassword')""".format(
host, port, table_name
)
node1.query(
"""INSERT INTO TABLE FUNCTION {} SELECT number, number, number from numbers(10000)
""".format(
table_func, table_name
)
)
result = node1.query("SELECT count() FROM {}".format(table_func))
assert result.rstrip() == "10000"
def create_dict(table_name, index=0):
node1.query(click_dict_table_template.format(table_name, "dict" + str(index)))
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
node1.query("CREATE DATABASE IF NOT EXISTS test")
postgres_conn = get_postgres_conn(
ip=cluster.postgres_ip, port=cluster.postgres_port
)
print("postgres1 connected")
create_postgres_db(postgres_conn, "clickhouse")
postgres_conn = get_postgres_conn(
ip=cluster.postgres2_ip, port=cluster.postgres_port
)
print("postgres2 connected")
create_postgres_db(postgres_conn, "clickhouse")
yield cluster
finally:
cluster.shutdown()
def test_load_dictionaries(started_cluster):
conn = get_postgres_conn(
ip=started_cluster.postgres_ip,
database=True,
port=started_cluster.postgres_port,
)
cursor = conn.cursor()
table_name = "test0"
create_and_fill_postgres_table(
cursor,
table_name,
port=started_cluster.postgres_port,
host=started_cluster.postgres_ip,
)
create_dict(table_name)
dict_name = "dict0"
node1.query("SYSTEM RELOAD DICTIONARY {}".format(dict_name))
assert (
node1.query(
"SELECT count() FROM `test`.`dict_table_{}`".format(table_name)
).rstrip()
== "10000"
)
assert (
node1.query("SELECT dictGetUInt32('{}', 'key', toUInt64(0))".format(dict_name))
== "0\n"
)
assert (
node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(9999))".format(dict_name)
)
== "9999\n"
)
cursor.execute("DROP TABLE IF EXISTS {}".format(table_name))
node1.query("DROP TABLE IF EXISTS {}".format(table_name))
node1.query("DROP DICTIONARY IF EXISTS {}".format(dict_name))
def test_postgres_dictionaries_custom_query_full_load(started_cluster):
conn = get_postgres_conn(
ip=started_cluster.postgres_ip,
database=True,
port=started_cluster.postgres_port,
)
cursor = conn.cursor()
cursor.execute(
"CREATE TABLE IF NOT EXISTS test_table_1 (id Integer, value_1 Text);"
)
cursor.execute(
"CREATE TABLE IF NOT EXISTS test_table_2 (id Integer, value_2 Text);"
)
cursor.execute("INSERT INTO test_table_1 VALUES (1, 'Value_1');")
cursor.execute("INSERT INTO test_table_2 VALUES (1, 'Value_2');")
query = node1.query
query(
"""
CREATE DICTIONARY test_dictionary_custom_query
(
id UInt64,
value_1 String,
value_2 String
)
PRIMARY KEY id
LAYOUT(FLAT())
SOURCE(PostgreSQL(
DB 'clickhouse'
HOST '{}'
PORT {}
USER 'postgres'
PASSWORD 'mysecretpassword'
QUERY $doc$SELECT id, value_1, value_2 FROM test_table_1 INNER JOIN test_table_2 USING (id);$doc$))
LIFETIME(0)
""".format(
started_cluster.postgres_ip, started_cluster.postgres_port
)
)
result = query("SELECT id, value_1, value_2 FROM test_dictionary_custom_query")
assert result == "1\tValue_1\tValue_2\n"
query("DROP DICTIONARY test_dictionary_custom_query;")
cursor.execute("DROP TABLE test_table_2;")
cursor.execute("DROP TABLE test_table_1;")
def test_postgres_dictionaries_custom_query_partial_load_simple_key(started_cluster):
conn = get_postgres_conn(
ip=started_cluster.postgres_ip,
database=True,
port=started_cluster.postgres_port,
)
cursor = conn.cursor()
cursor.execute(
"CREATE TABLE IF NOT EXISTS test_table_1 (id Integer, value_1 Text);"
)
cursor.execute(
"CREATE TABLE IF NOT EXISTS test_table_2 (id Integer, value_2 Text);"
)
cursor.execute("INSERT INTO test_table_1 VALUES (1, 'Value_1');")
cursor.execute("INSERT INTO test_table_2 VALUES (1, 'Value_2');")
query = node1.query
query(
"""
CREATE DICTIONARY test_dictionary_custom_query
(
id UInt64,
value_1 String,
value_2 String
)
PRIMARY KEY id
LAYOUT(DIRECT())
SOURCE(PostgreSQL(
DB 'clickhouse'
HOST '{}'
PORT {}
USER 'postgres'
PASSWORD 'mysecretpassword'
QUERY $doc$SELECT id, value_1, value_2 FROM test_table_1 INNER JOIN test_table_2 USING (id) WHERE {{condition}};$doc$))
""".format(
started_cluster.postgres_ip, started_cluster.postgres_port
)
)
result = query(
"SELECT dictGet('test_dictionary_custom_query', ('value_1', 'value_2'), toUInt64(1))"
)
assert result == "('Value_1','Value_2')\n"
query("DROP DICTIONARY test_dictionary_custom_query;")
cursor.execute("DROP TABLE test_table_2;")
cursor.execute("DROP TABLE test_table_1;")
def test_postgres_dictionaries_custom_query_partial_load_complex_key(started_cluster):
conn = get_postgres_conn(
ip=started_cluster.postgres_ip,
database=True,
port=started_cluster.postgres_port,
)
cursor = conn.cursor()
cursor.execute(
"CREATE TABLE IF NOT EXISTS test_table_1 (id Integer, key Text, value_1 Text);"
)
cursor.execute(
"CREATE TABLE IF NOT EXISTS test_table_2 (id Integer, key Text, value_2 Text);"
)
cursor.execute("INSERT INTO test_table_1 VALUES (1, 'Key', 'Value_1');")
cursor.execute("INSERT INTO test_table_2 VALUES (1, 'Key', 'Value_2');")
query = node1.query
query(
"""
CREATE DICTIONARY test_dictionary_custom_query
(
id UInt64,
key String,
value_1 String,
value_2 String
)
PRIMARY KEY id, key
LAYOUT(COMPLEX_KEY_DIRECT())
SOURCE(PostgreSQL(
DB 'clickhouse'
HOST '{}'
PORT {}
USER 'postgres'
PASSWORD 'mysecretpassword'
QUERY $doc$SELECT id, key, value_1, value_2 FROM test_table_1 INNER JOIN test_table_2 USING (id, key) WHERE {{condition}};$doc$))
""".format(
started_cluster.postgres_ip, started_cluster.postgres_port
)
)
result = query(
"SELECT dictGet('test_dictionary_custom_query', ('value_1', 'value_2'), (toUInt64(1), 'Key'))"
)
assert result == "('Value_1','Value_2')\n"
query("DROP DICTIONARY test_dictionary_custom_query;")
cursor.execute("DROP TABLE test_table_2;")
cursor.execute("DROP TABLE test_table_1;")
def test_invalidate_query(started_cluster):
conn = get_postgres_conn(
ip=started_cluster.postgres_ip,
database=True,
port=started_cluster.postgres_port,
)
cursor = conn.cursor()
table_name = "test0"
create_and_fill_postgres_table(
cursor,
table_name,
port=started_cluster.postgres_port,
host=started_cluster.postgres_ip,
)
# invalidate query: SELECT value FROM test0 WHERE id = 0
dict_name = "dict0"
create_dict(table_name)
node1.query("SYSTEM RELOAD DICTIONARY {}".format(dict_name))
assert (
node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)
)
== "0\n"
)
assert (
node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(1))".format(dict_name)
)
== "1\n"
)
# update should happen
cursor.execute("UPDATE {} SET value=value+1 WHERE id = 0".format(table_name))
while True:
result = node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)
)
if result != "0\n":
break
assert (
node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)
)
== "1\n"
)
# no update should happen
cursor.execute("UPDATE {} SET value=value*2 WHERE id != 0".format(table_name))
time.sleep(5)
assert (
node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)
)
== "1\n"
)
assert (
node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(1))".format(dict_name)
)
== "1\n"
)
# update should happen
cursor.execute("UPDATE {} SET value=value+1 WHERE id = 0".format(table_name))
time.sleep(5)
assert (
node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)
)
== "2\n"
)
assert (
node1.query(
"SELECT dictGetUInt32('{}', 'value', toUInt64(1))".format(dict_name)
)
== "2\n"
)
node1.query("DROP TABLE IF EXISTS {}".format(table_name))
node1.query("DROP DICTIONARY IF EXISTS {}".format(dict_name))
cursor.execute("DROP TABLE IF EXISTS {}".format(table_name))
def test_dictionary_with_replicas(started_cluster):
conn1 = get_postgres_conn(
ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True,
)
cursor1 = conn1.cursor()
conn2 = get_postgres_conn(
ip=started_cluster.postgres2_ip,
port=started_cluster.postgres_port,
database=True,
)
cursor2 = conn2.cursor()
create_postgres_table(cursor1, "test1")
create_postgres_table(cursor2, "test1")
cursor1.execute(
"INSERT INTO test1 select i, i, i from generate_series(0, 99) as t(i);"
)
cursor2.execute(
"INSERT INTO test1 select i, i, i from generate_series(100, 199) as t(i);"
)
create_dict("test1", 1)
result = node1.query("SELECT * FROM `test`.`dict_table_test1` ORDER BY key")
# priority 0 - non running port
assert node1.contains_in_log("PostgreSQLConnectionPool: Connection error*")
# priority 1 - postgres2, table contains rows with values 100-200
# priority 2 - postgres1, table contains rows with values 0-100
expected = node1.query("SELECT number, number FROM numbers(100, 100)")
assert result == expected
cursor1.execute("DROP TABLE IF EXISTS test1")
cursor2.execute("DROP TABLE IF EXISTS test1")
node1.query("DROP TABLE IF EXISTS test1")
node1.query("DROP DICTIONARY IF EXISTS dict1")
def test_postgres_schema(started_cluster):
conn = get_postgres_conn(
ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True,
)
cursor = conn.cursor()
cursor.execute("CREATE SCHEMA test_schema")
cursor.execute("CREATE TABLE test_schema.test_table (id integer, value integer)")
cursor.execute(
"INSERT INTO test_schema.test_table SELECT i, i FROM generate_series(0, 99) as t(i)"
)
node1.query(
"""
DROP DICTIONARY IF EXISTS postgres_dict;
CREATE DICTIONARY postgres_dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(POSTGRESQL(
port 5432
host 'postgres1'
user 'postgres'
password 'mysecretpassword'
db 'clickhouse'
table 'test_schema.test_table'))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
"""
)
result = node1.query("SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(1))")
assert int(result.strip()) == 1
result = node1.query("SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(99))")
assert int(result.strip()) == 99
node1.query("DROP DICTIONARY IF EXISTS postgres_dict")
def test_predefined_connection_configuration(started_cluster):
conn = get_postgres_conn(
ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True,
)
cursor = conn.cursor()
cursor.execute("DROP TABLE IF EXISTS test_table")
cursor.execute("CREATE TABLE test_table (id integer, value integer)")
cursor.execute(
"INSERT INTO test_table SELECT i, i FROM generate_series(0, 99) as t(i)"
)
node1.query(
"""
DROP DICTIONARY IF EXISTS postgres_dict;
CREATE DICTIONARY postgres_dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(POSTGRESQL(NAME postgres1))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
"""
)
result = node1.query("SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(99))")
assert int(result.strip()) == 99
cursor.execute("DROP SCHEMA IF EXISTS test_schema CASCADE")
cursor.execute("CREATE SCHEMA test_schema")
cursor.execute("CREATE TABLE test_schema.test_table (id integer, value integer)")
cursor.execute(
"INSERT INTO test_schema.test_table SELECT i, 100 FROM generate_series(0, 99) as t(i)"
)
node1.query(
"""
DROP DICTIONARY postgres_dict;
CREATE DICTIONARY postgres_dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(POSTGRESQL(NAME postgres1 SCHEMA test_schema))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
"""
)
result = node1.query("SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(99))")
assert int(result.strip()) == 100
node1.query(
"""
DROP DICTIONARY postgres_dict;
CREATE DICTIONARY postgres_dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(POSTGRESQL(NAME postgres2))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
"""
)
result = node1.query("SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(99))")
assert int(result.strip()) == 100
node1.query("DROP DICTIONARY postgres_dict")
node1.query(
"""
CREATE DICTIONARY postgres_dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(POSTGRESQL(NAME postgres4))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
"""
)
result = node1.query_and_get_error(
"SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(99))"
)
node1.query(
"""
DROP DICTIONARY postgres_dict;
CREATE DICTIONARY postgres_dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(POSTGRESQL(NAME postgres1 PORT 5432))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
"""
)
result = node1.query("SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(99))")
assert int(result.strip()) == 99
def test_bad_configuration(started_cluster):
conn = get_postgres_conn(
ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True,
)
cursor = conn.cursor()
node1.query(
"""
DROP DICTIONARY IF EXISTS postgres_dict;
CREATE DICTIONARY postgres_dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(POSTGRESQL(
port 5432
host 'postgres1'
user 'postgres'
password 'mysecretpassword'
dbbb 'clickhouse'
table 'test_schema.test_table'))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
"""
)
node1.query_and_get_error(
"SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(1))"
)
assert node1.contains_in_log("Unexpected key `dbbb`")
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")
cluster.shutdown()