DiskLocal checker

Add DiskLocal checker so that ReplicatedMergeTree can recover data when some of its disks are broken.
This commit is contained in:
Amos Bird 2022-02-01 04:47:04 +08:00
parent 321fa4a9e8
commit ec7d367814
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
25 changed files with 696 additions and 66 deletions

View File

@ -194,6 +194,7 @@ namespace
{ {
void setupTmpPath(Poco::Logger * log, const std::string & path) void setupTmpPath(Poco::Logger * log, const std::string & path)
try
{ {
LOG_DEBUG(log, "Setting up {} to store temporary data in it", path); LOG_DEBUG(log, "Setting up {} to store temporary data in it", path);
@ -212,6 +213,15 @@ void setupTmpPath(Poco::Logger * log, const std::string & path)
LOG_DEBUG(log, "Skipped file in temporary path {}", it->path().string()); LOG_DEBUG(log, "Skipped file in temporary path {}", it->path().string());
} }
} }
catch (...)
{
DB::tryLogCurrentException(
log,
fmt::format(
"Caught exception while setup temporary path: {}. It is ok to skip this exception as cleaning old temporary files is not "
"necessary",
path));
}
int waitServersToFinish(std::vector<DB::ProtocolServerAdapter> & servers, size_t seconds_to_wait) int waitServersToFinish(std::vector<DB::ProtocolServerAdapter> & servers, size_t seconds_to_wait)
{ {

View File

@ -11,6 +11,14 @@
#include <fstream> #include <fstream>
#include <unistd.h> #include <unistd.h>
#include <Disks/DiskFactory.h>
#include <Disks/DiskMemory.h>
#include <Disks/DiskRestartProxy.h>
#include <Common/randomSeed.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromTemporaryFile.h>
#include <IO/WriteHelpers.h>
#include <base/logger_useful.h>
namespace CurrentMetrics namespace CurrentMetrics
{ {
@ -25,7 +33,7 @@ namespace ErrorCodes
extern const int UNKNOWN_ELEMENT_IN_CONFIG; extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG; extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int PATH_ACCESS_DENIED; extern const int PATH_ACCESS_DENIED;
extern const int INCORRECT_DISK_INDEX; extern const int LOGICAL_ERROR;
extern const int CANNOT_TRUNCATE_FILE; extern const int CANNOT_TRUNCATE_FILE;
extern const int CANNOT_UNLINK; extern const int CANNOT_UNLINK;
extern const int CANNOT_RMDIR; extern const int CANNOT_RMDIR;
@ -61,9 +69,6 @@ static void loadDiskLocalConfig(const String & name,
throw Exception("Disk path must end with /. Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); throw Exception("Disk path must end with /. Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
} }
if (!FS::canRead(path) || !FS::canWrite(path))
throw Exception("There is no RW access to the disk " + name + " (" + path + ")", ErrorCodes::PATH_ACCESS_DENIED);
bool has_space_ratio = config.has(config_prefix + ".keep_free_space_ratio"); bool has_space_ratio = config.has(config_prefix + ".keep_free_space_ratio");
if (config.has(config_prefix + ".keep_free_space_bytes") && has_space_ratio) if (config.has(config_prefix + ".keep_free_space_bytes") && has_space_ratio)
@ -113,13 +118,48 @@ public:
UInt64 getSize() const override { return size; } UInt64 getSize() const override { return size; }
DiskPtr getDisk(size_t i) const override; DiskPtr getDisk(size_t i) const override
{
if (i != 0)
throw Exception("Can't use i != 0 with single disk reservation. It's a bug", ErrorCodes::LOGICAL_ERROR);
return disk;
}
Disks getDisks() const override { return {disk}; } Disks getDisks() const override { return {disk}; }
void update(UInt64 new_size) override; void update(UInt64 new_size) override
{
std::lock_guard lock(DiskLocal::reservation_mutex);
disk->reserved_bytes -= size;
size = new_size;
disk->reserved_bytes += size;
}
~DiskLocalReservation() override; ~DiskLocalReservation() override
{
try
{
std::lock_guard lock(DiskLocal::reservation_mutex);
if (disk->reserved_bytes < size)
{
disk->reserved_bytes = 0;
LOG_ERROR(&Poco::Logger::get("DiskLocal"), "Unbalanced reservations size for disk '{}'.", disk->getName());
}
else
{
disk->reserved_bytes -= size;
}
if (disk->reservation_count == 0)
LOG_ERROR(&Poco::Logger::get("DiskLocal"), "Unbalanced reservation count for disk '{}'.", disk->getName());
else
--disk->reservation_count;
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
private: private:
DiskLocalPtr disk; DiskLocalPtr disk;
@ -188,7 +228,7 @@ bool DiskLocal::tryReserve(UInt64 bytes)
return false; return false;
} }
UInt64 DiskLocal::getTotalSpace() const static UInt64 getTotalSpaceByName(const String & name, const String & disk_path, UInt64 keep_free_space_bytes)
{ {
struct statvfs fs; struct statvfs fs;
if (name == "default") /// for default disk we get space from path/data/ if (name == "default") /// for default disk we get space from path/data/
@ -201,8 +241,17 @@ UInt64 DiskLocal::getTotalSpace() const
return total_size - keep_free_space_bytes; return total_size - keep_free_space_bytes;
} }
UInt64 DiskLocal::getTotalSpace() const
{
if (broken || readonly)
return 0;
return getTotalSpaceByName(name, disk_path, keep_free_space_bytes);
}
UInt64 DiskLocal::getAvailableSpace() const UInt64 DiskLocal::getAvailableSpace() const
{ {
if (broken || readonly)
return 0;
/// we use f_bavail, because part of b_free space is /// we use f_bavail, because part of b_free space is
/// available for superuser only and for system purposes /// available for superuser only and for system purposes
struct statvfs fs; struct statvfs fs;
@ -268,7 +317,7 @@ void DiskLocal::moveDirectory(const String & from_path, const String & to_path)
DiskDirectoryIteratorPtr DiskLocal::iterateDirectory(const String & path) DiskDirectoryIteratorPtr DiskLocal::iterateDirectory(const String & path)
{ {
fs::path meta_path = fs::path(disk_path) / path; fs::path meta_path = fs::path(disk_path) / path;
if (fs::exists(meta_path) && fs::is_directory(meta_path)) if (!broken && fs::exists(meta_path) && fs::is_directory(meta_path))
return std::make_unique<DiskLocalDirectoryIterator>(disk_path, path); return std::make_unique<DiskLocalDirectoryIterator>(disk_path, path);
else else
return std::make_unique<DiskLocalDirectoryIterator>(); return std::make_unique<DiskLocalDirectoryIterator>();
@ -409,49 +458,191 @@ void DiskLocal::applyNewSettings(const Poco::Util::AbstractConfiguration & confi
keep_free_space_bytes = new_keep_free_space_bytes; keep_free_space_bytes = new_keep_free_space_bytes;
} }
DiskPtr DiskLocalReservation::getDisk(size_t i) const DiskLocal::DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_)
: name(name_)
, disk_path(path_)
, keep_free_space_bytes(keep_free_space_bytes_)
, logger(&Poco::Logger::get("DiskLocal"))
{ {
if (i != 0)
{
throw Exception("Can't use i != 0 with single disk reservation", ErrorCodes::INCORRECT_DISK_INDEX);
}
return disk;
} }
void DiskLocalReservation::update(UInt64 new_size) DiskLocal::DiskLocal(
const String & name_, const String & path_, UInt64 keep_free_space_bytes_, ContextPtr context, UInt64 local_disk_check_period_ms)
: DiskLocal(name_, path_, keep_free_space_bytes_)
{ {
std::lock_guard lock(DiskLocal::reservation_mutex); if (local_disk_check_period_ms > 0)
disk->reserved_bytes -= size; disk_checker = std::make_unique<DiskLocalCheckThread>(this, context, local_disk_check_period_ms);
size = new_size;
disk->reserved_bytes += size;
} }
DiskLocalReservation::~DiskLocalReservation() void DiskLocal::startup()
{ {
try try
{ {
std::lock_guard lock(DiskLocal::reservation_mutex); broken = false;
if (disk->reserved_bytes < size) disk_checker_magic_number = -1;
{ disk_checker_can_check_read = true;
disk->reserved_bytes = 0; readonly = !setup();
LOG_ERROR(disk->log, "Unbalanced reservations size for disk '{}'.", disk->getName());
}
else
{
disk->reserved_bytes -= size;
}
if (disk->reservation_count == 0)
LOG_ERROR(disk->log, "Unbalanced reservation count for disk '{}'.", disk->getName());
else
--disk->reservation_count;
} }
catch (...) catch (...)
{ {
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(logger, fmt::format("Disk {} is marked as broken during startup", name));
broken = true;
/// Disk checker is disabled when failing to start up.
disk_checker_can_check_read = false;
} }
if (disk_checker && disk_checker_can_check_read)
disk_checker->startup();
} }
void DiskLocal::shutdown()
{
if (disk_checker)
disk_checker->shutdown();
}
std::optional<UInt32> DiskLocal::readDiskCheckerMagicNumber() const noexcept
try
{
ReadSettings read_settings;
/// Proper disk read checking requires direct io
read_settings.direct_io_threshold = 1;
auto buf = readFile(disk_checker_path, read_settings, {}, {});
UInt32 magic_number;
readIntBinary(magic_number, *buf);
if (buf->eof())
return magic_number;
LOG_WARNING(logger, "The size of disk check magic number is more than 4 bytes. Mark it as read failure");
return {};
}
catch (...)
{
tryLogCurrentException(logger, fmt::format("Cannot read correct disk check magic number from from {}{}", disk_path, disk_checker_path));
return {};
}
bool DiskLocal::canRead() const noexcept
try
{
if (FS::canRead(fs::path(disk_path) / disk_checker_path))
{
auto magic_number = readDiskCheckerMagicNumber();
if (magic_number && *magic_number == disk_checker_magic_number)
return true;
}
return false;
}
catch (...)
{
LOG_WARNING(logger, "Cannot achieve read over the disk directory: {}", disk_path);
return false;
}
struct DiskWriteCheckData
{
constexpr static size_t PAGE_SIZE = 4096;
char data[PAGE_SIZE]{};
DiskWriteCheckData()
{
static const char * magic_string = "ClickHouse disk local write check";
static size_t magic_string_len = strlen(magic_string);
memcpy(data, magic_string, magic_string_len);
memcpy(data + PAGE_SIZE - magic_string_len, magic_string, magic_string_len);
}
};
bool DiskLocal::canWrite() const noexcept
try
{
static DiskWriteCheckData data;
String tmp_template = fs::path(disk_path) / "";
{
auto buf = WriteBufferFromTemporaryFile::create(tmp_template);
buf->write(data.data, data.PAGE_SIZE);
buf->sync();
}
return true;
}
catch (...)
{
LOG_WARNING(logger, "Cannot achieve write over the disk directory: {}", disk_path);
return false;
}
bool DiskLocal::setup()
{
try
{
fs::create_directories(disk_path);
}
catch (...)
{
LOG_ERROR(logger, "Cannot create the directory of disk {} ({}).", name, disk_path);
throw;
}
try
{
if (!FS::canRead(disk_path))
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "There is no read access to disk {} ({}).", name, disk_path);
}
catch (...)
{
LOG_ERROR(logger, "Cannot gain read access of the disk directory: {}", disk_path);
throw;
}
/// If disk checker is disabled, just assume RW by default.
if (!disk_checker)
return true;
try
{
if (exists(disk_checker_path))
{
auto magic_number = readDiskCheckerMagicNumber();
if (magic_number)
disk_checker_magic_number = *magic_number;
else
{
/// The checker file is incorrect. Mark the magic number to uninitialized and try to generate a new checker file.
disk_checker_magic_number = -1;
}
}
}
catch (...)
{
LOG_ERROR(logger, "We cannot tell if {} exists anymore, or read from it. Most likely disk {} is broken", disk_checker_path, name);
throw;
}
/// Try to create a new checker file. The disk status can be either broken or readonly.
if (disk_checker_magic_number == -1)
try
{
pcg32_fast rng(randomSeed());
UInt32 magic_number = rng();
{
auto buf = writeFile(disk_checker_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
writeIntBinary(magic_number, *buf);
}
disk_checker_magic_number = magic_number;
}
catch (...)
{
LOG_WARNING(
logger,
"Cannot create/write to {0}. Disk {1} is either readonly or broken. Without setting up disk checker file, DiskLocalCheckThread "
"will not be started. Disk is assumed to be RW. Try manually fix the disk and do `SYSTEM RESTART DISK {1}`",
disk_checker_path,
name);
disk_checker_can_check_read = false;
return true;
}
if (disk_checker_magic_number == -1)
throw Exception("disk_checker_magic_number is not initialized. It's a bug", ErrorCodes::LOGICAL_ERROR);
return true;
}
void registerDiskLocal(DiskFactory & factory) void registerDiskLocal(DiskFactory & factory)
{ {
@ -459,17 +650,20 @@ void registerDiskLocal(DiskFactory & factory)
const Poco::Util::AbstractConfiguration & config, const Poco::Util::AbstractConfiguration & config,
const String & config_prefix, const String & config_prefix,
ContextPtr context, ContextPtr context,
const DisksMap & map) -> DiskPtr { const DisksMap & map) -> DiskPtr
{
String path; String path;
UInt64 keep_free_space_bytes; UInt64 keep_free_space_bytes;
loadDiskLocalConfig(name, config, config_prefix, context, path, keep_free_space_bytes); loadDiskLocalConfig(name, config, config_prefix, context, path, keep_free_space_bytes);
for (const auto & [disk_name, disk_ptr] : map) for (const auto & [disk_name, disk_ptr] : map)
{
if (path == disk_ptr->getPath()) if (path == disk_ptr->getPath())
throw Exception("Disk " + name + " and Disk " + disk_name + " cannot have the same path" + " (" + path + ")", ErrorCodes::BAD_ARGUMENTS); throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk {} and disk {} cannot have the same path ({})", name, disk_name, path);
}
return std::make_shared<DiskLocal>(name, path, keep_free_space_bytes); std::shared_ptr<IDisk> disk
= std::make_shared<DiskLocal>(name, path, keep_free_space_bytes, context, config.getUInt("local_disk_check_period_ms", 0));
disk->startup();
return std::make_shared<DiskRestartProxy>(disk);
}; };
factory.registerDiskType("local", creator); factory.registerDiskType("local", creator);
} }

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <base/logger_useful.h> #include <base/logger_useful.h>
#include <Disks/DiskLocalCheckThread.h>
#include <Disks/IDisk.h> #include <Disks/IDisk.h>
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromFileBase.h> #include <IO/ReadBufferFromFileBase.h>
@ -10,24 +11,22 @@
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class DiskLocalReservation; class DiskLocalReservation;
class DiskLocal : public IDisk class DiskLocal : public IDisk
{ {
public: public:
friend class DiskLocalCheckThread;
friend class DiskLocalReservation; friend class DiskLocalReservation;
DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_) DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_);
: name(name_), disk_path(path_), keep_free_space_bytes(keep_free_space_bytes_) DiskLocal(
{ const String & name_,
if (disk_path.back() != '/') const String & path_,
throw Exception("Disk path must end with '/', but '" + disk_path + "' doesn't.", ErrorCodes::LOGICAL_ERROR); UInt64 keep_free_space_bytes_,
} ContextPtr context,
UInt64 local_disk_check_period_ms);
const String & getName() const override { return name; } const String & getName() const override { return name; }
@ -106,13 +105,33 @@ public:
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap &) override; void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap &) override;
bool isBroken() const override { return broken; }
void startup() override;
void shutdown() override;
/// Check if the disk is OK to proceed read/write operations. Currently the check is
/// rudimentary. The more advanced choice would be using
/// https://github.com/smartmontools/smartmontools. However, it's good enough for now.
bool canRead() const noexcept;
bool canWrite() const noexcept;
private: private:
bool tryReserve(UInt64 bytes); bool tryReserve(UInt64 bytes);
private: /// Setup disk for healthy check. Returns true if it's read-write, false if read-only.
/// Throw exception if it's not possible to setup necessary files and directories.
bool setup();
/// Read magic number from disk checker file. Return std::nullopt if exception happens.
std::optional<UInt32> readDiskCheckerMagicNumber() const noexcept;
const String name; const String name;
const String disk_path; const String disk_path;
const String disk_checker_path = ".disk_checker_file";
std::atomic<UInt64> keep_free_space_bytes; std::atomic<UInt64> keep_free_space_bytes;
Poco::Logger * logger;
UInt64 reserved_bytes = 0; UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0; UInt64 reservation_count = 0;
@ -120,6 +139,14 @@ private:
static std::mutex reservation_mutex; static std::mutex reservation_mutex;
Poco::Logger * log = &Poco::Logger::get("DiskLocal"); Poco::Logger * log = &Poco::Logger::get("DiskLocal");
std::atomic<bool> broken{false};
std::atomic<bool> readonly{false};
std::unique_ptr<DiskLocalCheckThread> disk_checker;
/// A magic number to vaguely check if reading operation generates correct result.
/// -1 means there is no available disk_checker_file yet.
Int64 disk_checker_magic_number = -1;
bool disk_checker_can_check_read = true;
}; };

