Merge branch 'master' into pufit/role-from-storage

This commit is contained in:
pufit 2023-08-08 20:20:37 -04:00 committed by GitHub
commit 884ea7f592
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
94 changed files with 1659 additions and 479 deletions

View File

@ -3,6 +3,9 @@ name: BackportPR
env:
# Force the stdout and stderr streams to be unbuffered
PYTHONUNBUFFERED: 1
# Export system tables to ClickHouse Cloud
CLICKHOUSE_CI_LOGS_HOST: ${{ secrets.CLICKHOUSE_CI_LOGS_HOST }}
CLICKHOUSE_CI_LOGS_PASSWORD: ${{ secrets.CLICKHOUSE_CI_LOGS_PASSWORD }}
on: # yamllint disable-line rule:truthy
push:

View File

@ -3,6 +3,9 @@ name: MasterCI
env:
# Force the stdout and stderr streams to be unbuffered
PYTHONUNBUFFERED: 1
# Export system tables to ClickHouse Cloud
CLICKHOUSE_CI_LOGS_HOST: ${{ secrets.CLICKHOUSE_CI_LOGS_HOST }}
CLICKHOUSE_CI_LOGS_PASSWORD: ${{ secrets.CLICKHOUSE_CI_LOGS_PASSWORD }}
on: # yamllint disable-line rule:truthy
push:

View File

@ -3,6 +3,9 @@ name: PullRequestCI
env:
# Force the stdout and stderr streams to be unbuffered
PYTHONUNBUFFERED: 1
# Export system tables to ClickHouse Cloud
CLICKHOUSE_CI_LOGS_HOST: ${{ secrets.CLICKHOUSE_CI_LOGS_HOST }}
CLICKHOUSE_CI_LOGS_PASSWORD: ${{ secrets.CLICKHOUSE_CI_LOGS_PASSWORD }}
on: # yamllint disable-line rule:truthy
pull_request:

View File

@ -3,6 +3,9 @@ name: ReleaseBranchCI
env:
# Force the stdout and stderr streams to be unbuffered
PYTHONUNBUFFERED: 1
# Export system tables to ClickHouse Cloud
CLICKHOUSE_CI_LOGS_HOST: ${{ secrets.CLICKHOUSE_CI_LOGS_HOST }}
CLICKHOUSE_CI_LOGS_PASSWORD: ${{ secrets.CLICKHOUSE_CI_LOGS_PASSWORD }}
on: # yamllint disable-line rule:truthy
push:

View File

