Add active moves system table

This commit is contained in:
serxa 2022-10-25 13:56:46 +00:00
parent b3b8123f73
commit 55a86bc2de
11 changed files with 191 additions and 5 deletions

View File

@ -5,6 +5,7 @@
#define APPLY_FOR_METRICS(M) \
M(Query, "Number of executing queries") \
M(Merge, "Number of executing background merges") \
M(Move, "Number of currently executing moves") \
M(PartMutation, "Number of mutations (ALTER DELETE/UPDATE)") \
M(ReplicatedFetch, "Number of data parts being fetched from replica") \
M(ReplicatedSend, "Number of data parts being sent to replicas") \

View File

@ -23,6 +23,7 @@
#include <Storages/IStorage.h>
#include <Storages/MarkCache.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/MovesList.h>
#include <Storages/MergeTree/ReplicatedFetchList.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
@ -221,6 +222,7 @@ struct ContextSharedPart
ProcessList process_list; /// Executing queries at the moment.
GlobalOvercommitTracker global_overcommit_tracker;
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
MovesList moves_list; /// The list of executing moves (for (Replicated)?MergeTree)
ReplicatedFetchList replicated_fetch_list;
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
@ -551,6 +553,8 @@ const ProcessList & Context::getProcessList() const { return shared->process_lis
OvercommitTracker * Context::getGlobalOvercommitTracker() const { return &shared->global_overcommit_tracker; }
MergeList & Context::getMergeList() { return shared->merge_list; }
const MergeList & Context::getMergeList() const { return shared->merge_list; }
MovesList & Context::getMovesList() { return shared->moves_list; }
const MovesList & Context::getMovesList() const { return shared->moves_list; }
ReplicatedFetchList & Context::getReplicatedFetchList() { return shared->replicated_fetch_list; }
const ReplicatedFetchList & Context::getReplicatedFetchList() const { return shared->replicated_fetch_list; }

View File

@ -60,6 +60,7 @@ using InterserverCredentialsPtr = std::shared_ptr<const InterserverCredentials>;
class InterserverIOHandler;
class BackgroundSchedulePool;
class MergeList;
class MovesList;
class ReplicatedFetchList;
class Cluster;
class Compiler;
@ -754,6 +755,9 @@ public:
MergeList & getMergeList();
const MergeList & getMergeList() const;
MovesList & getMovesList();
const MovesList & getMovesList() const;
ReplicatedFetchList & getReplicatedFetchList();
const ReplicatedFetchList & getReplicatedFetchList() const;

View File

@ -10,7 +10,7 @@ namespace DB
{
/// Common code for background processes lists, like system.merges and system.replicated_fetches
/// Look at examples in MergeList and ReplicatedFetchList
/// Look at examples in MergeList, MovesList and ReplicatedFetchList
template <typename ListElement, typename Info>
class BackgroundProcessList;

View File

@ -6511,17 +6511,22 @@ MergeTreeData::CurrentlyMovingPartsTagger::CurrentlyMovingPartsTagger(MergeTreeM
for (const auto & moving_part : parts_to_move)
if (!data.currently_moving_parts.emplace(moving_part.part).second)
throw Exception("Cannot move part '" + moving_part.part->name + "'. It's already moving.", ErrorCodes::LOGICAL_ERROR);
// Register in global moves list (StorageSystemMoves)
for (auto & moving_part : parts_to_move)
moving_part.moves_list_entry = data.getContext()->getMovesList().insert(data.getStorageID(), moving_part.part->name);
}
MergeTreeData::CurrentlyMovingPartsTagger::~CurrentlyMovingPartsTagger()
{
std::lock_guard lock(data.moving_parts_mutex);
for (const auto & moving_part : parts_to_move)
for (auto & moving_part : parts_to_move)
{
/// Something went completely wrong
if (!data.currently_moving_parts.contains(moving_part.part))
std::terminate();
data.currently_moving_parts.erase(moving_part.part);
moving_part.moves_list_entry.reset(); // Unregister from global moves list
}
}

View File

@ -5,18 +5,19 @@
#include <vector>
#include <Disks/StoragePolicy.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MovesList.h>
#include <Common/ActionBlocker.h>
namespace DB
{
/// Active part from storage and destination reservation where
/// it have to be moved.
/// Active part from storage and destination reservation where it has to be moved
struct MergeTreeMoveEntry
{
std::shared_ptr<const IMergeTreeDataPart> part;
ReservationPtr reserved_space;
MovesList::EntryPtr moves_list_entry;
MergeTreeMoveEntry(const std::shared_ptr<const IMergeTreeDataPart> & part_, ReservationPtr reservation_)
: part(part_), reserved_space(std::move(reservation_))
@ -54,7 +55,7 @@ public:
/// Replaces cloned part from detached directory into active data parts set.
/// Replacing part changes state to DeleteOnDestroy and will be removed from disk after destructor of
///IMergeTreeDataPart called. If replacing part doesn't exists or not active (committed) than
/// IMergeTreeDataPart called. If replacing part doesn't exists or not active (committed) than
/// cloned part will be removed and log message will be reported. It may happen in case of concurrent
/// merge or mutation.
void swapClonedPart(const MergeTreeDataPartPtr & cloned_parts) const;

View File

@ -0,0 +1,30 @@
#include <Storages/MergeTree/MovesList.h>
#include <Common/CurrentMetrics.h>
#include <base/getThreadId.h>
namespace DB
{
MovesListElement::MovesListElement(
const StorageID & table_id_, const std::string & partition_id_)
: table_id(table_id_)
, partition_id(partition_id_)
, thread_id(getThreadId())
{
}
MoveInfo MovesListElement::getInfo() const
{
MoveInfo res;
res.database = table_id.database_name;
res.table = table_id.table_name;
res.partition_id = partition_id;
res.result_part_name = result_part_name;
res.result_part_path = result_part_path;
res.part_size = part_size;
res.elapsed = watch.elapsedSeconds();
res.thread_id = thread_id;
return res;
}
}

View File

@ -0,0 +1,64 @@
#pragma once
#include <Storages/MergeTree/BackgroundProcessList.h>
#include <Interpreters/StorageID.h>
#include <Common/Stopwatch.h>
#include <Common/CurrentMetrics.h>
#include <Poco/URI.h>
#include <boost/noncopyable.hpp>
namespace CurrentMetrics
{
extern const Metric Move;
}
namespace DB
{
struct MoveInfo
{
std::string database;
std::string table;
std::string partition_id;
std::string result_part_name;
std::string result_part_path;
UInt64 part_size;
Float64 elapsed;
UInt64 thread_id;
};
struct MovesListElement : private boost::noncopyable
{
const StorageID table_id;
const std::string partition_id;
// TODO(serxa): fill it
std::string result_part_name;
std::string result_part_path;
UInt64 part_size{};
Stopwatch watch;
const UInt64 thread_id;
MovesListElement(
const StorageID & table_id_,
const std::string & partition_id_);
MoveInfo getInfo() const;
};
/// List of currently processing moves
class MovesList final : public BackgroundProcessList<MovesListElement, MoveInfo>
{
private:
using Parent = BackgroundProcessList<MovesListElement, MoveInfo>;
public:
MovesList()
: Parent(CurrentMetrics::Move)
{}
};
}

View File

@ -0,0 +1,47 @@
#include <Interpreters/Context.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/System/StorageSystemMoves.h>
#include <Access/ContextAccess.h>
namespace DB
{
NamesAndTypesList StorageSystemMoves::getNamesAndTypes()
{
return {
{"database", std::make_shared<DataTypeString>()},
{"table", std::make_shared<DataTypeString>()},
{"elapsed", std::make_shared<DataTypeFloat64>()},
{"result_part_name", std::make_shared<DataTypeString>()},
{"result_part_path", std::make_shared<DataTypeString>()},
{"partition_id", std::make_shared<DataTypeString>()},
{"part_size", std::make_shared<DataTypeUInt64>()},
{"thread_id", std::make_shared<DataTypeUInt64>()},
};
}
void StorageSystemMoves::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
{
const auto access = context->getAccess();
const bool check_access_for_tables = !access->isGranted(AccessType::SHOW_TABLES);
for (const auto & move : context->getMovesList().get())
{
if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, move.database, move.table))
continue;
size_t i = 0;
res_columns[i++]->insert(move.database);
res_columns[i++]->insert(move.table);
res_columns[i++]->insert(move.elapsed);
res_columns[i++]->insert(move.result_part_name);
res_columns[i++]->insert(move.result_part_path);
res_columns[i++]->insert(move.partition_id);
res_columns[i++]->insert(move.part_size);
res_columns[i++]->insert(move.thread_id);
}
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
{
class Context;
class StorageSystemMoves final : public IStorageSystemOneBlock<StorageSystemMoves>
{
public:
std::string getName() const override { return "SystemMoves"; }
static NamesAndTypesList getNamesAndTypes();
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
};
}

View File

@ -23,6 +23,7 @@
#include <Storages/System/StorageSystemGraphite.h>
#include <Storages/System/StorageSystemMacros.h>
#include <Storages/System/StorageSystemMerges.h>
#include <Storages/System/StorageSystemMoves.h>
#include <Storages/System/StorageSystemReplicatedFetches.h>
#include <Storages/System/StorageSystemMetrics.h>
#include <Storages/System/StorageSystemModels.h>
@ -158,6 +159,7 @@ void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, b
attach<StorageSystemProcesses>(context, system_database, "processes");
attach<StorageSystemMetrics>(context, system_database, "metrics");
attach<StorageSystemMerges>(context, system_database, "merges");
attach<StorageSystemMoves>(context, system_database, "moves");
attach<StorageSystemMutations>(context, system_database, "mutations");
attach<StorageSystemReplicas>(context, system_database, "replicas");
attach<StorageSystemReplicationQueue>(context, system_database, "replication_queue");