Merge branch 'master' into ch_canh_fix_decrypt_with_null

This commit is contained in:
Duc Canh Le 2022-10-04 13:20:29 +08:00
commit a886804cbf
80 changed files with 1225 additions and 178 deletions

3
.gitmodules vendored
View File

@ -284,3 +284,6 @@
[submodule "contrib/llvm-project"]
path = contrib/llvm-project
url = https://github.com/ClickHouse/llvm-project.git
[submodule "contrib/corrosion"]
path = contrib/corrosion
url = https://github.com/corrosion-rs/corrosion.git

View File

@ -1,4 +1,4 @@
cmake_minimum_required(VERSION 3.15)
cmake_minimum_required(VERSION 3.20)
project(ClickHouse LANGUAGES C CXX ASM)
@ -574,6 +574,10 @@ include_directories(${ConfigIncludePath})
include (cmake/warnings.cmake)
include (cmake/print_flags.cmake)
if (ENABLE_RUST)
add_subdirectory (rust)
endif()
add_subdirectory (base)
add_subdirectory (src)
add_subdirectory (programs)

View File

@ -176,6 +176,249 @@ void __explicit_bzero_chk(void * buf, size_t len, size_t unused)
}
#include <unistd.h>
#include "syscall.h"
ssize_t copy_file_range(int fd_in, off_t *off_in, int fd_out, off_t *off_out, size_t len, unsigned flags)
{
return syscall(SYS_copy_file_range, fd_in, off_in, fd_out, off_out, len, flags);
}
long splice(int fd_in, off_t *off_in, int fd_out, off_t *off_out, size_t len, unsigned flags)
{
return syscall(SYS_splice, fd_in, off_in, fd_out, off_out, len, flags);
}
#define _BSD_SOURCE
#include <sys/stat.h>
#include <stdint.h>
#if !defined(__aarch64__)
struct statx {
uint32_t stx_mask;
uint32_t stx_blksize;
uint64_t stx_attributes;
uint32_t stx_nlink;
uint32_t stx_uid;
uint32_t stx_gid;
uint16_t stx_mode;
uint16_t pad1;
uint64_t stx_ino;
uint64_t stx_size;
uint64_t stx_blocks;
uint64_t stx_attributes_mask;
struct {
int64_t tv_sec;
uint32_t tv_nsec;
int32_t pad;
} stx_atime, stx_btime, stx_ctime, stx_mtime;
uint32_t stx_rdev_major;
uint32_t stx_rdev_minor;
uint32_t stx_dev_major;
uint32_t stx_dev_minor;
uint64_t spare[14];
};
#endif
int statx(int fd, const char *restrict path, int flag,
unsigned int mask, struct statx *restrict statxbuf)
{
return syscall(SYS_statx, fd, path, flag, mask, statxbuf);
}
#include <syscall.h>
ssize_t getrandom(void *buf, size_t buflen, unsigned flags)
{
/// There was cancellable syscall (syscall_cp), but I don't care too.
return syscall(SYS_getrandom, buf, buflen, flags);
}
#include <errno.h>
#include <limits.h>
#define ALIGN (sizeof(size_t))
#define ONES ((size_t)-1/UCHAR_MAX)
#define HIGHS (ONES * (UCHAR_MAX/2+1))
#define HASZERO(x) ((x)-ONES & ~(x) & HIGHS)
char *__strchrnul(const char *s, int c)
{
c = (unsigned char)c;
if (!c) return (char *)s + strlen(s);
#ifdef __GNUC__
typedef size_t __attribute__((__may_alias__)) word;
const word *w;
for (; (uintptr_t)s % ALIGN; s++)
if (!*s || *(unsigned char *)s == c) return (char *)s;
size_t k = ONES * c;
for (w = (void *)s; !HASZERO(*w) && !HASZERO(*w^k); w++);
s = (void *)w;
#endif
for (; *s && *(unsigned char *)s != c; s++);
return (char *)s;
}
int __execvpe(const char *file, char *const argv[], char *const envp[])
{
const char *p, *z, *path = getenv("PATH");
size_t l, k;
int seen_eacces = 0;
errno = ENOENT;
if (!*file) return -1;
if (strchr(file, '/'))
return execve(file, argv, envp);
if (!path) path = "/usr/local/bin:/bin:/usr/bin";
k = strnlen(file, NAME_MAX+1);
if (k > NAME_MAX) {
errno = ENAMETOOLONG;
return -1;
}
l = strnlen(path, PATH_MAX-1)+1;
for(p=path; ; p=z) {
char b[l+k+1];
z = __strchrnul(p, ':');
if (z-p >= l) {
if (!*z++) break;
continue;
}
memcpy(b, p, z-p);
b[z-p] = '/';
memcpy(b+(z-p)+(z>p), file, k+1);
execve(b, argv, envp);
switch (errno) {
case EACCES:
seen_eacces = 1;
case ENOENT:
case ENOTDIR:
break;
default:
return -1;
}
if (!*z++) break;
}
if (seen_eacces) errno = EACCES;
return -1;
}
#include "spawn.h"
int posix_spawnp(pid_t *restrict res, const char *restrict file,
const posix_spawn_file_actions_t *fa,
const posix_spawnattr_t *restrict attr,
char *const argv[restrict], char *const envp[restrict])
{
posix_spawnattr_t spawnp_attr = { 0 };
if (attr) spawnp_attr = *attr;
spawnp_attr.__fn = (void *)__execvpe;
return posix_spawn(res, file, fa, &spawnp_attr, argv, envp);
}
#define FDOP_CLOSE 1
#define FDOP_DUP2 2
#define FDOP_OPEN 3
#define FDOP_CHDIR 4
#define FDOP_FCHDIR 5
#define ENOMEM 12
#define EBADF 9
struct fdop {
struct fdop *next, *prev;
int cmd, fd, srcfd, oflag;
mode_t mode;
char path[];
};
int posix_spawn_file_actions_init(posix_spawn_file_actions_t *fa) {
fa->__actions = 0;
return 0;
}
int posix_spawn_file_actions_addchdir_np(posix_spawn_file_actions_t *restrict fa, const char *restrict path) {
struct fdop *op = malloc(sizeof *op + strlen(path) + 1);
if (!op) return ENOMEM;
op->cmd = FDOP_CHDIR;
op->fd = -1;
strcpy(op->path, path);
if ((op->next = fa->__actions)) op->next->prev = op;
op->prev = 0;
fa->__actions = op;
return 0;
}
int posix_spawn_file_actions_addclose(posix_spawn_file_actions_t *fa, int fd) {
if (fd < 0) return EBADF;
struct fdop *op = malloc(sizeof *op);
if (!op) return ENOMEM;
op->cmd = FDOP_CLOSE;
op->fd = fd;
if ((op->next = fa->__actions)) op->next->prev = op;
op->prev = 0;
fa->__actions = op;
return 0;
}
int posix_spawn_file_actions_adddup2(posix_spawn_file_actions_t *fa, int srcfd, int fd) {
if (srcfd < 0 || fd < 0) return EBADF;
struct fdop *op = malloc(sizeof *op);
if (!op) return ENOMEM;
op->cmd = FDOP_DUP2;
op->srcfd = srcfd;
op->fd = fd;
if ((op->next = fa->__actions)) op->next->prev = op;
op->prev = 0;
fa->__actions = op;
return 0;
}
int posix_spawn_file_actions_addfchdir_np(posix_spawn_file_actions_t *fa, int fd) {
if (fd < 0) return EBADF;
struct fdop *op = malloc(sizeof *op);
if (!op) return ENOMEM;
op->cmd = FDOP_FCHDIR;
op->fd = fd;
if ((op->next = fa->__actions)) op->next->prev = op;
op->prev = 0;
fa->__actions = op;
return 0;
}
int posix_spawn_file_actions_addopen(posix_spawn_file_actions_t *restrict fa, int fd, const char *restrict path, int flags, mode_t mode) {
if (fd < 0) return EBADF;
struct fdop *op = malloc(sizeof *op + strlen(path) + 1);
if (!op) return ENOMEM;
op->cmd = FDOP_OPEN;
op->fd = fd;
op->oflag = flags;
op->mode = mode;
strcpy(op->path, path);
if ((op->next = fa->__actions)) op->next->prev = op;
op->prev = 0;
fa->__actions = op;
return 0;
}
int posix_spawn_file_actions_destroy(posix_spawn_file_actions_t *fa) {
struct fdop *op = fa->__actions, *next;
while (op) {
next = op->next;
free(op);
op = next;
}
return 0;
}
#if defined (__cplusplus)
}
#endif

View File