View File

@ -0,0 +1,70 @@
#include <Disks/DiskLocalCheckThread.h>
#include <Disks/DiskLocal.h>
#include <Interpreters/Context.h>
#include <base/logger_useful.h>
namespace DB
{
static const auto DISK_CHECK_ERROR_SLEEP_MS = 1000;
static const auto DISK_CHECK_ERROR_RETRY_TIME = 3;
DiskLocalCheckThread::DiskLocalCheckThread(DiskLocal * disk_, ContextPtr context_, UInt64 local_disk_check_period_ms)
: WithContext(context_)
, disk(std::move(disk_))
, check_period_ms(local_disk_check_period_ms)
, log(&Poco::Logger::get(fmt::format("DiskLocalCheckThread({})", disk->getName())))
{
task = getContext()->getSchedulePool().createTask(log->name(), [this] { run(); });
}
void DiskLocalCheckThread::startup()
{
need_stop = false;
retry = 0;
task->activateAndSchedule();
}
void DiskLocalCheckThread::run()
{
if (need_stop)
return;
bool can_read = disk->canRead();
bool can_write = disk->canWrite();
if (can_read)
{
if (disk->broken)
LOG_INFO(log, "Disk {0} seems to be fine. It can be recovered using `SYSTEM RESTART DISK {0}`", disk->getName());
retry = 0;
if (can_write)
disk->readonly = false;
else
{
disk->readonly = true;
LOG_INFO(log, "Disk {} is readonly", disk->getName());
}
task->scheduleAfter(check_period_ms);
}
else if (!disk->broken && retry < DISK_CHECK_ERROR_RETRY_TIME)
{
++retry;
task->scheduleAfter(DISK_CHECK_ERROR_SLEEP_MS);
}
else
{
retry = 0;
disk->broken = true;
LOG_INFO(log, "Disk {} is broken", disk->getName());
task->scheduleAfter(check_period_ms);
}
}
void DiskLocalCheckThread::shutdown()
{
need_stop = true;
task->deactivate();
LOG_TRACE(log, "DiskLocalCheck thread finished");
}
}

