Slightly better code

This commit is contained in:
alesapin 2020-02-17 21:07:22 +03:00
parent 382f6ab720
commit 38450ccb6e
6 changed files with 162 additions and 120 deletions

View File

@ -0,0 +1,84 @@
#include <Storages/MergeTree/ReplicatedMergeTreeAltersSequence.h>
#include <cassert>
namespace DB
{
int ReplicatedMergeTreeAltersSequence::getHeadAlterVersion(std::lock_guard<std::mutex> & /*state_lock*/) const
{
/// If queue empty, than we don't have version
if (!queue_state.empty())
return queue_state.begin()->first;
return -1;
}
void ReplicatedMergeTreeAltersSequence::addMutationForAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/)
{
/// Metadata alter can be added before, or
/// maybe already finished if we startup after metadata alter was finished.
if (!queue_state.count(alter_version))
queue_state.emplace(alter_version, AlterState{.metadata_finished=true, .data_finished=false});
else
queue_state[alter_version].data_finished = false;
}
void ReplicatedMergeTreeAltersSequence::addMetadataAlter(
int alter_version, bool have_mutation, std::lock_guard<std::mutex> & /*state_lock*/)
{
if (!queue_state.count(alter_version))
queue_state.emplace(alter_version, AlterState{.metadata_finished=false, .data_finished=!have_mutation});
else /// Data alter can be added before.
queue_state[alter_version].metadata_finished = false;
}
void ReplicatedMergeTreeAltersSequence::finishMetadataAlter(int alter_version, std::unique_lock<std::mutex> & /*state_lock*/)
{
assert(!queue_state.empty());
assert(queue_state.begin()->first == alter_version);
/// If metadata stage finished (or was never added) than we can remove this alter
if (queue_state[alter_version].data_finished)
queue_state.erase(alter_version);
else
queue_state[alter_version].metadata_finished = true;
}
void ReplicatedMergeTreeAltersSequence::finishDataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/)
{
/// queue can be empty after load of finished mutation without move of mutation pointer
if (queue_state.empty())
return;
/// Mutations may finish multiple times (for example, after server restart, before update of mutation pointer)
if (alter_version >= queue_state.begin()->first)
{
/// All alter versions bigger than head have to be present in queue.
assert(queue_state.count(alter_version));
if (queue_state[alter_version].metadata_finished)
queue_state.erase(alter_version);
else
queue_state[alter_version].data_finished = true;
}
}
bool ReplicatedMergeTreeAltersSequence::canExecuteDataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const
{
if (queue_state.empty())
return true;
/// All versions smaller than head, can be executed
if (alter_version < queue_state.begin()->first)
return true;
return queue_state.at(alter_version).metadata_finished;
}
bool ReplicatedMergeTreeAltersSequence::canExecuteMetaAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const
{
if (queue_state.empty())
return true;
/// We can execute only alters of metadata which are in head.
return queue_state.begin()->first == alter_version;
}
}

View File

@ -0,0 +1,59 @@
#pragma once
#include <deque>
#include <mutex>
#include <map>
namespace DB
{
/// ALTERs in StorageReplicatedMergeTree have to be executed sequentially (one by one).
/// But ReplicatedMergeTreeQueue execute all entries almost concurrently. The only depency between
/// entries is data parts, but they are not suitable in alters case.
///
/// This class stores information about current alters in ReplicatedMergeTreeQueue, and control their order of execution.
/// All methods have to be called under ReplicatedMergeTreeQueue state lock.
class ReplicatedMergeTreeAltersSequence
{
private:
/// In general case alter consist of two stages
/// Alter data and alter metadata. First we alter storage metadata
/// and then we can apply corresponding data changes (MUTATE_PART).
/// After that, we can remove alter from this sequence (alter is processed).
struct AlterState
{
bool metadata_finished = false;
bool data_finished = false;
};
private:
/// alter_version -> AlterState.
std::map<int, AlterState> queue_state;
public:
/// Add mutation for alter (alter data stage).
void addMutationForAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/);
/// Add metadata for alter (alter metadata stage). If have_mutation=true, than we expect, that
/// corresponding mutation will be added.
void addMetadataAlter(int alter_version, bool have_mutation, std::lock_guard<std::mutex> & /*state_lock*/);
/// Finish metadata alter. If corresponding data alter finished, than we can remove
/// alter from sequence.
void finishMetadataAlter(int alter_version, std::unique_lock <std::mutex> & /*state_lock*/);
/// Finish data alter. If corresponding metadata alter finished, than we can remove
/// alter from sequence.
void finishDataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/);
/// Check that we can execute this data alter. If it's metadata stage finished.
bool canExecuteDataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const;
/// Check that we can execute metadata alter with version.
bool canExecuteMetaAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const;
/// Just returns smallest alter version in sequence (first entry)
int getHeadAlterVersion(std::lock_guard<std::mutex> & /*state_lock*/) const;
};
}

View File

