Changed low level data race to high level data race [#CLICKHOUSE-2]

This commit is contained in:
Alexey Milovidov 2018-03-03 19:46:32 +03:00
parent 3b0d713120
commit 2e371822ad
2 changed files with 43 additions and 38 deletions

View File

@ -31,7 +31,7 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
LOG_DEBUG(log, "Loading queue from " << queue_path); LOG_DEBUG(log, "Loading queue from " << queue_path);
bool updated = false; bool updated = false;
bool min_unprocessed_insert_time_changed = false; std::optional<time_t> min_unprocessed_insert_time_changed;
{ {
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
@ -65,17 +65,12 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
LogEntryPtr entry = LogEntry::parse(res.value, res.stat); LogEntryPtr entry = LogEntry::parse(res.value, res.stat);
entry->znode_name = future.first; entry->znode_name = future.first;
time_t prev_min_unprocessed_insert_time = min_unprocessed_insert_time; insertUnlocked(entry, min_unprocessed_insert_time_changed, lock);
insertUnlocked(entry, lock);
updated = true; updated = true;
if (min_unprocessed_insert_time != prev_min_unprocessed_insert_time)
min_unprocessed_insert_time_changed = true;
} }
} }
updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, false); updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, {});
LOG_TRACE(log, "Loaded queue"); LOG_TRACE(log, "Loaded queue");
return updated; return updated;
@ -96,7 +91,7 @@ void ReplicatedMergeTreeQueue::initialize(
} }
void ReplicatedMergeTreeQueue::insertUnlocked(LogEntryPtr & entry, std::lock_guard<std::mutex> &) void ReplicatedMergeTreeQueue::insertUnlocked(LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed, std::lock_guard<std::mutex> &)
{ {
virtual_parts.add(entry->new_part_name); virtual_parts.add(entry->new_part_name);
queue.push_back(entry); queue.push_back(entry);
@ -106,30 +101,32 @@ void ReplicatedMergeTreeQueue::insertUnlocked(LogEntryPtr & entry, std::lock_gua
inserts_by_time.insert(entry); inserts_by_time.insert(entry);
if (entry->create_time && (!min_unprocessed_insert_time || entry->create_time < min_unprocessed_insert_time)) if (entry->create_time && (!min_unprocessed_insert_time || entry->create_time < min_unprocessed_insert_time))
{
min_unprocessed_insert_time = entry->create_time; min_unprocessed_insert_time = entry->create_time;
min_unprocessed_insert_time_changed = min_unprocessed_insert_time;
}
} }
} }
void ReplicatedMergeTreeQueue::insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry) void ReplicatedMergeTreeQueue::insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry)
{ {
time_t prev_min_unprocessed_insert_time; std::optional<time_t> min_unprocessed_insert_time_changed;
{ {
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
prev_min_unprocessed_insert_time = min_unprocessed_insert_time; insertUnlocked(entry, min_unprocessed_insert_time_changed, lock);
insertUnlocked(entry, lock);
} }
if (min_unprocessed_insert_time != prev_min_unprocessed_insert_time) if (min_unprocessed_insert_time_changed)
updateTimesInZooKeeper(zookeeper, true, false); updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, {});
} }
void ReplicatedMergeTreeQueue::updateTimesOnRemoval( void ReplicatedMergeTreeQueue::updateTimesOnRemoval(
const LogEntryPtr & entry, const LogEntryPtr & entry,
bool & min_unprocessed_insert_time_changed, std::optional<time_t> & min_unprocessed_insert_time_changed,
bool & max_processed_insert_time_changed, std::optional<time_t> & max_processed_insert_time_changed,
std::unique_lock<std::mutex> &) std::unique_lock<std::mutex> &)
{ {
if (entry->type != LogEntry::GET_PART) if (entry->type != LogEntry::GET_PART)
@ -140,39 +137,40 @@ void ReplicatedMergeTreeQueue::updateTimesOnRemoval(
if (inserts_by_time.empty()) if (inserts_by_time.empty())
{ {
min_unprocessed_insert_time = 0; min_unprocessed_insert_time = 0;
min_unprocessed_insert_time_changed = true; min_unprocessed_insert_time_changed = min_unprocessed_insert_time;
} }
else if ((*inserts_by_time.begin())->create_time > min_unprocessed_insert_time) else if ((*inserts_by_time.begin())->create_time > min_unprocessed_insert_time)
{ {
min_unprocessed_insert_time = (*inserts_by_time.begin())->create_time; min_unprocessed_insert_time = (*inserts_by_time.begin())->create_time;
min_unprocessed_insert_time_changed = true; min_unprocessed_insert_time_changed = min_unprocessed_insert_time;
} }
if (entry->create_time > max_processed_insert_time) if (entry->create_time > max_processed_insert_time)
{ {
max_processed_insert_time = entry->create_time; max_processed_insert_time = entry->create_time;
max_processed_insert_time_changed = true; max_processed_insert_time_changed = max_processed_insert_time;
} }
} }
void ReplicatedMergeTreeQueue::updateTimesInZooKeeper( void ReplicatedMergeTreeQueue::updateTimesInZooKeeper(
zkutil::ZooKeeperPtr zookeeper, zkutil::ZooKeeperPtr zookeeper,
bool min_unprocessed_insert_time_changed, std::optional<time_t> min_unprocessed_insert_time_changed,
bool max_processed_insert_time_changed) const std::optional<time_t> max_processed_insert_time_changed) const
{ {
/// Here there can be a race condition (with different remove at the same time). /// Here there can be a race condition (with different remove at the same time)
/// because we update times in ZooKeeper with unlocked mutex, while these times may change.
/// Consider it unimportant (for a short time, ZK will have a slightly different time value). /// Consider it unimportant (for a short time, ZK will have a slightly different time value).
/// We also read values of `min_unprocessed_insert_time`, `max_processed_insert_time` without synchronization.
zkutil::Ops ops; zkutil::Ops ops;
if (min_unprocessed_insert_time_changed) if (min_unprocessed_insert_time_changed)
ops.emplace_back(std::make_unique<zkutil::Op::SetData>( ops.emplace_back(std::make_unique<zkutil::Op::SetData>(
replica_path + "/min_unprocessed_insert_time", toString(min_unprocessed_insert_time), -1)); replica_path + "/min_unprocessed_insert_time", toString(*min_unprocessed_insert_time_changed), -1));
if (max_processed_insert_time_changed) if (max_processed_insert_time_changed)
ops.emplace_back(std::make_unique<zkutil::Op::SetData>( ops.emplace_back(std::make_unique<zkutil::Op::SetData>(
replica_path + "/max_processed_insert_time", toString(max_processed_insert_time), -1)); replica_path + "/max_processed_insert_time", toString(*max_processed_insert_time_changed), -1));
if (!ops.empty()) if (!ops.empty())
{ {
@ -194,8 +192,8 @@ void ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, LogEntryPt
LOG_ERROR(log, "Couldn't remove " << replica_path << "/queue/" << entry->znode_name << ": " LOG_ERROR(log, "Couldn't remove " << replica_path << "/queue/" << entry->znode_name << ": "
<< zkutil::ZooKeeper::error2string(code) << ". This shouldn't happen often."); << zkutil::ZooKeeper::error2string(code) << ". This shouldn't happen often.");
bool min_unprocessed_insert_time_changed = false; std::optional<time_t> min_unprocessed_insert_time_changed;
bool max_processed_insert_time_changed = false; std::optional<time_t> max_processed_insert_time_changed;
{ {
std::unique_lock<std::mutex> lock(mutex); std::unique_lock<std::mutex> lock(mutex);
@ -225,8 +223,8 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri
{ {
LogEntryPtr found; LogEntryPtr found;
bool min_unprocessed_insert_time_changed = false; std::optional<time_t> min_unprocessed_insert_time_changed;
bool max_processed_insert_time_changed = false; std::optional<time_t> max_processed_insert_time_changed;
{ {
std::unique_lock<std::mutex> lock(mutex); std::unique_lock<std::mutex> lock(mutex);
@ -330,7 +328,7 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
std::vector<LogEntryPtr> copied_entries; std::vector<LogEntryPtr> copied_entries;
copied_entries.reserve(end - begin); copied_entries.reserve(end - begin);
bool min_unprocessed_insert_time_changed = false; std::optional<time_t> min_unprocessed_insert_time_changed;
for (auto & future : futures) for (auto & future : futures)
{ {
@ -347,7 +345,7 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
if (entry.create_time && (!min_unprocessed_insert_time || entry.create_time < min_unprocessed_insert_time)) if (entry.create_time && (!min_unprocessed_insert_time || entry.create_time < min_unprocessed_insert_time))
{ {
min_unprocessed_insert_time = entry.create_time; min_unprocessed_insert_time = entry.create_time;
min_unprocessed_insert_time_changed = true; min_unprocessed_insert_time_changed = min_unprocessed_insert_time;
} }
} }
} }
@ -357,7 +355,7 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
if (min_unprocessed_insert_time_changed) if (min_unprocessed_insert_time_changed)
ops.emplace_back(std::make_unique<zkutil::Op::SetData>( ops.emplace_back(std::make_unique<zkutil::Op::SetData>(
replica_path + "/min_unprocessed_insert_time", toString(min_unprocessed_insert_time), -1)); replica_path + "/min_unprocessed_insert_time", toString(*min_unprocessed_insert_time_changed), -1));
try try
{ {
@ -385,7 +383,8 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
String path_created = dynamic_cast<zkutil::Op::Create &>(*ops[i]).getPathCreated(); String path_created = dynamic_cast<zkutil::Op::Create &>(*ops[i]).getPathCreated();
copied_entries[i]->znode_name = path_created.substr(path_created.find_last_of('/') + 1); copied_entries[i]->znode_name = path_created.substr(path_created.find_last_of('/') + 1);
insertUnlocked(copied_entries[i], lock); std::optional<time_t> unused = false;
insertUnlocked(copied_entries[i], unused, lock);
} }
last_queue_update = time(nullptr); last_queue_update = time(nullptr);
@ -460,8 +459,8 @@ void ReplicatedMergeTreeQueue::removeGetsAndMergesInRange(zkutil::ZooKeeperPtr z
{ {
Queue to_wait; Queue to_wait;
size_t removed_entries = 0; size_t removed_entries = 0;
bool min_unprocessed_insert_time_changed = false; std::optional<time_t> min_unprocessed_insert_time_changed;
bool max_processed_insert_time_changed = false; std::optional<time_t> max_processed_insert_time_changed;
/// Remove operations with parts, contained in the range to be deleted, from the queue. /// Remove operations with parts, contained in the range to be deleted, from the queue.
std::unique_lock<std::mutex> lock(mutex); std::unique_lock<std::mutex> lock(mutex);

View File

@ -1,5 +1,7 @@
#pragma once #pragma once
#include <optional>
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h> #include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <Storages/MergeTree/ActiveDataPartSet.h> #include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/MergeTreeData.h>
@ -87,7 +89,7 @@ private:
/// Load (initialize) a queue from ZooKeeper (/replicas/me/queue/). /// Load (initialize) a queue from ZooKeeper (/replicas/me/queue/).
bool load(zkutil::ZooKeeperPtr zookeeper); bool load(zkutil::ZooKeeperPtr zookeeper);
void insertUnlocked(LogEntryPtr & entry, std::lock_guard<std::mutex> &); void insertUnlocked(LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed, std::lock_guard<std::mutex> &);
void remove(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry); void remove(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry);
@ -104,11 +106,15 @@ private:
/// After removing the queue element, update the insertion times in the RAM. Running under queue_mutex. /// After removing the queue element, update the insertion times in the RAM. Running under queue_mutex.
/// Returns information about what times have changed - this information can be passed to updateTimesInZooKeeper. /// Returns information about what times have changed - this information can be passed to updateTimesInZooKeeper.
void updateTimesOnRemoval(const LogEntryPtr & entry, bool & min_unprocessed_insert_time_changed, bool & max_processed_insert_time_changed, void updateTimesOnRemoval(const LogEntryPtr & entry,
std::optional<time_t> & min_unprocessed_insert_time_changed,
std::optional<time_t> & max_processed_insert_time_changed,
std::unique_lock<std::mutex> &); std::unique_lock<std::mutex> &);
/// Update the insertion times in ZooKeeper. /// Update the insertion times in ZooKeeper.
void updateTimesInZooKeeper(zkutil::ZooKeeperPtr zookeeper, bool min_unprocessed_insert_time_changed, bool max_processed_insert_time_changed) const; void updateTimesInZooKeeper(zkutil::ZooKeeperPtr zookeeper,
std::optional<time_t> min_unprocessed_insert_time_changed,
std::optional<time_t> max_processed_insert_time_changed) const;
/// Returns list of currently executing entries blocking execution of specified CLEAR_COLUMN command /// Returns list of currently executing entries blocking execution of specified CLEAR_COLUMN command
Queue getConflictsForClearColumnCommand(const LogEntry & entry, String * out_conflicts_description, std::lock_guard<std::mutex> &) const; Queue getConflictsForClearColumnCommand(const LogEntry & entry, String * out_conflicts_description, std::lock_guard<std::mutex> &) const;