Forward exception to client during alter

This commit is contained in:
alesapin 2020-07-22 15:36:19 +03:00
parent 3810a21451
commit de846e5e3c
10 changed files with 211 additions and 18 deletions

View File

@ -1,12 +1,15 @@
#pragma once
#include <Core/Types.h>
#include <Core/Names.h>
#include <optional>
#include <map>
namespace DB
{
struct MergeTreeMutationStatus
{
String id;
@ -25,4 +28,9 @@ struct MergeTreeMutationStatus
String latest_fail_reason;
};
/// Check mutation status and throw exception in case of error during mutation
/// (latest_fail_reason not empty) or if mutation was killed (status empty
/// optional). mutation_id passed separately, because status may be empty.
void checkMutationStatus(std::optional<MergeTreeMutationStatus> & status, const String & mutation_id);
}

View File

@ -1538,6 +1538,26 @@ void ReplicatedMergeTreeQueue::getInsertTimes(time_t & out_min_unprocessed_inser
}
std::optional<MergeTreeMutationStatus> ReplicatedMergeTreeQueue::getIncompleteMutationStatus(const String & znode_name) const
{
std::lock_guard lock(state_mutex);
auto it = mutations_by_znode.find(znode_name);
/// killed
if (it == mutations_by_znode.end())
return {};
const MutationStatus & status = it->second;
MergeTreeMutationStatus result
{
.is_done = status.is_done,
.latest_failed_part = status.latest_failed_part,
.latest_fail_time = status.latest_fail_time,
.latest_fail_reason = status.latest_fail_reason,
};
return result;
}
std::vector<MergeTreeMutationStatus> ReplicatedMergeTreeQueue::getMutationsStatus() const
{
std::lock_guard lock(state_mutex);

View File

@ -399,6 +399,12 @@ public:
/// Get information about the insertion times.
void getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const;
/// Return empty optional if mutation was killed. Otherwise return partially
/// filled mutation status with information about error (latest_fail*) and
/// is_done.
std::optional<MergeTreeMutationStatus> getIncompleteMutationStatus(const String & znode_name) const;
std::vector<MergeTreeMutationStatus> getMutationsStatus() const;
void removeCurrentPartsFromMutations();

View File

@ -40,6 +40,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int INCORRECT_DATA;
extern const int CANNOT_ASSIGN_OPTIMIZE;
extern const int UNFINISHED;
}
namespace ActionLocks
@ -134,7 +135,7 @@ void StorageMergeTree::shutdown()
/// Unlock all waiting mutations
{
std::lock_guard<std::mutex> lock(mutation_wait_mutex);
std::lock_guard lock(mutation_wait_mutex);
mutation_wait_event.notify_all();
}
@ -370,6 +371,10 @@ public:
entry.latest_failed_part_info = future_part.parts.at(0)->info;
entry.latest_fail_time = time(nullptr);
entry.latest_fail_reason = exception_message;
{
std::lock_guard lock_mutation_wait(storage.mutation_wait_mutex);
storage.mutation_wait_event.notify_all();
}
}
}
@ -401,9 +406,22 @@ Int64 StorageMergeTree::startMutation(const MutationCommands & commands, String
void StorageMergeTree::waitForMutation(Int64 version, const String & file_name)
{
LOG_INFO(log, "Waiting mutation: {}", file_name);
auto check = [version, this]() { return shutdown_called || isMutationDone(version); };
{
auto check = [version, this]()
{
if (shutdown_called)
return true;
auto mutation_status = getIncompleteMutationStatus(version);
return !mutation_status || mutation_status->is_done || !mutation_status->latest_fail_reason.empty();
};
std::unique_lock lock(mutation_wait_mutex);
mutation_wait_event.wait(lock, check);
}
auto mutation_status = getIncompleteMutationStatus(version);
checkMutationStatus(mutation_status, file_name);
LOG_INFO(log, "Mutation {} done", file_name);
}
@ -432,20 +450,37 @@ bool comparator(const PartVersionWithName & f, const PartVersionWithName & s)
}
bool StorageMergeTree::isMutationDone(Int64 mutation_version) const
std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationStatus(Int64 mutation_version) const
{
std::lock_guard lock(currently_processing_in_background_mutex);
auto it = current_mutations_by_version.find(mutation_version);
/// Killed
if (!current_mutations_by_version.count(mutation_version))
return true;
if (it == current_mutations_by_version.end())
return {};
MergeTreeMutationStatus result{.is_done = false};
const auto & mutation_entry = it->second;
auto data_parts = getDataPartsVector();
for (const auto & data_part : data_parts)
{
if (data_part->info.getDataVersion() < mutation_version)
return false;
return true;
{
if (!mutation_entry.latest_fail_reason.empty())
{
result.latest_failed_part = mutation_entry.latest_failed_part;
result.latest_fail_reason = mutation_entry.latest_fail_reason;
result.latest_fail_time = mutation_entry.latest_fail_time;
}
return result;
}
}
result.is_done = true;
return result;
}
@ -474,7 +509,6 @@ std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() cons
std::vector<MergeTreeMutationStatus> result;
for (const auto & kv : current_mutations_by_version)
{
Int64 mutation_version = kv.first;
const MergeTreeMutationEntry & entry = kv.second;
const PartVersionWithName needle{mutation_version, ""};
@ -500,7 +534,7 @@ std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() cons
entry.create_time,
block_numbers_map,
parts_to_do_names,
parts_to_do_names.empty(),
/* is_done = */parts_to_do_names.empty(),
entry.latest_failed_part,
entry.latest_fail_time,
entry.latest_fail_reason,
@ -833,7 +867,7 @@ bool StorageMergeTree::tryMutatePart()
/// Notify all, who wait for this or previous mutations
{
std::lock_guard<std::mutex> lock(mutation_wait_mutex);
std::lock_guard lock(mutation_wait_mutex);
mutation_wait_event.notify_all();
}
}

