This commit is contained in:
Sergei Trifonov 2024-09-19 17:16:51 +02:00 committed by GitHub
commit 5ada7d357a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
42 changed files with 363 additions and 30 deletions

View File

@ -737,6 +737,14 @@ Number of sessions (connections) to ZooKeeper. Should be no more than one, becau
Number of watches (event subscriptions) in ZooKeeper.
### ConcurrencyControlAcquired
Total number of acquired CPU slots.
### ConcurrencyControlSoftLimit
Value of soft limit on number of CPU slots.
**See Also**
- [system.asynchronous_metrics](../../operations/system-tables/asynchronous_metrics.md#system_tables-asynchronous_metrics) — Contains periodically calculated metrics.

View File

@ -1643,6 +1643,7 @@ try
concurrent_threads_soft_limit = value;
}
ConcurrencyControl::instance().setMaxConcurrency(concurrent_threads_soft_limit);
LOG_INFO(log, "ConcurrencyControl limit is set to {}", concurrent_threads_soft_limit);
global_context->getProcessList().setMaxSize(new_server_settings.max_concurrent_queries);
global_context->getProcessList().setMaxInsertQueriesAmount(new_server_settings.max_concurrent_insert_queries);

View File

@ -566,6 +566,7 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden
PullingAsyncPipelineExecutor executor(io.pipeline);
io.pipeline.setProgressCallback(context->getProgressCallback());
io.pipeline.setProcessListElement(context->getProcessListElement());
io.pipeline.setConcurrencyControl(context->getSettingsRef().use_concurrency_control);
Block block;

View File