View File

@ -0,0 +1,39 @@
#pragma once
#include <Core/BackgroundSchedulePool.h>
#include <Interpreters/Context_fwd.h>
namespace Poco
{
class Logger;
}
namespace DB
{
class DiskLocal;
class DiskLocalCheckThread : WithContext
{
public:
friend class DiskLocal;
DiskLocalCheckThread(DiskLocal * disk_, ContextPtr context_, UInt64 local_disk_check_period_ms);
void startup();
void shutdown();
private:
bool check();
void run();
DiskLocal * disk;
size_t check_period_ms;
Poco::Logger * log;
std::atomic<bool> need_stop{false};
BackgroundSchedulePool::TaskHolder task;
size_t retry{};
};
}

View File

@ -40,7 +40,12 @@ DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, con
disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context, disks)); disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context, disks));
} }
if (!has_default_disk) if (!has_default_disk)
disks.emplace(default_disk_name, std::make_shared<DiskLocal>(default_disk_name, context->getPath(), 0)); {
disks.emplace(
default_disk_name,
std::make_shared<DiskLocal>(
default_disk_name, context->getPath(), 0, context, config.getUInt("local_disk_check_period_ms", 0)));
}
} }

View File

@ -37,6 +37,12 @@ public:
disks.emplace(name, disk); disks.emplace(name, disk);
} }
void shutdown()
{
for (auto & e : disks)
e.second->shutdown();
}
private: private:
DisksMap disks; DisksMap disks;
}; };

