mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-10-23 08:50:49 +00:00
Add multi index for data_parts storage. And fixed bugs. [#CLICKHOUSE-3452]
Fixed handling of obsolete parts. Fixed conflict resolution between simultaneous PreCommitted covering parts. Fixed memory leak caused by ordinary MergeTree parts stucked in Deleting state. Added hidden _state column into system.parts.
This commit is contained in:
parent
40b5fb292e
commit
9229961721
2
contrib/poco
vendored
2
contrib/poco
vendored
@ -1 +1 @@
|
||||
Subproject commit 1366df1c7e068bb2efd846bc8dc8e286b090904e
|
||||
Subproject commit bcf9ebad48b2162d25f5fc432b176d74a09f498d
|
@ -386,6 +386,7 @@ namespace ErrorCodes
|
||||
extern const int HTTP_LENGTH_REQUIRED = 381;
|
||||
extern const int CANNOT_LOAD_CATBOOST_MODEL = 382;
|
||||
extern const int CANNOT_APPLY_CATBOOST_MODEL = 383;
|
||||
extern const int PART_IS_TEMPORARILY_LOCKED = 384;
|
||||
|
||||
extern const int KEEPER_EXCEPTION = 999;
|
||||
extern const int POCO_EXCEPTION = 1000;
|
||||
|
@ -74,6 +74,7 @@ namespace ErrorCodes
|
||||
extern const int CORRUPTED_DATA;
|
||||
extern const int INVALID_PARTITION_VALUE;
|
||||
extern const int METADATA_MISMATCH;
|
||||
extern const int PART_IS_TEMPORARILY_LOCKED;
|
||||
}
|
||||
|
||||
|
||||
@ -106,7 +107,9 @@ MergeTreeData::MergeTreeData(
|
||||
database_name(database_), table_name(table_),
|
||||
full_path(full_path_), columns(columns_),
|
||||
broken_part_callback(broken_part_callback_),
|
||||
log_name(log_name_), log(&Logger::get(log_name + " (Data)"))
|
||||
log_name(log_name_), log(&Logger::get(log_name + " (Data)")),
|
||||
data_parts_by_name(data_parts_indexes.get<TagByName>()),
|
||||
data_parts_by_state_and_name(data_parts_indexes.get<TagByStateAndName>())
|
||||
{
|
||||
merging_params.check(*columns);
|
||||
|
||||
@ -381,7 +384,7 @@ Int64 MergeTreeData::getMaxDataPartIndex()
|
||||
std::lock_guard<std::mutex> lock_all(data_parts_mutex);
|
||||
|
||||
Int64 max_block_id = 0;
|
||||
for (const auto & part : data_parts)
|
||||
for (const DataPartPtr & part : data_parts_by_name)
|
||||
max_block_id = std::max(max_block_id, part->info.max_block);
|
||||
|
||||
return max_block_id;
|
||||
@ -392,9 +395,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
{
|
||||
LOG_DEBUG(log, "Loading data parts");
|
||||
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
data_parts.clear();
|
||||
|
||||
Strings part_file_names;
|
||||
Poco::DirectoryIterator end;
|
||||
for (Poco::DirectoryIterator it(full_path); it != end; ++it)
|
||||
@ -410,6 +410,9 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
DataPartsVector broken_parts_to_detach;
|
||||
size_t suspicious_broken_parts = 0;
|
||||
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
data_parts_indexes.clear();
|
||||
|
||||
for (const String & file_name : part_file_names)
|
||||
{
|
||||
MergeTreePartInfo part_info;
|
||||
@ -496,7 +499,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
/// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
|
||||
part->state = DataPartState::Committed;
|
||||
|
||||
data_parts.insert(part);
|
||||
if (!data_parts_indexes.insert(part).second)
|
||||
throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
|
||||
}
|
||||
|
||||
if (suspicious_broken_parts > settings.max_suspicious_broken_parts && !skip_sanity_checks)
|
||||
@ -512,13 +516,21 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
/// were merged), but that for some reason are still not deleted from the filesystem.
|
||||
/// Deletion of files will be performed later in the clearOldParts() method.
|
||||
|
||||
if (data_parts.size() >= 2)
|
||||
if (data_parts_indexes.size() >= 2)
|
||||
{
|
||||
auto committed_parts = getDataPartsRange({DataPartState::Committed});
|
||||
auto prev_jt = committed_parts.begin();
|
||||
/// Now all parts are committed, so data_parts_by_state_and_name == committed_parts_range
|
||||
auto prev_jt = data_parts_by_state_and_name.begin();
|
||||
auto curr_jt = std::next(prev_jt);
|
||||
|
||||
while (curr_jt != committed_parts.end())
|
||||
auto deactivate_part = [&] (DataPartIteratorByStateAndName it)
|
||||
{
|
||||
(*it)->remove_time = (*it)->modification_time;
|
||||
modifyPartState(it, DataPartState::Outdated);
|
||||
};
|
||||
|
||||
(*prev_jt)->assertState({DataPartState::Committed});
|
||||
|
||||
while (curr_jt != data_parts_by_state_and_name.end() && (*curr_jt)->state == DataPartState::Committed)
|
||||
{
|
||||
/// Don't consider data parts belonging to different partitions.
|
||||
if ((*curr_jt)->info.partition_id != (*prev_jt)->info.partition_id)
|
||||
@ -530,16 +542,15 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
|
||||
if ((*curr_jt)->contains(**prev_jt))
|
||||
{
|
||||
(*prev_jt)->remove_time = (*prev_jt)->modification_time;
|
||||
(*prev_jt)->state = DataPartState::Outdated; /// prev_jt becomes invalid here
|
||||
deactivate_part(prev_jt);
|
||||
prev_jt = curr_jt;
|
||||
++curr_jt;
|
||||
}
|
||||
else if ((*prev_jt)->contains(**curr_jt))
|
||||
{
|
||||
(*curr_jt)->remove_time = (*curr_jt)->modification_time;
|
||||
(*curr_jt)->state = DataPartState::Outdated; /// curr_jt becomes invalid here
|
||||
++curr_jt;
|
||||
auto next = std::next(curr_jt);
|
||||
deactivate_part(curr_jt);
|
||||
curr_jt = next;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -551,7 +562,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
|
||||
calculateColumnSizesImpl();
|
||||
|
||||
LOG_DEBUG(log, "Loaded data parts (" << data_parts.size() << " items)");
|
||||
LOG_DEBUG(log, "Loaded data parts (" << data_parts_indexes.size() << " items)");
|
||||
}
|
||||
|
||||
|
||||
@ -619,21 +630,30 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
|
||||
return res;
|
||||
|
||||
time_t now = time(nullptr);
|
||||
std::vector<DataPartIteratorByStateAndName> parts_to_delete;
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock_parts(data_parts_mutex);
|
||||
|
||||
for (auto it = data_parts.begin(); it != data_parts.end(); ++it)
|
||||
auto outdated_parts_range = getDataPartsStateRange(DataPartState::Outdated);
|
||||
for (auto it = outdated_parts_range.begin(); it != outdated_parts_range.end(); ++it)
|
||||
{
|
||||
if ((*it)->state == DataPartState::Outdated &&
|
||||
it->unique() && /// Grab only parts that is not using by anyone (SELECTs for example)
|
||||
(*it)->remove_time < now &&
|
||||
now - (*it)->remove_time > settings.old_parts_lifetime.totalSeconds())
|
||||
const DataPartPtr & part = *it;
|
||||
|
||||
if (part.unique() && /// Grab only parts that is not using by anyone (SELECTs for example)
|
||||
part->remove_time < now &&
|
||||
now - part->remove_time > settings.old_parts_lifetime.totalSeconds())
|
||||
{
|
||||
(*it)->state = DataPartState::Deleting;
|
||||
res.push_back(*it);
|
||||
parts_to_delete.emplace_back(it);
|
||||
}
|
||||
}
|
||||
|
||||
res.reserve(parts_to_delete.size());
|
||||
for (const auto & it_to_delete : parts_to_delete)
|
||||
{
|
||||
res.emplace_back(*it_to_delete);
|
||||
modifyPartState(it_to_delete, DataPartState::Deleting);
|
||||
}
|
||||
}
|
||||
|
||||
if (!res.empty())
|
||||
@ -650,7 +670,7 @@ void MergeTreeData::rollbackDeletingParts(const MergeTreeData::DataPartsVector &
|
||||
{
|
||||
/// We should modify it under data_parts_mutex
|
||||
part->assertState({DataPartState::Deleting});
|
||||
part->state = DataPartState::Outdated;
|
||||
modifyPartState(part, DataPartState::Outdated);
|
||||
}
|
||||
}
|
||||
|
||||
@ -661,26 +681,27 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa
|
||||
/// TODO: use data_parts iterators instead of pointers
|
||||
for (auto & part : parts)
|
||||
{
|
||||
if (part->state != DataPartState::Deleting)
|
||||
throw Exception("An attempt to delete part " + part->getNameWithState() + " with unexpected state", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
auto it = data_parts.find(part);
|
||||
if (it == data_parts.end())
|
||||
auto it = data_parts_by_name.find(part->info);
|
||||
if (it == data_parts_by_name.end())
|
||||
throw Exception("Deleting data part " + part->name + " is not exist", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
data_parts.erase(it);
|
||||
(*it)->assertState({DataPartState::Deleting});
|
||||
|
||||
data_parts_indexes.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
void MergeTreeData::clearOldParts()
|
||||
void MergeTreeData::clearOldPartsFromFilesystem()
|
||||
{
|
||||
auto parts_to_remove = grabOldParts();
|
||||
|
||||
for (const DataPartPtr & part : parts_to_remove)
|
||||
{
|
||||
LOG_DEBUG(log, "Removing part " << part->name);
|
||||
LOG_DEBUG(log, "Removing part from filesystem " << part->name);
|
||||
part->remove();
|
||||
}
|
||||
|
||||
removePartsFinally(parts_to_remove);
|
||||
}
|
||||
|
||||
void MergeTreeData::setPath(const String & new_full_path, bool move_data)
|
||||
@ -710,7 +731,7 @@ void MergeTreeData::dropAllData()
|
||||
|
||||
LOG_TRACE(log, "dropAllData: removing data from memory.");
|
||||
|
||||
data_parts.clear();
|
||||
data_parts_indexes.clear();
|
||||
column_sizes.clear();
|
||||
|
||||
context.dropCaches();
|
||||
@ -1319,9 +1340,13 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
|
||||
|
||||
part->assertState({DataPartState::Temporary});
|
||||
|
||||
DataPartsVector replaced;
|
||||
MergeTreePartInfo part_info = part->info;
|
||||
String part_name;
|
||||
|
||||
DataPartsVector replaced_parts;
|
||||
std::vector<DataPartIteratorByStateAndName> replaced_iterators;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
std::unique_lock<std::mutex> lock(data_parts_mutex);
|
||||
|
||||
if (DataPartPtr existing_part_in_partition = getAnyPartInPartition(part->info.partition_id, lock))
|
||||
{
|
||||
@ -1336,141 +1361,163 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
|
||||
* Otherwise there is race condition - merge of blocks could happen in interval that doesn't yet contain new part.
|
||||
*/
|
||||
if (increment)
|
||||
part->info.min_block = part->info.max_block = increment->get();
|
||||
part_info.min_block = part_info.max_block = increment->get();
|
||||
|
||||
String new_name;
|
||||
if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
|
||||
new_name = part->info.getPartNameV0(part->getMinDate(), part->getMaxDate());
|
||||
part_name = part_info.getPartNameV0(part->getMinDate(), part->getMaxDate());
|
||||
else
|
||||
new_name = part->info.getPartName();
|
||||
part_name = part_info.getPartName();
|
||||
|
||||
LOG_TRACE(log, "Renaming temporary part " << part->relative_path << " to " << new_name << ".");
|
||||
LOG_TRACE(log, "Renaming temporary part " << part->relative_path << " to " << part_name << ".");
|
||||
|
||||
auto it_duplicate = data_parts.find(part);
|
||||
if (it_duplicate != data_parts.end())
|
||||
auto it_duplicate = data_parts_by_name.find(part_info);
|
||||
if (it_duplicate != data_parts_by_name.end())
|
||||
{
|
||||
String message = "Part " + (*it_duplicate)->getNameWithState() + " already exists";
|
||||
|
||||
if ((*it_duplicate)->checkState({DataPartState::Outdated, DataPartState::Deleting}))
|
||||
message += ", but it will be deleted soon";
|
||||
{
|
||||
throw Exception(message + ", but it will be deleted soon", ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
|
||||
}
|
||||
|
||||
throw Exception(message, ErrorCodes::DUPLICATE_DATA_PART);
|
||||
}
|
||||
|
||||
/// Rename the part only in memory. Will rename it on disk only if all check is passed.
|
||||
/// It allows us maintain invariant: if non-temporary parts in filesystem then they are in data_parts
|
||||
part->name = new_name;
|
||||
/// Check that part is not covered and doesn't cover other in-progress parts, it makes sense only for Replicated* engines
|
||||
if (out_transaction)
|
||||
{
|
||||
auto check_coverage = [&part_info, &part_name] (const DataPartPtr & part)
|
||||
{
|
||||
if (part_info.contains(part->info))
|
||||
{
|
||||
throw Exception("Cannot add part " + part_name + " covering pre-committed part " + part->name, ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (part->info.contains(part_info))
|
||||
throw Exception("Cannot add part " + part_name + " covered by pre-committed part " + part->name + ". It is a bug", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
};
|
||||
|
||||
auto it_middle = data_parts_by_state_and_name.lower_bound(DataPartStateAndInfo(DataPartState::PreCommitted, part_info));
|
||||
|
||||
auto precommitted_parts_range = getDataPartsStateRange(DataPartState::PreCommitted);
|
||||
|
||||
for (auto it = it_middle; it != precommitted_parts_range.begin();)
|
||||
{
|
||||
--it;
|
||||
check_coverage(*it);
|
||||
}
|
||||
|
||||
for (auto it = it_middle; it != precommitted_parts_range.end();)
|
||||
{
|
||||
check_coverage(*it);
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
||||
/// Is the part covered by some other part?
|
||||
bool obsolete = false;
|
||||
DataPartPtr covering_part;
|
||||
|
||||
auto check_replacing_part_state = [&] (const DataPartPtr & cur_part)
|
||||
{
|
||||
cur_part->assertState({DataPartState::PreCommitted, DataPartState::Committed});
|
||||
if (cur_part->state == DataPartState::PreCommitted)
|
||||
throw Exception("Could not add part " + new_name + " while replacing part " + cur_part->name + " is in pre-committed state", ErrorCodes::LOGICAL_ERROR);
|
||||
};
|
||||
auto it_middle = data_parts_by_state_and_name.lower_bound(DataPartStateAndInfo(DataPartState::Committed, part_info));
|
||||
|
||||
/// Don't consider parts going to be deleted
|
||||
auto active_parts = getDataPartsRange({DataPartState::Committed, DataPartState::PreCommitted});
|
||||
/// Parts contained in the part are consecutive in data_parts, intersecting the insertion place for the part itself.
|
||||
auto it_middle = active_parts.convert(data_parts.lower_bound(part));
|
||||
auto committed_parts_range = getDataPartsStateRange(DataPartState::Committed);
|
||||
|
||||
/// Go to the left.
|
||||
for (auto it = it_middle; it != active_parts.begin();)
|
||||
for (auto it = it_middle; it != committed_parts_range.begin();)
|
||||
{
|
||||
--it;
|
||||
|
||||
if (!part->contains(**it))
|
||||
if (!part_info.contains((*it)->info))
|
||||
{
|
||||
if ((*it)->contains(*part))
|
||||
obsolete = true;
|
||||
++it;
|
||||
if ((*it)->info.contains(part_info))
|
||||
covering_part = *it;
|
||||
break;
|
||||
}
|
||||
|
||||
check_replacing_part_state(*it);
|
||||
replaced.push_back(*it);
|
||||
// replaced.push_back(*it);
|
||||
// (*it)->remove_time = time(nullptr);
|
||||
// (*it)->state = replaced_parts_state;
|
||||
// removePartContributionToColumnSizes(*it);
|
||||
// data_parts.erase(it++); /// Yes, ++, not --.
|
||||
replaced_iterators.push_back(it);
|
||||
}
|
||||
|
||||
/// Parts must be in ascending order.
|
||||
std::reverse(replaced.begin(), replaced.end());
|
||||
std::reverse(replaced_iterators.begin(), replaced_iterators.end());
|
||||
|
||||
/// Go to the right.
|
||||
for (auto it = it_middle; it != active_parts.end();)
|
||||
for (auto it = it_middle; it != committed_parts_range.end();)
|
||||
{
|
||||
if ((*it)->name == part->name)
|
||||
throw Exception("Unexpected duplicate part " + part->getNameWithState() + ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
|
||||
if ((*it)->name == part_name)
|
||||
throw Exception("Unexpected duplicate part " + (*it)->getNameWithState() + ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (!part->contains(**it))
|
||||
if (!part_info.contains((*it)->info))
|
||||
{
|
||||
if ((*it)->contains(*part))
|
||||
obsolete = true;
|
||||
if ((*it)->info.contains(part_info))
|
||||
covering_part = *it;
|
||||
break;
|
||||
}
|
||||
|
||||
check_replacing_part_state(*it);
|
||||
replaced.push_back(*it);
|
||||
replaced_iterators.push_back(it);
|
||||
++it;
|
||||
// replaced.push_back(*it);
|
||||
// (*it)->remove_time = time(nullptr);
|
||||
// (*it)->state = replaced_parts_state;
|
||||
// removePartContributionToColumnSizes(*it);
|
||||
// data_parts.erase(it++);
|
||||
}
|
||||
|
||||
if (obsolete)
|
||||
if (covering_part)
|
||||
{
|
||||
LOG_WARNING(log, "Obsolete part " << part->name << " added");
|
||||
LOG_WARNING(log, "Tried to add obsolete part " << part_name << " covered by " << covering_part->getNameWithState());
|
||||
|
||||
/// It is a temporary part, we want to delete it from filesystem immediately
|
||||
/// Other fields remain the same
|
||||
part->remove_time = time(nullptr);
|
||||
/// I case of fail, we want to delete part from filesystem immediately (to avoid any conflicts)
|
||||
part->is_temp = true;
|
||||
|
||||
/// Nothing to commit or rollback
|
||||
if (out_transaction)
|
||||
{
|
||||
out_transaction->data = this;
|
||||
out_transaction->parts_to_add_on_rollback = {};
|
||||
out_transaction->parts_to_remove_on_rollback = {};
|
||||
}
|
||||
|
||||
/// We replaced nothing
|
||||
return {};
|
||||
}
|
||||
|
||||
/// All checks are passed. Now we can rename the part on disk.
|
||||
/// So, we maintain invariant: if a non-temporary part in filesystem then it is in data_parts
|
||||
///
|
||||
/// Ordinary MergeTree engines (they don't use out_transaction) commit parts immediately,
|
||||
/// whereas ReplicatedMergeTree uses intermediate PreCommitted state
|
||||
part->name = part_name;
|
||||
part->info = part_info;
|
||||
part->is_temp = false;
|
||||
part->state = (out_transaction) ? DataPartState::PreCommitted : DataPartState::Committed;
|
||||
part->renameTo(part_name);
|
||||
|
||||
data_parts_indexes.insert(part);
|
||||
|
||||
replaced_parts.reserve(replaced_iterators.size());
|
||||
for (auto it_replacing_part : replaced_iterators)
|
||||
replaced_parts.emplace_back(*it_replacing_part);
|
||||
|
||||
if (!out_transaction)
|
||||
{
|
||||
addPartContributionToColumnSizes(part);
|
||||
|
||||
auto current_time = time(nullptr);
|
||||
for (auto it_replacing_part : replaced_iterators)
|
||||
{
|
||||
(*it_replacing_part)->remove_time = current_time;
|
||||
modifyPartState(it_replacing_part, DataPartState::Outdated);
|
||||
removePartContributionToColumnSizes(*it_replacing_part);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Now we can rename part on filesystem
|
||||
part->is_temp = false;
|
||||
part->renameTo(new_name);
|
||||
|
||||
if (!out_transaction)
|
||||
{
|
||||
/// Ordinary MergeTree engines (they don't use out_transaction) commit parts immediately
|
||||
part->state = DataPartState::Committed;
|
||||
addPartContributionToColumnSizes(part);
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Whereas ReplicatedMergeTree uses intermediate PreCommitted state
|
||||
part->state = DataPartState::PreCommitted;
|
||||
}
|
||||
|
||||
data_parts.insert(part);
|
||||
|
||||
auto current_time = time(nullptr);
|
||||
for (auto & replacing_part : replaced)
|
||||
{
|
||||
if (!out_transaction)
|
||||
{
|
||||
replacing_part->remove_time = current_time;
|
||||
replacing_part->state = DataPartState::Outdated;
|
||||
removePartContributionToColumnSizes(replacing_part);
|
||||
}
|
||||
}
|
||||
out_transaction->data = this;
|
||||
out_transaction->parts_to_add_on_rollback = replaced_parts;
|
||||
out_transaction->parts_to_remove_on_rollback = {part};
|
||||
}
|
||||
}
|
||||
|
||||
if (out_transaction)
|
||||
{
|
||||
out_transaction->data = this;
|
||||
out_transaction->parts_to_add_on_rollback = replaced;
|
||||
out_transaction->parts_to_remove_on_rollback = {part};
|
||||
}
|
||||
|
||||
return replaced;
|
||||
return replaced_parts;
|
||||
}
|
||||
|
||||
void MergeTreeData::removePartsFromWorkingSet(const DataPartsVector & remove, bool clear_without_timeout)
|
||||
@ -1479,7 +1526,7 @@ void MergeTreeData::removePartsFromWorkingSet(const DataPartsVector & remove, bo
|
||||
|
||||
for (auto & part : remove)
|
||||
{
|
||||
if (!data_parts.count(part))
|
||||
if (!data_parts_by_name.count(part->info))
|
||||
throw Exception("Part " + part->getNameWithState() + " not found in data_parts", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
part->assertState({DataPartState::PreCommitted, DataPartState::Committed, DataPartState::Outdated});
|
||||
@ -1490,7 +1537,8 @@ void MergeTreeData::removePartsFromWorkingSet(const DataPartsVector & remove, bo
|
||||
{
|
||||
if (part->state == DataPartState::Committed)
|
||||
removePartContributionToColumnSizes(part);
|
||||
part->state = DataPartState::Outdated;
|
||||
|
||||
modifyPartState(part, DataPartState::Outdated);
|
||||
part->remove_time = remove_time;
|
||||
}
|
||||
}
|
||||
@ -1502,65 +1550,93 @@ void MergeTreeData::renameAndDetachPart(const DataPartPtr & part_to_detach, cons
|
||||
LOG_INFO(log, "Renaming " << part_to_detach->relative_path << " to " << prefix << part_to_detach->name << " and detaching it.");
|
||||
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
//std::lock_guard<std::mutex> lock_all(all_data_parts_mutex);
|
||||
|
||||
auto it_part = data_parts.find(part_to_detach);
|
||||
if (it_part == data_parts.end())
|
||||
auto it_part = data_parts_by_name.find(part_to_detach->info);
|
||||
if (it_part == data_parts_by_name.end())
|
||||
throw Exception("No such data part " + part_to_detach->getNameWithState(), ErrorCodes::NO_SUCH_DATA_PART);
|
||||
|
||||
/// What if part_to_detach is reference to *it_part? Make a new owner just in case.
|
||||
auto part = *it_part;
|
||||
DataPartPtr part = *it_part;
|
||||
|
||||
removePartContributionToColumnSizes(part);
|
||||
part->state = DataPartState::Deleting;
|
||||
if (part->state == DataPartState::Committed)
|
||||
removePartContributionToColumnSizes(part);
|
||||
modifyPartState(it_part, DataPartState::Deleting);
|
||||
if (move_to_detached || !prefix.empty())
|
||||
part->renameAddPrefix(move_to_detached, prefix);
|
||||
data_parts_indexes.erase(it_part);
|
||||
|
||||
if (restore_covered && part->info.level == 0)
|
||||
{
|
||||
LOG_WARNING(log, "Will not recover parts covered by zero-level part " << part->name);
|
||||
return;
|
||||
}
|
||||
|
||||
if (restore_covered)
|
||||
{
|
||||
auto suitable_parts = getDataPartsRange({DataPartState::PreCommitted, DataPartState::Committed, DataPartState::Outdated});
|
||||
auto it = suitable_parts.convert(data_parts.lower_bound(part));
|
||||
|
||||
Strings restored;
|
||||
bool error = false;
|
||||
String error_parts;
|
||||
|
||||
Int64 pos = part->info.min_block;
|
||||
|
||||
if (it != suitable_parts.begin())
|
||||
auto is_appropriate_state = [] (DataPartState state)
|
||||
{
|
||||
--it;
|
||||
if (part->contains(**it))
|
||||
return state == DataPartState::Committed || state == DataPartState::Outdated;
|
||||
};
|
||||
|
||||
auto update_error = [&] (DataPartIteratorByAndName it)
|
||||
{
|
||||
error = true;
|
||||
error_parts += (*it)->getNameWithState() + " ";
|
||||
};
|
||||
|
||||
auto it_middle = data_parts_by_name.lower_bound(part->info);
|
||||
|
||||
/// Restore the leftmost part covered by the part
|
||||
if (it_middle != data_parts_by_name.begin())
|
||||
{
|
||||
auto it = std::prev(it_middle);
|
||||
|
||||
if (part->contains(**it) && is_appropriate_state((*it)->state))
|
||||
{
|
||||
/// Maybe, we must consider part level somehow
|
||||
if ((*it)->info.min_block != part->info.min_block)
|
||||
error = true;
|
||||
update_error(it);
|
||||
|
||||
if ((*it)->state != DataPartState::Committed)
|
||||
{
|
||||
addPartContributionToColumnSizes(*it);
|
||||
(*it)->state = DataPartState::Committed;
|
||||
modifyPartState(it, DataPartState::Committed); // iterator is not invalidated here
|
||||
}
|
||||
|
||||
pos = (*it)->info.max_block + 1;
|
||||
restored.push_back((*it)->name);
|
||||
}
|
||||
else
|
||||
error = true;
|
||||
++it;
|
||||
update_error(it);
|
||||
}
|
||||
else
|
||||
error = true;
|
||||
|
||||
for (; it != suitable_parts.end() && part->contains(**it); ++it)
|
||||
/// Restore "right" parts
|
||||
for (auto it = it_middle; it != data_parts_by_name.end() && part->contains(**it); ++it)
|
||||
{
|
||||
if ((*it)->info.min_block < pos)
|
||||
continue;
|
||||
|
||||
if (!is_appropriate_state((*it)->state))
|
||||
{
|
||||
update_error(it);
|
||||
continue;
|
||||
}
|
||||
|
||||
if ((*it)->info.min_block > pos)
|
||||
error = true;
|
||||
update_error(it);
|
||||
|
||||
if ((*it)->state != DataPartState::Committed)
|
||||
{
|
||||
addPartContributionToColumnSizes(*it);
|
||||
(*it)->state = DataPartState::Committed;
|
||||
modifyPartState(it, DataPartState::Committed);
|
||||
}
|
||||
|
||||
pos = (*it)->info.max_block + 1;
|
||||
@ -1576,18 +1652,24 @@ void MergeTreeData::renameAndDetachPart(const DataPartPtr & part_to_detach, cons
|
||||
}
|
||||
|
||||
if (error)
|
||||
LOG_ERROR(log, "The set of parts restored in place of " << part->name << " looks incomplete. There might or might not be a data loss.");
|
||||
{
|
||||
LOG_ERROR(log, "The set of parts restored in place of " << part->name << " looks incomplete."
|
||||
<< " There might or might not be a data loss."
|
||||
<< (error_parts.empty() ? "" : " Suspicious parts: " + error_parts));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
size_t MergeTreeData::getTotalActiveSizeInBytes() const
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
|
||||
size_t res = 0;
|
||||
for (auto & part : getDataPartsRange({DataPartState::Committed}))
|
||||
res += part->size_in_bytes;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
|
||||
for (auto & part : getDataPartsStateRange(DataPartState::Committed))
|
||||
res += part->size_in_bytes;
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
@ -1601,7 +1683,7 @@ size_t MergeTreeData::getMaxPartsCountForPartition() const
|
||||
size_t cur_count = 0;
|
||||
const String * cur_partition_id = nullptr;
|
||||
|
||||
for (const auto & part : getDataPartsRange({DataPartState::Committed}))
|
||||
for (const auto & part : getDataPartsStateRange(DataPartState::Committed))
|
||||
{
|
||||
if (cur_partition_id && part->info.partition_id == *cur_partition_id)
|
||||
{
|
||||
@ -1656,11 +1738,12 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String &
|
||||
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
|
||||
/// The part can be covered only by the previous or the next one in data_parts.
|
||||
auto committed_parts = getDataPartsRange({DataPartState::Committed});
|
||||
auto it = committed_parts.convert(data_parts.lower_bound(part_info));
|
||||
auto committed_parts_range = getDataPartsStateRange(DataPartState::Committed);
|
||||
|
||||
if (it != committed_parts.end())
|
||||
/// The part can be covered only by the previous or the next one in data_parts.
|
||||
auto it = data_parts_by_state_and_name.lower_bound(DataPartStateAndInfo(DataPartState::Committed, part_info));
|
||||
|
||||
if (it != committed_parts_range.end())
|
||||
{
|
||||
if ((*it)->name == part_name)
|
||||
return *it;
|
||||
@ -1668,7 +1751,7 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String &
|
||||
return *it;
|
||||
}
|
||||
|
||||
if (it != committed_parts.begin())
|
||||
if (it != committed_parts_range.begin())
|
||||
{
|
||||
--it;
|
||||
if ((*it)->info.contains(part_info))
|
||||
@ -1685,10 +1768,15 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na
|
||||
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
|
||||
auto filtered_parts = getDataPartsRange(valid_states);
|
||||
auto it = filtered_parts.convert(data_parts.find(part_info));
|
||||
if (it != filtered_parts.end() && (*it)->name == part_name)
|
||||
return *it;
|
||||
auto it = data_parts_by_name.find(part_info);
|
||||
if (it == data_parts_by_name.end())
|
||||
return nullptr;
|
||||
|
||||
for (auto state : valid_states)
|
||||
{
|
||||
if ((*it)->state == state)
|
||||
return *it;
|
||||
}
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
@ -1742,7 +1830,8 @@ void MergeTreeData::calculateColumnSizesImpl()
|
||||
column_sizes.clear();
|
||||
|
||||
/// Take into account only committed parts
|
||||
for (const auto & part : getDataPartsRange({DataPartState::Committed}))
|
||||
auto committed_parts_range = getDataPartsStateRange(DataPartState::Committed);
|
||||
for (const auto & part : committed_parts_range)
|
||||
addPartContributionToColumnSizes(part);
|
||||
}
|
||||
|
||||
@ -1953,7 +2042,7 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context
|
||||
String partition_id = partition.getID(*this);
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> data_parts_lock(data_parts_mutex);
|
||||
std::unique_lock<std::mutex> data_parts_lock(data_parts_mutex);
|
||||
DataPartPtr existing_part_in_partition = getAnyPartInPartition(partition_id, data_parts_lock);
|
||||
if (existing_part_in_partition && existing_part_in_partition->partition.value != partition.value)
|
||||
{
|
||||
@ -1969,28 +2058,48 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context
|
||||
return partition_id;
|
||||
}
|
||||
|
||||
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(const DataPartStates & affordable_states) const
|
||||
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(const DataPartStates & affordable_states, DataPartStateVector * out_states) const
|
||||
{
|
||||
DataPartsVector res;
|
||||
DataPartsVector buf;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
std::copy_if(data_parts.begin(), data_parts.end(), std::back_inserter(res), DataPart::getStatesFilter(affordable_states));
|
||||
|
||||
for (auto state : affordable_states)
|
||||
{
|
||||
buf = std::move(res);
|
||||
res.clear();
|
||||
|
||||
auto range = getDataPartsStateRange(state);
|
||||
std::merge(range.begin(), range.end(), buf.begin(), buf.end(), std::back_inserter(res), LessDataPart());
|
||||
}
|
||||
|
||||
if (out_states != nullptr)
|
||||
{
|
||||
out_states->resize(res.size());
|
||||
for (size_t i = 0; i < res.size(); ++i)
|
||||
(*out_states)[i] = res[i]->state;
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(const MergeTreeData::DataPartStates & affordable_states,
|
||||
MergeTreeData::DataPartStateVector & out_states_snapshot) const
|
||||
MergeTreeData::DataPartsVector MergeTreeData::getAllDataPartsVector(MergeTreeData::DataPartStateVector * out_states) const
|
||||
{
|
||||
DataPartsVector res;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
std::copy_if(data_parts.begin(), data_parts.end(), std::back_inserter(res), DataPart::getStatesFilter(affordable_states));
|
||||
res.assign(data_parts_by_name.begin(), data_parts_by_name.end());
|
||||
|
||||
out_states_snapshot.resize(res.size());
|
||||
for (size_t i = 0; i < res.size(); ++i)
|
||||
out_states_snapshot[i] = res[i]->state;
|
||||
if (out_states != nullptr)
|
||||
{
|
||||
out_states->resize(res.size());
|
||||
for (size_t i = 0; i < res.size(); ++i)
|
||||
(*out_states)[i] = res[i]->state;
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
@ -1999,7 +2108,11 @@ MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affo
|
||||
DataParts res;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(data_parts_mutex);
|
||||
std::copy_if(data_parts.begin(), data_parts.end(), std::inserter(res, res.end()), DataPart::getStatesFilter(affordable_states));
|
||||
for (auto state : affordable_states)
|
||||
{
|
||||
auto range = getDataPartsStateRange(state);
|
||||
res.insert(range.begin(), range.end());
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
@ -2014,28 +2127,23 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector() const
|
||||
return getDataPartsVector({DataPartState::Committed});
|
||||
}
|
||||
|
||||
MergeTreeData::DataParts MergeTreeData::getAllDataParts() const
|
||||
{
|
||||
return getDataParts({DataPartState::PreCommitted, DataPartState::Committed, DataPartState::Outdated});
|
||||
}
|
||||
|
||||
MergeTreeData::DataPartPtr MergeTreeData::getAnyPartInPartition(
|
||||
const String & partition_id, std::lock_guard<std::mutex> & data_parts_lock)
|
||||
const String & partition_id, std::unique_lock<std::mutex> & data_parts_lock)
|
||||
{
|
||||
auto min_block = std::numeric_limits<Int64>::min();
|
||||
MergeTreePartInfo dummy_part_info(partition_id, min_block, min_block, 0);
|
||||
|
||||
auto committed_parts = getDataPartsRange({DataPartState::Committed});
|
||||
auto it = committed_parts.convert(data_parts.lower_bound(dummy_part_info));
|
||||
auto it = data_parts_by_state_and_name.lower_bound(DataPartStateAndInfo(DataPartState::Committed, dummy_part_info));
|
||||
|
||||
if (it != committed_parts.end() && (*it)->info.partition_id == partition_id)
|
||||
if (it != data_parts_by_state_and_name.end() && (*it)->state == DataPartState::Committed && (*it)->info.partition_id == partition_id)
|
||||
return *it;
|
||||
return {};
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void MergeTreeData::Transaction::rollback()
|
||||
{
|
||||
if (data && (!parts_to_remove_on_rollback.empty() || !parts_to_add_on_rollback.empty()))
|
||||
if (!isEmpty())
|
||||
{
|
||||
std::stringstream ss;
|
||||
if (!parts_to_remove_on_rollback.empty())
|
||||
@ -2057,14 +2165,19 @@ void MergeTreeData::Transaction::rollback()
|
||||
|
||||
/// PreCommitted -> Outdated
|
||||
replaceParts(DataPartState::Outdated, DataPartState::Committed, true);
|
||||
clear();
|
||||
}
|
||||
|
||||
clear();
|
||||
}
|
||||
|
||||
void MergeTreeData::Transaction::commit()
|
||||
{
|
||||
/// PreCommitted -> Committed, Committed -> Outdated
|
||||
replaceParts(DataPartState::Committed, DataPartState::Outdated, false);
|
||||
if (!isEmpty())
|
||||
{
|
||||
/// PreCommitted -> Committed, Committed -> Outdated
|
||||
replaceParts(DataPartState::Committed, DataPartState::Outdated, false);
|
||||
}
|
||||
|
||||
clear();
|
||||
}
|
||||
|
||||
@ -2088,9 +2201,9 @@ void MergeTreeData::Transaction::replaceParts(MergeTreeData::DataPartState move_
|
||||
/// If it is rollback then do nothing, else make it Outdated and remove their size contribution
|
||||
if (move_committed_to != DataPartState::Committed)
|
||||
{
|
||||
for (auto & part : committed_parts)
|
||||
for (const DataPartPtr & part : committed_parts)
|
||||
{
|
||||
part->state = move_committed_to;
|
||||
data->modifyPartState(part, move_committed_to);
|
||||
part->remove_time = remove_time;
|
||||
data->removePartContributionToColumnSizes(part);
|
||||
}
|
||||
@ -2099,7 +2212,7 @@ void MergeTreeData::Transaction::replaceParts(MergeTreeData::DataPartState move_
|
||||
/// If it is rollback just change state to Outdated, else change state to Committed and add their size contribution
|
||||
for (auto & part : precommitted_parts)
|
||||
{
|
||||
part->state = move_precommitted_to;
|
||||
data->modifyPartState(part, move_precommitted_to);
|
||||
if (move_precommitted_to == DataPartState::Committed)
|
||||
data->addPartContributionToColumnSizes(part);
|
||||
else
|
||||
|
@ -15,7 +15,10 @@
|
||||
#include <DataStreams/GraphiteRollupSortedBlockInputStream.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataPart.h>
|
||||
|
||||
#include <common/RangeFiltered.h>
|
||||
#include <boost/multi_index_container.hpp>
|
||||
#include <boost/multi_index/ordered_index.hpp>
|
||||
#include <boost/multi_index/global_fun.hpp>
|
||||
#include <boost/range/iterator_range_core.hpp>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -104,7 +107,16 @@ public:
|
||||
using DataPartStates = std::initializer_list<DataPartState>;
|
||||
using DataPartStateVector = std::vector<DataPartState>;
|
||||
|
||||
struct DataPartPtrLess
|
||||
/// Auxiliary structure for index comparison. Keep in mind lifetime of MergeTreePartInfo.
|
||||
struct DataPartStateAndInfo
|
||||
{
|
||||
DataPartState state;
|
||||
const MergeTreePartInfo & info;
|
||||
|
||||
DataPartStateAndInfo(DataPartState state, const MergeTreePartInfo & info) : state(state), info(info) {}
|
||||
};
|
||||
|
||||
struct LessDataPart
|
||||
{
|
||||
using is_transparent = void;
|
||||
|
||||
@ -113,11 +125,32 @@ public:
|
||||
bool operator()(const DataPartPtr & lhs, const DataPartPtr & rhs) const { return lhs->info < rhs->info; }
|
||||
};
|
||||
|
||||
using DataParts = std::set<DataPartPtr, DataPartPtrLess>;
|
||||
struct LessStateDataPart
|
||||
{
|
||||
using is_transparent = void;
|
||||
|
||||
bool operator() (const DataPartStateAndInfo & lhs, const DataPartStateAndInfo & rhs) const
|
||||
{
|
||||
return std::forward_as_tuple(static_cast<UInt8>(lhs.state), lhs.info)
|
||||
< std::forward_as_tuple(static_cast<UInt8>(rhs.state), rhs.info);
|
||||
}
|
||||
|
||||
bool operator() (DataPartStateAndInfo info, const DataPartState & state) const
|
||||
{
|
||||
return static_cast<size_t>(info.state) < static_cast<size_t>(state);
|
||||
}
|
||||
|
||||
bool operator() (const DataPartState & state, DataPartStateAndInfo info) const
|
||||
{
|
||||
return static_cast<size_t>(state) < static_cast<size_t>(info.state);
|
||||
}
|
||||
};
|
||||
|
||||
using DataParts = std::set<DataPartPtr, LessDataPart>;
|
||||
using DataPartsVector = std::vector<DataPartPtr>;
|
||||
|
||||
/// For resharding.
|
||||
using MutableDataParts = std::set<MutableDataPartPtr, DataPartPtrLess>;
|
||||
using MutableDataParts = std::set<MutableDataPartPtr, LessDataPart>;
|
||||
using PerShardDataParts = std::unordered_map<size_t, MutableDataPartPtr>;
|
||||
|
||||
/// Some operations on the set of parts return a Transaction object.
|
||||
@ -131,6 +164,11 @@ public:
|
||||
|
||||
void rollback();
|
||||
|
||||
bool isEmpty() const
|
||||
{
|
||||
return parts_to_add_on_rollback.empty() && parts_to_remove_on_rollback.empty();
|
||||
}
|
||||
|
||||
~Transaction()
|
||||
{
|
||||
try
|
||||
@ -310,22 +348,17 @@ public:
|
||||
|
||||
/// Returns a copy of the list so that the caller shouldn't worry about locks.
|
||||
DataParts getDataParts(const DataPartStates & affordable_states) const;
|
||||
DataPartsVector getDataPartsVector(const DataPartStates & affordable_states) const;
|
||||
DataPartsVector getDataPartsVector(const DataPartStates & affordable_states, DataPartStateVector & out_states_snapshot) const;
|
||||
/// Returns sorted list of the parts with specified states
|
||||
/// out_states will contain snapshot of each part state
|
||||
DataPartsVector getDataPartsVector(const DataPartStates & affordable_states, DataPartStateVector * out_states = nullptr) const;
|
||||
|
||||
/// Returns a virtual container iteration only through parts with specified states
|
||||
decltype(auto) getDataPartsRange(const DataPartStates & affordable_states) const
|
||||
{
|
||||
return createRangeFiltered(DataPart::getStatesFilter(affordable_states), data_parts);
|
||||
}
|
||||
/// Returns absolutely all parts (and snapshot of their states)
|
||||
DataPartsVector getAllDataPartsVector(DataPartStateVector * out_states = nullptr) const;
|
||||
|
||||
/// Returns Committed parts
|
||||
DataParts getDataParts() const;
|
||||
DataPartsVector getDataPartsVector() const;
|
||||
|
||||
/// Returns all parts except Temporary and Deleting ones
|
||||
DataParts getAllDataParts() const;
|
||||
|
||||
/// Returns an comitted part with the given name or a part containing it. If there is no such part, returns nullptr.
|
||||
DataPartPtr getActiveContainingPart(const String & part_name);
|
||||
|
||||
@ -375,8 +408,8 @@ public:
|
||||
/// Removes parts from data_parts, they should be in Deleting state
|
||||
void removePartsFinally(const DataPartsVector & parts);
|
||||
|
||||
/// Delete irrelevant parts.
|
||||
void clearOldParts();
|
||||
/// Delete irrelevant parts from memory and disk.
|
||||
void clearOldPartsFromFilesystem();
|
||||
|
||||
/// Deleate all directories which names begin with "tmp"
|
||||
/// Set non-negative parameter value to override MergeTreeSettings temporary_directories_lifetime
|
||||
@ -538,15 +571,81 @@ private:
|
||||
String log_name;
|
||||
Logger * log;
|
||||
|
||||
/// Current set of data parts.
|
||||
DataParts data_parts;
|
||||
mutable std::mutex data_parts_mutex;
|
||||
|
||||
/// The set of all data parts including already merged but not yet deleted. Usually it is small (tens of elements).
|
||||
/// The part is referenced from here, from the list of current parts and from each thread reading from it.
|
||||
/// This means that if reference count is 1 - the part is not used right now and can be deleted.
|
||||
// DataParts all_data_parts;
|
||||
// mutable std::mutex all_data_parts_mutex;
|
||||
/// Work with data parts
|
||||
|
||||
struct TagByName{};
|
||||
struct TagByStateAndName{};
|
||||
|
||||
static const MergeTreePartInfo & dataPartPtrToInfo(const DataPartPtr & part)
|
||||
{
|
||||
return part->info;
|
||||
}
|
||||
|
||||
static DataPartStateAndInfo dataPartPtrToStateAndInfo(const DataPartPtr & part)
|
||||
{
|
||||
return {part->state, part->info};
|
||||
};
|
||||
|
||||
using DataPartsIndexes = boost::multi_index_container<DataPartPtr,
|
||||
boost::multi_index::indexed_by<
|
||||
/// Index by Name
|
||||
boost::multi_index::ordered_unique<
|
||||
boost::multi_index::tag<TagByName>,
|
||||
boost::multi_index::global_fun<const DataPartPtr &, const MergeTreePartInfo &, dataPartPtrToInfo>
|
||||
>,
|
||||
/// Index by (State, Name), is used to obtain ordered slices of parts with the same state
|
||||
boost::multi_index::ordered_unique<
|
||||
boost::multi_index::tag<TagByStateAndName>,
|
||||
boost::multi_index::global_fun<const DataPartPtr &, DataPartStateAndInfo, dataPartPtrToStateAndInfo>,
|
||||
LessStateDataPart
|
||||
>
|
||||
>
|
||||
>;
|
||||
|
||||
/// Current set of data parts.
|
||||
mutable std::mutex data_parts_mutex;
|
||||
DataPartsIndexes data_parts_indexes;
|
||||
DataPartsIndexes::index<TagByName>::type & data_parts_by_name;
|
||||
DataPartsIndexes::index<TagByStateAndName>::type & data_parts_by_state_and_name;
|
||||
|
||||
using DataPartIteratorByAndName = DataPartsIndexes::index<TagByName>::type::iterator;
|
||||
using DataPartIteratorByStateAndName = DataPartsIndexes::index<TagByStateAndName>::type::iterator;
|
||||
|
||||
boost::iterator_range<DataPartIteratorByStateAndName> getDataPartsStateRange(DataPartState state) const
|
||||
{
|
||||
auto begin = data_parts_by_state_and_name.lower_bound(state, LessStateDataPart());
|
||||
auto end = data_parts_by_state_and_name.upper_bound(state, LessStateDataPart());
|
||||
return {begin, end};
|
||||
}
|
||||
|
||||
static decltype(auto) getStateModifier(DataPartState state)
|
||||
{
|
||||
return [state] (const DataPartPtr & part) { part->state = state; };
|
||||
}
|
||||
|
||||
void modifyPartState(DataPartIteratorByStateAndName it, DataPartState state)
|
||||
{
|
||||
if (!data_parts_by_state_and_name.modify(it, getStateModifier(state)))
|
||||
throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
void modifyPartState(DataPartIteratorByAndName it, DataPartState state)
|
||||
{
|
||||
if (!data_parts_by_state_and_name.modify(data_parts_indexes.project<TagByStateAndName>(it), getStateModifier(state)))
|
||||
throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
void modifyPartState(const DataPartPtr & part, DataPartState state)
|
||||
{
|
||||
auto it = data_parts_by_name.find(part->info);
|
||||
if (it == data_parts_by_name.end() || (*it).get() != part.get())
|
||||
throw Exception("Part " + part->name + " is not exists", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (!data_parts_by_state_and_name.modify(data_parts_indexes.project<TagByStateAndName>(it), getStateModifier(state)))
|
||||
throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
|
||||
/// Used to serialize calls to grabOldParts.
|
||||
std::mutex grab_old_parts_mutex;
|
||||
@ -582,7 +681,7 @@ private:
|
||||
void removePartContributionToColumnSizes(const DataPartPtr & part);
|
||||
|
||||
/// If there is no part in the partition with ID `partition_id`, returns empty ptr. Should be called under the lock.
|
||||
DataPartPtr getAnyPartInPartition(const String & partition_id, std::lock_guard<std::mutex> & data_parts_lock);
|
||||
DataPartPtr getAnyPartInPartition(const String & partition_id, std::unique_lock<std::mutex> & data_parts_lock);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -187,7 +187,7 @@ bool MergeTreeDataMerger::selectPartsToMerge(
|
||||
if (prev_part && part->info.partition_id == (*prev_part)->info.partition_id
|
||||
&& part->info.min_block < (*prev_part)->info.max_block)
|
||||
{
|
||||
LOG_ERROR(log, "Part " << part->name << " intersects previous part " << (*prev_part)->name);
|
||||
LOG_ERROR(log, "Part " << part->getNameWithState() << " intersects previous part " << (*prev_part)->getNameWithState());
|
||||
}
|
||||
|
||||
prev_part = ∂
|
||||
|
@ -38,6 +38,7 @@ namespace ErrorCodes
|
||||
extern const int FORMAT_VERSION_TOO_OLD;
|
||||
extern const int UNKNOWN_FORMAT;
|
||||
extern const int UNEXPECTED_FILE_IN_DATA_PART;
|
||||
extern const int NOT_FOUND_EXPECTED_DATA_PART;
|
||||
}
|
||||
|
||||
|
||||
@ -935,4 +936,16 @@ String MergeTreeDataPart::stateString() const
|
||||
return stateToString(state);
|
||||
}
|
||||
|
||||
void MergeTreeDataPart::assertState(const std::initializer_list<MergeTreeDataPart::State> & affordable_states) const
|
||||
{
|
||||
if (!checkState(affordable_states))
|
||||
{
|
||||
String states_str;
|
||||
for (auto state : affordable_states)
|
||||
states_str += stateToString(state) + " ";
|
||||
|
||||
throw Exception("Unexpected state of part " + getNameWithState() + ". Expected: " + states_str, ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -190,17 +190,7 @@ struct MergeTreeDataPart
|
||||
}
|
||||
|
||||
/// Throws an exception if state of the part is not in affordable_states
|
||||
void assertState(const std::initializer_list<State> & affordable_states) const
|
||||
{
|
||||
if (!checkState(affordable_states))
|
||||
{
|
||||
String states_str;
|
||||
for (auto state : affordable_states)
|
||||
states_str += stateToString(state) + " ";
|
||||
|
||||
throw Exception("Unexpected state of part " + getNameWithState() + ". Expected: " + states_str);
|
||||
}
|
||||
}
|
||||
void assertState(const std::initializer_list<State> & affordable_states) const;
|
||||
|
||||
/// In comparison with lambdas, it is move assignable and could has several overloaded operator()
|
||||
struct StatesFilter
|
||||
@ -327,4 +317,7 @@ private:
|
||||
void checkConsistency(bool require_part_metadata);
|
||||
};
|
||||
|
||||
|
||||
using MergeTreeDataPartState = MergeTreeDataPart::State;
|
||||
|
||||
}
|
||||
|
@ -68,7 +68,7 @@ StorageMergeTree::StorageMergeTree(
|
||||
}
|
||||
else
|
||||
{
|
||||
data.clearOldParts();
|
||||
data.clearOldPartsFromFilesystem();
|
||||
}
|
||||
|
||||
/// Temporary directories contain incomplete results of merges (after forced restart)
|
||||
@ -188,7 +188,7 @@ void StorageMergeTree::alter(
|
||||
if (primary_key_is_modified && supportsSampling())
|
||||
throw Exception("MODIFY PRIMARY KEY only supported for tables without sampling key", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
MergeTreeData::DataParts parts = data.getAllDataParts();
|
||||
auto parts = data.getDataParts({MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});
|
||||
for (const MergeTreeData::DataPartPtr & part : parts)
|
||||
{
|
||||
if (auto transaction = data.alterDataPart(part, columns_for_parts, new_primary_key_ast, false))
|
||||
@ -291,7 +291,7 @@ bool StorageMergeTree::merge(
|
||||
/// Clear old parts. It does not matter to do it more frequently than each second.
|
||||
if (auto lock = time_after_previous_cleanup.lockTestAndRestartAfter(1))
|
||||
{
|
||||
data.clearOldParts();
|
||||
data.clearOldPartsFromFilesystem();
|
||||
data.clearOldTemporaryDirectories();
|
||||
}
|
||||
|
||||
|
@ -109,6 +109,7 @@ namespace ErrorCodes
|
||||
extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS;
|
||||
extern const int TOO_MUCH_FETCHES;
|
||||
extern const int BAD_DATA_PART_NAME;
|
||||
extern const int PART_IS_TEMPORARILY_LOCKED;
|
||||
}
|
||||
|
||||
|
||||
@ -800,7 +801,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
|
||||
/// Parts in ZK.
|
||||
NameSet expected_parts(expected_parts_vec.begin(), expected_parts_vec.end());
|
||||
|
||||
MergeTreeData::DataParts parts = data.getAllDataParts();
|
||||
auto parts = data.getDataParts({MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});
|
||||
|
||||
/// Local parts that are not in ZK.
|
||||
MergeTreeData::DataParts unexpected_parts;
|
||||
@ -1179,7 +1180,21 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
|
||||
if (!do_fetch)
|
||||
{
|
||||
merger.renameMergedTemporaryPart(part, parts, &transaction);
|
||||
getZooKeeper()->multi(ops); /// After long merge, get fresh ZK handle, because previous session may be expired.
|
||||
|
||||
/// Do not commit if the part is obsolete
|
||||
if (!transaction.isEmpty())
|
||||
{
|
||||
getZooKeeper()->multi(ops); /// After long merge, get fresh ZK handle, because previous session may be expired.
|
||||
transaction.commit();
|
||||
}
|
||||
|
||||
/** Removing old chunks from ZK and from the disk is delayed - see ReplicatedMergeTreeCleanupThread, clearOldParts.
|
||||
*/
|
||||
|
||||
/** With `ZCONNECTIONLOSS` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts.
|
||||
* This is not a problem, because in this case the merge will remain in the queue, and we will try again.
|
||||
*/
|
||||
merge_selecting_event.set();
|
||||
|
||||
if (auto part_log = context.getPartLog(database_name, table_name))
|
||||
{
|
||||
@ -1212,15 +1227,6 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
|
||||
}
|
||||
}
|
||||
|
||||
/** Removing old chunks from ZK and from the disk is delayed - see ReplicatedMergeTreeCleanupThread, clearOldParts.
|
||||
*/
|
||||
|
||||
/** With `ZCONNECTIONLOSS` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts.
|
||||
* This is not a problem, because in this case the merge will remain in the queue, and we will try again.
|
||||
*/
|
||||
transaction.commit();
|
||||
merge_selecting_event.set();
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
|
||||
}
|
||||
}
|
||||
@ -1443,8 +1449,9 @@ void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
|
||||
/// It's important that no old parts remain (after the merge), because otherwise,
|
||||
/// after adding a new replica, this new replica downloads them, but does not delete them.
|
||||
/// And, if you do not, the parts will come to life after the server is restarted.
|
||||
/// Therefore, we use getAllDataParts.
|
||||
auto parts = data.getAllDataParts();
|
||||
/// Therefore, we use all data parts.
|
||||
auto parts = data.getDataParts({MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});
|
||||
|
||||
for (const auto & part : parts)
|
||||
{
|
||||
if (!entry_part_info.contains(part->info))
|
||||
@ -1616,6 +1623,11 @@ bool StorageReplicatedMergeTree::queueTask()
|
||||
/// Interrupted merge or downloading a part is not an error.
|
||||
LOG_INFO(log, e.message());
|
||||
}
|
||||
else if (e.code() == ErrorCodes::PART_IS_TEMPORARILY_LOCKED)
|
||||
{
|
||||
/// Part cannot be added temporarily
|
||||
LOG_INFO(log, e.displayText());
|
||||
}
|
||||
else
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
|
||||
@ -2205,6 +2217,13 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
|
||||
MergeTreeData::Transaction transaction;
|
||||
auto removed_parts = data.renameTempPartAndReplace(part, nullptr, &transaction);
|
||||
|
||||
/// Do not commit if the part is obsolete
|
||||
if (!transaction.isEmpty())
|
||||
{
|
||||
getZooKeeper()->multi(ops);
|
||||
transaction.commit();
|
||||
}
|
||||
|
||||
if (auto part_log = context.getPartLog(database_name, table_name))
|
||||
{
|
||||
PartLogElement elem;
|
||||
@ -2236,10 +2255,6 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
getZooKeeper()->multi(ops);
|
||||
transaction.commit();
|
||||
|
||||
/** If a quorum is tracked for this part, you must update it.
|
||||
* If you do not have time, in case of losing the session, when you restart the server - see the `ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart` method.
|
||||
*/
|
||||
|
@ -39,7 +39,7 @@ StorageSystemParts::StorageSystemParts(const std::string & name_)
|
||||
|
||||
{"database", std::make_shared<DataTypeString>()},
|
||||
{"table", std::make_shared<DataTypeString>()},
|
||||
{"engine", std::make_shared<DataTypeString>()},
|
||||
{"engine", std::make_shared<DataTypeString>()}
|
||||
}
|
||||
{
|
||||
}
|
||||
@ -53,9 +53,12 @@ BlockInputStreams StorageSystemParts::read(
|
||||
const size_t max_block_size,
|
||||
const unsigned num_streams)
|
||||
{
|
||||
check(column_names);
|
||||
//check(column_names);
|
||||
processed_stage = QueryProcessingStage::FetchColumns;
|
||||
|
||||
auto it_state_column = std::find(column_names.begin(), column_names.end(), "_state");
|
||||
bool has_state_column = it_state_column != column_names.end();
|
||||
|
||||
/// Will apply WHERE to subset of columns and then add more columns.
|
||||
/// This is kind of complicated, but we use WHERE to do less work.
|
||||
|
||||
@ -142,6 +145,8 @@ BlockInputStreams StorageSystemParts::read(
|
||||
/// Finally, create the result.
|
||||
|
||||
Block block = getSampleBlock();
|
||||
if (has_state_column)
|
||||
block.insert(ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "_state"));
|
||||
|
||||
for (size_t i = 0; i < filtered_database_column->size();)
|
||||
{
|
||||
@ -198,10 +203,18 @@ BlockInputStreams StorageSystemParts::read(
|
||||
using State = MergeTreeDataPart::State;
|
||||
MergeTreeData::DataPartStateVector all_parts_state;
|
||||
MergeTreeData::DataPartsVector all_parts;
|
||||
|
||||
if (need[0])
|
||||
all_parts = data->getDataPartsVector({State::Committed, State::Outdated}, all_parts_state);
|
||||
{
|
||||
/// If has_state_column is requested, return all states
|
||||
if (!has_state_column)
|
||||
all_parts = data->getDataPartsVector({State::Committed, State::Outdated}, &all_parts_state);
|
||||
else
|
||||
all_parts = data->getAllDataPartsVector(&all_parts_state);
|
||||
}
|
||||
else
|
||||
all_parts = data->getDataPartsVector({State::Committed}, all_parts_state);
|
||||
all_parts = data->getDataPartsVector({State::Committed}, &all_parts_state);
|
||||
|
||||
|
||||
/// Finally, we'll go through the list of parts.
|
||||
for (size_t part_number = 0; part_number < all_parts.size(); ++part_number)
|
||||
@ -248,11 +261,30 @@ BlockInputStreams StorageSystemParts::read(
|
||||
block.getByPosition(i++).column->insert(database);
|
||||
block.getByPosition(i++).column->insert(table);
|
||||
block.getByPosition(i++).column->insert(engine);
|
||||
|
||||
if (has_state_column)
|
||||
block.getByPosition(i++).column->insert(part->stateString());
|
||||
}
|
||||
}
|
||||
|
||||
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(block));
|
||||
}
|
||||
|
||||
NameAndTypePair StorageSystemParts::getColumn(const String & column_name) const
|
||||
{
|
||||
if (column_name == "_state")
|
||||
return NameAndTypePair("_state", std::make_shared<DataTypeString>());
|
||||
|
||||
return ITableDeclaration::getColumn(column_name);
|
||||
}
|
||||
|
||||
bool StorageSystemParts::hasColumn(const String & column_name) const
|
||||
{
|
||||
if (column_name == "_state")
|
||||
return true;
|
||||
|
||||
return ITableDeclaration::hasColumn(column_name);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -21,6 +21,10 @@ public:
|
||||
|
||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
||||
|
||||
NameAndTypePair getColumn(const String & column_name) const override;
|
||||
|
||||
bool hasColumn(const String & column_name) const override;
|
||||
|
||||
BlockInputStreams read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & query_info,
|
||||
|
@ -1,44 +0,0 @@
|
||||
#include <gtest/gtest.h>
|
||||
#include <common/RangeFiltered.h>
|
||||
#include <vector>
|
||||
#include <set>
|
||||
|
||||
|
||||
TEST(RangeFiltered, simple)
|
||||
{
|
||||
std::vector<int> v;
|
||||
|
||||
for (int i = 0; i < 10; ++i)
|
||||
v.push_back(i);
|
||||
|
||||
auto v30 = createRangeFiltered([] (int i) { return i % 3 == 0; }, v);
|
||||
auto v31 = createRangeFiltered([] (int i) { return i % 3 != 0; }, v);
|
||||
|
||||
for (const int & i : v30)
|
||||
ASSERT_EQ(i % 3, 0);
|
||||
|
||||
for (const int & i : v31)
|
||||
ASSERT_NE(i % 3, 0);
|
||||
|
||||
{
|
||||
auto it = v30.begin();
|
||||
ASSERT_EQ(*it, 0);
|
||||
|
||||
auto it2 = std::next(it);
|
||||
ASSERT_EQ(*it2, 3);
|
||||
|
||||
it = std::next(it2);
|
||||
ASSERT_EQ(*it, 6);
|
||||
}
|
||||
|
||||
{
|
||||
auto it = std::next(v30.begin());
|
||||
ASSERT_EQ(*it, 3);
|
||||
|
||||
*it = 2; /// it becomes invalid
|
||||
ASSERT_EQ(*(++it), 6); /// but iteration is sucessfull
|
||||
|
||||
*v30.begin() = 1;
|
||||
ASSERT_EQ(*v30.begin(), 6);
|
||||
}
|
||||
}
|
@ -26,6 +26,7 @@ def started_cluster():
|
||||
pass
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def test_random_inserts(started_cluster):
|
||||
# Duration of the test, reduce it if don't want to wait
|
||||
DURATION_SECONDS = 10# * 60
|
||||
@ -55,7 +56,9 @@ def test_random_inserts(started_cluster):
|
||||
inserter.get_answer()
|
||||
|
||||
answer="{}\t{}\t{}\t{}\n".format(num_timestamps, num_timestamps, min_timestamp, max_timestamp)
|
||||
|
||||
for node in nodes:
|
||||
assert TSV(node.query("SELECT count(), uniqExact(i), min(i), max(i) FROM simple")) == TSV(answer), node.name + " : " + node.query("SELECT groupArray(_part), i, count() AS c FROM simple GROUP BY i ORDER BY c DESC LIMIT 1")
|
||||
res = node.query("SELECT count(), uniqExact(i), min(i), max(i) FROM simple")
|
||||
assert TSV(res) == TSV(answer), node.name + " : " + node.query("SELECT groupArray(_part), i, count() AS c FROM simple GROUP BY i ORDER BY c DESC LIMIT 1")
|
||||
|
||||
node1.query("""DROP TABLE simple ON CLUSTER test_cluster""")
|
||||
|
@ -4,6 +4,7 @@
|
||||
[[ -n "$1" ]] && host="$1" || host="127.0.0.1"
|
||||
[[ -n "$2" ]] && min_timestamp="$2" || min_timestamp=$(( $(date +%s) - 10 ))
|
||||
[[ -n "$3" ]] && max_timestamp="$3" || max_timestamp=$(( $(date +%s) + 10 ))
|
||||
[[ -n "$4" ]] && iters_per_timestamp="$4" || iters_per_timestamp=1
|
||||
|
||||
timestamps=`seq $min_timestamp $max_timestamp`
|
||||
|
||||
@ -40,6 +41,6 @@ for i in $timestamps; do
|
||||
cur_timestamp=$(date +%s)
|
||||
done
|
||||
|
||||
#echo $i >> $host".txt"
|
||||
reliable_insert "$i"
|
||||
done
|
||||
done
|
||||
sleep 1
|
||||
|
@ -1,127 +0,0 @@
|
||||
#pragma once
|
||||
#include <type_traits>
|
||||
|
||||
|
||||
/// Similar to boost::filtered_range but a little bit easier and allows to convert ordinary iterators to filtered
|
||||
template <typename F, typename C>
|
||||
struct RangeFiltered
|
||||
{
|
||||
/// Template parameter C may be const. Then const_iterator is used.
|
||||
using RawIterator = decltype(std::declval<C>().begin());
|
||||
class Iterator;
|
||||
|
||||
/// Will iterate over elements for which filter(*it) == true
|
||||
template <typename F_, typename C_> /// Another template for universal references to work.
|
||||
RangeFiltered(F_ && filter, C_ && container)
|
||||
: filter(std::move(filter)), container(container) {}
|
||||
|
||||
Iterator begin() const
|
||||
{
|
||||
return Iterator{*this, std::begin(container)};
|
||||
}
|
||||
|
||||
Iterator end() const
|
||||
{
|
||||
return Iterator{*this, std::end(container)};
|
||||
}
|
||||
|
||||
/// Convert ordinary iterator to filtered one
|
||||
/// Real position will be in range [ordinary_iterator; end()], so it is suitable to use with lower[upper]_bound()
|
||||
inline Iterator convert(RawIterator ordinary_iterator) const
|
||||
{
|
||||
return Iterator{*this, ordinary_iterator};
|
||||
}
|
||||
|
||||
|
||||
/// It is similar to boost::filtered_iterator, but has additional features:
|
||||
/// it doesn't store end() iterator
|
||||
/// it doesn't store predicate, so it allows to implement operator=()
|
||||
/// it guarantees that operator++() works properly in case of filter(*it) == false
|
||||
class Iterator
|
||||
{
|
||||
public:
|
||||
using Range = RangeFiltered<F, C>;
|
||||
|
||||
typedef Iterator self_type;
|
||||
typedef typename std::iterator_traits<RawIterator>::value_type value_type;
|
||||
typedef typename std::iterator_traits<RawIterator>::reference reference;
|
||||
typedef const value_type & const_reference;
|
||||
typedef typename std::iterator_traits<RawIterator>::pointer pointer;
|
||||
typedef const value_type * const_pointer;
|
||||
typedef typename std::iterator_traits<RawIterator>::difference_type difference_type;
|
||||
typedef std::bidirectional_iterator_tag iterator_category;
|
||||
|
||||
Iterator(const Range & range_, RawIterator iter_)
|
||||
: range(&range_), iter(iter_)
|
||||
{
|
||||
for (; iter != std::end(range->container) && !range->filter(*iter); ++iter);
|
||||
}
|
||||
|
||||
Iterator(const Iterator & rhs) = default;
|
||||
Iterator(Iterator && rhs) noexcept = default;
|
||||
|
||||
Iterator operator++()
|
||||
{
|
||||
++iter;
|
||||
for (; iter != std::end(range->container) && !range->filter(*iter); ++iter);
|
||||
return *this;
|
||||
}
|
||||
|
||||
Iterator operator--()
|
||||
{
|
||||
--iter;
|
||||
for (; !range->filter(*iter); --iter); /// Don't check std::begin() bound
|
||||
return *this;
|
||||
}
|
||||
|
||||
pointer operator->()
|
||||
{
|
||||
return iter.operator->();
|
||||
}
|
||||
|
||||
const_pointer operator->() const
|
||||
{
|
||||
return iter.operator->();
|
||||
}
|
||||
|
||||
reference operator*()
|
||||
{
|
||||
return *iter;
|
||||
}
|
||||
|
||||
const_reference operator*() const
|
||||
{
|
||||
return *iter;
|
||||
}
|
||||
|
||||
bool operator==(const self_type & rhs) const
|
||||
{
|
||||
return iter == rhs.iter;
|
||||
}
|
||||
|
||||
bool operator!=(const self_type & rhs) const
|
||||
{
|
||||
return iter != rhs.iter;
|
||||
}
|
||||
|
||||
self_type & operator=(const self_type & rhs) = default;
|
||||
self_type & operator=(self_type && rhs) noexcept = default;
|
||||
|
||||
~Iterator() = default;
|
||||
|
||||
private:
|
||||
const Range * range = nullptr;
|
||||
RawIterator iter;
|
||||
};
|
||||
|
||||
protected:
|
||||
F filter;
|
||||
C & container;
|
||||
};
|
||||
|
||||
|
||||
template <typename F, typename C>
|
||||
inline RangeFiltered<std::decay_t<F>, std::remove_reference_t<C>> createRangeFiltered(F && filter, C && container)
|
||||
{
|
||||
return {std::forward<F>(filter), std::forward<C>(container)};
|
||||
};
|
Loading…
Reference in New Issue
Block a user