2022-08-12 09:32:13 +00:00
|
|
|
#include <Storages/MergeTree/ReplicatedMergeTreeAttachThread.h>
|
|
|
|
#include <Storages/StorageReplicatedMergeTree.h>
|
2022-08-24 17:44:14 +00:00
|
|
|
#include <Common/ZooKeeper/IKeeper.h>
|
2022-08-12 09:32:13 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2022-08-25 20:08:48 +00:00
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
|
|
|
extern const int SUPPORT_IS_DISABLED;
|
2022-09-26 18:11:30 +00:00
|
|
|
extern const int REPLICA_STATUS_CHANGED;
|
2022-08-25 20:08:48 +00:00
|
|
|
}
|
|
|
|
|
2022-08-12 09:32:13 +00:00
|
|
|
ReplicatedMergeTreeAttachThread::ReplicatedMergeTreeAttachThread(StorageReplicatedMergeTree & storage_)
|
|
|
|
: storage(storage_)
|
|
|
|
, log_name(storage.getStorageID().getFullTableName() + " (ReplicatedMergeTreeAttachThread)")
|
|
|
|
, log(&Poco::Logger::get(log_name))
|
|
|
|
{
|
|
|
|
task = storage.getContext()->getSchedulePool().createTask(log_name, [this] { run(); });
|
2022-08-16 11:20:58 +00:00
|
|
|
const auto storage_settings = storage.getSettings();
|
|
|
|
retry_period = storage_settings->initialization_retry_period.totalSeconds();
|
2022-08-12 09:32:13 +00:00
|
|
|
}
|
|
|
|
|
2022-08-17 08:28:53 +00:00
|
|
|
ReplicatedMergeTreeAttachThread::~ReplicatedMergeTreeAttachThread()
|
|
|
|
{
|
|
|
|
shutdown();
|
|
|
|
}
|
|
|
|
|
2022-08-12 09:32:13 +00:00
|
|
|
void ReplicatedMergeTreeAttachThread::shutdown()
|
|
|
|
{
|
2022-08-17 08:28:53 +00:00
|
|
|
if (!shutdown_called.exchange(true))
|
2022-08-19 14:56:17 +00:00
|
|
|
{
|
2022-08-17 08:28:53 +00:00
|
|
|
task->deactivate();
|
2022-08-19 14:56:17 +00:00
|
|
|
LOG_INFO(log, "Attach thread finished");
|
|
|
|
}
|
2022-08-12 09:32:13 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void ReplicatedMergeTreeAttachThread::run()
|
|
|
|
{
|
2022-08-19 08:17:02 +00:00
|
|
|
bool needs_retry{false};
|
2022-08-12 09:32:13 +00:00
|
|
|
try
|
|
|
|
{
|
2022-08-19 09:39:59 +00:00
|
|
|
// we delay the first reconnect if the storage failed to connect to ZK initially
|
|
|
|
if (!first_try_done && !storage.current_zookeeper)
|
|
|
|
{
|
|
|
|
needs_retry = true;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
runImpl();
|
|
|
|
finalizeInitialization();
|
|
|
|
}
|
2022-08-19 08:17:02 +00:00
|
|
|
}
|
|
|
|
catch (const Exception & e)
|
|
|
|
{
|
|
|
|
if (const auto * coordination_exception = dynamic_cast<const Coordination::Exception *>(&e))
|
2022-08-24 17:44:14 +00:00
|
|
|
needs_retry = Coordination::isHardwareError(coordination_exception->code);
|
2022-09-26 18:11:30 +00:00
|
|
|
else if (e.code() == ErrorCodes::REPLICA_STATUS_CHANGED)
|
|
|
|
needs_retry = true;
|
2022-08-17 08:28:53 +00:00
|
|
|
|
2022-08-24 17:44:14 +00:00
|
|
|
if (needs_retry)
|
2022-08-17 08:28:53 +00:00
|
|
|
{
|
2022-08-24 17:44:14 +00:00
|
|
|
LOG_ERROR(log, "Initialization failed. Error: {}", e.message());
|
2022-08-19 11:12:20 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2022-08-24 17:44:14 +00:00
|
|
|
LOG_ERROR(log, "Initialization failed, table will remain readonly. Error: {}", e.message());
|
|
|
|
storage.initialization_done = true;
|
2022-08-17 08:28:53 +00:00
|
|
|
}
|
2022-08-19 08:17:02 +00:00
|
|
|
}
|
2022-08-12 09:32:13 +00:00
|
|
|
|
2022-08-19 08:17:02 +00:00
|
|
|
if (!first_try_done.exchange(true))
|
|
|
|
first_try_done.notify_one();
|
2022-08-16 08:19:02 +00:00
|
|
|
|
2022-08-19 08:17:02 +00:00
|
|
|
if (shutdown_called)
|
|
|
|
{
|
|
|
|
LOG_WARNING(log, "Shutdown called, cancelling initialization");
|
|
|
|
return;
|
|
|
|
}
|
2022-08-12 09:32:13 +00:00
|
|
|
|
2022-08-19 08:17:02 +00:00
|
|
|
if (needs_retry)
|
2022-08-19 08:49:51 +00:00
|
|
|
{
|
|
|
|
LOG_INFO(log, "Will retry initialization in {}s", retry_period);
|
2022-08-19 08:17:02 +00:00
|
|
|
task->scheduleAfter(retry_period * 1000);
|
2022-08-19 08:49:51 +00:00
|
|
|
}
|
2022-08-19 08:17:02 +00:00
|
|
|
}
|
2022-08-12 09:32:13 +00:00
|
|
|
|
2022-08-25 20:08:48 +00:00
|
|
|
void ReplicatedMergeTreeAttachThread::checkHasReplicaMetadataInZooKeeper(const zkutil::ZooKeeperPtr & zookeeper, const String & replica_path)
|
|
|
|
{
|
2022-09-26 18:11:30 +00:00
|
|
|
/// Since 20.4 and until 22.9 "/metadata" node was created on replica startup and "/metadata_version" was created on ALTER.
|
2022-08-25 20:08:48 +00:00
|
|
|
/// Since 21.12 we could use "/metadata" to check if replica is dropped (see StorageReplicatedMergeTree::dropReplica),
|
|
|
|
/// but it did not work correctly, because "/metadata" node was re-created on server startup.
|
|
|
|
/// Since 22.9 we do not recreate these nodes and use "/host" to check if replica is dropped.
|
|
|
|
|
|
|
|
String replica_metadata;
|
|
|
|
const bool replica_metadata_exists = zookeeper->tryGet(replica_path + "/metadata", replica_metadata);
|
2022-09-26 18:11:30 +00:00
|
|
|
if (!replica_metadata_exists || replica_metadata.empty())
|
2022-08-25 20:08:48 +00:00
|
|
|
{
|
|
|
|
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Upgrade from 20.3 and older to 22.9 and newer "
|
|
|
|
"should be done through an intermediate version (failed to get metadata or metadata_version for {},"
|
|
|
|
"assuming it's because of upgrading)", replica_path);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-08-19 08:17:02 +00:00
|
|
|
void ReplicatedMergeTreeAttachThread::runImpl()
|
|
|
|
{
|
2022-08-19 09:39:59 +00:00
|
|
|
storage.setZooKeeper();
|
2022-08-16 08:19:02 +00:00
|
|
|
|
2022-08-19 08:49:51 +00:00
|
|
|
auto zookeeper = storage.getZooKeeper();
|
2022-08-19 08:17:02 +00:00
|
|
|
const auto & zookeeper_path = storage.zookeeper_path;
|
|
|
|
bool metadata_exists = zookeeper->exists(zookeeper_path + "/metadata");
|
|
|
|
if (!metadata_exists)
|
|
|
|
{
|
|
|
|
LOG_WARNING(log, "No metadata in ZooKeeper for {}: table will stay in readonly mode.", zookeeper_path);
|
|
|
|
storage.has_metadata_in_zookeeper = false;
|
|
|
|
return;
|
|
|
|
}
|
2022-08-12 09:32:13 +00:00
|
|
|
|
2022-08-19 08:17:02 +00:00
|
|
|
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
|
2022-08-12 09:32:13 +00:00
|
|
|
|
2022-08-19 08:17:02 +00:00
|
|
|
const auto & replica_path = storage.replica_path;
|
|
|
|
/// May it be ZK lost not the whole root, so the upper check passed, but only the /replicas/replica
|
|
|
|
/// folder.
|
|
|
|
bool replica_path_exists = zookeeper->exists(replica_path);
|
|
|
|
if (!replica_path_exists)
|
|
|
|
{
|
|
|
|
LOG_WARNING(log, "No metadata in ZooKeeper for {}: table will stay in readonly mode", replica_path);
|
|
|
|
storage.has_metadata_in_zookeeper = false;
|
|
|
|
return;
|
|
|
|
}
|
2022-08-12 09:32:13 +00:00
|
|
|
|
2022-08-25 20:08:48 +00:00
|
|
|
bool host_node_exists = zookeeper->exists(replica_path + "/host");
|
|
|
|
if (!host_node_exists)
|
2022-08-19 08:17:02 +00:00
|
|
|
{
|
2022-08-25 20:08:48 +00:00
|
|
|
LOG_WARNING(log, "Replica {} is dropped (but metadata is not completely removed from ZooKeeper), "
|
|
|
|
"table will stay in readonly mode", replica_path);
|
|
|
|
storage.has_metadata_in_zookeeper = false;
|
|
|
|
return;
|
|
|
|
}
|
2022-08-12 09:32:13 +00:00
|
|
|
|
2022-08-25 20:08:48 +00:00
|
|
|
storage.has_metadata_in_zookeeper = true;
|
2022-08-12 09:32:13 +00:00
|
|
|
|
2022-08-25 20:08:48 +00:00
|
|
|
checkHasReplicaMetadataInZooKeeper(zookeeper, replica_path);
|
2022-08-19 08:17:02 +00:00
|
|
|
|
2022-09-26 18:11:30 +00:00
|
|
|
String replica_metadata_version;
|
|
|
|
const bool replica_metadata_version_exists = zookeeper->tryGet(replica_path + "/metadata_version", replica_metadata_version);
|
|
|
|
if (replica_metadata_version_exists)
|
|
|
|
{
|
2022-09-27 11:26:45 +00:00
|
|
|
storage.metadata_version = parse<int>(replica_metadata_version);
|
2022-09-26 18:11:30 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
/// Table was created before 20.4 and was never altered,
|
|
|
|
/// let's initialize replica metadata version from global metadata version.
|
|
|
|
Coordination::Stat table_metadata_version_stat;
|
|
|
|
zookeeper->get(zookeeper_path + "/metadata", &table_metadata_version_stat);
|
|
|
|
|
|
|
|
Coordination::Requests ops;
|
|
|
|
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/metadata", table_metadata_version_stat.version));
|
|
|
|
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", toString(table_metadata_version_stat.version), zkutil::CreateMode::Persistent));
|
|
|
|
|
|
|
|
Coordination::Responses res;
|
|
|
|
auto code = zookeeper->tryMulti(ops, res);
|
|
|
|
|
|
|
|
if (code == Coordination::Error::ZBADVERSION)
|
|
|
|
throw Exception(ErrorCodes::REPLICA_STATUS_CHANGED, "Failed to initialize metadata_version "
|
|
|
|
"because table was concurrently altered, will retry");
|
|
|
|
|
|
|
|
zkutil::KeeperMultiException::check(code, ops, res);
|
|
|
|
}
|
|
|
|
|
2022-08-19 08:49:51 +00:00
|
|
|
storage.checkTableStructure(replica_path, metadata_snapshot);
|
|
|
|
storage.checkParts(skip_sanity_checks);
|
2022-08-19 08:17:02 +00:00
|
|
|
|
|
|
|
/// Temporary directories contain uninitialized results of Merges or Fetches (after forced restart),
|
|
|
|
/// don't allow to reinitialize them, delete each of them immediately.
|
|
|
|
storage.clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_", "tmp-fetch_"});
|
|
|
|
storage.clearOldWriteAheadLogs();
|
|
|
|
if (storage.getSettings()->merge_tree_enable_clear_old_broken_detached)
|
2022-09-05 01:50:24 +00:00
|
|
|
storage.clearOldBrokenPartsFromDetachedDirectory();
|
2022-08-17 08:28:53 +00:00
|
|
|
|
2022-08-19 08:49:51 +00:00
|
|
|
storage.createNewZooKeeperNodes();
|
|
|
|
storage.syncPinnedPartUUIDs();
|
2022-08-17 08:28:53 +00:00
|
|
|
|
2022-08-19 08:49:51 +00:00
|
|
|
storage.createTableSharedID();
|
2022-08-12 09:32:13 +00:00
|
|
|
};
|
|
|
|
|
2022-08-19 11:12:20 +00:00
|
|
|
void ReplicatedMergeTreeAttachThread::finalizeInitialization() TSA_NO_THREAD_SAFETY_ANALYSIS
|
2022-08-16 08:19:02 +00:00
|
|
|
{
|
2022-08-24 17:44:14 +00:00
|
|
|
storage.startupImpl();
|
2022-08-19 11:12:20 +00:00
|
|
|
storage.initialization_done = true;
|
|
|
|
LOG_INFO(log, "Table is initialized");
|
2022-08-16 08:19:02 +00:00
|
|
|
}
|
|
|
|
|
2022-08-12 09:32:13 +00:00
|
|
|
void ReplicatedMergeTreeAttachThread::setSkipSanityChecks(bool skip_sanity_checks_)
|
|
|
|
{
|
|
|
|
skip_sanity_checks = skip_sanity_checks_;
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|