@ -1715,6 +1715,9 @@ try
QueryPipeline pipeline(std::move(pipe));
PullingAsyncPipelineExecutor executor(pipeline);
/// Concurrency control in client is not required
pipeline.setConcurrencyControl(false);
if (need_render_progress)
{
pipeline.setProgressCallback([this](const Progress & progress){ onProgress(progress); });

View File

@ -268,6 +268,7 @@ void LocalConnection::sendQuery(
{
state->block = state->io.pipeline.getHeader();
state->executor = std::make_unique<PullingAsyncPipelineExecutor>(state->io.pipeline);
state->io.pipeline.setConcurrencyControl(false);
}
else if (state->io.pipeline.completed())
{

View File

@ -1,7 +1,23 @@
#include <Common/ISlotControl.h>
#include <Common/ConcurrencyControl.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
namespace ProfileEvents
{
extern const Event ConcurrencyControlGrantedHard;
extern const Event ConcurrencyControlGrantDelayed;
extern const Event ConcurrencyControlAcquiredTotal;
extern const Event ConcurrencyControlAllocationDelayed;
}
namespace CurrentMetrics
{
extern const Metric ConcurrencyControlAcquired;
extern const Metric ConcurrencyControlSoftLimit;
}
namespace DB
{
@ -17,6 +33,7 @@ ConcurrencyControl::Slot::~Slot()
ConcurrencyControl::Slot::Slot(SlotAllocationPtr && allocation_)
: allocation(std::move(allocation_))
, acquired_slot_increment(CurrentMetrics::ConcurrencyControlAcquired)
{
}
@ -34,6 +51,7 @@ ConcurrencyControl::Allocation::~Allocation()
{
if (granted.compare_exchange_strong(value, value - 1))
{
ProfileEvents::increment(ProfileEvents::ConcurrencyControlAcquiredTotal, 1);
std::unique_lock lock{mutex};
return AcquiredSlotPtr(new Slot(shared_from_this())); // can't use std::make_shared due to private ctor
}
@ -84,6 +102,7 @@ void ConcurrencyControl::Allocation::release()
ConcurrencyControl::ConcurrencyControl()
: cur_waiter(waiters.end())
, max_concurrency_metric(CurrentMetrics::ConcurrencyControlSoftLimit, 0)
{
}
@ -103,11 +122,16 @@ ConcurrencyControl::~ConcurrencyControl()
// Acquire as many slots as we can, but not lower than `min`
SlotCount granted = std::max(min, std::min(max, available(lock)));
cur_concurrency += granted;
ProfileEvents::increment(ProfileEvents::ConcurrencyControlGrantedHard, min);
// Create allocation and start waiting if more slots are required
if (granted < max)
{
ProfileEvents::increment(ProfileEvents::ConcurrencyControlGrantDelayed, max - granted);
ProfileEvents::increment(ProfileEvents::ConcurrencyControlAllocationDelayed);
return SlotAllocationPtr(new Allocation(*this, max, granted,
waiters.insert(cur_waiter, nullptr /* pointer is set by Allocation ctor */)));
}
else
return SlotAllocationPtr(new Allocation(*this, max, granted));
}
@ -116,6 +140,7 @@ void ConcurrencyControl::setMaxConcurrency(SlotCount value)
{
std::unique_lock lock{mutex};
max_concurrency = std::max<SlotCount>(1, value); // never allow max_concurrency to be zero
max_concurrency_metric.changeTo(max_concurrency == UnlimitedSlots ? 0 : max_concurrency);
schedule(lock);
}

View File

@ -8,6 +8,7 @@
#include <base/types.h>
#include <boost/core/noncopyable.hpp>
#include <Common/CurrentMetrics.h>
#include <Common/ISlotControl.h>
namespace DB
@ -53,6 +54,7 @@ public:
explicit Slot(SlotAllocationPtr && allocation_);
SlotAllocationPtr allocation;
CurrentMetrics::Increment acquired_slot_increment;
};
// Manages group of slots for a single query, see ConcurrencyControl::allocate(min, max)
@ -131,6 +133,7 @@ private:
Waiters::iterator cur_waiter; // round-robin pointer
SlotCount max_concurrency = UnlimitedSlots;
SlotCount cur_concurrency = 0;
CurrentMetrics::Increment max_concurrency_metric;
};
}

View File

@ -313,6 +313,9 @@
M(FilteringMarksWithPrimaryKey, "Number of threads currently doing filtering of mark ranges by the primary key") \
M(FilteringMarksWithSecondaryKeys, "Number of threads currently doing filtering of mark ranges by secondary keys") \
\
M(ConcurrencyControlAcquired, "Total number of acquired CPU slots") \
M(ConcurrencyControlSoftLimit, "Value of soft limit on number of CPU slots") \
\
M(DiskS3NoSuchKeyErrors, "The number of `NoSuchKey` errors that occur when reading data from S3 cloud storage through ClickHouse disks.") \
#ifdef APPLY_FOR_EXTERNAL_METRICS

View File

@ -73,4 +73,44 @@ public:
[[nodiscard]] virtual SlotAllocationPtr allocate(SlotCount min, SlotCount max) = 0;
};
/// Allocation that grants all the slots immediately on creation
class GrantedAllocation : public ISlotAllocation
{
public:
explicit GrantedAllocation(SlotCount granted_)
: granted(granted_)
, allocated(granted_)
{}
[[nodiscard]] AcquiredSlotPtr tryAcquire() override
{
SlotCount value = granted.load();
while (value)
{
if (granted.compare_exchange_strong(value, value - 1))
return std::make_shared<IAcquiredSlot>();
}
return {};
}
SlotCount grantedCount() const override
{
return granted.load();
}
SlotCount allocatedCount() const override
{
return allocated;
}
private:
std::atomic<SlotCount> granted; // allocated, but not yet acquired
const SlotCount allocated;
};
[[nodiscard]] inline SlotAllocationPtr grantSlots(SlotCount count)
{
return SlotAllocationPtr(new GrantedAllocation(count));
}
}

View File

@ -826,6 +826,11 @@ The server successfully detected this situation and will download merged part fr
M(ReadWriteBufferFromHTTPRequestsSent, "Number of HTTP requests sent by ReadWriteBufferFromHTTP") \
M(ReadWriteBufferFromHTTPBytes, "Total size of payload bytes received and sent by ReadWriteBufferFromHTTP. Doesn't include HTTP headers.") \
\
M(ConcurrencyControlGrantedHard, "Number of CPU slot granted according to guarantee of 1 thread per query and for queries with setting 'use_concurrency_control' = 0") \
M(ConcurrencyControlGrantDelayed, "Number of CPU slot not granted initially and required to wait for a free CPU slot") \
M(ConcurrencyControlAcquiredTotal, "Total number of CPU slot acquired") \
M(ConcurrencyControlAllocationDelayed, "Total number of CPU slot allocations (queries) that were required to wait for slots to upscale") \
\
M(GWPAsanAllocateSuccess, "Number of successful allocations done by GWPAsan") \
M(GWPAsanAllocateFailed, "Number of failed allocations done by GWPAsan (i.e. filled pool)") \
M(GWPAsanFree, "Number of free operations done by GWPAsan") \

