ClickHouse/tests/integration/test_dictionaries_postgresql/test.py

194 lines
7.9 KiB
Python
Raw Normal View History

2020-12-09 21:14:16 +00:00
import pytest
import time
import psycopg2
2021-05-07 09:51:40 +00:00
from multiprocessing.dummy import Pool
2020-12-21 19:20:56 +00:00
2020-12-09 21:14:16 +00:00
from helpers.cluster import ClickHouseCluster
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
cluster = ClickHouseCluster(__file__)
# node1 loads the shared server config plus the PostgreSQL dictionary definitions.
# with_postgres / with_postgres_cluster presumably provide the postgres1/postgres2
# servers whose addresses the fixture reads (cluster.postgres_ip / postgres2_ip).
node1 = cluster.add_instance(
    'node1',
    main_configs=[
        'configs/config.xml',
        'configs/dictionaries/postgres_dict.xml',
    ],
    with_postgres=True,
    with_postgres_cluster=True,
)
2020-12-09 21:14:16 +00:00
# PostgreSQL DDL for a dictionary source table; the single placeholder is the table name.
postgres_dict_table_template = (
    "\n"
    "CREATE TABLE IF NOT EXISTS {} (\n"
    "id Integer NOT NULL, value Integer NOT NULL, PRIMARY KEY (id))\n"
)
# ClickHouse DDL exposing a dictionary as a table in database `test`.
# Placeholders: (1) suffix of the table name, (2) dictionary name passed to Dictionary().
click_dict_table_template = (
    "\n"
    "CREATE TABLE IF NOT EXISTS `test`.`dict_table_{}` (\n"
    "`id` UInt64, `value` UInt32\n"
    ") ENGINE = Dictionary({})\n"
)
2021-03-19 16:44:08 +00:00
def get_postgres_conn(ip, port, database=False):
    """Open an autocommit psycopg2 connection to a PostgreSQL server as user `postgres`.

    :param ip: server host/IP.
    :param port: server port.
    :param database: when truthy, connect to the `clickhouse` database; otherwise
        connect without a dbname (server default).
    :returns: a psycopg2 connection with autocommit enabled.
    """
    if database:
        conn_string = "host={} port={} dbname='clickhouse' user='postgres' password='mysecretpassword'".format(ip, port)
    else:
        conn_string = "host={} port={} user='postgres' password='mysecretpassword'".format(ip, port)
    conn = psycopg2.connect(conn_string)
    # CREATE DATABASE cannot run inside a transaction block, so every statement
    # issued through this connection is committed immediately. Setting the
    # `autocommit` attribute is equivalent to ISOLATION_LEVEL_AUTOCOMMIT.
    conn.autocommit = True
    return conn
def create_postgres_db(conn, name):
    """Execute ``CREATE DATABASE <name>`` through a fresh cursor on *conn*.

    PostgreSQL rejects CREATE DATABASE inside a transaction block, so *conn* is
    expected to be in autocommit mode (as returned by get_postgres_conn).
    """
    statement = "CREATE DATABASE {}".format(name)
    conn.cursor().execute(statement)
def create_postgres_table(cursor, table_name):
    """Create *table_name* (integer `id` primary key, integer `value`) unless it already exists."""
    ddl = postgres_dict_table_template.format(table_name)
    cursor.execute(ddl)
def create_and_fill_postgres_table(cursor, table_name, port, host):
    """Create *table_name* in PostgreSQL and load 10000 rows (n, n) into it via ClickHouse.

    The rows are written through ClickHouse's ``postgresql`` table function on node1
    (database `clickhouse` at ``host:port``). The row count is then read back
    through the same table function and asserted to be 10000.
    """
    create_postgres_table(cursor, table_name)

    pg_function = '''postgresql('{}:{}', 'clickhouse', '{}', 'postgres', 'mysecretpassword')'''.format(host, port, table_name)
    insert_query = "INSERT INTO TABLE FUNCTION {} SELECT number, number from numbers(10000)\n".format(pg_function)
    node1.query(insert_query)

    row_count = node1.query("SELECT count() FROM {}".format(pg_function)).rstrip()
    assert row_count == '10000'
def create_dict(table_name, index=0):
    """Create `test`.`dict_table_<table_name>` backed by the dictionary named ``dict<index>``."""
    dictionary = 'dict' + str(index)
    ddl = click_dict_table_template.format(table_name, dictionary)
    node1.query(ddl)
