This commit is contained in:
Michael Kolupaev 2014-05-27 14:03:13 +04:00
parent d51f9cae03
commit cf2843d22c
3 changed files with 17 additions and 11 deletions

View File

@ -403,10 +403,10 @@ public:
*/
void delayInsertIfNeeded();
/** Возвращает кусок с указанным именем или кусок, покрывающий его. Если такого нет, возвращает nullptr.
* Если including_inactive, просматриваются также неактивные куски (all_data_parts).
* При including_inactive, нахождение куска гарантируется только если есть кусок, совпадающий с part_name;
* строго покрывающий кусок в некоторых случаях может не найтись.
/** Если !including_inactive:
* Возвращает активный кусок с указанным именем или кусок, покрывающий его. Если такого нет, возвращает nullptr.
* Если including_inactive:
* Если среди all_data_parts есть кусок с именем part_name, возвращает его. Иначе делает то же, что при !including_inactive.
*/
DataPartPtr getContainingPart(const String & part_name, bool including_inactive = false);

View File

@ -697,14 +697,20 @@ MergeTreeData::DataPartPtr MergeTreeData::getContainingPart(const String & part_
MutableDataPartPtr tmp_part(new DataPart(*this));
ActiveDataPartSet::parsePartName(part_name, *tmp_part);
Poco::ScopedLock<Poco::FastMutex> lock(including_inactive ? all_data_parts_mutex : data_parts_mutex);
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
DataParts & parts = including_inactive ? all_data_parts : data_parts;
if (including_inactive)
{
Poco::ScopedLock<Poco::FastMutex> lock(all_data_parts_mutex);
DataParts::iterator it = all_data_parts.lower_bound(tmp_part);
if (it != all_data_parts.end() && (*it)->name == part_name)
return *it;
}
/// Кусок может покрываться только предыдущим или следующим в data_parts.
DataParts::iterator it = parts.lower_bound(tmp_part);
DataParts::iterator it = data_parts.lower_bound(tmp_part);
if (it != parts.end())
if (it != data_parts.end())
{
if ((*it)->name == part_name)
return *it;
@ -712,7 +718,7 @@ MergeTreeData::DataPartPtr MergeTreeData::getContainingPart(const String & part_
return *it;
}
if (it != parts.begin())
if (it != data_parts.begin())
{
--it;
if ((*it)->contains(*tmp_part))

View File

@ -725,7 +725,7 @@ void StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
entry.type == LogEntry::MERGE_PARTS)
{
/// Если у нас уже есть этот кусок или покрывающий его кусок, ничего делать не нужно.
MergeTreeData::DataPartPtr containing_part = data.getContainingPart(entry.new_part_name);
MergeTreeData::DataPartPtr containing_part = data.getContainingPart(entry.new_part_name, true);
/// Даже если кусок есть локально, его (в исключительных случаях) может не быть в zookeeper.
if (containing_part && zookeeper->exists(replica_path + "/parts/" + containing_part->name))
@ -737,7 +737,7 @@ void StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
}
if (entry.type == LogEntry::GET_PART && entry.source_replica == replica_name)
LOG_ERROR(log, "Part " << entry.new_part_name << " from own log doesn't exist.");
LOG_WARNING(log, "Part " << entry.new_part_name << " from own log doesn't exist.");
bool do_fetch = false;