mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Merge branch 'master' into fix-long-test-select_projection_normal_agg
This commit is contained in:
commit
1347bdac0a
@ -110,23 +110,4 @@ ThreadGroupStatusPtr CurrentThread::getGroup()
|
||||
return current_thread->getThreadGroup();
|
||||
}
|
||||
|
||||
/// Returns the first MemoryTracker at VariableContext::User level found among the
/// ancestors of the current thread's memory tracker, or nullptr if there is none
/// (or if current_thread is not set).
MemoryTracker * CurrentThread::getUserMemoryTracker()
{
    if (unlikely(!current_thread))
        return nullptr;

    /// Start from the parent of the thread's own tracker and climb the parent chain.
    for (auto * candidate = current_thread->memory_tracker.getParent(); candidate; candidate = candidate->getParent())
    {
        if (candidate->level == VariableContext::User)
            return candidate;
    }

    /// The chain ended without reaching a User-level tracker.
    return nullptr;
}
|
||||
|
||||
void CurrentThread::flushUntrackedMemory()
|
||||
{
|
||||
if (unlikely(!current_thread))
|
||||
return;
|
||||
current_thread->flushUntrackedMemory();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -40,12 +40,6 @@ public:
|
||||
/// Group to which belongs current thread
|
||||
static ThreadGroupStatusPtr getGroup();
|
||||
|
||||
/// MemoryTracker for user that owns current thread if any
|
||||
static MemoryTracker * getUserMemoryTracker();
|
||||
|
||||
/// Adjust counters in MemoryTracker hierarchy if untracked_memory is not 0.
|
||||
static void flushUntrackedMemory();
|
||||
|
||||
/// A logs queue used by TCPHandler to pass logs to a client
|
||||
static void attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue,
|
||||
LogsLevel client_logs_level);
|
||||
|
@ -18,7 +18,6 @@
|
||||
#include <Parsers/ASTInsertQuery.h>
|
||||
#include <Parsers/queryToString.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/SipHash.h>
|
||||
#include <Common/FieldVisitorHash.h>
|
||||
#include <Common/DateLUT.h>
|
||||
@ -104,10 +103,9 @@ bool AsynchronousInsertQueue::InsertQuery::operator==(const InsertQuery & other)
|
||||
return query_str == other.query_str && settings == other.settings;
|
||||
}
|
||||
|
||||
AsynchronousInsertQueue::InsertData::Entry::Entry(String && bytes_, String && query_id_, MemoryTracker * user_memory_tracker_)
|
||||
AsynchronousInsertQueue::InsertData::Entry::Entry(String && bytes_, String && query_id_)
|
||||
: bytes(std::move(bytes_))
|
||||
, query_id(std::move(query_id_))
|
||||
, user_memory_tracker(user_memory_tracker_)
|
||||
, create_time(std::chrono::system_clock::now())
|
||||
{
|
||||
}
|
||||
@ -236,7 +234,7 @@ AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
|
||||
if (auto quota = query_context->getQuota())
|
||||
quota->used(QuotaType::WRITTEN_BYTES, bytes.size());
|
||||
|
||||
auto entry = std::make_shared<InsertData::Entry>(std::move(bytes), query_context->getCurrentQueryId(), CurrentThread::getUserMemoryTracker());
|
||||
auto entry = std::make_shared<InsertData::Entry>(std::move(bytes), query_context->getCurrentQueryId());
|
||||
|
||||
InsertQuery key{query, settings};
|
||||
InsertDataPtr data_to_process;
|
||||
|
@ -1,7 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <Parsers/IAST_fwd.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Core/Settings.h>
|
||||
#include <Poco/Logger.h>
|
||||
@ -60,31 +59,6 @@ private:
|
||||
UInt128 calculateHash() const;
|
||||
};
|
||||
|
||||
struct UserMemoryTrackerSwitcher
|
||||
{
|
||||
explicit UserMemoryTrackerSwitcher(MemoryTracker * new_tracker)
|
||||
{
|
||||
auto * thread_tracker = CurrentThread::getMemoryTracker();
|
||||
prev_untracked_memory = current_thread->untracked_memory;
|
||||
prev_memory_tracker_parent = thread_tracker->getParent();
|
||||
|
||||
current_thread->untracked_memory = 0;
|
||||
thread_tracker->setParent(new_tracker);
|
||||
}
|
||||
|
||||
~UserMemoryTrackerSwitcher()
|
||||
{
|
||||
CurrentThread::flushUntrackedMemory();
|
||||
auto * thread_tracker = CurrentThread::getMemoryTracker();
|
||||
|
||||
current_thread->untracked_memory = prev_untracked_memory;
|
||||
thread_tracker->setParent(prev_memory_tracker_parent);
|
||||
}
|
||||
|
||||
MemoryTracker * prev_memory_tracker_parent;
|
||||
Int64 prev_untracked_memory;
|
||||
};
|
||||
|
||||
struct InsertData
|
||||
{
|
||||
struct Entry
|
||||
@ -92,10 +66,9 @@ private:
|
||||
public:
|
||||
const String bytes;
|
||||
const String query_id;
|
||||
MemoryTracker * const user_memory_tracker;
|
||||
const std::chrono::time_point<std::chrono::system_clock> create_time;
|
||||
|
||||
Entry(String && bytes_, String && query_id_, MemoryTracker * user_memory_tracker_);
|
||||
Entry(String && bytes_, String && query_id_);
|
||||
|
||||
void finish(std::exception_ptr exception_ = nullptr);
|
||||
std::future<void> getFuture() { return promise.get_future(); }
|
||||
@ -106,19 +79,6 @@ private:
|
||||
std::atomic_bool finished = false;
|
||||
};
|
||||
|
||||
~InsertData()
{
    /// Each entry has to be released in the context of the user who ran the async insert.
    /// Different entries may belong to different users, so the parent of the current
    /// thread's MemoryTracker is switched separately for every entry: the switcher is
    /// created before the entry is removed and destroyed right after it.
    while (!entries.empty())
    {
        UserMemoryTrackerSwitcher switcher(entries.front()->user_memory_tracker);
        entries.pop_front();
    }
}
|
||||
|
||||
using EntryPtr = std::shared_ptr<Entry>;
|
||||
|
||||
std::list<EntryPtr> entries;
|
||||
|
@ -504,7 +504,10 @@ QueryPipeline InterpreterExplainQuery::executeImpl()
|
||||
auto pipe = QueryPipelineBuilder::getPipe(std::move(*pipeline), resources);
|
||||
const auto & processors = pipe.getProcessors();
|
||||
|
||||
printPipeline(processors, buf);
|
||||
if (settings.compact)
|
||||
printPipelineCompact(processors, buf, settings.query_pipeline_options.header);
|
||||
else
|
||||
printPipeline(processors, buf);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -12,10 +12,19 @@ ISourceStep::ISourceStep(DataStream output_stream_)
|
||||
/// Creates a new QueryPipelineBuilder, fills it through initializePipeline(),
/// sets this step as the QueryPlanStep of every entry in `processors` and
/// returns the builder.
/// NOTE(review): this listing looks like a rendered diff that interleaves removed and
/// added lines - the collector-based statements below contradict the comment that says
/// the collector is not needed. Confirm against the real file before relying on it.
QueryPipelineBuilderPtr ISourceStep::updatePipeline(QueryPipelineBuilders, const BuildQueryPipelineSettings & settings)
|
||||
{
|
||||
auto pipeline = std::make_unique<QueryPipelineBuilder>();
|
||||
QueryPipelineProcessorsCollector collector(*pipeline, this);
|
||||
|
||||
/// A `Source` step does not add new Processors to `pipeline->pipe` in
|
||||
/// `initializePipeline`; instead it assigns a newly created Pipe.
|
||||
/// The Processors for the step are added here, so we do not need to use
|
||||
/// `QueryPipelineProcessorsCollector` to collect Processors.
|
||||
initializePipeline(*pipeline, settings);
|
||||
auto added_processors = collector.detachProcessors();
|
||||
processors.insert(processors.end(), added_processors.begin(), added_processors.end());
|
||||
|
||||
/// However, QueryPlanStep must be set manually on the Processors,
|
||||
/// because it is used in `EXPLAIN PIPELINE`.
|
||||
for (auto & processor : processors)
|
||||
{
|
||||
processor->setQueryPlanStep(this);
|
||||
|
||||
}
|
||||
return pipeline;
|
||||
}
|
||||
|
||||
|
177
src/QueryPipeline/printPipeline.cpp
Normal file
177
src/QueryPipeline/printPipeline.cpp
Normal file
@ -0,0 +1,177 @@
|
||||
#include <QueryPipeline/printPipeline.h>
|
||||
#include <Processors/QueryPlan/IQueryPlanStep.h>
|
||||
#include <set>
|
||||
#include <map>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool with_header)
|
||||
{
|
||||
struct Node;
|
||||
|
||||
/// Group by processors name, QueryPlanStep and group in this step.
|
||||
struct Key
|
||||
{
|
||||
size_t group;
|
||||
IQueryPlanStep * step;
|
||||
std::string name;
|
||||
|
||||
auto getTuple() const { return std::forward_as_tuple(group, step, name); }
|
||||
|
||||
bool operator<(const Key & other) const
|
||||
{
|
||||
return getTuple() < other.getTuple();
|
||||
}
|
||||
};
|
||||
|
||||
/// Group ports by header.
|
||||
struct EdgeData
|
||||
{
|
||||
Block header;
|
||||
size_t count;
|
||||
};
|
||||
|
||||
using Edge = std::vector<EdgeData>;
|
||||
|
||||
struct Node
|
||||
{
|
||||
size_t id = 0;
|
||||
std::map<Node *, Edge> edges = {};
|
||||
std::vector<const IProcessor *> agents = {};
|
||||
};
|
||||
|
||||
std::map<Key, Node> graph;
|
||||
|
||||
auto get_key = [](const IProcessor & processor)
|
||||
{
|
||||
return Key{processor.getQueryPlanStepGroup(), processor.getQueryPlanStep(), processor.getName()};
|
||||
};
|
||||
|
||||
/// Fill nodes.
|
||||
for (const auto & processor : processors)
|
||||
{
|
||||
auto res = graph.emplace(get_key(*processor), Node());
|
||||
auto & node = res.first->second;
|
||||
node.agents.emplace_back(processor.get());
|
||||
|
||||
if (res.second)
|
||||
node.id = graph.size();
|
||||
}
|
||||
|
||||
Block empty_header;
|
||||
|
||||
/// Fill edges.
|
||||
for (const auto & processor : processors)
|
||||
{
|
||||
auto & from = graph[get_key(*processor)];
|
||||
|
||||
for (auto & port : processor->getOutputs())
|
||||
{
|
||||
if (!port.isConnected())
|
||||
continue;
|
||||
|
||||
auto & to = graph[get_key(port.getInputPort().getProcessor())];
|
||||
auto & edge = from.edges[&to];
|
||||
|
||||
/// Use empty header for each edge if with_header is false.
|
||||
const auto & header = with_header ? port.getHeader()
|
||||
: empty_header;
|
||||
|
||||
/// Group by header.
|
||||
bool found = false;
|
||||
for (auto & item : edge)
|
||||
{
|
||||
if (blocksHaveEqualStructure(header, item.header))
|
||||
{
|
||||
found = true;
|
||||
++item.count;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!found)
|
||||
edge.emplace_back(EdgeData{header, 1});
|
||||
}
|
||||
}
|
||||
|
||||
/// Group processors by it's QueryPlanStep.
|
||||
std::map<IQueryPlanStep *, std::vector<const Node *>> steps_map;
|
||||
|
||||
for (const auto & item : graph)
|
||||
steps_map[item.first.step].emplace_back(&item.second);
|
||||
|
||||
out << "digraph\n{\n";
|
||||
out << " rankdir=\"LR\";\n";
|
||||
out << " { node [shape = rect]\n";
|
||||
|
||||
/// Nodes // TODO quoting and escaping
|
||||
size_t next_step = 0;
|
||||
for (const auto & item : steps_map)
|
||||
{
|
||||
/// Use separate clusters for each step.
|
||||
if (item.first != nullptr)
|
||||
{
|
||||
out << " subgraph cluster_" << next_step << " {\n";
|
||||
out << " label =\"" << item.first->getName() << "\";\n";
|
||||
out << " style=filled;\n";
|
||||
out << " color=lightgrey;\n";
|
||||
out << " node [style=filled,color=white];\n";
|
||||
out << " { rank = same;\n";
|
||||
|
||||
++next_step;
|
||||
}
|
||||
|
||||
for (const auto & node : item.second)
|
||||
{
|
||||
const auto & processor = node->agents.front();
|
||||
out << " n" << node->id << " [label=\"" << processor->getName();
|
||||
|
||||
if (node->agents.size() > 1)
|
||||
out << " × " << node->agents.size();
|
||||
|
||||
const auto & description = processor->getDescription();
|
||||
if (!description.empty())
|
||||
out << ' ' << description;
|
||||
|
||||
out << "\"];\n";
|
||||
}
|
||||
|
||||
if (item.first != nullptr)
|
||||
{
|
||||
out << " }\n";
|
||||
out << " }\n";
|
||||
}
|
||||
}
|
||||
|
||||
out << " }\n";
|
||||
|
||||
/// Edges
|
||||
for (const auto & item : graph)
|
||||
{
|
||||
for (const auto & edge : item.second.edges)
|
||||
{
|
||||
for (const auto & data : edge.second)
|
||||
{
|
||||
out << " n" << item.second.id << " -> " << "n" << edge.first->id << " [label=\"";
|
||||
|
||||
if (data.count > 1)
|
||||
out << "× " << data.count;
|
||||
|
||||
if (with_header)
|
||||
{
|
||||
for (const auto & elem : data.header)
|
||||
{
|
||||
out << "\n";
|
||||
elem.dumpStructure(out);
|
||||
}
|
||||
}
|
||||
|
||||
out << "\"];\n";
|
||||
}
|
||||
}
|
||||
}
|
||||
out << "}\n";
|
||||
}
|
||||
|
||||
}
|
@ -64,4 +64,9 @@ void printPipeline(const Processors & processors, WriteBuffer & out)
|
||||
printPipeline(processors, std::vector<IProcessor::Status>(), out);
|
||||
}
|
||||
|
||||
/// Prints the pipeline in a compact representation.
|
||||
/// Groups processors by their name, QueryPlanStep and QueryPlanStepGroup.
|
||||
/// If QueryPlanStep wasn't set for a processor, the representation may be incorrect.
|
||||
/// If with_header is set, prints the block header for each edge.
|
||||
void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool with_header);
|
||||
}
|
||||
|
@ -1,40 +0,0 @@
|
||||
import pytest
|
||||
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
node = cluster.add_instance("node")
|
||||
|
||||
|
||||
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    """Module-scoped, auto-used fixture: start the cluster, yield it, always shut it down."""
    try:
        cluster.start()
        yield cluster
    finally:
        # Runs even if startup or a test using the fixture raised.
        cluster.shutdown()
