This commit is contained in:
Michael Kolupaev 2014-07-10 12:40:59 +04:00
parent dda1395dba
commit c335519c9c
4 changed files with 71 additions and 39 deletions

View File

@ -74,6 +74,25 @@ public:
}
}
String toString() const
{
String s;
{
WriteBufferFromString out(s);
writeText(out);
}
return s;
}
static NamesAndTypesList parse(const String & s)
{
ReadBufferFromString in(s);
NamesAndTypesList res;
res.readText(in);
assertEOF(in);
return res;
}
/// Все элементы rhs должны быть различны.
bool isSubsetOf(const NamesAndTypesList & rhs) const
{
@ -82,6 +101,16 @@ public:
std::sort(vector.begin(), vector.end());
return std::unique(vector.begin(), vector.end()) == vector.begin() + rhs.size();
}
/// Расстояние Хемминга между множествами
/// (иными словами, добавленные и удаленные столбцы считаются один раз; столбцы, изменившие тип, - дважды).
size_t sizeOfDifference(const NamesAndTypesList & rhs) const
{
NamesAndTypes vector(rhs.begin(), rhs.end());
vector.insert(vector.end(), begin(), end());
std::sort(vector.begin(), vector.end());
return (std::unique(vector.begin(), vector.end()) - vector.begin()) * 2 - size() - rhs.size();
}
};
typedef SharedPtr<NamesAndTypesList> NamesAndTypesListPtr;

View File

@ -67,6 +67,11 @@ public:
part->checksums.toString(),
storage.zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/columns",
part->columns.toString(),
storage.zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/number",
toString(part_number),

View File

@ -270,14 +270,14 @@ private:
/** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata).
* Если нет - бросить исключение.
*/
void checkTableStructure();
void checkTableStructure(bool skip_sanity_checks);
/** Проверить, что множество кусков соответствует тому, что в ZK (/replicas/me/parts/).
* Если каких-то кусков, описанных в ZK нет локально, бросить исключение.
* Если какие-то локальные куски не упоминаются в ZK, удалить их.
* Но если таких слишком много, на всякий случай бросить исключение - скорее всего, это ошибка конфигурации.
*/
void checkParts();
void checkParts(bool skip_sanity_checks);
/// Положить все куски из data в virtual_parts.
void initVirtualParts();

View File

@ -56,13 +56,26 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
if (!zookeeper->exists(zookeeper_path))
createTable();
checkTableStructure();
createReplica();
checkTableStructure(false);
createReplica(false);
}
else
{
checkTableStructure();
checkParts();
bool skip_sanity_checks = false;
if (zookeeper->exists(replica_path + "/flags/force_restore_data"))
{
skip_sanity_checks = true;
zookeeper->remove(replica_path + "/flags/force_restore_data");
if (skip_sanity_checks)
{
LOG_WARNING(log, "Skipping the limits on severity of changes to data parts (flag "
<< replica_path << "/flags/force_restore_data). " << sanity_report);
}
}
checkTableStructure(skip_sanity_checks);
checkParts(skip_sanity_checks);
}
initVirtualParts();
@ -141,15 +154,10 @@ void StorageReplicatedMergeTree::createTable()
metadata << "sign column: " << data.sign_column << std::endl;
metadata << "primary key: " << formattedAST(data.primary_expr_ast) << std::endl;
metadata << "columns:" << std::endl;
WriteBufferFromOStream buf(metadata);
for (auto & it : data.getColumnsList())
{
writeBackQuotedString(it.name, buf);
writeChar(' ', buf);
writeString(it.type->getName(), buf);
writeChar('\n', buf);
WriteBufferFromOStream buf(metadata);
data.getColumnsList().writeText(buf);
}
buf.next();
zookeeper->create(zookeeper_path + "/metadata", metadata.str(), zkutil::CreateMode::Persistent);
@ -163,7 +171,7 @@ void StorageReplicatedMergeTree::createTable()
/** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata).
* Если нет - бросить исключение.
*/
void StorageReplicatedMergeTree::checkTableStructure()
void StorageReplicatedMergeTree::checkTableStructure(bool skip_sanity_checks)
{
String metadata_str = zookeeper->get(zookeeper_path + "/metadata");
ReadBufferFromString buf(metadata_str);
@ -181,18 +189,17 @@ void StorageReplicatedMergeTree::checkTableStructure()
assertString("\nprimary key: ", buf);
assertString(formattedAST(data.primary_expr_ast), buf);
assertString("\ncolumns:\n", buf);
for (auto & it : data.getColumnsList())
{
String name;
readBackQuotedString(name, buf);
if (name != it.name)
throw Exception("Unexpected column name in ZooKeeper: expected " + it.name + ", found " + name,
ErrorCodes::UNKNOWN_IDENTIFIER);
assertString(" ", buf);
assertString(it.type->getName(), buf);
assertString("\n", buf);
}
NamesAndTypesList columns;
columns.readText(buf);
assertEOF(buf);
if (columns != data.getColumnsList())
{
if (data.getColumnsList().sizeOfDifference(columns) <= 2 || skip_sanity_checks)
LOG_WARNING(log, "Table structure in ZooKeeper is a little different from local table structure. Assuming ALTER.");
else
throw Exception("Table structure in ZooKeeper is very different from local table structure.",
ErrorCodes::INCOMPATIBLE_COLUMNS);
}
}
void StorageReplicatedMergeTree::createReplica()
@ -339,7 +346,7 @@ void StorageReplicatedMergeTree::activateReplica()
replica_is_active_node = zkutil::EphemeralNodeHolder::existing(replica_path + "/is_active", *zookeeper);
}
void StorageReplicatedMergeTree::checkParts()
void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
{
Strings expected_parts_vec = zookeeper->getChildren(replica_path + "/parts");
@ -391,13 +398,6 @@ void StorageReplicatedMergeTree::checkParts()
for (const String & name : parts_to_fetch)
expected_parts.erase(name);
bool skip_sanity_check = false;
if (zookeeper->exists(replica_path + "/flags/force_restore_data"))
{
skip_sanity_check = true;
zookeeper->remove(replica_path + "/flags/force_restore_data");
}
String sanity_report =
"There are " + toString(unexpected_parts.size()) + " unexpected parts, "
+ toString(parts_to_add.size()) + " unexpectedly merged parts, "
@ -409,12 +409,7 @@ void StorageReplicatedMergeTree::checkParts()
expected_parts.size() > 20 ||
parts_to_fetch.size() > 2;
if (skip_sanity_check)
{
LOG_WARNING(log, "Skipping the limits on severity of changes to data parts (flag "
<< replica_path << "/flags/force_restore_data). " << sanity_report);
}
else if (insane)
if (insane && !skip_sanity_checks)
{
throw Exception("The local set of parts of table " + getTableName() + " doesn't look like the set of parts in ZooKeeper. "
+ sanity_report,
@ -437,6 +432,7 @@ void StorageReplicatedMergeTree::checkParts()
LOG_ERROR(log, "Removing unexpectedly merged local part from ZooKeeper: " << name);
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
zookeeper->multi(ops);
@ -454,6 +450,7 @@ void StorageReplicatedMergeTree::checkParts()
/// Полагаемся, что это происходит до загрузки очереди (loadQueue).
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
ops.push_back(new zkutil::Op::Create(
@ -480,6 +477,7 @@ void StorageReplicatedMergeTree::initVirtualParts()
void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops)
{
asdqwe check columns (how to get data for multiple nodes simultaneously?)
String another_replica = findReplicaHavingPart(part->name, false);
if (!another_replica.empty())
{