mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-09 17:14:47 +00:00
impl
This commit is contained in:
parent
54f0a8c0e2
commit
42670a46d4
@ -2009,33 +2009,6 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
|
|||||||
{
|
{
|
||||||
auto result = getAnalysisResult();
|
auto result = getAnalysisResult();
|
||||||
|
|
||||||
if (is_parallel_reading_from_replicas && context->canUseParallelReplicasOnInitiator()
|
|
||||||
&& context->getSettingsRef().parallel_replicas_local_plan)
|
|
||||||
{
|
|
||||||
CoordinationMode mode = CoordinationMode::Default;
|
|
||||||
switch (result.read_type)
|
|
||||||
{
|
|
||||||
case ReadFromMergeTree::ReadType::Default:
|
|
||||||
mode = CoordinationMode::Default;
|
|
||||||
break;
|
|
||||||
case ReadFromMergeTree::ReadType::InOrder:
|
|
||||||
mode = CoordinationMode::WithOrder;
|
|
||||||
break;
|
|
||||||
case ReadFromMergeTree::ReadType::InReverseOrder:
|
|
||||||
mode = CoordinationMode::ReverseOrder;
|
|
||||||
break;
|
|
||||||
case ReadFromMergeTree::ReadType::ParallelReplicas:
|
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Read type can't be ParallelReplicas on initiator");
|
|
||||||
}
|
|
||||||
|
|
||||||
chassert(number_of_current_replica.has_value());
|
|
||||||
chassert(all_ranges_callback.has_value());
|
|
||||||
|
|
||||||
/// initialize working set from local replica
|
|
||||||
all_ranges_callback.value()(
|
|
||||||
InitialAllRangesAnnouncement(mode, result.parts_with_ranges.getDescriptions(), number_of_current_replica.value()));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (enable_remove_parts_from_snapshot_optimization)
|
if (enable_remove_parts_from_snapshot_optimization)
|
||||||
{
|
{
|
||||||
/// Do not keep data parts in snapshot.
|
/// Do not keep data parts in snapshot.
|
||||||
|
@ -0,0 +1,33 @@
|
|||||||
|
<clickhouse>
|
||||||
|
<remote_servers>
|
||||||
|
<parallel_replicas>
|
||||||
|
<shard>
|
||||||
|
<internal_replication>false</internal_replication>
|
||||||
|
<replica>
|
||||||
|
<host>node0</host>
|
||||||
|
<port>9000</port>
|
||||||
|
</replica>
|
||||||
|
<replica>
|
||||||
|
<host>node1</host>
|
||||||
|
<port>9000</port>
|
||||||
|
</replica>
|
||||||
|
<replica>
|
||||||
|
<host>node2</host>
|
||||||
|
<port>9000</port>
|
||||||
|
</replica>
|
||||||
|
<replica>
|
||||||
|
<host>node3</host>
|
||||||
|
<port>9000</port>
|
||||||
|
</replica>
|
||||||
|
<replica>
|
||||||
|
<host>node4</host>
|
||||||
|
<port>9000</port>
|
||||||
|
</replica>
|
||||||
|
<replica>
|
||||||
|
<host>node5</host>
|
||||||
|
<port>9000</port>
|
||||||
|
</replica>
|
||||||
|
</shard>
|
||||||
|
</parallel_replicas>
|
||||||
|
</remote_servers>
|
||||||
|
</clickhouse>
|
@ -0,0 +1,74 @@
|
|||||||
|
import pytest
|
||||||
|
|
||||||
|
from helpers.cluster import ClickHouseCluster
|
||||||
|
|
||||||
|
cluster = ClickHouseCluster(__file__)
|
||||||
|
|
||||||
|
nodes = [
|
||||||
|
cluster.add_instance(
|
||||||
|
f"node{num}", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
|
||||||
|
)
|
||||||
|
for num in range(6)
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module", autouse=True)
|
||||||
|
def start_cluster():
|
||||||
|
try:
|
||||||
|
cluster.start()
|
||||||
|
yield cluster
|
||||||
|
finally:
|
||||||
|
cluster.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
def _create_tables(table_name):
|
||||||
|
for idx, node in enumerate(nodes):
|
||||||
|
node.query(
|
||||||
|
f"DROP TABLE IF EXISTS {table_name}",
|
||||||
|
settings={"database_atomic_wait_for_drop_and_detach_synchronously": True},
|
||||||
|
)
|
||||||
|
|
||||||
|
node.query(
|
||||||
|
f"""
|
||||||
|
CREATE TABLE {table_name} (value Int64)
|
||||||
|
Engine=ReplicatedMergeTree('/test_parallel_replicas/shard/{table_name}', '{idx}')
|
||||||
|
ORDER BY ()
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
nodes[0].query(
|
||||||
|
f"INSERT INTO {table_name} SELECT * FROM numbers(1000)",
|
||||||
|
settings={"insert_deduplicate": 0},
|
||||||
|
)
|
||||||
|
nodes[0].query(f"SYSTEM SYNC REPLICA ON CLUSTER 'parallel_replicas' {table_name}")
|
||||||
|
|
||||||
|
for idx, node in enumerate(nodes):
|
||||||
|
node.query("SYSTEM STOP REPLICATED SENDS")
|
||||||
|
# the same data on all nodes except for a single value
|
||||||
|
node.query(
|
||||||
|
f"INSERT INTO {table_name} VALUES ({idx})",
|
||||||
|
settings={"insert_deduplicate": 0},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_number_of_marks_read(start_cluster):
|
||||||
|
if nodes[0].is_built_with_sanitizer():
|
||||||
|
pytest.skip("Disabled for sanitizers (too slow)")
|
||||||
|
|
||||||
|
table_name = "t"
|
||||||
|
_create_tables(table_name)
|
||||||
|
|
||||||
|
for idx, node in enumerate(nodes):
|
||||||
|
expected = 499500 + idx # sum of all integers 0..999 + idx
|
||||||
|
assert (
|
||||||
|
node.query(
|
||||||
|
"SELECT sum(value) FROM t",
|
||||||
|
settings={
|
||||||
|
"allow_experimental_parallel_reading_from_replicas": 2,
|
||||||
|
"max_parallel_replicas": 100,
|
||||||
|
"cluster_for_parallel_replicas": "parallel_replicas",
|
||||||
|
"parallel_replicas_local_plan": True,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
== f"{expected}\n"
|
||||||
|
)
|
Loading…
Reference in New Issue
Block a user