This commit is contained in:
Michael Kolupaev 2014-07-23 16:44:39 +04:00
commit 897d6fd336
14 changed files with 525 additions and 161 deletions

View File

@ -33,6 +33,8 @@
M(ObsoleteReplicatedParts, "Replicated parts rendered obsolete by fetches") \
M(ReplicatedPartMerges, "Replicated part merges") \
M(ReplicatedPartFetchesOfMerged, "Replicated part merges replaced with fetches") \
M(ReplicatedPartChecks, "Replicated part checks") \
M(ReplicatedPartChecksFailed, "Replicated part checks failed") \
\
M(END, "")

View File

@ -73,6 +73,8 @@ public:
/// Кладет в DataPart данные из имени кусочка.
static void parsePartName(const String & file_name, Part & part, const Poco::RegularExpression::MatchVec * matches = nullptr);
static bool contains(const String & outer_part_name, const String & inner_part_name);
private:
typedef std::set<Part> Parts;

View File

@ -4,6 +4,7 @@
#include <set>
#include <map>
#include <list>
#include <condition_variable>
#include <Poco/Mutex.h>
#include <Poco/RWLock.h>
#include <Poco/Event.h>
@ -32,7 +33,7 @@ public:
public:
void incrementCounter(const String & name, int value = 1)
{
Poco::ScopedLock<Poco::FastMutex> lock(pool.mutex);
std::unique_lock<std::mutex> lock(pool.mutex);
local_counters[name] += value;
pool.counters[name] += value;
}
@ -56,9 +57,12 @@ public:
/// Переставить таск в начало очереди и разбудить какой-нибудь поток.
void wake()
{
Poco::ScopedLock<Poco::FastMutex> lock(pool.mutex);
std::unique_lock<std::mutex> lock(pool.mutex);
pool.tasks.splice(pool.tasks.begin(), pool.tasks, iterator);
pool.wake_event.set();
/// Не очень надежно: если все потоки сейчас выполняют работу, этот вызов никого не разбудит,
/// и все будут спать в конце итерации.
pool.wake_event.notify_one();
}
private:
@ -83,8 +87,8 @@ public:
if (size_ <= 0)
throw Exception("Invalid number of threads: " + toString(size_), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
Poco::ScopedLock<Poco::FastMutex> tlock(threads_mutex);
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::unique_lock<std::mutex> tlock(threads_mutex);
std::unique_lock<std::mutex> lock(mutex);
if (size_ == size)
return;
@ -100,30 +104,30 @@ public:
int getNumberOfThreads()
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::unique_lock<std::mutex> lock(mutex);
return size;
}
void setSleepTime(double seconds)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::unique_lock<std::mutex> lock(mutex);
sleep_seconds = seconds;
}
int getCounter(const String & name)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::unique_lock<std::mutex> lock(mutex);
return counters[name];
}
TaskHandle addTask(const Task & task)
{
Poco::ScopedLock<Poco::FastMutex> lock(threads_mutex);
std::unique_lock<std::mutex> lock(threads_mutex);
TaskHandle res(new TaskInfo(*this, task));
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::unique_lock<std::mutex> lock(mutex);
tasks.push_back(res);
res->iterator = --tasks.end();
}
@ -142,7 +146,7 @@ public:
void removeTask(const TaskHandle & task)
{
Poco::ScopedLock<Poco::FastMutex> tlock(threads_mutex);
std::unique_lock<std::mutex> tlock(threads_mutex);
/// Дождемся завершения всех выполнений этой задачи.
{
@ -151,7 +155,7 @@ public:
}
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::unique_lock<std::mutex> lock(mutex);
auto it = std::find(tasks.begin(), tasks.end(), task);
if (it == tasks.end())
throw Exception("Task not found", ErrorCodes::LOGICAL_ERROR);
@ -161,6 +165,7 @@ public:
if (tasks.empty())
{
shutdown = true;
wake_event.notify_all();
for (std::thread & thread : threads)
thread.join();
threads.clear();
@ -172,12 +177,12 @@ public:
{
try
{
Poco::ScopedLock<Poco::FastMutex> lock(threads_mutex);
std::unique_lock<std::mutex> lock(threads_mutex);
if (!threads.empty())
{
LOG_ERROR(&Logger::get("~BackgroundProcessingPool"), "Destroying non-empty BackgroundProcessingPool");
shutdown = true;
wake_event.set(); /// NOTE: это разбудит только один поток. Лучше было бы разбудить все.
wake_event.notify_all();
for (std::thread & thread : threads)
thread.join();
}
@ -192,15 +197,16 @@ private:
typedef std::list<TaskHandle> Tasks;
typedef std::vector<std::thread> Threads;
Poco::FastMutex threads_mutex;
Poco::FastMutex mutex;
std::mutex threads_mutex;
std::mutex mutex;
int size;
Tasks tasks; /// Таски в порядке, в котором мы планируем их выполнять.
Threads threads;
Poco::Event wake_event;
Counters counters;
double sleep_seconds;
bool shutdown;
volatile bool shutdown;
std::condition_variable wake_event;
void threadFunction()
{
@ -215,7 +221,7 @@ private:
TaskHandle task;
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::unique_lock<std::mutex> lock(mutex);
if (!tasks.empty())
{
@ -243,7 +249,7 @@ private:
if (task->function(context))
{
/// Если у таска получилось выполнить какую-то работу, запустим его снова без паузы.
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::unique_lock<std::mutex> lock(mutex);
auto it = std::find(tasks.begin(), tasks.end(), task);
if (it != tasks.end())
@ -262,7 +268,7 @@ private:
/// Вычтем все счетчики обратно.
if (!counters_diff.empty())
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::unique_lock<std::mutex> lock(mutex);
for (const auto & it : counters_diff)
{
counters[it.first] -= it.second;
@ -274,7 +280,8 @@ private:
if (need_sleep)
{
wake_event.tryWait(sleep_seconds * 1000. / tasks_count);
std::unique_lock<std::mutex> lock(mutex);
wake_event.wait_for(lock, std::chrono::duration<double>(sleep_seconds / tasks_count));
}
}
}

View File

@ -109,9 +109,10 @@ protected:
if (!reader)
{
UncompressedCache * uncompressed_cache = use_uncompressed_cache ? storage.context.getUncompressedCache() : NULL;
reader.reset(new MergeTreeReader(path, columns, uncompressed_cache, storage, all_mark_ranges));
reader.reset(new MergeTreeReader(path, owned_data_part->name, columns, uncompressed_cache, storage, all_mark_ranges));
if (prewhere_actions)
pre_reader.reset(new MergeTreeReader(path, pre_columns, uncompressed_cache, storage, all_mark_ranges));
pre_reader.reset(new MergeTreeReader(path, owned_data_part->name, pre_columns, uncompressed_cache, storage,
all_mark_ranges));
}
if (prewhere_actions)
@ -266,9 +267,9 @@ protected:
if (remaining_mark_ranges.empty())
{
/** Закрываем файлы (ещё до уничтожения объекта).
* Чтобы при создании многих источников, но одновременном чтении только из нескольких,
* буферы не висели в памяти.
*/
* Чтобы при создании многих источников, но одновременном чтении только из нескольких,
* буферы не висели в памяти.
*/
reader.reset();
pre_reader.reset();
part_columns_lock.reset();

View File

@ -118,6 +118,9 @@ struct MergeTreeSettings
class MergeTreeData : public ITableDeclaration
{
public:
/// Функция, которую можно вызвать, если есть подозрение, что данные куска испорчены.
typedef std::function<void (const String &)> BrokenPartCallback;
/// Описание куска с данными.
struct DataPart : public ActiveDataPartSet::Part
{
@ -222,7 +225,7 @@ public:
std::atomic<size_t> size_in_bytes; /// размер в байтах, 0 - если не посчитано;
/// atomic, чтобы можно было не заботиться о блокировках при ALTER.
time_t modification_time;
mutable time_t remove_time; /// Когда кусок убрали из рабочего набора.
mutable time_t remove_time = std::numeric_limits<time_t>::max(); /// Когда кусок убрали из рабочего набора.
/// Если true, деструктор удалит директорию с куском.
bool is_temp = false;
@ -531,6 +534,8 @@ public:
Aggregating,
};
static void doNothing(const String & name) {}
/** Подцепить таблицу с соответствующим именем, по соответствующему пути (с / на конце),
* (корректность имён и путей не проверяется)
* состоящую из указанных столбцов.
@ -550,8 +555,8 @@ public:
const String & sign_column_,
const MergeTreeSettings & settings_,
const String & log_name_,
bool require_part_metadata_
);
bool require_part_metadata_,
BrokenPartCallback broken_part_callback_ = &MergeTreeData::doNothing);
std::string getModePrefix() const;
@ -615,8 +620,9 @@ public:
void renameAndDetachPart(DataPartPtr part, const String & prefix);
/** Убрать кусок из рабочего набора. Его данные удалятся при вызове clearOldParts, когда их перестанут читать.
* Если clear_without_timeout, данные будут удалены при следующем clearOldParts, игнорируя old_parts_lifetime.
*/
void deletePart(DataPartPtr part);
void deletePart(DataPartPtr part, bool clear_without_timeout);
/** Удалить неактуальные куски. Возвращает имена удаленных кусков.
*/
@ -651,6 +657,12 @@ public:
/// Нужно вызывать под залоченным lockStructureForAlter().
void setColumnsList(const NamesAndTypesList & new_columns) { columns = new NamesAndTypesList(new_columns); }
/// Нужно вызвать, если есть подозрение, что данные куска испорчены.
void reportBrokenPart(const String & name)
{
broken_part_callback(name);
}
ExpressionActionsPtr getPrimaryExpression() const { return primary_expr; }
SortDescription getSortDescription() const { return sort_descr; }
@ -679,6 +691,8 @@ private:
NamesAndTypesListPtr columns;
BrokenPartCallback broken_part_callback;
String log_name;
Logger * log;

View File

@ -14,7 +14,7 @@ public:
* - Для массивов и строк проверяет соответствие размеров и количества данных.
* - Проверяет правильность засечек.
* Бросает исключение, если кусок испорчен или если проверить не получилось (TODO: можно попробовать разделить эти случаи).
* Если strict, требует, чтобы для всех столбцов из columns.txt были файлы, и чтобы засечки не указывали в конец сжатого блока.
* Если strict, требует, чтобы для всех столбцов из columns.txt были файлы.
* Если verbose, пишет в stderr прогресс и ошибки, и не останавливается при первой ошибке.
*/
static void checkDataPart(String path, size_t index_granularity, bool strict, const DataTypeFactory & data_type_factory,

View File

@ -37,12 +37,20 @@ class MergeTreeReader
typedef std::map<std::string, ColumnPtr> OffsetColumns;
public:
MergeTreeReader(const String & path_, /// Путь к куску
MergeTreeReader(const String & path_, const String & part_name_, /// Путь к куску
const NamesAndTypesList & columns_, bool use_uncompressed_cache_, MergeTreeData & storage_, const MarkRanges & all_mark_ranges)
: path(path_), columns(columns_), use_uncompressed_cache(use_uncompressed_cache_), storage(storage_)
: path(path_), part_name(part_name_), columns(columns_), use_uncompressed_cache(use_uncompressed_cache_), storage(storage_)
{
for (const NameAndTypePair & column : columns)
addStream(column.name, *column.type, all_mark_ranges);
try
{
for (const NameAndTypePair & column : columns)
addStream(column.name, *column.type, all_mark_ranges);
}
catch (...)
{
storage.reportBrokenPart(part_name);
throw;
}
}
/** Если столбцов нет в блоке, добавляет их, если есть - добавляет прочитанные значения к ним в конец.
@ -109,10 +117,19 @@ public:
}
catch (const Exception & e)
{
if (e.code() != ErrorCodes::ALL_REQUESTED_COLUMNS_ARE_MISSING)
storage.reportBrokenPart(part_name);
/// Более хорошая диагностика.
throw Exception(e.message() + " (while reading from part " + path + " from mark " + toString(from_mark) + " to "
+ toString(to_mark) + ")", e.code());
}
catch (...)
{
storage.reportBrokenPart(part_name);
throw;
}
}
/// Заполняет столбцы, которых нет в блоке, значениями по умолчанию.
@ -291,6 +308,7 @@ private:
typedef std::map<std::string, std::unique_ptr<Stream> > FileStreams;
String path;
String part_name;
FileStreams streams;
NamesAndTypesList columns;
bool use_uncompressed_cache;

View File

@ -10,52 +10,19 @@
namespace DB
{
class StorageReplicatedMergeTree;
class ReplicatedMergeTreePartsServer : public InterserverIOEndpoint
{
public:
ReplicatedMergeTreePartsServer(MergeTreeData & data_, StoragePtr owned_storage_) : data(data_),
owned_storage(owned_storage_), log(&Logger::get(data.getLogName() + " (Replicated PartsServer)")) {}
ReplicatedMergeTreePartsServer(MergeTreeData & data_, StorageReplicatedMergeTree & storage_) : data(data_),
storage(storage_), log(&Logger::get(data.getLogName() + " (Replicated PartsServer)")) {}
void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override
{
String part_name = params.get("part");
LOG_TRACE(log, "Sending part " << part_name);
auto storage_lock = owned_storage->lockStructure(false);
MergeTreeData::DataPartPtr part = findPart(part_name);
Poco::ScopedReadRWLock part_lock(part->columns_lock);
/// Список файлов возьмем из списка контрольных сумм.
MergeTreeData::DataPart::Checksums checksums = part->checksums;
/// Добавим файлы, которых нет в списке контрольных сумм.
checksums.files["checksums.txt"];
checksums.files["columns.txt"];
writeBinary(checksums.files.size(), out);
for (const auto & it : checksums.files)
{
String path = data.getFullPath() + part_name + "/" + it.first;
UInt64 size = Poco::File(path).getSize();
writeStringBinary(it.first, out);
writeBinary(size, out);
ReadBufferFromFile file_in(path);
HashingWriteBuffer hashing_out(out);
copyData(file_in, hashing_out);
if (hashing_out.count() != size)
throw Exception("Unexpected size of file " + path, ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
writeBinary(hashing_out.getHash(), out);
}
}
void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override;
private:
MergeTreeData & data;
StoragePtr owned_storage;
StorageReplicatedMergeTree & storage;
Logger * log;
@ -78,60 +45,7 @@ public:
const String & part_name,
const String & replica_path,
const String & host,
int port)
{
ReadBufferFromHTTP::Params params = {
std::make_pair("endpoint", "ReplicatedMergeTree:" + replica_path),
std::make_pair("part", part_name),
std::make_pair("compress", "false")};
ReadBufferFromHTTP in(host, port, params);
String part_path = data.getFullPath() + "tmp_" + part_name + "/";
if (!Poco::File(part_path).createDirectory())
throw Exception("Directory " + part_path + " already exists");
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data);
new_data_part->name = "tmp_" + part_name;
new_data_part->is_temp = true;
size_t files;
readBinary(files, in);
MergeTreeData::DataPart::Checksums checksums;
for (size_t i = 0; i < files; ++i)
{
String file_name;
UInt64 file_size;
readStringBinary(file_name, in);
readBinary(file_size, in);
WriteBufferFromFile file_out(part_path + file_name);
HashingWriteBuffer hashing_out(file_out);
copyData(in, hashing_out, file_size);
uint128 expected_hash;
readBinary(expected_hash, in);
if (expected_hash != hashing_out.getHash())
throw Exception("Checksum mismatch for file " + part_path + file_name + " transferred from " + replica_path);
if (file_name != "checksums.txt" &&
file_name != "columns.txt")
checksums.addFile(file_name, file_size, expected_hash);
}
assertEOF(in);
ActiveDataPartSet::parsePartName(part_name, *new_data_part);
new_data_part->modification_time = time(0);
new_data_part->loadColumns();
new_data_part->loadChecksums();
new_data_part->loadIndex();
new_data_part->checksums.checkEqual(checksums, false);
return new_data_part;
}
int port);
private:
MergeTreeData & data;

View File

@ -70,6 +70,9 @@ public:
bool supportsIndexForIn() const override { return true; }
/// Добавить кусок в очередь кусков, чьи данные нужно проверить в фоновом потоке.
void enqueuePartForCheck(const String & name);
private:
friend class ReplicatedMergeTreeBlockOutputStream;
@ -158,6 +161,7 @@ private:
typedef std::list<LogEntry> LogEntries;
typedef std::set<String> StringSet;
typedef std::list<String> StringList;
Context & context;
zkutil::ZooKeeperPtr zookeeper;
@ -179,6 +183,15 @@ private:
*/
StringSet future_parts;
/** Куски, для которых нужно проверить одно из двух:
* - Если кусок у нас есть, сверить, его данные с его контрольными суммами, а их с ZooKeeper.
* - Если куска у нас нет, проверить, есть ли он (или покрывающий его кусок) хоть у кого-то.
*/
StringSet parts_to_check_set;
StringList parts_to_check_queue;
Poco::FastMutex parts_to_check_mutex;
Poco::Event parts_to_check_event;
String database_name;
String table_name;
String full_path;
@ -218,7 +231,7 @@ private:
std::unique_ptr<MergeTreeDataSelectExecutor> unreplicated_reader;
std::unique_ptr<MergeTreeDataMerger> unreplicated_merger;
/// Потоки.
/// Потоки:
/// Поток, следящий за обновлениями в логах всех реплик и загружающий их в очередь.
std::thread queue_updating_thread;
@ -241,6 +254,8 @@ private:
std::thread alter_thread;
zkutil::EventPtr alter_thread_event = zkutil::EventPtr(new Poco::Event);
/// Поток, проверяющий данные кусков.
std::thread part_check_thread;
/// Событие, пробуждающее метод alter от ожидания завершения запроса ALTER.
zkutil::EventPtr alter_query_event = zkutil::EventPtr(new Poco::Event);
@ -314,6 +329,9 @@ private:
*/
void checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops);
/// Убирает кусок из ZooKeeper и добавляет в очередь задание скачать его. Предполагается это делать с битыми кусками.
void removePartAndEnqueueFetch(const String & part_name);
void clearOldParts();
/// Удалить из ZooKeeper старые записи в логе.
@ -343,7 +361,7 @@ private:
*/
bool executeLogEntry(const LogEntry & entry, BackgroundProcessingPool::Context & pool_context);
/** В бесконечном цикле обновляет очередь.
/** Обновляет очередь.
*/
void queueUpdatingThread();
@ -355,19 +373,23 @@ private:
void becomeLeader();
/** В бесконечном цикле выбирает куски для слияния и записывает в лог.
/** Выбирает куски для слияния и записывает в лог.
*/
void mergeSelectingThread();
/** В бесконечном цикле вызывает clearOldBlocks.
/** Удаляет устаревшие данные.
*/
void cleanupThread();
/** В бесконечном цикле проверяет, не нужно ли сделать локальный ALTER, и делает его.
/** Делает локальный ALTER, когда список столбцов в ZooKeeper меняется.
*/
void alterThread();
/** В бесконечном цикле проверяет, не протухла ли сессия в ZooKeeper.
/** Проверяет целостность кусков.
*/
void partCheckThread();
/** Когда сессия в ZooKeeper протухает, переходит на новую.
*/
void restartingThread();

View File

@ -140,4 +140,12 @@ void ActiveDataPartSet::parsePartName(const String & file_name, Part & part, con
part.right_month = date_lut.toFirstDayNumOfMonth(part.right_date);
}
bool ActiveDataPartSet::contains(const String & outer_part_name, const String & inner_part_name)
{
Part outer, inner;
parsePartName(outer_part_name, outer);
parsePartName(inner_part_name, inner);
return outer.contains(inner);
}
}

View File

@ -27,15 +27,17 @@ MergeTreeData::MergeTreeData(
const String & sign_column_,
const MergeTreeSettings & settings_,
const String & log_name_,
bool require_part_metadata_)
bool require_part_metadata_,
BrokenPartCallback broken_part_callback_)
: context(context_),
date_column_name(date_column_name_), sampling_expression(sampling_expression_),
index_granularity(index_granularity_),
mode(mode_), sign_column(sign_column_),
settings(settings_), primary_expr_ast(primary_expr_ast_->clone()),
require_part_metadata(require_part_metadata_),
full_path(full_path_), columns(columns_), log_name(log_name_),
log(&Logger::get(log_name + " (Data)"))
full_path(full_path_), columns(columns_),
broken_part_callback(broken_part_callback_),
log_name(log_name_), log(&Logger::get(log_name + " (Data)"))
{
/// создаём директорию, если её нет
Poco::File(full_path).createDirectories();
@ -265,7 +267,8 @@ Strings MergeTreeData::clearOldParts()
{
int ref_count = it->use_count();
if (ref_count == 1 && /// После этого ref_count не может увеличиться.
(*it)->remove_time + settings.old_parts_lifetime < now)
(*it)->remove_time < now &&
now - (*it)->remove_time > settings.old_parts_lifetime)
{
LOG_DEBUG(log, "Removing part " << (*it)->name);
@ -532,6 +535,9 @@ void MergeTreeData::AlterDataPartTransaction::commit()
mutable_part.size_in_bytes = MergeTreeData::DataPart::calcTotalSize(path);
/// TODO: можно не сбрасывать кеши при добавлении столбца.
data_part->storage.context.resetCaches();
clear();
}
catch (...)
@ -704,13 +710,15 @@ void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix)
Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
if (!all_data_parts.erase(part))
throw Exception("No such data part", ErrorCodes::NO_SUCH_DATA_PART);
part->remove_time = time(0);
data_parts.erase(part);
part->renameAddPrefix(prefix);
}
void MergeTreeData::deletePart(DataPartPtr part)
void MergeTreeData::deletePart(DataPartPtr part, bool clear_without_timeout)
{
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
part->remove_time = clear_without_timeout ? 0 : time(0);
data_parts.erase(part);
}

View File

@ -107,16 +107,13 @@ struct Stream
if (uncompressed_hashing_buf.position() == uncompressed_hashing_buf.buffer().end())
{
if (!strict)
{
/// Если засечка должна быть ровно на границе блоков, нам подходит и засечка, указывающая на конец предыдущего блока,
/// и на начало следующего.
data_mark.offset_in_compressed_file = compressed_hashing_buf.count() - uncompressing_buf.getSizeCompressed();
data_mark.offset_in_decompressed_block = uncompressed_hashing_buf.offset();
/// Если засечка должна быть ровно на границе блоков, нам подходит и засечка, указывающая на конец предыдущего блока,
/// и на начало следующего.
data_mark.offset_in_compressed_file = compressed_hashing_buf.count() - uncompressing_buf.getSizeCompressed();
data_mark.offset_in_decompressed_block = uncompressed_hashing_buf.offset();
if (mrk_mark == data_mark)
return;
}
if (mrk_mark == data_mark)
return;
uncompressed_hashing_buf.next();
}

View File

@ -0,0 +1,122 @@
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
namespace DB
{
void ReplicatedMergeTreePartsServer::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out)
{
String part_name = params.get("part");
LOG_TRACE(log, "Sending part " << part_name);
try
{
auto storage_lock = storage.lockStructure(false);
MergeTreeData::DataPartPtr part = findPart(part_name);
Poco::ScopedReadRWLock part_lock(part->columns_lock);
/// Список файлов возьмем из списка контрольных сумм.
MergeTreeData::DataPart::Checksums checksums = part->checksums;
/// Добавим файлы, которых нет в списке контрольных сумм.
checksums.files["checksums.txt"];
checksums.files["columns.txt"];
MergeTreeData::DataPart::Checksums data_checksums;
writeBinary(checksums.files.size(), out);
for (const auto & it : checksums.files)
{
String file_name = it.first;
String path = data.getFullPath() + part_name + "/" + file_name;
UInt64 size = Poco::File(path).getSize();
writeStringBinary(it.first, out);
writeBinary(size, out);
ReadBufferFromFile file_in(path);
HashingWriteBuffer hashing_out(out);
copyData(file_in, hashing_out);
if (hashing_out.count() != size)
throw Exception("Unexpected size of file " + path, ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
writeBinary(hashing_out.getHash(), out);
if (file_name != "checksums.txt" &&
file_name != "columns.txt")
data_checksums.addFile(file_name, hashing_out.count(), hashing_out.getHash());
}
part->checksums.checkEqual(data_checksums, false);
}
catch (...)
{
storage.enqueuePartForCheck(part_name);
throw;
}
}
MergeTreeData::MutableDataPartPtr ReplicatedMergeTreePartsFetcher::fetchPart(
const String & part_name,
const String & replica_path,
const String & host,
int port)
{
ReadBufferFromHTTP::Params params = {
std::make_pair("endpoint", "ReplicatedMergeTree:" + replica_path),
std::make_pair("part", part_name),
std::make_pair("compress", "false")};
ReadBufferFromHTTP in(host, port, params);
String part_path = data.getFullPath() + "tmp_" + part_name + "/";
if (!Poco::File(part_path).createDirectory())
throw Exception("Directory " + part_path + " already exists");
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data);
new_data_part->name = "tmp_" + part_name;
new_data_part->is_temp = true;
size_t files;
readBinary(files, in);
MergeTreeData::DataPart::Checksums checksums;
for (size_t i = 0; i < files; ++i)
{
String file_name;
UInt64 file_size;
readStringBinary(file_name, in);
readBinary(file_size, in);
WriteBufferFromFile file_out(part_path + file_name);
HashingWriteBuffer hashing_out(file_out);
copyData(in, hashing_out, file_size);
uint128 expected_hash;
readBinary(expected_hash, in);
if (expected_hash != hashing_out.getHash())
throw Exception("Checksum mismatch for file " + part_path + file_name + " transferred from " + replica_path);
if (file_name != "checksums.txt" &&
file_name != "columns.txt")
checksums.addFile(file_name, file_size, expected_hash);
}
assertEOF(in);
ActiveDataPartSet::parsePartName(part_name, *new_data_part);
new_data_part->modification_time = time(0);
new_data_part->loadColumns();
new_data_part->loadChecksums();
new_data_part->loadIndex();
new_data_part->checksums.checkEqual(checksums, false);
return new_data_part;
}
}

View File

@ -1,6 +1,7 @@
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
#include <DB/Storages/MergeTree/MergeTreePartChecker.h>
#include <DB/Parsers/formatAST.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/IO/ReadBufferFromString.h>
@ -35,7 +36,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'), zookeeper_path(zookeeper_path_),
replica_name(replica_name_),
data( full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
index_granularity_, mode_, sign_column_, settings_, database_name_ + "." + table_name, true),
index_granularity_, mode_, sign_column_, settings_, database_name_ + "." + table_name, true,
std::bind(&StorageReplicatedMergeTree::enqueuePartForCheck, this, std::placeholders::_1)),
reader(data), writer(data), merger(data), fetcher(data),
log(&Logger::get(database_name_ + "." + table_name + " (StorageReplicatedMergeTree)")),
shutdown_event(false), permanent_shutdown_event(false)
@ -119,7 +121,7 @@ StoragePtr StorageReplicatedMergeTree::create(
if (!res->is_read_only)
{
String endpoint_name = "ReplicatedMergeTree:" + res->replica_path;
InterserverIOEndpointPtr endpoint = new ReplicatedMergeTreePartsServer(res->data, res_ptr);
InterserverIOEndpointPtr endpoint = new ReplicatedMergeTreePartsServer(res->data, *res);
res->endpoint_holder = new InterserverIOEndpointHolder(endpoint_name, endpoint, res->context.getInterserverIOHandler());
}
return res_ptr;
@ -660,7 +662,7 @@ void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_ev
Strings entries = zookeeper->getChildren(zookeeper_path + "/log");
index = entries.empty() ? 0 : parse<UInt64>(std::min_element(entries.begin(), entries.end())->substr(strlen("log-")));
zookeeper->set(replica_path + "/log_pointer", toString(index), zkutil::CreateMode::Persistent);
zookeeper->set(replica_path + "/log_pointer", toString(index));
}
else
{
@ -893,6 +895,10 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
return false;
}
}
/// Если ни у кого нет куска, и в очереди нет слияний с его участием, проверим, есть ли у кого-то покрывающий его.
if (replica.empty())
enqueuePartForCheck(entry.new_part_name);
}
catch (...)
{
@ -1082,10 +1088,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
/// Уберем больше не нужные отметки о несуществующих блоках.
for (UInt64 number = parts[i]->right + 1; number <= parts[i + 1]->left - 1; ++number)
{
String number_str = toString(number);
while (number_str.size() < 10)
number_str = '0' + number_str;
String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + number_str;
String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number);
zookeeper->tryRemove(path);
}
@ -1190,7 +1193,8 @@ void StorageReplicatedMergeTree::alterThread()
for (const MergeTreeData::DataPartPtr & part : parts)
{
/// Обновим кусок и запишем результат во временные файлы.
/// TODO: Можно пропускать проверку на слишком большие изменения, если в ZooKeeper есть, например, нода /flags/force_alter.
/// TODO: Можно пропускать проверку на слишком большие изменения, если в ZooKeeper есть, например,
/// нода /flags/force_alter.
auto transaction = data.alterDataPart(part, columns);
if (!transaction)
@ -1248,6 +1252,250 @@ void StorageReplicatedMergeTree::alterThread()
LOG_DEBUG(log, "alter thread finished");
}
void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_name)
{
String part_path = replica_path + "/parts/" + part_name;
LogEntry log_entry;
log_entry.type = LogEntry::GET_PART;
log_entry.source_replica = "";
log_entry.new_part_name = part_name;
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(
replica_path + "/queue/queue-", log_entry.toString(), zookeeper->getDefaultACL(),
zkutil::CreateMode::PersistentSequential));
ops.push_back(new zkutil::Op::Remove(part_path + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(part_path + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(part_path, -1));
auto results = zookeeper->multi(ops);
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
String path_created = dynamic_cast<zkutil::Op::Create &>(ops[0]).getPathCreated();
log_entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
log_entry.addResultToVirtualParts(*this);
queue.push_back(log_entry);
}
}
void StorageReplicatedMergeTree::enqueuePartForCheck(const String & name)
{
Poco::ScopedLock<Poco::FastMutex> lock(parts_to_check_mutex);
if (parts_to_check_set.count(name))
return;
parts_to_check_queue.push_back(name);
parts_to_check_set.insert(name);
parts_to_check_event.set();
}
void StorageReplicatedMergeTree::partCheckThread()
{
while (!shutdown_called)
{
try
{
/// Достанем из очереди кусок для проверки.
String part_name;
{
Poco::ScopedLock<Poco::FastMutex> lock(parts_to_check_mutex);
if (parts_to_check_queue.empty())
{
if (!parts_to_check_set.empty())
{
LOG_ERROR(log, "Non-empty parts_to_check_set with empty parts_to_check_queue. This is a bug.");
parts_to_check_set.clear();
}
}
else
{
part_name = parts_to_check_queue.front();
}
}
if (part_name.empty())
{
parts_to_check_event.wait();
continue;
}
LOG_WARNING(log, "Checking part " << part_name);
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecks);
auto part = data.getContainingPart(part_name);
String part_path = replica_path + "/parts/" + part_name;
/// Этого или покрывающего куска у нас нет.
if (!part)
{
/// Если кусок есть в ZooKeeper, удалим его оттуда и добавим в очередь задание скачать его.
if (zookeeper->exists(part_path))
{
LOG_WARNING(log, "Part " << part_name << " exists in ZooKeeper but not locally. "
"Removing from ZooKeeper and queueing a fetch.");
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
removePartAndEnqueueFetch(part_name);
}
/// Если куска нет в ZooKeeper, проверим есть ли он хоть у кого-то.
else
{
ActiveDataPartSet::Part part_info;
ActiveDataPartSet::parsePartName(part_name, part_info);
/** Будем проверять только куски, не полученные в результате слияния.
* Для кусков, полученных в результате слияния такая проверка была бы некорректной,
* потому что слитого куска может еще ни у кого не быть.
*/
if (part_info.left == part_info.right)
{
LOG_WARNING(log, "Checking if anyone has part covering " << part_name << ".");
bool found = false;
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
for (const String & replica : replicas)
{
Strings parts = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/parts");
for (const String & part_on_replica : parts)
{
if (part_on_replica == part_name || ActiveDataPartSet::contains(part_on_replica, part_name))
{
found = true;
LOG_WARNING(log, "Found part " << part_on_replica << " on " << replica);
break;
}
}
if (found)
break;
}
if (!found)
{
/** Такая ситуация возможна при нормальной работе, без потери данных, например, так:
* ReplicatedMergeTreeBlockOutputStream записал кусок, попытался добавить его в ZK,
* получил operation timeout, удалил локальный кусок и бросил исключение,
* а на самом деле, кусок добавился в ZK.
*/
LOG_ERROR(log, "No replica has part covering " << part_name << ". This part is lost forever. "
<< "There might or might not be a data loss.");
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
/** Если ни у кого нет такого куска, удалим его из нашей очереди.
*
* Еще можно было бы добавить его в block_numbers, чтобы он не мешал слияниям,
* но если так сделать, ZooKeeper почему-то пропустит один номер для автоинкремента,
* и в номерах блоков все равно останется дырка.
* TODO: можно это исправить, сделав две директории block_numbers: для автоинкрементных и ручных нод.
*/
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
/** NOTE: Не удалятся записи в очереди, которые сейчас выполняются.
* Они пофейлятся и положат кусок снова в очередь на проверку.
* Расчитываем, что это редкая ситуация.
*/
for (LogEntries::iterator it = queue.begin(); it != queue.end(); )
{
if (it->new_part_name == part_name)
{
zookeeper->remove(replica_path + "/queue/" + it->znode_name);
queue.erase(it++);
}
else
{
++it;
}
}
}
}
}
}
}
/// У нас есть этот кусок, и он активен.
else if (part->name == part_name)
{
/// Если кусок есть в ZooKeeper, сверим его данные с его чексуммами, а их с ZooKeeper.
if (zookeeper->exists(replica_path + "/parts/" + part_name))
{
LOG_WARNING(log, "Checking data of part " << part_name << ".");
try
{
auto zk_checksums = MergeTreeData::DataPart::Checksums::parse(
zookeeper->get(replica_path + "/parts/" + part_name + "/checksums"));
zk_checksums.checkEqual(part->checksums, true);
auto zk_columns = NamesAndTypesList::parse(
zookeeper->get(replica_path + "/parts/" + part_name + "/columns"), context.getDataTypeFactory());
if (part->columns != zk_columns)
throw Exception("Columns of local part " + part_name + " are different from ZooKeeper");
MergeTreePartChecker::checkDataPart(
data.getFullPath() + part_name, data.index_granularity, true, context.getDataTypeFactory());
LOG_INFO(log, "Part " << part_name << " looks good.");
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
LOG_ERROR(log, "Part " << part_name << " looks broken. Removing it and queueing a fetch.");
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
removePartAndEnqueueFetch(part_name);
/// Удалим кусок локально.
data.deletePart(part, true);
}
}
/// Если куска нет в ZooKeeper, удалим его локально.
else
{
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
/// Если этот кусок еще и получен в результате слияния, это уже чересчур странно.
if (part->left != part->right)
{
LOG_ERROR(log, "Unexpected part " << part_name << " is a result of a merge. You have to resolve this manually.");
}
else
{
LOG_ERROR(log, "Unexpected part " << part_name << ". Removing.");
data.deletePart(part, false);
}
}
}
else
{
/// Если у нас есть покрывающий кусок, игнорируем все проблемы с этим куском.
/// В худшем случае в лог еще old_parts_lifetime секунд будут валиться ошибки, пока кусок не удалится как старый.
}
/// Удалим кусок из очереди.
{
Poco::ScopedLock<Poco::FastMutex> lock(parts_to_check_mutex);
if (parts_to_check_queue.empty() || parts_to_check_queue.front() != part_name)
{
LOG_ERROR(log, "Someone changed parts_to_check_queue.front(). This is a bug.");
}
else
{
parts_to_check_queue.pop_front();
parts_to_check_set.erase(part_name);
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
parts_to_check_event.tryWait(ERROR_SLEEP_MS);
}
}
}
bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
{
/// Если какой-то из кусков уже собираются слить в больший, не соглашаемся его сливать.
@ -1265,10 +1513,7 @@ bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr
/// Можно слить куски, если все номера между ними заброшены - не соответствуют никаким блокам.
for (UInt64 number = left->right + 1; number <= right->left - 1; ++number)
{
String number_str = toString(number);
while (number_str.size() < 10)
number_str = '0' + number_str;
String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + number_str;
String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number);
if (AbandonableLockInZooKeeper::check(path, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
{
@ -1370,6 +1615,7 @@ void StorageReplicatedMergeTree::partialShutdown()
queue_updating_event->set();
alter_thread_event->set();
alter_query_event->set();
parts_to_check_event.set();
replica_is_active_node = nullptr;
merger.cancelAll();
@ -1389,6 +1635,8 @@ void StorageReplicatedMergeTree::partialShutdown()
cleanup_thread.join();
if (alter_thread.joinable())
alter_thread.join();
if (part_check_thread.joinable())
part_check_thread.join();
if (queue_task_handle)
context.getBackgroundPool().removeTask(queue_task_handle);
queue_task_handle.reset();
@ -1423,6 +1671,7 @@ void StorageReplicatedMergeTree::startup()
queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, this);
cleanup_thread = std::thread(&StorageReplicatedMergeTree::cleanupThread, this);
alter_thread = std::thread(&StorageReplicatedMergeTree::alterThread, this);
part_check_thread = std::thread(&StorageReplicatedMergeTree::partCheckThread, this);
queue_task_handle = context.getBackgroundPool().addTask(
std::bind(&StorageReplicatedMergeTree::queueTask, this, std::placeholders::_1));
}