This commit is contained in:
Nikita Fomichev 2024-11-27 18:59:25 +01:00 committed by GitHub
commit 7cdfd973d6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 1367 additions and 14 deletions

View File

@ -101,3 +101,4 @@ wadllib==1.3.6
websocket-client==1.8.0
wheel==0.38.1
zipp==1.0.0
jinja2==3.1.3

View File

@ -157,13 +157,14 @@ For your convenience, the old documentation is located [here](https://pastila.nl
## Refreshable Materialized View [Experimental] {#refreshable-materialized-view}
```sql
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
REFRESH EVERY|AFTER interval [OFFSET interval]
RANDOMIZE FOR interval
DEPENDS ON [db.]name [, [db.]name [, ...]]
SETTINGS name = value [, name = value [, ...]]
[RANDOMIZE FOR interval]
[DEPENDS ON [db.]name [, [db.]name [, ...]]]
[SETTINGS name = value [, name = value [, ...]]]
[APPEND]
[TO[db.]name] [(columns)] [ENGINE = engine] [EMPTY]
[TO[db.]name] [(columns)] [ENGINE = engine]
[EMPTY]
AS SELECT ...
[COMMENT 'comment']
```
@ -281,7 +282,7 @@ This replaces *all* refresh parameters at once: schedule, dependencies, settings
The status of all refreshable materialized views is available in table [`system.view_refreshes`](../../../operations/system-tables/view_refreshes.md). In particular, it contains refresh progress (if running), last and next refresh time, exception message if a refresh failed.
To manually stop, start, trigger, or cancel refreshes use [`SYSTEM STOP|START|REFRESH|CANCEL VIEW`](../system.md#refreshable-materialized-views).
To manually stop, start, trigger, or cancel refreshes use [`SYSTEM STOP|START|REFRESH|WAIT|CANCEL VIEW`](../system.md#refreshable-materialized-views).
To wait for a refresh to complete, use [`SYSTEM WAIT VIEW`](../system.md#refreshable-materialized-views). In particular, useful for waiting for initial refresh after creating a view.

View File

@ -44,7 +44,10 @@ sudo -H pip install \
hypothesis \
pyhdfs \
pika \
nats-py
nats-py \
pandas \
numpy \
jinja2
```
(highly not recommended) If you really want to use OS packages on modern debian/ubuntu instead of "pip": `sudo apt install -y docker.io docker-compose-v2 python3-pytest python3-dicttoxml python3-docker python3-pymysql python3-protobuf python3-pymongo python3-tzlocal python3-kazoo python3-psycopg2 kafka-python3 python3-pytest-timeout python3-minio`

View File

@ -4,6 +4,9 @@ import subprocess as sp
import tempfile
from threading import Timer
import numpy as np
import pandas as pd
DEFAULT_QUERY_TIMEOUT = 600
@ -59,6 +62,7 @@ class Client:
host=None,
ignore_error=False,
query_id=None,
parse=False,
):
return self.get_query_request(
sql,
@ -71,6 +75,7 @@ class Client:
host=host,
ignore_error=ignore_error,
query_id=query_id,
parse=parse,
).get_answer()
def get_query_request(
@ -85,6 +90,7 @@ class Client:
host=None,
ignore_error=False,
query_id=None,
parse=False,
):
command = self.command[:]
@ -107,8 +113,10 @@ class Client:
command += ["--host", host]
if query_id is not None:
command += ["--query_id", query_id]
if parse:
command += ["--format=TabSeparatedWithNames"]
return CommandRequest(command, stdin, timeout, ignore_error)
return CommandRequest(command, stdin, timeout, ignore_error, parse)
@stacktraces_on_timeout_decorator
def query_and_get_error(
@ -169,7 +177,9 @@ class QueryRuntimeException(Exception):
class CommandRequest:
def __init__(self, command, stdin=None, timeout=None, ignore_error=False):
def __init__(
self, command, stdin=None, timeout=None, ignore_error=False, parse=False
):
# Write data to tmp file to avoid PIPEs and execution blocking
stdin_file = tempfile.TemporaryFile(mode="w+")
stdin_file.write(stdin)
@ -177,7 +187,7 @@ class CommandRequest:
self.stdout_file = tempfile.TemporaryFile()
self.stderr_file = tempfile.TemporaryFile()
self.ignore_error = ignore_error
self.parse = parse
# print " ".join(command)
# we suppress stderr on the client because sometimes thread sanitizer
@ -243,6 +253,15 @@ class CommandRequest:
stderr,
)
if self.parse:
from io import StringIO
return (
pd.read_csv(StringIO(stdout), sep="\t")
.replace(r"\N", None)
.replace(np.nan, None)
)
return stdout
def get_error(self):

View File

@ -48,6 +48,7 @@ except Exception as e:
import docker
from dict2xml import dict2xml
from docker.models.containers import Container
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException
from minio import Minio
@ -523,7 +524,7 @@ class ClickHouseCluster:
self.base_jdbc_bridge_cmd = []
self.base_redis_cmd = []
self.pre_zookeeper_commands = []
self.instances = {}
self.instances: dict[str, ClickHouseInstance] = {}
self.with_zookeeper = False
self.with_zookeeper_secure = False
self.with_mysql_client = False
@ -763,7 +764,7 @@ class ClickHouseCluster:
self.prometheus_remote_read_handler_port = 9092
self.prometheus_remote_read_handler_path = "/read"
self.docker_client = None
self.docker_client: docker.DockerClient = None
self.is_up = False
self.env = os.environ.copy()
logging.debug(f"CLUSTER INIT base_config_dir:{self.base_config_dir}")
@ -954,7 +955,7 @@ class ClickHouseCluster:
except:
pass
def get_docker_handle(self, docker_id):
def get_docker_handle(self, docker_id) -> Container:
exception = None
for i in range(20):
try:
@ -3351,6 +3352,12 @@ class ClickHouseCluster:
logging.info("Starting zookeeper node: %s", n)
subprocess_check_call(self.base_zookeeper_cmd + ["start", n])
def query_all_nodes(self, sql, *args, **kwargs):
    """Run ``sql`` on every instance of the cluster.

    Returns a dict mapping instance name -> query result. Errors are
    ignored per instance by default so one failing node does not abort
    the whole sweep; the caller may override ``ignore_error``.
    """
    # setdefault instead of a hard-coded keyword: the original call
    # `instance.query(sql, ignore_error=True, *args, **kwargs)` raised
    # TypeError (duplicate keyword argument) whenever the caller passed
    # its own ignore_error in kwargs.
    kwargs.setdefault("ignore_error", True)
    return {
        name: instance.query(sql, *args, **kwargs)
        for name, instance in self.instances.items()
    }
DOCKER_COMPOSE_TEMPLATE = """---
services:
@ -3626,6 +3633,7 @@ class ClickHouseInstance:
host=None,
ignore_error=False,
query_id=None,
parse=False,
):
sql_for_log = ""
if len(sql) > 1000:
@ -3644,6 +3652,7 @@ class ClickHouseInstance:
ignore_error=ignore_error,
query_id=query_id,
host=host,
parse=parse,
)
def query_with_retry(
@ -3660,6 +3669,7 @@ class ClickHouseInstance:
retry_count=20,
sleep_time=0.5,
check_callback=lambda x: True,
parse=False,
):
# logging.debug(f"Executing query {sql} on {self.name}")
result = None
@ -3676,6 +3686,7 @@ class ClickHouseInstance:
database=database,
host=host,
ignore_error=ignore_error,
parse=parse,
)
if check_callback(result):
return result
@ -4408,7 +4419,7 @@ class ClickHouseInstance:
else:
self.wait_start(time_left)
def get_docker_handle(self):
def get_docker_handle(self) -> Container:
return self.cluster.get_docker_handle(self.docker_id)
def stop(self):

View File

@ -182,3 +182,16 @@ def csv_compare(result, expected):
mismatch.append("+[%d]=%s" % (i, csv_result.lines[i]))
return "\n".join(mismatch)
def wait_condition(func, condition, max_attempts=10, delay=0.1):
    """Poll ``func`` until ``condition(result)`` is true.

    Calls ``func`` up to ``max_attempts`` times, sleeping ``delay`` seconds
    between attempts (but not after the last one). Returns the first result
    satisfying ``condition``; raises if none does.
    """
    for attempt in range(max_attempts):
        result = func()
        if condition(result):
            return result
        # No point sleeping after the final failed attempt.
        if attempt + 1 < max_attempts:
            time.sleep(delay)

    raise Exception(f"Function did not satisfy condition after {max_attempts} attempts")

View File

@ -0,0 +1,3 @@
<clickhouse>
<timezone>Etc/UTC</timezone>
</clickhouse>

View File

@ -0,0 +1,16 @@
<clickhouse>
<remote_servers>
<default>
<shard>
<replica>
<host>node1_1</host>
<port>9000</port>
</replica>
<replica>
<host>node1_2</host>
<port>9000</port>
</replica>
</shard>
</default>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,8 @@
<clickhouse>
<profiles>
<default>
<allow_experimental_refreshable_materialized_view>1</allow_experimental_refreshable_materialized_view>
<session_timezone>Etc/UTC</session_timezone>
</default>
</profiles>
</clickhouse>

View File

@ -0,0 +1,626 @@
import datetime
import logging
import time
from datetime import datetime
from typing import Optional
import pytest
from jinja2 import Environment, Template
import helpers.client
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import wait_condition
# Shared two-node cluster (one shard, two replicas) used by every test in
# this module; both instances load the same remote_servers/settings configs.
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
    "node1_1",
    main_configs=["configs/remote_servers.xml"],
    user_configs=["configs/settings.xml"],
    with_zookeeper=True,
    stay_alive=True,
    macros={"shard": 1, "replica": 1},
)
node2 = cluster.add_instance(
    "node1_2",
    main_configs=["configs/remote_servers.xml"],
    user_configs=["configs/settings.xml"],
    with_zookeeper=True,
    stay_alive=True,
    macros={"shard": 1, "replica": 2},
)
@pytest.fixture(scope="session", autouse=True)
def started_cluster():
    """Start the shared cluster once per session; always shut it down."""
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
"""
### TESTS
+1. Append mode
2. Restart node and wait for restore
+3. Simple functional testing: all values in refresh result correct (two and more rmv)
+4. Combinations of intervals
+5. Two (and more) rmv from single to single [APPEND]
+6. ALTER rmv ALTER TABLE [db.]name MODIFY REFRESH EVERY|AFTER ... [RANDOMIZE FOR ...] [DEPENDS ON ...] [SETTINGS ...]
+7. RMV without tgt table (automatic table) (check APPEND)
+8 DROP rmv
+9 CREATE - DROP - CREATE - ALTER
11. Long queries over refresh time (check settings)
13. incorrect intervals (interval 1 sec, offset 1 minute)
- OFFSET less than the period. 'EVERY 1 MONTH OFFSET 5 WEEK'
- cases below
+14. ALTER on cluster
15. write to distributed with / without APPEND
17. Not existent TO table (ON CLUSTER)
18. Not existent FROM table (ON CLUSTER)
19. Not existent BOTH tables (ON CLUSTER)
+20. retry failed
21. overflow with wait test
22. ON CLUSTER
+SYSTEM STOP|START|REFRESH|CANCEL VIEW
+SYSTEM WAIT VIEW [db.]name
"""
def j2_template(
    string: str,
    globals: Optional[dict] = None,
    filters: Optional[dict] = None,
    tests: Optional[dict] = None,
) -> Template:
    """Compile ``string`` into a jinja2 Template with test helpers installed.

    Registers the globals ``uppercase``/``lowercase`` and the filter
    ``format_settings`` (renders a dict as ``k1=v1, k2=v2``), then merges in
    any caller-supplied globals, filters and tests.
    """
    env = Environment(
        trim_blocks=False, lstrip_blocks=True, keep_trailing_newline=False
    )

    env.globals["uppercase"] = lambda value: value.upper()
    env.globals["lowercase"] = lambda value: value.lower()
    env.filters["format_settings"] = lambda items: ", ".join(
        f"{k}={v}" for k, v in items.items()
    )

    # Merge optional caller-provided registries on top of the defaults.
    for registry, extra in (
        (env.filters, filters),
        (env.globals, globals),
        (env.tests, tests),
    ):
        if extra:
            registry.update(extra)

    return env.from_string(string)
def assert_same_values(lst: list):
    """Assert that every element equals the first one (empty input passes)."""
    items = lst if isinstance(lst, list) else list(lst)
    for item in items:
        assert item == items[0]
# Common tail of both CREATE and ALTER statements: the refresh schedule plus
# each optional clause, emitted only when the corresponding template variable
# is truthy. `format_settings` is the custom filter registered in j2_template.
RMV_TEMPLATE = """{{ refresh_interval }}
{% if depends_on %}DEPENDS ON {{ depends_on|join(', ') }}{% endif %}
{% if settings %}SETTINGS {{ settings|format_settings }}{% endif %}
{% if with_append %}APPEND{% endif %}
{% if to_clause %}TO {{ to_clause }}{% endif %}
{% if table_clause %}{{ table_clause }}{% endif %}
{% if empty %}EMPTY{% endif %}
{% if select_query %} AS {{ select_query }}{% endif %}
"""

# CREATE MATERIALIZED VIEW ... REFRESH <schedule and clauses>.
CREATE_RMV = j2_template(
    """CREATE MATERIALIZED VIEW
{% if if_not_exists %}IF NOT EXISTS{% endif %}
{% if db %}{{db}}.{% endif %}{{ table_name }}
{% if on_cluster %}ON CLUSTER {{ on_cluster }}{% endif %}
REFRESH
"""
    + RMV_TEMPLATE
)

# ALTER TABLE ... MODIFY REFRESH <schedule and clauses>.
ALTER_RMV = j2_template(
    """ALTER TABLE
{% if db %}{{db}}.{% endif %}{{ table_name }}
{% if on_cluster %}ON CLUSTER {{ on_cluster }}{% endif %}
MODIFY REFRESH
"""
    + RMV_TEMPLATE
)
@pytest.fixture(scope="module", autouse=True)
def module_setup_tables(started_cluster):
    """Create the shared source/target tables and a dummy RMV once per module.

    Drops leftovers from previous runs first so the module is self-contained.
    """
    node.query("DROP DATABASE IF EXISTS test_db")
    node.query("CREATE DATABASE IF NOT EXISTS test_db ON CLUSTER default")

    # Single de-duplicated pass; the original issued the `test_rmv` drop twice.
    for table in ("test_rmv", "src1", "src2", "tgt1", "tgt2"):
        node.query(f"DROP TABLE IF EXISTS {table} ON CLUSTER default")
    node.query("DROP TABLE IF EXISTS test_db.test_rmv ON CLUSTER default")

    node.query(
        "CREATE TABLE src1 ON CLUSTER default (a DateTime, b UInt64) ENGINE = Memory"
    )
    node.query(
        "CREATE TABLE src2 ON CLUSTER default (a DateTime, b UInt64) ENGINE = Memory"
    )
    node.query(
        "CREATE TABLE tgt1 ON CLUSTER default (a DateTime, b UInt64) ENGINE = MergeTree ORDER BY tuple()"
    )
    node.query(
        "CREATE TABLE tgt2 ON CLUSTER default (a DateTime, b UInt64) ENGINE = Memory"
    )
    # dummy_rmv exists only to serve as a DEPENDS ON target in tests below.
    node.query(
        "CREATE MATERIALIZED VIEW IF NOT EXISTS dummy_rmv ON CLUSTER default "
        "REFRESH EVERY 10 HOUR engine Memory EMPTY AS select number as x from numbers(1)"
    )
@pytest.fixture(scope="function")
def fn_setup_tables():
    """Reset src1/tgt1 and drop the view before each test; seed src1 with 2 rows."""
    node.query("DROP TABLE IF EXISTS src1 ON CLUSTER default")
    node.query("DROP TABLE IF EXISTS tgt1 ON CLUSTER default")
    node.query("DROP TABLE IF EXISTS test_rmv ON CLUSTER default")
    node.query("DROP TABLE IF EXISTS test_db.test_rmv ON CLUSTER default")

    node.query(
        f"CREATE TABLE tgt1 ON CLUSTER default (a DateTime, b UInt64) ENGINE = MergeTree ORDER BY tuple()"
    )
    node.query(
        f"CREATE TABLE src1 ON CLUSTER default (a DateTime, b UInt64) ENGINE = Memory"
    )
    node.query(f"INSERT INTO src1 VALUES ('2020-01-01', 1), ('2020-01-02', 2)")
@pytest.mark.parametrize(
    "select_query",
    [
        "SELECT now() as a, number as b FROM numbers(2)",
        "SELECT now() as a, b as b FROM src1",
    ],
)
@pytest.mark.parametrize("with_append", [True, False])
@pytest.mark.parametrize("empty", [True, False])
def test_simple_append(
    module_setup_tables,
    fn_setup_tables,
    select_query,
    with_append,
    empty,
):
    """APPEND vs replace semantics: EMPTY skips the initial refresh; a (faked)
    scheduled refresh then either appends to or replaces the target rows."""
    create_sql = CREATE_RMV.render(
        table_name="test_rmv",
        refresh_interval="EVERY 1 HOUR",
        to_clause="tgt1",
        select_query=select_query,
        with_append=with_append,
        empty=empty,
    )
    node.query(create_sql)
    rmv = get_rmv_info(node, "test_rmv", wait_status="Scheduled")
    assert rmv["exception"] is None

    # With EMPTY the view is created without an initial refresh -> 0 rows.
    records = node.query("SELECT count() FROM test_rmv")
    if empty:
        assert records == "0\n"
    else:
        assert records == "2\n"

    # Fake the clock forward to the next scheduled refresh to trigger it.
    node.query(f"SYSTEM TEST VIEW test_rmv SET FAKE TIME '{rmv['next_refresh_time']}'")
    rmv2 = get_rmv_info(node, "test_rmv", wait_status="Scheduled")
    assert rmv2["exception"] is None

    # Only APPEND on a non-empty start accumulates (2 initial + 2 refreshed);
    # every other combination ends with exactly the 2 rows of one refresh.
    if empty:
        expect = "2\n"
    if not with_append:
        expect = "2\n"
    if with_append and not empty:
        expect = "4\n"
    records = node.query_with_retry(
        "SELECT count() FROM test_rmv", check_callback=lambda x: x == expect
    )
    assert records == expect
@pytest.mark.parametrize("with_append", [True, False])
@pytest.mark.parametrize("if_not_exists", [True, False])
@pytest.mark.parametrize("on_cluster", [True, False])
@pytest.mark.parametrize("depends_on", [None, ["default.dummy_rmv"]])
@pytest.mark.parametrize("empty", [True, False])
@pytest.mark.parametrize("database_name", [None, "test_db"])
@pytest.mark.parametrize(
    "settings",
    [
        {},
        {
            "refresh_retries": "10",
            "refresh_retry_initial_backoff_ms": "10",
            "refresh_retry_max_backoff_ms": "20",
        },
    ],
)
def test_alters(
    module_setup_tables,
    fn_setup_tables,
    with_append,
    if_not_exists,
    on_cluster,
    depends_on,
    empty,
    database_name,
    settings,
):
    """
    Check correctness of functional states of RMV after CREATE, DROP, ALTER, trigger of RMV, ...
    """
    create_sql = CREATE_RMV.render(
        table_name="test_rmv",
        if_not_exists=if_not_exists,
        db=database_name,
        refresh_interval="EVERY 1 HOUR",
        depends_on=depends_on,
        to_clause="tgt1",
        select_query="SELECT * FROM src1",
        with_append=with_append,
        on_cluster="default" if on_cluster else None,
        settings=settings,
    )
    node.query(create_sql)

    # Check same RMV is created on whole cluster
    def compare_DDL_on_all_nodes():
        show_create_all_nodes = cluster.query_all_nodes("SHOW CREATE test_rmv")

        # Without ON CLUSTER the view exists only on node1_1, so exclude the
        # second replica from the comparison.
        if not on_cluster:
            del show_create_all_nodes["node1_2"]

        assert_same_values(show_create_all_nodes.values())

    compare_DDL_on_all_nodes()

    # DROP + re-CREATE must reproduce identical DDL everywhere.
    maybe_db = f"{database_name}." if database_name else ""
    node.query(
        f"DROP TABLE {maybe_db}test_rmv {'ON CLUSTER default' if on_cluster else ''}"
    )
    node.query(create_sql)
    compare_DDL_on_all_nodes()

    show_create = node.query(f"SHOW CREATE {maybe_db}test_rmv")

    # ALTER with the identical refresh parameters must leave DDL unchanged.
    alter_sql = ALTER_RMV.render(
        table_name="test_rmv",
        if_not_exists=if_not_exists,
        db=database_name,
        refresh_interval="EVERY 1 HOUR",
        depends_on=depends_on,
        # can't change select with alter
        # select_query="SELECT * FROM src1",
        with_append=with_append,
        on_cluster="default" if on_cluster else None,
        settings=settings,
    )
    node.query(alter_sql)
    show_create_after_alter = node.query(f"SHOW CREATE {maybe_db}test_rmv")
    assert show_create == show_create_after_alter
    compare_DDL_on_all_nodes()
@pytest.mark.parametrize(
    "append",
    [True, False],
)
@pytest.mark.parametrize(
    "empty",
    [True, False],
)
@pytest.mark.parametrize(
    "to_clause",
    [
        # (inner table clause, TO clause, table that receives the data)
        (None, "tgt1", "tgt1"),
        ("Engine MergeTree ORDER BY tuple()", None, "test_rmv"),
    ],
)
def test_real_wait_refresh(
    fn_setup_tables,
    append,
    empty,
    to_clause,
):
    """End-to-end refresh cycle on a real 10-second schedule, plus
    SYSTEM STOP/START/REFRESH VIEW behaviour."""
    if node.is_built_with_sanitizer():
        pytest.skip("Disabled for sanitizers (too slow)")

    table_clause, to_clause_, tgt = to_clause

    create_sql = CREATE_RMV.render(
        table_name="test_rmv",
        refresh_interval="EVERY 10 SECOND",
        to_clause=to_clause_,
        table_clause=table_clause,
        select_query="SELECT now() as a, b FROM src1",
        with_append=append,
        empty=empty,
    )
    node.query(create_sql)
    rmv = get_rmv_info(node, "test_rmv")

    # src1 is seeded with exactly 2 rows by fn_setup_tables.
    expected_rows = 0
    if empty:
        expect_rows(expected_rows, table=tgt)
    else:
        expected_rows += 2
        expect_rows(expected_rows, table=tgt)

    # Wait until the first scheduled refresh has actually happened.
    rmv2 = get_rmv_info(
        node,
        "test_rmv",
        condition=lambda x: x["last_refresh_time"] == rmv["next_refresh_time"],
        # wait for refresh a little bit more than 10 seconds
        max_attempts=12,
        delay=1,
    )

    if append:
        expected_rows += 2
        expect_rows(expected_rows, table=tgt)
    else:
        expect_rows(2, table=tgt)

    # Refresh statistics for the 2-row source.
    assert rmv2["exception"] is None
    assert rmv2["status"] == "Scheduled"
    assert rmv2["last_success_time"] == rmv["next_refresh_time"]
    assert rmv2["last_refresh_time"] == rmv["next_refresh_time"]
    assert rmv2["retry"] == 0
    assert rmv2["read_rows"] == 2
    assert rmv2["read_bytes"] == 16
    assert rmv2["total_rows"] == 2
    assert rmv2["written_rows"] == 2
    assert rmv2["written_bytes"] == 24

    # STOP freezes the view: one full refresh period passes with no change.
    node.query("SYSTEM STOP VIEW test_rmv")
    time.sleep(12)
    rmv3 = get_rmv_info(node, "test_rmv")
    # no refresh happen
    assert rmv3["status"] == "Disabled"

    del rmv3["status"]
    del rmv2["status"]
    assert rmv3 == rmv2

    # START resumes scheduling; a refresh should follow shortly.
    node.query("SYSTEM START VIEW test_rmv")
    time.sleep(1)
    rmv4 = get_rmv_info(node, "test_rmv")

    if append:
        expected_rows += 2
        expect_rows(expected_rows, table=tgt)
    else:
        expect_rows(2, table=tgt)

    assert rmv4["exception"] is None
    assert rmv4["status"] == "Scheduled"
    assert rmv4["retry"] == 0
    # NOTE(review): the next five asserts re-check rmv2 (pre-STOP stats);
    # presumably rmv4 was intended — confirm.
    assert rmv2["read_rows"] == 2
    assert rmv2["read_bytes"] == 16
    assert rmv2["total_rows"] == 2
    assert rmv2["written_rows"] == 2
    assert rmv2["written_bytes"] == 24

    # A manual refresh adds one more 2-row batch in APPEND mode.
    node.query("SYSTEM REFRESH VIEW test_rmv")
    time.sleep(1)
    if append:
        expected_rows += 2
        expect_rows(expected_rows, table=tgt)
    else:
        expect_rows(2, table=tgt)
def get_rmv_info(
    node,
    table,
    condition=None,
    max_attempts=50,
    delay=0.3,
    wait_status=None,
):
    """Fetch the system.view_refreshes row for ``table`` as a dict.

    wait_status: if given, retry the query until `status` equals it.
    condition:   if given, additionally poll (via wait_condition) until the
                 parsed dict satisfies it.
    """

    def inner():
        # parse=True makes query_with_retry return a pandas DataFrame;
        # take the first (only) row as a dict.
        # NOTE(review): assumes the view already exists — an empty result set
        # would raise IndexError here; confirm callers guarantee creation.
        rmv_info = node.query_with_retry(
            f"SELECT * FROM system.view_refreshes WHERE view='{table}'",
            check_callback=(
                (lambda r: r.iloc[0]["status"] == wait_status)
                if wait_status
                else (lambda x: True)
            ),
            parse=True,
        ).to_dict("records")[0]

        # Timestamps come back as strings (or None); normalize to datetime.
        rmv_info["next_refresh_time"] = parse_ch_datetime(rmv_info["next_refresh_time"])
        rmv_info["last_success_time"] = parse_ch_datetime(rmv_info["last_success_time"])
        rmv_info["last_refresh_time"] = parse_ch_datetime(rmv_info["last_refresh_time"])
        logging.info(rmv_info)
        return rmv_info

    if condition:
        res = wait_condition(inner, condition, max_attempts=max_attempts, delay=delay)
        return res

    res = inner()
    return res
def parse_ch_datetime(date_str):
    """Parse a ClickHouse 'YYYY-MM-DD hh:mm:ss' string; None passes through."""
    return None if date_str is None else datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
def expect_rows(rows, table="test_rmv"):
    """Poll until ``table`` contains exactly ``rows`` rows, then assert it."""
    # parse=True yields a DataFrame, so len() is the row count.
    inserted_data = node.query_with_retry(
        f"SELECT * FROM {table}",
        parse=True,
        check_callback=lambda x: len(x) == rows,
        retry_count=100,
    )
    assert len(inserted_data) == rows
def test_long_query(fn_setup_tables):
    """A slow refresh must report intermediate progress in (0, 1) and finish
    with progress == 1 and no exception."""
    if node.is_built_with_sanitizer():
        pytest.skip("Disabled for sanitizers")

    create_sql = CREATE_RMV.render(
        table_name="test_rmv",
        refresh_interval="EVERY 1 HOUR",
        to_clause="tgt1",
        # max_block_size=1 makes sleep(1) fire per row -> ~10s refresh
        select_query="SELECT now() a, sleep(1) b from numbers(10) settings max_block_size=1",
        with_append=False,
        empty=True,
    )
    node.query(create_sql)

    rmv = get_rmv_info(node, "test_rmv")
    # Trigger the refresh by faking the clock forward.
    node.query(f"SYSTEM TEST VIEW test_rmv SET FAKE TIME '{rmv['next_refresh_time']}'")

    # Sample progress for up to 30s, collecting intermediate values.
    wait_seconds = 0
    progresses = []
    while wait_seconds < 30:
        rmv = get_rmv_info(node, "test_rmv")
        if rmv["progress"] == 1:
            break

        time.sleep(0.01)
        wait_seconds += 0.01

        # didn't start yet
        if rmv["status"] == "Scheduled" or rmv["progress"] == 0:
            continue

        assert rmv["status"] == "Running"
        assert 0 < rmv["read_rows"] < 100000000
        assert 0 < rmv["read_bytes"] < 1200000000
        progresses.append(rmv["progress"])

    assert rmv["progress"] == 1
    assert rmv["exception"] is None
    # At least one sample must have been taken mid-refresh.
    assert any(0 < p < 1 for p in progresses)
def test_long_query_cancel(fn_setup_tables):
    """SYSTEM CANCEL VIEW aborts a running refresh; with refresh_retries=0 the
    cancelled refresh is not retried, and the next scheduled one succeeds."""
    if node.is_built_with_sanitizer():
        pytest.skip("Disabled for sanitizers")

    create_sql = CREATE_RMV.render(
        table_name="test_rmv",
        refresh_interval="EVERY 3 SECONDS",
        to_clause="tgt1",
        select_query="SELECT now() a, sleep(1) b from numbers(5) settings max_block_size=1",
        with_append=False,
        empty=True,
        settings={"refresh_retries": "0"},
    )
    node.query(create_sql)

    # Catch the refresh mid-flight, then cancel it.
    get_rmv_info(node, "test_rmv", delay=0.1, max_attempts=1000, wait_status="Running")
    node.query("SYSTEM CANCEL VIEW test_rmv")
    rmv = get_rmv_info(node, "test_rmv", wait_status="Scheduled")
    assert rmv["status"] == "Scheduled"
    assert rmv["exception"] == "cancelled"
    assert rmv["last_success_time"] is None
    # The cancelled refresh must not have written anything.
    assert node.query("SELECT count() FROM tgt1") == "0\n"

    # Wait for the next scheduled refresh to run to completion.
    get_rmv_info(node, "test_rmv", delay=0.1, max_attempts=1000, wait_status="Running")
    get_rmv_info(
        node, "test_rmv", delay=0.1, max_attempts=1000, wait_status="Scheduled"
    )
    assert node.query("SELECT count() FROM tgt1") == "5\n"
@pytest.fixture(scope="function")
def fn3_setup_tables():
    """Minimal single-column target table for the failure/retry tests."""
    node.query("DROP TABLE IF EXISTS tgt1 ON CLUSTER default")
    node.query("DROP TABLE IF EXISTS test_rmv ON CLUSTER default")
    node.query("DROP TABLE IF EXISTS test_db.test_rmv ON CLUSTER default")
    node.query(f"CREATE TABLE tgt1 ON CLUSTER default (a DateTime) ENGINE = Memory")
def test_query_fail(fn3_setup_tables):
    """CREATE must fail on a query that cannot be analyzed, and must not
    leave a half-created view behind."""
    if node.is_built_with_sanitizer():
        pytest.skip("Disabled for sanitizers")

    create_sql = CREATE_RMV.render(
        table_name="test_rmv",
        refresh_interval="EVERY 1 HOUR",
        to_clause="tgt1",
        # Argument at index 1 for function throwIf must be constant
        select_query="SELECT throwIf(1, toString(rand())) a",
        with_append=False,
        empty=True,
        settings={
            "refresh_retries": "10",
        },
    )
    with pytest.raises(helpers.client.QueryRuntimeException) as exc:
        node.query(create_sql)
    assert "Argument at index 1 for function throwIf must be constant" in str(
        exc.value
    )
    # No trace of the view may remain in either system table.
    assert (
        node.query(f"SELECT count() FROM system.view_refreshes WHERE view='test_rmv'")
        == "0\n"
    )
    assert (
        node.query(f"SELECT count() FROM system.tables WHERE name='test_rmv'") == "0\n"
    )
def test_query_retry(fn3_setup_tables):
    """A permanently failing refresh is retried refresh_retries times; the
    retry counter ends at retries + 1 and the exception is preserved."""
    if node.is_built_with_sanitizer():
        pytest.skip("Disabled for sanitizers")

    create_sql = CREATE_RMV.render(
        table_name="test_rmv",
        refresh_interval="EVERY 2 SECOND",
        to_clause="tgt1",
        select_query="SELECT throwIf(1, '111') a",
        with_append=False,
        empty=True,
        settings={
            "refresh_retries": "10",
            "refresh_retry_initial_backoff_ms": "1",
            "refresh_retry_max_backoff_ms": "1",
        },
    )
    node.query(create_sql)
    # 10 retries + the initial attempt -> retry == 11 once it gives up.
    rmv = get_rmv_info(
        node,
        "test_rmv",
        delay=0.1,
        max_attempts=1000,
        condition=lambda x: x["retry"] == 11,
    )
    assert rmv["retry"] == 11
    assert "FUNCTION_THROW_IF_VALUE_IS_NON_ZERO" in rmv["exception"]

View File

@ -0,0 +1,3 @@
<clickhouse>
<timezone>Etc/UTC</timezone>
</clickhouse>

View File

@ -0,0 +1,16 @@
<clickhouse>
<remote_servers>
<default>
<shard>
<replica>
<host>node1_1</host>
<port>9000</port>
</replica>
<replica>
<host>node1_2</host>
<port>9000</port>
</replica>
</shard>
</default>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,8 @@
<clickhouse>
<profiles>
<default>
<allow_experimental_refreshable_materialized_view>1</allow_experimental_refreshable_materialized_view>
<session_timezone>Etc/UTC</session_timezone>
</default>
</profiles>
</clickhouse>

View File

@ -0,0 +1,625 @@
import datetime
import logging
import time
from datetime import datetime
from typing import Optional
import pytest
from jinja2 import Environment, Template
import helpers.client
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import wait_condition
# Two replicas of one shard; keeper feature flags are required by the
# Replicated database engine this module uses.
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
    "node1_1",
    main_configs=["configs/remote_servers.xml"],
    user_configs=["configs/settings.xml"],
    with_zookeeper=True,
    stay_alive=True,
    keeper_required_feature_flags=["multi_read", "create_if_not_exists"],
    macros={"shard": 1, "replica": 1},
)
node2 = cluster.add_instance(
    "node1_2",
    main_configs=["configs/remote_servers.xml"],
    user_configs=["configs/settings.xml"],
    with_zookeeper=True,
    stay_alive=True,
    keeper_required_feature_flags=["multi_read", "create_if_not_exists"],
    macros={"shard": 1, "replica": 2},
)
# Convenience list for operations issued on every replica.
nodes = [node, node2]
@pytest.fixture(scope="session", autouse=True)
def started_cluster():
    """Start the shared cluster once per session; always shut it down."""
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
"""
### TESTS
+1. Append mode
2. Restart node and wait for restore
+3. Simple functional testing: all values in refresh result correct (two and more rmv)
+4. Combinations of intervals
+5. Two (and more) rmv from single to single [APPEND]
+6. ALTER rmv ALTER TABLE [db.]name MODIFY REFRESH EVERY|AFTER ... [RANDOMIZE FOR ...] [DEPENDS ON ...] [SETTINGS ...]
+7. RMV without tgt table (automatic table) (check APPEND)
+8 DROP rmv
+9 CREATE - DROP - CREATE - ALTER
11. Long queries over refresh time (check settings)
13. incorrect intervals (interval 1 sec, offset 1 minute)
- OFFSET less than the period. 'EVERY 1 MONTH OFFSET 5 WEEK'
- cases below
+14. ALTER on cluster
15. write to distributed with / without APPEND
17. Not existent TO table (ON CLUSTER)
18. Not existent FROM table (ON CLUSTER)
19. Not existent BOTH tables (ON CLUSTER)
+20. retry failed
21. overflow with wait test
22. ON CLUSTER
+SYSTEM STOP|START|REFRESH|CANCEL VIEW
+SYSTEM WAIT VIEW [db.]name
"""
def j2_template(
    string: str,
    globals: Optional[dict] = None,
    filters: Optional[dict] = None,
    tests: Optional[dict] = None,
) -> Template:
    """Compile ``string`` into a jinja2 Template with test helpers installed.

    Registers the globals ``uppercase``/``lowercase`` and the filter
    ``format_settings`` (renders a dict as ``k1=v1, k2=v2``), then merges in
    any caller-supplied globals, filters and tests.
    """
    env = Environment(
        trim_blocks=False, lstrip_blocks=True, keep_trailing_newline=False
    )

    env.globals["uppercase"] = lambda value: value.upper()
    env.globals["lowercase"] = lambda value: value.lower()
    env.filters["format_settings"] = lambda items: ", ".join(
        f"{k}={v}" for k, v in items.items()
    )

    # Merge optional caller-provided registries on top of the defaults.
    for registry, extra in (
        (env.filters, filters),
        (env.globals, globals),
        (env.tests, tests),
    ):
        if extra:
            registry.update(extra)

    return env.from_string(string)
def assert_same_values(lst: list):
    """Assert that every element equals the first one (empty input passes)."""
    items = lst if isinstance(lst, list) else list(lst)
    for item in items:
        assert item == items[0]
# Common tail of both CREATE and ALTER statements: the refresh schedule plus
# each optional clause, emitted only when the corresponding template variable
# is truthy. `format_settings` is the custom filter registered in j2_template.
RMV_TEMPLATE = """{{ refresh_interval }}
{% if depends_on %}DEPENDS ON {{ depends_on|join(', ') }}{% endif %}
{% if settings %}SETTINGS {{ settings|format_settings }}{% endif %}
{% if with_append %}APPEND{% endif %}
{% if to_clause %}TO {{ to_clause }}{% endif %}
{% if table_clause %}{{ table_clause }}{% endif %}
{% if empty %}EMPTY{% endif %}
{% if select_query %} AS {{ select_query }}{% endif %}
"""

# CREATE MATERIALIZED VIEW ... REFRESH <schedule and clauses>.
CREATE_RMV = j2_template(
    """CREATE MATERIALIZED VIEW
{% if if_not_exists %}IF NOT EXISTS{% endif %}
{% if db %}{{db}}.{% endif %}{{ table_name }}
{% if on_cluster %}ON CLUSTER {{ on_cluster }}{% endif %}
REFRESH
"""
    + RMV_TEMPLATE
)

# ALTER TABLE ... MODIFY REFRESH <schedule and clauses>.
ALTER_RMV = j2_template(
    """ALTER TABLE
{% if db %}{{db}}.{% endif %}{{ table_name }}
{% if on_cluster %}ON CLUSTER {{ on_cluster }}{% endif %}
MODIFY REFRESH
"""
    + RMV_TEMPLATE
)
@pytest.fixture(scope="module", autouse=True)
def module_setup_tables(started_cluster):
    """Recreate `default` and `test_db` as Replicated databases on both nodes,
    then create the shared source/target tables and the dummy RMV."""
    # default is Atomic by default
    node.query(f"DROP DATABASE IF EXISTS default ON CLUSTER default SYNC")
    node.query(
        "CREATE DATABASE IF NOT EXISTS default ON CLUSTER default ENGINE=Replicated('/clickhouse/default/','{shard}','{replica}')"
    )
    # Both replicas must report the Replicated engine before proceeding.
    assert (
        node.query(
            f"SELECT engine FROM clusterAllReplicas(default, system.databases) where name='default'"
        )
        == "Replicated\nReplicated\n"
    )
    node.query(f"DROP DATABASE IF EXISTS test_db ON CLUSTER default SYNC")
    node.query(
        "CREATE DATABASE test_db ON CLUSTER default ENGINE=Replicated('/clickhouse/test_db/','{shard}','{replica}')"
    )
    assert (
        node.query(
            f"SELECT engine FROM clusterAllReplicas(default, system.databases) where name='test_db'"
        )
        == "Replicated\nReplicated\n"
    )
    node.query("DROP TABLE IF EXISTS src1 ON CLUSTER default")
    node.query("DROP TABLE IF EXISTS src2 ON CLUSTER default")
    node.query("DROP TABLE IF EXISTS tgt1 ON CLUSTER default")
    node.query("DROP TABLE IF EXISTS tgt2 ON CLUSTER default")
    node.query("DROP TABLE IF EXISTS test_rmv ON CLUSTER default")
    node.query("DROP TABLE IF EXISTS test_db.test_rmv")
    node.query(
        f"CREATE TABLE src1 ON CLUSTER default (a DateTime, b UInt64) ENGINE = ReplicatedMergeTree() ORDER BY tuple()"
    )
    node.query(
        f"CREATE TABLE src2 ON CLUSTER default (a DateTime, b UInt64) ENGINE = ReplicatedMergeTree() ORDER BY tuple()"
    )
    node.query(
        f"CREATE TABLE tgt1 ON CLUSTER default (a DateTime, b UInt64) ENGINE = ReplicatedMergeTree() ORDER BY tuple()"
    )
    node.query(
        f"CREATE TABLE tgt2 ON CLUSTER default (a DateTime, b UInt64) ENGINE = ReplicatedMergeTree() ORDER BY tuple()"
    )
    # dummy_rmv exists only to serve as a DEPENDS ON target in tests below.
    node.query(
        f"CREATE MATERIALIZED VIEW IF NOT EXISTS dummy_rmv ON CLUSTER default "
        f"REFRESH EVERY 10 HOUR ENGINE = ReplicatedMergeTree() ORDER BY tuple() EMPTY AS select number as x from numbers(1)"
    )
@pytest.fixture(scope="function")
def fn_setup_tables():
    """Reset src1/tgt1 and drop the view before each test; seed src1 with 2 rows."""
    node.query("DROP TABLE IF EXISTS src1 ON CLUSTER default")
    node.query("DROP TABLE IF EXISTS tgt1 ON CLUSTER default")
    node.query("DROP TABLE IF EXISTS test_rmv ON CLUSTER default")
    node.query("DROP TABLE IF EXISTS test_db.test_rmv")

    node.query(
        f"CREATE TABLE tgt1 ON CLUSTER default (a DateTime, b UInt64) "
        f"ENGINE = ReplicatedMergeTree ORDER BY tuple()"
    )
    node.query(
        f"CREATE TABLE src1 ON CLUSTER default (a DateTime, b UInt64) "
        f"ENGINE = ReplicatedMergeTree ORDER BY tuple()"
    )
    node.query(f"INSERT INTO src1 VALUES ('2020-01-01', 1), ('2020-01-02', 2)")
@pytest.mark.parametrize(
    "select_query",
    [
        "SELECT now() as a, number as b FROM numbers(2)",
        "SELECT now() as a, b as b FROM src1",
    ],
)
@pytest.mark.parametrize("with_append", [True, False])
@pytest.mark.parametrize("empty", [True, False])
def test_simple_append(
    module_setup_tables,
    fn_setup_tables,
    select_query,
    with_append,
    empty,
):
    """Same APPEND/replace check as the non-replicated variant, but on a
    Replicated database: DDL is synced between replicas and the fake-time
    trigger is issued on both nodes."""
    create_sql = CREATE_RMV.render(
        table_name="test_rmv",
        refresh_interval="EVERY 1 HOUR",
        to_clause="tgt1",
        select_query=select_query,
        with_append=with_append,
        on_cluster="default",
        empty=empty,
    )
    node.query(create_sql)
    rmv = get_rmv_info(node, "test_rmv", wait_status="Scheduled")
    assert rmv["exception"] is None

    # Make sure the second replica has replayed the DDL before querying.
    node.query("SYSTEM SYNC DATABASE REPLICA ON CLUSTER default default")
    records = node.query("SELECT count() FROM test_rmv")
    if empty:
        assert records == "0\n"
    else:
        assert records == "2\n"

    # The refresh may run on either replica, so fake the clock on both.
    for n in nodes:
        n.query(f"SYSTEM TEST VIEW test_rmv SET FAKE TIME '{rmv['next_refresh_time']}'")
    rmv2 = get_rmv_info(node, "test_rmv", wait_status="Scheduled")
    assert rmv2["exception"] is None
    node.query("SYSTEM SYNC DATABASE REPLICA ON CLUSTER default default")

    # Only APPEND on a non-empty start accumulates rows.
    if empty:
        expect = "2\n"
    if not with_append:
        expect = "2\n"
    if with_append and not empty:
        expect = "4\n"
    records = node.query_with_retry(
        "SELECT count() FROM test_rmv", check_callback=lambda x: x == expect
    )
    assert records == expect
@pytest.mark.parametrize("with_append", [True, False])
@pytest.mark.parametrize("if_not_exists", [True, False])
@pytest.mark.parametrize("depends_on", [None, ["default.dummy_rmv"]])
@pytest.mark.parametrize("empty", [True, False])
@pytest.mark.parametrize("database_name", ["test_db"])  # None,
@pytest.mark.parametrize(
    "settings",
    [
        {},
        {
            "refresh_retries": "10",
            "refresh_retry_initial_backoff_ms": "10",
            "refresh_retry_max_backoff_ms": "20",
        },
    ],
)
def test_alters(
    module_setup_tables,
    fn_setup_tables,
    with_append,
    if_not_exists,
    depends_on,
    empty,
    database_name,
    settings,
):
    """
    Check correctness of functional states of RMV after CREATE, DROP, ALTER, trigger of RMV, ...
    """
    # In a Replicated database DDL propagates automatically, so there is no
    # ON CLUSTER parametrization here (unlike the non-replicated variant).
    # NOTE(review): the database_name parameter is currently unused — db is
    # hard-coded to "test_db"; confirm whether that is intentional.
    create_sql = CREATE_RMV.render(
        table_name="test_rmv",
        if_not_exists=if_not_exists,
        db="test_db",
        refresh_interval="EVERY 1 HOUR",
        depends_on=depends_on,
        to_clause="tgt1",
        select_query="SELECT * FROM src1",
        with_append=with_append,
        settings=settings,
    )
    node.query(create_sql)

    # Check same RMV is created on whole cluster
    def compare_DDL_on_all_nodes():
        show_create_all_nodes = cluster.query_all_nodes("SHOW CREATE test_rmv")
        assert_same_values(show_create_all_nodes.values())

    compare_DDL_on_all_nodes()

    # DROP + re-CREATE must reproduce identical DDL everywhere.
    node.query(f"DROP TABLE test_db.test_rmv")
    node.query(create_sql)
    compare_DDL_on_all_nodes()

    show_create = node.query(f"SHOW CREATE test_db.test_rmv")

    # Re-applying the identical refresh parameters must not change the DDL.
    alter_sql = ALTER_RMV.render(
        table_name="test_rmv",
        if_not_exists=if_not_exists,
        db="test_db",
        refresh_interval="EVERY 1 HOUR",
        depends_on=depends_on,
        # can't change select with alter
        # select_query="SELECT * FROM src1",
        with_append=with_append,
        settings=settings,
    )
    node.query(alter_sql)
    show_create_after_alter = node.query(f"SHOW CREATE test_db.test_rmv")
    assert show_create == show_create_after_alter
    compare_DDL_on_all_nodes()
@pytest.mark.parametrize(
    "append",
    [True, False],
)
@pytest.mark.parametrize(
    "empty",
    [True, False],
)
@pytest.mark.parametrize(
    "to_clause",
    [
        # (inner-table ENGINE clause, TO target table, table to read results from)
        (None, "tgt1", "tgt1"),
        ("Engine ReplicatedMergeTree ORDER BY tuple()", None, "test_rmv"),
    ],
)
def test_real_wait_refresh(
    fn_setup_tables,
    append,
    empty,
    to_clause,
):
    # Exercise real wall-clock refreshes (EVERY 10 SECOND) and verify row
    # counts plus refresh metadata across SYSTEM STOP/START/REFRESH VIEW.
    table_clause, to_clause_, tgt = to_clause
    create_sql = CREATE_RMV.render(
        table_name="test_rmv",
        refresh_interval="EVERY 10 SECOND",
        to_clause=to_clause_,
        table_clause=table_clause,
        select_query="SELECT now() as a, b FROM src1",
        with_append=append,
        on_cluster="default",
        empty=empty,
    )
    node.query(create_sql)
    rmv = get_rmv_info(node, "test_rmv")
    time.sleep(1)
    node.query("SYSTEM SYNC DATABASE REPLICA ON CLUSTER default default")
    # With EMPTY there is no initial refresh, so the target starts empty;
    # otherwise the first refresh inserts the 2 rows from src1.
    expected_rows = 0
    if empty:
        expect_rows(expected_rows, table=tgt)
    else:
        expected_rows += 2
        expect_rows(expected_rows, table=tgt)
    # Wait for the first scheduled refresh to actually complete.
    rmv2 = get_rmv_info(
        node,
        "test_rmv",
        condition=lambda x: x["last_refresh_time"] == rmv["next_refresh_time"],
        # wait for refresh a little bit more than 10 seconds
        max_attempts=12,
        delay=1,
        wait_status="Scheduled",
    )
    node.query("SYSTEM SYNC DATABASE REPLICA ON CLUSTER default default")
    rmv22 = get_rmv_info(
        node,
        "test_rmv",
        wait_status="Scheduled",
    )
    # APPEND accumulates rows across refreshes; otherwise each refresh
    # replaces the previous contents (always exactly 2 rows).
    if append:
        expected_rows += 2
        expect_rows(expected_rows, table=tgt)
    else:
        expect_rows(2, table=tgt)
    assert rmv2["exception"] is None
    assert rmv2["status"] == "Scheduled"
    assert rmv2["last_success_time"] == rmv["next_refresh_time"]
    assert rmv2["last_refresh_time"] == rmv["next_refresh_time"]
    assert rmv2["retry"] == 0 and rmv22["retry"] == 0
    # STOP must disable the view: after more than one interval (12s > 10s)
    # no refresh may have happened.
    for n in nodes:
        n.query("SYSTEM STOP VIEW test_rmv")
    time.sleep(12)
    rmv3 = get_rmv_info(node, "test_rmv")
    # no refresh happen
    assert rmv3["status"] == "Disabled"
    # Apart from the status field, the refresh metadata must be unchanged.
    del rmv3["status"]
    del rmv2["status"]
    assert rmv3 == rmv2
    for n in nodes:
        n.query("SYSTEM START VIEW test_rmv")
    time.sleep(1)
    rmv4 = get_rmv_info(node, "test_rmv")
    # After START the view refreshes again with the same append/replace semantics.
    if append:
        expected_rows += 2
        expect_rows(expected_rows, table=tgt)
    else:
        expect_rows(2, table=tgt)
    assert rmv4["exception"] is None
    assert rmv4["status"] == "Scheduled"
    assert rmv4["retry"] == 0
    # Manual refresh must also follow the same semantics.
    node.query("SYSTEM REFRESH VIEW test_rmv")
    time.sleep(1)
    if append:
        expected_rows += 2
        expect_rows(expected_rows, table=tgt)
    else:
        expect_rows(2, table=tgt)
def get_rmv_info(
    node,
    table,
    condition=None,
    max_attempts=50,
    delay=0.3,
    wait_status=None,
):
    """Return the system.view_refreshes row for *table* as a dict.

    The three timestamp columns are parsed into datetime objects (NULL -> None).
    If *wait_status* is given, the query is retried until the view reaches that
    status. If *condition* is given, the whole fetch is repeated (via
    wait_condition) until the predicate holds on the resulting dict.
    """
    status_reached = (
        (lambda df: df.iloc[0]["status"] == wait_status)
        if wait_status
        else (lambda _: True)
    )

    def fetch():
        row = node.query_with_retry(
            f"SELECT * FROM system.view_refreshes WHERE view='{table}'",
            check_callback=status_reached,
            parse=True,
        ).to_dict("records")[0]
        for key in ("next_refresh_time", "last_success_time", "last_refresh_time"):
            row[key] = parse_ch_datetime(row[key])
        logging.info(row)
        return row

    if condition:
        return wait_condition(fetch, condition, max_attempts=max_attempts, delay=delay)
    return fetch()
def parse_ch_datetime(date_str):
    """Parse a ClickHouse 'YYYY-MM-DD HH:MM:SS' timestamp; None passes through."""
    return (
        None if date_str is None else datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
    )
def expect_rows(rows, table="test_rmv"):
    """Poll *table* until it holds exactly *rows* rows, then assert the count."""
    fetched = node.query_with_retry(
        f"SELECT * FROM {table}",
        parse=True,
        check_callback=lambda df: len(df) == rows,
        retry_count=100,
    )
    assert len(fetched) == rows
def test_long_query_cancel(fn_setup_tables):
    """Cancel a slow refresh with SYSTEM CANCEL VIEW, then verify the next
    scheduled refresh completes normally."""
    if node.is_built_with_sanitizer():
        pytest.skip("Disabled for sanitizers")

    # The refresh query sleeps 1s per row, giving us a window to cancel it.
    node.query(
        CREATE_RMV.render(
            table_name="test_rmv",
            refresh_interval="EVERY 5 SECONDS",
            to_clause="tgt1",
            select_query="SELECT now() a, sleep(1) b from numbers(5) settings max_block_size=1",
            with_append=False,
            empty=True,
            settings={"refresh_retries": "0"},
        )
    )

    cancelled = False
    started_at = time.time()
    while not cancelled:
        # Keep firing CANCEL on every replica until one refresh reports it.
        for replica in nodes:
            replica.query("SYSTEM CANCEL VIEW test_rmv")
        if get_rmv_info(node2, "test_rmv")["exception"] == "cancelled":
            cancelled = True
        time.sleep(0.1)
        if time.time() - started_at > 10:
            raise AssertionError("Can't cancel query")

    info = get_rmv_info(node, "test_rmv", wait_status="Scheduled")
    assert info["status"] == "Scheduled"
    assert info["exception"] == "cancelled"
    assert info["last_success_time"] is None
    # A cancelled refresh must not have written anything.
    assert node.query("SELECT count() FROM tgt1") == "0\n"

    # Let the next scheduled refresh run to completion this time.
    get_rmv_info(node, "test_rmv", delay=0.1, max_attempts=1000, wait_status="Running")
    get_rmv_info(
        node, "test_rmv", delay=0.1, max_attempts=1000, wait_status="Scheduled"
    )
    assert node.query("SELECT count() FROM tgt1") == "5\n"
@pytest.fixture(scope="function")
def fn3_setup_tables():
    """Drop leftovers and create a fresh tgt1 target table (no source tables)."""
    for ddl in (
        "DROP TABLE IF EXISTS tgt1 ON CLUSTER default",
        "DROP TABLE IF EXISTS test_rmv ON CLUSTER default",
        "DROP TABLE IF EXISTS test_db.test_rmv",
        "CREATE TABLE tgt1 ON CLUSTER default (a DateTime) ENGINE = ReplicatedMergeTree ORDER BY tuple()",
    ):
        node.query(ddl)
def test_query_fail(fn3_setup_tables):
    """CREATE must fail when the refresh query cannot run, leaving no view behind."""
    if node.is_built_with_sanitizer():
        pytest.skip("Disabled for sanitizers")

    bad_create = CREATE_RMV.render(
        table_name="test_rmv",
        refresh_interval="EVERY 1 HOUR",
        to_clause="tgt1",
        # Argument at index 1 for function throwIf must be constant
        select_query="SELECT throwIf(1, toString(rand())) a",
        with_append=False,
        on_cluster="default",
        empty=True,
        settings={
            "refresh_retries": "10",
        },
    )
    with pytest.raises(helpers.client.QueryRuntimeException) as exc:
        node.query(bad_create)
    assert "Argument at index 1 for function throwIf must be constant" in str(
        exc.value
    )

    # The failed CREATE must not leave a refresh entry or a table behind.
    assert (
        node.query("SELECT count() FROM system.view_refreshes WHERE view='test_rmv'")
        == "0\n"
    )
    assert (
        node.query("SELECT count() FROM system.tables WHERE name='test_rmv'") == "0\n"
    )
def test_query_retry(fn3_setup_tables):
    """A permanently failing refresh must be retried refresh_retries times."""
    if node.is_built_with_sanitizer():
        pytest.skip("Disabled for sanitizers")

    node.query(
        CREATE_RMV.render(
            table_name="test_rmv",
            refresh_interval="EVERY 2 SECOND",
            to_clause="tgt1",
            select_query="SELECT throwIf(1, '111') a",
            with_append=False,
            on_cluster="default",
            empty=True,
            settings={
                "refresh_retries": "10",
                "refresh_retry_initial_backoff_ms": "1",
                "refresh_retry_max_backoff_ms": "1",
            },
        )
    )
    # The retry counter reaches refresh_retries + 1 once the view gives up.
    info = get_rmv_info(
        node,
        "test_rmv",
        delay=0.1,
        max_attempts=1000,
        condition=lambda x: x["retry"] == 11,
    )
    assert info["retry"] == 11
    assert "FUNCTION_THROW_IF_VALUE_IS_NON_ZERO" in info["exception"]