Merge branch 'master' into fix-whole-text-serialization

Commit f28130193a by Kruglov Pavel, 2021-11-25 14:00:20 +03:00, committed by GitHub
78 changed files with 1156 additions and 233 deletions

View File

@ -152,7 +152,7 @@ jobs:
sudo rm -fr $TEMP_PATH
mkdir -p $TEMP_PATH
cp -r $GITHUB_WORKSPACE $TEMP_PATH
cd $REPO_COPY/tests/ci && python3 compatibility_check.py 0
cd $REPO_COPY/tests/ci && python3 compatibility_check.py
- name: Cleanup
if: always()
run: |
@ -604,7 +604,9 @@ jobs:
sudo rm -fr $TEMP_PATH
FunctionalStatelessTestTsan:
needs: [BuilderDebTsan]
runs-on: [self-hosted, func-tester]
# tests can consume more than 60GB of memory,
# so use bigger server
runs-on: [self-hosted, stress-tester]
steps:
- name: Download json reports
uses: actions/download-artifact@v2
@ -1297,6 +1299,34 @@ jobs:
docker kill $(docker ps -q) ||:
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr $TEMP_PATH
IntegrationTestsFlakyCheck:
needs: [BuilderDebAsan]
runs-on: [self-hosted, stress-tester]
steps:
- name: Download json reports
uses: actions/download-artifact@v2
with:
path: ${{runner.temp}}/reports_dir
- name: Check out repository code
uses: actions/checkout@v2
- name: Integration test
env:
TEMP_PATH: ${{runner.temp}}/integration_tests_asan_flaky_check
REPORTS_PATH: ${{runner.temp}}/reports_dir
CHECK_NAME: 'Integration tests flaky check (asan, actions)'
REPO_COPY: ${{runner.temp}}/integration_tests_asan_flaky_check/ClickHouse
run: |
sudo rm -fr $TEMP_PATH
mkdir -p $TEMP_PATH
cp -r $GITHUB_WORKSPACE $TEMP_PATH
cd $REPO_COPY/tests/ci
python3 integration_test_check.py "$CHECK_NAME"
- name: Cleanup
if: always()
run: |
docker kill $(docker ps -q) ||:
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr $TEMP_PATH
#############################################################################################
#################################### UNIT TESTS #############################################
#############################################################################################
@ -1481,6 +1511,7 @@ jobs:
- UnitTestsReleaseClang
- SplitBuildSmokeTest
- CompatibilityCheck
- IntegrationTestsFlakyCheck
runs-on: [self-hosted, style-checker]
steps:
- name: Check out repository code

View File

@ -18,3 +18,27 @@ if (NOT DEFINED ENV{CLION_IDE} AND NOT DEFINED ENV{XCODE_IDE})
set(CMAKE_GENERATOR "Ninja" CACHE INTERNAL "" FORCE)
endif ()
endif()
# Default toolchain - this is needed to avoid dependency on OS files.
execute_process(COMMAND uname -s OUTPUT_VARIABLE OS)
execute_process(COMMAND uname -m OUTPUT_VARIABLE ARCH)
if (OS MATCHES "Linux"
AND NOT DEFINED CMAKE_TOOLCHAIN_FILE
AND NOT UNBUNDLED
AND NOT DISABLE_HERMETIC_BUILD
AND ($ENV{CC} MATCHES ".*clang.*" OR CMAKE_C_COMPILER MATCHES ".*clang.*")
AND (USE_STATIC_LIBRARIES OR NOT DEFINED USE_STATIC_LIBRARIES))
if (ARCH MATCHES "amd64|x86_64")
set (CMAKE_TOOLCHAIN_FILE "cmake/linux/toolchain-x86_64.cmake" CACHE INTERNAL "" FORCE)
elseif (ARCH MATCHES "^(aarch64.*|AARCH64.*|arm64.*|ARM64.*)")
set (CMAKE_TOOLCHAIN_FILE "cmake/linux/toolchain-aarch64.cmake" CACHE INTERNAL "" FORCE)
elseif (ARCH MATCHES "^(ppc64le.*|PPC64LE.*)")
set (CMAKE_TOOLCHAIN_FILE "cmake/linux/toolchain-ppc64le.cmake" CACHE INTERNAL "" FORCE)
else ()
message (FATAL_ERROR "Unsupported architecture: ${ARCH}")
endif ()
endif()

View File

@ -19,9 +19,11 @@ The following versions of ClickHouse server are currently being supported with s
| 21.4 | :x: |
| 21.5 | :x: |
| 21.6 | :x: |
| 21.7 | ✅ |
| 21.7 | :x: |
| 21.8 | ✅ |
| 21.9 | ✅ |
| 21.10 | ✅ |
| 21.11 | ✅ |
## Reporting a Vulnerability

View File

@ -12,13 +12,13 @@ macro (add_warning flag)
if (SUPPORTS_CXXFLAG_${underscored_flag})
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -W${flag}")
else ()
message (WARNING "Flag -W${flag} is unsupported")
message (STATUS "Flag -W${flag} is unsupported")
endif ()
if (SUPPORTS_CFLAG_${underscored_flag})
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -W${flag}")
else ()
message (WARNING "Flag -W${flag} is unsupported")
message (STATUS "Flag -W${flag} is unsupported")
endif ()
endmacro ()
@ -39,7 +39,7 @@ macro (target_add_warning target flag)
if (SUPPORTS_CXXFLAG_${underscored_flag})
target_compile_options (${target} PRIVATE "-W${flag}")
else ()
message (WARNING "Flag -W${flag} is unsupported")
message (STATUS "Flag -W${flag} is unsupported")
endif ()
endmacro ()

View File

@ -3,7 +3,7 @@ set (CMAKE_SYSTEM_PROCESSOR "aarch64")
set (CMAKE_C_COMPILER_TARGET "aarch64-unknown-freebsd12")
set (CMAKE_CXX_COMPILER_TARGET "aarch64-unknown-freebsd12")
set (CMAKE_ASM_COMPILER_TARGET "aarch64-unknown-freebsd12")
set (CMAKE_SYSROOT "${CMAKE_CURRENT_LIST_DIR}/../toolchain/freebsd-aarch64")
set (CMAKE_SYSROOT "${CMAKE_CURRENT_LIST_DIR}/../../contrib/sysroot/freebsd-aarch64")
set (CMAKE_TRY_COMPILE_TARGET_TYPE STATIC_LIBRARY) # disable linkage check - it doesn't work in CMake

View File

@ -3,7 +3,7 @@ set (CMAKE_SYSTEM_PROCESSOR "x86_64")
set (CMAKE_C_COMPILER_TARGET "x86_64-pc-freebsd11")
set (CMAKE_CXX_COMPILER_TARGET "x86_64-pc-freebsd11")
set (CMAKE_ASM_COMPILER_TARGET "x86_64-pc-freebsd11")
set (CMAKE_SYSROOT "${CMAKE_CURRENT_LIST_DIR}/../toolchain/freebsd-x86_64")
set (CMAKE_SYSROOT "${CMAKE_CURRENT_LIST_DIR}/../../contrib/sysroot/freebsd-x86_64")
set (CMAKE_TRY_COMPILE_TARGET_TYPE STATIC_LIBRARY) # disable linkage check - it doesn't work in CMake

View File

@ -5,8 +5,12 @@ set (DEFAULT_LIBS "-nodefaultlibs")
# We need builtins from Clang's RT even without libcxx - for ubsan+int128.
# See https://bugs.llvm.org/show_bug.cgi?id=16404
if (COMPILER_CLANG AND NOT CMAKE_CROSSCOMPILING)
execute_process (COMMAND ${CMAKE_CXX_COMPILER} --print-libgcc-file-name --rtlib=compiler-rt OUTPUT_VARIABLE BUILTINS_LIBRARY OUTPUT_STRIP_TRAILING_WHITESPACE)
if (COMPILER_CLANG)
execute_process (COMMAND ${CMAKE_CXX_COMPILER} --target=${CMAKE_CXX_COMPILER_TARGET} --print-libgcc-file-name --rtlib=compiler-rt OUTPUT_VARIABLE BUILTINS_LIBRARY OUTPUT_STRIP_TRAILING_WHITESPACE)
if (NOT EXISTS "${BUILTINS_LIBRARY}")
set (BUILTINS_LIBRARY "-lgcc")
endif ()
else ()
set (BUILTINS_LIBRARY "-lgcc")
endif ()

View File

@ -500,7 +500,6 @@ function(preprocess_et out_var)
COMMAND perl "${KRB5_SOURCE_DIR}/util/et/compile_et" -d "${KRB5_SOURCE_DIR}/util/et" ${in_f}
DEPENDS ${in_f} "${KRB5_SOURCE_DIR}/util/et/compile_et"
WORKING_DIRECTORY ${ET_PATH}
COMMENT "Creating preprocessed file ${F_C}"
VERBATIM
)
list(APPEND result ${F_C})
@ -526,7 +525,6 @@ add_custom_command(
add_custom_target(
ERROR_MAP_H
DEPENDS "${KRB5_SOURCE_DIR}/lib/gssapi/krb5/error_map.h"
COMMENT "generating error_map.h"
VERBATIM
)
@ -539,14 +537,12 @@ add_custom_command(
add_custom_target(
ERRMAP_H
DEPENDS "${KRB5_SOURCE_DIR}/lib/gssapi/generic/errmap.h"
COMMENT "generating errmap.h"
VERBATIM
)
add_custom_target(
KRB_5_H
DEPENDS "${CMAKE_CURRENT_BINARY_DIR}/include/krb5/krb5.h"
COMMENT "generating krb5.h"
VERBATIM
)

contrib/libhdfs3 vendored (2 lines changed)

@ -1 +1 @@
Subproject commit a8c37ee001af1ae88e5dfa637ae5b31b087c96d3
Subproject commit 9194af44588633c1b2dae44bf945804401ff883e

View File

@ -211,7 +211,9 @@ add_library(protobuf::libprotoc ALIAS libprotoc)
set(protoc_files ${protobuf_source_dir}/src/google/protobuf/compiler/main.cc)
if (NOT CMAKE_CROSSCOMPILING)
if (CMAKE_HOST_SYSTEM_NAME STREQUAL CMAKE_SYSTEM_NAME
AND CMAKE_HOST_SYSTEM_PROCESSOR STREQUAL CMAKE_SYSTEM_PROCESSOR)
add_executable(protoc ${protoc_files})
target_link_libraries(protoc libprotoc libprotobuf pthread)
add_executable(protobuf::protoc ALIAS protoc)

contrib/sysroot vendored (2 lines changed)

@ -1 +1 @@
Subproject commit 1a64956aa7c280448be6526251bb2b8e6d380ab1
Subproject commit 4ef348b7f30f2ad5b02b266268b3c948e51ad457

View File

@ -93,9 +93,6 @@ RUN git clone https://github.com/tpoechtrager/cctools-port.git \
# Download toolchain and SDK for Darwin
RUN wget -nv https://github.com/phracker/MacOSX-SDKs/releases/download/11.3/MacOSX11.0.sdk.tar.xz
# Download toolchain for FreeBSD 11.3
RUN wget -nv https://clickhouse-datasets.s3.yandex.net/toolchains/toolchains/freebsd-11.3-toolchain.tar.xz
# NOTE: Seems like gcc-11 is too new for ubuntu20 repository
RUN add-apt-repository ppa:ubuntu-toolchain-r/test --yes \
&& apt-get update \

View File

@ -6,9 +6,6 @@ mkdir -p build/cmake/toolchain/darwin-x86_64
tar xJf MacOSX11.0.sdk.tar.xz -C build/cmake/toolchain/darwin-x86_64 --strip-components=1
ln -sf darwin-x86_64 build/cmake/toolchain/darwin-aarch64
mkdir -p build/cmake/toolchain/freebsd-x86_64
tar xJf freebsd-11.3-toolchain.tar.xz -C build/cmake/toolchain/freebsd-x86_64 --strip-components=1
# Uncomment to debug ccache. Don't put ccache log in /output right away, or it
# will be confusingly packed into the "performance" package.
# export CCACHE_LOGFILE=/build/ccache.log

View File

@ -52,14 +52,18 @@ RUN apt-get update \
--yes --no-install-recommends
# Sanitizer options for services (clickhouse-server)
RUN echo "TSAN_OPTIONS='verbosity=1000 halt_on_error=1 history_size=7'" >> /etc/environment; \
# Set resident memory limit for TSAN to 45GiB (46080MiB) to avoid OOMs in Stress tests
# and MEMORY_LIMIT_EXCEEDED exceptions in Functional tests (total memory limit in Functional tests is ~55.24 GiB).
# TSAN will flush shadow memory when reaching this limit.
# It may cause false-negatives, but it's better than OOM.
RUN echo "TSAN_OPTIONS='verbosity=1000 halt_on_error=1 history_size=7 memory_limit_mb=46080'" >> /etc/environment; \
echo "UBSAN_OPTIONS='print_stacktrace=1'" >> /etc/environment; \
echo "MSAN_OPTIONS='abort_on_error=1 poison_in_dtor=1'" >> /etc/environment; \
echo "LSAN_OPTIONS='suppressions=/usr/share/clickhouse-test/config/lsan_suppressions.txt'" >> /etc/environment; \
ln -s /usr/lib/llvm-${LLVM_VERSION}/bin/llvm-symbolizer /usr/bin/llvm-symbolizer;
# Sanitizer options for current shell (not current, but the one that will be spawned on "docker run")
# (but w/o verbosity for TSAN, otherwise test.reference will not match)
ENV TSAN_OPTIONS='halt_on_error=1 history_size=7'
ENV TSAN_OPTIONS='halt_on_error=1 history_size=7 memory_limit_mb=46080'
ENV UBSAN_OPTIONS='print_stacktrace=1'
ENV MSAN_OPTIONS='abort_on_error=1 poison_in_dtor=1'

View File

@ -159,6 +159,7 @@ function clone_submodules
cd "$FASTTEST_SOURCE"
SUBMODULES_TO_UPDATE=(
contrib/sysroot
contrib/magic_enum
contrib/abseil-cpp
contrib/boost

View File

@ -40,7 +40,7 @@ RUN set -x \
ENV CCACHE_DIR=/test_output/ccache
CMD echo "Running PVS version $PKG_VERSION" && mkdir -p $CCACHE_DIR && cd /repo_folder && pvs-studio-analyzer credentials $LICENCE_NAME $LICENCE_KEY -o ./licence.lic \
&& cmake . -D"ENABLE_EMBEDDED_COMPILER"=OFF -D"USE_INTERNAL_PROTOBUF_LIBRARY"=OFF -D"USE_INTERNAL_GRPC_LIBRARY"=OFF -DCMAKE_C_COMPILER=clang-13 -DCMAKE_CXX_COMPILER=clang\+\+-13 \
&& cmake . -D"ENABLE_EMBEDDED_COMPILER"=OFF -D"DISABLE_HERMETIC_BUILD"=ON -DCMAKE_C_COMPILER=clang-13 -DCMAKE_CXX_COMPILER=clang\+\+-13 \
&& ninja re2_st clickhouse_grpc_protos \
&& pvs-studio-analyzer analyze -o pvs-studio.log -e contrib -j 4 -l ./licence.lic; \
cp /repo_folder/pvs-studio.log /test_output; \

View File

@ -588,7 +588,7 @@ For more information, see the section [Creating replicated tables](../../engines
Approximate size (in bytes) of the cache of marks used by table engines of the [MergeTree](../../engines/table-engines/mergetree-family/mergetree.md) family.
The cache is shared for the server and memory is allocated as needed. The cache size must be at least 5368709120.
The cache is shared for the server and memory is allocated as needed.
**Example**

View File

@ -85,9 +85,6 @@
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
# include "Common/config_version.h"
# if USE_OPENCL
# include "Common/BitonicSort.h"
# endif
#endif
#if defined(OS_LINUX)

View File

@ -102,11 +102,16 @@ private:
size_t width;
X min_x;
X max_x;
bool specified_min_max_x;
String getBar(const UInt8 value) const
template <class T>
String getBar(const T value) const
{
if (isNaN(value) || value > 8 || value < 1)
return " ";
// ▁▂▃▄▅▆▇█
switch (value)
switch (static_cast<UInt8>(value))
{
case 1: return "▁";
case 2: return "▂";
@ -136,9 +141,20 @@ private:
String value;
if (data.points.empty() || !width)
return value;
X local_min_x = data.min_x;
X local_max_x = data.max_x;
size_t diff_x = local_max_x - local_min_x;
size_t diff_x;
X min_x_local;
if (specified_min_max_x)
{
diff_x = max_x - min_x;
min_x_local = min_x;
}
else
{
diff_x = data.max_x - data.min_x;
min_x_local = data.min_x;
}
if ((diff_x + 1) <= width)
{
Y min_y = data.min_y;
@ -149,15 +165,15 @@ private:
{
for (size_t i = 0; i <= diff_x; ++i)
{
auto it = data.points.find(local_min_x + i);
auto it = data.points.find(min_x_local + i);
bool found = it != data.points.end();
value += getBar(found ? static_cast<UInt8>(std::round(((it->getMapped() - min_y) / diff_y) * 7) + 1) : 0);
value += getBar(found ? std::round(((it->getMapped() - min_y) / diff_y) * 7) + 1 : 0.0);
}
}
else
{
for (size_t i = 0; i <= diff_x; ++i)
value += getBar(data.points.has(local_min_x + i) ? 1 : 0);
value += getBar(data.points.has(min_x_local + i) ? 1 : 0);
}
}
else
@ -186,9 +202,9 @@ private:
if (i == bound.first) // is bound
{
Float64 proportion = bound.second - bound.first;
auto it = data.points.find(local_min_x + i);
auto it = data.points.find(min_x_local + i);
bool found = (it != data.points.end());
if (found)
if (found && proportion > 0)
new_y = new_y.value_or(0) + it->getMapped() * proportion;
if (new_y)
@ -213,7 +229,7 @@ private:
}
else
{
auto it = data.points.find(local_min_x + i);
auto it = data.points.find(min_x_local + i);
if (it != data.points.end())
new_y = new_y.value_or(0) + it->getMapped();
}
@ -226,7 +242,7 @@ private:
auto getBars = [&] (const std::optional<Float64> & point_y)
{
value += getBar(point_y ? static_cast<UInt8>(std::round(((point_y.value() - min_y.value()) / diff_y) * 7) + 1) : 0);
value += getBar(point_y ? std::round(((point_y.value() - min_y.value()) / diff_y) * 7) + 1 : 0);
};
auto getBarsForConstant = [&] (const std::optional<Float64> & point_y)
{
@ -250,11 +266,13 @@ public:
width = params.at(0).safeGet<UInt64>();
if (params.size() == 3)
{
specified_min_max_x = true;
min_x = params.at(1).safeGet<X>();
max_x = params.at(2).safeGet<X>();
}
else
{
specified_min_max_x = false;
min_x = std::numeric_limits<X>::min();
max_x = std::numeric_limits<X>::max();
}
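
The getBar() change above scales each point's height into one of eight block characters via round(((y - min_y) / diff_y) * 7) + 1. As a standalone illustration of that scaling (plain C++, independent of the aggregate-function machinery, with a made-up input series), a minimal sketch could be:

#include <algorithm>
#include <cmath>
#include <iostream>
#include <string>
#include <vector>

// Toy sketch of the sparkbar scaling: values are mapped to levels 1..8 and
// rendered as Unicode block characters; out-of-range or NaN levels become a
// space, mirroring the guard at the top of getBar().
static std::string bar(double level)
{
    static const char * bars[] = {" ", "▁", "▂", "▃", "▄", "▅", "▆", "▇", "█"};
    if (std::isnan(level) || level < 1 || level > 8)
        return " ";
    return bars[static_cast<int>(level)];
}

static std::string sparkline(const std::vector<double> & ys)
{
    double min_y = *std::min_element(ys.begin(), ys.end());
    double max_y = *std::max_element(ys.begin(), ys.end());
    double diff_y = max_y - min_y;

    std::string result;
    for (double y : ys)
        result += bar(diff_y == 0 ? 1 : std::round(((y - min_y) / diff_y) * 7) + 1);
    return result;
}

int main()
{
    std::cout << sparkline({1, 2, 4, 8, 16, 8, 4, 2, 1}) << '\n';
}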

View File

@ -263,7 +263,7 @@ void ColumnMap::getExtremes(Field & min, Field & max) const
void ColumnMap::forEachSubcolumn(ColumnCallback callback)
{
nested->forEachSubcolumn(callback);
callback(nested);
}
bool ColumnMap::structureEquals(const IColumn & rhs) const

View File

@ -598,7 +598,8 @@
M(628, OFFSET_FETCH_WITHOUT_ORDER_BY) \
M(629, HTTP_RANGE_NOT_SATISFIABLE) \
M(630, HAVE_DEPENDENT_OBJECTS) \
M(631, UNEXPECTED_DATA_AFTER_PARSED_VALUE) \
M(631, UNKNOWN_FILE_SIZE) \
M(632, UNEXPECTED_DATA_AFTER_PARSED_VALUE) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -261,6 +261,8 @@
M(RemoteFSUnprefetchedReads, "Number of reads from unprefetched buffer") \
M(RemoteFSBuffers, "Number of buffers created for asynchronous reading from remote filesystem") \
\
M(ReadBufferSeekCancelConnection, "Number of seeks which lead to new connection (s3, http)") \
\
M(SleepFunctionCalls, "Number of times a sleep function (sleep, sleepEachRow) has been called.") \
M(SleepFunctionMicroseconds, "Time spent sleeping due to a sleep function call.") \
\

View File

@ -4,13 +4,6 @@
#include <Poco/Environment.h>
#include <filesystem>
#if defined(linux) || defined(__linux) || defined(__linux__)
#include <unistd.h>
#include <fcntl.h>
#include <sys/syscall.h>
#include <linux/fs.h>
#endif
namespace fs = std::filesystem;
namespace DB
@ -25,25 +18,56 @@ namespace ErrorCodes
extern const int FILE_ALREADY_EXISTS;
}
}
#if defined(__linux__)
#include <unistd.h>
#include <fcntl.h>
#include <sys/syscall.h>
#include <linux/fs.h>
/// For old versions of libc.
#if !defined(RENAME_NOREPLACE)
#define RENAME_NOREPLACE 1
#endif
#if !defined(RENAME_EXCHANGE)
#define RENAME_EXCHANGE 2
#endif
#if !defined(__NR_renameat2)
#if defined(__x86_64__)
#define __NR_renameat2 316
#elif defined(__aarch64__)
#define __NR_renameat2 276
#elif defined(__ppc64__)
#define __NR_renameat2 357
#elif defined(__riscv)
#define __NR_renameat2 276
#else
#error "Unsupported architecture"
#endif
#endif
namespace DB
{
static bool supportsRenameat2Impl()
{
#if defined(__NR_renameat2)
VersionNumber renameat2_minimal_version(3, 15, 0);
VersionNumber linux_version(Poco::Environment::osVersion());
return linux_version >= renameat2_minimal_version;
#else
return false;
#endif
}
#if defined(__NR_renameat2)
static bool renameat2(const std::string & old_path, const std::string & new_path, int flags)
{
if (!supportsRenameat2())
return false;
if (old_path.empty() || new_path.empty())
throw Exception("Cannot rename " + old_path + " to " + new_path + ": path is empty", ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot rename {} to {}: path is empty", old_path, new_path);
/// int olddirfd (ignored for absolute oldpath), const char *oldpath,
/// int newdirfd (ignored for absolute newpath), const char *newpath,
@ -63,28 +87,50 @@ static bool renameat2(const std::string & old_path, const std::string & new_path
return false;
if (errno == EEXIST)
throwFromErrno("Cannot rename " + old_path + " to " + new_path + " because the second path already exists", ErrorCodes::ATOMIC_RENAME_FAIL);
throwFromErrno(fmt::format("Cannot rename {} to {} because the second path already exists", old_path, new_path), ErrorCodes::ATOMIC_RENAME_FAIL);
if (errno == ENOENT)
throwFromErrno("Paths cannot be exchanged because " + old_path + " or " + new_path + " does not exist", ErrorCodes::ATOMIC_RENAME_FAIL);
throwFromErrnoWithPath("Cannot rename " + old_path + " to " + new_path, new_path, ErrorCodes::SYSTEM_ERROR);
throwFromErrno(fmt::format("Paths cannot be exchanged because {} or {} does not exist", old_path, new_path), ErrorCodes::ATOMIC_RENAME_FAIL);
throwFromErrnoWithPath(fmt::format("Cannot rename {} to {}", old_path, new_path), new_path, ErrorCodes::SYSTEM_ERROR);
}
bool supportsRenameat2()
{
static bool supports = supportsRenameat2Impl();
return supports;
}
}
#else
#define RENAME_NOREPLACE -1
#define RENAME_EXCHANGE -1
namespace DB
{
static bool renameat2(const std::string &, const std::string &, int)
{
return false;
}
bool supportsRenameat2()
{
return false;
}
}
#endif
namespace DB
{
static void renameNoReplaceFallback(const std::string & old_path, const std::string & new_path)
{
/// NOTE it's unsafe
if (fs::exists(new_path))
throw Exception("File " + new_path + " exists", ErrorCodes::FILE_ALREADY_EXISTS);
throw Exception(ErrorCodes::FILE_ALREADY_EXISTS, "File {} exists", new_path);
fs::rename(old_path, new_path);
}
@ -97,13 +143,6 @@ static void renameExchangeFallback(const std::string &, const std::string &)
}
#pragma GCC diagnostic pop
bool supportsRenameat2()
{
static bool supports = supportsRenameat2Impl();
return supports;
}
void renameNoReplace(const std::string & old_path, const std::string & new_path)
{
if (!renameat2(old_path, new_path, RENAME_NOREPLACE))
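
For context, the Linux branch above ultimately performs an atomic "rename unless the target exists" through the renameat2 syscall with RENAME_NOREPLACE. A minimal Linux-only sketch of that call (hypothetical paths, not taken from this commit) might look like:

#include <cerrno>
#include <cstdio>
#include <cstring>
#include <fcntl.h>        // AT_FDCWD
#include <linux/fs.h>     // RENAME_NOREPLACE (recent kernel headers)
#include <sys/syscall.h>  // __NR_renameat2
#include <unistd.h>       // syscall

// Returns true on success, false when the kernel lacks renameat2 (ENOSYS/EINVAL),
// in which case callers would fall back to the non-atomic check-then-rename path,
// as the code above does. Other errors (e.g. EEXIST) are only printed here.
static bool rename_no_replace(const char * old_path, const char * new_path)
{
    long rc = syscall(__NR_renameat2, AT_FDCWD, old_path, AT_FDCWD, new_path, RENAME_NOREPLACE);
    if (rc == 0)
        return true;
    if (errno == ENOSYS || errno == EINVAL)
        return false;
    std::fprintf(stderr, "renameat2(%s, %s): %s\n", old_path, new_path, std::strerror(errno));
    return false;
}

int main()
{
    rename_no_replace("/tmp/old_name", "/tmp/new_name");  // hypothetical paths
}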

View File

@ -514,6 +514,7 @@ class IColumn;
M(Int64, read_priority, 0, "Priority to read data from local filesystem. Only supported for 'pread_threadpool' method.", 0) \
M(UInt64, merge_tree_min_rows_for_concurrent_read_for_remote_filesystem, (20 * 8192), "If at least as many lines are read from one file, the reading can be parallelized, when reading from remote filesystem.", 0) \
M(UInt64, merge_tree_min_bytes_for_concurrent_read_for_remote_filesystem, (24 * 10 * 1024 * 1024), "If at least as many bytes are read from one file, the reading can be parallelized, when reading from remote filesystem.", 0) \
M(UInt64, remote_read_min_bytes_for_seek, DBMS_DEFAULT_BUFFER_SIZE, "Min bytes required for remote read (url, s3) to do seek, instead of read with ignore.", 0) \
\
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background. Makes sense only for inserts via HTTP protocol. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data will be flushed to table", 0) \
@ -582,6 +583,7 @@ class IColumn;
M(Bool, input_format_arrow_import_nested, false, "Allow to insert array of structs into Nested table in Arrow input format.", 0) \
M(Bool, input_format_orc_import_nested, false, "Allow to insert array of structs into Nested table in ORC input format.", 0) \
M(Bool, input_format_parquet_import_nested, false, "Allow to insert array of structs into Nested table in Parquet input format.", 0) \
M(Bool, input_format_allow_seeks, true, "Allow seeks while reading in ORC/Parquet/Arrow input formats", 0) \
\
M(DateTimeInputFormat, date_time_input_format, FormatSettings::DateTimeInputFormat::Basic, "Method to read DateTime from text input formats. Possible values: 'basic' and 'best_effort'.", 0) \
M(DateTimeOutputFormat, date_time_output_format, FormatSettings::DateTimeOutputFormat::Simple, "Method to write DateTime to text output. Possible values: 'simple', 'iso', 'unix_timestamp'.", 0) \

View File

@ -229,15 +229,8 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
StoragePtr table = getTableUnlocked(table_name, db_lock);
if (table->isDictionary() && !dictionary)
{
if (exchange)
throw Exception(ErrorCodes::INCORRECT_QUERY,
"Use EXCHANGE DICTIONARIES for dictionaries and EXCHANGE TABLES for tables.");
else
throw Exception(ErrorCodes::INCORRECT_QUERY,
"Use RENAME DICTIONARY for dictionaries and RENAME TABLE for tables.");
}
if (dictionary && !table->isDictionary())
throw Exception(ErrorCodes::INCORRECT_QUERY, "Use RENAME/EXCHANGE TABLE (instead of RENAME/EXCHANGE DICTIONARY) for tables");
table->checkTableCanBeRenamed();
assert_can_move_mat_view(table);
@ -245,6 +238,8 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
if (exchange)
{
other_table = other_db.getTableUnlocked(to_table_name, other_db_lock);
if (dictionary && !other_table->isDictionary())
throw Exception(ErrorCodes::INCORRECT_QUERY, "Use RENAME/EXCHANGE TABLE (instead of RENAME/EXCHANGE DICTIONARY) for tables");
other_table->checkTableCanBeRenamed();
assert_can_move_mat_view(other_table);
}

View File

@ -40,6 +40,7 @@ namespace ErrorCodes
extern const int TABLE_ALREADY_EXISTS;
extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
extern const int DATABASE_NOT_EMPTY;
extern const int INCORRECT_QUERY;
}
@ -349,8 +350,6 @@ void DatabaseOnDisk::renameTable(
{
if (exchange)
throw Exception("Tables can be exchanged only in Atomic databases", ErrorCodes::NOT_IMPLEMENTED);
if (dictionary)
throw Exception("Dictionaries can be renamed only in Atomic databases", ErrorCodes::NOT_IMPLEMENTED);
bool from_ordinary_to_atomic = false;
bool from_atomic_to_ordinary = false;
@ -372,7 +371,11 @@ void DatabaseOnDisk::renameTable(
ASTPtr attach_query;
/// DatabaseLazy::detachTable may return nullptr even if table exists, so we need tryGetTable for this case.
StoragePtr table = tryGetTable(table_name, local_context);
if (dictionary && table && !table->isDictionary())
throw Exception("Use RENAME/EXCHANGE TABLE (instead of RENAME/EXCHANGE DICTIONARY) for tables", ErrorCodes::INCORRECT_QUERY);
detachTable(local_context, table_name);
UUID prev_uuid = UUIDHelpers::Nil;
try
{

View File

@ -88,6 +88,9 @@ void ReadBufferFromRemoteFSGather::initialize()
{
current_buf = createImplementationBuffer(file_path, read_until_position);
current_buf_idx = i;
if (auto * in = dynamic_cast<SeekableReadBufferWithSize *>(current_buf.get()))
in->setReadType(SeekableReadBufferWithSize::ReadType::DISK_READ);
}
current_buf->seek(current_buf_offset, SEEK_SET);

View File

@ -114,6 +114,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.orc.import_nested = settings.input_format_orc_import_nested;
format_settings.defaults_for_omitted_fields = settings.input_format_defaults_for_omitted_fields;
format_settings.capn_proto.enum_comparing_mode = settings.format_capn_proto_enum_comparising_mode;
format_settings.seekable_read = settings.input_format_allow_seeks;
/// Validate avro_schema_registry_url with RemoteHostFilter when non-empty and in Server context
if (format_settings.schema.is_server)

View File

@ -32,6 +32,8 @@ struct FormatSettings
bool decimal_trailing_zeros = false;
bool defaults_for_omitted_fields = true;
bool seekable_read = true;
enum class DateTimeInputFormat
{
Basic, /// Default format for fast parsing: YYYY-MM-DD hh:mm:ss (ISO-8601 without fractional part and timezone) or NNNNNNNNNN unix timestamp.

View File

@ -8,6 +8,7 @@
#include <aws/s3/S3Client.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <base/logger_useful.h>
#include <base/sleep.h>
@ -20,6 +21,7 @@ namespace ProfileEvents
extern const Event S3ReadMicroseconds;
extern const Event S3ReadBytes;
extern const Event S3ReadRequestsErrors;
extern const Event ReadBufferSeekCancelConnection;
}
namespace DB
@ -34,9 +36,14 @@ namespace ErrorCodes
ReadBufferFromS3::ReadBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, const String & key_,
UInt64 max_single_read_retries_, const ReadSettings & settings_, bool use_external_buffer_, size_t read_until_position_)
: SeekableReadBuffer(nullptr, 0)
std::shared_ptr<Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & key_,
UInt64 max_single_read_retries_,
const ReadSettings & settings_,
bool use_external_buffer_,
size_t read_until_position_)
: SeekableReadBufferWithSize(nullptr, 0)
, client_ptr(std::move(client_ptr_))
, bucket(bucket_)
, key(key_)
@ -142,9 +149,12 @@ bool ReadBufferFromS3::nextImpl()
return true;
}
off_t ReadBufferFromS3::seek(off_t offset_, int whence)
{
if (impl)
bool restricted_seek = read_type == SeekableReadBufferWithSize::ReadType::DISK_READ;
if (impl && restricted_seek)
throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (whence != SEEK_SET)
@ -153,11 +163,57 @@ off_t ReadBufferFromS3::seek(off_t offset_, int whence)
if (offset_ < 0)
throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset_), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
offset = offset_;
if (!restricted_seek)
{
if (!working_buffer.empty()
&& size_t(offset_) >= offset - working_buffer.size()
&& offset_ < offset)
{
pos = working_buffer.end() - (offset - offset_);
assert(pos >= working_buffer.begin());
assert(pos <= working_buffer.end());
return getPosition();
}
auto position = getPosition();
if (offset_ > position)
{
size_t diff = offset_ - position;
if (diff < read_settings.remote_read_min_bytes_for_seek)
{
ignore(diff);
return offset_;
}
}
pos = working_buffer.end();
if (impl)
{
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
impl.reset();
}
}
offset = offset_;
return offset;
}
std::optional<size_t> ReadBufferFromS3::getTotalSize()
{
if (file_size)
return file_size;
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(bucket);
request.SetKey(key);
auto outcome = client_ptr->HeadObject(request);
auto head_result = outcome.GetResultWithOwnership();
file_size = head_result.GetContentLength();
return file_size;
}
off_t ReadBufferFromS3::getPosition()
{
return offset - available();

View File

@ -23,7 +23,7 @@ namespace DB
/**
* Perform S3 HTTP GET request and provide response to read.
*/
class ReadBufferFromS3 : public SeekableReadBuffer
class ReadBufferFromS3 : public SeekableReadBufferWithSize
{
private:
std::shared_ptr<Aws::S3::S3Client> client_ptr;
@ -49,13 +49,18 @@ public:
bool nextImpl() override;
off_t seek(off_t off, int whence) override;
off_t getPosition() override;
std::optional<size_t> getTotalSize() override;
private:
std::unique_ptr<ReadBuffer> initialize();
ReadSettings read_settings;
bool use_external_buffer;
off_t read_until_position = 0;
};

View File

@ -77,6 +77,8 @@ struct ReadSettings
size_t remote_fs_read_max_backoff_ms = 10000;
size_t remote_fs_read_backoff_max_tries = 4;
size_t remote_read_min_bytes_for_seek = DBMS_DEFAULT_BUFFER_SIZE;
size_t http_max_tries = 1;
size_t http_retry_initial_backoff_ms = 100;
size_t http_retry_max_backoff_ms = 1600;

View File

@ -18,11 +18,15 @@
#include <Poco/Version.h>
#include <Common/DNSResolver.h>
#include <Common/RemoteHostFilter.h>
#include <Common/config.h>
#include <base/logger_useful.h>
#include <Poco/URIStreamFactory.h>
#include <Common/config.h>
namespace ProfileEvents
{
extern const Event ReadBufferSeekCancelConnection;
}
namespace DB
{
@ -34,6 +38,8 @@ namespace ErrorCodes
extern const int TOO_MANY_REDIRECTS;
extern const int HTTP_RANGE_NOT_SATISFIABLE;
extern const int BAD_ARGUMENTS;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int SEEK_POSITION_OUT_OF_BOUND;
}
template <typename SessionPtr>
@ -83,7 +89,7 @@ public:
namespace detail
{
template <typename UpdatableSessionPtr>
class ReadWriteBufferFromHTTPBase : public ReadBuffer
class ReadWriteBufferFromHTTPBase : public SeekableReadBufferWithSize
{
public:
using HTTPHeaderEntry = std::tuple<std::string, std::string>;
@ -114,7 +120,7 @@ namespace detail
size_t buffer_size;
bool use_external_buffer;
size_t bytes_read = 0;
size_t offset_from_begin_pos = 0;
Range read_range;
/// Delayed exception in case retries with partial content are not satisfiable.
@ -137,16 +143,16 @@ namespace detail
size_t getOffset() const
{
return read_range.begin + bytes_read;
return read_range.begin + offset_from_begin_pos;
}
std::istream * call(Poco::URI uri_, Poco::Net::HTTPResponse & response)
std::istream * call(Poco::URI uri_, Poco::Net::HTTPResponse & response, const std::string & method_)
{
// With an empty path Poco will send "POST HTTP/1.1", which is a Poco bug.
if (uri_.getPath().empty())
uri_.setPath("/");
Poco::Net::HTTPRequest request(method, uri_.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
Poco::Net::HTTPRequest request(method_, uri_.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
request.setHost(uri_.getHost()); // use original, not resolved host name in header
if (out_stream_callback)
@ -195,6 +201,42 @@ namespace detail
}
}
std::optional<size_t> getTotalSize() override
{
if (read_range.end)
return *read_range.end - read_range.begin;
Poco::Net::HTTPResponse response;
for (size_t i = 0; i < 10; ++i)
{
try
{
call(uri, response, Poco::Net::HTTPRequest::HTTP_HEAD);
while (isRedirect(response.getStatus()))
{
Poco::URI uri_redirect(response.get("Location"));
remote_host_filter.checkURL(uri_redirect);
session->updateSession(uri_redirect);
istr = call(uri_redirect, response, method);
}
break;
}
catch (const Poco::Exception & e)
{
LOG_ERROR(log, "Failed to make HTTP_HEAD request to {}. Error: {}", uri.toString(), e.displayText());
}
}
if (response.hasContentLength())
read_range.end = read_range.begin + response.getContentLength();
return read_range.end;
}
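
The getTotalSize() added above learns the object size from the Content-Length of an HTTP HEAD response. A minimal standalone Poco sketch of that single request (hypothetical host and path, no redirect or retry handling) might be:

#include <iostream>
#include <string>
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>

// Ask the server for the object size without downloading the body.
// Returns -1 when the server does not report Content-Length; errors are not handled here.
static std::streamsize head_content_length(const std::string & host, const std::string & path)
{
    Poco::Net::HTTPClientSession session(host, 80);
    Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_HEAD, path, Poco::Net::HTTPMessage::HTTP_1_1);
    Poco::Net::HTTPResponse response;

    session.sendRequest(request);
    session.receiveResponse(response);  // HEAD responses carry headers only

    return response.hasContentLength() ? response.getContentLength() : -1;
}

int main()
{
    std::cout << head_content_length("example.com", "/data.parquet") << '\n';  // hypothetical URL
}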
public:
using NextCallback = std::function<void(size_t)>;
using OutStreamCallback = std::function<void(std::ostream &)>;
@ -212,7 +254,7 @@ namespace detail
const RemoteHostFilter & remote_host_filter_ = {},
bool delay_initialization = false,
bool use_external_buffer_ = false)
: ReadBuffer(nullptr, 0)
: SeekableReadBufferWithSize(nullptr, 0)
, uri {uri_}
, method {!method_.empty() ? method_ : out_stream_callback_ ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET}
, session {session_}
@ -245,7 +287,7 @@ namespace detail
{
Poco::Net::HTTPResponse response;
istr = call(saved_uri_redirect ? *saved_uri_redirect : uri, response);
istr = call(saved_uri_redirect ? *saved_uri_redirect : uri, response, method);
while (isRedirect(response.getStatus()))
{
@ -253,7 +295,8 @@ namespace detail
remote_host_filter.checkURL(uri_redirect);
session->updateSession(uri_redirect);
istr = call(uri_redirect, response);
istr = call(uri_redirect, response, method);
saved_uri_redirect = uri_redirect;
}
@ -277,7 +320,7 @@ namespace detail
}
}
if (!bytes_read && !read_range.end && response.hasContentLength())
if (!offset_from_begin_pos && !read_range.end && response.hasContentLength())
read_range.end = read_range.begin + response.getContentLength();
try
@ -375,9 +418,9 @@ namespace detail
{
/**
* Retry request unconditionally if nothing has been read yet.
* Otherwise if it is GET method retry with range header starting from bytes_read.
* Otherwise if it is GET method retry with range header.
*/
bool can_retry_request = !bytes_read || method == Poco::Net::HTTPRequest::HTTP_GET;
bool can_retry_request = !offset_from_begin_pos || method == Poco::Net::HTTPRequest::HTTP_GET;
if (!can_retry_request)
throw;
@ -408,10 +451,61 @@ namespace detail
internal_buffer = impl->buffer();
working_buffer = internal_buffer;
bytes_read += working_buffer.size();
offset_from_begin_pos += working_buffer.size();
return true;
}
off_t getPosition() override
{
return getOffset() - available();
}
off_t seek(off_t offset_, int whence) override
{
if (whence != SEEK_SET)
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (offset_ < 0)
throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset_), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
off_t current_offset = getOffset();
if (!working_buffer.empty()
&& size_t(offset_) >= current_offset - working_buffer.size()
&& offset_ < current_offset)
{
pos = working_buffer.end() - (current_offset - offset_);
assert(pos >= working_buffer.begin());
assert(pos <= working_buffer.end());
return getPosition();
}
auto position = getPosition();
if (offset_ > position)
{
size_t diff = offset_ - position;
if (diff < settings.remote_read_min_bytes_for_seek)
{
ignore(diff);
return offset_;
}
}
if (impl)
{
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
impl.reset();
}
pos = working_buffer.end();
read_range.begin = offset_;
read_range.end = std::nullopt;
offset_from_begin_pos = 0;
return offset_;
}
std::string getResponseCookie(const std::string & name, const std::string & def) const
{
for (const auto & cookie : cookies)
@ -463,7 +557,7 @@ class ReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<std::
using Parent = detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatableSession>>;
public:
explicit ReadWriteBufferFromHTTP(
explicit ReadWriteBufferFromHTTP(
Poco::URI uri_,
const std::string & method_,
OutStreamCallback out_stream_callback_,

View File

@ -51,7 +51,7 @@ const std::pair<DB::LogsLevel, Poco::Message::Priority> & convertLogLevel(Aws::U
{Aws::Utils::Logging::LogLevel::Error, {DB::LogsLevel::error, Poco::Message::PRIO_ERROR}},
{Aws::Utils::Logging::LogLevel::Warn, {DB::LogsLevel::warning, Poco::Message::PRIO_WARNING}},
{Aws::Utils::Logging::LogLevel::Info, {DB::LogsLevel::information, Poco::Message::PRIO_INFORMATION}},
{Aws::Utils::Logging::LogLevel::Debug, {DB::LogsLevel::trace, Poco::Message::PRIO_TRACE}},
{Aws::Utils::Logging::LogLevel::Debug, {DB::LogsLevel::debug, Poco::Message::PRIO_DEBUG}},
{Aws::Utils::Logging::LogLevel::Trace, {DB::LogsLevel::trace, Poco::Message::PRIO_TRACE}},
};
return mapping.at(log_level);

View File

@ -1,6 +1,7 @@
#pragma once
#include <IO/ReadBuffer.h>
#include <optional>
namespace DB
{
@ -34,4 +35,36 @@ public:
};
using SeekableReadBufferPtr = std::shared_ptr<SeekableReadBuffer>;
class SeekableReadBufferWithSize : public SeekableReadBuffer
{
public:
SeekableReadBufferWithSize(Position ptr, size_t size)
: SeekableReadBuffer(ptr, size) {}
SeekableReadBufferWithSize(Position ptr, size_t size, size_t offset)
: SeekableReadBuffer(ptr, size, offset) {}
/// set std::nullopt in case it is impossible to find out total size.
virtual std::optional<size_t> getTotalSize() = 0;
/**
* Some buffers might have different seek restrictions according to where it is used.
* For example, ReadBufferFromS3 and ReadBufferFromWebServer, when used for reading
* from remote disks, require some additional invariants and restrictions, which
* are not needed in other cases.
*/
enum class ReadType
{
DEFAULT,
DISK_READ
};
void setReadType(ReadType type) { read_type = type; }
protected:
ReadType read_type = ReadType::DEFAULT;
std::optional<size_t> file_size;
};
}
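
SeekableReadBufferWithSize layers two things onto a seekable buffer: an optional total size and a read type telling the buffer whether it backs a remote disk (which restricts seeking). A simplified, framework-free model of that interface (invented names, not ClickHouse's actual classes) could look like:

#include <cstddef>
#include <optional>
#include <string>
#include <sys/stat.h>

// Simplified model: a source that may or may not know its total size, plus a
// flag distinguishing generic stream usage from "backing a remote disk" usage.
class SizedSource
{
public:
    enum class ReadType { Default, DiskRead };

    virtual ~SizedSource() = default;

    /// Returns std::nullopt when the size cannot be determined
    /// (e.g. chunked HTTP without Content-Length).
    virtual std::optional<std::size_t> total_size() = 0;

    void set_read_type(ReadType type) { read_type = type; }

protected:
    ReadType read_type = ReadType::Default;
    std::optional<std::size_t> cached_size;
};

// Example implementation backed by a local file: the size comes from stat(2)
// and is cached, mirroring how the S3/HTTP buffers cache file_size.
class FileSource : public SizedSource
{
public:
    explicit FileSource(std::string path_) : path(std::move(path_)) {}

    std::optional<std::size_t> total_size() override
    {
        if (cached_size)
            return cached_size;
        struct stat st {};
        if (::stat(path.c_str(), &st) == 0)
            cached_size = static_cast<std::size_t>(st.st_size);
        return cached_size;
    }

private:
    std::string path;
};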

View File

@ -30,6 +30,8 @@
#define SYS_preadv2 286
#elif defined(__ppc64__)
#define SYS_preadv2 380
#elif defined(__riscv)
#define SYS_preadv2 286
#else
#error "Unsupported architecture"
#endif

View File

@ -3089,6 +3089,8 @@ ReadSettings Context::getReadSettings() const
res.remote_fs_read_max_backoff_ms = settings.remote_fs_read_max_backoff_ms;
res.remote_fs_read_backoff_max_tries = settings.remote_fs_read_backoff_max_tries;
res.remote_read_min_bytes_for_seek = settings.remote_read_min_bytes_for_seek;
res.local_fs_buffer_size = settings.max_read_buffer_size;
res.direct_io_threshold = settings.min_bytes_to_use_direct_io;
res.mmap_threshold = settings.min_bytes_to_use_mmap_io;

View File

@ -21,6 +21,7 @@ namespace ErrorCodes
extern const int INCORRECT_NUMBER_OF_COLUMNS;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int INCORRECT_DATA;
extern const int CANNOT_PARSE_DOMAIN_VALUE_FROM_STRING;
}
@ -36,7 +37,8 @@ bool isParseError(int code)
|| code == ErrorCodes::CANNOT_READ_ALL_DATA
|| code == ErrorCodes::TOO_LARGE_STRING_SIZE
|| code == ErrorCodes::ARGUMENT_OUT_OF_BOUND /// For Decimals
|| code == ErrorCodes::INCORRECT_DATA; /// For some ReadHelpers
|| code == ErrorCodes::INCORRECT_DATA /// For some ReadHelpers
|| code == ErrorCodes::CANNOT_PARSE_DOMAIN_VALUE_FROM_STRING;
}
IRowInputFormat::IRowInputFormat(Block header, ReadBuffer & in_, Params params_)

View File

@ -94,7 +94,7 @@ void ArrowBlockInputFormat::prepareReader()
}
else
{
auto file_reader_status = arrow::ipc::RecordBatchFileReader::Open(asArrowFile(*in));
auto file_reader_status = arrow::ipc::RecordBatchFileReader::Open(asArrowFile(*in, format_settings));
if (!file_reader_status.ok())
throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
"Error while opening a table: {}", file_reader_status.status().ToString());

View File

@ -6,6 +6,7 @@
#if USE_ARROW || USE_ORC || USE_PARQUET
#include <Common/assert_cast.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromString.h>
#include <IO/copyData.h>
@ -19,6 +20,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_FILE_SIZE;
}
ArrowBufferedOutputStream::ArrowBufferedOutputStream(WriteBuffer & out_) : out{out_}, is_open{true}
{
}
@ -46,9 +52,22 @@ RandomAccessFileFromSeekableReadBuffer::RandomAccessFileFromSeekableReadBuffer(S
{
}
RandomAccessFileFromSeekableReadBuffer::RandomAccessFileFromSeekableReadBuffer(SeekableReadBufferWithSize & in_)
: in{in_}, is_open{true}
{
}
arrow::Result<int64_t> RandomAccessFileFromSeekableReadBuffer::GetSize()
{
return arrow::Result<int64_t>(file_size);
if (!file_size)
{
auto * buf_with_size = dynamic_cast<SeekableReadBufferWithSize *>(&in);
if (buf_with_size)
file_size = buf_with_size->getTotalSize();
if (!file_size)
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out size of file");
}
return arrow::Result<int64_t>(*file_size);
}
arrow::Status RandomAccessFileFromSeekableReadBuffer::Close()
@ -121,7 +140,7 @@ arrow::Status ArrowInputStreamFromReadBuffer::Close()
return arrow::Status();
}
std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in)
std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in, const FormatSettings & settings)
{
if (auto * fd_in = dynamic_cast<ReadBufferFromFileDescriptor *>(&in))
{
@ -131,6 +150,11 @@ std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in)
if (res == 0 && S_ISREG(stat.st_mode))
return std::make_shared<RandomAccessFileFromSeekableReadBuffer>(*fd_in, stat.st_size);
}
else if (auto * seekable_in = dynamic_cast<SeekableReadBufferWithSize *>(&in))
{
if (settings.seekable_read)
return std::make_shared<RandomAccessFileFromSeekableReadBuffer>(*seekable_in);
}
// fallback to loading the entire file in memory
std::string file_data;

View File

@ -4,14 +4,18 @@
#if USE_ARROW || USE_ORC || USE_PARQUET
#include <arrow/io/interfaces.h>
#include <optional>
namespace DB
{
class ReadBuffer;
class SeekableReadBuffer;
class WriteBuffer;
class SeekableReadBuffer;
class SeekableReadBufferWithSize;
struct FormatSettings;
class ArrowBufferedOutputStream : public arrow::io::OutputStream
{
public:
@ -40,6 +44,8 @@ class RandomAccessFileFromSeekableReadBuffer : public arrow::io::RandomAccessFil
public:
RandomAccessFileFromSeekableReadBuffer(SeekableReadBuffer & in_, off_t file_size_);
RandomAccessFileFromSeekableReadBuffer(SeekableReadBufferWithSize & in_);
arrow::Result<int64_t> GetSize() override;
arrow::Status Close() override;
@ -56,7 +62,7 @@ public:
private:
SeekableReadBuffer & in;
off_t file_size;
std::optional<off_t> file_size;
bool is_open = false;
ARROW_DISALLOW_COPY_AND_ASSIGN(RandomAccessFileFromSeekableReadBuffer);
@ -80,7 +86,7 @@ private:
ARROW_DISALLOW_COPY_AND_ASSIGN(ArrowInputStreamFromReadBuffer);
};
std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in);
std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in, const FormatSettings & settings);
}

View File

@ -518,13 +518,6 @@ ArrowColumnToCHColumn::ArrowColumnToCHColumn(
void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table)
{
Columns columns_list;
UInt64 num_rows = 0;
columns_list.reserve(header.rows());
using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::ChunkedArray>>;
NameToColumnPtr name_to_column_ptr;
for (const auto& column_name : table->ColumnNames())
{
@ -532,6 +525,16 @@ void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arr
name_to_column_ptr[column_name] = arrow_column;
}
arrowColumnsToCHChunk(res, name_to_column_ptr);
}
void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr)
{
Columns columns_list;
UInt64 num_rows = 0;
columns_list.reserve(header.rows());
std::unordered_map<String, BlockPtr> nested_tables;
for (size_t column_i = 0, columns = header.columns(); column_i < columns; ++column_i)
{
@ -587,7 +590,5 @@ void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arr
res.setColumns(columns_list, num_rows);
}
}
#endif

View File

@ -19,6 +19,8 @@ class Chunk;
class ArrowColumnToCHColumn
{
public:
using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::ChunkedArray>>;
ArrowColumnToCHColumn(const Block & header_, const std::string & format_name_, bool import_nested_);
/// Constructor that create header by arrow schema. It will be useful for inserting
@ -27,6 +29,8 @@ public:
void arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table);
void arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr);
private:
const Block header;
const std::string format_name;

View File

@ -1,15 +1,16 @@
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferValidUTF8.h>
#include <IO/WriteBufferFromString.h>
#include <Processors/Formats/Impl/JSONEachRowWithProgressRowOutputFormat.h>
#include <Formats/FormatFactory.h>
namespace DB
{
void JSONEachRowWithProgressRowOutputFormat::writeRowStartDelimiter()
{
if (has_progress)
writeProgress();
writeCString("{\"row\":{", out);
}
@ -22,11 +23,39 @@ void JSONEachRowWithProgressRowOutputFormat::writeRowEndDelimiter()
void JSONEachRowWithProgressRowOutputFormat::onProgress(const Progress & value)
{
progress.incrementPiecewiseAtomically(value);
writeCString("{\"progress\":", out);
progress.writeJSON(out);
writeCString("}\n", out);
String progress_line;
WriteBufferFromString ostr(progress_line);
writeCString("{\"progress\":", ostr);
progress.writeJSON(ostr);
writeCString("}\n", ostr);
ostr.finalize();
std::lock_guard lock(progress_lines_mutex);
progress_lines.emplace_back(std::move(progress_line));
has_progress = true;
}
void JSONEachRowWithProgressRowOutputFormat::flush()
{
if (has_progress)
writeProgress();
IOutputFormat::flush();
}
void JSONEachRowWithProgressRowOutputFormat::writeSuffix()
{
if (has_progress)
writeProgress();
JSONEachRowRowOutputFormat::writeSuffix();
}
void JSONEachRowWithProgressRowOutputFormat::writeProgress()
{
std::lock_guard lock(progress_lines_mutex);
for (const auto & progress_line : progress_lines)
writeString(progress_line, out);
progress_lines.clear();
has_progress = false;
}
void registerOutputFormatJSONEachRowWithProgress(FormatFactory & factory)
{

View File

@ -1,5 +1,6 @@
#pragma once
#include <Processors/Formats/Impl/JSONEachRowRowOutputFormat.h>
#include <mutex>
namespace DB
{
@ -10,12 +11,21 @@ public:
using JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat;
void onProgress(const Progress & value) override;
void flush() override;
private:
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
void writeSuffix() override;
void writeProgress();
Progress progress;
std::vector<String> progress_lines;
std::mutex progress_lines_mutex;
/// To not lock mutex and check progress_lines every row,
/// we will use atomic flag that progress_lines is not empty.
std::atomic_bool has_progress = false;
};
}
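
The reworked progress output above collects ready-to-write progress lines in a vector guarded by a mutex, and keeps an atomic flag so the per-row path can skip the lock entirely when nothing is queued. A small generic sketch of that pattern (simplified names, independent of the output format classes) is:

#include <atomic>
#include <iostream>
#include <mutex>
#include <string>
#include <vector>

// Collect lines from one thread, flush them from another; the atomic flag lets
// the hot path avoid taking the mutex when no lines are pending.
class PendingLines
{
public:
    void push(std::string line)
    {
        std::lock_guard<std::mutex> lock(mutex);
        lines.emplace_back(std::move(line));
        has_lines = true;
    }

    // Called on the hot path (e.g. once per output row): cheap check first.
    void flush_if_any(std::ostream & out)
    {
        if (!has_lines)
            return;
        std::lock_guard<std::mutex> lock(mutex);
        for (const auto & line : lines)
            out << line;
        lines.clear();
        has_lines = false;
    }

private:
    std::mutex mutex;
    std::vector<std::string> lines;
    std::atomic<bool> has_lines{false};
};

int main()
{
    PendingLines pending;
    pending.push("{\"progress\":{\"read_rows\":\"100\"}}\n");  // hypothetical progress line
    pending.flush_if_any(std::cout);
}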

View File

@ -5,7 +5,6 @@
#include <IO/ReadBufferFromMemory.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <arrow/adapters/orc/adapter.h>
#include <arrow/io/memory.h>
#include "ArrowBufferedStreams.h"
#include "ArrowColumnToCHColumn.h"
@ -39,23 +38,38 @@ Chunk ORCBlockInputFormat::generate()
if (!file_reader)
prepareReader();
if (stripe_current >= stripe_total)
return res;
if (!batch_reader)
{
arrow::Status reader_status = file_reader->NextStripeReader(
DBMS_DEFAULT_BUFFER_SIZE, include_indices, &batch_reader);
if (!reader_status.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA,
"Failed to create batch reader: {}",
reader_status.ToString());
if (!batch_reader)
return res;
}
std::shared_ptr<arrow::RecordBatch> batch_result;
arrow::Status batch_status = file_reader->ReadStripe(stripe_current, include_indices, &batch_result);
arrow::Status batch_status = batch_reader->ReadNext(&batch_result);
if (!batch_status.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA,
"Error while reading batch of ORC data: {}", batch_status.ToString());
"Error while reading batch of ORC data: {}",
batch_status.ToString());
auto table_result = arrow::Table::FromRecordBatches({batch_result});
if (!table_result.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA,
"Error while reading batch of ORC data: {}", table_result.status().ToString());
if (!batch_result || !batch_result->num_rows())
return res;
++stripe_current;
ArrowColumnToCHColumn::NameToColumnPtr name_to_column_ptr;
for (const auto & column_name : column_names)
{
arrow::ArrayVector vec = {batch_result->GetColumnByName(column_name)};
std::shared_ptr<arrow::ChunkedArray> arrow_column = std::make_shared<arrow::ChunkedArray>(vec);
name_to_column_ptr[column_name] = arrow_column;
}
arrow_column_to_ch_column->arrowColumnsToCHChunk(res, name_to_column_ptr);
batch_reader.reset();
arrow_column_to_ch_column->arrowTableToCHChunk(res, *table_result);
return res;
}
@ -93,7 +107,7 @@ static size_t countIndicesForType(std::shared_ptr<arrow::DataType> type)
void ORCBlockInputFormat::prepareReader()
{
THROW_ARROW_NOT_OK(arrow::adapters::orc::ORCFileReader::Open(asArrowFile(*in), arrow::default_memory_pool(), &file_reader));
THROW_ARROW_NOT_OK(arrow::adapters::orc::ORCFileReader::Open(asArrowFile(*in, format_settings), arrow::default_memory_pool(), &file_reader));
stripe_total = file_reader->NumberOfStripes();
stripe_current = 0;
@ -117,6 +131,7 @@ void ORCBlockInputFormat::prepareReader()
const auto & name = schema->field(i)->name();
if (getPort().getHeader().has(name) || nested_table_names.contains(name))
{
column_names.push_back(name);
for (int j = 0; j != indexes_count; ++j)
include_indices.push_back(index + j);
}

View File

@ -5,7 +5,12 @@
#include <Processors/Formats/IInputFormat.h>
#include <Formats/FormatSettings.h>
namespace arrow::adapters::orc { class ORCFileReader; }
#include <arrow/adapters/orc/adapter.h>
namespace arrow::adapters::orc
{
class ORCFileReader;
}
namespace DB
{
@ -30,8 +35,12 @@ private:
std::unique_ptr<arrow::adapters::orc::ORCFileReader> file_reader;
std::shared_ptr<arrow::RecordBatchReader> batch_reader;
std::unique_ptr<ArrowColumnToCHColumn> arrow_column_to_ch_column;
std::vector<String> column_names;
int stripe_total = 0;
int stripe_current = 0;

View File

@ -93,7 +93,7 @@ static size_t countIndicesForType(std::shared_ptr<arrow::DataType> type)
void ParquetBlockInputFormat::prepareReader()
{
THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(*in), arrow::default_memory_pool(), &file_reader));
THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(*in, format_settings), arrow::default_memory_pool(), &file_reader));
row_group_total = file_reader->num_row_groups();
row_group_current = 0;

View File

@ -83,7 +83,7 @@ private:
/// Fills Join with block from right table.
/// Has single input and single output port.
/// Output port has empty header. It is closed when al data is inserted in join.
/// Output port has empty header. It is closed when all data is inserted in join.
class FillingRightJoinSideTransform : public IProcessor
{
public:

View File

@ -929,6 +929,15 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
request_credentials.reset(); // ...so that the next requests on the connection have to always start afresh in case of exceptions.
});
/// Check if exception was thrown in used_output.finalize().
/// In this case used_output can be in invalid state and we
/// cannot write in it anymore. So, just log this exception.
if (used_output.isFinalized())
{
tryLogCurrentException(log, "Cannot flush data to client");
return;
}
tryLogCurrentException(log);
/** If exception is received from remote server, then stack trace is embedded in message.

View File

@ -60,13 +60,19 @@ private:
/// Points to 'out' or to CompressedWriteBuffer(*out) or to CascadeWriteBuffer.
std::shared_ptr<WriteBuffer> out_maybe_delayed_and_compressed;
bool finalized = false;
inline bool hasDelayed() const
{
return out_maybe_delayed_and_compressed != out_maybe_compressed;
}
inline void finalize() const
inline void finalize()
{
if (finalized)
return;
finalized = true;
if (out_maybe_delayed_and_compressed)
out_maybe_delayed_and_compressed->finalize();
if (out_maybe_compressed)
@ -74,6 +80,11 @@ private:
if (out)
out->finalize();
}
inline bool isFinalized() const
{
return finalized;
}
};
IServer & server;
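
The finalized flag introduced above serves two purposes: it makes finalize() safe to call more than once, and it lets the exception handler ask isFinalized() before attempting to write an error message to a client whose output may already be closed. The guard pattern in isolation (a stripped-down sketch, not the actual handler code) looks like:

#include <iostream>

// finalize() may be reached from both the normal path and the error path,
// so it must be idempotent, and later code must be able to tell whether the
// output has already been closed.
struct Output
{
    bool finalized = false;

    void finalize()
    {
        if (finalized)
            return;                                   // second call is a no-op
        finalized = true;
        std::cout << "flushing buffers to client\n";  // stand-in for the real flush
    }

    bool is_finalized() const { return finalized; }
};

int main()
{
    Output out;
    out.finalize();
    out.finalize();  // ignored
    if (out.is_finalized())
        std::cout << "too late to send an error message to the client\n";
}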

View File

@ -33,7 +33,6 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
HDFSFSPtr fs;
off_t offset = 0;
bool initialized = false;
off_t read_until_position = 0;
explicit ReadBufferFromHDFSImpl(
@ -64,24 +63,16 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
hdfsCloseFile(fs.get(), fin);
}
void initialize() const
std::optional<size_t> getTotalSize() const
{
if (!offset)
return;
int seek_status = hdfsSeek(fs.get(), fin, offset);
if (seek_status != 0)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Fail to seek HDFS file: {}, error: {}", hdfs_uri, std::string(hdfsGetLastError()));
auto * file_info = hdfsGetPathInfo(fs.get(), hdfs_file_path.c_str());
if (!file_info)
return std::nullopt;
return file_info->mSize;
}
bool nextImpl() override
{
if (!initialized)
{
initialize();
initialized = true;
}
size_t num_bytes_to_read;
if (read_until_position)
{
@ -117,17 +108,13 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
off_t seek(off_t offset_, int whence) override
{
if (initialized)
throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (whence != SEEK_SET)
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (offset_ < 0)
throw Exception(ErrorCodes::SEEK_POSITION_OUT_OF_BOUND, "Seek position is out of bounds. Offset: {}", std::to_string(offset_));
throw Exception(ErrorCodes::LOGICAL_ERROR, "Only SEEK_SET is supported");
offset = offset_;
int seek_status = hdfsSeek(fs.get(), fin, offset);
if (seek_status != 0)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Fail to seek HDFS file: {}, error: {}", hdfs_uri, std::string(hdfsGetLastError()));
return offset;
}
@ -145,11 +132,15 @@ ReadBufferFromHDFS::ReadBufferFromHDFS(
const String & hdfs_file_path_,
const Poco::Util::AbstractConfiguration & config_,
size_t buf_size_, size_t read_until_position_)
: SeekableReadBuffer(nullptr, 0)
: SeekableReadBufferWithSize(nullptr, 0)
, impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_uri_, hdfs_file_path_, config_, buf_size_, read_until_position_))
{
}
std::optional<size_t> ReadBufferFromHDFS::getTotalSize()
{
return impl->getTotalSize();
}
bool ReadBufferFromHDFS::nextImpl()
{
@ -163,9 +154,28 @@ bool ReadBufferFromHDFS::nextImpl()
}
off_t ReadBufferFromHDFS::seek(off_t off, int whence)
off_t ReadBufferFromHDFS::seek(off_t offset_, int whence)
{
return impl->seek(off, whence);
if (whence != SEEK_SET)
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (offset_ < 0)
throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset_), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
if (!working_buffer.empty()
&& size_t(offset_) >= impl->getPosition() - working_buffer.size()
&& offset_ < impl->getPosition())
{
pos = working_buffer.end() - (impl->getPosition() - offset_);
assert(pos >= working_buffer.begin());
assert(pos <= working_buffer.end());
return getPosition();
}
pos = working_buffer.end();
impl->seek(offset_, whence);
return impl->getPosition();
}

View File

@ -19,7 +19,7 @@ namespace DB
/** Accepts HDFS path to file and opens it.
* Closes the file by itself (thus "owns" a file descriptor).
*/
class ReadBufferFromHDFS : public SeekableReadBuffer
class ReadBufferFromHDFS : public SeekableReadBufferWithSize
{
struct ReadBufferFromHDFSImpl;
@ -37,6 +37,8 @@ public:
off_t getPosition() override;
std::optional<size_t> getTotalSize() override;
private:
std::unique_ptr<ReadBufferFromHDFSImpl> impl;
};

View File

@ -212,19 +212,39 @@ void StorageDictionary::renameInMemory(const StorageID & new_table_id)
auto old_table_id = getStorageID();
IStorage::renameInMemory(new_table_id);
bool has_configuration = false;
assert((location == Location::SameDatabaseAndNameAsDictionary) == (getConfiguration().get() != nullptr));
if (location != Location::SameDatabaseAndNameAsDictionary)
return;
/// It's DDL dictionary, need to update configuration and reload
bool move_to_atomic = old_table_id.uuid == UUIDHelpers::Nil && new_table_id.uuid != UUIDHelpers::Nil;
bool move_to_ordinary = old_table_id.uuid != UUIDHelpers::Nil && new_table_id.uuid == UUIDHelpers::Nil;
assert(old_table_id.uuid == new_table_id.uuid || move_to_atomic || move_to_ordinary);
{
std::lock_guard<std::mutex> lock(dictionary_config_mutex);
if (configuration)
{
has_configuration = true;
configuration->setString("dictionary.database", new_table_id.database_name);
configuration->setString("dictionary.name", new_table_id.table_name);
}
configuration->setString("dictionary.database", new_table_id.database_name);
configuration->setString("dictionary.name", new_table_id.table_name);
if (move_to_atomic)
configuration->setString("dictionary.uuid", toString(new_table_id.uuid));
else if (move_to_ordinary)
configuration->remove("dictionary.uuid");
}
if (has_configuration)
/// Dictionary is moving between databases of different engines or is renaming inside Ordinary database
bool recreate_dictionary = old_table_id.uuid == UUIDHelpers::Nil || new_table_id.uuid == UUIDHelpers::Nil;
if (recreate_dictionary)
{
/// It's too hard to update both name and uuid, better to reload dictionary with new name
removeDictionaryConfigurationFromRepository();
auto repository = std::make_unique<ExternalLoaderDictionaryStorageConfigRepository>(*this);
remove_repository_callback = getContext()->getExternalDictionariesLoader().addConfigRepository(std::move(repository));
/// Dictionary will be reloaded lazily to avoid exceptions in the middle of renaming
}
else
{
const auto & external_dictionaries_loader = getContext()->getExternalDictionariesLoader();
auto result = external_dictionaries_loader.getLoadResult(old_table_id.getInternalDictionaryName());

View File

@ -39,17 +39,13 @@ def get_changed_docker_images(pr_info, repo_path, image_file_path):
if image_description['name'].startswith('clickhouse/'):
dockerhub_repo_name = 'clickhouse'
if 'release' in pr_info.labels:
logging.info("Release PR, will rebuild all images from branch, including %s", dockerfile_dir)
changed_images.append(dockerfile_dir)
else:
for f in files_changed:
if f.startswith(dockerfile_dir):
logging.info(
"Found changed file '%s' which affects docker image '%s' with path '%s'",
f, image_description['name'], dockerfile_dir)
changed_images.append(dockerfile_dir)
break
for f in files_changed:
if f.startswith(dockerfile_dir):
logging.info(
"Found changed file '%s' which affects docker image '%s' with path '%s'",
f, image_description['name'], dockerfile_dir)
changed_images.append(dockerfile_dir)
break
# The order is important: dependents should go later than bases, so that
# they are built with updated base versions.

View File

@ -34,12 +34,12 @@ IMAGES = [
"yandex/clickhouse-integration-helper",
]
def get_json_params_dict(check_name, commit_sha, pr_number, docker_images):
def get_json_params_dict(check_name, pr_info, docker_images):
return {
'context_name': check_name,
'commit': commit_sha,
'pull_request': pr_number,
'pr_info': None,
'commit': pr_info.sha,
'pull_request': pr_info.number,
'pr_info': {'changed_files' : list(pr_info.changed_files)},
'docker_images_with_versions': docker_images,
'shuffle_test_groups': False,
'use_tmpfs': False,
@ -111,7 +111,8 @@ if __name__ == "__main__":
with open(os.getenv('GITHUB_EVENT_PATH'), 'r', encoding='utf-8') as event_file:
event = json.load(event_file)
pr_info = PRInfo(event)
is_flaky_check = 'flaky' in check_name
pr_info = PRInfo(event, need_changed_files=is_flaky_check)
gh = Github(get_best_robot_token())
@ -135,7 +136,7 @@ if __name__ == "__main__":
json_path = os.path.join(work_path, 'params.json')
with open(json_path, 'w', encoding='utf-8') as json_params:
json_params.write(json.dumps(get_json_params_dict(check_name, pr_info.sha, pr_info.number, images_with_versions)))
json_params.write(json.dumps(get_json_params_dict(check_name, pr_info, images_with_versions)))
output_path_log = os.path.join(result_path, "main_script_log.txt")
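
For reference, a minimal sketch of the JSON that ends up in params.json with the new get_json_params_dict signature. The concrete values and the FakePRInfo stand-in are illustrative; in the real script the data comes from the PRInfo object defined in pr_info.py:

import json

class FakePRInfo:
    # Only the fields used by get_json_params_dict; values are placeholders.
    sha = "2bef313f75e4cacc6ea2ef2133e8849ecf0385ec"
    number = 33333
    changed_files = {"tests/integration/test_storage_s3/test.py"}

params = {
    'context_name': 'Integration tests (asan, actions)',   # placeholder check name
    'commit': FakePRInfo.sha,
    'pull_request': FakePRInfo.number,
    'pr_info': {'changed_files': list(FakePRInfo.changed_files)},
    'docker_images_with_versions': {},   # filled from the downloaded image list
    'shuffle_test_groups': False,
    'use_tmpfs': False,
}
print(json.dumps(params, indent=2))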

View File

@ -90,15 +90,25 @@ class PRInfo:
self.pr_html_url = pull_request['html_url']
if need_changed_files:
commit_before = github_event['before']
response = requests.get(f"https://api.github.com/repos/{os.getenv('GITHUB_REPOSITORY')}/compare/{commit_before}...{self.sha}")
response.raise_for_status()
diff = response.json()
if self.number == 0:
commit_before = github_event['before']
response = requests.get(f"https://api.github.com/repos/{os.getenv('GITHUB_REPOSITORY')}/compare/{commit_before}...{self.sha}")
response.raise_for_status()
diff = response.json()
if 'files' in diff:
self.changed_files = [f['filename'] for f in diff['files']]
if 'files' in diff:
self.changed_files = [f['filename'] for f in diff['files']]
else:
self.changed_files = set([])
else:
self.changed_files = set([])
if 'pr-backport' in self.labels:
diff_url = f"https://github.com/{os.getenv('GITHUB_REPOSITORY')}/compare/master...{self.head_ref}.diff"
else:
diff_url = pull_request['diff_url']
diff = urllib.request.urlopen(diff_url)
diff_object = PatchSet(diff, diff.headers.get_charsets()[0])
self.changed_files = { f.path for f in diff_object }
else:
self.changed_files = set([])
else:
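
A condensed sketch of the two code paths above: push events (self.number == 0) query the GitHub compare API for the files changed between two commits, while pull requests download the unified diff (for pr-backport PRs, a master...head_ref diff) and let unidiff extract the paths. The helper names below are illustrative only:

import os
import urllib.request

import requests
from unidiff import PatchSet

def changed_files_for_push(before_sha, after_sha):
    # Push event: compare the two commits through the GitHub API.
    repo = os.getenv('GITHUB_REPOSITORY')
    response = requests.get(f"https://api.github.com/repos/{repo}/compare/{before_sha}...{after_sha}")
    response.raise_for_status()
    return {f['filename'] for f in response.json().get('files', [])}

def changed_files_for_pr(diff_url):
    # Pull request: parse the .diff document and collect the touched paths.
    diff = urllib.request.urlopen(diff_url)
    patch = PatchSet(diff, diff.headers.get_charsets()[0])
    return {f.path for f in patch}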

250 tests/ci/push_to_artifactory.py Executable file
View File

@ -0,0 +1,250 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import logging
import os
import re
from artifactory import ArtifactorySaaSPath
from build_download_helper import dowload_build_with_progress
# Necessary ENV variables
def getenv(name, default=None):
env = os.getenv(name, default)
if env is not None:
return env
raise KeyError(f"Necessary {name} environment variable is not set")
TEMP_PATH = getenv("TEMP_PATH", ".")
# One of the following ENVs is necessary
JFROG_API_KEY = getenv("JFROG_API_KEY", "")
JFROG_TOKEN = getenv("JFROG_TOKEN", "")
class Packages(object):
rpm_arch = dict(all="noarch", amd64="x86_64")
packages = (
("clickhouse-client", "all"),
("clickhouse-common-static", "amd64"),
("clickhouse-common-static-dbg", "amd64"),
("clickhouse-server", "all"),
("clickhouse-test", "all"),
)
def __init__(self, version: str):
self.deb = tuple(
"_".join((name, version, arch + ".deb")) for name, arch in self.packages
)
rev = "2"
self.rpm = tuple(
"-".join((name, version, rev + "." + self.rpm_arch[arch] + ".rpm"))
for name, arch in self.packages
)
def arch(self, deb_pkg: str) -> str:
if deb_pkg not in self.deb:
raise ValueError("{} not in {}".format(deb_pkg, self.deb))
return deb_pkg.removesuffix(".deb").split("_")[-1]
@staticmethod
def path(package):
return os.path.join(TEMP_PATH, package)
class S3(object):
template = (
"https://s3.amazonaws.com/"
# "clickhouse-builds/"
"{bucket_name}/"
# "33333/" or "0/"
"{pr}/"
# "2bef313f75e4cacc6ea2ef2133e8849ecf0385ec/"
"{commit}/"
# "clickhouse_build_check_(actions)/"
"{check_name}/"
# "clang-13_relwithdebuginfo_memory_bundled_unsplitted_notidy_without_coverage_deb/"
"{build_name}/"
# "clickhouse-common-static_21.11.5.0_amd64.deb"
"{package}"
)
def __init__(
self,
bucket_name: str,
pr: int,
commit: str,
check_name: str,
build_name: str,
version: str,
):
self._common = dict(
bucket_name=bucket_name,
pr=pr,
commit=commit,
check_name=check_name,
build_name=build_name,
)
self.packages = Packages(version)
def download_package(self, package):
url = self.template.format_map({**self._common, "package": package})
dowload_build_with_progress(url, Packages.path(package))
def download_deb(self):
for package in self.packages.deb:
self.download_package(package)
def download_rpm(self):
for package in self.packages.rpm:
self.download_package(package)
class Release(object):
def __call__(self, name: str) -> str:
r = re.compile(r"^v\d{2}[.]\d+[.]\d+[.]\d+-(testing|prestable|stable|lts)$")
if not r.match(name):
raise argparse.ArgumentTypeError(
"release name does not match v12.13.14.15-(testing|prestable|stable|lts) pattern"
)
self._name = name
return self
@property
def version(self) -> str:
if getattr(self, "_version", False):
return self._version
version = self._name.removeprefix("v")
self._version = version.split("-")[0]
return self._version
@property
def type(self) -> str:
if getattr(self, "_type", False):
return self._type
self._type = self._name.split("-")[-1]
return self._type
class Artifactory(object):
def __init__(self, url: str, release: str, deb_repo="deb", rpm_repo="rpm"):
self._url = url
self._release = release
self._deb_url = "/".join((self._url, deb_repo, "pool", self._release)) + "/"
self._rpm_url = "/".join((self._url, rpm_repo, self._release)) + "/"
def deploy_deb(self, packages: Packages):
for package in packages.deb:
path = packages.path(package)
dist = self._release
comp = "main"
arch = packages.arch(package)
logging.info(
f"Deploy {path} distribution={dist};component={comp};"
f"architecture={arch} to artifactory"
)
self.deb(package).deploy_deb(path, dist, comp, arch)
def deploy_rpm(self, packages: Packages):
for package in packages.rpm:
path = packages.path(package)
logging.info(f"Deploy {path} to artifactory")
self.rpm(package).deploy_file(path)
def __path_helper(self, name, package) -> ArtifactorySaaSPath:
url = "/".join((getattr(self, name + "_url"), package))
path = None
if JFROG_API_KEY:
path = ArtifactorySaaSPath(url, apikey=JFROG_API_KEY)
elif JFROG_TOKEN:
path = ArtifactorySaaSPath(url, token=JFROG_TOKEN)
else:
raise KeyError("Neither JFROG_API_KEY nor JFROG_TOKEN environment variable is defined")
return path
def deb(self, package) -> ArtifactorySaaSPath:
return self.__path_helper("_deb", package)
def rpm(self, package) -> ArtifactorySaaSPath:
return self.__path_helper("_rpm", package)
def commit(name):
r = re.compile(r"^([0-9]|[a-f]){40}$")
if not r.match(name):
raise argparse.ArgumentTypeError(
"commit hash should contain exactly 40 hex characters"
)
return name
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description="Program to download artifacts from S3 and push them to "
"artifactory. ENV variables JFROG_API_KEY and JFROG_TOKEN are used "
"for authentication in the given order",
)
parser.add_argument("--pull-request", type=int, default=0, help="pull request number, 0 for a non-PR (master) build")
parser.add_argument(
"--commit", required=True, type=commit, help="commit hash for S3 bucket"
)
parser.add_argument("--release", required=True, type=Release(), help="release name")
parser.add_argument(
"--bucket-name",
default="clickhouse-builds",
help="AWS S3 bucket name",
)
parser.add_argument(
"--check-name",
default="ClickHouse build check (actions)",
help="check name, a part of bucket path, "
"will be converted to lower case with spaces->underscore",
)
parser.add_argument(
"--build-name",
default="clang-13_relwithdebuginfo_none_bundled_"
"unsplitted_notidy_without_coverage_deb",
help="build name, a part of bucket path",
)
parser.add_argument(
"--deb", action="store_true", help="if Debian packages should be processed"
)
parser.add_argument(
"--rpm", action="store_true", help="if RPM packages should be processed"
)
parser.add_argument(
"--artifactory-url", default="https://clickhousedb.jfrog.io/artifactory"
)
args = parser.parse_args()
if not args.deb and not args.rpm:
parser.error("at least one of --deb and --rpm should be specified")
args.check_name = args.check_name.lower().replace(" ", "_")
return args
def main():
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
args = parse_args()
s3 = S3(
args.bucket_name,
args.pull_request,
args.commit,
args.check_name,
args.build_name,
args.release.version,
)
art_client = Artifactory(args.artifactory_url, args.release.type)
if args.deb:
s3.download_deb()
art_client.deploy_deb(s3.packages)
if args.rpm:
s3.download_rpm()
art_client.deploy_rpm(s3.packages)
if __name__ == "__main__":
main()
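
A small usage illustration for the script above, based only on the Packages helper it defines (str.removesuffix/removeprefix require Python 3.9+; all values below are placeholders):

# Assuming the script is importable as a module, e.g. from the tests/ci directory.
from push_to_artifactory import Packages

packages = Packages("21.11.5.0")
print(packages.deb[0])                                        # clickhouse-client_21.11.5.0_all.deb
print(packages.rpm[1])                                        # clickhouse-common-static-21.11.5.0-2.x86_64.rpm
print(packages.arch("clickhouse-server_21.11.5.0_all.deb"))   # all

# A command-line invocation might look like this (placeholder values; JFROG_API_KEY
# or JFROG_TOKEN must be exported for the deploy step):
#   python3 push_to_artifactory.py --release v21.11.5.1-stable \
#       --commit 2bef313f75e4cacc6ea2ef2133e8849ecf0385ec --deb --rpm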

View File

@ -83,7 +83,7 @@ def test_read_write_two_nodes(started_cluster):
finally:
try:
for zk_conn in [node1_zk, node2_zk, node3_zk]:
for zk_conn in [node1_zk, node2_zk]:
zk_conn.stop()
zk_conn.close()
except:
@ -156,7 +156,7 @@ def test_read_write_two_nodes_with_blocade(started_cluster):
finally:
try:
for zk_conn in [node1_zk, node2_zk, node3_zk]:
for zk_conn in [node1_zk, node2_zk]:
zk_conn.stop()
zk_conn.close()
except:

View File

@ -296,6 +296,21 @@ def test_partition_by(started_cluster):
assert(result.strip() == "1\t2\t3")
def test_seekable_formats(started_cluster):
hdfs_api = started_cluster.hdfs_api
table_function = f"hdfs('hdfs://hdfs1:9000/parquet', 'Parquet', 'a Int32, b String')"
node1.query(f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)")
result = node1.query(f"SELECT count() FROM {table_function}")
assert(int(result) == 5000000)
table_function = f"hdfs('hdfs://hdfs1:9000/orc', 'ORC', 'a Int32, b String')"
node1.query(f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)")
result = node1.query(f"SELECT count() FROM {table_function}")
assert(int(result) == 5000000)
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@ -5,5 +5,15 @@
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_conf1>
<s3_parquet>
<url>http://minio1:9001/root/test_parquet</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_parquet>
<s3_orc>
<url>http://minio1:9001/root/test_orc</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_orc>
</named_collections>
</clickhouse>

View File

@ -759,6 +759,7 @@ def test_predefined_connection_configuration(started_cluster):
result = instance.query("SELECT * FROM s3(s3_conf1, format='CSV', structure='id UInt32')")
assert result == instance.query("SELECT number FROM numbers(10)")
result = ""
def test_url_reconnect_in_the_middle(started_cluster):
bucket = started_cluster.minio_bucket
@ -795,3 +796,47 @@ def test_url_reconnect_in_the_middle(started_cluster):
thread.join()
assert int(result) == 3914219105369203805
def test_seekable_formats(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
instance.query(f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)")
result = instance.query(f"SELECT count() FROM {table_function}")
assert(int(result) == 5000000)
table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')"
instance.query(f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)")
result = instance.query(f"SELECT count() FROM {table_function}")
assert(int(result) == 5000000)
result = instance.query(f"SELECT formatReadableSize(memory_usage) FROM system.query_log WHERE startsWith(query, 'SELECT count() FROM s3') AND memory_usage > 0 ORDER BY event_time desc")
print(result[:3])
assert(int(result[:3]) < 200)
def test_seekable_formats_url(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
instance.query(f"insert into table function {table_function} select number, randomString(100) from numbers(5000000)")
table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_parquet', 'Parquet', 'a Int32, b String')"
result = instance.query(f"SELECT count() FROM {table_function}")
assert(int(result) == 5000000)
table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')"
instance.query(f"insert into table function {table_function} select number, randomString(100) from numbers(5000000)")
table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_orc', 'ORC', 'a Int32, b String')"
result = instance.query(f"SELECT count() FROM {table_function}")
assert(int(result) == 5000000)
result = instance.query(f"SELECT formatReadableSize(memory_usage) FROM system.query_log WHERE startsWith(query, 'SELECT count() FROM url') AND memory_usage > 0 ORDER BY event_time desc")
print(result[:3])
assert(int(result[:3]) < 200)
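
Both memory assertions above take only the first three characters of formatReadableSize(memory_usage), which works as long as the most recent query_log entry reports a three-digit MiB value. A slightly more defensive parsing helper, shown purely as a sketch (assert_low_memory is not part of the test suite):

def assert_low_memory(formatted, limit_mib=200.0):
    # `formatted` looks like "148.24 MiB", the output of formatReadableSize(memory_usage).
    value, unit = formatted.split()[:2]
    to_mib = {"B": 1 / 1024 ** 2, "KiB": 1 / 1024, "MiB": 1.0, "GiB": 1024.0}
    assert float(value) * to_mib[unit] < limit_mib, formatted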

View File

@ -0,0 +1,26 @@
<test>
<settings>
<output_format_parallel_formatting>0</output_format_parallel_formatting>
</settings>
<substitutions>
<substitution>
<name>format</name>
<values>
<value>JSON</value>
<value>JSONCompact</value>
<value>XML</value>
</values>
</substitution>
</substitutions>
<preconditions>
<table_exists>test.hits</table_exists>
</preconditions>
<create_query>CREATE TABLE IF NOT EXISTS table_{format} ENGINE = File({format}, '/dev/null') AS SELECT SearchPhrase, ClientIP6, URL, Referer, URLDomain FROM test.hits limit 0</create_query>
<query>INSERT INTO table_{format} SELECT SearchPhrase, ClientIP6, URL, Referer, URLDomain FROM test.hits LIMIT 100000</query>
<drop_query>DROP TABLE IF EXISTS table_{format}</drop_query>
</test>

View File

@ -26,3 +26,5 @@ echo -ne 'x=1\ts=TSKV\nx=minus2\ts=trash1\ns=trash2\tx=-3\ns=TSKV Ok\tx=4\ns=tra
$CLICKHOUSE_CLIENT --query="SELECT * FROM formats_test ORDER BY x, s"
$CLICKHOUSE_CLIENT --query="DROP TABLE formats_test"
echo '::' | $CLICKHOUSE_LOCAL --structure 'i IPv4' --query='SELECT * FROM table' --input_format_allow_errors_num=1

View File

@ -16,7 +16,7 @@ SELECT * FROM replicated_truncate1 ORDER BY k;
SELECT * FROM replicated_truncate2 ORDER BY k;
SELECT '======After Truncate And Empty======';
TRUNCATE TABLE replicated_truncate1;
TRUNCATE TABLE replicated_truncate1 SETTINGS replication_alter_partitions_sync=2;
SELECT * FROM replicated_truncate1 ORDER BY k;
SELECT * FROM replicated_truncate2 ORDER BY k;

View File

@ -4,8 +4,13 @@
1 mv1 before moving tablesmv1
1 mv2 before moving tablesmv2
1 src before moving tables
asdf
asdf
test_01155_ordinary dict1 00000000-0000-0000-0000-000000000000
asdf
ordinary:
.inner.mv1
dict
dist
dst
mv1
@ -14,6 +19,7 @@ src
ordinary after rename:
atomic after rename:
.inner_id.
dict
dist
dst
mv1
@ -33,12 +39,14 @@ src
3 src after moving tables
3 src after renaming database
3 src before moving tables
.inner_id.
dist
dst
mv1
mv2
src
asdf
test_01155_ordinary .inner_id.
test_01155_ordinary dict
test_01155_ordinary dist
test_01155_ordinary dst
test_01155_ordinary mv1
test_01155_ordinary mv2
test_01155_ordinary src
CREATE DATABASE test_01155_atomic\nENGINE = Atomic
4 .inner.mv1 after renaming databasemv1
4 .inner.mv1 after renaming tablesmv1
@ -60,8 +68,11 @@ CREATE DATABASE test_01155_atomic\nENGINE = Atomic
4 src after renaming database
4 src after renaming tables
4 src before moving tables
asdf
test_01155_ordinary dict 00000000-0000-0000-0000-000000000000
test_01155_ordinary:
.inner.mv1
dict
dist
dst
mv1

View File

@ -7,15 +7,27 @@ CREATE DATABASE test_01155_ordinary ENGINE=Ordinary;
CREATE DATABASE test_01155_atomic ENGINE=Atomic;
USE test_01155_ordinary;
CREATE TABLE src (s String) ENGINE=MergeTree() PARTITION BY tuple() ORDER BY s;
CREATE MATERIALIZED VIEW mv1 (s String) ENGINE=MergeTree() PARTITION BY tuple() ORDER BY s AS SELECT (*,).1 || 'mv1' as s FROM src;
CREATE TABLE dst (s String) ENGINE=MergeTree() PARTITION BY tuple() ORDER BY s;
CREATE MATERIALIZED VIEW mv2 TO dst (s String) AS SELECT (*,).1 || 'mv2' as s FROM src;
CREATE TABLE dist (s String) Engine=Distributed(test_shard_localhost, test_01155_ordinary, src);
INSERT INTO dist VALUES ('before moving tables');
CREATE TABLE src (s String, x String DEFAULT 'a') ENGINE=MergeTree() PARTITION BY tuple() ORDER BY s;
CREATE MATERIALIZED VIEW mv1 (s String, x String DEFAULT 'b') ENGINE=MergeTree() PARTITION BY tuple() ORDER BY s AS SELECT (*,).1 || 'mv1' as s FROM src;
CREATE TABLE dst (s String, x String DEFAULT 'c') ENGINE=MergeTree() PARTITION BY tuple() ORDER BY s;
CREATE MATERIALIZED VIEW mv2 TO dst (s String, x String DEFAULT 'd') AS SELECT (*,).1 || 'mv2' as s FROM src;
CREATE TABLE dist (s String, x String DEFAULT 'asdf') ENGINE=Distributed(test_shard_localhost, test_01155_ordinary, src);
INSERT INTO dist(s) VALUES ('before moving tables');
SYSTEM FLUSH DISTRIBUTED dist;
CREATE DICTIONARY dict (s String, x String DEFAULT 'qwerty') PRIMARY KEY s
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'dist' DB 'test_01155_ordinary'))
LIFETIME(MIN 0 MAX 2) LAYOUT(COMPLEX_KEY_CACHE(SIZE_IN_CELLS 123));
-- FIXME Cannot convert column `1` because it is non constant in source stream but must be constant in result
SELECT materialize(1), substr(_table, 1, 10), s FROM merge('test_01155_ordinary', '') ORDER BY _table, s;
SELECT dictGet('test_01155_ordinary.dict', 'x', 'before moving tables');
RENAME DICTIONARY test_01155_ordinary.dict TO test_01155_ordinary.dict1;
SELECT dictGet('test_01155_ordinary.dict1', 'x', 'before moving tables');
SELECT database, name, uuid FROM system.dictionaries WHERE database='test_01155_ordinary';
RENAME TABLE test_01155_ordinary.dict1 TO test_01155_ordinary.dict;
SELECT dictGet('test_01155_ordinary.dict', 'x', 'before moving tables');
-- Move tables with materialized views from Ordinary to Atomic
SELECT 'ordinary:';
@ -24,7 +36,10 @@ RENAME TABLE test_01155_ordinary.mv1 TO test_01155_atomic.mv1;
RENAME TABLE test_01155_ordinary.mv2 TO test_01155_atomic.mv2;
RENAME TABLE test_01155_ordinary.dst TO test_01155_atomic.dst;
RENAME TABLE test_01155_ordinary.src TO test_01155_atomic.src;
SET check_table_dependencies=0;
RENAME TABLE test_01155_ordinary.dist TO test_01155_atomic.dist;
SET check_table_dependencies=1;
RENAME DICTIONARY test_01155_ordinary.dict TO test_01155_atomic.dict;
SELECT 'ordinary after rename:';
SELECT substr(name, 1, 10) FROM system.tables WHERE database='test_01155_ordinary';
SELECT 'atomic after rename:';
@ -32,17 +47,19 @@ SELECT substr(name, 1, 10) FROM system.tables WHERE database='test_01155_atomic'
DROP DATABASE test_01155_ordinary;
USE default;
INSERT INTO test_01155_atomic.src VALUES ('after moving tables');
SELECT materialize(2), substr(_table, 1, 10), s FROM merge('test_01155_atomic', '') ORDER BY _table, s; -- { serverError 81 }
INSERT INTO test_01155_atomic.src(s) VALUES ('after moving tables');
--SELECT materialize(2), substr(_table, 1, 10), s FROM merge('test_01155_atomic', '') ORDER BY _table, s; -- { serverError 81 }
--SELECT dictGet('test_01155_ordinary.dict', 'x', 'after moving tables'); -- { serverError 36 }
RENAME DATABASE test_01155_atomic TO test_01155_ordinary;
USE test_01155_ordinary;
INSERT INTO dist VALUES ('after renaming database');
INSERT INTO dist(s) VALUES ('after renaming database');
SYSTEM FLUSH DISTRIBUTED dist;
SELECT materialize(3), substr(_table, 1, 10), s FROM merge('test_01155_ordinary', '') ORDER BY _table, s;
SELECT dictGet('test_01155_ordinary.dict', 'x', 'after renaming database');
SELECT substr(name, 1, 10) FROM system.tables WHERE database='test_01155_ordinary';
SELECT database, substr(name, 1, 10) FROM system.tables WHERE database like 'test_01155_%';
-- Move tables back
RENAME DATABASE test_01155_ordinary TO test_01155_atomic;
@ -55,10 +72,13 @@ RENAME TABLE test_01155_atomic.mv2 TO test_01155_ordinary.mv2;
RENAME TABLE test_01155_atomic.dst TO test_01155_ordinary.dst;
RENAME TABLE test_01155_atomic.src TO test_01155_ordinary.src;
RENAME TABLE test_01155_atomic.dist TO test_01155_ordinary.dist;
RENAME DICTIONARY test_01155_atomic.dict TO test_01155_ordinary.dict;
INSERT INTO dist VALUES ('after renaming tables');
INSERT INTO dist(s) VALUES ('after renaming tables');
SYSTEM FLUSH DISTRIBUTED dist;
SELECT materialize(4), substr(_table, 1, 10), s FROM merge('test_01155_ordinary', '') ORDER BY _table, s;
SELECT dictGet('test_01155_ordinary.dict', 'x', 'after renaming tables');
SELECT database, name, uuid FROM system.dictionaries WHERE database='test_01155_ordinary';
SELECT 'test_01155_ordinary:';
SHOW TABLES FROM test_01155_ordinary;
SELECT 'test_01155_atomic:';

View File

@ -1,7 +1,13 @@
dict NOT_LOADED
_ Memory
dict Dictionary
dict1 NOT_LOADED
t Memory
t NOT_LOADED
_ Memory
dict Memory
t Dictionary
test
dict1 LOADED
_ Memory
dict1 Dictionary
test

View File

@ -4,6 +4,7 @@ DROP DATABASE IF EXISTS test_01191;
CREATE DATABASE test_01191 ENGINE=Atomic;
CREATE TABLE test_01191._ (n UInt64, s String) ENGINE = Memory();
CREATE TABLE test_01191.t (n UInt64, s String) ENGINE = Memory();
CREATE DICTIONARY test_01191.dict (n UInt64, s String)
PRIMARY KEY n
@ -16,9 +17,15 @@ SELECT name, status FROM system.dictionaries WHERE database='test_01191';
SELECT name, engine FROM system.tables WHERE database='test_01191' ORDER BY name;
RENAME DICTIONARY test_01191.table TO test_01191.table1; -- {serverError 60}
EXCHANGE TABLES test_01191.table AND test_01191.dict; -- {serverError 60}
EXCHANGE TABLES test_01191.dict AND test_01191.table; -- {serverError 80}
RENAME TABLE test_01191.dict TO test_01191.dict1; -- {serverError 80}
EXCHANGE DICTIONARIES test_01191._ AND test_01191.dict; -- {serverError 80}
EXCHANGE TABLES test_01191.t AND test_01191.dict;
SELECT name, status FROM system.dictionaries WHERE database='test_01191';
SELECT name, engine FROM system.tables WHERE database='test_01191' ORDER BY name;
SELECT dictGet(test_01191.t, 's', toUInt64(42));
EXCHANGE TABLES test_01191.dict AND test_01191.t;
RENAME DICTIONARY test_01191.t TO test_01191.dict1; -- {serverError 80}
DROP DICTIONARY test_01191.t; -- {serverError 80}
DROP TABLE test_01191.t;
CREATE DATABASE dummy_db ENGINE=Atomic;
RENAME DICTIONARY test_01191.dict TO dummy_db.dict1;

View File

@ -8,7 +8,6 @@ import uuid
CLICKHOUSE_HOST = os.environ.get('CLICKHOUSE_HOST', '127.0.0.1')
CLICKHOUSE_PORT = int(os.environ.get('CLICKHOUSE_PORT_TCP', '9000'))
CLICKHOUSE_DATABASE = os.environ.get('CLICKHOUSE_DATABASE', 'default')
CLICKHOUSE_QUERY_ID = uuid.uuid4().hex
def writeVarUInt(x, ba):
for _ in range(0, 9):
@ -111,9 +110,9 @@ def receiveHello(s):
# print("Version patch: ", server_version_patch)
def serializeClientInfo(ba):
def serializeClientInfo(ba, query_id):
writeStringBinary('default', ba) # initial_user
writeStringBinary(CLICKHOUSE_QUERY_ID, ba) # initial_query_id
writeStringBinary(query_id, ba) # initial_query_id
writeStringBinary('127.0.0.1:9000', ba) # initial_address
ba.extend([0] * 8) # initial_query_start_time_microseconds
ba.append(1) # TCP
@ -131,13 +130,14 @@ def serializeClientInfo(ba):
def sendQuery(s, query):
ba = bytearray()
query_id = uuid.uuid4().hex
writeVarUInt(1, ba) # query
writeStringBinary(CLICKHOUSE_QUERY_ID, ba)
writeStringBinary(query_id, ba)
ba.append(1) # INITIAL_QUERY
# client info
serializeClientInfo(ba)
serializeClientInfo(ba, query_id)
writeStringBinary('', ba) # No settings
writeStringBinary('', ba) # No interserver secret

View File

@ -18,7 +18,9 @@
▁▂▇▆█ ▁
▁█
▁█
▁▁█
▂█▁
▁▃▅█
▁▄▂▇█
▁▃▅█
0 ▁▁▂▂▂▂▃▃▃▄▄▄▅▅▅▅▆▆▆▇▇▇▇██
1 ▁▁▂▂▂▂▃▃▃▄▄▄▅▅▅▅▆▆▆▇▇▇▇██

View File

@ -33,3 +33,5 @@ SELECT sparkbar(4,toDate('2020-01-01'),toDate('2020-01-08'))(event_date,cnt) FRO
SELECT sparkbar(5,toDate('2020-01-01'),toDate('2020-01-10'))(event_date,cnt) FROM spark_bar_test;
DROP TABLE IF EXISTS spark_bar_test;
WITH number DIV 50 AS k, number % 50 AS value SELECT k, sparkbar(50, 0, 99)(number, value) FROM numbers(100) GROUP BY k ORDER BY k;

View File

@ -4,5 +4,10 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
touch test_exception
$CLICKHOUSE_LOCAL --query="SELECT 1 INTO OUTFILE 'test_exception' FORMAT Native" 2>&1 | grep -q "Code: 76. DB::ErrnoException:" && echo 'OK' || echo 'FAIL' ||:
touch "${CLICKHOUSE_TMP}/test_exception"
function cleanup()
{
rm "${CLICKHOUSE_TMP}/test_exception"
}
trap cleanup EXIT
$CLICKHOUSE_LOCAL --query="SELECT 1 INTO OUTFILE '${CLICKHOUSE_TMP}/test_exception' FORMAT Native" 2>&1 | grep -q "Code: 76. DB::ErrnoException:" && echo 'OK' || echo 'FAIL' ||:

View File

@ -37,10 +37,10 @@ SELECT d, count() FROM with_fill_date GROUP BY d ORDER BY d WITH FILL STEP INTER
DROP TABLE with_fill_date;
DROP TABLE IF EXISTS with_fill_date;
CREATE TABLE with_fill_date (d DateTime, d64 DateTime64) ENGINE = Memory;
CREATE TABLE with_fill_date (d DateTime('UTC'), d64 DateTime64(3, 'UTC')) ENGINE = Memory;
INSERT INTO with_fill_date VALUES (toDateTime('2020-02-05 10:20:00'), toDateTime64('2020-02-05 10:20:00', 3));
INSERT INTO with_fill_date VALUES (toDateTime('2020-03-08 11:01:00'), toDateTime64('2020-03-08 11:01:00', 3));
INSERT INTO with_fill_date VALUES (toDateTime('2020-02-05 10:20:00', 'UTC'), toDateTime64('2020-02-05 10:20:00', 3, 'UTC'));
INSERT INTO with_fill_date VALUES (toDateTime('2020-03-08 11:01:00', 'UTC'), toDateTime64('2020-03-08 11:01:00', 3, 'UTC'));
SELECT '15 MINUTE';
SELECT d, count() FROM with_fill_date GROUP BY d ORDER BY d WITH FILL STEP INTERVAL 15 MINUTE LIMIT 5;

View File

@ -0,0 +1,35 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS t_buffer_map"
$CLICKHOUSE_CLIENT -q "CREATE TABLE t_buffer_map(m1 Map(String, UInt64), m2 Map(String, String)) ENGINE = Buffer('', '', 1, 1, 1, 1000000000000, 1000000000000, 1000000000000, 1000000000000)"
function insert1
{
while true; do
$CLICKHOUSE_CLIENT -q "INSERT INTO t_buffer_map SELECT (range(10), range(10)), (range(10), range(10)) from numbers(100)"
done
}
function select1
{
while true; do
$CLICKHOUSE_CLIENT -q "SELECT * FROM t_buffer_map" 2> /dev/null > /dev/null
done
}
TIMEOUT=10
export -f insert1
export -f select1
timeout $TIMEOUT bash -c insert1 &
timeout $TIMEOUT bash -c select1 &
wait
echo "OK"
$CLICKHOUSE_CLIENT -q "DROP TABLE t_buffer_map"
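
The same race between concurrent inserts and selects on the Buffer table can be sketched in Python. Here run_query is a hypothetical callable that sends a query to the server (for example via clickhouse-client or the HTTP interface); the structure mirrors the insert1/select1 loops above:

import threading
import time

def stress_buffer_table(run_query, seconds=10):
    stop = time.monotonic() + seconds
    def insert_loop():
        while time.monotonic() < stop:
            run_query("INSERT INTO t_buffer_map SELECT (range(10), range(10)), (range(10), range(10)) FROM numbers(100)")
    def select_loop():
        while time.monotonic() < stop:
            try:
                run_query("SELECT * FROM t_buffer_map")
            except Exception:
                pass   # the bash version also discards SELECT errors
    threads = [threading.Thread(target=insert_loop), threading.Thread(target=select_loop)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()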

View File

@ -0,0 +1 @@
200000

View File

@ -0,0 +1,11 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_CURL} -sS ${CLICKHOUSE_URL} -d "drop table if exists test_progress"
${CLICKHOUSE_CURL} -sS ${CLICKHOUSE_URL} -d "create table test_progress (x UInt64, y UInt64, d Date, a Array(UInt64), s String) engine=MergeTree() order by x"
${CLICKHOUSE_CURL} -sS ${CLICKHOUSE_URL} -d "insert into test_progress select number as x, number + 1 as y, toDate(number) as d, range(number % 10) as a, repeat(toString(number), 10) as s from numbers(200000)"
${CLICKHOUSE_CURL} -sS ${CLICKHOUSE_URL} -d "SELECT * from test_progress FORMAT JSONEachRowWithProgress" | grep -v --text "progress" | wc -l
${CLICKHOUSE_CURL} -sS ${CLICKHOUSE_URL} -d "drop table test_progress";
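
The reference value of 200000 relies on JSONEachRowWithProgress emitting progress events as separate lines containing the word "progress", so filtering them out leaves exactly one line per inserted row. A rough Python equivalent of the same check, assuming the usual HTTP endpoint and that the test_progress table created above still exists:

import os
import requests

url = os.environ.get("CLICKHOUSE_URL", "http://localhost:8123/")
query = "SELECT * FROM test_progress FORMAT JSONEachRowWithProgress"
with requests.post(url, data=query, stream=True) as response:
    response.raise_for_status()
    rows = sum(1 for line in response.iter_lines() if line and b"progress" not in line)
print(rows)   # expected to print 200000, matching the reference file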