Merge branch 'master' into remove-dag-flags

This commit is contained in:
Nikolai Kochetov 2024-06-19 17:59:21 +00:00
commit addaa0dd8f
76 changed files with 1562 additions and 466 deletions


@ -104,10 +104,9 @@ jobs:
with:
stage: Tests_2
data: ${{ needs.RunConfig.outputs.data }}
# stage for jobs that do not prohibit merge
Tests_3:
# Test_3 should not wait for Test_1/Test_2 and should not be blocked by them on master branch since all jobs need to run there.
needs: [RunConfig, Builds_1, Builds_2]
needs: [RunConfig, Builds_1]
if: ${{ !failure() && !cancelled() && contains(fromJson(needs.RunConfig.outputs.data).stages_data.stages_to_do, 'Tests_3') }}
uses: ./.github/workflows/reusable_test_stage.yml
with:


@ -134,9 +134,8 @@ jobs:
with:
stage: Tests_2
data: ${{ needs.RunConfig.outputs.data }}
# stage for jobs that do not prohibit merge
Tests_3:
needs: [RunConfig, Builds_1, Tests_1, Builds_2, Tests_2]
needs: [RunConfig, Builds_1, Tests_1]
if: ${{ !failure() && !cancelled() && contains(fromJson(needs.RunConfig.outputs.data).stages_data.stages_to_do, 'Tests_3') }}
uses: ./.github/workflows/reusable_test_stage.yml
with:
@ -157,7 +156,8 @@ jobs:
CheckReadyForMerge:
if: ${{ !cancelled() && needs.StyleCheck.result == 'success' }}
needs: [RunConfig, BuildDockers, StyleCheck, FastTest, Builds_1, Builds_2, Builds_Report, Tests_1, Tests_2]
# Test_2 or Test_3 must not have jobs required for Mergeable check
needs: [RunConfig, BuildDockers, StyleCheck, FastTest, Builds_1, Builds_2, Builds_Report, Tests_1]
runs-on: [self-hosted, style-checker-aarch64]
steps:
- name: Check out repository code
@ -196,8 +196,7 @@ jobs:
concurrency:
group: jepsen
if: ${{ !failure() && !cancelled() && contains(fromJson(needs.RunConfig.outputs.data).jobs_data.jobs_to_do, 'ClickHouse Keeper Jepsen') }}
# jepsen needs binary_release build which is in Builds_2
needs: [RunConfig, Builds_2]
needs: [RunConfig, Builds_1]
uses: ./.github/workflows/reusable_test.yml
with:
test_name: ClickHouse Keeper Jepsen


@ -213,6 +213,7 @@ target_compile_definitions (_poco_foundation
)
target_include_directories (_poco_foundation SYSTEM PUBLIC "include")
target_link_libraries (_poco_foundation PRIVATE clickhouse_common_io)
target_link_libraries (_poco_foundation
PRIVATE


@ -48,7 +48,13 @@ class Foundation_API ThreadPool
/// from the pool.
{
public:
ThreadPool(int minCapacity = 2, int maxCapacity = 16, int idleTime = 60, int stackSize = POCO_THREAD_STACK_SIZE);
explicit ThreadPool(
int minCapacity = 2,
int maxCapacity = 16,
int idleTime = 60,
int stackSize = POCO_THREAD_STACK_SIZE,
size_t global_profiler_real_time_period_ns_ = 0,
size_t global_profiler_cpu_time_period_ns_ = 0);
/// Creates a thread pool with minCapacity threads.
/// If required, up to maxCapacity threads are created
/// a NoThreadAvailableException exception is thrown.
@ -56,8 +62,14 @@ public:
/// and more than minCapacity threads are running, the thread
/// is killed. Threads are created with given stack size.
ThreadPool(
const std::string & name, int minCapacity = 2, int maxCapacity = 16, int idleTime = 60, int stackSize = POCO_THREAD_STACK_SIZE);
explicit ThreadPool(
const std::string & name,
int minCapacity = 2,
int maxCapacity = 16,
int idleTime = 60,
int stackSize = POCO_THREAD_STACK_SIZE,
size_t global_profiler_real_time_period_ns_ = 0,
size_t global_profiler_cpu_time_period_ns_ = 0);
/// Creates a thread pool with the given name and minCapacity threads.
/// If required, up to maxCapacity threads are created
/// a NoThreadAvailableException exception is thrown.
@ -171,6 +183,8 @@ private:
int _serial;
int _age;
int _stackSize;
size_t _globalProfilerRealTimePeriodNs;
size_t _globalProfilerCPUTimePeriodNs;
ThreadVec _threads;
mutable FastMutex _mutex;
};
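
For context, a minimal sketch of how the widened constructor might be used by a caller; the two trailing arguments are the global profiler sampling periods in nanoseconds (zero keeps profiling disabled, matching the defaults), and the task class is purely illustrative:

```cpp
#include <Poco/Runnable.h>
#include <Poco/ThreadPool.h>

/// Illustrative task; any Poco::Runnable is handled the same way.
class ExampleTask : public Poco::Runnable
{
public:
    void run() override { /* work executed on a pooled thread */ }
};

int main()
{
    /// Same parameter order as the extended constructor above: capacities,
    /// idle time, stack size, then the real-time and CPU profiler periods.
    Poco::ThreadPool pool(
        /* minCapacity */ 2,
        /* maxCapacity */ 16,
        /* idleTime */ 60,
        /* stackSize */ POCO_THREAD_STACK_SIZE,
        /* global_profiler_real_time_period_ns_ */ 10'000'000,   /// 10 ms
        /* global_profiler_cpu_time_period_ns_ */ 10'000'000);   /// 10 ms

    ExampleTask task;
    pool.start(task);   /// each PooledThread initializes the global profiler when a period is non-zero
    pool.joinAll();
}
```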


@ -20,6 +20,7 @@
#include "Poco/ErrorHandler.h"
#include <sstream>
#include <ctime>
#include <Common/ThreadPool.h>
namespace Poco {
@ -28,7 +29,11 @@ namespace Poco {
class PooledThread: public Runnable
{
public:
PooledThread(const std::string& name, int stackSize = POCO_THREAD_STACK_SIZE);
explicit PooledThread(
const std::string& name,
int stackSize = POCO_THREAD_STACK_SIZE,
size_t globalProfilerRealTimePeriodNs_ = 0,
size_t globalProfilerCPUTimePeriodNs_ = 0);
~PooledThread();
void start();
@ -51,16 +56,24 @@ private:
Event _targetCompleted;
Event _started;
FastMutex _mutex;
size_t _globalProfilerRealTimePeriodNs;
size_t _globalProfilerCPUTimePeriodNs;
};
PooledThread::PooledThread(const std::string& name, int stackSize):
_idle(true),
_idleTime(0),
_pTarget(0),
_name(name),
PooledThread::PooledThread(
const std::string& name,
int stackSize,
size_t globalProfilerRealTimePeriodNs_,
size_t globalProfilerCPUTimePeriodNs_) :
_idle(true),
_idleTime(0),
_pTarget(0),
_name(name),
_thread(name),
_targetCompleted(false)
_targetCompleted(false),
_globalProfilerRealTimePeriodNs(globalProfilerRealTimePeriodNs_),
_globalProfilerCPUTimePeriodNs(globalProfilerCPUTimePeriodNs_)
{
poco_assert_dbg (stackSize >= 0);
_thread.setStackSize(stackSize);
@ -83,7 +96,7 @@ void PooledThread::start()
void PooledThread::start(Thread::Priority priority, Runnable& target)
{
FastMutex::ScopedLock lock(_mutex);
poco_assert (_pTarget == 0);
_pTarget = &target;
@ -109,7 +122,7 @@ void PooledThread::start(Thread::Priority priority, Runnable& target, const std:
}
_thread.setName(fullName);
_thread.setPriority(priority);
poco_assert (_pTarget == 0);
_pTarget = &target;
@ -145,7 +158,7 @@ void PooledThread::join()
void PooledThread::activate()
{
FastMutex::ScopedLock lock(_mutex);
poco_assert (_idle);
_idle = false;
_targetCompleted.reset();
@ -155,7 +168,7 @@ void PooledThread::activate()
void PooledThread::release()
{
const long JOIN_TIMEOUT = 10000;
_mutex.lock();
_pTarget = 0;
_mutex.unlock();
@ -174,6 +187,10 @@ void PooledThread::release()
void PooledThread::run()
{
DB::ThreadStatus thread_status;
if (unlikely(_globalProfilerRealTimePeriodNs != 0 || _globalProfilerCPUTimePeriodNs != 0))
thread_status.initGlobalProfiler(_globalProfilerRealTimePeriodNs, _globalProfilerCPUTimePeriodNs);
_started.set();
for (;;)
{
@ -220,13 +237,17 @@ void PooledThread::run()
ThreadPool::ThreadPool(int minCapacity,
int maxCapacity,
int idleTime,
int stackSize):
_minCapacity(minCapacity),
_maxCapacity(maxCapacity),
int stackSize,
size_t globalProfilerRealTimePeriodNs_,
size_t globalProfilerCPUTimePeriodNs_) :
_minCapacity(minCapacity),
_maxCapacity(maxCapacity),
_idleTime(idleTime),
_serial(0),
_age(0),
_stackSize(stackSize)
_stackSize(stackSize),
_globalProfilerRealTimePeriodNs(globalProfilerRealTimePeriodNs_),
_globalProfilerCPUTimePeriodNs(globalProfilerCPUTimePeriodNs_)
{
poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0);
@ -243,14 +264,18 @@ ThreadPool::ThreadPool(const std::string& name,
int minCapacity,
int maxCapacity,
int idleTime,
int stackSize):
int stackSize,
size_t globalProfilerRealTimePeriodNs_,
size_t globalProfilerCPUTimePeriodNs_) :
_name(name),
_minCapacity(minCapacity),
_maxCapacity(maxCapacity),
_minCapacity(minCapacity),
_maxCapacity(maxCapacity),
_idleTime(idleTime),
_serial(0),
_age(0),
_stackSize(stackSize)
_stackSize(stackSize),
_globalProfilerRealTimePeriodNs(globalProfilerRealTimePeriodNs_),
_globalProfilerCPUTimePeriodNs(globalProfilerCPUTimePeriodNs_)
{
poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0);
@ -393,15 +418,15 @@ void ThreadPool::housekeep()
ThreadVec activeThreads;
idleThreads.reserve(_threads.size());
activeThreads.reserve(_threads.size());
for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it)
{
if ((*it)->idle())
{
if ((*it)->idleTime() < _idleTime)
idleThreads.push_back(*it);
else
expiredThreads.push_back(*it);
else
expiredThreads.push_back(*it);
}
else activeThreads.push_back(*it);
}
@ -463,7 +488,7 @@ PooledThread* ThreadPool::createThread()
{
std::ostringstream name;
name << _name << "[#" << ++_serial << "]";
return new PooledThread(name.str(), _stackSize);
return new PooledThread(name.str(), _stackSize, _globalProfilerRealTimePeriodNs, _globalProfilerCPUTimePeriodNs);
}
@ -481,7 +506,7 @@ public:
ThreadPool* pool()
{
FastMutex::ScopedLock lock(_mutex);
if (!_pPool)
{
_pPool = new ThreadPool("default");
@ -490,7 +515,7 @@ public:
}
return _pPool;
}
private:
ThreadPool* _pPool;
FastMutex _mutex;


@ -37,7 +37,7 @@ Using named collections:
<named_collections>
<iceberg_conf>
<url>http://test.s3.amazonaws.com/clickhouse-bucket/</url>
<access_key_id>test<access_key_id>
<access_key_id>test</access_key_id>
<secret_access_key>test</secret_access_key>
</iceberg_conf>
</named_collections>


@ -13,7 +13,7 @@ This engine provides integration with [Amazon S3](https://aws.amazon.com/s3/) ec
CREATE TABLE s3_queue_engine_table (name String, value UInt32)
ENGINE = S3Queue(path, [NOSIGN, | aws_access_key_id, aws_secret_access_key,] format, [compression])
[SETTINGS]
[mode = 'unordered',]
[mode = '',]
[after_processing = 'keep',]
[keeper_path = '',]
[s3queue_loading_retries = 0,]


@ -591,6 +591,22 @@ Default value: 100000
<max_part_num_to_warn>400</max_part_num_to_warn>
```
## max\_table\_num\_to\_throw {#max-table-num-to-throw}
If the number of tables is greater than this value, the server will throw an exception. 0 means no limitation. Views, remote tables, dictionaries, and system tables are not counted. Only tables in the Atomic/Ordinary/Replicated/Lazy database engines are counted.
Default value: 0
**Example**
```xml
<max_table_num_to_throw>400</max_table_num_to_throw>
```
## max\_database\_num\_to\_throw {#max-database-num-to-throw}
If the number of databases is greater than this value, the server will throw an exception. 0 means no limitation.
Default value: 0
**Example**
```xml
<max_database_num_to_throw>400</max_database_num_to_throw>
```
## max_temporary_data_on_disk_size


@ -1592,19 +1592,19 @@ Default value: `default`.
## parallel_replicas_custom_key_range_lower {#parallel_replicas_custom_key_range_lower}
Allows the filter type `range` to split the work evenly between replicas based on the custom range `[parallel_replicas_custom_key_range_lower, INT_MAX]`.
Allows the filter type `range` to split the work evenly between replicas based on the custom range `[parallel_replicas_custom_key_range_lower, INT_MAX]`.
When used in conjunction with [parallel_replicas_custom_key_range_upper](#parallel_replicas_custom_key_range_upper), it lets the filter evenly split the work over replicas for the range `[parallel_replicas_custom_key_range_lower, parallel_replicas_custom_key_range_upper]`.
When used in conjunction with [parallel_replicas_custom_key_range_upper](#parallel_replicas_custom_key_range_upper), it lets the filter evenly split the work over replicas for the range `[parallel_replicas_custom_key_range_lower, parallel_replicas_custom_key_range_upper]`.
Note: This setting will not cause any additional data to be filtered during query processing, rather it changes the points at which the range filter breaks up the range `[0, INT_MAX]` for parallel processing.
Note: This setting will not cause any additional data to be filtered during query processing, rather it changes the points at which the range filter breaks up the range `[0, INT_MAX]` for parallel processing.
## parallel_replicas_custom_key_range_upper {#parallel_replicas_custom_key_range_upper}
Allows the filter type `range` to split the work evenly between replicas based on the custom range `[0, parallel_replicas_custom_key_range_upper]`. A value of 0 disables the upper bound, setting it to the max value of the custom key expression.
When used in conjunction with [parallel_replicas_custom_key_range_lower](#parallel_replicas_custom_key_range_lower), it lets the filter evenly split the work over replicas for the range `[parallel_replicas_custom_key_range_lower, parallel_replicas_custom_key_range_upper]`.
When used in conjunction with [parallel_replicas_custom_key_range_lower](#parallel_replicas_custom_key_range_lower), it lets the filter evenly split the work over replicas for the range `[parallel_replicas_custom_key_range_lower, parallel_replicas_custom_key_range_upper]`.
Note: This setting will not cause any additional data to be filtered during query processing, rather it changes the points at which the range filter breaks up the range `[0, INT_MAX]` for parallel processing.
Note: This setting will not cause any additional data to be filtered during query processing, rather it changes the points at which the range filter breaks up the range `[0, INT_MAX]` for parallel processing.
## allow_experimental_parallel_reading_from_replicas
@ -3188,7 +3188,7 @@ Default value: `0`.
## lightweight_deletes_sync {#lightweight_deletes_sync}
The same as 'mutation_sync', but controls only execution of lightweight deletes.
The same as 'mutation_sync', but controls only execution of lightweight deletes.
Possible values:
@ -5150,7 +5150,7 @@ Allows using statistic to optimize the order of [prewhere conditions](../../sql-
## analyze_index_with_space_filling_curves
If a table has a space-filling curve in its index, e.g. `ORDER BY mortonEncode(x, y)`, and the query has conditions on its arguments, e.g. `x >= 10 AND x <= 20 AND y >= 20 AND y <= 30`, use the space-filling curve for index analysis.
If a table has a space-filling curve in its index, e.g. `ORDER BY mortonEncode(x, y)` or `ORDER BY hilbertEncode(x, y)`, and the query has conditions on its arguments, e.g. `x >= 10 AND x <= 20 AND y >= 20 AND y <= 30`, use the space-filling curve for index analysis.
## query_plan_enable_optimizations {#query_plan_enable_optimizations}


@ -137,7 +137,7 @@ If the time transition (due to daylight saving time or for other reasons) was pe
Non-monotonic calendar dates. For example, in Happy Valley - Goose Bay, the time was transitioned one hour backwards at 00:01:00 7 Nov 2010 (one minute after midnight). So after 6th Nov has ended, people observed a whole one minute of 7th Nov, then time was changed back to 23:01 6th Nov and after another 59 minutes the 7th Nov started again. ClickHouse does not (yet) support this kind of fun. During these days the results of time processing functions may be slightly incorrect.
Similar issue exists for Casey Antarctic station in year 2010. They changed time three hours back at 5 Mar, 02:00. If you are working in antarctic station, please don't afraid to use ClickHouse. Just make sure you set timezone to UTC or be aware of inaccuracies.
Similar issue exists for Casey Antarctic station in year 2010. They changed time three hours back at 5 Mar, 02:00. If you are working in antarctic station, please don't be afraid to use ClickHouse. Just make sure you set timezone to UTC or be aware of inaccuracies.
Time shifts for multiple days. Some pacific islands changed their timezone offset from UTC+14 to UTC-12. That's alright, but some inaccuracies may be present if you do calculations with their timezone for historical time points on the days of conversion.


@ -10,6 +10,7 @@
#include <Poco/Net/NetException.h>
#include <Poco/Util/HelpFormatter.h>
#include <Poco/Environment.h>
#include <Poco/Config.h>
#include <Common/scope_guard_safe.h>
#include <Common/logger_useful.h>
#include <base/phdr_cache.h>
@ -721,11 +722,6 @@ try
CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::getVersionRevision());
CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger());
Poco::ThreadPool server_pool(3, server_settings.max_connections);
std::mutex servers_lock;
std::vector<ProtocolServerAdapter> servers;
std::vector<ProtocolServerAdapter> servers_to_start_before_tables;
/** Context contains all that query execution is dependent:
* settings, available functions, data types, aggregate functions, databases, ...
*/
@ -823,6 +819,18 @@ try
total_memory_tracker.setSampleMaxAllocationSize(server_settings.total_memory_profiler_sample_max_allocation_size);
}
Poco::ThreadPool server_pool(
/* minCapacity */3,
/* maxCapacity */server_settings.max_connections,
/* idleTime */60,
/* stackSize */POCO_THREAD_STACK_SIZE,
server_settings.global_profiler_real_time_period_ns,
server_settings.global_profiler_cpu_time_period_ns);
std::mutex servers_lock;
std::vector<ProtocolServerAdapter> servers;
std::vector<ProtocolServerAdapter> servers_to_start_before_tables;
/// Wait for all threads to avoid possible use-after-free (for example logging objects can be already destroyed).
SCOPE_EXIT({
Stopwatch watch;


@ -602,6 +602,8 @@
M(721, DEPRECATED_FUNCTION) \
M(722, ASYNC_LOAD_WAIT_FAILED) \
M(723, PARQUET_EXCEPTION) \
M(724, TOO_MANY_TABLES) \
M(725, TOO_MANY_DATABASES) \
\
M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \

src/Common/HilbertUtils.h (new file, 161 lines added)

@ -0,0 +1,161 @@
#pragma once
#include <Core/Types.h>
#include <Common/BitHelpers.h>
#include "base/types.h"
#include <Functions/hilbertDecode2DLUT.h>
#include <base/defines.h>
#include <array>
#include <set>
namespace HilbertDetails
{
struct Segment // represents [begin; end], all bounds are included
{
UInt64 begin;
UInt64 end;
};
}
/*
Given a range of Hilbert code values, this function returns segments of the Hilbert curve
such that each of them lies in a whole domain (aka square):

         0                 1
    0  00xxx             11xxx
         |                 |
         |                 |
     _________________________
         |                 |
         |                 |
         |                 |
    1  01xxx___________10xxx

Imagine a square, one side of which is the x-axis and the other the y-axis.
The first approximation of the Hilbert curve is shown in the picture: a U-shaped curve.
We divide the Hilbert code interval into 4 parts, each of which is represented by a square,
and look at where the given interval [start, finish] is located:

   [00xxxxxx | 01xxxxxx | 10xxxxxx | 11xxxxxx ]
1:      [                              ]
        start = 0010111       end = 10111110
2:      [       ]         [            ]

If the interval contains a whole sector (which represents a domain, i.e. a square),
then we take this range. In the example above it is the sector [01000000, 01111111].
Then we recurse into the remaining ranges and check them.
Note that after the first call, every other range in the recursion has either its start or its finish on the boundary of a sector,
so the complexity of the algorithm is O(log N), where N is the maximum Hilbert code.
*/
template <typename F>
void segmentBinaryPartition(UInt64 start, UInt64 finish, UInt8 current_bits, F && callback)
{
if (current_bits == 0)
return;
const auto next_bits = current_bits - 2;
const auto history = current_bits == 64 ? 0 : (start >> current_bits) << current_bits;
const auto chunk_mask = 0b11;
const auto start_chunk = (start >> next_bits) & chunk_mask;
const auto finish_chunk = (finish >> next_bits) & chunk_mask;
auto construct_range = [next_bits, history](UInt64 chunk)
{
return HilbertDetails::Segment{
.begin = history + (chunk << next_bits),
.end = history + ((chunk + 1) << next_bits) - 1
};
};
if (start_chunk == finish_chunk)
{
if ((finish - start + 1) == (1 << next_bits)) // it means that [begin, end] is a range
{
callback(HilbertDetails::Segment{.begin = start, .end = finish});
return;
}
segmentBinaryPartition(start, finish, next_bits, callback);
return;
}
for (auto range_chunk = start_chunk + 1; range_chunk < finish_chunk; ++range_chunk)
{
callback(construct_range(range_chunk));
}
const auto start_range = construct_range(start_chunk);
if (start == start_range.begin)
{
callback(start_range);
}
else
{
segmentBinaryPartition(start, start_range.end, next_bits, callback);
}
const auto finish_range = construct_range(finish_chunk);
if (finish == finish_range.end)
{
callback(finish_range);
}
else
{
segmentBinaryPartition(finish_range.begin, finish, next_bits, callback);
}
}
// Given 2 points representing the ends of a range of the Hilbert curve that lies in a whole domain.
// They are neighbouring corners of some square, and the function returns the ranges of both sides of this square
inline std::array<std::pair<UInt64, UInt64>, 2> createRangeFromCorners(UInt64 x1, UInt64 y1, UInt64 x2, UInt64 y2)
{
UInt64 dist_x = x1 > x2 ? x1 - x2 : x2 - x1;
UInt64 dist_y = y1 > y2 ? y1 - y2 : y2 - y1;
UInt64 range_size = std::max(dist_x, dist_y);
bool contains_minimum_vertice = x1 % (range_size + 1) == 0;
if (contains_minimum_vertice)
{
UInt64 x_min = std::min(x1, x2);
UInt64 y_min = std::min(y1, y2);
return {
std::pair<UInt64, UInt64>{x_min, x_min + range_size},
std::pair<UInt64, UInt64>{y_min, y_min + range_size}
};
}
else
{
UInt64 x_max = std::max(x1, x2);
UInt64 y_max = std::max(y1, y2);
chassert(x_max >= range_size);
chassert(y_max >= range_size);
return {
std::pair<UInt64, UInt64>{x_max - range_size, x_max},
std::pair<UInt64, UInt64>{y_max - range_size, y_max}
};
}
}
/** Unpack an interval of Hilbert curve to hyperrectangles covered by it across N dimensions.
*/
template <typename F>
void hilbertIntervalToHyperrectangles2D(UInt64 first, UInt64 last, F && callback)
{
const auto equal_bits_count = getLeadingZeroBits(last | first);
const auto even_equal_bits_count = equal_bits_count - equal_bits_count % 2;
segmentBinaryPartition(first, last, 64 - even_equal_bits_count, [&](HilbertDetails::Segment range)
{
auto interval1 = DB::FunctionHilbertDecode2DWIthLookupTableImpl<3>::decode(range.begin);
auto interval2 = DB::FunctionHilbertDecode2DWIthLookupTableImpl<3>::decode(range.end);
std::array<std::pair<UInt64, UInt64>, 2> unpacked_range = createRangeFromCorners(
std::get<0>(interval1), std::get<1>(interval1),
std::get<0>(interval2), std::get<1>(interval2));
callback(unpacked_range);
});
}
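
A minimal usage sketch for the helper above, assuming a ClickHouse build where `Common/HilbertUtils.h` is available; the interval bounds and the printing callback are illustrative only:

```cpp
#include <iostream>
#include <Common/HilbertUtils.h>

int main()
{
    /// Decompose the Hilbert-code interval [start, finish] into axis-aligned 2D ranges.
    const UInt64 start = 5;
    const UInt64 finish = 20;

    hilbertIntervalToHyperrectangles2D(start, finish, [](const auto & rect)
    {
        /// rect is std::array<std::pair<UInt64, UInt64>, 2>: an inclusive [min, max] per dimension.
        std::cout << "x: [" << rect[0].first << ", " << rect[0].second << "], "
                  << "y: [" << rect[1].first << ", " << rect[1].second << "]\n";
    });
}
```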


@ -102,6 +102,8 @@ namespace DB
M(UInt64, max_dictionary_num_to_warn, 1000lu, "If the number of dictionaries is greater than this value, the server will create a warning that will displayed to user.", 0) \
M(UInt64, max_database_num_to_warn, 1000lu, "If the number of databases is greater than this value, the server will create a warning that will displayed to user.", 0) \
M(UInt64, max_part_num_to_warn, 100000lu, "If the number of parts is greater than this value, the server will create a warning that will displayed to user.", 0) \
M(UInt64, max_table_num_to_throw, 0lu, "If number of tables is greater than this value, server will throw an exception. 0 means no limitation. View, remote tables, dictionary, system tables are not counted. Only count table in Atomic/Ordinary/Replicated/Lazy database engine.", 0) \
M(UInt64, max_database_num_to_throw, 0lu, "If number of databases is greater than this value, server will throw an exception. 0 means no limitation.", 0) \
M(UInt64, concurrent_threads_soft_limit_num, 0, "Sets how many concurrent thread can be allocated before applying CPU pressure. Zero means unlimited.", 0) \
M(UInt64, concurrent_threads_soft_limit_ratio_to_cores, 0, "Same as concurrent_threads_soft_limit_num, but with ratio to cores.", 0) \
\


@ -186,6 +186,7 @@ void DatabaseLazy::attachTable(ContextPtr /* context_ */, const String & table_n
throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS, "Table {}.{} already exists.", backQuote(database_name), backQuote(table_name));
it->second.expiration_iterator = cache_expiration_queue.emplace(cache_expiration_queue.end(), current_time, table_name);
CurrentMetrics::add(CurrentMetrics::AttachedTable, 1);
}
@ -202,6 +203,7 @@ StoragePtr DatabaseLazy::detachTable(ContextPtr /* context */, const String & ta
if (it->second.expiration_iterator != cache_expiration_queue.end())
cache_expiration_queue.erase(it->second.expiration_iterator);
tables_cache.erase(it);
CurrentMetrics::sub(CurrentMetrics::AttachedTable, 1);
}
return res;


@ -260,7 +260,9 @@ StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_n
res = it->second;
tables.erase(it);
res->is_detached = true;
CurrentMetrics::sub(getAttachedCounterForStorage(res), 1);
if (res->isSystemStorage() == false)
CurrentMetrics::sub(getAttachedCounterForStorage(res), 1);
auto table_id = res->getStorageID();
if (table_id.hasUUID())
@ -301,7 +303,9 @@ void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, c
/// It is important to reset is_detached here since in case of RENAME in
/// non-Atomic database the is_detached is set to true before RENAME.
table->is_detached = false;
CurrentMetrics::add(getAttachedCounterForStorage(table), 1);
if (table->isSystemStorage() == false && table_id.database_name != DatabaseCatalog::SYSTEM_DATABASE)
CurrentMetrics::add(getAttachedCounterForStorage(table), 1);
}
void DatabaseWithOwnTablesBase::shutdown()


@ -129,6 +129,7 @@ public:
static constexpr const char * SYSTEM_DATABASE = "system";
static constexpr const char * INFORMATION_SCHEMA = "information_schema";
static constexpr const char * INFORMATION_SCHEMA_UPPERCASE = "INFORMATION_SCHEMA";
static constexpr const char * DEFAULT_DATABASE = "default";
/// Returns true if a passed name is one of the predefined databases' names.
static bool isPredefinedDatabase(std::string_view database_name);


@ -88,6 +88,11 @@
#include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <Parsers/QueryParameterVisitor.h>
namespace CurrentMetrics
{
extern const Metric AttachedTable;
}
namespace DB
{
@ -113,6 +118,8 @@ namespace ErrorCodes
extern const int UNKNOWN_STORAGE;
extern const int SYNTAX_ERROR;
extern const int SUPPORT_IS_DISABLED;
extern const int TOO_MANY_TABLES;
extern const int TOO_MANY_DATABASES;
}
namespace fs = std::filesystem;
@ -138,6 +145,31 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
throw Exception(ErrorCodes::DATABASE_ALREADY_EXISTS, "Database {} already exists.", database_name);
}
auto db_num_limit = getContext()->getGlobalContext()->getServerSettings().max_database_num_to_throw;
if (db_num_limit > 0)
{
size_t db_count = DatabaseCatalog::instance().getDatabases().size();
std::vector<String> system_databases = {
DatabaseCatalog::TEMPORARY_DATABASE,
DatabaseCatalog::SYSTEM_DATABASE,
DatabaseCatalog::INFORMATION_SCHEMA,
DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE,
DatabaseCatalog::DEFAULT_DATABASE
};
for (const auto & system_database : system_databases)
{
if (db_count > 0 && DatabaseCatalog::instance().isDatabaseExist(system_database))
db_count--;
}
if (db_count >= db_num_limit)
throw Exception(ErrorCodes::TOO_MANY_DATABASES,
"Too many databases in the Clickhouse. "
"The limit (setting 'max_database_num_to_throw') is set to {}, current number of databases is {}",
db_num_limit, db_count);
}
/// Will write file with database metadata, if needed.
String database_name_escaped = escapeForFileName(database_name);
fs::path metadata_path = fs::weakly_canonical(getContext()->getPath());
@ -1543,6 +1575,17 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
}
}
UInt64 table_num_limit = getContext()->getGlobalContext()->getServerSettings().max_table_num_to_throw;
if (table_num_limit > 0 && create.getDatabase() != DatabaseCatalog::SYSTEM_DATABASE)
{
UInt64 table_count = CurrentMetrics::get(CurrentMetrics::AttachedTable);
if (table_count >= table_num_limit)
throw Exception(ErrorCodes::TOO_MANY_TABLES,
"Too many tables in the Clickhouse. "
"The limit (setting 'max_table_num_to_throw') is set to {}, current number of tables is {}",
table_num_limit, table_count);
}
database->createTable(getContext(), create.getTable(), res, query_ptr);
/// Move table data to the proper place. We do not move data earlier to avoid situations


@ -26,7 +26,8 @@
#include <Processors/Transforms/CountingTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
#include <Processors/Transforms/SquashingTransform.h>
#include <Processors/Transforms/PlanSquashingTransform.h>
#include <Processors/Transforms/getSourceFromASTInsertQuery.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
@ -625,9 +626,15 @@ BlockIO InterpreterInsertQuery::execute()
{
bool table_prefers_large_blocks = table->prefersLargeBlocks();
pipeline.addTransform(std::make_shared<PlanSquashingTransform>(
header,
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL,
presink_chains.size()));
pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr
{
return std::make_shared<SimpleSquashingChunksTransform>(
return std::make_shared<ApplySquashingTransform>(
in_header,
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL);
@ -683,12 +690,20 @@ BlockIO InterpreterInsertQuery::execute()
{
bool table_prefers_large_blocks = table->prefersLargeBlocks();
auto squashing = std::make_shared<SimpleSquashingChunksTransform>(
chain.getInputHeader(),
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL);
auto squashing = std::make_shared<ApplySquashingTransform>(
chain.getInputHeader(),
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL);
chain.addSource(std::move(squashing));
auto balancing = std::make_shared<PlanSquashingTransform>(
chain.getInputHeader(),
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL,
presink_chains.size());
chain.addSource(std::move(balancing));
}
auto context_ptr = getContext();


@ -1473,6 +1473,9 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
if (expressions.hasHaving() && query.group_by_with_totals && (query.group_by_with_rollup || query.group_by_with_cube))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "WITH TOTALS and WITH ROLLUP or CUBE are not supported together in presence of HAVING");
if (query.qualify())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "QUALIFY clause is not supported in the old analyzer");
if (options.only_analyze)
{
auto read_nothing = std::make_unique<ReadNothingStep>(source_header);


@ -0,0 +1,159 @@
#include <vector>
#include <Interpreters/Squashing.h>
#include <Common/CurrentThread.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
Squashing::Squashing(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_)
: header(header_)
, min_block_size_rows(min_block_size_rows_)
, min_block_size_bytes(min_block_size_bytes_)
{
}
Chunk Squashing::flush()
{
return convertToChunk(std::move(chunks_to_merge_vec));
}
Chunk Squashing::squash(Chunk && input_chunk)
{
if (!input_chunk.hasChunkInfo())
return Chunk();
const auto *info = getInfoFromChunk(input_chunk);
return squash(info->chunks);
}
Chunk Squashing::add(Chunk && input_chunk)
{
if (!input_chunk)
return {};
/// Just read block is already enough.
if (isEnoughSize(input_chunk.getNumRows(), input_chunk.bytes()))
{
/// If no accumulated data, return just read block.
if (chunks_to_merge_vec.empty())
{
chunks_to_merge_vec.push_back(std::move(input_chunk));
Chunk res_chunk = convertToChunk(std::move(chunks_to_merge_vec));
chunks_to_merge_vec.clear();
return res_chunk;
}
/// Return accumulated data (maybe it has small size) and place new block to accumulated data.
Chunk res_chunk = convertToChunk(std::move(chunks_to_merge_vec));
chunks_to_merge_vec.clear();
changeCurrentSize(input_chunk.getNumRows(), input_chunk.bytes());
chunks_to_merge_vec.push_back(std::move(input_chunk));
return res_chunk;
}
/// Accumulated block is already enough.
if (isEnoughSize(accumulated_size.rows, accumulated_size.bytes))
{
/// Return accumulated data and place new block to accumulated data.
Chunk res_chunk = convertToChunk(std::move(chunks_to_merge_vec));
chunks_to_merge_vec.clear();
changeCurrentSize(input_chunk.getNumRows(), input_chunk.bytes());
chunks_to_merge_vec.push_back(std::move(input_chunk));
return res_chunk;
}
/// Pushing data into accumulating vector
expandCurrentSize(input_chunk.getNumRows(), input_chunk.bytes());
chunks_to_merge_vec.push_back(std::move(input_chunk));
/// If accumulated data is big enough, we send it
if (isEnoughSize(accumulated_size.rows, accumulated_size.bytes))
{
Chunk res_chunk = convertToChunk(std::move(chunks_to_merge_vec));
changeCurrentSize(0, 0);
chunks_to_merge_vec.clear();
return res_chunk;
}
return {};
}
Chunk Squashing::convertToChunk(std::vector<Chunk> && chunks) const
{
if (chunks.empty())
return {};
auto info = std::make_shared<ChunksToSquash>();
info->chunks = std::move(chunks);
chunks.clear();
return Chunk(header.cloneEmptyColumns(), 0, info);
}
Chunk Squashing::squash(std::vector<Chunk> & input_chunks)
{
Chunk accumulated_chunk;
std::vector<IColumn::MutablePtr> mutable_columns = {};
size_t rows = 0;
for (const Chunk & chunk : input_chunks)
rows += chunk.getNumRows();
{
auto & first_chunk = input_chunks[0];
Columns columns = first_chunk.detachColumns();
for (auto & column : columns)
{
mutable_columns.push_back(IColumn::mutate(std::move(column)));
mutable_columns.back()->reserve(rows);
}
}
for (size_t i = 1; i < input_chunks.size(); ++i) // We've already processed the first chunk above
{
Columns columns = input_chunks[i].detachColumns();
for (size_t j = 0, size = mutable_columns.size(); j < size; ++j)
{
const auto source_column = columns[j];
mutable_columns[j]->insertRangeFrom(*source_column, 0, source_column->size());
}
}
accumulated_chunk.setColumns(std::move(mutable_columns), rows);
return accumulated_chunk;
}
const ChunksToSquash* Squashing::getInfoFromChunk(const Chunk & chunk)
{
const auto& info = chunk.getChunkInfo();
const auto * agg_info = typeid_cast<const ChunksToSquash *>(info.get());
if (!agg_info)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no ChunksToSquash in ChunkInfoPtr");
return agg_info;
}
void Squashing::expandCurrentSize(size_t rows, size_t bytes)
{
accumulated_size.rows += rows;
accumulated_size.bytes += bytes;
}
void Squashing::changeCurrentSize(size_t rows, size_t bytes)
{
accumulated_size.rows = rows;
accumulated_size.bytes = bytes;
}
bool Squashing::isEnoughSize(size_t rows, size_t bytes) const
{
return (!min_block_size_rows && !min_block_size_bytes)
|| (min_block_size_rows && rows >= min_block_size_rows)
|| (min_block_size_bytes && bytes >= min_block_size_bytes);
}
}


@ -0,0 +1,69 @@
#pragma once
#include <vector>
#include <Core/Block.h>
#include <Processors/Chunk.h>
namespace DB
{
struct ChunksToSquash : public ChunkInfo
{
mutable std::vector<Chunk> chunks = {};
};
/** Merging consecutive passed blocks to specified minimum size.
*
* (But if one of input blocks has already at least specified size,
* then don't merge it with neighbours, even if neighbours are small.)
*
* Used to prepare blocks to adequate size for INSERT queries,
* because such storages as Memory, StripeLog, Log, TinyLog...
* store or compress data in blocks exactly as passed to it,
* and blocks of small size are not efficient.
*
* Order of data is kept.
*/
class Squashing
{
public:
explicit Squashing(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_);
Squashing(Squashing && other) = default;
Chunk add(Chunk && input_chunk);
static Chunk squash(Chunk && input_chunk);
Chunk flush();
bool isDataLeft()
{
return !chunks_to_merge_vec.empty();
}
Block header;
private:
struct CurrentSize
{
size_t rows = 0;
size_t bytes = 0;
};
std::vector<Chunk> chunks_to_merge_vec = {};
size_t min_block_size_rows;
size_t min_block_size_bytes;
CurrentSize accumulated_size;
static const ChunksToSquash * getInfoFromChunk(const Chunk & chunk);
static Chunk squash(std::vector<Chunk> & input_chunks);
void expandCurrentSize(size_t rows, size_t bytes);
void changeCurrentSize(size_t rows, size_t bytes);
bool isEnoughSize(size_t rows, size_t bytes) const;
Chunk convertToChunk(std::vector<Chunk> && chunks) const;
};
}
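
A minimal sketch of the intended call pattern, assuming the caller owns the header and the incoming chunks: `add()` only plans a squash and hands back a chunk carrying `ChunksToSquash` once a threshold is reached, `flush()` drains whatever is still buffered, and the static `squash()` materializes a planned chunk into one merged chunk. The driver function and the thresholds are assumptions of the sketch:

```cpp
#include <iostream>
#include <vector>
#include <Interpreters/Squashing.h>

using namespace DB;

/// Hypothetical driver: push a stream of chunks through Squashing and report merged sizes.
void squashStream(const Block & header, std::vector<Chunk> inputs)
{
    Squashing squashing(header, /* min_block_size_rows */ 1048576, /* min_block_size_bytes */ 0);

    for (auto & in : inputs)
    {
        Chunk planned = squashing.add(std::move(in));   /// plans only, no data is copied yet
        if (planned.hasChunkInfo())                     /// enough rows/bytes were accumulated
        {
            Chunk merged = Squashing::squash(std::move(planned));
            std::cout << "merged chunk with " << merged.getNumRows() << " rows\n";
        }
    }

    Chunk rest = squashing.flush();                     /// whatever is still buffered
    if (rest.hasChunkInfo())
    {
        Chunk merged = Squashing::squash(std::move(rest));
        std::cout << "final chunk with " << merged.getNumRows() << " rows\n";
    }
}
```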


@ -1,145 +0,0 @@
#include <Interpreters/SquashingTransform.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
extern const int LOGICAL_ERROR;
}
SquashingTransform::SquashingTransform(size_t min_block_size_rows_, size_t min_block_size_bytes_)
: min_block_size_rows(min_block_size_rows_)
, min_block_size_bytes(min_block_size_bytes_)
{
}
Block SquashingTransform::add(Block && input_block)
{
return addImpl<Block &&>(std::move(input_block));
}
Block SquashingTransform::add(const Block & input_block)
{
return addImpl<const Block &>(input_block);
}
/*
* To minimize copying, accept two types of argument: const reference for output
* stream, and rvalue reference for input stream, and decide whether to copy
* inside this function. This allows us not to copy Block unless we absolutely
* have to.
*/
template <typename ReferenceType>
Block SquashingTransform::addImpl(ReferenceType input_block)
{
/// End of input stream.
if (!input_block)
{
Block to_return;
std::swap(to_return, accumulated_block);
return to_return;
}
/// Just read block is already enough.
if (isEnoughSize(input_block))
{
/// If no accumulated data, return just read block.
if (!accumulated_block)
{
return std::move(input_block);
}
/// Return accumulated data (maybe it has small size) and place new block to accumulated data.
Block to_return = std::move(input_block);
std::swap(to_return, accumulated_block);
return to_return;
}
/// Accumulated block is already enough.
if (isEnoughSize(accumulated_block))
{
/// Return accumulated data and place new block to accumulated data.
Block to_return = std::move(input_block);
std::swap(to_return, accumulated_block);
return to_return;
}
append<ReferenceType>(std::move(input_block));
if (isEnoughSize(accumulated_block))
{
Block to_return;
std::swap(to_return, accumulated_block);
return to_return;
}
/// Squashed block is not ready.
return {};
}
template <typename ReferenceType>
void SquashingTransform::append(ReferenceType input_block)
{
if (!accumulated_block)
{
accumulated_block = std::move(input_block);
return;
}
assert(blocksHaveEqualStructure(input_block, accumulated_block));
try
{
for (size_t i = 0, size = accumulated_block.columns(); i < size; ++i)
{
const auto source_column = input_block.getByPosition(i).column;
auto mutable_column = IColumn::mutate(std::move(accumulated_block.getByPosition(i).column));
mutable_column->insertRangeFrom(*source_column, 0, source_column->size());
accumulated_block.getByPosition(i).column = std::move(mutable_column);
}
}
catch (...)
{
/// add() may be called again even after a previous add() threw an exception.
/// Keep accumulated_block in a valid state.
/// Seems ok to discard accumulated data because we're throwing an exception, which the caller will
/// hopefully interpret to mean "this block and all *previous* blocks are potentially lost".
accumulated_block.clear();
throw;
}
}
bool SquashingTransform::isEnoughSize(const Block & block)
{
size_t rows = 0;
size_t bytes = 0;
for (const auto & [column, type, name] : block)
{
if (!column)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid column in block.");
if (!rows)
rows = column->size();
else if (rows != column->size())
throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Sizes of columns doesn't match");
bytes += column->byteSize();
}
return isEnoughSize(rows, bytes);
}
bool SquashingTransform::isEnoughSize(size_t rows, size_t bytes) const
{
return (!min_block_size_rows && !min_block_size_bytes)
|| (min_block_size_rows && rows >= min_block_size_rows)
|| (min_block_size_bytes && bytes >= min_block_size_bytes);
}
}


@ -1,50 +0,0 @@
#pragma once
#include <Core/Block.h>
namespace DB
{
/** Merging consecutive passed blocks to specified minimum size.
*
* (But if one of input blocks has already at least specified size,
* then don't merge it with neighbours, even if neighbours are small.)
*
* Used to prepare blocks to adequate size for INSERT queries,
* because such storages as Memory, StripeLog, Log, TinyLog...
* store or compress data in blocks exactly as passed to it,
* and blocks of small size are not efficient.
*
* Order of data is kept.
*/
class SquashingTransform
{
public:
/// Conditions on rows and bytes are OR-ed. If one of them is zero, then corresponding condition is ignored.
SquashingTransform(size_t min_block_size_rows_, size_t min_block_size_bytes_);
/** Add next block and possibly returns squashed block.
* At end, you need to pass empty block. As the result for last (empty) block, you will get last Result with ready = true.
*/
Block add(Block && block);
Block add(const Block & block);
private:
size_t min_block_size_rows;
size_t min_block_size_bytes;
Block accumulated_block;
template <typename ReferenceType>
Block addImpl(ReferenceType block);
template <typename ReferenceType>
void append(ReferenceType block);
bool isEnoughSize(const Block & block);
bool isEnoughSize(size_t rows, size_t bytes) const;
};
}


@ -2,10 +2,25 @@
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Interpreters/ActionsDAG.h>
#include <Functions/FunctionsLogical.h>
#include <Functions/IFunctionAdaptors.h>
namespace DB::QueryPlanOptimizations
{
static void removeFromOutputs(ActionsDAG & dag, const ActionsDAG::Node & node)
{
auto & outputs = dag.getOutputs();
for (size_t i = 0; i < outputs.size(); ++i)
{
if (&node == outputs[i])
{
outputs.erase(outputs.begin() + i);
return;
}
}
}
size_t tryMergeExpressions(QueryPlan::Node * parent_node, QueryPlan::Nodes &)
{
if (parent_node->children.size() != 1)
@ -19,6 +34,7 @@ size_t tryMergeExpressions(QueryPlan::Node * parent_node, QueryPlan::Nodes &)
auto * parent_expr = typeid_cast<ExpressionStep *>(parent.get());
auto * parent_filter = typeid_cast<FilterStep *>(parent.get());
auto * child_expr = typeid_cast<ExpressionStep *>(child.get());
auto * child_filter = typeid_cast<FilterStep *>(child.get());
if (parent_expr && child_expr)
{
@ -60,6 +76,42 @@ size_t tryMergeExpressions(QueryPlan::Node * parent_node, QueryPlan::Nodes &)
parent_node->children.swap(child_node->children);
return 1;
}
else if (parent_filter && child_filter)
{
const auto & child_actions = child_filter->getExpression();
const auto & parent_actions = parent_filter->getExpression();
if (child_actions->hasArrayJoin())
return 0;
auto actions = child_actions->clone();
const auto & child_filter_node = actions->findInOutputs(child_filter->getFilterColumnName());
if (child_filter->removesFilterColumn())
removeFromOutputs(*actions, child_filter_node);
actions->mergeInplace(std::move(*parent_actions->clone()));
const auto & parent_filter_node = actions->findInOutputs(parent_filter->getFilterColumnName());
if (parent_filter->removesFilterColumn())
removeFromOutputs(*actions, parent_filter_node);
FunctionOverloadResolverPtr func_builder_and = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionAnd>());
const auto & condition = actions->addFunction(func_builder_and, {&child_filter_node, &parent_filter_node}, {});
auto & outputs = actions->getOutputs();
outputs.insert(outputs.begin(), &condition);
actions->removeUnusedActions(false);
auto filter = std::make_unique<FilterStep>(child_filter->getInputStreams().front(),
actions,
condition.result_name,
true);
filter->setStepDescription("(" + parent_filter->getStepDescription() + " + " + child_filter->getStepDescription() + ")");
parent_node->step = std::move(filter);
parent_node->children.swap(child_node->children);
return 1;
}
return 0;
}


@ -5,7 +5,7 @@
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
#include <Processors/Transforms/SquashingTransform.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <QueryPipeline/QueryPipelineBuilder.h>


@ -9,7 +9,7 @@
#include <Common/logger_useful.h>
#include <Common/formatReadable.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
#include <Processors/Transforms/SquashingTransform.h>
namespace ProfileEvents
@ -783,7 +783,7 @@ void AggregatingTransform::initGenerate()
{
/// Just a reasonable constant, matches default value for the setting `preferred_block_size_bytes`
static constexpr size_t oneMB = 1024 * 1024;
return std::make_shared<SimpleSquashingChunksTransform>(header, params->params.max_block_size, oneMB);
return std::make_shared<SimpleSquashingTransform>(header, params->params.max_block_size, oneMB);
});
}
/// AggregatingTransform::expandPipeline expects single output port.


@ -0,0 +1,63 @@
#pragma once
#include <Interpreters/Squashing.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/Sinks/SinkToStorage.h>
namespace DB
{
class ApplySquashingTransform : public ExceptionKeepingTransform
{
public:
explicit ApplySquashingTransform(const Block & header, const size_t min_block_size_rows, const size_t min_block_size_bytes)
: ExceptionKeepingTransform(header, header, false)
, squashing(header, min_block_size_rows, min_block_size_bytes)
{
}
String getName() const override { return "ApplySquashingTransform"; }
void work() override
{
if (stage == Stage::Exception)
{
data.chunk.clear();
ready_input = false;
return;
}
ExceptionKeepingTransform::work();
if (finish_chunk)
{
data.chunk = std::move(finish_chunk);
ready_output = true;
}
}
protected:
void onConsume(Chunk chunk) override
{
if (auto res_chunk = DB::Squashing::squash(std::move(chunk)))
cur_chunk.setColumns(res_chunk.getColumns(), res_chunk.getNumRows());
}
GenerateResult onGenerate() override
{
GenerateResult res;
res.chunk = std::move(cur_chunk);
res.is_done = true;
return res;
}
void onFinish() override
{
auto chunk = DB::Squashing::squash({});
finish_chunk.setColumns(chunk.getColumns(), chunk.getNumRows());
}
private:
Squashing squashing;
Chunk cur_chunk;
Chunk finish_chunk;
};
}
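
For context, a minimal sketch of how this transform is meant to be paired with the multi-port planner, modeled on the `InterpreterInsertQuery` hunk earlier in this diff; the `addSquashing` helper, the size thresholds, and the use of `getNumStreams()` are assumptions of the sketch rather than an API introduced by the commit:

```cpp
#include <memory>
#include <Processors/Transforms/ApplySquashingTransform.h>
#include <Processors/Transforms/PlanSquashingTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>

using namespace DB;

/// Hypothetical helper: insert the plan/apply squashing pair into an existing pipeline.
void addSquashing(QueryPipelineBuilder & pipeline, size_t min_rows, size_t min_bytes)
{
    const Block & header = pipeline.getHeader();

    /// One multi-input/multi-output planner groups chunks across all streams.
    pipeline.addTransform(std::make_shared<PlanSquashingTransform>(
        header, min_rows, min_bytes, pipeline.getNumStreams()));

    /// One applier per stream materializes the planned chunks into merged blocks.
    pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr
    {
        return std::make_shared<ApplySquashingTransform>(in_header, min_rows, min_bytes);
    });
}
```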


@ -0,0 +1,145 @@
#include <Processors/Transforms/PlanSquashingTransform.h>
#include <Processors/IProcessor.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
PlanSquashingTransform::PlanSquashingTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, size_t num_ports)
: IProcessor(InputPorts(num_ports, header), OutputPorts(num_ports, header)), squashing(header, min_block_size_rows, min_block_size_bytes)
{
}
IProcessor::Status PlanSquashingTransform::prepare()
{
Status status = Status::Ready;
while (planning_status != PlanningStatus::FINISH)
{
switch (planning_status)
{
case INIT:
init();
break;
case READ_IF_CAN:
return prepareConsume();
case PUSH:
return sendOrFlush();
case FLUSH:
return sendOrFlush();
case FINISH:
break; /// never reached
}
}
if (status == Status::Ready)
status = finish();
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "There should be a Ready status to finish the PlanSquashing");
return status;
}
void PlanSquashingTransform::work()
{
prepare();
}
void PlanSquashingTransform::init()
{
for (auto input: inputs)
if (!input.isFinished())
input.setNeeded();
planning_status = PlanningStatus::READ_IF_CAN;
}
IProcessor::Status PlanSquashingTransform::prepareConsume()
{
bool all_finished = true;
for (auto & input : inputs)
{
if (!input.isFinished())
{
all_finished = false;
input.setNeeded();
}
else
continue;
if (input.hasData())
{
chunk = input.pull();
chunk = transform(std::move(chunk));
if (chunk.hasChunkInfo())
{
planning_status = PlanningStatus::PUSH;
return Status::Ready;
}
}
}
if (all_finished) /// If all inputs are closed, we check if we have data in balancing
{
if (squashing.isDataLeft()) /// If we have data in balancing, we process this data
{
planning_status = PlanningStatus::FLUSH;
chunk = flushChunk();
return Status::Ready;
}
planning_status = PlanningStatus::FINISH;
return Status::Ready;
}
return Status::NeedData;
}
Chunk PlanSquashingTransform::transform(Chunk && chunk_)
{
return squashing.add(std::move(chunk_));
}
Chunk PlanSquashingTransform::flushChunk()
{
return squashing.flush();
}
IProcessor::Status PlanSquashingTransform::sendOrFlush()
{
if (!chunk)
{
planning_status = PlanningStatus::FINISH;
return Status::Ready;
}
for (auto &output : outputs)
{
if (output.canPush())
{
if (planning_status == PlanningStatus::PUSH)
planning_status = PlanningStatus::READ_IF_CAN;
else
planning_status = PlanningStatus::FINISH;
output.push(std::move(chunk));
return Status::Ready;
}
}
return Status::PortFull;
}
IProcessor::Status PlanSquashingTransform::finish()
{
for (auto & in : inputs)
in.close();
for (auto & output : outputs)
output.finish();
return Status::Finished;
}
}


@ -0,0 +1,47 @@
#pragma once
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/IProcessor.h>
#include <Interpreters/Squashing.h>
enum PlanningStatus
{
INIT,
READ_IF_CAN,
PUSH,
FLUSH,
FINISH
};
namespace DB
{
class PlanSquashingTransform : public IProcessor
{
public:
PlanSquashingTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, size_t num_ports);
String getName() const override { return "PlanSquashingTransform"; }
InputPorts & getInputPorts() { return inputs; }
OutputPorts & getOutputPorts() { return outputs; }
Status prepare() override;
void work() override;
void init();
Status prepareConsume();
Status sendOrFlush();
Status waitForDataIn();
Status finish();
Chunk transform(Chunk && chunk);
Chunk flushChunk();
private:
Chunk chunk;
Squashing squashing;
PlanningStatus planning_status = PlanningStatus::INIT;
};
}


@ -1,94 +0,0 @@
#include <Processors/Transforms/SquashingChunksTransform.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
SquashingChunksTransform::SquashingChunksTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes)
: ExceptionKeepingTransform(header, header, false)
, squashing(min_block_size_rows, min_block_size_bytes)
{
}
void SquashingChunksTransform::onConsume(Chunk chunk)
{
if (auto block = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns())))
{
cur_chunk.setColumns(block.getColumns(), block.rows());
}
}
SquashingChunksTransform::GenerateResult SquashingChunksTransform::onGenerate()
{
GenerateResult res;
res.chunk = std::move(cur_chunk);
res.is_done = true;
return res;
}
void SquashingChunksTransform::onFinish()
{
auto block = squashing.add({});
finish_chunk.setColumns(block.getColumns(), block.rows());
}
void SquashingChunksTransform::work()
{
if (stage == Stage::Exception)
{
data.chunk.clear();
ready_input = false;
return;
}
ExceptionKeepingTransform::work();
if (finish_chunk)
{
data.chunk = std::move(finish_chunk);
ready_output = true;
}
}
SimpleSquashingChunksTransform::SimpleSquashingChunksTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes)
: IInflatingTransform(header, header), squashing(min_block_size_rows, min_block_size_bytes)
{
}
void SimpleSquashingChunksTransform::consume(Chunk chunk)
{
Block current_block = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
squashed_chunk.setColumns(current_block.getColumns(), current_block.rows());
}
Chunk SimpleSquashingChunksTransform::generate()
{
if (squashed_chunk.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't generate chunk in SimpleSquashingChunksTransform");
Chunk result_chunk;
result_chunk.swap(squashed_chunk);
return result_chunk;
}
bool SimpleSquashingChunksTransform::canGenerate()
{
return !squashed_chunk.empty();
}
Chunk SimpleSquashingChunksTransform::getRemaining()
{
Block current_block = squashing.add({});
squashed_chunk.setColumns(current_block.getColumns(), current_block.rows());
Chunk result_chunk;
result_chunk.swap(squashed_chunk);
return result_chunk;
}
}


@ -0,0 +1,108 @@
#include <Processors/Transforms/SquashingTransform.h>
#include <Interpreters/Squashing.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
SquashingTransform::SquashingTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes)
: ExceptionKeepingTransform(header, header, false)
, squashing(header, min_block_size_rows, min_block_size_bytes)
{
}
void SquashingTransform::onConsume(Chunk chunk)
{
Chunk planned_chunk = squashing.add(std::move(chunk));
if (planned_chunk.hasChunkInfo())
cur_chunk = DB::Squashing::squash(std::move(planned_chunk));
}
SquashingTransform::GenerateResult SquashingTransform::onGenerate()
{
GenerateResult res;
res.chunk = std::move(cur_chunk);
res.is_done = true;
return res;
}
void SquashingTransform::onFinish()
{
Chunk chunk = squashing.flush();
if (chunk.hasChunkInfo())
chunk = DB::Squashing::squash(std::move(chunk));
finish_chunk.setColumns(chunk.getColumns(), chunk.getNumRows());
}
void SquashingTransform::work()
{
if (stage == Stage::Exception)
{
data.chunk.clear();
ready_input = false;
return;
}
ExceptionKeepingTransform::work();
if (finish_chunk)
{
data.chunk = std::move(finish_chunk);
ready_output = true;
}
}
SimpleSquashingTransform::SimpleSquashingTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes)
: ISimpleTransform(header, header, false)
, squashing(header, min_block_size_rows, min_block_size_bytes)
{
}
void SimpleSquashingTransform::transform(Chunk & chunk)
{
if (!finished)
{
Chunk planned_chunk = squashing.add(std::move(chunk));
if (planned_chunk.hasChunkInfo())
chunk = DB::Squashing::squash(std::move(planned_chunk));
}
else
{
if (chunk.hasRows())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk expected to be empty, otherwise it will be lost");
chunk = squashing.flush();
if (chunk.hasChunkInfo())
chunk = DB::Squashing::squash(std::move(chunk));
}
}
IProcessor::Status SimpleSquashingTransform::prepare()
{
if (!finished && input.isFinished())
{
if (output.isFinished())
return Status::Finished;
if (!output.canPush())
return Status::PortFull;
if (has_output)
{
output.pushData(std::move(output_data));
has_output = false;
return Status::PortFull;
}
finished = true;
/// On the next call to transform() we will return all data buffered in `squashing` (if any)
return Status::Ready;
}
return ISimpleTransform::prepare();
}
}


@ -1,17 +1,17 @@
#pragma once
#include <Interpreters/SquashingTransform.h>
#include <Interpreters/Squashing.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/IInflatingTransform.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Transforms/ApplySquashingTransform.h>
namespace DB
{
class SquashingChunksTransform : public ExceptionKeepingTransform
class SquashingTransform : public ExceptionKeepingTransform
{
public:
explicit SquashingChunksTransform(
explicit SquashingTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes);
String getName() const override { return "SquashingTransform"; }
@ -24,28 +24,27 @@ protected:
void onFinish() override;
private:
SquashingTransform squashing;
Squashing squashing;
Chunk cur_chunk;
Chunk finish_chunk;
};
/// Doesn't care about propagating exceptions and thus doesn't throw LOGICAL_ERROR if the following transform closes its input port.
class SimpleSquashingChunksTransform : public IInflatingTransform
class SimpleSquashingTransform : public ISimpleTransform
{
public:
explicit SimpleSquashingChunksTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes);
explicit SimpleSquashingTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes);
String getName() const override { return "SimpleSquashingTransform"; }
protected:
void consume(Chunk chunk) override;
bool canGenerate() override;
Chunk generate() override;
Chunk getRemaining() override;
void transform(Chunk &) override;
IProcessor::Status prepare() override;
private:
SquashingTransform squashing;
Chunk squashed_chunk;
};
Squashing squashing;
bool finished = false;
};
}


@ -6,7 +6,8 @@
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Parsers/ASTInsertQuery.h>
#include <Processors/Transforms/CountingTransform.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
#include <Processors/Transforms/PlanSquashingTransform.h>
#include <Processors/Transforms/SquashingTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Storages/LiveView/StorageLiveView.h>
@ -371,7 +372,7 @@ std::optional<Chain> generateViewChain(
bool table_prefers_large_blocks = inner_table->prefersLargeBlocks();
const auto & settings = insert_context->getSettingsRef();
out.addSource(std::make_shared<SquashingChunksTransform>(
out.addSource(std::make_shared<SquashingTransform>(
out.getInputHeader(),
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL));
@ -622,7 +623,7 @@ static QueryPipeline process(Block block, ViewRuntimeData & view, const ViewsDat
/// Squashing is needed here because the materialized view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
/// and two-level aggregation is triggered).
pipeline.addTransform(std::make_shared<SquashingChunksTransform>(
pipeline.addTransform(std::make_shared<SquashingTransform>(
pipeline.getHeader(),
context->getSettingsRef().min_insert_block_size_rows,
context->getSettingsRef().min_insert_block_size_bytes));

View File

@ -1046,12 +1046,21 @@ void HTTPHandler::formatExceptionForClient(int exception_code, HTTPServerRequest
/// FIXME: make sure that no one else is reading from the same stream at the moment.
/// If HTTP method is POST and Keep-Alive is turned on, we should read the whole request body
/// If HTTP method is POST and Keep-Alive is turned on, we should try to read the whole request body
/// to avoid reading part of the current request body in the next request.
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST && response.getKeepAlive()
&& exception_code != ErrorCodes::HTTP_LENGTH_REQUIRED && !request.getStream().eof())
&& exception_code != ErrorCodes::HTTP_LENGTH_REQUIRED)
{
request.getStream().ignoreAll();
try
{
if (!request.getStream().eof())
request.getStream().ignoreAll();
}
catch (...)
{
tryLogCurrentException(log, "Cannot read remaining request body during exception handling");
response.setKeepAlive(false);
}
}
if (exception_code == ErrorCodes::REQUIRED_PASSWORD)
@ -1063,7 +1072,6 @@ void HTTPHandler::formatExceptionForClient(int exception_code, HTTPServerRequest
void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event)
{
setThreadName("HTTPHandler");
ThreadStatus thread_status;
session = std::make_unique<Session>(server.context(), ClientInfo::Interface::HTTP, request.isSecure());
SCOPE_EXIT({ session.reset(); });
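The new exception path tries to drain the rest of a keep-alive POST body so that leftover bytes do not leak into the next request on the same connection, and disables keep-alive if even the drain fails. A small self-contained sketch of that drain-or-close decision over a plain std::istream (drainBodyOrCloseConnection is a made-up helper, not the HTTPHandler API):

#include <exception>
#include <iostream>
#include <sstream>

// Drain whatever is left of the request body; on failure, tell the caller to drop keep-alive.
// Returns the keep_alive flag that should be applied to the response.
bool drainBodyOrCloseConnection(std::istream & body, bool keep_alive, bool is_post)
{
    if (!is_post || !keep_alive)
        return keep_alive;

    try
    {
        body.exceptions(std::istream::badbit);  // surface real I/O errors as exceptions
        char buf[4096];
        while (body.good())
            body.read(buf, sizeof(buf));        // discard whatever is left of the body
        return true;                            // body fully consumed: the connection can be reused
    }
    catch (const std::exception & e)
    {
        std::cerr << "Cannot read remaining request body: " << e.what() << '\n';
        return false;                           // unknown leftover bytes: do not reuse the connection
    }
}

int main()
{
    std::istringstream leftover("...unread part of the POST body...");
    bool keep_alive = drainBodyOrCloseConnection(leftover, /*keep_alive=*/true, /*is_post=*/true);
    std::cout << "keep_alive after error handling: " << std::boolalpha << keep_alive << '\n';
}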

View File

@ -8,7 +8,6 @@
#include <Interpreters/InterserverIOHandler.h>
#include <Server/HTTP/HTMLForm.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <Common/ThreadStatus.h>
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
@ -81,7 +80,6 @@ void InterserverIOHTTPHandler::processQuery(HTTPServerRequest & request, HTTPSer
void InterserverIOHTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event)
{
setThreadName("IntersrvHandler");
ThreadStatus thread_status;
/// In order to work keep-alive.
if (request.getVersion() == HTTPServerRequest::HTTP_1_1)

View File

@ -309,7 +309,6 @@ Poco::Timespan KeeperTCPHandler::receiveHandshake(int32_t handshake_length, bool
void KeeperTCPHandler::runImpl()
{
setThreadName("KeeperHandler");
ThreadStatus thread_status;
socket().setReceiveTimeout(receive_timeout);
socket().setSendTimeout(send_timeout);

View File

@ -24,7 +24,6 @@
#include <Common/CurrentThread.h>
#include <Common/NetException.h>
#include <Common/OpenSSLHelpers.h>
#include <Common/ThreadStatus.h>
#include <Common/config_version.h>
#include <Common/logger_useful.h>
#include <Common/re2.h>
@ -199,7 +198,6 @@ MySQLHandler::~MySQLHandler() = default;
void MySQLHandler::run()
{
setThreadName("MySQLHandler");
ThreadStatus thread_status;
session = std::make_unique<Session>(server.context(), ClientInfo::Interface::MYSQL);
SCOPE_EXIT({ session.reset(); });

View File

@ -10,7 +10,6 @@
#include <base/scope_guard.h>
#include <pcg_random.hpp>
#include <Common/CurrentThread.h>
#include <Common/ThreadStatus.h>
#include <Common/config_version.h>
#include <Common/randomSeed.h>
#include <Common/setThreadName.h>
@ -59,7 +58,6 @@ void PostgreSQLHandler::changeIO(Poco::Net::StreamSocket & socket)
void PostgreSQLHandler::run()
{
setThreadName("PostgresHandler");
ThreadStatus thread_status;
session = std::make_unique<Session>(server.context(), ClientInfo::Interface::POSTGRESQL);
SCOPE_EXIT({ session.reset(); });

View File

@ -1,9 +1,8 @@
#include "Interpreters/AsynchronousInsertQueue.h"
#include "Interpreters/SquashingTransform.h"
#include "Parsers/ASTInsertQuery.h"
#include <Interpreters/AsynchronousInsertQueue.h>
#include <Interpreters/Squashing.h>
#include <Parsers/ASTInsertQuery.h>
#include <algorithm>
#include <exception>
#include <iterator>
#include <memory>
#include <mutex>
#include <vector>
@ -246,7 +245,6 @@ TCPHandler::~TCPHandler()
void TCPHandler::runImpl()
{
setThreadName("TCPHandler");
ThreadStatus thread_status;
extractConnectionSettingsFromContext(server.context());
@ -886,13 +884,16 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro
using PushResult = AsynchronousInsertQueue::PushResult;
startInsertQuery();
SquashingTransform squashing(0, query_context->getSettingsRef().async_insert_max_data_size);
Squashing squashing(state.input_header, 0, query_context->getSettingsRef().async_insert_max_data_size);
while (readDataNext())
{
auto result = squashing.add(std::move(state.block_for_insert));
if (result)
squashing.header = state.block_for_insert;
auto planned_chunk = squashing.add({state.block_for_insert.getColumns(), state.block_for_insert.rows()});
if (planned_chunk.hasChunkInfo())
{
Chunk result_chunk = DB::Squashing::squash(std::move(planned_chunk));
auto result = state.block_for_insert.cloneWithColumns(result_chunk.getColumns());
return PushResult
{
.status = PushResult::TOO_MUCH_DATA,
@ -901,7 +902,12 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro
}
}
auto result = squashing.add({});
auto planned_chunk = squashing.flush();
Chunk result_chunk;
if (planned_chunk.hasChunkInfo())
result_chunk = DB::Squashing::squash(std::move(planned_chunk));
auto result = squashing.header.cloneWithColumns(result_chunk.getColumns());
return insert_queue.pushQueryWithBlock(state.parsed_query, std::move(result), query_context);
}
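Here the squashing planner doubles as a size guard: if it yields a chunk while blocks are still arriving, the insert has exceeded async_insert_max_data_size and the handler returns TOO_MUCH_DATA so the caller falls back to a regular insert; otherwise the flushed remainder goes to the async insert queue. A tiny sketch of that overflow decision under assumed names (bufferAsyncInsert and PushStatus are illustrative, not the server's types):

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

enum class PushStatus { OK, TOO_MUCH_DATA };

// Buffer blocks for an async insert until the stream ends or the size limit is exceeded.
// If the limit is crossed, hand the data back to the caller for a synchronous insert instead.
PushStatus bufferAsyncInsert(const std::vector<std::string> & incoming_blocks, size_t max_data_size, std::string & buffered)
{
    for (const auto & block : incoming_blocks)
    {
        buffered += block;
        if (buffered.size() > max_data_size)
            return PushStatus::TOO_MUCH_DATA;  // caller falls back to a regular insert with `buffered`
    }
    return PushStatus::OK;                     // caller enqueues `buffered` into the async insert queue
}

int main()
{
    std::string buffered;
    auto status = bufferAsyncInsert({"aaaa", "bbbb", "cccc"}, /*max_data_size=*/10, buffered);
    std::cout << (status == PushStatus::TOO_MUCH_DATA ? "fall back to sync insert" : "enqueue async") << '\n';
}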

View File

@ -7,7 +7,6 @@
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadStatus.h>
#include <Core/Protocol.h>
#include <Core/QueryProcessingStage.h>
#include <IO/Progress.h>

View File

@ -21,7 +21,7 @@ limitations under the License. */
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
#include <Processors/Transforms/SquashingTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <QueryPipeline/QueryPlanResourceHolder.h>
#include <Common/logger_useful.h>
@ -626,7 +626,7 @@ QueryPipelineBuilder StorageLiveView::completeQuery(Pipes pipes)
/// and two-level aggregation is triggered).
builder.addSimpleTransform([&](const Block & cur_header)
{
return std::make_shared<SquashingChunksTransform>(
return std::make_shared<SquashingTransform>(
cur_header,
getContext()->getSettingsRef().min_insert_block_size_rows,
getContext()->getSettingsRef().min_insert_block_size_bytes);

View File

@ -18,6 +18,7 @@
#include <Functions/CastOverloadResolver.h>
#include <Functions/IFunction.h>
#include <Common/FieldVisitorToString.h>
#include <Common/HilbertUtils.h>
#include <Common/MortonUtils.h>
#include <Common/typeid_cast.h>
#include <Columns/ColumnSet.h>
@ -689,6 +690,11 @@ static const ActionsDAG::Node & cloneASTWithInversionPushDown(
return *res;
}
const std::unordered_map<String, KeyCondition::SpaceFillingCurveType> KeyCondition::space_filling_curve_name_to_type {
{"mortonEncode", SpaceFillingCurveType::Morton},
{"hilbertEncode", SpaceFillingCurveType::Hilbert}
};
ActionsDAGPtr KeyCondition::cloneASTWithInversionPushDown(ActionsDAG::NodeRawConstPtrs nodes, const ContextPtr & context)
{
auto res = std::make_shared<ActionsDAG>();
@ -744,16 +750,17 @@ static NameSet getAllSubexpressionNames(const ExpressionActions & key_expr)
void KeyCondition::getAllSpaceFillingCurves()
{
/// So far the only supported function is mortonEncode (Morton curve).
/// So far the only supported functions are mortonEncode and hilbertEncode (Morton and Hilbert curves).
for (const auto & action : key_expr->getActions())
{
if (action.node->type == ActionsDAG::ActionType::FUNCTION
&& action.node->children.size() >= 2
&& action.node->function_base->getName() == "mortonEncode")
&& space_filling_curve_name_to_type.contains(action.node->function_base->getName()))
{
SpaceFillingCurveDescription curve;
curve.function_name = action.node->function_base->getName();
curve.type = space_filling_curve_name_to_type.at(curve.function_name);
curve.key_column_pos = key_columns.at(action.node->result_name);
for (const auto & child : action.node->children)
{
@ -2665,6 +2672,15 @@ BoolMask KeyCondition::checkInHyperrectangle(
const DataTypes & data_types) const
{
std::vector<BoolMask> rpn_stack;
auto curve_type = [&](size_t key_column_pos)
{
for (const auto & curve : key_space_filling_curves)
if (curve.key_column_pos == key_column_pos)
return curve.type;
return SpaceFillingCurveType::Unknown;
};
for (const auto & element : rpn)
{
if (element.argument_num_of_space_filling_curve.has_value())
@ -2764,26 +2780,43 @@ BoolMask KeyCondition::checkInHyperrectangle(
UInt64 right = key_range.right.get<UInt64>();
BoolMask mask(false, true);
mortonIntervalToHyperrectangles<2>(left, right,
[&](std::array<std::pair<UInt64, UInt64>, 2> morton_hyperrectangle)
auto hyperrectangle_intersection_callback = [&](std::array<std::pair<UInt64, UInt64>, 2> curve_hyperrectangle)
{
BoolMask current_intersection(true, false);
for (size_t dim = 0; dim < num_dimensions; ++dim)
{
BoolMask current_intersection(true, false);
for (size_t dim = 0; dim < num_dimensions; ++dim)
{
const Range & condition_arg_range = element.space_filling_curve_args_hyperrectangle[dim];
const Range & condition_arg_range = element.space_filling_curve_args_hyperrectangle[dim];
const Range morton_arg_range(
morton_hyperrectangle[dim].first, true,
morton_hyperrectangle[dim].second, true);
const Range curve_arg_range(
curve_hyperrectangle[dim].first, true,
curve_hyperrectangle[dim].second, true);
bool intersects = condition_arg_range.intersectsRange(morton_arg_range);
bool contains = condition_arg_range.containsRange(morton_arg_range);
bool intersects = condition_arg_range.intersectsRange(curve_arg_range);
bool contains = condition_arg_range.containsRange(curve_arg_range);
current_intersection = current_intersection & BoolMask(intersects, !contains);
}
current_intersection = current_intersection & BoolMask(intersects, !contains);
}
mask = mask | current_intersection;
});
mask = mask | current_intersection;
};
switch (curve_type(element.key_column))
{
case SpaceFillingCurveType::Hilbert:
{
hilbertIntervalToHyperrectangles2D(left, right, hyperrectangle_intersection_callback);
break;
}
case SpaceFillingCurveType::Morton:
{
mortonIntervalToHyperrectangles<2>(left, right, hyperrectangle_intersection_callback);
break;
}
case SpaceFillingCurveType::Unknown:
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "curve_type is `Unknown`. It is a bug.");
}
}
rpn_stack.emplace_back(mask);
}
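The switch above decomposes a range [left, right] of curve values into axis-aligned boxes and intersects each box with the condition ranges on the curve's arguments. A brute-force way to see what that approximates is to decode every code in the interval and test its point against the condition box directly; the sketch below does this for a 2D Morton (bit-interleaving) curve with local helpers (mortonEncode2D/mortonDecode2D are not the ClickHouse functions), which is only feasible for tiny intervals but illustrates the idea:

#include <cstdint>
#include <iostream>
#include <utility>

// Interleave the low 16 bits of x and y into a 32-bit Morton code (x in even bits, y in odd bits).
static uint32_t mortonEncode2D(uint16_t x, uint16_t y)
{
    uint32_t code = 0;
    for (int bit = 0; bit < 16; ++bit)
    {
        code |= static_cast<uint32_t>((x >> bit) & 1u) << (2 * bit);
        code |= static_cast<uint32_t>((y >> bit) & 1u) << (2 * bit + 1);
    }
    return code;
}

static std::pair<uint16_t, uint16_t> mortonDecode2D(uint32_t code)
{
    uint16_t x = 0, y = 0;
    for (int bit = 0; bit < 16; ++bit)
    {
        x |= static_cast<uint16_t>((code >> (2 * bit)) & 1u) << bit;
        y |= static_cast<uint16_t>((code >> (2 * bit + 1)) & 1u) << bit;
    }
    return {x, y};
}

int main()
{
    // Condition box, e.g. WHERE x BETWEEN 10 AND 20 AND y BETWEEN 20 AND 30.
    const uint16_t x_min = 10, x_max = 20, y_min = 20, y_max = 30;

    // A contiguous interval of Morton codes, e.g. one granule of an index ORDER BY mortonEncode(x, y).
    const uint32_t left = mortonEncode2D(8, 16), right = mortonEncode2D(24, 32);

    bool intersects = false;
    for (uint32_t code = left; code <= right; ++code)
    {
        auto [x, y] = mortonDecode2D(code);
        if (x >= x_min && x <= x_max && y >= y_min && y <= y_max)
        {
            intersects = true;
            break;
        }
    }
    std::cout << "granule may contain matching rows: " << std::boolalpha << intersects << '\n';
}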

View File

@ -328,11 +328,20 @@ private:
const NameSet key_subexpr_names;
/// Space-filling curves in the key
enum class SpaceFillingCurveType
{
Unknown = 0,
Morton,
Hilbert
};
static const std::unordered_map<String, SpaceFillingCurveType> space_filling_curve_name_to_type;
struct SpaceFillingCurveDescription
{
size_t key_column_pos;
String function_name;
std::vector<String> arguments;
SpaceFillingCurveType type;
};
using SpaceFillingCurveDescriptions = std::vector<SpaceFillingCurveDescription>;
SpaceFillingCurveDescriptions key_space_filling_curves;

View File

@ -7,7 +7,7 @@
#include <Storages/Statistics/Statistics.h>
#include <Columns/ColumnsNumber.h>
#include <Parsers/queryToString.h>
#include <Interpreters/SquashingTransform.h>
#include <Interpreters/Squashing.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <Interpreters/PreparedSets.h>
#include <Processors/Transforms/TTLTransform.h>
@ -29,6 +29,7 @@
#include <DataTypes/DataTypeVariant.h>
#include <boost/algorithm/string/replace.hpp>
#include <Common/ProfileEventsScope.h>
#include <Core/ColumnsWithTypeAndName.h>
namespace ProfileEvents
@ -1267,7 +1268,7 @@ private:
ProjectionNameToItsBlocks projection_parts;
std::move_iterator<ProjectionNameToItsBlocks::iterator> projection_parts_iterator;
std::vector<SquashingTransform> projection_squashes;
std::vector<Squashing> projection_squashes;
const ProjectionsDescription & projections;
ExecutableTaskPtr merge_projection_parts_task_ptr;
@ -1286,7 +1287,7 @@ void PartMergerWriter::prepare()
for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
{
// We split the materialization into multiple stages, similar to how an INSERT SELECT query is processed.
projection_squashes.emplace_back(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
projection_squashes.emplace_back(ctx->updated_header, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
}
existing_rows_count = 0;
@ -1311,16 +1312,18 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
{
const auto & projection = *ctx->projections_to_build[i];
Block projection_block;
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds);
projection_block = projection_squashes[i].add(projection.calculate(cur_block, ctx->context));
}
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds);
Block block_to_squash = projection.calculate(cur_block, ctx->context);
projection_squashes[i].header = block_to_squash;
Chunk planned_chunk = projection_squashes[i].add({block_to_squash.getColumns(), block_to_squash.rows()});
if (projection_block)
if (planned_chunk.hasChunkInfo())
{
Chunk projection_chunk = DB::Squashing::squash(std::move(planned_chunk));
auto result = block_to_squash.cloneWithColumns(projection_chunk.getColumns());
auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart(
*ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num);
*ctx->data, ctx->log, result, projection, ctx->new_data_part.get(), ++block_num);
tmp_part.finalize();
tmp_part.part->getDataPartStorage().commitTransaction();
projection_parts[projection.name].emplace_back(std::move(tmp_part.part));
@ -1338,12 +1341,15 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
{
const auto & projection = *ctx->projections_to_build[i];
auto & projection_squash = projection_squashes[i];
auto projection_block = projection_squash.add({});
if (projection_block)
auto & projection_squash_plan = projection_squashes[i];
auto planned_chunk = projection_squash_plan.flush();
if (planned_chunk.hasChunkInfo())
{
Chunk projection_chunk = DB::Squashing::squash(std::move(planned_chunk));
auto result = projection_squash_plan.header.cloneWithColumns(projection_chunk.getColumns());
auto temp_part = MergeTreeDataWriter::writeTempProjectionPart(
*ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num);
*ctx->data, ctx->log, result, projection, ctx->new_data_part.get(), ++block_num);
temp_part.finalize();
temp_part.part->getDataPartStorage().commitTransaction();
projection_parts[projection.name].emplace_back(std::move(temp_part.part));

View File

@ -16,7 +16,8 @@
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
#include <Processors/Transforms/PlanSquashingTransform.h>
#include <Processors/Transforms/SquashingTransform.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <base/range.h>
@ -306,7 +307,9 @@ Block ProjectionDescription::calculate(const Block & block, ContextPtr context)
builder.resize(1);
// Generate aggregated blocks with rows less or equal than the original block.
// There should be only one output block after this transformation.
builder.addTransform(std::make_shared<SquashingChunksTransform>(builder.getHeader(), block.rows(), 0));
builder.addTransform(std::make_shared<PlanSquashingTransform>(builder.getHeader(), block.rows(), 0, 1));
builder.addTransform(std::make_shared<ApplySquashingTransform>(builder.getHeader(), block.rows(), 0));
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingPipelineExecutor executor(pipeline);

View File

@ -140,6 +140,9 @@ class IMergeTreeDataPart;
using ManyExpressionActions = std::vector<ExpressionActionsPtr>;
struct StorageSnapshot;
using StorageSnapshotPtr = std::shared_ptr<StorageSnapshot>;
/** Query along with some additional data,
* that can be used during query processing
* inside storage engines.
@ -173,6 +176,13 @@ struct SelectQueryInfo
/// Local storage limits
StorageLimits local_storage_limits;
/// This is a leak of abstraction.
/// StorageMerge substitutes the inner storage into query_tree, but column types may differ in the inner table,
/// so the resolved query tree might have incompatible types.
/// StorageDistributed uses this query tree to calculate a header and throws if we use the storage snapshot.
/// To avoid this, we use the initial merge_storage_snapshot.
StorageSnapshotPtr merge_storage_snapshot;
/// Cluster for the query.
ClusterPtr cluster;
/// Optimized cluster for the query.

View File

@ -846,7 +846,7 @@ void StorageDistributed::read(
remote_storage_id = StorageID{remote_database, remote_table};
auto query_tree_distributed = buildQueryTreeDistributed(modified_query_info,
storage_snapshot,
query_info.merge_storage_snapshot ? query_info.merge_storage_snapshot : storage_snapshot,
remote_storage_id,
remote_table_function_ptr);
header = InterpreterSelectQueryAnalyzer::getSampleBlock(query_tree_distributed, local_context, SelectQueryOptions(processed_stage).analyze());

View File

@ -889,6 +889,8 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const ContextMutablePtr & mo
SelectQueryInfo modified_query_info = query_info;
modified_query_info.merge_storage_snapshot = merge_storage_snapshot;
if (modified_query_info.planner_context)
modified_query_info.planner_context = std::make_shared<PlannerContext>(modified_context, modified_query_info.planner_context);
@ -1198,7 +1200,10 @@ ReadFromMerge::ChildPlan ReadFromMerge::createPlanForTable(
if (allow_experimental_analyzer)
{
InterpreterSelectQueryAnalyzer interpreter(modified_query_info.query_tree,
/// Converting query to AST because types might be different in the source table.
/// Need to resolve types again.
auto ast = modified_query_info.query_tree->toAST();
InterpreterSelectQueryAnalyzer interpreter(ast,
modified_context,
SelectQueryOptions(processed_stage));

View File

@ -35,7 +35,7 @@
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/WatermarkTransform.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
#include <Processors/Transforms/SquashingTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
@ -633,7 +633,7 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
});
builder.addSimpleTransform([&](const Block & current_header)
{
return std::make_shared<SquashingChunksTransform>(
return std::make_shared<SquashingTransform>(
current_header,
getContext()->getSettingsRef().min_insert_block_size_rows,
getContext()->getSettingsRef().min_insert_block_size_bytes);
@ -1532,7 +1532,7 @@ void StorageWindowView::writeIntoWindowView(
builder = select_block.buildQueryPipeline();
builder.addSimpleTransform([&](const Block & current_header)
{
return std::make_shared<SquashingChunksTransform>(
return std::make_shared<SquashingTransform>(
current_header,
local_context->getSettingsRef().min_insert_block_size_rows,
local_context->getSettingsRef().min_insert_block_size_bytes);

View File

@ -17,7 +17,7 @@
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
#include <Processors/Transforms/SquashingTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/removeGroupingFunctionSpecializations.h>
#include <Storages/StorageDistributed.h>
@ -290,7 +290,7 @@ TableNodePtr executeSubqueryNode(const QueryTreeNodePtr & subquery_node,
size_t min_block_size_rows = mutable_context->getSettingsRef().min_external_table_block_size_rows;
size_t min_block_size_bytes = mutable_context->getSettingsRef().min_external_table_block_size_bytes;
auto squashing = std::make_shared<SimpleSquashingChunksTransform>(builder->getHeader(), min_block_size_rows, min_block_size_bytes);
auto squashing = std::make_shared<SimpleSquashingTransform>(builder->getHeader(), min_block_size_rows, min_block_size_bytes);
builder->resize(1);
builder->addTransform(std::move(squashing));

View File

@ -208,7 +208,7 @@ class StatusNames(metaclass=WithIter):
# mergeable status
MERGEABLE = "Mergeable Check"
# status of a sync pr
SYNC = "A Sync"
SYNC = "Cloud fork sync (only for ClickHouse Inc. employees)"
# PR formatting check status
PR_CHECK = "PR Check"

View File

@ -0,0 +1,44 @@
<clickhouse>
<keeper_server>
<s3_snapshot>
<endpoint>http://minio1:9001/snapshots/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_snapshot>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<four_letter_word_white_list>*</four_letter_word_white_list>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<min_session_timeout_ms>5000</min_session_timeout_ms>
<snapshot_distance>50</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
<global_profiler_real_time_period_ns>1000000000</global_profiler_real_time_period_ns>
<global_profiler_cpu_time_period_ns>1000000000</global_profiler_cpu_time_period_ns>
</clickhouse>

View File

@ -0,0 +1,44 @@
<clickhouse>
<keeper_server>
<s3_snapshot>
<endpoint>http://minio1:9001/snapshots/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_snapshot>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<four_letter_word_white_list>*</four_letter_word_white_list>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<min_session_timeout_ms>5000</min_session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
<global_profiler_real_time_period_ns>1000000000</global_profiler_real_time_period_ns>
<global_profiler_cpu_time_period_ns>1000000000</global_profiler_cpu_time_period_ns>
</clickhouse>

View File

@ -0,0 +1,44 @@
<clickhouse>
<keeper_server>
<s3_snapshot>
<endpoint>http://minio1:9001/snapshots/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_snapshot>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<four_letter_word_white_list>*</four_letter_word_white_list>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<min_session_timeout_ms>5000</min_session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
<global_profiler_real_time_period_ns>1000000000</global_profiler_real_time_period_ns>
<global_profiler_cpu_time_period_ns>1000000000</global_profiler_cpu_time_period_ns>
</clickhouse>

View File

@ -0,0 +1,96 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
from helpers.keeper_utils import KeeperClient, KeeperException
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node1",
main_configs=["configs/keeper_config1.xml"],
stay_alive=True,
)
node2 = cluster.add_instance(
"node2",
main_configs=["configs/keeper_config2.xml"],
stay_alive=True,
with_minio=True,
)
node3 = cluster.add_instance(
"node3",
main_configs=["configs/keeper_config3.xml"],
stay_alive=True,
with_minio=True,
)
@pytest.fixture(scope="module", autouse=True)
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_profiler(started_cluster):
node = cluster.instances["node1"]
if node.is_built_with_sanitizer():
return
node.query(
"CREATE TABLE t (key UInt32, value String) Engine = ReplicatedMergeTree('/clickhouse-tables/test1', 'r1') ORDER BY key"
)
for _ in range(100):
node.query("INSERT INTO t SELECT number, toString(number) from numbers(100)")
node.query("system flush logs")
assert int(node.query("exists system.trace_log"))
result = node.query(
"""
set allow_introspection_functions=1;
system flush logs;
select cnt from (
select count() as cnt, formatReadableSize(sum(size)),
arrayStringConcat(
arrayMap(x, y -> concat(x, ': ', y), arrayMap(x -> addressToLine(x), trace), arrayMap(x -> demangle(addressToSymbol(x)), trace)),
'\n') as trace
from system.trace_log where trace_type = Real and (trace ilike '%KeeperTCPHandler%' or trace ilike '%KeeperDispatcher%') group by trace order by cnt desc) limit 1;
"""
)
if len(result) == 0:
assert 0 < int(
node.query(
"""
set allow_introspection_functions=1;
system flush logs;
select sum(cnt) from (
select count() as cnt, formatReadableSize(sum(size)),
arrayStringConcat(
arrayMap(x, y -> concat(x, ': ', y), arrayMap(x -> addressToLine(x), trace), arrayMap(x -> demangle(addressToSymbol(x)), trace)),
'\n') as trace
from system.trace_log where trace_type = Real group by trace);
"""
)
)
result = node.query(
"""
set allow_introspection_functions=1;
system flush logs;
select * from (
select count() as cnt, formatReadableSize(sum(size)),
arrayStringConcat(
arrayMap(x, y -> concat(x, ': ', y), arrayMap(x -> addressToLine(x), trace), arrayMap(x -> demangle(addressToSymbol(x)), trace)),
'\n') as trace
from system.trace_log where trace_type = Real group by trace);
"""
)
print(result)
assert False
assert 1 < int(result)

View File

@ -877,7 +877,7 @@ def test_max_set_age(started_cluster):
assert "Cannot parse input" in node.query(
"SELECT exception FROM system.s3queue WHERE file_name ilike '%fff.csv' ORDER BY processing_end_time DESC LIMIT 1"
)
assert 2 == int(
assert 1 < int(
node.query(
"SELECT count() FROM system.s3queue_log WHERE file_name ilike '%fff.csv' AND notEmpty(exception)"
)

View File

@ -0,0 +1,5 @@
<clickhouse>
<max_table_num_to_throw>10</max_table_num_to_throw>
<max_database_num_to_throw>10</max_database_num_to_throw>
</clickhouse>

View File

@ -0,0 +1,43 @@
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1", main_configs=["config/config.xml"], with_zookeeper=True
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_table_db_limit(started_cluster):
for i in range(10):
node1.query("create database db{}".format(i))
with pytest.raises(QueryRuntimeException) as exp_info:
node1.query("create database db_exp".format(i))
assert "TOO_MANY_DATABASES" in str(exp_info)
for i in range(10):
node1.query("create table t{} (a Int32) Engine = Log".format(i))
node1.query("system flush logs")
for i in range(10):
node1.query("drop table t{}".format(i))
for i in range(10):
node1.query("create table t{} (a Int32) Engine = Log".format(i))
with pytest.raises(QueryRuntimeException) as exp_info:
node1.query("create table default.tx (a Int32) Engine = Log")
assert "TOO_MANY_TABLES" in str(exp_info)

View File

@ -163,7 +163,6 @@ Filter column: notEquals(__table1.y, 2_UInt8)
> filter is pushed down before CreatingSets
CreatingSets
Filter
Filter
1
3
> one condition of filter is pushed down before LEFT JOIN

View File

@ -0,0 +1,10 @@
Filter (((WHERE + (Change column names to column identifiers + (Project names + Projection))) + HAVING))
Filter column: and(notEquals(sum(__table2.number), 0_UInt8), equals(__table1.key, 7_UInt8)) (removed)
Aggregating
Filter (( + (Before GROUP BY + Change column names to column identifiers)))
Filter column: equals(__table1.key, 7_UInt8) (removed)
Filter (((WHERE + (Projection + Before ORDER BY)) + HAVING))
Filter column: and(notEquals(sum(number), 0), equals(key, 7)) (removed)
Aggregating
Filter ((( + Before GROUP BY) + WHERE))
Filter column: and(equals(bitAnd(number, 15), 7), equals(key, 7)) (removed)

View File

@ -0,0 +1,5 @@
set allow_experimental_analyzer=1;
select explain from (explain actions = 1 select * from (select sum(number) as v, bitAnd(number, 15) as key from numbers(1e8) group by key having v != 0) where key = 7) where explain like '%Filter%' or explain like '%Aggregating%';
set allow_experimental_analyzer=0;
select explain from (explain actions = 1 select * from (select sum(number) as v, bitAnd(number, 15) as key from numbers(1e8) group by key having v != 0) where key = 7) where explain like '%Filter%' or explain like '%Aggregating%';

View File

@ -4,6 +4,12 @@
Prewhere info
Prewhere filter
Prewhere filter column: and(notEmpty(v), equals(k, 3)) (removed)
Prewhere info
Prewhere filter
Prewhere filter column: and(notEmpty(v), equals(k, 3)) (removed)
Prewhere info
Prewhere filter
Prewhere filter column: and(notEmpty(v), equals(k, 3)) (removed)
2
Filter column: and(equals(k, 3), notEmpty(v)) (removed)
Prewhere info

View File

@ -24,7 +24,8 @@ INSERT INTO t_02156_mt1 SELECT number, toString(number) FROM numbers(10000);
INSERT INTO t_02156_mt2 SELECT number, toString(number) FROM numbers(10000);
INSERT INTO t_02156_log SELECT number, toString(number) FROM numbers(10000);
SELECT replaceRegexpAll(explain, '__table1\.|_UInt8', '') FROM (EXPLAIN actions=1 SELECT count() FROM t_02156_merge1 WHERE k = 3 AND notEmpty(v)) WHERE explain LIKE '%Prewhere%' OR explain LIKE '%Filter column%';
SELECT replaceRegexpAll(explain, '__table1\.|_UInt8', '') FROM (EXPLAIN actions=1 SELECT count() FROM t_02156_merge1 WHERE k = 3 AND notEmpty(v)) WHERE explain LIKE '%Prewhere%' OR explain LIKE '%Filter column%' settings allow_experimental_analyzer=1;
SELECT replaceRegexpAll(explain, '__table1\.|_UInt8', '') FROM (EXPLAIN actions=1 SELECT count() FROM t_02156_merge1 WHERE k = 3 AND notEmpty(v)) WHERE explain LIKE '%Prewhere%' OR explain LIKE '%Filter column%' settings allow_experimental_analyzer=0;
SELECT count() FROM t_02156_merge1 WHERE k = 3 AND notEmpty(v);
SELECT replaceRegexpAll(explain, '__table1\.|_UInt8', '') FROM (EXPLAIN actions=1 SELECT count() FROM t_02156_merge2 WHERE k = 3 AND notEmpty(v)) WHERE explain LIKE '%Prewhere%' OR explain LIKE '%Filter column%';

View File

@ -0,0 +1,12 @@
1 a
1 a
2 b
2 b
1 a
1 a
2 b
2 b
1 a
2 b
1 a
2 b

View File

@ -0,0 +1,17 @@
DROP TABLE IF EXISTS t_02156_ololo_1;
DROP TABLE IF EXISTS t_02156_ololo_2;
DROP TABLE IF EXISTS t_02156_ololo_dist;
CREATE TABLE t_02156_ololo_1 (k UInt32, v Nullable(String)) ENGINE = MergeTree order by k;
CREATE TABLE t_02156_ololo_2 (k UInt32, v String) ENGINE = MergeTree order by k;
CREATE TABLE t_02156_ololo_dist (k UInt32, v String) ENGINE = Distributed(test_shard_localhost, currentDatabase(), t_02156_ololo_2);
CREATE TABLE t_02156_ololo_dist2 (k UInt32, v Nullable(String)) ENGINE = Distributed(test_shard_localhost, currentDatabase(), t_02156_ololo_1);
insert into t_02156_ololo_1 values (1, 'a');
insert into t_02156_ololo_2 values (2, 'b');
select * from merge('t_02156_ololo') where k != 0 and notEmpty(v) order by k settings optimize_move_to_prewhere=0;
select * from merge('t_02156_ololo') where k != 0 and notEmpty(v) order by k settings optimize_move_to_prewhere=1;
select * from merge('t_02156_ololo_dist') where k != 0 and notEmpty(v) order by k settings optimize_move_to_prewhere=0;
select * from merge('t_02156_ololo_dist') where k != 0 and notEmpty(v) order by k settings optimize_move_to_prewhere=1;

View File

@ -16,7 +16,7 @@ def main():
sock.settimeout(60)
s = "POST / HTTP/1.1\r\n"
s += "Host: %s\r\n" % host
s += "Content-type: multipart/form-data\r\n"
s += "Content-type: multipart/form-data; boundary=--b3f1zid8kqwy\r\n"
s += "Transfer-encoding: chunked\r\n"
s += "\r\n"
s += "ffffffffffffffff"

View File

@ -1,3 +1,3 @@
HTTP/1.1 200 OK
HTTP/1.1 500 Internal Server Error
encoding type chunked
error code 1000
error code 69

View File

@ -332,13 +332,12 @@ SETTINGS optimize_aggregators_of_group_by_keys=0 -- avoid removing any() as it d
Expression (Projection)
Sorting (Sorting for ORDER BY)
Expression (Before ORDER BY)
Filter ((WHERE + (Projection + Before ORDER BY)))
Filter (HAVING)
Aggregating
Expression ((Before GROUP BY + Projection))
Sorting (Sorting for ORDER BY)
Expression ((Before ORDER BY + (Projection + Before ORDER BY)))
ReadFromSystemNumbers
Filter (((WHERE + (Projection + Before ORDER BY)) + HAVING))
Aggregating
Expression ((Before GROUP BY + Projection))
Sorting (Sorting for ORDER BY)
Expression ((Before ORDER BY + (Projection + Before ORDER BY)))
ReadFromSystemNumbers
-- execute
1
2

View File

@ -29,20 +29,16 @@ WHERE type_1 = \'all\'
ExpressionTransform × 2
(Filter)
FilterTransform × 2
(Filter)
FilterTransform × 2
(Filter)
FilterTransform × 2
(Aggregating)
ExpressionTransform × 2
AggregatingTransform × 2
Copy 1 → 2
(Expression)
ExpressionTransform
(Expression)
ExpressionTransform
(ReadFromMergeTree)
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
(Aggregating)
ExpressionTransform × 2
AggregatingTransform × 2
Copy 1 → 2
(Expression)
ExpressionTransform
(Expression)
ExpressionTransform
(ReadFromMergeTree)
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
(Expression)
ExpressionTransform × 2
(Filter)
@ -68,14 +64,10 @@ ExpressionTransform × 2
ExpressionTransform × 2
AggregatingTransform × 2
Copy 1 → 2
(Filter)
FilterTransform
(Filter)
FilterTransform
(Expression)
ExpressionTransform
(ReadFromMergeTree)
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
(Expression)
ExpressionTransform
(ReadFromMergeTree)
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
(Expression)
ExpressionTransform × 2
(Aggregating)

View File

@ -0,0 +1,9 @@
121
121
32
21
10
32
22
11
1

View File

@ -0,0 +1,35 @@
DROP TABLE IF EXISTS test_hilbert_encode;
CREATE TABLE test_hilbert_encode (x UInt32, y UInt32) ENGINE = MergeTree ORDER BY hilbertEncode(x, y) SETTINGS index_granularity = 8192, index_granularity_bytes = '1Mi';
INSERT INTO test_hilbert_encode SELECT number DIV 1024, number % 1024 FROM numbers(1048576);
SET max_rows_to_read = 8192, force_primary_key = 1, analyze_index_with_space_filling_curves = 1;
SELECT count() FROM test_hilbert_encode WHERE x >= 10 AND x <= 20 AND y >= 20 AND y <= 30;
SET max_rows_to_read = 8192, force_primary_key = 1, analyze_index_with_space_filling_curves = 0;
SELECT count() FROM test_hilbert_encode WHERE x >= 10 AND x <= 20 AND y >= 20 AND y <= 30; -- { serverError 277 }
DROP TABLE test_hilbert_encode;
-- The same, but with more precise index
CREATE TABLE test_hilbert_encode (x UInt32, y UInt32) ENGINE = MergeTree ORDER BY hilbertEncode(x, y) SETTINGS index_granularity = 1;
SET max_rows_to_read = 0;
INSERT INTO test_hilbert_encode SELECT number DIV 32, number % 32 FROM numbers(1024);
SET max_rows_to_read = 200, force_primary_key = 1, analyze_index_with_space_filling_curves = 1;
SELECT count() FROM test_hilbert_encode WHERE x >= 10 AND x <= 20 AND y >= 20 AND y <= 30;
-- Various other conditions
SELECT count() FROM test_hilbert_encode WHERE x = 10 SETTINGS max_rows_to_read = 49;
SELECT count() FROM test_hilbert_encode WHERE x = 10 AND y > 10 SETTINGS max_rows_to_read = 33;
SELECT count() FROM test_hilbert_encode WHERE x = 10 AND y < 10 SETTINGS max_rows_to_read = 15;
SELECT count() FROM test_hilbert_encode WHERE y = 10 SETTINGS max_rows_to_read = 50;
SELECT count() FROM test_hilbert_encode WHERE x >= 10 AND y = 10 SETTINGS max_rows_to_read = 35;
SELECT count() FROM test_hilbert_encode WHERE y = 10 AND x <= 10 SETTINGS max_rows_to_read = 17;
SELECT count() FROM test_hilbert_encode PREWHERE x >= 10 WHERE x < 11 AND y = 10 SETTINGS max_rows_to_read = 2;
DROP TABLE test_hilbert_encode;
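The row-count limits in this test rely on hilbertEncode keeping nearby (x, y) points close together on the curve, so a small box on x and y is covered by a few short ranges of the sorting key. A compact sketch of one classic iterative 2D Hilbert mapping (hilbertXYToD and rotateQuadrant are local names; ClickHouse's hilbertEncode may use a different bit width or orientation, so treat this as an illustration only):

#include <cstdint>
#include <iostream>
#include <utility>

// Rotate/flip a quadrant so the curve keeps its orientation while descending one level.
static void rotateQuadrant(uint32_t n, uint32_t & x, uint32_t & y, uint32_t rx, uint32_t ry)
{
    if (ry == 0)
    {
        if (rx == 1)
        {
            x = n - 1 - x;
            y = n - 1 - y;
        }
        std::swap(x, y);
    }
}

// Map (x, y) in an n*n grid (n a power of two) to its distance along the Hilbert curve.
static uint64_t hilbertXYToD(uint32_t n, uint32_t x, uint32_t y)
{
    uint64_t d = 0;
    for (uint32_t s = n / 2; s > 0; s /= 2)
    {
        uint32_t rx = (x & s) > 0;
        uint32_t ry = (y & s) > 0;
        d += static_cast<uint64_t>(s) * s * ((3 * rx) ^ ry);
        rotateQuadrant(n, x, y, rx, ry);
    }
    return d;
}

int main()
{
    const uint32_t n = 32;  // matches the 32x32 grid in the "more precise index" part of the test
    // Nearby points tend to get nearby Hilbert distances, which is what makes range pruning effective.
    std::cout << hilbertXYToD(n, 10, 20) << ' '
              << hilbertXYToD(n, 10, 21) << ' '
              << hilbertXYToD(n, 11, 20) << '\n';
}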

View File

@ -0,0 +1,3 @@
100
49
100

View File

@ -0,0 +1,11 @@
drop table if exists test_qualify;
create table test_qualify (number Int64) ENGINE = MergeTree ORDER BY (number);
insert into test_qualify SELECT * FROM numbers(100);
select count() from test_qualify; -- 100
select * from test_qualify qualify row_number() over (order by number) = 50 SETTINGS allow_experimental_analyzer = 1; -- 49
select * from test_qualify qualify row_number() over (order by number) = 50 SETTINGS allow_experimental_analyzer = 0; -- { serverError NOT_IMPLEMENTED }
delete from test_qualify where number in (select number from test_qualify qualify row_number() over (order by number) = 50); -- { serverError UNFINISHED }
select count() from test_qualify; -- 100