Merge branch 'master' into pamarcos/copy_comment_in_create_as_statement

This commit is contained in:
Pablo Marcos 2024-04-08 08:10:43 +02:00
commit 04f819a124
20 changed files with 198 additions and 56 deletions

View File

@ -507,16 +507,18 @@ Example:
``` xml
<http_handlers>
<rule>
<url><![CDATA[/query_param_with_url/\w+/(?P<name_1>[^/]+)(/(?P<name_2>[^/]+))?]]></url>
<url><![CDATA[regex:/query_param_with_url/(?P<name_1>[^/]+)]]></url>
<methods>GET</methods>
<headers>
<XXX>TEST_HEADER_VALUE</XXX>
<PARAMS_XXX><![CDATA[(?P<name_1>[^/]+)(/(?P<name_2>[^/]+))?]]></PARAMS_XXX>
<PARAMS_XXX><![CDATA[regex:(?P<name_2>[^/]+)]]></PARAMS_XXX>
</headers>
<handler>
<type>predefined_query_handler</type>
<query>SELECT value FROM system.settings WHERE name = {name_1:String}</query>
<query>SELECT name, value FROM system.settings WHERE name = {name_2:String}</query>
<query>
SELECT name, value FROM system.settings
WHERE name IN ({name_1:String}, {name_2:String})
</query>
</handler>
</rule>
<defaults/>
@ -524,13 +526,13 @@ Example:
```
``` bash
$ curl -H 'XXX:TEST_HEADER_VALUE' -H 'PARAMS_XXX:max_threads' 'http://localhost:8123/query_param_with_url/1/max_threads/max_final_threads?max_threads=1&max_final_threads=2'
1
max_final_threads 2
$ curl -H 'XXX:TEST_HEADER_VALUE' -H 'PARAMS_XXX:max_final_threads' 'http://localhost:8123/query_param_with_url/max_threads?max_threads=1&max_final_threads=2'
max_final_threads 2
max_threads 1
```
:::note
In one `predefined_query_handler` only supports one `query` of an insert type.
In one `predefined_query_handler` only one `query` is supported.
:::
### dynamic_query_handler {#dynamic_query_handler}

View File

@ -434,16 +434,18 @@ $ curl -v 'http://localhost:8123/predefined_query'
``` xml
<http_handlers>
<rule>
<url><![CDATA[regex:/query_param_with_url/\w+/(?P<name_1>[^/]+)(/(?P<name_2>[^/]+))?]]></url>
<url><![CDATA[regex:/query_param_with_url/(?P<name_1>[^/]+)]]></url>
<methods>GET</methods>
<headers>
<XXX>TEST_HEADER_VALUE</XXX>
<PARAMS_XXX><![CDATA[(?P<name_1>[^/]+)(/(?P<name_2>[^/]+))?]]></PARAMS_XXX>
<PARAMS_XXX><![CDATA[regex:(?P<name_2>[^/]+)]]></PARAMS_XXX>
</headers>
<handler>
<type>predefined_query_handler</type>
<query>SELECT value FROM system.settings WHERE name = {name_1:String}</query>
<query>SELECT name, value FROM system.settings WHERE name = {name_2:String}</query>
<query>
SELECT name, value FROM system.settings
WHERE name IN ({name_1:String}, {name_2:String})
</query>
</handler>
</rule>
<defaults/>
@ -451,13 +453,13 @@ $ curl -v 'http://localhost:8123/predefined_query'
```
``` bash
$ curl -H 'XXX:TEST_HEADER_VALUE' -H 'PARAMS_XXX:max_threads' 'http://localhost:8123/query_param_with_url/1/max_threads/max_final_threads?max_threads=1&max_final_threads=2'
1
max_final_threads 2
$ curl -H 'XXX:TEST_HEADER_VALUE' -H 'PARAMS_XXX:max_final_threads' 'http://localhost:8123/query_param_with_url/max_threads?max_threads=1&max_final_threads=2'
max_final_threads 2
max_threads 1
```
:::note Предупреждение
В одном `predefined_query_handler` поддерживается только один запрос типа `INSERT`.
В одном `predefined_query_handler` поддерживается только один запрос.
:::
### dynamic_query_handler {#dynamic_query_handler}

View File

