Improve TablesDependencyGraph.

This commit is contained in:
Vitaly Baranov 2022-12-04 01:13:54 +01:00
parent 6487acf9c0
commit 78c433b79d
2 changed files with 165 additions and 69 deletions

View File

@ -1,5 +1,6 @@
#include <Databases/TablesDependencyGraph.h>
#include <Common/logger_useful.h>
#include <IO/WriteHelpers.h>
#include <boost/range/adaptor/reversed.hpp>
@ -9,12 +10,13 @@ namespace DB
namespace ErrorCodes
{
extern const int INFINITE_LOOP;
extern const int LOGICAL_ERROR;
}
namespace
{
constexpr const size_t CYCLIC_LEVEL = static_cast<size_t>(-2);
constexpr const size_t CYCLIC_LEVEL = std::numeric_limits<size_t>::max();
}
@ -40,7 +42,7 @@ TablesDependencyGraph::TablesDependencyGraph(TablesDependencyGraph && src) noexc
TablesDependencyGraph & TablesDependencyGraph::operator=(const TablesDependencyGraph & src)
{
if (&src != this)
if (this != &src)
{
nodes = src.nodes;
nodes_by_database_and_table_names = src.nodes_by_database_and_table_names;
@ -54,11 +56,14 @@ TablesDependencyGraph & TablesDependencyGraph::operator=(const TablesDependencyG
TablesDependencyGraph & TablesDependencyGraph::operator=(TablesDependencyGraph && src) noexcept
{
nodes = std::exchange(src.nodes, decltype(nodes){});
nodes_by_database_and_table_names = std::exchange(src.nodes_by_database_and_table_names, decltype(nodes_by_database_and_table_names){});
nodes_by_uuid = std::exchange(src.nodes_by_uuid, decltype(nodes_by_uuid){});
levels_calculated = std::exchange(src.levels_calculated, false);
nodes_sorted_by_level_lazy = std::exchange(src.nodes_sorted_by_level_lazy, decltype(nodes_sorted_by_level_lazy){});
if (this != &src)
{
nodes = std::exchange(src.nodes, decltype(nodes){});
nodes_by_database_and_table_names = std::exchange(src.nodes_by_database_and_table_names, decltype(nodes_by_database_and_table_names){});
nodes_by_uuid = std::exchange(src.nodes_by_uuid, decltype(nodes_by_uuid){});
levels_calculated = std::exchange(src.levels_calculated, false);
nodes_sorted_by_level_lazy = std::exchange(src.nodes_sorted_by_level_lazy, decltype(nodes_sorted_by_level_lazy){});
}
return *this;
}
@ -89,11 +94,13 @@ void TablesDependencyGraph::addDependency(const StorageID & table_id, const Stor
auto * table_node = addOrUpdateNode(table_id);
auto * dependency_node = addOrUpdateNode(dependency);
if (table_node->dependencies.contains(dependency_node))
return; /// Already have this dependency.
bool inserted = table_node->dependencies.insert(dependency_node).second;
if (!inserted)
return; /// Not inserted because we already had this dependency.
table_node->dependencies.insert(dependency_node);
dependency_node->dependents.insert(table_node);
/// `dependency_node` must be updated too.
[[maybe_unused]] bool inserted_to_set = dependency_node->dependents.insert(table_node).second;
chassert(inserted_to_set);
setNeedRecalculateLevels();
}
@ -126,13 +133,19 @@ void TablesDependencyGraph::addDependencies(const StorageID & table_id, const st
for (auto * dependency_node : old_dependency_nodes)
{
if (!new_dependency_nodes.contains(dependency_node))
dependency_node->dependents.erase(table_node);
{
[[maybe_unused]] bool removed_from_set = dependency_node->dependents.erase(table_node);
chassert(removed_from_set);
}
}
for (auto * dependency_node : new_dependency_nodes)
{
if (!old_dependency_nodes.contains(dependency_node))
dependency_node->dependents.insert(table_node);
{
[[maybe_unused]] bool inserted_to_set = dependency_node->dependents.insert(table_node).second;
chassert(inserted_to_set);
}
}
table_node->dependencies = std::move(new_dependency_nodes);
@ -167,21 +180,28 @@ bool TablesDependencyGraph::removeDependency(const StorageID & table_id, const S
auto dependency_it = table_node->dependencies.find(dependency_node);
if (dependency_it == table_node->dependencies.end())
return false;
return false; /// No such dependency, nothing to remove.
table_node->dependencies.erase(dependency_it);
dependency_node->dependents.erase(table_node);
bool table_node_removed = false;
/// `dependency_node` must be updated too.
[[maybe_unused]] bool removed_from_set = dependency_node->dependents.erase(table_node);
chassert(removed_from_set);
if (remove_isolated_tables && dependency_node->dependencies.empty() && dependency_node->dependents.empty())
{
/// The dependency table has no dependencies and no dependents now, so we will remove it from the graph.
removeNode(dependency_node);
if (table_node == dependency_node)
table_node_removed = true;
}
if (remove_isolated_tables && !table_node_removed && table_node->dependencies.empty() && table_node->dependents.empty())
{
/// The table `table_id` has no dependencies and no dependents now, so we will remove it from the graph.
removeNode(table_node);
}
setNeedRecalculateLevels();
return true;
@ -203,19 +223,28 @@ std::vector<StorageID> TablesDependencyGraph::removeDependencies(const StorageID
for (auto * dependency_node : dependency_nodes)
{
/// We're gathering the list of dependencies the table `table_id` had in the graph to return from the function.
dependencies.emplace_back(dependency_node->storage_id);
dependency_node->dependents.erase(table_node);
/// Update `dependency_node`.
[[maybe_unused]] bool removed_from_set = dependency_node->dependents.erase(table_node);
chassert(removed_from_set);
if (remove_isolated_tables && dependency_node->dependencies.empty() && dependency_node->dependents.empty())
{
/// The dependency table has no dependencies and no dependents now, so we will remove it from the graph.
removeNode(dependency_node);
if (table_node == dependency_node)
table_node_removed = true;
}
}
if (remove_isolated_tables && !table_node_removed && table_node->dependencies.empty() && table_node->dependents.empty())
chassert(table_node->dependencies.empty());
if (remove_isolated_tables && !table_node_removed && table_node->dependents.empty())
{
/// The table `table_id` has no dependencies and no dependents now, so we will remove it from the graph.
removeNode(table_node);
}
setNeedRecalculateLevels();
return dependencies;
@ -251,7 +280,12 @@ TablesDependencyGraph::Node * TablesDependencyGraph::findNode(const StorageID &
{
auto * node = it->second;
if (table_id.hasUUID() && node->storage_id.hasUUID() && (table_id.uuid != node->storage_id.uuid))
return nullptr; /// UUID is different, it's not the node we're looking for.
{
/// We found a table with specified database and table names in the graph, but surprisingly it has a different UUID.
/// Maybe an "EXCHANGE TABLES" command has been executed somehow without changing the graph?
LOG_WARNING(getLogger(), "Found table {} in the graph with unexpected UUID {}", table_id, node->storage_id.uuid);
return nullptr; /// Act like it's not found.
}
return node; /// Found by table name.
}
}
@ -268,7 +302,8 @@ TablesDependencyGraph::Node * TablesDependencyGraph::addOrUpdateNode(const Stora
if (table_id.hasUUID() && !node->storage_id.hasUUID())
{
node->storage_id.uuid = table_id.uuid;
nodes_by_uuid.emplace(node->storage_id.uuid, node);
[[maybe_unused]] bool inserted_to_map = nodes_by_uuid.emplace(node->storage_id.uuid, node).second;
chassert(inserted_to_map);
}
if (!table_id.table_name.empty() && ((table_id.table_name != node->storage_id.table_name) || (table_id.database_name != node->storage_id.database_name)))
@ -283,7 +318,8 @@ TablesDependencyGraph::Node * TablesDependencyGraph::addOrUpdateNode(const Stora
nodes_by_database_and_table_names.erase(node->storage_id);
node->storage_id.database_name = table_id.database_name;
node->storage_id.table_name = table_id.table_name;
nodes_by_database_and_table_names.emplace(node->storage_id, node);
[[maybe_unused]] bool inserted_to_map = nodes_by_database_and_table_names.emplace(node->storage_id, node).second;
chassert(inserted_to_map);
}
}
else
@ -303,9 +339,15 @@ TablesDependencyGraph::Node * TablesDependencyGraph::addOrUpdateNode(const Stora
nodes.insert(node_ptr);
node = node_ptr.get();
if (table_id.hasUUID())
nodes_by_uuid.emplace(table_id.uuid, node);
{
[[maybe_unused]] bool inserted_to_map = nodes_by_uuid.emplace(table_id.uuid, node).second;
chassert(inserted_to_map);
}
if (!table_id.table_name.empty())
nodes_by_database_and_table_names.emplace(table_id, node);
{
[[maybe_unused]] bool inserted_to_map = nodes_by_database_and_table_names.emplace(table_id, node).second;
chassert(inserted_to_map);
}
}
return node;
}
@ -313,22 +355,39 @@ TablesDependencyGraph::Node * TablesDependencyGraph::addOrUpdateNode(const Stora
void TablesDependencyGraph::removeNode(Node * node)
{
chassert(node);
auto dependency_nodes = std::move(node->dependencies);
auto dependent_nodes = std::move(node->dependents);
if (node->storage_id.hasUUID())
nodes_by_uuid.erase(node->storage_id.uuid);
{
[[maybe_unused]] bool removed_from_map = nodes_by_uuid.erase(node->storage_id.uuid);
chassert(removed_from_map);
}
if (!node->storage_id.table_name.empty())
nodes_by_database_and_table_names.erase(node->storage_id);
{
[[maybe_unused]]bool removed_from_map = nodes_by_database_and_table_names.erase(node->storage_id);
chassert(removed_from_map);
}
for (auto * dependency_node : dependency_nodes)
dependency_node->dependents.erase(node);
{
[[maybe_unused]] bool removed_from_set = dependency_node->dependents.erase(node);
chassert(removed_from_set);
}
for (auto * dependent_node : dependent_nodes)
dependent_node->dependencies.erase(node);
{
[[maybe_unused]] bool removed_from_set = dependent_node->dependencies.erase(node);
chassert(removed_from_set);
}
nodes.erase(node->shared_from_this());
auto it = nodes.find(node);
chassert(it != nodes.end());
nodes.erase(it);
nodes_sorted_by_level_lazy.clear();
}
@ -533,7 +592,7 @@ String TablesDependencyGraph::describeCyclicDependencies() const
}
void TablesDependencyGraph::setNeedRecalculateLevels()
void TablesDependencyGraph::setNeedRecalculateLevels() const
{
levels_calculated = false;
nodes_sorted_by_level_lazy.clear();
@ -546,49 +605,73 @@ void TablesDependencyGraph::calculateLevels() const
return;
levels_calculated = true;
/// First find tables with no dependencies, add them to `nodes_sorted_by_level_lazy`.
/// Then remove those tables from the dependency graph (we imitate that removal by decrementing `num_dependencies_to_count`),
/// and find which tables have no dependencies now.
/// Repeat while there are still tables with no dependencies left to process.
/// In the end we expect all nodes from `nodes` to be added to `nodes_sorted_by_level_lazy`.
/// If some nodes are still not added to `nodes_sorted_by_level_lazy` in the end then there is a cyclic dependency.
/// Complexity: O(V + E)
nodes_sorted_by_level_lazy.clear();
nodes_sorted_by_level_lazy.reserve(nodes.size());
std::unordered_set<const Node *> nodes_to_process;
for (const auto & node_ptr : nodes)
nodes_to_process.emplace(node_ptr.get());
size_t current_level = 0;
while (!nodes_to_process.empty())
/// Find tables with no dependencies.
for (const auto & node_ptr : nodes)
{
size_t old_num_sorted = nodes_sorted_by_level_lazy.size();
for (auto it = nodes_to_process.begin(); it != nodes_to_process.end();)
const Node * node = node_ptr.get();
node->num_dependencies_to_count = node->dependencies.size();
if (!node->num_dependencies_to_count)
{
const auto * current_node = *(it++);
bool has_dependencies = false;
for (const auto * dependency : current_node->dependencies)
{
if (nodes_to_process.contains(dependency))
has_dependencies = true;
}
node->level = current_level;
nodes_sorted_by_level_lazy.emplace_back(node);
}
}
if (!has_dependencies)
size_t num_nodes_without_dependencies = nodes_sorted_by_level_lazy.size();
++current_level;
while (num_nodes_without_dependencies)
{
size_t begin = nodes_sorted_by_level_lazy.size() - num_nodes_without_dependencies;
size_t end = nodes_sorted_by_level_lazy.size();
/// Decrement number of dependencies for each dependent table.
for (size_t i = begin; i != end; ++i)
{
const Node * current_node = nodes_sorted_by_level_lazy[i];
for (const Node * dependent_node : current_node->dependents)
{
current_node->level = current_level;
nodes_sorted_by_level_lazy.emplace_back(current_node);
if (!dependent_node->num_dependencies_to_count)
throw Exception(ErrorCodes::LOGICAL_ERROR, "{}: Trying to decrement 0 dependencies counter for {}. It's a bug", name_for_logging, dependent_node->storage_id);
if (!--dependent_node->num_dependencies_to_count)
{
dependent_node->level = current_level;
nodes_sorted_by_level_lazy.emplace_back(dependent_node);
}
}
}
if (nodes_sorted_by_level_lazy.size() == old_num_sorted)
break;
for (size_t i = old_num_sorted; i != nodes_sorted_by_level_lazy.size(); ++i)
nodes_to_process.erase(nodes_sorted_by_level_lazy[i]);
if (nodes_sorted_by_level_lazy.size() > nodes.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "{}: Some tables were found more than once while passing through the dependency graph. It's a bug", name_for_logging);
num_nodes_without_dependencies = nodes_sorted_by_level_lazy.size() - end;
++current_level;
}
for (const auto * node_with_cyclic_dependencies : nodes_to_process)
if (nodes_sorted_by_level_lazy.size() < nodes.size())
{
node_with_cyclic_dependencies->level = CYCLIC_LEVEL;
nodes_sorted_by_level_lazy.emplace_back(node_with_cyclic_dependencies);
for (const auto & node_ptr : nodes)
{
const Node * node = node_ptr.get();
if (node->num_dependencies_to_count)
{
node->level = CYCLIC_LEVEL;
nodes_sorted_by_level_lazy.emplace_back(node);
}
}
}
}
@ -630,7 +713,7 @@ std::vector<std::vector<StorageID>> TablesDependencyGraph::getTablesSortedByDepe
void TablesDependencyGraph::log() const
{
if (empty())
if (nodes.empty())
{
LOG_TEST(getLogger(), "No tables");
return;

View File

@ -20,11 +20,11 @@ using TableNamesSet = std::unordered_set<QualifiedTableName>;
///
/// This class is used to represent various types of table-table dependencies:
/// 1. View dependencies: "source_table -> materialized_view".
/// Data inserted to a source table is also inserted to corresponding materialized views.
/// Data inserted to a source table is also inserted to corresponding materialized views.
/// 2. Loading dependencies: specify in which order tables must be loaded during startup.
/// For example a dictionary should be loaded after its source table, and this is written in the graph as "dictionary -> source_table".
/// For example a dictionary should be loaded after its source table, and this is written in the graph as "dictionary -> source_table".
/// 3. Referential dependencies: "table -> all tables mentioned in its definition".
/// Referential dependencies are checked to decide if it's safe to drop a table (it can be unsafe if the table is used by another table).
/// Referential dependencies are checked to decide if it's safe to drop a table (it can be unsafe if the table is used by another table).
///
/// WARNING: This class doesn't have an embedded mutex, so it must be synchronized outside.
class TablesDependencyGraph
@ -98,8 +98,8 @@ public:
/// Cyclic dependencies are dependencies like "A->A" or "A->B->C->D->A".
void checkNoCyclicDependencies() const;
bool hasCyclicDependencies() const;
std::vector<StorageID> getTablesWithCyclicDependencies() const;
String describeCyclicDependencies() const;
std::vector<StorageID> getTablesWithCyclicDependencies() const;
/// Returns a list of tables sorted by their dependencies:
/// tables without dependencies first, then
@ -113,8 +113,12 @@ public:
/// Outputs information about this graph as a bunch of logging messages.
void log() const;
/// Calculates levels - this is required for checking cyclic dependencies, to sort tables by dependency, and to log the graph.
/// This function is called automatically by the functions which need it, but can be invoked directly.
void calculateLevels() const;
private:
struct Node : public std::enable_shared_from_this<Node>
struct Node
{
StorageID storage_id;
@ -128,28 +132,38 @@ private:
/// Calculated lazily.
mutable size_t level = 0;
/// Number of dependencies left, used only while we're calculating levels.
mutable size_t num_dependencies_to_count = 0;
explicit Node(const StorageID & storage_id_) : storage_id(storage_id_) {}
};
using NodeSharedPtr = std::shared_ptr<Node>;
struct LessByLevel
struct Hash
{
bool operator()(const Node * left, const Node * right) { return left->level < right->level; }
using is_transparent = void;
size_t operator()(const Node * node) const { return std::hash<const Node *>{}(node); }
size_t operator()(const NodeSharedPtr & node_ptr) const { return operator()(node_ptr.get()); }
};
std::unordered_set<NodeSharedPtr> nodes;
struct Equal
{
using is_transparent = void;
size_t operator()(const NodeSharedPtr & left, const Node * right) const { return left.get() == right; }
size_t operator()(const NodeSharedPtr & left, const NodeSharedPtr & right) const { return left == right; }
};
std::unordered_set<NodeSharedPtr, Hash, Equal> nodes;
/// Nodes can be found either by UUID or by database name & table name. That's why we need two maps here.
std::unordered_map<StorageID, Node *, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual> nodes_by_database_and_table_names;
std::unordered_map<UUID, Node *> nodes_by_uuid;
/// This is set if both `level` inside each node and `nodes_sorted_by_level_lazy` are calculated.
mutable bool levels_calculated = false;
/// Nodes sorted by their level. Calculated lazily.
using NodesSortedByLevel = std::vector<const Node *>;
mutable NodesSortedByLevel nodes_sorted_by_level_lazy;
mutable bool levels_calculated = false;
const String name_for_logging;
mutable Poco::Logger * logger = nullptr;
@ -161,8 +175,7 @@ private:
static std::vector<StorageID> getDependencies(const Node & node);
static std::vector<StorageID> getDependents(const Node & node);
void setNeedRecalculateLevels();
void calculateLevels() const;
void setNeedRecalculateLevels() const;
const NodesSortedByLevel & getNodesSortedByLevel() const;
Poco::Logger * getLogger() const;