Compare commits

...

90 Commits

Author SHA1 Message Date
Sergei Trifonov
1b7b1325d4
Merge 758acb433d into c0c83236b6 2024-09-19 06:53:11 +00:00
Yakov Olkhovskiy
c0c83236b6
Merge pull request #69570 from alexkats/fix-azure
Mask azure connection string sensitive info
2024-09-19 05:40:47 +00:00
Yarik Briukhovetskyi
3eb5bc1a0f
Merge pull request #68963 from yariks5s/hive_partitioning_filtration
Filtering for hive partitioning
2024-09-18 22:16:58 +00:00
Robert Schulze
b94a7167a8
Merge pull request #69580 from rschu1ze/bump-libpqxx
Bump libpqxx to v7.7.5
2024-09-18 18:56:12 +00:00
Alex Katsman
b88cd79959 Mask azure connection string sensitive info 2024-09-18 18:32:22 +00:00
Konstantin Bogdanov
64e58baba1
Merge pull request #69682 from ClickHouse/more-asserts-for-hashjoin
Try fix asserts failure in `HashJoin`
2024-09-18 18:20:27 +00:00
max-vostrikov
a3fe155579
Merge pull request #69737 from ClickHouse/test_printf
added some edge cases for printf tests
2024-09-18 17:49:57 +00:00
maxvostrikov
f4b4b3cc35 added some edge cases for printf tests
added some edge cases for printf tests
2024-09-18 17:22:36 +02:00
Konstantin Bogdanov
cb24849396
Move assert 2024-09-18 15:24:48 +02:00
Mikhail f. Shiryaev
758acb433d
Replace renamed files in bugfix test results 2024-09-18 13:33:18 +02:00
Yarik Briukhovetskyi
143d9f0201
Merge branch 'ClickHouse:master' into hive_partitioning_filtration 2024-09-18 11:11:04 +02:00
Mikhail f. Shiryaev
7af0ec8b23
Increase bugfix timeout to 40 minutes 2024-09-17 19:26:54 +02:00
Konstantin Bogdanov
b08e727aef
Count allocated bytes from scratch after rerange 2024-09-17 19:02:10 +02:00
Yarik Briukhovetskyi
f52cdfb795
Merge branch 'ClickHouse:master' into hive_partitioning_filtration 2024-09-17 18:50:43 +02:00
Konstantin Bogdanov
a210f98819
Lint 2024-09-17 18:28:27 +02:00
Konstantin Bogdanov
7c5d55c6b2
Lint 2024-09-17 18:10:51 +02:00
Konstantin Bogdanov
80259659ff
More asserts 2024-09-17 18:03:19 +02:00
Yarik Briukhovetskyi
3a7c68a052
Update src/Storages/VirtualColumnUtils.cpp
Co-authored-by: Kruglov Pavel <48961922+Avogar@users.noreply.github.com>
2024-09-17 15:39:26 +02:00
Yarik Briukhovetskyi
e8d50aa97f
review 2024-09-17 15:02:33 +02:00
Mikhail f. Shiryaev
e5640acc52
Do not fail the job if an additional log is missing 2024-09-17 14:56:14 +02:00
Yarik Briukhovetskyi
cb92aaf968
fix 03232_file_path_normalizing 2024-09-17 11:26:13 +02:00
serxa
c75118c78d Merge branch 'master' into fix-use-concurrency-control 2024-09-16 17:36:07 +00:00
Yarik Briukhovetskyi
0cdec0acf1
fix logical error 2024-09-16 19:13:30 +02:00
serxa
d86ad992f1 Merge branch 'master' into fix-use-concurrency-control 2024-09-16 15:05:14 +00:00
serxa
10819dda2a make concurrency control integration test rerunnable 2024-09-16 15:04:26 +00:00
serxa
b8ea0e3396 avoid number of threads lowering to 16 by concurrency control 2024-09-16 14:53:31 +00:00
Yarik Briukhovetskyi
04f23332c3
fix filter issue 2024-09-16 15:59:22 +02:00
Yarik Briukhovetskyi
7d5203f8a7
add resize for partitioning_columns 2024-09-13 21:38:48 +02:00
Yarik Briukhovetskyi
0d1d750437
fix crash 2024-09-13 20:43:51 +02:00
Yarik Briukhovetskyi
ad31d86a15
move the block inserting 2024-09-13 19:58:19 +02:00
Yarik Briukhovetskyi
991279e5c6
revert 2024-09-13 19:23:00 +02:00
Yarik Briukhovetskyi
c184aae686
review 2024-09-13 16:40:01 +02:00
Yarik Briukhovetskyi
14a6b0422b
disable optimize_count_from_files 2024-09-13 16:33:17 +02:00
Robert Schulze
aab0d3dd9e
Bump to 7.7.5 2024-09-12 19:42:32 +00:00
Robert Schulze
5a34b9f24e
Bump to 7.6.1 2024-09-12 19:14:41 +00:00
Robert Schulze
a0a4858e00
Scratch build of libpqxx at 7.5.3 + patches 2024-09-12 18:55:35 +00:00
Sergei Trifonov
90645d7c0e
Merge branch 'master' into fix-use-concurrency-control 2024-09-11 18:48:11 +02:00
Yarik Briukhovetskyi
e8cec05d08
shellcheck 2024-09-11 13:52:20 +02:00
Yarik Briukhovetskyi
2876a4e714
add retries 2024-09-11 13:32:12 +02:00
Yarik Briukhovetskyi
a903e1a726
remove logging + fixing bug 2024-09-06 20:24:18 +02:00
Yarik Briukhovetskyi
2fa6be55ff
tests fix 2024-09-04 17:02:01 +02:00
Yarik Briukhovetskyi
8896d1b78b
try to fix tests 2024-09-04 14:46:29 +02:00
Yarik Briukhovetskyi
f688b903db
empty commit 2024-09-03 15:58:22 +02:00
Yarik Briukhovetskyi
21f9669836
empty commit 2024-09-03 15:41:43 +02:00
Yarik Briukhovetskyi
1a386ae4d5
Merge branch 'ClickHouse:master' into hive_partitioning_filtration 2024-09-03 15:35:31 +02:00
Yarik Briukhovetskyi
24f4e87f8b
revert debugging in tests 2024-09-03 15:20:22 +02:00
robot-clickhouse
b5eb0ef857 Automatic style fix 2024-08-30 17:57:54 +00:00
Max Kainov
78e8dbe008 increase flaky check timeout 1h->3h 2024-08-30 19:49:47 +02:00
serxa
5c18ffb8d1 Merge branch 'master' into fix-use-concurrency-control 2024-08-30 13:09:08 +00:00
Yarik Briukhovetskyi
620640a042
just to test 2024-08-30 12:58:21 +02:00
Yarik Briukhovetskyi
ec469a117d
testing 2024-08-30 00:56:35 +02:00
serxa
190d82ddd8 Merge branch 'master' into fix-use-concurrency-control 2024-08-29 19:31:58 +00:00
Yarik Briukhovetskyi
7a879980d8
try to fix tests 2024-08-29 18:25:11 +02:00
Yarik Briukhovetskyi
2adc61c215
add flush logs 2024-08-29 16:39:22 +02:00
serxa
d06dc22999 fix wrong conflict resolution 2024-08-29 14:17:14 +00:00
serxa
9e3f04c5eb fix wrong config settings inherited from server/config.xml 2024-08-29 12:17:46 +00:00
Yarik Briukhovetskyi
afc4d08aad
add no-fasttest tag 2024-08-29 13:31:05 +02:00
Sergei Trifonov
897e2def34
Merge branch 'master' into fix-use-concurrency-control 2024-08-29 12:42:37 +02:00
yariks5s
edc5d8dd92 fix path 2024-08-28 23:15:01 +00:00
yariks5s
d6b2a9d534 CLICKHOUSE_LOCAL -> CLIENT 2024-08-28 22:32:44 +00:00
yariks5s
dc97bd6b92 review + testing the code 2024-08-28 17:22:47 +00:00
Yarik Briukhovetskyi
60c6eb2610
trying to fix the test 2024-08-27 19:42:47 +02:00
Yarik Briukhovetskyi
9133505952
fix the test 2024-08-27 19:16:05 +02:00
Yarik Briukhovetskyi
2741bf00e4 chmod +x 2024-08-27 16:53:14 +00:00
Yarik Briukhovetskyi
4eca00a666
fix style 2024-08-27 18:10:41 +02:00
Yarik Briukhovetskyi
c6804122cb
fix shell 2024-08-27 16:52:29 +02:00
Yarik Briukhovetskyi
189cbe25fe
init 2024-08-27 16:28:18 +02:00
serxa
0632febfba better 2024-08-02 17:10:32 +00:00
serxa
67f51ded37 fix 02532_send_logs_level_test 2024-08-02 17:07:10 +00:00
serxa
14666f9be3 threads created under use_concurency_control=0 should not occupy a CPU slot 2024-08-02 17:00:32 +00:00
serxa
2f101367db add logging on CPU slot allocation in PipelineExecutor 2024-08-02 15:42:32 +00:00
serxa
06cac68db7 Merge branch 'master' into fix-use-concurrency-control 2024-08-02 15:12:12 +00:00
serxa
141179736e fix tidy build 2024-08-02 15:10:46 +00:00
serxa
184c156e09 Merge branch 'master' into fix-use-concurrency-control 2024-07-31 17:21:55 +00:00
serxa
cf9ea4af10 fix build 2024-07-31 08:47:16 +00:00
serxa
30fed916fa set use_concurrency_control=0 if some tests 2024-07-31 07:30:49 +00:00
serxa
384e2ff399 Merge branch 'master' into fix-use-concurrency-control 2024-07-31 07:25:16 +00:00
serxa
091883b8eb Merge branch 'fix-use-concurrency-control' of github.com:ClickHouse/ClickHouse into fix-use-concurrency-control 2024-07-12 13:02:14 +00:00
serxa
cb6438e472 fix style 2024-07-12 13:01:52 +00:00
robot-clickhouse
1cdcc0da57 Automatic style fix 2024-07-12 12:23:39 +00:00
serxa
6472fea0fd add tests for 'use_concurrency_control' and related profile events 2024-07-12 12:14:25 +00:00
serxa
dec92f60c6 fix ConcurrencyControlGrantedHard profile event 2024-07-12 12:13:33 +00:00
serxa
060ec6b68c return back normal concurrency_control passing logic 2024-07-12 12:12:57 +00:00
serxa
bf125d5da6 Merge branch 'master' into fix-use-concurrency-control 2024-07-12 08:44:12 +00:00
Sergei Trifonov
dd22140b57
Merge branch 'master' into fix-use-concurrency-control 2024-05-22 11:36:20 +02:00
serxa
d01ab205a9 aspell fix 2024-05-22 09:34:23 +00:00
serxa
e66e9d34ac Fixes 2024-05-14 16:56:06 +00:00
serxa
0acdfd6a37 Explicit passing of use_concurrency_control setting to find all place where it is not set at all 2024-05-14 10:41:40 +00:00
Sergei Trifonov
b252062767
Merge branch 'master' into fix-use-concurrency-control 2024-04-01 18:32:40 +02:00
serxa
dd6c1ac19a add diagnostics for concurrency control 2024-03-15 18:40:50 +00:00
69 changed files with 707 additions and 173 deletions

contrib/libpqxx vendored

@ -1 +1 @@
Subproject commit c995193a3a14d71f4711f1f421f65a1a1db64640
Subproject commit 41e4c331564167cca97ad6eccbd5b8879c2ca044

View File

@ -1,9 +1,9 @@
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/libpqxx")
set (SRCS
"${LIBRARY_DIR}/src/strconv.cxx"
"${LIBRARY_DIR}/src/array.cxx"
"${LIBRARY_DIR}/src/binarystring.cxx"
"${LIBRARY_DIR}/src/blob.cxx"
"${LIBRARY_DIR}/src/connection.cxx"
"${LIBRARY_DIR}/src/cursor.cxx"
"${LIBRARY_DIR}/src/encodings.cxx"
@ -12,59 +12,25 @@ set (SRCS
"${LIBRARY_DIR}/src/field.cxx"
"${LIBRARY_DIR}/src/largeobject.cxx"
"${LIBRARY_DIR}/src/notification.cxx"
"${LIBRARY_DIR}/src/params.cxx"
"${LIBRARY_DIR}/src/pipeline.cxx"
"${LIBRARY_DIR}/src/result.cxx"
"${LIBRARY_DIR}/src/robusttransaction.cxx"
"${LIBRARY_DIR}/src/row.cxx"
"${LIBRARY_DIR}/src/sql_cursor.cxx"
"${LIBRARY_DIR}/src/strconv.cxx"
"${LIBRARY_DIR}/src/stream_from.cxx"
"${LIBRARY_DIR}/src/stream_to.cxx"
"${LIBRARY_DIR}/src/subtransaction.cxx"
"${LIBRARY_DIR}/src/time.cxx"
"${LIBRARY_DIR}/src/transaction.cxx"
"${LIBRARY_DIR}/src/transaction_base.cxx"
"${LIBRARY_DIR}/src/row.cxx"
"${LIBRARY_DIR}/src/params.cxx"
"${LIBRARY_DIR}/src/util.cxx"
"${LIBRARY_DIR}/src/version.cxx"
"${LIBRARY_DIR}/src/wait.cxx"
)
# Need to explicitly include each header file, because in the directory include/pqxx there are also files
# like just 'array'. So if including the whole directory with `target_include_directories`, it will make
# conflicts with all includes of <array>.
set (HDRS
"${LIBRARY_DIR}/include/pqxx/array.hxx"
"${LIBRARY_DIR}/include/pqxx/params.hxx"
"${LIBRARY_DIR}/include/pqxx/binarystring.hxx"
"${LIBRARY_DIR}/include/pqxx/composite.hxx"
"${LIBRARY_DIR}/include/pqxx/connection.hxx"
"${LIBRARY_DIR}/include/pqxx/cursor.hxx"
"${LIBRARY_DIR}/include/pqxx/dbtransaction.hxx"
"${LIBRARY_DIR}/include/pqxx/errorhandler.hxx"
"${LIBRARY_DIR}/include/pqxx/except.hxx"
"${LIBRARY_DIR}/include/pqxx/field.hxx"
"${LIBRARY_DIR}/include/pqxx/isolation.hxx"
"${LIBRARY_DIR}/include/pqxx/largeobject.hxx"
"${LIBRARY_DIR}/include/pqxx/nontransaction.hxx"
"${LIBRARY_DIR}/include/pqxx/notification.hxx"
"${LIBRARY_DIR}/include/pqxx/pipeline.hxx"
"${LIBRARY_DIR}/include/pqxx/prepared_statement.hxx"
"${LIBRARY_DIR}/include/pqxx/result.hxx"
"${LIBRARY_DIR}/include/pqxx/robusttransaction.hxx"
"${LIBRARY_DIR}/include/pqxx/row.hxx"
"${LIBRARY_DIR}/include/pqxx/separated_list.hxx"
"${LIBRARY_DIR}/include/pqxx/strconv.hxx"
"${LIBRARY_DIR}/include/pqxx/stream_from.hxx"
"${LIBRARY_DIR}/include/pqxx/stream_to.hxx"
"${LIBRARY_DIR}/include/pqxx/subtransaction.hxx"
"${LIBRARY_DIR}/include/pqxx/transaction.hxx"
"${LIBRARY_DIR}/include/pqxx/transaction_base.hxx"
"${LIBRARY_DIR}/include/pqxx/types.hxx"
"${LIBRARY_DIR}/include/pqxx/util.hxx"
"${LIBRARY_DIR}/include/pqxx/version.hxx"
"${LIBRARY_DIR}/include/pqxx/zview.hxx"
)
add_library(_libpqxx ${SRCS} ${HDRS})
add_library(_libpqxx ${SRCS})
target_link_libraries(_libpqxx PUBLIC ch_contrib::libpq)
target_include_directories (_libpqxx SYSTEM BEFORE PUBLIC "${LIBRARY_DIR}/include")

View File

@ -737,6 +737,14 @@ Number of sessions (connections) to ZooKeeper. Should be no more than one, becau
Number of watches (event subscriptions) in ZooKeeper.
### ConcurrencyControlAcquired
Total number of acquired CPU slots.
### ConcurrencyControlSoftLimit
Value of soft limit on number of CPU slots.
**See Also**
- [system.asynchronous_metrics](../../operations/system-tables/asynchronous_metrics.md#system_tables-asynchronous_metrics) — Contains periodically calculated metrics.
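For reference, the two metrics documented above are ordinary entries in `system.metrics`, so their current values can be inspected with a plain query. A minimal, illustrative example (not part of this diff):

-- Illustrative only: inspect the concurrency-control metrics introduced by this change
SELECT metric, value
FROM system.metrics
WHERE metric IN ('ConcurrencyControlAcquired', 'ConcurrencyControlSoftLimit');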

View File

@ -1631,6 +1631,7 @@ try
concurrent_threads_soft_limit = value;
}
ConcurrencyControl::instance().setMaxConcurrency(concurrent_threads_soft_limit);
LOG_INFO(log, "ConcurrencyControl limit is set to {}", concurrent_threads_soft_limit);
global_context->getProcessList().setMaxSize(new_server_settings.max_concurrent_queries);
global_context->getProcessList().setMaxInsertQueriesAmount(new_server_settings.max_concurrent_insert_queries);

View File

@ -537,6 +537,7 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden
PullingAsyncPipelineExecutor executor(io.pipeline);
io.pipeline.setProgressCallback(context->getProgressCallback());
io.pipeline.setProcessListElement(context->getProcessListElement());
io.pipeline.setConcurrencyControl(context->getSettingsRef().use_concurrency_control);
Block block;

View File

@ -1701,6 +1701,9 @@ try
QueryPipeline pipeline(std::move(pipe));
PullingAsyncPipelineExecutor executor(pipeline);
/// Concurrency control in client is not required
pipeline.setConcurrencyControl(false);
if (need_render_progress)
{
pipeline.setProgressCallback([this](const Progress & progress){ onProgress(progress); });

View File

@ -240,6 +240,7 @@ void LocalConnection::sendQuery(
{
state->block = state->io.pipeline.getHeader();
state->executor = std::make_unique<PullingAsyncPipelineExecutor>(state->io.pipeline);
state->io.pipeline.setConcurrencyControl(false);
}
else if (state->io.pipeline.completed())
{

View File

@ -1,7 +1,23 @@
#include <Common/ISlotControl.h>
#include <Common/ConcurrencyControl.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
namespace ProfileEvents
{
extern const Event ConcurrencyControlGrantedHard;
extern const Event ConcurrencyControlGrantDelayed;
extern const Event ConcurrencyControlAcquiredTotal;
extern const Event ConcurrencyControlAllocationDelayed;
}
namespace CurrentMetrics
{
extern const Metric ConcurrencyControlAcquired;
extern const Metric ConcurrencyControlSoftLimit;
}
namespace DB
{
@ -17,6 +33,7 @@ ConcurrencyControl::Slot::~Slot()
ConcurrencyControl::Slot::Slot(SlotAllocationPtr && allocation_)
: allocation(std::move(allocation_))
, acquired_slot_increment(CurrentMetrics::ConcurrencyControlAcquired)
{
}
@ -34,6 +51,7 @@ ConcurrencyControl::Allocation::~Allocation()
{
if (granted.compare_exchange_strong(value, value - 1))
{
ProfileEvents::increment(ProfileEvents::ConcurrencyControlAcquiredTotal, 1);
std::unique_lock lock{mutex};
return AcquiredSlotPtr(new Slot(shared_from_this())); // can't use std::make_shared due to private ctor
}
@ -84,6 +102,7 @@ void ConcurrencyControl::Allocation::release()
ConcurrencyControl::ConcurrencyControl()
: cur_waiter(waiters.end())
, max_concurrency_metric(CurrentMetrics::ConcurrencyControlSoftLimit, 0)
{
}
@ -103,11 +122,16 @@ ConcurrencyControl::~ConcurrencyControl()
// Acquire as many slots as we can, but not lower than `min`
SlotCount granted = std::max(min, std::min(max, available(lock)));
cur_concurrency += granted;
ProfileEvents::increment(ProfileEvents::ConcurrencyControlGrantedHard, min);
// Create allocation and start waiting if more slots are required
if (granted < max)
{
ProfileEvents::increment(ProfileEvents::ConcurrencyControlGrantDelayed, max - granted);
ProfileEvents::increment(ProfileEvents::ConcurrencyControlAllocationDelayed);
return SlotAllocationPtr(new Allocation(*this, max, granted,
waiters.insert(cur_waiter, nullptr /* pointer is set by Allocation ctor */)));
}
else
return SlotAllocationPtr(new Allocation(*this, max, granted));
}
@ -116,6 +140,7 @@ void ConcurrencyControl::setMaxConcurrency(SlotCount value)
{
std::unique_lock lock{mutex};
max_concurrency = std::max<SlotCount>(1, value); // never allow max_concurrency to be zero
max_concurrency_metric.changeTo(max_concurrency == UnlimitedSlots ? 0 : max_concurrency);
schedule(lock);
}

View File

@ -8,6 +8,7 @@
#include <base/types.h>
#include <boost/core/noncopyable.hpp>
#include <Common/CurrentMetrics.h>
#include <Common/ISlotControl.h>
namespace DB
@ -53,6 +54,7 @@ public:
explicit Slot(SlotAllocationPtr && allocation_);
SlotAllocationPtr allocation;
CurrentMetrics::Increment acquired_slot_increment;
};
// Manages group of slots for a single query, see ConcurrencyControl::allocate(min, max)
@ -131,6 +133,7 @@ private:
Waiters::iterator cur_waiter; // round-robin pointer
SlotCount max_concurrency = UnlimitedSlots;
SlotCount cur_concurrency = 0;
CurrentMetrics::Increment max_concurrency_metric;
};
}

View File

@ -313,6 +313,9 @@
M(FilteringMarksWithPrimaryKey, "Number of threads currently doing filtering of mark ranges by the primary key") \
M(FilteringMarksWithSecondaryKeys, "Number of threads currently doing filtering of mark ranges by secondary keys") \
\
M(ConcurrencyControlAcquired, "Total number of acquired CPU slots") \
M(ConcurrencyControlSoftLimit, "Value of soft limit on number of CPU slots") \
\
M(DiskS3NoSuchKeyErrors, "The number of `NoSuchKey` errors that occur when reading data from S3 cloud storage through ClickHouse disks.") \
#ifdef APPLY_FOR_EXTERNAL_METRICS

View File

@ -73,4 +73,44 @@ public:
[[nodiscard]] virtual SlotAllocationPtr allocate(SlotCount min, SlotCount max) = 0;
};
/// Allocation that grants all the slots immediately on creation
class GrantedAllocation : public ISlotAllocation
{
public:
explicit GrantedAllocation(SlotCount granted_)
: granted(granted_)
, allocated(granted_)
{}
[[nodiscard]] AcquiredSlotPtr tryAcquire() override
{
SlotCount value = granted.load();
while (value)
{
if (granted.compare_exchange_strong(value, value - 1))
return std::make_shared<IAcquiredSlot>();
}
return {};
}
SlotCount grantedCount() const override
{
return granted.load();
}
SlotCount allocatedCount() const override
{
return allocated;
}
private:
std::atomic<SlotCount> granted; // allocated, but not yet acquired
const SlotCount allocated;
};
[[nodiscard]] inline SlotAllocationPtr grantSlots(SlotCount count)
{
return SlotAllocationPtr(new GrantedAllocation(count));
}
}

View File

@ -826,6 +826,11 @@ The server successfully detected this situation and will download merged part fr
M(ReadWriteBufferFromHTTPRequestsSent, "Number of HTTP requests sent by ReadWriteBufferFromHTTP") \
M(ReadWriteBufferFromHTTPBytes, "Total size of payload bytes received and sent by ReadWriteBufferFromHTTP. Doesn't include HTTP headers.") \
\
M(ConcurrencyControlGrantedHard, "Number of CPU slot granted according to guarantee of 1 thread per query and for queries with setting 'use_concurrency_control' = 0") \
M(ConcurrencyControlGrantDelayed, "Number of CPU slot not granted initially and required to wait for a free CPU slot") \
M(ConcurrencyControlAcquiredTotal, "Total number of CPU slot acquired") \
M(ConcurrencyControlAllocationDelayed, "Total number of CPU slot allocations (queries) that were required to wait for slots to upscale") \
\
M(GWPAsanAllocateSuccess, "Number of successful allocations done by GWPAsan") \
M(GWPAsanAllocateFailed, "Number of failed allocations done by GWPAsan (i.e. filled pool)") \
M(GWPAsanFree, "Number of free operations done by GWPAsan") \
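The four profile events registered above are recorded per query and land in the `ProfileEvents` map of `system.query_log`, which is how the integration test further down verifies them. An illustrative query (the query_id value is a placeholder):

-- Illustrative: read the new events for a finished query; 'some_query_id' is a placeholder
SELECT
    ProfileEvents['ConcurrencyControlGrantedHard'] AS granted_hard,
    ProfileEvents['ConcurrencyControlGrantDelayed'] AS grant_delayed,
    ProfileEvents['ConcurrencyControlAcquiredTotal'] AS acquired_total,
    ProfileEvents['ConcurrencyControlAllocationDelayed'] AS allocation_delayed
FROM system.query_log
WHERE type = 'QueryFinish' AND query_id = 'some_query_id'
ORDER BY query_start_time_microseconds DESC
LIMIT 1;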

View File

@ -151,6 +151,15 @@ Names NamesAndTypesList::getNames() const
return res;
}
NameSet NamesAndTypesList::getNameSet() const
{
NameSet res;
res.reserve(size());
for (const NameAndTypePair & column : *this)
res.insert(column.name);
return res;
}
DataTypes NamesAndTypesList::getTypes() const
{
DataTypes res;

View File

@ -100,6 +100,7 @@ public:
void getDifference(const NamesAndTypesList & rhs, NamesAndTypesList & deleted, NamesAndTypesList & added) const;
Names getNames() const;
NameSet getNameSet() const;
DataTypes getTypes() const;
/// Remove columns which names are not in the `names`.

View File

@ -666,6 +666,7 @@ void CacheDictionary<dictionary_key_type>::update(CacheDictionaryUpdateUnitPtr<d
Columns fetched_columns_during_update = fetch_request.makeAttributesResultColumnsNonMutable();
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))
{

View File

@ -334,6 +334,7 @@ public:
, pipeline(std::move(pipeline_))
, executor(pipeline)
{
pipeline.setConcurrencyControl(false);
}
std::string getName() const override
@ -378,7 +379,6 @@ Pipe DirectDictionary<dictionary_key_type>::getSourcePipe(
ids.emplace_back(key);
auto pipeline = source_ptr->loadIds(ids);
if (use_async_executor)
pipe = Pipe(std::make_shared<SourceFromQueryPipeline<PullingAsyncPipelineExecutor>>(std::move(pipeline)));
else

View File

@ -454,6 +454,7 @@ void FlatDictionary::updateData()
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
update_field_loaded_block.reset();
Block block;
@ -495,6 +496,7 @@ void FlatDictionary::loadData()
{
QueryPipeline pipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))

View File

@ -475,6 +475,7 @@ void HashedArrayDictionary<dictionary_key_type, sharded>::updateData()
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
update_field_loaded_block.reset();
Block block;
@ -973,6 +974,7 @@ void HashedArrayDictionary<dictionary_key_type, sharded>::loadData()
QueryPipeline pipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
UInt64 pull_time_microseconds = 0;
UInt64 process_time_microseconds = 0;

View File

@ -884,6 +884,7 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::updateData()
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
update_field_loaded_block.reset();
Block block;
@ -1163,6 +1164,7 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::loadData()
QueryPipeline pipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;

View File

@ -407,6 +407,7 @@ void IPAddressDictionary::loadData()
bool has_ipv6 = false;
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))
{

View File

@ -291,6 +291,7 @@ void IPolygonDictionary::loadData()
QueryPipeline pipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))
blockToAttributes(block);

View File

@ -541,6 +541,7 @@ void RangeHashedDictionary<dictionary_key_type>::loadData()
{
QueryPipeline pipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))
@ -692,6 +693,7 @@ void RangeHashedDictionary<dictionary_key_type>::updateData()
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
update_field_loaded_block.reset();
Block block;

View File

@ -314,6 +314,7 @@ void RegExpTreeDictionary::loadData()
{
QueryPipeline pipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))

View File

@ -193,6 +193,7 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
PullingAsyncPipelineExecutor executor(io.pipeline);
io.pipeline.setProgressCallback(data.getContext()->getProgressCallback());
io.pipeline.setConcurrencyControl(data.getContext()->getSettingsRef().use_concurrency_control);
while (block.rows() == 0 && executor.pull(block))
{
}

View File

@ -77,7 +77,6 @@
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Parsers/formatAST.h>
#include <Parsers/QueryParameterVisitor.h>

View File

@ -338,11 +338,8 @@ size_t HashJoin::getTotalRowCount() const
return res;
}
size_t HashJoin::getTotalByteCount() const
void HashJoin::doDebugAsserts() const
{
if (!data)
return 0;
#ifndef NDEBUG
size_t debug_blocks_allocated_size = 0;
for (const auto & block : data->blocks)
@ -360,6 +357,14 @@ size_t HashJoin::getTotalByteCount() const
throw Exception(ErrorCodes::LOGICAL_ERROR, "data->blocks_nullmaps_allocated_size != debug_blocks_nullmaps_allocated_size ({} != {})",
data->blocks_nullmaps_allocated_size, debug_blocks_nullmaps_allocated_size);
#endif
}
size_t HashJoin::getTotalByteCount() const
{
if (!data)
return 0;
doDebugAsserts();
size_t res = 0;
@ -544,9 +549,11 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
have_compressed = true;
}
doDebugAsserts();
data->blocks_allocated_size += block_to_save.allocatedBytes();
data->blocks.emplace_back(std::move(block_to_save));
Block * stored_block = &data->blocks.back();
doDebugAsserts();
if (rows)
data->empty = false;
@ -634,9 +641,11 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
if (!flag_per_row && !is_inserted)
{
doDebugAsserts();
LOG_TRACE(log, "Skipping inserting block with {} rows", rows);
data->blocks_allocated_size -= stored_block->allocatedBytes();
data->blocks.pop_back();
doDebugAsserts();
}
if (!check_limits)
@ -683,6 +692,8 @@ void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_
for (auto & stored_block : data->blocks)
{
doDebugAsserts();
size_t old_size = stored_block.allocatedBytes();
stored_block = stored_block.shrinkToFit();
size_t new_size = stored_block.allocatedBytes();
@ -700,6 +711,8 @@ void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_
else
/// Sometimes after clone resized block can be bigger than original
data->blocks_allocated_size += new_size - old_size;
doDebugAsserts();
}
auto new_total_bytes_in_join = getTotalByteCount();
@ -1416,7 +1429,13 @@ void HashJoin::tryRerangeRightTableDataImpl(Map & map [[maybe_unused]])
};
BlocksList sorted_blocks;
visit_rows_map(sorted_blocks, map);
doDebugAsserts();
data->blocks.swap(sorted_blocks);
size_t new_blocks_allocated_size = 0;
for (const auto & block : data->blocks)
new_blocks_allocated_size += block.allocatedBytes();
data->blocks_allocated_size = new_blocks_allocated_size;
doDebugAsserts();
}
}

View File

@ -470,6 +470,7 @@ private:
void tryRerangeRightTableData() override;
template <JoinKind KIND, typename Map, JoinStrictness STRICTNESS>
void tryRerangeRightTableDataImpl(Map & map);
void doDebugAsserts() const;
};
}

View File

@ -26,6 +26,8 @@
#include <Analyzer/TableFunctionNode.h>
#include <Analyzer/Utils.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <Interpreters/QueryLog.h>
@ -249,6 +251,8 @@ QueryPipelineBuilder InterpreterSelectQueryAnalyzer::buildQueryPipeline()
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(context);
auto build_pipeline_settings = BuildQueryPipelineSettings::fromContext(context);
query_plan.setConcurrencyControl(context->getSettingsRef().use_concurrency_control);
return std::move(*query_plan.buildQueryPipeline(optimization_settings, build_pipeline_settings));
}

View File

@ -208,6 +208,7 @@ bool isStorageTouchedByMutations(
}
PullingAsyncPipelineExecutor executor(io.pipeline);
io.pipeline.setConcurrencyControl(context->getSettingsRef().use_concurrency_control);
Block block;
while (block.rows() == 0 && executor.pull(block));
@ -1286,6 +1287,10 @@ void MutationsInterpreter::Source::read(
void MutationsInterpreter::initQueryPlan(Stage & first_stage, QueryPlan & plan)
{
// Mutations are not using concurrency control now. Queries, merges and mutations running together could lead to CPU overcommit.
// TODO(serxa): Enable concurrency control for mutation queries and mutations. This should be done after CPU scheduler introduction.
plan.setConcurrencyControl(false);
source.read(first_stage, plan, metadata_snapshot, context, settings.apply_deleted_mask, settings.can_execute);
addCreatingSetsStep(plan, first_stage.analyzer->getPreparedSets(), context);
}

View File

@ -722,7 +722,14 @@ void ASTFunction::formatImplWithoutAlias(const FormatSettings & settings, Format
assert_cast<const ASTFunction *>(argument.get())->arguments->children[0]->formatImpl(settings, state, nested_dont_need_parens);
settings.ostr << (settings.hilite ? hilite_operator : "") << " = " << (settings.hilite ? hilite_none : "");
}
settings.ostr << "'[HIDDEN]'";
if (!secret_arguments.replacement.empty())
{
settings.ostr << "'" << secret_arguments.replacement << "'";
}
else
{
settings.ostr << "'[HIDDEN]'";
}
if (size <= secret_arguments.start + secret_arguments.count && !secret_arguments.are_named)
break; /// All other arguments should also be hidden.
continue;

View File

@ -1,6 +1,7 @@
#pragma once
#include <Common/KnownObjectNames.h>
#include <Common/re2.h>
#include <Core/QualifiedTableName.h>
#include <base/defines.h>
#include <boost/algorithm/string/predicate.hpp>
@ -49,6 +50,11 @@ public:
bool are_named = false; /// Arguments like `password = 'password'` are considered as named arguments.
/// E.g. "headers" in `url('..', headers('foo' = '[HIDDEN]'))`
std::vector<std::string> nested_maps;
/// Full replacement of an argument. Only supported when count is 1, otherwise all arguments will be replaced with this string.
/// It's needed in cases when we don't want to hide the entire parameter, but some part of it, e.g. "connection_string" in
/// `azureBlobStorage('DefaultEndpointsProtocol=https;AccountKey=secretkey;...', ...)` should be replaced with
/// `azureBlobStorage('DefaultEndpointsProtocol=https;AccountKey=[HIDDEN];...', ...)`.
std::string replacement;
bool hasSecrets() const
{
@ -74,6 +80,7 @@ protected:
result.are_named = argument_is_named;
}
chassert(index >= result.start); /// We always check arguments consecutively
chassert(result.replacement.empty()); /// We shouldn't use replacement with masking other arguments
result.count = index + 1 - result.start;
if (!argument_is_named)
result.are_named = false;
@ -199,32 +206,39 @@ protected:
void findAzureBlobStorageFunctionSecretArguments(bool is_cluster_function)
{
/// azureBlobStorage('cluster_name', 'conn_string/storage_account_url', ...) has 'conn_string/storage_account_url' as its second argument.
/// azureBlobStorageCluster('cluster_name', 'conn_string/storage_account_url', ...) has 'conn_string/storage_account_url' as its second argument.
size_t url_arg_idx = is_cluster_function ? 1 : 0;
if (!is_cluster_function && isNamedCollectionName(0))
{
/// azureBlobStorage(named_collection, ..., account_key = 'account_key', ...)
if (maskAzureConnectionString(-1, true, 1))
return;
findSecretNamedArgument("account_key", 1);
return;
}
else if (is_cluster_function && isNamedCollectionName(1))
{
/// azureBlobStorageCluster(cluster, named_collection, ..., account_key = 'account_key', ...)
if (maskAzureConnectionString(-1, true, 2))
return;
findSecretNamedArgument("account_key", 2);
return;
}
/// We should check other arguments first because we don't need to do any replacement in case storage_account_url is not used
/// azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure)
/// azureBlobStorageCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])
if (maskAzureConnectionString(url_arg_idx))
return;
/// We should check other arguments first because we don't need to do any replacement in case of
/// azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, format, [account_name, account_key, ...])
/// azureBlobStorageCluster(cluster, connection_string|storage_account_url, container_name, blobpath, format, [account_name, account_key, ...])
size_t count = function->arguments->size();
if ((url_arg_idx + 4 <= count) && (count <= url_arg_idx + 7))
{
String second_arg;
if (tryGetStringFromArgument(url_arg_idx + 3, &second_arg))
String fourth_arg;
if (tryGetStringFromArgument(url_arg_idx + 3, &fourth_arg))
{
if (second_arg == "auto" || KnownFormatNames::instance().exists(second_arg))
if (fourth_arg == "auto" || KnownFormatNames::instance().exists(fourth_arg))
return; /// The argument after 'url' is a format: s3('url', 'format', ...)
}
}
@ -234,6 +248,40 @@ protected:
markSecretArgument(url_arg_idx + 4);
}
bool maskAzureConnectionString(ssize_t url_arg_idx, bool argument_is_named = false, size_t start = 0)
{
String url_arg;
if (argument_is_named)
{
url_arg_idx = findNamedArgument(&url_arg, "connection_string", start);
if (url_arg_idx == -1 || url_arg.empty())
url_arg_idx = findNamedArgument(&url_arg, "storage_account_url", start);
if (url_arg_idx == -1 || url_arg.empty())
return false;
}
else
{
if (!tryGetStringFromArgument(url_arg_idx, &url_arg))
return false;
}
if (!url_arg.starts_with("http"))
{
static re2::RE2 account_key_pattern = "AccountKey=.*?(;|$)";
if (RE2::Replace(&url_arg, account_key_pattern, "AccountKey=[HIDDEN]\\1"))
{
chassert(result.count == 0); /// We shouldn't use replacement with masking other arguments
result.start = url_arg_idx;
result.are_named = argument_is_named;
result.count = 1;
result.replacement = url_arg;
return true;
}
}
return false;
}
void findURLSecretArguments()
{
if (!isNamedCollectionName(0))
@ -513,8 +561,9 @@ protected:
return function->arguments->at(arg_idx)->isIdentifier();
}
/// Looks for a secret argument with a specified name. This function looks for arguments in format `key=value` where the key is specified.
void findSecretNamedArgument(const std::string_view & key, size_t start = 0)
/// Looks for an argument with a specified name. This function looks for arguments in format `key=value` where the key is specified.
/// Returns -1 if no argument was found.
ssize_t findNamedArgument(String * res, const std::string_view & key, size_t start = 0)
{
for (size_t i = start; i < function->arguments->size(); ++i)
{
@ -531,8 +580,22 @@ protected:
continue;
if (found_key == key)
markSecretArgument(i, /* argument_is_named= */ true);
{
tryGetStringFromArgument(*equals_func->arguments->at(1), res);
return i;
}
}
return -1;
}
/// Looks for a secret argument with a specified name. This function looks for arguments in format `key=value` where the key is specified.
/// If the argument is found, it is marked as a secret.
void findSecretNamedArgument(const std::string_view & key, size_t start = 0)
{
ssize_t arg_idx = findNamedArgument(nullptr, key, start);
if (arg_idx >= 0)
markSecretArgument(arg_idx, /* argument_is_named= */ true);
}
};
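With the `replacement` mechanism above, only the `AccountKey=...` fragment of an Azure connection string is masked instead of the whole argument. A sketch of the expected effect (the account name, container, path and key below are made-up placeholders):

-- Illustrative: the connection string values here are fake placeholders
SELECT count()
FROM azureBlobStorage('DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=secretkey;EndpointSuffix=core.windows.net',
                      'mycontainer', 'data/*.csv', 'CSV');
-- In logs and system.query_log the first argument is expected to appear as
-- 'DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=[HIDDEN];EndpointSuffix=core.windows.net'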

View File

@ -1,4 +1,5 @@
#include <IO/WriteBufferFromString.h>
#include "Common/ISlotControl.h"
#include <Common/ThreadPool.h>
#include <Common/CurrentThread.h>
#include <Common/CurrentMetrics.h>
@ -342,12 +343,22 @@ void PipelineExecutor::initializeExecution(size_t num_threads, bool concurrency_
is_execution_initialized = true;
tryUpdateExecutionStatus(ExecutionStatus::NotStarted, ExecutionStatus::Executing);
size_t use_threads = num_threads;
/// Allocate CPU slots from concurrency control
size_t min_threads = concurrency_control ? 1uz : num_threads;
cpu_slots = ConcurrencyControl::instance().allocate(min_threads, num_threads);
use_threads = cpu_slots->grantedCount();
if (concurrency_control)
{
/// Allocate CPU slots from concurrency control
constexpr size_t min_threads = 1uz; // Number of threads that should be granted to every query no matter how many threads are already running in other queries
cpu_slots = ConcurrencyControl::instance().allocate(min_threads, num_threads);
#ifndef NDEBUG
LOG_TEST(log, "Allocate CPU slots. min: {}, max: {}, granted: {}", min_threads, num_threads, cpu_slots->grantedCount());
#endif
}
else
{
/// If concurrency control is not used we should not even count threads as competing.
/// To avoid counting them in ConcurrencyControl, we create dummy slot allocation.
cpu_slots = grantSlots(num_threads);
}
size_t use_threads = cpu_slots->grantedCount();
Queue queue;
graph->initializeExecution(queue);
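With the branch above, a query running with `use_concurrency_control = 0` receives a dummy granted allocation and no longer competes for CPU slots, so its `ConcurrencyControl*` profile events stay at zero; this is exactly what the integration test below exercises:

-- Illustrative: bypass concurrency control for a single query
SELECT count(*) FROM numbers_mt(10000000) SETTINGS use_concurrency_control = 0;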

View File

@ -199,6 +199,7 @@ QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
last_pipeline->setProgressCallback(build_pipeline_settings.progress_callback);
last_pipeline->setProcessListElement(build_pipeline_settings.process_list_element);
last_pipeline->addResources(std::move(resources));
last_pipeline->setConcurrencyControl(getConcurrencyControl());
return last_pipeline;
}

View File

@ -300,7 +300,8 @@ QueryPipelineBuilder QueryPipelineBuilder::unitePipelines(
/// It may happen if max_distributed_connections > max_threads
max_threads_limit = std::max(pipeline.max_threads, max_threads_limit);
concurrency_control = pipeline.getConcurrencyControl();
// Use concurrency control if at least one of pipelines is using it
concurrency_control = concurrency_control || pipeline.getConcurrencyControl();
}
QueryPipelineBuilder pipeline;
@ -311,8 +312,8 @@ QueryPipelineBuilder QueryPipelineBuilder::unitePipelines(
{
pipeline.setMaxThreads(max_threads);
pipeline.limitMaxThreads(max_threads_limit);
pipeline.setConcurrencyControl(concurrency_control);
}
pipeline.setConcurrencyControl(concurrency_control);
pipeline.setCollectedProcessors(nullptr);
return pipeline;

View File

@ -1232,6 +1232,7 @@ namespace
if (io.pipeline.pulling())
{
auto executor = std::make_shared<PullingAsyncPipelineExecutor>(io.pipeline);
io.pipeline.setConcurrencyControl(query_context->getSettingsRef().use_concurrency_control);
auto check_for_cancel = [&]
{
if (isQueryCancelled())

View File

@ -1086,6 +1086,7 @@ void TCPHandler::processOrdinaryQuery()
{
PullingAsyncPipelineExecutor executor(pipeline);
pipeline.setConcurrencyControl(query_context->getSettingsRef().use_concurrency_control);
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
/// The following may happen:

View File

@ -431,6 +431,7 @@ void StorageLiveView::writeBlock(StorageLiveView & live_view, Block && block, Ch
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingAsyncPipelineExecutor executor(pipeline);
pipeline.setConcurrencyControl(local_context->getSettingsRef().use_concurrency_control);
Block this_block;
while (executor.pull(this_block))
@ -567,6 +568,7 @@ MergeableBlocksPtr StorageLiveView::collectMergeableBlocks(ContextPtr local_cont
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingAsyncPipelineExecutor executor(pipeline);
pipeline.setConcurrencyControl(local_context->getSettingsRef().use_concurrency_control);
Block this_block;
while (executor.pull(this_block))
@ -672,6 +674,7 @@ bool StorageLiveView::getNewBlocks(const std::lock_guard<std::mutex> & lock)
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingAsyncPipelineExecutor executor(pipeline);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))
{

View File

@ -1727,6 +1727,10 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
auto builder = merge_parts_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);
// Merges are not using concurrency control now. Queries and merges running together could lead to CPU overcommit.
// TODO(serxa): Enable concurrency control for merges. This should be done after CPU scheduler introduction.
builder->setConcurrencyControl(false);
global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
}

View File

@ -2057,6 +2057,7 @@ bool MutateTask::prepare()
context_for_reading->setSetting("apply_mutations_on_fly", false);
/// Skip using large sets in KeyCondition
context_for_reading->setSetting("use_index_for_in_with_subqueries_max_values", 100000);
context_for_reading->setSetting("use_concurrency_control", false);
for (const auto & command : *ctx->commands)
if (!canSkipMutationCommandForPart(ctx->source_part, command, context_for_reading))

View File

@ -223,7 +223,7 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
{
account_name = fourth_arg;
account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key");
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/account_name");
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/structure");
if (is_format_arg(sixth_arg))
{
format = sixth_arg;
@ -257,10 +257,10 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
}
else if (with_structure && engine_args.size() == 8)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name");
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "account_name");
account_name = fourth_arg;
account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key");
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/account_name");
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format");
if (!is_format_arg(sixth_arg))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg);
format = sixth_arg;

View File

@ -131,7 +131,7 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
else
{
ConfigurationPtr copy_configuration = configuration->clone();
auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, local_context);
if (filter_dag)
{
auto keys = configuration->getPaths();
@ -142,7 +142,7 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
VirtualColumnUtils::buildSetsForDAG(*filter_dag, local_context);
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
VirtualColumnUtils::filterByPathOrFile(keys, paths, actions, virtual_columns);
VirtualColumnUtils::filterByPathOrFile(keys, paths, actions, virtual_columns, local_context);
copy_configuration->setPaths(keys);
}
@ -489,6 +489,7 @@ StorageObjectStorageSource::GlobIterator::GlobIterator(
, virtual_columns(virtual_columns_)
, throw_on_zero_files_match(throw_on_zero_files_match_)
, read_keys(read_keys_)
, local_context(context_)
, file_progress_callback(file_progress_callback_)
{
if (configuration->isNamespaceWithGlobs())
@ -510,7 +511,7 @@ StorageObjectStorageSource::GlobIterator::GlobIterator(
}
recursive = key_with_globs == "/**";
if (auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns))
if (auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, local_context))
{
VirtualColumnUtils::buildSetsForDAG(*filter_dag, getContext());
filter_expr = std::make_shared<ExpressionActions>(std::move(*filter_dag));
@ -585,7 +586,7 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::GlobIterator::ne
for (const auto & object_info : new_batch)
paths.push_back(getUniqueStoragePathIdentifier(*configuration, *object_info, false));
VirtualColumnUtils::filterByPathOrFile(new_batch, paths, filter_expr, virtual_columns);
VirtualColumnUtils::filterByPathOrFile(new_batch, paths, filter_expr, virtual_columns, local_context);
LOG_TEST(logger, "Filtered files: {} -> {}", paths.size(), new_batch.size());
}

View File

@ -220,6 +220,7 @@ private:
bool is_finished = false;
bool first_iteration = true;
std::mutex next_mutex;
const ContextPtr local_context;
std::function<void(FileProgress)> file_progress_callback;
};

View File

@ -1141,13 +1141,13 @@ StorageFileSource::FilesIterator::FilesIterator(
{
std::optional<ActionsDAG> filter_dag;
if (!distributed_processing && !archive_info && !files.empty())
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, context_);
if (filter_dag)
{
VirtualColumnUtils::buildSetsForDAG(*filter_dag, context_);
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
VirtualColumnUtils::filterByPathOrFile(files, files, actions, virtual_columns);
VirtualColumnUtils::filterByPathOrFile(files, files, actions, virtual_columns, context_);
}
}

View File

@ -227,7 +227,7 @@ public:
std::optional<ActionsDAG> filter_dag;
if (!uris.empty())
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, context);
if (filter_dag)
{
@ -238,7 +238,7 @@ public:
VirtualColumnUtils::buildSetsForDAG(*filter_dag, context);
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
VirtualColumnUtils::filterByPathOrFile(uris, paths, actions, virtual_columns);
VirtualColumnUtils::filterByPathOrFile(uris, paths, actions, virtual_columns, context);
}
}

View File

@ -1,5 +1,6 @@
#include <memory>
#include <stack>
#include <unordered_set>
#include <Core/NamesAndTypes.h>
#include <Core/TypeId.h>
@ -46,6 +47,7 @@
#include "Functions/IFunction.h"
#include "Functions/IFunctionAdaptors.h"
#include "Functions/indexHint.h"
#include <IO/ReadBufferFromString.h>
#include <Interpreters/convertFieldToType.h>
#include <Parsers/makeASTForLogicalFunction.h>
#include <Columns/ColumnSet.h>
@ -124,9 +126,18 @@ void filterBlockWithExpression(const ExpressionActionsPtr & actions, Block & blo
}
}
NamesAndTypesList getCommonVirtualsForFileLikeStorage()
{
return {{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_size", makeNullable(std::make_shared<DataTypeUInt64>())},
{"_time", makeNullable(std::make_shared<DataTypeDateTime>())},
{"_etag", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
}
NameSet getVirtualNamesForFileLikeStorage()
{
return {"_path", "_file", "_size", "_time", "_etag"};
return getCommonVirtualsForFileLikeStorage().getNameSet();
}
std::unordered_map<std::string, std::string> parseHivePartitioningKeysAndValues(const String & path)
@ -154,8 +165,10 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto
{
VirtualColumnsDescription desc;
auto add_virtual = [&](const auto & name, const auto & type)
auto add_virtual = [&](const NameAndTypePair & pair)
{
const auto & name = pair.getNameInStorage();
const auto & type = pair.getTypeInStorage();
if (storage_columns.has(name))
{
if (!context->getSettingsRef().use_hive_partitioning)
@ -172,11 +185,8 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto
desc.addEphemeral(name, type, "");
};
add_virtual("_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
add_virtual("_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
add_virtual("_size", makeNullable(std::make_shared<DataTypeUInt64>()));
add_virtual("_time", makeNullable(std::make_shared<DataTypeDateTime>()));
add_virtual("_etag", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
for (const auto & item : getCommonVirtualsForFileLikeStorage())
add_virtual(item);
if (context->getSettingsRef().use_hive_partitioning)
{
@ -188,16 +198,16 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto
if (type == nullptr)
type = std::make_shared<DataTypeString>();
if (type->canBeInsideLowCardinality())
add_virtual(item.first, std::make_shared<DataTypeLowCardinality>(type));
add_virtual({item.first, std::make_shared<DataTypeLowCardinality>(type)});
else
add_virtual(item.first, type);
add_virtual({item.first, type});
}
}
return desc;
}
static void addPathAndFileToVirtualColumns(Block & block, const String & path, size_t idx)
static void addPathAndFileToVirtualColumns(Block & block, const String & path, size_t idx, const FormatSettings & format_settings, bool use_hive_partitioning)
{
if (block.has("_path"))
block.getByName("_path").column->assumeMutableRef().insert(path);
@ -214,18 +224,34 @@ static void addPathAndFileToVirtualColumns(Block & block, const String & path, s
block.getByName("_file").column->assumeMutableRef().insert(file);
}
if (use_hive_partitioning)
{
auto keys_and_values = parseHivePartitioningKeysAndValues(path);
for (const auto & [key, value] : keys_and_values)
{
if (const auto * column = block.findByName(key))
{
ReadBufferFromString buf(value);
column->type->getDefaultSerialization()->deserializeWholeText(column->column->assumeMutableRef(), buf, format_settings);
}
}
}
block.getByName("_idx").column->assumeMutableRef().insert(idx);
}
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns)
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
if (!predicate || virtual_columns.empty())
return {};
Block block;
NameSet common_virtuals;
if (context->getSettingsRef().use_hive_partitioning)
common_virtuals = getVirtualNamesForFileLikeStorage();
for (const auto & column : virtual_columns)
{
if (column.name == "_file" || column.name == "_path")
if (column.name == "_file" || column.name == "_path" || !common_virtuals.contains(column.name))
block.insert({column.type->createColumn(), column.type, column.name});
}
@ -233,18 +259,19 @@ std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * pr
return splitFilterDagForAllowedInputs(predicate, &block);
}
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns)
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
Block block;
NameSet common_virtuals = getVirtualNamesForFileLikeStorage();
for (const auto & column : virtual_columns)
{
if (column.name == "_file" || column.name == "_path")
if (column.name == "_file" || column.name == "_path" || !common_virtuals.contains(column.name))
block.insert({column.type->createColumn(), column.type, column.name});
}
block.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
for (size_t i = 0; i != paths.size(); ++i)
addPathAndFileToVirtualColumns(block, paths[i], i);
addPathAndFileToVirtualColumns(block, paths[i], i, getFormatSettings(context), context->getSettingsRef().use_hive_partitioning);
filterBlockWithExpression(actions, block);

View File

@ -75,14 +75,14 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(
const std::string & sample_path = "",
std::optional<FormatSettings> format_settings_ = std::nullopt);
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns);
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns);
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
template <typename T>
void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns)
void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
auto indexes_column = getFilterByPathAndFileIndexes(paths, actions, virtual_columns);
auto indexes_column = getFilterByPathAndFileIndexes(paths, actions, virtual_columns, context);
const auto & indexes = typeid_cast<const ColumnUInt64 &>(*indexes_column).getData();
if (indexes.size() == sources.size())
return;

View File

@ -646,6 +646,7 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingAsyncPipelineExecutor executor(pipeline);
pipeline.setConcurrencyControl(getContext()->getSettingsRef().use_concurrency_control);
Block block;
BlocksPtr new_blocks = std::make_shared<Blocks>();

View File

@ -112,6 +112,7 @@ Block TableFunctionFormat::parseData(const ColumnsDescription & columns, const S
});
}
builder.setConcurrencyControl(context->getSettingsRef().use_concurrency_control);
auto pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
auto reader = std::make_unique<PullingPipelineExecutor>(*pipeline);

View File

@ -139,13 +139,20 @@ def main():
jr = JobReport.load(from_file=report_file)
additional_files.append(report_file)
for file in set(jr.additional_files):
file_ = Path(file)
file_name = file_.name
orig_file = Path(file)
file_name = orig_file.name
file_name = file_name.replace(
".", "__" + CI.Utils.normalize_string(job_id) + ".", 1
)
file_ = file_.rename(file_.parent / file_name)
additional_files.append(file_)
new_file = orig_file.rename(orig_file.parent / file_name)
for tr in test_results:
if tr.log_files is None:
continue
tr.log_files = [
new_file if (Path(log_file) == orig_file) else Path(log_file)
for log_file in tr.log_files
]
additional_files.append(new_file)
JobReport(
description=description,

View File

@ -559,7 +559,7 @@ class CI:
JobNames.BUGFIX_VALIDATE: JobConfig(
run_by_label="pr-bugfix",
run_command="bugfix_validate_check.py",
timeout=900,
timeout=2400,
runner_type=Runners.STYLE_CHECKER,
),
}

View File

@ -1,7 +1,7 @@
import logging
import os
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Union
import os
import logging
from env_helper import (
GITHUB_JOB_URL,
@ -12,6 +12,8 @@ from env_helper import (
from report import TestResults, create_test_html_report
from s3_helper import S3Helper
logger = logging.getLogger(__name__)
def process_logs(
s3_client: S3Helper,
@ -19,7 +21,7 @@ def process_logs(
s3_path_prefix: str,
test_results: TestResults,
) -> List[str]:
logging.info("Upload files to s3 %s", additional_logs)
logger.info("Upload files to s3 %s", additional_logs)
processed_logs = {} # type: Dict[str, str]
# Firstly convert paths of logs from test_results to urls to s3.
@ -33,9 +35,19 @@ def process_logs(
if path in processed_logs:
test_result.log_urls.append(processed_logs[str(path)])
elif path:
url = s3_client.upload_test_report_to_s3(
Path(path), s3_path_prefix + "/" + str(path)
)
try:
url = s3_client.upload_test_report_to_s3(
Path(path), s3_path_prefix + "/" + str(path)
)
except FileNotFoundError:
# Breaking the whole run on the malformed test is a bad idea
# FIXME: report the failure
logger.error(
"A broken TestResult, file '%s' does not exist: %s",
path,
test_result,
)
continue
test_result.log_urls.append(url)
processed_logs[str(path)] = url
@ -48,7 +60,7 @@ def process_logs(
)
)
else:
logging.error("File %s is missing - skip", log_path)
logger.error("File %s is missing - skip", log_path)
return additional_urls
@ -116,8 +128,8 @@ def upload_results(
report_path.write_text(html_report, encoding="utf-8")
url = s3_client.upload_test_report_to_s3(report_path, s3_path_prefix + ".html")
else:
logging.info("report.html was prepared by test job itself")
logger.info("report.html was prepared by test job itself")
url = ready_report_url
logging.info("Search result in url %s", url)
logger.info("Search result in url %s", url)
return url

View File

@ -1,4 +1,5 @@
<clickhouse>
<concurrent_threads_soft_limit_ratio_to_cores>0</concurrent_threads_soft_limit_ratio_to_cores>
<concurrent_threads_soft_limit_num>50</concurrent_threads_soft_limit_num>
<query_log>
<database>system</database>

View File

@ -28,6 +28,16 @@ node4 = cluster.add_instance(
)
def assert_profile_event(node, query_id, profile_event, check):
assert check(
int(
node.query(
f"select ProfileEvents['{profile_event}'] from system.query_log where current_database = currentDatabase() and query_id = '{query_id}' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1"
)
)
)
@pytest.fixture(scope="module")
def started_cluster():
try:
@ -43,39 +53,176 @@ def test_concurrent_threads_soft_limit_default(started_cluster):
query_id="test_concurrent_threads_soft_limit_1",
)
node1.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node1,
"test_concurrent_threads_soft_limit_1",
"ConcurrencyControlGrantedHard",
lambda x: x == 1,
)
assert_profile_event(
node1,
"test_concurrent_threads_soft_limit_1",
"ConcurrencyControlGrantDelayed",
lambda x: x == 0,
)
assert_profile_event(
node1,
"test_concurrent_threads_soft_limit_1",
"ConcurrencyControlAcquiredTotal",
lambda x: x == 100,
)
assert_profile_event(
node1,
"test_concurrent_threads_soft_limit_1",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 0,
)
assert (
node1.query(
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_1'"
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_1' order by query_start_time_microseconds desc limit 1"
)
== "102\n"
)
@pytest.mark.skip(reason="broken test")
def test_use_concurrency_control_default(started_cluster):
node1.query(
"SELECT count(*) FROM numbers_mt(10000000) SETTINGS use_concurrency_control = 0",
query_id="test_use_concurrency_control",
)
# Concurrency control is not used, so all metrics should be zero
node1.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node1,
"test_use_concurrency_control",
"ConcurrencyControlGrantedHard",
lambda x: x == 0,
)
assert_profile_event(
node1,
"test_use_concurrency_control",
"ConcurrencyControlGrantDelayed",
lambda x: x == 0,
)
assert_profile_event(
node1,
"test_use_concurrency_control",
"ConcurrencyControlAcquiredTotal",
lambda x: x == 0,
)
assert_profile_event(
node1,
"test_use_concurrency_control",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 0,
)
def test_concurrent_threads_soft_limit_defined_50(started_cluster):
node2.query(
"SELECT count(*) FROM numbers_mt(10000000)",
query_id="test_concurrent_threads_soft_limit_2",
)
node2.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node2,
"test_concurrent_threads_soft_limit_2",
"ConcurrencyControlGrantedHard",
lambda x: x == 1,
)
assert_profile_event(
node2,
"test_concurrent_threads_soft_limit_2",
"ConcurrencyControlGrantDelayed",
lambda x: x == 50,
)
assert_profile_event(
node2,
"test_concurrent_threads_soft_limit_2",
"ConcurrencyControlAcquiredTotal",
lambda x: x == 50,
)
assert_profile_event(
node2,
"test_concurrent_threads_soft_limit_2",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 1,
)
assert (
node2.query(
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_2'"
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_2' order by query_start_time_microseconds desc limit 1"
)
== "52\n"
)
@pytest.mark.skip(reason="broken test")
def test_use_concurrency_control_soft_limit_defined_50(started_cluster):
node2.query(
"SELECT count(*) FROM numbers_mt(10000000) SETTINGS use_concurrency_control = 0",
query_id="test_use_concurrency_control_2",
)
# Concurrency control is not used, so all metrics should be zero
node2.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node2,
"test_use_concurrency_control_2",
"ConcurrencyControlGrantedHard",
lambda x: x == 0,
)
assert_profile_event(
node2,
"test_use_concurrency_control_2",
"ConcurrencyControlGrantDelayed",
lambda x: x == 0,
)
assert_profile_event(
node2,
"test_use_concurrency_control_2",
"ConcurrencyControlAcquiredTotal",
lambda x: x == 0,
)
assert_profile_event(
node2,
"test_use_concurrency_control_2",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 0,
)
def test_concurrent_threads_soft_limit_defined_1(started_cluster):
node3.query(
"SELECT count(*) FROM numbers_mt(10000000)",
query_id="test_concurrent_threads_soft_limit_3",
)
node3.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node3,
"test_concurrent_threads_soft_limit_3",
"ConcurrencyControlGrantedHard",
lambda x: x == 1,
)
assert_profile_event(
node3,
"test_concurrent_threads_soft_limit_3",
"ConcurrencyControlGrantDelayed",
lambda x: x == 99,
)
assert_profile_event(
node3,
"test_concurrent_threads_soft_limit_3",
"ConcurrencyControlAcquiredTotal",
lambda x: x == 1,
)
assert_profile_event(
node3,
"test_concurrent_threads_soft_limit_3",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 1,
)
assert (
node3.query(
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_3'"
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_3' order by query_start_time_microseconds desc limit 1"
)
== "3\n"
)
@@ -84,7 +231,6 @@ def test_concurrent_threads_soft_limit_defined_1(started_cluster):
# In config_limit_reached.xml there is concurrent_threads_soft_limit=10
# A background query starts in a separate thread to reach this limit.
# When this limit is reached, the foreground query gets fewer than 5 threads even though it sets max_threads=5
@pytest.mark.skip(reason="broken test")
def test_concurrent_threads_soft_limit_limit_reached(started_cluster):
def background_query():
try:
@@ -117,8 +263,32 @@ def test_concurrent_threads_soft_limit_limit_reached(started_cluster):
)
node4.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node4,
"test_concurrent_threads_soft_limit_4",
"ConcurrencyControlGrantedHard",
lambda x: x == 1,
)
assert_profile_event(
node4,
"test_concurrent_threads_soft_limit_4",
"ConcurrencyControlGrantDelayed",
lambda x: x > 0,
)
assert_profile_event(
node4,
"test_concurrent_threads_soft_limit_4",
"ConcurrencyControlAcquiredTotal",
lambda x: x < 5,
)
assert_profile_event(
node4,
"test_concurrent_threads_soft_limit_4",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 1,
)
s_count = node4.query(
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_4'"
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_4' order by query_start_time_microseconds desc limit 1"
).strip()
if s_count:
count = int(s_count)

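A condensed sketch of the limit-reached scenario described in the comment above, with the node fixture and query texts taken as assumptions (the real test also checks the new ConcurrencyControl* profile events and coordinates the background query more carefully):

import threading

def sketch_limit_reached(node):
    # config_limit_reached.xml is said to set concurrent_threads_soft_limit to 10
    def background_query():
        # long-running query that keeps the concurrency-control slots busy
        node.query(
            "SELECT count(*) FROM numbers_mt(100000000000) SETTINGS max_threads=100",
            query_id="background_query",
        )

    threading.Thread(target=background_query, daemon=True).start()

    node.query(
        "SELECT count(*) FROM numbers_mt(10000000) SETTINGS max_threads=5",
        query_id="foreground_query",
    )
    node.query("SYSTEM FLUSH LOGS")
    threads = int(
        node.query(
            "SELECT length(thread_ids) FROM system.query_log "
            "WHERE query_id = 'foreground_query' AND type = 'QueryFinish' "
            "ORDER BY query_start_time_microseconds DESC LIMIT 1"
        )
    )
    # with the soft limit saturated, the foreground query should be granted
    # fewer worker threads than an unconstrained run would use; the exact
    # number depends on scheduling, so a real test asserts a loose bound
    assert threads <= 5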
View File

@@ -1,5 +1,6 @@
import pytest
import random, string
import re
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
@@ -336,6 +337,10 @@ def test_create_database():
def test_table_functions():
password = new_password()
azure_conn_string = cluster.env_variables["AZURITE_CONNECTION_STRING"]
account_key_pattern = re.compile("AccountKey=.*?(;|$)")
masked_azure_conn_string = re.sub(
account_key_pattern, "AccountKey=[HIDDEN]\\1", azure_conn_string
)
azure_storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
azure_account_name = "devstoreaccount1"
azure_account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
@@ -467,23 +472,23 @@ def test_table_functions():
"CREATE TABLE tablefunc30 (x int) AS s3('http://minio1:9001/root/data/test9.csv.gz', 'NOSIGN', 'CSV')",
"CREATE TABLE tablefunc31 (`x` int) AS s3('http://minio1:9001/root/data/test10.csv.gz', 'minio', '[HIDDEN]')",
"CREATE TABLE tablefunc32 (`x` int) AS deltaLake('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
f"CREATE TABLE tablefunc33 (x int) AS azureBlobStorage('{azure_conn_string}', 'cont', 'test_simple.csv', 'CSV')",
f"CREATE TABLE tablefunc34 (x int) AS azureBlobStorage('{azure_conn_string}', 'cont', 'test_simple_1.csv', 'CSV', 'none')",
f"CREATE TABLE tablefunc35 (x int) AS azureBlobStorage('{azure_conn_string}', 'cont', 'test_simple_2.csv', 'CSV', 'none', 'auto')",
f"CREATE TABLE tablefunc33 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple.csv', 'CSV')",
f"CREATE TABLE tablefunc34 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple_1.csv', 'CSV', 'none')",
f"CREATE TABLE tablefunc35 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple_2.csv', 'CSV', 'none', 'auto')",
f"CREATE TABLE tablefunc36 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_3.csv', '{azure_account_name}', '[HIDDEN]')",
f"CREATE TABLE tablefunc37 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_4.csv', '{azure_account_name}', '[HIDDEN]', 'CSV')",
f"CREATE TABLE tablefunc38 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_5.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none')",
f"CREATE TABLE tablefunc39 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')",
f"CREATE TABLE tablefunc40 (x int) AS azureBlobStorage(named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')",
f"CREATE TABLE tablefunc40 (`x` int) AS azureBlobStorage(named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')",
f"CREATE TABLE tablefunc41 (`x` int) AS azureBlobStorage(named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')",
f"CREATE TABLE tablefunc42 (x int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple_9.csv', 'CSV')",
f"CREATE TABLE tablefunc43 (x int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple_10.csv', 'CSV', 'none')",
f"CREATE TABLE tablefunc44 (x int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple_11.csv', 'CSV', 'none', 'auto')",
f"CREATE TABLE tablefunc42 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_9.csv', 'CSV')",
f"CREATE TABLE tablefunc43 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_10.csv', 'CSV', 'none')",
f"CREATE TABLE tablefunc44 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_11.csv', 'CSV', 'none', 'auto')",
f"CREATE TABLE tablefunc45 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_12.csv', '{azure_account_name}', '[HIDDEN]')",
f"CREATE TABLE tablefunc46 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_13.csv', '{azure_account_name}', '[HIDDEN]', 'CSV')",
f"CREATE TABLE tablefunc47 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_14.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none')",
f"CREATE TABLE tablefunc48 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_15.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')",
f"CREATE TABLE tablefunc49 (x int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_16.csv', format = 'CSV')",
f"CREATE TABLE tablefunc49 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_16.csv', format = 'CSV')",
f"CREATE TABLE tablefunc50 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_17.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')",
"CREATE TABLE tablefunc51 (`x` int) AS iceberg('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
],

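The expectations above assume the AccountKey portion of the connection string is masked before the DDL reaches system.query_log. A standalone sketch of the same masking regex, applied to a made-up Azurite-style connection string:

import re

conn = (
    "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
    "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
    "BlobEndpoint=http://azurite1:10000/devstoreaccount1;"
)
account_key_pattern = re.compile("AccountKey=.*?(;|$)")
masked = re.sub(account_key_pattern, "AccountKey=[HIDDEN]\\1", conn)
print(masked)
# DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=[HIDDEN];BlobEndpoint=http://azurite1:10000/devstoreaccount1;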
View File

@@ -3,6 +3,7 @@
set log_queries=1;
set log_query_threads=1;
set max_threads=0;
set use_concurrency_control=0;
WITH 01091 AS id SELECT 1;
SYSTEM FLUSH LOGS;

View File

@@ -6,6 +6,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} --log_queries=1 --max_threads=32 --query_id "2015_${CLICKHOUSE_DATABASE}_query" -q "select count() from remote('127.0.0.{2,3}', numbers(10)) where number global in (select number % 5 from numbers_mt(1000000))"
${CLICKHOUSE_CLIENT} --log_queries=1 --max_threads=32 --use_concurrency_control=0 --query_id "2015_${CLICKHOUSE_DATABASE}_query" -q "select count() from remote('127.0.0.{2,3}', numbers(10)) where number global in (select number % 5 from numbers_mt(1000000))"
${CLICKHOUSE_CLIENT} -q "system flush logs"
${CLICKHOUSE_CLIENT} -q "select length(thread_ids) >= 32 from system.query_log where event_date >= yesterday() and query_id = '2015_${CLICKHOUSE_DATABASE}_query' and type = 'QueryFinish' and current_database = currentDatabase()"

View File

@@ -5,6 +5,9 @@
-- enforce some defaults to be sure that the env settings will not affect the test
SET max_threads=5, async_socket_for_remote=1, prefer_localhost_replica=1, optimize_read_in_order=1, load_marks_asynchronously=0, local_filesystem_read_method='pread', remote_filesystem_read_method='read', trace_profile_events=0;
-- we do not want concurrency control to limit the number of threads
SET use_concurrency_control=0;
-- we use query_thread_log to check peak thread usage
-- after https://github.com/ClickHouse/ClickHouse/issues/53417 there is a simpler way to check it
-- but that would not allow backporting the test to older versions

View File

@@ -1,21 +1,31 @@
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
%d: 123
%d: -123
%d: 0
%d: 9223372036854775807
%i: 123
%u: 123
%o: 173
%x: 7b
%X: 7B
%f: 0.000000
%f: 123.456000
%f: -123.456000
%F: 123.456000
%e: 1.234560e+02
%E: 1.234560E+02
%g: 123.456
%G: 123.456
%a: 0x1.edd2f1a9fbe77p+6
%A: 0X1.EDD2F1A9FBE77P+6
%s: abc
┌─printf('%%s: %s', '\n\t')─┐
1. │ %s:
└───────────────────────────┘
%s:
%%: %
%.5d: 00123
%.2f: 123.46
%.2e: 1.23e+02
%.2g: 1.2e+02
%.2s: ab

View File

@@ -1,39 +1,47 @@
-- Testing integer formats
select printf('%%d: %d', 123) = '%d: 123';
select printf('%%i: %i', 123) = '%i: 123';
select printf('%%u: %u', 123) = '%u: 123';
select printf('%%o: %o', 123) = '%o: 173';
select printf('%%x: %x', 123) = '%x: 7b';
select printf('%%X: %X', 123) = '%X: 7B';
select printf('%%d: %d', 123);
select printf('%%d: %d', -123);
select printf('%%d: %d', 0);
select printf('%%d: %d', 9223372036854775807);
select printf('%%i: %i', 123);
select printf('%%u: %u', 123);
select printf('%%o: %o', 123);
select printf('%%x: %x', 123);
select printf('%%X: %X', 123);
-- Testing floating point formats
select printf('%%f: %f', 123.456) = '%f: 123.456000';
select printf('%%F: %F', 123.456) = '%F: 123.456000';
select printf('%%e: %e', 123.456) = '%e: 1.234560e+02';
select printf('%%E: %E', 123.456) = '%E: 1.234560E+02';
select printf('%%g: %g', 123.456) = '%g: 123.456';
select printf('%%G: %G', 123.456) = '%G: 123.456';
select printf('%%a: %a', 123.456) = '%a: 0x1.edd2f1a9fbe77p+6';
select printf('%%A: %A', 123.456) = '%A: 0X1.EDD2F1A9FBE77P+6';
select printf('%%f: %f', 0.0);
select printf('%%f: %f', 123.456);
select printf('%%f: %f', -123.456);
select printf('%%F: %F', 123.456);
select printf('%%e: %e', 123.456);
select printf('%%E: %E', 123.456);
select printf('%%g: %g', 123.456);
select printf('%%G: %G', 123.456);
select printf('%%a: %a', 123.456);
select printf('%%A: %A', 123.456);
-- Testing character formats
select printf('%%s: %s', 'abc') = '%s: abc';
select printf('%%s: %s', 'abc');
SELECT printf('%%s: %s', '\n\t') FORMAT PrettyCompact;
select printf('%%s: %s', '');
-- Testing the %% specifier
select printf('%%%%: %%') = '%%: %';
select printf('%%%%: %%');
-- Testing integer formats with precision
select printf('%%.5d: %.5d', 123) = '%.5d: 00123';
select printf('%%.5d: %.5d', 123);
-- Testing floating point formats with precision
select printf('%%.2f: %.2f', 123.456) = '%.2f: 123.46';
select printf('%%.2e: %.2e', 123.456) = '%.2e: 1.23e+02';
select printf('%%.2g: %.2g', 123.456) = '%.2g: 1.2e+02';
select printf('%%.2f: %.2f', 123.456);
select printf('%%.2e: %.2e', 123.456);
select printf('%%.2g: %.2g', 123.456);
-- Testing character formats with precision
select printf('%%.2s: %.2s', 'abc') = '%.2s: ab';
select printf('%%.2s: %.2s', 'abc');
select printf('%%X: %X', 123.123); -- { serverError BAD_ARGUMENTS }
select printf('%%A: %A', 'abc'); -- { serverError BAD_ARGUMENTS }
select printf('%%s: %s', 100); -- { serverError BAD_ARGUMENTS }
select printf('%%n: %n', 100); -- { serverError BAD_ARGUMENTS }
select printf('%%f: %f', 0); -- { serverError BAD_ARGUMENTS }

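As a quick sanity check (not part of the test suite), most of the values in the new printf reference above match Python's C-style string formatting, which follows the same printf conventions:

# each assertion mirrors one of the expected reference lines
assert "%o" % 123 == "173"
assert "%x" % 123 == "7b"
assert "%f" % 123.456 == "123.456000"
assert "%e" % 123.456 == "1.234560e+02"
assert "%.2g" % 123.456 == "1.2e+02"
assert "%.5d" % 123 == "00123"
assert "%.2s" % "abc" == "ab"
# %a has no Python format code, but float.hex() produces the same hex-float form
assert float.hex(123.456) == "0x1.edd2f1a9fbe77p+6"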
View File

@@ -33,8 +33,8 @@ Cross Elizabeth
[1,2,3] 42.42
Array(Int64) LowCardinality(Float64)
101
2070
2070
2071
2071
b
1
1

View File

@@ -0,0 +1,6 @@
1
1
1
1
1
1

View File

@@ -0,0 +1,72 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
DATA_DIR=$USER_FILES_PATH/$CLICKHOUSE_TEST_UNIQUE_NAME
mkdir -p $DATA_DIR
cp -r $CURDIR/data_hive/ $DATA_DIR
$CLICKHOUSE_CLIENT --query_id="test_03231_1_$CLICKHOUSE_TEST_UNIQUE_NAME" --query "
SELECT countDistinct(_path) FROM file('$DATA_DIR/data_hive/partitioning/column0=*/sample.parquet') WHERE column0 = 'Elizabeth' SETTINGS use_hive_partitioning=1, optimize_count_from_files=0;
"
$CLICKHOUSE_CLIENT --query "
SYSTEM FLUSH LOGS;
"
for _ in {1..5}; do
count=$( $CLICKHOUSE_CLIENT --query "
SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log
WHERE query_id='test_03231_1_$CLICKHOUSE_TEST_UNIQUE_NAME' AND
current_database = currentDatabase() and type='QueryFinish';" )
if [[ "$count" == "1" ]]; then
echo "1"
break
fi
sleep 1
done
$CLICKHOUSE_CLIENT --query_id="test_03231_2_$CLICKHOUSE_TEST_UNIQUE_NAME" --query "
SELECT countDistinct(_path) FROM file('$DATA_DIR/data_hive/partitioning/identifier=*/email.csv') WHERE identifier = 2070 SETTINGS use_hive_partitioning=1, optimize_count_from_files=0;
"
$CLICKHOUSE_CLIENT --query "
SYSTEM FLUSH LOGS;
"
for _ in {1..5}; do
count=$( $CLICKHOUSE_CLIENT --query "
SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log
WHERE query_id='test_03231_2_$CLICKHOUSE_TEST_UNIQUE_NAME' AND
current_database = currentDatabase() and type='QueryFinish';" )
if [[ "$count" == "1" ]]; then
echo "1"
break
fi
sleep 1
done
$CLICKHOUSE_CLIENT --query_id="test_03231_3_$CLICKHOUSE_TEST_UNIQUE_NAME" --query "
SELECT countDistinct(_path) FROM file('$DATA_DIR/data_hive/partitioning/array=*/sample.parquet') WHERE array = [1,2,3] SETTINGS use_hive_partitioning=1, optimize_count_from_files=0;
"
$CLICKHOUSE_CLIENT --query "
SYSTEM FLUSH LOGS;
"
for _ in {1..5}; do
count=$( $CLICKHOUSE_CLIENT --query "
SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log
WHERE query_id='test_03231_3_$CLICKHOUSE_TEST_UNIQUE_NAME' AND
current_database = currentDatabase() and type='QueryFinish';" )
if [[ "$count" == "1" ]]; then
echo "1"
break
fi
sleep 1
done
rm -rf $DATA_DIR

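Each retry loop above polls system.query_log for the EngineFileLikeReadFiles counter a few times, presumably because the log entry may not be visible immediately. The same polling idea as a small Python sketch (run_query and the helper name are illustrative; the test itself stays in bash):

import time

def wait_for_profile_event(run_query, query_id, event="EngineFileLikeReadFiles",
                           expected="1", attempts=5, delay=1.0):
    # Poll system.query_log until the finished query reports the expected counter.
    for _ in range(attempts):
        value = run_query(
            f"SELECT ProfileEvents['{event}'] FROM system.query_log "
            f"WHERE query_id = '{query_id}' AND current_database = currentDatabase() "
            "AND type = 'QueryFinish'"
        ).strip()
        if value == expected:
            return True
        time.sleep(delay)
    return False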
View File

@@ -1 +1 @@
data_hive/partitioning/column0=Elizabeth/sample.parquet
data_hive/partitioning/non_existing_column=Elizabeth/sample.parquet

View File

@@ -5,4 +5,4 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_LOCAL -q "SELECT substring(_path, position(_path, 'data_hive')) FROM file('$CURDIR/data_hive/partitioning/column0=*/sample.parquet') LIMIT 1;"
$CLICKHOUSE_LOCAL -q "SELECT substring(_path, position(_path, 'data_hive')) FROM file('$CURDIR/data_hive/partitioning/non_existing_column=*/sample.parquet') LIMIT 1;"

View File

@@ -0,0 +1,5 @@
_login_email,_identifier,_first_name,_last_name
laura@example.com,2070,Laura,Grey
craig@example.com,4081,Craig,Johnson
mary@example.com,9346,Mary,Jenkins
jamie@example.com,5079,Jamie,Smith

View File

@@ -181,6 +181,9 @@ ComplexKeyCache
ComplexKeyDirect
ComplexKeyHashed
Composable
composable
ConcurrencyControlAcquired
ConcurrencyControlSoftLimit
Config
ConnectionDetails
Const