View File

@ -250,6 +250,9 @@ public:
virtual bool isReadOnly() const { return false; } virtual bool isReadOnly() const { return false; }
/// Check if disk is broken. Broken disks will have 0 space and not be used.
virtual bool isBroken() const { return false; }
/// Invoked when Global Context is shutdown. /// Invoked when Global Context is shutdown.
virtual void shutdown() {} virtual void shutdown() {}

View File

@ -60,6 +60,7 @@ public:
DiskPtr getDisk() const { return getDisk(0); } DiskPtr getDisk() const { return getDisk(0); }
virtual DiskPtr getDisk(size_t i) const { return disks[i]; } virtual DiskPtr getDisk(size_t i) const { return disks[i]; }
Disks & getDisks() { return disks; }
const Disks & getDisks() const { return disks; } const Disks & getDisks() const { return disks; }
/// Returns effective value of whether merges are allowed on this volume (true) or not (false). /// Returns effective value of whether merges are allowed on this volume (true) or not (false).

View File

@ -164,10 +164,18 @@ DiskPtr StoragePolicy::getAnyDisk() const
if (volumes.empty()) if (volumes.empty())
throw Exception("Storage policy " + backQuote(name) + " has no volumes. It's a bug.", ErrorCodes::LOGICAL_ERROR); throw Exception("Storage policy " + backQuote(name) + " has no volumes. It's a bug.", ErrorCodes::LOGICAL_ERROR);
if (volumes[0]->getDisks().empty()) for (const auto & volume : volumes)
throw Exception("Volume " + backQuote(name) + "." + backQuote(volumes[0]->getName()) + " has no disks. It's a bug.", ErrorCodes::LOGICAL_ERROR); {
if (volume->getDisks().empty())
throw Exception("Volume '" + volume->getName() + "' has no disks. It's a bug", ErrorCodes::LOGICAL_ERROR);
for (const auto & disk : volume->getDisks())
{
if (!disk->isBroken())
return disk;
}
}
return volumes[0]->getDisks()[0]; throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "All disks in storage policy {} are broken", name);
} }
@ -233,6 +241,10 @@ ReservationPtr StoragePolicy::makeEmptyReservationOnLargestDisk() const
} }
} }
} }
if (!max_disk)
throw Exception(
"There is no space on any disk in storage policy: " + name + ". It's likely all disks are broken",
ErrorCodes::NOT_ENOUGH_SPACE);
auto reservation = max_disk->reserve(0); auto reservation = max_disk->reserve(0);
if (!reservation) if (!reservation)
{ {

View File

@ -2650,6 +2650,19 @@ void Context::shutdown()
} }
} }
// Special volumes might also use disks that require shutdown.
for (const auto & volume : {shared->tmp_volume, shared->backups_volume})
{
if (volume)
{
auto & disks = volume->getDisks();
for (auto & disk : disks)
{
disk->shutdown();
}
}
}
shared->shutdown(); shared->shutdown();
} }