@ -52,7 +52,6 @@
* Add new setting `disable_url_encoding` that allows to disable decoding/encoding path in uri in URL engine. [#52337](https://github.com/ClickHouse/ClickHouse/pull/52337) ([Kruglov Pavel](https://github.com/Avogar)).
#### Performance Improvement
* Writing parquet files is 10x faster, it's multi-threaded now. Almost the same speed as reading. [#49367](https://github.com/ClickHouse/ClickHouse/pull/49367) ([Michael Kolupaev](https://github.com/al13n321)).
* Enable automatic selection of the sparse serialization format by default. It improves performance. The format is supported since version 22.1. After this change, downgrading to versions older than 22.1 might not be possible. You can turn off the usage of the sparse serialization format by providing the `ratio_of_defaults_for_sparse_serialization = 1` setting for your MergeTree tables. [#49631](https://github.com/ClickHouse/ClickHouse/pull/49631) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Enable `move_all_conditions_to_prewhere` and `enable_multiple_prewhere_read_steps` settings by default. [#46365](https://github.com/ClickHouse/ClickHouse/pull/46365) ([Alexander Gololobov](https://github.com/davenger)).
* Improves performance of some queries by tuning allocator. [#46416](https://github.com/ClickHouse/ClickHouse/pull/46416) ([Azat Khuzhin](https://github.com/azat)).
@ -114,6 +113,7 @@
* Now interserver port will be closed only after tables are shut down. [#52498](https://github.com/ClickHouse/ClickHouse/pull/52498) ([alesapin](https://github.com/alesapin)).
#### Experimental Feature
* Writing parquet files is 10x faster, it's multi-threaded now. Almost the same speed as reading. [#49367](https://github.com/ClickHouse/ClickHouse/pull/49367) ([Michael Kolupaev](https://github.com/al13n321)). This is controlled by the setting `output_format_parquet_use_custom_encoder` which is disabled by default, because the feature is non-ideal.
* Added support for [PRQL](https://prql-lang.org/) as a query language. [#50686](https://github.com/ClickHouse/ClickHouse/pull/50686) ([János Benjamin Antal](https://github.com/antaljanosbenjamin)).
* Allow to add disk name for custom disks. Previously custom disks would use an internal generated disk name. Now it will be possible with `disk = disk_<name>(...)` (e.g. disk will have name `name`) . [#51552](https://github.com/ClickHouse/ClickHouse/pull/51552) ([Kseniia Sumarokova](https://github.com/kssenii)). This syntax can be changed in this release.
* (experimental MaterializedMySQL) Fixed crash when `mysqlxx::Pool::Entry` is used after it was disconnected. [#52063](https://github.com/ClickHouse/ClickHouse/pull/52063) ([Val Doroshchuk](https://github.com/valbok)).

View File

@ -32,7 +32,7 @@ RUN arch=${TARGETARCH:-amd64} \
esac
ARG REPOSITORY="https://s3.amazonaws.com/clickhouse-builds/22.4/31c367d3cd3aefd316778601ff6565119fe36682/package_release"
ARG VERSION="23.7.3.14"
ARG VERSION="23.7.4.5"
ARG PACKAGES="clickhouse-keeper"
# user/group precreated explicitly with fixed uid/gid on purpose.

View File

@ -101,6 +101,7 @@ RUN add-apt-repository ppa:ubuntu-toolchain-r/test --yes \
python3-boto3 \
yasm \
zstd \
jq \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists

View File

@ -59,7 +59,7 @@ if [ "$BUILD_MUSL_KEEPER" == "1" ]
then
# build keeper with musl separately
# and without rust bindings
cmake --debug-trycompile -DENABLE_RUST=OFF -DBUILD_STANDALONE_KEEPER=1 -DENABLE_CLICKHOUSE_KEEPER=1 -DCMAKE_VERBOSE_MAKEFILE=1 -DUSE_MUSL=1 -LA -DCMAKE_TOOLCHAIN_FILE=/build/cmake/linux/toolchain-x86_64-musl.cmake "-DCMAKE_BUILD_TYPE=$BUILD_TYPE" "-DSANITIZE=$SANITIZER" -DENABLE_CHECK_HEAVY_BUILDS=1 "${CMAKE_FLAGS[@]}" ..
cmake --debug-trycompile -DENABLE_RUST=OFF -DBUILD_STANDALONE_KEEPER=1 -DENABLE_CLICKHOUSE_KEEPER=1 -DCMAKE_VERBOSE_MAKEFILE=1 -DUSE_MUSL=1 -LA -DCMAKE_TOOLCHAIN_FILE=/build/cmake/linux/toolchain-x86_64-musl.cmake "-DCMAKE_BUILD_TYPE=$BUILD_TYPE" "-DSANITIZE=$SANITIZER" -DENABLE_CHECK_HEAVY_BUILDS=1 -DENABLE_BUILD_PROFILING=1 "${CMAKE_FLAGS[@]}" ..
# shellcheck disable=SC2086 # No quotes because I want it to expand to nothing if empty.
ninja $NINJA_FLAGS clickhouse-keeper
@ -74,10 +74,10 @@ then
rm -f CMakeCache.txt
# Build the rest of binaries
cmake --debug-trycompile -DBUILD_STANDALONE_KEEPER=0 -DCREATE_KEEPER_SYMLINK=0 -DCMAKE_VERBOSE_MAKEFILE=1 -LA "-DCMAKE_BUILD_TYPE=$BUILD_TYPE" "-DSANITIZE=$SANITIZER" -DENABLE_CHECK_HEAVY_BUILDS=1 "${CMAKE_FLAGS[@]}" ..
cmake --debug-trycompile -DBUILD_STANDALONE_KEEPER=0 -DCREATE_KEEPER_SYMLINK=0 -DCMAKE_VERBOSE_MAKEFILE=1 -LA "-DCMAKE_BUILD_TYPE=$BUILD_TYPE" "-DSANITIZE=$SANITIZER" -DENABLE_CHECK_HEAVY_BUILDS=1 -DENABLE_BUILD_PROFILING=1 "${CMAKE_FLAGS[@]}" ..
else
# Build everything
cmake --debug-trycompile -DCMAKE_VERBOSE_MAKEFILE=1 -LA "-DCMAKE_BUILD_TYPE=$BUILD_TYPE" "-DSANITIZE=$SANITIZER" -DENABLE_CHECK_HEAVY_BUILDS=1 "${CMAKE_FLAGS[@]}" ..
cmake --debug-trycompile -DCMAKE_VERBOSE_MAKEFILE=1 -LA "-DCMAKE_BUILD_TYPE=$BUILD_TYPE" "-DSANITIZE=$SANITIZER" -DENABLE_CHECK_HEAVY_BUILDS=1 -DENABLE_BUILD_PROFILING=1 "${CMAKE_FLAGS[@]}" ..
fi
# No quotes because I want it to expand to nothing if empty.
@ -181,4 +181,11 @@ then
tar -cv -I pixz -f /output/ccache.log.txz "$CCACHE_LOGFILE"
fi
# Prepare profile info (time-trace)
mkdir -p profile-tmp
../utils/prepare-time-trace/prepare-time-trace.sh . profile-tmp
find profile-tmp -type f -print0 | xargs -0 cat > /profile/profile.json
wc -c /profile/profile.json
ls -l /output

View File

@ -78,11 +78,14 @@ def run_docker_image_with_env(
image_name: str,
as_root: bool,
output_dir: Path,
profile_dir: Path,
env_variables: List[str],
ch_root: Path,
ccache_dir: Optional[Path],
):
output_dir.mkdir(parents=True, exist_ok=True)
profile_dir.mkdir(parents=True, exist_ok=True)
env_part = " -e ".join(env_variables)
if env_part:
env_part = " -e " + env_part
@ -103,7 +106,7 @@ def run_docker_image_with_env(
cmd = (
f"docker run --network=host --user={user} --rm {ccache_mount}"
f"--volume={output_dir}:/output --volume={ch_root}:/build {env_part} "
f"--volume={output_dir}:/output --volume={ch_root}:/build --volume={profile_dir}:/profile {env_part} "
f"{interactive} {image_name}"
)
@ -361,6 +364,7 @@ def parse_args() -> argparse.Namespace:
help="ClickHouse git repository",
)
parser.add_argument("--output-dir", type=dir_name, required=True)
parser.add_argument("--profile-dir", type=dir_name, required=True)
parser.add_argument("--debug-build", action="store_true")
parser.add_argument(
@ -488,6 +492,7 @@ def main():
image_with_version,
args.as_root,
args.output_dir,
args.profile_dir,
env_prepared,
ch_root,
args.ccache_dir,

View File

@ -33,7 +33,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="23.7.3.14"
ARG VERSION="23.7.4.5"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# user/group precreated explicitly with fixed uid/gid on purpose.

View File

@ -23,7 +23,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="23.7.3.14"
ARG VERSION="23.7.4.5"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# set non-empty deb_location_url url to create a docker image

View File

@ -0,0 +1,17 @@
---
sidebar_position: 1
sidebar_label: 2023
---
# 2023 Changelog
### ClickHouse release v23.7.4.5-stable (bd2fcd44553) FIXME as compared to v23.7.3.14-stable (bd9a510550c)
#### Bug Fix (user-visible misbehavior in an official stable release)
* Disable the new parquet encoder [#53130](https://github.com/ClickHouse/ClickHouse/pull/53130) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Revert changes in `ZstdDeflatingAppendableWriteBuffer` [#53111](https://github.com/ClickHouse/ClickHouse/pull/53111) ([Antonio Andelic](https://github.com/antonio2368)).

View File

@ -42,20 +42,20 @@ sudo apt-get install git cmake ccache python3 ninja-build nasm yasm gawk lsb-rel
### Install and Use the Clang compiler
On Ubuntu/Debian you can use LLVM's automatic installation script, see [here](https://apt.llvm.org/).
On Ubuntu/Debian, you can use LLVM's automatic installation script; see [here](https://apt.llvm.org/).
``` bash
sudo bash -c "$(wget -O - https://apt.llvm.org/llvm.sh)"
```
Note: in case of troubles, you can also use this:
Note: in case of trouble, you can also use this:
```bash
sudo apt-get install software-properties-common
sudo add-apt-repository -y ppa:ubuntu-toolchain-r/test
```
For other Linux distribution - check the availability of LLVM's [prebuild packages](https://releases.llvm.org/download.html).
For other Linux distributions - check the availability of LLVM's [prebuild packages](https://releases.llvm.org/download.html).
As of April 2023, clang-16 or higher will work.
GCC as a compiler is not supported.
@ -92,8 +92,12 @@ cmake -S . -B build
cmake --build build # or: `cd build; ninja`
```
:::tip
In case `cmake` isn't able to detect the number of available logical cores, the build will be done by one thread. To overcome this, you can tweak `cmake` to use a specific number of threads with `-j` flag, for example, `cmake --build build -j 16`. Alternatively, you can generate build files with a specific number of jobs in advance to avoid always setting the flag: `cmake -DPARALLEL_COMPILE_JOBS=16 -S . -B build`, where `16` is the desired number of threads.
:::
To create an executable, run `cmake --build build --target clickhouse` (or: `cd build; ninja clickhouse`).
This will create executable `build/programs/clickhouse` which can be used with `client` or `server` arguments.
This will create an executable `build/programs/clickhouse`, which can be used with `client` or `server` arguments.
## Building on Any Linux {#how-to-build-clickhouse-on-any-linux}
@ -107,7 +111,7 @@ The build requires the following components:
- Yasm
- Gawk
If all the components are installed, you may build in the same way as the steps above.
If all the components are installed, you may build it in the same way as the steps above.
Example for OpenSUSE Tumbleweed:
@ -123,7 +127,7 @@ Example for Fedora Rawhide:
``` bash
sudo yum update
sudo yum --nogpg install git cmake make clang python3 ccache nasm yasm gawk
sudo yum --nogpg install git cmake make clang python3 ccache lld nasm yasm gawk
git clone --recursive https://github.com/ClickHouse/ClickHouse.git
mkdir build
cmake -S . -B build

View File

@ -7,6 +7,10 @@ pagination_next: en/operations/settings/settings
# Settings Overview
:::note
XML-based Settings Profiles and [configuration files](https://clickhouse.com/docs/en/operations/configuration-files) are currently not supported for ClickHouse Cloud. To specify settings for your ClickHouse Cloud service, you must use [SQL-driven Settings Profiles](https://clickhouse.com/docs/en/operations/access-rights#settings-profiles-management).
:::
There are two main groups of ClickHouse settings:
- Global server settings

View File

@ -0,0 +1,646 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <Common/HashTable/HashMap.h>
#include <Common/SymbolIndex.h>
#include <Common/ArenaAllocator.h>
#include <Core/Settings.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <filesystem>
namespace DB
{
namespace ErrorCodes
{
extern const int FUNCTION_NOT_ALLOWED;
extern const int NOT_IMPLEMENTED;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
struct AggregateFunctionFlameGraphTree
{
struct ListNode;
struct TreeNode
{
TreeNode * parent = nullptr;
ListNode * children = nullptr;
UInt64 ptr = 0;
size_t allocated = 0;
};
struct ListNode
{
ListNode * next = nullptr;
TreeNode * child = nullptr;
};
TreeNode root;
static ListNode * createChild(TreeNode * parent, UInt64 ptr, Arena * arena)
{
ListNode * list_node = reinterpret_cast<ListNode *>(arena->alloc(sizeof(ListNode)));
TreeNode * tree_node = reinterpret_cast<TreeNode *>(arena->alloc(sizeof(TreeNode)));
list_node->child = tree_node;
list_node->next = nullptr;
tree_node->parent =parent;
tree_node->children = nullptr;
tree_node->ptr = ptr;
tree_node->allocated = 0;
return list_node;
}
TreeNode * find(const UInt64 * stack, size_t stack_size, Arena * arena)
{
TreeNode * node = &root;
for (size_t i = 0; i < stack_size; ++i)
{
UInt64 ptr = stack[i];
if (ptr == 0)
break;
if (!node->children)
{
node->children = createChild(node, ptr, arena);
node = node->children->child;
}
else
{
ListNode * list = node->children;
while (list->child->ptr != ptr && list->next)
list = list->next;
if (list->child->ptr != ptr)
{
list->next = createChild(node, ptr, arena);
list = list->next;
}
node = list->child;
}
}
return node;
}
static void append(DB::PaddedPODArray<UInt64> & values, DB::PaddedPODArray<UInt64> & offsets, std::vector<UInt64> & frame)
{
UInt64 prev = offsets.empty() ? 0 : offsets.back();
offsets.push_back(prev + frame.size());
for (UInt64 val : frame)
values.push_back(val);
}
struct Trace
{
using Frames = std::vector<UInt64>;
Frames frames;
/// The total number of bytes allocated for traces with the same prefix.
size_t allocated_total = 0;
/// This counter is relevant in case we want to filter some traces with small amount of bytes.
/// It shows the total number of bytes for *filtered* traces with the same prefix.
/// This is the value which is used in flamegraph.
size_t allocated_self = 0;
};
using Traces = std::vector<Trace>;
Traces dump(size_t max_depth, size_t min_bytes) const
{
Traces traces;
Trace::Frames frames;
std::vector<size_t> allocated_total;
std::vector<size_t> allocated_self;
std::vector<ListNode *> nodes;
nodes.push_back(root.children);
allocated_total.push_back(root.allocated);
allocated_self.push_back(root.allocated);
while (!nodes.empty())
{
if (nodes.back() == nullptr)
{
traces.push_back({frames, allocated_total.back(), allocated_self.back()});
nodes.pop_back();
allocated_total.pop_back();
allocated_self.pop_back();
/// We don't have root's frame so framers are empty in the end.
if (!frames.empty())
frames.pop_back();
continue;
}
TreeNode * current = nodes.back()->child;
nodes.back() = nodes.back()->next;
bool enough_bytes = current->allocated >= min_bytes;
bool enough_depth = max_depth == 0 || nodes.size() < max_depth;
if (enough_bytes)
{
frames.push_back(current->ptr);
allocated_self.back() -= current->allocated;
if (enough_depth)
{
allocated_total.push_back(current->allocated);
allocated_self.push_back(current->allocated);
nodes.push_back(current->children);
}
else
{
traces.push_back({frames, current->allocated, current->allocated});
frames.pop_back();
}
}
}
return traces;
}
};
static void insertData(DB::PaddedPODArray<UInt8> & chars, DB::PaddedPODArray<UInt64> & offsets, const char * pos, size_t length)
{
const size_t old_size = chars.size();
const size_t new_size = old_size + length + 1;
chars.resize(new_size);
if (length)
memcpy(chars.data() + old_size, pos, length);
chars[old_size + length] = 0;
offsets.push_back(new_size);
}
/// Split str by line feed and write as separate row to ColumnString.
static void fillColumn(DB::PaddedPODArray<UInt8> & chars, DB::PaddedPODArray<UInt64> & offsets, const std::string & str)
{
size_t start = 0;
size_t end = 0;
size_t size = str.size();
while (end < size)
{
if (str[end] == '\n')
{
insertData(chars, offsets, str.data() + start, end - start);
start = end + 1;
}
++end;
}
if (start < end)
insertData(chars, offsets, str.data() + start, end - start);
}
void dumpFlameGraph(
const AggregateFunctionFlameGraphTree::Traces & traces,
DB::PaddedPODArray<UInt8> & chars,
DB::PaddedPODArray<UInt64> & offsets)
{
DB::WriteBufferFromOwnString out;
std::unordered_map<uintptr_t, size_t> mapping;
#if defined(__ELF__) && !defined(OS_FREEBSD)
const DB::SymbolIndex & symbol_index = DB::SymbolIndex::instance();
#endif
for (const auto & trace : traces)
{
if (trace.allocated_self == 0)
continue;
for (size_t i = 0; i < trace.frames.size(); ++i)
{
if (i)
out << ";";
const void * ptr = reinterpret_cast<const void *>(trace.frames[i]);
#if defined(__ELF__) && !defined(OS_FREEBSD)
if (const auto * symbol = symbol_index.findSymbol(ptr))
writeString(demangle(symbol->name), out);
else
DB::writePointerHex(ptr, out);
#else
DB::writePointerHex(ptr, out);
#endif
}
out << ' ' << trace.allocated_self << "\n";
}
fillColumn(chars, offsets, out.str());
}
struct AggregateFunctionFlameGraphData
{
struct Entry
{
AggregateFunctionFlameGraphTree::TreeNode * trace;
UInt64 size;
Entry * next = nullptr;
};
struct Pair
{
Entry * allocation = nullptr;
Entry * deallocation = nullptr;
};
using Entries = HashMap<UInt64, Pair>;
AggregateFunctionFlameGraphTree tree;
Entries entries;
Entry * free_list = nullptr;
Entry * alloc(Arena * arena)
{
if (free_list)
{
auto * res = free_list;
free_list = free_list->next;
return res;
}
return reinterpret_cast<Entry *>(arena->alloc(sizeof(Entry)));
}
void release(Entry * entry)
{
entry->next = free_list;
free_list = entry;
}
static void track(Entry * allocation)
{
auto * node = allocation->trace;
while (node)
{
node->allocated += allocation->size;
node = node->parent;
}
}
static void untrack(Entry * allocation)
{
auto * node = allocation->trace;
while (node)
{
node->allocated -= allocation->size;
node = node->parent;
}
}
static Entry * tryFindMatchAndRemove(Entry *& list, UInt64 size)
{
if (!list)
return nullptr;
if (list->size == size)
{
Entry * entry = list;
list = list->next;
return entry;
}
else
{
Entry * parent = list;
while (parent->next && parent->next->size != size)
parent = parent->next;
if (parent->next && parent->next->size == size)
{
Entry * entry = parent->next;
parent->next = entry->next;
return entry;
}
return nullptr;
}
}
void add(UInt64 ptr, Int64 size, const UInt64 * stack, size_t stack_size, Arena * arena)
{
/// In case if argument is nullptr, only track allocations.
if (ptr == 0)
{
if (size > 0)
{
auto * node = tree.find(stack, stack_size, arena);
Entry entry{.trace = node, .size = UInt64(size)};
track(&entry);
}
return;
}
auto & place = entries[ptr];
if (size > 0)
{
if (auto * deallocation = tryFindMatchAndRemove(place.deallocation, size))
{
release(deallocation);
}
else
{
auto * node = tree.find(stack, stack_size, arena);
auto * allocation = alloc(arena);
allocation->size = UInt64(size);
allocation->trace = node;
track(allocation);
allocation->next = place.allocation;
place.allocation = allocation;
}
}
else if (size < 0)
{
UInt64 abs_size = -size;
if (auto * allocation = tryFindMatchAndRemove(place.allocation, abs_size))
{
untrack(allocation);
release(allocation);
}
else
{
auto * deallocation = alloc(arena);
deallocation->size = abs_size;
deallocation->next = place.deallocation;
place.deallocation = deallocation;
}
}
}
void merge(const AggregateFunctionFlameGraphTree & other_tree, Arena * arena)
{
AggregateFunctionFlameGraphTree::Trace::Frames frames;
std::vector<AggregateFunctionFlameGraphTree::ListNode *> nodes;
nodes.push_back(other_tree.root.children);
while (!nodes.empty())
{
if (nodes.back() == nullptr)
{
nodes.pop_back();
/// We don't have root's frame so framers are empty in the end.
if (!frames.empty())
frames.pop_back();
continue;
}
AggregateFunctionFlameGraphTree::TreeNode * current = nodes.back()->child;
nodes.back() = nodes.back()->next;
frames.push_back(current->ptr);
if (current->children)
nodes.push_back(current->children);
else
{
if (current->allocated)
add(0, current->allocated, frames.data(), frames.size(), arena);
frames.pop_back();
}
}
}
void merge(const AggregateFunctionFlameGraphData & other, Arena * arena)
{
AggregateFunctionFlameGraphTree::Trace::Frames frames;
for (const auto & entry : other.entries)
{
for (auto * allocation = entry.value.second.allocation; allocation; allocation = allocation->next)
{
frames.clear();
const auto * node = allocation->trace;
while (node->ptr)
{
frames.push_back(node->ptr);
node = node->parent;
}
std::reverse(frames.begin(), frames.end());
add(entry.value.first, allocation->size, frames.data(), frames.size(), arena);
untrack(allocation);
}
for (auto * deallocation = entry.value.second.deallocation; deallocation; deallocation = deallocation->next)
{
add(entry.value.first, -Int64(deallocation->size), nullptr, 0, arena);
}
}
merge(other.tree, arena);
}
void dumpFlameGraph(
DB::PaddedPODArray<UInt8> & chars,
DB::PaddedPODArray<UInt64> & offsets,
size_t max_depth, size_t min_bytes) const
{
DB::dumpFlameGraph(tree.dump(max_depth, min_bytes), chars, offsets);
}
};
/// Aggregate function which builds a flamegraph using the list of stacktraces.
/// The output is an array of strings which can be used by flamegraph.pl util.
/// See https://github.com/brendangregg/FlameGraph
///
/// Syntax: flameGraph(traces, [size = 1], [ptr = 0])
/// - trace : Array(UInt64), a stacktrace
/// - size : Int64, an allocation size (for memory profiling)
/// - ptr : UInt64, an allocation address
/// In case if ptr != 0, a flameGraph will map allocations (size > 0) and deallocations (size < 0) with the same size and ptr.
/// Only allocations which were not freed are shown. Not mapped deallocations are ignored.
///
/// Usage:
///
/// * Build a flamegraph based on CPU query profiler
/// set query_profiler_cpu_time_period_ns=10000000;
/// SELECT SearchPhrase, COUNT(DISTINCT UserID) AS u FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10;
/// clickhouse client --allow_introspection_functions=1
/// -q "select arrayJoin(flameGraph(arrayReverse(trace))) from system.trace_log where trace_type = 'CPU' and query_id = 'xxx'"
/// | ~/dev/FlameGraph/flamegraph.pl > flame_cpu.svg
///
/// * Build a flamegraph based on memory query profiler, showing all allocations
/// set memory_profiler_sample_probability=1, max_untracked_memory=1;
/// SELECT SearchPhrase, COUNT(DISTINCT UserID) AS u FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10;
/// clickhouse client --allow_introspection_functions=1
/// -q "select arrayJoin(flameGraph(trace, size)) from system.trace_log where trace_type = 'MemorySample' and query_id = 'xxx'"
/// | ~/dev/FlameGraph/flamegraph.pl --countname=bytes --color=mem > flame_mem.svg
///
/// * Build a flamegraph based on memory query profiler, showing allocations which were not deallocated in query context
/// set memory_profiler_sample_probability=1, max_untracked_memory=1, use_uncompressed_cache=1, merge_tree_max_rows_to_use_cache=100000000000, merge_tree_max_bytes_to_use_cache=1000000000000;
/// SELECT SearchPhrase, COUNT(DISTINCT UserID) AS u FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10;
/// clickhouse client --allow_introspection_functions=1
/// -q "select arrayJoin(flameGraph(trace, size, ptr)) from system.trace_log where trace_type = 'MemorySample' and query_id = 'xxx'"
/// | ~/dev/FlameGraph/flamegraph.pl --countname=bytes --color=mem > flame_mem_untracked.svg
///
/// * Build a flamegraph based on memory query profiler, showing active allocations at the fixed point of time
/// set memory_profiler_sample_probability=1, max_untracked_memory=1;
/// SELECT SearchPhrase, COUNT(DISTINCT UserID) AS u FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10;
/// 1. Memory usage per second
/// select event_time, m, formatReadableSize(max(s) as m) from (select event_time, sum(size) over (order by event_time) as s from system.trace_log where query_id = 'xxx' and trace_type = 'MemorySample') group by event_time order by event_time;
/// 2. Find a time point with maximal memory usage
/// select argMax(event_time, s), max(s) from (select event_time, sum(size) over (order by event_time) as s from system.trace_log where query_id = 'xxx' and trace_type = 'MemorySample');
/// 3. Fix active allocations at fixed point of time
/// clickhouse client --allow_introspection_functions=1
/// -q "select arrayJoin(flameGraph(trace, size, ptr)) from (select * from system.trace_log where trace_type = 'MemorySample' and query_id = 'xxx' and event_time <= 'yyy' order by event_time)"
/// | ~/dev/FlameGraph/flamegraph.pl --countname=bytes --color=mem > flame_mem_time_point_pos.svg
/// 4. Find deallocations at fixed point of time
/// clickhouse client --allow_introspection_functions=1
/// -q "select arrayJoin(flameGraph(trace, -size, ptr)) from (select * from system.trace_log where trace_type = 'MemorySample' and query_id = 'xxx' and event_time > 'yyy' order by event_time desc)"
/// | ~/dev/FlameGraph/flamegraph.pl --countname=bytes --color=mem > flame_mem_time_point_neg.svg
class AggregateFunctionFlameGraph final : public IAggregateFunctionDataHelper<AggregateFunctionFlameGraphData, AggregateFunctionFlameGraph>
{
public:
explicit AggregateFunctionFlameGraph(const DataTypes & argument_types_)
: IAggregateFunctionDataHelper<AggregateFunctionFlameGraphData, AggregateFunctionFlameGraph>(argument_types_, {}, createResultType())
{}
String getName() const override { return "flameGraph"; }
static DataTypePtr createResultType()
{
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>());
}
bool allocatesMemoryInArena() const override { return true; }
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
const auto & trace = assert_cast<const ColumnArray &>(*columns[0]);
const auto & trace_offsets = trace.getOffsets();
const auto & trace_values = assert_cast<const ColumnUInt64 &>(trace.getData()).getData();
UInt64 prev_offset = 0;
if (row_num)
prev_offset = trace_offsets[row_num - 1];
UInt64 trace_size = trace_offsets[row_num] - prev_offset;
Int64 allocated = 1;
if (argument_types.size() >= 2)
{
const auto & sizes = assert_cast<const ColumnInt64 &>(*columns[1]).getData();
allocated = sizes[row_num];
}
UInt64 ptr = 0;
if (argument_types.size() >= 3)
{
const auto & ptrs = assert_cast<const ColumnUInt64 &>(*columns[2]).getData();
ptr = ptrs[row_num];
}
this->data(place).add(ptr, allocated, trace_values.data() + prev_offset, trace_size, arena);
}
void addManyDefaults(
AggregateDataPtr __restrict /*place*/,
const IColumn ** /*columns*/,
size_t /*length*/,
Arena * /*arena*/) const override
{
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).merge(this->data(rhs), arena);
}
void serialize(ConstAggregateDataPtr __restrict, WriteBuffer &, std::optional<size_t> /* version */) const override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Serialization for function flameGraph is not implemented.");
}
void deserialize(AggregateDataPtr __restrict, ReadBuffer &, std::optional<size_t> /* version */, Arena *) const override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Deserialization for function flameGraph is not implemented.");
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
auto & array = assert_cast<ColumnArray &>(to);
auto & str = assert_cast<ColumnString &>(array.getData());
this->data(place).dumpFlameGraph(str.getChars(), str.getOffsets(), 0, 0);
array.getOffsets().push_back(str.size());
}
};
static void check(const std::string & name, const DataTypes & argument_types, const Array & params)
{
assertNoParameters(name, params);
if (argument_types.empty() || argument_types.size() > 3)
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Aggregate function {} requires 1 to 3 arguments : trace, [size = 1], [ptr = 0]",
name);
auto ptr_type = std::make_shared<DataTypeUInt64>();
auto trace_type = std::make_shared<DataTypeArray>(ptr_type);
auto size_type = std::make_shared<DataTypeInt64>();
if (!argument_types[0]->equals(*trace_type))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"First argument (trace) for function {} must be Array(UInt64), but it has type {}",
name, argument_types[0]->getName());
if (argument_types.size() >= 2 && !argument_types[1]->equals(*size_type))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Second argument (size) for function {} must be Int64, but it has type {}",
name, argument_types[1]->getName());
if (argument_types.size() >= 3 && !argument_types[2]->equals(*ptr_type))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Third argument (ptr) for function {} must be UInt64, but it has type {}",
name, argument_types[2]->getName());
}
AggregateFunctionPtr createAggregateFunctionFlameGraph(const std::string & name, const DataTypes & argument_types, const Array & params, const Settings * settings)
{
if (!settings->allow_introspection_functions)
throw Exception(ErrorCodes::FUNCTION_NOT_ALLOWED,
"Introspection functions are disabled, because setting 'allow_introspection_functions' is set to 0");
check(name, argument_types, params);
return std::make_shared<AggregateFunctionFlameGraph>(argument_types);
}
void registerAggregateFunctionFlameGraph(AggregateFunctionFactory & factory)
{
AggregateFunctionProperties properties = { .returns_default_when_only_null = true, .is_order_dependent = true };
factory.registerFunction("flameGraph", { createAggregateFunctionFlameGraph, properties });
}
}

View File

@ -80,6 +80,7 @@ void registerAggregateFunctionExponentialMovingAverage(AggregateFunctionFactory
void registerAggregateFunctionSparkbar(AggregateFunctionFactory &);
void registerAggregateFunctionIntervalLengthSum(AggregateFunctionFactory &);
void registerAggregateFunctionAnalysisOfVariance(AggregateFunctionFactory &);
void registerAggregateFunctionFlameGraph(AggregateFunctionFactory &);
void registerAggregateFunctionKolmogorovSmirnovTest(AggregateFunctionFactory & factory);
class AggregateFunctionCombinatorFactory;
@ -173,6 +174,7 @@ void registerAggregateFunctions()
registerAggregateFunctionExponentialMovingAverage(factory);
registerAggregateFunctionSparkbar(factory);
registerAggregateFunctionAnalysisOfVariance(factory);
registerAggregateFunctionFlameGraph(factory);
registerAggregateFunctionKolmogorovSmirnovTest(factory);
registerWindowFunctions(factory);

View File

@ -70,10 +70,13 @@ Block createBlockFromCollection(const Collection & collection, const DataTypes &
{
if (columns_size == 1)
{
auto field = convertFieldToType(value, *block_types[0]);
auto field = convertFieldToTypeStrict(value, *block_types[0]);
if (!field)
continue;
bool need_insert_null = transform_null_in && block_types[0]->isNullable();
if (!field.isNull() || need_insert_null)
columns[0]->insert(std::move(field));
if (!field->isNull() || need_insert_null)
columns[0]->insert(*field);
continue;
}
@ -98,7 +101,11 @@ Block createBlockFromCollection(const Collection & collection, const DataTypes &
size_t i = 0;
for (; i < tuple_size; ++i)
{
tuple_values[i] = convertFieldToType(tuple[i], *block_types[i]);
auto converted_field = convertFieldToTypeStrict(tuple[i], *block_types[i]);
if (!converted_field)
break;
tuple_values[i] = std::move(*converted_field);
bool need_insert_null = transform_null_in && block_types[i]->isNullable();
if (tuple_values[i].isNull() && !need_insert_null)
break;

View File

@ -0,0 +1,34 @@
#pragma once
#include <cstddef>
#include <base/defines.h>
/// This is a structure which is returned by MemoryTracker.
/// Methods onAlloc/onFree should be called after actual memory allocation if it succeed.
/// For now, it will only collect allocation trace with sample_probability.
struct AllocationTrace
{
AllocationTrace() = default;
explicit AllocationTrace(double sample_probability_) : sample_probability(sample_probability_) {}
ALWAYS_INLINE void onAlloc(void * ptr, size_t size) const
{
if (likely(sample_probability <= 0))
return;
onAllocImpl(ptr, size);
}
ALWAYS_INLINE void onFree(void * ptr, size_t size) const
{
if (likely(sample_probability <= 0))
return;
onFreeImpl(ptr, size);
}
private:
double sample_probability = 0;
void onAllocImpl(void * ptr, size_t size) const;
void onFreeImpl(void * ptr, size_t size) const;
};

View File

@ -99,8 +99,10 @@ public:
void * alloc(size_t size, size_t alignment = 0)
{
checkSize(size);
CurrentMemoryTracker::alloc(size);
return allocNoTrack(size, alignment);
auto trace = CurrentMemoryTracker::alloc(size);
void * ptr = allocNoTrack(size, alignment);
trace.onAlloc(ptr, size);
return ptr;
}
/// Free memory range.
@ -110,7 +112,8 @@ public:
{
checkSize(size);
freeNoTrack(buf, size);
CurrentMemoryTracker::free(size);
auto trace = CurrentMemoryTracker::free(size);
trace.onFree(buf, size);
}
catch (...)
{
@ -136,13 +139,17 @@ public:
&& alignment <= MALLOC_MIN_ALIGNMENT)
{
/// Resize malloc'd memory region with no special alignment requirement.
CurrentMemoryTracker::realloc(old_size, new_size);
auto trace_free = CurrentMemoryTracker::free(old_size);
auto trace_alloc = CurrentMemoryTracker::alloc(new_size);
trace_free.onFree(buf, old_size);
void * new_buf = ::realloc(buf, new_size);
if (nullptr == new_buf)
DB::throwFromErrno(fmt::format("Allocator: Cannot realloc from {} to {}.", ReadableSize(old_size), ReadableSize(new_size)), DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
buf = new_buf;
trace_alloc.onAlloc(buf, new_size);
if constexpr (clear_memory)
if (new_size > old_size)
memset(reinterpret_cast<char *>(buf) + old_size, 0, new_size - old_size);
@ -150,7 +157,9 @@ public:
else if (old_size >= MMAP_THRESHOLD && new_size >= MMAP_THRESHOLD)
{
/// Resize mmap'd memory region.
CurrentMemoryTracker::realloc(old_size, new_size);
auto trace_free = CurrentMemoryTracker::free(old_size);
auto trace_alloc = CurrentMemoryTracker::alloc(new_size);
trace_free.onFree(buf, old_size);
// On apple and freebsd self-implemented mremap used (common/mremap.h)
buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE,
@ -160,13 +169,17 @@ public:
ReadableSize(old_size), ReadableSize(new_size)), DB::ErrorCodes::CANNOT_MREMAP);
/// No need for zero-fill, because mmap guarantees it.
trace_alloc.onAlloc(buf, new_size);
}
else if (new_size < MMAP_THRESHOLD)
{
/// Small allocs that requires a copy. Assume there's enough memory in system. Call CurrentMemoryTracker once.
CurrentMemoryTracker::realloc(old_size, new_size);
auto trace_free = CurrentMemoryTracker::free(old_size);
auto trace_alloc = CurrentMemoryTracker::alloc(new_size);
trace_free.onFree(buf, old_size);
void * new_buf = allocNoTrack(new_size, alignment);
trace_alloc.onAlloc(buf, new_size);
memcpy(new_buf, buf, std::min(old_size, new_size));
freeNoTrack(buf, old_size);
buf = new_buf;

View File

@ -30,21 +30,24 @@ struct AllocatorWithMemoryTracking
throw std::bad_alloc();
size_t bytes = n * sizeof(T);
CurrentMemoryTracker::alloc(bytes);
auto trace = CurrentMemoryTracker::alloc(bytes);
T * p = static_cast<T *>(malloc(bytes));
if (!p)
throw std::bad_alloc();
trace.onAlloc(p, bytes);
return p;
}
void deallocate(T * p, size_t n) noexcept
{
free(p);
size_t bytes = n * sizeof(T);
CurrentMemoryTracker::free(bytes);
free(p);
auto trace = CurrentMemoryTracker::free(bytes);
trace.onFree(p, bytes);
}
};

View File

@ -37,7 +37,7 @@ MemoryTracker * getMemoryTracker()
using DB::current_thread;
void CurrentMemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
AllocationTrace CurrentMemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
{
#ifdef MEMORY_TRACKER_DEBUG_CHECKS
if (unlikely(memory_tracker_always_throw_logical_error_on_allocation))
@ -55,8 +55,9 @@ void CurrentMemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
if (will_be > current_thread->untracked_memory_limit)
{
memory_tracker->allocImpl(will_be, throw_if_memory_exceeded);
auto res = memory_tracker->allocImpl(will_be, throw_if_memory_exceeded);
current_thread->untracked_memory = 0;
return res;
}
else
{
@ -68,36 +69,34 @@ void CurrentMemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
/// total_memory_tracker only, ignore untracked_memory
else
{
memory_tracker->allocImpl(size, throw_if_memory_exceeded);
return memory_tracker->allocImpl(size, throw_if_memory_exceeded);
}
return AllocationTrace(memory_tracker->getSampleProbability(size));
}
return AllocationTrace(0);
}
void CurrentMemoryTracker::check()
{
if (auto * memory_tracker = getMemoryTracker())
memory_tracker->allocImpl(0, true);
std::ignore = memory_tracker->allocImpl(0, true);
}
void CurrentMemoryTracker::alloc(Int64 size)
AllocationTrace CurrentMemoryTracker::alloc(Int64 size)
{
bool throw_if_memory_exceeded = true;
allocImpl(size, throw_if_memory_exceeded);
return allocImpl(size, throw_if_memory_exceeded);
}
void CurrentMemoryTracker::allocNoThrow(Int64 size)
AllocationTrace CurrentMemoryTracker::allocNoThrow(Int64 size)
{
bool throw_if_memory_exceeded = false;
allocImpl(size, throw_if_memory_exceeded);
return allocImpl(size, throw_if_memory_exceeded);
}
void CurrentMemoryTracker::realloc(Int64 old_size, Int64 new_size)
{
Int64 addition = new_size - old_size;
addition > 0 ? alloc(addition) : free(-addition);
}
void CurrentMemoryTracker::free(Int64 size)
AllocationTrace CurrentMemoryTracker::free(Int64 size)
{
if (auto * memory_tracker = getMemoryTracker())
{
@ -106,16 +105,21 @@ void CurrentMemoryTracker::free(Int64 size)
current_thread->untracked_memory -= size;
if (current_thread->untracked_memory < -current_thread->untracked_memory_limit)
{
memory_tracker->free(-current_thread->untracked_memory);
Int64 untracked_memory = current_thread->untracked_memory;
current_thread->untracked_memory = 0;
return memory_tracker->free(-untracked_memory);
}
}
/// total_memory_tracker only, ignore untracked_memory
else
{
memory_tracker->free(size);
return memory_tracker->free(size);
}
return AllocationTrace(memory_tracker->getSampleProbability(size));
}
return AllocationTrace(0);
}
void CurrentMemoryTracker::injectFault()

View File

@ -1,22 +1,22 @@
#pragma once
#include <base/types.h>
#include <Common/AllocationTrace.h>
/// Convenience methods, that use current thread's memory_tracker if it is available.
struct CurrentMemoryTracker
{
/// Call the following functions before calling of corresponding operations with memory allocators.
static void alloc(Int64 size);
static void allocNoThrow(Int64 size);
static void realloc(Int64 old_size, Int64 new_size);
[[nodiscard]] static AllocationTrace alloc(Int64 size);
[[nodiscard]] static AllocationTrace allocNoThrow(Int64 size);
/// This function should be called after memory deallocation.
static void free(Int64 size);
[[nodiscard]] static AllocationTrace free(Int64 size);
static void check();
/// Throws MEMORY_LIMIT_EXCEEDED (if it's allowed to throw exceptions)
static void injectFault();
private:
static void allocImpl(Int64 size, bool throw_if_memory_exceeded);
[[nodiscard]] static AllocationTrace allocImpl(Int64 size, bool throw_if_memory_exceeded);
};

View File

@ -57,7 +57,8 @@ public:
}
/// Do not count guard page in memory usage.
CurrentMemoryTracker::alloc(num_pages * page_size);
auto trace = CurrentMemoryTracker::alloc(num_pages * page_size);
trace.onAlloc(vp, num_pages * page_size);
boost::context::stack_context sctx;
sctx.size = num_bytes;
@ -77,6 +78,7 @@ public:
::munmap(vp, sctx.size);
/// Do not count guard page in memory usage.
CurrentMemoryTracker::free(sctx.size - page_size);
auto trace = CurrentMemoryTracker::free(sctx.size - page_size);
trace.onFree(vp, sctx.size - page_size);
}
};

View File

@ -1,6 +1,7 @@
#include "MemoryTracker.h"
#include <IO/WriteHelpers.h>
#include <Common/HashTable/Hash.h>
#include <Common/VariableContext.h>
#include <Common/TraceSender.h>
#include <Common/Exception.h>
@ -82,6 +83,29 @@ inline std::string_view toDescription(OvercommitResult result)
}
}
bool shouldTrackAllocation(DB::Float64 probability, void * ptr)
{
return intHash64(uintptr_t(ptr)) < std::numeric_limits<uint64_t>::max() * probability;
}
}
void AllocationTrace::onAllocImpl(void * ptr, size_t size) const
{
if (sample_probability < 1 && !shouldTrackAllocation(sample_probability, ptr))
return;
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
DB::TraceSender::send(DB::TraceType::MemorySample, StackTrace(), {.size = Int64(size), .ptr = ptr});
}
void AllocationTrace::onFreeImpl(void * ptr, size_t size) const
{
if (sample_probability < 1 && !shouldTrackAllocation(sample_probability, ptr))
return;
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
DB::TraceSender::send(DB::TraceType::MemorySample, StackTrace(), {.size = -Int64(size), .ptr = ptr});
}
namespace ProfileEvents
@ -180,11 +204,17 @@ void MemoryTracker::debugLogBigAllocationWithoutCheck(Int64 size [[maybe_unused]
#endif
}
void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryTracker * query_tracker)
AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryTracker * query_tracker, double _sample_probability)
{
if (size < 0)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Negative size ({}) is passed to MemoryTracker. It is a bug.", size);
if (_sample_probability < 0)
_sample_probability = sample_probability;
if (!isSizeOkForSampling(size))
_sample_probability = 0;
if (MemoryTrackerBlockerInThread::isBlocked(level))
{
if (level == VariableContext::Global)
@ -199,9 +229,12 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryT
/// Since the MemoryTrackerBlockerInThread should respect the level, we should go to the next parent.
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->allocImpl(size, throw_if_memory_exceeded,
level == VariableContext::Process ? this : query_tracker);
return;
{
MemoryTracker * tracker = level == VariableContext::Process ? this : query_tracker;
return loaded_next->allocImpl(size, throw_if_memory_exceeded, tracker, _sample_probability);
}
return AllocationTrace(_sample_probability);
}
/** Using memory_order_relaxed means that if allocations are done simultaneously,
@ -228,14 +261,6 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryT
allocation_traced = true;
}
std::bernoulli_distribution sample(sample_probability);
if (unlikely(sample_probability > 0.0 && isSizeOkForSampling(size) && sample(thread_local_rng)))
{
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
DB::TraceSender::send(DB::TraceType::MemorySample, StackTrace(), {.size = size});
allocation_traced = true;
}
std::bernoulli_distribution fault(fault_probability);
if (unlikely(fault_probability > 0.0 && fault(thread_local_rng)))
{
@ -364,16 +389,20 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryT
}
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->allocImpl(size, throw_if_memory_exceeded,
level == VariableContext::Process ? this : query_tracker);
{
MemoryTracker * tracker = level == VariableContext::Process ? this : query_tracker;
return loaded_next->allocImpl(size, throw_if_memory_exceeded, tracker, _sample_probability);
}
return AllocationTrace(_sample_probability);
}
void MemoryTracker::adjustWithUntrackedMemory(Int64 untracked_memory)
{
if (untracked_memory > 0)
allocImpl(untracked_memory, /*throw_if_memory_exceeded*/ false);
std::ignore = allocImpl(untracked_memory, /*throw_if_memory_exceeded*/ false);
else
free(-untracked_memory);
std::ignore = free(-untracked_memory);
}
bool MemoryTracker::updatePeak(Int64 will_be, bool log_memory_usage)
@ -392,9 +421,14 @@ bool MemoryTracker::updatePeak(Int64 will_be, bool log_memory_usage)
return false;
}
void MemoryTracker::free(Int64 size)
AllocationTrace MemoryTracker::free(Int64 size, double _sample_probability)
{
if (_sample_probability < 0)
_sample_probability = sample_probability;
if (!isSizeOkForSampling(size))
_sample_probability = 0;
if (MemoryTrackerBlockerInThread::isBlocked(level))
{
if (level == VariableContext::Global)
@ -408,15 +442,9 @@ void MemoryTracker::free(Int64 size)
/// Since the MemoryTrackerBlockerInThread should respect the level, we should go to the next parent.
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->free(size);
return;
}
return loaded_next->free(size, _sample_probability);
std::bernoulli_distribution sample(sample_probability);
if (unlikely(sample_probability > 0.0 && isSizeOkForSampling(size) && sample(thread_local_rng)))
{
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
DB::TraceSender::send(DB::TraceType::MemorySample, StackTrace(), {.size = -size});
return AllocationTrace(_sample_probability);
}
Int64 accounted_size = size;
@ -444,12 +472,15 @@ void MemoryTracker::free(Int64 size)
if (auto * overcommit_tracker_ptr = overcommit_tracker.load(std::memory_order_relaxed))
overcommit_tracker_ptr->tryContinueQueryExecutionAfterFree(accounted_size);
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->free(size);
/// free should never throw, we can update metric early.
auto metric_loaded = metric.load(std::memory_order_relaxed);
if (metric_loaded != CurrentMetrics::end())
CurrentMetrics::sub(metric_loaded, accounted_size);
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
return loaded_next->free(size, _sample_probability);
return AllocationTrace(_sample_probability);
}
@ -534,6 +565,21 @@ void MemoryTracker::setOrRaiseProfilerLimit(Int64 value)
;
}
double MemoryTracker::getSampleProbability(UInt64 size)
{
if (sample_probability >= 0)
{
if (!isSizeOkForSampling(size))
return 0;
return sample_probability;
}
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
return loaded_next->getSampleProbability(size);
return 0;
}
bool MemoryTracker::isSizeOkForSampling(UInt64 size) const
{
/// We can avoid comparison min_allocation_size_bytes with zero, because we cannot have 0 bytes allocation/deallocation

View File

@ -2,9 +2,11 @@
#include <atomic>
#include <chrono>
#include <optional>
#include <base/types.h>
#include <Common/CurrentMetrics.h>
#include <Common/VariableContext.h>
#include <Common/AllocationTrace.h>
#if !defined(NDEBUG)
#define MEMORY_TRACKER_DEBUG_CHECKS
@ -65,7 +67,7 @@ private:
double fault_probability = 0;
/// To randomly sample allocations and deallocations in trace_log.
double sample_probability = 0;
double sample_probability = -1;
/// Randomly sample allocations only larger or equal to this size
UInt64 min_allocation_size_bytes = 0;
@ -98,8 +100,8 @@ private:
/// allocImpl(...) and free(...) should not be used directly
friend struct CurrentMemoryTracker;
void allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryTracker * query_tracker = nullptr);
void free(Int64 size);
[[nodiscard]] AllocationTrace allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryTracker * query_tracker = nullptr, double _sample_probability = -1.0);
[[nodiscard]] AllocationTrace free(Int64 size, double _sample_probability = -1.0);
public:
static constexpr auto USAGE_EVENT_NAME = "MemoryTrackerUsage";
@ -174,6 +176,8 @@ public:
sample_probability = value;
}
double getSampleProbability(UInt64 size);
void setSampleMinAllocationSize(UInt64 value)
{
min_allocation_size_bytes = value;

View File

@ -28,4 +28,5 @@ public:
}
friend class MemoryTracker;
friend struct AllocationTrace;
};