@ -0,0 +1,32 @@
#ifndef _SPAWN_H
#define _SPAWN_H
#ifdef __cplusplus
extern "C" {
#endif
#include <features.h>
typedef struct {
int __flags;
pid_t __pgrp;
sigset_t __def, __mask;
int __prio, __pol;
void *__fn;
char __pad[64-sizeof(void *)];
} posix_spawnattr_t;
typedef struct {
int __pad0[2];
void *__actions;
int __pad[16];
} posix_spawn_file_actions_t;
int posix_spawn(pid_t *__restrict, const char *__restrict, const posix_spawn_file_actions_t *,
const posix_spawnattr_t *__restrict, char *const *__restrict, char *const *__restrict);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -92,6 +92,8 @@ add_contrib (openldap-cmake openldap)
add_contrib (grpc-cmake grpc)
add_contrib (msgpack-c-cmake msgpack-c)
add_contrib (corrosion-cmake corrosion)
if (ENABLE_FUZZING)
add_contrib (libprotobuf-mutator-cmake libprotobuf-mutator)
endif()

1
contrib/corrosion vendored Submodule

@ -0,0 +1 @@
Subproject commit d9dfdefaa3d9ec4ba1245c7070727359c65c7869

View File

@ -0,0 +1,46 @@
if (NOT ENABLE_LIBRARIES)
set(DEFAULT_ENABLE_RUST FALSE)
elseif((CMAKE_TOOLCHAIN_FILE MATCHES "darwin") AND (CMAKE_TOOLCHAIN_FILE MATCHES "aarch64"))
message(STATUS "Rust is not available on aarch64-apple-darwin")
set(DEFAULT_ENABLE_RUST FALSE)
else()
list (APPEND CMAKE_MODULE_PATH "${ClickHouse_SOURCE_DIR}/contrib/corrosion/cmake")
find_package(Rust)
set(DEFAULT_ENABLE_RUST ${Rust_FOUND})
endif()
option(ENABLE_RUST "Enable rust" ${DEFAULT_ENABLE_RUST})
message(STATUS ${ENABLE_RUST})
if(NOT ENABLE_RUST)
message(STATUS "Not using rust")
return()
endif()
message(STATUS "Checking Rust toolchain for current target")
if(CMAKE_TOOLCHAIN_FILE MATCHES "linux/toolchain-x86_64")
set(Rust_CARGO_TARGET "x86_64-unknown-linux-gnu")
endif()
if(CMAKE_TOOLCHAIN_FILE MATCHES "linux/toolchain-aarch64")
set(Rust_CARGO_TARGET "aarch64-unknown-linux-gnu")
endif()
if((CMAKE_TOOLCHAIN_FILE MATCHES "darwin") AND (CMAKE_TOOLCHAIN_FILE MATCHES "x86_64"))
set(Rust_CARGO_TARGET "x86_64-apple-darwin")
endif()
if((CMAKE_TOOLCHAIN_FILE MATCHES "freebsd") AND (CMAKE_TOOLCHAIN_FILE MATCHES "x86_64"))
set(Rust_CARGO_TARGET "x86_64-unknown-freebsd")
endif()
if(CMAKE_TOOLCHAIN_FILE MATCHES "ppc64le")
set(Rust_CARGO_TARGET "powerpc64le-unknown-linux-gnu")
endif()
message(STATUS "Switched Rust target to ${Rust_CARGO_TARGET}")
# Define function corrosion_import_crate()
include ("${ClickHouse_SOURCE_DIR}/contrib/corrosion/cmake/Corrosion.cmake")

@ -1 +1 @@
Subproject commit 6ca2b5b3927226f6bcf6c656f502ff5d012ad9b6
Subproject commit dc972a767ff2e9488d96cb2a6e67de160fbe15a7

View File

@ -14,7 +14,7 @@ endif()
# TODO: Enable shared library build
# TODO: Enable compilation on AArch64
set (LLVM_VERSION "13.0.0bundled")
set (LLVM_VERSION "14.0.0bundled")
set (LLVM_INCLUDE_DIRS
"${ClickHouse_SOURCE_DIR}/contrib/llvm-project/llvm/include"
"${ClickHouse_BINARY_DIR}/contrib/llvm-project/llvm/include"

View File

@ -3,6 +3,33 @@
ARG FROM_TAG=latest
FROM clickhouse/test-util:$FROM_TAG
# Rust toolchain and libraries
ENV RUSTUP_HOME=/rust/rustup
ENV CARGO_HOME=/rust/cargo
RUN curl https://sh.rustup.rs -sSf | bash -s -- -y
RUN chmod 777 -R /rust
ENV PATH="/rust/cargo/env:${PATH}"
ENV PATH="/rust/cargo/bin:${PATH}"
RUN rustup target add aarch64-unknown-linux-gnu && \
rustup target add x86_64-apple-darwin && \
rustup target add x86_64-unknown-freebsd && \
rustup target add aarch64-apple-darwin && \
rustup target add powerpc64le-unknown-linux-gnu
RUN apt-get install \
gcc-aarch64-linux-gnu \
build-essential \
libc6 \
libc6-dev \
libc6-dev-arm64-cross \
--yes
# Install CMake 3.20+ for Rust compilation
# Used https://askubuntu.com/a/1157132 as reference
RUN apt purge cmake --yes
RUN wget -O - https://apt.kitware.com/keys/kitware-archive-latest.asc 2>/dev/null | gpg --dearmor - | tee /etc/apt/trusted.gpg.d/kitware.gpg >/dev/null
RUN apt-add-repository 'deb https://apt.kitware.com/ubuntu/ focal main'
RUN apt update && apt install cmake --yes
ENV CC=clang-${LLVM_VERSION}
ENV CXX=clang++-${LLVM_VERSION}

View File

@ -19,6 +19,12 @@ RUN apt-get update \
pv \
--yes --no-install-recommends
# Install CMake 3.20+ for Rust compilation
RUN apt purge cmake --yes
RUN wget -O - https://apt.kitware.com/keys/kitware-archive-latest.asc 2>/dev/null | gpg --dearmor - | tee /etc/apt/trusted.gpg.d/kitware.gpg >/dev/null
RUN apt-add-repository 'deb https://apt.kitware.com/ubuntu/ focal main'
RUN apt update && apt install cmake --yes
RUN pip3 install numpy scipy pandas Jinja2
ARG odbc_driver_url="https://github.com/ClickHouse/clickhouse-odbc/releases/download/v1.1.4.20200302/clickhouse-odbc-1.1.4-Linux.tar.gz"

View File

@ -157,7 +157,6 @@ function run_cmake
"-DUSE_UNWIND=1"
"-DENABLE_NURAFT=1"
"-DENABLE_JEMALLOC=1"
"-DENABLE_REPLXX=1"
)
export CCACHE_DIR="$FASTTEST_WORKSPACE/ccache"

View File

@ -35,6 +35,8 @@ RUN apt-get update \
tzdata \
vim \
wget \
rustc \
cargo \
&& pip3 --no-cache-dir install 'clickhouse-driver==0.2.1' scipy \
&& apt-get purge --yes python3-dev g++ \
&& apt-get autoremove --yes \

View File

@ -35,12 +35,13 @@ RUN apt-get update -y \
tree \
unixodbc \
wget \
rustc \
cargo \
zstd \
file \
pv \
&& apt-get clean
RUN pip3 install numpy scipy pandas Jinja2
RUN mkdir -p /tmp/clickhouse-odbc-tmp \

View File

@ -6,16 +6,32 @@ sidebar_label: MySQL Interface
# MySQL Interface
ClickHouse supports MySQL wire protocol. It can be enabled by [mysql_port](../operations/server-configuration-parameters/settings.md#server_configuration_parameters-mysql_port) setting in configuration file:
ClickHouse supports MySQL wire protocol. To enable the MySQL wire protocol, add the [mysql_port](../operations/server-configuration-parameters/settings.md#server_configuration_parameters-mysql_port) setting to your server's configuration file. For example, you could define the port in a new XML file in your `config.d` folder:
``` xml
<mysql_port>9004</mysql_port>
<clickhouse>
<mysql_port>9004</mysql_port>
</clickhouse>
```
Example of connecting using command-line tool `mysql`:
Startup your ClickHouse server and look for a log message similar to the following that mentions Listening for MySQL compatibility protocol:
```
{} <Information> Application: Listening for MySQL compatibility protocol: 127.0.0.1:9004
```
## Connect mysql to ClickHouse
The following command demonstrates how to connect the MySQL client `mysql` to ClickHouse:
```bash
mysql --protocol tcp -h [hostname] -u [username] -P [port_number] [database_name]
```
For example:
``` bash
$ mysql --protocol tcp -u default -P 9004
$ mysql --protocol tcp -h 127.0.0.1 -u default -P 9004 default
```
Output if a connection succeeded:

View File

@ -10,7 +10,7 @@ Creates new [roles](../../../operations/access-rights.md#role-management). Role
Syntax:
``` sql
CREATE ROLE [IF NOT EXISTS | OR REPLACE] name1 [, name2 ...]
CREATE ROLE [IF NOT EXISTS | OR REPLACE] name1 [ON CLUSTER cluster_name1] [, name2 [ON CLUSTER cluster_name2] ...]
[SETTINGS variable [= value] [MIN [=] min_value] [MAX [=] max_value] [CONST|READONLY|WRITABLE|CHANGEABLE_IN_READONLY] | PROFILE 'profile_name'] [,...]
```

View File

@ -13,7 +13,7 @@ Creates a new view. Views can be [normal](#normal-view), [materialized](#materia
Syntax:
``` sql
CREATE [OR REPLACE] VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER] AS SELECT ...
CREATE [OR REPLACE] VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster_name] AS SELECT ...
```
Normal views do not store any data. They just perform a read from another table on each access. In other words, a normal view is nothing more than a saved query. When reading from a view, this saved query is used as a subquery in the [FROM](../../../sql-reference/statements/select/from.md) clause.

View File

@ -430,9 +430,9 @@ FROM
### Cumulative sum.
```sql
CREATE TABLE events
CREATE TABLE warehouse
(
`metric` String,
`item` String,
`ts` DateTime,
`value` Float
)

View File

@ -11,7 +11,7 @@ sidebar_label: "Роль"
Синтаксис:
```sql
CREATE ROLE [IF NOT EXISTS | OR REPLACE] name1 [, name2 ...]
CREATE ROLE [IF NOT EXISTS | OR REPLACE] name1 [ON CLUSTER cluster_name1] [, name2 [ON CLUSTER cluster_name2] ...]
[SETTINGS variable [= value] [MIN [=] min_value] [MAX [=] max_value] [CONST|READONLY|WRITABLE|CHANGEABLE_IN_READONLY] | PROFILE 'profile_name'] [,...]
```

View File

@ -11,7 +11,7 @@ sidebar_label: "Представление"
## Обычные представления {#normal}
``` sql
CREATE [OR REPLACE] VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER] AS SELECT ...
CREATE [OR REPLACE] VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster_name] AS SELECT ...
```
Обычные представления не хранят никаких данных, они выполняют чтение данных из другой таблицы при каждом доступе. Другими словами, обычное представление — это не что иное, как сохраненный запрос. При чтении данных из представления этот сохраненный запрос используется как подзапрос в секции [FROM](../../../sql-reference/statements/select/from.md).

View File

@ -13,7 +13,7 @@ sidebar_label: VIEW
语法:
``` sql
CREATE [OR REPLACE] VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER] AS SELECT ...
CREATE [OR REPLACE] VIEW [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster_name] AS SELECT ...
```
普通视图不存储任何数据。 他们只是在每次访问时从另一个表执行读取。换句话说,普通视图只不过是一个保存的查询。 从视图中读取时,此保存的查询用作[FROM](../../../sql-reference/statements/select/from.md)子句中的子查询.

View File

@ -927,7 +927,11 @@ namespace
executable.string(), config.string(), pid_file.string());
if (!user.empty())
command = fmt::format("clickhouse su '{}' {}", user, command);
{
/// sudo respects limits in /etc/security/limits.conf e.g. open files,
/// that's why we are using it instead of the 'clickhouse su' tool.
command = fmt::format("sudo -u '{}' {}", user, command);
}
fmt::print("Will run {}\n", command);
executeScript(command, true);

4
rust/BLAKE3/CMakeLists.txt Executable file
View File

@ -0,0 +1,4 @@
corrosion_import_crate(MANIFEST_PATH Cargo.toml NO_STD)
target_include_directories(_ch_rust_blake3 INTERFACE include)
add_library(ch_rust::blake3 ALIAS _ch_rust_blake3)

92
rust/BLAKE3/Cargo.lock generated Normal file
View File

@ -0,0 +1,92 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "_ch_rust_blake3"
version = "0.1.0"
dependencies = [
"blake3",
"libc",
]
[[package]]
name = "arrayref"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544"
[[package]]
name = "arrayvec"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
[[package]]
name = "blake3"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "526c210b4520e416420759af363083471656e819a75e831b8d2c9d5a584f2413"
dependencies = [
"arrayref",
"arrayvec",
"cc",
"cfg-if",
"constant_time_eq",
"digest",
]
[[package]]
name = "cc"
version = "1.0.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "constant_time_eq"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
[[package]]
name = "digest"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
dependencies = [
"generic-array",
]
[[package]]
name = "generic-array"
version = "0.14.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bff49e947297f3312447abdca79f45f4738097cc82b06e72054d2223f601f1b9"
dependencies = [
"typenum",
"version_check",
]
[[package]]
name = "libc"
version = "0.2.132"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8371e4e5341c3a96db127eb2465ac681ced4c433e01dd0e938adbef26ba93ba5"
[[package]]
name = "typenum"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987"
[[package]]
name = "version_check"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"

13
rust/BLAKE3/Cargo.toml Normal file
View File

@ -0,0 +1,13 @@
[package]
name = "_ch_rust_blake3"
version = "0.1.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
blake3 = "1.2.0"
libc = "0.2.132"
[lib]
crate-type = ["staticlib"]

View File

@ -0,0 +1,17 @@
#ifndef BLAKE3_H
#define BLAKE3_H
#include <cstdint>
extern "C" {
char *blake3_apply_shim(const char *begin, uint32_t _size, uint8_t *out_char_data);
char *blake3_apply_shim_msan_compat(const char *begin, uint32_t size, uint8_t *out_char_data);
void blake3_free_char_pointer(char *ptr_to_free);
} // extern "C"
#endif /* BLAKE3_H */

55
rust/BLAKE3/src/lib.rs Normal file
View File

@ -0,0 +1,55 @@
extern crate blake3;
extern crate libc;
use std::ffi::{CStr, CString};
use std::os::raw::c_char;
use std::mem;
#[no_mangle]
pub unsafe extern "C" fn blake3_apply_shim(
begin: *const c_char,
_size: u32,
out_char_data: *mut u8,
) -> *mut c_char {
if begin.is_null() {
let err_str = CString::new("input was a null pointer").unwrap();
return err_str.into_raw();
}
let mut hasher = blake3::Hasher::new();
let input_bytes = CStr::from_ptr(begin);
let input_res = input_bytes.to_bytes();
hasher.update(input_res);
let mut reader = hasher.finalize_xof();
reader.fill(std::slice::from_raw_parts_mut(out_char_data, blake3::OUT_LEN));
std::ptr::null_mut()
}
#[no_mangle]
pub unsafe extern "C" fn blake3_apply_shim_msan_compat(
mut begin: *const c_char,
size: u32,
out_char_data: *mut u8,
) -> *mut c_char {
if begin.is_null() {
let err_str = CString::new("input was a null pointer").unwrap();
return err_str.into_raw();
}
libc::memset(out_char_data as *mut libc::c_void, 0, mem::size_of::<u8>());
let mut hasher = blake3::Hasher::new();
let mut vec = Vec::<u8>::new();
for _ in 0..size {
vec.push(*begin as u8);
begin = begin.add(1);
}
let input_res = vec.as_mut_slice();
hasher.update(input_res);
let mut reader = hasher.finalize_xof();
reader.fill(std::slice::from_raw_parts_mut(out_char_data, blake3::OUT_LEN));
std::ptr::null_mut()
}
// Freeing memory according to docs: https://doc.rust-lang.org/std/ffi/struct.CString.html#method.into_raw
#[no_mangle]
pub unsafe extern "C" fn blake3_free_char_pointer(ptr_to_free: *mut c_char) {
std::mem::drop(CString::from_raw(ptr_to_free));
}

1
rust/CMakeLists.txt Normal file
View File

@ -0,0 +1 @@
add_subdirectory (BLAKE3)

View File

@ -236,15 +236,8 @@ public:
if constexpr (result_is_nullable)
{
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
#endif
auto * aggregate_data_is_null_dst_value = b.CreateLoad(aggregate_data_dst_ptr);
auto * aggregate_data_is_null_src_value = b.CreateLoad(aggregate_data_src_ptr);
#ifdef __clang__
#pragma clang diagnostic pop
#endif
auto * aggregate_data_is_null_dst_value = b.CreateLoad(aggregate_data_dst_ptr->getType()->getPointerElementType(), aggregate_data_dst_ptr);
auto * aggregate_data_is_null_src_value = b.CreateLoad(aggregate_data_src_ptr->getType()->getPointerElementType(), aggregate_data_src_ptr);
auto * is_src_null = nativeBoolCast(b, std::make_shared<DataTypeUInt8>(), aggregate_data_is_null_src_value);
auto * is_null_result_value = b.CreateSelect(is_src_null, llvm::ConstantInt::get(b.getInt8Ty(), 1), aggregate_data_is_null_dst_value);

View File

@ -279,7 +279,7 @@ void ColumnAggregateFunction::insertRangeFrom(const IColumn & from, size_t start
size_t end = start + length;
for (size_t i = start; i < end; ++i)
insertFrom(from, i);
insertFromWithOwnership(from, i);
}
else
{
@ -448,7 +448,7 @@ void ColumnAggregateFunction::insertData(const char * pos, size_t /*length*/)
data.push_back(*reinterpret_cast<const AggregateDataPtr *>(pos));
}
void ColumnAggregateFunction::insertFrom(const IColumn & from, size_t n)
void ColumnAggregateFunction::insertFromWithOwnership(const IColumn & from, size_t n)
{
/// Must create new state of aggregate function and take ownership of it,
/// because ownership of states of aggregate function cannot be shared for individual rows,
@ -458,6 +458,11 @@ void ColumnAggregateFunction::insertFrom(const IColumn & from, size_t n)
insertMergeFrom(from, n);
}
void ColumnAggregateFunction::insertFrom(const IColumn & from, size_t n)
{
insertRangeFrom(from, n, 1);
}
void ColumnAggregateFunction::insertFrom(ConstAggregateDataPtr place)
{
ensureOwnership();

View File

@ -98,6 +98,8 @@ private:
ColumnAggregateFunction(const ColumnAggregateFunction & src_);
void insertFromWithOwnership(const IColumn & from, size_t n);
public:
~ColumnAggregateFunction() override;

View File

@ -1,11 +1,11 @@
#pragma once
#include <base/types.h>
#include <future>
#include <memory>
#include <vector>
#include <Common/ZooKeeper/IKeeper.h>
#include <base/types.h>
#include <Poco/Event.h>
#include <Common/ZooKeeper/IKeeper.h>
namespace zkutil
@ -34,4 +34,9 @@ Coordination::RequestPtr makeRemoveRequest(const std::string & path, int version
Coordination::RequestPtr makeSetRequest(const std::string & path, const std::string & data, int version);
Coordination::RequestPtr makeCheckRequest(const std::string & path, int version);
Coordination::RequestPtr makeGetRequest(const std::string & path);
Coordination::RequestPtr
makeListRequest(const std::string & path, Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
Coordination::RequestPtr makeSimpleListRequest(const std::string & path);
Coordination::RequestPtr makeExistsRequest(const std::string & path);
}

View File

@ -6,6 +6,8 @@
#include <functional>
#include <filesystem>
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/randomSeed.h>
#include <base/find_symbols.h>
#include <base/sort.h>
@ -989,6 +991,24 @@ std::future<Coordination::ListResponse> ZooKeeper::asyncTryGetChildrenNoThrow(
return future;
}
std::future<Coordination::ListResponse>
ZooKeeper::asyncTryGetChildren(const std::string & path, Coordination::ListRequestType list_request_type)
{
auto promise = std::make_shared<std::promise<Coordination::ListResponse>>();
auto future = promise->get_future();
auto callback = [promise, path](const Coordination::ListResponse & response) mutable
{
if (response.error != Coordination::Error::ZOK && response.error != Coordination::Error::ZNONODE)
promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
else
promise->set_value(response);
};
impl->list(path, list_request_type, std::move(callback), {});
return future;
}
std::future<Coordination::RemoveResponse> ZooKeeper::asyncRemove(const std::string & path, int32_t version)
{
auto promise = std::make_shared<std::promise<Coordination::RemoveResponse>>();
@ -1207,6 +1227,37 @@ Coordination::RequestPtr makeCheckRequest(const std::string & path, int version)
return request;
}
Coordination::RequestPtr makeGetRequest(const std::string & path)
{
auto request = std::make_shared<Coordination::GetRequest>();
request->path = path;
return request;
}
Coordination::RequestPtr makeListRequest(const std::string & path, Coordination::ListRequestType list_request_type)
{
// Keeper server that support MultiRead also support FilteredList
auto request = std::make_shared<Coordination::ZooKeeperFilteredListRequest>();
request->path = path;
request->list_request_type = list_request_type;
return request;
}
Coordination::RequestPtr makeSimpleListRequest(const std::string & path)
{
auto request = std::make_shared<Coordination::ZooKeeperSimpleListRequest>();
request->path = path;
return request;
}
Coordination::RequestPtr makeExistsRequest(const std::string & path)
{
auto request = std::make_shared<Coordination::ZooKeeperExistsRequest>();
request->path = path;
return request;
}
std::string normalizeZooKeeperPath(std::string zookeeper_path, bool check_starts_with_slash, Poco::Logger * log)
{
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')

View File

@ -12,6 +12,7 @@
#include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <Common/ZooKeeper/ZooKeeperArgs.h>
#include <Common/thread_local_rng.h>
@ -72,6 +73,63 @@ struct RemoveException
using GetPriorityForLoadBalancing = DB::GetPriorityForLoadBalancing;
template <typename T>
concept ZooKeeperResponse = std::derived_from<T, Coordination::Response>;
template <ZooKeeperResponse ResponseType>
struct MultiReadResponses
{
template <typename TResponses>
explicit MultiReadResponses(TResponses responses_) : responses(std::move(responses_))
{}
size_t size() const
{
return std::visit([](auto && resp) { return resp.size(); }, responses);
}
ResponseType & operator[](size_t index)
{
return std::visit(
[&]<typename TResponses>(TResponses & resp) -> ResponseType &
{
if constexpr (std::same_as<TResponses, RegularResponses>)
return dynamic_cast<ResponseType &>(*resp[index]);
else
return resp[index];
},
responses);
}
private:
using RegularResponses = std::vector<Coordination::ResponsePtr>;
using FutureResponses = std::vector<std::future<ResponseType>>;
struct ResponsesWithFutures
{
ResponsesWithFutures(FutureResponses future_responses_) : future_responses(std::move(future_responses_))
{
cached_responses.resize(future_responses.size());
}
FutureResponses future_responses;
std::vector<std::optional<ResponseType>> cached_responses;
ResponseType & operator[](size_t index)
{
if (cached_responses[index].has_value())
return *cached_responses[index];
cached_responses[index] = future_responses[index].get();
return *cached_responses[index];
}
size_t size() const { return future_responses.size(); }
};
std::variant<RegularResponses, ResponsesWithFutures> responses;
};
/// ZooKeeper session. The interface is substantially different from the usual libzookeeper API.
///
/// Poco::Event objects are used for watches. The event is set only once on the first
@ -89,7 +147,6 @@ public:
ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_ = nullptr);
/** Config of the form:
<zookeeper>
<node>
@ -160,17 +217,64 @@ public:
bool exists(const std::string & path, Coordination::Stat * stat = nullptr, const EventPtr & watch = nullptr);
bool existsWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
using MultiExistsResponse = MultiReadResponses<Coordination::ExistsResponse>;
template <typename TIter>
MultiExistsResponse exists(TIter start, TIter end)
{
return multiRead<Coordination::ExistsResponse, true>(
start, end, zkutil::makeExistsRequest, [&](const auto & path) { return asyncExists(path); });
}
MultiExistsResponse exists(const std::vector<std::string> & paths)
{
return exists(paths.begin(), paths.end());
}
std::string get(const std::string & path, Coordination::Stat * stat = nullptr, const EventPtr & watch = nullptr);
std::string getWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
using MultiGetResponse = MultiReadResponses<Coordination::GetResponse>;
template <typename TIter>
MultiGetResponse get(TIter start, TIter end)
{
return multiRead<Coordination::GetResponse, false>(
start, end, zkutil::makeGetRequest, [&](const auto & path) { return asyncGet(path); });
}
MultiGetResponse get(const std::vector<std::string> & paths)
{
return get(paths.begin(), paths.end());
}
/// Doesn't not throw in the following cases:
/// * The node doesn't exist. Returns false in this case.
bool tryGet(const std::string & path, std::string & res, Coordination::Stat * stat = nullptr, const EventPtr & watch = nullptr,
bool tryGet(
const std::string & path,
std::string & res,
Coordination::Stat * stat = nullptr,
const EventPtr & watch = nullptr,
Coordination::Error * code = nullptr);
bool tryGetWatch(const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback,
bool tryGetWatch(
const std::string & path,
std::string & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback,
Coordination::Error * code = nullptr);
template <typename TIter>
MultiGetResponse tryGet(TIter start, TIter end)
{
return multiRead<Coordination::GetResponse, true>(
start, end, zkutil::makeGetRequest, [&](const auto & path) { return asyncTryGet(path); });
}
MultiGetResponse tryGet(const std::vector<std::string> & paths)
{
return tryGet(paths.begin(), paths.end());
}
void set(const std::string & path, const std::string & data,
int32_t version = -1, Coordination::Stat * stat = nullptr);
@ -193,18 +297,58 @@ public:
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
using MultiGetChildrenResponse = MultiReadResponses<Coordination::ListResponse>;
template <typename TIter>
MultiGetChildrenResponse
getChildren(TIter start, TIter end, Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL)
{
return multiRead<Coordination::ListResponse, false>(
start,
end,
[list_request_type](const auto & path) { return zkutil::makeListRequest(path, list_request_type); },
[&](const auto & path) { return asyncGetChildren(path, {}, list_request_type); });
}
MultiGetChildrenResponse
getChildren(const std::vector<std::string> & paths, Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL)
{
return getChildren(paths.begin(), paths.end(), list_request_type);
}
/// Doesn't not throw in the following cases:
/// * The node doesn't exist.
Coordination::Error tryGetChildren(const std::string & path, Strings & res,
Coordination::Error tryGetChildren(
const std::string & path,
Strings & res,
Coordination::Stat * stat = nullptr,
const EventPtr & watch = nullptr,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
Coordination::Error tryGetChildrenWatch(const std::string & path, Strings & res,
Coordination::Error tryGetChildrenWatch(
const std::string & path,
Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
template <typename TIter>
MultiGetChildrenResponse
tryGetChildren(TIter start, TIter end, Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL)
{
return multiRead<Coordination::ListResponse, true>(
start,
end,
[list_request_type](const auto & path) { return zkutil::makeListRequest(path, list_request_type); },
[&](const auto & path) { return asyncTryGetChildren(path, list_request_type); });
}
MultiGetChildrenResponse
tryGetChildren(const std::vector<std::string> & paths, Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL)
{
return tryGetChildren(paths.begin(), paths.end(), list_request_type);
}
/// Performs several operations in a transaction.
/// Throws on every error.
Coordination::Responses multi(const Coordination::Requests & requests);
@ -327,6 +471,12 @@ public:
/// * The node doesn't exist
FutureGet asyncTryGet(const std::string & path);
/// Doesn't throw in the following cases:
/// * The node doesn't exist
FutureGetChildren asyncTryGetChildren(
const std::string & path,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
void finalize(const String & reason);
void setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_);
@ -354,6 +504,46 @@ private:
Coordination::Error existsImpl(const std::string & path, Coordination::Stat * stat_, Coordination::WatchCallback watch_callback);
Coordination::Error syncImpl(const std::string & path, std::string & returned_path);
using RequestFactory = std::function<Coordination::RequestPtr(const std::string &)>;
template <typename TResponse>
using AsyncFunction = std::function<std::future<TResponse>(const std::string &)>;
template <typename TResponse, bool try_multi, typename TIter>
MultiReadResponses<TResponse> multiRead(TIter start, TIter end, RequestFactory request_factory, AsyncFunction<TResponse> async_fun)
{
if (getApiVersion() >= DB::KeeperApiVersion::WITH_MULTI_READ)
{
Coordination::Requests requests;
for (auto it = start; it != end; ++it)
requests.push_back(request_factory(*it));
if constexpr (try_multi)
{
Coordination::Responses responses;
tryMulti(requests, responses);
return MultiReadResponses<TResponse>{std::move(responses)};
}
else
{
auto responses = multi(requests);
return MultiReadResponses<TResponse>{std::move(responses)};
}
}
auto responses_size = std::distance(start, end);
std::vector<std::future<TResponse>> future_responses;
if (responses_size == 0)
return MultiReadResponses<TResponse>(std::move(future_responses));
future_responses.reserve(responses_size);
for (auto it = start; it != end; ++it)
future_responses.push_back(async_fun(*it));
return MultiReadResponses<TResponse>{std::move(future_responses)};
}
std::unique_ptr<Coordination::IKeeper> impl;
ZooKeeperArgs args;

View File

@ -337,6 +337,15 @@ void ZooKeeperListResponse::writeImpl(WriteBuffer & out) const
Coordination::write(stat, out);
}
void ZooKeeperSimpleListResponse::readImpl(ReadBuffer & in)
{
Coordination::read(names, in);
}
void ZooKeeperSimpleListResponse::writeImpl(WriteBuffer & out) const
{
Coordination::write(names, out);
}
void ZooKeeperSetACLRequest::writeImpl(WriteBuffer & out) const
{
@ -426,16 +435,29 @@ void ZooKeeperErrorResponse::writeImpl(WriteBuffer & out) const
Coordination::write(error, out);
}
void ZooKeeperMultiRequest::checkOperationType(OperationType type)
{
chassert(!operation_type.has_value() || *operation_type == type);
operation_type = type;
}
OpNum ZooKeeperMultiRequest::getOpNum() const
{
return !operation_type.has_value() || *operation_type == OperationType::Write ? OpNum::Multi : OpNum::MultiRead;
}
ZooKeeperMultiRequest::ZooKeeperMultiRequest(const Requests & generic_requests, const ACLs & default_acls)
{
/// Convert nested Requests to ZooKeeperRequests.
/// Note that deep copy is required to avoid modifying path in presence of chroot prefix.
requests.reserve(generic_requests.size());
using enum OperationType;
for (const auto & generic_request : generic_requests)
{
if (const auto * concrete_request_create = dynamic_cast<const CreateRequest *>(generic_request.get()))
{
checkOperationType(Write);
auto create = std::make_shared<ZooKeeperCreateRequest>(*concrete_request_create);
if (create->acls.empty())
create->acls = default_acls;
@ -443,16 +465,39 @@ ZooKeeperMultiRequest::ZooKeeperMultiRequest(const Requests & generic_requests,
}
else if (const auto * concrete_request_remove = dynamic_cast<const RemoveRequest *>(generic_request.get()))
{
checkOperationType(Write);
requests.push_back(std::make_shared<ZooKeeperRemoveRequest>(*concrete_request_remove));
}
else if (const auto * concrete_request_set = dynamic_cast<const SetRequest *>(generic_request.get()))
{
checkOperationType(Write);
requests.push_back(std::make_shared<ZooKeeperSetRequest>(*concrete_request_set));
}
else if (const auto * concrete_request_check = dynamic_cast<const CheckRequest *>(generic_request.get()))
{
checkOperationType(Write);
requests.push_back(std::make_shared<ZooKeeperCheckRequest>(*concrete_request_check));
}
else if (const auto * concrete_request_get = dynamic_cast<const GetRequest *>(generic_request.get()))
{
checkOperationType(Read);
requests.push_back(std::make_shared<ZooKeeperGetRequest>(*concrete_request_get));
}
else if (const auto * concrete_request_exists = dynamic_cast<const ExistsRequest *>(generic_request.get()))
{
checkOperationType(Read);
requests.push_back(std::make_shared<ZooKeeperExistsRequest>(*concrete_request_exists));
}
else if (const auto * concrete_request_simple_list = dynamic_cast<const ZooKeeperSimpleListRequest *>(generic_request.get()))
{
checkOperationType(Read);
requests.push_back(std::make_shared<ZooKeeperSimpleListRequest>(*concrete_request_simple_list));
}
else if (const auto * concrete_request_list = dynamic_cast<const ZooKeeperFilteredListRequest *>(generic_request.get()))
{
checkOperationType(Read);
requests.push_back(std::make_shared<ZooKeeperFilteredListRequest>(*concrete_request_list));
}
else
throw Exception("Illegal command as part of multi ZooKeeper request", Error::ZBADARGUMENTS);
}
@ -526,8 +571,7 @@ std::string ZooKeeperMultiRequest::toStringImpl() const
bool ZooKeeperMultiRequest::isReadRequest() const
{
/// Possibly we can do better
return false;
return getOpNum() == OpNum::MultiRead;
}
void ZooKeeperMultiResponse::readImpl(ReadBuffer & in)
@ -622,8 +666,18 @@ ZooKeeperResponsePtr ZooKeeperExistsRequest::makeResponse() const { return setTi
ZooKeeperResponsePtr ZooKeeperGetRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperGetResponse>()); }
ZooKeeperResponsePtr ZooKeeperSetRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSetResponse>()); }
ZooKeeperResponsePtr ZooKeeperListRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperListResponse>()); }
ZooKeeperResponsePtr ZooKeeperSimpleListRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSimpleListResponse>()); }
ZooKeeperResponsePtr ZooKeeperCheckRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperCheckResponse>()); }
ZooKeeperResponsePtr ZooKeeperMultiRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperMultiResponse>(requests)); }
ZooKeeperResponsePtr ZooKeeperMultiRequest::makeResponse() const
{
std::shared_ptr<ZooKeeperMultiResponse> response;
if (getOpNum() == OpNum::Multi)
response = std::make_shared<ZooKeeperMultiWriteResponse>(requests);
else
response = std::make_shared<ZooKeeperMultiReadResponse>(requests);
return setTime(std::move(response));
}
ZooKeeperResponsePtr ZooKeeperCloseRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperCloseResponse>()); }
ZooKeeperResponsePtr ZooKeeperSetACLRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSetACLResponse>()); }
ZooKeeperResponsePtr ZooKeeperGetACLRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperGetACLResponse>()); }
@ -873,6 +927,12 @@ void registerZooKeeperRequest(ZooKeeperRequestFactory & factory)
{
auto res = std::make_shared<RequestT>();
res->request_created_time_ns = clock_gettime_ns();
if constexpr (num == OpNum::MultiRead)
res->operation_type = ZooKeeperMultiRequest::OperationType::Read;
else if constexpr (num == OpNum::Multi)
res->operation_type = ZooKeeperMultiRequest::OperationType::Write;
return res;
});
}
@ -892,6 +952,7 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory()
registerZooKeeperRequest<OpNum::List, ZooKeeperListRequest>(*this);
registerZooKeeperRequest<OpNum::Check, ZooKeeperCheckRequest>(*this);
registerZooKeeperRequest<OpNum::Multi, ZooKeeperMultiRequest>(*this);
registerZooKeeperRequest<OpNum::MultiRead, ZooKeeperMultiRequest>(*this);
registerZooKeeperRequest<OpNum::SessionID, ZooKeeperSessionIDRequest>(*this);
registerZooKeeperRequest<OpNum::GetACL, ZooKeeperGetACLRequest>(*this);
registerZooKeeperRequest<OpNum::SetACL, ZooKeeperSetACLRequest>(*this);

View File

@ -257,6 +257,9 @@ struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse
struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest
{
ZooKeeperExistsRequest() = default;
explicit ZooKeeperExistsRequest(const ExistsRequest & base) : ExistsRequest(base) {}
OpNum getOpNum() const override { return OpNum::Exists; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
@ -281,6 +284,9 @@ struct ZooKeeperExistsResponse final : ExistsResponse, ZooKeeperResponse
struct ZooKeeperGetRequest final : GetRequest, ZooKeeperRequest
{
ZooKeeperGetRequest() = default;
explicit ZooKeeperGetRequest(const GetRequest & base) : GetRequest(base) {}
OpNum getOpNum() const override { return OpNum::Get; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
@ -333,6 +339,9 @@ struct ZooKeeperSetResponse final : SetResponse, ZooKeeperResponse
struct ZooKeeperListRequest : ListRequest, ZooKeeperRequest
{
ZooKeeperListRequest() = default;
explicit ZooKeeperListRequest(const ListRequest & base) : ListRequest(base) {}
OpNum getOpNum() const override { return OpNum::List; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
@ -346,6 +355,7 @@ struct ZooKeeperListRequest : ListRequest, ZooKeeperRequest
struct ZooKeeperSimpleListRequest final : ZooKeeperListRequest
{
OpNum getOpNum() const override { return OpNum::SimpleList; }
ZooKeeperResponsePtr makeResponse() const override;
};
struct ZooKeeperFilteredListRequest final : ZooKeeperListRequest
@ -373,7 +383,11 @@ struct ZooKeeperListResponse : ListResponse, ZooKeeperResponse
struct ZooKeeperSimpleListResponse final : ZooKeeperListResponse
{
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
OpNum getOpNum() const override { return OpNum::SimpleList; }
size_t bytesSize() const override { return ZooKeeperListResponse::bytesSize() - sizeof(stat); }
};
struct ZooKeeperCheckRequest final : CheckRequest, ZooKeeperRequest
@ -458,7 +472,7 @@ struct ZooKeeperGetACLResponse final : GetACLResponse, ZooKeeperResponse
struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest
{
OpNum getOpNum() const override { return OpNum::Multi; }
OpNum getOpNum() const override;
ZooKeeperMultiRequest() = default;
ZooKeeperMultiRequest(const Requests & generic_requests, const ACLs & default_acls);
@ -473,12 +487,20 @@ struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest
size_t bytesSize() const override { return MultiRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
void createLogElements(LogElements & elems) const override;
enum class OperationType : UInt8
{
Read,
Write
};
std::optional<OperationType> operation_type;
private:
void checkOperationType(OperationType type);
};
struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse
struct ZooKeeperMultiResponse : MultiResponse, ZooKeeperResponse
{
OpNum getOpNum() const override { return OpNum::Multi; }
explicit ZooKeeperMultiResponse(const Requests & requests)
{
responses.reserve(requests.size());
@ -501,6 +523,18 @@ struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse
void fillLogElements(LogElements & elems, size_t idx) const override;
};
struct ZooKeeperMultiWriteResponse final : public ZooKeeperMultiResponse
{
OpNum getOpNum() const override { return OpNum::Multi; }
using ZooKeeperMultiResponse::ZooKeeperMultiResponse;
};
struct ZooKeeperMultiReadResponse final : public ZooKeeperMultiResponse
{
OpNum getOpNum() const override { return OpNum::MultiRead; }
using ZooKeeperMultiResponse::ZooKeeperMultiResponse;
};
/// Fake internal coordination (keeper) response. Never received from client
/// and never send to client.
struct ZooKeeperSessionIDRequest final : ZooKeeperRequest

View File

@ -20,6 +20,7 @@ static const std::unordered_set<int32_t> VALID_OPERATIONS =
static_cast<int32_t>(OpNum::List),
static_cast<int32_t>(OpNum::Check),
static_cast<int32_t>(OpNum::Multi),
static_cast<int32_t>(OpNum::MultiRead),
static_cast<int32_t>(OpNum::Auth),
static_cast<int32_t>(OpNum::SessionID),
static_cast<int32_t>(OpNum::SetACL),
@ -53,6 +54,8 @@ std::string toString(OpNum op_num)
return "Check";
case OpNum::Multi:
return "Multi";
case OpNum::MultiRead:
return "MultiRead";
case OpNum::Sync:
return "Sync";
case OpNum::Heartbeat:

View File

@ -31,6 +31,7 @@ enum class OpNum : int32_t
List = 12,
Check = 13,
Multi = 14,
MultiRead = 22,
Auth = 100,
// CH Keeper specific operations

View File

@ -1,5 +1,7 @@
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperImpl.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Exception.h>
#include <Common/EventNotifier.h>
@ -14,6 +16,7 @@
#include <base/getThreadId.h>
#include <Common/config.h>
#include "Coordination/KeeperConstants.h"
#if USE_SSL
# include <Poco/Net/SecureStreamSocket.h>
@ -1298,6 +1301,9 @@ void ZooKeeper::multi(
{
ZooKeeperMultiRequest request(requests, default_acls);
if (request.getOpNum() == OpNum::MultiRead && keeper_api_version < Coordination::KeeperApiVersion::WITH_MULTI_READ)
throw Exception(Error::ZBADARGUMENTS, "MultiRead request type cannot be used because it's not supported by the server");
RequestInfo request_info;
request_info.request = std::make_shared<ZooKeeperMultiRequest>(std::move(request));
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const MultiResponse &>(response)); };

View File

@ -8,10 +8,11 @@ namespace DB
enum class KeeperApiVersion : uint8_t
{
ZOOKEEPER_COMPATIBLE = 0,
WITH_FILTERED_LIST
WITH_FILTERED_LIST,
WITH_MULTI_READ
};
inline constexpr auto current_keeper_api_version = KeeperApiVersion::WITH_FILTERED_LIST;
inline constexpr auto current_keeper_api_version = KeeperApiVersion::WITH_MULTI_READ;
const std::string keeper_system_path = "/keeper";
const std::string keeper_api_version_path = keeper_system_path + "/api_version";

View File

@ -1601,6 +1601,9 @@ struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestPr
struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestProcessor
{
using OperationType = Coordination::ZooKeeperMultiRequest::OperationType;
std::optional<OperationType> operation_type;
bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override
{
for (const auto & concrete_request : concrete_requests)
@ -1616,28 +1619,55 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
Coordination::ZooKeeperMultiRequest & request = dynamic_cast<Coordination::ZooKeeperMultiRequest &>(*zk_request);
concrete_requests.reserve(request.requests.size());
const auto check_operation_type = [&](OperationType type)
{
if (operation_type.has_value() && *operation_type != type)
throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Illegal mixing of read and write operations in multi request");
operation_type = type;
};
for (const auto & sub_request : request.requests)
{
auto sub_zk_request = std::dynamic_pointer_cast<Coordination::ZooKeeperRequest>(sub_request);
switch (sub_zk_request->getOpNum())
{
case Coordination::OpNum::Create:
check_operation_type(OperationType::Write);
concrete_requests.push_back(std::make_shared<KeeperStorageCreateRequestProcessor>(sub_zk_request));
break;
case Coordination::OpNum::Remove:
check_operation_type(OperationType::Write);
concrete_requests.push_back(std::make_shared<KeeperStorageRemoveRequestProcessor>(sub_zk_request));
break;
case Coordination::OpNum::Set:
check_operation_type(OperationType::Write);
concrete_requests.push_back(std::make_shared<KeeperStorageSetRequestProcessor>(sub_zk_request));
break;
case Coordination::OpNum::Check:
check_operation_type(OperationType::Write);
concrete_requests.push_back(std::make_shared<KeeperStorageCheckRequestProcessor>(sub_zk_request));
break;
case Coordination::OpNum::Get:
check_operation_type(OperationType::Read);
concrete_requests.push_back(std::make_shared<KeeperStorageGetRequestProcessor>(sub_zk_request));
break;
case Coordination::OpNum::Exists:
check_operation_type(OperationType::Read);
concrete_requests.push_back(std::make_shared<KeeperStorageExistsRequestProcessor>(sub_zk_request));
break;
case Coordination::OpNum::List:
case Coordination::OpNum::FilteredList:
case Coordination::OpNum::SimpleList:
check_operation_type(OperationType::Read);
concrete_requests.push_back(std::make_shared<KeeperStorageListRequestProcessor>(sub_zk_request));
break;
default:
throw DB::Exception(
ErrorCodes::BAD_ARGUMENTS, "Illegal command as part of multi ZooKeeper request {}", sub_zk_request->getOpNum());
}
}
assert(request.requests.empty() || operation_type.has_value());
}
std::vector<KeeperStorage::Delta>
@ -1652,7 +1682,8 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
if (!new_deltas.empty())
{
if (auto * error = std::get_if<KeeperStorage::ErrorDelta>(&new_deltas.back().operation))
if (auto * error = std::get_if<KeeperStorage::ErrorDelta>(&new_deltas.back().operation);
error && *operation_type == OperationType::Write)
{
storage.uncommitted_state.rollback(zxid);
response_errors.push_back(error->error);
@ -1699,8 +1730,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
for (size_t i = 0; i < concrete_requests.size(); ++i)
{
auto cur_response = concrete_requests[i]->process(storage, zxid);
response.responses[i] = cur_response;
response.responses[i] = concrete_requests[i]->process(storage, zxid);
storage.uncommitted_state.commit(zxid);
}
@ -1715,26 +1745,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
for (size_t i = 0; i < concrete_requests.size(); ++i)
{
auto cur_response = concrete_requests[i]->processLocal(storage, zxid);
response.responses[i] = cur_response;
if (cur_response->error != Coordination::Error::ZOK)
{
for (size_t j = 0; j <= i; ++j)
{
auto response_error = response.responses[j]->error;
response.responses[j] = std::make_shared<Coordination::ZooKeeperErrorResponse>();
response.responses[j]->error = response_error;
}
for (size_t j = i + 1; j < response.responses.size(); ++j)
{
response.responses[j] = std::make_shared<Coordination::ZooKeeperErrorResponse>();
response.responses[j]->error = Coordination::Error::ZRUNTIMEINCONSISTENCY;
}
return response_ptr;
}
response.responses[i] = concrete_requests[i]->processLocal(storage, zxid);
}
response.error = Coordination::Error::ZOK;
@ -1881,6 +1892,7 @@ KeeperStorageRequestProcessorsFactory::KeeperStorageRequestProcessorsFactory()
registerKeeperRequestProcessor<Coordination::OpNum::FilteredList, KeeperStorageListRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::Check, KeeperStorageCheckRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::Multi, KeeperStorageMultiRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::MultiRead, KeeperStorageMultiRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::SetACL, KeeperStorageSetACLRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::GetACL, KeeperStorageGetACLRequestProcessor>(*this);
}

View File

@ -261,8 +261,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, fallback_to_stale_replicas_for_distributed_queries, true, "Suppose max_replica_delay_for_distributed_queries is set and all replicas for the queried table are stale. If this setting is enabled, the query will be performed anyway, otherwise the error will be reported.", 0) \
M(UInt64, preferred_max_column_in_block_size_bytes, 0, "Limit on max column size in block while reading. Helps to decrease cache misses count. Should be close to L2 cache size.", 0) \
\
M(UInt64, parts_to_delay_insert, 150, "If the destination table contains at least that many active parts in a single partition, artificially slow down insert into table.", 0) \
M(UInt64, parts_to_throw_insert, 300, "If more than this number active parts in a single partition of the destination table, throw 'Too many parts ...' exception.", 0) \
M(UInt64, parts_to_delay_insert, 0, "If the destination table contains at least that many active parts in a single partition, artificially slow down insert into table.", 0) \
M(UInt64, parts_to_throw_insert, 0, "If more than this number active parts in a single partition of the destination table, throw 'Too many parts ...' exception.", 0) \
M(Bool, insert_distributed_sync, false, "If setting is enabled, insert query into distributed waits until data will be sent to all nodes in cluster.", 0) \
M(UInt64, insert_distributed_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.", 0) \
M(Int64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. If a ddl request has not been performed on all hosts, a response will contain a timeout error and a request will be executed in an async mode. Negative value means infinite. Zero means async mode.", 0) \

View File

@ -14,5 +14,5 @@ endif()
target_link_libraries (daemon PUBLIC loggers common PRIVATE clickhouse_common_io clickhouse_common_config ${EXECINFO_LIBRARIES})
if (TARGET ch_contrib::sentry)
target_link_libraries (daemon PRIVATE ch_contrib::sentry)
target_link_libraries (daemon PRIVATE ch_contrib::sentry dbms)
endif ()

View File

@ -33,6 +33,13 @@ list (APPEND PRIVATE_LIBS
divide_impl
)
if (TARGET ch_rust::blake3)
list (APPEND PUBLIC_LIBS
ch_rust::blake3
)
endif()
if (TARGET OpenSSL::Crypto)
list (APPEND PUBLIC_LIBS OpenSSL::Crypto)
endif()

View File

@ -41,5 +41,22 @@ REGISTER_FUNCTION(Hashing)
factory.registerFunction<FunctionXxHash64>();
factory.registerFunction<FunctionWyHash64>();
#if USE_BLAKE3
factory.registerFunction<FunctionBLAKE3>(
{
R"(
Calculates BLAKE3 hash string and returns the resulting set of bytes as FixedString.
This cryptographic hash-function is integrated into ClickHouse with BLAKE3 Rust library.
The function is rather fast and shows approximately two times faster performance compared to SHA-2, while generating hashes of the same length as SHA-256.
It returns a BLAKE3 hash as a byte array with type FixedString(32).
)",
Documentation::Examples{
{"hash", "SELECT hex(blake3('ABC'))"}},
Documentation::Categories{"Hash"}
},
FunctionFactory::CaseSensitive);
#endif
}
}

View File

@ -10,6 +10,10 @@
#include "config_functions.h"
#include "config_core.h"
#if USE_BLAKE3
# include <blake3.h>
#endif
#include <Common/SipHash.h>
#include <Common/typeid_cast.h>
#include <Common/HashTable/Hash.h>
@ -615,6 +619,32 @@ struct ImplXxHash64
static constexpr bool use_int_hash_for_pods = false;
};
#if USE_BLAKE3
struct ImplBLAKE3
{
static constexpr auto name = "blake3";
enum { length = 32 };
static void apply(const char * begin, const size_t size, unsigned char* out_char_data)
{
#if defined(MEMORY_SANITIZER)
auto err_msg = blake3_apply_shim_msan_compat(begin, size, out_char_data);
__msan_unpoison(out_char_data, length);
#else
auto err_msg = blake3_apply_shim(begin, size, out_char_data);
#endif
if (err_msg != nullptr)
{
auto err_st = std::string(err_msg);
blake3_free_char_pointer(err_msg);
throw Exception("Function returned error message: " + std::string(err_msg), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
}
};
#endif
template <typename Impl>
class FunctionStringHashFixedString : public IFunction
{
@ -1474,4 +1504,8 @@ using FunctionXxHash64 = FunctionAnyHash<ImplXxHash64>;
using FunctionWyHash64 = FunctionAnyHash<ImplWyHash64>;
#if USE_BLAKE3
using FunctionBLAKE3 = FunctionStringHashFixedString<ImplBLAKE3>;
#endif
}

View File

@ -8,5 +8,6 @@
#cmakedefine01 USE_H3
#cmakedefine01 USE_S2_GEOMETRY
#cmakedefine01 USE_FASTOPS
#cmakedefine01 USE_BLAKE3
#cmakedefine01 USE_NLP
#cmakedefine01 USE_VECTORSCAN

View File

@ -6,6 +6,7 @@
#include <Access/Common/AccessRightsElement.h>
#include <Common/typeid_cast.h>
#include <Parsers/ASTExpressionList.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Interpreters/processColumnTransformers.h>
@ -75,6 +76,9 @@ BlockIO InterpreterOptimizeQuery::execute()
}
}
if (auto * snapshot_data = dynamic_cast<MergeTreeData::SnapshotData *>(storage_snapshot->data.get()))
snapshot_data->parts = {};
table->optimize(query_ptr, metadata_snapshot, ast.partition, ast.final, ast.deduplicate, column_names, getContext());
return {};

View File

@ -19,9 +19,9 @@
#include <llvm/ExecutionEngine/SectionMemoryManager.h>
#include <llvm/ExecutionEngine/JITEventListener.h>
#include <llvm/MC/SubtargetFeature.h>
#include <llvm/MC/TargetRegistry.h>
#include <llvm/Support/DynamicLibrary.h>
#include <llvm/Support/Host.h>
#include <llvm/Support/TargetRegistry.h>
#include <llvm/Support/TargetSelect.h>
#include <llvm/Transforms/IPO/PassManagerBuilder.h>
#include <llvm/Support/SmallVectorMemoryBuffer.h>

View File

@ -873,15 +873,10 @@ CompiledSortDescriptionFunction compileSortDescription(
auto * lhs_column_data = b.CreatePointerCast(b.CreateExtractValue(lhs_column, {0}), column_native_type_pointer);
auto * lhs_column_null_data = column_type_is_nullable ? b.CreateExtractValue(lhs_column, {1}) : nullptr;
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
#endif
auto * ty_lhs_column_data = llvm::cast<llvm::PointerType>(lhs_column_data->getType()->getScalarType())->getElementType();
llvm::Value * lhs_value = b.CreateLoad(b.CreateInBoundsGEP(ty_lhs_column_data, lhs_column_data, lhs_index_arg));
#ifdef __clang__
#pragma clang diagnostic pop
#endif
llvm::Value * lhs_cib_gep = b.CreateInBoundsGEP(ty_lhs_column_data, lhs_column_data, lhs_index_arg);
llvm::Value * lhs_value = b.CreateLoad(lhs_cib_gep->getType()->getPointerElementType(), lhs_cib_gep);
if (lhs_column_null_data)
{
@ -896,15 +891,11 @@ CompiledSortDescriptionFunction compileSortDescription(
auto * rhs_column_data = b.CreatePointerCast(b.CreateExtractValue(rhs_column, {0}), column_native_type_pointer);
auto * rhs_column_null_data = column_type_is_nullable ? b.CreateExtractValue(rhs_column, {1}) : nullptr;
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
#endif
auto * ty_rhs_column_data = llvm::cast<llvm::PointerType>(rhs_column_data->getType()->getScalarType())->getElementType();
llvm::Value * rhs_value = b.CreateLoad(b.CreateInBoundsGEP(ty_rhs_column_data, rhs_column_data, rhs_index_arg));
#ifdef __clang__
#pragma clang diagnostic pop
#endif
llvm::Value * rhs_cib_gep = b.CreateInBoundsGEP(ty_rhs_column_data, rhs_column_data, rhs_index_arg);
llvm::Value * rhs_value = b.CreateLoad(rhs_cib_gep->getType()->getPointerElementType(), rhs_cib_gep);
if (rhs_column_null_data)
{
auto * ty_rhs_column_null_data = llvm::cast<llvm::PointerType>(rhs_column_null_data->getType()->getScalarType())->getElementType();

View File

@ -116,29 +116,29 @@ String TransactionLog::serializeTID(const TransactionID & tid)
void TransactionLog::loadEntries(Strings::const_iterator beg, Strings::const_iterator end)
{
std::vector<std::future<Coordination::GetResponse>> futures;
size_t entries_count = std::distance(beg, end);
if (!entries_count)
return;
String last_entry = *std::prev(end);
LOG_TRACE(log, "Loading {} entries from {}: {}..{}", entries_count, zookeeper_path_log, *beg, last_entry);
futures.reserve(entries_count);
std::vector<std::string> entry_paths;
entry_paths.reserve(entries_count);
for (auto it = beg; it != end; ++it)
futures.emplace_back(TSA_READ_ONE_THREAD(zookeeper)->asyncGet(fs::path(zookeeper_path_log) / *it));
entry_paths.emplace_back(fs::path(zookeeper_path_log) / *it);
auto entries = TSA_READ_ONE_THREAD(zookeeper)->get(entry_paths);
std::vector<std::pair<TIDHash, CSNEntry>> loaded;
loaded.reserve(entries_count);
auto it = beg;
for (size_t i = 0; i < entries_count; ++i, ++it)
{
auto res = futures[i].get();
auto res = entries[i];
CSN csn = deserializeCSN(*it);
TransactionID tid = deserializeTID(res.data);
loaded.emplace_back(tid.getHash(), CSNEntry{csn, tid});
LOG_TEST(log, "Got entry {} -> {}", tid, csn);
}
futures.clear();
NOEXCEPT_SCOPE_STRICT({
std::lock_guard lock{mutex};

View File

@ -83,6 +83,7 @@ NamesAndTypesList ZooKeeperLogElement::getNamesAndTypes()
{"List", static_cast<Int16>(Coordination::OpNum::List)},
{"Check", static_cast<Int16>(Coordination::OpNum::Check)},
{"Multi", static_cast<Int16>(Coordination::OpNum::Multi)},
{"MultiRead", static_cast<Int16>(Coordination::OpNum::MultiRead)},
{"Auth", static_cast<Int16>(Coordination::OpNum::Auth)},
{"SessionID", static_cast<Int16>(Coordination::OpNum::SessionID)},
{"FilteredList", static_cast<Int16>(Coordination::OpNum::FilteredList)},

View File

@ -127,6 +127,7 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
if (lazy_format)
{
chunk = lazy_format->getChunk(milliseconds);
data->rethrowExceptionIfHas();
return true;
}

View File

@ -110,7 +110,7 @@ namespace ErrorCodes
namespace
{
bool tryAddHeadersFromConfig(HTTPServerResponse & response, const Poco::Util::LayeredConfiguration & config)
bool tryAddHttpOptionHeadersFromConfig(HTTPServerResponse & response, const Poco::Util::LayeredConfiguration & config)
{
if (config.has("http_options_response"))
{
@ -138,7 +138,7 @@ bool tryAddHeadersFromConfig(HTTPServerResponse & response, const Poco::Util::La
void processOptionsRequest(HTTPServerResponse & response, const Poco::Util::LayeredConfiguration & config)
{
/// If can add some headers from config
if (tryAddHeadersFromConfig(response, config))
if (tryAddHttpOptionHeadersFromConfig(response, config))
{
response.setKeepAlive(false);
response.setStatusAndReason(HTTPResponse::HTTP_NO_CONTENT);
@ -774,16 +774,11 @@ void HTTPHandler::processQuery(
if (in_post_compressed && settings.http_native_compression_disable_checksumming_on_decompress)
static_cast<CompressedReadBuffer &>(*in_post_maybe_compressed).disableChecksumming();
/// Add CORS header if 'add_http_cors_header' setting is turned on send * in Access-Control-Allow-Origin,
/// or if config has http_options_response, which means that there
/// are some headers to be sent, and the client passed Origin header.
if (!request.get("Origin", "").empty())
{
if (config.has("http_options_response"))
tryAddHeadersFromConfig(response, config);
else if (settings.add_http_cors_header)
/// Add CORS header if 'add_http_cors_header' setting is turned on send * in Access-Control-Allow-Origin
/// Note that whether the header is added is determined by the settings, and we can only get the user settings after authentication.
/// Once the authentication fails, the header can't be added.
if (settings.add_http_cors_header && !request.get("Origin", "").empty() && !config.has("http_options_response"))
used_output.out->addHeaderCORS(true);
}
auto append_callback = [context = context] (ProgressCallback callback)
{
@ -971,6 +966,10 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
response.setContentType("text/plain; charset=UTF-8");
response.set("X-ClickHouse-Server-Display-Name", server_display_name);
if (!request.get("Origin", "").empty())
tryAddHttpOptionHeadersFromConfig(response, server.config());
/// For keep-alive to work.
if (request.getVersion() == HTTPServerRequest::HTTP_1_1)
response.setChunkedTransferEncoding(true);

View File

@ -330,7 +330,7 @@ bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrite
write_part_log(ExecutionStatus::fromCurrentException());
if (storage.getSettings()->detach_not_byte_identical_parts)
storage.forgetPartAndMoveToDetached(std::move(part), "merge-not-byte-identical");
storage.forcefullyMovePartToDetachedAndRemoveFromMemory(std::move(part), "merge-not-byte-identical");
else
storage.tryRemovePartImmediately(std::move(part));

View File

@ -3210,7 +3210,20 @@ void MergeTreeData::restoreAndActivatePart(const DataPartPtr & part, DataPartsLo
modifyPartState(part, DataPartState::Active);
}
void MergeTreeData::forgetPartAndMoveToDetached(const MergeTreeData::DataPartPtr & part_to_detach, const String & prefix, bool restore_covered)
void MergeTreeData::outdateBrokenPartAndCloneToDetached(const DataPartPtr & part_to_detach, const String & prefix)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
if (prefix.empty())
LOG_INFO(log, "Cloning part {} to {} and making it obsolete.", part_to_detach->data_part_storage->getPartDirectory(), part_to_detach->name);
else
LOG_INFO(log, "Cloning part {} to {}_{} and making it obsolete.", part_to_detach->data_part_storage->getPartDirectory(), prefix, part_to_detach->name);
part_to_detach->makeCloneInDetached(prefix, metadata_snapshot);
removePartsFromWorkingSet(NO_TRANSACTION_RAW, {part_to_detach}, true);
}
void MergeTreeData::forcefullyMovePartToDetachedAndRemoveFromMemory(const MergeTreeData::DataPartPtr & part_to_detach, const String & prefix, bool restore_covered)
{
if (prefix.empty())
LOG_INFO(log, "Renaming {} to {} and forgetting it.", part_to_detach->data_part_storage->getPartDirectory(), part_to_detach->name);
@ -3508,8 +3521,8 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr q
k_inactive = static_cast<ssize_t>(inactive_parts_count_in_partition) - static_cast<ssize_t>(settings->inactive_parts_to_delay_insert);
}
auto parts_to_delay_insert = query_settings.parts_to_delay_insert.changed ? query_settings.parts_to_delay_insert : settings->parts_to_delay_insert;
auto parts_to_throw_insert = query_settings.parts_to_throw_insert.changed ? query_settings.parts_to_throw_insert : settings->parts_to_throw_insert;
auto parts_to_delay_insert = query_settings.parts_to_delay_insert ? query_settings.parts_to_delay_insert : settings->parts_to_delay_insert;
auto parts_to_throw_insert = query_settings.parts_to_throw_insert ? query_settings.parts_to_throw_insert : settings->parts_to_throw_insert;
if (parts_count_in_partition >= parts_to_throw_insert)
{

View File

@ -596,7 +596,12 @@ public:
/// Renames the part to detached/<prefix>_<part> and removes it from data_parts,
//// so it will not be deleted in clearOldParts.
/// If restore_covered is true, adds to the working set inactive parts, which were merged into the deleted part.
void forgetPartAndMoveToDetached(const DataPartPtr & part, const String & prefix = "", bool restore_covered = false);
/// NOTE: This method is safe to use only for parts which nobody else holds (like on server start or for parts which was not committed).
/// For active parts it's unsafe because this method modifies fields of part (rename) while some other thread can try to read it.
void forcefullyMovePartToDetachedAndRemoveFromMemory(const DataPartPtr & part, const String & prefix = "", bool restore_covered = false);
/// Outdate broken part, set remove time to zero (remove as fast as possible) and make clone in detached directory.
void outdateBrokenPartAndCloneToDetached(const DataPartPtr & part, const String & prefix);
/// If the part is Obsolete and not used by anybody else, immediately delete it from filesystem and remove from memory.
void tryRemovePartImmediately(DataPartPtr && part);

View File

@ -218,7 +218,7 @@ bool MutateFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrit
write_part_log(ExecutionStatus::fromCurrentException());
if (storage.getSettings()->detach_not_byte_identical_parts)
storage.forgetPartAndMoveToDetached(std::move(new_part), "mutate-not-byte-identical");
storage.forcefullyMovePartToDetachedAndRemoveFromMemory(std::move(new_part), "mutate-not-byte-identical");
else
storage.tryRemovePartImmediately(std::move(new_part));

View File

@ -373,7 +373,7 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
LOG_ERROR(log, fmt::runtime(message));
/// Delete part locally.
storage.forgetPartAndMoveToDetached(part, "broken");
storage.outdateBrokenPartAndCloneToDetached(part, "broken");
/// Part is broken, let's try to find it and fetch.
searchForMissingPartAndFetchIfPossible(part_name, exists_in_zookeeper);
@ -390,7 +390,7 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
String message = "Unexpected part " + part_name + " in filesystem. Removing.";
LOG_ERROR(log, fmt::runtime(message));
storage.forgetPartAndMoveToDetached(part, "unexpected");
storage.outdateBrokenPartAndCloneToDetached(part, "unexpected");
return {part_name, false, message};
}
else

View File

@ -1977,9 +1977,12 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
if (!lock_holder_paths.empty())
{
Strings partitions = zookeeper->getChildren(fs::path(queue.zookeeper_path) / "block_numbers");
std::vector<std::future<Coordination::ListResponse>> lock_futures;
std::vector<std::string> paths;
paths.reserve(partitions.size());
for (const String & partition : partitions)
lock_futures.push_back(zookeeper->asyncGetChildren(fs::path(queue.zookeeper_path) / "block_numbers" / partition));
paths.push_back(fs::path(queue.zookeeper_path) / "block_numbers" / partition);
auto locks_children = zookeeper->getChildren(paths);
struct BlockInfoInZooKeeper
{
@ -1992,7 +1995,7 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
std::vector<BlockInfoInZooKeeper> block_infos;
for (size_t i = 0; i < partitions.size(); ++i)
{
Strings partition_block_numbers = lock_futures[i].get().names;
Strings partition_block_numbers = locks_children[i].names;
for (const String & entry : partition_block_numbers)
{
/// TODO: cache block numbers that are abandoned.

View File

@ -249,7 +249,7 @@ void ReplicatedMergeTreeRestartingThread::removeFailedQuorumParts()
if (part)
{
LOG_DEBUG(log, "Found part {} with failed quorum. Moving to detached. This shouldn't happen often.", part_name);
storage.forgetPartAndMoveToDetached(part, "noquorum");
storage.forcefullyMovePartToDetachedAndRemoveFromMemory(part, "noquorum");
storage.queue.removeFailedQuorumPart(part->info);
}
}

View File

@ -124,8 +124,6 @@ public:
{
auto zookeeper = storage.getClient();
Coordination::Requests requests;
auto keys_limit = storage.keysLimit();
size_t current_keys_num = 0;
@ -140,23 +138,25 @@ public:
current_keys_num = data_stat.numChildren;
}
std::vector<std::pair<const std::string *, std::future<Coordination::ExistsResponse>>> exist_responses;
for (const auto & [key, value] : new_values)
{
auto path = storage.fullPathForKey(key);
std::vector<std::string> key_paths;
key_paths.reserve(new_values.size());
for (const auto & [key, _] : new_values)
key_paths.push_back(storage.fullPathForKey(key));
exist_responses.push_back({&key, zookeeper->asyncExists(path)});
}
auto results = zookeeper->exists(key_paths);
for (auto & [key, response] : exist_responses)
Coordination::Requests requests;
requests.reserve(key_paths.size());
for (size_t i = 0; i < key_paths.size(); ++i)
{
if (response.get().error == Coordination::Error::ZOK)
auto key = fs::path(key_paths[i]).filename();
if (results[i].error == Coordination::Error::ZOK)
{
requests.push_back(zkutil::makeSetRequest(storage.fullPathForKey(*key), new_values[*key], -1));
requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], -1));
}
else
{
requests.push_back(zkutil::makeCreateRequest(storage.fullPathForKey(*key), new_values[*key], zkutil::CreateMode::Persistent));
requests.push_back(zkutil::makeCreateRequest(key_paths[i], new_values[key], zkutil::CreateMode::Persistent));
++new_keys_num;
}
}

View File

@ -1243,7 +1243,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
for (const DataPartPtr & part : unexpected_parts)
{
LOG_ERROR(log, "Renaming unexpected part {} to ignored_{}", part->name, part->name);
forgetPartAndMoveToDetached(part, "ignored", true);
forcefullyMovePartToDetachedAndRemoveFromMemory(part, "ignored", true);
}
}
@ -5017,7 +5017,7 @@ void StorageReplicatedMergeTree::restoreMetadataInZooKeeper()
if (part->getState() == DataPartState::Active)
active_parts_names.push_back(part->name);
forgetPartAndMoveToDetached(part);
forcefullyMovePartToDetachedAndRemoveFromMemory(part);
}
LOG_INFO(log, "Moved all parts to detached/");

View File

@ -19,6 +19,9 @@ endif()
if (TARGET ch_contrib::rdkafka)
set(USE_RDKAFKA 1)
endif()
if (TARGET ch_rust::blake3)
set(USE_BLAKE3 1)
endif()
if (TARGET OpenSSL::SSL)
set(USE_SSL 1)
endif()

View File

@ -24,7 +24,8 @@ def test_merge_and_part_corruption(started_cluster):
node1.query(
"""
CREATE TABLE replicated_mt(date Date, id UInt32, value Int32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/replicated_mt', '{replica}') ORDER BY id;
ENGINE = ReplicatedMergeTree('/clickhouse/tables/replicated_mt', '{replica}') ORDER BY id
SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1;
""".format(
replica=node1.name
)

View File

@ -40,7 +40,8 @@ def remove_part_from_disk(node, table, part_name):
def test_lost_part_same_replica(start_cluster):
for node in [node1, node2]:
node.query(
"CREATE TABLE mt0 (id UInt64, date Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/t', '{}') ORDER BY tuple() PARTITION BY date".format(
"CREATE TABLE mt0 (id UInt64, date Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/t', '{}') ORDER BY tuple() PARTITION BY date "
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1".format(
node.name
)
)
@ -73,7 +74,7 @@ def test_lost_part_same_replica(start_cluster):
node1.query("ATTACH TABLE mt0")
node1.query("SYSTEM START MERGES mt0")
res, err = node1.http_query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt0")
res, err = node1.query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt0")
print("result: ", res)
print("error: ", res)
@ -104,7 +105,8 @@ def test_lost_part_same_replica(start_cluster):
def test_lost_part_other_replica(start_cluster):
for node in [node1, node2]:
node.query(
"CREATE TABLE mt1 (id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t1', '{}') ORDER BY tuple()".format(
"CREATE TABLE mt1 (id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t1', '{}') ORDER BY tuple() "
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1".format(
node.name
)
)
@ -136,7 +138,7 @@ def test_lost_part_other_replica(start_cluster):
node1.query("CHECK TABLE mt1")
node2.query("SYSTEM START REPLICATION QUEUES")
res, err = node1.http_query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt1")
res, err = node1.query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt1")
print("result: ", res)
print("error: ", res)
@ -168,7 +170,8 @@ def test_lost_part_other_replica(start_cluster):
def test_lost_part_mutation(start_cluster):
for node in [node1, node2]:
node.query(
"CREATE TABLE mt2 (id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t2', '{}') ORDER BY tuple()".format(
"CREATE TABLE mt2 (id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t2', '{}') ORDER BY tuple() "
"SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1".format(
node.name
)
)
@ -196,7 +199,7 @@ def test_lost_part_mutation(start_cluster):
node1.query("CHECK TABLE mt2")
node1.query("SYSTEM START MERGES mt2")
res, err = node1.http_query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt2")
res, err = node1.query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt2")
print("result: ", res)
print("error: ", res)
@ -225,7 +228,9 @@ def test_lost_last_part(start_cluster):
for node in [node1, node2]:
node.query(
"CREATE TABLE mt3 (id UInt64, p String) ENGINE ReplicatedMergeTree('/clickhouse/tables/t3', '{}') "
"ORDER BY tuple() PARTITION BY p".format(node.name)
"ORDER BY tuple() PARTITION BY p SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1".format(
node.name
)
)
node1.query("SYSTEM STOP MERGES mt3")
@ -246,9 +251,6 @@ def test_lost_last_part(start_cluster):
node1.query("CHECK TABLE mt3")
node1.query("SYSTEM START MERGES mt3")
res, err = node1.http_query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt3")
print("result: ", res)
print("error: ", res)
for i in range(10):
result = node1.query("SELECT count() FROM system.replication_queue")
@ -259,6 +261,10 @@ def test_lost_last_part(start_cluster):
"DROP/DETACH PARTITION"
):
break
if node1.contains_in_log(
"Created empty part 8b8f0fede53df97513a9fb4cb19dc1e4_0_0_0 "
):
break
time.sleep(1)
else:
assert False, "Don't have required messages in node1 log"

View File

@ -0,0 +1,12 @@
<test>
<query>
WITH
(
SELECT bitmapBuild(groupArray(number))
FROM numbers(3000000)
) AS a,
[a, a, a] AS b
SELECT sum(bitmapCardinality(b[(number % 3) + 1]))
FROM numbers(10000)
</query>
</test>

View File

@ -8,6 +8,7 @@
<value>SHA224</value>
<value>SHA256</value>
<value>halfMD5</value>
<value>blake3</value>
</values>
</substitution>
<substitution>

View File

@ -9,7 +9,7 @@ Sum before DETACH PARTITION:
Sum after DETACH PARTITION:
0
system.detached_parts after DETACH PARTITION:
default not_partitioned all all_1_2_1 default 1 2 1
default not_partitioned all all_1_2_1 1 2 1
*** Partitioned by week ***
Parts before OPTIMIZE:
1999-12-27 19991227_1_1_0

View File

@ -1,4 +1,3 @@
-- Tags: no-s3-storage
SELECT '*** Not partitioned ***';
DROP TABLE IF EXISTS not_partitioned;
@ -19,7 +18,7 @@ ALTER TABLE not_partitioned DETACH PARTITION ID 'all';
SELECT 'Sum after DETACH PARTITION:';
SELECT sum(x) FROM not_partitioned;
SELECT 'system.detached_parts after DETACH PARTITION:';
SELECT * FROM system.detached_parts WHERE database = currentDatabase() AND table = 'not_partitioned';
SELECT system.detached_parts.* EXCEPT disk FROM system.detached_parts WHERE database = currentDatabase() AND table = 'not_partitioned';
DROP TABLE not_partitioned;

View File

@ -1,5 +1,6 @@
#!/usr/bin/env bash
# Tags: long, no-s3-storage
# no-s3 because read FileOpen metric
set -e

View File

@ -1,4 +1,5 @@
-- Tags: no-parallel, no-s3-storage
-- With s3 policy TTL TO DISK 'default' doesn't work (because we have no default, only 's3')
drop table if exists ttl;
set mutations_sync = 2;

View File

@ -1,7 +1,6 @@
#!/usr/bin/env bash
# Tags: long, no-replicated-database, no-s3-storage
# Tags: long, no-replicated-database
# Tag no-replicated-database: Fails due to additional replicas or shards
# Tag no-s3-storage: Merge assigned to replica 2, but replication queues are stopped for it
set -e

View File

@ -1,5 +1,5 @@
-- Tags: no-s3-storage
-- Temporary supressed
-- no-s3 because read FileOpen metric
DROP TABLE IF EXISTS nested;
SET flatten_nested = 0;

View File

@ -3,3 +3,8 @@
< Access-Control-Allow-Headers: origin, x-requested-with
< Access-Control-Allow-Methods: POST, GET, OPTIONS
< Access-Control-Max-Age: 86400
< HTTP/1.1 403 Forbidden
< Access-Control-Allow-Origin: *
< Access-Control-Allow-Headers: origin, x-requested-with
< Access-Control-Allow-Methods: POST, GET, OPTIONS
< Access-Control-Max-Age: 86400

View File

@ -6,3 +6,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# grep all fields, that should be set for CORS support (see CORS.xml)
$CLICKHOUSE_CURL "${CLICKHOUSE_URL}" -X OPTIONS -vs 2>&1 | grep -E "HTTP/1.1 204 No Content|Access-Control-Allow-Origin|Access-Control-Allow-Headers|Access-Control-Allow-Methods|Access-Control-Max-Age"
# grep all fields, that should be set for CORS support (see CORS.xml)
echo 'SELECT 1' | $CLICKHOUSE_CURL -X POST -H 'Origin: clickhouse-test' "${CLICKHOUSE_URL}&password=wrong_password" --data @- -vs 2>&1 | grep -E "HTTP/1.1 403 Forbidden|Access-Control-Allow-Origin|Access-Control-Allow-Headers|Access-Control-Allow-Methods|Access-Control-Max-Age"

View File

@ -1,4 +1,4 @@
-- Tags: no-parallel, no-fasttest, no-s3-storage
-- Tags: no-parallel, no-fasttest
DROP TABLE IF EXISTS t_s3_compressed_blocks;

View File

@ -0,0 +1,3 @@
0C673DA1EF75D2DAA895483138340F041881EA975D57C1435D487F454A111B74
007ED777B7A1CBA08D37BDA339EFABB42FA460D953070779903125B0F4D5FB5F
E25232688E2A4D3A55174DECB33815A27B2A92DC8839E3CDA456105C259BB071

View File

@ -0,0 +1,5 @@
-- Tags: no-fasttest
SELECT hex(blake3('test_1'));
SELECT hex(blake3('test_2'));
SELECT hex(blake3('test_3'));

View File

@ -13,15 +13,15 @@ export CLICKHOUSE_TEST_NAME
export CLICKHOUSE_TEST_ZOOKEEPER_PREFIX="${CLICKHOUSE_TEST_NAME}_${CLICKHOUSE_DATABASE}"
export CLICKHOUSE_TEST_UNIQUE_NAME="${CLICKHOUSE_TEST_NAME}_${CLICKHOUSE_DATABASE}"
[ -v CLICKHOUSE_CONFIG_CLIENT ] && CLICKHOUSE_CLIENT_OPT0+=" --config-file=${CLICKHOUSE_CONFIG_CLIENT} "
[ -v CLICKHOUSE_HOST ] && CLICKHOUSE_CLIENT_OPT0+=" --host=${CLICKHOUSE_HOST} "
[ -v CLICKHOUSE_PORT_TCP ] && CLICKHOUSE_CLIENT_OPT0+=" --port=${CLICKHOUSE_PORT_TCP} "
[ -v CLICKHOUSE_PORT_TCP ] && CLICKHOUSE_BENCHMARK_OPT0+=" --port=${CLICKHOUSE_PORT_TCP} "
[ -v CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL ] && CLICKHOUSE_CLIENT_OPT0+=" --send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL} "
[ -v CLICKHOUSE_DATABASE ] && CLICKHOUSE_CLIENT_OPT0+=" --database=${CLICKHOUSE_DATABASE} "
[ -v CLICKHOUSE_LOG_COMMENT ] && CLICKHOUSE_CLIENT_OPT0+=" --log_comment $(printf '%q' ${CLICKHOUSE_LOG_COMMENT}) "
[ -v CLICKHOUSE_DATABASE ] && CLICKHOUSE_BENCHMARK_OPT0+=" --database=${CLICKHOUSE_DATABASE} "
[ -v CLICKHOUSE_LOG_COMMENT ] && CLICKHOUSE_BENCHMARK_OPT0+=" --log_comment $(printf '%q' ${CLICKHOUSE_LOG_COMMENT}) "
[ -n "${CLICKHOUSE_CONFIG_CLIENT:-}" ] && CLICKHOUSE_CLIENT_OPT0+=" --config-file=${CLICKHOUSE_CONFIG_CLIENT} "
[ -n "${CLICKHOUSE_HOST:-}" ] && CLICKHOUSE_CLIENT_OPT0+=" --host=${CLICKHOUSE_HOST} "
[ -n "${CLICKHOUSE_PORT_TCP:-}" ] && CLICKHOUSE_CLIENT_OPT0+=" --port=${CLICKHOUSE_PORT_TCP} "
[ -n "${CLICKHOUSE_PORT_TCP:-}" ] && CLICKHOUSE_BENCHMARK_OPT0+=" --port=${CLICKHOUSE_PORT_TCP} "
[ -n "${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL:-}" ] && CLICKHOUSE_CLIENT_OPT0+=" --send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL} "
[ -n "${CLICKHOUSE_DATABASE:-}" ] && CLICKHOUSE_CLIENT_OPT0+=" --database=${CLICKHOUSE_DATABASE} "
[ -n "${CLICKHOUSE_LOG_COMMENT:-}" ] && CLICKHOUSE_CLIENT_OPT0+=" --log_comment $(printf '%q' ${CLICKHOUSE_LOG_COMMENT}) "
[ -n "${CLICKHOUSE_DATABASE:-}" ] && CLICKHOUSE_BENCHMARK_OPT0+=" --database=${CLICKHOUSE_DATABASE} "
[ -n "${CLICKHOUSE_LOG_COMMENT:-}" ] && CLICKHOUSE_BENCHMARK_OPT0+=" --log_comment $(printf '%q' ${CLICKHOUSE_LOG_COMMENT}) "
export CLICKHOUSE_BINARY=${CLICKHOUSE_BINARY:="clickhouse"}
# client
@ -81,20 +81,20 @@ export CLICKHOUSE_PORT_KEEPER=${CLICKHOUSE_PORT_KEEPER:="9181"}
export CLICKHOUSE_CLIENT_SECURE=${CLICKHOUSE_CLIENT_SECURE:=$(echo "${CLICKHOUSE_CLIENT}" | sed 's/--secure //' | sed 's/'"--port=${CLICKHOUSE_PORT_TCP}"'//g; s/$/'"--secure --accept-invalid-certificate --port=${CLICKHOUSE_PORT_TCP_SECURE}"'/g')}
# Add database and log comment to url params
if [ -v CLICKHOUSE_URL_PARAMS ]
if [ -n "${CLICKHOUSE_URL_PARAMS:-}" ]
then
export CLICKHOUSE_URL_PARAMS="${CLICKHOUSE_URL_PARAMS}&database=${CLICKHOUSE_DATABASE}"
else
export CLICKHOUSE_URL_PARAMS="database=${CLICKHOUSE_DATABASE}"
fi
# Note: missing url encoding of the log comment.
[ -v CLICKHOUSE_LOG_COMMENT ] && export CLICKHOUSE_URL_PARAMS="${CLICKHOUSE_URL_PARAMS}&log_comment=${CLICKHOUSE_LOG_COMMENT}"
[ -n "${CLICKHOUSE_LOG_COMMENT:-}" ] && export CLICKHOUSE_URL_PARAMS="${CLICKHOUSE_URL_PARAMS}&log_comment=${CLICKHOUSE_LOG_COMMENT}"
export CLICKHOUSE_URL=${CLICKHOUSE_URL:="${CLICKHOUSE_PORT_HTTP_PROTO}://${CLICKHOUSE_HOST}:${CLICKHOUSE_PORT_HTTP}/"}
export CLICKHOUSE_URL_HTTPS=${CLICKHOUSE_URL_HTTPS:="https://${CLICKHOUSE_HOST}:${CLICKHOUSE_PORT_HTTPS}/"}
# Add url params to url
if [ -v CLICKHOUSE_URL_PARAMS ]
if [ -n "${CLICKHOUSE_URL_PARAMS:-}" ]
then
export CLICKHOUSE_URL="${CLICKHOUSE_URL}?${CLICKHOUSE_URL_PARAMS}"
export CLICKHOUSE_URL_HTTPS="${CLICKHOUSE_URL_HTTPS}?${CLICKHOUSE_URL_PARAMS}"
@ -117,10 +117,10 @@ mkdir -p ${CLICKHOUSE_TMP}
export MYSQL_CLIENT_BINARY=${MYSQL_CLIENT_BINARY:="mysql"}
export MYSQL_CLIENT_CLICKHOUSE_USER=${MYSQL_CLIENT_CLICKHOUSE_USER:="default"}
# Avoids "Can't connect to local MySQL server through socket '/var/run/mysqld/mysqld.sock'" when connecting to localhost
[ -v CLICKHOUSE_HOST ] && MYSQL_CLIENT_OPT0+=" --protocol tcp "
[ -v CLICKHOUSE_HOST ] && MYSQL_CLIENT_OPT0+=" --host ${CLICKHOUSE_HOST} "
[ -v CLICKHOUSE_PORT_MYSQL ] && MYSQL_CLIENT_OPT0+=" --port ${CLICKHOUSE_PORT_MYSQL} "
[ -v CLICKHOUSE_DATABASE ] && MYSQL_CLIENT_OPT0+=" --database ${CLICKHOUSE_DATABASE} "
[ -n "${CLICKHOUSE_HOST:-}" ] && MYSQL_CLIENT_OPT0+=" --protocol tcp "
[ -n "${CLICKHOUSE_HOST:-}" ] && MYSQL_CLIENT_OPT0+=" --host ${CLICKHOUSE_HOST} "
[ -n "${CLICKHOUSE_PORT_MYSQL:-}" ] && MYSQL_CLIENT_OPT0+=" --port ${CLICKHOUSE_PORT_MYSQL} "
[ -n "${CLICKHOUSE_DATABASE:-}" ] && MYSQL_CLIENT_OPT0+=" --database ${CLICKHOUSE_DATABASE} "
MYSQL_CLIENT_OPT0+=" --user ${MYSQL_CLIENT_CLICKHOUSE_USER} "
export MYSQL_CLIENT_OPT="${MYSQL_CLIENT_OPT0:-} ${MYSQL_CLIENT_OPT:-}"
export MYSQL_CLIENT=${MYSQL_CLIENT:="$MYSQL_CLIENT_BINARY ${MYSQL_CLIENT_OPT:-}"}