diff --git a/.gitmodules b/.gitmodules index 68016bf8c5b..a618104f364 100644 --- a/.gitmodules +++ b/.gitmodules @@ -99,7 +99,7 @@ url = https://github.com/awslabs/aws-c-event-stream [submodule "aws-c-common"] path = contrib/aws-c-common - url = https://github.com/ClickHouse/aws-c-common + url = https://github.com/awslabs/aws-c-common.git [submodule "aws-checksums"] path = contrib/aws-checksums url = https://github.com/awslabs/aws-checksums diff --git a/base/poco/Foundation/CMakeLists.txt b/base/poco/Foundation/CMakeLists.txt index dfb41a33fb1..5fe644d3057 100644 --- a/base/poco/Foundation/CMakeLists.txt +++ b/base/poco/Foundation/CMakeLists.txt @@ -166,6 +166,12 @@ set (SRCS ) add_library (_poco_foundation ${SRCS}) +target_link_libraries (_poco_foundation + PUBLIC + boost::headers_only + boost::system +) + add_library (Poco::Foundation ALIAS _poco_foundation) # TODO: remove these warning exclusions diff --git a/base/poco/Foundation/include/Poco/Logger.h b/base/poco/Foundation/include/Poco/Logger.h index cf202718662..883294a071a 100644 --- a/base/poco/Foundation/include/Poco/Logger.h +++ b/base/poco/Foundation/include/Poco/Logger.h @@ -22,6 +22,9 @@ #include #include #include + +#include <boost/intrusive_ptr.hpp> + #include "Poco/Channel.h" #include "Poco/Format.h" #include "Poco/Foundation.h" @@ -34,7 +37,7 @@ namespace Poco class Exception; class Logger; -using LoggerPtr = std::shared_ptr<Logger>; +using LoggerPtr = boost::intrusive_ptr<Logger>; class Foundation_API Logger : public Channel /// Logger is a special Channel that acts as the main @@ -871,21 +874,11 @@ public: /// If the Logger does not yet exist, it is created, based /// on its parent logger. - static LoggerPtr getShared(const std::string & name); + static LoggerPtr getShared(const std::string & name, bool should_be_owned_by_shared_ptr_if_created = true); /// Returns a shared pointer to the Logger with the given name. /// If the Logger does not yet exist, it is created, based /// on its parent logger. - static Logger & unsafeGet(const std::string & name); - /// Returns a reference to the Logger with the given name. - /// If the Logger does not yet exist, it is created, based - /// on its parent logger. - /// - /// WARNING: This method is not thread safe. You should - /// probably use get() instead. - /// The only time this method should be used is during - /// program initialization, when only one thread is running. - static Logger & create(const std::string & name, Channel * pChannel, int level = Message::PRIO_INFORMATION); /// Creates and returns a reference to a Logger with the /// given name. The Logger's Channel and log level as set as @@ -932,6 +925,16 @@ public: static const std::string ROOT; /// The name of the root logger ("").
+public: + struct LoggerEntry + { + Poco::Logger * logger; + bool owned_by_shared_ptr = false; + }; + + using LoggerMap = std::unordered_map<std::string, LoggerEntry>; + using LoggerMapIterator = LoggerMap::iterator; + protected: Logger(const std::string & name, Channel * pChannel, int level); ~Logger(); @@ -940,12 +943,19 @@ protected: void log(const std::string & text, Message::Priority prio, const char * file, int line); static std::string format(const std::string & fmt, int argc, std::string argv[]); - static Logger & unsafeCreate(const std::string & name, Channel * pChannel, int level = Message::PRIO_INFORMATION); - static Logger & parent(const std::string & name); - static void add(Logger * pLogger); - static Logger * find(const std::string & name); private: + static std::pair<LoggerMapIterator, bool> unsafeGet(const std::string & name, bool get_shared); + static Logger * unsafeGetRawPtr(const std::string & name); + static std::pair<LoggerMapIterator, bool> unsafeCreate(const std::string & name, Channel * pChannel, int level = Message::PRIO_INFORMATION); + static Logger & parent(const std::string & name); + static std::pair<LoggerMapIterator, bool> add(Logger * pLogger); + static std::optional<LoggerMapIterator> find(const std::string & name); + static Logger * findRawPtr(const std::string & name); + + friend void intrusive_ptr_add_ref(Logger * ptr); + friend void intrusive_ptr_release(Logger * ptr); + Logger(); Logger(const Logger &); Logger & operator=(const Logger &); diff --git a/base/poco/Foundation/include/Poco/RefCountedObject.h b/base/poco/Foundation/include/Poco/RefCountedObject.h index db966089e00..1f806bdacb1 100644 --- a/base/poco/Foundation/include/Poco/RefCountedObject.h +++ b/base/poco/Foundation/include/Poco/RefCountedObject.h @@ -53,11 +53,10 @@ protected: virtual ~RefCountedObject(); /// Destroys the RefCountedObject. + mutable std::atomic<size_t> _counter; private: RefCountedObject(const RefCountedObject &); RefCountedObject & operator=(const RefCountedObject &); - - mutable std::atomic<size_t> _counter; }; diff --git a/base/poco/Foundation/src/Logger.cpp b/base/poco/Foundation/src/Logger.cpp index cfc063c8979..16fc3a0480e 100644 --- a/base/poco/Foundation/src/Logger.cpp +++ b/base/poco/Foundation/src/Logger.cpp @@ -38,14 +38,7 @@ std::mutex & getLoggerMutex() return *logger_mutex; } -struct LoggerEntry -{ - Poco::Logger * logger; - bool owned_by_shared_ptr = false; -}; - -using LoggerMap = std::unordered_map<std::string, LoggerEntry>; -LoggerMap * _pLoggerMap = nullptr; +Poco::Logger::LoggerMap * _pLoggerMap = nullptr; } @@ -309,38 +302,9 @@ void Logger::formatDump(std::string& message, const void* buffer, std::size_t le namespace { -struct LoggerDeleter -{ - void operator()(Poco::Logger * logger) - { - std::lock_guard lock(getLoggerMutex()); - - /// If logger infrastructure is destroyed just decrement logger reference count - if (!_pLoggerMap) - { - logger->release(); - return; - } - - auto it = _pLoggerMap->find(logger->name()); - assert(it != _pLoggerMap->end()); - - /** If reference count is 1, this means this shared pointer owns logger * and need destroy it.
- */ - size_t reference_count_before_release = logger->release(); - if (reference_count_before_release == 1) - { - assert(it->second.owned_by_shared_ptr); - _pLoggerMap->erase(it); - } - } -}; - - inline LoggerPtr makeLoggerPtr(Logger & logger) { - return std::shared_ptr<Logger>(&logger, LoggerDeleter()); + return LoggerPtr(&logger, false /*add_ref*/); } } @@ -350,64 +314,87 @@ Logger& Logger::get(const std::string& name) { std::lock_guard lock(getLoggerMutex()); - Logger & logger = unsafeGet(name); - - /** If there are already shared pointer created for this logger - * we need to increment Logger reference count and now logger - * is owned by logger infrastructure. - */ - auto it = _pLoggerMap->find(name); - if (it->second.owned_by_shared_ptr) - { - it->second.logger->duplicate(); - it->second.owned_by_shared_ptr = false; - } - - return logger; + auto [it, inserted] = unsafeGet(name, false /*get_shared*/); + return *it->second.logger; } -LoggerPtr Logger::getShared(const std::string & name) +LoggerPtr Logger::getShared(const std::string & name, bool should_be_owned_by_shared_ptr_if_created) { std::lock_guard lock(getLoggerMutex()); - bool logger_exists = _pLoggerMap && _pLoggerMap->contains(name); + auto [it, inserted] = unsafeGet(name, true /*get_shared*/); - Logger & logger = unsafeGet(name); - - /** If logger already exists, then this shared pointer does not own it. - * If logger does not exists, logger infrastructure could be already destroyed - * or logger was created. + /** If during `unsafeGet` logger was created, then this shared pointer owns it. * If logger was already created, then this shared pointer does not own it. */ - if (logger_exists) { - logger.duplicate(); - } - else if (_pLoggerMap) - { - _pLoggerMap->find(name)->second.owned_by_shared_ptr = true; + if (inserted) { + if (should_be_owned_by_shared_ptr_if_created) + it->second.owned_by_shared_ptr = true; + else + it->second.logger->duplicate(); } - return makeLoggerPtr(logger); + return makeLoggerPtr(*it->second.logger); } -Logger& Logger::unsafeGet(const std::string& name) +std::pair<Logger::LoggerMapIterator, bool> Logger::unsafeGet(const std::string& name, bool get_shared) { - Logger* pLogger = find(name); - if (!pLogger) + std::optional<Logger::LoggerMapIterator> optional_logger_it = find(name); + + bool should_recreate_logger = false; + + if (optional_logger_it) { + auto & logger_it = *optional_logger_it; + std::optional<size_t> reference_count_before; + + if (get_shared) + { + reference_count_before = logger_it->second.logger->duplicate(); + } + else if (logger_it->second.owned_by_shared_ptr) + { + reference_count_before = logger_it->second.logger->duplicate(); + logger_it->second.owned_by_shared_ptr = false; + } + + /// Other thread already decided to delete this logger, but did not yet remove it from map + if (reference_count_before && reference_count_before == 0) + should_recreate_logger = true; + } + + if (!optional_logger_it || should_recreate_logger) + { + Logger * logger = nullptr; + if (name == ROOT) { - pLogger = new Logger(name, 0, Message::PRIO_INFORMATION); + logger = new Logger(name, nullptr, Message::PRIO_INFORMATION); } else { Logger& par = parent(name); - pLogger = new Logger(name, par.getChannel(), par.getLevel()); + logger = new Logger(name, par.getChannel(), par.getLevel()); } - add(pLogger); + + if (should_recreate_logger) + { + (*optional_logger_it)->second.logger = logger; + return std::make_pair(*optional_logger_it, true); + } + + return add(logger); } - return *pLogger; + + return std::make_pair(*optional_logger_it, false); +} + + +Logger *
Logger::unsafeGetRawPtr(const std::string & name) +{ + return unsafeGet(name, false /*get_shared*/).first->second.logger; } @@ -415,24 +402,24 @@ Logger& Logger::create(const std::string& name, Channel* pChannel, int level) { std::lock_guard lock(getLoggerMutex()); - return unsafeCreate(name, pChannel, level); + return *unsafeCreate(name, pChannel, level).first->second.logger; } LoggerPtr Logger::createShared(const std::string & name, Channel * pChannel, int level) { std::lock_guard lock(getLoggerMutex()); - Logger & logger = unsafeCreate(name, pChannel, level); - _pLoggerMap->find(name)->second.owned_by_shared_ptr = true; + auto [it, inserted] = unsafeCreate(name, pChannel, level); + it->second.owned_by_shared_ptr = true; - return makeLoggerPtr(logger); + return makeLoggerPtr(*it->second.logger); } Logger& Logger::root() { std::lock_guard lock(getLoggerMutex()); - return unsafeGet(ROOT); + return *unsafeGetRawPtr(ROOT); } @@ -440,7 +427,11 @@ Logger* Logger::has(const std::string& name) { std::lock_guard lock(getLoggerMutex()); - return find(name); + auto optional_it = find(name); + if (!optional_it) + return nullptr; + + return (*optional_it)->second.logger; } @@ -459,20 +450,69 @@ void Logger::shutdown() } delete _pLoggerMap; - _pLoggerMap = 0; + _pLoggerMap = nullptr; } } -Logger* Logger::find(const std::string& name) +std::optional<Logger::LoggerMapIterator> Logger::find(const std::string& name) { if (_pLoggerMap) { LoggerMap::iterator it = _pLoggerMap->find(name); if (it != _pLoggerMap->end()) - return it->second.logger; + return it; + + return {}; } - return 0; + + return {}; +} + +Logger * Logger::findRawPtr(const std::string & name) +{ + auto optional_it = find(name); + if (!optional_it) + return nullptr; + + return (*optional_it)->second.logger; +} + + +void intrusive_ptr_add_ref(Logger * ptr) +{ + ptr->duplicate(); +} + + +void intrusive_ptr_release(Logger * ptr) +{ + size_t reference_count_before = ptr->_counter.fetch_sub(1, std::memory_order_acq_rel); + if (reference_count_before != 1) + return; + + { + std::lock_guard lock(getLoggerMutex()); + + if (_pLoggerMap) + { + auto it = _pLoggerMap->find(ptr->name()); + + /** It is possible that during release other thread created logger and + * updated iterator in map. + */ + if (it != _pLoggerMap->end() && ptr == it->second.logger) + { + /** If reference count is 0, this means this intrusive pointer owns logger + * and need destroy it.
+ */ + assert(it->second.owned_by_shared_ptr); + _pLoggerMap->erase(it); + } + } + } + + delete ptr; } @@ -490,28 +530,28 @@ void Logger::names(std::vector<std::string>& names) } } -Logger& Logger::unsafeCreate(const std::string & name, Channel * pChannel, int level) + +std::pair<Logger::LoggerMapIterator, bool> Logger::unsafeCreate(const std::string & name, Channel * pChannel, int level) { if (find(name)) throw ExistsException(); Logger* pLogger = new Logger(name, pChannel, level); - add(pLogger); - - return *pLogger; + return add(pLogger); } + Logger& Logger::parent(const std::string& name) { std::string::size_type pos = name.rfind('.'); if (pos != std::string::npos) { std::string pname = name.substr(0, pos); - Logger* pParent = find(pname); + Logger* pParent = findRawPtr(pname); if (pParent) return *pParent; else return parent(pname); } - else return unsafeGet(ROOT); + else return *unsafeGetRawPtr(ROOT); } @@ -579,12 +619,14 @@ namespace } -void Logger::add(Logger* pLogger) +std::pair<Logger::LoggerMapIterator, bool> Logger::add(Logger* pLogger) { if (!_pLoggerMap) - _pLoggerMap = new LoggerMap; + _pLoggerMap = new Logger::LoggerMap; - _pLoggerMap->emplace(pLogger->name(), LoggerEntry{pLogger, false /*owned_by_shared_ptr*/}); + auto result = _pLoggerMap->emplace(pLogger->name(), LoggerEntry{pLogger, false /*owned_by_shared_ptr*/}); + assert(result.second); + return result; } diff --git a/contrib/aws b/contrib/aws index ca02358dcc7..4ec215f3607 160000 --- a/contrib/aws +++ b/contrib/aws @@ -1 +1 @@ -Subproject commit ca02358dcc7ce3ab733dd4cbcc32734eecfa4ee3 +Subproject commit 4ec215f3607c2111bf2cc91ba842046a6b5eb0c4 diff --git a/contrib/aws-c-auth b/contrib/aws-c-auth index 97133a2b5db..baeffa791d9 160000 --- a/contrib/aws-c-auth +++ b/contrib/aws-c-auth @@ -1 +1 @@ -Subproject commit 97133a2b5dbca1ccdf88cd6f44f39d0531d27d12 +Subproject commit baeffa791d9d1cf61460662a6d9ac2186aaf05df diff --git a/contrib/aws-c-cal b/contrib/aws-c-cal index 85dd7664b78..9453687ff54 160000 --- a/contrib/aws-c-cal +++ b/contrib/aws-c-cal @@ -1 +1 @@ -Subproject commit 85dd7664b786a389c6fb1a6f031ab4bb2282133d +Subproject commit 9453687ff5493ba94eaccf8851200565c4364c77 diff --git a/contrib/aws-c-common b/contrib/aws-c-common index 45dcb2849c8..80f21b3cac5 160000 --- a/contrib/aws-c-common +++ b/contrib/aws-c-common @@ -1 +1 @@ -Subproject commit 45dcb2849c891dba2100b270b4676765c92949ff +Subproject commit 80f21b3cac5ac51c6b8a62c7d2a5ef58a75195ee diff --git a/contrib/aws-c-compression b/contrib/aws-c-compression index b517b7decd0..99ec79ee297 160000 --- a/contrib/aws-c-compression +++ b/contrib/aws-c-compression @@ -1 +1 @@ -Subproject commit b517b7decd0dac30be2162f5186c250221c53aff +Subproject commit 99ec79ee2970f1a045d4ced1501b97ee521f2f85 diff --git a/contrib/aws-c-event-stream b/contrib/aws-c-event-stream index 2f9b60c42f9..08f24e384e5 160000 --- a/contrib/aws-c-event-stream +++ b/contrib/aws-c-event-stream @@ -1 +1 @@ -Subproject commit 2f9b60c42f90840ec11822acda3d8cdfa97a773d +Subproject commit 08f24e384e5be20bcffa42b49213d24dad7881ae diff --git a/contrib/aws-c-http b/contrib/aws-c-http index dd344619879..a082f8a2067 160000 --- a/contrib/aws-c-http +++ b/contrib/aws-c-http @@ -1 +1 @@ -Subproject commit dd34461987947672444d0bc872c5a733dfdb9711 +Subproject commit a082f8a2067e4a31db73f1d4ffd702a8dc0f7089 diff --git a/contrib/aws-c-io b/contrib/aws-c-io index d58ed4f272b..11ce3c750a1 160000 --- a/contrib/aws-c-io +++ b/contrib/aws-c-io @@ -1 +1 @@ -Subproject commit d58ed4f272b1cb4f89ac9196526ceebe5f2b0d89 +Subproject commit 11ce3c750a1dac7b04069fc5bff89e97e91bad4d diff --git
a/contrib/aws-c-mqtt b/contrib/aws-c-mqtt index 33c3455cec8..6d36cd37262 160000 --- a/contrib/aws-c-mqtt +++ b/contrib/aws-c-mqtt @@ -1 +1 @@ -Subproject commit 33c3455cec82b16feb940e12006cefd7b3ef4194 +Subproject commit 6d36cd3726233cb757468d0ea26f6cd8dad151ec diff --git a/contrib/aws-c-s3 b/contrib/aws-c-s3 index d7bfe602d69..de36fee8fe7 160000 --- a/contrib/aws-c-s3 +++ b/contrib/aws-c-s3 @@ -1 +1 @@ -Subproject commit d7bfe602d6925948f1fff95784e3613cca6a3900 +Subproject commit de36fee8fe7ab02f10987877ae94a805bf440c1f diff --git a/contrib/aws-c-sdkutils b/contrib/aws-c-sdkutils index 208a701fa01..fd8c0ba2e23 160000 --- a/contrib/aws-c-sdkutils +++ b/contrib/aws-c-sdkutils @@ -1 +1 @@ -Subproject commit 208a701fa01e99c7c8cc3dcebc8317da71362972 +Subproject commit fd8c0ba2e233997eaaefe82fb818b8b444b956d3 diff --git a/contrib/aws-checksums b/contrib/aws-checksums index ad53be196a2..321b805559c 160000 --- a/contrib/aws-checksums +++ b/contrib/aws-checksums @@ -1 +1 @@ -Subproject commit ad53be196a25bbefa3700a01187fdce573a7d2d0 +Subproject commit 321b805559c8e911be5bddba13fcbd222a3e2d3a diff --git a/contrib/aws-cmake/CMakeLists.txt b/contrib/aws-cmake/CMakeLists.txt index 950a0e06cd0..abde20addaf 100644 --- a/contrib/aws-cmake/CMakeLists.txt +++ b/contrib/aws-cmake/CMakeLists.txt @@ -25,6 +25,7 @@ include("${ClickHouse_SOURCE_DIR}/contrib/aws-cmake/AwsFeatureTests.cmake") include("${ClickHouse_SOURCE_DIR}/contrib/aws-cmake/AwsThreadAffinity.cmake") include("${ClickHouse_SOURCE_DIR}/contrib/aws-cmake/AwsThreadName.cmake") include("${ClickHouse_SOURCE_DIR}/contrib/aws-cmake/AwsSIMD.cmake") +include("${ClickHouse_SOURCE_DIR}/contrib/aws-crt-cpp/cmake/AwsGetVersion.cmake") # Gather sources and options. @@ -35,6 +36,8 @@ set(AWS_PUBLIC_COMPILE_DEFS) set(AWS_PRIVATE_COMPILE_DEFS) set(AWS_PRIVATE_LIBS) +list(APPEND AWS_PRIVATE_COMPILE_DEFS "-DINTEL_NO_ITTNOTIFY_API") + if (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG") list(APPEND AWS_PRIVATE_COMPILE_DEFS "-DDEBUG_BUILD") endif() @@ -85,14 +88,20 @@ file(GLOB AWS_SDK_CORE_SRC "${AWS_SDK_CORE_DIR}/source/external/cjson/*.cpp" "${AWS_SDK_CORE_DIR}/source/external/tinyxml2/*.cpp" "${AWS_SDK_CORE_DIR}/source/http/*.cpp" + "${AWS_SDK_CORE_DIR}/source/http/crt/*.cpp" "${AWS_SDK_CORE_DIR}/source/http/standard/*.cpp" "${AWS_SDK_CORE_DIR}/source/internal/*.cpp" "${AWS_SDK_CORE_DIR}/source/monitoring/*.cpp" + "${AWS_SDK_CORE_DIR}/source/net/*.cpp" + "${AWS_SDK_CORE_DIR}/source/net/linux-shared/*.cpp" + "${AWS_SDK_CORE_DIR}/source/platform/linux-shared/*.cpp" + "${AWS_SDK_CORE_DIR}/source/smithy/tracing/*.cpp" "${AWS_SDK_CORE_DIR}/source/utils/*.cpp" "${AWS_SDK_CORE_DIR}/source/utils/base64/*.cpp" + "${AWS_SDK_CORE_DIR}/source/utils/component-registry/*.cpp" "${AWS_SDK_CORE_DIR}/source/utils/crypto/*.cpp" - "${AWS_SDK_CORE_DIR}/source/utils/crypto/openssl/*.cpp" "${AWS_SDK_CORE_DIR}/source/utils/crypto/factory/*.cpp" + "${AWS_SDK_CORE_DIR}/source/utils/crypto/openssl/*.cpp" "${AWS_SDK_CORE_DIR}/source/utils/event/*.cpp" "${AWS_SDK_CORE_DIR}/source/utils/json/*.cpp" "${AWS_SDK_CORE_DIR}/source/utils/logging/*.cpp" @@ -115,9 +124,8 @@ OPTION(USE_AWS_MEMORY_MANAGEMENT "Aws memory management" OFF) configure_file("${AWS_SDK_CORE_DIR}/include/aws/core/SDKConfig.h.in" "${CMAKE_CURRENT_BINARY_DIR}/include/aws/core/SDKConfig.h" @ONLY) -list(APPEND AWS_PUBLIC_COMPILE_DEFS "-DAWS_SDK_VERSION_MAJOR=1") -list(APPEND AWS_PUBLIC_COMPILE_DEFS "-DAWS_SDK_VERSION_MINOR=10") -list(APPEND AWS_PUBLIC_COMPILE_DEFS "-DAWS_SDK_VERSION_PATCH=36") +aws_get_version(AWS_CRT_CPP_VERSION_MAJOR 
AWS_CRT_CPP_VERSION_MINOR AWS_CRT_CPP_VERSION_PATCH FULL_VERSION GIT_HASH) +configure_file("${AWS_CRT_DIR}/include/aws/crt/Config.h.in" "${AWS_CRT_DIR}/include/aws/crt/Config.h" @ONLY) list(APPEND AWS_SOURCES ${AWS_SDK_CORE_SRC} ${AWS_SDK_CORE_NET_SRC} ${AWS_SDK_CORE_PLATFORM_SRC}) @@ -176,6 +184,7 @@ file(GLOB AWS_COMMON_SRC "${AWS_COMMON_DIR}/source/*.c" "${AWS_COMMON_DIR}/source/external/*.c" "${AWS_COMMON_DIR}/source/posix/*.c" + "${AWS_COMMON_DIR}/source/linux/*.c" ) file(GLOB AWS_COMMON_ARCH_SRC diff --git a/contrib/aws-crt-cpp b/contrib/aws-crt-cpp index 8a301b7e842..f532d6abc0d 160000 --- a/contrib/aws-crt-cpp +++ b/contrib/aws-crt-cpp @@ -1 +1 @@ -Subproject commit 8a301b7e842f1daed478090c869207300972379f +Subproject commit f532d6abc0d2b0d8b5d6fe9e7c51eaedbe4afbd0 diff --git a/contrib/aws-s2n-tls b/contrib/aws-s2n-tls index 71f4794b758..9a1e7545402 160000 --- a/contrib/aws-s2n-tls +++ b/contrib/aws-s2n-tls @@ -1 +1 @@ -Subproject commit 71f4794b7580cf780eb4aca77d69eded5d3c7bb4 +Subproject commit 9a1e75454023e952b366ce1eab9c54007250119f diff --git a/contrib/libxml2 b/contrib/libxml2 index 8292f361458..223cb03a5d2 160000 --- a/contrib/libxml2 +++ b/contrib/libxml2 @@ -1 +1 @@ -Subproject commit 8292f361458fcffe0bff515a385be02e9d35582c +Subproject commit 223cb03a5d27b1b2393b266a8657443d046139d6 diff --git a/contrib/libxml2-cmake/linux_x86_64/include/libxml/xmlversion.h b/contrib/libxml2-cmake/linux_x86_64/include/libxml/xmlversion.h index d8535e91a0e..c2faeb47cb1 100644 --- a/contrib/libxml2-cmake/linux_x86_64/include/libxml/xmlversion.h +++ b/contrib/libxml2-cmake/linux_x86_64/include/libxml/xmlversion.h @@ -21,7 +21,7 @@ extern "C" { * your library and includes mismatch */ #ifndef LIBXML2_COMPILING_MSCCDEF -XMLPUBFUN void xmlCheckVersion(int version); +XMLPUBFUN void XMLCALL xmlCheckVersion(int version); #endif /* LIBXML2_COMPILING_MSCCDEF */ /** @@ -29,28 +29,28 @@ XMLPUBFUN void xmlCheckVersion(int version); * * the version string like "1.2.3" */ -#define LIBXML_DOTTED_VERSION "2.12.4" +#define LIBXML_DOTTED_VERSION "2.10.3" /** * LIBXML_VERSION: * * the version number: 1.2.3 value is 10203 */ -#define LIBXML_VERSION 21204 +#define LIBXML_VERSION 21003 /** * LIBXML_VERSION_STRING: * * the version number string, 1.2.3 value is "10203" */ -#define LIBXML_VERSION_STRING "21204" +#define LIBXML_VERSION_STRING "21003" /** * LIBXML_VERSION_EXTRA: * * extra version information, used to show a git commit description */ -#define LIBXML_VERSION_EXTRA "-GITv2.12.4" +#define LIBXML_VERSION_EXTRA "" /** * LIBXML_TEST_VERSION: @@ -58,7 +58,7 @@ XMLPUBFUN void xmlCheckVersion(int version); * Macro to check that the libxml version in use is compatible with * the version the software has been compiled against */ -#define LIBXML_TEST_VERSION xmlCheckVersion(21204); +#define LIBXML_TEST_VERSION xmlCheckVersion(21003); #ifndef VMS #if 0 @@ -270,7 +270,7 @@ XMLPUBFUN void xmlCheckVersion(int version); * * Whether iconv support is available */ -#if 1 +#if 0 #define LIBXML_ICONV_ENABLED #endif @@ -313,7 +313,7 @@ XMLPUBFUN void xmlCheckVersion(int version); /** * LIBXML_DEBUG_RUNTIME: * - * Removed + * Whether the runtime debugging is configured in */ #if 0 #define LIBXML_DEBUG_RUNTIME @@ -409,7 +409,12 @@ XMLPUBFUN void xmlCheckVersion(int version); #endif #ifdef __GNUC__ -/** DOC_DISABLE */ + +/** + * ATTRIBUTE_UNUSED: + * + * Macro used to signal to GCC unused function parameters + */ #ifndef ATTRIBUTE_UNUSED # if ((__GNUC__ > 2) || ((__GNUC__ == 2) && (__GNUC_MINOR__ >= 7))) @@ -419,6 +424,12 @@ 
XMLPUBFUN void xmlCheckVersion(int version); # endif #endif +/** + * LIBXML_ATTR_ALLOC_SIZE: + * + * Macro used to indicate to GCC this is an allocator function + */ + #ifndef LIBXML_ATTR_ALLOC_SIZE # if (!defined(__clang__) && ((__GNUC__ > 4) || ((__GNUC__ == 4) && (__GNUC_MINOR__ >= 3)))) # define LIBXML_ATTR_ALLOC_SIZE(x) __attribute__((alloc_size(x))) @@ -429,6 +440,12 @@ XMLPUBFUN void xmlCheckVersion(int version); # define LIBXML_ATTR_ALLOC_SIZE(x) #endif +/** + * LIBXML_ATTR_FORMAT: + * + * Macro used to indicate to GCC the parameter are printf like + */ + #ifndef LIBXML_ATTR_FORMAT # if ((__GNUC__ > 3) || ((__GNUC__ == 3) && (__GNUC_MINOR__ >= 3))) # define LIBXML_ATTR_FORMAT(fmt,args) __attribute__((__format__(__printf__,fmt,args))) @@ -440,69 +457,44 @@ XMLPUBFUN void xmlCheckVersion(int version); #endif #ifndef XML_DEPRECATED -# if defined (IN_LIBXML) || (__GNUC__ * 100 + __GNUC_MINOR__ < 301) +# ifdef IN_LIBXML # define XML_DEPRECATED -/* Available since at least GCC 3.1 */ # else +/* Available since at least GCC 3.1 */ # define XML_DEPRECATED __attribute__((deprecated)) # endif #endif -#if defined(__clang__) || (__GNUC__ * 100 + __GNUC_MINOR__ >= 406) - #if defined(__clang__) || (__GNUC__ * 100 + __GNUC_MINOR__ >= 800) - #define XML_IGNORE_FPTR_CAST_WARNINGS \ - _Pragma("GCC diagnostic push") \ - _Pragma("GCC diagnostic ignored \"-Wpedantic\"") \ - _Pragma("GCC diagnostic ignored \"-Wcast-function-type\"") - #else - #define XML_IGNORE_FPTR_CAST_WARNINGS \ - _Pragma("GCC diagnostic push") \ - _Pragma("GCC diagnostic ignored \"-Wpedantic\"") - #endif - #define XML_POP_WARNINGS \ - _Pragma("GCC diagnostic pop") -#else - #define XML_IGNORE_FPTR_CAST_WARNINGS - #define XML_POP_WARNINGS -#endif - #else /* ! __GNUC__ */ +/** + * ATTRIBUTE_UNUSED: + * + * Macro used to signal to GCC unused function parameters + */ #define ATTRIBUTE_UNUSED +/** + * LIBXML_ATTR_ALLOC_SIZE: + * + * Macro used to indicate to GCC this is an allocator function + */ #define LIBXML_ATTR_ALLOC_SIZE(x) +/** + * LIBXML_ATTR_FORMAT: + * + * Macro used to indicate to GCC the parameter are printf like + */ #define LIBXML_ATTR_FORMAT(fmt,args) +/** + * XML_DEPRECATED: + * + * Macro used to indicate that a function, variable, type or struct member + * is deprecated. 
+ */ #ifndef XML_DEPRECATED -# if defined (IN_LIBXML) || !defined (_MSC_VER) -# define XML_DEPRECATED -/* Available since Visual Studio 2005 */ -# elif defined (_MSC_VER) && (_MSC_VER >= 1400) -# define XML_DEPRECATED __declspec(deprecated) -# endif -#endif -#if defined (_MSC_VER) && (_MSC_VER >= 1400) -# define XML_IGNORE_FPTR_CAST_WARNINGS __pragma(warning(push)) -#else -# define XML_IGNORE_FPTR_CAST_WARNINGS -#endif -#ifndef XML_POP_WARNINGS -# if defined (_MSC_VER) && (_MSC_VER >= 1400) -# define XML_POP_WARNINGS __pragma(warning(pop)) -# else -# define XML_POP_WARNINGS -# endif +#define XML_DEPRECATED #endif #endif /* __GNUC__ */ -#define XML_NO_ATTR - -#ifdef LIBXML_THREAD_ENABLED - #define XML_DECLARE_GLOBAL(name, type, attrs) \ - attrs XMLPUBFUN type *__##name(void); - #define XML_GLOBAL_MACRO(name) (*__##name()) -#else - #define XML_DECLARE_GLOBAL(name, type, attrs) \ - attrs XMLPUBVAR type name; -#endif - #ifdef __cplusplus } #endif /* __cplusplus */ diff --git a/contrib/update-submodules.sh b/contrib/update-submodules.sh index 7195de020bd..072d7a5dc2f 100755 --- a/contrib/update-submodules.sh +++ b/contrib/update-submodules.sh @@ -24,7 +24,7 @@ git config --file .gitmodules --get-regexp '.*path' | sed 's/[^ ]* //' | xargs - # We don't want to depend on any third-party CMake files. # To check it, find and delete them. grep -o -P '"contrib/[^"]+"' .gitmodules | - grep -v -P 'contrib/(llvm-project|google-protobuf|grpc|abseil-cpp|corrosion)' | + grep -v -P 'contrib/(llvm-project|google-protobuf|grpc|abseil-cpp|corrosion|aws-crt-cpp)' | xargs -I@ find @ \ -'(' -name 'CMakeLists.txt' -or -name '*.cmake' -')' -and -not -name '*.h.cmake' \ -delete diff --git a/docker/test/fasttest/run.sh b/docker/test/fasttest/run.sh index 5af05034415..d78c52f1fe6 100755 --- a/docker/test/fasttest/run.sh +++ b/docker/test/fasttest/run.sh @@ -211,6 +211,17 @@ function build echo "build_clickhouse_fasttest_binary: [ OK ] $BUILD_SECONDS_ELAPSED sec." \ | ts '%Y-%m-%d %H:%M:%S' \ | tee "$FASTTEST_OUTPUT/test_result.txt" + + ( + # This query should fail, and print stacktrace with proper symbol names (even on a stripped binary) + clickhouse_output=$(programs/clickhouse-stripped --stacktrace -q 'select' 2>&1 || :) + if [[ $clickhouse_output =~ DB::LocalServer::main ]]; then + echo "stripped_clickhouse_shows_symbols_names: [ OK ] 0 sec." + else + echo -e "stripped_clickhouse_shows_symbols_names: [ FAIL ] 0 sec. 
- clickhouse output:\n\n$clickhouse_output\n" + fi + ) | ts '%Y-%m-%d %H:%M:%S' | tee -a "$FASTTEST_OUTPUT/test_result.txt" + if [ "$COPY_CLICKHOUSE_BINARY_TO_OUTPUT" -eq "1" ]; then mkdir -p "$FASTTEST_OUTPUT/binaries/" cp programs/clickhouse "$FASTTEST_OUTPUT/binaries/clickhouse" diff --git a/docs/en/operations/configuration-files.md b/docs/en/operations/configuration-files.md index dfe62d591e3..005c7818eb1 100644 --- a/docs/en/operations/configuration-files.md +++ b/docs/en/operations/configuration-files.md @@ -163,7 +163,7 @@ key: value Corresponding XML: ``` xml -value +value ``` A nested XML node is represented by a YAML map: diff --git a/programs/server/binary.html b/programs/server/binary.html index 74095dff537..eec39cd4463 100644 --- a/programs/server/binary.html +++ b/programs/server/binary.html @@ -70,6 +70,19 @@ if (params.has('password')) { password = params.get('password'); } } + let url = `${host}?allow_introspection_functions=1`; + + if (add_http_cors_header) { + url += '&add_http_cors_header=1'; + } + + if (user) { + url += `&user=${encodeURIComponent(user)}`; + } + if (password) { + url += `&password=${encodeURIComponent(password)}`; + } + let map = L.map('space', { crs: L.CRS.Simple, center: [-512, 512], @@ -103,24 +116,11 @@ const key = `${coords.z}-${coords.x}-${coords.y}`; let buf = cached_tiles[key]; if (!buf) { - let url = `${host}?default_format=RowBinary&allow_introspection_functions=1`; + let request_url = `${url}&default_format=RowBinary` + + `&param_z=${coords.z}&param_x=${coords.x}&param_y=${coords.y}` + + `&enable_http_compression=1&network_compression_method=zstd&network_zstd_compression_level=6`; - if (add_http_cors_header) { - // For debug purposes, you may set add_http_cors_header from a browser console - url += '&add_http_cors_header=1'; - } - - if (user) { - url += `&user=${encodeURIComponent(user)}`; - } - if (password) { - url += `&password=${encodeURIComponent(password)}`; - } - - url += `&param_z=${coords.z}&param_x=${coords.x}&param_y=${coords.y}`; - url += `&enable_http_compression=1&network_compression_method=zstd&network_zstd_compression_level=6`; - - const response = await fetch(url, { method: 'POST', body: sql }); + const response = await fetch(request_url, { method: 'POST', body: sql }); if (!response.ok) { const text = await response.text(); @@ -238,7 +238,7 @@ const addr_hex = '0x' + addr_int.toString(16); const response = fetch( - `http://localhost:8123/?default_format=JSON`, + `${url}&default_format=JSON`, { method: 'POST', body: `SELECT encodeXMLComponent(demangle(addressToSymbol(${addr_int}::UInt64))) AS name, diff --git a/rust/prql/src/lib.rs b/rust/prql/src/lib.rs index fb71d62d527..d51acfbd485 100644 --- a/rust/prql/src/lib.rs +++ b/rust/prql/src/lib.rs @@ -2,6 +2,7 @@ use prql_compiler::sql::Dialect; use prql_compiler::{Options, Target}; use std::ffi::{c_char, CString}; use std::slice; +use std::panic; fn set_output(result: String, out: *mut *mut u8, out_size: *mut u64) { assert!(!out_size.is_null()); @@ -13,8 +14,7 @@ fn set_output(result: String, out: *mut *mut u8, out_size: *mut u64) { *out_ptr = CString::new(result).unwrap().into_raw() as *mut u8; } -#[no_mangle] -pub unsafe extern "C" fn prql_to_sql( +pub unsafe extern "C" fn prql_to_sql_impl( query: *const u8, size: u64, out: *mut *mut u8, out_size: *mut u64, ) -> i64 { @@ -50,6 +50,23 @@ pub unsafe extern "C" fn prql_to_sql( } } +#[no_mangle] +pub unsafe extern "C" fn prql_to_sql( + query: *const u8, + size: u64, + out: *mut *mut u8, + out_size: *mut u64, +) -> i64 { + let ret = panic::catch_unwind(|| { + return
prql_to_sql_impl(query, size, out, out_size); + }); + return match ret { + // NOTE: using cxxbridge we can return proper Result<> type. + Err(_err) => 1, + Ok(res) => res, + } +} + #[no_mangle] pub unsafe extern "C" fn prql_free_pointer(ptr_to_free: *mut u8) { std::mem::drop(CString::from_raw(ptr_to_free as *mut c_char)); } diff --git a/rust/skim/src/lib.rs b/rust/skim/src/lib.rs index 2221ed63df4..a20b1b35033 100644 --- a/rust/skim/src/lib.rs +++ b/rust/skim/src/lib.rs @@ -1,6 +1,7 @@ use skim::prelude::*; use term::terminfo::TermInfo; use cxx::{CxxString, CxxVector}; +use std::panic; #[cxx::bridge] mod ffi { @@ -36,7 +37,7 @@ impl SkimItem for Item { } } -fn skim(prefix: &CxxString, words: &CxxVector<CxxString>) -> Result<String, String> { +fn skim_impl(prefix: &CxxString, words: &CxxVector<CxxString>) -> Result<String, String> { // Let's check is terminal available. To avoid panic. if let Err(err) = TermInfo::from_env() { return Err(format!("{}", err)); } @@ -89,3 +90,22 @@ fn skim(prefix: &CxxString, words: &CxxVector<CxxString>) -> Result<String, String> { +fn skim(prefix: &CxxString, words: &CxxVector<CxxString>) -> Result<String, String> { + let ret = panic::catch_unwind(|| { + return skim_impl(prefix, words); + }); + return match ret { + Err(err) => { + let e = if let Some(s) = err.downcast_ref::<String>() { + format!("{}", s) + } else if let Some(s) = err.downcast_ref::<&str>() { + format!("{}", s) + } else { + format!("Unknown panic type: {:?}", err.type_id()) + }; + Err(format!("Rust panic: {:?}", e)) + }, + Ok(res) => res, + } +} diff --git a/src/AggregateFunctions/AggregateFunctionQuantileGK.cpp b/src/AggregateFunctions/AggregateFunctionQuantileGK.cpp index 2c0b3e55136..2e8ccb2e5e4 100644 --- a/src/AggregateFunctions/AggregateFunctionQuantileGK.cpp +++ b/src/AggregateFunctions/AggregateFunctionQuantileGK.cpp @@ -17,6 +17,7 @@ namespace DB namespace ErrorCodes { extern const int ILLEGAL_TYPE_OF_ARGUMENT; + extern const int INCORRECT_DATA; extern const int LOGICAL_ERROR; extern const int NOT_IMPLEMENTED; } @@ -30,12 +31,12 @@ class ApproxSampler public: struct Stats { - T value; // the sampled value - Int64 g; // the minimum rank jump from the previous value's minimum rank - Int64 delta; // the maximum span of the rank + T value; // The sampled value + Int64 g; // The minimum rank jump from the previous value's minimum rank + Int64 delta; // The maximum span of the rank Stats() = default; - Stats(T value_, Int64 g_, Int64 delta_) : value(value_), g(g_), delta(delta_) {} + Stats(T value_, Int64 g_, Int64 delta_) : value(value_), g(g_), delta(delta_) { } }; struct QueryResult @@ -49,20 +50,20 @@ public: ApproxSampler() = default; - explicit ApproxSampler( - double relative_error_, - size_t compress_threshold_ = default_compress_threshold, - size_t count_ = 0, - bool compressed_ = false) - : relative_error(relative_error_) - , compress_threshold(compress_threshold_) - , count(count_) - , compressed(compressed_) + ApproxSampler(const ApproxSampler & other) + : relative_error(other.relative_error) + , compress_threshold(other.compress_threshold) + , count(other.count) + , compressed(other.compressed) + , sampled(other.sampled.begin(), other.sampled.end()) + , backup_sampled(other.backup_sampled.begin(), other.backup_sampled.end()) + , head_sampled(other.head_sampled.begin(), other.head_sampled.end()) { - sampled.reserve(compress_threshold); - backup_sampled.reserve(compress_threshold); + } - head_sampled.reserve(default_head_size); + explicit ApproxSampler(double relative_error_) + : relative_error(relative_error_), compress_threshold(default_compress_threshold), count(0), compressed(false) + { } bool isCompressed() const { return compressed; } @@
-95,9 +96,9 @@ public: Int64 current_max = std::numeric_limits<Int64>::min(); for (const auto & stats : sampled) current_max = std::max(stats.delta + stats.g, current_max); - Int64 target_error = current_max/2; + Int64 target_error = current_max / 2; - size_t index= 0; + size_t index = 0; auto min_rank = sampled[0].g; for (size_t i = 0; i < size; ++i) { @@ -118,7 +119,6 @@ public: result[indices[i]] = res.value; } } - } void compress() @@ -256,16 +256,27 @@ public: void read(ReadBuffer & buf) { readBinaryLittleEndian(compress_threshold, buf); + if (compress_threshold != default_compress_threshold) + throw Exception( + ErrorCodes::INCORRECT_DATA, + "The compress threshold {} isn't the expected one {}", + compress_threshold, + default_compress_threshold); + readBinaryLittleEndian(relative_error, buf); readBinaryLittleEndian(count, buf); size_t sampled_len = 0; readBinaryLittleEndian(sampled_len, buf); + if (sampled_len > compress_threshold) + throw Exception( + ErrorCodes::INCORRECT_DATA, "The number of elements {} for quantileGK exceeds {}", sampled_len, compress_threshold); + sampled.resize(sampled_len); for (size_t i = 0; i < sampled_len; ++i) { - auto stats = sampled[i]; + auto & stats = sampled[i]; readBinaryLittleEndian(stats.value, buf); readBinaryLittleEndian(stats.g, buf); readBinaryLittleEndian(stats.delta, buf); @@ -291,7 +302,7 @@ private: min_rank += curr_sample.g; } } - return {sampled.size()-1, 0, sampled.back().value}; + return {sampled.size() - 1, 0, sampled.back().value}; } void withHeadBufferInserted() @@ -389,12 +400,11 @@ private: double relative_error; size_t compress_threshold; - size_t count = 0; + size_t count; bool compressed; PaddedPODArray<Stats> sampled; PaddedPODArray<Stats> backup_sampled; - PaddedPODArray<T> head_sampled; static constexpr size_t default_compress_threshold = 10000; @@ -406,17 +416,14 @@ class QuantileGK { private: using Data = ApproxSampler<Value>; - mutable Data data; + Data data; public: QuantileGK() = default; explicit QuantileGK(size_t accuracy) : data(1.0 / static_cast<double>(accuracy)) { } - void add(const Value & x) - { - data.insert(x); - } + void add(const Value & x) { data.insert(x); } template <typename Weight> void add(const Value &, const Weight &) @@ -429,22 +436,34 @@ public: if (!data.isCompressed()) data.compress(); - data.merge(rhs.data); + if (rhs.data.isCompressed()) + data.merge(rhs.data); + else + { + /// We can't modify rhs, so copy it and compress + Data rhs_data_copy(rhs.data); + rhs_data_copy.compress(); + data.merge(rhs_data_copy); + } } void serialize(WriteBuffer & buf) const { - /// Always compress before serialization - if (!data.isCompressed()) - data.compress(); - - data.write(buf); + if (data.isCompressed()) + data.write(buf); + else + { + /// We can't modify rhs, so copy it and compress + Data data_copy(data); + data_copy.compress(); + data_copy.write(buf); + } } void deserialize(ReadBuffer & buf) { data.read(buf); - + /// Serialized data is always compressed data.setCompressed(); } @@ -481,7 +500,6 @@ public: } }; - template <typename Value, bool float_return> using FuncQuantileGK = AggregateFunctionQuantile<Value, QuantileGK<Value>, NameQuantileGK, false, void, false, true>; template <typename Value, bool float_return> using FuncQuantilesGK = AggregateFunctionQuantile<Value, QuantileGK<Value>, NameQuantilesGK, false, void, true, true>; diff --git a/src/Common/Logger.h b/src/Common/Logger.h index 6dcdea9a9d8..0425da8c847 100644 --- a/src/Common/Logger.h +++ b/src/Common/Logger.h @@ -2,6 +2,8 @@ #include +#include <base/defines.h> + #include #include #include @@ -24,6 +26,16 @@ using LoggerRawPtr = Poco::Logger *; */ LoggerPtr getLogger(const std::string & name); +/** Get Logger with specified name.
If the Logger does not exists, it is created. + * This overload was added for specific purpose, when logger is constructed from constexpr string. + * Logger is destroyed only during program shutdown. + */ +template <size_t n> +ALWAYS_INLINE LoggerPtr getLogger(const char (&name)[n]) +{ + return Poco::Logger::getShared(name, false /*should_be_owned_by_shared_ptr_if_created*/); +} + /** Create Logger with specified name, channel and logging level. * If Logger already exists, throws exception. * Logger is destroyed, when last shared ptr that refers to Logger with specified name is destroyed. diff --git a/src/Common/StackTrace.cpp b/src/Common/StackTrace.cpp index 4e5c9bd7893..8431630b16c 100644 --- a/src/Common/StackTrace.cpp +++ b/src/Common/StackTrace.cpp @@ -317,16 +317,19 @@ constexpr std::pair<std::string_view, std::string_view> replacements[] // Demangle @c symbol_name if it's not from __functional header (as such functions don't provide any useful // information but pollute stack traces). // Replace parts from @c replacements with shorter aliases -String demangleAndCollapseNames(std::string_view file, const char * const symbol_name) +String demangleAndCollapseNames(std::optional<std::string_view> file, const char * const symbol_name) { if (!symbol_name) return "?"; - std::string_view file_copy = file; - if (auto trim_pos = file.find_last_of('/'); trim_pos != file.npos) - file_copy.remove_suffix(file.size() - trim_pos); - if (file_copy.ends_with("functional")) - return "?"; + if (file.has_value()) + { + std::string_view file_copy = file.value(); + if (auto trim_pos = file_copy.find_last_of('/'); trim_pos != file_copy.npos) + file_copy.remove_suffix(file_copy.size() - trim_pos); + if (file_copy.ends_with("functional")) + return "?"; + } String haystack = demangle(symbol_name); @@ -393,8 +396,8 @@ toStringEveryLineImpl([[maybe_unused]] bool fatal, const StackTraceRefTriple & s if (frame.file.has_value() && frame.line.has_value()) out << *frame.file << ':' << *frame.line << ": "; - if (frame.symbol.has_value() && frame.file.has_value()) - out << demangleAndCollapseNames(*frame.file, frame.symbol->data()); + if (frame.symbol.has_value()) + out << demangleAndCollapseNames(frame.file, frame.symbol->data()); else out << "?"; diff --git a/src/Common/tests/gtest_log.cpp b/src/Common/tests/gtest_log.cpp index 622497fe2f5..6d2bd56ad77 100644 --- a/src/Common/tests/gtest_log.cpp +++ b/src/Common/tests/gtest_log.cpp @@ -9,6 +9,7 @@ #include #include #include +#include TEST(Logger, Log) @@ -100,3 +101,75 @@ TEST(Logger, SideEffects) LOG_TRACE(log, "test no throw {}", getLogMessageParamOrThrow()); } + +TEST(Logger, SharedRawLogger) +{ + { + std::ostringstream stream; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + auto stream_channel = Poco::AutoPtr<Poco::StreamChannel>(new Poco::StreamChannel(stream)); + + auto shared_logger = getLogger("Logger_1"); + shared_logger->setChannel(stream_channel.get()); + shared_logger->setLevel("trace"); + + LOG_TRACE(shared_logger, "SharedLogger1Log1"); + LOG_TRACE(getRawLogger("Logger_1"), "RawLogger1Log"); + LOG_TRACE(shared_logger, "SharedLogger1Log2"); + + auto actual = stream.str(); + EXPECT_EQ(actual, "SharedLogger1Log1\nRawLogger1Log\nSharedLogger1Log2\n"); + } + { + std::ostringstream stream; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + auto stream_channel = Poco::AutoPtr<Poco::StreamChannel>(new Poco::StreamChannel(stream)); + + auto * raw_logger = getRawLogger("Logger_2"); + raw_logger->setChannel(stream_channel.get()); + raw_logger->setLevel("trace"); + + LOG_TRACE(getLogger("Logger_2"), "SharedLogger2Log1"); + LOG_TRACE(raw_logger, "RawLogger2Log"); +
LOG_TRACE(getLogger("Logger_2"), "SharedLogger2Log2"); + + auto actual = stream.str(); + EXPECT_EQ(actual, "SharedLogger2Log1\nRawLogger2Log\nSharedLogger2Log2\n"); + } +} + +TEST(Logger, SharedLoggersThreadSafety) +{ + static size_t threads_count = std::thread::hardware_concurrency(); + static constexpr size_t loggers_count = 10; + static constexpr size_t logger_get_count = 1000; + + Poco::Logger::root(); + + std::vector<std::string> names; + + Poco::Logger::names(names); + size_t loggers_size_before = names.size(); + + std::vector<std::thread> threads; + + for (size_t thread_index = 0; thread_index < threads_count; ++thread_index) + { + threads.emplace_back([]() + { + for (size_t logger_index = 0; logger_index < loggers_count; ++logger_index) + { + for (size_t iteration = 0; iteration < logger_get_count; ++iteration) + { + getLogger("Logger_" + std::to_string(logger_index)); + } + } + }); + } + + for (auto & thread : threads) + thread.join(); + + Poco::Logger::names(names); + size_t loggers_size_after = names.size(); + + EXPECT_EQ(loggers_size_before, loggers_size_after); +} diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 8d50f0a76b1..c82f8301eff 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -136,12 +136,12 @@ namespace { void assertDigest( - const KeeperStorage::Digest & first, - const KeeperStorage::Digest & second, + const KeeperStorage::Digest & expected, + const KeeperStorage::Digest & actual, const Coordination::ZooKeeperRequest & request, bool committing) { - if (!KeeperStorage::checkDigest(first, second)) + if (!KeeperStorage::checkDigest(expected, actual)) { LOG_FATAL( getLogger("KeeperStateMachine"), @@ -149,9 +149,9 @@ void assertDigest( "{}). Keeper will terminate to avoid inconsistencies.\nExtra information about the request:\n{}", committing ?
"committing" : "preprocessing", request.getOpNum(), - first.value, - second.value, - first.version, + expected.value, + actual.value, + expected.version, request.toString()); std::terminate(); } diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 992d4ca8a95..f30cbb65182 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -174,7 +174,6 @@ uint64_t calculateDigest(std::string_view path, std::string_view data, const Kee hash.update(data); - hash.update(stat.czxid); hash.update(stat.czxid); hash.update(stat.mzxid); hash.update(stat.ctime); @@ -183,7 +182,6 @@ uint64_t calculateDigest(std::string_view path, std::string_view data, const Kee hash.update(stat.cversion); hash.update(stat.aversion); hash.update(stat.ephemeralOwner); - hash.update(data.length()); hash.update(stat.numChildren); hash.update(stat.pzxid); @@ -2531,6 +2529,17 @@ void KeeperStorage::recalculateStats() container.recalculateDataSize(); } +bool KeeperStorage::checkDigest(const Digest & first, const Digest & second) +{ + if (first.version != second.version) + return true; + + if (first.version == DigestVersion::NO_DIGEST) + return true; + + return first.value == second.value; +} + String KeeperStorage::generateDigest(const String & userdata) { std::vector user_password; diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 01c1413a884..048adf3ffaa 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -95,10 +95,11 @@ public: { NO_DIGEST = 0, V1 = 1, - V2 = 2 // added system nodes that modify the digest on startup so digest from V0 is invalid + V2 = 2, // added system nodes that modify the digest on startup so digest from V0 is invalid + V3 = 3 // fixed bug with casting, removed duplicate czxid usage }; - static constexpr auto CURRENT_DIGEST_VERSION = DigestVersion::V2; + static constexpr auto CURRENT_DIGEST_VERSION = DigestVersion::V3; struct ResponseForSession { @@ -113,16 +114,7 @@ public: uint64_t value{0}; }; - static bool checkDigest(const Digest & first, const Digest & second) - { - if (first.version != second.version) - return true; - - if (first.version == DigestVersion::NO_DIGEST) - return true; - - return first.value == second.value; - } + static bool checkDigest(const Digest & first, const Digest & second); static String generateDigest(const String & userdata); diff --git a/src/Disks/ObjectStorages/ObjectStorageFactory.cpp b/src/Disks/ObjectStorages/ObjectStorageFactory.cpp index ec6f7081c85..4a6bb924bdc 100644 --- a/src/Disks/ObjectStorages/ObjectStorageFactory.cpp +++ b/src/Disks/ObjectStorages/ObjectStorageFactory.cpp @@ -102,7 +102,7 @@ void checkS3Capabilities( if (s3_capabilities.support_batch_delete && !checkBatchRemove(storage, key_with_trailing_slash)) { LOG_WARNING( - &Poco::Logger::get("S3ObjectStorage"), + getLogger("S3ObjectStorage"), "Storage for disk {} does not support batch delete operations, " "so `s3_capabilities.support_batch_delete` was automatically turned off during the access check. 
" "To remove this message set `s3_capabilities.support_batch_delete` for the disk to `false`.", diff --git a/src/Disks/ObjectStorages/Web/WebObjectStorage.cpp b/src/Disks/ObjectStorages/Web/WebObjectStorage.cpp index 0223c24973e..786b23caf48 100644 --- a/src/Disks/ObjectStorages/Web/WebObjectStorage.cpp +++ b/src/Disks/ObjectStorages/Web/WebObjectStorage.cpp @@ -82,7 +82,7 @@ WebObjectStorage::loadFiles(const String & path, const std::unique_lock #include #include +#include #include #include #include diff --git a/src/IO/ReadHelpers.h b/src/IO/ReadHelpers.h index 2549b40e243..49530f4787a 100644 --- a/src/IO/ReadHelpers.h +++ b/src/IO/ReadHelpers.h @@ -38,7 +38,6 @@ #include #include #include -#include #include #include @@ -51,6 +50,7 @@ namespace DB template struct Memory; +class PeekableReadBuffer; namespace ErrorCodes { diff --git a/src/Interpreters/GatherFunctionQuantileVisitor.cpp b/src/Interpreters/GatherFunctionQuantileVisitor.cpp index 664bb9e9383..6b6dc362771 100644 --- a/src/Interpreters/GatherFunctionQuantileVisitor.cpp +++ b/src/Interpreters/GatherFunctionQuantileVisitor.cpp @@ -30,6 +30,7 @@ static const std::unordered_map quantile_fuse_name_mapping = {"quantileTDigestWeighted", "quantilesTDigestWeighted"}, {"quantileTiming", "quantilesTiming"}, {"quantileTimingWeighted", "quantilesTimingWeighted"}, + {"quantileGK", "quantilesGK"}, }; String GatherFunctionQuantileData::toFusedNameOrSelf(const String & func_name) diff --git a/src/Interpreters/GlobalSubqueriesVisitor.h b/src/Interpreters/GlobalSubqueriesVisitor.h index 08fbd748e48..5f029395df9 100644 --- a/src/Interpreters/GlobalSubqueriesVisitor.h +++ b/src/Interpreters/GlobalSubqueriesVisitor.h @@ -267,8 +267,10 @@ private: /// We don't support WITH cte as (subquery) Select table JOIN cte because we don't do conversion in AST bool is_subquery = false; if (const auto * ast_table_expr = table_elem.table_expression->as()) - is_subquery = ast_table_expr->subquery->as() != nullptr + { + is_subquery = ast_table_expr->subquery && ast_table_expr->subquery->as() != nullptr && ast_table_expr->subquery->as()->cte_name.empty(); + } else if (table_elem.table_expression->as()) is_subquery = true; diff --git a/src/Processors/Formats/Impl/CSVRowInputFormat.h b/src/Processors/Formats/Impl/CSVRowInputFormat.h index c4b3c8feb8c..fe4d4e3be08 100644 --- a/src/Processors/Formats/Impl/CSVRowInputFormat.h +++ b/src/Processors/Formats/Impl/CSVRowInputFormat.h @@ -7,6 +7,7 @@ #include #include #include +#include namespace DB diff --git a/src/Processors/Formats/Impl/JSONRowInputFormat.cpp b/src/Processors/Formats/Impl/JSONRowInputFormat.cpp index f78ce530ecb..23faa057715 100644 --- a/src/Processors/Formats/Impl/JSONRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/JSONRowInputFormat.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include namespace DB diff --git a/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.h b/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.h index 00a270e9611..32abd532a52 100644 --- a/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.h +++ b/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.h @@ -4,6 +4,7 @@ #include #include #include +#include namespace DB diff --git a/src/Processors/Formats/RowInputFormatWithNamesAndTypes.cpp b/src/Processors/Formats/RowInputFormatWithNamesAndTypes.cpp index 478ce41f924..2ad6a825c8f 100644 --- a/src/Processors/Formats/RowInputFormatWithNamesAndTypes.cpp +++ b/src/Processors/Formats/RowInputFormatWithNamesAndTypes.cpp @@ -7,6 +7,7 @@ #include #include #include 
+#include #include diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index a9f30e6e522..3ca746a7197 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include @@ -47,7 +46,6 @@ #include #include #include -#include #include #include #include @@ -62,9 +60,7 @@ #include #include #include -#include #include -#include #include #include #include @@ -75,12 +71,10 @@ #include #include #include -#include #include #include #include #include -#include #include #include #include @@ -91,13 +85,10 @@ #include #include -#include #include #include -#include #include -#include #include #include #include diff --git a/src/Storages/MergeTree/MergeTreeDataPartCloner.cpp b/src/Storages/MergeTree/MergeTreeDataPartCloner.cpp index 04019d2c665..107e21c2dda 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartCloner.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartCloner.cpp @@ -180,7 +180,7 @@ std::pair cloneSourcePart( } LOG_DEBUG( - &Poco::Logger::get("MergeTreeDataPartCloner"), + getLogger("MergeTreeDataPartCloner"), "Clone {} part {} to {}{}", src_flushed_tmp_part ? "flushed" : "", src_part_storage->getFullPath(), diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp index 61f6b7fe052..ac80ded5792 100644 --- a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp +++ b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp @@ -637,25 +637,31 @@ void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(ProcessingNodeHolder "this could be a result of expired zookeeper session", path); } + void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPtr holder) +{ + auto processed_node_path = isShardedProcessing() + ? zookeeper_processed_path / toString(getProcessingIdForPath(holder->path)) + : zookeeper_processed_path; + + return setFileProcessedForOrderedModeImpl(holder->path, holder, processed_node_path); +} + +void S3QueueFilesMetadata::setFileProcessedForOrderedModeImpl( + const std::string & path, ProcessingNodeHolderPtr holder, const std::string & processed_node_path) { /// Update a persistent node in /processed and remove ephemeral node from /processing. - const auto & path = holder->path; const auto node_name = getNodeName(path); const auto node_metadata = createNodeMetadata(path).toString(); const auto zk_client = getZooKeeper(); - auto processed_node = isShardedProcessing() - ? zookeeper_processed_path / toString(getProcessingIdForPath(path)) - : zookeeper_processed_path; - - LOG_TEST(log, "Setting file `{}` as processed", path); + LOG_TEST(log, "Setting file `{}` as processed (at {})", path, processed_node_path); while (true) { std::string res; Coordination::Stat stat; - bool exists = zk_client->tryGet(processed_node, res, &stat); + bool exists = zk_client->tryGet(processed_node_path, res, &stat); Coordination::Requests requests; if (exists) { @@ -664,39 +670,41 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPt auto metadata = NodeMetadata::fromString(res); if (metadata.file_path >= path) { - /// Here we get in the case that maximum processed file is bigger than ours. 
- /// This is possible to achieve in case of parallel processing - /// but for local processing we explicitly disable parallel mode and do everything in a single thread - /// (see constructor of StorageS3Queue where s3queue_processing_threads_num is explicitly set to 1 in case of Ordered mode). - /// Nevertheless, in case of distributed processing we cannot do anything with parallelism. - /// What this means? - /// It means that in scenario "distributed processing + Ordered mode" - /// a setting s3queue_loading_retries will not work. It is possible to fix, it is in TODO. - - /// Return because there is nothing to change, - /// the max processed file is already bigger than ours. + LOG_TRACE(log, "File {} is already processed, current max processed file: {}", path, metadata.file_path); return; } } - requests.push_back(zkutil::makeSetRequest(processed_node, node_metadata, stat.version)); + requests.push_back(zkutil::makeSetRequest(processed_node_path, node_metadata, stat.version)); } else { - requests.push_back(zkutil::makeCreateRequest(processed_node, node_metadata, zkutil::CreateMode::Persistent)); + requests.push_back(zkutil::makeCreateRequest(processed_node_path, node_metadata, zkutil::CreateMode::Persistent)); } Coordination::Responses responses; - if (holder->remove(&requests, &responses)) + if (holder) { - LOG_TEST(log, "Moved file `{}` to processed", path); - if (max_loading_retries) - zk_client->tryRemove(zookeeper_failed_path / (node_name + ".retriable"), -1); - return; + if (holder->remove(&requests, &responses)) + { + LOG_TEST(log, "Moved file `{}` to processed", path); + if (max_loading_retries) + zk_client->tryRemove(zookeeper_failed_path / (node_name + ".retriable"), -1); + return; + } + } + else + { + auto code = zk_client->tryMulti(requests, responses); + if (code == Coordination::Error::ZOK) + return; } /// Failed to update max processed node, retry. if (!responses.empty() && responses[0]->error != Coordination::Error::ZOK) + { + LOG_TRACE(log, "Failed to update processed node ({}). 
Will retry.", magic_enum::enum_name(responses[0]->error)); continue; + } LOG_WARNING(log, "Cannot set file ({}) as processed since processing node " "does not exist with expected processing id does not exist, " @@ -705,6 +713,22 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPt } } +void S3QueueFilesMetadata::setFileProcessed(const std::string & path, size_t shard_id) +{ + if (mode != S3QueueMode::ORDERED) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Can set file as preprocessed only for Ordered mode"); + + if (isShardedProcessing()) + { + for (const auto & processor : getProcessingIdsForShard(shard_id)) + setFileProcessedForOrderedModeImpl(path, nullptr, zookeeper_processed_path / toString(processor)); + } + else + { + setFileProcessedForOrderedModeImpl(path, nullptr, zookeeper_processed_path); + } +} + void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const String & exception_message) { auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileFailedMicroseconds); diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.h b/src/Storages/S3Queue/S3QueueFilesMetadata.h index c83c6f20b92..9301ea7ceb8 100644 --- a/src/Storages/S3Queue/S3QueueFilesMetadata.h +++ b/src/Storages/S3Queue/S3QueueFilesMetadata.h @@ -42,6 +42,7 @@ public: ~S3QueueFilesMetadata(); void setFileProcessed(ProcessingNodeHolderPtr holder); + void setFileProcessed(const std::string & path, size_t shard_id); void setFileFailed(ProcessingNodeHolderPtr holder, const std::string & exception_message); @@ -141,6 +142,9 @@ private: void setFileProcessedForUnorderedMode(ProcessingNodeHolderPtr holder); std::string getZooKeeperPathForShard(size_t shard_id) const; + void setFileProcessedForOrderedModeImpl( + const std::string & path, ProcessingNodeHolderPtr holder, const std::string & processed_node_path); + enum class SetFileProcessingResult { Success, diff --git a/src/Storages/S3Queue/S3QueueSettings.h b/src/Storages/S3Queue/S3QueueSettings.h index d65b38f77f2..c26e973a1c0 100644 --- a/src/Storages/S3Queue/S3QueueSettings.h +++ b/src/Storages/S3Queue/S3QueueSettings.h @@ -22,6 +22,7 @@ class ASTStorage; M(UInt32, s3queue_loading_retries, 0, "Retry loading up to specified number of times", 0) \ M(UInt32, s3queue_processing_threads_num, 1, "Number of processing threads", 0) \ M(UInt32, s3queue_enable_logging_to_s3queue_log, 1, "Enable logging to system table system.s3queue_log", 0) \ + M(String, s3queue_last_processed_path, "", "For Ordered mode. Files that have lexicographically smaller file name are considered already processed", 0) \ M(UInt32, s3queue_tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \ M(UInt32, s3queue_polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \ M(UInt32, s3queue_polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \ diff --git a/src/Storages/S3Queue/StorageS3Queue.cpp b/src/Storages/S3Queue/StorageS3Queue.cpp index 23ef9aec980..0723205b544 100644 --- a/src/Storages/S3Queue/StorageS3Queue.cpp +++ b/src/Storages/S3Queue/StorageS3Queue.cpp @@ -155,20 +155,20 @@ StorageS3Queue::StorageS3Queue( LOG_INFO(log, "Using zookeeper path: {}", zk_path.string()); task = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); }); - /// Get metadata manager from S3QueueMetadataFactory, - /// it will increase the ref count for the metadata object. 
- /// The ref count is decreased when StorageS3Queue::drop() method is called. - files_metadata = S3QueueMetadataFactory::instance().getOrCreate(zk_path, *s3queue_settings); try { createOrCheckMetadata(storage_metadata); } catch (...) { - S3QueueMetadataFactory::instance().remove(zk_path); throw; } + /// Get metadata manager from S3QueueMetadataFactory, + /// it will increase the ref count for the metadata object. + /// The ref count is decreased when StorageS3Queue::drop() method is called. + files_metadata = S3QueueMetadataFactory::instance().getOrCreate(zk_path, *s3queue_settings); + if (files_metadata->isShardedProcessing()) { if (!s3queue_settings->s3queue_current_shard_num.changed) @@ -181,6 +181,10 @@ StorageS3Queue::StorageS3Queue( files_metadata->registerNewShard(s3queue_settings->s3queue_current_shard_num); } } + if (s3queue_settings->mode == S3QueueMode::ORDERED && !s3queue_settings->s3queue_last_processed_path.value.empty()) + { + files_metadata->setFileProcessed(s3queue_settings->s3queue_last_processed_path.value, s3queue_settings->s3queue_current_shard_num); + } } void StorageS3Queue::startup() diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index 8b8a151fb1d..0d9e79d1d54 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 1224af4d6cb..6bd57cc4d6d 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -1,6 +1,5 @@ #include -#include #include #include @@ -29,17 +28,14 @@ #include #include #include -#include #include #include #include #include -#include #include #include #include #include -#include #include #include #include @@ -64,21 +60,16 @@ #include #include #include -#include #include -#include #include #include #include #include -#include -#include #include #include #include #include -#include #include #include @@ -106,9 +97,6 @@ #include #include -#include - -#include #include #include diff --git a/src/Storages/VirtualColumnUtils.cpp b/src/Storages/VirtualColumnUtils.cpp index 430ed012fa8..33ff6e7104f 100644 --- a/src/Storages/VirtualColumnUtils.cpp +++ b/src/Storages/VirtualColumnUtils.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include diff --git a/tests/config/config.d/keeper_port.xml b/tests/config/config.d/keeper_port.xml index b87014d2485..b724d5dd87e 100644 --- a/tests/config/config.d/keeper_port.xml +++ b/tests/config/config.d/keeper_port.xml @@ -4,6 +4,7 @@ 1 1 + 1 10000 diff --git a/tests/integration/test_storage_s3_queue/test.py b/tests/integration/test_storage_s3_queue/test.py index 5e86b798bf7..810c4f29e9d 100644 --- a/tests/integration/test_storage_s3_queue/test.py +++ b/tests/integration/test_storage_s3_queue/test.py @@ -99,6 +99,7 @@ def started_cluster(): main_configs=[ "configs/s3queue_log.xml", ], + stay_alive=True, ) logging.info("Starting cluster...") @@ -539,10 +540,7 @@ def test_multiple_tables_meta_mismatch(started_cluster): }, ) except QueryRuntimeException as e: - assert ( - "Metadata with the same `s3queue_zookeeper_path` was already created but with different settings" - in str(e) - ) + assert "Existing table metadata in ZooKeeper differs in engine mode" in str(e) failed = True assert failed is True @@ -1283,3 +1281,108 @@ def test_settings_check(started_cluster): ) node.query(f"DROP TABLE {table_name} SYNC") + + 
+@pytest.mark.parametrize("processing_threads", [1, 5]) +def test_processed_file_setting(started_cluster, processing_threads): + node = started_cluster.instances["instance"] + table_name = f"test_processed_file_setting_{processing_threads}" + dst_table_name = f"{table_name}_dst" + keeper_path = f"/clickhouse/test_{table_name}" + files_path = f"{table_name}_data" + files_to_generate = 10 + + create_table( + started_cluster, + node, + table_name, + "ordered", + files_path, + additional_settings={ + "keeper_path": keeper_path, + "s3queue_processing_threads_num": processing_threads, + "s3queue_last_processed_path": f"{files_path}/test_5.csv", + }, + ) + total_values = generate_random_files( + started_cluster, files_path, files_to_generate, start_ind=0, row_num=1 + ) + + create_mv(node, table_name, dst_table_name) + + def get_count(): + return int(node.query(f"SELECT count() FROM {dst_table_name}")) + + expected_rows = 4 + for _ in range(20): + if expected_rows == get_count(): + break + time.sleep(1) + + assert expected_rows == get_count() + + node.restart_clickhouse() + time.sleep(10) + + expected_rows = 4 + for _ in range(20): + if expected_rows == get_count(): + break + time.sleep(1) + + assert expected_rows == get_count() + + +@pytest.mark.parametrize("processing_threads", [1, 5]) +def test_processed_file_setting_distributed(started_cluster, processing_threads): + node = started_cluster.instances["instance"] + node_2 = started_cluster.instances["instance2"] + table_name = f"test_processed_file_setting_distributed_{processing_threads}" + dst_table_name = f"{table_name}_dst" + keeper_path = f"/clickhouse/test_{table_name}" + files_path = f"{table_name}_data" + files_to_generate = 10 + + for instance in [node, node_2]: + create_table( + started_cluster, + instance, + table_name, + "ordered", + files_path, + additional_settings={ + "keeper_path": keeper_path, + "s3queue_processing_threads_num": processing_threads, + "s3queue_last_processed_path": f"{files_path}/test_5.csv", + "s3queue_total_shards_num": 2, + }, + ) + + total_values = generate_random_files( + started_cluster, files_path, files_to_generate, start_ind=0, row_num=1 + ) + + for instance in [node, node_2]: + create_mv(instance, table_name, dst_table_name) + + def get_count(): + query = f"SELECT count() FROM {dst_table_name}" + return int(node.query(query)) + int(node_2.query(query)) + + expected_rows = 4 + for _ in range(20): + if expected_rows == get_count(): + break + time.sleep(1) + assert expected_rows == get_count() + + for instance in [node, node_2]: + instance.restart_clickhouse() + + time.sleep(10) + expected_rows = 4 + for _ in range(20): + if expected_rows == get_count(): + break + time.sleep(1) + assert expected_rows == get_count() diff --git a/tests/queries/0_stateless/02661_quantile_approx.reference b/tests/queries/0_stateless/02661_quantile_approx.reference index 8369363aa9b..0ee846a268b 100644 --- a/tests/queries/0_stateless/02661_quantile_approx.reference +++ b/tests/queries/0_stateless/02661_quantile_approx.reference @@ -19,6 +19,20 @@ select quantilesGK(1000, 100/1000, 200/1000, 250/1000, 314/1000, 777/1000)(numbe [99,199,249,313,776] select quantilesGK(10000, 100/1000, 200/1000, 250/1000, 314/1000, 777/1000)(number + 1) from numbers(1000); [100,200,250,314,777] +SELECT quantileGKMerge(100, 0.5)(x) +FROM +( + SELECT quantileGKState(100, 0.5)(number + 1) AS x + FROM numbers(49999) +); +24902 +SELECT quantilesGKMerge(100, 0.5, 0.9, 0.99)(x) +FROM +( + SELECT quantilesGKState(100, 0.5, 0.9, 0.99)(number + 1) AS x + 
FROM numbers(49999) +); +[24902,44518,49999] select medianGK()(number) from numbers(10) SETTINGS allow_experimental_analyzer = 0; -- { serverError BAD_ARGUMENTS } select medianGK()(number) from numbers(10) SETTINGS allow_experimental_analyzer = 1; -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH } select quantileGK()(number) from numbers(10) SETTINGS allow_experimental_analyzer = 0; -- { serverError BAD_ARGUMENTS } diff --git a/tests/queries/0_stateless/02661_quantile_approx.sql b/tests/queries/0_stateless/02661_quantile_approx.sql index 52c2979ad44..c0004260fa1 100644 --- a/tests/queries/0_stateless/02661_quantile_approx.sql +++ b/tests/queries/0_stateless/02661_quantile_approx.sql @@ -15,6 +15,19 @@ select quantilesGK(100, 100/1000, 200/1000, 250/1000, 314/1000, 777/1000)(number select quantilesGK(1000, 100/1000, 200/1000, 250/1000, 314/1000, 777/1000)(number + 1) from numbers(1000); select quantilesGK(10000, 100/1000, 200/1000, 250/1000, 314/1000, 777/1000)(number + 1) from numbers(1000); +SELECT quantileGKMerge(100, 0.5)(x) +FROM +( + SELECT quantileGKState(100, 0.5)(number + 1) AS x + FROM numbers(49999) +); + +SELECT quantilesGKMerge(100, 0.5, 0.9, 0.99)(x) +FROM +( + SELECT quantilesGKState(100, 0.5, 0.9, 0.99)(number + 1) AS x + FROM numbers(49999) +); select medianGK()(number) from numbers(10) SETTINGS allow_experimental_analyzer = 0; -- { serverError BAD_ARGUMENTS } select medianGK()(number) from numbers(10) SETTINGS allow_experimental_analyzer = 1; -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH } diff --git a/tests/queries/0_stateless/02720_row_policy_column_with_dots.reference b/tests/queries/0_stateless/02720_row_policy_column_with_dots.reference index dd2c30cc9f8..d00491fd7e5 100644 --- a/tests/queries/0_stateless/02720_row_policy_column_with_dots.reference +++ b/tests/queries/0_stateless/02720_row_policy_column_with_dots.reference @@ -1 +1 @@ -2024-01-01 Hello World +1 diff --git a/tests/queries/0_stateless/02720_row_policy_column_with_dots.sql b/tests/queries/0_stateless/02720_row_policy_column_with_dots.sql index 361bd0e0ec7..fcb0bf62859 100644 --- a/tests/queries/0_stateless/02720_row_policy_column_with_dots.sql +++ b/tests/queries/0_stateless/02720_row_policy_column_with_dots.sql @@ -1,6 +1,6 @@ -CREATE table if not exists table_with_dot_column (date Date, regular_column String, `other_column.2` String) ENGINE = MergeTree() ORDER BY date; -INSERT INTO table_with_dot_column select '2020-01-01', 'Hello', 'World'; -INSERT INTO table_with_dot_column select '2024-01-01', 'Hello', 'World'; +CREATE TABLE IF NOT EXISTS table_with_dot_column (date Date, regular_column String, `other_column.2` String) ENGINE = MergeTree() ORDER BY date; +INSERT INTO table_with_dot_column SELECT '2020-01-01', 'Hello', 'World'; +INSERT INTO table_with_dot_column SELECT toDate(now() + 48*3600), 'Hello', 'World'; CREATE ROW POLICY IF NOT EXISTS row_policy ON table_with_dot_column USING toDate(date) >= today() - 30 TO ALL; -SELECT * FROM table_with_dot_column; +SELECT count(*) FROM table_with_dot_column; DROP TABLE table_with_dot_column; diff --git a/tests/queries/0_stateless/02972_parallel_replicas_cte.reference b/tests/queries/0_stateless/02972_parallel_replicas_cte.reference index fe21e3cec22..449fe3d34e3 100644 --- a/tests/queries/0_stateless/02972_parallel_replicas_cte.reference +++ b/tests/queries/0_stateless/02972_parallel_replicas_cte.reference @@ -1,2 +1,3 @@ 990000 990000 +10 diff --git a/tests/queries/0_stateless/02972_parallel_replicas_cte.sql 
b/tests/queries/0_stateless/02972_parallel_replicas_cte.sql index d65374a3e02..c39ad172a27 100644 --- a/tests/queries/0_stateless/02972_parallel_replicas_cte.sql +++ b/tests/queries/0_stateless/02972_parallel_replicas_cte.sql @@ -19,5 +19,9 @@ WITH filtered_groups AS (SELECT a FROM pr_1 WHERE a >= 10000) SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.a SETTINGS allow_experimental_analyzer = 0, allow_experimental_parallel_reading_from_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_two_shards', max_parallel_replicas = 3; -- { serverError SUPPORT_IS_DISABLED } +-- Sanitizer +SELECT count() FROM pr_2 JOIN numbers(10) as pr_1 ON pr_2.a = pr_1.number +SETTINGS allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_two_shards', max_parallel_replicas = 3; + DROP TABLE IF EXISTS pr_1; DROP TABLE IF EXISTS pr_2;
diff --git a/utils/keeper-data-dumper/main.cpp b/utils/keeper-data-dumper/main.cpp index e06b301edbf..9e107c99534 100644 --- a/utils/keeper-data-dumper/main.cpp +++ b/utils/keeper-data-dumper/main.cpp @@ -59,7 +59,7 @@ int main(int argc, char *argv[]) Poco::Logger::root().setChannel(channel); Poco::Logger::root().setLevel("trace"); } - auto * logger = &Poco::Logger::get("keeper-dumper"); + auto logger = getLogger("keeper-dumper"); ResponsesQueue queue(std::numeric_limits<size_t>::max()); SnapshotsQueue snapshots_queue{1}; CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
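
Note on the Ordered-mode fast-forward introduced above: s3queue_last_processed_path seeds the max-processed node when the table is created, so files whose names are lexicographically smaller than the given path (and, per the test expectations, the seeded file itself) are never scheduled; that is why the tests above see 4 of 10 rows after seeding with test_5.csv. A minimal usage sketch follows; the table name, bucket URL, and keeper path are hypothetical, and only the engine name and settings come from this patch:

CREATE TABLE s3queue_ordered_demo (c1 UInt32, c2 UInt32, c3 UInt32)
ENGINE = S3Queue('https://mybucket.s3.amazonaws.com/data/*.csv', 'CSV')
SETTINGS
    mode = 'ordered',
    keeper_path = '/clickhouse/s3queue/ordered_demo',
    -- hypothetical path: everything up to and including data/test_5.csv is treated as processed
    s3queue_last_processed_path = 'data/test_5.csv';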
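
The quantilesGK tests above exercise the State/Merge combinator pair for the Greenwald-Khanna sketch over a single stream; the same pair is what enables two-level aggregation (per-shard states merged centrally). An illustrative variant, not taken from the patch, merges states built independently over two disjoint halves of the same 1..49999 range; GK results are approximate (accuracy parameter 100), so they should land close to, but not necessarily exactly on, the single-pass reference [24902,44518,49999]:

SELECT quantilesGKMerge(100, 0.5, 0.9, 0.99)(x)
FROM
(
    SELECT quantilesGKState(100, 0.5, 0.9, 0.99)(number + 1) AS x
    FROM numbers(25000)    -- values 1..25000
    UNION ALL
    SELECT quantilesGKState(100, 0.5, 0.9, 0.99)(number + 25001) AS x
    FROM numbers(24999)    -- values 25001..49999
);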