This commit is contained in:
alesapin 2020-02-13 14:38:04 +03:00
parent e39b6dff54
commit 45ebf08925
5 changed files with 32 additions and 186 deletions

View File

@ -24,7 +24,6 @@ ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree &
, format_version(storage.format_version)
, current_parts(format_version)
, virtual_parts(format_version)
, alter_sequence(&Logger::get(storage_.getStorageID().table_name))
{}
@ -140,8 +139,6 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
else
queue.push_front(entry);
entries_in_queue.insert(entry->znode_name);
if (entry->type == LogEntry::GET_PART)
{
inserts_by_time.insert(entry);
@ -153,21 +150,7 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
}
}
if (entry->type == LogEntry::ALTER_METADATA)
{
LOG_DEBUG(log, "ADDING METADATA ENTRY WITH ALTER VERSION:" << entry->alter_version);
//for (auto & log_entry : entries_in_queue)
//{
// LOG_DEBUG(log, "LogEntry:" << log_entry);
//}
alter_sequence.addMetadataAlter(entry->alter_version, state_lock);
}
if (entry->type == LogEntry::MUTATE_PART && entry->alter_version != -1)
{
LOG_DEBUG(log, "ADDING DATA ENTRY WITH ALTER VERSION:" << entry->alter_version << " FOR PART:" << entry->source_parts[0] << " to " << entry->getVirtualPartNames()[0]);
//LOG_DEBUG(log, "ADDING DATA ENTRY WITH ALTER VERSION:" << entry->alter_version);
//std::cerr << "INSERT MUTATE PART:" << entry->alter_version << std::endl;
}
}
@ -243,16 +226,7 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
}
if (entry->type == LogEntry::ALTER_METADATA)
{
LOG_DEBUG(log, "FIN ALTER FOR PART with ALTER VERSION:" << entry->alter_version);
//std::cerr << "Alter have mutation:" << entry->have_mutation << std::endl;
alter_sequence.finishMetadataAlter(entry->alter_version, entry->have_mutation, state_lock);
}
if (entry->type == LogEntry::MUTATE_PART)
{
LOG_DEBUG(log, "FIN MUTATION FOR PART:" << entry->source_parts[0] << " with ALTER VERSION:" << entry->alter_version);
}
}
else
{
@ -260,25 +234,20 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
{
/// Because execution of the entry is unsuccessful, `virtual_part_name` will never appear
/// so we won't need to mutate it.
LOG_DEBUG(log, "REMOVING PART FROM MUTATIONS:" << virtual_part_name);
removePartFromMutations(virtual_part_name);
}
}
entries_in_queue.erase(entry->znode_name);
}
void ReplicatedMergeTreeQueue::removePartFromMutations(const String & part_name)
{
LOG_DEBUG(log, "Removing part from mutations:" << part_name);
//LOG_DEBUG(log, "Removing part from mutations:" << part_name);
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
auto in_partition = mutations_by_partition.find(part_info.partition_id);
if (in_partition == mutations_by_partition.end())
{
LOG_DEBUG(log, "Not found partition in mutations for part:" << part_name);
return;
}
bool some_mutations_are_probably_done = false;
@ -287,7 +256,7 @@ void ReplicatedMergeTreeQueue::removePartFromMutations(const String & part_name)
{
MutationStatus & status = *it->second;
LOG_DEBUG(log, "Removing part name:" << part_name << " from mutation:" << status.entry->znode_name << " block number :" << status.entry->block_numbers.begin()->second);
//LOG_DEBUG(log, "Removing part name:" << part_name << " from mutation:" << status.entry->znode_name << " block number :" << status.entry->block_numbers.begin()->second);
status.parts_to_do.removePartAndCoveredParts(part_name);
if (status.parts_to_do.size() == 0)
some_mutations_are_probably_done = true;
@ -626,7 +595,6 @@ Names ReplicatedMergeTreeQueue::getCurrentPartNamesToMutate(ReplicatedMergeTreeM
void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback)
{
std::lock_guard lock(update_mutations_mutex);
//std::cerr << "UPdating mutations\n";
Strings entries_in_zk = zookeeper->getChildrenWatch(zookeeper_path + "/mutations", nullptr, watch_callback);
StringSet entries_in_zk_set(entries_in_zk.begin(), entries_in_zk.end());
@ -696,7 +664,6 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
for (const ReplicatedMergeTreeMutationEntryPtr & entry : new_mutations)
{
LOG_DEBUG(log, "PROCESSING MUTATION:" << entry->znode_name);
auto & mutation = mutations_by_znode.emplace(entry->znode_name, MutationStatus(entry, format_version))
.first->second;
@ -729,7 +696,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
some_mutations_are_probably_done = true;
if (entry->alter_version != -1)
alter_sequence.addMutationForAlter(entry->alter_version, entry->block_numbers, state_lock);
alter_sequence.addMutationForAlter(entry->alter_version, state_lock);
}
}
@ -995,13 +962,6 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
|| entry.type == LogEntry::GET_PART
|| entry.type == LogEntry::MUTATE_PART)
{
//if (!entry.actual_new_part_name.empty()
// && !alter_sequence.canExecuteGetEntry(entry.actual_new_part_name, format_version, state_lock))
// return false;
//if (!entry.new_part_name.empty() && !alter_sequence.canExecuteGetEntry(entry.new_part_name, format_version, state_lock))
// return false;
for (const String & new_part_name : entry.getBlockingPartNames())
{
if (!isNotCoveredByFuturePartsImpl(new_part_name, out_postpone_reason, state_lock))
@ -1082,50 +1042,21 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
}
if (entry.type == LogEntry::ALTER_METADATA)
{ //std::cerr << "Should we execute alter:";
LOG_DEBUG(log, "Should we execute alter entry:" << entry.znode_name << "\n"<< entry.toString());
LOG_DEBUG(log, "We are in front:" << (entry.znode_name == *entries_in_queue.begin()));
for (auto & log_entry : entries_in_queue)
{
LOG_DEBUG(log, "LogEntry:" << log_entry);
}
for (auto & log_entry : queue)
{
LOG_DEBUG(log, "LogEntryData:" << log_entry->znode_name << "\n" << log_entry->toString());
}
//std::cerr << alter_sequence.canExecuteMetadataAlter(entry.alter_version, state_lock) << std::endl;
{
if (!alter_sequence.canExecuteMetaAlter(entry.alter_version, state_lock))
{
out_postpone_reason
= "Cannot execute alter metadata with because head smallest node is " + *entries_in_queue.begin() + " but we are " + entry.znode_name;
out_postpone_reason = "Alter is not started, because more old alter is executing right now";
return false;
}
else
{
LOG_DEBUG(log, "YESSS");
}
}
if (entry.type == LogEntry::MUTATE_PART && entry.alter_version != -1)
{
//std::cerr << "Should we execute mutation:";
//std::cerr << alter_sequence.canExecuteDataAlter(entry.alter_version, state_lock) << std::endl;
LOG_DEBUG(log, "Should we execute mutation entry:" << entry.znode_name << "\n" << entry.toString());
if (!alter_sequence.canExecuteDataAlter(entry.alter_version, state_lock))
{
LOG_DEBUG(log, "NOOOO");
out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version);
out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version) + " because metadata alter is not finished yet";
return false;
}
else
{
LOG_DEBUG(log, "YUESS");
}
}
return true;

View File

@ -73,8 +73,6 @@ private:
*/
Queue queue;
StringSet entries_in_queue;
InsertsByTime inserts_by_time;
time_t min_unprocessed_insert_time = 0;
time_t max_processed_insert_time = 0;
@ -348,12 +346,6 @@ public:
/// Locks queue's mutex.
bool addFuturePartIfNotCoveredByThem(const String & part_name, LogEntry & entry, String & reject_reason);
bool checkCanFetchPart(const String & part_name)
{
std::lock_guard lock(state_mutex);
return alter_sequence.canExecuteGetEntry(part_name, format_version, lock);
}
/// A blocker that stops selects from the queue
ActionBlocker actions_blocker;

View File

@ -19,128 +19,72 @@ class AlterSequence
private:
struct AlterInQueue
{
std::map<String, Int64> block_numbers;
bool metadata_finished = false;
bool data_finished = false;
AlterInQueue() = default;
AlterInQueue(const std::map<String, Int64> & block_numbers_, bool metadata_finished_)
: block_numbers(block_numbers_)
, metadata_finished(metadata_finished_)
AlterInQueue(bool metadata_finished_, bool data_finished_)
: metadata_finished(metadata_finished_)
, data_finished(data_finished_)
{
}
};
Poco::Logger * log;
public:
AlterSequence(Poco::Logger * log_)
: log(log_)
{
}
std::map<int, AlterInQueue> queue_state;
public:
bool empty() const {
return queue_state.empty();
}
void addMutationForAlter(int alter_version, const std::map<String, Int64> & block_numbers, std::lock_guard<std::mutex> & /*state_lock*/)
void addMutationForAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/)
{
LOG_DEBUG(log, "Adding mutation with alter version:" << alter_version);
if (queue_state.count(alter_version))
queue_state[alter_version].block_numbers = block_numbers;
if (!queue_state.count(alter_version))
queue_state.emplace(alter_version, AlterInQueue(true, false));
else
queue_state.emplace(alter_version, AlterInQueue(block_numbers, true));
queue_state[alter_version].data_finished = false;
}
void addMetadataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/)
{
//LOG_DEBUG(log, "Adding meta with alter version:" << alter_version);
if (!queue_state.count(alter_version))
queue_state.emplace(alter_version, AlterInQueue({}, false));
queue_state.emplace(alter_version, AlterInQueue(false, true));
else
queue_state[alter_version].metadata_finished = false;
}
bool canExecuteGetEntry(const String & part_name, MergeTreeDataFormatVersion format_version, std::lock_guard<std::mutex> & /*state_lock*/) const
void finishMetadataAlter(int alter_version, bool have_mutation, std::unique_lock <std::mutex> & /*state_lock*/)
{
if (empty())
return true;
assert(!queue_state.empty());
assert(queue_state.begin()->first == alter_version);
MergeTreePartInfo info = MergeTreePartInfo::fromPartName(part_name, format_version);
LOG_DEBUG(log, "Checking can fetch:" << part_name);
for (const auto & [block_number, state] : queue_state)
{
LOG_DEBUG(log, "Looking at alter:" << block_number << " with part name:" << part_name);
if (!state.block_numbers.empty())
{
LOG_DEBUG(log, "Block number:" << block_number << " has part name " << part_name << " version " << state.block_numbers.at(info.partition_id) << " metadata is done:" << state.metadata_finished);
if (!state.metadata_finished)
return info.getDataVersion() < state.block_numbers.at(info.partition_id) && info.max_block < state.block_numbers.at(info.partition_id);
else
return info.getDataVersion() <= state.block_numbers.at(info.partition_id) && info.max_block <= state.block_numbers.at(info.partition_id);
}
}
//LOG_DEBUG(log, "Nobody has block number for part " << part_name);
return true;
}
void finishMetadataAlter(int alter_version, bool have_data_alter, std::unique_lock <std::mutex> & /*state_lock*/)
{
if (queue_state.empty())
{
throw Exception("Queue shouldn't be empty on metadata alter", ErrorCodes::LOGICAL_ERROR);
}
if (queue_state.begin()->first != alter_version)
{
//LOG_DEBUG(log, "Finished metadata alter with version " + std::to_string(alter_version) + " but current alter in queue is " + std::to_string(queue_state.begin()->first));
throw Exception("Finished metadata alter with version " + std::to_string(alter_version) + " but current alter in queue is " + std::to_string(queue_state.begin()->first), ErrorCodes::LOGICAL_ERROR);
}
LOG_DEBUG(log, "FINISH METADATA ALTER: " << alter_version);
if (!have_data_alter)
{
if (!have_mutation)
queue_state.erase(alter_version);
}
else
{
queue_state[alter_version].metadata_finished = true;
}
}
void finishDataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/)
{
/// queue can be empty after load of finished mutation without move of mutation pointer
if (queue_state.empty())
{
//LOG_DEBUG(log, "FINISHING DATA ALTER WITH VERSION:" << alter_version << " BUT QUEUE EMPTY");
return;
}
assert(queue_state.count(alter_version));
LOG_DEBUG(log, "FINISH DATA ALTER: " << alter_version);
if (!queue_state.count(alter_version))
std::terminate();
queue_state.erase(alter_version);
if (queue_state[alter_version].metadata_finished)
queue_state.erase(alter_version);
else
queue_state[alter_version].data_finished = true;
}
bool canExecuteDataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const
{
LOG_DEBUG(log, "Can execute data alter:" << alter_version);
for (auto [key, value] : queue_state)
{
LOG_DEBUG(log, "Key:" << key << " is metadata finished:" << value.metadata_finished);
}
if (!queue_state.count(alter_version))
return true;
return queue_state.at(alter_version).metadata_finished;
}
bool canExecuteMetaAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const
{
return queue_state.empty() || queue_state.begin()->first == alter_version;

View File

@ -777,16 +777,6 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
if (part_name.empty())
part_name = part->name;
//try
//{
// check(part->columns);
//}
//catch (...)
//{
// LOG_DEBUG(log, "EXCEPTION ADDING PART:" << part_name << " VERSION:" << getMetadataVersion());
// throw;
//}
auto local_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
part->columns, part->checksums);
@ -2617,12 +2607,8 @@ String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(LogEntry & entr
String largest_part_found;
Strings parts = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/parts");
bool source_part_found = false;
for (const String & part_on_replica : parts)
{
if (part_on_replica == entry.new_part_name)
source_part_found = true;
if (part_on_replica == entry.new_part_name
|| MergeTreePartInfo::contains(part_on_replica, entry.new_part_name, format_version))
{
@ -2642,19 +2628,11 @@ String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(LogEntry & entr
if (!the_same_part)
{
String reject_reason;
if (queue.checkCanFetchPart(largest_part_found))
if (!queue.addFuturePartIfNotCoveredByThem(largest_part_found, entry, reject_reason))
{
if (!queue.addFuturePartIfNotCoveredByThem(largest_part_found, entry, reject_reason))
{
LOG_INFO(log, "Will not fetch part " << largest_part_found << " covering " << entry.new_part_name << ". " << reject_reason);
return {};
}
LOG_INFO(log, "Will not fetch part " << largest_part_found << " covering " << entry.new_part_name << ". " << reject_reason);
return {};
}
if (source_part_found)
return replica;
else
LOG_INFO(
log, "NOT FOUND ANYTHING");
}
else
{

View File

@ -74,7 +74,7 @@ export -f correct_alter_thread;
export -f insert_thread;
TIMEOUT=120
TIMEOUT=500
timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null &
@ -103,7 +103,8 @@ done
for i in `seq $REPLICAS`; do
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_alter_mt_$i"
$CLICKHOUSE_CLIENT --query "SELECT SUM(toUInt64(value1)) > $INITIAL_SUM FROM concurrent_alter_mt_$i"
$CLICKHOUSE_CLIENT --query "SELECT COUNT() FROM system.mutations WHERE is_done=0" # all mutations have to be done
#$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_mt_$i"
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_mt_$i"
done