dbms: removing deduplicated parts immediately [#METR-17988].

This commit is contained in:
Alexey Milovidov 2015-09-09 22:03:46 +03:00
parent f6cfc72000
commit ec2151e646
3 changed files with 36 additions and 9 deletions

View File

@ -498,7 +498,7 @@ public:
/// Некоторые операции над множеством кусков могут возвращать такой объект.
/// Если не был вызван commit, деструктор откатывает операцию.
/// Если не был вызван commit или rollback, деструктор откатывает операцию.
class Transaction : private boost::noncopyable
{
public:
@ -506,20 +506,25 @@ public:
void commit()
{
data = nullptr;
parts_to_remove_on_rollback.clear();
parts_to_add_on_rollback.clear();
clear();
}
void rollback()
{
if (data && (!parts_to_remove_on_rollback.empty() || !parts_to_add_on_rollback.empty()))
{
LOG_DEBUG(data->log, "Undoing transaction");
data->replaceParts(parts_to_remove_on_rollback, parts_to_add_on_rollback, true);
clear();
}
}
~Transaction()
{
try
{
if (data && (!parts_to_remove_on_rollback.empty() || !parts_to_add_on_rollback.empty()))
{
LOG_DEBUG(data->log, "Undoing transaction");
data->replaceParts(parts_to_remove_on_rollback, parts_to_add_on_rollback, true);
}
rollback();
}
catch(...)
{
@ -534,6 +539,13 @@ public:
/// Что делать для отката операции.
DataPartsVector parts_to_remove_on_rollback;
DataPartsVector parts_to_add_on_rollback;
void clear()
{
data = nullptr;
parts_to_remove_on_rollback.clear();
parts_to_add_on_rollback.clear();
}
};
/// Объект, помнящий какие временные файлы были созданы в директории с куском в ходе изменения (ALTER) его столбцов.

View File

@ -113,6 +113,10 @@ public:
/// Если данные отличались от тех, что были вставлены ранее с тем же ID, бросим исключение.
expected_checksums.checkEqual(part->checksums, true);
/// У part-а уменьшится refcount, и его смогут удалить сразу при откате транзакции, а не позже.
part.reset();
transaction.rollback();
}
else
{

View File

@ -799,6 +799,17 @@ void MergeTreeData::replaceParts(const DataPartsVector & remove, const DataParts
part->remove_time = clear_without_timeout ? 0 : time(0);
removePartContributionToColumnSizes(part);
data_parts.erase(part);
/// use_count равен двум, если part-ом владеет только remove и all_data_parts.
if (clear_without_timeout && part.use_count() <= 2)
{
LOG_DEBUG(log, "Removing part " << part->name);
part->remove();
Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
all_data_parts.erase(part);
}
/// Иначе кусок будет удалён с диска позже.
}
for (const DataPartPtr & part : add)
{