diff --git a/base/ext/scope_guard_safe.h b/base/ext/scope_guard_safe.h
index 7cfb3959a81..55140213572 100644
--- a/base/ext/scope_guard_safe.h
+++ b/base/ext/scope_guard_safe.h
@@ -12,7 +12,8 @@
 ///
 /// NOTE: it should be used with caution.
 #define SCOPE_EXIT_MEMORY(...) SCOPE_EXIT( \
-    MemoryTracker::LockExceptionInThread lock_memory_tracker; \
+    MemoryTracker::LockExceptionInThread \
+        lock_memory_tracker(VariableContext::Global); \
     __VA_ARGS__; \
 )
@@ -56,7 +57,8 @@
 #define SCOPE_EXIT_MEMORY_SAFE(...) SCOPE_EXIT( \
     try \
     { \
-        MemoryTracker::LockExceptionInThread lock_memory_tracker; \
+        MemoryTracker::LockExceptionInThread \
+            lock_memory_tracker(VariableContext::Global); \
         __VA_ARGS__; \
     } \
     catch (...) \
diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt
index d05177739fe..42792784a14 100644
--- a/contrib/CMakeLists.txt
+++ b/contrib/CMakeLists.txt
@@ -96,14 +96,8 @@ if (USE_INTERNAL_ZLIB_LIBRARY)
     add_subdirectory (${INTERNAL_ZLIB_NAME})
     # We should use same defines when including zlib.h as used when zlib compiled
     target_compile_definitions (zlib PUBLIC ZLIB_COMPAT WITH_GZFILEOP)
-    if (TARGET zlibstatic)
-        target_compile_definitions (zlibstatic PUBLIC ZLIB_COMPAT WITH_GZFILEOP)
-    endif ()
     if (ARCH_AMD64 OR ARCH_AARCH64)
         target_compile_definitions (zlib PUBLIC X86_64 UNALIGNED_OK)
-        if (TARGET zlibstatic)
-            target_compile_definitions (zlibstatic PUBLIC X86_64 UNALIGNED_OK)
-        endif ()
     endif ()
 endif ()
diff --git a/contrib/zlib-ng b/contrib/zlib-ng
index 6fd1846c8b8..b82d3497a5a 160000
--- a/contrib/zlib-ng
+++ b/contrib/zlib-ng
@@ -1 +1 @@
-Subproject commit 6fd1846c8b8f59436fe2dd752d0f316ddbb64df6
+Subproject commit b82d3497a5afc46dec3c5d07e4b163b169f251d7
diff --git a/programs/server/config.xml b/programs/server/config.xml
index 9c01b328290..d75163ca907 100644
--- a/programs/server/config.xml
+++ b/programs/server/config.xml
@@ -7,7 +7,20 @@
 -->
-
+
         <level>trace</level>
         <log>/var/log/clickhouse-server/clickhouse-server.log</log>
         <errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
diff --git a/src/Common/Exception.cpp b/src/Common/Exception.cpp
index e8a98021588..dca19eea7f2 100644
--- a/src/Common/Exception.cpp
+++ b/src/Common/Exception.cpp
@@ -150,7 +150,7 @@ void tryLogCurrentException(Poco::Logger * logger, const std::string & start_of_
     ///
     /// And in this case the exception will not be logged, so let's block the
     /// MemoryTracker until the exception will be logged.
-    MemoryTracker::LockExceptionInThread lock_memory_tracker;
+    MemoryTracker::LockExceptionInThread lock_memory_tracker(VariableContext::Global);

     try
     {
diff --git a/src/Common/MemoryTracker.cpp b/src/Common/MemoryTracker.cpp
index a584885cf0f..60fb4d06b14 100644
--- a/src/Common/MemoryTracker.cpp
+++ b/src/Common/MemoryTracker.cpp
@@ -24,8 +24,8 @@ namespace
 ///
 /// - when it is explicitly blocked with LockExceptionInThread
 ///
-/// - to avoid std::terminate(), when stack unwinding is currently in progress
-///   in this thread.
+/// - when there are uncaught exception objects in the current thread
+///   (to avoid std::terminate())
 ///
 /// NOTE: that since C++11 destructor marked with noexcept by default, and
 /// this means that any throw from destructor (that is not marked with
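Context for the MemoryTracker change above: throwing from a destructor while another exception is already unwinding the stack ends in std::terminate(), which is exactly what the comment now says. A minimal sketch of the rule being described; std::uncaught_exceptions() is standard C++17 and the guard type here is hypothetical, not ClickHouse's API:

    #include <exception>

    // Hypothetical guard: skip throwing work when an exception is already in
    // flight, because a second throw during unwinding calls std::terminate().
    struct FlushOnScopeExit
    {
        ~FlushOnScopeExit()
        {
            if (std::uncaught_exceptions() > 0)
                return;      // unwinding in progress: stay silent
            // ... cleanup that may throw is safe here ...
        }
    };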
diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index 539e6c7780e..e4ace3a9451 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -252,8 +252,6 @@ class IColumn;
   * Almost all limits apply to each stream individually. \
   */ \
     \
-    M(UInt64, limit, 0, "Limit on read rows from the most 'end' result for select query, default 0 means no limit length", 0) \
-    M(UInt64, offset, 0, "Offset on read rows from the most 'end' result for select query", 0) \
     M(UInt64, max_rows_to_read, 0, "Limit on read rows from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it is only checked on a remote server.", 0) \
     M(UInt64, max_bytes_to_read, 0, "Limit on read bytes (after decompression) from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it is only checked on a remote server.", 0) \
     M(OverflowMode, read_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
@@ -464,8 +462,11 @@ class IColumn;
     \
     M(Bool, database_replicated_ddl_output, true, "Obsolete setting, does nothing. Will be removed after 2021-09-08", 0) \
     M(HandleKafkaErrorMode, handle_kafka_error_mode, HandleKafkaErrorMode::DEFAULT, "How to handle errors for Kafka engine. Passible values: default, stream.", 0) \
+    M(UInt64, limit, 0, "Limit on read rows from the most 'end' result for select query, default 0 means no limit length", 0) \
+    M(UInt64, offset, 0, "Offset on read rows from the most 'end' result for select query", 0) \
     M(Bool, allow_experimental_funnel_functions, true, "Enable experimental functions for funnel analysis.", 0) \
+    \

 // End of COMMON_SETTINGS
 // Please add settings related to formats into the FORMAT_FACTORY_SETTINGS below.
diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.cpp b/src/DataStreams/PushingToViewsBlockOutputStream.cpp
index 56100205b0d..16baf4377e0 100644
--- a/src/DataStreams/PushingToViewsBlockOutputStream.cpp
+++ b/src/DataStreams/PushingToViewsBlockOutputStream.cpp
@@ -311,7 +311,7 @@ void PushingToViewsBlockOutputStream::writeSuffix()
     UInt64 milliseconds = main_watch.elapsedMilliseconds();
     if (views.size() > 1)
     {
-        LOG_TRACE(log, "Pushing from {} to {} views took {} ms.",
+        LOG_DEBUG(log, "Pushing from {} to {} views took {} ms.",
             storage->getStorageID().getNameForLogs(), views.size(), milliseconds);
     }
diff --git a/src/DataStreams/copyData.cpp b/src/DataStreams/copyData.cpp
index a0651999034..a26052778a8 100644
--- a/src/DataStreams/copyData.cpp
+++ b/src/DataStreams/copyData.cpp
@@ -49,6 +49,16 @@ void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCall
     to.writeSuffix();
 }

+void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<void(const Block & block)> & progress,
+              std::atomic<bool> * is_cancelled)
+{
+    auto is_cancelled_pred = [is_cancelled] ()
+    {
+        return isAtomicSet(is_cancelled);
+    };
+
+    copyDataImpl(from, to, is_cancelled_pred, progress);
+}

 inline void doNothing(const Block &) {}
diff --git a/src/DataStreams/copyData.h b/src/DataStreams/copyData.h
index f2bce8f411b..3dc90aed37d 100644
--- a/src/DataStreams/copyData.h
+++ b/src/DataStreams/copyData.h
@@ -16,6 +16,9 @@ class Block;
  */
 void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled = nullptr);

+void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<void(const Block & block)> & progress,
+              std::atomic<bool> * is_cancelled = nullptr);
+
 void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled);

 void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled,
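The new copyData overload threads a per-block progress callback through copyDataImpl while keeping the atomic cancellation flag. A minimal usage sketch, the same shape StorageKafka::streamToViews adopts later in this patch:

    // Sketch against the ClickHouse stream types above: count rows while
    // copying; cancellation still flows through the atomic flag.
    void copyCountingRows(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> & is_cancelled)
    {
        size_t rows = 0;
        copyData(from, to,
            [&rows](const Block & block) { rows += block.rows(); },  // called once per block
            &is_cancelled);
        // `rows` now holds the total; StorageKafka logs exactly this number.
    }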
diff --git a/src/Dictionaries/DirectDictionary.cpp b/src/Dictionaries/DirectDictionary.cpp
index 0a3e685df1a..2038704414e 100644
--- a/src/Dictionaries/DirectDictionary.cpp
+++ b/src/Dictionaries/DirectDictionary.cpp
@@ -51,6 +51,14 @@ Columns DirectDictionary<dictionary_key_type>::getColumns(
     key_to_fetched_index.reserve(requested_keys.size());

     auto fetched_columns_from_storage = request.makeAttributesResultColumns();
+    for (size_t attribute_index = 0; attribute_index < request.attributesSize(); ++attribute_index)
+    {
+        if (!request.shouldFillResultColumnWithIndex(attribute_index))
+            continue;
+
+        auto & fetched_column_from_storage = fetched_columns_from_storage[attribute_index];
+        fetched_column_from_storage->reserve(requested_keys.size());
+    }

     size_t fetched_key_index = 0;
diff --git a/src/Disks/S3/registerDiskS3.cpp b/src/Disks/S3/registerDiskS3.cpp
index a15b6bcf822..7e3fbf27c6d 100644
--- a/src/Disks/S3/registerDiskS3.cpp
+++ b/src/Disks/S3/registerDiskS3.cpp
@@ -122,7 +122,7 @@ void registerDiskS3(DiskFactory & factory)
             throw Exception("S3 path must ends with '/', but '" + uri.key + "' doesn't.", ErrorCodes::BAD_ARGUMENTS);

         client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 10000);
-        client_configuration.httpRequestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 5000);
+        client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 5000);
         client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100);
         client_configuration.endpointOverride = uri.endpoint;
diff --git a/src/Functions/CMakeLists.txt b/src/Functions/CMakeLists.txt
index 1c3beb2e47d..7cbca175c0d 100644
--- a/src/Functions/CMakeLists.txt
+++ b/src/Functions/CMakeLists.txt
@@ -1,5 +1,7 @@
 configure_file(config_functions.h.in ${ConfigIncludePath}/config_functions.h)

+add_subdirectory(divide)
+
 include(${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
 add_headers_and_sources(clickhouse_functions .)
@@ -25,7 +27,7 @@ target_link_libraries(clickhouse_functions
     PRIVATE
         ${ZLIB_LIBRARIES}
         boost::filesystem
-        libdivide
+        divide_impl
 )

 if (OPENSSL_CRYPTO_LIBRARY)
diff --git a/src/Functions/divide/CMakeLists.txt b/src/Functions/divide/CMakeLists.txt
new file mode 100644
index 00000000000..e5a10f0817c
--- /dev/null
+++ b/src/Functions/divide/CMakeLists.txt
@@ -0,0 +1,22 @@
+# A library for integer division by constant with CPU dispatching.
+
+if (ARCH_AMD64)
+    add_library(divide_impl_sse2 divideImpl.cpp)
+    target_compile_options(divide_impl_sse2 PRIVATE -msse2 -DNAMESPACE=SSE2)
+    target_link_libraries(divide_impl_sse2 libdivide)
+
+    add_library(divide_impl_avx2 divideImpl.cpp)
+    target_compile_options(divide_impl_avx2 PRIVATE -mavx2 -DNAMESPACE=AVX2)
+    target_link_libraries(divide_impl_avx2 libdivide)
+
+    set(IMPLEMENTATIONS divide_impl_sse2 divide_impl_avx2)
+else ()
+    add_library(divide_impl_generic divideImpl.cpp)
+    target_compile_options(divide_impl_generic PRIVATE -DNAMESPACE=Generic)
+    target_link_libraries(divide_impl_generic libdivide)
+
+    set(IMPLEMENTATIONS divide_impl_generic)
+endif ()
+
+add_library(divide_impl divide.cpp)
+target_link_libraries(divide_impl ${IMPLEMENTATIONS} clickhouse_common_io)
diff --git a/src/Functions/divide/divide.cpp b/src/Functions/divide/divide.cpp
new file mode 100644
index 00000000000..5ab11df2a65
--- /dev/null
+++ b/src/Functions/divide/divide.cpp
@@ -0,0 +1,57 @@
+#include "divide.h"
+#include <Common/CpuId.h>
+
+#if defined(__x86_64__) && !defined(ARCADIA_BUILD)
+namespace SSE2
+{
+    template <typename A, typename B, typename ResultType>
+    void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size);
+}
+
+namespace AVX2
+{
+    template <typename A, typename B, typename ResultType>
+    void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size);
+}
+#else
+namespace Generic
+{
+    template <typename A, typename B, typename ResultType>
+    void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size);
+}
+#endif
+
+
+template <typename A, typename B, typename ResultType>
+void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size)
+{
+#if defined(__x86_64__) && !defined(ARCADIA_BUILD)
+    if (DB::Cpu::CpuFlagsCache::have_AVX2)
+        AVX2::divideImpl(a_pos, b, c_pos, size);
+    else if (DB::Cpu::CpuFlagsCache::have_SSE2)
+        SSE2::divideImpl(a_pos, b, c_pos, size);
+#else
+    Generic::divideImpl(a_pos, b, c_pos, size);
+#endif
+}
+
+
+template void divideImpl<uint64_t, uint64_t, uint64_t>(const uint64_t * __restrict, uint64_t, uint64_t * __restrict, size_t);
+template void divideImpl<uint64_t, uint32_t, uint64_t>(const uint64_t * __restrict, uint32_t, uint64_t * __restrict, size_t);
+template void divideImpl<uint64_t, uint16_t, uint64_t>(const uint64_t * __restrict, uint16_t, uint64_t * __restrict, size_t);
+template void divideImpl<uint64_t, char8_t, uint64_t>(const uint64_t * __restrict, char8_t, uint64_t * __restrict, size_t);
+
+template void divideImpl<uint32_t, uint64_t, uint32_t>(const uint32_t * __restrict, uint64_t, uint32_t * __restrict, size_t);
+template void divideImpl<uint32_t, uint32_t, uint32_t>(const uint32_t * __restrict, uint32_t, uint32_t * __restrict, size_t);
+template void divideImpl<uint32_t, uint16_t, uint32_t>(const uint32_t * __restrict, uint16_t, uint32_t * __restrict, size_t);
+template void divideImpl<uint32_t, char8_t, uint32_t>(const uint32_t * __restrict, char8_t, uint32_t * __restrict, size_t);
+
+template void divideImpl<int64_t, int64_t, int64_t>(const int64_t * __restrict, int64_t, int64_t * __restrict, size_t);
+template void divideImpl<int64_t, int32_t, int64_t>(const int64_t * __restrict, int32_t, int64_t * __restrict, size_t);
+template void divideImpl<int64_t, int16_t, int64_t>(const int64_t * __restrict, int16_t, int64_t * __restrict, size_t);
+template void divideImpl<int64_t, int8_t, int64_t>(const int64_t * __restrict, int8_t, int64_t * __restrict, size_t);
+
+template void divideImpl<int32_t, int64_t, int32_t>(const int32_t * __restrict, int64_t, int32_t * __restrict, size_t);
+template void divideImpl<int32_t, int32_t, int32_t>(const int32_t * __restrict, int32_t, int32_t * __restrict, size_t);
+template void divideImpl<int32_t, int16_t, int32_t>(const int32_t * __restrict, int16_t, int32_t * __restrict, size_t);
+template void divideImpl<int32_t, int8_t, int32_t>(const int32_t * __restrict, int8_t, int32_t * __restrict, size_t);
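The block of `template void divideImpl<...>` lines above is the explicit-instantiation idiom: the template definition lives in exactly one translation unit and only a fixed matrix of type combinations is emitted, so the header never leaks the implementation. A minimal sketch of the pattern with hypothetical names, not part of the patch:

    // In the header: declaration only, so other TUs do not instantiate it.
    template <typename T>
    T twice(T x);

    // In one .cpp: the definition plus the instantiations we promise to provide.
    template <typename T>
    T twice(T x) { return x + x; }

    template int twice<int>(int);          // symbol emitted here
    template double twice<double>(double); // and linked everywhere else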
diff --git a/src/Functions/divide/divide.h b/src/Functions/divide/divide.h
new file mode 100644
index 00000000000..1c17a461159
--- /dev/null
+++ b/src/Functions/divide/divide.h
@@ -0,0 +1,6 @@
+#pragma once
+
+#include <cstddef>
+
+template <typename A, typename B, typename ResultType>
+extern void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size);
diff --git a/src/Functions/divide/divideImpl.cpp b/src/Functions/divide/divideImpl.cpp
new file mode 100644
index 00000000000..f4c1a97d3ad
--- /dev/null
+++ b/src/Functions/divide/divideImpl.cpp
@@ -0,0 +1,79 @@
+/// This translation unit should be compiled multiple times
+/// with different values of NAMESPACE and machine flags (sse2, avx2).
+
+#if !defined(NAMESPACE)
+    #if defined(ARCADIA_BUILD)
+        #define NAMESPACE Generic
+    #else
+        #error "NAMESPACE macro must be defined"
+    #endif
+#endif
+
+#if defined(__AVX2__)
+    #define REG_SIZE 32
+    #define LIBDIVIDE_AVX2
+#elif defined(__SSE2__)
+    #define REG_SIZE 16
+    #define LIBDIVIDE_SSE2
+#endif
+
+#include <libdivide.h>
+
+
+namespace NAMESPACE
+{
+
+template <typename A, typename B, typename ResultType>
+void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size)
+{
+    libdivide::divider<A> divider(b);
+    const A * a_end = a_pos + size;
+
+#if defined(__SSE2__)
+    static constexpr size_t values_per_simd_register = REG_SIZE / sizeof(A);
+    const A * a_end_simd = a_pos + size / values_per_simd_register * values_per_simd_register;
+
+    while (a_pos < a_end_simd)
+    {
+#if defined(__AVX2__)
+        _mm256_storeu_si256(reinterpret_cast<__m256i *>(c_pos),
+            _mm256_loadu_si256(reinterpret_cast<const __m256i *>(a_pos)) / divider);
+#else
+        _mm_storeu_si128(reinterpret_cast<__m128i *>(c_pos),
+            _mm_loadu_si128(reinterpret_cast<const __m128i *>(a_pos)) / divider);
+#endif
+
+        a_pos += values_per_simd_register;
+        c_pos += values_per_simd_register;
+    }
+#endif
+
+    while (a_pos < a_end)
+    {
+        *c_pos = *a_pos / divider;
+        ++a_pos;
+        ++c_pos;
+    }
+}
+
+template void divideImpl<uint64_t, uint64_t, uint64_t>(const uint64_t * __restrict, uint64_t, uint64_t * __restrict, size_t);
+template void divideImpl<uint64_t, uint32_t, uint64_t>(const uint64_t * __restrict, uint32_t, uint64_t * __restrict, size_t);
+template void divideImpl<uint64_t, uint16_t, uint64_t>(const uint64_t * __restrict, uint16_t, uint64_t * __restrict, size_t);
+template void divideImpl<uint64_t, char8_t, uint64_t>(const uint64_t * __restrict, char8_t, uint64_t * __restrict, size_t);
+
+template void divideImpl<uint32_t, uint64_t, uint32_t>(const uint32_t * __restrict, uint64_t, uint32_t * __restrict, size_t);
+template void divideImpl<uint32_t, uint32_t, uint32_t>(const uint32_t * __restrict, uint32_t, uint32_t * __restrict, size_t);
+template void divideImpl<uint32_t, uint16_t, uint32_t>(const uint32_t * __restrict, uint16_t, uint32_t * __restrict, size_t);
+template void divideImpl<uint32_t, char8_t, uint32_t>(const uint32_t * __restrict, char8_t, uint32_t * __restrict, size_t);
+
+template void divideImpl<int64_t, int64_t, int64_t>(const int64_t * __restrict, int64_t, int64_t * __restrict, size_t);
+template void divideImpl<int64_t, int32_t, int64_t>(const int64_t * __restrict, int32_t, int64_t * __restrict, size_t);
+template void divideImpl<int64_t, int16_t, int64_t>(const int64_t * __restrict, int16_t, int64_t * __restrict, size_t);
+template void divideImpl<int64_t, int8_t, int64_t>(const int64_t * __restrict, int8_t, int64_t * __restrict, size_t);
+
+template void divideImpl<int32_t, int64_t, int32_t>(const int32_t * __restrict, int64_t, int32_t * __restrict, size_t);
+template void divideImpl<int32_t, int32_t, int32_t>(const int32_t * __restrict, int32_t, int32_t * __restrict, size_t);
+template void divideImpl<int32_t, int16_t, int32_t>(const int32_t * __restrict, int16_t, int32_t * __restrict, size_t);
+template void divideImpl<int32_t, int8_t, int32_t>(const int32_t * __restrict, int8_t, int32_t * __restrict, size_t);
+
+}
diff --git a/src/Functions/intDiv.cpp b/src/Functions/intDiv.cpp
index 804696f2776..79e35a19283 100644
--- a/src/Functions/intDiv.cpp
+++ b/src/Functions/intDiv.cpp
@@ -1,11 +1,7 @@
 #include <Functions/FunctionFactory.h>
 #include <Functions/FunctionBinaryArithmetic.h>

-#if defined(__SSE2__)
-#    define LIBDIVIDE_SSE2 1
-#endif
-
-#include <libdivide.h>
+#include "divide/divide.h"

 namespace DB
 {
@@ -70,34 +66,11 @@ struct DivideIntegralByConstantImpl
         if (unlikely(static_cast<A>(b) == 0))
             throw Exception("Division by zero", ErrorCodes::ILLEGAL_DIVISION);

-        libdivide::divider<A> divider(b);
-
-        const A * a_end = a_pos + size;
-
-#if defined(__SSE2__)
-        static constexpr size_t values_per_sse_register = 16 / sizeof(A);
-        const A * a_end_sse = a_pos + size / values_per_sse_register * values_per_sse_register;
-
-        while (a_pos < a_end_sse)
-        {
-            _mm_storeu_si128(reinterpret_cast<__m128i *>(c_pos),
-                _mm_loadu_si128(reinterpret_cast<const __m128i *>(a_pos)) / divider);
-
-            a_pos += values_per_sse_register;
-            c_pos += values_per_sse_register;
-        }
-#endif
-
-        while (a_pos < a_end)
-        {
-            *c_pos = *a_pos / divider;
-            ++a_pos;
-            ++c_pos;
-        }
+        divideImpl(a_pos, b, c_pos, size);
     }
 };

-/** Specializations are specified for dividing numbers of the type UInt64 and UInt32 by the numbers of the same sign.
+/** Specializations are specified for dividing numbers of the type UInt64, UInt32, Int64, Int32 by the numbers of the same sign.
  * Can be expanded to all possible combinations, but more code is needed.
  */
diff --git a/src/Functions/modulo.cpp b/src/Functions/modulo.cpp
index fe215851bb6..6ac0c82bd3e 100644
--- a/src/Functions/modulo.cpp
+++ b/src/Functions/modulo.cpp
@@ -78,12 +78,11 @@ struct ModuloByConstantImpl
         if (b < 0)
             b = -b;

-        libdivide::divider<A> divider(b);
-
         /// Here we failed to make the SSE variant from libdivide give an advantage.
         if (b & (b - 1))
         {
+            libdivide::divider<A> divider(b);
             for (size_t i = 0; i < size; ++i)
                 dst[i] = src[i] - (src[i] / divider) * b; /// NOTE: perhaps, the division semantics with the remainder of negative numbers is not preserved.
         }
diff --git a/src/Functions/ya.make b/src/Functions/ya.make
index 52ed54ec64f..660f7b115bf 100644
--- a/src/Functions/ya.make
+++ b/src/Functions/ya.make
@@ -229,6 +229,8 @@ SRCS(
     defaultValueOfTypeName.cpp
     demange.cpp
     divide.cpp
+    divide/divide.cpp
+    divide/divideImpl.cpp
     dumpColumnStructure.cpp
     e.cpp
     empty.cpp
diff --git a/src/IO/BrotliWriteBuffer.cpp b/src/IO/BrotliWriteBuffer.cpp
index 512ed5fc93f..3d0afa86360 100644
--- a/src/IO/BrotliWriteBuffer.cpp
+++ b/src/IO/BrotliWriteBuffer.cpp
@@ -50,7 +50,7 @@ BrotliWriteBuffer::BrotliWriteBuffer(std::unique_ptr<WriteBuffer> out_, int comp
 BrotliWriteBuffer::~BrotliWriteBuffer()
 {
     /// FIXME move final flush into the caller
-    MemoryTracker::LockExceptionInThread lock;
+    MemoryTracker::LockExceptionInThread lock(VariableContext::Global);

     finish();
 }
diff --git a/src/IO/LZMADeflatingWriteBuffer.cpp b/src/IO/LZMADeflatingWriteBuffer.cpp
index 7ea4f7945dc..29cde872241 100644
--- a/src/IO/LZMADeflatingWriteBuffer.cpp
+++ b/src/IO/LZMADeflatingWriteBuffer.cpp
@@ -50,7 +50,7 @@ LZMADeflatingWriteBuffer::LZMADeflatingWriteBuffer(
 LZMADeflatingWriteBuffer::~LZMADeflatingWriteBuffer()
 {
     /// FIXME move final flush into the caller
-    MemoryTracker::LockExceptionInThread lock;
+    MemoryTracker::LockExceptionInThread lock(VariableContext::Global);

     finish();

     lzma_end(&lstr);
diff --git a/src/IO/S3/PocoHTTPClient.cpp b/src/IO/S3/PocoHTTPClient.cpp
index 6bd6d0d36ba..6df8d5ec5ad 100644
--- a/src/IO/S3/PocoHTTPClient.cpp
+++ b/src/IO/S3/PocoHTTPClient.cpp
@@ -81,8 +81,8 @@ PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & clientConfigu
     : per_request_configuration(clientConfiguration.perRequestConfiguration)
     , timeouts(ConnectionTimeouts(
           Poco::Timespan(clientConfiguration.connectTimeoutMs * 1000), /// connection timeout.
-          Poco::Timespan(clientConfiguration.httpRequestTimeoutMs * 1000), /// send timeout.
-          Poco::Timespan(clientConfiguration.httpRequestTimeoutMs * 1000) /// receive timeout.
+          Poco::Timespan(clientConfiguration.requestTimeoutMs * 1000), /// send timeout.
+          Poco::Timespan(clientConfiguration.requestTimeoutMs * 1000) /// receive timeout.
           ))
     , remote_host_filter(clientConfiguration.remote_host_filter)
     , s3_max_redirects(clientConfiguration.s3_max_redirects)
diff --git a/src/IO/WriteBufferFromFile.cpp b/src/IO/WriteBufferFromFile.cpp
index b3a63842326..67cd7ba27d6 100644
--- a/src/IO/WriteBufferFromFile.cpp
+++ b/src/IO/WriteBufferFromFile.cpp
@@ -79,7 +79,7 @@ WriteBufferFromFile::~WriteBufferFromFile()
         return;

     /// FIXME move final flush into the caller
-    MemoryTracker::LockExceptionInThread lock;
+    MemoryTracker::LockExceptionInThread lock(VariableContext::Global);

     next();
diff --git a/src/IO/WriteBufferFromFileDescriptor.cpp b/src/IO/WriteBufferFromFileDescriptor.cpp
index bfd874ee396..cd265653bb9 100644
--- a/src/IO/WriteBufferFromFileDescriptor.cpp
+++ b/src/IO/WriteBufferFromFileDescriptor.cpp
@@ -98,7 +98,7 @@ WriteBufferFromFileDescriptor::~WriteBufferFromFileDescriptor()
     }

     /// FIXME move final flush into the caller
-    MemoryTracker::LockExceptionInThread lock;
+    MemoryTracker::LockExceptionInThread lock(VariableContext::Global);

     next();
 }
diff --git a/src/IO/WriteBufferFromOStream.cpp b/src/IO/WriteBufferFromOStream.cpp
index cf731934c93..36fbf8301c1 100644
--- a/src/IO/WriteBufferFromOStream.cpp
+++ b/src/IO/WriteBufferFromOStream.cpp
@@ -43,7 +43,7 @@ WriteBufferFromOStream::WriteBufferFromOStream(
 WriteBufferFromOStream::~WriteBufferFromOStream()
 {
     /// FIXME move final flush into the caller
-    MemoryTracker::LockExceptionInThread lock;
+    MemoryTracker::LockExceptionInThread lock(VariableContext::Global);

     next();
 }
diff --git a/src/IO/WriteBufferFromPocoSocket.cpp b/src/IO/WriteBufferFromPocoSocket.cpp
index 45f6e96218a..7338bcabdb5 100644
--- a/src/IO/WriteBufferFromPocoSocket.cpp
+++ b/src/IO/WriteBufferFromPocoSocket.cpp
@@ -73,7 +73,7 @@ WriteBufferFromPocoSocket::WriteBufferFromPocoSocket(Poco::Net::Socket & socket_
 WriteBufferFromPocoSocket::~WriteBufferFromPocoSocket()
 {
     /// FIXME move final flush into the caller
-    MemoryTracker::LockExceptionInThread lock;
+    MemoryTracker::LockExceptionInThread lock(VariableContext::Global);

     next();
 }
diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp
index 93aaf9456b5..a63e00efce4 100644
--- a/src/IO/WriteBufferFromS3.cpp
+++ b/src/IO/WriteBufferFromS3.cpp
@@ -87,7 +87,7 @@ void WriteBufferFromS3::allocateBuffer()
 void WriteBufferFromS3::finalize()
 {
     /// FIXME move final flush into the caller
-    MemoryTracker::LockExceptionInThread lock;
+    MemoryTracker::LockExceptionInThread lock(VariableContext::Global);

     finalizeImpl();
 }
diff --git a/src/IO/WriteBufferFromVector.h b/src/IO/WriteBufferFromVector.h
index 6341a9b698b..0e0cd3c522b 100644
--- a/src/IO/WriteBufferFromVector.h
+++ b/src/IO/WriteBufferFromVector.h
@@ -95,7 +95,7 @@ public:
     ~WriteBufferFromVector() override
     {
         /// FIXME move final flush into the caller
-        MemoryTracker::LockExceptionInThread lock;
+        MemoryTracker::LockExceptionInThread lock(VariableContext::Global);

         finalize();
     }
 };
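All of the IO-buffer hunks above and below apply one identical pattern: the final flush still happens in the destructor (hence the repeated FIXME), the flush can allocate, and an allocation during stack unwinding must not let the MemoryTracker throw. Condensed into a hypothetical class, the shape every hunk converges on:

    // Hypothetical buffer, condensed from the hunks in this patch.
    ~SomeWriteBuffer()
    {
        /// FIXME move final flush into the caller
        MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
        next();  // final flush; may allocate, must not throw MEMORY_LIMIT_EXCEEDED
    }

The change in this patch is only the explicit VariableContext::Global argument, so the lock now blocks tracker exceptions at every tracker level, not just the thread-local one.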
diff --git a/src/IO/WriteBufferValidUTF8.cpp b/src/IO/WriteBufferValidUTF8.cpp
index 1071ac1078d..ecdf38eae34 100644
--- a/src/IO/WriteBufferValidUTF8.cpp
+++ b/src/IO/WriteBufferValidUTF8.cpp
@@ -138,7 +138,7 @@ void WriteBufferValidUTF8::finish()
 WriteBufferValidUTF8::~WriteBufferValidUTF8()
 {
     /// FIXME move final flush into the caller
-    MemoryTracker::LockExceptionInThread lock;
+    MemoryTracker::LockExceptionInThread lock(VariableContext::Global);

     finish();
 }
diff --git a/src/IO/ZlibDeflatingWriteBuffer.cpp b/src/IO/ZlibDeflatingWriteBuffer.cpp
index 7e91820f298..3cf00f627b3 100644
--- a/src/IO/ZlibDeflatingWriteBuffer.cpp
+++ b/src/IO/ZlibDeflatingWriteBuffer.cpp
@@ -49,7 +49,7 @@ ZlibDeflatingWriteBuffer::ZlibDeflatingWriteBuffer(
 ZlibDeflatingWriteBuffer::~ZlibDeflatingWriteBuffer()
 {
     /// FIXME move final flush into the caller
-    MemoryTracker::LockExceptionInThread lock;
+    MemoryTracker::LockExceptionInThread lock(VariableContext::Global);

     finish();
diff --git a/src/IO/ZstdDeflatingWriteBuffer.cpp b/src/IO/ZstdDeflatingWriteBuffer.cpp
index 5b97588b33e..08b289a2e04 100644
--- a/src/IO/ZstdDeflatingWriteBuffer.cpp
+++ b/src/IO/ZstdDeflatingWriteBuffer.cpp
@@ -31,7 +31,7 @@ ZstdDeflatingWriteBuffer::ZstdDeflatingWriteBuffer(
 ZstdDeflatingWriteBuffer::~ZstdDeflatingWriteBuffer()
 {
     /// FIXME move final flush into the caller
-    MemoryTracker::LockExceptionInThread lock;
+    MemoryTracker::LockExceptionInThread lock(VariableContext::Global);

     finish();
diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp
index 6081f06b25f..1c023f757f8 100644
--- a/src/Interpreters/DDLWorker.cpp
+++ b/src/Interpreters/DDLWorker.cpp
@@ -372,7 +372,20 @@ void DDLWorker::scheduleTasks(bool reinitialized)
     }

     Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, queue_updated_event);
+    size_t size_before_filtering = queue_nodes.size();
     filterAndSortQueueNodes(queue_nodes);
+    /// The following message is too verbose, but it can be useful to debug mysterious test failures in CI
+    LOG_TRACE(log, "scheduleTasks: initialized={}, size_before_filtering={}, queue_size={}, "
+                   "entries={}..{}, "
+                   "first_failed_task_name={}, current_tasks_size={}, "
+                   "last_current_task={}, "
+                   "last_skipped_entry_name={}",
+              initialized, size_before_filtering, queue_nodes.size(),
+              queue_nodes.empty() ? "none" : queue_nodes.front(), queue_nodes.empty() ? "none" : queue_nodes.back(),
+              first_failed_task_name ? *first_failed_task_name : "none", current_tasks.size(),
+              current_tasks.empty() ? "none" : current_tasks.back()->entry_name,
+              last_skipped_entry_name ? *last_skipped_entry_name : "none");
+
     if (max_tasks_in_queue < queue_nodes.size())
         cleanup_event->set();
diff --git a/src/Interpreters/ThreadStatusExt.cpp b/src/Interpreters/ThreadStatusExt.cpp
index ec6bdb8d526..c04534e11a1 100644
--- a/src/Interpreters/ThreadStatusExt.cpp
+++ b/src/Interpreters/ThreadStatusExt.cpp
@@ -323,7 +323,7 @@ void ThreadStatus::finalizeQueryProfiler()

 void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
 {
-    MemoryTracker::LockExceptionInThread lock;
+    MemoryTracker::LockExceptionInThread lock(VariableContext::Global);

     if (exit_if_already_detached && thread_state == ThreadState::DetachedFromQuery)
     {
diff --git a/src/Processors/Formats/Impl/MarkdownRowOutputFormat.cpp b/src/Processors/Formats/Impl/MarkdownRowOutputFormat.cpp
index 5108650ff0d..ee5d4193a45 100644
--- a/src/Processors/Formats/Impl/MarkdownRowOutputFormat.cpp
+++ b/src/Processors/Formats/Impl/MarkdownRowOutputFormat.cpp
@@ -21,16 +21,13 @@ void MarkdownRowOutputFormat::writePrefix()
     }
     writeCString("\n|", out);
     String left_alignment = ":-|";
-    String central_alignment = ":-:|";
     String right_alignment = "-:|";
     for (size_t i = 0; i < columns; ++i)
     {
-        if (isInteger(types[i]))
+        if (types[i]->shouldAlignRightInPrettyFormats())
             writeString(right_alignment, out);
-        else if (isString(types[i]))
-            writeString(left_alignment, out);
         else
-            writeString(central_alignment, out);
+            writeString(left_alignment, out);
     }
     writeChar('\n', out);
 }
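In Markdown table syntax, a separator cell of `-:|` right-aligns the column and `:-|` left-aligns it. The rewrite above drops the per-type special cases (integer right, string left, everything else centered) and asks the data type itself, so Decimal, dates and other numeric-like types line up the same way they do in the Pretty formats. The resulting loop, with an illustrative schema:

    // Illustrative: the separator row now depends on a single predicate.
    for (size_t i = 0; i < columns; ++i)
        writeString(types[i]->shouldAlignRightInPrettyFormats() ? "-:|" : ":-|", out);
    // e.g. (id UInt32, name String, decimal Decimal32(6)) yields: |-:|:-|-:|

The new 01231_markdown_format reference file further down shows exactly this behavior.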
diff --git a/src/Server/HTTP/WriteBufferFromHTTPServerResponse.cpp b/src/Server/HTTP/WriteBufferFromHTTPServerResponse.cpp
index 355af038da9..a4fe3649e6f 100644
--- a/src/Server/HTTP/WriteBufferFromHTTPServerResponse.cpp
+++ b/src/Server/HTTP/WriteBufferFromHTTPServerResponse.cpp
@@ -196,7 +196,7 @@ void WriteBufferFromHTTPServerResponse::finalize()
 WriteBufferFromHTTPServerResponse::~WriteBufferFromHTTPServerResponse()
 {
     /// FIXME move final flush into the caller
-    MemoryTracker::LockExceptionInThread lock;
+    MemoryTracker::LockExceptionInThread lock(VariableContext::Global);

     finalize();
 }
diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp
index e2b5ce7c325..15dd5b553b0 100644
--- a/src/Storages/Kafka/StorageKafka.cpp
+++ b/src/Storages/Kafka/StorageKafka.cpp
@@ -28,6 +28,7 @@
 #include
 #include
 #include
+#include <Common/Stopwatch.h>
 #include
 #include
 #include
@@ -585,6 +586,8 @@ void StorageKafka::threadFunc(size_t idx)

 bool StorageKafka::streamToViews()
 {
+    Stopwatch watch;
+
     auto table_id = getStorageID();
     auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
     if (!table)
@@ -637,7 +640,11 @@ bool StorageKafka::streamToViews()
     // We can't cancel during copyData, as it's not aware of commits and other kafka-related stuff.
     // It will be cancelled on underlying layer (kafka buffer)
     std::atomic<bool> stub = {false};
-    copyData(*in, *block_io.out, &stub);
+    size_t rows = 0;
+    copyData(*in, *block_io.out, [&rows](const Block & block)
+    {
+        rows += block.rows();
+    }, &stub);

     bool some_stream_is_stalled = false;
     for (auto & stream : streams)
     {
         stream->as<KafkaBlockInputStream>()->commit();
     }

+    UInt64 milliseconds = watch.elapsedMilliseconds();
+    LOG_DEBUG(log, "Pushing {} rows to {} took {} ms.",
+        formatReadableQuantity(rows), table_id.getNameForLogs(), milliseconds);
+
     return some_stream_is_stalled;
 }
diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
index 792a77d5e1a..502c6215a9a 100644
--- a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
+++ b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
@@ -342,6 +342,15 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
         timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
     auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold);

+    auto num_nodes_to_delete = timed_blocks.end() - first_outdated_block;
+    if (!num_nodes_to_delete)
+        return;
+
+    auto last_outdated_block = timed_blocks.end() - 1;
+    LOG_TRACE(log, "Will clear {} old blocks from {} (ctime {}) to {} (ctime {})", num_nodes_to_delete,
+              first_outdated_block->node, first_outdated_block->ctime,
+              last_outdated_block->node, last_outdated_block->ctime);
+
     zkutil::AsyncResponses<Coordination::RemoveResponse> try_remove_futures;
     for (auto it = first_outdated_block; it != timed_blocks.end(); ++it)
     {
@@ -372,9 +381,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
         first_outdated_block++;
     }

-    auto num_nodes_to_delete = timed_blocks.end() - first_outdated_block;
-    if (num_nodes_to_delete)
-        LOG_TRACE(log, "Cleared {} old blocks from ZooKeeper", num_nodes_to_delete);
+    LOG_TRACE(log, "Cleared {} old blocks from ZooKeeper", num_nodes_to_delete);
 }
diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp
index 5c82e5378e8..3d96796a79b 100644
--- a/src/Storages/StorageDistributed.cpp
+++ b/src/Storages/StorageDistributed.cpp
@@ -469,7 +469,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(

     /// Always calculate optimized cluster here, to avoid conditions during read()
     /// (Anyway it will be calculated in the read())
-    if (settings.optimize_skip_unused_shards)
+    if (getClusterQueriedNodes(settings, cluster) > 1 && settings.optimize_skip_unused_shards)
     {
         ClusterPtr optimized_cluster = getOptimizedCluster(local_context, metadata_snapshot, query_info.query);
         if (optimized_cluster)
diff --git a/tests/integration/test_merge_tree_s3/test.py b/tests/integration/test_merge_tree_s3/test.py
index 3ab8d5d006b..4b685542170 100644
--- a/tests/integration/test_merge_tree_s3/test.py
+++ b/tests/integration/test_merge_tree_s3/test.py
@@ -68,6 +68,16 @@ def create_table(cluster, table_name, additional_settings=None):
     node.query(create_table_statement)


+def wait_for_delete_s3_objects(cluster, expected, timeout=30):
+    minio = cluster.minio_client
+    while timeout > 0:
+        if len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == expected:
+            return
+        timeout -= 1
+        time.sleep(1)
+    assert(len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == expected)
+
+
 @pytest.fixture(autouse=True)
 def drop_table(cluster):
     yield
@@ -75,8 +85,9 @@ def drop_table(cluster):
     minio = cluster.minio_client

     node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
+
     try:
-        assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 0
+        wait_for_delete_s3_objects(cluster, 0)
     finally:
         # Remove extra objects to prevent tests cascade failing
         for obj in list(minio.list_objects(cluster.minio_bucket, 'data/')):
@@ -151,7 +162,7 @@ def test_insert_same_partition_and_merge(cluster, merge_vertical):
     assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
     assert node.query("SELECT count(distinct(id)) FROM s3_test FORMAT Values") == "(8192)"
-    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD
+    wait_for_delete_s3_objects(cluster, FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD)


 def test_alter_table_columns(cluster):
@@ -167,32 +178,20 @@ def test_alter_table_columns(cluster):
     # To ensure parts have merged
     node.query("OPTIMIZE TABLE s3_test")

-    # Wait for merges, mutations and old parts deletion
-    time.sleep(3)
-
     assert node.query("SELECT sum(col1) FROM s3_test FORMAT Values") == "(8192)"
     assert node.query("SELECT sum(col1) FROM s3_test WHERE id > 0 FORMAT Values") == "(4096)"
-    assert len(list(minio.list_objects(cluster.minio_bucket,
-                                       'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD_PER_COLUMN
+    wait_for_delete_s3_objects(cluster, FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD_PER_COLUMN)

     node.query("ALTER TABLE s3_test MODIFY COLUMN col1 String", settings={"mutations_sync": 2})

-    # Wait for old parts deletion
-    time.sleep(3)
-
     assert node.query("SELECT distinct(col1) FROM s3_test FORMAT Values") == "('1')"
     # and file with mutation
-    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == (
-            FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD_PER_COLUMN + 1)
+    wait_for_delete_s3_objects(cluster, FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD_PER_COLUMN + 1)

     node.query("ALTER TABLE s3_test DROP COLUMN col1", settings={"mutations_sync": 2})

-    # Wait for old parts deletion
-    time.sleep(3)
-
     # and 2 files with mutations
-    assert len(
-        list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + 2
+    wait_for_delete_s3_objects(cluster, FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + 2)


 def test_attach_detach_partition(cluster):
@@ -320,9 +319,7 @@ def test_move_replace_partition_to_another_table(cluster):
     assert node.query("SELECT count(*) FROM s3_clone FORMAT Values") == "(8192)"

     # Wait for outdated partitions deletion.
-    time.sleep(3)
-    assert len(list(
-        minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 4
+    wait_for_delete_s3_objects(cluster, FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 4)

     node.query("DROP TABLE s3_clone NO DELAY")
     assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
@@ -338,7 +335,8 @@ def test_move_replace_partition_to_another_table(cluster):
     node.query("DROP TABLE s3_test NO DELAY")
     # Backup data should remain in S3.
-    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD_PER_PART_WIDE * 4
+
+    wait_for_delete_s3_objects(cluster, FILES_OVERHEAD_PER_PART_WIDE * 4)

     for obj in list(minio.list_objects(cluster.minio_bucket, 'data/')):
         minio.remove_object(cluster.minio_bucket, obj.object_name)
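The fixed time.sleep(3) calls are replaced by wait_for_delete_s3_objects because part deletion in S3 is asynchronous: the robust tool is a bounded poll, not a fixed delay. The same pattern expressed in C++, as a hypothetical helper shown only for the shape of the technique:

    #include <chrono>
    #include <functional>
    #include <thread>

    // Hypothetical analogue of wait_for_delete_s3_objects: poll once per second
    // until the condition holds or the deadline expires, then check once more so
    // a failure surfaces as an assertion rather than a silent timeout.
    bool waitUntil(const std::function<bool()> & cond, std::chrono::seconds timeout)
    {
        const auto deadline = std::chrono::steady_clock::now() + timeout;
        while (std::chrono::steady_clock::now() < deadline)
        {
            if (cond())
                return true;
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
        return cond();
    }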
diff --git a/tests/msan_suppressions.txt b/tests/msan_suppressions.txt
index 4c7aeaf4a4c..cf468b0be96 100644
--- a/tests/msan_suppressions.txt
+++ b/tests/msan_suppressions.txt
@@ -7,7 +7,6 @@ fun:tolower

 # Suppress some failures in contrib so that we can enable MSan in CI.
 # Ideally, we should report these upstream.
-src:*/contrib/zlib-ng/*

 # Hyperscan
 fun:roseRunProgram
diff --git a/tests/performance/direct_dictionary.xml b/tests/performance/direct_dictionary.xml
index e827ea0a76f..3f01449ed99 100644
--- a/tests/performance/direct_dictionary.xml
+++ b/tests/performance/direct_dictionary.xml
@@ -55,14 +55,14 @@
         INSERT INTO simple_key_direct_dictionary_source_table
         SELECT number, number, toString(number), toDecimal64(number, 8), toString(number)
         FROM system.numbers
-        LIMIT 100000;
+        LIMIT 50000;

         INSERT INTO complex_key_direct_dictionary_source_table
         SELECT number, toString(number), number, toString(number), toDecimal64(number, 8), toString(number)
         FROM system.numbers
-        LIMIT 100000;
+        LIMIT 50000;
@@ -79,47 +79,51 @@
                 elements_count
-                25000
                 50000
                 75000
-                100000
-

-        SELECT dictGet('default.simple_key_direct_dictionary', {column_name}, number)
+        WITH rand64() % toUInt64({elements_count}) as key
+        SELECT dictGet('default.simple_key_direct_dictionary', {column_name}, key)
         FROM system.numbers
         LIMIT {elements_count}
         FORMAT Null;

-        SELECT dictGet('default.simple_key_direct_dictionary', ('value_int', 'value_string', 'value_decimal', 'value_string_nullable'), number)
+        WITH rand64() % toUInt64({elements_count}) as key
+        SELECT dictGet('default.simple_key_direct_dictionary', ('value_int', 'value_string', 'value_decimal', 'value_string_nullable'), key)
         FROM system.numbers
         LIMIT {elements_count}
         FORMAT Null;

-        SELECT dictHas('default.simple_key_direct_dictionary', number)
+        WITH rand64() % toUInt64({elements_count}) as key
+        SELECT dictHas('default.simple_key_direct_dictionary', key)
         FROM system.numbers
         LIMIT {elements_count}
         FORMAT Null;

-        SELECT dictGet('default.complex_key_direct_dictionary', {column_name}, (number, toString(number)))
+        WITH (number, toString(number)) as key
+        SELECT dictGet('default.complex_key_direct_dictionary', {column_name}, key)
         FROM system.numbers
         LIMIT {elements_count}
         FORMAT Null;

-        SELECT dictGet('default.complex_key_direct_dictionary', ('value_int', 'value_string', 'value_decimal', 'value_string_nullable'), (number, toString(number)))
+        WITH (number, toString(number)) as key
+        SELECT dictGet('default.complex_key_direct_dictionary', ('value_int', 'value_string', 'value_decimal', 'value_string_nullable'), key)
         FROM system.numbers
         LIMIT {elements_count}
         FORMAT Null;

-        SELECT dictHas('default.complex_key_direct_dictionary', (number, toString(number)))
+        WITH (number, toString(number)) as key
+        SELECT dictHas('default.complex_key_direct_dictionary', key)
         FROM system.numbers
         LIMIT {elements_count}
         FORMAT Null;
diff --git a/tests/performance/flat_dictionary.xml b/tests/performance/flat_dictionary.xml
index a571785a7f0..a80631db541 100644
--- a/tests/performance/flat_dictionary.xml
+++ b/tests/performance/flat_dictionary.xml
@@ -1,8 +1,4 @@
-
-    please_fix_me
-
-
 CREATE TABLE simple_key_flat_dictionary_source_table
 (
@@ -50,25 +46,30 @@
                 elements_count
-                2500000
                 5000000
                 7500000
-                10000000
-

-        SELECT dictGet('default.simple_key_flat_dictionary', {column_name}, number)
+        WITH rand64() % toUInt64({elements_count}) as key
+        SELECT dictGet('default.simple_key_flat_dictionary', {column_name}, key)
         FROM system.numbers
         LIMIT {elements_count}
         FORMAT Null;

-        SELECT dictHas('default.simple_key_flat_dictionary', number)
+        SELECT * FROM simple_key_flat_dictionary
+        FORMAT Null;
+
+
+
+        WITH rand64() % toUInt64(75000000) as key
+        SELECT dictHas('default.simple_key_flat_dictionary', key)
         FROM system.numbers
-        LIMIT {elements_count}
+        LIMIT 75000000
         FORMAT Null;
diff --git a/tests/performance/hashed_dictionary.xml b/tests/performance/hashed_dictionary.xml
index a38d2f30c23..26164b4f888 100644
--- a/tests/performance/hashed_dictionary.xml
+++ b/tests/performance/hashed_dictionary.xml
@@ -81,35 +81,37 @@
                 elements_count
-                2500000
                 5000000
                 7500000
-                10000000
-

-        SELECT dictGet('default.simple_key_hashed_dictionary', {column_name}, number)
+        WITH rand64() % toUInt64({elements_count}) as key
+        SELECT dictGet('default.simple_key_hashed_dictionary', {column_name}, key)
         FROM system.numbers
         LIMIT {elements_count}
         FORMAT Null;

-        SELECT dictHas('default.simple_key_hashed_dictionary', number)
+        WITH rand64() % toUInt64({elements_count}) as key
+        SELECT dictHas('default.simple_key_hashed_dictionary', key)
         FROM system.numbers
         LIMIT {elements_count}
         FORMAT Null;

-        SELECT dictGet('default.complex_key_hashed_dictionary', {column_name}, (number, toString(number)))
+        WITH (rand64() % toUInt64({elements_count}), toString(rand64() % toUInt64({elements_count}))) as key
+        SELECT dictGet('default.complex_key_hashed_dictionary', {column_name}, key)
         FROM system.numbers
         LIMIT {elements_count}
         FORMAT Null;

-        SELECT dictHas('default.complex_key_hashed_dictionary', (number, toString(number)))
+        WITH (rand64() % toUInt64({elements_count}), toString(rand64() % toUInt64({elements_count}))) as key
+        SELECT dictHas('default.complex_key_hashed_dictionary', key)
         FROM system.numbers
         LIMIT {elements_count}
         FORMAT Null;
diff --git a/tests/performance/intDiv.xml b/tests/performance/intDiv.xml
new file mode 100644
index 00000000000..c6fa0238986
--- /dev/null
+++ b/tests/performance/intDiv.xml
@@ -0,0 +1,5 @@
+<test>
+    <query>SELECT count() FROM numbers(200000000) WHERE NOT ignore(intDiv(number, 1000000000))</query>
+    <query>SELECT count() FROM numbers(200000000) WHERE NOT ignore(divide(number, 1000000000))</query>
+    <query>SELECT count() FROM numbers(200000000) WHERE NOT ignore(toUInt32(divide(number, 1000000000)))</query>
+</test>
diff --git a/tests/queries/0_stateless/00953_zookeeper_suetin_deduplication_bug.sh b/tests/queries/0_stateless/00953_zookeeper_suetin_deduplication_bug.sh
index baa2b0cf53f..71ca29bfd96 100755
--- a/tests/queries/0_stateless/00953_zookeeper_suetin_deduplication_bug.sh
+++ b/tests/queries/0_stateless/00953_zookeeper_suetin_deduplication_bug.sh
@@ -21,15 +21,12 @@ ORDER BY (engine_id)
 SETTINGS replicated_deduplication_window = 2, cleanup_delay_period=4, cleanup_delay_period_random_add=0;"

 $CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 1, 'hello')"
-sleep 1
 $CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 2, 'hello')"
-sleep 1
 $CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 3, 'hello')"

 $CLICKHOUSE_CLIENT --query="SELECT count(*) from elog" # 3 rows

 count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/s1/blocks'")
-
 while [[ $count != 2 ]]
 do
     sleep 1
@@ -39,9 +36,8 @@ done
 $CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 1, 'hello')"

 $CLICKHOUSE_CLIENT --query="SELECT count(*) from elog" # 4 rows
+
 count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/s1/blocks'")
-
-
 while [[ $count != 2 ]]
 do
     sleep 1
@@ -53,12 +49,10 @@ $CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 2, 'h
 $CLICKHOUSE_CLIENT --query="SELECT count(*) from elog" # 5 rows

 count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/s1/blocks'")
-
 while [[ $count != 2 ]]
 do
     sleep 1
     count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/s1/blocks'")
-
 done

 $CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 2, 'hello')"
diff --git a/tests/queries/0_stateless/01231_markdown_format.reference b/tests/queries/0_stateless/01231_markdown_format.reference
index e2ec03b401a..65838bfede7 100644
--- a/tests/queries/0_stateless/01231_markdown_format.reference
+++ b/tests/queries/0_stateless/01231_markdown_format.reference
@@ -1,5 +1,5 @@
-| id | name | array |
-|-:|:-|:-:|
-| 1 | name1 | [1,2,3] |
-| 2 | name2 | [4,5,6] |
-| 3 | name3 | [7,8,9] |
+| id | name | array | nullable | low_cardinality | decimal |
+|-:|:-|:-|:-|:-|-:|
+| 1 | name1 | [1,2,3] | Some long string | name1 | 1.110000 |
+| 2 | name2 | [4,5,60000] | \N | Another long string | 222.222222 |
+| 30000 | One more long string | [7,8,9] | name3 | name3 | 3.330000 |
diff --git a/tests/queries/0_stateless/01231_markdown_format.sql b/tests/queries/0_stateless/01231_markdown_format.sql
index 693664be1ab..65c65389e12 100644
--- a/tests/queries/0_stateless/01231_markdown_format.sql
+++ b/tests/queries/0_stateless/01231_markdown_format.sql
@@ -1,6 +1,6 @@
 DROP TABLE IF EXISTS makrdown;
-CREATE TABLE markdown (id UInt32, name String, array Array(Int8)) ENGINE = Memory;
-INSERT INTO markdown VALUES (1, 'name1', [1,2,3]), (2, 'name2', [4,5,6]), (3, 'name3', [7,8,9]);
+CREATE TABLE markdown (id UInt32, name String, array Array(Int32), nullable Nullable(String), low_cardinality LowCardinality(String), decimal Decimal32(6)) ENGINE = Memory;
+INSERT INTO markdown VALUES (1, 'name1', [1,2,3], 'Some long string', 'name1', 1.11), (2, 'name2', [4,5,60000], Null, 'Another long string', 222.222222), (30000, 'One more long string', [7,8,9], 'name3', 'name3', 3.33);

 SELECT * FROM markdown FORMAT Markdown;
 DROP TABLE IF EXISTS markdown
diff --git a/tests/queries/0_stateless/01305_replica_create_drop_zookeeper.sh b/tests/queries/0_stateless/01305_replica_create_drop_zookeeper.sh
index 01bb9af461c..e7b8091284a 100755
--- a/tests/queries/0_stateless/01305_replica_create_drop_zookeeper.sh
+++ b/tests/queries/0_stateless/01305_replica_create_drop_zookeeper.sh
@@ -8,21 +8,11 @@ set -e

 function thread()
 {
-    db_engine=`$CLICKHOUSE_CLIENT -q "SELECT engine FROM system.databases WHERE name='$CLICKHOUSE_DATABASE'"`
-    if [[ $db_engine == "Atomic" ]]; then
-        # Ignore "Replica already exists" exception
-        while true; do
-            $CLICKHOUSE_CLIENT -n -q "DROP TABLE IF EXISTS test_table_$1 NO DELAY;
-                CREATE TABLE test_table_$1 (a UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r_$1') ORDER BY tuple();" 2>&1 |
-                grep -vP '(^$)|(^Received exception from server)|(^\d+\. )|because the last replica of the table was dropped right now|is already started to be removing by another replica right now|is already finished removing by another replica right now|Removing leftovers from table|Another replica was suddenly created|was successfully removed from ZooKeeper|was created by another server at the same moment|was suddenly removed|some other replicas were created at the same time|already exists'
-        done
-    else
-        while true; do
-            $CLICKHOUSE_CLIENT -n -q "DROP TABLE IF EXISTS test_table_$1;
-                CREATE TABLE test_table_$1 (a UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r_$1') ORDER BY tuple();" 2>&1 |
-                grep -vP '(^$)|(^Received exception from server)|(^\d+\. )|because the last replica of the table was dropped right now|is already started to be removing by another replica right now|is already finished removing by another replica right now|Removing leftovers from table|Another replica was suddenly created|was successfully removed from ZooKeeper|was created by another server at the same moment|was suddenly removed|some other replicas were created at the same time'
-        done
-    fi
+    while true; do
+        $CLICKHOUSE_CLIENT -n -q "DROP TABLE IF EXISTS test_table_$1 SYNC;
+            CREATE TABLE test_table_$1 (a UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r_$1') ORDER BY tuple();" 2>&1 |
+            grep -vP '(^$)|(^Received exception from server)|(^\d+\. )|because the last replica of the table was dropped right now|is already started to be removing by another replica right now|is already finished removing by another replica right now|Removing leftovers from table|Another replica was suddenly created|was successfully removed from ZooKeeper|was created by another server at the same moment|was suddenly removed|some other replicas were created at the same time'
+    done
 }
diff --git a/tests/queries/0_stateless/018001_dateDiff_DateTime64.reference b/tests/queries/0_stateless/01801_dateDiff_DateTime64.reference
similarity index 100%
rename from tests/queries/0_stateless/018001_dateDiff_DateTime64.reference
rename to tests/queries/0_stateless/01801_dateDiff_DateTime64.reference
diff --git a/tests/queries/0_stateless/018001_dateDiff_DateTime64.sql b/tests/queries/0_stateless/01801_dateDiff_DateTime64.sql
similarity index 100%
rename from tests/queries/0_stateless/018001_dateDiff_DateTime64.sql
rename to tests/queries/0_stateless/01801_dateDiff_DateTime64.sql
diff --git a/tests/queries/0_stateless/018002_formatDateTime_DateTime64_century.reference b/tests/queries/0_stateless/01802_formatDateTime_DateTime64_century.reference
similarity index 100%
rename from tests/queries/0_stateless/018002_formatDateTime_DateTime64_century.reference
rename to tests/queries/0_stateless/01802_formatDateTime_DateTime64_century.reference
diff --git a/tests/queries/0_stateless/018002_formatDateTime_DateTime64_century.sql b/tests/queries/0_stateless/01802_formatDateTime_DateTime64_century.sql
similarity index 100%
rename from tests/queries/0_stateless/018002_formatDateTime_DateTime64_century.sql
rename to tests/queries/0_stateless/01802_formatDateTime_DateTime64_century.sql
diff --git a/tests/queries/0_stateless/018002_toDateTime64_large_values.reference b/tests/queries/0_stateless/01802_toDateTime64_large_values.reference
similarity index 100%
rename from tests/queries/0_stateless/018002_toDateTime64_large_values.reference
rename to tests/queries/0_stateless/01802_toDateTime64_large_values.reference
diff --git a/tests/queries/0_stateless/018002_toDateTime64_large_values.sql b/tests/queries/0_stateless/01802_toDateTime64_large_values.sql
similarity index 100%
rename from tests/queries/0_stateless/018002_toDateTime64_large_values.sql
rename to tests/queries/0_stateless/01802_toDateTime64_large_values.sql
diff --git a/tests/queries/0_stateless/01812_optimize_skip_unused_shards_single_node.reference b/tests/queries/0_stateless/01812_optimize_skip_unused_shards_single_node.reference
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/queries/0_stateless/01812_optimize_skip_unused_shards_single_node.sql b/tests/queries/0_stateless/01812_optimize_skip_unused_shards_single_node.sql
new file mode 100644
index 00000000000..c39947f2c04
--- /dev/null
+++ b/tests/queries/0_stateless/01812_optimize_skip_unused_shards_single_node.sql
@@ -0,0 +1,3 @@
+-- remote() does not have a sharding key, while force_optimize_skip_unused_shards=2 requires the table to have one.
+-- But since only one node is queried, everything works.
+select * from remote('127.1', system.one) settings optimize_skip_unused_shards=1, force_optimize_skip_unused_shards=2 format Null;