update part set from ZK; skeleton for other threads

Alexey Zatelepin 2017-12-29 18:11:34 +03:00
parent 9ba0afbf2e
commit b606258c28
4 changed files with 228 additions and 34 deletions

Storages/NextGenReplication/NextGenReplicatedBlockOutputStream.cpp

@@ -1,5 +1,4 @@
#include <Storages/NextGenReplication/NextGenReplicatedBlockOutputStream.h>
#include <Storages/NextGenReplication/StorageNextGenReplicatedMergeTree.h>
#include <Interpreters/PartLog.h>
#include <Common/hex.h>
@@ -39,9 +38,9 @@ void NextGenReplicatedBlockOutputStream::write(const Block & block)
/// Generate a unique seqnum and ensure that the ephemeral part node is created before any other
/// part node with a higher seqnum.
- String ephemeral_node_prefix = parts_path + "/tmp_insert_";
+ String ephemeral_node_prefix = parts_path + "/insert_" + part->info.partition_id + "_";
String ephemeral_node_path = zookeeper->create(
- ephemeral_node_prefix, String(), zkutil::CreateMode::EphemeralSequential);
+ ephemeral_node_prefix, "replica: " + storage.replica_name, zkutil::CreateMode::EphemeralSequential);
SCOPE_EXIT(
{
if (!ephemeral_node_path.empty())
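
The "unique seqnum" comes from the EphemeralSequential mode used above: ZooKeeper appends a zero-padded 10-digit counter to the node name, so create() returns a path like ".../parts/insert_201712_0000000042", and concurrent inserts get strictly increasing suffixes. A minimal sketch of recovering the number (the helper name is mine, not the commit's, and it assumes a well-formed path):

    #include <cstdint>
    #include <string>

    // ZooKeeper appends a zero-padded 10-digit sequence counter to
    // EphemeralSequential node names; the suffix is the allocated seqnum.
    uint64_t sequenceNumberFromPath(const std::string & created_path)
    {
        // e.g. ".../parts/insert_201712_0000000042" -> 42
        return std::stoull(created_path.substr(created_path.size() - 10));
    }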
@@ -60,6 +59,16 @@ void NextGenReplicatedBlockOutputStream::write(const Block & block)
else
part->name = part->info.getPartName();
Part * inserted_part = nullptr;
{
std::lock_guard<std::mutex> lock(storage.parts_mutex);
auto insertion = storage.parts.emplace(part->info, Part{part->info, Part::State::Ephemeral});
inserted_part = &insertion.first->second;
if (!insertion.second && inserted_part->state != Part::State::Ephemeral)
throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
/// TODO check covering and covered parts;
}
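
The block above registers the part in the in-memory part set before anything touches ZooKeeper, relying on std::map::emplace reporting whether an insertion actually happened. A self-contained model of the same check (names are illustrative, not ClickHouse's):

    #include <map>
    #include <stdexcept>
    #include <string>

    // map::emplace returns {iterator, inserted}; finding an existing
    // non-ephemeral entry means a conflicting part already exists.
    enum class State { Ephemeral, Committed };
    struct PartEntry { std::string name; State state; };

    PartEntry & registerPart(std::map<std::string, PartEntry> & parts, const std::string & name)
    {
        auto insertion = parts.emplace(name, PartEntry{name, State::Ephemeral});
        PartEntry & entry = insertion.first->second;
        if (!insertion.second && entry.state != State::Ephemeral)
            throw std::runtime_error("Part " + name + " already exists");
        return entry;
    }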
String part_path = parts_path + "/" + part->name;
String hash_string;
@@ -73,18 +82,24 @@ void NextGenReplicatedBlockOutputStream::write(const Block & block)
writeHexByteLowercase(hash_data[i], &hash_string[2 * i]);
}
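
writeHexByteLowercase (from Common/hex.h) writes two hex digits per byte into the preallocated string. An equivalent in plain C++, for reference:

    #include <cstdint>
    #include <string>

    // Same effect as the loop above: one byte becomes two lowercase hex digits.
    std::string toHexLowercase(const uint8_t * data, size_t size)
    {
        static const char digits[] = "0123456789abcdef";
        std::string result(2 * size, '0');
        for (size_t i = 0; i < size; ++i)
        {
            result[2 * i] = digits[data[i] >> 4];
            result[2 * i + 1] = digits[data[i] & 0x0f];
        }
        return result;
    }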
MergeTreeData::Transaction transaction;
{
std::lock_guard<std::mutex> lock(storage.parts_mutex);
storage.data.renameTempPartAndAdd(part, nullptr, &transaction);
inserted_part->state = Part::State::Prepared;
}
zkutil::Ops ops;
auto acl = zookeeper->getDefaultACL();
ops.emplace_back(std::make_unique<zkutil::Op::Remove>(ephemeral_node_path, -1));
ops.emplace_back(std::make_unique<zkutil::Op::Create>(
- part_path, storage.replica_name, acl, zkutil::CreateMode::Persistent));
+ part_path, String(), acl, zkutil::CreateMode::Persistent));
ops.emplace_back(std::make_unique<zkutil::Op::Create>(
part_path + "/checksum", hash_string, acl, zkutil::CreateMode::Persistent));
- MergeTreeData::Transaction transaction;
- storage.data.renameTempPartAndAdd(part, nullptr, &transaction);
ops.emplace_back(std::make_unique<zkutil::Op::Create>(
part_path + "/replicas", String(), acl, zkutil::CreateMode::Persistent));
ops.emplace_back(std::make_unique<zkutil::Op::Create>(
part_path + "/replicas/" + storage.replica_name, String(), acl, zkutil::CreateMode::Persistent));
try
{
zookeeper->multi(ops);
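
The multi() is all-or-nothing: the ephemeral insert marker is removed and the permanent part nodes are created in one transaction, so another replica reading the parts node sees either the in-flight marker or the finished part, never a half-created one. The layout these ops build:

    parts/
        insert_<partition_id>_<seqnum>   (ephemeral; removed by the multi)
        <part_name>                      (value: empty)
            checksum                     (value: lowercase hex of the part hash)
            replicas/
                <replica_name>           (one child per replica that has the part)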
@@ -94,17 +109,34 @@ void NextGenReplicatedBlockOutputStream::write(const Block & block)
{
if (e.code == ZOPERATIONTIMEOUT || e.code == ZCONNECTIONLOSS)
{
- transaction.commit();
- /// TODO: mark that the status is unknown.
{
std::lock_guard<std::mutex> lock(storage.parts_mutex);
inserted_part->state = Part::State::MaybeCommitted;
transaction.commit();
}
throw Exception("Unknown status, client must retry. Reason: " + e.displayText(),
ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
}
{
std::lock_guard<std::mutex> lock(storage.parts_mutex);
inserted_part->state = Part::State::Outdated;
}
throw;
}
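
The catch above encodes a three-way outcome: most errors (e.g. a node already existing) prove the server rejected the batch, while a timeout or connection loss leaves it genuinely unknown whether the part nodes were created, hence the MaybeCommitted state and UNKNOWN_STATUS_OF_INSERT. A standalone sketch of that classification (enum names are mine):

    // A timeout or connection loss means the request may have been applied
    // but the reply was lost, so the local part can be neither confirmed
    // nor rolled back.
    enum class ZkError { Ok, NodeExists, OperationTimeout, ConnectionLoss };
    enum class CommitOutcome { Committed, DefinitelyFailed, Unknown };

    CommitOutcome classify(ZkError code)
    {
        switch (code)
        {
            case ZkError::Ok:
                return CommitOutcome::Committed;
            case ZkError::OperationTimeout:   // request may have reached the server
            case ZkError::ConnectionLoss:     // reply was lost; same ambiguity
                return CommitOutcome::Unknown;
            default:                          // e.g. NodeExists: definitely rejected
                return CommitOutcome::DefinitelyFailed;
        }
    }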
- transaction.commit();
- storage.merge_selecting_event.set();
{
std::lock_guard<std::mutex> lock(storage.parts_mutex);
inserted_part->state = Part::State::Committed;
transaction.commit();
}
/// TODO: here we have no guarantee that there isn't any ephemeral part between the part that was
/// just inserted and the other parts. So we can schedule an incorrect merge.
/// Solution: store the max LSN seen by the part_set_updating_thread and don't schedule merges with
/// parts that are more recent than that LSN.
storage.merge_selecting_event->set();
PartLog::addNewPartToTheLog(storage.context, *part, watch.elapsed());
}

Storages/NextGenReplication/NextGenReplicatedBlockOutputStream.h

@@ -1,6 +1,7 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
+ #include <Storages/NextGenReplication/StorageNextGenReplicatedMergeTree.h>
#include <common/logger_useful.h>
@@ -22,6 +23,8 @@ public:
private:
StorageNextGenReplicatedMergeTree & storage;
Logger * log;
+ using Part = StorageNextGenReplicatedMergeTree::Part;
};
}

Storages/NextGenReplication/StorageNextGenReplicatedMergeTree.cpp

@@ -1,8 +1,10 @@
#include <Storages/NextGenReplication/StorageNextGenReplicatedMergeTree.h>
#include <Storages/NextGenReplication/NextGenReplicatedBlockOutputStream.h>
+ #include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <Common/Macros.h>
#include <Common/escapeForFileName.h>
namespace DB
{
@@ -43,9 +45,9 @@ StorageNextGenReplicatedMergeTree::StorageNextGenReplicatedMergeTree(
merging_params_, settings_, true, attach,
[this] (const String & /* part_name */) { /* TODO: enqueue part for check */; })
, reader(data), writer(data), merger(data, context.getBackgroundPool())
+ , parts_fetcher(data)
, zookeeper_path(context.getMacros().expand(zookeeper_path_))
, replica_name(context.getMacros().expand(replica_name_))
- , fetcher(data)
{
/// TODO: unify with StorageReplicatedMergeTree and move to common place
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
@@ -106,12 +108,37 @@ void StorageNextGenReplicatedMergeTree::startup()
if (is_readonly)
return;

- /// TODO: activate replica.
+ {
+ StoragePtr ptr = shared_from_this();
+ InterserverIOEndpointPtr endpoint = std::make_shared<DataPartsExchange::Service>(data, ptr);
+ parts_exchange_service = std::make_shared<InterserverIOEndpointHolder>(
+ endpoint->getId(replica_path), endpoint, context.getInterserverIOHandler());
+ }
- StoragePtr ptr = shared_from_this();
- InterserverIOEndpointPtr data_parts_exchange_endpoint = std::make_shared<DataPartsExchange::Service>(data, ptr);
- data_parts_exchange_endpoint_holder = std::make_shared<InterserverIOEndpointHolder>(
- data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint, context.getInterserverIOHandler());

+ {
+ auto host_port = context.getInterserverIOAddress();
+ /// How other replicas can find us.
+ ReplicatedMergeTreeAddress address;
+ address.host = host_port.first;
+ address.replication_port = host_port.second;
+ address.queries_port = context.getTCPPort();
+ address.database = database_name;
+ address.table = table_name;
+ auto zookeeper = getZooKeeper();
+ is_active_node = zkutil::EphemeralNodeHolder::create(
+ replica_path + "/is_active", *zookeeper, address.toString());
+ /// TODO: do something if the node already exists.
+ }

+ /// TODO initialize the part set from disk.
+ part_set_updating_thread = std::thread([this] { runPartSetUpdatingThread(); });
+ parts_producing_task = context.getBackgroundPool().addTask([this] { return runPartsProducingTask(); });
+ merge_selecting_thread = std::thread([this] { runMergeSelectingThread(); });
}
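
is_active is an ephemeral node holding the replica's interserver address, so its mere existence advertises a live replica, and ZooKeeper deletes it automatically if the session dies. zkutil::EphemeralNodeHolder wraps this in RAII; a minimal standalone sketch of the idea (illustrative, not the zkutil implementation):

    #include <functional>
    #include <string>
    #include <utility>

    // Create the node on construction, remove it on destruction, so the
    // marker disappears on clean shutdown; the server drops the ephemeral
    // node anyway if the session is lost.
    class EphemeralNode
    {
    public:
        EphemeralNode(std::string path,
                      const std::function<void(const std::string &)> & create,
                      std::function<void(const std::string &)> remove)
            : path_(std::move(path)), remove_(std::move(remove))
        {
            create(path_);
        }

        ~EphemeralNode() { remove_(path_); }

    private:
        std::string path_;
        std::function<void(const std::string &)> remove_;
    };

In shutdown() below, is_active_node.reset() triggers exactly this destructor-style cleanup.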
void StorageNextGenReplicatedMergeTree::createTableOrReplica(zkutil::ZooKeeper & zookeeper)
@@ -125,6 +152,7 @@ void StorageNextGenReplicatedMergeTree::createTableOrReplica(zkutil::ZooKeeper &
zookeeper.createAncestors(zookeeper_path);
/// TODO: save metadata.
/// TODO: split parts node by partitions.
zkutil::Ops ops;
ops.emplace_back(std::make_unique<zkutil::Op::Create>(
@@ -159,16 +187,38 @@ void StorageNextGenReplicatedMergeTree::createTableOrReplica(zkutil::ZooKeeper &
void StorageNextGenReplicatedMergeTree::shutdown()
{
/** This must be done before waiting for restarting_thread.
* Because restarting_thread will wait for finishing of tasks in background pool,
* and parts are fetched in those tasks.
*/
- fetcher.blocker.cancelForever();
+ shutdown_called = true;

- if (data_parts_exchange_endpoint_holder)
+ if (merge_selecting_thread.joinable())
{
- data_parts_exchange_endpoint_holder->cancelForever();
- data_parts_exchange_endpoint_holder = nullptr;
+ /// TODO: needs protection from concurrent drops.
+ merge_selecting_event->set();
+ merge_selecting_thread.join();
}
parts_fetcher.blocker.cancelForever();
merger.merges_blocker.cancelForever();
if (parts_producing_task)
{
context.getBackgroundPool().removeTask(parts_producing_task);
parts_producing_task.reset();
}
if (part_set_updating_thread.joinable())
{
/// TODO: needs protection from concurrent drops.
part_set_updating_event->set();
part_set_updating_thread.join();
}
is_active_node.reset();
if (parts_exchange_service)
{
parts_exchange_service->cancelForever();
parts_exchange_service.reset();
}
}
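
Shutdown follows a wake-then-join pattern: set the flag, signal the event so a thread blocked in wait() re-checks the flag, then join. Poco::Event latches the signal, so flag-then-set() suffices there; a condition_variable version needs the flag written under the mutex, as in this simplified standalone sketch (not ClickHouse code):

    #include <condition_variable>
    #include <mutex>
    #include <thread>

    struct Worker
    {
        bool shutdown_called = false;
        std::mutex mutex;
        std::condition_variable event;
        std::thread thread;

        void start()
        {
            thread = std::thread([this]
            {
                std::unique_lock<std::mutex> lock(mutex);
                while (!shutdown_called)   // mirrors the while (!shutdown_called) loops below
                    event.wait(lock);      // stands in for part_set_updating_event->wait()
            });
        }

        void shutdown()
        {
            {
                std::lock_guard<std::mutex> lock(mutex);
                shutdown_called = true;    // written under the mutex to avoid a missed wakeup
            }
            event.notify_all();
            if (thread.joinable())
                thread.join();
        }
    };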
@@ -233,4 +283,86 @@ zkutil::ZooKeeperPtr StorageNextGenReplicatedMergeTree::getZooKeeper()
return res;
}
void StorageNextGenReplicatedMergeTree::runPartSetUpdatingThread()
{
while (!shutdown_called)
{
try
{
auto zookeeper = getZooKeeper();
Strings part_nodes = zookeeper->getChildren(zookeeper_path + "/parts", nullptr, part_set_updating_event);
bool have_new_parts = false;
{
std::lock_guard<std::mutex> lock(parts_mutex);
/// TODO maybe not the best idea to lock the mutex for so long.
for (auto it = parts.begin(); it != parts.end(); )
{
if (it->second.state == Part::State::Ephemeral)
it = parts.erase(it);
else
++it;
}
for (const String & part_name : part_nodes)
{
if (startsWith(part_name, "insert_"))
{
/// TODO extract ephemeral part name creation/parsing code.
const char * begin = part_name.data() + strlen("insert_");
const char * end = part_name.data() + part_name.length();
String partition_id;
for (; begin != end && *begin != '_'; ++begin)
partition_id.push_back(*begin);
if (begin == end)
throw Exception("Bad ephemeral insert node name: " + part_name, ErrorCodes::LOGICAL_ERROR);
++begin;
UInt64 block_num = parse<UInt64>(begin, end - begin);
MergeTreePartInfo part_info(partition_id, block_num, block_num, 0);
parts.emplace(part_info, Part{part_info, Part::State::Ephemeral});
}
else
{
auto part_info = MergeTreePartInfo::fromPartName(part_name, data.format_version);
auto insertion = parts.emplace(part_info, Part{part_info, Part::State::Virtual});
if (insertion.second)
{
LOG_TRACE(log, "New part: " << part_name);
have_new_parts = true;
}
}
}
}
if (have_new_parts)
parts_producing_task->wake();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
part_set_updating_event->tryWait(1000);
}
part_set_updating_event->wait();
}
}
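
The name parsing in the loop above splits "insert_<partition_id>_<seqnum>" at the first '_' after the prefix (like the original, it assumes the partition ID itself contains no underscore). The same logic as a standalone function, easier to test in isolation:

    #include <cstdint>
    #include <stdexcept>
    #include <string>

    struct EphemeralInsert { std::string partition_id; uint64_t block_num; };

    // "insert_201712_0000000042" -> { partition_id = "201712", block_num = 42 }
    EphemeralInsert parseEphemeralInsertName(const std::string & name)
    {
        const std::string prefix = "insert_";
        size_t sep = name.find('_', prefix.size());
        if (name.compare(0, prefix.size(), prefix) != 0 || sep == std::string::npos)
            throw std::runtime_error("Bad ephemeral insert node name: " + name);
        return {name.substr(prefix.size(), sep - prefix.size()),
                std::stoull(name.substr(sep + 1))};
    }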
bool StorageNextGenReplicatedMergeTree::runPartsProducingTask()
{
return false;
}
void StorageNextGenReplicatedMergeTree::runMergeSelectingThread()
{
while (!shutdown_called)
{
merge_selecting_event->tryWait(1000);
}
}
}

Storages/NextGenReplication/StorageNextGenReplicatedMergeTree.h

@@ -85,6 +85,9 @@ private:
MergeTreeDataWriter writer;
MergeTreeDataMerger merger;
+ InterserverIOEndpointHolderPtr parts_exchange_service;
+ DataPartsExchange::Fetcher parts_fetcher;
String zookeeper_path;
String replica_name;
String replica_path;
@@ -94,21 +97,45 @@ private:
zkutil::ZooKeeperPtr tryGetZooKeeper();
zkutil::ZooKeeperPtr getZooKeeper();
- InterserverIOEndpointHolderPtr data_parts_exchange_endpoint_holder;
- DataPartsExchange::Fetcher fetcher;
+ std::atomic<bool> is_readonly {false};
+ std::atomic<bool> shutdown_called {false};
- std::atomic_bool is_readonly {false};
zkutil::EphemeralNodeHolderPtr is_active_node;
- /// A thread that keeps track of the updates in the part set.
struct Part
{
enum class State
{
Ephemeral,
Virtual,
Preparing,
Prepared,
MaybeCommitted,
Committed,
Outdated,
Deleting,
};
MergeTreePartInfo info;
State state;
};
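
These states sketch the lifecycle of a part entry. In this commit only the insert path drives transitions; a hypothetical check capturing my reading of it (not code from the commit): Ephemeral on registration, Prepared once renameTempPartAndAdd succeeds, then Committed, MaybeCommitted or Outdated depending on the multi() outcome, while Virtual marks parts first discovered in ZooKeeper. Preparing and Deleting are not yet driven by anything here.

    enum class State { Ephemeral, Virtual, Preparing, Prepared,
                       MaybeCommitted, Committed, Outdated, Deleting };

    // Transitions exercised by the insert path of this commit.
    bool validInsertTransition(State from, State to)
    {
        switch (from)
        {
            case State::Ephemeral:
                return to == State::Prepared;
            case State::Prepared:
                return to == State::Committed
                    || to == State::MaybeCommitted
                    || to == State::Outdated;
            default:
                return false;   // remaining transitions appear in later commits
        }
    }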
mutable std::mutex parts_mutex;
std::map<MergeTreePartInfo, Part> parts;
+ /// A thread that updates the part set based on ZooKeeper notifications.
std::thread part_set_updating_thread;
- Poco::Event part_set_updating_event;
+ zkutil::EventPtr part_set_updating_event = std::make_shared<Poco::Event>();
void runPartSetUpdatingThread();
/// A task that performs actions to get needed parts.
- BackgroundProcessingPool::TaskHandle task_execution_thread;
+ BackgroundProcessingPool::TaskHandle parts_producing_task;
+ bool runPartsProducingTask();
/// A thread that selects parts to merge.
std::thread merge_selecting_thread;
- Poco::Event merge_selecting_event;
+ zkutil::EventPtr merge_selecting_event = std::make_shared<Poco::Event>();
void runMergeSelectingThread();
friend class NextGenReplicatedBlockOutputStream;
};
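
Both events change from plain Poco::Event members to zkutil::EventPtr, a shared_ptr. Presumably this is because the events are now passed to ZooKeeper calls as watch targets (see getChildren in runPartSetUpdatingThread), and a watch can fire after the storage object is gone, so the pending watch must co-own the event. A minimal sketch of the ownership pattern (illustrative, not zkutil's code):

    #include <functional>
    #include <memory>

    struct Event
    {
        void set() { /* wake waiters; elided */ }
    };
    using EventPtr = std::shared_ptr<Event>;

    // The callback copies the shared_ptr, so the Event stays alive until
    // the watch fires, even if its original owner was destroyed first.
    std::function<void()> makeWatchCallback(const EventPtr & event)
    {
        return [event] { event->set(); };
    }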