View File

@ -666,6 +666,7 @@ void CacheDictionary<dictionary_key_type>::update(CacheDictionaryUpdateUnitPtr<d
Columns fetched_columns_during_update = fetch_request.makeAttributesResultColumnsNonMutable();
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))
{

View File

@ -338,6 +338,7 @@ public:
, pipeline(std::move(pipeline_))
, executor(pipeline)
{
pipeline.setConcurrencyControl(false);
}
std::string getName() const override
@ -382,7 +383,6 @@ Pipe DirectDictionary<dictionary_key_type>::getSourcePipe(
ids.emplace_back(key);
auto pipeline = source_ptr->loadIds(ids);
if (use_async_executor)
pipe = Pipe(std::make_shared<SourceFromQueryPipeline<PullingAsyncPipelineExecutor>>(std::move(pipeline)));
else

View File

@ -454,6 +454,7 @@ void FlatDictionary::updateData()
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
update_field_loaded_block.reset();
Block block;
@ -495,6 +496,7 @@ void FlatDictionary::loadData()
{
QueryPipeline pipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))

View File

@ -480,6 +480,7 @@ void HashedArrayDictionary<dictionary_key_type, sharded>::updateData()
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
update_field_loaded_block.reset();
Block block;
@ -978,6 +979,7 @@ void HashedArrayDictionary<dictionary_key_type, sharded>::loadData()
QueryPipeline pipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
UInt64 pull_time_microseconds = 0;
UInt64 process_time_microseconds = 0;

View File

@ -884,6 +884,7 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::updateData()
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
update_field_loaded_block.reset();
Block block;
@ -1163,6 +1164,7 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::loadData()
QueryPipeline pipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;

View File

@ -412,6 +412,7 @@ void IPAddressDictionary::loadData()
bool has_ipv6 = false;
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))
{

View File

@ -291,6 +291,7 @@ void IPolygonDictionary::loadData()
QueryPipeline pipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))
blockToAttributes(block);

View File

@ -541,6 +541,7 @@ void RangeHashedDictionary<dictionary_key_type>::loadData()
{
QueryPipeline pipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))
@ -692,6 +693,7 @@ void RangeHashedDictionary<dictionary_key_type>::updateData()
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
update_field_loaded_block.reset();
Block block;

View File

@ -321,6 +321,7 @@ void RegExpTreeDictionary::loadData()
{
QueryPipeline pipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))

View File

@ -199,6 +199,7 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
PullingAsyncPipelineExecutor executor(io.pipeline);
io.pipeline.setProgressCallback(data.getContext()->getProgressCallback());
io.pipeline.setConcurrencyControl(data.getContext()->getSettingsRef().use_concurrency_control);
while (block.rows() == 0 && executor.pull(block))
{
}

View File

@ -77,7 +77,6 @@
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Parsers/formatAST.h>
#include <Parsers/QueryParameterVisitor.h>

View File

@ -26,6 +26,8 @@
#include <Analyzer/TableFunctionNode.h>
#include <Analyzer/Utils.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <Interpreters/QueryLog.h>
@ -249,6 +251,8 @@ QueryPipelineBuilder InterpreterSelectQueryAnalyzer::buildQueryPipeline()
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(context);
auto build_pipeline_settings = BuildQueryPipelineSettings::fromContext(context);
query_plan.setConcurrencyControl(context->getSettingsRef().use_concurrency_control);
return std::move(*query_plan.buildQueryPipeline(optimization_settings, build_pipeline_settings));
}

View File

