This commit is contained in:
Evgeniy Gatov 2014-08-08 15:01:36 +04:00
commit af9be319fd
23 changed files with 469 additions and 139 deletions

View File

@ -257,6 +257,7 @@ namespace ErrorCodes
INCORRECT_MARK,
INVALID_PARTITION_NAME,
NOT_LEADER,
NOT_ENOUGH_BLOCK_NUMBERS,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -220,8 +220,8 @@ private:
void normalizeTree();
void normalizeTreeImpl(ASTPtr & ast, MapOfASTs & finished_asts, SetOfASTs & current_asts, std::string current_alias);
/// Eliminates injective function calls from group by statement
void eliminateInjectives();
/// Eliminates injective function calls and constant expressions from group by statement
void optimizeGroupBy();
/// Превратить перечисление значений или подзапрос в ASTSet. node - функция in или notIn.
void makeSet(ASTFunction * node, const Block & sample_block);

View File

@ -46,7 +46,7 @@ private:
static PartitionCommand attachPartition(const Field & partition, bool unreplicated, bool part)
{
return {ATTACH_PARTITION, partition, false, part, unreplicated};
return {ATTACH_PARTITION, partition, false, unreplicated, part};
}
};

View File

@ -22,7 +22,7 @@ protected:
class ParserParenthesisExpression : public IParserBase
{
protected:
const char * getName() const { return "expression in parenthesis"; }
const char * getName() const { return "parenthesized expression"; }
bool parseImpl(Pos & pos, Pos end, ASTPtr & node, Expected & expected);
};

View File

@ -63,12 +63,14 @@ public:
void add(const String & name);
String getContainingPart(const String & name) const;
Strings getParts() const;
Strings getParts() const; /// В порядке возрастания месяца и номера блока.
size_t size() const;
static String getPartName(DayNum_t left_date, DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level);
/// Возвращает true если имя директории совпадает с форматом имени директории кусочков
static bool isPartDirectory(const String & dir_name, Poco::RegularExpression::MatchVec & matches);
static bool isPartDirectory(const String & dir_name, Poco::RegularExpression::MatchVec * out_matches = nullptr);
/// Кладет в DataPart данные из имени кусочка.
static void parsePartName(const String & file_name, Part & part, const Poco::RegularExpression::MatchVec * matches = nullptr);

View File

