Merge branch 'master' into merging_template_format

This commit is contained in:
Alexander Tokmakov 2019-09-02 12:41:34 +03:00
commit e2928481a3
149 changed files with 1188 additions and 1657 deletions

3
.gitmodules vendored
View File

@ -97,9 +97,6 @@
[submodule "contrib/rapidjson"]
path = contrib/rapidjson
url = https://github.com/Tencent/rapidjson
[submodule "contrib/mimalloc"]
path = contrib/mimalloc
url = https://github.com/ClickHouse-Extras/mimalloc
[submodule "contrib/fastops"]
path = contrib/fastops
url = https://github.com/ClickHouse-Extras/fastops

View File

@ -340,7 +340,6 @@ include (cmake/find_consistent-hashing.cmake)
include (cmake/find_base64.cmake)
include (cmake/find_parquet.cmake)
include (cmake/find_hyperscan.cmake)
include (cmake/find_mimalloc.cmake)
include (cmake/find_simdjson.cmake)
include (cmake/find_rapidjson.cmake)
include (cmake/find_fastops.cmake)

View File

@ -1,13 +1,14 @@
find_program (CCACHE_FOUND ccache)
if (CCACHE_FOUND AND NOT CMAKE_CXX_COMPILER_LAUNCHER MATCHES "ccache" AND NOT CMAKE_CXX_COMPILER MATCHES "ccache")
execute_process(COMMAND ${CCACHE_FOUND} "-V" OUTPUT_VARIABLE CCACHE_VERSION)
string(REGEX REPLACE "ccache version ([0-9\\.]+).*" "\\1" CCACHE_VERSION ${CCACHE_VERSION})
if (CCACHE_VERSION VERSION_GREATER "3.2.0" OR NOT CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
#message(STATUS "Using ${CCACHE_FOUND} ${CCACHE_VERSION}")
set_property (GLOBAL PROPERTY RULE_LAUNCH_COMPILE ${CCACHE_FOUND})
set_property (GLOBAL PROPERTY RULE_LAUNCH_LINK ${CCACHE_FOUND})
else ()
message(STATUS "Not using ${CCACHE_FOUND} ${CCACHE_VERSION} bug: https://bugzilla.samba.org/show_bug.cgi?id=8118")
endif ()
if (CCACHE_FOUND AND NOT CMAKE_CXX_COMPILER_LAUNCHER MATCHES "ccache" AND NOT CMAKE_CXX_COMPILER MATCHES "ccache")
execute_process(COMMAND ${CCACHE_FOUND} "-V" OUTPUT_VARIABLE CCACHE_VERSION)
string(REGEX REPLACE "ccache version ([0-9\\.]+).*" "\\1" CCACHE_VERSION ${CCACHE_VERSION})
if (CCACHE_VERSION VERSION_GREATER "3.2.0" OR NOT CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
#message(STATUS "Using ${CCACHE_FOUND} ${CCACHE_VERSION}")
set_property (GLOBAL PROPERTY RULE_LAUNCH_COMPILE ${CCACHE_FOUND})
set_property (GLOBAL PROPERTY RULE_LAUNCH_LINK ${CCACHE_FOUND})
else ()
message(STATUS "Not using ${CCACHE_FOUND} ${CCACHE_VERSION} bug: https://bugzilla.samba.org/show_bug.cgi?id=8118")
endif ()
endif ()

View File

@ -1,5 +1,5 @@
if (OS_LINUX AND COMPILER_CLANG)
option (USE_LIBCXX "Use libc++ and libc++abi instead of libstdc++" ${HAVE_LIBCXX})
option (USE_LIBCXX "Use libc++ and libc++abi instead of libstdc++" ON)
option (USE_INTERNAL_LIBCXX_LIBRARY "Set to FALSE to use system libcxx and libcxxabi libraries instead of bundled" ${NOT_UNBUNDLED})
endif()
@ -25,6 +25,8 @@ if (USE_LIBCXX)
find_library (LIBCXXFS_LIBRARY c++fs)
find_library (LIBCXXABI_LIBRARY c++abi)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++")
target_link_libraries(global-libs INTERFACE ${EXCEPTION_HANDLING_LIBRARY})
else ()
set (LIBCXX_LIBRARY cxx)
@ -38,7 +40,6 @@ if (USE_LIBCXX)
target_link_libraries(global-libs INTERFACE ${LIBCXX_LIBRARY} ${LIBCXXABI_LIBRARY} ${LIBCXXFS_LIBRARY})
set (HAVE_LIBCXX 1)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++")
message (STATUS "Using libcxx: ${LIBCXX_LIBRARY}")
message (STATUS "Using libcxxfs: ${LIBCXXFS_LIBRARY}")

View File

@ -1,17 +0,0 @@
if (OS_LINUX AND NOT SANITIZE AND NOT ARCH_ARM AND NOT ARCH_32 AND NOT ARCH_PPC64LE)
option (ENABLE_MIMALLOC "Set to FALSE to disable usage of mimalloc for internal ClickHouse caches" FALSE)
endif ()
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/mimalloc/include/mimalloc.h")
message (WARNING "submodule contrib/mimalloc is missing. to fix try run: \n git submodule update --init --recursive")
return()
endif ()
if (ENABLE_MIMALLOC)
message (FATAL_ERROR "Mimalloc is not production ready. (Disable with cmake -D ENABLE_MIMALLOC=0). If you want to use mimalloc, you must manually remove this message.")
set (MIMALLOC_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/mimalloc/include)
set (USE_MIMALLOC 1)
set (MIMALLOC_LIBRARY mimalloc-static)
message (STATUS "Using mimalloc: ${MIMALLOC_INCLUDE_DIR} : ${MIMALLOC_LIBRARY}")
endif ()

View File

@ -28,7 +28,8 @@ set (KJ_SRCS
)
add_library(kj ${KJ_SRCS})
target_include_directories(kj INTERFACE ${CAPNPROTO_SOURCE_DIR})
target_include_directories(kj PUBLIC ${CAPNPROTO_SOURCE_DIR})
target_compile_options(kj PUBLIC -Wno-non-virtual-dtor)
set (CAPNP_SRCS
${CAPNPROTO_SOURCE_DIR}/capnp/c++.capnp.c++

View File

@ -42,7 +42,7 @@ add_library(cxx ${SRCS})
target_include_directories(cxx SYSTEM BEFORE PUBLIC $<BUILD_INTERFACE:${LIBCXX_SOURCE_DIR}/include>)
target_compile_definitions(cxx PRIVATE -D_LIBCPP_BUILDING_LIBRARY -DLIBCXX_BUILDING_LIBCXXABI)
target_compile_options(cxx PRIVATE -nostdinc++)
target_compile_options(cxx PUBLIC -nostdinc++ -Wno-reserved-id-macro)
target_link_libraries(cxx PUBLIC cxxabi)
install(
@ -50,4 +50,5 @@ install(
EXPORT global
ARCHIVE DESTINATION lib
RUNTIME DESTINATION lib
LIBRARY DESTINATION lib
)

View File

@ -29,17 +29,13 @@ target_include_directories(cxxabi SYSTEM BEFORE
PRIVATE $<BUILD_INTERFACE:${LIBCXXABI_SOURCE_DIR}/../libcxx/include>
)
target_compile_definitions(cxxabi PRIVATE -D_LIBCPP_BUILDING_LIBRARY)
target_compile_options(cxxabi PRIVATE -nostdinc++ -fno-sanitize=undefined) # If we don't disable UBSan, infinite recursion happens in dynamic_cast.
if (USE_UNWIND)
target_link_libraries(cxxabi PRIVATE ${UNWIND_LIBRARIES})
else ()
target_link_libraries(cxxabi PRIVATE gcc_eh)
endif ()
target_compile_options(cxxabi PRIVATE -nostdinc++ -fno-sanitize=undefined -Wno-macro-redefined) # If we don't disable UBSan, infinite recursion happens in dynamic_cast.
target_link_libraries(cxxabi PUBLIC ${EXCEPTION_HANDLING_LIBRARY})
install(
TARGETS cxxabi
EXPORT global
ARCHIVE DESTINATION lib
RUNTIME DESTINATION lib
LIBRARY DESTINATION lib
)

1
contrib/mimalloc vendored

@ -1 +0,0 @@
Subproject commit a787bdebce94bf3776dc0d1ad597917f479ab8d5

View File

@ -256,11 +256,6 @@ if(RE2_INCLUDE_DIR)
target_include_directories(clickhouse_common_io SYSTEM BEFORE PUBLIC ${RE2_INCLUDE_DIR})
endif()
if (USE_MIMALLOC)
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${MIMALLOC_INCLUDE_DIR})
target_link_libraries (clickhouse_common_io PRIVATE ${MIMALLOC_LIBRARY})
endif ()
if(CPUID_LIBRARY)
target_link_libraries(clickhouse_common_io PRIVATE ${CPUID_LIBRARY})
endif()

View File

@ -97,7 +97,7 @@ void PerformanceTestInfo::applySettings(XMLConfigurationPtr config)
}
extractSettings(config, "settings", config_settings, settings_to_apply);
settings.loadFromChanges(settings_to_apply);
settings.applyChanges(settings_to_apply);
if (settings_contain("average_rows_speed_precision"))
TestStats::avg_rows_speed_precision =

View File

@ -520,7 +520,18 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// Init trace collector only after trace_log system table was created
/// Disable it if we collect test coverage information, because it will work extremely slow.
#if USE_UNWIND && !WITH_COVERAGE
///
/// It also cannot work with sanitizers.
/// Sanitizers are using quick "frame walking" stack unwinding (this implies -fno-omit-frame-pointer)
/// And they do unwiding frequently (on every malloc/free, thread/mutex operations, etc).
/// They change %rbp during unwinding and it confuses libunwind if signal comes during sanitizer unwiding
/// and query profiler decide to unwind stack with libunwind at this moment.
///
/// Symptoms: you'll get silent Segmentation Fault - without sanitizer message and without usual ClickHouse diagnostics.
///
/// Look at compiler-rt/lib/sanitizer_common/sanitizer_stacktrace.h
///
#if USE_UNWIND && !WITH_COVERAGE && !defined(SANITIZER)
/// QueryProfiler cannot work reliably with any other libunwind or without PHDR cache.
if (hasPHDRCache())
global_context->initializeTraceCollector();

View File

@ -446,7 +446,7 @@ namespace ErrorCodes
extern const int VIOLATED_CONSTRAINT = 469;
extern const int QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW = 470;
extern const int SETTINGS_ARE_NOT_SUPPORTED = 471;
extern const int IMMUTABLE_SETTING = 472;
extern const int READONLY_SETTING = 472;
extern const int INVALID_TEMPLATE_FORMAT = 473;
extern const int KEEPER_EXCEPTION = 999;

View File

@ -1,70 +0,0 @@
#include "MiAllocator.h"
#if USE_MIMALLOC
#include <mimalloc.h>
#include <Common/Exception.h>
#include <Common/formatReadable.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_ALLOCATE_MEMORY;
}
void * MiAllocator::alloc(size_t size, size_t alignment)
{
void * ptr;
if (alignment == 0)
{
ptr = mi_malloc(size);
if (!ptr)
DB::throwFromErrno("MiAllocator: Cannot allocate in mimalloc " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
}
else
{
ptr = mi_malloc_aligned(size, alignment);
if (!ptr)
DB::throwFromErrno("MiAllocator: Cannot allocate in mimalloc (mi_malloc_aligned) " + formatReadableSizeWithBinarySuffix(size) + " with alignment " + toString(alignment) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
}
return ptr;
}
void MiAllocator::free(void * buf, size_t)
{
mi_free(buf);
}
void * MiAllocator::realloc(void * old_ptr, size_t, size_t new_size, size_t alignment)
{
if (old_ptr == nullptr)
return alloc(new_size, alignment);
if (new_size == 0)
{
mi_free(old_ptr);
return nullptr;
}
void * ptr;
if (alignment == 0)
{
ptr = mi_realloc(old_ptr, alignment);
if (!ptr)
DB::throwFromErrno("MiAllocator: Cannot reallocate in mimalloc " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
}
else
{
ptr = mi_realloc_aligned(old_ptr, new_size, alignment);
if (!ptr)
DB::throwFromErrno("MiAllocator: Cannot reallocate in mimalloc (mi_realloc_aligned) " + formatReadableSizeWithBinarySuffix(size) + " with alignment " + toString(alignment) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
}
return ptr;
}
}
#endif

View File

@ -1,27 +0,0 @@
#pragma once
#include <Common/config.h>
#if USE_MIMALLOC
#include <cstddef>
namespace DB
{
/*
* This is a different allocator that is based on mimalloc (Microsoft malloc).
* It can be used separately from main allocator to catch heap corruptions and vulnerabilities (for example, for caches).
* We use MI_SECURE mode in mimalloc to achieve such behaviour.
*/
struct MiAllocator
{
static void * alloc(size_t size, size_t alignment = 0);
static void free(void * buf, size_t);
static void * realloc(void * old_ptr, size_t, size_t new_size, size_t alignment = 0);
};
}
#endif

View File

@ -36,8 +36,10 @@
M(MarkCacheMisses, "") \
M(CreatedReadBufferOrdinary, "") \
M(CreatedReadBufferAIO, "") \
M(CreatedReadBufferAIOFailed, "") \
M(CreatedWriteBufferOrdinary, "") \
M(CreatedWriteBufferAIO, "") \
M(CreatedWriteBufferAIOFailed, "") \
M(DiskReadElapsedMicroseconds, "Total time spent waiting for read syscall. This include reads from page cache.") \
M(DiskWriteElapsedMicroseconds, "Total time spent waiting for write syscall. This include writes to page cache.") \
M(NetworkReceiveElapsedMicroseconds, "") \

View File

@ -61,6 +61,7 @@ public:
InternalTextLogsQueueWeakPtr logs_queue_ptr;
std::vector<UInt32> thread_numbers;
std::vector<UInt32> os_thread_ids;
/// The first thread created this thread group
UInt32 master_thread_number = 0;

View File

@ -8,6 +8,5 @@
#cmakedefine01 USE_CPUID
#cmakedefine01 USE_CPUINFO
#cmakedefine01 USE_BROTLI
#cmakedefine01 USE_MIMALLOC
#cmakedefine01 USE_UNWIND
#cmakedefine01 CLICKHOUSE_SPLIT_BINARY

View File

@ -76,8 +76,5 @@ target_link_libraries (cow_compositions PRIVATE clickhouse_common_io)
add_executable (stopwatch stopwatch.cpp)
target_link_libraries (stopwatch PRIVATE clickhouse_common_io)
add_executable (mi_malloc_test mi_malloc_test.cpp)
target_link_libraries (mi_malloc_test PRIVATE clickhouse_common_io)
add_executable (symbol_index symbol_index.cpp)
target_link_libraries (symbol_index PRIVATE clickhouse_common_io)

View File

@ -1,118 +0,0 @@
/** In addition to ClickHouse (Apache 2) license, this file can be also used under MIT license:
MIT License
Copyright (c) 2019 Yandex LLC, Alexey Milovidov
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include <map>
#include <vector>
#include <cstdint>
#include <random>
#include <stdexcept>
#include <iostream>
#include <Common/config.h>
//#undef USE_MIMALLOC
//#define USE_MIMALLOC 0
#if USE_MIMALLOC
#include <mimalloc.h>
#define malloc mi_malloc
#define free mi_free
#else
#include <stdlib.h>
#endif
size_t total_size{0};
struct Allocation
{
void * ptr = nullptr;
size_t size = 0;
Allocation() {}
Allocation(size_t size_)
: size(size_)
{
ptr = malloc(size);
if (!ptr)
throw std::runtime_error("Cannot allocate memory");
total_size += size;
}
~Allocation()
{
if (ptr)
{
free(ptr);
total_size -= size;
}
ptr = nullptr;
}
Allocation(const Allocation &) = delete;
Allocation(Allocation && rhs)
{
ptr = rhs.ptr;
size = rhs.size;
rhs.ptr = nullptr;
rhs.size = 0;
}
};
int main(int, char **)
{
std::vector<Allocation> allocations;
constexpr size_t limit = 100000000;
constexpr size_t min_alloc_size = 65536;
constexpr size_t max_alloc_size = 10000000;
std::mt19937 rng;
auto distribution = std::uniform_int_distribution(min_alloc_size, max_alloc_size);
size_t total_allocations = 0;
while (true)
{
size_t size = distribution(rng);
while (total_size + size > limit)
allocations.pop_back();
allocations.emplace_back(size);
++total_allocations;
if (total_allocations % (1ULL << 20) == 0)
std::cerr << "Total allocations: " << total_allocations << "\n";
}
}

View File

@ -124,6 +124,8 @@
#endif
#endif
/// TODO Strange enough, there is no way to detect UB sanitizer.
/// Explicitly allow undefined behaviour for certain functions. Use it as a function attribute.
/// It is useful in case when compiler cannot see (and exploit) it, but UBSan can.
/// Example: multiplication of signed integers with possibility of overflow when both sides are from user input.

View File

@ -42,8 +42,7 @@ struct Settings : public SettingsCollection<Settings>
* but we are not going to do it, because settings is used everywhere as static struct fields.
*/
/// M (mutable) for normal settings, IM (immutable) for not updateable settings.
#define LIST_OF_SETTINGS(M, IM) \
#define LIST_OF_SETTINGS(M) \
M(SettingUInt64, min_compress_block_size, 65536, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value and no less than the volume of data for one mark.") \
M(SettingUInt64, max_compress_block_size, 1048576, "The maximum size of blocks of uncompressed data before compressing for writing to a table.") \
M(SettingUInt64, max_block_size, DEFAULT_BLOCK_SIZE, "Maximum block size for reading") \

View File

@ -17,11 +17,6 @@ class Field;
class ReadBuffer;
class WriteBuffer;
namespace ErrorCodes
{
extern const int IMMUTABLE_SETTING;
}
/** One setting for any type.
* Stores a value within itself, as well as a flag - whether the value was changed.
* This is done so that you can send to the remote servers only changed settings (or explicitly specified in the config) values.
@ -317,15 +312,12 @@ private:
using DeserializeFunction = void (*)(Derived &, ReadBuffer & buf);
using CastValueWithoutApplyingFunction = Field (*)(const Field &);
struct MemberInfo
{
IsChangedFunction is_changed;
StringRef name;
StringRef description;
/// Can be updated after first load for config/definition.
/// Non updatable settings can be `changed`,
/// if they were overwritten in config/definition.
const bool updateable;
GetStringFunction get_string;
GetFieldFunction get_field;
SetStringFunction set_string;
@ -405,7 +397,6 @@ public:
const_reference(const const_reference & src) = default;
const StringRef & getName() const { return member->name; }
const StringRef & getDescription() const { return member->description; }
bool isUpdateable() const { return member->updateable; }
bool isChanged() const { return member->isChanged(*collection); }
Field getValue() const { return member->get_field(*collection); }
String getValueAsString() const { return member->get_string(*collection); }
@ -425,18 +416,6 @@ public:
reference(const const_reference & src) : const_reference(src) {}
void setValue(const Field & value) { this->member->set_field(*const_cast<Derived *>(this->collection), value); }
void setValue(const String & value) { this->member->set_string(*const_cast<Derived *>(this->collection), value); }
void updateValue(const Field & value)
{
if (!this->member->updateable)
throw Exception("Setting '" + this->member->name.toString() + "' is restricted for updates.", ErrorCodes::IMMUTABLE_SETTING);
setValue(value);
}
void updateValue(const String & value)
{
if (!this->member->updateable)
throw Exception("Setting '" + this->member->name.toString() + "' is restricted for updates.", ErrorCodes::IMMUTABLE_SETTING);
setValue(value);
}
};
/// Iterator to iterating through all the settings.
@ -519,15 +498,6 @@ public:
void set(size_t index, const String & value) { (*this)[index].setValue(value); }
void set(const String & name, const String & value) { (*this)[name].setValue(value); }
/// Updates setting's value. Checks it' mutability.
void update(size_t index, const Field & value) { (*this)[index].updateValue(value); }
void update(const String & name, const Field & value) { (*this)[name].updateValue(value); }
void update(size_t index, const String & value) { (*this)[index].updateValue(value); }
void update(const String & name, const String & value) { (*this)[name].updateValue(value); }
/// Returns value of a setting.
Field get(size_t index) const { return (*this)[index].getValue(); }
Field get(const String & name) const { return (*this)[name].getValue(); }
@ -591,35 +561,19 @@ public:
return found_changes;
}
/// Applies change to the settings. Doesn't check settings mutability.
void loadFromChange(const SettingChange & change)
/// Applies change to concrete setting.
void applyChange(const SettingChange & change)
{
set(change.name, change.value);
}
/// Applies changes to the settings. Should be used in initial settings loading.
/// (on table creation or loading from config)
void loadFromChanges(const SettingsChanges & changes)
/// Applies changes to the settings.
void applyChanges(const SettingsChanges & changes)
{
for (const SettingChange & change : changes)
loadFromChange(change);
applyChange(change);
}
/// Applies change to the settings, checks settings mutability.
void updateFromChange(const SettingChange & change)
{
update(change.name, change.value);
}
/// Applies changes to the settings. Should be used for settigns update.
/// (ALTER MODIFY SETTINGS)
void updateFromChanges(const SettingsChanges & changes)
{
for (const SettingChange & change : changes)
updateFromChange(change);
}
void copyChangesFrom(const Derived & src)
{
for (const auto & member : members())
@ -663,7 +617,7 @@ public:
};
#define DECLARE_SETTINGS_COLLECTION(LIST_OF_SETTINGS_MACRO) \
LIST_OF_SETTINGS_MACRO(DECLARE_SETTINGS_COLLECTION_DECLARE_VARIABLES_HELPER_, DECLARE_SETTINGS_COLLECTION_DECLARE_VARIABLES_HELPER_)
LIST_OF_SETTINGS_MACRO(DECLARE_SETTINGS_COLLECTION_DECLARE_VARIABLES_HELPER_)
#define IMPLEMENT_SETTINGS_COLLECTION(DERIVED_CLASS_NAME, LIST_OF_SETTINGS_MACRO) \
@ -673,9 +627,9 @@ public:
using Derived = DERIVED_CLASS_NAME; \
struct Functions \
{ \
LIST_OF_SETTINGS_MACRO(IMPLEMENT_SETTINGS_COLLECTION_DEFINE_FUNCTIONS_HELPER_, IMPLEMENT_SETTINGS_COLLECTION_DEFINE_FUNCTIONS_HELPER_) \
LIST_OF_SETTINGS_MACRO(IMPLEMENT_SETTINGS_COLLECTION_DEFINE_FUNCTIONS_HELPER_) \
}; \
LIST_OF_SETTINGS_MACRO(IMPLEMENT_SETTINGS_COLLECTION_ADD_MUTABLE_MEMBER_INFO_HELPER_, IMPLEMENT_SETTINGS_COLLECTION_ADD_IMMUTABLE_MEMBER_INFO_HELPER_) \
LIST_OF_SETTINGS_MACRO(IMPLEMENT_SETTINGS_COLLECTION_ADD_MEMBER_INFO_HELPER_) \
}
@ -690,22 +644,14 @@ public:
static void NAME##_setField(Derived & collection, const Field & value) { collection.NAME.set(value); } \
static void NAME##_serialize(const Derived & collection, WriteBuffer & buf) { collection.NAME.serialize(buf); } \
static void NAME##_deserialize(Derived & collection, ReadBuffer & buf) { collection.NAME.deserialize(buf); } \
static Field NAME##_castValueWithoutApplying(const Field & value) { TYPE temp{DEFAULT}; temp.set(value); return temp.toField(); }
static Field NAME##_castValueWithoutApplying(const Field & value) { TYPE temp{DEFAULT}; temp.set(value); return temp.toField(); } \
#define IMPLEMENT_SETTINGS_COLLECTION_ADD_MUTABLE_MEMBER_INFO_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION) \
#define IMPLEMENT_SETTINGS_COLLECTION_ADD_MEMBER_INFO_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION) \
add({[](const Derived & d) { return d.NAME.changed; }, \
StringRef(#NAME, strlen(#NAME)), StringRef(DESCRIPTION, strlen(DESCRIPTION)), true, \
StringRef(#NAME, strlen(#NAME)), StringRef(DESCRIPTION, strlen(DESCRIPTION)), \
&Functions::NAME##_getString, &Functions::NAME##_getField, \
&Functions::NAME##_setString, &Functions::NAME##_setField, \
&Functions::NAME##_serialize, &Functions::NAME##_deserialize, \
&Functions::NAME##_castValueWithoutApplying });
#define IMPLEMENT_SETTINGS_COLLECTION_ADD_IMMUTABLE_MEMBER_INFO_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION) \
add({[](const Derived & d) { return d.NAME.changed; }, \
StringRef(#NAME, strlen(#NAME)), StringRef(DESCRIPTION, strlen(DESCRIPTION)), false, \
&Functions::NAME##_getString, &Functions::NAME##_getField, \
&Functions::NAME##_setString, &Functions::NAME##_setField, \
&Functions::NAME##_serialize, &Functions::NAME##_deserialize, \
&Functions::NAME##_castValueWithoutApplying });
}

View File

@ -314,7 +314,11 @@ private:
/// NOTE: Acquire a read lock, therefore f() should be thread safe
std::shared_lock lock(children_mutex);
for (auto & child : children)
// Reduce lock scope and avoid recursive locking since that is undefined for shared_mutex.
const auto children_copy = children;
lock.unlock();
for (auto & child : children_copy)
if (f(*child))
return;
}

View File

@ -6,10 +6,6 @@
#include <IO/WriteHelpers.h>
#include <Common/PODArray.h>
#include <Common/config.h>
#if USE_MIMALLOC
#include <Common/MiAllocator.h>
#endif
namespace DB
{
@ -43,9 +39,7 @@ struct MarkInCompressedFile
}
};
#if USE_MIMALLOC
using MarksInCompressedFile = PODArray<MarkInCompressedFile, 4096, MiAllocator>;
#else
using MarksInCompressedFile = PODArray<MarkInCompressedFile>;
#endif
}

View File

@ -26,7 +26,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
* Although now any insertion into the table is done via PushingToViewsBlockOutputStream,
* but it's clear that here is not the best place for this functionality.
*/
addTableLock(storage->lockStructureForShare(true, context.getCurrentQueryId()));
addTableLock(storage->lockStructureForShare(true, context.getInitialQueryId()));
/// If the "root" table deduplactes blocks, there are no need to make deduplication for children
/// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks

View File

@ -70,6 +70,7 @@ CacheDictionary::CacheDictionary(
, dict_struct(dict_struct_)
, source_ptr{std::move(source_ptr_)}
, dict_lifetime(dict_lifetime_)
, log(&Logger::get("ExternalDictionaries"))
, size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))}
, size_overlap_mask{this->size - 1}
, cells{this->size}
@ -575,6 +576,12 @@ BlockInputStreamPtr CacheDictionary::getBlockInputStream(const Names & column_na
return std::make_shared<BlockInputStreamType>(shared_from_this(), max_block_size, getCachedIds(), column_names);
}
std::exception_ptr CacheDictionary::getLastException() const
{
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
return last_exception;
}
void registerDictionaryCache(DictionaryFactory & factory)
{
auto create_layout = [=](const std::string & name,

View File

@ -7,6 +7,7 @@
#include <shared_mutex>
#include <variant>
#include <vector>
#include <common/logger_useful.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h>
#include <pcg_random.hpp>
@ -74,6 +75,8 @@ public:
void isInVectorConstant(const PaddedPODArray<Key> & child_ids, const Key ancestor_id, PaddedPODArray<UInt8> & out) const override;
void isInConstantVector(const Key child_id, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const override;
std::exception_ptr getLastException() const override;
template <typename T>
using ResultArrayType = std::conditional_t<IsDecimalNumber<T>, DecimalPaddedPODArray<T>, PaddedPODArray<T>>;
@ -253,8 +256,9 @@ private:
const std::string name;
const DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr;
mutable DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
Logger * const log;
mutable std::shared_mutex rw_lock;
@ -274,6 +278,10 @@ private:
Attribute * hierarchical_attribute = nullptr;
std::unique_ptr<ArenaWithFreeLists> string_arena;
mutable std::exception_ptr last_exception;
mutable size_t error_count = 0;
mutable std::chrono::system_clock::time_point backoff_end_time;
mutable pcg64 rnd_engine;
mutable size_t bytes_allocated = 0;

View File

@ -3,6 +3,7 @@
#include <Columns/ColumnsNumber.h>
#include <Common/ProfilingScopedRWLock.h>
#include <Common/typeid_cast.h>
#include <common/DateLUT.h>
#include <DataStreams/IBlockInputStream.h>
#include <ext/map.h>
#include <ext/range.h>
@ -243,77 +244,102 @@ template <typename PresentIdHandler, typename AbsentIdHandler>
void CacheDictionary::update(
const std::vector<Key> & requested_ids, PresentIdHandler && on_cell_updated, AbsentIdHandler && on_id_not_found) const
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size());
std::unordered_map<Key, UInt8> remaining_ids{requested_ids.size()};
for (const auto id : requested_ids)
remaining_ids.insert({id, 0});
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
const auto now = std::chrono::system_clock::now();
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
if (now > backoff_end_time)
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
Stopwatch watch;
auto stream = source_ptr->loadIds(requested_ids);
stream->readPrefix();
const auto now = std::chrono::system_clock::now();
while (const auto block = stream->read())
try
{
const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
if (!id_column)
throw Exception{name + ": id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH};
const auto & ids = id_column->getData();
/// cache column pointers
const auto column_ptrs = ext::map<std::vector>(
ext::range(0, attributes.size()), [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); });
for (const auto i : ext::range(0, ids.size()))
if (error_count)
{
const auto id = ids[i];
const auto find_result = findCellIdx(id, now);
const auto & cell_idx = find_result.cell_idx;
auto & cell = cells[cell_idx];
for (const auto attribute_idx : ext::range(0, attributes.size()))
{
const auto & attribute_column = *column_ptrs[attribute_idx];
auto & attribute = attributes[attribute_idx];
setAttributeValue(attribute, cell_idx, attribute_column[i]);
}
/// if cell id is zero and zero does not map to this cell, then the cell is unused
if (cell.id == 0 && cell_idx != zero_cell_idx)
element_count.fetch_add(1, std::memory_order_relaxed);
cell.id = id;
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
cell.setExpiresAt(std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)});
else
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
/// inform caller
on_cell_updated(id, cell_idx);
/// mark corresponding id as found
remaining_ids[id] = 1;
/// Recover after error: we have to clone the source here because
/// it could keep connections which should be reset after error.
source_ptr = source_ptr->clone();
}
Stopwatch watch;
auto stream = source_ptr->loadIds(requested_ids);
stream->readPrefix();
while (const auto block = stream->read())
{
const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
if (!id_column)
throw Exception{name + ": id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH};
const auto & ids = id_column->getData();
/// cache column pointers
const auto column_ptrs = ext::map<std::vector>(
ext::range(0, attributes.size()), [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); });
for (const auto i : ext::range(0, ids.size()))
{
const auto id = ids[i];
const auto find_result = findCellIdx(id, now);
const auto & cell_idx = find_result.cell_idx;
auto & cell = cells[cell_idx];
for (const auto attribute_idx : ext::range(0, attributes.size()))
{
const auto & attribute_column = *column_ptrs[attribute_idx];
auto & attribute = attributes[attribute_idx];
setAttributeValue(attribute, cell_idx, attribute_column[i]);
}
/// if cell id is zero and zero does not map to this cell, then the cell is unused
if (cell.id == 0 && cell_idx != zero_cell_idx)
element_count.fetch_add(1, std::memory_order_relaxed);
cell.id = id;
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
{
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
}
else
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
/// inform caller
on_cell_updated(id, cell_idx);
/// mark corresponding id as found
remaining_ids[id] = 1;
}
}
stream->readSuffix();
error_count = 0;
last_exception = std::exception_ptr{};
backoff_end_time = std::chrono::system_clock::time_point{};
ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed());
}
catch (...)
{
++error_count;
last_exception = std::current_exception();
backoff_end_time = now + std::chrono::seconds(ExternalLoadableBackoff{}.calculateDuration(rnd_engine, error_count));
stream->readSuffix();
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size());
ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed());
tryLogException(last_exception, log, "Could not update cache dictionary '" + getName() +
"', next update is scheduled at " + DateLUT::instance().timeToString(std::chrono::system_clock::to_time_t(backoff_end_time)));
}
}
size_t not_found_num = 0, found_num = 0;
const auto now = std::chrono::system_clock::now();
/// Check which ids have not been found and require setting null_value
for (const auto & id_found_pair : remaining_ids)
{
@ -328,24 +354,45 @@ void CacheDictionary::update(
const auto find_result = findCellIdx(id, now);
const auto & cell_idx = find_result.cell_idx;
auto & cell = cells[cell_idx];
/// Set null_value for each attribute
for (auto & attribute : attributes)
setDefaultAttributeValue(attribute, cell_idx);
if (error_count)
{
if (find_result.outdated)
{
/// We have expired data for that `id` so we can continue using it.
bool was_default = cell.isDefault();
cell.setExpiresAt(backoff_end_time);
if (was_default)
cell.setDefault();
if (was_default)
on_id_not_found(id, cell_idx);
else
on_cell_updated(id, cell_idx);
continue;
}
/// We don't have expired data for that `id` so all we can do is to rethrow `last_exception`.
std::rethrow_exception(last_exception);
}
/// Check if cell had not been occupied before and increment element counter if it hadn't
if (cell.id == 0 && cell_idx != zero_cell_idx)
element_count.fetch_add(1, std::memory_order_relaxed);
cell.id = id;
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
cell.setExpiresAt(std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)});
{
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
}
else
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
/// Set null_value for each attribute
cell.setDefault();
for (auto & attribute : attributes)
setDefaultAttributeValue(attribute, cell_idx);
/// inform caller that the cell has not been found
on_id_not_found(id, cell_idx);