View File

@ -154,8 +154,10 @@ private:
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context);
bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;
/// Just checks versions of each active data part
bool isMutationDone(Int64 mutation_version) const;
/// Return empty optional if mutation was killed. Otherwise return partially
/// filled mutation status with information about error (latest_fail*) and
/// is_done.
std::optional<MergeTreeMutationStatus> getIncompleteMutationStatus(Int64 mutation_version) const;
void startBackgroundMovesIfNeeded() override;

View File

@ -348,7 +348,6 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
std::set<String> inactive_replicas;
for (const String & replica : replicas)
{
LOG_DEBUG(log, "Waiting for {} to apply mutation {}", replica, mutation_id);
while (!partial_shutdown_called)
@ -358,8 +357,7 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
Coordination::Stat exists_stat;
if (!getZooKeeper()->exists(zookeeper_path + "/mutations/" + mutation_id, &exists_stat, wait_event))
{
LOG_WARNING(log, "Mutation {} was killed or manually removed. Nothing to wait.", mutation_id);
return;
throw Exception(ErrorCodes::UNFINISHED, "Mutation {} was killed, manually removed or table was dropped", mutation_id);
}
auto zookeeper = getZooKeeper();
@ -387,8 +385,22 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
/// Replica can become inactive, so wait with timeout and recheck it
if (wait_event->tryWait(1000))
break;
auto mutation_status = queue.getIncompleteMutationStatus(mutation_id);
if (!mutation_status || !mutation_status->latest_fail_reason.empty())
break;
}
/// It maybe already removed from zk, but local in-memory mutations
/// state was not update.
if (!getZooKeeper()->exists(zookeeper_path + "/mutations/" + mutation_id))
{
throw Exception(ErrorCodes::UNFINISHED, "Mutation {} was killed, manually removed or table was dropped", mutation_id);
}
auto mutation_status = queue.getIncompleteMutationStatus(mutation_id);
checkMutationStatus(mutation_status, mutation_id);
if (partial_shutdown_called)
throw Exception("Mutation is not finished because table shutdown was called. It will be done after table restart.",
ErrorCodes::UNFINISHED);

View File

@ -0,0 +1,4 @@
42
Hello
42
Hello

View File