2020-12-09 21:14:16 +00:00
@pytest.fixture(scope="module")
def started_cluster():
    """Start the cluster once per module and prepare both PostgreSQL servers.

    Creates ClickHouse database `test` on node1 and database `clickhouse` on each
    PostgreSQL server, then yields the cluster. The cluster is shut down after
    the module's tests finish, or if setup fails.
    """
    try:
        cluster.start()
        node1.query("CREATE DATABASE IF NOT EXISTS test")
        servers = (("postgres1", cluster.postgres_ip), ("postgres2", cluster.postgres2_ip))
        for label, ip in servers:
            postgres_conn = get_postgres_conn(ip=ip, port=cluster.postgres_port)
            try:
                print("{} connected".format(label))
                create_postgres_db(postgres_conn, 'clickhouse')
            finally:
                # The connection is only needed for setup; close it instead of leaking it.
                postgres_conn.close()
        yield cluster
    finally:
        cluster.shutdown()
def test_load_dictionaries(started_cluster):
    """Load `dict0` from a 10000-row PostgreSQL table and check count and lookups.

    `dict0` is not created here; it is presumably defined in
    configs/dictionaries/postgres_dict.xml with table `test0` as its source.
    """
    conn = get_postgres_conn(ip=started_cluster.postgres_ip, database=True, port=started_cluster.postgres_port)
    cursor = conn.cursor()
    table_name = 'test0'
    dict_name = 'dict0'
    try:
        create_and_fill_postgres_table(cursor, table_name, port=started_cluster.postgres_port, host=started_cluster.postgres_ip)
        create_dict(table_name)
        node1.query("SYSTEM RELOAD DICTIONARY {}".format(dict_name))

        assert node1.query("SELECT count() FROM `test`.`dict_table_{}`".format(table_name)).rstrip() == '10000'
        assert node1.query("SELECT dictGetUInt32('{}', 'id', toUInt64(0))".format(dict_name)) == '0\n'
        assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(9999))".format(dict_name)) == '9999\n'
    finally:
        # Clean up even on failure: a leftover PostgreSQL table would make the next
        # test's refill of `test0` hit duplicate primary keys.
        cursor.execute("DROP TABLE IF EXISTS {}".format(table_name))
        # create_dict() created the table in database `test` with a `dict_table_` prefix.
        node1.query("DROP TABLE IF EXISTS `test`.`dict_table_{}`".format(table_name))
        node1.query("DROP DICTIONARY IF EXISTS {}".format(dict_name))
        conn.close()
2020-12-09 21:14:16 +00:00
def test_invalidate_query(started_cluster):
    """Check that `dict0` reloads only when its invalidate query result changes.

    The invalidate query is documented here as ``SELECT value FROM test0 WHERE id = 0``.
    Changing row 0 must therefore trigger a reload. Changing only other rows must not.
    """
    conn = get_postgres_conn(ip=started_cluster.postgres_ip, database=True, port=started_cluster.postgres_port)
    cursor = conn.cursor()
    table_name = 'test0'
    dict_name = 'dict0'

    def dict_value(key):
        """Return the raw query output of dictGetUInt32(dict_name, 'value', key), trailing newline included."""
        return node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64({}))".format(dict_name, key))

    def wait_until_changed(key, old_value, timeout=60.0, interval=0.5):
        """Poll the value for *key* until it differs from *old_value* or *timeout* seconds pass.

        Returns the last observed value. On timeout, the caller's assert then fails
        with that value instead of the test spinning forever.
        NOTE(review): assumes dict0's reload interval (LIFETIME in
        postgres_dict.xml) is well under *timeout* — confirm.
        """
        deadline = time.monotonic() + timeout
        value = dict_value(key)
        while value == old_value and time.monotonic() < deadline:
            time.sleep(interval)
            value = dict_value(key)
        return value

    try:
        create_and_fill_postgres_table(cursor, table_name, port=started_cluster.postgres_port, host=started_cluster.postgres_ip)

        # invalidate query: SELECT value FROM test0 WHERE id = 0
        create_dict(table_name)
        node1.query("SYSTEM RELOAD DICTIONARY {}".format(dict_name))
        assert dict_value(0) == "0\n"
        assert dict_value(1) == "1\n"

        # update should happen: the invalidate query result changes
        cursor.execute("UPDATE {} SET value=value+1 WHERE id = 0".format(table_name))
        assert wait_until_changed(0, '0\n') == '1\n'

        # no update should happen: row 0 (the invalidate query result) is unchanged
        cursor.execute("UPDATE {} SET value=value*2 WHERE id != 0".format(table_name))
        time.sleep(5)
        assert dict_value(0) == '1\n'
        assert dict_value(1) == '1\n'

        # update should happen: row 0 changes again, so the earlier doubling of the
        # other rows becomes visible as well
        cursor.execute("UPDATE {} SET value=value+1 WHERE id = 0".format(table_name))
        assert wait_until_changed(0, '1\n') == '2\n'
        assert dict_value(1) == '2\n'
    finally:
        # create_dict() created the table in database `test` with a `dict_table_` prefix.
        node1.query("DROP TABLE IF EXISTS `test`.`dict_table_{}`".format(table_name))
        node1.query("DROP DICTIONARY IF EXISTS {}".format(dict_name))
        cursor.execute("DROP TABLE IF EXISTS {}".format(table_name))
        conn.close()