View File

@ -56,6 +56,8 @@ struct IDictionaryBase : public IExternalLoadable
return source && source->isModified();
}
virtual std::exception_ptr getLastException() const { return {}; }
std::shared_ptr<IDictionaryBase> shared_from_this()
{
return std::static_pointer_cast<IDictionaryBase>(IExternalLoadable::shared_from_this());

View File

@ -65,7 +65,7 @@ FunctionBasePtr FunctionBuilderJoinGet::buildImpl(const ColumnsWithTypeAndName &
auto join = storage_join->getJoin();
DataTypes data_types(arguments.size());
auto table_lock = storage_join->lockStructureForShare(false, context.getCurrentQueryId());
auto table_lock = storage_join->lockStructureForShare(false, context.getInitialQueryId());
for (size_t i = 0; i < arguments.size(); ++i)
data_types[i] = arguments[i].type;

View File

@ -733,16 +733,15 @@ struct JSONExtractTree
if (!JSONParser::firstArrayElement(it2))
return false;
size_t index = 0;
do
for (size_t index = 0; index != nested.size(); ++index)
{
if (nested[index]->addValueToColumn(tuple.getColumn(index), it2))
were_valid_elements = true;
else
tuple.getColumn(index).insertDefault();
++index;
if (!JSONParser::nextArrayElement(it2))
break;
}
while (JSONParser::nextArrayElement(it2));
set_size(old_size + static_cast<size_t>(were_valid_elements));
return were_valid_elements;
@ -756,16 +755,15 @@ struct JSONExtractTree
if (!JSONParser::firstObjectMember(it2))
return false;
size_t index = 0;
do
for (size_t index = 0; index != nested.size(); ++index)
{
if (nested[index]->addValueToColumn(tuple.getColumn(index), it2))
were_valid_elements = true;
else
tuple.getColumn(index).insertDefault();
++index;
if (!JSONParser::nextObjectMember(it2))
break;
}
while (JSONParser::nextObjectMember(it2));
}
else
{

View File

@ -6,12 +6,14 @@ class FunctionFactory;
void registerFunctionAddressToSymbol(FunctionFactory & factory);
void registerFunctionDemangle(FunctionFactory & factory);
void registerFunctionAddressToLine(FunctionFactory & factory);
void registerFunctionTrap(FunctionFactory & factory);
void registerFunctionsIntrospection(FunctionFactory & factory)
{
registerFunctionAddressToSymbol(factory);
registerFunctionDemangle(factory);
registerFunctionAddressToLine(factory);
registerFunctionTrap(factory);
}
}

143
dbms/src/Functions/trap.cpp Normal file
View File

@ -0,0 +1,143 @@
#if 0
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnString.h>
#include <thread>
#include <memory>
#include <cstdlib>
#include <unistd.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int BAD_ARGUMENTS;
}
/// Various illegal actions to test diagnostic features of ClickHouse itself. Should not be enabled in production builds.
class FunctionTrap : public IFunction
{
public:
static constexpr auto name = "trap";
static FunctionPtr create(const Context &)
{
return std::make_shared<FunctionTrap>();
}
String getName() const override
{
return name;
}
size_t getNumberOfArguments() const override
{
return 1;
}
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (!isString(arguments[0]))
throw Exception("The only argument for function " + getName() + " must be constant String", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeUInt8>();
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
{
if (const ColumnConst * column = checkAndGetColumnConst<ColumnString>(block.getByPosition(arguments[0]).column.get()))
{
String mode = column->getValue<String>();
if (mode == "read nullptr c++")
{
volatile int x = *reinterpret_cast<const volatile int *>(0);
(void)x;
}
else if (mode == "read nullptr asm")
{
__asm__ volatile ("movq $0, %rax");
__asm__ volatile ("movq (%rax), %rax");
}
else if (mode == "illegal instruction")
{
__asm__ volatile ("ud2a");
}
else if (mode == "abort")
{
abort();
}
else if (mode == "use after free")
{
int * x_ptr;
{
auto x = std::make_unique<int>();
x_ptr = x.get();
}
*x_ptr = 1;
(void)x_ptr;
}
else if (mode == "use after scope")
{
volatile int * x_ptr;
[&]{
volatile int x = 0;
x_ptr = &x;
(void)x;
}();
[&]{
volatile int y = 1;
*x_ptr = 2;
(void)y;
}();
(void)x_ptr;
}
else if (mode == "uninitialized memory")
{
int x;
(void)write(2, &x, sizeof(x));
}
else if (mode == "data race")
{
int x = 0;
std::thread t1([&]{ ++x; });
std::thread t2([&]{ ++x; });
t1.join();
t2.join();
}
else
throw Exception("Unknown trap mode", ErrorCodes::BAD_ARGUMENTS);
}
else
throw Exception("The only argument for function " + getName() + " must be constant String", ErrorCodes::ILLEGAL_COLUMN);
block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(input_rows_count, 0ULL);
}
};
void registerFunctionTrap(FunctionFactory & factory)
{
factory.registerFunction<FunctionTrap>();
}
}
#else
namespace DB
{
class FunctionFactory;
void registerFunctionTrap(FunctionFactory &) {}
}
#endif