@ -0,0 +1,29 @@
DROP TABLE IF EXISTS mutation_table;
CREATE TABLE mutation_table
(
date Date,
key UInt64,
value String
)
ENGINE = MergeTree()
PARTITION BY date
ORDER BY tuple();
INSERT INTO mutation_table SELECT toDate('2019-10-01'), number, '42' FROM numbers(100);
INSERT INTO mutation_table SELECT toDate('2019-10-02'), number, 'Hello' FROM numbers(100);
SELECT distinct(value) FROM mutation_table ORDER BY value;
ALTER TABLE mutation_table MODIFY COLUMN value UInt64 SETTINGS mutations_sync = 2; --{serverError 341}
SELECT distinct(value) FROM mutation_table ORDER BY value; --{serverError 6}
KILL MUTATION where table = 'mutation_table' and database = currentDatabase();
ALTER TABLE mutation_table MODIFY COLUMN value String SETTINGS mutations_sync = 2;
SELECT distinct(value) FROM mutation_table ORDER BY value;
DROP TABLE IF EXISTS mutation_table;

View File

@ -0,0 +1,5 @@
Mutation 0000000000 was killed
Cannot parse string 'Hello' as UInt64
Cannot parse string 'Hello' as UInt64
42
Hello

View File

@ -0,0 +1,73 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS replicated_mutation_table"
$CLICKHOUSE_CLIENT --query "
CREATE TABLE replicated_mutation_table(
date Date,
key UInt64,
value String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/mutation_table', '1')
ORDER BY tuple()
PARTITION BY date
"
$CLICKHOUSE_CLIENT --query "INSERT INTO replicated_mutation_table SELECT toDate('2019-10-02'), number, '42' FROM numbers(4)"
$CLICKHOUSE_CLIENT --query "INSERT INTO replicated_mutation_table SELECT toDate('2019-10-02'), number, 'Hello' FROM numbers(4)"
$CLICKHOUSE_CLIENT --query "ALTER TABLE replicated_mutation_table UPDATE key = key + 1 WHERE sleepEachRow(1) == 0 SETTINGS mutations_sync = 2" 2>&1 | grep -o 'Mutation 0000000000 was killed' | head -n 1 &
check_query="SELECT count() FROM system.mutations WHERE table='replicated_mutation_table' and database='$CLICKHOUSE_DATABASE' and mutation_id='0000000000'"
query_result=`$CLICKHOUSE_CLIENT --query="$check_query" 2>&1`
while [ "$query_result" != "1" ]
do
query_result=`$CLICKHOUSE_CLIENT --query="$check_query" 2>&1`
sleep 0.5
done
$CLICKHOUSE_CLIENT --query "KILL MUTATION WHERE table='replicated_mutation_table' and database='$CLICKHOUSE_DATABASE' and mutation_id='0000000000'" &> /dev/null
while [ "$query_result" != "0" ]
do
query_result=`$CLICKHOUSE_CLIENT --query="$check_query" 2>&1`
sleep 0.5
done
wait
$CLICKHOUSE_CLIENT --query "ALTER TABLE replicated_mutation_table MODIFY COLUMN value UInt64 SETTINGS replication_alter_partitions_sync = 2" 2>&1 | grep -o "Cannot parse string 'Hello' as UInt64" | head -n 1 &
check_query="SELECT count() FROM system.mutations WHERE table='replicated_mutation_table' and database='$CLICKHOUSE_DATABASE' and mutation_id='0000000001'"
query_result=`$CLICKHOUSE_CLIENT --query="$check_query" 2>&1`
while [ "$query_result" != "1" ]
do
query_result=`$CLICKHOUSE_CLIENT --query="$check_query" 2>&1`
sleep 0.5
done
wait
$CLICKHOUSE_CLIENT --query "KILL MUTATION WHERE table='replicated_mutation_table' and database='$CLICKHOUSE_DATABASE' AND mutation_id='0000000001'" &> /dev/null
while [ "$query_result" != "0" ]
do
query_result=`$CLICKHOUSE_CLIENT --query="$check_query" 2>&1`
sleep 0.5
done
$CLICKHOUSE_CLIENT --query "SELECT distinct(value) FROM replicated_mutation_table ORDER BY value" 2>&1 | grep -o "Cannot parse string 'Hello' as UInt64" | head -n 1
$CLICKHOUSE_CLIENT --query "ALTER TABLE replicated_mutation_table MODIFY COLUMN value String SETTINGS replication_alter_partitions_sync = 2"
$CLICKHOUSE_CLIENT --query "SELECT distinct(value) FROM replicated_mutation_table ORDER BY value"
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS replicated_mutation_table"