2020-11-27 14:04:03 +00:00
|
|
|
#include <Databases/DatabaseReplicatedWorker.h>
|
|
|
|
#include <Databases/DatabaseReplicated.h>
|
|
|
|
#include <Interpreters/DDLTask.h>
|
2021-05-31 13:31:03 +00:00
|
|
|
#include <Common/ZooKeeper/KeeperException.h>
|
2021-05-08 10:59:55 +00:00
|
|
|
#include <filesystem>
|
|
|
|
|
|
|
|
namespace fs = std::filesystem;
|
2020-11-27 14:04:03 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
namespace ErrorCodes
{
    /// Error codes thrown from this file. They are only declared here (`extern`);
    /// the definitions live outside of this translation unit.
    extern const int LOGICAL_ERROR;
    extern const int DATABASE_REPLICATION_FAILED;
    extern const int NOT_A_LEADER;
    extern const int UNFINISHED;
}
|
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
/// Creates the DDL worker of a single Replicated database.
/// The base DDLWorker is configured with the "<zookeeper_path>/log" queue of `db`
/// and gets a logger name derived from the database name.
DatabaseReplicatedDDLWorker::DatabaseReplicatedDDLWorker(DatabaseReplicated * db, ContextPtr context_)
    : DDLWorker(
        /* pool_size */ 1,
        db->zookeeper_path + "/log",
        context_,
        nullptr,
        {},
        fmt::format("DDLWorker({})", db->getDatabaseName()))
    , database(db)
{
    /// The pool size has to stay 1, otherwise log entries could be reordered.
    /// TODO Build a dependency graph of DDL queries, so that independent entries can be executed in parallel.
    /// A similar graph is also needed to load tables on server startup in topological order.
}
|
|
|
|
|
DDLWorker: avoid NULL dereference on termination and failed zookeeper initialization
Log snippet shows the problem:
2021.02.24 04:40:29.349181 [ 39 ] {} <Warning> DDLWorker: DDLWorker is configured to use multiple threads. It's not recommended because queries can be reordered. Also it may cause some unknown issues to appear.
2021.02.24 04:40:29.349516 [ 39 ] {} <Information> Application: Ready for connections.
2021.02.24 04:40:29.349602 [ 74 ] {} <Debug> DDLWorker: Started DDLWorker cleanup thread
2021.02.24 04:40:29.349639 [ 73 ] {} <Debug> DDLWorker: Starting DDLWorker thread
2021.02.24 04:40:29.349698 [ 73 ] {} <Debug> DDLWorker: Started DDLWorker thread
2021.02.24 04:40:29.352548 [ 73 ] {} <Error> virtual void DB::DDLWorker::initializeMainThread(): Code: 999, e.displayText() = Coordination::Exception: All connection tries failed while connecting to ZooKeeper. nodes: 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
(Connection loss), Stack trace (when copying this message, always include the lines below):
0. Coordination::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Coordination::Error, int) @ 0xfe93923 in /usr/bin/clickhouse
1. Coordination::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Coordination::Error) @ 0xfe93ba2 in /usr/bin/clickhouse
2. Coordination::ZooKeeper::connect(std::__1::vector<Coordination::ZooKeeper::Node, std::__1::allocator<Coordination::ZooKeeper::Node> > const&, Poco::Timespan) @ 0xfed3a01 in /usr/bin/clickhouse
3. Coordination::ZooKeeper::ZooKeeper(std::__1::vector<Coordination::ZooKeeper::Node, std::__1::allocator<Coordination::ZooKeeper::Node> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Poco::Timespan, Poco::Timespan, Poco::Timespan) @ 0xfed2222 in /usr/bin/clickhouse
4. zkutil::ZooKeeper::init(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, int, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0xfe961cd in /usr/bin/clickhouse
5. zkutil::ZooKeeper::ZooKeeper(Poco::Util::AbstractConfiguration const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0xfe97a96 in /usr/bin/clickhouse
6. void std::__1::allocator_traits<std::__1::allocator<zkutil::ZooKeeper> >::__construct<zkutil::ZooKeeper, Poco::Util::AbstractConfiguration const&, char const (&) [10]>(std::__1::integral_constant<bool, true>, std::__1::allocator<zkutil::ZooKeeper>&, zkutil::ZooKeeper*, Poco::Util::AbstractConfiguration const&, char const (&) [10]) @ 0xed98387 in /usr/bin/clickhouse
7. DB::Context::getZooKeeper() const @ 0xed75190 in /usr/bin/clickhouse
8. DB::DDLWorker::getAndSetZooKeeper() @ 0xedb81c9 in /usr/bin/clickhouse
9. DB::DDLWorker::initializeMainThread() @ 0xedc9eb0 in /usr/bin/clickhouse
10. DB::DDLWorker::runMainThread() @ 0xedb5d01 in /usr/bin/clickhouse
11. ThreadFromGlobalPool::ThreadFromGlobalPool<void (DB::DDLWorker::*)(), DB::DDLWorker*>(void (DB::DDLWorker::*&&)(), DB::DDLWorker*&&)::'lambda'()::operator()() @ 0xedcafa1 in /usr/bin/clickhouse
12. ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) @ 0x892651f in /usr/bin/clickhouse
13. ? @ 0x8929fb3 in /usr/bin/clickhouse
14. start_thread @ 0x8ea7 in /lib/x86_64-linux-gnu/libpthread-2.31.so
15. __clone @ 0xfddef in /lib/x86_64-linux-gnu/libc-2.31.so
(version 21.3.1.1)
...
2021.02.24 04:40:30.025278 [ 41 ] {} <Trace> BaseDaemon: Received signal 15
2021.02.24 04:40:30.025336 [ 41 ] {} <Information> Application: Received termination signal (Terminated)
...
2021.02.24 04:40:30.582078 [ 39 ] {} <Information> Application: Closed all listening sockets.
2021.02.24 04:40:30.582124 [ 39 ] {} <Information> Application: Closed connections.
2021.02.24 04:40:30.583770 [ 39 ] {} <Information> Application: Shutting down storages.
2021.02.24 04:40:30.583932 [ 39 ] {} <Information> Context: Shutdown disk data
2021.02.24 04:40:30.583951 [ 39 ] {} <Information> Context: Shutdown disk default
2021.02.24 04:40:30.584163 [ 46 ] {} <Trace> SystemLog (system.query_log): Terminating
2021.02.24 04:40:30.586025 [ 39 ] {} <Trace> BackgroundSchedulePool/BgSchPool: Waiting for threads to finish.
2021.02.24 04:40:34.352701 [ 73 ] {} <Debug> DDLWorker: Initialized DDLWorker thread
2021.02.24 04:40:34.352758 [ 73 ] {} <Debug> DDLWorker: Scheduling tasks
2021-02-24 05:07:31 +00:00
|
|
|
bool DatabaseReplicatedDDLWorker::initializeMainThread()
|
2020-12-03 18:14:27 +00:00
|
|
|
{
|
2021-02-10 20:30:40 +00:00
|
|
|
while (!stop_flag)
|
2020-12-03 18:14:27 +00:00
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
2022-06-29 14:27:21 +00:00
|
|
|
chassert(!database->is_probably_dropped);
|
2020-12-03 18:14:27 +00:00
|
|
|
auto zookeeper = getAndSetZooKeeper();
|
2021-02-15 00:04:46 +00:00
|
|
|
if (database->is_readonly)
|
2022-07-20 20:54:43 +00:00
|
|
|
database->tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessLevel::ATTACH);
|
2020-12-03 18:14:27 +00:00
|
|
|
initializeReplication();
|
|
|
|
initialized = true;
|
DDLWorker: avoid NULL dereference on termination and failed zookeeper initialization
Log snipped shows the problem:
2021.02.24 04:40:29.349181 [ 39 ] {} <Warning> DDLWorker: DDLWorker is configured to use multiple threads. It's not recommended because queries can be reordered. Also it may cause some unknown issues to appear.
2021.02.24 04:40:29.349516 [ 39 ] {} <Information> Application: Ready for connections.
2021.02.24 04:40:29.349602 [ 74 ] {} <Debug> DDLWorker: Started DDLWorker cleanup thread
2021.02.24 04:40:29.349639 [ 73 ] {} <Debug> DDLWorker: Starting DDLWorker thread
2021.02.24 04:40:29.349698 [ 73 ] {} <Debug> DDLWorker: Started DDLWorker thread
2021.02.24 04:40:29.352548 [ 73 ] {} <Error> virtual void DB::DDLWorker::initializeMainThread(): Code: 999, e.displayText() = Coordination::Exception: All connection tries failed while connecting to ZooKeeper. nodes: 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
(Connection loss), Stack trace (when copying this message, always include the lines below):
0. Coordination::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Coordination::Error, int) @ 0xfe93923 in /usr/bin/clickhouse
1. Coordination::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Coordination::Error) @ 0xfe93ba2 in /usr/bin/clickhouse
2. Coordination::ZooKeeper::connect(std::__1::vector<Coordination::ZooKeeper::Node, std::__1::allocator<Coordination::ZooKeeper::Node> > const&, Poco::Timespan) @ 0xfed3a01 in /usr/bin/clickhouse
3. Coordination::ZooKeeper::ZooKeeper(std::__1::vector<Coordination::ZooKeeper::Node, std::__1::allocator<Coordination::ZooKeeper::Node> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Poco::Timespan, Poco::Timespan, Poco::Timespan) @ 0xfed2222 in /usr/bin/clickhouse
4. zkutil::ZooKeeper::init(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, int, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0xfe961cd in /usr/bin/clickhouse
5. zkutil::ZooKeeper::ZooKeeper(Poco::Util::AbstractConfiguration const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0xfe97a96 in /usr/bin/clickhouse
6. void std::__1::allocator_traits<std::__1::allocator<zkutil::ZooKeeper> >::__construct<zkutil::ZooKeeper, Poco::Util::AbstractConfiguration const&, char const (&) [10]>(std::__1::integral_constant<bool, true>, std::__1::allocator<zkutil::ZooKeeper>&, zkutil::ZooKeeper*, Poco::Util::AbstractConfiguration const&, char const (&) [10]) @ 0xed98387 in /usr/bin/clickhouse
7. DB::Context::getZooKeeper() const @ 0xed75190 in /usr/bin/clickhouse
8. DB::DDLWorker::getAndSetZooKeeper() @ 0xedb81c9 in /usr/bin/clickhouse
9. DB::DDLWorker::initializeMainThread() @ 0xedc9eb0 in /usr/bin/clickhouse
10. DB::DDLWorker::runMainThread() @ 0xedb5d01 in /usr/bin/clickhouse
11. ThreadFromGlobalPool::ThreadFromGlobalPool<void (DB::DDLWorker::*)(), DB::DDLWorker*>(void (DB::DDLWorker::*&&)(), DB::DDLWorker*&&)::'lambda'()::operator()() @ 0xedcafa1 in /usr/bin/clickhouse
12. ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) @ 0x892651f in /usr/bin/clickhouse
13. ? @ 0x8929fb3 in /usr/bin/clickhouse
14. start_thread @ 0x8ea7 in /lib/x86_64-linux-gnu/libpthread-2.31.so
15. __clone @ 0xfddef in /lib/x86_64-linux-gnu/libc-2.31.so
(version 21.3.1.1)
...
2021.02.24 04:40:30.025278 [ 41 ] {} <Trace> BaseDaemon: Received signal 15
2021.02.24 04:40:30.025336 [ 41 ] {} <Information> Application: Received termination signal (Terminated)
...
2021.02.24 04:40:30.582078 [ 39 ] {} <Information> Application: Closed all listening sockets.
2021.02.24 04:40:30.582124 [ 39 ] {} <Information> Application: Closed connections.
2021.02.24 04:40:30.583770 [ 39 ] {} <Information> Application: Shutting down storages.
2021.02.24 04:40:30.583932 [ 39 ] {} <Information> Context: Shutdown disk data
2021.02.24 04:40:30.583951 [ 39 ] {} <Information> Context: Shutdown disk default
2021.02.24 04:40:30.584163 [ 46 ] {} <Trace> SystemLog (system.query_log): Terminating
2021.02.24 04:40:30.586025 [ 39 ] {} <Trace> BackgroundSchedulePool/BgSchPool: Waiting for threads to finish.
2021.02.24 04:40:34.352701 [ 73 ] {} <Debug> DDLWorker: Initialized DDLWorker thread
2021.02.24 04:40:34.352758 [ 73 ] {} <Debug> DDLWorker: Scheduling tasks
2021-02-24 05:07:31 +00:00
|
|
|
return true;
|
2020-12-03 18:14:27 +00:00
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(log, fmt::format("Error on initialization of {}", database->getDatabaseName()));
|
|
|
|
sleepForSeconds(5);
|
|
|
|
}
|
|
|
|
}
|
DDLWorker: avoid NULL dereference on termination and failed zookeeper initialization
Log snipped shows the problem:
2021.02.24 04:40:29.349181 [ 39 ] {} <Warning> DDLWorker: DDLWorker is configured to use multiple threads. It's not recommended because queries can be reordered. Also it may cause some unknown issues to appear.
2021.02.24 04:40:29.349516 [ 39 ] {} <Information> Application: Ready for connections.
2021.02.24 04:40:29.349602 [ 74 ] {} <Debug> DDLWorker: Started DDLWorker cleanup thread
2021.02.24 04:40:29.349639 [ 73 ] {} <Debug> DDLWorker: Starting DDLWorker thread
2021.02.24 04:40:29.349698 [ 73 ] {} <Debug> DDLWorker: Started DDLWorker thread
2021.02.24 04:40:29.352548 [ 73 ] {} <Error> virtual void DB::DDLWorker::initializeMainThread(): Code: 999, e.displayText() = Coordination::Exception: All connection tries failed while connecting to ZooKeeper. nodes: 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
(Connection loss), Stack trace (when copying this message, always include the lines below):
0. Coordination::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Coordination::Error, int) @ 0xfe93923 in /usr/bin/clickhouse
1. Coordination::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Coordination::Error) @ 0xfe93ba2 in /usr/bin/clickhouse
2. Coordination::ZooKeeper::connect(std::__1::vector<Coordination::ZooKeeper::Node, std::__1::allocator<Coordination::ZooKeeper::Node> > const&, Poco::Timespan) @ 0xfed3a01 in /usr/bin/clickhouse
3. Coordination::ZooKeeper::ZooKeeper(std::__1::vector<Coordination::ZooKeeper::Node, std::__1::allocator<Coordination::ZooKeeper::Node> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Poco::Timespan, Poco::Timespan, Poco::Timespan) @ 0xfed2222 in /usr/bin/clickhouse
4. zkutil::ZooKeeper::init(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, int, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0xfe961cd in /usr/bin/clickhouse
5. zkutil::ZooKeeper::ZooKeeper(Poco::Util::AbstractConfiguration const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0xfe97a96 in /usr/bin/clickhouse
6. void std::__1::allocator_traits<std::__1::allocator<zkutil::ZooKeeper> >::__construct<zkutil::ZooKeeper, Poco::Util::AbstractConfiguration const&, char const (&) [10]>(std::__1::integral_constant<bool, true>, std::__1::allocator<zkutil::ZooKeeper>&, zkutil::ZooKeeper*, Poco::Util::AbstractConfiguration const&, char const (&) [10]) @ 0xed98387 in /usr/bin/clickhouse
7. DB::Context::getZooKeeper() const @ 0xed75190 in /usr/bin/clickhouse
8. DB::DDLWorker::getAndSetZooKeeper() @ 0xedb81c9 in /usr/bin/clickhouse
9. DB::DDLWorker::initializeMainThread() @ 0xedc9eb0 in /usr/bin/clickhouse
10. DB::DDLWorker::runMainThread() @ 0xedb5d01 in /usr/bin/clickhouse
11. ThreadFromGlobalPool::ThreadFromGlobalPool<void (DB::DDLWorker::*)(), DB::DDLWorker*>(void (DB::DDLWorker::*&&)(), DB::DDLWorker*&&)::'lambda'()::operator()() @ 0xedcafa1 in /usr/bin/clickhouse
12. ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) @ 0x892651f in /usr/bin/clickhouse
13. ? @ 0x8929fb3 in /usr/bin/clickhouse
14. start_thread @ 0x8ea7 in /lib/x86_64-linux-gnu/libpthread-2.31.so
15. __clone @ 0xfddef in /lib/x86_64-linux-gnu/libc-2.31.so
(version 21.3.1.1)
...
2021.02.24 04:40:30.025278 [ 41 ] {} <Trace> BaseDaemon: Received signal 15
2021.02.24 04:40:30.025336 [ 41 ] {} <Information> Application: Received termination signal (Terminated)
...
2021.02.24 04:40:30.582078 [ 39 ] {} <Information> Application: Closed all listening sockets.
2021.02.24 04:40:30.582124 [ 39 ] {} <Information> Application: Closed connections.
2021.02.24 04:40:30.583770 [ 39 ] {} <Information> Application: Shutting down storages.
2021.02.24 04:40:30.583932 [ 39 ] {} <Information> Context: Shutdown disk data
2021.02.24 04:40:30.583951 [ 39 ] {} <Information> Context: Shutdown disk default
2021.02.24 04:40:30.584163 [ 46 ] {} <Trace> SystemLog (system.query_log): Terminating
2021.02.24 04:40:30.586025 [ 39 ] {} <Trace> BackgroundSchedulePool/BgSchPool: Waiting for threads to finish.
2021.02.24 04:40:34.352701 [ 73 ] {} <Debug> DDLWorker: Initialized DDLWorker thread
2021.02.24 04:40:34.352758 [ 73 ] {} <Debug> DDLWorker: Scheduling tasks
2021-02-24 05:07:31 +00:00
|
|
|
|
|
|
|
return false;
|
2020-12-03 18:14:27 +00:00
|
|
|
}
|
|
|
|
|
2021-02-12 16:22:01 +00:00
|
|
|
void DatabaseReplicatedDDLWorker::shutdown()
|
|
|
|
{
|
|
|
|
DDLWorker::shutdown();
|
|
|
|
wait_current_task_change.notify_all();
|
|
|
|
}
|
|
|
|
|
2020-12-03 18:14:27 +00:00
|
|
|
void DatabaseReplicatedDDLWorker::initializeReplication()
|
2020-11-27 14:04:03 +00:00
|
|
|
{
|
|
|
|
/// Check if we need to recover replica.
|
2021-02-04 19:41:44 +00:00
|
|
|
/// Invariant: replica is lost if it's log_ptr value is less then max_log_ptr - logs_to_keep.
|
2020-11-27 14:04:03 +00:00
|
|
|
|
2021-07-30 16:34:18 +00:00
|
|
|
auto zookeeper = getAndSetZooKeeper();
|
|
|
|
String log_ptr_str = zookeeper->get(database->replica_path + "/log_ptr");
|
2021-02-09 15:14:20 +00:00
|
|
|
UInt32 our_log_ptr = parse<UInt32>(log_ptr_str);
|
2021-07-30 16:34:18 +00:00
|
|
|
UInt32 max_log_ptr = parse<UInt32>(zookeeper->get(database->zookeeper_path + "/max_log_ptr"));
|
|
|
|
logs_to_keep = parse<UInt32>(zookeeper->get(database->zookeeper_path + "/logs_to_keep"));
|
2021-02-08 19:36:17 +00:00
|
|
|
if (our_log_ptr == 0 || our_log_ptr + logs_to_keep < max_log_ptr)
|
2022-04-04 22:51:48 +00:00
|
|
|
{
|
2021-07-30 16:34:18 +00:00
|
|
|
database->recoverLostReplica(zookeeper, our_log_ptr, max_log_ptr);
|
2022-04-05 15:36:53 +00:00
|
|
|
zookeeper->set(database->replica_path + "/log_ptr", toString(max_log_ptr));
|
|
|
|
initializeLogPointer(DDLTaskBase::getLogEntryName(max_log_ptr));
|
2022-04-04 22:51:48 +00:00
|
|
|
}
|
2021-02-09 15:14:20 +00:00
|
|
|
else
|
2022-04-04 22:51:48 +00:00
|
|
|
{
|
2022-04-05 15:36:53 +00:00
|
|
|
String log_entry_name = DDLTaskBase::getLogEntryName(our_log_ptr);
|
|
|
|
last_skipped_entry_name.emplace(log_entry_name);
|
|
|
|
initializeLogPointer(log_entry_name);
|
2022-04-04 22:51:48 +00:00
|
|
|
}
|
2020-11-27 14:04:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Puts `entry` into the replication log of this database using the current ZooKeeper session.
/// Returns whatever enqueueQueryImpl returns (the path of the created log entry node).
String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry)
{
    return enqueueQueryImpl(getAndSetZooKeeper(), entry, database);
}
|
|
|
|
|
2022-05-01 13:40:18 +00:00
|
|
|
|
|
|
|
/// Waits until this replica has caught up with the `max_log_ptr` value observed at the moment of the call.
///
/// @param timeout_ms  Upper bound, in milliseconds, applied to each of the two waits below
///                    (the wait for the worker thread and the wait for the log_ptr watch).
/// @return true if the replica was already up-to-date, if log_ptr became equal to max_log_ptr,
///         or if the watch on log_ptr fired in time; false if one of the waits timed out.
bool DatabaseReplicatedDDLWorker::waitForReplicaToProcessAllEntries(UInt64 timeout_ms)
{
    auto zookeeper = getAndSetZooKeeper();
    const auto our_log_ptr_path = database->replica_path + "/log_ptr";
    const auto max_log_ptr_path = database->zookeeper_path + "/max_log_ptr";
    UInt32 our_log_ptr = parse<UInt32>(zookeeper->get(our_log_ptr_path));
    UInt32 max_log_ptr = parse<UInt32>(zookeeper->get(max_log_ptr_path));
    chassert(our_log_ptr <= max_log_ptr);

    /// max_log_ptr is the number of the last successfully executed request on the initiator.
    /// The log could contain other entries which are not committed yet.
    /// This equality is enough to say that the current replica is up-to-date.
    if (our_log_ptr == max_log_ptr)
        return true;

    auto max_log = DDLTask::getLogEntryName(max_log_ptr);

    {
        std::unique_lock lock{mutex};

        /// `current_task` is modified by the worker thread under `mutex` (see initAndCheckTask),
        /// so it must only be read, even for logging, while the lock is held.
        LOG_TRACE(log, "Waiting for worker thread to process all entries before {}, current task is {}", max_log, current_task);

        bool processed = wait_current_task_change.wait_for(lock, std::chrono::milliseconds(timeout_ms), [&]()
        {
            return zookeeper->expired() || current_task == max_log || stop_flag;
        });

        if (!processed)
            return false;

        /// Still under the lock for the same reason as above.
        LOG_TRACE(log, "Waiting for worker thread to process all entries before {}, current task is {}", max_log, current_task);
    }

    /// Now wait for max_log_ptr to be processed: read log_ptr and leave a watch on it.
    Coordination::Stat stat;
    auto event_ptr = std::make_shared<Poco::Event>();
    auto new_log = zookeeper->get(our_log_ptr_path, &stat, event_ptr);

    if (new_log == toString(max_log_ptr))
        return true;

    return event_ptr->tryWait(timeout_ms);
}
|
|
|
|
|
|
|
|
|
2021-05-31 13:31:03 +00:00
|
|
|
/// Creates a new entry in the replication log of `database`.
///
/// A sequential number is allocated first (guarded by the ephemeral "counter_lock" node),
/// then the log entry with its child nodes is created in a single multi-request.
/// If `committed` is true the entry gets a persistent "/committed" child, otherwise an ephemeral "/try" child.
/// Returns the path of the created log entry. Throws UNFINISHED if the counter could not be allocated.
String DatabaseReplicatedDDLWorker::enqueueQueryImpl(const ZooKeeperPtr & zookeeper, DDLLogEntry & entry,
                                                     DatabaseReplicated * const database, bool committed)
{
    const String query_path_prefix = database->zookeeper_path + "/log/query-";

    /// A sequential node and its ephemeral child cannot be created in a single transaction,
    /// so the sequential number is allocated in another way.
    const String counter_prefix = database->zookeeper_path + "/counter/cnt-";
    const String counter_lock_path = database->zookeeper_path + "/counter_lock";

    String counter_path;
    bool counter_allocated = false;
    constexpr size_t max_attempts = 999;

    for (size_t attempt = 0; attempt < max_attempts && !counter_allocated; ++attempt)
    {
        Coordination::Requests lock_ops;
        lock_ops.emplace_back(zkutil::makeCreateRequest(counter_lock_path, database->getFullReplicaName(), zkutil::CreateMode::Ephemeral));
        lock_ops.emplace_back(zkutil::makeCreateRequest(counter_prefix, "", zkutil::CreateMode::EphemeralSequential));
        Coordination::Responses lock_responses;

        const Coordination::Error code = zookeeper->tryMulti(lock_ops, lock_responses);

        if (code == Coordination::Error::ZOK)
        {
            counter_path = dynamic_cast<const Coordination::CreateResponse &>(*lock_responses.back()).path_created;
            counter_allocated = true;
        }
        else if (code != Coordination::Error::ZNODEEXISTS)
        {
            /// Anything except "node exists" is reported to the caller; "node exists" is simply retried.
            zkutil::KeeperMultiException::check(code, lock_ops, lock_responses);
        }
    }

    if (!counter_allocated)
        throw Exception(ErrorCodes::UNFINISHED,
                        "Cannot enqueue query, because some replica are trying to enqueue another query. "
                        "It may happen on high queries rate or, in rare cases, after connection loss. Client should retry.");

    /// The log entry reuses the sequential suffix of the counter node.
    const String node_path = query_path_prefix + counter_path.substr(counter_prefix.size());

    /// Now create the task in the queue
    Coordination::Requests ops;

    /// The query is not committed yet, but it has to be written into the log to avoid reordering
    ops.emplace_back(zkutil::makeCreateRequest(node_path, entry.toString(), zkutil::CreateMode::Persistent));

    /// '/try' will be replaced with '/committed' or will be removed due to an expired session or another error
    if (committed)
        ops.emplace_back(zkutil::makeCreateRequest(node_path + "/committed", database->getFullReplicaName(), zkutil::CreateMode::Persistent));
    else
        ops.emplace_back(zkutil::makeCreateRequest(node_path + "/try", database->getFullReplicaName(), zkutil::CreateMode::Ephemeral));

    /// The counter node is not needed anymore
    ops.emplace_back(zkutil::makeRemoveRequest(counter_path, -1));

    /// Unlock counters
    ops.emplace_back(zkutil::makeRemoveRequest(counter_lock_path, -1));

    /// Create status dirs
    ops.emplace_back(zkutil::makeCreateRequest(node_path + "/active", "", zkutil::CreateMode::Persistent));
    ops.emplace_back(zkutil::makeCreateRequest(node_path + "/finished", "", zkutil::CreateMode::Persistent));

    zookeeper->multi(ops);

    return node_path;
}
|
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
/// Enqueues `entry` as an initial query and executes it on this replica.
///
/// The entry is written to the log first; then the function waits (up to
/// `database_replicated_initial_query_timeout_sec` from `query_context`) until the worker thread
/// reaches this entry, and processes the task itself.
/// Returns the path of the log entry.
/// Throws NOT_A_LEADER if the replica lags too much, UNFINISHED on timeout,
/// DATABASE_REPLICATION_FAILED if the session expired or the worker is stopping,
/// LOGICAL_ERROR if the task was not executed.
String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entry, ContextPtr query_context)
{
    /// NOTE Possibly it would be better to execute the initial query on the most up-to-date node,
    /// but it requires more complex logic around the /try node.

    auto zookeeper = getAndSetZooKeeper();
    const UInt32 our_log_ptr = getLogPointer();
    const UInt32 max_log_ptr = parse<UInt32>(zookeeper->get(database->zookeeper_path + "/max_log_ptr"));

    const bool lag_is_too_big = our_log_ptr + database->db_settings.max_replication_lag_to_enqueue < max_log_ptr;
    if (lag_is_too_big)
        throw Exception(ErrorCodes::NOT_A_LEADER, "Cannot enqueue query on this replica, "
                        "because it has replication lag of {} queries. Try other replica.", max_log_ptr - our_log_ptr);

    const String entry_path = enqueueQuery(entry);
    auto try_node = zkutil::EphemeralNodeHolder::existing(entry_path + "/try", *zookeeper);
    const String entry_name = entry_path.substr(entry_path.rfind('/') + 1);

    auto task = std::make_unique<DatabaseReplicatedTask>(entry_name, entry_path, database);
    task->entry = entry;
    task->parseQueryFromEntry(context);
    chassert(!task->entry.query.empty());
    assert(!zookeeper->exists(task->getFinishedNodePath()));
    task->is_initial_query = true;

    LOG_DEBUG(log, "Waiting for worker thread to process all entries before {}", entry_name);
    UInt64 timeout = query_context->getSettingsRef().database_replicated_initial_query_timeout_sec;

    {
        /// Evaluated by wait_for with `mutex` held: true when the worker reached our entry
        /// or when waiting makes no sense anymore.
        auto reached_or_interrupted = [&]()
        {
            assert(zookeeper->expired() || current_task <= entry_name);
            return zookeeper->expired() || current_task == entry_name || stop_flag;
        };

        std::unique_lock lock{mutex};
        const bool processed = wait_current_task_change.wait_for(lock, std::chrono::seconds(timeout), reached_or_interrupted);

        if (!processed)
            throw Exception(ErrorCodes::UNFINISHED, "Timeout: Cannot enqueue query on this replica, "
                            "most likely because replica is busy with previous queue entries");
    }

    if (zookeeper->expired() || stop_flag)
        throw Exception(ErrorCodes::DATABASE_REPLICATION_FAILED, "ZooKeeper session expired or replication stopped, try again");

    processTask(*task, zookeeper);

    if (!task->was_executed)
    {
        throw Exception(
            ErrorCodes::LOGICAL_ERROR,
            "Entry {} was executed, but was not committed: code {}: {}",
            task->entry_name,
            task->execution_status.code,
            task->execution_status.message);
    }

    /// Tell the holder that the /try node is already gone.
    /// NOTE(review): presumably it was removed when the entry was committed - confirm in processTask.
    try_node->setAlreadyRemoved();

    return entry_path;
}
|
|
|
|
|
2020-11-27 14:04:03 +00:00
|
|
|
/// Prepares the task for log entry `entry_name` and decides whether it has to be executed by the worker.
///
/// Publishes `entry_name` as the current task (waking up waiters), then checks the entry.
/// Returns the task if it should be executed. Returns an empty pointer and fills `out_reason`
/// if the entry must be skipped (already executed, not committed, executed as initial query, dummy, finished).
/// Throws UNFINISHED if replication was stopped while waiting for the initiator.
DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper)
{
    {
        /// `current_task` only moves forward; waiters are notified on every change.
        std::lock_guard lock{mutex};
        if (current_task < entry_name)
        {
            current_task = entry_name;
            wait_current_task_change.notify_all();
        }
    }

    const UInt32 our_log_ptr = getLogPointer();
    const UInt32 entry_num = DatabaseReplicatedTask::getLogEntryNumber(entry_name);

    if (entry_num <= our_log_ptr)
    {
        out_reason = fmt::format("Task {} already executed according to log pointer {}", entry_name, our_log_ptr);
        return {};
    }

    const String entry_path = fs::path(queue_dir) / entry_name;
    auto task = std::make_unique<DatabaseReplicatedTask>(entry_name, entry_path, database);

    String initiator_name;
    zkutil::EventPtr wait_committed_or_failed = std::make_shared<Poco::Event>();

    const String try_node_path = fs::path(entry_path) / "try";
    const String committed_node_path = fs::path(entry_path) / "committed";

    if (zookeeper->tryGet(try_node_path, initiator_name, nullptr, wait_committed_or_failed))
    {
        task->is_initial_query = initiator_name == task->host_id_str;

        /// The query is not committed yet. It cannot be just skipped in favour of the next one,
        /// because reordering may break replication.
        LOG_TRACE(log, "Waiting for initiator {} to commit or rollback entry {}", initiator_name, entry_path);
        constexpr size_t wait_time_ms = 1000;
        size_t max_iterations = database->db_settings.wait_entry_commited_timeout_sec;
        size_t iteration = 0;

        while (!wait_committed_or_failed->tryWait(wait_time_ms))
        {
            if (stop_flag)
            {
                /// Neither a task to process nor nullptr can be returned here,
                /// because nullptr means "task should not be executed".
                /// The only way out is an exception.
                throw Exception(ErrorCodes::UNFINISHED, "Replication was stopped");
            }

            ++iteration;
            if (iteration < max_iterations)
                continue;

            /// What can be done if the initiator hangs for some reason? It seems that the /try node can be removed.
            /// The initiator will fail to commit ZooKeeperMetadataTransaction (including ops for replicated table)
            /// if /try does not exist. But it's questionable.

            /// tryRemove(...) is used because multiple hosts (including the initiator) may try to do it concurrently.
            auto code = zookeeper->tryRemove(try_node_path);
            if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
                throw Coordination::Exception(code, try_node_path);

            if (!zookeeper->exists(committed_node_path))
            {
                out_reason = fmt::format("Entry {} was forcefully cancelled due to timeout", entry_name);
                return {};
            }
        }
    }

    if (!zookeeper->exists(committed_node_path))
    {
        out_reason = fmt::format("Entry {} hasn't been committed", entry_name);
        return {};
    }

    if (task->is_initial_query)
    {
        assert(!zookeeper->exists(try_node_path));
        assert(zookeeper->exists(committed_node_path) == (zookeeper->get(task->getFinishedNodePath()) == ExecutionStatus(0).serializeText()));
        out_reason = fmt::format("Entry {} has been executed as initial query", entry_name);
        return {};
    }

    String node_data;
    if (!zookeeper->tryGet(entry_path, node_data))
    {
        LOG_ERROR(log, "Cannot get log entry {}", entry_path);
        throw Exception(ErrorCodes::LOGICAL_ERROR, "should be unreachable");
    }

    task->entry.parse(node_data);

    if (task->entry.query.empty())
    {
        /// Some replica was added or removed, so the cached cluster is refreshed
        database->setCluster(database->getClusterImpl());
        out_reason = fmt::format("Entry {} is a dummy task", entry_name);
        return {};
    }

    task->parseQueryFromEntry(context);

    if (zookeeper->exists(task->getFinishedNodePath()))
    {
        out_reason = fmt::format("Task {} has been already processed", entry_name);
        return {};
    }

    return task;
}
|
|
|
|
|
2021-02-09 15:14:20 +00:00
|
|
|
/// Tells whether the log entry `entry_name` is old enough to be removed:
/// its number must be more than `logs_to_keep` behind the current max_log_ptr read from ZooKeeper.
/// The Stat argument is not used.
bool DatabaseReplicatedDDLWorker::canRemoveQueueEntry(const String & entry_name, const Coordination::Stat &)
{
    const UInt32 entry_number = DDLTaskBase::getLogEntryNumber(entry_name);
    const String max_log_ptr_path = fs::path(database->zookeeper_path) / "max_log_ptr";
    const UInt32 max_log_ptr = parse<UInt32>(getAndSetZooKeeper()->get(max_log_ptr_path));
    return entry_number + logs_to_keep < max_log_ptr;
}
|
|
|
|
|
2022-04-05 15:36:53 +00:00
|
|
|
void DatabaseReplicatedDDLWorker::initializeLogPointer(const String & processed_entry_name)
|
2022-04-04 22:51:48 +00:00
|
|
|
{
|
|
|
|
updateMaxDDLEntryID(processed_entry_name);
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns the in-memory log pointer, i.e. the current value of `max_id`.
UInt32 DatabaseReplicatedDDLWorker::getLogPointer() const
{
    /// NOTE The value may differ from the log_ptr stored in zk:
    ///  - max_id can be equal to log_ptr - 1 due to a race condition (it's already updated in zk, but not in memory yet)
    ///  - max_id can be greater than log_ptr, because log_ptr is not updated for failed and dummy entries
    return max_id.load();
}
|
|
|
|
|
2020-11-27 14:04:03 +00:00
|
|
|
}
|