View File

@ -6,11 +6,6 @@
#include <Common/ProfileEvents.h>
#include <IO/BufferWithOwnMemory.h>
#include <Common/config.h>
#if USE_MIMALLOC
#include <Common/MiAllocator.h>
#endif
namespace ProfileEvents
{
@ -25,11 +20,7 @@ namespace DB
struct UncompressedCacheCell
{
#if USE_MIMALLOC
Memory<MiAllocator> data;
#else
Memory<> data;
#endif
size_t compressed_size;
UInt32 additional_bytes;
};

View File

@ -10,34 +10,38 @@ namespace ProfileEvents
{
extern const Event CreatedReadBufferOrdinary;
extern const Event CreatedReadBufferAIO;
extern const Event CreatedReadBufferAIOFailed;
}
namespace DB
{
#if !defined(__linux__)
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
#endif
std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(const std::string & filename_, size_t estimated_size,
size_t aio_threshold, size_t buffer_size_, int flags_, char * existing_memory_, size_t alignment)
{
if ((aio_threshold == 0) || (estimated_size < aio_threshold))
{
ProfileEvents::increment(ProfileEvents::CreatedReadBufferOrdinary);
return std::make_unique<ReadBufferFromFile>(filename_, buffer_size_, flags_, existing_memory_, alignment);
}
else
{
#if defined(__linux__) || defined(__FreeBSD__)
ProfileEvents::increment(ProfileEvents::CreatedReadBufferAIO);
return std::make_unique<ReadBufferAIO>(filename_, buffer_size_, flags_, existing_memory_);
#else
throw Exception("AIO is implemented only on Linux and FreeBSD", ErrorCodes::NOT_IMPLEMENTED);
#endif
if (aio_threshold && estimated_size >= aio_threshold)
{
/// Attempt to open a file with O_DIRECT
try
{
auto res = std::make_unique<ReadBufferAIO>(filename_, buffer_size_, flags_, existing_memory_);
ProfileEvents::increment(ProfileEvents::CreatedReadBufferAIO);
return res;
}
catch (const ErrnoException &)
{
/// Fallback to cached IO if O_DIRECT is not supported.
ProfileEvents::increment(ProfileEvents::CreatedReadBufferAIOFailed);
}
}
#else
(void)aio_threshold;
(void)estimated_size;
#endif
ProfileEvents::increment(ProfileEvents::CreatedReadBufferOrdinary);
return std::make_unique<ReadBufferFromFile>(filename_, buffer_size_, flags_, existing_memory_, alignment);
}
}

View File

@ -10,36 +10,39 @@ namespace ProfileEvents
{
extern const Event CreatedWriteBufferOrdinary;
extern const Event CreatedWriteBufferAIO;
extern const Event CreatedWriteBufferAIOFailed;
}
namespace DB
{
#if !defined(__linux__)
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
#endif
std::unique_ptr<WriteBufferFromFileBase> createWriteBufferFromFileBase(const std::string & filename_, size_t estimated_size,
size_t aio_threshold, size_t buffer_size_, int flags_, mode_t mode, char * existing_memory_,
size_t alignment)
{
if ((aio_threshold == 0) || (estimated_size < aio_threshold))
{
ProfileEvents::increment(ProfileEvents::CreatedWriteBufferOrdinary);
return std::make_unique<WriteBufferFromFile>(filename_, buffer_size_, flags_, mode, existing_memory_, alignment);
}
else
{
#if defined(__linux__) || defined(__FreeBSD__)
ProfileEvents::increment(ProfileEvents::CreatedWriteBufferAIO);
return std::make_unique<WriteBufferAIO>(filename_, buffer_size_, flags_, mode, existing_memory_);
#else
throw Exception("AIO is implemented only on Linux and FreeBSD", ErrorCodes::NOT_IMPLEMENTED);
#endif
if (aio_threshold && estimated_size >= aio_threshold)
{
/// Attempt to open a file with O_DIRECT
try
{
auto res = std::make_unique<WriteBufferAIO>(filename_, buffer_size_, flags_, mode, existing_memory_);
ProfileEvents::increment(ProfileEvents::CreatedWriteBufferAIO);
return res;
}
catch (const ErrnoException &)
{
/// Fallback to cached IO if O_DIRECT is not supported.
ProfileEvents::increment(ProfileEvents::CreatedWriteBufferAIOFailed);
}
}
#else
(void)aio_threshold;
(void)estimated_size;
#endif
ProfileEvents::increment(ProfileEvents::CreatedWriteBufferOrdinary);
return std::make_unique<WriteBufferFromFile>(filename_, buffer_size_, flags_, mode, existing_memory_, alignment);
}
}

View File

@ -1132,7 +1132,7 @@ void Context::updateSettingsChanges(const SettingsChanges & changes)
if (change.name == "profile")
setProfile(change.value.safeGet<String>());
else
settings.updateFromChange(change);
settings.applyChange(change);
}
}
@ -1162,6 +1162,12 @@ String Context::getCurrentQueryId() const
}
String Context::getInitialQueryId() const
{
return client_info.initial_query_id;
}
void Context::setCurrentDatabase(const String & name)
{
auto lock = getLock();

View File

@ -264,6 +264,10 @@ public:
String getCurrentDatabase() const;
String getCurrentQueryId() const;
/// Id of initiating query for distributed queries; or current query id if it's not a distributed query.
String getInitialQueryId() const;
void setCurrentDatabase(const String & name);
void setCurrentQueryId(const String & query_id);

View File

@ -726,7 +726,7 @@ void ExpressionActions::addImpl(ExpressionAction action, Names & new_names)
new_names.push_back(action.result_name);
new_names.insert(new_names.end(), action.array_joined_columns.begin(), action.array_joined_columns.end());
/// Compiled functions are custom functions and them don't need building
/// Compiled functions are custom functions and they don't need building
if (action.type == ExpressionAction::APPLY_FUNCTION && !action.is_function_compiled)
{
if (sample_block.has(action.result_name))

View File

@ -1,6 +1,5 @@
#include "ExternalLoader.h"
#include <cmath>
#include <mutex>
#include <pcg_random.hpp>
#include <common/DateLUT.h>
@ -933,6 +932,8 @@ private:
class ExternalLoader::PeriodicUpdater : private boost::noncopyable
{
public:
static constexpr UInt64 check_period_sec = 5;
PeriodicUpdater(ConfigFilesReader & config_files_reader_, LoadingDispatcher & loading_dispatcher_)
: config_files_reader(config_files_reader_), loading_dispatcher(loading_dispatcher_)
{
@ -940,11 +941,10 @@ public:
~PeriodicUpdater() { enable(false); }
void enable(bool enable_, const ExternalLoaderUpdateSettings & settings_ = {})
void enable(bool enable_)
{
std::unique_lock lock{mutex};
enabled = enable_;
settings = settings_;
if (enable_)
{
@ -985,9 +985,7 @@ public:
return std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
}
std::uniform_int_distribution<UInt64> distribution(0, static_cast<UInt64>(std::exp2(error_count - 1)));
std::chrono::seconds delay(std::min<UInt64>(settings.backoff_max_sec, settings.backoff_initial_sec + distribution(rnd_engine)));
return std::chrono::system_clock::now() + delay;
return std::chrono::system_clock::now() + std::chrono::seconds(ExternalLoadableBackoff{}.calculateDuration(rnd_engine, error_count));
}
private:
@ -996,9 +994,8 @@ private:
setThreadName("ExterLdrReload");
std::unique_lock lock{mutex};
auto timeout = [this] { return std::chrono::seconds(settings.check_period_sec); };
auto pred = [this] { return !enabled; };
while (!event.wait_for(lock, timeout(), pred))
while (!event.wait_for(lock, std::chrono::seconds(check_period_sec), pred))
{
lock.unlock();
loading_dispatcher.setConfiguration(config_files_reader.read());
@ -1012,7 +1009,6 @@ private:
mutable std::mutex mutex;
bool enabled = false;
ExternalLoaderUpdateSettings settings;
ThreadFromGlobalPool thread;
std::condition_variable event;
mutable pcg64 rnd_engine{randomSeed()};
@ -1051,9 +1047,9 @@ void ExternalLoader::enableAsyncLoading(bool enable)
loading_dispatcher->enableAsyncLoading(enable);
}
void ExternalLoader::enablePeriodicUpdates(bool enable_, const ExternalLoaderUpdateSettings & settings_)
void ExternalLoader::enablePeriodicUpdates(bool enable_)
{
periodic_updater->enable(enable_, settings_);
periodic_updater->enable(enable_);
}
bool ExternalLoader::hasCurrentlyLoadedObjects() const

View File

@ -11,19 +11,6 @@
namespace DB
{
struct ExternalLoaderUpdateSettings
{
UInt64 check_period_sec = 5;
UInt64 backoff_initial_sec = 5;
/// 10 minutes
UInt64 backoff_max_sec = 10 * 60;
ExternalLoaderUpdateSettings() = default;
ExternalLoaderUpdateSettings(UInt64 check_period_sec_, UInt64 backoff_initial_sec_, UInt64 backoff_max_sec_)
: check_period_sec(check_period_sec_), backoff_initial_sec(backoff_initial_sec_), backoff_max_sec(backoff_max_sec_) {}
};
/* External configuration structure.
*
* <external_group>
@ -105,7 +92,7 @@ public:
void enableAsyncLoading(bool enable);
/// Sets settings for periodic updates.
void enablePeriodicUpdates(bool enable, const ExternalLoaderUpdateSettings & settings = {});
void enablePeriodicUpdates(bool enable);
/// Returns the status of the object.
/// If the object has not been loaded yet then the function returns Status::NOT_LOADED.

View File

@ -1,7 +1,7 @@
#include <Interpreters/IExternalLoadable.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <cmath>
namespace DB
{
@ -16,4 +16,13 @@ ExternalLoadableLifetime::ExternalLoadableLifetime(const Poco::Util::AbstractCon
max_sec = has_min ? config.getUInt64(config_prefix + ".max") : min_sec;
}
UInt64 ExternalLoadableBackoff::calculateDuration(pcg64 & rnd_engine, size_t error_count) const
{
if (error_count < 1)
error_count = 1;
std::uniform_int_distribution<UInt64> distribution(0, static_cast<UInt64>(std::exp2(error_count - 1)));
return std::min<UInt64>(backoff_max_sec, backoff_initial_sec + distribution(rnd_engine));
}
}

View File

@ -3,6 +3,7 @@
#include <string>
#include <memory>
#include <boost/noncopyable.hpp>
#include <pcg_random.hpp>
#include <Core/Types.h>
@ -25,6 +26,17 @@ struct ExternalLoadableLifetime
};
/// Delay before trying to load again after error.
struct ExternalLoadableBackoff
{
UInt64 backoff_initial_sec = 5;
UInt64 backoff_max_sec = 10 * 60; /// 10 minutes
/// Calculates time to try loading again after error.
UInt64 calculateDuration(pcg64 & rnd_engine, size_t error_count = 1) const;
};
/// Basic interface for external loadable objects. Is used in ExternalLoader.
class IExternalLoadable : public std::enable_shared_from_this<IExternalLoadable>, private boost::noncopyable
{

View File

@ -92,7 +92,7 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
table = context.getTable(database_name, table_name);
}
auto table_lock = table->lockStructureForShare(false, context.getCurrentQueryId());
auto table_lock = table->lockStructureForShare(false, context.getInitialQueryId());
columns = table->getColumns();
}

View File

@ -38,8 +38,8 @@ namespace ErrorCodes
InterpreterInsertQuery::InterpreterInsertQuery(
const ASTPtr & query_ptr_, const Context & context_, bool allow_materialized_)
: query_ptr(query_ptr_), context(context_), allow_materialized(allow_materialized_)
const ASTPtr & query_ptr_, const Context & context_, bool allow_materialized_, bool no_squash_)
: query_ptr(query_ptr_), context(context_), allow_materialized(allow_materialized_), no_squash(no_squash_)
{
checkStackSize();
}
@ -100,7 +100,7 @@ BlockIO InterpreterInsertQuery::execute()
checkAccess(query);
StoragePtr table = getTable(query);
auto table_lock = table->lockStructureForShare(true, context.getCurrentQueryId());
auto table_lock = table->lockStructureForShare(true, context.getInitialQueryId());
/// We create a pipeline of several streams, into which we will write data.
BlockOutputStreamPtr out;
@ -109,7 +109,7 @@ BlockIO InterpreterInsertQuery::execute()
/// Do not squash blocks if it is a sync INSERT into Distributed, since it lead to double bufferization on client and server side.
/// Client-side bufferization might cause excessive timeouts (especially in case of big blocks).
if (!(context.getSettingsRef().insert_distributed_sync && table->isRemote()))
if (!(context.getSettingsRef().insert_distributed_sync && table->isRemote()) && !no_squash)
{
out = std::make_shared<SquashingBlockOutputStream>(
out, out->getHeader(), context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes);

View File

@ -15,7 +15,7 @@ namespace DB
class InterpreterInsertQuery : public IInterpreter
{
public:
InterpreterInsertQuery(const ASTPtr & query_ptr_, const Context & context_, bool allow_materialized_ = false);
InterpreterInsertQuery(const ASTPtr & query_ptr_, const Context & context_, bool allow_materialized_ = false, bool no_squash_ = false);
/** Prepare a request for execution. Return block streams
* - the stream into which you can write data to execute the query, if INSERT;
@ -33,7 +33,8 @@ private:
ASTPtr query_ptr;
const Context & context;
bool allow_materialized;
const bool allow_materialized;
const bool no_squash;
};

View File

@ -265,11 +265,6 @@ Block InterpreterKillQueryQuery::getSelectResult(const String & columns, const S
if (where_expression)
select_query += " WHERE " + queryToString(where_expression);
auto use_processors = context.getSettingsRef().experimental_use_processors;
context.getSettingsRef().experimental_use_processors = false;
SCOPE_EXIT(context.getSettingsRef().experimental_use_processors = use_processors);
BlockIO block_io = executeQuery(select_query, context, true);
Block res = block_io.in->read();

View File

@ -26,8 +26,6 @@ struct RenameDescription
to_table_name(elem.to.table)
{}
TableStructureWriteLockHolder from_table_lock;
String from_database_name;
String from_table_name;
@ -77,8 +75,6 @@ BlockIO InterpreterRenameQuery::execute()
}
};
std::map<UniqueTableName, TableStructureWriteLockHolder> tables_from_locks;
/// Don't allow to drop tables (that we are renaming); don't allow to create tables in places where tables will be renamed.
std::map<UniqueTableName, std::unique_ptr<DDLGuard>> table_guards;
@ -89,36 +85,26 @@ BlockIO InterpreterRenameQuery::execute()
UniqueTableName from(descriptions.back().from_database_name, descriptions.back().from_table_name);
UniqueTableName to(descriptions.back().to_database_name, descriptions.back().to_table_name);
if (!tables_from_locks.count(from))
if (auto table = context.tryGetTable(from.database_name, from.table_name))
tables_from_locks.emplace(from, table->lockExclusively(context.getCurrentQueryId()));
descriptions.back().from_table_lock = tables_from_locks[from];
if (!table_guards.count(from))
table_guards.emplace(from, context.getDDLGuard(from.database_name, from.table_name));
if (!table_guards.count(to))
table_guards.emplace(to, context.getDDLGuard(to.database_name, to.table_name));
table_guards[from];
table_guards[to];
}
/** All tables are locked. If there are more than one rename in chain,
* we need to hold global lock while doing all renames. Order matters to avoid deadlocks.
* It provides atomicity of all RENAME chain as a whole, from the point of view of DBMS client,
* but only in cases when there was no exceptions during this process and server does not fall.
*/
decltype(context.getLock()) lock;
if (descriptions.size() > 1)
lock = context.getLock();
/// Must do it in consistent order.
for (auto & table_guard : table_guards)
table_guard.second = context.getDDLGuard(table_guard.first.database_name, table_guard.first.table_name);
for (auto & elem : descriptions)
{
context.assertTableDoesntExist(elem.to_database_name, elem.to_table_name);
auto from_table = context.getTable(elem.from_database_name, elem.from_table_name);
auto from_table_lock = from_table->lockExclusively(context.getCurrentQueryId());
context.getDatabase(elem.from_database_name)->renameTable(
context, elem.from_table_name, *context.getDatabase(elem.to_database_name), elem.to_table_name, elem.from_table_lock);
context,
elem.from_table_name,
*context.getDatabase(elem.to_database_name),
elem.to_table_name,
from_table_lock);
}
return {};

View File

@ -294,7 +294,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
if (storage)
table_lock = storage->lockStructureForShare(false, context.getCurrentQueryId());
table_lock = storage->lockStructureForShare(false, context.getInitialQueryId());
syntax_analyzer_result = SyntaxAnalyzer(context, options).analyze(
query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage, NamesAndTypesList());

View File

@ -289,13 +289,15 @@ void MutationsInterpreter::prepare(bool dry_run)
throw Exception("Unknown mutation command type: " + DB::toString<int>(command.type), ErrorCodes::UNKNOWN_MUTATION_COMMAND);
}
/// We cares about affected indices because we also need to rewrite them
/// when one of index columns updated or filtered with delete
if (!affected_indices_columns.empty())
{
if (!stages.empty())
{
std::vector<Stage> stages_copy;
/// Copy all filled stages except index calculation stage.
for (const auto &stage : stages)
for (const auto & stage : stages)
{
stages_copy.emplace_back(context);
stages_copy.back().column_to_updated = stage.column_to_updated;

View File

@ -444,6 +444,7 @@ QueryStatusInfo QueryStatus::getInfo(bool get_thread_list, bool get_profile_even
{
std::lock_guard lock(thread_group->mutex);
res.thread_numbers = thread_group->thread_numbers;
res.os_thread_ids = thread_group->os_thread_ids;
}
if (get_profile_events)

View File

@ -66,6 +66,7 @@ struct QueryStatusInfo
/// Optional fields, filled by request
std::vector<UInt32> thread_numbers;
std::vector<UInt32> os_thread_ids;
std::shared_ptr<ProfileEvents::Counters> profile_counters;
std::shared_ptr<Settings> query_settings;
};

View File

@ -78,6 +78,7 @@ Block QueryLogElement::createBlock()
{std::make_shared<DataTypeUInt32>(), "revision"},
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt32>()), "thread_numbers"},
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt32>()), "os_thread_ids"},
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "ProfileEvents.Names"},
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()), "ProfileEvents.Values"},
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "Settings.Names"},
@ -123,6 +124,14 @@ void QueryLogElement::appendToBlock(Block & block) const
columns[i++]->insert(threads_array);
}
{
Array threads_array;
threads_array.reserve(os_thread_ids.size());
for (const UInt32 thread_number : os_thread_ids)
threads_array.emplace_back(UInt64(thread_number));
columns[i++]->insert(threads_array);
}
if (profile_counters)
{
auto column_names = columns[i++].get();

View File

@ -60,6 +60,7 @@ struct QueryLogElement
ClientInfo client_info;
std::vector<UInt32> thread_numbers;
std::vector<UInt32> os_thread_ids;
std::shared_ptr<ProfileEvents::Counters> profile_counters;
std::shared_ptr<Settings> query_settings;

View File

@ -61,6 +61,7 @@ void ThreadStatus::initializeQuery()
thread_group->memory_tracker.setDescription("(for query)");
thread_group->thread_numbers.emplace_back(thread_number);
thread_group->os_thread_ids.emplace_back(os_thread_id);
thread_group->master_thread_number = thread_number;
thread_group->master_thread_os_id = os_thread_id;
@ -99,6 +100,7 @@ void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool
/// NOTE: A thread may be attached multiple times if it is reused from a thread pool.
thread_group->thread_numbers.emplace_back(thread_number);
thread_group->os_thread_ids.emplace_back(os_thread_id);
}
if (query_context)

