ClickHouse/tests/integration/test_dictionaries_update_field/test.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

138 lines
3.9 KiB
Python
Raw Normal View History

## sudo -H pip install PyMySQL
import time
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseKiller
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("main_node", main_configs=[])
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
node.query(
"""
CREATE TABLE table_for_update_field_dictionary
(
key UInt64,
value String,
last_insert_time DateTime
)
ENGINE = TinyLog;
"""
)
yield cluster
finally:
cluster.shutdown()
def dictionary_update_field_actions(dictionary_name):
node.query(
"INSERT INTO table_for_update_field_dictionary VALUES (1, 'First', now());"
)
query_result = node.query(
"SELECT key, value FROM {dictionary_name} ORDER BY key ASC".format(
dictionary_name=dictionary_name
)
)
assert query_result == "1\tFirst\n"
node.query(
"INSERT INTO table_for_update_field_dictionary VALUES (2, 'Second', now());"
)
time.sleep(10)
query_result = node.query(
"SELECT key, value FROM {dictionary_name} ORDER BY key ASC".format(
dictionary_name=dictionary_name
)
)
assert query_result == "1\tFirst\n2\tSecond\n"
node.query(
"INSERT INTO table_for_update_field_dictionary VALUES (2, 'SecondUpdated', now());"
)
node.query(
"INSERT INTO table_for_update_field_dictionary VALUES (3, 'Third', now());"
)
time.sleep(10)
query_result = node.query(
"SELECT key, value FROM {dictionary_name} ORDER BY key ASC".format(
dictionary_name=dictionary_name
)
)
assert query_result == "1\tFirst\n2\tSecondUpdated\n3\tThird\n"
node.query("TRUNCATE TABLE table_for_update_field_dictionary")
node.query(
"DROP DICTIONARY {dictionary_name}".format(dictionary_name=dictionary_name)
)
@pytest.mark.parametrize(
"dictionary_name,dictionary_type",
[
("flat_update_field_dictionary", "FLAT"),
("simple_key_hashed_update_field_dictionary", "HASHED"),
("complex_key_hashed_update_field_dictionary", "COMPLEX_KEY_HASHED"),
],
)
def test_update_field(started_cluster, dictionary_name, dictionary_type):
create_dictionary_query = """
CREATE DICTIONARY {dictionary_name}
(
key UInt64,
value String,
last_insert_time DateTime
)
PRIMARY KEY key
SOURCE(CLICKHOUSE(table 'table_for_update_field_dictionary' update_field 'last_insert_time'))
LAYOUT({dictionary_type}())
LIFETIME(1);
""".format(
dictionary_name=dictionary_name, dictionary_type=dictionary_type
)
node.query(create_dictionary_query)
dictionary_update_field_actions(dictionary_name)
@pytest.mark.parametrize(
"dictionary_name,dictionary_type",
[
("flat_update_field_dictionary", "FLAT"),
("simple_key_hashed_update_field_dictionary", "HASHED"),
("complex_key_hashed_update_field_dictionary", "COMPLEX_KEY_HASHED"),
],
)
def test_update_field_custom_query(started_cluster, dictionary_name, dictionary_type):
create_dictionary_query = """
CREATE DICTIONARY {dictionary_name}
(
key UInt64,
value String,
last_insert_time DateTime
)
PRIMARY KEY key
SOURCE(CLICKHOUSE(query $doc$SELECT key, value, last_insert_time FROM table_for_update_field_dictionary WHERE {{condition}};$doc$ update_field 'last_insert_time'))
LAYOUT({dictionary_type}())
LIFETIME(1);
""".format(
dictionary_name=dictionary_name, dictionary_type=dictionary_type
)
node.query(create_dictionary_query)
dictionary_update_field_actions(dictionary_name)