Merge branch 'master' into create-aggregate-func-settings

This commit is contained in:
vdimir 2021-04-16 15:48:39 +03:00
commit 92e813c806
No known key found for this signature in database
GPG Key ID: F57B3E10A21DBB31
57 changed files with 346 additions and 151 deletions

View File

@ -12,7 +12,8 @@
///
/// NOTE: it should be used with caution.
#define SCOPE_EXIT_MEMORY(...) SCOPE_EXIT( \
MemoryTracker::LockExceptionInThread lock_memory_tracker; \
MemoryTracker::LockExceptionInThread \
lock_memory_tracker(VariableContext::Global); \
__VA_ARGS__; \
)
@ -56,7 +57,8 @@
#define SCOPE_EXIT_MEMORY_SAFE(...) SCOPE_EXIT( \
try \
{ \
MemoryTracker::LockExceptionInThread lock_memory_tracker; \
MemoryTracker::LockExceptionInThread \
lock_memory_tracker(VariableContext::Global); \
__VA_ARGS__; \
} \
catch (...) \

View File

@ -96,14 +96,8 @@ if (USE_INTERNAL_ZLIB_LIBRARY)
add_subdirectory (${INTERNAL_ZLIB_NAME})
# We should use same defines when including zlib.h as used when zlib compiled
target_compile_definitions (zlib PUBLIC ZLIB_COMPAT WITH_GZFILEOP)
if (TARGET zlibstatic)
target_compile_definitions (zlibstatic PUBLIC ZLIB_COMPAT WITH_GZFILEOP)
endif ()
if (ARCH_AMD64 OR ARCH_AARCH64)
target_compile_definitions (zlib PUBLIC X86_64 UNALIGNED_OK)
if (TARGET zlibstatic)
target_compile_definitions (zlibstatic PUBLIC X86_64 UNALIGNED_OK)
endif ()
endif ()
endif ()

2
contrib/zlib-ng vendored

@ -1 +1 @@
Subproject commit 6fd1846c8b8f59436fe2dd752d0f316ddbb64df6
Subproject commit b82d3497a5afc46dec3c5d07e4b163b169f251d7

View File

@ -7,7 +7,20 @@
-->
<yandex>
<logger>
<!-- Possible levels: https://github.com/pocoproject/poco/blob/poco-1.9.4-release/Foundation/include/Poco/Logger.h#L105 -->
<!-- Possible levels [1]:
- none (turns off logging)
- fatal
- critical
- error
- warning
- notice
- information
- debug
- trace
[1]: https://github.com/pocoproject/poco/blob/poco-1.9.4-release/Foundation/include/Poco/Logger.h#L105-L114
-->
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>

View File

