Merge remote-tracking branch 'origin/master' into HEAD

This commit is contained in:
Alexander Kuzmenkov 2020-11-25 23:38:55 +03:00
commit 394b81ac46
171 changed files with 8648 additions and 726 deletions

6
.gitmodules vendored
View File

@ -44,6 +44,7 @@
[submodule "contrib/protobuf"]
path = contrib/protobuf
url = https://github.com/ClickHouse-Extras/protobuf.git
branch = v3.13.0.1
[submodule "contrib/boost"]
path = contrib/boost
url = https://github.com/ClickHouse-Extras/boost.git
@ -107,6 +108,7 @@
[submodule "contrib/grpc"]
path = contrib/grpc
url = https://github.com/ClickHouse-Extras/grpc.git
branch = v1.33.2
[submodule "contrib/aws"]
path = contrib/aws
url = https://github.com/ClickHouse-Extras/aws-sdk-cpp.git
@ -200,3 +202,7 @@
[submodule "contrib/xz"]
path = contrib/xz
url = https://github.com/xz-mirror/xz
[submodule "contrib/abseil-cpp"]
path = contrib/abseil-cpp
url = https://github.com/ClickHouse-Extras/abseil-cpp.git
branch = lts_2020_02_25

View File

@ -3,7 +3,6 @@
/// Macros for convenient usage of Poco logger.
#include <fmt/format.h>
#include <fmt/ostream.h>
#include <Poco/Logger.h>
#include <Poco/Message.h>
#include <Common/CurrentThread.h>

View File

@ -0,0 +1,19 @@
#define _GNU_SOURCE
#include <sys/socket.h>
#include <errno.h>
#include <fcntl.h>
#include "syscall.h"
int accept4(int fd, struct sockaddr *restrict addr, socklen_t *restrict len, int flg)
{
if (!flg) return accept(fd, addr, len);
int ret = socketcall_cp(accept4, fd, addr, len, flg, 0, 0);
if (ret>=0 || (errno != ENOSYS && errno != EINVAL)) return ret;
ret = accept(fd, addr, len);
if (ret<0) return ret;
if (flg & SOCK_CLOEXEC)
__syscall(SYS_fcntl, ret, F_SETFD, FD_CLOEXEC);
if (flg & SOCK_NONBLOCK)
__syscall(SYS_fcntl, ret, F_SETFL, O_NONBLOCK);
return ret;
}

View File

@ -0,0 +1,37 @@
#include <sys/epoll.h>
#include <signal.h>
#include <errno.h>
#include "syscall.h"
int epoll_create(int size)
{
return epoll_create1(0);
}
int epoll_create1(int flags)
{
int r = __syscall(SYS_epoll_create1, flags);
#ifdef SYS_epoll_create
if (r==-ENOSYS && !flags) r = __syscall(SYS_epoll_create, 1);
#endif
return __syscall_ret(r);
}
int epoll_ctl(int fd, int op, int fd2, struct epoll_event *ev)
{
return syscall(SYS_epoll_ctl, fd, op, fd2, ev);
}
int epoll_pwait(int fd, struct epoll_event *ev, int cnt, int to, const sigset_t *sigs)
{
int r = __syscall(SYS_epoll_pwait, fd, ev, cnt, to, sigs, _NSIG/8);
#ifdef SYS_epoll_wait
if (r==-ENOSYS && !sigs) r = __syscall(SYS_epoll_wait, fd, ev, cnt, to);
#endif
return __syscall_ret(r);
}
int epoll_wait(int fd, struct epoll_event *ev, int cnt, int to)
{
return epoll_pwait(fd, ev, cnt, to, 0);
}

View File

@ -0,0 +1,23 @@
#include <sys/eventfd.h>
#include <unistd.h>
#include <errno.h>
#include "syscall.h"
int eventfd(unsigned int count, int flags)
{
int r = __syscall(SYS_eventfd2, count, flags);
#ifdef SYS_eventfd
if (r==-ENOSYS && !flags) r = __syscall(SYS_eventfd, count);
#endif
return __syscall_ret(r);
}
int eventfd_read(int fd, eventfd_t *value)
{
return (sizeof(*value) == read(fd, value, sizeof(*value))) ? 0 : -1;
}
int eventfd_write(int fd, eventfd_t value)
{
return (sizeof(value) == write(fd, &value, sizeof(value))) ? 0 : -1;
}

View File

@ -0,0 +1,45 @@
#include <sys/auxv.h>
#include <unistd.h> // __environ
#include <errno.h>
// We don't have libc struct available here. Compute aux vector manually.
static unsigned long * __auxv = NULL;
static unsigned long __auxv_secure = 0;
static size_t __find_auxv(unsigned long type)
{
size_t i;
for (i = 0; __auxv[i]; i += 2)
{
if (__auxv[i] == type)
return i + 1;
}
return (size_t) -1;
}
__attribute__((constructor)) static void __auxv_init()
{
size_t i;
for (i = 0; __environ[i]; i++);
__auxv = (unsigned long *) (__environ + i + 1);
size_t secure_idx = __find_auxv(AT_SECURE);
if (secure_idx != ((size_t) -1))
__auxv_secure = __auxv[secure_idx];
}
unsigned long getauxval(unsigned long type)
{
if (type == AT_SECURE)
return __auxv_secure;
if (__auxv)
{
size_t index = __find_auxv(type);
if (index != ((size_t) -1))
return __auxv[index];
}
errno = ENOENT;
return 0;
}

View File

@ -0,0 +1,8 @@
#define _GNU_SOURCE
#include <stdlib.h>
#include <sys/auxv.h>
char * secure_getenv(const char * name)
{
return getauxval(AT_SECURE) ? NULL : getenv(name);
}

View File

@ -13,3 +13,11 @@ long __syscall(syscall_arg_t, ...);
__attribute__((visibility("hidden")))
void *__vdsosym(const char *, const char *);
#define syscall(...) __syscall_ret(__syscall(__VA_ARGS__))
#define socketcall(...) __syscall_ret(__socketcall(__VA_ARGS__))
#define __socketcall(nm,a,b,c,d,e,f) __syscall(SYS_##nm, a, b, c, d, e, f)
#define socketcall_cp socketcall

View File