@ -214,6 +214,7 @@ bool isStorageTouchedByMutations(
}
PullingAsyncPipelineExecutor executor(io.pipeline);
io.pipeline.setConcurrencyControl(context->getSettingsRef().use_concurrency_control);
Block block;
while (block.rows() == 0 && executor.pull(block));
@ -1292,6 +1293,10 @@ void MutationsInterpreter::Source::read(
void MutationsInterpreter::initQueryPlan(Stage & first_stage, QueryPlan & plan)
{
// Mutations are not using concurrency control now. Queries, merges and mutations running together could lead to CPU overcommit.
// TODO(serxa): Enable concurrency control for mutation queries and mutations. This should be done after CPU scheduler introduction.
plan.setConcurrencyControl(false);
source.read(first_stage, plan, metadata_snapshot, context, settings.apply_deleted_mask, settings.can_execute);
addCreatingSetsStep(plan, first_stage.analyzer->getPreparedSets(), context);
}

View File

@ -1,4 +1,5 @@
#include <IO/WriteBufferFromString.h>
#include "Common/ISlotControl.h"
#include <Common/ThreadPool.h>
#include <Common/CurrentThread.h>
#include <Common/CurrentMetrics.h>
@ -347,12 +348,22 @@ void PipelineExecutor::initializeExecution(size_t num_threads, bool concurrency_
is_execution_initialized = true;
tryUpdateExecutionStatus(ExecutionStatus::NotStarted, ExecutionStatus::Executing);
size_t use_threads = num_threads;
/// Allocate CPU slots from concurrency control
size_t min_threads = concurrency_control ? 1uz : num_threads;
cpu_slots = ConcurrencyControl::instance().allocate(min_threads, num_threads);
use_threads = cpu_slots->grantedCount();
if (concurrency_control)
{
/// Allocate CPU slots from concurrency control
constexpr size_t min_threads = 1uz; // Number of threads that should be granted to every query no matter how many threads are already running in other queries
cpu_slots = ConcurrencyControl::instance().allocate(min_threads, num_threads);
#ifndef NDEBUG
LOG_TEST(log, "Allocate CPU slots. min: {}, max: {}, granted: {}", min_threads, num_threads, cpu_slots->grantedCount());
#endif
}
else
{
/// If concurrency control is not used we should not even count threads as competing.
/// To avoid counting them in ConcurrencyControl, we create dummy slot allocation.
cpu_slots = grantSlots(num_threads);
}
size_t use_threads = cpu_slots->grantedCount();
Queue queue;
graph->initializeExecution(queue);

View File

@ -199,6 +199,7 @@ QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
last_pipeline->setProgressCallback(build_pipeline_settings.progress_callback);
last_pipeline->setProcessListElement(build_pipeline_settings.process_list_element);
last_pipeline->addResources(std::move(resources));
last_pipeline->setConcurrencyControl(getConcurrencyControl());
return last_pipeline;
}

View File

@ -300,7 +300,8 @@ QueryPipelineBuilder QueryPipelineBuilder::unitePipelines(
/// It may happen if max_distributed_connections > max_threads
max_threads_limit = std::max(pipeline.max_threads, max_threads_limit);
concurrency_control = pipeline.getConcurrencyControl();
// Use concurrency control if at least one of pipelines is using it
concurrency_control = concurrency_control || pipeline.getConcurrencyControl();
}
QueryPipelineBuilder pipeline;
@ -311,8 +312,8 @@ QueryPipelineBuilder QueryPipelineBuilder::unitePipelines(
{
pipeline.setMaxThreads(max_threads);
pipeline.limitMaxThreads(max_threads_limit);
pipeline.setConcurrencyControl(concurrency_control);
}
pipeline.setConcurrencyControl(concurrency_control);
pipeline.setCollectedProcessors(nullptr);
return pipeline;

View File

@ -1246,6 +1246,7 @@ namespace
if (io.pipeline.pulling())
{
auto executor = std::make_shared<PullingAsyncPipelineExecutor>(io.pipeline);
io.pipeline.setConcurrencyControl(query_context->getSettingsRef().use_concurrency_control);
auto check_for_cancel = [&]
{
if (isQueryCancelled())

View File

@ -1110,6 +1110,7 @@ void TCPHandler::processOrdinaryQuery()
{
PullingAsyncPipelineExecutor executor(pipeline);
pipeline.setConcurrencyControl(query_context->getSettingsRef().use_concurrency_control);
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
/// The following may happen:

View File

@ -452,6 +452,7 @@ void StorageLiveView::writeBlock(StorageLiveView & live_view, Block && block, Ch
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingAsyncPipelineExecutor executor(pipeline);
pipeline.setConcurrencyControl(local_context->getSettingsRef().use_concurrency_control);
Block this_block;
while (executor.pull(this_block))
@ -588,6 +589,7 @@ MergeableBlocksPtr StorageLiveView::collectMergeableBlocks(ContextPtr local_cont
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingAsyncPipelineExecutor executor(pipeline);
pipeline.setConcurrencyControl(local_context->getSettingsRef().use_concurrency_control);
Block this_block;
while (executor.pull(this_block))
@ -694,6 +696,7 @@ bool StorageLiveView::getNewBlocks(const std::lock_guard<std::mutex> & lock)
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingAsyncPipelineExecutor executor(pipeline);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))
{

View File

@ -1735,6 +1735,10 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
auto builder = merge_parts_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);
// Merges are not using concurrency control now. Queries and merges running together could lead to CPU overcommit.
// TODO(serxa): Enable concurrency control for merges. This should be done after CPU scheduler introduction.
builder->setConcurrencyControl(false);
global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
}

View File

@ -2062,6 +2062,7 @@ bool MutateTask::prepare()
context_for_reading->setSetting("apply_mutations_on_fly", false);
/// Skip using large sets in KeyCondition
context_for_reading->setSetting("use_index_for_in_with_subqueries_max_values", 100000);
context_for_reading->setSetting("use_concurrency_control", false);
for (const auto & command : *ctx->commands)
if (!canSkipMutationCommandForPart(ctx->source_part, command, context_for_reading))

View File

@ -661,6 +661,7 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingAsyncPipelineExecutor executor(pipeline);
pipeline.setConcurrencyControl(getContext()->getSettingsRef().use_concurrency_control);
Block block;
BlocksPtr new_blocks = std::make_shared<Blocks>();

View File

@ -116,6 +116,7 @@ Block TableFunctionFormat::parseData(const ColumnsDescription & columns, const S
});
}
builder.setConcurrencyControl(context->getSettingsRef().use_concurrency_control);
auto pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
auto reader = std::make_unique<PullingPipelineExecutor>(*pipeline);

View File

@ -139,13 +139,20 @@ def main():
jr = JobReport.load(from_file=report_file)
additional_files.append(report_file)
for file in set(jr.additional_files):
file_ = Path(file)
file_name = file_.name
orig_file = Path(file)
file_name = orig_file.name
file_name = file_name.replace(
".", "__" + CI.Utils.normalize_string(job_id) + ".", 1
)
file_ = file_.rename(file_.parent / file_name)
additional_files.append(file_)
new_file = orig_file.rename(orig_file.parent / file_name)
for tr in test_results:
if tr.log_files is None:
continue
tr.log_files = [
new_file if (Path(log_file) == orig_file) else Path(log_file)
for log_file in tr.log_files
]
additional_files.append(new_file)
JobReport(
description=description,

View File

@ -559,7 +559,7 @@ class CI:
JobNames.BUGFIX_VALIDATE: JobConfig(
run_by_label="pr-bugfix",
run_command="bugfix_validate_check.py",
timeout=900,
timeout=2400,
runner_type=Runners.STYLE_CHECKER,
),
}

View File

@ -7,6 +7,8 @@ from env_helper import GITHUB_REPOSITORY, GITHUB_RUN_URL, GITHUB_SERVER_URL
from report import GITHUB_JOB_URL, TestResults, create_test_html_report
from s3_helper import S3Helper
logger = logging.getLogger(__name__)
def process_logs(
s3_client: S3Helper,
@ -14,7 +16,7 @@ def process_logs(
s3_path_prefix: str,
test_results: TestResults,
) -> List[str]:
logging.info("Upload files to s3 %s", additional_logs)
logger.info("Upload files to s3 %s", additional_logs)
processed_logs = {} # type: Dict[str, str]
# Firstly convert paths of logs from test_results to urls to s3.
@ -28,9 +30,19 @@ def process_logs(
if path in processed_logs:
test_result.log_urls.append(processed_logs[str(path)])
elif path:
url = s3_client.upload_test_report_to_s3(
Path(path), s3_path_prefix + "/" + str(path)
)
try:
url = s3_client.upload_test_report_to_s3(
Path(path), s3_path_prefix + "/" + str(path)
)
except FileNotFoundError:
# Breaking the whole run on the malformed test is a bad idea
# FIXME: report the failure
logger.error(
"A broken TestResult, file '%s' does not exist: %s",
path,
test_result,
)
continue
test_result.log_urls.append(url)
processed_logs[str(path)] = url
@ -43,7 +55,7 @@ def process_logs(
)
)
else:
logging.error("File %s is missing - skip", log_path)
logger.error("File %s is missing - skip", log_path)
return additional_urls
@ -111,8 +123,8 @@ def upload_results(
report_path.write_text(html_report, encoding="utf-8")
url = s3_client.upload_test_report_to_s3(report_path, s3_path_prefix + ".html")
else:
logging.info("report.html was prepared by test job itself")
logger.info("report.html was prepared by test job itself")
url = ready_report_url
logging.info("Search result in url %s", url)
logger.info("Search result in url %s", url)
return url

View File

@ -1,4 +1,5 @@
<clickhouse>
<concurrent_threads_soft_limit_ratio_to_cores>0</concurrent_threads_soft_limit_ratio_to_cores>
<concurrent_threads_soft_limit_num>50</concurrent_threads_soft_limit_num>
<query_log>
<database>system</database>

View File

@ -28,6 +28,16 @@ node4 = cluster.add_instance(
)
def assert_profile_event(node, query_id, profile_event, check):
assert check(
int(
node.query(
f"select ProfileEvents['{profile_event}'] from system.query_log where current_database = currentDatabase() and query_id = '{query_id}' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1"
)
)
)
@pytest.fixture(scope="module")
def started_cluster():
try:
@ -43,39 +53,176 @@ def test_concurrent_threads_soft_limit_default(started_cluster):
query_id="test_concurrent_threads_soft_limit_1",
)
node1.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node1,
"test_concurrent_threads_soft_limit_1",
"ConcurrencyControlGrantedHard",
lambda x: x == 1,
)
assert_profile_event(
node1,
"test_concurrent_threads_soft_limit_1",
"ConcurrencyControlGrantDelayed",
lambda x: x == 0,
)
assert_profile_event(
node1,
"test_concurrent_threads_soft_limit_1",
"ConcurrencyControlAcquiredTotal",
lambda x: x == 100,
)
assert_profile_event(
node1,
"test_concurrent_threads_soft_limit_1",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 0,
)
assert (
node1.query(
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_1'"
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_1' order by query_start_time_microseconds desc limit 1"
)
== "102\n"
)
@pytest.mark.skip(reason="broken test")
def test_use_concurrency_control_default(started_cluster):
node1.query(
"SELECT count(*) FROM numbers_mt(10000000) SETTINGS use_concurrency_control = 0",
query_id="test_use_concurrency_control",
)
# Concurrency control is not used, all metrics should be zeros
node1.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node1,
"test_use_concurrency_control",
"ConcurrencyControlGrantedHard",
lambda x: x == 0,
)
assert_profile_event(
node1,
"test_use_concurrency_control",
"ConcurrencyControlGrantDelayed",
lambda x: x == 0,
)
assert_profile_event(
node1,
"test_use_concurrency_control",
"ConcurrencyControlAcquiredTotal",
lambda x: x == 0,
)
assert_profile_event(
node1,
"test_use_concurrency_control",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 0,
)
def test_concurrent_threads_soft_limit_defined_50(started_cluster):
node2.query(
"SELECT count(*) FROM numbers_mt(10000000)",
query_id="test_concurrent_threads_soft_limit_2",
)
node2.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node2,
"test_concurrent_threads_soft_limit_2",
"ConcurrencyControlGrantedHard",
lambda x: x == 1,
)
assert_profile_event(
node2,
"test_concurrent_threads_soft_limit_2",
"ConcurrencyControlGrantDelayed",
lambda x: x == 50,
)
assert_profile_event(
node2,
"test_concurrent_threads_soft_limit_2",
"ConcurrencyControlAcquiredTotal",
lambda x: x == 50,
)
assert_profile_event(
node2,
"test_concurrent_threads_soft_limit_2",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 1,
)
assert (
node2.query(
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_2'"
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_2' order by query_start_time_microseconds desc limit 1"
)
== "52\n"
)
@pytest.mark.skip(reason="broken test")
def test_use_concurrency_control_soft_limit_defined_50(started_cluster):
node2.query(
"SELECT count(*) FROM numbers_mt(10000000) SETTINGS use_concurrency_control = 0",
query_id="test_use_concurrency_control_2",
)
# Concurrency control is not used, all metrics should be zeros
node2.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node2,
"test_use_concurrency_control_2",
"ConcurrencyControlGrantedHard",
lambda x: x == 0,
)
assert_profile_event(
node2,
"test_use_concurrency_control_2",
"ConcurrencyControlGrantDelayed",
lambda x: x == 0,
)
assert_profile_event(
node2,
"test_use_concurrency_control_2",
"ConcurrencyControlAcquiredTotal",
lambda x: x == 0,
)
assert_profile_event(
node2,
"test_use_concurrency_control_2",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 0,
)
def test_concurrent_threads_soft_limit_defined_1(started_cluster):
node3.query(
"SELECT count(*) FROM numbers_mt(10000000)",
query_id="test_concurrent_threads_soft_limit_3",
)
node3.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node3,
"test_concurrent_threads_soft_limit_3",
"ConcurrencyControlGrantedHard",
lambda x: x == 1,
)
assert_profile_event(
node3,
"test_concurrent_threads_soft_limit_3",
"ConcurrencyControlGrantDelayed",
lambda x: x == 99,
)
assert_profile_event(
node3,
"test_concurrent_threads_soft_limit_3",
"ConcurrencyControlAcquiredTotal",
lambda x: x == 1,
)
assert_profile_event(
node3,
"test_concurrent_threads_soft_limit_3",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 1,
)
assert (
node3.query(
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_3'"
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_3' order by query_start_time_microseconds desc limit 1"
)
== "3\n"
)
@ -84,7 +231,6 @@ def test_concurrent_threads_soft_limit_defined_1(started_cluster):
# In config_limit_reached.xml there is concurrent_threads_soft_limit=10
# Background query starts in a separate thread to reach this limit.
# When this limit is reached the foreground query gets less than 5 queries despite the fact that it has settings max_threads=5
@pytest.mark.skip(reason="broken test")
def test_concurrent_threads_soft_limit_limit_reached(started_cluster):
def background_query():
try:
@ -117,8 +263,32 @@ def test_concurrent_threads_soft_limit_limit_reached(started_cluster):
)
node4.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node4,
"test_concurrent_threads_soft_limit_4",
"ConcurrencyControlGrantedHard",
lambda x: x == 1,
)
assert_profile_event(
node4,
"test_concurrent_threads_soft_limit_4",
"ConcurrencyControlGrantDelayed",
lambda x: x > 0,
)
assert_profile_event(
node4,
"test_concurrent_threads_soft_limit_4",
"ConcurrencyControlAcquiredTotal",
lambda x: x < 5,
)
assert_profile_event(
node4,
"test_concurrent_threads_soft_limit_4",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 1,
)
s_count = node4.query(
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_4'"
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_4' order by query_start_time_microseconds desc limit 1"
).strip()
if s_count:
count = int(s_count)

