mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Compare commits
90 Commits
e764f6726e
...
1b7b1325d4
Author | SHA1 | Date | |
---|---|---|---|
|
1b7b1325d4 | ||
|
c0c83236b6 | ||
|
3eb5bc1a0f | ||
|
b94a7167a8 | ||
|
b88cd79959 | ||
|
64e58baba1 | ||
|
a3fe155579 | ||
|
f4b4b3cc35 | ||
|
cb24849396 | ||
|
758acb433d | ||
|
143d9f0201 | ||
|
7af0ec8b23 | ||
|
b08e727aef | ||
|
f52cdfb795 | ||
|
a210f98819 | ||
|
7c5d55c6b2 | ||
|
80259659ff | ||
|
3a7c68a052 | ||
|
e8d50aa97f | ||
|
e5640acc52 | ||
|
cb92aaf968 | ||
|
c75118c78d | ||
|
0cdec0acf1 | ||
|
d86ad992f1 | ||
|
10819dda2a | ||
|
b8ea0e3396 | ||
|
04f23332c3 | ||
|
7d5203f8a7 | ||
|
0d1d750437 | ||
|
ad31d86a15 | ||
|
991279e5c6 | ||
|
c184aae686 | ||
|
14a6b0422b | ||
|
aab0d3dd9e | ||
|
5a34b9f24e | ||
|
a0a4858e00 | ||
|
90645d7c0e | ||
|
e8cec05d08 | ||
|
2876a4e714 | ||
|
a903e1a726 | ||
|
2fa6be55ff | ||
|
8896d1b78b | ||
|
f688b903db | ||
|
21f9669836 | ||
|
1a386ae4d5 | ||
|
24f4e87f8b | ||
|
b5eb0ef857 | ||
|
78e8dbe008 | ||
|
5c18ffb8d1 | ||
|
620640a042 | ||
|
ec469a117d | ||
|
190d82ddd8 | ||
|
7a879980d8 | ||
|
2adc61c215 | ||
|
d06dc22999 | ||
|
9e3f04c5eb | ||
|
afc4d08aad | ||
|
897e2def34 | ||
|
edc5d8dd92 | ||
|
d6b2a9d534 | ||
|
dc97bd6b92 | ||
|
60c6eb2610 | ||
|
9133505952 | ||
|
2741bf00e4 | ||
|
4eca00a666 | ||
|
c6804122cb | ||
|
189cbe25fe | ||
|
0632febfba | ||
|
67f51ded37 | ||
|
14666f9be3 | ||
|
2f101367db | ||
|
06cac68db7 | ||
|
141179736e | ||
|
184c156e09 | ||
|
cf9ea4af10 | ||
|
30fed916fa | ||
|
384e2ff399 | ||
|
091883b8eb | ||
|
cb6438e472 | ||
|
1cdcc0da57 | ||
|
6472fea0fd | ||
|
dec92f60c6 | ||
|
060ec6b68c | ||
|
bf125d5da6 | ||
|
dd22140b57 | ||
|
d01ab205a9 | ||
|
e66e9d34ac | ||
|
0acdfd6a37 | ||
|
b252062767 | ||
|
dd6c1ac19a |
2
contrib/libpqxx
vendored
2
contrib/libpqxx
vendored
@ -1 +1 @@
|
|||||||
Subproject commit c995193a3a14d71f4711f1f421f65a1a1db64640
|
Subproject commit 41e4c331564167cca97ad6eccbd5b8879c2ca044
|
@ -1,9 +1,9 @@
|
|||||||
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/libpqxx")
|
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/libpqxx")
|
||||||
|
|
||||||
set (SRCS
|
set (SRCS
|
||||||
"${LIBRARY_DIR}/src/strconv.cxx"
|
|
||||||
"${LIBRARY_DIR}/src/array.cxx"
|
"${LIBRARY_DIR}/src/array.cxx"
|
||||||
"${LIBRARY_DIR}/src/binarystring.cxx"
|
"${LIBRARY_DIR}/src/binarystring.cxx"
|
||||||
|
"${LIBRARY_DIR}/src/blob.cxx"
|
||||||
"${LIBRARY_DIR}/src/connection.cxx"
|
"${LIBRARY_DIR}/src/connection.cxx"
|
||||||
"${LIBRARY_DIR}/src/cursor.cxx"
|
"${LIBRARY_DIR}/src/cursor.cxx"
|
||||||
"${LIBRARY_DIR}/src/encodings.cxx"
|
"${LIBRARY_DIR}/src/encodings.cxx"
|
||||||
@ -12,59 +12,25 @@ set (SRCS
|
|||||||
"${LIBRARY_DIR}/src/field.cxx"
|
"${LIBRARY_DIR}/src/field.cxx"
|
||||||
"${LIBRARY_DIR}/src/largeobject.cxx"
|
"${LIBRARY_DIR}/src/largeobject.cxx"
|
||||||
"${LIBRARY_DIR}/src/notification.cxx"
|
"${LIBRARY_DIR}/src/notification.cxx"
|
||||||
|
"${LIBRARY_DIR}/src/params.cxx"
|
||||||
"${LIBRARY_DIR}/src/pipeline.cxx"
|
"${LIBRARY_DIR}/src/pipeline.cxx"
|
||||||
"${LIBRARY_DIR}/src/result.cxx"
|
"${LIBRARY_DIR}/src/result.cxx"
|
||||||
"${LIBRARY_DIR}/src/robusttransaction.cxx"
|
"${LIBRARY_DIR}/src/robusttransaction.cxx"
|
||||||
|
"${LIBRARY_DIR}/src/row.cxx"
|
||||||
"${LIBRARY_DIR}/src/sql_cursor.cxx"
|
"${LIBRARY_DIR}/src/sql_cursor.cxx"
|
||||||
|
"${LIBRARY_DIR}/src/strconv.cxx"
|
||||||
"${LIBRARY_DIR}/src/stream_from.cxx"
|
"${LIBRARY_DIR}/src/stream_from.cxx"
|
||||||
"${LIBRARY_DIR}/src/stream_to.cxx"
|
"${LIBRARY_DIR}/src/stream_to.cxx"
|
||||||
"${LIBRARY_DIR}/src/subtransaction.cxx"
|
"${LIBRARY_DIR}/src/subtransaction.cxx"
|
||||||
|
"${LIBRARY_DIR}/src/time.cxx"
|
||||||
"${LIBRARY_DIR}/src/transaction.cxx"
|
"${LIBRARY_DIR}/src/transaction.cxx"
|
||||||
"${LIBRARY_DIR}/src/transaction_base.cxx"
|
"${LIBRARY_DIR}/src/transaction_base.cxx"
|
||||||
"${LIBRARY_DIR}/src/row.cxx"
|
|
||||||
"${LIBRARY_DIR}/src/params.cxx"
|
|
||||||
"${LIBRARY_DIR}/src/util.cxx"
|
"${LIBRARY_DIR}/src/util.cxx"
|
||||||
"${LIBRARY_DIR}/src/version.cxx"
|
"${LIBRARY_DIR}/src/version.cxx"
|
||||||
|
"${LIBRARY_DIR}/src/wait.cxx"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Need to explicitly include each header file, because in the directory include/pqxx there are also files
|
add_library(_libpqxx ${SRCS})
|
||||||
# like just 'array'. So if including the whole directory with `target_include_directories`, it will make
|
|
||||||
# conflicts with all includes of <array>.
|
|
||||||
set (HDRS
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/array.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/params.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/binarystring.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/composite.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/connection.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/cursor.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/dbtransaction.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/errorhandler.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/except.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/field.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/isolation.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/largeobject.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/nontransaction.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/notification.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/pipeline.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/prepared_statement.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/result.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/robusttransaction.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/row.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/separated_list.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/strconv.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/stream_from.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/stream_to.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/subtransaction.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/transaction.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/transaction_base.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/types.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/util.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/version.hxx"
|
|
||||||
"${LIBRARY_DIR}/include/pqxx/zview.hxx"
|
|
||||||
)
|
|
||||||
|
|
||||||
add_library(_libpqxx ${SRCS} ${HDRS})
|
|
||||||
|
|
||||||
target_link_libraries(_libpqxx PUBLIC ch_contrib::libpq)
|
target_link_libraries(_libpqxx PUBLIC ch_contrib::libpq)
|
||||||
target_include_directories (_libpqxx SYSTEM BEFORE PUBLIC "${LIBRARY_DIR}/include")
|
target_include_directories (_libpqxx SYSTEM BEFORE PUBLIC "${LIBRARY_DIR}/include")
|
||||||
|
|
||||||
|
@ -737,6 +737,14 @@ Number of sessions (connections) to ZooKeeper. Should be no more than one, becau
|
|||||||
|
|
||||||
Number of watches (event subscriptions) in ZooKeeper.
|
Number of watches (event subscriptions) in ZooKeeper.
|
||||||
|
|
||||||
|
### ConcurrencyControlAcquired
|
||||||
|
|
||||||
|
Total number of acquired CPU slots.
|
||||||
|
|
||||||
|
### ConcurrencyControlSoftLimit
|
||||||
|
|
||||||
|
Value of soft limit on number of CPU slots.
|
||||||
|
|
||||||
**See Also**
|
**See Also**
|
||||||
|
|
||||||
- [system.asynchronous_metrics](../../operations/system-tables/asynchronous_metrics.md#system_tables-asynchronous_metrics) — Contains periodically calculated metrics.
|
- [system.asynchronous_metrics](../../operations/system-tables/asynchronous_metrics.md#system_tables-asynchronous_metrics) — Contains periodically calculated metrics.
|
||||||
|
@ -1631,6 +1631,7 @@ try
|
|||||||
concurrent_threads_soft_limit = value;
|
concurrent_threads_soft_limit = value;
|
||||||
}
|
}
|
||||||
ConcurrencyControl::instance().setMaxConcurrency(concurrent_threads_soft_limit);
|
ConcurrencyControl::instance().setMaxConcurrency(concurrent_threads_soft_limit);
|
||||||
|
LOG_INFO(log, "ConcurrencyControl limit is set to {}", concurrent_threads_soft_limit);
|
||||||
|
|
||||||
global_context->getProcessList().setMaxSize(new_server_settings.max_concurrent_queries);
|
global_context->getProcessList().setMaxSize(new_server_settings.max_concurrent_queries);
|
||||||
global_context->getProcessList().setMaxInsertQueriesAmount(new_server_settings.max_concurrent_insert_queries);
|
global_context->getProcessList().setMaxInsertQueriesAmount(new_server_settings.max_concurrent_insert_queries);
|
||||||
|
@ -537,6 +537,7 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden
|
|||||||
PullingAsyncPipelineExecutor executor(io.pipeline);
|
PullingAsyncPipelineExecutor executor(io.pipeline);
|
||||||
io.pipeline.setProgressCallback(context->getProgressCallback());
|
io.pipeline.setProgressCallback(context->getProgressCallback());
|
||||||
io.pipeline.setProcessListElement(context->getProcessListElement());
|
io.pipeline.setProcessListElement(context->getProcessListElement());
|
||||||
|
io.pipeline.setConcurrencyControl(context->getSettingsRef().use_concurrency_control);
|
||||||
|
|
||||||
Block block;
|
Block block;
|
||||||
|
|
||||||
|
@ -1701,6 +1701,9 @@ try
|
|||||||
QueryPipeline pipeline(std::move(pipe));
|
QueryPipeline pipeline(std::move(pipe));
|
||||||
PullingAsyncPipelineExecutor executor(pipeline);
|
PullingAsyncPipelineExecutor executor(pipeline);
|
||||||
|
|
||||||
|
/// Concurrency control in client is not required
|
||||||
|
pipeline.setConcurrencyControl(false);
|
||||||
|
|
||||||
if (need_render_progress)
|
if (need_render_progress)
|
||||||
{
|
{
|
||||||
pipeline.setProgressCallback([this](const Progress & progress){ onProgress(progress); });
|
pipeline.setProgressCallback([this](const Progress & progress){ onProgress(progress); });
|
||||||
|
@ -240,6 +240,7 @@ void LocalConnection::sendQuery(
|
|||||||
{
|
{
|
||||||
state->block = state->io.pipeline.getHeader();
|
state->block = state->io.pipeline.getHeader();
|
||||||
state->executor = std::make_unique<PullingAsyncPipelineExecutor>(state->io.pipeline);
|
state->executor = std::make_unique<PullingAsyncPipelineExecutor>(state->io.pipeline);
|
||||||
|
state->io.pipeline.setConcurrencyControl(false);
|
||||||
}
|
}
|
||||||
else if (state->io.pipeline.completed())
|
else if (state->io.pipeline.completed())
|
||||||
{
|
{
|
||||||
|
@ -1,7 +1,23 @@
|
|||||||
|
#include <Common/ISlotControl.h>
|
||||||
#include <Common/ConcurrencyControl.h>
|
#include <Common/ConcurrencyControl.h>
|
||||||
#include <Common/Exception.h>
|
#include <Common/Exception.h>
|
||||||
|
#include <Common/ProfileEvents.h>
|
||||||
|
|
||||||
|
|
||||||
|
namespace ProfileEvents
|
||||||
|
{
|
||||||
|
extern const Event ConcurrencyControlGrantedHard;
|
||||||
|
extern const Event ConcurrencyControlGrantDelayed;
|
||||||
|
extern const Event ConcurrencyControlAcquiredTotal;
|
||||||
|
extern const Event ConcurrencyControlAllocationDelayed;
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace CurrentMetrics
|
||||||
|
{
|
||||||
|
extern const Metric ConcurrencyControlAcquired;
|
||||||
|
extern const Metric ConcurrencyControlSoftLimit;
|
||||||
|
}
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -17,6 +33,7 @@ ConcurrencyControl::Slot::~Slot()
|
|||||||
|
|
||||||
ConcurrencyControl::Slot::Slot(SlotAllocationPtr && allocation_)
|
ConcurrencyControl::Slot::Slot(SlotAllocationPtr && allocation_)
|
||||||
: allocation(std::move(allocation_))
|
: allocation(std::move(allocation_))
|
||||||
|
, acquired_slot_increment(CurrentMetrics::ConcurrencyControlAcquired)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -34,6 +51,7 @@ ConcurrencyControl::Allocation::~Allocation()
|
|||||||
{
|
{
|
||||||
if (granted.compare_exchange_strong(value, value - 1))
|
if (granted.compare_exchange_strong(value, value - 1))
|
||||||
{
|
{
|
||||||
|
ProfileEvents::increment(ProfileEvents::ConcurrencyControlAcquiredTotal, 1);
|
||||||
std::unique_lock lock{mutex};
|
std::unique_lock lock{mutex};
|
||||||
return AcquiredSlotPtr(new Slot(shared_from_this())); // can't use std::make_shared due to private ctor
|
return AcquiredSlotPtr(new Slot(shared_from_this())); // can't use std::make_shared due to private ctor
|
||||||
}
|
}
|
||||||
@ -84,6 +102,7 @@ void ConcurrencyControl::Allocation::release()
|
|||||||
|
|
||||||
ConcurrencyControl::ConcurrencyControl()
|
ConcurrencyControl::ConcurrencyControl()
|
||||||
: cur_waiter(waiters.end())
|
: cur_waiter(waiters.end())
|
||||||
|
, max_concurrency_metric(CurrentMetrics::ConcurrencyControlSoftLimit, 0)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -103,11 +122,16 @@ ConcurrencyControl::~ConcurrencyControl()
|
|||||||
// Acquire as many slots as we can, but not lower than `min`
|
// Acquire as many slots as we can, but not lower than `min`
|
||||||
SlotCount granted = std::max(min, std::min(max, available(lock)));
|
SlotCount granted = std::max(min, std::min(max, available(lock)));
|
||||||
cur_concurrency += granted;
|
cur_concurrency += granted;
|
||||||
|
ProfileEvents::increment(ProfileEvents::ConcurrencyControlGrantedHard, min);
|
||||||
|
|
||||||
// Create allocation and start waiting if more slots are required
|
// Create allocation and start waiting if more slots are required
|
||||||
if (granted < max)
|
if (granted < max)
|
||||||
|
{
|
||||||
|
ProfileEvents::increment(ProfileEvents::ConcurrencyControlGrantDelayed, max - granted);
|
||||||
|
ProfileEvents::increment(ProfileEvents::ConcurrencyControlAllocationDelayed);
|
||||||
return SlotAllocationPtr(new Allocation(*this, max, granted,
|
return SlotAllocationPtr(new Allocation(*this, max, granted,
|
||||||
waiters.insert(cur_waiter, nullptr /* pointer is set by Allocation ctor */)));
|
waiters.insert(cur_waiter, nullptr /* pointer is set by Allocation ctor */)));
|
||||||
|
}
|
||||||
else
|
else
|
||||||
return SlotAllocationPtr(new Allocation(*this, max, granted));
|
return SlotAllocationPtr(new Allocation(*this, max, granted));
|
||||||
}
|
}
|
||||||
@ -116,6 +140,7 @@ void ConcurrencyControl::setMaxConcurrency(SlotCount value)
|
|||||||
{
|
{
|
||||||
std::unique_lock lock{mutex};
|
std::unique_lock lock{mutex};
|
||||||
max_concurrency = std::max<SlotCount>(1, value); // never allow max_concurrency to be zero
|
max_concurrency = std::max<SlotCount>(1, value); // never allow max_concurrency to be zero
|
||||||
|
max_concurrency_metric.changeTo(max_concurrency == UnlimitedSlots ? 0 : max_concurrency);
|
||||||
schedule(lock);
|
schedule(lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -8,6 +8,7 @@
|
|||||||
#include <base/types.h>
|
#include <base/types.h>
|
||||||
#include <boost/core/noncopyable.hpp>
|
#include <boost/core/noncopyable.hpp>
|
||||||
|
|
||||||
|
#include <Common/CurrentMetrics.h>
|
||||||
#include <Common/ISlotControl.h>
|
#include <Common/ISlotControl.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -53,6 +54,7 @@ public:
|
|||||||
explicit Slot(SlotAllocationPtr && allocation_);
|
explicit Slot(SlotAllocationPtr && allocation_);
|
||||||
|
|
||||||
SlotAllocationPtr allocation;
|
SlotAllocationPtr allocation;
|
||||||
|
CurrentMetrics::Increment acquired_slot_increment;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Manages group of slots for a single query, see ConcurrencyControl::allocate(min, max)
|
// Manages group of slots for a single query, see ConcurrencyControl::allocate(min, max)
|
||||||
@ -131,6 +133,7 @@ private:
|
|||||||
Waiters::iterator cur_waiter; // round-robin pointer
|
Waiters::iterator cur_waiter; // round-robin pointer
|
||||||
SlotCount max_concurrency = UnlimitedSlots;
|
SlotCount max_concurrency = UnlimitedSlots;
|
||||||
SlotCount cur_concurrency = 0;
|
SlotCount cur_concurrency = 0;
|
||||||
|
CurrentMetrics::Increment max_concurrency_metric;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -313,6 +313,9 @@
|
|||||||
M(FilteringMarksWithPrimaryKey, "Number of threads currently doing filtering of mark ranges by the primary key") \
|
M(FilteringMarksWithPrimaryKey, "Number of threads currently doing filtering of mark ranges by the primary key") \
|
||||||
M(FilteringMarksWithSecondaryKeys, "Number of threads currently doing filtering of mark ranges by secondary keys") \
|
M(FilteringMarksWithSecondaryKeys, "Number of threads currently doing filtering of mark ranges by secondary keys") \
|
||||||
\
|
\
|
||||||
|
M(ConcurrencyControlAcquired, "Total number of acquired CPU slots") \
|
||||||
|
M(ConcurrencyControlSoftLimit, "Value of soft limit on number of CPU slots") \
|
||||||
|
\
|
||||||
M(DiskS3NoSuchKeyErrors, "The number of `NoSuchKey` errors that occur when reading data from S3 cloud storage through ClickHouse disks.") \
|
M(DiskS3NoSuchKeyErrors, "The number of `NoSuchKey` errors that occur when reading data from S3 cloud storage through ClickHouse disks.") \
|
||||||
|
|
||||||
#ifdef APPLY_FOR_EXTERNAL_METRICS
|
#ifdef APPLY_FOR_EXTERNAL_METRICS
|
||||||
|
@ -73,4 +73,44 @@ public:
|
|||||||
[[nodiscard]] virtual SlotAllocationPtr allocate(SlotCount min, SlotCount max) = 0;
|
[[nodiscard]] virtual SlotAllocationPtr allocate(SlotCount min, SlotCount max) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Allocation that grants all the slots immediately on creation
|
||||||
|
class GrantedAllocation : public ISlotAllocation
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit GrantedAllocation(SlotCount granted_)
|
||||||
|
: granted(granted_)
|
||||||
|
, allocated(granted_)
|
||||||
|
{}
|
||||||
|
|
||||||
|
[[nodiscard]] AcquiredSlotPtr tryAcquire() override
|
||||||
|
{
|
||||||
|
SlotCount value = granted.load();
|
||||||
|
while (value)
|
||||||
|
{
|
||||||
|
if (granted.compare_exchange_strong(value, value - 1))
|
||||||
|
return std::make_shared<IAcquiredSlot>();
|
||||||
|
}
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
SlotCount grantedCount() const override
|
||||||
|
{
|
||||||
|
return granted.load();
|
||||||
|
}
|
||||||
|
|
||||||
|
SlotCount allocatedCount() const override
|
||||||
|
{
|
||||||
|
return allocated;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::atomic<SlotCount> granted; // allocated, but not yet acquired
|
||||||
|
const SlotCount allocated;
|
||||||
|
};
|
||||||
|
|
||||||
|
[[nodiscard]] inline SlotAllocationPtr grantSlots(SlotCount count)
|
||||||
|
{
|
||||||
|
return SlotAllocationPtr(new GrantedAllocation(count));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -826,6 +826,11 @@ The server successfully detected this situation and will download merged part fr
|
|||||||
M(ReadWriteBufferFromHTTPRequestsSent, "Number of HTTP requests sent by ReadWriteBufferFromHTTP") \
|
M(ReadWriteBufferFromHTTPRequestsSent, "Number of HTTP requests sent by ReadWriteBufferFromHTTP") \
|
||||||
M(ReadWriteBufferFromHTTPBytes, "Total size of payload bytes received and sent by ReadWriteBufferFromHTTP. Doesn't include HTTP headers.") \
|
M(ReadWriteBufferFromHTTPBytes, "Total size of payload bytes received and sent by ReadWriteBufferFromHTTP. Doesn't include HTTP headers.") \
|
||||||
\
|
\
|
||||||
|
M(ConcurrencyControlGrantedHard, "Number of CPU slot granted according to guarantee of 1 thread per query and for queries with setting 'use_concurrency_control' = 0") \
|
||||||
|
M(ConcurrencyControlGrantDelayed, "Number of CPU slot not granted initially and required to wait for a free CPU slot") \
|
||||||
|
M(ConcurrencyControlAcquiredTotal, "Total number of CPU slot acquired") \
|
||||||
|
M(ConcurrencyControlAllocationDelayed, "Total number of CPU slot allocations (queries) that were required to wait for slots to upscale") \
|
||||||
|
\
|
||||||
M(GWPAsanAllocateSuccess, "Number of successful allocations done by GWPAsan") \
|
M(GWPAsanAllocateSuccess, "Number of successful allocations done by GWPAsan") \
|
||||||
M(GWPAsanAllocateFailed, "Number of failed allocations done by GWPAsan (i.e. filled pool)") \
|
M(GWPAsanAllocateFailed, "Number of failed allocations done by GWPAsan (i.e. filled pool)") \
|
||||||
M(GWPAsanFree, "Number of free operations done by GWPAsan") \
|
M(GWPAsanFree, "Number of free operations done by GWPAsan") \
|
||||||
|
@ -151,6 +151,15 @@ Names NamesAndTypesList::getNames() const
|
|||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
NameSet NamesAndTypesList::getNameSet() const
|
||||||
|
{
|
||||||
|
NameSet res;
|
||||||
|
res.reserve(size());
|
||||||
|
for (const NameAndTypePair & column : *this)
|
||||||
|
res.insert(column.name);
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
DataTypes NamesAndTypesList::getTypes() const
|
DataTypes NamesAndTypesList::getTypes() const
|
||||||
{
|
{
|
||||||
DataTypes res;
|
DataTypes res;
|
||||||
|
@ -100,6 +100,7 @@ public:
|
|||||||
void getDifference(const NamesAndTypesList & rhs, NamesAndTypesList & deleted, NamesAndTypesList & added) const;
|
void getDifference(const NamesAndTypesList & rhs, NamesAndTypesList & deleted, NamesAndTypesList & added) const;
|
||||||
|
|
||||||
Names getNames() const;
|
Names getNames() const;
|
||||||
|
NameSet getNameSet() const;
|
||||||
DataTypes getTypes() const;
|
DataTypes getTypes() const;
|
||||||
|
|
||||||
/// Remove columns which names are not in the `names`.
|
/// Remove columns which names are not in the `names`.
|
||||||
|
@ -666,6 +666,7 @@ void CacheDictionary<dictionary_key_type>::update(CacheDictionaryUpdateUnitPtr<d
|
|||||||
Columns fetched_columns_during_update = fetch_request.makeAttributesResultColumnsNonMutable();
|
Columns fetched_columns_during_update = fetch_request.makeAttributesResultColumnsNonMutable();
|
||||||
|
|
||||||
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
||||||
|
pipeline.setConcurrencyControl(false);
|
||||||
Block block;
|
Block block;
|
||||||
while (executor.pull(block))
|
while (executor.pull(block))
|
||||||
{
|
{
|
||||||
|
@ -334,6 +334,7 @@ public:
|
|||||||
, pipeline(std::move(pipeline_))
|
, pipeline(std::move(pipeline_))
|
||||||
, executor(pipeline)
|
, executor(pipeline)
|
||||||
{
|
{
|
||||||
|
pipeline.setConcurrencyControl(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string getName() const override
|
std::string getName() const override
|
||||||
@ -378,7 +379,6 @@ Pipe DirectDictionary<dictionary_key_type>::getSourcePipe(
|
|||||||
ids.emplace_back(key);
|
ids.emplace_back(key);
|
||||||
|
|
||||||
auto pipeline = source_ptr->loadIds(ids);
|
auto pipeline = source_ptr->loadIds(ids);
|
||||||
|
|
||||||
if (use_async_executor)
|
if (use_async_executor)
|
||||||
pipe = Pipe(std::make_shared<SourceFromQueryPipeline<PullingAsyncPipelineExecutor>>(std::move(pipeline)));
|
pipe = Pipe(std::make_shared<SourceFromQueryPipeline<PullingAsyncPipelineExecutor>>(std::move(pipeline)));
|
||||||
else
|
else
|
||||||
|
@ -454,6 +454,7 @@ void FlatDictionary::updateData()
|
|||||||
{
|
{
|
||||||
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
|
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
|
||||||
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
||||||
|
pipeline.setConcurrencyControl(false);
|
||||||
update_field_loaded_block.reset();
|
update_field_loaded_block.reset();
|
||||||
Block block;
|
Block block;
|
||||||
|
|
||||||
@ -495,6 +496,7 @@ void FlatDictionary::loadData()
|
|||||||
{
|
{
|
||||||
QueryPipeline pipeline(source_ptr->loadAll());
|
QueryPipeline pipeline(source_ptr->loadAll());
|
||||||
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
||||||
|
pipeline.setConcurrencyControl(false);
|
||||||
|
|
||||||
Block block;
|
Block block;
|
||||||
while (executor.pull(block))
|
while (executor.pull(block))
|
||||||
|
@ -475,6 +475,7 @@ void HashedArrayDictionary<dictionary_key_type, sharded>::updateData()
|
|||||||
{
|
{
|
||||||
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
|
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
|
||||||
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
||||||
|
pipeline.setConcurrencyControl(false);
|
||||||
update_field_loaded_block.reset();
|
update_field_loaded_block.reset();
|
||||||
Block block;
|
Block block;
|
||||||
|
|
||||||
@ -973,6 +974,7 @@ void HashedArrayDictionary<dictionary_key_type, sharded>::loadData()
|
|||||||
|
|
||||||
QueryPipeline pipeline(source_ptr->loadAll());
|
QueryPipeline pipeline(source_ptr->loadAll());
|
||||||
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
||||||
|
pipeline.setConcurrencyControl(false);
|
||||||
|
|
||||||
UInt64 pull_time_microseconds = 0;
|
UInt64 pull_time_microseconds = 0;
|
||||||
UInt64 process_time_microseconds = 0;
|
UInt64 process_time_microseconds = 0;
|
||||||
|
@ -884,6 +884,7 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::updateData()
|
|||||||
{
|
{
|
||||||
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
|
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
|
||||||
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
||||||
|
pipeline.setConcurrencyControl(false);
|
||||||
update_field_loaded_block.reset();
|
update_field_loaded_block.reset();
|
||||||
Block block;
|
Block block;
|
||||||
|
|
||||||
@ -1163,6 +1164,7 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::loadData()
|
|||||||
QueryPipeline pipeline(source_ptr->loadAll());
|
QueryPipeline pipeline(source_ptr->loadAll());
|
||||||
|
|
||||||
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
||||||
|
pipeline.setConcurrencyControl(false);
|
||||||
Block block;
|
Block block;
|
||||||
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
|
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
|
||||||
|
|
||||||
|
@ -407,6 +407,7 @@ void IPAddressDictionary::loadData()
|
|||||||
bool has_ipv6 = false;
|
bool has_ipv6 = false;
|
||||||
|
|
||||||
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
||||||
|
pipeline.setConcurrencyControl(false);
|
||||||
Block block;
|
Block block;
|
||||||
while (executor.pull(block))
|
while (executor.pull(block))
|
||||||
{
|
{
|
||||||
|
@ -291,6 +291,7 @@ void IPolygonDictionary::loadData()
|
|||||||
QueryPipeline pipeline(source_ptr->loadAll());
|
QueryPipeline pipeline(source_ptr->loadAll());
|
||||||
|
|
||||||
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
||||||
|
pipeline.setConcurrencyControl(false);
|
||||||
Block block;
|
Block block;
|
||||||
while (executor.pull(block))
|
while (executor.pull(block))
|
||||||
blockToAttributes(block);
|
blockToAttributes(block);
|
||||||
|
@ -541,6 +541,7 @@ void RangeHashedDictionary<dictionary_key_type>::loadData()
|
|||||||
{
|
{
|
||||||
QueryPipeline pipeline(source_ptr->loadAll());
|
QueryPipeline pipeline(source_ptr->loadAll());
|
||||||
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
||||||
|
pipeline.setConcurrencyControl(false);
|
||||||
Block block;
|
Block block;
|
||||||
|
|
||||||
while (executor.pull(block))
|
while (executor.pull(block))
|
||||||
@ -692,6 +693,7 @@ void RangeHashedDictionary<dictionary_key_type>::updateData()
|
|||||||
{
|
{
|
||||||
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
|
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
|
||||||
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
||||||
|
pipeline.setConcurrencyControl(false);
|
||||||
update_field_loaded_block.reset();
|
update_field_loaded_block.reset();
|
||||||
Block block;
|
Block block;
|
||||||
|
|
||||||
|
@ -314,6 +314,7 @@ void RegExpTreeDictionary::loadData()
|
|||||||
{
|
{
|
||||||
QueryPipeline pipeline(source_ptr->loadAll());
|
QueryPipeline pipeline(source_ptr->loadAll());
|
||||||
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
|
||||||
|
pipeline.setConcurrencyControl(false);
|
||||||
|
|
||||||
Block block;
|
Block block;
|
||||||
while (executor.pull(block))
|
while (executor.pull(block))
|
||||||
|
@ -193,6 +193,7 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
|
|||||||
|
|
||||||
PullingAsyncPipelineExecutor executor(io.pipeline);
|
PullingAsyncPipelineExecutor executor(io.pipeline);
|
||||||
io.pipeline.setProgressCallback(data.getContext()->getProgressCallback());
|
io.pipeline.setProgressCallback(data.getContext()->getProgressCallback());
|
||||||
|
io.pipeline.setConcurrencyControl(data.getContext()->getSettingsRef().use_concurrency_control);
|
||||||
while (block.rows() == 0 && executor.pull(block))
|
while (block.rows() == 0 && executor.pull(block))
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
@ -77,7 +77,6 @@
|
|||||||
#include <IO/Operators.h>
|
#include <IO/Operators.h>
|
||||||
#include <IO/WriteBufferFromString.h>
|
#include <IO/WriteBufferFromString.h>
|
||||||
|
|
||||||
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
|
|
||||||
#include <Processors/QueryPlan/QueryPlan.h>
|
#include <Processors/QueryPlan/QueryPlan.h>
|
||||||
#include <Parsers/formatAST.h>
|
#include <Parsers/formatAST.h>
|
||||||
#include <Parsers/QueryParameterVisitor.h>
|
#include <Parsers/QueryParameterVisitor.h>
|
||||||
|
@ -338,11 +338,8 @@ size_t HashJoin::getTotalRowCount() const
|
|||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t HashJoin::getTotalByteCount() const
|
void HashJoin::doDebugAsserts() const
|
||||||
{
|
{
|
||||||
if (!data)
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
size_t debug_blocks_allocated_size = 0;
|
size_t debug_blocks_allocated_size = 0;
|
||||||
for (const auto & block : data->blocks)
|
for (const auto & block : data->blocks)
|
||||||
@ -360,6 +357,14 @@ size_t HashJoin::getTotalByteCount() const
|
|||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "data->blocks_nullmaps_allocated_size != debug_blocks_nullmaps_allocated_size ({} != {})",
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "data->blocks_nullmaps_allocated_size != debug_blocks_nullmaps_allocated_size ({} != {})",
|
||||||
data->blocks_nullmaps_allocated_size, debug_blocks_nullmaps_allocated_size);
|
data->blocks_nullmaps_allocated_size, debug_blocks_nullmaps_allocated_size);
|
||||||
#endif
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t HashJoin::getTotalByteCount() const
|
||||||
|
{
|
||||||
|
if (!data)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
doDebugAsserts();
|
||||||
|
|
||||||
size_t res = 0;
|
size_t res = 0;
|
||||||
|
|
||||||
@ -544,9 +549,11 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
|
|||||||
have_compressed = true;
|
have_compressed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
doDebugAsserts();
|
||||||
data->blocks_allocated_size += block_to_save.allocatedBytes();
|
data->blocks_allocated_size += block_to_save.allocatedBytes();
|
||||||
data->blocks.emplace_back(std::move(block_to_save));
|
data->blocks.emplace_back(std::move(block_to_save));
|
||||||
Block * stored_block = &data->blocks.back();
|
Block * stored_block = &data->blocks.back();
|
||||||
|
doDebugAsserts();
|
||||||
|
|
||||||
if (rows)
|
if (rows)
|
||||||
data->empty = false;
|
data->empty = false;
|
||||||
@ -634,9 +641,11 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
|
|||||||
|
|
||||||
if (!flag_per_row && !is_inserted)
|
if (!flag_per_row && !is_inserted)
|
||||||
{
|
{
|
||||||
|
doDebugAsserts();
|
||||||
LOG_TRACE(log, "Skipping inserting block with {} rows", rows);
|
LOG_TRACE(log, "Skipping inserting block with {} rows", rows);
|
||||||
data->blocks_allocated_size -= stored_block->allocatedBytes();
|
data->blocks_allocated_size -= stored_block->allocatedBytes();
|
||||||
data->blocks.pop_back();
|
data->blocks.pop_back();
|
||||||
|
doDebugAsserts();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!check_limits)
|
if (!check_limits)
|
||||||
@ -683,6 +692,8 @@ void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_
|
|||||||
|
|
||||||
for (auto & stored_block : data->blocks)
|
for (auto & stored_block : data->blocks)
|
||||||
{
|
{
|
||||||
|
doDebugAsserts();
|
||||||
|
|
||||||
size_t old_size = stored_block.allocatedBytes();
|
size_t old_size = stored_block.allocatedBytes();
|
||||||
stored_block = stored_block.shrinkToFit();
|
stored_block = stored_block.shrinkToFit();
|
||||||
size_t new_size = stored_block.allocatedBytes();
|
size_t new_size = stored_block.allocatedBytes();
|
||||||
@ -700,6 +711,8 @@ void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_
|
|||||||
else
|
else
|
||||||
/// Sometimes after clone resized block can be bigger than original
|
/// Sometimes after clone resized block can be bigger than original
|
||||||
data->blocks_allocated_size += new_size - old_size;
|
data->blocks_allocated_size += new_size - old_size;
|
||||||
|
|
||||||
|
doDebugAsserts();
|
||||||
}
|
}
|
||||||
|
|
||||||
auto new_total_bytes_in_join = getTotalByteCount();
|
auto new_total_bytes_in_join = getTotalByteCount();
|
||||||
@ -1416,7 +1429,13 @@ void HashJoin::tryRerangeRightTableDataImpl(Map & map [[maybe_unused]])
|
|||||||
};
|
};
|
||||||
BlocksList sorted_blocks;
|
BlocksList sorted_blocks;
|
||||||
visit_rows_map(sorted_blocks, map);
|
visit_rows_map(sorted_blocks, map);
|
||||||
|
doDebugAsserts();
|
||||||
data->blocks.swap(sorted_blocks);
|
data->blocks.swap(sorted_blocks);
|
||||||
|
size_t new_blocks_allocated_size = 0;
|
||||||
|
for (const auto & block : data->blocks)
|
||||||
|
new_blocks_allocated_size += block.allocatedBytes();
|
||||||
|
data->blocks_allocated_size = new_blocks_allocated_size;
|
||||||
|
doDebugAsserts();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -470,6 +470,7 @@ private:
|
|||||||
void tryRerangeRightTableData() override;
|
void tryRerangeRightTableData() override;
|
||||||
template <JoinKind KIND, typename Map, JoinStrictness STRICTNESS>
|
template <JoinKind KIND, typename Map, JoinStrictness STRICTNESS>
|
||||||
void tryRerangeRightTableDataImpl(Map & map);
|
void tryRerangeRightTableDataImpl(Map & map);
|
||||||
|
void doDebugAsserts() const;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -26,6 +26,8 @@
|
|||||||
#include <Analyzer/TableFunctionNode.h>
|
#include <Analyzer/TableFunctionNode.h>
|
||||||
#include <Analyzer/Utils.h>
|
#include <Analyzer/Utils.h>
|
||||||
|
|
||||||
|
#include <Core/Settings.h>
|
||||||
|
|
||||||
#include <Interpreters/Context.h>
|
#include <Interpreters/Context.h>
|
||||||
#include <Interpreters/QueryLog.h>
|
#include <Interpreters/QueryLog.h>
|
||||||
|
|
||||||
@ -249,6 +251,8 @@ QueryPipelineBuilder InterpreterSelectQueryAnalyzer::buildQueryPipeline()
|
|||||||
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(context);
|
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(context);
|
||||||
auto build_pipeline_settings = BuildQueryPipelineSettings::fromContext(context);
|
auto build_pipeline_settings = BuildQueryPipelineSettings::fromContext(context);
|
||||||
|
|
||||||
|
query_plan.setConcurrencyControl(context->getSettingsRef().use_concurrency_control);
|
||||||
|
|
||||||
return std::move(*query_plan.buildQueryPipeline(optimization_settings, build_pipeline_settings));
|
return std::move(*query_plan.buildQueryPipeline(optimization_settings, build_pipeline_settings));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -208,6 +208,7 @@ bool isStorageTouchedByMutations(
|
|||||||
}
|
}
|
||||||
|
|
||||||
PullingAsyncPipelineExecutor executor(io.pipeline);
|
PullingAsyncPipelineExecutor executor(io.pipeline);
|
||||||
|
io.pipeline.setConcurrencyControl(context->getSettingsRef().use_concurrency_control);
|
||||||
|
|
||||||
Block block;
|
Block block;
|
||||||
while (block.rows() == 0 && executor.pull(block));
|
while (block.rows() == 0 && executor.pull(block));
|
||||||
@ -1286,6 +1287,10 @@ void MutationsInterpreter::Source::read(
|
|||||||
|
|
||||||
void MutationsInterpreter::initQueryPlan(Stage & first_stage, QueryPlan & plan)
|
void MutationsInterpreter::initQueryPlan(Stage & first_stage, QueryPlan & plan)
|
||||||
{
|
{
|
||||||
|
// Mutations are not using concurrency control now. Queries, merges and mutations running together could lead to CPU overcommit.
|
||||||
|
// TODO(serxa): Enable concurrency control for mutation queries and mutations. This should be done after CPU scheduler introduction.
|
||||||
|
plan.setConcurrencyControl(false);
|
||||||
|
|
||||||
source.read(first_stage, plan, metadata_snapshot, context, settings.apply_deleted_mask, settings.can_execute);
|
source.read(first_stage, plan, metadata_snapshot, context, settings.apply_deleted_mask, settings.can_execute);
|
||||||
addCreatingSetsStep(plan, first_stage.analyzer->getPreparedSets(), context);
|
addCreatingSetsStep(plan, first_stage.analyzer->getPreparedSets(), context);
|
||||||
}
|
}
|
||||||
|
@ -722,7 +722,14 @@ void ASTFunction::formatImplWithoutAlias(const FormatSettings & settings, Format
|
|||||||
assert_cast<const ASTFunction *>(argument.get())->arguments->children[0]->formatImpl(settings, state, nested_dont_need_parens);
|
assert_cast<const ASTFunction *>(argument.get())->arguments->children[0]->formatImpl(settings, state, nested_dont_need_parens);
|
||||||
settings.ostr << (settings.hilite ? hilite_operator : "") << " = " << (settings.hilite ? hilite_none : "");
|
settings.ostr << (settings.hilite ? hilite_operator : "") << " = " << (settings.hilite ? hilite_none : "");
|
||||||
}
|
}
|
||||||
|
if (!secret_arguments.replacement.empty())
|
||||||
|
{
|
||||||
|
settings.ostr << "'" << secret_arguments.replacement << "'";
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
settings.ostr << "'[HIDDEN]'";
|
settings.ostr << "'[HIDDEN]'";
|
||||||
|
}
|
||||||
if (size <= secret_arguments.start + secret_arguments.count && !secret_arguments.are_named)
|
if (size <= secret_arguments.start + secret_arguments.count && !secret_arguments.are_named)
|
||||||
break; /// All other arguments should also be hidden.
|
break; /// All other arguments should also be hidden.
|
||||||
continue;
|
continue;
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <Common/KnownObjectNames.h>
|
#include <Common/KnownObjectNames.h>
|
||||||
|
#include <Common/re2.h>
|
||||||
#include <Core/QualifiedTableName.h>
|
#include <Core/QualifiedTableName.h>
|
||||||
#include <base/defines.h>
|
#include <base/defines.h>
|
||||||
#include <boost/algorithm/string/predicate.hpp>
|
#include <boost/algorithm/string/predicate.hpp>
|
||||||
@ -49,6 +50,11 @@ public:
|
|||||||
bool are_named = false; /// Arguments like `password = 'password'` are considered as named arguments.
|
bool are_named = false; /// Arguments like `password = 'password'` are considered as named arguments.
|
||||||
/// E.g. "headers" in `url('..', headers('foo' = '[HIDDEN]'))`
|
/// E.g. "headers" in `url('..', headers('foo' = '[HIDDEN]'))`
|
||||||
std::vector<std::string> nested_maps;
|
std::vector<std::string> nested_maps;
|
||||||
|
/// Full replacement of an argument. Only supported when count is 1, otherwise all arguments will be replaced with this string.
|
||||||
|
/// It's needed in cases when we don't want to hide the entire parameter, but some part of it, e.g. "connection_string" in
|
||||||
|
/// `azureBlobStorage('DefaultEndpointsProtocol=https;AccountKey=secretkey;...', ...)` should be replaced with
|
||||||
|
/// `azureBlobStorage('DefaultEndpointsProtocol=https;AccountKey=[HIDDEN];...', ...)`.
|
||||||
|
std::string replacement;
|
||||||
|
|
||||||
bool hasSecrets() const
|
bool hasSecrets() const
|
||||||
{
|
{
|
||||||
@ -74,6 +80,7 @@ protected:
|
|||||||
result.are_named = argument_is_named;
|
result.are_named = argument_is_named;
|
||||||
}
|
}
|
||||||
chassert(index >= result.start); /// We always check arguments consecutively
|
chassert(index >= result.start); /// We always check arguments consecutively
|
||||||
|
chassert(result.replacement.empty()); /// We shouldn't use replacement with masking other arguments
|
||||||
result.count = index + 1 - result.start;
|
result.count = index + 1 - result.start;
|
||||||
if (!argument_is_named)
|
if (!argument_is_named)
|
||||||
result.are_named = false;
|
result.are_named = false;
|
||||||
@ -199,32 +206,39 @@ protected:
|
|||||||
|
|
||||||
void findAzureBlobStorageFunctionSecretArguments(bool is_cluster_function)
|
void findAzureBlobStorageFunctionSecretArguments(bool is_cluster_function)
|
||||||
{
|
{
|
||||||
/// azureBlobStorage('cluster_name', 'conn_string/storage_account_url', ...) has 'conn_string/storage_account_url' as its second argument.
|
/// azureBlobStorageCluster('cluster_name', 'conn_string/storage_account_url', ...) has 'conn_string/storage_account_url' as its second argument.
|
||||||
size_t url_arg_idx = is_cluster_function ? 1 : 0;
|
size_t url_arg_idx = is_cluster_function ? 1 : 0;
|
||||||
|
|
||||||
if (!is_cluster_function && isNamedCollectionName(0))
|
if (!is_cluster_function && isNamedCollectionName(0))
|
||||||
{
|
{
|
||||||
/// azureBlobStorage(named_collection, ..., account_key = 'account_key', ...)
|
/// azureBlobStorage(named_collection, ..., account_key = 'account_key', ...)
|
||||||
|
if (maskAzureConnectionString(-1, true, 1))
|
||||||
|
return;
|
||||||
findSecretNamedArgument("account_key", 1);
|
findSecretNamedArgument("account_key", 1);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
else if (is_cluster_function && isNamedCollectionName(1))
|
else if (is_cluster_function && isNamedCollectionName(1))
|
||||||
{
|
{
|
||||||
/// azureBlobStorageCluster(cluster, named_collection, ..., account_key = 'account_key', ...)
|
/// azureBlobStorageCluster(cluster, named_collection, ..., account_key = 'account_key', ...)
|
||||||
|
if (maskAzureConnectionString(-1, true, 2))
|
||||||
|
return;
|
||||||
findSecretNamedArgument("account_key", 2);
|
findSecretNamedArgument("account_key", 2);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// We should check other arguments first because we don't need to do any replacement in case storage_account_url is not used
|
if (maskAzureConnectionString(url_arg_idx))
|
||||||
/// azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure)
|
return;
|
||||||
/// azureBlobStorageCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])
|
|
||||||
|
/// We should check other arguments first because we don't need to do any replacement in case of
|
||||||
|
/// azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, format, [account_name, account_key, ...])
|
||||||
|
/// azureBlobStorageCluster(cluster, connection_string|storage_account_url, container_name, blobpath, format, [account_name, account_key, ...])
|
||||||
size_t count = function->arguments->size();
|
size_t count = function->arguments->size();
|
||||||
if ((url_arg_idx + 4 <= count) && (count <= url_arg_idx + 7))
|
if ((url_arg_idx + 4 <= count) && (count <= url_arg_idx + 7))
|
||||||
{
|
{
|
||||||
String second_arg;
|
String fourth_arg;
|
||||||
if (tryGetStringFromArgument(url_arg_idx + 3, &second_arg))
|
if (tryGetStringFromArgument(url_arg_idx + 3, &fourth_arg))
|
||||||
{
|
{
|
||||||
if (second_arg == "auto" || KnownFormatNames::instance().exists(second_arg))
|
if (fourth_arg == "auto" || KnownFormatNames::instance().exists(fourth_arg))
|
||||||
return; /// The argument after 'url' is a format: s3('url', 'format', ...)
|
return; /// The argument after 'url' is a format: s3('url', 'format', ...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -234,6 +248,40 @@ protected:
|
|||||||
markSecretArgument(url_arg_idx + 4);
|
markSecretArgument(url_arg_idx + 4);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool maskAzureConnectionString(ssize_t url_arg_idx, bool argument_is_named = false, size_t start = 0)
|
||||||
|
{
|
||||||
|
String url_arg;
|
||||||
|
if (argument_is_named)
|
||||||
|
{
|
||||||
|
url_arg_idx = findNamedArgument(&url_arg, "connection_string", start);
|
||||||
|
if (url_arg_idx == -1 || url_arg.empty())
|
||||||
|
url_arg_idx = findNamedArgument(&url_arg, "storage_account_url", start);
|
||||||
|
if (url_arg_idx == -1 || url_arg.empty())
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (!tryGetStringFromArgument(url_arg_idx, &url_arg))
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!url_arg.starts_with("http"))
|
||||||
|
{
|
||||||
|
static re2::RE2 account_key_pattern = "AccountKey=.*?(;|$)";
|
||||||
|
if (RE2::Replace(&url_arg, account_key_pattern, "AccountKey=[HIDDEN]\\1"))
|
||||||
|
{
|
||||||
|
chassert(result.count == 0); /// We shouldn't use replacement with masking other arguments
|
||||||
|
result.start = url_arg_idx;
|
||||||
|
result.are_named = argument_is_named;
|
||||||
|
result.count = 1;
|
||||||
|
result.replacement = url_arg;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
void findURLSecretArguments()
|
void findURLSecretArguments()
|
||||||
{
|
{
|
||||||
if (!isNamedCollectionName(0))
|
if (!isNamedCollectionName(0))
|
||||||
@ -513,8 +561,9 @@ protected:
|
|||||||
return function->arguments->at(arg_idx)->isIdentifier();
|
return function->arguments->at(arg_idx)->isIdentifier();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Looks for a secret argument with a specified name. This function looks for arguments in format `key=value` where the key is specified.
|
/// Looks for an argument with a specified name. This function looks for arguments in format `key=value` where the key is specified.
|
||||||
void findSecretNamedArgument(const std::string_view & key, size_t start = 0)
|
/// Returns -1 if no argument was found.
|
||||||
|
ssize_t findNamedArgument(String * res, const std::string_view & key, size_t start = 0)
|
||||||
{
|
{
|
||||||
for (size_t i = start; i < function->arguments->size(); ++i)
|
for (size_t i = start; i < function->arguments->size(); ++i)
|
||||||
{
|
{
|
||||||
@ -531,9 +580,23 @@ protected:
|
|||||||
continue;
|
continue;
|
||||||
|
|
||||||
if (found_key == key)
|
if (found_key == key)
|
||||||
markSecretArgument(i, /* argument_is_named= */ true);
|
{
|
||||||
|
tryGetStringFromArgument(*equals_func->arguments->at(1), res);
|
||||||
|
return i;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Looks for a secret argument with a specified name. This function looks for arguments in format `key=value` where the key is specified.
|
||||||
|
/// If the argument is found, it is marked as a secret.
|
||||||
|
void findSecretNamedArgument(const std::string_view & key, size_t start = 0)
|
||||||
|
{
|
||||||
|
ssize_t arg_idx = findNamedArgument(nullptr, key, start);
|
||||||
|
if (arg_idx >= 0)
|
||||||
|
markSecretArgument(arg_idx, /* argument_is_named= */ true);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
#include <IO/WriteBufferFromString.h>
|
#include <IO/WriteBufferFromString.h>
|
||||||
|
#include "Common/ISlotControl.h"
|
||||||
#include <Common/ThreadPool.h>
|
#include <Common/ThreadPool.h>
|
||||||
#include <Common/CurrentThread.h>
|
#include <Common/CurrentThread.h>
|
||||||
#include <Common/CurrentMetrics.h>
|
#include <Common/CurrentMetrics.h>
|
||||||
@ -342,12 +343,22 @@ void PipelineExecutor::initializeExecution(size_t num_threads, bool concurrency_
|
|||||||
is_execution_initialized = true;
|
is_execution_initialized = true;
|
||||||
tryUpdateExecutionStatus(ExecutionStatus::NotStarted, ExecutionStatus::Executing);
|
tryUpdateExecutionStatus(ExecutionStatus::NotStarted, ExecutionStatus::Executing);
|
||||||
|
|
||||||
size_t use_threads = num_threads;
|
if (concurrency_control)
|
||||||
|
{
|
||||||
/// Allocate CPU slots from concurrency control
|
/// Allocate CPU slots from concurrency control
|
||||||
size_t min_threads = concurrency_control ? 1uz : num_threads;
|
constexpr size_t min_threads = 1uz; // Number of threads that should be granted to every query no matter how many threads are already running in other queries
|
||||||
cpu_slots = ConcurrencyControl::instance().allocate(min_threads, num_threads);
|
cpu_slots = ConcurrencyControl::instance().allocate(min_threads, num_threads);
|
||||||
use_threads = cpu_slots->grantedCount();
|
#ifndef NDEBUG
|
||||||
|
LOG_TEST(log, "Allocate CPU slots. min: {}, max: {}, granted: {}", min_threads, num_threads, cpu_slots->grantedCount());
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/// If concurrency control is not used we should not even count threads as competing.
|
||||||
|
/// To avoid counting them in ConcurrencyControl, we create dummy slot allocation.
|
||||||
|
cpu_slots = grantSlots(num_threads);
|
||||||
|
}
|
||||||
|
size_t use_threads = cpu_slots->grantedCount();
|
||||||
|
|
||||||
Queue queue;
|
Queue queue;
|
||||||
graph->initializeExecution(queue);
|
graph->initializeExecution(queue);
|
||||||
|
@ -199,6 +199,7 @@ QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
|
|||||||
last_pipeline->setProgressCallback(build_pipeline_settings.progress_callback);
|
last_pipeline->setProgressCallback(build_pipeline_settings.progress_callback);
|
||||||
last_pipeline->setProcessListElement(build_pipeline_settings.process_list_element);
|
last_pipeline->setProcessListElement(build_pipeline_settings.process_list_element);
|
||||||
last_pipeline->addResources(std::move(resources));
|
last_pipeline->addResources(std::move(resources));
|
||||||
|
last_pipeline->setConcurrencyControl(getConcurrencyControl());
|
||||||
|
|
||||||
return last_pipeline;
|
return last_pipeline;
|
||||||
}
|
}
|
||||||
|
@ -300,7 +300,8 @@ QueryPipelineBuilder QueryPipelineBuilder::unitePipelines(
|
|||||||
/// It may happen if max_distributed_connections > max_threads
|
/// It may happen if max_distributed_connections > max_threads
|
||||||
max_threads_limit = std::max(pipeline.max_threads, max_threads_limit);
|
max_threads_limit = std::max(pipeline.max_threads, max_threads_limit);
|
||||||
|
|
||||||
concurrency_control = pipeline.getConcurrencyControl();
|
// Use concurrency control if at least one of pipelines is using it
|
||||||
|
concurrency_control = concurrency_control || pipeline.getConcurrencyControl();
|
||||||
}
|
}
|
||||||
|
|
||||||
QueryPipelineBuilder pipeline;
|
QueryPipelineBuilder pipeline;
|
||||||
@ -311,8 +312,8 @@ QueryPipelineBuilder QueryPipelineBuilder::unitePipelines(
|
|||||||
{
|
{
|
||||||
pipeline.setMaxThreads(max_threads);
|
pipeline.setMaxThreads(max_threads);
|
||||||
pipeline.limitMaxThreads(max_threads_limit);
|
pipeline.limitMaxThreads(max_threads_limit);
|
||||||
pipeline.setConcurrencyControl(concurrency_control);
|
|
||||||
}
|
}
|
||||||
|
pipeline.setConcurrencyControl(concurrency_control);
|
||||||
|
|
||||||
pipeline.setCollectedProcessors(nullptr);
|
pipeline.setCollectedProcessors(nullptr);
|
||||||
return pipeline;
|
return pipeline;
|
||||||
|
@ -1232,6 +1232,7 @@ namespace
|
|||||||
if (io.pipeline.pulling())
|
if (io.pipeline.pulling())
|
||||||
{
|
{
|
||||||
auto executor = std::make_shared<PullingAsyncPipelineExecutor>(io.pipeline);
|
auto executor = std::make_shared<PullingAsyncPipelineExecutor>(io.pipeline);
|
||||||
|
io.pipeline.setConcurrencyControl(query_context->getSettingsRef().use_concurrency_control);
|
||||||
auto check_for_cancel = [&]
|
auto check_for_cancel = [&]
|
||||||
{
|
{
|
||||||
if (isQueryCancelled())
|
if (isQueryCancelled())
|
||||||
|
@ -1086,6 +1086,7 @@ void TCPHandler::processOrdinaryQuery()
|
|||||||
|
|
||||||
{
|
{
|
||||||
PullingAsyncPipelineExecutor executor(pipeline);
|
PullingAsyncPipelineExecutor executor(pipeline);
|
||||||
|
pipeline.setConcurrencyControl(query_context->getSettingsRef().use_concurrency_control);
|
||||||
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
|
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
|
||||||
|
|
||||||
/// The following may happen:
|
/// The following may happen:
|
||||||
|
@ -431,6 +431,7 @@ void StorageLiveView::writeBlock(StorageLiveView & live_view, Block && block, Ch
|
|||||||
|
|
||||||
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
|
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
|
||||||
PullingAsyncPipelineExecutor executor(pipeline);
|
PullingAsyncPipelineExecutor executor(pipeline);
|
||||||
|
pipeline.setConcurrencyControl(local_context->getSettingsRef().use_concurrency_control);
|
||||||
Block this_block;
|
Block this_block;
|
||||||
|
|
||||||
while (executor.pull(this_block))
|
while (executor.pull(this_block))
|
||||||
@ -567,6 +568,7 @@ MergeableBlocksPtr StorageLiveView::collectMergeableBlocks(ContextPtr local_cont
|
|||||||
|
|
||||||
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
|
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
|
||||||
PullingAsyncPipelineExecutor executor(pipeline);
|
PullingAsyncPipelineExecutor executor(pipeline);
|
||||||
|
pipeline.setConcurrencyControl(local_context->getSettingsRef().use_concurrency_control);
|
||||||
Block this_block;
|
Block this_block;
|
||||||
|
|
||||||
while (executor.pull(this_block))
|
while (executor.pull(this_block))
|
||||||
@ -672,6 +674,7 @@ bool StorageLiveView::getNewBlocks(const std::lock_guard<std::mutex> & lock)
|
|||||||
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
|
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
|
||||||
|
|
||||||
PullingAsyncPipelineExecutor executor(pipeline);
|
PullingAsyncPipelineExecutor executor(pipeline);
|
||||||
|
pipeline.setConcurrencyControl(false);
|
||||||
Block block;
|
Block block;
|
||||||
while (executor.pull(block))
|
while (executor.pull(block))
|
||||||
{
|
{
|
||||||
|
@ -1727,6 +1727,10 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
|
|||||||
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
|
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
|
||||||
auto builder = merge_parts_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);
|
auto builder = merge_parts_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);
|
||||||
|
|
||||||
|
// Merges are not using concurrency control now. Queries and merges running together could lead to CPU overcommit.
|
||||||
|
// TODO(serxa): Enable concurrency control for merges. This should be done after CPU scheduler introduction.
|
||||||
|
builder->setConcurrencyControl(false);
|
||||||
|
|
||||||
global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
|
global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2057,6 +2057,7 @@ bool MutateTask::prepare()
|
|||||||
context_for_reading->setSetting("apply_mutations_on_fly", false);
|
context_for_reading->setSetting("apply_mutations_on_fly", false);
|
||||||
/// Skip using large sets in KeyCondition
|
/// Skip using large sets in KeyCondition
|
||||||
context_for_reading->setSetting("use_index_for_in_with_subqueries_max_values", 100000);
|
context_for_reading->setSetting("use_index_for_in_with_subqueries_max_values", 100000);
|
||||||
|
context_for_reading->setSetting("use_concurrency_control", false);
|
||||||
|
|
||||||
for (const auto & command : *ctx->commands)
|
for (const auto & command : *ctx->commands)
|
||||||
if (!canSkipMutationCommandForPart(ctx->source_part, command, context_for_reading))
|
if (!canSkipMutationCommandForPart(ctx->source_part, command, context_for_reading))
|
||||||
|
@ -223,7 +223,7 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
|
|||||||
{
|
{
|
||||||
account_name = fourth_arg;
|
account_name = fourth_arg;
|
||||||
account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key");
|
account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key");
|
||||||
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/account_name");
|
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/structure");
|
||||||
if (is_format_arg(sixth_arg))
|
if (is_format_arg(sixth_arg))
|
||||||
{
|
{
|
||||||
format = sixth_arg;
|
format = sixth_arg;
|
||||||
@ -257,10 +257,10 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
|
|||||||
}
|
}
|
||||||
else if (with_structure && engine_args.size() == 8)
|
else if (with_structure && engine_args.size() == 8)
|
||||||
{
|
{
|
||||||
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name");
|
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "account_name");
|
||||||
account_name = fourth_arg;
|
account_name = fourth_arg;
|
||||||
account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key");
|
account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key");
|
||||||
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/account_name");
|
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format");
|
||||||
if (!is_format_arg(sixth_arg))
|
if (!is_format_arg(sixth_arg))
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg);
|
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg);
|
||||||
format = sixth_arg;
|
format = sixth_arg;
|
||||||
|
@ -131,7 +131,7 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
ConfigurationPtr copy_configuration = configuration->clone();
|
ConfigurationPtr copy_configuration = configuration->clone();
|
||||||
auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
|
auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, local_context);
|
||||||
if (filter_dag)
|
if (filter_dag)
|
||||||
{
|
{
|
||||||
auto keys = configuration->getPaths();
|
auto keys = configuration->getPaths();
|
||||||
@ -142,7 +142,7 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
|
|||||||
|
|
||||||
VirtualColumnUtils::buildSetsForDAG(*filter_dag, local_context);
|
VirtualColumnUtils::buildSetsForDAG(*filter_dag, local_context);
|
||||||
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
|
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
|
||||||
VirtualColumnUtils::filterByPathOrFile(keys, paths, actions, virtual_columns);
|
VirtualColumnUtils::filterByPathOrFile(keys, paths, actions, virtual_columns, local_context);
|
||||||
copy_configuration->setPaths(keys);
|
copy_configuration->setPaths(keys);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -489,6 +489,7 @@ StorageObjectStorageSource::GlobIterator::GlobIterator(
|
|||||||
, virtual_columns(virtual_columns_)
|
, virtual_columns(virtual_columns_)
|
||||||
, throw_on_zero_files_match(throw_on_zero_files_match_)
|
, throw_on_zero_files_match(throw_on_zero_files_match_)
|
||||||
, read_keys(read_keys_)
|
, read_keys(read_keys_)
|
||||||
|
, local_context(context_)
|
||||||
, file_progress_callback(file_progress_callback_)
|
, file_progress_callback(file_progress_callback_)
|
||||||
{
|
{
|
||||||
if (configuration->isNamespaceWithGlobs())
|
if (configuration->isNamespaceWithGlobs())
|
||||||
@ -510,7 +511,7 @@ StorageObjectStorageSource::GlobIterator::GlobIterator(
|
|||||||
}
|
}
|
||||||
|
|
||||||
recursive = key_with_globs == "/**";
|
recursive = key_with_globs == "/**";
|
||||||
if (auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns))
|
if (auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, local_context))
|
||||||
{
|
{
|
||||||
VirtualColumnUtils::buildSetsForDAG(*filter_dag, getContext());
|
VirtualColumnUtils::buildSetsForDAG(*filter_dag, getContext());
|
||||||
filter_expr = std::make_shared<ExpressionActions>(std::move(*filter_dag));
|
filter_expr = std::make_shared<ExpressionActions>(std::move(*filter_dag));
|
||||||
@ -585,7 +586,7 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::GlobIterator::ne
|
|||||||
for (const auto & object_info : new_batch)
|
for (const auto & object_info : new_batch)
|
||||||
paths.push_back(getUniqueStoragePathIdentifier(*configuration, *object_info, false));
|
paths.push_back(getUniqueStoragePathIdentifier(*configuration, *object_info, false));
|
||||||
|
|
||||||
VirtualColumnUtils::filterByPathOrFile(new_batch, paths, filter_expr, virtual_columns);
|
VirtualColumnUtils::filterByPathOrFile(new_batch, paths, filter_expr, virtual_columns, local_context);
|
||||||
|
|
||||||
LOG_TEST(logger, "Filtered files: {} -> {}", paths.size(), new_batch.size());
|
LOG_TEST(logger, "Filtered files: {} -> {}", paths.size(), new_batch.size());
|
||||||
}
|
}
|
||||||
|
@ -220,6 +220,7 @@ private:
|
|||||||
bool is_finished = false;
|
bool is_finished = false;
|
||||||
bool first_iteration = true;
|
bool first_iteration = true;
|
||||||
std::mutex next_mutex;
|
std::mutex next_mutex;
|
||||||
|
const ContextPtr local_context;
|
||||||
|
|
||||||
std::function<void(FileProgress)> file_progress_callback;
|
std::function<void(FileProgress)> file_progress_callback;
|
||||||
};
|
};
|
||||||
|
@ -1141,13 +1141,13 @@ StorageFileSource::FilesIterator::FilesIterator(
|
|||||||
{
|
{
|
||||||
std::optional<ActionsDAG> filter_dag;
|
std::optional<ActionsDAG> filter_dag;
|
||||||
if (!distributed_processing && !archive_info && !files.empty())
|
if (!distributed_processing && !archive_info && !files.empty())
|
||||||
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
|
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, context_);
|
||||||
|
|
||||||
if (filter_dag)
|
if (filter_dag)
|
||||||
{
|
{
|
||||||
VirtualColumnUtils::buildSetsForDAG(*filter_dag, context_);
|
VirtualColumnUtils::buildSetsForDAG(*filter_dag, context_);
|
||||||
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
|
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
|
||||||
VirtualColumnUtils::filterByPathOrFile(files, files, actions, virtual_columns);
|
VirtualColumnUtils::filterByPathOrFile(files, files, actions, virtual_columns, context_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -227,7 +227,7 @@ public:
|
|||||||
|
|
||||||
std::optional<ActionsDAG> filter_dag;
|
std::optional<ActionsDAG> filter_dag;
|
||||||
if (!uris.empty())
|
if (!uris.empty())
|
||||||
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
|
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, context);
|
||||||
|
|
||||||
if (filter_dag)
|
if (filter_dag)
|
||||||
{
|
{
|
||||||
@ -238,7 +238,7 @@ public:
|
|||||||
|
|
||||||
VirtualColumnUtils::buildSetsForDAG(*filter_dag, context);
|
VirtualColumnUtils::buildSetsForDAG(*filter_dag, context);
|
||||||
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
|
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
|
||||||
VirtualColumnUtils::filterByPathOrFile(uris, paths, actions, virtual_columns);
|
VirtualColumnUtils::filterByPathOrFile(uris, paths, actions, virtual_columns, context);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
#include <memory>
|
#include <memory>
|
||||||
#include <stack>
|
#include <stack>
|
||||||
|
#include <unordered_set>
|
||||||
#include <Core/NamesAndTypes.h>
|
#include <Core/NamesAndTypes.h>
|
||||||
#include <Core/TypeId.h>
|
#include <Core/TypeId.h>
|
||||||
|
|
||||||
@ -46,6 +47,7 @@
|
|||||||
#include "Functions/IFunction.h"
|
#include "Functions/IFunction.h"
|
||||||
#include "Functions/IFunctionAdaptors.h"
|
#include "Functions/IFunctionAdaptors.h"
|
||||||
#include "Functions/indexHint.h"
|
#include "Functions/indexHint.h"
|
||||||
|
#include <IO/ReadBufferFromString.h>
|
||||||
#include <Interpreters/convertFieldToType.h>
|
#include <Interpreters/convertFieldToType.h>
|
||||||
#include <Parsers/makeASTForLogicalFunction.h>
|
#include <Parsers/makeASTForLogicalFunction.h>
|
||||||
#include <Columns/ColumnSet.h>
|
#include <Columns/ColumnSet.h>
|
||||||
@ -124,9 +126,18 @@ void filterBlockWithExpression(const ExpressionActionsPtr & actions, Block & blo
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
NamesAndTypesList getCommonVirtualsForFileLikeStorage()
|
||||||
|
{
|
||||||
|
return {{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
|
||||||
|
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
|
||||||
|
{"_size", makeNullable(std::make_shared<DataTypeUInt64>())},
|
||||||
|
{"_time", makeNullable(std::make_shared<DataTypeDateTime>())},
|
||||||
|
{"_etag", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
|
||||||
|
}
|
||||||
|
|
||||||
NameSet getVirtualNamesForFileLikeStorage()
|
NameSet getVirtualNamesForFileLikeStorage()
|
||||||
{
|
{
|
||||||
return {"_path", "_file", "_size", "_time", "_etag"};
|
return getCommonVirtualsForFileLikeStorage().getNameSet();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unordered_map<std::string, std::string> parseHivePartitioningKeysAndValues(const String & path)
|
std::unordered_map<std::string, std::string> parseHivePartitioningKeysAndValues(const String & path)
|
||||||
@ -154,8 +165,10 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto
|
|||||||
{
|
{
|
||||||
VirtualColumnsDescription desc;
|
VirtualColumnsDescription desc;
|
||||||
|
|
||||||
auto add_virtual = [&](const auto & name, const auto & type)
|
auto add_virtual = [&](const NameAndTypePair & pair)
|
||||||
{
|
{
|
||||||
|
const auto & name = pair.getNameInStorage();
|
||||||
|
const auto & type = pair.getTypeInStorage();
|
||||||
if (storage_columns.has(name))
|
if (storage_columns.has(name))
|
||||||
{
|
{
|
||||||
if (!context->getSettingsRef().use_hive_partitioning)
|
if (!context->getSettingsRef().use_hive_partitioning)
|
||||||
@ -172,11 +185,8 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto
|
|||||||
desc.addEphemeral(name, type, "");
|
desc.addEphemeral(name, type, "");
|
||||||
};
|
};
|
||||||
|
|
||||||
add_virtual("_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
|
for (const auto & item : getCommonVirtualsForFileLikeStorage())
|
||||||
add_virtual("_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
|
add_virtual(item);
|
||||||
add_virtual("_size", makeNullable(std::make_shared<DataTypeUInt64>()));
|
|
||||||
add_virtual("_time", makeNullable(std::make_shared<DataTypeDateTime>()));
|
|
||||||
add_virtual("_etag", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
|
|
||||||
|
|
||||||
if (context->getSettingsRef().use_hive_partitioning)
|
if (context->getSettingsRef().use_hive_partitioning)
|
||||||
{
|
{
|
||||||
@ -188,16 +198,16 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto
|
|||||||
if (type == nullptr)
|
if (type == nullptr)
|
||||||
type = std::make_shared<DataTypeString>();
|
type = std::make_shared<DataTypeString>();
|
||||||
if (type->canBeInsideLowCardinality())
|
if (type->canBeInsideLowCardinality())
|
||||||
add_virtual(item.first, std::make_shared<DataTypeLowCardinality>(type));
|
add_virtual({item.first, std::make_shared<DataTypeLowCardinality>(type)});
|
||||||
else
|
else
|
||||||
add_virtual(item.first, type);
|
add_virtual({item.first, type});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return desc;
|
return desc;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void addPathAndFileToVirtualColumns(Block & block, const String & path, size_t idx)
|
static void addPathAndFileToVirtualColumns(Block & block, const String & path, size_t idx, const FormatSettings & format_settings, bool use_hive_partitioning)
|
||||||
{
|
{
|
||||||
if (block.has("_path"))
|
if (block.has("_path"))
|
||||||
block.getByName("_path").column->assumeMutableRef().insert(path);
|
block.getByName("_path").column->assumeMutableRef().insert(path);
|
||||||
@ -214,18 +224,34 @@ static void addPathAndFileToVirtualColumns(Block & block, const String & path, s
|
|||||||
block.getByName("_file").column->assumeMutableRef().insert(file);
|
block.getByName("_file").column->assumeMutableRef().insert(file);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (use_hive_partitioning)
|
||||||
|
{
|
||||||
|
auto keys_and_values = parseHivePartitioningKeysAndValues(path);
|
||||||
|
for (const auto & [key, value] : keys_and_values)
|
||||||
|
{
|
||||||
|
if (const auto * column = block.findByName(key))
|
||||||
|
{
|
||||||
|
ReadBufferFromString buf(value);
|
||||||
|
column->type->getDefaultSerialization()->deserializeWholeText(column->column->assumeMutableRef(), buf, format_settings);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
block.getByName("_idx").column->assumeMutableRef().insert(idx);
|
block.getByName("_idx").column->assumeMutableRef().insert(idx);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns)
|
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
|
||||||
{
|
{
|
||||||
if (!predicate || virtual_columns.empty())
|
if (!predicate || virtual_columns.empty())
|
||||||
return {};
|
return {};
|
||||||
|
|
||||||
Block block;
|
Block block;
|
||||||
|
NameSet common_virtuals;
|
||||||
|
if (context->getSettingsRef().use_hive_partitioning)
|
||||||
|
common_virtuals = getVirtualNamesForFileLikeStorage();
|
||||||
for (const auto & column : virtual_columns)
|
for (const auto & column : virtual_columns)
|
||||||
{
|
{
|
||||||
if (column.name == "_file" || column.name == "_path")
|
if (column.name == "_file" || column.name == "_path" || !common_virtuals.contains(column.name))
|
||||||
block.insert({column.type->createColumn(), column.type, column.name});
|
block.insert({column.type->createColumn(), column.type, column.name});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -233,18 +259,19 @@ std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * pr
|
|||||||
return splitFilterDagForAllowedInputs(predicate, &block);
|
return splitFilterDagForAllowedInputs(predicate, &block);
|
||||||
}
|
}
|
||||||
|
|
||||||
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns)
|
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
|
||||||
{
|
{
|
||||||
Block block;
|
Block block;
|
||||||
|
NameSet common_virtuals = getVirtualNamesForFileLikeStorage();
|
||||||
for (const auto & column : virtual_columns)
|
for (const auto & column : virtual_columns)
|
||||||
{
|
{
|
||||||
if (column.name == "_file" || column.name == "_path")
|
if (column.name == "_file" || column.name == "_path" || !common_virtuals.contains(column.name))
|
||||||
block.insert({column.type->createColumn(), column.type, column.name});
|
block.insert({column.type->createColumn(), column.type, column.name});
|
||||||
}
|
}
|
||||||
block.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
|
block.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
|
||||||
|
|
||||||
for (size_t i = 0; i != paths.size(); ++i)
|
for (size_t i = 0; i != paths.size(); ++i)
|
||||||
addPathAndFileToVirtualColumns(block, paths[i], i);
|
addPathAndFileToVirtualColumns(block, paths[i], i, getFormatSettings(context), context->getSettingsRef().use_hive_partitioning);
|
||||||
|
|
||||||
filterBlockWithExpression(actions, block);
|
filterBlockWithExpression(actions, block);
|
||||||
|
|
||||||
|
@ -75,14 +75,14 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(
|
|||||||
const std::string & sample_path = "",
|
const std::string & sample_path = "",
|
||||||
std::optional<FormatSettings> format_settings_ = std::nullopt);
|
std::optional<FormatSettings> format_settings_ = std::nullopt);
|
||||||
|
|
||||||
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns);
|
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
|
||||||
|
|
||||||
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns);
|
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
|
||||||
|
|
||||||
template <typename T>
|
template <typename T>
|
||||||
void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns)
|
void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
|
||||||
{
|
{
|
||||||
auto indexes_column = getFilterByPathAndFileIndexes(paths, actions, virtual_columns);
|
auto indexes_column = getFilterByPathAndFileIndexes(paths, actions, virtual_columns, context);
|
||||||
const auto & indexes = typeid_cast<const ColumnUInt64 &>(*indexes_column).getData();
|
const auto & indexes = typeid_cast<const ColumnUInt64 &>(*indexes_column).getData();
|
||||||
if (indexes.size() == sources.size())
|
if (indexes.size() == sources.size())
|
||||||
return;
|
return;
|
||||||
|
@ -646,6 +646,7 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
|
|||||||
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
|
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
|
||||||
|
|
||||||
PullingAsyncPipelineExecutor executor(pipeline);
|
PullingAsyncPipelineExecutor executor(pipeline);
|
||||||
|
pipeline.setConcurrencyControl(getContext()->getSettingsRef().use_concurrency_control);
|
||||||
Block block;
|
Block block;
|
||||||
BlocksPtr new_blocks = std::make_shared<Blocks>();
|
BlocksPtr new_blocks = std::make_shared<Blocks>();
|
||||||
|
|
||||||
|
@ -112,6 +112,7 @@ Block TableFunctionFormat::parseData(const ColumnsDescription & columns, const S
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
builder.setConcurrencyControl(context->getSettingsRef().use_concurrency_control);
|
||||||
auto pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
|
auto pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
|
||||||
auto reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
|
auto reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
|
||||||
|
|
||||||
|
@ -139,13 +139,20 @@ def main():
|
|||||||
jr = JobReport.load(from_file=report_file)
|
jr = JobReport.load(from_file=report_file)
|
||||||
additional_files.append(report_file)
|
additional_files.append(report_file)
|
||||||
for file in set(jr.additional_files):
|
for file in set(jr.additional_files):
|
||||||
file_ = Path(file)
|
orig_file = Path(file)
|
||||||
file_name = file_.name
|
file_name = orig_file.name
|
||||||
file_name = file_name.replace(
|
file_name = file_name.replace(
|
||||||
".", "__" + CI.Utils.normalize_string(job_id) + ".", 1
|
".", "__" + CI.Utils.normalize_string(job_id) + ".", 1
|
||||||
)
|
)
|
||||||
file_ = file_.rename(file_.parent / file_name)
|
new_file = orig_file.rename(orig_file.parent / file_name)
|
||||||
additional_files.append(file_)
|
for tr in test_results:
|
||||||
|
if tr.log_files is None:
|
||||||
|
continue
|
||||||
|
tr.log_files = [
|
||||||
|
new_file if (Path(log_file) == orig_file) else Path(log_file)
|
||||||
|
for log_file in tr.log_files
|
||||||
|
]
|
||||||
|
additional_files.append(new_file)
|
||||||
|
|
||||||
JobReport(
|
JobReport(
|
||||||
description=description,
|
description=description,
|
||||||
|
@ -559,7 +559,7 @@ class CI:
|
|||||||
JobNames.BUGFIX_VALIDATE: JobConfig(
|
JobNames.BUGFIX_VALIDATE: JobConfig(
|
||||||
run_by_label="pr-bugfix",
|
run_by_label="pr-bugfix",
|
||||||
run_command="bugfix_validate_check.py",
|
run_command="bugfix_validate_check.py",
|
||||||
timeout=900,
|
timeout=2400,
|
||||||
runner_type=Runners.STYLE_CHECKER,
|
runner_type=Runners.STYLE_CHECKER,
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
|
import logging
|
||||||
|
import os
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Dict, List, Optional, Sequence, Union
|
from typing import Dict, List, Optional, Sequence, Union
|
||||||
import os
|
|
||||||
import logging
|
|
||||||
|
|
||||||
from env_helper import (
|
from env_helper import (
|
||||||
GITHUB_JOB_URL,
|
GITHUB_JOB_URL,
|
||||||
@ -12,6 +12,8 @@ from env_helper import (
|
|||||||
from report import TestResults, create_test_html_report
|
from report import TestResults, create_test_html_report
|
||||||
from s3_helper import S3Helper
|
from s3_helper import S3Helper
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def process_logs(
|
def process_logs(
|
||||||
s3_client: S3Helper,
|
s3_client: S3Helper,
|
||||||
@ -19,7 +21,7 @@ def process_logs(
|
|||||||
s3_path_prefix: str,
|
s3_path_prefix: str,
|
||||||
test_results: TestResults,
|
test_results: TestResults,
|
||||||
) -> List[str]:
|
) -> List[str]:
|
||||||
logging.info("Upload files to s3 %s", additional_logs)
|
logger.info("Upload files to s3 %s", additional_logs)
|
||||||
|
|
||||||
processed_logs = {} # type: Dict[str, str]
|
processed_logs = {} # type: Dict[str, str]
|
||||||
# Firstly convert paths of logs from test_results to urls to s3.
|
# Firstly convert paths of logs from test_results to urls to s3.
|
||||||
@ -33,9 +35,19 @@ def process_logs(
|
|||||||
if path in processed_logs:
|
if path in processed_logs:
|
||||||
test_result.log_urls.append(processed_logs[str(path)])
|
test_result.log_urls.append(processed_logs[str(path)])
|
||||||
elif path:
|
elif path:
|
||||||
|
try:
|
||||||
url = s3_client.upload_test_report_to_s3(
|
url = s3_client.upload_test_report_to_s3(
|
||||||
Path(path), s3_path_prefix + "/" + str(path)
|
Path(path), s3_path_prefix + "/" + str(path)
|
||||||
)
|
)
|
||||||
|
except FileNotFoundError:
|
||||||
|
# Breaking the whole run on the malformed test is a bad idea
|
||||||
|
# FIXME: report the failure
|
||||||
|
logger.error(
|
||||||
|
"A broken TestResult, file '%s' does not exist: %s",
|
||||||
|
path,
|
||||||
|
test_result,
|
||||||
|
)
|
||||||
|
continue
|
||||||
test_result.log_urls.append(url)
|
test_result.log_urls.append(url)
|
||||||
processed_logs[str(path)] = url
|
processed_logs[str(path)] = url
|
||||||
|
|
||||||
@ -48,7 +60,7 @@ def process_logs(
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
logging.error("File %s is missing - skip", log_path)
|
logger.error("File %s is missing - skip", log_path)
|
||||||
|
|
||||||
return additional_urls
|
return additional_urls
|
||||||
|
|
||||||
@ -116,8 +128,8 @@ def upload_results(
|
|||||||
report_path.write_text(html_report, encoding="utf-8")
|
report_path.write_text(html_report, encoding="utf-8")
|
||||||
url = s3_client.upload_test_report_to_s3(report_path, s3_path_prefix + ".html")
|
url = s3_client.upload_test_report_to_s3(report_path, s3_path_prefix + ".html")
|
||||||
else:
|
else:
|
||||||
logging.info("report.html was prepared by test job itself")
|
logger.info("report.html was prepared by test job itself")
|
||||||
url = ready_report_url
|
url = ready_report_url
|
||||||
|
|
||||||
logging.info("Search result in url %s", url)
|
logger.info("Search result in url %s", url)
|
||||||
return url
|
return url
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
<clickhouse>
|
<clickhouse>
|
||||||
|
<concurrent_threads_soft_limit_ratio_to_cores>0</concurrent_threads_soft_limit_ratio_to_cores>
|
||||||
<concurrent_threads_soft_limit_num>50</concurrent_threads_soft_limit_num>
|
<concurrent_threads_soft_limit_num>50</concurrent_threads_soft_limit_num>
|
||||||
<query_log>
|
<query_log>
|
||||||
<database>system</database>
|
<database>system</database>
|
||||||
|
@ -28,6 +28,16 @@ node4 = cluster.add_instance(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def assert_profile_event(node, query_id, profile_event, check):
|
||||||
|
assert check(
|
||||||
|
int(
|
||||||
|
node.query(
|
||||||
|
f"select ProfileEvents['{profile_event}'] from system.query_log where current_database = currentDatabase() and query_id = '{query_id}' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="module")
|
@pytest.fixture(scope="module")
|
||||||
def started_cluster():
|
def started_cluster():
|
||||||
try:
|
try:
|
||||||
@ -43,39 +53,176 @@ def test_concurrent_threads_soft_limit_default(started_cluster):
|
|||||||
query_id="test_concurrent_threads_soft_limit_1",
|
query_id="test_concurrent_threads_soft_limit_1",
|
||||||
)
|
)
|
||||||
node1.query("SYSTEM FLUSH LOGS")
|
node1.query("SYSTEM FLUSH LOGS")
|
||||||
|
assert_profile_event(
|
||||||
|
node1,
|
||||||
|
"test_concurrent_threads_soft_limit_1",
|
||||||
|
"ConcurrencyControlGrantedHard",
|
||||||
|
lambda x: x == 1,
|
||||||
|
)
|
||||||
|
assert_profile_event(
|
||||||
|
node1,
|
||||||
|
"test_concurrent_threads_soft_limit_1",
|
||||||
|
"ConcurrencyControlGrantDelayed",
|
||||||
|
lambda x: x == 0,
|
||||||
|
)
|
||||||
|
assert_profile_event(
|
||||||
|
node1,
|
||||||
|
"test_concurrent_threads_soft_limit_1",
|
||||||
|
"ConcurrencyControlAcquiredTotal",
|
||||||
|
lambda x: x == 100,
|
||||||
|
)
|
||||||
|
assert_profile_event(
|
||||||
|
node1,
|
||||||
|
"test_concurrent_threads_soft_limit_1",
|
||||||
|
"ConcurrencyControlAllocationDelayed",
|
||||||
|
lambda x: x == 0,
|
||||||
|
)
|
||||||
assert (
|
assert (
|
||||||
node1.query(
|
node1.query(
|
||||||
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_1'"
|
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_1' order by query_start_time_microseconds desc limit 1"
|
||||||
)
|
)
|
||||||
== "102\n"
|
== "102\n"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skip(reason="broken test")
|
def test_use_concurrency_control_default(started_cluster):
|
||||||
|
node1.query(
|
||||||
|
"SELECT count(*) FROM numbers_mt(10000000) SETTINGS use_concurrency_control = 0",
|
||||||
|
query_id="test_use_concurrency_control",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Concurrency control is not used, all metrics should be zeros
|
||||||
|
node1.query("SYSTEM FLUSH LOGS")
|
||||||
|
assert_profile_event(
|
||||||
|
node1,
|
||||||
|
"test_use_concurrency_control",
|
||||||
|
"ConcurrencyControlGrantedHard",
|
||||||
|
lambda x: x == 0,
|
||||||
|
)
|
||||||
|
assert_profile_event(
|
||||||
|
node1,
|
||||||
|
"test_use_concurrency_control",
|
||||||
|
"ConcurrencyControlGrantDelayed",
|
||||||
|
lambda x: x == 0,
|
||||||
|
)
|
||||||
|
assert_profile_event(
|
||||||
|
node1,
|
||||||
|
"test_use_concurrency_control",
|
||||||
|
"ConcurrencyControlAcquiredTotal",
|
||||||
|
lambda x: x == 0,
|
||||||
|
)
|
||||||
|
assert_profile_event(
|
||||||
|
node1,
|
||||||
|
"test_use_concurrency_control",
|
||||||
|
"ConcurrencyControlAllocationDelayed",
|
||||||
|
lambda x: x == 0,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_concurrent_threads_soft_limit_defined_50(started_cluster):
|
def test_concurrent_threads_soft_limit_defined_50(started_cluster):
|
||||||
node2.query(
|
node2.query(
|
||||||
"SELECT count(*) FROM numbers_mt(10000000)",
|
"SELECT count(*) FROM numbers_mt(10000000)",
|
||||||
query_id="test_concurrent_threads_soft_limit_2",
|
query_id="test_concurrent_threads_soft_limit_2",
|
||||||
)
|
)
|
||||||
node2.query("SYSTEM FLUSH LOGS")
|
node2.query("SYSTEM FLUSH LOGS")
|
||||||
|
assert_profile_event(
|
||||||
|
node2,
|
||||||
|
"test_concurrent_threads_soft_limit_2",
|
||||||
|
"ConcurrencyControlGrantedHard",
|
||||||
|
lambda x: x == 1,
|
||||||
|
)
|
||||||
|
assert_profile_event(
|
||||||
|
node2,
|
||||||
|
"test_concurrent_threads_soft_limit_2",
|
||||||
|
"ConcurrencyControlGrantDelayed",
|
||||||
|
lambda x: x == 50,
|
||||||
|
)
|
||||||
|
assert_profile_event(
|
||||||
|
node2,
|
||||||
|
"test_concurrent_threads_soft_limit_2",
|
||||||
|
"ConcurrencyControlAcquiredTotal",
|
||||||
|
lambda x: x == 50,
|
||||||
|
)
|
||||||
|
assert_profile_event(
|
||||||
|
node2,
|
||||||
|
"test_concurrent_threads_soft_limit_2",
|
||||||
|
"ConcurrencyControlAllocationDelayed",
|
||||||
|
lambda x: x == 1,
|
||||||
|
)
|
||||||
assert (
|
assert (
|
||||||
node2.query(
|
node2.query(
|
||||||
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_2'"
|
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_2' order by query_start_time_microseconds desc limit 1"
|
||||||
)
|
)
|
||||||
== "52\n"
|
== "52\n"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skip(reason="broken test")
|
def test_use_concurrency_control_soft_limit_defined_50(started_cluster):
|
||||||
|
node2.query(
|
||||||
|
"SELECT count(*) FROM numbers_mt(10000000) SETTINGS use_concurrency_control = 0",
|
||||||
|
query_id="test_use_concurrency_control_2",
|
||||||
|
)
|
||||||
|
# Concurrency control is not used, all metrics should be zeros
|
||||||
|
node2.query("SYSTEM FLUSH LOGS")
|
||||||
|
assert_profile_event(
|
||||||
|
node2,
|
||||||
|
"test_use_concurrency_control_2",
|
||||||
|
"ConcurrencyControlGrantedHard",
|
||||||
|
lambda x: x == 0,
|
||||||
|
)
|
||||||
|
assert_profile_event(
|
||||||
|
node2,
|
||||||
|
"test_use_concurrency_control_2",
|
||||||
|
"ConcurrencyControlGrantDelayed",
|
||||||
|
lambda x: x == 0,
|
||||||
|
)
|
||||||
|
assert_profile_event(
|
||||||
|
node2,
|
||||||
|
"test_use_concurrency_control_2",
|
||||||
|
"ConcurrencyControlAcquiredTotal",
|
||||||
|
lambda x: x == 0,
|
||||||
|
)
|
||||||
|
assert_profile_event(
|
||||||
|
node2,
|
||||||
|
"test_use_concurrency_control_2",
|
||||||
|
"ConcurrencyControlAllocationDelayed",
|
||||||
|
lambda x: x == 0,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_concurrent_threads_soft_limit_defined_1(started_cluster):
|
def test_concurrent_threads_soft_limit_defined_1(started_cluster):
|
||||||
node3.query(
|
node3.query(
|
||||||
"SELECT count(*) FROM numbers_mt(10000000)",
|
"SELECT count(*) FROM numbers_mt(10000000)",
|
||||||
query_id="test_concurrent_threads_soft_limit_3",
|
query_id="test_concurrent_threads_soft_limit_3",
|
||||||
)
|
)
|
||||||
node3.query("SYSTEM FLUSH LOGS")
|
node3.query("SYSTEM FLUSH LOGS")
|
||||||
|
assert_profile_event(
|
||||||
|
node3,
|
||||||
|
"test_concurrent_threads_soft_limit_3",
|
||||||
|
"ConcurrencyControlGrantedHard",
|
||||||
|
lambda x: x == 1,
|
||||||
|
)
|
||||||
|
assert_profile_event(
|
||||||
|
node3,
|
||||||
|
"test_concurrent_threads_soft_limit_3",
|
||||||
|
"ConcurrencyControlGrantDelayed",
|
||||||
|
lambda x: x == 99,
|
||||||
|
)
|
||||||
|
assert_profile_event(
|
||||||
|
node3,
|
||||||
|
"test_concurrent_threads_soft_limit_3",
|
||||||
|
"ConcurrencyControlAcquiredTotal",
|
||||||
|
lambda x: x == 1,
|
||||||
|
)
|
||||||
|
assert_profile_event(
|
||||||
|
node3,
|
||||||
|
"test_concurrent_threads_soft_limit_3",
|
||||||
|
"ConcurrencyControlAllocationDelayed",
|
||||||
|
lambda x: x == 1,
|
||||||
|
)
|
||||||
assert (
|
assert (
|
||||||
node3.query(
|
node3.query(
|
||||||
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_3'"
|
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_3' order by query_start_time_microseconds desc limit 1"
|
||||||
)
|
)
|
||||||
== "3\n"
|
== "3\n"
|
||||||
)
|
)
|
||||||
@ -84,7 +231,6 @@ def test_concurrent_threads_soft_limit_defined_1(started_cluster):
|
|||||||
# In config_limit_reached.xml there is concurrent_threads_soft_limit=10
|
# In config_limit_reached.xml there is concurrent_threads_soft_limit=10
|
||||||
# Background query starts in a separate thread to reach this limit.
|
# Background query starts in a separate thread to reach this limit.
|
||||||
# When this limit is reached the foreground query gets less than 5 queries despite the fact that it has settings max_threads=5
|
# When this limit is reached the foreground query gets less than 5 queries despite the fact that it has settings max_threads=5
|
||||||
@pytest.mark.skip(reason="broken test")
|
|
||||||
def test_concurrent_threads_soft_limit_limit_reached(started_cluster):
|
def test_concurrent_threads_soft_limit_limit_reached(started_cluster):
|
||||||
def background_query():
|
def background_query():
|
||||||
try:
|
try:
|
||||||
@ -117,8 +263,32 @@ def test_concurrent_threads_soft_limit_limit_reached(started_cluster):
|
|||||||
)
|
)
|
||||||
|
|
||||||
node4.query("SYSTEM FLUSH LOGS")
|
node4.query("SYSTEM FLUSH LOGS")
|
||||||
|
assert_profile_event(
|
||||||
|
node4,
|
||||||
|
"test_concurrent_threads_soft_limit_4",
|
||||||
|
"ConcurrencyControlGrantedHard",
|
||||||
|
lambda x: x == 1,
|
||||||
|
)
|
||||||
|
assert_profile_event(
|
||||||
|
node4,
|
||||||
|
"test_concurrent_threads_soft_limit_4",
|
||||||
|
"ConcurrencyControlGrantDelayed",
|
||||||
|
lambda x: x > 0,
|
||||||
|
)
|
||||||
|
assert_profile_event(
|
||||||
|
node4,
|
||||||
|
"test_concurrent_threads_soft_limit_4",
|
||||||
|
"ConcurrencyControlAcquiredTotal",
|
||||||
|
lambda x: x < 5,
|
||||||
|
)
|
||||||
|
assert_profile_event(
|
||||||
|
node4,
|
||||||
|
"test_concurrent_threads_soft_limit_4",
|
||||||
|
"ConcurrencyControlAllocationDelayed",
|
||||||
|
lambda x: x == 1,
|
||||||
|
)
|
||||||
s_count = node4.query(
|
s_count = node4.query(
|
||||||
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_4'"
|
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_4' order by query_start_time_microseconds desc limit 1"
|
||||||
).strip()
|
).strip()
|
||||||
if s_count:
|
if s_count:
|
||||||
count = int(s_count)
|
count = int(s_count)
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
import pytest
|
import pytest
|
||||||
import random, string
|
import random, string
|
||||||
|
import re
|
||||||
from helpers.cluster import ClickHouseCluster
|
from helpers.cluster import ClickHouseCluster
|
||||||
from helpers.test_tools import TSV
|
from helpers.test_tools import TSV
|
||||||
|
|
||||||
@ -336,6 +337,10 @@ def test_create_database():
|
|||||||
def test_table_functions():
|
def test_table_functions():
|
||||||
password = new_password()
|
password = new_password()
|
||||||
azure_conn_string = cluster.env_variables["AZURITE_CONNECTION_STRING"]
|
azure_conn_string = cluster.env_variables["AZURITE_CONNECTION_STRING"]
|
||||||
|
account_key_pattern = re.compile("AccountKey=.*?(;|$)")
|
||||||
|
masked_azure_conn_string = re.sub(
|
||||||
|
account_key_pattern, "AccountKey=[HIDDEN]\\1", azure_conn_string
|
||||||
|
)
|
||||||
azure_storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
|
azure_storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
|
||||||
azure_account_name = "devstoreaccount1"
|
azure_account_name = "devstoreaccount1"
|
||||||
azure_account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
|
azure_account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
|
||||||
@ -467,23 +472,23 @@ def test_table_functions():
|
|||||||
"CREATE TABLE tablefunc30 (x int) AS s3('http://minio1:9001/root/data/test9.csv.gz', 'NOSIGN', 'CSV')",
|
"CREATE TABLE tablefunc30 (x int) AS s3('http://minio1:9001/root/data/test9.csv.gz', 'NOSIGN', 'CSV')",
|
||||||
"CREATE TABLE tablefunc31 (`x` int) AS s3('http://minio1:9001/root/data/test10.csv.gz', 'minio', '[HIDDEN]')",
|
"CREATE TABLE tablefunc31 (`x` int) AS s3('http://minio1:9001/root/data/test10.csv.gz', 'minio', '[HIDDEN]')",
|
||||||
"CREATE TABLE tablefunc32 (`x` int) AS deltaLake('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
|
"CREATE TABLE tablefunc32 (`x` int) AS deltaLake('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
|
||||||
f"CREATE TABLE tablefunc33 (x int) AS azureBlobStorage('{azure_conn_string}', 'cont', 'test_simple.csv', 'CSV')",
|
f"CREATE TABLE tablefunc33 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple.csv', 'CSV')",
|
||||||
f"CREATE TABLE tablefunc34 (x int) AS azureBlobStorage('{azure_conn_string}', 'cont', 'test_simple_1.csv', 'CSV', 'none')",
|
f"CREATE TABLE tablefunc34 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple_1.csv', 'CSV', 'none')",
|
||||||
f"CREATE TABLE tablefunc35 (x int) AS azureBlobStorage('{azure_conn_string}', 'cont', 'test_simple_2.csv', 'CSV', 'none', 'auto')",
|
f"CREATE TABLE tablefunc35 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple_2.csv', 'CSV', 'none', 'auto')",
|
||||||
f"CREATE TABLE tablefunc36 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_3.csv', '{azure_account_name}', '[HIDDEN]')",
|
f"CREATE TABLE tablefunc36 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_3.csv', '{azure_account_name}', '[HIDDEN]')",
|
||||||
f"CREATE TABLE tablefunc37 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_4.csv', '{azure_account_name}', '[HIDDEN]', 'CSV')",
|
f"CREATE TABLE tablefunc37 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_4.csv', '{azure_account_name}', '[HIDDEN]', 'CSV')",
|
||||||
f"CREATE TABLE tablefunc38 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_5.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none')",
|
f"CREATE TABLE tablefunc38 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_5.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none')",
|
||||||
f"CREATE TABLE tablefunc39 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')",
|
f"CREATE TABLE tablefunc39 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')",
|
||||||
f"CREATE TABLE tablefunc40 (x int) AS azureBlobStorage(named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')",
|
f"CREATE TABLE tablefunc40 (`x` int) AS azureBlobStorage(named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')",
|
||||||
f"CREATE TABLE tablefunc41 (`x` int) AS azureBlobStorage(named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')",
|
f"CREATE TABLE tablefunc41 (`x` int) AS azureBlobStorage(named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')",
|
||||||
f"CREATE TABLE tablefunc42 (x int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple_9.csv', 'CSV')",
|
f"CREATE TABLE tablefunc42 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_9.csv', 'CSV')",
|
||||||
f"CREATE TABLE tablefunc43 (x int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple_10.csv', 'CSV', 'none')",
|
f"CREATE TABLE tablefunc43 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_10.csv', 'CSV', 'none')",
|
||||||
f"CREATE TABLE tablefunc44 (x int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple_11.csv', 'CSV', 'none', 'auto')",
|
f"CREATE TABLE tablefunc44 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_11.csv', 'CSV', 'none', 'auto')",
|
||||||
f"CREATE TABLE tablefunc45 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_12.csv', '{azure_account_name}', '[HIDDEN]')",
|
f"CREATE TABLE tablefunc45 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_12.csv', '{azure_account_name}', '[HIDDEN]')",
|
||||||
f"CREATE TABLE tablefunc46 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_13.csv', '{azure_account_name}', '[HIDDEN]', 'CSV')",
|
f"CREATE TABLE tablefunc46 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_13.csv', '{azure_account_name}', '[HIDDEN]', 'CSV')",
|
||||||
f"CREATE TABLE tablefunc47 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_14.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none')",
|
f"CREATE TABLE tablefunc47 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_14.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none')",
|
||||||
f"CREATE TABLE tablefunc48 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_15.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')",
|
f"CREATE TABLE tablefunc48 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_15.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')",
|
||||||
f"CREATE TABLE tablefunc49 (x int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_16.csv', format = 'CSV')",
|
f"CREATE TABLE tablefunc49 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_16.csv', format = 'CSV')",
|
||||||
f"CREATE TABLE tablefunc50 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_17.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')",
|
f"CREATE TABLE tablefunc50 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_17.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')",
|
||||||
"CREATE TABLE tablefunc51 (`x` int) AS iceberg('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
|
"CREATE TABLE tablefunc51 (`x` int) AS iceberg('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
|
||||||
],
|
],
|
||||||
|
@ -3,6 +3,7 @@
|
|||||||
set log_queries=1;
|
set log_queries=1;
|
||||||
set log_query_threads=1;
|
set log_query_threads=1;
|
||||||
set max_threads=0;
|
set max_threads=0;
|
||||||
|
set use_concurrency_control=0;
|
||||||
|
|
||||||
WITH 01091 AS id SELECT 1;
|
WITH 01091 AS id SELECT 1;
|
||||||
SYSTEM FLUSH LOGS;
|
SYSTEM FLUSH LOGS;
|
||||||
|
@ -6,6 +6,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
|||||||
# shellcheck source=../shell_config.sh
|
# shellcheck source=../shell_config.sh
|
||||||
. "$CURDIR"/../shell_config.sh
|
. "$CURDIR"/../shell_config.sh
|
||||||
|
|
||||||
${CLICKHOUSE_CLIENT} --log_queries=1 --max_threads=32 --query_id "2015_${CLICKHOUSE_DATABASE}_query" -q "select count() from remote('127.0.0.{2,3}', numbers(10)) where number global in (select number % 5 from numbers_mt(1000000))"
|
${CLICKHOUSE_CLIENT} --log_queries=1 --max_threads=32 --use_concurrency_control=0 --query_id "2015_${CLICKHOUSE_DATABASE}_query" -q "select count() from remote('127.0.0.{2,3}', numbers(10)) where number global in (select number % 5 from numbers_mt(1000000))"
|
||||||
${CLICKHOUSE_CLIENT} -q "system flush logs"
|
${CLICKHOUSE_CLIENT} -q "system flush logs"
|
||||||
${CLICKHOUSE_CLIENT} -q "select length(thread_ids) >= 32 from system.query_log where event_date >= yesterday() and query_id = '2015_${CLICKHOUSE_DATABASE}_query' and type = 'QueryFinish' and current_database = currentDatabase()"
|
${CLICKHOUSE_CLIENT} -q "select length(thread_ids) >= 32 from system.query_log where event_date >= yesterday() and query_id = '2015_${CLICKHOUSE_DATABASE}_query' and type = 'QueryFinish' and current_database = currentDatabase()"
|
||||||
|
@ -5,6 +5,9 @@
|
|||||||
-- enforce some defaults to be sure that the env settings will not affect the test
|
-- enforce some defaults to be sure that the env settings will not affect the test
|
||||||
SET max_threads=5, async_socket_for_remote=1, prefer_localhost_replica=1, optimize_read_in_order=1, load_marks_asynchronously=0, local_filesystem_read_method='pread', remote_filesystem_read_method='read', trace_profile_events=0;
|
SET max_threads=5, async_socket_for_remote=1, prefer_localhost_replica=1, optimize_read_in_order=1, load_marks_asynchronously=0, local_filesystem_read_method='pread', remote_filesystem_read_method='read', trace_profile_events=0;
|
||||||
|
|
||||||
|
-- we do not want concurrency control to limit the number of threads
|
||||||
|
SET use_concurrency_control=0;
|
||||||
|
|
||||||
-- we use query_thread_log to check peak thread usage
|
-- we use query_thread_log to check peak thread usage
|
||||||
-- after https://github.com/ClickHouse/ClickHouse/issues/53417 there is a simpler way to check it
|
-- after https://github.com/ClickHouse/ClickHouse/issues/53417 there is a simpler way to check it
|
||||||
-- but that will not allow to backport the test to older versions
|
-- but that will not allow to backport the test to older versions
|
||||||
|
@ -1,21 +1,31 @@
|
|||||||
1
|
%d: 123
|
||||||
1
|
%d: -123
|
||||||
1
|
%d: 0
|
||||||
1
|
%d: 9223372036854775807
|
||||||
1
|
%i: 123
|
||||||
1
|
%u: 123
|
||||||
1
|
%o: 173
|
||||||
1
|
%x: 7b
|
||||||
1
|
%X: 7B
|
||||||
1
|
%f: 0.000000
|
||||||
1
|
%f: 123.456000
|
||||||
1
|
%f: -123.456000
|
||||||
1
|
%F: 123.456000
|
||||||
1
|
%e: 1.234560e+02
|
||||||
1
|
%E: 1.234560E+02
|
||||||
1
|
%g: 123.456
|
||||||
1
|
%G: 123.456
|
||||||
1
|
%a: 0x1.edd2f1a9fbe77p+6
|
||||||
1
|
%A: 0X1.EDD2F1A9FBE77P+6
|
||||||
1
|
%s: abc
|
||||||
1
|
┌─printf('%%s: %s', '\n\t')─┐
|
||||||
|
1. │ %s:
|
||||||
|
│
|
||||||
|
└───────────────────────────┘
|
||||||
|
%s:
|
||||||
|
%%: %
|
||||||
|
%.5d: 00123
|
||||||
|
%.2f: 123.46
|
||||||
|
%.2e: 1.23e+02
|
||||||
|
%.2g: 1.2e+02
|
||||||
|
%.2s: ab
|
||||||
|
@ -1,39 +1,47 @@
|
|||||||
-- Testing integer formats
|
-- Testing integer formats
|
||||||
select printf('%%d: %d', 123) = '%d: 123';
|
select printf('%%d: %d', 123);
|
||||||
select printf('%%i: %i', 123) = '%i: 123';
|
select printf('%%d: %d', -123);
|
||||||
select printf('%%u: %u', 123) = '%u: 123';
|
select printf('%%d: %d', 0);
|
||||||
select printf('%%o: %o', 123) = '%o: 173';
|
select printf('%%d: %d', 9223372036854775807);
|
||||||
select printf('%%x: %x', 123) = '%x: 7b';
|
select printf('%%i: %i', 123);
|
||||||
select printf('%%X: %X', 123) = '%X: 7B';
|
select printf('%%u: %u', 123);
|
||||||
|
select printf('%%o: %o', 123);
|
||||||
|
select printf('%%x: %x', 123);
|
||||||
|
select printf('%%X: %X', 123);
|
||||||
|
|
||||||
-- Testing floating point formats
|
-- Testing floating point formats
|
||||||
select printf('%%f: %f', 123.456) = '%f: 123.456000';
|
select printf('%%f: %f', 0.0);
|
||||||
select printf('%%F: %F', 123.456) = '%F: 123.456000';
|
select printf('%%f: %f', 123.456);
|
||||||
select printf('%%e: %e', 123.456) = '%e: 1.234560e+02';
|
select printf('%%f: %f', -123.456);
|
||||||
select printf('%%E: %E', 123.456) = '%E: 1.234560E+02';
|
select printf('%%F: %F', 123.456);
|
||||||
select printf('%%g: %g', 123.456) = '%g: 123.456';
|
select printf('%%e: %e', 123.456);
|
||||||
select printf('%%G: %G', 123.456) = '%G: 123.456';
|
select printf('%%E: %E', 123.456);
|
||||||
select printf('%%a: %a', 123.456) = '%a: 0x1.edd2f1a9fbe77p+6';
|
select printf('%%g: %g', 123.456);
|
||||||
select printf('%%A: %A', 123.456) = '%A: 0X1.EDD2F1A9FBE77P+6';
|
select printf('%%G: %G', 123.456);
|
||||||
|
select printf('%%a: %a', 123.456);
|
||||||
|
select printf('%%A: %A', 123.456);
|
||||||
|
|
||||||
-- Testing character formats
|
-- Testing character formats
|
||||||
select printf('%%s: %s', 'abc') = '%s: abc';
|
select printf('%%s: %s', 'abc');
|
||||||
|
SELECT printf('%%s: %s', '\n\t') FORMAT PrettyCompact;
|
||||||
|
select printf('%%s: %s', '');
|
||||||
|
|
||||||
-- Testing the %% specifier
|
-- Testing the %% specifier
|
||||||
select printf('%%%%: %%') = '%%: %';
|
select printf('%%%%: %%');
|
||||||
|
|
||||||
-- Testing integer formats with precision
|
-- Testing integer formats with precision
|
||||||
select printf('%%.5d: %.5d', 123) = '%.5d: 00123';
|
select printf('%%.5d: %.5d', 123);
|
||||||
|
|
||||||
-- Testing floating point formats with precision
|
-- Testing floating point formats with precision
|
||||||
select printf('%%.2f: %.2f', 123.456) = '%.2f: 123.46';
|
select printf('%%.2f: %.2f', 123.456);
|
||||||
select printf('%%.2e: %.2e', 123.456) = '%.2e: 1.23e+02';
|
select printf('%%.2e: %.2e', 123.456);
|
||||||
select printf('%%.2g: %.2g', 123.456) = '%.2g: 1.2e+02';
|
select printf('%%.2g: %.2g', 123.456);
|
||||||
|
|
||||||
-- Testing character formats with precision
|
-- Testing character formats with precision
|
||||||
select printf('%%.2s: %.2s', 'abc') = '%.2s: ab';
|
select printf('%%.2s: %.2s', 'abc');
|
||||||
|
|
||||||
select printf('%%X: %X', 123.123); -- { serverError BAD_ARGUMENTS }
|
select printf('%%X: %X', 123.123); -- { serverError BAD_ARGUMENTS }
|
||||||
select printf('%%A: %A', 'abc'); -- { serverError BAD_ARGUMENTS }
|
select printf('%%A: %A', 'abc'); -- { serverError BAD_ARGUMENTS }
|
||||||
select printf('%%s: %s', 100); -- { serverError BAD_ARGUMENTS }
|
select printf('%%s: %s', 100); -- { serverError BAD_ARGUMENTS }
|
||||||
select printf('%%n: %n', 100); -- { serverError BAD_ARGUMENTS }
|
select printf('%%n: %n', 100); -- { serverError BAD_ARGUMENTS }
|
||||||
|
select printf('%%f: %f', 0); -- { serverError BAD_ARGUMENTS }
|
||||||
|
@ -33,8 +33,8 @@ Cross Elizabeth
|
|||||||
[1,2,3] 42.42
|
[1,2,3] 42.42
|
||||||
Array(Int64) LowCardinality(Float64)
|
Array(Int64) LowCardinality(Float64)
|
||||||
101
|
101
|
||||||
2070
|
2071
|
||||||
2070
|
2071
|
||||||
b
|
b
|
||||||
1
|
1
|
||||||
1
|
1
|
||||||
|
@ -0,0 +1,6 @@
|
|||||||
|
1
|
||||||
|
1
|
||||||
|
1
|
||||||
|
1
|
||||||
|
1
|
||||||
|
1
|
72
tests/queries/0_stateless/03231_hive_partitioning_filtering.sh
Executable file
72
tests/queries/0_stateless/03231_hive_partitioning_filtering.sh
Executable file
@ -0,0 +1,72 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
# Tags: no-fasttest
|
||||||
|
|
||||||
|
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
|
# shellcheck source=../shell_config.sh
|
||||||
|
. "$CURDIR"/../shell_config.sh
|
||||||
|
|
||||||
|
DATA_DIR=$USER_FILES_PATH/$CLICKHOUSE_TEST_UNIQUE_NAME
|
||||||
|
mkdir -p $DATA_DIR
|
||||||
|
cp -r $CURDIR/data_hive/ $DATA_DIR
|
||||||
|
|
||||||
|
$CLICKHOUSE_CLIENT --query_id="test_03231_1_$CLICKHOUSE_TEST_UNIQUE_NAME" --query "
|
||||||
|
SELECT countDistinct(_path) FROM file('$DATA_DIR/data_hive/partitioning/column0=*/sample.parquet') WHERE column0 = 'Elizabeth' SETTINGS use_hive_partitioning=1, optimize_count_from_files=0;
|
||||||
|
"
|
||||||
|
|
||||||
|
$CLICKHOUSE_CLIENT --query "
|
||||||
|
SYSTEM FLUSH LOGS;
|
||||||
|
"
|
||||||
|
|
||||||
|
for _ in {1..5}; do
|
||||||
|
count=$( $CLICKHOUSE_CLIENT --query "
|
||||||
|
SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log
|
||||||
|
WHERE query_id='test_03231_1_$CLICKHOUSE_TEST_UNIQUE_NAME' AND
|
||||||
|
current_database = currentDatabase() and type='QueryFinish';" )
|
||||||
|
if [[ "$count" == "1" ]]; then
|
||||||
|
echo "1"
|
||||||
|
break
|
||||||
|
fi
|
||||||
|
sleep 1
|
||||||
|
done
|
||||||
|
|
||||||
|
$CLICKHOUSE_CLIENT --query_id="test_03231_2_$CLICKHOUSE_TEST_UNIQUE_NAME" --query "
|
||||||
|
SELECT countDistinct(_path) FROM file('$DATA_DIR/data_hive/partitioning/identifier=*/email.csv') WHERE identifier = 2070 SETTINGS use_hive_partitioning=1, optimize_count_from_files=0;
|
||||||
|
"
|
||||||
|
|
||||||
|
$CLICKHOUSE_CLIENT --query "
|
||||||
|
SYSTEM FLUSH LOGS;
|
||||||
|
"
|
||||||
|
|
||||||
|
for _ in {1..5}; do
|
||||||
|
count=$( $CLICKHOUSE_CLIENT --query "
|
||||||
|
SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log
|
||||||
|
WHERE query_id='test_03231_2_$CLICKHOUSE_TEST_UNIQUE_NAME' AND
|
||||||
|
current_database = currentDatabase() and type='QueryFinish';" )
|
||||||
|
if [[ "$count" == "1" ]]; then
|
||||||
|
echo "1"
|
||||||
|
break
|
||||||
|
fi
|
||||||
|
sleep 1
|
||||||
|
done
|
||||||
|
|
||||||
|
$CLICKHOUSE_CLIENT --query_id="test_03231_3_$CLICKHOUSE_TEST_UNIQUE_NAME" --query "
|
||||||
|
SELECT countDistinct(_path) FROM file('$DATA_DIR/data_hive/partitioning/array=*/sample.parquet') WHERE array = [1,2,3] SETTINGS use_hive_partitioning=1, optimize_count_from_files=0;
|
||||||
|
"
|
||||||
|
|
||||||
|
$CLICKHOUSE_CLIENT --query "
|
||||||
|
SYSTEM FLUSH LOGS;
|
||||||
|
"
|
||||||
|
|
||||||
|
for _ in {1..5}; do
|
||||||
|
count=$( $CLICKHOUSE_CLIENT --query "
|
||||||
|
SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log
|
||||||
|
WHERE query_id='test_03231_3_$CLICKHOUSE_TEST_UNIQUE_NAME' AND
|
||||||
|
current_database = currentDatabase() and type='QueryFinish';" )
|
||||||
|
if [[ "$count" == "1" ]]; then
|
||||||
|
echo "1"
|
||||||
|
break
|
||||||
|
fi
|
||||||
|
sleep 1
|
||||||
|
done
|
||||||
|
|
||||||
|
rm -rf $DATA_DIR
|
@ -1 +1 @@
|
|||||||
data_hive/partitioning/column0=Elizabeth/sample.parquet
|
data_hive/partitioning/non_existing_column=Elizabeth/sample.parquet
|
||||||
|
@ -5,4 +5,4 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
|||||||
# shellcheck source=../shell_config.sh
|
# shellcheck source=../shell_config.sh
|
||||||
. "$CURDIR"/../shell_config.sh
|
. "$CURDIR"/../shell_config.sh
|
||||||
|
|
||||||
$CLICKHOUSE_LOCAL -q "SELECT substring(_path, position(_path, 'data_hive')) FROM file('$CURDIR/data_hive/partitioning/column0=*/sample.parquet') LIMIT 1;"
|
$CLICKHOUSE_LOCAL -q "SELECT substring(_path, position(_path, 'data_hive')) FROM file('$CURDIR/data_hive/partitioning/non_existing_column=*/sample.parquet') LIMIT 1;"
|
||||||
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -0,0 +1,5 @@
|
|||||||
|
_login_email,_identifier,_first_name,_last_name
|
||||||
|
laura@example.com,2070,Laura,Grey
|
||||||
|
craig@example.com,4081,Craig,Johnson
|
||||||
|
mary@example.com,9346,Mary,Jenkins
|
||||||
|
jamie@example.com,5079,Jamie,Smith
|
|
@ -181,6 +181,9 @@ ComplexKeyCache
|
|||||||
ComplexKeyDirect
|
ComplexKeyDirect
|
||||||
ComplexKeyHashed
|
ComplexKeyHashed
|
||||||
Composable
|
Composable
|
||||||
|
composable
|
||||||
|
ConcurrencyControlAcquired
|
||||||
|
ConcurrencyControlSoftLimit
|
||||||
Config
|
Config
|
||||||
ConnectionDetails
|
ConnectionDetails
|
||||||
Const
|
Const
|
||||||
|
Loading…
Reference in New Issue
Block a user