Merge branch 'master' into split-cast-overload-resolver

Alexey Milovidov 2024-03-16 14:07:59 +01:00
commit 9be38c064a
85 changed files with 1294 additions and 316 deletions

View File

@ -45,62 +45,3 @@ jobs:
with:
data: "${{ needs.RunConfig.outputs.data }}"
set_latest: true
SonarCloud:
runs-on: [self-hosted, builder]
env:
SONAR_SCANNER_VERSION: 4.8.0.2856
SONAR_SERVER_URL: "https://sonarcloud.io"
BUILD_WRAPPER_OUT_DIR: build_wrapper_output_directory # Directory where build-wrapper output will be placed
CC: clang-17
CXX: clang++-17
steps:
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
fetch-depth: 0 # Shallow clones should be disabled for a better relevancy of analysis
filter: tree:0
submodules: true
- name: Set up JDK 11
uses: actions/setup-java@v1
with:
java-version: 11
- name: Download and set up sonar-scanner
env:
SONAR_SCANNER_DOWNLOAD_URL: https://binaries.sonarsource.com/Distribution/sonar-scanner-cli/sonar-scanner-cli-${{ env.SONAR_SCANNER_VERSION }}-linux.zip
run: |
mkdir -p "$HOME/.sonar"
curl -sSLo "$HOME/.sonar/sonar-scanner.zip" "${{ env.SONAR_SCANNER_DOWNLOAD_URL }}"
unzip -o "$HOME/.sonar/sonar-scanner.zip" -d "$HOME/.sonar/"
echo "$HOME/.sonar/sonar-scanner-${{ env.SONAR_SCANNER_VERSION }}-linux/bin" >> "$GITHUB_PATH"
- name: Download and set up build-wrapper
env:
BUILD_WRAPPER_DOWNLOAD_URL: ${{ env.SONAR_SERVER_URL }}/static/cpp/build-wrapper-linux-x86.zip
run: |
curl -sSLo "$HOME/.sonar/build-wrapper-linux-x86.zip" "${{ env.BUILD_WRAPPER_DOWNLOAD_URL }}"
unzip -o "$HOME/.sonar/build-wrapper-linux-x86.zip" -d "$HOME/.sonar/"
echo "$HOME/.sonar/build-wrapper-linux-x86" >> "$GITHUB_PATH"
- name: Set Up Build Tools
run: |
sudo apt-get update
sudo apt-get install -yq git cmake ccache ninja-build python3 yasm nasm
sudo bash -c "$(wget -O - https://apt.llvm.org/llvm.sh)"
- name: Run build-wrapper
run: |
mkdir build
cd build
cmake ..
cd ..
build-wrapper-linux-x86-64 --out-dir ${{ env.BUILD_WRAPPER_OUT_DIR }} cmake --build build/
- name: Run sonar-scanner
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
run: |
sonar-scanner \
--define sonar.host.url="${{ env.SONAR_SERVER_URL }}" \
--define sonar.cfamily.build-wrapper-output="${{ env.BUILD_WRAPPER_OUT_DIR }}" \
--define sonar.projectKey="ClickHouse_ClickHouse" \
--define sonar.organization="clickhouse-java" \
--define sonar.cfamily.cpp23.enabled=true \
--define sonar.exclusions="**/*.java,**/*.ts,**/*.js,**/*.css,**/*.sql"

View File

@ -172,6 +172,7 @@ jobs:
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
python3 finish_check.py
python3 merge_pr.py --check-approved
#############################################################################################

View File

@ -43,8 +43,7 @@ jobs:
runs-on: [self-hosted, '${{inputs.runner_type}}']
steps:
- name: Check out repository code
# WIP: temporary try commit with limited parallelization of checkout
uses: ClickHouse/checkout@0be3f7b3098bae494d3ef5d29d2e0676fb606232
uses: ClickHouse/checkout@v1
with:
clear-repository: true
ref: ${{ fromJson(inputs.data).git_ref }}

View File

@ -56,13 +56,13 @@ option(ENABLE_CHECK_HEAVY_BUILDS "Don't allow C++ translation units to compile t
if (ENABLE_CHECK_HEAVY_BUILDS)
# set DATA (since RSS does not work since 2.6.x+) to 5G
set (RLIMIT_DATA 5000000000)
# set VIRT (RLIMIT_AS) to 10G (DATA*10)
# set VIRT (RLIMIT_AS) to 10G (DATA*2)
set (RLIMIT_AS 10000000000)
# set CPU time limit to 1000 seconds
set (RLIMIT_CPU 1000)
# -fsanitize=memory and address are too heavy
if (SANITIZE)
if (SANITIZE OR SANITIZE_COVERAGE OR WITH_COVERAGE)
set (RLIMIT_DATA 10000000000) # 10G
endif()

View File

@ -248,6 +248,9 @@ Some of the files might not download fully. Check the file sizes and re-download
``` bash
$ curl -O https://datasets.clickhouse.com/trips_mergetree/partitions/trips_mergetree.tar
# Validate the checksum
$ md5sum trips_mergetree.tar
# Checksum should be equal to: f3b8d469b41d9a82da064ded7245d12c
$ tar xvf trips_mergetree.tar -C /var/lib/clickhouse # path to ClickHouse data directory
$ # check permissions of unpacked data, fix if required
$ sudo service clickhouse-server restart
```

View File

@ -26,7 +26,9 @@ priority: 0
is_active: 0
active_children: 0
dequeued_requests: 67
canceled_requests: 0
dequeued_cost: 4692272
canceled_cost: 0
busy_periods: 63
vruntime: 938454.1999999989
system_vruntime: ᴺᵁᴸᴸ
@ -54,7 +56,9 @@ Columns:
- `is_active` (`UInt8`) - Whether this node is currently active - has resource requests to be dequeued and constraints satisfied.
- `active_children` (`UInt64`) - The number of children in active state.
- `dequeued_requests` (`UInt64`) - The total number of resource requests dequeued from this node.
- `canceled_requests` (`UInt64`) - The total number of resource requests canceled from this node.
- `dequeued_cost` (`UInt64`) - The sum of costs (e.g. size in bytes) of all requests dequeued from this node.
- `canceled_cost` (`UInt64`) - The sum of costs (e.g. size in bytes) of all requests canceled from this node.
- `busy_periods` (`UInt64`) - The total number of deactivations of this node.
- `vruntime` (`Nullable(Float64)`) - For children of `fair` nodes only. Virtual runtime of a node used by the SFQ algorithm to select the next child to process in a max-min fair manner.
- `system_vruntime` (`Nullable(Float64)`) - For `fair` nodes only. Virtual runtime showing `vruntime` of the last processed resource request. Used during child activation as the new value of `vruntime`.

View File

@ -36,9 +36,9 @@ You can explicitly set a time zone for `DateTime`-type columns when creating a t
The [clickhouse-client](../../interfaces/cli.md) applies the server time zone by default if a time zone isn't explicitly set when initializing the data type. To use the client time zone, run `clickhouse-client` with the `--use_client_time_zone` parameter.
ClickHouse outputs values depending on the value of the [date_time_output_format](../../operations/settings/settings-formats.md#date_time_output_format) setting. `YYYY-MM-DD hh:mm:ss` text format by default. Additionally, you can change the output with the [formatDateTime](../../sql-reference/functions/date-time-functions.md#formatdatetime) function.
ClickHouse outputs values depending on the value of the [date_time_output_format](../../operations/settings/settings.md#settings-date_time_output_format) setting. `YYYY-MM-DD hh:mm:ss` text format by default. Additionally, you can change the output with the [formatDateTime](../../sql-reference/functions/date-time-functions.md#formatdatetime) function.
When inserting data into ClickHouse, you can use different formats of date and time strings, depending on the value of the [date_time_input_format](../../operations/settings/settings-formats.md#date_time_input_format) setting.
When inserting data into ClickHouse, you can use different formats of date and time strings, depending on the value of the [date_time_input_format](../../operations/settings/settings.md#settings-date_time_input_format) setting.
## Examples
@ -147,8 +147,8 @@ Time shifts for multiple days. Some pacific islands changed their timezone offse
- [Type conversion functions](../../sql-reference/functions/type-conversion-functions.md)
- [Functions for working with dates and times](../../sql-reference/functions/date-time-functions.md)
- [Functions for working with arrays](../../sql-reference/functions/array-functions.md)
- [The `date_time_input_format` setting](../../operations/settings/settings-formats.md#date_time_input_format)
- [The `date_time_output_format` setting](../../operations/settings/settings-formats.md#date_time_output_format)
- [The `date_time_input_format` setting](../../operations/settings/settings-formats.md#settings-date_time_input_format)
- [The `date_time_output_format` setting](../../operations/settings/settings-formats.md#settings-date_time_output_format)
- [The `timezone` server configuration parameter](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-timezone)
- [The `session_timezone` setting](../../operations/settings/settings.md#session_timezone)
- [Operators for working with dates and times](../../sql-reference/operators/index.md#operators-datetime)

View File

@ -27,9 +27,9 @@ DateTime([timezone])
The ClickHouse console client uses the server time zone by default if a time zone is not explicitly set for a `DateTime` value when initializing the data type. To use the client time zone, run [clickhouse-client](../../interfaces/cli.md) with the `--use_client_time_zone` parameter.
ClickHouse outputs values depending on the value of the [date\_time\_output\_format](../../operations/settings/settings-formats.md#date_time_output_format) setting. The default text format is `YYYY-MM-DD hh:mm:ss`. You can also change the output with the [formatDateTime](../../sql-reference/functions/date-time-functions.md#formatdatetime) function.
ClickHouse outputs values depending on the value of the [date\_time\_output\_format](../../operations/settings/index.md#settings-date_time_output_format) setting. The default text format is `YYYY-MM-DD hh:mm:ss`. You can also change the output with the [formatDateTime](../../sql-reference/functions/date-time-functions.md#formatdatetime) function.
When inserting data into ClickHouse, you can use different date and time string formats depending on the value of the [date_time_input_format](../../operations/settings/settings-formats.md#date_time_input_format) setting.
When inserting data into ClickHouse, you can use different date and time string formats depending on the value of the [date_time_input_format](../../operations/settings/index.md#settings-date_time_input_format) setting.
## Examples {#primery}
@ -119,8 +119,8 @@ FROM dt
- [Type conversion functions](../../sql-reference/functions/type-conversion-functions.md)
- [Functions for working with dates and times](../../sql-reference/functions/date-time-functions.md)
- [Functions for working with arrays](../../sql-reference/functions/array-functions.md)
- [The `date_time_input_format` setting](../../operations/settings/settings-formats.md#date_time_input_format)
- [The `date_time_output_format` setting](../../operations/settings/settings-formats.md#date_time_output_format)
- [The `date_time_input_format` setting](../../operations/settings/index.md#settings-date_time_input_format)
- [The `date_time_output_format` setting](../../operations/settings/index.md)
- [The `timezone` server configuration parameter](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-timezone)
- [The `session_timezone` setting](../../operations/settings/settings.md#session_timezone)
- [Operators for working with dates and times](../../sql-reference/operators/index.md#operators-datetime)

View File

@ -483,6 +483,7 @@ public:
}
bool isAbleToParallelizeMerge() const override { return is_able_to_parallelize_merge; }
bool canOptimizeEqualKeysRanges() const override { return !is_able_to_parallelize_merge; }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena *) const override
{
@ -576,6 +577,7 @@ public:
}
bool isAbleToParallelizeMerge() const override { return is_able_to_parallelize_merge; }
bool canOptimizeEqualKeysRanges() const override { return !is_able_to_parallelize_merge; }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena *) const override
{

View File

@ -142,6 +142,7 @@ public:
}
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
{

View File

@ -165,6 +165,7 @@ public:
}
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
{

View File

@ -111,6 +111,7 @@ public:
}
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
{

View File

@ -152,6 +152,7 @@ public:
}
bool isAbleToParallelizeMerge() const override { return nested_function->isAbleToParallelizeMerge(); }
bool canOptimizeEqualKeysRanges() const override { return nested_function->canOptimizeEqualKeysRanges(); }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
{

View File

@ -92,6 +92,7 @@ public:
}
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
{

View File

@ -162,6 +162,10 @@ public:
/// Tells if merge() with thread pool parameter could be used.
virtual bool isAbleToParallelizeMerge() const { return false; }
/// Returns true if it is allowed to replace a call of `addBatch`
/// with `addBatchSinglePlace` for ranges of consecutive equal keys.
virtual bool canOptimizeEqualKeysRanges() const { return true; }
/// Should be used only if isAbleToParallelizeMerge() returned true.
virtual void
merge(AggregateDataPtr __restrict /*place*/, ConstAggregateDataPtr /*rhs*/, ThreadPool & /*thread_pool*/, Arena * /*arena*/) const

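The flag documented above is consumed much later in this diff (in `Aggregator::executeImplBatch`). The following condensed, illustrative sketch (hypothetical struct and function names, standalone code rather than the real aggregator) shows that decision: a single opted-out aggregate function disables the shortcut for the whole range.

``` cpp
#include <vector>

// Illustrative only: mirrors how the aggregator combines canOptimizeEqualKeysRanges()
// across all aggregate functions of a query before using addBatchSinglePlace.
struct Instruction
{
    bool can_optimize_equal_keys_ranges = true; // filled from canOptimizeEqualKeysRanges()
};

bool canUseSinglePlaceForEqualKeys(const std::vector<Instruction> & instructions, bool keys_equal_in_range)
{
    if (!keys_equal_in_range)
        return false;
    for (const auto & inst : instructions)
        if (!inst.can_optimize_equal_keys_ranges)
            return false; // a single opt-out forces the generic addBatch path
    return true;
}
```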
View File

@ -54,10 +54,10 @@ public:
if (!constant_node)
return;
const auto & constant_value_literal = constant_node->getValue();
if (!isInt64OrUInt64FieldType(constant_value_literal.getType()))
if (auto constant_type = constant_node->getResultType(); !isNativeInteger(constant_type))
return;
const auto & constant_value_literal = constant_node->getValue();
if (getSettings().aggregate_functions_null_for_empty)
return;

View File

@ -101,10 +101,12 @@ RestorerFromBackup::RestorerFromBackup(
RestorerFromBackup::~RestorerFromBackup()
{
if (!futures.empty())
/// If an exception occurs, we can end up in the destructor while some tasks are still unfinished.
/// We have to wait until they finish.
if (getNumFutures() > 0)
{
LOG_ERROR(log, "RestorerFromBackup must not be destroyed while {} tasks are still running", futures.size());
chassert(false && "RestorerFromBackup must not be destroyed while some tasks are still running");
LOG_INFO(log, "Waiting for {} tasks to finish", getNumFutures());
waitFutures();
}
}

View File

@ -914,6 +914,38 @@ ASTPtr QueryFuzzer::fuzzLiteralUnderExpressionList(ASTPtr child)
child = makeASTFunction(
"toFixedString", std::make_shared<ASTLiteral>(value), std::make_shared<ASTLiteral>(static_cast<UInt64>(value.size())));
}
else if (type == Field::Types::Which::UInt64 && fuzz_rand() % 7 == 0)
{
child = makeASTFunction(fuzz_rand() % 2 == 0 ? "toUInt128" : "toUInt256", std::make_shared<ASTLiteral>(l->value.get<UInt64>()));
}
else if (type == Field::Types::Which::Int64 && fuzz_rand() % 7 == 0)
{
child = makeASTFunction(fuzz_rand() % 2 == 0 ? "toInt128" : "toInt256", std::make_shared<ASTLiteral>(l->value.get<Int64>()));
}
else if (type == Field::Types::Which::Float64 && fuzz_rand() % 7 == 0)
{
int decimal = fuzz_rand() % 4;
if (decimal == 0)
child = makeASTFunction(
"toDecimal32",
std::make_shared<ASTLiteral>(l->value.get<Float64>()),
std::make_shared<ASTLiteral>(static_cast<UInt64>(fuzz_rand() % 9)));
else if (decimal == 1)
child = makeASTFunction(
"toDecimal64",
std::make_shared<ASTLiteral>(l->value.get<Float64>()),
std::make_shared<ASTLiteral>(static_cast<UInt64>(fuzz_rand() % 18)));
else if (decimal == 2)
child = makeASTFunction(
"toDecimal128",
std::make_shared<ASTLiteral>(l->value.get<Float64>()),
std::make_shared<ASTLiteral>(static_cast<UInt64>(fuzz_rand() % 38)));
else
child = makeASTFunction(
"toDecimal256",
std::make_shared<ASTLiteral>(l->value.get<Float64>()),
std::make_shared<ASTLiteral>(static_cast<UInt64>(fuzz_rand() % 76)));
}
if (fuzz_rand() % 7 == 0)
child = makeASTFunction("toNullable", child);
@ -933,7 +965,19 @@ ASTPtr QueryFuzzer::reverseLiteralFuzzing(ASTPtr child)
{
if (auto * function = child.get()->as<ASTFunction>())
{
std::unordered_set<String> can_be_reverted{"toNullable", "toLowCardinality", "materialize"};
const std::unordered_set<String> can_be_reverted{
"materialize",
"toDecimal32", /// Keeping the first parameter only should be ok (valid query most of the time)
"toDecimal64",
"toDecimal128",
"toDecimal256",
"toFixedString", /// Same as toDecimal
"toInt128",
"toInt256",
"toLowCardinality",
"toNullable",
"toUInt128",
"toUInt256"};
if (can_be_reverted.contains(function->name) && function->children.size() == 1)
{
if (fuzz_rand() % 7 == 0)

View File

@ -9,6 +9,7 @@
#include <IO/MMappedFileCache.h>
#include <IO/ReadHelpers.h>
#include <base/errnoToString.h>
#include <base/find_symbols.h>
#include <base/getPageSize.h>
#include <sys/resource.h>
#include <chrono>
@ -90,6 +91,9 @@ AsynchronousMetrics::AsynchronousMetrics(
openFileIfExists("/sys/fs/cgroup/cpu/cpu.cfs_quota_us", cgroupcpu_cfs_quota);
}
openFileIfExists("/proc/sys/vm/max_map_count", vm_max_map_count);
openFileIfExists("/proc/self/maps", vm_maps);
openSensors();
openBlockDevices();
openEDAC();
@ -1423,6 +1427,55 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
}
}
if (vm_max_map_count)
{
try
{
vm_max_map_count->rewind();
uint64_t max_map_count = 0;
readText(max_map_count, *vm_max_map_count);
new_values["VMMaxMapCount"] = { max_map_count, "The maximum number of memory mappings a process may have (/proc/sys/vm/max_map_count)."};
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
openFileIfExists("/proc/sys/vm/max_map_count", vm_max_map_count);
}
}
if (vm_maps)
{
try
{
vm_maps->rewind();
uint64_t num_maps = 0;
while (!vm_maps->eof())
{
char * next_pos = find_first_symbols<'\n'>(vm_maps->position(), vm_maps->buffer().end());
vm_maps->position() = next_pos;
if (!vm_maps->hasPendingData())
continue;
if (*vm_maps->position() == '\n')
{
++num_maps;
++vm_maps->position();
}
}
new_values["VMNumMaps"] = { num_maps,
"The current number of memory mappings of the process (/proc/self/maps)."
" If it is close to the maximum (VMMaxMapCount), you should increase the limit for vm.max_map_count in /etc/sysctl.conf"};
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
openFileIfExists("/proc/self/maps", vm_maps);
}
}
try
{
for (size_t i = 0, size = thermal.size(); i < size; ++i)

View File

@ -123,6 +123,9 @@ private:
std::optional<ReadBufferFromFilePRead> cgroupcpu_cfs_quota TSA_GUARDED_BY(data_mutex);
std::optional<ReadBufferFromFilePRead> cgroupcpu_max TSA_GUARDED_BY(data_mutex);
std::optional<ReadBufferFromFilePRead> vm_max_map_count TSA_GUARDED_BY(data_mutex);
std::optional<ReadBufferFromFilePRead> vm_maps TSA_GUARDED_BY(data_mutex);
std::vector<std::unique_ptr<ReadBufferFromFilePRead>> thermal TSA_GUARDED_BY(data_mutex);
std::unordered_map<String /* device name */,

View File

@ -62,7 +62,6 @@ struct LastElementCache
bool check(const Key & key) const { return value.first == key; }
bool hasOnlyOneValue() const { return found && misses == 1; }
UInt64 getMisses() const { return misses; }
};
template <typename Data>
@ -232,7 +231,7 @@ public:
ALWAYS_INLINE UInt64 getCacheMissesSinceLastReset() const
{
if constexpr (consecutive_keys_optimization)
return cache.getMisses();
return cache.misses;
return 0;
}

View File

@ -534,6 +534,7 @@ The server successfully detected this situation and will download merged part fr
\
M(AggregationPreallocatedElementsInHashTables, "How many elements were preallocated in hash tables for aggregation.") \
M(AggregationHashTablesInitializedAsTwoLevel, "How many hash tables were inited as two-level for aggregation.") \
M(AggregationOptimizedEqualRangesOfKeys, "For how many blocks optimization of equal ranges of keys was applied") \
\
M(MetadataFromKeeperCacheHit, "Number of times an object storage metadata request was answered from cache without making request to Keeper") \
M(MetadataFromKeeperCacheMiss, "Number of times an object storage metadata request had to be answered from Keeper") \

View File

@ -387,7 +387,9 @@ public:
/// Introspection
std::atomic<UInt64> dequeued_requests{0};
std::atomic<UInt64> canceled_requests{0};
std::atomic<ResourceCost> dequeued_cost{0};
std::atomic<ResourceCost> canceled_cost{0};
std::atomic<UInt64> busy_periods{0};
};

View File

@ -50,6 +50,12 @@ public:
/// Should be called outside of scheduling subsystem, implementation must be thread-safe.
virtual void enqueueRequest(ResourceRequest * request) = 0;
/// Cancel previously enqueued request.
/// Returns `false` and does nothing if the request is unknown or already executed.
/// Returns `true` if the request has been found and canceled.
/// Should be called outside of scheduling subsystem, implementation must be thread-safe.
virtual bool cancelRequest(ResourceRequest * request) = 0;
/// For introspection
ResourceCost getBudget() const
{

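A minimal caller-side sketch of this contract; the helper name is hypothetical, and the `ISchedulerQueue.h` include path is assumed by analogy with `Common/Scheduler/ResourceRequest.h` elsewhere in this diff.

``` cpp
#include <Common/Scheduler/ResourceRequest.h>
#include <Common/Scheduler/ISchedulerQueue.h> // assumed header path

// Hypothetical caller: enqueue a request, then try to withdraw it.
// cancelRequest() == true  : execute() will never run and finish() must NOT be called.
// cancelRequest() == false : too late, the request was already dequeued (or is unknown);
//                            it will be executed, so wait for execute() and call finish() as usual.
void enqueueThenMaybeCancel(DB::ISchedulerQueue & queue, DB::ResourceRequest & request)
{
    queue.enqueueRequest(&request);
    if (!queue.cancelRequest(&request))
    {
        // Proceed with the regular path described in ResourceRequest.h.
    }
}
```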
View File

@ -134,56 +134,65 @@ public:
std::pair<ResourceRequest *, bool> dequeueRequest() override
{
if (heap_size == 0)
return {nullptr, false};
// Recursively pull request from child
auto [request, child_active] = items.front().child->dequeueRequest();
assert(request != nullptr);
std::pop_heap(items.begin(), items.begin() + heap_size);
Item & current = items[heap_size - 1];
// SFQ fairness invariant: system vruntime equals last served request start-time
assert(current.vruntime >= system_vruntime);
system_vruntime = current.vruntime;
// By definition vruntime is amount of consumed resource (cost) divided by weight
current.vruntime += double(request->cost) / current.child->info.weight;
max_vruntime = std::max(max_vruntime, current.vruntime);
if (child_active) // Put active child back in heap after vruntime update
// A loop is required to perform deactivations in the case of canceled requests, when dequeueRequest returns `nullptr`
while (true)
{
std::push_heap(items.begin(), items.begin() + heap_size);
}
else // Deactivate child if it is empty, but remember its vruntime for later activations
{
heap_size--;
if (heap_size == 0)
return {nullptr, false};
// Store index of this inactive child in `parent.idx`
// This enables O(1) search of inactive children instead of O(n)
current.child->info.parent.idx = heap_size;
}
// Recursively pull request from child
auto [request, child_active] = items.front().child->dequeueRequest();
std::pop_heap(items.begin(), items.begin() + heap_size);
Item & current = items[heap_size - 1];
// Reset any difference between children on busy period end
if (heap_size == 0)
{
// Reset vtime to zero to avoid floating-point error accumulation,
// but do not reset too often, because it's O(N)
UInt64 ns = clock_gettime_ns();
if (last_reset_ns + 1000000000 < ns)
if (request)
{
last_reset_ns = ns;
for (Item & item : items)
item.vruntime = 0;
max_vruntime = 0;
}
system_vruntime = max_vruntime;
busy_periods++;
}
// SFQ fairness invariant: system vruntime equals last served request start-time
assert(current.vruntime >= system_vruntime);
system_vruntime = current.vruntime;
dequeued_requests++;
dequeued_cost += request->cost;
return {request, heap_size > 0};
// By definition vruntime is amount of consumed resource (cost) divided by weight
current.vruntime += double(request->cost) / current.child->info.weight;
max_vruntime = std::max(max_vruntime, current.vruntime);
}
if (child_active) // Put active child back in heap after vruntime update
{
std::push_heap(items.begin(), items.begin() + heap_size);
}
else // Deactivate child if it is empty, but remember its vruntime for later activations
{
heap_size--;
// Store index of this inactive child in `parent.idx`
// This enables O(1) search of inactive children instead of O(n)
current.child->info.parent.idx = heap_size;
}
// Reset any difference between children on busy period end
if (heap_size == 0)
{
// Reset vtime to zero to avoid floating-point error accumulation,
// but do not reset too often, because it's O(N)
UInt64 ns = clock_gettime_ns();
if (last_reset_ns + 1000000000 < ns)
{
last_reset_ns = ns;
for (Item & item : items)
item.vruntime = 0;
max_vruntime = 0;
}
system_vruntime = max_vruntime;
busy_periods++;
}
if (request)
{
dequeued_requests++;
dequeued_cost += request->cost;
return {request, heap_size > 0};
}
}
}
bool isActive() override

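The vruntime bookkeeping above can be illustrated with a tiny self-contained model (a toy simulation, not the real `FairPolicy`): each dequeue serves the child with the smallest vruntime and charges it cost divided by weight, so the served cost ends up proportional to the weight.

``` cpp
#include <cstdio>

int main()
{
    // Toy SFQ model: vruntime += cost / weight; the min-vruntime child is served next.
    double vruntime[2] = {0.0, 0.0};
    const double weight[2] = {1.0, 3.0};
    double served_cost[2] = {0.0, 0.0};
    const double cost = 10.0; // every request has the same cost in this example

    for (int step = 0; step < 100; ++step)
    {
        int child = (vruntime[0] <= vruntime[1]) ? 0 : 1;
        vruntime[child] += cost / weight[child];
        served_cost[child] += cost;
    }

    // Prints roughly 250 and 750: service is proportional to the 1:3 weights.
    std::printf("child0 = %.0f, child1 = %.0f\n", served_cost[0], served_cost[1]);
    return 0;
}
```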
View File

@ -39,8 +39,7 @@ public:
void enqueueRequest(ResourceRequest * request) override
{
std::unique_lock lock(mutex);
request->enqueue_ns = clock_gettime_ns();
std::lock_guard lock(mutex);
queue_cost += request->cost;
bool was_empty = requests.empty();
requests.push_back(request);
@ -50,7 +49,7 @@ public:
std::pair<ResourceRequest *, bool> dequeueRequest() override
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
if (requests.empty())
return {nullptr, false};
ResourceRequest * result = requests.front();
@ -63,9 +62,29 @@ public:
return {result, !requests.empty()};
}
bool cancelRequest(ResourceRequest * request) override
{
std::lock_guard lock(mutex);
// TODO(serxa): reimplement queue as intrusive list of ResourceRequest to make this O(1) instead of O(N)
for (auto i = requests.begin(), e = requests.end(); i != e; ++i)
{
if (*i == request)
{
requests.erase(i);
if (requests.empty())
busy_periods++;
queue_cost -= request->cost;
canceled_requests++;
canceled_cost += request->cost;
return true;
}
}
return false;
}
bool isActive() override
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
return !requests.empty();
}
@ -98,14 +117,14 @@ public:
std::pair<UInt64, Int64> getQueueLengthAndCost()
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
return {requests.size(), queue_cost};
}
private:
std::mutex mutex;
Int64 queue_cost = 0;
std::deque<ResourceRequest *> requests;
std::deque<ResourceRequest *> requests; // TODO(serxa): reimplement it using intrusive list to avoid allocations/deallocations and O(N) during cancel
};
}

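The TODO above proposes an intrusive list so that cancellation becomes O(1) instead of the O(N) scan; a minimal standalone sketch of that idea (hypothetical types, not the real `ResourceRequest`) could look like this.

``` cpp
// Hypothetical intrusive FIFO: the request carries its own links, so cancel is a plain unlink.
struct IntrusiveRequest
{
    IntrusiveRequest * prev = nullptr;
    IntrusiveRequest * next = nullptr;
    bool linked = false;
};

struct IntrusiveFifo
{
    IntrusiveRequest head; // sentinel node; head.next is the front of the queue

    IntrusiveFifo() { head.prev = head.next = &head; }

    void pushBack(IntrusiveRequest * r)
    {
        r->prev = head.prev;
        r->next = &head;
        head.prev->next = r;
        head.prev = r;
        r->linked = true;
    }

    // O(1) cancel: unlink the request without scanning the queue.
    bool cancel(IntrusiveRequest * r)
    {
        if (!r->linked)
            return false;
        r->prev->next = r->next;
        r->next->prev = r->prev;
        r->linked = false;
        return true;
    }
};
```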
View File

@ -102,25 +102,31 @@ public:
std::pair<ResourceRequest *, bool> dequeueRequest() override
{
if (items.empty())
return {nullptr, false};
// Recursively pull request from child
auto [request, child_active] = items.front().child->dequeueRequest();
assert(request != nullptr);
// Deactivate child if it is empty
if (!child_active)
// A loop is required to perform deactivations in the case of canceled requests, when dequeueRequest returns `nullptr`
while (true)
{
std::pop_heap(items.begin(), items.end());
items.pop_back();
if (items.empty())
busy_periods++;
}
return {nullptr, false};
dequeued_requests++;
dequeued_cost += request->cost;
return {request, !items.empty()};
// Recursively pull request from child
auto [request, child_active] = items.front().child->dequeueRequest();
// Deactivate child if it is empty
if (!child_active)
{
std::pop_heap(items.begin(), items.end());
items.pop_back();
if (items.empty())
busy_periods++;
}
if (request)
{
dequeued_requests++;
dequeued_cost += request->cost;
return {request, !items.empty()};
}
}
}
bool isActive() override

View File

@ -38,7 +38,6 @@ TEST(SchedulerDynamicResourceManager, Smoke)
{
ResourceGuard gA(cA->get("res1"), ResourceGuard::PostponeLocking);
gA.lock();
gA.setFailure();
gA.unlock();
ResourceGuard gB(cB->get("res1"));

View File

@ -4,6 +4,7 @@
#include <Common/Scheduler/Nodes/tests/ResourceTest.h>
#include <barrier>
#include <future>
using namespace DB;
@ -73,6 +74,22 @@ struct ResourceHolder
}
};
struct MyRequest : public ResourceRequest
{
std::function<void()> on_execute;
explicit MyRequest(ResourceCost cost_, std::function<void()> on_execute_)
: ResourceRequest(cost_)
, on_execute(on_execute_)
{}
void execute() override
{
if (on_execute)
on_execute();
}
};
TEST(SchedulerRoot, Smoke)
{
ResourceTest t;
@ -111,3 +128,49 @@ TEST(SchedulerRoot, Smoke)
EXPECT_TRUE(fc2->requests.contains(&rg.request));
}
}
TEST(SchedulerRoot, Cancel)
{
ResourceTest t;
ResourceHolder r1(t);
auto * fc1 = r1.add<ConstraintTest>("/", "<max_requests>1</max_requests>");
r1.add<PriorityPolicy>("/prio");
auto a = r1.addQueue("/prio/A", "<priority>1</priority>");
auto b = r1.addQueue("/prio/B", "<priority>2</priority>");
r1.registerResource();
std::barrier destruct_sync(2);
std::barrier sync(2);
std::thread consumer1([&]
{
MyRequest request(1,[&]
{
sync.arrive_and_wait(); // (A)
EXPECT_TRUE(fc1->requests.contains(&request));
sync.arrive_and_wait(); // (B)
request.finish();
destruct_sync.arrive_and_wait(); // (C)
});
a.queue->enqueueRequest(&request);
destruct_sync.arrive_and_wait(); // (C)
});
std::thread consumer2([&]
{
MyRequest request(1,[&]
{
FAIL() << "This request must be canceled, but instead executes";
});
sync.arrive_and_wait(); // (A) wait for consumer1's request to be inside execute(), so that the constraint is violated and our request will not be executed immediately
b.queue->enqueueRequest(&request);
bool canceled = b.queue->cancelRequest(&request);
EXPECT_TRUE(canceled);
sync.arrive_and_wait(); // (B) release consumer1's request so it can finish
});
consumer1.join();
consumer2.join();
EXPECT_TRUE(fc1->requests.empty());
}

View File

@ -71,8 +71,7 @@ public:
// lock(mutex) is not required because `Dequeued` request cannot be used by the scheduler thread
chassert(state == Dequeued);
state = Finished;
if (constraint)
constraint->finishRequest(this);
ResourceRequest::finish();
}
static Request & local()
@ -126,12 +125,6 @@ public:
}
}
/// Mark request as unsuccessful; by default request is considered to be successful
void setFailure()
{
request.successful = false;
}
ResourceLink link;
Request & request;
};

View File

@ -0,0 +1,13 @@
#include <Common/Scheduler/ResourceRequest.h>
#include <Common/Scheduler/ISchedulerConstraint.h>
namespace DB
{
void ResourceRequest::finish()
{
if (constraint)
constraint->finishRequest(this);
}
}

View File

@ -14,9 +14,6 @@ class ISchedulerConstraint;
using ResourceCost = Int64;
constexpr ResourceCost ResourceCostMax = std::numeric_limits<int>::max();
/// Timestamps (nanoseconds since epoch)
using ResourceNs = UInt64;
/*
* Request for a resource consumption. The main moving part of the scheduling subsystem.
* Resource requests processing workflow:
@ -31,7 +28,7 @@ using ResourceNs = UInt64;
* 3) Scheduler calls ISchedulerNode::dequeueRequest() that returns the request.
* 4) Callback ResourceRequest::execute() is called to provide access to the resource.
* 5) The resource consumption is happening outside of the scheduling subsystem.
* 6) request->constraint->finishRequest() is called when consumption is finished.
* 6) ResourceRequest::finish() is called when consumption is finished.
*
* Steps (5) and (6) can be omitted if constraint is not used by the resource.
*
@ -39,7 +36,10 @@ using ResourceNs = UInt64;
* Request ownership is done outside of the scheduling subsystem.
* After (6) request can be destructed safely.
*
* Request cancelling is not supported yet.
* Request can also be canceled before (3) using ISchedulerQueue::cancelRequest().
* Returning false means it is too late for the request to be canceled; it should be processed in the regular way.
* Returning true means the cancel succeeded, and therefore steps (4) and (5) are not going to happen
* and step (6) MUST be omitted.
*/
class ResourceRequest
{
@ -48,32 +48,20 @@ public:
/// NOTE: If cost is not known in advance, ResourceBudget should be used (note that every ISchedulerQueue has it)
ResourceCost cost;
/// Request outcome
/// Should be filled during resource consumption
bool successful;
/// Scheduler node to be notified on consumption finish
/// Auto-filled during request enqueue/dequeue
ISchedulerConstraint * constraint;
/// Timestamps for introspection
ResourceNs enqueue_ns;
ResourceNs execute_ns;
ResourceNs finish_ns;
explicit ResourceRequest(ResourceCost cost_ = 1)
{
reset(cost_);
}
/// ResourceRequest object may be reused after reset()
void reset(ResourceCost cost_)
{
cost = cost_;
successful = true;
constraint = nullptr;
enqueue_ns = 0;
execute_ns = 0;
finish_ns = 0;
}
virtual ~ResourceRequest() = default;
@ -83,6 +71,12 @@ public:
/// just triggering start of a consumption, not doing the consumption itself
/// (e.g. setting an std::promise or creating a job in a thread pool)
virtual void execute() = 0;
/// Stop resource consumption and notify resource scheduler.
/// Should be called when resource consumption is finished by consumer.
/// ResourceRequest should not be destructed or reset before calling `finish()`.
/// WARNING: this function MUST NOT be called if the request was canceled.
void finish();
};
}

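A minimal sketch of the lifecycle spelled out in the comment above, using a hypothetical request subclass; only comments stand in for the actual resource consumption.

``` cpp
#include <Common/Scheduler/ResourceRequest.h>

// Hypothetical request following steps (2)-(6) above. If a prior
// ISchedulerQueue::cancelRequest() returned true, neither execute() nor finish() runs.
class MyDiskReadRequest : public DB::ResourceRequest
{
public:
    explicit MyDiskReadRequest(DB::ResourceCost bytes) : DB::ResourceRequest(bytes) {}

    void execute() override
    {
        // Step (4): access granted; only trigger the consumption here (e.g. wake a waiting thread).
        // Step (5): the actual read happens outside of the scheduling subsystem.
        // Step (6): once the read completes, the consumer calls finish() exactly once.
    }
};
```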
View File

@ -145,22 +145,27 @@ public:
std::pair<ResourceRequest *, bool> dequeueRequest() override
{
if (current == nullptr) // No active resources
return {nullptr, false};
while (true)
{
if (current == nullptr) // No active resources
return {nullptr, false};
// Dequeue request from current resource
auto [request, resource_active] = current->root->dequeueRequest();
assert(request != nullptr);
// Dequeue request from current resource
auto [request, resource_active] = current->root->dequeueRequest();
// Deactivate resource if required
if (!resource_active)
deactivate(current);
else
current = current->next; // Just move round-robin pointer
// Deactivate resource if required
if (!resource_active)
deactivate(current);
else
current = current->next; // Just move round-robin pointer
dequeued_requests++;
dequeued_cost += request->cost;
return {request, current != nullptr};
if (request == nullptr) // Possible in case of request cancel, just retry
continue;
dequeued_requests++;
dequeued_cost += request->cost;
return {request, current != nullptr};
}
}
bool isActive() override
@ -245,7 +250,6 @@ private:
void execute(ResourceRequest * request)
{
request->execute_ns = clock_gettime_ns();
request->execute();
}

View File

@ -121,7 +121,8 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
auth_settings.use_insecure_imds_request.value_or(false),
auth_settings.expiration_window_seconds.value_or(S3::DEFAULT_EXPIRATION_WINDOW_SECONDS),
auth_settings.no_sign_request.value_or(false),
});
},
credentials.GetSessionToken());
auto new_client = std::make_shared<KeeperSnapshotManagerS3::S3Configuration>(std::move(new_uri), std::move(auth_settings), std::move(client));

View File

@ -53,6 +53,7 @@ namespace ProfileEvents
extern const Event OverflowThrow;
extern const Event OverflowBreak;
extern const Event OverflowAny;
extern const Event AggregationOptimizedEqualRangesOfKeys;
}
namespace CurrentMetrics
@ -1344,6 +1345,7 @@ void NO_INLINE Aggregator::executeImplBatch(
if (use_compiled_functions)
{
std::vector<ColumnData> columns_data;
bool can_optimize_equal_keys_ranges = true;
for (size_t i = 0; i < aggregate_functions.size(); ++i)
{
@ -1352,13 +1354,15 @@ void NO_INLINE Aggregator::executeImplBatch(
AggregateFunctionInstruction * inst = aggregate_instructions + i;
size_t arguments_size = inst->that->getArgumentTypes().size(); // NOLINT
can_optimize_equal_keys_ranges &= inst->can_optimize_equal_keys_ranges;
for (size_t argument_index = 0; argument_index < arguments_size; ++argument_index)
columns_data.emplace_back(getColumnData(inst->batch_arguments[argument_index]));
}
if (all_keys_are_const || (!no_more_keys && state.hasOnlyOneValueSinceLastReset()))
if (all_keys_are_const || (can_optimize_equal_keys_ranges && state.hasOnlyOneValueSinceLastReset()))
{
ProfileEvents::increment(ProfileEvents::AggregationOptimizedEqualRangesOfKeys);
auto add_into_aggregate_states_function_single_place = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function_single_place;
add_into_aggregate_states_function_single_place(row_begin, row_end, columns_data.data(), places[key_start]);
}
@ -1380,10 +1384,15 @@ void NO_INLINE Aggregator::executeImplBatch(
AggregateFunctionInstruction * inst = aggregate_instructions + i;
if (all_keys_are_const || (!no_more_keys && state.hasOnlyOneValueSinceLastReset()))
if (all_keys_are_const || (inst->can_optimize_equal_keys_ranges && state.hasOnlyOneValueSinceLastReset()))
{
ProfileEvents::increment(ProfileEvents::AggregationOptimizedEqualRangesOfKeys);
addBatchSinglePlace(row_begin, row_end, inst, places[key_start] + inst->state_offset, aggregates_pool);
}
else
{
addBatch(row_begin, row_end, inst, places.get(), aggregates_pool);
}
}
}
@ -1573,6 +1582,7 @@ void Aggregator::prepareAggregateInstructions(
}
aggregate_functions_instructions[i].has_sparse_arguments = has_sparse_arguments;
aggregate_functions_instructions[i].can_optimize_equal_keys_ranges = aggregate_functions[i]->canOptimizeEqualKeysRanges();
aggregate_functions_instructions[i].arguments = aggregate_columns[i].data();
aggregate_functions_instructions[i].state_offset = offsets_of_aggregate_states[i];

View File

@ -1221,6 +1221,7 @@ public:
const IColumn ** batch_arguments{};
const UInt64 * offsets{};
bool has_sparse_arguments = false;
bool can_optimize_equal_keys_ranges = true;
};
/// Used for optimize_aggregation_in_order:

View File

@ -1,3 +1,4 @@
#include <Interpreters/ApplyWithSubqueryVisitor.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/InterpreterFactory.h>
@ -71,11 +72,15 @@ BlockIO InterpreterAlterQuery::execute()
BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
{
ASTSelectWithUnionQuery * modify_query = nullptr;
for (auto & child : alter.command_list->children)
{
auto * command_ast = child->as<ASTAlterCommand>();
if (command_ast->sql_security)
InterpreterCreateQuery::processSQLSecurityOption(getContext(), command_ast->sql_security->as<ASTSQLSecurity &>());
else if (command_ast->type == ASTAlterCommand::MODIFY_QUERY)
modify_query = command_ast->select->as<ASTSelectWithUnionQuery>();
}
BlockIO res;
@ -123,6 +128,12 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is read-only");
auto table_lock = table->lockForShare(getContext()->getCurrentQueryId(), getContext()->getSettingsRef().lock_acquire_timeout);
if (modify_query)
{
// Expand CTE before filling default database
ApplyWithSubqueryVisitor().visit(*modify_query);
}
/// Add default database to table identifiers that we can encounter in e.g. default expressions, mutation expression, etc.
AddDefaultDatabaseVisitor visitor(getContext(), table_id.getDatabaseName());
ASTPtr command_list_ptr = alter.command_list->ptr();

View File

@ -48,6 +48,12 @@ PlannerContext::PlannerContext(ContextMutablePtr query_context_, GlobalPlannerCo
, is_ast_level_optimization_allowed(!(query_context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY || select_query_options_.ignore_ast_optimizations))
{}
PlannerContext::PlannerContext(ContextMutablePtr query_context_, PlannerContextPtr planner_context_)
: query_context(std::move(query_context_))
, global_planner_context(planner_context_->global_planner_context)
, is_ast_level_optimization_allowed(planner_context_->is_ast_level_optimization_allowed)
{}
TableExpressionData & PlannerContext::getOrCreateTableExpressionData(const QueryTreeNodePtr & table_expression_node)
{
auto [it, _] = table_expression_node_to_data.emplace(table_expression_node, TableExpressionData());

View File

@ -75,12 +75,18 @@ private:
using GlobalPlannerContextPtr = std::shared_ptr<GlobalPlannerContext>;
class PlannerContext;
using PlannerContextPtr = std::shared_ptr<PlannerContext>;
class PlannerContext
{
public:
/// Create planner context with query context and global planner context
PlannerContext(ContextMutablePtr query_context_, GlobalPlannerContextPtr global_planner_context_, const SelectQueryOptions & select_query_options_);
/// Create planner context with modified query_context
PlannerContext(ContextMutablePtr query_context_, PlannerContextPtr planner_context_);
/// Get planner context query context
ContextPtr getQueryContext() const
{
@ -191,6 +197,4 @@ private:
PreparedSets prepared_sets;
};
using PlannerContextPtr = std::shared_ptr<PlannerContext>;
}

View File

@ -11,6 +11,7 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_SETTING;
extern const int INVALID_SETTING_VALUE;
}
IMPLEMENT_SETTINGS_TRAITS(FileLogSettingsTraits, LIST_OF_FILELOG_SETTINGS)
@ -36,6 +37,11 @@ void FileLogSettings::loadFromQuery(ASTStorage & storage_def)
settings_ast->is_standalone = false;
storage_def.set(storage_def.settings, settings_ast);
}
/// Check that the batch size is not too high (the same check as for the max_block_size setting).
constexpr UInt64 max_sane_block_rows_size = 4294967296; // 2^32
if (poll_max_batch_size > max_sane_block_rows_size)
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Sanity check: 'poll_max_batch_size' value is too high ({})", poll_max_batch_size);
}
}

View File

@ -609,6 +609,15 @@ UInt64 IMergeTreeDataPart::getMarksCount() const
return index_granularity.getMarksCount();
}
UInt64 IMergeTreeDataPart::getExistingBytesOnDisk() const
{
if (storage.getSettings()->exclude_deleted_rows_for_part_size_in_merge && supportLightweightDeleteMutate() && hasLightweightDelete()
&& existing_rows_count.has_value() && existing_rows_count.value() < rows_count && rows_count > 0)
return bytes_on_disk * existing_rows_count.value() / rows_count;
else
return bytes_on_disk;
}
size_t IMergeTreeDataPart::getFileSizeOrZero(const String & file_name) const
{
auto checksum = checksums.files.find(file_name);
@ -691,6 +700,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
calculateColumnsAndSecondaryIndicesSizesOnDisk();
loadRowsCount(); /// Must be called after loadIndexGranularity() as it uses the value of `index_granularity`.
loadExistingRowsCount(); /// Must be called after loadRowsCount() as it uses the value of `rows_count`.
loadPartitionAndMinMaxIndex();
if (!parent_part)
{
@ -1334,6 +1344,87 @@ void IMergeTreeDataPart::loadRowsCount()
}
}
void IMergeTreeDataPart::loadExistingRowsCount()
{
if (existing_rows_count.has_value())
return;
if (!rows_count || !storage.getSettings()->load_existing_rows_count_for_old_parts || !supportLightweightDeleteMutate()
|| !hasLightweightDelete())
existing_rows_count = rows_count;
else
existing_rows_count = readExistingRowsCount();
}
UInt64 IMergeTreeDataPart::readExistingRowsCount()
{
const size_t total_mark = getMarksCount();
if (!total_mark)
return rows_count;
NamesAndTypesList cols;
cols.emplace_back(RowExistsColumn::name, RowExistsColumn::type);
StorageMetadataPtr metadata_ptr = storage.getInMemoryMetadataPtr();
StorageSnapshotPtr storage_snapshot_ptr = std::make_shared<StorageSnapshot>(storage, metadata_ptr);
MergeTreeReaderPtr reader = getReader(
cols,
storage_snapshot_ptr,
MarkRanges{MarkRange(0, total_mark)},
/*virtual_fields=*/ {},
/*uncompressed_cache=*/{},
storage.getContext()->getMarkCache().get(),
std::make_shared<AlterConversions>(),
MergeTreeReaderSettings{},
ValueSizeMap{},
ReadBufferFromFileBase::ProfileCallback{});
if (!reader)
{
LOG_WARNING(storage.log, "Create reader failed while reading existing rows count");
return rows_count;
}
size_t current_mark = 0;
bool continue_reading = false;
size_t current_row = 0;
size_t existing_count = 0;
while (current_row < rows_count)
{
size_t rows_to_read = index_granularity.getMarkRows(current_mark);
continue_reading = (current_mark != 0);
Columns result;
result.resize(1);
size_t rows_read = reader->readRows(current_mark, total_mark, continue_reading, rows_to_read, result);
if (!rows_read)
{
LOG_WARNING(storage.log, "Part {} has lightweight delete, but _row_exists column not found", name);
return rows_count;
}
current_row += rows_read;
current_mark += (rows_to_read == rows_read);
const ColumnUInt8 * row_exists_col = typeid_cast<const ColumnUInt8 *>(result[0].get());
if (!row_exists_col)
{
LOG_WARNING(storage.log, "Part {} _row_exists column type is not UInt8", name);
return rows_count;
}
for (UInt8 row_exists : row_exists_col->getData())
if (row_exists)
existing_count++;
}
LOG_DEBUG(storage.log, "Part {} existing_rows_count = {}", name, existing_count);
return existing_count;
}
void IMergeTreeDataPart::appendFilesOfRowsCount(Strings & files)
{
files.push_back("count.txt");

View File

@ -231,6 +231,9 @@ public:
size_t rows_count = 0;
/// Existing rows count (excluding lightweight deleted rows)
std::optional<size_t> existing_rows_count;
time_t modification_time = 0;
/// When the part is removed from the working set. Changes once.
mutable std::atomic<time_t> remove_time { std::numeric_limits<time_t>::max() };
@ -373,6 +376,10 @@ public:
void setBytesOnDisk(UInt64 bytes_on_disk_) { bytes_on_disk = bytes_on_disk_; }
void setBytesUncompressedOnDisk(UInt64 bytes_uncompressed_on_disk_) { bytes_uncompressed_on_disk = bytes_uncompressed_on_disk_; }
/// Returns estimated size of existing rows if setting exclude_deleted_rows_for_part_size_in_merge is true
/// Otherwise returns bytes_on_disk
UInt64 getExistingBytesOnDisk() const;
size_t getFileSizeOrZero(const String & file_name) const;
auto getFilesChecksums() const { return checksums.files; }
@ -499,6 +506,9 @@ public:
/// True if here is lightweight deleted mask file in part.
bool hasLightweightDelete() const;
/// Read existing rows count from _row_exists column
UInt64 readExistingRowsCount();
void writeChecksums(const MergeTreeDataPartChecksums & checksums_, const WriteSettings & settings);
/// Checks the consistency of this data part.
@ -664,6 +674,9 @@ private:
/// For the older format version calculates rows count from the size of a column with a fixed size.
void loadRowsCount();
/// Load existing rows count from _row_exists column if load_existing_rows_count_for_old_parts is true.
void loadExistingRowsCount();
static void appendFilesOfRowsCount(Strings & files);
/// Loads ttl infos in json format from file ttl.txt. If the file doesn't exist, assigns ttl infos with all zeros

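To make the estimate concrete, here is a small self-contained sketch of the scaling done by `getExistingBytesOnDisk()` (hypothetical helper name, made-up numbers): the on-disk size is multiplied by the fraction of rows that survived lightweight deletes.

``` cpp
#include <cstdint>
#include <cstdio>

// Mirrors the estimate: bytes_on_disk * existing_rows / rows_count when some rows were
// lightweight-deleted; otherwise the full on-disk size is returned.
static uint64_t estimateExistingBytes(uint64_t bytes_on_disk, uint64_t existing_rows, uint64_t rows_count)
{
    if (rows_count == 0 || existing_rows >= rows_count)
        return bytes_on_disk;
    return bytes_on_disk * existing_rows / rows_count;
}

int main()
{
    // A 100 MB part with 800000 of 1000000 rows remaining is budgeted as 80 MB in merge selection.
    std::printf("%llu\n", static_cast<unsigned long long>(estimateExistingBytes(100000000, 800000, 1000000)));
    return 0;
}
```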
View File

@ -174,7 +174,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
}
/// Start to make the main work
size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts);
size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts, true);
/// Can throw an exception while reserving space.
IMergeTreeDataPart::TTLInfos ttl_infos;

View File

@ -8263,6 +8263,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::createE
new_data_part->setColumns(columns, {}, metadata_snapshot->getMetadataVersion());
new_data_part->rows_count = block.rows();
new_data_part->existing_rows_count = block.rows();
new_data_part->partition = partition;

View File

@ -405,7 +405,7 @@ MergeTreeDataMergerMutator::MergeSelectingInfo MergeTreeDataMergerMutator::getPo
}
IMergeSelector::Part part_info;
part_info.size = part->getBytesOnDisk();
part_info.size = part->getExistingBytesOnDisk();
part_info.age = res.current_time - part->modification_time;
part_info.level = part->info.level;
part_info.data = &part;
@ -611,7 +611,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinParti
return SelectPartsDecision::CANNOT_SELECT;
}
sum_bytes += (*it)->getBytesOnDisk();
sum_bytes += (*it)->getExistingBytesOnDisk();
prev_it = it;
++it;
@ -793,7 +793,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart
}
size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts)
size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts, const bool & account_for_deleted)
{
size_t res = 0;
time_t current_time = std::time(nullptr);
@ -804,7 +804,10 @@ size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::
if (part_max_ttl && part_max_ttl <= current_time)
continue;
res += part->getBytesOnDisk();
if (account_for_deleted)
res += part->getExistingBytesOnDisk();
else
res += part->getBytesOnDisk();
}
return static_cast<size_t>(res * DISK_USAGE_COEFFICIENT_TO_RESERVE);

View File

@ -193,7 +193,7 @@ public:
/// The approximate amount of disk space needed for merge or mutation. With a surplus.
static size_t estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts);
static size_t estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts, const bool & account_for_deleted = false);
private:
/** Select all parts belonging to the same partition.

View File

@ -537,6 +537,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
new_data_part->setColumns(columns, infos, metadata_snapshot->getMetadataVersion());
new_data_part->rows_count = block.rows();
new_data_part->existing_rows_count = block.rows();
new_data_part->partition = std::move(partition);
new_data_part->minmax_idx = std::move(minmax_idx);
new_data_part->is_temp = true;

View File

@ -42,6 +42,7 @@ struct Settings;
M(UInt64, compact_parts_max_bytes_to_buffer, 128 * 1024 * 1024, "Only available in ClickHouse Cloud", 0) \
M(UInt64, compact_parts_max_granules_to_buffer, 128, "Only available in ClickHouse Cloud", 0) \
M(UInt64, compact_parts_merge_max_bytes_to_prefetch_part, 16 * 1024 * 1024, "Only available in ClickHouse Cloud", 0) \
M(Bool, load_existing_rows_count_for_old_parts, false, "Whether to load existing_rows_count for existing parts. If false, existing_rows_count will be equal to rows_count for existing parts.", 0) \
\
/** Merge settings. */ \
M(UInt64, merge_max_block_size, 8192, "How many rows in blocks should be formed for merge operations. By default has the same value as `index_granularity`.", 0) \
@ -79,6 +80,7 @@ struct Settings;
M(UInt64, number_of_mutations_to_throw, 1000, "If table has at least that many unfinished mutations, throw 'Too many mutations' exception. Disabled if set to 0", 0) \
M(UInt64, min_delay_to_mutate_ms, 10, "Min delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
M(UInt64, max_delay_to_mutate_ms, 1000, "Max delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
M(Bool, exclude_deleted_rows_for_part_size_in_merge, false, "Use an estimated source part size (excluding lightweight deleted rows) when selecting parts to merge", 0) \
\
/** Inserts settings. */ \
M(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \

View File

@ -188,6 +188,11 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
new_part->index_granularity = writer->getIndexGranularity();
new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk();
/// In mutation, existing_rows_count is already calculated in PartMergerWriter
/// In a merge, lightweight deleted rows were physically deleted, so existing_rows_count equals rows_count
if (!new_part->existing_rows_count.has_value())
new_part->existing_rows_count = rows_count;
if (default_codec != nullptr)
new_part->default_codec = default_codec;

View File

@ -49,7 +49,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
}
/// TODO - some better heuristic?
size_t estimated_space_for_result = MergeTreeDataMergerMutator::estimateNeededDiskSpace({source_part});
size_t estimated_space_for_result = MergeTreeDataMergerMutator::estimateNeededDiskSpace({source_part}, false);
if (entry.create_time + storage_settings_ptr->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)
&& estimated_space_for_result >= storage_settings_ptr->prefer_fetch_merged_part_size_threshold)

View File

@ -60,6 +60,26 @@ static bool checkOperationIsNotCanceled(ActionBlocker & merges_blocker, MergeLis
return true;
}
static UInt64 getExistingRowsCount(const Block & block)
{
auto column = block.getByName(RowExistsColumn::name).column;
const ColumnUInt8 * row_exists_col = typeid_cast<const ColumnUInt8 *>(column.get());
if (!row_exists_col)
{
LOG_WARNING(&Poco::Logger::get("MutationHelpers::getExistingRowsCount"), "_row_exists column type is not UInt8");
return block.rows();
}
UInt64 existing_count = 0;
for (UInt8 row_exists : row_exists_col->getData())
if (row_exists)
existing_count++;
return existing_count;
}
/** Split mutation commands into two parts:
* First part should be executed by mutations interpreter.
* Other is just simple drop/renames, so they can be executed without interpreter.
@ -997,6 +1017,9 @@ struct MutationContext
bool need_prefix = true;
scope_guard temporary_directory_lock;
/// Whether we need to count lightweight delete rows in this mutation
bool count_lightweight_deleted_rows;
};
using MutationContextPtr = std::shared_ptr<MutationContext>;
@ -1191,6 +1214,7 @@ public:
}
case State::SUCCESS:
{
finalize();
return false;
}
}
@ -1226,6 +1250,11 @@ private:
const ProjectionsDescription & projections;
ExecutableTaskPtr merge_projection_parts_task_ptr;
/// Existing rows count calculated during part writing.
/// It is initialized in prepare(), calculated in mutateOriginalPartAndPrepareProjections()
/// and set to new_data_part in finalize()
size_t existing_rows_count;
};
@ -1238,6 +1267,8 @@ void PartMergerWriter::prepare()
// We split the materialization into multiple stages similar to the process of INSERT SELECT query.
projection_squashes.emplace_back(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
}
existing_rows_count = 0;
}
@ -1251,6 +1282,10 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
ctx->out->write(cur_block);
/// TODO: move this calculation to DELETE FROM mutation
if (ctx->count_lightweight_deleted_rows)
existing_rows_count += MutationHelpers::getExistingRowsCount(cur_block);
for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
{
const auto & projection = *ctx->projections_to_build[i];
@ -1340,6 +1375,12 @@ bool PartMergerWriter::iterateThroughAllProjections()
return true;
}
void PartMergerWriter::finalize()
{
if (ctx->count_lightweight_deleted_rows)
ctx->new_data_part->existing_rows_count = existing_rows_count;
}
class MutateAllPartColumnsTask : public IExecutableTask
{
public:
@ -2185,6 +2226,20 @@ bool MutateTask::prepare()
if (ctx->mutating_pipeline_builder.initialized())
ctx->execute_ttl_type = MutationHelpers::shouldExecuteTTL(ctx->metadata_snapshot, ctx->interpreter->getColumnDependencies());
if (ctx->data->getSettings()->exclude_deleted_rows_for_part_size_in_merge && ctx->updated_header.has(RowExistsColumn::name))
{
/// This mutation contains a lightweight delete, so we need to count the deleted rows:
/// reset existing_rows_count of the new data part to 0; it will be updated while writing the _row_exists column
ctx->count_lightweight_deleted_rows = true;
}
else
{
ctx->count_lightweight_deleted_rows = false;
/// No need to count deleted rows, copy existing_rows_count from source part
ctx->new_data_part->existing_rows_count = ctx->source_part->existing_rows_count.value_or(ctx->source_part->rows_count);
}
/// All columns from part are changed and may be some more that were missing before in part
/// TODO We can materialize compact part without copying data
if (!isWidePart(ctx->source_part) || !isFullPartStorage(ctx->source_part->getDataPartStorage())

View File

@ -1350,7 +1350,10 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
auto part = data.getPartIfExists(name, {MergeTreeDataPartState::PreActive, MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
if (part)
{
sum_parts_size_in_bytes += part->getBytesOnDisk();
if (entry.type == LogEntry::MERGE_PARTS)
sum_parts_size_in_bytes += part->getExistingBytesOnDisk();
else
sum_parts_size_in_bytes += part->getBytesOnDisk();
if (entry.type == LogEntry::MUTATE_PART && !storage.mutation_backoff_policy.partCanBeMutated(part->name))
{

View File

@ -422,6 +422,7 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu
std::vector<std::unique_ptr<QueryPipelineBuilder>> pipelines;
auto table_it = selected_tables.begin();
auto modified_context = Context::createCopy(context);
for (size_t i = 0; i < selected_tables.size(); ++i, ++table_it)
{
auto & child_plan = child_plans->at(i);
@ -438,7 +439,7 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu
if (child_plan.row_policy_data_opt)
child_plan.row_policy_data_opt->extendNames(real_column_names);
auto modified_query_info = getModifiedQueryInfo(context, table, nested_storage_snaphsot, real_column_names, column_names_as_aliases, aliases);
auto modified_query_info = getModifiedQueryInfo(modified_context, table, nested_storage_snaphsot, real_column_names, column_names_as_aliases, aliases);
auto source_pipeline = createSources(
child_plan.plan,
@ -547,9 +548,10 @@ std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQ
}
/// Settings will be modified when planning child tables.
auto modified_context = Context::createCopy(context);
for (const auto & table : selected_tables)
{
auto modified_context = Context::createCopy(context);
size_t current_need_streams = tables_count >= num_streams ? 1 : (num_streams / tables_count);
size_t current_streams = std::min(current_need_streams, remaining_streams);
remaining_streams -= current_streams;
@ -570,25 +572,25 @@ std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQ
auto & aliases = res.back().table_aliases;
auto & row_policy_data_opt = res.back().row_policy_data_opt;
auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr();
auto nested_storage_snaphsot = storage->getStorageSnapshot(storage_metadata_snapshot, context);
auto nested_storage_snaphsot = storage->getStorageSnapshot(storage_metadata_snapshot, modified_context);
Names column_names_as_aliases;
Names real_column_names = column_names;
const auto & database_name = std::get<0>(table);
const auto & table_name = std::get<3>(table);
auto row_policy_filter_ptr = context->getRowPolicyFilter(
auto row_policy_filter_ptr = modified_context->getRowPolicyFilter(
database_name,
table_name,
RowPolicyFilterType::SELECT_FILTER);
if (row_policy_filter_ptr)
{
row_policy_data_opt = RowPolicyData(row_policy_filter_ptr, storage, context);
row_policy_data_opt = RowPolicyData(row_policy_filter_ptr, storage, modified_context);
row_policy_data_opt->extendNames(real_column_names);
}
auto modified_query_info
= getModifiedQueryInfo(context, table, nested_storage_snaphsot, real_column_names, column_names_as_aliases, aliases);
= getModifiedQueryInfo(modified_context, table, nested_storage_snaphsot, real_column_names, column_names_as_aliases, aliases);
if (!context->getSettingsRef().allow_experimental_analyzer)
{
@ -657,10 +659,9 @@ std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQ
row_policy_data_opt,
modified_context,
current_streams);
res.back().plan.addInterpreterContext(modified_context);
}
if (!res.empty())
res[0].plan.addInterpreterContext(modified_context);
return res;
}
@ -681,8 +682,9 @@ public:
{
if (column->hasExpression())
{
auto column_name = column->getColumnName();
node = column->getExpressionOrThrow();
node->setAlias(column->getColumnName());
node->setAlias(column_name);
}
else
column->setColumnSource(replacement_table_expression);
@ -863,7 +865,7 @@ QueryTreeNodePtr replaceTableExpressionAndRemoveJoin(
}
SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const ContextPtr & modified_context,
SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const ContextMutablePtr & modified_context,
const StorageWithLockAndName & storage_with_lock_and_name,
const StorageSnapshotPtr & storage_snapshot_,
Names required_column_names,
@ -877,6 +879,9 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const ContextPtr & modified_
if (modified_query_info.optimized_prewhere_info && !modified_query_info.prewhere_info)
modified_query_info.prewhere_info = modified_query_info.optimized_prewhere_info;
if (modified_query_info.planner_context)
modified_query_info.planner_context = std::make_shared<PlannerContext>(modified_context, modified_query_info.planner_context);
if (modified_query_info.table_expression)
{
auto replacement_table_expression = std::make_shared<TableNode>(storage, storage_lock, storage_snapshot_);

View File

@ -192,7 +192,7 @@ private:
using Aliases = std::vector<AliasData>;
SelectQueryInfo getModifiedQueryInfo(const ContextPtr & modified_context,
SelectQueryInfo getModifiedQueryInfo(const ContextMutablePtr & modified_context,
const StorageWithLockAndName & storage_with_lock_and_name,
const StorageSnapshotPtr & storage_snapshot,
Names required_column_names,

View File

@ -1113,7 +1113,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
if (isTTLMergeType(future_part->merge_type))
getContext()->getMergeList().bookMergeWithTTL();
merging_tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part->parts), *this, metadata_snapshot, false);
merging_tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part->parts, true), *this, metadata_snapshot, false);
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(merging_tagger), std::make_shared<MutationCommands>());
}
@ -1336,7 +1336,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
future_part->name = part->getNewName(new_part_info);
future_part->part_format = part->getFormat();
tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true);
tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}, false), *this, metadata_snapshot, true);
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(tagger), commands, txn);
}
}

View File

@ -1451,7 +1451,8 @@ void StorageS3::Configuration::connect(const ContextPtr & context)
auth_settings.expiration_window_seconds.value_or(
context->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)),
auth_settings.no_sign_request.value_or(context->getConfigRef().getBool("s3.no_sign_request", false)),
});
},
credentials.GetSessionToken());
}
void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection)

View File

@ -30,7 +30,9 @@ ColumnsDescription StorageSystemScheduler::getColumnsDescription()
{"is_active", std::make_shared<DataTypeUInt8>(), "Whether this node is currently active - has resource requests to be dequeued and constraints satisfied."},
{"active_children", std::make_shared<DataTypeUInt64>(), "The number of children in active state."},
{"dequeued_requests", std::make_shared<DataTypeUInt64>(), "The total number of resource requests dequeued from this node."},
{"canceled_requests", std::make_shared<DataTypeUInt64>(), "The total number of resource requests canceled from this node."},
{"dequeued_cost", std::make_shared<DataTypeInt64>(), "The sum of costs (e.g. size in bytes) of all requests dequeued from this node."},
{"canceled_cost", std::make_shared<DataTypeInt64>(), "The sum of costs (e.g. size in bytes) of all requests canceled from this node."},
{"busy_periods", std::make_shared<DataTypeUInt64>(), "The total number of deactivations of this node."},
{"vruntime", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeFloat64>()),
"For children of `fair` nodes only. Virtual runtime of a node used by SFQ algorithm to select the next child to process in a max-min fair manner."},
@ -93,7 +95,9 @@ void StorageSystemScheduler::fillData(MutableColumns & res_columns, ContextPtr c
res_columns[i++]->insert(node->isActive());
res_columns[i++]->insert(node->activeChildren());
res_columns[i++]->insert(node->dequeued_requests.load());
res_columns[i++]->insert(node->canceled_requests.load());
res_columns[i++]->insert(node->dequeued_cost.load());
res_columns[i++]->insert(node->canceled_cost.load());
res_columns[i++]->insert(node->busy_periods.load());
Field vruntime;

View File
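The canceled_requests and canceled_cost columns added above extend the scheduler introspection table next to the existing dequeued counters. Assuming a local server on the default HTTP port and that StorageSystemScheduler is exposed as the system.scheduler table, the new columns can be read with a plain SELECT, for example over the HTTP interface:

import urllib.parse
import urllib.request

query = (
    "SELECT dequeued_requests, canceled_requests, dequeued_cost, canceled_cost, busy_periods"
    " FROM system.scheduler FORMAT TSVWithNames"
)
url = "http://localhost:8123/?" + urllib.parse.urlencode({"query": query})
with urllib.request.urlopen(url) as response:
    print(response.read().decode("utf-8"))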

@ -1,3 +1,2 @@
test_concurrent_backups_s3/test.py::test_concurrent_backups
test_distributed_type_object/test.py::test_distributed_type_object
test_merge_table_over_distributed/test.py::test_global_in

View File

@ -138,13 +138,15 @@ check_spot_instance_is_old() {
check_proceed_spot_termination() {
# The function checks whether a spot instance termination event exists and, if so, proceeds with it
# (the termination event is exposed via the instance metadata endpoint)
local FORCE
FORCE=${1:-}
if TERMINATION_DATA=$(curl -s --fail http://169.254.169.254/latest/meta-data/spot/instance-action); then
# https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/spot-instance-termination-notices.html#instance-action-metadata
_action=$(jq '.action' -r <<< "$TERMINATION_DATA")
_time=$(jq '.time | fromdate' <<< "$TERMINATION_DATA")
_until_action=$((_time - $(date +%s)))
echo "Received the '$_action' event that will be effective in $_until_action seconds"
if (( _until_action <= 30 )); then
if (( _until_action <= 30 )) || [ "$FORCE" == "force" ]; then
echo "The action $_action will be done in $_until_action, killing the runner and exit"
local runner_pid
runner_pid=$(pgrep Runner.Listener)
@ -309,7 +311,7 @@ while true; do
echo "Checking if the instance suppose to terminate"
no_terminating_metadata || terminate_on_event
check_spot_instance_is_old && terminate_and_exit
check_proceed_spot_termination
check_proceed_spot_termination force
echo "Going to configure runner"
sudo -u ubuntu ./config.sh --url $RUNNER_URL --token "$(get_runner_token)" \
@ -319,7 +321,7 @@ while true; do
echo "Another one check to avoid race between runner and infrastructure"
no_terminating_metadata || terminate_on_event
check_spot_instance_is_old && terminate_and_exit
check_proceed_spot_termination
check_proceed_spot_termination force
echo "Run"
sudo -u ubuntu \

View File

@ -4,9 +4,11 @@
# pylint: disable=global-variable-not-assigned
# pylint: disable=too-many-lines
# pylint: disable=anomalous-backslash-in-string
# pylint: disable=protected-access
import copy
import enum
import tempfile
import glob
# Not the 'requests' module, to avoid requiring an extra dependency.
@ -68,6 +70,144 @@ TEST_FILE_EXTENSIONS = [".sql", ".sql.j2", ".sh", ".py", ".expect"]
VERSION_PATTERN = r"^((\d+\.)?(\d+\.)?(\d+\.)?\d+)$"
class SharedEngineReplacer:
ENGINES_NON_REPLICATED_REGEXP = r"[ =]((Collapsing|VersionedCollapsing|Summing|Replacing|Aggregating|)MergeTree\(?\)?)"
ENGINES_MAPPING_REPLICATED = [
("ReplicatedMergeTree", "SharedMergeTree"),
("ReplicatedCollapsingMergeTree", "SharedCollapsingMergeTree"),
(
"ReplicatedVersionedCollapsingMergeTree",
"SharedVersionedCollapsingMergeTree",
),
("ReplicatedSummingMergeTree", "SharedSummingMergeTree"),
("ReplicatedReplacingMergeTree", "SharedReplacingMergeTree"),
("ReplicatedAggregatingMergeTree", "SharedAggregatingMergeTree"),
]
NEW_SYNTAX_REPLICATED_MERGE_TREE_RE = (
r"Replicated[a-zA-Z]*MergeTree\((\\?'.*\\?')?,?(\\?'.*\\?')?[a-zA-Z, _}{]*\)"
)
OLD_SYNTAX_OR_ARGUMENTS_RE = r"Tree\(.*[0-9]+.*\)"
def _check_replicated_new_syntax(self, line):
return re.search(self.NEW_SYNTAX_REPLICATED_MERGE_TREE_RE, line) is not None
def _check_old_syntax_or_arguments(self, line):
return re.search(self.OLD_SYNTAX_OR_ARGUMENTS_RE, line) is not None
@staticmethod
def _is_comment_line(line):
return line.startswith("SELECT") or line.startswith("select")
@staticmethod
def _is_create_query(line):
return (
line.startswith("CREATE")
or line.startswith("create")
or line.startswith("ENGINE")
or line.startswith("engine")
)
def _replace_non_replicated(self, line, escape_quotes, use_random_path):
groups = re.search(self.ENGINES_NON_REPLICATED_REGEXP, line)
if groups is not None and not self._check_old_syntax_or_arguments(line):
non_replicated_engine = groups.groups()[0]
basename_no_ext = os.path.splitext(os.path.basename(self.file_name))[0]
if use_random_path:
shared_path = "/" + os.path.join(
basename_no_ext.replace("_", "/"),
str(os.getpid()),
str(random.randint(1, 1000)),
)
else:
shared_path = "/" + os.path.join(
basename_no_ext.replace("_", "/"), str(os.getpid())
)
if escape_quotes:
shared_engine = (
"Shared"
+ non_replicated_engine.replace("()", "")
+ f"(\\'{shared_path}\\', \\'1\\')"
)
else:
shared_engine = (
"Shared"
+ non_replicated_engine.replace("()", "")
+ f"('{shared_path}', '1')"
)
return line.replace(non_replicated_engine, shared_engine)
return line
def _need_to_replace_something(self):
return (
self.replace_replicated or self.replace_non_replicated
) and "shared_merge_tree" not in self.file_name
def _has_show_create_table(self):
with open(self.file_name, "r", encoding="utf-8") as f:
return re.search("show create table", f.read(), re.IGNORECASE)
def __init__(
self, file_name, replace_replicated, replace_non_replicated, reference_file
):
self.file_name = file_name
self.temp_file_path = get_temp_file_path()
self.replace_replicated = replace_replicated
self.replace_non_replicated = replace_non_replicated
use_random_path = not reference_file and not self._has_show_create_table()
if not self._need_to_replace_something():
return
shutil.copyfile(self.file_name, self.temp_file_path)
shutil.copymode(self.file_name, self.temp_file_path)
with open(self.file_name, "w", newline="", encoding="utf-8") as modified:
with open(self.temp_file_path, "r", newline="", encoding="utf-8") as source:
for line in source:
if self._is_comment_line(line) or (
reference_file and not self._is_create_query(line)
):
modified.write(line)
continue
if self.replace_replicated:
for (
engine_from,
engine_to,
) in SharedEngineReplacer.ENGINES_MAPPING_REPLICATED:
if engine_from in line and (
self._check_replicated_new_syntax(line)
or engine_from + " " in line
or engine_from + ";" in line
):
line = line.replace(engine_from, engine_to)
break
if self.replace_non_replicated:
line = self._replace_non_replicated(
line, reference_file, use_random_path
)
modified.write(line)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_tb):
if not self._need_to_replace_something():
return
shutil.move(self.temp_file_path, self.file_name)
def get_temp_file_path():
return os.path.join(
tempfile._get_default_tempdir(), next(tempfile._get_candidate_names())
)
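# To illustrate the transformation SharedEngineReplacer applies to test files, here is a
# simplified, hypothetical sketch of the replicated-engine mapping on a single CREATE line.
# The real class above additionally handles non-replicated engines, escaped quotes in
# reference files, old-syntax exclusions, and rewrites the file in place.
ENGINES_MAPPING_REPLICATED_EXAMPLE = [
    ("ReplicatedMergeTree", "SharedMergeTree"),
    ("ReplicatedReplacingMergeTree", "SharedReplacingMergeTree"),
]

def replace_replicated_engine_example(line):
    # Replace the first matching Replicated* engine with its Shared* counterpart.
    for engine_from, engine_to in ENGINES_MAPPING_REPLICATED_EXAMPLE:
        if engine_from in line:
            return line.replace(engine_from, engine_to)
    return line

print(replace_replicated_engine_example(
    "CREATE TABLE t (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/t', 'r1') ORDER BY x"
))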
def stringhash(s: str) -> int:
# The default hash() function is consistent only within a single process
# invocation: https://stackoverflow.com/a/42089311
@ -92,6 +232,16 @@ def trim_for_log(s):
return "\n".join(lines)
def is_valid_utf_8(fname):
try:
with open(fname, "rb") as f:
contents = f.read()
contents.decode("utf-8")
return True
except UnicodeDecodeError:
return False
class TestException(Exception):
pass
@ -536,6 +686,8 @@ class FailureReason(enum.Enum):
INTERNAL_QUERY_FAIL = "Internal query (CREATE/DROP DATABASE) failed:"
# SKIPPED reasons
NOT_SUPPORTED_IN_CLOUD = "not supported in cloud environment"
NOT_SUPPORTED_IN_PRIVATE = "not supported in private build"
DISABLED = "disabled"
SKIP = "skip"
NO_JINJA = "no jinja"
@ -548,6 +700,7 @@ class FailureReason(enum.Enum):
S3_STORAGE = "s3-storage"
BUILD = "not running for current build"
NO_PARALLEL_REPLICAS = "smth is not supported with parallel replicas"
SHARED_MERGE_TREE = "no-shared-merge-tree"
# UNKNOWN reasons
NO_REFERENCE = "no reference file"
@ -606,8 +759,6 @@ class SettingsRandomizer:
"read_in_order_two_level_merge_threshold": lambda: random.randint(0, 100),
"optimize_aggregation_in_order": lambda: random.randint(0, 1),
"aggregation_in_order_max_block_bytes": lambda: random.randint(0, 50000000),
"min_compress_block_size": lambda: random.randint(1, 1048576 * 3),
"max_compress_block_size": lambda: random.randint(1, 1048576 * 3),
"use_uncompressed_cache": lambda: random.randint(0, 1),
"min_bytes_to_use_direct_io": threshold_generator(
0.2, 0.5, 1, 10 * 1024 * 1024 * 1024
@ -659,6 +810,11 @@ class SettingsRandomizer:
0.3, 0.5, 1, 10 * 1024 * 1024 * 1024
),
"max_bytes_before_remerge_sort": lambda: random.randint(1, 3000000000),
"min_compress_block_size": lambda: random.randint(1, 1048576 * 3),
"max_compress_block_size": lambda: random.randint(1, 1048576 * 3),
"merge_tree_compact_parts_min_granules_to_multibuffer_read": lambda: random.randint(
1, 128
),
"optimize_sorting_by_input_stream_properties": lambda: random.randint(0, 1),
"http_response_buffer_size": lambda: random.randint(0, 10 * 1048576),
"http_wait_end_of_query": lambda: random.random() > 0.5,
@ -684,6 +840,7 @@ class SettingsRandomizer:
get_localzone(),
]
),
"prefer_warmed_unmerged_parts_seconds": lambda: random.randint(0, 10),
"use_page_cache_for_disks_without_file_cache": lambda: random.random() < 0.7,
"page_cache_inject_eviction": lambda: random.random() < 0.5,
}
@ -733,6 +890,17 @@ class MergeTreeSettingsRandomizer:
"primary_key_compress_block_size": lambda: random.randint(8000, 100000),
"replace_long_file_name_to_hash": lambda: random.randint(0, 1),
"max_file_name_length": threshold_generator(0.3, 0.3, 0, 128),
"min_bytes_for_full_part_storage": threshold_generator(
0.3, 0.3, 0, 512 * 1024 * 1024
),
"compact_parts_max_bytes_to_buffer": lambda: random.randint(
1024, 512 * 1024 * 1024
),
"compact_parts_max_granules_to_buffer": threshold_generator(0.15, 0.15, 1, 256),
"compact_parts_merge_max_bytes_to_prefetch_part": lambda: random.randint(
1, 32 * 1024 * 1024
),
"cache_populated_by_fetch": lambda: random.randint(0, 1),
}
@staticmethod
@ -744,6 +912,10 @@ class MergeTreeSettingsRandomizer:
return random_settings
def replace_in_file(filename, what, with_what):
os.system(f"LC_ALL=C sed -i -e 's|{what}|{with_what}|g' {filename}")
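# A hypothetical pure-Python equivalent of the sed-based helper above, for reference.
# Unlike sed, it treats `what` as a literal string rather than a regular expression.
def replace_in_file_py(filename, what, with_what):
    with open(filename, "r", encoding="utf-8", errors="replace") as f:
        contents = f.read()
    with open(filename, "w", encoding="utf-8") as f:
        f.write(contents.replace(what, with_what))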
class TestResult:
def __init__(
self,
@ -972,6 +1144,15 @@ class TestCase:
if tags and ("disabled" in tags) and not args.disabled:
return FailureReason.DISABLED
elif args.private and self.name in suite.private_skip_list:
return FailureReason.NOT_SUPPORTED_IN_PRIVATE
elif args.cloud and ("no-replicated-database" in tags):
return FailureReason.REPLICATED_DB
elif args.cloud and self.name in suite.cloud_skip_list:
return FailureReason.NOT_SUPPORTED_IN_CLOUD
elif (
os.path.exists(os.path.join(suite.suite_path, self.name) + ".disabled")
and not args.disabled
@ -1022,6 +1203,13 @@ class TestCase:
):
return FailureReason.NON_ATOMIC_DB
elif (
tags
and ("no-shared-merge-tree" in tags)
and args.replace_replicated_with_shared
):
return FailureReason.SHARED_MERGE_TREE
elif tags and ("no-s3-storage" in tags) and args.s3_storage:
return FailureReason.S3_STORAGE
elif (
@ -1051,7 +1239,8 @@ class TestCase:
):
description = ""
debug_log = trim_for_log(debug_log)
if debug_log:
debug_log = "\n".join(debug_log.splitlines()[:100])
if proc:
if proc.returncode is None:
@ -1136,6 +1325,7 @@ class TestCase:
description += "\nstdout:\n"
description += trim_for_log(stdout)
description += "\n"
if debug_log:
description += "\n"
description += debug_log
@ -1148,9 +1338,7 @@ class TestCase:
)
if "Exception" in stdout:
description += "\n"
description += trim_for_log(stdout)
description += "\n"
description += "\n{}\n".format("\n".join(stdout.splitlines()[:100]))
if debug_log:
description += "\n"
description += debug_log
@ -1358,7 +1546,13 @@ class TestCase:
# because there is also output from per-test database creation
pattern = "{test} > {stdout} 2> {stderr}"
if self.ext == ".sql":
if self.ext == ".sql" and args.cloud:
# Get at least some logs, because we don't have access to system.text_log and pods...
pattern = (
"{client} --send_logs_level={logs_level} {secure} --multiquery {options}"
" --send_logs_level=trace < {test} > {stdout} 2>> /test_output/some_logs_from_server.log"
)
elif self.ext == ".sql" and not args.cloud:
pattern = (
"{client} --send_logs_level={logs_level} {secure} --multiquery {options} < "
+ pattern
@ -1396,17 +1590,15 @@ class TestCase:
total_time = (datetime.now() - start_time).total_seconds()
# Normalize randomized database names in stdout, stderr files.
os.system(f"LC_ALL=C sed -i -e 's/{database}/default/g' {self.stdout_file}")
replace_in_file(self.stdout_file, database, "default")
if args.hide_db_name:
os.system(f"LC_ALL=C sed -i -e 's/{database}/default/g' {self.stderr_file}")
replace_in_file(self.stderr_file, database, "default")
if args.replicated_database:
os.system(f"LC_ALL=C sed -i -e 's|/auto_{{shard}}||g' {self.stdout_file}")
os.system(f"LC_ALL=C sed -i -e 's|auto_{{replica}}||g' {self.stdout_file}")
replace_in_file(self.stdout_file, "/auto_{shard}", "")
replace_in_file(self.stdout_file, "auto_{replica}", "")
# Normalize hostname in stdout file.
os.system(
f"LC_ALL=C sed -i -e 's/{socket.gethostname()}/localhost/g' {self.stdout_file}"
)
replace_in_file(self.stdout_file, socket.gethostname(), "localhost")
stdout = ""
if os.path.exists(self.stdout_file):
@ -1444,18 +1636,51 @@ class TestCase:
self.testcase_args = self.configure_testcase_args(
args, self.case_file, suite.suite_tmp_path
)
client_options = self.add_random_settings(client_options)
proc, stdout, stderr, debug_log, total_time = self.run_single_test(
server_logs_level, client_options
)
result = self.process_result_impl(
proc, stdout, stderr, debug_log, total_time
)
result.check_if_need_retry(args, stdout, stderr, self.runs_count)
# to avoid breaking CSV parser
result.description = result.description.replace("\0", "")
if not is_valid_utf_8(self.case_file) or not is_valid_utf_8(
self.reference_file
):
proc, stdout, stderr, debug_log, total_time = self.run_single_test(
server_logs_level, client_options
)
result = self.process_result_impl(
proc, stdout, stderr, debug_log, total_time
)
result.check_if_need_retry(args, stdout, stderr, self.runs_count)
# to avoid breaking CSV parser
result.description = result.description.replace("\0", "")
else:
with SharedEngineReplacer(
self.case_file,
args.replace_replicated_with_shared,
args.replace_non_replicated_with_shared,
False,
):
with SharedEngineReplacer(
self.reference_file,
args.replace_replicated_with_shared,
args.replace_non_replicated_with_shared,
True,
):
(
proc,
stdout,
stderr,
debug_log,
total_time,
) = self.run_single_test(server_logs_level, client_options)
result = self.process_result_impl(
proc, stdout, stderr, debug_log, total_time
)
result.check_if_need_retry(
args, stdout, stderr, self.runs_count
)
# to avoid breaking CSV parser
result.description = result.description.replace("\0", "")
if result.status == TestStatus.FAIL:
result.description = self.add_info_about_settings(result.description)
@ -1688,6 +1913,8 @@ class TestSuite:
self.suite_path: str = suite_path
self.suite_tmp_path: str = suite_tmp_path
self.suite: str = suite
self.cloud_skip_list: List[str] = []
self.private_skip_list: List[str] = []
if args.run_by_hash_num is not None and args.run_by_hash_total is not None:
if args.run_by_hash_num > args.run_by_hash_total:
@ -1987,10 +2214,16 @@ def check_server_started(args):
sys.stdout.flush()
retry_count = args.server_check_retries
query = "SELECT version(), arrayStringConcat(groupArray(value), ' ') FROM system.build_options WHERE name IN ('GIT_HASH', 'GIT_BRANCH')"
while retry_count > 0:
try:
clickhouse_execute(args, "SELECT 1", max_http_retries=1)
res = (
str(clickhouse_execute(args, query).decode())
.strip()
.replace("\t", " @ ")
)
print(" OK")
print(f"Connected to server {res}")
sys.stdout.flush()
return True
except (ConnectionError, http.client.ImproperConnectionState) as e:
@ -2412,6 +2645,23 @@ def reportLogStats(args):
print("\n")
def try_get_skip_list(base_dir, name):
test_names_to_skip = []
skip_list_path = os.path.join(base_dir, name)
if not os.path.exists(skip_list_path):
return test_names_to_skip
with open(skip_list_path, "r", encoding="utf-8") as fd:
for line in fd.read().split("\n"):
if line == "" or line[0] == " ":
continue
test_name = line.split()[0].strip()
if test_name != "":
test_names_to_skip.append(test_name)
return test_names_to_skip
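# Hypothetical illustration of the skip-list format parsed above: every non-empty,
# non-indented line contributes its first whitespace-separated token as a test name;
# anything after the token (e.g. a trailing note) is ignored.
_example_skip_list = (
    "test_distributed_type_object/test.py::test_distributed_type_object   not supported here\n"
    "test_merge_table_over_distributed/test.py::test_global_in\n"
    "\n"
    "    indented lines are ignored\n"
)
_example_names = [
    line.split()[0].strip()
    for line in _example_skip_list.split("\n")
    if line != "" and line[0] != " "
]
print(_example_names)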
def main(args):
global server_died
global stop_time
@ -2430,18 +2680,18 @@ def main(args):
args.build_flags = collect_build_flags(args)
args.changed_merge_tree_settings = collect_changed_merge_tree_settings(args)
args.suppport_system_processes_is_all_data_sent = check_table_column(
args, "system", "processes", "is_all_data_sent"
)
if args.s3_storage and (
BuildFlags.THREAD in args.build_flags or BuildFlags.DEBUG in args.build_flags
):
if args.s3_storage and (BuildFlags.RELEASE not in args.build_flags):
args.no_random_settings = True
if args.skip:
args.skip = set(args.skip)
if args.replace_replicated_with_shared:
if not args.skip:
args.skip = set([])
args.skip = set(args.skip)
base_dir = os.path.abspath(args.queries)
# Keep same default values as in queries/shell_config.sh
@ -2516,6 +2766,8 @@ def main(args):
)
total_tests_run = 0
cloud_skip_list = try_get_skip_list(base_dir, "../queries-no-cloud-tests.txt")
private_skip_list = try_get_skip_list(base_dir, "../queries-no-private-tests.txt")
for suite in sorted(os.listdir(base_dir), key=suite_key_func):
if server_died.is_set():
@ -2525,6 +2777,8 @@ def main(args):
if test_suite is None:
continue
test_suite.cloud_skip_list = cloud_skip_list
test_suite.private_skip_list = private_skip_list
total_tests_run += do_run_tests(args.jobs, test_suite, args.parallel)
if server_died.is_set():
@ -2644,7 +2898,14 @@ def find_clickhouse_command(binary, command):
def get_additional_client_options(args):
if args.client_option:
return " ".join("--" + option for option in args.client_option)
client_options = " ".join("--" + option for option in args.client_option)
if "CLICKHOUSE_CLIENT_OPT" in os.environ:
return os.environ["CLICKHOUSE_CLIENT_OPT"] + client_options
else:
return client_options
else:
if "CLICKHOUSE_CLIENT_OPT" in os.environ:
return os.environ["CLICKHOUSE_CLIENT_OPT"]
return ""
@ -2839,6 +3100,43 @@ def parse_args():
help="Display $ characters after line with trailing whitespaces in diff output",
)
group = parser.add_mutually_exclusive_group(required=False)
group.add_argument(
"--cloud",
action="store_true",
default=None,
dest="cloud",
help="Run only tests that are supported in ClickHouse Cloud environment",
)
group.add_argument(
"--no-cloud",
action="store_false",
default=None,
dest="cloud",
help="Run all the tests, including the ones not supported in ClickHouse Cloud environment",
)
parser.set_defaults(cloud=False)
group = parser.add_mutually_exclusive_group(required=False)
group.add_argument(
"--private",
action="store_true",
default=None,
dest="private",
help="Run only tests that are supported in the private build",
)
group.add_argument(
"--no-private",
action="store_false",
default=None,
dest="private",
help="Run all the tests, including the ones not supported in the private build",
)
# Only used to skip tests via "../queries-no-private-tests.txt", so it's fine to keep it enabled by default
parser.set_defaults(private=True)
group = parser.add_mutually_exclusive_group(required=False)
group.add_argument(
"--zookeeper",
@ -2920,6 +3218,18 @@ def parse_args():
default=False,
help="Do not include tests that are not supported with parallel replicas feature",
)
parser.add_argument(
"--replace-replicated-with-shared",
action="store_true",
default=os.environ.get("USE_META_IN_KEEPER_FOR_MERGE_TREE", False),
help="Replace ReplicatedMergeTree engine with SharedMergeTree",
)
parser.add_argument(
"--replace-non-replicated-with-shared",
action="store_true",
default=False,
help="Replace ordinary MergeTree engine with SharedMergeTree",
)
return parser.parse_args()
@ -3062,6 +3372,7 @@ if __name__ == "__main__":
client_options_query_str = get_additional_client_options_url(args)
args.client_options_query_str = client_options_query_str + "&"
args.client_options_query_str += os.environ["CLICKHOUSE_URL_PARAMS"]
os.environ["CLICKHOUSE_URL_PARAMS"] += client_options_query_str
else:
args.client_options_query_str = ""
@ -3072,4 +3383,7 @@ if __name__ == "__main__":
if args.db_engine and args.db_engine == "Ordinary":
MESSAGES_TO_RETRY.append(" locking attempt on ")
if args.replace_replicated_with_shared:
args.s3_storage = True
main(args)

View File

@ -332,7 +332,13 @@ def test_different_part_types_on_replicas(start_cluster, table, part_type):
for _ in range(3):
insert_random_data(table, leader, 100)
leader.query("OPTIMIZE TABLE {} FINAL".format(table))
exec_query_with_retry(
leader,
"OPTIMIZE TABLE {} FINAL".format(table),
settings={"optimize_throw_if_noop": 1},
silent=True,
)
follower.query("SYSTEM SYNC REPLICA {}".format(table), timeout=20)
expected = "{}\t1\n".format(part_type)

View File

@ -1414,10 +1414,10 @@ def test_signatures(started_cluster):
)
assert int(result) == 1
result = instance.query(
error = instance.query_and_get_error(
f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.arrow', 'minio', 'minio123', '{session_token}')"
)
assert int(result) == 1
assert "S3_ERROR" in error
result = instance.query(
f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.arrow', 'Arrow', 'x UInt64', 'auto')"
@ -1429,20 +1429,20 @@ def test_signatures(started_cluster):
)
assert int(result) == 1
result = instance.query(
error = instance.query_and_get_error(
f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.arrow', 'minio', 'minio123', '{session_token}', 'Arrow')"
)
assert int(result) == 1
assert "S3_ERROR" in error
result = instance.query(
error = instance.query_and_get_error(
f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.arrow', 'minio', 'minio123', '{session_token}', 'Arrow', 'x UInt64')"
)
assert int(result) == 1
assert "S3_ERROR" in error
result = instance.query(
error = instance.query_and_get_error(
f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.arrow', 'minio', 'minio123', '{session_token}', 'Arrow', 'x UInt64', 'auto')"
)
assert int(result) == 1
assert "S3_ERROR" in error
def test_select_columns(started_cluster):

View File

@ -12,20 +12,12 @@ cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node", with_zookeeper=True, main_configs=["configs/remote_servers.xml"]
)
node_1 = cluster.add_instance("node_1", with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
node_1.query_with_retry("DROP TABLE IF EXISTS replicated")
node_1.query_with_retry(
"""CREATE TABLE replicated (id UInt32, date Date) ENGINE =
ReplicatedMergeTree('/clickhouse/tables/replicated', 'node_1') ORDER BY id PARTITION BY toYYYYMM(date)"""
)
node.query_with_retry(
"CREATE TABLE distributed (id UInt32, date Date) ENGINE = Distributed('test_cluster', 'default', 'replicated')"
)
@ -37,8 +29,6 @@ def started_cluster():
def test(started_cluster):
cluster.pause_container("node_1")
node.query("SYSTEM RELOAD CONFIG")
error = node.query_and_get_error(
"SELECT count() FROM distributed SETTINGS receive_timeout=1, handshake_timeout_ms=1"
@ -67,5 +57,3 @@ def test(started_cluster):
assert recovery_time == 0
assert errors_count == 0
cluster.unpause_container("node_1")

View File

@ -1,9 +1,9 @@
#!/usr/bin/expect -f
# Tags: no-fasttest
# Tag no-fasttest: it takes about 180 seconds to run
log_user 0
set timeout 60
match_max 100000
if ![info exists env(CLICKHOUSE_PORT_TCP)] {set env(CLICKHOUSE_PORT_TCP) 9000}
@ -13,15 +13,23 @@ expect ":) "
# Make a query
send -- "SELECT 1\r"
expect ":-] "
send -- "-- xxx\r"
expect ":-] "
send -- ", 2\r"
expect ":-] "
send -- ";\r"
send -- ";"
# For some reason this sleep is required for this test to work properly
sleep 1
send -- "\r"
expect {
"│ 1 │ 2 │" { }
timeout { exit 1 }
}
expect "│ 1 │ 2 │"
expect ":) "
send -- "\4"
expect eof
send -- ""
expect {
eof { exit 0 }
timeout { exit 1 }
}

View File

@ -0,0 +1,10 @@
alias1
1 4 16 23
23 16 4 1
2020-02-02 1 4 2 16 3 23
alias2
1 3 4 4
4 4 3 1
23 16 4 1
2020-02-01 1 3 2 4 3 4
2020-02-02 1 4 2 16 3 23

View File

@ -0,0 +1,60 @@
-- Tags: no-parallel
drop table if exists merge;
set allow_experimental_analyzer = 1;
create table merge
(
dt Date,
colAlias0 Int32,
colAlias1 Int32,
col2 Int32,
colAlias2 UInt32,
col3 Int32,
colAlias3 UInt32
)
engine = Merge(currentDatabase(), '^alias_');
drop table if exists alias_1;
drop table if exists alias_2;
create table alias_1
(
dt Date,
col Int32,
colAlias0 UInt32 alias col,
colAlias1 UInt32 alias col3 + colAlias0,
col2 Int32,
colAlias2 Int32 alias colAlias1 + col2 + 10,
col3 Int32,
colAlias3 Int32 alias colAlias2 + colAlias1 + col3
)
engine = MergeTree()
order by (dt);
insert into alias_1 (dt, col, col2, col3) values ('2020-02-02', 1, 2, 3);
select 'alias1';
select colAlias0, colAlias1, colAlias2, colAlias3 from alias_1;
select colAlias3, colAlias2, colAlias1, colAlias0 from merge;
select * from merge;
create table alias_2
(
dt Date,
col Int32,
col2 Int32,
colAlias0 UInt32 alias col,
colAlias3 Int32 alias col3 + colAlias0,
colAlias1 UInt32 alias colAlias0 + col2,
colAlias2 Int32 alias colAlias0 + colAlias1,
col3 Int32
)
engine = MergeTree()
order by (dt);
insert into alias_2 (dt, col, col2, col3) values ('2020-02-01', 1, 2, 3);
select 'alias2';
select colAlias0, colAlias1, colAlias2, colAlias3 from alias_2;
select colAlias3, colAlias2, colAlias1, colAlias0 from merge order by dt;
select * from merge order by dt;

View File

@ -1 +1,2 @@
localhost 127.0.0.1 IPv4 1
hostname ip_address ip_family cached_at
String String Enum8(\'IPv4\' = 0, \'IPv6\' = 1, \'UNIX_LOCAL\' = 2) DateTime

View File

@ -1,26 +0,0 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# Retries are necessary because the DNS cache may be flushed before second statement is executed
i=0
retries=5
while [[ $i -lt $retries ]]; do
${CLICKHOUSE_CURL} -sS --fail --data "SELECT * FROM url('http://localhost:8123/ping', CSV, 'auto', headers())" "${CLICKHOUSE_URL}" | grep -oP -q 'Ok.' || continue
RECORDS=$(${CLICKHOUSE_CURL} -sS --fail --data "SELECT hostname, ip_address, ip_family, (isNotNull(cached_at) AND cached_at > '1970-01-01 00:00:00') FROM system.dns_cache WHERE hostname = 'localhost' and ip_family = 'IPv4';" "${CLICKHOUSE_URL}")
if [[ -n "${RECORDS}" ]]; then
echo "${RECORDS}"
exit 0
fi
((++i))
sleep 0.2
done
echo "All tries to fetch entries for localhost failed, no rows returned.
Probably the DNS cache is disabled or the ClickHouse instance not responds to ping."
exit 1

View File

@ -0,0 +1,3 @@
SELECT hostname, ip_address, ip_family, cached_at FROM system.dns_cache
LIMIT 0
FORMAT TSVWithNamesAndTypes;

View File

@ -0,0 +1,3 @@
2
2
1

View File

@ -0,0 +1,26 @@
DROP TABLE IF EXISTS lwd_merge;
CREATE TABLE lwd_merge (id UInt64 CODEC(NONE))
ENGINE = MergeTree ORDER BY id
SETTINGS max_bytes_to_merge_at_max_space_in_pool = 80000, exclude_deleted_rows_for_part_size_in_merge = 0;
INSERT INTO lwd_merge SELECT number FROM numbers(10000);
INSERT INTO lwd_merge SELECT number FROM numbers(10000, 10000);
OPTIMIZE TABLE lwd_merge;
SELECT count() FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_merge' AND active = 1;
DELETE FROM lwd_merge WHERE id % 10 > 0;
OPTIMIZE TABLE lwd_merge;
SELECT count() FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_merge' AND active = 1;
ALTER TABLE lwd_merge MODIFY SETTING exclude_deleted_rows_for_part_size_in_merge = 1;
-- delete again because deleted rows will be counted in mutation
DELETE FROM lwd_merge WHERE id % 100 == 0;
OPTIMIZE TABLE lwd_merge;
SELECT count() FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_merge' AND active = 1;
DROP TABLE IF EXISTS lwd_merge;

View File

@ -0,0 +1,2 @@
CREATE MATERIALIZED VIEW default.mv_03002 TO default.table_03002\n(\n `ts` DateTime\n)\nAS SELECT ts\nFROM default.table_03002
CREATE MATERIALIZED VIEW default.mv_03002 TO default.table_03002\n(\n `ts` DateTime\n)\nAS WITH MY_CTE AS\n (\n SELECT ts\n FROM default.table_03002\n )\nSELECT *\nFROM MY_CTE

View File

@ -0,0 +1,15 @@
CREATE TABLE table_03002 (ts DateTime, event_type String) ENGINE = MergeTree ORDER BY (event_type, ts);
CREATE MATERIALIZED VIEW mv_03002 TO table_03002 AS SELECT ts FROM table_03002;
SHOW CREATE TABLE mv_03002;
ALTER TABLE mv_03002 MODIFY QUERY
WITH MY_CTE AS (SELECT ts FROM table_03002)
SELECT * FROM MY_CTE;
SHOW CREATE TABLE mv_03002;
DROP TABLE mv_03002;
DROP TABLE table_03002;

View File

@ -0,0 +1,16 @@
0 30000
1 30000
2 30000
0 30000
1 30000
2 30000
0 449985000
1 449985000
2 449985000
0 449985000
1 449985000
2 449985000
sum 1 1
sum 16 1
uniqExact 1 1
uniqExact 16 0

View File

@ -0,0 +1,29 @@
DROP TABLE IF EXISTS t_optimize_equal_ranges;
CREATE TABLE t_optimize_equal_ranges (a UInt64, b String, c UInt64) ENGINE = MergeTree ORDER BY a;
SET max_block_size = 1024;
SET max_bytes_before_external_group_by = 0;
SET optimize_aggregation_in_order = 0;
SET optimize_use_projections = 0;
INSERT INTO t_optimize_equal_ranges SELECT 0, toString(number), number FROM numbers(30000);
INSERT INTO t_optimize_equal_ranges SELECT 1, toString(number), number FROM numbers(30000);
INSERT INTO t_optimize_equal_ranges SELECT 2, toString(number), number FROM numbers(30000);
SELECT a, uniqExact(b) FROM t_optimize_equal_ranges GROUP BY a ORDER BY a SETTINGS max_threads = 16;
SELECT a, uniqExact(b) FROM t_optimize_equal_ranges GROUP BY a ORDER BY a SETTINGS max_threads = 1;
SELECT a, sum(c) FROM t_optimize_equal_ranges GROUP BY a ORDER BY a SETTINGS max_threads = 16;
SELECT a, sum(c) FROM t_optimize_equal_ranges GROUP BY a ORDER BY a SETTINGS max_threads = 1;
SYSTEM FLUSH LOGS;
SELECT
used_aggregate_functions[1] AS func,
Settings['max_threads'] AS threads,
ProfileEvents['AggregationOptimizedEqualRangesOfKeys'] > 0
FROM system.query_log
WHERE type = 'QueryFinish' AND current_database = currentDatabase() AND query LIKE '%SELECT%FROM%t_optimize_equal_ranges%'
ORDER BY func, threads;
DROP TABLE t_optimize_equal_ranges;

View File

@ -0,0 +1,36 @@
DROP TABLE IF EXISTS t_uniq_exact;
CREATE TABLE t_uniq_exact (a UInt64, b String, c UInt64) ENGINE = MergeTree ORDER BY a;
SET group_by_two_level_threshold_bytes = 1;
SET group_by_two_level_threshold = 1;
SET max_threads = 4;
SET max_bytes_before_external_group_by = 0;
SET optimize_aggregation_in_order = 0;
INSERT INTO t_uniq_exact SELECT 0, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 1, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 2, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 3, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 4, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 5, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 6, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 7, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 8, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 9, randomPrintableASCII(5), rand() FROM numbers(300000);
OPTIMIZE TABLE t_uniq_exact FINAL;
SELECT a, uniqExact(b) FROM t_uniq_exact GROUP BY a ORDER BY a
SETTINGS min_hit_rate_to_use_consecutive_keys_optimization = 1.0
EXCEPT
SELECT a, uniqExact(b) FROM t_uniq_exact GROUP BY a ORDER BY a
SETTINGS min_hit_rate_to_use_consecutive_keys_optimization = 0.5;
SELECT a, sum(c) FROM t_uniq_exact GROUP BY a ORDER BY a
SETTINGS min_hit_rate_to_use_consecutive_keys_optimization = 1.0
EXCEPT
SELECT a, sum(c) FROM t_uniq_exact GROUP BY a ORDER BY a
SETTINGS min_hit_rate_to_use_consecutive_keys_optimization = 0.5;
DROP TABLE t_uniq_exact;

View File

@ -0,0 +1,2 @@
create table test (number UInt64) engine=FileLog('./user_files/data.jsonl', 'JSONEachRow') settings poll_max_batch_size=18446744073709; -- {serverError INVALID_SETTING_VALUE}

View File

@ -0,0 +1,68 @@
(5,NULL)
(5,NULL)
(5,NULL)
QUERY id: 0
PROJECTION COLUMNS
(sumIf(toNullable(1), equals(modulo(number, 2), 0)), NULL) Tuple(Nullable(UInt64), Nullable(Nothing))
PROJECTION
LIST id: 1, nodes: 1
FUNCTION id: 2, function_name: tuple, function_type: ordinary, result_type: Tuple(Nullable(UInt64), Nullable(Nothing))
ARGUMENTS
LIST id: 3, nodes: 2
FUNCTION id: 4, function_name: sumIf, function_type: aggregate, result_type: Nullable(UInt64)
ARGUMENTS
LIST id: 5, nodes: 2
CONSTANT id: 6, constant_value: UInt64_1, constant_value_type: Nullable(UInt8)
EXPRESSION
FUNCTION id: 7, function_name: toNullable, function_type: ordinary, result_type: Nullable(UInt8)
ARGUMENTS
LIST id: 8, nodes: 1
CONSTANT id: 9, constant_value: UInt64_1, constant_value_type: UInt8
FUNCTION id: 10, function_name: equals, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 11, nodes: 2
FUNCTION id: 12, function_name: modulo, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 13, nodes: 2
COLUMN id: 14, column_name: number, result_type: UInt64, source_id: 15
CONSTANT id: 16, constant_value: UInt64_2, constant_value_type: UInt8
CONSTANT id: 17, constant_value: UInt64_0, constant_value_type: UInt8
CONSTANT id: 18, constant_value: NULL, constant_value_type: Nullable(Nothing)
JOIN TREE
TABLE_FUNCTION id: 15, alias: __table1, table_function_name: numbers
ARGUMENTS
LIST id: 19, nodes: 1
CONSTANT id: 20, constant_value: UInt64_10, constant_value_type: UInt8
(5,NULL)
QUERY id: 0
PROJECTION COLUMNS
(sum(if(equals(modulo(number, 2), 0), toNullable(1), 0)), NULL) Tuple(Nullable(UInt64), Nullable(Nothing))
PROJECTION
LIST id: 1, nodes: 1
FUNCTION id: 2, function_name: tuple, function_type: ordinary, result_type: Tuple(Nullable(UInt64), Nullable(Nothing))
ARGUMENTS
LIST id: 3, nodes: 2
FUNCTION id: 4, function_name: sumOrNullIf, function_type: aggregate, result_type: Nullable(UInt64)
ARGUMENTS
LIST id: 5, nodes: 2
CONSTANT id: 6, constant_value: UInt64_1, constant_value_type: Nullable(UInt8)
EXPRESSION
FUNCTION id: 7, function_name: toNullable, function_type: ordinary, result_type: Nullable(UInt8)
ARGUMENTS
LIST id: 8, nodes: 1
CONSTANT id: 9, constant_value: UInt64_1, constant_value_type: UInt8
FUNCTION id: 10, function_name: equals, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 11, nodes: 2
FUNCTION id: 12, function_name: modulo, function_type: ordinary, result_type: UInt8
ARGUMENTS
LIST id: 13, nodes: 2
COLUMN id: 14, column_name: number, result_type: UInt64, source_id: 15
CONSTANT id: 16, constant_value: UInt64_2, constant_value_type: UInt8
CONSTANT id: 17, constant_value: UInt64_0, constant_value_type: UInt8
CONSTANT id: 18, constant_value: NULL, constant_value_type: Nullable(Nothing)
JOIN TREE
TABLE_FUNCTION id: 15, alias: __table1, table_function_name: numbers
ARGUMENTS
LIST id: 19, nodes: 1
CONSTANT id: 20, constant_value: UInt64_10, constant_value_type: UInt8

View File

@ -0,0 +1,11 @@
SET optimize_rewrite_sum_if_to_count_if = 1;
SET allow_experimental_analyzer = 0;
SELECT (sumIf(toNullable(1), (number % 2) = 0), NULL) FROM numbers(10);
SELECT (sum(if((number % 2) = 0, toNullable(1), 0)), NULL) FROM numbers(10);
SET allow_experimental_analyzer = 1;
SELECT (sumIf(toNullable(1), (number % 2) = 0), NULL) FROM numbers(10);
EXPLAIN QUERY TREE SELECT (sumIf(toNullable(1), (number % 2) = 0), NULL) FROM numbers(10);
SELECT (sum(if((number % 2) = 0, toNullable(1), 0)), NULL) FROM numbers(10);
EXPLAIN QUERY TREE SELECT (sum(if((number % 2) = 0, toNullable(1), 0)), NULL) FROM numbers(10);

View File

@ -0,0 +1,4 @@
-- Tags: no-replicated-database
SELECT least(value, 0) FROM system.asynchronous_metrics WHERE metric = 'VMMaxMapCount';
SELECT least(value, 0) FROM system.asynchronous_metrics WHERE metric = 'VMNumMaps';

View File

@ -10,7 +10,7 @@ TU_EXCLUDES=(
Aggregator
)
if find $1 -name '*.o' | xargs wc -c | grep -v total | sort -rn | awk '{ if ($1 > 50000000) print }' \
if find $1 -name '*.o' | xargs wc -c | grep --regexp='\.o$' | sort -rn | awk '{ if ($1 > 50000000) print }' \
| grep -v -f <(printf "%s\n" "${TU_EXCLUDES[@]}")
then
echo "^ It's not allowed to have so large translation units."

View File

@ -8,6 +8,19 @@ use Data::Dumper;
my @current_stack = ();
my $grouped_stacks = {};
sub process_stacktrace
{
my $group = \$grouped_stacks;
for my $frame (reverse @current_stack)
{
$$group->{count} ||= 0;
++$$group->{count};
$group = \$$group->{children}{$frame};
}
@current_stack = ();
}
while (my $line = <>)
{
chomp $line;
@ -21,18 +34,12 @@ while (my $line = <>)
if ($line eq '')
{
my $group = \$grouped_stacks;
for my $frame (reverse @current_stack)
{
$$group->{count} ||= 0;
++$$group->{count};
$group = \$$group->{children}{$frame};
}
@current_stack = ();
process_stacktrace();
}
}
process_stacktrace();
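# The refactoring above moves the per-stack aggregation into process_stacktrace() and calls it
# once more after the input loop, so the final stack (which is not followed by a blank line) is
# also counted. A minimal Python sketch of the same flush-at-EOF pattern, under the simplifying
# assumption that every non-empty input line is a stack frame (innermost first):
import sys

grouped_stacks = {"count": 0, "children": {}}
current_stack = []

def process_stacktrace():
    global current_stack
    node = grouped_stacks
    for frame in reversed(current_stack):
        node["count"] += 1
        node = node["children"].setdefault(frame, {"count": 0, "children": {}})
    current_stack = []

for line in sys.stdin:
    line = line.rstrip("\n")
    if line == "":
        process_stacktrace()
    else:
        current_stack.append(line)
process_stacktrace()  # flush the last stack even without a trailing blank line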
sub print_group
{
my $group = shift;