This commit is contained in:
Alexey Milovidov 2015-06-02 23:22:53 +03:00
parent 5752d8f561
commit c19193cf61
4 changed files with 20 additions and 21 deletions

View File

@ -1,7 +1,5 @@
#pragma once
#include <statdaemons/Increment.h>
#include <DB/Core/SortDescription.h>
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/ExpressionActions.h>
@ -20,6 +18,9 @@
#define MERGE_TREE_MARK_SIZE (2 * sizeof(size_t))
struct SimpleIncrement;
namespace DB
{
@ -743,12 +744,12 @@ public:
* Предполагается, что кусок не пересекается с существующими.
* Если out_transaction не nullptr, присваивает туда объект, позволяющий откатить добавление куска (но не переименование).
*/
void renameTempPartAndAdd(MutableDataPartPtr & part, Increment * increment = nullptr, Transaction * out_transaction = nullptr);
void renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);
/** То же, что renameTempPartAndAdd, но кусок может покрывать существующие куски.
* Удаляет и возвращает все куски, покрытые добавляемым (в возрастающем порядке).
*/
DataPartsVector renameTempPartAndReplace(MutableDataPartPtr & part, Increment * increment = nullptr, Transaction * out_transaction = nullptr);
DataPartsVector renameTempPartAndReplace(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);
/** Убирает из рабочего набора куски remove и добавляет куски add. add должны уже быть в all_data_parts.
* Если clear_without_timeout, данные будут удалены при следующем clearOldParts, игнорируя old_parts_lifetime.

View File

@ -6,6 +6,7 @@
#include <DB/Storages/MergeTree/MergeTreeDataMerger.h>
#include <DB/Storages/MergeTree/DiskSpaceMonitor.h>
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
#include <statdaemons/Increment.h>
namespace DB
@ -107,7 +108,6 @@ private:
String database_name;
String table_name;
String full_path;
Increment increment;
Context & context;
BackgroundProcessingPool & background_pool;
@ -117,6 +117,9 @@ private:
MergeTreeDataWriter writer;
MergeTreeDataMerger merger;
/// Для нумерации блоков.
SimpleIncrement increment;
MergeTreeData::DataParts currently_merging;
Poco::FastMutex currently_merging_mutex;

View File

@ -17,6 +17,7 @@
#include <DB/Common/localBackup.h>
#include <DB/Functions/FunctionFactory.h>
#include <Poco/DirectoryIterator.h>
#include <statdaemons/Increment.h>
#include <algorithm>
#include <iomanip>
@ -681,7 +682,7 @@ MergeTreeData::AlterDataPartTransaction::~AlterDataPartTransaction()
}
void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, Increment * increment, Transaction * out_transaction)
void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction)
{
auto removed = renameTempPartAndReplace(part, increment, out_transaction);
if (!removed.empty())
@ -692,7 +693,7 @@ void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, Increment *
}
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
MutableDataPartPtr & part, Increment * increment, Transaction * out_transaction)
MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction)
{
if (out_transaction && out_transaction->data)
throw Exception("Using the same MergeTreeData::Transaction for overlapping transactions is invalid", ErrorCodes::LOGICAL_ERROR);
@ -710,7 +711,7 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
* содержат ещё не добавленный кусок.
*/
if (increment)
part->left = part->right = increment->get(false);
part->left = part->right = increment->get();
String new_name = ActiveDataPartSet::getPartName(part->left_date, part->right_date, part->left, part->right, part->level);
@ -814,7 +815,9 @@ void MergeTreeData::attachPart(const DataPartPtr & part)
if (!all_data_parts.insert(part).second)
throw Exception("Part " + part->name + " is already attached", ErrorCodes::DUPLICATE_DATA_PART);
data_parts.insert(part);
addPartContributionToColumnSizes(part);
}
void MergeTreeData::renameAndDetachPart(const DataPartPtr & part, const String & prefix, bool restore_covered, bool move_to_detached)

View File

@ -29,18 +29,17 @@ StorageMergeTree::StorageMergeTree(
const MergeTreeSettings & settings_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
path(path_), database_name(database_name_), table_name(table_name_), full_path(path + escapeForFileName(table_name) + '/'),
increment(full_path + "increment.txt"), context(context_), background_pool(context_.getBackgroundPool()),
context(context_), background_pool(context_.getBackgroundPool()),
data(full_path, columns_,
materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name_,
sampling_expression_, index_granularity_,mode_, sign_column_, columns_to_sum_,
settings_, database_name_ + "." + table_name, false),
reader(data), writer(data), merger(data),
increment(data.getMaxDataPartIndex()),
log(&Logger::get(database_name_ + "." + table_name + " (StorageMergeTree)")),
shutdown_called(false)
{
increment.fixIfBroken(data.getMaxDataPartIndex());
data.loadDataParts(false);
data.clearOldParts();
}
@ -130,8 +129,6 @@ void StorageMergeTree::rename(const String & new_path_to_db, const String & new_
table_name = new_table_name;
full_path = new_full_path;
increment.setPath(full_path + "increment.txt");
/// TODO: Можно обновить названия логгеров у this, data, reader, writer, merger.
}
@ -341,16 +338,11 @@ void StorageMergeTree::attachPartition(const Field & field, bool unreplicated, b
LOG_DEBUG(log, "Checking data");
MergeTreeData::MutableDataPartPtr part = data.loadPartAndFixMetadata(source_path);
UInt64 index = increment.get();
String new_part_name = ActiveDataPartSet::getPartName(part->left_date, part->right_date, index, index, 0);
part->renameTo(new_part_name);
part->name = new_part_name;
ActiveDataPartSet::parsePartName(part->name, *part);
LOG_INFO(log, "Attaching part " << source_part_name << " from " << source_path << " as " << new_part_name);
LOG_INFO(log, "Attaching part " << source_part_name << " from " << source_path);
data.renameTempPartAndAdd(part, &increment);
data.attachPart(part);
LOG_INFO(log, "Finished attaching part " << new_part_name);
LOG_INFO(log, "Finished attaching part");
}
/// На месте удаленных кусков могут появиться новые, с другими данными.