Support split for ActionsDAG.

This commit is contained in:
Nikolai Kochetov 2021-01-18 17:59:59 +03:00
parent 8d58ce532a
commit 8bafe9cca3
2 changed files with 208 additions and 0 deletions

View File

@ -798,6 +798,212 @@ ActionsDAGPtr ActionsDAG::merge(ActionsDAG && first, ActionsDAG && second)
return std::make_shared<ActionsDAG>(std::move(first));
}
/// Split the DAG into two DAGs which are meant to be executed one after another.
///
/// The first DAG contains every node from split_nodes together with all of their transitive children.
/// The second DAG contains all the remaining nodes and gets an index which mirrors the index of this DAG.
///
/// A value which is calculated in the first part and is needed by the second part (either as an
/// argument of a second-part node, or because it is in the index of this DAG) is added to the index
/// of the first DAG and is represented by a fresh INPUT node in the second DAG.
/// Every INPUT of this DAG is present in both parts. COLUMN nodes needed by both parts are duplicated.
///
/// Returns {first, second}.
std::pair<ActionsDAGPtr, ActionsDAGPtr> ActionsDAG::split(std::unordered_set<const Node *> split_nodes) const
{
    /// (first_nodes, first_index) is the part which will have split_nodes in result.
    /// (second_nodes, second_index) is the part which will have the same index as current actions.
    /// std::list is used on purpose: pointers to already created nodes must stay valid while we add new ones.
    std::list<Node> second_nodes;
    std::list<Node> first_nodes;
    Index second_index;
    Index first_index;

    /// List of nodes from current actions which are not inputs, but will be inputs of the second part.
    std::vector<const Node *> new_inputs;

    struct Frame
    {
        const Node * node;
        size_t next_child_to_visit = 0;
    };

    struct Data
    {
        bool needed_by_split_node = false;
        bool visited = false;
        bool used_in_result = false;

        /// Copies of node in one of the DAGs.
        /// For COLUMN and INPUT both copies may exist.
        Node * to_second = nullptr;
        Node * to_first = nullptr;
    };

    std::stack<Frame> stack;
    std::unordered_map<const Node *, Data> data;

    /// Creates an INPUT node in the second part which stands for the value calculated by `source`.
    auto add_input_to_second = [&second_nodes](const Node & source) -> Node *
    {
        Node input_node;
        input_node.type = ActionType::INPUT;
        input_node.result_type = source.result_type;
        input_node.result_name = source.result_name;
        return &second_nodes.emplace_back(std::move(input_node));
    };

    for (const auto & node : index)
        data[node].used_in_result = true;

    /// DFS. Decide if node is needed by split: mark split_nodes and everything reachable from them.
    for (const auto & node : nodes)
    {
        if (split_nodes.count(&node) == 0)
            continue;

        auto & root_data = data[&node];
        if (root_data.needed_by_split_node)
            continue;

        root_data.needed_by_split_node = true;
        stack.push({.node = &node});

        while (!stack.empty())
        {
            /// Copy the pointer out of the frame: a reference to the frame would dangle after pop().
            const Node * cur_node = stack.top().node;
            stack.pop();

            for (const auto * child : cur_node->children)
            {
                auto & child_data = data[child];
                if (!child_data.needed_by_split_node)
                {
                    child_data.needed_by_split_node = true;
                    stack.push({.node = child});
                }
            }
        }
    }

    /// DFS. Move nodes to one of the DAGs. A node is copied only after all its children were copied.
    for (const auto & node : nodes)
    {
        if (!data[&node].visited)
            stack.push({.node = &node});

        while (!stack.empty())
        {
            /// Both references stay valid while we push new frames and insert new map entries:
            /// the default container of std::stack (std::deque) keeps references on push_back,
            /// and std::unordered_map keeps references to elements on rehash.
            auto & cur = stack.top();
            auto & cur_data = data[cur.node];

            /// At first, visit all children.
            while (cur.next_child_to_visit < cur.node->children.size())
            {
                auto * child = cur.node->children[cur.next_child_to_visit];
                auto & child_data = data[child];

                if (!child_data.visited)
                {
                    stack.push({.node = child});
                    break;
                }

                ++cur.next_child_to_visit;
            }

            /// Some child was pushed on the stack and must be processed before the current node.
            if (cur.next_child_to_visit != cur.node->children.size())
                continue;

            /// Make a copy part.
            /// Remember the node before pop(): `cur` refers to the frame which is destroyed by pop().
            const Node * cur_node = cur.node;
            cur_data.visited = true;
            stack.pop();

            if (!cur_data.needed_by_split_node)
            {
                auto & copy = second_nodes.emplace_back(*cur_node);
                cur_data.to_second = &copy;

                /// Replace children to newly created nodes.
                for (auto & child : copy.children)
                {
                    auto & child_data = data[child];

                    /// If child has no copy in the second part, it belongs to the split (first) part.
                    if (!child_data.to_second)
                    {
                        if (child->type == ActionType::COLUMN)
                        {
                            /// Just create new node for COLUMN action.
                            child_data.to_second = &second_nodes.emplace_back(*child);
                        }
                        else
                        {
                            /// If it was an input, its copy in the second part was already created
                            /// when the input itself was processed.
                            assert(child->type != ActionType::INPUT);

                            /// Node from first part is added as new input.
                            child_data.to_second = add_input_to_second(*child);
                            new_inputs.push_back(child);
                        }
                    }

                    child = child_data.to_second;
                }

                /// Every input should be in both DAGs.
                if (copy.type == ActionType::INPUT)
                {
                    assert(cur_data.to_first == nullptr);
                    cur_data.to_first = &first_nodes.emplace_back(*cur_node);
                }
            }
            else
            {
                auto & copy = first_nodes.emplace_back(*cur_node);
                cur_data.to_first = &copy;

                /// Replace children to newly created nodes.
                /// Children of a split node are split nodes too, so their first-part copies exist.
                for (auto & child : copy.children)
                {
                    child = data[child].to_first;
                    assert(child != nullptr);
                }

                if (cur_data.used_in_result || copy.type == ActionType::INPUT)
                {
                    /// If this node is needed in result (or is an input), add it as input of the second part.
                    /// The placeholder must describe the node being processed, not the root of the current DFS.
                    cur_data.to_second = add_input_to_second(*cur_node);

                    if (copy.type != ActionType::INPUT)
                        new_inputs.push_back(cur_node);
                }
            }
        }
    }

    /// The second part has the same result as the current actions.
    for (auto * node : index)
        second_index.insert(data[node].to_second);

    Inputs second_inputs;
    Inputs first_inputs;

    /// Original inputs are inputs of both parts, and the first part passes them through to the second one.
    for (auto * input : inputs)
    {
        const auto & cur = data[input];
        second_inputs.push_back(cur.to_second);
        first_index.insert(cur.to_first);
        first_inputs.push_back(cur.to_first);
    }

    /// Values calculated by the first part and consumed by the second part.
    for (const auto * input : new_inputs)
    {
        const auto & cur = data[input];
        second_inputs.push_back(cur.to_second);
        first_index.insert(cur.to_first);
    }

    /// NOTE(review): cloneEmpty() is defined outside of this view; presumably it creates a DAG
    /// without nodes which shares the settings of this one - confirm.
    auto first_actions = cloneEmpty();
    first_actions->nodes.swap(first_nodes);
    first_actions->index.swap(first_index);
    first_actions->inputs.swap(first_inputs);

    auto second_actions = cloneEmpty();
    second_actions->nodes.swap(second_nodes);
    second_actions->index.swap(second_index);
    second_actions->inputs.swap(second_inputs);

    return {std::move(first_actions), std::move(second_actions)};
}
ActionsDAGPtr ActionsDAG::splitActionsBeforeArrayJoin(const NameSet & array_joined_columns)
{
/// Split DAG into two parts.

View File

@ -253,6 +253,8 @@ public:
/// Otherwise, any two actions may be combined.
static ActionsDAGPtr merge(ActionsDAG && first, ActionsDAG && second);
/// Split the DAG into two parts and return them as {first, second}.
/// The first part contains the nodes from split_nodes together with all nodes they depend on.
/// The second part contains the remaining nodes and has an index which mirrors the index of this DAG.
/// Results of the first part which are needed by the second part become inputs of the second part.
std::pair<ActionsDAGPtr, ActionsDAGPtr> split(std::unordered_set<const Node *> split_nodes) const;
private:
Node & addNode(Node node, bool can_replace = false);
Node & getNode(const std::string & name);