@ -40,24 +40,10 @@ static int checkver(Verdef *def, int vsym, const char *vername, char *strings)
#define OK_TYPES (1<<STT_NOTYPE | 1<<STT_OBJECT | 1<<STT_FUNC | 1<<STT_COMMON)
#define OK_BINDS (1<<STB_GLOBAL | 1<<STB_WEAK | 1<<STB_GNU_UNIQUE)
extern char** environ;
static Ehdr *eh = NULL;
void *__vdsosym(const char *vername, const char *name);
// We don't have libc struct available here. Compute aux vector manually.
__attribute__((constructor)) static void auxv_init()
{
size_t i, *auxv;
for (i=0; environ[i]; i++);
auxv = (void *)(environ+i+1);
for (i=0; auxv[i] != AT_SYSINFO_EHDR; i+=2)
if (!auxv[i]) return;
if (!auxv[i+1]) return;
eh = (void *)auxv[i+1];
}
void *__vdsosym(const char *vername, const char *name)
{
size_t i;
Ehdr * eh = (void *) getauxval(AT_SYSINFO_EHDR);
if (!eh) return 0;
Phdr *ph = (void *)((char *)eh + eh->e_phoff);
size_t *dynv=0, base=-1;

View File

@ -6,11 +6,9 @@ Defines the following variables:
The include directories of the gRPC framework, including the include directories of the C++ wrapper.
``gRPC_LIBRARIES``
The libraries of the gRPC framework.
``gRPC_UNSECURE_LIBRARIES``
The libraries of the gRPC framework without SSL.
``_gRPC_CPP_PLUGIN``
``gRPC_CPP_PLUGIN``
The plugin for generating gRPC client and server C++ stubs from `.proto` files
``_gRPC_PYTHON_PLUGIN``
``gRPC_PYTHON_PLUGIN``
The plugin for generating gRPC client and server Python stubs from `.proto` files
The following :prop_tgt:`IMPORTED` targets are also defined:
@ -19,6 +17,13 @@ The following :prop_tgt:`IMPORTED` targets are also defined:
``grpc_cpp_plugin``
``grpc_python_plugin``
Set the following variables to adjust the behaviour of this script:
``gRPC_USE_UNSECURE_LIBRARIES``
if set gRPC_LIBRARIES will be filled with the unsecure version of the libraries (i.e. without SSL)
instead of the secure ones.
``gRPC_DEBUG`
if set the debug message will be printed.
Add custom commands to process ``.proto`` files to C++::
protobuf_generate_grpc_cpp(<SRCS> <HDRS>
[DESCRIPTORS <DESC>] [EXPORT_MACRO <MACRO>] [<ARGN>...])
@ -242,6 +247,7 @@ find_library(gRPC_LIBRARY NAMES grpc)
find_library(gRPC_CPP_LIBRARY NAMES grpc++)
find_library(gRPC_UNSECURE_LIBRARY NAMES grpc_unsecure)
find_library(gRPC_CPP_UNSECURE_LIBRARY NAMES grpc++_unsecure)
find_library(gRPC_CARES_LIBRARY NAMES cares)
set(gRPC_LIBRARIES)
if(gRPC_USE_UNSECURE_LIBRARIES)
@ -259,6 +265,7 @@ else()
set(gRPC_LIBRARIES ${gRPC_LIBRARIES} ${gRPC_CPP_LIBRARY})
endif()
endif()
set(gRPC_LIBRARIES ${gRPC_LIBRARIES} ${gRPC_CARES_LIBRARY})
# Restore the original find library ordering.
if(gRPC_USE_STATIC_LIBS)
@ -278,11 +285,11 @@ else()
endif()
# Get full path to plugin.
find_program(_gRPC_CPP_PLUGIN
find_program(gRPC_CPP_PLUGIN
NAMES grpc_cpp_plugin
DOC "The plugin for generating gRPC client and server C++ stubs from `.proto` files")
find_program(_gRPC_PYTHON_PLUGIN
find_program(gRPC_PYTHON_PLUGIN
NAMES grpc_python_plugin
DOC "The plugin for generating gRPC client and server Python stubs from `.proto` files")
@ -317,14 +324,14 @@ endif()
#include(FindPackageHandleStandardArgs.cmake)
FIND_PACKAGE_HANDLE_STANDARD_ARGS(gRPC
REQUIRED_VARS gRPC_LIBRARY gRPC_CPP_LIBRARY gRPC_UNSECURE_LIBRARY gRPC_CPP_UNSECURE_LIBRARY
gRPC_INCLUDE_DIR gRPC_CPP_INCLUDE_DIR _gRPC_CPP_PLUGIN _gRPC_PYTHON_PLUGIN)
REQUIRED_VARS gRPC_LIBRARY gRPC_CPP_LIBRARY gRPC_UNSECURE_LIBRARY gRPC_CPP_UNSECURE_LIBRARY gRPC_CARES_LIBRARY
gRPC_INCLUDE_DIR gRPC_CPP_INCLUDE_DIR gRPC_CPP_PLUGIN gRPC_PYTHON_PLUGIN)
if(gRPC_FOUND)
if(gRPC_DEBUG)
message(STATUS "gRPC: INCLUDE_DIRS=${gRPC_INCLUDE_DIRS}")
message(STATUS "gRPC: LIBRARIES=${gRPC_LIBRARIES}")
message(STATUS "gRPC: CPP_PLUGIN=${_gRPC_CPP_PLUGIN}")
message(STATUS "gRPC: PYTHON_PLUGIN=${_gRPC_PYTHON_PLUGIN}")
message(STATUS "gRPC: CPP_PLUGIN=${gRPC_CPP_PLUGIN}")
message(STATUS "gRPC: PYTHON_PLUGIN=${gRPC_PYTHON_PLUGIN}")
endif()
endif()

View File

@ -37,8 +37,8 @@ if(NOT USE_INTERNAL_GRPC_LIBRARY)
if(NOT gRPC_INCLUDE_DIRS OR NOT gRPC_LIBRARIES)
message(${RECONFIGURE_MESSAGE_LEVEL} "Can't find system gRPC library")
set(EXTERNAL_GRPC_LIBRARY_FOUND 0)
elseif(NOT _gRPC_CPP_PLUGIN)
message(${RECONFIGURE_MESSAGE_LEVEL} "Can't find system grcp_cpp_plugin")
elseif(NOT gRPC_CPP_PLUGIN)
message(${RECONFIGURE_MESSAGE_LEVEL} "Can't find system grpc_cpp_plugin")
set(EXTERNAL_GRPC_LIBRARY_FOUND 0)
else()
set(EXTERNAL_GRPC_LIBRARY_FOUND 1)
@ -53,8 +53,8 @@ if(NOT EXTERNAL_GRPC_LIBRARY_FOUND AND NOT MISSING_INTERNAL_GRPC_LIBRARY)
else()
set(gRPC_LIBRARIES grpc grpc++)
endif()
set(_gRPC_CPP_PLUGIN $<TARGET_FILE:grpc_cpp_plugin>)
set(_gRPC_PROTOC_EXECUTABLE $<TARGET_FILE:protobuf::protoc>)
set(gRPC_CPP_PLUGIN $<TARGET_FILE:grpc_cpp_plugin>)
set(gRPC_PYTHON_PLUGIN $<TARGET_FILE:grpc_python_plugin>)
include("${ClickHouse_SOURCE_DIR}/contrib/grpc-cmake/protobuf_generate_grpc.cmake")
@ -62,4 +62,4 @@ if(NOT EXTERNAL_GRPC_LIBRARY_FOUND AND NOT MISSING_INTERNAL_GRPC_LIBRARY)
set(USE_GRPC 1)
endif()
message(STATUS "Using gRPC=${USE_GRPC}: ${gRPC_INCLUDE_DIRS} : ${gRPC_LIBRARIES} : ${_gRPC_CPP_PLUGIN}")
message(STATUS "Using gRPC=${USE_GRPC}: ${gRPC_INCLUDE_DIRS} : ${gRPC_LIBRARIES} : ${gRPC_CPP_PLUGIN}")

1
contrib/abseil-cpp vendored Submodule

@ -0,0 +1 @@
Subproject commit 4f3b686f86c3ebaba7e4e926e62a79cb1c659a54

2
contrib/grpc vendored

@ -1 +1 @@
Subproject commit a6570b863cf76c9699580ba51c7827d5bffaac43
Subproject commit 7436366ceb341ba5c00ea29f1645e02a2b70bf93

View File

@ -1,6 +1,7 @@
set(_gRPC_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/grpc")
set(_gRPC_BINARY_DIR "${ClickHouse_BINARY_DIR}/contrib/grpc")
# Use re2 from ClickHouse contrib, not from gRPC third_party.
if(NOT RE2_INCLUDE_DIR)
message(FATAL_ERROR " grpc: The location of the \"re2\" library is unknown")
endif()
@ -8,6 +9,7 @@ set(gRPC_RE2_PROVIDER "clickhouse" CACHE STRING "" FORCE)
set(_gRPC_RE2_INCLUDE_DIR "${RE2_INCLUDE_DIR}")
set(_gRPC_RE2_LIBRARIES "${RE2_LIBRARY}")
# Use zlib from ClickHouse contrib, not from gRPC third_party.
if(NOT ZLIB_INCLUDE_DIRS)
message(FATAL_ERROR " grpc: The location of the \"zlib\" library is unknown")
endif()
@ -15,6 +17,7 @@ set(gRPC_ZLIB_PROVIDER "clickhouse" CACHE STRING "" FORCE)
set(_gRPC_ZLIB_INCLUDE_DIR "${ZLIB_INCLUDE_DIRS}")
set(_gRPC_ZLIB_LIBRARIES "${ZLIB_LIBRARIES}")
# Use protobuf from ClickHouse contrib, not from gRPC third_party.
if(NOT Protobuf_INCLUDE_DIR OR NOT Protobuf_LIBRARY)
message(FATAL_ERROR " grpc: The location of the \"protobuf\" library is unknown")
elseif (NOT Protobuf_PROTOC_EXECUTABLE)
@ -29,21 +32,33 @@ set(_gRPC_PROTOBUF_PROTOC "protoc")
set(_gRPC_PROTOBUF_PROTOC_EXECUTABLE "${Protobuf_PROTOC_EXECUTABLE}")
set(_gRPC_PROTOBUF_PROTOC_LIBRARIES "${Protobuf_PROTOC_LIBRARY}")
# Use OpenSSL from ClickHouse contrib, not from gRPC third_party.
set(gRPC_SSL_PROVIDER "clickhouse" CACHE STRING "" FORCE)
set(_gRPC_SSL_INCLUDE_DIR ${OPENSSL_INCLUDE_DIR})
set(_gRPC_SSL_LIBRARIES ${OPENSSL_LIBRARIES})
# Use abseil-cpp from ClickHouse contrib, not from gRPC third_party.
set(gRPC_ABSL_PROVIDER "clickhouse" CACHE STRING "" FORCE)
set(ABSL_ROOT_DIR "${ClickHouse_SOURCE_DIR}/contrib/abseil-cpp")
if(NOT EXISTS "${ABSL_ROOT_DIR}/CMakeLists.txt")
message(FATAL_ERROR " grpc: submodule third_party/abseil-cpp is missing. To fix try run: \n git submodule update --init --recursive")
endif()
add_subdirectory("${ABSL_ROOT_DIR}" "${ClickHouse_BINARY_DIR}/contrib/abseil-cpp")
# Choose to build static or shared library for c-ares.
if (MAKE_STATIC_LIBRARIES)
set(CARES_STATIC ON CACHE BOOL "" FORCE)
set(CARES_SHARED OFF CACHE BOOL "" FORCE)
else ()
set(CARES_STATIC OFF CACHE BOOL "" FORCE)
set(CARES_SHARED ON CACHE BOOL "" FORCE)
endif ()
# We don't want to build C# extensions.
set(gRPC_BUILD_CSHARP_EXT OFF)
# We don't want to build abseil tests, so we temporarily switch BUILD_TESTING off.
set(_gRPC_ORIG_BUILD_TESTING ${BUILD_TESTING})
set(BUILD_TESTING OFF)
add_subdirectory("${_gRPC_SOURCE_DIR}" "${_gRPC_BINARY_DIR}")
set(BUILD_TESTING ${_gRPC_ORIG_BUILD_TESTING})
# The contrib/grpc/CMakeLists.txt redefined the PROTOBUF_GENERATE_GRPC_CPP() function for its own purposes,
# so we need to redefine it back.
include("${ClickHouse_SOURCE_DIR}/contrib/grpc-cmake/protobuf_generate_grpc.cmake")

View File

@ -22,7 +22,16 @@ set_source_files_properties(${LIBUNWIND_C_SOURCES} PROPERTIES COMPILE_FLAGS "-st
set(LIBUNWIND_ASM_SOURCES
${LIBUNWIND_SOURCE_DIR}/src/UnwindRegistersRestore.S
${LIBUNWIND_SOURCE_DIR}/src/UnwindRegistersSave.S)
set_source_files_properties(${LIBUNWIND_ASM_SOURCES} PROPERTIES LANGUAGE C)
# CMake doesn't pass the correct architecture for Apple prior to CMake 3.19 [1]
# Workaround these two issues by compiling as C.
#
# [1]: https://gitlab.kitware.com/cmake/cmake/-/issues/20771
if (APPLE AND CMAKE_VERSION VERSION_LESS 3.19)
set_source_files_properties(${LIBUNWIND_ASM_SOURCES} PROPERTIES LANGUAGE C)
else()
enable_language(ASM)
endif()
set(LIBUNWIND_SOURCES
${LIBUNWIND_CXX_SOURCES}

2
contrib/protobuf vendored

@ -1 +1 @@
Subproject commit 445d1ae73a450b1e94622e7040989aa2048402e3
Subproject commit 73b12814204ad9068ba352914d0dc244648b48ee

View File

@ -56,6 +56,7 @@ RUN apt-get update \
libprotoc-dev \
libgrpc++-dev \
protobuf-compiler-grpc \
libc-ares-dev \
rapidjson-dev \
libsnappy-dev \
libparquet-dev \

View File

@ -10,6 +10,11 @@ RUN apt-get update --yes \
gpg-agent \
debsig-verify \
strace \
protobuf-compiler \
protobuf-compiler-grpc \
libprotoc-dev \
libgrpc++-dev \
libc-ares-dev \
--yes --no-install-recommends
#RUN wget -nv -O - http://files.viva64.com/etc/pubkey.txt | sudo apt-key add -
@ -33,7 +38,8 @@ RUN set -x \
&& dpkg -i "${PKG_VERSION}.deb"
CMD echo "Running PVS version $PKG_VERSION" && cd /repo_folder && pvs-studio-analyzer credentials $LICENCE_NAME $LICENCE_KEY -o ./licence.lic \
&& cmake . -D"ENABLE_EMBEDDED_COMPILER"=OFF && ninja re2_st \
&& cmake . -D"ENABLE_EMBEDDED_COMPILER"=OFF -D"USE_INTERNAL_PROTOBUF_LIBRARY"=OFF -D"USE_INTERNAL_GRPC_LIBRARY"=OFF \
&& ninja re2_st clickhouse_grpc_protos \
&& pvs-studio-analyzer analyze -o pvs-studio.log -e contrib -j 4 -l ./licence.lic; \
plog-converter -a GA:1,2 -t fullhtml -o /test_output/pvs-studio-html-report pvs-studio.log; \
plog-converter -a GA:1,2 -t tasklist -o /test_output/pvs-studio-task-report.txt pvs-studio.log

View File

@ -152,7 +152,7 @@ You can specify default arguments for `Replicated` table engine in the server co
```xml
<default_replica_path>/clickhouse/tables/{shard}/{database}/{table}</default_replica_path>
<default_replica_name>{replica}</default_replica_path>
<default_replica_name>{replica}</default_replica_name>
```
In this case, you can omit arguments when creating tables:

View File

@ -44,11 +44,10 @@ stages, such as query planning or distributed queries.
To be useful, the tracing information has to be exported to a monitoring system
that supports OpenTelemetry, such as Jaeger or Prometheus. ClickHouse avoids
a dependency on a particular monitoring system, instead only
providing the tracing data conforming to the standard. A natural way to do so
in an SQL RDBMS is a system table. OpenTelemetry trace span information
a dependency on a particular monitoring system, instead only providing the
tracing data through a system table. OpenTelemetry trace span information
[required by the standard](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/overview.md#span)
is stored in the system table called `system.opentelemetry_span_log`.
is stored in the `system.opentelemetry_span_log` table.
The table must be enabled in the server configuration, see the `opentelemetry_span_log`
element in the default config file `config.xml`. It is enabled by default.
@ -67,3 +66,31 @@ The table has the following columns:
The tags or attributes are saved as two parallel arrays, containing the keys
and values. Use `ARRAY JOIN` to work with them.
## Integration with monitoring systems
At the moment, there is no ready tool that can export the tracing data from
ClickHouse to a monitoring system.
For testing, it is possible to setup the export using a materialized view with the URL engine over the `system.opentelemetry_span_log` table, which would push the arriving log data to an HTTP endpoint of a trace collector. For example, to push the minimal span data to a Zipkin instance running at `http://localhost:9411`, in Zipkin v2 JSON format:
```sql
CREATE MATERIALIZED VIEW default.zipkin_spans
ENGINE = URL('http://127.0.0.1:9411/api/v2/spans', 'JSONEachRow')
SETTINGS output_format_json_named_tuples_as_objects = 1,
output_format_json_array_of_rows = 1 AS
SELECT
lower(hex(reinterpretAsFixedString(trace_id))) AS traceId,
lower(hex(parent_span_id)) AS parentId,
lower(hex(span_id)) AS id,
operation_name AS name,
start_time_us AS timestamp,
finish_time_us - start_time_us AS duration,
cast(tuple('clickhouse'), 'Tuple(serviceName text)') AS localEndpoint,
cast(tuple(
attribute.values[indexOf(attribute.names, 'db.statement')]),
'Tuple("db.statement" text)') AS tags
FROM system.opentelemetry_span_log
```
In case of any errors, the part of the log data for which the error has occurred will be silently lost. Check the server log for error messages if the data does not arrive.

View File

@ -25,7 +25,7 @@ SELECT [DISTINCT] expr_list
[ORDER BY expr_list] [WITH FILL] [FROM expr] [TO expr] [STEP expr]
[LIMIT [offset_value, ]n BY columns]
[LIMIT [n, ]m] [WITH TIES]
[UNION ALL ...]
[UNION ...]
[INTO OUTFILE filename]
[FORMAT format]
```
@ -46,7 +46,7 @@ Specifics of each optional clause are covered in separate sections, which are li
- [SELECT clause](#select-clause)
- [DISTINCT clause](../../../sql-reference/statements/select/distinct.md)
- [LIMIT clause](../../../sql-reference/statements/select/limit.md)
- [UNION ALL clause](../../../sql-reference/statements/select/union-all.md)
- [UNION clause](../../../sql-reference/statements/select/union-all.md)
- [INTO OUTFILE clause](../../../sql-reference/statements/select/into-outfile.md)
- [FORMAT clause](../../../sql-reference/statements/select/format.md)

View File

@ -1,5 +1,5 @@
---
toc_title: UNION ALL
toc_title: UNION
---
# UNION ALL Clause {#union-all-clause}
@ -25,10 +25,13 @@ Type casting is performed for unions. For example, if two queries being combined
Queries that are parts of `UNION ALL` cant be enclosed in round brackets. [ORDER BY](../../../sql-reference/statements/select/order-by.md) and [LIMIT](../../../sql-reference/statements/select/limit.md) are applied to separate queries, not to the final result. If you need to apply a conversion to the final result, you can put all the queries with `UNION ALL` in a subquery in the [FROM](../../../sql-reference/statements/select/from.md) clause.
## Limitations {#limitations}
# UNION DISTINCT Clause {#union-distinct-clause}
The difference between `UNION ALL` and `UNION DISTINCT` is that `UNION DISTINCT` will do a distinct transform for union result, it is equivalent to `SELECT DISTINCT` from a subquery containing `UNION ALL`.
# UNION Clause {#union-clause}
By default, `UNION` has the same behavior as `UNION DISTINCT`, but you can specify union mode by setting `union_default_mode`, values can be 'ALL', 'DISTINCT' or empty string. However, if you use `UNION` with setting `union_default_mode` to empty string, it will throw an exception.
Only `UNION ALL` is supported. The regular `UNION` (`UNION DISTINCT`) is not supported. If you need `UNION DISTINCT`, you can write `SELECT DISTINCT` from a subquery containing `UNION ALL`.
## Implementation Details {#implementation-details}
Queries that are parts of `UNION ALL` can be run simultaneously, and their results can be mixed together.
Queries that are parts of `UNION/UNION ALL/UNION DISTINCT` can be run simultaneously, and their results can be mixed together.

View File

@ -180,6 +180,8 @@ add_subdirectory (obfuscator)
add_subdirectory (install)
add_subdirectory (git-import)
#add_subdirectory (grpc-client)
if (ENABLE_CLICKHOUSE_ODBC_BRIDGE)
add_subdirectory (odbc-bridge)
endif ()

View File

@ -2515,7 +2515,7 @@ public:
{
std::string traceparent = options["opentelemetry-traceparent"].as<std::string>();
std::string error;
if (!context.getClientInfo().parseTraceparentHeader(
if (!context.getClientInfo().client_trace_context.parseTraceparentHeader(
traceparent, error))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
@ -2526,7 +2526,7 @@ public:
if (options.count("opentelemetry-tracestate"))
{
context.getClientInfo().opentelemetry_tracestate =
context.getClientInfo().client_trace_context.tracestate =
options["opentelemetry-tracestate"].as<std::string>();
}

View File

@ -62,6 +62,9 @@ decltype(auto) ClusterCopier::retry(T && func, UInt64 max_tries)
{
std::exception_ptr exception;
if (max_tries == 0)
throw Exception("Cannot perform zero retries", ErrorCodes::LOGICAL_ERROR);
for (UInt64 try_number = 1; try_number <= max_tries; ++try_number)
{
try
@ -605,7 +608,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
settings_push.replication_alter_partitions_sync = 2;
query_alter_ast_string += " ALTER TABLE " + getQuotedTable(original_table) +
" ATTACH PARTITION " + partition_name +
((partition_name == "'all'") ? " ATTACH PARTITION ID " : " ATTACH PARTITION ") + partition_name +
" FROM " + getQuotedTable(helping_table);
LOG_DEBUG(log, "Executing ALTER query: {}", query_alter_ast_string);
@ -636,7 +639,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
if (!task_table.isReplicatedTable())
{
query_deduplicate_ast_string += " OPTIMIZE TABLE " + getQuotedTable(original_table) +
" PARTITION " + partition_name + " DEDUPLICATE;";
((partition_name == "'all'") ? " PARTITION ID " : " PARTITION ") + partition_name + " DEDUPLICATE;";
LOG_DEBUG(log, "Executing OPTIMIZE DEDUPLICATE query: {}", query_alter_ast_string);
@ -807,7 +810,7 @@ bool ClusterCopier::tryDropPartitionPiece(
DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
String query = "ALTER TABLE " + getQuotedTable(helping_table);
query += " DROP PARTITION " + task_partition.name + "";
query += ((task_partition.name == "'all'") ? " DROP PARTITION ID " : " DROP PARTITION ") + task_partition.name + "";
/// TODO: use this statement after servers will be updated up to 1.1.54310
// query += " DROP PARTITION ID '" + task_partition.name + "'";
@ -1567,7 +1570,7 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT
DatabaseAndTableName original_table = task_table.table_push;
DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
String query = "ALTER TABLE " + getQuotedTable(helping_table) + " DROP PARTITION " + partition_name;
String query = "ALTER TABLE " + getQuotedTable(helping_table) + ((partition_name == "'all'") ? " DROP PARTITION ID " : " DROP PARTITION ") + partition_name;
const ClusterPtr & cluster_push = task_table.cluster_push;
Settings settings_push = task_cluster->settings_push;
@ -1670,14 +1673,24 @@ void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeout
std::set<String> ClusterCopier::getShardPartitions(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
{
std::set<String> res;
createShardInternalTables(timeouts, task_shard, false);
TaskTable & task_table = task_shard.task_table;
const String & partition_name = queryToString(task_table.engine_push_partition_key_ast);
if (partition_name == "'all'")
{
res.emplace("'all'");
return res;
}
String query;
{
WriteBufferFromOwnString wb;
wb << "SELECT DISTINCT " << queryToString(task_table.engine_push_partition_key_ast) << " AS partition FROM"
wb << "SELECT DISTINCT " << partition_name << " AS partition FROM"
<< " " << getQuotedTable(task_shard.table_read_shard) << " ORDER BY partition DESC";
query = wb.str();
}
@ -1692,7 +1705,6 @@ std::set<String> ClusterCopier::getShardPartitions(const ConnectionTimeouts & ti
local_context.setSettings(task_cluster->settings_pull);
Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_ast, local_context)->execute().getInputStream());
std::set<String> res;
if (block)
{
ColumnWithTypeAndName & column = block.getByPosition(0);
@ -1803,7 +1815,7 @@ UInt64 ClusterCopier::executeQueryOnCluster(
if (execution_mode == ClusterExecutionMode::ON_EACH_NODE)
max_successful_executions_per_shard = 0;
std::atomic<size_t> origin_replicas_number;
std::atomic<size_t> origin_replicas_number = 0;
/// We need to execute query on one replica at least
auto do_for_shard = [&] (UInt64 shard_index, Settings shard_settings)

View File

@ -0,0 +1,7 @@
include_directories(${CMAKE_CURRENT_BINARY_DIR})
get_filename_component(rpc_proto "${CMAKE_CURRENT_SOURCE_DIR}/../server/grpc_protos/GrpcConnection.proto" ABSOLUTE)
protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS ${rpc_proto})
PROTOBUF_GENERATE_GRPC_CPP(GRPC_SRCS GRPC_HDRS ${rpc_proto})
add_executable(grpc-client grpc_client.cpp ${PROTO_SRCS} ${PROTO_HDRS} ${GRPC_SRCS} ${GRPC_HDRS})
target_link_libraries(grpc-client PUBLIC grpc++ PUBLIC libprotobuf PUBLIC daemon)

View File

@ -0,0 +1,173 @@
#include <iostream>
#include <memory>
#include <string>
#include <vector>
#include <thread>
#include <stdlib.h>
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
#include <grpc++/security/credentials.h>
#include "GrpcConnection.grpc.pb.h"
class GRPCClient
{
public:
explicit GRPCClient(std::shared_ptr<grpc::Channel> channel)
: stub_(GRPCConnection::GRPC::NewStub(channel))
{}
std::string Query(const GRPCConnection::User& userInfo,
const std::string& query,
std::vector<std::string> insert_data = {})
{
GRPCConnection::QueryRequest request;
grpc::Status status;
GRPCConnection::QueryResponse reply;
grpc::ClientContext context;
auto deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(10000);
context.set_deadline(deadline);
auto user = std::make_unique<GRPCConnection::User>(userInfo);
auto querySettigs = std::make_unique<GRPCConnection::QuerySettings>();
int id = rand();
request.set_allocated_user_info(user.release());
// interactive_delay in miliseconds
request.set_interactive_delay(1000);
querySettigs->set_query(query);
querySettigs->set_format("Values");
querySettigs->set_query_id(std::to_string(id));
querySettigs->set_data_stream((insert_data.size() != 0));
(*querySettigs->mutable_settings())["max_query_size"] ="100";
request.set_allocated_query_info(querySettigs.release());
void* got_tag = (void*)1;
bool ok = false;
std::unique_ptr<grpc::ClientReaderWriter<GRPCConnection::QueryRequest, GRPCConnection::QueryResponse> > reader(stub_->Query(&context));
reader->Write(request);
auto write = [&reply, &reader, &insert_data]()
{
GRPCConnection::QueryRequest request_insert;
for (const auto& data : insert_data)
{
request_insert.set_insert_data(data);
if (reply.exception_occured().empty())
{
reader->Write(request_insert);
}
else
{
break;
}
}
request_insert.set_insert_data("");
if (reply.exception_occured().empty())
{
reader->Write(request_insert);
}
// reader->WritesDone();
};
std::thread write_thread(write);
write_thread.detach();
while (reader->Read(&reply))
{
if (!reply.output().empty())
{
std::cout << "Query Part:\n " << id<< reply.output()<<'\n';
}
else if (reply.progress().read_rows()
|| reply.progress().read_bytes()
|| reply.progress().total_rows_to_read()
|| reply.progress().written_rows()
|| reply.progress().written_bytes())
{
std::cout << "Progress " << id<< ":{\n" << "read_rows: " << reply.progress().read_rows() << '\n'
<< "read_bytes: " << reply.progress().read_bytes() << '\n'
<< "total_rows_to_read: " << reply.progress().total_rows_to_read() << '\n'
<< "written_rows: " << reply.progress().written_rows() << '\n'
<< "written_bytes: " << reply.progress().written_bytes() << '\n';
}
else if (!reply.totals().empty())
{
std::cout << "Totals:\n " << id << " " << reply.totals() <<'\n';
}
else if (!reply.extremes().empty())
{
std::cout << "Extremes:\n " << id << " " << reply.extremes() <<'\n';
}
}
if (status.ok() && reply.exception_occured().empty())
{
return "";
}
else if (status.ok() && !reply.exception_occured().empty())
{
return reply.exception_occured();
}
else
{
return "RPC failed";
}
}
private:
std::unique_ptr<GRPCConnection::GRPC::Stub> stub_;
};
int main(int argc, char** argv)
{
GRPCConnection::User userInfo1;
userInfo1.set_user("default");
userInfo1.set_password("");
userInfo1.set_quota("default");
std::cout << "Try: " << argv[1] << std::endl;
grpc::ChannelArguments ch_args;
ch_args.SetMaxReceiveMessageSize(-1);
GRPCClient client(
grpc::CreateCustomChannel(argv[1], grpc::InsecureChannelCredentials(), ch_args));
{
std::cout << client.Query(userInfo1, "CREATE TABLE t (a UInt8) ENGINE = Memory") << std::endl;
std::cout << client.Query(userInfo1, "CREATE TABLE t (a UInt8) ENGINE = Memory") << std::endl;
std::cout << client.Query(userInfo1, "INSERT INTO t VALUES", {"(1),(2),(3)", "(4),(6),(5)"}) << std::endl;
std::cout << client.Query(userInfo1, "INSERT INTO t_not_defined VALUES", {"(1),(2),(3)", "(4),(6),(5)"}) << std::endl;
std::cout << client.Query(userInfo1, "SELECT a FROM t ORDER BY a") << std::endl;
std::cout << client.Query(userInfo1, "DROP TABLE t") << std::endl;
}
{
std::cout << client.Query(userInfo1, "SELECT count() FROM numbers(1)") << std::endl;
std::cout << client.Query(userInfo1, "SELECT 100") << std::endl;
std::cout << client.Query(userInfo1, "SELECT count() FROM numbers(10000000000)") << std::endl;
std::cout << client.Query(userInfo1, "SELECT count() FROM numbers(100)") << std::endl;
}
{
std::cout << client.Query(userInfo1, "CREATE TABLE arrays_test (s String, arr Array(UInt8)) ENGINE = Memory;") << std::endl;
std::cout << client.Query(userInfo1, "INSERT INTO arrays_test VALUES ('Hello', [1,2]), ('World', [3,4,5]), ('Goodbye', []);") << std::endl;
std::cout << client.Query(userInfo1, "SELECT s FROM arrays_test") << std::endl;
std::cout << client.Query(userInfo1, "DROP TABLE arrays_test") << std::endl;
std::cout << client.Query(userInfo1, "") << std::endl;
}
{//Check null return from pipe
std::cout << client.Query(userInfo1, "CREATE TABLE table2 (x UInt8, y UInt8) ENGINE = Memory;") << std::endl;
std::cout << client.Query(userInfo1, "SELECT x FROM table2") << std::endl;
std::cout << client.Query(userInfo1, "DROP TABLE table2") << std::endl;
}
{//Check Totals
std::cout << client.Query(userInfo1, "CREATE TABLE tabl (x UInt8, y UInt8) ENGINE = Memory;") << std::endl;
std::cout << client.Query(userInfo1, "INSERT INTO tabl VALUES (1, 2), (2, 4), (3, 2), (3, 3), (3, 4);") << std::endl;
std::cout << client.Query(userInfo1, "SELECT sum(x), y FROM tabl GROUP BY y WITH TOTALS") << std::endl;
std::cout << client.Query(userInfo1, "DROP TABLE tabl") << std::endl;
}
return 0;
}

View File

@ -109,6 +109,14 @@ void ODBCBridge::defineOptions(Poco::Util::OptionSet & options)
.argument("err-log-path")
.binding("logger.errorlog"));
options.addOption(Poco::Util::Option("stdout-path", "", "stdout log path, default console")
.argument("stdout-path")
.binding("logger.stdout"));
options.addOption(Poco::Util::Option("stderr-path", "", "stderr log path, default console")
.argument("stderr-path")
.binding("logger.stderr"));
using Me = std::decay_t<decltype(*this)>;
options.addOption(Poco::Util::Option("help", "", "produce this help message")
.binding("help")
@ -127,6 +135,27 @@ void ODBCBridge::initialize(Application & self)
config().setString("logger", "ODBCBridge");
/// Redirect stdout, stderr to specified files.
/// Some libraries and sanitizers write to stderr in case of errors.
const auto stdout_path = config().getString("logger.stdout", "");
if (!stdout_path.empty())
{
if (!freopen(stdout_path.c_str(), "a+", stdout))
throw Poco::OpenFileException("Cannot attach stdout to " + stdout_path);
/// Disable buffering for stdout.
setbuf(stdout, nullptr);
}
const auto stderr_path = config().getString("logger.stderr", "");
if (!stderr_path.empty())
{
if (!freopen(stderr_path.c_str(), "a+", stderr))
throw Poco::OpenFileException("Cannot attach stderr to " + stderr_path);
/// Disable buffering for stderr.
setbuf(stderr, nullptr);
}
buildLoggers(config(), logger(), self.commandName());
BaseDaemon::logRevision();

View File

@ -64,6 +64,7 @@
#include <Common/ThreadFuzzer.h>
#include <Server/MySQLHandlerFactory.h>
#include <Server/PostgreSQLHandlerFactory.h>
#include <Server/ProtocolServerAdapter.h>
#if !defined(ARCADIA_BUILD)
@ -84,6 +85,11 @@
# include <Poco/Net/SecureServerSocket.h>
#endif
#if USE_GRPC
# include <Server/GRPCServer.h>
#endif
namespace CurrentMetrics
{
extern const Metric Revision;
@ -806,7 +812,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
http_params->setTimeout(settings.http_receive_timeout);
http_params->setKeepAliveTimeout(keep_alive_timeout);
std::vector<std::unique_ptr<Poco::Net::TCPServer>> servers;
std::vector<ProtocolServerAdapter> servers;
std::vector<std::string> listen_hosts = DB::getMultipleValuesFromConfig(config(), "", "listen_host");
@ -1035,6 +1041,15 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_INFO(log, "Listening for PostgreSQL compatibility protocol: " + address.toString());
});
#if USE_GRPC
create_server("grpc_port", [&](UInt16 port)
{
Poco::Net::SocketAddress server_address(listen_host, port);
servers.emplace_back(std::make_unique<GRPCServer>(*this, make_socket_address(listen_host, port)));
LOG_INFO(log, "Listening for gRPC protocol: " + server_address.toString());
});
#endif
/// Prometheus (if defined and not setup yet with http_port)
create_server("prometheus.port", [&](UInt16 port)
{
@ -1056,7 +1071,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->enableNamedSessions();
for (auto & server : servers)
server->start();
server.start();
{
String level_str = config().getString("text_log.level", "");
@ -1088,8 +1103,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
int current_connections = 0;
for (auto & server : servers)
{
server->stop();
current_connections += server->currentConnections();
server.stop();
current_connections += server.currentConnections();
}
if (current_connections)
@ -1109,7 +1124,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
{
current_connections = 0;
for (auto & server : servers)
current_connections += server->currentConnections();
current_connections += server.currentConnections();
if (!current_connections)
break;
sleep_current_ms += sleep_one_ms;

View File

@ -134,6 +134,34 @@
<max_connections>4096</max_connections>
<keep_alive_timeout>3</keep_alive_timeout>
<!-- gRPC protocol (see src/Server/grpc_protos/clickhouse_grpc.proto for the API)
<grpc_port>9001</grpc_port>
<grpc>
<enable_ssl>true</enable_ssl> -->
<!-- The following two files are used only if enable_ssl=1
<ssl_cert_file>/path/to/ssl_cert_file</ssl_cert_file>
<ssl_key_file>/path/to/ssl_key_file</ssl_key_file> -->
<!-- Whether server will request client for a certificate
<ssl_require_client_auth>true</ssl_require_client_auth> -->
<!-- The following file is used only if ssl_require_client_auth=1
<ssl_ca_cert_file>/path/to/ssl_ca_cert_file</ssl_ca_cert_file> -->
<!-- Default compression algorithm (applied if client doesn't specify another algorithm).
Supported algorithms: none, deflate, gzip, stream_gzip
<compression>gzip</compression> -->
<!-- Default compression level (applied if client doesn't specify another level).
Supported levels: none, low, medium, high
<compression_level>high</compression_level> -->
<!-- Send/receive message size limits in bytes. -1 means unlimited
<max_send_message_size>-1</max_send_message_size>
<max_receive_message_size>4194304</max_receive_message_size>
</grpc> -->
<!-- Maximum number of concurrent queries. -->
<max_concurrent_queries>100</max_concurrent_queries>

View File

@ -382,6 +382,10 @@ if (USE_PROTOBUF)
dbms_target_include_directories (SYSTEM BEFORE PRIVATE ${Protobuf_INCLUDE_DIR})
endif ()
if (USE_GRPC)
dbms_target_link_libraries (PUBLIC clickhouse_grpc_protos)
endif()
if (USE_HDFS)
target_link_libraries (clickhouse_common_io PUBLIC ${HDFS3_LIBRARY})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${HDFS3_INCLUDE_DIR})

View File

@ -207,6 +207,12 @@ void Connection::receiveHello()
/// Receive hello packet.
UInt64 packet_type = 0;
/// Prevent read after eof in readVarUInt in case of reset connection
/// (Poco should throw such exception while reading from socket but
/// sometimes it doesn't for unknown reason)
if (in->eof())
throw Poco::Net::NetException("Connection reset by peer");
readVarUInt(packet_type, *in);
if (packet_type == Protocol::Server::Hello)
{

View File

@ -523,6 +523,9 @@
M(554, LZMA_STREAM_DECODER_FAILED) \
M(555, ROCKSDB_ERROR) \
M(556, SYNC_MYSQL_USER_ACCESS_ERROR)\
M(557, UNKNOWN_UNION) \
M(558, EXPECTED_ALL_OR_DISTINCT) \
M(559, INVALID_GRPC_QUERY_INFO) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -0,0 +1,21 @@
#pragma once
namespace DB
{
// The runtime info we need to create new OpenTelemetry spans.
struct OpenTelemetryTraceContext
{
__uint128_t trace_id = 0;
UInt64 span_id = 0;
// The incoming tracestate header and the trace flags, we just pass them
// downstream. See https://www.w3.org/TR/trace-context/
String tracestate;
__uint8_t trace_flags = 0;
// Parse/compose OpenTelemetry traceparent header.
bool parseTraceparentHeader(const std::string & traceparent, std::string & error);
std::string composeTraceparentHeader() const;
};
}

View File

@ -97,9 +97,6 @@
M(DistributedConnectionStaleReplica, "") \
M(DistributedConnectionFailAtAll, "Total count when distributed connection fails after all retries finished") \
\
M(CompileAttempt, "Number of times a compilation of generated C++ code was initiated.") \
M(CompileSuccess, "Number of times a compilation of generated C++ code was successful.") \
\
M(CompileFunction, "Number of times a compilation of generated LLVM code (to create fused function for complex expressions) was initiated.") \
M(CompiledFunctionExecute, "Number of times a compiled function was executed.") \
M(CompileExpressionsMicroseconds, "Total time spent for compilation of expressions to LLVM code.") \

View File

@ -176,7 +176,7 @@ template class QueryProfilerBase<QueryProfilerReal>;
template class QueryProfilerBase<QueryProfilerCpu>;
QueryProfilerReal::QueryProfilerReal(const UInt64 thread_id, const UInt32 period)
: QueryProfilerBase(thread_id, CLOCK_REALTIME, period, SIGUSR1)
: QueryProfilerBase(thread_id, CLOCK_MONOTONIC, period, SIGUSR1)
{}
void QueryProfilerReal::signalHandler(int sig, siginfo_t * info, void * context)

View File

@ -2,6 +2,7 @@
#include <Common/ThreadProfileEvents.h>
#include <Common/QueryProfiler.h>
#include <Common/ThreadStatus.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Poco/Logger.h>
#include <common/getThreadId.h>

View File

@ -3,6 +3,7 @@
#include <common/StringRef.h>
#include <Common/ProfileEvents.h>
#include <Common/MemoryTracker.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Core/SettingsEnums.h>
@ -31,6 +32,7 @@ class ThreadStatus;
class QueryProfilerReal;
class QueryProfilerCpu;
class QueryThreadLog;
struct OpenTelemetrySpanHolder;
class TasksStatsCounters;
struct RUsageCounters;
struct PerfEventsCounters;
@ -86,9 +88,6 @@ extern thread_local ThreadStatus * current_thread;
class ThreadStatus : public boost::noncopyable
{
public:
ThreadStatus();
~ThreadStatus();
/// Linux's PID (or TGID) (the same id is shown by ps util)
const UInt64 thread_id = 0;
/// Also called "nice" value. If it was changed to non-zero (when attaching query) - will be reset to zero when query is detached.
@ -110,6 +109,52 @@ public:
using Deleter = std::function<void()>;
Deleter deleter;
// This is the current most-derived OpenTelemetry span for this thread. It
// can be changed throughout the query execution, whenever we enter a new
// span or exit it. See OpenTelemetrySpanHolder that is normally responsible
// for these changes.
OpenTelemetryTraceContext thread_trace_context;
protected:
ThreadGroupStatusPtr thread_group;
std::atomic<int> thread_state{ThreadState::DetachedFromQuery};
/// Is set once
Context * global_context = nullptr;
/// Use it only from current thread
Context * query_context = nullptr;
String query_id;
/// A logs queue used by TCPHandler to pass logs to a client
InternalTextLogsQueueWeakPtr logs_queue_ptr;
bool performance_counters_finalized = false;
UInt64 query_start_time_nanoseconds = 0;
UInt64 query_start_time_microseconds = 0;
time_t query_start_time = 0;
size_t queries_started = 0;
// CPU and Real time query profilers
std::unique_ptr<QueryProfilerReal> query_profiler_real;
std::unique_ptr<QueryProfilerCpu> query_profiler_cpu;
Poco::Logger * log = nullptr;
friend class CurrentThread;
/// Use ptr not to add extra dependencies in the header
std::unique_ptr<RUsageCounters> last_rusage;
std::unique_ptr<TasksStatsCounters> taskstats;
/// Is used to send logs from logs_queue to client in case of fatal errors.
std::function<void()> fatal_error_callback;
public:
ThreadStatus();
~ThreadStatus();
ThreadGroupStatusPtr getThreadGroup() const
{
return thread_group;
@ -176,40 +221,6 @@ protected:
void assertState(const std::initializer_list<int> & permitted_states, const char * description = nullptr) const;
ThreadGroupStatusPtr thread_group;
std::atomic<int> thread_state{ThreadState::DetachedFromQuery};
/// Is set once
Context * global_context = nullptr;
/// Use it only from current thread
Context * query_context = nullptr;
String query_id;
/// A logs queue used by TCPHandler to pass logs to a client
InternalTextLogsQueueWeakPtr logs_queue_ptr;
bool performance_counters_finalized = false;
UInt64 query_start_time_nanoseconds = 0;
UInt64 query_start_time_microseconds = 0;
time_t query_start_time = 0;
size_t queries_started = 0;
// CPU and Real time query profilers
std::unique_ptr<QueryProfilerReal> query_profiler_real;
std::unique_ptr<QueryProfilerCpu> query_profiler_cpu;
Poco::Logger * log = nullptr;
friend class CurrentThread;
/// Use ptr not to add extra dependencies in the header
std::unique_ptr<RUsageCounters> last_rusage;
std::unique_ptr<TasksStatsCounters> taskstats;
/// Is used to send logs from logs_queue to client in case of fatal errors.
std::function<void()> fatal_error_callback;
private:
void setupState(const ThreadGroupStatusPtr & thread_group_);

View File

@ -326,6 +326,16 @@ struct ODBCBridgeMixin
cmd_args.push_back("--err-log-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_errlog"));
}
if (config.has("logger." + configPrefix() + "_stdout"))
{
cmd_args.push_back("--stdout-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_stdout"));
}
if (config.has("logger." + configPrefix() + "_stderr"))
{
cmd_args.push_back("--stderr-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_stderr"));
}
if (config.has("logger." + configPrefix() + "_level"))
{
cmd_args.push_back("--log-level");

View File

@ -1114,6 +1114,7 @@ void ZooKeeper::sendThread()
info.request->probably_sent = true;
info.request->write(*out);
/// We sent close request, exit
if (info.request->xid == close_xid)
break;
}
@ -1342,21 +1343,25 @@ void ZooKeeper::receiveEvent()
void ZooKeeper::finalize(bool error_send, bool error_receive)
{
/// If some thread (send/receive) already finalizing session don't try to do it
if (finalization_started.exchange(true))
return;
auto expire_session_if_not_expired = [&]
{
std::lock_guard lock(push_request_mutex);
if (expired)
return;
expired = true;
}
active_session_metric_increment.destroy();
if (!expired)
{
expired = true;
active_session_metric_increment.destroy();
}
};
try
{
if (!error_send)
{
/// Send close event. This also signals sending thread to wakeup and then stop.
/// Send close event. This also signals sending thread to stop.
try
{
close();
@ -1364,12 +1369,18 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
catch (...)
{
/// This happens for example, when "Cannot push request to queue within operation timeout".
/// Just mark session expired in case of error on close request, otherwise sendThread may not stop.
expire_session_if_not_expired();
tryLogCurrentException(__PRETTY_FUNCTION__);
}
/// Send thread will exit after sending close request or on expired flag
send_thread.join();
}
/// Set expired flag after we sent close event
expire_session_if_not_expired();
try
{
/// This will also wakeup the receiving thread.

View File

@ -187,6 +187,9 @@ private:
std::atomic<XID> next_xid {1};
std::atomic<bool> expired {false};
/// Mark session finalization start. Used to avoid simultaneous
/// finalization from different threads. One-shot flag.
std::atomic<bool> finalization_started {false};
std::mutex push_request_mutex;
using clock = std::chrono::steady_clock;

View File

@ -407,6 +407,7 @@ class IColumn;
M(Bool, force_optimize_skip_unused_shards_no_nested, false, "Obsolete setting, does nothing. Will be removed after 2020-12-01. Use force_optimize_skip_unused_shards_nesting instead.", 0) \
M(Bool, enable_debug_queries, false, "Enabled debug queries, but now is obsolete", 0) \
M(Bool, allow_experimental_database_atomic, true, "Obsolete setting, does nothing. Will be removed after 2021-02-12", 0) \
M(UnionMode, union_default_mode, UnionMode::DISTINCT, "Set default Union Mode in SelectWithUnion query. Possible values: empty string, 'ALL', 'DISTINCT'. If empty, query without Union Mode will throw exception.", 0)
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS below.

View File

@ -12,6 +12,7 @@ namespace ErrorCodes
extern const int UNKNOWN_JOIN;
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_MYSQL_DATATYPES_SUPPORT_LEVEL;
extern const int UNKNOWN_UNION;
}
@ -96,4 +97,9 @@ IMPLEMENT_SETTING_MULTI_ENUM(MySQLDataTypesSupport, ErrorCodes::UNKNOWN_MYSQL_DA
{{"decimal", MySQLDataTypesSupport::DECIMAL},
{"datetime64", MySQLDataTypesSupport::DATETIME64}})
IMPLEMENT_SETTING_ENUM(UnionMode, ErrorCodes::UNKNOWN_UNION,
{{"", UnionMode::Unspecified},
{"ALL", UnionMode::ALL},
{"DISTINCT", UnionMode::DISTINCT}})
}

View File

@ -129,4 +129,13 @@ enum class MySQLDataTypesSupport
DECLARE_SETTING_MULTI_ENUM(MySQLDataTypesSupport)
enum class UnionMode
{
Unspecified = 0, // Query UNION without UnionMode will throw exception
ALL, // Query UNION without UnionMode -> SELECT ... UNION ALL SELECT ...
DISTINCT // Query UNION without UnionMode -> SELECT ... UNION DISTINCT SELECT ...
};
DECLARE_SETTING_ENUM(UnionMode)
}

View File

@ -0,0 +1,13 @@
#pragma once
// .h autogenerated by cmake!
#define USE_ICU 1
#define USE_MYSQL 1
#define USE_RDKAFKA 1
#define USE_AMQPCPP 1
#define USE_EMBEDDED_COMPILER 0
#define USE_INTERNAL_LLVM_LIBRARY 0
#define USE_SSL 1
#define USE_OPENCL 0
#define USE_LDAP 1

View File

@ -27,6 +27,11 @@ RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_,
{
ClientInfo modified_client_info = client_info_;
modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
if (CurrentThread::isInitialized())
{
modified_client_info.client_trace_context
= CurrentThread::get().thread_trace_context;
}
/** Send query and receive "header", that describes table structure.
* Header is needed to know, what structure is required for blocks to be passed to 'write' method.

View File

@ -156,6 +156,10 @@ void RemoteQueryExecutor::sendQuery()
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
ClientInfo modified_client_info = context.getClientInfo();
modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
if (CurrentThread::isInitialized())
{
modified_client_info.client_trace_context = CurrentThread::get().thread_trace_context;
}
multiplexed_connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true);

View File

@ -237,7 +237,10 @@ void assertResponseIsOk(const Poco::Net::HTTPRequest & request, Poco::Net::HTTPR
{
auto status = response.getStatus();
if (!(status == Poco::Net::HTTPResponse::HTTP_OK || (isRedirect(status) && allow_redirects)))
if (!(status == Poco::Net::HTTPResponse::HTTP_OK
|| status == Poco::Net::HTTPResponse::HTTP_CREATED
|| status == Poco::Net::HTTPResponse::HTTP_ACCEPTED
|| (isRedirect(status) && allow_redirects)))
{
std::stringstream error_message; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
error_message.exceptions(std::ios::failbit);

View File

@ -85,6 +85,8 @@ public:
void restart()
{
if (vector.empty())
vector.resize(initial_size);
set(reinterpret_cast<Position>(vector.data()), vector.size());
is_finished = false;
}

View File

@ -859,15 +859,15 @@ template <typename T>
inline std::enable_if_t<std::is_floating_point_v<T>, void>
writeText(const T & x, WriteBuffer & buf) { writeFloatText(x, buf); }
inline void writeText(const String & x, WriteBuffer & buf) { writeEscapedString(x, buf); }
inline void writeText(const String & x, WriteBuffer & buf) { writeString(x.c_str(), x.size(), buf); }
/// Implemented as template specialization (not function overload) to avoid preference over templates on arithmetic types above.
template <> inline void writeText<bool>(const bool & x, WriteBuffer & buf) { writeBoolText(x, buf); }
/// unlike the method for std::string
/// assumes here that `x` is a null-terminated string.
inline void writeText(const char * x, WriteBuffer & buf) { writeEscapedString(x, strlen(x), buf); }
inline void writeText(const char * x, size_t size, WriteBuffer & buf) { writeEscapedString(x, size, buf); }
inline void writeText(const char * x, WriteBuffer & buf) { writeCString(x, buf); }
inline void writeText(const char * x, size_t size, WriteBuffer & buf) { writeString(x, size, buf); }
inline void writeText(const DayNum & x, WriteBuffer & buf) { writeDateText(LocalDate(x), buf); }
inline void writeText(const LocalDate & x, WriteBuffer & buf) { writeDateText(x, buf); }

View File

@ -62,16 +62,16 @@ void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision)
if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_OPENTELEMETRY)
{
if (opentelemetry_trace_id)
if (client_trace_context.trace_id)
{
// Have OpenTelemetry header.
writeBinary(uint8_t(1), out);
// No point writing these numbers with variable length, because they
// are random and will probably require the full length anyway.
writeBinary(opentelemetry_trace_id, out);
writeBinary(opentelemetry_span_id, out);
writeBinary(opentelemetry_tracestate, out);
writeBinary(opentelemetry_trace_flags, out);
writeBinary(client_trace_context.trace_id, out);
writeBinary(client_trace_context.span_id, out);
writeBinary(client_trace_context.tracestate, out);
writeBinary(client_trace_context.trace_flags, out);
}
else
{
@ -139,10 +139,10 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision)
readBinary(have_trace_id, in);
if (have_trace_id)
{
readBinary(opentelemetry_trace_id, in);
readBinary(opentelemetry_span_id, in);
readBinary(opentelemetry_tracestate, in);
readBinary(opentelemetry_trace_flags, in);
readBinary(client_trace_context.trace_id, in);
readBinary(client_trace_context.span_id, in);
readBinary(client_trace_context.tracestate, in);
readBinary(client_trace_context.trace_flags, in);
}
}
}
@ -155,74 +155,6 @@ void ClientInfo::setInitialQuery()
client_name = (DBMS_NAME " ") + client_name;
}
bool ClientInfo::parseTraceparentHeader(const std::string & traceparent,
std::string & error)
{
uint8_t version = -1;
uint64_t trace_id_high = 0;
uint64_t trace_id_low = 0;
uint64_t trace_parent = 0;
uint8_t trace_flags = 0;
// Version 00, which is the only one we can parse, is fixed width. Use this
// fact for an additional sanity check.
const int expected_length = 2 + 1 + 32 + 1 + 16 + 1 + 2;
if (traceparent.length() != expected_length)
{
error = fmt::format("unexpected length {}, expected {}",
traceparent.length(), expected_length);
return false;
}
// clang-tidy doesn't like sscanf:
// error: 'sscanf' used to convert a string to an unsigned integer value,
// but function will not report conversion errors; consider using 'strtoul'
// instead [cert-err34-c,-warnings-as-errors]
// There is no other ready solution, and hand-rolling a more complicated
// parser for an HTTP header in C++ sounds like RCE.
// NOLINTNEXTLINE(cert-err34-c)
int result = sscanf(&traceparent[0],
"%2" SCNx8 "-%16" SCNx64 "%16" SCNx64 "-%16" SCNx64 "-%2" SCNx8,
&version, &trace_id_high, &trace_id_low, &trace_parent, &trace_flags);
if (result == EOF)
{
error = "EOF";
return false;
}
// We read uint128 as two uint64, so 5 parts and not 4.
if (result != 5)
{
error = fmt::format("could only read {} parts instead of the expected 5",
result);
return false;
}
if (version != 0)
{
error = fmt::format("unexpected version {}, expected 00", version);
return false;
}
opentelemetry_trace_id = static_cast<__uint128_t>(trace_id_high) << 64
| trace_id_low;
opentelemetry_span_id = trace_parent;
opentelemetry_trace_flags = trace_flags;
return true;
}
std::string ClientInfo::composeTraceparentHeader() const
{
// This span is a parent for its children, so we specify this span_id as a
// parent id.
return fmt::format("00-{:032x}-{:016x}-{:02x}", opentelemetry_trace_id,
opentelemetry_span_id,
// This cast is needed because fmt is being weird and complaining that
// "mixing character types is not allowed".
static_cast<uint8_t>(opentelemetry_trace_flags));
}
void ClientInfo::fillOSUserHostNameAndVersionInfo()
{

View File

@ -3,7 +3,7 @@
#include <Poco/Net/SocketAddress.h>
#include <Common/UInt128.h>
#include <common/types.h>
#include <Common/OpenTelemetryTraceContext.h>
namespace DB
{
@ -25,6 +25,7 @@ public:
{
TCP = 1,
HTTP = 2,
GRPC = 3,
};
enum class HTTPMethod : uint8_t
@ -59,16 +60,9 @@ public:
String initial_query_id;
Poco::Net::SocketAddress initial_address;
// OpenTelemetry trace information.
__uint128_t opentelemetry_trace_id = 0;
// The span id we get the in the incoming client info becomes our parent span
// id, and the span id we send becomes downstream parent span id.
UInt64 opentelemetry_span_id = 0;
UInt64 opentelemetry_parent_span_id = 0;
// The incoming tracestate header and the trace flags, we just pass them downstream.
// They are described at https://www.w3.org/TR/trace-context/
String opentelemetry_tracestate;
UInt8 opentelemetry_trace_flags = 0;
// OpenTelemetry trace context we received from client, or which we are going
// to send to server.
OpenTelemetryTraceContext client_trace_context;
/// All below are parameters related to initial query.
@ -102,16 +96,6 @@ public:
/// Initialize parameters on client initiating query.
void setInitialQuery();
// Parse/compose OpenTelemetry traceparent header.
// Note that these functions use span_id field, not parent_span_id, same as
// in native protocol. The incoming traceparent corresponds to the upstream
// trace span, and the outgoing traceparent corresponds to our current span.
// We use the same ClientInfo structure first for incoming span, and then
// for our span: when we switch, we use old span_id as parent_span_id, and
// generate a new span_id (currently this happens in Context::setQueryId()).
bool parseTraceparentHeader(const std::string & traceparent, std::string & error);
std::string composeTraceparentHeader() const;
private:
void fillOSUserHostNameAndVersionInfo();
};

View File

@ -1127,8 +1127,14 @@ void Context::setCurrentQueryId(const String & query_id)
random.words.a = thread_local_rng(); //-V656
random.words.b = thread_local_rng(); //-V656
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY
&& client_info.opentelemetry_trace_id == 0)
if (client_info.client_trace_context.trace_id != 0)
{
// Use the OpenTelemetry trace context we received from the client, and
// create a new span for the query.
query_trace_context = client_info.client_trace_context;
query_trace_context.span_id = thread_local_rng();
}
else if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
{
// If this is an initial query without any parent OpenTelemetry trace, we
// might start the trace ourselves, with some configurable probability.
@ -1138,20 +1144,12 @@ void Context::setCurrentQueryId(const String & query_id)
if (should_start_trace(thread_local_rng))
{
// Use the randomly generated default query id as the new trace id.
client_info.opentelemetry_trace_id = random.uuid;
client_info.opentelemetry_parent_span_id = 0;
client_info.opentelemetry_span_id = thread_local_rng();
query_trace_context.trace_id = random.uuid;
query_trace_context.span_id = thread_local_rng();
// Mark this trace as sampled in the flags.
client_info.opentelemetry_trace_flags = 1;
query_trace_context.trace_flags = 1;
}
}
else
{
// The incoming request has an OpenTelemtry trace context. Its span id
// becomes our parent span id.
client_info.opentelemetry_parent_span_id = client_info.opentelemetry_span_id;
client_info.opentelemetry_span_id = thread_local_rng();
}
String query_id_to_set = query_id;
if (query_id_to_set.empty()) /// If the user did not submit his query_id, then we generate it ourselves.

View File

@ -13,6 +13,7 @@
#include <Common/LRUCache.h>
#include <Common/MultiVersion.h>
#include <Common/ThreadPool.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Storages/IStorage_fwd.h>
#include <atomic>
#include <chrono>
@ -198,6 +199,12 @@ private:
Context * session_context = nullptr; /// Session context or nullptr. Could be equal to this.
Context * global_context = nullptr; /// Global context. Could be equal to this.
public:
// Top-level OpenTelemetry trace context for the query. Makes sense only for
// a query context.
OpenTelemetryTraceContext query_trace_context;
private:
friend class NamedSessions;
using SampleBlockCache = std::unordered_map<std::string, Block>;

View File

@ -0,0 +1,39 @@
#pragma once
#include <Interpreters/Context.h>
#include <Interpreters/IInterpreter.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Parsers/IAST_fwd.h>
namespace DB
{
class IInterpreterUnionOrSelectQuery : public IInterpreter
{
public:
IInterpreterUnionOrSelectQuery(const ASTPtr & query_ptr_, const Context & context_, const SelectQueryOptions & options_)
: query_ptr(query_ptr_)
, context(std::make_shared<Context>(context_))
, options(options_)
, max_streams(context->getSettingsRef().max_threads)
{
}
virtual void buildQueryPlan(QueryPlan & query_plan) = 0;
virtual void ignoreWithTotals() = 0;
virtual ~IInterpreterUnionOrSelectQuery() override = default;
Block getSampleBlock() { return result_header; }
size_t getMaxStreams() const { return max_streams; }
protected:
ASTPtr query_ptr;
std::shared_ptr<Context> context;
Block result_header;
SelectQueryOptions options;
size_t max_streams = 1;
};
}

View File

@ -115,7 +115,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
auto ast = DatabaseOnDisk::parseQueryFromMetadata(nullptr, context, metadata_file_path);
create = ast->as<ASTCreateQuery &>();
if (!create.table.empty() || !create.storage)
throw Exception(ErrorCodes::INCORRECT_QUERY, "Metadata file {} contains incorrect CREATE DATABASE query", metadata_file_path);
throw Exception(ErrorCodes::INCORRECT_QUERY, "Metadata file {} contains incorrect CREATE DATABASE query", metadata_file_path.string());
create.attach = true;
create.attach_short_syntax = true;
create.database = database_name;
@ -149,7 +149,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
metadata_path = metadata_path / "store" / DatabaseCatalog::getPathForUUID(create.uuid);
if (!create.attach && fs::exists(metadata_path))
throw Exception(ErrorCodes::DATABASE_ALREADY_EXISTS, "Metadata directory {} already exists", metadata_path);
throw Exception(ErrorCodes::DATABASE_ALREADY_EXISTS, "Metadata directory {} already exists", metadata_path.string());
}
else
{

View File

@ -66,6 +66,7 @@
#include <Interpreters/InterpreterUseQuery.h>
#include <Interpreters/InterpreterWatchQuery.h>
#include <Interpreters/InterpreterExternalDDLQuery.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Parsers/ASTSystemQuery.h>
@ -93,6 +94,8 @@ namespace ErrorCodes
std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, Context & context, QueryProcessingStage::Enum stage)
{
OpenTelemetrySpanHolder span("InterpreterFactory::get()");
ProfileEvents::increment(ProfileEvents::Query);
if (query->as<ASTSelectQuery>())

View File

@ -140,34 +140,39 @@ Block InterpreterInsertQuery::getSampleBlock(
/** A query that just reads all data without any complex computations or filetering.
* If we just pipe the result to INSERT, we don't have to use too many threads for read.
*/
static bool isTrivialSelect(const ASTSelectQuery & select_query)
static bool isTrivialSelect(const ASTPtr & select)
{
const auto & tables = select_query.tables();
if (auto * select_query = select->as<ASTSelectQuery>())
{
const auto & tables = select_query->tables();
if (!tables)
return false;
if (!tables)
return false;
const auto & tables_in_select_query = tables->as<ASTTablesInSelectQuery &>();
const auto & tables_in_select_query = tables->as<ASTTablesInSelectQuery &>();
if (tables_in_select_query.children.size() != 1)
return false;
if (tables_in_select_query.children.size() != 1)
return false;
const auto & child = tables_in_select_query.children.front();
const auto & table_element = child->as<ASTTablesInSelectQueryElement &>();
const auto & table_expr = table_element.table_expression->as<ASTTableExpression &>();
const auto & child = tables_in_select_query.children.front();
const auto & table_element = child->as<ASTTablesInSelectQueryElement &>();
const auto & table_expr = table_element.table_expression->as<ASTTableExpression &>();
if (table_expr.subquery)
return false;
if (table_expr.subquery)
return false;
/// Note: how to write it in more generic way?
return (!select_query.distinct
&& !select_query.limit_with_ties
&& !select_query.prewhere()
&& !select_query.where()
&& !select_query.groupBy()
&& !select_query.having()
&& !select_query.orderBy()
&& !select_query.limitBy());
/// Note: how to write it in more generic way?
return (!select_query->distinct
&& !select_query->limit_with_ties
&& !select_query->prewhere()
&& !select_query->where()
&& !select_query->groupBy()
&& !select_query->having()
&& !select_query->orderBy()
&& !select_query->limitBy());
}
/// This query is ASTSelectWithUnionQuery subquery
return false;
};
@ -196,23 +201,25 @@ BlockIO InterpreterInsertQuery::execute()
auto new_query = std::dynamic_pointer_cast<ASTInsertQuery>(query.clone());
if (select.list_of_selects->children.size() == 1)
{
auto & select_query = select.list_of_selects->children.at(0)->as<ASTSelectQuery &>();
JoinedTables joined_tables(Context(context), select_query);
if (joined_tables.tablesCount() == 1)
if (auto * select_query = select.list_of_selects->children.at(0)->as<ASTSelectQuery>())
{
storage_src = std::dynamic_pointer_cast<StorageDistributed>(joined_tables.getLeftTableStorage());
if (storage_src)
JoinedTables joined_tables(Context(context), *select_query);
if (joined_tables.tablesCount() == 1)
{
const auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();
select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>();
storage_src = std::dynamic_pointer_cast<StorageDistributed>(joined_tables.getLeftTableStorage());
if (storage_src)
{
const auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();
select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>();
auto new_select_query = std::dynamic_pointer_cast<ASTSelectQuery>(select_query.clone());
select_with_union_query->list_of_selects->children.push_back(new_select_query);
auto new_select_query = std::dynamic_pointer_cast<ASTSelectQuery>(select_query->clone());
select_with_union_query->list_of_selects->children.push_back(new_select_query);
new_select_query->replaceDatabaseAndTable(storage_src->getRemoteDatabaseName(), storage_src->getRemoteTableName());
new_select_query->replaceDatabaseAndTable(storage_src->getRemoteDatabaseName(), storage_src->getRemoteTableName());
new_query->select = select_with_union_query;
new_query->select = select_with_union_query;
}
}
}
}
@ -275,12 +282,17 @@ BlockIO InterpreterInsertQuery::execute()
if (settings.optimize_trivial_insert_select)
{
const auto & selects = query.select->as<ASTSelectWithUnionQuery &>().list_of_selects->children;
const auto & select_query = query.select->as<ASTSelectWithUnionQuery &>();
const auto & selects = select_query.list_of_selects->children;
const auto & union_modes = select_query.list_of_modes;
is_trivial_insert_select = std::all_of(selects.begin(), selects.end(), [](const ASTPtr & select)
{
return isTrivialSelect(select->as<ASTSelectQuery &>());
});
/// ASTSelectWithUnionQuery is not normalized now, so it may pass some querys which can be Trivial select querys
is_trivial_insert_select
= std::all_of(
union_modes.begin(),
union_modes.end(),
[](const ASTSelectWithUnionQuery::Mode & mode) { return mode == ASTSelectWithUnionQuery::Mode::ALL; })
&& std::all_of(selects.begin(), selects.end(), [](const ASTPtr & select) { return isTrivialSelect(select); });
}
if (is_trivial_insert_select)

View File

@ -29,6 +29,7 @@
#include <Interpreters/TableJoin.h>
#include <Interpreters/JoinSwitcher.h>
#include <Interpreters/JoinedTables.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/QueryAliasesVisitor.h>
#include <Processors/Pipe.h>
@ -216,10 +217,8 @@ InterpreterSelectQuery::InterpreterSelectQuery(
const SelectQueryOptions & options_,
const Names & required_result_column_names,
const StorageMetadataPtr & metadata_snapshot_)
: options(options_)
/// NOTE: the query almost always should be cloned because it will be modified during analysis.
, query_ptr(options.modify_inplace ? query_ptr_ : query_ptr_->clone())
, context(std::make_shared<Context>(context_))
: IInterpreterUnionOrSelectQuery(options_.modify_inplace ? query_ptr_ : query_ptr_->clone(), context_, options_)
, storage(storage_)
, input(input_)
, input_pipe(std::move(input_pipe_))
@ -465,12 +464,6 @@ InterpreterSelectQuery::InterpreterSelectQuery(
sanitizeBlock(result_header, true);
}
Block InterpreterSelectQuery::getSampleBlock()
{
return result_header;
}
void InterpreterSelectQuery::buildQueryPlan(QueryPlan & query_plan)
{
executeImpl(query_plan, input, std::move(input_pipe));
@ -502,6 +495,8 @@ BlockIO InterpreterSelectQuery::execute()
Block InterpreterSelectQuery::getSampleBlockImpl()
{
OpenTelemetrySpanHolder span(__PRETTY_FUNCTION__);
query_info.query = query_ptr;
if (storage && !options.only_analyze)

View File

@ -3,16 +3,15 @@
#include <memory>
#include <Core/QueryProcessingStage.h>
#include <Parsers/ASTSelectQuery.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/IInterpreter.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/IInterpreterUnionOrSelectQuery.h>
#include <Interpreters/StorageID.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/ReadInOrderOptimizer.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/TableLockHolder.h>
#include <Storages/ReadInOrderOptimizer.h>
#include <Interpreters/StorageID.h>
#include <Columns/FilterDescription.h>
@ -32,7 +31,7 @@ using TreeRewriterResultPtr = std::shared_ptr<const TreeRewriterResult>;
/** Interprets the SELECT query. Returns the stream of blocks with the results of the query before `to_stage` stage.
*/
class InterpreterSelectQuery : public IInterpreter
class InterpreterSelectQuery : public IInterpreterUnionOrSelectQuery
{
public:
/**
@ -79,18 +78,12 @@ public:
BlockIO execute() override;
/// Builds QueryPlan for current query.
void buildQueryPlan(QueryPlan & query_plan);
virtual void buildQueryPlan(QueryPlan & query_plan) override;
bool ignoreLimits() const override { return options.ignore_limits; }
bool ignoreQuota() const override { return options.ignore_quota; }
Block getSampleBlock();
void ignoreWithTotals();
ASTPtr getQuery() const { return query_ptr; }
size_t getMaxStreams() const { return max_streams; }
virtual void ignoreWithTotals() override;
const SelectQueryInfo & getQueryInfo() const { return query_info; }
@ -160,9 +153,6 @@ private:
*/
void initSettings();
SelectQueryOptions options;
ASTPtr query_ptr;
std::shared_ptr<Context> context;
TreeRewriterResultPtr syntax_analyzer_result;
std::unique_ptr<SelectQueryExpressionAnalyzer> query_analyzer;
SelectQueryInfo query_info;
@ -174,15 +164,10 @@ private:
QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
/// How many streams we ask for storage to produce, and in how many threads we will do further processing.
size_t max_streams = 1;
/// List of columns to read to execute the query.
Names required_columns;
/// Structure of query source (table, subquery, etc).
Block source_header;
/// Structure of query result.
Block result_header;
/// The subquery interpreter, if the subquery
std::unique_ptr<InterpreterSelectWithUnionQuery> interpreter_subquery;

View File

@ -1,15 +1,17 @@
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Columns/getLeastSuperColumn.h>
#include <Common/typeid_cast.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/queryToString.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Common/typeid_cast.h>
#include <Interpreters/InDepthNodeVisitor.h>
namespace DB
{
@ -18,50 +20,158 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNION_ALL_RESULT_STRUCTURES_MISMATCH;
extern const int EXPECTED_ALL_OR_DISTINCT;
}
struct CustomizeASTSelectWithUnionQueryNormalize
{
using TypeToVisit = ASTSelectWithUnionQuery;
const UnionMode & union_default_mode;
static void getSelectsFromUnionListNode(ASTPtr & ast_select, ASTs & selects)
{
if (auto * inner_union = ast_select->as<ASTSelectWithUnionQuery>())
{
for (auto & child : inner_union->list_of_selects->children)
getSelectsFromUnionListNode(child, selects);
return;
}
selects.push_back(std::move(ast_select));
}
void visit(ASTSelectWithUnionQuery & ast, ASTPtr &) const
{
auto & union_modes = ast.list_of_modes;
ASTs selects;
auto & select_list = ast.list_of_selects->children;
int i;
for (i = union_modes.size() - 1; i >= 0; --i)
{
/// Rewrite UNION Mode
if (union_modes[i] == ASTSelectWithUnionQuery::Mode::Unspecified)
{
if (union_default_mode == UnionMode::ALL)
union_modes[i] = ASTSelectWithUnionQuery::Mode::ALL;
else if (union_default_mode == UnionMode::DISTINCT)
union_modes[i] = ASTSelectWithUnionQuery::Mode::DISTINCT;
else
throw Exception(
"Expected ALL or DISTINCT in SelectWithUnion query, because setting (union_default_mode) is empty",
DB::ErrorCodes::EXPECTED_ALL_OR_DISTINCT);
}
if (union_modes[i] == ASTSelectWithUnionQuery::Mode::ALL)
{
if (auto * inner_union = select_list[i + 1]->as<ASTSelectWithUnionQuery>())
{
/// Inner_union is an UNION ALL list, just lift up
for (auto child = inner_union->list_of_selects->children.rbegin();
child != inner_union->list_of_selects->children.rend();
++child)
selects.push_back(std::move(*child));
}
else
selects.push_back(std::move(select_list[i + 1]));
}
/// flatten all left nodes and current node to a UNION DISTINCT list
else if (union_modes[i] == ASTSelectWithUnionQuery::Mode::DISTINCT)
{
auto distinct_list = std::make_shared<ASTSelectWithUnionQuery>();
distinct_list->list_of_selects = std::make_shared<ASTExpressionList>();
distinct_list->children.push_back(distinct_list->list_of_selects);
for (int j = 0; j <= i + 1; ++j)
{
getSelectsFromUnionListNode(select_list[j], distinct_list->list_of_selects->children);
}
distinct_list->union_mode = ASTSelectWithUnionQuery::Mode::DISTINCT;
distinct_list->is_normalized = true;
selects.push_back(std::move(distinct_list));
break;
}
}
/// No UNION DISTINCT or only one child in select_list
if (i == -1)
{
if (auto * inner_union = select_list[0]->as<ASTSelectWithUnionQuery>())
{
/// Inner_union is an UNION ALL list, just lift it up
for (auto child = inner_union->list_of_selects->children.rbegin(); child != inner_union->list_of_selects->children.rend();
++child)
selects.push_back(std::move(*child));
}
else
selects.push_back(std::move(select_list[0]));
}
// reverse children list
std::reverse(selects.begin(), selects.end());
ast.is_normalized = true;
ast.union_mode = ASTSelectWithUnionQuery::Mode::ALL;
ast.list_of_selects->children = std::move(selects);
}
};
/// We need normalize children first, so we should visit AST tree bottom up
using CustomizeASTSelectWithUnionQueryNormalizeVisitor
= InDepthNodeVisitor<OneTypeMatcher<CustomizeASTSelectWithUnionQueryNormalize>, false>;
InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
const ASTPtr & query_ptr_,
const Context & context_,
const SelectQueryOptions & options_,
const Names & required_result_column_names)
: options(options_),
query_ptr(query_ptr_),
context(std::make_shared<Context>(context_)),
max_streams(context->getSettingsRef().max_threads)
const ASTPtr & query_ptr_, const Context & context_, const SelectQueryOptions & options_, const Names & required_result_column_names)
: IInterpreterUnionOrSelectQuery(query_ptr_, context_, options_)
{
const auto & ast = query_ptr->as<ASTSelectWithUnionQuery &>();
auto & ast = query_ptr->as<ASTSelectWithUnionQuery &>();
size_t num_selects = ast.list_of_selects->children.size();
/// Normalize AST Tree
if (!ast.is_normalized)
{
CustomizeASTSelectWithUnionQueryNormalizeVisitor::Data union_default_mode{context->getSettingsRef().union_default_mode};
CustomizeASTSelectWithUnionQueryNormalizeVisitor(union_default_mode).visit(query_ptr);
if (!num_selects)
/// After normalization, if it only has one ASTSelectWithUnionQuery child,
/// we can lift it up, this can reduce one unnecessary recursion later.
if (ast.list_of_selects->children.size() == 1 && ast.list_of_selects->children.at(0)->as<ASTSelectWithUnionQuery>())
{
query_ptr = std::move(ast.list_of_selects->children.at(0));
ast = query_ptr->as<ASTSelectWithUnionQuery &>();
}
}
size_t num_children = ast.list_of_selects->children.size();
if (!num_children)
throw Exception("Logical error: no children in ASTSelectWithUnionQuery", ErrorCodes::LOGICAL_ERROR);
/// Initialize interpreters for each SELECT query.
/// Note that we pass 'required_result_column_names' to first SELECT.
/// And for the rest, we pass names at the corresponding positions of 'required_result_column_names' in the result of first SELECT,
/// because names could be different.
nested_interpreters.reserve(num_selects);
nested_interpreters.reserve(num_children);
std::vector<Names> required_result_column_names_for_other_selects(num_children);
std::vector<Names> required_result_column_names_for_other_selects(num_selects);
if (!required_result_column_names.empty() && num_selects > 1)
if (!required_result_column_names.empty() && num_children > 1)
{
/// Result header if there are no filtering by 'required_result_column_names'.
/// We use it to determine positions of 'required_result_column_names' in SELECT clause.
Block full_result_header = InterpreterSelectQuery(
ast.list_of_selects->children.at(0), *context, options.copy().analyze().noModify()).getSampleBlock();
Block full_result_header = getCurrentChildResultHeader(ast.list_of_selects->children.at(0), required_result_column_names);
std::vector<size_t> positions_of_required_result_columns(required_result_column_names.size());
for (size_t required_result_num = 0, size = required_result_column_names.size(); required_result_num < size; ++required_result_num)
positions_of_required_result_columns[required_result_num] = full_result_header.getPositionByName(required_result_column_names[required_result_num]);
for (size_t query_num = 1; query_num < num_selects; ++query_num)
for (size_t query_num = 1; query_num < num_children; ++query_num)
{
Block full_result_header_for_current_select = InterpreterSelectQuery(
ast.list_of_selects->children.at(query_num), *context, options.copy().analyze().noModify()).getSampleBlock();
Block full_result_header_for_current_select
= getCurrentChildResultHeader(ast.list_of_selects->children.at(query_num), required_result_column_names);
if (full_result_header_for_current_select.columns() != full_result_header.columns())
throw Exception("Different number of columns in UNION ALL elements:\n"
@ -76,28 +186,25 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
}
}
for (size_t query_num = 0; query_num < num_selects; ++query_num)
for (size_t query_num = 0; query_num < num_children; ++query_num)
{
const Names & current_required_result_column_names
= query_num == 0 ? required_result_column_names : required_result_column_names_for_other_selects[query_num];
nested_interpreters.emplace_back(std::make_unique<InterpreterSelectQuery>(
ast.list_of_selects->children.at(query_num),
*context,
options,
current_required_result_column_names));
nested_interpreters.emplace_back(
buildCurrentChildInterpreter(ast.list_of_selects->children.at(query_num), current_required_result_column_names));
}
/// Determine structure of the result.
if (num_selects == 1)
if (num_children == 1)
{
result_header = nested_interpreters.front()->getSampleBlock();
}
else
{
Blocks headers(num_selects);
for (size_t query_num = 0; query_num < num_selects; ++query_num)
Blocks headers(num_children);
for (size_t query_num = 0; query_num < num_children; ++query_num)
headers[query_num] = nested_interpreters[query_num]->getSampleBlock();
result_header = getCommonHeaderForUnion(headers);
@ -115,8 +222,8 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
}
options.ignore_limits |= all_nested_ignore_limits;
options.ignore_quota |= all_nested_ignore_quota;
}
}
Block InterpreterSelectWithUnionQuery::getCommonHeaderForUnion(const Blocks & headers)
{
@ -148,32 +255,43 @@ Block InterpreterSelectWithUnionQuery::getCommonHeaderForUnion(const Blocks & he
return common_header;
}
Block InterpreterSelectWithUnionQuery::getCurrentChildResultHeader(const ASTPtr & ast_ptr_, const Names & required_result_column_names)
{
if (ast_ptr_->as<ASTSelectWithUnionQuery>())
return InterpreterSelectWithUnionQuery(ast_ptr_, *context, options.copy().analyze().noModify(), required_result_column_names)
.getSampleBlock();
else
return InterpreterSelectQuery(ast_ptr_, *context, options.copy().analyze().noModify()).getSampleBlock();
}
std::unique_ptr<IInterpreterUnionOrSelectQuery>
InterpreterSelectWithUnionQuery::buildCurrentChildInterpreter(const ASTPtr & ast_ptr_, const Names & current_required_result_column_names)
{
if (ast_ptr_->as<ASTSelectWithUnionQuery>())
return std::make_unique<InterpreterSelectWithUnionQuery>(ast_ptr_, *context, options, current_required_result_column_names);
else
return std::make_unique<InterpreterSelectQuery>(ast_ptr_, *context, options, current_required_result_column_names);
}
InterpreterSelectWithUnionQuery::~InterpreterSelectWithUnionQuery() = default;
Block InterpreterSelectWithUnionQuery::getSampleBlock()
Block InterpreterSelectWithUnionQuery::getSampleBlock(const ASTPtr & query_ptr_, const Context & context_)
{
return result_header;
}
Block InterpreterSelectWithUnionQuery::getSampleBlock(
const ASTPtr & query_ptr,
const Context & context)
{
auto & cache = context.getSampleBlockCache();
auto & cache = context_.getSampleBlockCache();
/// Using query string because query_ptr changes for every internal SELECT
auto key = queryToString(query_ptr);
auto key = queryToString(query_ptr_);
if (cache.find(key) != cache.end())
{
return cache[key];
}
return cache[key] = InterpreterSelectWithUnionQuery(query_ptr, context, SelectQueryOptions().analyze()).getSampleBlock();
return cache[key] = InterpreterSelectWithUnionQuery(query_ptr_, context_, SelectQueryOptions().analyze()).getSampleBlock();
}
void InterpreterSelectWithUnionQuery::buildQueryPlan(QueryPlan & query_plan)
{
// auto num_distinct_union = optimizeUnionList();
size_t num_plans = nested_interpreters.size();
/// Skip union for single interpreter.
@ -197,6 +315,19 @@ void InterpreterSelectWithUnionQuery::buildQueryPlan(QueryPlan & query_plan)
auto union_step = std::make_unique<UnionStep>(std::move(data_streams), result_header, max_threads);
query_plan.unitePlans(std::move(union_step), std::move(plans));
const auto & query = query_ptr->as<ASTSelectWithUnionQuery &>();
if (query.union_mode == ASTSelectWithUnionQuery::Mode::DISTINCT)
{
/// Add distinct transform
const Settings & settings = context->getSettingsRef();
SizeLimits limits(settings.max_rows_in_distinct, settings.max_bytes_in_distinct, settings.distinct_overflow_mode);
auto distinct_step = std::make_unique<DistinctStep>(query_plan.getCurrentDataStream(), limits, 0, result_header.getNames(), false);
query_plan.addStep(std::move(distinct_step));
}
}
BlockIO InterpreterSelectWithUnionQuery::execute()

View File

@ -1,9 +1,7 @@
#pragma once
#include <Core/QueryProcessingStage.h>
#include <Interpreters/IInterpreter.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Parsers/IAST_fwd.h>
#include <Interpreters/IInterpreterUnionOrSelectQuery.h>
namespace DB
{
@ -12,11 +10,13 @@ class Context;
class InterpreterSelectQuery;
class QueryPlan;
/** Interprets one or multiple SELECT queries inside UNION ALL chain.
/** Interprets one or multiple SELECT queries inside UNION/UNION ALL/UNION DISTINCT chain.
*/
class InterpreterSelectWithUnionQuery : public IInterpreter
class InterpreterSelectWithUnionQuery : public IInterpreterUnionOrSelectQuery
{
public:
using IInterpreterUnionOrSelectQuery::getSampleBlock;
InterpreterSelectWithUnionQuery(
const ASTPtr & query_ptr_,
const Context & context_,
@ -26,35 +26,29 @@ public:
~InterpreterSelectWithUnionQuery() override;
/// Builds QueryPlan for current query.
void buildQueryPlan(QueryPlan & query_plan);
virtual void buildQueryPlan(QueryPlan & query_plan) override;
BlockIO execute() override;
bool ignoreLimits() const override { return options.ignore_limits; }
bool ignoreQuota() const override { return options.ignore_quota; }
Block getSampleBlock();
static Block getSampleBlock(
const ASTPtr & query_ptr_,
const Context & context_);
void ignoreWithTotals();
ASTPtr getQuery() const { return query_ptr; }
virtual void ignoreWithTotals() override;
private:
SelectQueryOptions options;
ASTPtr query_ptr;
std::shared_ptr<Context> context;
std::vector<std::unique_ptr<InterpreterSelectQuery>> nested_interpreters;
Block result_header;
size_t max_streams = 1;
std::vector<std::unique_ptr<IInterpreterUnionOrSelectQuery>> nested_interpreters;
static Block getCommonHeaderForUnion(const Blocks & headers);
Block getCurrentChildResultHeader(const ASTPtr & ast_ptr_, const Names & required_result_column_names);
std::unique_ptr<IInterpreterUnionOrSelectQuery>
buildCurrentChildInterpreter(const ASTPtr & ast_ptr_, const Names & current_required_result_column_names);
};
}

View File

@ -18,8 +18,18 @@ Block OpenTelemetrySpanLogElement::createBlock()
{std::make_shared<DataTypeUInt64>(), "span_id"},
{std::make_shared<DataTypeUInt64>(), "parent_span_id"},
{std::make_shared<DataTypeString>(), "operation_name"},
{std::make_shared<DataTypeDateTime64>(6), "start_time_us"},
{std::make_shared<DataTypeDateTime64>(6), "finish_time_us"},
// DateTime64 is really unwieldy -- there is no "normal" way to convert
// it to an UInt64 count of microseconds, except:
// 1) reinterpretAsUInt64(reinterpretAsFixedString(date)), which just
// doesn't look sane;
// 2) things like toUInt64(toDecimal64(date, 6) * 1000000) that are also
// excessively verbose -- why do I have to write scale '6' again, and
// write out 6 zeros? -- and also don't work because of overflow.
// Also subtraction of two DateTime64 points doesn't work, so you can't
// get duration.
// It is much less hassle to just use UInt64 of microseconds.
{std::make_shared<DataTypeUInt64>(), "start_time_us"},
{std::make_shared<DataTypeUInt64>(), "finish_time_us"},
{std::make_shared<DataTypeDate>(), "finish_date"},
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()),
"attribute.names"},
@ -28,6 +38,7 @@ Block OpenTelemetrySpanLogElement::createBlock()
};
}
void OpenTelemetrySpanLogElement::appendToBlock(MutableColumns & columns) const
{
size_t i = 0;
@ -40,8 +51,177 @@ void OpenTelemetrySpanLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(finish_time_us);
columns[i++]->insert(DateLUT::instance().toDayNum(finish_time_us / 1000000));
columns[i++]->insert(attribute_names);
columns[i++]->insert(attribute_values);
// The user might add some ints values, and we will have Int Field, and the
// insert will fail because the column requires Strings. Convert the fields
// here, because it's hard to remember to convert them in all other places.
Array string_values;
string_values.reserve(attribute_values.size());
for (const auto & value : attribute_values)
{
string_values.push_back(toString(value));
}
columns[i++]->insert(string_values);
}
OpenTelemetrySpanHolder::OpenTelemetrySpanHolder(const std::string & _operation_name)
{
trace_id = 0;
if (!CurrentThread::isInitialized())
{
// There may be no thread context if we're running inside the
// clickhouse-client, e.g. reading an external table provided with the
// `--external` option.
return;
}
auto & thread = CurrentThread::get();
trace_id = thread.thread_trace_context.trace_id;
if (!trace_id)
{
return;
}
parent_span_id = thread.thread_trace_context.span_id;
span_id = thread_local_rng();
operation_name = _operation_name;
start_time_us = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
#ifndef NDEBUG
attribute_names.push_back("clickhouse.start.stacktrace");
attribute_values.push_back(StackTrace().toString());
#endif
thread.thread_trace_context.span_id = span_id;
}
OpenTelemetrySpanHolder::~OpenTelemetrySpanHolder()
{
try
{
if (!trace_id)
{
return;
}
// First of all, return old value of current span.
auto & thread = CurrentThread::get();
assert(thread.thread_trace_context.span_id = span_id);
thread.thread_trace_context.span_id = parent_span_id;
// Not sure what's the best way to access the log from here.
auto * thread_group = CurrentThread::getGroup().get();
// Not sure whether and when this can be null.
if (!thread_group)
{
return;
}
auto * context = thread_group->query_context;
if (!context)
{
// Both global and query contexts can be null when executing a
// background task, and global context can be null for some
// queries.
return;
}
#ifndef NDEBUG
attribute_names.push_back("clickhouse.end.stacktrace");
attribute_values.push_back(StackTrace().toString());
#endif
auto log = context->getOpenTelemetrySpanLog();
if (!log)
{
// The log might be disabled.
return;
}
finish_time_us = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
log->add(OpenTelemetrySpanLogElement(
static_cast<OpenTelemetrySpan>(*this)));
}
catch (...)
{
tryLogCurrentException(__FUNCTION__);
}
}
bool OpenTelemetryTraceContext::parseTraceparentHeader(const std::string & traceparent,
std::string & error)
{
trace_id = 0;
uint8_t version = -1;
uint64_t trace_id_high = 0;
uint64_t trace_id_low = 0;
// Version 00, which is the only one we can parse, is fixed width. Use this
// fact for an additional sanity check.
const int expected_length = 2 + 1 + 32 + 1 + 16 + 1 + 2;
if (traceparent.length() != expected_length)
{
error = fmt::format("unexpected length {}, expected {}",
traceparent.length(), expected_length);
return false;
}
// clang-tidy doesn't like sscanf:
// error: 'sscanf' used to convert a string to an unsigned integer value,
// but function will not report conversion errors; consider using 'strtoul'
// instead [cert-err34-c,-warnings-as-errors]
// There is no other ready solution, and hand-rolling a more complicated
// parser for an HTTP header in C++ sounds like RCE.
// NOLINTNEXTLINE(cert-err34-c)
int result = sscanf(&traceparent[0],
"%2" SCNx8 "-%16" SCNx64 "%16" SCNx64 "-%16" SCNx64 "-%2" SCNx8,
&version, &trace_id_high, &trace_id_low, &span_id, &trace_flags);
if (result == EOF)
{
error = "EOF";
return false;
}
// We read uint128 as two uint64, so 5 parts and not 4.
if (result != 5)
{
error = fmt::format("could only read {} parts instead of the expected 5",
result);
return false;
}
if (version != 0)
{
error = fmt::format("unexpected version {}, expected 00", version);
return false;
}
trace_id = static_cast<__uint128_t>(trace_id_high) << 64
| trace_id_low;
return true;
}
std::string OpenTelemetryTraceContext::composeTraceparentHeader() const
{
// This span is a parent for its children, so we specify this span_id as a
// parent id.
return fmt::format("00-{:032x}-{:016x}-{:02x}", trace_id,
span_id,
// This cast is needed because fmt is being weird and complaining that
// "mixing character types is not allowed".
static_cast<uint8_t>(trace_flags));
}
}

View File

@ -11,9 +11,8 @@ struct OpenTelemetrySpan
UInt64 span_id;
UInt64 parent_span_id;
std::string operation_name;
Decimal64 start_time_us;
Decimal64 finish_time_us;
UInt64 duration_ns;
UInt64 start_time_us;
UInt64 finish_time_us;
Array attribute_names;
Array attribute_values;
// I don't understand how Links work, namely, which direction should they
@ -23,6 +22,10 @@ struct OpenTelemetrySpan
struct OpenTelemetrySpanLogElement : public OpenTelemetrySpan
{
OpenTelemetrySpanLogElement() = default;
OpenTelemetrySpanLogElement(const OpenTelemetrySpan & span)
: OpenTelemetrySpan(span) {}
static std::string name() { return "OpenTelemetrySpanLog"; }
static Block createBlock();
void appendToBlock(MutableColumns & columns) const;
@ -36,4 +39,10 @@ public:
using SystemLog<OpenTelemetrySpanLogElement>::SystemLog;
};
struct OpenTelemetrySpanHolder : public OpenTelemetrySpan
{
OpenTelemetrySpanHolder(const std::string & _operation_name);
~OpenTelemetrySpanHolder();
};
}

View File

@ -205,7 +205,7 @@ bool Set::insertFromBlock(const Block & block)
{
for (size_t i = 0; i < keys_size; ++i)
{
auto filtered_column = block.getByPosition(i).column->filter(filter->getData(), rows);
auto filtered_column = key_columns[i]->filter(filter->getData(), rows);
if (set_elements[i]->empty())
set_elements[i] = filtered_column;
else

View File

@ -2,6 +2,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/QueryThreadLog.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
@ -73,6 +74,15 @@ void ThreadStatus::attachQueryContext(Context & query_context_)
thread_group->global_context = global_context;
}
// Generate new span for thread manually here, because we can't depend
// on OpenTelemetrySpanHolder due to link order issues.
// FIXME why and how is this different from setupState()?
thread_trace_context = query_context->query_trace_context;
if (thread_trace_context.trace_id)
{
thread_trace_context.span_id = thread_local_rng();
}
applyQuerySettings();
}
@ -108,8 +118,22 @@ void ThreadStatus::setupState(const ThreadGroupStatusPtr & thread_group_)
}
if (query_context)
{
applyQuerySettings();
// Generate new span for thread manually here, because we can't depend
// on OpenTelemetrySpanHolder due to link order issues.
thread_trace_context = query_context->query_trace_context;
if (thread_trace_context.trace_id)
{
thread_trace_context.span_id = thread_local_rng();
}
}
else
{
thread_trace_context.trace_id = 0;
}
initPerformanceCounters();
thread_state = ThreadState::AttachedToQuery;
@ -300,6 +324,46 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
assertState({ThreadState::AttachedToQuery}, __PRETTY_FUNCTION__);
std::shared_ptr<OpenTelemetrySpanLog> opentelemetry_span_log;
if (thread_trace_context.trace_id && query_context)
{
opentelemetry_span_log = query_context->getOpenTelemetrySpanLog();
}
if (opentelemetry_span_log)
{
// Log the current thread span.
// We do this manually, because we can't use OpenTelemetrySpanHolder as a
// ThreadStatus member, because of linking issues. This file is linked
// separately, so we can reference OpenTelemetrySpanLog here, but if we had
// the span holder as a field, we would have to reference it in the
// destructor, which is in another library.
OpenTelemetrySpanLogElement span;
span.trace_id = thread_trace_context.trace_id;
// All child span holders should be finished by the time we detach this
// thread, so the current span id should be the thread span id. If not,
// an assertion for a proper parent span in ~OpenTelemetrySpanHolder()
// is going to fail, because we're going to reset it to zero later in
// this function.
span.span_id = thread_trace_context.span_id;
span.parent_span_id = query_context->query_trace_context.span_id;
span.operation_name = getThreadName();
span.start_time_us = query_start_time_microseconds;
span.finish_time_us =
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
span.attribute_names.push_back("clickhouse.thread_id");
span.attribute_values.push_back(thread_id);
#ifndef NDEBUG
span.attribute_names.push_back("clickhouse.end.stacktrace");
span.attribute_values.push_back(StackTrace().toString());
#endif
opentelemetry_span_log->add(span);
}
finalizeQueryProfiler();
finalizePerformanceCounters();
@ -312,6 +376,8 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
query_id.clear();
query_context = nullptr;
thread_trace_context.trace_id = 0;
thread_trace_context.span_id = 0;
thread_group.reset();
thread_state = thread_exits ? ThreadState::Died : ThreadState::DetachedFromQuery;

View File

@ -153,13 +153,11 @@ static void logQuery(const String & query, const Context & context, bool interna
(!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : std::string()),
joinLines(query));
if (client_info.opentelemetry_trace_id)
if (client_info.client_trace_context.trace_id)
{
LOG_TRACE(&Poco::Logger::get("executeQuery"),
"OpenTelemetry trace id {:x}, span id {}, parent span id {}",
client_info.opentelemetry_trace_id,
client_info.opentelemetry_span_id,
client_info.opentelemetry_parent_span_id);
"OpenTelemetry traceparent '{}'",
client_info.client_trace_context.composeTraceparentHeader());
}
}
}
@ -247,17 +245,16 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c
query_log->add(elem);
if (auto opentelemetry_span_log = context.getOpenTelemetrySpanLog();
context.getClientInfo().opentelemetry_trace_id
context.query_trace_context.trace_id
&& opentelemetry_span_log)
{
OpenTelemetrySpanLogElement span;
span.trace_id = context.getClientInfo().opentelemetry_trace_id;
span.span_id = context.getClientInfo().opentelemetry_span_id;
span.parent_span_id = context.getClientInfo().opentelemetry_parent_span_id;
span.trace_id = context.query_trace_context.trace_id;
span.span_id = context.query_trace_context.span_id;
span.parent_span_id = context.getClientInfo().client_trace_context.span_id;
span.operation_name = "query";
span.start_time_us = current_time_us;
span.finish_time_us = current_time_us;
span.duration_ns = 0;
/// Keep values synchronized to type enum in QueryLogElement::createBlock.
span.attribute_names.push_back("clickhouse.query_status");
@ -269,11 +266,11 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c
span.attribute_names.push_back("clickhouse.query_id");
span.attribute_values.push_back(elem.client_info.current_query_id);
if (!context.getClientInfo().opentelemetry_tracestate.empty())
if (!context.query_trace_context.tracestate.empty())
{
span.attribute_names.push_back("clickhouse.tracestate");
span.attribute_values.push_back(
context.getClientInfo().opentelemetry_tracestate);
context.query_trace_context.tracestate);
}
opentelemetry_span_log->add(span);
@ -349,8 +346,14 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
{
if (!select_with_union_query->list_of_selects->children.empty())
{
if (auto new_settings = select_with_union_query->list_of_selects->children.back()->as<ASTSelectQuery>()->settings())
InterpreterSetQuery(new_settings, context).executeForCurrentContext();
// We might have an arbitrarily complex UNION tree, so just give
// up if the last first-order child is not a plain SELECT.
// It is flattened later, when we process UNION ALL/DISTINCT.
const auto * last_select = select_with_union_query->list_of_selects->children.back()->as<ASTSelectQuery>();
if (last_select && last_select->settings())
{
InterpreterSetQuery(last_select->settings(), context).executeForCurrentContext();
}
}
}
else if (const auto * query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get()))
@ -479,7 +482,11 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
}
res = interpreter->execute();
{
OpenTelemetrySpanHolder span("IInterpreter::execute()");
res = interpreter->execute();
}
QueryPipeline & pipeline = res.pipeline;
bool use_processors = pipeline.initialized();
@ -685,17 +692,16 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
if (auto opentelemetry_span_log = context.getOpenTelemetrySpanLog();
context.getClientInfo().opentelemetry_trace_id
context.query_trace_context.trace_id
&& opentelemetry_span_log)
{
OpenTelemetrySpanLogElement span;
span.trace_id = context.getClientInfo().opentelemetry_trace_id;
span.span_id = context.getClientInfo().opentelemetry_span_id;
span.parent_span_id = context.getClientInfo().opentelemetry_parent_span_id;
span.trace_id = context.query_trace_context.trace_id;
span.span_id = context.query_trace_context.span_id;
span.parent_span_id = context.getClientInfo().client_trace_context.span_id;
span.operation_name = "query";
span.start_time_us = elem.query_start_time_microseconds;
span.finish_time_us = time_in_microseconds(finish_time);
span.duration_ns = elapsed_seconds * 1000000000;
/// Keep values synchronized to type enum in QueryLogElement::createBlock.
span.attribute_names.push_back("clickhouse.query_status");
@ -706,11 +712,11 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
span.attribute_names.push_back("clickhouse.query_id");
span.attribute_values.push_back(elem.client_info.current_query_id);
if (!context.getClientInfo().opentelemetry_tracestate.empty())
if (!context.query_trace_context.tracestate.empty())
{
span.attribute_names.push_back("clickhouse.tracestate");
span.attribute_values.push_back(
context.getClientInfo().opentelemetry_tracestate);
context.query_trace_context.tracestate);
}
opentelemetry_span_log->add(span);

View File

@ -1,8 +1,10 @@
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Common/typeid_cast.h>
#include <IO/Operators.h>
#include <iostream>
namespace DB
{
@ -15,6 +17,10 @@ ASTPtr ASTSelectWithUnionQuery::clone() const
res->list_of_selects = list_of_selects->clone();
res->children.push_back(res->list_of_selects);
res->union_mode = union_mode;
res->list_of_modes = list_of_modes;
cloneOutputOptions(*res);
return res;
}
@ -24,15 +30,44 @@ void ASTSelectWithUnionQuery::formatQueryImpl(const FormatSettings & settings, F
{
std::string indent_str = settings.one_line ? "" : std::string(4 * frame.indent, ' ');
auto mode_to_str = [&](auto mode)
{
if (mode == Mode::Unspecified)
return "";
else if (mode == Mode::ALL)
return "ALL";
else
return "DISTINCT";
};
for (ASTs::const_iterator it = list_of_selects->children.begin(); it != list_of_selects->children.end(); ++it)
{
if (it != list_of_selects->children.begin())
settings.ostr
<< settings.nl_or_ws << indent_str << (settings.hilite ? hilite_keyword : "")
<< "UNION ALL" << (settings.hilite ? hilite_none : "")
<< settings.nl_or_ws;
settings.ostr << settings.nl_or_ws << indent_str << (settings.hilite ? hilite_keyword : "") << "UNION "
<< mode_to_str((is_normalized) ? union_mode : list_of_modes[it - list_of_selects->children.begin() - 1])
<< (settings.hilite ? hilite_none : "");
(*it)->formatImpl(settings, state, frame);
if (auto * node = (*it)->as<ASTSelectWithUnionQuery>())
{
if (node->list_of_selects->children.size() == 1)
{
if (it != list_of_selects->children.begin())
settings.ostr << settings.nl_or_ws;
(node->list_of_selects->children.at(0))->formatImpl(settings, state, frame);
}
else
{
auto sub_query = std::make_shared<ASTSubquery>();
sub_query->children.push_back(*it);
sub_query->formatImpl(settings, state, frame);
}
}
else
{
if (it != list_of_selects->children.begin())
settings.ostr << settings.nl_or_ws;
(*it)->formatImpl(settings, state, frame);
}
}
}

View File

@ -5,9 +5,8 @@
namespace DB
{
/** Single SELECT query or multiple SELECT queries with UNION ALL.
* Only UNION ALL is possible. No UNION DISTINCT or plain UNION.
/** Single SELECT query or multiple SELECT queries with UNION
* or UNION or UNION DISTINCT
*/
class ASTSelectWithUnionQuery : public ASTQueryWithOutput
{
@ -17,6 +16,21 @@ public:
ASTPtr clone() const override;
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
enum class Mode
{
Unspecified,
ALL,
DISTINCT
};
using UnionModes = std::vector<Mode>;
Mode union_mode;
UnionModes list_of_modes;
bool is_normalized = false;
ASTPtr list_of_selects;
};

View File

@ -103,6 +103,51 @@ bool ParserList::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return true;
}
bool ParserUnionList::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ASTs elements;
auto parse_element = [&]
{
ASTPtr element;
if (!elem_parser->parse(pos, element, expected))
return false;
elements.push_back(element);
return true;
};
/// Parse UNION type
auto parse_separator = [&]
{
if (s_union_parser->ignore(pos, expected))
{
// SELECT ... UNION ALL SELECT ...
if (s_all_parser->check(pos, expected))
{
union_modes.push_back(ASTSelectWithUnionQuery::Mode::ALL);
}
// SELECT ... UNION DISTINCT SELECT ...
else if (s_distinct_parser->check(pos, expected))
{
union_modes.push_back(ASTSelectWithUnionQuery::Mode::DISTINCT);
}
// SELECT ... UNION SELECT ...
else
union_modes.push_back(ASTSelectWithUnionQuery::Mode::Unspecified);
return true;
}
return false;
};
if (!parseUtil(pos, parse_element, parse_separator))
return false;
auto list = std::make_shared<ASTExpressionList>();
list->children = std::move(elements);
node = list;
return true;
}
static bool parseOperator(IParser::Pos & pos, const char * op, Expected & expected)
{

View File

@ -5,6 +5,7 @@
#include <Parsers/IParserBase.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Common/IntervalKind.h>
namespace DB
@ -74,6 +75,52 @@ private:
char result_separator;
};
class ParserUnionList : public IParserBase
{
public:
ParserUnionList(ParserPtr && elem_parser_, ParserPtr && s_union_parser_, ParserPtr && s_all_parser_, ParserPtr && s_distinct_parser_)
: elem_parser(std::move(elem_parser_))
, s_union_parser(std::move(s_union_parser_))
, s_all_parser(std::move(s_all_parser_))
, s_distinct_parser(std::move(s_distinct_parser_))
{
}
template <typename ElemFunc, typename SepFunc>
static bool parseUtil(Pos & pos, const ElemFunc & parse_element, const SepFunc & parse_separator)
{
Pos begin = pos;
if (!parse_element())
{
pos = begin;
return false;
}
while (true)
{
begin = pos;
if (!parse_separator() || !parse_element())
{
pos = begin;
return true;
}
}
return false;
}
auto getUnionModes() const { return union_modes; }
protected:
const char * getName() const override { return "list of union elements"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
private:
ParserPtr elem_parser;
ParserPtr s_union_parser;
ParserPtr s_all_parser;
ParserPtr s_distinct_parser;
ASTSelectWithUnionQuery::UnionModes union_modes;
};
/** An expression with an infix binary left-associative operator.
* For example, a + b - c + d.

View File

@ -3,43 +3,44 @@
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ParserUnionQueryElement.h>
#include <Parsers/ASTExpressionList.h>
#include <Common/typeid_cast.h>
namespace DB
{
static void getSelectsFromUnionListNode(ASTPtr & ast_select, ASTs & selects)
{
if (auto * inner_union = ast_select->as<ASTSelectWithUnionQuery>())
{
for (auto & child : inner_union->list_of_selects->children)
getSelectsFromUnionListNode(child, selects);
return;
}
selects.push_back(std::move(ast_select));
}
bool ParserSelectWithUnionQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ASTPtr list_node;
ParserList parser(std::make_unique<ParserUnionQueryElement>(), std::make_unique<ParserKeyword>("UNION ALL"), false);
ParserUnionList parser(
std::make_unique<ParserUnionQueryElement>(),
std::make_unique<ParserKeyword>("UNION"),
std::make_unique<ParserKeyword>("ALL"),
std::make_unique<ParserKeyword>("DISTINCT"));
if (!parser.parse(pos, list_node, expected))
return false;
/// NOTE: We cann't simply flatten inner union query now, since we may have different union mode in query,
/// so flatten may change it's semantics. For example:
/// flatten `SELECT 1 UNION (SELECT 1 UNION ALL SELETC 1)` -> `SELECT 1 UNION SELECT 1 UNION ALL SELECT 1`
/// If we got only one child which is ASTSelectWithUnionQuery, just lift it up
auto & expr_list = list_node->as<ASTExpressionList &>();
if (expr_list.children.size() == 1)
{
if (expr_list.children.at(0)->as<ASTSelectWithUnionQuery>())
{
node = std::move(expr_list.children.at(0));
return true;
}
}
auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();
node = select_with_union_query;
select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>();
select_with_union_query->list_of_selects = list_node;
select_with_union_query->children.push_back(select_with_union_query->list_of_selects);
// flatten inner union query
for (auto & child : list_node->children)
getSelectsFromUnionListNode(child, select_with_union_query->list_of_selects->children);
select_with_union_query->list_of_modes = parser.getUnionModes();
return true;
}

View File

@ -1,6 +1,5 @@
#pragma once
#include <ostream>
#include <Parsers/IAST.h>
@ -29,3 +28,20 @@ inline WriteBuffer & operator<<(WriteBuffer & buf, const ASTPtr & ast)
}
}
template<>
struct fmt::formatter<DB::ASTPtr>
{
template<typename ParseContext>
constexpr auto parse(ParseContext & context)
{
return context.begin();
}
template<typename FormatContext>
auto format(const DB::ASTPtr & ast, FormatContext & context)
{
return fmt::format_to(context.out(), "{}", DB::serializeAST(*ast));
}
};

View File

@ -1,4 +1,6 @@
#include <Parsers/parseQuery.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/Lexer.h>

View File

@ -8,6 +8,7 @@
#include <Processors/ISource.h>
#include <Common/setThreadName.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#ifndef NDEBUG
#include <Common/Stopwatch.h>
@ -692,6 +693,8 @@ void PipelineExecutor::initializeExecution(size_t num_threads)
void PipelineExecutor::executeImpl(size_t num_threads)
{
OpenTelemetrySpanHolder span("PipelineExecutor::executeImpl()");
initializeExecution(num_threads);
using ThreadsData = std::vector<ThreadFromGlobalPool>;

View File

@ -0,0 +1,3 @@
if (USE_GRPC)
add_subdirectory (grpc_protos)
endif()

1634
src/Server/GRPCServer.cpp Normal file

File diff suppressed because it is too large Load Diff

51
src/Server/GRPCServer.h Normal file
View File

@ -0,0 +1,51 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#if USE_GRPC
#include <Poco/Net/SocketAddress.h>
#include "clickhouse_grpc.grpc.pb.h"
namespace Poco { class Logger; }
namespace grpc
{
class Server;
class ServerCompletionQueue;
}
namespace DB
{
class IServer;
class GRPCServer
{
public:
GRPCServer(IServer & iserver_, const Poco::Net::SocketAddress & address_to_listen_);
~GRPCServer();
/// Starts the server. A new thread will be created that waits for and accepts incoming connections.
void start();
/// Stops the server. No new connections will be accepted.
void stop();
/// Returns the number of currently handled connections.
size_t currentConnections() const;
private:
using GRPCService = clickhouse::grpc::ClickHouse::AsyncService;
class Runner;
IServer & iserver;
const Poco::Net::SocketAddress address_to_listen;
Poco::Logger * log;
GRPCService grpc_service;
std::unique_ptr<grpc::Server> grpc_server;
std::unique_ptr<grpc::ServerCompletionQueue> queue;
std::unique_ptr<Runner> runner;
};
}
#endif

View File

@ -318,7 +318,7 @@ void HTTPHandler::processQuery(
{
std::string opentelemetry_traceparent = request.get("traceparent");
std::string error;
if (!context.getClientInfo().parseTraceparentHeader(
if (!context.getClientInfo().client_trace_context.parseTraceparentHeader(
opentelemetry_traceparent, error))
{
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER,
@ -326,7 +326,7 @@ void HTTPHandler::processQuery(
opentelemetry_traceparent, error);
}
context.getClientInfo().opentelemetry_tracestate = request.get("tracestate", "");
context.getClientInfo().client_trace_context.tracestate = request.get("tracestate", "");
}
#endif

View File

@ -0,0 +1,50 @@
#include <Server/ProtocolServerAdapter.h>
#include <Poco/Net/TCPServer.h>
#if USE_GRPC
#include <Server/GRPCServer.h>
#endif
namespace DB
{
class ProtocolServerAdapter::TCPServerAdapterImpl : public Impl
{
public:
explicit TCPServerAdapterImpl(std::unique_ptr<Poco::Net::TCPServer> tcp_server_) : tcp_server(std::move(tcp_server_)) {}
~TCPServerAdapterImpl() override = default;
void start() override { tcp_server->start(); }
void stop() override { tcp_server->stop(); }
size_t currentConnections() const override { return tcp_server->currentConnections(); }
private:
std::unique_ptr<Poco::Net::TCPServer> tcp_server;
};
ProtocolServerAdapter::ProtocolServerAdapter(std::unique_ptr<Poco::Net::TCPServer> tcp_server_)
{
impl = std::make_unique<TCPServerAdapterImpl>(std::move(tcp_server_));
}
#if USE_GRPC
class ProtocolServerAdapter::GRPCServerAdapterImpl : public Impl
{
public:
explicit GRPCServerAdapterImpl(std::unique_ptr<GRPCServer> grpc_server_) : grpc_server(std::move(grpc_server_)) {}
~GRPCServerAdapterImpl() override = default;
void start() override { grpc_server->start(); }
void stop() override { grpc_server->stop(); }
size_t currentConnections() const override { return grpc_server->currentConnections(); }
private:
std::unique_ptr<GRPCServer> grpc_server;
};
ProtocolServerAdapter::ProtocolServerAdapter(std::unique_ptr<GRPCServer> grpc_server_)
{
impl = std::make_unique<GRPCServerAdapterImpl>(std::move(grpc_server_));
}
#endif
}

View File

@ -0,0 +1,55 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#include <memory>
namespace Poco::Net { class TCPServer; }
namespace DB
{
class GRPCServer;
/// Provides an unified interface to access a protocol implementing server
/// no matter what type it has (HTTPServer, TCPServer, MySQLServer, GRPCServer, ...).
class ProtocolServerAdapter
{
public:
ProtocolServerAdapter() {}
ProtocolServerAdapter(ProtocolServerAdapter && src) = default;
ProtocolServerAdapter & operator =(ProtocolServerAdapter && src) = default;
~ProtocolServerAdapter() {}
ProtocolServerAdapter(std::unique_ptr<Poco::Net::TCPServer> tcp_server_);
#if USE_GRPC
ProtocolServerAdapter(std::unique_ptr<GRPCServer> grpc_server_);
#endif
/// Starts the server. A new thread will be created that waits for and accepts incoming connections.
void start() { impl->start(); }
/// Stops the server. No new connections will be accepted.
void stop() { impl->stop(); }
/// Returns the number of currently handled connections.
size_t currentConnections() const { return impl->currentConnections(); }
private:
class Impl
{
public:
virtual ~Impl() {}
virtual void start() = 0;
virtual void stop() = 0;
virtual size_t currentConnections() const = 0;
};
class TCPServerAdapterImpl;
class GRPCServerAdapterImpl;
std::unique_ptr<Impl> impl;
};
}

View File

@ -20,6 +20,7 @@
#include <Interpreters/executeQuery.h>
#include <Interpreters/TablesStatus.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Storages/StorageMemory.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Core/ExternalTable.h>
@ -517,6 +518,8 @@ void TCPHandler::processInsertQuery(const Settings & connection_settings)
void TCPHandler::processOrdinaryQuery()
{
OpenTelemetrySpanHolder span(__PRETTY_FUNCTION__);
/// Pull query execution result, if exists, and send it to network.
if (state.io.in)
{

View File

@ -0,0 +1,11 @@
PROTOBUF_GENERATE_GRPC_CPP(clickhouse_grpc_proto_sources clickhouse_grpc_proto_headers clickhouse_grpc.proto)
# Ignore warnings while compiling protobuf-generated *.pb.h and *.pb.cpp files.
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -w")
# Disable clang-tidy for protobuf-generated *.pb.h and *.pb.cpp files.
set (CMAKE_CXX_CLANG_TIDY "")
add_library(clickhouse_grpc_protos ${clickhouse_grpc_proto_headers} ${clickhouse_grpc_proto_sources})
target_include_directories(clickhouse_grpc_protos SYSTEM PUBLIC ${gRPC_INCLUDE_DIRS} ${Protobuf_INCLUDE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
target_link_libraries (clickhouse_grpc_protos PUBLIC ${gRPC_LIBRARIES})

View File

@ -0,0 +1,151 @@
/* This file describes gRPC protocol supported in ClickHouse.
*
* To use this protocol a client should send one or more messages of the QueryInfo type
* and then receive one or more messages of the Result type.
* According to that the service provides four methods for that:
* ExecuteQuery(QueryInfo) returns (Result)
* ExecuteQueryWithStreamInput(stream QueryInfo) returns (Result)
* ExecuteQueryWithStreamOutput(QueryInfo) returns (stream Result)
* ExecuteQueryWithStreamIO(stream QueryInfo) returns (stream Result)
* It's up to the client to choose which method to use.
* For example, ExecuteQueryWithStreamInput() allows the client to add data multiple times
* while executing a query, which is suitable for inserting many rows.
*/
syntax = "proto3";
package clickhouse.grpc;
message NameAndType {
string name = 1;
string type = 2;
}
// Desribes an external table - a table which will exists only while a query is executing.
message ExternalTable {
// Name of the table. If omitted, "_data" is used.
string name = 1;
// Columns of the table. Types are required, names can be omitted. If the names are omitted, "_1", "_2", ... is used.
repeated NameAndType columns = 2;
// Data to insert to the external table.
// If a method with streaming input (i.e. ExecuteQueryWithStreamInput() or ExecuteQueryWithStreamIO()) is used,
// then data for insertion to the same external table can be splitted between multiple QueryInfos.
string data = 3;
// Format of the data to insert to the external table.
string format = 4;
// Settings for executing that insertion, applied after QueryInfo.settings.
map<string, string> settings = 5;
}
// Information about a query which a client sends to a ClickHouse server.
// The first QueryInfo can set any of the following fields. Extra QueryInfos only add extra data.
// In extra QueryInfos only `input_data`, `external_tables`, `next_query_info` and `cancel` fields can be set.
message QueryInfo {
string query = 1;
string query_id = 2;
map<string, string> settings = 3;
// Default database.
string database = 4;
// Input data, used both as data for INSERT query and as data for the input() function.
string input_data = 5;
// Delimiter for input_data, inserted between input_data from adjacent QueryInfos.
string input_data_delimiter = 6;
// Default output format. If not specified, 'TabSeparated' is used.
string output_format = 7;
repeated ExternalTable external_tables = 8;
string user_name = 9;
string password = 10;
string quota = 11;
// Works exactly like sessions in the HTTP protocol.
string session_id = 12;
bool session_check = 13;
uint32 session_timeout = 14;
// Set `cancel` to true to stop executing the query.
bool cancel = 15;
// If true there will be at least one more QueryInfo in the input stream.
// `next_query_info` is allowed to be set only if a method with streaming input (i.e. ExecuteQueryWithStreamInput() or ExecuteQueryWithStreamIO()) is used.
bool next_query_info = 16;
}
enum LogsLevel {
LOG_NONE = 0;
LOG_FATAL = 1;
LOG_CRITICAL = 2;
LOG_ERROR = 3;
LOG_WARNING = 4;
LOG_NOTICE = 5;
LOG_INFORMATION = 6;
LOG_DEBUG = 7;
LOG_TRACE = 8;
}
message LogEntry {
uint32 time = 1;
uint32 time_microseconds = 2;
uint64 thread_id = 3;
string query_id = 4;
LogsLevel level = 5;
string source = 6;
string text = 7;
}
message Progress {
uint64 read_rows = 1;
uint64 read_bytes = 2;
uint64 total_rows_to_read = 3;
uint64 written_rows = 4;
uint64 written_bytes = 5;
}
message Stats {
uint64 rows = 1;
uint64 blocks = 2;
uint64 allocated_bytes = 3;
bool applied_limit = 4;
uint64 rows_before_limit = 5;
}
message Exception {
int32 code = 1;
string name = 2;
string display_text = 3;
string stack_trace = 4;
}
// Result of execution of a query which is sent back by the ClickHouse server to the client.
message Result {
// Output of the query, represented in the `output_format` or in a format specified in `query`.
string output = 1;
string totals = 2;
string extremes = 3;
repeated LogEntry logs = 4;
Progress progress = 5;
Stats stats = 6;
// Set by the ClickHouse server if there was an exception thrown while executing.
Exception exception = 7;
// Set by the ClickHouse server if executing was cancelled by the `cancel` field in QueryInfo.
bool cancelled = 8;
}
service ClickHouse {
rpc ExecuteQuery(QueryInfo) returns (Result) {}
rpc ExecuteQueryWithStreamInput(stream QueryInfo) returns (Result) {}
rpc ExecuteQueryWithStreamOutput(QueryInfo) returns (stream Result) {}
rpc ExecuteQueryWithStreamIO(stream QueryInfo) returns (stream Result) {}
}

View File

@ -10,6 +10,7 @@ PEERDIR(
SRCS(
GRPCServer.cpp
HTTPHandler.cpp
HTTPHandlerFactory.cpp
InterserverIOHTTPHandler.cpp
@ -20,6 +21,7 @@ SRCS(
PostgreSQLHandlerFactory.cpp
PrometheusMetricsWriter.cpp
PrometheusRequestHandler.cpp
ProtocolServerAdapter.cpp
ReplicasStatusHandler.cpp
StaticRequestHandler.cpp
TCPHandler.cpp

View File

@ -63,34 +63,34 @@ void ColumnDescription::writeText(WriteBuffer & buf) const
{
writeBackQuotedString(name, buf);
writeChar(' ', buf);
DB::writeText(type->getName(), buf);
writeEscapedString(type->getName(), buf);
if (default_desc.expression)
{
writeChar('\t', buf);
DB::writeText(DB::toString(default_desc.kind), buf);
writeChar('\t', buf);
DB::writeText(queryToString(default_desc.expression), buf);
writeEscapedString(queryToString(default_desc.expression), buf);
}
if (!comment.empty())
{
writeChar('\t', buf);
DB::writeText("COMMENT ", buf);
DB::writeText(queryToString(ASTLiteral(Field(comment))), buf);
writeEscapedString(queryToString(ASTLiteral(Field(comment))), buf);
}
if (codec)
{
writeChar('\t', buf);
DB::writeText(queryToString(codec), buf);
writeEscapedString(queryToString(codec), buf);
}
if (ttl)
{
writeChar('\t', buf);
DB::writeText("TTL ", buf);
DB::writeText(queryToString(ttl), buf);
writeEscapedString(queryToString(ttl), buf);
}
writeChar('\n', buf);

View File

@ -4,6 +4,7 @@
#include <cppkafka/cppkafka.h>
#include <boost/algorithm/string/join.hpp>
#include <fmt/ostream.h>
#include <algorithm>
namespace DB

View File

@ -16,7 +16,7 @@ namespace DB
RabbitMQBlockInputStream::RabbitMQBlockInputStream(
StorageRabbitMQ & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const Context & context_,
std::shared_ptr<Context> context_,
const Names & columns,
size_t max_block_size_,
bool ack_in_suffix_)
@ -54,7 +54,7 @@ Block RabbitMQBlockInputStream::getHeader() const
void RabbitMQBlockInputStream::readPrefixImpl()
{
auto timeout = std::chrono::milliseconds(context.getSettingsRef().rabbitmq_max_wait_ms.totalMilliseconds());
auto timeout = std::chrono::milliseconds(context->getSettingsRef().rabbitmq_max_wait_ms.totalMilliseconds());
buffer = storage.popReadBuffer(timeout);
}
@ -96,7 +96,7 @@ Block RabbitMQBlockInputStream::readImpl()
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
auto input_format = FormatFactory::instance().getInputFormat(
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size);
storage.getFormatName(), *buffer, non_virtual_header, *context, max_block_size);
InputPort port(input_format->getPort().getHeader(), input_format.get());
connect(input_format->getPort(), port);

View File

@ -15,7 +15,7 @@ public:
RabbitMQBlockInputStream(
StorageRabbitMQ & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const Context & context_,
std::shared_ptr<Context> context_,
const Names & columns,
size_t max_block_size_,
bool ack_in_suffix = true);
@ -37,7 +37,7 @@ public:
private:
StorageRabbitMQ & storage;
StorageMetadataPtr metadata_snapshot;
const Context & context;
std::shared_ptr<Context> context;
Names column_names;
const size_t max_block_size;
bool ack_in_suffix;

View File

@ -74,7 +74,6 @@ StorageRabbitMQ::StorageRabbitMQ(
std::unique_ptr<RabbitMQSettings> rabbitmq_settings_)
: IStorage(table_id_)
, global_context(context_.getGlobalContext())
, rabbitmq_context(Context(global_context))
, rabbitmq_settings(std::move(rabbitmq_settings_))
, exchange_name(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_exchange_name.value))
, format_name(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_format.value))
@ -114,8 +113,8 @@ StorageRabbitMQ::StorageRabbitMQ(
storage_metadata.setColumns(columns_);
setInMemoryMetadata(storage_metadata);
rabbitmq_context.makeQueryContext();
rabbitmq_context = addSettings(rabbitmq_context);
rabbitmq_context = addSettings(global_context);
rabbitmq_context->makeQueryContext();
/// One looping task for all consumers as they share the same connection == the same handler == the same event loop
event_handler->updateLoopState(Loop::STOP);
@ -193,16 +192,17 @@ String StorageRabbitMQ::getTableBasedName(String name, const StorageID & table_i
}
Context StorageRabbitMQ::addSettings(Context context) const
std::shared_ptr<Context> StorageRabbitMQ::addSettings(const Context & context) const
{
context.setSetting("input_format_skip_unknown_fields", true);
context.setSetting("input_format_allow_errors_ratio", 0.);
context.setSetting("input_format_allow_errors_num", rabbitmq_settings->rabbitmq_skip_broken_messages.value);
auto modified_context = std::make_shared<Context>(context);
modified_context->setSetting("input_format_skip_unknown_fields", true);
modified_context->setSetting("input_format_allow_errors_ratio", 0.);
modified_context->setSetting("input_format_allow_errors_num", rabbitmq_settings->rabbitmq_skip_broken_messages.value);
if (!schema_name.empty())
context.setSetting("format_schema", schema_name);
modified_context->setSetting("format_schema", schema_name);
return context;
return modified_context;
}
@ -538,6 +538,7 @@ Pipe StorageRabbitMQ::read(
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
auto modified_context = addSettings(context);
auto block_size = getMaxBlockSize();
bool update_channels = false;
@ -581,7 +582,9 @@ Pipe StorageRabbitMQ::read(
looping_task->activateAndSchedule();
LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
return Pipe::unitePipes(std::move(pipes));
auto united_pipe = Pipe::unitePipes(std::move(pipes));
united_pipe.addInterpreterContext(modified_context);
return united_pipe;
}
@ -785,7 +788,7 @@ bool StorageRabbitMQ::streamToViews()
insert->table_id = table_id;
// Only insert into dependent views and expect that input blocks contain virtual columns
InterpreterInsertQuery interpreter(insert, rabbitmq_context, false, true, true);
InterpreterInsertQuery interpreter(insert, *rabbitmq_context, false, true, true);
auto block_io = interpreter.execute();
auto metadata_snapshot = getInMemoryMetadataPtr();

View File

@ -73,7 +73,7 @@ protected:
private:
const Context & global_context;
Context rabbitmq_context;
std::shared_ptr<Context> rabbitmq_context;
std::unique_ptr<RabbitMQSettings> rabbitmq_settings;
const String exchange_name;
@ -135,7 +135,7 @@ private:
static AMQP::ExchangeType defineExchangeType(String exchange_type_);
static String getTableBasedName(String name, const StorageID & table_id);
Context addSettings(Context context) const;
std::shared_ptr<Context> addSettings(const Context & context) const;
size_t getMaxBlockSize() const;
void deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop);

View File

@ -98,20 +98,35 @@ void checkAllowedQueries(const ASTSelectQuery & query)
}
/// check if only one single select query in SelectWithUnionQuery
static bool isSingleSelect(const ASTPtr & select, ASTPtr & res)
{
auto new_select = select->as<ASTSelectWithUnionQuery &>();
if (new_select.list_of_selects->children.size() != 1)
return false;
auto & new_inner_query = new_select.list_of_selects->children.at(0);
if (new_inner_query->as<ASTSelectQuery>())
{
res = new_inner_query;
return true;
}
else
return isSingleSelect(new_inner_query, res);
}
SelectQueryDescription SelectQueryDescription::getSelectQueryFromASTForMatView(const ASTPtr & select, const Context & context)
{
auto & new_select = select->as<ASTSelectWithUnionQuery &>();
ASTPtr new_inner_query;
if (new_select.list_of_selects->children.size() != 1)
if (!isSingleSelect(select, new_inner_query))
throw Exception("UNION is not supported for MATERIALIZED VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW);
auto & new_inner_query = new_select.list_of_selects->children.at(0);
auto & select_query = new_inner_query->as<ASTSelectQuery &>();
checkAllowedQueries(select_query);
SelectQueryDescription result;
result.select_table_id = extractDependentTableFromSelectQuery(select_query, context);
result.select_query = new_select.clone();
result.select_query = select->as<ASTSelectWithUnionQuery &>().clone();
result.inner_query = new_inner_query->clone();
return result;

View File

@ -695,7 +695,7 @@ void StorageDistributed::createDirectoryMonitors(const std::string & disk)
if (std::filesystem::is_empty(dir_path))
{
LOG_DEBUG(log, "Removing {} (used for async INSERT into Distributed)", dir_path);
LOG_DEBUG(log, "Removing {} (used for async INSERT into Distributed)", dir_path.string());
/// Will be created by DistributedBlockOutputStream on demand.
std::filesystem::remove(dir_path);
}

View File

@ -74,16 +74,19 @@ namespace
ReadWriteBufferFromHTTP::HTTPHeaderEntries header;
// Propagate OpenTelemetry trace context, if any, downstream.
const auto & client_info = context.getClientInfo();
if (client_info.opentelemetry_trace_id)
if (CurrentThread::isInitialized())
{
header.emplace_back("traceparent",
client_info.composeTraceparentHeader());
if (!client_info.opentelemetry_tracestate.empty())
const auto & thread_trace_context = CurrentThread::get().thread_trace_context;
if (thread_trace_context.trace_id)
{
header.emplace_back("tracestate",
client_info.opentelemetry_tracestate);
header.emplace_back("traceparent",
thread_trace_context.composeTraceparentHeader());
if (!thread_trace_context.tracestate.empty())
{
header.emplace_back("tracestate",
thread_trace_context.tracestate);
}
}
}

View File

@ -33,6 +33,7 @@ ln -sf $SRC_PATH/users.d/log_queries.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/readonly.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/access_management.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/database_atomic_drop_detach_sync.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/opentelemetry.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/ints_dictionary.xml $DEST_SERVER_PATH/
ln -sf $SRC_PATH/strings_dictionary.xml $DEST_SERVER_PATH/

View File

@ -0,0 +1,7 @@
<yandex>
<profiles>
<default>
<opentelemetry_start_trace_probability>0.1</opentelemetry_start_trace_probability>
</default>
</profiles>
</yandex>

View File

@ -907,6 +907,10 @@ class ClickHouseInstance:
build_opts = self.query("SELECT value FROM system.build_options WHERE name = 'CXX_FLAGS'")
return "-fsanitize=thread" in build_opts
def is_built_with_address_sanitizer(self):
build_opts = self.query("SELECT value FROM system.build_options WHERE name = 'CXX_FLAGS'")
return "-fsanitize=address" in build_opts
# Connects to the instance via clickhouse-client, sends a query (1st argument) and returns the answer
def query(self, sql, stdin=None, timeout=None, settings=None, user=None, password=None, database=None,
ignore_error=False):

View File

@ -16,6 +16,7 @@ CONTAINER_NAME = "clickhouse_integration_tests"
CONFIG_DIR_IN_REPO = "programs/server"
INTERGATION_DIR_IN_REPO = "tests/integration"
SRC_DIR_IN_REPO = "src"
DIND_INTEGRATION_TESTS_IMAGE_NAME = "yandex/clickhouse-integration-tests-runner"
@ -51,6 +52,13 @@ def check_args_and_update_paths(args):
args.cases_dir = os.path.abspath(os.path.join(CLICKHOUSE_ROOT, INTERGATION_DIR_IN_REPO))
logging.info("Cases dir is not set. Will use {}".format(args.cases_dir))
if args.src_dir:
if not os.path.isabs(args.src_dir):
args.src_dir = os.path.abspath(os.path.join(CURRENT_WORK_DIR, args.src_dir))
else:
args.src_dir = os.path.abspath(os.path.join(CLICKHOUSE_ROOT, SRC_DIR_IN_REPO))
logging.info("src dir is not set. Will use {}".format(args.src_dir))
logging.info("base_configs_dir: {}, binary: {}, cases_dir: {} ".format(args.base_configs_dir, args.binary, args.cases_dir))
for path in [args.binary, args.bridge_binary, args.base_configs_dir, args.cases_dir, CLICKHOUSE_ROOT]:
@ -104,6 +112,11 @@ if __name__ == "__main__":
default=os.environ.get("CLICKHOUSE_TESTS_INTEGRATION_PATH"),
help="Path to integration tests cases and configs directory. For example tests/integration in repository")
parser.add_argument(
"--src-dir",
default=os.environ.get("CLICKHOUSE_SRC_DIR"),
help="Path to the 'src' directory in repository. Used to provide schemas (e.g. *.proto) for some tests when those schemas are located in the 'src' directory")
parser.add_argument(
"--clickhouse-root",
help="Path to repository root folder. Used to take configuration from repository default paths.")
@ -174,6 +187,7 @@ if __name__ == "__main__":
cmd = "docker run {net} {tty} --rm --name {name} --privileged --volume={bridge_bin}:/clickhouse-odbc-bridge --volume={bin}:/clickhouse \
--volume={base_cfg}:/clickhouse-config --volume={cases_dir}:/ClickHouse/tests/integration \
--volume={src_dir}/Server/grpc_protos:/ClickHouse/src/Server/grpc_protos \
--volume={name}_volume:/var/lib/docker {env_tags} -e PYTEST_OPTS='{opts}' {img} {command}".format(
net=net,
tty=tty,
@ -181,6 +195,7 @@ if __name__ == "__main__":
bridge_bin=args.bridge_binary,
base_cfg=args.base_configs_dir,
cases_dir=args.cases_dir,
src_dir=args.src_dir,
env_tags=env_tags,
opts=' '.join(args.pytest_args),
img=DIND_INTEGRATION_TESTS_IMAGE_NAME + ":" + args.docker_image_version,

View File

@ -0,0 +1,39 @@
<yandex>
<remote_servers>
<source_cluster>
<shard>
<weight>1</weight>
<replica>
<host>s0_0_0</host>
<port>9000</port>
</replica>
</shard>
</source_cluster>
<default_cluster>
<shard>
<weight>1</weight>
<replica>
<host>s1_1_0</host>
<port>9000</port>
</replica>
</shard>
</default_cluster>
</remote_servers>
<max_workers>1</max_workers>
<tables>
<table_copier_test1>
<cluster_pull>source_cluster</cluster_pull>
<database_pull>default</database_pull>
<table_pull>copier_test1</table_pull>
<cluster_push>default_cluster</cluster_push>
<database_push>default</database_push>
<table_push>copier_test1_1</table_push>
<engine>ENGINE = MergeTree ORDER BY date SETTINGS index_granularity = 8192</engine>
<sharding_key>rand()</sharding_key>
</table_copier_test1>
</tables>
</yandex>

View File

@ -230,6 +230,27 @@ class Task_no_arg:
instance = cluster.instances['s1_1_0']
instance.query("DROP TABLE copier_test1_1")
class Task_non_partitioned_table:
def __init__(self, cluster):
self.cluster = cluster
self.zk_task_path = "/clickhouse-copier/task_non_partitoned_table"
self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_non_partitioned_table.xml'), 'r').read()
self.rows = 1000000
def start(self):
instance = cluster.instances['s0_0_0']
instance.query(
"create table copier_test1 (date Date, id UInt32) engine = MergeTree ORDER BY date SETTINGS index_granularity = 8192")
instance.query("insert into copier_test1 values ('2016-01-01', 10);")
def check(self):
assert TSV(self.cluster.instances['s1_1_0'].query("SELECT date FROM copier_test1_1")) == TSV("2016-01-01\n")
instance = cluster.instances['s0_0_0']
instance.query("DROP TABLE copier_test1")
instance = cluster.instances['s1_1_0']
instance.query("DROP TABLE copier_test1_1")
def execute_task(task, cmd_options):
task.start()
@ -359,6 +380,8 @@ def test_no_index(started_cluster):
def test_no_arg(started_cluster):
execute_task(Task_no_arg(started_cluster), [])
def test_non_partitioned_table(started_cluster):
execute_task(Task_non_partitioned_table(started_cluster), [])
if __name__ == '__main__':
with contextmanager(started_cluster)() as cluster:

Some files were not shown because too many files have changed in this diff Show More