mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-11 17:02:25 +00:00
Merge pull request #44628 from ClickHouse/CurtizJ-fix-restart-after-quorum-insert
Fix restart after quorum insert
This commit is contained in:
commit
b1ed48a977
@ -576,6 +576,9 @@ void IMergeTreeDataPart::assertState(const std::initializer_list<MergeTreeDataPa
|
|||||||
states_str += ' ';
|
states_str += ' ';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!states_str.empty())
|
||||||
|
states_str.pop_back();
|
||||||
|
|
||||||
throw Exception("Unexpected state of part " + getNameWithState() + ". Expected: " + states_str, ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);
|
throw Exception("Unexpected state of part " + getNameWithState() + ". Expected: " + states_str, ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5424,6 +5424,7 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(MergeTreeData:
|
|||||||
part->getDataPartStorage().commitTransaction();
|
part->getDataPartStorage().commitTransaction();
|
||||||
|
|
||||||
if (txn)
|
if (txn)
|
||||||
|
{
|
||||||
for (const auto & part : precommitted_parts)
|
for (const auto & part : precommitted_parts)
|
||||||
{
|
{
|
||||||
DataPartPtr covering_part;
|
DataPartPtr covering_part;
|
||||||
@ -5445,6 +5446,7 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(MergeTreeData:
|
|||||||
|
|
||||||
MergeTreeTransaction::addNewPartAndRemoveCovered(data.shared_from_this(), part, covered_parts, txn);
|
MergeTreeTransaction::addNewPartAndRemoveCovered(data.shared_from_this(), part, covered_parts, txn);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
MergeTreeData::WriteAheadLogPtr wal;
|
MergeTreeData::WriteAheadLogPtr wal;
|
||||||
auto get_inited_wal = [&] ()
|
auto get_inited_wal = [&] ()
|
||||||
|
@ -539,7 +539,7 @@ std::vector<String> ReplicatedMergeTreeSinkImpl<async_insert>::commitPart(
|
|||||||
///
|
///
|
||||||
/// metadata_snapshot->check(part->getColumns());
|
/// metadata_snapshot->check(part->getColumns());
|
||||||
|
|
||||||
String temporary_part_relative_path = part->getDataPartStorage().getPartDirectory();
|
const String temporary_part_relative_path = part->getDataPartStorage().getPartDirectory();
|
||||||
|
|
||||||
/// There is one case when we need to retry transaction in a loop.
|
/// There is one case when we need to retry transaction in a loop.
|
||||||
/// But don't do it too many times - just as defensive measure.
|
/// But don't do it too many times - just as defensive measure.
|
||||||
@ -820,6 +820,14 @@ std::vector<String> ReplicatedMergeTreeSinkImpl<async_insert>::commitPart(
|
|||||||
part->name);
|
part->name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto rename_part_to_temporary = [&temporary_part_relative_path, &transaction, &part]()
|
||||||
|
{
|
||||||
|
transaction.rollbackPartsToTemporaryState();
|
||||||
|
|
||||||
|
part->is_temp = true;
|
||||||
|
part->renameTo(temporary_part_relative_path, false);
|
||||||
|
};
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
ThreadFuzzer::maybeInjectSleep();
|
ThreadFuzzer::maybeInjectSleep();
|
||||||
@ -828,11 +836,7 @@ std::vector<String> ReplicatedMergeTreeSinkImpl<async_insert>::commitPart(
|
|||||||
}
|
}
|
||||||
catch (const Exception &)
|
catch (const Exception &)
|
||||||
{
|
{
|
||||||
transaction.rollbackPartsToTemporaryState();
|
rename_part_to_temporary();
|
||||||
|
|
||||||
part->is_temp = true;
|
|
||||||
part->renameTo(temporary_part_relative_path, false);
|
|
||||||
|
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -906,10 +910,7 @@ std::vector<String> ReplicatedMergeTreeSinkImpl<async_insert>::commitPart(
|
|||||||
|
|
||||||
/// We will try to add this part again on the new iteration as it's just a new part.
|
/// We will try to add this part again on the new iteration as it's just a new part.
|
||||||
/// So remove it from storage parts set immediately and transfer state to temporary.
|
/// So remove it from storage parts set immediately and transfer state to temporary.
|
||||||
transaction.rollbackPartsToTemporaryState();
|
rename_part_to_temporary();
|
||||||
|
|
||||||
part->is_temp = true;
|
|
||||||
part->renameTo(temporary_part_relative_path, false);
|
|
||||||
|
|
||||||
if constexpr (async_insert)
|
if constexpr (async_insert)
|
||||||
{
|
{
|
||||||
@ -930,9 +931,21 @@ std::vector<String> ReplicatedMergeTreeSinkImpl<async_insert>::commitPart(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
else if (multi_code == Coordination::Error::ZNODEEXISTS && failed_op_path == quorum_info.status_path)
|
else if (multi_code == Coordination::Error::ZNODEEXISTS && failed_op_path == quorum_info.status_path)
|
||||||
|
{
|
||||||
|
try
|
||||||
{
|
{
|
||||||
storage.unlockSharedData(*part, zookeeper);
|
storage.unlockSharedData(*part, zookeeper);
|
||||||
transaction.rollback();
|
}
|
||||||
|
catch (const zkutil::KeeperException & e)
|
||||||
|
{
|
||||||
|
/// suppress this exception since need to rename part to temporary next
|
||||||
|
LOG_DEBUG(log, "Unlocking shared data failed during error handling: code={} message={}", e.code, e.message());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Part was not committed to keeper
|
||||||
|
/// So make it temporary to avoid its resurrection on restart
|
||||||
|
rename_part_to_temporary();
|
||||||
|
|
||||||
throw Exception("Another quorum insert has been already started", ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
|
throw Exception("Another quorum insert has been already started", ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -0,0 +1,6 @@
|
|||||||
|
10 0 9 45
|
||||||
|
10 0 9 45
|
||||||
|
10 0 9 45
|
||||||
|
10 0 9 45
|
||||||
|
10 0 9 45
|
||||||
|
10 0 9 45
|
@ -0,0 +1,46 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
# Tags: replica, no-replicated-database, no-parallel
|
||||||
|
# Tag no-replicated-database: Fails due to additional replicas or shards
|
||||||
|
|
||||||
|
set -e
|
||||||
|
|
||||||
|
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
|
# shellcheck source=../shell_config.sh
|
||||||
|
. "$CURDIR"/../shell_config.sh
|
||||||
|
|
||||||
|
NUM_REPLICAS=6
|
||||||
|
|
||||||
|
for i in $(seq 1 $NUM_REPLICAS); do
|
||||||
|
$CLICKHOUSE_CLIENT -n -q "
|
||||||
|
DROP TABLE IF EXISTS r$i SYNC;
|
||||||
|
CREATE TABLE r$i (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/r', 'r$i') ORDER BY x;
|
||||||
|
"
|
||||||
|
done
|
||||||
|
|
||||||
|
valid_exceptions_to_retry='Quorum for previous write has not been satisfied yet|Another quorum insert has been already started|Unexpected logical error while adding block'
|
||||||
|
|
||||||
|
function thread {
|
||||||
|
for x in {0..9}; do
|
||||||
|
while true; do
|
||||||
|
$CLICKHOUSE_CLIENT --query "DETACH TABLE r$1"
|
||||||
|
$CLICKHOUSE_CLIENT --query "ATTACH TABLE r$1"
|
||||||
|
$CLICKHOUSE_CLIENT --insert_quorum 3 --insert_quorum_parallel 0 --insert_keeper_fault_injection_probability=0 --query "INSERT INTO r$1 SELECT $x" 2>&1 | grep -qE "$valid_exceptions_to_retry" || break
|
||||||
|
done
|
||||||
|
done
|
||||||
|
}
|
||||||
|
|
||||||
|
for i in $(seq 1 $NUM_REPLICAS); do
|
||||||
|
thread $i &
|
||||||
|
done
|
||||||
|
|
||||||
|
wait
|
||||||
|
|
||||||
|
for i in $(seq 1 $NUM_REPLICAS); do
|
||||||
|
$CLICKHOUSE_CLIENT -n -q "
|
||||||
|
SYSTEM SYNC REPLICA r$i;
|
||||||
|
SELECT count(), min(x), max(x), sum(x) FROM r$i;"
|
||||||
|
done
|
||||||
|
|
||||||
|
for i in $(seq 1 $NUM_REPLICAS); do
|
||||||
|
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS r$i SYNC;"
|
||||||
|
done
|
Loading…
Reference in New Issue
Block a user