Merge pull request #42660 from ClickHouse/storage-system-moves

Add active moves system table
This commit is contained in:
Sergei Trifonov 2022-11-22 15:07:22 +01:00 committed by GitHub
commit 1f615e4588
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 214 additions and 5 deletions

View File

@ -5,6 +5,7 @@
#define APPLY_FOR_METRICS(M) \
M(Query, "Number of executing queries") \
M(Merge, "Number of executing background merges") \
M(Move, "Number of currently executing moves") \
M(PartMutation, "Number of mutations (ALTER DELETE/UPDATE)") \
M(ReplicatedFetch, "Number of data parts being fetched from replica") \
M(ReplicatedSend, "Number of data parts being sent to replicas") \

View File

@ -24,6 +24,7 @@
#include <Storages/IStorage.h>
#include <Storages/MarkCache.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/MovesList.h>
#include <Storages/MergeTree/ReplicatedFetchList.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
@ -229,6 +230,7 @@ struct ContextSharedPart : boost::noncopyable
ProcessList process_list; /// Executing queries at the moment.
GlobalOvercommitTracker global_overcommit_tracker;
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
MovesList moves_list; /// The list of executing moves (for (Replicated)?MergeTree)
ReplicatedFetchList replicated_fetch_list;
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
@ -637,6 +639,8 @@ const ProcessList & Context::getProcessList() const { return shared->process_lis
OvercommitTracker * Context::getGlobalOvercommitTracker() const { return &shared->global_overcommit_tracker; }
MergeList & Context::getMergeList() { return shared->merge_list; }
const MergeList & Context::getMergeList() const { return shared->merge_list; }
MovesList & Context::getMovesList() { return shared->moves_list; }
const MovesList & Context::getMovesList() const { return shared->moves_list; }
ReplicatedFetchList & Context::getReplicatedFetchList() { return shared->replicated_fetch_list; }
const ReplicatedFetchList & Context::getReplicatedFetchList() const { return shared->replicated_fetch_list; }

View File

@ -63,6 +63,7 @@ using InterserverCredentialsPtr = std::shared_ptr<const InterserverCredentials>;
class InterserverIOHandler;
class BackgroundSchedulePool;
class MergeList;
class MovesList;
class ReplicatedFetchList;
class Cluster;
class Compiler;
@ -775,6 +776,9 @@ public:
MergeList & getMergeList();
const MergeList & getMergeList() const;
MovesList & getMovesList();
const MovesList & getMovesList() const;
ReplicatedFetchList & getReplicatedFetchList();
const ReplicatedFetchList & getReplicatedFetchList() const;

View File

@ -10,7 +10,7 @@ namespace DB
{
/// Common code for background processes lists, like system.merges and system.replicated_fetches
/// Look at examples in MergeList and ReplicatedFetchList
/// Look at examples in MergeList, MovesList and ReplicatedFetchList
template <typename ListElement, typename Info>
class BackgroundProcessList;

View File

@ -6650,7 +6650,7 @@ MergeTreeData::CurrentlyMovingPartsTagger::CurrentlyMovingPartsTagger(MergeTreeM
MergeTreeData::CurrentlyMovingPartsTagger::~CurrentlyMovingPartsTagger()
{
std::lock_guard lock(data.moving_parts_mutex);
for (const auto & moving_part : parts_to_move)
for (auto & moving_part : parts_to_move)
{
/// Something went completely wrong
if (!data.currently_moving_parts.contains(moving_part.part))
@ -6776,6 +6776,14 @@ bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagge
nullptr);
};
// Register in global moves list (StorageSystemMoves)
auto moves_list_entry = getContext()->getMovesList().insert(
getStorageID(),
moving_part.part->name,
moving_part.reserved_space->getDisk()->getName(),
moving_part.reserved_space->getDisk()->getPath(),
moving_part.part->getBytesOnDisk());
try
{
/// If zero-copy replication enabled than replicas shouldn't try to

View File

@ -5,14 +5,14 @@
#include <vector>
#include <Disks/StoragePolicy.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MovesList.h>
#include <Common/ActionBlocker.h>
namespace DB
{
/// Active part from storage and destination reservation where
/// it have to be moved.
/// Active part from storage and destination reservation where it has to be moved
struct MergeTreeMoveEntry
{
std::shared_ptr<const IMergeTreeDataPart> part;

View File

@ -0,0 +1,37 @@
#include <Storages/MergeTree/MovesList.h>
#include <Common/CurrentMetrics.h>
#include <base/getThreadId.h>
namespace DB
{
MovesListElement::MovesListElement(
const StorageID & table_id_,
const std::string & part_name_,
const std::string & target_disk_name_,
const std::string & target_disk_path_,
UInt64 part_size_)
: table_id(table_id_)
, part_name(part_name_)
, target_disk_name(target_disk_name_)
, target_disk_path(target_disk_path_)
, part_size(part_size_)
, thread_id(getThreadId())
{
}
MoveInfo MovesListElement::getInfo() const
{
MoveInfo res;
res.database = table_id.database_name;
res.table = table_id.table_name;
res.part_name = part_name;
res.target_disk_name = target_disk_name;
res.target_disk_path = target_disk_path;
res.part_size = part_size;
res.elapsed = watch.elapsedSeconds();
res.thread_id = thread_id;
return res;
}
}

View File

@ -0,0 +1,64 @@
#pragma once
#include <Storages/MergeTree/BackgroundProcessList.h>
#include <Interpreters/StorageID.h>
#include <Common/Stopwatch.h>
#include <Common/CurrentMetrics.h>
#include <Poco/URI.h>
#include <boost/noncopyable.hpp>
namespace CurrentMetrics
{
extern const Metric Move;
}
namespace DB
{
struct MoveInfo
{
std::string database;
std::string table;
std::string part_name;
std::string target_disk_name;
std::string target_disk_path;
UInt64 part_size;
Float64 elapsed;
UInt64 thread_id;
};
struct MovesListElement : private boost::noncopyable
{
const StorageID table_id;
const std::string part_name;
const std::string target_disk_name;
const std::string target_disk_path;
const UInt64 part_size;
Stopwatch watch;
const UInt64 thread_id;
MovesListElement(
const StorageID & table_id_,
const std::string & part_name_,
const std::string & target_disk_name_,
const std::string & target_disk_path_,
UInt64 part_size_);
MoveInfo getInfo() const;
};
/// List of currently processing moves
class MovesList final : public BackgroundProcessList<MovesListElement, MoveInfo>
{
private:
using Parent = BackgroundProcessList<MovesListElement, MoveInfo>;
public:
MovesList()
: Parent(CurrentMetrics::Move)
{}
};
}

View File

@ -0,0 +1,47 @@
#include <Interpreters/Context.h>
#include <Storages/MergeTree/MovesList.h>
#include <Storages/System/StorageSystemMoves.h>
#include <Access/ContextAccess.h>
namespace DB
{
NamesAndTypesList StorageSystemMoves::getNamesAndTypes()
{
return {
{"database", std::make_shared<DataTypeString>()},
{"table", std::make_shared<DataTypeString>()},
{"elapsed", std::make_shared<DataTypeFloat64>()},
{"target_disk_name", std::make_shared<DataTypeString>()},
{"target_disk_path", std::make_shared<DataTypeString>()},
{"part_name", std::make_shared<DataTypeString>()},
{"part_size", std::make_shared<DataTypeUInt64>()},
{"thread_id", std::make_shared<DataTypeUInt64>()},
};
}
void StorageSystemMoves::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
{
const auto access = context->getAccess();
const bool check_access_for_tables = !access->isGranted(AccessType::SHOW_TABLES);
for (const auto & move : context->getMovesList().get())
{
if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, move.database, move.table))
continue;
size_t i = 0;
res_columns[i++]->insert(move.database);
res_columns[i++]->insert(move.table);
res_columns[i++]->insert(move.elapsed);
res_columns[i++]->insert(move.target_disk_name);
res_columns[i++]->insert(move.target_disk_path);
res_columns[i++]->insert(move.part_name);
res_columns[i++]->insert(move.part_size);
res_columns[i++]->insert(move.thread_id);
}
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
{
class Context;
class StorageSystemMoves final : public IStorageSystemOneBlock<StorageSystemMoves>
{
public:
std::string getName() const override { return "SystemMoves"; }
static NamesAndTypesList getNamesAndTypes();
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
};
}

View File

@ -23,6 +23,7 @@
#include <Storages/System/StorageSystemGraphite.h>
#include <Storages/System/StorageSystemMacros.h>
#include <Storages/System/StorageSystemMerges.h>
#include <Storages/System/StorageSystemMoves.h>
#include <Storages/System/StorageSystemReplicatedFetches.h>
#include <Storages/System/StorageSystemMetrics.h>
#include <Storages/System/StorageSystemModels.h>
@ -159,6 +160,7 @@ void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, b
attach<StorageSystemProcesses>(context, system_database, "processes");
attach<StorageSystemMetrics>(context, system_database, "metrics");
attach<StorageSystemMerges>(context, system_database, "merges");
attach<StorageSystemMoves>(context, system_database, "moves");
attach<StorageSystemMutations>(context, system_database, "mutations");
attach<StorageSystemReplicas>(context, system_database, "replicas");
attach<StorageSystemReplicationQueue>(context, system_database, "replication_queue");

View File

@ -371,6 +371,19 @@ CREATE TABLE system.metrics
)
ENGINE = SystemMetrics
COMMENT 'SYSTEM TABLE is built on the fly.'
CREATE TABLE system.moves
(
`database` String,
`table` String,
`elapsed` Float64,
`target_disk_name` String,
`target_disk_path` String,
`part_name` String,
`part_size` UInt64,
`thread_id` UInt64
)
ENGINE = SystemMoves
COMMENT 'SYSTEM TABLE is built on the fly.'
CREATE TABLE system.mutations
(
`database` String,

View File

@ -45,6 +45,7 @@ show create table macros format TSVRaw;
show create table merge_tree_settings format TSVRaw;
show create table merges format TSVRaw;
show create table metrics format TSVRaw;
show create table moves format TSVRaw;
show create table mutations format TSVRaw;
show create table numbers format TSVRaw;
show create table numbers_mt format TSVRaw;