Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-24 00:22:29 +00:00)

Merge 9d998bd062 into de13b819f0 (contained in commit b857554829)
@@ -101,3 +101,4 @@ wadllib==1.3.6
 websocket-client==1.8.0
 wheel==0.38.1
 zipp==1.0.0
+jinja2==3.1.3
@@ -157,13 +157,14 @@ For your convenience, the old documentation is located [here](https://pastila.nl
 ## Refreshable Materialized View [Experimental] {#refreshable-materialized-view}

 ```sql
-CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name
+CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
 REFRESH EVERY|AFTER interval [OFFSET interval]
-RANDOMIZE FOR interval
-DEPENDS ON [db.]name [, [db.]name [, ...]]
-SETTINGS name = value [, name = value [, ...]]
+[RANDOMIZE FOR interval]
+[DEPENDS ON [db.]name [, [db.]name [, ...]]]
+[SETTINGS name = value [, name = value [, ...]]]
 [APPEND]
-[TO[db.]name] [(columns)] [ENGINE = engine] [EMPTY]
+[TO[db.]name] [(columns)] [ENGINE = engine]
+[EMPTY]
 AS SELECT ...
 [COMMENT 'comment']
 ```
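For illustration, a minimal sketch of the amended DDL in use, issued through the integration-test harness added later in this diff (`node`, `src1`, and `tgt1` come from its fixtures; the interval values are made up):

```python
# Create a refreshable MV on the whole cluster, appending each refresh
# result to an existing target table. Clause order follows the grammar above.
node.query(
    "CREATE MATERIALIZED VIEW IF NOT EXISTS test_rmv ON CLUSTER default "
    "REFRESH EVERY 1 HOUR RANDOMIZE FOR 10 MINUTE "
    "APPEND TO tgt1 "
    "AS SELECT now() AS a, b FROM src1"
)
```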
@@ -281,7 +282,7 @@ This replaces *all* refresh parameters at once: schedule, dependencies, settings

 The status of all refreshable materialized views is available in table [`system.view_refreshes`](../../../operations/system-tables/view_refreshes.md). In particular, it contains refresh progress (if running), last and next refresh time, exception message if a refresh failed.

-To manually stop, start, trigger, or cancel refreshes use [`SYSTEM STOP|START|REFRESH|CANCEL VIEW`](../system.md#refreshable-materialized-views).
+To manually stop, start, trigger, or cancel refreshes use [`SYSTEM STOP|START|REFRESH|WAIT|CANCEL VIEW`](../system.md#refreshable-materialized-views).

 To wait for a refresh to complete, use [`SYSTEM WAIT VIEW`](../system.md#refreshable-materialized-views). In particular, useful for waiting for initial refresh after creating a view.
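A short sketch of how these commands fit together, again using the test harness from this diff (`node` is a ClickHouseInstance from its fixtures):

```python
# Trigger an immediate refresh, block until it finishes, then inspect
# the view's state in system.view_refreshes.
node.query("SYSTEM REFRESH VIEW test_rmv")
node.query("SYSTEM WAIT VIEW test_rmv")
print(
    node.query(
        "SELECT status, exception, last_refresh_time, next_refresh_time "
        "FROM system.view_refreshes WHERE view = 'test_rmv'"
    )
)
```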
@@ -4,6 +4,9 @@ import subprocess as sp
 import tempfile
 from threading import Timer

+import numpy as np
+import pandas as pd
+
 DEFAULT_QUERY_TIMEOUT = 600

@@ -59,6 +62,7 @@ class Client:
         host=None,
         ignore_error=False,
         query_id=None,
+        parse=False,
     ):
         return self.get_query_request(
             sql,
@@ -71,6 +75,7 @@ class Client:
             host=host,
             ignore_error=ignore_error,
             query_id=query_id,
+            parse=parse,
         ).get_answer()

     def get_query_request(
@@ -85,6 +90,7 @@ class Client:
         host=None,
         ignore_error=False,
         query_id=None,
+        parse=False,
     ):
         command = self.command[:]

@@ -107,8 +113,10 @@ class Client:
             command += ["--host", host]
         if query_id is not None:
             command += ["--query_id", query_id]
+        if parse:
+            command += ["--format=TabSeparatedWithNames"]

-        return CommandRequest(command, stdin, timeout, ignore_error)
+        return CommandRequest(command, stdin, timeout, ignore_error, parse)

     @stacktraces_on_timeout_decorator
     def query_and_get_error(
@@ -169,7 +177,9 @@ class QueryRuntimeException(Exception):


 class CommandRequest:
-    def __init__(self, command, stdin=None, timeout=None, ignore_error=False):
+    def __init__(
+        self, command, stdin=None, timeout=None, ignore_error=False, parse=False
+    ):
         # Write data to tmp file to avoid PIPEs and execution blocking
         stdin_file = tempfile.TemporaryFile(mode="w+")
         stdin_file.write(stdin)
@@ -177,7 +187,7 @@ class CommandRequest:
         self.stdout_file = tempfile.TemporaryFile()
         self.stderr_file = tempfile.TemporaryFile()
         self.ignore_error = ignore_error
-
+        self.parse = parse
         # print " ".join(command)

         # we suppress stderr on the client because sometimes thread sanitizer
@@ -243,6 +253,15 @@ class CommandRequest:
                 stderr,
             )

+        if self.parse:
+            from io import StringIO
+
+            return (
+                pd.read_csv(StringIO(stdout), sep="\t")
+                .replace(r"\N", None)
+                .replace(np.nan, None)
+            )
+
         return stdout

     def get_error(self):
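Taken together, these hunks let callers ask for parsed results: the client appends `--format=TabSeparatedWithNames`, and `CommandRequest` loads stdout into a pandas DataFrame with `\N`/NaN normalized to `None`. A minimal usage sketch (assuming a `node` instance from the cluster fixtures below):

```python
# With parse=True the result is a DataFrame instead of a raw string,
# so columns can be addressed by name.
df = node.query("SELECT number AS n, NULL AS m FROM numbers(3)", parse=True)
print(df.iloc[0]["n"])  # first row of column n
print(df.iloc[0]["m"])  # NULL arrives as None rather than the literal "\N"
```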
@@ -48,6 +48,7 @@ except Exception as e:

 import docker
 from dict2xml import dict2xml
+from docker.models.containers import Container
 from kazoo.client import KazooClient
 from kazoo.exceptions import KazooException
 from minio import Minio
@@ -523,7 +524,7 @@ class ClickHouseCluster:
         self.base_jdbc_bridge_cmd = []
         self.base_redis_cmd = []
         self.pre_zookeeper_commands = []
-        self.instances = {}
+        self.instances: dict[str, ClickHouseInstance] = {}
         self.with_zookeeper = False
         self.with_zookeeper_secure = False
         self.with_mysql_client = False
@@ -763,7 +764,7 @@ class ClickHouseCluster:
         self.prometheus_remote_read_handler_port = 9092
         self.prometheus_remote_read_handler_path = "/read"

-        self.docker_client = None
+        self.docker_client: docker.DockerClient = None
         self.is_up = False
         self.env = os.environ.copy()
         logging.debug(f"CLUSTER INIT base_config_dir:{self.base_config_dir}")
@@ -954,7 +955,7 @@ class ClickHouseCluster:
         except:
             pass

-    def get_docker_handle(self, docker_id):
+    def get_docker_handle(self, docker_id) -> Container:
         exception = None
         for i in range(20):
             try:
@@ -3349,6 +3350,12 @@ class ClickHouseCluster:
             logging.info("Starting zookeeper node: %s", n)
             subprocess_check_call(self.base_zookeeper_cmd + ["start", n])

+    def query_all_nodes(self, sql, *args, **kwargs):
+        return {
+            name: instance.query(sql, ignore_error=True, *args, **kwargs)
+            for name, instance in self.instances.items()
+        }
+

 DOCKER_COMPOSE_TEMPLATE = """---
services:
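`query_all_nodes` fans a query out to every instance and keys the results by instance name; the new test file below uses it to compare DDL across replicas. A usage sketch:

```python
# Returns e.g. {"node1_1": "...", "node1_2": "..."}; ignore_error=True keeps
# one failing node from raising and aborting the whole comparison.
show_create_all_nodes = cluster.query_all_nodes("SHOW CREATE test_rmv")
assert len(set(show_create_all_nodes.values())) == 1  # identical everywhere
```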
@@ -3621,6 +3628,7 @@ class ClickHouseInstance:
         host=None,
         ignore_error=False,
         query_id=None,
+        parse=False,
     ):
         sql_for_log = ""
         if len(sql) > 1000:
@@ -3639,6 +3647,7 @@ class ClickHouseInstance:
             ignore_error=ignore_error,
             query_id=query_id,
             host=host,
+            parse=parse,
         )

     def query_with_retry(
@@ -3655,6 +3664,7 @@ class ClickHouseInstance:
         retry_count=20,
         sleep_time=0.5,
         check_callback=lambda x: True,
+        parse=False,
     ):
         # logging.debug(f"Executing query {sql} on {self.name}")
         result = None
@@ -3671,6 +3681,7 @@ class ClickHouseInstance:
                     database=database,
                     host=host,
                     ignore_error=ignore_error,
+                    parse=parse,
                 )
                 if check_callback(result):
                     return result
@@ -4403,7 +4414,7 @@ class ClickHouseInstance:
         else:
             self.wait_start(time_left)

-    def get_docker_handle(self):
+    def get_docker_handle(self) -> Container:
         return self.cluster.get_docker_handle(self.docker_id)

     def stop(self):
@@ -182,3 +182,16 @@ def csv_compare(result, expected):
             mismatch.append("+[%d]=%s" % (i, csv_result.lines[i]))

     return "\n".join(mismatch)
+
+
+def wait_condition(func, condition, max_attempts=10, delay=0.1):
+    attempts = 0
+    while attempts < max_attempts:
+        result = func()
+        if condition(result):
+            return result
+        attempts += 1
+        if attempts < max_attempts:
+            time.sleep(delay)
+
+    raise Exception(f"Function did not satisfy condition after {max_attempts} attempts")
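`wait_condition` polls `func` until `condition` accepts its result, sleeping `delay` seconds between attempts and raising after `max_attempts` failures. A usage sketch (with a hypothetical `node` and target table from the fixtures below):

```python
# Wait up to ~10 s for tgt1 to reach two rows.
rows = wait_condition(
    lambda: node.query("SELECT count() FROM tgt1"),
    lambda r: r.strip() == "2",
    max_attempts=20,
    delay=0.5,
)
```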
@@ -0,0 +1,3 @@
<clickhouse>
    <timezone>Etc/UTC</timezone>
</clickhouse>
@@ -0,0 +1,16 @@
<clickhouse>
    <remote_servers>
        <default>
            <shard>
                <replica>
                    <host>node1_1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node1_2</host>
                    <port>9000</port>
                </replica>
            </shard>
        </default>
    </remote_servers>
</clickhouse>
@@ -0,0 +1,8 @@
<clickhouse>
    <profiles>
        <default>
            <allow_experimental_refreshable_materialized_view>1</allow_experimental_refreshable_materialized_view>
            <session_timezone>Etc/UTC</session_timezone>
        </default>
    </profiles>
</clickhouse>
tests/integration/test_refreshable_mat_view/test.py (new file, 641 lines)
@@ -0,0 +1,641 @@
import logging
import time
from datetime import datetime
from typing import Optional

import pytest
from jinja2 import Environment, Template

import helpers.client
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import wait_condition

cluster = ClickHouseCluster(__file__)

node = cluster.add_instance(
    "node1_1",
    main_configs=["configs/remote_servers.xml"],
    user_configs=["configs/settings.xml"],
    with_zookeeper=True,
    stay_alive=True,
    macros={"shard": 1, "replica": 1},
)
node2 = cluster.add_instance(
    "node1_2",
    main_configs=["configs/remote_servers.xml"],
    user_configs=["configs/settings.xml"],
    with_zookeeper=True,
    stay_alive=True,
    macros={"shard": 1, "replica": 2},
)


@pytest.fixture(scope="session", autouse=True)
def started_cluster():
    try:
        cluster.start()
        yield cluster

    finally:
        cluster.shutdown()
"""
|
||||
### TESTS
|
||||
+1. Append mode
|
||||
2. Restart node and wait for restore
|
||||
+3. Simple functional testing: all values in refresh result correct (two and more rmv)
|
||||
+4. Combinations of intervals
|
||||
+5. Two (and more) rmv from single to single [APPEND]
|
||||
+6. ALTER rmv ALTER TABLE [db.]name MODIFY REFRESH EVERY|AFTER ... [RANDOMIZE FOR ...] [DEPENDS ON ...] [SETTINGS ...]
|
||||
+7. RMV without tgt table (automatic table) (check APPEND)
|
||||
|
||||
+8 DROP rmv
|
||||
+9 CREATE - DROP - CREATE - ALTER
|
||||
11. Long queries over refresh time (check settings)
|
||||
13. incorrect intervals (interval 1 sec, offset 1 minute)
|
||||
- OFFSET less than the period. 'EVERY 1 MONTH OFFSET 5 WEEK'
|
||||
- cases below
|
||||
+14. ALTER on cluster
|
||||
|
||||
15. write to distributed with / without APPEND
|
||||
17. Not existent TO table (ON CLUSTER)
|
||||
18. Not existent FROM table (ON CLUSTER)
|
||||
19. Not existent BOTH tables (ON CLUSTER)
|
||||
+20. retry failed
|
||||
21. overflow with wait test
|
||||
22. ON CLUSTER
|
||||
|
||||
+SYSTEM STOP|START|REFRESH|CANCEL VIEW
|
||||
+SYSTEM WAIT VIEW [db.]name
|
||||
|
||||
"""
|
||||
|
||||
|
||||

def j2_template(
    string: str,
    globals: Optional[dict] = None,
    filters: Optional[dict] = None,
    tests: Optional[dict] = None,
) -> Template:
    def uppercase(value: str):
        return value.upper()

    def lowercase(value: str):
        return value.lower()

    def format_settings(items: dict):
        return ", ".join([f"{k}={v}" for k, v in items.items()])

    # Create a custom environment and add the functions
    env = Environment(
        trim_blocks=False, lstrip_blocks=True, keep_trailing_newline=False
    )
    env.globals["uppercase"] = uppercase
    env.globals["lowercase"] = lowercase
    env.filters["format_settings"] = format_settings

    if filters:
        env.filters.update(filters)
    if globals:
        env.globals.update(globals)
    if tests:
        env.tests.update(tests)

    return env.from_string(string)


def assert_same_values(lst: list):
    if not isinstance(lst, list):
        lst = list(lst)
    assert all(x == lst[0] for x in lst)
RMV_TEMPLATE = """{{ refresh_interval }}
|
||||
{% if depends_on %}DEPENDS ON {{ depends_on|join(', ') }}{% endif %}
|
||||
{% if settings %}SETTINGS {{ settings|format_settings }}{% endif %}
|
||||
{% if with_append %}APPEND{% endif %}
|
||||
{% if to_clause %}TO {{ to_clause }}{% endif %}
|
||||
{% if table_clause %}{{ table_clause }}{% endif %}
|
||||
{% if empty %}EMPTY{% endif %}
|
||||
{% if select_query %} AS {{ select_query }}{% endif %}
|
||||
"""
|
||||
|
||||
CREATE_RMV = j2_template(
|
||||
"""CREATE MATERIALIZED VIEW
|
||||
{% if if_not_exists %}IF NOT EXISTS{% endif %}
|
||||
{% if db %}{{db}}.{% endif %}{{ table_name }}
|
||||
{% if on_cluster %}ON CLUSTER {{ on_cluster }}{% endif %}
|
||||
REFRESH
|
||||
"""
|
||||
+ RMV_TEMPLATE
|
||||
)
|
||||
|
||||
ALTER_RMV = j2_template(
|
||||
"""ALTER TABLE
|
||||
{% if db %}{{db}}.{% endif %}{{ table_name }}
|
||||
{% if on_cluster %}ON CLUSTER {{ on_cluster }}{% endif %}
|
||||
MODIFY REFRESH
|
||||
"""
|
||||
+ RMV_TEMPLATE
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def module_setup_tables(started_cluster):
|
||||
node.query(f"DROP DATABASE IF EXISTS test_db")
|
||||
node.query(f"CREATE DATABASE IF NOT EXISTS test_db ON CLUSTER default")
|
||||
node.query("DROP TABLE IF EXISTS test_rmv ON CLUSTER default")
|
||||
node.query("DROP TABLE IF EXISTS src1 ON CLUSTER default")
|
||||
node.query("DROP TABLE IF EXISTS src2 ON CLUSTER default")
|
||||
node.query("DROP TABLE IF EXISTS tgt1 ON CLUSTER default")
|
||||
node.query("DROP TABLE IF EXISTS tgt2 ON CLUSTER default")
|
||||
node.query("DROP TABLE IF EXISTS test_rmv ON CLUSTER default")
|
||||
node.query("DROP TABLE IF EXISTS test_db.test_rmv ON CLUSTER default")
|
||||
|
||||
node.query(
|
||||
f"CREATE TABLE src1 ON CLUSTER default (a DateTime, b UInt64) ENGINE = Memory"
|
||||
)
|
||||
node.query(
|
||||
f"CREATE TABLE src2 ON CLUSTER default (a DateTime, b UInt64) ENGINE = Memory"
|
||||
)
|
||||
node.query(
|
||||
f"CREATE TABLE tgt1 ON CLUSTER default (a DateTime, b UInt64) ENGINE = MergeTree ORDER BY tuple()"
|
||||
)
|
||||
node.query(
|
||||
f"CREATE TABLE tgt2 ON CLUSTER default (a DateTime, b UInt64) ENGINE = Memory"
|
||||
)
|
||||
node.query(
|
||||
f"CREATE MATERIALIZED VIEW IF NOT EXISTS dummy_rmv ON CLUSTER default "
|
||||
f"REFRESH EVERY 10 HOUR engine Memory EMPTY AS select number as x from numbers(1)"
|
||||
)
|
||||
|
||||
|
||||

@pytest.mark.parametrize(
    "select_query",
    [
        "SELECT now() as a, number as b FROM numbers(2)",
        "SELECT now() as a, b as b FROM src1",
    ],
)
@pytest.mark.parametrize("with_append", [True, False])
@pytest.mark.parametrize("empty", [True, False])
def test_simple_append(
    module_setup_tables,
    fn_setup_tables,
    select_query,
    with_append,
    empty,
):
    create_sql = CREATE_RMV.render(
        table_name="test_rmv",
        refresh_interval="EVERY 1 HOUR",
        to_clause="tgt1",
        select_query=select_query,
        with_append=with_append,
        empty=empty,
    )
    node.query(create_sql)
    rmv = get_rmv_info(node, "test_rmv", wait_status="Scheduled")
    assert rmv["exception"] is None

    records = node.query("SELECT count() FROM test_rmv")
    if empty:
        assert records == "0\n"
    else:
        assert records == "2\n"

    node.query(f"SYSTEM TEST VIEW test_rmv SET FAKE TIME '{rmv['next_refresh_time']}'")
    time.sleep(2)
    rmv2 = get_rmv_info(node, "test_rmv", wait_status="Scheduled")

    assert rmv2["exception"] is None

    records = node.query("SELECT count() FROM test_rmv")
    if empty:
        assert records == "2\n"

    if not with_append:
        assert records == "2\n"

    if with_append and not empty:
        assert records == "4\n"
@pytest.mark.parametrize("with_append", [True, False])
|
||||
@pytest.mark.parametrize("if_not_exists", [True, False])
|
||||
@pytest.mark.parametrize("on_cluster", [True, False])
|
||||
@pytest.mark.parametrize("depends_on", [None, ["default.dummy_rmv"]])
|
||||
@pytest.mark.parametrize("empty", [True, False])
|
||||
@pytest.mark.parametrize("database_name", [None, "test_db"])
|
||||
@pytest.mark.parametrize(
|
||||
"settings",
|
||||
[
|
||||
{},
|
||||
{
|
||||
"refresh_retries": "10",
|
||||
"refresh_retry_initial_backoff_ms": "10",
|
||||
"refresh_retry_max_backoff_ms": "20",
|
||||
},
|
||||
],
|
||||
)
|
||||
def test_alters(
|
||||
module_setup_tables,
|
||||
fn_setup_tables,
|
||||
with_append,
|
||||
if_not_exists,
|
||||
on_cluster,
|
||||
depends_on,
|
||||
empty,
|
||||
database_name,
|
||||
settings,
|
||||
):
|
||||
"""
|
||||
Check correctness of functional states of RMV after CREATE, DROP, ALTER, trigger of RMV, ...
|
||||
"""
|
||||
create_sql = CREATE_RMV.render(
|
||||
table_name="test_rmv",
|
||||
if_not_exists=if_not_exists,
|
||||
db=database_name,
|
||||
refresh_interval="EVERY 1 HOUR",
|
||||
depends_on=depends_on,
|
||||
to_clause="tgt1",
|
||||
select_query="SELECT * FROM src1",
|
||||
with_append=with_append,
|
||||
on_cluster="default" if on_cluster else None,
|
||||
settings=settings,
|
||||
)
|
||||
node.query(create_sql)
|
||||
|
||||
# Check same RMV is created on whole cluster
|
||||
def compare_DDL_on_all_nodes():
|
||||
show_create_all_nodes = cluster.query_all_nodes("SHOW CREATE test_rmv")
|
||||
|
||||
if not on_cluster:
|
||||
del show_create_all_nodes["node1_2"]
|
||||
|
||||
assert_same_values(show_create_all_nodes.values())
|
||||
|
||||
compare_DDL_on_all_nodes()
|
||||
|
||||
maybe_db = f"{database_name}." if database_name else ""
|
||||
node.query(
|
||||
f"DROP TABLE {maybe_db}test_rmv {'ON CLUSTER default' if on_cluster else ''}"
|
||||
)
|
||||
node.query(create_sql)
|
||||
compare_DDL_on_all_nodes()
|
||||
|
||||
show_create = node.query(f"SHOW CREATE {maybe_db}test_rmv")
|
||||
|
||||
alter_sql = ALTER_RMV.render(
|
||||
table_name="test_rmv",
|
||||
if_not_exists=if_not_exists,
|
||||
db=database_name,
|
||||
refresh_interval="EVERY 1 HOUR",
|
||||
depends_on=depends_on,
|
||||
# can't change select with alter
|
||||
# select_query="SELECT * FROM src1",
|
||||
with_append=with_append,
|
||||
on_cluster="default" if on_cluster else None,
|
||||
settings=settings,
|
||||
)
|
||||
|
||||
node.query(alter_sql)
|
||||
show_create_after_alter = node.query(f"SHOW CREATE {maybe_db}test_rmv")
|
||||
assert show_create == show_create_after_alter
|
||||
compare_DDL_on_all_nodes()
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def fn_setup_tables():
|
||||
node.query("DROP TABLE IF EXISTS src1 ON CLUSTER default")
|
||||
node.query("DROP TABLE IF EXISTS tgt1 ON CLUSTER default")
|
||||
node.query("DROP TABLE IF EXISTS test_rmv ON CLUSTER default")
|
||||
node.query("DROP TABLE IF EXISTS test_db.test_rmv ON CLUSTER default")
|
||||
|
||||
node.query(
|
||||
f"CREATE TABLE tgt1 ON CLUSTER default (a DateTime, b UInt64) ENGINE = MergeTree ORDER BY tuple()"
|
||||
)
|
||||
|
||||
node.query(
|
||||
f"CREATE TABLE src1 ON CLUSTER default (a DateTime, b UInt64) ENGINE = Memory"
|
||||
)
|
||||
node.query(f"INSERT INTO src1 VALUES ('2020-01-01', 1), ('2020-01-02', 2)")
|
||||
|
||||
|
||||

@pytest.mark.parametrize(
    "append",
    [True, False],
)
@pytest.mark.parametrize(
    "empty",
    [True, False],
)
@pytest.mark.parametrize(
    "to_clause",
    [
        (None, "tgt1", "tgt1"),
        ("Engine MergeTree ORDER BY tuple()", None, "test_rmv"),
    ],
)
def test_real_wait_refresh(
    fn_setup_tables,
    append,
    empty,
    to_clause,
):
    table_clause, to_clause_, tgt = to_clause

    create_sql = CREATE_RMV.render(
        table_name="test_rmv",
        refresh_interval="EVERY 10 SECOND",
        to_clause=to_clause_,
        table_clause=table_clause,
        select_query="SELECT now() as a, b FROM src1",
        with_append=append,
        empty=empty,
    )
    node.query(create_sql)
    rmv = get_rmv_info(node, "test_rmv")

    expected_rows = 0
    if empty:
        expect_rows(expected_rows, table=tgt)
    else:
        expected_rows += 2
        expect_rows(expected_rows, table=tgt)

    rmv2 = get_rmv_info(
        node,
        "test_rmv",
        condition=lambda x: x["last_refresh_time"] == rmv["next_refresh_time"],
        # wait for refresh a little bit more than 10 seconds
        max_attempts=12,
        delay=1,
    )

    if append:
        expected_rows += 2
        expect_rows(expected_rows, table=tgt)
    else:
        expect_rows(2, table=tgt)

    assert rmv2["exception"] is None
    assert rmv2["status"] == "Scheduled"
    assert rmv2["last_success_time"] == rmv["next_refresh_time"]
    assert rmv2["last_refresh_time"] == rmv["next_refresh_time"]
    assert rmv2["retry"] == 0
    assert rmv2["read_rows"] == 2
    assert rmv2["read_bytes"] == 16
    assert rmv2["total_rows"] == 2
    assert rmv2["written_rows"] == 2
    assert rmv2["written_bytes"] == 24

    node.query("SYSTEM STOP VIEW test_rmv")
    time.sleep(12)
    rmv3 = get_rmv_info(node, "test_rmv")
    # no refresh should have happened while the view was stopped
    assert rmv3["status"] == "Disabled"

    del rmv3["status"]
    del rmv2["status"]
    assert rmv3 == rmv2

    node.query("SYSTEM START VIEW test_rmv")
    time.sleep(1)
    rmv4 = get_rmv_info(node, "test_rmv")

    if append:
        expected_rows += 2
        expect_rows(expected_rows, table=tgt)
    else:
        expect_rows(2, table=tgt)

    assert rmv4["exception"] is None
    assert rmv4["status"] == "Scheduled"
    assert rmv4["retry"] == 0
    assert rmv2["read_rows"] == 2
    assert rmv2["read_bytes"] == 16
    assert rmv2["total_rows"] == 2
    assert rmv2["written_rows"] == 2
    assert rmv2["written_bytes"] == 24

    node.query("SYSTEM REFRESH VIEW test_rmv")
    time.sleep(1)
    if append:
        expected_rows += 2
        expect_rows(expected_rows, table=tgt)
    else:
        expect_rows(2, table=tgt)

def get_rmv_info(
    node,
    table,
    condition=None,
    max_attempts=50,
    delay=0.3,
    wait_status=None,
):
    def inner():
        rmv_info = node.query_with_retry(
            f"SELECT * FROM system.view_refreshes WHERE view='{table}'",
            check_callback=(
                (lambda r: r.iloc[0]["status"] == wait_status)
                if wait_status
                else (lambda x: True)
            ),
            parse=True,
        ).to_dict("records")[0]

        rmv_info["next_refresh_time"] = parse_ch_datetime(rmv_info["next_refresh_time"])
        rmv_info["last_success_time"] = parse_ch_datetime(rmv_info["last_success_time"])
        rmv_info["last_refresh_time"] = parse_ch_datetime(rmv_info["last_refresh_time"])
        logging.info(rmv_info)
        return rmv_info

    if condition:
        res = wait_condition(inner, condition, max_attempts=max_attempts, delay=delay)
        return res

    res = inner()
    return res

def parse_ch_datetime(date_str):
    if date_str is None:
        return None
    return datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")


@pytest.fixture(scope="function")
def rmv_schedule_teardown():
    node.query("DROP TABLE IF EXISTS test_rmv")


def expect_rows(rows, table="test_rmv"):
    inserted_data = node.query_with_retry(
        f"SELECT * FROM {table}",
        parse=True,
        check_callback=lambda x: len(x) == rows,
        retry_count=100,
    )
    assert len(inserted_data) == rows
@pytest.fixture(scope="function")
|
||||
def fn2_setup_tables():
|
||||
node.query("DROP TABLE IF EXISTS tgt1 ON CLUSTER default")
|
||||
node.query("DROP TABLE IF EXISTS src1 ON CLUSTER default")
|
||||
node.query("DROP TABLE IF EXISTS test_rmv ON CLUSTER default")
|
||||
node.query("DROP TABLE IF EXISTS test_db.test_rmv ON CLUSTER default")
|
||||
|
||||
node.query(
|
||||
f"CREATE TABLE tgt1 ON CLUSTER default (a DateTime, b UInt64) ENGINE = MergeTree ORDER BY tuple()"
|
||||
)
|
||||
node.query(
|
||||
f"CREATE TABLE src1 ON CLUSTER default (a DateTime, b UInt64) ENGINE = Memory"
|
||||
)
|
||||
node.query(f"INSERT INTO src1 SELECT toDateTime(1) a, 1 b FROM numbers(100000000)")
|
||||
|
||||
|
||||

def test_long_query(fn2_setup_tables):
    if node.is_built_with_sanitizer():
        pytest.skip("Disabled for sanitizers")

    create_sql = CREATE_RMV.render(
        table_name="test_rmv",
        refresh_interval="EVERY 1 HOUR",
        to_clause="tgt1",
        select_query="SELECT * FROM src1",
        with_append=False,
        empty=True,
    )
    node.query(create_sql)
    rmv = get_rmv_info(node, "test_rmv")
    node.query(f"SYSTEM TEST VIEW test_rmv SET FAKE TIME '{rmv['next_refresh_time']}'")

    wait_seconds = 0
    progresses = []
    while wait_seconds < 30:
        rmv = get_rmv_info(node, "test_rmv")
        if rmv["progress"] == 1:
            break

        time.sleep(0.01)
        wait_seconds += 0.01

        # didn't start yet
        if rmv["status"] == "Scheduled" or rmv["progress"] == 0:
            continue

        assert rmv["status"] == "Running"
        assert 0 < rmv["read_rows"] < 100000000
        assert 0 < rmv["read_bytes"] < 1200000000
        progresses.append(rmv["progress"])

    assert rmv["progress"] == 1
    assert rmv["exception"] is None

    assert any(0 < p < 1 for p in progresses)

def test_long_query_cancel(fn2_setup_tables):
    if node.is_built_with_sanitizer():
        pytest.skip("Disabled for sanitizers")

    create_sql = CREATE_RMV.render(
        table_name="test_rmv",
        refresh_interval="EVERY 5 SECONDS",
        to_clause="tgt1",
        select_query="SELECT now() as a, sleep(3) b",
        with_append=False,
        empty=True,
        settings={"refresh_retries": "0"},
    )
    node.query(create_sql)
    get_rmv_info(node, "test_rmv", delay=0.1, max_attempts=1000, wait_status="Running")

    node.query("SYSTEM CANCEL VIEW test_rmv")
    rmv = get_rmv_info(node, "test_rmv", wait_status="Scheduled")
    assert rmv["status"] == "Scheduled"
    assert rmv["exception"] == "cancelled"
    assert rmv["last_success_time"] is None

    assert node.query("SELECT count() FROM tgt1") == "0\n"

    get_rmv_info(node, "test_rmv", delay=0.1, max_attempts=1000, wait_status="Running")
    get_rmv_info(
        node, "test_rmv", delay=0.1, max_attempts=1000, wait_status="Scheduled"
    )

    assert node.query("SELECT count() FROM tgt1") == "1\n"
@pytest.fixture(scope="function")
|
||||
def fn3_setup_tables():
|
||||
node.query("DROP TABLE IF EXISTS tgt1 ON CLUSTER default")
|
||||
node.query("DROP TABLE IF EXISTS test_rmv ON CLUSTER default")
|
||||
node.query("DROP TABLE IF EXISTS test_db.test_rmv ON CLUSTER default")
|
||||
|
||||
node.query(f"CREATE TABLE tgt1 ON CLUSTER default (a DateTime) ENGINE = Memory")
|
||||
|
||||
|
||||

def test_query_fail(fn3_setup_tables):
    if node.is_built_with_sanitizer():
        pytest.skip("Disabled for sanitizers")

    create_sql = CREATE_RMV.render(
        table_name="test_rmv",
        refresh_interval="EVERY 1 HOUR",
        to_clause="tgt1",
        # Argument at index 1 for function throwIf must be constant
        select_query="SELECT throwIf(1, toString(rand())) a",
        with_append=False,
        empty=True,
        settings={
            "refresh_retries": "10",
        },
    )
    with pytest.raises(helpers.client.QueryRuntimeException) as exc:
        node.query(create_sql)
    assert "Argument at index 1 for function throwIf must be constant" in str(
        exc.value
    )
    assert (
        node.query(f"SELECT count() FROM system.view_refreshes WHERE view='test_rmv'")
        == "0\n"
    )
    assert (
        node.query(f"SELECT count() FROM system.tables WHERE name='test_rmv'") == "0\n"
    )

def test_query_retry(fn3_setup_tables):
    if node.is_built_with_sanitizer():
        pytest.skip("Disabled for sanitizers")

    create_sql = CREATE_RMV.render(
        table_name="test_rmv",
        refresh_interval="EVERY 2 SECOND",
        to_clause="tgt1",
        select_query="SELECT throwIf(1, '111') a",
        with_append=False,
        empty=True,
        settings={
            "refresh_retries": "10",
            "refresh_retry_initial_backoff_ms": "1",
            "refresh_retry_max_backoff_ms": "1",
        },
    )
    node.query(create_sql)
    rmv = get_rmv_info(
        node,
        "test_rmv",
        delay=0.1,
        max_attempts=1000,
        condition=lambda x: x["retry"] == 11,
    )
    assert rmv["retry"] == 11
    assert "FUNCTION_THROW_IF_VALUE_IS_NON_ZERO" in rmv["exception"]