This commit is contained in:
Alexander Tokmakov 2024-02-16 14:05:22 +01:00
parent 22c97be2a4
commit 228a29b93f
15 changed files with 46 additions and 53 deletions

View File

@ -669,11 +669,12 @@ Coordination::Error ZooKeeper::multiImpl(const Coordination::Requests & requests
if (check_session_valid)
{
if (code != Coordination::Error::ZOK && getFailedOpIndex(code, responses) == requests.size())
if (code != Coordination::Error::ZOK && !Coordination::isHardwareError(code) && getFailedOpIndex(code, responses) == requests.size())
{
impl->finalize(fmt::format("Session was killed: {}", requests.back()->getPath()));
}
responses.pop_back();
chassert(code == Coordination::Error::ZOK || responses.back()->error != Coordination::Error::ZOK);
}
return code;
@ -956,17 +957,6 @@ Coordination::ReconfigResponse ZooKeeper::reconfig(
return future_result.get();
}
ZooKeeperPtr ZooKeeper::createWithoutKillingPreviousSessions(const ZooKeeperArgs & args_)
{
return std::shared_ptr<ZooKeeper>(new ZooKeeper(args_));
}
ZooKeeperPtr ZooKeeper::createWithoutKillingPreviousSessions(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
{
return std::shared_ptr<ZooKeeper>(new ZooKeeper(config, config_name, /* zk_log */ nullptr));
}
ZooKeeperPtr ZooKeeper::create(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
{
auto res = std::shared_ptr<ZooKeeper>(new ZooKeeper(config, config_name, zk_log_));

View File

@ -222,7 +222,7 @@ class ZooKeeper
<identity>user:password</identity>
</zookeeper>
*/
ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr<DB::ZooKeeperLog> zk_log_);
ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr<DB::ZooKeeperLog> zk_log_ = nullptr);
/// See addCheckSessionOp
void initSession();
@ -237,10 +237,11 @@ public:
const std::string & config_name,
std::shared_ptr<DB::ZooKeeperLog> zk_log_);
static Ptr createWithoutKillingPreviousSessions(const Poco::Util::AbstractConfiguration & config,
const std::string & config_name);
static Ptr createWithoutKillingPreviousSessions(const ZooKeeperArgs & args_);
template <typename... Args>
static Ptr createWithoutKillingPreviousSessions(Args &&... args)
{
return std::shared_ptr<ZooKeeper>(new ZooKeeper(std::forward<Args>(args)...));
}
/// Creates a new session with the same parameters. This method can be used for reconnecting
/// after the session has expired.

View File

@ -5,9 +5,9 @@
int main(int argc, char ** argv)
try
{
zkutil::ZooKeeper zookeeper{zkutil::ZooKeeperArgs("localhost:2181")};
auto zookeeper = zkutil::ZooKeeper::createWithoutKillingPreviousSessions(zkutil::ZooKeeperArgs("localhost:2181"));
auto nodes = zookeeper.getChildren("/tmp");
auto nodes = zookeeper->getChildren("/tmp");
if (argc < 2)
{
@ -26,7 +26,7 @@ try
std::vector<std::future<Coordination::GetResponse>> futures;
futures.reserve(nodes.size());
for (auto & node : nodes)
futures.push_back(zookeeper.asyncGet("/tmp/" + node));
futures.push_back(zookeeper->asyncGet("/tmp/" + node));
for (auto & future : futures)
std::cerr << (future.get().data.empty() ? ',' : '.');

View File

@ -16,34 +16,34 @@ try
return 1;
}
ZooKeeper zk{zkutil::ZooKeeperArgs(argv[1])};
auto zk = ZooKeeper::createWithoutKillingPreviousSessions(zkutil::ZooKeeperArgs(argv[1]));
std::cout << "create path" << std::endl;
zk.create("/test", "old", zkutil::CreateMode::Persistent);
zk->create("/test", "old", zkutil::CreateMode::Persistent);
Coordination::Stat stat;
zkutil::EventPtr watch = std::make_shared<Poco::Event>();
std::cout << "get path" << std::endl;
zk.get("/test", &stat, watch);
zk->get("/test", &stat, watch);
std::cout << "set path" << std::endl;
zk.set("/test", "new");
zk->set("/test", "new");
watch->wait();
std::cout << "watch happened" << std::endl;
std::cout << "remove path" << std::endl;
std::cout << "list path" << std::endl;
Strings children = zk.getChildren("/");
Strings children = zk->getChildren("/");
for (const auto & name : children)
std::cerr << "\t" << name << "\n";
zk.remove("/test");
zk->remove("/test");
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest("/test", "multi1", CreateMode::Persistent));
ops.emplace_back(zkutil::makeSetRequest("/test", "multi2", -1));
ops.emplace_back(zkutil::makeRemoveRequest("/test", -1));
std::cout << "multi" << std::endl;
Coordination::Responses res = zk.multi(ops);
Coordination::Responses res = zk->multi(ops);
std::cout << "path created: " << dynamic_cast<const Coordination::CreateResponse &>(*res[0]).path_created << std::endl;
return 0;

View File

@ -556,7 +556,7 @@ void ZooKeeperMetadataTransaction::commit()
if (state != CREATED)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Incorrect state ({}), it's a bug", state);
state = FAILED;
current_zookeeper->multi(ops);
current_zookeeper->multi(ops, /* check_session_valid */ true);
state = COMMITTED;
}

