This commit is contained in:
Nikita Taranov 2024-09-18 18:33:55 +01:00
parent 839f06035f
commit 818aac02c6
5 changed files with 96 additions and 2 deletions

View File

@ -34,6 +34,7 @@ static constexpr auto DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING = 54
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 1; static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 1;
static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3; static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3;
static constexpr auto DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_MARK_SEGMENT_SIZE_FIELD = 4;
static constexpr auto DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION = 4; static constexpr auto DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION = 4;
static constexpr auto DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS = 54453; static constexpr auto DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS = 54453;

View File

@ -136,7 +136,7 @@ void InitialAllRangesAnnouncement::serialize(WriteBuffer & out, UInt64 initiator
writeIntBinary(mode, out); writeIntBinary(mode, out);
description.serialize(out); description.serialize(out);
writeIntBinary(replica_num, out); writeIntBinary(replica_num, out);
if (initiator_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL) if (initiator_protocol_version >= DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_MARK_SEGMENT_SIZE_FIELD)
writeIntBinary(mark_segment_size, out); writeIntBinary(mark_segment_size, out);
} }
@ -168,7 +168,7 @@ InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffe
readIntBinary(replica_num, in); readIntBinary(replica_num, in);
size_t mark_segment_size = 128; size_t mark_segment_size = 128;
if (replica_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL) if (replica_protocol_version >= DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_MARK_SEGMENT_SIZE_FIELD)
readIntBinary(mark_segment_size, in); readIntBinary(mark_segment_size, in);
return InitialAllRangesAnnouncement{mode, description, replica_num, mark_segment_size}; return InitialAllRangesAnnouncement{mode, description, replica_num, mark_segment_size};

View File

@ -0,0 +1,21 @@
<clickhouse>
<remote_servers>
<parallel_replicas>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>node0</host>
<port>9000</port>
</replica>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</parallel_replicas>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,72 @@
import pytest
import re
from helpers.cluster import ClickHouseCluster
from random import randint
cluster = ClickHouseCluster(__file__)
nodes = [
cluster.add_instance(
f"node{num}",
main_configs=["configs/remote_servers.xml"],
with_zookeeper=True,
macros={"replica": f"node{num}", "shard": "shard"},
)
for num in range(3)
]
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def _create_tables(table_name):
nodes[0].query(
f"DROP TABLE IF EXISTS {table_name} ON CLUSTER 'parallel_replicas'",
settings={"database_atomic_wait_for_drop_and_detach_synchronously": True},
)
# big number of granules + low total size in bytes = super tiny granules = big min_marks_per_task
# => big mark_segment_size will be chosen. it is not required to be big, just not equal to the default
nodes[0].query(
f"""
CREATE TABLE {table_name} ON CLUSTER 'parallel_replicas' (value Int64)
Engine=ReplicatedMergeTree('/test_parallel_replicas/shard/{table_name}', '{{replica}}')
ORDER BY ()
SETTINGS index_granularity = 1
"""
)
nodes[0].query(f"INSERT INTO {table_name} SELECT 42 FROM numbers(1000)")
nodes[0].query(f"SYSTEM SYNC REPLICA ON CLUSTER 'parallel_replicas' {table_name}")
# now mark_segment_size is part of the protocol and is communicated to the initiator.
# let's check that the correct value is actually used by the coordinator
def test_mark_segment_size_communicated_correctly(start_cluster):
table_name = "t"
_create_tables(table_name)
for local_plan in [0, 1]:
query_id = f"query_id_{randint(0, 1000000)}"
nodes[0].query(
f"SELECT sum(value) FROM {table_name}",
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"max_parallel_replicas": 100,
"cluster_for_parallel_replicas": "parallel_replicas",
"parallel_replicas_mark_segment_size": 0,
"parallel_replicas_local_plan": local_plan,
"query_id": query_id,
},
)
nodes[0].query("SYSTEM FLUSH LOGS")
log_line = nodes[0].grep_in_log(f"{query_id}.*Reading state is fully initialized")
assert re.search(r"mark_segment_size: (\d+)", log_line).group(1) == "16384"