diff --git a/contrib/poco b/contrib/poco index 1366df1c7e0..bcf9ebad48b 160000 --- a/contrib/poco +++ b/contrib/poco @@ -1 +1 @@ -Subproject commit 1366df1c7e068bb2efd846bc8dc8e286b090904e +Subproject commit bcf9ebad48b2162d25f5fc432b176d74a09f498d diff --git a/dbms/src/Core/ErrorCodes.cpp b/dbms/src/Core/ErrorCodes.cpp index ffeda42047b..bd4914ea08b 100644 --- a/dbms/src/Core/ErrorCodes.cpp +++ b/dbms/src/Core/ErrorCodes.cpp @@ -386,6 +386,7 @@ namespace ErrorCodes extern const int HTTP_LENGTH_REQUIRED = 381; extern const int CANNOT_LOAD_CATBOOST_MODEL = 382; extern const int CANNOT_APPLY_CATBOOST_MODEL = 383; + extern const int PART_IS_TEMPORARILY_LOCKED = 384; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 8e7146e4bfa..0a63dc2348b 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -74,6 +74,7 @@ namespace ErrorCodes extern const int CORRUPTED_DATA; extern const int INVALID_PARTITION_VALUE; extern const int METADATA_MISMATCH; + extern const int PART_IS_TEMPORARILY_LOCKED; } @@ -106,7 +107,9 @@ MergeTreeData::MergeTreeData( database_name(database_), table_name(table_), full_path(full_path_), columns(columns_), broken_part_callback(broken_part_callback_), - log_name(log_name_), log(&Logger::get(log_name + " (Data)")) + log_name(log_name_), log(&Logger::get(log_name + " (Data)")), + data_parts_by_name(data_parts_indexes.get()), + data_parts_by_state_and_name(data_parts_indexes.get()) { merging_params.check(*columns); @@ -381,7 +384,7 @@ Int64 MergeTreeData::getMaxDataPartIndex() std::lock_guard lock_all(data_parts_mutex); Int64 max_block_id = 0; - for (const auto & part : data_parts) + for (const DataPartPtr & part : data_parts_by_name) max_block_id = std::max(max_block_id, part->info.max_block); return max_block_id; @@ -392,9 +395,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) { LOG_DEBUG(log, "Loading data parts"); - std::lock_guard lock(data_parts_mutex); - data_parts.clear(); - Strings part_file_names; Poco::DirectoryIterator end; for (Poco::DirectoryIterator it(full_path); it != end; ++it) @@ -410,6 +410,9 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) DataPartsVector broken_parts_to_detach; size_t suspicious_broken_parts = 0; + std::lock_guard lock(data_parts_mutex); + data_parts_indexes.clear(); + for (const String & file_name : part_file_names) { MergeTreePartInfo part_info; @@ -496,7 +499,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) /// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later part->state = DataPartState::Committed; - data_parts.insert(part); + if (!data_parts_indexes.insert(part).second) + throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART); } if (suspicious_broken_parts > settings.max_suspicious_broken_parts && !skip_sanity_checks) @@ -512,13 +516,21 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) /// were merged), but that for some reason are still not deleted from the filesystem. /// Deletion of files will be performed later in the clearOldParts() method. 
- if (data_parts.size() >= 2) + if (data_parts_indexes.size() >= 2) { - auto committed_parts = getDataPartsRange({DataPartState::Committed}); - auto prev_jt = committed_parts.begin(); + /// Now all parts are committed, so data_parts_by_state_and_name == committed_parts_range + auto prev_jt = data_parts_by_state_and_name.begin(); auto curr_jt = std::next(prev_jt); - while (curr_jt != committed_parts.end()) + auto deactivate_part = [&] (DataPartIteratorByStateAndName it) + { + (*it)->remove_time = (*it)->modification_time; + modifyPartState(it, DataPartState::Outdated); + }; + + (*prev_jt)->assertState({DataPartState::Committed}); + + while (curr_jt != data_parts_by_state_and_name.end() && (*curr_jt)->state == DataPartState::Committed) { /// Don't consider data parts belonging to different partitions. if ((*curr_jt)->info.partition_id != (*prev_jt)->info.partition_id) @@ -530,16 +542,15 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) if ((*curr_jt)->contains(**prev_jt)) { - (*prev_jt)->remove_time = (*prev_jt)->modification_time; - (*prev_jt)->state = DataPartState::Outdated; /// prev_jt becomes invalid here + deactivate_part(prev_jt); prev_jt = curr_jt; ++curr_jt; } else if ((*prev_jt)->contains(**curr_jt)) { - (*curr_jt)->remove_time = (*curr_jt)->modification_time; - (*curr_jt)->state = DataPartState::Outdated; /// curr_jt becomes invalid here - ++curr_jt; + auto next = std::next(curr_jt); + deactivate_part(curr_jt); + curr_jt = next; } else { @@ -551,7 +562,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) calculateColumnSizesImpl(); - LOG_DEBUG(log, "Loaded data parts (" << data_parts.size() << " items)"); + LOG_DEBUG(log, "Loaded data parts (" << data_parts_indexes.size() << " items)"); } @@ -619,21 +630,30 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts() return res; time_t now = time(nullptr); + std::vector parts_to_delete; { std::lock_guard lock_parts(data_parts_mutex); - for (auto it = data_parts.begin(); it != data_parts.end(); ++it) + auto outdated_parts_range = getDataPartsStateRange(DataPartState::Outdated); + for (auto it = outdated_parts_range.begin(); it != outdated_parts_range.end(); ++it) { - if ((*it)->state == DataPartState::Outdated && - it->unique() && /// Grab only parts that is not using by anyone (SELECTs for example) - (*it)->remove_time < now && - now - (*it)->remove_time > settings.old_parts_lifetime.totalSeconds()) + const DataPartPtr & part = *it; + + if (part.unique() && /// Grab only parts that is not using by anyone (SELECTs for example) + part->remove_time < now && + now - part->remove_time > settings.old_parts_lifetime.totalSeconds()) { - (*it)->state = DataPartState::Deleting; - res.push_back(*it); + parts_to_delete.emplace_back(it); } } + + res.reserve(parts_to_delete.size()); + for (const auto & it_to_delete : parts_to_delete) + { + res.emplace_back(*it_to_delete); + modifyPartState(it_to_delete, DataPartState::Deleting); + } } if (!res.empty()) @@ -650,7 +670,7 @@ void MergeTreeData::rollbackDeletingParts(const MergeTreeData::DataPartsVector & { /// We should modify it under data_parts_mutex part->assertState({DataPartState::Deleting}); - part->state = DataPartState::Outdated; + modifyPartState(part, DataPartState::Outdated); } } @@ -661,26 +681,27 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa /// TODO: use data_parts iterators instead of pointers for (auto & part : parts) { - if (part->state != DataPartState::Deleting) - throw Exception("An attempt to delete part " + 
part->getNameWithState() + " with unexpected state", ErrorCodes::LOGICAL_ERROR); - - auto it = data_parts.find(part); - if (it == data_parts.end()) + auto it = data_parts_by_name.find(part->info); + if (it == data_parts_by_name.end()) throw Exception("Deleting data part " + part->name + " is not exist", ErrorCodes::LOGICAL_ERROR); - data_parts.erase(it); + (*it)->assertState({DataPartState::Deleting}); + + data_parts_indexes.erase(it); } } -void MergeTreeData::clearOldParts() +void MergeTreeData::clearOldPartsFromFilesystem() { auto parts_to_remove = grabOldParts(); for (const DataPartPtr & part : parts_to_remove) { - LOG_DEBUG(log, "Removing part " << part->name); + LOG_DEBUG(log, "Removing part from filesystem " << part->name); part->remove(); } + + removePartsFinally(parts_to_remove); } void MergeTreeData::setPath(const String & new_full_path, bool move_data) @@ -710,7 +731,7 @@ void MergeTreeData::dropAllData() LOG_TRACE(log, "dropAllData: removing data from memory."); - data_parts.clear(); + data_parts_indexes.clear(); column_sizes.clear(); context.dropCaches(); @@ -1319,9 +1340,13 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace( part->assertState({DataPartState::Temporary}); - DataPartsVector replaced; + MergeTreePartInfo part_info = part->info; + String part_name; + + DataPartsVector replaced_parts; + std::vector replaced_iterators; { - std::lock_guard lock(data_parts_mutex); + std::unique_lock lock(data_parts_mutex); if (DataPartPtr existing_part_in_partition = getAnyPartInPartition(part->info.partition_id, lock)) { @@ -1336,141 +1361,163 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace( * Otherwise there is race condition - merge of blocks could happen in interval that doesn't yet contain new part. */ if (increment) - part->info.min_block = part->info.max_block = increment->get(); + part_info.min_block = part_info.max_block = increment->get(); - String new_name; if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING) - new_name = part->info.getPartNameV0(part->getMinDate(), part->getMaxDate()); + part_name = part_info.getPartNameV0(part->getMinDate(), part->getMaxDate()); else - new_name = part->info.getPartName(); + part_name = part_info.getPartName(); - LOG_TRACE(log, "Renaming temporary part " << part->relative_path << " to " << new_name << "."); + LOG_TRACE(log, "Renaming temporary part " << part->relative_path << " to " << part_name << "."); - auto it_duplicate = data_parts.find(part); - if (it_duplicate != data_parts.end()) + auto it_duplicate = data_parts_by_name.find(part_info); + if (it_duplicate != data_parts_by_name.end()) { String message = "Part " + (*it_duplicate)->getNameWithState() + " already exists"; + if ((*it_duplicate)->checkState({DataPartState::Outdated, DataPartState::Deleting})) - message += ", but it will be deleted soon"; + { + throw Exception(message + ", but it will be deleted soon", ErrorCodes::PART_IS_TEMPORARILY_LOCKED); + } throw Exception(message, ErrorCodes::DUPLICATE_DATA_PART); } - /// Rename the part only in memory. Will rename it on disk only if all check is passed. 
- /// It allows us maintain invariant: if non-temporary parts in filesystem then they are in data_parts - part->name = new_name; + /// Check that part is not covered and doesn't cover other in-progress parts, it makes sense only for Replicated* engines + if (out_transaction) + { + auto check_coverage = [&part_info, &part_name] (const DataPartPtr & part) + { + if (part_info.contains(part->info)) + { + throw Exception("Cannot add part " + part_name + " covering pre-committed part " + part->name, ErrorCodes::PART_IS_TEMPORARILY_LOCKED); + } + else + { + if (part->info.contains(part_info)) + throw Exception("Cannot add part " + part_name + " covered by pre-committed part " + part->name + ". It is a bug", ErrorCodes::LOGICAL_ERROR); + } + }; + + auto it_middle = data_parts_by_state_and_name.lower_bound(DataPartStateAndInfo(DataPartState::PreCommitted, part_info)); + + auto precommitted_parts_range = getDataPartsStateRange(DataPartState::PreCommitted); + + for (auto it = it_middle; it != precommitted_parts_range.begin();) + { + --it; + check_coverage(*it); + } + + for (auto it = it_middle; it != precommitted_parts_range.end();) + { + check_coverage(*it); + ++it; + } + } /// Is the part covered by some other part? - bool obsolete = false; + DataPartPtr covering_part; - auto check_replacing_part_state = [&] (const DataPartPtr & cur_part) - { - cur_part->assertState({DataPartState::PreCommitted, DataPartState::Committed}); - if (cur_part->state == DataPartState::PreCommitted) - throw Exception("Could not add part " + new_name + " while replacing part " + cur_part->name + " is in pre-committed state", ErrorCodes::LOGICAL_ERROR); - }; + auto it_middle = data_parts_by_state_and_name.lower_bound(DataPartStateAndInfo(DataPartState::Committed, part_info)); - /// Don't consider parts going to be deleted - auto active_parts = getDataPartsRange({DataPartState::Committed, DataPartState::PreCommitted}); /// Parts contained in the part are consecutive in data_parts, intersecting the insertion place for the part itself. - auto it_middle = active_parts.convert(data_parts.lower_bound(part)); + auto committed_parts_range = getDataPartsStateRange(DataPartState::Committed); /// Go to the left. - for (auto it = it_middle; it != active_parts.begin();) + for (auto it = it_middle; it != committed_parts_range.begin();) { --it; - if (!part->contains(**it)) + if (!part_info.contains((*it)->info)) { - if ((*it)->contains(*part)) - obsolete = true; - ++it; + if ((*it)->info.contains(part_info)) + covering_part = *it; break; } - check_replacing_part_state(*it); - replaced.push_back(*it); -// replaced.push_back(*it); -// (*it)->remove_time = time(nullptr); -// (*it)->state = replaced_parts_state; -// removePartContributionToColumnSizes(*it); -// data_parts.erase(it++); /// Yes, ++, not --. + replaced_iterators.push_back(it); } /// Parts must be in ascending order. - std::reverse(replaced.begin(), replaced.end()); + std::reverse(replaced_iterators.begin(), replaced_iterators.end()); /// Go to the right. - for (auto it = it_middle; it != active_parts.end();) + for (auto it = it_middle; it != committed_parts_range.end();) { - if ((*it)->name == part->name) - throw Exception("Unexpected duplicate part " + part->getNameWithState() + ". It is a bug.", ErrorCodes::LOGICAL_ERROR); + if ((*it)->name == part_name) + throw Exception("Unexpected duplicate part " + (*it)->getNameWithState() + ". 
It is a bug.", ErrorCodes::LOGICAL_ERROR); - if (!part->contains(**it)) + if (!part_info.contains((*it)->info)) { - if ((*it)->contains(*part)) - obsolete = true; + if ((*it)->info.contains(part_info)) + covering_part = *it; break; } - check_replacing_part_state(*it); - replaced.push_back(*it); + replaced_iterators.push_back(it); ++it; -// replaced.push_back(*it); -// (*it)->remove_time = time(nullptr); -// (*it)->state = replaced_parts_state; -// removePartContributionToColumnSizes(*it); -// data_parts.erase(it++); } - if (obsolete) + if (covering_part) { - LOG_WARNING(log, "Obsolete part " << part->name << " added"); + LOG_WARNING(log, "Tried to add obsolete part " << part_name << " covered by " << covering_part->getNameWithState()); + + /// It is a temporary part, we want to delete it from filesystem immediately + /// Other fields remain the same part->remove_time = time(nullptr); - /// I case of fail, we want to delete part from filesystem immediately (to avoid any conflicts) part->is_temp = true; + + /// Nothing to commit or rollback + if (out_transaction) + { + out_transaction->data = this; + out_transaction->parts_to_add_on_rollback = {}; + out_transaction->parts_to_remove_on_rollback = {}; + } + + /// We replaced nothing + return {}; + } + + /// All checks are passed. Now we can rename the part on disk. + /// So, we maintain invariant: if a non-temporary part in filesystem then it is in data_parts + /// + /// Ordinary MergeTree engines (they don't use out_transaction) commit parts immediately, + /// whereas ReplicatedMergeTree uses intermediate PreCommitted state + part->name = part_name; + part->info = part_info; + part->is_temp = false; + part->state = (out_transaction) ? DataPartState::PreCommitted : DataPartState::Committed; + part->renameTo(part_name); + + data_parts_indexes.insert(part); + + replaced_parts.reserve(replaced_iterators.size()); + for (auto it_replacing_part : replaced_iterators) + replaced_parts.emplace_back(*it_replacing_part); + + if (!out_transaction) + { + addPartContributionToColumnSizes(part); + + auto current_time = time(nullptr); + for (auto it_replacing_part : replaced_iterators) + { + (*it_replacing_part)->remove_time = current_time; + modifyPartState(it_replacing_part, DataPartState::Outdated); + removePartContributionToColumnSizes(*it_replacing_part); + } } else { - /// Now we can rename part on filesystem - part->is_temp = false; - part->renameTo(new_name); - - if (!out_transaction) - { - /// Ordinary MergeTree engines (they don't use out_transaction) commit parts immediately - part->state = DataPartState::Committed; - addPartContributionToColumnSizes(part); - } - else - { - /// Whereas ReplicatedMergeTree uses intermediate PreCommitted state - part->state = DataPartState::PreCommitted; - } - - data_parts.insert(part); - - auto current_time = time(nullptr); - for (auto & replacing_part : replaced) - { - if (!out_transaction) - { - replacing_part->remove_time = current_time; - replacing_part->state = DataPartState::Outdated; - removePartContributionToColumnSizes(replacing_part); - } - } + out_transaction->data = this; + out_transaction->parts_to_add_on_rollback = replaced_parts; + out_transaction->parts_to_remove_on_rollback = {part}; } } - if (out_transaction) - { - out_transaction->data = this; - out_transaction->parts_to_add_on_rollback = replaced; - out_transaction->parts_to_remove_on_rollback = {part}; - } - - return replaced; + return replaced_parts; } void MergeTreeData::removePartsFromWorkingSet(const DataPartsVector & remove, bool 
clear_without_timeout) @@ -1479,7 +1526,7 @@ void MergeTreeData::removePartsFromWorkingSet(const DataPartsVector & remove, bo for (auto & part : remove) { - if (!data_parts.count(part)) + if (!data_parts_by_name.count(part->info)) throw Exception("Part " + part->getNameWithState() + " not found in data_parts", ErrorCodes::LOGICAL_ERROR); part->assertState({DataPartState::PreCommitted, DataPartState::Committed, DataPartState::Outdated}); @@ -1490,7 +1537,8 @@ void MergeTreeData::removePartsFromWorkingSet(const DataPartsVector & remove, bo { if (part->state == DataPartState::Committed) removePartContributionToColumnSizes(part); - part->state = DataPartState::Outdated; + + modifyPartState(part, DataPartState::Outdated); part->remove_time = remove_time; } } @@ -1502,65 +1550,93 @@ void MergeTreeData::renameAndDetachPart(const DataPartPtr & part_to_detach, cons LOG_INFO(log, "Renaming " << part_to_detach->relative_path << " to " << prefix << part_to_detach->name << " and detaching it."); std::lock_guard lock(data_parts_mutex); - //std::lock_guard lock_all(all_data_parts_mutex); - auto it_part = data_parts.find(part_to_detach); - if (it_part == data_parts.end()) + auto it_part = data_parts_by_name.find(part_to_detach->info); + if (it_part == data_parts_by_name.end()) throw Exception("No such data part " + part_to_detach->getNameWithState(), ErrorCodes::NO_SUCH_DATA_PART); /// What if part_to_detach is reference to *it_part? Make a new owner just in case. - auto part = *it_part; + DataPartPtr part = *it_part; - removePartContributionToColumnSizes(part); - part->state = DataPartState::Deleting; + if (part->state == DataPartState::Committed) + removePartContributionToColumnSizes(part); + modifyPartState(it_part, DataPartState::Deleting); if (move_to_detached || !prefix.empty()) part->renameAddPrefix(move_to_detached, prefix); + data_parts_indexes.erase(it_part); + + if (restore_covered && part->info.level == 0) + { + LOG_WARNING(log, "Will not recover parts covered by zero-level part " << part->name); + return; + } if (restore_covered) { - auto suitable_parts = getDataPartsRange({DataPartState::PreCommitted, DataPartState::Committed, DataPartState::Outdated}); - auto it = suitable_parts.convert(data_parts.lower_bound(part)); - Strings restored; bool error = false; + String error_parts; Int64 pos = part->info.min_block; - if (it != suitable_parts.begin()) + auto is_appropriate_state = [] (DataPartState state) { - --it; - if (part->contains(**it)) + return state == DataPartState::Committed || state == DataPartState::Outdated; + }; + + auto update_error = [&] (DataPartIteratorByAndName it) + { + error = true; + error_parts += (*it)->getNameWithState() + " "; + }; + + auto it_middle = data_parts_by_name.lower_bound(part->info); + + /// Restore the leftmost part covered by the part + if (it_middle != data_parts_by_name.begin()) + { + auto it = std::prev(it_middle); + + if (part->contains(**it) && is_appropriate_state((*it)->state)) { + /// Maybe, we must consider part level somehow if ((*it)->info.min_block != part->info.min_block) - error = true; + update_error(it); if ((*it)->state != DataPartState::Committed) { addPartContributionToColumnSizes(*it); - (*it)->state = DataPartState::Committed; + modifyPartState(it, DataPartState::Committed); // iterator is not invalidated here } pos = (*it)->info.max_block + 1; restored.push_back((*it)->name); } else - error = true; - ++it; + update_error(it); } else error = true; - for (; it != suitable_parts.end() && part->contains(**it); ++it) + /// Restore "right" 
parts + for (auto it = it_middle; it != data_parts_by_name.end() && part->contains(**it); ++it) { if ((*it)->info.min_block < pos) continue; + + if (!is_appropriate_state((*it)->state)) + { + update_error(it); + continue; + } + if ((*it)->info.min_block > pos) - error = true; + update_error(it); if ((*it)->state != DataPartState::Committed) { addPartContributionToColumnSizes(*it); - (*it)->state = DataPartState::Committed; + modifyPartState(it, DataPartState::Committed); } pos = (*it)->info.max_block + 1; @@ -1576,18 +1652,24 @@ void MergeTreeData::renameAndDetachPart(const DataPartPtr & part_to_detach, cons } if (error) - LOG_ERROR(log, "The set of parts restored in place of " << part->name << " looks incomplete. There might or might not be a data loss."); + { + LOG_ERROR(log, "The set of parts restored in place of " << part->name << " looks incomplete." + << " There might or might not be a data loss." + << (error_parts.empty() ? "" : " Suspicious parts: " + error_parts)); + } } } size_t MergeTreeData::getTotalActiveSizeInBytes() const { - std::lock_guard lock(data_parts_mutex); - size_t res = 0; - for (auto & part : getDataPartsRange({DataPartState::Committed})) - res += part->size_in_bytes; + { + std::lock_guard lock(data_parts_mutex); + + for (auto & part : getDataPartsStateRange(DataPartState::Committed)) + res += part->size_in_bytes; + } return res; } @@ -1601,7 +1683,7 @@ size_t MergeTreeData::getMaxPartsCountForPartition() const size_t cur_count = 0; const String * cur_partition_id = nullptr; - for (const auto & part : getDataPartsRange({DataPartState::Committed})) + for (const auto & part : getDataPartsStateRange(DataPartState::Committed)) { if (cur_partition_id && part->info.partition_id == *cur_partition_id) { @@ -1656,11 +1738,12 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & std::lock_guard lock(data_parts_mutex); - /// The part can be covered only by the previous or the next one in data_parts. - auto committed_parts = getDataPartsRange({DataPartState::Committed}); - auto it = committed_parts.convert(data_parts.lower_bound(part_info)); + auto committed_parts_range = getDataPartsStateRange(DataPartState::Committed); - if (it != committed_parts.end()) + /// The part can be covered only by the previous or the next one in data_parts. 
+ auto it = data_parts_by_state_and_name.lower_bound(DataPartStateAndInfo(DataPartState::Committed, part_info)); + + if (it != committed_parts_range.end()) { if ((*it)->name == part_name) return *it; @@ -1668,7 +1751,7 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & return *it; } - if (it != committed_parts.begin()) + if (it != committed_parts_range.begin()) { --it; if ((*it)->info.contains(part_info)) @@ -1685,10 +1768,15 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na std::lock_guard lock(data_parts_mutex); - auto filtered_parts = getDataPartsRange(valid_states); - auto it = filtered_parts.convert(data_parts.find(part_info)); - if (it != filtered_parts.end() && (*it)->name == part_name) - return *it; + auto it = data_parts_by_name.find(part_info); + if (it == data_parts_by_name.end()) + return nullptr; + + for (auto state : valid_states) + { + if ((*it)->state == state) + return *it; + } return nullptr; } @@ -1742,7 +1830,8 @@ void MergeTreeData::calculateColumnSizesImpl() column_sizes.clear(); /// Take into account only committed parts - for (const auto & part : getDataPartsRange({DataPartState::Committed})) + auto committed_parts_range = getDataPartsStateRange(DataPartState::Committed); + for (const auto & part : committed_parts_range) addPartContributionToColumnSizes(part); } @@ -1953,7 +2042,7 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context String partition_id = partition.getID(*this); { - std::lock_guard data_parts_lock(data_parts_mutex); + std::unique_lock data_parts_lock(data_parts_mutex); DataPartPtr existing_part_in_partition = getAnyPartInPartition(partition_id, data_parts_lock); if (existing_part_in_partition && existing_part_in_partition->partition.value != partition.value) { @@ -1969,28 +2058,48 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context return partition_id; } -MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(const DataPartStates & affordable_states) const +MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(const DataPartStates & affordable_states, DataPartStateVector * out_states) const { DataPartsVector res; + DataPartsVector buf; { std::lock_guard lock(data_parts_mutex); - std::copy_if(data_parts.begin(), data_parts.end(), std::back_inserter(res), DataPart::getStatesFilter(affordable_states)); + + for (auto state : affordable_states) + { + buf = std::move(res); + res.clear(); + + auto range = getDataPartsStateRange(state); + std::merge(range.begin(), range.end(), buf.begin(), buf.end(), std::back_inserter(res), LessDataPart()); + } + + if (out_states != nullptr) + { + out_states->resize(res.size()); + for (size_t i = 0; i < res.size(); ++i) + (*out_states)[i] = res[i]->state; + } } + return res; } -MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(const MergeTreeData::DataPartStates & affordable_states, - MergeTreeData::DataPartStateVector & out_states_snapshot) const +MergeTreeData::DataPartsVector MergeTreeData::getAllDataPartsVector(MergeTreeData::DataPartStateVector * out_states) const { DataPartsVector res; { std::lock_guard lock(data_parts_mutex); - std::copy_if(data_parts.begin(), data_parts.end(), std::back_inserter(res), DataPart::getStatesFilter(affordable_states)); + res.assign(data_parts_by_name.begin(), data_parts_by_name.end()); - out_states_snapshot.resize(res.size()); - for (size_t i = 0; i < res.size(); ++i) - out_states_snapshot[i] = res[i]->state; + if (out_states 
!= nullptr) + { + out_states->resize(res.size()); + for (size_t i = 0; i < res.size(); ++i) + (*out_states)[i] = res[i]->state; + } } + return res; } @@ -1999,7 +2108,11 @@ MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affo DataParts res; { std::lock_guard lock(data_parts_mutex); - std::copy_if(data_parts.begin(), data_parts.end(), std::inserter(res, res.end()), DataPart::getStatesFilter(affordable_states)); + for (auto state : affordable_states) + { + auto range = getDataPartsStateRange(state); + res.insert(range.begin(), range.end()); + } } return res; } @@ -2014,28 +2127,23 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector() const return getDataPartsVector({DataPartState::Committed}); } -MergeTreeData::DataParts MergeTreeData::getAllDataParts() const -{ - return getDataParts({DataPartState::PreCommitted, DataPartState::Committed, DataPartState::Outdated}); -} - MergeTreeData::DataPartPtr MergeTreeData::getAnyPartInPartition( - const String & partition_id, std::lock_guard & data_parts_lock) + const String & partition_id, std::unique_lock & data_parts_lock) { auto min_block = std::numeric_limits::min(); MergeTreePartInfo dummy_part_info(partition_id, min_block, min_block, 0); - auto committed_parts = getDataPartsRange({DataPartState::Committed}); - auto it = committed_parts.convert(data_parts.lower_bound(dummy_part_info)); + auto it = data_parts_by_state_and_name.lower_bound(DataPartStateAndInfo(DataPartState::Committed, dummy_part_info)); - if (it != committed_parts.end() && (*it)->info.partition_id == partition_id) + if (it != data_parts_by_state_and_name.end() && (*it)->state == DataPartState::Committed && (*it)->info.partition_id == partition_id) return *it; - return {}; + + return nullptr; } void MergeTreeData::Transaction::rollback() { - if (data && (!parts_to_remove_on_rollback.empty() || !parts_to_add_on_rollback.empty())) + if (!isEmpty()) { std::stringstream ss; if (!parts_to_remove_on_rollback.empty()) @@ -2057,14 +2165,19 @@ void MergeTreeData::Transaction::rollback() /// PreCommitted -> Outdated replaceParts(DataPartState::Outdated, DataPartState::Committed, true); - clear(); } + + clear(); } void MergeTreeData::Transaction::commit() { - /// PreCommitted -> Committed, Committed -> Outdated - replaceParts(DataPartState::Committed, DataPartState::Outdated, false); + if (!isEmpty()) + { + /// PreCommitted -> Committed, Committed -> Outdated + replaceParts(DataPartState::Committed, DataPartState::Outdated, false); + } + clear(); } @@ -2088,9 +2201,9 @@ void MergeTreeData::Transaction::replaceParts(MergeTreeData::DataPartState move_ /// If it is rollback then do nothing, else make it Outdated and remove their size contribution if (move_committed_to != DataPartState::Committed) { - for (auto & part : committed_parts) + for (const DataPartPtr & part : committed_parts) { - part->state = move_committed_to; + data->modifyPartState(part, move_committed_to); part->remove_time = remove_time; data->removePartContributionToColumnSizes(part); } @@ -2099,7 +2212,7 @@ void MergeTreeData::Transaction::replaceParts(MergeTreeData::DataPartState move_ /// If it is rollback just change state to Outdated, else change state to Committed and add their size contribution for (auto & part : precommitted_parts) { - part->state = move_precommitted_to; + data->modifyPartState(part, move_precommitted_to); if (move_precommitted_to == DataPartState::Committed) data->addPartContributionToColumnSizes(part); else diff --git 
a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index 2235a73dbf1..fe793c5da9c 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -15,7 +15,10 @@ #include #include -#include +#include +#include +#include +#include namespace DB { @@ -104,7 +107,16 @@ public: using DataPartStates = std::initializer_list; using DataPartStateVector = std::vector; - struct DataPartPtrLess + /// Auxiliary structure for index comparison. Keep in mind lifetime of MergeTreePartInfo. + struct DataPartStateAndInfo + { + DataPartState state; + const MergeTreePartInfo & info; + + DataPartStateAndInfo(DataPartState state, const MergeTreePartInfo & info) : state(state), info(info) {} + }; + + struct LessDataPart { using is_transparent = void; @@ -113,11 +125,32 @@ public: bool operator()(const DataPartPtr & lhs, const DataPartPtr & rhs) const { return lhs->info < rhs->info; } }; - using DataParts = std::set; + struct LessStateDataPart + { + using is_transparent = void; + + bool operator() (const DataPartStateAndInfo & lhs, const DataPartStateAndInfo & rhs) const + { + return std::forward_as_tuple(static_cast(lhs.state), lhs.info) + < std::forward_as_tuple(static_cast(rhs.state), rhs.info); + } + + bool operator() (DataPartStateAndInfo info, const DataPartState & state) const + { + return static_cast(info.state) < static_cast(state); + } + + bool operator() (const DataPartState & state, DataPartStateAndInfo info) const + { + return static_cast(state) < static_cast(info.state); + } + }; + + using DataParts = std::set; using DataPartsVector = std::vector; /// For resharding. - using MutableDataParts = std::set; + using MutableDataParts = std::set; using PerShardDataParts = std::unordered_map; /// Some operations on the set of parts return a Transaction object. @@ -131,6 +164,11 @@ public: void rollback(); + bool isEmpty() const + { + return parts_to_add_on_rollback.empty() && parts_to_remove_on_rollback.empty(); + } + ~Transaction() { try @@ -310,22 +348,17 @@ public: /// Returns a copy of the list so that the caller shouldn't worry about locks. DataParts getDataParts(const DataPartStates & affordable_states) const; - DataPartsVector getDataPartsVector(const DataPartStates & affordable_states) const; - DataPartsVector getDataPartsVector(const DataPartStates & affordable_states, DataPartStateVector & out_states_snapshot) const; + /// Returns sorted list of the parts with specified states + /// out_states will contain snapshot of each part state + DataPartsVector getDataPartsVector(const DataPartStates & affordable_states, DataPartStateVector * out_states = nullptr) const; - /// Returns a virtual container iteration only through parts with specified states - decltype(auto) getDataPartsRange(const DataPartStates & affordable_states) const - { - return createRangeFiltered(DataPart::getStatesFilter(affordable_states), data_parts); - } + /// Returns absolutely all parts (and snapshot of their states) + DataPartsVector getAllDataPartsVector(DataPartStateVector * out_states = nullptr) const; /// Returns Committed parts DataParts getDataParts() const; DataPartsVector getDataPartsVector() const; - /// Returns all parts except Temporary and Deleting ones - DataParts getAllDataParts() const; - /// Returns an comitted part with the given name or a part containing it. If there is no such part, returns nullptr. 
DataPartPtr getActiveContainingPart(const String & part_name); @@ -375,8 +408,8 @@ public: /// Removes parts from data_parts, they should be in Deleting state void removePartsFinally(const DataPartsVector & parts); - /// Delete irrelevant parts. - void clearOldParts(); + /// Delete irrelevant parts from memory and disk. + void clearOldPartsFromFilesystem(); /// Deleate all directories which names begin with "tmp" /// Set non-negative parameter value to override MergeTreeSettings temporary_directories_lifetime @@ -538,15 +571,81 @@ private: String log_name; Logger * log; - /// Current set of data parts. - DataParts data_parts; - mutable std::mutex data_parts_mutex; - /// The set of all data parts including already merged but not yet deleted. Usually it is small (tens of elements). - /// The part is referenced from here, from the list of current parts and from each thread reading from it. - /// This means that if reference count is 1 - the part is not used right now and can be deleted. -// DataParts all_data_parts; -// mutable std::mutex all_data_parts_mutex; + /// Work with data parts + + struct TagByName{}; + struct TagByStateAndName{}; + + static const MergeTreePartInfo & dataPartPtrToInfo(const DataPartPtr & part) + { + return part->info; + } + + static DataPartStateAndInfo dataPartPtrToStateAndInfo(const DataPartPtr & part) + { + return {part->state, part->info}; + }; + + using DataPartsIndexes = boost::multi_index_container, + boost::multi_index::global_fun + >, + /// Index by (State, Name), is used to obtain ordered slices of parts with the same state + boost::multi_index::ordered_unique< + boost::multi_index::tag, + boost::multi_index::global_fun, + LessStateDataPart + > + > + >; + + /// Current set of data parts. + mutable std::mutex data_parts_mutex; + DataPartsIndexes data_parts_indexes; + DataPartsIndexes::index::type & data_parts_by_name; + DataPartsIndexes::index::type & data_parts_by_state_and_name; + + using DataPartIteratorByAndName = DataPartsIndexes::index::type::iterator; + using DataPartIteratorByStateAndName = DataPartsIndexes::index::type::iterator; + + boost::iterator_range getDataPartsStateRange(DataPartState state) const + { + auto begin = data_parts_by_state_and_name.lower_bound(state, LessStateDataPart()); + auto end = data_parts_by_state_and_name.upper_bound(state, LessStateDataPart()); + return {begin, end}; + } + + static decltype(auto) getStateModifier(DataPartState state) + { + return [state] (const DataPartPtr & part) { part->state = state; }; + } + + void modifyPartState(DataPartIteratorByStateAndName it, DataPartState state) + { + if (!data_parts_by_state_and_name.modify(it, getStateModifier(state))) + throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR); + } + + void modifyPartState(DataPartIteratorByAndName it, DataPartState state) + { + if (!data_parts_by_state_and_name.modify(data_parts_indexes.project(it), getStateModifier(state))) + throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR); + } + + void modifyPartState(const DataPartPtr & part, DataPartState state) + { + auto it = data_parts_by_name.find(part->info); + if (it == data_parts_by_name.end() || (*it).get() != part.get()) + throw Exception("Part " + part->name + " is not exists", ErrorCodes::LOGICAL_ERROR); + + if (!data_parts_by_state_and_name.modify(data_parts_indexes.project(it), getStateModifier(state))) + throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR); + } + /// Used to serialize 
calls to grabOldParts. std::mutex grab_old_parts_mutex; @@ -582,7 +681,7 @@ private: void removePartContributionToColumnSizes(const DataPartPtr & part); /// If there is no part in the partition with ID `partition_id`, returns empty ptr. Should be called under the lock. - DataPartPtr getAnyPartInPartition(const String & partition_id, std::lock_guard & data_parts_lock); + DataPartPtr getAnyPartInPartition(const String & partition_id, std::unique_lock & data_parts_lock); }; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 280e1cc30a6..cc8094b8ee5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -187,7 +187,7 @@ bool MergeTreeDataMerger::selectPartsToMerge( if (prev_part && part->info.partition_id == (*prev_part)->info.partition_id && part->info.min_block < (*prev_part)->info.max_block) { - LOG_ERROR(log, "Part " << part->name << " intersects previous part " << (*prev_part)->name); + LOG_ERROR(log, "Part " << part->getNameWithState() << " intersects previous part " << (*prev_part)->getNameWithState()); } prev_part = ∂ diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp index a893a8d26d3..d8e35552065 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp @@ -38,6 +38,7 @@ namespace ErrorCodes extern const int FORMAT_VERSION_TOO_OLD; extern const int UNKNOWN_FORMAT; extern const int UNEXPECTED_FILE_IN_DATA_PART; + extern const int NOT_FOUND_EXPECTED_DATA_PART; } @@ -935,4 +936,16 @@ String MergeTreeDataPart::stateString() const return stateToString(state); } +void MergeTreeDataPart::assertState(const std::initializer_list & affordable_states) const +{ + if (!checkState(affordable_states)) + { + String states_str; + for (auto state : affordable_states) + states_str += stateToString(state) + " "; + + throw Exception("Unexpected state of part " + getNameWithState() + ". Expected: " + states_str, ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART); + } +} + } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPart.h b/dbms/src/Storages/MergeTree/MergeTreeDataPart.h index 1863fcbc0f2..b767eb6414b 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPart.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPart.h @@ -190,17 +190,7 @@ struct MergeTreeDataPart } /// Throws an exception if state of the part is not in affordable_states - void assertState(const std::initializer_list & affordable_states) const - { - if (!checkState(affordable_states)) - { - String states_str; - for (auto state : affordable_states) - states_str += stateToString(state) + " "; - - throw Exception("Unexpected state of part " + getNameWithState() + ". 
Expected: " + states_str); - } - } + void assertState(const std::initializer_list & affordable_states) const; /// In comparison with lambdas, it is move assignable and could has several overloaded operator() struct StatesFilter @@ -327,4 +317,7 @@ private: void checkConsistency(bool require_part_metadata); }; + +using MergeTreeDataPartState = MergeTreeDataPart::State; + } diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 035d01de38d..c11ee29b10a 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -68,7 +68,7 @@ StorageMergeTree::StorageMergeTree( } else { - data.clearOldParts(); + data.clearOldPartsFromFilesystem(); } /// Temporary directories contain incomplete results of merges (after forced restart) @@ -188,7 +188,7 @@ void StorageMergeTree::alter( if (primary_key_is_modified && supportsSampling()) throw Exception("MODIFY PRIMARY KEY only supported for tables without sampling key", ErrorCodes::BAD_ARGUMENTS); - MergeTreeData::DataParts parts = data.getAllDataParts(); + auto parts = data.getDataParts({MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated}); for (const MergeTreeData::DataPartPtr & part : parts) { if (auto transaction = data.alterDataPart(part, columns_for_parts, new_primary_key_ast, false)) @@ -291,7 +291,7 @@ bool StorageMergeTree::merge( /// Clear old parts. It does not matter to do it more frequently than each second. if (auto lock = time_after_previous_cleanup.lockTestAndRestartAfter(1)) { - data.clearOldParts(); + data.clearOldPartsFromFilesystem(); data.clearOldTemporaryDirectories(); } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 8e745721434..55f72ca520b 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -109,6 +109,7 @@ namespace ErrorCodes extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS; extern const int TOO_MUCH_FETCHES; extern const int BAD_DATA_PART_NAME; + extern const int PART_IS_TEMPORARILY_LOCKED; } @@ -800,7 +801,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks) /// Parts in ZK. NameSet expected_parts(expected_parts_vec.begin(), expected_parts_vec.end()); - MergeTreeData::DataParts parts = data.getAllDataParts(); + auto parts = data.getDataParts({MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated}); /// Local parts that are not in ZK. MergeTreeData::DataParts unexpected_parts; @@ -1179,7 +1180,21 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) if (!do_fetch) { merger.renameMergedTemporaryPart(part, parts, &transaction); - getZooKeeper()->multi(ops); /// After long merge, get fresh ZK handle, because previous session may be expired. + + /// Do not commit if the part is obsolete + if (!transaction.isEmpty()) + { + getZooKeeper()->multi(ops); /// After long merge, get fresh ZK handle, because previous session may be expired. + transaction.commit(); + } + + /** Removing old chunks from ZK and from the disk is delayed - see ReplicatedMergeTreeCleanupThread, clearOldParts. + */ + + /** With `ZCONNECTIONLOSS` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts. + * This is not a problem, because in this case the merge will remain in the queue, and we will try again. 
+ */ + merge_selecting_event.set(); if (auto part_log = context.getPartLog(database_name, table_name)) { @@ -1212,15 +1227,6 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) } } - /** Removing old chunks from ZK and from the disk is delayed - see ReplicatedMergeTreeCleanupThread, clearOldParts. - */ - - /** With `ZCONNECTIONLOSS` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts. - * This is not a problem, because in this case the merge will remain in the queue, and we will try again. - */ - transaction.commit(); - merge_selecting_event.set(); - ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges); } } @@ -1443,8 +1449,9 @@ void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr /// It's important that no old parts remain (after the merge), because otherwise, /// after adding a new replica, this new replica downloads them, but does not delete them. /// And, if you do not, the parts will come to life after the server is restarted. - /// Therefore, we use getAllDataParts. - auto parts = data.getAllDataParts(); + /// Therefore, we use all data parts. + auto parts = data.getDataParts({MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated}); + for (const auto & part : parts) { if (!entry_part_info.contains(part->info)) @@ -1616,6 +1623,11 @@ bool StorageReplicatedMergeTree::queueTask() /// Interrupted merge or downloading a part is not an error. LOG_INFO(log, e.message()); } + else if (e.code() == ErrorCodes::PART_IS_TEMPORARILY_LOCKED) + { + /// Part cannot be added temporarily + LOG_INFO(log, e.displayText()); + } else tryLogCurrentException(__PRETTY_FUNCTION__); @@ -2205,6 +2217,13 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin MergeTreeData::Transaction transaction; auto removed_parts = data.renameTempPartAndReplace(part, nullptr, &transaction); + /// Do not commit if the part is obsolete + if (!transaction.isEmpty()) + { + getZooKeeper()->multi(ops); + transaction.commit(); + } + if (auto part_log = context.getPartLog(database_name, table_name)) { PartLogElement elem; @@ -2236,10 +2255,6 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin } } - - getZooKeeper()->multi(ops); - transaction.commit(); - /** If a quorum is tracked for this part, you must update it. * If you do not have time, in case of losing the session, when you restart the server - see the `ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart` method. */ diff --git a/dbms/src/Storages/System/StorageSystemParts.cpp b/dbms/src/Storages/System/StorageSystemParts.cpp index aba6db5fbbc..02cad078f02 100644 --- a/dbms/src/Storages/System/StorageSystemParts.cpp +++ b/dbms/src/Storages/System/StorageSystemParts.cpp @@ -39,7 +39,7 @@ StorageSystemParts::StorageSystemParts(const std::string & name_) {"database", std::make_shared()}, {"table", std::make_shared()}, - {"engine", std::make_shared()}, + {"engine", std::make_shared()} } { } @@ -53,9 +53,12 @@ BlockInputStreams StorageSystemParts::read( const size_t max_block_size, const unsigned num_streams) { - check(column_names); + //check(column_names); processed_stage = QueryProcessingStage::FetchColumns; + auto it_state_column = std::find(column_names.begin(), column_names.end(), "_state"); + bool has_state_column = it_state_column != column_names.end(); + /// Will apply WHERE to subset of columns and then add more columns. 
/// This is kind of complicated, but we use WHERE to do less work. @@ -142,6 +145,8 @@ BlockInputStreams StorageSystemParts::read( /// Finally, create the result. Block block = getSampleBlock(); + if (has_state_column) + block.insert(ColumnWithTypeAndName(std::make_shared(), "_state")); for (size_t i = 0; i < filtered_database_column->size();) { @@ -198,10 +203,18 @@ BlockInputStreams StorageSystemParts::read( using State = MergeTreeDataPart::State; MergeTreeData::DataPartStateVector all_parts_state; MergeTreeData::DataPartsVector all_parts; + if (need[0]) - all_parts = data->getDataPartsVector({State::Committed, State::Outdated}, all_parts_state); + { + /// If has_state_column is requested, return all states + if (!has_state_column) + all_parts = data->getDataPartsVector({State::Committed, State::Outdated}, &all_parts_state); + else + all_parts = data->getAllDataPartsVector(&all_parts_state); + } else - all_parts = data->getDataPartsVector({State::Committed}, all_parts_state); + all_parts = data->getDataPartsVector({State::Committed}, &all_parts_state); + /// Finally, we'll go through the list of parts. for (size_t part_number = 0; part_number < all_parts.size(); ++part_number) @@ -248,11 +261,30 @@ BlockInputStreams StorageSystemParts::read( block.getByPosition(i++).column->insert(database); block.getByPosition(i++).column->insert(table); block.getByPosition(i++).column->insert(engine); + + if (has_state_column) + block.getByPosition(i++).column->insert(part->stateString()); } } return BlockInputStreams(1, std::make_shared(block)); } +NameAndTypePair StorageSystemParts::getColumn(const String & column_name) const +{ + if (column_name == "_state") + return NameAndTypePair("_state", std::make_shared()); + + return ITableDeclaration::getColumn(column_name); +} + +bool StorageSystemParts::hasColumn(const String & column_name) const +{ + if (column_name == "_state") + return true; + + return ITableDeclaration::hasColumn(column_name); +} + } diff --git a/dbms/src/Storages/System/StorageSystemParts.h b/dbms/src/Storages/System/StorageSystemParts.h index 17c6a7f4e5c..09b14d72e56 100644 --- a/dbms/src/Storages/System/StorageSystemParts.h +++ b/dbms/src/Storages/System/StorageSystemParts.h @@ -21,6 +21,10 @@ public: const NamesAndTypesList & getColumnsListImpl() const override { return columns; } + NameAndTypePair getColumn(const String & column_name) const override; + + bool hasColumn(const String & column_name) const override; + BlockInputStreams read( const Names & column_names, const SelectQueryInfo & query_info, diff --git a/dbms/src/Storages/tests/gtest_range_filtered.cpp b/dbms/src/Storages/tests/gtest_range_filtered.cpp deleted file mode 100644 index 1a3b82f1a68..00000000000 --- a/dbms/src/Storages/tests/gtest_range_filtered.cpp +++ /dev/null @@ -1,44 +0,0 @@ -#include -#include -#include -#include - - -TEST(RangeFiltered, simple) -{ - std::vector v; - - for (int i = 0; i < 10; ++i) - v.push_back(i); - - auto v30 = createRangeFiltered([] (int i) { return i % 3 == 0; }, v); - auto v31 = createRangeFiltered([] (int i) { return i % 3 != 0; }, v); - - for (const int & i : v30) - ASSERT_EQ(i % 3, 0); - - for (const int & i : v31) - ASSERT_NE(i % 3, 0); - - { - auto it = v30.begin(); - ASSERT_EQ(*it, 0); - - auto it2 = std::next(it); - ASSERT_EQ(*it2, 3); - - it = std::next(it2); - ASSERT_EQ(*it, 6); - } - - { - auto it = std::next(v30.begin()); - ASSERT_EQ(*it, 3); - - *it = 2; /// it becomes invalid - ASSERT_EQ(*(++it), 6); /// but iteration is sucessfull - - *v30.begin() = 1; - 
ASSERT_EQ(*v30.begin(), 6); - } -} diff --git a/dbms/tests/integration/test_random_inserts/test.py b/dbms/tests/integration/test_random_inserts/test.py index d9325c91191..bfa5c451f44 100644 --- a/dbms/tests/integration/test_random_inserts/test.py +++ b/dbms/tests/integration/test_random_inserts/test.py @@ -26,6 +26,7 @@ def started_cluster(): pass cluster.shutdown() + def test_random_inserts(started_cluster): # Duration of the test, reduce it if don't want to wait DURATION_SECONDS = 10# * 60 @@ -55,7 +56,9 @@ def test_random_inserts(started_cluster): inserter.get_answer() answer="{}\t{}\t{}\t{}\n".format(num_timestamps, num_timestamps, min_timestamp, max_timestamp) + for node in nodes: - assert TSV(node.query("SELECT count(), uniqExact(i), min(i), max(i) FROM simple")) == TSV(answer), node.name + " : " + node.query("SELECT groupArray(_part), i, count() AS c FROM simple GROUP BY i ORDER BY c DESC LIMIT 1") + res = node.query("SELECT count(), uniqExact(i), min(i), max(i) FROM simple") + assert TSV(res) == TSV(answer), node.name + " : " + node.query("SELECT groupArray(_part), i, count() AS c FROM simple GROUP BY i ORDER BY c DESC LIMIT 1") node1.query("""DROP TABLE simple ON CLUSTER test_cluster""") diff --git a/dbms/tests/integration/test_random_inserts/test.sh b/dbms/tests/integration/test_random_inserts/test.sh index d743ffe4e91..006ee673fe9 100755 --- a/dbms/tests/integration/test_random_inserts/test.sh +++ b/dbms/tests/integration/test_random_inserts/test.sh @@ -4,6 +4,7 @@ [[ -n "$1" ]] && host="$1" || host="127.0.0.1" [[ -n "$2" ]] && min_timestamp="$2" || min_timestamp=$(( $(date +%s) - 10 )) [[ -n "$3" ]] && max_timestamp="$3" || max_timestamp=$(( $(date +%s) + 10 )) +[[ -n "$4" ]] && iters_per_timestamp="$4" || iters_per_timestamp=1 timestamps=`seq $min_timestamp $max_timestamp` @@ -40,6 +41,6 @@ for i in $timestamps; do cur_timestamp=$(date +%s) done - #echo $i >> $host".txt" reliable_insert "$i" -done \ No newline at end of file +done +sleep 1 diff --git a/libs/libcommon/include/common/RangeFiltered.h b/libs/libcommon/include/common/RangeFiltered.h deleted file mode 100644 index cdb8f902409..00000000000 --- a/libs/libcommon/include/common/RangeFiltered.h +++ /dev/null @@ -1,127 +0,0 @@ -#pragma once -#include - - -/// Similar to boost::filtered_range but a little bit easier and allows to convert ordinary iterators to filtered -template -struct RangeFiltered -{ - /// Template parameter C may be const. Then const_iterator is used. - using RawIterator = decltype(std::declval().begin()); - class Iterator; - - /// Will iterate over elements for which filter(*it) == true - template /// Another template for universal references to work. 
- RangeFiltered(F_ && filter, C_ && container) - : filter(std::move(filter)), container(container) {} - - Iterator begin() const - { - return Iterator{*this, std::begin(container)}; - } - - Iterator end() const - { - return Iterator{*this, std::end(container)}; - } - - /// Convert ordinary iterator to filtered one - /// Real position will be in range [ordinary_iterator; end()], so it is suitable to use with lower[upper]_bound() - inline Iterator convert(RawIterator ordinary_iterator) const - { - return Iterator{*this, ordinary_iterator}; - } - - - /// It is similar to boost::filtered_iterator, but has additional features: - /// it doesn't store end() iterator - /// it doesn't store predicate, so it allows to implement operator=() - /// it guarantees that operator++() works properly in case of filter(*it) == false - class Iterator - { - public: - using Range = RangeFiltered; - - typedef Iterator self_type; - typedef typename std::iterator_traits::value_type value_type; - typedef typename std::iterator_traits::reference reference; - typedef const value_type & const_reference; - typedef typename std::iterator_traits::pointer pointer; - typedef const value_type * const_pointer; - typedef typename std::iterator_traits::difference_type difference_type; - typedef std::bidirectional_iterator_tag iterator_category; - - Iterator(const Range & range_, RawIterator iter_) - : range(&range_), iter(iter_) - { - for (; iter != std::end(range->container) && !range->filter(*iter); ++iter); - } - - Iterator(const Iterator & rhs) = default; - Iterator(Iterator && rhs) noexcept = default; - - Iterator operator++() - { - ++iter; - for (; iter != std::end(range->container) && !range->filter(*iter); ++iter); - return *this; - } - - Iterator operator--() - { - --iter; - for (; !range->filter(*iter); --iter); /// Don't check std::begin() bound - return *this; - } - - pointer operator->() - { - return iter.operator->(); - } - - const_pointer operator->() const - { - return iter.operator->(); - } - - reference operator*() - { - return *iter; - } - - const_reference operator*() const - { - return *iter; - } - - bool operator==(const self_type & rhs) const - { - return iter == rhs.iter; - } - - bool operator!=(const self_type & rhs) const - { - return iter != rhs.iter; - } - - self_type & operator=(const self_type & rhs) = default; - self_type & operator=(self_type && rhs) noexcept = default; - - ~Iterator() = default; - - private: - const Range * range = nullptr; - RawIterator iter; - }; - -protected: - F filter; - C & container; -}; - - -template -inline RangeFiltered, std::remove_reference_t> createRangeFiltered(F && filter, C && container) -{ - return {std::forward(filter), std::forward(container)}; -};
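
For reference, the data-structure change at the heart of this patch is the replacement of the plain std::set `data_parts` with a boost::multi_index_container (`data_parts_indexes`) that keeps two ordered views of the same parts: one keyed by part name/info (TagByName) and one keyed by (state, name) (TagByStateAndName), with all state changes funneled through modifyPartState() so both views stay consistent. Below is a minimal standalone sketch of that pattern, not the patch's exact definition: the real code uses global_fun key extractors over DataPartPtr and a custom LessStateDataPart comparator, while this sketch uses hypothetical names (PartEntry, ByName, ByStateAndName) and simpler member-style extractors with a composite key.

#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/composite_key.hpp>
#include <boost/multi_index/tag.hpp>
#include <boost/tuple/tuple.hpp>
#include <iostream>
#include <memory>
#include <string>

/// Hypothetical stand-ins for MergeTreeDataPart / MergeTreePartInfo.
enum class State { Temporary, PreCommitted, Committed, Outdated, Deleting };

struct PartEntry
{
    std::string name;   /// stands in for MergeTreePartInfo
    State state;
};

using PartPtr = std::shared_ptr<PartEntry>;

/// Key extractors, analogous in spirit to dataPartPtrToInfo / dataPartPtrToStateAndInfo.
struct ExtractName
{
    using result_type = std::string;
    const result_type & operator()(const PartPtr & p) const { return p->name; }
};

struct ExtractState
{
    using result_type = State;
    result_type operator()(const PartPtr & p) const { return p->state; }
};

struct ByName {};          /// analogue of TagByName
struct ByStateAndName {};  /// analogue of TagByStateAndName

namespace bmi = boost::multi_index;

using PartsIndexes = bmi::multi_index_container<
    PartPtr,
    bmi::indexed_by<
        /// Index by name: point lookups like data_parts_by_name.find(info)
        bmi::ordered_unique<bmi::tag<ByName>, ExtractName>,
        /// Index by (state, name): ordered slices of all parts in a given state
        bmi::ordered_unique<bmi::tag<ByStateAndName>,
                            bmi::composite_key<PartPtr, ExtractState, ExtractName>>
    >
>;

int main()
{
    PartsIndexes parts;
    auto & by_name = parts.get<ByName>();
    auto & by_state_and_name = parts.get<ByStateAndName>();

    parts.insert(std::make_shared<PartEntry>(PartEntry{"all_1_1_0", State::Committed}));
    parts.insert(std::make_shared<PartEntry>(PartEntry{"all_2_2_0", State::Outdated}));
    parts.insert(std::make_shared<PartEntry>(PartEntry{"all_3_3_0", State::Committed}));

    /// Analogue of getDataPartsStateRange(Committed): a contiguous, ordered slice of one state.
    auto range = by_state_and_name.equal_range(boost::make_tuple(State::Committed));
    for (auto it = range.first; it != range.second; ++it)
        std::cout << (*it)->name << " is Committed\n";

    /// Analogue of modifyPartState(): the state participates in an index key, so it must be
    /// changed through modify() rather than assigned directly, otherwise the index ordering
    /// would silently become invalid.
    auto it = by_name.find("all_1_1_0");
    bool ok = by_state_and_name.modify(parts.project<ByStateAndName>(it),
                                       [](PartPtr & p) { p->state = State::Outdated; });

    std::cout << "modify ok: " << ok << ", Outdated parts: "
              << by_state_and_name.count(boost::make_tuple(State::Outdated)) << "\n";
}

This is also why the patch routes every state transition through modifyPartState() (which calls the index's modify()) instead of writing `part->state = ...` directly, and why grabOldParts/renameTempPartAndReplace collect iterators first and only then change states: mutating an index key in place would break the (state, name) ordering that getDataPartsStateRange() relies on.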