mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-09-21 09:10:48 +00:00)
Merge pull request #24777 from ClickHouse/fix_intersection_with_lost_part
Do not crash on intersecting virtual parts
This commit is contained in: commit 1aff716f18
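This PR touches the part-set bookkeeping (ActiveDataPartSet), the replication queue, replica restart/cloning, and the tests that exercise a lost replica. The recurring pattern: invariant checks that used to throw LOGICAL_ERROR, and could crash the server when a lost part left intersecting virtual parts behind, now log the problem and trip an assertion, so debug builds still stop at the bug while release builds keep running. A minimal self-contained sketch of the idea, with hypothetical names:

    #include <cassert>
    #include <iostream>
    #include <string>

    // Hypothetical sketch: report a broken invariant without aborting release
    // builds. assert(false) is a no-op when NDEBUG is defined.
    void reportIntersectingParts(const std::string & lhs, const std::string & rhs)
    {
        std::cerr << "Part " << lhs << " intersects part " << rhs << ". It is a bug.\n";
        assert(false);
    }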
@@ -1,15 +1,13 @@
 #include <Storages/MergeTree/ActiveDataPartSet.h>
 #include <Common/Exception.h>
+#include <common/logger_useful.h>
 #include <algorithm>
+#include <cassert>


 namespace DB
 {

-namespace ErrorCodes
-{
-    extern const int LOGICAL_ERROR;
-}

 ActiveDataPartSet::ActiveDataPartSet(MergeTreeDataFormatVersion format_version_, const Strings & names)
     : format_version(format_version_)
@@ -18,8 +16,8 @@ ActiveDataPartSet::ActiveDataPartSet(MergeTreeDataFormatVersion format_version_,
         add(name);
 }


-bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts)
+/// FIXME replace warnings with logical errors
+bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts, Poco::Logger * log)
 {
     /// TODO make it exception safe (out_replaced_parts->push_back(...) may throw)
     auto part_info = MergeTreePartInfo::fromPartName(name, format_version);
@@ -40,7 +38,11 @@ bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts)
         if (!part_info.contains(it->first))
         {
             if (!part_info.isDisjoint(it->first))
-                throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects previous part {}. It is a bug.", name, it->first.getPartName());
+            {
+                if (log)
+                    LOG_ERROR(log, "Part {} intersects previous part {}. It is a bug.", name, it->first.getPartName());
+                assert(false);
+            }
             ++it;
             break;
         }
@@ -56,15 +58,18 @@ bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts)
     /// Let's go to the right.
     while (it != part_info_to_name.end() && part_info.contains(it->first))
     {
-        if (part_info == it->first)
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected duplicate part {}. It is a bug.", name);
+        assert(part_info != it->first);
         if (out_replaced_parts)
             out_replaced_parts->push_back(it->second);
         part_info_to_name.erase(it++);
     }

     if (it != part_info_to_name.end() && !part_info.isDisjoint(it->first))
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects next part {}. It is a bug.", name, it->first.getPartName());
+    {
+        if (log)
+            LOG_ERROR(log, "Part {} intersects next part {}. It is a bug.", name, it->first.getPartName());
+        assert(false);
+    }

     part_info_to_name.emplace(part_info, name);
     return true;
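Call sites opt in to diagnostics by passing a logger as the new third argument; with a null logger an intersection is still caught by the assert in debug builds but silent in release. The snippet below mirrors the call sites added later in this diff in ReplicatedMergeTreeQueue::addVirtualParts (the collecting variant is hypothetical):

    // log is a Poco::Logger *, as in ReplicatedMergeTreeQueue later in this diff:
    current_parts.add(part->name, nullptr, log);    // no interest in replaced parts
    virtual_parts.add(part->name, nullptr, log);

    Strings replaced_parts;                         // hypothetical variant that also
    virtual_parts.add(name, &replaced_parts, log);  // collects the parts the new one covers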
@@ -5,6 +5,10 @@
 #include <map>
 #include <vector>

+namespace Poco
+{
+class Logger;
+}

 namespace DB
 {
@@ -46,7 +50,7 @@ public:

     /// Returns true if the part was actually added. If out_replaced_parts != nullptr, it will contain
     /// parts that were replaced from the set by the newly added part.
-    bool add(const String & name, Strings * out_replaced_parts = nullptr);
+    bool add(const String & name, Strings * out_replaced_parts = nullptr, Poco::Logger * log = nullptr);

     bool remove(const MergeTreePartInfo & part_info)
     {
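Note that the header only forward-declares Poco::Logger rather than including the Poco headers: a pointer parameter does not need the complete type, so every file that includes ActiveDataPartSet.h avoids the extra dependency. The same technique in isolation (hypothetical header):

    // my_set.h -- hypothetical example of the same technique
    namespace Poco { class Logger; }  // forward declaration is enough for a pointer

    struct MySet
    {
        // The complete Poco::Logger definition is only needed in the .cpp
        // that actually calls methods on it.
        bool add(const char * name, Poco::Logger * log = nullptr);
    };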
@@ -4908,7 +4908,11 @@ void MergeTreeData::removeQueryId(const String & query_id) const
 {
     std::lock_guard lock(query_id_set_mutex);
     if (query_id_set.find(query_id) == query_id_set.end())
+    {
+        /// Do not throw exception, because this method is used in destructor.
         LOG_WARNING(log, "We have query_id removed but it's not recorded. This is a bug");
+        assert(false);
+    }
     else
         query_id_set.erase(query_id);
 }
@@ -5088,7 +5092,10 @@ CurrentlySubmergingEmergingTagger::~CurrentlySubmergingEmergingTagger()
     for (const auto & part : submerging_parts)
     {
         if (!storage.currently_submerging_big_parts.count(part))
-            LOG_WARNING(log, "currently_submerging_big_parts doesn't contain part {} to erase. This is a bug", part->name);
+        {
+            LOG_ERROR(log, "currently_submerging_big_parts doesn't contain part {} to erase. This is a bug", part->name);
+            assert(false);
+        }
         else
             storage.currently_submerging_big_parts.erase(part);
     }
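removeQueryId keeps the "do not throw" comment because it runs on a destructor path: a destructor that throws while another exception is propagating terminates the program, which is exactly the kind of crash this PR is removing. A self-contained illustration of why log-and-assert is the safe form there (hypothetical, not from the PR):

    #include <cassert>
    #include <iostream>
    #include <stdexcept>

    struct QueryIdGuard
    {
        bool recorded = true;  // stands in for the query_id_set lookup result

        ~QueryIdGuard()
        {
            if (!recorded)
            {
                // Throwing here would call std::terminate if another exception
                // is already in flight, so only log and assert.
                std::cerr << "query_id removed but not recorded. This is a bug\n";
                assert(false);
            }
        }
    };

    int main()
    {
        try
        {
            QueryIdGuard guard;
            throw std::runtime_error("query failed");  // guard is destroyed during unwinding
        }
        catch (const std::exception & e)
        {
            std::cout << "handled: " << e.what() << '\n';  // reachable because the destructor did not throw
        }
    }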
@@ -71,6 +71,13 @@ struct MergeTreePartInfo
             || max_block < rhs.min_block;
     }

+    bool isFakeDropRangePart() const
+    {
+        /// Another max level was previously used for REPLACE/MOVE PARTITION
+        auto another_max_level = std::numeric_limits<decltype(level)>::max();
+        return level == MergeTreePartInfo::MAX_LEVEL || level == another_max_level;
+    }
+
     String getPartName() const;
     String getPartNameV0(DayNum left_date, DayNum right_date) const;
     UInt64 getBlocksCount() const
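isFakeDropRangePart gives a name to a check that previously appeared inline in two places in ReplicatedMergeTreeQueue.cpp (both hunks appear later in this diff). A standalone sketch, assuming level is a UInt32 and MergeTreePartInfo::MAX_LEVEL is 999999999, as in ClickHouse around this time:

    #include <cstdint>
    #include <limits>

    constexpr uint32_t MAX_LEVEL = 999999999;                                     // DROP/DETACH PARTITION
    constexpr uint32_t ANOTHER_MAX_LEVEL = std::numeric_limits<uint32_t>::max();  // legacy REPLACE/MOVE PARTITION

    constexpr bool isFakeDropRangePart(uint32_t level)
    {
        return level == MAX_LEVEL || level == ANOTHER_MAX_LEVEL;
    }

    static_assert(isFakeDropRangePart(999999999));  // fake part covering a partition
    static_assert(!isFakeDropRangePart(2));         // a real merged part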
@@ -413,6 +413,24 @@ ReplicatedMergeTreeLogEntry::Ptr ReplicatedMergeTreeLogEntry::parse(const String
     return res;
 }

+std::optional<String> ReplicatedMergeTreeLogEntryData::getDropRange(MergeTreeDataFormatVersion format_version) const
+{
+    if (type == DROP_RANGE)
+        return new_part_name;
+
+    if (type == REPLACE_RANGE)
+    {
+        auto drop_range_info = MergeTreePartInfo::fromPartName(replace_range_entry->drop_range_part_name, format_version);
+        if (!ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range_info))
+        {
+            /// It's REPLACE, not MOVE or ATTACH, so drop range is real
+            return replace_range_entry->drop_range_part_name;
+        }
+    }
+
+    return {};
+}
+
 Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormatVersion format_version) const
 {
     /// Doesn't produce any part
@@ -431,11 +449,8 @@ Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormat
     {
         Strings res = replace_range_entry->new_part_names;
-        auto drop_range_info = MergeTreePartInfo::fromPartName(replace_range_entry->drop_range_part_name, format_version);
-        if (!ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range_info))
-        {
-            /// It's REPLACE, not MOVE or ATTACH, so drop range is real
-            res.emplace_back(replace_range_entry->drop_range_part_name);
-        }
+        if (auto drop_range = getDropRange(format_version))
+            res.emplace_back(*drop_range);
         return res;
     }

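getDropRange gives getVirtualPartNames (above) and updateStateOnQueueEntryRemoval (later in this diff) a single definition of "the drop range of this entry", replacing duplicated DROP_RANGE/REPLACE_RANGE branching at each call site. The consuming pattern, reduced to a self-contained sketch:

    #include <optional>
    #include <string>
    #include <vector>

    // Stand-in for ReplicatedMergeTreeLogEntryData::getDropRange (illustrative only).
    std::optional<std::string> getDropRange(bool has_real_drop_range)
    {
        if (has_real_drop_range)
            return "all_0_100_999999999";  // fake drop-range part name, made up for the example
        return {};
    }

    void collectVirtualParts(std::vector<std::string> & res)
    {
        // Declaring the optional in the condition: the body runs only when it is engaged.
        if (auto drop_range = getDropRange(true))
            res.emplace_back(*drop_range);
    }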
@@ -152,6 +152,9 @@ struct ReplicatedMergeTreeLogEntryData
         return res;
     }

+    /// Returns fake part for drop range (for DROP_RANGE and REPLACE_RANGE)
+    std::optional<String> getDropRange(MergeTreeDataFormatVersion format_version) const;
+
     /// Access under queue_mutex, see ReplicatedMergeTreeQueue.
     bool currently_executing = false;    /// Whether the action is executing now.
     bool removed_by_other_entry = false;
@@ -18,6 +18,7 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int TABLE_DIFFERS_TOO_MUCH;
+    extern const int LOGICAL_ERROR;
 }

 static const auto PART_CHECK_ERROR_SLEEP_MS = 5 * 1000;
@@ -367,8 +368,8 @@ void ReplicatedMergeTreePartCheckThread::run()
         {
             if (!parts_set.empty())
             {
-                LOG_ERROR(log, "Non-empty parts_set with empty parts_queue. This is a bug.");
                 parts_set.clear();
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Non-empty parts_set with empty parts_queue. This is a bug.");
             }
         }
         else
@@ -401,7 +402,7 @@ void ReplicatedMergeTreePartCheckThread::run()

             if (parts_queue.empty())
             {
-                LOG_ERROR(log, "Someone erased checking part from parts_queue. This is a bug.");
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Someone erased checking part from parts_queue. This is a bug.");
             }
             else
             {
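In the part-check thread the hardening goes the opposite way: inconsistencies that were previously logged and patched over now throw LOGICAL_ERROR, because this code runs inside a background task whose runner catches exceptions, reports them, and reschedules. A minimal sketch of the contrast, with a standard-library stand-in for ClickHouse's Exception:

    #include <set>
    #include <stdexcept>
    #include <string>

    void checkQueueConsistency(std::set<std::string> & parts_set, bool parts_queue_empty)
    {
        if (parts_queue_empty && !parts_set.empty())
        {
            // Before: LOG_ERROR(...) and continue with the patched state.
            // After: still repair the state, but fail loudly so the task
            // runner surfaces the bug instead of hiding it.
            parts_set.clear();
            throw std::logic_error("Non-empty parts_set with empty parts_queue. This is a bug.");
        }
    }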
@@ -34,24 +34,29 @@ ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree &
 }


+void ReplicatedMergeTreeQueue::clear()
+{
+    auto locks = lockQueue();
+    assert(future_parts.empty());
+    current_parts.clear();
+    virtual_parts.clear();
+    queue.clear();
+    inserts_by_time.clear();
+    mutations_by_znode.clear();
+    mutations_by_partition.clear();
+    mutation_pointer.clear();
+}
+
 void ReplicatedMergeTreeQueue::initialize(const MergeTreeData::DataParts & parts)
 {
     addVirtualParts(parts);
 }


 void ReplicatedMergeTreeQueue::addVirtualParts(const MergeTreeData::DataParts & parts)
 {
     std::lock_guard lock(state_mutex);

     for (const auto & part : parts)
     {
-        current_parts.add(part->name);
-        virtual_parts.add(part->name);
+        current_parts.add(part->name, nullptr, log);
+        virtual_parts.add(part->name, nullptr, log);
     }
 }


 bool ReplicatedMergeTreeQueue::isVirtualPart(const MergeTreeData::DataPartPtr & data_part) const
 {
     std::lock_guard lock(state_mutex);
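clear() wipes every piece of in-memory queue state under the queue locks, and the assert documents its apparent contract: no entry may be executing when the queue is reset (its caller in this PR, cloneReplicaIfNeeded, runs while replication is stopped). The shape of that contract as a self-contained sketch (hypothetical types, not ClickHouse code):

    #include <cassert>
    #include <map>
    #include <mutex>
    #include <string>

    struct QueueSketch
    {
        std::mutex state_mutex;
        std::map<std::string, int> future_parts;  // entries being executed right now
        std::map<std::string, int> queue;         // everything waiting

        void clear()
        {
            std::lock_guard lock(state_mutex);
            assert(future_parts.empty());  // caller guarantees nothing is in flight
            queue.clear();
        }
    };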
@@ -74,9 +79,6 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
     /// Reset batch size on initialization to recover from possible errors of too large batch size.
     current_multi_batch_size = 1;

-    String log_pointer_str = zookeeper->get(fs::path(replica_path) / "log_pointer");
-    log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);
-
     std::unordered_set<String> already_loaded_paths;
     {
         std::lock_guard lock(state_mutex);
@@ -134,7 +136,7 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
 {
     for (const String & virtual_part_name : entry->getVirtualPartNames(format_version))
     {
-        virtual_parts.add(virtual_part_name);
+        virtual_parts.add(virtual_part_name, nullptr, log);
         addPartToMutations(virtual_part_name);
     }

@@ -221,23 +223,17 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(

         for (const String & virtual_part_name : entry->getVirtualPartNames(format_version))
         {
-            current_parts.add(virtual_part_name);
+            current_parts.add(virtual_part_name, nullptr, log);

             /// These parts are already covered by newer part, we don't have to
             /// mutate it.
             removeCoveredPartsFromMutations(virtual_part_name, /*remove_part = */ false, /*remove_covered_parts = */ true);
         }

-        String drop_range_part_name;
-        if (entry->type == LogEntry::DROP_RANGE)
-            drop_range_part_name = entry->new_part_name;
-        else if (entry->type == LogEntry::REPLACE_RANGE)
-            drop_range_part_name = entry->replace_range_entry->drop_range_part_name;
-
-        if (!drop_range_part_name.empty())
+        if (auto drop_range_part_name = entry->getDropRange(format_version))
         {
-            current_parts.remove(drop_range_part_name);
-            virtual_parts.remove(drop_range_part_name);
+            current_parts.remove(*drop_range_part_name);
+            virtual_parts.remove(*drop_range_part_name);
         }

         if (entry->type == LogEntry::ALTER_METADATA)
@@ -302,9 +298,7 @@ void ReplicatedMergeTreeQueue::addPartToMutations(const String & part_name)
     auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);

     /// Do not add special virtual parts to parts_to_do
-    auto max_level = MergeTreePartInfo::MAX_LEVEL;  /// DROP/DETACH PARTITION
-    auto another_max_level = std::numeric_limits<decltype(part_info.level)>::max();  /// REPLACE/MOVE PARTITION
-    if (part_info.level == max_level || part_info.level == another_max_level)
+    if (part_info.isFakeDropRangePart())
         return;

     auto in_partition = mutations_by_partition.find(part_info.partition_id);
@@ -344,7 +338,9 @@ void ReplicatedMergeTreeQueue::updateTimesInZooKeeper(
         auto code = zookeeper->tryMulti(ops, responses);

         if (code != Coordination::Error::ZOK)
-            LOG_ERROR(log, "Couldn't set value of nodes for insert times ({}/min_unprocessed_insert_time, max_processed_insert_time): {}. This shouldn't happen often.", replica_path, Coordination::errorMessage(code));
+            LOG_ERROR(log, "Couldn't set value of nodes for insert times "
+                           "({}/min_unprocessed_insert_time, max_processed_insert_time): {}. "
+                           "This shouldn't happen often.", replica_path, Coordination::errorMessage(code));
     }
 }

@@ -392,7 +388,8 @@ void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeep
     }

     if (!found && need_remove_from_zk)
-        throw Exception("Can't find " + entry->znode_name + " in the memory queue. It is a bug", ErrorCodes::LOGICAL_ERROR);
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find {} in the memory queue. It is a bug. Entry: {}",
+                        entry->znode_name, entry->toString());

     notifySubscribers(queue_size);

@@ -434,7 +431,7 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri
         {
             auto part_in_current_parts = current_parts.getContainingPart(source_part);
             if (part_in_current_parts == source_part)
-                virtual_parts.add(source_part);
+                virtual_parts.add(source_part, nullptr, log);
         }
     }

@@ -462,8 +459,9 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri
 }


-bool ReplicatedMergeTreeQueue::removeFromVirtualParts(const MergeTreePartInfo & part_info)
+bool ReplicatedMergeTreeQueue::removeFailedQuorumPart(const MergeTreePartInfo & part_info)
 {
+    assert(part_info.level == 0);
     std::lock_guard lock(state_mutex);
     return virtual_parts.remove(part_info);
 }
@@ -587,8 +585,6 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper
     {
         std::lock_guard state_lock(state_mutex);

-        log_pointer = last_entry_index + 1;
-
         for (size_t copied_entry_idx = 0, num_copied_entries = copied_entries.size(); copied_entry_idx < num_copied_entries; ++copied_entry_idx)
         {
             String path_created = dynamic_cast<const Coordination::CreateResponse &>(*responses[copied_entry_idx]).path_created;
@@ -758,9 +754,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
             /// Such parts do not exist and will never appear, so we should not add virtual parts to parts_to_do list.
             /// Fortunately, it's easy to distinguish virtual parts from normal parts by part level.
             /// See StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(...)
-            auto max_level = MergeTreePartInfo::MAX_LEVEL;  /// DROP/DETACH PARTITION
-            auto another_max_level = std::numeric_limits<decltype(part_info.level)>::max();  /// REPLACE/MOVE PARTITION
-            if (part_info.level == max_level || part_info.level == another_max_level)
+            if (part_info.isFakeDropRangePart())
                 continue;

             auto it = entry->block_numbers.find(part_info.partition_id);
@@ -941,9 +935,6 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
             if ((*it)->currently_executing)
                 to_wait.push_back(*it);
             auto code = zookeeper->tryRemove(fs::path(replica_path) / "queue" / (*it)->znode_name);
-            /// FIXME it's probably unsafe to remove entries non-atomically
-            /// when this method called directly from alter query (not from replication queue task),
-            /// because entries will be lost if ALTER fails.
             if (code != Coordination::Error::ZOK)
                 LOG_INFO(log, "Couldn't remove {}: {}", (fs::path(replica_path) / "queue" / (*it)->znode_name).string(), Coordination::errorMessage(code));

@@ -1259,7 +1250,9 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(const Replicate
     for (const String & new_part_name : entry->getBlockingPartNames(queue.format_version))
     {
         if (!queue.future_parts.emplace(new_part_name, entry).second)
-            throw Exception("Tagging already tagged future part " + new_part_name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Tagging already tagged future part {}. This is a bug. "
+                                                       "It happened on attempt to execute {}: {}",
+                            new_part_name, entry->znode_name, entry->toString());
     }
 }

@@ -1277,7 +1270,9 @@ void ReplicatedMergeTreeQueue::CurrentlyExecuting::setActualPartName(ReplicatedM
         return;

     if (!queue.future_parts.emplace(entry.actual_new_part_name, entry.shared_from_this()).second)
-        throw Exception("Attaching already existing future part " + entry.actual_new_part_name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Attaching already existing future part {}. This is a bug. "
+                                                   "It happened on attempt to execute {}: {}",
+                        entry.actual_new_part_name, entry.znode_name, entry.toString());
 }


@@ -1296,13 +1291,19 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting()
     for (const String & new_part_name : entry->getBlockingPartNames(queue.format_version))
     {
         if (!queue.future_parts.erase(new_part_name))
+        {
             LOG_ERROR(queue.log, "Untagging already untagged future part {}. This is a bug.", new_part_name);
+            assert(false);
+        }
     }

     if (!entry->actual_new_part_name.empty())
     {
         if (entry->actual_new_part_name != entry->new_part_name && !queue.future_parts.erase(entry->actual_new_part_name))
+        {
             LOG_ERROR(queue.log, "Untagging already untagged future part {}. This is a bug.", entry->actual_new_part_name);
+            assert(false);
+        }

         entry->actual_new_part_name.clear();
     }
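The CurrentlyExecuting destructor leans on the fact that map::erase(key) returns the number of elements removed: zero means the future-part tag had already vanished, a bookkeeping bug that is now logged and asserted rather than thrown. The check in isolation:

    #include <cassert>
    #include <iostream>
    #include <map>
    #include <string>

    int main()
    {
        std::map<std::string, int> future_parts{{"all_1_1_0", 1}};

        if (!future_parts.erase("all_1_1_0"))  // returns 1: the tag was present
            std::cerr << "Untagging already untagged future part. This is a bug.\n";

        if (!future_parts.erase("all_1_1_0"))  // returns 0: double untag detected
        {
            std::cerr << "Untagging already untagged future part. This is a bug.\n";
            assert(false);  // demonstrates where a debug build would stop
        }
    }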
@@ -92,9 +92,6 @@ private:
     using FuturePartsSet = std::map<String, LogEntryPtr>;
     FuturePartsSet future_parts;

-    /// Index of the first log entry that we didn't see yet.
-    Int64 log_pointer = 0;
-
     /// Avoid parallel execution of queue enties, which may remove other entries from the queue.
     bool currently_executing_drop_or_replace_range = false;

@@ -183,9 +180,6 @@ private:
     /// Ensures that only one thread is simultaneously updating mutations.
     std::mutex update_mutations_mutex;

-    /// Put a set of (already existing) parts in virtual_parts.
-    void addVirtualParts(const MergeTreeData::DataParts & parts);
-
     /// Insert new entry from log into queue
     void insertUnlocked(
         const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed,
@@ -275,7 +269,10 @@ public:
     ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_, ReplicatedMergeTreeMergeStrategyPicker & merge_strategy_picker_);
     ~ReplicatedMergeTreeQueue();

+    /// Clears queue state
+    void clear();
+
     /// Put a set of (already existing) parts in virtual_parts.
     void initialize(const MergeTreeData::DataParts & parts);

     /** Inserts an action to the end of the queue.
@@ -295,7 +292,7 @@ public:
      */
     bool load(zkutil::ZooKeeperPtr zookeeper);

-    bool removeFromVirtualParts(const MergeTreePartInfo & part_info);
+    bool removeFailedQuorumPart(const MergeTreePartInfo & part_info);

     /** Copy the new entries from the shared log to the queue of this replica. Set the log_pointer to the appropriate value.
      * If watch_callback is not empty, will call it when new entries appear in the log.
@@ -174,6 +174,9 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
         storage.partial_shutdown_called = false;
         storage.partial_shutdown_event.reset();

+        /// Start queue processing
+        storage.background_executor.start();
+
         storage.queue_updating_task->activateAndSchedule();
         storage.mutations_updating_task->activateAndSchedule();
         storage.mutations_finalizing_task->activateAndSchedule();
@@ -227,7 +230,7 @@ void ReplicatedMergeTreeRestartingThread::removeFailedQuorumParts()
         {
             LOG_DEBUG(log, "Found part {} with failed quorum. Moving to detached. This shouldn't happen often.", part_name);
             storage.forgetPartAndMoveToDetached(part, "noquorum");
-            storage.queue.removeFromVirtualParts(part->info);
+            storage.queue.removeFailedQuorumPart(part->info);
         }
     }
 }
@@ -352,6 +355,9 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
     storage.cleanup_thread.stop();
     storage.part_check_thread.stop();

+    /// Stop queue processing
+    storage.background_executor.finish();
+
     LOG_TRACE(log, "Threads finished");
 }

@@ -2050,7 +2050,7 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
                 if (code == Coordination::Error::ZOK)
                 {
                     LOG_DEBUG(log, "Marked quorum for part {} as failed.", entry.new_part_name);
-                    queue.removeFromVirtualParts(part_info);
+                    queue.removeFailedQuorumPart(part_info);
                     return true;
                 }
                 else if (code == Coordination::Error::ZBADVERSION || code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS)
@@ -2063,7 +2063,10 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
             }
             else
             {
-                LOG_WARNING(log, "No active replica has part {}, but that part needs quorum and /quorum/status contains entry about another part {}. It means that part was successfully written to {} replicas, but then all of them goes offline. Or it is a bug.", entry.new_part_name, quorum_entry.part_name, entry.quorum);
+                LOG_WARNING(log, "No active replica has part {}, "
+                                 "but that part needs quorum and /quorum/status contains entry about another part {}. "
+                                 "It means that part was successfully written to {} replicas, but then all of them goes offline. "
+                                 "Or it is a bug.", entry.new_part_name, quorum_entry.part_name, entry.quorum);
             }
         }
     }
@@ -2753,7 +2756,6 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
         {
             if (active_parts_set.getContainingPart(part).empty())
             {
-                queue.remove(zookeeper, part);
                 parts_to_remove_from_zk.emplace_back(part);
                 LOG_WARNING(log, "Source replica does not have part {}. Removing it from ZooKeeper.", part);
             }
@@ -2998,6 +3000,7 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zooke

     /// Clear obsolete queue that we no longer need.
     zookeeper->removeChildren(fs::path(replica_path) / "queue");
+    queue.clear();

     /// Will do repair from the selected replica.
     cloneReplica(source_replica, source_is_lost_stat, zookeeper);
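cloneReplicaIfNeeded now clears the in-memory queue immediately after purging the replica's queue znodes, so the two views cannot diverge while the repair is in progress. The ordering, condensed into a self-contained sketch with hypothetical types:

    #include <string>
    #include <vector>

    struct Keeper { void removeChildren(const std::string & path) { (void)path; /* drop ZK children */ } };
    struct Queue  { std::vector<std::string> entries; void clear() { entries.clear(); } };

    void repairFromReplica(Keeper & zookeeper, Queue & queue, const std::string & replica_path)
    {
        zookeeper.removeChildren(replica_path + "/queue");  // 1. purge the obsolete ZooKeeper queue
        queue.clear();                                      // 2. purge the matching in-memory state
        // 3. clone table state from a healthy source replica (cloneReplica in the real code)
    }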
@@ -37,12 +37,17 @@ def start_cluster():
 def test_inconsistent_parts_if_drop_while_replica_not_active(start_cluster):
     with PartitionManager() as pm:
         # insert into all replicas
-        for i in range(50):
+        for i in range(10):
             node1.query("INSERT INTO test_table VALUES ('2019-08-16', {})".format(i))
         assert_eq_with_retry(node2, "SELECT count(*) FROM test_table", node1.query("SELECT count(*) FROM test_table"))

-        # disable network on the first replica
+        # partition the first replica from the second one and (later) from zk
         pm.partition_instances(node1, node2)

+        # insert some parts on the second replica only, we will drop these parts
+        for i in range(10):
+            node2.query("INSERT INTO test_table VALUES ('2019-08-16', {})".format(10 + i))
+
+        pm.drop_instance_zk_connections(node1)
+
         # drop all parts on the second replica
@@ -51,9 +56,14 @@ def test_inconsistent_parts_if_drop_while_replica_not_active(start_cluster):

         # insert into the second replica
         # DROP_RANGE will be removed from the replication log and the first replica will be lost
-        for i in range(50):
-            node2.query("INSERT INTO test_table VALUES ('2019-08-16', {})".format(50 + i))
+        for i in range(20):
+            node2.query("INSERT INTO test_table VALUES ('2019-08-16', {})".format(20 + i))

         # the first replica will be cloned from the second
         pm.heal_all()
         assert_eq_with_retry(node1, "SELECT count(*) FROM test_table", node2.query("SELECT count(*) FROM test_table"))
+        # ensure replica was cloned
+        assert node1.contains_in_log("Will mimic node2")
+        # queue must be empty (except some merges that are possibly executing right now)
+        assert node1.query("SELECT count() FROM system.replication_queue WHERE type != 'MERGE_PARTS'") == "0\n"
+        assert node2.query("SELECT count() FROM system.replication_queue WHERE type != 'MERGE_PARTS'") == "0\n"
@@ -12,7 +12,7 @@ DATA_SIZE=200
 SEQ=$(seq 0 $(($NUM_REPLICAS - 1)))

 for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "DROP TABLE IF EXISTS r$REPLICA"; done
-for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "CREATE TABLE r$REPLICA (x UInt64) ENGINE = ReplicatedMergeTree('/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table', 'r$REPLICA') ORDER BY x SETTINGS min_bytes_for_wide_part = '10M';"; done
+for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "CREATE TABLE r$REPLICA (x UInt64) ENGINE = ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table', 'r$REPLICA') ORDER BY x SETTINGS min_bytes_for_wide_part = '10M';"; done

 function thread()
 {
@@ -11,7 +11,7 @@ $CLICKHOUSE_CLIENT --query "CREATE DATABASE test_01320 ENGINE=Ordinary" # Diff

 function thread1()
 {
-    while true; do $CLICKHOUSE_CLIENT -n --query "CREATE TABLE test_01320.r (x UInt64) ENGINE = ReplicatedMergeTree('/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table', 'r') ORDER BY x; DROP TABLE test_01320.r;"; done
+    while true; do $CLICKHOUSE_CLIENT -n --query "CREATE TABLE test_01320.r (x UInt64) ENGINE = ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table', 'r') ORDER BY x; DROP TABLE test_01320.r;"; done
 }

 function thread2()
@@ -11,7 +11,7 @@ FREEZE_OUT_STRUCTURE='backup_name String, backup_path String , part_backup_path
 # setup

 ${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS table_for_freeze_replicated;"
-${CLICKHOUSE_CLIENT} --query "CREATE TABLE table_for_freeze_replicated (key UInt64, value String) ENGINE = ReplicatedMergeTree('/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table_for_freeze_replicated', '1') ORDER BY key PARTITION BY key % 10;"
+${CLICKHOUSE_CLIENT} --query "CREATE TABLE table_for_freeze_replicated (key UInt64, value String) ENGINE = ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table_for_freeze_replicated', '1') ORDER BY key PARTITION BY key % 10;"
 ${CLICKHOUSE_CLIENT} --query "INSERT INTO table_for_freeze_replicated SELECT number, toString(number) from numbers(10);"

 ${CLICKHOUSE_CLIENT} --query "ALTER TABLE table_for_freeze_replicated FREEZE WITH NAME 'test_01417' FORMAT TSVWithNames SETTINGS alter_partition_verbose_result = 1;" \