Merge pull request #10099 from ClickHouse/fix-9826

Fix 9826
This commit is contained in:
alexey-milovidov 2020-04-08 02:41:11 +03:00 committed by GitHub
commit a29393821d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 147 additions and 20 deletions

View File

@ -96,7 +96,7 @@ bool PipelineExecutor::addEdges(UInt64 node)
{
const IProcessor * proc = &it->getOutputPort().getProcessor();
auto output_port_number = proc->getOutputPortNumber(&it->getOutputPort());
add_edge(*it, proc, graph[node].backEdges, true, from_input, output_port_number, &graph[node].post_updated_input_ports);
add_edge(*it, proc, graph[node].backEdges, true, from_input, output_port_number, graph[node].post_updated_input_ports.get());
}
}
@ -111,7 +111,7 @@ bool PipelineExecutor::addEdges(UInt64 node)
{
const IProcessor * proc = &it->getInputPort().getProcessor();
auto input_port_number = proc->getInputPortNumber(&it->getInputPort());
add_edge(*it, proc, graph[node].directEdges, false, input_port_number, from_output, &graph[node].post_updated_output_ports);
add_edge(*it, proc, graph[node].directEdges, false, input_port_number, from_output, graph[node].post_updated_output_ports.get());
}
}
@ -221,7 +221,7 @@ bool PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid)
if (addEdges(node))
{
std::lock_guard guard(graph[node].status_mutex);
std::lock_guard guard(*graph[node].status_mutex);
for (; num_back_edges < graph[node].backEdges.size(); ++num_back_edges)
graph[node].updated_input_ports.emplace_back(num_back_edges);
@ -246,7 +246,7 @@ bool PipelineExecutor::tryAddProcessorToStackIfUpdated(Edge & edge, Queue & queu
auto & node = graph[edge.to];
std::unique_lock lock(node.status_mutex);
std::unique_lock lock(*node.status_mutex);
ExecStatus status = node.status;
@ -340,22 +340,22 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue
}
{
for (auto & edge_id : node.post_updated_input_ports)
for (auto & edge_id : *node.post_updated_input_ports)
{
auto edge = static_cast<Edge *>(edge_id);
updated_back_edges.emplace_back(edge);
edge->update_info.trigger();
}
for (auto & edge_id : node.post_updated_output_ports)
for (auto & edge_id : *node.post_updated_output_ports)
{
auto edge = static_cast<Edge *>(edge_id);
updated_direct_edges.emplace_back(edge);
edge->update_info.trigger();
}
node.post_updated_input_ports.clear();
node.post_updated_output_ports.clear();
node.post_updated_input_ports->clear();
node.post_updated_output_ports->clear();
}
}
@ -402,7 +402,7 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue
while (!stack.empty())
{
auto item = stack.top();
if (!prepareProcessor(item, thread_number, queue, std::unique_lock<std::mutex>(graph[item].status_mutex)))
if (!prepareProcessor(item, thread_number, queue, std::unique_lock<std::mutex>(*graph[item].status_mutex)))
return false;
stack.pop();
@ -519,7 +519,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
auto prepare_processor = [&](UInt64 pid, Queue & queue)
{
if (!prepareProcessor(pid, thread_num, queue, std::unique_lock<std::mutex>(graph[pid].status_mutex)))
if (!prepareProcessor(pid, thread_num, queue, std::unique_lock<std::mutex>(*graph[pid].status_mutex)))
finish();
};
@ -729,7 +729,7 @@ void PipelineExecutor::executeImpl(size_t num_threads)
UInt64 proc = stack.top();
stack.pop();
prepareProcessor(proc, 0, queue, std::unique_lock<std::mutex>(graph[proc].status_mutex));
prepareProcessor(proc, 0, queue, std::unique_lock<std::mutex>(*graph[proc].status_mutex));
while (!queue.empty())
{

View File

@ -104,10 +104,10 @@ private:
Edges backEdges;
ExecStatus status;
std::mutex status_mutex;
std::unique_ptr<std::mutex> status_mutex;
std::vector<void *> post_updated_input_ports;
std::vector<void *> post_updated_output_ports;
std::unique_ptr<Port::UpdateInfo::UpdateList> post_updated_input_ports;
std::unique_ptr<Port::UpdateInfo::UpdateList> post_updated_output_ports;
/// Last state for profiling.
IProcessor::Status last_processor_status = IProcessor::Status::NeedData;
@ -124,12 +124,10 @@ private:
execution_state->processor = processor;
execution_state->processors_id = processor_id;
execution_state->has_quota = processor->hasQuota();
}
Node(Node && other) noexcept
: processor(other.processor), status(other.status)
, execution_state(std::move(other.execution_state))
{
status_mutex = std::make_unique<std::mutex>();
post_updated_input_ports = std::make_unique<Port::UpdateInfo::UpdateList>();
post_updated_output_ports = std::make_unique<Port::UpdateInfo::UpdateList>();
}
};