View File

@ -3,6 +3,7 @@
set log_queries=1;
set log_query_threads=1;
set max_threads=0;
set use_concurrency_control=0;
WITH 01091 AS id SELECT 1;
SYSTEM FLUSH LOGS;

View File

@ -6,6 +6,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} --log_queries=1 --max_threads=32 --query_id "2015_${CLICKHOUSE_DATABASE}_query" -q "select count() from remote('127.0.0.{2,3}', numbers(10)) where number global in (select number % 5 from numbers_mt(1000000))"
${CLICKHOUSE_CLIENT} --log_queries=1 --max_threads=32 --use_concurrency_control=0 --query_id "2015_${CLICKHOUSE_DATABASE}_query" -q "select count() from remote('127.0.0.{2,3}', numbers(10)) where number global in (select number % 5 from numbers_mt(1000000))"
${CLICKHOUSE_CLIENT} -q "system flush logs"
${CLICKHOUSE_CLIENT} -q "select length(thread_ids) >= 32 from system.query_log where event_date >= yesterday() and query_id = '2015_${CLICKHOUSE_DATABASE}_query' and type = 'QueryFinish' and current_database = currentDatabase()"

View File

@ -5,6 +5,9 @@
-- enforce some defaults to be sure that the env settings will not affect the test
SET max_threads=5, async_socket_for_remote=1, prefer_localhost_replica=1, optimize_read_in_order=1, load_marks_asynchronously=0, local_filesystem_read_method='pread', remote_filesystem_read_method='read', trace_profile_events=0;
-- we do not want concurrency control to limit the number of threads
SET use_concurrency_control=0;
-- we use query_thread_log to check peak thread usage
-- after https://github.com/ClickHouse/ClickHouse/issues/53417 there is a simpler way to check it
-- but that will not allow to backport the test to older versions

View File

@ -181,6 +181,9 @@ ComplexKeyCache
ComplexKeyDirect
ComplexKeyHashed
Composable
composable
ConcurrencyControlAcquired
ConcurrencyControlSoftLimit
Config
ConnectionDetails
Const