ClickHouse/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h

132 lines
5.3 KiB
C
Raw Normal View History

2014-04-02 07:59:43 +00:00
#pragma once
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Storages/MergeTree/AbandonableLockInZooKeeper.h>
2014-05-07 13:58:20 +00:00
#include <Yandex/time2str.h>
2014-04-02 07:59:43 +00:00
namespace DB
{
class ReplicatedMergeTreeBlockOutputStream : public IBlockOutputStream
{
public:
ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, const String & insert_id_)
2014-05-08 07:12:01 +00:00
: storage(storage_), insert_id(insert_id_), block_index(0),
log(&Logger::get(storage.data.getLogName() + " (Replicated OutputStream)")) {}
2014-04-02 07:59:43 +00:00
void write(const Block & block) override
{
auto part_blocks = storage.writer.splitBlockIntoParts(block);
for (auto & current_block : part_blocks)
{
2014-05-27 08:43:01 +00:00
storage.data.delayInsertIfNeeded();
2014-04-02 07:59:43 +00:00
++block_index;
String block_id = insert_id.empty() ? "" : insert_id + "__" + toString(block_index);
2014-07-08 23:52:53 +00:00
time_t min_date_time = DateLUT::instance().fromDayNum(DayNum_t(current_block.min_date));
2014-05-07 13:58:20 +00:00
String month_name = toString(Date2OrderedIdentifier(min_date_time) / 100);
2014-07-07 12:08:42 +00:00
storage.zookeeper->createIfNotExists(storage.zookeeper_path + "/block_numbers/" + month_name, "");
2014-04-02 07:59:43 +00:00
AbandonableLockInZooKeeper block_number_lock(
2014-05-07 13:58:20 +00:00
storage.zookeeper_path + "/block_numbers/" + month_name + "/block-",
2014-05-13 10:10:26 +00:00
storage.zookeeper_path + "/temp", *storage.zookeeper);
2014-04-02 07:59:43 +00:00
UInt64 part_number = block_number_lock.getNumber();
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, part_number);
2014-04-25 12:43:10 +00:00
/// Если в запросе не указан ID, возьмем в качестве ID хеш от данных. То есть, не вставляем одинаковые данные дважды.
/// NOTE: Если такая дедупликация не нужна, можно вместо этого оставлять block_id пустым.
/// Можно для этого сделать настройку или синтаксис в запросе (например, ID=null).
if (block_id.empty())
block_id = part->checksums.summaryDataChecksum();
2014-07-07 10:23:24 +00:00
LOG_DEBUG(log, "Wrote block " << part_number << " with ID " << block_id << ", " << current_block.block.rows() << " rows");
2014-07-01 15:58:25 +00:00
MergeTreeData::Transaction transaction; /// Если не получится добавить кусок в ZK, снова уберем его из рабочего набора.
storage.data.renameTempPartAndAdd(part, nullptr, &transaction);
2014-04-02 10:10:37 +00:00
StorageReplicatedMergeTree::LogEntry log_entry;
log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART;
2014-04-07 15:45:46 +00:00
log_entry.source_replica = storage.replica_name;
2014-04-02 10:10:37 +00:00
log_entry.new_part_name = part->name;
2014-04-02 07:59:43 +00:00
2014-04-02 10:10:37 +00:00
/// Одновременно добавим информацию о куске во все нужные места в ZooKeeper и снимем block_number_lock.
2014-04-02 07:59:43 +00:00
zkutil::Ops ops;
2014-04-02 13:45:39 +00:00
if (!block_id.empty())
{
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id,
"",
2014-05-13 10:10:26 +00:00
storage.zookeeper->getDefaultACL(),
2014-04-02 13:45:39 +00:00
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
2014-07-15 09:56:17 +00:00
storage.zookeeper_path + "/blocks/" + block_id + "/columns",
part->columns.toString(),
2014-05-13 10:10:26 +00:00
storage.zookeeper->getDefaultACL(),
2014-04-02 13:45:39 +00:00
zkutil::CreateMode::Persistent));
2014-07-10 08:40:59 +00:00
ops.push_back(new zkutil::Op::Create(
2014-07-15 09:56:17 +00:00
storage.zookeeper_path + "/blocks/" + block_id + "/checksums",
part->checksums.toString(),
2014-07-10 08:40:59 +00:00
storage.zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
2014-04-02 13:45:39 +00:00
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/number",
toString(part_number),
2014-05-13 10:10:26 +00:00
storage.zookeeper->getDefaultACL(),
2014-04-02 13:45:39 +00:00
zkutil::CreateMode::Persistent));
}
2014-07-15 09:56:17 +00:00
storage.checkPartAndAddToZooKeeper(part, ops);
2014-04-02 10:10:37 +00:00
ops.push_back(new zkutil::Op::Create(
storage.replica_path + "/log/log-",
log_entry.toString(),
2014-05-13 10:10:26 +00:00
storage.zookeeper->getDefaultACL(),
2014-04-02 10:10:37 +00:00
zkutil::CreateMode::PersistentSequential));
block_number_lock.getUnlockOps(ops);
2014-04-02 07:59:43 +00:00
2014-06-27 17:21:31 +00:00
auto code = storage.zookeeper->tryMulti(ops);
2014-07-01 15:58:25 +00:00
if (code == ZOK)
2014-06-27 17:21:31 +00:00
{
2014-07-01 15:58:25 +00:00
transaction.commit();
2014-07-02 10:16:49 +00:00
storage.merge_selecting_event.set();
2014-07-01 15:58:25 +00:00
}
else if (code == ZNODEEXISTS)
{
/// Если блок с таким ID уже есть в таблице, откатим его вставку.
String expected_checksums_str;
if (!block_id.empty() && storage.zookeeper->tryGet(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str))
2014-06-27 17:21:31 +00:00
{
2014-07-01 15:58:25 +00:00
LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (removing part " << part->name << ")");
auto expected_checksums = MergeTreeData::DataPart::Checksums::parse(expected_checksums_str);
/// Если данные отличались от тех, что были вставлены ранее с тем же ID, бросим исключение.
expected_checksums.checkEqual(part->checksums, true);
2014-06-27 17:21:31 +00:00
}
else
{
2014-07-01 15:58:25 +00:00
throw Exception("Unexpected ZNODEEXISTS while adding block " + toString(part_number) + " with ID " + block_id + ": "
2014-06-27 17:21:31 +00:00
+ zkutil::ZooKeeper::error2string(code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
}
2014-07-01 15:58:25 +00:00
else
{
throw Exception("Unexpected error while adding block " + toString(part_number) + " with ID " + block_id + ": "
+ zkutil::ZooKeeper::error2string(code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
2014-04-02 07:59:43 +00:00
}
}
private:
StorageReplicatedMergeTree & storage;
String insert_id;
size_t block_index;
2014-04-02 13:45:39 +00:00
Logger * log;
2014-04-02 07:59:43 +00:00
};
}