Merge pull request #4839 from yandex/fix-race-condition-in-fetch-data-part

Fixed TSan report in fetchPart
This commit is contained in:
alexey-milovidov 2019-03-29 11:27:21 +03:00 committed by GitHub
commit 3e3ab8acb7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 25 deletions

View File

@ -611,7 +611,7 @@ String MergeTreeData::MergingParams::getModeName() const
Int64 MergeTreeData::getMaxBlockNumber()
{
std::lock_guard lock_all(data_parts_mutex);
auto lock = lockParts();
Int64 max_block_num = 0;
for (const DataPartPtr & part : data_parts_by_info)
@ -640,7 +640,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
DataPartsVector broken_parts_to_detach;
size_t suspicious_broken_parts = 0;
std::lock_guard lock(data_parts_mutex);
auto lock = lockParts();
data_parts_indexes.clear();
for (const String & file_name : part_file_names)
@ -866,7 +866,7 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
std::vector<DataPartIteratorByStateAndInfo> parts_to_delete;
{
std::lock_guard lock_parts(data_parts_mutex);
auto parts_lock = lockParts();
auto outdated_parts_range = getDataPartsStateRange(DataPartState::Outdated);
for (auto it = outdated_parts_range.begin(); it != outdated_parts_range.end(); ++it)
@ -900,7 +900,7 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
void MergeTreeData::rollbackDeletingParts(const MergeTreeData::DataPartsVector & parts)
{
std::lock_guard lock(data_parts_mutex);
auto lock = lockParts();
for (auto & part : parts)
{
/// We should modify it under data_parts_mutex
@ -912,7 +912,7 @@ void MergeTreeData::rollbackDeletingParts(const MergeTreeData::DataPartsVector &
void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & parts)
{
{
std::lock_guard lock(data_parts_mutex);
auto lock = lockParts();
/// TODO: use data_parts iterators instead of pointers
for (auto & part : parts)
@ -980,7 +980,7 @@ void MergeTreeData::dropAllData()
{
LOG_TRACE(log, "dropAllData: waiting for locks.");
std::lock_guard lock(data_parts_mutex);
auto lock = lockParts();
LOG_TRACE(log, "dropAllData: removing data from memory.");
@ -1717,7 +1717,7 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
DataPartsVector covered_parts;
{
std::unique_lock lock(data_parts_mutex);
auto lock = lockParts();
renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts);
}
return covered_parts;
@ -1814,7 +1814,7 @@ restore_covered)
{
LOG_INFO(log, "Renaming " << part_to_detach->relative_path << " to " << prefix << part_to_detach->name << " and forgiving it.");
auto data_parts_lock = lockParts();
auto lock = lockParts();
auto it_part = data_parts_by_info.find(part_to_detach->info);
if (it_part == data_parts_by_info.end())
@ -1931,7 +1931,7 @@ void MergeTreeData::tryRemovePartImmediately(DataPartPtr && part)
{
DataPartPtr part_to_delete;
{
std::lock_guard lock_parts(data_parts_mutex);
auto lock = lockParts();
LOG_TRACE(log, "Trying to immediately remove part " << part->getNameWithState());
@ -1967,7 +1967,7 @@ size_t MergeTreeData::getTotalActiveSizeInBytes() const
{
size_t res = 0;
{
std::lock_guard lock(data_parts_mutex);
auto lock = lockParts();
for (auto & part : getDataPartsStateRange(DataPartState::Committed))
res += part->bytes_on_disk;
@ -1979,7 +1979,7 @@ size_t MergeTreeData::getTotalActiveSizeInBytes() const
size_t MergeTreeData::getMaxPartsCountForPartition() const
{
std::lock_guard lock(data_parts_mutex);
auto lock = lockParts();
size_t res = 0;
size_t cur_count = 0;
@ -2006,7 +2006,7 @@ size_t MergeTreeData::getMaxPartsCountForPartition() const
std::optional<Int64> MergeTreeData::getMinPartDataVersion() const
{
std::lock_guard lock(data_parts_mutex);
auto lock = lockParts();
std::optional<Int64> result;
for (const DataPartPtr & part : getDataPartsStateRange(DataPartState::Committed))
@ -2088,8 +2088,8 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const MergeTreePartInfo & part_info)
{
DataPartsLock data_parts_lock(data_parts_mutex);
return getActiveContainingPart(part_info, DataPartState::Committed, data_parts_lock);
auto lock = lockParts();
return getActiveContainingPart(part_info, DataPartState::Committed, lock);
}
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name)
@ -2103,7 +2103,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartition(Merg
{
DataPartStateAndPartitionID state_with_partition{state, partition_id};
std::lock_guard lock(data_parts_mutex);
auto lock = lockParts();
return DataPartsVector(
data_parts_by_state_and_info.lower_bound(state_with_partition),
data_parts_by_state_and_info.upper_bound(state_with_partition));
@ -2112,7 +2112,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartition(Merg
MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const MergeTreePartInfo & part_info, const MergeTreeData::DataPartStates & valid_states)
{
std::lock_guard lock(data_parts_mutex);
auto lock = lockParts();
auto it = data_parts_by_info.find(part_info);
if (it == data_parts_by_info.end())
@ -2331,7 +2331,7 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context
String partition_id = partition.getID(*this);
{
DataPartsLock data_parts_lock(data_parts_mutex);
auto data_parts_lock = lockParts();
DataPartPtr existing_part_in_partition = getAnyPartInPartition(partition_id, data_parts_lock);
if (existing_part_in_partition && existing_part_in_partition->partition.value != partition.value)
{
@ -2352,7 +2352,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(const DataPartS
DataPartsVector res;
DataPartsVector buf;
{
std::lock_guard lock(data_parts_mutex);
auto lock = lockParts();
for (auto state : affordable_states)
{
@ -2378,7 +2378,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getAllDataPartsVector(MergeTreeDat
{
DataPartsVector res;
{
std::lock_guard lock(data_parts_mutex);
auto lock = lockParts();
res.assign(data_parts_by_info.begin(), data_parts_by_info.end());
if (out_states != nullptr)
@ -2396,7 +2396,7 @@ MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affo
{
DataParts res;
{
std::lock_guard lock(data_parts_mutex);
auto lock = lockParts();
for (auto state : affordable_states)
{
auto range = getDataPartsStateRange(state);

View File

@ -538,8 +538,7 @@ public:
size_t getColumnCompressedSize(const std::string & name) const
{
std::lock_guard lock{data_parts_mutex};
auto lock = lockParts();
const auto it = column_sizes.find(name);
return it == std::end(column_sizes) ? 0 : it->second.data_compressed;
}
@ -547,14 +546,14 @@ public:
using ColumnSizeByName = std::unordered_map<std::string, DataPart::ColumnSize>;
ColumnSizeByName getColumnSizes() const
{
std::lock_guard lock{data_parts_mutex};
auto lock = lockParts();
return column_sizes;
}
/// Calculates column sizes in compressed form for the current state of data_parts.
void recalculateColumnSizes()
{
std::lock_guard lock{data_parts_mutex};
auto lock = lockParts();
calculateColumnSizesImpl();
}

View File

@ -2655,7 +2655,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
if (auto part = data.getPartIfExists(part_info, {MergeTreeDataPart::State::Outdated, MergeTreeDataPart::State::Deleting}))
{
LOG_DEBUG(log, "Part " << part->getNameWithState() << " should be deleted after previous attempt before fetch");
LOG_DEBUG(log, "Part " << part->name << " should be deleted after previous attempt before fetch");
/// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt.
cleanup_thread.wakeup();
return false;