View File

@ -682,7 +682,7 @@ std::pair<int32_t, int32_t> ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::Zo
ops.emplace_back(zkutil::makeSetRequest(
fs::path(replica_path) / "min_unprocessed_insert_time", toString(*min_unprocessed_insert_time_changed), -1));
auto responses = zookeeper->multi(ops);
auto responses = zookeeper->multi(ops, /* check_session_valid */ true);
/// Now we have successfully updated the queue in ZooKeeper. Update it in RAM.

View File

@ -233,7 +233,7 @@ public:
will_be);
}
zookeeper->multi(requests);
zookeeper->multi(requests, /* check_session_valid */ true);
}
};
@ -1210,7 +1210,7 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
}
Coordination::Responses responses;
auto status = client->tryMulti(delete_requests, responses);
auto status = client->tryMulti(delete_requests, responses, /* check_session_valid */ true);
if (status == Coordination::Error::ZOK)
return;

View File

@ -1293,7 +1293,7 @@ bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeper
ops.emplace_back(zkutil::makeRemoveRequest(metadata_drop_lock->getPath(), -1));
ops.emplace_back(zkutil::makeRemoveRequest(fs::path(zookeeper_path) / "dropped", -1));
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path, -1));
code = zookeeper->tryMulti(ops, responses);
code = zookeeper->tryMulti(ops, responses, /* check_session_valid */ true);
if (code == Coordination::Error::ZNONODE)
{
@ -2897,7 +2897,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
/// Check that log pointer of source replica didn't changed while we read queue entries
ops.push_back(zkutil::makeCheckRequest(fs::path(source_path) / "log_pointer", log_pointer_stat.version));
auto rc = zookeeper->tryMulti(ops, responses);
auto rc = zookeeper->tryMulti(ops, responses, /* check_session_valid */ true);
if (rc == Coordination::Error::ZOK)
{
@ -3252,7 +3252,7 @@ void StorageReplicatedMergeTree::cloneMetadataIfNeeded(const String & source_rep
ops.emplace_back(zkutil::makeCheckRequest(source_path + "/metadata", metadata_stat.version));
ops.emplace_back(zkutil::makeCheckRequest(source_path + "/columns", columns_stat.version));
Coordination::Error code = zookeeper->tryMulti(ops, responses);
Coordination::Error code = zookeeper->tryMulti(ops, responses, /* check_session_valid */ true);
if (code == Coordination::Error::ZOK)
break;
else if (code == Coordination::Error::ZBADVERSION)
@ -4127,7 +4127,7 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
zkutil::CreateMode::PersistentSequential));
Coordination::Responses results;
auto rc = zookeeper->tryMulti(ops, results);
auto rc = zookeeper->tryMulti(ops, results, /* check_session_valid */ true);
if (rc == Coordination::Error::ZBADVERSION)
{
@ -5841,7 +5841,7 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer
}
else
{
zookeeper->multi(requests);
zookeeper->multi(requests, /* check_session_valid */ true);
}
{

View File

@ -26,7 +26,7 @@ try
auto config = processor.loadConfig().configuration;
String root_path = argv[2];
zkutil::ZooKeeper zk(*config, zkutil::getZooKeeperConfigName(*config), nullptr);
auto zk = zkutil::ZooKeeper::createWithoutKillingPreviousSessions(*config, zkutil::getZooKeeperConfigName(*config), nullptr);
String temp_path = root_path + "/temp";
String blocks_path = root_path + "/block_numbers";
@ -34,7 +34,7 @@ try
Stopwatch total_timer;
Stopwatch timer;
EphemeralLocksInAllPartitions locks(blocks_path, "test_lock-", temp_path, zk);
EphemeralLocksInAllPartitions locks(blocks_path, "test_lock-", temp_path, *zk);
std::cerr << "Locked, elapsed: " << timer.elapsedSeconds() << std::endl;
for (const auto & lock : locks.getLocks())

View File

@ -29,7 +29,7 @@ try
auto config = processor.loadConfig().configuration;
String zookeeper_path = argv[2];
auto zookeeper = std::make_shared<zkutil::ZooKeeper>(*config, zkutil::getZooKeeperConfigName(*config), nullptr);
auto zookeeper = zkutil::ZooKeeper::createWithoutKillingPreviousSessions(*config, zkutil::getZooKeeperConfigName(*config), nullptr);
std::unordered_map<String, std::set<Int64>> current_inserts;

View File

@ -5,16 +5,18 @@ log
::ffff:127.0.0.1 Request 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 \N \N \N 0 0 0 0
::ffff:127.0.0.1 Response 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 ZOK \N \N /test/01158/default/rmt/log/log-0000000000 0 0 0 0
parts
Request 0 Multi 0 0 \N 4 0 \N \N \N 0 0 0 0
Request 0 Multi 0 0 \N 5 0 \N \N \N 0 0 0 0
Request 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 \N \N \N 0 0 0 0
Request 0 Remove /test/01158/default/rmt/block_numbers/all/block-0000000000 0 0 -1 0 2 \N \N \N 0 0 0 0
Request 0 Create /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 3 \N \N \N 0 0 0 0
Request 0 Create /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 \N 0 4 \N \N \N 0 0 0 0
Response 0 Multi 0 0 \N 4 0 ZOK \N \N 0 0 0 0
Request 0 Check /clickhouse/sessions/<uuid> 0 0 1 0 5 \N \N \N 0 0 0 0
Response 0 Multi 0 0 \N 5 0 ZOK \N \N 0 0 0 0
Response 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 ZOK \N \N /test/01158/default/rmt/log/log-0000000000 0 0 0 0
Response 0 Remove /test/01158/default/rmt/block_numbers/all/block-0000000000 0 0 -1 0 2 ZOK \N \N 0 0 0 0
Response 0 Create /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 3 ZOK \N \N /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0
Response 0 Create /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 \N 0 4 ZOK \N \N /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 0 0
Response 0 Check /clickhouse/sessions/<uuid> 0 0 1 0 5 ZOK \N \N 0 0 0 0
Request 0 Exists /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 \N 0 0 \N \N \N 0 0 0 0
Response 0 Exists /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 \N 0 0 ZOK \N \N 0 0 96 0
blocks

View File

@ -26,7 +26,7 @@ from system.zookeeper_log where path like '/test/01158/' || currentDatabase() ||
order by xid, type, request_idx;
select 'parts';
select type, has_watch, op_num, path, is_ephemeral, is_sequential, version, requests_size, request_idx, error, watch_type,
select type, has_watch, op_num, replace(path, toString(serverUUID()), '<uuid>'), is_ephemeral, is_sequential, version, requests_size, request_idx, error, watch_type,
watch_state, path_created, stat_version, stat_cversion, stat_dataLength, stat_numChildren
from system.zookeeper_log
where (session_id, xid) in (select session_id, xid from system.zookeeper_log where path='/test/01158/' || currentDatabase() || '/rmt/replicas/1/parts/all_0_0_0')

View File

@ -69,7 +69,7 @@ int main(int argc, char ** argv)
Poco::Logger::root().setChannel(channel);
Poco::Logger::root().setLevel("trace");
zkutil::ZooKeeper zk{zkutil::ZooKeeperArgs(argv[1])};
auto zk = zkutil::ZooKeeper::createWithoutKillingPreviousSessions(zkutil::ZooKeeperArgs(argv[1]));
DB::LineReader lr({}, false, {"\\"}, {});
do
@ -96,7 +96,7 @@ int main(int argc, char ** argv)
ss >> w;
bool watch = w == "w";
zkutil::EventPtr event = watch ? std::make_shared<Poco::Event>() : nullptr;
std::vector<std::string> v = zk.getChildren(path, nullptr, event);
std::vector<std::string> v = zk->getChildren(path, nullptr, event);
for (const auto & child : v)
std::cout << child << std::endl;
if (watch)
@ -132,15 +132,15 @@ int main(int argc, char ** argv)
std::cout << "Bad create mode" << std::endl;
continue;
}
std::cout << zk.create(path, data, m) << std::endl;
std::cout << zk->create(path, data, m) << std::endl;
}
else if (cmd == "remove")
{
zk.remove(path);
zk->remove(path);
}
else if (cmd == "rmr")
{
zk.removeRecursive(path);
zk->removeRecursive(path);
}
else if (cmd == "exists")
{
@ -149,7 +149,7 @@ int main(int argc, char ** argv)
bool watch = w == "w";
zkutil::EventPtr event = watch ? std::make_shared<Poco::Event>() : nullptr;
Coordination::Stat stat;
bool e = zk.exists(path, &stat, event);
bool e = zk->exists(path, &stat, event);
if (e)
printStat(stat);
else
@ -164,7 +164,7 @@ int main(int argc, char ** argv)
bool watch = w == "w";
zkutil::EventPtr event = watch ? std::make_shared<Poco::Event>() : nullptr;
Coordination::Stat stat;
std::string data = zk.get(path, &stat, event);
std::string data = zk->get(path, &stat, event);
std::cout << "Data: " << data << std::endl;
printStat(stat);
if (watch)
@ -188,7 +188,7 @@ int main(int argc, char ** argv)
DB::readText(version, in);
Coordination::Stat stat;
zk.set(path, data, version, &stat);
zk->set(path, data, version, &stat);
printStat(stat);
}
else if (!cmd.empty())

View File

@ -33,7 +33,7 @@ int main(int argc, char ** argv)
bool dump_ctime = options.count("ctime");
zkutil::ZooKeeperPtr zookeeper = std::make_shared<zkutil::ZooKeeper>(options.at("address").as<std::string>());
zkutil::ZooKeeperPtr zookeeper = zkutil::ZooKeeper::createWithoutKillingPreviousSessions(options.at("address").as<std::string>());
std::string initial_path = options.at("path").as<std::string>();

View File

@ -26,7 +26,7 @@ try
return 1;
}
zkutil::ZooKeeper zookeeper(options.at("address").as<std::string>());
auto zookeeper = zkutil::ZooKeeper::createWithoutKillingPreviousSessions(options.at("address").as<std::string>());
DB::ReadBufferFromFileDescriptor in(STDIN_FILENO);
std::list<std::future<Coordination::RemoveResponse>> futures;
@ -37,7 +37,7 @@ try
std::string path;
DB::readEscapedString(path, in);
DB::assertString("\n", in);
futures.emplace_back(zookeeper.asyncRemove(path));
futures.emplace_back(zookeeper->asyncRemove(path));
std::cerr << ".";
}
std::cerr << "\n";