This commit is contained in:
Michael Kolupaev 2014-05-27 16:08:40 +04:00
parent cf2843d22c
commit 3aad8dad8f
5 changed files with 27 additions and 9 deletions

View File

@ -427,10 +427,6 @@ public:
*/
void renameAndDetachPart(DataPartPtr part, const String & prefix);
/** Удаляет кусок из рабочего набора. clearOldParts удалит его файлы, если на него никто не ссылается.
*/
void removePart(DataPartPtr part);
/** Удалить неактуальные куски. Возвращает имена удаленных кусков.
*/
Strings clearOldParts();

View File

@ -597,6 +597,7 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(MutableDa
/// Переименовываем кусок.
Poco::File(old_path).renameTo(new_path);
bool obsolete = false; /// Покрыт ли part каким-нибудь куском.
DataPartsVector res;
/// Куски, содержащиеся в part, идут в data_parts подряд, задевая место, куда вставился бы сам part.
DataParts::iterator it = data_parts.lower_bound(part);
@ -606,6 +607,8 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(MutableDa
--it;
if (!part->contains(**it))
{
if ((*it)->contains(*part))
obsolete = true;
++it;
break;
}
@ -615,14 +618,28 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(MutableDa
}
std::reverse(res.begin(), res.end()); /// Нужно получить куски в порядке возрастания.
/// Пойдем вправо.
while (it != data_parts.end() && part->contains(**it))
while (it != data_parts.end())
{
if (!part->contains(**it))
{
if ((*it)->name == part->name || (*it)->contains(*part))
obsolete = true;
break;
}
res.push_back(*it);
(*it)->remove_time = time(0);
data_parts.erase(it++);
}
if (obsolete)
{
LOG_WARNING(log, "Obsolete part " + part->name + " added");
}
else
{
data_parts.insert(part);
}
all_data_parts.insert(part);
return res;

View File

@ -336,7 +336,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(const MergeTreeData::
/// Проверим, что удалились все исходные куски и только они.
if (replaced_parts.size() != parts.size())
{
LOG_ERROR(log, "Unexpected number of parts removed when adding " << new_data_part->name << ": " << replaced_parts.size()
LOG_WARNING(log, "Unexpected number of parts removed when adding " << new_data_part->name << ": " << replaced_parts.size()
<< " instead of " << parts.size());
}
else

View File

@ -617,7 +617,8 @@ void StorageReplicatedMergeTree::pullLogsToQueue()
bool operator<(const LogIterator & rhs) const
{
return timestamp < rhs.timestamp;
/// Нужно доставать из очереди минимальный timestamp.
return timestamp > rhs.timestamp;
}
bool readEntry(zkutil::ZooKeeper & zookeeper, const String & zookeeper_path)
@ -1395,10 +1396,14 @@ void StorageReplicatedMergeTree::drop()
{
shutdown();
LOG_INFO(log, "Removing replica " << replica_path);
replica_is_active_node = nullptr;
zookeeper->removeRecursive(replica_path);
if (zookeeper->getChildren(zookeeper_path + "/replicas").empty())
{
LOG_INFO(log, "Removing table " << zookeeper_path << " (this might take several minutes)");
zookeeper->removeRecursive(zookeeper_path);
}
}
void StorageReplicatedMergeTree::LogEntry::writeText(WriteBuffer & out) const

View File

@ -316,7 +316,7 @@ ReturnCode::type ZooKeeper::tryMulti(const Ops & ops, OpResultsPtr * out_results
return code;
}
void ZooKeeper::removeChildrenRecursive(const std::string& path)
void ZooKeeper::removeChildrenRecursive(const std::string & path)
{
Strings children = getChildren(path);
zkutil::Ops ops;