This commit is contained in:
Michael Kolupaev 2014-07-23 13:15:41 +04:00
parent 256f15b099
commit f3285dba79
6 changed files with 63 additions and 16 deletions

View File

@ -33,6 +33,8 @@
M(ObsoleteReplicatedParts, "Replicated parts rendered obsolete by fetches") \
M(ReplicatedPartMerges, "Replicated part merges") \
M(ReplicatedPartFetchesOfMerged, "Replicated part merges replaced with fetches") \
M(ReplicatedPartChecks, "Replicated part checks") \
M(ReplicatedPartChecksFailed, "Replicated part checks failed") \
\
M(END, "")

View File

@ -109,9 +109,10 @@ protected:
if (!reader)
{
UncompressedCache * uncompressed_cache = use_uncompressed_cache ? storage.context.getUncompressedCache() : NULL;
reader.reset(new MergeTreeReader(path, columns, uncompressed_cache, storage, all_mark_ranges));
reader.reset(new MergeTreeReader(path, owned_data_part->name, columns, uncompressed_cache, storage, all_mark_ranges));
if (prewhere_actions)
pre_reader.reset(new MergeTreeReader(path, pre_columns, uncompressed_cache, storage, all_mark_ranges));
pre_reader.reset(new MergeTreeReader(path, owned_data_part->name, pre_columns, uncompressed_cache, storage,
all_mark_ranges));
}
if (prewhere_actions)

View File

