Removed code for freezing partitions due to unacceptable implementation [#METR-20656].

This commit is contained in:
Alexey Milovidov 2016-03-29 22:39:44 +03:00
parent f7b8709885
commit 04875a022a
5 changed files with 2 additions and 105 deletions

View File

@ -63,9 +63,6 @@ public:
/// Примерное количество места на диске, нужное для мерджа. С запасом. /// Примерное количество места на диске, нужное для мерджа. С запасом.
static size_t estimateDiskSpaceForMerge(const MergeTreeData::DataPartsVector & parts); static size_t estimateDiskSpaceForMerge(const MergeTreeData::DataPartsVector & parts);
void freezePartition(const std::string & partition);
void unfreezePartition(const std::string & partition);
/** Отменяет все мерджи. Все выполняющиеся сейчас вызовы mergeParts скоро бросят исключение. /** Отменяет все мерджи. Все выполняющиеся сейчас вызовы mergeParts скоро бросят исключение.
* Все новые вызовы будут бросать исключения, пока не будет вызван uncancel(). * Все новые вызовы будут бросать исключения, пока не будет вызван uncancel().
*/ */
@ -79,7 +76,6 @@ private:
/** Выбрать все куски принадлежащие одной партиции. /** Выбрать все куски принадлежащие одной партиции.
*/ */
MergeTreeData::DataPartsVector selectAllPartsFromPartition(DayNum_t partition); MergeTreeData::DataPartsVector selectAllPartsFromPartition(DayNum_t partition);
bool isPartitionFrozen(const MergeTreeData::DataPart & part) const;
private: private:
using FrozenPartitions = std::unordered_set<DayNum_t>; using FrozenPartitions = std::unordered_set<DayNum_t>;
@ -92,9 +88,6 @@ private:
/// Когда в последний раз писали в лог, что место на диске кончилось (чтобы не писать об этом слишком часто). /// Когда в последний раз писали в лог, что место на диске кончилось (чтобы не писать об этом слишком часто).
time_t disk_space_warning_time = 0; time_t disk_space_warning_time = 0;
std::mutex freeze_lock;
FrozenPartitions frozen_partitions;
CancellationHook cancellation_hook; CancellationHook cancellation_hook;
std::atomic<bool> cancelled {false}; std::atomic<bool> cancelled {false};

View File

@ -294,16 +294,6 @@ private:
zkutil::SingleBarrier getElectionBarrier(); zkutil::SingleBarrier getElectionBarrier();
zkutil::SingleBarrier getCommitBarrier(); zkutil::SingleBarrier getCommitBarrier();
/// Prevent merging jobs from being performed on the partition that we
/// want to reshard on the current host. This operation is persistent:
/// even if a node failure occurs, a partition remains frozen as long
/// as it is not unfrozen explicitly. So use it with care with regard to
/// exceptions.
void freezeSourcePartition();
/// Make the partition that we want to reshard available for merging jobs.
void unfreezeSourcePartition();
/// Get the ZooKeeper path of a given coordinator. /// Get the ZooKeeper path of a given coordinator.
std::string getCoordinatorPath(const std::string & coordinator_id) const; std::string getCoordinatorPath(const std::string & coordinator_id) const;
/// Get the ZooKeeper path of a given job partition. /// Get the ZooKeeper path of a given job partition.

View File

@ -83,19 +83,6 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
{ {
MergeTreeData::DataParts data_parts = data.getDataParts(); MergeTreeData::DataParts data_parts = data.getDataParts();
{
std::lock_guard<std::mutex> guard{freeze_lock};
for (auto it = data_parts.begin(); it != data_parts.end(); )
{
const MergeTreeData::DataPartPtr & part = *it;
if (isPartitionFrozen(*part))
it = data_parts.erase(it);
else
++it;
}
}
if (data_parts.empty()) if (data_parts.empty())
return false; return false;
@ -351,28 +338,6 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
size_t aio_threshold, MergeTreeData::Transaction * out_transaction, size_t aio_threshold, MergeTreeData::Transaction * out_transaction,
DiskSpaceMonitor::Reservation * disk_reservation) DiskSpaceMonitor::Reservation * disk_reservation)
{ {
std::lock_guard<std::mutex> guard{freeze_lock};
if (!frozen_partitions.empty())
{
size_t old_parts_size = parts.size();
auto first_removed = std::remove_if(parts.begin(), parts.end(), [&](const MergeTreeData::DataPartPtr & part)
{
return isPartitionFrozen(*part);
});
parts.erase(first_removed, parts.end());
if (parts.empty())
throw Exception("All the chosen parts lie inside a frozen partition. Cancelling.", ErrorCodes::ABORTED);
if (disk_reservation && (parts.size() != old_parts_size))
{
size_t sum_parts_size_in_bytes = MergeTreeDataMerger::estimateDiskSpaceForMerge(parts);
disk_reservation = DiskSpaceMonitor::reserve(data.getFullPath(), sum_parts_size_in_bytes);
}
}
merge_entry->num_parts = parts.size(); merge_entry->num_parts = parts.size();
LOG_DEBUG(log, "Merging " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name << " into " << merged_name); LOG_DEBUG(log, "Merging " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name << " into " << merged_name);
@ -821,23 +786,6 @@ size_t MergeTreeDataMerger::estimateDiskSpaceForMerge(const MergeTreeData::DataP
return static_cast<size_t>(res * DISK_USAGE_COEFFICIENT_TO_RESERVE); return static_cast<size_t>(res * DISK_USAGE_COEFFICIENT_TO_RESERVE);
} }
/// Mark a partition as frozen.
/// The partition name is converted with MergeTreeData::getMonthFromName and the result is
/// inserted into frozen_partitions (a std::unordered_set, so repeating the call for the same
/// partition changes nothing). The set is modified while freeze_lock is held.
void MergeTreeDataMerger::freezePartition(const std::string & partition)
{
std::lock_guard<std::mutex> guard{freeze_lock};
frozen_partitions.insert(MergeTreeData::getMonthFromName(partition));
}
/// Remove a partition from the set of frozen partitions.
/// The partition name is converted with MergeTreeData::getMonthFromName and the result is
/// erased from frozen_partitions while freeze_lock is held. Erasing a key that is not in the
/// set has no effect.
void MergeTreeDataMerger::unfreezePartition(const std::string & partition)
{
std::lock_guard<std::mutex> guard{freeze_lock};
frozen_partitions.erase(MergeTreeData::getMonthFromName(partition));
}
/// Tell whether the month of the given part is present in frozen_partitions.
/// This function does not lock freeze_lock itself; the call sites in selectPartsToMerge and
/// mergeParts take the lock before calling it.
bool MergeTreeDataMerger::isPartitionFrozen(const MergeTreeData::DataPart & part) const
{
/// count() on the set yields 0 or 1, which converts to the bool result.
return frozen_partitions.count(part.month);
}
void MergeTreeDataMerger::abortIfRequested() void MergeTreeDataMerger::abortIfRequested()
{ {
if (cancelled) if (cancelled)

View File

@ -593,9 +593,6 @@ void ReshardingWorker::perform(const std::string & job_descriptor, const std::st
auto & storage = typeid_cast<StorageReplicatedMergeTree &>(*(generic_storage.get())); auto & storage = typeid_cast<StorageReplicatedMergeTree &>(*(generic_storage.get()));
current_job.storage = &storage; current_job.storage = &storage;
/// Protect the source partition from merging jobs.
freezeSourcePartition();
std::string dumped_coordinator_state; std::string dumped_coordinator_state;
auto handle_exception = [&](const std::string & cancel_msg, const std::string & error_msg) auto handle_exception = [&](const std::string & cancel_msg, const std::string & error_msg)
@ -623,7 +620,6 @@ void ReshardingWorker::perform(const std::string & job_descriptor, const std::st
dumped_coordinator_state = dumpCoordinatorState(current_job.coordinator_id); dumped_coordinator_state = dumpCoordinatorState(current_job.coordinator_id);
} }
deletion_lock.release(); deletion_lock.release();
unfreezeSourcePartition();
hardCleanup(); hardCleanup();
} }
} }
@ -715,11 +711,11 @@ void ReshardingWorker::perform(const std::string & job_descriptor, const std::st
{ {
dumped_coordinator_state = dumpCoordinatorState(current_job.coordinator_id); dumped_coordinator_state = dumpCoordinatorState(current_job.coordinator_id);
deletion_lock.release(); deletion_lock.release();
unfreezeSourcePartition();
hardCleanup(); hardCleanup();
} }
else if (ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED) else if (ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED)
unfreezeSourcePartition(); {
}
else if (ex.code() == ErrorCodes::RESHARDING_DISTRIBUTED_JOB_ON_HOLD) else if (ex.code() == ErrorCodes::RESHARDING_DISTRIBUTED_JOB_ON_HOLD)
{ {
/// The current distributed job is on hold and one or more required performers /// The current distributed job is on hold and one or more required performers
@ -757,9 +753,6 @@ void ReshardingWorker::perform(const std::string & job_descriptor, const std::st
} }
deletion_lock.release(); deletion_lock.release();
/// Although the source partition has been dropped, the following function
/// must be called in order to delete all the data that makes freezing fail-safe.
unfreezeSourcePartition();
hardCleanup(); hardCleanup();
LOG_DEBUG(log, "Resharding job successfully completed."); LOG_DEBUG(log, "Resharding job successfully completed.");
@ -2505,25 +2498,6 @@ zkutil::SingleBarrier ReshardingWorker::getCommitBarrier()
return commit_barrier; return commit_barrier;
} }
/// Freeze the partition of the current job.
/// First creates (if it does not exist yet) the ZooKeeper node
/// <replica_path>/frozen_partitions/<partition>, then registers the partition as frozen
/// in the storage's merger.
void ReshardingWorker::freezeSourcePartition()
{
auto zookeeper = context.getZooKeeper();
auto & storage = *(current_job.storage);
zookeeper->createIfNotExists(storage.replica_path + "/frozen_partitions/"
+ current_job.partition, "");
storage.merger.freezePartition(current_job.partition);
}
/// Unfreeze the partition of the current job.
/// First removes the ZooKeeper node <replica_path>/frozen_partitions/<partition>, then
/// removes the partition from the frozen set of the storage's merger.
/// NOTE(review): what zookeeper->remove does when the node is absent is not visible here — confirm.
void ReshardingWorker::unfreezeSourcePartition()
{
auto zookeeper = context.getZooKeeper();
auto & storage = *(current_job.storage);
zookeeper->remove(storage.replica_path + "/frozen_partitions/" + current_job.partition);
storage.merger.unfreezePartition(current_job.partition);
}
std::string ReshardingWorker::getCoordinatorPath(const std::string & coordinator_id) const std::string ReshardingWorker::getCoordinatorPath(const std::string & coordinator_id) const
{ {
return coordination_path + "/" + coordinator_id; return coordination_path + "/" + coordinator_id;

View File

@ -293,11 +293,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
queue.pullLogsToQueue(current_zookeeper, nullptr); queue.pullLogsToQueue(current_zookeeper, nullptr);
/// Замораживаем партиции, которые прежде были в этом состоянии.
auto frozen_partitions = current_zookeeper->getChildren(replica_path + "/frozen_partitions");
for (const auto & partition : frozen_partitions)
merger.freezePartition(partition);
/// В этом потоке реплика будет активирована. /// В этом потоке реплика будет активирована.
restarting_thread.reset(new ReplicatedMergeTreeRestartingThread(*this)); restarting_thread.reset(new ReplicatedMergeTreeRestartingThread(*this));
} }
@ -315,9 +310,6 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes()
/// Отслеживание отставания реплик. /// Отслеживание отставания реплик.
zookeeper->createIfNotExists(replica_path + "/min_unprocessed_insert_time", ""); zookeeper->createIfNotExists(replica_path + "/min_unprocessed_insert_time", "");
zookeeper->createIfNotExists(replica_path + "/max_processed_insert_time", ""); zookeeper->createIfNotExists(replica_path + "/max_processed_insert_time", "");
/// Партиции, для которых временно не проводятся слияния.
zookeeper->createIfNotExists(replica_path + "/frozen_partitions", "");
} }