Compare commits

...

90 Commits

Author SHA1 Message Date
Sergei Trifonov
1b7b1325d4
Merge 758acb433d into c0c83236b6 2024-09-19 06:53:11 +00:00
Yakov Olkhovskiy
c0c83236b6
Merge pull request #69570 from alexkats/fix-azure
Mask azure connection string sensitive info
2024-09-19 05:40:47 +00:00
Yarik Briukhovetskyi
3eb5bc1a0f
Merge pull request #68963 from yariks5s/hive_partitioning_filtration
Filtering for hive partitioning
2024-09-18 22:16:58 +00:00
Robert Schulze
b94a7167a8
Merge pull request #69580 from rschu1ze/bump-libpqxx
Bump libpqxx to v7.7.5
2024-09-18 18:56:12 +00:00
Alex Katsman
b88cd79959 Mask azure connection string sensitive info 2024-09-18 18:32:22 +00:00
Konstantin Bogdanov
64e58baba1
Merge pull request #69682 from ClickHouse/more-asserts-for-hashjoin
Try fix asserts failure in `HashJoin`
2024-09-18 18:20:27 +00:00
max-vostrikov
a3fe155579
Merge pull request #69737 from ClickHouse/test_printf
added some edge cases for printf tests
2024-09-18 17:49:57 +00:00
maxvostrikov
f4b4b3cc35 added some edge cases for printf tests
added some edge cases for printf tests
2024-09-18 17:22:36 +02:00
Konstantin Bogdanov
cb24849396
Move assert 2024-09-18 15:24:48 +02:00
Mikhail f. Shiryaev
758acb433d
Replace renamed files in bugfix test results 2024-09-18 13:33:18 +02:00
Yarik Briukhovetskyi
143d9f0201
Merge branch 'ClickHouse:master' into hive_partitioning_filtration 2024-09-18 11:11:04 +02:00
Mikhail f. Shiryaev
7af0ec8b23
Increase bugfix timeout to 40 minutes 2024-09-17 19:26:54 +02:00
Konstantin Bogdanov
b08e727aef
Count allocated bytes from scratch after rerange 2024-09-17 19:02:10 +02:00
Yarik Briukhovetskyi
f52cdfb795
Merge branch 'ClickHouse:master' into hive_partitioning_filtration 2024-09-17 18:50:43 +02:00
Konstantin Bogdanov
a210f98819
Lint 2024-09-17 18:28:27 +02:00
Konstantin Bogdanov
7c5d55c6b2
Lint 2024-09-17 18:10:51 +02:00
Konstantin Bogdanov
80259659ff
More asserts 2024-09-17 18:03:19 +02:00
Yarik Briukhovetskyi
3a7c68a052
Update src/Storages/VirtualColumnUtils.cpp
Co-authored-by: Kruglov Pavel <48961922+Avogar@users.noreply.github.com>
2024-09-17 15:39:26 +02:00
Yarik Briukhovetskyi
e8d50aa97f
review 2024-09-17 15:02:33 +02:00
Mikhail f. Shiryaev
e5640acc52
Do not fail the job if an additional log is missing 2024-09-17 14:56:14 +02:00
Yarik Briukhovetskyi
cb92aaf968
fix 03232_file_path_normalizing 2024-09-17 11:26:13 +02:00
serxa
c75118c78d Merge branch 'master' into fix-use-concurrency-control 2024-09-16 17:36:07 +00:00
Yarik Briukhovetskyi
0cdec0acf1
fix logical error 2024-09-16 19:13:30 +02:00
serxa
d86ad992f1 Merge branch 'master' into fix-use-concurrency-control 2024-09-16 15:05:14 +00:00
serxa
10819dda2a make concurrency control integration test rerunnable 2024-09-16 15:04:26 +00:00
serxa
b8ea0e3396 avoid number of threads lowering to 16 by concurrency control 2024-09-16 14:53:31 +00:00
Yarik Briukhovetskyi
04f23332c3
fix filter issue 2024-09-16 15:59:22 +02:00
Yarik Briukhovetskyi
7d5203f8a7
add resize for partitioning_columns 2024-09-13 21:38:48 +02:00
Yarik Briukhovetskyi
0d1d750437
fix crash 2024-09-13 20:43:51 +02:00
Yarik Briukhovetskyi
ad31d86a15
move the block inserting 2024-09-13 19:58:19 +02:00
Yarik Briukhovetskyi
991279e5c6
revert 2024-09-13 19:23:00 +02:00
Yarik Briukhovetskyi
c184aae686
review 2024-09-13 16:40:01 +02:00
Yarik Briukhovetskyi
14a6b0422b
disable optimize_count_from_files 2024-09-13 16:33:17 +02:00
Robert Schulze
aab0d3dd9e
Bump to 7.7.5 2024-09-12 19:42:32 +00:00
Robert Schulze
5a34b9f24e
Bump to 7.6.1 2024-09-12 19:14:41 +00:00
Robert Schulze
a0a4858e00
Scratch build of libpqxx at 7.5.3 + patches 2024-09-12 18:55:35 +00:00
Sergei Trifonov
90645d7c0e
Merge branch 'master' into fix-use-concurrency-control 2024-09-11 18:48:11 +02:00
Yarik Briukhovetskyi
e8cec05d08
shellcheck 2024-09-11 13:52:20 +02:00
Yarik Briukhovetskyi
2876a4e714
add retries 2024-09-11 13:32:12 +02:00
Yarik Briukhovetskyi
a903e1a726
remove logging + fixing bug 2024-09-06 20:24:18 +02:00
Yarik Briukhovetskyi
2fa6be55ff
tests fix 2024-09-04 17:02:01 +02:00
Yarik Briukhovetskyi
8896d1b78b
try to fix tests 2024-09-04 14:46:29 +02:00
Yarik Briukhovetskyi
f688b903db
empty commit 2024-09-03 15:58:22 +02:00
Yarik Briukhovetskyi
21f9669836
empty commit 2024-09-03 15:41:43 +02:00
Yarik Briukhovetskyi
1a386ae4d5
Merge branch 'ClickHouse:master' into hive_partitioning_filtration 2024-09-03 15:35:31 +02:00
Yarik Briukhovetskyi
24f4e87f8b
revert debugging in tests 2024-09-03 15:20:22 +02:00
robot-clickhouse
b5eb0ef857 Automatic style fix 2024-08-30 17:57:54 +00:00
Max Kainov
78e8dbe008 increase flaky check timeout 1h->3h 2024-08-30 19:49:47 +02:00
serxa
5c18ffb8d1 Merge branch 'master' into fix-use-concurrency-control 2024-08-30 13:09:08 +00:00
Yarik Briukhovetskyi
620640a042
just to test 2024-08-30 12:58:21 +02:00
Yarik Briukhovetskyi
ec469a117d
testing 2024-08-30 00:56:35 +02:00
serxa
190d82ddd8 Merge branch 'master' into fix-use-concurrency-control 2024-08-29 19:31:58 +00:00
Yarik Briukhovetskyi
7a879980d8
try to fix tests 2024-08-29 18:25:11 +02:00
Yarik Briukhovetskyi
2adc61c215
add flush logs 2024-08-29 16:39:22 +02:00
serxa
d06dc22999 fix wrong conflict resolution 2024-08-29 14:17:14 +00:00
serxa
9e3f04c5eb fix wrong config settings inherited from server/config.xml 2024-08-29 12:17:46 +00:00
Yarik Briukhovetskyi
afc4d08aad
add no-fasttest tag 2024-08-29 13:31:05 +02:00
Sergei Trifonov
897e2def34
Merge branch 'master' into fix-use-concurrency-control 2024-08-29 12:42:37 +02:00
yariks5s
edc5d8dd92 fix path 2024-08-28 23:15:01 +00:00
yariks5s
d6b2a9d534 CLICKHOUSE_LOCAL -> CLIENT 2024-08-28 22:32:44 +00:00
yariks5s
dc97bd6b92 review + testing the code 2024-08-28 17:22:47 +00:00
Yarik Briukhovetskyi
60c6eb2610
trying to fix the test 2024-08-27 19:42:47 +02:00
Yarik Briukhovetskyi
9133505952
fix the test 2024-08-27 19:16:05 +02:00
Yarik Briukhovetskyi
2741bf00e4 chmod +x 2024-08-27 16:53:14 +00:00
Yarik Briukhovetskyi
4eca00a666
fix style 2024-08-27 18:10:41 +02:00
Yarik Briukhovetskyi
c6804122cb
fix shell 2024-08-27 16:52:29 +02:00
Yarik Briukhovetskyi
189cbe25fe
init 2024-08-27 16:28:18 +02:00
serxa
0632febfba better 2024-08-02 17:10:32 +00:00
serxa
67f51ded37 fix 02532_send_logs_level_test 2024-08-02 17:07:10 +00:00
serxa
14666f9be3 threads created under use_concurency_control=0 should not occupy a CPU slot 2024-08-02 17:00:32 +00:00
serxa
2f101367db add logging on CPU slot allocation in PipelineExecutor 2024-08-02 15:42:32 +00:00
serxa
06cac68db7 Merge branch 'master' into fix-use-concurrency-control 2024-08-02 15:12:12 +00:00
serxa
141179736e fix tidy build 2024-08-02 15:10:46 +00:00
serxa
184c156e09 Merge branch 'master' into fix-use-concurrency-control 2024-07-31 17:21:55 +00:00
serxa
cf9ea4af10 fix build 2024-07-31 08:47:16 +00:00
serxa
30fed916fa set use_concurrency_control=0 if some tests 2024-07-31 07:30:49 +00:00
serxa
384e2ff399 Merge branch 'master' into fix-use-concurrency-control 2024-07-31 07:25:16 +00:00
serxa
091883b8eb Merge branch 'fix-use-concurrency-control' of github.com:ClickHouse/ClickHouse into fix-use-concurrency-control 2024-07-12 13:02:14 +00:00
serxa
cb6438e472 fix style 2024-07-12 13:01:52 +00:00
robot-clickhouse
1cdcc0da57 Automatic style fix 2024-07-12 12:23:39 +00:00
serxa
6472fea0fd add tests for 'use_concurrency_control' and related profile events 2024-07-12 12:14:25 +00:00
serxa
dec92f60c6 fix ConcurrencyControlGrantedHard profile event 2024-07-12 12:13:33 +00:00
serxa
060ec6b68c return back normal concurrency_control passing logic 2024-07-12 12:12:57 +00:00
serxa
bf125d5da6 Merge branch 'master' into fix-use-concurrency-control 2024-07-12 08:44:12 +00:00
Sergei Trifonov
dd22140b57
Merge branch 'master' into fix-use-concurrency-control 2024-05-22 11:36:20 +02:00
serxa
d01ab205a9 aspell fix 2024-05-22 09:34:23 +00:00
serxa
e66e9d34ac Fixes 2024-05-14 16:56:06 +00:00
serxa
0acdfd6a37 Explicit passing of use_concurrency_control setting to find all place where it is not set at all 2024-05-14 10:41:40 +00:00
Sergei Trifonov
b252062767
Merge branch 'master' into fix-use-concurrency-control 2024-04-01 18:32:40 +02:00
serxa
dd6c1ac19a add diagnostics for concurrency control 2024-03-15 18:40:50 +00:00
69 changed files with 707 additions and 173 deletions

contrib/libpqxx vendored

@@ -1 +1 @@
-Subproject commit c995193a3a14d71f4711f1f421f65a1a1db64640
+Subproject commit 41e4c331564167cca97ad6eccbd5b8879c2ca044

View File

@@ -1,9 +1,9 @@
 set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/libpqxx")

 set (SRCS
-    "${LIBRARY_DIR}/src/strconv.cxx"
     "${LIBRARY_DIR}/src/array.cxx"
     "${LIBRARY_DIR}/src/binarystring.cxx"
+    "${LIBRARY_DIR}/src/blob.cxx"
     "${LIBRARY_DIR}/src/connection.cxx"
     "${LIBRARY_DIR}/src/cursor.cxx"
     "${LIBRARY_DIR}/src/encodings.cxx"
@@ -12,59 +12,25 @@ set (SRCS
     "${LIBRARY_DIR}/src/field.cxx"
     "${LIBRARY_DIR}/src/largeobject.cxx"
     "${LIBRARY_DIR}/src/notification.cxx"
+    "${LIBRARY_DIR}/src/params.cxx"
     "${LIBRARY_DIR}/src/pipeline.cxx"
     "${LIBRARY_DIR}/src/result.cxx"
     "${LIBRARY_DIR}/src/robusttransaction.cxx"
+    "${LIBRARY_DIR}/src/row.cxx"
     "${LIBRARY_DIR}/src/sql_cursor.cxx"
+    "${LIBRARY_DIR}/src/strconv.cxx"
     "${LIBRARY_DIR}/src/stream_from.cxx"
     "${LIBRARY_DIR}/src/stream_to.cxx"
     "${LIBRARY_DIR}/src/subtransaction.cxx"
+    "${LIBRARY_DIR}/src/time.cxx"
     "${LIBRARY_DIR}/src/transaction.cxx"
     "${LIBRARY_DIR}/src/transaction_base.cxx"
-    "${LIBRARY_DIR}/src/row.cxx"
-    "${LIBRARY_DIR}/src/params.cxx"
     "${LIBRARY_DIR}/src/util.cxx"
     "${LIBRARY_DIR}/src/version.cxx"
+    "${LIBRARY_DIR}/src/wait.cxx"
 )

-# Need to explicitly include each header file, because in the directory include/pqxx there are also files
-# like just 'array'. So if including the whole directory with `target_include_directories`, it will make
-# conflicts with all includes of <array>.
-set (HDRS
-    "${LIBRARY_DIR}/include/pqxx/array.hxx"
-    "${LIBRARY_DIR}/include/pqxx/params.hxx"
-    "${LIBRARY_DIR}/include/pqxx/binarystring.hxx"
-    "${LIBRARY_DIR}/include/pqxx/composite.hxx"
-    "${LIBRARY_DIR}/include/pqxx/connection.hxx"
-    "${LIBRARY_DIR}/include/pqxx/cursor.hxx"
-    "${LIBRARY_DIR}/include/pqxx/dbtransaction.hxx"
-    "${LIBRARY_DIR}/include/pqxx/errorhandler.hxx"
-    "${LIBRARY_DIR}/include/pqxx/except.hxx"
-    "${LIBRARY_DIR}/include/pqxx/field.hxx"
-    "${LIBRARY_DIR}/include/pqxx/isolation.hxx"
-    "${LIBRARY_DIR}/include/pqxx/largeobject.hxx"
-    "${LIBRARY_DIR}/include/pqxx/nontransaction.hxx"
-    "${LIBRARY_DIR}/include/pqxx/notification.hxx"
-    "${LIBRARY_DIR}/include/pqxx/pipeline.hxx"
-    "${LIBRARY_DIR}/include/pqxx/prepared_statement.hxx"
-    "${LIBRARY_DIR}/include/pqxx/result.hxx"
-    "${LIBRARY_DIR}/include/pqxx/robusttransaction.hxx"
-    "${LIBRARY_DIR}/include/pqxx/row.hxx"
-    "${LIBRARY_DIR}/include/pqxx/separated_list.hxx"
-    "${LIBRARY_DIR}/include/pqxx/strconv.hxx"
-    "${LIBRARY_DIR}/include/pqxx/stream_from.hxx"
-    "${LIBRARY_DIR}/include/pqxx/stream_to.hxx"
-    "${LIBRARY_DIR}/include/pqxx/subtransaction.hxx"
-    "${LIBRARY_DIR}/include/pqxx/transaction.hxx"
-    "${LIBRARY_DIR}/include/pqxx/transaction_base.hxx"
-    "${LIBRARY_DIR}/include/pqxx/types.hxx"
-    "${LIBRARY_DIR}/include/pqxx/util.hxx"
-    "${LIBRARY_DIR}/include/pqxx/version.hxx"
-    "${LIBRARY_DIR}/include/pqxx/zview.hxx"
-)
-add_library(_libpqxx ${SRCS} ${HDRS})
+add_library(_libpqxx ${SRCS})

 target_link_libraries(_libpqxx PUBLIC ch_contrib::libpq)
 target_include_directories (_libpqxx SYSTEM BEFORE PUBLIC "${LIBRARY_DIR}/include")

View File

@@ -737,6 +737,14 @@ Number of sessions (connections) to ZooKeeper. Should be no more than one, becau
 Number of watches (event subscriptions) in ZooKeeper.

+### ConcurrencyControlAcquired
+
+Total number of acquired CPU slots.
+
+### ConcurrencyControlSoftLimit
+
+Value of soft limit on number of CPU slots.
+
 **See Also**

 - [system.asynchronous_metrics](../../operations/system-tables/asynchronous_metrics.md#system_tables-asynchronous_metrics) — Contains periodically calculated metrics.

View File

@@ -1631,6 +1631,7 @@ try
                 concurrent_threads_soft_limit = value;
             }
             ConcurrencyControl::instance().setMaxConcurrency(concurrent_threads_soft_limit);
+            LOG_INFO(log, "ConcurrencyControl limit is set to {}", concurrent_threads_soft_limit);

             global_context->getProcessList().setMaxSize(new_server_settings.max_concurrent_queries);
             global_context->getProcessList().setMaxInsertQueriesAmount(new_server_settings.max_concurrent_insert_queries);

View File

@@ -537,6 +537,7 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden
     PullingAsyncPipelineExecutor executor(io.pipeline);
     io.pipeline.setProgressCallback(context->getProgressCallback());
     io.pipeline.setProcessListElement(context->getProcessListElement());
+    io.pipeline.setConcurrencyControl(context->getSettingsRef().use_concurrency_control);

     Block block;

View File

@@ -1701,6 +1701,9 @@ try
     QueryPipeline pipeline(std::move(pipe));
     PullingAsyncPipelineExecutor executor(pipeline);

+    /// Concurrency control in client is not required
+    pipeline.setConcurrencyControl(false);
+
     if (need_render_progress)
     {
         pipeline.setProgressCallback([this](const Progress & progress){ onProgress(progress); });

View File

@@ -240,6 +240,7 @@ void LocalConnection::sendQuery(
     {
         state->block = state->io.pipeline.getHeader();
         state->executor = std::make_unique<PullingAsyncPipelineExecutor>(state->io.pipeline);
+        state->io.pipeline.setConcurrencyControl(false);
     }
     else if (state->io.pipeline.completed())
     {

View File

@@ -1,7 +1,23 @@
+#include <Common/ISlotControl.h>
 #include <Common/ConcurrencyControl.h>
 #include <Common/Exception.h>
+#include <Common/ProfileEvents.h>
+
+namespace ProfileEvents
+{
+    extern const Event ConcurrencyControlGrantedHard;
+    extern const Event ConcurrencyControlGrantDelayed;
+    extern const Event ConcurrencyControlAcquiredTotal;
+    extern const Event ConcurrencyControlAllocationDelayed;
+}
+
+namespace CurrentMetrics
+{
+    extern const Metric ConcurrencyControlAcquired;
+    extern const Metric ConcurrencyControlSoftLimit;
+}

 namespace DB
 {
@@ -17,6 +33,7 @@ ConcurrencyControl::Slot::~Slot()
 ConcurrencyControl::Slot::Slot(SlotAllocationPtr && allocation_)
     : allocation(std::move(allocation_))
+    , acquired_slot_increment(CurrentMetrics::ConcurrencyControlAcquired)
 {
 }
@@ -34,6 +51,7 @@ ConcurrencyControl::Allocation::~Allocation()
     {
         if (granted.compare_exchange_strong(value, value - 1))
         {
+            ProfileEvents::increment(ProfileEvents::ConcurrencyControlAcquiredTotal, 1);
             std::unique_lock lock{mutex};
             return AcquiredSlotPtr(new Slot(shared_from_this())); // can't use std::make_shared due to private ctor
         }
@@ -84,6 +102,7 @@ void ConcurrencyControl::Allocation::release()
 ConcurrencyControl::ConcurrencyControl()
     : cur_waiter(waiters.end())
+    , max_concurrency_metric(CurrentMetrics::ConcurrencyControlSoftLimit, 0)
 {
 }
@@ -103,11 +122,16 @@ ConcurrencyControl::~ConcurrencyControl()
     // Acquire as many slots as we can, but not lower than `min`
     SlotCount granted = std::max(min, std::min(max, available(lock)));
     cur_concurrency += granted;
+    ProfileEvents::increment(ProfileEvents::ConcurrencyControlGrantedHard, min);

     // Create allocation and start waiting if more slots are required
     if (granted < max)
+    {
+        ProfileEvents::increment(ProfileEvents::ConcurrencyControlGrantDelayed, max - granted);
+        ProfileEvents::increment(ProfileEvents::ConcurrencyControlAllocationDelayed);
         return SlotAllocationPtr(new Allocation(*this, max, granted,
             waiters.insert(cur_waiter, nullptr /* pointer is set by Allocation ctor */)));
+    }
     else
         return SlotAllocationPtr(new Allocation(*this, max, granted));
 }
@@ -116,6 +140,7 @@ void ConcurrencyControl::setMaxConcurrency(SlotCount value)
 {
     std::unique_lock lock{mutex};
     max_concurrency = std::max<SlotCount>(1, value); // never allow max_concurrency to be zero
+    max_concurrency_metric.changeTo(max_concurrency == UnlimitedSlots ? 0 : max_concurrency);
     schedule(lock);
 }

View File

@@ -8,6 +8,7 @@
 #include <base/types.h>
 #include <boost/core/noncopyable.hpp>

+#include <Common/CurrentMetrics.h>
 #include <Common/ISlotControl.h>

 namespace DB
@@ -53,6 +54,7 @@ public:
         explicit Slot(SlotAllocationPtr && allocation_);

         SlotAllocationPtr allocation;
+        CurrentMetrics::Increment acquired_slot_increment;
     };

     // Manages group of slots for a single query, see ConcurrencyControl::allocate(min, max)
@@ -131,6 +133,7 @@ private:
     Waiters::iterator cur_waiter; // round-robin pointer
     SlotCount max_concurrency = UnlimitedSlots;
     SlotCount cur_concurrency = 0;
+    CurrentMetrics::Increment max_concurrency_metric;
 };

 }

View File

@@ -313,6 +313,9 @@
     M(FilteringMarksWithPrimaryKey, "Number of threads currently doing filtering of mark ranges by the primary key") \
     M(FilteringMarksWithSecondaryKeys, "Number of threads currently doing filtering of mark ranges by secondary keys") \
     \
+    M(ConcurrencyControlAcquired, "Total number of acquired CPU slots") \
+    M(ConcurrencyControlSoftLimit, "Value of soft limit on number of CPU slots") \
+    \
     M(DiskS3NoSuchKeyErrors, "The number of `NoSuchKey` errors that occur when reading data from S3 cloud storage through ClickHouse disks.") \

 #ifdef APPLY_FOR_EXTERNAL_METRICS

View File

@@ -73,4 +73,44 @@ public:
     [[nodiscard]] virtual SlotAllocationPtr allocate(SlotCount min, SlotCount max) = 0;
 };

+/// Allocation that grants all the slots immediately on creation
+class GrantedAllocation : public ISlotAllocation
+{
+public:
+    explicit GrantedAllocation(SlotCount granted_)
+        : granted(granted_)
+        , allocated(granted_)
+    {}
+
+    [[nodiscard]] AcquiredSlotPtr tryAcquire() override
+    {
+        SlotCount value = granted.load();
+        while (value)
+        {
+            if (granted.compare_exchange_strong(value, value - 1))
+                return std::make_shared<IAcquiredSlot>();
+        }
+        return {};
+    }
+
+    SlotCount grantedCount() const override
+    {
+        return granted.load();
+    }
+
+    SlotCount allocatedCount() const override
+    {
+        return allocated;
+    }
+
+private:
+    std::atomic<SlotCount> granted; // allocated, but not yet acquired
+    const SlotCount allocated;
+};
+
+[[nodiscard]] inline SlotAllocationPtr grantSlots(SlotCount count)
+{
+    return SlotAllocationPtr(new GrantedAllocation(count));
+}
+
 }
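
A minimal usage sketch (not part of the diff) of the new grantSlots() helper, mirroring how PipelineExecutor::initializeExecution uses it further down in this compare: with use_concurrency_control = 0 the executor takes a pre-granted dummy allocation instead of competing for slots in ConcurrencyControl. The runWithSlots() wrapper is hypothetical and only illustrates the calling pattern.

#include <Common/ConcurrencyControl.h>
#include <Common/ISlotControl.h>

// Hypothetical helper, assuming the interfaces shown in this diff.
void runWithSlots(size_t num_threads, bool concurrency_control)
{
    DB::SlotAllocationPtr cpu_slots;
    if (concurrency_control)
    {
        // Compete for CPU slots: at least one slot is granted immediately, the rest may be delayed.
        cpu_slots = DB::ConcurrencyControl::instance().allocate(/* min */ 1, /* max */ num_threads);
    }
    else
    {
        // Dummy allocation: all slots are granted up front and are not counted
        // against the server-wide concurrent_threads_soft_limit.
        cpu_slots = DB::grantSlots(num_threads);
    }

    size_t use_threads = cpu_slots->grantedCount();
    // ... spawn up to `use_threads` workers; each worker holds an AcquiredSlotPtr obtained from cpu_slots->tryAcquire().
}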

View File

@@ -826,6 +826,11 @@ The server successfully detected this situation and will download merged part fr
     M(ReadWriteBufferFromHTTPRequestsSent, "Number of HTTP requests sent by ReadWriteBufferFromHTTP") \
     M(ReadWriteBufferFromHTTPBytes, "Total size of payload bytes received and sent by ReadWriteBufferFromHTTP. Doesn't include HTTP headers.") \
     \
+    M(ConcurrencyControlGrantedHard, "Number of CPU slot granted according to guarantee of 1 thread per query and for queries with setting 'use_concurrency_control' = 0") \
+    M(ConcurrencyControlGrantDelayed, "Number of CPU slot not granted initially and required to wait for a free CPU slot") \
+    M(ConcurrencyControlAcquiredTotal, "Total number of CPU slot acquired") \
+    M(ConcurrencyControlAllocationDelayed, "Total number of CPU slot allocations (queries) that were required to wait for slots to upscale") \
+    \
     M(GWPAsanAllocateSuccess, "Number of successful allocations done by GWPAsan") \
     M(GWPAsanAllocateFailed, "Number of failed allocations done by GWPAsan (i.e. filled pool)") \
     M(GWPAsanFree, "Number of free operations done by GWPAsan") \

View File

@@ -151,6 +151,15 @@ Names NamesAndTypesList::getNames() const
     return res;
 }

+NameSet NamesAndTypesList::getNameSet() const
+{
+    NameSet res;
+    res.reserve(size());
+    for (const NameAndTypePair & column : *this)
+        res.insert(column.name);
+    return res;
+}
+
 DataTypes NamesAndTypesList::getTypes() const
 {
     DataTypes res;

View File

@@ -100,6 +100,7 @@ public:
     void getDifference(const NamesAndTypesList & rhs, NamesAndTypesList & deleted, NamesAndTypesList & added) const;

     Names getNames() const;
+    NameSet getNameSet() const;
     DataTypes getTypes() const;

     /// Remove columns which names are not in the `names`.

View File

@@ -666,6 +666,7 @@ void CacheDictionary<dictionary_key_type>::update(CacheDictionaryUpdateUnitPtr<d
         Columns fetched_columns_during_update = fetch_request.makeAttributesResultColumnsNonMutable();

         DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
+        pipeline.setConcurrencyControl(false);
         Block block;
         while (executor.pull(block))
         {

View File

@@ -334,6 +334,7 @@ public:
         , pipeline(std::move(pipeline_))
         , executor(pipeline)
     {
+        pipeline.setConcurrencyControl(false);
     }

     std::string getName() const override
@@ -378,7 +379,6 @@ Pipe DirectDictionary<dictionary_key_type>::getSourcePipe(
             ids.emplace_back(key);

         auto pipeline = source_ptr->loadIds(ids);
-
         if (use_async_executor)
             pipe = Pipe(std::make_shared<SourceFromQueryPipeline<PullingAsyncPipelineExecutor>>(std::move(pipeline)));
         else
View File

@@ -454,6 +454,7 @@ void FlatDictionary::updateData()
     {
         QueryPipeline pipeline(source_ptr->loadUpdatedAll());
         DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
+        pipeline.setConcurrencyControl(false);
         update_field_loaded_block.reset();
         Block block;
@@ -495,6 +496,7 @@ void FlatDictionary::loadData()
     {
         QueryPipeline pipeline(source_ptr->loadAll());
         DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
+        pipeline.setConcurrencyControl(false);

         Block block;
         while (executor.pull(block))

View File

@@ -475,6 +475,7 @@ void HashedArrayDictionary<dictionary_key_type, sharded>::updateData()
     {
         QueryPipeline pipeline(source_ptr->loadUpdatedAll());
         DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
+        pipeline.setConcurrencyControl(false);
         update_field_loaded_block.reset();
         Block block;
@@ -973,6 +974,7 @@ void HashedArrayDictionary<dictionary_key_type, sharded>::loadData()
         QueryPipeline pipeline(source_ptr->loadAll());
         DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
+        pipeline.setConcurrencyControl(false);

         UInt64 pull_time_microseconds = 0;
         UInt64 process_time_microseconds = 0;

View File

@@ -884,6 +884,7 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::updateData()
     {
         QueryPipeline pipeline(source_ptr->loadUpdatedAll());
         DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
+        pipeline.setConcurrencyControl(false);
         update_field_loaded_block.reset();
         Block block;
@@ -1163,6 +1164,7 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::loadData()
         QueryPipeline pipeline(source_ptr->loadAll());
         DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
+        pipeline.setConcurrencyControl(false);

         Block block;
         DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;

View File

@@ -407,6 +407,7 @@ void IPAddressDictionary::loadData()
     bool has_ipv6 = false;

     DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
+    pipeline.setConcurrencyControl(false);
     Block block;
     while (executor.pull(block))
     {

View File

@@ -291,6 +291,7 @@ void IPolygonDictionary::loadData()
     QueryPipeline pipeline(source_ptr->loadAll());

     DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
+    pipeline.setConcurrencyControl(false);
     Block block;
     while (executor.pull(block))
         blockToAttributes(block);

View File

@@ -541,6 +541,7 @@ void RangeHashedDictionary<dictionary_key_type>::loadData()
     {
         QueryPipeline pipeline(source_ptr->loadAll());
         DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
+        pipeline.setConcurrencyControl(false);

         Block block;
         while (executor.pull(block))
@@ -692,6 +693,7 @@ void RangeHashedDictionary<dictionary_key_type>::updateData()
     {
         QueryPipeline pipeline(source_ptr->loadUpdatedAll());
         DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
+        pipeline.setConcurrencyControl(false);

         update_field_loaded_block.reset();
         Block block;

View File

@@ -314,6 +314,7 @@ void RegExpTreeDictionary::loadData()
     {
         QueryPipeline pipeline(source_ptr->loadAll());
         DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
+        pipeline.setConcurrencyControl(false);

         Block block;
         while (executor.pull(block))

View File

@@ -193,6 +193,7 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
     PullingAsyncPipelineExecutor executor(io.pipeline);
     io.pipeline.setProgressCallback(data.getContext()->getProgressCallback());
+    io.pipeline.setConcurrencyControl(data.getContext()->getSettingsRef().use_concurrency_control);
     while (block.rows() == 0 && executor.pull(block))
     {
     }

View File

@@ -77,7 +77,6 @@
 #include <IO/Operators.h>
 #include <IO/WriteBufferFromString.h>
-#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
 #include <Processors/QueryPlan/QueryPlan.h>
 #include <Parsers/formatAST.h>
 #include <Parsers/QueryParameterVisitor.h>

View File

@@ -338,11 +338,8 @@ size_t HashJoin::getTotalRowCount() const
     return res;
 }

-size_t HashJoin::getTotalByteCount() const
+void HashJoin::doDebugAsserts() const
 {
-    if (!data)
-        return 0;
-
 #ifndef NDEBUG
     size_t debug_blocks_allocated_size = 0;
     for (const auto & block : data->blocks)
@@ -360,6 +357,14 @@ size_t HashJoin::getTotalByteCount() const
         throw Exception(ErrorCodes::LOGICAL_ERROR, "data->blocks_nullmaps_allocated_size != debug_blocks_nullmaps_allocated_size ({} != {})",
                         data->blocks_nullmaps_allocated_size, debug_blocks_nullmaps_allocated_size);
 #endif
+}
+
+size_t HashJoin::getTotalByteCount() const
+{
+    if (!data)
+        return 0;
+
+    doDebugAsserts();

     size_t res = 0;
@@ -544,9 +549,11 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
             have_compressed = true;
         }

+        doDebugAsserts();
         data->blocks_allocated_size += block_to_save.allocatedBytes();
         data->blocks.emplace_back(std::move(block_to_save));
         Block * stored_block = &data->blocks.back();
+        doDebugAsserts();

         if (rows)
             data->empty = false;
@@ -634,9 +641,11 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
         if (!flag_per_row && !is_inserted)
         {
+            doDebugAsserts();
             LOG_TRACE(log, "Skipping inserting block with {} rows", rows);
             data->blocks_allocated_size -= stored_block->allocatedBytes();
             data->blocks.pop_back();
+            doDebugAsserts();
         }

         if (!check_limits)
@@ -683,6 +692,8 @@ void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_
     for (auto & stored_block : data->blocks)
     {
+        doDebugAsserts();
+
         size_t old_size = stored_block.allocatedBytes();
         stored_block = stored_block.shrinkToFit();
         size_t new_size = stored_block.allocatedBytes();
@@ -700,6 +711,8 @@ void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_
         else
             /// Sometimes after clone resized block can be bigger than original
             data->blocks_allocated_size += new_size - old_size;
+
+        doDebugAsserts();
     }

     auto new_total_bytes_in_join = getTotalByteCount();
@@ -1416,7 +1429,13 @@ void HashJoin::tryRerangeRightTableDataImpl(Map & map [[maybe_unused]])
         };

         BlocksList sorted_blocks;
         visit_rows_map(sorted_blocks, map);
+        doDebugAsserts();
         data->blocks.swap(sorted_blocks);
+        size_t new_blocks_allocated_size = 0;
+        for (const auto & block : data->blocks)
+            new_blocks_allocated_size += block.allocatedBytes();
+        data->blocks_allocated_size = new_blocks_allocated_size;
+        doDebugAsserts();
     }
 }

View File

@@ -470,6 +470,7 @@ private:
     void tryRerangeRightTableData() override;
     template <JoinKind KIND, typename Map, JoinStrictness STRICTNESS>
     void tryRerangeRightTableDataImpl(Map & map);
+    void doDebugAsserts() const;
 };

 }

View File

@@ -26,6 +26,8 @@
 #include <Analyzer/TableFunctionNode.h>
 #include <Analyzer/Utils.h>

+#include <Core/Settings.h>
+
 #include <Interpreters/Context.h>
 #include <Interpreters/QueryLog.h>
@@ -249,6 +251,8 @@ QueryPipelineBuilder InterpreterSelectQueryAnalyzer::buildQueryPipeline()
     auto optimization_settings = QueryPlanOptimizationSettings::fromContext(context);
     auto build_pipeline_settings = BuildQueryPipelineSettings::fromContext(context);

+    query_plan.setConcurrencyControl(context->getSettingsRef().use_concurrency_control);
+
     return std::move(*query_plan.buildQueryPipeline(optimization_settings, build_pipeline_settings));
 }

View File

@@ -208,6 +208,7 @@ bool isStorageTouchedByMutations(
     }

     PullingAsyncPipelineExecutor executor(io.pipeline);
+    io.pipeline.setConcurrencyControl(context->getSettingsRef().use_concurrency_control);

     Block block;
     while (block.rows() == 0 && executor.pull(block));
@@ -1286,6 +1287,10 @@ void MutationsInterpreter::Source::read(
 void MutationsInterpreter::initQueryPlan(Stage & first_stage, QueryPlan & plan)
 {
+    // Mutations are not using concurrency control now. Queries, merges and mutations running together could lead to CPU overcommit.
+    // TODO(serxa): Enable concurrency control for mutation queries and mutations. This should be done after CPU scheduler introduction.
+    plan.setConcurrencyControl(false);
+
     source.read(first_stage, plan, metadata_snapshot, context, settings.apply_deleted_mask, settings.can_execute);
     addCreatingSetsStep(plan, first_stage.analyzer->getPreparedSets(), context);
 }

View File

@@ -722,7 +722,14 @@ void ASTFunction::formatImplWithoutAlias(const FormatSettings & settings, Format
                     assert_cast<const ASTFunction *>(argument.get())->arguments->children[0]->formatImpl(settings, state, nested_dont_need_parens);
                     settings.ostr << (settings.hilite ? hilite_operator : "") << " = " << (settings.hilite ? hilite_none : "");
                 }
+                if (!secret_arguments.replacement.empty())
+                {
+                    settings.ostr << "'" << secret_arguments.replacement << "'";
+                }
+                else
+                {
                     settings.ostr << "'[HIDDEN]'";
+                }
                 if (size <= secret_arguments.start + secret_arguments.count && !secret_arguments.are_named)
                     break; /// All other arguments should also be hidden.
                 continue;

View File

@@ -1,6 +1,7 @@
 #pragma once

 #include <Common/KnownObjectNames.h>
+#include <Common/re2.h>
 #include <Core/QualifiedTableName.h>
 #include <base/defines.h>
 #include <boost/algorithm/string/predicate.hpp>
@@ -49,6 +50,11 @@ public:
         bool are_named = false; /// Arguments like `password = 'password'` are considered as named arguments.
         /// E.g. "headers" in `url('..', headers('foo' = '[HIDDEN]'))`
         std::vector<std::string> nested_maps;
+        /// Full replacement of an argument. Only supported when count is 1, otherwise all arguments will be replaced with this string.
+        /// It's needed in cases when we don't want to hide the entire parameter, but some part of it, e.g. "connection_string" in
+        /// `azureBlobStorage('DefaultEndpointsProtocol=https;AccountKey=secretkey;...', ...)` should be replaced with
+        /// `azureBlobStorage('DefaultEndpointsProtocol=https;AccountKey=[HIDDEN];...', ...)`.
+        std::string replacement;

         bool hasSecrets() const
         {
@@ -74,6 +80,7 @@ protected:
             result.are_named = argument_is_named;
         }
         chassert(index >= result.start); /// We always check arguments consecutively
+        chassert(result.replacement.empty()); /// We shouldn't use replacement with masking other arguments
         result.count = index + 1 - result.start;
         if (!argument_is_named)
             result.are_named = false;
@@ -199,32 +206,39 @@ protected:
     void findAzureBlobStorageFunctionSecretArguments(bool is_cluster_function)
     {
-        /// azureBlobStorage('cluster_name', 'conn_string/storage_account_url', ...) has 'conn_string/storage_account_url' as its second argument.
+        /// azureBlobStorageCluster('cluster_name', 'conn_string/storage_account_url', ...) has 'conn_string/storage_account_url' as its second argument.
         size_t url_arg_idx = is_cluster_function ? 1 : 0;

         if (!is_cluster_function && isNamedCollectionName(0))
         {
             /// azureBlobStorage(named_collection, ..., account_key = 'account_key', ...)
+            if (maskAzureConnectionString(-1, true, 1))
+                return;
             findSecretNamedArgument("account_key", 1);
             return;
         }
         else if (is_cluster_function && isNamedCollectionName(1))
         {
             /// azureBlobStorageCluster(cluster, named_collection, ..., account_key = 'account_key', ...)
+            if (maskAzureConnectionString(-1, true, 2))
+                return;
             findSecretNamedArgument("account_key", 2);
             return;
         }

-        /// We should check other arguments first because we don't need to do any replacement in case storage_account_url is not used
-        /// azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure)
-        /// azureBlobStorageCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])
+        if (maskAzureConnectionString(url_arg_idx))
+            return;
+
+        /// We should check other arguments first because we don't need to do any replacement in case of
+        /// azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, format, [account_name, account_key, ...])
+        /// azureBlobStorageCluster(cluster, connection_string|storage_account_url, container_name, blobpath, format, [account_name, account_key, ...])
         size_t count = function->arguments->size();
         if ((url_arg_idx + 4 <= count) && (count <= url_arg_idx + 7))
         {
-            String second_arg;
-            if (tryGetStringFromArgument(url_arg_idx + 3, &second_arg))
+            String fourth_arg;
+            if (tryGetStringFromArgument(url_arg_idx + 3, &fourth_arg))
             {
-                if (second_arg == "auto" || KnownFormatNames::instance().exists(second_arg))
+                if (fourth_arg == "auto" || KnownFormatNames::instance().exists(fourth_arg))
                     return; /// The argument after 'url' is a format: s3('url', 'format', ...)
             }
         }
@@ -234,6 +248,40 @@ protected:
             markSecretArgument(url_arg_idx + 4);
     }

+    bool maskAzureConnectionString(ssize_t url_arg_idx, bool argument_is_named = false, size_t start = 0)
+    {
+        String url_arg;
+        if (argument_is_named)
+        {
+            url_arg_idx = findNamedArgument(&url_arg, "connection_string", start);
+            if (url_arg_idx == -1 || url_arg.empty())
+                url_arg_idx = findNamedArgument(&url_arg, "storage_account_url", start);
+            if (url_arg_idx == -1 || url_arg.empty())
+                return false;
+        }
+        else
+        {
+            if (!tryGetStringFromArgument(url_arg_idx, &url_arg))
+                return false;
+        }
+
+        if (!url_arg.starts_with("http"))
+        {
+            static re2::RE2 account_key_pattern = "AccountKey=.*?(;|$)";
+            if (RE2::Replace(&url_arg, account_key_pattern, "AccountKey=[HIDDEN]\\1"))
+            {
+                chassert(result.count == 0); /// We shouldn't use replacement with masking other arguments
+                result.start = url_arg_idx;
+                result.are_named = argument_is_named;
+                result.count = 1;
+                result.replacement = url_arg;
+                return true;
+            }
+        }
+
+        return false;
+    }
+
     void findURLSecretArguments()
     {
         if (!isNamedCollectionName(0))
@@ -513,8 +561,9 @@ protected:
         return function->arguments->at(arg_idx)->isIdentifier();
     }

-    /// Looks for a secret argument with a specified name. This function looks for arguments in format `key=value` where the key is specified.
-    void findSecretNamedArgument(const std::string_view & key, size_t start = 0)
+    /// Looks for an argument with a specified name. This function looks for arguments in format `key=value` where the key is specified.
+    /// Returns -1 if no argument was found.
+    ssize_t findNamedArgument(String * res, const std::string_view & key, size_t start = 0)
     {
         for (size_t i = start; i < function->arguments->size(); ++i)
         {
@@ -531,9 +580,23 @@ protected:
                 continue;

             if (found_key == key)
-                markSecretArgument(i, /* argument_is_named= */ true);
+            {
+                tryGetStringFromArgument(*equals_func->arguments->at(1), res);
+                return i;
+            }
         }
+
+        return -1;
+    }
+
+    /// Looks for a secret argument with a specified name. This function looks for arguments in format `key=value` where the key is specified.
+    /// If the argument is found, it is marked as a secret.
+    void findSecretNamedArgument(const std::string_view & key, size_t start = 0)
+    {
+        ssize_t arg_idx = findNamedArgument(nullptr, key, start);
+        if (arg_idx >= 0)
+            markSecretArgument(arg_idx, /* argument_is_named= */ true);
+    }
 };

 }
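
A standalone sketch (not part of the diff) of the masking step that maskAzureConnectionString() performs above: RE2::Replace rewrites the AccountKey value inside an Azure connection string while leaving the rest of the argument visible. The sample connection string and the main() harness are illustrative; the pattern and rewrite string are taken verbatim from the diff.

#include <iostream>
#include <string>
#include <re2/re2.h>

int main()
{
    // Illustrative connection string; only the AccountKey value should be hidden.
    std::string url_arg = "DefaultEndpointsProtocol=https;AccountName=test;AccountKey=secretkey;EndpointSuffix=core.windows.net";

    // Same pattern and rewrite as in maskAzureConnectionString(); "\1" keeps the trailing ';' (or end of string).
    static re2::RE2 account_key_pattern = "AccountKey=.*?(;|$)";
    if (RE2::Replace(&url_arg, account_key_pattern, "AccountKey=[HIDDEN]\\1"))
        std::cout << url_arg << '\n';
    // Prints: DefaultEndpointsProtocol=https;AccountName=test;AccountKey=[HIDDEN];EndpointSuffix=core.windows.net
}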

View File

@@ -1,4 +1,5 @@
 #include <IO/WriteBufferFromString.h>
+#include "Common/ISlotControl.h"
 #include <Common/ThreadPool.h>
 #include <Common/CurrentThread.h>
 #include <Common/CurrentMetrics.h>
@@ -342,12 +343,22 @@ void PipelineExecutor::initializeExecution(size_t num_threads, bool concurrency_
     is_execution_initialized = true;
     tryUpdateExecutionStatus(ExecutionStatus::NotStarted, ExecutionStatus::Executing);

-    size_t use_threads = num_threads;
-
+    if (concurrency_control)
+    {
         /// Allocate CPU slots from concurrency control
-    size_t min_threads = concurrency_control ? 1uz : num_threads;
+        constexpr size_t min_threads = 1uz; // Number of threads that should be granted to every query no matter how many threads are already running in other queries
         cpu_slots = ConcurrencyControl::instance().allocate(min_threads, num_threads);
-    use_threads = cpu_slots->grantedCount();
+#ifndef NDEBUG
+        LOG_TEST(log, "Allocate CPU slots. min: {}, max: {}, granted: {}", min_threads, num_threads, cpu_slots->grantedCount());
+#endif
+    }
+    else
+    {
+        /// If concurrency control is not used we should not even count threads as competing.
+        /// To avoid counting them in ConcurrencyControl, we create dummy slot allocation.
+        cpu_slots = grantSlots(num_threads);
+    }
+
+    size_t use_threads = cpu_slots->grantedCount();

     Queue queue;
     graph->initializeExecution(queue);

View File

@@ -199,6 +199,7 @@ QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
     last_pipeline->setProgressCallback(build_pipeline_settings.progress_callback);
     last_pipeline->setProcessListElement(build_pipeline_settings.process_list_element);
     last_pipeline->addResources(std::move(resources));
+    last_pipeline->setConcurrencyControl(getConcurrencyControl());

     return last_pipeline;
 }

View File

@@ -300,7 +300,8 @@ QueryPipelineBuilder QueryPipelineBuilder::unitePipelines(
             /// It may happen if max_distributed_connections > max_threads
             max_threads_limit = std::max(pipeline.max_threads, max_threads_limit);

-        concurrency_control = pipeline.getConcurrencyControl();
+        // Use concurrency control if at least one of pipelines is using it
+        concurrency_control = concurrency_control || pipeline.getConcurrencyControl();
     }

     QueryPipelineBuilder pipeline;
@@ -311,8 +312,8 @@
     {
         pipeline.setMaxThreads(max_threads);
         pipeline.limitMaxThreads(max_threads_limit);
-        pipeline.setConcurrencyControl(concurrency_control);
     }
+    pipeline.setConcurrencyControl(concurrency_control);

     pipeline.setCollectedProcessors(nullptr);
     return pipeline;

View File

@@ -1232,6 +1232,7 @@ namespace
         if (io.pipeline.pulling())
         {
             auto executor = std::make_shared<PullingAsyncPipelineExecutor>(io.pipeline);
+            io.pipeline.setConcurrencyControl(query_context->getSettingsRef().use_concurrency_control);
             auto check_for_cancel = [&]
             {
                 if (isQueryCancelled())

View File

@@ -1086,6 +1086,7 @@ void TCPHandler::processOrdinaryQuery()
     {
         PullingAsyncPipelineExecutor executor(pipeline);
+        pipeline.setConcurrencyControl(query_context->getSettingsRef().use_concurrency_control);
         CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};

         /// The following may happen:

View File

@@ -431,6 +431,7 @@ void StorageLiveView::writeBlock(StorageLiveView & live_view, Block && block, Ch
     auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));

     PullingAsyncPipelineExecutor executor(pipeline);
+    pipeline.setConcurrencyControl(local_context->getSettingsRef().use_concurrency_control);
     Block this_block;

     while (executor.pull(this_block))
@@ -567,6 +568,7 @@ MergeableBlocksPtr StorageLiveView::collectMergeableBlocks(ContextPtr local_cont
     auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));

     PullingAsyncPipelineExecutor executor(pipeline);
+    pipeline.setConcurrencyControl(local_context->getSettingsRef().use_concurrency_control);
     Block this_block;

     while (executor.pull(this_block))
@@ -672,6 +674,7 @@ bool StorageLiveView::getNewBlocks(const std::lock_guard<std::mutex> & lock)
     auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
     PullingAsyncPipelineExecutor executor(pipeline);
+    pipeline.setConcurrencyControl(false);
     Block block;
     while (executor.pull(block))
     {

View File

@@ -1727,6 +1727,10 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
     auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
     auto builder = merge_parts_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);

+    // Merges are not using concurrency control now. Queries and merges running together could lead to CPU overcommit.
+    // TODO(serxa): Enable concurrency control for merges. This should be done after CPU scheduler introduction.
+    builder->setConcurrencyControl(false);
+
     global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
 }

View File

@@ -2057,6 +2057,7 @@ bool MutateTask::prepare()
     context_for_reading->setSetting("apply_mutations_on_fly", false);
     /// Skip using large sets in KeyCondition
     context_for_reading->setSetting("use_index_for_in_with_subqueries_max_values", 100000);
+    context_for_reading->setSetting("use_concurrency_control", false);

     for (const auto & command : *ctx->commands)
         if (!canSkipMutationCommandForPart(ctx->source_part, command, context_for_reading))

View File

@@ -223,7 +223,7 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
         {
             account_name = fourth_arg;
             account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key");
-            auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/account_name");
+            auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/structure");
             if (is_format_arg(sixth_arg))
             {
                 format = sixth_arg;
@@ -257,10 +257,10 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
     }
     else if (with_structure && engine_args.size() == 8)
     {
-        auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name");
+        auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "account_name");
         account_name = fourth_arg;
         account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key");
-        auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/account_name");
+        auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format");
         if (!is_format_arg(sixth_arg))
             throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg);
         format = sixth_arg;

View File

@@ -131,7 +131,7 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
     else
    {
         ConfigurationPtr copy_configuration = configuration->clone();
-        auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
+        auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, local_context);
         if (filter_dag)
         {
             auto keys = configuration->getPaths();
@@ -142,7 +142,7 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
             VirtualColumnUtils::buildSetsForDAG(*filter_dag, local_context);
             auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
-            VirtualColumnUtils::filterByPathOrFile(keys, paths, actions, virtual_columns);
+            VirtualColumnUtils::filterByPathOrFile(keys, paths, actions, virtual_columns, local_context);
             copy_configuration->setPaths(keys);
         }
@@ -489,6 +489,7 @@ StorageObjectStorageSource::GlobIterator::GlobIterator(
     , virtual_columns(virtual_columns_)
     , throw_on_zero_files_match(throw_on_zero_files_match_)
     , read_keys(read_keys_)
+    , local_context(context_)
     , file_progress_callback(file_progress_callback_)
 {
     if (configuration->isNamespaceWithGlobs())
@@ -510,7 +511,7 @@ StorageObjectStorageSource::GlobIterator::GlobIterator(
     }
     recursive = key_with_globs == "/**";
-    if (auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns))
+    if (auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, local_context))
     {
         VirtualColumnUtils::buildSetsForDAG(*filter_dag, getContext());
         filter_expr = std::make_shared<ExpressionActions>(std::move(*filter_dag));
@@ -585,7 +586,7 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::GlobIterator::ne
             for (const auto & object_info : new_batch)
                 paths.push_back(getUniqueStoragePathIdentifier(*configuration, *object_info, false));

-            VirtualColumnUtils::filterByPathOrFile(new_batch, paths, filter_expr, virtual_columns);
+            VirtualColumnUtils::filterByPathOrFile(new_batch, paths, filter_expr, virtual_columns, local_context);
             LOG_TEST(logger, "Filtered files: {} -> {}", paths.size(), new_batch.size());
         }


@@ -220,6 +220,7 @@ private:
bool is_finished = false;
bool first_iteration = true;
std::mutex next_mutex;
const ContextPtr local_context;
std::function<void(FileProgress)> file_progress_callback;
};


@@ -1141,13 +1141,13 @@ StorageFileSource::FilesIterator::FilesIterator(
{
std::optional<ActionsDAG> filter_dag;
if (!distributed_processing && !archive_info && !files.empty())
-filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, context_);
if (filter_dag)
{
VirtualColumnUtils::buildSetsForDAG(*filter_dag, context_);
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
-VirtualColumnUtils::filterByPathOrFile(files, files, actions, virtual_columns);
VirtualColumnUtils::filterByPathOrFile(files, files, actions, virtual_columns, context_);
}
}


@@ -227,7 +227,7 @@ public:
std::optional<ActionsDAG> filter_dag;
if (!uris.empty())
-filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, context);
if (filter_dag)
{
@@ -238,7 +238,7 @@ public:
VirtualColumnUtils::buildSetsForDAG(*filter_dag, context);
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
-VirtualColumnUtils::filterByPathOrFile(uris, paths, actions, virtual_columns);
VirtualColumnUtils::filterByPathOrFile(uris, paths, actions, virtual_columns, context);
}
}


@@ -1,5 +1,6 @@
#include <memory>
#include <stack>
#include <unordered_set>
#include <Core/NamesAndTypes.h>
#include <Core/TypeId.h>
@@ -46,6 +47,7 @@
#include "Functions/IFunction.h"
#include "Functions/IFunctionAdaptors.h"
#include "Functions/indexHint.h"
#include <IO/ReadBufferFromString.h>
#include <Interpreters/convertFieldToType.h>
#include <Parsers/makeASTForLogicalFunction.h>
#include <Columns/ColumnSet.h>
@@ -124,9 +126,18 @@ void filterBlockWithExpression(const ExpressionActionsPtr & actions, Block & blo
}
}
NamesAndTypesList getCommonVirtualsForFileLikeStorage()
{
return {{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_size", makeNullable(std::make_shared<DataTypeUInt64>())},
{"_time", makeNullable(std::make_shared<DataTypeDateTime>())},
{"_etag", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
}
NameSet getVirtualNamesForFileLikeStorage()
{
-return {"_path", "_file", "_size", "_time", "_etag"};
return getCommonVirtualsForFileLikeStorage().getNameSet();
}
std::unordered_map<std::string, std::string> parseHivePartitioningKeysAndValues(const String & path)
@@ -154,8 +165,10 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto
{
VirtualColumnsDescription desc;
-auto add_virtual = [&](const auto & name, const auto & type)
auto add_virtual = [&](const NameAndTypePair & pair)
{
const auto & name = pair.getNameInStorage();
const auto & type = pair.getTypeInStorage();
if (storage_columns.has(name))
{
if (!context->getSettingsRef().use_hive_partitioning)
@@ -172,11 +185,8 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto
desc.addEphemeral(name, type, "");
};
-add_virtual("_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
-add_virtual("_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
-add_virtual("_size", makeNullable(std::make_shared<DataTypeUInt64>()));
-add_virtual("_time", makeNullable(std::make_shared<DataTypeDateTime>()));
-add_virtual("_etag", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
for (const auto & item : getCommonVirtualsForFileLikeStorage())
add_virtual(item);
if (context->getSettingsRef().use_hive_partitioning)
{
@@ -188,16 +198,16 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto
if (type == nullptr)
type = std::make_shared<DataTypeString>();
if (type->canBeInsideLowCardinality())
-add_virtual(item.first, std::make_shared<DataTypeLowCardinality>(type));
add_virtual({item.first, std::make_shared<DataTypeLowCardinality>(type)});
else
-add_virtual(item.first, type);
add_virtual({item.first, type});
}
}
return desc;
}
-static void addPathAndFileToVirtualColumns(Block & block, const String & path, size_t idx)
static void addPathAndFileToVirtualColumns(Block & block, const String & path, size_t idx, const FormatSettings & format_settings, bool use_hive_partitioning)
{
if (block.has("_path"))
block.getByName("_path").column->assumeMutableRef().insert(path);
@@ -214,18 +224,34 @@ static void addPathAndFileToVirtualColumns(Block & block, const String & path, s
block.getByName("_file").column->assumeMutableRef().insert(file);
}
if (use_hive_partitioning)
{
auto keys_and_values = parseHivePartitioningKeysAndValues(path);
for (const auto & [key, value] : keys_and_values)
{
if (const auto * column = block.findByName(key))
{
ReadBufferFromString buf(value);
column->type->getDefaultSerialization()->deserializeWholeText(column->column->assumeMutableRef(), buf, format_settings);
}
}
}
block.getByName("_idx").column->assumeMutableRef().insert(idx);
}
-std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns)
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
if (!predicate || virtual_columns.empty())
return {};
Block block;
NameSet common_virtuals;
if (context->getSettingsRef().use_hive_partitioning)
common_virtuals = getVirtualNamesForFileLikeStorage();
for (const auto & column : virtual_columns)
{
-if (column.name == "_file" || column.name == "_path")
if (column.name == "_file" || column.name == "_path" || !common_virtuals.contains(column.name))
block.insert({column.type->createColumn(), column.type, column.name});
}
@@ -233,18 +259,19 @@ std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * pr
return splitFilterDagForAllowedInputs(predicate, &block);
}
-ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns)
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
Block block;
NameSet common_virtuals = getVirtualNamesForFileLikeStorage();
for (const auto & column : virtual_columns)
{
-if (column.name == "_file" || column.name == "_path")
if (column.name == "_file" || column.name == "_path" || !common_virtuals.contains(column.name))
block.insert({column.type->createColumn(), column.type, column.name});
}
block.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
for (size_t i = 0; i != paths.size(); ++i)
-addPathAndFileToVirtualColumns(block, paths[i], i);
addPathAndFileToVirtualColumns(block, paths[i], i, getFormatSettings(context), context->getSettingsRef().use_hive_partitioning);
filterBlockWithExpression(actions, block);
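For context on the partition-key handling above: hive-style layouts encode column values directly in the directory names of a path, and the new code reads those values back out before deserializing them into the matching virtual columns. Below is a rough, hedged sketch of that path convention in Python; parse_hive_partition_keys is a hypothetical helper for illustration only, not the actual parseHivePartitioningKeysAndValues implementation, and the sample path mirrors the layout used by the tests further down.

# Treat every "key=value" directory segment of a path as a partition column and its value.
def parse_hive_partition_keys(path: str) -> dict:
    keys = {}
    for segment in path.split("/"):
        if "=" in segment:
            key, _, value = segment.partition("=")
            if key:
                keys[key] = value
    return keys

print(parse_hive_partition_keys("data_hive/partitioning/column0=Elizabeth/sample.parquet"))
# {'column0': 'Elizabeth'}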


@@ -75,14 +75,14 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(
const std::string & sample_path = "",
std::optional<FormatSettings> format_settings_ = std::nullopt);
-std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns);
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
-ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns);
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
template <typename T>
-void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns)
void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
-auto indexes_column = getFilterByPathAndFileIndexes(paths, actions, virtual_columns);
auto indexes_column = getFilterByPathAndFileIndexes(paths, actions, virtual_columns, context);
const auto & indexes = typeid_cast<const ColumnUInt64 &>(*indexes_column).getData();
if (indexes.size() == sources.size())
return;


@@ -646,6 +646,7 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingAsyncPipelineExecutor executor(pipeline);
pipeline.setConcurrencyControl(getContext()->getSettingsRef().use_concurrency_control);
Block block;
BlocksPtr new_blocks = std::make_shared<Blocks>();


@@ -112,6 +112,7 @@ Block TableFunctionFormat::parseData(const ColumnsDescription & columns, const S
});
}
builder.setConcurrencyControl(context->getSettingsRef().use_concurrency_control);
auto pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
auto reader = std::make_unique<PullingPipelineExecutor>(*pipeline);


@@ -139,13 +139,20 @@ def main():
jr = JobReport.load(from_file=report_file)
additional_files.append(report_file)
for file in set(jr.additional_files):
-file_ = Path(file)
-file_name = file_.name
orig_file = Path(file)
file_name = orig_file.name
file_name = file_name.replace(
".", "__" + CI.Utils.normalize_string(job_id) + ".", 1
)
-file_ = file_.rename(file_.parent / file_name)
-additional_files.append(file_)
new_file = orig_file.rename(orig_file.parent / file_name)
for tr in test_results:
if tr.log_files is None:
continue
tr.log_files = [
new_file if (Path(log_file) == orig_file) else Path(log_file)
for log_file in tr.log_files
]
additional_files.append(new_file)
JobReport(
description=description,


@@ -559,7 +559,7 @@ class CI:
JobNames.BUGFIX_VALIDATE: JobConfig(
run_by_label="pr-bugfix",
run_command="bugfix_validate_check.py",
-timeout=900,
timeout=2400,
runner_type=Runners.STYLE_CHECKER,
),
}


@@ -1,7 +1,7 @@
import logging
import os
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Union
-import os
-import logging
from env_helper import (
GITHUB_JOB_URL,
@@ -12,6 +12,8 @@ from env_helper import (
from report import TestResults, create_test_html_report
from s3_helper import S3Helper
logger = logging.getLogger(__name__)
def process_logs(
s3_client: S3Helper,
@@ -19,7 +21,7 @@ def process_logs(
s3_path_prefix: str,
test_results: TestResults,
) -> List[str]:
-logging.info("Upload files to s3 %s", additional_logs)
logger.info("Upload files to s3 %s", additional_logs)
processed_logs = {} # type: Dict[str, str]
# Firstly convert paths of logs from test_results to urls to s3.
@@ -33,9 +35,19 @@ def process_logs(
if path in processed_logs:
test_result.log_urls.append(processed_logs[str(path)])
elif path:
try:
url = s3_client.upload_test_report_to_s3(
Path(path), s3_path_prefix + "/" + str(path)
)
except FileNotFoundError:
# Breaking the whole run on the malformed test is a bad idea
# FIXME: report the failure
logger.error(
"A broken TestResult, file '%s' does not exist: %s",
path,
test_result,
)
continue
test_result.log_urls.append(url)
processed_logs[str(path)] = url
@@ -48,7 +60,7 @@ def process_logs(
)
)
else:
-logging.error("File %s is missing - skip", log_path)
logger.error("File %s is missing - skip", log_path)
return additional_urls
@@ -116,8 +128,8 @@ def upload_results(
report_path.write_text(html_report, encoding="utf-8")
url = s3_client.upload_test_report_to_s3(report_path, s3_path_prefix + ".html")
else:
-logging.info("report.html was prepared by test job itself")
logger.info("report.html was prepared by test job itself")
url = ready_report_url
-logging.info("Search result in url %s", url)
logger.info("Search result in url %s", url)
return url


@@ -1,4 +1,5 @@
<clickhouse>
<concurrent_threads_soft_limit_ratio_to_cores>0</concurrent_threads_soft_limit_ratio_to_cores>
<concurrent_threads_soft_limit_num>50</concurrent_threads_soft_limit_num>
<query_log>
<database>system</database>


@@ -28,6 +28,16 @@ node4 = cluster.add_instance(
)
def assert_profile_event(node, query_id, profile_event, check):
    assert check(
        int(
            node.query(
                f"select ProfileEvents['{profile_event}'] from system.query_log where current_database = currentDatabase() and query_id = '{query_id}' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1"
            )
        )
    )
@pytest.fixture(scope="module")
def started_cluster():
try:
@@ -43,39 +53,176 @@ def test_concurrent_threads_soft_limit_default(started_cluster):
query_id="test_concurrent_threads_soft_limit_1",
)
node1.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node1,
"test_concurrent_threads_soft_limit_1",
"ConcurrencyControlGrantedHard",
lambda x: x == 1,
)
assert_profile_event(
node1,
"test_concurrent_threads_soft_limit_1",
"ConcurrencyControlGrantDelayed",
lambda x: x == 0,
)
assert_profile_event(
node1,
"test_concurrent_threads_soft_limit_1",
"ConcurrencyControlAcquiredTotal",
lambda x: x == 100,
)
assert_profile_event(
node1,
"test_concurrent_threads_soft_limit_1",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 0,
)
assert (
node1.query(
-"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_1'"
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_1' order by query_start_time_microseconds desc limit 1"
)
== "102\n"
)
-@pytest.mark.skip(reason="broken test")
def test_use_concurrency_control_default(started_cluster):
node1.query(
"SELECT count(*) FROM numbers_mt(10000000) SETTINGS use_concurrency_control = 0",
query_id="test_use_concurrency_control",
)
# Concurrency control is not used, all metrics should be zeros
node1.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node1,
"test_use_concurrency_control",
"ConcurrencyControlGrantedHard",
lambda x: x == 0,
)
assert_profile_event(
node1,
"test_use_concurrency_control",
"ConcurrencyControlGrantDelayed",
lambda x: x == 0,
)
assert_profile_event(
node1,
"test_use_concurrency_control",
"ConcurrencyControlAcquiredTotal",
lambda x: x == 0,
)
assert_profile_event(
node1,
"test_use_concurrency_control",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 0,
)
def test_concurrent_threads_soft_limit_defined_50(started_cluster):
node2.query(
"SELECT count(*) FROM numbers_mt(10000000)",
query_id="test_concurrent_threads_soft_limit_2",
)
node2.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node2,
"test_concurrent_threads_soft_limit_2",
"ConcurrencyControlGrantedHard",
lambda x: x == 1,
)
assert_profile_event(
node2,
"test_concurrent_threads_soft_limit_2",
"ConcurrencyControlGrantDelayed",
lambda x: x == 50,
)
assert_profile_event(
node2,
"test_concurrent_threads_soft_limit_2",
"ConcurrencyControlAcquiredTotal",
lambda x: x == 50,
)
assert_profile_event(
node2,
"test_concurrent_threads_soft_limit_2",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 1,
)
assert (
node2.query(
-"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_2'"
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_2' order by query_start_time_microseconds desc limit 1"
)
== "52\n"
)
-@pytest.mark.skip(reason="broken test")
def test_use_concurrency_control_soft_limit_defined_50(started_cluster):
node2.query(
"SELECT count(*) FROM numbers_mt(10000000) SETTINGS use_concurrency_control = 0",
query_id="test_use_concurrency_control_2",
)
# Concurrency control is not used, all metrics should be zeros
node2.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node2,
"test_use_concurrency_control_2",
"ConcurrencyControlGrantedHard",
lambda x: x == 0,
)
assert_profile_event(
node2,
"test_use_concurrency_control_2",
"ConcurrencyControlGrantDelayed",
lambda x: x == 0,
)
assert_profile_event(
node2,
"test_use_concurrency_control_2",
"ConcurrencyControlAcquiredTotal",
lambda x: x == 0,
)
assert_profile_event(
node2,
"test_use_concurrency_control_2",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 0,
)
def test_concurrent_threads_soft_limit_defined_1(started_cluster):
node3.query(
"SELECT count(*) FROM numbers_mt(10000000)",
query_id="test_concurrent_threads_soft_limit_3",
)
node3.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node3,
"test_concurrent_threads_soft_limit_3",
"ConcurrencyControlGrantedHard",
lambda x: x == 1,
)
assert_profile_event(
node3,
"test_concurrent_threads_soft_limit_3",
"ConcurrencyControlGrantDelayed",
lambda x: x == 99,
)
assert_profile_event(
node3,
"test_concurrent_threads_soft_limit_3",
"ConcurrencyControlAcquiredTotal",
lambda x: x == 1,
)
assert_profile_event(
node3,
"test_concurrent_threads_soft_limit_3",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 1,
)
assert (
node3.query(
-"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_3'"
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_3' order by query_start_time_microseconds desc limit 1"
)
== "3\n"
)
@@ -84,7 +231,6 @@ def test_concurrent_threads_soft_limit_defined_1(started_cluster):
# In config_limit_reached.xml there is concurrent_threads_soft_limit=10
# Background query starts in a separate thread to reach this limit.
# When this limit is reached the foreground query gets less than 5 queries despite the fact that it has settings max_threads=5
-@pytest.mark.skip(reason="broken test")
def test_concurrent_threads_soft_limit_limit_reached(started_cluster):
def background_query():
try:
@@ -117,8 +263,32 @@ def test_concurrent_threads_soft_limit_limit_reached(started_cluster):
)
node4.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node4,
"test_concurrent_threads_soft_limit_4",
"ConcurrencyControlGrantedHard",
lambda x: x == 1,
)
assert_profile_event(
node4,
"test_concurrent_threads_soft_limit_4",
"ConcurrencyControlGrantDelayed",
lambda x: x > 0,
)
assert_profile_event(
node4,
"test_concurrent_threads_soft_limit_4",
"ConcurrencyControlAcquiredTotal",
lambda x: x < 5,
)
assert_profile_event(
node4,
"test_concurrent_threads_soft_limit_4",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 1,
)
s_count = node4.query(
-"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_4'"
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_4' order by query_start_time_microseconds desc limit 1"
).strip()
if s_count:
count = int(s_count)


@@ -1,5 +1,6 @@
import pytest
import random, string
import re
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
@@ -336,6 +337,10 @@ def test_create_database():
def test_table_functions():
password = new_password()
azure_conn_string = cluster.env_variables["AZURITE_CONNECTION_STRING"]
account_key_pattern = re.compile("AccountKey=.*?(;|$)")
masked_azure_conn_string = re.sub(
account_key_pattern, "AccountKey=[HIDDEN]\\1", azure_conn_string
)
azure_storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
azure_account_name = "devstoreaccount1"
azure_account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
@@ -467,23 +472,23 @@ def test_table_functions():
"CREATE TABLE tablefunc30 (x int) AS s3('http://minio1:9001/root/data/test9.csv.gz', 'NOSIGN', 'CSV')",
"CREATE TABLE tablefunc31 (`x` int) AS s3('http://minio1:9001/root/data/test10.csv.gz', 'minio', '[HIDDEN]')",
"CREATE TABLE tablefunc32 (`x` int) AS deltaLake('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
-f"CREATE TABLE tablefunc33 (x int) AS azureBlobStorage('{azure_conn_string}', 'cont', 'test_simple.csv', 'CSV')",
f"CREATE TABLE tablefunc33 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple.csv', 'CSV')",
-f"CREATE TABLE tablefunc34 (x int) AS azureBlobStorage('{azure_conn_string}', 'cont', 'test_simple_1.csv', 'CSV', 'none')",
f"CREATE TABLE tablefunc34 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple_1.csv', 'CSV', 'none')",
-f"CREATE TABLE tablefunc35 (x int) AS azureBlobStorage('{azure_conn_string}', 'cont', 'test_simple_2.csv', 'CSV', 'none', 'auto')",
f"CREATE TABLE tablefunc35 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple_2.csv', 'CSV', 'none', 'auto')",
f"CREATE TABLE tablefunc36 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_3.csv', '{azure_account_name}', '[HIDDEN]')",
f"CREATE TABLE tablefunc37 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_4.csv', '{azure_account_name}', '[HIDDEN]', 'CSV')",
f"CREATE TABLE tablefunc38 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_5.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none')",
f"CREATE TABLE tablefunc39 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')",
-f"CREATE TABLE tablefunc40 (x int) AS azureBlobStorage(named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')",
f"CREATE TABLE tablefunc40 (`x` int) AS azureBlobStorage(named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')",
f"CREATE TABLE tablefunc41 (`x` int) AS azureBlobStorage(named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')",
-f"CREATE TABLE tablefunc42 (x int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple_9.csv', 'CSV')",
f"CREATE TABLE tablefunc42 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_9.csv', 'CSV')",
-f"CREATE TABLE tablefunc43 (x int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple_10.csv', 'CSV', 'none')",
f"CREATE TABLE tablefunc43 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_10.csv', 'CSV', 'none')",
-f"CREATE TABLE tablefunc44 (x int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple_11.csv', 'CSV', 'none', 'auto')",
f"CREATE TABLE tablefunc44 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_11.csv', 'CSV', 'none', 'auto')",
f"CREATE TABLE tablefunc45 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_12.csv', '{azure_account_name}', '[HIDDEN]')",
f"CREATE TABLE tablefunc46 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_13.csv', '{azure_account_name}', '[HIDDEN]', 'CSV')",
f"CREATE TABLE tablefunc47 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_14.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none')",
f"CREATE TABLE tablefunc48 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_15.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')",
-f"CREATE TABLE tablefunc49 (x int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_16.csv', format = 'CSV')",
f"CREATE TABLE tablefunc49 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_16.csv', format = 'CSV')",
f"CREATE TABLE tablefunc50 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_17.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')",
"CREATE TABLE tablefunc51 (`x` int) AS iceberg('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
],
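As a quick illustration of the masking expectation the test now encodes, the regex keeps everything in the connection string except the AccountKey value. A hedged sketch in Python, using a made-up connection string rather than the real Azurite credentials:

import re

# Same pattern as in the test: replace the AccountKey value, keep the trailing ';' or end of string.
account_key_pattern = re.compile("AccountKey=.*?(;|$)")

# Hypothetical connection string, for illustration only.
conn = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=secret123;BlobEndpoint=http://azurite:10000/devstoreaccount1;"
print(re.sub(account_key_pattern, "AccountKey=[HIDDEN]\\1", conn))
# DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=[HIDDEN];BlobEndpoint=http://azurite:10000/devstoreaccount1;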


@@ -3,6 +3,7 @@
set log_queries=1;
set log_query_threads=1;
set max_threads=0;
set use_concurrency_control=0;
WITH 01091 AS id SELECT 1;
SYSTEM FLUSH LOGS;


@@ -6,6 +6,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
-${CLICKHOUSE_CLIENT} --log_queries=1 --max_threads=32 --query_id "2015_${CLICKHOUSE_DATABASE}_query" -q "select count() from remote('127.0.0.{2,3}', numbers(10)) where number global in (select number % 5 from numbers_mt(1000000))"
${CLICKHOUSE_CLIENT} --log_queries=1 --max_threads=32 --use_concurrency_control=0 --query_id "2015_${CLICKHOUSE_DATABASE}_query" -q "select count() from remote('127.0.0.{2,3}', numbers(10)) where number global in (select number % 5 from numbers_mt(1000000))"
${CLICKHOUSE_CLIENT} -q "system flush logs"
${CLICKHOUSE_CLIENT} -q "select length(thread_ids) >= 32 from system.query_log where event_date >= yesterday() and query_id = '2015_${CLICKHOUSE_DATABASE}_query' and type = 'QueryFinish' and current_database = currentDatabase()"


@@ -5,6 +5,9 @@
-- enforce some defaults to be sure that the env settings will not affect the test
SET max_threads=5, async_socket_for_remote=1, prefer_localhost_replica=1, optimize_read_in_order=1, load_marks_asynchronously=0, local_filesystem_read_method='pread', remote_filesystem_read_method='read', trace_profile_events=0;
-- we do not want concurrency control to limit the number of threads
SET use_concurrency_control=0;
-- we use query_thread_log to check peak thread usage
-- after https://github.com/ClickHouse/ClickHouse/issues/53417 there is a simpler way to check it
-- but that will not allow to backport the test to older versions


@@ -1,21 +1,31 @@
%d: 123
%d: -123
%d: 0
%d: 9223372036854775807
%i: 123
%u: 123
%o: 173
%x: 7b
%X: 7B
%f: 0.000000
%f: 123.456000
%f: -123.456000
%F: 123.456000
%e: 1.234560e+02
%E: 1.234560E+02
%g: 123.456
%G: 123.456
%a: 0x1.edd2f1a9fbe77p+6
%A: 0X1.EDD2F1A9FBE77P+6
%s: abc
┌─printf('%%s: %s', '\n\t')─┐
1. │ %s:
└───────────────────────────┘
%s:
%%: %
%.5d: 00123
%.2f: 123.46
%.2e: 1.23e+02
%.2g: 1.2e+02
%.2s: ab

@@ -1,39 +1,47 @@
-- Testing integer formats
-select printf('%%d: %d', 123) = '%d: 123';
-select printf('%%i: %i', 123) = '%i: 123';
-select printf('%%u: %u', 123) = '%u: 123';
-select printf('%%o: %o', 123) = '%o: 173';
-select printf('%%x: %x', 123) = '%x: 7b';
-select printf('%%X: %X', 123) = '%X: 7B';
select printf('%%d: %d', 123);
select printf('%%d: %d', -123);
select printf('%%d: %d', 0);
select printf('%%d: %d', 9223372036854775807);
select printf('%%i: %i', 123);
select printf('%%u: %u', 123);
select printf('%%o: %o', 123);
select printf('%%x: %x', 123);
select printf('%%X: %X', 123);
-- Testing floating point formats
-select printf('%%f: %f', 123.456) = '%f: 123.456000';
-select printf('%%F: %F', 123.456) = '%F: 123.456000';
-select printf('%%e: %e', 123.456) = '%e: 1.234560e+02';
-select printf('%%E: %E', 123.456) = '%E: 1.234560E+02';
-select printf('%%g: %g', 123.456) = '%g: 123.456';
-select printf('%%G: %G', 123.456) = '%G: 123.456';
-select printf('%%a: %a', 123.456) = '%a: 0x1.edd2f1a9fbe77p+6';
-select printf('%%A: %A', 123.456) = '%A: 0X1.EDD2F1A9FBE77P+6';
select printf('%%f: %f', 0.0);
select printf('%%f: %f', 123.456);
select printf('%%f: %f', -123.456);
select printf('%%F: %F', 123.456);
select printf('%%e: %e', 123.456);
select printf('%%E: %E', 123.456);
select printf('%%g: %g', 123.456);
select printf('%%G: %G', 123.456);
select printf('%%a: %a', 123.456);
select printf('%%A: %A', 123.456);
-- Testing character formats
-select printf('%%s: %s', 'abc') = '%s: abc';
select printf('%%s: %s', 'abc');
SELECT printf('%%s: %s', '\n\t') FORMAT PrettyCompact;
select printf('%%s: %s', '');
-- Testing the %% specifier
-select printf('%%%%: %%') = '%%: %';
select printf('%%%%: %%');
-- Testing integer formats with precision
-select printf('%%.5d: %.5d', 123) = '%.5d: 00123';
select printf('%%.5d: %.5d', 123);
-- Testing floating point formats with precision
-select printf('%%.2f: %.2f', 123.456) = '%.2f: 123.46';
-select printf('%%.2e: %.2e', 123.456) = '%.2e: 1.23e+02';
-select printf('%%.2g: %.2g', 123.456) = '%.2g: 1.2e+02';
select printf('%%.2f: %.2f', 123.456);
select printf('%%.2e: %.2e', 123.456);
select printf('%%.2g: %.2g', 123.456);
-- Testing character formats with precision
-select printf('%%.2s: %.2s', 'abc') = '%.2s: ab';
select printf('%%.2s: %.2s', 'abc');
select printf('%%X: %X', 123.123); -- { serverError BAD_ARGUMENTS }
select printf('%%A: %A', 'abc'); -- { serverError BAD_ARGUMENTS }
select printf('%%s: %s', 100); -- { serverError BAD_ARGUMENTS }
select printf('%%n: %n', 100); -- { serverError BAD_ARGUMENTS }
select printf('%%f: %f', 0); -- { serverError BAD_ARGUMENTS }


@@ -33,8 +33,8 @@ Cross Elizabeth
[1,2,3] 42.42
Array(Int64) LowCardinality(Float64)
101
-2070
-2070
2071
2071
b
1
1

@@ -0,0 +1,6 @@
1
1
1
1
1
1


@@ -0,0 +1,72 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
DATA_DIR=$USER_FILES_PATH/$CLICKHOUSE_TEST_UNIQUE_NAME
mkdir -p $DATA_DIR
cp -r $CURDIR/data_hive/ $DATA_DIR
$CLICKHOUSE_CLIENT --query_id="test_03231_1_$CLICKHOUSE_TEST_UNIQUE_NAME" --query "
SELECT countDistinct(_path) FROM file('$DATA_DIR/data_hive/partitioning/column0=*/sample.parquet') WHERE column0 = 'Elizabeth' SETTINGS use_hive_partitioning=1, optimize_count_from_files=0;
"
$CLICKHOUSE_CLIENT --query "
SYSTEM FLUSH LOGS;
"
for _ in {1..5}; do
count=$( $CLICKHOUSE_CLIENT --query "
SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log
WHERE query_id='test_03231_1_$CLICKHOUSE_TEST_UNIQUE_NAME' AND
current_database = currentDatabase() and type='QueryFinish';" )
if [[ "$count" == "1" ]]; then
echo "1"
break
fi
sleep 1
done
$CLICKHOUSE_CLIENT --query_id="test_03231_2_$CLICKHOUSE_TEST_UNIQUE_NAME" --query "
SELECT countDistinct(_path) FROM file('$DATA_DIR/data_hive/partitioning/identifier=*/email.csv') WHERE identifier = 2070 SETTINGS use_hive_partitioning=1, optimize_count_from_files=0;
"
$CLICKHOUSE_CLIENT --query "
SYSTEM FLUSH LOGS;
"
for _ in {1..5}; do
count=$( $CLICKHOUSE_CLIENT --query "
SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log
WHERE query_id='test_03231_2_$CLICKHOUSE_TEST_UNIQUE_NAME' AND
current_database = currentDatabase() and type='QueryFinish';" )
if [[ "$count" == "1" ]]; then
echo "1"
break
fi
sleep 1
done
$CLICKHOUSE_CLIENT --query_id="test_03231_3_$CLICKHOUSE_TEST_UNIQUE_NAME" --query "
SELECT countDistinct(_path) FROM file('$DATA_DIR/data_hive/partitioning/array=*/sample.parquet') WHERE array = [1,2,3] SETTINGS use_hive_partitioning=1, optimize_count_from_files=0;
"
$CLICKHOUSE_CLIENT --query "
SYSTEM FLUSH LOGS;
"
for _ in {1..5}; do
count=$( $CLICKHOUSE_CLIENT --query "
SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log
WHERE query_id='test_03231_3_$CLICKHOUSE_TEST_UNIQUE_NAME' AND
current_database = currentDatabase() and type='QueryFinish';" )
if [[ "$count" == "1" ]]; then
echo "1"
break
fi
sleep 1
done
rm -rf $DATA_DIR


@@ -1 +1 @@
-data_hive/partitioning/column0=Elizabeth/sample.parquet
data_hive/partitioning/non_existing_column=Elizabeth/sample.parquet


@@ -5,4 +5,4 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
-$CLICKHOUSE_LOCAL -q "SELECT substring(_path, position(_path, 'data_hive')) FROM file('$CURDIR/data_hive/partitioning/column0=*/sample.parquet') LIMIT 1;"
$CLICKHOUSE_LOCAL -q "SELECT substring(_path, position(_path, 'data_hive')) FROM file('$CURDIR/data_hive/partitioning/non_existing_column=*/sample.parquet') LIMIT 1;"


@@ -0,0 +1,5 @@
_login_email,_identifier,_first_name,_last_name
laura@example.com,2070,Laura,Grey
craig@example.com,4081,Craig,Johnson
mary@example.com,9346,Mary,Jenkins
jamie@example.com,5079,Jamie,Smith


@@ -181,6 +181,9 @@ ComplexKeyCache
ComplexKeyDirect
ComplexKeyHashed
Composable
composable
ConcurrencyControlAcquired
ConcurrencyControlSoftLimit
Config
ConnectionDetails
Const