Merge branch 'master' into fix-query-parameters

Author: Alexey Milovidov
Date:   2023-03-12 18:59:25 +01:00
Commit: b6169552b0
20 changed files with 239 additions and 132 deletions

View File

@@ -110,23 +110,4 @@ ThreadGroupStatusPtr CurrentThread::getGroup()
return current_thread->getThreadGroup();
}
MemoryTracker * CurrentThread::getUserMemoryTracker()
{
if (unlikely(!current_thread))
return nullptr;
auto * tracker = current_thread->memory_tracker.getParent();
while (tracker && tracker->level != VariableContext::User)
tracker = tracker->getParent();
return tracker;
}
void CurrentThread::flushUntrackedMemory()
{
if (unlikely(!current_thread))
return;
current_thread->flushUntrackedMemory();
}
}
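
For context, here is a minimal sketch (not part of this commit) of how the two helpers removed above were meant to be used together: capture the user-level tracker on the query thread, then temporarily re-parent a worker thread's tracker onto it, flushing the untracked delta around the switch. It mirrors the UserMemoryTrackerSwitcher removed further down in this diff; the variable names are illustrative.

/// Query thread: may return nullptr if there is no user-level tracker in the hierarchy.
MemoryTracker * user_tracker = CurrentThread::getUserMemoryTracker();

/// Worker thread, simplified from the removed switcher:
auto * thread_tracker = CurrentThread::getMemoryTracker();
auto * prev_parent = thread_tracker->getParent();
thread_tracker->setParent(user_tracker);   /// attribute subsequent allocations to that user
/// ... allocate / free on behalf of the user ...
CurrentThread::flushUntrackedMemory();     /// push the pending untracked delta to the new parent
thread_tracker->setParent(prev_parent);    /// restore the original parent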

View File

@@ -40,12 +40,6 @@ public:
/// Group to which belongs current thread
static ThreadGroupStatusPtr getGroup();
/// MemoryTracker for user that owns current thread if any
static MemoryTracker * getUserMemoryTracker();
/// Adjust counters in MemoryTracker hierarchy if untracked_memory is not 0.
static void flushUntrackedMemory();
/// A logs queue used by TCPHandler to pass logs to a client
static void attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue,
LogsLevel client_logs_level);

View File

@@ -18,7 +18,6 @@
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/queryToString.h>
#include <Storages/IStorage.h>
#include <Common/CurrentThread.h>
#include <Common/SipHash.h>
#include <Common/FieldVisitorHash.h>
#include <Common/DateLUT.h>
@@ -104,10 +103,9 @@ bool AsynchronousInsertQueue::InsertQuery::operator==(const InsertQuery & other)
return query_str == other.query_str && settings == other.settings;
}
AsynchronousInsertQueue::InsertData::Entry::Entry(String && bytes_, String && query_id_, MemoryTracker * user_memory_tracker_)
AsynchronousInsertQueue::InsertData::Entry::Entry(String && bytes_, String && query_id_)
: bytes(std::move(bytes_))
, query_id(std::move(query_id_))
, user_memory_tracker(user_memory_tracker_)
, create_time(std::chrono::system_clock::now())
{
}
@@ -236,7 +234,7 @@ AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
if (auto quota = query_context->getQuota())
quota->used(QuotaType::WRITTEN_BYTES, bytes.size());
auto entry = std::make_shared<InsertData::Entry>(std::move(bytes), query_context->getCurrentQueryId(), CurrentThread::getUserMemoryTracker());
auto entry = std::make_shared<InsertData::Entry>(std::move(bytes), query_context->getCurrentQueryId());
InsertQuery key{query, settings};
InsertDataPtr data_to_process;

View File

@@ -1,7 +1,6 @@
#pragma once
#include <Parsers/IAST_fwd.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadPool.h>
#include <Core/Settings.h>
#include <Poco/Logger.h>
@@ -60,31 +59,6 @@ private:
UInt128 calculateHash() const;
};
struct UserMemoryTrackerSwitcher
{
explicit UserMemoryTrackerSwitcher(MemoryTracker * new_tracker)
{
auto * thread_tracker = CurrentThread::getMemoryTracker();
prev_untracked_memory = current_thread->untracked_memory;
prev_memory_tracker_parent = thread_tracker->getParent();
current_thread->untracked_memory = 0;
thread_tracker->setParent(new_tracker);
}
~UserMemoryTrackerSwitcher()
{
CurrentThread::flushUntrackedMemory();
auto * thread_tracker = CurrentThread::getMemoryTracker();
current_thread->untracked_memory = prev_untracked_memory;
thread_tracker->setParent(prev_memory_tracker_parent);
}
MemoryTracker * prev_memory_tracker_parent;
Int64 prev_untracked_memory;
};
struct InsertData
{
struct Entry
@@ -92,10 +66,9 @@ private:
public:
const String bytes;
const String query_id;
MemoryTracker * const user_memory_tracker;
const std::chrono::time_point<std::chrono::system_clock> create_time;
Entry(String && bytes_, String && query_id_, MemoryTracker * user_memory_tracker_);
Entry(String && bytes_, String && query_id_);
void finish(std::exception_ptr exception_ = nullptr);
std::future<void> getFuture() { return promise.get_future(); }
@@ -106,19 +79,6 @@ private:
std::atomic_bool finished = false;
};
~InsertData()
{
auto it = entries.begin();
// Entries must be destroyed in context of user who runs async insert.
// Each entry in the list may correspond to a different user,
// so we need to switch current thread's MemoryTracker parent on each iteration.
while (it != entries.end())
{
UserMemoryTrackerSwitcher switcher((*it)->user_memory_tracker);
it = entries.erase(it);
}
}
using EntryPtr = std::shared_ptr<Entry>;
std::list<EntryPtr> entries;
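
As a hedged aside (also not from this diff), the removed switcher was usable for any scope that allocates or frees on behalf of a particular user, not only the destructor loop above; `entry` in this sketch is illustrative:

{
    UserMemoryTrackerSwitcher switcher(entry->user_memory_tracker);
    entry.reset();   /// memory released here is accounted against the entry's original user
}                    /// destructor flushes untracked memory and restores the previous parent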

View File

@@ -504,6 +504,9 @@ QueryPipeline InterpreterExplainQuery::executeImpl()
auto pipe = QueryPipelineBuilder::getPipe(std::move(*pipeline), resources);
const auto & processors = pipe.getProcessors();
if (settings.compact)
printPipelineCompact(processors, buf, settings.query_pipeline_options.header);
else
printPipeline(processors, buf);
}
else

View File

@@ -1192,7 +1192,7 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,
{
if (table_expression == left_table_expression)
{
query_plans_stack.push_back(std::move(left_table_expression_query_plan));
query_plans_stack.push_back(std::move(left_table_expression_query_plan)); /// NOLINT
left_table_expression = {};
continue;
}

View File

@@ -12,10 +12,19 @@ ISourceStep::ISourceStep(DataStream output_stream_)
QueryPipelineBuilderPtr ISourceStep::updatePipeline(QueryPipelineBuilders, const BuildQueryPipelineSettings & settings)
{
auto pipeline = std::make_unique<QueryPipelineBuilder>();
QueryPipelineProcessorsCollector collector(*pipeline, this);
/// A `Source` step does not add new processors to `pipeline->pipe` in
/// `initializePipeline`; it assigns a newly created Pipe instead, and the
/// step's processors are added there. So we do not need a
/// `QueryPipelineProcessorsCollector` to collect them.
initializePipeline(*pipeline, settings);
auto added_processors = collector.detachProcessors();
processors.insert(processors.end(), added_processors.begin(), added_processors.end());
/// But we still need to set the QueryPlanStep for these processors manually;
/// it is used by `EXPLAIN PIPELINE`.
for (auto & processor : processors)
{
processor->setQueryPlanStep(this);
}
return pipeline;
}
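
To illustrate the comments above, a hypothetical derived source step could look roughly like this (the class and processor names are assumptions, not code from this commit): `initializePipeline` assigns a freshly built Pipe rather than appending processors to an existing one, which is why `updatePipeline` has to tag the step on the processors afterwards.

/// Hypothetical example only; how the step's `processors` member is populated is outside this sketch.
void ReadFromMySource::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
    auto source = std::make_shared<MySourceProcessor>(getOutputStream().header);
    pipeline.init(Pipe(std::move(source)));   /// replaces the pipe instead of adding to it
}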

View File

@@ -0,0 +1,177 @@
#include <QueryPipeline/printPipeline.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <set>
#include <map>
namespace DB
{
void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool with_header)
{
struct Node;
/// Group processors by name, QueryPlanStep and group within that step.
struct Key
{
size_t group;
IQueryPlanStep * step;
std::string name;
auto getTuple() const { return std::forward_as_tuple(group, step, name); }
bool operator<(const Key & other) const
{
return getTuple() < other.getTuple();
}
};
/// Group ports by header.
struct EdgeData
{
Block header;
size_t count;
};
using Edge = std::vector<EdgeData>;
struct Node
{
size_t id = 0;
std::map<Node *, Edge> edges = {};
std::vector<const IProcessor *> agents = {};
};
std::map<Key, Node> graph;
auto get_key = [](const IProcessor & processor)
{
return Key{processor.getQueryPlanStepGroup(), processor.getQueryPlanStep(), processor.getName()};
};
/// Fill nodes.
for (const auto & processor : processors)
{
auto res = graph.emplace(get_key(*processor), Node());
auto & node = res.first->second;
node.agents.emplace_back(processor.get());
if (res.second)
node.id = graph.size();
}
Block empty_header;
/// Fill edges.
for (const auto & processor : processors)
{
auto & from = graph[get_key(*processor)];
for (auto & port : processor->getOutputs())
{
if (!port.isConnected())
continue;
auto & to = graph[get_key(port.getInputPort().getProcessor())];
auto & edge = from.edges[&to];
/// Use empty header for each edge if with_header is false.
const auto & header = with_header ? port.getHeader()
: empty_header;
/// Group by header.
bool found = false;
for (auto & item : edge)
{
if (blocksHaveEqualStructure(header, item.header))
{
found = true;
++item.count;
break;
}
}
if (!found)
edge.emplace_back(EdgeData{header, 1});
}
}
/// Group processors by their QueryPlanStep.
std::map<IQueryPlanStep *, std::vector<const Node *>> steps_map;
for (const auto & item : graph)
steps_map[item.first.step].emplace_back(&item.second);
out << "digraph\n{\n";
out << " rankdir=\"LR\";\n";
out << " { node [shape = rect]\n";
/// Nodes // TODO quoting and escaping
size_t next_step = 0;
for (const auto & item : steps_map)
{
/// Use separate clusters for each step.
if (item.first != nullptr)
{
out << " subgraph cluster_" << next_step << " {\n";
out << " label =\"" << item.first->getName() << "\";\n";
out << " style=filled;\n";
out << " color=lightgrey;\n";
out << " node [style=filled,color=white];\n";
out << " { rank = same;\n";
++next_step;
}
for (const auto & node : item.second)
{
const auto & processor = node->agents.front();
out << " n" << node->id << " [label=\"" << processor->getName();
if (node->agents.size() > 1)
out << " × " << node->agents.size();
const auto & description = processor->getDescription();
if (!description.empty())
out << ' ' << description;
out << "\"];\n";
}
if (item.first != nullptr)
{
out << " }\n";
out << " }\n";
}
}
out << " }\n";
/// Edges
for (const auto & item : graph)
{
for (const auto & edge : item.second.edges)
{
for (const auto & data : edge.second)
{
out << " n" << item.second.id << " -> " << "n" << edge.first->id << " [label=\"";
if (data.count > 1)
out << "× " << data.count;
if (with_header)
{
for (const auto & elem : data.header)
{
out << "\n";
elem.dumpStructure(out);
}
}
out << "\"];\n";
}
}
}
out << "}\n";
}
}

View File

@@ -64,4 +64,9 @@ void printPipeline(const Processors & processors, WriteBuffer & out)
printPipeline(processors, std::vector<IProcessor::Status>(), out);
}
/// Prints the pipeline in a compact representation.
/// Groups processors by their name, QueryPlanStep and QueryPlanStepGroup.
/// If the QueryPlanStep wasn't set for a processor, the representation may be incorrect.
/// If with_header is set, prints the block header for each edge.
void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool with_header);
}
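
A hedged usage sketch of the new helper (not part of this diff); `WriteBufferFromOwnString` is the usual in-memory WriteBuffer, and `pipe` is assumed to be an already-built Pipe:

#include <IO/WriteBufferFromString.h>
#include <QueryPipeline/printPipeline.h>

DB::WriteBufferFromOwnString buf;
DB::printPipelineCompact(pipe.getProcessors(), buf, /* with_header = */ false);
/// buf.str() now holds a digraph { ... } description with processors grouped by
/// step and name, and edges labelled only with multiplicities (headers omitted).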

View File

@@ -4121,9 +4121,9 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const Contex
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception(
ErrorCodes::TOO_MANY_PARTS,
"Too many parts ({}) in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified "
"Too many parts ({}) in all partitions in total in table '{}'. This indicates wrong choice of partition key. The threshold can be modified "
"with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.",
parts_count_in_total);
parts_count_in_total, getLogName());
}
size_t outdated_parts_over_threshold = 0;
@@ -4137,8 +4137,8 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const Contex
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception(
ErrorCodes::TOO_MANY_PARTS,
"Too many inactive parts ({}). Parts cleaning are processing significantly slower than inserts",
outdated_parts_count_in_partition);
"Too many inactive parts ({}) in table '{}'. Parts cleaning are processing significantly slower than inserts",
outdated_parts_count_in_partition, getLogName());
}
if (settings->inactive_parts_to_delay_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_delay_insert)
outdated_parts_over_threshold = outdated_parts_count_in_partition - settings->inactive_parts_to_delay_insert + 1;
@@ -4151,6 +4151,7 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const Contex
const auto active_parts_to_throw_insert
= query_settings.parts_to_throw_insert ? query_settings.parts_to_throw_insert : settings->parts_to_throw_insert;
size_t active_parts_over_threshold = 0;
{
bool parts_are_large_enough_in_average
= settings->max_avg_part_size_for_too_many_parts && average_part_size > settings->max_avg_part_size_for_too_many_parts;
@@ -4160,9 +4161,10 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const Contex
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception(
ErrorCodes::TOO_MANY_PARTS,
"Too many parts ({} with average size of {}). Merges are processing significantly slower than inserts",
"Too many parts ({} with average size of {}) in table '{}'. Merges are processing significantly slower than inserts",
parts_count_in_partition,
ReadableSize(average_part_size));
ReadableSize(average_part_size),
getLogName());
}
if (active_parts_to_delay_insert > 0 && parts_count_in_partition >= active_parts_to_delay_insert
&& !parts_are_large_enough_in_average)

View File

@@ -1,40 +0,0 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node")
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_memory_usage():
node.query(
"CREATE TABLE async_table(data Array(UInt64)) ENGINE=MergeTree() ORDER BY data"
)
node.get_query_request("SELECT count() FROM system.numbers")
INSERT_QUERY = "INSERT INTO async_table SETTINGS async_insert=1, wait_for_async_insert=1 VALUES ({})"
for iter in range(10):
values = list(range(iter * 5000000, (iter + 1) * 5000000))
node.query(INSERT_QUERY.format(values))
response = node.get_query_request(
"SELECT groupArray(number) FROM numbers(1000000) SETTINGS max_memory_usage_for_user={}".format(
30 * (2**23)
)
)
_, err = response.get_answer_and_error()
assert err == "", "Query failed with error {}".format(err)
node.query("DROP TABLE async_table")

View File

@@ -13,15 +13,24 @@ $CLICKHOUSE_CLIENT --query="CREATE TABLE small_table (a UInt64 default 0, n UInt
$CLICKHOUSE_CLIENT --query="INSERT INTO small_table (n) SELECT * from system.numbers limit 100000;"
$CLICKHOUSE_CLIENT --query="OPTIMIZE TABLE small_table FINAL;"
cached_query="SELECT count() FROM small_table where n > 0;"
cached_query="SELECT count() FROM small_table WHERE n > 0;"
$CLICKHOUSE_CLIENT --use_uncompressed_cache=1 --query="$cached_query" &> /dev/null
$CLICKHOUSE_CLIENT --use_uncompressed_cache=1 --allow_prefetched_read_pool_for_remote_filesystem=0 --allow_prefetched_read_pool_for_local_filesystem=0 --query_id="test-query-uncompressed-cache" --query="$cached_query" &> /dev/null
$CLICKHOUSE_CLIENT --log_queries 1 --use_uncompressed_cache 1 --query="$cached_query"
$CLICKHOUSE_CLIENT --log_queries 1 --use_uncompressed_cache 1 --allow_prefetched_read_pool_for_remote_filesystem 0 --allow_prefetched_read_pool_for_local_filesystem 0 --query_id="test-query-uncompressed-cache" --query="$cached_query"
$CLICKHOUSE_CLIENT --query="SYSTEM FLUSH LOGS"
$CLICKHOUSE_CLIENT --query="SELECT ProfileEvents['Seek'], ProfileEvents['ReadCompressedBytes'], ProfileEvents['UncompressedCacheHits'] AS hit FROM system.query_log WHERE (query_id = 'test-query-uncompressed-cache') and current_database = currentDatabase() AND (type = 2) AND event_date >= yesterday() ORDER BY event_time DESC LIMIT 1"
$CLICKHOUSE_CLIENT --query="
SELECT
ProfileEvents['Seek'],
ProfileEvents['ReadCompressedBytes'],
ProfileEvents['UncompressedCacheHits'] AS hit
FROM system.query_log
WHERE query_id = 'test-query-uncompressed-cache'
AND current_database = currentDatabase()
AND type = 2
AND event_date >= yesterday()
ORDER BY event_time DESC
LIMIT 1"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS small_table"

View File

@@ -16,7 +16,7 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do
ORDER BY key
SETTINGS storage_policy='$STORAGE_POLICY', min_bytes_for_wide_part = 10485760"
$CLICKHOUSE_CLIENT --query "SYSTEM STOP MERGES"
$CLICKHOUSE_CLIENT --query "SYSTEM STOP MERGES test_02286"
$CLICKHOUSE_CLIENT --query "SYSTEM DROP FILESYSTEM CACHE"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache"

View File

@@ -11,7 +11,8 @@ CREATE TABLE video_log
)
ENGINE = MergeTree
PARTITION BY toDate(datetime)
ORDER BY (user_id, device_id);
ORDER BY (user_id, device_id)
SETTINGS index_granularity = 8192, index_granularity_bytes = '10Mi';
DROP TABLE IF EXISTS rng;
@@ -57,7 +58,8 @@ CREATE TABLE video_log_result
)
ENGINE = MergeTree
PARTITION BY toDate(hour)
ORDER BY sum_bytes;
ORDER BY sum_bytes
SETTINGS index_granularity = 8192, index_granularity_bytes = '10Mi';
INSERT INTO video_log_result SELECT
toStartOfHour(datetime) AS hour,

View File

@@ -1,7 +1,12 @@
-- The server does not crash after these queries:
DROP TABLE IF EXISTS t1;
CREATE TABLE t1(ID UInt64, name String) engine=MergeTree order by ID;
insert into t1(ID, name) values (1, 'abc'), (2, 'bbb');
-- The returned node order is uncertain
explain pipeline graph=1 select count(ID) from t1 FORMAT Null;
explain pipeline graph=1 select sum(1) from t1 FORMAT Null;
explain pipeline graph=1 select min(ID) from t1 FORMAT Null;
explain pipeline graph=1 select max(ID) from t1 FORMAT Null;
DROP TABLE t1;