mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-15 20:24:07 +00:00
921518db0a
* add the query data deduplication excluding duplicated parts in MergeTree family engines. query deduplication is based on parts' UUID which should be enabled first with merge_tree setting assign_part_uuids=1 allow_experimental_query_deduplication setting is to enable part deduplication, default ot false. data part UUID is a mechanism of giving a data part a unique identifier. Having UUID and deduplication mechanism provides a potential of moving parts between shards preserving data consistency on a read path: duplicated UUIDs will cause root executor to retry query against on of the replica explicitly asking to exclude encountered duplicated fingerprints during a distributed query execution. NOTE: this implementation don't provide any knobs to lock part and hence its UUID. Any mutations/merge will update part's UUID. * add _part_uuid virtual column, allowing to use UUIDs in predicates. Signed-off-by: Aleksei Semiglazov <asemiglazov@cloudflare.com> address comments
166 lines
4.5 KiB
Python
166 lines
4.5 KiB
Python
import uuid
|
|
|
|
import pytest
|
|
|
|
from helpers.cluster import ClickHouseCluster
|
|
from helpers.test_tools import TSV
|
|
|
|
DUPLICATED_UUID = uuid.uuid4()
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
|
|
node1 = cluster.add_instance(
|
|
'node1',
|
|
main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml'])
|
|
|
|
node2 = cluster.add_instance(
|
|
'node2',
|
|
main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml'])
|
|
|
|
node3 = cluster.add_instance(
|
|
'node3',
|
|
main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml'])
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def started_cluster():
|
|
try:
|
|
cluster.start()
|
|
yield cluster
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def prepare_node(node, parts_uuid=None):
|
|
node.query("""
|
|
CREATE TABLE t(_prefix UInt8 DEFAULT 0, key UInt64, value UInt64)
|
|
ENGINE MergeTree()
|
|
ORDER BY tuple()
|
|
PARTITION BY _prefix
|
|
SETTINGS index_granularity = 1
|
|
""")
|
|
|
|
node.query("""
|
|
CREATE TABLE d AS t ENGINE=Distributed(test_cluster, default, t)
|
|
""")
|
|
|
|
# Stop merges while populating test data
|
|
node.query("SYSTEM STOP MERGES")
|
|
|
|
# Create 5 parts
|
|
for i in range(1, 6):
|
|
node.query("INSERT INTO t VALUES ({}, {}, {})".format(i, i, i))
|
|
|
|
node.query("DETACH TABLE t")
|
|
|
|
if parts_uuid:
|
|
for part, part_uuid in parts_uuid:
|
|
script = """
|
|
echo -n '{}' > /var/lib/clickhouse/data/default/t/{}/uuid.txt
|
|
""".format(part_uuid, part)
|
|
node.exec_in_container(["bash", "-c", script])
|
|
|
|
# Attach table back
|
|
node.query("ATTACH TABLE t")
|
|
|
|
# NOTE:
|
|
# due to absence of the ability to lock part, need to operate on parts with preventin merges
|
|
# node.query("SYSTEM START MERGES")
|
|
# node.query("OPTIMIZE TABLE t FINAL")
|
|
|
|
print(node.name)
|
|
print(node.query("SELECT name, uuid, partition FROM system.parts WHERE table = 't' AND active ORDER BY name"))
|
|
|
|
assert '5' == node.query("SELECT count() FROM system.parts WHERE table = 't' AND active").strip()
|
|
if parts_uuid:
|
|
for part, part_uuid in parts_uuid:
|
|
assert '1' == node.query(
|
|
"SELECT count() FROM system.parts WHERE table = 't' AND uuid = '{}' AND active".format(
|
|
part_uuid)).strip()
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def prepared_cluster(started_cluster):
|
|
print("duplicated UUID: {}".format(DUPLICATED_UUID))
|
|
prepare_node(node1, parts_uuid=[("3_3_3_0", DUPLICATED_UUID)])
|
|
prepare_node(node2, parts_uuid=[("3_3_3_0", DUPLICATED_UUID)])
|
|
prepare_node(node3)
|
|
|
|
|
|
def test_virtual_column(prepared_cluster):
|
|
# Part containing `key=3` has the same fingerprint on both nodes,
|
|
# we expect it to be included only once in the end result.;
|
|
# select query is using virtucal column _part_fingerprint to filter out part in one shard
|
|
expected = """
|
|
1 2
|
|
2 2
|
|
3 1
|
|
4 2
|
|
5 2
|
|
"""
|
|
assert TSV(expected) == TSV(node1.query("""
|
|
SELECT
|
|
key,
|
|
count() AS c
|
|
FROM d
|
|
WHERE ((_shard_num = 1) AND (_part_uuid != '{}')) OR (_shard_num = 2)
|
|
GROUP BY key
|
|
ORDER BY
|
|
key ASC
|
|
""".format(DUPLICATED_UUID)))
|
|
|
|
|
|
def test_with_deduplication(prepared_cluster):
|
|
# Part containing `key=3` has the same fingerprint on both nodes,
|
|
# we expect it to be included only once in the end result
|
|
expected = """
|
|
1 3
|
|
2 3
|
|
3 2
|
|
4 3
|
|
5 3
|
|
"""
|
|
assert TSV(expected) == TSV(node1.query(
|
|
"SET allow_experimental_query_deduplication=1; SELECT key, count() c FROM d GROUP BY key ORDER BY key"))
|
|
|
|
|
|
def test_no_merge_with_deduplication(prepared_cluster):
|
|
# Part containing `key=3` has the same fingerprint on both nodes,
|
|
# we expect it to be included only once in the end result.
|
|
# even with distributed_group_by_no_merge=1 the duplicated part should be excluded from the final result
|
|
expected = """
|
|
1 1
|
|
2 1
|
|
3 1
|
|
4 1
|
|
5 1
|
|
1 1
|
|
2 1
|
|
3 1
|
|
4 1
|
|
5 1
|
|
1 1
|
|
2 1
|
|
4 1
|
|
5 1
|
|
"""
|
|
assert TSV(expected) == TSV(node1.query("SELECT key, count() c FROM d GROUP BY key ORDER BY key", settings={
|
|
"allow_experimental_query_deduplication": 1,
|
|
"distributed_group_by_no_merge": 1,
|
|
}))
|
|
|
|
|
|
def test_without_deduplication(prepared_cluster):
|
|
# Part containing `key=3` has the same fingerprint on both nodes,
|
|
# but allow_experimental_query_deduplication is disabled,
|
|
# so it will not be excluded
|
|
expected = """
|
|
1 3
|
|
2 3
|
|
3 3
|
|
4 3
|
|
5 3
|
|
"""
|
|
assert TSV(expected) == TSV(node1.query(
|
|
"SET allow_experimental_query_deduplication=0; SELECT key, count() c FROM d GROUP BY key ORDER BY key"))
|