@ -17,7 +17,7 @@ class WriteBuffer;
/// in patitions. We will mutatate all parts with left number less than this numbers. /// in patitions. We will mutatate all parts with left number less than this numbers.
/// ///
/// These entries processed separately from main replication /log, and produce other entries /// These entries processed separately from main replication /log, and produce other entries
/// -- MUTATE_PART. /// -- MUTATE_PART in main replication log.
struct ReplicatedMergeTreeMutationEntry struct ReplicatedMergeTreeMutationEntry
{ {
void writeText(WriteBuffer & out) const; void writeText(WriteBuffer & out) const;

View File

@ -152,7 +152,7 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
if (entry->type == LogEntry::ALTER_METADATA) if (entry->type == LogEntry::ALTER_METADATA)
{ {
LOG_TRACE(log, "Adding alter metadata version " << entry->alter_version << " to the queue"); LOG_TRACE(log, "Adding alter metadata version " << entry->alter_version << " to the queue");
alter_chain.addMetadataAlter(entry->alter_version, entry->have_mutation, state_lock); alter_sequence.addMetadataAlter(entry->alter_version, entry->have_mutation, state_lock);
} }
} }
@ -231,7 +231,7 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
if (entry->type == LogEntry::ALTER_METADATA) if (entry->type == LogEntry::ALTER_METADATA)
{ {
LOG_TRACE(log, "Finishing metadata alter with version " << entry->alter_version); LOG_TRACE(log, "Finishing metadata alter with version " << entry->alter_version);
alter_chain.finishMetadataAlter(entry->alter_version, state_lock); alter_sequence.finishMetadataAlter(entry->alter_version, state_lock);
} }
} }
else else
@ -701,7 +701,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
if (entry->isAlterMutation() && entry->znode_name > mutation_pointer) if (entry->isAlterMutation() && entry->znode_name > mutation_pointer)
{ {
LOG_TRACE(log, "Adding mutation " << entry->znode_name << " with alter version " << entry->alter_version << " to the queue"); LOG_TRACE(log, "Adding mutation " << entry->znode_name << " with alter version " << entry->alter_version << " to the queue");
alter_chain.addMutationForAlter(entry->alter_version, state_lock); alter_sequence.addMutationForAlter(entry->alter_version, state_lock);
} }
} }
} }
@ -746,7 +746,7 @@ ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation(
if (entry->isAlterMutation()) if (entry->isAlterMutation())
{ {
LOG_DEBUG(log, "Removed alter " << entry->alter_version << " because mutation " + entry->znode_name + " were killed."); LOG_DEBUG(log, "Removed alter " << entry->alter_version << " because mutation " + entry->znode_name + " were killed.");
alter_chain.finishDataAlter(entry->alter_version, state_lock); alter_sequence.finishDataAlter(entry->alter_version, state_lock);
} }
mutations_by_znode.erase(it); mutations_by_znode.erase(it);
@ -1059,9 +1059,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
/// corresponding alter_version. /// corresponding alter_version.
if (entry.type == LogEntry::ALTER_METADATA) if (entry.type == LogEntry::ALTER_METADATA)
{ {
if (!alter_chain.canExecuteMetaAlter(entry.alter_version, state_lock)) if (!alter_sequence.canExecuteMetaAlter(entry.alter_version, state_lock))
{ {
int head_alter = alter_chain.getHeadAlterVersion(state_lock); int head_alter = alter_sequence.getHeadAlterVersion(state_lock);
out_postpone_reason = "Cannot execute alter metadata with version: " + std::to_string(entry.alter_version) out_postpone_reason = "Cannot execute alter metadata with version: " + std::to_string(entry.alter_version)
+ " because another alter " + std::to_string(head_alter) + " because another alter " + std::to_string(head_alter)
+ " must be executed before"; + " must be executed before";
@ -1072,9 +1072,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
/// If this MUTATE_PART is part of alter modify/drop query, than we have to execute them one by one /// If this MUTATE_PART is part of alter modify/drop query, than we have to execute them one by one
if (entry.isAlterMutation()) if (entry.isAlterMutation())
{ {
if (!alter_chain.canExecuteDataAlter(entry.alter_version, state_lock)) if (!alter_sequence.canExecuteDataAlter(entry.alter_version, state_lock))
{ {
int head_alter = alter_chain.getHeadAlterVersion(state_lock); int head_alter = alter_sequence.getHeadAlterVersion(state_lock);
if (head_alter == entry.alter_version) if (head_alter == entry.alter_version)
out_postpone_reason = "Cannot execute alter data with version: " out_postpone_reason = "Cannot execute alter data with version: "
+ std::to_string(entry.alter_version) + " because metadata still not altered"; + std::to_string(entry.alter_version) + " because metadata still not altered";
@ -1182,7 +1182,8 @@ ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToP
if (shouldExecuteLogEntry(**it, (*it)->postpone_reason, merger_mutator, data, lock)) if (shouldExecuteLogEntry(**it, (*it)->postpone_reason, merger_mutator, data, lock))
{ {
entry = *it; entry = *it;
/// We gave a chance for the entry, move it to the tail of the queue /// We gave a chance for the entry, move it to the tail of the queue, after that
/// we move it to the end of the queue.
queue.splice(queue.end(), queue, it); queue.splice(queue.end(), queue, it);
break; break;
} }
@ -1209,6 +1210,8 @@ bool ReplicatedMergeTreeQueue::processEntry(
try try
{ {
/// We don't have any backoff for failed entries
/// we just count amount of tries for each ot them.
if (func(entry)) if (func(entry))
removeProcessedEntry(get_zookeeper(), entry); removeProcessedEntry(get_zookeeper(), entry);
} }
@ -1363,7 +1366,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
{ {
LOG_TRACE(log, "Marking mutation " << znode << " done because it is <= mutation_pointer (" << mutation_pointer << ")"); LOG_TRACE(log, "Marking mutation " << znode << " done because it is <= mutation_pointer (" << mutation_pointer << ")");
mutation.is_done = true; mutation.is_done = true;
alter_chain.finishDataAlter(mutation.entry->alter_version, lock); alter_sequence.finishDataAlter(mutation.entry->alter_version, lock);
if (mutation.parts_to_do.size() != 0) if (mutation.parts_to_do.size() != 0)
{ {
LOG_INFO(log, "Seems like we jumped over mutation " << znode << " when downloaded part with bigger mutation number." LOG_INFO(log, "Seems like we jumped over mutation " << znode << " when downloaded part with bigger mutation number."
@ -1409,7 +1412,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
if (entry->isAlterMutation()) if (entry->isAlterMutation())
{ {
LOG_TRACE(log, "Finishing data alter with version " << entry->alter_version << " for entry " << entry->znode_name); LOG_TRACE(log, "Finishing data alter with version " << entry->alter_version << " for entry " << entry->znode_name);
alter_chain.finishDataAlter(entry->alter_version, lock); alter_sequence.finishDataAlter(entry->alter_version, lock);
} }
} }
} }

View File

@ -9,7 +9,7 @@
#include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h> #include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumAddedParts.h> #include <Storages/MergeTree/ReplicatedMergeTreeQuorumAddedParts.h>
#include <Storages/MergeTree/ReplicatedQueueAlterChain.h> #include <Storages/MergeTree/ReplicatedMergeTreeAltersSequence.h>
#include <Common/ZooKeeper/ZooKeeper.h> #include <Common/ZooKeeper/ZooKeeper.h>
#include <Core/BackgroundSchedulePool.h> #include <Core/BackgroundSchedulePool.h>
@ -132,7 +132,9 @@ private:
/// Provides only one simultaneous call to pullLogsToQueue. /// Provides only one simultaneous call to pullLogsToQueue.
std::mutex pull_logs_to_queue_mutex; std::mutex pull_logs_to_queue_mutex;
ReplicatedQueueAlterChain alter_chain; /// This sequence control ALTERs execution in replication queue.
/// We need it because alters have to be executed sequentially (one by one).
ReplicatedMergeTreeAltersSequence alter_sequence;
/// List of subscribers /// List of subscribers
/// A subscriber callback is called when an entry queue is deleted /// A subscriber callback is called when an entry queue is deleted

View File

@ -1,106 +0,0 @@
#pragma once
#include <deque>
#include <Common/Exception.h>
#include <IO/ReadHelpers.h>
#include <common/logger_useful.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class ReplicatedQueueAlterChain
{
private:
struct AlterState
{
bool metadata_finished = false;
bool data_finished = false;
AlterState() = default;
AlterState(bool metadata_finished_, bool data_finished_)
: metadata_finished(metadata_finished_)
, data_finished(data_finished_)
{
}
};
private:
std::map<int, AlterState> queue_state;
public:
int getHeadAlterVersion(std::lock_guard<std::mutex> & /*state_lock*/) const
{
if (!queue_state.empty())
return queue_state.begin()->first;
return -1;
}
void addMutationForAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/)
{
if (!queue_state.count(alter_version))
queue_state.emplace(alter_version, AlterState{true, false});
else
queue_state[alter_version].data_finished = false;
}
void addMetadataAlter(int alter_version, bool have_mutation, std::lock_guard<std::mutex> & /*state_lock*/)
{
if (!queue_state.count(alter_version))
queue_state.emplace(alter_version, AlterState{false, !have_mutation});
else
queue_state[alter_version].metadata_finished = false;
}
void finishMetadataAlter(int alter_version, std::unique_lock <std::mutex> & /*state_lock*/)
{
assert(!queue_state.empty());
assert(queue_state.begin()->first == alter_version);
if (queue_state[alter_version].data_finished)
queue_state.erase(alter_version);
else
queue_state[alter_version].metadata_finished = true;
}
void finishDataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/)
{
/// queue can be empty after load of finished mutation without move of mutation pointer
if (queue_state.empty())
return;
if (alter_version >= queue_state.begin()->first)
{
assert(queue_state.count(alter_version));
if (queue_state[alter_version].metadata_finished)
queue_state.erase(alter_version);
else
queue_state[alter_version].data_finished = true;
}
}
bool canExecuteDataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const
{
if (!queue_state.count(alter_version))
return true;
return queue_state.at(alter_version).metadata_finished;
}
bool canExecuteMetaAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const
{
if (queue_state.empty())
return true;
return queue_state.begin()->first == alter_version;
}
};
}