@ -118,6 +118,9 @@ struct MergeTreeSettings
class MergeTreeData : public ITableDeclaration
{
public:
/// Функция, которую можно вызвать, если есть подозрение, что данные куска испорчены.
typedef std::function<void (const String &)> BrokenPartCallback;
/// Описание куска с данными.
struct DataPart : public ActiveDataPartSet::Part
{
@ -531,6 +534,8 @@ public:
Aggregating,
};
static void doNothing(const String & name) {}
/** Подцепить таблицу с соответствующим именем, по соответствующему пути (с / на конце),
* (корректность имён и путей не проверяется)
* состоящую из указанных столбцов.
@ -550,8 +555,8 @@ public:
const String & sign_column_,
const MergeTreeSettings & settings_,
const String & log_name_,
bool require_part_metadata_
);
bool require_part_metadata_,
BrokenPartCallback broken_part_callback_ = &MergeTreeData::doNothing);
std::string getModePrefix() const;
@ -651,6 +656,12 @@ public:
/// Нужно вызывать под залоченным lockStructureForAlter().
void setColumnsList(const NamesAndTypesList & new_columns) { columns = new NamesAndTypesList(new_columns); }
/// Нужно вызвать, если есть подозрение, что данные куска испорчены.
void reportBrokenPart(const String & name)
{
broken_part_callback(name);
}
ExpressionActionsPtr getPrimaryExpression() const { return primary_expr; }
SortDescription getSortDescription() const { return sort_descr; }
@ -679,6 +690,8 @@ private:
NamesAndTypesListPtr columns;
BrokenPartCallback broken_part_callback;
String log_name;
Logger * log;

View File

@ -37,13 +37,21 @@ class MergeTreeReader
typedef std::map<std::string, ColumnPtr> OffsetColumns;
public:
MergeTreeReader(const String & path_, /// Путь к куску
MergeTreeReader(const String & path_, const String & part_name_, /// Путь к куску
const NamesAndTypesList & columns_, bool use_uncompressed_cache_, MergeTreeData & storage_, const MarkRanges & all_mark_ranges)
: path(path_), columns(columns_), use_uncompressed_cache(use_uncompressed_cache_), storage(storage_)
: path(path_), part_name(part_name_), columns(columns_), use_uncompressed_cache(use_uncompressed_cache_), storage(storage_)
{
try
{
for (const NameAndTypePair & column : columns)
addStream(column.name, *column.type, all_mark_ranges);
}
catch (...)
{
storage.reportBrokenPart(part_name);
throw;
}
}
/** Если столбцов нет в блоке, добавляет их, если есть - добавляет прочитанные значения к ним в конец.
* Не добавляет столбцы, для которых нет файлов. Чтобы их добавить, нужно вызвать fillMissingColumns.
@ -109,10 +117,19 @@ public:
}
catch (const Exception & e)
{
if (e.code() != ErrorCodes::ALL_REQUESTED_COLUMNS_ARE_MISSING)
storage.reportBrokenPart(part_name);
/// Более хорошая диагностика.
throw Exception(e.message() + " (while reading from part " + path + " from mark " + toString(from_mark) + " to "
+ toString(to_mark) + ")", e.code());
}
catch (...)
{
storage.reportBrokenPart(part_name);
throw;
}
}
/// Заполняет столбцы, которых нет в блоке, значениями по умолчанию.
@ -291,6 +308,7 @@ private:
typedef std::map<std::string, std::unique_ptr<Stream> > FileStreams;
String path;
String part_name;
FileStreams streams;
NamesAndTypesList columns;
bool use_uncompressed_cache;

View File

@ -27,15 +27,17 @@ MergeTreeData::MergeTreeData(
const String & sign_column_,
const MergeTreeSettings & settings_,
const String & log_name_,
bool require_part_metadata_)
bool require_part_metadata_,
BrokenPartCallback broken_part_callback_)
: context(context_),
date_column_name(date_column_name_), sampling_expression(sampling_expression_),
index_granularity(index_granularity_),
mode(mode_), sign_column(sign_column_),
settings(settings_), primary_expr_ast(primary_expr_ast_->clone()),
require_part_metadata(require_part_metadata_),
full_path(full_path_), columns(columns_), log_name(log_name_),
log(&Logger::get(log_name + " (Data)"))
full_path(full_path_), columns(columns_),
broken_part_callback(broken_part_callback_),
log_name(log_name_), log(&Logger::get(log_name + " (Data)"))
{
/// создаём директорию, если её нет
Poco::File(full_path).createDirectories();

View File

@ -36,7 +36,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'), zookeeper_path(zookeeper_path_),
replica_name(replica_name_),
data( full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
index_granularity_, mode_, sign_column_, settings_, database_name_ + "." + table_name, true),
index_granularity_, mode_, sign_column_, settings_, database_name_ + "." + table_name, true,
std::bind(&StorageReplicatedMergeTree::enqueuePartForCheck, this, std::placeholders::_1)),
reader(data), writer(data), merger(data), fetcher(data),
log(&Logger::get(database_name_ + "." + table_name + " (StorageReplicatedMergeTree)")),
shutdown_event(false), permanent_shutdown_event(false)
@ -894,6 +895,10 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
return false;
}
}
/// Если ни у кого нет куска, и в очереди нет слияний с его участием, проверим, есть ли у кого-то покрывающий его.
if (replica.empty())
enqueuePartForCheck(entry.new_part_name);
}
catch (...)
{
@ -1315,6 +1320,7 @@ void StorageReplicatedMergeTree::partCheckThread()
}
LOG_WARNING(log, "Checking part " << part_name);
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecks);
auto part = data.getContainingPart(part_name);
String part_path = replica_path + "/parts/" + part_name;
@ -1327,6 +1333,7 @@ void StorageReplicatedMergeTree::partCheckThread()
{
LOG_WARNING(log, "Part " << part_name << " exists in ZooKeeper but not locally. "
"Removing from ZooKeeper and queueing a fetch.");
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
removePartAndEnqueueFetch(part_name);
}
@ -1362,6 +1369,7 @@ void StorageReplicatedMergeTree::partCheckThread()
*/
LOG_ERROR(log, "No replica has part covering " << part_name << ". This part is lost forever. "
<< "There might or might not be a data loss.");
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
/// Если ни у кого нет такого куска, удалим его из нашей очереди и добавим его в block_numbers.
@ -1432,7 +1440,8 @@ void StorageReplicatedMergeTree::partCheckThread()
{
tryLogCurrentException(__PRETTY_FUNCTION__);
LOG_INFO(log, "Part " << part_name << " looks broken. Removing it and queueing a fetch.");
LOG_ERROR(log, "Part " << part_name << " looks broken. Removing it and queueing a fetch.");
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
removePartAndEnqueueFetch(part_name);
@ -1443,6 +1452,8 @@ void StorageReplicatedMergeTree::partCheckThread()
/// Если куска нет в ZooKeeper, удалим его локально.
else
{
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
/// Если этот кусок еще и получен в результате слияния, это уже чересчур странно.
if (part->left != part->right)
{