Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-09-19 16:20:50 +00:00)

Compare commits: 90 commits, e764f6726e ... 1b7b1325d4
Commits (SHA1):
1b7b1325d4, c0c83236b6, 3eb5bc1a0f, b94a7167a8, b88cd79959, 64e58baba1, a3fe155579, f4b4b3cc35, cb24849396, 758acb433d,
143d9f0201, 7af0ec8b23, b08e727aef, f52cdfb795, a210f98819, 7c5d55c6b2, 80259659ff, 3a7c68a052, e8d50aa97f, e5640acc52,
cb92aaf968, c75118c78d, 0cdec0acf1, d86ad992f1, 10819dda2a, b8ea0e3396, 04f23332c3, 7d5203f8a7, 0d1d750437, ad31d86a15,
991279e5c6, c184aae686, 14a6b0422b, aab0d3dd9e, 5a34b9f24e, a0a4858e00, 90645d7c0e, e8cec05d08, 2876a4e714, a903e1a726,
2fa6be55ff, 8896d1b78b, f688b903db, 21f9669836, 1a386ae4d5, 24f4e87f8b, b5eb0ef857, 78e8dbe008, 5c18ffb8d1, 620640a042,
ec469a117d, 190d82ddd8, 7a879980d8, 2adc61c215, d06dc22999, 9e3f04c5eb, afc4d08aad, 897e2def34, edc5d8dd92, d6b2a9d534,
dc97bd6b92, 60c6eb2610, 9133505952, 2741bf00e4, 4eca00a666, c6804122cb, 189cbe25fe, 0632febfba, 67f51ded37, 14666f9be3,
2f101367db, 06cac68db7, 141179736e, 184c156e09, cf9ea4af10, 30fed916fa, 384e2ff399, 091883b8eb, cb6438e472, 1cdcc0da57,
6472fea0fd, dec92f60c6, 060ec6b68c, bf125d5da6, dd22140b57, d01ab205a9, e66e9d34ac, 0acdfd6a37, b252062767, dd6c1ac19a
contrib/libpqxx (vendored)
@@ -1 +1 @@
Subproject commit c995193a3a14d71f4711f1f421f65a1a1db64640
Subproject commit 41e4c331564167cca97ad6eccbd5b8879c2ca044
@@ -1,9 +1,9 @@
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/libpqxx")

set (SRCS
"${LIBRARY_DIR}/src/strconv.cxx"
"${LIBRARY_DIR}/src/array.cxx"
"${LIBRARY_DIR}/src/binarystring.cxx"
"${LIBRARY_DIR}/src/blob.cxx"
"${LIBRARY_DIR}/src/connection.cxx"
"${LIBRARY_DIR}/src/cursor.cxx"
"${LIBRARY_DIR}/src/encodings.cxx"

@@ -12,59 +12,25 @@ set (SRCS
"${LIBRARY_DIR}/src/field.cxx"
"${LIBRARY_DIR}/src/largeobject.cxx"
"${LIBRARY_DIR}/src/notification.cxx"
"${LIBRARY_DIR}/src/params.cxx"
"${LIBRARY_DIR}/src/pipeline.cxx"
"${LIBRARY_DIR}/src/result.cxx"
"${LIBRARY_DIR}/src/robusttransaction.cxx"
"${LIBRARY_DIR}/src/row.cxx"
"${LIBRARY_DIR}/src/sql_cursor.cxx"
"${LIBRARY_DIR}/src/strconv.cxx"
"${LIBRARY_DIR}/src/stream_from.cxx"
"${LIBRARY_DIR}/src/stream_to.cxx"
"${LIBRARY_DIR}/src/subtransaction.cxx"
"${LIBRARY_DIR}/src/time.cxx"
"${LIBRARY_DIR}/src/transaction.cxx"
"${LIBRARY_DIR}/src/transaction_base.cxx"
"${LIBRARY_DIR}/src/row.cxx"
"${LIBRARY_DIR}/src/params.cxx"
"${LIBRARY_DIR}/src/util.cxx"
"${LIBRARY_DIR}/src/version.cxx"
"${LIBRARY_DIR}/src/wait.cxx"
)

# Need to explicitly include each header file, because in the directory include/pqxx there are also files
# like just 'array'. So if including the whole directory with `target_include_directories`, it will make
# conflicts with all includes of <array>.
set (HDRS
"${LIBRARY_DIR}/include/pqxx/array.hxx"
"${LIBRARY_DIR}/include/pqxx/params.hxx"
"${LIBRARY_DIR}/include/pqxx/binarystring.hxx"
"${LIBRARY_DIR}/include/pqxx/composite.hxx"
"${LIBRARY_DIR}/include/pqxx/connection.hxx"
"${LIBRARY_DIR}/include/pqxx/cursor.hxx"
"${LIBRARY_DIR}/include/pqxx/dbtransaction.hxx"
"${LIBRARY_DIR}/include/pqxx/errorhandler.hxx"
"${LIBRARY_DIR}/include/pqxx/except.hxx"
"${LIBRARY_DIR}/include/pqxx/field.hxx"
"${LIBRARY_DIR}/include/pqxx/isolation.hxx"
"${LIBRARY_DIR}/include/pqxx/largeobject.hxx"
"${LIBRARY_DIR}/include/pqxx/nontransaction.hxx"
"${LIBRARY_DIR}/include/pqxx/notification.hxx"
"${LIBRARY_DIR}/include/pqxx/pipeline.hxx"
"${LIBRARY_DIR}/include/pqxx/prepared_statement.hxx"
"${LIBRARY_DIR}/include/pqxx/result.hxx"
"${LIBRARY_DIR}/include/pqxx/robusttransaction.hxx"
"${LIBRARY_DIR}/include/pqxx/row.hxx"
"${LIBRARY_DIR}/include/pqxx/separated_list.hxx"
"${LIBRARY_DIR}/include/pqxx/strconv.hxx"
"${LIBRARY_DIR}/include/pqxx/stream_from.hxx"
"${LIBRARY_DIR}/include/pqxx/stream_to.hxx"
"${LIBRARY_DIR}/include/pqxx/subtransaction.hxx"
"${LIBRARY_DIR}/include/pqxx/transaction.hxx"
"${LIBRARY_DIR}/include/pqxx/transaction_base.hxx"
"${LIBRARY_DIR}/include/pqxx/types.hxx"
"${LIBRARY_DIR}/include/pqxx/util.hxx"
"${LIBRARY_DIR}/include/pqxx/version.hxx"
"${LIBRARY_DIR}/include/pqxx/zview.hxx"
)

add_library(_libpqxx ${SRCS} ${HDRS})

add_library(_libpqxx ${SRCS})
target_link_libraries(_libpqxx PUBLIC ch_contrib::libpq)
target_include_directories (_libpqxx SYSTEM BEFORE PUBLIC "${LIBRARY_DIR}/include")
@@ -737,6 +737,14 @@ Number of sessions (connections) to ZooKeeper. Should be no more than one, becau

Number of watches (event subscriptions) in ZooKeeper.

### ConcurrencyControlAcquired

Total number of acquired CPU slots.

### ConcurrencyControlSoftLimit

Value of soft limit on number of CPU slots.

**See Also**

- [system.asynchronous_metrics](../../operations/system-tables/asynchronous_metrics.md#system_tables-asynchronous_metrics) — Contains periodically calculated metrics.
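For context, these two counters are maintained with scoped `CurrentMetrics::Increment` objects in the code changes further down. A minimal illustrative sketch of that pattern, not part of this changeset (the struct name is hypothetical; the metric name matches the diff):

```cpp
#include <Common/CurrentMetrics.h>

namespace CurrentMetrics
{
    extern const Metric ConcurrencyControlAcquired;
}

// Illustrative only: the scoped increment keeps the metric raised for the
// lifetime of the object, mirroring how ConcurrencyControl::Slot reports an
// acquired CPU slot in this changeset.
struct AcquiredSlotExample
{
    CurrentMetrics::Increment acquired_slot_increment{CurrentMetrics::ConcurrencyControlAcquired};
};
```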
@@ -1631,6 +1631,7 @@ try
concurrent_threads_soft_limit = value;
}
ConcurrencyControl::instance().setMaxConcurrency(concurrent_threads_soft_limit);
LOG_INFO(log, "ConcurrencyControl limit is set to {}", concurrent_threads_soft_limit);

global_context->getProcessList().setMaxSize(new_server_settings.max_concurrent_queries);
global_context->getProcessList().setMaxInsertQueriesAmount(new_server_settings.max_concurrent_insert_queries);

@@ -537,6 +537,7 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden
PullingAsyncPipelineExecutor executor(io.pipeline);
io.pipeline.setProgressCallback(context->getProgressCallback());
io.pipeline.setProcessListElement(context->getProcessListElement());
io.pipeline.setConcurrencyControl(context->getSettingsRef().use_concurrency_control);

Block block;

@@ -1701,6 +1701,9 @@ try
QueryPipeline pipeline(std::move(pipe));
PullingAsyncPipelineExecutor executor(pipeline);

/// Concurrency control in client is not required
pipeline.setConcurrencyControl(false);

if (need_render_progress)
{
pipeline.setProgressCallback([this](const Progress & progress){ onProgress(progress); });

@@ -240,6 +240,7 @@ void LocalConnection::sendQuery(
{
state->block = state->io.pipeline.getHeader();
state->executor = std::make_unique<PullingAsyncPipelineExecutor>(state->io.pipeline);
state->io.pipeline.setConcurrencyControl(false);
}
else if (state->io.pipeline.completed())
{
@@ -1,7 +1,23 @@
#include <Common/ISlotControl.h>
#include <Common/ConcurrencyControl.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>

namespace ProfileEvents
{
extern const Event ConcurrencyControlGrantedHard;
extern const Event ConcurrencyControlGrantDelayed;
extern const Event ConcurrencyControlAcquiredTotal;
extern const Event ConcurrencyControlAllocationDelayed;
}

namespace CurrentMetrics
{
extern const Metric ConcurrencyControlAcquired;
extern const Metric ConcurrencyControlSoftLimit;
}

namespace DB
{

@@ -17,6 +33,7 @@ ConcurrencyControl::Slot::~Slot()

ConcurrencyControl::Slot::Slot(SlotAllocationPtr && allocation_)
: allocation(std::move(allocation_))
, acquired_slot_increment(CurrentMetrics::ConcurrencyControlAcquired)
{
}

@@ -34,6 +51,7 @@ ConcurrencyControl::Allocation::~Allocation()
{
if (granted.compare_exchange_strong(value, value - 1))
{
ProfileEvents::increment(ProfileEvents::ConcurrencyControlAcquiredTotal, 1);
std::unique_lock lock{mutex};
return AcquiredSlotPtr(new Slot(shared_from_this())); // can't use std::make_shared due to private ctor
}

@@ -84,6 +102,7 @@ void ConcurrencyControl::Allocation::release()

ConcurrencyControl::ConcurrencyControl()
: cur_waiter(waiters.end())
, max_concurrency_metric(CurrentMetrics::ConcurrencyControlSoftLimit, 0)
{
}

@@ -103,11 +122,16 @@ ConcurrencyControl::~ConcurrencyControl()
// Acquire as many slots as we can, but not lower than `min`
SlotCount granted = std::max(min, std::min(max, available(lock)));
cur_concurrency += granted;
ProfileEvents::increment(ProfileEvents::ConcurrencyControlGrantedHard, min);

// Create allocation and start waiting if more slots are required
if (granted < max)
{
ProfileEvents::increment(ProfileEvents::ConcurrencyControlGrantDelayed, max - granted);
ProfileEvents::increment(ProfileEvents::ConcurrencyControlAllocationDelayed);
return SlotAllocationPtr(new Allocation(*this, max, granted,
waiters.insert(cur_waiter, nullptr /* pointer is set by Allocation ctor */)));
}
else
return SlotAllocationPtr(new Allocation(*this, max, granted));
}

@@ -116,6 +140,7 @@ void ConcurrencyControl::setMaxConcurrency(SlotCount value)
{
std::unique_lock lock{mutex};
max_concurrency = std::max<SlotCount>(1, value); // never allow max_concurrency to be zero
max_concurrency_metric.changeTo(max_concurrency == UnlimitedSlots ? 0 : max_concurrency);
schedule(lock);
}
@@ -8,6 +8,7 @@
#include <base/types.h>
#include <boost/core/noncopyable.hpp>

#include <Common/CurrentMetrics.h>
#include <Common/ISlotControl.h>

namespace DB

@@ -53,6 +54,7 @@ public:
explicit Slot(SlotAllocationPtr && allocation_);

SlotAllocationPtr allocation;
CurrentMetrics::Increment acquired_slot_increment;
};

// Manages group of slots for a single query, see ConcurrencyControl::allocate(min, max)

@@ -131,6 +133,7 @@ private:
Waiters::iterator cur_waiter; // round-robin pointer
SlotCount max_concurrency = UnlimitedSlots;
SlotCount cur_concurrency = 0;
CurrentMetrics::Increment max_concurrency_metric;
};

}
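A rough consumer-side sketch of the `allocate`/`tryAcquire` contract referenced above, mirroring the PipelineExecutor change later in this diff. The function and the worker-spawning comment are illustrative only, not part of the change:

```cpp
#include <Common/ConcurrencyControl.h>
#include <Common/ISlotControl.h>

// Illustrative sketch: ask for between 1 and max_threads CPU slots and start
// one worker per slot that is actually granted.
void runWithConcurrencyControl(size_t max_threads)
{
    DB::SlotAllocationPtr slots = DB::ConcurrencyControl::instance().allocate(/*min=*/1, /*max=*/max_threads);
    size_t workers = 0;
    while (DB::AcquiredSlotPtr slot = slots->tryAcquire())
    {
        ++workers; // each acquired slot backs one worker thread
        // spawnWorker(std::move(slot)); // hypothetical helper, not in the diff
    }
    // `workers` is at least 1 (the guaranteed slot) and at most max_threads.
}
```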
@@ -313,6 +313,9 @@
M(FilteringMarksWithPrimaryKey, "Number of threads currently doing filtering of mark ranges by the primary key") \
M(FilteringMarksWithSecondaryKeys, "Number of threads currently doing filtering of mark ranges by secondary keys") \
\
M(ConcurrencyControlAcquired, "Total number of acquired CPU slots") \
M(ConcurrencyControlSoftLimit, "Value of soft limit on number of CPU slots") \
\
M(DiskS3NoSuchKeyErrors, "The number of `NoSuchKey` errors that occur when reading data from S3 cloud storage through ClickHouse disks.") \

#ifdef APPLY_FOR_EXTERNAL_METRICS
@@ -73,4 +73,44 @@ public:
[[nodiscard]] virtual SlotAllocationPtr allocate(SlotCount min, SlotCount max) = 0;
};

/// Allocation that grants all the slots immediately on creation
class GrantedAllocation : public ISlotAllocation
{
public:
explicit GrantedAllocation(SlotCount granted_)
: granted(granted_)
, allocated(granted_)
{}

[[nodiscard]] AcquiredSlotPtr tryAcquire() override
{
SlotCount value = granted.load();
while (value)
{
if (granted.compare_exchange_strong(value, value - 1))
return std::make_shared<IAcquiredSlot>();
}
return {};
}

SlotCount grantedCount() const override
{
return granted.load();
}

SlotCount allocatedCount() const override
{
return allocated;
}

private:
std::atomic<SlotCount> granted; // allocated, but not yet acquired
const SlotCount allocated;
};

[[nodiscard]] inline SlotAllocationPtr grantSlots(SlotCount count)
{
return SlotAllocationPtr(new GrantedAllocation(count));
}

}
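A hedged sketch of how `grantSlots` is meant to be used when concurrency control is disabled: the caller still works against the same `ISlotAllocation` interface, but every slot is granted up front, which is exactly what the PipelineExecutor hunk below does. The wrapper function here is illustrative only:

```cpp
#include <Common/ISlotControl.h>

// Illustrative sketch: a dummy allocation that never waits and does not
// compete with other queries in ConcurrencyControl.
void runWithoutConcurrencyControl(size_t num_threads)
{
    DB::SlotAllocationPtr cpu_slots = DB::grantSlots(num_threads);
    // grantedCount() equals num_threads here, so all workers can start at once.
    size_t use_threads = cpu_slots->grantedCount();
    (void)use_threads;
}
```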
@@ -826,6 +826,11 @@ The server successfully detected this situation and will download merged part fr
M(ReadWriteBufferFromHTTPRequestsSent, "Number of HTTP requests sent by ReadWriteBufferFromHTTP") \
M(ReadWriteBufferFromHTTPBytes, "Total size of payload bytes received and sent by ReadWriteBufferFromHTTP. Doesn't include HTTP headers.") \
\
M(ConcurrencyControlGrantedHard, "Number of CPU slot granted according to guarantee of 1 thread per query and for queries with setting 'use_concurrency_control' = 0") \
M(ConcurrencyControlGrantDelayed, "Number of CPU slot not granted initially and required to wait for a free CPU slot") \
M(ConcurrencyControlAcquiredTotal, "Total number of CPU slot acquired") \
M(ConcurrencyControlAllocationDelayed, "Total number of CPU slot allocations (queries) that were required to wait for slots to upscale") \
\
M(GWPAsanAllocateSuccess, "Number of successful allocations done by GWPAsan") \
M(GWPAsanAllocateFailed, "Number of failed allocations done by GWPAsan (i.e. filled pool)") \
M(GWPAsanFree, "Number of free operations done by GWPAsan") \
@@ -151,6 +151,15 @@ Names NamesAndTypesList::getNames() const
return res;
}

NameSet NamesAndTypesList::getNameSet() const
{
NameSet res;
res.reserve(size());
for (const NameAndTypePair & column : *this)
res.insert(column.name);
return res;
}

DataTypes NamesAndTypesList::getTypes() const
{
DataTypes res;

@@ -100,6 +100,7 @@ public:
void getDifference(const NamesAndTypesList & rhs, NamesAndTypesList & deleted, NamesAndTypesList & added) const;

Names getNames() const;
NameSet getNameSet() const;
DataTypes getTypes() const;

/// Remove columns which names are not in the `names`.
@@ -666,6 +666,7 @@ void CacheDictionary<dictionary_key_type>::update(CacheDictionaryUpdateUnitPtr<d
Columns fetched_columns_during_update = fetch_request.makeAttributesResultColumnsNonMutable();

DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))
{

@@ -334,6 +334,7 @@ public:
, pipeline(std::move(pipeline_))
, executor(pipeline)
{
pipeline.setConcurrencyControl(false);
}

std::string getName() const override

@@ -378,7 +379,6 @@ Pipe DirectDictionary<dictionary_key_type>::getSourcePipe(
ids.emplace_back(key);

auto pipeline = source_ptr->loadIds(ids);

if (use_async_executor)
pipe = Pipe(std::make_shared<SourceFromQueryPipeline<PullingAsyncPipelineExecutor>>(std::move(pipeline)));
else

@@ -454,6 +454,7 @@ void FlatDictionary::updateData()
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
update_field_loaded_block.reset();
Block block;

@@ -495,6 +496,7 @@ void FlatDictionary::loadData()
{
QueryPipeline pipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);

Block block;
while (executor.pull(block))
@@ -475,6 +475,7 @@ void HashedArrayDictionary<dictionary_key_type, sharded>::updateData()
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
update_field_loaded_block.reset();
Block block;

@@ -973,6 +974,7 @@ void HashedArrayDictionary<dictionary_key_type, sharded>::loadData()

QueryPipeline pipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);

UInt64 pull_time_microseconds = 0;
UInt64 process_time_microseconds = 0;

@@ -884,6 +884,7 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::updateData()
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
update_field_loaded_block.reset();
Block block;

@@ -1163,6 +1164,7 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::loadData()
QueryPipeline pipeline(source_ptr->loadAll());

DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
@@ -407,6 +407,7 @@ void IPAddressDictionary::loadData()
bool has_ipv6 = false;

DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))
{

@@ -291,6 +291,7 @@ void IPolygonDictionary::loadData()
QueryPipeline pipeline(source_ptr->loadAll());

DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))
blockToAttributes(block);

@@ -541,6 +541,7 @@ void RangeHashedDictionary<dictionary_key_type>::loadData()
{
QueryPipeline pipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;

while (executor.pull(block))

@@ -692,6 +693,7 @@ void RangeHashedDictionary<dictionary_key_type>::updateData()
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
update_field_loaded_block.reset();
Block block;

@@ -314,6 +314,7 @@ void RegExpTreeDictionary::loadData()
{
QueryPipeline pipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);

Block block;
while (executor.pull(block))
@@ -193,6 +193,7 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr

PullingAsyncPipelineExecutor executor(io.pipeline);
io.pipeline.setProgressCallback(data.getContext()->getProgressCallback());
io.pipeline.setConcurrencyControl(data.getContext()->getSettingsRef().use_concurrency_control);
while (block.rows() == 0 && executor.pull(block))
{
}

@@ -77,7 +77,6 @@
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>

#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Parsers/formatAST.h>
#include <Parsers/QueryParameterVisitor.h>
@@ -338,11 +338,8 @@ size_t HashJoin::getTotalRowCount() const
return res;
}

size_t HashJoin::getTotalByteCount() const
void HashJoin::doDebugAsserts() const
{
if (!data)
return 0;

#ifndef NDEBUG
size_t debug_blocks_allocated_size = 0;
for (const auto & block : data->blocks)

@@ -360,6 +357,14 @@ size_t HashJoin::getTotalByteCount() const
throw Exception(ErrorCodes::LOGICAL_ERROR, "data->blocks_nullmaps_allocated_size != debug_blocks_nullmaps_allocated_size ({} != {})",
data->blocks_nullmaps_allocated_size, debug_blocks_nullmaps_allocated_size);
#endif
}

size_t HashJoin::getTotalByteCount() const
{
if (!data)
return 0;

doDebugAsserts();

size_t res = 0;

@@ -544,9 +549,11 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
have_compressed = true;
}

doDebugAsserts();
data->blocks_allocated_size += block_to_save.allocatedBytes();
data->blocks.emplace_back(std::move(block_to_save));
Block * stored_block = &data->blocks.back();
doDebugAsserts();

if (rows)
data->empty = false;

@@ -634,9 +641,11 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)

if (!flag_per_row && !is_inserted)
{
doDebugAsserts();
LOG_TRACE(log, "Skipping inserting block with {} rows", rows);
data->blocks_allocated_size -= stored_block->allocatedBytes();
data->blocks.pop_back();
doDebugAsserts();
}

if (!check_limits)
@@ -683,6 +692,8 @@ void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_

for (auto & stored_block : data->blocks)
{
doDebugAsserts();

size_t old_size = stored_block.allocatedBytes();
stored_block = stored_block.shrinkToFit();
size_t new_size = stored_block.allocatedBytes();

@@ -700,6 +711,8 @@ void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_
else
/// Sometimes after clone resized block can be bigger than original
data->blocks_allocated_size += new_size - old_size;

doDebugAsserts();
}

auto new_total_bytes_in_join = getTotalByteCount();

@@ -1416,7 +1429,13 @@ void HashJoin::tryRerangeRightTableDataImpl(Map & map [[maybe_unused]])
};
BlocksList sorted_blocks;
visit_rows_map(sorted_blocks, map);
doDebugAsserts();
data->blocks.swap(sorted_blocks);
size_t new_blocks_allocated_size = 0;
for (const auto & block : data->blocks)
new_blocks_allocated_size += block.allocatedBytes();
data->blocks_allocated_size = new_blocks_allocated_size;
doDebugAsserts();
}
}

@@ -470,6 +470,7 @@ private:
void tryRerangeRightTableData() override;
template <JoinKind KIND, typename Map, JoinStrictness STRICTNESS>
void tryRerangeRightTableDataImpl(Map & map);
void doDebugAsserts() const;
};

}
@@ -26,6 +26,8 @@
#include <Analyzer/TableFunctionNode.h>
#include <Analyzer/Utils.h>

#include <Core/Settings.h>

#include <Interpreters/Context.h>
#include <Interpreters/QueryLog.h>

@@ -249,6 +251,8 @@ QueryPipelineBuilder InterpreterSelectQueryAnalyzer::buildQueryPipeline()
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(context);
auto build_pipeline_settings = BuildQueryPipelineSettings::fromContext(context);

query_plan.setConcurrencyControl(context->getSettingsRef().use_concurrency_control);

return std::move(*query_plan.buildQueryPipeline(optimization_settings, build_pipeline_settings));
}
@@ -208,6 +208,7 @@ bool isStorageTouchedByMutations(
}

PullingAsyncPipelineExecutor executor(io.pipeline);
io.pipeline.setConcurrencyControl(context->getSettingsRef().use_concurrency_control);

Block block;
while (block.rows() == 0 && executor.pull(block));

@@ -1286,6 +1287,10 @@ void MutationsInterpreter::Source::read(

void MutationsInterpreter::initQueryPlan(Stage & first_stage, QueryPlan & plan)
{
// Mutations are not using concurrency control now. Queries, merges and mutations running together could lead to CPU overcommit.
// TODO(serxa): Enable concurrency control for mutation queries and mutations. This should be done after CPU scheduler introduction.
plan.setConcurrencyControl(false);

source.read(first_stage, plan, metadata_snapshot, context, settings.apply_deleted_mask, settings.can_execute);
addCreatingSetsStep(plan, first_stage.analyzer->getPreparedSets(), context);
}
@@ -722,7 +722,14 @@ void ASTFunction::formatImplWithoutAlias(const FormatSettings & settings, Format
assert_cast<const ASTFunction *>(argument.get())->arguments->children[0]->formatImpl(settings, state, nested_dont_need_parens);
settings.ostr << (settings.hilite ? hilite_operator : "") << " = " << (settings.hilite ? hilite_none : "");
}
if (!secret_arguments.replacement.empty())
{
settings.ostr << "'" << secret_arguments.replacement << "'";
}
else
{
settings.ostr << "'[HIDDEN]'";
}
if (size <= secret_arguments.start + secret_arguments.count && !secret_arguments.are_named)
break; /// All other arguments should also be hidden.
continue;
@@ -1,6 +1,7 @@
#pragma once

#include <Common/KnownObjectNames.h>
#include <Common/re2.h>
#include <Core/QualifiedTableName.h>
#include <base/defines.h>
#include <boost/algorithm/string/predicate.hpp>

@@ -49,6 +50,11 @@ public:
bool are_named = false; /// Arguments like `password = 'password'` are considered as named arguments.
/// E.g. "headers" in `url('..', headers('foo' = '[HIDDEN]'))`
std::vector<std::string> nested_maps;
/// Full replacement of an argument. Only supported when count is 1, otherwise all arguments will be replaced with this string.
/// It's needed in cases when we don't want to hide the entire parameter, but some part of it, e.g. "connection_string" in
/// `azureBlobStorage('DefaultEndpointsProtocol=https;AccountKey=secretkey;...', ...)` should be replaced with
/// `azureBlobStorage('DefaultEndpointsProtocol=https;AccountKey=[HIDDEN];...', ...)`.
std::string replacement;

bool hasSecrets() const
{

@@ -74,6 +80,7 @@ protected:
result.are_named = argument_is_named;
}
chassert(index >= result.start); /// We always check arguments consecutively
chassert(result.replacement.empty()); /// We shouldn't use replacement with masking other arguments
result.count = index + 1 - result.start;
if (!argument_is_named)
result.are_named = false;
@@ -199,32 +206,39 @@ protected:

void findAzureBlobStorageFunctionSecretArguments(bool is_cluster_function)
{
/// azureBlobStorage('cluster_name', 'conn_string/storage_account_url', ...) has 'conn_string/storage_account_url' as its second argument.
/// azureBlobStorageCluster('cluster_name', 'conn_string/storage_account_url', ...) has 'conn_string/storage_account_url' as its second argument.
size_t url_arg_idx = is_cluster_function ? 1 : 0;

if (!is_cluster_function && isNamedCollectionName(0))
{
/// azureBlobStorage(named_collection, ..., account_key = 'account_key', ...)
if (maskAzureConnectionString(-1, true, 1))
return;
findSecretNamedArgument("account_key", 1);
return;
}
else if (is_cluster_function && isNamedCollectionName(1))
{
/// azureBlobStorageCluster(cluster, named_collection, ..., account_key = 'account_key', ...)
if (maskAzureConnectionString(-1, true, 2))
return;
findSecretNamedArgument("account_key", 2);
return;
}

/// We should check other arguments first because we don't need to do any replacement in case storage_account_url is not used
/// azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure)
/// azureBlobStorageCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])
if (maskAzureConnectionString(url_arg_idx))
return;

/// We should check other arguments first because we don't need to do any replacement in case of
/// azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, format, [account_name, account_key, ...])
/// azureBlobStorageCluster(cluster, connection_string|storage_account_url, container_name, blobpath, format, [account_name, account_key, ...])
size_t count = function->arguments->size();
if ((url_arg_idx + 4 <= count) && (count <= url_arg_idx + 7))
{
String second_arg;
if (tryGetStringFromArgument(url_arg_idx + 3, &second_arg))
String fourth_arg;
if (tryGetStringFromArgument(url_arg_idx + 3, &fourth_arg))
{
if (second_arg == "auto" || KnownFormatNames::instance().exists(second_arg))
if (fourth_arg == "auto" || KnownFormatNames::instance().exists(fourth_arg))
return; /// The argument after 'url' is a format: s3('url', 'format', ...)
}
}
@@ -234,6 +248,40 @@ protected:
markSecretArgument(url_arg_idx + 4);
}

bool maskAzureConnectionString(ssize_t url_arg_idx, bool argument_is_named = false, size_t start = 0)
{
String url_arg;
if (argument_is_named)
{
url_arg_idx = findNamedArgument(&url_arg, "connection_string", start);
if (url_arg_idx == -1 || url_arg.empty())
url_arg_idx = findNamedArgument(&url_arg, "storage_account_url", start);
if (url_arg_idx == -1 || url_arg.empty())
return false;
}
else
{
if (!tryGetStringFromArgument(url_arg_idx, &url_arg))
return false;
}

if (!url_arg.starts_with("http"))
{
static re2::RE2 account_key_pattern = "AccountKey=.*?(;|$)";
if (RE2::Replace(&url_arg, account_key_pattern, "AccountKey=[HIDDEN]\\1"))
{
chassert(result.count == 0); /// We shouldn't use replacement with masking other arguments
result.start = url_arg_idx;
result.are_named = argument_is_named;
result.count = 1;
result.replacement = url_arg;
return true;
}
}

return false;
}

void findURLSecretArguments()
{
if (!isNamedCollectionName(0))
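A standalone sketch of what the `AccountKey` masking above produces for a sample connection string; it assumes only the re2 library and the same pattern as `maskAzureConnectionString`, and is not part of this change:

```cpp
#include <iostream>
#include <string>
#include <re2/re2.h>

int main()
{
    // Same pattern as in maskAzureConnectionString: replace the AccountKey value,
    // keeping the trailing ';' (or end of string) captured by the group.
    std::string conn = "DefaultEndpointsProtocol=https;AccountKey=secretkey;EndpointSuffix=core.windows.net";
    static const re2::RE2 account_key_pattern("AccountKey=.*?(;|$)");
    RE2::Replace(&conn, account_key_pattern, "AccountKey=[HIDDEN]\\1");
    std::cout << conn << std::endl;
    // Prints: DefaultEndpointsProtocol=https;AccountKey=[HIDDEN];EndpointSuffix=core.windows.net
}
```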
@@ -513,8 +561,9 @@ protected:
return function->arguments->at(arg_idx)->isIdentifier();
}

/// Looks for a secret argument with a specified name. This function looks for arguments in format `key=value` where the key is specified.
void findSecretNamedArgument(const std::string_view & key, size_t start = 0)
/// Looks for an argument with a specified name. This function looks for arguments in format `key=value` where the key is specified.
/// Returns -1 if no argument was found.
ssize_t findNamedArgument(String * res, const std::string_view & key, size_t start = 0)
{
for (size_t i = start; i < function->arguments->size(); ++i)
{

@@ -531,9 +580,23 @@ protected:
continue;

if (found_key == key)
markSecretArgument(i, /* argument_is_named= */ true);
{
tryGetStringFromArgument(*equals_func->arguments->at(1), res);
return i;
}
}

return -1;
}

/// Looks for a secret argument with a specified name. This function looks for arguments in format `key=value` where the key is specified.
/// If the argument is found, it is marked as a secret.
void findSecretNamedArgument(const std::string_view & key, size_t start = 0)
{
ssize_t arg_idx = findNamedArgument(nullptr, key, start);
if (arg_idx >= 0)
markSecretArgument(arg_idx, /* argument_is_named= */ true);
}
};

}
@@ -1,4 +1,5 @@
#include <IO/WriteBufferFromString.h>
#include "Common/ISlotControl.h"
#include <Common/ThreadPool.h>
#include <Common/CurrentThread.h>
#include <Common/CurrentMetrics.h>

@@ -342,12 +343,22 @@ void PipelineExecutor::initializeExecution(size_t num_threads, bool concurrency_
is_execution_initialized = true;
tryUpdateExecutionStatus(ExecutionStatus::NotStarted, ExecutionStatus::Executing);

size_t use_threads = num_threads;

if (concurrency_control)
{
/// Allocate CPU slots from concurrency control
size_t min_threads = concurrency_control ? 1uz : num_threads;
constexpr size_t min_threads = 1uz; // Number of threads that should be granted to every query no matter how many threads are already running in other queries
cpu_slots = ConcurrencyControl::instance().allocate(min_threads, num_threads);
use_threads = cpu_slots->grantedCount();
#ifndef NDEBUG
LOG_TEST(log, "Allocate CPU slots. min: {}, max: {}, granted: {}", min_threads, num_threads, cpu_slots->grantedCount());
#endif
}
else
{
/// If concurrency control is not used we should not even count threads as competing.
/// To avoid counting them in ConcurrencyControl, we create dummy slot allocation.
cpu_slots = grantSlots(num_threads);
}
size_t use_threads = cpu_slots->grantedCount();

Queue queue;
graph->initializeExecution(queue);
@@ -199,6 +199,7 @@ QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
last_pipeline->setProgressCallback(build_pipeline_settings.progress_callback);
last_pipeline->setProcessListElement(build_pipeline_settings.process_list_element);
last_pipeline->addResources(std::move(resources));
last_pipeline->setConcurrencyControl(getConcurrencyControl());

return last_pipeline;
}

@@ -300,7 +300,8 @@ QueryPipelineBuilder QueryPipelineBuilder::unitePipelines(
/// It may happen if max_distributed_connections > max_threads
max_threads_limit = std::max(pipeline.max_threads, max_threads_limit);

concurrency_control = pipeline.getConcurrencyControl();
// Use concurrency control if at least one of pipelines is using it
concurrency_control = concurrency_control || pipeline.getConcurrencyControl();
}

QueryPipelineBuilder pipeline;

@@ -311,8 +312,8 @@ QueryPipelineBuilder QueryPipelineBuilder::unitePipelines(
{
pipeline.setMaxThreads(max_threads);
pipeline.limitMaxThreads(max_threads_limit);
pipeline.setConcurrencyControl(concurrency_control);
}
pipeline.setConcurrencyControl(concurrency_control);

pipeline.setCollectedProcessors(nullptr);
return pipeline;
@@ -1232,6 +1232,7 @@ namespace
if (io.pipeline.pulling())
{
auto executor = std::make_shared<PullingAsyncPipelineExecutor>(io.pipeline);
io.pipeline.setConcurrencyControl(query_context->getSettingsRef().use_concurrency_control);
auto check_for_cancel = [&]
{
if (isQueryCancelled())

@@ -1086,6 +1086,7 @@ void TCPHandler::processOrdinaryQuery()

{
PullingAsyncPipelineExecutor executor(pipeline);
pipeline.setConcurrencyControl(query_context->getSettingsRef().use_concurrency_control);
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};

/// The following may happen:
@@ -431,6 +431,7 @@ void StorageLiveView::writeBlock(StorageLiveView & live_view, Block && block, Ch

auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingAsyncPipelineExecutor executor(pipeline);
pipeline.setConcurrencyControl(local_context->getSettingsRef().use_concurrency_control);
Block this_block;

while (executor.pull(this_block))

@@ -567,6 +568,7 @@ MergeableBlocksPtr StorageLiveView::collectMergeableBlocks(ContextPtr local_cont

auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingAsyncPipelineExecutor executor(pipeline);
pipeline.setConcurrencyControl(local_context->getSettingsRef().use_concurrency_control);
Block this_block;

while (executor.pull(this_block))

@@ -672,6 +674,7 @@ bool StorageLiveView::getNewBlocks(const std::lock_guard<std::mutex> & lock)
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));

PullingAsyncPipelineExecutor executor(pipeline);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))
{
@@ -1727,6 +1727,10 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
auto builder = merge_parts_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);

// Merges are not using concurrency control now. Queries and merges running together could lead to CPU overcommit.
// TODO(serxa): Enable concurrency control for merges. This should be done after CPU scheduler introduction.
builder->setConcurrencyControl(false);

global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
}

@@ -2057,6 +2057,7 @@ bool MutateTask::prepare()
context_for_reading->setSetting("apply_mutations_on_fly", false);
/// Skip using large sets in KeyCondition
context_for_reading->setSetting("use_index_for_in_with_subqueries_max_values", 100000);
context_for_reading->setSetting("use_concurrency_control", false);

for (const auto & command : *ctx->commands)
if (!canSkipMutationCommandForPart(ctx->source_part, command, context_for_reading))
@@ -223,7 +223,7 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
{
account_name = fourth_arg;
account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key");
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/account_name");
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/structure");
if (is_format_arg(sixth_arg))
{
format = sixth_arg;

@@ -257,10 +257,10 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
}
else if (with_structure && engine_args.size() == 8)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name");
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "account_name");
account_name = fourth_arg;
account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key");
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/account_name");
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format");
if (!is_format_arg(sixth_arg))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg);
format = sixth_arg;
@@ -131,7 +131,7 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
else
{
ConfigurationPtr copy_configuration = configuration->clone();
auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, local_context);
if (filter_dag)
{
auto keys = configuration->getPaths();

@@ -142,7 +142,7 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc

VirtualColumnUtils::buildSetsForDAG(*filter_dag, local_context);
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
VirtualColumnUtils::filterByPathOrFile(keys, paths, actions, virtual_columns);
VirtualColumnUtils::filterByPathOrFile(keys, paths, actions, virtual_columns, local_context);
copy_configuration->setPaths(keys);
}

@@ -489,6 +489,7 @@ StorageObjectStorageSource::GlobIterator::GlobIterator(
, virtual_columns(virtual_columns_)
, throw_on_zero_files_match(throw_on_zero_files_match_)
, read_keys(read_keys_)
, local_context(context_)
, file_progress_callback(file_progress_callback_)
{
if (configuration->isNamespaceWithGlobs())

@@ -510,7 +511,7 @@ StorageObjectStorageSource::GlobIterator::GlobIterator(
}

recursive = key_with_globs == "/**";
if (auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns))
if (auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, local_context))
{
VirtualColumnUtils::buildSetsForDAG(*filter_dag, getContext());
filter_expr = std::make_shared<ExpressionActions>(std::move(*filter_dag));

@@ -585,7 +586,7 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::GlobIterator::ne
for (const auto & object_info : new_batch)
paths.push_back(getUniqueStoragePathIdentifier(*configuration, *object_info, false));

VirtualColumnUtils::filterByPathOrFile(new_batch, paths, filter_expr, virtual_columns);
VirtualColumnUtils::filterByPathOrFile(new_batch, paths, filter_expr, virtual_columns, local_context);

LOG_TEST(logger, "Filtered files: {} -> {}", paths.size(), new_batch.size());
}

@@ -220,6 +220,7 @@ private:
bool is_finished = false;
bool first_iteration = true;
std::mutex next_mutex;
const ContextPtr local_context;

std::function<void(FileProgress)> file_progress_callback;
};
@@ -1141,13 +1141,13 @@ StorageFileSource::FilesIterator::FilesIterator(
{
std::optional<ActionsDAG> filter_dag;
if (!distributed_processing && !archive_info && !files.empty())
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, context_);

if (filter_dag)
{
VirtualColumnUtils::buildSetsForDAG(*filter_dag, context_);
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
VirtualColumnUtils::filterByPathOrFile(files, files, actions, virtual_columns);
VirtualColumnUtils::filterByPathOrFile(files, files, actions, virtual_columns, context_);
}
}

@@ -227,7 +227,7 @@ public:

std::optional<ActionsDAG> filter_dag;
if (!uris.empty())
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, context);

if (filter_dag)
{

@@ -238,7 +238,7 @@ public:

VirtualColumnUtils::buildSetsForDAG(*filter_dag, context);
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
VirtualColumnUtils::filterByPathOrFile(uris, paths, actions, virtual_columns);
VirtualColumnUtils::filterByPathOrFile(uris, paths, actions, virtual_columns, context);
}
}
@@ -1,5 +1,6 @@
#include <memory>
#include <stack>
#include <unordered_set>
#include <Core/NamesAndTypes.h>
#include <Core/TypeId.h>

@@ -46,6 +47,7 @@
#include "Functions/IFunction.h"
#include "Functions/IFunctionAdaptors.h"
#include "Functions/indexHint.h"
#include <IO/ReadBufferFromString.h>
#include <Interpreters/convertFieldToType.h>
#include <Parsers/makeASTForLogicalFunction.h>
#include <Columns/ColumnSet.h>

@@ -124,9 +126,18 @@ void filterBlockWithExpression(const ExpressionActionsPtr & actions, Block & blo
}
}

NamesAndTypesList getCommonVirtualsForFileLikeStorage()
{
return {{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_size", makeNullable(std::make_shared<DataTypeUInt64>())},
{"_time", makeNullable(std::make_shared<DataTypeDateTime>())},
{"_etag", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
}

NameSet getVirtualNamesForFileLikeStorage()
{
return {"_path", "_file", "_size", "_time", "_etag"};
return getCommonVirtualsForFileLikeStorage().getNameSet();
}

std::unordered_map<std::string, std::string> parseHivePartitioningKeysAndValues(const String & path)
@@ -154,8 +165,10 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto
{
VirtualColumnsDescription desc;

auto add_virtual = [&](const auto & name, const auto & type)
auto add_virtual = [&](const NameAndTypePair & pair)
{
const auto & name = pair.getNameInStorage();
const auto & type = pair.getTypeInStorage();
if (storage_columns.has(name))
{
if (!context->getSettingsRef().use_hive_partitioning)

@@ -172,11 +185,8 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto
desc.addEphemeral(name, type, "");
};

add_virtual("_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
add_virtual("_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
add_virtual("_size", makeNullable(std::make_shared<DataTypeUInt64>()));
add_virtual("_time", makeNullable(std::make_shared<DataTypeDateTime>()));
add_virtual("_etag", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
for (const auto & item : getCommonVirtualsForFileLikeStorage())
add_virtual(item);

if (context->getSettingsRef().use_hive_partitioning)
{

@@ -188,16 +198,16 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto
if (type == nullptr)
type = std::make_shared<DataTypeString>();
if (type->canBeInsideLowCardinality())
add_virtual(item.first, std::make_shared<DataTypeLowCardinality>(type));
add_virtual({item.first, std::make_shared<DataTypeLowCardinality>(type)});
else
add_virtual(item.first, type);
add_virtual({item.first, type});
}
}

return desc;
}

static void addPathAndFileToVirtualColumns(Block & block, const String & path, size_t idx)
static void addPathAndFileToVirtualColumns(Block & block, const String & path, size_t idx, const FormatSettings & format_settings, bool use_hive_partitioning)
{
if (block.has("_path"))
block.getByName("_path").column->assumeMutableRef().insert(path);
@@ -214,18 +224,34 @@ static void addPathAndFileToVirtualColumns(Block & block, const String & path, s
block.getByName("_file").column->assumeMutableRef().insert(file);
}

if (use_hive_partitioning)
{
auto keys_and_values = parseHivePartitioningKeysAndValues(path);
for (const auto & [key, value] : keys_and_values)
{
if (const auto * column = block.findByName(key))
{
ReadBufferFromString buf(value);
column->type->getDefaultSerialization()->deserializeWholeText(column->column->assumeMutableRef(), buf, format_settings);
}
}
}

block.getByName("_idx").column->assumeMutableRef().insert(idx);
}

std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns)
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
if (!predicate || virtual_columns.empty())
return {};

Block block;
NameSet common_virtuals;
if (context->getSettingsRef().use_hive_partitioning)
common_virtuals = getVirtualNamesForFileLikeStorage();
for (const auto & column : virtual_columns)
{
if (column.name == "_file" || column.name == "_path")
if (column.name == "_file" || column.name == "_path" || !common_virtuals.contains(column.name))
block.insert({column.type->createColumn(), column.type, column.name});
}

@@ -233,18 +259,19 @@ std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * pr
return splitFilterDagForAllowedInputs(predicate, &block);
}

ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns)
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
Block block;
NameSet common_virtuals = getVirtualNamesForFileLikeStorage();
for (const auto & column : virtual_columns)
{
if (column.name == "_file" || column.name == "_path")
if (column.name == "_file" || column.name == "_path" || !common_virtuals.contains(column.name))
block.insert({column.type->createColumn(), column.type, column.name});
}
block.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});

for (size_t i = 0; i != paths.size(); ++i)
addPathAndFileToVirtualColumns(block, paths[i], i);
addPathAndFileToVirtualColumns(block, paths[i], i, getFormatSettings(context), context->getSettingsRef().use_hive_partitioning);

filterBlockWithExpression(actions, block);
@@ -75,14 +75,14 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(
const std::string & sample_path = "",
std::optional<FormatSettings> format_settings_ = std::nullopt);

std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns);
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context);

ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns);
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context);

template <typename T>
void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns)
void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
auto indexes_column = getFilterByPathAndFileIndexes(paths, actions, virtual_columns);
auto indexes_column = getFilterByPathAndFileIndexes(paths, actions, virtual_columns, context);
const auto & indexes = typeid_cast<const ColumnUInt64 &>(*indexes_column).getData();
if (indexes.size() == sources.size())
return;
@@ -646,6 +646,7 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));

PullingAsyncPipelineExecutor executor(pipeline);
pipeline.setConcurrencyControl(getContext()->getSettingsRef().use_concurrency_control);
Block block;
BlocksPtr new_blocks = std::make_shared<Blocks>();

@@ -112,6 +112,7 @@ Block TableFunctionFormat::parseData(const ColumnsDescription & columns, const S
});
}

builder.setConcurrencyControl(context->getSettingsRef().use_concurrency_control);
auto pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
auto reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
@@ -139,13 +139,20 @@ def main():
        jr = JobReport.load(from_file=report_file)
        additional_files.append(report_file)
        for file in set(jr.additional_files):
            file_ = Path(file)
            file_name = file_.name
            orig_file = Path(file)
            file_name = orig_file.name
            file_name = file_name.replace(
                ".", "__" + CI.Utils.normalize_string(job_id) + ".", 1
            )
            file_ = file_.rename(file_.parent / file_name)
            additional_files.append(file_)
            new_file = orig_file.rename(orig_file.parent / file_name)
            for tr in test_results:
                if tr.log_files is None:
                    continue
                tr.log_files = [
                    new_file if (Path(log_file) == orig_file) else Path(log_file)
                    for log_file in tr.log_files
                ]
            additional_files.append(new_file)

    JobReport(
        description=description,
@@ -559,7 +559,7 @@ class CI:
        JobNames.BUGFIX_VALIDATE: JobConfig(
            run_by_label="pr-bugfix",
            run_command="bugfix_validate_check.py",
            timeout=900,
            timeout=2400,
            runner_type=Runners.STYLE_CHECKER,
        ),
    }
@@ -1,7 +1,7 @@
import logging
import os
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Union
import os
import logging

from env_helper import (
    GITHUB_JOB_URL,

@@ -12,6 +12,8 @@ from env_helper import (
from report import TestResults, create_test_html_report
from s3_helper import S3Helper

logger = logging.getLogger(__name__)


def process_logs(
    s3_client: S3Helper,

@@ -19,7 +21,7 @@ def process_logs(
    s3_path_prefix: str,
    test_results: TestResults,
) -> List[str]:
    logging.info("Upload files to s3 %s", additional_logs)
    logger.info("Upload files to s3 %s", additional_logs)

    processed_logs = {}  # type: Dict[str, str]
    # Firstly convert paths of logs from test_results to urls to s3.
@@ -33,9 +35,19 @@ def process_logs(
            if path in processed_logs:
                test_result.log_urls.append(processed_logs[str(path)])
            elif path:
                try:
                    url = s3_client.upload_test_report_to_s3(
                        Path(path), s3_path_prefix + "/" + str(path)
                    )
                except FileNotFoundError:
                    # Breaking the whole run on the malformed test is a bad idea
                    # FIXME: report the failure
                    logger.error(
                        "A broken TestResult, file '%s' does not exist: %s",
                        path,
                        test_result,
                    )
                    continue
                test_result.log_urls.append(url)
                processed_logs[str(path)] = url

@@ -48,7 +60,7 @@ def process_logs(
                )
            )
        else:
            logging.error("File %s is missing - skip", log_path)
            logger.error("File %s is missing - skip", log_path)

    return additional_urls

@@ -116,8 +128,8 @@ def upload_results(
        report_path.write_text(html_report, encoding="utf-8")
        url = s3_client.upload_test_report_to_s3(report_path, s3_path_prefix + ".html")
    else:
        logging.info("report.html was prepared by test job itself")
        logger.info("report.html was prepared by test job itself")
        url = ready_report_url

    logging.info("Search result in url %s", url)
    logger.info("Search result in url %s", url)
    return url

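The hunks in this file replace direct logging.info/error calls with a module-level logger, so records are attributed to the module name instead of the root logger. The usual pattern, as a small sketch:

    import logging

    logger = logging.getLogger(__name__)  # named per module instead of the root logger

    def upload(path: str) -> None:
        logger.info("Upload files to s3 %s", path)  # lazy %-formatting, evaluated only if emitted
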
@@ -1,4 +1,5 @@
<clickhouse>
    <concurrent_threads_soft_limit_ratio_to_cores>0</concurrent_threads_soft_limit_ratio_to_cores>
    <concurrent_threads_soft_limit_num>50</concurrent_threads_soft_limit_num>
    <query_log>
        <database>system</database>

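The config hunk above caps the server at 50 concurrently scheduled query threads via concurrent_threads_soft_limit_num while disabling the ratio-based limit. A hedged sketch of how such a node could be declared and checked in the integration-test style used below, assuming a started ClickHouseCluster named cluster (the instance name and config path are illustrative, not taken from the PR):

    node = cluster.add_instance(
        "node_with_soft_limit",  # hypothetical instance name
        main_configs=["configs/concurrent_threads_soft_limit_defined_50.xml"],  # illustrative path
    )
    node.query("SELECT count(*) FROM numbers_mt(10000000)", query_id="soft_limit_probe")
    node.query("SYSTEM FLUSH LOGS")
    threads = int(
        node.query(
            "select length(thread_ids) from system.query_log "
            "where query_id = 'soft_limit_probe' and type = 'QueryFinish' "
            "order by query_start_time_microseconds desc limit 1"
        )
    )
    assert threads <= 52  # ~50 worker threads plus a couple of service threads
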
@@ -28,6 +28,16 @@ node4 = cluster.add_instance(
)


def assert_profile_event(node, query_id, profile_event, check):
    assert check(
        int(
            node.query(
                f"select ProfileEvents['{profile_event}'] from system.query_log where current_database = currentDatabase() and query_id = '{query_id}' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1"
            )
        )
    )


@pytest.fixture(scope="module")
def started_cluster():
    try:

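assert_profile_event wraps the common pattern of reading one ProfileEvents counter for a finished query and applying a predicate to it. Usage is as simple as (illustrative query id):

    # Expect exactly one hard grant for the query, i.e. the scheduler admitted it once.
    assert_profile_event(
        node1,
        "some_query_id",  # hypothetical; the real tests pass the query_id they just ran
        "ConcurrencyControlGrantedHard",
        lambda x: x == 1,
    )
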
@@ -43,39 +53,176 @@ def test_concurrent_threads_soft_limit_default(started_cluster):
        query_id="test_concurrent_threads_soft_limit_1",
    )
    node1.query("SYSTEM FLUSH LOGS")
    assert_profile_event(
        node1,
        "test_concurrent_threads_soft_limit_1",
        "ConcurrencyControlGrantedHard",
        lambda x: x == 1,
    )
    assert_profile_event(
        node1,
        "test_concurrent_threads_soft_limit_1",
        "ConcurrencyControlGrantDelayed",
        lambda x: x == 0,
    )
    assert_profile_event(
        node1,
        "test_concurrent_threads_soft_limit_1",
        "ConcurrencyControlAcquiredTotal",
        lambda x: x == 100,
    )
    assert_profile_event(
        node1,
        "test_concurrent_threads_soft_limit_1",
        "ConcurrencyControlAllocationDelayed",
        lambda x: x == 0,
    )
    assert (
        node1.query(
            "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_1'"
            "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_1' order by query_start_time_microseconds desc limit 1"
        )
        == "102\n"
    )


@pytest.mark.skip(reason="broken test")
def test_use_concurrency_control_default(started_cluster):
    node1.query(
        "SELECT count(*) FROM numbers_mt(10000000) SETTINGS use_concurrency_control = 0",
        query_id="test_use_concurrency_control",
    )

    # Concurrency control is not used, all metrics should be zeros
    node1.query("SYSTEM FLUSH LOGS")
    assert_profile_event(
        node1,
        "test_use_concurrency_control",
        "ConcurrencyControlGrantedHard",
        lambda x: x == 0,
    )
    assert_profile_event(
        node1,
        "test_use_concurrency_control",
        "ConcurrencyControlGrantDelayed",
        lambda x: x == 0,
    )
    assert_profile_event(
        node1,
        "test_use_concurrency_control",
        "ConcurrencyControlAcquiredTotal",
        lambda x: x == 0,
    )
    assert_profile_event(
        node1,
        "test_use_concurrency_control",
        "ConcurrencyControlAllocationDelayed",
        lambda x: x == 0,
    )


def test_concurrent_threads_soft_limit_defined_50(started_cluster):
    node2.query(
        "SELECT count(*) FROM numbers_mt(10000000)",
        query_id="test_concurrent_threads_soft_limit_2",
    )
    node2.query("SYSTEM FLUSH LOGS")
    assert_profile_event(
        node2,
        "test_concurrent_threads_soft_limit_2",
        "ConcurrencyControlGrantedHard",
        lambda x: x == 1,
    )
    assert_profile_event(
        node2,
        "test_concurrent_threads_soft_limit_2",
        "ConcurrencyControlGrantDelayed",
        lambda x: x == 50,
    )
    assert_profile_event(
        node2,
        "test_concurrent_threads_soft_limit_2",
        "ConcurrencyControlAcquiredTotal",
        lambda x: x == 50,
    )
    assert_profile_event(
        node2,
        "test_concurrent_threads_soft_limit_2",
        "ConcurrencyControlAllocationDelayed",
        lambda x: x == 1,
    )
    assert (
        node2.query(
            "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_2'"
            "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_2' order by query_start_time_microseconds desc limit 1"
        )
        == "52\n"
    )


@pytest.mark.skip(reason="broken test")
def test_use_concurrency_control_soft_limit_defined_50(started_cluster):
    node2.query(
        "SELECT count(*) FROM numbers_mt(10000000) SETTINGS use_concurrency_control = 0",
        query_id="test_use_concurrency_control_2",
    )
    # Concurrency control is not used, all metrics should be zeros
    node2.query("SYSTEM FLUSH LOGS")
    assert_profile_event(
        node2,
        "test_use_concurrency_control_2",
        "ConcurrencyControlGrantedHard",
        lambda x: x == 0,
    )
    assert_profile_event(
        node2,
        "test_use_concurrency_control_2",
        "ConcurrencyControlGrantDelayed",
        lambda x: x == 0,
    )
    assert_profile_event(
        node2,
        "test_use_concurrency_control_2",
        "ConcurrencyControlAcquiredTotal",
        lambda x: x == 0,
    )
    assert_profile_event(
        node2,
        "test_use_concurrency_control_2",
        "ConcurrencyControlAllocationDelayed",
        lambda x: x == 0,
    )


def test_concurrent_threads_soft_limit_defined_1(started_cluster):
    node3.query(
        "SELECT count(*) FROM numbers_mt(10000000)",
        query_id="test_concurrent_threads_soft_limit_3",
    )
    node3.query("SYSTEM FLUSH LOGS")
    assert_profile_event(
        node3,
        "test_concurrent_threads_soft_limit_3",
        "ConcurrencyControlGrantedHard",
        lambda x: x == 1,
    )
    assert_profile_event(
        node3,
        "test_concurrent_threads_soft_limit_3",
        "ConcurrencyControlGrantDelayed",
        lambda x: x == 99,
    )
    assert_profile_event(
        node3,
        "test_concurrent_threads_soft_limit_3",
        "ConcurrencyControlAcquiredTotal",
        lambda x: x == 1,
    )
    assert_profile_event(
        node3,
        "test_concurrent_threads_soft_limit_3",
        "ConcurrencyControlAllocationDelayed",
        lambda x: x == 1,
    )
    assert (
        node3.query(
            "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_3'"
            "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_3' order by query_start_time_microseconds desc limit 1"
        )
        == "3\n"
    )

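All of the rewritten query_log lookups now end with `order by query_start_time_microseconds desc limit 1`. Without it, a retried test run leaves several QueryFinish rows with the same query_id and the bare SELECT returns multiple lines, breaking the string comparison; picking the latest row keeps the assertion stable. A sketch of the helper these queries effectively implement (names are illustrative):

    def latest_thread_count(node, query_id):
        # Only the most recent finished run of this query_id is considered.
        return int(
            node.query(
                f"select length(thread_ids) from system.query_log "
                f"where current_database = currentDatabase() and type = 'QueryFinish' "
                f"and query_id = '{query_id}' "
                f"order by query_start_time_microseconds desc limit 1"
            )
        )
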
@@ -84,7 +231,6 @@ def test_concurrent_threads_soft_limit_defined_1(started_cluster):
# In config_limit_reached.xml there is concurrent_threads_soft_limit=10
# Background query starts in a separate thread to reach this limit.
# When this limit is reached the foreground query gets less than 5 queries despite the fact that it has settings max_threads=5
@pytest.mark.skip(reason="broken test")
def test_concurrent_threads_soft_limit_limit_reached(started_cluster):
    def background_query():
        try:

@@ -117,8 +263,32 @@ def test_concurrent_threads_soft_limit_limit_reached(started_cluster):
    )

    node4.query("SYSTEM FLUSH LOGS")
    assert_profile_event(
        node4,
        "test_concurrent_threads_soft_limit_4",
        "ConcurrencyControlGrantedHard",
        lambda x: x == 1,
    )
    assert_profile_event(
        node4,
        "test_concurrent_threads_soft_limit_4",
        "ConcurrencyControlGrantDelayed",
        lambda x: x > 0,
    )
    assert_profile_event(
        node4,
        "test_concurrent_threads_soft_limit_4",
        "ConcurrencyControlAcquiredTotal",
        lambda x: x < 5,
    )
    assert_profile_event(
        node4,
        "test_concurrent_threads_soft_limit_4",
        "ConcurrencyControlAllocationDelayed",
        lambda x: x == 1,
    )
    s_count = node4.query(
        "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_4'"
        "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_4' order by query_start_time_microseconds desc limit 1"
    ).strip()
    if s_count:
        count = int(s_count)

@@ -1,5 +1,6 @@
import pytest
import random, string
import re
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV

@@ -336,6 +337,10 @@ def test_create_database():
def test_table_functions():
    password = new_password()
    azure_conn_string = cluster.env_variables["AZURITE_CONNECTION_STRING"]
    account_key_pattern = re.compile("AccountKey=.*?(;|$)")
    masked_azure_conn_string = re.sub(
        account_key_pattern, "AccountKey=[HIDDEN]\\1", azure_conn_string
    )
    azure_storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
    azure_account_name = "devstoreaccount1"
    azure_account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="

@@ -467,23 +472,23 @@ def test_table_functions():
            "CREATE TABLE tablefunc30 (x int) AS s3('http://minio1:9001/root/data/test9.csv.gz', 'NOSIGN', 'CSV')",
            "CREATE TABLE tablefunc31 (`x` int) AS s3('http://minio1:9001/root/data/test10.csv.gz', 'minio', '[HIDDEN]')",
            "CREATE TABLE tablefunc32 (`x` int) AS deltaLake('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
            f"CREATE TABLE tablefunc33 (x int) AS azureBlobStorage('{azure_conn_string}', 'cont', 'test_simple.csv', 'CSV')",
            f"CREATE TABLE tablefunc34 (x int) AS azureBlobStorage('{azure_conn_string}', 'cont', 'test_simple_1.csv', 'CSV', 'none')",
            f"CREATE TABLE tablefunc35 (x int) AS azureBlobStorage('{azure_conn_string}', 'cont', 'test_simple_2.csv', 'CSV', 'none', 'auto')",
            f"CREATE TABLE tablefunc33 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple.csv', 'CSV')",
            f"CREATE TABLE tablefunc34 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple_1.csv', 'CSV', 'none')",
            f"CREATE TABLE tablefunc35 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple_2.csv', 'CSV', 'none', 'auto')",
            f"CREATE TABLE tablefunc36 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_3.csv', '{azure_account_name}', '[HIDDEN]')",
            f"CREATE TABLE tablefunc37 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_4.csv', '{azure_account_name}', '[HIDDEN]', 'CSV')",
            f"CREATE TABLE tablefunc38 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_5.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none')",
            f"CREATE TABLE tablefunc39 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')",
            f"CREATE TABLE tablefunc40 (x int) AS azureBlobStorage(named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')",
            f"CREATE TABLE tablefunc40 (`x` int) AS azureBlobStorage(named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')",
            f"CREATE TABLE tablefunc41 (`x` int) AS azureBlobStorage(named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')",
            f"CREATE TABLE tablefunc42 (x int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple_9.csv', 'CSV')",
            f"CREATE TABLE tablefunc43 (x int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple_10.csv', 'CSV', 'none')",
            f"CREATE TABLE tablefunc44 (x int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple_11.csv', 'CSV', 'none', 'auto')",
            f"CREATE TABLE tablefunc42 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_9.csv', 'CSV')",
            f"CREATE TABLE tablefunc43 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_10.csv', 'CSV', 'none')",
            f"CREATE TABLE tablefunc44 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_11.csv', 'CSV', 'none', 'auto')",
            f"CREATE TABLE tablefunc45 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_12.csv', '{azure_account_name}', '[HIDDEN]')",
            f"CREATE TABLE tablefunc46 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_13.csv', '{azure_account_name}', '[HIDDEN]', 'CSV')",
            f"CREATE TABLE tablefunc47 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_14.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none')",
            f"CREATE TABLE tablefunc48 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_15.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')",
            f"CREATE TABLE tablefunc49 (x int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_16.csv', format = 'CSV')",
            f"CREATE TABLE tablefunc49 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_16.csv', format = 'CSV')",
            f"CREATE TABLE tablefunc50 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_17.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')",
            "CREATE TABLE tablefunc51 (`x` int) AS iceberg('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
        ],

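The test now masks the AccountKey part of the Azurite connection string before comparing against server output, because the server itself hides that credential. The masking is plain re.sub; a standalone sketch with a made-up connection string:

    import re

    conn = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=secret123;BlobEndpoint=http://azurite:10000/devstoreaccount1;"
    masked = re.sub(r"AccountKey=.*?(;|$)", r"AccountKey=[HIDDEN]\1", conn)
    # -> "...;AccountKey=[HIDDEN];BlobEndpoint=http://azurite:10000/devstoreaccount1;"
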
@@ -3,6 +3,7 @@
set log_queries=1;
set log_query_threads=1;
set max_threads=0;
set use_concurrency_control=0;

WITH 01091 AS id SELECT 1;
SYSTEM FLUSH LOGS;

@@ -6,6 +6,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

${CLICKHOUSE_CLIENT} --log_queries=1 --max_threads=32 --query_id "2015_${CLICKHOUSE_DATABASE}_query" -q "select count() from remote('127.0.0.{2,3}', numbers(10)) where number global in (select number % 5 from numbers_mt(1000000))"
${CLICKHOUSE_CLIENT} --log_queries=1 --max_threads=32 --use_concurrency_control=0 --query_id "2015_${CLICKHOUSE_DATABASE}_query" -q "select count() from remote('127.0.0.{2,3}', numbers(10)) where number global in (select number % 5 from numbers_mt(1000000))"
${CLICKHOUSE_CLIENT} -q "system flush logs"
${CLICKHOUSE_CLIENT} -q "select length(thread_ids) >= 32 from system.query_log where event_date >= yesterday() and query_id = '2015_${CLICKHOUSE_DATABASE}_query' and type = 'QueryFinish' and current_database = currentDatabase()"

@@ -5,6 +5,9 @@
-- enforce some defaults to be sure that the env settings will not affect the test
SET max_threads=5, async_socket_for_remote=1, prefer_localhost_replica=1, optimize_read_in_order=1, load_marks_asynchronously=0, local_filesystem_read_method='pread', remote_filesystem_read_method='read', trace_profile_events=0;

-- we do not want concurrency control to limit the number of threads
SET use_concurrency_control=0;

-- we use query_thread_log to check peak thread usage
-- after https://github.com/ClickHouse/ClickHouse/issues/53417 there is a simpler way to check it
-- but that will not allow to backport the test to older versions

@@ -1,21 +1,31 @@
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
%d: 123
%d: -123
%d: 0
%d: 9223372036854775807
%i: 123
%u: 123
%o: 173
%x: 7b
%X: 7B
%f: 0.000000
%f: 123.456000
%f: -123.456000
%F: 123.456000
%e: 1.234560e+02
%E: 1.234560E+02
%g: 123.456
%G: 123.456
%a: 0x1.edd2f1a9fbe77p+6
%A: 0X1.EDD2F1A9FBE77P+6
%s: abc
┌─printf('%%s: %s', '\n\t')─┐
1. │ %s:
│
└───────────────────────────┘
%s:
%%: %
%.5d: 00123
%.2f: 123.46
%.2e: 1.23e+02
%.2g: 1.2e+02
%.2s: ab

@@ -1,39 +1,47 @@
-- Testing integer formats
select printf('%%d: %d', 123) = '%d: 123';
select printf('%%i: %i', 123) = '%i: 123';
select printf('%%u: %u', 123) = '%u: 123';
select printf('%%o: %o', 123) = '%o: 173';
select printf('%%x: %x', 123) = '%x: 7b';
select printf('%%X: %X', 123) = '%X: 7B';
select printf('%%d: %d', 123);
select printf('%%d: %d', -123);
select printf('%%d: %d', 0);
select printf('%%d: %d', 9223372036854775807);
select printf('%%i: %i', 123);
select printf('%%u: %u', 123);
select printf('%%o: %o', 123);
select printf('%%x: %x', 123);
select printf('%%X: %X', 123);

-- Testing floating point formats
select printf('%%f: %f', 123.456) = '%f: 123.456000';
select printf('%%F: %F', 123.456) = '%F: 123.456000';
select printf('%%e: %e', 123.456) = '%e: 1.234560e+02';
select printf('%%E: %E', 123.456) = '%E: 1.234560E+02';
select printf('%%g: %g', 123.456) = '%g: 123.456';
select printf('%%G: %G', 123.456) = '%G: 123.456';
select printf('%%a: %a', 123.456) = '%a: 0x1.edd2f1a9fbe77p+6';
select printf('%%A: %A', 123.456) = '%A: 0X1.EDD2F1A9FBE77P+6';
select printf('%%f: %f', 0.0);
select printf('%%f: %f', 123.456);
select printf('%%f: %f', -123.456);
select printf('%%F: %F', 123.456);
select printf('%%e: %e', 123.456);
select printf('%%E: %E', 123.456);
select printf('%%g: %g', 123.456);
select printf('%%G: %G', 123.456);
select printf('%%a: %a', 123.456);
select printf('%%A: %A', 123.456);

-- Testing character formats
select printf('%%s: %s', 'abc') = '%s: abc';
select printf('%%s: %s', 'abc');
SELECT printf('%%s: %s', '\n\t') FORMAT PrettyCompact;
select printf('%%s: %s', '');

-- Testing the %% specifier
select printf('%%%%: %%') = '%%: %';
select printf('%%%%: %%');

-- Testing integer formats with precision
select printf('%%.5d: %.5d', 123) = '%.5d: 00123';
select printf('%%.5d: %.5d', 123);

-- Testing floating point formats with precision
select printf('%%.2f: %.2f', 123.456) = '%.2f: 123.46';
select printf('%%.2e: %.2e', 123.456) = '%.2e: 1.23e+02';
select printf('%%.2g: %.2g', 123.456) = '%.2g: 1.2e+02';
select printf('%%.2f: %.2f', 123.456);
select printf('%%.2e: %.2e', 123.456);
select printf('%%.2g: %.2g', 123.456);

-- Testing character formats with precision
select printf('%%.2s: %.2s', 'abc') = '%.2s: ab';
select printf('%%.2s: %.2s', 'abc');

select printf('%%X: %X', 123.123); -- { serverError BAD_ARGUMENTS }
select printf('%%A: %A', 'abc'); -- { serverError BAD_ARGUMENTS }
select printf('%%s: %s', 100); -- { serverError BAD_ARGUMENTS }
select printf('%%n: %n', 100); -- { serverError BAD_ARGUMENTS }
select printf('%%f: %f', 0); -- { serverError BAD_ARGUMENTS }

@@ -33,8 +33,8 @@ Cross Elizabeth
[1,2,3] 42.42
Array(Int64) LowCardinality(Float64)
101
2070
2070
2071
2071
b
1
1

@@ -0,0 +1,6 @@
1
1
1
1
1
1

tests/queries/0_stateless/03231_hive_partitioning_filtering.sh
Executable file
72
tests/queries/0_stateless/03231_hive_partitioning_filtering.sh
Executable file
@ -0,0 +1,72 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: no-fasttest
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
DATA_DIR=$USER_FILES_PATH/$CLICKHOUSE_TEST_UNIQUE_NAME
|
||||
mkdir -p $DATA_DIR
|
||||
cp -r $CURDIR/data_hive/ $DATA_DIR
|
||||
|
||||
$CLICKHOUSE_CLIENT --query_id="test_03231_1_$CLICKHOUSE_TEST_UNIQUE_NAME" --query "
|
||||
SELECT countDistinct(_path) FROM file('$DATA_DIR/data_hive/partitioning/column0=*/sample.parquet') WHERE column0 = 'Elizabeth' SETTINGS use_hive_partitioning=1, optimize_count_from_files=0;
|
||||
"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "
|
||||
SYSTEM FLUSH LOGS;
|
||||
"
|
||||
|
||||
for _ in {1..5}; do
|
||||
count=$( $CLICKHOUSE_CLIENT --query "
|
||||
SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log
|
||||
WHERE query_id='test_03231_1_$CLICKHOUSE_TEST_UNIQUE_NAME' AND
|
||||
current_database = currentDatabase() and type='QueryFinish';" )
|
||||
if [[ "$count" == "1" ]]; then
|
||||
echo "1"
|
||||
break
|
||||
fi
|
||||
sleep 1
|
||||
done
|
||||
|
||||
$CLICKHOUSE_CLIENT --query_id="test_03231_2_$CLICKHOUSE_TEST_UNIQUE_NAME" --query "
|
||||
SELECT countDistinct(_path) FROM file('$DATA_DIR/data_hive/partitioning/identifier=*/email.csv') WHERE identifier = 2070 SETTINGS use_hive_partitioning=1, optimize_count_from_files=0;
|
||||
"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "
|
||||
SYSTEM FLUSH LOGS;
|
||||
"
|
||||
|
||||
for _ in {1..5}; do
|
||||
count=$( $CLICKHOUSE_CLIENT --query "
|
||||
SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log
|
||||
WHERE query_id='test_03231_2_$CLICKHOUSE_TEST_UNIQUE_NAME' AND
|
||||
current_database = currentDatabase() and type='QueryFinish';" )
|
||||
if [[ "$count" == "1" ]]; then
|
||||
echo "1"
|
||||
break
|
||||
fi
|
||||
sleep 1
|
||||
done
|
||||
|
||||
$CLICKHOUSE_CLIENT --query_id="test_03231_3_$CLICKHOUSE_TEST_UNIQUE_NAME" --query "
|
||||
SELECT countDistinct(_path) FROM file('$DATA_DIR/data_hive/partitioning/array=*/sample.parquet') WHERE array = [1,2,3] SETTINGS use_hive_partitioning=1, optimize_count_from_files=0;
|
||||
"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "
|
||||
SYSTEM FLUSH LOGS;
|
||||
"
|
||||
|
||||
for _ in {1..5}; do
|
||||
count=$( $CLICKHOUSE_CLIENT --query "
|
||||
SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log
|
||||
WHERE query_id='test_03231_3_$CLICKHOUSE_TEST_UNIQUE_NAME' AND
|
||||
current_database = currentDatabase() and type='QueryFinish';" )
|
||||
if [[ "$count" == "1" ]]; then
|
||||
echo "1"
|
||||
break
|
||||
fi
|
||||
sleep 1
|
||||
done
|
||||
|
||||
rm -rf $DATA_DIR
|
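The new shell test polls system.query_log in a short retry loop before asserting on EngineFileLikeReadFiles, since log rows can land slightly after SYSTEM FLUSH LOGS. The same wait-then-assert idea in the Python integration-test style used elsewhere in this diff (helper name and timings are illustrative):

    import time

    def wait_for_profile_event(node, query_id, event, expected, retries=5, delay=1.0):
        """Poll query_log until the ProfileEvents counter shows up or retries run out."""
        for _ in range(retries):
            value = node.query(
                f"select ProfileEvents['{event}'] from system.query_log "
                f"where query_id = '{query_id}' and type = 'QueryFinish' "
                f"and current_database = currentDatabase()"
            ).strip()
            if value == str(expected):
                return True
            time.sleep(delay)
        return False
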
@@ -1 +1 @@
data_hive/partitioning/column0=Elizabeth/sample.parquet
data_hive/partitioning/non_existing_column=Elizabeth/sample.parquet

@@ -5,4 +5,4 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

$CLICKHOUSE_LOCAL -q "SELECT substring(_path, position(_path, 'data_hive')) FROM file('$CURDIR/data_hive/partitioning/column0=*/sample.parquet') LIMIT 1;"
$CLICKHOUSE_LOCAL -q "SELECT substring(_path, position(_path, 'data_hive')) FROM file('$CURDIR/data_hive/partitioning/non_existing_column=*/sample.parquet') LIMIT 1;"

Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,5 @@
_login_email,_identifier,_first_name,_last_name
laura@example.com,2070,Laura,Grey
craig@example.com,4081,Craig,Johnson
mary@example.com,9346,Mary,Jenkins
jamie@example.com,5079,Jamie,Smith

@@ -181,6 +181,9 @@ ComplexKeyCache
ComplexKeyDirect
ComplexKeyHashed
Composable
composable
ConcurrencyControlAcquired
ConcurrencyControlSoftLimit
Config
ConnectionDetails
Const