View File

@ -400,6 +400,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
elem.thread_numbers = std::move(info.thread_numbers);
elem.os_thread_ids = std::move(info.os_thread_ids);
elem.profile_counters = std::move(info.profile_counters);
if (log_queries)
@ -437,6 +438,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.memory_usage = info.peak_memory_usage > 0 ? info.peak_memory_usage : 0;
elem.thread_numbers = std::move(info.thread_numbers);
elem.os_thread_ids = std::move(info.os_thread_ids);
elem.profile_counters = std::move(info.profile_counters);
}

View File

@ -41,7 +41,7 @@ public:
return res;
}
void formatImpl(const FormatSettings & s, FormatState &state, FormatStateStacked frame) const override
void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override
{
frame.need_parens = false;
std::string indent_str = s.one_line ? "" : std::string(4 * frame.indent, ' ');

View File

@ -543,15 +543,8 @@ void AlterCommands::validate(const IStorage & table, const Context & context)
}
}
else if (command.type == AlterCommand::MODIFY_SETTING)
{
for (const auto & change : command.settings_changes)
{
if (!table.hasSetting(change.name))
{
throw Exception{"Storage '" + table.getName() + "' doesn't have setting '" + change.name + "'", ErrorCodes::UNKNOWN_SETTING};
}
}
}
table.checkSettingCanBeChanged(change.name);
}
/** Existing defaulted columns may require default expression extensions with a type conversion,

View File

@ -309,11 +309,10 @@ bool IStorage::isVirtualColumn(const String & column_name) const
return getColumns().get(column_name).is_virtual;
}
bool IStorage::hasSetting(const String & /* setting_name */) const
void IStorage::checkSettingCanBeChanged(const String & /* setting_name */) const
{
if (!supportsSettings())
throw Exception("Storage '" + getName() + "' doesn't support settings.", ErrorCodes::SETTINGS_ARE_NOT_SUPPORTED);
return false;
}
TableStructureReadLockHolder IStorage::lockStructureForShare(bool will_add_new_data, const String & query_id)
@ -381,16 +380,13 @@ IDatabase::ASTModifier IStorage::getSettingsModifier(const SettingsChanges & new
/// Make storage settings unique
for (const auto & change : new_changes)
{
if (hasSetting(change.name))
{
auto finder = [&change] (const SettingChange & c) { return c.name == change.name; };
if (auto it = std::find_if(storage_changes.begin(), storage_changes.end(), finder); it != storage_changes.end())
it->value = change.value;
else
storage_changes.push_back(change);
}
checkSettingCanBeChanged(change.name);
auto finder = [&change] (const SettingChange & c) { return c.name == change.name; };
if (auto it = std::find_if(storage_changes.begin(), storage_changes.end(), finder); it != storage_changes.end())
it->value = change.value;
else
throw Exception{"Storage '" + getName() + "' doesn't have setting '" + change.name + "'", ErrorCodes::UNKNOWN_SETTING};
storage_changes.push_back(change);
}
}
};