View File

@ -1123,6 +1123,7 @@ UInt64 IMergeTreeDataPart::calculateTotalSizeOnDisk(const DiskPtr & disk_, const
void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const
try
{ {
assertOnDisk(); assertOnDisk();
@ -1159,6 +1160,18 @@ void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_
storage.lockSharedData(*this); storage.lockSharedData(*this);
} }
catch (...)
{
if (startsWith(new_relative_path, "detached/"))
{
// Don't throw when the destination is to the detached folder. It might be able to
// recover in some cases, such as fetching parts into multi-disks while some of the
// disks are broken.
tryLogCurrentException(__PRETTY_FUNCTION__);
}
else
throw;
}
void IMergeTreeDataPart::cleanupOldName(const String & old_part_name) const void IMergeTreeDataPart::cleanupOldName(const String & old_part_name) const
{ {

View File

@ -264,9 +264,13 @@ MergeTreeData::MergeTreeData(
/// Creating directories, if not exist. /// Creating directories, if not exist.
for (const auto & disk : getDisks()) for (const auto & disk : getDisks())
{ {
if (disk->isBroken())
continue;
disk->createDirectories(relative_data_path); disk->createDirectories(relative_data_path);
disk->createDirectories(fs::path(relative_data_path) / MergeTreeData::DETACHED_DIR_NAME); disk->createDirectories(fs::path(relative_data_path) / MergeTreeData::DETACHED_DIR_NAME);
String current_version_file_path = fs::path(relative_data_path) / MergeTreeData::FORMAT_VERSION_FILE_NAME; String current_version_file_path = fs::path(relative_data_path) / MergeTreeData::FORMAT_VERSION_FILE_NAME;
if (disk->exists(current_version_file_path)) if (disk->exists(current_version_file_path))
{ {
if (!version_file.first.empty()) if (!version_file.first.empty())
@ -1195,6 +1199,9 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
for (const auto & [disk_name, disk] : getContext()->getDisksMap()) for (const auto & [disk_name, disk] : getContext()->getDisksMap())
{ {
if (disk->isBroken())
continue;
if (defined_disk_names.count(disk_name) == 0 && disk->exists(relative_data_path)) if (defined_disk_names.count(disk_name) == 0 && disk->exists(relative_data_path))
{ {
for (const auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next()) for (const auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next())
@ -1215,6 +1222,9 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
std::mutex wal_init_lock; std::mutex wal_init_lock;
for (const auto & disk_ptr : disks) for (const auto & disk_ptr : disks)
{ {
if (disk_ptr->isBroken())
continue;
auto & disk_parts = disk_part_map[disk_ptr->getName()]; auto & disk_parts = disk_part_map[disk_ptr->getName()];
auto & disk_wal_parts = disk_wal_part_map[disk_ptr->getName()]; auto & disk_wal_parts = disk_wal_part_map[disk_ptr->getName()];
@ -1287,6 +1297,9 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
if (!parts_from_wal.empty()) if (!parts_from_wal.empty())
loadDataPartsFromWAL(broken_parts_to_detach, duplicate_parts_to_remove, parts_from_wal, part_lock); loadDataPartsFromWAL(broken_parts_to_detach, duplicate_parts_to_remove, parts_from_wal, part_lock);
for (auto & part : duplicate_parts_to_remove)
part->remove();
for (auto & part : broken_parts_to_detach) for (auto & part : broken_parts_to_detach)
part->renameToDetached("broken-on-start"); /// detached parts must not have '_' in prefixes part->renameToDetached("broken-on-start"); /// detached parts must not have '_' in prefixes
@ -1381,6 +1394,9 @@ size_t MergeTreeData::clearOldTemporaryDirectories(const MergeTreeDataMergerMuta
/// Delete temporary directories older than a day. /// Delete temporary directories older than a day.
for (const auto & disk : getDisks()) for (const auto & disk : getDisks())
{ {
if (disk->isBroken())
continue;
for (auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next()) for (auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next())
{ {
const std::string & basename = it->name(); const std::string & basename = it->name();
@ -1654,6 +1670,8 @@ size_t MergeTreeData::clearOldWriteAheadLogs()
for (auto disk_it = disks.rbegin(); disk_it != disks.rend(); ++disk_it) for (auto disk_it = disks.rbegin(); disk_it != disks.rend(); ++disk_it)
{ {
auto disk_ptr = *disk_it; auto disk_ptr = *disk_it;
if (disk_ptr->isBroken())
continue;
for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next()) for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
{ {
auto min_max_block_number = MergeTreeWriteAheadLog::tryParseMinMaxBlockNumber(it->name()); auto min_max_block_number = MergeTreeWriteAheadLog::tryParseMinMaxBlockNumber(it->name());
@ -1736,6 +1754,9 @@ void MergeTreeData::dropAllData()
for (const auto & disk : getDisks()) for (const auto & disk : getDisks())
{ {
if (disk->isBroken())
continue;
try try
{ {
disk->removeRecursive(relative_data_path); disk->removeRecursive(relative_data_path);
@ -1770,6 +1791,8 @@ void MergeTreeData::dropIfEmpty()
{ {
for (const auto & disk : getDisks()) for (const auto & disk : getDisks())
{ {
if (disk->isBroken())
continue;
/// Non recursive, exception is thrown if there are more files. /// Non recursive, exception is thrown if there are more files.
disk->removeFileIfExists(fs::path(relative_data_path) / MergeTreeData::FORMAT_VERSION_FILE_NAME); disk->removeFileIfExists(fs::path(relative_data_path) / MergeTreeData::FORMAT_VERSION_FILE_NAME);
disk->removeDirectory(fs::path(relative_data_path) / MergeTreeData::DETACHED_DIR_NAME); disk->removeDirectory(fs::path(relative_data_path) / MergeTreeData::DETACHED_DIR_NAME);
@ -5150,6 +5173,23 @@ Strings MergeTreeData::getDataPaths() const
} }
void MergeTreeData::reportBrokenPart(MergeTreeData::DataPartPtr & data_part) const
{
if (data_part->volume && data_part->volume->getDisk()->isBroken())
{
auto disk = data_part->volume->getDisk();
auto parts = getDataParts();
LOG_WARNING(log, "Scanning parts to recover on broken disk {}.", disk->getName() + "@" + disk->getPath());
for (const auto & part : parts)
{
if (part->volume && part->volume->getDisk()->getName() == disk->getName())
broken_part_callback(part->name);
}
}
else
broken_part_callback(data_part->name);
}
MergeTreeData::MatcherFn MergeTreeData::getPartitionMatcher(const ASTPtr & partition_ast, ContextPtr local_context) const MergeTreeData::MatcherFn MergeTreeData::getPartitionMatcher(const ASTPtr & partition_ast, ContextPtr local_context) const
{ {
bool prefixed = false; bool prefixed = false;

View File

@ -605,6 +605,10 @@ public:
broken_part_callback(name); broken_part_callback(name);
} }
/// Same as above but has the ability to check all other parts
/// which reside on the same disk of the suspicious part.
void reportBrokenPart(MergeTreeData::DataPartPtr & data_part) const;
/// TODO (alesap) Duplicate method required for compatibility. /// TODO (alesap) Duplicate method required for compatibility.
/// Must be removed. /// Must be removed.
static ASTPtr extractKeyExpressionList(const ASTPtr & node) static ASTPtr extractKeyExpressionList(const ASTPtr & node)

View File

@ -113,7 +113,7 @@ bool MergeTreePartsMover::selectPartsForMove(
UInt64 required_maximum_available_space = disk->getTotalSpace() * policy->getMoveFactor(); UInt64 required_maximum_available_space = disk->getTotalSpace() * policy->getMoveFactor();
UInt64 unreserved_space = disk->getUnreservedSpace(); UInt64 unreserved_space = disk->getUnreservedSpace();
if (unreserved_space < required_maximum_available_space) if (unreserved_space < required_maximum_available_space && !disk->isBroken())
need_to_move.emplace(disk, required_maximum_available_space - unreserved_space); need_to_move.emplace(disk, required_maximum_available_space - unreserved_space);
} }
} }

View File

@ -118,7 +118,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
} }
catch (...) catch (...)
{ {
storage.reportBrokenPart(data_part->name); storage.reportBrokenPart(data_part);
throw; throw;
} }
} }
@ -175,7 +175,7 @@ size_t MergeTreeReaderCompact::readRows(
catch (Exception & e) catch (Exception & e)
{ {
if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED) if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
storage.reportBrokenPart(data_part->name); storage.reportBrokenPart(data_part);
/// Better diagnostics. /// Better diagnostics.
e.addMessage("(while reading column " + column_from_part.name + ")"); e.addMessage("(while reading column " + column_from_part.name + ")");
@ -183,7 +183,7 @@ size_t MergeTreeReaderCompact::readRows(
} }
catch (...) catch (...)
{ {
storage.reportBrokenPart(data_part->name); storage.reportBrokenPart(data_part);
throw; throw;
} }
} }

View File

@ -56,7 +56,7 @@ MergeTreeReaderWide::MergeTreeReaderWide(
} }
catch (...) catch (...)
{ {
storage.reportBrokenPart(data_part->name); storage.reportBrokenPart(data_part);
throw; throw;
} }
} }
@ -144,7 +144,7 @@ size_t MergeTreeReaderWide::readRows(
catch (Exception & e) catch (Exception & e)
{ {
if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED) if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
storage.reportBrokenPart(data_part->name); storage.reportBrokenPart(data_part);
/// Better diagnostics. /// Better diagnostics.
e.addMessage("(while reading from part " + data_part->getFullPath() + " " e.addMessage("(while reading from part " + data_part->getFullPath() + " "
@ -154,7 +154,7 @@ size_t MergeTreeReaderWide::readRows(
} }
catch (...) catch (...)
{ {
storage.reportBrokenPart(data_part->name); storage.reportBrokenPart(data_part);
throw; throw;
} }

View File

@ -40,7 +40,7 @@ catch (...)
{ {
/// Suspicion of the broken part. A part is added to the queue for verification. /// Suspicion of the broken part. A part is added to the queue for verification.
if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED) if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
storage.reportBrokenPart(data_part->name); storage.reportBrokenPart(data_part);
throw; throw;
} }

View File

@ -126,7 +126,7 @@ catch (...)
{ {
/// Suspicion of the broken part. A part is added to the queue for verification. /// Suspicion of the broken part. A part is added to the queue for verification.
if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED) if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
storage.reportBrokenPart(data_part->name); storage.reportBrokenPart(data_part);
throw; throw;
} }

View File

@ -466,6 +466,8 @@ void ReplicatedMergeTreePartCheckThread::run()
} }
} }
storage.checkBrokenDisks();
task->schedule(); task->schedule();
} }
catch (const Coordination::Exception & e) catch (const Coordination::Exception & e)

View File

@ -7027,6 +7027,50 @@ CheckResults StorageReplicatedMergeTree::checkData(const ASTPtr & query, Context
} }
void StorageReplicatedMergeTree::checkBrokenDisks()
{
auto disks = getStoragePolicy()->getDisks();
std::unique_ptr<DataPartsVector> parts;
for (auto disk_it = disks.rbegin(); disk_it != disks.rend(); ++disk_it)
{
auto disk_ptr = *disk_it;
if (disk_ptr->isBroken())
{
{
std::unique_lock lock(last_broken_disks_mutex);
if (!last_broken_disks.insert(disk_ptr->getName()).second)
continue;
}
LOG_INFO(log, "Scanning parts to recover on broken disk {} with path {}", disk_ptr->getName(), disk_ptr->getPath());
if (!parts)
parts = std::make_unique<DataPartsVector>(getDataPartsVector());
for (auto & part : *parts)
{
if (part->volume && part->volume->getDisk()->getName() == disk_ptr->getName())
broken_part_callback(part->name);
}
continue;
}
else
{
{
std::unique_lock lock(last_broken_disks_mutex);
if (last_broken_disks.erase(disk_ptr->getName()) > 0)
LOG_INFO(
log,
"Disk {} with path {} is recovered. Exclude it from last_broken_disks",
disk_ptr->getName(),
disk_ptr->getPath());
}
}
}
}
bool StorageReplicatedMergeTree::canUseAdaptiveGranularity() const bool StorageReplicatedMergeTree::canUseAdaptiveGranularity() const
{ {
const auto storage_settings_ptr = getSettings(); const auto storage_settings_ptr = getSettings();

View File

@ -283,6 +283,9 @@ public:
static const String getDefaultZooKeeperName() { return default_zookeeper_name; } static const String getDefaultZooKeeperName() { return default_zookeeper_name; }
/// Check if there are new broken disks and enqueue part recovery tasks.
void checkBrokenDisks();
private: private:
std::atomic_bool are_restoring_replica {false}; std::atomic_bool are_restoring_replica {false};
@ -418,6 +421,9 @@ private:
/// Global ID, synced via ZooKeeper between replicas /// Global ID, synced via ZooKeeper between replicas
UUID table_shared_id; UUID table_shared_id;
std::mutex last_broken_disks_mutex;
std::set<String> last_broken_disks;
template <class Func> template <class Func>
void foreachActiveParts(Func && func, bool select_sequential_consistency) const; void foreachActiveParts(Func && func, bool select_sequential_consistency) const;

View File

@ -0,0 +1,30 @@
<yandex>
<local_disk_check_period_ms>1000</local_disk_check_period_ms>
<storage_configuration>
<disks>
<default>
<keep_free_space_bytes>1024</keep_free_space_bytes>
</default>
<jbod1>
<path>/jbod1/</path>
</jbod1>
<jbod2>
<path>/jbod2/</path>
</jbod2>
<jbod3>
<path>/jbod3/</path>
</jbod3>
</disks>
<policies>
<jbod>
<volumes>
<jbod_volume>
<disk>jbod1</disk>
<disk>jbod2</disk>
<disk>jbod3</disk>
</jbod_volume>
</volumes>
</jbod>
</policies>
</storage_configuration>
</yandex>

View File

@ -0,0 +1,111 @@
import json
import random
import re
import string
import threading
import time
from multiprocessing.dummy import Pool
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=["configs/config.d/storage_configuration.xml",],
with_zookeeper=True,
stay_alive=True,
tmpfs=["/jbod1:size=100M", "/jbod2:size=100M", "/jbod3:size=100M"],
macros={"shard": 0, "replica": 1},
)
node2 = cluster.add_instance(
"node2",
main_configs=["configs/config.d/storage_configuration.xml"],
with_zookeeper=True,
stay_alive=True,
tmpfs=["/jbod1:size=100M", "/jbod2:size=100M", "/jbod3:size=100M"],
macros={"shard": 0, "replica": 2},
)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_jbod_ha(start_cluster):
try:
for i, node in enumerate([node1, node2]):
node.query(
"""
CREATE TABLE tbl (p UInt8, d String)
ENGINE = ReplicatedMergeTree('/clickhouse/tbl', '{}')
PARTITION BY p
ORDER BY tuple()
SETTINGS
storage_policy = 'jbod',
old_parts_lifetime = 1,
cleanup_delay_period = 1,
cleanup_delay_period_random_add = 2,
max_bytes_to_merge_at_max_space_in_pool = 4096
""".format(
i
)
)
for i in range(50):
# around 1k per block
node1.query(
"insert into tbl select randConstant() % 2, randomPrintableASCII(16) from numbers(50)"
)
node2.query("SYSTEM SYNC REPLICA tbl", timeout=10)
# mimic disk failure
node1.exec_in_container(
["bash", "-c", "chmod -R 000 /jbod1"], privileged=True, user="root"
)
time.sleep(3)
# after 3 seconds jbod1 will be set as broken disk. Let's wait for another 5 seconds for data to be recovered
time.sleep(5)
assert (
int(
node1.query("select total_space from system.disks where name = 'jbod1'")
)
== 0
)
assert int(node1.query("select count(p) from tbl")) == 2500
# mimic disk recovery
node1.exec_in_container(
["bash", "-c", "chmod -R 755 /jbod1"],
privileged=True,
user="root",
)
node1.query("system restart disk jbod1")
time.sleep(5)
assert (
int(
node1.query("select total_space from system.disks where name = 'jbod1'")
)
> 0
)
finally:
for node in [node1, node2]:
node.query("DROP TABLE IF EXISTS tbl SYNC")