This commit is contained in:
Michael Kolupaev 2014-08-08 12:28:13 +04:00
parent 83812f5ed5
commit 71b90ea1d4
12 changed files with 372 additions and 92 deletions

View File

@ -257,6 +257,7 @@ namespace ErrorCodes
INCORRECT_MARK, INCORRECT_MARK,
INVALID_PARTITION_NAME, INVALID_PARTITION_NAME,
NOT_LEADER, NOT_LEADER,
NOT_ENOUGH_BLOCK_NUMBERS,
POCO_EXCEPTION = 1000, POCO_EXCEPTION = 1000,
STD_EXCEPTION, STD_EXCEPTION,

View File

@ -46,7 +46,7 @@ private:
static PartitionCommand attachPartition(const Field & partition, bool unreplicated, bool part) static PartitionCommand attachPartition(const Field & partition, bool unreplicated, bool part)
{ {
return {ATTACH_PARTITION, partition, false, part, unreplicated}; return {ATTACH_PARTITION, partition, false, unreplicated, part};
} }
}; };

View File

@ -63,12 +63,14 @@ public:
void add(const String & name); void add(const String & name);
String getContainingPart(const String & name) const; String getContainingPart(const String & name) const;
Strings getParts() const; Strings getParts() const; /// В порядке возрастания месяца и номера блока.
size_t size() const;
static String getPartName(DayNum_t left_date, DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level); static String getPartName(DayNum_t left_date, DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level);
/// Возвращает true если имя директории совпадает с форматом имени директории кусочков /// Возвращает true если имя директории совпадает с форматом имени директории кусочков
static bool isPartDirectory(const String & dir_name, Poco::RegularExpression::MatchVec & matches); static bool isPartDirectory(const String & dir_name, Poco::RegularExpression::MatchVec * out_matches = nullptr);
/// Кладет в DataPart данные из имени кусочка. /// Кладет в DataPart данные из имени кусочка.
static void parsePartName(const String & file_name, Part & part, const Poco::RegularExpression::MatchVec * matches = nullptr); static void parsePartName(const String & file_name, Part & part, const Poco::RegularExpression::MatchVec * matches = nullptr);

View File