View File

@ -30,7 +30,9 @@ class Port
public:
struct UpdateInfo
{
std::vector<void *> * update_list = nullptr;
using UpdateList = std::vector<void *>;
UpdateList * update_list = nullptr;
void * id = nullptr;
UInt64 version = 0;
UInt64 prev_version = 0;

View File

@ -0,0 +1,10 @@
8
8
8
8
8
8
8
8
8
8

View File

@ -0,0 +1,117 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
set -o errexit
set -o pipefail
echo "
DROP TABLE if exists tableA;
DROP TABLE if exists tableB;
create table tableA (id UInt64, col1 UInt64, colDate Date) engine = ReplacingMergeTree(colDate, id, 8192);
create table tableB (id UInt64, Aid UInt64, colDate Date) engine = ReplacingMergeTree(colDate, id, 8192);
insert into tableA select number, number % 10, addDays(toDate('2020-01-01'), - number % 1000) from numbers(100000);
insert into tableB select number, number % 100000, addDays(toDate('2020-01-01'), number % 90) from numbers(50000000);
" | $CLICKHOUSE_CLIENT -n
for i in {1..10}; do echo "
SELECT tableName
FROM
(
SELECT
col1,
'T1_notJoin1' AS tableName,
count(*) AS c
FROM tableA
GROUP BY col1
UNION ALL
SELECT
a.col1,
'T2_filteredAfterJoin1' AS tableName,
count(*) AS c
FROM tableB AS b
INNER JOIN tableA AS a ON a.id = b.Aid
WHERE b.colDate = '2020-01-01'
GROUP BY a.col1
UNION ALL
SELECT
a.col1,
'T3_filteredAfterJoin2' AS tableName,
count(*) AS c
FROM tableB AS b
INNER JOIN
tableA AS a
ON a.id = b.Aid
WHERE b.colDate = '2020-01-02'
GROUP BY a.col1
UNION ALL
SELECT
a.col1,
'T4_filteredBeforeJoin1' AS tableName,
count(*) AS c
FROM tableA AS a
INNER JOIN
(
SELECT
Aid
FROM tableB
WHERE colDate = '2020-01-01'
) AS b ON a.id = b.Aid
GROUP BY a.col1
UNION ALL
SELECT
a.col1,
'T5_filteredBeforeJoin2' AS tableName,
count(*) AS c
FROM tableA AS a
INNER JOIN
(
SELECT
Aid
FROM tableB
WHERE colDate = '2020-01-02'
) AS b ON a.id = b.Aid
GROUP BY a.col1
UNION ALL
SELECT
a.col1,
'T6_filteredAfterJoin3' AS tableName,
count(*) AS c
FROM tableB AS b
INNER JOIN tableA AS a ON a.id = b.Aid
WHERE b.colDate = '2020-01-03'
GROUP BY a.col1
UNION ALL
SELECT
col1,
'T7_notJoin2' AS tableName,
count(*) AS c
FROM tableA
GROUP BY col1
UNION ALL
SELECT
a.col1,
'T8_filteredBeforeJoin3' AS tableName,
count(*) AS c
FROM tableA AS a
INNER JOIN
(
SELECT
Aid
FROM tableB
WHERE colDate = '2020-01-03'
) AS b ON a.id = b.Aid
GROUP BY a.col1
) AS a
GROUP BY tableName
ORDER BY tableName ASC;
" | $CLICKHOUSE_CLIENT -n | wc -l ; done;
echo "
DROP TABLE tableA;
DROP TABLE tableB;
" | $CLICKHOUSE_CLIENT -n