ClickHouse/tests/integration/test_backward_compatibility/test_aggregate_function_state.py
Nikita Taranov 7beb58b0cf
Optimize merge of uniqExact without_key (#43072)
* impl for uniqExact

* rm unused (read|write)Text methods

* fix style

* small fixes

* impl for variadic uniqExact

* refactor

* fix style

* more agressive inlining

* disable if max_threads=1

* small improvements

* review fixes

* Revert "rm unused (read|write)Text methods"

This reverts commit a7e7480584.

* encapsulate is_able_to_parallelize_merge in Data

* encapsulate is_exact & argument_is_tuple in Data
2022-11-17 13:19:02 +01:00

229 lines
7.2 KiB
Python

import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
with_zookeeper=False,
image="yandex/clickhouse-server",
tag="19.16.9.37",
stay_alive=True,
with_installed_binary=True,
)
node2 = cluster.add_instance(
"node2",
with_zookeeper=False,
image="yandex/clickhouse-server",
tag="19.16.9.37",
stay_alive=True,
with_installed_binary=True,
)
node3 = cluster.add_instance("node3", with_zookeeper=False)
node4 = cluster.add_instance("node4", with_zookeeper=False)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
# We will test that serialization of internal state of "avg" function is compatible between different versions.
# TODO Implement versioning of serialization format for aggregate function states.
# NOTE This test is too ad-hoc.
def test_backward_compatability_for_avg(start_cluster):
node1.query("create table tab (x UInt64) engine = Memory")
node2.query("create table tab (x UInt64) engine = Memory")
node3.query("create table tab (x UInt64) engine = Memory")
node4.query("create table tab (x UInt64) engine = Memory")
node1.query("INSERT INTO tab VALUES (1)")
node2.query("INSERT INTO tab VALUES (2)")
node3.query("INSERT INTO tab VALUES (3)")
node4.query("INSERT INTO tab VALUES (4)")
assert (
node1.query("SELECT avg(x) FROM remote('node{1..4}', default, tab)") == "2.5\n"
)
assert (
node2.query("SELECT avg(x) FROM remote('node{1..4}', default, tab)") == "2.5\n"
)
assert (
node3.query("SELECT avg(x) FROM remote('node{1..4}', default, tab)") == "2.5\n"
)
assert (
node4.query("SELECT avg(x) FROM remote('node{1..4}', default, tab)") == "2.5\n"
)
# Also check with persisted aggregate function state
node1.query("create table state (x AggregateFunction(avg, UInt64)) engine = Log")
node1.query(
"INSERT INTO state SELECT avgState(arrayJoin(CAST([1, 2, 3, 4] AS Array(UInt64))))"
)
assert node1.query("SELECT avgMerge(x) FROM state") == "2.5\n"
node1.restart_with_latest_version(fix_metadata=True)
assert node1.query("SELECT avgMerge(x) FROM state") == "2.5\n"
node1.query("drop table tab")
node1.query("drop table state")
node2.query("drop table tab")
node3.query("drop table tab")
node4.query("drop table tab")
@pytest.mark.parametrize("uniq_keys", [1000, 500000])
def test_backward_compatability_for_uniq_exact(start_cluster, uniq_keys):
node1.query(f"CREATE TABLE tab_{uniq_keys} (x UInt64) Engine = Memory")
node2.query(f"CREATE TABLE tab_{uniq_keys} (x UInt64) Engine = Memory")
node3.query(f"CREATE TABLE tab_{uniq_keys} (x UInt64) Engine = Memory")
node4.query(f"CREATE TABLE tab_{uniq_keys} (x UInt64) Engine = Memory")
node1.query(
f"INSERT INTO tab_{uniq_keys} SELECT number FROM numbers_mt(0, {uniq_keys})"
)
node2.query(
f"INSERT INTO tab_{uniq_keys} SELECT number FROM numbers_mt(1, {uniq_keys})"
)
node3.query(
f"INSERT INTO tab_{uniq_keys} SELECT number FROM numbers_mt(2, {uniq_keys})"
)
node4.query(
f"INSERT INTO tab_{uniq_keys} SELECT number FROM numbers_mt(3, {uniq_keys})"
)
assert (
node1.query(
f"SELECT uniqExact(x) FROM remote('node{{1..4}}', default, tab_{uniq_keys})"
)
== f"{uniq_keys + 3}\n"
)
assert (
node2.query(
f"SELECT uniqExact(x) FROM remote('node{{1..4}}', default, tab_{uniq_keys})"
)
== f"{uniq_keys + 3}\n"
)
assert (
node3.query(
f"SELECT uniqExact(x) FROM remote('node{{1..4}}', default, tab_{uniq_keys})"
)
== f"{uniq_keys + 3}\n"
)
assert (
node4.query(
f"SELECT uniqExact(x) FROM remote('node{{1..4}}', default, tab_{uniq_keys})"
)
== f"{uniq_keys + 3}\n"
)
# Also check with persisted aggregate function state
node1.query(
f"CREATE TABLE state_{uniq_keys} (x AggregateFunction(uniqExact, UInt64)) Engine = Log"
)
node1.query(
f"INSERT INTO state_{uniq_keys} SELECT uniqExactState(number) FROM numbers_mt({uniq_keys})"
)
assert (
node1.query(f"SELECT uniqExactMerge(x) FROM state_{uniq_keys}")
== f"{uniq_keys}\n"
)
node1.restart_with_latest_version()
assert (
node1.query(f"SELECT uniqExactMerge(x) FROM state_{uniq_keys}")
== f"{uniq_keys}\n"
)
node1.query(f"DROP TABLE state_{uniq_keys}")
node1.query(f"DROP TABLE tab_{uniq_keys}")
node2.query(f"DROP TABLE tab_{uniq_keys}")
node3.query(f"DROP TABLE tab_{uniq_keys}")
node4.query(f"DROP TABLE tab_{uniq_keys}")
@pytest.mark.parametrize("uniq_keys", [1000, 500000])
def test_backward_compatability_for_uniq_exact_variadic(start_cluster, uniq_keys):
node1.query(f"CREATE TABLE tab_{uniq_keys} (x UInt64, y UInt64) Engine = Memory")
node2.query(f"CREATE TABLE tab_{uniq_keys} (x UInt64, y UInt64) Engine = Memory")
node3.query(f"CREATE TABLE tab_{uniq_keys} (x UInt64, y UInt64) Engine = Memory")
node4.query(f"CREATE TABLE tab_{uniq_keys} (x UInt64, y UInt64) Engine = Memory")
node1.query(
f"INSERT INTO tab_{uniq_keys} SELECT number, number/2 FROM numbers_mt(0, {uniq_keys})"
)
node2.query(
f"INSERT INTO tab_{uniq_keys} SELECT number, number/2 FROM numbers_mt(1, {uniq_keys})"
)
node3.query(
f"INSERT INTO tab_{uniq_keys} SELECT number, number/2 FROM numbers_mt(2, {uniq_keys})"
)
node4.query(
f"INSERT INTO tab_{uniq_keys} SELECT number, number/2 FROM numbers_mt(3, {uniq_keys})"
)
assert (
node1.query(
f"SELECT uniqExact(x, y) FROM remote('node{{1..4}}', default, tab_{uniq_keys})"
)
== f"{uniq_keys + 3}\n"
)
assert (
node2.query(
f"SELECT uniqExact(x, y) FROM remote('node{{1..4}}', default, tab_{uniq_keys})"
)
== f"{uniq_keys + 3}\n"
)
assert (
node3.query(
f"SELECT uniqExact(x, y) FROM remote('node{{1..4}}', default, tab_{uniq_keys})"
)
== f"{uniq_keys + 3}\n"
)
assert (
node4.query(
f"SELECT uniqExact(x, y) FROM remote('node{{1..4}}', default, tab_{uniq_keys})"
)
== f"{uniq_keys + 3}\n"
)
# Also check with persisted aggregate function state
node1.query(
f"CREATE TABLE state_{uniq_keys} (x AggregateFunction(uniqExact, UInt64, UInt64)) Engine = Log"
)
node1.query(
f"INSERT INTO state_{uniq_keys} SELECT uniqExactState(number, intDiv(number,2)) FROM numbers_mt({uniq_keys})"
)
assert (
node1.query(f"SELECT uniqExactMerge(x) FROM state_{uniq_keys}")
== f"{uniq_keys}\n"
)
node1.restart_with_latest_version()
assert (
node1.query(f"SELECT uniqExactMerge(x) FROM state_{uniq_keys}")
== f"{uniq_keys}\n"
)
node1.query(f"DROP TABLE state_{uniq_keys}")
node1.query(f"DROP TABLE tab_{uniq_keys}")
node2.query(f"DROP TABLE tab_{uniq_keys}")
node3.query(f"DROP TABLE tab_{uniq_keys}")
node4.query(f"DROP TABLE tab_{uniq_keys}")