Investigation [#METR-22327].

This commit is contained in:
Alexey Milovidov 2016-09-02 07:03:40 +03:00
parent 5cdc330273
commit 322e5031bd
3 changed files with 45 additions and 47 deletions

View File

@ -88,7 +88,7 @@ public:
CurrentMetrics::Increment metric_increment; CurrentMetrics::Increment metric_increment;
}; };
using ReservationPtr = std::shared_ptr<Reservation>; using ReservationPtr = std::unique_ptr<Reservation>;
static size_t getUnreservedFreeSpace(const std::string & path) static size_t getUnreservedFreeSpace(const std::string & path)
{ {
@ -131,7 +131,7 @@ public:
if (free_bytes < size) if (free_bytes < size)
throw Exception("Not enough free disk space to reserve: " + formatReadableSizeWithBinarySuffix(free_bytes) + " available, " throw Exception("Not enough free disk space to reserve: " + formatReadableSizeWithBinarySuffix(free_bytes) + " available, "
+ formatReadableSizeWithBinarySuffix(size) + " requested", ErrorCodes::NOT_ENOUGH_SPACE); + formatReadableSizeWithBinarySuffix(size) + " requested", ErrorCodes::NOT_ENOUGH_SPACE);
return std::make_shared<Reservation>(size); return std::make_unique<Reservation>(size);
} }
private: private:

View File

@ -131,47 +131,7 @@ private:
BackgroundProcessingPool::TaskHandle merge_task_handle; BackgroundProcessingPool::TaskHandle merge_task_handle;
/// While exists, marks parts as 'currently_merging' and reserves free space on filesystem. friend struct CurrentlyMergingPartsTagger;
/// It's possible to mark parts before.
struct CurrentlyMergingPartsTagger
{
MergeTreeData::DataPartsVector parts;
DiskSpaceMonitor::ReservationPtr reserved_space;
StorageMergeTree & storage;
CurrentlyMergingPartsTagger(const MergeTreeData::DataPartsVector & parts_, size_t total_size, StorageMergeTree & storage_)
: parts(parts_), storage(storage_)
{
/// Assume mutex is already locked, because this method is called from mergeTask.
reserved_space = DiskSpaceMonitor::reserve(storage.full_path, total_size); /// May throw.
for (const auto & part : parts)
{
if (storage.currently_merging.count(part))
throw Exception("Tagging alreagy tagged part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
}
storage.currently_merging.insert(parts.begin(), parts.end());
}
~CurrentlyMergingPartsTagger()
{
try
{
std::lock_guard<std::mutex> lock(storage.currently_merging_mutex);
for (const auto & part : parts)
{
if (!storage.currently_merging.count(part))
throw Exception("Untagging already untagged part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
storage.currently_merging.erase(part);
}
}
catch (...)
{
tryLogCurrentException("~CurrentlyMergingPartsTagger");
}
}
};
using CurrentlyMergingPartsTaggerPtr = std::shared_ptr<CurrentlyMergingPartsTagger>;
StorageMergeTree( StorageMergeTree(
const String & path_, const String & path_,

View File

@ -1,3 +1,4 @@
#include <experimental/optional>
#include <DB/Core/FieldVisitors.h> #include <DB/Core/FieldVisitors.h>
#include <DB/Storages/StorageMergeTree.h> #include <DB/Storages/StorageMergeTree.h>
#include <DB/Storages/MergeTree/MergeTreeBlockOutputStream.h> #include <DB/Storages/MergeTree/MergeTreeBlockOutputStream.h>
@ -251,6 +252,44 @@ void StorageMergeTree::alter(
data.loadDataParts(false); data.loadDataParts(false);
} }
/// While exists, marks parts as 'currently_merging' and reserves free space on filesystem.
/// It's possible to mark parts before.
struct CurrentlyMergingPartsTagger
{
MergeTreeData::DataPartsVector parts;
DiskSpaceMonitor::ReservationPtr reserved_space;
StorageMergeTree * storage = nullptr;
CurrentlyMergingPartsTagger() = default;
CurrentlyMergingPartsTagger(const MergeTreeData::DataPartsVector & parts_, size_t total_size, StorageMergeTree & storage_)
: parts(parts_), storage(&storage_)
{
/// Assume mutex is already locked, because this method is called from mergeTask.
reserved_space = DiskSpaceMonitor::reserve(storage->full_path, total_size); /// May throw.
for (const auto & part : parts)
{
if (storage->currently_merging.count(part))
throw Exception("Tagging alreagy tagged part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
}
storage->currently_merging.insert(parts.begin(), parts.end());
}
~CurrentlyMergingPartsTagger()
{
std::lock_guard<std::mutex> lock(storage->currently_merging_mutex);
for (const auto & part : parts)
{
if (!storage->currently_merging.count(part)) /// leads to std::terminate, that's Ok.
throw Exception("Untagging already untagged part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
storage->currently_merging.erase(part);
}
}
};
bool StorageMergeTree::merge( bool StorageMergeTree::merge(
size_t aio_threshold, size_t aio_threshold,
bool aggressive, bool aggressive,
@ -267,7 +306,7 @@ bool StorageMergeTree::merge(
size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path); size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
/// Нужно вызывать деструктор под незалоченным currently_merging_mutex. /// Нужно вызывать деструктор под незалоченным currently_merging_mutex.
CurrentlyMergingPartsTaggerPtr merging_tagger; std::experimental::optional<CurrentlyMergingPartsTagger> merging_tagger;
String merged_name; String merged_name;
{ {
@ -301,8 +340,7 @@ bool StorageMergeTree::merge(
if (!selected) if (!selected)
return false; return false;
merging_tagger = std::make_shared<CurrentlyMergingPartsTagger>( merging_tagger.emplace(parts, MergeTreeDataMerger::estimateDiskSpaceForMerge(parts), *this);
parts, MergeTreeDataMerger::estimateDiskSpaceForMerge(parts), *this);
/// Если собираемся сливать большие куски, увеличим счетчик потоков, сливающих большие куски. /// Если собираемся сливать большие куски, увеличим счетчик потоков, сливающих большие куски.
if (pool_context) if (pool_context)
@ -321,7 +359,7 @@ bool StorageMergeTree::merge(
const auto & merge_entry = context.getMergeList().insert(database_name, table_name, merged_name); const auto & merge_entry = context.getMergeList().insert(database_name, table_name, merged_name);
auto new_part = merger.mergePartsToTemporaryPart( auto new_part = merger.mergePartsToTemporaryPart(
merging_tagger->parts, merged_name, *merge_entry, aio_threshold, time(0), &*merging_tagger->reserved_space); merging_tagger->parts, merged_name, *merge_entry, aio_threshold, time(0), merging_tagger->reserved_space.get());
merger.renameMergedTemporaryPart(merging_tagger->parts, new_part, merged_name, nullptr); merger.renameMergedTemporaryPart(merging_tagger->parts, new_part, merged_name, nullptr);