@ -427,29 +427,32 @@ $ curl -v 'http://localhost:8123/predefined_query'
``` xml
<http_handlers>
<rule>
<url><![CDATA[/query_param_with_url/\w+/(?P<name_1>[^/]+)(/(?P<name_2>[^/]+))?]]></url>
<method>GET</method>
<url><![CDATA[regex:/query_param_with_url/(?P<name_1>[^/]+)]]></url>
<methods>GET</methods>
<headers>
<XXX>TEST_HEADER_VALUE</XXX>
<PARAMS_XXX><![CDATA[(?P<name_1>[^/]+)(/(?P<name_2>[^/]+))?]]></PARAMS_XXX>
<PARAMS_XXX><![CDATA[regex:(?P<name_2>[^/]+)]]></PARAMS_XXX>
</headers>
<handler>
<type>predefined_query_handler</type>
<query>SELECT value FROM system.settings WHERE name = {name_1:String}</query>
<query>SELECT name, value FROM system.settings WHERE name = {name_2:String}</query>
<query>
SELECT name, value FROM system.settings
WHERE name IN ({name_1:String}, {name_2:String})
</query>
</handler>
</rule>
<defaults/>
</http_handlers>
```
``` bash
$ curl -H 'XXX:TEST_HEADER_VALUE' -H 'PARAMS_XXX:max_threads' 'http://localhost:8123/query_param_with_url/1/max_threads/max_final_threads?max_threads=1&max_final_threads=2'
1
max_final_threads 2
$ curl -H 'XXX:TEST_HEADER_VALUE' -H 'PARAMS_XXX:max_final_threads' 'http://localhost:8123/query_param_with_url/max_threads?max_threads=1&max_final_threads=2'
max_final_threads 2
max_threads 1
```
:::warning
在一个`predefined_query_handler`中,只支持insert类型的一个`查询`。
在一个`predefined_query_handler`中,只支持的一个`查询`。
:::
### 动态查询 {#dynamic_query_handler}

View File

@ -662,7 +662,6 @@ int mainEntryClickHouseInstall(int argc, char ** argv)
" <server>\n"
" <certificateFile>" << (config_dir / "server.crt").string() << "</certificateFile>\n"
" <privateKeyFile>" << (config_dir / "server.key").string() << "</privateKeyFile>\n"
" <dhParamsFile>" << (config_dir / "dhparam.pem").string() << "</dhParamsFile>\n"
" </server>\n"
" </openSSL>\n"
"</clickhouse>\n";

View File

