This commit is contained in:
Michael Kolupaev 2014-03-13 23:07:17 +04:00
parent 00d9c28571
commit 2a766770fc
4 changed files with 34 additions and 21 deletions

View File

@ -329,6 +329,7 @@ public:
void dropAllData();
/** Поменять путь к директории с данными. Предполагается, что все данные из старой директории туда перенесли.
* Сбрасывает кеши разжатых блоков и засечек.
* Нужно вызывать под залоченным lockStructure().
*/
void setPath(const String & full_path);

View File

@ -117,6 +117,11 @@ private:
{
/// Здесь не лочится мьютекс, так как конструктор вызывается внутри mergeThread, где он уже залочен.
reserved_space = DiskSpaceMonitor::reserve(storage.full_path, total_size); /// Может бросить исключение.
for (const auto & part : parts)
{
if (storage.currently_merging.count(part))
throw Exception("Tagging alreagy tagged part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
}
storage.currently_merging.insert(parts.begin(), parts.end());
}
@ -125,9 +130,11 @@ private:
try
{
Poco::ScopedLock<Poco::FastMutex> lock(storage.currently_merging_mutex);
for (size_t i = 0; i < parts.size(); ++i)
for (const auto & part : parts)
{
storage.currently_merging.erase(parts[i]);
if (!storage.currently_merging.count(part))
throw Exception("Untagging already untagged part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
storage.currently_merging.erase(part);
}
}
catch (...)

View File

@ -353,6 +353,9 @@ void MergeTreeData::clearOldParts()
void MergeTreeData::setPath(const String & new_full_path)
{
full_path = new_full_path;
context.getUncompressedCache()->reset();
context.getMarkCache()->reset();
log = &Logger::get(lastTwoPathComponents(full_path));
}
void MergeTreeData::dropAllData()

View File

@ -138,33 +138,35 @@ void StorageMergeTree::mergeThread(bool while_can, bool aggressive)
size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
{
/// К концу этого логического блока должен быть вызван деструктор, чтобы затем корректно определить удаленные куски
/// Нужно вызывать деструктор под незалоченным currently_merging_mutex.
CurrentlyMergingPartsTaggerPtr merging_tagger;
Poco::ScopedLock<Poco::FastMutex> lock(currently_merging_mutex);
/// К концу этого логического блока должен быть вызван деструктор, чтобы затем корректно определить удаленные куски
MergeTreeData::DataPartsVector parts;
auto can_merge = boost::bind(&StorageMergeTree::canMergeParts, this, _1, _2);
bool only_small = false;
/// Если есть активный мердж крупных кусков, то ограничиваемся мерджем только маленьких частей.
for (const auto & part : currently_merging)
{
if (part->size * data.index_granularity > 25 * 1024 * 1024)
Poco::ScopedLock<Poco::FastMutex> lock(currently_merging_mutex);
MergeTreeData::DataPartsVector parts;
auto can_merge = boost::bind(&StorageMergeTree::canMergeParts, this, _1, _2);
bool only_small = false;
/// Если есть активный мердж крупных кусков, то ограничиваемся мерджем только маленьких частей.
for (const auto & part : currently_merging)
{
only_small = true;
break;
if (part->size * data.index_granularity > 25 * 1024 * 1024)
{
only_small = true;
break;
}
}
if (!merger.selectPartsToMerge(parts, disk_space, false, aggressive, only_small, can_merge) &&
!merger.selectPartsToMerge(parts, disk_space, true, aggressive, only_small, can_merge))
break;
merging_tagger = new CurrentlyMergingPartsTagger(parts, merger.estimateDiskSpaceForMerge(parts), *this);
}
if (!merger.selectPartsToMerge(parts, disk_space, false, aggressive, only_small, can_merge) &&
!merger.selectPartsToMerge(parts, disk_space, true, aggressive, only_small, can_merge))
break;
merging_tagger = new CurrentlyMergingPartsTagger(parts, merger.estimateDiskSpaceForMerge(parts), *this);
merger.mergeParts(parts);
merger.mergeParts(merging_tagger->parts);
}
if (shutdown_called)