Merge branch 'master' into opt_if_map

taiyang-li 2024-02-04 15:57:06 +08:00
commit c3959611fc
63 changed files with 712 additions and 341 deletions

.gitmodules

@@ -99,7 +99,7 @@
 url = https://github.com/awslabs/aws-c-event-stream
 [submodule "aws-c-common"]
 path = contrib/aws-c-common
-url = https://github.com/ClickHouse/aws-c-common
+url = https://github.com/awslabs/aws-c-common.git
 [submodule "aws-checksums"]
 path = contrib/aws-checksums
 url = https://github.com/awslabs/aws-checksums


@@ -166,6 +166,12 @@ set (SRCS
 )

 add_library (_poco_foundation ${SRCS})

+target_link_libraries (_poco_foundation
+    PUBLIC
+        boost::headers_only
+        boost::system
+)
+
 add_library (Poco::Foundation ALIAS _poco_foundation)

 # TODO: remove these warning exclusions
# TODO: remove these warning exclusions # TODO: remove these warning exclusions


@@ -22,6 +22,9 @@
 #include <cstddef>
 #include <map>
 #include <vector>
+#include <boost/smart_ptr/intrusive_ptr.hpp>
 #include "Poco/Channel.h"
 #include "Poco/Format.h"
 #include "Poco/Foundation.h"

@@ -34,7 +37,7 @@ namespace Poco
 class Exception;
 class Logger;
-using LoggerPtr = std::shared_ptr<Logger>;
+using LoggerPtr = boost::intrusive_ptr<Logger>;

 class Foundation_API Logger : public Channel
 /// Logger is a special Channel that acts as the main

@@ -871,21 +874,11 @@ public:
     /// If the Logger does not yet exist, it is created, based
     /// on its parent logger.

-    static LoggerPtr getShared(const std::string & name);
+    static LoggerPtr getShared(const std::string & name, bool should_be_owned_by_shared_ptr_if_created = true);
     /// Returns a shared pointer to the Logger with the given name.
     /// If the Logger does not yet exist, it is created, based
     /// on its parent logger.

-    static Logger & unsafeGet(const std::string & name);
-    /// Returns a reference to the Logger with the given name.
-    /// If the Logger does not yet exist, it is created, based
-    /// on its parent logger.
-    ///
-    /// WARNING: This method is not thread safe. You should
-    /// probably use get() instead.
-    /// The only time this method should be used is during
-    /// program initialization, when only one thread is running.
-
     static Logger & create(const std::string & name, Channel * pChannel, int level = Message::PRIO_INFORMATION);
     /// Creates and returns a reference to a Logger with the
     /// given name. The Logger's Channel and log level as set as

@@ -932,6 +925,16 @@ public:
     static const std::string ROOT; /// The name of the root logger ("").

+public:
+    struct LoggerEntry
+    {
+        Poco::Logger * logger;
+        bool owned_by_shared_ptr = false;
+    };
+
+    using LoggerMap = std::unordered_map<std::string, LoggerEntry>;
+    using LoggerMapIterator = LoggerMap::iterator;
+
 protected:
     Logger(const std::string & name, Channel * pChannel, int level);
     ~Logger();

@@ -940,12 +943,19 @@ protected:
     void log(const std::string & text, Message::Priority prio, const char * file, int line);
     static std::string format(const std::string & fmt, int argc, std::string argv[]);

-    static Logger & unsafeCreate(const std::string & name, Channel * pChannel, int level = Message::PRIO_INFORMATION);
-    static Logger & parent(const std::string & name);
-    static void add(Logger * pLogger);
-    static Logger * find(const std::string & name);
-
 private:
+    static std::pair<Logger::LoggerMapIterator, bool> unsafeGet(const std::string & name, bool get_shared);
+    static Logger * unsafeGetRawPtr(const std::string & name);
+    static std::pair<LoggerMapIterator, bool> unsafeCreate(const std::string & name, Channel * pChannel, int level = Message::PRIO_INFORMATION);
+    static Logger & parent(const std::string & name);
+    static std::pair<LoggerMapIterator, bool> add(Logger * pLogger);
+    static std::optional<LoggerMapIterator> find(const std::string & name);
+    static Logger * findRawPtr(const std::string & name);
+
+    friend void intrusive_ptr_add_ref(Logger * ptr);
+    friend void intrusive_ptr_release(Logger * ptr);
+
     Logger();
     Logger(const Logger &);
     Logger & operator=(const Logger &);
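The switch from `std::shared_ptr<Logger>` to `boost::intrusive_ptr<Logger>` relies on two free functions that Boost finds via argument-dependent lookup, which is why the header declares them as friends. A minimal sketch of that contract, using a hypothetical stand-in type rather than Poco's Logger:

```cpp
#include <atomic>
#include <cstddef>
#include <boost/smart_ptr/intrusive_ptr.hpp>

// Stand-in for a reference-counted class such as Poco::Logger.
struct Traced
{
    std::atomic<size_t> counter{1}; // starts owned by its creator
};

// boost::intrusive_ptr finds these via argument-dependent lookup.
void intrusive_ptr_add_ref(Traced * p) { p->counter.fetch_add(1, std::memory_order_relaxed); }
void intrusive_ptr_release(Traced * p)
{
    // fetch_sub returns the previous value: the caller that sees 1 held the last reference.
    if (p->counter.fetch_sub(1, std::memory_order_acq_rel) == 1)
        delete p;
}

int main()
{
    auto * raw = new Traced;                                     // counter == 1
    boost::intrusive_ptr<Traced> owner(raw, /*add_ref=*/ false); // adopts the existing reference
    boost::intrusive_ptr<Traced> copy = owner;                   // counter == 2
}   // both pointers release; the last one deletes raw
```

The two-argument constructor with `add_ref = false` is the same idiom `makeLoggerPtr` uses below: the pointer adopts a reference the map already holds instead of taking a new one.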


@@ -53,11 +53,10 @@ protected:
     virtual ~RefCountedObject();
     /// Destroys the RefCountedObject.

+    mutable std::atomic<size_t> _counter;
+
 private:
     RefCountedObject(const RefCountedObject &);
     RefCountedObject & operator=(const RefCountedObject &);
-
-    mutable std::atomic<size_t> _counter;
 };


@@ -38,14 +38,7 @@ std::mutex & getLoggerMutex()
     return *logger_mutex;
 }

-struct LoggerEntry
-{
-    Poco::Logger * logger;
-    bool owned_by_shared_ptr = false;
-};
-
-using LoggerMap = std::unordered_map<std::string, LoggerEntry>;
-LoggerMap * _pLoggerMap = nullptr;
+Poco::Logger::LoggerMap * _pLoggerMap = nullptr;

 }
@@ -309,38 +302,9 @@ void Logger::formatDump(std::string& message, const void* buffer, std::size_t le
 namespace
 {

-struct LoggerDeleter
-{
-    void operator()(Poco::Logger * logger)
-    {
-        std::lock_guard<std::mutex> lock(getLoggerMutex());
-
-        /// If logger infrastructure is destroyed just decrement logger reference count
-        if (!_pLoggerMap)
-        {
-            logger->release();
-            return;
-        }
-
-        auto it = _pLoggerMap->find(logger->name());
-        assert(it != _pLoggerMap->end());
-
-        /** If reference count is 1, this means this shared pointer owns logger
-          * and need destroy it.
-          */
-        size_t reference_count_before_release = logger->release();
-        if (reference_count_before_release == 1)
-        {
-            assert(it->second.owned_by_shared_ptr);
-            _pLoggerMap->erase(it);
-        }
-    }
-};
-
 inline LoggerPtr makeLoggerPtr(Logger & logger)
 {
-    return std::shared_ptr<Logger>(&logger, LoggerDeleter());
+    return LoggerPtr(&logger, false /*add_ref*/);
 }

 }
@@ -350,64 +314,87 @@ Logger& Logger::get(const std::string& name)
 {
     std::lock_guard<std::mutex> lock(getLoggerMutex());

-    Logger & logger = unsafeGet(name);
-
-    /** If there are already shared pointer created for this logger
-      * we need to increment Logger reference count and now logger
-      * is owned by logger infrastructure.
-      */
-    auto it = _pLoggerMap->find(name);
-    if (it->second.owned_by_shared_ptr)
-    {
-        it->second.logger->duplicate();
-        it->second.owned_by_shared_ptr = false;
-    }
-
-    return logger;
+    auto [it, inserted] = unsafeGet(name, false /*get_shared*/);
+    return *it->second.logger;
 }

-LoggerPtr Logger::getShared(const std::string & name)
+LoggerPtr Logger::getShared(const std::string & name, bool should_be_owned_by_shared_ptr_if_created)
 {
     std::lock_guard<std::mutex> lock(getLoggerMutex());
-    bool logger_exists = _pLoggerMap && _pLoggerMap->contains(name);
+    auto [it, inserted] = unsafeGet(name, true /*get_shared*/);

-    Logger & logger = unsafeGet(name);
-
-    /** If logger already exists, then this shared pointer does not own it.
-      * If logger does not exists, logger infrastructure could be already destroyed
-      * or logger was created.
+    /** If during `unsafeGet` logger was created, then this shared pointer owns it.
+      * If logger was already created, then this shared pointer does not own it.
       */
-    if (logger_exists)
+    if (inserted)
     {
-        logger.duplicate();
-    }
-    else if (_pLoggerMap)
-    {
-        _pLoggerMap->find(name)->second.owned_by_shared_ptr = true;
+        if (should_be_owned_by_shared_ptr_if_created)
+            it->second.owned_by_shared_ptr = true;
+        else
+            it->second.logger->duplicate();
     }

-    return makeLoggerPtr(logger);
+    return makeLoggerPtr(*it->second.logger);
 }

-Logger& Logger::unsafeGet(const std::string& name)
+std::pair<Logger::LoggerMapIterator, bool> Logger::unsafeGet(const std::string& name, bool get_shared)
 {
-    Logger* pLogger = find(name);
-    if (!pLogger)
+    std::optional<Logger::LoggerMapIterator> optional_logger_it = find(name);
+
+    bool should_recreate_logger = false;
+
+    if (optional_logger_it)
     {
+        auto & logger_it = *optional_logger_it;
+        std::optional<size_t> reference_count_before;
+
+        if (get_shared)
+        {
+            reference_count_before = logger_it->second.logger->duplicate();
+        }
+        else if (logger_it->second.owned_by_shared_ptr)
+        {
+            reference_count_before = logger_it->second.logger->duplicate();
+            logger_it->second.owned_by_shared_ptr = false;
+        }
+
+        /// Other thread already decided to delete this logger, but did not yet remove it from map
+        if (reference_count_before && reference_count_before == 0)
+            should_recreate_logger = true;
+    }
+
+    if (!optional_logger_it || should_recreate_logger)
+    {
+        Logger * logger = nullptr;
+
         if (name == ROOT)
         {
-            pLogger = new Logger(name, 0, Message::PRIO_INFORMATION);
+            logger = new Logger(name, nullptr, Message::PRIO_INFORMATION);
         }
         else
         {
             Logger& par = parent(name);
-            pLogger = new Logger(name, par.getChannel(), par.getLevel());
+            logger = new Logger(name, par.getChannel(), par.getLevel());
         }
-        add(pLogger);
+
+        if (should_recreate_logger)
+        {
+            (*optional_logger_it)->second.logger = logger;
+            return std::make_pair(*optional_logger_it, true);
+        }
+
+        return add(logger);
     }
-    return *pLogger;
+
+    return std::make_pair(*optional_logger_it, false);
+}
+
+Logger * Logger::unsafeGetRawPtr(const std::string & name)
+{
+    return unsafeGet(name, false /*get_shared*/).first->second.logger;
 }
@@ -415,24 +402,24 @@ Logger& Logger::create(const std::string& name, Channel* pChannel, int level)
 {
     std::lock_guard<std::mutex> lock(getLoggerMutex());

-    return unsafeCreate(name, pChannel, level);
+    return *unsafeCreate(name, pChannel, level).first->second.logger;
 }

 LoggerPtr Logger::createShared(const std::string & name, Channel * pChannel, int level)
 {
     std::lock_guard<std::mutex> lock(getLoggerMutex());

-    Logger & logger = unsafeCreate(name, pChannel, level);
-    _pLoggerMap->find(name)->second.owned_by_shared_ptr = true;
+    auto [it, inserted] = unsafeCreate(name, pChannel, level);
+    it->second.owned_by_shared_ptr = true;

-    return makeLoggerPtr(logger);
+    return makeLoggerPtr(*it->second.logger);
 }

 Logger& Logger::root()
 {
     std::lock_guard<std::mutex> lock(getLoggerMutex());

-    return unsafeGet(ROOT);
+    return *unsafeGetRawPtr(ROOT);
 }
@@ -440,7 +427,11 @@ Logger* Logger::has(const std::string& name)
 {
     std::lock_guard<std::mutex> lock(getLoggerMutex());

-    return find(name);
+    auto optional_it = find(name);
+    if (!optional_it)
+        return nullptr;
+
+    return (*optional_it)->second.logger;
 }
@@ -459,20 +450,69 @@ void Logger::shutdown()
         }

         delete _pLoggerMap;
-        _pLoggerMap = 0;
+        _pLoggerMap = nullptr;
     }
 }

-Logger* Logger::find(const std::string& name)
+std::optional<Logger::LoggerMapIterator> Logger::find(const std::string& name)
 {
     if (_pLoggerMap)
     {
         LoggerMap::iterator it = _pLoggerMap->find(name);
         if (it != _pLoggerMap->end())
-            return it->second.logger;
+            return it;
+
+        return {};
     }
-    return 0;
+
+    return {};
+}
+
+Logger * Logger::findRawPtr(const std::string & name)
+{
+    auto optional_it = find(name);
+    if (!optional_it)
+        return nullptr;
+
+    return (*optional_it)->second.logger;
+}
+
+void intrusive_ptr_add_ref(Logger * ptr)
+{
+    ptr->duplicate();
+}
+
+void intrusive_ptr_release(Logger * ptr)
+{
+    size_t reference_count_before = ptr->_counter.fetch_sub(1, std::memory_order_acq_rel);
+    if (reference_count_before != 1)
+        return;
+
+    {
+        std::lock_guard<std::mutex> lock(getLoggerMutex());
+
+        if (_pLoggerMap)
+        {
+            auto it = _pLoggerMap->find(ptr->name());
+
+            /** It is possible that during release other thread created logger and
+              * updated iterator in map.
+              */
+            if (it != _pLoggerMap->end() && ptr == it->second.logger)
+            {
+                /** If reference count is 0, this means this intrusive pointer owns logger
+                  * and need destroy it.
+                  */
+                assert(it->second.owned_by_shared_ptr);
+                _pLoggerMap->erase(it);
+            }
+        }
+    }
+
+    delete ptr;
 }
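A note on the `fetch_sub` idiom in `intrusive_ptr_release` above: it returns the counter's value before the decrement, so exactly one thread can observe 1 and take the deletion branch; the `ptr == it->second.logger` check then guards against another thread having re-created the logger between the decrement and the lock. A minimal demonstration of the fetch_sub semantics:

```cpp
#include <atomic>
#include <cassert>

int main()
{
    std::atomic<size_t> counter{2};
    // fetch_sub returns the value *before* the decrement.
    assert(counter.fetch_sub(1, std::memory_order_acq_rel) == 2); // another owner remains, do nothing
    assert(counter.fetch_sub(1, std::memory_order_acq_rel) == 1); // last owner: safe to destroy here
}
```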
@@ -490,28 +530,28 @@ void Logger::names(std::vector<std::string>& names)
     }
 }

-Logger& Logger::unsafeCreate(const std::string & name, Channel * pChannel, int level)
+std::pair<Logger::LoggerMapIterator, bool> Logger::unsafeCreate(const std::string & name, Channel * pChannel, int level)
 {
     if (find(name)) throw ExistsException();
     Logger* pLogger = new Logger(name, pChannel, level);
-    add(pLogger);
-
-    return *pLogger;
+    return add(pLogger);
 }

 Logger& Logger::parent(const std::string& name)
 {
     std::string::size_type pos = name.rfind('.');
     if (pos != std::string::npos)
     {
         std::string pname = name.substr(0, pos);
-        Logger* pParent = find(pname);
+        Logger* pParent = findRawPtr(pname);
         if (pParent)
             return *pParent;
         else
             return parent(pname);
     }
-    else return unsafeGet(ROOT);
+    else return *unsafeGetRawPtr(ROOT);
 }

@@ -579,12 +619,14 @@ namespace
     }
 }

-void Logger::add(Logger* pLogger)
+std::pair<Logger::LoggerMapIterator, bool> Logger::add(Logger* pLogger)
 {
     if (!_pLoggerMap)
-        _pLoggerMap = new LoggerMap;
+        _pLoggerMap = new Logger::LoggerMap;

-    _pLoggerMap->emplace(pLogger->name(), LoggerEntry{pLogger, false /*owned_by_shared_ptr*/});
+    auto result = _pLoggerMap->emplace(pLogger->name(), LoggerEntry{pLogger, false /*owned_by_shared_ptr*/});
+    assert(result.second);
+
+    return result;
 }

contrib/aws

@@ -1 +1 @@
-Subproject commit ca02358dcc7ce3ab733dd4cbcc32734eecfa4ee3
+Subproject commit 4ec215f3607c2111bf2cc91ba842046a6b5eb0c4

contrib/aws-c-auth

@@ -1 +1 @@
-Subproject commit 97133a2b5dbca1ccdf88cd6f44f39d0531d27d12
+Subproject commit baeffa791d9d1cf61460662a6d9ac2186aaf05df

contrib/aws-c-cal

@@ -1 +1 @@
-Subproject commit 85dd7664b786a389c6fb1a6f031ab4bb2282133d
+Subproject commit 9453687ff5493ba94eaccf8851200565c4364c77

@@ -1 +1 @@
-Subproject commit 45dcb2849c891dba2100b270b4676765c92949ff
+Subproject commit 80f21b3cac5ac51c6b8a62c7d2a5ef58a75195ee

@@ -1 +1 @@
-Subproject commit b517b7decd0dac30be2162f5186c250221c53aff
+Subproject commit 99ec79ee2970f1a045d4ced1501b97ee521f2f85

@@ -1 +1 @@
-Subproject commit 2f9b60c42f90840ec11822acda3d8cdfa97a773d
+Subproject commit 08f24e384e5be20bcffa42b49213d24dad7881ae

contrib/aws-c-http

@@ -1 +1 @@
-Subproject commit dd34461987947672444d0bc872c5a733dfdb9711
+Subproject commit a082f8a2067e4a31db73f1d4ffd702a8dc0f7089

contrib/aws-c-io

@@ -1 +1 @@
-Subproject commit d58ed4f272b1cb4f89ac9196526ceebe5f2b0d89
+Subproject commit 11ce3c750a1dac7b04069fc5bff89e97e91bad4d

contrib/aws-c-mqtt

@@ -1 +1 @@
-Subproject commit 33c3455cec82b16feb940e12006cefd7b3ef4194
+Subproject commit 6d36cd3726233cb757468d0ea26f6cd8dad151ec

contrib/aws-c-s3

@@ -1 +1 @@
-Subproject commit d7bfe602d6925948f1fff95784e3613cca6a3900
+Subproject commit de36fee8fe7ab02f10987877ae94a805bf440c1f

@@ -1 +1 @@
-Subproject commit 208a701fa01e99c7c8cc3dcebc8317da71362972
+Subproject commit fd8c0ba2e233997eaaefe82fb818b8b444b956d3

@@ -1 +1 @@
-Subproject commit ad53be196a25bbefa3700a01187fdce573a7d2d0
+Subproject commit 321b805559c8e911be5bddba13fcbd222a3e2d3a


@@ -25,6 +25,7 @@ include("${ClickHouse_SOURCE_DIR}/contrib/aws-cmake/AwsFeatureTests.cmake")
 include("${ClickHouse_SOURCE_DIR}/contrib/aws-cmake/AwsThreadAffinity.cmake")
 include("${ClickHouse_SOURCE_DIR}/contrib/aws-cmake/AwsThreadName.cmake")
 include("${ClickHouse_SOURCE_DIR}/contrib/aws-cmake/AwsSIMD.cmake")
+include("${ClickHouse_SOURCE_DIR}/contrib/aws-crt-cpp/cmake/AwsGetVersion.cmake")

 # Gather sources and options.

@@ -35,6 +36,8 @@ set(AWS_PUBLIC_COMPILE_DEFS)
 set(AWS_PRIVATE_COMPILE_DEFS)
 set(AWS_PRIVATE_LIBS)

+list(APPEND AWS_PRIVATE_COMPILE_DEFS "-DINTEL_NO_ITTNOTIFY_API")
+
 if (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG")
     list(APPEND AWS_PRIVATE_COMPILE_DEFS "-DDEBUG_BUILD")
 endif()

@@ -85,14 +88,20 @@ file(GLOB AWS_SDK_CORE_SRC
     "${AWS_SDK_CORE_DIR}/source/external/cjson/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/external/tinyxml2/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/http/*.cpp"
+    "${AWS_SDK_CORE_DIR}/source/http/crt/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/http/standard/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/internal/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/monitoring/*.cpp"
+    "${AWS_SDK_CORE_DIR}/source/net/*.cpp"
+    "${AWS_SDK_CORE_DIR}/source/net/linux-shared/*.cpp"
+    "${AWS_SDK_CORE_DIR}/source/platform/linux-shared/*.cpp"
+    "${AWS_SDK_CORE_DIR}/source/smithy/tracing/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/utils/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/utils/base64/*.cpp"
+    "${AWS_SDK_CORE_DIR}/source/utils/component-registry/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/utils/crypto/*.cpp"
-    "${AWS_SDK_CORE_DIR}/source/utils/crypto/openssl/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/utils/crypto/factory/*.cpp"
+    "${AWS_SDK_CORE_DIR}/source/utils/crypto/openssl/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/utils/event/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/utils/json/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/utils/logging/*.cpp"

@@ -115,9 +124,8 @@ OPTION(USE_AWS_MEMORY_MANAGEMENT "Aws memory management" OFF)
 configure_file("${AWS_SDK_CORE_DIR}/include/aws/core/SDKConfig.h.in"
     "${CMAKE_CURRENT_BINARY_DIR}/include/aws/core/SDKConfig.h" @ONLY)

-list(APPEND AWS_PUBLIC_COMPILE_DEFS "-DAWS_SDK_VERSION_MAJOR=1")
-list(APPEND AWS_PUBLIC_COMPILE_DEFS "-DAWS_SDK_VERSION_MINOR=10")
-list(APPEND AWS_PUBLIC_COMPILE_DEFS "-DAWS_SDK_VERSION_PATCH=36")
+aws_get_version(AWS_CRT_CPP_VERSION_MAJOR AWS_CRT_CPP_VERSION_MINOR AWS_CRT_CPP_VERSION_PATCH FULL_VERSION GIT_HASH)
+configure_file("${AWS_CRT_DIR}/include/aws/crt/Config.h.in" "${AWS_CRT_DIR}/include/aws/crt/Config.h" @ONLY)

 list(APPEND AWS_SOURCES ${AWS_SDK_CORE_SRC} ${AWS_SDK_CORE_NET_SRC} ${AWS_SDK_CORE_PLATFORM_SRC})

@@ -176,6 +184,7 @@ file(GLOB AWS_COMMON_SRC
     "${AWS_COMMON_DIR}/source/*.c"
     "${AWS_COMMON_DIR}/source/external/*.c"
     "${AWS_COMMON_DIR}/source/posix/*.c"
+    "${AWS_COMMON_DIR}/source/linux/*.c"
 )

 file(GLOB AWS_COMMON_ARCH_SRC
file(GLOB AWS_COMMON_ARCH_SRC file(GLOB AWS_COMMON_ARCH_SRC

contrib/aws-crt-cpp

@@ -1 +1 @@
-Subproject commit 8a301b7e842f1daed478090c869207300972379f
+Subproject commit f532d6abc0d2b0d8b5d6fe9e7c51eaedbe4afbd0

contrib/aws-s2n-tls

@@ -1 +1 @@
-Subproject commit 71f4794b7580cf780eb4aca77d69eded5d3c7bb4
+Subproject commit 9a1e75454023e952b366ce1eab9c54007250119f

contrib/libxml2

@@ -1 +1 @@
-Subproject commit 8292f361458fcffe0bff515a385be02e9d35582c
+Subproject commit 223cb03a5d27b1b2393b266a8657443d046139d6


@@ -21,7 +21,7 @@ extern "C" {
 * your library and includes mismatch
 */
 #ifndef LIBXML2_COMPILING_MSCCDEF
-XMLPUBFUN void xmlCheckVersion(int version);
+XMLPUBFUN void XMLCALL xmlCheckVersion(int version);
 #endif /* LIBXML2_COMPILING_MSCCDEF */

 /**
@@ -29,28 +29,28 @@ XMLPUBFUN void xmlCheckVersion(int version);
 *
 * the version string like "1.2.3"
 */
-#define LIBXML_DOTTED_VERSION "2.12.4"
+#define LIBXML_DOTTED_VERSION "2.10.3"

 /**
 * LIBXML_VERSION:
 *
 * the version number: 1.2.3 value is 10203
 */
-#define LIBXML_VERSION 21204
+#define LIBXML_VERSION 21003

 /**
 * LIBXML_VERSION_STRING:
 *
 * the version number string, 1.2.3 value is "10203"
 */
-#define LIBXML_VERSION_STRING "21204"
+#define LIBXML_VERSION_STRING "21003"

 /**
 * LIBXML_VERSION_EXTRA:
 *
 * extra version information, used to show a git commit description
 */
-#define LIBXML_VERSION_EXTRA "-GITv2.12.4"
+#define LIBXML_VERSION_EXTRA ""

 /**
 * LIBXML_TEST_VERSION:
@@ -58,7 +58,7 @@ XMLPUBFUN void xmlCheckVersion(int version);
 * Macro to check that the libxml version in use is compatible with
 * the version the software has been compiled against
 */
-#define LIBXML_TEST_VERSION xmlCheckVersion(21204);
+#define LIBXML_TEST_VERSION xmlCheckVersion(21003);

 #ifndef VMS
 #if 0
@@ -270,7 +270,7 @@ XMLPUBFUN void xmlCheckVersion(int version);
 *
 * Whether iconv support is available
 */
-#if 1
+#if 0
 #define LIBXML_ICONV_ENABLED
 #endif
@@ -313,7 +313,7 @@ XMLPUBFUN void xmlCheckVersion(int version);
 /**
 * LIBXML_DEBUG_RUNTIME:
 *
-* Removed
+* Whether the runtime debugging is configured in
 */
 #if 0
 #define LIBXML_DEBUG_RUNTIME
@@ -409,7 +409,12 @@ XMLPUBFUN void xmlCheckVersion(int version);
 #endif

 #ifdef __GNUC__
-/** DOC_DISABLE */
+/**
+ * ATTRIBUTE_UNUSED:
+ *
+ * Macro used to signal to GCC unused function parameters
+ */
 #ifndef ATTRIBUTE_UNUSED
 # if ((__GNUC__ > 2) || ((__GNUC__ == 2) && (__GNUC_MINOR__ >= 7)))
@@ -419,6 +424,12 @@ XMLPUBFUN void xmlCheckVersion(int version);
 # endif
 #endif

+/**
+ * LIBXML_ATTR_ALLOC_SIZE:
+ *
+ * Macro used to indicate to GCC this is an allocator function
+ */
 #ifndef LIBXML_ATTR_ALLOC_SIZE
 # if (!defined(__clang__) && ((__GNUC__ > 4) || ((__GNUC__ == 4) && (__GNUC_MINOR__ >= 3))))
 # define LIBXML_ATTR_ALLOC_SIZE(x) __attribute__((alloc_size(x)))
@@ -429,6 +440,12 @@ XMLPUBFUN void xmlCheckVersion(int version);
 # define LIBXML_ATTR_ALLOC_SIZE(x)
 #endif

+/**
+ * LIBXML_ATTR_FORMAT:
+ *
+ * Macro used to indicate to GCC the parameter are printf like
+ */
 #ifndef LIBXML_ATTR_FORMAT
 # if ((__GNUC__ > 3) || ((__GNUC__ == 3) && (__GNUC_MINOR__ >= 3)))
 # define LIBXML_ATTR_FORMAT(fmt,args) __attribute__((__format__(__printf__,fmt,args)))
@@ -440,69 +457,44 @@ XMLPUBFUN void xmlCheckVersion(int version);
 #endif

 #ifndef XML_DEPRECATED
-# if defined (IN_LIBXML) || (__GNUC__ * 100 + __GNUC_MINOR__ < 301)
+# ifdef IN_LIBXML
 # define XML_DEPRECATED
-/* Available since at least GCC 3.1 */
 # else
+/* Available since at least GCC 3.1 */
 # define XML_DEPRECATED __attribute__((deprecated))
 # endif
 #endif

-#if defined(__clang__) || (__GNUC__ * 100 + __GNUC_MINOR__ >= 406)
-#if defined(__clang__) || (__GNUC__ * 100 + __GNUC_MINOR__ >= 800)
-#define XML_IGNORE_FPTR_CAST_WARNINGS \
-    _Pragma("GCC diagnostic push") \
-    _Pragma("GCC diagnostic ignored \"-Wpedantic\"") \
-    _Pragma("GCC diagnostic ignored \"-Wcast-function-type\"")
-#else
-#define XML_IGNORE_FPTR_CAST_WARNINGS \
-    _Pragma("GCC diagnostic push") \
-    _Pragma("GCC diagnostic ignored \"-Wpedantic\"")
-#endif
-#define XML_POP_WARNINGS \
-    _Pragma("GCC diagnostic pop")
-#else
-#define XML_IGNORE_FPTR_CAST_WARNINGS
-#define XML_POP_WARNINGS
-#endif
-
 #else /* ! __GNUC__ */
+/**
+ * ATTRIBUTE_UNUSED:
+ *
+ * Macro used to signal to GCC unused function parameters
+ */
 #define ATTRIBUTE_UNUSED
+/**
+ * LIBXML_ATTR_ALLOC_SIZE:
+ *
+ * Macro used to indicate to GCC this is an allocator function
+ */
 #define LIBXML_ATTR_ALLOC_SIZE(x)
+/**
+ * LIBXML_ATTR_FORMAT:
+ *
+ * Macro used to indicate to GCC the parameter are printf like
+ */
 #define LIBXML_ATTR_FORMAT(fmt,args)
+/**
+ * XML_DEPRECATED:
+ *
+ * Macro used to indicate that a function, variable, type or struct member
+ * is deprecated.
+ */
 #ifndef XML_DEPRECATED
-# if defined (IN_LIBXML) || !defined (_MSC_VER)
-# define XML_DEPRECATED
-/* Available since Visual Studio 2005 */
-# elif defined (_MSC_VER) && (_MSC_VER >= 1400)
-# define XML_DEPRECATED __declspec(deprecated)
-# endif
-#endif
-#if defined (_MSC_VER) && (_MSC_VER >= 1400)
-# define XML_IGNORE_FPTR_CAST_WARNINGS __pragma(warning(push))
-#else
-# define XML_IGNORE_FPTR_CAST_WARNINGS
-#endif
-#ifndef XML_POP_WARNINGS
-# if defined (_MSC_VER) && (_MSC_VER >= 1400)
-# define XML_POP_WARNINGS __pragma(warning(pop))
-# else
-# define XML_POP_WARNINGS
-# endif
+#define XML_DEPRECATED
 #endif
 #endif /* __GNUC__ */

+#define XML_NO_ATTR
+
+#ifdef LIBXML_THREAD_ENABLED
+#define XML_DECLARE_GLOBAL(name, type, attrs) \
+  attrs XMLPUBFUN type *__##name(void);
+#define XML_GLOBAL_MACRO(name) (*__##name())
+#else
+#define XML_DECLARE_GLOBAL(name, type, attrs) \
+  attrs XMLPUBVAR type name;
+#endif
+
 #ifdef __cplusplus
 }
 #endif /* __cplusplus */


@@ -24,7 +24,7 @@ git config --file .gitmodules --get-regexp '.*path' | sed 's/[^ ]* //' | xargs -
 # We don't want to depend on any third-party CMake files.
 # To check it, find and delete them.
 grep -o -P '"contrib/[^"]+"' .gitmodules |
-    grep -v -P 'contrib/(llvm-project|google-protobuf|grpc|abseil-cpp|corrosion)' |
+    grep -v -P 'contrib/(llvm-project|google-protobuf|grpc|abseil-cpp|corrosion|aws-crt-cpp)' |
     xargs -I@ find @ \
         -'(' -name 'CMakeLists.txt' -or -name '*.cmake' -')' -and -not -name '*.h.cmake' \
         -delete


@@ -211,6 +211,17 @@ function build
     echo "build_clickhouse_fasttest_binary: [ OK ] $BUILD_SECONDS_ELAPSED sec." \
         | ts '%Y-%m-%d %H:%M:%S' \
         | tee "$FASTTEST_OUTPUT/test_result.txt"
+
+    (
+        # This query should fail, and print stacktrace with proper symbol names (even on a stripped binary)
+        clickhouse_output=$(programs/clickhouse-stripped --stacktrace -q 'select' 2>&1 || :)
+        if [[ $clickhouse_output =~ DB::LocalServer::main ]]; then
+            echo "stripped_clickhouse_shows_symbols_names: [ OK ] 0 sec."
+        else
+            echo -e "stripped_clickhouse_shows_symbols_names: [ FAIL ] 0 sec. - clickhouse output:\n\n$clickhouse_output\n"
+        fi
+    ) | ts '%Y-%m-%d %H:%M:%S' | tee -a "$FASTTEST_OUTPUT/test_result.txt"
+
     if [ "$COPY_CLICKHOUSE_BINARY_TO_OUTPUT" -eq "1" ]; then
         mkdir -p "$FASTTEST_OUTPUT/binaries/"
         cp programs/clickhouse "$FASTTEST_OUTPUT/binaries/clickhouse"


@@ -163,7 +163,7 @@ key: value
 Corresponding XML:

 ``` xml
-<key>value</value>
+<key>value</key>
 ```

 A nested XML node is represented by a YAML map:


@@ -70,6 +70,19 @@
     if (params.has('password')) { password = params.get('password'); }
 }

+let url = `${host}?allow_introspection_functions=1`;
+
+if (add_http_cors_header) {
+    url += '&add_http_cors_header=1';
+}
+
+if (user) {
+    url += `&user=${encodeURIComponent(user)}`;
+}
+
+if (password) {
+    url += `&password=${encodeURIComponent(password)}`;
+}
+
 let map = L.map('space', {
     crs: L.CRS.Simple,
     center: [-512, 512],

@@ -103,24 +116,11 @@
     const key = `${coords.z}-${coords.x}-${coords.y}`;
     let buf = cached_tiles[key];
     if (!buf) {
-        let url = `${host}?default_format=RowBinary&allow_introspection_functions=1`;
-
-        if (add_http_cors_header) {
-            // For debug purposes, you may set add_http_cors_header from a browser console
-            url += '&add_http_cors_header=1';
-        }
-
-        if (user) {
-            url += `&user=${encodeURIComponent(user)}`;
-        }
-
-        if (password) {
-            url += `&password=${encodeURIComponent(password)}`;
-        }
-
-        url += `&param_z=${coords.z}&param_x=${coords.x}&param_y=${coords.y}`;
-        url += `&enable_http_compression=1&network_compression_method=zstd&network_zstd_compression_level=6`;
-
-        const response = await fetch(url, { method: 'POST', body: sql });
+        let request_url = `${url}&default_format=RowBinary` +
+            `&param_z=${coords.z}&param_x=${coords.x}&param_y=${coords.y}` +
+            `&enable_http_compression=1&network_compression_method=zstd&network_zstd_compression_level=6`;
+
+        const response = await fetch(request_url, { method: 'POST', body: sql });

         if (!response.ok) {
             const text = await response.text();

@@ -238,7 +238,7 @@
     const addr_hex = '0x' + addr_int.toString(16);

     const response = fetch(
-        `http://localhost:8123/?default_format=JSON`,
+        `${url}&default_format=JSON`,
         {
             method: 'POST',
             body: `SELECT encodeXMLComponent(demangle(addressToSymbol(${addr_int}::UInt64))) AS name,


@@ -2,6 +2,7 @@ use prql_compiler::sql::Dialect;
 use prql_compiler::{Options, Target};
 use std::ffi::{c_char, CString};
 use std::slice;
+use std::panic;

 fn set_output(result: String, out: *mut *mut u8, out_size: *mut u64) {
     assert!(!out_size.is_null());

@@ -13,8 +14,7 @@ fn set_output(result: String, out: *mut *mut u8, out_size: *mut u64) {
     *out_ptr = CString::new(result).unwrap().into_raw() as *mut u8;
 }

-#[no_mangle]
-pub unsafe extern "C" fn prql_to_sql(
+pub unsafe extern "C" fn prql_to_sql_impl(
     query: *const u8,
     size: u64,
     out: *mut *mut u8,

@@ -50,6 +50,23 @@ pub unsafe extern "C" fn prql_to_sql(
     }
 }

+#[no_mangle]
+pub unsafe extern "C" fn prql_to_sql(
+    query: *const u8,
+    size: u64,
+    out: *mut *mut u8,
+    out_size: *mut u64,
+) -> i64 {
+    let ret = panic::catch_unwind(|| {
+        return prql_to_sql_impl(query, size, out, out_size);
+    });
+    return match ret {
+        // NOTE: using cxxbridge we can return proper Result<> type.
+        Err(_err) => 1,
+        Ok(res) => res,
+    }
+}
+
 #[no_mangle]
 pub unsafe extern "C" fn prql_free_pointer(ptr_to_free: *mut u8) {
     std::mem::drop(CString::from_raw(ptr_to_free as *mut c_char));


@@ -1,6 +1,7 @@
 use skim::prelude::*;
 use term::terminfo::TermInfo;
 use cxx::{CxxString, CxxVector};
+use std::panic;

 #[cxx::bridge]
 mod ffi {

@@ -36,7 +37,7 @@ impl SkimItem for Item {
     }
 }

-fn skim(prefix: &CxxString, words: &CxxVector<CxxString>) -> Result<String, String> {
+fn skim_impl(prefix: &CxxString, words: &CxxVector<CxxString>) -> Result<String, String> {
     // Let's check is terminal available. To avoid panic.
     if let Err(err) = TermInfo::from_env() {
         return Err(format!("{}", err));

@@ -89,3 +90,22 @@ fn skim(prefix: &CxxString, words: &CxxVector<CxxString>) -> Result<String, Stri
     }
     return Ok(output.selected_items[0].output().to_string());
 }
+
+fn skim(prefix: &CxxString, words: &CxxVector<CxxString>) -> Result<String, String> {
+    let ret = panic::catch_unwind(|| {
+        return skim_impl(prefix, words);
+    });
+    return match ret {
+        Err(err) => {
+            let e = if let Some(s) = err.downcast_ref::<String>() {
+                format!("{}", s)
+            } else if let Some(s) = err.downcast_ref::<&str>() {
+                format!("{}", s)
+            } else {
+                format!("Unknown panic type: {:?}", err.type_id())
+            };
+            Err(format!("Rust panic: {:?}", e))
+        },
+        Ok(res) => res,
+    }
+}


@@ -17,6 +17,7 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int ILLEGAL_TYPE_OF_ARGUMENT;
+    extern const int INCORRECT_DATA;
     extern const int LOGICAL_ERROR;
     extern const int NOT_IMPLEMENTED;
 }

@@ -30,12 +31,12 @@ class ApproxSampler
 public:
     struct Stats
     {
-        T value; // the sampled value
-        Int64 g; // the minimum rank jump from the previous value's minimum rank
-        Int64 delta; // the maximum span of the rank
+        T value; // The sampled value
+        Int64 g; // The minimum rank jump from the previous value's minimum rank
+        Int64 delta; // The maximum span of the rank

         Stats() = default;
-        Stats(T value_, Int64 g_, Int64 delta_) : value(value_), g(g_), delta(delta_) {}
+        Stats(T value_, Int64 g_, Int64 delta_) : value(value_), g(g_), delta(delta_) { }
     };

     struct QueryResult

@@ -49,20 +50,20 @@ public:
     ApproxSampler() = default;

-    explicit ApproxSampler(
-        double relative_error_,
-        size_t compress_threshold_ = default_compress_threshold,
-        size_t count_ = 0,
-        bool compressed_ = false)
-        : relative_error(relative_error_)
-        , compress_threshold(compress_threshold_)
-        , count(count_)
-        , compressed(compressed_)
+    ApproxSampler(const ApproxSampler & other)
+        : relative_error(other.relative_error)
+        , compress_threshold(other.compress_threshold)
+        , count(other.count)
+        , compressed(other.compressed)
+        , sampled(other.sampled.begin(), other.sampled.end())
+        , backup_sampled(other.backup_sampled.begin(), other.backup_sampled.end())
+        , head_sampled(other.head_sampled.begin(), other.head_sampled.end())
     {
-        sampled.reserve(compress_threshold);
-        backup_sampled.reserve(compress_threshold);
-        head_sampled.reserve(default_head_size);
+    }
+
+    explicit ApproxSampler(double relative_error_)
+        : relative_error(relative_error_), compress_threshold(default_compress_threshold), count(0), compressed(false)
+    {
     }

     bool isCompressed() const { return compressed; }

@@ -95,9 +96,9 @@ public:
         Int64 current_max = std::numeric_limits<Int64>::min();
         for (const auto & stats : sampled)
             current_max = std::max(stats.delta + stats.g, current_max);
-        Int64 target_error = current_max/2;
+        Int64 target_error = current_max / 2;

-        size_t index= 0;
+        size_t index = 0;
         auto min_rank = sampled[0].g;
         for (size_t i = 0; i < size; ++i)
         {

@@ -118,7 +119,6 @@ public:
             result[indices[i]] = res.value;
             }
         }
-
     }

     void compress()

@@ -256,16 +256,27 @@ public:
     void read(ReadBuffer & buf)
     {
         readBinaryLittleEndian(compress_threshold, buf);
+        if (compress_threshold != default_compress_threshold)
+            throw Exception(
+                ErrorCodes::INCORRECT_DATA,
+                "The compress threshold {} isn't the expected one {}",
+                compress_threshold,
+                default_compress_threshold);
+
         readBinaryLittleEndian(relative_error, buf);
         readBinaryLittleEndian(count, buf);

         size_t sampled_len = 0;
         readBinaryLittleEndian(sampled_len, buf);
+        if (sampled_len > compress_threshold)
+            throw Exception(
+                ErrorCodes::INCORRECT_DATA, "The number of elements {} for quantileGK exceeds {}", sampled_len, compress_threshold);
+
         sampled.resize(sampled_len);

         for (size_t i = 0; i < sampled_len; ++i)
         {
-            auto stats = sampled[i];
+            auto & stats = sampled[i];
             readBinaryLittleEndian(stats.value, buf);
             readBinaryLittleEndian(stats.g, buf);
             readBinaryLittleEndian(stats.delta, buf);

@@ -291,7 +302,7 @@ private:
                 min_rank += curr_sample.g;
             }
         }
-        return {sampled.size()-1, 0, sampled.back().value};
+        return {sampled.size() - 1, 0, sampled.back().value};
     }

     void withHeadBufferInserted()

@@ -389,12 +400,11 @@ private:
     double relative_error;
     size_t compress_threshold;
-    size_t count = 0;
+    size_t count;
     bool compressed;

     PaddedPODArray<Stats> sampled;
     PaddedPODArray<Stats> backup_sampled;
-
     PaddedPODArray<T> head_sampled;

     static constexpr size_t default_compress_threshold = 10000;

@@ -406,17 +416,14 @@ class QuantileGK
 {
 private:
     using Data = ApproxSampler<Value>;
-    mutable Data data;
+    Data data;

 public:
     QuantileGK() = default;

     explicit QuantileGK(size_t accuracy) : data(1.0 / static_cast<double>(accuracy)) { }

-    void add(const Value & x)
-    {
-        data.insert(x);
-    }
+    void add(const Value & x) { data.insert(x); }

     template <typename Weight>
     void add(const Value &, const Weight &)

@@ -429,22 +436,34 @@ public:
         if (!data.isCompressed())
             data.compress();

-        data.merge(rhs.data);
+        if (rhs.data.isCompressed())
+            data.merge(rhs.data);
+        else
+        {
+            /// We can't modify rhs, so copy it and compress
+            Data rhs_data_copy(rhs.data);
+            rhs_data_copy.compress();
+            data.merge(rhs_data_copy);
+        }
     }

     void serialize(WriteBuffer & buf) const
     {
-        /// Always compress before serialization
-        if (!data.isCompressed())
-            data.compress();
-
-        data.write(buf);
+        if (data.isCompressed())
+            data.write(buf);
+        else
+        {
+            /// We can't modify rhs, so copy it and compress
+            Data data_copy(data);
+            data_copy.compress();
+            data_copy.write(buf);
+        }
     }

     void deserialize(ReadBuffer & buf)
     {
         data.read(buf);
+        /// Serialized data is always compressed
         data.setCompressed();
     }

@@ -481,7 +500,6 @@ public:
     }
 };

-
 template <typename Value, bool _> using FuncQuantileGK = AggregateFunctionQuantile<Value, QuantileGK<Value>, NameQuantileGK, false, void, false, true>;
 template <typename Value, bool _> using FuncQuantilesGK = AggregateFunctionQuantile<Value, QuantileGK<Value>, NameQuantilesGK, false, void, true, true>;
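For context, `ApproxSampler` implements a Greenwald-Khanna-style epsilon-approximate quantile sketch: with `relative_error = 1 / accuracy`, an answer for quantile `q` over `count` values may legitimately come from any element whose rank is within `relative_error * count` of the exact rank. A hedged, self-contained illustration of that acceptance criterion (not ClickHouse code):

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// An epsilon-approximate q-quantile may be any element whose rank lies
// within eps * n of the exact rank q * n.
bool acceptable(const std::vector<int> & sorted, size_t answer_rank, double q, double eps)
{
    double target = q * static_cast<double>(sorted.size());
    double slack = eps * static_cast<double>(sorted.size());
    return static_cast<double>(answer_rank) >= target - slack
        && static_cast<double>(answer_rank) <= target + slack;
}

int main()
{
    std::vector<int> data(1000);
    for (size_t i = 0; i < data.size(); ++i)
        data[i] = static_cast<int>(i);

    // With accuracy = 100 (relative_error = 0.01), rank 505 is a valid median...
    assert(acceptable(data, 505, 0.5, 0.01));
    // ...but rank 600 is outside the +-10 rank tolerance.
    assert(!acceptable(data, 600, 0.5, 0.01));
}
```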


@@ -2,6 +2,8 @@
 #include <memory>

+#include <base/defines.h>
+
 #include <Poco/Channel.h>
 #include <Poco/Logger.h>
 #include <Poco/Message.h>

@@ -24,6 +26,16 @@ using LoggerRawPtr = Poco::Logger *;
 */
 LoggerPtr getLogger(const std::string & name);

+/** Get Logger with specified name. If the Logger does not exists, it is created.
+  * This overload was added for specific purpose, when logger is constructed from constexpr string.
+  * Logger is destroyed only during program shutdown.
+  */
+template <size_t n>
+ALWAYS_INLINE LoggerPtr getLogger(const char (&name)[n])
+{
+    return Poco::Logger::getShared(name, false /*should_be_owned_by_shared_ptr_if_created*/);
+}
+
 /** Create Logger with specified name, channel and logging level.
 * If Logger already exists, throws exception.
 * Logger is destroyed, when last shared ptr that refers to Logger with specified name is destroyed.
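The new overload works because a string literal binds to `const char (&)[n]` exactly, while the `std::string` overload requires a user-defined conversion; literal call sites are therefore routed to `getShared(name, false)`, and loggers created from literals live until shutdown. A small sketch of the overload selection, with hypothetical stand-in functions rather than the real getLogger:

```cpp
#include <cstddef>
#include <iostream>
#include <string>

// Stand-ins that show which overload a call site selects.
template <size_t n>
const char * getLoggerDemo(const char (&)[n]) { return "array overload (string literal)"; }
const char * getLoggerDemo(const std::string &) { return "std::string overload"; }

int main()
{
    std::cout << getLoggerDemo("Application") << '\n'; // exact match: array overload wins

    std::string name = "Application";
    std::cout << getLoggerDemo(name) << '\n';          // std::string overload
}
```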


@@ -317,16 +317,19 @@ constexpr std::pair<std::string_view, std::string_view> replacements[]
 // Demangle @c symbol_name if it's not from __functional header (as such functions don't provide any useful
 // information but pollute stack traces).
 // Replace parts from @c replacements with shorter aliases
-String demangleAndCollapseNames(std::string_view file, const char * const symbol_name)
+String demangleAndCollapseNames(std::optional<std::string_view> file, const char * const symbol_name)
 {
     if (!symbol_name)
         return "?";

-    std::string_view file_copy = file;
-    if (auto trim_pos = file.find_last_of('/'); trim_pos != file.npos)
-        file_copy.remove_suffix(file.size() - trim_pos);
-    if (file_copy.ends_with("functional"))
-        return "?";
+    if (file.has_value())
+    {
+        std::string_view file_copy = file.value();
+        if (auto trim_pos = file_copy.find_last_of('/'); trim_pos != file_copy.npos)
+            file_copy.remove_suffix(file_copy.size() - trim_pos);
+        if (file_copy.ends_with("functional"))
+            return "?";
+    }

     String haystack = demangle(symbol_name);

@@ -393,8 +396,8 @@ toStringEveryLineImpl([[maybe_unused]] bool fatal, const StackTraceRefTriple & s
     if (frame.file.has_value() && frame.line.has_value())
         out << *frame.file << ':' << *frame.line << ": ";

-    if (frame.symbol.has_value() && frame.file.has_value())
-        out << demangleAndCollapseNames(*frame.file, frame.symbol->data());
+    if (frame.symbol.has_value())
+        out << demangleAndCollapseNames(frame.file, frame.symbol->data());
     else
         out << "?";


@@ -9,6 +9,7 @@
 #include <Poco/NullChannel.h>
 #include <Poco/StreamChannel.h>
 #include <sstream>
+#include <thread>

 TEST(Logger, Log)

@@ -100,3 +101,75 @@ TEST(Logger, SideEffects)
     LOG_TRACE(log, "test no throw {}", getLogMessageParamOrThrow());
 }
+
+TEST(Logger, SharedRawLogger)
+{
+    {
+        std::ostringstream stream; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
+        auto stream_channel = Poco::AutoPtr<Poco::StreamChannel>(new Poco::StreamChannel(stream));
+
+        auto shared_logger = getLogger("Logger_1");
+        shared_logger->setChannel(stream_channel.get());
+        shared_logger->setLevel("trace");
+
+        LOG_TRACE(shared_logger, "SharedLogger1Log1");
+        LOG_TRACE(getRawLogger("Logger_1"), "RawLogger1Log");
+        LOG_TRACE(shared_logger, "SharedLogger1Log2");
+
+        auto actual = stream.str();
+        EXPECT_EQ(actual, "SharedLogger1Log1\nRawLogger1Log\nSharedLogger1Log2\n");
+    }
+    {
+        std::ostringstream stream; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
+        auto stream_channel = Poco::AutoPtr<Poco::StreamChannel>(new Poco::StreamChannel(stream));
+
+        auto * raw_logger = getRawLogger("Logger_2");
+        raw_logger->setChannel(stream_channel.get());
+        raw_logger->setLevel("trace");
+
+        LOG_TRACE(getLogger("Logger_2"), "SharedLogger2Log1");
+        LOG_TRACE(raw_logger, "RawLogger2Log");
+        LOG_TRACE(getLogger("Logger_2"), "SharedLogger2Log2");
+
+        auto actual = stream.str();
+        EXPECT_EQ(actual, "SharedLogger2Log1\nRawLogger2Log\nSharedLogger2Log2\n");
+    }
+}
+
+TEST(Logger, SharedLoggersThreadSafety)
+{
+    static size_t threads_count = std::thread::hardware_concurrency();
+    static constexpr size_t loggers_count = 10;
+    static constexpr size_t logger_get_count = 1000;
+
+    Poco::Logger::root();
+
+    std::vector<std::string> names;
+
+    Poco::Logger::names(names);
+    size_t loggers_size_before = names.size();
+
+    std::vector<std::thread> threads;
+
+    for (size_t thread_index = 0; thread_index < threads_count; ++thread_index)
+    {
+        threads.emplace_back([]()
+        {
+            for (size_t logger_index = 0; logger_index < loggers_count; ++logger_index)
+            {
+                for (size_t iteration = 0; iteration < logger_get_count; ++iteration)
+                {
+                    getLogger("Logger_" + std::to_string(logger_index));
+                }
+            }
+        });
+    }
+
+    for (auto & thread : threads)
+        thread.join();
+
+    Poco::Logger::names(names);
+    size_t loggers_size_after = names.size();
+
+    EXPECT_EQ(loggers_size_before, loggers_size_after);
+}


@@ -136,12 +136,12 @@ namespace
 {

 void assertDigest(
-    const KeeperStorage::Digest & first,
-    const KeeperStorage::Digest & second,
+    const KeeperStorage::Digest & expected,
+    const KeeperStorage::Digest & actual,
     const Coordination::ZooKeeperRequest & request,
     bool committing)
 {
-    if (!KeeperStorage::checkDigest(first, second))
+    if (!KeeperStorage::checkDigest(expected, actual))
     {
         LOG_FATAL(
             getLogger("KeeperStateMachine"),

@@ -149,9 +149,9 @@ void assertDigest(
             "{}). Keeper will terminate to avoid inconsistencies.\nExtra information about the request:\n{}",
             committing ? "committing" : "preprocessing",
             request.getOpNum(),
-            first.value,
-            second.value,
-            first.version,
+            expected.value,
+            actual.value,
+            expected.version,
             request.toString());
         std::terminate();
     }


@@ -174,7 +174,6 @@ uint64_t calculateDigest(std::string_view path, std::string_view data, const Kee
     hash.update(data);

-    hash.update(stat.czxid);
     hash.update(stat.czxid);
     hash.update(stat.mzxid);
     hash.update(stat.ctime);

@@ -183,7 +182,6 @@ uint64_t calculateDigest(std::string_view path, std::string_view data, const Kee
     hash.update(stat.cversion);
     hash.update(stat.aversion);
     hash.update(stat.ephemeralOwner);
-    hash.update(data.length());
     hash.update(stat.numChildren);
     hash.update(stat.pzxid);

@@ -2531,6 +2529,17 @@ void KeeperStorage::recalculateStats()
     container.recalculateDataSize();
 }

+bool KeeperStorage::checkDigest(const Digest & first, const Digest & second)
+{
+    if (first.version != second.version)
+        return true;
+
+    if (first.version == DigestVersion::NO_DIGEST)
+        return true;
+
+    return first.value == second.value;
+}
+
 String KeeperStorage::generateDigest(const String & userdata)
 {
     std::vector<String> user_password;
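The `checkDigest` rule moved into this file is deliberately permissive: digests computed under different versions (for example, a V2 snapshot replayed on a V3 binary after the czxid/casting fix) are treated as trivially consistent, and only same-version digests are actually compared. A standalone restatement of that behavior, using stand-in types rather than the KeeperStorage declarations:

```cpp
#include <cassert>
#include <cstdint>

enum class DigestVersion : uint8_t { NO_DIGEST = 0, V1 = 1, V2 = 2, V3 = 3 };
struct Digest { DigestVersion version; uint64_t value; };

// Mirrors the logic from the diff: mismatched or absent versions never
// trigger the fatal divergence path; only same-version values are compared.
bool checkDigest(const Digest & first, const Digest & second)
{
    if (first.version != second.version)
        return true;
    if (first.version == DigestVersion::NO_DIGEST)
        return true;
    return first.value == second.value;
}

int main()
{
    assert(checkDigest({DigestVersion::V2, 1}, {DigestVersion::V3, 2}));  // versions differ: skipped
    assert(!checkDigest({DigestVersion::V3, 1}, {DigestVersion::V3, 2})); // same version, mismatch
}
```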


@@ -95,10 +95,11 @@ public:
     {
         NO_DIGEST = 0,
         V1 = 1,
-        V2 = 2 // added system nodes that modify the digest on startup so digest from V0 is invalid
+        V2 = 2, // added system nodes that modify the digest on startup so digest from V0 is invalid
+        V3 = 3 // fixed bug with casting, removed duplicate czxid usage
     };

-    static constexpr auto CURRENT_DIGEST_VERSION = DigestVersion::V2;
+    static constexpr auto CURRENT_DIGEST_VERSION = DigestVersion::V3;

     struct ResponseForSession
     {

@@ -113,16 +114,7 @@ public:
         uint64_t value{0};
     };

-    static bool checkDigest(const Digest & first, const Digest & second)
-    {
-        if (first.version != second.version)
-            return true;
-
-        if (first.version == DigestVersion::NO_DIGEST)
-            return true;
-
-        return first.value == second.value;
-    }
+    static bool checkDigest(const Digest & first, const Digest & second);

     static String generateDigest(const String & userdata);


@@ -102,7 +102,7 @@ void checkS3Capabilities(
     if (s3_capabilities.support_batch_delete && !checkBatchRemove(storage, key_with_trailing_slash))
     {
         LOG_WARNING(
-            &Poco::Logger::get("S3ObjectStorage"),
+            getLogger("S3ObjectStorage"),
             "Storage for disk {} does not support batch delete operations, "
             "so `s3_capabilities.support_batch_delete` was automatically turned off during the access check. "
             "To remove this message set `s3_capabilities.support_batch_delete` for the disk to `false`.",


@@ -82,7 +82,7 @@ WebObjectStorage::loadFiles(const String & path, const std::unique_lock<std::sha
     if (!inserted)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Loading data for {} more than once", file_path);

-    LOG_TRACE(&Poco::Logger::get("DiskWeb"), "Adding file: {}, size: {}", file_path, size);
+    LOG_TRACE(getLogger("DiskWeb"), "Adding file: {}, size: {}", file_path, size);

     loaded_files.emplace_back(file_path);
 }


@@ -6,6 +6,7 @@
 #include <Formats/FormatSettings.h>
 #include <IO/WriteBufferFromString.h>
 #include <IO/BufferWithOwnMemory.h>
+#include <IO/PeekableReadBuffer.h>
 #include <IO/readFloatText.h>
 #include <IO/Operators.h>
 #include <base/find_symbols.h>


@@ -38,7 +38,6 @@
 #include <IO/CompressionMethod.h>
 #include <IO/ReadBuffer.h>
 #include <IO/ReadBufferFromMemory.h>
-#include <IO/PeekableReadBuffer.h>
 #include <IO/VarInt.h>

 #include <pcg_random.hpp>

@@ -51,6 +50,7 @@ namespace DB

 template <typename Allocator>
 struct Memory;
+class PeekableReadBuffer;

 namespace ErrorCodes
 {


@@ -30,6 +30,7 @@ static const std::unordered_map<String, String> quantile_fuse_name_mapping =
     {"quantileTDigestWeighted", "quantilesTDigestWeighted"},
     {"quantileTiming", "quantilesTiming"},
     {"quantileTimingWeighted", "quantilesTimingWeighted"},
+    {"quantileGK", "quantilesGK"},
 };

 String GatherFunctionQuantileData::toFusedNameOrSelf(const String & func_name)


@@ -267,8 +267,10 @@ private:
     /// We don't support WITH cte as (subquery) Select table JOIN cte because we don't do conversion in AST
     bool is_subquery = false;
     if (const auto * ast_table_expr = table_elem.table_expression->as<ASTTableExpression>())
-        is_subquery = ast_table_expr->subquery->as<ASTSubquery>() != nullptr
+    {
+        is_subquery = ast_table_expr->subquery && ast_table_expr->subquery->as<ASTSubquery>() != nullptr
             && ast_table_expr->subquery->as<ASTSubquery>()->cte_name.empty();
+    }
     else if (table_elem.table_expression->as<ASTSubquery>())
         is_subquery = true;


@@ -7,6 +7,7 @@
 #include <Processors/Formats/RowInputFormatWithNamesAndTypes.h>
 #include <Processors/Formats/ISchemaReader.h>
 #include <Formats/FormatSettings.h>
+#include <IO/PeekableReadBuffer.h>

 namespace DB


@@ -2,6 +2,7 @@
 #include <Formats/JSONUtils.h>
 #include <Formats/FormatFactory.h>
 #include <Formats/EscapingRuleUtils.h>
+#include <IO/PeekableReadBuffer.h>
 #include <IO/ReadHelpers.h>

 namespace DB


@@ -4,6 +4,7 @@
 #include <Formats/FormatSettings.h>
 #include <Processors/Formats/RowInputFormatWithNamesAndTypes.h>
 #include <Processors/Formats/ISchemaReader.h>
+#include <IO/PeekableReadBuffer.h>

 namespace DB


@@ -7,6 +7,7 @@
 #include <IO/ReadHelpers.h>
 #include <IO/Operators.h>
 #include <IO/ReadBufferFromString.h>
+#include <IO/PeekableReadBuffer.h>
 #include <Formats/EscapingRuleUtils.h>


@@ -20,7 +20,6 @@
 #include <Disks/ObjectStorages/DiskObjectStorage.h>
 #include <Disks/TemporaryFileOnDisk.h>
 #include <Disks/createVolume.h>
-#include <Functions/IFunction.h>
 #include <IO/Operators.h>
 #include <IO/S3Common.h>
 #include <IO/SharedThreadPools.h>

@@ -47,7 +46,6 @@
 #include <Parsers/ASTPartition.h>
 #include <Parsers/ASTSetQuery.h>
 #include <Parsers/ASTTablesInSelectQuery.h>
-#include <Parsers/ExpressionListParsers.h>
 #include <Parsers/parseQuery.h>
 #include <Parsers/queryToString.h>
 #include <Processors/Formats/IInputFormat.h>

@@ -62,9 +60,7 @@
 #include <Storages/MergeTree/MergeTreeDataPartCloner.h>
 #include <Storages/MergeTree/MergeTreeDataPartCompact.h>
 #include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
-#include <Storages/MergeTree/MergeTreeDataPartWide.h>
 #include <Storages/Statistics/Estimator.h>
-#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
 #include <Storages/MergeTree/RangesInDataPart.h>
 #include <Storages/MergeTree/checkDataPart.h>
 #include <Storages/MutationCommands.h>

@@ -75,12 +71,10 @@
 #include <Common/CurrentMetrics.h>
 #include <Common/Increment.h>
 #include <Common/ProfileEventsScope.h>
-#include <Common/SimpleIncrement.h>
 #include <Common/Stopwatch.h>
 #include <Common/StringUtils/StringUtils.h>
 #include <Common/ThreadFuzzer.h>
 #include <Common/escapeForFileName.h>
-#include <Common/getNumberOfPhysicalCPUCores.h>
 #include <Common/noexcept_scope.h>
 #include <Common/quoteString.h>
 #include <Common/scope_guard_safe.h>

@@ -91,13 +85,10 @@
 #include <base/insertAtEnd.h>
 #include <base/interpolate.h>
-#include <base/defines.h>

 #include <algorithm>
 #include <atomic>
-#include <cmath>
 #include <chrono>
-#include <iomanip>
 #include <limits>
 #include <optional>
 #include <ranges>
@@ -180,7 +180,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> cloneSourcePart(
     }
     LOG_DEBUG(
-        &Poco::Logger::get("MergeTreeDataPartCloner"),
+        getLogger("MergeTreeDataPartCloner"),
         "Clone {} part {} to {}{}",
         src_flushed_tmp_part ? "flushed" : "",
         src_part_storage->getFullPath(),
@@ -637,25 +637,31 @@ void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(ProcessingNodeHolder
         "this could be a result of expired zookeeper session", path);
 }
 void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPtr holder)
+{
+    auto processed_node_path = isShardedProcessing()
+        ? zookeeper_processed_path / toString(getProcessingIdForPath(holder->path))
+        : zookeeper_processed_path;
+    return setFileProcessedForOrderedModeImpl(holder->path, holder, processed_node_path);
+}
+
+void S3QueueFilesMetadata::setFileProcessedForOrderedModeImpl(
+    const std::string & path, ProcessingNodeHolderPtr holder, const std::string & processed_node_path)
 {
     /// Update a persistent node in /processed and remove ephemeral node from /processing.
-    const auto & path = holder->path;
     const auto node_name = getNodeName(path);
     const auto node_metadata = createNodeMetadata(path).toString();
     const auto zk_client = getZooKeeper();
-    auto processed_node = isShardedProcessing()
-        ? zookeeper_processed_path / toString(getProcessingIdForPath(path))
-        : zookeeper_processed_path;
-    LOG_TEST(log, "Setting file `{}` as processed", path);
+    LOG_TEST(log, "Setting file `{}` as processed (at {})", path, processed_node_path);
     while (true)
     {
         std::string res;
         Coordination::Stat stat;
-        bool exists = zk_client->tryGet(processed_node, res, &stat);
+        bool exists = zk_client->tryGet(processed_node_path, res, &stat);
         Coordination::Requests requests;
         if (exists)
         {
@@ -664,39 +670,41 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPt
                 auto metadata = NodeMetadata::fromString(res);
                 if (metadata.file_path >= path)
                 {
-                    /// Here we get in the case that maximum processed file is bigger than ours.
-                    /// This is possible to achieve in case of parallel processing
-                    /// but for local processing we explicitly disable parallel mode and do everything in a single thread
-                    /// (see constructor of StorageS3Queue where s3queue_processing_threads_num is explicitly set to 1 in case of Ordered mode).
-                    /// Nevertheless, in case of distributed processing we cannot do anything with parallelism.
-                    /// What this means?
-                    /// It means that in scenario "distributed processing + Ordered mode"
-                    /// a setting s3queue_loading_retries will not work. It is possible to fix, it is in TODO.
-                    /// Return because there is nothing to change,
-                    /// the max processed file is already bigger than ours.
+                    LOG_TRACE(log, "File {} is already processed, current max processed file: {}", path, metadata.file_path);
                     return;
                 }
             }
-            requests.push_back(zkutil::makeSetRequest(processed_node, node_metadata, stat.version));
+            requests.push_back(zkutil::makeSetRequest(processed_node_path, node_metadata, stat.version));
         }
         else
         {
-            requests.push_back(zkutil::makeCreateRequest(processed_node, node_metadata, zkutil::CreateMode::Persistent));
+            requests.push_back(zkutil::makeCreateRequest(processed_node_path, node_metadata, zkutil::CreateMode::Persistent));
         }
         Coordination::Responses responses;
-        if (holder->remove(&requests, &responses))
+        if (holder)
         {
-            LOG_TEST(log, "Moved file `{}` to processed", path);
-            if (max_loading_retries)
-                zk_client->tryRemove(zookeeper_failed_path / (node_name + ".retriable"), -1);
-            return;
+            if (holder->remove(&requests, &responses))
+            {
+                LOG_TEST(log, "Moved file `{}` to processed", path);
+                if (max_loading_retries)
+                    zk_client->tryRemove(zookeeper_failed_path / (node_name + ".retriable"), -1);
+                return;
+            }
+        }
+        else
+        {
+            auto code = zk_client->tryMulti(requests, responses);
+            if (code == Coordination::Error::ZOK)
+                return;
         }
         /// Failed to update max processed node, retry.
         if (!responses.empty() && responses[0]->error != Coordination::Error::ZOK)
+        {
+            LOG_TRACE(log, "Failed to update processed node ({}). Will retry.", magic_enum::enum_name(responses[0]->error));
             continue;
+        }
         LOG_WARNING(log, "Cannot set file ({}) as processed since processing node "
                     "does not exist with expected processing id does not exist, "
@@ -705,6 +713,22 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPt
     }
 }
+
+void S3QueueFilesMetadata::setFileProcessed(const std::string & path, size_t shard_id)
+{
+    if (mode != S3QueueMode::ORDERED)
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Can set file as preprocessed only for Ordered mode");
+
+    if (isShardedProcessing())
+    {
+        for (const auto & processor : getProcessingIdsForShard(shard_id))
+            setFileProcessedForOrderedModeImpl(path, nullptr, zookeeper_processed_path / toString(processor));
+    }
+    else
+    {
+        setFileProcessedForOrderedModeImpl(path, nullptr, zookeeper_processed_path);
+    }
+}
 void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const String & exception_message)
 {
     auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileFailedMicroseconds);
@@ -42,6 +42,7 @@ public:
     ~S3QueueFilesMetadata();
     void setFileProcessed(ProcessingNodeHolderPtr holder);
+    void setFileProcessed(const std::string & path, size_t shard_id);
     void setFileFailed(ProcessingNodeHolderPtr holder, const std::string & exception_message);
@@ -141,6 +142,9 @@ private:
     void setFileProcessedForUnorderedMode(ProcessingNodeHolderPtr holder);
     std::string getZooKeeperPathForShard(size_t shard_id) const;
+    void setFileProcessedForOrderedModeImpl(
+        const std::string & path, ProcessingNodeHolderPtr holder, const std::string & processed_node_path);
+
     enum class SetFileProcessingResult
     {
         Success,
@@ -22,6 +22,7 @@ class ASTStorage;
     M(UInt32, s3queue_loading_retries, 0, "Retry loading up to specified number of times", 0) \
     M(UInt32, s3queue_processing_threads_num, 1, "Number of processing threads", 0) \
     M(UInt32, s3queue_enable_logging_to_s3queue_log, 1, "Enable logging to system table system.s3queue_log", 0) \
+    M(String, s3queue_last_processed_path, "", "For Ordered mode. Files that have lexicographically smaller file name are considered already processed", 0) \
     M(UInt32, s3queue_tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \
     M(UInt32, s3queue_polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
     M(UInt32, s3queue_polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \
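The new s3queue_last_processed_path setting gives an Ordered-mode queue a starting high-water mark, so files whose names sort at or below it are never ingested. A minimal usage sketch, with the bucket URL, keeper path, and file names as placeholders:

    CREATE TABLE s3queue_events (data String)
    ENGINE = S3Queue('https://mybucket.s3.amazonaws.com/data/*.csv', 'CSV')
    SETTINGS
        mode = 'ordered',
        keeper_path = '/clickhouse/s3queue/events',
        s3queue_last_processed_path = 'data/test_5.csv';
    -- Only files with lexicographically greater names (test_6.csv and up) will be processed,
    -- which is the behavior the integration tests below verify.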
@@ -155,20 +155,20 @@ StorageS3Queue::StorageS3Queue(
     LOG_INFO(log, "Using zookeeper path: {}", zk_path.string());
     task = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); });
+    /// Get metadata manager from S3QueueMetadataFactory,
+    /// it will increase the ref count for the metadata object.
+    /// The ref count is decreased when StorageS3Queue::drop() method is called.
+    files_metadata = S3QueueMetadataFactory::instance().getOrCreate(zk_path, *s3queue_settings);
     try
     {
         createOrCheckMetadata(storage_metadata);
     }
     catch (...)
     {
+        S3QueueMetadataFactory::instance().remove(zk_path);
         throw;
     }
-    /// Get metadata manager from S3QueueMetadataFactory,
-    /// it will increase the ref count for the metadata object.
-    /// The ref count is decreased when StorageS3Queue::drop() method is called.
-    files_metadata = S3QueueMetadataFactory::instance().getOrCreate(zk_path, *s3queue_settings);
     if (files_metadata->isShardedProcessing())
     {
         if (!s3queue_settings->s3queue_current_shard_num.changed)
@@ -181,6 +181,10 @@ StorageS3Queue::StorageS3Queue(
             files_metadata->registerNewShard(s3queue_settings->s3queue_current_shard_num);
         }
     }
+    if (s3queue_settings->mode == S3QueueMode::ORDERED && !s3queue_settings->s3queue_last_processed_path.value.empty())
+    {
+        files_metadata->setFileProcessed(s3queue_settings->s3queue_last_processed_path.value, s3queue_settings->s3queue_current_shard_num);
+    }
 }
 void StorageS3Queue::startup()
@@ -25,6 +25,7 @@
 #include <IO/WriteHelpers.h>
 #include <IO/Archives/createArchiveReader.h>
 #include <IO/Archives/IArchiveReader.h>
+#include <IO/PeekableReadBuffer.h>
 #include <Formats/FormatFactory.h>
 #include <Formats/ReadSchemaUtils.h>
@@ -1,6 +1,5 @@
 #include <Core/Defines.h>
-#include <cstddef>
 #include <ranges>
 #include <chrono>
@@ -29,17 +28,14 @@
 #include <Storages/AlterCommands.h>
 #include <Storages/ColumnsDescription.h>
 #include <Storages/Freeze.h>
-#include <Storages/MergeTree/AsyncBlockIDsCache.h>
 #include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
 #include <Storages/MergeTree/IMergeTreeDataPart.h>
 #include <Storages/MergeTree/LeaderElection.h>
 #include <Storages/MergeTree/MergeFromLogEntryTask.h>
-#include <Storages/MergeTree/MergeList.h>
 #include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>
 #include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
 #include <Storages/MergeTree/MergeTreePartInfo.h>
 #include <Storages/MergeTree/MergeTreePartitionCompatibilityVerifier.h>
-#include <Storages/MergeTree/MergeTreeReaderCompact.h>
 #include <Storages/MergeTree/MergedBlockOutputStream.h>
 #include <Storages/MergeTree/MutateFromLogEntryTask.h>
 #include <Storages/MergeTree/PinnedPartUUIDs.h>
@@ -64,21 +60,16 @@
 #include <Parsers/formatAST.h>
 #include <Parsers/parseQuery.h>
 #include <Parsers/ASTInsertQuery.h>
-#include <Parsers/ASTDropQuery.h>
 #include <Parsers/ASTFunction.h>
-#include <Parsers/ASTOptimizeQuery.h>
 #include <Parsers/ASTPartition.h>
 #include <Parsers/ASTLiteral.h>
 #include <Parsers/ASTSelectWithUnionQuery.h>
 #include <Parsers/queryToString.h>
-#include <Parsers/ASTCheckQuery.h>
-#include <Parsers/ExpressionListParsers.h>
 #include <Processors/QueryPlan/QueryPlan.h>
 #include <Processors/Sources/RemoteSource.h>
 #include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
 #include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
-#include <Processors/QueryPlan/ReadFromPreparedSource.h>
 #include <Processors/Sinks/EmptySink.h>
 #include <Planner/Utils.h>
@@ -106,9 +97,6 @@
 #include <Backups/IRestoreCoordination.h>
 #include <Backups/RestorerFromBackup.h>
-#include <Poco/DirectoryIterator.h>
-#include <base/scope_guard.h>
 #include <Common/scope_guard_safe.h>
 #include <boost/algorithm/string/join.hpp>
@@ -1,5 +1,6 @@
 #include <algorithm>
 #include <memory>
+#include <stack>
 #include <Core/NamesAndTypes.h>
 #include <Core/TypeId.h>
@@ -4,6 +4,7 @@
     <server_id>1</server_id>
     <create_snapshot_on_exit>1</create_snapshot_on_exit>
+    <digest_enabled>1</digest_enabled>
     <coordination_settings>
         <operation_timeout_ms>10000</operation_timeout_ms>
@@ -99,6 +99,7 @@ def started_cluster():
         main_configs=[
             "configs/s3queue_log.xml",
         ],
+        stay_alive=True,
     )
     logging.info("Starting cluster...")
@@ -539,10 +540,7 @@ def test_multiple_tables_meta_mismatch(started_cluster):
             },
         )
     except QueryRuntimeException as e:
-        assert (
-            "Metadata with the same `s3queue_zookeeper_path` was already created but with different settings"
-            in str(e)
-        )
+        assert "Existing table metadata in ZooKeeper differs in engine mode" in str(e)
         failed = True
     assert failed is True
@@ -1283,3 +1281,108 @@ def test_settings_check(started_cluster):
     )
     node.query(f"DROP TABLE {table_name} SYNC")
+
+
+@pytest.mark.parametrize("processing_threads", [1, 5])
+def test_processed_file_setting(started_cluster, processing_threads):
+    node = started_cluster.instances["instance"]
+    table_name = f"test_processed_file_setting_{processing_threads}"
+    dst_table_name = f"{table_name}_dst"
+    keeper_path = f"/clickhouse/test_{table_name}"
+    files_path = f"{table_name}_data"
+    files_to_generate = 10
+
+    create_table(
+        started_cluster,
+        node,
+        table_name,
+        "ordered",
+        files_path,
+        additional_settings={
+            "keeper_path": keeper_path,
+            "s3queue_processing_threads_num": processing_threads,
+            "s3queue_last_processed_path": f"{files_path}/test_5.csv",
+        },
+    )
+    total_values = generate_random_files(
+        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
+    )
+    create_mv(node, table_name, dst_table_name)
+
+    def get_count():
+        return int(node.query(f"SELECT count() FROM {dst_table_name}"))
+
+    expected_rows = 4
+    for _ in range(20):
+        if expected_rows == get_count():
+            break
+        time.sleep(1)
+    assert expected_rows == get_count()
+
+    node.restart_clickhouse()
+    time.sleep(10)
+
+    expected_rows = 4
+    for _ in range(20):
+        if expected_rows == get_count():
+            break
+        time.sleep(1)
+    assert expected_rows == get_count()
+
+
+@pytest.mark.parametrize("processing_threads", [1, 5])
+def test_processed_file_setting_distributed(started_cluster, processing_threads):
+    node = started_cluster.instances["instance"]
+    node_2 = started_cluster.instances["instance2"]
+    table_name = f"test_processed_file_setting_distributed_{processing_threads}"
+    dst_table_name = f"{table_name}_dst"
+    keeper_path = f"/clickhouse/test_{table_name}"
+    files_path = f"{table_name}_data"
+    files_to_generate = 10
+
+    for instance in [node, node_2]:
+        create_table(
+            started_cluster,
+            instance,
+            table_name,
+            "ordered",
+            files_path,
+            additional_settings={
+                "keeper_path": keeper_path,
+                "s3queue_processing_threads_num": processing_threads,
+                "s3queue_last_processed_path": f"{files_path}/test_5.csv",
+                "s3queue_total_shards_num": 2,
+            },
+        )
+
+    total_values = generate_random_files(
+        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
+    )
+    for instance in [node, node_2]:
+        create_mv(instance, table_name, dst_table_name)
+
+    def get_count():
+        query = f"SELECT count() FROM {dst_table_name}"
+        return int(node.query(query)) + int(node_2.query(query))
+
+    expected_rows = 4
+    for _ in range(20):
+        if expected_rows == get_count():
+            break
+        time.sleep(1)
+    assert expected_rows == get_count()
+
+    for instance in [node, node_2]:
+        instance.restart_clickhouse()
+    time.sleep(10)
+
+    expected_rows = 4
+    for _ in range(20):
+        if expected_rows == get_count():
+            break
+        time.sleep(1)
+    assert expected_rows == get_count()
@@ -19,6 +19,20 @@ select quantilesGK(1000, 100/1000, 200/1000, 250/1000, 314/1000, 777/1000)(numbe
 [99,199,249,313,776]
 select quantilesGK(10000, 100/1000, 200/1000, 250/1000, 314/1000, 777/1000)(number + 1) from numbers(1000);
 [100,200,250,314,777]
+SELECT quantileGKMerge(100, 0.5)(x)
+FROM
+(
+    SELECT quantileGKState(100, 0.5)(number + 1) AS x
+    FROM numbers(49999)
+);
+24902
+SELECT quantilesGKMerge(100, 0.5, 0.9, 0.99)(x)
+FROM
+(
+    SELECT quantilesGKState(100, 0.5, 0.9, 0.99)(number + 1) AS x
+    FROM numbers(49999)
+);
+[24902,44518,49999]
 select medianGK()(number) from numbers(10) SETTINGS allow_experimental_analyzer = 0; -- { serverError BAD_ARGUMENTS }
 select medianGK()(number) from numbers(10) SETTINGS allow_experimental_analyzer = 1; -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
 select quantileGK()(number) from numbers(10) SETTINGS allow_experimental_analyzer = 0; -- { serverError BAD_ARGUMENTS }
@@ -15,6 +15,19 @@ select quantilesGK(100, 100/1000, 200/1000, 250/1000, 314/1000, 777/1000)(number
 select quantilesGK(1000, 100/1000, 200/1000, 250/1000, 314/1000, 777/1000)(number + 1) from numbers(1000);
 select quantilesGK(10000, 100/1000, 200/1000, 250/1000, 314/1000, 777/1000)(number + 1) from numbers(1000);
+SELECT quantileGKMerge(100, 0.5)(x)
+FROM
+(
+    SELECT quantileGKState(100, 0.5)(number + 1) AS x
+    FROM numbers(49999)
+);
+
+SELECT quantilesGKMerge(100, 0.5, 0.9, 0.99)(x)
+FROM
+(
+    SELECT quantilesGKState(100, 0.5, 0.9, 0.99)(number + 1) AS x
+    FROM numbers(49999)
+);
 select medianGK()(number) from numbers(10) SETTINGS allow_experimental_analyzer = 0; -- { serverError BAD_ARGUMENTS }
 select medianGK()(number) from numbers(10) SETTINGS allow_experimental_analyzer = 1; -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
@@ -1 +1 @@
-2024-01-01 Hello World
+1
@@ -1,6 +1,6 @@
-CREATE table if not exists table_with_dot_column (date Date, regular_column String, `other_column.2` String) ENGINE = MergeTree() ORDER BY date;
-INSERT INTO table_with_dot_column select '2020-01-01', 'Hello', 'World';
-INSERT INTO table_with_dot_column select '2024-01-01', 'Hello', 'World';
+CREATE TABLE IF NOT EXISTS table_with_dot_column (date Date, regular_column String, `other_column.2` String) ENGINE = MergeTree() ORDER BY date;
+INSERT INTO table_with_dot_column SELECT '2020-01-01', 'Hello', 'World';
+INSERT INTO table_with_dot_column SELECT toDate(now() + 48*3600), 'Hello', 'World';
 CREATE ROW POLICY IF NOT EXISTS row_policy ON table_with_dot_column USING toDate(date) >= today() - 30 TO ALL;
-SELECT * FROM table_with_dot_column;
+SELECT count(*) FROM table_with_dot_column;
 DROP TABLE table_with_dot_column;
@@ -1,2 +1,3 @@
 990000
 990000
+10
@@ -19,5 +19,9 @@ WITH filtered_groups AS (SELECT a FROM pr_1 WHERE a >= 10000)
 SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.a
 SETTINGS allow_experimental_analyzer = 0, allow_experimental_parallel_reading_from_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_two_shards', max_parallel_replicas = 3; -- { serverError SUPPORT_IS_DISABLED }
+-- Sanitizer
+SELECT count() FROM pr_2 JOIN numbers(10) as pr_1 ON pr_2.a = pr_1.number
+SETTINGS allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_two_shards', max_parallel_replicas = 3;
 DROP TABLE IF EXISTS pr_1;
 DROP TABLE IF EXISTS pr_2;
@@ -59,7 +59,7 @@ int main(int argc, char *argv[])
         Poco::Logger::root().setChannel(channel);
         Poco::Logger::root().setLevel("trace");
     }
-    auto * logger = &Poco::Logger::get("keeper-dumper");
+    auto logger = getLogger("keeper-dumper");
     ResponsesQueue queue(std::numeric_limits<size_t>::max());
     SnapshotsQueue snapshots_queue{1};
     CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();