mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 16:50:48 +00:00
ShardPartitionPiece task added
This commit is contained in:
parent
b481682a1e
commit
22789a3b55
@ -1,4 +1,4 @@
|
||||
set(CLICKHOUSE_COPIER_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/ClusterCopier.cpp)
|
||||
set(CLICKHOUSE_COPIER_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/ClusterCopierApp.cpp)
|
||||
set(CLICKHOUSE_COPIER_LINK PRIVATE clickhouse_common_zookeeper clickhouse_parsers clickhouse_functions clickhouse_table_functions clickhouse_aggregate_functions clickhouse_dictionaries string_utils ${Poco_XML_LIBRARY} PUBLIC daemon)
|
||||
set(CLICKHOUSE_COPIER_INCLUDE SYSTEM PRIVATE ${PCG_RANDOM_INCLUDE_DIR})
|
||||
|
||||
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
173
dbms/programs/copier/ClusterCopierApp.cpp
Normal file
173
dbms/programs/copier/ClusterCopierApp.cpp
Normal file
@ -0,0 +1,173 @@
|
||||
#include "ClusterCopierApp.h"
|
||||
#include "ClusterCopier.h"
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// ClusterCopierApp
|
||||
|
||||
void ClusterCopierApp::initialize(Poco::Util::Application & self)
|
||||
{
|
||||
is_help = config().has("help");
|
||||
if (is_help)
|
||||
return;
|
||||
|
||||
config_xml_path = config().getString("config-file");
|
||||
task_path = config().getString("task-path");
|
||||
log_level = config().getString("log-level", "trace");
|
||||
is_safe_mode = config().has("safe-mode");
|
||||
if (config().has("copy-fault-probability"))
|
||||
copy_fault_probability = std::max(std::min(config().getDouble("copy-fault-probability"), 1.0), 0.0);
|
||||
base_dir = (config().has("base-dir")) ? config().getString("base-dir") : Poco::Path::current();
|
||||
// process_id is '<hostname>#<start_timestamp>_<pid>'
|
||||
time_t timestamp = Poco::Timestamp().epochTime();
|
||||
auto curr_pid = Poco::Process::id();
|
||||
|
||||
process_id = std::to_string(DateLUT::instance().toNumYYYYMMDDhhmmss(timestamp)) + "_" + std::to_string(curr_pid);
|
||||
host_id = escapeForFileName(getFQDNOrHostName()) + '#' + process_id;
|
||||
process_path = Poco::Path(base_dir + "/clickhouse-copier_" + process_id).absolute().toString();
|
||||
Poco::File(process_path).createDirectories();
|
||||
|
||||
/// Override variables for BaseDaemon
|
||||
if (config().has("log-level"))
|
||||
config().setString("logger.level", config().getString("log-level"));
|
||||
|
||||
if (config().has("base-dir") || !config().has("logger.log"))
|
||||
config().setString("logger.log", process_path + "/log.log");
|
||||
|
||||
if (config().has("base-dir") || !config().has("logger.errorlog"))
|
||||
config().setString("logger.errorlog", process_path + "/log.err.log");
|
||||
|
||||
Base::initialize(self);
|
||||
}
|
||||
|
||||
|
||||
void ClusterCopierApp::handleHelp(const std::string &, const std::string &)
|
||||
{
|
||||
Poco::Util::HelpFormatter helpFormatter(options());
|
||||
helpFormatter.setCommand(commandName());
|
||||
helpFormatter.setHeader("Copies tables from one cluster to another");
|
||||
helpFormatter.setUsage("--config-file <config-file> --task-path <task-path>");
|
||||
helpFormatter.format(std::cerr);
|
||||
|
||||
stopOptionsProcessing();
|
||||
}
|
||||
|
||||
|
||||
void ClusterCopierApp::defineOptions(Poco::Util::OptionSet & options)
|
||||
{
|
||||
Base::defineOptions(options);
|
||||
|
||||
options.addOption(Poco::Util::Option("task-path", "", "path to task in ZooKeeper")
|
||||
.argument("task-path").binding("task-path"));
|
||||
options.addOption(Poco::Util::Option("task-file", "", "path to task file for uploading in ZooKeeper to task-path")
|
||||
.argument("task-file").binding("task-file"));
|
||||
options.addOption(Poco::Util::Option("task-upload-force", "", "Force upload task-file even node already exists")
|
||||
.argument("task-upload-force").binding("task-upload-force"));
|
||||
options.addOption(Poco::Util::Option("safe-mode", "", "disables ALTER DROP PARTITION in case of errors")
|
||||
.binding("safe-mode"));
|
||||
options.addOption(Poco::Util::Option("copy-fault-probability", "", "the copying fails with specified probability (used to test partition state recovering)")
|
||||
.argument("copy-fault-probability").binding("copy-fault-probability"));
|
||||
options.addOption(Poco::Util::Option("log-level", "", "sets log level")
|
||||
.argument("log-level").binding("log-level"));
|
||||
options.addOption(Poco::Util::Option("base-dir", "", "base directory for copiers, consecutive copier launches will populate /base-dir/launch_id/* directories")
|
||||
.argument("base-dir").binding("base-dir"));
|
||||
|
||||
using Me = std::decay_t<decltype(*this)>;
|
||||
options.addOption(Poco::Util::Option("help", "", "produce this help message").binding("help")
|
||||
.callback(Poco::Util::OptionCallback<Me>(this, &Me::handleHelp)));
|
||||
}
|
||||
|
||||
|
||||
void ClusterCopierApp::mainImpl()
|
||||
{
|
||||
StatusFile status_file(process_path + "/status");
|
||||
ThreadStatus thread_status;
|
||||
|
||||
auto log = &logger();
|
||||
LOG_INFO(log, "Starting clickhouse-copier ("
|
||||
<< "id " << process_id << ", "
|
||||
<< "host_id " << host_id << ", "
|
||||
<< "path " << process_path << ", "
|
||||
<< "revision " << ClickHouseRevision::get() << ")");
|
||||
|
||||
auto context = std::make_unique<Context>(Context::createGlobal());
|
||||
context->makeGlobalContext();
|
||||
SCOPE_EXIT(context->shutdown());
|
||||
|
||||
context->setConfig(loaded_config.configuration);
|
||||
context->setApplicationType(Context::ApplicationType::LOCAL);
|
||||
context->setPath(process_path);
|
||||
|
||||
registerFunctions();
|
||||
registerAggregateFunctions();
|
||||
registerTableFunctions();
|
||||
registerStorages();
|
||||
registerDictionaries();
|
||||
registerDisks();
|
||||
|
||||
static const std::string default_database = "_local";
|
||||
context->addDatabase(default_database, std::make_shared<DatabaseMemory>(default_database));
|
||||
context->setCurrentDatabase(default_database);
|
||||
|
||||
/// Initialize query scope just in case.
|
||||
CurrentThread::QueryScope query_scope(*context);
|
||||
|
||||
auto copier = std::make_unique<ClusterCopier>(task_path, host_id, default_database, *context);
|
||||
copier->setSafeMode(is_safe_mode);
|
||||
copier->setCopyFaultProbability(copy_fault_probability);
|
||||
|
||||
auto task_file = config().getString("task-file", "");
|
||||
if (!task_file.empty())
|
||||
copier->uploadTaskDescription(task_path, task_file, config().getBool("task-upload-force", false));
|
||||
|
||||
copier->init();
|
||||
copier->process(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(context->getSettingsRef()));
|
||||
|
||||
/// Reset ZooKeeper before removing ClusterCopier.
|
||||
/// Otherwise zookeeper watch can call callback which use already removed ClusterCopier object.
|
||||
context->resetZooKeeper();
|
||||
}
|
||||
|
||||
|
||||
int ClusterCopierApp::main(const std::vector<std::string> &)
|
||||
{
|
||||
if (is_help)
|
||||
return 0;
|
||||
|
||||
try
|
||||
{
|
||||
mainImpl();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(&Poco::Logger::root(), __PRETTY_FUNCTION__);
|
||||
auto code = getCurrentExceptionCode();
|
||||
|
||||
return (code) ? code : -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
#pragma GCC diagnostic ignored "-Wunused-function"
|
||||
#pragma GCC diagnostic ignored "-Wmissing-declarations"
|
||||
|
||||
int mainEntryClickHouseClusterCopier(int argc, char ** argv)
|
||||
{
|
||||
try
|
||||
{
|
||||
DB::ClusterCopierApp app;
|
||||
return app.run(argc, argv);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
|
||||
auto code = DB::getCurrentExceptionCode();
|
||||
|
||||
return (code) ? code : -1;
|
||||
}
|
||||
}
|
87
dbms/programs/copier/ClusterCopierApp.h
Normal file
87
dbms/programs/copier/ClusterCopierApp.h
Normal file
@ -0,0 +1,87 @@
|
||||
#pragma once
|
||||
#include <Poco/Util/ServerApplication.h>
|
||||
#include <daemon/BaseDaemon.h>
|
||||
|
||||
/* clickhouse cluster copier util
|
||||
* Copies tables data from one cluster to new tables of other (possibly the same) cluster in distributed fault-tolerant manner.
|
||||
*
|
||||
* See overview in the docs: docs/en/utils/clickhouse-copier.md
|
||||
*
|
||||
* Implementation details:
|
||||
*
|
||||
* cluster-copier workers pull each partition of each shard of the source cluster and push it to the destination cluster through
|
||||
* Distributed table (to preform data resharding). So, worker job is a partition of a source shard.
|
||||
* A job has three states: Active, Finished and Abandoned. Abandoned means that worker died and did not finish the job.
|
||||
*
|
||||
* If an error occurred during the copying (a worker failed or a worker did not finish the INSERT), then the whole partition (on
|
||||
* all destination servers) should be dropped and refilled. So, copying entity is a partition of all destination shards.
|
||||
* If a failure is detected a special /is_dirty node is created in ZooKeeper signalling that other workers copying the same partition
|
||||
* should stop, after a refilling procedure should start.
|
||||
*
|
||||
* ZooKeeper task node has the following structure:
|
||||
* /task/path_root - path passed in --task-path parameter
|
||||
* /description - contains user-defined XML config of the task
|
||||
* /task_active_workers - contains ephemeral nodes of all currently active workers, used to implement max_workers limitation
|
||||
* /server_fqdn#PID_timestamp - cluster-copier worker ID
|
||||
* ...
|
||||
* /tables - directory with table tasks
|
||||
* /cluster.db.table1 - directory of table_hits task
|
||||
* /partition1 - directory for partition1
|
||||
* /shards - directory for source cluster shards
|
||||
* /1 - worker job for the first shard of partition1 of table test.hits
|
||||
* Contains info about current status (Active or Finished) and worker ID.
|
||||
* /2
|
||||
* ...
|
||||
* /partition_active_workers
|
||||
* /1 - for each job in /shards a corresponding ephemeral node created in /partition_active_workers
|
||||
* It is used to detect Abandoned jobs (if there is Active node in /shards and there is no node in
|
||||
* /partition_active_workers).
|
||||
* Also, it is used to track active workers in the partition (when we need to refill the partition we do
|
||||
* not DROP PARTITION while there are active workers)
|
||||
* /2
|
||||
* ...
|
||||
* /is_dirty - the node is set if some worker detected that an error occurred (the INSERT is failed or an Abandoned node is
|
||||
* detected). If the node appeared workers in this partition should stop and start cleaning and refilling
|
||||
* partition procedure.
|
||||
* During this procedure a single 'cleaner' worker is selected. The worker waits for stopping all partition
|
||||
* workers, removes /shards node, executes DROP PARTITION on each destination node and removes /is_dirty node.
|
||||
* /cleaner- An ephemeral node used to select 'cleaner' worker. Contains ID of the worker.
|
||||
* /cluster.db.table2
|
||||
* ...
|
||||
*/
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ClusterCopierApp : public BaseDaemon
|
||||
{
|
||||
public:
|
||||
|
||||
void initialize(Poco::Util::Application & self) override;
|
||||
|
||||
void handleHelp(const std::string &, const std::string &);
|
||||
|
||||
void defineOptions(Poco::Util::OptionSet & options) override;
|
||||
|
||||
int main(const std::vector<std::string> &) override;
|
||||
|
||||
private:
|
||||
|
||||
using Base = BaseDaemon;
|
||||
|
||||
void mainImpl();
|
||||
|
||||
std::string config_xml_path;
|
||||
std::string task_path;
|
||||
std::string log_level = "trace";
|
||||
bool is_safe_mode = false;
|
||||
double copy_fault_probability = 0;
|
||||
bool is_help = false;
|
||||
|
||||
std::string base_dir;
|
||||
std::string process_path;
|
||||
std::string process_id;
|
||||
std::string host_id;
|
||||
};
|
||||
|
||||
}
|
@ -148,12 +148,14 @@ struct TaskStateWithOwner
|
||||
|
||||
|
||||
/// Hierarchical description of the tasks
|
||||
struct ShardPartitionPiece;
|
||||
struct ShardPartition;
|
||||
struct TaskShard;
|
||||
struct TaskTable;
|
||||
struct TaskCluster;
|
||||
struct ClusterPartition;
|
||||
|
||||
using PartitionPieces = std::vector<ShardPartitionPiece>;
|
||||
using TasksPartition = std::map<String, ShardPartition, std::greater<>>;
|
||||
using ShardInfo = Cluster::ShardInfo;
|
||||
using TaskShardPtr = std::shared_ptr<TaskShard>;
|
||||
@ -162,22 +164,59 @@ using TasksTable = std::list<TaskTable>;
|
||||
using ClusterPartitions = std::map<String, ClusterPartition, std::greater<>>;
|
||||
|
||||
|
||||
struct ShardPartitionPiece
|
||||
{
|
||||
ShardPartitionPiece(ShardPartition & parent, size_t current_piece_number_, bool is_absent_piece_)
|
||||
: is_absent_piece(is_absent_piece_)
|
||||
, current_piece_number(current_piece_number_)
|
||||
, shard_partition(parent) {}
|
||||
|
||||
[[maybe_unused]] String getPartitionPiecePath() const {return "Not implemented.";}
|
||||
[[maybe_unused]] String getPartitionPieceCleanStartPath() const {return "Not implemented.";}
|
||||
[[maybe_unused]] String getCommonPartitionPieceIsDirtyPath() const {return "Not implemented.";}
|
||||
[[maybe_unused]] String getCommonPartitionPieceIsCleanedPath() const {return "Not implemented.";}
|
||||
|
||||
[[maybe_unused]] String getPartitionPieceActiveWorkersPath() const {return "Not implemented.";}
|
||||
[[maybe_unused]] String getActiveWorkerPath() const {return "Not implemented.";}
|
||||
|
||||
/// On what shards do we have current partition.
|
||||
[[maybe_unused]] String getPartitionPieceShardsPath() const {return "Not implemented.";}
|
||||
[[maybe_unused]] String getShardStatusPath() const {return "Not implemented.";}
|
||||
|
||||
bool is_absent_piece;
|
||||
const size_t current_piece_number;
|
||||
|
||||
ShardPartition & shard_partition;
|
||||
};
|
||||
|
||||
|
||||
/// Just destination partition of a shard
|
||||
/// I don't know what this comment means.
|
||||
/// In short, when we discovered what shards contain currently processing partition,
|
||||
/// This class describes a partition (name) that is stored on the shard (parent).
|
||||
struct ShardPartition
|
||||
{
|
||||
ShardPartition(TaskShard & parent, String name_quoted_) : task_shard(parent), name(std::move(name_quoted_)) {}
|
||||
ShardPartition(TaskShard & parent, String name_quoted_, size_t number_of_splits = 10)
|
||||
: task_shard(parent)
|
||||
, name(std::move(name_quoted_))
|
||||
{ pieces.reserve(number_of_splits); }
|
||||
|
||||
String getPartitionPath() const;
|
||||
String getPartitionCleanStartPath() const;
|
||||
String getCommonPartitionIsDirtyPath() const;
|
||||
String getCommonPartitionIsCleanedPath() const;
|
||||
String getPartitionActiveWorkersPath() const;
|
||||
String getActiveWorkerPath() const;
|
||||
String getPartitionShardsPath() const;
|
||||
String getShardStatusPath() const;
|
||||
/*useful*/ String getPartitionPath() const;
|
||||
[[maybe_unused]] String getPartitionPiecePath(size_t current_piece_number) const;
|
||||
/*useful*/ String getPartitionCleanStartPath() const;
|
||||
[[maybe_unused]] String getPartitionPieceCleanStartPath(size_t current_piece_number) const;
|
||||
/*useful*/ String getCommonPartitionIsDirtyPath() const;
|
||||
/*useful*/ String getCommonPartitionIsCleanedPath() const;
|
||||
/*??????*/ String getPartitionActiveWorkersPath() const;
|
||||
/*??????*/ String getActiveWorkerPath() const;
|
||||
/*useful*/ String getPartitionShardsPath() const;
|
||||
/*useful*/ String getShardStatusPath() const;
|
||||
|
||||
/// What partition pieces are present in current shard.
|
||||
/// FYI: Piece is a part of partition which has modulo equals to concrete constant (less than number_of_splits obliously)
|
||||
/// For example SELECT ... from ... WHERE partition=current_partition AND cityHash64(*) == const;
|
||||
/// Absent pieces have field is_absent_piece equals to true.
|
||||
PartitionPieces pieces;
|
||||
|
||||
TaskShard & task_shard;
|
||||
String name;
|
||||
@ -255,7 +294,7 @@ struct TaskTable
|
||||
TaskCluster & task_cluster;
|
||||
|
||||
String getPartitionPath(const String & partition_name) const;
|
||||
[[maybe_unused]] String getPartitionPathWithPieceNumber(const String & partition_name, size_t current_piece_number) const;
|
||||
[[maybe_unused]] String getPartitionPiecePath(const String & partition_name, size_t current_piece_number) const;
|
||||
String getPartitionIsDirtyPath(const String & partition_name) const;
|
||||
String getPartitionIsCleanedPath(const String & partition_name) const;
|
||||
String getPartitionTaskStatusPath(const String & partition_name) const;
|
||||
@ -422,8 +461,9 @@ String TaskTable::getPartitionPath(const String & partition_name) const
|
||||
+ "/" + escapeForFileName(partition_name); // 201701
|
||||
}
|
||||
|
||||
String TaskTable::getPartitionPathWithPieceNumber(const String & partition_name, size_t current_piece_number) const
|
||||
String TaskTable::getPartitionPiecePath(const String & partition_name, size_t current_piece_number) const
|
||||
{
|
||||
assert(current_piece_number < number_of_splits);
|
||||
return getPartitionPath(partition_name) + "/" + std::to_string(current_piece_number); // 1...number_of_splits
|
||||
}
|
||||
|
||||
@ -432,11 +472,23 @@ String ShardPartition::getPartitionCleanStartPath() const
|
||||
return getPartitionPath() + "/clean_start";
|
||||
}
|
||||
|
||||
String ShardPartition::getPartitionPieceCleanStartPath(size_t current_piece_number) const
|
||||
{
|
||||
assert(current_piece_number < task_shard.task_table.number_of_splits);
|
||||
return getPartitionPiecePath(current_piece_number) + "/clean_start";
|
||||
}
|
||||
|
||||
String ShardPartition::getPartitionPath() const
|
||||
{
|
||||
return task_shard.task_table.getPartitionPath(name);
|
||||
}
|
||||
|
||||
String ShardPartition::getPartitionPiecePath(size_t current_piece_number) const
|
||||
{
|
||||
assert(current_piece_number < task_shard.task_table.number_of_splits);
|
||||
return task_shard.task_table.getPartitionPiecePath(name, current_piece_number);
|
||||
}
|
||||
|
||||
String ShardPartition::getShardStatusPath() const
|
||||
{
|
||||
// schema: /<root...>/tables/<table>/<partition>/shards/<shard>
|
||||
|
224
dbms/programs/copier/ZookeeperStaff.h
Normal file
224
dbms/programs/copier/ZookeeperStaff.h
Normal file
@ -0,0 +1,224 @@
|
||||
#pragma once
|
||||
|
||||
/** Allows to compare two incremental counters of type UInt32 in presence of possible overflow.
|
||||
* We assume that we compare values that are not too far away.
|
||||
* For example, when we increment 0xFFFFFFFF, we get 0. So, 0xFFFFFFFF is less than 0.
|
||||
*/
|
||||
class WrappingUInt32
|
||||
{
|
||||
public:
|
||||
UInt32 value;
|
||||
|
||||
explicit WrappingUInt32(UInt32 _value)
|
||||
: value(_value)
|
||||
{}
|
||||
|
||||
bool operator<(const WrappingUInt32 & other) const
|
||||
{
|
||||
return value != other.value && *this <= other;
|
||||
}
|
||||
|
||||
bool operator<=(const WrappingUInt32 & other) const
|
||||
{
|
||||
const UInt32 HALF = 1 << 31;
|
||||
return (value <= other.value && other.value - value < HALF)
|
||||
|| (value > other.value && value - other.value > HALF);
|
||||
}
|
||||
|
||||
bool operator==(const WrappingUInt32 & other) const
|
||||
{
|
||||
return value == other.value;
|
||||
}
|
||||
};
|
||||
|
||||
/** Conforming Zxid definition.
|
||||
* cf. https://github.com/apache/zookeeper/blob/631d1b284f0edb1c4f6b0fb221bf2428aec71aaa/zookeeper-docs/src/main/resources/markdown/zookeeperInternals.md#guarantees-properties-and-definitions
|
||||
*
|
||||
* But it is better to read this: https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html
|
||||
*
|
||||
* Actually here is the definition of Zxid.
|
||||
* Every change to the ZooKeeper state receives a stamp in the form of a zxid (ZooKeeper Transaction Id).
|
||||
* This exposes the total ordering of all changes to ZooKeeper. Each change will have a unique zxid
|
||||
* and if zxid1 is smaller than zxid2 then zxid1 happened before zxid2.
|
||||
*/
|
||||
class Zxid
|
||||
{
|
||||
public:
|
||||
WrappingUInt32 epoch;
|
||||
WrappingUInt32 counter;
|
||||
explicit Zxid(UInt64 _zxid)
|
||||
: epoch(_zxid >> 32)
|
||||
, counter(_zxid)
|
||||
{}
|
||||
|
||||
bool operator<=(const Zxid & other) const
|
||||
{
|
||||
return (epoch < other.epoch)
|
||||
|| (epoch == other.epoch && counter <= other.counter);
|
||||
}
|
||||
|
||||
bool operator==(const Zxid & other) const
|
||||
{
|
||||
return epoch == other.epoch && counter == other.counter;
|
||||
}
|
||||
};
|
||||
|
||||
/* When multiple ClusterCopiers discover that the target partition is not empty,
|
||||
* they will attempt to clean up this partition before proceeding to copying.
|
||||
*
|
||||
* Instead of purging is_dirty, the history of cleaning work is preserved and partition hygiene is established
|
||||
* based on a happens-before relation between the events.
|
||||
* This relation is encoded by LogicalClock based on the mzxid of the is_dirty ZNode and is_dirty/cleaned.
|
||||
* The fact of the partition hygiene is encoded by CleanStateClock.
|
||||
*
|
||||
* For you to know what mzxid means:
|
||||
*
|
||||
* ZooKeeper Stat Structure:
|
||||
* The Stat structure for each znode in ZooKeeper is made up of the following fields:
|
||||
*
|
||||
* -- czxid
|
||||
* The zxid of the change that caused this znode to be created.
|
||||
*
|
||||
* -- mzxid
|
||||
* The zxid of the change that last modified this znode.
|
||||
*
|
||||
* -- ctime
|
||||
* The time in milliseconds from epoch when this znode was created.
|
||||
*
|
||||
* -- mtime
|
||||
* The time in milliseconds from epoch when this znode was last modified.
|
||||
*
|
||||
* -- version
|
||||
* The number of changes to the data of this znode.
|
||||
*
|
||||
* -- cversion
|
||||
* The number of changes to the children of this znode.
|
||||
*
|
||||
* -- aversion
|
||||
* The number of changes to the ACL of this znode.
|
||||
*
|
||||
* -- ephemeralOwner
|
||||
* The session id of the owner of this znode if the znode is an ephemeral node.
|
||||
* If it is not an ephemeral node, it will be zero.
|
||||
*
|
||||
* -- dataLength
|
||||
* The length of the data field of this znode.
|
||||
*
|
||||
* -- numChildren
|
||||
* The number of children of this znode.
|
||||
* */
|
||||
|
||||
class LogicalClock
|
||||
{
|
||||
public:
|
||||
std::optional<Zxid> zxid;
|
||||
|
||||
LogicalClock() = default;
|
||||
|
||||
explicit LogicalClock(UInt64 _zxid)
|
||||
: zxid(_zxid)
|
||||
{}
|
||||
|
||||
bool hasHappened() const
|
||||
{
|
||||
return bool(zxid);
|
||||
}
|
||||
|
||||
/// happens-before relation with a reasonable time bound
|
||||
bool happensBefore(const LogicalClock & other) const
|
||||
{
|
||||
return !zxid
|
||||
|| (other.zxid && *zxid <= *other.zxid);
|
||||
}
|
||||
|
||||
bool operator<=(const LogicalClock & other) const
|
||||
{
|
||||
return happensBefore(other);
|
||||
}
|
||||
|
||||
/// strict equality check
|
||||
bool operator==(const LogicalClock & other) const
|
||||
{
|
||||
return zxid == other.zxid;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
class CleanStateClock
|
||||
{
|
||||
public:
|
||||
LogicalClock discovery_zxid;
|
||||
std::optional<UInt32> discovery_version;
|
||||
|
||||
LogicalClock clean_state_zxid;
|
||||
std::optional<UInt32> clean_state_version;
|
||||
|
||||
std::shared_ptr<std::atomic_bool> stale;
|
||||
|
||||
bool is_clean() const
|
||||
{
|
||||
return
|
||||
!is_stale()
|
||||
&& (
|
||||
!discovery_zxid.hasHappened()
|
||||
|| (clean_state_zxid.hasHappened() && discovery_zxid <= clean_state_zxid));
|
||||
}
|
||||
|
||||
bool is_stale() const
|
||||
{
|
||||
return stale->load();
|
||||
}
|
||||
|
||||
CleanStateClock(
|
||||
const zkutil::ZooKeeperPtr & zookeeper,
|
||||
const String & discovery_path,
|
||||
const String & clean_state_path)
|
||||
: stale(std::make_shared<std::atomic_bool>(false))
|
||||
{
|
||||
Coordination::Stat stat{};
|
||||
String _some_data;
|
||||
auto watch_callback =
|
||||
[stale = stale] (const Coordination::WatchResponse & rsp)
|
||||
{
|
||||
auto logger = &Poco::Logger::get("ClusterCopier");
|
||||
if (rsp.error == Coordination::ZOK)
|
||||
{
|
||||
switch (rsp.type)
|
||||
{
|
||||
case Coordination::CREATED:
|
||||
LOG_DEBUG(logger, "CleanStateClock change: CREATED, at " << rsp.path);
|
||||
stale->store(true);
|
||||
break;
|
||||
case Coordination::CHANGED:
|
||||
LOG_DEBUG(logger, "CleanStateClock change: CHANGED, at" << rsp.path);
|
||||
stale->store(true);
|
||||
}
|
||||
}
|
||||
};
|
||||
if (zookeeper->tryGetWatch(discovery_path, _some_data, &stat, watch_callback))
|
||||
{
|
||||
discovery_zxid = LogicalClock(stat.mzxid);
|
||||
discovery_version = stat.version;
|
||||
}
|
||||
if (zookeeper->tryGetWatch(clean_state_path, _some_data, &stat, watch_callback))
|
||||
{
|
||||
clean_state_zxid = LogicalClock(stat.mzxid);
|
||||
clean_state_version = stat.version;
|
||||
}
|
||||
}
|
||||
|
||||
bool operator==(const CleanStateClock & other) const
|
||||
{
|
||||
return !is_stale()
|
||||
&& !other.is_stale()
|
||||
&& discovery_zxid == other.discovery_zxid
|
||||
&& discovery_version == other.discovery_version
|
||||
&& clean_state_zxid == other.clean_state_zxid
|
||||
&& clean_state_version == other.clean_state_version;
|
||||
}
|
||||
|
||||
bool operator!=(const CleanStateClock & other) const
|
||||
{
|
||||
return !(*this == other);
|
||||
}
|
||||
};
|
Loading…
Reference in New Issue
Block a user