@ -734,13 +734,17 @@ try
LOG_INFO(log, "Available CPU instruction sets: {}", cpu_info);
#endif
bool will_have_trace_collector = hasPHDRCache() && config().has("trace_log");
// Initialize global thread pool. Do it before we fetch configs from zookeeper
// nodes (`from_zk`), because ZooKeeper interface uses the pool. We will
// ignore `max_thread_pool_size` in configs we fetch from ZK, but oh well.
GlobalThreadPool::initialize(
server_settings.max_thread_pool_size,
server_settings.max_thread_pool_free_size,
server_settings.thread_pool_queue_size);
server_settings.thread_pool_queue_size,
will_have_trace_collector ? server_settings.global_profiler_real_time_period_ns : 0,
will_have_trace_collector ? server_settings.global_profiler_cpu_time_period_ns : 0);
/// Wait for all threads to avoid possible use-after-free (for example logging objects can be already destroyed).
SCOPE_EXIT({
Stopwatch watch;

View File

@ -211,23 +211,13 @@ void Timer::cleanup()
#endif
template <typename ProfilerImpl>
QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(UInt64 thread_id, int clock_type, UInt32 period, int pause_signal_)
QueryProfilerBase<ProfilerImpl>::QueryProfilerBase([[maybe_unused]] UInt64 thread_id, [[maybe_unused]] int clock_type, [[maybe_unused]] UInt32 period, [[maybe_unused]] int pause_signal_)
: log(getLogger("QueryProfiler"))
, pause_signal(pause_signal_)
{
#if defined(SANITIZER)
UNUSED(thread_id);
UNUSED(clock_type);
UNUSED(period);
UNUSED(pause_signal);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "QueryProfiler disabled because they cannot work under sanitizers");
#elif defined(__APPLE__)
UNUSED(thread_id);
UNUSED(clock_type);
UNUSED(period);
UNUSED(pause_signal);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "QueryProfiler cannot work on OSX");
#else
/// Sanity check.
@ -261,6 +251,20 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(UInt64 thread_id, int clock_t
#endif
}
template <typename ProfilerImpl>
void QueryProfilerBase<ProfilerImpl>::setPeriod([[maybe_unused]] UInt32 period_)
{
#if defined(SANITIZER)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "QueryProfiler disabled because they cannot work under sanitizers");
#elif defined(__APPLE__)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "QueryProfiler cannot work on OSX");
#else
timer.set(period_);
#endif
}
template <typename ProfilerImpl>
QueryProfilerBase<ProfilerImpl>::~QueryProfilerBase()
{

View File

@ -57,6 +57,8 @@ public:
QueryProfilerBase(UInt64 thread_id, int clock_type, UInt32 period, int pause_signal_);
~QueryProfilerBase();
void setPeriod(UInt32 period_);
private:
void cleanup();

View File

@ -490,8 +490,9 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
template class ThreadPoolImpl<std::thread>;
template class ThreadPoolImpl<ThreadFromGlobalPoolImpl<false>>;
template class ThreadFromGlobalPoolImpl<true>;
template class ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>;
template class ThreadFromGlobalPoolImpl<true, true>;
template class ThreadFromGlobalPoolImpl<true, false>;
std::unique_ptr<GlobalThreadPool> GlobalThreadPool::the_instance;
@ -500,7 +501,9 @@ GlobalThreadPool::GlobalThreadPool(
size_t max_threads_,
size_t max_free_threads_,
size_t queue_size_,
const bool shutdown_on_exception_)
const bool shutdown_on_exception_,
UInt64 global_profiler_real_time_period_ns_,
UInt64 global_profiler_cpu_time_period_ns_)
: FreeThreadPool(
CurrentMetrics::GlobalThread,
CurrentMetrics::GlobalThreadActive,
@ -509,10 +512,12 @@ GlobalThreadPool::GlobalThreadPool(
max_free_threads_,
queue_size_,
shutdown_on_exception_)
, global_profiler_real_time_period_ns(global_profiler_real_time_period_ns_)
, global_profiler_cpu_time_period_ns(global_profiler_cpu_time_period_ns_)
{
}
void GlobalThreadPool::initialize(size_t max_threads, size_t max_free_threads, size_t queue_size)
void GlobalThreadPool::initialize(size_t max_threads, size_t max_free_threads, size_t queue_size, UInt64 global_profiler_real_time_period_ns, UInt64 global_profiler_cpu_time_period_ns)
{
if (the_instance)
{
@ -520,7 +525,7 @@ void GlobalThreadPool::initialize(size_t max_threads, size_t max_free_threads, s
"The global thread pool is initialized twice");
}
the_instance.reset(new GlobalThreadPool(max_threads, max_free_threads, queue_size, false /*shutdown_on_exception*/));
the_instance.reset(new GlobalThreadPool(max_threads, max_free_threads, queue_size, false /*shutdown_on_exception*/, global_profiler_real_time_period_ns, global_profiler_cpu_time_period_ns));
}
GlobalThreadPool & GlobalThreadPool::instance()

View File

@ -172,10 +172,21 @@ class GlobalThreadPool : public FreeThreadPool, private boost::noncopyable
size_t max_threads_,
size_t max_free_threads_,
size_t queue_size_,
bool shutdown_on_exception_);
bool shutdown_on_exception_,
UInt64 global_profiler_real_time_period_ns_,
UInt64 global_profiler_cpu_time_period_ns_);
public:
static void initialize(size_t max_threads = 10000, size_t max_free_threads = 1000, size_t queue_size = 10000);
UInt64 global_profiler_real_time_period_ns;
UInt64 global_profiler_cpu_time_period_ns;
static void initialize(
size_t max_threads = 10000,
size_t max_free_threads = 1000,
size_t queue_size = 10000,
UInt64 global_profiler_real_time_period_ns_ = 0,
UInt64 global_profiler_cpu_time_period_ns_ = 0);
static GlobalThreadPool & instance();
static void shutdown();
};
@ -187,7 +198,7 @@ public:
* NOTE: User code should use 'ThreadFromGlobalPool' declared below instead of directly using this class.
*
*/
template <bool propagate_opentelemetry_context = true>
template <bool propagate_opentelemetry_context = true, bool global_trace_collector_allowed = true>
class ThreadFromGlobalPoolImpl : boost::noncopyable
{
public:
@ -197,11 +208,15 @@ public:
explicit ThreadFromGlobalPoolImpl(Function && func, Args &&... args)
: state(std::make_shared<State>())
{
UInt64 global_profiler_real_time_period = GlobalThreadPool::instance().global_profiler_real_time_period_ns;
UInt64 global_profiler_cpu_time_period = GlobalThreadPool::instance().global_profiler_cpu_time_period_ns;
/// NOTE:
/// - If this will throw an exception, the destructor won't be called
/// - this pointer cannot be passed in the lambda, since after detach() it will not be valid
GlobalThreadPool::instance().scheduleOrThrow([
my_state = state,
global_profiler_real_time_period,
global_profiler_cpu_time_period,
my_func = std::forward<Function>(func),
my_args = std::make_tuple(std::forward<Args>(args)...)]() mutable /// mutable is needed to destroy capture
{
@ -220,6 +235,12 @@ public:
/// Thread status holds raw pointer on query context, thus it always must be destroyed
/// before sending signal that permits to join this thread.
DB::ThreadStatus thread_status;
if constexpr (global_trace_collector_allowed)
{
if (unlikely(global_profiler_real_time_period != 0 || global_profiler_cpu_time_period != 0))
thread_status.initGlobalProfiler(global_profiler_real_time_period, global_profiler_cpu_time_period);
}
std::apply(function, arguments);
},
{}, // default priority
@ -305,11 +326,12 @@ protected:
/// you need to use class, or you need to use ThreadFromGlobalPool below.
///
/// See the comments of ThreadPool below to know how it works.
using ThreadFromGlobalPoolNoTracingContextPropagation = ThreadFromGlobalPoolImpl<false>;
using ThreadFromGlobalPoolNoTracingContextPropagation = ThreadFromGlobalPoolImpl<false, true>;
/// An alias of thread that execute jobs/tasks on global thread pool by implicit passing tracing context on current thread to underlying worker as parent tracing context.
/// If jobs/tasks are directly scheduled by using APIs of this class, you need to use this class or you need to use class above.
using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl<true>;
using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl<true, true>;
using ThreadFromGlobalPoolWithoutTraceCollector = ThreadFromGlobalPoolImpl<true, false>;
/// Recommended thread pool for the case when multiple thread pools are created and destroyed.
///

View File

@ -3,11 +3,12 @@
template <typename Thread>
class ThreadPoolImpl;
template <bool propagate_opentelemetry_context>
template <bool propagate_opentelemetry_context, bool global_trace_collector_allowed>
class ThreadFromGlobalPoolImpl;
using ThreadFromGlobalPoolNoTracingContextPropagation = ThreadFromGlobalPoolImpl<false>;
using ThreadFromGlobalPoolNoTracingContextPropagation = ThreadFromGlobalPoolImpl<false, true>;
using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl<true>;
using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl<true, true>;
using ThreadFromGlobalPoolWithoutTraceCollector = ThreadFromGlobalPoolImpl<true, false>;
using ThreadPool = ThreadPoolImpl<ThreadFromGlobalPoolNoTracingContextPropagation>;

View File

@ -124,6 +124,26 @@ ThreadStatus::ThreadStatus(bool check_current_thread_on_destruction_)
#endif
}
void ThreadStatus::initGlobalProfiler([[maybe_unused]] UInt64 global_profiler_real_time_period, [[maybe_unused]] UInt64 global_profiler_cpu_time_period)
{
#if !defined(SANITIZER) && !defined(CLICKHOUSE_KEEPER_STANDALONE_BUILD) && !defined(__APPLE__)
try
{
if (global_profiler_real_time_period > 0)
query_profiler_real = std::make_unique<QueryProfilerReal>(thread_id,
/* period= */ static_cast<UInt32>(global_profiler_real_time_period));
if (global_profiler_cpu_time_period > 0)
query_profiler_cpu = std::make_unique<QueryProfilerCPU>(thread_id,
/* period= */ static_cast<UInt32>(global_profiler_cpu_time_period));
}
catch (...)
{
tryLogCurrentException("ThreadStatus", "Cannot initialize GlobalProfiler");
}
#endif
}
ThreadGroupPtr ThreadStatus::getThreadGroup() const
{
chassert(current_thread == this);

View File

@ -307,6 +307,8 @@ public:
void flushUntrackedMemory();
void initGlobalProfiler(UInt64 global_profiler_real_time_period, UInt64 global_profiler_cpu_time_period);
private:
void applyGlobalSettings();
void applyQuerySettings();

View File

@ -137,6 +137,8 @@ namespace DB
M(UInt64, http_connections_soft_limit, 100, "Connections above this limit have significantly shorter time to live. The limit applies to the http connections which do not belong to any disk or storage.", 0) \
M(UInt64, http_connections_warn_limit, 1000, "Warning massages are written to the logs if number of in-use connections are higher than this limit. The limit applies to the http connections which do not belong to any disk or storage.", 0) \
M(UInt64, http_connections_store_limit, 5000, "Connections above this limit reset after use. Set to 0 to turn connection cache off. The limit applies to the http connections which do not belong to any disk or storage.", 0) \
M(UInt64, global_profiler_real_time_period_ns, 0, "Period for real clock timer of global profiler (in nanoseconds). Set 0 value to turn off the real clock global profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(UInt64, global_profiler_cpu_time_period_ns, 0, "Period for CPU clock timer of global profiler (in nanoseconds). Set 0 value to turn off the CPU clock global profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in StorageSystemServerSettings.cpp

View File

@ -21,6 +21,7 @@
#include <Common/DateLUT.h>
#include <Common/logger_useful.h>
#include <base/errnoToString.h>
#include <Core/ServerSettings.h>
#if defined(OS_LINUX)
# include <Common/hasLinuxCapability.h>
@ -474,12 +475,22 @@ void ThreadStatus::initQueryProfiler()
try
{
if (settings.query_profiler_real_time_period_ns > 0)
query_profiler_real = std::make_unique<QueryProfilerReal>(thread_id,
/* period= */ static_cast<UInt32>(settings.query_profiler_real_time_period_ns));
{
if (!query_profiler_real)
query_profiler_real = std::make_unique<QueryProfilerReal>(thread_id,
/* period= */ static_cast<UInt32>(settings.query_profiler_real_time_period_ns));
else
query_profiler_real->setPeriod(static_cast<UInt32>(settings.query_profiler_real_time_period_ns));
}
if (settings.query_profiler_cpu_time_period_ns > 0)
query_profiler_cpu = std::make_unique<QueryProfilerCPU>(thread_id,
/* period= */ static_cast<UInt32>(settings.query_profiler_cpu_time_period_ns));
{
if (!query_profiler_cpu)
query_profiler_cpu = std::make_unique<QueryProfilerCPU>(thread_id,
/* period= */ static_cast<UInt32>(settings.query_profiler_cpu_time_period_ns));
else
query_profiler_cpu->setPeriod(static_cast<UInt32>(settings.query_profiler_cpu_time_period_ns));
}
}
catch (...)
{

View File

@ -357,8 +357,9 @@ def update_contributors(
# format: " 1016 Alexey Arno"
shortlog = git_runner.run("git shortlog HEAD --summary")
escaping = str.maketrans({"\\": "\\\\", '"': '\\"'})
contributors = sorted(
[c.split(maxsplit=1)[-1].replace('"', r"\"") for c in shortlog.split("\n")],
[c.split(maxsplit=1)[-1].translate(escaping) for c in shortlog.split("\n")],
)
contributors = [f' "{c}",' for c in contributors]

View File

@ -0,0 +1,4 @@
<clickhouse>
<global_profiler_real_time_period_ns>1000000000</global_profiler_real_time_period_ns>
<global_profiler_cpu_time_period_ns>1000000000</global_profiler_cpu_time_period_ns>
</clickhouse>

View File

@ -67,6 +67,7 @@ ln -sf $SRC_PATH/config.d/validate_tcp_client_information.xml $DEST_SERVER_PATH/
ln -sf $SRC_PATH/config.d/zero_copy_destructive_operations.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/block_number.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/handlers.yaml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/serverwide_trace_collector.xml $DEST_SERVER_PATH/config.d/
# Not supported with fasttest.
if [ "${DEST_SERVER_PATH}" = "/etc/clickhouse-server" ]

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,4 @@
<clickhouse>
<global_profiler_real_time_period_ns>10000000</global_profiler_real_time_period_ns>
<global_profiler_cpu_time_period_ns>10000000</global_profiler_cpu_time_period_ns>
</clickhouse>

View File

@ -0,0 +1,52 @@
#!/usr/bin/env python3
import pytest
import time
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", main_configs=["configs/global_profiler.xml"])
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_global_thread_profiler(start_cluster):
if node1.is_built_with_sanitizer():
return
node1.query(
"CREATE TABLE t (key UInt32, value String) Engine = MergeTree() ORDER BY key"
)
node1.query("INSERT INTO t SELECT number, toString(number) from numbers(100)")
node1.query("INSERT INTO t SELECT number, toString(number) from numbers(100)")
node1.query("INSERT INTO t SELECT number, toString(number) from numbers(100)")
node1.query("INSERT INTO t SELECT number, toString(number) from numbers(100)")
node1.query("INSERT INTO t SELECT number, toString(number) from numbers(100)")
node1.query("INSERT INTO t SELECT number, toString(number) from numbers(100)")
node1.query("INSERT INTO t SELECT number, toString(number) from numbers(100)")
node1.query("INSERT INTO t SELECT number, toString(number) from numbers(100)")
time.sleep(5)
node1.query("SYSTEM FLUSH LOGS")
assert (
int(
node1.query(
"SELECT count() FROM system.trace_log where trace_type='Real' and query_id = ''"
).strip()
)
> 0
)