This commit is contained in:
Michael Kolupaev 2014-08-08 14:41:53 +04:00
parent 08340ff6a0
commit dbe4dc64f3
2 changed files with 13 additions and 8 deletions

View File

@ -33,6 +33,7 @@ public:
UInt64 part_number = block_number_lock.getNumber();
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, part_number);
String part_name = ActiveDataPartSet::getPartName(part->left_date, part->right_date, part->left, part->right, part->level);
/// Если в запросе не указан ID, возьмем в качестве ID хеш от данных. То есть, не вставляем одинаковые данные дважды.
/// NOTE: Если такая дедупликация не нужна, можно вместо этого оставлять block_id пустым.
@ -42,13 +43,10 @@ public:
LOG_DEBUG(log, "Wrote block " << part_number << " with ID " << block_id << ", " << current_block.block.rows() << " rows");
MergeTreeData::Transaction transaction; /// Если не получится добавить кусок в ZK, снова уберем его из рабочего набора.
storage.data.renameTempPartAndAdd(part, nullptr, &transaction);
StorageReplicatedMergeTree::LogEntry log_entry;
log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART;
log_entry.source_replica = storage.replica_name;
log_entry.new_part_name = part->name;
log_entry.new_part_name = part_name;
/// Одновременно добавим информацию о куске во все нужные места в ZooKeeper и снимем block_number_lock.
zkutil::Ops ops;
@ -75,7 +73,7 @@ public:
storage.zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
}
storage.checkPartAndAddToZooKeeper(part, ops);
storage.checkPartAndAddToZooKeeper(part, ops, part_name);
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/log/log-",
log_entry.toString(),
@ -83,6 +81,9 @@ public:
zkutil::CreateMode::PersistentSequential));
block_number_lock.getUnlockOps(ops);
MergeTreeData::Transaction transaction; /// Если не получится добавить кусок в ZK, снова уберем его из рабочего набора.
storage.data.renameTempPartAndAdd(part, nullptr, &transaction);
try
{
auto code = storage.zookeeper->tryMulti(ops);

View File

@ -868,6 +868,10 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops);
/** TODO: Переименование нового куска лучше делать здесь, а не пятью строчками выше,
* чтобы оно было как можно ближе к zookeeper->multi.
*/
zookeeper->multi(ops);
/** При ZCONNECTIONLOSS или ZOPERATIONTIMEOUT можем зря откатить локальные изменения кусков.
@ -1807,12 +1811,12 @@ void StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(part_name, zookeeper_path + "/replicas/" + replica_name, host, port);
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops, part_name);
MergeTreeData::Transaction transaction;
auto removed_parts = data.renameTempPartAndReplace(part, nullptr, &transaction);
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops);
zookeeper->multi(ops);
transaction.commit();
merge_selecting_event.set();