@ -307,17 +307,22 @@ public:
Poco::File(to).remove(true); Poco::File(to).remove(true);
} }
/// Переименовывает кусок, дописав к имени префикс. void renameTo(const String & new_name) const
void renameAddPrefix(const String & prefix) const
{ {
String from = storage.full_path + name + "/"; String from = storage.full_path + name + "/";
String to = storage.full_path + prefix + name + "/"; String to = storage.full_path + new_name + "/";
Poco::File f(from); Poco::File f(from);
f.setLastModified(Poco::Timestamp::fromEpochTime(time(0))); f.setLastModified(Poco::Timestamp::fromEpochTime(time(0)));
f.renameTo(to); f.renameTo(to);
} }
/// Переименовывает кусок, дописав к имени префикс.
void renameAddPrefix(const String & prefix) const
{
renameTo(prefix + name);
}
/// Загрузить индекс и вычислить размер. Если size=0, вычислить его тоже. /// Загрузить индекс и вычислить размер. Если size=0, вычислить его тоже.
void loadIndex() void loadIndex()
{ {
@ -344,12 +349,12 @@ public:
} }
/// Прочитать контрольные суммы, если есть. /// Прочитать контрольные суммы, если есть.
void loadChecksums() void loadChecksums(bool require)
{ {
String path = storage.full_path + name + "/checksums.txt"; String path = storage.full_path + name + "/checksums.txt";
if (!Poco::File(path).exists()) if (!Poco::File(path).exists())
{ {
if (storage.require_part_metadata) if (require)
throw Exception("No checksums.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART); throw Exception("No checksums.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
return; return;
@ -359,16 +364,21 @@ public:
assertEOF(file); assertEOF(file);
} }
void loadColumns() void loadColumns(bool require)
{ {
String path = storage.full_path + name + "/columns.txt"; String path = storage.full_path + name + "/columns.txt";
if (!Poco::File(path).exists()) if (!Poco::File(path).exists())
{ {
if (storage.require_part_metadata) if (require)
throw Exception("No columns.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART); throw Exception("No columns.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
columns = *storage.columns;
/// Если нет файла со списком столбцов, запишем его. /// Если нет файла со списком столбцов, запишем его.
for (const NameAndTypePair & column : *storage.columns)
{
if (Poco::File(storage.full_path + name + "/" + escapeForFileName(column.name) + ".bin").exists())
columns.push_back(column);
}
{ {
WriteBufferFromFile out(path + ".tmp", 4096); WriteBufferFromFile out(path + ".tmp", 4096);
columns.writeText(out); columns.writeText(out);
@ -382,7 +392,7 @@ public:
columns.readText(file, storage.context.getDataTypeFactory()); columns.readText(file, storage.context.getDataTypeFactory());
} }
void checkNotBroken() void checkNotBroken(bool require_part_metadata)
{ {
String path = storage.full_path + name; String path = storage.full_path + name;
@ -391,7 +401,7 @@ public:
if (!checksums.files.count("primary.idx")) if (!checksums.files.count("primary.idx"))
throw Exception("No checksum for primary.idx", ErrorCodes::NO_FILE_IN_DATA_PART); throw Exception("No checksum for primary.idx", ErrorCodes::NO_FILE_IN_DATA_PART);
if (storage.require_part_metadata) if (require_part_metadata)
{ {
for (const NameAndTypePair & it : columns) for (const NameAndTypePair & it : columns)
{ {
@ -625,15 +635,23 @@ public:
*/ */
DataPartsVector renameTempPartAndReplace(MutableDataPartPtr part, Increment * increment = nullptr, Transaction * out_transaction = nullptr); DataPartsVector renameTempPartAndReplace(MutableDataPartPtr part, Increment * increment = nullptr, Transaction * out_transaction = nullptr);
/** Убирает из рабочего набора куски remove и добавляет куски add. /** Убирает из рабочего набора куски remove и добавляет куски add. add должны уже быть в all_data_parts.
* Если clear_without_timeout, данные будут удалены при следующем clearOldParts, игнорируя old_parts_lifetime. * Если clear_without_timeout, данные будут удалены при следующем clearOldParts, игнорируя old_parts_lifetime.
*/ */
void replaceParts(const DataPartsVector & remove, const DataPartsVector & add, bool clear_without_timeout); void replaceParts(const DataPartsVector & remove, const DataPartsVector & add, bool clear_without_timeout);
/** Добавляет новый кусок в список известных кусков и в рабочий набор.
*/
void attachPart(DataPartPtr part);
/** Переименовывает кусок в detached/prefix_кусок и забывает про него. Данные не будут удалены в clearOldParts. /** Переименовывает кусок в detached/prefix_кусок и забывает про него. Данные не будут удалены в clearOldParts.
* Если restore_covered, добавляет в рабочий набор неактивные куски, слиянием которых получен удаляемый кусок. * Если restore_covered, добавляет в рабочий набор неактивные куски, слиянием которых получен удаляемый кусок.
*/ */
void renameAndDetachPart(DataPartPtr part, const String & prefix = "", bool restore_covered = false); void renameAndDetachPart(DataPartPtr part, const String & prefix = "", bool restore_covered = false, bool move_to_detached = true);
/** Убирает кусок из списка кусков (включая all_data_parts), но не перемещщает директорию.
*/
void detachPartInPlace(DataPartPtr part);
/** Возвращает старые неактуальные куски, которые можно удалить. Одновременно удаляет их из списка кусков, но не с диска. /** Возвращает старые неактуальные куски, которые можно удалить. Одновременно удаляет их из списка кусков, но не с диска.
*/ */
@ -685,6 +703,9 @@ public:
ExpressionActionsPtr getPrimaryExpression() const { return primary_expr; } ExpressionActionsPtr getPrimaryExpression() const { return primary_expr; }
SortDescription getSortDescription() const { return sort_descr; } SortDescription getSortDescription() const { return sort_descr; }
/// Проверить, что кусок не сломан и посчитать для него чексуммы, если их нет.
MutableDataPartPtr loadPartAndFixMetadata(const String & relative_path);
const Context & context; const Context & context;
const String date_column_name; const String date_column_name;
const ASTPtr sampling_expression; const ASTPtr sampling_expression;

View File

@ -9,16 +9,27 @@ namespace DB
class MergeTreePartChecker class MergeTreePartChecker
{ {
public: public:
struct Settings
{
bool verbose = false; /// Пишет в stderr прогресс и ошибки, и не останавливается при первой ошибке.
bool require_checksums = false; /// Требует, чтобы был columns.txt.
bool require_column_files = false; /// Требует, чтобы для всех столбцов из columns.txt были файлы.
size_t index_granularity = 8192;
Settings & setVerbose(bool verbose_) { verbose = verbose_; return *this; }
Settings & setRequireChecksums(bool require_checksums_) { require_checksums = require_checksums_; return *this; }
Settings & setRequireColumnFiles(bool require_column_files_) { require_column_files = require_column_files_; return *this; }
Settings & setIndexGranularity(bool index_granularity_) { index_granularity = index_granularity_; return *this; }
};
/** Полностью проверяет данные кусочка: /** Полностью проверяет данные кусочка:
* - Вычисляет контрольные суммы и сравнивает с checksums.txt. * - Вычисляет контрольные суммы и сравнивает с checksums.txt.
* - Для массивов и строк проверяет соответствие размеров и количества данных. * - Для массивов и строк проверяет соответствие размеров и количества данных.
* - Проверяет правильность засечек. * - Проверяет правильность засечек.
* Бросает исключение, если кусок испорчен или если проверить не получилось (TODO: можно попробовать разделить эти случаи). * Бросает исключение, если кусок испорчен или если проверить не получилось (TODO: можно попробовать разделить эти случаи).
* Если strict, требует, чтобы для всех столбцов из columns.txt были файлы.
* Если verbose, пишет в stderr прогресс и ошибки, и не останавливается при первой ошибке.
*/ */
static void checkDataPart(String path, size_t index_granularity, bool strict, const DataTypeFactory & data_type_factory, static void checkDataPart(String path, const Settings & settings, const DataTypeFactory & data_type_factory,
bool verbose = false); MergeTreeData::DataPart::Checksums * out_checksums = nullptr);
}; };
} }

View File

@ -137,6 +137,7 @@ private:
GET_PART, /// Получить кусок с другой реплики. GET_PART, /// Получить кусок с другой реплики.
MERGE_PARTS, /// Слить куски. MERGE_PARTS, /// Слить куски.
DROP_RANGE, /// Удалить куски в указанном месяце в указанном диапазоне номеров. DROP_RANGE, /// Удалить куски в указанном месяце в указанном диапазоне номеров.
ATTACH_PART, /// Перенести кусок из директории detached или unreplicated.
}; };
String znode_name; String znode_name;
@ -144,11 +145,19 @@ private:
Type type; Type type;
String source_replica; /// Пустая строка значит, что эта запись была добавлена сразу в очередь, а не скопирована из лога. String source_replica; /// Пустая строка значит, что эта запись была добавлена сразу в очередь, а не скопирована из лога.
String new_part_name; /// Для DROP_RANGE имя несуществующего куска. Нужно удалить все куски, покрытые им. /// Имя куска, получающегося в результате.
/// Для DROP_RANGE имя несуществующего куска. Нужно удалить все куски, покрытые им.
String new_part_name;
Strings parts_to_merge; Strings parts_to_merge;
bool detach = false; /// Для DROP_RANGE, true значит, что куски нужно не удалить, а перенести в директорию detached. /// Для DROP_RANGE, true значит, что куски нужно не удалить, а перенести в директорию detached.
bool detach = false;
/// Для ATTACH_PART имя куска в директории detached или unreplicated.
String source_part_name;
/// Нужно переносить из директории unreplicated, а не detached.
bool attach_unreplicated;
FuturePartTaggerPtr future_part_tagger; FuturePartTaggerPtr future_part_tagger;
bool currently_executing = false; /// Доступ под queue_mutex. bool currently_executing = false; /// Доступ под queue_mutex.
@ -156,13 +165,13 @@ private:
void addResultToVirtualParts(StorageReplicatedMergeTree & storage) void addResultToVirtualParts(StorageReplicatedMergeTree & storage)
{ {
if (type == MERGE_PARTS || type == GET_PART || type == DROP_RANGE) if (type == MERGE_PARTS || type == GET_PART || type == DROP_RANGE || type == ATTACH_PART)
storage.virtual_parts.add(new_part_name); storage.virtual_parts.add(new_part_name);
} }
void tagPartAsFuture(StorageReplicatedMergeTree & storage) void tagPartAsFuture(StorageReplicatedMergeTree & storage)
{ {
if (type == MERGE_PARTS || type == GET_PART) if (type == MERGE_PARTS || type == GET_PART || type == ATTACH_PART)
future_part_tagger = new FuturePartTagger(new_part_name, storage); future_part_tagger = new FuturePartTagger(new_part_name, storage);
} }
@ -362,7 +371,7 @@ private:
* Кладет в ops действия, добавляющие данные о куске в ZooKeeper. * Кладет в ops действия, добавляющие данные о куске в ZooKeeper.
* Вызывать под TableStructureLock. * Вызывать под TableStructureLock.
*/ */
void checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops); void checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops, String name_override = "");
/// Убирает кусок из ZooKeeper и добавляет в очередь задание скачать его. Предполагается это делать с битыми кусками. /// Убирает кусок из ZooKeeper и добавляет в очередь задание скачать его. Предполагается это делать с битыми кусками.
void removePartAndEnqueueFetch(const String & part_name); void removePartAndEnqueueFetch(const String & part_name);
@ -396,7 +405,8 @@ private:
*/ */
bool executeLogEntry(const LogEntry & entry, BackgroundProcessingPool::Context & pool_context); bool executeLogEntry(const LogEntry & entry, BackgroundProcessingPool::Context & pool_context);
bool executeDropRange(const LogEntry & entry); void executeDropRange(const LogEntry & entry);
bool executeAttachPart(const LogEntry & entry); /// Возвращает false, если куска нет, и его нужно забрать с другой реплики.
/** Обновляет очередь. /** Обновляет очередь.
*/ */
@ -450,7 +460,7 @@ private:
/** Дождаться, пока все реплики, включая эту, выполнят указанное действие из лога. /** Дождаться, пока все реплики, включая эту, выполнят указанное действие из лога.
* Если одновременно с этим добавляются реплики, может не дождаться добавленную реплику. * Если одновременно с этим добавляются реплики, может не дождаться добавленную реплику.
*/ */
void waitForAllReplicasToProcessLogEntry(const String & log_znode_path, const LogEntry & entry); void waitForAllReplicasToProcessLogEntry(const LogEntry & entry);
}; };
} }

View File

@ -82,6 +82,12 @@ Strings ActiveDataPartSet::getParts() const
return res; return res;
} }
size_t ActiveDataPartSet::size() const
{
Poco::ScopedLock<Poco::Mutex> lock(mutex);
return parts.size();
}
String ActiveDataPartSet::getPartName(DayNum_t left_date, DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level) String ActiveDataPartSet::getPartName(DayNum_t left_date, DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level)
@ -110,10 +116,14 @@ String ActiveDataPartSet::getPartName(DayNum_t left_date, DayNum_t right_date, U
return res; return res;
} }
bool ActiveDataPartSet::isPartDirectory(const String & dir_name, Poco::RegularExpression::MatchVec & matches) bool ActiveDataPartSet::isPartDirectory(const String & dir_name, Poco::RegularExpression::MatchVec * out_matches)
{ {
Poco::RegularExpression::MatchVec matches;
static Poco::RegularExpression file_name_regexp("^(\\d{8})_(\\d{8})_(\\d+)_(\\d+)_(\\d+)"); static Poco::RegularExpression file_name_regexp("^(\\d{8})_(\\d{8})_(\\d+)_(\\d+)_(\\d+)");
return (file_name_regexp.match(dir_name, 0, matches) && 6 == matches.size()); bool res = (file_name_regexp.match(dir_name, 0, matches) && 6 == matches.size());
if (out_matches)
*out_matches = matches;
return res;
} }
void ActiveDataPartSet::parsePartName(const String & file_name, Part & part, const Poco::RegularExpression::MatchVec * matches_p) void ActiveDataPartSet::parsePartName(const String & file_name, Part & part, const Poco::RegularExpression::MatchVec * matches_p)
@ -121,7 +131,7 @@ void ActiveDataPartSet::parsePartName(const String & file_name, Part & part, con
Poco::RegularExpression::MatchVec match_vec; Poco::RegularExpression::MatchVec match_vec;
if (!matches_p) if (!matches_p)
{ {
if (!isPartDirectory(file_name, match_vec)) if (!isPartDirectory(file_name, &match_vec))
throw Exception("Unexpected part name: " + file_name, ErrorCodes::BAD_DATA_PART_NAME); throw Exception("Unexpected part name: " + file_name, ErrorCodes::BAD_DATA_PART_NAME);
matches_p = &match_vec; matches_p = &match_vec;
} }

View File

@ -5,6 +5,7 @@
#include <DB/Storages/MergeTree/MergeTreeReader.h> #include <DB/Storages/MergeTree/MergeTreeReader.h>
#include <DB/Storages/MergeTree/MergeTreeBlockInputStream.h> #include <DB/Storages/MergeTree/MergeTreeBlockInputStream.h>
#include <DB/Storages/MergeTree/MergedBlockOutputStream.h> #include <DB/Storages/MergeTree/MergedBlockOutputStream.h>
#include <DB/Storages/MergeTree/MergeTreePartChecker.h>
#include <DB/Parsers/ASTIdentifier.h> #include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTNameTypePair.h> #include <DB/Parsers/ASTNameTypePair.h>
#include <DB/DataStreams/ExpressionBlockInputStream.h> #include <DB/DataStreams/ExpressionBlockInputStream.h>
@ -124,7 +125,7 @@ void MergeTreeData::loadDataParts()
Poco::RegularExpression::MatchVec matches; Poco::RegularExpression::MatchVec matches;
for (const String & file_name : part_file_names) for (const String & file_name : part_file_names)
{ {
if (!ActiveDataPartSet::isPartDirectory(file_name, matches)) if (!ActiveDataPartSet::isPartDirectory(file_name, &matches))
continue; continue;
MutableDataPartPtr part = std::make_shared<DataPart>(*this); MutableDataPartPtr part = std::make_shared<DataPart>(*this);
@ -135,10 +136,10 @@ void MergeTreeData::loadDataParts()
try try
{ {
part->loadColumns(); part->loadColumns(require_part_metadata);
part->loadChecksums(); part->loadChecksums(require_part_metadata);
part->loadIndex(); part->loadIndex();
part->checkNotBroken(); part->checkNotBroken(require_part_metadata);
} }
catch (...) catch (...)
{ {
@ -167,7 +168,7 @@ void MergeTreeData::loadDataParts()
{ {
if (contained_name == file_name) if (contained_name == file_name)
continue; continue;
if (!ActiveDataPartSet::isPartDirectory(contained_name, matches)) if (!ActiveDataPartSet::isPartDirectory(contained_name, &matches))
continue; continue;
DataPart contained_part(*this); DataPart contained_part(*this);
ActiveDataPartSet::parsePartName(contained_name, contained_part, &matches); ActiveDataPartSet::parsePartName(contained_name, contained_part, &matches);
@ -720,7 +721,17 @@ void MergeTreeData::replaceParts(const DataPartsVector & remove, const DataParts
} }
} }
void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix, bool restore_covered) void MergeTreeData::attachPart(DataPartPtr part)
{
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
if (!all_data_parts.insert(part).second)
throw Exception("Part " + part->name + " is already attached", ErrorCodes::DUPLICATE_DATA_PART);
data_parts.insert(part);
}
void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix, bool restore_covered, bool move_to_detached)
{ {
LOG_INFO(log, "Renaming " << part->name << " to " << prefix << part->name << " and detaching it."); LOG_INFO(log, "Renaming " << part->name << " to " << prefix << part->name << " and detaching it.");
@ -731,7 +742,8 @@ void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix,
throw Exception("No such data part", ErrorCodes::NO_SUCH_DATA_PART); throw Exception("No such data part", ErrorCodes::NO_SUCH_DATA_PART);
data_parts.erase(part); data_parts.erase(part);
part->renameAddPrefix("detached/" + prefix); if (move_to_detached || !prefix.empty())
part->renameAddPrefix((move_to_detached ? "detached/" : "") + prefix);
if (restore_covered) if (restore_covered)
{ {
@ -783,6 +795,11 @@ void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix,
} }
} }
void MergeTreeData::detachPartInPlace(DataPartPtr part)
{
renameAndDetachPart(part, "", false, false);
}
MergeTreeData::DataParts MergeTreeData::getDataParts() MergeTreeData::DataParts MergeTreeData::getDataParts()
{ {
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex); Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
@ -879,6 +896,41 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na
return nullptr; return nullptr;
} }
MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const String & relative_path)
{
MutableDataPartPtr part = std::make_shared<DataPart>(*this);
part->name = relative_path;
/// Раньше список столбцов записывался неправильно. Удалим его и создадим заново.
if (Poco::File(full_path + relative_path + "/columns.txt").exists())
Poco::File(full_path + relative_path + "/columns.txt").remove();
part->loadColumns(false);
part->loadChecksums(false);
part->loadIndex();
part->checkNotBroken(false);
part->modification_time = Poco::File(full_path + relative_path).getLastModified().epochTime();
/// Если нет файла с чексуммами, посчитаем чексуммы и запишем. Заодно проверим данные.
if (part->checksums.empty())
{
MergeTreePartChecker::Settings settings;
settings.setIndexGranularity(index_granularity);
settings.setRequireColumnFiles(true);
MergeTreePartChecker::checkDataPart(full_path + relative_path, settings, context.getDataTypeFactory(), &part->checksums);
{
WriteBufferFromFile out(full_path + relative_path + "/checksums.txt.tmp", 4096);
part->checksums.writeText(out);
}
Poco::File(full_path + relative_path + "/checksums.txt.tmp").renameTo(full_path + relative_path + "/checksums.txt");
}
return part;
}
void MergeTreeData::DataPart::Checksums::Checksum::checkEqual(const Checksum & rhs, bool have_uncompressed, const String & name) const void MergeTreeData::DataPart::Checksums::Checksum::checkEqual(const Checksum & rhs, bool have_uncompressed, const String & name) const
{ {

View File

@ -97,7 +97,7 @@ struct Stream
return size / sizeof(UInt64); return size / sizeof(UInt64);
} }
void assertMark(bool strict) void assertMark()
{ {
MarkInCompressedFile mrk_mark; MarkInCompressedFile mrk_mark;
readIntBinary(mrk_mark.offset_in_compressed_file, mrk_hashing_buf); readIntBinary(mrk_mark.offset_in_compressed_file, mrk_hashing_buf);
@ -152,7 +152,7 @@ struct Stream
}; };
/// Возвращает количество строк. Добавляет в checksums чексуммы всех файлов столбца. /// Возвращает количество строк. Добавляет в checksums чексуммы всех файлов столбца.
static size_t checkColumn(const String & path, const String & name, DataTypePtr type, size_t index_granularity, bool strict, static size_t checkColumn(const String & path, const String & name, DataTypePtr type, const MergeTreePartChecker::Settings & settings,
MergeTreeData::DataPart::Checksums & checksums) MergeTreeData::DataPart::Checksums & checksums)
{ {
size_t rows = 0; size_t rows = 0;
@ -171,10 +171,10 @@ static size_t checkColumn(const String & path, const String & name, DataTypePtr
if (sizes_stream.marksEOF()) if (sizes_stream.marksEOF())
break; break;
sizes_stream.assertMark(strict); sizes_stream.assertMark();
data_stream.assertMark(strict); data_stream.assertMark();
size_t cur_rows = sizes_stream.readUInt64(index_granularity, sizes); size_t cur_rows = sizes_stream.readUInt64(settings.index_granularity, sizes);
size_t sum = 0; size_t sum = 0;
for (size_t i = 0; i < cur_rows; ++i) for (size_t i = 0; i < cur_rows; ++i)
@ -188,7 +188,7 @@ static size_t checkColumn(const String & path, const String & name, DataTypePtr
data_stream.read(sum); data_stream.read(sum);
rows += cur_rows; rows += cur_rows;
if (cur_rows < index_granularity) if (cur_rows < settings.index_granularity)
break; break;
} }
@ -207,12 +207,12 @@ static size_t checkColumn(const String & path, const String & name, DataTypePtr
if (data_stream.marksEOF()) if (data_stream.marksEOF())
break; break;
data_stream.assertMark(strict); data_stream.assertMark();
size_t cur_rows = data_stream.read(index_granularity); size_t cur_rows = data_stream.read(settings.index_granularity);
rows += cur_rows; rows += cur_rows;
if (cur_rows < index_granularity) if (cur_rows < settings.index_granularity)
break; break;
} }
@ -228,8 +228,8 @@ static size_t checkColumn(const String & path, const String & name, DataTypePtr
} }
} }
void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity, bool strict, const DataTypeFactory & data_type_factory, void MergeTreePartChecker::checkDataPart(String path, const Settings & settings, const DataTypeFactory & data_type_factory,
bool verbose) MergeTreeData::DataPart::Checksums * out_checksums)
{ {
if (!path.empty() && *path.rbegin() != '/') if (!path.empty() && *path.rbegin() != '/')
path += "/"; path += "/";
@ -243,7 +243,7 @@ void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity,
assertEOF(buf); assertEOF(buf);
} }
if (strict || Poco::File(path + "checksums.txt").exists()) if (settings.require_checksums || Poco::File(path + "checksums.txt").exists())
{ {
ReadBufferFromFile buf(path + "checksums.txt"); ReadBufferFromFile buf(path + "checksums.txt");
checksums_txt.readText(buf); checksums_txt.readText(buf);
@ -266,7 +266,7 @@ void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity,
for (const NameAndTypePair & column : columns) for (const NameAndTypePair & column : columns)
{ {
if (verbose) if (settings.verbose)
{ {
std::cerr << column.name << ":"; std::cerr << column.name << ":";
std::cerr.flush(); std::cerr.flush();
@ -275,14 +275,14 @@ void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity,
bool ok = false; bool ok = false;
try try
{ {
if (!strict && !Poco::File(path + escapeForFileName(column.name) + ".bin").exists()) if (!settings.require_column_files && !Poco::File(path + escapeForFileName(column.name) + ".bin").exists())
{ {
if (verbose) if (settings.verbose)
std::cerr << " no files" << std::endl; std::cerr << " no files" << std::endl;
continue; continue;
} }
size_t cur_rows = checkColumn(path, column.name, column.type, index_granularity, strict, checksums_data); size_t cur_rows = checkColumn(path, column.name, column.type, settings, checksums_data);
if (first) if (first)
{ {
rows = cur_rows; rows = cur_rows;
@ -298,7 +298,7 @@ void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity,
} }
catch (...) catch (...)
{ {
if (!verbose) if (!settings.verbose)
throw; throw;
ExceptionPtr e = cloneCurrentException(); ExceptionPtr e = cloneCurrentException();
if (!first_exception) if (!first_exception)
@ -311,18 +311,18 @@ void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity,
std::cerr << std::endl; std::cerr << std::endl;
} }
if (verbose && ok) if (settings.verbose && ok)
std::cerr << " ok" << std::endl; std::cerr << " ok" << std::endl;
} }
if (first) if (first)
throw Exception("No columns", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED); throw Exception("No columns", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
if (primary_idx_size % ((rows - 1) / index_granularity + 1)) if (primary_idx_size % ((rows - 1) / settings.index_granularity + 1))
throw Exception("primary.idx size (" + toString(primary_idx_size) + ") not divisible by number of marks (" throw Exception("primary.idx size (" + toString(primary_idx_size) + ") not divisible by number of marks ("
+ toString(rows) + "/" + toString(index_granularity) + " rounded up)", ErrorCodes::CORRUPTED_DATA); + toString(rows) + "/" + toString(settings.index_granularity) + " rounded up)", ErrorCodes::CORRUPTED_DATA);
if (strict || !checksums_txt.files.empty()) if (settings.require_checksums || !checksums_txt.files.empty())
checksums_txt.checkEqual(checksums_data, true); checksums_txt.checkEqual(checksums_data, true);
if (first_exception) if (first_exception)

View File

@ -110,8 +110,8 @@ MergeTreeData::MutableDataPartPtr ReplicatedMergeTreePartsFetcher::fetchPart(
ActiveDataPartSet::parsePartName(part_name, *new_data_part); ActiveDataPartSet::parsePartName(part_name, *new_data_part);
new_data_part->modification_time = time(0); new_data_part->modification_time = time(0);
new_data_part->loadColumns(); new_data_part->loadColumns(true);
new_data_part->loadChecksums(); new_data_part->loadChecksums(true);
new_data_part->loadIndex(); new_data_part->loadIndex();
new_data_part->checksums.checkEqual(checksums, false); new_data_part->checksums.checkEqual(checksums, false);

View File

@ -17,6 +17,8 @@ const auto ERROR_SLEEP_MS = 1000;
const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000; const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000;
const auto CLEANUP_SLEEP_MS = 30 * 1000; const auto CLEANUP_SLEEP_MS = 30 * 1000;
const auto RESERVED_BLOCK_NUMBERS = 200;
/// Преобразовать число в строку формате суффиксов автоинкрементных нод в ZooKeeper. /// Преобразовать число в строку формате суффиксов автоинкрементных нод в ZooKeeper.
static String padIndex(UInt64 index) static String padIndex(UInt64 index)
{ {
@ -401,8 +403,8 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
bool insane = bool insane =
parts_to_add.size() > 2 || parts_to_add.size() > 2 ||
unexpected_parts.size() > 2 || unexpected_parts.size() > 2 ||
expected_parts.size() > 20 || expected_parts.size() > 5 ||
parts_to_fetch.size() > 2; parts_to_fetch.size() > 30;
if (insane && !skip_sanity_checks) if (insane && !skip_sanity_checks)
{ {
@ -475,8 +477,11 @@ void StorageReplicatedMergeTree::initVirtualParts()
} }
} }
void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops) void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops, String part_name)
{ {
if (part_name.empty())
part_name = part->name;
check(part->columns); check(part->columns);
int expected_columns_version = columns_version; int expected_columns_version = columns_version;
@ -488,22 +493,22 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataP
{ {
zkutil::Stat stat_before, stat_after; zkutil::Stat stat_before, stat_after;
String columns_str; String columns_str;
if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/parts/" + part->name + "/columns", columns_str, &stat_before)) if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name + "/columns", columns_str, &stat_before))
continue; continue;
if (columns_str != expected_columns_str) if (columns_str != expected_columns_str)
{ {
LOG_INFO(log, "Not checking checksums of part " << part->name << " with replica " << replica LOG_INFO(log, "Not checking checksums of part " << part_name << " with replica " << replica
<< " because columns are different"); << " because columns are different");
continue; continue;
} }
String checksums_str; String checksums_str;
/// Проверим, что версия ноды со столбцами не изменилась, пока мы читали checksums. /// Проверим, что версия ноды со столбцами не изменилась, пока мы читали checksums.
/// Это гарантирует, что столбцы и чексуммы относятся к одним и тем же данным. /// Это гарантирует, что столбцы и чексуммы относятся к одним и тем же данным.
if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/parts/" + part->name + "/checksums", checksums_str) || if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name + "/checksums", checksums_str) ||
!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/parts/" + part->name + "/columns", &stat_after) || !zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name + "/columns", &stat_after) ||
stat_before.version != stat_after.version) stat_before.version != stat_after.version)
{ {
LOG_INFO(log, "Not checking checksums of part " << part->name << " with replica " << replica LOG_INFO(log, "Not checking checksums of part " << part_name << " with replica " << replica
<< " because part changed while we were reading its checksums"); << " because part changed while we were reading its checksums");
continue; continue;
} }
@ -512,9 +517,9 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataP
checksums.checkEqual(part->checksums, true); checksums.checkEqual(part->checksums, true);
} }
if (zookeeper->exists(replica_path + "/parts/" + part->name)) if (zookeeper->exists(replica_path + "/parts/" + part_name))
{ {
LOG_ERROR(log, "checkPartAndAddToZooKeeper: node " << replica_path + "/parts/" + part->name << " already exists"); LOG_ERROR(log, "checkPartAndAddToZooKeeper: node " << replica_path + "/parts/" + part_name << " already exists");
return; return;
} }
@ -522,17 +527,17 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataP
zookeeper_path + "/columns", zookeeper_path + "/columns",
expected_columns_version)); expected_columns_version));
ops.push_back(new zkutil::Op::Create( ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part->name, replica_path + "/parts/" + part_name,
"", "",
zookeeper->getDefaultACL(), zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent)); zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create( ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part->name + "/columns", replica_path + "/parts/" + part_name + "/columns",
part->columns.toString(), part->columns.toString(),
zookeeper->getDefaultACL(), zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent)); zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create( ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part->name + "/checksums", replica_path + "/parts/" + part_name + "/checksums",
part->checksums.toString(), part->checksums.toString(),
zookeeper->getDefaultACL(), zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent)); zkutil::CreateMode::Persistent));
@ -749,7 +754,8 @@ void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_ev
bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry) bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry)
{ {
if ((entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART) && future_parts.count(entry.new_part_name)) if ((entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART)
&& future_parts.count(entry.new_part_name))
{ {
LOG_DEBUG(log, "Not executing log entry for part " << entry.new_part_name << LOG_DEBUG(log, "Not executing log entry for part " << entry.new_part_name <<
" because another log entry for the same part is being processed. This shouldn't happen often."); " because another log entry for the same part is being processed. This shouldn't happen often.");
@ -779,10 +785,14 @@ bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry)
bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, BackgroundProcessingPool::Context & pool_context) bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, BackgroundProcessingPool::Context & pool_context)
{ {
if (entry.type == LogEntry::DROP_RANGE) if (entry.type == LogEntry::DROP_RANGE)
return executeDropRange(entry); {
executeDropRange(entry);
return true;
}
if (entry.type == LogEntry::GET_PART || if (entry.type == LogEntry::GET_PART ||
entry.type == LogEntry::MERGE_PARTS) entry.type == LogEntry::MERGE_PARTS ||
entry.type == LogEntry::ATTACH_PART)
{ {
/// Если у нас уже есть этот кусок или покрывающий его кусок, ничего делать не нужно. /// Если у нас уже есть этот кусок или покрывающий его кусок, ничего делать не нужно.
MergeTreeData::DataPartPtr containing_part = data.getActiveContainingPart(entry.new_part_name); MergeTreeData::DataPartPtr containing_part = data.getActiveContainingPart(entry.new_part_name);
@ -805,6 +815,10 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
{ {
do_fetch = true; do_fetch = true;
} }
else if (entry.type == LogEntry::ATTACH_PART)
{
do_fetch = !executeAttachPart(entry);
}
else if (entry.type == LogEntry::MERGE_PARTS) else if (entry.type == LogEntry::MERGE_PARTS)
{ {
MergeTreeData::DataPartsVector parts; MergeTreeData::DataPartsVector parts;
@ -959,7 +973,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
return true; return true;
} }
bool StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTree::LogEntry & entry) void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTree::LogEntry & entry)
{ {
LOG_INFO(log, (entry.detach ? "Detaching" : "Removing") << " parts inside " << entry.new_part_name << "."); LOG_INFO(log, (entry.detach ? "Detaching" : "Removing") << " parts inside " << entry.new_part_name << ".");
@ -1019,7 +1033,7 @@ bool StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
/// Если кусок нужно удалить, надежнее удалить директорию после изменений в ZooKeeper. /// Если кусок нужно удалить, надежнее удалить директорию после изменений в ZooKeeper.
if (!entry.detach) if (!entry.detach)
data.replaceParts({part}, {}, false); data.replaceParts({part}, {}, true);
} }
LOG_INFO(log, (entry.detach ? "Detached " : "Removed ") << removed_parts << " parts inside " << entry.new_part_name << "."); LOG_INFO(log, (entry.detach ? "Detached " : "Removed ") << removed_parts << " parts inside " << entry.new_part_name << ".");
@ -1043,6 +1057,45 @@ bool StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
unreplicated_data->replaceParts({part}, {}, false); unreplicated_data->replaceParts({part}, {}, false);
} }
} }
}
bool StorageReplicatedMergeTree::executeAttachPart(const StorageReplicatedMergeTree::LogEntry & entry)
{
String source_path = (entry.attach_unreplicated ? "unreplicated/" : "detached/") + entry.source_part_name;
LOG_INFO(log, "Attaching part " << entry.source_part_name << " from " << source_path << " as " << entry.new_part_name);
if (!Poco::File(data.getFullPath() + source_path).isDirectory())
{
LOG_INFO(log, "No such directory. Will fetch " << entry.new_part_name << " instead");
return false;
}
LOG_DEBUG(log, "Checking data");
MergeTreeData::MutableDataPartPtr part = data.loadPartAndFixMetadata(source_path);
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops, entry.new_part_name);
if (entry.attach_unreplicated && unreplicated_data)
{
MergeTreeData::DataPartPtr unreplicated_part = unreplicated_data->getPartIfExists(entry.source_part_name);
if (unreplicated_part)
unreplicated_data->detachPartInPlace(unreplicated_part);
else
LOG_WARNING(log, "Unreplicated part " << entry.source_part_name << " is already detached");
}
zookeeper->multi(ops);
/// NOTE: Не можем использовать renameTempPartAndAdd, потому что кусок не временный - если что-то пойдет не так, его не нужно удалять.
part->renameTo(entry.new_part_name);
part->name = entry.new_part_name;
ActiveDataPartSet::parsePartName(part->name, *part);
data.attachPart(part);
LOG_INFO(log, "Finished attaching part " << entry.new_part_name);
/// На месте удаленных кусков могут появиться новые, с другими данными. /// На месте удаленных кусков могут появиться новые, с другими данными.
context.resetCaches(); context.resetCaches();
@ -1625,8 +1678,12 @@ void StorageReplicatedMergeTree::partCheckThread()
if (part->columns != zk_columns) if (part->columns != zk_columns)
throw Exception("Columns of local part " + part_name + " are different from ZooKeeper"); throw Exception("Columns of local part " + part_name + " are different from ZooKeeper");
MergeTreePartChecker::Settings settings;
settings.setIndexGranularity(data.index_granularity);
settings.setRequireChecksums(true);
settings.setRequireColumnFiles(true);
MergeTreePartChecker::checkDataPart( MergeTreePartChecker::checkDataPart(
data.getFullPath() + part_name, data.index_granularity, true, context.getDataTypeFactory()); data.getFullPath() + part_name, settings, context.getDataTypeFactory());
LOG_INFO(log, "Part " << part_name << " looks good."); LOG_INFO(log, "Part " << part_name << " looks good.");
} }
@ -2135,12 +2192,7 @@ static String getFakePartNameForDrop(const String & month_name, UInt64 left, UIn
void StorageReplicatedMergeTree::dropPartition(const Field & field, bool detach) void StorageReplicatedMergeTree::dropPartition(const Field & field, bool detach)
{ {
String month_name; String month_name = field.getType() == Field::Types::UInt64 ? toString(field.get<UInt64>()) : field.safeGet<String>();
if (field.getType() == Field::Types::UInt64)
month_name = toString(field.get<UInt64>());
else
month_name = field.safeGet<String>();
if (!isValidMonthName(month_name)) if (!isValidMonthName(month_name))
throw Exception("Invalid partition format: " + month_name + ". Partition should consist of 6 digits: YYYYMM", throw Exception("Invalid partition format: " + month_name + ". Partition should consist of 6 digits: YYYYMM",
@ -2191,12 +2243,101 @@ void StorageReplicatedMergeTree::dropPartition(const Field & field, bool detach)
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
/// Дождемся, пока все реплики выполнят дроп. /// Дождемся, пока все реплики выполнят дроп.
waitForAllReplicasToProcessLogEntry(log_znode_path, entry); waitForAllReplicasToProcessLogEntry(entry);
} }
void StorageReplicatedMergeTree::attachPartition(const Field& partition, bool unreplicated, bool part) void StorageReplicatedMergeTree::attachPartition(const Field & field, bool unreplicated, bool attach_part)
{ {
throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); String partition = field.getType() == Field::Types::UInt64 ? toString(field.get<UInt64>()) : field.safeGet<String>();
if (!attach_part && !isValidMonthName(partition))
throw Exception("Invalid partition format: " + partition + ". Partition should consist of 6 digits: YYYYMM",
ErrorCodes::INVALID_PARTITION_NAME);
String source_dir = (unreplicated ? "unreplicated/" : "detached/");
/// Составим список кусков, которые нужно добавить.
Strings parts;
if (attach_part)
{
parts.push_back(partition);
}
else
{
LOG_DEBUG(log, "Looking for parts for partition " << partition << " in " << source_dir);
ActiveDataPartSet active_parts;
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
{
String name = it.name();
if (!ActiveDataPartSet::isPartDirectory(name))
continue;
if (name.substr(0, partition.size()) != partition)
continue;
LOG_DEBUG(log, "Found part " << name);
active_parts.add(name);
}
LOG_DEBUG(log, active_parts.size() << " of them are active");
parts = active_parts.getParts();
}
/// Синхронно проверим, что добавляемые куски существуют и не испорчены хотя бы на этой реплике. Запишем checksums.txt, если его нет.
LOG_DEBUG(log, "Checking parts");
for (const String & part : parts)
{
LOG_DEBUG(log, "Checking part " << part);
data.loadPartAndFixMetadata(source_dir + part);
}
/// Выделим добавляемым кускам максимальные свободные номера, меньшие RESERVED_BLOCK_NUMBERS.
/// NOTE: Проверка свободности номеров никак не синхронизируется. Выполнять несколько запросов ATTACH/DETACH/DROP одновременно нельзя.
UInt64 min_used_number = RESERVED_BLOCK_NUMBERS;
{
auto existing_parts = data.getDataParts();
for (const auto & part : existing_parts)
{
min_used_number = std::min(min_used_number, part->left);
}
}
if (parts.size() > min_used_number)
throw Exception("Not enough free small block numbers for attaching parts: "
+ toString(parts.size()) + " needed, " + toString(min_used_number) + " available", ErrorCodes::NOT_ENOUGH_BLOCK_NUMBERS);
/// Добавим записи в лог.
std::reverse(parts.begin(), parts.end());
std::list<LogEntry> entries;
zkutil::Ops ops;
for (const String & part_name : parts)
{
ActiveDataPartSet::Part part;
ActiveDataPartSet::parsePartName(part_name, part);
part.left = part.right = --min_used_number;
String new_part_name = ActiveDataPartSet::getPartName(part.left_date, part.right_date, part.left, part.right, part.level);
LOG_INFO(log, "Will attach " << part_name << " as " << new_part_name);
entries.emplace_back();
LogEntry & entry = entries.back();
entry.type = LogEntry::ATTACH_PART;
entry.source_replica = replica_name;
entry.source_part_name = part_name;
entry.new_part_name = new_part_name;
entry.attach_unreplicated = unreplicated;
ops.push_back(new zkutil::Op::Create(
zookeeper_path + "/log/log-", entry.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
}
LOG_DEBUG(log, "Adding attaches to log");
zookeeper->multi(ops);
size_t i = 0;
for (LogEntry & entry : entries)
{
String log_znode_path = dynamic_cast<zkutil::Op::Create &>(ops[i++]).getPathCreated();
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
waitForAllReplicasToProcessLogEntry(entry);
}
} }
void StorageReplicatedMergeTree::drop() void StorageReplicatedMergeTree::drop()
@ -2246,7 +2387,7 @@ AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const
zkutil::Ops ops; zkutil::Ops ops;
auto acl = zookeeper->getDefaultACL(); auto acl = zookeeper->getDefaultACL();
ops.push_back(new zkutil::Op::Create(month_path, "", acl, zkutil::CreateMode::Persistent)); ops.push_back(new zkutil::Op::Create(month_path, "", acl, zkutil::CreateMode::Persistent));
for (size_t i = 0; i < 200; ++i) for (size_t i = 0; i < RESERVED_BLOCK_NUMBERS; ++i)
{ {
ops.push_back(new zkutil::Op::Create(month_path + "/skip_increment", "", acl, zkutil::CreateMode::Persistent)); ops.push_back(new zkutil::Op::Create(month_path + "/skip_increment", "", acl, zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Remove(month_path + "/skip_increment", -1)); ops.push_back(new zkutil::Op::Remove(month_path + "/skip_increment", -1));
@ -2260,11 +2401,11 @@ AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const
zookeeper_path + "/temp", *zookeeper); zookeeper_path + "/temp", *zookeeper);
} }
void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const String & log_znode_path, const LogEntry & entry) void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const LogEntry & entry)
{ {
LOG_DEBUG(log, "Waiting for all replicas to process " << entry.znode_name); LOG_DEBUG(log, "Waiting for all replicas to process " << entry.znode_name);
UInt64 log_index = parse<UInt64>(log_znode_path.substr(log_znode_path.size() - 10)); UInt64 log_index = parse<UInt64>(entry.znode_name.substr(entry.znode_name.size() - 10));
String log_entry_str = entry.toString(); String log_entry_str = entry.toString();
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas"); Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
@ -2354,6 +2495,16 @@ void StorageReplicatedMergeTree::LogEntry::writeText(WriteBuffer & out) const
writeString("drop\n", out); writeString("drop\n", out);
writeString(new_part_name, out); writeString(new_part_name, out);
break; break;
case ATTACH_PART:
writeString("attach\n", out);
if (attach_unreplicated)
writeString("unreplicated\n", out);
else
writeString("detached\n", out);
writeString(source_part_name, out);
writeString("\ninto\n", out);
writeString(new_part_name, out);
break;
} }
writeString("\n", out); writeString("\n", out);
} }
@ -2394,6 +2545,22 @@ void StorageReplicatedMergeTree::LogEntry::readText(ReadBuffer & in)
detach = type_str == "detach"; detach = type_str == "detach";
readString(new_part_name, in); readString(new_part_name, in);
} }
else if (type_str == "attach")
{
type = ATTACH_PART;
String source_type;
readString(source_type, in);
if (source_type == "unreplicated")
attach_unreplicated = true;
else if (source_type == "detached")
attach_unreplicated = false;
else
throw Exception("Bad format: expected 'unreplicated' or 'detached', found '" + source_type + "'", ErrorCodes::CANNOT_PARSE_TEXT);
assertString("\n", in);
readString(source_part_name, in);
assertString("\ninto\n", in);
readString(new_part_name, in);
}
assertString("\n", in); assertString("\n", in);
} }

View File

@ -15,8 +15,14 @@ int main(int argc, char ** argv)
try try
{ {
DB::MergeTreePartChecker::checkDataPart(argv[1], argc == 4 ? DB::parse<size_t>(argv[3]) : 8192ul, argv[2][0] == '1', DB::MergeTreePartChecker::Settings settings;
DB::DataTypeFactory(), true); if (argc == 4)
settings.setIndexGranularity(DB::parse<size_t>(argv[3]));
settings.setRequireChecksums(argv[2][0] == '1');
settings.setRequireColumnFiles(argv[2][0] == '1');
settings.setVerbose(true);
DB::MergeTreePartChecker::checkDataPart(argv[1], settings, DB::DataTypeFactory());
} }
catch (...) catch (...)
{ {