View File

@ -139,8 +139,8 @@ public: /// thread-unsafe part. lockStructure must be acquired
/// If |need_all| is set, then checks that all the columns of the table are in the block.
void check(const Block & block, bool need_all = false) const;
/// Check storage has setting. Exception will be thrown if it doesn't support settings at all.
virtual bool hasSetting(const String & setting_name) const;
/// Check storage has setting and setting can be modified.
virtual void checkSettingCanBeChanged(const String & setting_name) const;
protected: /// still thread-unsafe part.
void setIndices(IndicesDescription indices_);
@ -150,7 +150,7 @@ protected: /// still thread-unsafe part.
virtual bool isVirtualColumn(const String & column_name) const;
/// Returns modifier of settings in storage definition
virtual IDatabase::ASTModifier getSettingsModifier(const SettingsChanges & new_changes) const;
IDatabase::ASTModifier getSettingsModifier(const SettingsChanges & new_changes) const;
private:
ColumnsDescription columns; /// combined real and virtual columns

View File

@ -22,7 +22,7 @@ void KafkaSettings::loadFromQuery(ASTStorage & storage_def)
{
try
{
loadFromChanges(storage_def.settings->changes);
applyChanges(storage_def.settings->changes);
}
catch (Exception & e)
{

View File

@ -15,18 +15,17 @@ struct KafkaSettings : public SettingsCollection<KafkaSettings>
{
/// M (mutable) for normal settings, IM (immutable) for not updateable settings.
#define LIST_OF_KAFKA_SETTINGS(M, IM) \
IM(SettingString, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.") \
IM(SettingString, kafka_topic_list, "", "A list of Kafka topics.") \
IM(SettingString, kafka_group_name, "", "A group of Kafka consumers.") \
IM(SettingString, kafka_format, "", "The message format for Kafka engine.") \
IM(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.") \
IM(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine") \
IM(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.") \
IM(SettingUInt64, kafka_max_block_size, 0, "The maximum block size per table for Kafka engine.") \
IM(SettingUInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block") \
IM(SettingUInt64, kafka_commit_every_batch, 0, "Commit every consumed and handled batch instead of a single commit after writing a whole block")
#define LIST_OF_KAFKA_SETTINGS(M) \
M(SettingString, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.") \
M(SettingString, kafka_topic_list, "", "A list of Kafka topics.") \
M(SettingString, kafka_group_name, "", "A group of Kafka consumers.") \
M(SettingString, kafka_format, "", "The message format for Kafka engine.") \
M(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.") \
M(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine") \
M(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.") \
M(SettingUInt64, kafka_max_block_size, 0, "The maximum block size per table for Kafka engine.") \
M(SettingUInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block") \
M(SettingUInt64, kafka_commit_every_batch, 0, "Commit every consumed and handled batch instead of a single commit after writing a whole block")
DECLARE_SETTINGS_COLLECTION(LIST_OF_KAFKA_SETTINGS)

View File

@ -4,14 +4,21 @@ namespace DB
{
using namespace std::chrono_literals;
ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
ConsumerPtr consumer_, Poco::Logger * log_, size_t max_batch_size, size_t poll_timeout_, bool intermediate_commit_)
ConsumerPtr consumer_,
Poco::Logger * log_,
size_t max_batch_size,
size_t poll_timeout_,
bool intermediate_commit_,
const std::atomic<bool> & stopped_)
: ReadBuffer(nullptr, 0)
, consumer(consumer_)
, log(log_)
, batch_size(max_batch_size)
, poll_timeout(poll_timeout_)
, intermediate_commit(intermediate_commit_)
, stopped(stopped_)
, current(messages.begin())
{
}
@ -26,11 +33,46 @@ ReadBufferFromKafkaConsumer::~ReadBufferFromKafkaConsumer()
void ReadBufferFromKafkaConsumer::commit()
{
auto PrintOffsets = [this] (const char * prefix, const cppkafka::TopicPartitionList & offsets)
{
for (const auto & topic_part : offsets)
{
auto print_special_offset = [&topic_part]
{
switch (topic_part.get_offset())
{
case cppkafka::TopicPartition::OFFSET_BEGINNING: return "BEGINNING";
case cppkafka::TopicPartition::OFFSET_END: return "END";
case cppkafka::TopicPartition::OFFSET_STORED: return "STORED";
case cppkafka::TopicPartition::OFFSET_INVALID: return "INVALID";
default: return "";
}
};
if (topic_part.get_offset() < 0)
{
LOG_TRACE(
log,
prefix << " " << print_special_offset() << " (topic: " << topic_part.get_topic()
<< ", partition: " << topic_part.get_partition() << ")");
}
else
{
LOG_TRACE(
log,
prefix << " " << topic_part.get_offset() << " (topic: " << topic_part.get_topic()
<< ", partition: " << topic_part.get_partition() << ")");
}
}
};
PrintOffsets("Polled offset", consumer->get_offsets_position(consumer->get_assignment()));
if (current != messages.end())
{
/// Since we can poll more messages than we already processed,
/// commit only processed messages.
consumer->async_commit(*current);
consumer->async_commit(*std::prev(current));
}
else
{
@ -41,14 +83,7 @@ void ReadBufferFromKafkaConsumer::commit()
consumer->async_commit();
}
const auto & offsets = consumer->get_offsets_committed(consumer->get_assignment());
for (const auto & topic_part : offsets)
{
LOG_TRACE(
log,
"Committed offset " << topic_part.get_offset() << " (topic: " << topic_part.get_topic()
<< ", partition: " << topic_part.get_partition() << ")");
}
PrintOffsets("Committed offset", consumer->get_offsets_committed(consumer->get_assignment()));
stalled = false;
}
@ -114,7 +149,7 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
/// NOTE: ReadBuffer was implemented with an immutable underlying contents in mind.
/// If we failed to poll any message once - don't try again.
/// Otherwise, the |poll_timeout| expectations get flawn.
if (stalled)
if (stalled || stopped)
return false;
if (current == messages.end())

View File

@ -17,7 +17,12 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
{
public:
ReadBufferFromKafkaConsumer(
ConsumerPtr consumer_, Poco::Logger * log_, size_t max_batch_size, size_t poll_timeout_, bool intermediate_commit_);
ConsumerPtr consumer_,
Poco::Logger * log_,
size_t max_batch_size,
size_t poll_timeout_,
bool intermediate_commit_,
const std::atomic<bool> & stopped_);
~ReadBufferFromKafkaConsumer() override;
void commit(); // Commit all processed messages.
@ -43,6 +48,8 @@ private:
bool stalled = false;
bool intermediate_commit = true;
const std::atomic<bool> & stopped;
Messages messages;
Messages::const_iterator current;

View File

@ -44,6 +44,8 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int UNSUPPORTED_METHOD;
extern const int UNKNOWN_SETTING;
extern const int READONLY_SETTING;
}
namespace
@ -274,8 +276,10 @@ ConsumerBufferPtr StorageKafka::createReadBuffer()
batch_size = settings.max_block_size.value;
size_t poll_timeout = settings.stream_poll_timeout_ms.totalMilliseconds();
/// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage.
return std::make_shared<DelimitedReadBuffer>(
std::make_unique<ReadBufferFromKafkaConsumer>(consumer, log, batch_size, poll_timeout, intermediate_commit), row_delimiter);
std::make_unique<ReadBufferFromKafkaConsumer>(consumer, log, batch_size, poll_timeout, intermediate_commit, stream_cancelled),
row_delimiter);
}
@ -371,7 +375,7 @@ bool StorageKafka::streamToViews()
block_size = settings.max_block_size;
// Create a stream for each consumer and join them in a union stream
InterpreterInsertQuery interpreter{insert, global_context};
InterpreterInsertQuery interpreter(insert, global_context, false, true);
auto block_io = interpreter.execute();
// Create a stream for each consumer and join them in a union stream
@ -396,7 +400,8 @@ bool StorageKafka::streamToViews()
else
in = streams[0];
copyData(*in, *block_io.out, &stream_cancelled);
std::atomic<bool> stub;
copyData(*in, *block_io.out, &stub);
// Check whether the limits were applied during query execution
bool limits_applied = false;
@ -407,14 +412,12 @@ bool StorageKafka::streamToViews()
}
bool StorageKafka::hasSetting(const String & setting_name) const
void StorageKafka::checkSettingCanBeChanged(const String & setting_name) const
{
return KafkaSettings::findIndex(setting_name) != KafkaSettings::npos;
}
if (KafkaSettings::findIndex(setting_name) == KafkaSettings::npos)
throw Exception{"Storage '" + getName() + "' doesn't have setting '" + setting_name + "'", ErrorCodes::UNKNOWN_SETTING};
IDatabase::ASTModifier StorageKafka::getSettingsModifier(const SettingsChanges & /* new_changes */) const
{
throw Exception("Storage '" + getName() + "' doesn't support settings alter", ErrorCodes::UNSUPPORTED_METHOD);
throw Exception{"Setting '" + setting_name + "' is readonly for storage '" + getName() + "'", ErrorCodes::READONLY_SETTING};
}
void registerStorageKafka(StorageFactory & factory)