@ -150,7 +150,7 @@ void tryLogCurrentException(Poco::Logger * logger, const std::string & start_of_
///
/// And in this case the exception will not be logged, so let's block the
/// MemoryTracker until the exception will be logged.
MemoryTracker::LockExceptionInThread lock_memory_tracker;
MemoryTracker::LockExceptionInThread lock_memory_tracker(VariableContext::Global);
try
{

View File

@ -24,8 +24,8 @@ namespace
///
/// - when it is explicitly blocked with LockExceptionInThread
///
/// - to avoid std::terminate(), when stack unwinding is currently in progress
/// in this thread.
/// - when there are uncaught exceptions objects in the current thread
/// (to avoid std::terminate())
///
/// NOTE: that since C++11 destructor marked with noexcept by default, and
/// this means that any throw from destructor (that is not marked with

View File

@ -252,8 +252,6 @@ class IColumn;
* Almost all limits apply to each stream individually. \
*/ \
\
M(UInt64, limit, 0, "Limit on read rows from the most 'end' result for select query, default 0 means no limit length", 0) \
M(UInt64, offset, 0, "Offset on read rows from the most 'end' result for select query", 0) \
M(UInt64, max_rows_to_read, 0, "Limit on read rows from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it is only checked on a remote server.", 0) \
M(UInt64, max_bytes_to_read, 0, "Limit on read bytes (after decompression) from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it is only checked on a remote server.", 0) \
M(OverflowMode, read_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
@ -464,8 +462,11 @@ class IColumn;
\
M(Bool, database_replicated_ddl_output, true, "Obsolete setting, does nothing. Will be removed after 2021-09-08", 0) \
M(HandleKafkaErrorMode, handle_kafka_error_mode, HandleKafkaErrorMode::DEFAULT, "How to handle errors for Kafka engine. Passible values: default, stream.", 0) \
M(UInt64, limit, 0, "Limit on read rows from the most 'end' result for select query, default 0 means no limit length", 0) \
M(UInt64, offset, 0, "Offset on read rows from the most 'end' result for select query", 0) \
M(Bool, allow_experimental_funnel_functions, true, "Enable experimental functions for funnel analysis.", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS below.

View File

@ -311,7 +311,7 @@ void PushingToViewsBlockOutputStream::writeSuffix()
UInt64 milliseconds = main_watch.elapsedMilliseconds();
if (views.size() > 1)
{
LOG_TRACE(log, "Pushing from {} to {} views took {} ms.",
LOG_DEBUG(log, "Pushing from {} to {} views took {} ms.",
storage->getStorageID().getNameForLogs(), views.size(),
milliseconds);
}

View File

@ -49,6 +49,16 @@ void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCall
to.writeSuffix();
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<void(const Block & block)> & progress,
std::atomic<bool> * is_cancelled)
{
auto is_cancelled_pred = [is_cancelled] ()
{
return isAtomicSet(is_cancelled);
};
copyDataImpl(from, to, is_cancelled_pred, progress);
}
inline void doNothing(const Block &) {}

View File

@ -16,6 +16,9 @@ class Block;
*/
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled = nullptr);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<void(const Block & block)> & progress,
std::atomic<bool> * is_cancelled = nullptr);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled,

View File

@ -51,6 +51,14 @@ Columns DirectDictionary<dictionary_key_type>::getColumns(
key_to_fetched_index.reserve(requested_keys.size());
auto fetched_columns_from_storage = request.makeAttributesResultColumns();
for (size_t attribute_index = 0; attribute_index < request.attributesSize(); ++attribute_index)
{
if (!request.shouldFillResultColumnWithIndex(attribute_index))
continue;
auto & fetched_column_from_storage = fetched_columns_from_storage[attribute_index];
fetched_column_from_storage->reserve(requested_keys.size());
}
size_t fetched_key_index = 0;

View File

@ -122,7 +122,7 @@ void registerDiskS3(DiskFactory & factory)
throw Exception("S3 path must ends with '/', but '" + uri.key + "' doesn't.", ErrorCodes::BAD_ARGUMENTS);
client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 10000);
client_configuration.httpRequestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 5000);
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 5000);
client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100);
client_configuration.endpointOverride = uri.endpoint;

View File

@ -1,5 +1,7 @@
configure_file(config_functions.h.in ${ConfigIncludePath}/config_functions.h)
add_subdirectory(divide)
include(${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
add_headers_and_sources(clickhouse_functions .)
@ -25,7 +27,7 @@ target_link_libraries(clickhouse_functions
PRIVATE
${ZLIB_LIBRARIES}
boost::filesystem
libdivide
divide_impl
)
if (OPENSSL_CRYPTO_LIBRARY)

View File

@ -0,0 +1,22 @@
# A library for integer division by constant with CPU dispatching.
if (ARCH_AMD64)
add_library(divide_impl_sse2 divideImpl.cpp)
target_compile_options(divide_impl_sse2 PRIVATE -msse2 -DNAMESPACE=SSE2)
target_link_libraries(divide_impl_sse2 libdivide)
add_library(divide_impl_avx2 divideImpl.cpp)
target_compile_options(divide_impl_avx2 PRIVATE -mavx2 -DNAMESPACE=AVX2)
target_link_libraries(divide_impl_avx2 libdivide)
set(IMPLEMENTATIONS divide_impl_sse2 divide_impl_avx2)
else ()
add_library(divide_impl_generic divideImpl.cpp)
target_compile_options(divide_impl_generic PRIVATE -DNAMESPACE=Generic)
target_link_libraries(divide_impl_generic libdivide)
set(IMPLEMENTATIONS divide_impl_generic)
endif ()
add_library(divide_impl divide.cpp)
target_link_libraries(divide_impl ${IMPLEMENTATIONS} clickhouse_common_io)

View File

@ -0,0 +1,57 @@
#include "divide.h"
#include <Common/CpuId.h>
#if defined(__x86_64__) && !defined(ARCADIA_BUILD)
namespace SSE2
{
template <typename A, typename B, typename ResultType>
void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size);
}
namespace AVX2
{
template <typename A, typename B, typename ResultType>
void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size);
}
#else
namespace Generic
{
template <typename A, typename B, typename ResultType>
void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size);
}
#endif
template <typename A, typename B, typename ResultType>
void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size)
{
#if defined(__x86_64__) && !defined(ARCADIA_BUILD)
if (DB::Cpu::CpuFlagsCache::have_AVX2)
AVX2::divideImpl(a_pos, b, c_pos, size);
else if (DB::Cpu::CpuFlagsCache::have_SSE2)
SSE2::divideImpl(a_pos, b, c_pos, size);
#else
Generic::divideImpl(a_pos, b, c_pos, size);
#endif
}
template void divideImpl<uint64_t, uint64_t, uint64_t>(const uint64_t * __restrict, uint64_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, uint32_t, uint64_t>(const uint64_t * __restrict, uint32_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, uint16_t, uint64_t>(const uint64_t * __restrict, uint16_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, char8_t, uint64_t>(const uint64_t * __restrict, char8_t, uint64_t * __restrict, size_t);
template void divideImpl<uint32_t, uint64_t, uint32_t>(const uint32_t * __restrict, uint64_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, uint32_t, uint32_t>(const uint32_t * __restrict, uint32_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, uint16_t, uint32_t>(const uint32_t * __restrict, uint16_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, char8_t, uint32_t>(const uint32_t * __restrict, char8_t, uint32_t * __restrict, size_t);
template void divideImpl<int64_t, int64_t, int64_t>(const int64_t * __restrict, int64_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int32_t, int64_t>(const int64_t * __restrict, int32_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int16_t, int64_t>(const int64_t * __restrict, int16_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int8_t, int64_t>(const int64_t * __restrict, int8_t, int64_t * __restrict, size_t);
template void divideImpl<int32_t, int64_t, int32_t>(const int32_t * __restrict, int64_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int32_t, int32_t>(const int32_t * __restrict, int32_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int16_t, int32_t>(const int32_t * __restrict, int16_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int8_t, int32_t>(const int32_t * __restrict, int8_t, int32_t * __restrict, size_t);

View File

@ -0,0 +1,6 @@
#pragma once
#include <cstddef>
template <typename A, typename B, typename ResultType>
extern void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size);

View File

@ -0,0 +1,79 @@
/// This translation unit should be compiled multiple times
/// with different values of NAMESPACE and machine flags (sse2, avx2).
#if !defined(NAMESPACE)
#if defined(ARCADIA_BUILD)
#define NAMESPACE Generic
#else
#error "NAMESPACE macro must be defined"
#endif
#endif
#if defined(__AVX2__)
#define REG_SIZE 32
#define LIBDIVIDE_AVX2
#elif defined(__SSE2__)
#define REG_SIZE 16
#define LIBDIVIDE_SSE2
#endif
#include <libdivide.h>
namespace NAMESPACE
{
template <typename A, typename B, typename ResultType>
void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size)
{
libdivide::divider<A> divider(b);
const A * a_end = a_pos + size;
#if defined(__SSE2__)
static constexpr size_t values_per_simd_register = REG_SIZE / sizeof(A);
const A * a_end_simd = a_pos + size / values_per_simd_register * values_per_simd_register;
while (a_pos < a_end_simd)
{
#if defined(__AVX2__)
_mm256_storeu_si256(reinterpret_cast<__m256i *>(c_pos),
_mm256_loadu_si256(reinterpret_cast<const __m256i *>(a_pos)) / divider);
#else
_mm_storeu_si128(reinterpret_cast<__m128i *>(c_pos),
_mm_loadu_si128(reinterpret_cast<const __m128i *>(a_pos)) / divider);
#endif
a_pos += values_per_simd_register;
c_pos += values_per_simd_register;
}
#endif
while (a_pos < a_end)
{
*c_pos = *a_pos / divider;
++a_pos;
++c_pos;
}
}
template void divideImpl<uint64_t, uint64_t, uint64_t>(const uint64_t * __restrict, uint64_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, uint32_t, uint64_t>(const uint64_t * __restrict, uint32_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, uint16_t, uint64_t>(const uint64_t * __restrict, uint16_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, char8_t, uint64_t>(const uint64_t * __restrict, char8_t, uint64_t * __restrict, size_t);
template void divideImpl<uint32_t, uint64_t, uint32_t>(const uint32_t * __restrict, uint64_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, uint32_t, uint32_t>(const uint32_t * __restrict, uint32_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, uint16_t, uint32_t>(const uint32_t * __restrict, uint16_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, char8_t, uint32_t>(const uint32_t * __restrict, char8_t, uint32_t * __restrict, size_t);
template void divideImpl<int64_t, int64_t, int64_t>(const int64_t * __restrict, int64_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int32_t, int64_t>(const int64_t * __restrict, int32_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int16_t, int64_t>(const int64_t * __restrict, int16_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int8_t, int64_t>(const int64_t * __restrict, int8_t, int64_t * __restrict, size_t);
template void divideImpl<int32_t, int64_t, int32_t>(const int32_t * __restrict, int64_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int32_t, int32_t>(const int32_t * __restrict, int32_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int16_t, int32_t>(const int32_t * __restrict, int16_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int8_t, int32_t>(const int32_t * __restrict, int8_t, int32_t * __restrict, size_t);
}

View File

@ -1,11 +1,7 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionBinaryArithmetic.h>
#if defined(__SSE2__)
# define LIBDIVIDE_SSE2 1
#endif
#include <libdivide.h>
#include "divide/divide.h"
namespace DB
@ -70,34 +66,11 @@ struct DivideIntegralByConstantImpl
if (unlikely(static_cast<A>(b) == 0))
throw Exception("Division by zero", ErrorCodes::ILLEGAL_DIVISION);
libdivide::divider<A> divider(b);
const A * a_end = a_pos + size;
#if defined(__SSE2__)
static constexpr size_t values_per_sse_register = 16 / sizeof(A);
const A * a_end_sse = a_pos + size / values_per_sse_register * values_per_sse_register;
while (a_pos < a_end_sse)
{
_mm_storeu_si128(reinterpret_cast<__m128i *>(c_pos),
_mm_loadu_si128(reinterpret_cast<const __m128i *>(a_pos)) / divider);
a_pos += values_per_sse_register;
c_pos += values_per_sse_register;
}
#endif
while (a_pos < a_end)
{
*c_pos = *a_pos / divider;
++a_pos;
++c_pos;
}
divideImpl(a_pos, b, c_pos, size);
}
};
/** Specializations are specified for dividing numbers of the type UInt64 and UInt32 by the numbers of the same sign.
/** Specializations are specified for dividing numbers of the type UInt64, UInt32, Int64, Int32 by the numbers of the same sign.
* Can be expanded to all possible combinations, but more code is needed.
*/

View File

@ -78,12 +78,11 @@ struct ModuloByConstantImpl
if (b < 0)
b = -b;
libdivide::divider<A> divider(b);
/// Here we failed to make the SSE variant from libdivide give an advantage.
if (b & (b - 1))
{
libdivide::divider<A> divider(b);
for (size_t i = 0; i < size; ++i)
dst[i] = src[i] - (src[i] / divider) * b; /// NOTE: perhaps, the division semantics with the remainder of negative numbers is not preserved.
}

View File

@ -229,6 +229,8 @@ SRCS(
defaultValueOfTypeName.cpp
demange.cpp
divide.cpp
divide/divide.cpp
divide/divideImpl.cpp
dumpColumnStructure.cpp
e.cpp
empty.cpp

View File

@ -50,7 +50,7 @@ BrotliWriteBuffer::BrotliWriteBuffer(std::unique_ptr<WriteBuffer> out_, int comp
BrotliWriteBuffer::~BrotliWriteBuffer()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finish();
}

View File

@ -50,7 +50,7 @@ LZMADeflatingWriteBuffer::LZMADeflatingWriteBuffer(
LZMADeflatingWriteBuffer::~LZMADeflatingWriteBuffer()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finish();
lzma_end(&lstr);

View File

@ -81,8 +81,8 @@ PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & clientConfigu
: per_request_configuration(clientConfiguration.perRequestConfiguration)
, timeouts(ConnectionTimeouts(
Poco::Timespan(clientConfiguration.connectTimeoutMs * 1000), /// connection timeout.
Poco::Timespan(clientConfiguration.httpRequestTimeoutMs * 1000), /// send timeout.
Poco::Timespan(clientConfiguration.httpRequestTimeoutMs * 1000) /// receive timeout.
Poco::Timespan(clientConfiguration.requestTimeoutMs * 1000), /// send timeout.
Poco::Timespan(clientConfiguration.requestTimeoutMs * 1000) /// receive timeout.
))
, remote_host_filter(clientConfiguration.remote_host_filter)
, s3_max_redirects(clientConfiguration.s3_max_redirects)

View File

@ -79,7 +79,7 @@ WriteBufferFromFile::~WriteBufferFromFile()
return;
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
next();

View File

@ -98,7 +98,7 @@ WriteBufferFromFileDescriptor::~WriteBufferFromFileDescriptor()
}
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
next();
}

View File

@ -43,7 +43,7 @@ WriteBufferFromOStream::WriteBufferFromOStream(
WriteBufferFromOStream::~WriteBufferFromOStream()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
next();
}

View File

@ -73,7 +73,7 @@ WriteBufferFromPocoSocket::WriteBufferFromPocoSocket(Poco::Net::Socket & socket_
WriteBufferFromPocoSocket::~WriteBufferFromPocoSocket()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
next();
}

View File

@ -87,7 +87,7 @@ void WriteBufferFromS3::allocateBuffer()
void WriteBufferFromS3::finalize()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalizeImpl();
}

View File

@ -95,7 +95,7 @@ public:
~WriteBufferFromVector() override
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalize();
}
};

View File

@ -138,7 +138,7 @@ void WriteBufferValidUTF8::finish()
WriteBufferValidUTF8::~WriteBufferValidUTF8()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finish();
}

View File

@ -49,7 +49,7 @@ ZlibDeflatingWriteBuffer::ZlibDeflatingWriteBuffer(
ZlibDeflatingWriteBuffer::~ZlibDeflatingWriteBuffer()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finish();

View File

@ -31,7 +31,7 @@ ZstdDeflatingWriteBuffer::ZstdDeflatingWriteBuffer(
ZstdDeflatingWriteBuffer::~ZstdDeflatingWriteBuffer()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finish();

View File

@ -372,7 +372,20 @@ void DDLWorker::scheduleTasks(bool reinitialized)
}
Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, queue_updated_event);
size_t size_before_filtering = queue_nodes.size();
filterAndSortQueueNodes(queue_nodes);
/// The following message is too verbose, but it can be useful too debug mysterious test failures in CI
LOG_TRACE(log, "scheduleTasks: initialized={}, size_before_filtering={}, queue_size={}, "
"entries={}..{}, "
"first_failed_task_name={}, current_tasks_size={},"
"last_current_task={},"
"last_skipped_entry_name={}",
initialized, size_before_filtering, queue_nodes.size(),
queue_nodes.empty() ? "none" : queue_nodes.front(), queue_nodes.empty() ? "none" : queue_nodes.back(),
first_failed_task_name ? *first_failed_task_name : "none", current_tasks.size(),
current_tasks.empty() ? "none" : current_tasks.back()->entry_name,
last_skipped_entry_name ? *last_skipped_entry_name : "none");
if (max_tasks_in_queue < queue_nodes.size())
cleanup_event->set();

View File

@ -323,7 +323,7 @@ void ThreadStatus::finalizeQueryProfiler()
void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
{
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
if (exit_if_already_detached && thread_state == ThreadState::DetachedFromQuery)
{

View File

@ -21,16 +21,13 @@ void MarkdownRowOutputFormat::writePrefix()
}
writeCString("\n|", out);
String left_alignment = ":-|";
String central_alignment = ":-:|";
String right_alignment = "-:|";
for (size_t i = 0; i < columns; ++i)
{
if (isInteger(types[i]))
if (types[i]->shouldAlignRightInPrettyFormats())
writeString(right_alignment, out);
else if (isString(types[i]))
writeString(left_alignment, out);
else
writeString(central_alignment, out);
writeString(left_alignment, out);
}
writeChar('\n', out);
}

View File

@ -196,7 +196,7 @@ void WriteBufferFromHTTPServerResponse::finalize()
WriteBufferFromHTTPServerResponse::~WriteBufferFromHTTPServerResponse()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finalize();
}

View File

@ -28,6 +28,7 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Exception.h>
#include <Common/Macros.h>
#include <Common/formatReadable.h>
#include <Common/config_version.h>
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
@ -585,6 +586,8 @@ void StorageKafka::threadFunc(size_t idx)
bool StorageKafka::streamToViews()
{
Stopwatch watch;
auto table_id = getStorageID();
auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (!table)
@ -637,7 +640,11 @@ bool StorageKafka::streamToViews()
// We can't cancel during copyData, as it's not aware of commits and other kafka-related stuff.
// It will be cancelled on underlying layer (kafka buffer)
std::atomic<bool> stub = {false};
copyData(*in, *block_io.out, &stub);
size_t rows = 0;
copyData(*in, *block_io.out, [&rows](const Block & block)
{
rows += block.rows();
}, &stub);
bool some_stream_is_stalled = false;
for (auto & stream : streams)
@ -646,6 +653,10 @@ bool StorageKafka::streamToViews()
stream->as<KafkaBlockInputStream>()->commit();
}
UInt64 milliseconds = watch.elapsedMilliseconds();
LOG_DEBUG(log, "Pushing {} rows to {} took {} ms.",
formatReadableQuantity(rows), table_id.getNameForLogs(), milliseconds);
return some_stream_is_stalled;
}

View File

@ -342,6 +342,15 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold);
auto num_nodes_to_delete = timed_blocks.end() - first_outdated_block;
if (!num_nodes_to_delete)
return;
auto last_outdated_block = timed_blocks.end() - 1;
LOG_TRACE(log, "Will clear {} old blocks from {} (ctime {}) to {} (ctime {})", num_nodes_to_delete,
first_outdated_block->node, first_outdated_block->ctime,
last_outdated_block->node, last_outdated_block->ctime);
zkutil::AsyncResponses<Coordination::RemoveResponse> try_remove_futures;
for (auto it = first_outdated_block; it != timed_blocks.end(); ++it)
{
@ -372,9 +381,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
first_outdated_block++;
}
auto num_nodes_to_delete = timed_blocks.end() - first_outdated_block;
if (num_nodes_to_delete)
LOG_TRACE(log, "Cleared {} old blocks from ZooKeeper", num_nodes_to_delete);
LOG_TRACE(log, "Cleared {} old blocks from ZooKeeper", num_nodes_to_delete);
}

View File

@ -469,7 +469,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
/// Always calculate optimized cluster here, to avoid conditions during read()
/// (Anyway it will be calculated in the read())
if (settings.optimize_skip_unused_shards)
if (getClusterQueriedNodes(settings, cluster) > 1 && settings.optimize_skip_unused_shards)
{
ClusterPtr optimized_cluster = getOptimizedCluster(local_context, metadata_snapshot, query_info.query);
if (optimized_cluster)

View File

@ -68,6 +68,16 @@ def create_table(cluster, table_name, additional_settings=None):
node.query(create_table_statement)
def wait_for_delete_s3_objects(cluster, expected, timeout=30):
minio = cluster.minio_client
while timeout > 0:
if len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == expected:
return
timeout -= 1
time.sleep(1)
assert(len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == expected)
@pytest.fixture(autouse=True)
def drop_table(cluster):
yield
@ -75,8 +85,9 @@ def drop_table(cluster):
minio = cluster.minio_client
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
try:
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 0
wait_for_delete_s3_objects(cluster, 0)
finally:
# Remove extra objects to prevent tests cascade failing
for obj in list(minio.list_objects(cluster.minio_bucket, 'data/')):
@ -151,7 +162,7 @@ def test_insert_same_partition_and_merge(cluster, merge_vertical):
assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
assert node.query("SELECT count(distinct(id)) FROM s3_test FORMAT Values") == "(8192)"
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD)
def test_alter_table_columns(cluster):
@ -167,32 +178,20 @@ def test_alter_table_columns(cluster):
# To ensure parts have merged
node.query("OPTIMIZE TABLE s3_test")
# Wait for merges, mutations and old parts deletion
time.sleep(3)
assert node.query("SELECT sum(col1) FROM s3_test FORMAT Values") == "(8192)"
assert node.query("SELECT sum(col1) FROM s3_test WHERE id > 0 FORMAT Values") == "(4096)"
assert len(list(minio.list_objects(cluster.minio_bucket,
'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD_PER_COLUMN
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD_PER_COLUMN)
node.query("ALTER TABLE s3_test MODIFY COLUMN col1 String", settings={"mutations_sync": 2})
# Wait for old parts deletion
time.sleep(3)
assert node.query("SELECT distinct(col1) FROM s3_test FORMAT Values") == "('1')"
# and file with mutation
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == (
FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD_PER_COLUMN + 1)
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD_PER_COLUMN + 1)
node.query("ALTER TABLE s3_test DROP COLUMN col1", settings={"mutations_sync": 2})
# Wait for old parts deletion
time.sleep(3)
# and 2 files with mutations
assert len(
list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + 2
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + 2)
def test_attach_detach_partition(cluster):
@ -320,9 +319,7 @@ def test_move_replace_partition_to_another_table(cluster):
assert node.query("SELECT count(*) FROM s3_clone FORMAT Values") == "(8192)"
# Wait for outdated partitions deletion.
time.sleep(3)
assert len(list(
minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 4
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 4)
node.query("DROP TABLE s3_clone NO DELAY")
assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
@ -338,7 +335,8 @@ def test_move_replace_partition_to_another_table(cluster):
node.query("DROP TABLE s3_test NO DELAY")
# Backup data should remain in S3.
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD_PER_PART_WIDE * 4
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD_PER_PART_WIDE * 4)
for obj in list(minio.list_objects(cluster.minio_bucket, 'data/')):
minio.remove_object(cluster.minio_bucket, obj.object_name)

View File

@ -7,7 +7,6 @@ fun:tolower
# Suppress some failures in contrib so that we can enable MSan in CI.
# Ideally, we should report these upstream.
src:*/contrib/zlib-ng/*
# Hyperscan
fun:roseRunProgram

View File

@ -55,14 +55,14 @@
INSERT INTO simple_key_direct_dictionary_source_table
SELECT number, number, toString(number), toDecimal64(number, 8), toString(number)
FROM system.numbers
LIMIT 100000;
LIMIT 50000;
</fill_query>
<fill_query>
INSERT INTO complex_key_direct_dictionary_source_table
SELECT number, toString(number), number, toString(number), toDecimal64(number, 8), toString(number)
FROM system.numbers
LIMIT 100000;
LIMIT 50000;
</fill_query>
<substitutions>
@ -79,47 +79,51 @@
<substitution>
<name>elements_count</name>
<values>
<value>25000</value>
<value>50000</value>
<value>75000</value>
<value>100000</value>
</values>
</substitution>
</substitutions>
<query>
SELECT dictGet('default.simple_key_direct_dictionary', {column_name}, number)
WITH rand64() % toUInt64({elements_count}) as key
SELECT dictGet('default.simple_key_direct_dictionary', {column_name}, key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictGet('default.simple_key_direct_dictionary', ('value_int', 'value_string', 'value_decimal', 'value_string_nullable'), number)
WITH rand64() % toUInt64({elements_count}) as key
SELECT dictGet('default.simple_key_direct_dictionary', ('value_int', 'value_string', 'value_decimal', 'value_string_nullable'), key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictHas('default.simple_key_direct_dictionary', number)
WITH rand64() % toUInt64({elements_count}) as key
SELECT dictHas('default.simple_key_direct_dictionary', key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictGet('default.complex_key_direct_dictionary', {column_name}, (number, toString(number)))
WITH (number, toString(number)) as key
SELECT dictGet('default.complex_key_direct_dictionary', {column_name}, key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictGet('default.complex_key_direct_dictionary', ('value_int', 'value_string', 'value_decimal', 'value_string_nullable'), (number, toString(number)))
WITH (number, toString(number)) as key
SELECT dictGet('default.complex_key_direct_dictionary', ('value_int', 'value_string', 'value_decimal', 'value_string_nullable'), key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictHas('default.complex_key_direct_dictionary', (number, toString(number)))
WITH (number, toString(number)) as key
SELECT dictHas('default.complex_key_direct_dictionary', key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;

View File

@ -1,8 +1,4 @@
<test max_ignored_relative_change="0.3">
<preconditions>
<table_exists>please_fix_me</table_exists>
</preconditions>
<create_query>
CREATE TABLE simple_key_flat_dictionary_source_table
(
@ -50,25 +46,30 @@
<substitution>
<name>elements_count</name>
<values>
<value>2500000</value>
<value>5000000</value>
<value>7500000</value>
<value>10000000</value>
</values>
</substitution>
</substitutions>
<query>
SELECT dictGet('default.simple_key_flat_dictionary', {column_name}, number)
WITH rand64() % toUInt64({elements_count}) as key
SELECT dictGet('default.simple_key_flat_dictionary', {column_name}, key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictHas('default.simple_key_flat_dictionary', number)
SELECT * FROM simple_key_flat_dictionary
FORMAT Null;
</query>
<query>
WITH rand64() % toUInt64(75000000) as key
SELECT dictHas('default.simple_key_flat_dictionary', key)
FROM system.numbers
LIMIT {elements_count}
LIMIT 75000000
FORMAT Null;
</query>

View File

@ -81,35 +81,37 @@
<substitution>
<name>elements_count</name>
<values>
<value>2500000</value>
<value>5000000</value>
<value>7500000</value>
<value>10000000</value>
</values>
</substitution>
</substitutions>
<query>
SELECT dictGet('default.simple_key_hashed_dictionary', {column_name}, number)
WITH rand64() % toUInt64({elements_count}) as key
SELECT dictGet('default.simple_key_hashed_dictionary', {column_name}, key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictHas('default.simple_key_hashed_dictionary', number)
WITH rand64() % toUInt64({elements_count}) as key
SELECT dictHas('default.simple_key_hashed_dictionary', key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictGet('default.complex_key_hashed_dictionary', {column_name}, (number, toString(number)))
WITH (rand64() % toUInt64({elements_count}), toString(rand64() % toUInt64({elements_count}))) as key
SELECT dictGet('default.complex_key_hashed_dictionary', {column_name}, key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictHas('default.complex_key_hashed_dictionary', (number, toString(number)))
WITH (rand64() % toUInt64({elements_count}), toString(rand64() % toUInt64({elements_count}))) as key
SELECT dictHas('default.complex_key_hashed_dictionary', key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;

View File

@ -0,0 +1,5 @@
<test>
<query>SELECT count() FROM numbers(200000000) WHERE NOT ignore(intDiv(number, 1000000000))</query>
<query>SELECT count() FROM numbers(200000000) WHERE NOT ignore(divide(number, 1000000000))</query>
<query>SELECT count() FROM numbers(200000000) WHERE NOT ignore(toUInt32(divide(number, 1000000000)))</query>
</test>

View File

@ -21,15 +21,12 @@ ORDER BY (engine_id)
SETTINGS replicated_deduplication_window = 2, cleanup_delay_period=4, cleanup_delay_period_random_add=0;"
$CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 1, 'hello')"
sleep 1
$CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 2, 'hello')"
sleep 1
$CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 3, 'hello')"
$CLICKHOUSE_CLIENT --query="SELECT count(*) from elog" # 3 rows
count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/s1/blocks'")
while [[ $count != 2 ]]
do
sleep 1
@ -39,9 +36,8 @@ done
$CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 1, 'hello')"
$CLICKHOUSE_CLIENT --query="SELECT count(*) from elog" # 4 rows
count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/s1/blocks'")
while [[ $count != 2 ]]
do
sleep 1
@ -53,12 +49,10 @@ $CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 2, 'h
$CLICKHOUSE_CLIENT --query="SELECT count(*) from elog" # 5 rows
count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/s1/blocks'")
while [[ $count != 2 ]]
do
sleep 1
count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/s1/blocks'")
done
$CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 2, 'hello')"

View File

@ -1,5 +1,5 @@
| id | name | array |
|-:|:-|:-:|
| 1 | name1 | [1,2,3] |
| 2 | name2 | [4,5,6] |
| 3 | name3 | [7,8,9] |
| id | name | array | nullable | low_cardinality | decimal |
|-:|:-|:-|:-|:-|-:|
| 1 | name1 | [1,2,3] | Some long string | name1 | 1.110000 |
| 2 | name2 | [4,5,60000] | \N | Another long string | 222.222222 |
| 30000 | One more long string | [7,8,9] | name3 | name3 | 3.330000 |

View File

@ -1,6 +1,6 @@
DROP TABLE IF EXISTS makrdown;
CREATE TABLE markdown (id UInt32, name String, array Array(Int8)) ENGINE = Memory;
INSERT INTO markdown VALUES (1, 'name1', [1,2,3]), (2, 'name2', [4,5,6]), (3, 'name3', [7,8,9]);
CREATE TABLE markdown (id UInt32, name String, array Array(Int32), nullable Nullable(String), low_cardinality LowCardinality(String), decimal Decimal32(6)) ENGINE = Memory;
INSERT INTO markdown VALUES (1, 'name1', [1,2,3], 'Some long string', 'name1', 1.11), (2, 'name2', [4,5,60000], Null, 'Another long string', 222.222222), (30000, 'One more long string', [7,8,9], 'name3', 'name3', 3.33);
SELECT * FROM markdown FORMAT Markdown;
DROP TABLE IF EXISTS markdown

View File

@ -8,21 +8,11 @@ set -e
function thread()
{
db_engine=`$CLICKHOUSE_CLIENT -q "SELECT engine FROM system.databases WHERE name='$CLICKHOUSE_DATABASE'"`
if [[ $db_engine == "Atomic" ]]; then
# Ignore "Replica already exists" exception
while true; do
$CLICKHOUSE_CLIENT -n -q "DROP TABLE IF EXISTS test_table_$1 NO DELAY;
CREATE TABLE test_table_$1 (a UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r_$1') ORDER BY tuple();" 2>&1 |
grep -vP '(^$)|(^Received exception from server)|(^\d+\. )|because the last replica of the table was dropped right now|is already started to be removing by another replica right now|is already finished removing by another replica right now|Removing leftovers from table|Another replica was suddenly created|was successfully removed from ZooKeeper|was created by another server at the same moment|was suddenly removed|some other replicas were created at the same time|already exists'
done
else
while true; do
$CLICKHOUSE_CLIENT -n -q "DROP TABLE IF EXISTS test_table_$1;
CREATE TABLE test_table_$1 (a UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r_$1') ORDER BY tuple();" 2>&1 |
grep -vP '(^$)|(^Received exception from server)|(^\d+\. )|because the last replica of the table was dropped right now|is already started to be removing by another replica right now|is already finished removing by another replica right now|Removing leftovers from table|Another replica was suddenly created|was successfully removed from ZooKeeper|was created by another server at the same moment|was suddenly removed|some other replicas were created at the same time'
done
fi
while true; do
$CLICKHOUSE_CLIENT -n -q "DROP TABLE IF EXISTS test_table_$1 SYNC;
CREATE TABLE test_table_$1 (a UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r_$1') ORDER BY tuple();" 2>&1 |
grep -vP '(^$)|(^Received exception from server)|(^\d+\. )|because the last replica of the table was dropped right now|is already started to be removing by another replica right now|is already finished removing by another replica right now|Removing leftovers from table|Another replica was suddenly created|was successfully removed from ZooKeeper|was created by another server at the same moment|was suddenly removed|some other replicas were created at the same time'
done
}

View File

@ -0,0 +1,3 @@
-- remote() does not have sharding key, while force_optimize_skip_unused_shards=2 requires from table to have it.
-- But due to only one node, everything works.
select * from remote('127.1', system.one) settings optimize_skip_unused_shards=1, force_optimize_skip_unused_shards=2 format Null;