@ -110,11 +110,17 @@ struct MergeTreeSettings
double insert_delay_step = 1.1;
/// Для скольки последних блоков хранить хеши в ZooKeeper.
size_t replicated_deduplication_window = 1000;
size_t replicated_deduplication_window = 100;
/// Хранить примерно столько последних записей в логе в ZooKeeper, даже если они никому уже не нужны.
/// Не влияет на работу таблиц; используется только чтобы успеть посмотреть на лог в ZooKeeper глазами прежде, чем его очистят.
size_t replicated_logs_to_keep = 100;
/// Максимальное количество ошибок при загрузке кусков, при котором ReplicatedMergeTree соглашается запускаться.
size_t replicated_max_unexpected_parts = 3;
size_t replicated_max_unexpectedly_merged_parts = 2;
size_t replicated_max_missing_obsolete_parts = 5;
size_t replicated_max_missing_active_parts = 20;
};
class MergeTreeData : public ITableDeclaration
@ -307,17 +313,22 @@ public:
Poco::File(to).remove(true);
}
/// Переименовывает кусок, дописав к имени префикс.
void renameAddPrefix(const String & prefix) const
void renameTo(const String & new_name) const
{
String from = storage.full_path + name + "/";
String to = storage.full_path + prefix + name + "/";
String to = storage.full_path + new_name + "/";
Poco::File f(from);
f.setLastModified(Poco::Timestamp::fromEpochTime(time(0)));
f.renameTo(to);
}
/// Переименовывает кусок, дописав к имени префикс.
void renameAddPrefix(const String & prefix) const
{
renameTo(prefix + name);
}
/// Загрузить индекс и вычислить размер. Если size=0, вычислить его тоже.
void loadIndex()
{
@ -344,12 +355,12 @@ public:
}
/// Прочитать контрольные суммы, если есть.
void loadChecksums()
void loadChecksums(bool require)
{
String path = storage.full_path + name + "/checksums.txt";
if (!Poco::File(path).exists())
{
if (storage.require_part_metadata)
if (require)
throw Exception("No checksums.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
return;
@ -359,16 +370,21 @@ public:
assertEOF(file);
}
void loadColumns()
void loadColumns(bool require)
{
String path = storage.full_path + name + "/columns.txt";
if (!Poco::File(path).exists())
{
if (storage.require_part_metadata)
if (require)
throw Exception("No columns.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
columns = *storage.columns;
/// Если нет файла со списком столбцов, запишем его.
for (const NameAndTypePair & column : *storage.columns)
{
if (Poco::File(storage.full_path + name + "/" + escapeForFileName(column.name) + ".bin").exists())
columns.push_back(column);
}
{
WriteBufferFromFile out(path + ".tmp", 4096);
columns.writeText(out);
@ -382,7 +398,7 @@ public:
columns.readText(file, storage.context.getDataTypeFactory());
}
void checkNotBroken()
void checkNotBroken(bool require_part_metadata)
{
String path = storage.full_path + name;
@ -391,7 +407,7 @@ public:
if (!checksums.files.count("primary.idx"))
throw Exception("No checksum for primary.idx", ErrorCodes::NO_FILE_IN_DATA_PART);
if (storage.require_part_metadata)
if (require_part_metadata)
{
for (const NameAndTypePair & it : columns)
{
@ -625,15 +641,23 @@ public:
*/
DataPartsVector renameTempPartAndReplace(MutableDataPartPtr part, Increment * increment = nullptr, Transaction * out_transaction = nullptr);
/** Убирает из рабочего набора куски remove и добавляет куски add.
/** Убирает из рабочего набора куски remove и добавляет куски add. add должны уже быть в all_data_parts.
* Если clear_without_timeout, данные будут удалены при следующем clearOldParts, игнорируя old_parts_lifetime.
*/
void replaceParts(const DataPartsVector & remove, const DataPartsVector & add, bool clear_without_timeout);
/** Добавляет новый кусок в список известных кусков и в рабочий набор.
*/
void attachPart(DataPartPtr part);
/** Переименовывает кусок в detached/prefix_кусок и забывает про него. Данные не будут удалены в clearOldParts.
* Если restore_covered, добавляет в рабочий набор неактивные куски, слиянием которых получен удаляемый кусок.
*/
void renameAndDetachPart(DataPartPtr part, const String & prefix = "", bool restore_covered = false);
void renameAndDetachPart(DataPartPtr part, const String & prefix = "", bool restore_covered = false, bool move_to_detached = true);
/** Убирает кусок из списка кусков (включая all_data_parts), но не перемещщает директорию.
*/
void detachPartInPlace(DataPartPtr part);
/** Возвращает старые неактуальные куски, которые можно удалить. Одновременно удаляет их из списка кусков, но не с диска.
*/
@ -685,6 +709,9 @@ public:
ExpressionActionsPtr getPrimaryExpression() const { return primary_expr; }
SortDescription getSortDescription() const { return sort_descr; }
/// Проверить, что кусок не сломан и посчитать для него чексуммы, если их нет.
MutableDataPartPtr loadPartAndFixMetadata(const String & relative_path);
const Context & context;
const String date_column_name;
const ASTPtr sampling_expression;

View File

@ -9,16 +9,27 @@ namespace DB
class MergeTreePartChecker
{
public:
/// Options controlling how strictly a data part is checked.
/// Setters return *this so they can be chained at the call site.
struct Settings
{
	bool verbose = false;              /// Write progress and errors to stderr and do not stop at the first error.
	bool require_checksums = false;    /// Require checksums.txt to be present.
	bool require_column_files = false; /// Require a data file for every column listed in columns.txt.
	size_t index_granularity = 8192;   /// Rows per mark; must match the table's granularity.

	Settings & setVerbose(bool verbose_) { verbose = verbose_; return *this; }
	Settings & setRequireChecksums(bool require_checksums_) { require_checksums = require_checksums_; return *this; }
	Settings & setRequireColumnFiles(bool require_column_files_) { require_column_files = require_column_files_; return *this; }
	/// Fixed: the parameter was declared `bool`, so any granularity passed in was
	/// truncated to 0 or 1 before being stored.
	Settings & setIndexGranularity(size_t index_granularity_) { index_granularity = index_granularity_; return *this; }
};
/** Полностью проверяет данные кусочка:
* - Вычисляет контрольные суммы и сравнивает с checksums.txt.
* - Для массивов и строк проверяет соответствие размеров и количества данных.
* - Проверяет правильность засечек.
* Бросает исключение, если кусок испорчен или если проверить не получилось (TODO: можно попробовать разделить эти случаи).
* Если strict, требует, чтобы для всех столбцов из columns.txt были файлы.
* Если verbose, пишет в stderr прогресс и ошибки, и не останавливается при первой ошибке.
*/
static void checkDataPart(String path, size_t index_granularity, bool strict, const DataTypeFactory & data_type_factory,
bool verbose = false);
static void checkDataPart(String path, const Settings & settings, const DataTypeFactory & data_type_factory,
MergeTreeData::DataPart::Checksums * out_checksums = nullptr);
};
}

View File

@ -33,6 +33,7 @@ public:
UInt64 part_number = block_number_lock.getNumber();
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, part_number);
String part_name = ActiveDataPartSet::getPartName(part->left_date, part->right_date, part->left, part->right, part->level);
/// Если в запросе не указан ID, возьмем в качестве ID хеш от данных. То есть, не вставляем одинаковые данные дважды.
/// NOTE: Если такая дедупликация не нужна, можно вместо этого оставлять block_id пустым.
@ -42,13 +43,10 @@ public:
LOG_DEBUG(log, "Wrote block " << part_number << " with ID " << block_id << ", " << current_block.block.rows() << " rows");
MergeTreeData::Transaction transaction; /// Если не получится добавить кусок в ZK, снова уберем его из рабочего набора.
storage.data.renameTempPartAndAdd(part, nullptr, &transaction);
StorageReplicatedMergeTree::LogEntry log_entry;
log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART;
log_entry.source_replica = storage.replica_name;
log_entry.new_part_name = part->name;
log_entry.new_part_name = part_name;
/// Одновременно добавим информацию о куске во все нужные места в ZooKeeper и снимем block_number_lock.
zkutil::Ops ops;
@ -75,7 +73,7 @@ public:
storage.zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
}
storage.checkPartAndAddToZooKeeper(part, ops);
storage.checkPartAndAddToZooKeeper(part, ops, part_name);
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/log/log-",
log_entry.toString(),
@ -83,6 +81,9 @@ public:
zkutil::CreateMode::PersistentSequential));
block_number_lock.getUnlockOps(ops);
MergeTreeData::Transaction transaction; /// Если не получится добавить кусок в ZK, снова уберем его из рабочего набора.
storage.data.renameTempPartAndAdd(part, nullptr, &transaction);
try
{
auto code = storage.zookeeper->tryMulti(ops);

View File

@ -137,6 +137,7 @@ private:
GET_PART, /// Получить кусок с другой реплики.
MERGE_PARTS, /// Слить куски.
DROP_RANGE, /// Удалить куски в указанном месяце в указанном диапазоне номеров.
ATTACH_PART, /// Перенести кусок из директории detached или unreplicated.
};
String znode_name;
@ -144,11 +145,19 @@ private:
Type type;
String source_replica; /// Пустая строка значит, что эта запись была добавлена сразу в очередь, а не скопирована из лога.
String new_part_name; /// Для DROP_RANGE имя несуществующего куска. Нужно удалить все куски, покрытые им.
/// Имя куска, получающегося в результате.
/// Для DROP_RANGE имя несуществующего куска. Нужно удалить все куски, покрытые им.
String new_part_name;
Strings parts_to_merge;
bool detach = false; /// Для DROP_RANGE, true значит, что куски нужно не удалить, а перенести в директорию detached.
/// Для DROP_RANGE, true значит, что куски нужно не удалить, а перенести в директорию detached.
bool detach = false;
/// Для ATTACH_PART имя куска в директории detached или unreplicated.
String source_part_name;
/// Нужно переносить из директории unreplicated, а не detached.
bool attach_unreplicated;
FuturePartTaggerPtr future_part_tagger;
bool currently_executing = false; /// Доступ под queue_mutex.
@ -156,13 +165,13 @@ private:
void addResultToVirtualParts(StorageReplicatedMergeTree & storage)
{
if (type == MERGE_PARTS || type == GET_PART || type == DROP_RANGE)
if (type == MERGE_PARTS || type == GET_PART || type == DROP_RANGE || type == ATTACH_PART)
storage.virtual_parts.add(new_part_name);
}
void tagPartAsFuture(StorageReplicatedMergeTree & storage)
{
if (type == MERGE_PARTS || type == GET_PART)
if (type == MERGE_PARTS || type == GET_PART || type == ATTACH_PART)
future_part_tagger = new FuturePartTagger(new_part_name, storage);
}
@ -362,7 +371,7 @@ private:
* Кладет в ops действия, добавляющие данные о куске в ZooKeeper.
* Вызывать под TableStructureLock.
*/
void checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops);
void checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops, String name_override = "");
/// Убирает кусок из ZooKeeper и добавляет в очередь задание скачать его. Предполагается это делать с битыми кусками.
void removePartAndEnqueueFetch(const String & part_name);
@ -396,7 +405,8 @@ private:
*/
bool executeLogEntry(const LogEntry & entry, BackgroundProcessingPool::Context & pool_context);
bool executeDropRange(const LogEntry & entry);
void executeDropRange(const LogEntry & entry);
bool executeAttachPart(const LogEntry & entry); /// Возвращает false, если куска нет, и его нужно забрать с другой реплики.
/** Обновляет очередь.
*/
@ -450,7 +460,7 @@ private:
/** Дождаться, пока все реплики, включая эту, выполнят указанное действие из лога.
* Если одновременно с этим добавляются реплики, может не дождаться добавленную реплику.
*/
void waitForAllReplicasToProcessLogEntry(const String & log_znode_path, const LogEntry & entry);
void waitForAllReplicasToProcessLogEntry(const LogEntry & entry);
};
}