|
||||
|
||||
|
||||
def test_memory_usage():
    """Run ten large async inserts, then check that a query with a per-user memory limit succeeds.

    The table is created first and dropped at the end; the final SELECT is issued
    with max_memory_usage_for_user set and must finish with an empty error string.
    """
    node.query(
        "CREATE TABLE async_table(data Array(UInt64)) ENGINE=MergeTree() ORDER BY data"
    )

    # Request is issued but its result is never read in this test.
    node.get_query_request("SELECT count() FROM system.numbers")

    insert_template = "INSERT INTO async_table SETTINGS async_insert=1, wait_for_async_insert=1 VALUES ({})"
    batch_size = 5000000
    for batch_index in range(10):
        first_value = batch_index * batch_size
        batch_values = list(range(first_value, first_value + batch_size))
        node.query(insert_template.format(batch_values))

    user_memory_limit = 30 * (2**23)
    response = node.get_query_request(
        "SELECT groupArray(number) FROM numbers(1000000) SETTINGS max_memory_usage_for_user={}".format(
            user_memory_limit
        )
    )

    _, err = response.get_answer_and_error()
    assert err == "", "Query failed with error {}".format(err)

    node.query("DROP TABLE async_table")
|
@ -1,7 +1,12 @@
|
||||
-- The server does not crash after these queries:

DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (ID UInt64, name String) ENGINE = MergeTree ORDER BY ID;

INSERT INTO t1 (ID, name) VALUES (1, 'abc'), (2, 'bbb');

-- The returned node order is uncertain, so every result is sent to FORMAT Null.
EXPLAIN PIPELINE graph=1 SELECT count(ID) FROM t1 FORMAT Null;
EXPLAIN PIPELINE graph=1 SELECT sum(1) FROM t1 FORMAT Null;
EXPLAIN PIPELINE graph=1 SELECT min(ID) FROM t1 FORMAT Null;
EXPLAIN PIPELINE graph=1 SELECT max(ID) FROM t1 FORMAT Null;

DROP TABLE t1;
|
Loading…
Reference in New Issue
Block a user