2014-04-02 07:59:43 +00:00
|
|
|
|
#pragma once
|
|
|
|
|
|
|
|
|
|
#include <DB/Storages/StorageReplicatedMergeTree.h>
|
|
|
|
|
#include <DB/Storages/MergeTree/AbandonableLockInZooKeeper.h>
|
2015-09-11 02:13:59 +00:00
|
|
|
|
#include <DB/Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
|
2015-04-16 06:12:35 +00:00
|
|
|
|
#include <DB/DataStreams/IBlockOutputStream.h>
|
2015-09-10 21:32:33 +00:00
|
|
|
|
#include <DB/IO/Operators.h>
|
2014-04-02 07:59:43 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
class ReplicatedMergeTreeBlockOutputStream : public IBlockOutputStream
|
|
|
|
|
{
|
|
|
|
|
public:
|
2015-09-10 20:43:42 +00:00
|
|
|
|
ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, const String & insert_id_, size_t quorum_)
|
|
|
|
|
: storage(storage_), insert_id(insert_id_), quorum(quorum_),
|
2015-09-11 02:13:59 +00:00
|
|
|
|
log(&Logger::get(storage.data.getLogName() + " (Replicated OutputStream)"))
|
|
|
|
|
{
|
|
|
|
|
/// Значение кворума 1 имеет такой же смысл, как если он отключён.
|
|
|
|
|
if (quorum == 1)
|
|
|
|
|
quorum = 0;
|
|
|
|
|
}
|
2014-04-02 07:59:43 +00:00
|
|
|
|
|
2014-09-17 22:57:02 +00:00
|
|
|
|
void writePrefix() override
|
|
|
|
|
{
|
|
|
|
|
/// TODO Можно ли здесь не блокировать структуру таблицы?
|
2014-10-17 01:05:51 +00:00
|
|
|
|
storage.data.delayInsertIfNeeded(&storage.restarting_thread->getWakeupEvent());
|
2014-09-17 22:57:02 +00:00
|
|
|
|
}
|
|
|
|
|
|
2014-04-02 07:59:43 +00:00
|
|
|
|
void write(const Block & block) override
|
|
|
|
|
{
|
2014-12-12 20:50:32 +00:00
|
|
|
|
auto zookeeper = storage.getZooKeeper();
|
|
|
|
|
|
|
|
|
|
assertSessionIsNotExpired(zookeeper);
|
2015-09-10 20:43:42 +00:00
|
|
|
|
|
|
|
|
|
/** Если запись с кворумом, то проверим, что требуемое количество реплик сейчас живо,
|
2015-09-11 02:13:59 +00:00
|
|
|
|
* а также что для всех предыдущих кусков, для которых требуется кворум, этот кворум достигнут.
|
2015-09-10 20:43:42 +00:00
|
|
|
|
*/
|
2015-09-11 02:13:59 +00:00
|
|
|
|
String quorum_status_path = storage.zookeeper_path + "/quorum/status";
|
2015-09-10 20:43:42 +00:00
|
|
|
|
if (quorum)
|
|
|
|
|
{
|
2015-09-10 21:32:33 +00:00
|
|
|
|
/// Список живых реплик. Все они регистрируют эфемерную ноду для leader_election.
|
|
|
|
|
auto live_replicas = zookeeper->getChildren(storage.zookeeper_path + "/leader_election");
|
|
|
|
|
|
|
|
|
|
if (live_replicas.size() < quorum)
|
|
|
|
|
throw Exception("Number of alive replicas ("
|
2015-09-11 02:51:35 +00:00
|
|
|
|
+ toString(live_replicas.size()) + ") is less than requested quorum (" + toString(quorum) + ").",
|
2015-09-10 21:32:33 +00:00
|
|
|
|
ErrorCodes::TOO_LESS_LIVE_REPLICAS);
|
|
|
|
|
|
2015-09-11 02:13:59 +00:00
|
|
|
|
/** Достигнут ли кворум для последнего куска, для которого нужен кворум?
|
|
|
|
|
* Запись всех кусков с включенным кворумом линейно упорядочена.
|
|
|
|
|
* Это значит, что в любой момент времени может быть только один кусок,
|
|
|
|
|
* для которого нужен, но ещё не достигнут кворум.
|
|
|
|
|
* Информация о таком куске будет расположена в ноде /quorum/status.
|
|
|
|
|
* Если кворум достигнут, то нода удаляется.
|
2015-09-10 21:32:33 +00:00
|
|
|
|
*/
|
2015-09-11 02:13:59 +00:00
|
|
|
|
|
|
|
|
|
String quorum_status;
|
|
|
|
|
bool quorum_unsatisfied = zookeeper->tryGet(quorum_status_path, quorum_status);
|
|
|
|
|
|
|
|
|
|
if (quorum_unsatisfied)
|
|
|
|
|
throw Exception("Quorum for previous write has not been satisfied yet. Status: " + quorum_status, ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
|
|
|
|
|
|
|
|
|
|
/// Обе проверки неявно делаются и позже (иначе был бы race condition).
|
2015-09-10 20:43:42 +00:00
|
|
|
|
}
|
|
|
|
|
|
2014-04-02 07:59:43 +00:00
|
|
|
|
auto part_blocks = storage.writer.splitBlockIntoParts(block);
|
2014-07-28 14:31:07 +00:00
|
|
|
|
|
2014-04-02 07:59:43 +00:00
|
|
|
|
for (auto & current_block : part_blocks)
|
|
|
|
|
{
|
2014-12-12 20:50:32 +00:00
|
|
|
|
assertSessionIsNotExpired(zookeeper);
|
2014-09-03 02:32:23 +00:00
|
|
|
|
|
2014-04-02 07:59:43 +00:00
|
|
|
|
++block_index;
|
|
|
|
|
String block_id = insert_id.empty() ? "" : insert_id + "__" + toString(block_index);
|
2014-11-06 06:32:23 +00:00
|
|
|
|
String month_name = toString(DateLUT::instance().toNumYYYYMMDD(DayNum_t(current_block.min_date)) / 100);
|
2014-05-07 13:58:20 +00:00
|
|
|
|
|
2014-08-07 09:23:55 +00:00
|
|
|
|
AbandonableLockInZooKeeper block_number_lock = storage.allocateBlockNumber(month_name);
|
2014-04-02 07:59:43 +00:00
|
|
|
|
|
2015-08-17 21:09:36 +00:00
|
|
|
|
Int64 part_number = block_number_lock.getNumber();
|
2014-04-02 07:59:43 +00:00
|
|
|
|
|
|
|
|
|
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, part_number);
|
2014-08-08 10:41:53 +00:00
|
|
|
|
String part_name = ActiveDataPartSet::getPartName(part->left_date, part->right_date, part->left, part->right, part->level);
|
2014-04-02 07:59:43 +00:00
|
|
|
|
|
2014-04-25 12:43:10 +00:00
|
|
|
|
/// Если в запросе не указан ID, возьмем в качестве ID хеш от данных. То есть, не вставляем одинаковые данные дважды.
|
|
|
|
|
/// NOTE: Если такая дедупликация не нужна, можно вместо этого оставлять block_id пустым.
|
|
|
|
|
/// Можно для этого сделать настройку или синтаксис в запросе (например, ID=null).
|
|
|
|
|
if (block_id.empty())
|
2015-09-11 02:13:59 +00:00
|
|
|
|
{
|
2014-04-25 12:43:10 +00:00
|
|
|
|
block_id = part->checksums.summaryDataChecksum();
|
|
|
|
|
|
2015-09-11 02:13:59 +00:00
|
|
|
|
if (block_id.empty())
|
|
|
|
|
throw Exception("Logical error: block_id is empty.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
}
|
|
|
|
|
|
2014-07-07 10:23:24 +00:00
|
|
|
|
LOG_DEBUG(log, "Wrote block " << part_number << " with ID " << block_id << ", " << current_block.block.rows() << " rows");
|
2014-06-27 14:05:38 +00:00
|
|
|
|
|
2014-04-02 10:10:37 +00:00
|
|
|
|
StorageReplicatedMergeTree::LogEntry log_entry;
|
|
|
|
|
log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART;
|
2014-04-07 15:45:46 +00:00
|
|
|
|
log_entry.source_replica = storage.replica_name;
|
2014-08-08 10:41:53 +00:00
|
|
|
|
log_entry.new_part_name = part_name;
|
2015-09-11 02:13:59 +00:00
|
|
|
|
log_entry.quorum = quorum;
|
2014-04-02 07:59:43 +00:00
|
|
|
|
|
2014-04-02 10:10:37 +00:00
|
|
|
|
/// Одновременно добавим информацию о куске во все нужные места в ZooKeeper и снимем block_number_lock.
|
2015-09-11 02:13:59 +00:00
|
|
|
|
|
|
|
|
|
/// Информация о блоке.
|
2014-04-02 07:59:43 +00:00
|
|
|
|
zkutil::Ops ops;
|
2015-09-11 02:13:59 +00:00
|
|
|
|
|
|
|
|
|
ops.push_back(
|
|
|
|
|
new zkutil::Op::Create(
|
2014-04-02 13:45:39 +00:00
|
|
|
|
storage.zookeeper_path + "/blocks/" + block_id,
|
|
|
|
|
"",
|
2014-12-12 20:50:32 +00:00
|
|
|
|
zookeeper->getDefaultACL(),
|
2014-04-02 13:45:39 +00:00
|
|
|
|
zkutil::CreateMode::Persistent));
|
2015-09-11 02:13:59 +00:00
|
|
|
|
ops.push_back(
|
|
|
|
|
new zkutil::Op::Create(
|
2014-07-15 09:56:17 +00:00
|
|
|
|
storage.zookeeper_path + "/blocks/" + block_id + "/columns",
|
|
|
|
|
part->columns.toString(),
|
2014-12-12 20:50:32 +00:00
|
|
|
|
zookeeper->getDefaultACL(),
|
2014-04-02 13:45:39 +00:00
|
|
|
|
zkutil::CreateMode::Persistent));
|
2015-09-11 02:13:59 +00:00
|
|
|
|
ops.push_back(
|
|
|
|
|
new zkutil::Op::Create(
|
2014-07-15 09:56:17 +00:00
|
|
|
|
storage.zookeeper_path + "/blocks/" + block_id + "/checksums",
|
|
|
|
|
part->checksums.toString(),
|
2014-12-12 20:50:32 +00:00
|
|
|
|
zookeeper->getDefaultACL(),
|
2014-07-10 08:40:59 +00:00
|
|
|
|
zkutil::CreateMode::Persistent));
|
2015-09-11 02:13:59 +00:00
|
|
|
|
ops.push_back(
|
|
|
|
|
new zkutil::Op::Create(
|
2014-04-02 13:45:39 +00:00
|
|
|
|
storage.zookeeper_path + "/blocks/" + block_id + "/number",
|
|
|
|
|
toString(part_number),
|
2014-12-12 20:50:32 +00:00
|
|
|
|
zookeeper->getDefaultACL(),
|
2014-04-02 13:45:39 +00:00
|
|
|
|
zkutil::CreateMode::Persistent));
|
2015-09-11 02:13:59 +00:00
|
|
|
|
|
|
|
|
|
/// Информация о куске, в данных реплики.
|
2014-08-08 10:41:53 +00:00
|
|
|
|
storage.checkPartAndAddToZooKeeper(part, ops, part_name);
|
2015-09-11 02:13:59 +00:00
|
|
|
|
|
|
|
|
|
/// Лог репликации.
|
2014-04-02 10:10:37 +00:00
|
|
|
|
ops.push_back(new zkutil::Op::Create(
|
2014-07-15 14:37:49 +00:00
|
|
|
|
storage.zookeeper_path + "/log/log-",
|
2014-04-02 10:10:37 +00:00
|
|
|
|
log_entry.toString(),
|
2014-12-12 20:50:32 +00:00
|
|
|
|
zookeeper->getDefaultACL(),
|
2014-04-02 10:10:37 +00:00
|
|
|
|
zkutil::CreateMode::PersistentSequential));
|
2015-09-11 02:13:59 +00:00
|
|
|
|
|
|
|
|
|
/// Удаление информации о том, что номер блока используется для записи.
|
2014-04-02 10:10:37 +00:00
|
|
|
|
block_number_lock.getUnlockOps(ops);
|
2014-04-02 07:59:43 +00:00
|
|
|
|
|
2015-09-11 02:13:59 +00:00
|
|
|
|
/** Если нужен кворум - создание узла, в котором отслеживается кворум.
|
|
|
|
|
* (Если такой узел уже существует - значит кто-то успел одновременно сделать другую кворумную запись, но для неё кворум ещё не достигнут.
|
|
|
|
|
* Делать в это время следующую кворумную запись нельзя.)
|
|
|
|
|
*/
|
|
|
|
|
if (quorum)
|
|
|
|
|
{
|
|
|
|
|
static std::once_flag once_flag;
|
|
|
|
|
std::call_once(once_flag, [&]
|
|
|
|
|
{
|
|
|
|
|
zookeeper->createIfNotExists(storage.zookeeper_path + "/quorum", "");
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
ReplicatedMergeTreeQuorumEntry quorum_entry;
|
|
|
|
|
quorum_entry.part_name = part_name;
|
|
|
|
|
quorum_entry.required_number_of_replicas = quorum;
|
|
|
|
|
quorum_entry.replicas.insert(storage.replica_name);
|
|
|
|
|
|
|
|
|
|
/** В данный момент, этот узел будет содержать информацию о том, что текущая реплика получила кусок.
|
|
|
|
|
* Когда другие реплики будут получать этот кусок (обычным способом, обрабатывая лог репликации),
|
|
|
|
|
* они будут добавлять себя в содержимое этого узла.
|
|
|
|
|
* Когда в нём будет информация о quorum количестве реплик, этот узел удаляется,
|
|
|
|
|
* что говорит о том, что кворум достигнут.
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
ops.push_back(
|
|
|
|
|
new zkutil::Op::Create(
|
|
|
|
|
quorum_status_path,
|
|
|
|
|
quorum_entry.toString(),
|
|
|
|
|
zookeeper->getDefaultACL(),
|
|
|
|
|
zkutil::CreateMode::Persistent));
|
|
|
|
|
}
|
|
|
|
|
|
2014-08-08 10:41:53 +00:00
|
|
|
|
MergeTreeData::Transaction transaction; /// Если не получится добавить кусок в ZK, снова уберем его из рабочего набора.
|
|
|
|
|
storage.data.renameTempPartAndAdd(part, nullptr, &transaction);
|
|
|
|
|
|
2014-07-25 11:15:11 +00:00
|
|
|
|
try
|
2014-06-27 17:21:31 +00:00
|
|
|
|
{
|
2014-12-12 20:50:32 +00:00
|
|
|
|
auto code = zookeeper->tryMulti(ops);
|
2014-07-25 11:15:11 +00:00
|
|
|
|
if (code == ZOK)
|
2014-06-27 17:21:31 +00:00
|
|
|
|
{
|
2014-07-25 11:15:11 +00:00
|
|
|
|
transaction.commit();
|
|
|
|
|
storage.merge_selecting_event.set();
|
|
|
|
|
}
|
|
|
|
|
else if (code == ZNODEEXISTS)
|
|
|
|
|
{
|
|
|
|
|
/// Если блок с таким ID уже есть в таблице, откатим его вставку.
|
|
|
|
|
String expected_checksums_str;
|
2014-12-12 20:50:32 +00:00
|
|
|
|
if (!block_id.empty() && zookeeper->tryGet(
|
2014-07-25 11:15:11 +00:00
|
|
|
|
storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str))
|
|
|
|
|
{
|
|
|
|
|
LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (removing part " << part->name << ")");
|
|
|
|
|
|
|
|
|
|
auto expected_checksums = MergeTreeData::DataPart::Checksums::parse(expected_checksums_str);
|
|
|
|
|
|
|
|
|
|
/// Если данные отличались от тех, что были вставлены ранее с тем же ID, бросим исключение.
|
|
|
|
|
expected_checksums.checkEqual(part->checksums, true);
|
2015-09-09 19:03:46 +00:00
|
|
|
|
|
|
|
|
|
/// У part-а уменьшится refcount, и его смогут удалить сразу при откате транзакции, а не позже.
|
|
|
|
|
part.reset();
|
|
|
|
|
transaction.rollback();
|
2014-07-25 11:15:11 +00:00
|
|
|
|
}
|
2015-09-11 02:13:59 +00:00
|
|
|
|
else if (zookeeper->exists(quorum_status_path))
|
|
|
|
|
{
|
|
|
|
|
part.reset();
|
|
|
|
|
transaction.rollback();
|
|
|
|
|
|
|
|
|
|
throw Exception("Another quorum insert has been already started", ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
|
|
|
|
|
}
|
2014-07-25 11:15:11 +00:00
|
|
|
|
else
|
|
|
|
|
{
|
2015-09-11 02:13:59 +00:00
|
|
|
|
/// Сюда можем попасть также, если узел с кворумом существовал, но потом быстро был удалён.
|
|
|
|
|
|
2014-07-25 11:15:11 +00:00
|
|
|
|
throw Exception("Unexpected ZNODEEXISTS while adding block " + toString(part_number) + " with ID " + block_id + ": "
|
|
|
|
|
+ zkutil::ZooKeeper::error2string(code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
|
|
|
|
|
}
|
2014-06-27 17:21:31 +00:00
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
2014-07-25 11:15:11 +00:00
|
|
|
|
throw Exception("Unexpected error while adding block " + toString(part_number) + " with ID " + block_id + ": "
|
2014-06-27 17:21:31 +00:00
|
|
|
|
+ zkutil::ZooKeeper::error2string(code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
|
|
|
|
|
}
|
|
|
|
|
}
|
2014-10-02 23:31:18 +00:00
|
|
|
|
catch (const zkutil::KeeperException & e)
|
2014-07-01 15:58:25 +00:00
|
|
|
|
{
|
2014-07-25 11:15:11 +00:00
|
|
|
|
/** Если потерялось соединение, и мы не знаем, применились ли изменения, нельзя удалять локальный кусок:
|
|
|
|
|
* если изменения применились, в /blocks/ появился вставленный блок, и его нельзя будет вставить снова.
|
|
|
|
|
*/
|
|
|
|
|
if (e.code == ZOPERATIONTIMEOUT ||
|
|
|
|
|
e.code == ZCONNECTIONLOSS)
|
|
|
|
|
{
|
|
|
|
|
transaction.commit();
|
2014-07-25 16:09:58 +00:00
|
|
|
|
storage.enqueuePartForCheck(part->name);
|
2014-07-25 11:15:11 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
throw;
|
2014-07-01 15:58:25 +00:00
|
|
|
|
}
|
2015-09-11 02:13:59 +00:00
|
|
|
|
|
|
|
|
|
if (quorum)
|
|
|
|
|
{
|
|
|
|
|
/// Дожидаемся достижения кворума. TODO Настраиваемый таймаут.
|
|
|
|
|
LOG_TRACE(log, "Waiting for quorum");
|
|
|
|
|
zookeeper->waitForDisappear(quorum_status_path);
|
|
|
|
|
LOG_TRACE(log, "Quorum satisfied");
|
|
|
|
|
}
|
2014-04-02 07:59:43 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
StorageReplicatedMergeTree & storage;
|
|
|
|
|
String insert_id;
|
2015-09-10 20:43:42 +00:00
|
|
|
|
size_t quorum;
|
2015-04-16 06:12:35 +00:00
|
|
|
|
size_t block_index = 0;
|
2014-04-02 13:45:39 +00:00
|
|
|
|
|
|
|
|
|
Logger * log;
|
2014-09-04 20:26:14 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Позволяет проверить, что сессия в ZooKeeper ещё жива.
|
2014-12-12 20:50:32 +00:00
|
|
|
|
void assertSessionIsNotExpired(zkutil::ZooKeeperPtr & zookeeper)
|
2014-09-04 20:26:14 +00:00
|
|
|
|
{
|
2014-12-12 20:50:32 +00:00
|
|
|
|
if (zookeeper->expired())
|
2014-09-04 20:26:14 +00:00
|
|
|
|
throw Exception("ZooKeeper session has been expired.", ErrorCodes::NO_ZOOKEEPER);
|
|
|
|
|
}
|
2014-04-02 07:59:43 +00:00
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
}
|