def test_dictionary_with_replicas(started_cluster):
    """Check that `dict1` reads from the highest-priority reachable PostgreSQL replica.

    postgres1 holds rows 0-99 and postgres2 holds rows 100-199 in `test1`. According
    to the priority comments below, the dictionary must end up serving postgres2's rows.
    """
    conn1 = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
    cursor1 = conn1.cursor()
    conn2 = get_postgres_conn(ip=started_cluster.postgres2_ip, port=started_cluster.postgres_port, database=True)
    cursor2 = conn2.cursor()
    try:
        create_postgres_table(cursor1, 'test1')
        create_postgres_table(cursor2, 'test1')

        cursor1.execute('INSERT INTO test1 select i, i from generate_series(0, 99) as t(i);')
        cursor2.execute('INSERT INTO test1 select i, i from generate_series(100, 199) as t(i);')

        create_dict('test1', 1)
        result = node1.query("SELECT * FROM `test`.`dict_table_test1` ORDER BY id")

        # priority 0 - non running port
        assert node1.contains_in_log('PostgreSQLConnectionPool: Connection error*')

        # priority 1 - postgres2, table contains rows with values 100-199
        # priority 2 - postgres1, table contains rows with values 0-99
        expected = node1.query("SELECT number, number FROM numbers(100, 100)")
        assert result == expected
    finally:
        cursor1.execute("DROP TABLE IF EXISTS test1")
        cursor2.execute("DROP TABLE IF EXISTS test1")
        # create_dict() created the table in database `test` with a `dict_table_` prefix.
        node1.query("DROP TABLE IF EXISTS `test`.`dict_table_test1`")
        node1.query("DROP DICTIONARY IF EXISTS dict1")
        conn1.close()
        conn2.close()
2020-12-09 21:14:16 +00:00
2021-05-09 15:42:54 +00:00
def test_postgres_scema(started_cluster):
    """Check that a DDL PostgreSQL dictionary can use a schema-qualified source table.

    Creates `test_schema.test_table` with rows (i, i) for i in 0..99 and defines
    `postgres_dict` over it. It then checks two lookups and drops both the
    dictionary and the schema so the test can be re-run.
    """
    conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
    cursor = conn.cursor()
    try:
        cursor.execute('CREATE SCHEMA test_schema')
        cursor.execute('CREATE TABLE test_schema.test_table (id integer, value integer)')
        cursor.execute('INSERT INTO test_schema.test_table SELECT i, i FROM generate_series(0, 99) as t(i)')

        node1.query('''
CREATE DICTIONARY postgres_dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(POSTGRESQL(
port 5432
host 'postgres1'
user 'postgres'
password 'mysecretpassword'
db 'clickhouse'
table 'test_schema.test_table'))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
''')

        result = node1.query("SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(1))")
        assert int(result.strip()) == 1
        result = node1.query("SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(99))")
        assert int(result.strip()) == 99
    finally:
        node1.query("DROP DICTIONARY IF EXISTS postgres_dict")
        # Without this, the unconditional CREATE SCHEMA above fails on any re-run
        # against the same server.
        cursor.execute('DROP SCHEMA IF EXISTS test_schema CASCADE')
        conn.close()
2020-12-09 21:14:16 +00:00
if __name__ == '__main__':
    # Manual mode: start the cluster and keep it up until the user presses Enter.
    cluster.start()
    try:
        input("Cluster created, press any key to destroy...")
    finally:
        # Shut down even on Ctrl-C / EOF so containers are not left running.
        cluster.shutdown()