View File

@ -108,18 +108,15 @@ static bool isValidFunction(ASTPtr expression, const NameSet & columns)
/// Извлечь все подфункции главной конъюнкции, но зависящие только от заданных столбцов
static void extractFunctions(ASTPtr expression, const NameSet & columns, std::vector<ASTPtr> & result)
{
if (const ASTFunction * function = typeid_cast<const ASTFunction *>(&* expression))
const ASTFunction * function = typeid_cast<const ASTFunction *>(&* expression);
if (function && function->name == "and")
{
if (function->name == "and")
{
for (size_t i = 0; i < function->arguments->children.size(); ++i)
extractFunctions(function->arguments->children[i], columns, result);
}
else
{
if (isValidFunction(expression, columns))
result.push_back(expression->clone());
}
for (size_t i = 0; i < function->arguments->children.size(); ++i)
extractFunctions(function->arguments->children[i], columns, result);
}
else if (isValidFunction(expression, columns))
{
result.push_back(expression->clone());
}
}

View File

@ -69,7 +69,7 @@ void ExpressionAnalyzer::init()
normalizeTree();
/// GROUP BY injective function elimination
eliminateInjectives();
optimizeGroupBy();
/// array_join_alias_to_name, array_join_result_to_source.
getArrayJoinedColumns();
@ -122,7 +122,7 @@ void ExpressionAnalyzer::analyzeAggregation()
if (select_query->group_expression_list)
{
NameSet unique_keys;
const ASTs & group_asts = select_query->group_expression_list->children;
auto & group_asts = select_query->group_expression_list->children;
for (size_t i = 0; i < group_asts.size(); ++i)
{
getRootActions(group_asts[i], true, false, temp_actions);
@ -135,6 +135,17 @@ void ExpressionAnalyzer::analyzeAggregation()
const auto & col = block.getByName(column_name);
/// constant expressions have non-null column pointer at this stage
if (const auto is_constexpr = col.column)
{
if (i < group_asts.size() - 1)
group_asts[i] = std::move(group_asts.back());
group_asts.pop_back();
i -= 1;
continue;
}
NameAndTypePair key{column_name, col.type};
aggregation_keys.push_back(key);
@ -145,6 +156,12 @@ void ExpressionAnalyzer::analyzeAggregation()
aggregated_columns.push_back(std::move(key));
}
}
if (group_asts.empty())
{
select_query->group_expression_list = nullptr;
has_aggregation = select_query->having_expression || aggregate_descriptions.size();
}
}
for (size_t i = 0; i < aggregate_descriptions.size(); ++i)
@ -426,7 +443,7 @@ void ExpressionAnalyzer::normalizeTreeImpl(ASTPtr & ast, MapOfASTs & finished_as
}
void ExpressionAnalyzer::eliminateInjectives()
void ExpressionAnalyzer::optimizeGroupBy()
{
if (!(select_query && select_query->group_expression_list))
return;
@ -446,13 +463,16 @@ void ExpressionAnalyzer::eliminateInjectives()
};
/// iterate over each GROUP BY expression, eliminate injective function calls and literals
for (size_t i = 0; i < group_exprs.size(); ++i)
for (size_t i = 0; i < group_exprs.size();)
{
if (const auto function = typeid_cast<ASTFunction*>(group_exprs[i].get()))
{
/// assert function is injective
if (!injectiveFunctionNames.count(function->name))
{
++i;
continue;
}
/// copy shared pointer to args in order to ensure lifetime
auto args_ast = function->arguments;
@ -461,7 +481,6 @@ void ExpressionAnalyzer::eliminateInjectives()
* next iteration does not skip not yet processed data
*/
remove_expr_at_index(i);
i -= 1;
/// copy non-literal arguments
std::remove_copy_if(
@ -469,7 +488,19 @@ void ExpressionAnalyzer::eliminateInjectives()
std::back_inserter(group_exprs), is_literal
);
}
else if (is_literal(group_exprs[i]))
{
remove_expr_at_index(i);
}
else
{
/// if neither a function nor literal - advance to next expression
++i;
}
}
if (group_exprs.empty())
select_query->group_expression_list = nullptr;
}

View File

@ -346,6 +346,10 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
need_second_distinct_pass = streams.size() > 1;
}
else if (query.group_by_with_totals && !aggregate_final)
{
executeTotalsAndHaving(streams, false, nullptr, aggregate_overflow_row);
}
if (has_order_by)
executeOrder(streams);

View File

