mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-28 10:31:57 +00:00
Merge pull request #15601 from alexelex/alexelex-master
Improvement of Quorum Inserts in ClickHouse
This commit is contained in:
commit
0fd007ad9e
@ -159,6 +159,7 @@ class IColumn;
|
||||
\
|
||||
M(UInt64, insert_quorum, 0, "For INSERT queries in the replicated table, wait writing for the specified number of replicas and linearize the addition of the data. 0 - disabled.", 0) \
|
||||
M(Milliseconds, insert_quorum_timeout, 600000, "", 0) \
|
||||
M(Bool, insert_quorum_parallel, false, "For quorum INSERT queries - enable to make parallel inserts without linearizability", 0) \
|
||||
M(UInt64, select_sequential_consistency, 0, "For SELECT queries from the replicated table, throw an exception if the replica does not have a chunk written with the quorum; do not read the parts that have not yet been written with the quorum.", 0) \
|
||||
M(UInt64, table_function_remote_max_addresses, 1000, "The maximum number of different shards and the maximum number of replicas of one shard in the `remote` function.", 0) \
|
||||
M(Milliseconds, read_backoff_min_latency_ms, 1000, "Setting to reduce the number of threads in case of slow reads. Pay attention only to reads that took at least that much time.", 0) \
|
||||
|
@ -39,12 +39,14 @@ ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
|
||||
size_t quorum_,
|
||||
size_t quorum_timeout_ms_,
|
||||
size_t max_parts_per_block_,
|
||||
bool quorum_parallel_,
|
||||
bool deduplicate_)
|
||||
: storage(storage_)
|
||||
, metadata_snapshot(metadata_snapshot_)
|
||||
, quorum(quorum_)
|
||||
, quorum_timeout_ms(quorum_timeout_ms_)
|
||||
, max_parts_per_block(max_parts_per_block_)
|
||||
, quorum_parallel(quorum_parallel_)
|
||||
, deduplicate(deduplicate_)
|
||||
, log(&Poco::Logger::get(storage.getLogName() + " (Replicated OutputStream)"))
|
||||
{
|
||||
@ -75,7 +77,6 @@ void ReplicatedMergeTreeBlockOutputStream::checkQuorumPrecondition(zkutil::ZooKe
|
||||
{
|
||||
quorum_info.status_path = storage.zookeeper_path + "/quorum/status";
|
||||
|
||||
std::future<Coordination::GetResponse> quorum_status_future = zookeeper->asyncTryGet(quorum_info.status_path);
|
||||
std::future<Coordination::GetResponse> is_active_future = zookeeper->asyncTryGet(storage.replica_path + "/is_active");
|
||||
std::future<Coordination::GetResponse> host_future = zookeeper->asyncTryGet(storage.replica_path + "/host");
|
||||
|
||||
@ -97,9 +98,9 @@ void ReplicatedMergeTreeBlockOutputStream::checkQuorumPrecondition(zkutil::ZooKe
|
||||
* If the quorum is reached, then the node is deleted.
|
||||
*/
|
||||
|
||||
auto quorum_status = quorum_status_future.get();
|
||||
if (quorum_status.error != Coordination::Error::ZNONODE)
|
||||
throw Exception("Quorum for previous write has not been satisfied yet. Status: " + quorum_status.data,
|
||||
String quorum_status;
|
||||
if (!quorum_parallel && zookeeper->tryGet(quorum_info.status_path, quorum_status))
|
||||
throw Exception("Quorum for previous write has not been satisfied yet. Status: " + quorum_status,
|
||||
ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
|
||||
|
||||
/// Both checks are implicitly made also later (otherwise there would be a race condition).
|
||||
@ -294,6 +295,9 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
|
||||
* which indicates that the quorum has been reached.
|
||||
*/
|
||||
|
||||
if (quorum_parallel)
|
||||
quorum_info.status_path = storage.zookeeper_path + "/quorum/parallel/" + part->name;
|
||||
|
||||
ops.emplace_back(
|
||||
zkutil::makeCreateRequest(
|
||||
quorum_info.status_path,
|
||||
@ -346,7 +350,6 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
|
||||
/// Used only for exception messages.
|
||||
block_number = part->info.min_block;
|
||||
|
||||
|
||||
/// Do not check for duplicate on commit to ZK.
|
||||
block_id_path.clear();
|
||||
}
|
||||
@ -466,14 +469,16 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
|
||||
if (is_already_existing_part)
|
||||
{
|
||||
/// We get duplicate part without fetch
|
||||
storage.updateQuorum(part->name);
|
||||
/// Check if this quorum insert is parallel or not
|
||||
if (zookeeper->exists(storage.zookeeper_path + "/quorum/parallel/" + part->name))
|
||||
storage.updateQuorum(part->name, true);
|
||||
else if (zookeeper->exists(storage.zookeeper_path + "/quorum/status"))
|
||||
storage.updateQuorum(part->name, false);
|
||||
}
|
||||
|
||||
/// We are waiting for quorum to be satisfied.
|
||||
LOG_TRACE(log, "Waiting for quorum");
|
||||
|
||||
String quorum_status_path = storage.zookeeper_path + "/quorum/status";
|
||||
|
||||
try
|
||||
{
|
||||
while (true)
|
||||
@ -482,7 +487,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
|
||||
|
||||
std::string value;
|
||||
/// `get` instead of `exists` so that `watch` does not leak if the node is no longer there.
|
||||
if (!zookeeper->tryGet(quorum_status_path, value, nullptr, event))
|
||||
if (!zookeeper->tryGet(quorum_info.status_path, value, nullptr, event))
|
||||
break;
|
||||
|
||||
ReplicatedMergeTreeQuorumEntry quorum_entry(value);
|
||||
|
@ -28,6 +28,7 @@ public:
|
||||
size_t quorum_,
|
||||
size_t quorum_timeout_ms_,
|
||||
size_t max_parts_per_block_,
|
||||
bool quorum_parallel_,
|
||||
bool deduplicate_);
|
||||
|
||||
Block getHeader() const override;
|
||||
@ -64,6 +65,7 @@ private:
|
||||
size_t quorum_timeout_ms;
|
||||
size_t max_parts_per_block;
|
||||
|
||||
bool quorum_parallel = false;
|
||||
bool deduplicate = true;
|
||||
bool last_block_is_duplicate = false;
|
||||
|
||||
|
@ -226,14 +226,32 @@ void ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart()
|
||||
String quorum_str;
|
||||
if (zookeeper->tryGet(storage.zookeeper_path + "/quorum/status", quorum_str))
|
||||
{
|
||||
ReplicatedMergeTreeQuorumEntry quorum_entry;
|
||||
quorum_entry.fromString(quorum_str);
|
||||
ReplicatedMergeTreeQuorumEntry quorum_entry(quorum_str);
|
||||
|
||||
if (!quorum_entry.replicas.count(storage.replica_name)
|
||||
&& zookeeper->exists(storage.replica_path + "/parts/" + quorum_entry.part_name))
|
||||
&& storage.getActiveContainingPart(quorum_entry.part_name))
|
||||
{
|
||||
LOG_WARNING(log, "We have part {} but we is not in quorum. Updating quorum. This shouldn't happen often.", quorum_entry.part_name);
|
||||
storage.updateQuorum(quorum_entry.part_name);
|
||||
storage.updateQuorum(quorum_entry.part_name, false);
|
||||
}
|
||||
}
|
||||
|
||||
Strings part_names;
|
||||
String parallel_quorum_parts_path = storage.zookeeper_path + "/quorum/parallel";
|
||||
if (zookeeper->tryGetChildren(parallel_quorum_parts_path, part_names) == Coordination::Error::ZOK)
|
||||
{
|
||||
for (auto & part_name : part_names)
|
||||
{
|
||||
if (zookeeper->tryGet(parallel_quorum_parts_path + "/" + part_name, quorum_str))
|
||||
{
|
||||
ReplicatedMergeTreeQuorumEntry quorum_entry(quorum_str);
|
||||
if (!quorum_entry.replicas.count(storage.replica_name)
|
||||
&& storage.getActiveContainingPart(part_name))
|
||||
{
|
||||
LOG_WARNING(log, "We have part {} but we is not in quorum. Updating quorum. This shouldn't happen often.", part_name);
|
||||
storage.updateQuorum(part_name, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -475,6 +475,7 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes()
|
||||
|
||||
/// Working with quorum.
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/quorum", String());
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/quorum/parallel", String());
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/quorum/last_part", String());
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/quorum/failed_parts", String());
|
||||
|
||||
@ -1705,15 +1706,24 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
|
||||
if (replica.empty())
|
||||
{
|
||||
Coordination::Stat quorum_stat;
|
||||
String quorum_path = zookeeper_path + "/quorum/status";
|
||||
String quorum_str = zookeeper->get(quorum_path, &quorum_stat);
|
||||
const String quorum_unparallel_path = zookeeper_path + "/quorum/status";
|
||||
const String quorum_parallel_path = zookeeper_path + "/quorum/parallel/" + entry.new_part_name;
|
||||
String quorum_str, quorum_path;
|
||||
ReplicatedMergeTreeQuorumEntry quorum_entry;
|
||||
|
||||
if (zookeeper->tryGet(quorum_unparallel_path, quorum_str, &quorum_stat))
|
||||
quorum_path = quorum_unparallel_path;
|
||||
else
|
||||
{
|
||||
quorum_str = zookeeper->get(quorum_parallel_path, &quorum_stat);
|
||||
quorum_path = quorum_parallel_path;
|
||||
}
|
||||
|
||||
quorum_entry.fromString(quorum_str);
|
||||
|
||||
if (quorum_entry.part_name == entry.new_part_name)
|
||||
{
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(quorum_path, quorum_stat.version));
|
||||
|
||||
auto part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
|
||||
|
||||
if (part_info.min_block != part_info.max_block)
|
||||
@ -3114,12 +3124,14 @@ String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(
|
||||
|
||||
/** If a quorum is tracked for a part, update information about it in ZK.
|
||||
*/
|
||||
void StorageReplicatedMergeTree::updateQuorum(const String & part_name)
|
||||
void StorageReplicatedMergeTree::updateQuorum(const String & part_name, bool is_parallel)
|
||||
{
|
||||
auto zookeeper = getZooKeeper();
|
||||
|
||||
/// Information on which replicas a part has been added, if the quorum has not yet been reached.
|
||||
const String quorum_status_path = zookeeper_path + "/quorum/status";
|
||||
String quorum_status_path = zookeeper_path + "/quorum/status";
|
||||
if (is_parallel)
|
||||
quorum_status_path = zookeeper_path + "/quorum/parallel/" + part_name;
|
||||
/// The name of the previous part for which the quorum was reached.
|
||||
const String quorum_last_part_path = zookeeper_path + "/quorum/last_part";
|
||||
|
||||
@ -3129,9 +3141,7 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name)
|
||||
/// If there is no node, then all quorum INSERTs have already reached the quorum, and nothing is needed.
|
||||
while (zookeeper->tryGet(quorum_status_path, value, &stat))
|
||||
{
|
||||
ReplicatedMergeTreeQuorumEntry quorum_entry;
|
||||
quorum_entry.fromString(value);
|
||||
|
||||
ReplicatedMergeTreeQuorumEntry quorum_entry(value);
|
||||
if (quorum_entry.part_name != part_name)
|
||||
{
|
||||
/// The quorum has already been achieved. Moreover, another INSERT with a quorum has already started.
|
||||
@ -3147,6 +3157,8 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name)
|
||||
Coordination::Requests ops;
|
||||
Coordination::Responses responses;
|
||||
|
||||
if (!is_parallel)
|
||||
{
|
||||
Coordination::Stat added_parts_stat;
|
||||
String old_added_parts = zookeeper->get(quorum_last_part_path, &added_parts_stat);
|
||||
|
||||
@ -3163,6 +3175,10 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name)
|
||||
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(quorum_status_path, stat.version));
|
||||
ops.emplace_back(zkutil::makeSetRequest(quorum_last_part_path, new_added_parts, added_parts_stat.version));
|
||||
}
|
||||
else
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(quorum_status_path, stat.version));
|
||||
|
||||
auto code = zookeeper->tryMulti(ops, responses);
|
||||
|
||||
if (code == Coordination::Error::ZOK)
|
||||
@ -3391,7 +3407,25 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
|
||||
* If you do not have time, in case of losing the session, when you restart the server - see the `ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart` method.
|
||||
*/
|
||||
if (quorum)
|
||||
updateQuorum(part_name);
|
||||
{
|
||||
/// Check if this quorum insert is parallel or not
|
||||
if (zookeeper->exists(zookeeper_path + "/quorum/parallel/" + part_name))
|
||||
updateQuorum(part_name, true);
|
||||
else if (zookeeper->exists(zookeeper_path + "/quorum/status"))
|
||||
updateQuorum(part_name, false);
|
||||
}
|
||||
|
||||
/// merged parts that are still inserted with quorum. if it only contains one block, it hasn't been merged before
|
||||
if (part_info.level != 0 || part_info.mutation != 0)
|
||||
{
|
||||
Strings quorum_parts = zookeeper->getChildren(zookeeper_path + "/quorum/parallel");
|
||||
for (const String & quorum_part : quorum_parts)
|
||||
{
|
||||
auto quorum_part_info = MergeTreePartInfo::fromPartName(quorum_part, format_version);
|
||||
if (part_info.contains(quorum_part_info))
|
||||
updateQuorum(quorum_part, true);
|
||||
}
|
||||
}
|
||||
|
||||
merge_selecting_task->schedule();
|
||||
|
||||
@ -3674,6 +3708,7 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/,
|
||||
*this, metadata_snapshot, query_settings.insert_quorum,
|
||||
query_settings.insert_quorum_timeout.totalMilliseconds(),
|
||||
query_settings.max_partitions_per_insert_block,
|
||||
query_settings.insert_quorum_parallel,
|
||||
deduplicate);
|
||||
}
|
||||
|
||||
@ -4258,7 +4293,7 @@ PartitionCommandsResultInfo StorageReplicatedMergeTree::attachPartition(
|
||||
PartsTemporaryRename renamed_parts(*this, "detached/");
|
||||
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);
|
||||
|
||||
ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false); /// TODO Allow to use quorum here.
|
||||
ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false, false); /// TODO Allow to use quorum here.
|
||||
for (size_t i = 0; i < loaded_parts.size(); ++i)
|
||||
{
|
||||
String old_name = loaded_parts[i]->name;
|
||||
|
@ -497,7 +497,7 @@ private:
|
||||
|
||||
|
||||
/// With the quorum being tracked, add a replica to the quorum for the part.
|
||||
void updateQuorum(const String & part_name);
|
||||
void updateQuorum(const String & part_name, bool is_parallel);
|
||||
|
||||
/// Deletes info from quorum/last_part node for particular partition_id.
|
||||
void cleanLastPartNode(const String & partition_id);
|
||||
|
@ -0,0 +1 @@
|
||||
#!/usr/bin/env python3
|
102
tests/integration/test_quorum_inserts_parallel/test.py
Normal file
102
tests/integration/test_quorum_inserts_parallel/test.py
Normal file
@ -0,0 +1,102 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from multiprocessing.dummy import Pool
|
||||
from helpers.network import PartitionManager
|
||||
from helpers.client import QueryRuntimeException
|
||||
from helpers.test_tools import assert_eq_with_retry
|
||||
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
node1 = cluster.add_instance("node1", with_zookeeper=True)
|
||||
node2 = cluster.add_instance("node2", with_zookeeper=True)
|
||||
node3 = cluster.add_instance("node3", with_zookeeper=True)
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
global cluster
|
||||
try:
|
||||
cluster.start()
|
||||
yield cluster
|
||||
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def test_parallel_quorum_actually_parallel(started_cluster):
|
||||
settings = {"insert_quorum": "3", "insert_quorum_parallel": "1"}
|
||||
for i, node in enumerate([node1, node2, node3]):
|
||||
node.query("CREATE TABLE r (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/r', '{num}') ORDER BY tuple()".format(num=i))
|
||||
|
||||
p = Pool(10)
|
||||
|
||||
def long_insert(node):
|
||||
node.query("INSERT INTO r SELECT number, toString(number) FROM numbers(5) where sleepEachRow(1) == 0", settings=settings)
|
||||
|
||||
job = p.apply_async(long_insert, (node1,))
|
||||
|
||||
node2.query("INSERT INTO r VALUES (6, '6')", settings=settings)
|
||||
assert node1.query("SELECT COUNT() FROM r") == "1\n"
|
||||
assert node2.query("SELECT COUNT() FROM r") == "1\n"
|
||||
assert node3.query("SELECT COUNT() FROM r") == "1\n"
|
||||
|
||||
node1.query("INSERT INTO r VALUES (7, '7')", settings=settings)
|
||||
assert node1.query("SELECT COUNT() FROM r") == "2\n"
|
||||
assert node2.query("SELECT COUNT() FROM r") == "2\n"
|
||||
assert node3.query("SELECT COUNT() FROM r") == "2\n"
|
||||
|
||||
job.get()
|
||||
|
||||
assert node1.query("SELECT COUNT() FROM r") == "7\n"
|
||||
assert node2.query("SELECT COUNT() FROM r") == "7\n"
|
||||
assert node3.query("SELECT COUNT() FROM r") == "7\n"
|
||||
p.close()
|
||||
p.join()
|
||||
|
||||
|
||||
def test_parallel_quorum_actually_quorum(started_cluster):
|
||||
for i, node in enumerate([node1, node2, node3]):
|
||||
node.query("CREATE TABLE q (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/q', '{num}') ORDER BY tuple()".format(num=i))
|
||||
|
||||
with PartitionManager() as pm:
|
||||
pm.partition_instances(node2, node1, port=9009)
|
||||
pm.partition_instances(node2, node3, port=9009)
|
||||
with pytest.raises(QueryRuntimeException):
|
||||
node1.query("INSERT INTO q VALUES(1, 'Hello')", settings={"insert_quorum": "3", "insert_quorum_parallel": "1", "insert_quorum_timeout": "3000"})
|
||||
|
||||
assert_eq_with_retry(node1, "SELECT COUNT() FROM q", "1")
|
||||
assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "0")
|
||||
assert_eq_with_retry(node3, "SELECT COUNT() FROM q", "1")
|
||||
|
||||
node1.query("INSERT INTO q VALUES(2, 'wlrd')", settings={"insert_quorum": "2", "insert_quorum_parallel": "1", "insert_quorum_timeout": "3000"})
|
||||
|
||||
assert_eq_with_retry(node1, "SELECT COUNT() FROM q", "2")
|
||||
assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "0")
|
||||
assert_eq_with_retry(node3, "SELECT COUNT() FROM q", "2")
|
||||
|
||||
def insert_value_to_node(node, settings):
|
||||
node.query("INSERT INTO q VALUES(3, 'Hi')", settings=settings)
|
||||
|
||||
p = Pool(2)
|
||||
res = p.apply_async(insert_value_to_node, (node1, {"insert_quorum": "3", "insert_quorum_parallel": "1", "insert_quorum_timeout": "60000"}))
|
||||
|
||||
assert_eq_with_retry(node1, "SELECT COUNT() FROM system.parts WHERE table == 'q' and active == 1", "3")
|
||||
assert_eq_with_retry(node3, "SELECT COUNT() FROM system.parts WHERE table == 'q' and active == 1", "3")
|
||||
assert_eq_with_retry(node2, "SELECT COUNT() FROM system.parts WHERE table == 'q' and active == 1", "0")
|
||||
|
||||
# Insert to the second to satisfy quorum
|
||||
insert_value_to_node(node2, {"insert_quorum": "3", "insert_quorum_parallel": "1"})
|
||||
|
||||
res.get()
|
||||
|
||||
assert_eq_with_retry(node1, "SELECT COUNT() FROM q", "3")
|
||||
assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "1")
|
||||
assert_eq_with_retry(node3, "SELECT COUNT() FROM q", "3")
|
||||
|
||||
p.close()
|
||||
p.join()
|
||||
|
||||
node2.query("SYSTEM SYNC REPLICA q", timeout=10)
|
||||
assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "3")
|
@ -0,0 +1,10 @@
|
||||
100 0 99 4950
|
||||
100 0 99 4950
|
||||
100 0 99 4950
|
||||
100 0 99 4950
|
||||
100 0 99 4950
|
||||
100 0 99 4950
|
||||
100 0 99 4950
|
||||
100 0 99 4950
|
||||
100 0 99 4950
|
||||
100 0 99 4950
|
36
tests/queries/0_stateless/01509_check_many_parallel_quorum_inserts.sh
Executable file
36
tests/queries/0_stateless/01509_check_many_parallel_quorum_inserts.sh
Executable file
@ -0,0 +1,36 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
set -e
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
NUM_REPLICAS=10
|
||||
|
||||
for i in $(seq 1 $NUM_REPLICAS); do
|
||||
$CLICKHOUSE_CLIENT -n -q "
|
||||
DROP TABLE IF EXISTS r$i;
|
||||
CREATE TABLE r$i (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test_01509/parallel_quorum_many', 'r$i') ORDER BY x;
|
||||
"
|
||||
done
|
||||
|
||||
function thread {
|
||||
$CLICKHOUSE_CLIENT --insert_quorum 5 --insert_quorum_parallel 1 --query "INSERT INTO r$1 SELECT $2"
|
||||
}
|
||||
|
||||
for i in $(seq 1 $NUM_REPLICAS); do
|
||||
for j in {0..9}; do
|
||||
a=$((($i - 1) * 10 + $j))
|
||||
thread $i $a &
|
||||
done
|
||||
done
|
||||
|
||||
wait
|
||||
|
||||
for i in $(seq 1 $NUM_REPLICAS); do
|
||||
$CLICKHOUSE_CLIENT -n -q "
|
||||
SYSTEM SYNC REPLICA r$i;
|
||||
SELECT count(), min(x), max(x), sum(x) FROM r$i;
|
||||
DROP TABLE IF EXISTS r$i;
|
||||
"
|
||||
done
|
@ -0,0 +1,2 @@
|
||||
5 1 5 15
|
||||
5 1 5 15
|
37
tests/queries/0_stateless/01509_check_parallel_quorum_inserts.sh
Executable file
37
tests/queries/0_stateless/01509_check_parallel_quorum_inserts.sh
Executable file
@ -0,0 +1,37 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
set -e
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
NUM_REPLICAS=2
|
||||
NUM_INSERTS=5
|
||||
|
||||
for i in $(seq 1 $NUM_REPLICAS); do
|
||||
$CLICKHOUSE_CLIENT -n -q "
|
||||
DROP TABLE IF EXISTS r$i;
|
||||
CREATE TABLE r$i (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test_01509/parallel_quorum', 'r$i') ORDER BY x;
|
||||
"
|
||||
done
|
||||
|
||||
$CLICKHOUSE_CLIENT -n -q "SYSTEM STOP REPLICATION QUEUES r2;"
|
||||
|
||||
function thread {
|
||||
$CLICKHOUSE_CLIENT --insert_quorum 2 --insert_quorum_parallel 1 --query "INSERT INTO r1 SELECT $1"
|
||||
}
|
||||
|
||||
for i in $(seq 1 $NUM_INSERTS); do
|
||||
thread $i &
|
||||
done
|
||||
|
||||
$CLICKHOUSE_CLIENT -n -q "SYSTEM START REPLICATION QUEUES r2;"
|
||||
|
||||
wait
|
||||
|
||||
for i in $(seq 1 $NUM_REPLICAS); do
|
||||
$CLICKHOUSE_CLIENT -n -q "
|
||||
SELECT count(), min(x), max(x), sum(x) FROM r$i;
|
||||
DROP TABLE IF EXISTS r$i;
|
||||
"
|
||||
done
|
@ -0,0 +1,4 @@
|
||||
all_0_1_1
|
||||
DownloadPart
|
||||
2
|
||||
2
|
66
tests/queries/0_stateless/01509_parallel_quorum_and_merge.sh
Executable file
66
tests/queries/0_stateless/01509_parallel_quorum_and_merge.sh
Executable file
@ -0,0 +1,66 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
set -e
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parallel_q1"
|
||||
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parallel_q2"
|
||||
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "CREATE TABLE parallel_q1 (x UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/test_01509/parallel_q', 'r1') ORDER BY tuple() SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0"
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "CREATE TABLE parallel_q2 (x UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/test_01509/parallel_q', 'r2') ORDER BY tuple() SETTINGS always_fetch_merged_part = 1"
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "SYSTEM STOP REPLICATION QUEUES parallel_q2"
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "INSERT INTO parallel_q1 VALUES (1)"
|
||||
|
||||
$CLICKHOUSE_CLIENT --insert_quorum 2 --insert_quorum_parallel 1 --query="INSERT INTO parallel_q1 VALUES (2)" &
|
||||
|
||||
part_count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT() FROM system.parts WHERE table='parallel_q1' and database='${CLICKHOUSE_DATABASE}'")
|
||||
|
||||
# Check part inserted locally
|
||||
while [[ $part_count != 2 ]]
|
||||
do
|
||||
sleep 0.1
|
||||
part_count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT() FROM system.parts WHERE table='parallel_q1' and database='${CLICKHOUSE_DATABASE}'")
|
||||
done
|
||||
|
||||
$CLICKHOUSE_CLIENT --replication_alter_partitions_sync 0 -q "OPTIMIZE TABLE parallel_q1 FINAL"
|
||||
|
||||
# check part merged locally
|
||||
has_part=$($CLICKHOUSE_CLIENT --query="SELECT COUNT() FROM system.parts WHERE table='parallel_q1' and database='${CLICKHOUSE_DATABASE}' and name='all_0_1_1'")
|
||||
|
||||
while [[ $has_part != 1 ]]
|
||||
do
|
||||
sleep 0.1
|
||||
has_part=$($CLICKHOUSE_CLIENT --query="SELECT COUNT() FROM system.parts WHERE table='parallel_q1' and database='${CLICKHOUSE_DATABASE}' and name='all_0_1_1'")
|
||||
done
|
||||
|
||||
# check source parts removed locally
|
||||
active_parts_count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT() FROM system.parts WHERE table='parallel_q1' and database='${CLICKHOUSE_DATABASE}' and active=1")
|
||||
|
||||
while [[ $active_parts_count != 1 ]]
|
||||
do
|
||||
sleep 0.1
|
||||
active_parts_count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT() FROM system.parts WHERE table='parallel_q1' and database='${CLICKHOUSE_DATABASE}'")
|
||||
done
|
||||
|
||||
# download merged part
|
||||
$CLICKHOUSE_CLIENT -q "SYSTEM START REPLICATION QUEUES parallel_q2"
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA parallel_q2"
|
||||
|
||||
# quorum satisfied even for merged part
|
||||
wait
|
||||
|
||||
$CLICKHOUSE_CLIENT --query="SYSTEM FLUSH LOGS"
|
||||
$CLICKHOUSE_CLIENT --query="SELECT name FROM system.parts WHERE table='parallel_q2' and database='${CLICKHOUSE_DATABASE}' and active=1 ORDER BY name"
|
||||
$CLICKHOUSE_CLIENT --query="SELECT event_type FROM system.part_log WHERE table='parallel_q2' and database='${CLICKHOUSE_DATABASE}' and part_name='all_0_1_1'"
|
||||
$CLICKHOUSE_CLIENT --query="SELECT COUNT() FROM parallel_q2"
|
||||
$CLICKHOUSE_CLIENT --query="SELECT COUNT() FROM parallel_q1"
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parallel_q1"
|
||||
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parallel_q2"
|
@ -0,0 +1,12 @@
|
||||
insert to two replicas works
|
||||
1
|
||||
1
|
||||
insert to single replica works
|
||||
3
|
||||
3
|
||||
deduplication works
|
||||
3
|
||||
3
|
||||
insert happened
|
||||
4
|
||||
4
|
@ -0,0 +1,66 @@
|
||||
DROP TABLE IF EXISTS r1;
|
||||
DROP TABLE IF EXISTS r2;
|
||||
|
||||
CREATE TABLE r1 (
|
||||
key UInt64, value String
|
||||
)
|
||||
ENGINE = ReplicatedMergeTree('/clickhouse/01509_no_repliacs', '1')
|
||||
ORDER BY tuple();
|
||||
|
||||
CREATE TABLE r2 (
|
||||
key UInt64, value String
|
||||
)
|
||||
ENGINE = ReplicatedMergeTree('/clickhouse/01509_no_repliacs', '2')
|
||||
ORDER BY tuple();
|
||||
|
||||
SET insert_quorum_parallel=1;
|
||||
|
||||
SET insert_quorum=3;
|
||||
INSERT INTO r1 VALUES(1, '1'); --{serverError 285}
|
||||
|
||||
SELECT 'insert to two replicas works';
|
||||
SET insert_quorum=2, insert_quorum_parallel=1;
|
||||
INSERT INTO r1 VALUES(1, '1');
|
||||
|
||||
SELECT COUNT() FROM r1;
|
||||
SELECT COUNT() FROM r2;
|
||||
|
||||
DETACH TABLE r2;
|
||||
|
||||
INSERT INTO r1 VALUES(2, '2'); --{serverError 285}
|
||||
|
||||
SET insert_quorum=1, insert_quorum_parallel=1;
|
||||
SELECT 'insert to single replica works';
|
||||
INSERT INTO r1 VALUES(2, '2');
|
||||
|
||||
ATTACH TABLE r2;
|
||||
|
||||
SET insert_quorum=2, insert_quorum_parallel=1;
|
||||
|
||||
INSERT INTO r1 VALUES(3, '3');
|
||||
|
||||
SELECT COUNT() FROM r1;
|
||||
SELECT COUNT() FROM r2;
|
||||
|
||||
SELECT 'deduplication works';
|
||||
INSERT INTO r2 VALUES(3, '3');
|
||||
|
||||
SELECT COUNT() FROM r1;
|
||||
SELECT COUNT() FROM r2;
|
||||
|
||||
SYSTEM STOP FETCHES r2;
|
||||
|
||||
SET insert_quorum_timeout=0;
|
||||
|
||||
INSERT INTO r1 VALUES (4, '4'); -- { serverError 319 }
|
||||
|
||||
SYSTEM START FETCHES r2;
|
||||
|
||||
SYSTEM SYNC REPLICA r2;
|
||||
|
||||
SELECT 'insert happened';
|
||||
SELECT COUNT() FROM r1;
|
||||
SELECT COUNT() FROM r2;
|
||||
|
||||
DROP TABLE IF EXISTS r1;
|
||||
DROP TABLE IF EXISTS r2;
|
Loading…
Reference in New Issue
Block a user