Merge branch 'fix-substr-zk-metadata' of github.com:sunny19930321/ClickHouse into fix-substr-zk-metadata

Commit c2bdeb349c by liyang830, 2022-01-27 16:35:02 +08:00
66 changed files with 1371 additions and 377 deletions


@ -1,8 +1,9 @@
self-hosted-runner: self-hosted-runner:
labels: labels:
- builder - builder
- func-tester
- func-tester-aarch64
- fuzzer-unit-tester - fuzzer-unit-tester
- stress-tester - stress-tester
- style-checker - style-checker
- func-tester-aarch64 - style-checker-aarch64
- func-tester


@ -10,7 +10,7 @@ on: # yamllint disable-line rule:truthy
- 'backport/**' - 'backport/**'
jobs: jobs:
DockerHubPushAarch64: DockerHubPushAarch64:
runs-on: [self-hosted, func-tester-aarch64] runs-on: [self-hosted, style-checker-aarch64]
steps: steps:
- name: Clear repository - name: Clear repository
run: | run: |


@ -30,7 +30,7 @@ jobs:
python3 run_check.py python3 run_check.py
DockerHubPushAarch64: DockerHubPushAarch64:
needs: CheckLabels needs: CheckLabels
runs-on: [self-hosted, func-tester-aarch64] runs-on: [self-hosted, style-checker-aarch64]
steps: steps:
- name: Clear repository - name: Clear repository
run: | run: |


@ -20,7 +20,7 @@ on: # yamllint disable-line rule:truthy
workflow_dispatch: workflow_dispatch:
jobs: jobs:
DockerHubPushAarch64: DockerHubPushAarch64:
runs-on: [self-hosted, func-tester-aarch64] runs-on: [self-hosted, style-checker-aarch64]
steps: steps:
- name: Clear repository - name: Clear repository
run: | run: |


@ -9,8 +9,20 @@ on: # yamllint disable-line rule:truthy
branches: branches:
- 'master' - 'master'
jobs: jobs:
PythonUnitTests:
runs-on: [self-hosted, style-checker]
steps:
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
- name: Python unit tests
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
python3 -m unittest discover -s . -p '*_test.py'
DockerHubPushAarch64: DockerHubPushAarch64:
runs-on: [self-hosted, func-tester-aarch64] runs-on: [self-hosted, style-checker-aarch64]
steps: steps:
- name: Clear repository - name: Clear repository
run: | run: |
@ -44,7 +56,7 @@ jobs:
name: changed_images_amd64 name: changed_images_amd64
path: ${{ runner.temp }}/docker_images_check/changed_images_amd64.json path: ${{ runner.temp }}/docker_images_check/changed_images_amd64.json
DockerHubPush: DockerHubPush:
needs: [DockerHubPushAmd64, DockerHubPushAarch64] needs: [DockerHubPushAmd64, DockerHubPushAarch64, PythonUnitTests]
runs-on: [self-hosted, style-checker] runs-on: [self-hosted, style-checker]
steps: steps:
- name: Clear repository - name: Clear repository


@ -31,9 +31,22 @@ jobs:
run: | run: |
cd "$GITHUB_WORKSPACE/tests/ci" cd "$GITHUB_WORKSPACE/tests/ci"
python3 run_check.py python3 run_check.py
PythonUnitTests:
needs: CheckLabels
runs-on: [self-hosted, style-checker]
steps:
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
- name: Python unit tests
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
python3 -m unittest discover -s . -p '*_test.py'
DockerHubPushAarch64: DockerHubPushAarch64:
needs: CheckLabels needs: CheckLabels
runs-on: [self-hosted, func-tester-aarch64] runs-on: [self-hosted, style-checker-aarch64]
steps: steps:
- name: Clear repository - name: Clear repository
run: | run: |
@ -68,7 +81,7 @@ jobs:
name: changed_images_amd64 name: changed_images_amd64
path: ${{ runner.temp }}/docker_images_check/changed_images_amd64.json path: ${{ runner.temp }}/docker_images_check/changed_images_amd64.json
DockerHubPush: DockerHubPush:
needs: [DockerHubPushAmd64, DockerHubPushAarch64] needs: [DockerHubPushAmd64, DockerHubPushAarch64, PythonUnitTests]
runs-on: [self-hosted, style-checker] runs-on: [self-hosted, style-checker]
steps: steps:
- name: Clear repository - name: Clear repository


@ -13,7 +13,7 @@ on: # yamllint disable-line rule:truthy
jobs: jobs:
DockerHubPushAarch64: DockerHubPushAarch64:
runs-on: [self-hosted, func-tester-aarch64] runs-on: [self-hosted, style-checker-aarch64]
steps: steps:
- name: Clear repository - name: Clear repository
run: | run: |


@ -104,9 +104,8 @@ message (STATUS "CMAKE_BUILD_TYPE: ${CMAKE_BUILD_TYPE}")
string (TOUPPER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE_UC) string (TOUPPER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE_UC)
option(USE_STATIC_LIBRARIES "Disable to use shared libraries" ON) option(USE_STATIC_LIBRARIES "Disable to use shared libraries" ON)
option(MAKE_STATIC_LIBRARIES "Disable to make shared libraries" ${USE_STATIC_LIBRARIES})
if (NOT MAKE_STATIC_LIBRARIES) if (NOT USE_STATIC_LIBRARIES)
# DEVELOPER ONLY. # DEVELOPER ONLY.
# Faster linking if turned on. # Faster linking if turned on.
option(SPLIT_SHARED_LIBRARIES "Keep all internal libraries as separate .so files") option(SPLIT_SHARED_LIBRARIES "Keep all internal libraries as separate .so files")
@ -115,11 +114,11 @@ if (NOT MAKE_STATIC_LIBRARIES)
"Make several binaries (clickhouse-server, clickhouse-client etc.) instead of one bundled") "Make several binaries (clickhouse-server, clickhouse-client etc.) instead of one bundled")
endif () endif ()
if (MAKE_STATIC_LIBRARIES AND SPLIT_SHARED_LIBRARIES) if (USE_STATIC_LIBRARIES AND SPLIT_SHARED_LIBRARIES)
message(FATAL_ERROR "Defining SPLIT_SHARED_LIBRARIES=1 without MAKE_STATIC_LIBRARIES=0 has no effect.") message(FATAL_ERROR "Defining SPLIT_SHARED_LIBRARIES=1 without USE_STATIC_LIBRARIES=0 has no effect.")
endif() endif()
if (NOT MAKE_STATIC_LIBRARIES AND SPLIT_SHARED_LIBRARIES) if (NOT USE_STATIC_LIBRARIES AND SPLIT_SHARED_LIBRARIES)
set(BUILD_SHARED_LIBS 1 CACHE INTERNAL "") set(BUILD_SHARED_LIBS 1 CACHE INTERNAL "")
endif () endif ()
@ -201,21 +200,13 @@ endif ()
option(ENABLE_TESTS "Provide unit_test_dbms target with Google.Test unit tests" ON) option(ENABLE_TESTS "Provide unit_test_dbms target with Google.Test unit tests" ON)
option(ENABLE_EXAMPLES "Build all example programs in 'examples' subdirectories" OFF) option(ENABLE_EXAMPLES "Build all example programs in 'examples' subdirectories" OFF)
if (OS_LINUX AND (ARCH_AMD64 OR ARCH_AARCH64) AND MAKE_STATIC_LIBRARIES AND NOT SPLIT_SHARED_LIBRARIES AND NOT USE_MUSL) if (OS_LINUX AND (ARCH_AMD64 OR ARCH_AARCH64) AND USE_STATIC_LIBRARIES AND NOT SPLIT_SHARED_LIBRARIES AND NOT USE_MUSL)
# Only for Linux, x86_64 or aarch64. # Only for Linux, x86_64 or aarch64.
option(GLIBC_COMPATIBILITY "Enable compatibility with older glibc libraries." ON) option(GLIBC_COMPATIBILITY "Enable compatibility with older glibc libraries." ON)
elseif(GLIBC_COMPATIBILITY) elseif(GLIBC_COMPATIBILITY)
message (${RECONFIGURE_MESSAGE_LEVEL} "Glibc compatibility cannot be enabled in current configuration") message (${RECONFIGURE_MESSAGE_LEVEL} "Glibc compatibility cannot be enabled in current configuration")
endif () endif ()
if (GLIBC_COMPATIBILITY)
# NOTE: we may also want to check glibc version and add -include only for 2.32+
# however this is extra complexity, especially for cross compiling.
# And anyway it should not break anything for <2.32.
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -include ${CMAKE_CURRENT_SOURCE_DIR}/base/glibc-compatibility/glibc-compat-2.32.h")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -include ${CMAKE_CURRENT_SOURCE_DIR}/base/glibc-compatibility/glibc-compat-2.32.h")
endif()
# Make sure the final executable has symbols exported # Make sure the final executable has symbols exported
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -rdynamic") set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -rdynamic")
@ -444,7 +435,7 @@ endif ()
set (CMAKE_POSTFIX_VARIABLE "CMAKE_${CMAKE_BUILD_TYPE_UC}_POSTFIX") set (CMAKE_POSTFIX_VARIABLE "CMAKE_${CMAKE_BUILD_TYPE_UC}_POSTFIX")
if (MAKE_STATIC_LIBRARIES) if (USE_STATIC_LIBRARIES)
set (CMAKE_POSITION_INDEPENDENT_CODE OFF) set (CMAKE_POSITION_INDEPENDENT_CODE OFF)
if (OS_LINUX AND NOT ARCH_ARM) if (OS_LINUX AND NOT ARCH_ARM)
# Slightly more efficient code can be generated # Slightly more efficient code can be generated
@ -480,7 +471,6 @@ endif ()
message (STATUS message (STATUS
"Building for: ${CMAKE_SYSTEM} ${CMAKE_SYSTEM_PROCESSOR} ${CMAKE_LIBRARY_ARCHITECTURE} ; "Building for: ${CMAKE_SYSTEM} ${CMAKE_SYSTEM_PROCESSOR} ${CMAKE_LIBRARY_ARCHITECTURE} ;
USE_STATIC_LIBRARIES=${USE_STATIC_LIBRARIES} USE_STATIC_LIBRARIES=${USE_STATIC_LIBRARIES}
MAKE_STATIC_LIBRARIES=${MAKE_STATIC_LIBRARIES}
SPLIT_SHARED=${SPLIT_SHARED_LIBRARIES} SPLIT_SHARED=${SPLIT_SHARED_LIBRARIES}
CCACHE=${CCACHE_FOUND} ${CCACHE_VERSION}") CCACHE=${CCACHE_FOUND} ${CCACHE_VERSION}")
@ -528,7 +518,7 @@ macro (add_executable target)
# - _je_zone_register due to JEMALLOC_PRIVATE_NAMESPACE=je_ under OS X. # - _je_zone_register due to JEMALLOC_PRIVATE_NAMESPACE=je_ under OS X.
# - but jemalloc-cmake does not run private_namespace.sh # - but jemalloc-cmake does not run private_namespace.sh
# so symbol name should be _zone_register # so symbol name should be _zone_register
if (ENABLE_JEMALLOC AND MAKE_STATIC_LIBRARIES AND OS_DARWIN) if (ENABLE_JEMALLOC AND USE_STATIC_LIBRARIES AND OS_DARWIN)
set_property(TARGET ${target} APPEND PROPERTY LINK_OPTIONS -u_zone_register) set_property(TARGET ${target} APPEND PROPERTY LINK_OPTIONS -u_zone_register)
endif() endif()
endif() endif()


@ -42,7 +42,7 @@ endif ()
target_include_directories(common PUBLIC .. "${CMAKE_CURRENT_BINARY_DIR}/..") target_include_directories(common PUBLIC .. "${CMAKE_CURRENT_BINARY_DIR}/..")
if (OS_DARWIN AND NOT MAKE_STATIC_LIBRARIES) if (OS_DARWIN AND NOT USE_STATIC_LIBRARIES)
target_link_libraries(common PUBLIC -Wl,-U,_inside_main) target_link_libraries(common PUBLIC -Wl,-U,_inside_main)
endif() endif()


@ -6,7 +6,7 @@ add_library (daemon
target_include_directories (daemon PUBLIC ..) target_include_directories (daemon PUBLIC ..)
if (OS_DARWIN AND NOT MAKE_STATIC_LIBRARIES) if (OS_DARWIN AND NOT USE_STATIC_LIBRARIES)
target_link_libraries (daemon PUBLIC -Wl,-undefined,dynamic_lookup) target_link_libraries (daemon PUBLIC -Wl,-undefined,dynamic_lookup)
endif() endif()


@ -37,7 +37,7 @@ if (GLIBC_COMPATIBILITY)
target_include_directories(glibc-compatibility PRIVATE libcxxabi ${musl_arch_include_dir}) target_include_directories(glibc-compatibility PRIVATE libcxxabi ${musl_arch_include_dir})
if (NOT USE_STATIC_LIBRARIES AND NOT MAKE_STATIC_LIBRARIES) if (NOT USE_STATIC_LIBRARIES AND NOT USE_STATIC_LIBRARIES)
target_compile_options(glibc-compatibility PRIVATE -fPIC) target_compile_options(glibc-compatibility PRIVATE -fPIC)
endif () endif ()


@ -1,50 +0,0 @@
/// In glibc 2.32 new version of some symbols had been added [1]:
///
/// $ nm -D clickhouse | fgrep -e @GLIBC_2.32
/// U pthread_getattr_np@GLIBC_2.32
/// U pthread_sigmask@GLIBC_2.32
///
/// [1]: https://www.spinics.net/lists/fedora-devel/msg273044.html
///
/// Right now ubuntu 20.04 is used as official image for building
/// ClickHouse, however once it will be switched someone may not be happy
/// with that fact that he/she cannot use official binaries anymore because
/// they have glibc < 2.32.
///
/// To avoid this dependency, let's force previous version of those
/// symbols from glibc.
///
/// Also note, that the following approach had been tested:
/// a) -Wl,--wrap -- but it goes into endless recursion whey you try to do
/// something like this:
///
/// int __pthread_getattr_np_compact(pthread_t thread, pthread_attr_t *attr);
/// GLIBC_COMPAT_SYMBOL(__pthread_getattr_np_compact, pthread_getattr_np)
/// int __pthread_getattr_np_compact(pthread_t thread, pthread_attr_t *attr);
/// int __wrap_pthread_getattr_np(pthread_t thread, pthread_attr_t *attr)
/// {
/// return __pthread_getattr_np_compact(thread, attr);
/// }
///
/// int __pthread_sigmask_compact(int how, const sigset_t *set, sigset_t *oldset);
/// GLIBC_COMPAT_SYMBOL(__pthread_sigmask_compact, pthread_sigmask)
/// int __pthread_sigmask_compact(int how, const sigset_t *set, sigset_t *oldset);
/// int __wrap_pthread_sigmask(int how, const sigset_t *set, sigset_t *oldset)
/// {
/// return __pthread_sigmask_compact(how, set, oldset);
/// }
///
/// b) -Wl,--defsym -- same problems (and you cannot use version of symbol with
/// version in the expression)
/// c) this approach -- simply add this file with -include directive.
#if defined(__amd64__)
#define GLIBC_COMPAT_SYMBOL(func) __asm__(".symver " #func "," #func "@GLIBC_2.2.5");
#elif defined(__aarch64__)
#define GLIBC_COMPAT_SYMBOL(func) __asm__(".symver " #func "," #func "@GLIBC_2.17");
#else
#error Your platform is not supported.
#endif
GLIBC_COMPAT_SYMBOL(pthread_sigmask)
GLIBC_COMPAT_SYMBOL(pthread_getattr_np)
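The same pinning technique works for any versioned glibc symbol; a minimal standalone sketch, assuming x86-64 glibc where memcpy@GLIBC_2.2.5 exists (the symbol choice is illustrative and is not part of this header):
#include <string.h>
/// Force references to memcpy in this translation unit to bind to the old
/// symbol version instead of the newest one exported by the local glibc.
__asm__(".symver memcpy, memcpy@GLIBC_2.2.5");
int main()
{
    char dst[4];
    memcpy(dst, "abc", 4);   /// resolved against memcpy@GLIBC_2.2.5 at link time
    return dst[0] == 'a' ? 0 : 1;
}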


@ -23,7 +23,7 @@ if (SANITIZE)
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${ASAN_FLAGS}") set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${ASAN_FLAGS}")
endif() endif()
if (MAKE_STATIC_LIBRARIES AND CMAKE_CXX_COMPILER_ID STREQUAL "GNU") if (USE_STATIC_LIBRARIES AND CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libasan") set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libasan")
endif () endif ()
if (COMPILER_GCC) if (COMPILER_GCC)
@ -48,7 +48,7 @@ if (SANITIZE)
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=memory") set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=memory")
endif() endif()
if (MAKE_STATIC_LIBRARIES AND CMAKE_CXX_COMPILER_ID STREQUAL "GNU") if (USE_STATIC_LIBRARIES AND CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libmsan") set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libmsan")
endif () endif ()
@ -69,7 +69,7 @@ if (SANITIZE)
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=thread") set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=thread")
endif() endif()
if (MAKE_STATIC_LIBRARIES AND CMAKE_CXX_COMPILER_ID STREQUAL "GNU") if (USE_STATIC_LIBRARIES AND CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libtsan") set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libtsan")
endif () endif ()
if (COMPILER_GCC) if (COMPILER_GCC)
@ -101,7 +101,7 @@ if (SANITIZE)
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=undefined") set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=undefined")
endif() endif()
if (MAKE_STATIC_LIBRARIES AND CMAKE_CXX_COMPILER_ID STREQUAL "GNU") if (USE_STATIC_LIBRARIES AND CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libubsan") set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libubsan")
endif () endif ()
if (COMPILER_GCC) if (COMPILER_GCC)


@ -29,7 +29,7 @@ if (OS_FREEBSD)
message (FATAL_ERROR "Using internal parquet library on FreeBSD is not supported") message (FATAL_ERROR "Using internal parquet library on FreeBSD is not supported")
endif() endif()
if(MAKE_STATIC_LIBRARIES) if(USE_STATIC_LIBRARIES)
set(FLATBUFFERS_LIBRARY flatbuffers) set(FLATBUFFERS_LIBRARY flatbuffers)
else() else()
set(FLATBUFFERS_LIBRARY flatbuffers_shared) set(FLATBUFFERS_LIBRARY flatbuffers_shared)
@ -84,7 +84,7 @@ set(FLATBUFFERS_BINARY_DIR "${ClickHouse_BINARY_DIR}/contrib/flatbuffers")
set(FLATBUFFERS_INCLUDE_DIR "${FLATBUFFERS_SRC_DIR}/include") set(FLATBUFFERS_INCLUDE_DIR "${FLATBUFFERS_SRC_DIR}/include")
# set flatbuffers CMake options # set flatbuffers CMake options
if (MAKE_STATIC_LIBRARIES) if (USE_STATIC_LIBRARIES)
set(FLATBUFFERS_BUILD_FLATLIB ON CACHE BOOL "Enable the build of the flatbuffers library") set(FLATBUFFERS_BUILD_FLATLIB ON CACHE BOOL "Enable the build of the flatbuffers library")
set(FLATBUFFERS_BUILD_SHAREDLIB OFF CACHE BOOL "Disable the build of the flatbuffers shared library") set(FLATBUFFERS_BUILD_SHAREDLIB OFF CACHE BOOL "Disable the build of the flatbuffers shared library")
else () else ()


@ -46,7 +46,7 @@ set(_gRPC_SSL_LIBRARIES OpenSSL::Crypto OpenSSL::SSL)
set(gRPC_ABSL_PROVIDER "clickhouse" CACHE STRING "" FORCE) set(gRPC_ABSL_PROVIDER "clickhouse" CACHE STRING "" FORCE)
# Choose to build static or shared library for c-ares. # Choose to build static or shared library for c-ares.
if (MAKE_STATIC_LIBRARIES) if (USE_STATIC_LIBRARIES)
set(CARES_STATIC ON CACHE BOOL "" FORCE) set(CARES_STATIC ON CACHE BOOL "" FORCE)
set(CARES_SHARED OFF CACHE BOOL "" FORCE) set(CARES_SHARED OFF CACHE BOOL "" FORCE)
else () else ()


@ -133,7 +133,7 @@ add_library(ch_contrib::uv ALIAS _uv)
target_compile_definitions(_uv PRIVATE ${uv_defines}) target_compile_definitions(_uv PRIVATE ${uv_defines})
target_include_directories(_uv SYSTEM PUBLIC ${SOURCE_DIR}/include PRIVATE ${SOURCE_DIR}/src) target_include_directories(_uv SYSTEM PUBLIC ${SOURCE_DIR}/include PRIVATE ${SOURCE_DIR}/src)
target_link_libraries(_uv ${uv_libraries}) target_link_libraries(_uv ${uv_libraries})
if (NOT MAKE_STATIC_LIBRARIES) if (NOT USE_STATIC_LIBRARIES)
target_compile_definitions(_uv target_compile_definitions(_uv
INTERFACE USING_UV_SHARED=1 INTERFACE USING_UV_SHARED=1
PRIVATE BUILDING_UV_SHARED=1) PRIVATE BUILDING_UV_SHARED=1)


@ -129,6 +129,10 @@ If you want to divide an existing ZooKeeper cluster into two, the correct way is
Do not run ZooKeeper on the same servers as ClickHouse. Because ZooKeeper is very sensitive for latency and ClickHouse may utilize all available system resources. Do not run ZooKeeper on the same servers as ClickHouse. Because ZooKeeper is very sensitive for latency and ClickHouse may utilize all available system resources.
You can have ZooKeeper observers in an ensemble but ClickHouse servers should not interact with observers.
Do not change the `minSessionTimeout` setting; large values may affect ClickHouse restart stability.
With the default settings, ZooKeeper is a time bomb: With the default settings, ZooKeeper is a time bomb:
> The ZooKeeper server wont delete files from old snapshots and logs when using the default configuration (see autopurge), and this is the responsibility of the operator. > The ZooKeeper server wont delete files from old snapshots and logs when using the default configuration (see autopurge), and this is the responsibility of the operator.


@ -1 +0,0 @@
../../../en/faq/general/columnar-database.md


@ -0,0 +1,25 @@
---
title: What is a columnar database?
toc_hidden: true
toc_priority: 101
---
# What is a columnar database? {#what-is-a-columnar-database}
A columnar database stores the data of each column independently. This allows reading from disk only the data of the columns used in any given query; the price is that operations affecting whole rows become proportionally more expensive. A synonym for a columnar database is a column-oriented database management system, and ClickHouse is a typical example.
The key advantages of a columnar database are:
- Queries that use only a few of a table's many columns.
- Aggregating queries over large volumes of data.
- Column-wise data compression.
Here is the difference between a traditional row-oriented system and a columnar database when building reports:
**Traditional row-oriented storage**
![Row-oriented](https://clickhouse.com/docs/en/images/row-oriented.gif)
**Columnar storage**
![Column-oriented](https://clickhouse.com/docs/en/images/column-oriented.gif)
A columnar database is the preferred choice for analytical applications because it allows a table to have many columns "just in case" without paying for the unused columns at read-query time. Column-oriented databases are designed for big-data processing because, like data warehouses, they usually scale out over distributed clusters of low-cost hardware to increase throughput. ClickHouse combines both with [Distributed](../../engines/table-engines/special/distributed.md) and [Replicated](../../engines/table-engines/mergetree-family/replication.md) tables.
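A minimal C++ sketch of the layout difference described above (illustration only; the type and field names are assumptions, not ClickHouse code): summing a single column touches one contiguous array in the column-oriented form, while the row-oriented form pulls every field of every row through memory.
#include <cstdint>
#include <numeric>
#include <vector>
struct RowEvent { uint64_t user_id; uint64_t timestamp; double revenue; };   /// row-oriented: all fields of a row stored together
struct ColumnEvents                                                           /// column-oriented: each column is its own contiguous array
{
    std::vector<uint64_t> user_id;
    std::vector<uint64_t> timestamp;
    std::vector<double> revenue;
};
double totalRevenueRows(const std::vector<RowEvent> & rows)
{
    double total = 0;
    for (const auto & row : rows)          /// reads user_id and timestamp too, even though only revenue is needed
        total += row.revenue;
    return total;
}
double totalRevenueColumns(const ColumnEvents & events)
{
    /// scans only the revenue column; this per-column layout is also what compresses well
    return std::accumulate(events.revenue.begin(), events.revenue.end(), 0.0);
}
int main()
{
    ColumnEvents events{{1, 2}, {100, 200}, {9.5, 0.5}};
    return totalRevenueColumns(events) == 10.0 ? 0 : 1;
}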


@ -160,7 +160,7 @@ else()
message(STATUS "ClickHouse keeper-converter mode: OFF") message(STATUS "ClickHouse keeper-converter mode: OFF")
endif() endif()
if(NOT (MAKE_STATIC_LIBRARIES OR SPLIT_SHARED_LIBRARIES)) if(NOT (USE_STATIC_LIBRARIES OR SPLIT_SHARED_LIBRARIES))
set(CLICKHOUSE_ONE_SHARED ON) set(CLICKHOUSE_ONE_SHARED ON)
endif() endif()


@ -70,11 +70,11 @@ static DataTypes convertLowCardinalityTypesToNested(const DataTypes & types)
AggregateFunctionPtr AggregateFunctionFactory::get( AggregateFunctionPtr AggregateFunctionFactory::get(
const String & name, const DataTypes & argument_types, const Array & parameters, AggregateFunctionProperties & out_properties) const const String & name, const DataTypes & argument_types, const Array & parameters, AggregateFunctionProperties & out_properties) const
{ {
auto type_without_low_cardinality = convertLowCardinalityTypesToNested(argument_types); auto types_without_low_cardinality = convertLowCardinalityTypesToNested(argument_types);
/// If one of the types is Nullable, we apply aggregate function combinator "Null". /// If one of the types is Nullable, we apply aggregate function combinator "Null".
if (std::any_of(type_without_low_cardinality.begin(), type_without_low_cardinality.end(), if (std::any_of(types_without_low_cardinality.begin(), types_without_low_cardinality.end(),
[](const auto & type) { return type->isNullable(); })) [](const auto & type) { return type->isNullable(); }))
{ {
AggregateFunctionCombinatorPtr combinator = AggregateFunctionCombinatorFactory::instance().tryFindSuffix("Null"); AggregateFunctionCombinatorPtr combinator = AggregateFunctionCombinatorFactory::instance().tryFindSuffix("Null");
@ -82,10 +82,10 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
throw Exception("Logical error: cannot find aggregate function combinator to apply a function to Nullable arguments.", throw Exception("Logical error: cannot find aggregate function combinator to apply a function to Nullable arguments.",
ErrorCodes::LOGICAL_ERROR); ErrorCodes::LOGICAL_ERROR);
DataTypes nested_types = combinator->transformArguments(type_without_low_cardinality); DataTypes nested_types = combinator->transformArguments(types_without_low_cardinality);
Array nested_parameters = combinator->transformParameters(parameters); Array nested_parameters = combinator->transformParameters(parameters);
bool has_null_arguments = std::any_of(type_without_low_cardinality.begin(), type_without_low_cardinality.end(), bool has_null_arguments = std::any_of(types_without_low_cardinality.begin(), types_without_low_cardinality.end(),
[](const auto & type) { return type->onlyNull(); }); [](const auto & type) { return type->onlyNull(); });
AggregateFunctionPtr nested_function = getImpl( AggregateFunctionPtr nested_function = getImpl(
@ -97,13 +97,10 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
// that are rewritten to AggregateFunctionNothing, in this case // that are rewritten to AggregateFunctionNothing, in this case
// nested_function is nullptr. // nested_function is nullptr.
if (!nested_function || !nested_function->isOnlyWindowFunction()) if (!nested_function || !nested_function->isOnlyWindowFunction())
{ return combinator->transformAggregateFunction(nested_function, out_properties, types_without_low_cardinality, parameters);
return combinator->transformAggregateFunction(nested_function,
out_properties, type_without_low_cardinality, parameters);
}
} }
auto with_original_arguments = getImpl(name, type_without_low_cardinality, parameters, out_properties, false); auto with_original_arguments = getImpl(name, types_without_low_cardinality, parameters, out_properties, false);
if (!with_original_arguments) if (!with_original_arguments)
throw Exception("Logical error: AggregateFunctionFactory returned nullptr", ErrorCodes::LOGICAL_ERROR); throw Exception("Logical error: AggregateFunctionFactory returned nullptr", ErrorCodes::LOGICAL_ERROR);


@ -40,28 +40,6 @@ public:
} }
}; };
/** Given an array of flags, checks if it's all zeros
* When the buffer is all zeros, this is slightly faster than doing a memcmp since doesn't require allocating memory
* When the buffer has values, this is much faster since it avoids visiting all memory (and the allocation and function calls)
*/
static bool ALWAYS_INLINE inline is_all_zeros(const UInt8 * flags, size_t size)
{
size_t unroll_size = size - size % 8;
size_t i = 0;
while (i < unroll_size)
{
UInt64 v = *reinterpret_cast<const UInt64 *>(&flags[i]);
if (v)
return false;
i += 8;
}
for (; i < size; ++i)
if (flags[i])
return false;
return true;
}
/** There are two cases: for single argument and variadic. /** There are two cases: for single argument and variadic.
* Code for single argument is much more efficient. * Code for single argument is much more efficient.
@ -73,6 +51,7 @@ class AggregateFunctionIfNullUnary final
{ {
private: private:
size_t num_arguments; size_t num_arguments;
bool filter_is_nullable = false;
/// The name of the nested function, including combinators (i.e. *If) /// The name of the nested function, including combinators (i.e. *If)
/// ///
@ -92,8 +71,26 @@ private:
using Base = AggregateFunctionNullBase<result_is_nullable, serialize_flag, using Base = AggregateFunctionNullBase<result_is_nullable, serialize_flag,
AggregateFunctionIfNullUnary<result_is_nullable, serialize_flag>>; AggregateFunctionIfNullUnary<result_is_nullable, serialize_flag>>;
public:
inline bool singleFilter(const IColumn ** columns, size_t row_num, size_t num_arguments) const
{
const IColumn * filter_column = columns[num_arguments - 1];
if (filter_is_nullable)
{
const ColumnNullable * nullable_column = assert_cast<const ColumnNullable *>(filter_column);
filter_column = nullable_column->getNestedColumnPtr().get();
const UInt8 * filter_null_map = nullable_column->getNullMapData().data();
return assert_cast<const ColumnUInt8 &>(*filter_column).getData()[row_num] && !filter_null_map[row_num];
}
else
{
return assert_cast<const ColumnUInt8 &>(*filter_column).getData()[row_num];
}
}
public:
String getName() const override String getName() const override
{ {
return name; return name;
@ -105,17 +102,10 @@ public:
, name(name_) , name(name_)
{ {
if (num_arguments == 0) if (num_arguments == 0)
throw Exception("Aggregate function " + getName() + " require at least one argument", throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); "Aggregate function {} requires at least one argument", getName());
}
static inline bool singleFilter(const IColumn ** columns, size_t row_num, size_t num_arguments) filter_is_nullable = arguments[num_arguments - 1]->isNullable();
{
const IColumn * filter_column = columns[num_arguments - 1];
if (const ColumnNullable * nullable_column = typeid_cast<const ColumnNullable *>(filter_column))
filter_column = nullable_column->getNestedColumnPtr().get();
return assert_cast<const ColumnUInt8 &>(*filter_column).getData()[row_num];
} }
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
@ -136,29 +126,41 @@ public:
const IColumn * columns_param[] = {&column->getNestedColumn()}; const IColumn * columns_param[] = {&column->getNestedColumn()};
const IColumn * filter_column = columns[num_arguments - 1]; const IColumn * filter_column = columns[num_arguments - 1];
if (const ColumnNullable * nullable_column = typeid_cast<const ColumnNullable *>(filter_column))
filter_column = nullable_column->getNestedColumnPtr().get(); const UInt8 * filter_values = nullptr;
if constexpr (result_is_nullable) const UInt8 * filter_null_map = nullptr;
if (filter_is_nullable)
{ {
/// We need to check if there is work to do as otherwise setting the flag would be a mistake, const ColumnNullable * nullable_column = assert_cast<const ColumnNullable *>(filter_column);
/// it would mean that the return value would be the default value of the nested type instead of NULL filter_column = nullable_column->getNestedColumnPtr().get();
if (is_all_zeros(assert_cast<const ColumnUInt8 *>(filter_column)->getData().data(), batch_size)) filter_null_map = nullable_column->getNullMapData().data();
return;
} }
filter_values = assert_cast<const ColumnUInt8 *>(filter_column)->getData().data();
/// Combine the 2 flag arrays so we can call a simplified version (one check vs 2) /// Combine the 2 flag arrays so we can call a simplified version (one check vs 2)
/// Note that now the null map will contain 0 if not null and not filtered, or 1 for null or filtered (or both) /// Note that now the null map will contain 0 if not null and not filtered, or 1 for null or filtered (or both)
const auto * filter_flags = assert_cast<const ColumnUInt8 *>(filter_column)->getData().data();
auto final_nulls = std::make_unique<UInt8[]>(batch_size); auto final_nulls = std::make_unique<UInt8[]>(batch_size);
for (size_t i = 0; i < batch_size; ++i)
final_nulls[i] = (!!null_map[i]) | (!filter_flags[i]); if (filter_null_map)
for (size_t i = 0; i < batch_size; ++i)
final_nulls[i] = (!!null_map[i]) | (!filter_values[i]) | (!!filter_null_map[i]);
else
for (size_t i = 0; i < batch_size; ++i)
final_nulls[i] = (!!null_map[i]) | (!filter_values[i]);
if constexpr (result_is_nullable)
{
if (!memoryIsByte(final_nulls.get(), batch_size, 1))
this->setFlag(place);
else
return; /// No work to do.
}
this->nested_function->addBatchSinglePlaceNotNull( this->nested_function->addBatchSinglePlaceNotNull(
batch_size, this->nestedPlace(place), columns_param, final_nulls.get(), arena, -1); batch_size, this->nestedPlace(place), columns_param, final_nulls.get(), arena, -1);
if constexpr (result_is_nullable)
if (!memoryIsByte(null_map, batch_size, 1))
this->setFlag(place);
} }
#if USE_EMBEDDED_COMPILER #if USE_EMBEDDED_COMPILER
@ -367,10 +369,14 @@ AggregateFunctionPtr AggregateFunctionIf::getOwnNullAdapter(
const AggregateFunctionPtr & nested_function, const DataTypes & arguments, const AggregateFunctionPtr & nested_function, const DataTypes & arguments,
const Array & params, const AggregateFunctionProperties & properties) const const Array & params, const AggregateFunctionProperties & properties) const
{ {
bool return_type_is_nullable = !properties.returns_default_when_only_null && getReturnType()->canBeInsideNullable(); assert(!arguments.empty());
size_t nullable_size = std::count_if(arguments.begin(), arguments.end(), [](const auto & element) { return element->isNullable(); });
return_type_is_nullable &= nullable_size != 1 || !arguments.back()->isNullable(); /// If only condition is nullable. we should non-nullable type. /// Nullability of the last argument (condition) does not affect the nullability of the result (NULL is processed as false).
bool serialize_flag = return_type_is_nullable || properties.returns_default_when_only_null; /// For other arguments it is as usual (at least one is NULL then the result is NULL if possible).
bool return_type_is_nullable = !properties.returns_default_when_only_null && getReturnType()->canBeInsideNullable()
&& std::any_of(arguments.begin(), arguments.end() - 1, [](const auto & element) { return element->isNullable(); });
bool need_to_serialize_flag = return_type_is_nullable || properties.returns_default_when_only_null;
if (arguments.size() <= 2 && arguments.front()->isNullable()) if (arguments.size() <= 2 && arguments.front()->isNullable())
{ {
@ -380,7 +386,7 @@ AggregateFunctionPtr AggregateFunctionIf::getOwnNullAdapter(
} }
else else
{ {
if (serialize_flag) if (need_to_serialize_flag)
return std::make_shared<AggregateFunctionIfNullUnary<false, true>>(nested_function->getName(), nested_func, arguments, params); return std::make_shared<AggregateFunctionIfNullUnary<false, true>>(nested_function->getName(), nested_func, arguments, params);
else else
return std::make_shared<AggregateFunctionIfNullUnary<false, false>>(nested_function->getName(), nested_func, arguments, params); return std::make_shared<AggregateFunctionIfNullUnary<false, false>>(nested_function->getName(), nested_func, arguments, params);
@ -394,7 +400,7 @@ AggregateFunctionPtr AggregateFunctionIf::getOwnNullAdapter(
} }
else else
{ {
if (serialize_flag) if (need_to_serialize_flag)
return std::make_shared<AggregateFunctionIfNullVariadic<false, true, true>>(nested_function, arguments, params); return std::make_shared<AggregateFunctionIfNullVariadic<false, true, true>>(nested_function, arguments, params);
else else
return std::make_shared<AggregateFunctionIfNullVariadic<false, false, true>>(nested_function, arguments, params); return std::make_shared<AggregateFunctionIfNullVariadic<false, false, true>>(nested_function, arguments, params);
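The comment introduced above states that a NULL condition is processed as false and does not make the result Nullable; a minimal standalone sketch of that semantic (illustration only, not ClickHouse code):
#include <cassert>
#include <optional>
#include <vector>
/// Count rows where the -If condition is both non-NULL and true.
static size_t countIf(const std::vector<std::optional<bool>> & condition)
{
    size_t result = 0;
    for (const auto & flag : condition)
        result += (flag.has_value() && *flag);   /// NULL behaves exactly like false
    return result;
}
int main()
{
    assert(countIf({true, std::nullopt, false, true}) == 2);
    return 0;
}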


@ -11,7 +11,7 @@ if(COMPILER_PIPE)
else() else()
set(MAX_COMPILER_MEMORY 1500) set(MAX_COMPILER_MEMORY 1500)
endif() endif()
if(MAKE_STATIC_LIBRARIES) if(USE_STATIC_LIBRARIES)
set(MAX_LINKER_MEMORY 3500) set(MAX_LINKER_MEMORY 3500)
else() else()
set(MAX_LINKER_MEMORY 2500) set(MAX_LINKER_MEMORY 2500)
@ -193,7 +193,7 @@ add_subdirectory(Common/Config)
set (all_modules) set (all_modules)
macro(add_object_library name common_path) macro(add_object_library name common_path)
if (MAKE_STATIC_LIBRARIES OR NOT SPLIT_SHARED_LIBRARIES) if (USE_STATIC_LIBRARIES OR NOT SPLIT_SHARED_LIBRARIES)
add_headers_and_sources(dbms ${common_path}) add_headers_and_sources(dbms ${common_path})
else () else ()
list (APPEND all_modules ${name}) list (APPEND all_modules ${name})
@ -254,7 +254,7 @@ endif()
set (DBMS_COMMON_LIBRARIES) set (DBMS_COMMON_LIBRARIES)
if (MAKE_STATIC_LIBRARIES OR NOT SPLIT_SHARED_LIBRARIES) if (USE_STATIC_LIBRARIES OR NOT SPLIT_SHARED_LIBRARIES)
add_library (dbms STATIC ${dbms_headers} ${dbms_sources}) add_library (dbms STATIC ${dbms_headers} ${dbms_sources})
target_link_libraries (dbms PRIVATE ch_contrib::libdivide ${DBMS_COMMON_LIBRARIES}) target_link_libraries (dbms PRIVATE ch_contrib::libdivide ${DBMS_COMMON_LIBRARIES})
if (TARGET ch_contrib::jemalloc) if (TARGET ch_contrib::jemalloc)


@ -609,6 +609,7 @@
M(638, SNAPPY_UNCOMPRESS_FAILED) \ M(638, SNAPPY_UNCOMPRESS_FAILED) \
M(639, SNAPPY_COMPRESS_FAILED) \ M(639, SNAPPY_COMPRESS_FAILED) \
M(640, NO_HIVEMETASTORE) \ M(640, NO_HIVEMETASTORE) \
M(641, CANNOT_APPEND_TO_FILE) \
\ \
M(999, KEEPER_EXCEPTION) \ M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \ M(1000, POCO_EXCEPTION) \

View File

@ -216,7 +216,7 @@ void KeeperStateMachine::create_snapshot(
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
LOG_TRACE(log, "Clearing garbage after snapshot"); LOG_TRACE(log, "Clearing garbage after snapshot");
/// Turn off "snapshot mode" and clear outdate part of storage state /// Turn off "snapshot mode" and clear outdate part of storage state
storage->clearGarbageAfterSnapshot(snapshot->snapshot_container_size); storage->clearGarbageAfterSnapshot();
LOG_TRACE(log, "Cleared garbage after snapshot"); LOG_TRACE(log, "Cleared garbage after snapshot");
snapshot.reset(); snapshot.reset();
} }

View File

@ -177,9 +177,9 @@ public:
} }
/// Clear outdated data from internal container. /// Clear outdated data from internal container.
void clearGarbageAfterSnapshot(size_t up_to_size) void clearGarbageAfterSnapshot()
{ {
container.clearOutdatedNodes(up_to_size); container.clearOutdatedNodes();
} }
/// Get all active sessions /// Get all active sessions


@ -33,6 +33,7 @@ private:
List list; List list;
IndexMap map; IndexMap map;
bool snapshot_mode{false}; bool snapshot_mode{false};
/// Allows to avoid additional copies in updateValue function
size_t snapshot_up_to_size = 0; size_t snapshot_up_to_size = 0;
ArenaWithFreeLists arena; ArenaWithFreeLists arena;
@ -226,8 +227,10 @@ public:
if (snapshot_mode) if (snapshot_mode)
{ {
/// We are in snapshot mode but are updating a node that is already
/// fresher than the snapshot distance, so it will not participate in
/// the snapshot and we don't need to copy it.
size_t distance = std::distance(list.begin(), list_itr); size_t distance = std::distance(list.begin(), list_itr);
if (distance < snapshot_up_to_size) if (distance < snapshot_up_to_size)
{ {
auto elem_copy = *(list_itr); auto elem_copy = *(list_itr);
@ -269,11 +272,11 @@ public:
return it->getMapped()->value; return it->getMapped()->value;
} }
void clearOutdatedNodes(size_t up_to_size) void clearOutdatedNodes()
{ {
auto start = list.begin(); auto start = list.begin();
size_t counter = 0; auto end = list.end();
for (auto itr = start; counter < up_to_size; ++counter) for (auto itr = start; itr != end;)
{ {
if (!itr->active_in_map) if (!itr->active_in_map)
{ {
@ -288,7 +291,6 @@ public:
itr++; itr++;
} }
} }
} }
void clear() void clear()
@ -308,6 +310,7 @@ public:
void disableSnapshotMode() void disableSnapshotMode()
{ {
snapshot_mode = false; snapshot_mode = false;
snapshot_up_to_size = 0; snapshot_up_to_size = 0;
} }
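A minimal standalone sketch of the snapshot-mode behaviour the comments above describe (a toy, not the ClickHouse SnapshotableHashTable): while a snapshot is in flight, updates keep the old element for the snapshot and append a fresh copy, and clearOutdatedNodes() later drops the stale entries.
#include <cassert>
#include <list>
#include <string>
#include <unordered_map>
struct Entry { std::string key; int value; bool active_in_map = true; };
class ToySnapshotableMap
{
    std::list<Entry> entries;
    std::unordered_map<std::string, std::list<Entry>::iterator> map;
    bool snapshot_mode = false;
public:
    void insert(const std::string & key, int value)
    {
        entries.push_back({key, value});
        map[key] = std::prev(entries.end());
    }
    void enableSnapshotMode() { snapshot_mode = true; }
    void disableSnapshotMode() { snapshot_mode = false; }
    void updateValue(const std::string & key, int new_value)
    {
        auto it = map.at(key);
        if (snapshot_mode)
        {
            it->active_in_map = false;                 /// old element stays for the in-flight snapshot
            entries.push_back({key, new_value});
            map[key] = std::prev(entries.end());
        }
        else
            it->value = new_value;                     /// no snapshot in flight: update in place
    }
    void clearOutdatedNodes()                          /// drop entries superseded during snapshot mode
    {
        for (auto it = entries.begin(); it != entries.end();)
            it = it->active_in_map ? std::next(it) : entries.erase(it);
    }
    size_t snapshotSize() const { return entries.size(); }
};
int main()
{
    ToySnapshotableMap m;
    m.insert("hello", 1);
    m.enableSnapshotMode();
    m.updateValue("hello", 2);       /// copy appended, old value preserved
    assert(m.snapshotSize() == 2);
    m.disableSnapshotMode();
    m.clearOutdatedNodes();          /// stale copy removed
    assert(m.snapshotSize() == 1);
    return 0;
}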


@ -908,7 +908,7 @@ TEST_P(CoordinationTest, SnapshotableHashMapTrySnapshot)
EXPECT_EQ(itr->active_in_map, i != 3 && i != 2); EXPECT_EQ(itr->active_in_map, i != 3 && i != 2);
itr = std::next(itr); itr = std::next(itr);
} }
map_snp.clearOutdatedNodes(map_snp.snapshotSize()); map_snp.clearOutdatedNodes();
EXPECT_EQ(map_snp.snapshotSize(), 4); EXPECT_EQ(map_snp.snapshotSize(), 4);
EXPECT_EQ(map_snp.size(), 4); EXPECT_EQ(map_snp.size(), 4);
@ -957,13 +957,13 @@ TEST_P(CoordinationTest, SnapshotableHashMapDataSize)
hello.updateValue("hello", [](IntNode & value) { value = 2; }); hello.updateValue("hello", [](IntNode & value) { value = 2; });
EXPECT_EQ(hello.getApproximateDataSize(), 18); EXPECT_EQ(hello.getApproximateDataSize(), 18);
hello.clearOutdatedNodes(hello.snapshotSize()); hello.clearOutdatedNodes();
EXPECT_EQ(hello.getApproximateDataSize(), 9); EXPECT_EQ(hello.getApproximateDataSize(), 9);
hello.erase("hello"); hello.erase("hello");
EXPECT_EQ(hello.getApproximateDataSize(), 9); EXPECT_EQ(hello.getApproximateDataSize(), 9);
hello.clearOutdatedNodes(hello.snapshotSize()); hello.clearOutdatedNodes();
EXPECT_EQ(hello.getApproximateDataSize(), 0); EXPECT_EQ(hello.getApproximateDataSize(), 0);
/// Node /// Node
@ -990,7 +990,7 @@ TEST_P(CoordinationTest, SnapshotableHashMapDataSize)
world.updateValue("world", [&](Node & value) { value = n2; }); world.updateValue("world", [&](Node & value) { value = n2; });
EXPECT_EQ(world.getApproximateDataSize(), 196); EXPECT_EQ(world.getApproximateDataSize(), 196);
world.clearOutdatedNodes(world.snapshotSize()); world.clearOutdatedNodes();
EXPECT_EQ(world.getApproximateDataSize(), 98); EXPECT_EQ(world.getApproximateDataSize(), 98);
world.erase("world"); world.erase("world");
@ -1170,7 +1170,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotMode)
} }
EXPECT_TRUE(fs::exists("./snapshots/snapshot_50.bin" + params.extension)); EXPECT_TRUE(fs::exists("./snapshots/snapshot_50.bin" + params.extension));
EXPECT_EQ(storage.container.size(), 26); EXPECT_EQ(storage.container.size(), 26);
storage.clearGarbageAfterSnapshot(storage.container.snapshotSize()); storage.clearGarbageAfterSnapshot();
EXPECT_EQ(storage.container.snapshotSize(), 26); EXPECT_EQ(storage.container.snapshotSize(), 26);
for (size_t i = 0; i < 50; ++i) for (size_t i = 0; i < 50; ++i)
{ {


@ -75,7 +75,11 @@ class IColumn;
M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \ M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \ M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \ M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \
M(Bool, s3_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in s3 engine tables", 0) \
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \ M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables", 0) \
M(Bool, hdfs_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in hdfs engine tables", 0) \
M(UInt64, hsts_max_age, 0, "Expired time for hsts. 0 means disable HSTS.", 0) \ M(UInt64, hsts_max_age, 0, "Expired time for hsts. 0 means disable HSTS.", 0) \
M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \ M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
M(Bool, use_uncompressed_cache, false, "Whether to use the cache of uncompressed blocks.", 0) \ M(Bool, use_uncompressed_cache, false, "Whether to use the cache of uncompressed blocks.", 0) \
@ -490,6 +494,7 @@ class IColumn;
\ \
M(Bool, engine_file_empty_if_not_exists, false, "Allows to select data from a file engine table without file", 0) \ M(Bool, engine_file_empty_if_not_exists, false, "Allows to select data from a file engine table without file", 0) \
M(Bool, engine_file_truncate_on_insert, false, "Enables or disables truncate before insert in file engine tables", 0) \ M(Bool, engine_file_truncate_on_insert, false, "Enables or disables truncate before insert in file engine tables", 0) \
M(Bool, engine_file_allow_create_multiple_files, false, "Enables or disables creating a new file on each insert in file engine tables if format has suffix.", 0) \
M(Bool, allow_experimental_database_replicated, false, "Allow to create databases with Replicated engine", 0) \ M(Bool, allow_experimental_database_replicated, false, "Allow to create databases with Replicated engine", 0) \
M(UInt64, database_replicated_initial_query_timeout_sec, 300, "How long initial DDL query should wait for Replicated database to precess previous DDL queue entries", 0) \ M(UInt64, database_replicated_initial_query_timeout_sec, 300, "How long initial DDL query should wait for Replicated database to precess previous DDL queue entries", 0) \
M(UInt64, max_distributed_depth, 5, "Maximum distributed query depth", 0) \ M(UInt64, max_distributed_depth, 5, "Maximum distributed query depth", 0) \


@ -394,6 +394,27 @@ void FormatFactory::registerNonTrivialPrefixAndSuffixChecker(const String & name
target = std::move(non_trivial_prefix_and_suffix_checker); target = std::move(non_trivial_prefix_and_suffix_checker);
} }
void FormatFactory::registerAppendSupportChecker(const String & name, AppendSupportChecker append_support_checker)
{
auto & target = dict[name].append_support_checker;
if (target)
throw Exception("FormatFactory: Suffix checker " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
target = std::move(append_support_checker);
}
void FormatFactory::markFormatHasNoAppendSupport(const String & name)
{
registerAppendSupportChecker(name, [](const FormatSettings &){ return false; });
}
bool FormatFactory::checkIfFormatSupportAppend(const String & name, ContextPtr context, const std::optional<FormatSettings> & format_settings_)
{
auto format_settings = format_settings_ ? *format_settings_ : getFormatSettings(context);
auto & append_support_checker = dict[name].append_support_checker;
/// By default we consider that format supports append
return !append_support_checker || append_support_checker(format_settings);
}
void FormatFactory::registerOutputFormat(const String & name, OutputCreator output_creator) void FormatFactory::registerOutputFormat(const String & name, OutputCreator output_creator)
{ {
auto & target = dict[name].output_creator; auto & target = dict[name].output_creator;


@ -93,6 +93,10 @@ private:
/// The checker should return true if parallel parsing should be disabled. /// The checker should return true if parallel parsing should be disabled.
using NonTrivialPrefixAndSuffixChecker = std::function<bool(ReadBuffer & buf)>; using NonTrivialPrefixAndSuffixChecker = std::function<bool(ReadBuffer & buf)>;
/// Some formats can support append depending on settings.
/// The checker should return true if the format supports append.
using AppendSupportChecker = std::function<bool(const FormatSettings & settings)>;
using SchemaReaderCreator = std::function<SchemaReaderPtr(ReadBuffer & in, const FormatSettings & settings, ContextPtr context)>; using SchemaReaderCreator = std::function<SchemaReaderPtr(ReadBuffer & in, const FormatSettings & settings, ContextPtr context)>;
using ExternalSchemaReaderCreator = std::function<ExternalSchemaReaderPtr(const FormatSettings & settings)>; using ExternalSchemaReaderCreator = std::function<ExternalSchemaReaderPtr(const FormatSettings & settings)>;
@ -106,6 +110,7 @@ private:
bool supports_parallel_formatting{false}; bool supports_parallel_formatting{false};
bool is_column_oriented{false}; bool is_column_oriented{false};
NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker; NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker;
AppendSupportChecker append_support_checker;
}; };
using FormatsDictionary = std::unordered_map<String, Creators>; using FormatsDictionary = std::unordered_map<String, Creators>;
@ -167,6 +172,14 @@ public:
void registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker); void registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker);
void registerAppendSupportChecker(const String & name, AppendSupportChecker append_support_checker);
/// If a format never supports append, you can use this method instead of
/// registerAppendSupportChecker with an append_support_checker that always returns false.
void markFormatHasNoAppendSupport(const String & name);
bool checkIfFormatSupportAppend(const String & name, ContextPtr context, const std::optional<FormatSettings> & format_settings_ = std::nullopt);
/// Register format by its name. /// Register format by its name.
void registerInputFormat(const String & name, InputCreator input_creator); void registerInputFormat(const String & name, InputCreator input_creator);
void registerOutputFormat(const String & name, OutputCreator output_creator); void registerOutputFormat(const String & name, OutputCreator output_creator);


@ -109,6 +109,23 @@ void AsynchronousMetrics::openSensors()
else else
break; break;
} }
file->rewind();
Int64 temperature = 0;
try
{
readText(temperature, *file);
}
catch (const ErrnoException & e)
{
LOG_WARNING(
&Poco::Logger::get("AsynchronousMetrics"),
"Thermal monitor '{}' exists but could not be read, error {}.",
thermal_device_index,
e.getErrno());
continue;
}
thermal.emplace_back(std::move(file)); thermal.emplace_back(std::move(file));
} }
} }
@ -222,6 +239,23 @@ void AsynchronousMetrics::openSensorsChips()
std::replace(sensor_name.begin(), sensor_name.end(), ' ', '_'); std::replace(sensor_name.begin(), sensor_name.end(), ' ', '_');
} }
file->rewind();
Int64 temperature = 0;
try
{
readText(temperature, *file);
}
catch (const ErrnoException & e)
{
LOG_WARNING(
&Poco::Logger::get("AsynchronousMetrics"),
"Hardware monitor '{}', sensor '{}' exists but could not be read, error {}.",
hwmon_name,
sensor_name,
e.getErrno());
continue;
}
hwmon_devices[hwmon_name][sensor_name] = std::move(file); hwmon_devices[hwmon_name][sensor_name] = std::move(file);
} }
} }


@ -56,7 +56,6 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED; extern const int NOT_IMPLEMENTED;
extern const int TIMEOUT_EXCEEDED; extern const int TIMEOUT_EXCEEDED;
extern const int TABLE_WAS_NOT_DROPPED; extern const int TABLE_WAS_NOT_DROPPED;
extern const int NO_ZOOKEEPER;
} }
@ -472,12 +471,6 @@ void InterpreterSystemQuery::restoreReplica()
{ {
getContext()->checkAccess(AccessType::SYSTEM_RESTORE_REPLICA, table_id); getContext()->checkAccess(AccessType::SYSTEM_RESTORE_REPLICA, table_id);
const zkutil::ZooKeeperPtr & zookeeper = getContext()->getZooKeeper();
if (zookeeper->expired())
throw Exception(ErrorCodes::NO_ZOOKEEPER,
"Cannot restore table metadata because ZooKeeper session has expired");
const StoragePtr table_ptr = DatabaseCatalog::instance().getTable(table_id, getContext()); const StoragePtr table_ptr = DatabaseCatalog::instance().getTable(table_id, getContext());
auto * const table_replicated_ptr = dynamic_cast<StorageReplicatedMergeTree *>(table_ptr.get()); auto * const table_replicated_ptr = dynamic_cast<StorageReplicatedMergeTree *>(table_ptr.get());
@ -485,24 +478,7 @@ void InterpreterSystemQuery::restoreReplica()
if (table_replicated_ptr == nullptr) if (table_replicated_ptr == nullptr)
throw Exception(ErrorCodes::BAD_ARGUMENTS, table_is_not_replicated.data(), table_id.getNameForLogs()); throw Exception(ErrorCodes::BAD_ARGUMENTS, table_is_not_replicated.data(), table_id.getNameForLogs());
auto & table_replicated = *table_replicated_ptr; table_replicated_ptr->restoreMetadataInZooKeeper();
StorageReplicatedMergeTree::Status status;
table_replicated.getStatus(status);
if (!status.is_readonly)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica must be readonly");
const String replica_name = table_replicated.getReplicaName();
const String& zk_root_path = status.zookeeper_path;
if (String replica_path = zk_root_path + "replicas/" + replica_name; zookeeper->exists(replica_path))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Replica path is present at {} -- nothing to restore. "
"If you are sure that metadata it lost and replica path contain some garbage, "
"then use SYSTEM DROP REPLICA query first.", replica_path);
table_replicated.restoreMetadataInZooKeeper();
} }
StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica, ContextMutablePtr system_context, bool need_ddl_guard) StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica, ContextMutablePtr system_context, bool need_ddl_guard)


@ -93,6 +93,7 @@ void registerOutputFormatArrow(FormatFactory & factory)
{ {
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, false, format_settings); return std::make_shared<ArrowBlockOutputFormat>(buf, sample, false, format_settings);
}); });
factory.markFormatHasNoAppendSupport("Arrow");
factory.registerOutputFormat( factory.registerOutputFormat(
"ArrowStream", "ArrowStream",
@ -103,6 +104,7 @@ void registerOutputFormatArrow(FormatFactory & factory)
{ {
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, true, format_settings); return std::make_shared<ArrowBlockOutputFormat>(buf, sample, true, format_settings);
}); });
factory.markFormatHasNoAppendSupport("ArrowStream");
} }
} }


@ -479,6 +479,7 @@ void registerOutputFormatAvro(FormatFactory & factory)
{ {
return std::make_shared<AvroRowOutputFormat>(buf, sample, params, settings); return std::make_shared<AvroRowOutputFormat>(buf, sample, params, settings);
}); });
factory.markFormatHasNoAppendSupport("Avro");
} }
} }


@ -91,6 +91,11 @@ void registerOutputFormatCustomSeparated(FormatFactory & factory)
}); });
factory.markOutputFormatSupportsParallelFormatting(format_name); factory.markOutputFormatSupportsParallelFormatting(format_name);
factory.registerAppendSupportChecker(format_name, [](const FormatSettings & settings)
{
return settings.custom.result_after_delimiter.empty();
});
}; };
registerWithNamesAndTypes("CustomSeparated", register_func); registerWithNamesAndTypes("CustomSeparated", register_func);


@ -284,6 +284,7 @@ void registerOutputFormatJSON(FormatFactory & factory)
}); });
factory.markOutputFormatSupportsParallelFormatting("JSON"); factory.markOutputFormatSupportsParallelFormatting("JSON");
factory.markFormatHasNoAppendSupport("JSON");
factory.registerOutputFormat("JSONStrings", []( factory.registerOutputFormat("JSONStrings", [](
WriteBuffer & buf, WriteBuffer & buf,
@ -295,6 +296,7 @@ void registerOutputFormatJSON(FormatFactory & factory)
}); });
factory.markOutputFormatSupportsParallelFormatting("JSONStrings"); factory.markOutputFormatSupportsParallelFormatting("JSONStrings");
factory.markFormatHasNoAppendSupport("JSONStrings");
} }
} }


@ -526,6 +526,7 @@ void registerOutputFormatORC(FormatFactory & factory)
{ {
return std::make_shared<ORCBlockOutputFormat>(buf, sample, format_settings); return std::make_shared<ORCBlockOutputFormat>(buf, sample, format_settings);
}); });
factory.markFormatHasNoAppendSupport("ORC");
} }
} }


@ -85,6 +85,7 @@ void registerOutputFormatParquet(FormatFactory & factory)
{ {
return std::make_shared<ParquetBlockOutputFormat>(buf, sample, format_settings); return std::make_shared<ParquetBlockOutputFormat>(buf, sample, format_settings);
}); });
factory.markFormatHasNoAppendSupport("Parquet");
} }
} }


@ -235,5 +235,19 @@ void registerOutputFormatTemplate(FormatFactory & factory)
return std::make_shared<TemplateBlockOutputFormat>(sample, buf, settings, resultset_format, row_format, settings.template_settings.row_between_delimiter); return std::make_shared<TemplateBlockOutputFormat>(sample, buf, settings, resultset_format, row_format, settings.template_settings.row_between_delimiter);
}); });
factory.registerAppendSupportChecker("Template", [](const FormatSettings & settings)
{
if (settings.template_settings.resultset_format.empty())
return true;
auto resultset_format = ParsedTemplateFormatString(
FormatSchemaInfo(settings.template_settings.resultset_format, "Template", false,
settings.schema.is_server, settings.schema.format_schema_path),
[&](const String & partName)
{
return static_cast<size_t>(TemplateBlockOutputFormat::stringToResultsetPart(partName));
});
return resultset_format.delimiters.empty() || resultset_format.delimiters.back().empty();
});
} }
} }


@ -256,6 +256,7 @@ void registerOutputFormatXML(FormatFactory & factory)
}); });
factory.markOutputFormatSupportsParallelFormatting("XML"); factory.markOutputFormatSupportsParallelFormatting("XML");
factory.markFormatHasNoAppendSupport("XML");
} }
} }


@ -14,9 +14,8 @@
#include <Processors/Formats/IInputFormat.h> #include <Processors/Formats/IInputFormat.h>
#include <Processors/Transforms/AddingDefaultsTransform.h> #include <Processors/Transforms/AddingDefaultsTransform.h>
#include <IO/ReadHelpers.h> #include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h> #include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/ExpressionAnalyzer.h> #include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h> #include <Interpreters/TreeRewriter.h>
@ -28,7 +27,6 @@
#include <Storages/HDFS/WriteBufferFromHDFS.h> #include <Storages/HDFS/WriteBufferFromHDFS.h>
#include <Storages/PartitionedSink.h> #include <Storages/PartitionedSink.h>
#include <Formats/ReadSchemaUtils.h> #include <Formats/ReadSchemaUtils.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <Functions/FunctionsConversion.h> #include <Functions/FunctionsConversion.h>
@ -52,7 +50,9 @@ namespace ErrorCodes
{ {
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ACCESS_DENIED; extern const int ACCESS_DENIED;
extern const int DATABASE_ACCESS_DENIED;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE; extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
namespace namespace
@ -139,20 +139,23 @@ StorageHDFS::StorageHDFS(
ASTPtr partition_by_) ASTPtr partition_by_)
: IStorage(table_id_) : IStorage(table_id_)
, WithContext(context_) , WithContext(context_)
, uri(uri_) , uris({uri_})
, format_name(format_name_) , format_name(format_name_)
, compression_method(compression_method_) , compression_method(compression_method_)
, distributed_processing(distributed_processing_) , distributed_processing(distributed_processing_)
, partition_by(partition_by_) , partition_by(partition_by_)
{ {
context_->getRemoteHostFilter().checkURL(Poco::URI(uri)); context_->getRemoteHostFilter().checkURL(Poco::URI(uri_));
checkHDFSURL(uri); checkHDFSURL(uri_);
String path = uri_.substr(uri_.find('/', uri_.find("//") + 2));
is_path_with_globs = path.find_first_of("*?{") != std::string::npos;
StorageInMemoryMetadata storage_metadata; StorageInMemoryMetadata storage_metadata;
if (columns_.empty()) if (columns_.empty())
{ {
auto columns = getTableStructureFromData(format_name, uri, compression_method, context_); auto columns = getTableStructureFromData(format_name, uri_, compression_method, context_);
storage_metadata.setColumns(columns); storage_metadata.setColumns(columns);
} }
else else
@ -217,6 +220,39 @@ private:
Strings::iterator uris_iter; Strings::iterator uris_iter;
}; };
class HDFSSource::URISIterator::Impl
{
public:
explicit Impl(const std::vector<const String> & uris_, ContextPtr context)
{
auto path_and_uri = getPathFromUriAndUriWithoutPath(uris_[0]);
HDFSBuilderWrapper builder = createHDFSBuilder(path_and_uri.second + "/", context->getGlobalContext()->getConfigRef());
HDFSFSPtr fs = createHDFSFS(builder.get());
for (const auto & uri : uris_)
{
path_and_uri = getPathFromUriAndUriWithoutPath(uri);
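/// NOTE: hdfsExists() returns 0 when the path exists, so only URIs that are actually present on HDFS are kept for reading.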
if (!hdfsExists(fs.get(), path_and_uri.first.c_str()))
uris.push_back(uri);
}
uris_iter = uris.begin();
}
String next()
{
std::lock_guard lock(mutex);
if (uris_iter == uris.end())
return "";
auto key = *uris_iter;
++uris_iter;
return key;
}
private:
std::mutex mutex;
Strings uris;
Strings::iterator uris_iter;
};
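Both this HDFS URISIterator and the S3 KeysIterator added further below follow the same contract: a mutex-guarded cursor that hands out one element per call and an empty string once the list is exhausted, which is the sentinel the IteratorWrapper lambdas rely on. A minimal Python sketch of that contract (illustrative only, not the C++ code; the class name is reused just for clarity):

    import threading

    class KeysIterator:
        def __init__(self, keys):
            self._keys = list(keys)
            self._pos = 0
            self._lock = threading.Lock()

        def next(self) -> str:
            # Hand out one key per call; "" signals that the iterator is drained.
            with self._lock:
                if self._pos >= len(self._keys):
                    return ""
                key = self._keys[self._pos]
                self._pos += 1
                return key

    # it = KeysIterator(["hdfs://host/a.csv", "hdfs://host/b.csv"]); it.next() -> "hdfs://host/a.csv"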
Block HDFSSource::getHeader(const StorageMetadataPtr & metadata_snapshot, bool need_path_column, bool need_file_column) Block HDFSSource::getHeader(const StorageMetadataPtr & metadata_snapshot, bool need_path_column, bool need_file_column)
{ {
auto header = metadata_snapshot->getSampleBlock(); auto header = metadata_snapshot->getSampleBlock();
@ -250,6 +286,15 @@ String HDFSSource::DisclosedGlobIterator::next()
return pimpl->next(); return pimpl->next();
} }
HDFSSource::URISIterator::URISIterator(const std::vector<const String> & uris_, ContextPtr context)
: pimpl(std::make_shared<HDFSSource::URISIterator::Impl>(uris_, context))
{
}
String HDFSSource::URISIterator::next()
{
return pimpl->next();
}
HDFSSource::HDFSSource( HDFSSource::HDFSSource(
StorageHDFSPtr storage_, StorageHDFSPtr storage_,
@ -284,9 +329,8 @@ bool HDFSSource::initialize()
current_path = (*file_iterator)(); current_path = (*file_iterator)();
if (current_path.empty()) if (current_path.empty())
return false; return false;
const size_t begin_of_path = current_path.find('/', current_path.find("//") + 2);
const String path_from_uri = current_path.substr(begin_of_path); const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(current_path);
const String uri_without_path = current_path.substr(0, begin_of_path);
auto compression = chooseCompressionMethod(path_from_uri, storage->compression_method); auto compression = chooseCompressionMethod(path_from_uri, storage->compression_method);
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri_without_path, path_from_uri, getContext()->getGlobalContext()->getConfigRef()), compression); read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri_without_path, path_from_uri, getContext()->getGlobalContext()->getConfigRef()), compression);
@ -469,15 +513,23 @@ Pipe StorageHDFS::read(
return callback(); return callback();
}); });
} }
else else if (is_path_with_globs)
{ {
/// Iterate through disclosed globs and make a source for each file /// Iterate through disclosed globs and make a source for each file
auto glob_iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context_, uri); auto glob_iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context_, uris[0]);
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([glob_iterator]() iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([glob_iterator]()
{ {
return glob_iterator->next(); return glob_iterator->next();
}); });
} }
else
{
auto uris_iterator = std::make_shared<HDFSSource::URISIterator>(uris, context_);
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([uris_iterator]()
{
return uris_iterator->next();
});
}
Pipes pipes; Pipes pipes;
auto this_ptr = std::static_pointer_cast<StorageHDFS>(shared_from_this()); auto this_ptr = std::static_pointer_cast<StorageHDFS>(shared_from_this());
@ -505,9 +557,11 @@ Pipe StorageHDFS::read(
return Pipe::unitePipes(std::move(pipes)); return Pipe::unitePipes(std::move(pipes));
} }
SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/) SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context_)
{ {
bool has_wildcards = uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos; String current_uri = uris.back();
bool has_wildcards = current_uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;
const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(query.get()); const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(query.get());
auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr; auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
bool is_partitioned_implementation = partition_by_ast && has_wildcards; bool is_partitioned_implementation = partition_by_ast && has_wildcards;
@ -516,34 +570,70 @@ SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataP
{ {
return std::make_shared<PartitionedHDFSSink>( return std::make_shared<PartitionedHDFSSink>(
partition_by_ast, partition_by_ast,
uri, current_uri,
format_name, format_name,
metadata_snapshot->getSampleBlock(), metadata_snapshot->getSampleBlock(),
getContext(), context_,
chooseCompressionMethod(uri, compression_method)); chooseCompressionMethod(current_uri, compression_method));
} }
else else
{ {
return std::make_shared<HDFSSink>(uri, if (is_path_with_globs)
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "URI '{}' contains globs, so the table is in readonly mode", uris.back());
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(current_uri);
HDFSBuilderWrapper builder = createHDFSBuilder(uri_without_path + "/", context_->getGlobalContext()->getConfigRef());
HDFSFSPtr fs = createHDFSFS(builder.get());
bool truncate_on_insert = context_->getSettingsRef().hdfs_truncate_on_insert;
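/// hdfsExists() returns 0 when the path exists, so the branch below handles the case where the target file is already there and truncation was not requested.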
if (!truncate_on_insert && !hdfsExists(fs.get(), path_from_uri.c_str()))
{
if (context_->getSettingsRef().hdfs_create_new_file_on_insert)
{
auto pos = uris[0].find_first_of('.', uris[0].find_last_of('/'));
size_t index = uris.size();
String new_uri;
do
{
new_uri = uris[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : uris[0].substr(pos));
++index;
}
while (!hdfsExists(fs.get(), new_uri.c_str()));
uris.push_back(new_uri);
current_uri = new_uri;
}
else
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"File with path {} already exists. If you want to overwrite it, enable setting hdfs_truncate_on_insert, "
"if you want to create new file on each insert, enable setting hdfs_create_new_file_on_insert",
path_from_uri);
}
return std::make_shared<HDFSSink>(current_uri,
format_name, format_name,
metadata_snapshot->getSampleBlock(), metadata_snapshot->getSampleBlock(),
getContext(), context_,
chooseCompressionMethod(uri, compression_method)); chooseCompressionMethod(current_uri, compression_method));
} }
} }
void StorageHDFS::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &) void StorageHDFS::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)
{ {
const size_t begin_of_path = uri.find('/', uri.find("//") + 2); const size_t begin_of_path = uris[0].find('/', uris[0].find("//") + 2);
const String path = uri.substr(begin_of_path); const String url = uris[0].substr(0, begin_of_path);
const String url = uri.substr(0, begin_of_path);
HDFSBuilderWrapper builder = createHDFSBuilder(url + "/", local_context->getGlobalContext()->getConfigRef()); HDFSBuilderWrapper builder = createHDFSBuilder(url + "/", local_context->getGlobalContext()->getConfigRef());
HDFSFSPtr fs = createHDFSFS(builder.get()); HDFSFSPtr fs = createHDFSFS(builder.get());
int ret = hdfsDelete(fs.get(), path.data(), 0); for (const auto & uri : uris)
if (ret) {
throw Exception(ErrorCodes::ACCESS_DENIED, "Unable to truncate hdfs table: {}", std::string(hdfsGetLastError())); const String path = uri.substr(begin_of_path);
int ret = hdfsDelete(fs.get(), path.data(), 0);
if (ret)
throw Exception(ErrorCodes::ACCESS_DENIED, "Unable to truncate hdfs table: {}", std::string(hdfsGetLastError()));
}
} }
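The create-new-file-on-insert path in StorageHDFS::write above (and the analogous code below for StorageFile and StorageS3) derives fresh names by inserting ".<index>" before the extension of the first URI and bumping the index until an unused name is found. A minimal Python sketch of that naming loop, assuming path_exists stands in for hdfsExists / fs::exists / the S3 listing check (the helper name is hypothetical):

    def pick_new_name(first_path: str, start_index: int, path_exists) -> str:
        # Find the first '.' after the last '/', mirroring
        # find_first_of('.', find_last_of('/')) in the C++ code.
        dot = first_path.find('.', first_path.rfind('/') + 1)
        base = first_path if dot == -1 else first_path[:dot]
        suffix = "" if dot == -1 else first_path[dot:]
        index = start_index
        while True:
            candidate = f"{base}.{index}{suffix}"
            if not path_exists(candidate):
                return candidate
            index += 1

    # pick_new_name("hdfs://host/data/out.csv", 1, {"hdfs://host/data/out.csv"}.__contains__)
    # -> "hdfs://host/data/out.1.csv"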

View File

@ -31,7 +31,7 @@ public:
size_t max_block_size, size_t max_block_size,
unsigned num_streams) override; unsigned num_streams) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr /*context*/) override; SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override;
void truncate( void truncate(
const ASTPtr & query, const ASTPtr & query,
@ -70,11 +70,12 @@ protected:
ASTPtr partition_by = nullptr); ASTPtr partition_by = nullptr);
private: private:
const String uri; std::vector<const String> uris;
String format_name; String format_name;
String compression_method; String compression_method;
const bool distributed_processing; const bool distributed_processing;
ASTPtr partition_by; ASTPtr partition_by;
bool is_path_with_globs;
Poco::Logger * log = &Poco::Logger::get("StorageHDFS"); Poco::Logger * log = &Poco::Logger::get("StorageHDFS");
}; };
@ -95,6 +96,17 @@ public:
std::shared_ptr<Impl> pimpl; std::shared_ptr<Impl> pimpl;
}; };
class URISIterator
{
public:
URISIterator(const std::vector<const String> & uris_, ContextPtr context);
String next();
private:
class Impl;
/// shared_ptr to have copy constructor
std::shared_ptr<Impl> pimpl;
};
using IteratorWrapper = std::function<String()>; using IteratorWrapper = std::function<String()>;
using StorageHDFSPtr = std::shared_ptr<StorageHDFS>; using StorageHDFSPtr = std::shared_ptr<StorageHDFS>;

View File

@ -15,7 +15,6 @@ namespace ErrorCodes
extern const int NETWORK_ERROR; extern const int NETWORK_ERROR;
extern const int CANNOT_OPEN_FILE; extern const int CANNOT_OPEN_FILE;
extern const int CANNOT_FSYNC; extern const int CANNOT_FSYNC;
extern const int BAD_ARGUMENTS;
} }
@ -38,12 +37,6 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2); const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
const String path = hdfs_uri.substr(begin_of_path); const String path = hdfs_uri.substr(begin_of_path);
if (path.find_first_of("*?{") != std::string::npos)
throw Exception(ErrorCodes::CANNOT_OPEN_FILE, "URI '{}' contains globs, so the table is in readonly mode", hdfs_uri);
if (!hdfsExists(fs.get(), path.c_str()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "File {} already exists", path);
fout = hdfsOpenFile(fs.get(), path.c_str(), flags, 0, replication_, 0); /// O_WRONLY meaning create or overwrite i.e., implies O_TRUNC here fout = hdfsOpenFile(fs.get(), path.c_str(), flags, 0, replication_, 0); /// O_WRONLY meaning create or overwrite i.e., implies O_TRUNC here
if (fout == nullptr) if (fout == nullptr)

View File

@ -65,6 +65,7 @@ namespace ErrorCodes
extern const int INCOMPATIBLE_COLUMNS; extern const int INCOMPATIBLE_COLUMNS;
extern const int CANNOT_STAT; extern const int CANNOT_STAT;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int CANNOT_APPEND_TO_FILE;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE; extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
} }
@ -285,6 +286,7 @@ StorageFile::StorageFile(const std::string & table_path_, const std::string & us
{ {
is_db_table = false; is_db_table = false;
paths = getPathsList(table_path_, user_files_path, args.getContext(), total_bytes_to_read); paths = getPathsList(table_path_, user_files_path, args.getContext(), total_bytes_to_read);
is_path_with_globs = paths.size() > 1;
path_for_partitioned_write = table_path_; path_for_partitioned_write = table_path_;
setStorageMetadata(args); setStorageMetadata(args);
} }
@ -603,7 +605,7 @@ public:
int table_fd_, int table_fd_,
bool use_table_fd_, bool use_table_fd_,
std::string base_path_, std::string base_path_,
std::vector<std::string> paths_, std::string path_,
const CompressionMethod compression_method_, const CompressionMethod compression_method_,
const std::optional<FormatSettings> & format_settings_, const std::optional<FormatSettings> & format_settings_,
const String format_name_, const String format_name_,
@ -615,7 +617,7 @@ public:
, table_fd(table_fd_) , table_fd(table_fd_)
, use_table_fd(use_table_fd_) , use_table_fd(use_table_fd_)
, base_path(base_path_) , base_path(base_path_)
, paths(paths_) , path(path_)
, compression_method(compression_method_) , compression_method(compression_method_)
, format_name(format_name_) , format_name(format_name_)
, format_settings(format_settings_) , format_settings(format_settings_)
@ -632,7 +634,7 @@ public:
int table_fd_, int table_fd_,
bool use_table_fd_, bool use_table_fd_,
std::string base_path_, std::string base_path_,
std::vector<std::string> paths_, const std::string & path_,
const CompressionMethod compression_method_, const CompressionMethod compression_method_,
const std::optional<FormatSettings> & format_settings_, const std::optional<FormatSettings> & format_settings_,
const String format_name_, const String format_name_,
@ -644,7 +646,7 @@ public:
, table_fd(table_fd_) , table_fd(table_fd_)
, use_table_fd(use_table_fd_) , use_table_fd(use_table_fd_)
, base_path(base_path_) , base_path(base_path_)
, paths(paths_) , path(path_)
, compression_method(compression_method_) , compression_method(compression_method_)
, format_name(format_name_) , format_name(format_name_)
, format_settings(format_settings_) , format_settings(format_settings_)
@ -666,10 +668,8 @@ public:
} }
else else
{ {
if (paths.size() != 1)
throw Exception("Table '" + table_name_for_log + "' is in readonly mode because of globs in filepath", ErrorCodes::DATABASE_ACCESS_DENIED);
flags |= O_WRONLY | O_APPEND | O_CREAT; flags |= O_WRONLY | O_APPEND | O_CREAT;
naked_buffer = std::make_unique<WriteBufferFromFile>(paths[0], DBMS_DEFAULT_BUFFER_SIZE, flags); naked_buffer = std::make_unique<WriteBufferFromFile>(path, DBMS_DEFAULT_BUFFER_SIZE, flags);
} }
/// In case of formats with prefixes if file is not empty we have already written prefix. /// In case of formats with prefixes if file is not empty we have already written prefix.
@ -709,7 +709,7 @@ private:
int table_fd; int table_fd;
bool use_table_fd; bool use_table_fd;
std::string base_path; std::string base_path;
std::vector<std::string> paths; std::string path;
CompressionMethod compression_method; CompressionMethod compression_method;
std::string format_name; std::string format_name;
std::optional<FormatSettings> format_settings; std::optional<FormatSettings> format_settings;
@ -752,7 +752,6 @@ public:
{ {
auto partition_path = PartitionedSink::replaceWildcards(path, partition_id); auto partition_path = PartitionedSink::replaceWildcards(path, partition_id);
PartitionedSink::validatePartitionKey(partition_path, true); PartitionedSink::validatePartitionKey(partition_path, true);
Strings result_paths = {partition_path};
checkCreationIsAllowed(context, context->getUserFilesPath(), partition_path); checkCreationIsAllowed(context, context->getUserFilesPath(), partition_path);
return std::make_shared<StorageFileSink>( return std::make_shared<StorageFileSink>(
metadata_snapshot, metadata_snapshot,
@ -760,7 +759,7 @@ public:
-1, -1,
/* use_table_fd */false, /* use_table_fd */false,
base_path, base_path,
result_paths, partition_path,
compression_method, compression_method,
format_settings, format_settings,
format_name, format_name,
@ -794,7 +793,6 @@ SinkToStoragePtr StorageFile::write(
int flags = 0; int flags = 0;
std::string path;
if (context->getSettingsRef().engine_file_truncate_on_insert) if (context->getSettingsRef().engine_file_truncate_on_insert)
flags |= O_TRUNC; flags |= O_TRUNC;
@ -815,7 +813,7 @@ SinkToStoragePtr StorageFile::write(
std::unique_lock{rwlock, getLockTimeout(context)}, std::unique_lock{rwlock, getLockTimeout(context)},
base_path, base_path,
path_for_partitioned_write, path_for_partitioned_write,
chooseCompressionMethod(path, compression_method), chooseCompressionMethod(path_for_partitioned_write, compression_method),
format_settings, format_settings,
format_name, format_name,
context, context,
@ -823,10 +821,41 @@ SinkToStoragePtr StorageFile::write(
} }
else else
{ {
String path;
if (!paths.empty()) if (!paths.empty())
{ {
path = paths[0]; if (is_path_with_globs)
throw Exception("Table '" + getStorageID().getNameForLogs() + "' is in readonly mode because of globs in filepath", ErrorCodes::DATABASE_ACCESS_DENIED);
path = paths.back();
fs::create_directories(fs::path(path).parent_path()); fs::create_directories(fs::path(path).parent_path());
if (!context->getSettingsRef().engine_file_truncate_on_insert && !is_path_with_globs
&& !FormatFactory::instance().checkIfFormatSupportAppend(format_name, context, format_settings) && fs::exists(paths.back())
&& fs::file_size(paths.back()) != 0)
{
if (context->getSettingsRef().engine_file_allow_create_multiple_files)
{
auto pos = paths[0].find_first_of('.', paths[0].find_last_of('/'));
size_t index = paths.size();
String new_path;
do
{
new_path = paths[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : paths[0].substr(pos));
++index;
}
while (fs::exists(new_path));
paths.push_back(new_path);
path = new_path;
}
else
throw Exception(
ErrorCodes::CANNOT_APPEND_TO_FILE,
"Cannot append data in format {} to file, because this format doesn't support appends."
" You can allow to create a new file "
"on each insert by enabling setting engine_file_allow_create_multiple_files",
format_name);
}
} }
return std::make_shared<StorageFileSink>( return std::make_shared<StorageFileSink>(
@ -836,7 +865,7 @@ SinkToStoragePtr StorageFile::write(
table_fd, table_fd,
use_table_fd, use_table_fd,
base_path, base_path,
paths, path,
chooseCompressionMethod(path, compression_method), chooseCompressionMethod(path, compression_method),
format_settings, format_settings,
format_name, format_name,
@ -882,7 +911,7 @@ void StorageFile::truncate(
ContextPtr /* context */, ContextPtr /* context */,
TableExclusiveLockHolder &) TableExclusiveLockHolder &)
{ {
if (paths.size() != 1) if (is_path_with_globs)
throw Exception("Can't truncate table '" + getStorageID().getNameForLogs() + "' in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED); throw Exception("Can't truncate table '" + getStorageID().getNameForLogs() + "' in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED);
if (use_table_fd) if (use_table_fd)
@ -892,11 +921,14 @@ void StorageFile::truncate(
} }
else else
{ {
if (!fs::exists(paths[0])) for (const auto & path : paths)
return; {
if (!fs::exists(path))
continue;
if (0 != ::truncate(paths[0].c_str(), 0)) if (0 != ::truncate(path.c_str(), 0))
throwFromErrnoWithPath("Cannot truncate file " + paths[0], paths[0], ErrorCodes::CANNOT_TRUNCATE_FILE); throwFromErrnoWithPath("Cannot truncate file " + path, path, ErrorCodes::CANNOT_TRUNCATE_FILE);
}
} }
} }

View File

@ -120,6 +120,8 @@ private:
size_t total_bytes_to_read = 0; size_t total_bytes_to_read = 0;
String path_for_partitioned_write; String path_for_partitioned_write;
bool is_path_with_globs = false;
}; };
} }

View File

@ -774,7 +774,8 @@ void StorageReplicatedMergeTree::drop()
/// or metadata of staled replica were removed manually, /// or metadata of staled replica were removed manually,
/// in this case, has_metadata_in_zookeeper = false, and we also permit to drop the table. /// in this case, has_metadata_in_zookeeper = false, and we also permit to drop the table.
if (has_metadata_in_zookeeper) bool maybe_has_metadata_in_zookeeper = !has_metadata_in_zookeeper.has_value() || *has_metadata_in_zookeeper;
if (maybe_has_metadata_in_zookeeper)
{ {
/// Table can be shut down, restarting thread is not active /// Table can be shut down, restarting thread is not active
/// and calling StorageReplicatedMergeTree::getZooKeeper()/getAuxiliaryZooKeeper() won't suffice. /// and calling StorageReplicatedMergeTree::getZooKeeper()/getAuxiliaryZooKeeper() won't suffice.
@ -4811,12 +4812,22 @@ bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const St
void StorageReplicatedMergeTree::restoreMetadataInZooKeeper() void StorageReplicatedMergeTree::restoreMetadataInZooKeeper()
{ {
LOG_INFO(log, "Restoring replica metadata"); LOG_INFO(log, "Restoring replica metadata");
if (!is_readonly)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica must be readonly");
if (!is_readonly || has_metadata_in_zookeeper) if (getZooKeeper()->exists(replica_path))
throw Exception(ErrorCodes::LOGICAL_ERROR, "It's a bug: replica is not readonly"); throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Replica path is present at {} - nothing to restore. "
"If you are sure that metadata is lost and that replica path contains some garbage, "
"then use SYSTEM DROP REPLICA query first.", replica_path);
if (has_metadata_in_zookeeper.has_value() && *has_metadata_in_zookeeper)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Replica has metadata in ZooKeeper: "
"it's either a bug or it's a result of manual intervention to ZooKeeper");
if (are_restoring_replica.exchange(true)) if (are_restoring_replica.exchange(true))
throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Replica restoration in progress"); throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Replica restoration in progress");
SCOPE_EXIT({ are_restoring_replica.store(false); });
auto metadata_snapshot = getInMemoryMetadataPtr(); auto metadata_snapshot = getInMemoryMetadataPtr();
@ -4857,8 +4868,6 @@ void StorageReplicatedMergeTree::restoreMetadataInZooKeeper()
LOG_INFO(log, "Attached all partitions, starting table"); LOG_INFO(log, "Attached all partitions, starting table");
startup(); startup();
are_restoring_replica.store(false);
} }
void StorageReplicatedMergeTree::dropPartNoWaitNoThrow(const String & part_name) void StorageReplicatedMergeTree::dropPartNoWaitNoThrow(const String & part_name)

View File

@ -322,8 +322,9 @@ private:
/// If true, the table is offline and can not be written to it. /// If true, the table is offline and can not be written to it.
std::atomic_bool is_readonly {false}; std::atomic_bool is_readonly {false};
/// If nullopt - ZooKeeper is not available, so we don't know if there is table metadata.
/// If false - ZooKeeper is available, but there is no table metadata. It's safe to drop table in this case. /// If false - ZooKeeper is available, but there is no table metadata. It's safe to drop table in this case.
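/// If true - ZooKeeper is available and the table metadata is present there.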
bool has_metadata_in_zookeeper = true; std::optional<bool> has_metadata_in_zookeeper;
static constexpr auto default_zookeeper_name = "default"; static constexpr auto default_zookeeper_name = "default";
String zookeeper_name; String zookeeper_name;

View File

@ -68,7 +68,7 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int S3_ERROR; extern const int S3_ERROR;
extern const int UNEXPECTED_EXPRESSION; extern const int UNEXPECTED_EXPRESSION;
extern const int CANNOT_OPEN_FILE; extern const int DATABASE_ACCESS_DENIED;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE; extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
} }
@ -82,8 +82,6 @@ public:
Impl(Aws::S3::S3Client & client_, const S3::URI & globbed_uri_) Impl(Aws::S3::S3Client & client_, const S3::URI & globbed_uri_)
: client(client_), globbed_uri(globbed_uri_) : client(client_), globbed_uri(globbed_uri_)
{ {
std::lock_guard lock(mutex);
if (globbed_uri.bucket.find_first_of("*?{") != globbed_uri.bucket.npos) if (globbed_uri.bucket.find_first_of("*?{") != globbed_uri.bucket.npos)
throw Exception("Expression can not have wildcards inside bucket name", ErrorCodes::UNEXPECTED_EXPRESSION); throw Exception("Expression can not have wildcards inside bucket name", ErrorCodes::UNEXPECTED_EXPRESSION);
@ -176,6 +174,37 @@ String StorageS3Source::DisclosedGlobIterator::next()
return pimpl->next(); return pimpl->next();
} }
class StorageS3Source::KeysIterator::Impl
{
public:
explicit Impl(const std::vector<String> & keys_) : keys(keys_), keys_iter(keys.begin())
{
}
String next()
{
std::lock_guard lock(mutex);
if (keys_iter == keys.end())
return "";
auto key = *keys_iter;
++keys_iter;
return key;
}
private:
std::mutex mutex;
Strings keys;
Strings::iterator keys_iter;
};
StorageS3Source::KeysIterator::KeysIterator(const std::vector<String> & keys_) : pimpl(std::make_shared<StorageS3Source::KeysIterator::Impl>(keys_))
{
}
String StorageS3Source::KeysIterator::next()
{
return pimpl->next();
}
Block StorageS3Source::getHeader(Block sample_block, bool with_path_column, bool with_file_column) Block StorageS3Source::getHeader(Block sample_block, bool with_path_column, bool with_file_column)
{ {
@ -296,6 +325,39 @@ Chunk StorageS3Source::generate()
return generate(); return generate();
} }
static bool checkIfObjectExists(const std::shared_ptr<Aws::S3::S3Client> & client, const String & bucket, const String & key)
{
bool is_finished = false;
Aws::S3::Model::ListObjectsV2Request request;
Aws::S3::Model::ListObjectsV2Outcome outcome;
request.SetBucket(bucket);
request.SetPrefix(key);
while (!is_finished)
{
outcome = client->ListObjectsV2(request);
if (!outcome.IsSuccess())
throw Exception(
ErrorCodes::S3_ERROR,
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
quoteString(bucket),
quoteString(key),
backQuote(outcome.GetError().GetExceptionName()),
quoteString(outcome.GetError().GetMessage()));
const auto & result_batch = outcome.GetResult().GetContents();
for (const auto & obj : result_batch)
{
if (obj.GetKey() == key)
return true;
}
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
is_finished = !outcome.GetResult().GetIsTruncated();
}
return false;
}
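For reference, a rough boto3 equivalent of the existence check above (assuming boto3 is available; this is not part of the patch): it lists the bucket with the key as prefix and pages through continuation tokens until an exact match is found.

    import boto3

    def object_exists(client, bucket: str, key: str) -> bool:
        # Same idea as checkIfObjectExists: ListObjectsV2 with the key as
        # prefix, scanning each page for an exact key match.
        kwargs = {"Bucket": bucket, "Prefix": key}
        while True:
            resp = client.list_objects_v2(**kwargs)
            if any(obj["Key"] == key for obj in resp.get("Contents", [])):
                return True
            if not resp.get("IsTruncated"):
                return False
            kwargs["ContinuationToken"] = resp["NextContinuationToken"]

    # object_exists(boto3.client("s3"), "my-bucket", "path/to/file.csv")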
class StorageS3Sink : public SinkToStorage class StorageS3Sink : public SinkToStorage
{ {
@ -315,9 +377,6 @@ public:
, sample_block(sample_block_) , sample_block(sample_block_)
, format_settings(format_settings_) , format_settings(format_settings_)
{ {
if (key.find_first_of("*?{") != std::string::npos)
throw Exception(ErrorCodes::CANNOT_OPEN_FILE, "S3 key '{}' contains globs, so the table is in readonly mode", key);
write_buf = wrapWriteBufferWithCompressionMethod( write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromS3>(client, bucket, key, min_upload_part_size, max_single_part_upload_size), compression_method, 3); std::make_unique<WriteBufferFromS3>(client, bucket, key, min_upload_part_size, max_single_part_upload_size), compression_method, 3);
writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context, {}, format_settings); writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context, {}, format_settings);
@ -419,7 +478,6 @@ private:
std::optional<FormatSettings> format_settings; std::optional<FormatSettings> format_settings;
ExpressionActionsPtr partition_by_expr; ExpressionActionsPtr partition_by_expr;
String partition_by_column_name;
static void validateBucket(const String & str) static void validateBucket(const String & str)
{ {
@ -468,6 +526,7 @@ StorageS3::StorageS3(
ASTPtr partition_by_) ASTPtr partition_by_)
: IStorage(table_id_) : IStorage(table_id_)
, client_auth{uri_, access_key_id_, secret_access_key_, max_connections_, {}, {}} /// Client and settings will be updated later , client_auth{uri_, access_key_id_, secret_access_key_, max_connections_, {}, {}} /// Client and settings will be updated later
, keys({uri_.key})
, format_name(format_name_) , format_name(format_name_)
, max_single_read_retries(max_single_read_retries_) , max_single_read_retries(max_single_read_retries_)
, min_upload_part_size(min_upload_part_size_) , min_upload_part_size(min_upload_part_size_)
@ -477,6 +536,7 @@ StorageS3::StorageS3(
, distributed_processing(distributed_processing_) , distributed_processing(distributed_processing_)
, format_settings(format_settings_) , format_settings(format_settings_)
, partition_by(partition_by_) , partition_by(partition_by_)
, is_key_with_globs(uri_.key.find_first_of("*?{") != std::string::npos)
{ {
context_->getGlobalContext()->getRemoteHostFilter().checkURL(uri_.uri); context_->getGlobalContext()->getRemoteHostFilter().checkURL(uri_.uri);
StorageInMemoryMetadata storage_metadata; StorageInMemoryMetadata storage_metadata;
@ -484,7 +544,7 @@ StorageS3::StorageS3(
updateClientAndAuthSettings(context_, client_auth); updateClientAndAuthSettings(context_, client_auth);
if (columns_.empty()) if (columns_.empty())
{ {
auto columns = getTableStructureFromDataImpl(format_name, client_auth, max_single_read_retries_, compression_method, distributed_processing_, format_settings, context_); auto columns = getTableStructureFromDataImpl(format_name, client_auth, max_single_read_retries_, compression_method, distributed_processing_, is_key_with_globs, format_settings, context_);
storage_metadata.setColumns(columns); storage_metadata.setColumns(columns);
} }
else else
@ -495,9 +555,8 @@ StorageS3::StorageS3(
setInMemoryMetadata(storage_metadata); setInMemoryMetadata(storage_metadata);
} }
std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(const ClientAuthentication & client_auth, bool distributed_processing, ContextPtr local_context) std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(const ClientAuthentication & client_auth, const std::vector<String> & keys, bool is_key_with_globs, bool distributed_processing, ContextPtr local_context)
{ {
std::shared_ptr<StorageS3Source::IteratorWrapper> iterator_wrapper{nullptr};
if (distributed_processing) if (distributed_processing)
{ {
return std::make_shared<StorageS3Source::IteratorWrapper>( return std::make_shared<StorageS3Source::IteratorWrapper>(
@ -505,13 +564,23 @@ std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(
return callback(); return callback();
}); });
} }
else if (is_key_with_globs)
/// Iterate through disclosed globs and make a source for each file
auto glob_iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(*client_auth.client, client_auth.uri);
return std::make_shared<StorageS3Source::IteratorWrapper>([glob_iterator]()
{ {
return glob_iterator->next(); /// Iterate through disclosed globs and make a source for each file
}); auto glob_iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(*client_auth.client, client_auth.uri);
return std::make_shared<StorageS3Source::IteratorWrapper>([glob_iterator]()
{
return glob_iterator->next();
});
}
else
{
auto keys_iterator = std::make_shared<StorageS3Source::KeysIterator>(keys);
return std::make_shared<StorageS3Source::IteratorWrapper>([keys_iterator]()
{
return keys_iterator->next();
});
}
} }
Pipe StorageS3::read( Pipe StorageS3::read(
@ -536,7 +605,7 @@ Pipe StorageS3::read(
need_file_column = true; need_file_column = true;
} }
std::shared_ptr<StorageS3Source::IteratorWrapper> iterator_wrapper = createFileIterator(client_auth, distributed_processing, local_context); std::shared_ptr<StorageS3Source::IteratorWrapper> iterator_wrapper = createFileIterator(client_auth, keys, is_key_with_globs, distributed_processing, local_context);
for (size_t i = 0; i < num_streams; ++i) for (size_t i = 0; i < num_streams; ++i)
{ {
@ -567,8 +636,8 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
updateClientAndAuthSettings(local_context, client_auth); updateClientAndAuthSettings(local_context, client_auth);
auto sample_block = metadata_snapshot->getSampleBlock(); auto sample_block = metadata_snapshot->getSampleBlock();
auto chosen_compression_method = chooseCompressionMethod(client_auth.uri.key, compression_method); auto chosen_compression_method = chooseCompressionMethod(keys.back(), compression_method);
bool has_wildcards = client_auth.uri.bucket.find(PARTITION_ID_WILDCARD) != String::npos || client_auth.uri.key.find(PARTITION_ID_WILDCARD) != String::npos; bool has_wildcards = client_auth.uri.bucket.find(PARTITION_ID_WILDCARD) != String::npos || keys.back().find(PARTITION_ID_WILDCARD) != String::npos;
auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query); auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query);
auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr; auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
@ -585,12 +654,41 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
chosen_compression_method, chosen_compression_method,
client_auth.client, client_auth.client,
client_auth.uri.bucket, client_auth.uri.bucket,
client_auth.uri.key, keys.back(),
min_upload_part_size, min_upload_part_size,
max_single_part_upload_size); max_single_part_upload_size);
} }
else else
{ {
if (is_key_with_globs)
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "S3 key '{}' contains globs, so the table is in readonly mode", client_auth.uri.key);
bool truncate_in_insert = local_context->getSettingsRef().s3_truncate_on_insert;
if (!truncate_in_insert && checkIfObjectExists(client_auth.client, client_auth.uri.bucket, keys.back()))
{
if (local_context->getSettingsRef().s3_create_new_file_on_insert)
{
size_t index = keys.size();
auto pos = keys[0].find_first_of('.');
String new_key;
do
{
new_key = keys[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : keys[0].substr(pos));
++index;
}
while (checkIfObjectExists(client_auth.client, client_auth.uri.bucket, new_key));
keys.push_back(new_key);
}
else
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Object in bucket {} with key {} already exists. If you want to overwrite it, enable setting s3_truncate_on_insert, if you "
"want to create a new file on each insert, enable setting s3_create_new_file_on_insert",
client_auth.uri.bucket,
keys.back());
}
return std::make_shared<StorageS3Sink>( return std::make_shared<StorageS3Sink>(
format_name, format_name,
sample_block, sample_block,
@ -599,7 +697,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
chosen_compression_method, chosen_compression_method,
client_auth.client, client_auth.client,
client_auth.uri.bucket, client_auth.uri.bucket,
client_auth.uri.key, keys.back(),
min_upload_part_size, min_upload_part_size,
max_single_part_upload_size); max_single_part_upload_size);
} }
@ -610,11 +708,17 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
{ {
updateClientAndAuthSettings(local_context, client_auth); updateClientAndAuthSettings(local_context, client_auth);
Aws::S3::Model::ObjectIdentifier obj; if (is_key_with_globs)
obj.SetKey(client_auth.uri.key); throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "S3 key '{}' contains globs, so the table is in readonly mode", client_auth.uri.key);
Aws::S3::Model::Delete delkeys; Aws::S3::Model::Delete delkeys;
delkeys.AddObjects(std::move(obj));
for (const auto & key : keys)
{
Aws::S3::Model::ObjectIdentifier obj;
obj.SetKey(key);
delkeys.AddObjects(std::move(obj));
}
Aws::S3::Model::DeleteObjectsRequest request; Aws::S3::Model::DeleteObjectsRequest request;
request.SetBucket(client_auth.uri.bucket); request.SetBucket(client_auth.uri.bucket);
@ -734,7 +838,7 @@ ColumnsDescription StorageS3::getTableStructureFromData(
{ {
ClientAuthentication client_auth{uri, access_key_id, secret_access_key, max_connections, {}, {}}; ClientAuthentication client_auth{uri, access_key_id, secret_access_key, max_connections, {}, {}};
updateClientAndAuthSettings(ctx, client_auth); updateClientAndAuthSettings(ctx, client_auth);
return getTableStructureFromDataImpl(format, client_auth, max_single_read_retries, compression_method, distributed_processing, format_settings, ctx); return getTableStructureFromDataImpl(format, client_auth, max_single_read_retries, compression_method, distributed_processing, uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx);
} }
ColumnsDescription StorageS3::getTableStructureFromDataImpl( ColumnsDescription StorageS3::getTableStructureFromDataImpl(
@ -743,12 +847,14 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
UInt64 max_single_read_retries, UInt64 max_single_read_retries,
const String & compression_method, const String & compression_method,
bool distributed_processing, bool distributed_processing,
bool is_key_with_globs,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
ContextPtr ctx) ContextPtr ctx)
{ {
std::vector<String> keys = {client_auth.uri.key};
auto read_buffer_creator = [&]() auto read_buffer_creator = [&]()
{ {
auto file_iterator = createFileIterator(client_auth, distributed_processing, ctx); auto file_iterator = createFileIterator(client_auth, keys, is_key_with_globs, distributed_processing, ctx);
String current_key = (*file_iterator)(); String current_key = (*file_iterator)();
if (current_key.empty()) if (current_key.empty())
throw Exception( throw Exception(

View File

@ -44,6 +44,18 @@ public:
std::shared_ptr<Impl> pimpl; std::shared_ptr<Impl> pimpl;
}; };
class KeysIterator
{
public:
explicit KeysIterator(const std::vector<String> & keys_);
String next();
private:
class Impl;
/// shared_ptr to have copy constructor
std::shared_ptr<Impl> pimpl;
};
using IteratorWrapper = std::function<String()>; using IteratorWrapper = std::function<String()>;
static Block getHeader(Block sample_block, bool with_path_column, bool with_file_column); static Block getHeader(Block sample_block, bool with_path_column, bool with_file_column);
@ -174,6 +186,7 @@ private:
}; };
ClientAuthentication client_auth; ClientAuthentication client_auth;
std::vector<String> keys;
String format_name; String format_name;
UInt64 max_single_read_retries; UInt64 max_single_read_retries;
@ -184,10 +197,11 @@ private:
const bool distributed_processing; const bool distributed_processing;
std::optional<FormatSettings> format_settings; std::optional<FormatSettings> format_settings;
ASTPtr partition_by; ASTPtr partition_by;
bool is_key_with_globs = false;
static void updateClientAndAuthSettings(ContextPtr, ClientAuthentication &); static void updateClientAndAuthSettings(ContextPtr, ClientAuthentication &);
static std::shared_ptr<StorageS3Source::IteratorWrapper> createFileIterator(const ClientAuthentication & client_auth, bool distributed_processing, ContextPtr local_context); static std::shared_ptr<StorageS3Source::IteratorWrapper> createFileIterator(const ClientAuthentication & client_auth, const std::vector<String> & keys, bool is_key_with_globs, bool distributed_processing, ContextPtr local_context);
static ColumnsDescription getTableStructureFromDataImpl( static ColumnsDescription getTableStructureFromDataImpl(
const String & format, const String & format,
@ -195,6 +209,7 @@ private:
UInt64 max_single_read_retries, UInt64 max_single_read_retries,
const String & compression_method, const String & compression_method,
bool distributed_processing, bool distributed_processing,
bool is_key_with_globs,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
ContextPtr ctx); ContextPtr ctx);
}; };

View File

@ -6,7 +6,7 @@ import os
import shutil import shutil
import subprocess import subprocess
import time import time
from typing import List, Tuple from typing import List, Optional, Set, Tuple, Union
from github import Github from github import Github
@ -24,13 +24,41 @@ NAME = "Push to Dockerhub (actions)"
TEMP_PATH = os.path.join(RUNNER_TEMP, "docker_images_check") TEMP_PATH = os.path.join(RUNNER_TEMP, "docker_images_check")
class DockerImage:
def __init__(
self,
path: str,
repo: str,
parent: Optional["DockerImage"] = None,
gh_repo_path: str = GITHUB_WORKSPACE,
):
self.path = path
self.full_path = os.path.join(gh_repo_path, path)
self.repo = repo
self.parent = parent
self.built = False
def __eq__(self, other):
"""Is used to check if DockerImage is in a set or not"""
return self.path == other.path
def __hash__(self):
return hash(self.path)
def __str__(self):
return self.repo
def __repr__(self):
return f"DockerImage(path={self.path},path={self.path},parent={self.parent})"
def get_changed_docker_images( def get_changed_docker_images(
pr_info: PRInfo, repo_path: str, image_file_path: str pr_info: PRInfo, repo_path: str, image_file_path: str
) -> List[Tuple[str, str]]: ) -> Set[DockerImage]:
images_dict = {} images_dict = {}
path_to_images_file = os.path.join(repo_path, image_file_path) path_to_images_file = os.path.join(repo_path, image_file_path)
if os.path.exists(path_to_images_file): if os.path.exists(path_to_images_file):
with open(path_to_images_file, "r") as dict_file: with open(path_to_images_file, "rb") as dict_file:
images_dict = json.load(dict_file) images_dict = json.load(dict_file)
else: else:
logging.info( logging.info(
@ -38,7 +66,7 @@ def get_changed_docker_images(
) )
if not images_dict: if not images_dict:
return [] return set()
files_changed = pr_info.changed_files files_changed = pr_info.changed_files
@ -54,14 +82,15 @@ def get_changed_docker_images(
for dockerfile_dir, image_description in images_dict.items(): for dockerfile_dir, image_description in images_dict.items():
for f in files_changed: for f in files_changed:
if f.startswith(dockerfile_dir): if f.startswith(dockerfile_dir):
name = image_description["name"]
logging.info( logging.info(
"Found changed file '%s' which affects " "Found changed file '%s' which affects "
"docker image '%s' with path '%s'", "docker image '%s' with path '%s'",
f, f,
image_description["name"], name,
dockerfile_dir, dockerfile_dir,
) )
changed_images.append(dockerfile_dir) changed_images.append(DockerImage(dockerfile_dir, name))
break break
# The order is important: dependents should go later than bases, so that # The order is important: dependents should go later than bases, so that
@ -69,14 +98,14 @@ def get_changed_docker_images(
index = 0 index = 0
while index < len(changed_images): while index < len(changed_images):
image = changed_images[index] image = changed_images[index]
for dependent in images_dict[image]["dependent"]: for dependent in images_dict[image.path]["dependent"]:
logging.info( logging.info(
"Marking docker image '%s' as changed because it " "Marking docker image '%s' as changed because it "
"depends on changed docker image '%s'", "depends on changed docker image '%s'",
dependent, dependent,
image, image,
) )
changed_images.append(dependent) changed_images.append(DockerImage(dependent, image.repo, image))
index += 1 index += 1
if index > 5 * len(images_dict): if index > 5 * len(images_dict):
# Sanity check to prevent infinite loop. # Sanity check to prevent infinite loop.
@ -84,19 +113,9 @@ def get_changed_docker_images(
f"Too many changed docker images, this is a bug. {changed_images}" f"Too many changed docker images, this is a bug. {changed_images}"
) )
# If a dependent image was already in the list because its own files # With the reversed changed_images set, images with parents are used first, and
# changed, but then it was added as a dependent of a changed base, we # images without parents after them
# must remove the earlier entry so that it doesn't go earlier than its result = set(reversed(changed_images))
# base. This way, the dependent will be rebuilt later than the base, and
# will correctly use the updated version of the base.
seen = set()
no_dups_reversed = []
for x in reversed(changed_images):
if x not in seen:
seen.add(x)
no_dups_reversed.append(x)
result = [(x, images_dict[x]["name"]) for x in reversed(no_dups_reversed)]
logging.info( logging.info(
"Changed docker images for PR %s @ %s: '%s'", "Changed docker images for PR %s @ %s: '%s'",
pr_info.number, pr_info.number,
@ -106,66 +125,109 @@ def get_changed_docker_images(
return result return result
def gen_versions(
pr_info: PRInfo, suffix: Optional[str]
) -> Tuple[List[str], Union[str, List[str]]]:
pr_commit_version = str(pr_info.number) + "-" + pr_info.sha
# The order is important, PR number is used as cache during the build
versions = [str(pr_info.number), pr_commit_version]
result_version = pr_commit_version
if pr_info.number == 0:
# First get the latest for cache
versions.insert(0, "latest")
if suffix:
# We should build architecture specific images separately and merge a
# manifest later in a different script
versions = [f"{v}-{suffix}" for v in versions]
# changed_images_{suffix}.json should contain all changed images
result_version = versions
return versions, result_version
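For a concrete feel of the tag ordering gen_versions produces (values below are hypothetical PR data; the "latest" tag is only prepended when pr_info.number is 0, i.e. a master build):

    class FakePR:  # stand-in for PRInfo, illustration only
        number, sha = 123, "deadbeef"

    versions, result = gen_versions(FakePR(), None)
    assert versions == ["123", "123-deadbeef"] and result == "123-deadbeef"

    versions, result = gen_versions(FakePR(), "aarch64")
    assert versions == ["123-aarch64", "123-deadbeef-aarch64"]
    assert result == versions  # with a suffix, all per-arch tags are recorded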
def build_and_push_one_image( def build_and_push_one_image(
path_to_dockerfile_folder: str, image_name: str, version_string: str, push: bool image: DockerImage,
version_string: str,
push: bool,
child: bool,
) -> Tuple[bool, str]: ) -> Tuple[bool, str]:
path = path_to_dockerfile_folder
logging.info( logging.info(
"Building docker image %s with version %s from path %s", "Building docker image %s with version %s from path %s",
image_name, image.repo,
version_string, version_string,
path, image.full_path,
) )
build_log = os.path.join( build_log = os.path.join(
TEMP_PATH, TEMP_PATH, f"build_and_push_log_{image.repo.replace('/', '_')}_{version_string}"
"build_and_push_log_{}_{}".format(
str(image_name).replace("/", "_"), version_string
),
) )
push_arg = "" push_arg = ""
if push: if push:
push_arg = "--push " push_arg = "--push "
with open(build_log, "w") as bl: from_tag_arg = ""
if child:
from_tag_arg = f"--build-arg FROM_TAG={version_string} "
with open(build_log, "wb") as bl:
cmd = ( cmd = (
"docker buildx build --builder default " "docker buildx build --builder default "
f"--build-arg FROM_TAG={version_string} " f"{from_tag_arg}"
f"--build-arg BUILDKIT_INLINE_CACHE=1 " f"--build-arg BUILDKIT_INLINE_CACHE=1 "
f"--tag {image_name}:{version_string} " f"--tag {image.repo}:{version_string} "
f"--cache-from type=registry,ref={image_name}:{version_string} " f"--cache-from type=registry,ref={image.repo}:{version_string} "
f"{push_arg}" f"{push_arg}"
f"--progress plain {path}" f"--progress plain {image.full_path}"
) )
logging.info("Docker command to run: %s", cmd) logging.info("Docker command to run: %s", cmd)
retcode = subprocess.Popen(cmd, shell=True, stderr=bl, stdout=bl).wait() with subprocess.Popen(cmd, shell=True, stderr=bl, stdout=bl) as proc:
retcode = proc.wait()
if retcode != 0: if retcode != 0:
return False, build_log return False, build_log
logging.info("Processing of %s successfully finished", image_name) logging.info("Processing of %s successfully finished", image.repo)
return True, build_log return True, build_log
def process_single_image( def process_single_image(
versions: List[str], path_to_dockerfile_folder: str, image_name: str, push: bool image: DockerImage,
versions: List[str],
push: bool,
child: bool,
) -> List[Tuple[str, str, str]]: ) -> List[Tuple[str, str, str]]:
logging.info("Image will be pushed with versions %s", ", ".join(versions)) logging.info("Image will be pushed with versions %s", ", ".join(versions))
result = [] result = []
for ver in versions: for ver in versions:
for i in range(5): for i in range(5):
success, build_log = build_and_push_one_image( success, build_log = build_and_push_one_image(image, ver, push, child)
path_to_dockerfile_folder, image_name, ver, push
)
if success: if success:
result.append((image_name + ":" + ver, build_log, "OK")) result.append((image.repo + ":" + ver, build_log, "OK"))
break break
logging.info( logging.info(
"Got error will retry %s time and sleep for %s seconds", i, i * 5 "Got error will retry %s time and sleep for %s seconds", i, i * 5
) )
time.sleep(i * 5) time.sleep(i * 5)
else: else:
result.append((image_name + ":" + ver, build_log, "FAIL")) result.append((image.repo + ":" + ver, build_log, "FAIL"))
logging.info("Processing finished") logging.info("Processing finished")
image.built = True
return result
def process_image_with_parents(
image: DockerImage, versions: List[str], push: bool, child: bool = False
) -> List[Tuple[str, str, str]]:
result = [] # type: List[Tuple[str,str,str]]
if image.built:
return result
if image.parent is not None:
result += process_image_with_parents(image.parent, versions, push, False)
child = True
result += process_single_image(image, versions, push, child)
return result return result
@ -182,7 +244,7 @@ def process_test_results(
build_url = s3_client.upload_test_report_to_s3( build_url = s3_client.upload_test_report_to_s3(
build_log, s3_path_prefix + "/" + os.path.basename(build_log) build_log, s3_path_prefix + "/" + os.path.basename(build_log)
) )
url_part += '<a href="{}">build_log</a>'.format(build_url) url_part += f'<a href="{build_url}">build_log</a>'
if url_part: if url_part:
test_name = image + " (" + url_part + ")" test_name = image + " (" + url_part + ")"
else: else:
@ -255,8 +317,6 @@ def main():
shell=True, shell=True,
) )
repo_path = GITHUB_WORKSPACE
if os.path.exists(TEMP_PATH): if os.path.exists(TEMP_PATH):
shutil.rmtree(TEMP_PATH) shutil.rmtree(TEMP_PATH)
os.makedirs(TEMP_PATH) os.makedirs(TEMP_PATH)
@ -267,43 +327,30 @@ def main():
else: else:
pr_info = PRInfo(need_changed_files=True) pr_info = PRInfo(need_changed_files=True)
changed_images = get_changed_docker_images(pr_info, repo_path, "docker/images.json") changed_images = get_changed_docker_images(
logging.info( pr_info, GITHUB_WORKSPACE, "docker/images.json"
"Has changed images %s", ", ".join([str(image[0]) for image in changed_images])
) )
pr_commit_version = str(pr_info.number) + "-" + pr_info.sha logging.info("Has changed images %s", ", ".join([im.path for im in changed_images]))
# The order is important, PR number is used as cache during the build
versions = [str(pr_info.number), pr_commit_version]
result_version = pr_commit_version
if pr_info.number == 0:
# First get the latest for cache
versions.insert(0, "latest")
if args.suffix: image_versions, result_version = gen_versions(pr_info, args.suffix)
# We should build architecture specific images separately and merge a
# manifest lately in a different script
versions = [f"{v}-{args.suffix}" for v in versions]
# changed_images_{suffix}.json should contain all changed images
result_version = versions
result_images = {} result_images = {}
images_processing_result = [] images_processing_result = []
for rel_path, image_name in changed_images: for image in changed_images:
full_path = os.path.join(repo_path, rel_path) images_processing_result += process_image_with_parents(
images_processing_result += process_single_image( image, image_versions, push
versions, full_path, image_name, push
) )
result_images[image_name] = result_version result_images[image.repo] = result_version
if changed_images: if changed_images:
description = "Updated " + ",".join([im[1] for im in changed_images]) description = "Updated " + ",".join([im.repo for im in changed_images])
else: else:
description = "Nothing to update" description = "Nothing to update"
if len(description) >= 140: if len(description) >= 140:
description = description[:136] + "..." description = description[:136] + "..."
with open(changed_json, "w") as images_file: with open(changed_json, "w", encoding="utf-8") as images_file:
json.dump(result_images, images_file) json.dump(result_images, images_file)
s3_helper = S3Helper("https://s3.amazonaws.com") s3_helper = S3Helper("https://s3.amazonaws.com")
@ -317,8 +364,8 @@ def main():
url = upload_results(s3_helper, pr_info.number, pr_info.sha, test_results, [], NAME) url = upload_results(s3_helper, pr_info.number, pr_info.sha, test_results, [], NAME)
print("::notice ::Report url: {}".format(url)) print(f"::notice ::Report url: {url}")
print('::set-output name=url_output::"{}"'.format(url)) print(f'::set-output name=url_output::"{url}"')
if args.no_reports: if args.no_reports:
return return

136
tests/ci/docker_test.py Normal file
View File

@ -0,0 +1,136 @@
#!/usr/bin/env python
import os
import unittest
from unittest.mock import patch
from pr_info import PRInfo
import docker_images_check as di
# di.logging.basicConfig(level=di.logging.INFO)
class TestDockerImageCheck(unittest.TestCase):
docker_images_path = os.path.join(
os.path.dirname(__file__), "tests/docker_images.json"
)
def test_get_changed_docker_images(self):
pr_info = PRInfo(PRInfo.default_event.copy())
pr_info.changed_files = {
"docker/test/stateless",
"docker/test/base",
"docker/docs/builder",
}
images = di.get_changed_docker_images(pr_info, "/", self.docker_images_path)
expected = {
di.DockerImage("docker/test/base", "clickhouse/test-base"),
di.DockerImage("docker/docs/builder", "clickhouse/docs-builder"),
di.DockerImage("docker/test/stateless", "clickhouse/stateless-test"),
di.DockerImage(
"docker/test/integration/base", "clickhouse/integration-test"
),
di.DockerImage("docker/test/fuzzer", "clickhouse/fuzzer"),
di.DockerImage(
"docker/test/keeper-jepsen", "clickhouse/keeper-jepsen-test"
),
di.DockerImage("docker/docs/check", "clickhouse/docs-check"),
di.DockerImage("docker/docs/release", "clickhouse/docs-release"),
di.DockerImage("docker/test/stateful", "clickhouse/stateful-test"),
di.DockerImage("docker/test/unit", "clickhouse/unit-test"),
di.DockerImage("docker/test/stress", "clickhouse/stress-test"),
}
self.assertEqual(images, expected)
def test_gen_version(self):
pr_info = PRInfo(PRInfo.default_event.copy())
versions, result_version = di.gen_versions(pr_info, None)
self.assertEqual(versions, ["latest", "0", "0-HEAD"])
self.assertEqual(result_version, "0-HEAD")
versions, result_version = di.gen_versions(pr_info, "suffix")
self.assertEqual(versions, ["latest-suffix", "0-suffix", "0-HEAD-suffix"])
self.assertEqual(result_version, versions)
pr_info.number = 1
versions, result_version = di.gen_versions(pr_info, None)
self.assertEqual(versions, ["1", "1-HEAD"])
self.assertEqual(result_version, "1-HEAD")
@patch("builtins.open")
@patch("subprocess.Popen")
def test_build_and_push_one_image(self, mock_popen, mock_open):
mock_popen.return_value.__enter__.return_value.wait.return_value = 0
image = di.DockerImage("path", "name", gh_repo_path="")
result, _ = di.build_and_push_one_image(image, "version", True, True)
mock_open.assert_called_once()
mock_popen.assert_called_once()
self.assertIn(
"docker buildx build --builder default --build-arg FROM_TAG=version "
"--build-arg BUILDKIT_INLINE_CACHE=1 --tag name:version --cache-from "
"type=registry,ref=name:version --push --progress plain path",
mock_popen.call_args.args,
)
self.assertTrue(result)
mock_open.reset()
mock_popen.reset()
mock_popen.return_value.__enter__.return_value.wait.return_value = 0
result, _ = di.build_and_push_one_image(image, "version2", False, True)
self.assertIn(
"docker buildx build --builder default --build-arg FROM_TAG=version2 "
"--build-arg BUILDKIT_INLINE_CACHE=1 --tag name:version2 --cache-from "
"type=registry,ref=name:version2 --progress plain path",
mock_popen.call_args.args,
)
self.assertTrue(result)
mock_popen.return_value.__enter__.return_value.wait.return_value = 1
result, _ = di.build_and_push_one_image(image, "version2", False, False)
self.assertIn(
"docker buildx build --builder default "
"--build-arg BUILDKIT_INLINE_CACHE=1 --tag name:version2 --cache-from "
"type=registry,ref=name:version2 --progress plain path",
mock_popen.call_args.args,
)
self.assertFalse(result)
@patch("docker_images_check.build_and_push_one_image")
def test_process_image_with_parents(self, mock_build):
mock_build.side_effect = lambda w, x, y, z: (True, f"{w.repo}_{x}.log")
im1 = di.DockerImage("path1", "repo1")
im2 = di.DockerImage("path2", "repo2", im1)
im3 = di.DockerImage("path3", "repo3", im2)
im4 = di.DockerImage("path4", "repo4", im1)
# We use a list to have a deterministic order of image builds
images = [im4, im1, im3, im2, im1]
results = [
di.process_image_with_parents(im, ["v1", "v2", "latest"], True)
for im in images
]
expected = [
[ # repo4 -> repo1
("repo1:v1", "repo1_v1.log", "OK"),
("repo1:v2", "repo1_v2.log", "OK"),
("repo1:latest", "repo1_latest.log", "OK"),
("repo4:v1", "repo4_v1.log", "OK"),
("repo4:v2", "repo4_v2.log", "OK"),
("repo4:latest", "repo4_latest.log", "OK"),
],
[], # repo1 is built
[ # repo3 -> repo2 -> repo1
("repo2:v1", "repo2_v1.log", "OK"),
("repo2:v2", "repo2_v2.log", "OK"),
("repo2:latest", "repo2_latest.log", "OK"),
("repo3:v1", "repo3_v1.log", "OK"),
("repo3:v2", "repo3_v2.log", "OK"),
("repo3:latest", "repo3_latest.log", "OK"),
],
[], # repo2 -> repo1 are built
[], # repo1 is built
]
self.assertEqual(results, expected)
if __name__ == "__main__":
unittest.main()

View File

@ -174,6 +174,7 @@ def group_runners_by_tag(listed_runners):
"fuzzer-unit-tester", "fuzzer-unit-tester",
"stress-tester", "stress-tester",
"style-checker", "style-checker",
"style-checker-aarch64",
] ]
for runner in listed_runners: for runner in listed_runners:
for tag in runner.tags: for tag in runner.tags:

View File

@ -11,6 +11,8 @@ DIFF_IN_DOCUMENTATION_EXT = [".html", ".md", ".yml", ".txt", ".css", ".js", ".xm
                              ".jpg", ".py", ".sh", ".json"]
 
 def get_pr_for_commit(sha, ref):
+    if not ref:
+        return None
     try_get_pr_url = f"https://api.github.com/repos/{GITHUB_REPOSITORY}/commits/{sha}/pulls"
     try:
         response = requests.get(try_get_pr_url)
@ -32,23 +34,24 @@ def get_pr_for_commit(sha, ref):
 
 
 class PRInfo:
+    default_event = {
+        'commits': 1,
+        'before': 'HEAD~',
+        'after': 'HEAD',
+        'ref': None,
+    }
     def __init__(self, github_event=None, need_orgs=False, need_changed_files=False, labels_from_api=False):
         if not github_event:
             if GITHUB_EVENT_PATH:
                 with open(GITHUB_EVENT_PATH, 'r', encoding='utf-8') as event_file:
                     github_event = json.load(event_file)
             else:
-                github_event = {
-                    'commits': 1,
-                    'before': 'HEAD~',
-                    'after': 'HEAD',
-                    'ref': None,
-                }
+                github_event = PRInfo.default_event.copy()
         self.event = github_event
         self.changed_files = set([])
         self.body = ""
         ref = github_event.get("ref", "refs/head/master")
-        if ref.startswith('refs/heads/'):
+        if ref and ref.startswith('refs/heads/'):
             ref = ref[11:]
 
         # workflow completed event, used for PRs only

View File

@ -0,0 +1,166 @@
{
"docker/packager/deb": {
"name": "clickhouse/deb-builder",
"dependent": []
},
"docker/packager/binary": {
"name": "clickhouse/binary-builder",
"dependent": [
"docker/test/split_build_smoke_test",
"docker/test/pvs",
"docker/test/codebrowser"
]
},
"docker/test/compatibility/centos": {
"name": "clickhouse/test-old-centos",
"dependent": []
},
"docker/test/compatibility/ubuntu": {
"name": "clickhouse/test-old-ubuntu",
"dependent": []
},
"docker/test/integration/base": {
"name": "clickhouse/integration-test",
"dependent": []
},
"docker/test/fuzzer": {
"name": "clickhouse/fuzzer",
"dependent": []
},
"docker/test/performance-comparison": {
"name": "clickhouse/performance-comparison",
"dependent": []
},
"docker/test/pvs": {
"name": "clickhouse/pvs-test",
"dependent": []
},
"docker/test/util": {
"name": "clickhouse/test-util",
"dependent": [
"docker/test/base",
"docker/test/fasttest"
]
},
"docker/test/stateless": {
"name": "clickhouse/stateless-test",
"dependent": [
"docker/test/stateful",
"docker/test/unit"
]
},
"docker/test/stateful": {
"name": "clickhouse/stateful-test",
"dependent": [
"docker/test/stress"
]
},
"docker/test/unit": {
"name": "clickhouse/unit-test",
"dependent": []
},
"docker/test/stress": {
"name": "clickhouse/stress-test",
"dependent": []
},
"docker/test/split_build_smoke_test": {
"name": "clickhouse/split-build-smoke-test",
"dependent": []
},
"docker/test/codebrowser": {
"name": "clickhouse/codebrowser",
"dependent": []
},
"docker/test/integration/runner": {
"name": "clickhouse/integration-tests-runner",
"dependent": []
},
"docker/test/testflows/runner": {
"name": "clickhouse/testflows-runner",
"dependent": []
},
"docker/test/fasttest": {
"name": "clickhouse/fasttest",
"dependent": []
},
"docker/test/style": {
"name": "clickhouse/style-test",
"dependent": []
},
"docker/test/integration/s3_proxy": {
"name": "clickhouse/s3-proxy",
"dependent": []
},
"docker/test/integration/resolver": {
"name": "clickhouse/python-bottle",
"dependent": []
},
"docker/test/integration/helper_container": {
"name": "clickhouse/integration-helper",
"dependent": []
},
"docker/test/integration/mysql_golang_client": {
"name": "clickhouse/mysql-golang-client",
"dependent": []
},
"docker/test/integration/dotnet_client": {
"name": "clickhouse/dotnet-client",
"dependent": []
},
"docker/test/integration/mysql_java_client": {
"name": "clickhouse/mysql-java-client",
"dependent": []
},
"docker/test/integration/mysql_js_client": {
"name": "clickhouse/mysql-js-client",
"dependent": []
},
"docker/test/integration/mysql_php_client": {
"name": "clickhouse/mysql-php-client",
"dependent": []
},
"docker/test/integration/postgresql_java_client": {
"name": "clickhouse/postgresql-java-client",
"dependent": []
},
"docker/test/integration/kerberos_kdc": {
"name": "clickhouse/kerberos-kdc",
"dependent": []
},
"docker/test/base": {
"name": "clickhouse/test-base",
"dependent": [
"docker/test/stateless",
"docker/test/integration/base",
"docker/test/fuzzer",
"docker/test/keeper-jepsen"
]
},
"docker/test/integration/kerberized_hadoop": {
"name": "clickhouse/kerberized-hadoop",
"dependent": []
},
"docker/test/sqlancer": {
"name": "clickhouse/sqlancer-test",
"dependent": []
},
"docker/test/keeper-jepsen": {
"name": "clickhouse/keeper-jepsen-test",
"dependent": []
},
"docker/docs/builder": {
"name": "clickhouse/docs-builder",
"dependent": [
"docker/docs/check",
"docker/docs/release"
]
},
"docker/docs/check": {
"name": "clickhouse/docs-check",
"dependent": []
},
"docker/docs/release": {
"name": "clickhouse/docs-release",
"dependent": []
}
}

View File

@ -10,7 +10,7 @@ find_program(PYTEST_CMD pytest)
 find_program(SUDO_CMD sudo)
 
 # will mount only one binary to docker container - build with .so cant work
-if(MAKE_STATIC_LIBRARIES AND DOCKER_CMD)
+if(USE_STATIC_LIBRARIES AND DOCKER_CMD)
     if(INTEGRATION_USE_RUNNER AND SUDO_CMD)
         add_test(NAME integration-runner WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} COMMAND ${SUDO_CMD} ${CMAKE_CURRENT_SOURCE_DIR}/runner --binary ${ClickHouse_BINARY_DIR}/programs/clickhouse --configs-dir ${ClickHouse_SOURCE_DIR}/programs/server/)
         message(STATUS "Using tests in docker with runner SUDO=${SUDO_CMD}; DOCKER=${DOCKER_CMD};")

View File

@ -1,6 +1,7 @@
 import pytest
 from helpers.client import QueryRuntimeException
 from helpers.cluster import ClickHouseCluster
+from ast import literal_eval
 
 cluster = ClickHouseCluster(__file__)
 node1 = cluster.add_instance('node1', with_zookeeper=True)
@ -30,12 +31,12 @@ def start_cluster():
 
 def test_replica_is_active(start_cluster):
     query_result = node1.query("select replica_is_active from system.replicas where table = 'test_table'")
-    assert query_result == '{\'node1\':1,\'node2\':1,\'node3\':1}\n'
+    assert literal_eval(query_result) == {'node1': 1, 'node2': 1, 'node3': 1}
 
     node3.stop()
     query_result = node1.query("select replica_is_active from system.replicas where table = 'test_table'")
-    assert query_result == '{\'node1\':1,\'node2\':1,\'node3\':0}\n'
+    assert literal_eval(query_result) == {'node1': 1, 'node2': 1, 'node3': 0}
 
     node2.stop()
     query_result = node1.query("select replica_is_active from system.replicas where table = 'test_table'")
-    assert query_result == '{\'node1\':1,\'node2\':0,\'node3\':0}\n'
+    assert literal_eval(query_result) == {'node1': 1, 'node2': 0, 'node3': 0}

View File

@ -366,6 +366,43 @@ def test_hdfs_directory_not_exist(started_cluster):
     node1.query(ddl)
     assert "" == node1.query("select * from HDFSStorageWithNotExistDir")
 
 
+def test_overwrite(started_cluster):
+    hdfs_api = started_cluster.hdfs_api
+
+    table_function = f"hdfs('hdfs://hdfs1:9000/data', 'Parquet', 'a Int32, b String')"
+    node1.query(f"create table test_overwrite as {table_function}")
+    node1.query(f"insert into test_overwrite select number, randomString(100) from numbers(5)")
+    node1.query_and_get_error(f"insert into test_overwrite select number, randomString(100) FROM numbers(10)")
+    node1.query(f"insert into test_overwrite select number, randomString(100) from numbers(10) settings hdfs_truncate_on_insert=1")
+
+    result = node1.query(f"select count() from test_overwrite")
+    assert(int(result) == 10)
+
+
+def test_multiple_inserts(started_cluster):
+    hdfs_api = started_cluster.hdfs_api
+
+    table_function = f"hdfs('hdfs://hdfs1:9000/data_multiple_inserts', 'Parquet', 'a Int32, b String')"
+    node1.query(f"create table test_multiple_inserts as {table_function}")
+    node1.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10)")
+    node1.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings hdfs_create_new_file_on_insert=1")
+    node1.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings hdfs_create_new_file_on_insert=1")
+
+    result = node1.query(f"select count() from test_multiple_inserts")
+    assert(int(result) == 60)
+
+    result = node1.query(f"drop table test_multiple_inserts")
+
+    table_function = f"hdfs('hdfs://hdfs1:9000/data_multiple_inserts.gz', 'Parquet', 'a Int32, b String')"
+    node1.query(f"create table test_multiple_inserts as {table_function}")
+    node1.query(f"insert into test_multiple_inserts select number, randomString(100) FROM numbers(10)")
+    node1.query(f"insert into test_multiple_inserts select number, randomString(100) FROM numbers(20) settings hdfs_create_new_file_on_insert=1")
+    node1.query(f"insert into test_multiple_inserts select number, randomString(100) FROM numbers(30) settings hdfs_create_new_file_on_insert=1")
+
+    result = node1.query(f"select count() from test_multiple_inserts")
+    assert(int(result) == 60)
+
+
 def test_format_detection(started_cluster):
     node1.query(f"create table arrow_table (x UInt64) engine=HDFS('hdfs://hdfs1:9000/data.arrow')")
     node1.query(f"insert into arrow_table select 1")

View File

@ -10,6 +10,11 @@
             <access_key_id>minio</access_key_id>
             <secret_access_key>minio123</secret_access_key>
         </s3_parquet>
+        <s3_parquet_gz>
+            <url>http://minio1:9001/root/test_parquet_gz</url>
+            <access_key_id>minio</access_key_id>
+            <secret_access_key>minio123</secret_access_key>
+        </s3_parquet_gz>
         <s3_orc>
             <url>http://minio1:9001/root/test_orc</url>
             <access_key_id>minio</access_key_id>

View File

@ -136,7 +136,7 @@ def test_put(started_cluster, maybe_auth, positive, compression):
     values_csv = "1,2,3\n3,2,1\n78,43,45\n"
     filename = "test.csv"
     put_query = f"""insert into table function s3('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{filename}',
-                    {maybe_auth}'CSV', '{table_format}', {compression}) values {values}"""
+                    {maybe_auth}'CSV', '{table_format}', {compression}) values settings s3_truncate_on_insert=1 {values}"""
 
     try:
         run_query(instance, put_query)
@ -298,7 +298,7 @@ def test_put_csv(started_cluster, maybe_auth, positive):
     instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
     table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
     filename = "test.csv"
-    put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV".format(
+    put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV settings s3_truncate_on_insert=1".format(
         started_cluster.minio_ip, MINIO_INTERNAL_PORT, bucket, filename, maybe_auth, table_format)
 
     csv_data = "8,9,16\n11,18,13\n22,14,2\n"
@ -322,7 +322,7 @@ def test_put_get_with_redirect(started_cluster):
     values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
     values_csv = "1,1,1\n1,1,1\n11,11,11\n"
     filename = "test.csv"
-    query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
+    query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values settings s3_truncate_on_insert=1 {}".format(
         started_cluster.minio_redirect_host, started_cluster.minio_redirect_port, bucket, filename, table_format, values)
 
     run_query(instance, query)
@ -350,12 +350,12 @@ def test_put_with_zero_redirect(started_cluster):
     filename = "test.csv"
 
     # Should work without redirect
-    query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
+    query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values settings s3_truncate_on_insert=1 {}".format(
         started_cluster.minio_ip, MINIO_INTERNAL_PORT, bucket, filename, table_format, values)
     run_query(instance, query)
 
     # Should not work with redirect
-    query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
+    query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values settings s3_truncate_on_insert=1 {}".format(
         started_cluster.minio_redirect_host, started_cluster.minio_redirect_port, bucket, filename, table_format, values)
     exception_raised = False
     try:
@ -805,13 +805,13 @@ def test_seekable_formats(started_cluster):
     instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
 
     table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
-    instance.query(f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)")
+    instance.query(f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000) settings s3_truncate_on_insert=1")
 
     result = instance.query(f"SELECT count() FROM {table_function}")
     assert(int(result) == 5000000)
 
     table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')"
-    exec_query_with_retry(instance, f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)")
+    exec_query_with_retry(instance, f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000) settings s3_truncate_on_insert=1")
 
     result = instance.query(f"SELECT count() FROM {table_function}")
     assert(int(result) == 5000000)
@ -827,14 +827,14 @@ def test_seekable_formats_url(started_cluster):
     instance = started_cluster.instances["dummy"]
 
     table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
-    instance.query(f"insert into table function {table_function} select number, randomString(100) from numbers(5000000)")
+    instance.query(f"insert into table function {table_function} select number, randomString(100) from numbers(5000000) settings s3_truncate_on_insert=1")
 
     table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_parquet', 'Parquet', 'a Int32, b String')"
     result = instance.query(f"SELECT count() FROM {table_function}")
     assert(int(result) == 5000000)
 
     table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')"
-    exec_query_with_retry(instance, f"insert into table function {table_function} select number, randomString(100) from numbers(5000000)")
+    exec_query_with_retry(instance, f"insert into table function {table_function} select number, randomString(100) from numbers(5000000) settings s3_truncate_on_insert=1")
 
     table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_orc', 'ORC', 'a Int32, b String')"
     result = instance.query(f"SELECT count() FROM {table_function}")
@ -917,6 +917,48 @@ def test_empty_file(started_cluster):
     assert(int(result) == 0)
 
 
+def test_overwrite(started_cluster):
+    bucket = started_cluster.minio_bucket
+    instance = started_cluster.instances["dummy"]
+
+    table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
+    instance.query(f"create table test_overwrite as {table_function}")
+    instance.query(f"truncate table test_overwrite")
+
+    instance.query(f"insert into test_overwrite select number, randomString(100) from numbers(50) settings s3_truncate_on_insert=1")
+    instance.query_and_get_error(f"insert into test_overwrite select number, randomString(100) from numbers(100)")
+    instance.query(f"insert into test_overwrite select number, randomString(100) from numbers(200) settings s3_truncate_on_insert=1")
+
+    result = instance.query(f"select count() from test_overwrite")
+    assert(int(result) == 200)
+
+
+def test_create_new_files_on_insert(started_cluster):
+    bucket = started_cluster.minio_bucket
+    instance = started_cluster.instances["dummy"]
+
+    table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
+    instance.query(f"create table test_multiple_inserts as {table_function}")
+    instance.query(f"truncate table test_multiple_inserts")
+    instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10) settings s3_truncate_on_insert=1")
+    instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings s3_create_new_file_on_insert=1")
+    instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings s3_create_new_file_on_insert=1")
+
+    result = instance.query(f"select count() from test_multiple_inserts")
+    assert(int(result) == 60)
+
+    instance.query(f"drop table test_multiple_inserts")
+
+    table_function = f"s3(s3_parquet_gz, structure='a Int32, b String', format='Parquet')"
+    instance.query(f"create table test_multiple_inserts as {table_function}")
+    instance.query(f"truncate table test_multiple_inserts")
+    instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10) settings s3_truncate_on_insert=1")
+    instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings s3_create_new_file_on_insert=1")
+    instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings s3_create_new_file_on_insert=1")
+
+    result = instance.query(f"select count() from test_multiple_inserts")
+    assert(int(result) == 60)
+
+
 def test_format_detection(started_cluster):
     bucket = started_cluster.minio_bucket
     instance = started_cluster.instances["dummy"]

View File

@ -0,0 +1,100 @@
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

View File

@ -0,0 +1,39 @@
-- Tags: no-fasttest, no-parallel
drop table if exists test;
create table test (number UInt64) engine=File('Parquet');
insert into test select * from numbers(10);
insert into test select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE }
insert into test select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1;
select * from test order by number;
truncate table test;
drop table test;
create table test (number UInt64) engine=File('Parquet', 'test_02155/test1/data.Parquet');
insert into test select * from numbers(10) settings engine_file_truncate_on_insert=1;
insert into test select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE }
insert into test select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1;
select * from test order by number;
drop table test;
insert into table function file(concat(currentDatabase(), '/test2/data.Parquet'), 'Parquet', 'number UInt64') select * from numbers(10) settings engine_file_truncate_on_insert=1;
insert into table function file(concat(currentDatabase(), '/test2/data.Parquet'), 'Parquet', 'number UInt64') select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE }
insert into table function file(concat(currentDatabase(), '/test2/data.Parquet'), 'Parquet', 'number UInt64') select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1;
select * from file(concat(currentDatabase(), '/test2/data.Parquet'), 'Parquet', 'number UInt64');
select * from file(concat(currentDatabase(), '/test2/data.1.Parquet'), 'Parquet', 'number UInt64');
create table test (number UInt64) engine=File('Parquet', 'test_02155/test3/data.Parquet.gz');
insert into test select * from numbers(10) settings engine_file_truncate_on_insert=1;
insert into test select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE }
insert into test select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1;
select * from test order by number;
drop table test;
insert into table function file(concat(currentDatabase(), '/test4/data.Parquet.gz'), 'Parquet', 'number UInt64') select * from numbers(10) settings engine_file_truncate_on_insert=1;
insert into table function file(concat(currentDatabase(), '/test4/data.Parquet.gz'), 'Parquet', 'number UInt64') select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE }
insert into table function file(concat(currentDatabase(), '/test4/data.Parquet.gz'), 'Parquet', 'number UInt64') select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1;
select * from file(concat(currentDatabase(), '/test4/data.Parquet.gz'), 'Parquet', 'number UInt64');
select * from file(concat(currentDatabase(), '/test4/data.1.Parquet.gz'), 'Parquet', 'number UInt64');

View File

@ -1,5 +1,5 @@
--- Tags: no-fasttest
+-- Tags: no-fasttest, no-parallel
-insert into table function file('data.avro', 'Avro', 'x UInt64') select * from numbers(10);
-insert into table function file('data.avro', 'Avro', 'x UInt64') select * from numbers(10);
-insert into table function file('data.avro', 'Avro', 'x UInt64') select * from numbers(10);
+insert into table function file('data.avro', 'Parquet', 'x UInt64') select * from numbers(10);
+insert into table function file('data.avro', 'Parquet', 'x UInt64') select * from numbers(10); -- { serverError CANNOT_APPEND_TO_FILE }
+insert into table function file('data.avro', 'Parquet', 'x UInt64') select * from numbers(10); -- { serverError CANNOT_APPEND_TO_FILE }
 select 'OK';

View File

@ -0,0 +1,40 @@
\N
\N
Hello
Hello
Hello
Hello
Hello
0 \N
1 \N
0 Hello
1 Hello
0
1
0 Hello
1 Hello
0 \N
1 \N
0 Hello
1 Hello
\N
\N
Hello
Hello
Hello
Hello
Hello
0 \N
1 \N
0 Hello
1 Hello
0
1
0 Hello
1 Hello
0 \N
1 \N
0 Hello
1 Hello

View File

@ -0,0 +1,40 @@
SELECT anyIf(toNullable('Hello'), arrayJoin([1, NULL]) = 0);
SELECT anyIf(toNullable('Hello'), arrayJoin([1, 1]) = 0);
SELECT anyIf(toNullable('Hello'), arrayJoin([1, 0]) = 0);
SELECT anyIf(toNullable('Hello'), arrayJoin([0, 1]) = 0);
SELECT anyIf(toNullable('Hello'), arrayJoin([0, 0]) = 0);
SELECT anyIf('Hello', arrayJoin([1, NULL]) = 0);
SELECT anyIf('Hello', arrayJoin([1, NULL]) = 1);
SELECT anyIf('Hello', arrayJoin([1, NULL]) IS NULL);
SELECT number, anyIf(toNullable('Hello'), arrayJoin([1, NULL]) = 0) FROM numbers(2) GROUP BY number ORDER BY number;
SELECT number, anyIf(toNullable('Hello'), arrayJoin([1, NULL, 0]) = 0) FROM numbers(2) GROUP BY number ORDER BY number;
SELECT number, anyIf('Hello', arrayJoin([1, NULL]) = 0) FROM numbers(2) GROUP BY number ORDER BY number;
SELECT number, anyIf('Hello', arrayJoin([1, NULL, 0]) = 0) FROM numbers(2) GROUP BY number ORDER BY number;
SELECT number, anyIf(toNullable('Hello'), arrayJoin([1, 1]) = 0) FROM numbers(2) GROUP BY number ORDER BY number;
SELECT number, anyIf(toNullable('Hello'), arrayJoin([1, 0]) = 0) FROM numbers(2) GROUP BY number ORDER BY number;
SELECT anyIf(toNullable('Hello'), arrayJoin([1, NULL]) = 0) FROM remote('127.0.0.{1,2}', system.one);
SELECT anyIf(toNullable('Hello'), arrayJoin([1, 1]) = 0) FROM remote('127.0.0.{1,2}', system.one);
SELECT anyIf(toNullable('Hello'), arrayJoin([1, 0]) = 0) FROM remote('127.0.0.{1,2}', system.one);
SELECT anyIf(toNullable('Hello'), arrayJoin([0, 1]) = 0) FROM remote('127.0.0.{1,2}', system.one);
SELECT anyIf(toNullable('Hello'), arrayJoin([0, 0]) = 0) FROM remote('127.0.0.{1,2}', system.one);
SELECT anyIf('Hello', arrayJoin([1, NULL]) = 0) FROM remote('127.0.0.{1,2}', system.one);
SELECT anyIf('Hello', arrayJoin([1, NULL]) = 1) FROM remote('127.0.0.{1,2}', system.one);
SELECT anyIf('Hello', arrayJoin([1, NULL]) IS NULL) FROM remote('127.0.0.{1,2}', system.one);
SELECT number, anyIf(toNullable('Hello'), arrayJoin([1, NULL]) = 0) FROM remote('127.0.0.{1,2}', numbers(2)) GROUP BY number ORDER BY number;
SELECT number, anyIf(toNullable('Hello'), arrayJoin([1, NULL, 0]) = 0) FROM remote('127.0.0.{1,2}', numbers(2)) GROUP BY number ORDER BY number;
SELECT number, anyIf('Hello', arrayJoin([1, NULL]) = 0) FROM remote('127.0.0.{1,2}', numbers(2)) GROUP BY number ORDER BY number;
SELECT number, anyIf('Hello', arrayJoin([1, NULL, 0]) = 0) FROM remote('127.0.0.{1,2}', numbers(2)) GROUP BY number ORDER BY number;
SELECT number, anyIf(toNullable('Hello'), arrayJoin([1, 1]) = 0) FROM remote('127.0.0.{1,2}', numbers(2)) GROUP BY number ORDER BY number;
SELECT number, anyIf(toNullable('Hello'), arrayJoin([1, 0]) = 0) FROM remote('127.0.0.{1,2}', numbers(2)) GROUP BY number ORDER BY number;

View File

@ -2,7 +2,7 @@ if (USE_CLANG_TIDY)
     set (CMAKE_CXX_CLANG_TIDY "${CLANG_TIDY_PATH}")
 endif ()
 
-if(MAKE_STATIC_LIBRARIES)
+if(USE_STATIC_LIBRARIES)
     set(MAX_LINKER_MEMORY 3500)
 else()
     set(MAX_LINKER_MEMORY 2500)

View File

@ -37,9 +37,9 @@ void dumpMachine(std::shared_ptr<KeeperStateMachine> machine)
         for (const auto & child : value.children)
         {
             if (key == "/")
-                keys.push(key + child);
+                keys.push(key + child.toString());
            else
-                keys.push(key + "/" + child);
+                keys.push(key + "/" + child.toString());
         }
     }
     std::cout << std::flush;