@ -77,7 +77,7 @@ bool ParserParenthesisExpression::parseImpl(Pos & pos, Pos end, ASTPtr & node, E
/// пустое выражение в скобках недопустимо
if (expr_list.children.empty())
{
expected = "not empty list of expressions in parenthesis";
expected = "non-empty parenthesized list of expressions";
return false;
}

View File

@ -198,18 +198,18 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Expected &
return false;
ws.ignore(pos, end);
}
/// WITH TOTALS
if (s_with.ignore(pos, end, expected))
{
ws.ignore(pos, end);
if (!s_totals.ignore(pos, end, expected))
return false;
/// WITH TOTALS
if (s_with.ignore(pos, end, expected))
{
ws.ignore(pos, end);
if (!s_totals.ignore(pos, end, expected))
return false;
select_query->group_by_with_totals = true;
select_query->group_by_with_totals = true;
ws.ignore(pos, end);
}
ws.ignore(pos, end);
}
/// HAVING expr

View File

@ -195,11 +195,11 @@ void formatAST(const ASTSelectQuery & ast, std::ostream & s, size_t indent, bo
one_line
? formatAST(*ast.group_expression_list, s, indent, hilite, one_line)
: formatExpressionListMultiline(typeid_cast<const ASTExpressionList &>(*ast.group_expression_list), s, indent, hilite);
if (ast.group_by_with_totals)
s << (hilite ? hilite_keyword : "") << nl_or_ws << indent_str << (one_line ? "" : " ") << "WITH TOTALS" << (hilite ? hilite_none : "");
}
if (ast.group_by_with_totals)
s << (hilite ? hilite_keyword : "") << nl_or_ws << indent_str << (one_line ? "" : " ") << "WITH TOTALS" << (hilite ? hilite_none : "");
if (ast.having_expression)
{
s << (hilite ? hilite_keyword : "") << nl_or_ws << indent_str << "HAVING " << (hilite ? hilite_none : "");

View File

@ -85,7 +85,9 @@ void TCPHandler::runImpl()
sendHello();
connection_context.setProgressCallback(boost::bind(&TCPHandler::updateProgress, this, _1, _2));
connection_context.setProgressCallback([this] (const size_t rows, const size_t bytes) {
return this->updateProgress(rows, bytes);
});
while (1)
{

View File

@ -82,6 +82,12 @@ Strings ActiveDataPartSet::getParts() const
return res;
}
/// Returns the number of parts currently in the set; access to `parts` is serialized by `mutex`.
size_t ActiveDataPartSet::size() const
{
Poco::ScopedLock<Poco::Mutex> lock(mutex);
return parts.size();
}
String ActiveDataPartSet::getPartName(DayNum_t left_date, DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level)
@ -110,10 +116,14 @@ String ActiveDataPartSet::getPartName(DayNum_t left_date, DayNum_t right_date, U
return res;
}
bool ActiveDataPartSet::isPartDirectory(const String & dir_name, Poco::RegularExpression::MatchVec & matches)
bool ActiveDataPartSet::isPartDirectory(const String & dir_name, Poco::RegularExpression::MatchVec * out_matches)
{
Poco::RegularExpression::MatchVec matches;
static Poco::RegularExpression file_name_regexp("^(\\d{8})_(\\d{8})_(\\d+)_(\\d+)_(\\d+)");
return (file_name_regexp.match(dir_name, 0, matches) && 6 == matches.size());
bool res = (file_name_regexp.match(dir_name, 0, matches) && 6 == matches.size());
if (out_matches)
*out_matches = matches;
return res;
}
void ActiveDataPartSet::parsePartName(const String & file_name, Part & part, const Poco::RegularExpression::MatchVec * matches_p)
@ -121,7 +131,7 @@ void ActiveDataPartSet::parsePartName(const String & file_name, Part & part, con
Poco::RegularExpression::MatchVec match_vec;
if (!matches_p)
{
if (!isPartDirectory(file_name, match_vec))
if (!isPartDirectory(file_name, &match_vec))
throw Exception("Unexpected part name: " + file_name, ErrorCodes::BAD_DATA_PART_NAME);
matches_p = &match_vec;
}

View File

@ -5,6 +5,7 @@
#include <DB/Storages/MergeTree/MergeTreeReader.h>
#include <DB/Storages/MergeTree/MergeTreeBlockInputStream.h>
#include <DB/Storages/MergeTree/MergedBlockOutputStream.h>
#include <DB/Storages/MergeTree/MergeTreePartChecker.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTNameTypePair.h>
#include <DB/DataStreams/ExpressionBlockInputStream.h>
@ -124,7 +125,7 @@ void MergeTreeData::loadDataParts()
Poco::RegularExpression::MatchVec matches;
for (const String & file_name : part_file_names)
{
if (!ActiveDataPartSet::isPartDirectory(file_name, matches))
if (!ActiveDataPartSet::isPartDirectory(file_name, &matches))
continue;
MutableDataPartPtr part = std::make_shared<DataPart>(*this);
@ -135,10 +136,10 @@ void MergeTreeData::loadDataParts()
try
{
part->loadColumns();
part->loadChecksums();
part->loadColumns(require_part_metadata);
part->loadChecksums(require_part_metadata);
part->loadIndex();
part->checkNotBroken();
part->checkNotBroken(require_part_metadata);
}
catch (...)
{
@ -167,7 +168,7 @@ void MergeTreeData::loadDataParts()
{
if (contained_name == file_name)
continue;
if (!ActiveDataPartSet::isPartDirectory(contained_name, matches))
if (!ActiveDataPartSet::isPartDirectory(contained_name, &matches))
continue;
DataPart contained_part(*this);
ActiveDataPartSet::parsePartName(contained_name, contained_part, &matches);
@ -720,7 +721,17 @@ void MergeTreeData::replaceParts(const DataPartsVector & remove, const DataParts
}
}
void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix, bool restore_covered)
/// Registers a part both in the full list (all_data_parts) and in the working set (data_parts).
/// Throws DUPLICATE_DATA_PART if the part is already known.
void MergeTreeData::attachPart(DataPartPtr part)
{
Poco::ScopedLock<Poco::FastMutex> parts_lock(data_parts_mutex);
Poco::ScopedLock<Poco::FastMutex> all_parts_lock(all_data_parts_mutex);

const bool inserted = all_data_parts.insert(part).second;
if (!inserted)
throw Exception("Part " + part->name + " is already attached", ErrorCodes::DUPLICATE_DATA_PART);

data_parts.insert(part);
}
void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix, bool restore_covered, bool move_to_detached)
{
LOG_INFO(log, "Renaming " << part->name << " to " << prefix << part->name << " and detaching it.");
@ -731,7 +742,8 @@ void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix,
throw Exception("No such data part", ErrorCodes::NO_SUCH_DATA_PART);
data_parts.erase(part);
part->renameAddPrefix("detached/" + prefix);
if (move_to_detached || !prefix.empty())
part->renameAddPrefix((move_to_detached ? "detached/" : "") + prefix);
if (restore_covered)
{
@ -783,6 +795,11 @@ void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix,
}
}
/// Removes the part from the in-memory part lists (including all_data_parts)
/// without moving its directory into detached/ (move_to_detached = false).
void MergeTreeData::detachPartInPlace(DataPartPtr part)
{
renameAndDetachPart(part, "", false, false);
}
MergeTreeData::DataParts MergeTreeData::getDataParts()
{
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
@ -879,6 +896,41 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na
return nullptr;
}
/// Loads a single part located at `relative_path` (relative to the storage's full_path),
/// repairing its metadata on the way: the columns list is always rewritten, and if
/// checksums.txt is absent the checksums are computed (which also fully verifies the data)
/// and written atomically. Returns the loaded part; throws if the part is broken.
MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const String & relative_path)
{
MutableDataPartPtr part = std::make_shared<DataPart>(*this);
part->name = relative_path;
/// Previously the columns list used to be written incorrectly. Remove it and create it anew.
if (Poco::File(full_path + relative_path + "/columns.txt").exists())
Poco::File(full_path + relative_path + "/columns.txt").remove();
/// `false` = do not require the metadata files to exist; they are being rebuilt here.
part->loadColumns(false);
part->loadChecksums(false);
part->loadIndex();
part->checkNotBroken(false);
part->modification_time = Poco::File(full_path + relative_path).getLastModified().epochTime();
/// If there is no checksums file, compute the checksums and write them. This also verifies the data.
if (part->checksums.empty())
{
MergeTreePartChecker::Settings settings;
settings.setIndexGranularity(index_granularity);
settings.setRequireColumnFiles(true);
MergeTreePartChecker::checkDataPart(full_path + relative_path, settings, context.getDataTypeFactory(), &part->checksums);
/// Write to a .tmp file first, then rename, so a crash never leaves a truncated checksums.txt.
{
WriteBufferFromFile out(full_path + relative_path + "/checksums.txt.tmp", 4096);
part->checksums.writeText(out);
}
Poco::File(full_path + relative_path + "/checksums.txt.tmp").renameTo(full_path + relative_path + "/checksums.txt");
}
return part;
}
void MergeTreeData::DataPart::Checksums::Checksum::checkEqual(const Checksum & rhs, bool have_uncompressed, const String & name) const
{

View File

@ -97,7 +97,7 @@ struct Stream
return size / sizeof(UInt64);
}
void assertMark(bool strict)
void assertMark()
{
MarkInCompressedFile mrk_mark;
readIntBinary(mrk_mark.offset_in_compressed_file, mrk_hashing_buf);
@ -152,7 +152,7 @@ struct Stream
};
/// Возвращает количество строк. Добавляет в checksums чексуммы всех файлов столбца.
static size_t checkColumn(const String & path, const String & name, DataTypePtr type, size_t index_granularity, bool strict,
static size_t checkColumn(const String & path, const String & name, DataTypePtr type, const MergeTreePartChecker::Settings & settings,
MergeTreeData::DataPart::Checksums & checksums)
{
size_t rows = 0;
@ -171,10 +171,10 @@ static size_t checkColumn(const String & path, const String & name, DataTypePtr
if (sizes_stream.marksEOF())
break;
sizes_stream.assertMark(strict);
data_stream.assertMark(strict);
sizes_stream.assertMark();
data_stream.assertMark();
size_t cur_rows = sizes_stream.readUInt64(index_granularity, sizes);
size_t cur_rows = sizes_stream.readUInt64(settings.index_granularity, sizes);
size_t sum = 0;
for (size_t i = 0; i < cur_rows; ++i)
@ -188,7 +188,7 @@ static size_t checkColumn(const String & path, const String & name, DataTypePtr
data_stream.read(sum);
rows += cur_rows;
if (cur_rows < index_granularity)
if (cur_rows < settings.index_granularity)
break;
}
@ -207,12 +207,12 @@ static size_t checkColumn(const String & path, const String & name, DataTypePtr
if (data_stream.marksEOF())
break;
data_stream.assertMark(strict);
data_stream.assertMark();
size_t cur_rows = data_stream.read(index_granularity);
size_t cur_rows = data_stream.read(settings.index_granularity);
rows += cur_rows;
if (cur_rows < index_granularity)
if (cur_rows < settings.index_granularity)
break;
}
@ -228,8 +228,8 @@ static size_t checkColumn(const String & path, const String & name, DataTypePtr
}
}
void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity, bool strict, const DataTypeFactory & data_type_factory,
bool verbose)
void MergeTreePartChecker::checkDataPart(String path, const Settings & settings, const DataTypeFactory & data_type_factory,
MergeTreeData::DataPart::Checksums * out_checksums)
{
if (!path.empty() && *path.rbegin() != '/')
path += "/";
@ -243,7 +243,7 @@ void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity,
assertEOF(buf);
}
if (strict || Poco::File(path + "checksums.txt").exists())
if (settings.require_checksums || Poco::File(path + "checksums.txt").exists())
{
ReadBufferFromFile buf(path + "checksums.txt");
checksums_txt.readText(buf);
@ -266,7 +266,7 @@ void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity,
for (const NameAndTypePair & column : columns)
{
if (verbose)
if (settings.verbose)
{
std::cerr << column.name << ":";
std::cerr.flush();
@ -275,14 +275,14 @@ void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity,
bool ok = false;
try
{
if (!strict && !Poco::File(path + escapeForFileName(column.name) + ".bin").exists())
if (!settings.require_column_files && !Poco::File(path + escapeForFileName(column.name) + ".bin").exists())
{
if (verbose)
if (settings.verbose)
std::cerr << " no files" << std::endl;
continue;
}
size_t cur_rows = checkColumn(path, column.name, column.type, index_granularity, strict, checksums_data);
size_t cur_rows = checkColumn(path, column.name, column.type, settings, checksums_data);
if (first)
{
rows = cur_rows;
@ -298,7 +298,7 @@ void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity,
}
catch (...)
{
if (!verbose)
if (!settings.verbose)
throw;
ExceptionPtr e = cloneCurrentException();
if (!first_exception)
@ -311,18 +311,18 @@ void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity,
std::cerr << std::endl;
}
if (verbose && ok)
if (settings.verbose && ok)
std::cerr << " ok" << std::endl;
}
if (first)
throw Exception("No columns", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
if (primary_idx_size % ((rows - 1) / index_granularity + 1))
if (primary_idx_size % ((rows - 1) / settings.index_granularity + 1))
throw Exception("primary.idx size (" + toString(primary_idx_size) + ") not divisible by number of marks ("
+ toString(rows) + "/" + toString(index_granularity) + " rounded up)", ErrorCodes::CORRUPTED_DATA);
+ toString(rows) + "/" + toString(settings.index_granularity) + " rounded up)", ErrorCodes::CORRUPTED_DATA);
if (strict || !checksums_txt.files.empty())
if (settings.require_checksums || !checksums_txt.files.empty())
checksums_txt.checkEqual(checksums_data, true);
if (first_exception)

View File

@ -110,8 +110,8 @@ MergeTreeData::MutableDataPartPtr ReplicatedMergeTreePartsFetcher::fetchPart(
ActiveDataPartSet::parsePartName(part_name, *new_data_part);
new_data_part->modification_time = time(0);
new_data_part->loadColumns();
new_data_part->loadChecksums();
new_data_part->loadColumns(true);
new_data_part->loadChecksums(true);
new_data_part->loadIndex();
new_data_part->checksums.checkEqual(checksums, false);

View File

@ -17,6 +17,8 @@ const auto ERROR_SLEEP_MS = 1000;
const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000;
const auto CLEANUP_SLEEP_MS = 30 * 1000;
const auto RESERVED_BLOCK_NUMBERS = 200;
/// Преобразовать число в строку формате суффиксов автоинкрементных нод в ZooKeeper.
static String padIndex(UInt64 index)
{
@ -174,7 +176,6 @@ void StorageReplicatedMergeTree::createTable()
zookeeper->create(zookeeper_path + "/nonincrement_block_numbers", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/leader_election", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/temp", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/flags", "", zkutil::CreateMode::Persistent);
/// Создадим replicas в последнюю очередь, чтобы нельзя было добавить реплику, пока все остальные ноды не созданы.
zookeeper->create(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent);
}
@ -242,6 +243,7 @@ void StorageReplicatedMergeTree::createReplica()
zookeeper->create(replica_path + "/log_pointer", "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/queue", "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/parts", "", zkutil::CreateMode::Persistent);
zookeeper->create(replica_path + "/flags", "", zkutil::CreateMode::Persistent);
/** Нужно изменить данные ноды /replicas на что угодно, чтобы поток, удаляющий старые записи в логе,
* споткнулся об это изменение и не удалил записи, которые мы еще не прочитали.
@ -399,10 +401,10 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
+ toString(expected_parts.size()) + " missing obsolete parts, "
+ toString(parts_to_fetch.size()) + " missing parts";
bool insane =
parts_to_add.size() > 2 ||
unexpected_parts.size() > 2 ||
expected_parts.size() > 20 ||
parts_to_fetch.size() > 2;
parts_to_add.size() > data.settings.replicated_max_unexpectedly_merged_parts ||
unexpected_parts.size() > data.settings.replicated_max_unexpected_parts ||
expected_parts.size() > data.settings.replicated_max_missing_obsolete_parts ||
parts_to_fetch.size() > data.settings.replicated_max_missing_active_parts;
if (insane && !skip_sanity_checks)
{
@ -475,8 +477,11 @@ void StorageReplicatedMergeTree::initVirtualParts()
}
}
void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops)
void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops, String part_name)
{
if (part_name.empty())
part_name = part->name;
check(part->columns);
int expected_columns_version = columns_version;
@ -488,22 +493,22 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataP
{
zkutil::Stat stat_before, stat_after;
String columns_str;
if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/parts/" + part->name + "/columns", columns_str, &stat_before))
if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name + "/columns", columns_str, &stat_before))
continue;
if (columns_str != expected_columns_str)
{
LOG_INFO(log, "Not checking checksums of part " << part->name << " with replica " << replica
LOG_INFO(log, "Not checking checksums of part " << part_name << " with replica " << replica
<< " because columns are different");
continue;
}
String checksums_str;
/// Проверим, что версия ноды со столбцами не изменилась, пока мы читали checksums.
/// Это гарантирует, что столбцы и чексуммы относятся к одним и тем же данным.
if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/parts/" + part->name + "/checksums", checksums_str) ||
!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/parts/" + part->name + "/columns", &stat_after) ||
if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name + "/checksums", checksums_str) ||
!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name + "/columns", &stat_after) ||
stat_before.version != stat_after.version)
{
LOG_INFO(log, "Not checking checksums of part " << part->name << " with replica " << replica
LOG_INFO(log, "Not checking checksums of part " << part_name << " with replica " << replica
<< " because part changed while we were reading its checksums");
continue;
}
@ -512,9 +517,9 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataP
checksums.checkEqual(part->checksums, true);
}
if (zookeeper->exists(replica_path + "/parts/" + part->name))
if (zookeeper->exists(replica_path + "/parts/" + part_name))
{
LOG_ERROR(log, "checkPartAndAddToZooKeeper: node " << replica_path + "/parts/" + part->name << " already exists");
LOG_ERROR(log, "checkPartAndAddToZooKeeper: node " << replica_path + "/parts/" + part_name << " already exists");
return;
}
@ -522,17 +527,17 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataP
zookeeper_path + "/columns",
expected_columns_version));
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part->name,
replica_path + "/parts/" + part_name,
"",
zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part->name + "/columns",
replica_path + "/parts/" + part_name + "/columns",
part->columns.toString(),
zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part->name + "/checksums",
replica_path + "/parts/" + part_name + "/checksums",
part->checksums.toString(),
zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
@ -749,7 +754,8 @@ void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_ev
bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry)
{
if ((entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART) && future_parts.count(entry.new_part_name))
if ((entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART)
&& future_parts.count(entry.new_part_name))
{
LOG_DEBUG(log, "Not executing log entry for part " << entry.new_part_name <<
" because another log entry for the same part is being processed. This shouldn't happen often.");
@ -779,10 +785,14 @@ bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry)
bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, BackgroundProcessingPool::Context & pool_context)
{
if (entry.type == LogEntry::DROP_RANGE)
return executeDropRange(entry);
{
executeDropRange(entry);
return true;
}
if (entry.type == LogEntry::GET_PART ||
entry.type == LogEntry::MERGE_PARTS)
entry.type == LogEntry::MERGE_PARTS ||
entry.type == LogEntry::ATTACH_PART)
{
/// Если у нас уже есть этот кусок или покрывающий его кусок, ничего делать не нужно.
MergeTreeData::DataPartPtr containing_part = data.getActiveContainingPart(entry.new_part_name);
@ -805,6 +815,10 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
{
do_fetch = true;
}
else if (entry.type == LogEntry::ATTACH_PART)
{
do_fetch = !executeAttachPart(entry);
}
else if (entry.type == LogEntry::MERGE_PARTS)
{
MergeTreeData::DataPartsVector parts;
@ -819,8 +833,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
}
if (part->name != name)
{
LOG_ERROR(log, "Log and parts set look inconsistent: " << name << " is covered by " << part->name
<< " but should be merged into " << entry.new_part_name);
LOG_WARNING(log, "Part " << name << " is covered by " << part->name
<< " but should be merged into " << entry.new_part_name << ". This shouldn't happen often.");
have_all_parts = false;
break;
}
@ -854,6 +868,10 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops);
/** TODO: Переименование нового куска лучше делать здесь, а не пятью строчками выше,
* чтобы оно было как можно ближе к zookeeper->multi.
*/
zookeeper->multi(ops);
/** При ZCONNECTIONLOSS или ZOPERATIONTIMEOUT можем зря откатить локальные изменения кусков.
@ -959,7 +977,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
return true;
}
bool StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTree::LogEntry & entry)
void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTree::LogEntry & entry)
{
LOG_INFO(log, (entry.detach ? "Detaching" : "Removing") << " parts inside " << entry.new_part_name << ".");
@ -1019,7 +1037,7 @@ bool StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
/// Если кусок нужно удалить, надежнее удалить директорию после изменений в ZooKeeper.
if (!entry.detach)
data.replaceParts({part}, {}, false);
data.replaceParts({part}, {}, true);
}
LOG_INFO(log, (entry.detach ? "Detached " : "Removed ") << removed_parts << " parts inside " << entry.new_part_name << ".");
@ -1043,6 +1061,48 @@ bool StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
unreplicated_data->replaceParts({part}, {}, false);
}
}
}
bool StorageReplicatedMergeTree::executeAttachPart(const StorageReplicatedMergeTree::LogEntry & entry)
{
String source_path = (entry.attach_unreplicated ? "unreplicated/" : "detached/") + entry.source_part_name;
LOG_INFO(log, "Attaching part " << entry.source_part_name << " from " << source_path << " as " << entry.new_part_name);
if (!Poco::File(data.getFullPath() + source_path).exists())
{
LOG_INFO(log, "No part at " << source_path << ". Will fetch it instead");
return false;
}
LOG_DEBUG(log, "Checking data");
MergeTreeData::MutableDataPartPtr part = data.loadPartAndFixMetadata(source_path);
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops, entry.new_part_name);
if (entry.attach_unreplicated && unreplicated_data)
{
MergeTreeData::DataPartPtr unreplicated_part = unreplicated_data->getPartIfExists(entry.source_part_name);
if (unreplicated_part)
unreplicated_data->detachPartInPlace(unreplicated_part);
else
LOG_WARNING(log, "Unreplicated part " << entry.source_part_name << " is already detached");
}
zookeeper->multi(ops);
/// NOTE: Не можем использовать renameTempPartAndAdd, потому что кусок не временный - если что-то пойдет не так, его не нужно удалять.
part->renameTo(entry.new_part_name);
part->name = entry.new_part_name;
ActiveDataPartSet::parsePartName(part->name, *part);
data.attachPart(part);
LOG_INFO(log, "Finished attaching part " << entry.new_part_name);
/// На месте удаленных кусков могут появиться новые, с другими данными.
context.resetCaches();
return true;
}
@ -1622,8 +1682,12 @@ void StorageReplicatedMergeTree::partCheckThread()
if (part->columns != zk_columns)
throw Exception("Columns of local part " + part_name + " are different from ZooKeeper");
MergeTreePartChecker::Settings settings;
settings.setIndexGranularity(data.index_granularity);
settings.setRequireChecksums(true);
settings.setRequireColumnFiles(true);
MergeTreePartChecker::checkDataPart(
data.getFullPath() + part_name, data.index_granularity, true, context.getDataTypeFactory());
data.getFullPath() + part_name, settings, context.getDataTypeFactory());
LOG_INFO(log, "Part " << part_name << " looks good.");
}
@ -1747,12 +1811,12 @@ void StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(part_name, zookeeper_path + "/replicas/" + replica_name, host, port);
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops, part_name);
MergeTreeData::Transaction transaction;
auto removed_parts = data.renameTempPartAndReplace(part, nullptr, &transaction);
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops);
zookeeper->multi(ops);
transaction.commit();
merge_selecting_event.set();
@ -2132,12 +2196,7 @@ static String getFakePartNameForDrop(const String & month_name, UInt64 left, UIn
void StorageReplicatedMergeTree::dropPartition(const Field & field, bool detach)
{
String month_name;
if (field.getType() == Field::Types::UInt64)
month_name = toString(field.get<UInt64>());
else
month_name = field.safeGet<String>();
String month_name = field.getType() == Field::Types::UInt64 ? toString(field.get<UInt64>()) : field.safeGet<String>();
if (!isValidMonthName(month_name))
throw Exception("Invalid partition format: " + month_name + ". Partition should consist of 6 digits: YYYYMM",
@ -2188,12 +2247,101 @@ void StorageReplicatedMergeTree::dropPartition(const Field & field, bool detach)
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
/// Дождемся, пока все реплики выполнят дроп.
waitForAllReplicasToProcessLogEntry(log_znode_path, entry);
waitForAllReplicasToProcessLogEntry(entry);
}
void StorageReplicatedMergeTree::attachPartition(const Field& partition, bool unreplicated, bool part)
void StorageReplicatedMergeTree::attachPartition(const Field & field, bool unreplicated, bool attach_part)
{
throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
String partition = field.getType() == Field::Types::UInt64 ? toString(field.get<UInt64>()) : field.safeGet<String>();
if (!attach_part && !isValidMonthName(partition))
throw Exception("Invalid partition format: " + partition + ". Partition should consist of 6 digits: YYYYMM",
ErrorCodes::INVALID_PARTITION_NAME);
String source_dir = (unreplicated ? "unreplicated/" : "detached/");
/// Составим список кусков, которые нужно добавить.
Strings parts;
if (attach_part)
{
parts.push_back(partition);
}
else
{
LOG_DEBUG(log, "Looking for parts for partition " << partition << " in " << source_dir);
ActiveDataPartSet active_parts;
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
{
String name = it.name();
if (!ActiveDataPartSet::isPartDirectory(name))
continue;
if (name.substr(0, partition.size()) != partition)
continue;
LOG_DEBUG(log, "Found part " << name);
active_parts.add(name);
}
LOG_DEBUG(log, active_parts.size() << " of them are active");
parts = active_parts.getParts();
}
/// Синхронно проверим, что добавляемые куски существуют и не испорчены хотя бы на этой реплике. Запишем checksums.txt, если его нет.
LOG_DEBUG(log, "Checking parts");
for (const String & part : parts)
{
LOG_DEBUG(log, "Checking part " << part);
data.loadPartAndFixMetadata(source_dir + part);
}
/// Выделим добавляемым кускам максимальные свободные номера, меньшие RESERVED_BLOCK_NUMBERS.
/// NOTE: Проверка свободности номеров никак не синхронизируется. Выполнять несколько запросов ATTACH/DETACH/DROP одновременно нельзя.
UInt64 min_used_number = RESERVED_BLOCK_NUMBERS;
{
auto existing_parts = data.getDataParts();
for (const auto & part : existing_parts)
{
min_used_number = std::min(min_used_number, part->left);
}
}
if (parts.size() > min_used_number)
throw Exception("Not enough free small block numbers for attaching parts: "
+ toString(parts.size()) + " needed, " + toString(min_used_number) + " available", ErrorCodes::NOT_ENOUGH_BLOCK_NUMBERS);
/// Добавим записи в лог.
std::reverse(parts.begin(), parts.end());
std::list<LogEntry> entries;
zkutil::Ops ops;
for (const String & part_name : parts)
{
ActiveDataPartSet::Part part;
ActiveDataPartSet::parsePartName(part_name, part);
part.left = part.right = --min_used_number;
String new_part_name = ActiveDataPartSet::getPartName(part.left_date, part.right_date, part.left, part.right, part.level);
LOG_INFO(log, "Will attach " << part_name << " as " << new_part_name);
entries.emplace_back();
LogEntry & entry = entries.back();
entry.type = LogEntry::ATTACH_PART;
entry.source_replica = replica_name;
entry.source_part_name = part_name;
entry.new_part_name = new_part_name;
entry.attach_unreplicated = unreplicated;
ops.push_back(new zkutil::Op::Create(
zookeeper_path + "/log/log-", entry.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
}
LOG_DEBUG(log, "Adding attaches to log");
zookeeper->multi(ops);
size_t i = 0;
for (LogEntry & entry : entries)
{
String log_znode_path = dynamic_cast<zkutil::Op::Create &>(ops[i++]).getPathCreated();
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
waitForAllReplicasToProcessLogEntry(entry);
}
}
void StorageReplicatedMergeTree::drop()
@ -2243,7 +2391,7 @@ AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const
zkutil::Ops ops;
auto acl = zookeeper->getDefaultACL();
ops.push_back(new zkutil::Op::Create(month_path, "", acl, zkutil::CreateMode::Persistent));
for (size_t i = 0; i < 200; ++i)
for (size_t i = 0; i < RESERVED_BLOCK_NUMBERS; ++i)
{
ops.push_back(new zkutil::Op::Create(month_path + "/skip_increment", "", acl, zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Remove(month_path + "/skip_increment", -1));
@ -2257,11 +2405,11 @@ AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const
zookeeper_path + "/temp", *zookeeper);
}
void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const String & log_znode_path, const LogEntry & entry)
void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const LogEntry & entry)
{
LOG_DEBUG(log, "Waiting for all replicas to process " << entry.znode_name);
UInt64 log_index = parse<UInt64>(log_znode_path.substr(log_znode_path.size() - 10));
UInt64 log_index = parse<UInt64>(entry.znode_name.substr(entry.znode_name.size() - 10));
String log_entry_str = entry.toString();
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
@ -2351,6 +2499,16 @@ void StorageReplicatedMergeTree::LogEntry::writeText(WriteBuffer & out) const
writeString("drop\n", out);
writeString(new_part_name, out);
break;
case ATTACH_PART:
writeString("attach\n", out);
if (attach_unreplicated)
writeString("unreplicated\n", out);
else
writeString("detached\n", out);
writeString(source_part_name, out);
writeString("\ninto\n", out);
writeString(new_part_name, out);
break;
}
writeString("\n", out);
}
@ -2391,6 +2549,22 @@ void StorageReplicatedMergeTree::LogEntry::readText(ReadBuffer & in)
detach = type_str == "detach";
readString(new_part_name, in);
}
else if (type_str == "attach")
{
type = ATTACH_PART;
String source_type;
readString(source_type, in);
if (source_type == "unreplicated")
attach_unreplicated = true;
else if (source_type == "detached")
attach_unreplicated = false;
else
throw Exception("Bad format: expected 'unreplicated' or 'detached', found '" + source_type + "'", ErrorCodes::CANNOT_PARSE_TEXT);
assertString("\n", in);
readString(source_part_name, in);
assertString("\ninto\n", in);
readString(new_part_name, in);
}
assertString("\n", in);
}

View File

@ -15,8 +15,14 @@ int main(int argc, char ** argv)
try
{
DB::MergeTreePartChecker::checkDataPart(argv[1], argc == 4 ? DB::parse<size_t>(argv[3]) : 8192ul, argv[2][0] == '1',
DB::DataTypeFactory(), true);
DB::MergeTreePartChecker::Settings settings;
if (argc == 4)
settings.setIndexGranularity(DB::parse<size_t>(argv[3]));
settings.setRequireChecksums(argv[2][0] == '1');
settings.setRequireColumnFiles(argv[2][0] == '1');
settings.setVerbose(true);
DB::MergeTreePartChecker::checkDataPart(argv[1], settings, DB::DataTypeFactory());
}
catch (...)
{

View File

@ -180,6 +180,8 @@ int main(int argc, char ** argv)
DB::assertString("set", in);
DB::skipWhitespaceIfAny(in);
DB::assertString(path, in);
DB::skipWhitespaceIfAny(in);
readMaybeQuoted(data, in);
DB::skipWhitespaceIfAny(in);