View File

@ -57,8 +57,7 @@ public:
const auto & getSchemaName() const { return schema_name; }
const auto & skipBroken() const { return skip_broken; }
bool hasSetting(const String & setting_name) const override;
void checkSettingCanBeChanged(const String & setting_name) const override;
protected:
StorageKafka(
@ -71,7 +70,6 @@ protected:
size_t num_consumers_, UInt64 max_block_size_, size_t skip_broken,
bool intermediate_commit_);
IDatabase::ASTModifier getSettingsModifier(const SettingsChanges & new_changes) const override;
private:
// Configuration and state
String table_name;

View File

@ -593,7 +593,7 @@ void registerStorageLiveView(StorageFactory & factory)
{
factory.registerStorage("LiveView", [](const StorageFactory::Arguments & args)
{
if (!args.local_context.getSettingsRef().allow_experimental_live_view)
if (!args.attach && !args.local_context.getSettingsRef().allow_experimental_live_view)
throw Exception("Experimental LIVE VIEW feature is not enabled (the setting 'allow_experimental_live_view')", ErrorCodes::SUPPORT_IS_DISABLED);
return StorageLiveView::create(args.table_name, args.database_name, args.local_context, args.query, args.columns);

View File

@ -333,15 +333,15 @@ void IMergedBlockOutputStream::calculateAndSerializeSkipIndices(
{
/// Creating block for update
Block indices_update_block(skip_indexes_columns);
size_t skip_index_current_mark = 0;
size_t skip_index_current_data_mark = 0;
/// Filling and writing skip indices like in IMergedBlockOutputStream::writeColumn
for (size_t i = 0; i < storage.skip_indices.size(); ++i)
for (size_t i = 0; i < skip_indices.size(); ++i)
{
const auto index = storage.skip_indices[i];
const auto index = skip_indices[i];
auto & stream = *skip_indices_streams[i];
size_t prev_pos = 0;
skip_index_current_mark = skip_index_mark;
skip_index_current_data_mark = skip_index_data_mark;
while (prev_pos < rows)
{
UInt64 limit = 0;
@ -351,7 +351,7 @@ void IMergedBlockOutputStream::calculateAndSerializeSkipIndices(
}
else
{
limit = index_granularity.getMarkRows(skip_index_current_mark);
limit = index_granularity.getMarkRows(skip_index_current_data_mark);
if (skip_indices_aggregators[i]->empty())
{
skip_indices_aggregators[i] = index->createIndexAggregator();
@ -366,9 +366,9 @@ void IMergedBlockOutputStream::calculateAndSerializeSkipIndices(
/// to be compatible with normal .mrk2 file format
if (can_use_adaptive_granularity)
writeIntBinary(1UL, stream.marks);
++skip_index_current_mark;
}
/// this mark is aggregated, go to the next one
skip_index_current_data_mark++;
}
size_t pos = prev_pos;
@ -388,7 +388,7 @@ void IMergedBlockOutputStream::calculateAndSerializeSkipIndices(
prev_pos = pos;
}
}
skip_index_mark = skip_index_current_mark;
skip_index_data_mark = skip_index_current_data_mark;
}
void IMergedBlockOutputStream::finishSkipIndicesSerialization(

View File

@ -141,7 +141,10 @@ protected:
size_t aio_threshold;
size_t current_mark = 0;
size_t skip_index_mark = 0;
/// Number of mark in data from which skip indices have to start
/// aggregation. I.e. it's data mark number, not skip indices mark.
size_t skip_index_data_mark = 0;
const bool can_use_adaptive_granularity;
const std::string marks_file_extension;

View File

@ -91,6 +91,7 @@ namespace ErrorCodes
extern const int INCORRECT_FILE_NAME;
extern const int BAD_DATA_PART_NAME;
extern const int UNKNOWN_SETTING;
extern const int READONLY_SETTING;
}
@ -358,10 +359,10 @@ void MergeTreeData::setProperties(
const auto & index_decl = std::dynamic_pointer_cast<ASTIndexDeclaration>(index_ast);
new_indices.push_back(
MergeTreeIndexFactory::instance().get(
all_columns,
std::dynamic_pointer_cast<ASTIndexDeclaration>(index_decl->clone()),
global_context));
MergeTreeIndexFactory::instance().get(
all_columns,
std::dynamic_pointer_cast<ASTIndexDeclaration>(index_decl->clone()),
global_context));
if (indices_names.find(new_indices.back()->name) != indices_names.end())
throw Exception(
@ -1293,7 +1294,7 @@ void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & c
}
if (columns_alter_forbidden.count(command.column_name))
throw Exception("trying to ALTER key column " + command.column_name, ErrorCodes::ILLEGAL_COLUMN);
throw Exception("Trying to ALTER key column " + command.column_name, ErrorCodes::ILLEGAL_COLUMN);
if (columns_alter_metadata_only.count(command.column_name))
{
@ -1324,10 +1325,7 @@ void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & c
setTTLExpressions(new_columns.getColumnTTLs(), new_ttl_table_ast, /* only_check = */ true);
for (const auto & setting : new_changes)
{
if (!hasSetting(setting.name))
throw Exception{"Storage '" + getName() + "' doesn't have setting '" + setting.name + "'", ErrorCodes::UNKNOWN_SETTING};
}
checkSettingCanBeChanged(setting.name);
/// Check that type conversions are possible.
ExpressionActionsPtr unused_expression;
@ -1578,7 +1576,8 @@ void MergeTreeData::alterDataPart(
if (expression)
{
BlockInputStreamPtr part_in = std::make_shared<MergeTreeSequentialBlockInputStream>(
*this, part, expression->getRequiredColumns(), false, /* take_column_types_from_storage = */ false);
*this, part, expression->getRequiredColumns(), false, /* take_column_types_from_storage = */ false);
auto compression_codec = global_context.chooseCompressionCodec(
part->bytes_on_disk,
@ -1600,7 +1599,8 @@ void MergeTreeData::alterDataPart(
true /* sync */,
compression_codec,
true /* skip_offsets */,
{},
/// Don't recalc indices because indices alter is restricted
std::vector<MergeTreeIndexPtr>{},
unused_written_offsets,
part->index_granularity,
&part->index_granularity_info);
@ -1652,14 +1652,18 @@ void MergeTreeData::changeSettings(
if (!new_changes.empty())
{
MergeTreeSettings copy = *getSettings();
copy.updateFromChanges(new_changes);
copy.applyChanges(new_changes);
storage_settings.set(std::make_unique<const MergeTreeSettings>(copy));
}
}
bool MergeTreeData::hasSetting(const String & setting_name) const
void MergeTreeData::checkSettingCanBeChanged(const String & setting_name) const
{
return MergeTreeSettings::findIndex(setting_name) != MergeTreeSettings::npos;
if (MergeTreeSettings::findIndex(setting_name) == MergeTreeSettings::npos)
throw Exception{"Storage '" + getName() + "' doesn't have setting '" + setting_name + "'", ErrorCodes::UNKNOWN_SETTING};
if (MergeTreeSettings::isReadonlySetting(setting_name))
throw Exception{"Setting '" + setting_name + "' is readonly for storage '" + getName() + "'", ErrorCodes::READONLY_SETTING};
}
void MergeTreeData::removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr & data_part)

View File

@ -543,7 +543,7 @@ public:
TableStructureWriteLockHolder & table_lock_holder);
/// All MergeTreeData children have settings.
bool hasSetting(const String & setting_name) const override;
void checkSettingCanBeChanged(const String & setting_name) const override;
/// Remove columns, that have been markedd as empty after zeroing values with expired ttl
void removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr & data_part);
@ -621,6 +621,13 @@ public:
(settings->enable_mixed_granularity_parts || !has_non_adaptive_index_granularity_parts);
}
/// Get constant pointer to storage settings.
/// Copy this pointer into your scope and you will
/// get consistent settings.
MergeTreeSettingsPtr getSettings() const
{
return storage_settings.get();
}
MergeTreeDataFormatVersion format_version;
@ -679,13 +686,6 @@ public:
bool has_non_adaptive_index_granularity_parts = false;
/// Get constant pointer to storage settings.
/// Copy this pointer into your scope and you will
/// get consistent settings.
MergeTreeSettingsPtr getSettings() const
{
return storage_settings.get();
}
protected:

View File

@ -379,7 +379,7 @@ static void extractMergingAndGatheringColumns(
std::set<String> key_columns(sort_key_columns_vec.cbegin(), sort_key_columns_vec.cend());
for (const auto & index : indexes)
{
Names index_columns_vec = index->expr->getRequiredColumns();
Names index_columns_vec = index->getColumnsRequiredForIndexCalc();
std::copy(index_columns_vec.cbegin(), index_columns_vec.cend(),
std::inserter(key_columns, key_columns.end()));
}
@ -561,7 +561,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
const auto data_settings = data.getSettings();
NamesAndTypesList gathering_columns, merging_columns;
NamesAndTypesList gathering_columns;
NamesAndTypesList merging_columns;
Names gathering_column_names, merging_column_names;
extractMergingAndGatheringColumns(
all_columns, data.sorting_key_expr, data.skip_indices,
@ -828,6 +829,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
rows_sources_read_buf.seek(0, 0);
ColumnGathererStream column_gathered_stream(column_name, column_part_streams, rows_sources_read_buf);
MergedColumnOnlyOutputStream column_to(
data,
column_gathered_stream.getHeader(),
@ -835,10 +837,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
false,
compression_codec,
false,
{},
/// we don't need to recalc indices here
/// because all of them were already recalculated and written
/// as key part of vertical merge
std::vector<MergeTreeIndexPtr>{},
written_offset_columns,
to.getIndexGranularity()
);
to.getIndexGranularity());
size_t column_elems_written = 0;
column_to.writePrefix();
@ -1019,9 +1024,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
for (size_t i = 0; i < data.skip_indices.size(); ++i)
{
const auto & index = data.skip_indices[i];
const auto & index_cols = index->expr->getRequiredColumns();
auto it = find(cbegin(index_cols), cend(index_cols), col);
if (it != cend(index_cols) && indices_to_recalc.insert(index).second)
const auto & index_cols = index->getColumnsRequiredForIndexCalc();
auto it = std::find(std::cbegin(index_cols), std::cend(index_cols), col);
if (it != std::cend(index_cols) && indices_to_recalc.insert(index).second)
{
ASTPtr expr_list = MergeTreeData::extractKeyExpressionList(
storage_from_source_part->getIndices().indices[i]->expr->clone());
@ -1038,6 +1043,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
auto indices_recalc_expr = ExpressionAnalyzer(
indices_recalc_expr_list,
indices_recalc_syntax, context).getActions(false);
/// We can update only one column, but some skip idx expression may depend on several
/// columns (c1 + c2 * c3). It works because in stream was created with help of
/// MutationsInterpreter which knows about skip indices and stream 'in' already has
/// all required columns.
/// TODO move this logic to single place.
in = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(in, indices_recalc_expr));
}

View File

@ -51,6 +51,7 @@ std::unique_ptr<IMergeTreeIndex> MergeTreeIndexFactory::get(
return lft + ", " + rht.first;
}),
ErrorCodes::INCORRECT_QUERY);
return it->second(columns, node, context);
}

View File

@ -104,11 +104,25 @@ public:
virtual MergeTreeIndexConditionPtr createIndexCondition(
const SelectQueryInfo & query_info, const Context & context) const = 0;
Names getColumnsRequiredForIndexCalc() const { return expr->getRequiredColumns(); }
/// Index name
String name;
/// Index expression (x * y)
/// with columns arguments
ExpressionActionsPtr expr;
/// Names of columns for index
Names columns;
/// Data types of columns
DataTypes data_types;
/// Block with columns and data_types
Block header;
/// Skip index granularity
size_t granularity;
};

View File

@ -46,7 +46,7 @@ void MergeTreeSettings::loadFromQuery(ASTStorage & storage_def)
{
try
{
loadFromChanges(storage_def.settings->changes);
applyChanges(storage_def.settings->changes);
}
catch (Exception & e)
{

View File

@ -2,6 +2,7 @@
#include <Core/Defines.h>
#include <Core/SettingsCommon.h>
#include <Common/SettingsChanges.h>
namespace Poco
@ -24,9 +25,8 @@ class ASTStorage;
struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
{
/// M (mutable) for normal settings, IM (immutable) for not updateable settings.
#define LIST_OF_MERGE_TREE_SETTINGS(M, IM) \
IM(SettingUInt64, index_granularity, 8192, "How many rows correspond to one primary key value.") \
#define LIST_OF_MERGE_TREE_SETTINGS(M) \
M(SettingUInt64, index_granularity, 8192, "How many rows correspond to one primary key value.") \
\
/** Merge settings. */ \
M(SettingUInt64, max_bytes_to_merge_at_max_space_in_pool, 150ULL * 1024 * 1024 * 1024, "Maximum in total size of parts to merge, when there are maximum free threads in background pool (or entries in replication queue).") \
@ -80,7 +80,7 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
M(SettingBool, use_minimalistic_part_header_in_zookeeper, false, "Store part header (checksums and columns) in a compact format and a single part znode instead of separate znodes (<part>/columns and <part>/checksums). This can dramatically reduce snapshot size in ZooKeeper. Before enabling check that all replicas support new format.") \
M(SettingUInt64, finished_mutations_to_keep, 100, "How many records about mutations that are done to keep. If zero, then keep all of them.") \
M(SettingUInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024, "Minimal amount of bytes to enable O_DIRECT in merge (0 - disabled).") \
IM(SettingUInt64, index_granularity_bytes, 10 * 1024 * 1024, "Approximate amount of bytes in single granule (0 - disabled).") \
M(SettingUInt64, index_granularity_bytes, 10 * 1024 * 1024, "Approximate amount of bytes in single granule (0 - disabled).") \
M(SettingInt64, merge_with_ttl_timeout, 3600 * 24, "Minimal time in seconds, when merge with TTL can be repeated.") \
M(SettingBool, ttl_only_drop_parts, false, "Only drop altogether the expired parts and not partially prune them.") \
M(SettingBool, write_final_mark, 1, "Write final mark after end of column (0 - disabled, do nothing if index_granularity_bytes=0)") \
@ -99,6 +99,12 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
/// NOTE: will rewrite the AST to add immutable settings.
void loadFromQuery(ASTStorage & storage_def);
/// We check settings after storage creation
static bool isReadonlySetting(const String & name)
{
return name == "index_granularity" || name == "index_granularity_bytes";
}
};
using MergeTreeSettingsPtr = std::shared_ptr<const MergeTreeSettings>;

View File

@ -332,7 +332,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
else if (skip_indexes_column_name_to_position.end() != skip_index_column_it)
{
const auto & index_column = *skip_indexes_columns[skip_index_column_it->second].column;
writeColumn(column.name, *column.type, index_column, offset_columns, false, serialization_states[i], current_mark);
std::tie(std::ignore, new_index_offset) = writeColumn(column.name, *column.type, index_column, offset_columns, false, serialization_states[i], current_mark);
}
else
{
@ -349,6 +349,8 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
rows_count += rows;
/// Should be written before index offset update, because we calculate,
/// indices of currently written granules
calculateAndSerializeSkipIndices(skip_indexes_columns, rows);
{

View File

@ -68,7 +68,6 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
if (!rows)
return;
calculateAndSerializeSkipIndices(skip_indexes_columns, rows);
size_t new_index_offset = 0;
size_t new_current_mark = 0;
@ -79,6 +78,10 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
std::tie(new_current_mark, new_index_offset) = writeColumn(column.name, *column.type, *column.column, offset_columns, skip_offsets, serialization_states[i], current_mark);
}
/// Should be written before index offset update, because we calculate,
/// indices of currently written granules
calculateAndSerializeSkipIndices(skip_indexes_columns, rows);
index_offset = new_index_offset;
current_mark = new_current_mark;
}
@ -103,7 +106,6 @@ MergeTreeData::DataPart::Checksums MergedColumnOnlyOutputStream::writeSuffixAndG
serialize_settings.getter = createStreamGetter(column.name, already_written_offset_columns, skip_offsets);
column.type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
if (with_final_mark)
writeFinalMark(column.name, column.type, offset_columns, skip_offsets, serialize_settings.path);
}

View File

@ -366,8 +366,8 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const ASTPtr
if (storage.get() != this)
{
virtual_column->insert(storage->getTableName());
selected_tables.emplace_back(storage, get_lock ? storage->lockStructureForShare(false, query_id) : TableStructureReadLockHolder{});
virtual_column->insert(storage->getTableName());
}
iterator->next();

View File

@ -50,10 +50,11 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Con
res_columns[i++]->insert(static_cast<Int8>(load_result.status));
res_columns[i++]->insert(load_result.origin);
if (load_result.object)
{
const auto dict_ptr = std::static_pointer_cast<const IDictionaryBase>(load_result.object);
std::exception_ptr last_exception = load_result.exception;
const auto dict_ptr = std::dynamic_pointer_cast<const IDictionaryBase>(load_result.object);
if (dict_ptr)
{
res_columns[i++]->insert(dict_ptr->getTypeName());
const auto & dict_struct = dict_ptr->getStructure();
@ -66,6 +67,9 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Con
res_columns[i++]->insert(dict_ptr->getElementCount());
res_columns[i++]->insert(dict_ptr->getLoadFactor());
res_columns[i++]->insert(dict_ptr->getSource()->toString());
if (!last_exception)
last_exception = dict_ptr->getLastException();
}
else
{
@ -76,8 +80,8 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Con
res_columns[i++]->insert(static_cast<UInt64>(std::chrono::system_clock::to_time_t(load_result.loading_start_time)));
res_columns[i++]->insert(std::chrono::duration_cast<std::chrono::duration<float>>(load_result.loading_duration).count());
if (load_result.exception)
res_columns[i++]->insert(getExceptionMessage(load_result.exception, false));
if (last_exception)
res_columns[i++]->insert(getExceptionMessage(last_exception, false));
else
res_columns[i++]->insertDefault();
}

View File

@ -58,6 +58,7 @@ NamesAndTypesList StorageSystemProcesses::getNamesAndTypes()
{"query", std::make_shared<DataTypeString>()},
{"thread_numbers", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt32>())},
{"os_thread_ids", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt32>())},
{"ProfileEvents.Names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"ProfileEvents.Values", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>())},
{"Settings.Names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
@ -120,6 +121,14 @@ void StorageSystemProcesses::fillData(MutableColumns & res_columns, const Contex
res_columns[i++]->insert(threads_array);
}
{
Array threads_array;
threads_array.reserve(process.os_thread_ids.size());
for (const UInt32 thread_number : process.os_thread_ids)
threads_array.emplace_back(thread_number);
res_columns[i++]->insert(threads_array);
}
{
IColumn * column_profile_events_names = res_columns[i++].get();
IColumn * column_profile_events_values = res_columns[i++].get();

View File

@ -0,0 +1,31 @@
<yandex>
<dictionary>
<name>cache_xypairs</name>
<source>
<clickhouse>
<host>localhost</host>
<port>9000</port>
<user>default</user>
<password></password>
<db>test</db>
<table>xypairs</table>
</clickhouse>
</source>
<lifetime>1</lifetime>
<layout>
<cache>
<size_in_cells>5</size_in_cells>
</cache>
</layout>
<structure>
<id>
<name>x</name>
</id>
<attribute>
<name>y</name>
<type>UInt64</type>
<null_value>0</null_value>
</attribute>
</structure>
</dictionary>
</yandex>

View File

@ -17,6 +17,10 @@ def get_status(dictionary_name):
return instance.query("SELECT status FROM system.dictionaries WHERE name='" + dictionary_name + "'").rstrip("\n")
def get_last_exception(dictionary_name):
return instance.query("SELECT last_exception FROM system.dictionaries WHERE name='" + dictionary_name + "'").rstrip("\n").replace("\\'", "'")
def get_loading_start_time(dictionary_name):
s = instance.query("SELECT loading_start_time FROM system.dictionaries WHERE name='" + dictionary_name + "'").rstrip("\n")
if s == "0000-00-00 00:00:00":
@ -350,3 +354,58 @@ def test_reload_after_fail_by_timer(started_cluster):
time.sleep(6);
query("SELECT dictGetInt32('no_file_2', 'a', toUInt64(9))") == "10\n"
assert get_status("no_file_2") == "LOADED"
def test_reload_after_fail_in_cache_dictionary(started_cluster):
query = instance.query
query_and_get_error = instance.query_and_get_error
# Can't get a value from the cache dictionary because the source (table `test.xypairs`) doesn't respond.
expected_error = "Table test.xypairs doesn't exist"
assert expected_error in query_and_get_error("SELECT dictGetUInt64('cache_xypairs', 'y', toUInt64(1))")
assert get_status("cache_xypairs") == "LOADED"
assert expected_error in get_last_exception("cache_xypairs")
# Create table `test.xypairs`.
query('''
drop table if exists test.xypairs;
create table test.xypairs (x UInt64, y UInt64) engine=Log;
insert into test.xypairs values (1, 56), (3, 78);
''')
# Cache dictionary now works.
assert_eq_with_retry(instance, "SELECT dictGet('cache_xypairs', 'y', toUInt64(1))", "56", ignore_error=True)
query("SELECT dictGet('cache_xypairs', 'y', toUInt64(2))") == "0"
assert get_last_exception("cache_xypairs") == ""
# Drop table `test.xypairs`.
query('drop table if exists test.xypairs')
# Values are cached so we can get them.
query("SELECT dictGet('cache_xypairs', 'y', toUInt64(1))") == "56"
query("SELECT dictGet('cache_xypairs', 'y', toUInt64(2))") == "0"
assert get_last_exception("cache_xypairs") == ""
# But we can't get a value from the source table which isn't cached.
assert expected_error in query_and_get_error("SELECT dictGetUInt64('cache_xypairs', 'y', toUInt64(3))")
assert expected_error in get_last_exception("cache_xypairs")
# Passed time should not spoil the cache.
time.sleep(5);
query("SELECT dictGet('cache_xypairs', 'y', toUInt64(1))") == "56"
query("SELECT dictGet('cache_xypairs', 'y', toUInt64(2))") == "0"
assert expected_error in query_and_get_error("SELECT dictGetUInt64('cache_xypairs', 'y', toUInt64(3))")
assert expected_error in get_last_exception("cache_xypairs")
# Create table `test.xypairs` again with changed values.
query('''
drop table if exists test.xypairs;
create table test.xypairs (x UInt64, y UInt64) engine=Log;
insert into test.xypairs values (1, 57), (3, 79);
''')
# The cache dictionary returns new values now.
assert_eq_with_retry(instance, "SELECT dictGet('cache_xypairs', 'y', toUInt64(1))", "57")
query("SELECT dictGet('cache_xypairs', 'y', toUInt64(2))") == "0"
query("SELECT dictGet('cache_xypairs', 'y', toUInt64(3))") == "79"
assert get_last_exception("cache_xypairs") == ""

View File

@ -504,6 +504,7 @@ def test_kafka_virtual_columns_with_materialized_view(kafka_cluster):
kafka_check_result(result, True, 'test_kafka_virtual2.reference')
@pytest.mark.timeout(60)
def test_kafka_insert(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
@ -540,6 +541,7 @@ def test_kafka_insert(kafka_cluster):
kafka_check_result(result, True)
@pytest.mark.timeout(60)
def test_kafka_produce_consume(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
@ -603,6 +605,78 @@ def test_kafka_produce_consume(kafka_cluster):
assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)
@pytest.mark.timeout(300)
def test_kafka_commit_on_block_write(kafka_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'block',
kafka_group_name = 'block',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 100,
kafka_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree()
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.kafka;
''')
cancel = threading.Event()
i = [0]
def produce():
while not cancel.is_set():
messages = []
for _ in range(101):
messages.append(json.dumps({'key': i[0], 'value': i[0]}))
i[0] += 1
kafka_produce('block', messages)
kafka_thread = threading.Thread(target=produce)
kafka_thread.start()
while int(instance.query('SELECT count() FROM test.view')) == 0:
time.sleep(1)
cancel.set()
instance.query('''
DROP TABLE test.kafka;
''')
while int(instance.query("SELECT count() FROM system.tables WHERE database='test' AND name='kafka'")) == 1:
time.sleep(1)
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'block',
kafka_group_name = 'block',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 100,
kafka_row_delimiter = '\\n';
''')
while int(instance.query('SELECT uniqExact(key) FROM test.view')) < i[0]:
time.sleep(1)
result = int(instance.query('SELECT count() == uniqExact(key) FROM test.view'))
instance.query('''
DROP TABLE test.consumer;
DROP TABLE test.view;
''')
kafka_thread.join()
assert result == 1, 'Messages from kafka get duplicated!'
if __name__ == '__main__':
cluster.start()
raw_input("Cluster created, press any key to destroy...")

View File

@ -46,6 +46,13 @@ hello
1
Thursday
Friday
(3,5)
(7,3)
(5,0)
(3,5)
(3,0)
(3,5)
(3,0)
--JSONExtractKeysAndValues--
[('a','hello')]
[('b',[-100,200,300])]
@ -121,6 +128,13 @@ hello
1
Thursday
Friday
(3,5)
(7,3)
(5,0)
(3,5)
(3,0)
(3,5)
(3,0)
--JSONExtractKeysAndValues--
[('a','hello')]
[('b',[-100,200,300])]

View File

@ -54,6 +54,13 @@ SELECT JSONExtract('{"a": "hello", "b": [-100, 200.0, 300]}', 'b', 4, 'Nullable(
SELECT JSONExtract('{"passed": true}', 'passed', 'UInt8');
SELECT JSONExtract('{"day": "Thursday"}', 'day', 'Enum8(\'Sunday\' = 0, \'Monday\' = 1, \'Tuesday\' = 2, \'Wednesday\' = 3, \'Thursday\' = 4, \'Friday\' = 5, \'Saturday\' = 6)');
SELECT JSONExtract('{"day": 5}', 'day', 'Enum8(\'Sunday\' = 0, \'Monday\' = 1, \'Tuesday\' = 2, \'Wednesday\' = 3, \'Thursday\' = 4, \'Friday\' = 5, \'Saturday\' = 6)');
SELECT JSONExtract('{"a":3,"b":5,"c":7}', 'Tuple(a Int, b Int)');
SELECT JSONExtract('{"a":3,"b":5,"c":7}', 'Tuple(c Int, a Int)');
SELECT JSONExtract('{"a":3,"b":5,"c":7}', 'Tuple(b Int, d Int)');
SELECT JSONExtract('{"a":3,"b":5,"c":7}', 'Tuple(Int, Int)');
SELECT JSONExtract('{"a":3}', 'Tuple(Int, Int)');
SELECT JSONExtract('[3,5,7]', 'Tuple(Int, Int)');
SELECT JSONExtract('[3]', 'Tuple(Int, Int)');
SELECT '--JSONExtractKeysAndValues--';
SELECT JSONExtractKeysAndValues('{"a": "hello", "b": [-100, 200.0, 300]}', 'String');
@ -138,6 +145,13 @@ SELECT JSONExtract('{"a": "hello", "b": [-100, 200.0, 300]}', 'b', 4, 'Nullable(
SELECT JSONExtract('{"passed": true}', 'passed', 'UInt8');
SELECT JSONExtract('{"day": "Thursday"}', 'day', 'Enum8(\'Sunday\' = 0, \'Monday\' = 1, \'Tuesday\' = 2, \'Wednesday\' = 3, \'Thursday\' = 4, \'Friday\' = 5, \'Saturday\' = 6)');
SELECT JSONExtract('{"day": 5}', 'day', 'Enum8(\'Sunday\' = 0, \'Monday\' = 1, \'Tuesday\' = 2, \'Wednesday\' = 3, \'Thursday\' = 4, \'Friday\' = 5, \'Saturday\' = 6)');
SELECT JSONExtract('{"a":3,"b":5,"c":7}', 'Tuple(a Int, b Int)');
SELECT JSONExtract('{"a":3,"b":5,"c":7}', 'Tuple(c Int, a Int)');
SELECT JSONExtract('{"a":3,"b":5,"c":7}', 'Tuple(b Int, d Int)');
SELECT JSONExtract('{"a":3,"b":5,"c":7}', 'Tuple(Int, Int)');
SELECT JSONExtract('{"a":3}', 'Tuple(Int, Int)');
SELECT JSONExtract('[3,5,7]', 'Tuple(Int, Int)');
SELECT JSONExtract('[3]', 'Tuple(Int, Int)');
SELECT '--JSONExtractKeysAndValues--';
SELECT JSONExtractKeysAndValues('{"a": "hello", "b": [-100, 200.0, 300]}', 'String');

View File

@ -19,7 +19,7 @@ $CLICKHOUSE_CLIENT --use_uncompressed_cache=1 --query_id="test-query-uncompresse
sleep 1
$CLICKHOUSE_CLIENT --query="SYSTEM FLUSH LOGS"
$CLICKHOUSE_CLIENT --query="SELECT ProfileEvents.Values[indexOf(ProfileEvents.Names, 'Seek')], ProfileEvents.Values[indexOf(ProfileEvents.Names, 'ReadCompressedBytes')], ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UncompressedCacheHits')] AS hit FROM system.query_log WHERE (query_id = 'test-query-uncompressed-cache') AND (type = 2) ORDER BY event_time DESC LIMIT 1"
$CLICKHOUSE_CLIENT --query="SELECT ProfileEvents.Values[indexOf(ProfileEvents.Names, 'Seek')], ProfileEvents.Values[indexOf(ProfileEvents.Names, 'ReadCompressedBytes')], ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UncompressedCacheHits')] AS hit FROM system.query_log WHERE (query_id = 'test-query-uncompressed-cache') AND (type = 2) AND event_date >= yesterday() ORDER BY event_time DESC LIMIT 1"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS small_table"

View File

@ -1,47 +0,0 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client2.send('SET allow_experimental_live_view = 1')
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv EVENTS')
client1.expect('1.*' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect('2.*' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (4),(5),(6)')
client1.expect('3.*' + end_of_block)
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,20 +0,0 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
WATCH test.lv LIMIT 0;
INSERT INTO test.mt VALUES (1),(2),(3);
WATCH test.lv LIMIT 0;
INSERT INTO test.mt VALUES (4),(5),(6);
WATCH test.lv LIMIT 0;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -1,47 +0,0 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client2.send('SET allow_experimental_live_view = 1')
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv')
client1.expect(r'0.*1' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(r'6.*2' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (4),(5),(6)')
client1.expect(r'21.*3' + end_of_block)
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,54 +0,0 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client2.send('SET allow_experimental_live_view = 1')
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('SET temporary_live_view_timeout=1')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv')
client1.expect(r'0.*1' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client2.expect(prompt)
client1.expect(r'6.*2' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (4),(5),(6)')
client2.expect(prompt)
client1.expect(r'21.*3' + end_of_block)
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('SELECT sleep(1)')
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect('Table test.lv doesn\'t exist')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,49 +0,0 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client2.send('SET allow_experimental_live_view = 1')
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('SET live_view_heartbeat_interval=1')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv EVENTS')
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect('2.*' + end_of_block)
client1.expect('Progress: 2.00 rows.*\)')
# wait for heartbeat
client1.expect('Progress: 2.00 rows.*\)')
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,50 +0,0 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client2.send('SET allow_experimental_live_view = 1')
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('SET live_view_heartbeat_interval=1')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv')
client1.expect(r'0.*1' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(r'6.*2' + end_of_block)
client1.expect('Progress: 2.00 rows.*\)')
# wait for heartbeat
client1.expect('Progress: 2.00 rows.*\)')
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,40 +0,0 @@
#!/usr/bin/env python
import os
import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
from httpclient import client as http_client
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1:
client1.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
with http_client({'method':'GET', 'url': '/?allow_experimental_live_view=1&query=WATCH%20test.lv%20EVENTS'}, name='client2>', log=log) as client2:
client2.expect('.*1\n')
client1.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(prompt)
client2.expect('.*2\n')
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,40 +0,0 @@
#!/usr/bin/env python
import os
import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
from httpclient import client as http_client
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1:
client1.expect(prompt)
client1.send('SET allow_experimental_live_view = 1')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
with http_client({'method':'GET', 'url':'/?allow_experimental_live_view=1&query=WATCH%20test.lv'}, name='client2>', log=log) as client2:
client2.expect('.*0\t1\n')
client1.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(prompt)
client2.expect('.*6\t2\n')
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,4 +0,0 @@
{"row":{"a":1}}
{"row":{"a":2}}
{"row":{"a":3}}
{"progress":{"read_rows":"3","read_bytes":"36","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}

Some files were not shown because too many files have changed in this diff Show More