View File

@ -33,6 +33,7 @@ void TraceSender::send(TraceType trace_type, const StackTrace & stack_trace, Ext
+ sizeof(TraceType) /// trace type
+ sizeof(UInt64) /// thread_id
+ sizeof(Int64) /// size
+ sizeof(void *) /// ptr
+ sizeof(ProfileEvents::Event) /// event
+ sizeof(ProfileEvents::Count); /// increment
@ -74,6 +75,7 @@ void TraceSender::send(TraceType trace_type, const StackTrace & stack_trace, Ext
writePODBinary(trace_type, out);
writePODBinary(thread_id, out);
writePODBinary(extras.size, out);
writePODBinary(UInt64(extras.ptr), out);
writePODBinary(extras.event, out);
writePODBinary(extras.increment, out);

View File

@ -28,8 +28,9 @@ class TraceSender
public:
struct Extras
{
/// size - for memory tracing is the amount of memory allocated; for other trace types it is 0.
/// size, ptr - for memory tracing is the amount of memory allocated; for other trace types it is 0.
Int64 size{};
void * ptr = nullptr;
/// Event type and increment for 'ProfileEvent' trace type; for other trace types defaults.
ProfileEvents::Event event{ProfileEvents::end()};
ProfileEvents::Count increment{};

View File

@ -9,7 +9,11 @@ extern "C" void * clickhouse_malloc(size_t size)
{
void * res = malloc(size);
if (res)
Memory::trackMemory(size);
{
AllocationTrace trace;
size_t actual_size = Memory::trackMemory(size, trace);
trace.onAlloc(res, actual_size);
}
return res;
}
@ -17,17 +21,29 @@ extern "C" void * clickhouse_calloc(size_t number_of_members, size_t size)
{
void * res = calloc(number_of_members, size);
if (res)
Memory::trackMemory(number_of_members * size);
{
AllocationTrace trace;
size_t actual_size = Memory::trackMemory(number_of_members * size, trace);
trace.onAlloc(res, actual_size);
}
return res;
}
extern "C" void * clickhouse_realloc(void * ptr, size_t size)
{
if (ptr)
Memory::untrackMemory(ptr);
{
AllocationTrace trace;
size_t actual_size = Memory::untrackMemory(ptr, trace);
trace.onFree(ptr, actual_size);
}
void * res = realloc(ptr, size);
if (res)
Memory::trackMemory(size);
{
AllocationTrace trace;
size_t actual_size = Memory::trackMemory(size, trace);
trace.onAlloc(res, actual_size);
}
return res;
}
@ -42,7 +58,9 @@ extern "C" void * clickhouse_reallocarray(void * ptr, size_t number_of_members,
extern "C" void clickhouse_free(void * ptr)
{
Memory::untrackMemory(ptr);
AllocationTrace trace;
size_t actual_size = Memory::untrackMemory(ptr, trace);
trace.onFree(ptr, actual_size);
free(ptr);
}
@ -50,6 +68,10 @@ extern "C" int clickhouse_posix_memalign(void ** memptr, size_t alignment, size_
{
int res = posix_memalign(memptr, alignment, size);
if (res == 0)
Memory::trackMemory(size);
{
AllocationTrace trace;
size_t actual_size = Memory::trackMemory(size, trace);
trace.onAlloc(*memptr, actual_size);
}
return res;
}

View File

@ -169,23 +169,26 @@ inline ALWAYS_INLINE size_t getActualAllocationSize(size_t size, TAlign... align
template <std::same_as<std::align_val_t>... TAlign>
requires DB::OptionalArgument<TAlign...>
inline ALWAYS_INLINE void trackMemory(std::size_t size, TAlign... align)
inline ALWAYS_INLINE size_t trackMemory(std::size_t size, AllocationTrace & trace, TAlign... align)
{
std::size_t actual_size = getActualAllocationSize(size, align...);
CurrentMemoryTracker::allocNoThrow(actual_size);
trace = CurrentMemoryTracker::allocNoThrow(actual_size);
return actual_size;
}
template <std::same_as<std::align_val_t>... TAlign>
requires DB::OptionalArgument<TAlign...>
inline ALWAYS_INLINE void untrackMemory(void * ptr [[maybe_unused]], std::size_t size [[maybe_unused]] = 0, TAlign... align [[maybe_unused]]) noexcept
inline ALWAYS_INLINE size_t untrackMemory(void * ptr [[maybe_unused]], AllocationTrace & trace, std::size_t size [[maybe_unused]] = 0, TAlign... align [[maybe_unused]]) noexcept
{
std::size_t actual_size = 0;
#if USE_GWP_ASAN
if (unlikely(GuardedAlloc.pointerIsMine(ptr)))
{
if (!size)
size = GuardedAlloc.getSize(ptr);
CurrentMemoryTracker::free(size);
return;
trace = CurrentMemoryTracker::free(size);
return size;
}
#endif
@ -197,23 +200,26 @@ inline ALWAYS_INLINE void untrackMemory(void * ptr [[maybe_unused]], std::size_t
if (likely(ptr != nullptr))
{
if constexpr (sizeof...(TAlign) == 1)
CurrentMemoryTracker::free(sallocx(ptr, MALLOCX_ALIGN(alignToSizeT(align...))));
actual_size = sallocx(ptr, MALLOCX_ALIGN(alignToSizeT(align...)));
else
CurrentMemoryTracker::free(sallocx(ptr, 0));
actual_size = sallocx(ptr, 0);
}
#else
if (size)
CurrentMemoryTracker::free(size);
actual_size = size;
# if defined(_GNU_SOURCE)
/// It's innaccurate resource free for sanitizers. malloc_usable_size() result is greater or equal to allocated size.
else
CurrentMemoryTracker::free(malloc_usable_size(ptr));
actual_size = malloc_usable_size(ptr);
# endif
#endif
trace = CurrentMemoryTracker::free(actual_size);
}
catch (...)
{
}
return actual_size;
}
}

View File

@ -71,50 +71,74 @@ static struct InitGwpAsan
void * operator new(std::size_t size)
{
Memory::trackMemory(size);
return Memory::newImpl(size);
AllocationTrace trace;
std::size_t actual_size = Memory::trackMemory(size, trace);
void * ptr = Memory::newImpl(size);
trace.onAlloc(ptr, actual_size);
return ptr;
}
void * operator new(std::size_t size, std::align_val_t align)
{
Memory::trackMemory(size, align);
return Memory::newImpl(size, align);
AllocationTrace trace;
std::size_t actual_size = Memory::trackMemory(size, trace, align);
void * ptr = Memory::newImpl(size, align);
trace.onAlloc(ptr, actual_size);
return ptr;
}
void * operator new[](std::size_t size)
{
Memory::trackMemory(size);
return Memory::newImpl(size);
AllocationTrace trace;
std::size_t actual_size = Memory::trackMemory(size, trace);
void * ptr = Memory::newImpl(size);
trace.onAlloc(ptr, actual_size);
return ptr;
}
void * operator new[](std::size_t size, std::align_val_t align)
{
Memory::trackMemory(size, align);
return Memory::newImpl(size, align);
AllocationTrace trace;
std::size_t actual_size = Memory::trackMemory(size, trace, align);
void * ptr = Memory::newImpl(size, align);
trace.onAlloc(ptr, actual_size);
return ptr;
}
void * operator new(std::size_t size, const std::nothrow_t &) noexcept
{
Memory::trackMemory(size);
return Memory::newNoExept(size);
AllocationTrace trace;
std::size_t actual_size = Memory::trackMemory(size, trace);
void * ptr = Memory::newNoExept(size);
trace.onAlloc(ptr, actual_size);
return ptr;
}
void * operator new[](std::size_t size, const std::nothrow_t &) noexcept
{
Memory::trackMemory(size);
return Memory::newNoExept(size);
AllocationTrace trace;
std::size_t actual_size = Memory::trackMemory(size, trace);
void * ptr = Memory::newNoExept(size);
trace.onAlloc(ptr, actual_size);
return ptr;
}
void * operator new(std::size_t size, std::align_val_t align, const std::nothrow_t &) noexcept
{
Memory::trackMemory(size, align);
return Memory::newNoExept(size, align);
AllocationTrace trace;
std::size_t actual_size = Memory::trackMemory(size, trace, align);
void * ptr = Memory::newNoExept(size, align);
trace.onAlloc(ptr, actual_size);
return ptr;
}
void * operator new[](std::size_t size, std::align_val_t align, const std::nothrow_t &) noexcept
{
Memory::trackMemory(size, align);
return Memory::newNoExept(size, align);
AllocationTrace trace;
std::size_t actual_size = Memory::trackMemory(size, trace, align);
void * ptr = Memory::newNoExept(size, align);
trace.onAlloc(ptr, actual_size);
return ptr;
}
/// delete
@ -130,48 +154,64 @@ void * operator new[](std::size_t size, std::align_val_t align, const std::nothr
void operator delete(void * ptr) noexcept
{
Memory::untrackMemory(ptr);
AllocationTrace trace;
std::size_t actual_size = Memory::untrackMemory(ptr, trace);
trace.onFree(ptr, actual_size);
Memory::deleteImpl(ptr);
}
void operator delete(void * ptr, std::align_val_t align) noexcept
{
Memory::untrackMemory(ptr, 0, align);
AllocationTrace trace;
std::size_t actual_size = Memory::untrackMemory(ptr, trace, 0, align);
trace.onFree(ptr, actual_size);
Memory::deleteImpl(ptr);
}
void operator delete[](void * ptr) noexcept
{
Memory::untrackMemory(ptr);
AllocationTrace trace;
std::size_t actual_size = Memory::untrackMemory(ptr, trace);
trace.onFree(ptr, actual_size);
Memory::deleteImpl(ptr);
}
void operator delete[](void * ptr, std::align_val_t align) noexcept
{
Memory::untrackMemory(ptr, 0, align);
AllocationTrace trace;
std::size_t actual_size = Memory::untrackMemory(ptr, trace, 0, align);
trace.onFree(ptr, actual_size);
Memory::deleteImpl(ptr);
}
void operator delete(void * ptr, std::size_t size) noexcept
{
Memory::untrackMemory(ptr, size);
AllocationTrace trace;
std::size_t actual_size = Memory::untrackMemory(ptr, trace, size);
trace.onFree(ptr, actual_size);
Memory::deleteSized(ptr, size);
}
void operator delete(void * ptr, std::size_t size, std::align_val_t align) noexcept
{
Memory::untrackMemory(ptr, size, align);
AllocationTrace trace;
std::size_t actual_size = Memory::untrackMemory(ptr, trace, size, align);
trace.onFree(ptr, actual_size);
Memory::deleteSized(ptr, size, align);
}
void operator delete[](void * ptr, std::size_t size) noexcept
{
Memory::untrackMemory(ptr, size);
AllocationTrace trace;
std::size_t actual_size = Memory::untrackMemory(ptr, trace, size);
trace.onFree(ptr, actual_size);
Memory::deleteSized(ptr, size);
}
void operator delete[](void * ptr, std::size_t size, std::align_val_t align) noexcept
{
Memory::untrackMemory(ptr, size, align);
AllocationTrace trace;
std::size_t actual_size = Memory::untrackMemory(ptr, trace, size, align);
trace.onFree(ptr, actual_size);
Memory::deleteSized(ptr, size, align);
}

View File

@ -43,7 +43,7 @@ void setThreadName(const char * name)
#else
if (0 != prctl(PR_SET_NAME, name, 0, 0, 0))
#endif
if (errno != ENOSYS) /// It's ok if the syscall is unsupported in some environments.
if (errno != ENOSYS && errno != EPERM) /// It's ok if the syscall is unsupported or not allowed in some environments.
DB::throwFromErrno("Cannot set thread name with prctl(PR_SET_NAME, ...)", DB::ErrorCodes::PTHREAD_ERROR);
memcpy(thread_name, name, std::min<size_t>(1 + strlen(name), THREAD_NAME_SIZE - 1));
@ -63,7 +63,7 @@ const char * getThreadName()
// throw DB::Exception(DB::ErrorCodes::PTHREAD_ERROR, "Cannot get thread name with pthread_get_name_np()");
#else
if (0 != prctl(PR_GET_NAME, thread_name, 0, 0, 0))
if (errno != ENOSYS) /// It's ok if the syscall is unsupported in some environments.
if (errno != ENOSYS && errno != EPERM) /// It's ok if the syscall is unsupported or not allowed in some environments.
DB::throwFromErrno("Cannot get thread name with prctl(PR_GET_NAME)", DB::ErrorCodes::PTHREAD_ERROR);
#endif

View File

@ -11,6 +11,7 @@
#include <Common/assert_cast.h>
#include <Core/NamesAndTypes.h>
#include <Columns/ColumnConst.h>
namespace DB
@ -20,6 +21,7 @@ namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
using FieldType = Array;
DataTypeArray::DataTypeArray(const DataTypePtr & nested_)
@ -33,7 +35,6 @@ MutableColumnPtr DataTypeArray::createColumn() const
return ColumnArray::create(nested->createColumn(), ColumnArray::ColumnOffsets::create());
}
Field DataTypeArray::getDefault() const
{
return Array();

View File

@ -2,6 +2,7 @@
#include <DataTypes/IDataType.h>
#include <DataTypes/Serializations/SerializationArray.h>
#include <Columns/ColumnArray.h>
namespace DB
@ -15,6 +16,8 @@ private:
DataTypePtr nested;
public:
using FieldType = Array;
using ColumnType = ColumnArray;
static constexpr bool is_parametric = true;
explicit DataTypeArray(const DataTypePtr & nested_);
@ -42,6 +45,7 @@ public:
MutableColumnPtr createColumn() const override;
Field getDefault() const override;
bool equals(const IDataType & rhs) const override;

View File

@ -346,7 +346,7 @@ void RegExpTreeDictionary::loadData()
ids[i] = static_cast<unsigned>(i+1);
hs_error_t err = hs_compile_lit_multi(patterns.data(), flags.data(), ids.get(), lengths.data(), static_cast<unsigned>(patterns.size()), HS_MODE_BLOCK, nullptr, &db, &compile_error);
origin_db = (db);
origin_db.reset(db);
if (err != HS_SUCCESS)
{
/// CompilerError is a unique_ptr, so correct memory free after the exception is thrown.
@ -658,7 +658,7 @@ std::unordered_map<String, ColumnPtr> RegExpTreeDictionary::match(
};
hs_error_t err = hs_scan(
origin_db,
origin_db.get(),
reinterpret_cast<const char *>(keys_data.data()) + offset,
static_cast<unsigned>(length),
0,

View File

@ -199,7 +199,7 @@ private:
#if USE_VECTORSCAN
MultiRegexps::DeferredConstructedRegexpsPtr hyperscan_regex;
MultiRegexps::ScratchPtr origin_scratch;
hs_database_t* origin_db;
MultiRegexps::DataBasePtr origin_db;
#endif
Poco::Logger * logger;

View File

@ -42,6 +42,15 @@
#include <Common/assert_cast.h>
#include <Common/typeid_cast.h>
#include <Common/Arena.h>
#include <Core/ColumnWithTypeAndName.h>
#include <base/types.h>
#include <Columns/ColumnArray.h>
#include <Columns/IColumn.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/getMostSubtype.h>
#include <base/TypeLists.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Interpreters/Context.h>
@ -62,6 +71,7 @@ namespace ErrorCodes
extern const int DECIMAL_OVERFLOW;
extern const int CANNOT_ADD_DIFFERENT_AGGREGATE_STATES;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int SIZES_OF_ARRAYS_DONT_MATCH;
}
namespace traits_
@ -102,6 +112,9 @@ template <typename DataType> constexpr bool IsFloatingPoint = false;
template <> inline constexpr bool IsFloatingPoint<DataTypeFloat32> = true;
template <> inline constexpr bool IsFloatingPoint<DataTypeFloat64> = true;
template <typename DataType> constexpr bool IsArray = false;
template <> inline constexpr bool IsArray<DataTypeArray> = true;
template <typename DataType> constexpr bool IsDateOrDateTime = false;
template <> inline constexpr bool IsDateOrDateTime<DataTypeDate> = true;
template <> inline constexpr bool IsDateOrDateTime<DataTypeDateTime> = true;
@ -1125,6 +1138,73 @@ class FunctionBinaryArithmetic : public IFunction
return function->execute(arguments, result_type, input_rows_count);
}
ColumnPtr executeArrayImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const
{
const auto * return_type_array = checkAndGetDataType<DataTypeArray>(result_type.get());
if (!return_type_array)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Return type for function {} must be array.", getName());
auto num_args = arguments.size();
DataTypes data_types;
ColumnsWithTypeAndName new_arguments {num_args};
DataTypePtr result_array_type;
const auto * left_const = typeid_cast<const ColumnConst *>(arguments[0].column.get());
const auto * right_const = typeid_cast<const ColumnConst *>(arguments[1].column.get());
/// Unpacking arrays if both are constants.
if (left_const && right_const)
{
new_arguments[0] = {left_const->getDataColumnPtr(), arguments[0].type, arguments[0].name};
new_arguments[1] = {right_const->getDataColumnPtr(), arguments[1].type, arguments[1].name};
auto col = executeImpl(new_arguments, result_type, 1);
return ColumnConst::create(std::move(col), input_rows_count);
}
/// Unpacking arrays if at least one column is constant.
if (left_const || right_const)
{
new_arguments[0] = {arguments[0].column->convertToFullColumnIfConst(), arguments[0].type, arguments[0].name};
new_arguments[1] = {arguments[1].column->convertToFullColumnIfConst(), arguments[1].type, arguments[1].name};
return executeImpl(new_arguments, result_type, input_rows_count);
}
const auto * left_array_col = typeid_cast<const ColumnArray *>(arguments[0].column.get());
const auto * right_array_col = typeid_cast<const ColumnArray *>(arguments[1].column.get());
const auto & left_offsets = left_array_col->getOffsets();
const auto & right_offsets = right_array_col->getOffsets();
chassert(left_offsets.size() == right_offsets.size() && "Unexpected difference in number of offsets");
/// Unpacking non-const arrays and checking sizes of them.
for (auto offset_index = 0U; offset_index < left_offsets.size(); ++offset_index)
{
if (left_offsets[offset_index] != right_offsets[offset_index])
{
throw Exception(ErrorCodes::SIZES_OF_ARRAYS_DONT_MATCH,
"Cannot apply operation for arrays of different sizes. Size of the first argument: {}, size of the second argument: {}",
*left_array_col->getOffsets().data(),
*right_array_col ->getOffsets().data());
}
}
const auto & left_array_type = typeid_cast<const DataTypeArray *>(arguments[0].type.get())->getNestedType();
new_arguments[0] = {left_array_col->getDataPtr(), left_array_type, arguments[0].name};
const auto & right_array_type = typeid_cast<const DataTypeArray *>(arguments[1].type.get())->getNestedType();
new_arguments[1] = {right_array_col->getDataPtr(), right_array_type, arguments[1].name};
result_array_type = typeid_cast<const DataTypeArray *>(result_type.get())->getNestedType();
size_t rows_count = 0;
if (!left_offsets.empty())
rows_count = left_offsets.back();
auto res = executeImpl(new_arguments, result_array_type, rows_count);
return ColumnArray::create(res, typeid_cast<const ColumnArray *>(arguments[0].column.get())->getOffsetsPtr());
}
ColumnPtr executeTupleNumberOperator(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type,
size_t input_rows_count, const FunctionOverloadResolverPtr & function_builder) const
{
@ -1326,6 +1406,20 @@ public:
return getReturnTypeImplStatic(new_arguments, context);
}
if constexpr (is_plus || is_minus)
{
if (isArray(arguments[0]) && isArray(arguments[1]))
{
DataTypes new_arguments {
static_cast<const DataTypeArray &>(*arguments[0]).getNestedType(),
static_cast<const DataTypeArray &>(*arguments[1]).getNestedType(),
};
return std::make_shared<DataTypeArray>(getReturnTypeImplStatic(new_arguments, context));
}
}
/// Special case when the function is plus or minus, one of arguments is Date/DateTime and another is Interval.
if (auto function_builder = getFunctionForIntervalArithmetic(arguments[0], arguments[1], context))
{
@ -2031,6 +2125,9 @@ ColumnPtr executeStringInteger(const ColumnsWithTypeAndName & arguments, const A
return (res = executeNumeric(arguments, left, right, right_nullmap)) != nullptr;
});
if (isArray(result_type))
return executeArrayImpl(arguments, result_type, input_rows_count);
if (!valid)
{
// This is a logical error, because the types should have been checked

View File

@ -8,7 +8,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNEXPECTED_END_OF_FILE;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int SEEK_POSITION_OUT_OF_BOUND;
@ -260,7 +260,7 @@ void ParallelReadBuffer::readerThreadFunction(ReadWorkerPtr read_worker)
if (!on_progress(r) && r < read_worker->segment.size())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
ErrorCodes::UNEXPECTED_END_OF_FILE,
"Failed to read all the data from the reader at offset {}, got {}/{} bytes",
read_worker->start_offset, r, read_worker->segment.size());
}

View File

@ -94,38 +94,6 @@ static size_t getTypeDepth(const DataTypePtr & type)
return 0;
}
template <typename T>
static bool decimalEqualsFloat(Field field, Float64 float_value)
{
auto decimal_field = field.get<DecimalField<T>>();
auto decimal_to_float = DecimalUtils::convertTo<Float64>(decimal_field.getValue(), decimal_field.getScale());
return decimal_to_float == float_value;
}
/// Applies stricter rules than convertFieldToType:
/// Doesn't allow :
/// - loss of precision converting to Decimal
static bool convertFieldToTypeStrict(const Field & from_value, const IDataType & to_type, Field & result_value)
{
result_value = convertFieldToType(from_value, to_type);
if (Field::isDecimal(from_value.getType()) && Field::isDecimal(result_value.getType()))
return applyVisitor(FieldVisitorAccurateEquals{}, from_value, result_value);
if (from_value.getType() == Field::Types::Float64 && Field::isDecimal(result_value.getType()))
{
/// Convert back to Float64 and compare
if (result_value.getType() == Field::Types::Decimal32)
return decimalEqualsFloat<Decimal32>(result_value, from_value.get<Float64>());
if (result_value.getType() == Field::Types::Decimal64)
return decimalEqualsFloat<Decimal64>(result_value, from_value.get<Float64>());
if (result_value.getType() == Field::Types::Decimal128)
return decimalEqualsFloat<Decimal128>(result_value, from_value.get<Float64>());
if (result_value.getType() == Field::Types::Decimal256)
return decimalEqualsFloat<Decimal256>(result_value, from_value.get<Float64>());
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown decimal type {}", result_value.getTypeName());
}
return true;
}
/// The `convertFieldToTypeStrict` is used to prevent unexpected results in case of conversion with loss of precision.
/// Example: `SELECT 33.3 :: Decimal(9, 1) AS a WHERE a IN (33.33 :: Decimal(9, 2))`
/// 33.33 in the set is converted to 33.3, but it is not equal to 33.3 in the column, so the result should still be empty.
@ -146,11 +114,10 @@ static Block createBlockFromCollection(const Collection & collection, const Data
{
if (columns_num == 1)
{
Field field;
bool is_conversion_ok = convertFieldToTypeStrict(value, *types[0], field);
auto field = convertFieldToTypeStrict(value, *types[0]);
bool need_insert_null = transform_null_in && types[0]->isNullable();
if (is_conversion_ok && (!field.isNull() || need_insert_null))
columns[0]->insert(field);
if (field && (!field->isNull() || need_insert_null))
columns[0]->insert(*field);
}
else
{
@ -171,9 +138,10 @@ static Block createBlockFromCollection(const Collection & collection, const Data
size_t i = 0;
for (; i < tuple_size; ++i)
{
bool is_conversion_ok = convertFieldToTypeStrict(tuple[i], *types[i], tuple_values[i]);
if (!is_conversion_ok)
auto converted_field = convertFieldToTypeStrict(tuple[i], *types[i]);
if (!converted_field)
break;
tuple_values[i] = std::move(*converted_field);
bool need_insert_null = transform_null_in && types[i]->isNullable();
if (tuple_values[i].isNull() && !need_insert_null)

View File

@ -1130,9 +1130,17 @@ JoinPtr SelectQueryExpressionAnalyzer::makeJoin(
if (auto storage = analyzed_join->getStorageJoin())
{
auto joined_block_actions = analyzed_join->createJoinedBlockActions(getContext());
NamesWithAliases required_columns_with_aliases = analyzed_join->getRequiredColumns(
Block(joined_block_actions->getResultColumns()), joined_block_actions->getRequiredColumns().getNames());
Names original_right_column_names;
for (auto & pr : required_columns_with_aliases)
original_right_column_names.push_back(pr.first);
auto right_columns = storage->getRightSampleBlock().getColumnsWithTypeAndName();
std::tie(left_convert_actions, right_convert_actions) = analyzed_join->createConvertingActions(left_columns, right_columns);
return storage->getJoinLocked(analyzed_join, getContext());
return storage->getJoinLocked(analyzed_join, getContext(), original_right_column_names);
}
joined_plan = buildJoinedPlan(getContext(), join_element, *analyzed_join, query_options);

View File

@ -189,7 +189,6 @@ private:
template <typename LeftNamesAndTypes, typename RightNamesAndTypes>
void inferJoinKeyCommonType(const LeftNamesAndTypes & left, const RightNamesAndTypes & right, bool allow_right, bool strict);
NamesAndTypesList correctedColumnsAddedByJoin() const;
void deduplicateAndQualifyColumnNames(const NameSet & left_table_columns, const String & right_table_prefix);
@ -371,6 +370,8 @@ public:
bool isSpecialStorage() const { return !right_storage_name.empty() || right_storage_join || right_kv_storage; }
std::shared_ptr<const IKeyValueEntity> getStorageKeyValue() { return right_kv_storage; }
NamesAndTypesList correctedColumnsAddedByJoin() const;
};
}

View File

@ -112,6 +112,9 @@ void TraceCollector::run()
Int64 size;
readPODBinary(size, in);
UInt64 ptr;
readPODBinary(ptr, in);
ProfileEvents::Event event;
readPODBinary(event, in);
@ -127,7 +130,8 @@ void TraceCollector::run()
UInt64 time = static_cast<UInt64>(ts.tv_sec * 1000000000LL + ts.tv_nsec);
UInt64 time_in_microseconds = static_cast<UInt64>((ts.tv_sec * 1000000LL) + (ts.tv_nsec / 1000));
TraceLogElement element{time_t(time / 1000000000), time_in_microseconds, time, trace_type, thread_id, query_id, trace, size, event, increment};
TraceLogElement element{time_t(time / 1000000000), time_in_microseconds, time, trace_type, thread_id, query_id, trace, size, ptr, event, increment};
trace_log->add(std::move(element));
}
}

View File

@ -38,6 +38,7 @@ NamesAndTypesList TraceLogElement::getNamesAndTypes()
{"query_id", std::make_shared<DataTypeString>()},
{"trace", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>())},
{"size", std::make_shared<DataTypeInt64>()},
{"ptr", std::make_shared<DataTypeUInt64>()},
{"event", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"increment", std::make_shared<DataTypeInt64>()},
};
@ -57,6 +58,7 @@ void TraceLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insertData(query_id.data(), query_id.size());
columns[i++]->insert(trace);
columns[i++]->insert(size);
columns[i++]->insert(ptr);
String event_name;
if (event != ProfileEvents::end())

View File

@ -27,8 +27,10 @@ struct TraceLogElement
UInt64 thread_id{};
String query_id{};
Array trace{};
/// Allocation size in bytes for TraceType::Memory.
/// Allocation size in bytes for TraceType::Memory and TraceType::MemorySample.
Int64 size{};
/// Allocation ptr for TraceType::MemorySample.
UInt64 ptr{};
/// ProfileEvent for TraceType::ProfileEvent.
ProfileEvents::Event event{ProfileEvents::end()};
/// Increment of profile event for TraceType::ProfileEvent.

View File

@ -21,6 +21,7 @@
#include <Common/typeid_cast.h>
#include <Common/NaNUtils.h>
#include <Common/FieldVisitorsAccurateComparison.h>
#include <Common/FieldVisitorToString.h>
#include <Common/FieldVisitorConvertToNumber.h>
#include <Common/DateLUT.h>
@ -32,6 +33,7 @@ namespace DB
namespace ErrorCodes
{
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int LOGICAL_ERROR;
extern const int TYPE_MISMATCH;
extern const int UNEXPECTED_DATA_AFTER_PARSED_VALUE;
}
@ -565,4 +567,39 @@ Field convertFieldToTypeOrThrow(const Field & from_value, const IDataType & to_t
return converted;
}
template <typename T>
static bool decimalEqualsFloat(Field field, Float64 float_value)
{
auto decimal_field = field.get<DecimalField<T>>();
auto decimal_to_float = DecimalUtils::convertTo<Float64>(decimal_field.getValue(), decimal_field.getScale());
return decimal_to_float == float_value;
}
std::optional<Field> convertFieldToTypeStrict(const Field & from_value, const IDataType & to_type)
{
Field result_value = convertFieldToType(from_value, to_type);
if (Field::isDecimal(from_value.getType()) && Field::isDecimal(result_value.getType()))
{
bool is_equal = applyVisitor(FieldVisitorAccurateEquals{}, from_value, result_value);
return is_equal ? result_value : std::optional<Field>{};
}
if (from_value.getType() == Field::Types::Float64 && Field::isDecimal(result_value.getType()))
{
/// Convert back to Float64 and compare
if (result_value.getType() == Field::Types::Decimal32)
return decimalEqualsFloat<Decimal32>(result_value, from_value.get<Float64>()) ? result_value : std::optional<Field>{};
if (result_value.getType() == Field::Types::Decimal64)
return decimalEqualsFloat<Decimal64>(result_value, from_value.get<Float64>()) ? result_value : std::optional<Field>{};
if (result_value.getType() == Field::Types::Decimal128)
return decimalEqualsFloat<Decimal128>(result_value, from_value.get<Float64>()) ? result_value : std::optional<Field>{};
if (result_value.getType() == Field::Types::Decimal256)
return decimalEqualsFloat<Decimal256>(result_value, from_value.get<Float64>()) ? result_value : std::optional<Field>{};
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown decimal type {}", result_value.getTypeName());
}
return result_value;
}
}

View File

@ -20,4 +20,8 @@ Field convertFieldToType(const Field & from_value, const IDataType & to_type, co
/// Does the same, but throws ARGUMENT_OUT_OF_BOUND if value does not fall into the range.
Field convertFieldToTypeOrThrow(const Field & from_value, const IDataType & to_type, const IDataType * from_type_hint = nullptr);
/// Applies stricter rules than convertFieldToType, doesn't allow loss of precision converting to Decimal.
/// Returns `Field` if the conversion was successful and the result is equal to the original value, otherwise returns nullopt.
std::optional<Field> convertFieldToTypeStrict(const Field & from_value, const IDataType & to_type);
}

View File

@ -100,8 +100,9 @@ public:
if (isNameOfInFunction(function_node.getFunctionName()))
{
const auto & in_first_argument_node = function_node.getArguments().getNodes().at(0);
const auto & in_second_argument_node = function_node.getArguments().getNodes().at(1);
in_function_second_argument_node_name = planner_context.createSetKey(in_second_argument_node);
in_function_second_argument_node_name = planner_context.createSetKey(in_first_argument_node->getResultType(), in_second_argument_node);
}
WriteBufferFromOwnString buffer;
@ -628,8 +629,6 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::ma
auto in_first_argument = function_node.getArguments().getNodes().at(0);
auto in_second_argument = function_node.getArguments().getNodes().at(1);
//auto set_key = planner_context->createSetKey(in_second_argument);
DataTypes set_element_types;
auto in_second_argument_node_type = in_second_argument->getNodeType();
@ -665,7 +664,7 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::ma
PreparedSets::toString(set_key, set_element_types));
ColumnWithTypeAndName column;
column.name = planner_context->createSetKey(in_second_argument);
column.name = planner_context->createSetKey(in_first_argument->getResultType(), in_second_argument);
column.type = std::make_shared<DataTypeSet>();
bool set_is_created = set->get() != nullptr;

View File

@ -2,6 +2,7 @@
#include <Analyzer/TableNode.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/ConstantNode.h>
namespace DB
{
@ -112,9 +113,24 @@ const ColumnIdentifier * PlannerContext::getColumnNodeIdentifierOrNull(const Que
return table_expression_data->getColumnIdentifierOrNull(column_name);
}
PlannerContext::SetKey PlannerContext::createSetKey(const QueryTreeNodePtr & set_source_node)
PlannerContext::SetKey PlannerContext::createSetKey(const DataTypePtr & left_operand_type, const QueryTreeNodePtr & set_source_node)
{
auto set_source_hash = set_source_node->getTreeHash();
if (set_source_node->as<ConstantNode>())
{
/* We need to hash the type of the left operand because we can build different sets for different types.
* (It's done for performance reasons. It's cheaper to convert a small set of values from literal to the type of the left operand.)
*
* For example in expression `(a :: Decimal(9, 1) IN (1.0, 2.5)) AND (b :: Decimal(9, 0) IN (1, 2.5))`
* we need to build two different sets:
* - `{1, 2.5} :: Set(Decimal(9, 1))` for a
* - `{1} :: Set(Decimal(9, 0))` for b (2.5 omitted because bercause it's not representable as Decimal(9, 0)).
*/
return "__set_" + left_operand_type->getName() + '_' + toString(set_source_hash.first) + '_' + toString(set_source_hash.second);
}
/// For other cases we will cast left operand to the type of the set source, so no difference in types.
return "__set_" + toString(set_source_hash.first) + '_' + toString(set_source_hash.second);
}

View File

@ -132,7 +132,7 @@ public:
using SetKey = std::string;
/// Create set key for set source node
static SetKey createSetKey(const QueryTreeNodePtr & set_source_node);
static SetKey createSetKey(const DataTypePtr & left_operand_type, const QueryTreeNodePtr & set_source_node);
PreparedSets & getPreparedSets() { return prepared_sets; }

View File

@ -635,6 +635,7 @@ std::shared_ptr<IJoin> chooseJoinAlgorithm(std::shared_ptr<TableJoin> & table_jo
/// JOIN with JOIN engine.
if (auto storage = table_join->getStorageJoin())
{
Names required_column_names;
for (const auto & result_column : right_table_expression_header)
{
const auto * source_column_name = right_table_expression_data.getColumnNameOrNull(result_column.name);
@ -644,8 +645,9 @@ std::shared_ptr<IJoin> chooseJoinAlgorithm(std::shared_ptr<TableJoin> & table_jo
fmt::join(storage->getKeyNames(), ", "), result_column.name);
table_join->setRename(*source_column_name, result_column.name);
required_column_names.push_back(*source_column_name);
}
return storage->getJoinLocked(table_join, planner_context->getQueryContext());
return storage->getJoinLocked(table_join, planner_context->getQueryContext(), required_column_names);
}
/** JOIN with constant.

View File

@ -141,7 +141,7 @@ void CreateSetAndFilterOnTheFlyStep::transformPipeline(QueryPipelineBuilder & pi
/// Add balancing transform
auto idx = position == JoinTableSide::Left ? PingPongProcessor::First : PingPongProcessor::Second;
auto stream_balancer = std::make_shared<ReadHeadBalancedProcessor>(input_header, num_ports, max_rows_in_set, idx);
stream_balancer->setDescription(getStepDescription());
stream_balancer->setDescription("Reads rows from two streams evenly");
/// Regular inputs just bypass data for respective ports
connectAllInputs(ports, stream_balancer->getInputs(), num_ports);
@ -163,7 +163,7 @@ void CreateSetAndFilterOnTheFlyStep::transformPipeline(QueryPipelineBuilder & pi
{
auto & port = *output_it++;
auto transform = std::make_shared<FilterBySetOnTheFlyTransform>(port.getHeader(), column_names, filtering_set);
transform->setDescription(this->getStepDescription());
transform->setDescription("Filter rows using other join table side's set");
connect(port, transform->getInputPort());
result_transforms.emplace_back(std::move(transform));
}

View File

@ -201,20 +201,20 @@ static IMergeTreeDataPart::Checksums checkDataPart(
continue;
auto checksum_it = checksums_data.files.find(file_name);
/// Skip files that we already calculated. Also skip metadata files that are not checksummed.
if (checksum_it == checksums_data.files.end() && !files_without_checksums.contains(file_name))
{
auto txt_checksum_it = checksums_txt_files.find(file_name);
if (txt_checksum_it == checksums_txt_files.end() || txt_checksum_it->second.uncompressed_size == 0)
if ((txt_checksum_it != checksums_txt_files.end() && txt_checksum_it->second.is_compressed))
{
/// If we have both compressed and uncompressed in txt or its .cmrk(2/3) or .cidx, then calculate them
checksums_data.files[file_name] = checksum_compressed_file(data_part_storage, file_name);
}
else
{
/// The file is not compressed.
checksum_file(file_name);
}
else /// If we have both compressed and uncompressed in txt, then calculate them
{
checksums_data.files[file_name] = checksum_compressed_file(data_part_storage, file_name);
}
}
}

View File

@ -396,9 +396,9 @@ std::unique_ptr<ReadBuffer> createReadBuffer(
throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
"Cannot compile regex from glob ({}): {}", current_path, matcher->error());
return reader->readFile([matcher = std::move(matcher)](const std::string & path)
return reader->readFile([my_matcher = std::move(matcher)](const std::string & path)
{
return re2::RE2::FullMatch(path, *matcher);
return re2::RE2::FullMatch(path, *my_matcher);
});
}
else

View File

@ -14,6 +14,7 @@
#include <Interpreters/castColumn.h>
#include <Common/quoteString.h>
#include <Common/Exception.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Interpreters/JoinUtils.h>
#include <Compression/CompressedWriteBuffer.h>
@ -177,7 +178,7 @@ void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context)
}
}
HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr<TableJoin> analyzed_join, ContextPtr context) const
HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr<TableJoin> analyzed_join, ContextPtr context, const Names & required_columns_names) const
{
auto metadata_snapshot = getInMemoryMetadataPtr();
if (!analyzed_join->sameStrictnessAndKind(strictness, kind))
@ -237,8 +238,10 @@ HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr<TableJoin> analyzed_join,
/// Qualifies will be added by join implementation (TableJoin contains a rename mapping).
analyzed_join->setRightKeys(key_names);
analyzed_join->setLeftKeys(left_key_names_resorted);
HashJoinPtr join_clone = std::make_shared<HashJoin>(analyzed_join, getRightSampleBlock());
Block right_sample_block;
for (const auto & name : required_columns_names)
right_sample_block.insert(getRightSampleBlock().getByName(name));
HashJoinPtr join_clone = std::make_shared<HashJoin>(analyzed_join, right_sample_block);
RWLockImpl::LockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Read, context);
join_clone->setLock(holder);

View File

@ -49,7 +49,7 @@ public:
/// Return instance of HashJoin holding lock that protects from insertions to StorageJoin.
/// HashJoin relies on structure of hash table that's why we need to return it with locked mutex.
HashJoinPtr getJoinLocked(std::shared_ptr<TableJoin> analyzed_join, ContextPtr context) const;
HashJoinPtr getJoinLocked(std::shared_ptr<TableJoin> analyzed_join, ContextPtr context, const Names & required_columns_names) const;
/// Get result type for function "joinGet(OrNull)"
DataTypePtr joinGetCheckAndGetReturnType(const DataTypes & data_types, const String & column_name, bool or_null) const;

View File

@ -1,5 +1,4 @@
test_access_for_functions/test.py::test_access_rights_for_function
test_backward_compatibility/test_normalized_count_comparison.py::test_select_aggregate_alias_column
test_concurrent_backups_s3/test.py::test_concurrent_backups
test_distributed_ddl/test.py::test_default_database[configs]
test_distributed_ddl/test.py::test_default_database[configs_secure]
@ -29,16 +28,6 @@ test_distributed_load_balancing/test.py::test_load_balancing_default
test_distributed_load_balancing/test.py::test_load_balancing_priority_round_robin[dist_priority]
test_distributed_load_balancing/test.py::test_load_balancing_priority_round_robin[dist_priority_negative]
test_distributed_load_balancing/test.py::test_load_balancing_round_robin
test_backward_compatibility/test.py::test_backward_compatability1
test_backward_compatibility/test_aggregate_fixed_key.py::test_two_level_merge
test_backward_compatibility/test_aggregate_function_state.py::test_backward_compatability_for_avg
test_backward_compatibility/test_aggregate_function_state.py::test_backward_compatability_for_uniq_exact[1000]
test_backward_compatibility/test_aggregate_function_state.py::test_backward_compatability_for_uniq_exact[500000]
test_backward_compatibility/test_aggregate_function_state.py::test_backward_compatability_for_uniq_exact_variadic[1000]
test_backward_compatibility/test_aggregate_function_state.py::test_backward_compatability_for_uniq_exact_variadic[500000]
test_backward_compatibility/test_ip_types_binary_compatibility.py::test_ip_types_binary_compatibility
test_backward_compatibility/test_select_aggregate_alias_column.py::test_select_aggregate_alias_column
test_backward_compatibility/test_short_strings_aggregation.py::test_backward_compatability
test_mask_sensitive_info/test.py::test_encryption_functions
test_merge_table_over_distributed/test.py::test_global_in
test_merge_table_over_distributed/test.py::test_select_table_name_from_merge_over_distributed
@ -87,7 +76,6 @@ test_row_policy/test.py::test_users_xml_is_readonly
test_row_policy/test.py::test_with_prewhere
test_row_policy/test.py::test_with_prewhere
test_settings_constraints_distributed/test.py::test_select_clamps_settings
test_backward_compatibility/test_cte_distributed.py::test_cte_distributed
test_compression_codec_read/test.py::test_default_codec_read
test_dictionaries_update_and_reload/test.py::test_reload_after_fail_in_cache_dictionary
test_distributed_type_object/test.py::test_distributed_type_object
@ -98,9 +86,6 @@ test_storage_postgresql/test.py::test_postgres_select_insert
test_storage_rabbitmq/test.py::test_rabbitmq_materialized_view
test_system_merges/test.py::test_mutation_simple[]
test_system_merges/test.py::test_mutation_simple[replicated]
test_backward_compatibility/test_insert_profile_events.py::test_new_client_compatible
test_backward_compatibility/test_insert_profile_events.py::test_old_client_compatible
test_backward_compatibility/test_vertical_merges_from_compact_parts.py::test_vertical_merges_from_compact_parts
test_disk_over_web_server/test.py::test_cache[node2]
test_disk_over_web_server/test.py::test_incorrect_usage
test_disk_over_web_server/test.py::test_replicated_database
@ -108,16 +93,8 @@ test_disk_over_web_server/test.py::test_unavailable_server
test_disk_over_web_server/test.py::test_usage[node2]
test_distributed_backward_compatability/test.py::test_distributed_in_tuple
test_executable_table_function/test.py::test_executable_function_input_python
test_groupBitmapAnd_on_distributed/test_groupBitmapAndState_on_distributed_table.py::test_groupBitmapAndState_on_different_version_nodes
test_groupBitmapAnd_on_distributed/test_groupBitmapAndState_on_distributed_table.py::test_groupBitmapAndState_on_distributed_table
test_settings_profile/test.py::test_show_profiles
test_sql_user_defined_functions_on_cluster/test.py::test_sql_user_defined_functions_on_cluster
test_backward_compatibility/test_functions.py::test_aggregate_states
test_backward_compatibility/test_functions.py::test_string_functions
test_default_compression_codec/test.py::test_default_codec_for_compact_parts
test_default_compression_codec/test.py::test_default_codec_multiple
test_default_compression_codec/test.py::test_default_codec_single
test_default_compression_codec/test.py::test_default_codec_version_update
test_postgresql_protocol/test.py::test_python_client
test_quota/test.py::test_add_remove_interval
test_quota/test.py::test_add_remove_quota
@ -135,40 +112,8 @@ test_quota/test.py::test_reload_users_xml_by_timer
test_quota/test.py::test_simpliest_quota
test_quota/test.py::test_tracking_quota
test_quota/test.py::test_users_xml_is_readonly
test_replicated_merge_tree_compatibility/test.py::test_replicated_merge_tree_defaults_compatibility
test_polymorphic_parts/test.py::test_different_part_types_on_replicas[polymorphic_table_wide-Wide]
test_old_versions/test.py::test_client_is_older_than_server
test_polymorphic_parts/test.py::test_polymorphic_parts_non_adaptive
test_old_versions/test.py::test_server_is_older_than_client
test_polymorphic_parts/test.py::test_compact_parts_only
test_polymorphic_parts/test.py::test_different_part_types_on_replicas[polymorphic_table_compact-Compact]
test_polymorphic_parts/test.py::test_polymorphic_parts_index
test_old_versions/test.py::test_distributed_query_initiator_is_older_than_shard
test_polymorphic_parts/test.py::test_polymorphic_parts_basics[first_node1-second_node1]
test_polymorphic_parts/test.py::test_polymorphic_parts_basics[first_node0-second_node0]
test_ttl_replicated/test.py::test_ttl_table[DELETE]
test_ttl_replicated/test.py::test_ttl_columns
test_ttl_replicated/test.py::test_ttl_compatibility[node_left2-node_right2-2]
test_ttl_replicated/test.py::test_ttl_table[]
test_version_update/test.py::test_aggregate_function_versioning_server_upgrade
test_version_update/test.py::test_aggregate_function_versioning_fetch_data_from_old_to_new_server
test_ttl_replicated/test.py::test_ttl_double_delete_rule_returns_error
test_ttl_replicated/test.py::test_ttl_alter_delete[test_ttl_alter_delete]
test_ttl_replicated/test.py::test_ttl_alter_delete[test_ttl_alter_delete_replicated]
test_ttl_replicated/test.py::test_ttl_compatibility[node_left0-node_right0-0]
test_version_update/test.py::test_modulo_partition_key_issue_23508
test_ttl_replicated/test.py::test_ttl_many_columns
test_ttl_replicated/test.py::test_modify_column_ttl
test_ttl_replicated/test.py::test_merge_with_ttl_timeout
test_ttl_replicated/test.py::test_ttl_empty_parts
test_ttl_replicated/test.py::test_ttl_compatibility[node_left1-node_right1-1]
test_version_update/test.py::test_aggregate_function_versioning_persisting_metadata
test_version_update/test.py::test_aggregate_function_versioning_issue_16587
test_ttl_replicated/test.py::test_modify_ttl
test_mysql_database_engine/test.py::test_mysql_ddl_for_mysql_database
test_profile_events_s3/test.py::test_profile_events
test_version_update_after_mutation/test.py::test_upgrade_while_mutation
test_version_update_after_mutation/test.py::test_mutate_and_upgrade
test_system_flush_logs/test.py::test_system_logs[system.text_log-0]
test_user_defined_object_persistence/test.py::test_persistence
test_settings_profile/test.py::test_show_profiles
@ -176,7 +121,6 @@ test_sql_user_defined_functions_on_cluster/test.py::test_sql_user_defined_functi
test_select_access_rights/test_main.py::test_alias_columns
test_select_access_rights/test_main.py::test_select_count
test_select_access_rights/test_main.py::test_select_join
test_replicated_merge_tree_compatibility/test.py::test_replicated_merge_tree_defaults_compatibility
test_postgresql_protocol/test.py::test_python_client
test_quota/test.py::test_add_remove_interval
test_quota/test.py::test_add_remove_quota
@ -197,8 +141,7 @@ test_quota/test.py::test_users_xml_is_readonly
test_replicating_constants/test.py::test_different_versions
test_merge_tree_s3/test.py::test_heavy_insert_select_check_memory[node]
test_drop_is_lock_free/test.py::test_query_is_lock_free[detach table]
test_backward_compatibility/test_data_skipping_indices.py::test_index
test_backward_compatibility/test_convert_ordinary.py::test_convert_ordinary_to_atomic
test_backward_compatibility/test_memory_bound_aggregation.py::test_backward_compatability
test_odbc_interaction/test.py::test_postgres_insert
test_zookeeper_config/test.py::test_chroot_with_different_root
test_zookeeper_config/test.py::test_chroot_with_same_root
test_merge_tree_azure_blob_storage/test.py::test_table_manipulations

View File

@ -2,7 +2,6 @@
00562_in_subquery_merge_tree
00593_union_all_assert_columns_removed
00673_subquery_prepared_set_performance
00700_decimal_compare
00717_merge_and_distributed
00725_memory_tracking
00754_distributed_optimize_skip_select_on_unused_shards
@ -89,9 +88,7 @@
02382_join_and_filtering_set
02402_merge_engine_with_view
02404_memory_bound_merging
02421_decimal_in_precision_issue_41125
02426_orc_bug
02428_decimal_in_floating_point_literal
02428_parameterized_view
02458_use_structure_from_insertion_table
02479_race_condition_between_insert_and_droppin_mv

View File

@ -1,12 +1,14 @@
#!/usr/bin/env python3
from typing import List, Tuple
import subprocess
import logging
import json
import os
import sys
import time
from typing import List, Tuple
import urllib.parse
import requests # type: ignore
from ci_config import CI_CONFIG, BuildConfig
from docker_pull_helper import get_image_with_version
@ -30,6 +32,7 @@ from version_helper import (
from clickhouse_helper import (
ClickHouseHelper,
prepare_tests_results_for_clickhouse,
get_instance_type,
)
from stopwatch import Stopwatch
@ -51,6 +54,7 @@ def get_packager_cmd(
build_config: BuildConfig,
packager_path: str,
output_path: str,
profile_path: str,
build_version: str,
image_version: str,
official: bool,
@ -59,8 +63,8 @@ def get_packager_cmd(
comp = build_config.compiler
cmake_flags = "-DENABLE_CLICKHOUSE_SELF_EXTRACTING=1"
cmd = (
f"cd {packager_path} && CMAKE_FLAGS='{cmake_flags}' ./packager --output-dir={output_path} "
f"--package-type={package_type} --compiler={comp}"
f"cd {packager_path} && CMAKE_FLAGS='{cmake_flags}' ./packager --output-dir={output_path} --profile-dir={profile_path}"
f" --package-type={package_type} --compiler={comp}"
)
if build_config.debug_build:
@ -286,10 +290,15 @@ def main():
if not os.path.exists(build_output_path):
os.makedirs(build_output_path)
build_profile_path = os.path.join(TEMP_PATH, f"{build_name}_profile")
if not os.path.exists(build_profile_path):
os.makedirs(build_profile_path)
packager_cmd = get_packager_cmd(
build_config,
os.path.join(REPO_COPY, "docker/packager"),
build_output_path,
build_profile_path,
version.string,
image_version,
official_flag,
@ -360,6 +369,69 @@ def main():
upload_master_static_binaries(pr_info, build_config, s3_helper, build_output_path)
# Upload profile data
instance_type = get_instance_type()
query = urllib.parse.quote(
f"""
INSERT INTO build_time_trace
(
pull_request_number,
commit_sha,
check_start_time,
check_name,
instance_type,
file,
library,
time,
pid,
tid,
ph,
ts,
dur,
cat,
name,
detail,
count,
avgMs,
args_name
)
SELECT {pr_info.number}, '{pr_info.sha}', '{stopwatch.start_time_str}', '{build_name}', '{instance_type}', *
FROM input('
file String,
library String,
time DateTime64(6),
pid UInt32,
tid UInt32,
ph String,
ts UInt64,
dur UInt64,
cat String,
name String,
detail String,
count UInt64,
avgMs UInt64,
args_name String')
FORMAT JSONCompactEachRow
"""
)
clickhouse_ci_logs_host = os.getenv("CLICKHOUSE_CI_LOGS_HOST")
maybe_clickhouse_ci_logs_password: str = (
os.getenv("CLICKHOUSE_CI_LOGS_PASSWORD") or ""
)
url = f"https://{clickhouse_ci_logs_host}/?query={query}"
file_path = os.path.join(build_profile_path, "profile.json")
file_size = os.path.getsize(file_path)
print(
f"::notice ::Log Uploading profile data, path: {file_path}, size: {file_size}, query: {query}"
)
with open(file_path, "rb") as file:
requests.post(url, data=file, auth=("ci", maybe_clickhouse_ci_logs_password))
# Upload statistics to CI database
ch_helper = ClickHouseHelper()
prepared_events = prepare_tests_results_for_clickhouse(
pr_info,

View File

@ -373,9 +373,6 @@ REQUIRED_CHECKS = [
"Stress test (tsan)",
"Stress test (ubsan)",
"Upgrade check (asan)",
"Upgrade check (debug)",
"Upgrade check (msan)",
"Upgrade check (tsan)",
"Style Check",
"Unit tests (asan)",
"Unit tests (msan)",

View File

@ -20,7 +20,12 @@ def tune_local_port_range():
#
# NOTE: 5K is not enough, and sometimes leads to EADDRNOTAVAIL error.
# NOTE: it is not inherited, so you may need to specify this in docker_compose_$SERVICE.yml
run_and_check(["sysctl net.ipv4.ip_local_port_range='55000 65535'"], shell=True)
try:
run_and_check(["sysctl net.ipv4.ip_local_port_range='55000 65535'"], shell=True)
except Exception as ex:
logging.warning(
"Failed to run sysctl, tests may fail with EADDRINUSE %s", str(ex)
)
@pytest.fixture(autouse=True, scope="session")

View File

@ -11,6 +11,7 @@ node1 = cluster.add_instance(
tag="20.8.11.17",
with_installed_binary=True,
stay_alive=True,
allow_analyzer=False,
)

View File

@ -24,6 +24,7 @@ node3 = cluster.add_instance(
tag="20.3.16",
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
)
node4 = cluster.add_instance("node4")

View File

@ -36,6 +36,7 @@ def cluster():
with_installed_binary=True,
image="clickhouse/clickhouse-server",
tag="22.8.14.53",
allow_analyzer=False,
)
cluster.start()

View File

@ -11,6 +11,7 @@ node_old = cluster.add_instance(
tag="20.8.9.6",
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
)
node_new = cluster.add_instance(
"node2",

View File

@ -14,6 +14,7 @@ node_dist = cluster.add_instance(
tag="21.11.9.1",
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
)

View File

@ -33,6 +33,7 @@ backward = make_instance(
# version without DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2
tag="23.2.3",
with_installed_binary=True,
allow_analyzer=False,
)
users = pytest.mark.parametrize(

View File

@ -5,13 +5,22 @@ from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1", main_configs=["configs/clusters.xml"], with_zookeeper=True
"node1",
main_configs=["configs/clusters.xml"],
with_zookeeper=True,
allow_analyzer=False,
)
node2 = cluster.add_instance(
"node2", main_configs=["configs/clusters.xml"], with_zookeeper=True
"node2",
main_configs=["configs/clusters.xml"],
with_zookeeper=True,
allow_analyzer=False,
)
node3 = cluster.add_instance(
"node3", main_configs=["configs/clusters.xml"], with_zookeeper=True
"node3",
main_configs=["configs/clusters.xml"],
with_zookeeper=True,
allow_analyzer=False,
)
node4 = cluster.add_instance(
"node4",
@ -19,6 +28,7 @@ node4 = cluster.add_instance(
image="yandex/clickhouse-server",
tag="21.5",
with_zookeeper=True,
allow_analyzer=False,
)

View File

@ -5,13 +5,22 @@ from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1", main_configs=["configs/clusters.xml"], with_zookeeper=True
"node1",
main_configs=["configs/clusters.xml"],
with_zookeeper=True,
allow_analyzer=False,
)
node2 = cluster.add_instance(
"node2", main_configs=["configs/clusters.xml"], with_zookeeper=True
"node2",
main_configs=["configs/clusters.xml"],
with_zookeeper=True,
allow_analyzer=False,
)
node3 = cluster.add_instance(
"node3", main_configs=["configs/clusters.xml"], with_zookeeper=True
"node3",
main_configs=["configs/clusters.xml"],
with_zookeeper=True,
allow_analyzer=False,
)
node4 = cluster.add_instance(
"node4",
@ -20,6 +29,7 @@ node4 = cluster.add_instance(
tag="21.6",
with_installed_binary=True,
with_zookeeper=True,
allow_analyzer=False,
)

View File

@ -855,6 +855,11 @@ def test_s3_engine_heavy_write_check_mem(
memory = in_flight_memory[1]
node = cluster.instances[node_name]
# it's bad idea to test something related to memory with sanitizers
if node.is_built_with_sanitizer():
pytest.skip("Disabled for sanitizers")
node.query("DROP TABLE IF EXISTS s3_test SYNC")
node.query(
"CREATE TABLE s3_test"
@ -887,7 +892,7 @@ def test_s3_engine_heavy_write_check_mem(
assert int(memory_usage) < 1.2 * memory
assert int(memory_usage) > 0.8 * memory
assert int(wait_inflight) > 10 * 1000 * 1000
assert int(wait_inflight) > in_flight * 1000 * 1000
check_no_objects_after_drop(cluster, node_name=node_name)

View File

@ -1166,152 +1166,6 @@ def produce_alter_move(node, name):
pass
@pytest.mark.parametrize(
"name,engine",
[
pytest.param("concurrently_altering_mt", "MergeTree()", id="mt"),
pytest.param(
"concurrently_altering_replicated_mt",
"ReplicatedMergeTree('/clickhouse/concurrently_altering_replicated_mt', '1')",
id="replicated",
),
],
)
def test_concurrent_alter_move(start_cluster, name, engine):
try:
node1.query_with_retry(
"""
CREATE TABLE IF NOT EXISTS {name} (
EventDate Date,
number UInt64
) ENGINE = {engine}
ORDER BY tuple()
PARTITION BY toYYYYMM(EventDate)
SETTINGS storage_policy='jbods_with_external'
""".format(
name=name, engine=engine
)
)
values = list({random.randint(1, 1000000) for _ in range(0, 1000)})
def insert(num):
for i in range(num):
day = random.randint(11, 30)
value = values.pop()
month = "0" + str(random.choice([3, 4]))
node1.query_with_retry(
"INSERT INTO {} VALUES(toDate('2019-{m}-{d}'), {v})".format(
name, m=month, d=day, v=value
)
)
def alter_move(num):
for i in range(num):
produce_alter_move(node1, name)
def alter_update(num):
for i in range(num):
node1.query(
"ALTER TABLE {} UPDATE number = number + 1 WHERE 1".format(name)
)
def optimize_table(num):
for i in range(num):
node1.query_with_retry("OPTIMIZE TABLE {} FINAL".format(name))
p = Pool(15)
tasks = []
for i in range(5):
tasks.append(p.apply_async(insert, (100,)))
tasks.append(p.apply_async(alter_move, (100,)))
tasks.append(p.apply_async(alter_update, (100,)))
tasks.append(p.apply_async(optimize_table, (100,)))
for task in tasks:
task.get(timeout=240)
assert node1.query("SELECT 1") == "1\n"
assert node1.query("SELECT COUNT() FROM {}".format(name)) == "500\n"
finally:
node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
@pytest.mark.parametrize(
"name,engine",
[
pytest.param("concurrently_dropping_mt", "MergeTree()", id="mt"),
pytest.param(
"concurrently_dropping_replicated_mt",
"ReplicatedMergeTree('/clickhouse/concurrently_dropping_replicated_mt', '1')",
id="replicated",
),
],
)
def test_concurrent_alter_move_and_drop(start_cluster, name, engine):
try:
node1.query(
"""
CREATE TABLE IF NOT EXISTS {name} (
EventDate Date,
number UInt64
) ENGINE = {engine}
ORDER BY tuple()
PARTITION BY toYYYYMM(EventDate)
SETTINGS storage_policy='jbods_with_external'
""".format(
name=name, engine=engine
)
)
values = list({random.randint(1, 1000000) for _ in range(0, 1000)})
def insert(num):
for i in range(num):
day = random.randint(11, 30)
value = values.pop()
month = "0" + str(random.choice([3, 4]))
node1.query_with_retry(
"INSERT INTO {} VALUES(toDate('2019-{m}-{d}'), {v})".format(
name, m=month, d=day, v=value
)
)
def alter_move(num):
for i in range(num):
produce_alter_move(node1, name)
def alter_drop(num):
for i in range(num):
partition = random.choice([201903, 201904])
op = random.choice(["drop", "detach"])
try:
node1.query(
"ALTER TABLE {} {} PARTITION {}".format(name, op, partition)
)
except QueryRuntimeException as e:
if "Code: 650" in e.stderr:
pass
else:
raise e
insert(20)
p = Pool(15)
tasks = []
for i in range(5):
tasks.append(p.apply_async(insert, (20,)))
tasks.append(p.apply_async(alter_move, (20,)))
tasks.append(p.apply_async(alter_drop, (20,)))
for task in tasks:
task.get(timeout=120)
assert node1.query("SELECT 1") == "1\n"
finally:
node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")
@pytest.mark.parametrize(
"name,engine",
[

View File

@ -10,6 +10,7 @@ node18_14 = cluster.add_instance(
tag="18.14.19",
with_installed_binary=True,
main_configs=["configs/config.d/test_cluster.xml"],
allow_analyzer=False,
)
node19_1 = cluster.add_instance(
"node19_1",
@ -17,6 +18,7 @@ node19_1 = cluster.add_instance(
tag="19.1.16",
with_installed_binary=True,
main_configs=["configs/config.d/test_cluster.xml"],
allow_analyzer=False,
)
node19_4 = cluster.add_instance(
"node19_4",
@ -24,6 +26,7 @@ node19_4 = cluster.add_instance(
tag="19.4.5.35",
with_installed_binary=True,
main_configs=["configs/config.d/test_cluster.xml"],
allow_analyzer=False,
)
node19_8 = cluster.add_instance(
"node19_8",
@ -31,6 +34,7 @@ node19_8 = cluster.add_instance(
tag="19.8.3.8",
with_installed_binary=True,
main_configs=["configs/config.d/test_cluster.xml"],
allow_analyzer=False,
)
node19_11 = cluster.add_instance(
"node19_11",
@ -38,6 +42,7 @@ node19_11 = cluster.add_instance(
tag="19.11.13.74",
with_installed_binary=True,
main_configs=["configs/config.d/test_cluster.xml"],
allow_analyzer=False,
)
node19_13 = cluster.add_instance(
"node19_13",
@ -45,6 +50,7 @@ node19_13 = cluster.add_instance(
tag="19.13.7.57",
with_installed_binary=True,
main_configs=["configs/config.d/test_cluster.xml"],
allow_analyzer=False,
)
node19_16 = cluster.add_instance(
"node19_16",
@ -52,6 +58,7 @@ node19_16 = cluster.add_instance(
tag="19.16.2.2",
with_installed_binary=True,
main_configs=["configs/config.d/test_cluster.xml"],
allow_analyzer=False,
)
old_nodes = [node18_14, node19_1, node19_4, node19_8, node19_11, node19_13, node19_16]
new_node = cluster.add_instance("node_new")

View File

@ -363,6 +363,7 @@ node7 = cluster.add_instance(
tag="19.17.8.54",
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
)
node8 = cluster.add_instance(
"node8",

View File

@ -9,6 +9,7 @@ node1 = cluster.add_instance(
tag="20.12.4.5",
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
)
node2 = cluster.add_instance(
"node2",
@ -17,6 +18,7 @@ node2 = cluster.add_instance(
tag="20.12.4.5",
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
)

View File

@ -11,6 +11,7 @@ node2 = cluster.add_instance(
image="yandex/clickhouse-server",
tag="19.1.14",
with_installed_binary=True,
allow_analyzer=False,
)

View File

@ -35,10 +35,13 @@ def cluster():
cluster.shutdown()
def azure_query(node, query, try_num=10, settings={}):
def azure_query(node, query, expect_error="false", try_num=10, settings={}):
for i in range(try_num):
try:
return node.query(query, settings=settings)
if expect_error == "true":
return node.query_and_get_error(query, settings=settings)
else:
return node.query(query, settings=settings)
except Exception as ex:
retriable_errors = [
"DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response",
@ -656,7 +659,7 @@ def test_read_from_not_existing_container(cluster):
node = cluster.instances["node"]
query = f"select * from azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont_not_exists', 'test_table.csv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto')"
expected_err_msg = "container does not exist"
assert expected_err_msg in node.query_and_get_error(query)
assert expected_err_msg in azure_query(node, query, expect_error="true")
def test_function_signatures(cluster):

View File

@ -17,6 +17,7 @@ from helpers.test_tools import TSV
from helpers.network import PartitionManager
from helpers.mock_servers import start_mock_servers
from helpers.test_tools import exec_query_with_retry
from test_storage_azure_blob_storage.test import azure_query
@pytest.fixture(scope="module")
@ -48,26 +49,6 @@ def cluster():
cluster.shutdown()
def azure_query(node, query, try_num=3, settings={}):
for i in range(try_num):
try:
return node.query(query, settings=settings)
except Exception as ex:
retriable_errors = [
"DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response"
]
retry = False
for error in retriable_errors:
if error in str(ex):
retry = True
print(f"Try num: {i}. Having retriable error: {ex}")
time.sleep(i)
break
if not retry or i == try_num - 1:
raise Exception(ex)
continue
def get_azure_file_content(filename):
container_name = "cont"
connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
@ -89,20 +70,22 @@ def test_select_all(cluster):
)
print(get_azure_file_content("test_cluster_select_all.csv"))
pure_azure = node.query(
pure_azure = azure_query(
node,
"""
SELECT * from azureBlobStorage(
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_cluster_select_all.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',
'auto')"""
'auto')""",
)
print(pure_azure)
distributed_azure = node.query(
distributed_azure = azure_query(
node,
"""
SELECT * from azureBlobStorageCluster(
'simple_cluster', 'http://azurite1:10000/devstoreaccount1', 'cont', 'test_cluster_select_all.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',
'auto')"""
'auto')""",
)
print(distributed_azure)
assert TSV(pure_azure) == TSV(distributed_azure)
@ -119,20 +102,22 @@ def test_count(cluster):
)
print(get_azure_file_content("test_cluster_count.csv"))
pure_azure = node.query(
pure_azure = azure_query(
node,
"""
SELECT count(*) from azureBlobStorage(
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_cluster_count.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',
'auto', 'key UInt64')"""
'auto', 'key UInt64')""",
)
print(pure_azure)
distributed_azure = node.query(
distributed_azure = azure_query(
node,
"""
SELECT count(*) from azureBlobStorageCluster(
'simple_cluster', 'http://azurite1:10000/devstoreaccount1', 'cont', 'test_cluster_count.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',
'auto', 'key UInt64')"""
'auto', 'key UInt64')""",
)
print(distributed_azure)
assert TSV(pure_azure) == TSV(distributed_azure)
@ -148,7 +133,8 @@ def test_union_all(cluster):
"'auto', 'a Int32, b String') VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')",
)
pure_azure = node.query(
pure_azure = azure_query(
node,
"""
SELECT * FROM
(
@ -163,9 +149,10 @@ def test_union_all(cluster):
'auto', 'a Int32, b String')
)
ORDER BY (a)
"""
""",
)
azure_distributed = node.query(
azure_distributed = azure_query(
node,
"""
SELECT * FROM
(
@ -182,7 +169,7 @@ def test_union_all(cluster):
'auto', 'a Int32, b String')
)
ORDER BY (a)
"""
""",
)
assert TSV(pure_azure) == TSV(azure_distributed)
@ -197,14 +184,15 @@ def test_skip_unavailable_shards(cluster):
"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', "
"'auto', 'a UInt64') VALUES (1), (2)",
)
result = node.query(
result = azure_query(
node,
"""
SELECT count(*) from azureBlobStorageCluster(
'cluster_non_existent_port',
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_skip_unavailable.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==')
SETTINGS skip_unavailable_shards = 1
"""
""",
)
assert result == "2\n"
@ -220,13 +208,14 @@ def test_unset_skip_unavailable_shards(cluster):
"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', "
"'auto', 'a UInt64') VALUES (1), (2)",
)
result = node.query(
result = azure_query(
node,
"""
SELECT count(*) from azureBlobStorageCluster(
'cluster_non_existent_port',
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_skip_unavailable.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==')
"""
""",
)
assert result == "2\n"
@ -243,19 +232,21 @@ def test_cluster_with_named_collection(cluster):
"'auto', 'a UInt64') VALUES (1), (2)",
)
pure_azure = node.query(
pure_azure = azure_query(
node,
"""
SELECT * from azureBlobStorage(
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_cluster_with_named_collection.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==')
"""
""",
)
azure_cluster = node.query(
azure_cluster = azure_query(
node,
"""
SELECT * from azureBlobStorageCluster(
'simple_cluster', azure_conf2, container='cont', blob_path='test_cluster_with_named_collection.csv')
"""
""",
)
assert TSV(pure_azure) == TSV(azure_cluster)
@ -277,12 +268,13 @@ def test_partition_parallel_readig_withcluster(cluster):
assert "3,2,1\n" == get_azure_file_content("test_tf_1.csv")
assert "78,43,45\n" == get_azure_file_content("test_tf_45.csv")
azure_cluster = node.query(
azure_cluster = azure_query(
node,
"""
SELECT count(*) from azureBlobStorageCluster(
'simple_cluster',
azure_conf2, container='cont', blob_path='test_tf_*.csv', format='CSV', compression='auto', structure='column1 UInt32, column2 UInt32, column3 UInt32')
"""
""",
)
assert azure_cluster == "3\n"

View File

@ -23,6 +23,7 @@ node4 = cluster.add_instance(
main_configs=[
"configs/compat.xml",
],
allow_analyzer=False,
)
node5 = cluster.add_instance(
@ -35,6 +36,7 @@ node5 = cluster.add_instance(
main_configs=[
"configs/compat.xml",
],
allow_analyzer=False,
)
node6 = cluster.add_instance(
"node6",
@ -46,6 +48,7 @@ node6 = cluster.add_instance(
main_configs=[
"configs/compat.xml",
],
allow_analyzer=False,
)

View File

@ -15,6 +15,7 @@ node2 = cluster.add_instance(
tag="21.2",
with_installed_binary=True,
stay_alive=True,
allow_analyzer=False,
)
# Use differents nodes because if there is node.restart_from_latest_version(), then in later tests
@ -25,6 +26,7 @@ node3 = cluster.add_instance(
tag="21.5",
with_installed_binary=True,
stay_alive=True,
allow_analyzer=False,
)
node4 = cluster.add_instance(
"node4",
@ -32,6 +34,7 @@ node4 = cluster.add_instance(
tag="21.5",
with_installed_binary=True,
stay_alive=True,
allow_analyzer=False,
)
node5 = cluster.add_instance(
"node5",
@ -39,6 +42,7 @@ node5 = cluster.add_instance(
tag="21.5",
with_installed_binary=True,
stay_alive=True,
allow_analyzer=False,
)
node6 = cluster.add_instance(
"node6",
@ -46,6 +50,7 @@ node6 = cluster.add_instance(
tag="21.5",
with_installed_binary=True,
stay_alive=True,
allow_analyzer=False,
)

View File

@ -16,6 +16,7 @@ node1 = cluster.add_instance(
main_configs=[
"configs/compat.xml",
],
allow_analyzer=False,
)
node2 = cluster.add_instance(
"node2",
@ -27,6 +28,7 @@ node2 = cluster.add_instance(
main_configs=[
"configs/compat.xml",
],
allow_analyzer=False,
)
node3 = cluster.add_instance(
"node3",
@ -38,6 +40,7 @@ node3 = cluster.add_instance(
main_configs=[
"configs/compat.xml",
],
allow_analyzer=False,
)

View File

@ -0,0 +1,19 @@
<test>
<settings>
<max_threads>1</max_threads>
</settings>
<create_query>CREATE TABLE keys (key UInt64) ENGINE = MergeTree ORDER BY key;</create_query>
<create_query>CREATE TABLE dict (key UInt64, value1 UInt64, value2 Float64, value3 String,
value4 String, value5 String, value6 String, value7 String, value8 String, value9 String,
value10 String) ENGINE = Join(ANY, LEFT, key);</create_query>
<fill_query>INSERT INTO keys SELECT rand() FROM numbers(10000000);</fill_query>
<fill_query>INSERT INTO dict SELECT rand(), rand()%1000, rand()*0.0001, toString(number),
toString(number), toString(number), toString(number), toString(number), toString(number),
toString(number), toString(number) FROM numbers(1000000);</fill_query>
<query>SELECT keys.key, value1 FROM keys ANY LEFT JOIN dict AS d ON (keys.key = d.key) FORMAT Null;</query>
<query>SELECT keys.key, value1 FROM keys ANY LEFT JOIN dict AS d ON (keys.key = d.key) FORMAT Null SETTINGS
allow_experimental_analyzer=1</query>
</test>

View File

@ -27,6 +27,7 @@ SELECT a > 0, b > 0, g > 0 FROM decimal ORDER BY a DESC;
SELECT a, g > toInt8(0), g > toInt16(0), g > toInt32(0), g > toInt64(0) FROM decimal ORDER BY a;
SELECT a, g > toUInt8(0), g > toUInt16(0), g > toUInt32(0), g > toUInt64(0) FROM decimal ORDER BY a;
SELECT a, b, g FROM decimal WHERE a IN(42) AND b IN(42) AND g IN(42);
SELECT a, b, g FROM decimal WHERE a IN(42) AND b IN(42) AND g IN(42) SETTINGS allow_experimental_analyzer = 1;
SELECT a, b, g FROM decimal WHERE a > 0 AND a <= 42 AND b <= 42 AND g <= 42;
SELECT d, e, f from decimal WHERE d > 0 AND d < 1 AND e > 0 AND e < 1 AND f > 0 AND f < 1;

View File

@ -18,19 +18,19 @@ $CLICKHOUSE_CLIENT --query "
PARTITION BY date
"
$CLICKHOUSE_CLIENT --query "INSERT INTO replicated_mutation_table SELECT toDate('2019-10-02'), number, '42' FROM numbers(4)"
$CLICKHOUSE_CLIENT --query "INSERT INTO replicated_mutation_table SELECT toDate('2019-10-02'), number, '42' FROM numbers(10)"
$CLICKHOUSE_CLIENT --query "INSERT INTO replicated_mutation_table SELECT toDate('2019-10-02'), number, 'Hello' FROM numbers(4)"
$CLICKHOUSE_CLIENT --query "INSERT INTO replicated_mutation_table SELECT toDate('2019-10-02'), number, 'Hello' FROM numbers(10)"
$CLICKHOUSE_CLIENT --query "ALTER TABLE replicated_mutation_table UPDATE key = key + 1 WHERE sleepEachRow(1) == 0 SETTINGS mutations_sync = 2" 2>&1 | grep -o 'Mutation 0000000000 was killed' | head -n 1 &
check_query="SELECT count() FROM system.mutations WHERE table='replicated_mutation_table' and database='$CLICKHOUSE_DATABASE' and mutation_id='0000000000'"
query_result=$($CLICKHOUSE_CLIENT --query="$check_query" 2>&1)
query_result=$(curl $CLICKHOUSE_URL --silent --fail --data "$check_query")
while [ "$query_result" != "1" ]
do
query_result=$($CLICKHOUSE_CLIENT --query="$check_query" 2>&1)
query_result=$(curl $CLICKHOUSE_URL --silent --fail --data "$check_query")
sleep 0.1
done
@ -38,7 +38,7 @@ $CLICKHOUSE_CLIENT --query "KILL MUTATION WHERE table='replicated_mutation_table
while [ "$query_result" != "0" ]
do
query_result=$($CLICKHOUSE_CLIENT --query="$check_query" 2>&1)
query_result=$(curl $CLICKHOUSE_URL --silent --fail --data "$check_query")
sleep 0.5
done
@ -49,11 +49,11 @@ $CLICKHOUSE_CLIENT --query "ALTER TABLE replicated_mutation_table MODIFY COLUMN
check_query="SELECT type = 'UInt64' FROM system.columns WHERE table='replicated_mutation_table' and database='$CLICKHOUSE_DATABASE' and name='value'"
query_result=$($CLICKHOUSE_CLIENT --query="$check_query" 2>&1)
query_result=$(curl $CLICKHOUSE_URL --silent --fail --data "$check_query")
while [ "$query_result" != "1" ]
do
query_result=$($CLICKHOUSE_CLIENT --query="$check_query" 2>&1)
query_result=$(curl $CLICKHOUSE_URL --silent --fail --data "$check_query")
sleep 0.5
done
@ -66,7 +66,7 @@ $CLICKHOUSE_CLIENT --query "KILL MUTATION WHERE table='replicated_mutation_table
while [ "$query_result" != "0" ]
do
query_result=$($CLICKHOUSE_CLIENT --query="$check_query" 2>&1)
query_result=$(curl $CLICKHOUSE_URL --silent --fail --data "$check_query")
sleep 0.5
done

View File

@ -0,0 +1,8 @@
1
1000
0
0
0
1
0
0_0_0_0 Wide 370db59d5dcaef5d762b11d319c368c7 514a8be2dac94fd039dbd230065e58a4 b324ada5cd6bb14402c1e59200bd003a

View File

@ -0,0 +1,37 @@
#!/usr/bin/env bash
# Tags: zookeeper, no-replicated-database
# no-replicated-database because it adds extra replicas
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "drop table if exists rmt sync;"
$CLICKHOUSE_CLIENT -q "CREATE TABLE rmt (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16))
ENGINE = ReplicatedMergeTree('/test/02253/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/rmt', '1') ORDER BY a PARTITION BY b % 10
SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0,
cleanup_thread_preferred_points_per_iteration=0, min_bytes_for_wide_part=0, remove_empty_parts=0"
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "INSERT INTO rmt SELECT rand(1), 0, 1 / rand(3), toString(rand(4)), [rand(5), rand(6)], rand(7) % 2 ? NULL : generateUUIDv4(), (rand(8), rand(9)) FROM numbers(1000);"
$CLICKHOUSE_CLIENT -q "check table rmt"
$CLICKHOUSE_CLIENT -q "select count() from rmt"
path=$($CLICKHOUSE_CLIENT -q "select path from system.parts where database='$CLICKHOUSE_DATABASE' and table='rmt' and name='0_0_0_0'")
# ensure that path is absolute before removing
$CLICKHOUSE_CLIENT -q "select throwIf(substring('$path', 1, 1) != '/', 'Path is relative: $path')" || exit
rm -rf "$path"
# detach the broken part, replace it with empty one
$CLICKHOUSE_CLIENT -q "check table rmt" 2>/dev/null
$CLICKHOUSE_CLIENT -q "select count() from rmt"
$CLICKHOUSE_CLIENT --receive_timeout=60 -q "system sync replica rmt"
# the empty part should pass the check
$CLICKHOUSE_CLIENT -q "check table rmt"
$CLICKHOUSE_CLIENT -q "select count() from rmt"
$CLICKHOUSE_CLIENT -q "select name, part_type, hash_of_all_files, hash_of_uncompressed_files, uncompressed_hash_of_compressed_files from system.parts where database=currentDatabase()"
$CLICKHOUSE_CLIENT -q "drop table rmt sync;"

View File

@ -8,3 +8,12 @@ Ok
Ok
Ok
Ok
Ok
Ok
Ok
Ok
Ok
Ok
Ok
Ok
Ok

View File

@ -10,6 +10,9 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -mn -q """
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
CREATE TABLE t1 (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY y
AS SELECT sipHash64(number, 't1_x') % 100 AS x, sipHash64(number, 't1_y') % 100 AS y FROM numbers(100);
@ -17,6 +20,26 @@ CREATE TABLE t2 (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY y
AS SELECT sipHash64(number, 't2_x') % 100 AS x, sipHash64(number, 't2_y') % 100 AS y FROM numbers(100);
"""
# Arguments:
# - Query result
# - Processor name
# - Expected description
# - Check first occurrence
function match_description() {
QUERY_RESULT=$1
PROCESSOR_NAME=$2
EXPECTED_DESCRIPTION=$3
CHECK_FIRST_OCCURRENCE=${4:-true}
SED_EXPR="/$PROCESSOR_NAME/{ n; s/^[ \t]*Description: //; p"
[ $CHECK_FIRST_OCCURRENCE = true ] && SED_EXPR+="; q }" || SED_EXPR+=" }"
DESC=$(sed -n "$SED_EXPR" <<< "$QUERY_RESULT")
[[ "$DESC" == "$EXPECTED_DESCRIPTION" ]] && echo "Ok" || echo "Fail: ReadHeadBalancedProcessor description '$DESC' != '$EXPECTED_DESCRIPTION' "
}
# Arguments:
# - value of max_rows_in_set_to_optimize_join
# - join kind
@ -37,10 +60,20 @@ RES=$(
EXPECTED_PIPELINE_STEPS=$4
RES=$(
$CLICKHOUSE_CLIENT --max_rows_in_set_to_optimize_join=${PARAM_VALUE} --join_algorithm='full_sorting_merge' \
-q "EXPLAIN PIPELINE SELECT count() FROM t1 ${JOIN_KIND} JOIN t2 ON t1.x = t2.x" \
| grep -o -e ReadHeadBalancedProcessor -e FilterBySetOnTheFlyTransform -e CreatingSetsOnTheFlyTransform | wc -l
-q "EXPLAIN PIPELINE SELECT count() FROM t1 ${JOIN_KIND} JOIN t2 ON t1.x = t2.x"
)
[ "$RES" -eq "$EXPECTED_PIPELINE_STEPS" ] && echo "Ok" || echo "Fail: $RES != $EXPECTED_PIPELINE_STEPS"
# Count match
COUNT=$(echo "$RES" | grep -o -e ReadHeadBalancedProcessor -e FilterBySetOnTheFlyTransform -e CreatingSetsOnTheFlyTransform | wc -l)
[ "$COUNT" -eq "$EXPECTED_PIPELINE_STEPS" ] && echo "Ok" || echo "Fail: $COUNT != $EXPECTED_PIPELINE_STEPS"
# Description matchers
if [ "$EXPECTED_PIPELINE_STEPS" -ne 0 ]; then
match_description "$RES" 'ReadHeadBalancedProcessor' 'Reads rows from two streams evenly'
match_description "$RES" 'FilterBySetOnTheFlyTransform' "Filter rows using other join table side\'s set"
match_description "$RES" 'CreatingSetsOnTheFlyTransform' 'Create set and filter Left joined stream
Create set and filter Right joined stream' false
fi
}

View File

@ -11,3 +11,16 @@
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1

View File

@ -1,10 +1,30 @@
DROP TABLE IF EXISTS dtest;
SELECT count() == 0 FROM (SELECT '33.3' :: Decimal(9, 1) AS a WHERE a IN ('33.33' :: Decimal(9, 2)));
CREATE TABLE dtest ( `a` Decimal(18, 0), `b` Decimal(18, 1), `c` Decimal(36, 0) ) ENGINE = Memory;
INSERT INTO dtest VALUES ('33', '44.4', '35');
SELECT count() == 0 FROM (SELECT '33.3' :: Decimal(9, 1) AS a WHERE a IN ('33.33' :: Decimal(9, 2)));
SELECT count() == 0 FROM dtest WHERE a IN toDecimal32('33.3000', 4);
SELECT count() == 0 FROM dtest WHERE a IN toDecimal64('33.3000', 4);
SELECT count() == 0 FROM dtest WHERE a IN toDecimal128('33.3000', 4);
SELECT count() == 0 FROM dtest WHERE a IN toDecimal256('33.3000', 4);
SELECT count() == 0 FROM dtest WHERE b IN toDecimal32('44.4000', 0);
SELECT count() == 0 FROM dtest WHERE b IN toDecimal64('44.4000', 0);
SELECT count() == 0 FROM dtest WHERE b IN toDecimal128('44.4000', 0);
SELECT count() == 0 FROM dtest WHERE b IN toDecimal256('44.4000', 0);
SELECT count() == 1 FROM dtest WHERE b IN toDecimal32('44.4000', 4);
SELECT count() == 1 FROM dtest WHERE b IN toDecimal64('44.4000', 4);
SELECT count() == 1 FROM dtest WHERE b IN toDecimal128('44.4000', 4);
SELECT count() == 1 FROM dtest WHERE b IN toDecimal256('44.4000', 4);
SET allow_experimental_analyzer = 1;
SELECT count() == 0 FROM (SELECT '33.3' :: Decimal(9, 1) AS a WHERE a IN ('33.33' :: Decimal(9, 2)));
SELECT count() == 0 FROM dtest WHERE a IN toDecimal32('33.3000', 4);
SELECT count() == 0 FROM dtest WHERE a IN toDecimal64('33.3000', 4);
SELECT count() == 0 FROM dtest WHERE a IN toDecimal128('33.3000', 4);

View File

@ -19,3 +19,24 @@
1
1
1
0
1
1
0
0
1
1
0
0
1
1
0
0
1
1
0
1
1
1
1
1

View File

@ -1,3 +1,8 @@
DROP TABLE IF EXISTS decimal_in_float_test;
CREATE TABLE decimal_in_float_test ( `a` Decimal(18, 0), `b` Decimal(36, 2) ) ENGINE = Memory;
INSERT INTO decimal_in_float_test VALUES ('33', '44.44');
SELECT toDecimal32(1.555,3) IN (1.5551);
SELECT toDecimal32(1.555,3) IN (1.5551,1.555);
SELECT toDecimal32(1.555,3) IN (1.5551,1.555000);
@ -18,10 +23,36 @@ SELECT toDecimal256(1.555,3) IN (1.5551,1.555);
SELECT toDecimal256(1.555,3) IN (1.5551,1.555000);
SELECT toDecimal256(1.555,3) IN (1.550,1.5);
DROP TABLE IF EXISTS decimal_in_float_test;
CREATE TABLE decimal_in_float_test ( `a` Decimal(18, 0), `b` Decimal(36, 2) ) ENGINE = Memory;
INSERT INTO decimal_in_float_test VALUES ('33', '44.44');
SELECT count() == 1 FROM decimal_in_float_test WHERE a IN (33);
SELECT count() == 1 FROM decimal_in_float_test WHERE a IN (33.0);
SELECT count() == 1 FROM decimal_in_float_test WHERE a NOT IN (33.333);
SELECT count() == 1 FROM decimal_in_float_test WHERE b IN (44.44);
SELECT count() == 1 FROM decimal_in_float_test WHERE b NOT IN (44.4,44.444);
SET allow_experimental_analyzer = 1;
SELECT toDecimal32(1.555,3) IN (1.5551);
SELECT toDecimal32(1.555,3) IN (1.5551,1.555);
SELECT toDecimal32(1.555,3) IN (1.5551,1.555000);
SELECT toDecimal32(1.555,3) IN (1.550,1.5);
SELECT toDecimal64(1.555,3) IN (1.5551);
SELECT toDecimal64(1.555,3) IN (1.5551,1.555);
SELECT toDecimal64(1.555,3) IN (1.5551,1.555000);
SELECT toDecimal64(1.555,3) IN (1.550,1.5);
SELECT toDecimal128(1.555,3) IN (1.5551);
SELECT toDecimal128(1.555,3) IN (1.5551,1.555);
SELECT toDecimal128(1.555,3) IN (1.5551,1.555000);
SELECT toDecimal128(1.555,3) IN (1.550,1.5);
SELECT toDecimal256(1.555,3) IN (1.5551);
SELECT toDecimal256(1.555,3) IN (1.5551,1.555);
SELECT toDecimal256(1.555,3) IN (1.5551,1.555000);
SELECT toDecimal256(1.555,3) IN (1.550,1.5);
SELECT count() == 1 FROM decimal_in_float_test WHERE a IN (33);
SELECT count() == 1 FROM decimal_in_float_test WHERE a IN (33.0);

View File

@ -0,0 +1,23 @@
[2,5]
[2,6]
[4.5,5,12,10.1]
[(11.1,5.4),(6,21)]
[[13,2],[3]]
[2,2]
[2,3]
[2,4]
[2,5]
[2,6]
[2,2]
[2,3]
[2,4]
[2,5]
[2,6]
[0,0,0]
[(NULL,100000000000000000000),(NULL,1048833)]
[2,2]
[2,3]
[2,4]
[2,5]
[2,6]
[11,1,-2]

View File

@ -0,0 +1,18 @@
SELECT (materialize([1,1]) + materialize([1,4]));
SELECT ([1,2] + [1,4]);
SELECT ([2.5, 1, 3, 10.1] + [2, 4, 9, 0]);
SELECT ([(1,3), (2,9)] + [(10.1, 2.4), (4,12)]);
SELECT ([[1,1],[2]]+[[12,1],[1]]);
SELECT ([1,2]+[1,number]) from numbers(5);
SELECT ([1,2::UInt64]+[1,number]) from numbers(5);
SELECT ([materialize(1),materialize(2),materialize(3)]-[1,2,3]);
SELECT [(NULL, 256), (NULL, 256)] + [(1., 100000000000000000000.), (NULL, 1048577)];
SELECT ([1,2::UInt64]+[1,number]) from numbers(5);
CREATE TABLE my_table (values Array(Int32)) ENGINE = MergeTree() ORDER BY values;
INSERT INTO my_table (values) VALUES ([12, 3, 1]);
SELECT values - [1,2,3] FROM my_table WHERE arrayExists(x -> x > 5, values);
SELECT ([12,13] % [5,6]); -- { serverError 43 }
SELECT ([2,3,4]-[1,-2,10,29]); -- { serverError 190 }
CREATE TABLE a ( x Array(UInt64), y Array(UInt64)) ENGINE = Memory;
INSERT INTO a VALUES ([2,3],[4,5]),([1,2,3], [4,5]),([6,7],[8,9,10]);
SELECT x, y, x+y FROM a; -- { serverError 190 }

View File

@ -1,3 +1,4 @@
v23.7.4.5-stable 2023-08-08
v23.7.3.14-stable 2023-08-05
v23.7.2.25-stable 2023-08-03
v23.7.1.2470-stable 2023-07-27

1 v23.7.3.14-stable v23.7.4.5-stable 2023-08-05 2023-08-08
1 v23.7.4.5-stable 2023-08-08
2 v23.7.3.14-stable v23.7.3.14-stable 2023-08-05 2023-08-05
3 v23.7.2.25-stable v23.7.2.25-stable 2023-08-03 2023-08-03
4 v23.7.1.2470-stable v23.7.1.2470-stable 2023-07-27 2023-07-27

View File

@ -35,7 +35,6 @@ ENGINE = MergeTree ORDER BY (date, file, name, args_name);
INPUT_DIR=$1
OUTPUT_DIR=$2
EXTRA_COLUMN_VALUES=$3
find "$INPUT_DIR" -name '*.json' | grep -P '\.(c|cpp|cc|cxx)\.json$' | xargs -P $(nproc) -I{} bash -c "
@ -43,7 +42,7 @@ find "$INPUT_DIR" -name '*.json' | grep -P '\.(c|cpp|cc|cxx)\.json$' | xargs -P
LIBRARY_NAME=\$(echo '{}' | sed -r -e 's!^.*/CMakeFiles/([^/]+)\.dir/.*\$!\1!')
START_TIME=\$(jq '.beginningOfTime' '{}')
jq -c '.traceEvents[] | [${EXTRA_COLUMN_VALUES} \"'\"\$ORIGINAL_FILENAME\"'\", \"'\"\$LIBRARY_NAME\"'\", '\$START_TIME', .pid, .tid, .ph, .ts, .dur, .cat, .name, .args.detail, .args.count, .args[\"avg ms\"], .args.name]' '{}' > \"${OUTPUT_DIR}/\$\$\"
jq -c '.traceEvents[] | [\"'\"\$ORIGINAL_FILENAME\"'\", \"'\"\$LIBRARY_NAME\"'\", '\$START_TIME', .pid, .tid, .ph, .ts, .dur, .cat, .name, .args.detail, .args.count, .args[\"avg ms\"], .args.name]' '{}' > \"${OUTPUT_DIR}/\$\$\"
"
# Now you can upload it as follows: