mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-10-10 02:20:48 +00:00
Merge remote-tracking branch 'upstream/master' into fix25
This commit is contained in:
commit
13fb59e07e
3
.gitmodules
vendored
3
.gitmodules
vendored
@ -97,9 +97,6 @@
|
||||
[submodule "contrib/rapidjson"]
|
||||
path = contrib/rapidjson
|
||||
url = https://github.com/Tencent/rapidjson
|
||||
[submodule "contrib/mimalloc"]
|
||||
path = contrib/mimalloc
|
||||
url = https://github.com/ClickHouse-Extras/mimalloc
|
||||
[submodule "contrib/fastops"]
|
||||
path = contrib/fastops
|
||||
url = https://github.com/ClickHouse-Extras/fastops
|
||||
|
@ -340,7 +340,6 @@ include (cmake/find_consistent-hashing.cmake)
|
||||
include (cmake/find_base64.cmake)
|
||||
include (cmake/find_parquet.cmake)
|
||||
include (cmake/find_hyperscan.cmake)
|
||||
include (cmake/find_mimalloc.cmake)
|
||||
include (cmake/find_simdjson.cmake)
|
||||
include (cmake/find_rapidjson.cmake)
|
||||
include (cmake/find_fastops.cmake)
|
||||
|
@ -1,4 +1,5 @@
|
||||
find_program (CCACHE_FOUND ccache)
|
||||
|
||||
if (CCACHE_FOUND AND NOT CMAKE_CXX_COMPILER_LAUNCHER MATCHES "ccache" AND NOT CMAKE_CXX_COMPILER MATCHES "ccache")
|
||||
execute_process(COMMAND ${CCACHE_FOUND} "-V" OUTPUT_VARIABLE CCACHE_VERSION)
|
||||
string(REGEX REPLACE "ccache version ([0-9\\.]+).*" "\\1" CCACHE_VERSION ${CCACHE_VERSION})
|
||||
|
@ -25,6 +25,8 @@ if (USE_LIBCXX)
|
||||
find_library (LIBCXXFS_LIBRARY c++fs)
|
||||
find_library (LIBCXXABI_LIBRARY c++abi)
|
||||
|
||||
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++")
|
||||
|
||||
target_link_libraries(global-libs INTERFACE ${EXCEPTION_HANDLING_LIBRARY})
|
||||
else ()
|
||||
set (LIBCXX_LIBRARY cxx)
|
||||
@ -38,7 +40,6 @@ if (USE_LIBCXX)
|
||||
target_link_libraries(global-libs INTERFACE ${LIBCXX_LIBRARY} ${LIBCXXABI_LIBRARY} ${LIBCXXFS_LIBRARY})
|
||||
|
||||
set (HAVE_LIBCXX 1)
|
||||
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++")
|
||||
|
||||
message (STATUS "Using libcxx: ${LIBCXX_LIBRARY}")
|
||||
message (STATUS "Using libcxxfs: ${LIBCXXFS_LIBRARY}")
|
||||
|
@ -1,17 +0,0 @@
|
||||
if (OS_LINUX AND NOT SANITIZE AND NOT ARCH_ARM AND NOT ARCH_32 AND NOT ARCH_PPC64LE)
|
||||
option (ENABLE_MIMALLOC "Set to FALSE to disable usage of mimalloc for internal ClickHouse caches" FALSE)
|
||||
endif ()
|
||||
|
||||
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/mimalloc/include/mimalloc.h")
|
||||
message (WARNING "submodule contrib/mimalloc is missing. to fix try run: \n git submodule update --init --recursive")
|
||||
return()
|
||||
endif ()
|
||||
|
||||
if (ENABLE_MIMALLOC)
|
||||
message (FATAL_ERROR "Mimalloc is not production ready. (Disable with cmake -D ENABLE_MIMALLOC=0). If you want to use mimalloc, you must manually remove this message.")
|
||||
|
||||
set (MIMALLOC_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/mimalloc/include)
|
||||
set (USE_MIMALLOC 1)
|
||||
set (MIMALLOC_LIBRARY mimalloc-static)
|
||||
message (STATUS "Using mimalloc: ${MIMALLOC_INCLUDE_DIR} : ${MIMALLOC_LIBRARY}")
|
||||
endif ()
|
@ -40,7 +40,7 @@ ${LIBCXX_SOURCE_DIR}/src/random.cpp
|
||||
|
||||
add_library(cxx ${SRCS})
|
||||
|
||||
target_include_directories(cxx BEFORE PUBLIC $<BUILD_INTERFACE:${LIBCXX_SOURCE_DIR}/include>)
|
||||
target_include_directories(cxx SYSTEM BEFORE PUBLIC $<BUILD_INTERFACE:${LIBCXX_SOURCE_DIR}/include>)
|
||||
target_compile_definitions(cxx PRIVATE -D_LIBCPP_BUILDING_LIBRARY -DLIBCXX_BUILDING_LIBCXXABI)
|
||||
target_compile_options(cxx PUBLIC -nostdinc++ -Wno-reserved-id-macro)
|
||||
target_link_libraries(cxx PUBLIC cxxabi)
|
||||
|
@ -30,12 +30,7 @@ target_include_directories(cxxabi SYSTEM BEFORE
|
||||
)
|
||||
target_compile_definitions(cxxabi PRIVATE -D_LIBCPP_BUILDING_LIBRARY)
|
||||
target_compile_options(cxxabi PRIVATE -nostdinc++ -fno-sanitize=undefined -Wno-macro-redefined) # If we don't disable UBSan, infinite recursion happens in dynamic_cast.
|
||||
|
||||
if (USE_UNWIND)
|
||||
target_link_libraries(cxxabi PRIVATE ${UNWIND_LIBRARIES})
|
||||
else ()
|
||||
target_link_libraries(cxxabi PRIVATE gcc_eh)
|
||||
endif ()
|
||||
target_link_libraries(cxxabi PUBLIC ${EXCEPTION_HANDLING_LIBRARY})
|
||||
|
||||
install(
|
||||
TARGETS cxxabi
|
||||
|
1
contrib/mimalloc
vendored
1
contrib/mimalloc
vendored
@ -1 +0,0 @@
|
||||
Subproject commit a787bdebce94bf3776dc0d1ad597917f479ab8d5
|
@ -256,11 +256,6 @@ if(RE2_INCLUDE_DIR)
|
||||
target_include_directories(clickhouse_common_io SYSTEM BEFORE PUBLIC ${RE2_INCLUDE_DIR})
|
||||
endif()
|
||||
|
||||
if (USE_MIMALLOC)
|
||||
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${MIMALLOC_INCLUDE_DIR})
|
||||
target_link_libraries (clickhouse_common_io PRIVATE ${MIMALLOC_LIBRARY})
|
||||
endif ()
|
||||
|
||||
if(CPUID_LIBRARY)
|
||||
target_link_libraries(clickhouse_common_io PRIVATE ${CPUID_LIBRARY})
|
||||
endif()
|
||||
|
@ -97,7 +97,7 @@ void PerformanceTestInfo::applySettings(XMLConfigurationPtr config)
|
||||
}
|
||||
|
||||
extractSettings(config, "settings", config_settings, settings_to_apply);
|
||||
settings.loadFromChanges(settings_to_apply);
|
||||
settings.applyChanges(settings_to_apply);
|
||||
|
||||
if (settings_contain("average_rows_speed_precision"))
|
||||
TestStats::avg_rows_speed_precision =
|
||||
|
@ -520,7 +520,18 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
|
||||
/// Init trace collector only after trace_log system table was created
|
||||
/// Disable it if we collect test coverage information, because it will work extremely slow.
|
||||
#if USE_UNWIND && !WITH_COVERAGE
|
||||
///
|
||||
/// It also cannot work with sanitizers.
|
||||
/// Sanitizers are using quick "frame walking" stack unwinding (this implies -fno-omit-frame-pointer)
|
||||
/// And they do unwiding frequently (on every malloc/free, thread/mutex operations, etc).
|
||||
/// They change %rbp during unwinding and it confuses libunwind if signal comes during sanitizer unwiding
|
||||
/// and query profiler decide to unwind stack with libunwind at this moment.
|
||||
///
|
||||
/// Symptoms: you'll get silent Segmentation Fault - without sanitizer message and without usual ClickHouse diagnostics.
|
||||
///
|
||||
/// Look at compiler-rt/lib/sanitizer_common/sanitizer_stacktrace.h
|
||||
///
|
||||
#if USE_UNWIND && !WITH_COVERAGE && !defined(SANITIZER)
|
||||
/// QueryProfiler cannot work reliably with any other libunwind or without PHDR cache.
|
||||
if (hasPHDRCache())
|
||||
global_context->initializeTraceCollector();
|
||||
|
@ -446,7 +446,7 @@ namespace ErrorCodes
|
||||
extern const int VIOLATED_CONSTRAINT = 469;
|
||||
extern const int QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW = 470;
|
||||
extern const int SETTINGS_ARE_NOT_SUPPORTED = 471;
|
||||
extern const int IMMUTABLE_SETTING = 472;
|
||||
extern const int READONLY_SETTING = 472;
|
||||
|
||||
extern const int KEEPER_EXCEPTION = 999;
|
||||
extern const int POCO_EXCEPTION = 1000;
|
||||
|
@ -1,70 +0,0 @@
|
||||
#include "MiAllocator.h"
|
||||
|
||||
#if USE_MIMALLOC
|
||||
#include <mimalloc.h>
|
||||
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/formatReadable.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_ALLOCATE_MEMORY;
|
||||
}
|
||||
|
||||
void * MiAllocator::alloc(size_t size, size_t alignment)
|
||||
{
|
||||
void * ptr;
|
||||
if (alignment == 0)
|
||||
{
|
||||
ptr = mi_malloc(size);
|
||||
if (!ptr)
|
||||
DB::throwFromErrno("MiAllocator: Cannot allocate in mimalloc " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
}
|
||||
else
|
||||
{
|
||||
ptr = mi_malloc_aligned(size, alignment);
|
||||
if (!ptr)
|
||||
DB::throwFromErrno("MiAllocator: Cannot allocate in mimalloc (mi_malloc_aligned) " + formatReadableSizeWithBinarySuffix(size) + " with alignment " + toString(alignment) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
}
|
||||
return ptr;
|
||||
}
|
||||
|
||||
void MiAllocator::free(void * buf, size_t)
|
||||
{
|
||||
mi_free(buf);
|
||||
}
|
||||
|
||||
void * MiAllocator::realloc(void * old_ptr, size_t, size_t new_size, size_t alignment)
|
||||
{
|
||||
if (old_ptr == nullptr)
|
||||
return alloc(new_size, alignment);
|
||||
|
||||
if (new_size == 0)
|
||||
{
|
||||
mi_free(old_ptr);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void * ptr;
|
||||
|
||||
if (alignment == 0)
|
||||
{
|
||||
ptr = mi_realloc(old_ptr, alignment);
|
||||
if (!ptr)
|
||||
DB::throwFromErrno("MiAllocator: Cannot reallocate in mimalloc " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
}
|
||||
else
|
||||
{
|
||||
ptr = mi_realloc_aligned(old_ptr, new_size, alignment);
|
||||
if (!ptr)
|
||||
DB::throwFromErrno("MiAllocator: Cannot reallocate in mimalloc (mi_realloc_aligned) " + formatReadableSizeWithBinarySuffix(size) + " with alignment " + toString(alignment) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||
}
|
||||
return ptr;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
@ -1,27 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/config.h>
|
||||
|
||||
#if USE_MIMALLOC
|
||||
#include <cstddef>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/*
|
||||
* This is a different allocator that is based on mimalloc (Microsoft malloc).
|
||||
* It can be used separately from main allocator to catch heap corruptions and vulnerabilities (for example, for caches).
|
||||
* We use MI_SECURE mode in mimalloc to achieve such behaviour.
|
||||
*/
|
||||
struct MiAllocator
|
||||
{
|
||||
static void * alloc(size_t size, size_t alignment = 0);
|
||||
|
||||
static void free(void * buf, size_t);
|
||||
|
||||
static void * realloc(void * old_ptr, size_t, size_t new_size, size_t alignment = 0);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
@ -36,8 +36,10 @@
|
||||
M(MarkCacheMisses, "") \
|
||||
M(CreatedReadBufferOrdinary, "") \
|
||||
M(CreatedReadBufferAIO, "") \
|
||||
M(CreatedReadBufferAIOFailed, "") \
|
||||
M(CreatedWriteBufferOrdinary, "") \
|
||||
M(CreatedWriteBufferAIO, "") \
|
||||
M(CreatedWriteBufferAIOFailed, "") \
|
||||
M(DiskReadElapsedMicroseconds, "Total time spent waiting for read syscall. This include reads from page cache.") \
|
||||
M(DiskWriteElapsedMicroseconds, "Total time spent waiting for write syscall. This include writes to page cache.") \
|
||||
M(NetworkReceiveElapsedMicroseconds, "") \
|
||||
|
@ -61,6 +61,7 @@ public:
|
||||
InternalTextLogsQueueWeakPtr logs_queue_ptr;
|
||||
|
||||
std::vector<UInt32> thread_numbers;
|
||||
std::vector<UInt32> os_thread_ids;
|
||||
|
||||
/// The first thread created this thread group
|
||||
UInt32 master_thread_number = 0;
|
||||
|
@ -8,6 +8,5 @@
|
||||
#cmakedefine01 USE_CPUID
|
||||
#cmakedefine01 USE_CPUINFO
|
||||
#cmakedefine01 USE_BROTLI
|
||||
#cmakedefine01 USE_MIMALLOC
|
||||
#cmakedefine01 USE_UNWIND
|
||||
#cmakedefine01 CLICKHOUSE_SPLIT_BINARY
|
||||
|
@ -76,8 +76,5 @@ target_link_libraries (cow_compositions PRIVATE clickhouse_common_io)
|
||||
add_executable (stopwatch stopwatch.cpp)
|
||||
target_link_libraries (stopwatch PRIVATE clickhouse_common_io)
|
||||
|
||||
add_executable (mi_malloc_test mi_malloc_test.cpp)
|
||||
target_link_libraries (mi_malloc_test PRIVATE clickhouse_common_io)
|
||||
|
||||
add_executable (symbol_index symbol_index.cpp)
|
||||
target_link_libraries (symbol_index PRIVATE clickhouse_common_io)
|
||||
|
@ -1,118 +0,0 @@
|
||||
/** In addition to ClickHouse (Apache 2) license, this file can be also used under MIT license:
|
||||
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2019 Yandex LLC, Alexey Milovidov
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
|
||||
*/
|
||||
|
||||
#include <map>
|
||||
#include <vector>
|
||||
#include <cstdint>
|
||||
#include <random>
|
||||
#include <stdexcept>
|
||||
#include <iostream>
|
||||
|
||||
#include <Common/config.h>
|
||||
|
||||
//#undef USE_MIMALLOC
|
||||
//#define USE_MIMALLOC 0
|
||||
|
||||
#if USE_MIMALLOC
|
||||
|
||||
#include <mimalloc.h>
|
||||
#define malloc mi_malloc
|
||||
#define free mi_free
|
||||
|
||||
#else
|
||||
|
||||
#include <stdlib.h>
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
size_t total_size{0};
|
||||
|
||||
struct Allocation
|
||||
{
|
||||
void * ptr = nullptr;
|
||||
size_t size = 0;
|
||||
|
||||
Allocation() {}
|
||||
|
||||
Allocation(size_t size_)
|
||||
: size(size_)
|
||||
{
|
||||
ptr = malloc(size);
|
||||
if (!ptr)
|
||||
throw std::runtime_error("Cannot allocate memory");
|
||||
total_size += size;
|
||||
}
|
||||
|
||||
~Allocation()
|
||||
{
|
||||
if (ptr)
|
||||
{
|
||||
free(ptr);
|
||||
total_size -= size;
|
||||
}
|
||||
ptr = nullptr;
|
||||
}
|
||||
|
||||
Allocation(const Allocation &) = delete;
|
||||
|
||||
Allocation(Allocation && rhs)
|
||||
{
|
||||
ptr = rhs.ptr;
|
||||
size = rhs.size;
|
||||
rhs.ptr = nullptr;
|
||||
rhs.size = 0;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
int main(int, char **)
|
||||
{
|
||||
std::vector<Allocation> allocations;
|
||||
|
||||
constexpr size_t limit = 100000000;
|
||||
constexpr size_t min_alloc_size = 65536;
|
||||
constexpr size_t max_alloc_size = 10000000;
|
||||
|
||||
std::mt19937 rng;
|
||||
auto distribution = std::uniform_int_distribution(min_alloc_size, max_alloc_size);
|
||||
|
||||
size_t total_allocations = 0;
|
||||
|
||||
while (true)
|
||||
{
|
||||
size_t size = distribution(rng);
|
||||
|
||||
while (total_size + size > limit)
|
||||
allocations.pop_back();
|
||||
|
||||
allocations.emplace_back(size);
|
||||
|
||||
++total_allocations;
|
||||
if (total_allocations % (1ULL << 20) == 0)
|
||||
std::cerr << "Total allocations: " << total_allocations << "\n";
|
||||
}
|
||||
}
|
@ -124,6 +124,8 @@
|
||||
#endif
|
||||
#endif
|
||||
|
||||
/// TODO Strange enough, there is no way to detect UB sanitizer.
|
||||
|
||||
/// Explicitly allow undefined behaviour for certain functions. Use it as a function attribute.
|
||||
/// It is useful in case when compiler cannot see (and exploit) it, but UBSan can.
|
||||
/// Example: multiplication of signed integers with possibility of overflow when both sides are from user input.
|
||||
|
@ -42,8 +42,7 @@ struct Settings : public SettingsCollection<Settings>
|
||||
* but we are not going to do it, because settings is used everywhere as static struct fields.
|
||||
*/
|
||||
|
||||
/// M (mutable) for normal settings, IM (immutable) for not updateable settings.
|
||||
#define LIST_OF_SETTINGS(M, IM) \
|
||||
#define LIST_OF_SETTINGS(M) \
|
||||
M(SettingUInt64, min_compress_block_size, 65536, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value and no less than the volume of data for one mark.") \
|
||||
M(SettingUInt64, max_compress_block_size, 1048576, "The maximum size of blocks of uncompressed data before compressing for writing to a table.") \
|
||||
M(SettingUInt64, max_block_size, DEFAULT_BLOCK_SIZE, "Maximum block size for reading") \
|
||||
|
@ -17,11 +17,6 @@ class Field;
|
||||
class ReadBuffer;
|
||||
class WriteBuffer;
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int IMMUTABLE_SETTING;
|
||||
}
|
||||
|
||||
/** One setting for any type.
|
||||
* Stores a value within itself, as well as a flag - whether the value was changed.
|
||||
* This is done so that you can send to the remote servers only changed settings (or explicitly specified in the config) values.
|
||||
@ -317,15 +312,12 @@ private:
|
||||
using DeserializeFunction = void (*)(Derived &, ReadBuffer & buf);
|
||||
using CastValueWithoutApplyingFunction = Field (*)(const Field &);
|
||||
|
||||
|
||||
struct MemberInfo
|
||||
{
|
||||
IsChangedFunction is_changed;
|
||||
StringRef name;
|
||||
StringRef description;
|
||||
/// Can be updated after first load for config/definition.
|
||||
/// Non updatable settings can be `changed`,
|
||||
/// if they were overwritten in config/definition.
|
||||
const bool updateable;
|
||||
GetStringFunction get_string;
|
||||
GetFieldFunction get_field;
|
||||
SetStringFunction set_string;
|
||||
@ -405,7 +397,6 @@ public:
|
||||
const_reference(const const_reference & src) = default;
|
||||
const StringRef & getName() const { return member->name; }
|
||||
const StringRef & getDescription() const { return member->description; }
|
||||
bool isUpdateable() const { return member->updateable; }
|
||||
bool isChanged() const { return member->isChanged(*collection); }
|
||||
Field getValue() const { return member->get_field(*collection); }
|
||||
String getValueAsString() const { return member->get_string(*collection); }
|
||||
@ -425,18 +416,6 @@ public:
|
||||
reference(const const_reference & src) : const_reference(src) {}
|
||||
void setValue(const Field & value) { this->member->set_field(*const_cast<Derived *>(this->collection), value); }
|
||||
void setValue(const String & value) { this->member->set_string(*const_cast<Derived *>(this->collection), value); }
|
||||
void updateValue(const Field & value)
|
||||
{
|
||||
if (!this->member->updateable)
|
||||
throw Exception("Setting '" + this->member->name.toString() + "' is restricted for updates.", ErrorCodes::IMMUTABLE_SETTING);
|
||||
setValue(value);
|
||||
}
|
||||
void updateValue(const String & value)
|
||||
{
|
||||
if (!this->member->updateable)
|
||||
throw Exception("Setting '" + this->member->name.toString() + "' is restricted for updates.", ErrorCodes::IMMUTABLE_SETTING);
|
||||
setValue(value);
|
||||
}
|
||||
};
|
||||
|
||||
/// Iterator to iterating through all the settings.
|
||||
@ -519,15 +498,6 @@ public:
|
||||
void set(size_t index, const String & value) { (*this)[index].setValue(value); }
|
||||
void set(const String & name, const String & value) { (*this)[name].setValue(value); }
|
||||
|
||||
/// Updates setting's value. Checks it' mutability.
|
||||
void update(size_t index, const Field & value) { (*this)[index].updateValue(value); }
|
||||
|
||||
void update(const String & name, const Field & value) { (*this)[name].updateValue(value); }
|
||||
|
||||
void update(size_t index, const String & value) { (*this)[index].updateValue(value); }
|
||||
|
||||
void update(const String & name, const String & value) { (*this)[name].updateValue(value); }
|
||||
|
||||
/// Returns value of a setting.
|
||||
Field get(size_t index) const { return (*this)[index].getValue(); }
|
||||
Field get(const String & name) const { return (*this)[name].getValue(); }
|
||||
@ -591,35 +561,19 @@ public:
|
||||
return found_changes;
|
||||
}
|
||||
|
||||
/// Applies change to the settings. Doesn't check settings mutability.
|
||||
void loadFromChange(const SettingChange & change)
|
||||
/// Applies change to concrete setting.
|
||||
void applyChange(const SettingChange & change)
|
||||
{
|
||||
set(change.name, change.value);
|
||||
}
|
||||
|
||||
/// Applies changes to the settings. Should be used in initial settings loading.
|
||||
/// (on table creation or loading from config)
|
||||
void loadFromChanges(const SettingsChanges & changes)
|
||||
/// Applies changes to the settings.
|
||||
void applyChanges(const SettingsChanges & changes)
|
||||
{
|
||||
for (const SettingChange & change : changes)
|
||||
loadFromChange(change);
|
||||
applyChange(change);
|
||||
}
|
||||
|
||||
/// Applies change to the settings, checks settings mutability.
|
||||
void updateFromChange(const SettingChange & change)
|
||||
{
|
||||
update(change.name, change.value);
|
||||
}
|
||||
|
||||
/// Applies changes to the settings. Should be used for settigns update.
|
||||
/// (ALTER MODIFY SETTINGS)
|
||||
void updateFromChanges(const SettingsChanges & changes)
|
||||
{
|
||||
for (const SettingChange & change : changes)
|
||||
updateFromChange(change);
|
||||
}
|
||||
|
||||
|
||||
void copyChangesFrom(const Derived & src)
|
||||
{
|
||||
for (const auto & member : members())
|
||||
@ -663,7 +617,7 @@ public:
|
||||
};
|
||||
|
||||
#define DECLARE_SETTINGS_COLLECTION(LIST_OF_SETTINGS_MACRO) \
|
||||
LIST_OF_SETTINGS_MACRO(DECLARE_SETTINGS_COLLECTION_DECLARE_VARIABLES_HELPER_, DECLARE_SETTINGS_COLLECTION_DECLARE_VARIABLES_HELPER_)
|
||||
LIST_OF_SETTINGS_MACRO(DECLARE_SETTINGS_COLLECTION_DECLARE_VARIABLES_HELPER_)
|
||||
|
||||
|
||||
#define IMPLEMENT_SETTINGS_COLLECTION(DERIVED_CLASS_NAME, LIST_OF_SETTINGS_MACRO) \
|
||||
@ -673,9 +627,9 @@ public:
|
||||
using Derived = DERIVED_CLASS_NAME; \
|
||||
struct Functions \
|
||||
{ \
|
||||
LIST_OF_SETTINGS_MACRO(IMPLEMENT_SETTINGS_COLLECTION_DEFINE_FUNCTIONS_HELPER_, IMPLEMENT_SETTINGS_COLLECTION_DEFINE_FUNCTIONS_HELPER_) \
|
||||
LIST_OF_SETTINGS_MACRO(IMPLEMENT_SETTINGS_COLLECTION_DEFINE_FUNCTIONS_HELPER_) \
|
||||
}; \
|
||||
LIST_OF_SETTINGS_MACRO(IMPLEMENT_SETTINGS_COLLECTION_ADD_MUTABLE_MEMBER_INFO_HELPER_, IMPLEMENT_SETTINGS_COLLECTION_ADD_IMMUTABLE_MEMBER_INFO_HELPER_) \
|
||||
LIST_OF_SETTINGS_MACRO(IMPLEMENT_SETTINGS_COLLECTION_ADD_MEMBER_INFO_HELPER_) \
|
||||
}
|
||||
|
||||
|
||||
@ -690,20 +644,12 @@ public:
|
||||
static void NAME##_setField(Derived & collection, const Field & value) { collection.NAME.set(value); } \
|
||||
static void NAME##_serialize(const Derived & collection, WriteBuffer & buf) { collection.NAME.serialize(buf); } \
|
||||
static void NAME##_deserialize(Derived & collection, ReadBuffer & buf) { collection.NAME.deserialize(buf); } \
|
||||
static Field NAME##_castValueWithoutApplying(const Field & value) { TYPE temp{DEFAULT}; temp.set(value); return temp.toField(); }
|
||||
static Field NAME##_castValueWithoutApplying(const Field & value) { TYPE temp{DEFAULT}; temp.set(value); return temp.toField(); } \
|
||||
|
||||
|
||||
#define IMPLEMENT_SETTINGS_COLLECTION_ADD_MUTABLE_MEMBER_INFO_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION) \
|
||||
#define IMPLEMENT_SETTINGS_COLLECTION_ADD_MEMBER_INFO_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION) \
|
||||
add({[](const Derived & d) { return d.NAME.changed; }, \
|
||||
StringRef(#NAME, strlen(#NAME)), StringRef(DESCRIPTION, strlen(DESCRIPTION)), true, \
|
||||
&Functions::NAME##_getString, &Functions::NAME##_getField, \
|
||||
&Functions::NAME##_setString, &Functions::NAME##_setField, \
|
||||
&Functions::NAME##_serialize, &Functions::NAME##_deserialize, \
|
||||
&Functions::NAME##_castValueWithoutApplying });
|
||||
|
||||
#define IMPLEMENT_SETTINGS_COLLECTION_ADD_IMMUTABLE_MEMBER_INFO_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION) \
|
||||
add({[](const Derived & d) { return d.NAME.changed; }, \
|
||||
StringRef(#NAME, strlen(#NAME)), StringRef(DESCRIPTION, strlen(DESCRIPTION)), false, \
|
||||
StringRef(#NAME, strlen(#NAME)), StringRef(DESCRIPTION, strlen(DESCRIPTION)), \
|
||||
&Functions::NAME##_getString, &Functions::NAME##_getField, \
|
||||
&Functions::NAME##_setString, &Functions::NAME##_setField, \
|
||||
&Functions::NAME##_serialize, &Functions::NAME##_deserialize, \
|
||||
|
@ -314,7 +314,11 @@ private:
|
||||
/// NOTE: Acquire a read lock, therefore f() should be thread safe
|
||||
std::shared_lock lock(children_mutex);
|
||||
|
||||
for (auto & child : children)
|
||||
// Reduce lock scope and avoid recursive locking since that is undefined for shared_mutex.
|
||||
const auto children_copy = children;
|
||||
lock.unlock();
|
||||
|
||||
for (auto & child : children_copy)
|
||||
if (f(*child))
|
||||
return;
|
||||
}
|
||||
|
@ -6,10 +6,6 @@
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Common/PODArray.h>
|
||||
|
||||
#include <Common/config.h>
|
||||
#if USE_MIMALLOC
|
||||
#include <Common/MiAllocator.h>
|
||||
#endif
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -43,9 +39,7 @@ struct MarkInCompressedFile
|
||||
}
|
||||
|
||||
};
|
||||
#if USE_MIMALLOC
|
||||
using MarksInCompressedFile = PODArray<MarkInCompressedFile, 4096, MiAllocator>;
|
||||
#else
|
||||
|
||||
using MarksInCompressedFile = PODArray<MarkInCompressedFile>;
|
||||
#endif
|
||||
|
||||
}
|
||||
|
@ -26,7 +26,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
|
||||
* Although now any insertion into the table is done via PushingToViewsBlockOutputStream,
|
||||
* but it's clear that here is not the best place for this functionality.
|
||||
*/
|
||||
addTableLock(storage->lockStructureForShare(true, context.getCurrentQueryId()));
|
||||
addTableLock(storage->lockStructureForShare(true, context.getInitialQueryId()));
|
||||
|
||||
/// If the "root" table deduplactes blocks, there are no need to make deduplication for children
|
||||
/// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks
|
||||
|
@ -70,6 +70,7 @@ CacheDictionary::CacheDictionary(
|
||||
, dict_struct(dict_struct_)
|
||||
, source_ptr{std::move(source_ptr_)}
|
||||
, dict_lifetime(dict_lifetime_)
|
||||
, log(&Logger::get("ExternalDictionaries"))
|
||||
, size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))}
|
||||
, size_overlap_mask{this->size - 1}
|
||||
, cells{this->size}
|
||||
@ -575,6 +576,12 @@ BlockInputStreamPtr CacheDictionary::getBlockInputStream(const Names & column_na
|
||||
return std::make_shared<BlockInputStreamType>(shared_from_this(), max_block_size, getCachedIds(), column_names);
|
||||
}
|
||||
|
||||
std::exception_ptr CacheDictionary::getLastException() const
|
||||
{
|
||||
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
|
||||
return last_exception;
|
||||
}
|
||||
|
||||
void registerDictionaryCache(DictionaryFactory & factory)
|
||||
{
|
||||
auto create_layout = [=](const std::string & name,
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <shared_mutex>
|
||||
#include <variant>
|
||||
#include <vector>
|
||||
#include <common/logger_useful.h>
|
||||
#include <Columns/ColumnDecimal.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <pcg_random.hpp>
|
||||
@ -74,6 +75,8 @@ public:
|
||||
void isInVectorConstant(const PaddedPODArray<Key> & child_ids, const Key ancestor_id, PaddedPODArray<UInt8> & out) const override;
|
||||
void isInConstantVector(const Key child_id, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const override;
|
||||
|
||||
std::exception_ptr getLastException() const override;
|
||||
|
||||
template <typename T>
|
||||
using ResultArrayType = std::conditional_t<IsDecimalNumber<T>, DecimalPaddedPODArray<T>, PaddedPODArray<T>>;
|
||||
|
||||
@ -253,8 +256,9 @@ private:
|
||||
|
||||
const std::string name;
|
||||
const DictionaryStructure dict_struct;
|
||||
const DictionarySourcePtr source_ptr;
|
||||
mutable DictionarySourcePtr source_ptr;
|
||||
const DictionaryLifetime dict_lifetime;
|
||||
Logger * const log;
|
||||
|
||||
mutable std::shared_mutex rw_lock;
|
||||
|
||||
@ -274,6 +278,10 @@ private:
|
||||
Attribute * hierarchical_attribute = nullptr;
|
||||
std::unique_ptr<ArenaWithFreeLists> string_arena;
|
||||
|
||||
mutable std::exception_ptr last_exception;
|
||||
mutable size_t error_count = 0;
|
||||
mutable std::chrono::system_clock::time_point backoff_end_time;
|
||||
|
||||
mutable pcg64 rnd_engine;
|
||||
|
||||
mutable size_t bytes_allocated = 0;
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <Common/ProfilingScopedRWLock.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <common/DateLUT.h>
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include <ext/map.h>
|
||||
#include <ext/range.h>
|
||||
@ -243,22 +244,32 @@ template <typename PresentIdHandler, typename AbsentIdHandler>
|
||||
void CacheDictionary::update(
|
||||
const std::vector<Key> & requested_ids, PresentIdHandler && on_cell_updated, AbsentIdHandler && on_id_not_found) const
|
||||
{
|
||||
CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
|
||||
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size());
|
||||
|
||||
std::unordered_map<Key, UInt8> remaining_ids{requested_ids.size()};
|
||||
for (const auto id : requested_ids)
|
||||
remaining_ids.insert({id, 0});
|
||||
|
||||
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
|
||||
const auto now = std::chrono::system_clock::now();
|
||||
|
||||
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
|
||||
|
||||
if (now > backoff_end_time)
|
||||
{
|
||||
CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
|
||||
try
|
||||
{
|
||||
if (error_count)
|
||||
{
|
||||
/// Recover after error: we have to clone the source here because
|
||||
/// it could keep connections which should be reset after error.
|
||||
source_ptr = source_ptr->clone();
|
||||
}
|
||||
|
||||
Stopwatch watch;
|
||||
auto stream = source_ptr->loadIds(requested_ids);
|
||||
stream->readPrefix();
|
||||
|
||||
const auto now = std::chrono::system_clock::now();
|
||||
|
||||
while (const auto block = stream->read())
|
||||
{
|
||||
const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
|
||||
@ -294,7 +305,10 @@ void CacheDictionary::update(
|
||||
|
||||
cell.id = id;
|
||||
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
|
||||
cell.setExpiresAt(std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)});
|
||||
{
|
||||
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
|
||||
cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
|
||||
}
|
||||
else
|
||||
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
|
||||
|
||||
@ -307,13 +321,25 @@ void CacheDictionary::update(
|
||||
|
||||
stream->readSuffix();
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size());
|
||||
error_count = 0;
|
||||
last_exception = std::exception_ptr{};
|
||||
backoff_end_time = std::chrono::system_clock::time_point{};
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed());
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
++error_count;
|
||||
last_exception = std::current_exception();
|
||||
backoff_end_time = now + std::chrono::seconds(ExternalLoadableBackoff{}.calculateDuration(rnd_engine, error_count));
|
||||
|
||||
tryLogException(last_exception, log, "Could not update cache dictionary '" + getName() +
|
||||
"', next update is scheduled at " + DateLUT::instance().timeToString(std::chrono::system_clock::to_time_t(backoff_end_time)));
|
||||
}
|
||||
}
|
||||
|
||||
size_t not_found_num = 0, found_num = 0;
|
||||
|
||||
const auto now = std::chrono::system_clock::now();
|
||||
/// Check which ids have not been found and require setting null_value
|
||||
for (const auto & id_found_pair : remaining_ids)
|
||||
{
|
||||
@ -328,24 +354,45 @@ void CacheDictionary::update(
|
||||
|
||||
const auto find_result = findCellIdx(id, now);
|
||||
const auto & cell_idx = find_result.cell_idx;
|
||||
|
||||
auto & cell = cells[cell_idx];
|
||||
|
||||
/// Set null_value for each attribute
|
||||
for (auto & attribute : attributes)
|
||||
setDefaultAttributeValue(attribute, cell_idx);
|
||||
if (error_count)
|
||||
{
|
||||
if (find_result.outdated)
|
||||
{
|
||||
/// We have expired data for that `id` so we can continue using it.
|
||||
bool was_default = cell.isDefault();
|
||||
cell.setExpiresAt(backoff_end_time);
|
||||
if (was_default)
|
||||
cell.setDefault();
|
||||
if (was_default)
|
||||
on_id_not_found(id, cell_idx);
|
||||
else
|
||||
on_cell_updated(id, cell_idx);
|
||||
continue;
|
||||
}
|
||||
/// We don't have expired data for that `id` so all we can do is to rethrow `last_exception`.
|
||||
std::rethrow_exception(last_exception);
|
||||
}
|
||||
|
||||
/// Check if cell had not been occupied before and increment element counter if it hadn't
|
||||
if (cell.id == 0 && cell_idx != zero_cell_idx)
|
||||
element_count.fetch_add(1, std::memory_order_relaxed);
|
||||
|
||||
cell.id = id;
|
||||
|
||||
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
|
||||
cell.setExpiresAt(std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)});
|
||||
{
|
||||
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
|
||||
cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
|
||||
}
|
||||
else
|
||||
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
|
||||
|
||||
/// Set null_value for each attribute
|
||||
cell.setDefault();
|
||||
for (auto & attribute : attributes)
|
||||
setDefaultAttributeValue(attribute, cell_idx);
|
||||
|
||||
/// inform caller that the cell has not been found
|
||||
on_id_not_found(id, cell_idx);
|
||||
|
@ -56,6 +56,8 @@ struct IDictionaryBase : public IExternalLoadable
|
||||
return source && source->isModified();
|
||||
}
|
||||
|
||||
virtual std::exception_ptr getLastException() const { return {}; }
|
||||
|
||||
std::shared_ptr<IDictionaryBase> shared_from_this()
|
||||
{
|
||||
return std::static_pointer_cast<IDictionaryBase>(IExternalLoadable::shared_from_this());
|
||||
|
@ -65,7 +65,7 @@ FunctionBasePtr FunctionBuilderJoinGet::buildImpl(const ColumnsWithTypeAndName &
|
||||
auto join = storage_join->getJoin();
|
||||
DataTypes data_types(arguments.size());
|
||||
|
||||
auto table_lock = storage_join->lockStructureForShare(false, context.getCurrentQueryId());
|
||||
auto table_lock = storage_join->lockStructureForShare(false, context.getInitialQueryId());
|
||||
for (size_t i = 0; i < arguments.size(); ++i)
|
||||
data_types[i] = arguments[i].type;
|
||||
|
||||
|
@ -6,12 +6,14 @@ class FunctionFactory;
|
||||
void registerFunctionAddressToSymbol(FunctionFactory & factory);
|
||||
void registerFunctionDemangle(FunctionFactory & factory);
|
||||
void registerFunctionAddressToLine(FunctionFactory & factory);
|
||||
void registerFunctionTrap(FunctionFactory & factory);
|
||||
|
||||
void registerFunctionsIntrospection(FunctionFactory & factory)
|
||||
{
|
||||
registerFunctionAddressToSymbol(factory);
|
||||
registerFunctionDemangle(factory);
|
||||
registerFunctionAddressToLine(factory);
|
||||
registerFunctionTrap(factory);
|
||||
}
|
||||
|
||||
}
|
||||
|
143
dbms/src/Functions/trap.cpp
Normal file
143
dbms/src/Functions/trap.cpp
Normal file
@ -0,0 +1,143 @@
|
||||
#if 0
|
||||
|
||||
#include <Functions/IFunction.h>
|
||||
#include <Functions/FunctionFactory.h>
|
||||
#include <Functions/FunctionHelpers.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
|
||||
#include <thread>
|
||||
#include <memory>
|
||||
#include <cstdlib>
|
||||
#include <unistd.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int ILLEGAL_COLUMN;
|
||||
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
|
||||
/// Various illegal actions to test diagnostic features of ClickHouse itself. Should not be enabled in production builds.
|
||||
class FunctionTrap : public IFunction
|
||||
{
|
||||
public:
|
||||
static constexpr auto name = "trap";
|
||||
static FunctionPtr create(const Context &)
|
||||
{
|
||||
return std::make_shared<FunctionTrap>();
|
||||
}
|
||||
|
||||
String getName() const override
|
||||
{
|
||||
return name;
|
||||
}
|
||||
|
||||
size_t getNumberOfArguments() const override
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
|
||||
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
|
||||
{
|
||||
if (!isString(arguments[0]))
|
||||
throw Exception("The only argument for function " + getName() + " must be constant String", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
|
||||
return std::make_shared<DataTypeUInt8>();
|
||||
}
|
||||
|
||||
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
|
||||
{
|
||||
if (const ColumnConst * column = checkAndGetColumnConst<ColumnString>(block.getByPosition(arguments[0]).column.get()))
|
||||
{
|
||||
String mode = column->getValue<String>();
|
||||
|
||||
if (mode == "read nullptr c++")
|
||||
{
|
||||
volatile int x = *reinterpret_cast<const volatile int *>(0);
|
||||
(void)x;
|
||||
}
|
||||
else if (mode == "read nullptr asm")
|
||||
{
|
||||
__asm__ volatile ("movq $0, %rax");
|
||||
__asm__ volatile ("movq (%rax), %rax");
|
||||
}
|
||||
else if (mode == "illegal instruction")
|
||||
{
|
||||
__asm__ volatile ("ud2a");
|
||||
}
|
||||
else if (mode == "abort")
|
||||
{
|
||||
abort();
|
||||
}
|
||||
else if (mode == "use after free")
|
||||
{
|
||||
int * x_ptr;
|
||||
{
|
||||
auto x = std::make_unique<int>();
|
||||
x_ptr = x.get();
|
||||
}
|
||||
*x_ptr = 1;
|
||||
(void)x_ptr;
|
||||
}
|
||||
else if (mode == "use after scope")
|
||||
{
|
||||
volatile int * x_ptr;
|
||||
[&]{
|
||||
volatile int x = 0;
|
||||
x_ptr = &x;
|
||||
(void)x;
|
||||
}();
|
||||
[&]{
|
||||
volatile int y = 1;
|
||||
*x_ptr = 2;
|
||||
(void)y;
|
||||
}();
|
||||
(void)x_ptr;
|
||||
}
|
||||
else if (mode == "uninitialized memory")
|
||||
{
|
||||
int x;
|
||||
(void)write(2, &x, sizeof(x));
|
||||
}
|
||||
else if (mode == "data race")
|
||||
{
|
||||
int x = 0;
|
||||
std::thread t1([&]{ ++x; });
|
||||
std::thread t2([&]{ ++x; });
|
||||
t1.join();
|
||||
t2.join();
|
||||
}
|
||||
else
|
||||
throw Exception("Unknown trap mode", ErrorCodes::BAD_ARGUMENTS);
|
||||
}
|
||||
else
|
||||
throw Exception("The only argument for function " + getName() + " must be constant String", ErrorCodes::ILLEGAL_COLUMN);
|
||||
|
||||
block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(input_rows_count, 0ULL);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
void registerFunctionTrap(FunctionFactory & factory)
|
||||
{
|
||||
factory.registerFunction<FunctionTrap>();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class FunctionFactory;
|
||||
void registerFunctionTrap(FunctionFactory &) {}
|
||||
}
|
||||
|
||||
#endif
|
@ -6,11 +6,6 @@
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <IO/BufferWithOwnMemory.h>
|
||||
|
||||
#include <Common/config.h>
|
||||
#if USE_MIMALLOC
|
||||
#include <Common/MiAllocator.h>
|
||||
#endif
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
@ -25,11 +20,7 @@ namespace DB
|
||||
|
||||
struct UncompressedCacheCell
|
||||
{
|
||||
#if USE_MIMALLOC
|
||||
Memory<MiAllocator> data;
|
||||
#else
|
||||
Memory<> data;
|
||||
#endif
|
||||
size_t compressed_size;
|
||||
UInt32 additional_bytes;
|
||||
};
|
||||
|
@ -10,34 +10,38 @@ namespace ProfileEvents
|
||||
{
|
||||
extern const Event CreatedReadBufferOrdinary;
|
||||
extern const Event CreatedReadBufferAIO;
|
||||
extern const Event CreatedReadBufferAIOFailed;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
#if !defined(__linux__)
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
}
|
||||
#endif
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(const std::string & filename_, size_t estimated_size,
|
||||
size_t aio_threshold, size_t buffer_size_, int flags_, char * existing_memory_, size_t alignment)
|
||||
{
|
||||
if ((aio_threshold == 0) || (estimated_size < aio_threshold))
|
||||
#if defined(__linux__) || defined(__FreeBSD__)
|
||||
if (aio_threshold && estimated_size >= aio_threshold)
|
||||
{
|
||||
/// Attempt to open a file with O_DIRECT
|
||||
try
|
||||
{
|
||||
auto res = std::make_unique<ReadBufferAIO>(filename_, buffer_size_, flags_, existing_memory_);
|
||||
ProfileEvents::increment(ProfileEvents::CreatedReadBufferAIO);
|
||||
return res;
|
||||
}
|
||||
catch (const ErrnoException &)
|
||||
{
|
||||
/// Fallback to cached IO if O_DIRECT is not supported.
|
||||
ProfileEvents::increment(ProfileEvents::CreatedReadBufferAIOFailed);
|
||||
}
|
||||
}
|
||||
#else
|
||||
(void)aio_threshold;
|
||||
(void)estimated_size;
|
||||
#endif
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::CreatedReadBufferOrdinary);
|
||||
return std::make_unique<ReadBufferFromFile>(filename_, buffer_size_, flags_, existing_memory_, alignment);
|
||||
}
|
||||
else
|
||||
{
|
||||
#if defined(__linux__) || defined(__FreeBSD__)
|
||||
ProfileEvents::increment(ProfileEvents::CreatedReadBufferAIO);
|
||||
return std::make_unique<ReadBufferAIO>(filename_, buffer_size_, flags_, existing_memory_);
|
||||
#else
|
||||
throw Exception("AIO is implemented only on Linux and FreeBSD", ErrorCodes::NOT_IMPLEMENTED);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -10,36 +10,39 @@ namespace ProfileEvents
|
||||
{
|
||||
extern const Event CreatedWriteBufferOrdinary;
|
||||
extern const Event CreatedWriteBufferAIO;
|
||||
extern const Event CreatedWriteBufferAIOFailed;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
#if !defined(__linux__)
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
}
|
||||
#endif
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase> createWriteBufferFromFileBase(const std::string & filename_, size_t estimated_size,
|
||||
size_t aio_threshold, size_t buffer_size_, int flags_, mode_t mode, char * existing_memory_,
|
||||
size_t alignment)
|
||||
{
|
||||
if ((aio_threshold == 0) || (estimated_size < aio_threshold))
|
||||
#if defined(__linux__) || defined(__FreeBSD__)
|
||||
if (aio_threshold && estimated_size >= aio_threshold)
|
||||
{
|
||||
/// Attempt to open a file with O_DIRECT
|
||||
try
|
||||
{
|
||||
auto res = std::make_unique<WriteBufferAIO>(filename_, buffer_size_, flags_, mode, existing_memory_);
|
||||
ProfileEvents::increment(ProfileEvents::CreatedWriteBufferAIO);
|
||||
return res;
|
||||
}
|
||||
catch (const ErrnoException &)
|
||||
{
|
||||
/// Fallback to cached IO if O_DIRECT is not supported.
|
||||
ProfileEvents::increment(ProfileEvents::CreatedWriteBufferAIOFailed);
|
||||
}
|
||||
}
|
||||
#else
|
||||
(void)aio_threshold;
|
||||
(void)estimated_size;
|
||||
#endif
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::CreatedWriteBufferOrdinary);
|
||||
return std::make_unique<WriteBufferFromFile>(filename_, buffer_size_, flags_, mode, existing_memory_, alignment);
|
||||
}
|
||||
else
|
||||
{
|
||||
#if defined(__linux__) || defined(__FreeBSD__)
|
||||
ProfileEvents::increment(ProfileEvents::CreatedWriteBufferAIO);
|
||||
return std::make_unique<WriteBufferAIO>(filename_, buffer_size_, flags_, mode, existing_memory_);
|
||||
#else
|
||||
throw Exception("AIO is implemented only on Linux and FreeBSD", ErrorCodes::NOT_IMPLEMENTED);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1132,7 +1132,7 @@ void Context::updateSettingsChanges(const SettingsChanges & changes)
|
||||
if (change.name == "profile")
|
||||
setProfile(change.value.safeGet<String>());
|
||||
else
|
||||
settings.updateFromChange(change);
|
||||
settings.applyChange(change);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1162,6 +1162,12 @@ String Context::getCurrentQueryId() const
|
||||
}
|
||||
|
||||
|
||||
String Context::getInitialQueryId() const
|
||||
{
|
||||
return client_info.initial_query_id;
|
||||
}
|
||||
|
||||
|
||||
void Context::setCurrentDatabase(const String & name)
|
||||
{
|
||||
auto lock = getLock();
|
||||
|
@ -264,6 +264,10 @@ public:
|
||||
|
||||
String getCurrentDatabase() const;
|
||||
String getCurrentQueryId() const;
|
||||
|
||||
/// Id of initiating query for distributed queries; or current query id if it's not a distributed query.
|
||||
String getInitialQueryId() const;
|
||||
|
||||
void setCurrentDatabase(const String & name);
|
||||
void setCurrentQueryId(const String & query_id);
|
||||
|
||||
|
@ -1,6 +1,5 @@
|
||||
#include "ExternalLoader.h"
|
||||
|
||||
#include <cmath>
|
||||
#include <mutex>
|
||||
#include <pcg_random.hpp>
|
||||
#include <common/DateLUT.h>
|
||||
@ -933,6 +932,8 @@ private:
|
||||
class ExternalLoader::PeriodicUpdater : private boost::noncopyable
|
||||
{
|
||||
public:
|
||||
static constexpr UInt64 check_period_sec = 5;
|
||||
|
||||
PeriodicUpdater(ConfigFilesReader & config_files_reader_, LoadingDispatcher & loading_dispatcher_)
|
||||
: config_files_reader(config_files_reader_), loading_dispatcher(loading_dispatcher_)
|
||||
{
|
||||
@ -940,11 +941,10 @@ public:
|
||||
|
||||
~PeriodicUpdater() { enable(false); }
|
||||
|
||||
void enable(bool enable_, const ExternalLoaderUpdateSettings & settings_ = {})
|
||||
void enable(bool enable_)
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
enabled = enable_;
|
||||
settings = settings_;
|
||||
|
||||
if (enable_)
|
||||
{
|
||||
@ -985,9 +985,7 @@ public:
|
||||
return std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
|
||||
}
|
||||
|
||||
std::uniform_int_distribution<UInt64> distribution(0, static_cast<UInt64>(std::exp2(error_count - 1)));
|
||||
std::chrono::seconds delay(std::min<UInt64>(settings.backoff_max_sec, settings.backoff_initial_sec + distribution(rnd_engine)));
|
||||
return std::chrono::system_clock::now() + delay;
|
||||
return std::chrono::system_clock::now() + std::chrono::seconds(ExternalLoadableBackoff{}.calculateDuration(rnd_engine, error_count));
|
||||
}
|
||||
|
||||
private:
|
||||
@ -996,9 +994,8 @@ private:
|
||||
setThreadName("ExterLdrReload");
|
||||
|
||||
std::unique_lock lock{mutex};
|
||||
auto timeout = [this] { return std::chrono::seconds(settings.check_period_sec); };
|
||||
auto pred = [this] { return !enabled; };
|
||||
while (!event.wait_for(lock, timeout(), pred))
|
||||
while (!event.wait_for(lock, std::chrono::seconds(check_period_sec), pred))
|
||||
{
|
||||
lock.unlock();
|
||||
loading_dispatcher.setConfiguration(config_files_reader.read());
|
||||
@ -1012,7 +1009,6 @@ private:
|
||||
|
||||
mutable std::mutex mutex;
|
||||
bool enabled = false;
|
||||
ExternalLoaderUpdateSettings settings;
|
||||
ThreadFromGlobalPool thread;
|
||||
std::condition_variable event;
|
||||
mutable pcg64 rnd_engine{randomSeed()};
|
||||
@ -1051,9 +1047,9 @@ void ExternalLoader::enableAsyncLoading(bool enable)
|
||||
loading_dispatcher->enableAsyncLoading(enable);
|
||||
}
|
||||
|
||||
void ExternalLoader::enablePeriodicUpdates(bool enable_, const ExternalLoaderUpdateSettings & settings_)
|
||||
void ExternalLoader::enablePeriodicUpdates(bool enable_)
|
||||
{
|
||||
periodic_updater->enable(enable_, settings_);
|
||||
periodic_updater->enable(enable_);
|
||||
}
|
||||
|
||||
bool ExternalLoader::hasCurrentlyLoadedObjects() const
|
||||
|
@ -11,19 +11,6 @@
|
||||
|
||||
namespace DB
|
||||
{
|
||||
struct ExternalLoaderUpdateSettings
|
||||
{
|
||||
UInt64 check_period_sec = 5;
|
||||
UInt64 backoff_initial_sec = 5;
|
||||
/// 10 minutes
|
||||
UInt64 backoff_max_sec = 10 * 60;
|
||||
|
||||
ExternalLoaderUpdateSettings() = default;
|
||||
ExternalLoaderUpdateSettings(UInt64 check_period_sec_, UInt64 backoff_initial_sec_, UInt64 backoff_max_sec_)
|
||||
: check_period_sec(check_period_sec_), backoff_initial_sec(backoff_initial_sec_), backoff_max_sec(backoff_max_sec_) {}
|
||||
};
|
||||
|
||||
|
||||
/* External configuration structure.
|
||||
*
|
||||
* <external_group>
|
||||
@ -105,7 +92,7 @@ public:
|
||||
void enableAsyncLoading(bool enable);
|
||||
|
||||
/// Sets settings for periodic updates.
|
||||
void enablePeriodicUpdates(bool enable, const ExternalLoaderUpdateSettings & settings = {});
|
||||
void enablePeriodicUpdates(bool enable);
|
||||
|
||||
/// Returns the status of the object.
|
||||
/// If the object has not been loaded yet then the function returns Status::NOT_LOADED.
|
||||
|
@ -1,7 +1,7 @@
|
||||
#include <Interpreters/IExternalLoadable.h>
|
||||
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
|
||||
#include <cmath>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -16,4 +16,13 @@ ExternalLoadableLifetime::ExternalLoadableLifetime(const Poco::Util::AbstractCon
|
||||
max_sec = has_min ? config.getUInt64(config_prefix + ".max") : min_sec;
|
||||
}
|
||||
|
||||
|
||||
UInt64 ExternalLoadableBackoff::calculateDuration(pcg64 & rnd_engine, size_t error_count) const
|
||||
{
|
||||
if (error_count < 1)
|
||||
error_count = 1;
|
||||
std::uniform_int_distribution<UInt64> distribution(0, static_cast<UInt64>(std::exp2(error_count - 1)));
|
||||
return std::min<UInt64>(backoff_max_sec, backoff_initial_sec + distribution(rnd_engine));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <string>
|
||||
#include <memory>
|
||||
#include <boost/noncopyable.hpp>
|
||||
#include <pcg_random.hpp>
|
||||
#include <Core/Types.h>
|
||||
|
||||
|
||||
@ -25,6 +26,17 @@ struct ExternalLoadableLifetime
|
||||
};
|
||||
|
||||
|
||||
/// Delay before trying to load again after error.
|
||||
struct ExternalLoadableBackoff
|
||||
{
|
||||
UInt64 backoff_initial_sec = 5;
|
||||
UInt64 backoff_max_sec = 10 * 60; /// 10 minutes
|
||||
|
||||
/// Calculates time to try loading again after error.
|
||||
UInt64 calculateDuration(pcg64 & rnd_engine, size_t error_count = 1) const;
|
||||
};
|
||||
|
||||
|
||||
/// Basic interface for external loadable objects. Is used in ExternalLoader.
|
||||
class IExternalLoadable : public std::enable_shared_from_this<IExternalLoadable>, private boost::noncopyable
|
||||
{
|
||||
|
@ -92,7 +92,7 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
|
||||
table = context.getTable(database_name, table_name);
|
||||
}
|
||||
|
||||
auto table_lock = table->lockStructureForShare(false, context.getCurrentQueryId());
|
||||
auto table_lock = table->lockStructureForShare(false, context.getInitialQueryId());
|
||||
columns = table->getColumns();
|
||||
}
|
||||
|
||||
|
@ -100,7 +100,7 @@ BlockIO InterpreterInsertQuery::execute()
|
||||
checkAccess(query);
|
||||
StoragePtr table = getTable(query);
|
||||
|
||||
auto table_lock = table->lockStructureForShare(true, context.getCurrentQueryId());
|
||||
auto table_lock = table->lockStructureForShare(true, context.getInitialQueryId());
|
||||
|
||||
/// We create a pipeline of several streams, into which we will write data.
|
||||
BlockOutputStreamPtr out;
|
||||
|
@ -265,11 +265,6 @@ Block InterpreterKillQueryQuery::getSelectResult(const String & columns, const S
|
||||
if (where_expression)
|
||||
select_query += " WHERE " + queryToString(where_expression);
|
||||
|
||||
auto use_processors = context.getSettingsRef().experimental_use_processors;
|
||||
context.getSettingsRef().experimental_use_processors = false;
|
||||
|
||||
SCOPE_EXIT(context.getSettingsRef().experimental_use_processors = use_processors);
|
||||
|
||||
BlockIO block_io = executeQuery(select_query, context, true);
|
||||
Block res = block_io.in->read();
|
||||
|
||||
|
@ -26,8 +26,6 @@ struct RenameDescription
|
||||
to_table_name(elem.to.table)
|
||||
{}
|
||||
|
||||
TableStructureWriteLockHolder from_table_lock;
|
||||
|
||||
String from_database_name;
|
||||
String from_table_name;
|
||||
|
||||
@ -77,8 +75,6 @@ BlockIO InterpreterRenameQuery::execute()
|
||||
}
|
||||
};
|
||||
|
||||
std::map<UniqueTableName, TableStructureWriteLockHolder> tables_from_locks;
|
||||
|
||||
/// Don't allow to drop tables (that we are renaming); don't allow to create tables in places where tables will be renamed.
|
||||
std::map<UniqueTableName, std::unique_ptr<DDLGuard>> table_guards;
|
||||
|
||||
@ -89,36 +85,26 @@ BlockIO InterpreterRenameQuery::execute()
|
||||
UniqueTableName from(descriptions.back().from_database_name, descriptions.back().from_table_name);
|
||||
UniqueTableName to(descriptions.back().to_database_name, descriptions.back().to_table_name);
|
||||
|
||||
if (!tables_from_locks.count(from))
|
||||
if (auto table = context.tryGetTable(from.database_name, from.table_name))
|
||||
tables_from_locks.emplace(from, table->lockExclusively(context.getCurrentQueryId()));
|
||||
|
||||
descriptions.back().from_table_lock = tables_from_locks[from];
|
||||
|
||||
if (!table_guards.count(from))
|
||||
table_guards.emplace(from, context.getDDLGuard(from.database_name, from.table_name));
|
||||
|
||||
if (!table_guards.count(to))
|
||||
table_guards.emplace(to, context.getDDLGuard(to.database_name, to.table_name));
|
||||
table_guards[from];
|
||||
table_guards[to];
|
||||
}
|
||||
|
||||
/** All tables are locked. If there are more than one rename in chain,
|
||||
* we need to hold global lock while doing all renames. Order matters to avoid deadlocks.
|
||||
* It provides atomicity of all RENAME chain as a whole, from the point of view of DBMS client,
|
||||
* but only in cases when there was no exceptions during this process and server does not fall.
|
||||
*/
|
||||
|
||||
decltype(context.getLock()) lock;
|
||||
|
||||
if (descriptions.size() > 1)
|
||||
lock = context.getLock();
|
||||
/// Must do it in consistent order.
|
||||
for (auto & table_guard : table_guards)
|
||||
table_guard.second = context.getDDLGuard(table_guard.first.database_name, table_guard.first.table_name);
|
||||
|
||||
for (auto & elem : descriptions)
|
||||
{
|
||||
context.assertTableDoesntExist(elem.to_database_name, elem.to_table_name);
|
||||
auto from_table = context.getTable(elem.from_database_name, elem.from_table_name);
|
||||
auto from_table_lock = from_table->lockExclusively(context.getCurrentQueryId());
|
||||
|
||||
context.getDatabase(elem.from_database_name)->renameTable(
|
||||
context, elem.from_table_name, *context.getDatabase(elem.to_database_name), elem.to_table_name, elem.from_table_lock);
|
||||
context,
|
||||
elem.from_table_name,
|
||||
*context.getDatabase(elem.to_database_name),
|
||||
elem.to_table_name,
|
||||
from_table_lock);
|
||||
}
|
||||
|
||||
return {};
|
||||
|
@ -294,7 +294,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
|
||||
}
|
||||
|
||||
if (storage)
|
||||
table_lock = storage->lockStructureForShare(false, context.getCurrentQueryId());
|
||||
table_lock = storage->lockStructureForShare(false, context.getInitialQueryId());
|
||||
|
||||
syntax_analyzer_result = SyntaxAnalyzer(context, options).analyze(
|
||||
query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage, NamesAndTypesList());
|
||||
|
@ -444,6 +444,7 @@ QueryStatusInfo QueryStatus::getInfo(bool get_thread_list, bool get_profile_even
|
||||
{
|
||||
std::lock_guard lock(thread_group->mutex);
|
||||
res.thread_numbers = thread_group->thread_numbers;
|
||||
res.os_thread_ids = thread_group->os_thread_ids;
|
||||
}
|
||||
|
||||
if (get_profile_events)
|
||||
|
@ -66,6 +66,7 @@ struct QueryStatusInfo
|
||||
|
||||
/// Optional fields, filled by request
|
||||
std::vector<UInt32> thread_numbers;
|
||||
std::vector<UInt32> os_thread_ids;
|
||||
std::shared_ptr<ProfileEvents::Counters> profile_counters;
|
||||
std::shared_ptr<Settings> query_settings;
|
||||
};
|
||||
|
@ -78,6 +78,7 @@ Block QueryLogElement::createBlock()
|
||||
{std::make_shared<DataTypeUInt32>(), "revision"},
|
||||
|
||||
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt32>()), "thread_numbers"},
|
||||
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt32>()), "os_thread_ids"},
|
||||
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "ProfileEvents.Names"},
|
||||
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()), "ProfileEvents.Values"},
|
||||
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "Settings.Names"},
|
||||
@ -123,6 +124,14 @@ void QueryLogElement::appendToBlock(Block & block) const
|
||||
columns[i++]->insert(threads_array);
|
||||
}
|
||||
|
||||
{
|
||||
Array threads_array;
|
||||
threads_array.reserve(os_thread_ids.size());
|
||||
for (const UInt32 thread_number : os_thread_ids)
|
||||
threads_array.emplace_back(UInt64(thread_number));
|
||||
columns[i++]->insert(threads_array);
|
||||
}
|
||||
|
||||
if (profile_counters)
|
||||
{
|
||||
auto column_names = columns[i++].get();
|
||||
|
@ -60,6 +60,7 @@ struct QueryLogElement
|
||||
ClientInfo client_info;
|
||||
|
||||
std::vector<UInt32> thread_numbers;
|
||||
std::vector<UInt32> os_thread_ids;
|
||||
std::shared_ptr<ProfileEvents::Counters> profile_counters;
|
||||
std::shared_ptr<Settings> query_settings;
|
||||
|
||||
|
@ -61,6 +61,7 @@ void ThreadStatus::initializeQuery()
|
||||
thread_group->memory_tracker.setDescription("(for query)");
|
||||
|
||||
thread_group->thread_numbers.emplace_back(thread_number);
|
||||
thread_group->os_thread_ids.emplace_back(os_thread_id);
|
||||
thread_group->master_thread_number = thread_number;
|
||||
thread_group->master_thread_os_id = os_thread_id;
|
||||
|
||||
@ -99,6 +100,7 @@ void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool
|
||||
|
||||
/// NOTE: A thread may be attached multiple times if it is reused from a thread pool.
|
||||
thread_group->thread_numbers.emplace_back(thread_number);
|
||||
thread_group->os_thread_ids.emplace_back(os_thread_id);
|
||||
}
|
||||
|
||||
if (query_context)
|
||||
|
@ -400,6 +400,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
}
|
||||
|
||||
elem.thread_numbers = std::move(info.thread_numbers);
|
||||
elem.os_thread_ids = std::move(info.os_thread_ids);
|
||||
elem.profile_counters = std::move(info.profile_counters);
|
||||
|
||||
if (log_queries)
|
||||
@ -437,6 +438,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
elem.memory_usage = info.peak_memory_usage > 0 ? info.peak_memory_usage : 0;
|
||||
|
||||
elem.thread_numbers = std::move(info.thread_numbers);
|
||||
elem.os_thread_ids = std::move(info.os_thread_ids);
|
||||
elem.profile_counters = std::move(info.profile_counters);
|
||||
}
|
||||
|
||||
|
@ -543,15 +543,8 @@ void AlterCommands::validate(const IStorage & table, const Context & context)
|
||||
}
|
||||
}
|
||||
else if (command.type == AlterCommand::MODIFY_SETTING)
|
||||
{
|
||||
for (const auto & change : command.settings_changes)
|
||||
{
|
||||
if (!table.hasSetting(change.name))
|
||||
{
|
||||
throw Exception{"Storage '" + table.getName() + "' doesn't have setting '" + change.name + "'", ErrorCodes::UNKNOWN_SETTING};
|
||||
}
|
||||
}
|
||||
}
|
||||
table.checkSettingCanBeChanged(change.name);
|
||||
}
|
||||
|
||||
/** Existing defaulted columns may require default expression extensions with a type conversion,
|
||||
|
@ -309,11 +309,10 @@ bool IStorage::isVirtualColumn(const String & column_name) const
|
||||
return getColumns().get(column_name).is_virtual;
|
||||
}
|
||||
|
||||
bool IStorage::hasSetting(const String & /* setting_name */) const
|
||||
void IStorage::checkSettingCanBeChanged(const String & /* setting_name */) const
|
||||
{
|
||||
if (!supportsSettings())
|
||||
throw Exception("Storage '" + getName() + "' doesn't support settings.", ErrorCodes::SETTINGS_ARE_NOT_SUPPORTED);
|
||||
return false;
|
||||
}
|
||||
|
||||
TableStructureReadLockHolder IStorage::lockStructureForShare(bool will_add_new_data, const String & query_id)
|
||||
@ -381,17 +380,14 @@ IDatabase::ASTModifier IStorage::getSettingsModifier(const SettingsChanges & new
|
||||
/// Make storage settings unique
|
||||
for (const auto & change : new_changes)
|
||||
{
|
||||
if (hasSetting(change.name))
|
||||
{
|
||||
checkSettingCanBeChanged(change.name);
|
||||
|
||||
auto finder = [&change] (const SettingChange & c) { return c.name == change.name; };
|
||||
if (auto it = std::find_if(storage_changes.begin(), storage_changes.end(), finder); it != storage_changes.end())
|
||||
it->value = change.value;
|
||||
else
|
||||
storage_changes.push_back(change);
|
||||
}
|
||||
else
|
||||
throw Exception{"Storage '" + getName() + "' doesn't have setting '" + change.name + "'", ErrorCodes::UNKNOWN_SETTING};
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -139,8 +139,8 @@ public: /// thread-unsafe part. lockStructure must be acquired
|
||||
/// If |need_all| is set, then checks that all the columns of the table are in the block.
|
||||
void check(const Block & block, bool need_all = false) const;
|
||||
|
||||
/// Check storage has setting. Exception will be thrown if it doesn't support settings at all.
|
||||
virtual bool hasSetting(const String & setting_name) const;
|
||||
/// Check storage has setting and setting can be modified.
|
||||
virtual void checkSettingCanBeChanged(const String & setting_name) const;
|
||||
|
||||
protected: /// still thread-unsafe part.
|
||||
void setIndices(IndicesDescription indices_);
|
||||
@ -150,7 +150,7 @@ protected: /// still thread-unsafe part.
|
||||
virtual bool isVirtualColumn(const String & column_name) const;
|
||||
|
||||
/// Returns modifier of settings in storage definition
|
||||
virtual IDatabase::ASTModifier getSettingsModifier(const SettingsChanges & new_changes) const;
|
||||
IDatabase::ASTModifier getSettingsModifier(const SettingsChanges & new_changes) const;
|
||||
|
||||
private:
|
||||
ColumnsDescription columns; /// combined real and virtual columns
|
||||
|
@ -22,7 +22,7 @@ void KafkaSettings::loadFromQuery(ASTStorage & storage_def)
|
||||
{
|
||||
try
|
||||
{
|
||||
loadFromChanges(storage_def.settings->changes);
|
||||
applyChanges(storage_def.settings->changes);
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
|
@ -15,18 +15,17 @@ struct KafkaSettings : public SettingsCollection<KafkaSettings>
|
||||
{
|
||||
|
||||
|
||||
/// M (mutable) for normal settings, IM (immutable) for not updateable settings.
|
||||
#define LIST_OF_KAFKA_SETTINGS(M, IM) \
|
||||
IM(SettingString, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.") \
|
||||
IM(SettingString, kafka_topic_list, "", "A list of Kafka topics.") \
|
||||
IM(SettingString, kafka_group_name, "", "A group of Kafka consumers.") \
|
||||
IM(SettingString, kafka_format, "", "The message format for Kafka engine.") \
|
||||
IM(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.") \
|
||||
IM(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine") \
|
||||
IM(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.") \
|
||||
IM(SettingUInt64, kafka_max_block_size, 0, "The maximum block size per table for Kafka engine.") \
|
||||
IM(SettingUInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block") \
|
||||
IM(SettingUInt64, kafka_commit_every_batch, 0, "Commit every consumed and handled batch instead of a single commit after writing a whole block")
|
||||
#define LIST_OF_KAFKA_SETTINGS(M) \
|
||||
M(SettingString, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.") \
|
||||
M(SettingString, kafka_topic_list, "", "A list of Kafka topics.") \
|
||||
M(SettingString, kafka_group_name, "", "A group of Kafka consumers.") \
|
||||
M(SettingString, kafka_format, "", "The message format for Kafka engine.") \
|
||||
M(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.") \
|
||||
M(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine") \
|
||||
M(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.") \
|
||||
M(SettingUInt64, kafka_max_block_size, 0, "The maximum block size per table for Kafka engine.") \
|
||||
M(SettingUInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block") \
|
||||
M(SettingUInt64, kafka_commit_every_batch, 0, "Commit every consumed and handled batch instead of a single commit after writing a whole block")
|
||||
|
||||
DECLARE_SETTINGS_COLLECTION(LIST_OF_KAFKA_SETTINGS)
|
||||
|
||||
|
@ -44,6 +44,8 @@ namespace ErrorCodes
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
extern const int UNSUPPORTED_METHOD;
|
||||
extern const int UNKNOWN_SETTING;
|
||||
extern const int READONLY_SETTING;
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -410,14 +412,12 @@ bool StorageKafka::streamToViews()
|
||||
}
|
||||
|
||||
|
||||
bool StorageKafka::hasSetting(const String & setting_name) const
|
||||
void StorageKafka::checkSettingCanBeChanged(const String & setting_name) const
|
||||
{
|
||||
return KafkaSettings::findIndex(setting_name) != KafkaSettings::npos;
|
||||
}
|
||||
if (KafkaSettings::findIndex(setting_name) == KafkaSettings::npos)
|
||||
throw Exception{"Storage '" + getName() + "' doesn't have setting '" + setting_name + "'", ErrorCodes::UNKNOWN_SETTING};
|
||||
|
||||
IDatabase::ASTModifier StorageKafka::getSettingsModifier(const SettingsChanges & /* new_changes */) const
|
||||
{
|
||||
throw Exception("Storage '" + getName() + "' doesn't support settings alter", ErrorCodes::UNSUPPORTED_METHOD);
|
||||
throw Exception{"Setting '" + setting_name + "' is readonly for storage '" + getName() + "'", ErrorCodes::READONLY_SETTING};
|
||||
}
|
||||
|
||||
void registerStorageKafka(StorageFactory & factory)
|
||||
|
@ -57,8 +57,7 @@ public:
|
||||
const auto & getSchemaName() const { return schema_name; }
|
||||
const auto & skipBroken() const { return skip_broken; }
|
||||
|
||||
bool hasSetting(const String & setting_name) const override;
|
||||
|
||||
void checkSettingCanBeChanged(const String & setting_name) const override;
|
||||
|
||||
protected:
|
||||
StorageKafka(
|
||||
@ -71,7 +70,6 @@ protected:
|
||||
size_t num_consumers_, UInt64 max_block_size_, size_t skip_broken,
|
||||
bool intermediate_commit_);
|
||||
|
||||
IDatabase::ASTModifier getSettingsModifier(const SettingsChanges & new_changes) const override;
|
||||
private:
|
||||
// Configuration and state
|
||||
String table_name;
|
||||
|
@ -593,7 +593,7 @@ void registerStorageLiveView(StorageFactory & factory)
|
||||
{
|
||||
factory.registerStorage("LiveView", [](const StorageFactory::Arguments & args)
|
||||
{
|
||||
if (!args.local_context.getSettingsRef().allow_experimental_live_view)
|
||||
if (!args.attach && !args.local_context.getSettingsRef().allow_experimental_live_view)
|
||||
throw Exception("Experimental LIVE VIEW feature is not enabled (the setting 'allow_experimental_live_view')", ErrorCodes::SUPPORT_IS_DISABLED);
|
||||
|
||||
return StorageLiveView::create(args.table_name, args.database_name, args.local_context, args.query, args.columns);
|
||||
|
@ -333,7 +333,7 @@ void IMergedBlockOutputStream::calculateAndSerializeSkipIndices(
|
||||
{
|
||||
/// Creating block for update
|
||||
Block indices_update_block(skip_indexes_columns);
|
||||
size_t skip_index_current_mark = 0;
|
||||
size_t skip_index_current_data_mark = 0;
|
||||
|
||||
/// Filling and writing skip indices like in IMergedBlockOutputStream::writeColumn
|
||||
for (size_t i = 0; i < skip_indices.size(); ++i)
|
||||
@ -341,7 +341,7 @@ void IMergedBlockOutputStream::calculateAndSerializeSkipIndices(
|
||||
const auto index = skip_indices[i];
|
||||
auto & stream = *skip_indices_streams[i];
|
||||
size_t prev_pos = 0;
|
||||
skip_index_current_mark = skip_index_mark;
|
||||
skip_index_current_data_mark = skip_index_data_mark;
|
||||
while (prev_pos < rows)
|
||||
{
|
||||
UInt64 limit = 0;
|
||||
@ -351,7 +351,7 @@ void IMergedBlockOutputStream::calculateAndSerializeSkipIndices(
|
||||
}
|
||||
else
|
||||
{
|
||||
limit = index_granularity.getMarkRows(skip_index_current_mark);
|
||||
limit = index_granularity.getMarkRows(skip_index_current_data_mark);
|
||||
if (skip_indices_aggregators[i]->empty())
|
||||
{
|
||||
skip_indices_aggregators[i] = index->createIndexAggregator();
|
||||
@ -366,9 +366,9 @@ void IMergedBlockOutputStream::calculateAndSerializeSkipIndices(
|
||||
/// to be compatible with normal .mrk2 file format
|
||||
if (can_use_adaptive_granularity)
|
||||
writeIntBinary(1UL, stream.marks);
|
||||
|
||||
++skip_index_current_mark;
|
||||
}
|
||||
/// this mark is aggregated, go to the next one
|
||||
skip_index_current_data_mark++;
|
||||
}
|
||||
|
||||
size_t pos = prev_pos;
|
||||
@ -388,7 +388,7 @@ void IMergedBlockOutputStream::calculateAndSerializeSkipIndices(
|
||||
prev_pos = pos;
|
||||
}
|
||||
}
|
||||
skip_index_mark = skip_index_current_mark;
|
||||
skip_index_data_mark = skip_index_current_data_mark;
|
||||
}
|
||||
|
||||
void IMergedBlockOutputStream::finishSkipIndicesSerialization(
|
||||
|
@ -141,7 +141,10 @@ protected:
|
||||
size_t aio_threshold;
|
||||
|
||||
size_t current_mark = 0;
|
||||
size_t skip_index_mark = 0;
|
||||
|
||||
/// Number of mark in data from which skip indices have to start
|
||||
/// aggregation. I.e. it's data mark number, not skip indices mark.
|
||||
size_t skip_index_data_mark = 0;
|
||||
|
||||
const bool can_use_adaptive_granularity;
|
||||
const std::string marks_file_extension;
|
||||
|
@ -91,6 +91,7 @@ namespace ErrorCodes
|
||||
extern const int INCORRECT_FILE_NAME;
|
||||
extern const int BAD_DATA_PART_NAME;
|
||||
extern const int UNKNOWN_SETTING;
|
||||
extern const int READONLY_SETTING;
|
||||
}
|
||||
|
||||
|
||||
@ -1324,10 +1325,7 @@ void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & c
|
||||
setTTLExpressions(new_columns.getColumnTTLs(), new_ttl_table_ast, /* only_check = */ true);
|
||||
|
||||
for (const auto & setting : new_changes)
|
||||
{
|
||||
if (!hasSetting(setting.name))
|
||||
throw Exception{"Storage '" + getName() + "' doesn't have setting '" + setting.name + "'", ErrorCodes::UNKNOWN_SETTING};
|
||||
}
|
||||
checkSettingCanBeChanged(setting.name);
|
||||
|
||||
/// Check that type conversions are possible.
|
||||
ExpressionActionsPtr unused_expression;
|
||||
@ -1654,14 +1652,18 @@ void MergeTreeData::changeSettings(
|
||||
if (!new_changes.empty())
|
||||
{
|
||||
MergeTreeSettings copy = *getSettings();
|
||||
copy.updateFromChanges(new_changes);
|
||||
copy.applyChanges(new_changes);
|
||||
storage_settings.set(std::make_unique<const MergeTreeSettings>(copy));
|
||||
}
|
||||
}
|
||||
|
||||
bool MergeTreeData::hasSetting(const String & setting_name) const
|
||||
void MergeTreeData::checkSettingCanBeChanged(const String & setting_name) const
|
||||
{
|
||||
return MergeTreeSettings::findIndex(setting_name) != MergeTreeSettings::npos;
|
||||
if (MergeTreeSettings::findIndex(setting_name) == MergeTreeSettings::npos)
|
||||
throw Exception{"Storage '" + getName() + "' doesn't have setting '" + setting_name + "'", ErrorCodes::UNKNOWN_SETTING};
|
||||
if (MergeTreeSettings::isReadonlySetting(setting_name))
|
||||
throw Exception{"Setting '" + setting_name + "' is readonly for storage '" + getName() + "'", ErrorCodes::READONLY_SETTING};
|
||||
|
||||
}
|
||||
|
||||
void MergeTreeData::removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr & data_part)
|
||||
|
@ -543,7 +543,7 @@ public:
|
||||
TableStructureWriteLockHolder & table_lock_holder);
|
||||
|
||||
/// All MergeTreeData children have settings.
|
||||
bool hasSetting(const String & setting_name) const override;
|
||||
void checkSettingCanBeChanged(const String & setting_name) const override;
|
||||
|
||||
/// Remove columns, that have been markedd as empty after zeroing values with expired ttl
|
||||
void removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr & data_part);
|
||||
|
@ -561,7 +561,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
|
||||
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
|
||||
const auto data_settings = data.getSettings();
|
||||
|
||||
NamesAndTypesList gathering_columns, merging_columns;
|
||||
NamesAndTypesList gathering_columns;
|
||||
NamesAndTypesList merging_columns;
|
||||
Names gathering_column_names, merging_column_names;
|
||||
extractMergingAndGatheringColumns(
|
||||
all_columns, data.sorting_key_expr, data.skip_indices,
|
||||
|
@ -46,7 +46,7 @@ void MergeTreeSettings::loadFromQuery(ASTStorage & storage_def)
|
||||
{
|
||||
try
|
||||
{
|
||||
loadFromChanges(storage_def.settings->changes);
|
||||
applyChanges(storage_def.settings->changes);
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include <Core/Defines.h>
|
||||
#include <Core/SettingsCommon.h>
|
||||
#include <Common/SettingsChanges.h>
|
||||
|
||||
|
||||
namespace Poco
|
||||
@ -24,9 +25,8 @@ class ASTStorage;
|
||||
struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
|
||||
{
|
||||
|
||||
/// M (mutable) for normal settings, IM (immutable) for not updateable settings.
|
||||
#define LIST_OF_MERGE_TREE_SETTINGS(M, IM) \
|
||||
IM(SettingUInt64, index_granularity, 8192, "How many rows correspond to one primary key value.") \
|
||||
#define LIST_OF_MERGE_TREE_SETTINGS(M) \
|
||||
M(SettingUInt64, index_granularity, 8192, "How many rows correspond to one primary key value.") \
|
||||
\
|
||||
/** Merge settings. */ \
|
||||
M(SettingUInt64, max_bytes_to_merge_at_max_space_in_pool, 150ULL * 1024 * 1024 * 1024, "Maximum in total size of parts to merge, when there are maximum free threads in background pool (or entries in replication queue).") \
|
||||
@ -80,7 +80,7 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
|
||||
M(SettingBool, use_minimalistic_part_header_in_zookeeper, false, "Store part header (checksums and columns) in a compact format and a single part znode instead of separate znodes (<part>/columns and <part>/checksums). This can dramatically reduce snapshot size in ZooKeeper. Before enabling check that all replicas support new format.") \
|
||||
M(SettingUInt64, finished_mutations_to_keep, 100, "How many records about mutations that are done to keep. If zero, then keep all of them.") \
|
||||
M(SettingUInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024, "Minimal amount of bytes to enable O_DIRECT in merge (0 - disabled).") \
|
||||
IM(SettingUInt64, index_granularity_bytes, 10 * 1024 * 1024, "Approximate amount of bytes in single granule (0 - disabled).") \
|
||||
M(SettingUInt64, index_granularity_bytes, 10 * 1024 * 1024, "Approximate amount of bytes in single granule (0 - disabled).") \
|
||||
M(SettingInt64, merge_with_ttl_timeout, 3600 * 24, "Minimal time in seconds, when merge with TTL can be repeated.") \
|
||||
M(SettingBool, ttl_only_drop_parts, false, "Only drop altogether the expired parts and not partially prune them.") \
|
||||
M(SettingBool, write_final_mark, 1, "Write final mark after end of column (0 - disabled, do nothing if index_granularity_bytes=0)") \
|
||||
@ -99,6 +99,12 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
|
||||
|
||||
/// NOTE: will rewrite the AST to add immutable settings.
|
||||
void loadFromQuery(ASTStorage & storage_def);
|
||||
|
||||
/// We check settings after storage creation
|
||||
static bool isReadonlySetting(const String & name)
|
||||
{
|
||||
return name == "index_granularity" || name == "index_granularity_bytes";
|
||||
}
|
||||
};
|
||||
|
||||
using MergeTreeSettingsPtr = std::shared_ptr<const MergeTreeSettings>;
|
||||
|
@ -332,7 +332,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
|
||||
else if (skip_indexes_column_name_to_position.end() != skip_index_column_it)
|
||||
{
|
||||
const auto & index_column = *skip_indexes_columns[skip_index_column_it->second].column;
|
||||
writeColumn(column.name, *column.type, index_column, offset_columns, false, serialization_states[i], current_mark);
|
||||
std::tie(std::ignore, new_index_offset) = writeColumn(column.name, *column.type, index_column, offset_columns, false, serialization_states[i], current_mark);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -349,6 +349,8 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
|
||||
|
||||
rows_count += rows;
|
||||
|
||||
/// Should be written before index offset update, because we calculate,
|
||||
/// indices of currently written granules
|
||||
calculateAndSerializeSkipIndices(skip_indexes_columns, rows);
|
||||
|
||||
{
|
||||
|
@ -68,7 +68,6 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
|
||||
if (!rows)
|
||||
return;
|
||||
|
||||
calculateAndSerializeSkipIndices(skip_indexes_columns, rows);
|
||||
|
||||
size_t new_index_offset = 0;
|
||||
size_t new_current_mark = 0;
|
||||
@ -79,6 +78,10 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
|
||||
std::tie(new_current_mark, new_index_offset) = writeColumn(column.name, *column.type, *column.column, offset_columns, skip_offsets, serialization_states[i], current_mark);
|
||||
}
|
||||
|
||||
/// Should be written before index offset update, because we calculate,
|
||||
/// indices of currently written granules
|
||||
calculateAndSerializeSkipIndices(skip_indexes_columns, rows);
|
||||
|
||||
index_offset = new_index_offset;
|
||||
current_mark = new_current_mark;
|
||||
}
|
||||
@ -103,7 +106,6 @@ MergeTreeData::DataPart::Checksums MergedColumnOnlyOutputStream::writeSuffixAndG
|
||||
serialize_settings.getter = createStreamGetter(column.name, already_written_offset_columns, skip_offsets);
|
||||
column.type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
|
||||
|
||||
|
||||
if (with_final_mark)
|
||||
writeFinalMark(column.name, column.type, offset_columns, skip_offsets, serialize_settings.path);
|
||||
}
|
||||
|
@ -50,10 +50,11 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Con
|
||||
res_columns[i++]->insert(static_cast<Int8>(load_result.status));
|
||||
res_columns[i++]->insert(load_result.origin);
|
||||
|
||||
if (load_result.object)
|
||||
{
|
||||
const auto dict_ptr = std::static_pointer_cast<const IDictionaryBase>(load_result.object);
|
||||
std::exception_ptr last_exception = load_result.exception;
|
||||
|
||||
const auto dict_ptr = std::dynamic_pointer_cast<const IDictionaryBase>(load_result.object);
|
||||
if (dict_ptr)
|
||||
{
|
||||
res_columns[i++]->insert(dict_ptr->getTypeName());
|
||||
|
||||
const auto & dict_struct = dict_ptr->getStructure();
|
||||
@ -66,6 +67,9 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Con
|
||||
res_columns[i++]->insert(dict_ptr->getElementCount());
|
||||
res_columns[i++]->insert(dict_ptr->getLoadFactor());
|
||||
res_columns[i++]->insert(dict_ptr->getSource()->toString());
|
||||
|
||||
if (!last_exception)
|
||||
last_exception = dict_ptr->getLastException();
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -76,8 +80,8 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Con
|
||||
res_columns[i++]->insert(static_cast<UInt64>(std::chrono::system_clock::to_time_t(load_result.loading_start_time)));
|
||||
res_columns[i++]->insert(std::chrono::duration_cast<std::chrono::duration<float>>(load_result.loading_duration).count());
|
||||
|
||||
if (load_result.exception)
|
||||
res_columns[i++]->insert(getExceptionMessage(load_result.exception, false));
|
||||
if (last_exception)
|
||||
res_columns[i++]->insert(getExceptionMessage(last_exception, false));
|
||||
else
|
||||
res_columns[i++]->insertDefault();
|
||||
}
|
||||
|
@ -58,6 +58,7 @@ NamesAndTypesList StorageSystemProcesses::getNamesAndTypes()
|
||||
{"query", std::make_shared<DataTypeString>()},
|
||||
|
||||
{"thread_numbers", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt32>())},
|
||||
{"os_thread_ids", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt32>())},
|
||||
{"ProfileEvents.Names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
|
||||
{"ProfileEvents.Values", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>())},
|
||||
{"Settings.Names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
|
||||
@ -120,6 +121,14 @@ void StorageSystemProcesses::fillData(MutableColumns & res_columns, const Contex
|
||||
res_columns[i++]->insert(threads_array);
|
||||
}
|
||||
|
||||
{
|
||||
Array threads_array;
|
||||
threads_array.reserve(process.os_thread_ids.size());
|
||||
for (const UInt32 thread_number : process.os_thread_ids)
|
||||
threads_array.emplace_back(thread_number);
|
||||
res_columns[i++]->insert(threads_array);
|
||||
}
|
||||
|
||||
{
|
||||
IColumn * column_profile_events_names = res_columns[i++].get();
|
||||
IColumn * column_profile_events_values = res_columns[i++].get();
|
||||
|
@ -0,0 +1,31 @@
|
||||
<yandex>
|
||||
<dictionary>
|
||||
<name>cache_xypairs</name>
|
||||
<source>
|
||||
<clickhouse>
|
||||
<host>localhost</host>
|
||||
<port>9000</port>
|
||||
<user>default</user>
|
||||
<password></password>
|
||||
<db>test</db>
|
||||
<table>xypairs</table>
|
||||
</clickhouse>
|
||||
</source>
|
||||
<lifetime>1</lifetime>
|
||||
<layout>
|
||||
<cache>
|
||||
<size_in_cells>5</size_in_cells>
|
||||
</cache>
|
||||
</layout>
|
||||
<structure>
|
||||
<id>
|
||||
<name>x</name>
|
||||
</id>
|
||||
<attribute>
|
||||
<name>y</name>
|
||||
<type>UInt64</type>
|
||||
<null_value>0</null_value>
|
||||
</attribute>
|
||||
</structure>
|
||||
</dictionary>
|
||||
</yandex>
|
@ -17,6 +17,10 @@ def get_status(dictionary_name):
|
||||
return instance.query("SELECT status FROM system.dictionaries WHERE name='" + dictionary_name + "'").rstrip("\n")
|
||||
|
||||
|
||||
def get_last_exception(dictionary_name):
|
||||
return instance.query("SELECT last_exception FROM system.dictionaries WHERE name='" + dictionary_name + "'").rstrip("\n").replace("\\'", "'")
|
||||
|
||||
|
||||
def get_loading_start_time(dictionary_name):
|
||||
s = instance.query("SELECT loading_start_time FROM system.dictionaries WHERE name='" + dictionary_name + "'").rstrip("\n")
|
||||
if s == "0000-00-00 00:00:00":
|
||||
@ -350,3 +354,58 @@ def test_reload_after_fail_by_timer(started_cluster):
|
||||
time.sleep(6);
|
||||
query("SELECT dictGetInt32('no_file_2', 'a', toUInt64(9))") == "10\n"
|
||||
assert get_status("no_file_2") == "LOADED"
|
||||
|
||||
|
||||
def test_reload_after_fail_in_cache_dictionary(started_cluster):
|
||||
query = instance.query
|
||||
query_and_get_error = instance.query_and_get_error
|
||||
|
||||
# Can't get a value from the cache dictionary because the source (table `test.xypairs`) doesn't respond.
|
||||
expected_error = "Table test.xypairs doesn't exist"
|
||||
assert expected_error in query_and_get_error("SELECT dictGetUInt64('cache_xypairs', 'y', toUInt64(1))")
|
||||
assert get_status("cache_xypairs") == "LOADED"
|
||||
assert expected_error in get_last_exception("cache_xypairs")
|
||||
|
||||
# Create table `test.xypairs`.
|
||||
query('''
|
||||
drop table if exists test.xypairs;
|
||||
create table test.xypairs (x UInt64, y UInt64) engine=Log;
|
||||
insert into test.xypairs values (1, 56), (3, 78);
|
||||
''')
|
||||
|
||||
# Cache dictionary now works.
|
||||
assert_eq_with_retry(instance, "SELECT dictGet('cache_xypairs', 'y', toUInt64(1))", "56", ignore_error=True)
|
||||
query("SELECT dictGet('cache_xypairs', 'y', toUInt64(2))") == "0"
|
||||
assert get_last_exception("cache_xypairs") == ""
|
||||
|
||||
# Drop table `test.xypairs`.
|
||||
query('drop table if exists test.xypairs')
|
||||
|
||||
# Values are cached so we can get them.
|
||||
query("SELECT dictGet('cache_xypairs', 'y', toUInt64(1))") == "56"
|
||||
query("SELECT dictGet('cache_xypairs', 'y', toUInt64(2))") == "0"
|
||||
assert get_last_exception("cache_xypairs") == ""
|
||||
|
||||
# But we can't get a value from the source table which isn't cached.
|
||||
assert expected_error in query_and_get_error("SELECT dictGetUInt64('cache_xypairs', 'y', toUInt64(3))")
|
||||
assert expected_error in get_last_exception("cache_xypairs")
|
||||
|
||||
# Passed time should not spoil the cache.
|
||||
time.sleep(5);
|
||||
query("SELECT dictGet('cache_xypairs', 'y', toUInt64(1))") == "56"
|
||||
query("SELECT dictGet('cache_xypairs', 'y', toUInt64(2))") == "0"
|
||||
assert expected_error in query_and_get_error("SELECT dictGetUInt64('cache_xypairs', 'y', toUInt64(3))")
|
||||
assert expected_error in get_last_exception("cache_xypairs")
|
||||
|
||||
# Create table `test.xypairs` again with changed values.
|
||||
query('''
|
||||
drop table if exists test.xypairs;
|
||||
create table test.xypairs (x UInt64, y UInt64) engine=Log;
|
||||
insert into test.xypairs values (1, 57), (3, 79);
|
||||
''')
|
||||
|
||||
# The cache dictionary returns new values now.
|
||||
assert_eq_with_retry(instance, "SELECT dictGet('cache_xypairs', 'y', toUInt64(1))", "57")
|
||||
query("SELECT dictGet('cache_xypairs', 'y', toUInt64(2))") == "0"
|
||||
query("SELECT dictGet('cache_xypairs', 'y', toUInt64(3))") == "79"
|
||||
assert get_last_exception("cache_xypairs") == ""
|
||||
|
@ -605,7 +605,7 @@ def test_kafka_produce_consume(kafka_cluster):
|
||||
assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)
|
||||
|
||||
|
||||
@pytest.mark.timeout(120)
|
||||
@pytest.mark.timeout(300)
|
||||
def test_kafka_commit_on_block_write(kafka_cluster):
|
||||
instance.query('''
|
||||
DROP TABLE IF EXISTS test.view;
|
||||
|
@ -19,7 +19,7 @@ $CLICKHOUSE_CLIENT --use_uncompressed_cache=1 --query_id="test-query-uncompresse
|
||||
sleep 1
|
||||
$CLICKHOUSE_CLIENT --query="SYSTEM FLUSH LOGS"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query="SELECT ProfileEvents.Values[indexOf(ProfileEvents.Names, 'Seek')], ProfileEvents.Values[indexOf(ProfileEvents.Names, 'ReadCompressedBytes')], ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UncompressedCacheHits')] AS hit FROM system.query_log WHERE (query_id = 'test-query-uncompressed-cache') AND (type = 2) ORDER BY event_time DESC LIMIT 1"
|
||||
$CLICKHOUSE_CLIENT --query="SELECT ProfileEvents.Values[indexOf(ProfileEvents.Names, 'Seek')], ProfileEvents.Values[indexOf(ProfileEvents.Names, 'ReadCompressedBytes')], ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UncompressedCacheHits')] AS hit FROM system.query_log WHERE (query_id = 'test-query-uncompressed-cache') AND (type = 2) AND event_date >= yesterday() ORDER BY event_time DESC LIMIT 1"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS small_table"
|
||||
|
||||
|
@ -10,7 +10,7 @@ do
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query="SYSTEM FLUSH LOGS"
|
||||
sleep 0.1;
|
||||
if [[ $($CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL" -d "SELECT count() > 0 FROM system.text_log WHERE position(system.text_log.message, 'SELECT 6103') > 0") == 1 ]]; then echo 1; exit; fi;
|
||||
if [[ $($CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL" -d "SELECT count() > 0 FROM system.text_log WHERE position(system.text_log.message, 'SELECT 6103') > 0 AND event_date >= yesterday()") == 1 ]]; then echo 1; exit; fi;
|
||||
|
||||
done;
|
||||
|
||||
|
@ -0,0 +1,2 @@
|
||||
1
|
||||
1
|
@ -0,0 +1,28 @@
|
||||
SET allow_experimental_data_skipping_indices=1;
|
||||
|
||||
DROP TABLE IF EXISTS bad_skip_idx;
|
||||
|
||||
CREATE TABLE bad_skip_idx
|
||||
(
|
||||
id UInt64,
|
||||
value String
|
||||
) ENGINE MergeTree()
|
||||
ORDER BY id SETTINGS index_granularity_bytes = 64, vertical_merge_algorithm_min_rows_to_activate = 0, vertical_merge_algorithm_min_columns_to_activate = 0; -- actually vertical merge is not required condition for this bug, but it's more easy to reproduce (becuse we don't recalc granularities)
|
||||
|
||||
-- 7 rows per granule
|
||||
INSERT INTO bad_skip_idx SELECT number, concat('x', toString(number)) FROM numbers(1000);
|
||||
|
||||
-- 3 rows per granule
|
||||
INSERT INTO bad_skip_idx SELECT number, concat('xxxxxxxxxx', toString(number)) FROM numbers(1000,1000);
|
||||
|
||||
SELECT COUNT(*) from bad_skip_idx WHERE value = 'xxxxxxxxxx1015'; -- check no exception
|
||||
|
||||
INSERT INTO bad_skip_idx SELECT number, concat('x', toString(number)) FROM numbers(1000);
|
||||
|
||||
ALTER TABLE bad_skip_idx ADD INDEX idx value TYPE bloom_filter(0.01) GRANULARITY 4;
|
||||
|
||||
OPTIMIZE TABLE bad_skip_idx FINAL;
|
||||
|
||||
SELECT COUNT(*) from bad_skip_idx WHERE value = 'xxxxxxxxxx1015'; -- check no exception
|
||||
|
||||
DROP TABLE IF EXISTS bad_skip_idx;
|
39
dbms/tests/queries/0_stateless/01001_rename_merge_race_condition.sh
Executable file
39
dbms/tests/queries/0_stateless/01001_rename_merge_race_condition.sh
Executable file
@ -0,0 +1,39 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
. $CURDIR/../shell_config.sh
|
||||
|
||||
set -e
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS test1";
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS test2";
|
||||
$CLICKHOUSE_CLIENT --query "CREATE TABLE test1 (x UInt64) ENGINE = Memory";
|
||||
|
||||
|
||||
function thread1()
|
||||
{
|
||||
while true; do
|
||||
seq 1 1000 | sed -r -e 's/.+/RENAME TABLE test1 TO test2; RENAME TABLE test2 TO test1;/' | $CLICKHOUSE_CLIENT -n
|
||||
done
|
||||
}
|
||||
|
||||
function thread2()
|
||||
{
|
||||
while true; do
|
||||
$CLICKHOUSE_CLIENT --query "SELECT * FROM merge(currentDatabase(), '^test[12]$')"
|
||||
done
|
||||
}
|
||||
|
||||
# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout
|
||||
export -f thread1;
|
||||
export -f thread2;
|
||||
|
||||
TIMEOUT=10
|
||||
|
||||
timeout $TIMEOUT bash -c thread1 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c thread2 2> /dev/null &
|
||||
|
||||
wait
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS test1";
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS test2";
|
58
dbms/tests/queries/0_stateless/01002_alter_nullable_adaptive_granularity.sh
Executable file
58
dbms/tests/queries/0_stateless/01002_alter_nullable_adaptive_granularity.sh
Executable file
@ -0,0 +1,58 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
. $CURDIR/../shell_config.sh
|
||||
|
||||
set -e
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS test";
|
||||
$CLICKHOUSE_CLIENT --query "CREATE TABLE test (x UInt8, s String MATERIALIZED toString(rand64())) ENGINE = MergeTree ORDER BY s";
|
||||
|
||||
function thread1()
|
||||
{
|
||||
while true; do
|
||||
$CLICKHOUSE_CLIENT --query "INSERT INTO test SELECT rand() FROM numbers(1000)";
|
||||
done
|
||||
}
|
||||
|
||||
function thread2()
|
||||
{
|
||||
while true; do
|
||||
$CLICKHOUSE_CLIENT -n --query "ALTER TABLE test MODIFY COLUMN x Nullable(UInt8);";
|
||||
sleep 0.0$RANDOM
|
||||
$CLICKHOUSE_CLIENT -n --query "ALTER TABLE test MODIFY COLUMN x UInt8;";
|
||||
sleep 0.0$RANDOM
|
||||
done
|
||||
}
|
||||
|
||||
function thread3()
|
||||
{
|
||||
while true; do
|
||||
$CLICKHOUSE_CLIENT -n --query "SELECT count() FROM test FORMAT Null";
|
||||
done
|
||||
}
|
||||
|
||||
function thread4()
|
||||
{
|
||||
while true; do
|
||||
$CLICKHOUSE_CLIENT -n --query "OPTIMIZE TABLE test FINAL";
|
||||
sleep 0.1$RANDOM
|
||||
done
|
||||
}
|
||||
|
||||
# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout
|
||||
export -f thread1;
|
||||
export -f thread2;
|
||||
export -f thread3;
|
||||
export -f thread4;
|
||||
|
||||
TIMEOUT=10
|
||||
|
||||
timeout $TIMEOUT bash -c thread1 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c thread2 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c thread3 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c thread4 2> /dev/null &
|
||||
|
||||
wait
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "DROP TABLE test"
|
50
dbms/tests/queries/0_stateless/01003_kill_query_race_condition.sh
Executable file
50
dbms/tests/queries/0_stateless/01003_kill_query_race_condition.sh
Executable file
@ -0,0 +1,50 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
. $CURDIR/../shell_config.sh
|
||||
|
||||
set -e
|
||||
|
||||
function thread1()
|
||||
{
|
||||
while true; do
|
||||
$CLICKHOUSE_CLIENT --query_id=hello --query "SELECT count() FROM numbers(1000000000)" --format Null;
|
||||
done
|
||||
}
|
||||
|
||||
function thread2()
|
||||
{
|
||||
while true; do
|
||||
$CLICKHOUSE_CLIENT --query "KILL QUERY WHERE query_id = 'hello'" --format Null;
|
||||
sleep 0.$RANDOM
|
||||
done
|
||||
}
|
||||
|
||||
function thread3()
|
||||
{
|
||||
while true; do
|
||||
$CLICKHOUSE_CLIENT --query "SHOW PROCESSLIST" --format Null;
|
||||
$CLICKHOUSE_CLIENT --query "SELECT * FROM system.processes" --format Null;
|
||||
done
|
||||
}
|
||||
|
||||
# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout
|
||||
export -f thread1;
|
||||
export -f thread2;
|
||||
export -f thread3;
|
||||
|
||||
TIMEOUT=10
|
||||
|
||||
timeout $TIMEOUT bash -c thread1 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c thread1 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c thread1 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c thread1 2> /dev/null &
|
||||
|
||||
timeout $TIMEOUT bash -c thread2 2> /dev/null &
|
||||
|
||||
timeout $TIMEOUT bash -c thread3 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c thread3 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c thread3 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c thread3 2> /dev/null &
|
||||
|
||||
wait
|
60
dbms/tests/queries/0_stateless/01004_rename_deadlock.sh
Executable file
60
dbms/tests/queries/0_stateless/01004_rename_deadlock.sh
Executable file
@ -0,0 +1,60 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
. $CURDIR/../shell_config.sh
|
||||
|
||||
set -e
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS test1";
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS test2";
|
||||
$CLICKHOUSE_CLIENT --query "CREATE TABLE test1 (x UInt8) ENGINE = MergeTree ORDER BY x";
|
||||
$CLICKHOUSE_CLIENT --query "CREATE TABLE test2 (x UInt8) ENGINE = MergeTree ORDER BY x";
|
||||
|
||||
function thread1()
|
||||
{
|
||||
while true; do
|
||||
$CLICKHOUSE_CLIENT --query "RENAME TABLE test1 TO test_tmp, test2 TO test1, test_tmp TO test2"
|
||||
done
|
||||
}
|
||||
|
||||
function thread2()
|
||||
{
|
||||
while true; do
|
||||
$CLICKHOUSE_CLIENT --query "SELECT * FROM test1 UNION ALL SELECT * FROM test2" --format Null
|
||||
done
|
||||
}
|
||||
|
||||
function thread3()
|
||||
{
|
||||
while true; do
|
||||
$CLICKHOUSE_CLIENT --query "SELECT * FROM system.tables" --format Null
|
||||
done
|
||||
}
|
||||
|
||||
# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout
|
||||
export -f thread1;
|
||||
export -f thread2;
|
||||
export -f thread3;
|
||||
|
||||
TIMEOUT=10
|
||||
|
||||
timeout $TIMEOUT bash -c thread1 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c thread2 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c thread3 2> /dev/null &
|
||||
|
||||
timeout $TIMEOUT bash -c thread1 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c thread2 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c thread3 2> /dev/null &
|
||||
|
||||
timeout $TIMEOUT bash -c thread1 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c thread2 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c thread3 2> /dev/null &
|
||||
|
||||
timeout $TIMEOUT bash -c thread1 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c thread2 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c thread3 2> /dev/null &
|
||||
|
||||
wait
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "DROP TABLE test1"
|
||||
$CLICKHOUSE_CLIENT -q "DROP TABLE test2"
|
46
dbms/tests/queries/0_stateless/01005_rwr_shard_deadlock.sh
Executable file
46
dbms/tests/queries/0_stateless/01005_rwr_shard_deadlock.sh
Executable file
@ -0,0 +1,46 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
. $CURDIR/../shell_config.sh
|
||||
|
||||
set -e
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS test1";
|
||||
$CLICKHOUSE_CLIENT --query "CREATE TABLE test1 (x UInt8) ENGINE = MergeTree ORDER BY tuple()";
|
||||
|
||||
function thread1()
|
||||
{
|
||||
while true; do
|
||||
$CLICKHOUSE_CLIENT --query "ALTER TABLE test1 MODIFY COLUMN x Nullable(UInt8)"
|
||||
$CLICKHOUSE_CLIENT --query "ALTER TABLE test1 MODIFY COLUMN x UInt8"
|
||||
done
|
||||
}
|
||||
|
||||
function thread2()
|
||||
{
|
||||
while true; do
|
||||
$CLICKHOUSE_CLIENT --query "SELECT x FROM test1 WHERE x IN (SELECT x FROM remote('127.0.0.2', currentDatabase(), test1))" --format Null
|
||||
done
|
||||
}
|
||||
|
||||
# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout
|
||||
export -f thread1;
|
||||
export -f thread2;
|
||||
|
||||
TIMEOUT=10
|
||||
|
||||
timeout $TIMEOUT bash -c thread1 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c thread2 2> /dev/null &
|
||||
|
||||
timeout $TIMEOUT bash -c thread1 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c thread2 2> /dev/null &
|
||||
|
||||
timeout $TIMEOUT bash -c thread1 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c thread2 2> /dev/null &
|
||||
|
||||
timeout $TIMEOUT bash -c thread1 2> /dev/null &
|
||||
timeout $TIMEOUT bash -c thread2 2> /dev/null &
|
||||
|
||||
wait
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "DROP TABLE test1"
|
23
dbms/tests/stress
Executable file
23
dbms/tests/stress
Executable file
@ -0,0 +1,23 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
# https://stackoverflow.com/questions/360201/how-do-i-kill-background-processes-jobs-when-my-shell-script-exits
|
||||
trap 'kill -9 $(jobs -p)' EXIT
|
||||
|
||||
function thread()
|
||||
{
|
||||
while true; do
|
||||
./clickhouse-test --order random 2>&1 | awk '/^\w+:/ { printf("\033[0;%s%sm \033[0m", ('$1' % 2 ? "4" : "10"), (int('$1' / 2) % 8)) }'
|
||||
done
|
||||
}
|
||||
|
||||
# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout
|
||||
export -f thread;
|
||||
|
||||
NUM_THREADS=${1:-"16"}
|
||||
TIMEOUT=${2:-"300"}
|
||||
|
||||
for i in $(seq 1 $NUM_THREADS); do
|
||||
timeout $TIMEOUT bash -c "thread $i" 2> /dev/null &
|
||||
done
|
||||
|
||||
wait
|
@ -143,7 +143,7 @@ def parse_env_variables(build_type, compiler, sanitizer, package_type, cache, di
|
||||
result.append("ALIEN_PKGS='" + ' '.join(['--' + pkg for pkg in alien_pkgs]) + "'")
|
||||
|
||||
if unbundled:
|
||||
cmake_flags.append('-DUNBUNDLED=1 -DENABLE_MYSQL=0 -DENABLE_POCO_ODBC=0 -DENABLE_ODBC=0 -DUSE_CAPNP=0')
|
||||
cmake_flags.append('-DUNBUNDLED=1 -DENABLE_MYSQL=0 -DENABLE_POCO_ODBC=0 -DENABLE_ODBC=0')
|
||||
|
||||
if split_binary:
|
||||
cmake_flags.append('-DUSE_STATIC_LIBRARIES=0 -DSPLIT_SHARED_LIBRARIES=1 -DCLICKHOUSE_SPLIT_BINARY=1')
|
||||
|
Loading…
Reference in New Issue
Block a user