mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-15 10:52:30 +00:00
90 lines
3.5 KiB
Python
90 lines
3.5 KiB
Python
import time
|
|
|
|
import pytest
|
|
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
|
|
instance_test_mutations = cluster.add_instance(
|
|
"test_mutations_with_projection",
|
|
main_configs=["configs/config.xml"],
|
|
user_configs=["configs/users.xml"],
|
|
)
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def started_cluster():
|
|
try:
|
|
cluster.start()
|
|
instance_test_mutations.query(
|
|
"""
|
|
CREATE TABLE video_log
|
|
(
|
|
`datetime` DateTime, -- 20,000 records per second
|
|
`user_id` UInt64, -- Cardinality == 100,000,000
|
|
`device_id` UInt64, -- Cardinality == 200,000,000
|
|
`video_id` UInt64, -- Cardinality == 100,00000
|
|
`domain` LowCardinality(String), -- Cardinality == 100
|
|
`bytes` UInt64, -- Ranging from 128 to 1152
|
|
`duration` UInt64, -- Ranging from 100 to 400
|
|
PROJECTION p_norm (SELECT datetime, device_id, bytes, duration ORDER BY device_id),
|
|
PROJECTION p_agg (SELECT toStartOfHour(datetime) AS hour, domain, sum(bytes), avg(duration) GROUP BY hour, domain)
|
|
)
|
|
ENGINE = MergeTree
|
|
PARTITION BY toDate(datetime) -- Daily partitioning
|
|
ORDER BY (user_id, device_id, video_id) -- Can only favor one column here
|
|
SETTINGS index_granularity = 1000;
|
|
"""
|
|
)
|
|
|
|
instance_test_mutations.query(
|
|
"""CREATE TABLE rng (`user_id_raw` UInt64, `device_id_raw` UInt64, `video_id_raw` UInt64, `domain_raw` UInt64, `bytes_raw` UInt64, `duration_raw` UInt64) ENGINE = GenerateRandom(1024);"""
|
|
)
|
|
|
|
instance_test_mutations.query(
|
|
"""INSERT INTO video_log SELECT toUnixTimestamp(toDateTime(today())) + (rowNumberInAllBlocks() / 20000), user_id_raw % 100000000 AS user_id, device_id_raw % 200000000 AS device_id, video_id_raw % 100000000 AS video_id, domain_raw % 100, (bytes_raw % 1024) + 128, (duration_raw % 300) + 100 FROM rng LIMIT 500000;"""
|
|
)
|
|
|
|
instance_test_mutations.query("""OPTIMIZE TABLE video_log FINAL;""")
|
|
|
|
yield cluster
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def test_mutations_with_multi_level_merge_of_projections(started_cluster):
|
|
try:
|
|
instance_test_mutations.query(
|
|
"""ALTER TABLE video_log UPDATE bytes = bytes + 10086 WHERE 1;"""
|
|
)
|
|
|
|
def count_and_changed():
|
|
return instance_test_mutations.query(
|
|
"SELECT count(), countIf(bytes > 10000) FROM video_log SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT CSV"
|
|
).splitlines()
|
|
|
|
all_done = False
|
|
for wait_times_for_mutation in range(
|
|
100
|
|
): # wait for replication 80 seconds max
|
|
time.sleep(0.8)
|
|
|
|
if count_and_changed() == ["500000,500000"]:
|
|
all_done = True
|
|
break
|
|
|
|
print(
|
|
instance_test_mutations.query(
|
|
"SELECT mutation_id, command, parts_to_do, is_done, latest_failed_part, latest_fail_reason, parts_to_do_names FROM system.mutations WHERE table = 'video_log' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT TSVWithNames"
|
|
)
|
|
)
|
|
|
|
assert (count_and_changed(), all_done) == (["500000,500000"], True)
|
|
assert instance_test_mutations.query(
|
|
f"SELECT DISTINCT arraySort(projections) FROM system.parts WHERE table = 'video_log' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT TSVRaw"
|
|
).splitlines() == ["['p_agg','p_norm']"]
|
|
|
|
finally:
|
|
instance_test_mutations.query(f"""DROP TABLE video_log""")
|